Skip to content

Commit

Permalink
Support a single permit that exceeds maxReadsInFlightSize
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Jan 29, 2025
1 parent a691409 commit 0ef4052
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@ private synchronized Optional<Handle> internalAcquire(long permits, Consumer<Han
}
updateMetrics();
return Optional.of(handle);
} else if (permits > maxReadsInFlightSize && remainingBytes == maxReadsInFlightSize) {
remainingBytes = 0;
if (log.isInfoEnabled()) {
log.info("Requested permits {} exceeded maxReadsInFlightSize {}, creationTime: {}, remainingBytes:{}. "
+ "Allowing request with permits set to maxReadsInFlightSize.",
permits, maxReadsInFlightSize, handle.creationTime, remainingBytes);
}
updateMetrics();
return Optional.of(new Handle(maxReadsInFlightSize, handle.creationTime, true));
} else {
if (queuedHandles.offer(new QueuedHandle(handle, callback))) {
scheduleTimeOutCheck(acquireTimeoutMillis);
Expand Down Expand Up @@ -250,7 +259,9 @@ private synchronized void internalRelease(Handle handle) {
// remove the peeked handle from the queue
queuedHandles.poll();
handleTimeout(queuedHandle);
} else if (remainingBytes >= queuedHandle.handle.permits) {
} else if (remainingBytes >= queuedHandle.handle.permits
|| queuedHandle.handle.permits > maxReadsInFlightSize
&& remainingBytes == maxReadsInFlightSize) {
// remove the peeked handle from the queue
queuedHandles.poll();
handleQueuedHandle(queuedHandle);
Expand All @@ -265,16 +276,28 @@ private synchronized void internalRelease(Handle handle) {
}

private void handleQueuedHandle(QueuedHandle queuedHandle) {
remainingBytes -= queuedHandle.handle.permits;
if (log.isDebugEnabled()) {
log.debug("acquired queued permits: {}, creationTime: {}, remainingBytes:{}",
queuedHandle.handle.permits, queuedHandle.handle.creationTime, remainingBytes);
long permits = queuedHandle.handle.permits;
Handle handleForCallback = queuedHandle.handle;
if (permits > maxReadsInFlightSize && remainingBytes == maxReadsInFlightSize) {
remainingBytes = 0;
if (log.isInfoEnabled()) {
log.info("Requested permits {} exceeded maxReadsInFlightSize {}, creationTime: {}, remainingBytes:{}. "
+ "Allowing request with permits set to maxReadsInFlightSize.",
permits, maxReadsInFlightSize, queuedHandle.handle.creationTime, remainingBytes);
}
handleForCallback = new Handle(maxReadsInFlightSize, queuedHandle.handle.creationTime, true);
} else {
remainingBytes -= permits;
if (log.isDebugEnabled()) {
log.debug("acquired queued permits: {}, creationTime: {}, remainingBytes:{}",
permits, queuedHandle.handle.creationTime, remainingBytes);
}
}
try {
queuedHandle.callback.accept(queuedHandle.handle);
queuedHandle.callback.accept(handleForCallback);
} catch (Exception e) {
log.error("Error in callback of acquired queued permits: {}, creationTime: {}, remainingBytes:{}",
queuedHandle.handle.permits, queuedHandle.handle.creationTime, remainingBytes, e);
handleForCallback.permits, handleForCallback.creationTime, remainingBytes, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ private static Object[][] isDisabled() {
};
}

@DataProvider
private static Object[] booleanValues() {
return new Object[]{ true, false };
}

@Test(dataProvider = "isDisabled")
public void testDisabled(long maxReadsInFlightSize, boolean shouldBeDisabled) throws Exception {
var otel = buildOpenTelemetryAndReader();
Expand Down Expand Up @@ -428,6 +433,111 @@ public void testQueueSizeLimitReached() throws Exception {
.hasValueSatisfying(handle -> assertThat(handle.success()).isFalse());
}

@Test(dataProvider = "booleanValues")
public void testAcquireExceedingMaxReadsInFlightSize(boolean firstInQueue) throws Exception {
@Cleanup("shutdownNow")
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

long maxReadsInFlightSize = 100;
InflightReadsLimiter limiter =
new InflightReadsLimiter(maxReadsInFlightSize, ACQUIRE_QUEUE_SIZE, ACQUIRE_TIMEOUT_MILLIS, executor,
OpenTelemetry.noop());

// Initial state
assertThat(limiter.getRemainingBytes())
.as("Initial remaining bytes should match maxReadsInFlightSize")
.isEqualTo(maxReadsInFlightSize);

// Acquire all permits (consume 100 bytes)
Optional<InflightReadsLimiter.Handle> handle1 = limiter.acquire(100, null);
assertThat(handle1)
.as("The first handle should be present")
.isPresent();
assertThat(limiter.getRemainingBytes())
.as("Remaining bytes should be zero after acquiring all permits")
.isEqualTo(0);


AtomicReference<InflightReadsLimiter.Handle> handle2Reference = new AtomicReference<>();

if (!firstInQueue) {
Optional<InflightReadsLimiter.Handle> handle2 = limiter.acquire(50, handle2Reference::set);
assertThat(handle2)
.as("The second handle should not be present as remaining permits are zero")
.isNotPresent();
}

// Attempt to acquire more than maxReadsInFlightSize while all permits are in use
AtomicReference<InflightReadsLimiter.Handle> handleExceedingMaxReference = new AtomicReference<>();
Optional<InflightReadsLimiter.Handle> handleExceedingMaxOptional =
limiter.acquire(200, handleExceedingMaxReference::set);
assertThat(handleExceedingMaxOptional)
.as("The second handle should not be present as remaining permits are zero")
.isNotPresent();

// Release handle1 permits
limiter.release(handle1.get());

if (!firstInQueue) {
assertThat(handle2Reference)
.as("Handle2 should have been set in the callback and marked successful")
.hasValueSatisfying(handle -> {
assertThat(handle.success()).isTrue();
assertThat(handle.permits()).isEqualTo(50);
});
limiter.release(handle2Reference.get());
}

assertThat(handleExceedingMaxReference)
.as("Handle2 should have been set in the callback and marked successful")
.hasValueSatisfying(handle -> {
assertThat(handle.success()).isTrue();
assertThat(handle.permits()).isEqualTo(maxReadsInFlightSize);
});

limiter.release(handleExceedingMaxReference.get());

assertThat(limiter.getRemainingBytes())
.as("Remaining bytes should be fully replenished after releasing all permits")
.isEqualTo(maxReadsInFlightSize);
}

@Test
public void testAcquireExceedingMaxReadsWhenAllPermitsAvailable() throws Exception {
@Cleanup("shutdownNow")
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

long maxReadsInFlightSize = 100;
InflightReadsLimiter limiter =
new InflightReadsLimiter(maxReadsInFlightSize, ACQUIRE_QUEUE_SIZE, ACQUIRE_TIMEOUT_MILLIS, executor,
OpenTelemetry.noop());

// Initial state
assertThat(limiter.getRemainingBytes())
.as("Initial remaining bytes should match maxReadsInFlightSize")
.isEqualTo(maxReadsInFlightSize);

// Acquire permits > maxReadsInFlightSize
Optional<InflightReadsLimiter.Handle> handleExceedingMaxOptional =
limiter.acquire(2 * maxReadsInFlightSize, null);
assertThat(handleExceedingMaxOptional)
.as("The handle for exceeding max permits should be present")
.hasValueSatisfying(handle -> {
assertThat(handle.success()).isTrue();
assertThat(handle.permits()).isEqualTo(maxReadsInFlightSize);
});
assertThat(limiter.getRemainingBytes())
.as("Remaining bytes should be zero after acquiring all permits")
.isEqualTo(0);

// Release permits
limiter.release(handleExceedingMaxOptional.get());

assertThat(limiter.getRemainingBytes())
.as("Remaining bytes should be fully replenished after releasing all permits")
.isEqualTo(maxReadsInFlightSize);
}

private Pair<OpenTelemetrySdk, InMemoryMetricReader> buildOpenTelemetryAndReader() {
var metricReader = InMemoryMetricReader.create();
var openTelemetry = AutoConfiguredOpenTelemetrySdk.builder()
Expand Down

0 comments on commit 0ef4052

Please sign in to comment.