Skip to content

Commit

Permalink
refector
Browse files Browse the repository at this point in the history
  • Loading branch information
liushiming committed Jul 19, 2017
1 parent 0ac3150 commit f1f1f56
Show file tree
Hide file tree
Showing 13 changed files with 217 additions and 231 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
/*
* Copyright (c) 2016, Quancheng-ec.com All right reserved. This software is the
* confidential and proprietary information of Quancheng-ec.com ("Confidential
* Information"). You shall not disclose such Confidential Information and shall
* use it only in accordance with the terms of the license agreement you entered
* into with Quancheng-ec.com.
* Copyright (c) 2016, Quancheng-ec.com All right reserved. This software is the confidential and
* proprietary information of Quancheng-ec.com ("Confidential Information"). You shall not disclose
* such Confidential Information and shall use it only in accordance with the terms of the license
* agreement you entered into with Quancheng-ec.com.
*/
package com.quancheng.saluki.core.grpc.client;

Expand All @@ -13,8 +12,8 @@
import com.google.protobuf.Message;
import com.quancheng.saluki.core.common.Constants;
import com.quancheng.saluki.core.common.GrpcURL;
import com.quancheng.saluki.core.grpc.util.MethodDescriptorUtil;
import com.quancheng.saluki.core.grpc.util.SerializerUtils;
import com.quancheng.saluki.core.grpc.util.GrpcUtil;
import com.quancheng.saluki.core.grpc.util.SerializerUtil;
import com.quancheng.saluki.serializer.exception.ProtobufException;

