diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java index eae4aea0af347..d12f21c718cc5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java @@ -207,12 +207,14 @@ void shouldTolerateInstableClockSourceWhenUpdatingTokens() { DefaultMonotonicSnapshotClock monotonicSnapshotClock = new DefaultMonotonicSnapshotClock(resolutionNanos, () -> offset.get() + manualClockSource.get()); + long initialTokens = 500L; asyncTokenBucket = AsyncTokenBucket.builder().resolutionNanos(resolutionNanos) - .capacity(100000).rate(1000).initialTokens(500).clock(monotonicSnapshotClock).build(); + .capacity(100000).rate(1000).initialTokens(initialTokens).clock(monotonicSnapshotClock).build(); Random random = new Random(0); int randomOffsetCount = 0; for (int i = 0; i < 100000; i++) { + // increment the clock by 1ms, since rate is 1000 tokens/s, this should make 1 token available incrementMillis(1); if (i % 39 == 0) { // randomly offset the clock source @@ -222,9 +224,15 @@ void shouldTolerateInstableClockSourceWhenUpdatingTokens() { asyncTokenBucket.tokens(true); randomOffsetCount++; } + // consume 1 token asyncTokenBucket.consumeTokens(1); } assertThat(asyncTokenBucket.tokens(true)) - .isGreaterThan(500L).isCloseTo(500L, Offset.offset(3L * randomOffsetCount)); + // since the rate is 1/ms and the test increments the clock by 1ms and consumes 1 token in each + // iteration, the tokens should be greater than or equal to the initial tokens + .isGreaterThanOrEqualTo(initialTokens) + // tolerate difference in added tokens since when clock leaps forward or backwards, the clock + // is assumed to have moved forward by the resolutionNanos + .isCloseTo(initialTokens, Offset.offset(3L * randomOffsetCount)); } } \ No newline at end of file