Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fully encapsulating rejected execution handling #6867

Merged
merged 1 commit into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#### Improvements

* Fix #6863: ensuring SerialExecutor does not throw RejectedExecutionException to prevent unnecessary error logs

#### Dependency Upgrade

#### New Features
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -371,23 +370,19 @@ public void onClose(WebSocket webSocket, int code, String reason) {
}
closeWebSocketOnce(code, reason);
LOGGER.debug("Exec Web Socket: On Close with code:[{}], due to: [{}]", code, reason);
try {
serialExecutor.execute(() -> {
try {
if (exitCode.complete(null)) {
// this is expected for processes that don't terminate - uploads for example
LOGGER.debug("Exec Web Socket: completed with a null exit code - no status was received prior to onClose");
}
cleanUpOnce();
} finally {
if (listener != null) {
listener.onClose(code, reason);
}
serialExecutor.execute(() -> {
try {
if (exitCode.complete(null)) {
// this is expected for processes that don't terminate - uploads for example
LOGGER.debug("Exec Web Socket: completed with a null exit code - no status was received prior to onClose");
}
});
} catch (RejectedExecutionException e) {
LOGGER.debug("Client already shutdown, aborting normal closure", e);
}
cleanUpOnce();
} finally {
if (listener != null) {
listener.onClose(code, reason);
}
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

Expand Down Expand Up @@ -130,27 +129,23 @@ public void onMessage(WebSocket webSocket, ByteBuffer buffer) {
} else {
// Data
if (out != null) {
try {
serialExecutor.execute(() -> {
try {
while (buffer.hasRemaining()) {
int written = out.write(buffer); // channel byte already skipped
if (written == 0) {
// out is non-blocking, prevent a busy loop
Thread.sleep(50);
}
}
webSocket.request();
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
serialExecutor.execute(() -> {
try {
while (buffer.hasRemaining()) {
int written = out.write(buffer); // channel byte already skipped
if (written == 0) {
// out is non-blocking, prevent a busy loop
Thread.sleep(50);
}
clientError(webSocket, "forwarding data to the client", e);
}
});
} catch (RejectedExecutionException e) {
// just ignore
}
webSocket.request();
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
clientError(webSocket, "forwarding data to the client", e);
}
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void retainAll(Set<String> nextKeys, Consumer<Executor> cacheStateComplet
}
});
if (cacheStateComplete != null) {
cacheStateComplete.accept(this.processor::executeIfPossible);
cacheStateComplete.accept(this.processor.getSerialExecutor()::execute);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
Expand Down Expand Up @@ -108,20 +107,16 @@ public void distribute(Consumer<ProcessorListener<T>> operation, boolean isSync)
} finally {
lock.readLock().unlock();
}
try {
executor.execute(() -> {
for (ProcessorListener<T> listener : toCall) {
try {
operation.accept(listener);
} catch (Exception ex) {
log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(),
ex);
}
executor.execute(() -> {
for (ProcessorListener<T> listener : toCall) {
try {
operation.accept(listener);
} catch (Exception ex) {
log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(),
ex);
}
});
} catch (RejectedExecutionException e) {
// do nothing
}
}
});
}

public boolean shouldResync() {
Expand Down Expand Up @@ -202,11 +197,8 @@ public Optional<Long> getMinimalNonZeroResyncPeriod() {
}
}

public void executeIfPossible(Runnable runnable) {
try {
this.executor.execute(runnable);
} catch (RejectedExecutionException e) {
// already shutdown
}
public SerialExecutor getSerialExecutor() {
return executor;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package io.fabric8.kubernetes.client.utils.internal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
Expand All @@ -30,6 +33,9 @@
* Added shutdown support
*/
public class SerialExecutor implements Executor {

private static final Logger log = LoggerFactory.getLogger(SerialExecutor.class);

final Queue<Runnable> tasks = new LinkedBlockingDeque<>();
final Executor executor;
Runnable active;
Expand All @@ -41,10 +47,14 @@ public SerialExecutor(Executor executor) {
this.executor = executor;
}

/**
* Executes the given command at some time in the future. Unlike a normal {@link Executor}, it will
* not throw a {@link RejectedExecutionException}
*/
@Override
public synchronized void execute(final Runnable r) {
if (shutdown) {
throw new RejectedExecutionException();
log.debug("Task submitted after the executor was shutdown");
}
tasks.offer(() -> {
try {
Expand Down Expand Up @@ -72,7 +82,11 @@ public synchronized void execute(final Runnable r) {

protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
try {
executor.execute(active);
} catch (RejectedExecutionException e) {
log.debug("Underlying executor rejected execution", e);
}
}
}

Expand All @@ -94,4 +108,5 @@ public void shutdownNow() {
public boolean isShutdown() {
return shutdown;
}

}
Loading