Skip to content

Commit

Permalink
Merge pull request atomashpolskiy#207 from NyannCat/issue-shared-exec…
Browse files Browse the repository at this point in the history
…utor

Shared executor issue
  • Loading branch information
atomashpolskiy authored Jun 8, 2022
2 parents dcd0f41 + 485d3f0 commit c7e74be
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 19 deletions.
20 changes: 13 additions & 7 deletions bt-core/src/main/java/bt/net/ConnectionSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class ConnectionSource implements IConnectionSource {

private final IPeerConnectionFactory connectionFactory;
private final IPeerConnectionPool connectionPool;
private final ExecutorService connectionExecutor;
private final ExecutorService outgoingConnectionExecutor;
private final Config config;
private final Object lock = new Object();

Expand All @@ -56,17 +56,23 @@ public ConnectionSource(Set<PeerConnectionAcceptor> connectionAcceptors,
this.connectionPool = connectionPool;
this.config = config;

String threadName = String.format("%d.bt.net.pool.connection-worker", config.getAcceptorPort());
this.connectionExecutor = Executors.newFixedThreadPool(
String outgoingThreadName = String.format("%d.bt.net.pool.outgoing-connection-worker", config.getAcceptorPort());
this.outgoingConnectionExecutor = Executors.newFixedThreadPool(
config.getMaxPendingConnectionRequests(),
CountingThreadFactory.daemonFactory(threadName));
lifecycleBinder.onShutdown("Shutdown connection workers", connectionExecutor::shutdownNow);
CountingThreadFactory.daemonFactory(outgoingThreadName));
lifecycleBinder.onShutdown("Shutdown outgoing connection workers", outgoingConnectionExecutor::shutdownNow);

this.pendingConnections = new ConcurrentHashMap<>();
this.unreachablePeers = new ConcurrentHashMap<>();

String incomingThreadName = String.format("%d.bt.net.pool.incoming-connection-worker", config.getAcceptorPort());
ExecutorService incomingConnectionExecutor = Executors.newFixedThreadPool(
config.getMaxPendingConnectionRequests(),
CountingThreadFactory.daemonFactory(incomingThreadName));
lifecycleBinder.onShutdown("Shutdown incoming connection workers", incomingConnectionExecutor::shutdownNow);

IncomingConnectionListener incomingListener =
new IncomingConnectionListener(connectionAcceptors, connectionExecutor, connectionPool, config);
new IncomingConnectionListener(connectionAcceptors, incomingConnectionExecutor, connectionPool, config);
lifecycleBinder.onStartup("Initialize incoming connection acceptors", incomingListener::startup);
lifecycleBinder.onShutdown("Shutdown incoming connection acceptors", incomingListener::shutdown);
}
Expand Down Expand Up @@ -156,7 +162,7 @@ private CompletableFuture<ConnectionResult> createPendingConnFuture(Peer peer, T
pendingConnections.remove(key);
}
}
}, connectionExecutor).whenComplete((acquiredConnection, throwable) -> {
}, outgoingConnectionExecutor).whenComplete((acquiredConnection, throwable) -> {
if (acquiredConnection == null || throwable != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Peer is unreachable: {}. Will prevent further attempts to establish connection.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ private void fireDataReceived() {
public boolean encode(Message message) {
checkHandlerIsBound();

ByteBuffer buffer = outboundBuffer.lockAndGet();
if (buffer == null) {
// buffer has been released
// TODO: So what? Maybe throw an exception then?
return false;
}

try {
ByteBuffer buffer = outboundBuffer.lockAndGet();
if (buffer == null) {
// buffer has been released
// this may happen in handshake stage when client send EOF, buffer has been released by SocketChannelHandler
// TODO: So what? Maybe throw an exception then?
return false;
}
return writeMessageToBuffer(message, buffer);
} finally {
outboundBuffer.unlock();
Expand Down
11 changes: 6 additions & 5 deletions bt-core/src/main/java/bt/net/pipeline/SocketChannelHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,13 @@ private boolean processInboundData() throws IOException {
public void flush() {
synchronized (outboundBufferLock) {
ByteBuffer buffer = outboundBuffer.lockAndGet();
if (buffer == null) {
// buffer has been released
return;
}
buffer.flip();
try {
if (buffer == null) {
// buffer has been released
outboundBuffer.unlock();
return;
}
buffer.flip();
while (buffer.hasRemaining() && !closing) {
channel.write(buffer);
}
Expand Down

0 comments on commit c7e74be

Please sign in to comment.