import io.grpc.Channel;
Expand All @@ -26,137 +25,136 @@
*/
public interface GrpcRequest {

public Message getRequestArg() throws ProtobufException;
public Message getRequestArg() throws ProtobufException;

public MethodDescriptor<Message, Message> getMethodDescriptor();
public MethodDescriptor<Message, Message> getMethodDescriptor();

public Channel getChannel();
public Channel getChannel();

public void returnChannel(Channel channel);
public void returnChannel(Channel channel);

public String getServiceName();
public String getServiceName();

public GrpcURL getRefUrl();
public GrpcURL getRefUrl();

public MethodRequest getMethodRequest();
public MethodRequest getMethodRequest();

public void setMethodRequest(MethodRequest methodRequest);
public void setMethodRequest(MethodRequest methodRequest);

public static class Default implements GrpcRequest, Serializable {
public static class Default implements GrpcRequest, Serializable {

private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 1L;

private final GrpcURL refUrl;
private final GrpcURL refUrl;

private final GrpcProtocolClient.ChannelCall chanelPool;
private final GrpcProtocolClient.ChannelCall chanelPool;

private MethodRequest methodRequest;
private MethodRequest methodRequest;

public Default(GrpcURL refUrl, GrpcProtocolClient.ChannelCall chanelPool){
super();
this.refUrl = refUrl;
this.chanelPool = chanelPool;
}

@Override
public Message getRequestArg() throws ProtobufException {
Object arg = this.getMethodRequest().getArg();
return SerializerUtils.Pojo2Protobuf(arg);
}
public Default(GrpcURL refUrl, GrpcProtocolClient.ChannelCall chanelPool) {
super();
this.refUrl = refUrl;
this.chanelPool = chanelPool;
}

@Override
public MethodDescriptor<Message, Message> getMethodDescriptor() {
Message argsReq = MethodDescriptorUtil.buildDefaultInstance(this.getMethodRequest().getRequestType());
Message argsRep = MethodDescriptorUtil.buildDefaultInstance(this.getMethodRequest().getResponseType());
return MethodDescriptorUtil.createMethodDescriptor(this.getServiceName(),
this.getMethodRequest().getMethodName(), argsReq,
argsRep);
}
@Override
public Message getRequestArg() throws ProtobufException {
Object arg = this.getMethodRequest().getArg();
return SerializerUtil.pojo2Protobuf(arg);
}

@Override
public Channel getChannel() {
return chanelPool.borrowChannel(refUrl);
}
@Override
public MethodDescriptor<Message, Message> getMethodDescriptor() {
Message argsReq = GrpcUtil.createDefaultInstance(this.getMethodRequest().getRequestType());
Message argsRep = GrpcUtil.createDefaultInstance(this.getMethodRequest().getResponseType());
return GrpcUtil.createMethodDescriptor(this.getServiceName(),
this.getMethodRequest().getMethodName(), argsReq, argsRep);
}

@Override
public void returnChannel(Channel channel) {
chanelPool.returnChannel(refUrl, channel);
}
@Override
public Channel getChannel() {
return chanelPool.borrowChannel(refUrl);
}

@Override
public String getServiceName() {
return refUrl.getServiceInterface();
}
@Override
public void returnChannel(Channel channel) {
chanelPool.returnChannel(refUrl, channel);
}

@Override
public MethodRequest getMethodRequest() {
return methodRequest;
}
@Override
public String getServiceName() {
return refUrl.getServiceInterface();
}

@Override
public void setMethodRequest(MethodRequest methodRequest) {
this.methodRequest = methodRequest;
}
@Override
public MethodRequest getMethodRequest() {
return methodRequest;
}

@Override
public GrpcURL getRefUrl() {
Object arg = this.methodRequest.getArg();
return this.refUrl.addParameter(Constants.METHOD_KEY, this.methodRequest.getMethodName())//
.addParameterAndEncoded(Constants.ARG_KEY, new Gson().toJson(arg));
}
@Override
public void setMethodRequest(MethodRequest methodRequest) {
this.methodRequest = methodRequest;
}

@Override
public GrpcURL getRefUrl() {
Object arg = this.methodRequest.getArg();
return this.refUrl.addParameter(Constants.METHOD_KEY, this.methodRequest.getMethodName())//
.addParameterAndEncoded(Constants.ARG_KEY, new Gson().toJson(arg));
}

public static class MethodRequest implements Serializable {
}

private static final long serialVersionUID = 5280935790994972153L;
public static class MethodRequest implements Serializable {

private final String methodName;
private static final long serialVersionUID = 5280935790994972153L;

private final Class<?> requestType;
private final String methodName;

private final Class<?> responseType;
private final Class<?> requestType;

private final Object arg;
private final Class<?> responseType;

private final int callType;
private final Object arg;

private final int callTimeout;
private final int callType;

public MethodRequest(String methodName, Class<?> requestType, Class<?> responseType, Object arg, int callType,
int callTimeout){
super();
this.methodName = methodName;
this.requestType = requestType;
this.responseType = responseType;
this.arg = arg;
this.callType = callType;
this.callTimeout = callTimeout;
}
private final int callTimeout;

public String getMethodName() {
return methodName;
}
public MethodRequest(String methodName, Class<?> requestType, Class<?> responseType, Object arg,
int callType, int callTimeout) {
super();
this.methodName = methodName;
this.requestType = requestType;
this.responseType = responseType;
this.arg = arg;
this.callType = callType;
this.callTimeout = callTimeout;
}

public Class<?> getRequestType() {
return requestType;
}
public String getMethodName() {
return methodName;
}

public Class<?> getResponseType() {
return responseType;
}
public Class<?> getRequestType() {
return requestType;
}

public Object getArg() {
return arg;
}
public Class<?> getResponseType() {
return responseType;
}

public int getCallType() {
return callType;
}
public Object getArg() {
return arg;
}

public int getCallTimeout() {
return callTimeout;
}
public int getCallType() {
return callType;
}

public int getCallTimeout() {
return callTimeout;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.io.Serializable;

import com.google.protobuf.Message;
import com.quancheng.saluki.core.grpc.util.SerializerUtils;
import com.quancheng.saluki.core.grpc.util.SerializerUtil;
import com.quancheng.saluki.serializer.exception.ProtobufException;

/**
Expand All @@ -25,7 +25,7 @@ public static class Default implements GrpcResponse, Serializable {

@Override
public Object getResponseArg() throws ProtobufException {
return SerializerUtils.Protobuf2Pojo(this.getMessage(), this.getReturnType());
return SerializerUtil.protobuf2Pojo(this.getMessage(), this.getReturnType());
}

private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import com.quancheng.saluki.core.grpc.exception.RpcFrameworkException;
import com.quancheng.saluki.core.grpc.service.ClientServerMonitor;
import com.quancheng.saluki.core.grpc.service.MonitorService;
import com.quancheng.saluki.core.grpc.util.MethodDescriptorUtil;
import com.quancheng.saluki.core.grpc.util.GrpcUtil;
import com.quancheng.saluki.serializer.exception.ProtobufException;

import io.grpc.MethodDescriptor;
Expand Down Expand Up @@ -110,7 +110,7 @@ protected Object run() throws Exception {
@Override
protected Object getFallback() {
Class<?> responseType = this.request.getMethodRequest().getResponseType();
Message response = MethodDescriptorUtil.buildDefaultInstance(responseType);
Message response = GrpcUtil.createDefaultInstance(responseType);
Object obj = this.transformMessage(response);
collect(serviceName, methodName, getRequestMessage(), response, true);
return obj;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import org.slf4j.LoggerFactory;

import com.quancheng.saluki.core.common.RpcContext;
import com.quancheng.saluki.core.grpc.util.MetadataUtil;
import com.quancheng.saluki.core.grpc.util.SerializerUtils;
import com.quancheng.saluki.core.grpc.util.GrpcUtil;
import com.quancheng.saluki.core.grpc.util.SerializerUtil;

import io.grpc.CallOptions;
import io.grpc.Channel;
Expand Down Expand Up @@ -65,10 +65,10 @@ private void copyThreadLocalToMetadata(Metadata headers) {
Map<String, Object> values = RpcContext.getContext().get();
try {
if (!attachments.isEmpty()) {
headers.put(MetadataUtil.GRPC_CONTEXT_ATTACHMENTS, SerializerUtils.toJson(attachments));
headers.put(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS, SerializerUtil.toJson(attachments));
}
if (!values.isEmpty()) {
headers.put(MetadataUtil.GRPC_CONTEXT_VALUES, SerializerUtils.toJson(values));
headers.put(GrpcUtil.GRPC_CONTEXT_VALUES, SerializerUtil.toJson(values));
}
} catch (Throwable e) {
log.error(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import com.google.gson.reflect.TypeToken;
import com.quancheng.saluki.core.common.Constants;
import com.quancheng.saluki.core.common.RpcContext;
import com.quancheng.saluki.core.grpc.util.MetadataUtil;
import com.quancheng.saluki.core.grpc.util.SerializerUtils;
import com.quancheng.saluki.core.grpc.util.GrpcUtil;
import com.quancheng.saluki.core.grpc.util.SerializerUtil;

import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.Grpc;
Expand Down Expand Up @@ -62,17 +62,17 @@ public void request(int numMessages) {
}

private void copyMetadataToThreadLocal(Metadata headers) {
String attachments = headers.get(MetadataUtil.GRPC_CONTEXT_ATTACHMENTS);
String values = headers.get(MetadataUtil.GRPC_CONTEXT_VALUES);
String attachments = headers.get(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS);
String values = headers.get(GrpcUtil.GRPC_CONTEXT_VALUES);
try {
if (attachments != null) {
Map<String, String> attachmentsMap = SerializerUtils.fromJson(attachments,
Map<String, String> attachmentsMap = SerializerUtil.fromJson(attachments,
new TypeToken<Map<String, String>>() {}.getType());
RpcContext.getContext().setAttachments(attachmentsMap);
}
if (values != null) {
Map<String, Object> valuesMap =
SerializerUtils.fromJson(values, new TypeToken<Map<String, Object>>() {}.getType());
SerializerUtil.fromJson(values, new TypeToken<Map<String, Object>>() {}.getType());
for (Map.Entry<String, Object> entry : valuesMap.entrySet()) {
RpcContext.getContext().set(entry.getKey(), entry.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.quancheng.saluki.core.grpc.server.GrpcProtocolExporter;
import com.quancheng.saluki.core.grpc.service.ClientServerMonitor;
import com.quancheng.saluki.core.grpc.service.MonitorService;
import com.quancheng.saluki.core.grpc.util.MethodDescriptorUtil;
import com.quancheng.saluki.core.grpc.util.GrpcUtil;
import com.quancheng.saluki.core.utils.ReflectUtils;

import io.grpc.MethodDescriptor;
Expand Down Expand Up @@ -60,7 +60,7 @@ public ServerServiceDefinition export(Class<?> protocol, Object protocolImpl) {
final ConcurrentMap<String, AtomicInteger> concurrents = Maps.newConcurrentMap();
for (Method method : methods) {
MethodDescriptor<Message, Message> methodDescriptor =
MethodDescriptorUtil.createMethodDescriptor(serivce, method);
GrpcUtil.createMethodDescriptor(serivce, method);
serviceDefBuilder.addMethod(methodDescriptor, ServerCalls.asyncUnaryCall(
new ServerInvocation(serviceRef, method, providerUrl, concurrents, clientServerMonitor)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.quancheng.saluki.core.common.GrpcURL;
import com.quancheng.saluki.core.common.RpcContext;
import com.quancheng.saluki.core.grpc.service.MonitorService;
import com.quancheng.saluki.core.grpc.util.SerializerUtils;
import com.quancheng.saluki.core.grpc.util.SerializerUtil;
import com.quancheng.saluki.core.utils.ReflectUtils;

import io.grpc.Status;
Expand Down Expand Up @@ -63,10 +63,10 @@ public void invoke(Message request, StreamObserver<Message> responseObserver) {
try {
getConcurrent().getAndIncrement();
Class<?> requestType = ReflectUtils.getTypedOfReq(method);
Object reqPojo = SerializerUtils.Protobuf2Pojo(reqProtoBufer, requestType);
Object reqPojo = SerializerUtil.protobuf2Pojo(reqProtoBufer, requestType);
Object[] requestParams = new Object[] { reqPojo };
Object respPojo = method.invoke(serviceToInvoke, requestParams);
respProtoBufer = SerializerUtils.Pojo2Protobuf(respPojo);
respProtoBufer = SerializerUtil.pojo2Protobuf(respPojo);
collect(reqProtoBufer, respProtoBufer, start, false);
responseObserver.onNext(respProtoBufer);
responseObserver.onCompleted();
Expand Down
Loading

0 comments on commit f1f1f56

Please sign in to comment.