From 3be648834a6fb2aa796e104d998f2a1cd76ae39a Mon Sep 17 00:00:00 2001 From: yefei Date: Mon, 25 May 2020 11:29:37 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=87=8D=E8=BD=BD=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 67 ++++++++++++++++++- .../java/com/leaf/common/utils/Reflects.java | 10 ++- .../java/com/leaf/console/config/Config.java | 5 +- .../src/main/resources/static/echarts8.html | 2 +- console/src/main/resources/static/index.html | 9 +-- .../com/leaf/example/demo/HelloService.java | 5 ++ .../leaf/example/demo/HelloServiceImpl.java | 12 +++- .../leaf/example/demo/{generic => }/User.java | 12 +++- .../demo/async/ConsumerAsyncExample.java | 17 ++--- .../demo/attachment/ConsumerExample.java | 2 +- .../example/demo/basic/ConsumerExample.java | 24 +++---- .../example/demo/basic/ProviderExample.java | 1 - .../broadcast/ConsumerBroadcastExample.java | 2 +- .../example/demo/generic/ConsumerExample.java | 17 ++--- .../example/demo/generic/HelloService.java | 12 ---- .../demo/generic/HelloServiceImpl.java | 24 ------- .../example/demo/generic/ProviderExample.java | 2 + .../demo/oneway/ConsumerOneWayExample.java | 2 +- .../example/demo/simple/ConsumerExample.java | 22 ++++++ .../example/demo/simple/ProviderExample.java | 19 ++++++ .../leaf/example/flow/ConsumerExample.java | 2 +- .../example/zookeeper/ProviderExample.java | 3 +- pom.xml | 2 +- .../leaf/register/api/RegisterService.java | 5 ++ .../leaf/register/DefaultRegisterService.java | 5 ++ .../zookeeper/ZookeeperRegisterService.java | 28 +++++++- .../com/leaf/remoting/netty/NettyClient.java | 4 +- .../com/leaf/remoting/netty/NettyServer.java | 4 +- .../com/leaf/rpc/AbstractProxyFactory.java | 14 ++-- .../com/leaf/rpc/DefaultProxyFactory.java | 3 +- .../com/leaf/rpc/GenericProxyFactory.java | 3 +- .../leaf/rpc/consumer/DefaultLeafClient.java | 20 ++++-- .../com/leaf/rpc/consumer/LeafClient.java | 6 +- .../dispatcher/AbstractDispatcher.java | 14 ++-- .../leaf/rpc/provider/DefaultLeafServer.java | 9 ++- .../com/leaf/rpc/provider/LeafServer.java | 5 ++ .../process/DefaultRequestProcessor.java | 10 +-- .../spring/init/bean/ConsumerFactory.java | 27 +++++++- .../spring/init/bean/ProviderFactoryBean.java | 24 ++++++- 39 files changed, 319 insertions(+), 135 deletions(-) rename example/src/main/java/com/leaf/example/demo/{generic => }/User.java (67%) delete mode 100644 example/src/main/java/com/leaf/example/demo/generic/HelloService.java delete mode 100644 example/src/main/java/com/leaf/example/demo/generic/HelloServiceImpl.java create mode 100644 example/src/main/java/com/leaf/example/demo/simple/ConsumerExample.java create mode 100644 example/src/main/java/com/leaf/example/demo/simple/ProviderExample.java diff --git a/README.md b/README.md index 5db1385..9ed5ca8 100644 --- a/README.md +++ b/README.md @@ -14,8 +14,73 @@ + 负载均衡算法:加权轮询、加权随机 + 注解配置 - #### 示例: +##### 一个最简单的例子 +##### 1、provider +```` java +public interface HelloService { + + String sayHello(String name); + + String sayHello(String name, String age); + + String sayHello(User user); +} + +public class HelloServiceImpl implements HelloService { + + /** + * logger + */ + private final static Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class); + + @Override + public String sayHello(String name) { + return "hello" + name; + } + + @Override + public String sayHello(String name, String age) { + return "hello:" + name + ",age:" + age; + } + + @Override + public String sayHello(User user) { + logger.info("HelloServiceImpl param:{}", user.getName()); + return "HelloServiceImpl param: " + user.getName(); + } +} + +public class ProviderExample { + + public static void main(String[] args) { + LeafServer leafServer = new DefaultLeafServer(9180); + leafServer.start(); + + leafServer.serviceRegistry() + .provider(new HelloServiceImpl()) + .interfaceClass(HelloService.class) + .register(); + } +} +```` + +#### 2、consumer +```` java +public class ConsumerExample { + + public static void main(String[] args) { + LeafClient leafClient = new DefaultLeafClient("consumer"); + + HelloService helloService = DefaultProxyFactory.factory(HelloService.class) + .consumer(leafClient) + .providers(new UnresolvedAddress("127.0.0.1", 9180)) + .newProxy(); + System.out.println(helloService.sayHello("i'm king", "119")); + System.out.println(helloService.sayHello(new User("达维安爵士", "108"))); + } +} +```` ##### 具体可参考example模块 ##### 集成spring示例 diff --git a/common/src/main/java/com/leaf/common/utils/Reflects.java b/common/src/main/java/com/leaf/common/utils/Reflects.java index 462ec61..0172f9c 100644 --- a/common/src/main/java/com/leaf/common/utils/Reflects.java +++ b/common/src/main/java/com/leaf/common/utils/Reflects.java @@ -27,7 +27,15 @@ public static Object Invoke(Object obj, String methodName, Object[] args) { METHODACCESS_CACHE.remove(aClass); invoker = MethodAccess.get(aClass); } - return invoker.invoke(obj, methodName, args); + if (Collections.isNotEmpty(args)) { + Class[] paramTypes = new Class[args.length]; + for (int i = 0; i < args.length; i++) { + paramTypes[i] = args[i].getClass(); + } + return invoker.invoke(obj, methodName, paramTypes, args); + } else { + return invoker.invoke(obj, methodName, args); + } } public static Object getTypeDefaultValue(Class clazz) { diff --git a/console/src/main/java/com/leaf/console/config/Config.java b/console/src/main/java/com/leaf/console/config/Config.java index 4962c9f..8d6491f 100644 --- a/console/src/main/java/com/leaf/console/config/Config.java +++ b/console/src/main/java/com/leaf/console/config/Config.java @@ -1,6 +1,5 @@ package com.leaf.console.config; -import com.leaf.common.concurrent.ConcurrentSet; import com.leaf.common.model.ServiceMeta; import com.leaf.register.api.*; import com.leaf.register.api.model.RegisterMeta; @@ -41,7 +40,7 @@ public void notify(SubscribeMeta subscribeMeta, NotifyEvent event) { } } if (event == NotifyEvent.ADD) { - subscribeMetas.add(subscribeMeta); + subscribeMetas.addIfAbsent(subscribeMeta); } else { subscribeMetas.remove(subscribeMeta); } @@ -61,7 +60,7 @@ public void notify(RegisterMeta registerMeta, NotifyEvent event) { } } if (event == NotifyEvent.ADD) { - registerMetas.add(registerMeta); + registerMetas.addIfAbsent(registerMeta); } else { registerMetas.remove(registerMeta); } diff --git a/console/src/main/resources/static/echarts8.html b/console/src/main/resources/static/echarts8.html index 0e2df17..fd8d0aa 100755 --- a/console/src/main/resources/static/echarts8.html +++ b/console/src/main/resources/static/echarts8.html @@ -2,7 +2,7 @@ - 后台登录-X-admin2.2 + console diff --git a/console/src/main/resources/static/index.html b/console/src/main/resources/static/index.html index e3bcf68..5162bfe 100755 --- a/console/src/main/resources/static/index.html +++ b/console/src/main/resources/static/index.html @@ -26,7 +26,7 @@
+ console
@@ -127,13 +127,6 @@ - \ No newline at end of file diff --git a/example/src/main/java/com/leaf/example/demo/HelloService.java b/example/src/main/java/com/leaf/example/demo/HelloService.java index 0d75b21..e154162 100644 --- a/example/src/main/java/com/leaf/example/demo/HelloService.java +++ b/example/src/main/java/com/leaf/example/demo/HelloService.java @@ -7,4 +7,9 @@ public interface HelloService { String sayHello(String name); + + String sayHello(String name, String age); + + String sayHello(User user); + } diff --git a/example/src/main/java/com/leaf/example/demo/HelloServiceImpl.java b/example/src/main/java/com/leaf/example/demo/HelloServiceImpl.java index f1cf936..bdef1b8 100644 --- a/example/src/main/java/com/leaf/example/demo/HelloServiceImpl.java +++ b/example/src/main/java/com/leaf/example/demo/HelloServiceImpl.java @@ -1,6 +1,5 @@ package com.leaf.example.demo; -import com.leaf.common.context.RpcContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,4 +18,15 @@ public class HelloServiceImpl implements HelloService { public String sayHello(String name) { return "hello" + name; } + + @Override + public String sayHello(String name, String age) { + return "hello:" + name + ",age:" + age; + } + + @Override + public String sayHello(User user) { + logger.info("HelloServiceImpl param:{}", user.getName()); + return "HelloServiceImpl param: " + user.getName(); + } } diff --git a/example/src/main/java/com/leaf/example/demo/generic/User.java b/example/src/main/java/com/leaf/example/demo/User.java similarity index 67% rename from example/src/main/java/com/leaf/example/demo/generic/User.java rename to example/src/main/java/com/leaf/example/demo/User.java index 2b8b975..a0c770d 100644 --- a/example/src/main/java/com/leaf/example/demo/generic/User.java +++ b/example/src/main/java/com/leaf/example/demo/User.java @@ -1,4 +1,4 @@ -package com.leaf.example.demo.generic; +package com.leaf.example.demo; public class User { @@ -6,6 +6,14 @@ public class User { private String age; + public User() { + } + + public User(String name, String age) { + this.name = name; + this.age = age; + } + public String getName() { return name; } @@ -21,4 +29,4 @@ public String getAge() { public void setAge(String age) { this.age = age; } -} +} \ No newline at end of file diff --git a/example/src/main/java/com/leaf/example/demo/async/ConsumerAsyncExample.java b/example/src/main/java/com/leaf/example/demo/async/ConsumerAsyncExample.java index eaa627a..b3f2aa0 100644 --- a/example/src/main/java/com/leaf/example/demo/async/ConsumerAsyncExample.java +++ b/example/src/main/java/com/leaf/example/demo/async/ConsumerAsyncExample.java @@ -2,13 +2,11 @@ import com.leaf.common.UnresolvedAddress; import com.leaf.common.context.RpcContext; -import com.leaf.common.model.ServiceMeta; import com.leaf.example.demo.HelloService; -import com.leaf.remoting.netty.NettyClientConfig; import com.leaf.rpc.DefaultProxyFactory; -import com.leaf.rpc.consumer.LeafClient; import com.leaf.rpc.consumer.DefaultLeafClient; import com.leaf.rpc.consumer.InvokeType; +import com.leaf.rpc.consumer.LeafClient; import com.leaf.rpc.consumer.future.InvokeFuture; import com.leaf.rpc.consumer.future.InvokeFutureContext; import com.leaf.rpc.consumer.future.InvokeFutureListener; @@ -16,17 +14,12 @@ public class ConsumerAsyncExample { public static void main(String[] args) { - NettyClientConfig config = new NettyClientConfig(); - LeafClient leafClient = new DefaultLeafClient("consumer", config); - UnresolvedAddress address = new UnresolvedAddress("127.0.0.1", 9180); - leafClient.connect(address); - - ServiceMeta serviceMeta = new ServiceMeta("test", "org.rpc.example.demo.HelloService", "1.0.0"); - leafClient.client().addChannelGroup(serviceMeta, address); + LeafClient leafClient = new DefaultLeafClient("consumer"); HelloService helloService = DefaultProxyFactory.factory(HelloService.class) .consumer(leafClient) - .directory(serviceMeta) + .providers(new UnresolvedAddress("127.0.0.1", 9180)) + .group("test1") .timeMillis(3000L) .invokeType(InvokeType.ASYNC) .newProxy(); @@ -52,6 +45,6 @@ public void failure(Throwable cause) { } catch (Throwable throwable) { throwable.printStackTrace(); } - } + } diff --git a/example/src/main/java/com/leaf/example/demo/attachment/ConsumerExample.java b/example/src/main/java/com/leaf/example/demo/attachment/ConsumerExample.java index b36cb5d..a358a9e 100644 --- a/example/src/main/java/com/leaf/example/demo/attachment/ConsumerExample.java +++ b/example/src/main/java/com/leaf/example/demo/attachment/ConsumerExample.java @@ -20,7 +20,7 @@ public class ConsumerExample { leafClient.connect(address); ServiceMeta serviceMeta = new ServiceMeta("test", "org.rpc.example.demo.HelloService", "1.0.0"); - leafClient.client().addChannelGroup(serviceMeta, address); + leafClient.remotingClient().addChannelGroup(serviceMeta, address); helloService = DefaultProxyFactory.factory(HelloService.class) .consumer(leafClient) diff --git a/example/src/main/java/com/leaf/example/demo/basic/ConsumerExample.java b/example/src/main/java/com/leaf/example/demo/basic/ConsumerExample.java index 1139b6e..ba0ec68 100644 --- a/example/src/main/java/com/leaf/example/demo/basic/ConsumerExample.java +++ b/example/src/main/java/com/leaf/example/demo/basic/ConsumerExample.java @@ -1,39 +1,31 @@ package com.leaf.example.demo.basic; import com.leaf.common.UnresolvedAddress; -import com.leaf.common.model.ServiceMeta; import com.leaf.example.demo.HelloService; import com.leaf.rpc.DefaultProxyFactory; -import com.leaf.rpc.consumer.LeafClient; import com.leaf.rpc.consumer.DefaultLeafClient; +import com.leaf.rpc.consumer.LeafClient; public class ConsumerExample { private static HelloService helloService; - static { + public static void main(String[] args) { LeafClient leafClient = new DefaultLeafClient("consumer"); - UnresolvedAddress address = new UnresolvedAddress("127.0.0.1", 9180); - leafClient.connect(address); - leafClient.connect(address); - leafClient.connect(address); - leafClient.connect(address); - - ServiceMeta serviceMeta = new ServiceMeta("test", "org.rpc.example.demo.HelloService", "1.0.0"); - leafClient.client().addChannelGroup(serviceMeta, address); helloService = DefaultProxyFactory.factory(HelloService.class) .consumer(leafClient) - .directory(serviceMeta) - .timeMillis(300000L) + .providers(new UnresolvedAddress("127.0.0.1", 9180)) + .group("test") + .version("1.0.0") + .timeMillis(3000L) .newProxy(); - } - public static void main(String[] args) { new ConsumerExample().invoke(); } public void invoke() { - helloService.sayHello(" biu biu biu!!!"); + String s = helloService.sayHello(" biu biu biu!!!"); + System.out.println(s); } } diff --git a/example/src/main/java/com/leaf/example/demo/basic/ProviderExample.java b/example/src/main/java/com/leaf/example/demo/basic/ProviderExample.java index c8659af..28ebfdd 100644 --- a/example/src/main/java/com/leaf/example/demo/basic/ProviderExample.java +++ b/example/src/main/java/com/leaf/example/demo/basic/ProviderExample.java @@ -19,7 +19,6 @@ public static void main(String[] args) { ServiceWrapper serviceWrapper = leafServer.serviceRegistry() .provider(helloService) .interfaceClass(HelloService.class) - .providerName("org.rpc.example.demo.HelloService") .group("test") .version("1.0.0") .register(); diff --git a/example/src/main/java/com/leaf/example/demo/broadcast/ConsumerBroadcastExample.java b/example/src/main/java/com/leaf/example/demo/broadcast/ConsumerBroadcastExample.java index 88cf42b..8a868f3 100644 --- a/example/src/main/java/com/leaf/example/demo/broadcast/ConsumerBroadcastExample.java +++ b/example/src/main/java/com/leaf/example/demo/broadcast/ConsumerBroadcastExample.java @@ -32,7 +32,7 @@ public static void main(String[] args) { for (UnresolvedAddress address : addresses) { leafClient.connect(address); - leafClient.client().addChannelGroup(serviceMeta, address); + leafClient.remotingClient().addChannelGroup(serviceMeta, address); } HelloService helloService = DefaultProxyFactory.factory(HelloService.class) diff --git a/example/src/main/java/com/leaf/example/demo/generic/ConsumerExample.java b/example/src/main/java/com/leaf/example/demo/generic/ConsumerExample.java index 8cc0b2c..9a3ea70 100644 --- a/example/src/main/java/com/leaf/example/demo/generic/ConsumerExample.java +++ b/example/src/main/java/com/leaf/example/demo/generic/ConsumerExample.java @@ -2,25 +2,22 @@ import com.leaf.common.UnresolvedAddress; import com.leaf.common.model.ServiceMeta; -import com.leaf.remoting.netty.NettyClientConfig; +import com.leaf.example.demo.User; import com.leaf.rpc.GenericProxyFactory; -import com.leaf.rpc.consumer.LeafClient; import com.leaf.rpc.consumer.DefaultLeafClient; +import com.leaf.rpc.consumer.LeafClient; import com.leaf.rpc.consumer.invoke.GenericInvoke; public class ConsumerExample { public static void main(String[] args) throws Throwable { - NettyClientConfig config = new NettyClientConfig(); - LeafClient leafClient = new DefaultLeafClient("consumer", config); - UnresolvedAddress address = new UnresolvedAddress("127.0.0.1", 9180); - leafClient.connect(address); + LeafClient leafClient = new DefaultLeafClient("consumer"); - ServiceMeta serviceMeta = new ServiceMeta("test", "org.rpc.example.demo.HelloService", "1.0.0"); - leafClient.client().addChannelGroup(serviceMeta, address); + ServiceMeta serviceMeta = new ServiceMeta("leaf", "com.leaf.example.demo.HelloService", "1.0.0"); GenericInvoke genericInvoke = GenericProxyFactory.factory() .consumer(leafClient) + .providers(new UnresolvedAddress("127.0.0.1", 9180)) .directory(serviceMeta) .timeMillis(3000L) .newProxy(); @@ -34,9 +31,9 @@ public static void main(String[] args) throws Throwable { GenericInvoke genericInvoke2 = GenericProxyFactory.factory() .consumer(leafClient) .directory(serviceMeta) - .timeMillis(1L) + .timeMillis(100L) .newProxy(); - String s2 = (String) genericInvoke2.$invoke("sayHello", "yefei", "18"); + String s2 = (String) genericInvoke2.$invoke("sayHello", new User("yefei", "18")); System.out.printf("---------->: receive provider message %s \n", s2); } diff --git a/example/src/main/java/com/leaf/example/demo/generic/HelloService.java b/example/src/main/java/com/leaf/example/demo/generic/HelloService.java deleted file mode 100644 index 07a330d..0000000 --- a/example/src/main/java/com/leaf/example/demo/generic/HelloService.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.leaf.example.demo.generic; - -/** - * @author yefei - * @date 2017-06-20 14:13 - */ -public interface HelloService { - - String sayHello(String name, String age); - - String sayHello(User user); -} diff --git a/example/src/main/java/com/leaf/example/demo/generic/HelloServiceImpl.java b/example/src/main/java/com/leaf/example/demo/generic/HelloServiceImpl.java deleted file mode 100644 index 0be9210..0000000 --- a/example/src/main/java/com/leaf/example/demo/generic/HelloServiceImpl.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.leaf.example.demo.generic; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class HelloServiceImpl implements HelloService { - - /** - * logger - */ - private final static Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class); - - @Override - public String sayHello(String name, String age) { - logger.info("HelloServiceImpl name:{}, age:{}", name, age); - return "hello" + name; - } - - @Override - public String sayHello(User user) { - logger.info("HelloServiceImpl param:{}", user.getName()); - return null; - } -} diff --git a/example/src/main/java/com/leaf/example/demo/generic/ProviderExample.java b/example/src/main/java/com/leaf/example/demo/generic/ProviderExample.java index ccc89c3..964eedc 100644 --- a/example/src/main/java/com/leaf/example/demo/generic/ProviderExample.java +++ b/example/src/main/java/com/leaf/example/demo/generic/ProviderExample.java @@ -1,5 +1,7 @@ package com.leaf.example.demo.generic; +import com.leaf.example.demo.HelloService; +import com.leaf.example.demo.HelloServiceImpl; import com.leaf.rpc.local.ServiceWrapper; import com.leaf.remoting.netty.NettyServerConfig; import com.leaf.rpc.provider.DefaultLeafServer; diff --git a/example/src/main/java/com/leaf/example/demo/oneway/ConsumerOneWayExample.java b/example/src/main/java/com/leaf/example/demo/oneway/ConsumerOneWayExample.java index 542b681..6d000ee 100644 --- a/example/src/main/java/com/leaf/example/demo/oneway/ConsumerOneWayExample.java +++ b/example/src/main/java/com/leaf/example/demo/oneway/ConsumerOneWayExample.java @@ -18,7 +18,7 @@ public static void main(String[] args) { leafClient.connect(address); ServiceMeta serviceMeta = new ServiceMeta("test", "org.rpc.example.demo.HelloService", "1.0.0"); - leafClient.client().addChannelGroup(serviceMeta, address); + leafClient.remotingClient().addChannelGroup(serviceMeta, address); HelloService helloService = DefaultProxyFactory.factory(HelloService.class) .consumer(leafClient) diff --git a/example/src/main/java/com/leaf/example/demo/simple/ConsumerExample.java b/example/src/main/java/com/leaf/example/demo/simple/ConsumerExample.java new file mode 100644 index 0000000..5b80c70 --- /dev/null +++ b/example/src/main/java/com/leaf/example/demo/simple/ConsumerExample.java @@ -0,0 +1,22 @@ +package com.leaf.example.demo.simple; + +import com.leaf.common.UnresolvedAddress; +import com.leaf.example.demo.HelloService; +import com.leaf.example.demo.User; +import com.leaf.rpc.DefaultProxyFactory; +import com.leaf.rpc.consumer.DefaultLeafClient; +import com.leaf.rpc.consumer.LeafClient; + +public class ConsumerExample { + + public static void main(String[] args) { + LeafClient leafClient = new DefaultLeafClient("consumer"); + + HelloService helloService = DefaultProxyFactory.factory(HelloService.class) + .consumer(leafClient) + .providers(new UnresolvedAddress("127.0.0.1", 9180)) + .newProxy(); + System.out.println(helloService.sayHello("i'm king", "119")); + System.out.println(helloService.sayHello(new User("达维安爵士", "108"))); + } +} diff --git a/example/src/main/java/com/leaf/example/demo/simple/ProviderExample.java b/example/src/main/java/com/leaf/example/demo/simple/ProviderExample.java new file mode 100644 index 0000000..a6d46cd --- /dev/null +++ b/example/src/main/java/com/leaf/example/demo/simple/ProviderExample.java @@ -0,0 +1,19 @@ +package com.leaf.example.demo.simple; + +import com.leaf.example.demo.HelloService; +import com.leaf.example.demo.HelloServiceImpl; +import com.leaf.rpc.provider.DefaultLeafServer; +import com.leaf.rpc.provider.LeafServer; + +public class ProviderExample { + + public static void main(String[] args) { + LeafServer leafServer = new DefaultLeafServer(9180); + leafServer.start(); + + leafServer.serviceRegistry() + .provider(new HelloServiceImpl()) + .interfaceClass(HelloService.class) + .register(); + } +} diff --git a/example/src/main/java/com/leaf/example/flow/ConsumerExample.java b/example/src/main/java/com/leaf/example/flow/ConsumerExample.java index 7f63fc5..a5029ea 100644 --- a/example/src/main/java/com/leaf/example/flow/ConsumerExample.java +++ b/example/src/main/java/com/leaf/example/flow/ConsumerExample.java @@ -17,7 +17,7 @@ public static void main(String[] args) throws RemotingConnectException, Interrup leafClient.connect(address); ServiceMeta serviceMeta = new ServiceMeta("test", "org.rpc.example.demo.HelloService", "1.0.0"); - leafClient.client().addChannelGroup(serviceMeta, address); + leafClient.remotingClient().addChannelGroup(serviceMeta, address); HelloService helloService = DefaultProxyFactory.factory(HelloService.class) .consumer(leafClient) diff --git a/example/src/main/java/com/leaf/example/zookeeper/ProviderExample.java b/example/src/main/java/com/leaf/example/zookeeper/ProviderExample.java index 9a92e37..db251c2 100644 --- a/example/src/main/java/com/leaf/example/zookeeper/ProviderExample.java +++ b/example/src/main/java/com/leaf/example/zookeeper/ProviderExample.java @@ -28,13 +28,14 @@ public static void main(String[] args) throws InterruptedException { @Override public void run() { leafServer.start(); - // 注册到本地容器 未发布到注册中心 ServiceWrapper serviceWrapper = leafServer.serviceRegistry() .provider(helloService) .register(); leafServer.connectToRegistryServer("121.43.175.216:2181"); leafServer.publishService(serviceWrapper); countDownLatch.countDown(); + + Runtime.getRuntime().addShutdownHook(new Thread(leafServer::shutdown)); } }).start(); Thread.sleep(500); diff --git a/pom.xml b/pom.xml index 182aacc..7f5469d 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ pom - 2.7.1 + 2.9.1 3.4.14 diff --git a/register/register-api/src/main/java/com/leaf/register/api/RegisterService.java b/register/register-api/src/main/java/com/leaf/register/api/RegisterService.java index debc018..cc68ba8 100644 --- a/register/register-api/src/main/java/com/leaf/register/api/RegisterService.java +++ b/register/register-api/src/main/java/com/leaf/register/api/RegisterService.java @@ -74,4 +74,9 @@ public interface RegisterService { */ void connectToRegistryServer(String addresses); + /** + * close + */ + void shutdown(); + } diff --git a/register/register-default/src/main/java/com/leaf/register/DefaultRegisterService.java b/register/register-default/src/main/java/com/leaf/register/DefaultRegisterService.java index bf0be60..f71c084 100644 --- a/register/register-default/src/main/java/com/leaf/register/DefaultRegisterService.java +++ b/register/register-default/src/main/java/com/leaf/register/DefaultRegisterService.java @@ -93,4 +93,9 @@ public void doSubscribeRegisterMeta(SubscribeMeta subscribeMeta) { public RegisterType registerType() { return RegisterType.DEFAULT; } + + @Override + public void shutdown() { + + } } diff --git a/register/register-zookeeper/src/main/java/com/leaf/register/zookeeper/ZookeeperRegisterService.java b/register/register-zookeeper/src/main/java/com/leaf/register/zookeeper/ZookeeperRegisterService.java index 8f261b6..99f239d 100644 --- a/register/register-zookeeper/src/main/java/com/leaf/register/zookeeper/ZookeeperRegisterService.java +++ b/register/register-zookeeper/src/main/java/com/leaf/register/zookeeper/ZookeeperRegisterService.java @@ -66,6 +66,8 @@ public class ZookeeperRegisterService extends AbstractRegisterService { */ private final ConcurrentHashMap subscribeConsumerVersion = new ConcurrentHashMap<>(); + private ScheduledThreadPoolExecutor executor; + private CuratorFramework curatorFramework; private String namespace = Constants.ZOOKEEPER_NAME_SPACE; @@ -120,7 +122,7 @@ public void stateChanged(CuratorFramework curatorFramework, ConnectionState conn curatorFramework.start(); - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> { + executor = new ScheduledThreadPoolExecutor(1, r -> { Thread thread = new Thread(r); thread.setName("check pre register metas"); return thread; @@ -149,7 +151,7 @@ public void stateChanged(CuratorFramework curatorFramework, ConnectionState conn } } - }, 500, 100, TimeUnit.MILLISECONDS); + }, 1000, 3000, TimeUnit.MILLISECONDS); } @Override @@ -433,6 +435,28 @@ public List lookup() { return serviceMetas; } + @Override + public void shutdown() { + if (executor != null) { + executor.shutdownNow(); + } + for (Map.Entry entry : pathChildrenCache.entrySet()) { + try { + entry.getValue().close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + for (Map.Entry entry : serviceMetaPathChildrenCache.entrySet()) { + try { + entry.getValue().close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + curatorFramework.close(); + } + /** * +-------------------------------------------------------------------------------+ * | |/leaf(namespace) | diff --git a/remoting/remoting-netty/src/main/java/com/leaf/remoting/netty/NettyClient.java b/remoting/remoting-netty/src/main/java/com/leaf/remoting/netty/NettyClient.java index f3a8c58..8d4539c 100644 --- a/remoting/remoting-netty/src/main/java/com/leaf/remoting/netty/NettyClient.java +++ b/remoting/remoting-netty/src/main/java/com/leaf/remoting/netty/NettyClient.java @@ -18,7 +18,6 @@ import com.leaf.remoting.netty.handler.client.NettyClientHandler; import com.leaf.remoting.netty.handler.client.NettyConnectManageHandler; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -29,7 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -149,7 +147,7 @@ public ChannelGroup group(UnresolvedAddress address) { @Override public boolean addChannelGroup(Directory directory, UnresolvedAddress address) { ChannelGroup group = group(address); - CopyOnWriteArrayList groups = directoryChannelGroup.find(directory); + CopyOnWriteArrayList groups = directoryChannelGroup.find(directory); boolean added = groups.addIfAbsent(group); if (added) { if (logger.isInfoEnabled()) { diff --git a/remoting/remoting-netty/src/main/java/com/leaf/remoting/netty/NettyServer.java b/remoting/remoting-netty/src/main/java/com/leaf/remoting/netty/NettyServer.java index 2c8c067..3fa2b06 100644 --- a/remoting/remoting-netty/src/main/java/com/leaf/remoting/netty/NettyServer.java +++ b/remoting/remoting-netty/src/main/java/com/leaf/remoting/netty/NettyServer.java @@ -267,10 +267,10 @@ public void shutdownGracefully() { nioEventLoopGroupWorker.shutdownGracefully().syncUninterruptibly(); if (publicExecutorService != null) { - publicExecutorService.shutdown(); + publicExecutorService.shutdownNow(); } if (scanResponseTableExecutorService != null) { - scanResponseTableExecutorService.shutdown(); + scanResponseTableExecutorService.shutdownNow(); } } diff --git a/rpc/src/main/java/com/leaf/rpc/AbstractProxyFactory.java b/rpc/src/main/java/com/leaf/rpc/AbstractProxyFactory.java index 451afa9..be7ebd3 100644 --- a/rpc/src/main/java/com/leaf/rpc/AbstractProxyFactory.java +++ b/rpc/src/main/java/com/leaf/rpc/AbstractProxyFactory.java @@ -135,10 +135,10 @@ protected void subscribe(ServiceMeta serviceMeta) { leafClient.subscribe(serviceMeta, new NotifyListener() { @Override public void notify(RegisterMeta registerMeta, NotifyEvent event) { - ChannelGroup group = leafClient.client().group(registerMeta.getAddress()); + ChannelGroup group = leafClient.remotingClient().group(registerMeta.getAddress()); switch (event) { case ADD: { - if (!leafClient.client().group(registerMeta.getAddress()).isAvailable()) { + if (!leafClient.remotingClient().group(registerMeta.getAddress()).isAvailable()) { int connCount = registerMeta.getConnCount() < 1 ? 1 : registerMeta.getConnCount(); for (int i = 0; i < connCount; i++) { leafClient.connect(registerMeta.getAddress()); @@ -146,23 +146,23 @@ public void notify(RegisterMeta registerMeta, NotifyEvent event) { leafClient.offlineListening(registerMeta.getAddress(), new OfflineListener() { @Override public void offline() { - leafClient.client().cancelReconnect(registerMeta.getAddress()); + leafClient.remotingClient().cancelReconnect(registerMeta.getAddress()); if (!group.isAvailable()) { - leafClient.client().removeChannelGroup(serviceMeta, registerMeta.getAddress()); + leafClient.remotingClient().removeChannelGroup(serviceMeta, registerMeta.getAddress()); } } }); } // channelGroup 和 serviceMeta 关系 - leafClient.client().addChannelGroup(serviceMeta, registerMeta.getAddress()); + leafClient.remotingClient().addChannelGroup(serviceMeta, registerMeta.getAddress()); // 设置channelGroup(相同地址的channel) weight - leafClient.client() + leafClient.remotingClient() .group(registerMeta.getAddress()) .setWeight(serviceMeta, registerMeta.getWeight()); break; } case REMOVE: { - leafClient.client().removeChannelGroup(serviceMeta, registerMeta.getAddress()); + leafClient.remotingClient().removeChannelGroup(serviceMeta, registerMeta.getAddress()); group.removeWeight(serviceMeta); break; } diff --git a/rpc/src/main/java/com/leaf/rpc/DefaultProxyFactory.java b/rpc/src/main/java/com/leaf/rpc/DefaultProxyFactory.java index cd53e05..1566aa9 100644 --- a/rpc/src/main/java/com/leaf/rpc/DefaultProxyFactory.java +++ b/rpc/src/main/java/com/leaf/rpc/DefaultProxyFactory.java @@ -50,7 +50,8 @@ public T newProxy() { Strings.isNullOrEmpty(version) ? Constants.DEFAULT_SERVICE_VERSION : version); for (UnresolvedAddress address : addresses) { - leafClient.client().addChannelGroup(serviceMeta, address); + leafClient.connect(address); + leafClient.remotingClient().addChannelGroup(serviceMeta, address); } if (leafClient.registerService() != null) { diff --git a/rpc/src/main/java/com/leaf/rpc/GenericProxyFactory.java b/rpc/src/main/java/com/leaf/rpc/GenericProxyFactory.java index baba802..521ce7a 100644 --- a/rpc/src/main/java/com/leaf/rpc/GenericProxyFactory.java +++ b/rpc/src/main/java/com/leaf/rpc/GenericProxyFactory.java @@ -31,7 +31,8 @@ public GenericInvoke newProxy() { Strings.isNullOrEmpty(version) ? Constants.DEFAULT_SERVICE_VERSION : version); for (UnresolvedAddress address : addresses) { - leafClient.client().addChannelGroup(serviceMeta, address); + leafClient.connect(address); + leafClient.remotingClient().addChannelGroup(serviceMeta, address); } if (leafClient.registerService() != null) { diff --git a/rpc/src/main/java/com/leaf/rpc/consumer/DefaultLeafClient.java b/rpc/src/main/java/com/leaf/rpc/consumer/DefaultLeafClient.java index 7b910e5..57748c2 100644 --- a/rpc/src/main/java/com/leaf/rpc/consumer/DefaultLeafClient.java +++ b/rpc/src/main/java/com/leaf/rpc/consumer/DefaultLeafClient.java @@ -20,7 +20,7 @@ public class DefaultLeafClient implements LeafClient { private final String application; - private final RemotingClient rpcClient; + private final RemotingClient remotingClient; private final RegisterType registerType; @@ -43,19 +43,19 @@ public DefaultLeafClient(String application, NettyClientConfig nettyClientConfig this.application = application; this.registerType = registerType; - this.rpcClient = new NettyClient(nettyClientConfig); - this.rpcClient.start(); + this.remotingClient = new NettyClient(nettyClientConfig); + this.remotingClient.start(); } @Override - public RemotingClient client() { - return rpcClient; + public RemotingClient remotingClient() { + return remotingClient; } @Override public void connect(UnresolvedAddress address) { try { - rpcClient.connect(address); + remotingClient.connect(address); } catch (Exception e) { AnyThrow.throwUnchecked(e); } @@ -80,6 +80,14 @@ public void connectToRegistryServer(String addresses) { registerService.connectToRegistryServer(addresses); } + @Override + public void shutdown() { + remotingClient.shutdownGracefully(); + if (registerService != null) { + registerService.shutdown(); + } + } + @Override public String application() { return application; diff --git a/rpc/src/main/java/com/leaf/rpc/consumer/LeafClient.java b/rpc/src/main/java/com/leaf/rpc/consumer/LeafClient.java index 6e6c0d5..6c35628 100644 --- a/rpc/src/main/java/com/leaf/rpc/consumer/LeafClient.java +++ b/rpc/src/main/java/com/leaf/rpc/consumer/LeafClient.java @@ -16,7 +16,7 @@ public interface LeafClient { * 通信客户端 * @return */ - RemotingClient client() ; + RemotingClient remotingClient() ; /** * @@ -52,4 +52,8 @@ public interface LeafClient { */ void offlineListening(UnresolvedAddress address, OfflineListener listener); + /** + * + */ + void shutdown(); } diff --git a/rpc/src/main/java/com/leaf/rpc/consumer/dispatcher/AbstractDispatcher.java b/rpc/src/main/java/com/leaf/rpc/consumer/dispatcher/AbstractDispatcher.java index b6d3b43..ff2c965 100644 --- a/rpc/src/main/java/com/leaf/rpc/consumer/dispatcher/AbstractDispatcher.java +++ b/rpc/src/main/java/com/leaf/rpc/consumer/dispatcher/AbstractDispatcher.java @@ -50,7 +50,7 @@ public AbstractDispatcher(LeafClient leafClient, LoadBalancer loadBalancer, Seri } protected ChannelGroup select(ServiceMeta metadata) { - List groups = leafClient.client().directory(metadata); + List groups = leafClient.remotingClient().directory(metadata); ChannelGroup group = loadBalancer.select(groups, metadata); @@ -69,7 +69,7 @@ protected ChannelGroup select(ServiceMeta metadata) { } protected ChannelGroup[] groups(ServiceMeta metadata) { - List channelGroups = leafClient.client().directory(metadata); + List channelGroups = leafClient.remotingClient().directory(metadata); checkState(channelGroups.size() > 0, metadata + " no channel"); ChannelGroup[] channelGroupsArray = new ChannelGroup[channelGroups.size()]; @@ -131,7 +131,7 @@ protected InvokeFuture invoke(final RequestCommand requestCommand, private InvokeFuture invokeSync(RequestCommand requestCommand, Class returnType, ChannelGroup channelGroup) throws Throwable { InvokeFuture invokeFuture = new DefaultInvokeFuture<>(returnType, timeoutMillis); ResponseCommand responseCommand = leafClient - .client() + .remotingClient() .invokeSync(channelGroup.remoteAddress(), requestCommand, timeoutMillis); @@ -151,7 +151,7 @@ private InvokeFuture invokeAsync(RequestCommand requestCommand, DispatchT switch (dispatchType) { case ROUND: { invokeFuture = new DefaultInvokeFuture(returnType, timeoutMillis); - leafClient.client().invokeAsync( + leafClient.remotingClient().invokeAsync( channelGroup[0].remoteAddress(), requestCommand, timeoutMillis, @@ -163,7 +163,7 @@ private InvokeFuture invokeAsync(RequestCommand requestCommand, DispatchT invokeFuture = new DefaultInvokeFutureGroup(futures); for (int i = 0; i < channelGroup.length; i++) { futures[i] = new DefaultInvokeFuture(returnType, timeoutMillis); - leafClient.client().invokeAsync( + leafClient.remotingClient().invokeAsync( channelGroup[i].remoteAddress(), requestCommand.clone(), timeoutMillis, @@ -183,14 +183,14 @@ private InvokeFuture invokeAsync(RequestCommand requestCommand, DispatchT private void invokeOneWay(RequestCommand requestCommand, DispatchType dispatchType, ChannelGroup... channelGroup) throws Throwable { switch (dispatchType) { case ROUND: { - leafClient.client().invokeOneWay(channelGroup[0].remoteAddress(), + leafClient.remotingClient().invokeOneWay(channelGroup[0].remoteAddress(), requestCommand, timeoutMillis); break; } case BROADCAST: { for (int i = 0; i < channelGroup.length; i++) { - leafClient.client().invokeOneWay(channelGroup[i].remoteAddress(), + leafClient.remotingClient().invokeOneWay(channelGroup[i].remoteAddress(), requestCommand.clone(), timeoutMillis); } diff --git a/rpc/src/main/java/com/leaf/rpc/provider/DefaultLeafServer.java b/rpc/src/main/java/com/leaf/rpc/provider/DefaultLeafServer.java index 4b9300b..eaea658 100644 --- a/rpc/src/main/java/com/leaf/rpc/provider/DefaultLeafServer.java +++ b/rpc/src/main/java/com/leaf/rpc/provider/DefaultLeafServer.java @@ -87,6 +87,14 @@ public void start() { this.server.start(); } + @Override + public void shutdown() { + if (registerService != null) { + registerService.shutdown(); + } + server.shutdownGracefully(); + } + @Override public String application() { return null; @@ -142,5 +150,4 @@ public void publishService(ServiceWrapper serviceWrapper) { registerService.register(registerMeta); } - } diff --git a/rpc/src/main/java/com/leaf/rpc/provider/LeafServer.java b/rpc/src/main/java/com/leaf/rpc/provider/LeafServer.java index 8288563..c1b2e5b 100644 --- a/rpc/src/main/java/com/leaf/rpc/provider/LeafServer.java +++ b/rpc/src/main/java/com/leaf/rpc/provider/LeafServer.java @@ -17,6 +17,11 @@ public interface LeafServer { */ void start(); + /** + * + */ + void shutdown(); + /** * 本地容器查找 * diff --git a/rpc/src/main/java/com/leaf/rpc/provider/process/DefaultRequestProcessor.java b/rpc/src/main/java/com/leaf/rpc/provider/process/DefaultRequestProcessor.java index f677aef..139a0ba 100644 --- a/rpc/src/main/java/com/leaf/rpc/provider/process/DefaultRequestProcessor.java +++ b/rpc/src/main/java/com/leaf/rpc/provider/process/DefaultRequestProcessor.java @@ -6,6 +6,7 @@ import com.leaf.remoting.api.RemotingCommandFactory; import com.leaf.remoting.api.RequestCommandProcessor; import com.leaf.remoting.api.ResponseStatus; +import com.leaf.remoting.api.exception.RemotingException; import com.leaf.remoting.api.payload.RequestCommand; import com.leaf.remoting.api.payload.ResponseCommand; import com.leaf.rpc.container.ServiceProviderContainer; @@ -104,11 +105,12 @@ public ResponseCommand process(ChannelHandlerContext context, RequestCommand req Object result = null; if (serviceWrapper == null) { String message = String.format( - "%s service: [%s] not found", - context.channel(), - requestWrapper.getServiceMeta() + "service: [%s] not found, channel %s ", + requestWrapper.getServiceMeta(), + context.channel() ); - logger.error(message); + result = new RemotingException(message); + logger.error(message, result); } else { if (filters.size() > 0) { for (RequestProcessFilter filter : filters) { diff --git a/spring-support/src/main/java/com/leaf/spring/init/bean/ConsumerFactory.java b/spring-support/src/main/java/com/leaf/spring/init/bean/ConsumerFactory.java index 8e47ca0..340de41 100644 --- a/spring-support/src/main/java/com/leaf/spring/init/bean/ConsumerFactory.java +++ b/spring-support/src/main/java/com/leaf/spring/init/bean/ConsumerFactory.java @@ -3,9 +3,14 @@ import com.leaf.register.api.RegisterType; import com.leaf.rpc.consumer.LeafClient; import com.leaf.rpc.consumer.DefaultLeafClient; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.InitializingBean; -public class ConsumerFactory implements InitializingBean { +/** + * + */ +public class ConsumerFactory implements FactoryBean, InitializingBean, DisposableBean { private String id; @@ -39,4 +44,24 @@ public void setRegisterType(String registerType) { throw new IllegalArgumentException("registerType:" + registerType); } } + + @Override + public boolean isSingleton() { + return true; + } + + @Override + public LeafClient getObject() throws Exception { + return leafClient; + } + + @Override + public Class getObjectType() { + return LeafClient.class; + } + + @Override + public void destroy() throws Exception { + leafClient.shutdown(); + } } diff --git a/spring-support/src/main/java/com/leaf/spring/init/bean/ProviderFactoryBean.java b/spring-support/src/main/java/com/leaf/spring/init/bean/ProviderFactoryBean.java index 567873e..8907ace 100644 --- a/spring-support/src/main/java/com/leaf/spring/init/bean/ProviderFactoryBean.java +++ b/spring-support/src/main/java/com/leaf/spring/init/bean/ProviderFactoryBean.java @@ -4,9 +4,11 @@ import com.leaf.register.api.RegisterType; import com.leaf.rpc.provider.DefaultLeafServer; import com.leaf.rpc.provider.LeafServer; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.InitializingBean; -public class ProviderFactoryBean implements InitializingBean { +public class ProviderFactoryBean implements FactoryBean, InitializingBean, DisposableBean { private Integer port; @@ -72,4 +74,24 @@ public void setRegisterType(String registerType) { throw new IllegalArgumentException("registerType:" + registerType); } } + + @Override + public void destroy() throws Exception { + leafServer.shutdown(); + } + + @Override + public boolean isSingleton() { + return true; + } + + @Override + public LeafServer getObject() throws Exception { + return leafServer; + } + + @Override + public Class getObjectType() { + return LeafServer.class; + } }