3. Principle Analysis
After a delayed message is sent to the broker, the delay time is persisted to BookKeeper as a message property along with the message, so that delayed messages can be reloaded when the broker is restarted.
The logic of delayed messages is mainly implemented by DelayedDeliveryTracker. DelayedDeliveryTracker maintains an index structure in memory using a heap: a triple (time, ledgerId, entryId), sorted by time, ledgerId, and entryId in sequence. When the broker receives a delayed message, it is added to the heap, and once the specific delay time has elapsed, it is delivered to consumers, as shown in the following diagram:

This design consumes a considerable amount of memory. A batch message corresponds to one messageId (i.e., ledgerId, entryId), and a triple occupies 8*3 bytes = 24 bytes of memory, meaning each batch consumes 24 bytes. Since each shared/key-shared subscription has an independent DelayedDeliveryTracker, every additional shared/key-shared subscription holds its own copy of this index, multiplying the memory consumption.
If users specify that each message is delayed by 1 day, and the message production rate is 10000 msg/s, then the number of delayed messages is 24*3600*10000=864000000 (i.e., 864 million delayed messages not yet expired). Assuming a batch size of 100, there are 864000000/100=8640000 batches, then a single subscription occupies 8640000*24/1024/1024MB = 198MB of memory, and 2 shared subscriptions occupy 198*2 = 396MB. This memory consumption is very large, and when the scale of delayed messages in the cluster increases, the broker is likely to face the risk of OOM.
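The arithmetic above can be reproduced with a short sketch. The class and method names here are hypothetical and exist only for illustration; the only inputs are the figures quoted in the text (24 bytes per triple, one triple per batch).

```java
/** Back-of-the-envelope estimate for the heap-based index described above. */
final class HeapIndexEstimate {
    // Each (time, ledgerId, entryId) triple is three 8-byte longs.
    static final long BYTES_PER_TRIPLE = 3 * Long.BYTES;

    /** Bytes held by one subscription's tracker for the given workload. */
    static long bytesPerSubscription(long msgPerSecond, long delaySeconds, long batchSize) {
        long pendingMessages = msgPerSecond * delaySeconds; // messages not yet expired
        long pendingBatches = pendingMessages / batchSize;  // one triple per batch
        return pendingBatches * BYTES_PER_TRIPLE;
    }

    public static void main(String[] args) {
        long bytes = bytesPerSubscription(10_000, 24 * 3600, 100);
        System.out.printf("1 subscription:  %.1f MB%n", bytes / 1024.0 / 1024.0);
        System.out.printf("2 subscriptions: %.1f MB%n", 2 * bytes / 1024.0 / 1024.0);
    }
}
```

Changing the batch size or the delay shows how quickly the old index grows: the cost is linear in the number of pending batches and in the number of subscriptions.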
3.1 TimeBucket Algorithm
To solve this problem, we designed a new data storage structure which, combined with a timestamp-truncation algorithm, reduces memory usage by 95+%, or even up to 99+%. This major optimization has been merged into the community's master repository. You can browse the specific code at:
Below we introduce its design principle. We used the following layered data structure:
Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>> map = new Long2ObjectAVLTreeMap<>();
To avoid the memory overhead of Java wrapper types, we use the high-performance data structures provided by fastutil.
The mapping is timestamp -> ledgerId -> entryId. Since we need to traverse in chronological order, the outermost map is a SortedMap, and the inner ledgerId -> entryId map is also a SortedMap; otherwise messages would be delivered out of order.
We also have a key algorithm: truncating timestamps, which we might call the TimeBucket algorithm.
Obviously, different scenarios have different precision requirements for delayed messages. A delay of 1 minute may require within 1 second, while a delay of 1 day may be acceptable within 1 minute. Using information about users' precision requirements, we can map recorded timestamps to time buckets.
For example, suppose users allow delay precision within 1s. Since 2**10 ms = 1024ms ≈ 1s, when adding an entry we can directly set the lower 10 bits of the timestamp to 0, which aggregates 1024 consecutive millisecond timestamps into one. The cost is that a delayed message may be delivered up to about 1s earlier than the user specified, but space usage is reduced by many times. The specific effect will be analyzed later.
Obviously, the lower the precision required by users, the higher the number of bits that can be truncated from the timestamp, and thus more storage space can be saved. Users can adjust delayedDeliveryTickTimeMillis according to their needs.
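The truncation step can be sketched as a single bit mask. This is a minimal illustration, not the broker's code: the class name and method are hypothetical, and how the bit count is derived from delayedDeliveryTickTimeMillis is not shown here.

```java
/** Minimal sketch of timestamp truncation (the "TimeBucket" idea). */
final class TimeBucket {
    /**
     * Clears the lowest trimLowerBits bits of a millisecond timestamp, so that
     * 2**trimLowerBits consecutive timestamps share one bucket key.
     */
    static long truncate(long timestampMs, int trimLowerBits) {
        long lowMask = (1L << trimLowerBits) - 1; // e.g. 0x3FF for 10 bits
        return timestampMs & ~lowMask;
    }

    public static void main(String[] args) {
        long deliverAt = 1_700_000_000_123L;
        long bucket = truncate(deliverAt, 10);
        // The bucket key is never later than the requested time, and is
        // earlier by less than 2**10 ms.
        System.out.println("bucket=" + bucket + ", early by " + (deliverAt - bucket) + " ms");
    }
}
```

Because the mask only clears bits, the bucket key is always at or before the requested delivery time, which is why the error shows up as early delivery rather than late delivery.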
The outermost map compresses timestamp data, while the inner map compresses ledger id information. Since one user's topic uses the same ledger for a period of time, and the inner map uses ledgerId as the key, for all delayed messages belonging to the same ledger, only one ledgerId needs to be stored, without storing ledgerId for each entry. This can also save considerable space.
Finally, we use Roaring64Bitmap to store entryId information. EntryId is of long integer type, and we only need to record whether an entryId needs delayed distribution, i.e., consuming one bit is sufficient, so using a BitMap data structure is the most appropriate choice.
In addition, entryId values are consecutive integers starting from 0, and by default a ledger contains at most 50000 entries, which is controlled by the configuration managedLedgerMaxEntriesPerLedger.
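The three layers described above can be sketched with JDK classes only. This is an assumption-laden stand-in: TreeMap replaces the fastutil sorted maps and java.util.BitSet replaces Roaring64Bitmap (so entry IDs are limited to int here), and the class and method names are hypothetical. It shows the shape of the index, not its real memory footprint.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of the bucketed index: truncated timestamp -> ledgerId -> set of entryIds. */
final class BucketedDelayIndex {
    // Stand-ins: TreeMap for Long2ObjectSortedMap, BitSet for Roaring64Bitmap.
    private final TreeMap<Long, TreeMap<Long, BitSet>> buckets = new TreeMap<>();
    private final int trimLowerBits;

    BucketedDelayIndex(int trimLowerBits) {
        this.trimLowerBits = trimLowerBits;
    }

    /** Records that (ledgerId, entryId) becomes deliverable at deliverAtMs. */
    void add(long deliverAtMs, long ledgerId, int entryId) {
        long bucket = deliverAtMs & ~((1L << trimLowerBits) - 1);
        buckets.computeIfAbsent(bucket, k -> new TreeMap<>())
               .computeIfAbsent(ledgerId, k -> new BitSet())
               .set(entryId); // one bit per delayed entry
    }

    int bucketCount() {
        return buckets.size();
    }

    /** Removes every bucket whose key is <= nowMs and returns its {ledgerId, entryId} pairs in order. */
    List<long[]> pollDue(long nowMs) {
        List<long[]> due = new ArrayList<>();
        while (!buckets.isEmpty() && buckets.firstKey() <= nowMs) {
            TreeMap<Long, BitSet> ledgers = buckets.pollFirstEntry().getValue();
            for (Map.Entry<Long, BitSet> ledger : ledgers.entrySet()) {
                BitSet entries = ledger.getValue();
                for (int e = entries.nextSetBit(0); e >= 0; e = entries.nextSetBit(e + 1)) {
                    due.add(new long[] {ledger.getKey(), e});
                }
            }
        }
        return due;
    }
}
```

Both map levels are sorted, so iterating a due bucket yields positions in (ledgerId, entryId) order, which is the ordering requirement mentioned earlier.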

We can use the following test code to compare the memory usage before and after the improvement.
To simulate a normal consumption scenario, we assume users consume 1000 messages per second, which is 1ms per message, so we increment the timestamp by 1 each time we produce a message.
By default, an error within 1.024s is allowed, so the lower 10 bits of the timestamp can be set to 0.
According to the default managedLedgerMaxEntriesPerLedger configuration value, switch ledgers after writing 50000 messages, and write a total of 10 million messages.
The old implementation uses TripleLongPriorityQueue, which is a heap data structure using direct memory to store triple data. Therefore, the memory usage size information cannot be obtained through jmap, but it provides the bytesCapacity() method to get memory usage size information.

According to the above output, 10 million entries occupy 240MB.
Long2ObjectSortedMap uses heap memory, which can be dumped and analyzed using jmap:

It can be seen that the new data structure only occupies 25MB of memory, reducing memory usage to 25/240 = 10.4% of the original!
The potential of the new algorithm is of course more than this. We further do theoretical analysis to understand the relationship between memory usage and various parameters, so that we can know the optimization direction of the new algorithm.
3.2 Complexity Analysis
The space occupied by the new data structure is related to many factors, such as message rate, allowed time deviation, and the maximum number of entries allowed to be written to one ledger.
Pulsar's configuration managedLedgerMaxEntriesPerLedger=50000 determines the maximum number of entries allowed to be written to each ledger. This analysis directly uses this default configuration.
For the allowed time deviation, the example above uses 10 bits, i.e., it allows a time error of 2**10 ms = 1024ms, so message mappings within each 1024ms window are aggregated into one bucket.
We assume a message rate of x messages/ms and allow y bits of time error (x corresponds to the messagePerMs variable in the previous code, y corresponds to the trimLowerBits variable), so each 2**y ms window is used as one bucket. First, analyze the space occupied by one bucket:
Only one timestamp needs to be stored, a long value of 8 bytes.
Next, we need to store the ledger IDs. In these 2**y ms, M = x*2**y messages are sent, and we need to consider whether the ledger is switched, causing the ledger ID to change. During this time there will be L = ⌈M/50000⌉ ledger IDs, i.e., the ledger IDs consume 8L bytes.
Then there are the entry IDs. The entry IDs of each ledger are stored in a Roaring64Bitmap, but since we limit a ledger to at most 50,000 entries, the entry ID range fits in the int type, so RoaringBitmap could be used instead of Roaring64Bitmap to further reduce memory usage. However, actual tests found that memory usage before and after the replacement is about the same, so there is no need to make this replacement for now.
For ease of analysis, we analyze according to the space complexity of RoaringBitmap:
When the number of entries does not reach 4096, this is a sparse data scenario. RoaringBitmap uses an ordered array, ArrayContainer, to store data. Space usage increases linearly with the number of entries N, with each entry stored as a 2-byte short, i.e., 2N bytes;
When entries exceed 4096, BitmapContainer is used to store data, which suits dense data. In this case the RoaringBitmap occupies a fixed 8KB of space.
In fact, RoaringBitmap also optimizes consecutive data internally. RunContainer uses Run-Length Encoding (RLE) to compress consecutive data, compressing consecutive number sequences into the form of "start value + length" to save space. For example: the sequence [11, 12, 13, 14, 15] will be compressed to (11, 4), representing 5 consecutive numbers starting from 11. And Pulsar's entry id is a scenario of consecutive integers. If every message produced in a topic is a delayed message, this optimization can be used.
In RoaringBitmap, container selection is automated. When inserting data, RoaringBitmap automatically selects the most suitable container type (ArrayContainer, BitmapContainer, or RunContainer) based on the distribution and density of the data. To simplify the analysis, we do not consider the optimization of RunContainer.
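The run-length idea mentioned above can be illustrated with a small standalone sketch. It follows the text's convention of storing a start value plus the number of additional consecutive values, so [11, 12, 13, 14, 15] becomes (11, 4). The class is hypothetical and is not RoaringBitmap's RunContainer implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative run-length encoder for strictly increasing integers. */
final class RunLengthSketch {
    /** Returns {start, extra} pairs, where a run covers start..start+extra inclusive. */
    static List<int[]> encode(int[] sorted) {
        List<int[]> runs = new ArrayList<>();
        int i = 0;
        while (i < sorted.length) {
            int j = i;
            // Extend the run while the next value is exactly one larger.
            while (j + 1 < sorted.length && sorted[j + 1] == sorted[j] + 1) {
                j++;
            }
            runs.add(new int[] {sorted[i], j - i});
            i = j + 1;
        }
        return runs;
    }

    public static void main(String[] args) {
        for (int[] run : encode(new int[] {11, 12, 13, 14, 15, 20, 21})) {
            System.out.println("(" + run[0] + ", " + run[1] + ")");
        }
    }
}
```

When every entry in a ledger is delayed, the entry IDs form one long run, so a run-based encoding needs only a single pair regardless of how many entries there are.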
L ledgers correspond to L RoaringBitmaps, with L = ⌈M/50000⌉, where M is the number of messages written in one bucket. Therefore, the total memory occupied, the Total Entry Size (TES), is:
When M < 4096, in most cases there is no need to switch ledgers, so L=1, and TES = 8 bytes + 8 bytes + 2M bytes ≈ 2M bytes.
When M > 4096, the RoaringBitmaps occupy 8*L KB, so TES = 8 bytes + 8L bytes + 8L KB ≈ 8L KB.
To simplify the analysis, the first two terms are ignored here.
Next, evaluate the Average Entry Size per message, abbreviated as AES, then AES=TES/M, discussed by cases as follows:
When M >> 50000, L = ⌈M/50000⌉ ≈ M/50000, then 8L KB ≈ M/50000*8*1024 bytes = 0.163M bytes, i.e., AES ≈ 0.163 bytes.
When 4096 < M < 50000, the bucket spans only one ledger, i.e., L=1, and one RoaringBitmap occupies 8KB, so AES = 1*8*1024/M bytes = 8192/M bytes. When M = 4096, AES = 8*1024/M bytes = 2 bytes; when M = 8192, AES = 1 byte.
When M<4096, AES = TES/M = 2byte.
Obviously, the larger M is, the better the space compression effect. Therefore, if we want to improve the data compression efficiency of the algorithm, the goal is to increase M. To increase M, we can increase message speed and increase the time length of buckets.
Moreover, the minimum value of AES is not 0.163byte. The previous discussion assumes managedLedgerMaxEntriesPerLedger=50000. If the number of entries contained in each ledger is further increased, AES can be further reduced, but there will be a theoretical lower limit. Because each delayed message requires at least one bit to record whether it needs delayed distribution, the lower limit is 1 bit, and the 0.163byte=1.304bit derived earlier is already approaching this theoretical lower limit.
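The case analysis above can be turned into a small calculator. This is a sketch of the simplified model only: it ignores the timestamp and ledger ID terms and the RunContainer optimization, as the text does, and the class and method names are hypothetical.

```java
/** Simplified space model: 2 bytes/entry up to 4096 entries, else 8 KB per ledger bitmap. */
final class EntrySizeModel {
    static final int ARRAY_CONTAINER_LIMIT = 4096;    // sparse/dense threshold
    static final int BITMAP_CONTAINER_BYTES = 8 * 1024;

    /** Approximate TES in bytes for one bucket holding m entries. */
    static double totalEntrySize(long m, long maxEntriesPerLedger) {
        if (m <= ARRAY_CONTAINER_LIMIT) {
            return 2.0 * m; // sorted array of 2-byte values, L assumed to be 1
        }
        long ledgers = (m + maxEntriesPerLedger - 1) / maxEntriesPerLedger; // ceil(M / max)
        return (double) ledgers * BITMAP_CONTAINER_BYTES;
    }

    /** AES in bytes for x messages/ms and y truncated bits, i.e. M = x * 2**y. */
    static double averageEntrySize(int x, int y, long maxEntriesPerLedger) {
        long m = (long) x << y;
        return totalEntrySize(m, maxEntriesPerLedger) / m;
    }

    public static void main(String[] args) {
        int[][] params = {{1, 10}, {4, 10}, {8, 10}, {8, 15}};
        for (int[] p : params) {
            System.out.printf("x=%d, y=%d -> AES = %.4f bytes%n",
                    p[0], p[1], averageEntrySize(p[0], p[1], 50_000));
        }
    }
}
```

Keeping the ceiling exact matters for moderate M: with x=8 and y=15 a bucket spans 6 ledgers rather than 262144/50000 ≈ 5.24, so this model gives about 0.19 bytes per message, slightly above the 0.163 byte asymptote.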
3.3 Experimental Verification
Below we use a series of parameters to conduct experiments to verify the previous analysis. The x and y used below correspond to messagePerMs and trimLowerBits in the code respectively.
x=1, y=10
These are the parameters of the first experiment. At this time M=x*2**y=1024, which has not reached 4096 entries, so AES=2, 10 million messages correspond to space 2*10000000/1024/1024MB = 19.1MB, while the experiment measured 25MB. The extra part is memory consumption generated by Java objects such as Long2ObjectSortedMap. When M is too small, the number of buckets will be too many, resulting in more objects. At this time there are 9767 buckets, which creates a deviation from the theoretical prediction.

When we directly measure the memory size of Roaring64Bitmap, the result is also 19MB as theoretically predicted.
x=4, y=10
At this time M=x*2**y=4096, AES=2, so 10 million messages still correspond to 19.1MB. Although the theoretical prediction is unchanged, the number of buckets has dropped sharply from 9767 to 2443, so the memory consumed by Java objects such as Long2ObjectSortedMap can almost be ignored. The actual memory usage is 20.48MB, almost the same as the theoretical prediction.

x=8, y=10
At this time M=x*2**y=8192, AES=8192/M=1, so 10 million messages correspond to 10000000*1/1024/1024 MB = 9.54MB. The actual memory usage is 11MB.

x=8, y=10 means this task produces 8000 messages per second and each bucket corresponds to 1.024s, which is also the most common scenario. At this time, memory usage is only 11/240 ≈ 4.6% of that before optimization.
When users have longer delay times, the tolerance for time precision can be increased to 1min, then the time length of each bucket can also be further increased. Since 2**15/1000s=32.7s, 2**16/1000s=65s, we can increase y to 15.
x=8, y=15
At this time M=x*2**y=262144 >> 50000, AES≈0.163 bytes, so 10 million messages correspond to 10000000*0.163/1024/1024 MB = 1.55MB. The actual memory usage is 2.25MB.

This is basically consistent with the prediction. At this point memory usage is only 2.25/240 ≈ 0.94% of that before optimization!
AES reaches 2.25*1024*1024/10000000 ≈ 0.236 bytes, already approaching the theoretical value of 0.163 bytes.
Readers can also try adjusting other parameter combinations to predict their memory usage. The corresponding broker adjustable parameters are delayedDeliveryTickTimeMillis and managedLedgerMaxEntriesPerLedger. Message speed is determined by user tasks, and the delay time precision users can accept is also determined by user tasks. However, currently when determining the time span of buckets, the global configuration delayedDeliveryTickTimeMillis is directly used. If readers want to do more fine-grained configuration, they can consider adding subscription-level configuration to allow different subscription tasks to use buckets of different time lengths, which can further reduce memory usage. As shown in the previous experiments, if the delay time reaches 1 day, then reducing delay precision to 1min may also be acceptable, and the data compression effect can be significantly improved.

