RocketMQ 技术内幕:消息消费
消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费组可以订阅多个主题,消费组之间有集群模式与广播模式两种消费模式。广播模式,主题下的同一条消息将被集群的所有消费者消费一次。消息服务器与消费者之间的消费传递也有两种方式:推模式、拉模式。所谓拉模式,是消费端主动发起拉消息请求,而推模式是消息达到消息服务器后,推送给消息消费者。RocketMQ 消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。
RocketMQ 支持两种消息过滤模式:表达式(TAG、SQL92)与类过滤模式。
消费者启动流程
Step 1:构建主题订阅信息 SubscriptiionData 并加入 RebalanceImpl 的订阅消息中。
Step 2:初始化 MQClientInstance、RebalanceImpl 等。
Step 3:初始化消息进度。如果消息消费是集群模式,那么消息进度保存在 Broker 上;如果是广播模式,那么消息消费进度存储在消费端。
Step 4:根据是否顺序消费,创建消息端消费线程服务。
Step 5:向 MQClientInstance 注册消费者,并启动 MQClientInstance,在一个 JVM 中的所有消费者、生产者持有一个 MQClientInstance,MQClientInstance 只会启动一次。
消息拉取
RocketMQ 使用一个单独的线程 PullMessageService 来负责消息的拉取。
PullMessageService 实现机制
public void run() {
while (!this.isStopped()) {
try {
PullRequest plllRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(plllRequest);
}
} catch (InterruptedException e) {
} catch (Exception e) {
}
}
}
ProcessQueue 实现机制
ProcessQueue 是 MessageQueue 在消费端的重新、快照。PullMessageService 从消息服务器默认每次拉取 32 条消息,按消息的队列偏移量放在 ProcessQueue 中,PullMessageService 然后将消息提交到消费者消费线程池,消息成功消费后从 ProcessQueue 中移除。
This chapter requires login to view full content. You are viewing a preview.
Login to View Full Content