rocketmq consumequeue与indexfile实时更新机制源码解析-kb88凯时官网登录

来自:网络
时间:2023-05-17
阅读:
免费资源网 - https://freexyz.cn/
目录

前言

前面我们介绍了消息是如何存储的,消息是如何刷盘的,讲的都是commitlog是如何存储和刷盘的。虽然commitlog顺序存储着所有消息,但是commitlog中的消息并没有区分topic、keys等,如果需要消费某个topic的消息或者查找某一条消息只能遍历commitlog文件去查找,性能相当低下,因此有了consumelog和indexfile两个文件类型,这两个文件的作用主要是提升消息消费和查询的性能。

consumequeue详解

为了提高消费消息查询性能,broker会为每个topic在~/store/consumequeue中创建一个topic名称的目录,并再为该topic创建目录名为queueid的目录,每个目录存放着若干consumequeue文件,consumequeue属于commitlog的索引文件,可以根据consumequeue定位到具体的消息,consumequeue存储文件见下图

consumequeue文件名由20位数字构成,表示当前文件的第一个索引条目的起始偏移量。与commitlog文件名不同的是,consumequeue后续文件名是固定的,由于consumequeue文件大小是固定不变的。

consumequeue文件大小由mappedfilesizeconsumequeue配置控制,它的默认大小是30w * consumequeue.cq_store_unit_size(20),也就是600w字节大小,consumequeue.cq_store_unit_size是consumequeue每个索引条目的大小,每隔索引条目包含了三个消息的重要属性:消息在mappedfile文件中的物理偏移量(8字节)、消息的长度(4字节)、消息tag的hashcode值,这三个属性占了20个字节,单个索引条目结构如下图所示

indexfile详解

rocketmq除了提供消息的topic给消息消费外,rocketmq还提供了根据key来查找消息的功能,producer创建消息时可以传入keys值,用于快速查找消息。

// 构建message参数
message msg = new message("topictest",  // 消息topic
    "taga",															// 消息tag
    "key1 key2 key3",										// 消息keys,多个key用" "隔开
    "hello linshifu!".getbytes(remotinghelper.default_charset)); // 消息体

indexfile可以看做是一个key的哈希索引文件,通过计算key的hash值,快速找到某个key对应的消息在commitlog中的位置。indexfile由下面三个部分构成:

  • indexheader
  • slots槽位
  • indexes索引数据

indexfile结构如下图所示

每个indexfile的长度是固定的,其中indexheader占用40字节,slots占用500w * 4字节,index索引数据占用2000w * 20字节

indexheader

indexheader占用indexfile的前40个字节,它主要存储着indexfile索引文件的相关信息,indexheader包含如下属性

// org.apache.rocketmq.store.index.indexheader
public class indexheader {
    // 索引文件第一条消息在commitlog中的存储时间
    private final atomiclong begintimestamp = new atomiclong(0);
    // 索引文件最后一条消息在commitlog中的存储时间
    private final atomiclong endtimestamp = new atomiclong(0);
    // 索引文件第一条消息的偏移量
    private final atomiclong beginphyoffset = new atomiclong(0);
    // 索引文件最后一条消息的偏移量
    private final atomiclong endphyoffset = new atomiclong(0);
    // 已经填充slot的hash槽数量
    private final atomicinteger hashslotcount = new atomicinteger(0);
    // 该indexfile种包含的索引单元数量
    private final atomicinteger indexcount = new atomicinteger(1);
}

数据结构如下图所示

slots槽位

在indexfile中间部分存储的是indexflie中key的hash槽,每个hash槽存储的是index索引单元的indexno,添加索引时会将key的hash值p0w的结果计算哈希槽序号,然后将index索引单元的indexno放入slot槽中,indexno是int类型,slots槽位总共有500w个,因此slots槽位占用的大小是500w * 4=2000w

indexes索引数据

index索引由2000w个索引单元构成,每个索引单元大小为20字节,每隔索引单元由下面四个部分构成

  • keyhash

keyhash是消息索引key的hash值

  • phyoffet

phyoffset是当前key对应消息在commitlog中的偏移量commitlog offset

  • timediff

timediff是当前key对应消息存储时间与当前indexfile第一个索引存储时间差

  • preindex

当前slot的index索引单元的前一个索引单元的indexno

索引单元数据结构如下

实时更新consumequeue与indexfile源码分析

之前的文章我们只了解了broker的commitlog文件保存和刷盘的流程,现在我们来了解broker实时更新consumequeue和indexfile的流程。

