RocketMQ 技术内幕:消息消费

消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费组可以订阅多个主题,消费组之间有集群模式与广播模式两种消费模式。广播模式,主题下的同一条消息将被集群的所有消费者消费一次。消息服务器与消费者之间的消费传递也有两种方式:推模式、拉模式。所谓拉模式,是消费端主动发起拉消息请求,而推模式是消息达到消息服务器后,推送给消息消费者。RocketMQ 消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。

RocketMQ 支持两种消息过滤模式:表达式(TAG、SQL92)与类过滤模式。

消费者启动流程

Step 1:构建主题订阅信息 SubscriptiionData 并加入 RebalanceImpl 的订阅消息中。

Step 2:初始化 MQClientInstance、RebalanceImpl 等。

Step 3:初始化消息进度。如果消息消费是集群模式,那么消息进度保存在 Broker 上;如果是广播模式,那么消息消费进度存储在消费端。

Step 4:根据是否顺序消费,创建消息端消费线程服务。

Step 5:向 MQClientInstance 注册消费者,并启动 MQClientInstance,在一个 JVM 中的所有消费者、生产者持有一个 MQClientInstance,MQClientInstance 只会启动一次。

消息拉取

RocketMQ 使用一个单独的线程 PullMessageService 来负责消息的拉取。

PullMessageService 实现机制

public void run() {
  while (!this.isStopped()) {
    try {
      PullRequest plllRequest = this.pullRequestQueue.take();
      if (pullRequest != null) {
        this.pullMessage(plllRequest);
      }
    } catch (InterruptedException e) {
    } catch (Exception e) {
    }
  }
}

ProcessQueue 实现机制

ProcessQueue 是 MessageQueue 在消费端的重新、快照。PullMessageService 从消息服务器默认每次拉取 32 条消息,按消息的队列偏移量放在 ProcessQueue 中,PullMessageService 然后将消息提交到消费者消费线程池,消息成功消费后从 ProcessQueue 中移除。

This chapter requires login to view full content. You are viewing a preview.

Login to View Full Content

Course Curriculum

3

框架与 I/O:Spring、Netty 与 Web 容器

理解 Spring Boot 自动装配、AOP 与事务原理,掌握 Netty Reactor 模型及 Tomcat 连接处理机制,构建高内聚、易扩展的应用服务层。
4

高性能中间件:消息、缓存与存储

熟练运用 MySQL 索引/事务、Redis 缓存策略、Kafka/RocketMQ 消息可靠性,以及 ZooKeeper 分布式协调,搭建稳定、解耦的分布式数据底座。
6

云原生:容器化、可观测性与工程效能

通过 Docker/K8s 实现弹性部署,集成 Metrics/Logs/Traces 构建可观测体系,推动 DevOps 与自动化,让架构在云上持续交付与进化。