From 5095f497775528e2b275645595aa9dba58e8284f Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Thu, 6 Feb 2025 10:02:09 -0500 Subject: [PATCH] fix: fully encapsulating rejected execution handling closes: #6863 Signed-off-by: Steve Hawkins --- CHANGELOG.md | 2 ++ .../dsl/internal/ExecWebSocketListener.java | 29 +++++++-------- .../PortForwarderWebsocketListener.java | 35 ++++++++----------- .../informers/impl/cache/ProcessorStore.java | 2 +- .../informers/impl/cache/SharedProcessor.java | 32 +++++++---------- .../client/utils/internal/SerialExecutor.java | 19 ++++++++-- 6 files changed, 59 insertions(+), 60 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4611fd5db5b..16138a8e3d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ #### Improvements +* Fix #6863: ensuring SerialExecutor does not throw RejectedExecutionException to prevent unnecessary error logs + #### Dependency Upgrade #### New Features diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java index 0eb8337996c..f10884c18ce 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java @@ -47,7 +47,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -371,23 +370,19 @@ public void onClose(WebSocket webSocket, int code, String reason) { } closeWebSocketOnce(code, reason); LOGGER.debug("Exec Web Socket: On Close with code:[{}], due to: [{}]", code, reason); - try { - serialExecutor.execute(() -> { - try { - if (exitCode.complete(null)) { - // this is expected for processes that don't terminate - uploads for example - LOGGER.debug("Exec Web Socket: completed with a null exit code - no status was received prior to onClose"); - } - cleanUpOnce(); - } finally { - if (listener != null) { - listener.onClose(code, reason); - } + serialExecutor.execute(() -> { + try { + if (exitCode.complete(null)) { + // this is expected for processes that don't terminate - uploads for example + LOGGER.debug("Exec Web Socket: completed with a null exit code - no status was received prior to onClose"); } - }); - } catch (RejectedExecutionException e) { - LOGGER.debug("Client already shutdown, aborting normal closure", e); - } + cleanUpOnce(); + } finally { + if (listener != null) { + listener.onClose(code, reason); + } + } + }); } @Override diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java index 3a4a5e347bc..9c481be13d4 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java @@ -32,7 +32,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; @@ -130,27 +129,23 @@ public void onMessage(WebSocket webSocket, ByteBuffer buffer) { } else { // Data if (out != null) { - try { - serialExecutor.execute(() -> { - try { - while (buffer.hasRemaining()) { - int written = out.write(buffer); // channel byte already skipped - if (written == 0) { - // out is non-blocking, prevent a busy loop - Thread.sleep(50); - } - } - webSocket.request(); - } catch (IOException | InterruptedException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); + serialExecutor.execute(() -> { + try { + while (buffer.hasRemaining()) { + int written = out.write(buffer); // channel byte already skipped + if (written == 0) { + // out is non-blocking, prevent a busy loop + Thread.sleep(50); } - clientError(webSocket, "forwarding data to the client", e); } - }); - } catch (RejectedExecutionException e) { - // just ignore - } + webSocket.request(); + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + clientError(webSocket, "forwarding data to the client", e); + } + }); } } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStore.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStore.java index d21fed01e2a..15884622bf9 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStore.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStore.java @@ -113,7 +113,7 @@ public void retainAll(Set nextKeys, Consumer cacheStateComplet } }); if (cacheStateComplete != null) { - cacheStateComplete.accept(this.processor::executeIfPossible); + cacheStateComplete.accept(this.processor.getSerialExecutor()::execute); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java index 1750b9e849f..730eda09d7e 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; @@ -108,20 +107,16 @@ public void distribute(Consumer> operation, boolean isSync) } finally { lock.readLock().unlock(); } - try { - executor.execute(() -> { - for (ProcessorListener listener : toCall) { - try { - operation.accept(listener); - } catch (Exception ex) { - log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(), - ex); - } + executor.execute(() -> { + for (ProcessorListener listener : toCall) { + try { + operation.accept(listener); + } catch (Exception ex) { + log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(), + ex); } - }); - } catch (RejectedExecutionException e) { - // do nothing - } + } + }); } public boolean shouldResync() { @@ -202,11 +197,8 @@ public Optional getMinimalNonZeroResyncPeriod() { } } - public void executeIfPossible(Runnable runnable) { - try { - this.executor.execute(runnable); - } catch (RejectedExecutionException e) { - // already shutdown - } + public SerialExecutor getSerialExecutor() { + return executor; } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/SerialExecutor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/SerialExecutor.java index 3ade70d47ea..eb221c9c62d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/SerialExecutor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/SerialExecutor.java @@ -15,6 +15,9 @@ */ package io.fabric8.kubernetes.client.utils.internal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingDeque; @@ -30,6 +33,9 @@ * Added shutdown support */ public class SerialExecutor implements Executor { + + private static final Logger log = LoggerFactory.getLogger(SerialExecutor.class); + final Queue tasks = new LinkedBlockingDeque<>(); final Executor executor; Runnable active; @@ -41,10 +47,14 @@ public SerialExecutor(Executor executor) { this.executor = executor; } + /** + * Executes the given command at some time in the future. Unlike a normal {@link Executor}, it will + * not throw a {@link RejectedExecutionException} + */ @Override public synchronized void execute(final Runnable r) { if (shutdown) { - throw new RejectedExecutionException(); + log.debug("Task submitted after the executor was shutdown"); } tasks.offer(() -> { try { @@ -72,7 +82,11 @@ public synchronized void execute(final Runnable r) { protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { - executor.execute(active); + try { + executor.execute(active); + } catch (RejectedExecutionException e) { + log.debug("Underlying executor rejected execution", e); + } } } @@ -94,4 +108,5 @@ public void shutdownNow() { public boolean isShutdown() { return shutdown; } + }