3. 原理分析
延迟消息发送到broker之后,延迟时间会作为消息属性跟消息一起持久化到bookie,这样重启broker的时候延迟消息能够重新加载回来。
延迟消息的逻辑主要由DelayedDeliveryTracker来实现,DelayedDeliveryTracker会在内存中使用一个堆来维护索引结构:三元组(time, ledgerId, entryId),依次按照时间、ledgerId、entryId进行排序。broker收到延迟消息时,会将其加入到堆中,一旦经过特定的延迟时间就会传递给消费者,如下图所示:

这种设计会面临内存紧缺的问题,一个batch消息对应一个messageId(即ledgerId, entryId),一个三元组占用内存8*3 byte = 24byte,即每个batch要消耗24byte内存。由于每个shared/key-shared订阅都对应有独立的DelayedDeliveryTracker,因此多个shared/key-shared订阅会造成多份的内存消耗。
如果用户指定每条消息都延迟1天,生产消息速率为10000 msg/s,那么延迟消息条数为24*3600*10000=864000000(即8.64亿条延迟消息未到期),假设batch 大小为100,则有864000000/100=8640000个batch,那么单个订阅占用的内存为8640000*24/1024/1024MB =198MB,2个shared订阅则占用198*2 = 396MB。这个内存消耗是很大的,当集群里延迟消息的规模上来后,broker很有可能面临OOM的风险。
3.1 TimeBucket算法
为了解决这个问题,我们设计了一种新的数据存储结构,结合一个算法来成功将内存占用大小降低95+%,甚至可达99+%。这个重大优化已经合并进社区主仓库,感兴趣的读者可以浏览具体代码:
下面介绍其设计原理,我们使用了如下分层数据结构:
Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>> map = new Long2ObjectAVLTreeMap<>();
为了避免Java包装器带来的内存开销,我们使用fastutil提供的高性能数据结构。
映射 timestamp -> ledgerId -> entryId,由于我们需要按时间顺序进行优先遍历,因此最外层的map是一个SortedMap,内层ledgerId -> entryId也是一个SortedMap,否则会引起消息乱序。
我们还有一个关键的算法:对时间戳进行截断,不妨将其命名为TimeBucket算法。
显然,不同场景对延迟消息是有不同的精度要求的,延迟1分钟的可能要求1秒以内,延迟1天的可能1分钟以内就可以了。利用用户对精度要求的信息,我们可以将记录的时间戳映射到时间桶。
比如说,如果用户允许延迟精度在1s以内,那么2**10 = 1024ms ≈ 1s,那么在加入一条entry的时候,我们就可以直接将timestamp的低10bit置为0,也就是说将1024个timestamp聚合成一个timestamp,代价是用户指定的延迟消息可能提前1s分发,但是能将空间占用降低很多倍,具体效果后面会进行分析。
显然,用户要求的精度越低,这里时间戳截断的位数就可以越高,那么就能节省更多的存储空间,用户可以根据需要调节delayedDeliveryTickTimeMillis。
最外层的map是对时间戳数据的压缩,内层map则是对ledger id信息的压缩,因为用户topic在一段时间内都是使用同一个ledger,内层使用了以ledgerId为key的map,那么对应于所有属于同一个ledger的延迟消息,只需要存储一个ledgerId就可以了,不需要每条entry都存储ledgerId,这也能节省不少空间。
最后,我们使用Roaring64Bitmap来存储entryId信息,entryId是长整数类型的,而且我们只需要记录某个entryId是否要延迟分发即可,即消耗一个bit位即可,那么使用BitMap数据结构是最合适的。
另外,entryId的取值是从0开始的连续整数,默认一个ledger最多包含50000条entry,这是由配置managedLedgerMaxEntriesPerLedger来控制的。

