为了方便跟踪消息发送和消费的轨迹,RocketMQ 引入了轨迹消息,今天一起来学习一下。
默认情况下,RocketMQ 是不开启轨迹消息的,需要我们手工开启。
Broker 端开启轨迹消息,需要增加下面的配置:
traceTopicEnable=true
对于生产者端,要开启轨迹消息,需要在定义生产者时增加参数。定义消费者使用类 DefaultMQProducer,这个类支持开启轨迹消息的构造函数如下:
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic)
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic)
从上面的构造函数可以看出,自定义消费者时,不仅可以定义开启轨迹消息,还可以指定轨迹消息发送的 Topic。如果不指定轨迹消息的 Topic,默认发送的 Topic 是 RMQ_SYS_TRACE_TOPIC。
对于消费者,要开启轨迹消息,需要在定义消费者时增加参数。定义消费者使用类 DefaultMQPushConsumer,这个类支持开启轨迹消息的构造函数如下:
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace)
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)
首先看一个支持轨迹消息的生产者示例:
DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp, true, "");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
下面是一张生产者端的 UML 类图:
在 DefaultMQProducer 创建时,会初始化 defaultMQProducerImpl、traceDispatcher 和钩子函数 SendMessageHook。
生产者初始化代码如下:
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
boolean enableMsgTrace, final String customizedTraceTopic) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher;
//注册轨迹消息钩子函数
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
//省略事务消息的钩子注册
} catch (Throwable e) {
}
}
}
初始化的代码中,传入了是否开启轨迹消息(enableMsgTrace)和自定义轨迹消息的 Topic(customizedTraceTopic),同时初始化了 traceDispatcher 并注册了钩子函数 SendMessageTraceHook。
生产者启动时 defaultMQProducerImpl 和 traceDispatcher 也会启动,代码如下:
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
生产者初始化的时候初始化了 traceDispatcher。traceDispatcher 是轨迹消息的处理器,AsyncTraceDispatcher 构造函数定义一个专门发送轨迹消息的生产者 traceProducer(DefaultMQProducer 类型)。
注意:traceProducer 发送消息的最大值 maxMessageSize 是 128k,虽然 maxMessageSize 初始值被定义为 4M,但是创建 traceProducer 时赋值 128k。
上面提到,生产者启动时 traceDispatcher 也会启动,看一下它的启动方法:
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}
this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}
可以看到,traceDispatcher 的启动首先启动了 traceProducer,然后启动了一个异步线程 AsyncRunnable,下面看一下 run 方法:
public void run() {
while (!stopped) {
//batchSize=100
List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
//traceContextQueue队列长度等于1024
synchronized (traceContextQueue) {
for (int i = 0; i < batchSize; i++) {
TraceContext context = null;
try {
//get trace data element from blocking Queue - traceContextQueue
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (context != null) {
contexts.add(context);
} else {
break;
}
}
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecutor.submit(request);
} else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
}
}
}
从上面的代码可以看到,每次从 traceContextQueue 中拉取 100 条 TraceContext,然后通过 AsyncAppenderRequest 异步发送出去。
注意:
看到这里相信你一定会有一个疑问,traceContextQueue 中的消息是从哪儿来的呢?答案是生产者初始化时定义的 SendMessageTraceHook。
看一下发送消息的代码:
//DefaultMQProducerImpl 类
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//省略部分代码
SendMessageContext context = null;
if (brokerAddr != null) {
try {
//省略部分代码
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
//1.发送消息前执行钩子函数
this.executeSendMessageHookBefore(context);
}
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
//省略requestHeader封装代码
SendResult sendResult = null;
//-------------2.这里发送消息-------------
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
//3.发送消息后执行钩子函数
this.executeSendMessageHookAfter(context);
}
return sendResult;
}
//catch和finally省略
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
由于 sendKernelImpl 代码比较多,我这里只贴了骨架代码。我在上面加了注释,可以看到在发送消息前后都会执行钩子函数。
在发送消息前,通过调用钩子函数封装一个轨迹消息。发送消息后,再通过钩子函数对轨迹消息进行完善,主要加入消息发送结果、发送消息花费时间等属性,然后把轨迹消息加到 traceContextQueue 上。轨迹消息包含的内容如下图:
轨迹消息的内容比较多,包含了发送消息的详细信息,比如:Topic、Message、MessageQueue、Group、生产者地址(clientHost)、消息发送结果等。
轨迹消息发送到 Broker 后,会保存到 Broker 上,默认保存的 Topic 是 RMQ_SYS_TRACE_TOPIC。Broker 启动时,会自动初始化默认 Topic 的路由配置,代码如下:
//TopicConfigManager 类
if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
TopicConfig topicConfig = new TopicConfig(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
前面提到过,生产者也可以自己定义轨迹消息 Topic,不过需要在 Broker 上提前创建好自定义的 Topic。
如果想要轨迹消息和业务消息隔离,可以专门用一个 Broker 来保存轨迹消息,这样需要单独在这个 Broker 上开启轨迹消息。
消费端对轨迹消息的处理跟生产端非常类似。首先我们看一下消费端处理的 UML 类图:
我们以推模式处理并发消息为例,ConsumeMessageConcurrentlyService 在消费消息前,通过 DefaultMQPushConsumerImpl 调用了钩子函数 executeHookBefore,消费消息后通过 DefaultMQPushConsumerImpl 调用了钩子函数 executeHookAfter。代码如下:
//ConsumeMessageConcurrentlyService 类
public void run() {
//省略部分逻辑
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
//1.消费消息前执行钩子函数
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
//省略部分逻辑
try {
//2.消费消息
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
}
//省略部分逻辑
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
//3.消费消息前执行钩子函数
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
//省略部分逻辑
}
如果消费端开启轨迹消息,就会初始化 traceDispatcher 并且注册钩子函数。
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
可以看到,traceDispatcher 跟生产者使用的都是 AsyncTraceDispatcher,处理逻辑完全一样。
同样,钩子函数的使用跟生产者也类似,在消费消息之前调用钩子函数(executeHookBefore)封装轨迹消息,在消费消息之后再次调用钩子函数(executeHookAfter)完善轨迹消息。消费端轨迹消息的内容如下图:
本文主要讲解了 RocketMQ 的轨迹消息实现机制。轨迹消息分为生产端和消费端的轨迹消息,生产端和消费端 RocketMQ 都提供了构造函数来指定是否开启轨迹消息。通过钩子函数,把轨迹消息加入队列,也就是变量 traceContextQueue,而 traceDispatcher 则以 100 条为单位不停地从队列中拉取消息进行组装并发送到 Broker。如下图:
理解了 traceDispatcher 和钩子函数 ,就很容易理解 RocketMQ 轨迹消息的处理逻辑了。
在 Broker 端,则通过增加配置参数 traceTopicEnable 来指定是否存储轨迹消息。
本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/G0bfuLzonDPncoVC_dx1IA
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。