目录
代码路径:/lib/bloomfilter
概述
victoriametrics的vmstorage
组件会接收上游传递过来的指标,在现实场景中,指标或瞬时指标的数量级可能会非常恐怖,如果不限制缓存的大小,有可能会由于cache miss而导致出现过高的。
为此,vmstorage提供了两个参数:maxhourlyseries
和maxdailyseries
,用于限制每小时/每天添加到缓存的唯一序列。
唯一序列指表示唯一的时间序列,如metrics{label1="value1",label2="value2"}属于一个时间序列,但多条不同值的metrics{label1="value1",label2="value2"}属于同一条时间序列。victoriametrics使用如下方式来获取时序的唯一标识:
func getlabelshash(labels []prompbmarshal.label) uint64 { bb := labelshashbufpool.get() b := bb.b[:0] for _, label := range labels { b = append(b, label.name...) b = append(b, label.value...) } h := xxhash.sum64(b) bb.b = b labelshashbufpool.put(bb) return h }
限速器的初始化
victoriametrics使用了一个类似限速器的概念,限制每小时/每天新增的唯一序列,但与普通的限速器不同的是,它需要在序列级别进行限制,即判断某个序列是否是新的唯一序列,如果是,则需要进一步判断一段时间内缓存中新的时序数目是否超过限制,而不是简单地在请求层面进行限制。
hourlyserieslimiter = bloomfilter.newlimiter(*maxhourlyseries, time.hour) dailyserieslimiter = bloomfilter.newlimiter(*maxdailyseries, 24*time.hour)
下面是新建限速器的函数,传入一个最大(序列)值,以及一个刷新时间。该函数中会:
- 初始化一个限速器,限速器的最大元素个数为
maxitems
- 则启用了一个goroutine,当时间达到
refreshinterval
时会重置限速器
func newlimiter(maxitems int, refreshinterval time.duration) *limiter { l := &limiter{ maxitems: maxitems, stopch: make(chan struct{}), } l.v.store(newlimiter(maxitems)) //1 l.wg.add(1) go func() { defer l.wg.done() t := time.newticker(refreshinterval) defer t.stop() for { select { case <-t.c: l.v.store(newlimiter(maxitems))//2 case <-l.stopch: return } } }() return l }
限速器只有一个核心函数add
,当vmstorage接收到一个指标之后,会(通过getlabelshash
计算该指标的唯一标识(h),然后调用下面的add
函数来判断该唯一标识是否存在于缓存中。
如果当前存储的元素个数大于等于允许的最大元素,则通过过滤器判断缓存中是否已经存在该元素;否则将该元素直接加入过滤器中,后续允许将该元素加入到缓存中。
func (l *limiter) add(h uint64) bool { lm := l.v.load().(*limiter) return lm.add(h) } func (l *limiter) add(h uint64) bool { currentitems := atomic.loaduint64(&l.currentitems) if currentitems >= uint64(l.f.maxitems) { return l.f.has(h) } if l.f.add(h) { atomic.adduint64(&l.currentitems, 1) } return true }
上面的过滤器采用的是布隆过滤器,核心函数为has
和add
,分别用于判断某个元素是否存在于过滤器中,以及将元素添加到布隆过滤器中。
过滤器的初始化函数如下,bitsperitem
是个常量,值为16。bitscount
统计了过滤器中的总bit数,每个bit表示某个值的存在性。bits
以64bit为单位的(后续称之为slot,目的是为了在bitscount中快速检索目标bit)。计算bits
时加上63
的原因是为了四舍五入向上取值,比如当maxitems=1时至少需要1个unit64的slot。
func newfilter(maxitems int) *filter { bitscount := maxitems * bitsperitem bits := make([]uint64, (bitscount 63)/64) return &filter{ maxitems: maxitems, bits: bits, } }
为什么bitsperitem
为16?给出了如何计算布隆过滤器的大小。在本代码中,k为4(hashescount
),期望的漏失率为0.003(可以从官方的filter_test.go
中看出),则要求总存储和总元素的比例为15,为了方便检索slot(64bit,为16的倍数),将之设置为16。
if p > 0.003 { t.fatalf("too big false hits share for maxitems=%d: %.5f, falsehits: %d", maxitems, p, falsehits) }
下面是过滤器的add
操作,目的是在过滤器中添加某个元素。add
函数中没有使用多个哈希函数来计算元素的哈希值,转而改变同一个元素的值,然后对相应的值应用相同的哈希函数,元素改变的次数受hashescount
的限制。
- 获取过滤器的完整存储,并转换为以bit单位
- 将元素
h
转换为byte数组,便于xxhash.sum64计算 - 后续将执行hashescount次哈希,降低漏失率
- 计算元素h的哈希
- 递增元素
h
,为下一次哈希做准备 - 取余法获取元素的bit范围
- 获取元素所在的slot(即uint64大小的bit范围)
- 获取元素所在的slot中的bit位,该位为1表示该元素存在,为0表示该元素不存在
- 获取元素所在bit位的掩码
- 加载元素所在的slot的数值
- 如果
w & mask
结果为0,说明该元素不存在, - 将元素所在的slot(
w
)中的元素所在的bit位(mask)置为1,表示添加了该元素 - 由于
add
函数可以并发访问,因此bits[i]
有可能被其他操作修改,因此需要通过重新加载(14)并通过循环来在bits[i]
中设置该元素的存在性
func (f *filter) add(h uint64) bool { bits := f.bits maxbits := uint64(len(bits)) * 64 //1 bp := (*[8]byte)(unsafe.pointer(&h))//2 b := bp[:] isnew := false for i := 0; i < hashescount; i {//3 hi := xxhash.sum64(b)//4 h //5 idx := hi % maxbits //6 i := idx / 64 //7 j := idx % 64 //8 mask := uint64(1) << j //9 w := atomic.loaduint64(&bits[i])//10 for (w & mask) == 0 {//11 wnew := w | mask //12 if atomic.compareandswapuint64(&bits[i], w, wnew) {//13 isnew = true//14 break } w = atomic.loaduint64(&bits[i])//14 } } return isnew }
看懂了add
函数,has
就相当简单了,它只是add
函数的缩减版,无需设置bits[i]
:
func (f *filter) has(h uint64) bool { bits := f.bits maxbits := uint64(len(bits)) * 64 bp := (*[8]byte)(unsafe.pointer(&h)) b := bp[:] for i := 0; i < hashescount; i { hi := xxhash.sum64(b) h idx := hi % maxbits i := idx / 64 j := idx % 64 mask := uint64(1) << j w := atomic.loaduint64(&bits[i]) if (w & mask) == 0 { return false } } return true }
总结
由于victoriametrics的过滤器采用的是布隆过滤器,因此它的限速并不精准,在源码条件下, 大约有3%的偏差。但同样地,由于采用了布隆过滤器,降低了所需的内存以及相关计算资源。此外victoriametrics的过滤器实现了并发访问。
在大流量场景中,如果需要对请求进行相对精准的过滤,可以考虑使用布隆过滤器,降低所需要的资源,但前提是过滤的结果能够忍受一定程度的漏失率。
以上就是victoriametrics库布隆过滤器初始化及使用详解的详细内容,更多关于victoriametrics库布隆过滤器的资料请关注其它相关文章!