我们可以使用下面测试代码来对比改进前后的内存占用大小。
为了模拟普通的消费场景,我们假设用户每秒消费1000条消息,那么就是1ms一条,因此每次生产消息时将timestamp递增1。
默认允许1.024s内的误差,因此可以将时间戳的低10bit位置为0。
按照默认的managedLedgerMaxEntriesPerLedger配置值,每写入50000条消息切换一次ledger,然后总共写入1000w条消息。
旧版实现是使用TripleLongPriorityQueue来实现的,它是一个堆数据结构,使用直接内存来存储三元组数据,因此无法通过jmap来得到内存占用大小信息,但是它提供了bytesCapacity()方法来获取内存占用大小信息。

根据上面输出可见,1000w条entry占用240MB。
Long2ObjectSortedMap则使用的堆内存,可以使用jmap将内存dump下来分析:

可见新的数据结构仅占用内存25MB,将内存使用率降低到原来的25/240 = 10.4%!
新算法的潜力当然不只这点,我们进一步做理论分析,从而搞清楚内存占用大小跟各个参数之间的关系,那么就能知晓新算法的调优方向了。
3.2 复杂度分析
新数据结构占用的空间大小跟很多因素有关,比如说消息速率、允许的时间误差大小、每个ledger允许写入的最多entry条数。
Pulsar的配置managedLedgerMaxEntriesPerLedger=50000决定了每个ledger允许写入的最多entry条数,这里的分析直接沿用这个默认的配置。
允许的时间误差大小,上面使用的是10bit,即允许2**10ms=1024ms的时间误差,那么这1024ms内的消息映射都会聚合成一个bucket。
我们假设消息速率x条/ms,允许y bit的时间误差(x对应了前面代码里的messagePerMs变量,y对应了trimLowerBits变量),那么2**y ms会作为一个bucket,先分析这一个bucket的空间占用,那么
1. 只需要存储一个时间戳,一个long数值 8byte。
2. 接下来要存储ledgerID,这2**y ms内有M=x*2**y条消息发送过来,我们需要考虑它是否会切换ledger,导致ledgerID发生改变,那么这段时间会有L=
个ledger id存在,即ledger id消耗8L byte。
3. 然后是entry id,每个ledger的entry id都使用一个Roaring64Bitmap来存储,但是由于我们限制一个ledger最多写5w条entry,因此entry id的范围是能够使用int类型来表达的,因此能使用RoaringBitmap代替Roaring64Bitmap来进一步降低内存占用,但是实测发现替换前后的内存占用大小差不多,暂时没必要做这个替换。
为了方便分析,我们按RoaringBitmap的空间复杂度来分析:
当条目个数没达到4096时,这是稀疏数据的场景,RoaringBitmap使用有序数组Array Container来存储数据,空间占用随条目个数线性增加,每个short int条目占用2byte,即2N byte;
当条目超过4096个之后,使用位图BitmapContainer 存储数据,适合稠密数据,一个RoaringBitmap固定占用8KB空间。
实际上RoaringBitmap内部还会针对连续数据进行优化,RunContainer 使用行程长度编码(Run-Length Encoding, RLE)对连续数据进行压缩,将连续的数字序列压缩为“起始值+长度”的形式来节省空间。例如:数列 [11, 12, 13, 14, 15] 会被压缩为 (11, 4),表示从 11 开始的 5 个连续数字。而Pulsar的entry id就是连续整数的场景,如果某个topic里生产的每条消息都是延迟消息,那么就能用上这种优化。
在 RoaringBitmap 中,容器的选择是自动化的。当插入数据时,RoaringBitmap 会根据数据的分布和密度自动选择最适合的容器类型(ArrayContainer、BitmapContainer 或 RunContainer)。为了简化分析,我们不考虑RunContainer的优化。
L个ledger会对应L个RoaringBitmap,L =
,M为一个bucket里写入的消息条数。因此总占用内存Total Entry Size(TES)为
当M < 4096时,大部分情况下不用切换ledger,因此L=1,TES = 8byte + 8byte + 2M byte≈2M byte。
当M > 4096时,RoaringBitmap占用空间 8*L KB,因此TES = 8byte + 8L byte + 8L KB≈8L KB。
为了简化分析,这里直接忽略了前面两项。
接下来评估每条消息平均占用内存大小Average Entry Size,简称为AES,则有AES=TES/M,分类讨论如下:
M >> 50000时,
≈M/50000,则8L KB≈M/50000*8 *1024byte = 0.163M byte,即AES≈0.163byte。4096 < M < 50000时,只有一个bucket,即L=1,一个RoaringBitmap占用空间 8KB,AMS = 1*8*1024/M byte = 8192/M byte。如M = 4096时,AMS = 8*1024/M byte = 2byte,M=8192时,AES = 1byte。
M<4096时,AES = TES/M = 2byte。
显然是M越大,空间压缩效果越好,因此如果要提高算法的数据压缩效率,目标就是提高M,要提高M,可以提高消息速度、增大bucket的时间长度。
而且AES的最小值并不是0.163byte,前面的讨论都是假设managedLedgerMaxEntriesPerLedger=50000,如果每个ledger包含的entry个数进一步调大,AES能进一步降低,但是会有一个理论下限,因为每条延迟消息至少需要一个bit位来记录它是否要延迟分发,因此下限是1bit,而前面推导出来的0.163byte=1.304bit,已经逼近这个理论下限了。
3.3 实验验证
下面使用一系列参数来进行实验,验证前面的分析,下面使用的x、y分别对应代码里的messagePerMs、trimLowerBits。
x=1,y=10
这是前面第一次实验的参数,此时M=x*2**y=1024,还没到4096个条目,因此AES=2,1000w条消息对应空间2*10000000/1024/1024MB = 19.1MB,而实验测得25MB,多出来的部分是由Long2ObjectSortedMap等java对象产生的内存消耗,当M过小时,bucket个数会偏多,从而产生较多的对象,此时有9767个buckets,从而跟理论预测结果产生了误差。