消息保存的过程仅仅会保存commitlog,consumequeue文件及indexfile中的数据是通过reputmessageservice将commitlog中的消息转发到consumequeue及indexfile。

reputmessageservice和之前的刷盘服务类似,都是异步线程执行的服务。reputmessageservice是defaultmessagestore的一个内部类,它跟随者消息存储对象defaultmessagestore创建时共同创建。reputmessageservice刷新consumequeue与indexfile的逻辑可以从它的run()方法开始分析。

// org.apache.rocketmq.store.defaultmessagestore.reputmessageservice#run
@override
public void run() {
		// 死循环
    while (!this.isstopped()) {
        try {
             // 睡眠1ms
            thread.sleep(1);
						// 更新consumequeue和indexfile
            this.doreput();
        } catch (exception e) {
            defaultmessagestore.log.warn(this.getservicename()   " service has exception. ", e);
        }
    }
}

从上面代码可以看出,更新consumequeue与indexfile在死循环中执行,每隔1ms执行一次doreput()来更新更新consumequeue和indexfile,在doreput()中的主要逻辑如下

  • 如果重放消息偏移量reputfromoffset小于commitlog的最大offset,则会循环重放消息,更新consumequeue及indexfile
  • 从commitlog的重放偏移量开始获取映射缓冲结果selectmappedbufferresult,selectmappedbufferresult包含如下属性
// org.apache.rocketmq.store.selectmappedbufferresult
public class selectmappedbufferresult {
    // mappedfile文件起始偏移量 position
    private final long startoffset;
    // reputfromoffset开始的缓冲
    private final bytebuffer bytebuffer;
    // 消息size
    private int size;
    // commitlog的mappedfile
    private mappedfile mappedfile;
}
  • 根据selectmappedbufferresult校验消息,并创建转发请求dispatchrequest,dispatchrequest中包含更新consumequeue和indexfile中需要用到的属性,如topic,消息偏移量,消息key,消息存储时间戳,消息长度,消息taghashcode等。
  • 如果当前消息size>0,则说明当前消息需要被转发更新consumequeue和indexfile,会调用关键方法defaultmessagestore.this.dodispatch转发更新
  • 如果当前消息size=0,则说明已经读到了commitlog当前mappedfile的结尾,因此需要读取下一个mappedfile,并进行转发。
