目录
前言
前面我们介绍了消息是如何存储的,消息是如何刷盘的,讲的都是commitlog是如何存储和刷盘的。虽然commitlog顺序存储着所有消息,但是commitlog中的消息并没有区分topic、keys等,如果需要消费某个topic的消息或者查找某一条消息只能遍历commitlog文件去查找,性能相当低下,因此有了consumelog和indexfile两个文件类型,这两个文件的作用主要是提升消息消费和查询的性能。
consumequeue详解
为了提高消费消息查询性能,broker会为每个topic在~/store/consumequeue
中创建一个topic名称的目录,并再为该topic创建目录名为queueid的目录,每个目录存放着若干consumequeue文件,consumequeue属于commitlog的索引文件,可以根据consumequeue定位到具体的消息,consumequeue存储文件见下图
consumequeue文件名由20位数字构成,表示当前文件的第一个索引条目的起始偏移量。与commitlog文件名不同的是,consumequeue后续文件名是固定的,由于consumequeue文件大小是固定不变的。
consumequeue文件大小由mappedfilesizeconsumequeue
配置控制,它的默认大小是30w * consumequeue.cq_store_unit_size(20)
,也就是600w字节大小,consumequeue.cq_store_unit_size
是consumequeue每个索引条目的大小,每隔索引条目包含了三个消息的重要属性:消息在mappedfile文件中的物理偏移量(8字节)、消息的长度(4字节)、消息tag的hashcode值,这三个属性占了20个字节,单个索引条目结构如下图所示
indexfile详解
rocketmq除了提供消息的topic给消息消费外,rocketmq还提供了根据key来查找消息的功能,producer创建消息时可以传入keys值,用于快速查找消息。
// 构建message参数 message msg = new message("topictest", // 消息topic "taga", // 消息tag "key1 key2 key3", // 消息keys,多个key用" "隔开 "hello linshifu!".getbytes(remotinghelper.default_charset)); // 消息体
indexfile可以看做是一个key的哈希索引文件,通过计算key的hash值,快速找到某个key对应的消息在commitlog中的位置。indexfile由下面三个部分构成:
- indexheader
- slots槽位
- indexes索引数据
indexfile结构如下图所示
每个indexfile的长度是固定的,其中indexheader占用40字节,slots占用500w * 4字节,index索引数据占用2000w * 20字节
indexheader
indexheader占用indexfile的前40个字节,它主要存储着indexfile索引文件的相关信息,indexheader包含如下属性
// org.apache.rocketmq.store.index.indexheader public class indexheader { // 索引文件第一条消息在commitlog中的存储时间 private final atomiclong begintimestamp = new atomiclong(0); // 索引文件最后一条消息在commitlog中的存储时间 private final atomiclong endtimestamp = new atomiclong(0); // 索引文件第一条消息的偏移量 private final atomiclong beginphyoffset = new atomiclong(0); // 索引文件最后一条消息的偏移量 private final atomiclong endphyoffset = new atomiclong(0); // 已经填充slot的hash槽数量 private final atomicinteger hashslotcount = new atomicinteger(0); // 该indexfile种包含的索引单元数量 private final atomicinteger indexcount = new atomicinteger(1); }
数据结构如下图所示
slots槽位
在indexfile中间部分存储的是indexflie中key的hash槽,每个hash槽存储的是index索引单元的indexno,添加索引时会将key的hash值p0w的结果计算哈希槽序号,然后将index索引单元的indexno放入slot槽中,indexno是int类型,slots槽位总共有500w个,因此slots槽位占用的大小是500w * 4=2000w
indexes索引数据
index索引由2000w个索引单元构成,每个索引单元大小为20字节,每隔索引单元由下面四个部分构成
- keyhash
keyhash是消息索引key的hash值
- phyoffet
phyoffset是当前key对应消息在commitlog中的偏移量commitlog offset
- timediff
timediff是当前key对应消息存储时间与当前indexfile第一个索引存储时间差
- preindex
当前slot的index索引单元的前一个索引单元的indexno
索引单元数据结构如下
实时更新consumequeue与indexfile源码分析
之前的文章我们只了解了broker的commitlog文件保存和刷盘的流程,现在我们来了解broker实时更新consumequeue和indexfile的流程。
消息保存的过程仅仅会保存commitlog,consumequeue文件及indexfile中的数据是通过reputmessageservice将commitlog中的消息转发到consumequeue及indexfile。
reputmessageservice和之前的刷盘服务类似,都是异步线程执行的服务。reputmessageservice是defaultmessagestore的一个内部类,它跟随者消息存储对象defaultmessagestore创建时共同创建。reputmessageservice刷新consumequeue与indexfile的逻辑可以从它的run()方法开始分析。
// org.apache.rocketmq.store.defaultmessagestore.reputmessageservice#run @override public void run() { // 死循环 while (!this.isstopped()) { try { // 睡眠1ms thread.sleep(1); // 更新consumequeue和indexfile this.doreput(); } catch (exception e) { defaultmessagestore.log.warn(this.getservicename() " service has exception. ", e); } } }
从上面代码可以看出,更新consumequeue与indexfile在死循环中执行,每隔1ms执行一次doreput()来更新更新consumequeue和indexfile,在doreput()中的主要逻辑如下
- 如果重放消息偏移量reputfromoffset小于commitlog的最大offset,则会循环重放消息,更新consumequeue及indexfile
- 从commitlog的重放偏移量开始获取映射缓冲结果selectmappedbufferresult,selectmappedbufferresult包含如下属性
// org.apache.rocketmq.store.selectmappedbufferresult public class selectmappedbufferresult { // mappedfile文件起始偏移量 position private final long startoffset; // reputfromoffset开始的缓冲 private final bytebuffer bytebuffer; // 消息size private int size; // commitlog的mappedfile private mappedfile mappedfile; }
- 根据selectmappedbufferresult校验消息,并创建转发请求dispatchrequest,dispatchrequest中包含更新consumequeue和indexfile中需要用到的属性,如topic,消息偏移量,消息key,消息存储时间戳,消息长度,消息taghashcode等。
- 如果当前消息size>0,则说明当前消息需要被转发更新consumequeue和indexfile,会调用关键方法
defaultmessagestore.this.dodispatch
转发更新 - 如果当前消息size=0,则说明已经读到了commitlog当前mappedfile的结尾,因此需要读取下一个mappedfile,并进行转发。
// org.apache.rocketmq.store.defaultmessagestore.reputmessageservice#doreput private void doreput() { // 1.reputfromoffset ≤ commitlog最大offset,则循环重放 for (boolean donext = true; this.iscommitlogavailable()/*reputfromoffset≤commitlog最大offset*/&&donext; ) { // 2.根据reputfromoffset的物理偏移量找到mappedfilequeue中对应的commitlog文件的mappedfile // 然后从该mappedfile中截取一段自reputfromoffset偏移量开始的bytebuffer,这段内存存储着将要重放的消息 selectmappedbufferresult result = defaultmessagestore.this.commitlog.getdata(reputfromoffset); if (result != null) { try { // 遍历消息,开始reput for (int readsize = 0; readsize < result.getsize() && donext; ) { // 3. 检查消息属性,并构建一个消息的dispatchrequest dispatchrequest dispatchrequest = defaultmessagestore.this.commitlog.checkmessageandreturnsize(result.getbytebuffer(), false, false); if (dispatchrequest.issuccess()) { if (size > 0) { // 4.消息分发,写consumequeue和index defaultmessagestore.this.dodispatch(dispatchrequest); // 设置reputoffset加上当前消息大小 this.reputfromoffset = size; // 设置读取的大小加上当前消息大小 readsize = size; //如果size=0,说明读取到了mappedfile的文件结尾 } else if (size == 0) { // 5. 获取下个文件的起始offset this.reputfromoffset = defaultmessagestore.this.commitlog.rollnextfile(this.reputfromoffset); // 设置readsize=0,结束循环 readsize = result.getsize(); } } else if (!dispatchrequest.issuccess()) { // ... } } finally { result.release(); } } else { donext = false; } } }
由上面代码可知,转发更新consumequeue和indexfile的关键代码在defaultmessagestore.this.dodispatch(dispatchrequest)
中,在dodispatch()方法中循环遍历dispatcherlist中的commitlogdispatcher。
public void dodispatch(dispatchrequest req) { for (commitlogdispatcher dispatcher : this.dispatcherlist) { dispatcher.dispatch(req); } }
debug代码可以中包含处理转发请求的dispatcher类,通过类名就可以很容易判断出commitlogdispatcherbuildconsumequeue是将commitlog转发到consumequeue中,commitlogdispatcherbuildindex是将消息构建indexfile,下面我们来分别分析两者是如何处理commitlog消息转发的。
commitlogdispatcherbuildconsumequeue源码分析
commitlogdispatcherbuildconsumequeue将消息保存到consumequeue如下所示,主要是下面两步
- 先根据消息topic和queueid从consumequeuetable找到consumequeue,如果找不到会创建一个新的consumequeue
- 调用consumequeue#putmessagepositioninfowrapper,将消息保存到consumequeue中
// org.apache.rocketmq.store.defaultmessagestore#putmessagepositioninfo public void putmessagepositioninfo(dispatchrequest dispatchrequest) { // 找到consumequeue,如果找不到会创建一个consumequeue consumequeue cq = this.findconsumequeue(dispatchrequest.gettopic(), dispatchrequest.getqueueid()); // 消息保存到consumequeue中 cq.putmessagepositioninfowrapper(dispatchrequest, checkmultidispatchqueue(dispatchrequest)); }
保存consumequeue存储单元消息如下,主要分为下面三个步骤
- 将consumequeue存储单元
offset(8字节) 消息长度(4字节) tags的哈希码(8字节)
保存到consumequeue的缓存bytebufferindex中 - 根据consumequeue的offset找到mappedfile
- 将缓冲中的存储单元存储到mappedfile中
// org.apache.rocketmq.store.consumequeue#putmessagepositioninfo private boolean putmessagepositioninfo(final long offset, final int size, final long tagscode, final long cqoffset) { this.bytebufferindex.flip(); // consumequeue存储单元的长度 this.bytebufferindex.limit(cq_store_unit_size); // 消息物理偏移量 this.bytebufferindex.putlong(offset); // 消息长度 this.bytebufferindex.putint(size); // 消息tags的哈希码 this.bytebufferindex.putlong(tagscode); final long expectlogicoffset = cqoffset * cq_store_unit_size; // 获取最后一个mappedfile mappedfile mappedfile = this.mappedfilequeue.getlastmappedfile(expectlogicoffset); if (mappedfile != null) { // 更新物理offset this.maxphysicoffset = offset size; // 数据保存到consumequeue return mappedfile.appendmessage(this.bytebufferindex.array()); } return false; }
commitlogdispatcherbuildindex源码分析
除了commitlogdispatcherbuildconsumequeue,下面我们来分析在dispatcherlist中另一个commitlogdispatcher的实现类commitlogdispatcherbuildindex是如何将index索引单元保存到indexfile中的,存储消息索引的核心逻辑如下所示。
- 获取或者创建最新的indexfile
- 将msgid构建index索引单元并保存到indexfile中
- 将message中的keys用空格分隔成key数组,并循环保存到indexfile中
public void buildindex(dispatchrequest req) { // 获取或者创建最新索引文件,支持重试最多3次 indexfile indexfile = retrygetandcreateindexfile(); if (indexfile != null) { // 获取结束物理索引 long endphyoffset = indexfile.getendphyoffset(); dispatchrequest msg = req; // 获取topic和keys string topic = msg.gettopic(); string keys = msg.getkeys(); // 如果当前消息的commitlogoffset小于当前indexfile的endphyoffset时,说明当前消息已经构建过index索引,因此直接返回 if (msg.getcommitlogoffset() < endphyoffset) { return; } // 获取客户端生成的uniqueid(msgid),代表客户端生成的唯一一条消息 // 消息解密时生成的 if (req.getuniqkey() != null) { indexfile = putkey(indexfile, msg, buildkey(topic, req.getuniqkey())); } // 客户端传递的keys,消息是从keys属性中获取的 if (keys != null && keys.length() > 0) { string[] keyset = keys.split(messageconst.key_separator/*空格*/); for (int i = 0; i < keyset.length; i ) { string key = keyset[i]; if (key.length() > 0) { indexfile = putkey(indexfile, msg, buildkey(topic, key)); if (indexfile == null) { return; } } } } } else { log.error("build index error, stop building index"); } }
从上面源码可知,保存消息的关键就在putkey方法中主要分为下面三个步骤
- 获取要保存到indexfile的keyhashcode(keyhash),hashslot的绝对位置(absslotpos),hash槽中的索引值(slotvalue),保存消息时间差(timediff),索引的绝对位置(absindexpos)等。
- 更新index索引单元信息,keyhashcode(keyhash),消息在commitlog中的偏移量(phyoffset),消息存储时间与索引文件开始存储时间差(timediff),前置消息索引值(slotvalue)
- 更新slots的indexcount
- 更新indexheader中的indexcount,更新物理偏移量(phyoffset),最后存储时间戳(sotretimestamp)
public boolean putkey(final string key, final long phyoffset, final long storetimestamp) { // 索引数量小于2000w,否则说明当前索引文件已经满了,不能添加索引 if (this.indexheader.getindexcount() < this.indexnum) { // keyhashcode int keyhash = indexkeyhashmethod(key); // 索引槽位置 int slotpos = keyhash % this.hashslotnum; // 绝对位置 int absslotpos = indexheader.index_header_size slotpos * hashslotsize; try { int slotvalue = this.mappedbytebuffer.getint(absslotpos); long timediff = storetimestamp - this.indexheader.getbegintimestamp(); int absindexpos = indexheader.index_header_size this.hashslotnum * hashslotsize/*哈希槽数量*哈希槽大小=500w*4*/ this.indexheader.getindexcount() * indexsize; // 更新indexfile索引单元信息 // keyhash(4) 消息在commitlog中的偏移量(8) 消息存储时间-索引文件开始存储时间(4) 前置消息索引值(4) this.mappedbytebuffer.putint(absindexpos/*索引位置*/, keyhash); this.mappedbytebuffer.putlong(absindexpos 4, phyoffset); this.mappedbytebuffer.putint(absindexpos 4 8, (int) timediff); this.mappedbytebuffer.putint(absindexpos 4 8 4, slotvalue); // 更新slots的indexcount this.mappedbytebuffer.putint(absslotpos/*hash槽的绝对位置*/, this.indexheader.getindexcount()); //... // 更新indexheader信息 this.indexheader.incindexcount(); this.indexheader.setendphyoffset(phyoffset); this.indexheader.setendtimestamp(storetimestamp); return true; } catch (exception e) { log.error("putkey exception, key: " key " keyhashcode: " key.hashcode(), e); } } return false; }
indexfile如何解决hash冲突
假设在indexfile的索引indexn的是一个keyhash为100的索引,如下图所示,此时slots槽位100存储着indexn的序号,在indexfile索引单元保存的数据keyhash=100,preindexno=0。
如果又有一个索引单元indexn x的keyhashcode=100,保存消息时发现solt-100已经指向了索引单元indexn,会将当前索引单元indxen x的preindexno更新为indexn,使得当前索引单元indexn x的前置索引单元指向indeno,再更新slots-100槽位的值为indexn x,保存完成后的索引关系如下图所示。相当于在slots槽位下面挂了index索引单元链表,根据key查找消息时,可以根据key计算出keyhashcode,然后顺着链表查询链表中的消息。
总结
consumequeue可以看成是消息消费的索引,不同topic的consumequeue存储到不同目录中,默认存储在~/store/consumequeue/${topic}
目录中,其底层也是使用mappedfile,broker会按照消息在commitlog中的顺序,异步转发到consumequeue中,每条消息在consumequeue生成固定大小20字节的存储单元指向commitlog。
indexfile保存着producer发送消息keys中的索引,有了indexfile就可以根据消息key快速找到消息。indexfile的数据接口与hashmap类似,它使用链表的方式解决解决哈希冲突,并且使用头插法将数据插入链表中。
以上就是rocketmq consumequeue与indexfile实时更新机制源码解析的详细内容,更多关于rocketmq 实时更新机制的资料请关注其它相关文章!