rocketmq消息存储文件的加载与恢复机制源码分析-kb88凯时官网登录

来自:网络
时间:2023-05-17
阅读:
免费资源网 - https://freexyz.cn/
目录

前言

前面文章我们介绍了broker是如何将消息全量存储到commitlog文件中,并异步生成dispatchrequest任务更新consumequeue,indexfile的过程以及consumequeue和indexfile的文件结构。由于是异步转发消息,就可能出现消息成功存储到commitlog文件,转发请求任务执行失败,broker宕机了,此时commitlog和index消息并未处理完,导致commitlog与consumequeue和indexfile文件中的数据不一致。如果由一部分消息在commitlog中存在,在consumequeue中不存在,那么这部分消息consumer将永远无法消费到了,那么broker是如何保证数据一致性的呢?

storecheckpoint介绍

storecheckpoint的作用是记录commitlog,consumequeue和indexfile的刷盘点,当broker异常结束时会根据storecheckpoint的数据恢复,storecheckpoint属性如下

public class storecheckpoint {
    // commitlog最后一条信息的刷盘时间戳
    private volatile long physicmsgtimestamp = 0;
    // consumequeue最后一个存储单元刷盘时间戳
    private volatile long logicsmsgtimestamp = 0;
    // 最近一个已经写完indexfile的最后一条记录刷盘时间戳
    private volatile long indexmsgtimestamp = 0;
}

storecheckpoint文件的存储位置是${user.home}/store/checkpoint,文件的固定长度为4k,但storecheckpoint只占用了前24个字节,存储格式如下图所示

storecheckpoint时间戳更新时机

physicmsgtimestamp

flushrealtimeservice刷盘时更新

// org.apache.rocketmq.store.commitlog.flushrealtimeservice#run
public void run() {
  // ...
  // 更新commitlog刷盘时间戳
  if (storetimestamp > 0) { 					       commitlog.this.defaultmessagestore.getstorecheckpoint().setphysicmsgtimestamp(storetimestamp);
  }
}

groupcommitservice刷盘时更新

// org.apache.rocketmq.store.commitlog.groupcommitservice#docommit
private void docommit() {
  // ...
  // 更新commitlog刷盘时间戳
  if (storetimestamp > 0) {
commitlog.this.defaultmessagestore.getstorecheckpoint().setphysicmsgtimestamp(storetimestamp);
  }
}

logicsmsgtimestamp

consumequeue保存消息存储单元时更新

// org.apache.rocketmq.store.consumequeue#putmessagepositioninfowrapper
public void putmessagepositioninfowrapper(dispatchrequest request, boolean multiqueue) {
  // ...
  // 如果consumequeue保存成功,则更新consumequeue存储点信息
  if (result) {
this.defaultmessagestore.getstorecheckpoint().setlogicsmsgtimestamp(request.getstoretimestamp());
  }
}

consumequeue刷盘时更新并触发storecheckpoint刷盘

// org.apache.rocketmq.store.defaultmessagestore.flushconsumequeueservice#doflush
private void doflush(int retrytimes) {
  // ...
  // 更新consumequeue存储时间戳,并刷盘
  if (0 == flushconsumequeueleastpages) {
    if (logicsmsgtimestamp > 0) {
  defaultmessagestore.this.getstorecheckpoint().setlogicsmsgtimestamp(logicsmsgtimestamp);
    }
    // 更新存储点
    defaultmessagestore.this.getstorecheckpoint().flush();
  }
}

indexmsgtimestamp

// org.apache.rocketmq.store.index.indexservice#getandcreatelastindexfile
public indexfile getandcreatelastindexfile() {
  // 获取最新indexfile,如果indexfile已经满了,需要创建一个新的indexfile
  if (indexfile == null) {
          indexfile =
              new indexfile(filename, this.hashslotnum, this.indexnum, lastupdateendphyoffset,
                  lastupdateindextimestamp);
			// 如果创建新的indexfile成功,原indexfile刷盘
      if (indexfile != null) {
          final indexfile flushthisfile = previndexfile;
          thread flushthread = new thread(new runnable() {
              @override
              public void run() {
                	// indexfile刷盘
                  indexservice.this.flush(flushthisfile);
              }
          }, "flushindexfilethread");
          flushthread.setdaemon(true);
          flushthread.start();
      }
  }
  return indexfile;
}
// org.apache.rocketmq.store.index.indexservice#flush
public void flush(final indexfile f) {
    if (null == f)
        return;
    long indexmsgtimestamp = 0;
    if (f.iswritefull()) {
        indexmsgtimestamp = f.getendtimestamp();
    }
    f.flush();
    if (indexmsgtimestamp > 0) {
        // 更新checkpoint的indexmsgtimestamp并触发刷盘
        this.defaultmessagestore.getstorecheckpoint().setindexmsgtimestamp(indexmsgtimestamp);
        this.defaultmessagestore.getstorecheckpoint().flush();
    }
}
  • 保存消息index,获取最新的indexfile如果满了,则会创建一个新的indexfile,并且更新indexmsgtimestamp并触发storecheckpoint刷盘