// org.apache.rocketmq.store.defaultmessagestore.reputmessageservice#doreput
private void doreput() {
    // 1.reputfromoffset ≤ commitlog最大offset,则循环重放
    for (boolean donext = true; this.iscommitlogavailable()/*reputfromoffset≤commitlog最大offset*/&&donext; ) {
        // 2.根据reputfromoffset的物理偏移量找到mappedfilequeue中对应的commitlog文件的mappedfile
        // 然后从该mappedfile中截取一段自reputfromoffset偏移量开始的bytebuffer,这段内存存储着将要重放的消息
        selectmappedbufferresult result = defaultmessagestore.this.commitlog.getdata(reputfromoffset);
        if (result != null) {
            try {
                // 遍历消息,开始reput
                for (int readsize = 0; readsize < result.getsize() && donext; ) {
                    // 3. 检查消息属性,并构建一个消息的dispatchrequest
                    dispatchrequest dispatchrequest =
                        defaultmessagestore.this.commitlog.checkmessageandreturnsize(result.getbytebuffer(), false, false);
                    if (dispatchrequest.issuccess()) {
                        if (size > 0) {
                            // 4.消息分发,写consumequeue和index
                            defaultmessagestore.this.dodispatch(dispatchrequest);
                            // 设置reputoffset加上当前消息大小
                            this.reputfromoffset  = size;
                            // 设置读取的大小加上当前消息大小
                            readsize  = size;
                             //如果size=0,说明读取到了mappedfile的文件结尾
                        } else if (size == 0) {
                            // 5. 获取下个文件的起始offset
                            this.reputfromoffset = defaultmessagestore.this.commitlog.rollnextfile(this.reputfromoffset);
                            // 设置readsize=0,结束循环
                            readsize = result.getsize();
                        }
                    } else if (!dispatchrequest.issuccess()) {
                       // ...
                }
            } finally {
                result.release();
            }
        } else {
            donext = false;
        }
    }
}

由上面代码可知,转发更新consumequeue和indexfile的关键代码在defaultmessagestore.this.dodispatch(dispatchrequest)中,在dodispatch()方法中循环遍历dispatcherlist中的commitlogdispatcher。

public void dodispatch(dispatchrequest req) {
    for (commitlogdispatcher dispatcher : this.dispatcherlist) {
        dispatcher.dispatch(req);
    }
}

debug代码可以中包含处理转发请求的dispatcher类,通过类名就可以很容易判断出commitlogdispatcherbuildconsumequeue是将commitlog转发到consumequeue中,commitlogdispatcherbuildindex是将消息构建indexfile,下面我们来分别分析两者是如何处理commitlog消息转发的。

commitlogdispatcherbuildconsumequeue源码分析

commitlogdispatcherbuildconsumequeue将消息保存到consumequeue如下所示,主要是下面两步

  • 先根据消息topic和queueid从consumequeuetable找到consumequeue,如果找不到会创建一个新的consumequeue
  • 调用consumequeue#putmessagepositioninfowrapper,将消息保存到consumequeue中
// org.apache.rocketmq.store.defaultmessagestore#putmessagepositioninfo
public void putmessagepositioninfo(dispatchrequest dispatchrequest) {
    // 找到consumequeue,如果找不到会创建一个consumequeue
    consumequeue cq = this.findconsumequeue(dispatchrequest.gettopic(), dispatchrequest.getqueueid()); 
    // 消息保存到consumequeue中
    cq.putmessagepositioninfowrapper(dispatchrequest, checkmultidispatchqueue(dispatchrequest));
}

保存consumequeue存储单元消息如下,主要分为下面三个步骤

  • 将consumequeue存储单元offset(8字节) 消息长度(4字节) tags的哈希码(8字节)保存到consumequeue的缓存bytebufferindex中
  • 根据consumequeue的offset找到mappedfile
  • 将缓冲中的存储单元存储到mappedfile中
// org.apache.rocketmq.store.consumequeue#putmessagepositioninfo
private boolean putmessagepositioninfo(final long offset, final int size, final long tagscode,
        final long cqoffset) {
    this.bytebufferindex.flip();
    // consumequeue存储单元的长度
    this.bytebufferindex.limit(cq_store_unit_size);
    // 消息物理偏移量
    this.bytebufferindex.putlong(offset);
    // 消息长度
    this.bytebufferindex.putint(size);
    // 消息tags的哈希码
    this.bytebufferindex.putlong(tagscode);
    final long expectlogicoffset = cqoffset * cq_store_unit_size;
		// 获取最后一个mappedfile
    mappedfile mappedfile = this.mappedfilequeue.getlastmappedfile(expectlogicoffset);
    if (mappedfile != null) {
        // 更新物理offset
        this.maxphysicoffset = offset   size;
      	// 数据保存到consumequeue
        return mappedfile.appendmessage(this.bytebufferindex.array());
    }
    return false;
}

commitlogdispatcherbuildindex源码分析

除了commitlogdispatcherbuildconsumequeue,下面我们来分析在dispatcherlist中另一个commitlogdispatcher的实现类commitlogdispatcherbuildindex是如何将index索引单元保存到indexfile中的,存储消息索引的核心逻辑如下所示。

  • 获取或者创建最新的indexfile
  • 将msgid构建index索引单元并保存到indexfile中
  • 将message中的keys用空格分隔成key数组,并循环保存到indexfile中
public void buildindex(dispatchrequest req) {
    // 获取或者创建最新索引文件,支持重试最多3次
    indexfile indexfile = retrygetandcreateindexfile();
    if (indexfile != null) {
        // 获取结束物理索引
        long endphyoffset = indexfile.getendphyoffset();
        dispatchrequest msg = req;
        // 获取topic和keys
        string topic = msg.gettopic();
        string keys = msg.getkeys();
        // 如果当前消息的commitlogoffset小于当前indexfile的endphyoffset时,说明当前消息已经构建过index索引,因此直接返回
        if (msg.getcommitlogoffset() < endphyoffset) {
            return;
        }
        // 获取客户端生成的uniqueid(msgid),代表客户端生成的唯一一条消息
        // 消息解密时生成的
        if (req.getuniqkey() != null) {
            indexfile = putkey(indexfile, msg, buildkey(topic, req.getuniqkey()));
        }
        // 客户端传递的keys,消息是从keys属性中获取的
        if (keys != null && keys.length() > 0) {
            string[] keyset = keys.split(messageconst.key_separator/*空格*/);
            for (int i = 0; i < keyset.length; i  ) {
                string key = keyset[i];
                if (key.length() > 0) {
                    indexfile = putkey(indexfile, msg, buildkey(topic, key));
                    if (indexfile == null) {
                        return;
                    }
                }
            }
        }
    } else {
        log.error("build index error, stop building index");
    }
}

从上面源码可知,保存消息的关键就在putkey方法中主要分为下面三个步骤

  • 获取要保存到indexfile的keyhashcode(keyhash),hashslot的绝对位置(absslotpos),hash槽中的索引值(slotvalue),保存消息时间差(timediff),索引的绝对位置(absindexpos)等。
  • 更新index索引单元信息,keyhashcode(keyhash),消息在commitlog中的偏移量(phyoffset),消息存储时间与索引文件开始存储时间差(timediff),前置消息索引值(slotvalue)
  • 更新slots的indexcount
  • 更新indexheader中的indexcount,更新物理偏移量(phyoffset),最后存储时间戳(sotretimestamp)
    public boolean putkey(final string key, final long phyoffset, final long storetimestamp) {
        // 索引数量小于2000w,否则说明当前索引文件已经满了,不能添加索引
        if (this.indexheader.getindexcount() < this.indexnum) {
            // keyhashcode
            int keyhash = indexkeyhashmethod(key);
            // 索引槽位置
            int slotpos = keyhash % this.hashslotnum;
            // 绝对位置
            int absslotpos = indexheader.index_header_size   slotpos * hashslotsize;
            try {
                int slotvalue = this.mappedbytebuffer.getint(absslotpos);
                long timediff = storetimestamp - this.indexheader.getbegintimestamp();
                int absindexpos =
                    indexheader.index_header_size   this.hashslotnum * hashslotsize/*哈希槽数量*哈希槽大小=500w*4*/
                          this.indexheader.getindexcount() * indexsize;
                // 更新indexfile索引单元信息
              	// keyhash(4) 消息在commitlog中的偏移量(8) 消息存储时间-索引文件开始存储时间(4) 前置消息索引值(4)
                this.mappedbytebuffer.putint(absindexpos/*索引位置*/, keyhash);
                this.mappedbytebuffer.putlong(absindexpos   4, phyoffset);
                this.mappedbytebuffer.putint(absindexpos   4   8, (int) timediff);
                this.mappedbytebuffer.putint(absindexpos   4   8   4, slotvalue);
								// 更新slots的indexcount
                this.mappedbytebuffer.putint(absslotpos/*hash槽的绝对位置*/, this.indexheader.getindexcount());
              	//...
                // 更新indexheader信息
                this.indexheader.incindexcount();
                this.indexheader.setendphyoffset(phyoffset);
                this.indexheader.setendtimestamp(storetimestamp);
                return true;
            } catch (exception e) {
                log.error("putkey exception, key: "   key   " keyhashcode: "   key.hashcode(), e);
            }
        } 
        return false;
    }

indexfile如何解决hash冲突

假设在indexfile的索引indexn的是一个keyhash为100的索引,如下图所示,此时slots槽位100存储着indexn的序号,在indexfile索引单元保存的数据keyhash=100,preindexno=0。

如果又有一个索引单元indexn x的keyhashcode=100,保存消息时发现solt-100已经指向了索引单元indexn,会将当前索引单元indxen x的preindexno更新为indexn,使得当前索引单元indexn x的前置索引单元指向indeno,再更新slots-100槽位的值为indexn x,保存完成后的索引关系如下图所示。相当于在slots槽位下面挂了index索引单元链表,根据key查找消息时,可以根据key计算出keyhashcode,然后顺着链表查询链表中的消息。

总结

consumequeue可以看成是消息消费的索引,不同topic的consumequeue存储到不同目录中,默认存储在~/store/consumequeue/${topic}目录中,其底层也是使用mappedfile,broker会按照消息在commitlog中的顺序,异步转发到consumequeue中,每条消息在consumequeue生成固定大小20字节的存储单元指向commitlog。

indexfile保存着producer发送消息keys中的索引,有了indexfile就可以根据消息key快速找到消息。indexfile的数据接口与hashmap类似,它使用链表的方式解决解决哈希冲突,并且使用头插法将数据插入链表中。

以上就是rocketmq consumequeue与indexfile实时更新机制源码解析的详细内容,更多关于rocketmq 实时更新机制的资料请关注其它相关文章!

免费资源网 - https://freexyz.cn/
返回顶部
顶部
网站地图