Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hdfs-17544. [ARR] The router client rpc protocol supports asynchrony. #6869

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
private static final ThreadLocal<Object> EXTERNAL_CALL_HANDLER
= new ThreadLocal<>();
public static final ThreadLocal<CompletableFuture<Object>> CALL_FUTURE_THREAD_LOCAL
= new ThreadLocal<>();
private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
ASYNC_RPC_RESPONSE = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode =
Expand Down Expand Up @@ -283,6 +285,7 @@ static class Call {
boolean done; // true when call is done
private final Object externalHandler;
private AlignmentContext alignmentContext;
private CompletableFuture<Object> completableFuture;

private Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind;
Expand All @@ -304,6 +307,7 @@ private Call(RPC.RpcKind rpcKind, Writable param) {
}

this.externalHandler = EXTERNAL_CALL_HANDLER.get();
this.completableFuture = CALL_FUTURE_THREAD_LOCAL.get();
}

@Override
Expand All @@ -322,6 +326,9 @@ protected synchronized void callComplete() {
externalHandler.notify();
}
}
if (completableFuture != null) {
completableFuture.complete(this);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ public class RefreshUserMappingsProtocolClientSideTranslatorPB implements
ProtocolMetaInterface, RefreshUserMappingsProtocol, Closeable {

/** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null;
protected final static RpcController NULL_CONTROLLER = null;
private final RefreshUserMappingsProtocolPB rpcProxy;

private final static RefreshUserToGroupsMappingsRequestProto
protected final static RefreshUserToGroupsMappingsRequestProto
VOID_REFRESH_USER_TO_GROUPS_MAPPING_REQUEST =
RefreshUserToGroupsMappingsRequestProto.newBuilder().build();

private final static RefreshSuperUserGroupsConfigurationRequestProto
protected final static RefreshSuperUserGroupsConfigurationRequestProto
VOID_REFRESH_SUPERUSER_GROUPS_CONFIGURATION_REQUEST =
RefreshSuperUserGroupsConfigurationRequestProto.newBuilder().build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class GetUserMappingsProtocolClientSideTranslatorPB implements
ProtocolMetaInterface, GetUserMappingsProtocol, Closeable {

/** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null;
protected final static RpcController NULL_CONTROLLER = null;
private final GetUserMappingsProtocolPB rpcProxy;

public GetUserMappingsProtocolClientSideTranslatorPB(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,49 +272,49 @@
@InterfaceStability.Stable
public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy;
final protected ClientNamenodeProtocolPB rpcProxy;

static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
protected static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build();

private final static GetFsStatusRequestProto VOID_GET_FSSTATUS_REQUEST =
protected final static GetFsStatusRequestProto VOID_GET_FSSTATUS_REQUEST =
GetFsStatusRequestProto.newBuilder().build();

private final static GetFsReplicatedBlockStatsRequestProto
protected final static GetFsReplicatedBlockStatsRequestProto
VOID_GET_FS_REPLICATED_BLOCK_STATS_REQUEST =
GetFsReplicatedBlockStatsRequestProto.newBuilder().build();

private final static GetFsECBlockGroupStatsRequestProto
protected final static GetFsECBlockGroupStatsRequestProto
VOID_GET_FS_ECBLOCKGROUP_STATS_REQUEST =
GetFsECBlockGroupStatsRequestProto.newBuilder().build();

private final static RollEditsRequestProto VOID_ROLLEDITS_REQUEST =
protected final static RollEditsRequestProto VOID_ROLLEDITS_REQUEST =
RollEditsRequestProto.getDefaultInstance();

private final static RefreshNodesRequestProto VOID_REFRESH_NODES_REQUEST =
protected final static RefreshNodesRequestProto VOID_REFRESH_NODES_REQUEST =
RefreshNodesRequestProto.newBuilder().build();

private final static FinalizeUpgradeRequestProto
protected final static FinalizeUpgradeRequestProto
VOID_FINALIZE_UPGRADE_REQUEST =
FinalizeUpgradeRequestProto.newBuilder().build();

private final static UpgradeStatusRequestProto
protected final static UpgradeStatusRequestProto
VOID_UPGRADE_STATUS_REQUEST =
UpgradeStatusRequestProto.newBuilder().build();

private final static GetDataEncryptionKeyRequestProto
protected final static GetDataEncryptionKeyRequestProto
VOID_GET_DATA_ENCRYPTIONKEY_REQUEST =
GetDataEncryptionKeyRequestProto.newBuilder().build();

private final static GetStoragePoliciesRequestProto
protected final static GetStoragePoliciesRequestProto
VOID_GET_STORAGE_POLICIES_REQUEST =
GetStoragePoliciesRequestProto.newBuilder().build();

private final static GetErasureCodingPoliciesRequestProto
protected final static GetErasureCodingPoliciesRequestProto
VOID_GET_EC_POLICIES_REQUEST = GetErasureCodingPoliciesRequestProto
.newBuilder().build();

private final static GetErasureCodingCodecsRequestProto
protected final static GetErasureCodingCodecsRequestProto
VOID_GET_EC_CODEC_REQUEST = GetErasureCodingCodecsRequestProto
.newBuilder().build();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.protocolPB;

import org.apache.hadoop.hdfs.server.federation.router.async.Async;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;


import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

public final class AsyncRpcProtocolPBUtil {
public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);

private AsyncRpcProtocolPBUtil() {}

public static <T> AsyncGet<T, Exception> asyncIpc(
ShadedProtobufHelper.IpcCall<T> call) throws IOException {
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
Client.CALL_FUTURE_THREAD_LOCAL.set(completableFuture);
ipc(call);
return (AsyncGet<T, Exception>) ProtobufRpcEngine2.getAsyncReturnMessage();
}

public static <T> void asyncResponse(Response<T> response) {
CompletableFuture<T> callCompletableFuture =
(CompletableFuture<T>) Client.CALL_FUTURE_THREAD_LOCAL.get();
// transfer originCall & callerContext to worker threads of executor.
final Server.Call originCall = Server.getCurCall().get();
final CallerContext originContext = CallerContext.getCurrent();
CompletableFuture<Object> result = callCompletableFuture.thenApplyAsync(t -> {
try {
Server.getCurCall().set(originCall);
CallerContext.setCurrent(originContext);
return response.response();
}catch (Exception e) {
throw new CompletionException(e);
}
});
Async.CUR_COMPLETABLE_FUTURE.set(result);
}

@FunctionalInterface
interface Response<T> {
T response() throws Exception;
}
}
Loading
Loading