From 837b2088c8d62ae9f52c685459b461a8659af702 Mon Sep 17 00:00:00 2001 From: "liujianjun.ljj" Date: Mon, 15 Apr 2024 16:31:14 +0800 Subject: [PATCH] triple pojo mode support stream --- .../bootstrap/DefaultClientProxyInvoker.java | 2 +- .../sofa/rpc/client/AbstractCluster.java | 4 - .../rpc/config/AbstractInterfaceConfig.java | 22 +-- .../sofa/rpc/config/ConsumerConfig.java | 61 +------ .../alipay/sofa/rpc/config/MethodConfig.java | 38 ---- .../sofa/rpc/config/ProviderConfig.java | 1 - .../rpc/filter/ConsumerGenericFilter.java | 2 +- .../stream/ClientStreamObserverAdapter.java | 6 +- .../ResponseSerializeStreamHandler.java | 12 +- .../rpc/server/triple/GenericServiceImpl.java | 53 ++---- .../sofa/rpc/server/triple/TripleServer.java | 6 +- .../transport/triple/TripleClientInvoker.java | 37 ++-- .../alipay/sofa/rpc/utils/SofaProtoUtils.java | 116 +++++++++++- .../server/triple/GenericServiceImplTest.java | 2 +- .../src/main/proto/helloworld.proto | 4 + .../sofa/rpc/test/triple/GreeterImpl.java | 56 +++++- .../rpc/test/triple/TripleServerTest.java | 53 ------ .../triple/stream/ClientRequest.java | 2 +- .../triple/stream/ExtendClientRequest.java | 39 ++++ .../triple/stream/ExtendServerResponse.java | 39 ++++ .../triple/stream/HelloService.java | 8 +- .../test/triple/stream/HelloServiceImpl.java | 80 +++++++++ .../triple/stream/ServerResponse.java | 2 +- .../stream/TripleGenericStreamTest.java | 108 ++++++----- .../triple/stream/TripleStubStreamTest.java | 169 ++++++++++++++++++ .../rpc/triple/stream/HelloServiceImpl.java | 90 ---------- 26 files changed, 624 insertions(+), 388 deletions(-) rename test/test-integration/src/test/java/com/alipay/sofa/rpc/{ => test}/triple/stream/ClientRequest.java (95%) create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ExtendClientRequest.java create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ExtendServerResponse.java rename test/test-integration/src/test/java/com/alipay/sofa/rpc/{ => test}/triple/stream/HelloService.java (89%) create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloServiceImpl.java rename test/test-integration/src/test/java/com/alipay/sofa/rpc/{ => test}/triple/stream/ServerResponse.java (95%) rename test/test-integration/src/test/java/com/alipay/sofa/rpc/{ => test}/triple/stream/TripleGenericStreamTest.java (75%) create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/TripleStubStreamTest.java delete mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/HelloServiceImpl.java diff --git a/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultClientProxyInvoker.java b/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultClientProxyInvoker.java index 0daa4bb56..0aadda9c9 100644 --- a/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultClientProxyInvoker.java +++ b/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultClientProxyInvoker.java @@ -92,7 +92,7 @@ protected void decorateRequest(SofaRequest request) { if (!consumerConfig.isGeneric()) { // 找到调用类型, generic的时候类型在filter里进行判断 - request.setInvokeType(consumerConfig.getMethodInvokeType(request)); + request.setInvokeType(consumerConfig.getMethodInvokeType(request.getMethodName())); } RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext(); diff --git a/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java b/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java index 60280f440..33fc4cc2a 100644 --- a/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java +++ b/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java @@ -657,10 +657,6 @@ else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) { // 放入线程上下文 RpcInternalContext.getContext().setFuture(future); response = buildEmptyResponse(request); - } else if (RpcConstants.INVOKER_TYPE_CLIENT_STREAMING.equals(invokeType) - || RpcConstants.INVOKER_TYPE_BI_STREAMING.equals(invokeType) - || RpcConstants.INVOKER_TYPE_SERVER_STREAMING.equals(invokeType)) { - response = transport.syncSend(request, Integer.MAX_VALUE); } else { throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Unknown invoke type:" + invokeType); } diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/config/AbstractInterfaceConfig.java b/core/api/src/main/java/com/alipay/sofa/rpc/config/AbstractInterfaceConfig.java index 4cdf1674b..336292369 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/config/AbstractInterfaceConfig.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/config/AbstractInterfaceConfig.java @@ -212,11 +212,6 @@ public abstract class AbstractInterfaceConfig configValueCache = null; - /** - * 方法调用类型,(方法全名 - 调用类型) - */ - protected transient volatile Map methodCallType = null; - /** * 代理接口类,和T对应,主要针对泛化调用 */ @@ -252,21 +247,6 @@ public S setProxyClass(Class proxyClass) { return castThis(); } - /** - * Cache the call type of interface methods - */ - protected void loadMethodCallType(Class interfaceClass){ - Method[] methods = interfaceClass.getDeclaredMethods(); - this.methodCallType = new ConcurrentHashMap<>(); - for(Method method :methods) { - methodCallType.put(method.getName(),MethodConfig.mapStreamType(method,RpcConstants.INVOKER_TYPE_SYNC)); - } - } - - public String getMethodCallType(String methodName) { - return methodCallType.get(methodName); - } - /** * Gets application. * @@ -1035,7 +1015,7 @@ public Object getMethodConfigValue(String methodName, String configKey) { * @param key the key * @return the string */ - protected String buildmkey(String methodName, String key) { + private String buildmkey(String methodName, String key) { return RpcConstants.HIDE_KEY_PREFIX + methodName + RpcConstants.HIDE_KEY_PREFIX + key; } diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/config/ConsumerConfig.java b/core/api/src/main/java/com/alipay/sofa/rpc/config/ConsumerConfig.java index e151665a1..5bb2ef39c 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/config/ConsumerConfig.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/config/ConsumerConfig.java @@ -27,7 +27,6 @@ import com.alipay.sofa.rpc.common.utils.ExceptionUtils; import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback; -import com.alipay.sofa.rpc.core.request.SofaRequest; import com.alipay.sofa.rpc.listener.ChannelListener; import com.alipay.sofa.rpc.listener.ConsumerStateListener; import com.alipay.sofa.rpc.listener.ProviderInfoListener; @@ -39,7 +38,6 @@ import static com.alipay.sofa.rpc.common.RpcConfigs.getBooleanValue; import static com.alipay.sofa.rpc.common.RpcConfigs.getIntValue; import static com.alipay.sofa.rpc.common.RpcConfigs.getStringValue; -import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_REJECTED_EXECUTION_POLICY; import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_ADDRESS_HOLDER; import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_ADDRESS_WAIT; import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_CHECK; @@ -55,6 +53,7 @@ import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_LAZY; import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_LOAD_BALANCER; import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_RECONNECT_PERIOD; +import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_REJECTED_EXECUTION_POLICY; import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_REPEATED_REFERENCE_LIMIT; import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_RETRIES; import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_STICKY; @@ -936,62 +935,12 @@ public SofaResponseCallback getMethodOnreturn(String methodName) { /** * Gets the call type corresponding to the method name * - * @param sofaRequest the request + * @param methodName the method name * @return the call type */ - public String getMethodInvokeType(SofaRequest sofaRequest) { - String methodName = sofaRequest.getMethodName(); - - String invokeType = (String) getMethodConfigValue(methodName, RpcConstants.CONFIG_KEY_INVOKE_TYPE, null); - - if (invokeType == null) { - invokeType = getAndCacheCallType(sofaRequest); - } - - return invokeType; - } - - /** - * Get and cache the call type of certain method - * @param request RPC request - * @return request call type - */ - public String getAndCacheCallType(SofaRequest request) { - Method method = request.getMethod(); - String callType = MethodConfig - .mapStreamType( - method, - (String) getMethodConfigValue(request.getMethodName(), RpcConstants.CONFIG_KEY_INVOKE_TYPE, - getInvokeType()) - ); - //Method level config - updateAttribute(buildMethodConfigKey(request, RpcConstants.CONFIG_KEY_INVOKE_TYPE), callType, true); - return callType; - } - - /** - * 通过请求的目标方法构建方法配置key。该key使用内部配置格式。(以'.' 开头) - * @param request RPC请求 - * @return 方法配置名称,带方法参数列表 - */ - public String buildMethodConfigKey(SofaRequest request, String propertyKey) { - return "." + getMethodSignature(request.getMethod()) + "." + propertyKey; - } - - public static String getMethodSignature(Method method) { - Class[] parameterTypes = method.getParameterTypes(); - StringBuilder methodSignature = new StringBuilder(); - methodSignature.append(method.getName()).append("("); - - for (int i = 0; i < parameterTypes.length; i++) { - methodSignature.append(parameterTypes[i].getSimpleName()); - if (i < parameterTypes.length - 1) { - methodSignature.append(", "); - } - } - - methodSignature.append(")"); - return methodSignature.toString(); + public String getMethodInvokeType(String methodName) { + return (String) getMethodConfigValue(methodName, RpcConstants.CONFIG_KEY_INVOKE_TYPE, + getInvokeType()); } /** diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/config/MethodConfig.java b/core/api/src/main/java/com/alipay/sofa/rpc/config/MethodConfig.java index 265995505..01ce32faa 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/config/MethodConfig.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/config/MethodConfig.java @@ -16,15 +16,10 @@ */ package com.alipay.sofa.rpc.config; -import com.alipay.sofa.rpc.common.RpcConstants; -import com.alipay.sofa.rpc.core.exception.RpcErrorType; -import com.alipay.sofa.rpc.core.exception.SofaRpcException; import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback; import com.alipay.sofa.rpc.transport.StreamHandler; import java.io.Serializable; -import java.lang.reflect.Method; -import java.util.Arrays; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -334,37 +329,4 @@ public String getParameter(String key) { return parameters == null ? null : parameters.get(key); } - /** - * Gets the stream call type of certain method - * @param method the method - * @return call type,server/client/bidirectional stream or default value. If not mapped to any stream call type, use the default value - */ - public static String mapStreamType(Method method, String defaultValue){ - Class[] paramClasses = method.getParameterTypes(); - Class returnClass = method.getReturnType(); - - int paramLen = paramClasses.length; - String callType; - - //BidirectionalStream & ClientStream - if(paramLen > 0 && StreamHandler.class.isAssignableFrom(paramClasses[0]) && StreamHandler.class.isAssignableFrom(returnClass)){ - if(paramLen > 1){ - throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE,"Bidirectional/Client stream method parameters can be only one StreamHandler."); - } - callType = RpcConstants.INVOKER_TYPE_BI_STREAMING; - } - //ServerStream - else if (paramLen > 1 && StreamHandler.class.isAssignableFrom(paramClasses[0]) && void.class == returnClass){ - callType = RpcConstants.INVOKER_TYPE_SERVER_STREAMING; - } - else if (StreamHandler.class.isAssignableFrom(returnClass) || Arrays.stream(paramClasses).anyMatch(StreamHandler.class::isAssignableFrom)) { - throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "StreamHandler can only at the specified location of parameter. Please check related docs."); - } - //Other call types - else { - callType = defaultValue; - } - - return callType; - } } diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/config/ProviderConfig.java b/core/api/src/main/java/com/alipay/sofa/rpc/config/ProviderConfig.java index cc8872d0e..33b0d6897 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/config/ProviderConfig.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/config/ProviderConfig.java @@ -208,7 +208,6 @@ public T getRef() { */ public ProviderConfig setRef(T ref) { this.ref = ref; - loadMethodCallType(ref.getClass()); return this; } diff --git a/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/filter/ConsumerGenericFilter.java b/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/filter/ConsumerGenericFilter.java index 1a30a39ca..4dcfa9309 100644 --- a/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/filter/ConsumerGenericFilter.java +++ b/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/filter/ConsumerGenericFilter.java @@ -90,7 +90,7 @@ public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws So // 修正类型 ConsumerConfig consumerConfig = (ConsumerConfig) invoker.getConfig(); - String invokeType = consumerConfig.getMethodInvokeType(request); + String invokeType = consumerConfig.getMethodInvokeType(request.getMethodName()); request.setInvokeType(invokeType); request.addRequestProp(RemotingConstants.HEAD_INVOKE_TYPE, invokeType); request.addRequestProp(REVISE_KEY, REVISE_VALUE); diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/message/triple/stream/ClientStreamObserverAdapter.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/message/triple/stream/ClientStreamObserverAdapter.java index be61a5060..a7036c374 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/message/triple/stream/ClientStreamObserverAdapter.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/message/triple/stream/ClientStreamObserverAdapter.java @@ -42,10 +42,10 @@ public ClientStreamObserverAdapter(StreamHandler streamHandler, byte ser @Override public void onNext(triple.Response response) { - byte[] responseDate = response.getData().toByteArray(); + byte[] responseData = response.getData().toByteArray(); Object appResponse = null; String returnTypeName = response.getType(); - if (responseDate != null && responseDate.length > 0) { + if (responseData != null && responseData.length > 0) { if (returnType == null && !returnTypeName.isEmpty()) { try { returnType = Class.forName(returnTypeName); @@ -53,7 +53,7 @@ public void onNext(triple.Response response) { throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, "Can not find return type :" + returnType); } } - appResponse = serializer.decode(new ByteArrayWrapperByteBuf(responseDate), returnType, null); + appResponse = serializer.decode(new ByteArrayWrapperByteBuf(responseData), returnType, null); } streamHandler.onMessage(appResponse); diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/message/triple/stream/ResponseSerializeStreamHandler.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/message/triple/stream/ResponseSerializeStreamHandler.java index 51edcedf7..f3bae0a71 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/message/triple/stream/ResponseSerializeStreamHandler.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/message/triple/stream/ResponseSerializeStreamHandler.java @@ -18,6 +18,7 @@ import com.alipay.sofa.rpc.codec.Serializer; import com.alipay.sofa.rpc.codec.SerializerFactory; +import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.transport.StreamHandler; import com.alipay.sofa.rpc.utils.TripleExceptionUtils; import com.google.protobuf.ByteString; @@ -37,8 +38,10 @@ public class ResponseSerializeStreamHandler implements StreamHandler { public ResponseSerializeStreamHandler(StreamObserver streamObserver, String serializeType) { this.streamObserver = streamObserver; - serializer = SerializerFactory.getSerializer(serializeType); - this.serializeType = serializeType; + if (StringUtils.isNotBlank(serializeType)) { + this.serializer = SerializerFactory.getSerializer(serializeType); + this.serializeType = serializeType; + } } @Override @@ -61,4 +64,9 @@ public void onException(Throwable throwable) { streamObserver.onError(TripleExceptionUtils.asStatusRuntimeException(throwable)); } + public void setSerializeType(String serializeType) { + this.serializer = SerializerFactory.getSerializer(serializeType); + this.serializeType = serializeType; + } + } diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/GenericServiceImpl.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/GenericServiceImpl.java index 722c0796b..92de04fc9 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/GenericServiceImpl.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/GenericServiceImpl.java @@ -22,8 +22,6 @@ import com.alipay.sofa.rpc.common.cache.ReflectCache; import com.alipay.sofa.rpc.common.utils.ClassTypeUtils; import com.alipay.sofa.rpc.common.utils.ClassUtils; -import com.alipay.sofa.rpc.common.utils.StringUtils; -import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.core.exception.RpcErrorType; import com.alipay.sofa.rpc.core.exception.SofaRpcException; import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException; @@ -46,8 +44,6 @@ import java.lang.reflect.Method; import java.util.List; -import static com.alipay.sofa.rpc.common.RpcOptions.DEFAULT_SERIALIZATION; - /** * @author zhaowang * @version : GenericServiceImpl.java, v 0.1 2020年05月27日 9:19 下午 zhaowang Exp $ @@ -58,12 +54,9 @@ public class GenericServiceImpl extends SofaGenericServiceTriple.GenericServiceI protected UniqueIdInvoker invoker; - private ProviderConfig providerConfig; - - public GenericServiceImpl(UniqueIdInvoker invoker, ProviderConfig serverConfig) { + public GenericServiceImpl(UniqueIdInvoker invoker) { super(); this.invoker = invoker; - this.providerConfig = serverConfig; } @Override @@ -114,7 +107,7 @@ public StreamObserver genericBiStream(StreamObserver response String methodName = serviceMethod.getName(); try { ResponseSerializeStreamHandler serverResponseHandler = new ResponseSerializeStreamHandler(responseObserver, - getSerialization()); + null); setBidirectionalStreamRequestParams(sofaRequest, serviceMethod, serverResponseHandler); @@ -123,37 +116,41 @@ public StreamObserver genericBiStream(StreamObserver response StreamHandler clientHandler = (StreamHandler) sofaResponse.getAppResponse(); return new StreamObserver() { - volatile Serializer serializer = null; + private volatile Serializer serializer = null; + + private volatile String serializeType = null; - volatile Class[] argTypes = null; + private volatile Class[] argTypes = null; @Override public void onNext(Request request) { checkInitialize(request); Object message = getInvokeArgs(request, argTypes, serializer, false)[0]; + serverResponseHandler.setSerializeType(serializeType); clientHandler.onMessage(message); } + @Override + public void onError(Throwable t) { + clientHandler.onException(t); + } + + @Override + public void onCompleted() { + clientHandler.onFinish(); + } + private void checkInitialize(Request request) { if (serializer == null && argTypes == null) { synchronized (this) { if (serializer == null && argTypes == null) { + serializeType = request.getSerializeType(); serializer = SerializerFactory.getSerializer(request.getSerializeType()); argTypes = getArgTypes(request, false); } } } } - - @Override - public void onError(Throwable t) { - clientHandler.onException(t); - } - - @Override - public void onCompleted() { - clientHandler.onFinish(); - } }; } catch (Exception e) { LOGGER.error("Invoke " + methodName + " error:", e); @@ -178,7 +175,7 @@ public void genericServerStream(Request request, StreamObserver respon Serializer serializer = SerializerFactory.getSerializer(request.getSerializeType()); setUnaryOrServerRequestParams(sofaRequest, request, methodName, serializer, serviceMethod, true); - sofaRequest.getMethodArgs()[0] = new ResponseSerializeStreamHandler<>(responseObserver, getSerialization()); + sofaRequest.getMethodArgs()[0] = new ResponseSerializeStreamHandler<>(responseObserver, request.getSerializeType()); invoker.invoke(sofaRequest); } catch (Exception e) { @@ -321,16 +318,4 @@ private Object[] getInvokeArgs(Request request, Class[] argTypes, Serializer ser } return args; } - - private String getSerialization() { - String serialization = providerConfig.getSerialization(); - if (StringUtils.isBlank(serialization)) { - serialization = getDefaultSerialization(); - } - return serialization; - } - - private String getDefaultSerialization() { - return DEFAULT_SERIALIZATION; - } } \ No newline at end of file diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServer.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServer.java index f21fe5e39..79f86834f 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServer.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServer.java @@ -298,7 +298,7 @@ private ServerServiceDefinition getServerServiceDefinition(ProviderConfig provid BindableService bindableService = (BindableService) providerConfig.getRef(); serviceDef = bindableService.bindService(); } else { - GenericServiceImpl genericService = new GenericServiceImpl(uniqueIdInvoker,providerConfig); + GenericServiceImpl genericService = new GenericServiceImpl(uniqueIdInvoker); genericService.setProxiedImpl(genericService); serviceDef = buildSofaServiceDef(genericService, providerConfig); } @@ -377,9 +377,9 @@ private ServiceDescriptor getServiceDescriptor(ServerServiceDefinition template, private List> getMethodDescriptor(ProviderConfig providerConfig) { List> result = new ArrayList<>(); Set methodNames = SofaProtoUtils.getMethodNames(providerConfig.getInterfaceId()); - + Map streamCallTypeMap = SofaProtoUtils.cacheStreamCallType(providerConfig); for (String name : methodNames) { - MethodDescriptor.MethodType methodType = SofaProtoUtils.mapGrpcCallType(providerConfig.getMethodCallType(name)); + MethodDescriptor.MethodType methodType = SofaProtoUtils.mapGrpcCallType(streamCallTypeMap.get(name)); MethodDescriptor methodDescriptor = MethodDescriptor.newBuilder() .setType(methodType) .setFullMethodName(generateFullMethodName(providerConfig.getInterfaceId(), name)) diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientInvoker.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientInvoker.java index c660a4f8f..142293883 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientInvoker.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientInvoker.java @@ -90,6 +90,11 @@ public class TripleClientInvoker implements TripleInvoker { private Map methodMap = new ConcurrentHashMap<>(); + /** + * 方法调用类型,(方法全名 - 流式调用类型) + */ + private Map methodCallType = null; + public TripleClientInvoker(ConsumerConfig consumerConfig, ProviderInfo providerInfo, Channel channel) { this.channel = channel; this.consumerConfig = consumerConfig; @@ -105,6 +110,8 @@ public TripleClientInvoker(ConsumerConfig consumerConfig, ProviderInfo providerI } catch (NoSuchMethodException e) { LOGGER.error("getSofaStub not found in enclosingClass" + enclosingClass.getName()); } + } else { + methodCallType = SofaProtoUtils.cacheStreamCallType(consumerConfig); } } @@ -141,21 +148,20 @@ protected String getDefaultSerialization() { @Override public SofaResponse invoke(SofaRequest sofaRequest, int timeout) throws Exception { - - MethodDescriptor.MethodType callType = mapCallType(sofaRequest); - if(!useGeneric){ return stubCall(sofaRequest,timeout); - } else if (callType.equals(MethodDescriptor.MethodType.UNARY)) { + } + MethodDescriptor.MethodType callType = mapCallType(sofaRequest.getMethod()); + if (callType.equals(MethodDescriptor.MethodType.UNARY)) { return unaryCall(sofaRequest, timeout); } else { return streamCall(sofaRequest, timeout, callType); } } - private MethodDescriptor.MethodType mapCallType(SofaRequest sofaRequest) { - String sofaCallType = sofaRequest.getInvokeType(); - switch (sofaCallType) { + private MethodDescriptor.MethodType mapCallType(Method method) { + String streamType = methodCallType.get(method.getName()); + switch (streamType) { case RpcConstants.INVOKER_TYPE_BI_STREAMING: return MethodDescriptor.MethodType.BIDI_STREAMING; case RpcConstants.INVOKER_TYPE_CLIENT_STREAMING: @@ -188,12 +194,12 @@ private SofaResponse unaryCall(SofaRequest sofaRequest, int timeout) throws Exce buildCustomCallOptions(sofaRequest, timeout), request); SofaResponse sofaResponse = new SofaResponse(); - byte[] responseDate = response.getData().toByteArray(); + byte[] responseData = response.getData().toByteArray(); Class returnType = sofaRequest.getMethod().getReturnType(); if (returnType != void.class) { - if (responseDate != null && responseDate.length > 0) { + if (responseData != null && responseData.length > 0) { Serializer responseSerializer = SerializerFactory.getSerializer(response.getSerializeType()); - Object appResponse = responseSerializer.decode(new ByteArrayWrapperByteBuf(responseDate), returnType, null); + Object appResponse = responseSerializer.decode(new ByteArrayWrapperByteBuf(responseData), returnType, null); sofaResponse.setAppResponse(appResponse); } } @@ -295,8 +301,7 @@ public ResponseFuture asyncInvoke(SofaRequest sofaRequest, int timeout) throws E } } } - Object stub = sofaStub.invoke(null, channel, buildCustomCallOptions(sofaRequest, timeout), - null, consumerConfig, timeout); + Object stub = sofaStub.invoke(null, channel, buildCustomCallOptions(sofaRequest, timeout), timeout); m.invoke(stub, sofaRequest.getMethodArgs()[0], new StreamObserver() { @Override public void onNext(Object o) { @@ -384,12 +389,12 @@ private void processSuccess(boolean needDecode, RpcInternalContext context, Sofa Object appResponse = o; if (needDecode) { Response response = (Response) o; - byte[] responseDate = response.getData().toByteArray(); + byte[] responseData = response.getData().toByteArray(); Class returnType = sofaRequest.getMethod().getReturnType(); if (returnType != void.class) { - if (responseDate != null && responseDate.length > 0) { + if (responseData != null && responseData.length > 0) { Serializer responseSerializer = SerializerFactory.getSerializer(response.getSerializeType()); - appResponse = responseSerializer.decode(new ByteArrayWrapperByteBuf(responseDate), returnType, null); + appResponse = responseSerializer.decode(new ByteArrayWrapperByteBuf(responseData), returnType, null); } } } @@ -490,7 +495,7 @@ private MethodDescriptor getMethodDescriptor(SofaRequest sofa .setRequestMarshaller((MethodDescriptor.Marshaller) requestMarshaller) .setResponseMarshaller((MethodDescriptor.Marshaller) responseMarshaller); - MethodDescriptor.MethodType callType = SofaProtoUtils.mapGrpcCallType(sofaRequest.getInvokeType()); + MethodDescriptor.MethodType callType = mapCallType(sofaRequest.getMethod()); builder.setType(callType); return builder.build(); } diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/utils/SofaProtoUtils.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/utils/SofaProtoUtils.java index 9afccbeef..c9a575f42 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/utils/SofaProtoUtils.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/utils/SofaProtoUtils.java @@ -16,21 +16,30 @@ */ package com.alipay.sofa.rpc.utils; +import com.alipay.sofa.rpc.common.RpcConstants; import com.alipay.sofa.rpc.common.utils.ClassUtils; +import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.core.exception.RpcErrorType; import com.alipay.sofa.rpc.core.exception.SofaRpcException; - +import com.alipay.sofa.rpc.core.request.SofaRequest; +import com.alipay.sofa.rpc.transport.StreamHandler; import io.grpc.BindableService; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.MethodDescriptor; import java.lang.reflect.Method; +import java.util.Arrays; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; -import static com.alipay.sofa.rpc.common.RpcConstants.*; +import static com.alipay.sofa.rpc.common.RpcConstants.INVOKER_TYPE_BI_STREAMING; +import static com.alipay.sofa.rpc.common.RpcConstants.INVOKER_TYPE_CLIENT_STREAMING; +import static com.alipay.sofa.rpc.common.RpcConstants.INVOKER_TYPE_SERVER_STREAMING; /** * @author zhaowang @@ -69,12 +78,10 @@ public static boolean checkIfUseGeneric(ConsumerConfig consumerConfig) { } public static MethodDescriptor.MethodType mapGrpcCallType(String callType) { + if (StringUtils.isBlank(callType)) { + return MethodDescriptor.MethodType.UNARY; + } switch (callType) { - case INVOKER_TYPE_ONEWAY: - case INVOKER_TYPE_FUTURE: - case INVOKER_TYPE_CALLBACK: - case INVOKER_TYPE_SYNC: - return MethodDescriptor.MethodType.UNARY; case INVOKER_TYPE_BI_STREAMING: return MethodDescriptor.MethodType.BIDI_STREAMING; case INVOKER_TYPE_CLIENT_STREAMING: @@ -86,4 +93,99 @@ public static MethodDescriptor.MethodType mapGrpcCallType(String callType) { } } + public static Map cacheStreamCallType(ConsumerConfig consumerConfig) { + Map methodCallType = new ConcurrentHashMap<>(); + Class proxyClass = consumerConfig.getProxyClass(); + Method[] declaredMethods = proxyClass.getDeclaredMethods(); + for (Method method : declaredMethods) { + String streamType = mapStreamType(method); + if (StringUtils.isNotBlank(streamType)) { + methodCallType.put(method.getName(), streamType); + } + } + return methodCallType; + } + + public static Map cacheStreamCallType(ProviderConfig providerConfig) { + Map methodCallType = new ConcurrentHashMap<>(); + Class proxyClass = providerConfig.getProxyClass(); + Method[] declaredMethods = proxyClass.getDeclaredMethods(); + for (Method method : declaredMethods) { + String streamType = mapStreamType(method); + if (StringUtils.isNotBlank(streamType)) { + methodCallType.put(method.getName(), streamType); + } + } + return methodCallType; + } + + /** + * Gets the stream call type of certain method + * + * @param method the method + * @return call type,server/client/bidirectional stream or default value. If not mapped to any stream call type, use the default value + */ + private static String mapStreamType(Method method) { + Class[] paramClasses = method.getParameterTypes(); + Class returnClass = method.getReturnType(); + + int paramLen = paramClasses.length; + + //BidirectionalStream & ClientStream + if (paramLen > 0 && StreamHandler.class.isAssignableFrom(paramClasses[0]) && StreamHandler.class.isAssignableFrom(returnClass)) { + if (paramLen > 1) { + throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "Bidirectional/Client stream method parameters can be only one StreamHandler."); + } + return RpcConstants.INVOKER_TYPE_BI_STREAMING; + } + //ServerStream + else if (paramLen > 1 && StreamHandler.class.isAssignableFrom(paramClasses[0]) && void.class == returnClass) { + return RpcConstants.INVOKER_TYPE_SERVER_STREAMING; + } else if (StreamHandler.class.isAssignableFrom(returnClass) || Arrays.stream(paramClasses).anyMatch(StreamHandler.class::isAssignableFrom)) { + throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "StreamHandler can only at the specified location of parameter. Please check related docs."); + } + return null; + } + + /** + * Get and cache the call type of certain method + * @param request RPC request + * @return request call type + */ + /* + public String getAndCacheCallType(SofaRequest request) { + Method method = request.getMethod(); + String callType = MethodConfig + .mapStreamType( + method, null); + //Method level config + updateAttribute(buildMethodConfigKey(request, RpcConstants.CONFIG_KEY_INVOKE_TYPE), callType, true); + return callType; + }*/ + + /** + * 通过请求的目标方法构建方法配置key。该key使用内部配置格式。(以'.' 开头) + * + * @param request RPC请求 + * @return 方法配置名称,带方法参数列表 + */ + public String buildMethodConfigKey(SofaRequest request, String propertyKey) { + return "." + getMethodSignature(request.getMethod()) + "." + propertyKey; + } + + public static String getMethodSignature(Method method) { + Class[] parameterTypes = method.getParameterTypes(); + StringBuilder methodSignature = new StringBuilder(); + methodSignature.append(method.getName()).append("("); + + for (int i = 0; i < parameterTypes.length; i++) { + methodSignature.append(parameterTypes[i].getSimpleName()); + if (i < parameterTypes.length - 1) { + methodSignature.append(", "); + } + } + + methodSignature.append(")"); + return methodSignature.toString(); + } } \ No newline at end of file diff --git a/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/server/triple/GenericServiceImplTest.java b/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/server/triple/GenericServiceImplTest.java index a70c2199f..a45105de6 100644 --- a/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/server/triple/GenericServiceImplTest.java +++ b/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/server/triple/GenericServiceImplTest.java @@ -63,7 +63,7 @@ public GenericServiceImplTest(){ ProviderProxyInvoker invoker = new ProviderProxyInvoker(providerConfig); UniqueIdInvoker uniqueIdInvoker = new UniqueIdInvoker(); uniqueIdInvoker.registerInvoker(providerConfig, invoker); - genericService = new GenericServiceImpl(uniqueIdInvoker,providerConfig); + genericService = new GenericServiceImpl(uniqueIdInvoker); responseObserver = new MockStreamObserver<>(); } diff --git a/test/test-integration/src/main/proto/helloworld.proto b/test/test-integration/src/main/proto/helloworld.proto index be12afabc..0b8c94f14 100644 --- a/test/test-integration/src/main/proto/helloworld.proto +++ b/test/test-integration/src/main/proto/helloworld.proto @@ -27,6 +27,10 @@ service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {} rpc SayHelloBinary (stream HelloRequest) returns (stream HelloReply){} + + rpc SayHelloClientStream (stream HelloRequest) returns (HelloReply){} + + rpc SayHelloServerStream (HelloRequest) returns (stream HelloReply){} } // The request message containing the user's name. diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/GreeterImpl.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/GreeterImpl.java index b917c1c75..a16e5be06 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/GreeterImpl.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/GreeterImpl.java @@ -16,6 +16,8 @@ */ package com.alipay.sofa.rpc.test.triple; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; import io.grpc.examples.helloworld.HelloReply; import io.grpc.examples.helloworld.HelloRequest; import io.grpc.examples.helloworld.SofaGreeterTriple; @@ -23,13 +25,18 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; public class GreeterImpl extends SofaGreeterTriple.GreeterImplBase { + private static final Logger LOGGER = LoggerFactory.getLogger(GreeterImpl.class); + //Intentionally using unsupported format - static final DateTimeFormatter[] datetimeFormatter = new DateTimeFormatter[] { DateTimeFormatter.ISO_DATE_TIME, - DateTimeFormatter.ISO_LOCAL_DATE_TIME, - DateTimeFormatter.BASIC_ISO_DATE }; + private static final DateTimeFormatter[] datetimeFormatter = new DateTimeFormatter[] { + DateTimeFormatter.ISO_DATE_TIME, + DateTimeFormatter.ISO_LOCAL_DATE_TIME, + DateTimeFormatter.BASIC_ISO_DATE }; @Override public void sayHello(HelloRequest req, StreamObserver responseObserver) { @@ -58,17 +65,60 @@ public StreamObserver sayHelloBinary(StreamObserver re @Override public void onNext(HelloRequest value) { + LOGGER.info("bi stream req onNext"); + responseObserver.onNext(HelloReply.newBuilder().setMessage(value.getName()) + .build()); responseObserver.onNext(HelloReply.newBuilder().setMessage(value.getName()) .build()); } @Override public void onError(Throwable t) { + LOGGER.info("bi stream req onError"); + } + + @Override + public void onCompleted() { + LOGGER.info("bi stream req onCompleted"); + responseObserver.onCompleted(); + } + }; + } + + @Override + public StreamObserver sayHelloClientStream(StreamObserver responseObserver) { + + List helloRequestList = new ArrayList<>(); + + return new StreamObserver() { + + @Override + public void onNext(HelloRequest value) { + LOGGER.info("client stream req receive"); + helloRequestList.add(value); + } + + @Override + public void onError(Throwable t) { + LOGGER.info("client stream req onError"); } @Override public void onCompleted() { + LOGGER.info("client stream req completed"); + responseObserver.onNext(HelloReply.newBuilder().setMessage(helloRequestList.get(0).getName() + helloRequestList.size()) + .build()); + responseObserver.onCompleted(); } }; } + + @Override + public void sayHelloServerStream(HelloRequest request, StreamObserver responseObserver) { + LOGGER.info("server stream req receive"); + responseObserver.onNext(HelloReply.newBuilder().setMessage(request.getName() + 1).build()); + responseObserver.onNext(HelloReply.newBuilder().setMessage(request.getName() + 2).build()); + responseObserver.onNext(HelloReply.newBuilder().setMessage(request.getName() + 3).build()); + responseObserver.onCompleted(); + } } \ No newline at end of file diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/TripleServerTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/TripleServerTest.java index 1693fa45a..3c608ca86 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/TripleServerTest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/TripleServerTest.java @@ -33,15 +33,11 @@ import io.grpc.examples.helloworld.HelloReply; import io.grpc.examples.helloworld.HelloRequest; import io.grpc.examples.helloworld.SofaGreeterTriple; -import io.grpc.stub.StreamObserver; import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - /** * @author leizhiyuan */ @@ -212,55 +208,6 @@ public String messageSize(String msg, int responseSize) { Assert.assertEquals(reply, "Hello! world"); } - @Test - public void testBiStream() throws InterruptedException { - ApplicationConfig applicationConfig = new ApplicationConfig().setAppName("triple-server"); - - int port = 50052; - - ServerConfig serverConfig = new ServerConfig() - .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) - .setPort(port); - - ProviderConfig providerConfig = new ProviderConfig() - .setApplication(applicationConfig) - .setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE) - .setInterfaceId(SofaGreeterTriple.IGreeter.class.getName()) - .setRef(new GreeterImpl()) - .setServer(serverConfig); - - providerConfig.export(); - - ConsumerConfig consumerConfig = new ConsumerConfig(); - consumerConfig.setInterfaceId(SofaGreeterTriple.IGreeter.class.getName()) - .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) - .setDirectUrl("tri://127.0.0.1:" + port); - - SofaGreeterTriple.IGreeter greeterBlockingStub = consumerConfig.refer(); - - HelloRequest request = HelloRequest.newBuilder().setName("Hello world!").build(); - CountDownLatch countDownLatch = new CountDownLatch(1); - StreamObserver requestStreamObserver = greeterBlockingStub - .sayHelloBinary(new StreamObserver() { - @Override - public void onNext(HelloReply value) { - Assert.assertEquals(value.getMessage(), request.getName()); - countDownLatch.countDown(); - } - - @Override - public void onError(Throwable t) { - } - - @Override - public void onCompleted() { - } - }); - requestStreamObserver.onNext(request); - Assert.assertTrue(countDownLatch.await(20, TimeUnit.SECONDS)); - requestStreamObserver.onCompleted(); - } - @Test //同步调用,直连 public void testSyncTimeout() { diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/ClientRequest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ClientRequest.java similarity index 95% rename from test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/ClientRequest.java rename to test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ClientRequest.java index efe29dc09..731cc5aa4 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/ClientRequest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ClientRequest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alipay.sofa.rpc.triple.stream; +package com.alipay.sofa.rpc.test.triple.stream; public class ClientRequest { private String meg; diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ExtendClientRequest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ExtendClientRequest.java new file mode 100644 index 000000000..2a309fe26 --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ExtendClientRequest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.triple.stream; + +/** + * @author Even + * @date 2024/4/15 10:33 + */ +public class ExtendClientRequest extends ClientRequest { + + private String extendString; + + public ExtendClientRequest(String meg, int count, String extendString) { + super(meg, count); + this.extendString = extendString; + } + + public String getExtendString() { + return extendString; + } + + public void setExtendString(String extendString) { + this.extendString = extendString; + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ExtendServerResponse.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ExtendServerResponse.java new file mode 100644 index 000000000..9037680da --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ExtendServerResponse.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.triple.stream; + +/** + * @author Even + * @date 2024/4/15 10:33 + */ +public class ExtendServerResponse extends ServerResponse { + + private String extendString; + + public ExtendServerResponse(String msg, int count, String extendString) { + super(msg, count); + this.extendString = extendString; + } + + public String getExtendString() { + return extendString; + } + + public void setExtendString(String extendString) { + this.extendString = extendString; + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/HelloService.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloService.java similarity index 89% rename from test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/HelloService.java rename to test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloService.java index f8fa147be..8f8c280cd 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/HelloService.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloService.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alipay.sofa.rpc.triple.stream; +package com.alipay.sofa.rpc.test.triple.stream; import com.alipay.sofa.rpc.transport.StreamHandler; @@ -26,12 +26,6 @@ public interface HelloService { String ERROR_MSG = "error msg"; - void sayHello(); - - void sayHello(String msg); - - String sayHelloUnary(String message); - StreamHandler sayHelloBiStream(StreamHandler streamHandler); void sayHelloServerStream(StreamHandler streamHandler, ClientRequest clientRequest); diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloServiceImpl.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloServiceImpl.java new file mode 100644 index 000000000..2527b78fa --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloServiceImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.triple.stream; + +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import com.alipay.sofa.rpc.transport.StreamHandler; + +public class HelloServiceImpl implements HelloService { + + private static final Logger LOGGER = LoggerFactory.getLogger(HelloServiceImpl.class); + + @Override + public StreamHandler sayHelloBiStream(StreamHandler streamHandler) { + return new StreamHandler() { + + @Override + public void onMessage(ClientRequest clientRequest) { + LOGGER.info("bi stream req onMessage"); + if (clientRequest.getMsg().equals(CMD_TRIGGER_STREAM_FINISH)) { + streamHandler.onFinish(); + } else if (clientRequest.getMsg().equals(CMD_TRIGGER_STEAM_ERROR)) { + streamHandler.onException(new RuntimeException(ERROR_MSG)); + } else { + if (clientRequest instanceof ExtendClientRequest) { + streamHandler.onMessage(new ExtendServerResponse(clientRequest.getMsg(), clientRequest + .getCount(), ((ExtendClientRequest) clientRequest).getExtendString())); + } else { + streamHandler.onMessage(new ServerResponse(clientRequest.getMsg(), clientRequest.getCount())); + } + } + } + + @Override + public void onFinish() { + LOGGER.info("bi stream req onFinish"); + streamHandler.onFinish(); + } + + @Override + public void onException(Throwable throwable) { + LOGGER.error("bi stream req onException", throwable); + streamHandler.onMessage(new ServerResponse("Received exception:" + throwable.getMessage(), -2)); + streamHandler.onException(throwable); + } + }; + } + + @Override + public void sayHelloServerStream(StreamHandler streamHandler, ClientRequest clientRequest) { + LOGGER.info("server stream req receive"); + streamHandler.onMessage(new ServerResponse(clientRequest.getMsg(), clientRequest.getCount())); + streamHandler.onMessage(new ExtendServerResponse(clientRequest.getMsg(), clientRequest.getCount() + 1, + "extendString")); + streamHandler.onMessage(new ServerResponse(clientRequest.getMsg(), clientRequest.getCount() + 2)); + streamHandler.onMessage(new ExtendServerResponse(clientRequest.getMsg(), clientRequest.getCount() + 3, + "extendString")); + streamHandler.onMessage(new ServerResponse(clientRequest.getMsg(), clientRequest.getCount() + 4)); + if (clientRequest.getMsg().equals(CMD_TRIGGER_STEAM_ERROR)) { + streamHandler.onException(new RuntimeException(ERROR_MSG)); + } else { + streamHandler.onFinish(); + } + } + +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/ServerResponse.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ServerResponse.java similarity index 95% rename from test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/ServerResponse.java rename to test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ServerResponse.java index 53eb0d970..3d7ec3878 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/ServerResponse.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ServerResponse.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alipay.sofa.rpc.triple.stream; +package com.alipay.sofa.rpc.test.triple.stream; public class ServerResponse { private String msg; diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/TripleGenericStreamTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/TripleGenericStreamTest.java similarity index 75% rename from test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/TripleGenericStreamTest.java rename to test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/TripleGenericStreamTest.java index 1c5572403..b77ea5013 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/TripleGenericStreamTest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/TripleGenericStreamTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alipay.sofa.rpc.triple.stream; +package com.alipay.sofa.rpc.test.triple.stream; import com.alipay.sofa.rpc.config.ConsumerConfig; import com.alipay.sofa.rpc.config.ProviderConfig; @@ -23,64 +23,60 @@ import com.alipay.sofa.rpc.context.RpcInvokeContext; import com.alipay.sofa.rpc.context.RpcRunningState; import com.alipay.sofa.rpc.context.RpcRuntimeContext; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; import com.alipay.sofa.rpc.transport.StreamHandler; -import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; public class TripleGenericStreamTest { - static final String HELLO_MSG = "Hello, world!"; - ConsumerConfig consumerConfig; - ProviderConfig providerConfig; - HelloService helloServiceInst; + private static final Logger LOGGER = LoggerFactory.getLogger(TripleGenericStreamTest.class); + private static final String HELLO_MSG = "Hello, world!"; + private static ConsumerConfig consumerConfig; + private static ProviderConfig providerConfig; + private static HelloService helloServiceInst; + private static HelloService helloServiceRef; - ConsumerConfig consumerRefer() { - ConsumerConfig consumerConfig = new ConsumerConfig() - .setInterfaceId(HelloService.class.getName()) - .setProtocol("tri") - .setDirectUrl("triple://127.0.0.1:12200"); - consumerConfig.refer(); - return consumerConfig; - } - - ProviderConfig providerExport() { + @BeforeClass + public static void beforeClass() throws InterruptedException { + RpcRunningState.setUnitTestMode(true); ServerConfig serverConfig = new ServerConfig() .setProtocol("tri") .setPort(12200) .setDaemon(false); helloServiceInst = Mockito.spy(new HelloServiceImpl()); - - ProviderConfig providerConfig = new ProviderConfig() + providerConfig = new ProviderConfig() .setInterfaceId(HelloService.class.getName()) .setRef(helloServiceInst) .setServer(serverConfig); - providerConfig.export(); - return providerConfig; - } - @Before - public void bootStrap() { - RpcRunningState.setUnitTestMode(true); - providerConfig = providerExport(); - consumerConfig = consumerRefer(); + consumerConfig = new ConsumerConfig() + .setInterfaceId(HelloService.class.getName()) + .setProtocol("tri") + .setDirectUrl("triple://127.0.0.1:12200"); + helloServiceRef = consumerConfig.refer(); } - @After - public void shutdown() { + @AfterClass + public static void afterClass() { consumerConfig.unRefer(); providerConfig.unExport(); RpcRuntimeContext.destroy(); @@ -88,55 +84,73 @@ public void shutdown() { RpcInvokeContext.removeContext(); } - public void testTripleBiStream(boolean endWithException) throws InterruptedException { + @Test + public void testTripleBiStreamFinish() throws InterruptedException { + testTripleBiStream(false); + } + + @Test + public void testTripleBiStreamException() throws InterruptedException { + testTripleBiStream(true); + } + public void testTripleBiStream(boolean endWithException) throws InterruptedException { int requestTimes = 5; CountDownLatch countDownLatch = new CountDownLatch(requestTimes + 1); AtomicBoolean receivedFinish = new AtomicBoolean(false); AtomicBoolean receivedException = new AtomicBoolean(false); - HelloService helloServiceRef = consumerConfig.refer(); - + List serverResponseList = new ArrayList<>(); StreamHandler streamHandler = helloServiceRef .sayHelloBiStream(new StreamHandler() { final AtomicInteger requestCount = new AtomicInteger(0); @Override public void onMessage(ServerResponse message) { + LOGGER.info("bi stream resp onMessage"); Assert.assertEquals(requestCount.getAndIncrement(), message.getCount()); Assert.assertEquals(HELLO_MSG, message.getMsg()); + serverResponseList.add(message); countDownLatch.countDown(); } @Override public void onFinish() { + LOGGER.info("bi stream resp onFinish"); receivedFinish.set(true); countDownLatch.countDown(); } @Override public void onException(Throwable throwable) { + LOGGER.error("bi stream resp onException", throwable); Assert.assertTrue(throwable.getMessage().contains(HelloService.ERROR_MSG)); receivedException.set(true); countDownLatch.countDown(); } }); + for (int k = 0; k < requestTimes; k++) { - streamHandler.onMessage(new ClientRequest(HELLO_MSG, k)); + if(k % 2 ==0) { + streamHandler.onMessage(new ClientRequest(HELLO_MSG, k)); + } else { + streamHandler.onMessage(new ExtendClientRequest(HELLO_MSG, k, "testExtendString")); + } } if (!endWithException) { streamHandler.onMessage(new ClientRequest(HelloService.CMD_TRIGGER_STREAM_FINISH, -2)); Assert.assertTrue(countDownLatch.await(20, TimeUnit.SECONDS)); Assert.assertTrue(receivedFinish.get()); streamHandler.onFinish(); + assertServerResponseType(serverResponseList); Assert.assertFalse(receivedException.get()); Assert.assertThrows(Throwable.class, () -> streamHandler.onMessage(new ClientRequest("", 123))); } else { streamHandler.onMessage(new ClientRequest(HelloService.CMD_TRIGGER_STEAM_ERROR, -2)); Assert.assertTrue(countDownLatch.await(20, TimeUnit.SECONDS)); streamHandler.onException(new RuntimeException(HelloService.ERROR_MSG)); - Assert.assertThrows(Throwable.class,()->streamHandler.onMessage(new ClientRequest(HELLO_MSG,0))); + Assert.assertThrows(Throwable.class, () -> streamHandler.onMessage(new ClientRequest(HELLO_MSG, 0))); Assert.assertFalse(receivedFinish.get()); Assert.assertTrue(receivedException.get()); } @@ -144,29 +158,33 @@ public void onException(Throwable throwable) { } @Test - public void testTripleBiStreamException() throws InterruptedException { - testTripleBiStream(true); + public void testTripleServerStreamFinish() throws InterruptedException { + testTripleServerStream(false); } @Test - public void testTripleBiStreamFinish() throws InterruptedException { - testTripleBiStream(false); + public void testTripleServerStreamException() throws InterruptedException { + testTripleServerStream(true); } public void testTripleServerStream(boolean endWithException) throws InterruptedException { + reset(helloServiceInst); HelloService helloServiceRef = consumerConfig.refer(); + Thread.sleep(5000); AtomicInteger count = new AtomicInteger(0); int responseTimes = 5; CountDownLatch countDownLatch = new CountDownLatch(responseTimes + 1); AtomicBoolean responseFinished = new AtomicBoolean(false); AtomicBoolean responseException = new AtomicBoolean(false); + List serverResponseList = new ArrayList<>(); helloServiceRef.sayHelloServerStream(new StreamHandler() { @Override public void onMessage(ServerResponse message) { Assert.assertEquals(endWithException ? HelloService.CMD_TRIGGER_STEAM_ERROR : HELLO_MSG, message.getMsg()); Assert.assertEquals(count.getAndIncrement(), message.getCount()); + serverResponseList.add(message); countDownLatch.countDown(); } @@ -188,6 +206,7 @@ public void onException(Throwable throwable) { if (endWithException) { Assert.assertTrue(responseException.get()); Assert.assertFalse(responseFinished.get()); + assertServerResponseType(serverResponseList); } else { Assert.assertTrue(responseFinished.get()); Assert.assertFalse(responseException.get()); @@ -196,13 +215,12 @@ public void onException(Throwable throwable) { verify(helloServiceInst, times(1)).sayHelloServerStream(any(), any()); } - @Test - public void testTripleServerStreamFinish() throws InterruptedException { - testTripleServerStream(false); + private void assertServerResponseType(List serverResponseList) { + for (int i = 0; i < serverResponseList.size(); i++) { + if (i % 2 != 0) { + Assert.assertTrue(serverResponseList.get(i) instanceof ExtendServerResponse); + } + } } - @Test - public void testTripleServerStreamException() throws InterruptedException { - testTripleServerStream(true); - } } diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/TripleStubStreamTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/TripleStubStreamTest.java new file mode 100644 index 000000000..838797b43 --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/TripleStubStreamTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.triple.stream; + +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.context.RpcInvokeContext; +import com.alipay.sofa.rpc.context.RpcRunningState; +import com.alipay.sofa.rpc.context.RpcRuntimeContext; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import com.alipay.sofa.rpc.test.triple.GreeterImpl; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.examples.helloworld.SofaGreeterTriple; +import io.grpc.stub.StreamObserver; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Iterator; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * @author Even + * @date 2024/4/10 14:45 + */ +public class TripleStubStreamTest { + + private static final Logger LOGGER = LoggerFactory + .getLogger(TripleStubStreamTest.class); + + private static ConsumerConfig consumerConfig; + + private static ProviderConfig providerConfig; + + private static SofaGreeterTriple.IGreeter greeterStub; + + @BeforeClass + public static void beforeClass() throws InterruptedException { + RpcRunningState.setUnitTestMode(true); + ApplicationConfig applicationConfig = new ApplicationConfig().setAppName("triple-server"); + int port = 50052; + ServerConfig serverConfig = new ServerConfig() + .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setPort(port); + + providerConfig = new ProviderConfig() + .setApplication(applicationConfig) + .setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setInterfaceId(SofaGreeterTriple.IGreeter.class.getName()) + .setRef(new GreeterImpl()) + .setServer(serverConfig); + providerConfig.export(); + + consumerConfig = new ConsumerConfig<>(); + consumerConfig.setInterfaceId(SofaGreeterTriple.IGreeter.class.getName()) + .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setDirectUrl("tri://127.0.0.1:" + port); + + greeterStub = consumerConfig.refer(); + } + + @AfterClass + public static void afterClass() { + consumerConfig.unRefer(); + providerConfig.unExport(); + RpcRuntimeContext.destroy(); + RpcInternalContext.removeContext(); + RpcInvokeContext.removeContext(); + } + + @Test + public void testTripleStubBiStream() throws InterruptedException { + HelloRequest request = HelloRequest.newBuilder().setName("Hello world!").build(); + CountDownLatch countDownLatch = new CountDownLatch(5); + StreamObserver requestStreamObserver = greeterStub + .sayHelloBinary(new StreamObserver() { + @Override + public void onNext(HelloReply value) { + Assert.assertEquals(value.getMessage(), request.getName()); + LOGGER.info("bi stream resp onNext"); + countDownLatch.countDown(); + } + + @Override + public void onError(Throwable t) { + LOGGER.error("bi stream resp error", t); + } + + @Override + public void onCompleted() { + LOGGER.info("bi stream resp onCompleted"); + countDownLatch.countDown(); + } + }); + requestStreamObserver.onNext(request); + requestStreamObserver.onNext(request); + requestStreamObserver.onCompleted(); + Assert.assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + + } + + @Test + public void testTripleStubClientStream() throws InterruptedException { + HelloRequest request = HelloRequest.newBuilder().setName("Hello world!").build(); + CountDownLatch clientStreamCountDownLatch = new CountDownLatch(2); + StreamObserver helloRequestStreamObserver = greeterStub + .sayHelloClientStream(new StreamObserver() { + @Override + public void onNext(HelloReply value) { + LOGGER.info("client stream resp onCompleted"); + Assert.assertEquals(value.getMessage(), request.getName() + 2); + clientStreamCountDownLatch.countDown(); + } + + @Override + public void onError(Throwable t) { + LOGGER.error("client stream resp error", t); + } + + @Override + public void onCompleted() { + clientStreamCountDownLatch.countDown(); + LOGGER.info("client stream resp onCompleted"); + } + }); + + helloRequestStreamObserver.onNext(request); + helloRequestStreamObserver.onNext(request); + helloRequestStreamObserver.onCompleted(); + Assert.assertTrue(clientStreamCountDownLatch.await(10, TimeUnit.SECONDS)); + } + + @Test + public void testTripleStubServerStream() throws InterruptedException { + HelloRequest request = HelloRequest.newBuilder().setName("Hello world!").build(); + CountDownLatch serverStreamCountDownLatch = new CountDownLatch(3); + Iterator helloReplyIterator = greeterStub.sayHelloServerStream(request); + int i = 0; + while (helloReplyIterator.hasNext()) { + i++; + serverStreamCountDownLatch.countDown(); + Assert.assertEquals(helloReplyIterator.next().getMessage(), request.getName() + i); + } + Assert.assertTrue(serverStreamCountDownLatch.await(10, TimeUnit.SECONDS)); + } + +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/HelloServiceImpl.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/HelloServiceImpl.java deleted file mode 100644 index 491281f61..000000000 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/stream/HelloServiceImpl.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.rpc.triple.stream; - -import com.alipay.sofa.rpc.transport.StreamHandler; - -public class HelloServiceImpl implements HelloService { - - @Override - public void sayHello() { - System.out.println("Get hello from consumer!"); - } - - @Override - public void sayHello(String msg) { - System.out.println("Get " + msg + "from consumer"); - } - - @Override - public String sayHelloUnary(String message) { - System.out.println("Get hello from consumer and try response..."); - return "Hello too, " + message; - } - - @Override - public StreamHandler sayHelloBiStream(StreamHandler streamHandler) { - return new ClientRequestEchoHandler(streamHandler); - } - - @Override - public void sayHelloServerStream(StreamHandler streamHandler, ClientRequest clientRequest) { - streamHandler.onMessage(new ServerResponse(clientRequest.getMsg(), clientRequest.getCount())); - streamHandler.onMessage(new ServerResponse(clientRequest.getMsg(), clientRequest.getCount() + 1)); - streamHandler.onMessage(new ServerResponse(clientRequest.getMsg(), clientRequest.getCount() + 2)); - streamHandler.onMessage(new ServerResponse(clientRequest.getMsg(), clientRequest.getCount() + 3)); - streamHandler.onMessage(new ServerResponse(clientRequest.getMsg(), clientRequest.getCount() + 4)); - if (clientRequest.getMsg().equals(HelloService.CMD_TRIGGER_STEAM_ERROR)) { - streamHandler.onException(new RuntimeException(HelloService.ERROR_MSG)); - } else { - streamHandler.onFinish(); - } - } - - static class ClientRequestEchoHandler implements StreamHandler { - - StreamHandler respHandler; - - public ClientRequestEchoHandler(StreamHandler respHandler) { - this.respHandler = respHandler; - } - - @Override - public void onMessage(ClientRequest clientRequest) { - if (clientRequest.getMsg().equals(CMD_TRIGGER_STREAM_FINISH)) { - respHandler.onFinish(); - } else if (clientRequest.getMsg().equals(CMD_TRIGGER_STEAM_ERROR)) { - respHandler.onException(new RuntimeException(ERROR_MSG)); - } - else { - respHandler.onMessage(new ServerResponse(clientRequest.getMsg(), clientRequest.getCount())); - } - } - - @Override - public void onFinish() { - respHandler.onFinish(); - } - - @Override - public void onException(Throwable throwable) { - respHandler.onMessage(new ServerResponse("Received exception:" + throwable.getMessage(), -2)); - respHandler.onException(throwable); - throwable.printStackTrace(); - } - }; -}