Skip to content

Commit

Permalink
router async rpc client & ThreadLocalContext
Browse files Browse the repository at this point in the history
  • Loading branch information
KeeProMise committed Jun 6, 2024
1 parent b121dea commit 5844e5f
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
Expand All @@ -51,6 +52,7 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

Expand All @@ -65,9 +67,9 @@
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCurrent;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncFinally;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncThrowException;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;

public class RouterAsyncRpcClient extends RouterRpcClient{
private static final Logger LOG =
Expand Down Expand Up @@ -106,7 +108,7 @@ public <T extends RemoteLocationContext> boolean invokeAll(
Boolean.class);
asyncApply((ApplyFunction<Map<T, Boolean>, Object>)
results -> results.containsValue(true));
return asyncReturn(Boolean.class);
return (boolean) getFutureResult();
}

@Override
Expand All @@ -128,7 +130,7 @@ public Object invokeMethod(
invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
useObserver, protocol, method, params);
}, RouterRpcServer.getAsyncRouterHandler());
return asyncReturn(Object.class);
return getFutureResult();
}

@SuppressWarnings("checkstyle:MethodLength")
Expand Down Expand Up @@ -308,7 +310,9 @@ private void invokeAsync(
int retryCount, final Method method,
final Object obj, final Object... params) {
try {
Client.setAsynchronousMode(true);
method.invoke(obj, params);
Client.setAsynchronousMode(false);
asyncCatch((AsyncCatchFunction<Object, Throwable>) (o, e) -> {
if (e instanceof IOException) {
IOException ioe = (IOException) e;
Expand Down Expand Up @@ -355,7 +359,7 @@ public <T> T invokeSequential(
Object expectedResultValue) throws IOException {
invokeSequential(remoteMethod, locations, expectedResultClass, expectedResultValue);
asyncApply((ApplyFunction<RemoteResult, Object>) result -> result.getResult());
return asyncReturn(expectedResultClass);
return (T) getFutureResult();
}

@Override
Expand Down Expand Up @@ -450,7 +454,7 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
@SuppressWarnings("unchecked") T ret = (T) firstResult[0];
return new RemoteResult<>(locations.get(0), ret);
});
return asyncReturn(RemoteResult.class);
return (RemoteResult) getFutureResult();
}

@Override
Expand Down Expand Up @@ -491,7 +495,7 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
}
return ret;
});
return asyncReturn(Map.class);
return (Map<T, R>) getFutureResult();
}

@Override
Expand Down Expand Up @@ -530,7 +534,7 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
releasePermit(ns, ugi, method, controller);
return o;
});
return null;
return (List<RemoteResult<T, R>>) getFutureResult();
}

if (rpcMonitor != null) {
Expand Down Expand Up @@ -606,7 +610,7 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
releasePermit(CONCURRENT_NS, ugi, method, controller);
return results;
});
return asyncReturn(List.class);
return (List<RemoteResult<T, R>>) getFutureResult();
}

@Override
Expand All @@ -628,8 +632,20 @@ public Object invokeSingle(final String nsId, RemoteMethod method)
releasePermit(nsId, ugi, method, controller);
return o;
});
return asyncReturn(Object.class);
return getFutureResult();
}


// TODO: only test!!!
private Object getFutureResult() throws IOException {
try {
return syncReturn(Object.class);
} catch (ExecutionException | CompletionException e) {
throw (IOException) e.getCause();
} catch (IOException e) {
throw e;
} catch (Exception e) {
LOG.error("Unexpected exception", e);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public RouterRpcServer(Configuration conf, Router router,
DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT);

this.enableAsync = conf.getBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC,
DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT);
true);
LOG.info("Router enable async {}", this.enableAsync);
if (this.enableAsync) {
initAsyncThreadPool();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Function;
Expand Down Expand Up @@ -120,7 +121,7 @@ public static <R> void asyncComplete(R value) {
*/
public static void asyncThrowException(Throwable e) {
CompletableFuture<Object> result = new CompletableFuture<>();
result.completeExceptionally(e);
result.completeExceptionally(new CompletionException(e));
CUR_COMPLETABLE_FUTURE.set(result);
}

Expand Down

0 comments on commit 5844e5f

Please sign in to comment.