Skip to content

Commit

Permalink
fix: fully encapsulating rejected execution handling
Browse files Browse the repository at this point in the history
closes: #6863

Signed-off-by: Steve Hawkins <[email protected]>
  • Loading branch information
shawkins authored Feb 7, 2025
1 parent 108f35d commit b6b66a1
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 60 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#### Improvements

* Fix #6863: ensuring SerialExecutor does not throw RejectedExecutionException to prevent unnecessary error logs

#### Dependency Upgrade

#### New Features
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -371,23 +370,19 @@ public void onClose(WebSocket webSocket, int code, String reason) {
}
closeWebSocketOnce(code, reason);
LOGGER.debug("Exec Web Socket: On Close with code:[{}], due to: [{}]", code, reason);
try {
serialExecutor.execute(() -> {
try {
if (exitCode.complete(null)) {
// this is expected for processes that don't terminate - uploads for example
LOGGER.debug("Exec Web Socket: completed with a null exit code - no status was received prior to onClose");
}
cleanUpOnce();
} finally {
if (listener != null) {
listener.onClose(code, reason);
}
serialExecutor.execute(() -> {
try {
if (exitCode.complete(null)) {
// this is expected for processes that don't terminate - uploads for example
LOGGER.debug("Exec Web Socket: completed with a null exit code - no status was received prior to onClose");
}
});
} catch (RejectedExecutionException e) {
LOGGER.debug("Client already shutdown, aborting normal closure", e);
}
cleanUpOnce();
} finally {
if (listener != null) {
listener.onClose(code, reason);
}
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

Expand Down Expand Up @@ -130,27 +129,23 @@ public void onMessage(WebSocket webSocket, ByteBuffer buffer) {
} else {
// Data
if (out != null) {
try {
serialExecutor.execute(() -> {
try {
while (buffer.hasRemaining()) {
int written = out.write(buffer); // channel byte already skipped
if (written == 0) {
// out is non-blocking, prevent a busy loop
Thread.sleep(50);
}
}
webSocket.request();
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
serialExecutor.execute(() -> {
try {
while (buffer.hasRemaining()) {
int written = out.write(buffer); // channel byte already skipped
if (written == 0) {
// out is non-blocking, prevent a busy loop
Thread.sleep(50);
}
clientError(webSocket, "forwarding data to the client", e);
}
});
} catch (RejectedExecutionException e) {
// just ignore
}
webSocket.request();
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
clientError(webSocket, "forwarding data to the client", e);
}
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void retainAll(Set<String> nextKeys, Consumer<Executor> cacheStateComplet
}
});
if (cacheStateComplete != null) {
cacheStateComplete.accept(this.processor::executeIfPossible);
cacheStateComplete.accept(this.processor.getSerialExecutor()::execute);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
Expand Down Expand Up @@ -108,20 +107,16 @@ public void distribute(Consumer<ProcessorListener<T>> operation, boolean isSync)
} finally {
lock.readLock().unlock();
}
try {
executor.execute(() -> {
for (ProcessorListener<T> listener : toCall) {
try {
operation.accept(listener);
} catch (Exception ex) {
log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(),
ex);
}
executor.execute(() -> {
for (ProcessorListener<T> listener : toCall) {
try {
operation.accept(listener);
} catch (Exception ex) {
log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(),
ex);
}
});
} catch (RejectedExecutionException e) {
// do nothing
}
}
});
}

public boolean shouldResync() {
Expand Down Expand Up @@ -202,11 +197,8 @@ public Optional<Long> getMinimalNonZeroResyncPeriod() {
}
}

public void executeIfPossible(Runnable runnable) {
try {
this.executor.execute(runnable);
} catch (RejectedExecutionException e) {
// already shutdown
}
public SerialExecutor getSerialExecutor() {
return executor;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package io.fabric8.kubernetes.client.utils.internal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
Expand All @@ -30,6 +33,9 @@
* Added shutdown support
*/
public class SerialExecutor implements Executor {

private static final Logger log = LoggerFactory.getLogger(SerialExecutor.class);

final Queue<Runnable> tasks = new LinkedBlockingDeque<>();
final Executor executor;
Runnable active;
Expand All @@ -41,10 +47,14 @@ public SerialExecutor(Executor executor) {
this.executor = executor;
}

/**
* Executes the given command at some time in the future. Unlike a normal {@link Executor}, it will
* not throw a {@link RejectedExecutionException}
*/
@Override
public synchronized void execute(final Runnable r) {
if (shutdown) {
throw new RejectedExecutionException();
log.debug("Task submitted after the executor was shutdown");
}
tasks.offer(() -> {
try {
Expand Down Expand Up @@ -72,7 +82,11 @@ public synchronized void execute(final Runnable r) {

protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
try {
executor.execute(active);
} catch (RejectedExecutionException e) {
log.debug("Underlying executor rejected execution", e);
}
}
}

Expand All @@ -94,4 +108,5 @@ public void shutdownNow() {
public boolean isShutdown() {
return shutdown;
}

}

0 comments on commit b6b66a1

Please sign in to comment.