rocketmq之consumer整体介绍启动源码分析-kb88凯时官网登录

来自:网络
时间:2023-05-17
阅读:
免费资源网 - https://freexyz.cn/
目录

前言

从本篇文章开始,我们将逐步开始分析consumer的源码,首先我们将整体介绍consumer的接口和相关实现类以及defaultmqpushconsumer的主要api和关键属性,然后我们将分析consumer的启动过程源码,通过对启动过程的分析,之前我们分析过producer和broker的启动源码,consumer的启动源码与producer还是有很多相似的地方。

consumer整体介绍

consumer实现类

rocketmq给我们提供的consumer实现类如下图所示,包括推送式的defaultmqpushconsumer和拉取式的defaultmqpullconsumerdefaultlitepullconsumer,从图中可以看到defaultmqpullconsumer已经被标注为deprecated,如果需要使用拉取式的consumer,官方推荐使用defaultlitepullconsumer。

consumer消费类型

  • 拉取式消费

consumer主动从broker拉去消息,消费消息的主动权由consumer控制。一旦获取了批量消息,就会启动消费过程。不过这种方式实时性较弱,即broker中有了新的消息时消费者并不能及时发现并消费。

  • 推送式消费

该模式下broker收到数据后会主动推送给consumer,这种方式一般实时性比较高。

rocketmq官方更推荐我们在日常工作中使用defaultmqpushconsumer,它已经能够满足我们大多数使用场景。从技术上讲,这个defaultmqpushconsumer客户端实际上是底层拉取服务的包装器。当从代理中提取的消息到达时,它大致调用注册的回调处理程序来馈送消息。本篇文章,我们将介绍defaultmqpushconsumer的启动流程

defaultmqpushconsumer主要api

defaultmqpushconsumer实现了mqconsumer和mqpushconsumer接口,defaultmqpushconsumer的主要api都在这两个接口中定义了,如下所示

// org.apache.rocketmq.client.consumer.mqconsumer
public interface mqconsumer extends mqadmin {
    // 如果消费失败,消息将被发送回代理,并延迟消耗一些时间
    void sendmessageback(final messageext msg/*消息*/, final int delaylevel/*延迟级别*/, final string brokername);  
    // 根据topic从使用者缓存中获取消息队列
    set fetchsubscribemessagequeues(final string topic) throws mqclientexception;
}
// org.apache.rocketmq.client.consumer.mqpushconsumer
public interface mqpushconsumer extends mqconsumer {
    // 启动consumer
    void start() throws mqclientexception;
    // 关闭consumer
    void shutdown();
    // 注册并发消息listener
    void registermessagelistener(final messagelistenerconcurrently messagelistener);
    // 注册顺序消息listener,将会有序地接收消息。一个队列一个线程
    void registermessagelistener(final messagelistenerorderly messagelistener);
    // 订阅topic
    void subscribe(final string topic, final string subexpression) throws mqclientexception;
    // 退订topic
    void unsubscribe(final string topic);
}

defaultmqpushconsumer关键属性

defaultmqpushconsumer的关键属性如下所示

// org.apache.rocketmq.client.consumer.defaultmqpushconsumer
public class defaultmqpushconsumer extends clientconfig implements mqpushconsumer {
    // defaultmqpushconsumer的默认实现,defaultmqpushconsumer中大部分功能都是对它的代理
    protected final transient defaultmqpushconsumerimpl defaultmqpushconsumerimpl;
    // 相同角色的消费者需要具有完全相同的subscriptions和consumergroup才能正确实现负载平衡,它需要全局唯一
    private string consumergroup;
    // 消息模型定义了如何将消息传递到每个消费者客户端的方式,默认是集群模式
    private messagemodel messagemodel = messagemodel.clustering;
  	// 第一次消费时指定的消费策略,默认是consume_from_last_offset
    private consumefromwhere consumefromwhere = consumefromwhere.consume_from_last_offset;
  	// 队列分配算法,指定如何将消息队列分配给每个使用者客户端。
    private allocatemessagequeuestrategy allocatemessagequeuestrategy;
    // 订阅关系
    private map subscription = new hashmap();
    // 消息监听器
    private messagelistener messagelistener;
    // 消息消费进度存储器
    private offsetstore offsetstore;
  	// 最小消费线程数
    private int consumethreadmin = 20;
    // 最大消费线程数
    private int consumethreadmax = 20;
    // 推送模式下拉去消息的间隔时间,默认一次拉取消息完成后立刻继续拉取
    private long pullinterval = 0;
    // 批量消费数量
    private int consumemessagebatchmaxsize = 1;
    // 批量拉取的数量
    private int pullbatchsize = 32;
  	// 每次拉取时是否更新订阅关系,默认是false
    private boolean postsubscriptionwhenpull = false;
    // 消息最大重试次数,如果消息消费最大次数超过maxreconsumetimes还未成功,则消息将被转移到一个失败队列
    private int maxreconsumetimes = -1;
    //延迟将该队列的消息提交到消费者线程的等待时间,默认延迟1s
    private long suspendcurrentqueuetimemillis = 1000;
    // 消息阻塞消费线程的最大超时时间,默认15分钟
    private long consumetimeout = 15;
    // 关闭使用者时等待消息的最长时间,0表示没有等待。
    private long awaitterminationmilliswhenshutdown = 0;
}