storecheckpoint刷盘源码

storecheckpoint刷盘源码如下所示,就是将commitlog,consumequeue和indexfile刷盘时间戳持久化到硬盘上,由上面源码可知它的刷盘触发时机

  • consumequeue刷盘时触发
  • 创建新indexfile文件时触发

storecheckpoint刷盘源码如下

// org.apache.rocketmq.store.storecheckpoint#flush
public void flush() {
    this.mappedbytebuffer.putlong(0, this.physicmsgtimestamp);
    this.mappedbytebuffer.putlong(8, this.logicsmsgtimestamp);
    this.mappedbytebuffer.putlong(16, this.indexmsgtimestamp);
    this.mappedbytebuffer.force();
}

消息加载源码分析

在brokercontroller启动时会调用defaultmessagestore#load加载存储文件加载和恢复过程主要分为下面几步

  • 判断broker上次是否正常退出。这个判断逻辑是根据${user.home}/store/abort是否存在。如果文件存在,说明上次是异常退出,如果文件不存在,则说明是正常退出。
  • 加载commitlog
  • 加载consumequeue
  • 加载storecheckpoint
  • 加载indexfile
  • 恢复consumequeue与indexfile
  • 加载延迟队列服务
// org.apache.rocketmq.store.defaultmessagestore#load
public boolean load() {
    boolean result = true;
    try {
      	// 1. broker上次是否正常退出	
        boolean lastexitok = !this.istempfileexist();
        log.info("last shutdown {}", lastexitok ? "normally" : "abnormally");
        // 2. 加载commitlog
        result = result && this.commitlog.load();
				// 3. 加载consumequeue
        result = result && this.loadconsumequeue();
        if (result) {
            // 4. 加载storecheckpoint
            this.storecheckpoint =
                new storecheckpoint(storepathconfighelper.getstorecheckpoint(this.messagestoreconfig.getstorepathrootdir()));
            // 5. 加载indexfile
            this.indexservice.load(lastexitok);
            // 6. 恢复consumequeue与indexfile
            this.recover(lastexitok);
						// 7. 延迟队列服务加载
            if (null != schedulemessageservice) {
                result =  this.schedulemessageservice.load();
            }
        }
    } 
    return result;
}

commitlog加载

前面文章介绍过,commitlog文件的存储目录是${user.home}/store/commitlog/,并且commitlog文件的底层是mappedfile,由mappedfilequeue管理。

commitlog文件的加载其实调用的是mappedfilequeue#load方法,代码如下所示,load()中首先加载commitlog文件目录下的所有文件,并调用doload()方法加载commitlog。

// org.apache.rocketmq.store.mappedfilequeue#load
public boolean load() {
    file dir = new file(this.storepath/*${user.home}/store/commitlog/*/);
    file[] ls = dir.listfiles();
    if (ls != null) {
        return doload(arrays.aslist(ls));
    }
    return true;
}

mappedfile的加载过程如下所示,核心逻辑主要分为下面三步

  • 按照文件名称将文件排序,排序好的文件就会按照消息保存的先后顺序存放在列表中
  • 校验文件大小与mappedfile是否一致,如果commitlog文件大小与mappedfilesize不一致,则说明配置被改了,或者commitlog文件被修改
  • 创建mappedfile,并且设置wroteposition,flushedposition,committedposition为mappedfilesize
public boolean doload(list files) {
    // 按照文件名称排序
    files.sort(comparator.comparing(file::getname));
    for (file file : files) {
      	// 如果commitlog文件大小与mappedfilesize不一致,则说明配置被改了,或者commitlog文件被修改
        if (file.length() != this.mappedfilesize) {
            return false;
        }
        try {
          	// 创建mappedfile
            mappedfile mappedfile = new mappedfile(file.getpath(), mappedfilesize);
            mappedfile.setwroteposition(this.mappedfilesize);
            mappedfile.setflushedposition(this.mappedfilesize);
            mappedfile.setcommittedposition(this.mappedfilesize);
            this.mappedfiles.add(mappedfile);
        } 
    }
    return true;
}

看到这里肯定会有疑问,加载后的mappedfile的wroteposition,flushedposition和committedposition的值都为mappedfilesize,如果最后一个mappedfile没有使用完,broker启动后还会从最后一个mappedfile开始写么?我们可以在后面消息文件恢复源码分析找到答案。

consumequeue加载

从前面文章我们知道,consumequeue文件底层其实也是mappedfile,因此consumequeue文件的加载与commitlog加载差别不大。consumequeue加载逻辑为

  • 获取consumequeue目录下存储的所有topic目录,遍历topic目录
  • 遍历每个topic目录下的所有queueid目录,逐个加载ququeid中的所有mappedfile
