分布式消息队列rocketmq概念详解-kb88凯时官网登录

来自:网络
时间:2023-05-17
阅读:
免费资源网 - https://freexyz.cn/
目录

1.mq概述

1.1 rocketmq简介

rocketmq 是阿里开源的分布式消息中间件,跟其它中间件相比,rocketmq 的特点是纯java实现,是一套提供了消息生产,存储,消费全过程api的软件系统。

1.2 mq用途

限流削峰

mq可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

 异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个mq层。

 数据收集

分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过mq完成此类数据收集是最好的选择。

1.3 常见mq产品

rabbitmq
rabbitmq是使用erlang语言开发的一款mq产品。其吞吐量较kafka与rocketmq要低,且由于其不是java语言开发,所以公司内部对其实现定制化开发难度较大。
kafka
kafka是使用scala/java语言开发的一款mq产品。其最大的特点就是高吞吐量,常用于大数据领域的实时计算、日志采集等场景。其没有遵循任何常见的mq协议,而是使用自研协议。
rocketmq
rocketmq是使用java语言开发的一款mq产品。经过数年阿里双11的考验,性能与稳定性非常高。其没有遵循任何常见的mq协议,而是使用自研协议。

对比

2.rocketmq 基本概念

2.1 消息

消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。单个消息所占空间不会很大。

rocketmq中每个消息拥有唯一的messageid,且可以携带具有业务标识的key,以方便对消息的查询。不过需要注意的是,messageid有两个:在生产者send()消息时会自动生成一个messageid(msgid),当消息到达broker后,broker也会自动生成一个messageid(offsetmsgid)。msgid、offsetmsgid与key都称为消息标识。 

msgid:由producer端生成,其生成规则为: producerip 进程pid messageclientidsetter类的classloader的hashcode 当前时间 automicinteger自增计数器 
offsetmsgid:由broker端生成,其生成规则为:brokerip 物理分区的offset(queue中的偏移量) 
key:由用户指定的业务相关的唯一标识

2.2 主题

topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是rocketmq进行消息订阅的基本单位。 一个生产者可以同时发送多种topic的消息;而一个消费者只对某种特定的topic感兴趣,即只可以订阅和消费一种topic的消息。 

2.3 标签

标签为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。 标签能够有效地保持代码的清晰度和连贯性,并优化rocketmq提供的查询系统。消费者可以根据tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。 topic是消息的一级分类,tag是消息的二级分类。topic相当于货物,tag相当于上海山东等地区。

2.4 队列

存储消息的物理实体。 一个topic中可以包含多个queue,每个queue中存放的就是该topic的消息。 一个topic的queue也被称为一个topic中消息的分区(partition)。 一个topic的queue中的消息只能被一个消费者组中的一个消费者消费。 一个queue中的消息不允许同一个消费者组中的多个消费者同时消费。

分片不同于分区。在rocketmq中,分片指的是存放相应topic的broker。每个分片中会创建出相应数量的分区,即queue,每个queue的大小都是相同的。

2.5 producer

消息生产者,负责生产消息。producer通过mq的负载均衡模块选择相应的broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。  例如:用户提交的请求写入到mq的过程,就是消息生产的过程,在这里用户就是生产者 。

rocketmq中的消息生产者都是以生产者组(producer group)的形式出现的。生产者组是同一类生产者的集合,这类producer发送相同topic类型的消息。一个生产者组可以同时发送多个主题的消息。如果主题中有多个队列,生产者组只有一个生产者,生产者会采取轮询的方式进行发送消息。

生产者代码如下:

导入依赖

       
            org.apache.rocketmq
            rocketmq-spring-boot-starter
            2.0.2
        

 生产者代码

  public static void main(string[] args) throws mqclientexception, remotingexception, interruptedexception, mqbrokerexception {
        defaultmqproducer order = new defaultmqproducer("order");
        order.setnamesrvaddr("localhost:9876");
        order.start();
        message message = new message("mytopic", "mytag", ("test").getbytes());
        sendresult result = order.send(message);
        system.out.println(result);
        order.shutdown();
    }

2.6 consumer

消息消费者,负责消费消息。一个消息消费者会从broker服务器中获取到消息,并对消息进行相关业务处理。  例如:系统从mq中读取到请求,并对请求进行处理的过程就是消息消费的过程,在这里系统就是消费者。  

rocketmq中的消息消费者都是以消费者组(consumer group)的形式出现的。消费者组是同一类消费者的集合,这类consumer消费的是同一个topic类型的消息。 消费者组使得在消息消费方面,实现负载均衡(将一个topic中的不同的queue平均分配给同一个consumer group的不同的consumer,注意,并不是将消息负载均衡)和容错(一个consmer挂了,该consumer group中的其它consumer可以接着消费原consumer消费的queue)的目标变得非常容易。

