Skip to content

Commit

Permalink
Merge pull request crossoverJie#146 from crossoverJie/refactor-proxy-…
Browse files Browse the repository at this point in the history
…manager

Refactor rpc proxy manager
  • Loading branch information
crossoverJie authored Sep 8, 2024
2 parents ab5f7a8 + e951ade commit 7db4b55
Show file tree
Hide file tree
Showing 40 changed files with 693 additions and 826 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
首先需要安装 `Zookeeper、Redis` 并保证网络通畅。

```shell
docker run --name zookeeper -d -p 2181:2181 zookeeper:
docker run --rm --name zookeeper -d -p 2181:2181 zookeeper:3.9.2
docker run --rm --name redis -d -p 6379:6379 redis:7.4.0
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import com.crossoverjie.cim.client.service.impl.ClientInfo;
import com.crossoverjie.cim.client.thread.ContextHolder;
import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
import com.crossoverjie.cim.route.api.vo.req.LoginReqVO;
import com.crossoverjie.cim.route.api.vo.res.CIMServerResVO;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import io.netty.bootstrap.Bootstrap;
Expand All @@ -22,15 +22,13 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.stereotype.Component;

import jakarta.annotation.PostConstruct;

/**
* Function:
*
Expand Down Expand Up @@ -79,7 +77,7 @@ public class CIMClient {
public void start() throws Exception {

//登录 + 获取可以使用的服务器 ip+port
CIMServerResVO.ServerInfo cimServer = userLogin();
CIMServerResVO cimServer = userLogin();

//启动客户端
startClient(cimServer);
Expand All @@ -96,7 +94,7 @@ public void start() throws Exception {
* @param cimServer
* @throws Exception
*/
private void startClient(CIMServerResVO.ServerInfo cimServer) {
private void startClient(CIMServerResVO cimServer) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
Expand Down Expand Up @@ -128,17 +126,17 @@ private void startClient(CIMServerResVO.ServerInfo cimServer) {
* @return 路由服务器信息
* @throws Exception
*/
private CIMServerResVO.ServerInfo userLogin() {
private CIMServerResVO userLogin() {
LoginReqVO loginReqVO = new LoginReqVO(userId, userName);
CIMServerResVO.ServerInfo cimServer = null;
CIMServerResVO cimServer = null;
try {
cimServer = routeRequest.getCIMServer(loginReqVO);

//保存系统信息
clientInfo.saveServiceInfo(cimServer.getIp() + ":" + cimServer.getCimServerPort())
.saveUserInfo(userId, userName);

log.info("cimServer=[{}]", cimServer.toString());
log.info("cimServer=[{}]", cimServer);
} catch (Exception e) {
errorCount++;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public CIMRequestProto.CIMReqProtocol heartBeat() {
@Bean
public OkHttpClient okHttpClient() {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
builder.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(10, TimeUnit.SECONDS)
.writeTimeout(10,TimeUnit.SECONDS)
builder.connectTimeout(3, TimeUnit.SECONDS)
.readTimeout(3, TimeUnit.SECONDS)
.writeTimeout(3,TimeUnit.SECONDS)
.retryOnConnectionFailure(true);
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
import com.crossoverjie.cim.client.client.CIMClient;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.SendMsgReqVO;
import com.crossoverjie.cim.client.vo.req.StringReqVO;
import com.crossoverjie.cim.client.vo.res.SendMsgResVO;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.res.BaseResponse;
import com.crossoverjie.cim.common.res.NULLBody;
import com.crossoverjie.cim.route.api.vo.req.ChatReqVO;
import io.swagger.v3.oas.annotations.Operation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
Expand Down Expand Up @@ -93,17 +92,15 @@ public BaseResponse<NULLBody> sendProtoBufMsg(@RequestBody GoogleProtocolVO goog

/**
* 群发消息
* @param sendMsgReqVO
* @param chatReqVO
* @return
*/
@Operation(summary = "群发消息")
@RequestMapping(value = "sendGroupMsg",method = RequestMethod.POST)
@ResponseBody
public BaseResponse sendGroupMsg(@RequestBody SendMsgReqVO sendMsgReqVO) throws Exception {
public BaseResponse sendGroupMsg(@RequestBody ChatReqVO chatReqVO) throws Exception {
BaseResponse<NULLBody> res = new BaseResponse();

GroupReqVO groupReqVO = new GroupReqVO(sendMsgReqVO.getUserId(),sendMsgReqVO.getMsg()) ;
routeRequest.sendGroupMsg(groupReqVO) ;
routeRequest.sendGroupMsg(chatReqVO) ;

// TODO: 2024/5/30 metrics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.vdurmont.emoji.EmojiParser;

import java.util.Scanner;
import lombok.SneakyThrows;

/**
* Function:
Expand Down Expand Up @@ -37,6 +38,7 @@ public Scan() {
this.echoService = SpringBeanFactory.getBean(EchoService.class) ;
}

@SneakyThrows
@Override
public void run() {
Scanner sc = new Scanner(System.in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ public interface InnerCommand {
* 执行
* @param msg
*/
void process(String msg) ;
void process(String msg) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.crossoverjie.cim.client.service;

import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.P2PReqVO;
import com.crossoverjie.cim.route.api.vo.req.ChatReqVO;
import com.crossoverjie.cim.route.api.vo.req.P2PReqVO;

/**
* Function:消息处理器
Expand All @@ -23,7 +23,7 @@ public interface MsgHandle {
* @param groupReqVO 群聊消息 其中的 userId 为发送者的 userID
* @throws Exception
*/
void groupChat(GroupReqVO groupReqVO) throws Exception ;
void groupChat(ChatReqVO groupReqVO) throws Exception ;

/**
* 私聊
Expand All @@ -47,13 +47,13 @@ public interface MsgHandle {
* @param msg
* @return 是否应当跳过当前消息(包含了":" 就需要跳过)
*/
boolean innerCommand(String msg) ;
boolean innerCommand(String msg) throws Exception;


/**
* 关闭系统
*/
void shutdown() ;
void shutdown() throws Exception;

/**
* 开启 AI 模式
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.crossoverjie.cim.client.service;

import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
import com.crossoverjie.cim.client.vo.req.P2PReqVO;
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
import com.crossoverjie.cim.client.vo.res.OnlineUsersResVO;

import java.util.List;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.route.api.vo.req.ChatReqVO;
import com.crossoverjie.cim.route.api.vo.req.LoginReqVO;
import com.crossoverjie.cim.route.api.vo.req.P2PReqVO;
import com.crossoverjie.cim.route.api.vo.res.CIMServerResVO;
import java.util.Set;

/**
* Function:
Expand All @@ -19,10 +18,10 @@ public interface RouteRequest {

/**
* 群发消息
* @param groupReqVO 消息
* @param chatReqVO 消息
* @throws Exception
*/
void sendGroupMsg(GroupReqVO groupReqVO) throws Exception;
void sendGroupMsg(ChatReqVO chatReqVO) throws Exception;


/**
Expand All @@ -38,16 +37,16 @@ public interface RouteRequest {
* @param loginReqVO
* @throws Exception
*/
CIMServerResVO.ServerInfo getCIMServer(LoginReqVO loginReqVO) throws Exception;
CIMServerResVO getCIMServer(LoginReqVO loginReqVO) throws Exception;

/**
* 获取所有在线用户
* @return
* @throws Exception
*/
List<OnlineUsersResVO.DataBodyBean> onlineUsers()throws Exception ;
Set<CIMUserInfo> onlineUsers()throws Exception ;


void offLine() ;
void offLine() throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@
import com.crossoverjie.cim.client.service.MsgHandle;
import com.crossoverjie.cim.client.service.MsgLogger;
import com.crossoverjie.cim.client.service.RouteRequest;
import com.crossoverjie.cim.client.vo.req.GroupReqVO;
import com.crossoverjie.cim.client.vo.req.P2PReqVO;
import com.crossoverjie.cim.common.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.crossoverjie.cim.route.api.vo.req.ChatReqVO;
import com.crossoverjie.cim.route.api.vo.req.P2PReqVO;
import jakarta.annotation.Resource;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
* Function:
Expand Down Expand Up @@ -78,7 +77,7 @@ private void normalChat(String msg) {

} else {
//群聊
GroupReqVO groupReqVO = new GroupReqVO(configuration.getUserId(), msg);
ChatReqVO groupReqVO = new ChatReqVO(configuration.getUserId(), msg);
try {
groupChat(groupReqVO);
} catch (Exception e) {
Expand All @@ -102,7 +101,7 @@ private void aiChat(String msg) {
}

@Override
public void groupChat(GroupReqVO groupReqVO) throws Exception {
public void groupChat(ChatReqVO groupReqVO) throws Exception {
routeRequest.sendGroupMsg(groupReqVO);
}

Expand All @@ -123,7 +122,7 @@ public boolean checkMsg(String msg) {
}

@Override
public boolean innerCommand(String msg) {
public boolean innerCommand(String msg) throws Exception {

if (msg.startsWith(":")) {

Expand All @@ -143,7 +142,7 @@ public boolean innerCommand(String msg) {
* 关闭系统
*/
@Override
public void shutdown() {
public void shutdown() throws Exception {
log.info("系统关闭中。。。。");
routeRequest.offLine();
msgLogger.stop();
Expand Down
Loading

0 comments on commit 7db4b55

Please sign in to comment.