rocketmq源码分析之broker过期消息清理机制-kb88凯时官网登录

来自:网络
时间:2023-05-17
阅读:
免费资源网 - https://freexyz.cn/
目录

前言

前面文章讲了消息是如何保存的以及consumequeue与index文件更新机制。随着消息的增加,broker不可能一直保存所有消息,broker是按照什么规则清理消息的呢?被消费过后的消息就会被清理掉吗?下面我们来介绍broker消息清理机制。

broker消息清理机制简介

消息是被顺序存储在commitlog文件中的,且消息长度不定长,因此消息的清理不是以消息为单位进行的,而是以commitlog为单位进行的。默认情况下,broker会清理单个commitlog文件中最后一条消息超过72小时的commitlog文件,除了用户手动清理为,下面几种情况会被默认清理。

  • commitlog清理机制

    • commitlog文件过期(72小时),且达到清理时间点(默认为04:00~05:00),自动清理过期的commitlog文件

    • commitlog文件过期(72小时),且commitlog所在磁盘分区占用率已经达到过期清理警戒线(默认75%),无论是否到达清理时间点都会自动清理过期文件

    • commitlog所在磁盘分区占用率已经达到清理警戒线(默认85%),无论是否过期,都会从最早的文件开始清理,一次最多清理10个文件

    • commitlog所在磁盘分区占用率已经达到系统危险警戒线(默认90%),broker将拒绝消息写入

    • broker至少会保留最新的commitlog文件

  • consumequeue清理机制

    • 如果consumequeue文件关联commitlog都被清理,则清理此consumequeue文件
    • broker每个topic-queueid至少会保留最新的文件
  • indexfile清理机制

    • 如果indexfile所有索引单元关联commitlog都被清理,则清理此indexfile

broker与消息清理相关配置

# 文件自动清理时间,单位h,默认72
filereservedtime=72
# commitlog物理文件删除间隔,但是ms,默认100
deletecommitlogfilesinterval = 100
# 文件自动清理时间,默认04,即凌晨4点
deletewhen = "04"
# 硬盘占用率所在分区过期清理警戒线,超过这个值,无论是否到达清理时间,都会自动清理过期文件
diskmaxusedspaceratio = 75

消息清理机制源码分析

消息定时清理的是由defaultmessagestore类负责的,它在启动时(start)会调用defaultmessagestore#addscheduletask添加和消息存储相关的定时任务,其中就包括消息删除相关的定时任务defaultmessagestore.this.cleanfilesperiodically(),这个定时任务在broker启动后60s开始,每隔10秒执行一次

// org.apache.rocketmq.store.defaultmessagestore#addscheduletask
private void addscheduletask() {
    this.scheduledexecutorservice.scheduleatfixedrate(new runnable() {
        @override
        public void run() {
            // commitlog、consumequeue和indexfile定时删除
            defaultmessagestore.this.cleanfilesperiodically();
        }
    }, 1000 * 60, this.messagestoreconfig.getcleanresourceinterval()/*10s*/, timeunit.milliseconds);
  	// ...
}

在cleanfilesperiodically()中有两个方法,cleancommitlogservice.run()负责清理commitlog,cleanconsumequeueservice.run()负责清理consumequeue和indexfile。

// org.apache.rocketmq.store.defaultmessagestore#cleanfilesperiodically
private void cleanfilesperiodically() {
    // 清理commitlog
    this.cleancommitlogservice.run();
    // 清理consumequeue和indexfile
    this.cleanconsumequeueservice.run();
}

commitlog清理源码分析

commitlog清理方法cleancommitlogservice#run调用了cleancommitlogservice#deleteexpiredfiles,deleteexpiredfiles方法的核心代码逻辑如下,以下三种情况会触发commitlog文件的删除

  • 当前时间是凌晨4点
  • commitlog所在磁盘分区硬盘占用率超过75%
  • 手动删除commitlog