consumer消费模式

consumer提供下面两种消费模式,由上面defaultmqpushconsumer的messagemodel定义

  • 广播模式(broadcasting)

广播消费模式下,相同consumer group的每个consumer实例都接收同一个topic的全量消息。即每条消息会被相同consumer group中的所有consumer消费

  • 集群模式(clustering)

集群模式是consumer默认的消费模式,集群消费模式下,相同consumer group的每个consumer按照负载均衡策略分摊同一个topic消息,即每条消息只会被相同consumer group中的一个consumer消费

consumer消费策略

consumer主要提了下面三种消费策略

  • consume_from_last_offset

这是consumer默认的消费策略,它分为两种情况,如果broker的磁盘消息未过期且未被删除,则从最小偏移量开始消费。如果磁盘已过期,并被删除,则从最大偏移量开始消费。

  • consume_from_first_offset

从最早可用的消息开始消费

  • consume_from_timestamp

从指定的时间戳开始消费,这意味着在consumetimestamp之前生成的消息将被忽略

consumer使用

要使用consumer开始消费消息,至少需要下面5个步骤

public static void main(string[] args) throws mqclientexception {
    // 1. 传入consumer_group,创建defaultmqpushconsumer
    defaultmqpushconsumer consumer = new defaultmqpushconsumer(consumer_group);
  	// 2. 设置namesrvaddr
  	consumer.setnamesrvaddr("127.0.0.1:9876");
    // 3. 订阅topic 
    consumer.subscribe(topic, "*");
    // 4.注册消息listener
    consumer.registermessagelistener((messagelistenerconcurrently) (msg, context) -> {
        system.out.printf("%s receive new messages: %s %n", thread.currentthread().getname(), msg);
        return consumeconcurrentlystatus.consume_success;
    });
    // 5.启动consumer
    consumer.start();
}

defaultmqpushconsumer源码分析

启动源码分析

defaultmqpushconsumer只是设置属性,consumer的初始化实际是在defaultmqpushconsumer#start中执行的,defaultmqpushconsumer#start实际调用了defaultmqpushconsumerimpl#start执行初始化。

