Skip to content

Commit

Permalink
[CELEBORN-1673] Support retry create client
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
As title

### Why are the changes needed?
Currently, only Flink retries establishing a client when a connection problem occurs. This would be beneficial for all other engines to implement as well.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes apache#2855 from RexXiong/CELEBORN-1673.

Lead-authored-by: Shuang <[email protected]>
Co-authored-by: lvshuang.xjs <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
  • Loading branch information
2 people authored and SteNicholas committed Oct 31, 2024
1 parent 12f25d3 commit 14baec8
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,57 +32,27 @@
import org.apache.celeborn.common.network.client.TransportClientBootstrap;
import org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.plugin.flink.utils.Utils;

public class FlinkTransportClientFactory extends TransportClientFactory {

public static final Logger logger = LoggerFactory.getLogger(FlinkTransportClientFactory.class);

private ConcurrentHashMap<Long, Supplier<ByteBuf>> bufferSuppliers;
private final int fetchMaxRetries;

public FlinkTransportClientFactory(
TransportContext context, int fetchMaxRetries, List<TransportClientBootstrap> bootstraps) {
TransportContext context, List<TransportClientBootstrap> bootstraps) {
super(context, bootstraps);
bufferSuppliers = JavaUtils.newConcurrentHashMap();
this.fetchMaxRetries = fetchMaxRetries;
this.pooledAllocator = new UnpooledByteBufAllocator(true);
}

public TransportClient createClientWithRetry(String remoteHost, int remotePort)
throws IOException, InterruptedException {
int retryCount = fetchMaxRetries;

while (retryCount > 0) {
try {
return createClient(remoteHost, remotePort);
} catch (Exception e) {
retryCount--;
logger.warn(
"Retrying ({}/{}) times create client to {}:{}",
retryCount,
fetchMaxRetries,
remoteHost,
remotePort,
e);
if (retryCount == 0) {
if (e instanceof InterruptedException || e instanceof IOException) {
throw e;
} else {
Utils.rethrowAsRuntimeException(e);
}
}
}
}

return null;
}

@Override
public TransportClient createClient(String remoteHost, int remotePort)
throws IOException, InterruptedException {
return createClient(
remoteHost, remotePort, -1, new TransportFrameDecoderWithBufferSupplier(bufferSuppliers));
return retryCreateClient(
remoteHost,
remotePort,
-1,
() -> new TransportFrameDecoderWithBufferSupplier(bufferSuppliers));
}

public void registerSupplier(long streamId, Supplier<ByteBuf> supplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,7 @@ public FlinkShuffleClientImpl(

private void initializeTransportClientFactory() {
if (null == flinkTransportClientFactory) {
flinkTransportClientFactory =
new FlinkTransportClientFactory(
context, conf.clientFetchMaxRetriesForEachReplica(), createBootstraps());
flinkTransportClientFactory = new FlinkTransportClientFactory(context, createBootstraps());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
Expand Down Expand Up @@ -92,6 +94,8 @@ private static class ClientPool {
private final Class<? extends Channel> socketChannelClass;
private EventLoopGroup workerGroup;
protected ByteBufAllocator pooledAllocator;
private final int maxClientConnectRetries;
private final int maxClientConnectRetryWaitTimeMs;

public TransportClientFactory(
TransportContext context, List<TransportClientBootstrap> clientBootstraps) {
Expand All @@ -114,6 +118,8 @@ public TransportClientFactory(
this.pooledAllocator =
NettyUtils.getPooledByteBufAllocator(
conf, context.getSource(), false, conf.clientThreads());
this.maxClientConnectRetries = conf.maxIORetries();
this.maxClientConnectRetryWaitTimeMs = conf.ioRetryWaitTimeMs();
}

/**
Expand All @@ -130,7 +136,40 @@ public TransportClientFactory(
*/
public TransportClient createClient(String remoteHost, int remotePort, int partitionId)
throws IOException, InterruptedException {
return createClient(remoteHost, remotePort, partitionId, new TransportFrameDecoder());
return retryCreateClient(remoteHost, remotePort, partitionId, TransportFrameDecoder::new);
}

public TransportClient retryCreateClient(
String remoteHost,
int remotePort,
int partitionId,
Supplier<ChannelInboundHandlerAdapter> supplier)
throws IOException, InterruptedException {
int numTries = 0;
while (numTries < maxClientConnectRetries) {
try {
return createClient(remoteHost, remotePort, partitionId, supplier.get());
} catch (Exception e) {
numTries++;
logger.warn(
"Retry create client, times {}/{} with error: {}",
numTries,
maxClientConnectRetries,
e.getMessage(),
e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (numTries == maxClientConnectRetries) {
throw e;
}

Uninterruptibles.sleepUninterruptibly(
maxClientConnectRetryWaitTimeMs, TimeUnit.MILLISECONDS);
}
}

return null;
}

public TransportClient createClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2038,6 +2038,10 @@ object CelebornConf extends Logging {
.doc(
"Max number of times we will try IO exceptions (such as connection timeouts) per request. " +
"If set to 0, we will not do any retries. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate client of worker replicating data to peer worker." +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for Flink shuffle client push data.")
.intConf
Expand All @@ -2050,6 +2054,8 @@ object CelebornConf extends Logging {
"Only relevant if maxIORetries > 0. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate client of worker replicating data to peer worker." +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for Flink shuffle client push data.")
.version("0.2.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,28 @@

import static org.apache.celeborn.common.util.JavaUtils.getLocalHost;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.network.client.TransportClient;
import org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.network.server.BaseMessageHandler;
import org.apache.celeborn.common.network.server.TransportServer;
import org.apache.celeborn.common.network.util.TransportConf;
import org.apache.celeborn.common.network.util.TransportFrameDecoder;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.ThreadUtils;

Expand Down Expand Up @@ -240,4 +247,17 @@ public void unlimitedConnectionAndCreationTimeouts() throws IOException, Interru
assertNotEquals(exception.getCause(), null);
}
}

@Test
public void testRetryCreateClient() throws IOException, InterruptedException {
TransportClientFactory factory = Mockito.spy(context.createClientFactory());
TransportClient client = mock(TransportClient.class);
Mockito.doThrow(new IOException("xx"))
.doReturn(client)
.when(factory)
.createClient(anyString(), anyInt(), anyInt(), any());
TransportClient transportClient =
factory.retryCreateClient("xxx", 10, 1, TransportFrameDecoder::new);
Assert.assertEquals(transportClient, client);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;

import com.google.common.base.Throwables;
import org.junit.Before;
import org.junit.Test;

import org.apache.celeborn.common.CelebornConf;
Expand All @@ -38,10 +39,17 @@
* black boxes.
*/
public class RegistrationSuiteJ extends SaslTestBase {
private CelebornConf celebornConf;

@Before
public void before() throws Exception {
celebornConf = new CelebornConf();
celebornConf.set("celeborn.shuffle.io.maxRetries", "1");
}

@Test
public void testRegistration() throws Throwable {
TransportConf conf = new TransportConf("shuffle", new CelebornConf());
TransportConf conf = new TransportConf("shuffle", celebornConf);
RegistrationServerBootstrap serverBootstrap =
new RegistrationServerBootstrap(conf, new TestSecretRegistry());
RegistrationClientBootstrap clientBootstrap =
Expand All @@ -52,7 +60,7 @@ public void testRegistration() throws Throwable {

@Test(expected = IOException.class)
public void testReRegisterationFails() throws Throwable {
TransportConf conf = new TransportConf("shuffle", new CelebornConf());
TransportConf conf = new TransportConf("shuffle", celebornConf);
// The SecretRegistryImpl already has the entry for TEST_USER so re-registering the app should
// fail.
RegistrationServerBootstrap serverBootstrap =
Expand All @@ -71,7 +79,7 @@ public void testReRegisterationFails() throws Throwable {

@Test(expected = IOException.class)
public void testConnectionAuthWithoutRegistrationShouldFail() throws Throwable {
TransportConf conf = new TransportConf("shuffle", new CelebornConf());
TransportConf conf = new TransportConf("shuffle", celebornConf);
RegistrationServerBootstrap serverBootstrap =
new RegistrationServerBootstrap(conf, new TestSecretRegistry());
SaslClientBootstrap clientBootstrap =
Expand Down
4 changes: 2 additions & 2 deletions docs/configuration/network.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ license: |
| celeborn.&lt;module&gt;.io.connectionTimeout | &lt;value of celeborn.network.timeout&gt; | false | Connection active timeout. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting <module> to `fetch`, it works for worker fetch server. | | |
| celeborn.&lt;module&gt;.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | | |
| celeborn.&lt;module&gt;.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting <module> to `fetch`, it works for worker fetch server. | | |
| celeborn.&lt;module&gt;.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting <module> to `push`, it works for Flink shuffle client push data. | | |
| celeborn.&lt;module&gt;.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker.If setting <module> to `push`, it works for Flink shuffle client push data. | | |
| celeborn.&lt;module&gt;.io.mode | NIO | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. | | |
| celeborn.&lt;module&gt;.io.numConnectionsPerPeer | 1 | false | Number of concurrent connections between two nodes. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. | | |
| celeborn.&lt;module&gt;.io.preferDirectBufs | true | false | If true, we will prefer allocating off-heap byte buffers within Netty. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting <module> to `fetch`, it works for worker fetch server. | | |
| celeborn.&lt;module&gt;.io.receiveBuffer | 0b | false | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting <module> to `fetch`, it works for worker fetch server. | 0.2.0 | |
| celeborn.&lt;module&gt;.io.retryWait | 5s | false | Time that we will wait in order to perform a retry after an IOException. Only relevant if maxIORetries > 0. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `push`, it works for Flink shuffle client push data. | 0.2.0 | |
| celeborn.&lt;module&gt;.io.retryWait | 5s | false | Time that we will wait in order to perform a retry after an IOException. Only relevant if maxIORetries > 0. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker.If setting <module> to `push`, it works for Flink shuffle client push data. | 0.2.0 | |
| celeborn.&lt;module&gt;.io.saslTimeout | 30s | false | Timeout for a single round trip of auth message exchange, in milliseconds. | 0.5.0 | |
| celeborn.&lt;module&gt;.io.sendBuffer | 0b | false | Send buffer size (SO_SNDBUF). If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting <module> to `fetch`, it works for worker fetch server. | 0.2.0 | |
| celeborn.&lt;module&gt;.io.serverThreads | 0 | false | Number of threads used in the server thread pool. Default to 0, which is 2x#cores. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server of worker replicating data to peer worker. If setting <module> to `fetch`, it works for worker fetch server. | | |
Expand Down

0 comments on commit 14baec8

Please sign in to comment.