From 5ebe3d0ae4f0215e2ef8d4b4976bbbfa0ac2b7d9 Mon Sep 17 00:00:00 2001 From: AlexBob <5199840@qq.com> Date: Wed, 3 Jul 2024 14:22:03 +0800 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor:=20=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=E4=BA=86=20RsocketManager,=20MessageType,=20RsocketCo?= =?UTF-8?q?ntroller,=20MessageOut,=20MessageIn,=20ConnectedClient=20?= =?UTF-8?q?=E7=B1=BB=E4=BB=A5=E5=8F=8A=E7=9B=B8=E5=85=B3=E7=9A=84=20yml=20?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=EF=BC=8C=E5=90=8C=E6=97=B6=E7=A7=BB=E9=99=A4?= =?UTF-8?q?=E4=BA=86=E4=B8=8D=E5=BF=85=E8=A6=81=E7=9A=84=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E8=AE=BE=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- boot/platform/build.gradle | 1 + .../relational/rsocket/ConnectedClient.java | 23 ++++++++ .../boot/relational/rsocket/MessageIn.java | 30 ++++++++++ .../boot/relational/rsocket/MessageOut.java | 36 +++++++++++ .../boot/relational/rsocket/MessageType.java | 18 ++++++ .../relational/rsocket/RsocketController.java | 40 +++++++++++++ .../relational/rsocket/RsocketManager.java | 59 +++++++++++++++++++ .../src/main/resources/application-local.yml | 2 +- .../src/main/resources/application.yml | 13 ++-- 9 files changed, 215 insertions(+), 7 deletions(-) create mode 100644 boot/platform/src/main/java/com/platform/boot/relational/rsocket/ConnectedClient.java create mode 100644 boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageIn.java create mode 100644 boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageOut.java create mode 100644 boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageType.java create mode 100644 boot/platform/src/main/java/com/platform/boot/relational/rsocket/RsocketController.java create mode 100644 boot/platform/src/main/java/com/platform/boot/relational/rsocket/RsocketManager.java diff --git a/boot/platform/build.gradle b/boot/platform/build.gradle index b64c46cb..36521efe 100644 --- a/boot/platform/build.gradle +++ b/boot/platform/build.gradle @@ -57,6 +57,7 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-actuator") implementation("org.springframework.boot:spring-boot-starter-webflux") implementation("org.springframework.boot:spring-boot-starter-validation") + implementation("org.springframework.boot:spring-boot-starter-rsocket") implementation("org.springframework.session:spring-session-data-redis") implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive") diff --git a/boot/platform/src/main/java/com/platform/boot/relational/rsocket/ConnectedClient.java b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/ConnectedClient.java new file mode 100644 index 00000000..8ff17670 --- /dev/null +++ b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/ConnectedClient.java @@ -0,0 +1,23 @@ +package com.platform.boot.relational.rsocket; + +import lombok.Data; +import org.springframework.messaging.rsocket.RSocketRequester; + +import java.io.Serializable; +import java.time.LocalDateTime; + +/** + * @author Alex Bob + */ +@Data +public class ConnectedClient implements Serializable { + + private final RSocketRequester requester; + private final LocalDateTime connectedTime; + + ConnectedClient(RSocketRequester requester) { + this.requester = requester; + this.connectedTime = LocalDateTime.now(); + } + +} diff --git a/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageIn.java b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageIn.java new file mode 100644 index 00000000..732b335e --- /dev/null +++ b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageIn.java @@ -0,0 +1,30 @@ +package com.platform.boot.relational.rsocket; + +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * @author Alex Bob + */ +@Data +@NoArgsConstructor +public class MessageIn implements Serializable { + + private MessageType type; + private String content; + private Object data; + private String from; + private String to; + + public MessageIn(MessageType type, String content, Object data) { + this.type = type; + this.content = content; + this.data = data; + } + + public static MessageIn of(MessageType type, String content, Object data) { + return new MessageIn(type, content, data); + } +} diff --git a/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageOut.java b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageOut.java new file mode 100644 index 00000000..06e06a92 --- /dev/null +++ b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageOut.java @@ -0,0 +1,36 @@ +package com.platform.boot.relational.rsocket; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.ToString; + +/** + * @author Alex Bob + */ +@Data +@NoArgsConstructor +@EqualsAndHashCode(callSuper = true) +@ToString(callSuper = true) +public class MessageOut extends MessageIn { + + private Integer status; + + public MessageOut(MessageType type, String content, Object data) { + super(type, content, data); + } + + public static MessageOut of(MessageType type, String content, Object data) { + return new MessageOut(type, content, data); + } + + public MessageOut status(Integer status) { + this.status = status; + return this; + } + + public MessageOut content(String content) { + this.setContent(content); + return this; + } +} diff --git a/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageType.java b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageType.java new file mode 100644 index 00000000..8c44d9f7 --- /dev/null +++ b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageType.java @@ -0,0 +1,18 @@ +package com.platform.boot.relational.rsocket; + +import java.io.Serializable; + +/** + * @author Alex Bob + */ +public enum MessageType implements Serializable { + /** + * 命令 + */ + COMMAND, + + /** + * 未知 + */ + UNKNOWN; +} diff --git a/boot/platform/src/main/java/com/platform/boot/relational/rsocket/RsocketController.java b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/RsocketController.java new file mode 100644 index 00000000..5f2fce93 --- /dev/null +++ b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/RsocketController.java @@ -0,0 +1,40 @@ +package com.platform.boot.relational.rsocket; + +import lombok.RequiredArgsConstructor; +import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.messaging.rsocket.RSocketRequester; +import org.springframework.messaging.rsocket.annotation.ConnectMapping; +import org.springframework.stereotype.Controller; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Objects; + +/** + * @author Alex Bob + */ +@Controller +@RequiredArgsConstructor +public class RsocketController { + + private final RsocketManager rsocketManager; + + @ConnectMapping("connect.setup") + public Mono setup(String clientIdentifier, RSocketRequester requester) { + Objects.requireNonNull(requester.rsocket(), "rsocket connection should not be null"); + this.rsocketManager.connect(clientIdentifier, requester); + return Mono.empty(); + } + + @MessageMapping("request.stream") + public Flux stream(String clientIdentifier, RSocketRequester requester) { + return this.rsocketManager.radars(clientIdentifier, requester); + } + + @MessageMapping("request.sender") + public Mono sender(Mono messageInMono) { + return messageInMono.doOnNext(this.rsocketManager::send) + .map(in -> MessageOut.of(in.getType(), in.getContent(), in.getData()).status(200)); + } + +} diff --git a/boot/platform/src/main/java/com/platform/boot/relational/rsocket/RsocketManager.java b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/RsocketManager.java new file mode 100644 index 00000000..1bc724ff --- /dev/null +++ b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/RsocketManager.java @@ -0,0 +1,59 @@ +package com.platform.boot.relational.rsocket; + +import lombok.extern.log4j.Log4j2; +import org.springframework.messaging.rsocket.RSocketRequester; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; + +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author Alex Bob + */ +@Log4j2 +@Service +public class RsocketManager { + + private final Sinks.Many replaySink = Sinks.many().replay().limit(Duration.ofMinutes(5)); + private final ConcurrentHashMap clients = new ConcurrentHashMap<>(200); + + public void connect(String clientIdentifier, RSocketRequester requester) { + log.debug("Connect [{}] RSocketRequester.", clientIdentifier); + Objects.requireNonNull(requester.rsocket(), "rsocket connection should not be null") + .onClose() + .doFirst(() -> this.clients.put(clientIdentifier, new ConnectedClient(requester))) + .doFinally(sig -> { + log.debug("Client closed, uuid is {}. signal is {}.", clientIdentifier, sig.toString()); + this.clients.remove(clientIdentifier); + }).subscribe(); + } + + public Flux radars(String clientIdentifier, RSocketRequester requester) { + log.debug("Radars [{}] RSocketRequester.", clientIdentifier); + this.clients.put(clientIdentifier, new ConnectedClient(requester)); + return this.replaySink.asFlux(); + } + + public void send(MessageIn message) { + MessageOut messageOut = MessageOut.of(message.getType(), message.getContent(), message.getData()); + try { + this.replaySink.tryEmitNext(messageOut.status(200)); + } catch (Exception e) { + log.error("send message error : {}", e.getMessage()); + this.replaySink.tryEmitNext(messageOut.status(500).content(e.getMessage())); + } + } + + public void taskTest() { + if (this.clients.isEmpty() || !this.clients.containsKey("CommandClient")) { + return; + } + ConnectedClient connectedClient = this.clients.get("CommandClient"); + connectedClient.getRequester().route("user.message") + .data(MessageOut.of(MessageType.COMMAND, "test", "test").status(200)) + .send().subscribe(); + } +} diff --git a/boot/platform/src/main/resources/application-local.yml b/boot/platform/src/main/resources/application-local.yml index 9f110151..17b4af5a 100644 --- a/boot/platform/src/main/resources/application-local.yml +++ b/boot/platform/src/main/resources/application-local.yml @@ -6,7 +6,7 @@ logging: org.springframework.r2dbc: debug io.r2dbc.postgresql.PARAM: debug -server.port: 8081 +server.port: 8080 spring: application.name: plate diff --git a/boot/platform/src/main/resources/application.yml b/boot/platform/src/main/resources/application.yml index b0c596f6..5479edc6 100644 --- a/boot/platform/src/main/resources/application.yml +++ b/boot/platform/src/main/resources/application.yml @@ -7,6 +7,9 @@ spring: threads.virtual.enabled: true main.keep-alive: true application.name: plate + rsocket.server: + mapping-path: /rsocket + transport: websocket webflux.format: time: "HH:mm:ss" date-time: "yyyy-MM-dd HH:mm:ss" @@ -18,9 +21,7 @@ spring: codec: max-in-memory-size: 10MB log-request-details: false - cache: - type: redis - redis: - key-prefix: "plate:caches:" - time-to-live: 300s - enable-statistics: true \ No newline at end of file + cache.redis: + key-prefix: "plate:caches:" + time-to-live: 5M + enable-statistics: true \ No newline at end of file