// org.apache.rocketmq.store.defaultmessagestore#loadconsumequeue
private boolean loadconsumequeue() {
  // 获取consumequeue目录
  file dirlogic = new file(storepathconfighelper.getstorepathconsumequeue(this.messagestoreconfig.getstorepathrootdir()/*${user.home}/store */));
  // topic文件夹数组
  file[] filetopiclist = dirlogic.listfiles();
  if (filetopiclist != null) {
      // 遍历topic	
      for (file filetopic : filetopiclist) {
          // 获取topic名称
          string topic = filetopic.getname();
          // 获取queueid文件夹数组
          file[] filequeueidlist = filetopic.listfiles();
          // 遍历queueid
          if (filequeueidlist != null) {
              for (file filequeueid : filequeueidlist) {
                  int queueid;
                  // 文件夹名称就是queueid
                  queueid = integer.parseint(filequeueid.getname());
                  // 构建consumequeue
                  consumequeue logic = new consumequeue(/* ... */);
                  this.putconsumequeue(topic, queueid, logic);
                  // consumequeue加载
                  if (!logic.load()) {
                      return false;
                  }
              }
          }
      }
  }
  return true;
}

indexfile加载

indexfile文件加载过程调用的是indexservice#load,首先获取${user.home}/store/index目录下的所有文件,遍历所有文件,如果indexfile最后存储时间大于storecheckpoint中indexmsgtimestamp,则会先删除indexfile

// org.apache.rocketmq.store.index.indexservice#load
public boolean load(final boolean lastexitok) {
    // indexfile文件目录
    file dir = new file(this.storepath);
    // indexfile文件列表
    file[] files = dir.listfiles();
    if (files != null) {
        // 文件排序
        arrays.sort(files);
        for (file file : files) {
            try {
                indexfile f = new indexfile(file.getpath(), this.hashslotnum, this.indexnum, 0, 0);
                f.load();
                if (!lastexitok) {
                    // 文件最后存储时间戳大于刷盘点,则摧毁indexfile,重建
                    if (f.getendtimestamp() > this.defaultmessagestore.getstorecheckpoint()/*存储点时间*/
                        .getindexmsgtimestamp()) {
                        f.destroy(0);
                        continue;
                    }
                }
                this.indexfilelist.add(f);
            } 
        }
    }
    return true;
}

consumequeue与indexfile恢复

如果是正常退出,数据都已经正常刷盘,前面我们说到commitlog在加载时的wroteposition,flushedposition,committedposition都设置为mappedfilesize,

因此即使是正常退出,也会调用commitlog#recovernormally找到最后一条消息的位置,更新这三个属性。

// org.apache.rocketmq.store.defaultmessagestore#recover
private void recover(final boolean lastexitok) {
    // consumequeue中最大物理偏移量
    long maxphyoffsetofconsumequeue = this.recoverconsumequeue();
    if (lastexitok) {
        // 正常退出文件恢复
        this.commitlog.recovernormally(maxphyoffsetofconsumequeue);
    } else {
        // 异常退出文件恢复
        this.commitlog.recoverabnormally(maxphyoffsetofconsumequeue);
    }
    // 恢复topicqueuetable
    this.recovertopicqueuetable();
}

正常恢复的源码如下,由于broker是正常关闭,因此commitlog,consumequeue与indexfile都已经正确刷盘,并且三者的消息是一致的。正常恢复的主要目的是找到找到最后一条消息的偏移量,然后更新commitlog的mappedfilequeue中的刷盘点(flushwhere)和提交点(committedwhere),

  • 从最后3个mappedfile开始恢复,如果mappedfile总数不足3个,则从第0个mappedfile开始恢复
  • 逐个遍历mappedfile,找到每个mappedfile的最后一条消息的偏移量,并将其更新到commitlog中mappedfilequeue的刷盘点和提交点中
  • 清除consumequeue冗余数据
    public void recovernormally(long maxphyoffsetofconsumequeue) {
        // 确认消息是否完整,默认是true
        boolean checkcrconrecover = this.defaultmessagestore.getmessagestoreconfig().ischeckcrconrecover();
        final list mappedfiles = this.mappedfilequeue.getmappedfiles();
        if (!mappedfiles.isempty()) {
            // 默认从最后3个mappedfile开始恢复
            int index = mappedfiles.size() - 3;
            // 如果commitlog不足三个,则从第一个文件开始恢复
            if (index < 0)
                index = 0;
            mappedfile mappedfile = mappedfiles.get(index);
            bytebuffer bytebuffer = mappedfile.slicebytebuffer();
            // 最后一个mappedfile的文件起始偏移量
            long processoffset = mappedfile.getfilefromoffset();
            // mappedfileoffset偏移量
            long mappedfileoffset = 0;
            // 遍历commitlog文件
            while (true) {
                // 校验消息完整性
                dispatchrequest dispatchrequest = this.checkmessageandreturnsize(bytebuffer, checkcrconrecover);
                // 获取消息size
                int size = dispatchrequest.getmsgsize();
                // 返回结果为true并且消息size>0,说明消息是完整的
                if (dispatchrequest.issuccess() && size > 0) {
                    mappedfileoffset  = size;
                }
            }
            // 最大物理偏移量
            processoffset  = mappedfileoffset;
            // 更新flushedwhere和committedposition指针
            this.mappedfilequeue.setflushedwhere(processoffset);
            this.mappedfilequeue.setcommittedwhere(processoffset);
            this.mappedfilequeue.truncatedirtyfiles(processoffset);
            // 清除consumequeue冗余数据
            if (maxphyoffsetofconsumequeue >= processoffset) {
                this.defaultmessagestore.truncatedirtylogicfiles(processoffset/*commitlog最大物理偏移量*/);
            }
        } 
    }

