目录
前言
前面文章我们介绍了broker是如何将消息全量存储到commitlog文件中,并异步生成dispatchrequest任务更新consumequeue,indexfile的过程以及consumequeue和indexfile的文件结构。由于是异步转发消息,就可能出现消息成功存储到commitlog文件,转发请求任务执行失败,broker宕机了,此时commitlog和index消息并未处理完,导致commitlog与consumequeue和indexfile文件中的数据不一致。如果由一部分消息在commitlog中存在,在consumequeue中不存在,那么这部分消息consumer将永远无法消费到了,那么broker是如何保证数据一致性的呢?
storecheckpoint介绍
storecheckpoint的作用是记录commitlog,consumequeue和indexfile的刷盘点,当broker异常结束时会根据storecheckpoint的数据恢复,storecheckpoint属性如下
public class storecheckpoint { // commitlog最后一条信息的刷盘时间戳 private volatile long physicmsgtimestamp = 0; // consumequeue最后一个存储单元刷盘时间戳 private volatile long logicsmsgtimestamp = 0; // 最近一个已经写完indexfile的最后一条记录刷盘时间戳 private volatile long indexmsgtimestamp = 0; }
storecheckpoint文件的存储位置是${user.home}/store/checkpoint
,文件的固定长度为4k,但storecheckpoint只占用了前24个字节,存储格式如下图所示
storecheckpoint时间戳更新时机
physicmsgtimestamp
flushrealtimeservice刷盘时更新
// org.apache.rocketmq.store.commitlog.flushrealtimeservice#run public void run() { // ... // 更新commitlog刷盘时间戳 if (storetimestamp > 0) { commitlog.this.defaultmessagestore.getstorecheckpoint().setphysicmsgtimestamp(storetimestamp); } }
groupcommitservice刷盘时更新
// org.apache.rocketmq.store.commitlog.groupcommitservice#docommit private void docommit() { // ... // 更新commitlog刷盘时间戳 if (storetimestamp > 0) { commitlog.this.defaultmessagestore.getstorecheckpoint().setphysicmsgtimestamp(storetimestamp); } }
logicsmsgtimestamp
consumequeue保存消息存储单元时更新
// org.apache.rocketmq.store.consumequeue#putmessagepositioninfowrapper public void putmessagepositioninfowrapper(dispatchrequest request, boolean multiqueue) { // ... // 如果consumequeue保存成功,则更新consumequeue存储点信息 if (result) { this.defaultmessagestore.getstorecheckpoint().setlogicsmsgtimestamp(request.getstoretimestamp()); } }
consumequeue刷盘时更新并触发storecheckpoint刷盘
// org.apache.rocketmq.store.defaultmessagestore.flushconsumequeueservice#doflush private void doflush(int retrytimes) { // ... // 更新consumequeue存储时间戳,并刷盘 if (0 == flushconsumequeueleastpages) { if (logicsmsgtimestamp > 0) { defaultmessagestore.this.getstorecheckpoint().setlogicsmsgtimestamp(logicsmsgtimestamp); } // 更新存储点 defaultmessagestore.this.getstorecheckpoint().flush(); } }
indexmsgtimestamp
// org.apache.rocketmq.store.index.indexservice#getandcreatelastindexfile public indexfile getandcreatelastindexfile() { // 获取最新indexfile,如果indexfile已经满了,需要创建一个新的indexfile if (indexfile == null) { indexfile = new indexfile(filename, this.hashslotnum, this.indexnum, lastupdateendphyoffset, lastupdateindextimestamp); // 如果创建新的indexfile成功,原indexfile刷盘 if (indexfile != null) { final indexfile flushthisfile = previndexfile; thread flushthread = new thread(new runnable() { @override public void run() { // indexfile刷盘 indexservice.this.flush(flushthisfile); } }, "flushindexfilethread"); flushthread.setdaemon(true); flushthread.start(); } } return indexfile; } // org.apache.rocketmq.store.index.indexservice#flush public void flush(final indexfile f) { if (null == f) return; long indexmsgtimestamp = 0; if (f.iswritefull()) { indexmsgtimestamp = f.getendtimestamp(); } f.flush(); if (indexmsgtimestamp > 0) { // 更新checkpoint的indexmsgtimestamp并触发刷盘 this.defaultmessagestore.getstorecheckpoint().setindexmsgtimestamp(indexmsgtimestamp); this.defaultmessagestore.getstorecheckpoint().flush(); } }
- 保存消息index,获取最新的indexfile如果满了,则会创建一个新的indexfile,并且更新indexmsgtimestamp并触发storecheckpoint刷盘
storecheckpoint刷盘源码
storecheckpoint刷盘源码如下所示,就是将commitlog,consumequeue和indexfile刷盘时间戳持久化到硬盘上,由上面源码可知它的刷盘触发时机
- consumequeue刷盘时触发
- 创建新indexfile文件时触发
storecheckpoint刷盘源码如下
// org.apache.rocketmq.store.storecheckpoint#flush public void flush() { this.mappedbytebuffer.putlong(0, this.physicmsgtimestamp); this.mappedbytebuffer.putlong(8, this.logicsmsgtimestamp); this.mappedbytebuffer.putlong(16, this.indexmsgtimestamp); this.mappedbytebuffer.force(); }
消息加载源码分析
在brokercontroller启动时会调用defaultmessagestore#load
加载存储文件加载和恢复过程主要分为下面几步
- 判断broker上次是否正常退出。这个判断逻辑是根据
${user.home}/store/abort
是否存在。如果文件存在,说明上次是异常退出,如果文件不存在,则说明是正常退出。 - 加载commitlog
- 加载consumequeue
- 加载storecheckpoint
- 加载indexfile
- 恢复consumequeue与indexfile
- 加载延迟队列服务
// org.apache.rocketmq.store.defaultmessagestore#load public boolean load() { boolean result = true; try { // 1. broker上次是否正常退出 boolean lastexitok = !this.istempfileexist(); log.info("last shutdown {}", lastexitok ? "normally" : "abnormally"); // 2. 加载commitlog result = result && this.commitlog.load(); // 3. 加载consumequeue result = result && this.loadconsumequeue(); if (result) { // 4. 加载storecheckpoint this.storecheckpoint = new storecheckpoint(storepathconfighelper.getstorecheckpoint(this.messagestoreconfig.getstorepathrootdir())); // 5. 加载indexfile this.indexservice.load(lastexitok); // 6. 恢复consumequeue与indexfile this.recover(lastexitok); // 7. 延迟队列服务加载 if (null != schedulemessageservice) { result = this.schedulemessageservice.load(); } } } return result; }
commitlog加载
前面文章介绍过,commitlog文件的存储目录是${user.home}/store/commitlog/
,并且commitlog文件的底层是mappedfile,由mappedfilequeue管理。
commitlog文件的加载其实调用的是mappedfilequeue#load
方法,代码如下所示,load()中首先加载commitlog文件目录下的所有文件,并调用doload()方法加载commitlog。
// org.apache.rocketmq.store.mappedfilequeue#load public boolean load() { file dir = new file(this.storepath/*${user.home}/store/commitlog/*/); file[] ls = dir.listfiles(); if (ls != null) { return doload(arrays.aslist(ls)); } return true; }
mappedfile的加载过程如下所示,核心逻辑主要分为下面三步
- 按照文件名称将文件排序,排序好的文件就会按照消息保存的先后顺序存放在列表中
- 校验文件大小与mappedfile是否一致,如果commitlog文件大小与mappedfilesize不一致,则说明配置被改了,或者commitlog文件被修改
- 创建mappedfile,并且设置wroteposition,flushedposition,committedposition为mappedfilesize
public boolean doload(listfiles) { // 按照文件名称排序 files.sort(comparator.comparing(file::getname)); for (file file : files) { // 如果commitlog文件大小与mappedfilesize不一致,则说明配置被改了,或者commitlog文件被修改 if (file.length() != this.mappedfilesize) { return false; } try { // 创建mappedfile mappedfile mappedfile = new mappedfile(file.getpath(), mappedfilesize); mappedfile.setwroteposition(this.mappedfilesize); mappedfile.setflushedposition(this.mappedfilesize); mappedfile.setcommittedposition(this.mappedfilesize); this.mappedfiles.add(mappedfile); } } return true; }
看到这里肯定会有疑问,加载后的mappedfile的wroteposition,flushedposition和committedposition的值都为mappedfilesize,如果最后一个mappedfile没有使用完,broker启动后还会从最后一个mappedfile开始写么?我们可以在后面消息文件恢复源码分析找到答案。
consumequeue加载
从前面文章我们知道,consumequeue文件底层其实也是mappedfile,因此consumequeue文件的加载与commitlog加载差别不大。consumequeue加载逻辑为
- 获取consumequeue目录下存储的所有topic目录,遍历topic目录
- 遍历每个topic目录下的所有queueid目录,逐个加载ququeid中的所有mappedfile
// org.apache.rocketmq.store.defaultmessagestore#loadconsumequeue private boolean loadconsumequeue() { // 获取consumequeue目录 file dirlogic = new file(storepathconfighelper.getstorepathconsumequeue(this.messagestoreconfig.getstorepathrootdir()/*${user.home}/store */)); // topic文件夹数组 file[] filetopiclist = dirlogic.listfiles(); if (filetopiclist != null) { // 遍历topic for (file filetopic : filetopiclist) { // 获取topic名称 string topic = filetopic.getname(); // 获取queueid文件夹数组 file[] filequeueidlist = filetopic.listfiles(); // 遍历queueid if (filequeueidlist != null) { for (file filequeueid : filequeueidlist) { int queueid; // 文件夹名称就是queueid queueid = integer.parseint(filequeueid.getname()); // 构建consumequeue consumequeue logic = new consumequeue(/* ... */); this.putconsumequeue(topic, queueid, logic); // consumequeue加载 if (!logic.load()) { return false; } } } } } return true; }
indexfile加载
indexfile文件加载过程调用的是indexservice#load
,首先获取${user.home}/store/index
目录下的所有文件,遍历所有文件,如果indexfile最后存储时间大于storecheckpoint中indexmsgtimestamp,则会先删除indexfile
// org.apache.rocketmq.store.index.indexservice#load public boolean load(final boolean lastexitok) { // indexfile文件目录 file dir = new file(this.storepath); // indexfile文件列表 file[] files = dir.listfiles(); if (files != null) { // 文件排序 arrays.sort(files); for (file file : files) { try { indexfile f = new indexfile(file.getpath(), this.hashslotnum, this.indexnum, 0, 0); f.load(); if (!lastexitok) { // 文件最后存储时间戳大于刷盘点,则摧毁indexfile,重建 if (f.getendtimestamp() > this.defaultmessagestore.getstorecheckpoint()/*存储点时间*/ .getindexmsgtimestamp()) { f.destroy(0); continue; } } this.indexfilelist.add(f); } } } return true; }
consumequeue与indexfile恢复
如果是正常退出,数据都已经正常刷盘,前面我们说到commitlog在加载时的wroteposition,flushedposition,committedposition都设置为mappedfilesize,
因此即使是正常退出,也会调用commitlog#recovernormally
找到最后一条消息的位置,更新这三个属性。
// org.apache.rocketmq.store.defaultmessagestore#recover private void recover(final boolean lastexitok) { // consumequeue中最大物理偏移量 long maxphyoffsetofconsumequeue = this.recoverconsumequeue(); if (lastexitok) { // 正常退出文件恢复 this.commitlog.recovernormally(maxphyoffsetofconsumequeue); } else { // 异常退出文件恢复 this.commitlog.recoverabnormally(maxphyoffsetofconsumequeue); } // 恢复topicqueuetable this.recovertopicqueuetable(); }
正常恢复的源码如下,由于broker是正常关闭,因此commitlog,consumequeue与indexfile都已经正确刷盘,并且三者的消息是一致的。正常恢复的主要目的是找到找到最后一条消息的偏移量,然后更新commitlog的mappedfilequeue中的刷盘点(flushwhere)和提交点(committedwhere),
- 从最后3个mappedfile开始恢复,如果mappedfile总数不足3个,则从第0个mappedfile开始恢复
- 逐个遍历mappedfile,找到每个mappedfile的最后一条消息的偏移量,并将其更新到commitlog中mappedfilequeue的刷盘点和提交点中
- 清除consumequeue冗余数据
public void recovernormally(long maxphyoffsetofconsumequeue) { // 确认消息是否完整,默认是true boolean checkcrconrecover = this.defaultmessagestore.getmessagestoreconfig().ischeckcrconrecover(); final listmappedfiles = this.mappedfilequeue.getmappedfiles(); if (!mappedfiles.isempty()) { // 默认从最后3个mappedfile开始恢复 int index = mappedfiles.size() - 3; // 如果commitlog不足三个,则从第一个文件开始恢复 if (index < 0) index = 0; mappedfile mappedfile = mappedfiles.get(index); bytebuffer bytebuffer = mappedfile.slicebytebuffer(); // 最后一个mappedfile的文件起始偏移量 long processoffset = mappedfile.getfilefromoffset(); // mappedfileoffset偏移量 long mappedfileoffset = 0; // 遍历commitlog文件 while (true) { // 校验消息完整性 dispatchrequest dispatchrequest = this.checkmessageandreturnsize(bytebuffer, checkcrconrecover); // 获取消息size int size = dispatchrequest.getmsgsize(); // 返回结果为true并且消息size>0,说明消息是完整的 if (dispatchrequest.issuccess() && size > 0) { mappedfileoffset = size; } } // 最大物理偏移量 processoffset = mappedfileoffset; // 更新flushedwhere和committedposition指针 this.mappedfilequeue.setflushedwhere(processoffset); this.mappedfilequeue.setcommittedwhere(processoffset); this.mappedfilequeue.truncatedirtyfiles(processoffset); // 清除consumequeue冗余数据 if (maxphyoffsetofconsumequeue >= processoffset) { this.defaultmessagestore.truncatedirtylogicfiles(processoffset/*commitlog最大物理偏移量*/); } } }
异常恢复源码如下,由于上次broker没有正常关闭,因此由可能存在commitlog、consumequeue与indexfile不一致的情况,因此在异常恢复时可能需要恢复consumequeue和indexfile,异常恢复核心逻辑主要包括
- 倒序查commitlog的mappedfile文件,找到第一条消息存储的时间戳比storecheckpoint里的physicmsgtimestamp,logicsmsgtimestamp和indexmsgtimestamp三者都小的最大mappedfile,该mappedfile至少有一部分消息是被正常转发,正常存储,正常刷盘的
- 从该mappedfile开始逐条转发消息,重新恢复consumequeue和indexfile
- 当遍历到最后一条消息,将其偏移量更新到commitlog中mappedfilequeue的刷盘点和提交点中
- 清除consumequeue冗余数据
// org.apache.rocketmq.store.commitlog#recoverabnormally public void recoverabnormally(long maxphyoffsetofconsumequeue) { // 是否crc校验 boolean checkcrconrecover = this.defaultmessagestore.getmessagestoreconfig().ischeckcrconrecover(); final listmappedfiles = this.mappedfilequeue.getmappedfiles(); if (!mappedfiles.isempty()) { // 最后一个mappedfile的index int index = mappedfiles.size() - 1; mappedfile mappedfile = null; // 倒序遍历mappedfile数组, for (; index >= 0; index--) { mappedfile = mappedfiles.get(index); // 1. 如果第一条消息的时间戳小于存储点时间戳 if (this.ismappedfilematchedrecover(mappedfile)) { break; } } long processoffset = mappedfile.getfilefromoffset(); long mappedfileoffset = 0; while (true) { dispatchrequest dispatchrequest = this.checkmessageandreturnsize(bytebuffer, checkcrconrecover); int size = dispatchrequest.getmsgsize(); if (dispatchrequest.issuccess()) { if (size > 0) { mappedfileoffset = size; // 2. 转发消息 if (this.defaultmessagestore.getmessagestoreconfig().isduplicationenable()/*消息是否可以重复,默认是false*/) { if (dispatchrequest.getcommitlogoffset() < this.defaultmessagestore.getconfirmoffset()) { this.defaultmessagestore.dodispatch(dispatchrequest); } } else { this.defaultmessagestore.dodispatch(dispatchrequest); } } } // 3. 更新mappedfilequeue中的刷盘位置和提交位置 processoffset = mappedfileoffset; this.mappedfilequeue.setflushedwhere(processoffset); this.mappedfilequeue.setcommittedwhere(processoffset); this.mappedfilequeue.truncatedirtyfiles(processoffset); // 清除consumequeue中的冗余数据 if (maxphyoffsetofconsumequeue >= processoffset) { this.defaultmessagestore.truncatedirtylogicfiles(processoffset); } } }
总结
broker启动时会分别加载commitlog、consumequeue与indexfile。加载完成后,如果broker上次是正常退出,只需要找到commitlog的最后一条消息,并更新刷盘点和提交点。如果broker上次是异常退出,就有可能出现consumequeue、indexfile与commitlog不一致的情况,需要根据storecheckpoint存储的时间戳从commitlog找到消息,逐条恢复consumequeue与indexfile。
以上就是rocketmq | 源码分析】消息存储文件的加载与恢复机制的详细内容,更多关于rocketmq 消息存储文件加载恢复的资料请关注其它相关文章!