3. UniformLoadShedder
As analyzed earlier, neither ThresholdShedder nor OverloadShedder can address the issue of resource wastage by brokers with low load. Therefore, UniformLoadShedder was introduced.
Note: Although ThresholdShedder later added the lowerBoundaryShedding feature to address the issue of low-load brokers, this was after the introduction of UniformLoadShedder.
3.1 Highest and Lowest Loaded Brokers
This section walks through the UniformLoadShedder algorithm in detail. It first calculates the maximum and minimum message rates, the maximum and minimum throughput, and the brokers they belong to.
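The first step can be pictured with a minimal sketch. The function name `find_extremes` and the dictionary layout are hypothetical and chosen only for illustration; they are not Pulsar's actual data structures.

```python
def find_extremes(loads):
    """Return the most and least loaded broker names for each metric.

    `loads` maps a broker name to {"msg_rate": ..., "throughput": ...}.
    This layout is an assumption of the sketch, not a Pulsar type.
    """
    def extreme(pick, metric):
        # pick is the builtin max or min; compare brokers on one metric
        return pick(loads, key=lambda name: loads[name][metric])

    return {
        "max_msg_rate": extreme(max, "msg_rate"),
        "min_msg_rate": extreme(min, "msg_rate"),
        "max_throughput": extreme(max, "throughput"),
        "min_throughput": extreme(min, "throughput"),
    }
```

Note that the broker with the highest message rate is not necessarily the one with the highest throughput, which is why the two metrics are tracked separately.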
Then, it calculates the differences between the maximum and minimum values and compares them against two thresholds, one for message rate and one for throughput.
Note: Both configuration values are of type double.
loadBalancerMsgRateDifferenceShedderThreshold: This is the percentage threshold for the difference in message rates between the most loaded and the least loaded brokers. The default value is 50.
With the default value, bundle unloading can be triggered once the maximum message rate exceeds 1.5 times the minimum message rate. For example, if the message rate of Broker 1 is 50K and the message rate of Broker 2 is 30K, the difference is (50 - 30) / 30 ≈ 66.7%, which exceeds 50%. In this case, the load balancer can unload bundles from Broker 1 to Broker 2.
Although the configuration description states that this is a percentage value, it can actually exceed 100, allowing for a larger difference between the maximum and minimum message rates. Let the maximum and minimum message rates be X and Y, respectively, and the threshold be P. The trigger condition is:
(X - Y) / Y × 100 > P
When P is 100, the maximum message rate can be up to twice the minimum (a difference of 100%) before unloading is triggered.
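The message-rate check can be written as a small sketch under the definitions above (X, Y, and P). The function name is hypothetical, and rejecting a non-positive minimum rate is an assumption of this sketch rather than documented broker behavior.

```python
def msg_rate_threshold_exceeded(max_rate, min_rate, threshold_pct=50.0):
    """Return True when the relative rate difference exceeds the threshold.

    threshold_pct plays the role of
    loadBalancerMsgRateDifferenceShedderThreshold (P in the text).
    """
    if threshold_pct <= 0:
        # A non-positive threshold disables message-rate shedding.
        return False
    if min_rate <= 0:
        # Assumption of this sketch: avoid dividing by zero.
        raise ValueError("min_rate must be positive")
    difference_pct = (max_rate - min_rate) * 100 / min_rate
    return difference_pct > threshold_pct
```

With the example from the text, `msg_rate_threshold_exceeded(50_000, 30_000)` is true because the difference is about 66.7%. With `threshold_pct=100`, a maximum of exactly twice the minimum does not yet trigger unloading.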
loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold: This is the multiplier threshold for the difference in throughput between the most loaded and the least loaded brokers. The default value is 4.
When the maximum throughput exceeds four times the minimum throughput, bundle unloading can be triggered. For example, if the throughput of Broker 1 is 450MB and the throughput of Broker 2 is 100MB, the ratio is 450 / 100 = 4.5 times. In this case, the load balancer can unload bundles from Broker 1 to Broker 2.
If either threshold is configured to a value not greater than 0, shedding based on the corresponding metric (message rate or throughput) is disabled. If both thresholds are exceeded, shedding is based on message rate first.
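Putting both thresholds together, the decision of which metric drives shedding might look like the sketch below. The function name and return values are hypothetical, and the inputs are assumed to be positive.

```python
def choose_shedding_metric(max_rate, min_rate, max_tp, min_tp,
                           rate_threshold_pct=50.0, tp_multiplier=4.0):
    """Return "msg_rate", "throughput", or None (no shedding).

    A threshold that is not greater than 0 disables its metric.
    Message rate takes priority when both thresholds are exceeded.
    """
    rate_hit = (rate_threshold_pct > 0
                and (max_rate - min_rate) * 100 / min_rate > rate_threshold_pct)
    tp_hit = tp_multiplier > 0 and max_tp / min_tp > tp_multiplier
    if rate_hit:
        return "msg_rate"
    if tp_hit:
        return "throughput"
    return None
```

For the 450MB versus 100MB example, the ratio of 4.5 exceeds the default multiplier of 4, so throughput-based shedding applies whenever the message-rate threshold is not exceeded or is disabled.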
3.2 Selecting Bundles
Next, the algorithm calculates how much message rate or throughput the unloaded bundles must account for: the difference between the maximum and minimum values multiplied by maxUnloadPercentage, which has a default value of 0.2.
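As a worked example of this calculation, the sketch below applies the formula to the message rates used earlier (50K and 30K). The function name is hypothetical.

```python
def required_unload_amount(max_value, min_value, max_unload_percentage=0.2):
    """Amount of message rate (or throughput) the unloaded bundles should cover."""
    return (max_value - min_value) * max_unload_percentage


# With the default of 0.2, a gap of 20,000 msg/s asks for
# roughly 4,000 msg/s worth of bundles to be unloaded.
amount = required_unload_amount(50_000, 30_000)
```

Only a fraction of the gap is moved per round, so the gap between the two brokers shrinks gradually over successive shedding rounds.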
3.2.1 Based on message rate
If shedding is based on message rate, some checks are performed first: the overloaded broker must host more than one bundle, and the total message rate to be unloaded must reach the threshold minUnloadMessage, which defaults to 1000.
Note: This configuration is not included in broker.conf, but does exist in org.apache.pulsar.broker.ServiceConfiguration.
After the checks pass, bundles are selected. As before, selection proceeds from largest to smallest, but with an additional constraint: the total number of bundles to be unloaded cannot exceed maxUnloadBundleNumPerShedding, which is unlimited by default.
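The checks and the selection loop can be sketched as follows. This is a simplified illustration, not the broker's implementation: the function name is hypothetical, `max_bundles=None` stands in for "unlimited", and skipping a bundle that is larger than the remaining budget is an assumption of this sketch.

```python
def select_bundles_by_msg_rate(bundle_rates, max_rate, min_rate,
                               max_unload_percentage=0.2,
                               min_unload_message=1000,
                               max_bundles=None):
    """Pick bundles to unload from the overloaded broker.

    bundle_rates maps bundle name -> message rate on that broker.
    """
    required = (max_rate - min_rate) * max_unload_percentage
    # Checks: more than one bundle, and enough rate to be worth unloading.
    if len(bundle_rates) <= 1 or required < min_unload_message:
        return []

    selected = []
    remaining = required
    # Walk bundles from the largest message rate to the smallest.
    for name, rate in sorted(bundle_rates.items(),
                             key=lambda item: item[1], reverse=True):
        if max_bundles is not None and len(selected) >= max_bundles:
            break
        if rate <= remaining:  # assumption: skip bundles over the budget
            selected.append(name)
            remaining -= rate
    return selected
```

For example, with bundle rates of 3000, 2500, 900, and 400 and a required amount of 4000, this sketch selects the 3000 and 900 bundles and leaves the rest in place.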
3.2.2 Based on throughput
If shedding is based on throughput, similar checks are performed, ensuring that the amount of traffic to be unloaded is at least minUnloadMessageThroughput, with a default value of 1MB.
The logic for selecting bundles is essentially the same, so it will not be repeated here.
3.3 Evaluation
Having walked through the UniformLoadShedder algorithm, we can draw the following conclusions:
UniformLoadShedder does not have the problem of resource wastage by low-load brokers. Taking low-load brokers into account was the motivation for its introduction.
UniformLoadShedder does not handle load fluctuations. For example, when traffic surges or drops sharply for a short period, it acts on these load data points and triggers bundle unloading. However, the traffic of the topic soon returns to normal, which may trigger bundle unloading again. Such back-and-forth unloading should be avoided. This scenario is quite common, as shown in the figure below:
In contrast, ThresholdShedder assigns a high weight to historical scores when scoring. A significant change in resource usage is not immediately reflected in the load score; the score needs multiple iterations to approach the actual resource usage. Therefore, it can handle scenarios with load fluctuations.
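The damping effect of a history-weighted score can be illustrated with a short sketch. The weight of 0.9 and the usage series are illustrative assumptions, and the function is a generic weighted average rather than ThresholdShedder's actual scoring code.

```python
def smoothed_scores(usages, history_weight=0.9):
    """Blend each new usage sample into the previous score.

    history_weight is the share given to the previous score
    (an illustrative value chosen for this sketch).
    """
    score = usages[0]
    scores = [score]
    for usage in usages[1:]:
        score = score * history_weight + usage * (1 - history_weight)
        scores.append(score)
    return scores


# A one-sample spike from 20% to 80% barely moves the smoothed score.
scores = smoothed_scores([20, 20, 80, 20, 20])
```

In this series the raw usage jumps to 80, while the smoothed score peaks at about 26 and then drifts back. A shedder that acts on the raw value reacts to the spike, whereas one that acts on the smoothed score most likely does not.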
UniformLoadShedder does not rely on indicators such as CPU usage or network card usage to determine high-load and low-load brokers. Instead, it judges based on message rate and throughput. In contrast, ThresholdShedder and OverloadShedder rely on machine resource indicators such as CPU usage.
If the machines in the cluster are heterogeneous, that is, if the hardware configurations of different machines differ, or if the broker's machine shares resources with other processes, UniformLoadShedder is highly likely to misjudge high-load and low-load brokers. It may then migrate load from a high-performance, low-load broker to a low-performance, high-load broker. Therefore, using UniformLoadShedder in a heterogeneous environment is not recommended.
To illustrate with a simple example: if Broker 1 has twice the performance of Broker 2, and Broker 1 is handling a load of 150MB/s with a CPU usage of 40%, while Broker 2 is handling a load of 100MB/s with a CPU usage of 60%, UniformLoadShedder will still judge Broker 1 as the high-load broker and Broker 2 as the low-load broker, and migrate load from Broker 1 to Broker 2. This is clearly the wrong decision.
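The example above can be restated as a tiny sketch that ranks the same two brokers by each kind of indicator. The dictionary layout and the helper name `busiest` are hypothetical.

```python
# Figures taken from the example in the text.
brokers = {
    "broker-1": {"throughput_mb_s": 150, "cpu_pct": 40},
    "broker-2": {"throughput_mb_s": 100, "cpu_pct": 60},
}


def busiest(brokers, metric):
    """Name of the broker with the highest value for the given metric."""
    return max(brokers, key=lambda name: brokers[name][metric])


by_throughput = busiest(brokers, "throughput_mb_s")  # "broker-1"
by_cpu = busiest(brokers, "cpu_pct")                 # "broker-2"
```

The two indicators disagree about which broker is overloaded. A traffic-based shedder follows the first ranking and moves load toward the machine that is actually closer to saturation.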
UniformLoadShedder only unloads bundles from the single highest-load broker per shedding operation. For large clusters, this may cause the cluster to take a considerable amount of time to complete all load balancing tasks.
For example, if there are currently 100 high-load brokers in the cluster and 100 new machines are added, roughly 100 shedding operations are needed to complete the balancing task. Since the execution interval of the LoadSheddingStrategy is determined by the configuration loadBalancerSheddingIntervalMinutes, which defaults to once per minute, it will take about 100 minutes to complete all tasks. For users whose topics have many partitions, their workloads are highly likely to experience multiple connection interruptions during these 100 minutes, which has a very negative impact on user experience.
From a user experience perspective, when occasional events such as broker scaling or user onboarding/offboarding cause load to increase or decrease, the ideal outcome is to complete one load balancing operation quickly and then rarely need to balance again, rather than executing load balancing continuously for a long time.