异常恢复源码如下,由于上次broker没有正常关闭,因此由可能存在commitlog、consumequeue与indexfile不一致的情况,因此在异常恢复时可能需要恢复consumequeue和indexfile,异常恢复核心逻辑主要包括

  • 倒序查commitlog的mappedfile文件,找到第一条消息存储的时间戳比storecheckpoint里的physicmsgtimestamp,logicsmsgtimestamp和indexmsgtimestamp三者都小的最大mappedfile,该mappedfile至少有一部分消息是被正常转发,正常存储,正常刷盘的
  • 从该mappedfile开始逐条转发消息,重新恢复consumequeue和indexfile
  • 当遍历到最后一条消息,将其偏移量更新到commitlog中mappedfilequeue的刷盘点和提交点中
  • 清除consumequeue冗余数据
// org.apache.rocketmq.store.commitlog#recoverabnormally
public void recoverabnormally(long maxphyoffsetofconsumequeue) {
    // 是否crc校验
    boolean checkcrconrecover = this.defaultmessagestore.getmessagestoreconfig().ischeckcrconrecover();
    final list mappedfiles = this.mappedfilequeue.getmappedfiles();
    if (!mappedfiles.isempty()) {
        // 最后一个mappedfile的index
        int index = mappedfiles.size() - 1;
        mappedfile mappedfile = null;
        // 倒序遍历mappedfile数组,
        for (; index >= 0; index--) {
            mappedfile = mappedfiles.get(index);
            // 1. 如果第一条消息的时间戳小于存储点时间戳
            if (this.ismappedfilematchedrecover(mappedfile)) {
                break;
            }
        }
        long processoffset = mappedfile.getfilefromoffset();
        long mappedfileoffset = 0;
        while (true) {
            dispatchrequest dispatchrequest = this.checkmessageandreturnsize(bytebuffer, checkcrconrecover);
            int size = dispatchrequest.getmsgsize();
            if (dispatchrequest.issuccess()) {
                if (size > 0) {
                    mappedfileoffset  = size;
                    // 2. 转发消息
                    if (this.defaultmessagestore.getmessagestoreconfig().isduplicationenable()/*消息是否可以重复,默认是false*/) {
                        if (dispatchrequest.getcommitlogoffset() < this.defaultmessagestore.getconfirmoffset()) {
                            this.defaultmessagestore.dodispatch(dispatchrequest);
                        }
                    } else {
                        this.defaultmessagestore.dodispatch(dispatchrequest);
                    }
                }
        }
				// 3. 更新mappedfilequeue中的刷盘位置和提交位置
        processoffset  = mappedfileoffset;
        this.mappedfilequeue.setflushedwhere(processoffset);
        this.mappedfilequeue.setcommittedwhere(processoffset);
        this.mappedfilequeue.truncatedirtyfiles(processoffset);
        // 清除consumequeue中的冗余数据
        if (maxphyoffsetofconsumequeue >= processoffset) {
            this.defaultmessagestore.truncatedirtylogicfiles(processoffset);
        }
    }
}

总结

broker启动时会分别加载commitlog、consumequeue与indexfile。加载完成后,如果broker上次是正常退出,只需要找到commitlog的最后一条消息,并更新刷盘点和提交点。如果broker上次是异常退出,就有可能出现consumequeue、indexfile与commitlog不一致的情况,需要根据storecheckpoint存储的时间戳从commitlog找到消息,逐条恢复consumequeue与indexfile。

以上就是rocketmq | 源码分析】消息存储文件的加载与恢复机制的详细内容,更多关于rocketmq 消息存储文件加载恢复的资料请关注其它相关文章!

免费资源网 - https://freexyz.cn/
返回顶部
顶部
网站地图