// org.apache.rocketmq.client.consumer.defaultmqpushconsumer#start
public void start() throws mqclientexception {
    // consumergroup封装namespace
    setconsumergroup(namespaceutil.wrapnamespace(this.getnamespace(), this.consumergroup));
    // defaultmqpushconsumerimpl启动
    this.defaultmqpushconsumerimpl.start();
    // 消息轨迹跟踪服务,默认null
    if (null != tracedispatcher) {
        try {
            tracedispatcher.start(this.getnamesrvaddr(), this.getaccesschannel());
        } catch (mqclientexception e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

下面我们来分步骤分析defaultmqpushconsumerimpl#start代码

第一步:

  • 先将consumer的状态更新为start_failed
  • 校验consumer的配置。主要校验consumergroup,
  • 消费模式校验(messagemodel),消费开始位置(consumefromwhere),消费时间戳(默认是半小时之前),队列分配策略(默认是allocatemessagequeueaveragely),订阅topic和subscription关系校验,消息监听器(messagelistener)校验等。
  • 将consumer中的订阅关系拷贝到rebalanceimpl中,consumer中订阅关系的来源主要包括defaultmqpushconsumerimpl#subscribe方法获取,也会订阅重试topic,其主题名为%retry% 消费者组名,消费者启动时会自动订阅该主题
  • 如果是集群模式,则修改消费者名称为pid#时间戳
// org.apache.rocketmq.client.impl.consumer.defaultmqpushconsumerimpl#start
public synchronized void start() throws mqclientexception {
  //...
  // 状态先设置为启动失败
  this.servicestate = servicestate.start_failed;
  // 校验配置,consumergroup校验,
  this.checkconfig();
  // 订阅关系copy到rebalanceimpl中
  this.copysubscription();
  // 如果是集群模式,消费者名称如果是default,则会改成:pid#时间戳
  if (this.defaultmqpushconsumer.getmessagemodel() == messagemodel.clustering) {
      this.defaultmqpushconsumer.changeinstancenametopid();
  }
  //...
}

第二步:

主要是初始化mqclientinstance、rebalanceimpl和pullapiwrapper。

**mqclientinstance:**是消息拉取服务,主要用于拉取消息,同一个进程内的所有consumer会使用同一个mqclientinstance

**rebalanceimpl:**是消费者负载均衡服务,用于确定消费者消费的消息队列以及负载均衡。

// org.apache.rocketmq.client.impl.consumer.defaultmqpushconsumerimpl#start
// 生成一个mqclientinstance
this.mqclientfactory = mqclientmanager.getinstance().getorcreatemqclientinstance(this.defaultmqpushconsumer, this.rpchook);
// 设置消费者组
this.rebalanceimpl.setconsumergroup(this.defaultmqpushconsumer.getconsumergroup());
// 消息消费模式
this.rebalanceimpl.setmessagemodel(this.defaultmqpushconsumer.getmessagemodel());
// 设置消息消费模式
this.rebalanceimpl.setallocatemessagequeuestrategy(this.defaultmqpushconsumer.getallocatemessagequeuestrategy());
// 设置mqclientinstance
this.rebalanceimpl.setmqclientfactory(this.mqclientfactory);
// 构建拉消息包装器
this.pullapiwrapper = new pullapiwrapper(
    mqclientfactory,
    this.defaultmqpushconsumer.getconsumergroup(), isunitmode());
this.pullapiwrapper.registerfiltermessagehook(filtermessagehooklist);

第三步:

根据消息消费模式的不同设置不同的消息消费进度存储器(offsetstore),如果是广播模式,则使用localfileoffsetstore作为消息进度存储器,如果是集群模式则使用remotebrokeroffsetstore作为消息进度存储器。创建完成之后调用load()方法加载偏移量,如果是localfileoffsetstore将会从本地加载。

广播模式下:localfileoffsetstore将消费进度存储在consumer本地的${user.home}/.rocketmq_offsets/clientid/consumergroup/offsets.json文件中

集群模式下:remotebrokeroffsetstore将消费进度存储在broker

// org.apache.rocketmq.client.impl.consumer.defaultmqpushconsumerimpl#start
if (this.defaultmqpushconsumer.getoffsetstore() != null) {
    this.offsetstore = this.defaultmqpushconsumer.getoffsetstore();
} else {
    switch (this.defaultmqpushconsumer.getmessagemodel()) {
        case broadcasting:
            // 如果是广播模式,则使用localfileoffsetstore存储偏移量
            this.offsetstore = new localfileoffsetstore(this.mqclientfactory, this.defaultmqpushconsumer.getconsumergroup());
            break;
        case clustering:
            // 如果是集群模式,则使用remotebrokeroffsetstore存储偏移量
            this.offsetstore = new remotebrokeroffsetstore(this.mqclientfactory, this.defaultmqpushconsumer.getconsumergroup());
            break;
        default:
            break;
    }
    this.defaultmqpushconsumer.setoffsetstore(this.offsetstore);
}
// 如果是广播模式,则从本地文件load偏移量,如果是集群模式则是一个空实现
this.offsetstore.load();

第四步:

根据消息监听器的类型不同创建不同的消息消费服务(并发/顺序消息消费服务),并启动。然后注册消费者组和消费者信息到mqclientinstance中的consumertable中,注册成功后启动mqclientinstance客户端通信实例。

// org.apache.rocketmq.client.impl.consumer.defaultmqpushconsumerimpl#start
// 如果是顺序消费
if (this.getmessagelistenerinner() instanceof messagelistenerorderly) {
    this.consumeorderly = true;
    this.consumemessageservice =
        new consumemessageorderlyservice(this, (messagelistenerorderly) this.getmessagelistenerinner());
// 如果是并发消费
} else if (this.getmessagelistenerinner() instanceof messagelistenerconcurrently) {
    this.consumeorderly = false;
    this.consumemessageservice =
        new consumemessageconcurrentlyservice(this, (messagelistenerconcurrently) this.getmessagelistenerinner());
}
this.consumemessageservice.start();
// 将自身注册到mqclientinstance
boolean registerok = mqclientfactory.registerconsumer(this.defaultmqpushconsumer.getconsumergroup(), this);
// ...
mqclientfactory.start();

第五步:

// org.apache.rocketmq.client.impl.consumer.defaultmqpushconsumerimpl#start
// 向namesrv拉取并更新当前消费者订阅topic路由信息
this.updatetopicsubscribeinfowhensubscriptionchanged();
// 随机选择一个broker,发送检查客户端tag配置的请求,主要是检测broker是否支持sql92类型的tag过滤以及sql92的tag语法是否正确
this.mqclientfactory.checkclientinbroker();
// 给所有broker发送心跳
this.mqclientfactory.sendheartbeattoallbrokerwithlock();
// 唤醒负载均衡服务rebalanceservice,并进行rebalance
this.mqclientfactory.rebalanceimmediately();

总结

本篇文章我们介绍了consumer的api,属性,接口和实现类,通过对这几部分的了解,我们能够对consumer有一个整体的认识。我们还分析了defaultmqpushconsumer的启动的源码,通过对defaultmqpushconsumer#start开始逐渐深入分析defaultmqpushconsumer的启动过程,能够帮助我们对consumer消费消息一些关键的类如mqclientinstance,offsetstore,rebalanceimpl,consumemessageservice由一个初步的认识,由助于我们后续详细了解这些服务的工作原理。

以上就是rocketmq 源码分析之consumer整体介绍启动分析的详细内容,更多关于rocketmq consumer源码解析的资料请关注其它相关文章!

免费资源网 - https://freexyz.cn/
返回顶部
顶部
网站地图