消费者代码

  public static void main(string[] args) throws mqclientexception {
 
        defaultmqpushconsumer consumer = new defaultmqpushconsumer("order");
        consumer.setnamesrvaddr("localhost:9876");
        consumer.subscribe("mytopic","*");
        consumer.registermessagelistener(new messagelistenerconcurrently() {
            public consumeconcurrentlystatus consumemessage(list list, consumeconcurrentlycontext consumeconcurrentlycontext) {
                system.out.println("收到的消息" list);
                return consumeconcurrentlystatus.consume_success;
            }
        });
        consumer.start();
 
    }

负载均衡策略

queue 个数大于 consumer个数, 那么 consumer 会平均分配 queue,不够平均,会根据clientid排序来拿取余数
queue个数小于consumer个数,那么会有consumer闲置,就是浪费掉了,其余consumer平均分配到queue

消费者组中consumer的数量应该小于等于订阅topic的queue数量。如果超出queue数量,则多出的consumer将不能消费消息。

2.7 nameserver

nameserver是一个broker与topic路由的注册中心,支持broker的动态注册与发现。 
主要包括两个功能: 
broker管理:接受broker集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检查broker是否还存活。

路由信息管理:每个nameserver中都保存着broker集群的整个路由信息和用于客户端查询的队列信息。producer和conumser通过nameserver可以获取整个broker集群的路由信息,从而进行消息的投递和消费。nameserver可以获取整个broker集群的路由信息,从而进行消息的投递和消费。 

路由注册 

name server既然是注册中心,那么是如何完成注册的呢? nameserver通常也是以集群的方式部署,不过,nameserver是无状态的,即nameserver集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。 那各节点中的数据是如何进行数据同步的呢?在broker节点启动时,轮询nameserver列表,与每个nameserver节点建立长连接,发起注册请求。在nameserver内部维护着⼀个broker列表,用来动态存储broker的信息。  

broker节点为了证明自己是活着的,为了维护与nameserver间的长连接,会将最新的信息以心跳包的方式上报给nameserver,每30秒发送一次心跳。心跳包中包含 brokerid、broker地址(ip port)、broker名称、broker所属集群名称等等。nameserver在接收到心跳包后,会更新心跳时间戳,记录这个broker的最新存活时间。 

路由剔除 
由于broker关机、宕机或网络抖动等原因,nameserver没有收到broker的心跳,nameserver可能会将其从broker列表中剔除。 nameserver中有⼀个定时任务,每隔10秒就会扫描⼀次broker表,查看每一个broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定broker失效,然后将其从broker列表中剔除。 

路由发现 
rocketmq的路由发现采用的是pull模型。当topic路由信息出现变化时,nameserver不会主动推送给客户端,而是客户端定时拉取topic最新的路由。 默认客户端每30秒会拉取一次最新的路由。

2.8 broker

broker充当着消息中转角色,负责存储消息、转发消息。
broker在rocketmq系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。broker同时也存储着消息相关的元数据,包括消费者组消费进度偏移offset、主题、队列等。

模块如下图:

remoting module:整个broker的实体,负责处理来自clients端的请求。而这个broker实体则由以下模块构成。

client manager:客户端管理器。负责接收、解析客户端(producer/consumer)请求,管理客户端。例如,维护consumer的topic订阅信息

store service:存储服务。提供方便简单的api接口,处理消息存储到物理硬盘和消息查询功能。

ha service:高可用服务,提供master broker 和 slave broker之间的数据同步功能。

index service:索引服务。根据特定的message key,对投递到broker的消息进行索引服务,同时也提供根据message key对消息进行快速查询的功能。

2.9 rocketmq 工作流程

工作流程如下图:

1)启动nameserver,nameserver启动后开始监听端口,等待broker、producer、consumer连接。

2)启动broker时,broker会与所有的nameserver建立并保持长连接,然后每50秒向nameserver定时发送心跳包。

3)发送消息前,可以先创建topic,创建topic时需要指定该topic要存储在哪些broker上,当然,在创建topic时也会将topic与broker的关系写入到nameserver中。不过,这步是可选的,也可以在发送消息时自动创建topic。

4) producer发送消息,启动时先跟nameserver集群中的其中一台建立长连接,并从nameserver中获取路由信息,即当前发送的topic消息的queue与broker的地址(ip port)的映射关系。然后根据算法策略从队选择一个queue,与队列所在的broker建立长连接从而向broker发消息。当然,在获取到路由信息后,producer会首先将路由信息缓存到本地,再每30秒从nameserver更新一次路由信息。

5)consumer跟producer类似,跟其中一台nameserver建立长连接,获取其所订阅topic的路由信息,然后根据算法策略从路由信息中获取到其所要消费的queue,然后直接跟broker建立长连接,开始消费其中的消息。consumer在获取到路由信息后,同样也会每30秒从nameserver更新一次路由信息。不过不同于producer的是,consumer还会向broker发送心跳,以确保broker的存活状态。

以上就是分布式消息队列rocketmq概念详解的详细内容,更多关于rocketmq概念的资料请关注其它相关文章!

免费资源网 - https://freexyz.cn/
返回顶部
顶部
网站地图