// org.apache.rocketmq.store.defaultmessagestore.cleancommitlogservice#deleteexpiredfiles
private void deleteexpiredfiles() {
    // 是否是凌晨4点,用小时匹配[04:00,05:00)
    boolean timeup = this.istimetodelete();
    // >75%就会返回true,如果大于85%,则触发强制删除
    boolean spacefull = this.isspacetodelete();
    // 手动删除次数是否>0
    boolean manualdelete = this.manualdeletefileseveraltimes > 0;
    if (timeup/*凌晨4点*/ || spacefull/*空间满了*/ || manualdelete/*手动删除*/) {
        boolean cleanatonce = defaultmessagestore.this.getmessagestoreconfig().iscleanfileforciblyenable()/*默认true*/ && this.cleanimmediately/*空间占用超过85%,触发强制删除*/;
				// 删除commitlog
        deletecount = defaultmessagestore.this.commitlog.deleteexpiredfile(filereservedtime, deletephysicfilesinterval,
            destroymapedfileintervalforcibly, cleanatonce);
        if (deletecount > 0) {
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}

commitlog的清理逻辑在mappedfilequeue#deleteexpiredfilebytime,其核心代码如下所示,主要分为下面几个步骤

  • 复制mappedfilequeue中的mappedfiles,循环处理删除逻辑,循环mfslength-1次,也就是无论如何都会保留最新的mappedfile
  • 如果mappedfile的最后修改时间超过72小时或commitlog所在磁盘分区硬盘占用率超过85%触发强制删除mappedfile,则会删除mappedfile,每次删除最多删除10个mappedfile,相邻mappedfile删除时间间隔默认100ms
  • 删除mappedfilequeue的mappedfiles数组中已删除的mappedfile,并返回删除mappedfile的数量
// org.apache.rocketmq.store.mappedfilequeue#deleteexpiredfilebytime
public int deleteexpiredfilebytime(final long expiredtime,final int deletefilesinterval,final long intervalforcibly,final boolean cleanimmediately) {
    // 复制一份当前mappedfile
    object[] mfs = this.copymappedfiles(0);
		// 会保留最后一个mappedfile
    int mfslength = mfs.length - 1;
    int deletecount = 0;
    list files = new arraylist();
    if (null != mfs) {
        for (int i = 0; i < mfslength; i  ) {
            mappedfile mappedfile = (mappedfile) mfs[i];
            // 最后修改时间 过期时间
            long livemaxtimestamp = mappedfile.getlastmodifiedtimestamp()   expiredtime;
            // 如果commitlog所在磁盘分区总容量超过85%,触发立即删除,或者超过了72小时的mappedfile
            if (system.currenttimemillis() >= livemaxtimestamp || cleanimmediately) {
                // 删除mappedfile
                if (mappedfile.destroy(intervalforcibly)) {
                    files.add(mappedfile);
                    deletecount  ;
                    // 一次最多删除10个mappedfile
                    if (files.size() >= delete_files_batch_max/*10*/) {
                        break;
                    }
                    if (deletefilesinterval > 0 && (i   1) < mfslength) {
                        try {
                            // 删除文件时间间隔,默认100ms
                            thread.sleep(deletefilesinterval);
                        } catch (interruptedexception e) {
                        }
                    }
                } else {
                    break;
                }
            } else {
                //avoid deleting files in the middle
                break;
            }
        }
    }
    // 从mappedfilequeue的mappedfiles中删除这个mappedfile
    deleteexpiredfile(files);
    return deletecount;
}

consumequeue和indexfile清理源码分析

consumequeue和indexfile清理方法cleanconsumequeueservice#run调用了cleanconsumequeueservice#deleteexpiredfiles方法清理consumequeue和indexfile。cleanconsumequeueservice#deleteexpiredfiles核心代码如下,包括两个主要逻辑

  • 遍历consumequeuetable中的consumequeue,调用consumequeue#deleteexpiredfile删除过期consumequeue
  • 调用indexservice#deleteexpiredfile删除过期indexfile
// org.apache.rocketmq.store.defaultmessagestore.cleanconsumequeueservice#deleteexpiredfiles
private void deleteexpiredfiles() {
    if (minoffset > this.lastphysicalminoffset) {
        concurrentmap> tables = defaultmessagestore.this.consumequeuetable;
				// 遍历consumequeue
        for (concurrentmap maps : tables.values()) {
            for (consumequeue logic : maps.values()) {
                // 删除consumequeue
                int deletecount = logic.deleteexpiredfile(minoffset);
								// ... 间隔100ms
            }
        }
				// 删除indexfile
        defaultmessagestore.this.indexservice.deleteexpiredfile(minoffset);
    }
}

consumequeue文件清理

consumequeue文件底层也是mappedfile,清理consumequeue调用mappedfilequeue#deleteexpiredfilebyoffset清理consumequeue的过期mappedfile,源码如下,核心逻辑

  • 复制mappedfilequeue中的mappedfiles,循环处理清理逻辑,循环mfslength-1次,也就是无论如何都会保留最新的mappedfile
  • 如果consumequeue的mappedfile最后一个存储单元对应消息在commitlog中的偏移量小于commitlog的最小偏移量,说明当前mappedfile所有存储单元对应所有commitlog的消息都已经被清理,因此调用mappedfile#destroy清理当前mappedfile
  • 删除mappedfilequeue缓存的mappedfiles列表中已经被清理mappedfile
// org.apache.rocketmq.store.mappedfilequeue#deleteexpiredfilebyoffset
public int deleteexpiredfilebyoffset(long offset, int unitsize) {
    // 复制一份mappedfiles
    object[] mfs = this.copymappedfiles(0);
    list files = new arraylist();
    int deletecount = 0;
    if (null != mfs) {
        int mfslength = mfs.length - 1;
        for (int i = 0; i < mfslength; i  ) {
            boolean destroy;
            mappedfile mappedfile = (mappedfile) mfs[i];
            // 取consumequeue最后一条消息buffer切片
            selectmappedbufferresult result = mappedfile.selectmappedbuffer(this.mappedfilesize - unitsize);
            if (result != null) {
                // consumequeue最后一个存储单元消息在commitlog的偏移量
                long maxoffsetinlogicqueue = result.getbytebuffer().getlong();
                result.release();
                // 如果consumequeue最后一条消息已经小于commitlog的最小offset,则说明要删除了
                destroy = maxoffsetinlogicqueue < offset;
                if (destroy) {
                    log.info("physic min offset "   offset   ", logics in current mappedfile max offset "
                          maxoffsetinlogicqueue   ", delete it");
                }
            } 
            // 删除consumequeue的mappedfile
            if (destroy && mappedfile.destroy(1000 * 60)) {
                files.add(mappedfile);
                deletecount  ;
            } else {
                break;
            }
        }
    }
    // 删除mappedfilequeue的mappedfiles列表中已经删除的mappedfile
    deleteexpiredfile(files);
    return deletecount;
}

indexfile清理

indexfile清理逻辑与consumequeue类似,都是删除文件中关联的commitlog消息全部被删除的文件。核心逻辑包括下面两个

  • 获取indexfilelist中所有最大offset小于commitlog最小offset的indexfile
  • 删除过期的indexfile
// org.apache.rocketmq.store.index.indexservice#deleteexpiredfile(long)
public void deleteexpiredfile(long offset) {
    object[] files = null;
    try {
        // indexfilelist的第一个索引文件的最后一个offset
        long endphyoffset = this.indexfilelist.get(0).getendphyoffset();
        if (endphyoffset < offset) {
            files = this.indexfilelist.toarray();
        }
    } 
    if (files != null) {
        list filelist = new arraylist();
        for (int i = 0; i < (files.length - 1); i  ) {
            indexfile f = (indexfile) files[i];
          	// indexfile中最大的offset小于commitlog最小offset,说明文件可以被删除
            if (f.getendphyoffset() < offset) {
                filelist.add(f);
            } else {
                break;
            }
        }
        // 删除过期的indexfile,并将其从indexfilelist缓存中删除
        this.deleteexpiredfile(filelist);
    }
}

总结

broker消息清理机制由defaultmessagestore负责,commitlog、consumequeue和indexfile的清理都是按照文件颗粒度进行。

每10s检查一次,通常情况下每天凌晨4点删除超过72小时的commitlog;如果commitlog所在磁盘分区的磁盘占用率超过75%,则会触发commitlog文件清理;如果commitlog所在磁盘分区的磁盘占用率超过85%,则会强制删除commitlog文件;

如果consumequeue和indexfile关联commitlog都被删除,consumequeue文件和indexfile也会被清理。

以上就是rocketmq | 源码分析】broker过期消息清理机制的详细内容,更多关于rocketmq | 源码分析】broker过期消息清理机制的资料请关注其它相关文章!

免费资源网 - https://freexyz.cn/
返回顶部
顶部
网站地图