我们直接度量Roaring64Bitmap的内存大小时,结果跟理论预测一样也是19MB。
x=4,y=10
此时M=x*2**y=4096,AES=2,1000w条消息对应空间仍为19.1MB,虽然理论预测的内存占用一样是19.1MB,但是由于bucket个数大大减少了,从9767下降到2443,因此Long2ObjectSortedMap等java对象产生的内存消耗几乎可以忽略不计,从而使得实际内存占用大小跟理论预测几乎相同,即20.48MB。

x=8,y=10
此时M=x*2**y=8192,AES=8192/M=1,则1000w条消息对应空间为10000000*1/1024/1024byte=9.53MB。实际内存占用大小为11MB。

x=8,y=10意味着该任务每秒生产8000条消息,每个bucket对应1.024s,这也是最为常见的一种场景,此时内存占用只有优化前的11/240=4.5%。
当用户延迟时间较长时,对时间精度的容忍能提高到1min,那么每个bucket的时间长度也可以进一步提高,由于2**15/1000s=32.7s,2**16/1000s=65s,因此我们可以将y提高到15。
x=8,y=15
此时M=x*2**y=262144 >> 50000,AES≈0.163byte,则1000w条消息对应空间为10000000*0.163/1024/1024byte=1.55MB。实际内存占用大小为2.25MB。

跟预测基本吻合,此时内存占用只有优化前的2.25/240=0.93%!
此时AES达到2.25*1024*1024/10000000=0.235byte,已经逼近理论值0.163byte了。
读者也可以尝试调整使用其他参数组合,从而预测其内存占用大小,对应的broker可调节参数有delayedDeliveryTickTimeMillis、managedLedgerMaxEntriesPerLedger,消息速度由用户任务决定,用户能接受的延迟时间精度也是由用户任务决定的,但是当前在确定bucket所跨时间长度时直接使用了全局配置delayedDeliveryTickTimeMillis,读者如果希望做更精细化的配置,可以考虑增加订阅级别的配置,让不同订阅任务使用不同时间长度的bucket,这样能进一步降低内存占用。正如前面实验所展示的,如果延迟时间达到1d,那么延迟精度降低到1min也许也能接受,那么数据压缩的效果能显著提高。
Last updated
Was this helpful?

