目录
前言
前面文章讲了消息是如何保存的以及consumequeue与index文件更新机制。随着消息的增加,broker不可能一直保存所有消息,broker是按照什么规则清理消息的呢?被消费过后的消息就会被清理掉吗?下面我们来介绍broker消息清理机制。
broker消息清理机制简介
消息是被顺序存储在commitlog文件中的,且消息长度不定长,因此消息的清理不是以消息为单位进行的,而是以commitlog为单位进行的。默认情况下,broker会清理单个commitlog文件中最后一条消息超过72小时的commitlog文件,除了用户手动清理为,下面几种情况会被默认清理。
commitlog清理机制
commitlog文件过期(72小时),且达到清理时间点(默认为04:00~05:00),自动清理过期的commitlog文件
commitlog文件过期(72小时),且commitlog所在磁盘分区占用率已经达到过期清理警戒线(默认75%),无论是否到达清理时间点都会自动清理过期文件
commitlog所在磁盘分区占用率已经达到清理警戒线(默认85%),无论是否过期,都会从最早的文件开始清理,一次最多清理10个文件
commitlog所在磁盘分区占用率已经达到系统危险警戒线(默认90%),broker将拒绝消息写入
broker至少会保留最新的commitlog文件
consumequeue清理机制
- 如果consumequeue文件关联commitlog都被清理,则清理此consumequeue文件
- broker每个topic-queueid至少会保留最新的文件
indexfile清理机制
- 如果indexfile所有索引单元关联commitlog都被清理,则清理此indexfile
broker与消息清理相关配置
# 文件自动清理时间,单位h,默认72 filereservedtime=72 # commitlog物理文件删除间隔,但是ms,默认100 deletecommitlogfilesinterval = 100 # 文件自动清理时间,默认04,即凌晨4点 deletewhen = "04" # 硬盘占用率所在分区过期清理警戒线,超过这个值,无论是否到达清理时间,都会自动清理过期文件 diskmaxusedspaceratio = 75
消息清理机制源码分析
消息定时清理的是由defaultmessagestore类负责的,它在启动时(start)会调用defaultmessagestore#addscheduletask
添加和消息存储相关的定时任务,其中就包括消息删除相关的定时任务defaultmessagestore.this.cleanfilesperiodically()
,这个定时任务在broker启动后60s开始,每隔10秒执行一次。
// org.apache.rocketmq.store.defaultmessagestore#addscheduletask private void addscheduletask() { this.scheduledexecutorservice.scheduleatfixedrate(new runnable() { @override public void run() { // commitlog、consumequeue和indexfile定时删除 defaultmessagestore.this.cleanfilesperiodically(); } }, 1000 * 60, this.messagestoreconfig.getcleanresourceinterval()/*10s*/, timeunit.milliseconds); // ... }
在cleanfilesperiodically()中有两个方法,cleancommitlogservice.run()
负责清理commitlog,cleanconsumequeueservice.run()
负责清理consumequeue和indexfile。
// org.apache.rocketmq.store.defaultmessagestore#cleanfilesperiodically private void cleanfilesperiodically() { // 清理commitlog this.cleancommitlogservice.run(); // 清理consumequeue和indexfile this.cleanconsumequeueservice.run(); }
commitlog清理源码分析
commitlog清理方法cleancommitlogservice#run
调用了cleancommitlogservice#deleteexpiredfiles
,deleteexpiredfiles方法的核心代码逻辑如下,以下三种情况会触发commitlog文件的删除
- 当前时间是凌晨4点
- commitlog所在磁盘分区硬盘占用率超过75%
- 手动删除commitlog
// org.apache.rocketmq.store.defaultmessagestore.cleancommitlogservice#deleteexpiredfiles private void deleteexpiredfiles() { // 是否是凌晨4点,用小时匹配[04:00,05:00) boolean timeup = this.istimetodelete(); // >75%就会返回true,如果大于85%,则触发强制删除 boolean spacefull = this.isspacetodelete(); // 手动删除次数是否>0 boolean manualdelete = this.manualdeletefileseveraltimes > 0; if (timeup/*凌晨4点*/ || spacefull/*空间满了*/ || manualdelete/*手动删除*/) { boolean cleanatonce = defaultmessagestore.this.getmessagestoreconfig().iscleanfileforciblyenable()/*默认true*/ && this.cleanimmediately/*空间占用超过85%,触发强制删除*/; // 删除commitlog deletecount = defaultmessagestore.this.commitlog.deleteexpiredfile(filereservedtime, deletephysicfilesinterval, destroymapedfileintervalforcibly, cleanatonce); if (deletecount > 0) { } else if (spacefull) { log.warn("disk space will be full soon, but delete file failed."); } } }
commitlog的清理逻辑在mappedfilequeue#deleteexpiredfilebytime
,其核心代码如下所示,主要分为下面几个步骤
- 复制mappedfilequeue中的mappedfiles,循环处理删除逻辑,循环
mfslength-1
次,也就是无论如何都会保留最新的mappedfile - 如果mappedfile的最后修改时间超过72小时或commitlog所在磁盘分区硬盘占用率超过85%触发强制删除mappedfile,则会删除mappedfile,每次删除最多删除10个mappedfile,相邻mappedfile删除时间间隔默认100ms
- 删除mappedfilequeue的mappedfiles数组中已删除的mappedfile,并返回删除mappedfile的数量
// org.apache.rocketmq.store.mappedfilequeue#deleteexpiredfilebytime public int deleteexpiredfilebytime(final long expiredtime,final int deletefilesinterval,final long intervalforcibly,final boolean cleanimmediately) { // 复制一份当前mappedfile object[] mfs = this.copymappedfiles(0); // 会保留最后一个mappedfile int mfslength = mfs.length - 1; int deletecount = 0; listfiles = new arraylist (); if (null != mfs) { for (int i = 0; i < mfslength; i ) { mappedfile mappedfile = (mappedfile) mfs[i]; // 最后修改时间 过期时间 long livemaxtimestamp = mappedfile.getlastmodifiedtimestamp() expiredtime; // 如果commitlog所在磁盘分区总容量超过85%,触发立即删除,或者超过了72小时的mappedfile if (system.currenttimemillis() >= livemaxtimestamp || cleanimmediately) { // 删除mappedfile if (mappedfile.destroy(intervalforcibly)) { files.add(mappedfile); deletecount ; // 一次最多删除10个mappedfile if (files.size() >= delete_files_batch_max/*10*/) { break; } if (deletefilesinterval > 0 && (i 1) < mfslength) { try { // 删除文件时间间隔,默认100ms thread.sleep(deletefilesinterval); } catch (interruptedexception e) { } } } else { break; } } else { //avoid deleting files in the middle break; } } } // 从mappedfilequeue的mappedfiles中删除这个mappedfile deleteexpiredfile(files); return deletecount; }
consumequeue和indexfile清理源码分析
consumequeue和indexfile清理方法cleanconsumequeueservice#run
调用了cleanconsumequeueservice#deleteexpiredfiles
方法清理consumequeue和indexfile。cleanconsumequeueservice#deleteexpiredfiles
核心代码如下,包括两个主要逻辑
- 遍历consumequeuetable中的consumequeue,调用
consumequeue#deleteexpiredfile
删除过期consumequeue - 调用
indexservice#deleteexpiredfile
删除过期indexfile
// org.apache.rocketmq.store.defaultmessagestore.cleanconsumequeueservice#deleteexpiredfiles private void deleteexpiredfiles() { if (minoffset > this.lastphysicalminoffset) { concurrentmap> tables = defaultmessagestore.this.consumequeuetable; // 遍历consumequeue for (concurrentmap maps : tables.values()) { for (consumequeue logic : maps.values()) { // 删除consumequeue int deletecount = logic.deleteexpiredfile(minoffset); // ... 间隔100ms } } // 删除indexfile defaultmessagestore.this.indexservice.deleteexpiredfile(minoffset); } }
consumequeue文件清理
consumequeue文件底层也是mappedfile,清理consumequeue调用mappedfilequeue#deleteexpiredfilebyoffset
清理consumequeue的过期mappedfile,源码如下,核心逻辑
- 复制mappedfilequeue中的mappedfiles,循环处理清理逻辑,循环
mfslength-1
次,也就是无论如何都会保留最新的mappedfile - 如果consumequeue的mappedfile最后一个存储单元对应消息在commitlog中的偏移量小于commitlog的最小偏移量,说明当前mappedfile所有存储单元对应所有commitlog的消息都已经被清理,因此调用
mappedfile#destroy
清理当前mappedfile - 删除mappedfilequeue缓存的mappedfiles列表中已经被清理mappedfile
// org.apache.rocketmq.store.mappedfilequeue#deleteexpiredfilebyoffset public int deleteexpiredfilebyoffset(long offset, int unitsize) { // 复制一份mappedfiles object[] mfs = this.copymappedfiles(0); listfiles = new arraylist (); int deletecount = 0; if (null != mfs) { int mfslength = mfs.length - 1; for (int i = 0; i < mfslength; i ) { boolean destroy; mappedfile mappedfile = (mappedfile) mfs[i]; // 取consumequeue最后一条消息buffer切片 selectmappedbufferresult result = mappedfile.selectmappedbuffer(this.mappedfilesize - unitsize); if (result != null) { // consumequeue最后一个存储单元消息在commitlog的偏移量 long maxoffsetinlogicqueue = result.getbytebuffer().getlong(); result.release(); // 如果consumequeue最后一条消息已经小于commitlog的最小offset,则说明要删除了 destroy = maxoffsetinlogicqueue < offset; if (destroy) { log.info("physic min offset " offset ", logics in current mappedfile max offset " maxoffsetinlogicqueue ", delete it"); } } // 删除consumequeue的mappedfile if (destroy && mappedfile.destroy(1000 * 60)) { files.add(mappedfile); deletecount ; } else { break; } } } // 删除mappedfilequeue的mappedfiles列表中已经删除的mappedfile deleteexpiredfile(files); return deletecount; }
indexfile清理
indexfile清理逻辑与consumequeue类似,都是删除文件中关联的commitlog消息全部被删除的文件。核心逻辑包括下面两个
- 获取indexfilelist中所有最大offset小于commitlog最小offset的indexfile
- 删除过期的indexfile
// org.apache.rocketmq.store.index.indexservice#deleteexpiredfile(long) public void deleteexpiredfile(long offset) { object[] files = null; try { // indexfilelist的第一个索引文件的最后一个offset long endphyoffset = this.indexfilelist.get(0).getendphyoffset(); if (endphyoffset < offset) { files = this.indexfilelist.toarray(); } } if (files != null) { listfilelist = new arraylist (); for (int i = 0; i < (files.length - 1); i ) { indexfile f = (indexfile) files[i]; // indexfile中最大的offset小于commitlog最小offset,说明文件可以被删除 if (f.getendphyoffset() < offset) { filelist.add(f); } else { break; } } // 删除过期的indexfile,并将其从indexfilelist缓存中删除 this.deleteexpiredfile(filelist); } }
总结
broker消息清理机制由defaultmessagestore负责,commitlog、consumequeue和indexfile的清理都是按照文件颗粒度进行。
每10s检查一次,通常情况下每天凌晨4点删除超过72小时的commitlog;如果commitlog所在磁盘分区的磁盘占用率超过75%,则会触发commitlog文件清理;如果commitlog所在磁盘分区的磁盘占用率超过85%,则会强制删除commitlog文件;
如果consumequeue和indexfile关联commitlog都被删除,consumequeue文件和indexfile也会被清理。
以上就是rocketmq | 源码分析】broker过期消息清理机制的详细内容,更多关于rocketmq | 源码分析】broker过期消息清理机制的资料请关注其它相关文章!