Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
Expand Down Expand Up @@ -158,7 +157,7 @@ public RemotingCommand handle(final GetMessageResult getMessageResult,
.put(LABEL_RESPONSE_CODE, RemotingHelper.getResponseCodeDesc(finalResponse.getCode()))
.put(LABEL_RESULT, remotingMetricsManager.getWriteAndFlushResult(future))
.build();
remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributes);
if (!future.isSuccess()) {
log.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;

import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
Expand Down Expand Up @@ -210,7 +209,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
.put(LABEL_RESPONSE_CODE, RemotingHelper.getResponseCodeDesc(finalResponse.getCode()))
.put(LABEL_RESULT, remotingMetricsManager.getWriteAndFlushResult(future))
.build();
remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributes);
if (!future.isSuccess()) {
LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -477,7 +476,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
.put(LABEL_RESULT, remotingMetricsManager.getWriteAndFlushResult(future))
.build();
remotingMetricsManager.getRpcLatency().record(
request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
request.processTimerElapsedMs(), attributes);
if (!future.isSuccess()) {
POP_LOGGER.error("Fail to transfer messages from page cache to {}",
channel.remoteAddress(), future.cause());
Expand Down Expand Up @@ -630,7 +629,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
.put(LABEL_RESPONSE_CODE, RemotingHelper.getResponseCodeDesc(finalResponse.getCode()))
.put(LABEL_RESULT, remotingMetricsManager.getWriteAndFlushResult(future))
.build();
remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributes);
if (!future.isSuccess()) {
POP_LOGGER.error("Fail to transfer messages from page cache to {}",
channel.remoteAddress(), future.cause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import io.opentelemetry.api.common.Attributes;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.pagecache.OneMessageTransfer;
Expand Down Expand Up @@ -122,7 +121,7 @@ public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand r
.put(LABEL_RESPONSE_CODE, RemotingHelper.getResponseCodeDesc(response.getCode()))
.put(LABEL_RESULT, remotingMetricsManager.getWriteAndFlushResult(future))
.build();
remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributes);
if (!future.isSuccess()) {
LOGGER.error("transfer query message by page cache failed, ", future.cause());
}
Expand Down Expand Up @@ -168,7 +167,7 @@ public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingComman
.put(LABEL_RESPONSE_CODE, RemotingHelper.getResponseCodeDesc(response.getCode()))
.put(LABEL_RESULT, remotingMetricsManager.getWriteAndFlushResult(future))
.build();
remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributes);
if (!future.isSuccess()) {
LOGGER.error("Transfer one message from page cache failed, ", future.cause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.remoting.netty;

import com.google.common.base.Stopwatch;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
Expand All @@ -39,14 +38,14 @@ public NettyDecoder() {
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
Stopwatch timer = Stopwatch.createStarted();
long timerNanos = System.nanoTime();
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
RemotingCommand cmd = RemotingCommand.decode(frame);
cmd.setProcessTimer(timer);
cmd.setProcessTimerNanos(timerNanos);
return cmd;
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public static void writeResponse(Channel channel, RemotingCommand request, @Null
if (request.isOnewayRPC()) {
if (attributesBuilder != null) {
attributesBuilder.put(LABEL_RESULT, RESULT_ONEWAY);
remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build());
}
return;
}
Expand All @@ -264,7 +264,7 @@ public static void writeResponse(Channel channel, RemotingCommand request, @Null
}
if (remotingMetricsManager != null) {
attributesBuilder.put(LABEL_RESULT, remotingMetricsManager.getWriteAndFlushResult(future));
remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build());
}
if (callback != null) {
callback.accept(future);
Expand All @@ -276,7 +276,7 @@ public static void writeResponse(Channel channel, RemotingCommand request, @Null
log.error(response.toString());
if (remotingMetricsManager != null) {
attributesBuilder.put(LABEL_RESULT, RESULT_WRITE_CHANNEL_FAILED);
remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build());
}
}

Expand All @@ -299,7 +299,7 @@ public void writeResponse(Channel channel, RemotingCommand request, @Nullable Re
if (request.isOnewayRPC()) {
if (attributesBuilder != null) {
attributesBuilder.put(LABEL_RESULT, RESULT_ONEWAY);
this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
this.remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build());
}
return;
}
Expand All @@ -316,7 +316,7 @@ public void writeResponse(Channel channel, RemotingCommand request, @Nullable Re
}
if (this.remotingMetricsManager != null && attributesBuilder != null) {
attributesBuilder.put(LABEL_RESULT, this.remotingMetricsManager.getWriteAndFlushResult(future));
this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
this.remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build());
}
if (callback != null) {
callback.accept(future);
Expand All @@ -328,7 +328,7 @@ public void writeResponse(Channel channel, RemotingCommand request, @Nullable Re
log.error(response.toString());
if (this.remotingMetricsManager != null && attributesBuilder != null) {
attributesBuilder.put(LABEL_RESULT, RESULT_WRITE_CHANNEL_FAILED);
this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
this.remotingMetricsManager.getRpcLatency().record(request.processTimerElapsedMs(), attributesBuilder.build());
}
}
}
Expand Down Expand Up @@ -396,7 +396,7 @@ public void processRequestCommand(final ChannelHandlerContext ctx, final Remotin
AttributesBuilder attributesBuilder = remotingMetricsManager.newAttributesBuilder()
.put(LABEL_REQUEST_CODE, RemotingHelper.getRequestCodeDesc(cmd.getCode()))
.put(LABEL_RESULT, RESULT_PROCESS_REQUEST_FAILED);
remotingMetricsManager.getRpcLatency().record(cmd.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
remotingMetricsManager.getRpcLatency().record(cmd.processTimerElapsedMs(), attributesBuilder.build());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,13 +581,17 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
if (channel.isWritable()) {
if (!channel.config().isAutoRead()) {
channel.config().setAutoRead(true);
log.info("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
if (log.isDebugEnabled()) {
log.debug("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
}
Comment thread
wang-jiahua marked this conversation as resolved.
}
} else {
channel.config().setAutoRead(false);
log.warn("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}",
RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable());
if (log.isDebugEnabled()) {
log.debug("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}",
RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable());
}
Comment thread
wang-jiahua marked this conversation as resolved.
}
super.channelWritabilityChanged(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package org.apache.rocketmq.remoting.protocol;

import com.alibaba.fastjson2.annotation.JSONField;
import com.google.common.base.Stopwatch;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -31,6 +32,7 @@
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
Expand All @@ -42,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RemotingCommand {
Expand All @@ -54,6 +57,12 @@ public class RemotingCommand {
private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
new HashMap<>();
private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<>();
// Caches the no-arg constructor of each CommandCustomHeader class.
// Why: Class.getDeclaredConstructor() copies the Constructor object on every call
// (sample showed ~70MB of Constructor allocations during a 60s benchmark).
// The set of header classes is fixed at startup, so ConcurrentHashMap.computeIfAbsent
// pays the reflective lookup once per class and reuses the cached Constructor thereafter.
private static final Map<Class<?>, Constructor<?>> HEADER_CTOR_CACHE = new ConcurrentHashMap<>();
Comment thread
wang-jiahua marked this conversation as resolved.
// 1, Oneway
// 1, RESPONSE_COMMAND
private static final Map<Field, Boolean> NULLABLE_FIELD_CACHE = new HashMap<>();
Expand Down Expand Up @@ -97,7 +106,7 @@ public class RemotingCommand {

private transient byte[] body;
private boolean suspended;
private transient Stopwatch processTimer;
private transient long processTimerNanos;
private transient List<CommandCallback> callbackList;

protected RemotingCommand() {
Expand Down Expand Up @@ -159,8 +168,7 @@ public static RemotingCommand createResponseCommand(int code, String remark,

if (classHeader != null) {
try {
CommandCustomHeader objectHeader = classHeader.getDeclaredConstructor().newInstance();
cmd.customHeader = objectHeader;
cmd.customHeader = newHeaderInstance(classHeader);
} catch (InstantiationException e) {
return null;
} catch (IllegalAccessException e) {
Expand All @@ -175,6 +183,18 @@ public static RemotingCommand createResponseCommand(int code, String remark,
return cmd;
}

@SuppressWarnings("unchecked")
private static <T> T newHeaderInstance(Class<T> cls)
throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
Constructor<?> ctor = HEADER_CTOR_CACHE.get(cls);
if (ctor == null) {
ctor = cls.getDeclaredConstructor();
ctor.setAccessible(true);
HEADER_CTOR_CACHE.putIfAbsent(cls, ctor);
}
return (T) ctor.newInstance();
Comment thread
wang-jiahua marked this conversation as resolved.
}

public static RemotingCommand createResponseCommand(int code, String remark) {
return createResponseCommand(code, remark, null);
}
Expand Down Expand Up @@ -283,7 +303,7 @@ public <T extends CommandCustomHeader> T decodeCommandCustomHeaderDirectly(Class
boolean useFastEncode) throws RemotingCommandException {
T objectHeader;
try {
objectHeader = classHeader.getDeclaredConstructor().newInstance();
objectHeader = newHeaderInstance(classHeader);
} catch (Exception e) {
return null;
}
Expand Down Expand Up @@ -509,6 +529,29 @@ public ByteBuffer encodeHeader(final int bodyLength) {
return result;
}

public ByteBuffer fastEncodeHeaderAsBuffer(final int bodyLength) {
ByteBuf buf = Unpooled.buffer(192);
int beginIndex = buf.writerIndex();
buf.writeLong(0);
int headerSize;
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
if (customHeader != null && !(customHeader instanceof FastCodesHeader)) {
this.makeCustomHeaderToNet();
}
headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, buf);
} else {
this.makeCustomHeaderToNet();
byte[] header = RemotingSerializable.encode(this);
headerSize = header.length;
buf.writeBytes(header);
}
buf.setInt(beginIndex, 4 + headerSize + bodyLength);
buf.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC));
ByteBuffer result = buf.nioBuffer();
buf.release();
return result;
}
Comment thread
wang-jiahua marked this conversation as resolved.

public void markOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
this.flag |= bits;
Expand Down Expand Up @@ -611,7 +654,10 @@ public void setExtFields(HashMap<String, String> extFields) {

public void addExtField(String key, String value) {
if (null == extFields) {
extFields = new HashMap<>(256);
// Default capacity (16) is plenty for the typical few extFields plus
// a CustomHeader's reflected fields. Capacity 256 was 16x oversized
// and allocated a Node[256] per command on the send/response path.
extFields = new HashMap<>();
}
extFields.put(key, value);
}
Expand All @@ -635,12 +681,12 @@ public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
}

public Stopwatch getProcessTimer() {
return processTimer;
public long processTimerElapsedMs() {
return (System.nanoTime() - processTimerNanos) / 1_000_000;
}

public void setProcessTimer(Stopwatch processTimer) {
this.processTimer = processTimer;
public void setProcessTimerNanos(long nanos) {
this.processTimerNanos = nanos;
}
Comment thread
wang-jiahua marked this conversation as resolved.

public List<CommandCallback> getCallbackList() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.List;

public class TopicQueueMappingContext {
public static final TopicQueueMappingContext EMPTY = new TopicQueueMappingContext(null, null, null, null, null);

private String topic;
private Integer globalId;
private TopicQueueMappingDetail mappingDetail;
Comment thread
wang-jiahua marked this conversation as resolved.
Expand Down
Loading