From 0bee27bf6a687329d8b8837d7e269e2656d69a94 Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Tue, 27 Aug 2024 16:07:52 +0800 Subject: [PATCH] add gateway router --- cpp/ppc-framework/front/FrontConfig.h | 95 ++++++++ cpp/ppc-framework/front/IFront.h | 106 +++++++++ cpp/ppc-framework/gateway/GatewayProtocol.h | 12 + cpp/ppc-framework/gateway/IGateway.h | 70 ++++++ cpp/ppc-framework/protocol/INodeInfo.h | 65 ++++++ cpp/ppc-framework/protocol/Message.h | 42 +++- cpp/ppc-framework/protocol/MessagePayload.h | 11 +- cpp/ppc-framework/protocol/RouteType.h | 54 +++++ .../ppc-gateway/gateway/GatewayImpl.cpp | 195 +++++++++++++++++ .../ppc-gateway/gateway/GatewayImpl.h | 88 ++++++++ .../gateway/SendMessageWithRetry.cpp | 114 ++++++++++ .../gateway/SendMessageWithRetry.h | 62 ++++++ .../gateway/cache/MessageCache.cpp | 94 ++++++++ .../ppc-gateway/gateway/cache/MessageCache.h | 70 ++++++ .../gateway/router/GatewayNodeInfo.h | 74 +++++++ .../gateway/router/GatewayNodeInfoImpl.cpp | 206 ++++++++++++++++++ .../gateway/router/GatewayNodeInfoImpl.h | 100 +++++++++ .../gateway/router/LocalRouter.cpp | 122 +++++++++++ .../ppc-gateway/gateway/router/LocalRouter.h | 71 ++++++ .../gateway/router/PeerRouterTable.cpp | 138 ++++++++++++ .../gateway/router/PeerRouterTable.h | 57 +++++ cpp/ppc-gateway/ppc-gateway/p2p/Service.cpp | 24 ++ cpp/ppc-gateway/ppc-gateway/p2p/Service.h | 10 +- .../ppc-gateway/p2p/router/RouterManager.h | 1 + cpp/ppc-protocol/src/v1/MessageHeaderImpl.cpp | 63 +++++- cpp/ppc-protocol/src/v1/MessageHeaderImpl.h | 3 + cpp/ppc-protocol/src/v1/MessageImpl.h | 28 +++ .../src/v1/MessagePayloadImpl.cpp | 19 +- cpp/ppc-protocol/src/v1/MessagePayloadImpl.h | 1 + .../ppc-tars-protocol/impl/NodeInfoImpl.cpp | 40 ++++ .../ppc-tars-protocol/impl/NodeInfoImpl.h | 88 ++++++++ .../ppc-tars-protocol/tars/NodeInfo.tars | 15 ++ 32 files changed, 2102 insertions(+), 36 deletions(-) create mode 100644 cpp/ppc-framework/front/FrontConfig.h create mode 100644 cpp/ppc-framework/front/IFront.h create mode 100644 cpp/ppc-framework/gateway/IGateway.h create mode 100644 cpp/ppc-framework/protocol/INodeInfo.h create mode 100644 cpp/ppc-framework/protocol/RouteType.h create mode 100644 cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp create mode 100644 cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h create mode 100644 cpp/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.cpp create mode 100644 cpp/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.h create mode 100644 cpp/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.cpp create mode 100644 cpp/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.h create mode 100644 cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h create mode 100644 cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp create mode 100644 cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h create mode 100644 cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp create mode 100644 cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h create mode 100644 cpp/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp create mode 100644 cpp/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h create mode 100644 cpp/ppc-tars-protocol/ppc-tars-protocol/impl/NodeInfoImpl.cpp create mode 100644 cpp/ppc-tars-protocol/ppc-tars-protocol/impl/NodeInfoImpl.h create mode 100644 cpp/ppc-tars-protocol/ppc-tars-protocol/tars/NodeInfo.tars diff --git a/cpp/ppc-framework/front/FrontConfig.h b/cpp/ppc-framework/front/FrontConfig.h new file mode 100644 index 00000000..e9775c79 --- /dev/null +++ b/cpp/ppc-framework/front/FrontConfig.h @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file FrontConfig.h + * @author: yujiechen + * @date 2024-08-22 + */ + +#pragma once +#include +#include +#include + +namespace ppc::front +{ +/** + * @brief the gateway endpoint + * + */ +class GatewayEndPoint +{ +public: + GatewayEndPoint() = default; + GatewayEndPoint(std::string const& host, uint16_t port) : m_host(std::move(host)), m_port(port) + {} + virtual ~GatewayEndPoint() = default; + + virtual std::string const& host() const { return m_host; } + uint16_t port() const { return m_port; } + + void setHost(std::string host) { m_host = std::move(host); } + void setPort(uint16_t port) { m_port = port; } + +private: + // the host + std::string m_host; + // the port + uint16_t m_port; +}; + +// Note: swig explosed interface +class FrontConfig +{ +public: + using Ptr = std::shared_ptr; + FrontConfig(int threadPoolSize, std::string agencyID) + : m_threadPoolSize(threadPoolSize), m_agencyID(std::move(agencyID)) + {} + virtual ~FrontConfig() = default; + + virtual int threadPoolSize() const { return m_threadPoolSize; } + virtual std::string const agencyID() const { return m_agencyID; } + virtual std::vector const& gatewayInfo() const { return m_gatewayInfo; } + virtual void setGatewayInfo(std::vector gatewayInfo) + { + m_gatewayInfo = std::move(gatewayInfo); + } + + virtual void appendGatewayInfo(GatewayEndPoint&& endpoint) + { + // TODO:check the endpoint + m_gatewayInfo.push_back(endpoint); + } + +private: + int m_threadPoolSize; + std::string m_agencyID; + std::vector m_gatewayInfo; +}; + +class FrontConfigBuilder +{ +public: + using Ptr = std::shared_ptr; + FrontConfigBuilder() = default; + virtual ~FrontConfigBuilder() = default; + + FrontConfig::Ptr build(int threadPoolSize, std::string agencyID) + { + return std::make_shared(threadPoolSize, agencyID); + } +}; +} // namespace ppc::front \ No newline at end of file diff --git a/cpp/ppc-framework/front/IFront.h b/cpp/ppc-framework/front/IFront.h new file mode 100644 index 00000000..be850b65 --- /dev/null +++ b/cpp/ppc-framework/front/IFront.h @@ -0,0 +1,106 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file IFront.h + * @author: yujiechen + * @date 2024-08-22 + */ +#pragma once +#include "FrontConfig.h" +#include "ppc-framework/protocol/Message.h" +#include "ppc-framework/protocol/RouteType.h" +#include + +namespace ppc::front +{ +class IFront +{ +public: + using Ptr = std::shared_ptr; + + IFront() = default; + virtual ~IFront() = default; + + /** + * @brief start the IFront + * + * @param front the IFront to start + */ + virtual void start() const = 0; + /** + * @brief stop the IFront + * + * @param front the IFront to stop + */ + virtual void stop() const = 0; + + /** + * + * @param front the front object + * @param topic the topic + * @param callback the callback called when receive specified topic + */ + virtual void registerTopicHandler( + std::string const& topic, ppc::protocol::MessageCallback callback) = 0; + + /** + * @brief async send message + * + * @param routeType the route type + * @param topic the topic + * @param dstInst the dst agency(must set when 'route by agency' and 'route by + * component') + * @param dstNodeID the dst nodeID(must set when 'route by nodeID') + * @param componentType the componentType(must set when 'route by component') + * @param payload the payload to send + * @param seq the message seq + * @param timeout timeout + * @param callback callback + */ + virtual void asyncSendMessage(ppc::protocol::RouteType routeType, std::string const& topic, + std::string const& dstInst, bcos::bytes const& dstNodeID, std::string const& componentType, + bcos::bytes&& payload, int seq, long timeout, ppc::protocol::MessageCallback callback) = 0; + + // the sync interface for async_send_message + virtual ppc::protocol::Message::Ptr push(ppc::protocol::RouteType routeType, std::string topic, + std::string dstInst, std::string dstNodeID, std::string const& componentType, + bcos::bytes&& payload, int seq, long timeout) = 0; + + /** + * @brief: receive message from gateway, call by gateway + * @param _message: received ppc message + * @return void + */ + virtual void onReceiveMessage( + ppc::protocol::Message::Ptr const& _msg, ppc::protocol::ReceiveMsgFunc _callback) = 0; +}; + +class IFrontBuilder +{ +public: + using Ptr = std::shared_ptr; + IFrontBuilder() = default; + virtual ~IFrontBuilder() = default; + + /** + * @brief create the Front using specified config + * + * @param config the config used to build the Front + * @return IFront::Ptr he created Front + */ + virtual IFront::Ptr build(ppc::front::FrontConfig::Ptr config) const = 0; + virtual IFront::Ptr buildClient(std::string endPoint) const = 0; +}; +} // namespace ppc::front \ No newline at end of file diff --git a/cpp/ppc-framework/gateway/GatewayProtocol.h b/cpp/ppc-framework/gateway/GatewayProtocol.h index c3c0a94d..b91137d6 100644 --- a/cpp/ppc-framework/gateway/GatewayProtocol.h +++ b/cpp/ppc-framework/gateway/GatewayProtocol.h @@ -25,6 +25,7 @@ namespace ppc::gateway enum class GatewayPacketType : uint16_t { P2PMessage = 0x00, + BroadcastMessage = 0x01, RouterTableSyncSeq = 0x10, RouterTableResponse = 0x11, RouterTableRequest = 0x12 @@ -33,5 +34,16 @@ enum class GatewayPacketType : uint16_t enum class GatewayMsgExtFlag : uint16_t { Response = 0x1, + RouteByNodeID = 0x2, + RouteByAgency = 0x4, + RouteByComponent = 0x8, + RouteByTopic = 0x10 +}; + +enum CommonError : int32_t +{ + SUCCESS = 0, + TIMEOUT = 1000, // for gateway + NotFoundFrontServiceDispatchMsg = 1001 }; } // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-framework/gateway/IGateway.h b/cpp/ppc-framework/gateway/IGateway.h new file mode 100644 index 00000000..9af59dba --- /dev/null +++ b/cpp/ppc-framework/gateway/IGateway.h @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file IGateway.h + * @author: yujiechen + * @date 2024-08-26 + */ +#pragma once +#include "../protocol/INodeInfo.h" +#include "../protocol/Message.h" +#include "../protocol/RouteType.h" +#include + + +namespace ppc::gateway +{ +using ErrorCallbackFunc = std::function; +/** + * @brief: A list of interfaces provided by the gateway which are called by the front service. + */ +class IGateway +{ +public: + using Ptr = std::shared_ptr; + IGateway() = default; + virtual ~IGateway() {} + + /** + * @brief: start/stop service + */ + virtual void start() = 0; + virtual void stop() = 0; + + /** + * @brief send message to gateway + * + * @param routeType the route type + * @param topic the topic + * @param dstInst the dst agency(must set when 'route by agency' and 'route by + * component') + * @param dstNodeID the dst nodeID(must set when 'route by nodeID') + * @param componentType the componentType(must set when 'route by component') + * @param payload the payload to send + * @param seq the message seq + * @param timeout timeout + * @param callback callback + */ + virtual void asyncSendMessage(ppc::protocol::RouteType routeType, std::string const& topic, + std::string const& dstInst, bcos::bytes const& dstNodeID, std::string const& componentType, + bcos::bytes&& payload, long timeout, ppc::protocol::ReceiveMsgFunc callback) = 0; + + virtual void registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo); + virtual void unRegisterNodeInfo(bcos::bytesConstRef nodeID); + virtual void registerTopic(bcos::bytesConstRef nodeID, std::string const& topic); + virtual void unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic); +}; + +} // namespace ppc::gateway diff --git a/cpp/ppc-framework/protocol/INodeInfo.h b/cpp/ppc-framework/protocol/INodeInfo.h new file mode 100644 index 00000000..859d6ac6 --- /dev/null +++ b/cpp/ppc-framework/protocol/INodeInfo.h @@ -0,0 +1,65 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file INodeInfo.h + * @author: yujiechen + * @date 2024-08-26 + */ +#pragma once +#include "ppc-framework/front/IFront.h" +#include + +namespace ppc::protocol +{ +// the node information +class INodeInfo +{ +public: + using Ptr = std::shared_ptr; + INodeInfo() = default; + virtual ~INodeInfo() = default; + + virtual std::string const& endPoint() const = 0; + virtual bcos::bytesConstRef nodeID() const = 0; + + // components + virtual void setComponents(std::vector const& components) = 0; + virtual std::set const& components() const = 0; + + virtual void encode(bcos::bytes& data) const = 0; + virtual void decode(bcos::bytesConstRef data) = 0; + + virtual void setFront(ppc::front::IFront::Ptr&& front) = 0; + virtual ppc::front::IFront::Ptr const& getFront() const = 0; + + virtual bool equal(INodeInfo::Ptr const& info) + { + return (nodeID() == info->nodeID()) && (components() == info->components()); + } +}; +class INodeInfoFactory +{ +public: + using Ptr = std::shared_ptr; + INodeInfoFactory(bcos::bytes nodeID) : m_nodeID(std::move(nodeID)) {} + virtual ~INodeInfoFactory() = default; + + virtual INodeInfo::Ptr build() = 0; + virtual INodeInfo::Ptr build(std::string const& endPoint) = 0; + +protected: + bcos::bytes m_nodeID; +}; +} // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/ppc-framework/protocol/Message.h b/cpp/ppc-framework/protocol/Message.h index 40c5f614..b318c1c0 100644 --- a/cpp/ppc-framework/protocol/Message.h +++ b/cpp/ppc-framework/protocol/Message.h @@ -19,12 +19,13 @@ */ #pragma once #include "../Common.h" +#include "RouteType.h" #include #include +#include #include #include #include - namespace ppc::protocol { class MessageOptionalHeader @@ -38,8 +39,8 @@ class MessageOptionalHeader virtual int64_t decode(bcos::bytesConstRef data, uint64_t const _offset) = 0; // the componentType - virtual uint8_t componentType() const { return m_componentType; } - virtual void setComponentType(uint8_t componentType) { m_componentType = componentType; } + virtual std::string componentType() const { return m_componentType; } + virtual void setComponentType(std::string componentType) { m_componentType = componentType; } // the source nodeID that send the message virtual bcos::bytes const& srcNode() const { return m_srcNode; } @@ -50,18 +51,24 @@ class MessageOptionalHeader virtual void setDstNode(bcos::bytes const& dstNode) { m_dstNode = dstNode; } // the target agency that need receive the message - virtual bcos::bytes const& dstInst() const { return m_dstInst; } - virtual void setDstInst(bcos::bytes const& dstInst) { m_dstInst = dstInst; } + virtual std::string const& dstInst() const { return m_dstInst; } + virtual void setDstInst(std::string const& dstInst) { m_dstInst = dstInst; } + + // the topic + virtual std::string const& topic() const { return m_topic; } + virtual void setTopic(std::string&& topic) { m_topic = std::move(topic); } + virtual void setTopic(std::string const& topic) { m_topic = topic; } protected: + std::string m_topic; // the componentType - uint8_t m_componentType; + std::string m_componentType; // the source nodeID that send the message bcos::bytes m_srcNode; // the target nodeID that should receive the message bcos::bytes m_dstNode; // the target agency that need receive the message - bcos::bytes m_dstInst; + std::string m_dstInst; }; class MessageHeader @@ -117,9 +124,12 @@ class MessageHeader // Note: only for log std::string_view dstP2PNodeIDView() const { return printP2PIDElegantly(m_dstGwNode); } + virtual uint16_t routeType() const = 0; + virtual void setRouteType(ppc::protocol::RouteType type) = 0; + protected: // the msg version, used to support compatibility - uint8_t m_version; + uint8_t m_version = 0; // the traceID std::string m_traceID; // the srcGwNode @@ -129,7 +139,7 @@ class MessageHeader // the packetType uint16_t m_packetType; // the ttl - int16_t m_ttl; + int16_t m_ttl = 0; // the ext(contains the router policy and response flag) uint16_t m_ext; //// the optional field(used to route between components and nodes) @@ -187,14 +197,18 @@ class MessageHeaderBuilder virtual MessageHeader::Ptr build() = 0; }; -class MessageBuilder +class MessageBuilder : public bcos::boostssl::MessageFaceFactory { public: + using Ptr = std::shared_ptr; MessageBuilder() = default; - virtual ~MessageBuilder() = default; + ~MessageBuilder() override = default; virtual Message::Ptr build() = 0; virtual Message::Ptr build(bcos::bytesConstRef buffer) = 0; + virtual Message::Ptr build(ppc::protocol::RouteType routeType, std::string const& topic, + std::string const& dstInst, bcos::bytes const& dstNodeID, std::string const& componentType, + bcos::bytes&& payload) = 0; }; inline std::string printMessage(Message::Ptr const& _msg) @@ -218,4 +232,10 @@ inline std::string printWsMessage(bcos::boostssl::MessageFace::Ptr const& _msg) return stringstream.str(); } +// function to send response +using SendResponseFunction = std::function; +using ReceiveMsgFunc = std::function; +using MessageCallback = std::function; + } // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/ppc-framework/protocol/MessagePayload.h b/cpp/ppc-framework/protocol/MessagePayload.h index 4329aae2..14304a16 100644 --- a/cpp/ppc-framework/protocol/MessagePayload.h +++ b/cpp/ppc-framework/protocol/MessagePayload.h @@ -36,22 +36,21 @@ class MessagePayload // the version virtual uint8_t version() const { return m_version; } virtual void setVersion(uint8_t version) { m_version = version; } - // the topic - virtual std::string const& topic() const { return m_topic; } - virtual void setTopic(std::string&& topic) { m_topic = std::move(topic); } - virtual void setTopic(std::string const& topic) { m_topic = topic; } // data virtual bcos::bytes const& data() const { return m_data; } virtual void setData(bcos::bytes&& data) { m_data = std::move(data); } virtual void setData(bcos::bytes const& data) { m_data = data; } + // the seq + virtual uint16_t seq() const { return m_seq; } + virtual void setSeq(uint16_t seq) { m_seq = seq; } // the length virtual int64_t length() const { return m_length; } protected: // the front payload version, used to support compatibility uint8_t m_version; - // the topic - std::string m_topic; + // the seq + uint16_t m_seq; bcos::bytes m_data; int64_t mutable m_length; }; diff --git a/cpp/ppc-framework/protocol/RouteType.h b/cpp/ppc-framework/protocol/RouteType.h new file mode 100644 index 00000000..1e3a715e --- /dev/null +++ b/cpp/ppc-framework/protocol/RouteType.h @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file RouteType.h + * @author: yujiechen + * @date 2024-08-22 + */ + +#pragma once +#include +#include + +namespace ppc::protocol +{ +enum class RouteType : uint8_t +{ + ROUTE_THROUGH_NODEID = 0x00, + ROUTE_THROUGH_COMPONENT = 0x01, + ROUTE_THROUGH_AGENCY = 0x02, + ROUTE_THROUGH_TOPIC = 0x03 +}; + +inline std::ostream& operator<<(std::ostream& _out, RouteType const& _type) +{ + switch (_type) + { + case RouteType::ROUTE_THROUGH_NODEID: + _out << "RouteThroughNodeID"; + break; + case RouteType::ROUTE_THROUGH_COMPONENT: + _out << "RouteThroughComponent"; + break; + case RouteType::ROUTE_THROUGH_AGENCY: + _out << "RouteThroughAgency"; + break; + default: + _out << "UnknownRouteType"; + break; + } + return _out; +} +} // namespace ppc::front \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp b/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp new file mode 100644 index 00000000..7b4240c9 --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.cpp @@ -0,0 +1,195 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GatewayImpl.cpp + * @author: yujiechen + * @date 2024-08-26 + */ +#include "GatewayImpl.h" +#include "SendMessageWithRetry.h" +#include "cache/MessageCache.h" +#include "ppc-framework/gateway/GatewayProtocol.h" +#include "router/GatewayNodeInfoImpl.h" + +using namespace bcos; +using namespace ppc; +using namespace ppc::protocol; +using namespace ppc::gateway; +using namespace bcos::boostssl; +using namespace bcos::boostssl::ws; + +GatewayImpl::GatewayImpl(Service::Ptr const& service, + ppc::front::IFrontBuilder::Ptr const& frontBuilder, + std::shared_ptr ioService, std::string const& agency) + : m_service(service), + m_msgBuilder( + std::dynamic_pointer_cast(service->messageFactory())), + m_frontBuilder(frontBuilder), + m_agency(agency), + m_p2pRouterManager(std::make_shared(service)), + m_gatewayInfoFactory(std::make_shared(service->nodeID(), agency)), + m_localRouter(std::make_shared( + m_gatewayInfoFactory, m_frontBuilder, std::make_shared(ioService))), + m_peerRouter(std::make_shared(m_service)) +{ + m_service->registerMsgHandler((uint16_t)GatewayPacketType::P2PMessage, + boost::bind(&GatewayImpl::onReceiveP2PMessage, this, boost::placeholders::_1, + boost::placeholders::_2)); + + m_service->registerMsgHandler((uint16_t)GatewayPacketType::BroadcastMessage, + boost::bind(&GatewayImpl::onReceiveBroadcastMessage, this, boost::placeholders::_1, + boost::placeholders::_2)); +} + +void GatewayImpl::start() +{ + if (m_running) + { + GATEWAY_LOG(INFO) << LOG_DESC("Gateway has already been started"); + return; + } + m_running = true; + m_service->start(); + m_p2pRouterManager->start(); + GATEWAY_LOG(INFO) << LOG_DESC("Start gateway success"); +} + +void GatewayImpl::stop() +{ + if (!m_running) + { + GATEWAY_LOG(INFO) << LOG_DESC("Gateway has already been stopped"); + return; + } + m_running = false; + m_service->stop(); + m_p2pRouterManager->stop(); + GATEWAY_LOG(INFO) << LOG_DESC("Stop gateway success"); +} + +void GatewayImpl::asyncSendbroadcastMessage(ppc::protocol::RouteType routeType, + std::string const& topic, std::string const& dstInst, std::string const& componentType, + bcos::bytes&& payload) +{ + // dispatcher to all the local front + auto p2pMessage = m_msgBuilder->build( + routeType, topic, dstInst, bcos::bytes(), componentType, std::move(payload)); + p2pMessage->setPacketType((uint16_t)GatewayPacketType::BroadcastMessage); + m_localRouter->dispatcherMessage(p2pMessage, nullptr); + // broadcast message to all peers + m_peerRouter->asyncBroadcastMessage(p2pMessage); +} + + +void GatewayImpl::asyncSendMessage(ppc::protocol::RouteType routeType, std::string const& topic, + std::string const& dstInst, bcos::bytes const& dstNodeID, std::string const& componentType, + bcos::bytes&& payload, long timeout, ReceiveMsgFunc callback) +{ + // check the localRouter + auto p2pMessage = m_msgBuilder->build( + routeType, topic, dstInst, dstNodeID, componentType, std::move(payload)); + p2pMessage->setPacketType((uint16_t)GatewayPacketType::P2PMessage); + auto nodeList = m_localRouter->chooseReceiver(p2pMessage); + // case send to the same agency + if (!nodeList.empty()) + { + GATEWAY_LOG(TRACE) << LOG_DESC("hit the local router, dispatch message directly") + << LOG_KV("msg", printMessage(p2pMessage)); + m_localRouter->dispatcherMessage(p2pMessage, callback); + return; + } + // try to find the dstP2PNode + auto selectedP2PNodes = m_peerRouter->selectRouter(routeType, p2pMessage); + if (selectedP2PNodes.empty()) + { + GATEWAY_LOG(INFO) << LOG_DESC("can't find the gateway to send the message") + << LOG_KV("detail", printMessage(p2pMessage)); + if (callback) + { + callback(std::make_shared( + -1, "can't find the gateway to send the message, traceID: " + + p2pMessage->header()->traceID())); + } + return; + } + // send the message to gateway + auto retry = std::make_shared( + m_service, std::move(selectedP2PNodes), std::move(p2pMessage), callback, timeout); + retry->trySendMessage(); +} + +void GatewayImpl::onReceiveP2PMessage(MessageFace::Ptr msg, WsSession::Ptr session) +{ + // try to dispatcher to the front + auto p2pMessage = std::dynamic_pointer_cast(msg); + auto self = std::weak_ptr(shared_from_this()); + auto callback = [p2pMessage, session, self](Error::Ptr error) { + auto gateway = self.lock(); + if (!gateway) + { + return; + } + std::string errorCode = std::to_string(CommonError::SUCCESS); + if (error && error->errorCode() != 0) + { + GATEWAY_LOG(WARNING) << LOG_DESC("onReceiveP2PMessage: dispatcherMessage failed") + << LOG_KV("code", error->errorCode()) + << LOG_KV("msg", error->errorMessage()); + errorCode = std::to_string(error->errorCode()); + } + + std::shared_ptr payload = + std::make_shared(errorCode.begin(), errorCode.end()); + gateway->m_service->sendRespMessageBySession(session, p2pMessage, std::move(payload)); + }; + + auto ret = m_localRouter->dispatcherMessage(p2pMessage, callback); + if (!ret) + { + GATEWAY_LOG(ERROR) + << LOG_DESC( + "onReceiveP2PMessage failed to find the node that can dispatch this message") + << LOG_KV("msg", printMessage(p2pMessage)); + callback(std::make_shared(CommonError::NotFoundFrontServiceDispatchMsg, + "unable to find the ndoe to dispatcher this message, message detail: " + + printMessage(p2pMessage))); + } +} + +void GatewayImpl::onReceiveBroadcastMessage(MessageFace::Ptr msg, WsSession::Ptr) +{ + auto p2pMessage = std::dynamic_pointer_cast(msg); + GATEWAY_LOG(TRACE) << LOG_DESC("onReceiveBroadcastMessage, dispatcher") + << LOG_KV("msg", printMessage(p2pMessage)); + m_localRouter->dispatcherMessage(p2pMessage, nullptr); +} + +void GatewayImpl::registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) +{ + m_localRouter->registerNodeInfo(nodeInfo); +} + +void GatewayImpl::unRegisterNodeInfo(bcos::bytesConstRef nodeID) +{ + m_localRouter->unRegisterNode(nodeID.toBytes()); +} +void GatewayImpl::registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) +{ + m_localRouter->registerTopic(nodeID, topic); +} +void GatewayImpl::unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) +{ + m_localRouter->unRegisterTopic(nodeID, topic); +} \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h b/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h new file mode 100644 index 00000000..b71bd5b1 --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/GatewayImpl.h @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GatewayImpl.h + * @author: yujiechen + * @date 2024-08-26 + */ +#pragma once +#include "ppc-framework/gateway/IGateway.h" +#include "ppc-gateway/gateway/router/GatewayNodeInfo.h" +#include "ppc-gateway/p2p/Service.h" +#include "ppc-gateway/p2p/router/RouterManager.h" +#include "router/LocalRouter.h" +#include "router/PeerRouterTable.h" + +namespace ppc::gateway +{ +class GatewayImpl : public IGateway, public std::enable_shared_from_this +{ +public: + using Ptr = std::shared_ptr; + GatewayImpl(Service::Ptr const& service, ppc::front::IFrontBuilder::Ptr const& frontBuilder, + std::shared_ptr ioService, std::string const& agency); + ~GatewayImpl() override = default; + + void start() override; + void stop() override; + + /** + * @brief send message to gateway + * + * @param routeType the route type + * @param topic the topic + * @param dstInst the dst agency(must set when 'route by agency' and 'route by + * component') + * @param dstNodeID the dst nodeID(must set when 'route by nodeID') + * @param componentType the componentType(must set when 'route by component') + * @param payload the payload to send + * @param seq the message seq + * @param timeout timeout + * @param callback callback + */ + void asyncSendMessage(ppc::protocol::RouteType routeType, std::string const& topic, + std::string const& dstInst, bcos::bytes const& dstNodeID, std::string const& componentType, + bcos::bytes&& payload, long timeout, ppc::protocol::ReceiveMsgFunc callback) override; + + void asyncSendbroadcastMessage(ppc::protocol::RouteType routeType, std::string const& topic, + std::string const& dstInst, std::string const& componentType, bcos::bytes&& payload); + + + void registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override; + void unRegisterNodeInfo(bcos::bytesConstRef nodeID) override; + void registerTopic(bcos::bytesConstRef nodeID, std::string const& topic) override; + void unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic) override; + +protected: + virtual void onReceiveP2PMessage( + bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session); + virtual void onReceiveBroadcastMessage( + bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::WsSession::Ptr session); + +private: + bool m_running = false; + Service::Ptr m_service; + ppc::protocol::MessageBuilder::Ptr m_msgBuilder; + + ppc::front::IFrontBuilder::Ptr m_frontBuilder; + std::string m_agency; + + RouterManager::Ptr m_p2pRouterManager; + + GatewayNodeInfoFactory::Ptr m_gatewayInfoFactory; + LocalRouter::Ptr m_localRouter; + PeerRouterTable::Ptr m_peerRouter; +}; +} // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.cpp b/cpp/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.cpp new file mode 100644 index 00000000..19093b32 --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.cpp @@ -0,0 +1,114 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file SendMessageWithRetry.h + * @author: yujiechen + * @date 2024-08-26 + */ +#include "SendMessageWithRetry.h" +#include "ppc-framework/gateway/GatewayProtocol.h" +#include "ppc-gateway/Common.h" + +using namespace bcos; +using namespace ppc; +using namespace bcos::boostssl; +using namespace bcos::boostssl::ws; +using namespace ppc::gateway; +using namespace ppc::protocol; + +// random choose one p2pID to send message +GatewayNodeInfo::Ptr SendMessageWithRetry::chooseP2pNode() +{ + RecursiveGuard lock(x_mutex); + if (!m_dstNodeList.empty()) + { + auto selectedNode = m_dstNodeList.begin(); + m_dstNodeList.erase(m_dstNodeList.begin()); + return *selectedNode; + } + return nullptr; +} + +// send the message with retry +void SendMessageWithRetry::trySendMessage() +{ + if (m_dstNodeList.empty()) + { + GATEWAY_LOG(DEBUG) << LOG_DESC("Gateway::SendMessageWithRetry") + << LOG_DESC("unable to send the message") << printMessage(m_p2pMessage); + if (m_respFunc) + { + m_respFunc(std::make_shared( + -1, "can't find the gateway to send the message, detail: " + + printMessage(m_p2pMessage))); + } + return; + } + auto choosedNode = chooseP2pNode(); + auto self = shared_from_this(); + auto startT = utcTime(); + auto callback = [self, startT]( + bcos::Error::Ptr error, MessageFace::Ptr msg, WsSession::Ptr session) { + std::ignore = session; + if (error && error->errorCode() != 0) + { + GATEWAY_LOG(DEBUG) << LOG_BADGE("trySendMessage") + << LOG_DESC("send message failed, retry again") + << LOG_KV("msg", printMessage(self->m_p2pMessage)) + << LOG_KV("code", error->errorCode()) + << LOG_KV("msg", error->errorMessage()) + << LOG_KV("timeCost", (utcTime() - startT)); + // try again + self->trySendMessage(); + return; + } + // check the errorCode + try + { + auto payload = msg->payload(); + int respCode = boost::lexical_cast(std::string(payload->begin(), payload->end())); + // the peer gateway not response not ok ,it means the gateway not dispatch the + // message successfully,find another gateway and try again + if (respCode != CommonError::SUCCESS) + { + GATEWAY_LOG(DEBUG) + << LOG_BADGE("trySendMessage again") << LOG_KV("respCode", respCode) + << LOG_KV("msg", printMessage(self->m_p2pMessage)); + // try again + self->trySendMessage(); + return; + } + GATEWAY_LOG(TRACE) << LOG_BADGE("asyncSendMessageByNodeID success") + << LOG_KV("msg", printMessage(self->m_p2pMessage)); + // send message successfully + if (self->m_respFunc) + { + self->m_respFunc(nullptr); + } + return; + } + catch (const std::exception& e) + { + GATEWAY_LOG(ERROR) << LOG_BADGE("trySendMessage and receive response exception") + << LOG_KV("msg", printMessage(self->m_p2pMessage)) + << LOG_KV("error", boost::diagnostic_information(e)); + + self->trySendMessage(); + } + }; + // Note: make 10s configuarable here + m_service->asyncSendMessageByNodeID( + choosedNode->p2pNodeID(), m_p2pMessage, bcos::boostssl::ws::Options(m_timeout), callback); +} \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.h b/cpp/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.h new file mode 100644 index 00000000..7df3b6f3 --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/SendMessageWithRetry.h @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file SendMessageWithRetry.h + * @author: yujiechen + * @date 2024-08-26 + */ +#pragma once +#include "ppc-framework/protocol/Message.h" +#include "ppc-gateway/gateway/router/GatewayNodeInfo.h" +#include "ppc-gateway/p2p/Service.h" +#include +#include + +namespace ppc::gateway +{ +class SendMessageWithRetry : public std::enable_shared_from_this +{ +public: + using Ptr = std::shared_ptr; + SendMessageWithRetry(Service::Ptr const& service, GatewayNodeInfos&& dstNodeList, + ppc::protocol::Message::Ptr&& p2pMessage, ppc::protocol::ReceiveMsgFunc respFunc, + long timeout) + : m_service(service), + m_dstNodeList(std::move(dstNodeList)), + m_p2pMessage(std::move(p2pMessage)), + m_respFunc(std::move(respFunc)), + m_timeout(timeout) + { + if (m_timeout < 0) + { + m_timeout = 10000; + } + } + // random choose one p2pID to send message + GatewayNodeInfo::Ptr chooseP2pNode(); + + // send the message with retry + void trySendMessage(); + +private: + // mutex for p2pIDs + mutable bcos::RecursiveMutex x_mutex; + GatewayNodeInfos m_dstNodeList; + ppc::protocol::Message::Ptr m_p2pMessage; + Service::Ptr m_service; + ppc::protocol::ReceiveMsgFunc m_respFunc; + long m_timeout; +}; +} // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.cpp b/cpp/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.cpp new file mode 100644 index 00000000..09c8f8e2 --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.cpp @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file MessageCache.cpp + * @author: yujiechen + * @date 2024-08-26 + */ + +#include "MessageCache.h" +#include "ppc-gateway/Common.h" + +using namespace ppc; +using namespace bcos; +using namespace ppc::protocol; +using namespace ppc::gateway; + +void MessageCache::insertCache( + std::string const& topic, ppc::protocol::Message::Ptr const& msg, ReceiveMsgFunc callback) +{ + // hold the message + GATEWAY_LOG(DEBUG) << LOG_BADGE("MessageCache: insertCache") << LOG_KV("topic", topic); + bcos::ReadGuard l(x_msgCache); + auto it = m_msgCache.find(topic); + if (it == m_msgCache.end()) + { + it->second->messages.emplace_back(MessageInfo{msg, callback}); + return; + } + // insert new holding-queue + auto queue = std::make_shared(); + queue->messages.emplace_back(MessageInfo{msg, callback}); + // create timer to handle timeout + queue->timer = std::make_shared( + *m_ioService, boost::posix_time::minutes(m_holdingMessageMinutes)); + queue->timer->async_wait([self = weak_from_this(), topic](boost::system::error_code _error) { + if (!_error) + { + auto cache = self.lock(); + if (cache) + { + // remove timeout message + auto msgQueue = cache->pop(topic); + if (!msgQueue) + { + return; + } + msgQueue->timer->cancel(); + cache->onTimeout(msgQueue); + } + } + }); + m_msgCache[topic] = queue; +} + +HoldingMessageQueue::Ptr MessageCache::pop(const std::string& topic) +{ + WriteGuard lock(x_msgCache); + auto it = m_msgCache.find(topic); + if (it == m_msgCache.end()) + { + return nullptr; + } + HoldingMessageQueue::Ptr ret = it->second; + m_msgCache.erase(topic); + return ret; +} + +void MessageCache::onTimeout(HoldingMessageQueue::Ptr const& queue) +{ + if (!queue) + { + return; + } + // dispatch the ack + for (auto& msgInfo : queue->messages) + { + if (msgInfo.callback) + { + msgInfo.callback(std::make_shared(-1, SEND_MESSAGE_TO_FRONT_TIMEOUT)); + } + } +} \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.h b/cpp/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.h new file mode 100644 index 00000000..65502edb --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/cache/MessageCache.h @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file MessageCache.h + * @author: yujiechen + * @date 2024-08-26 + */ +#pragma once +#include "ppc-framework/front/IFront.h" +#include "ppc-framework/protocol/Message.h" +#include "tbb/concurrent_vector.h" +#include +#include +#include + +namespace ppc::gateway +{ +struct MessageInfo +{ + ppc::protocol::Message::Ptr msg; + ppc::protocol::ReceiveMsgFunc callback; +}; +struct HoldingMessageQueue +{ + using Ptr = std::shared_ptr; + HoldingMessageQueue() = default; + + tbb::concurrent_vector messages; + std::shared_ptr timer; +}; + +class MessageCache : public std::enable_shared_from_this +{ +public: + using Ptr = std::shared_ptr; + MessageCache(std::shared_ptr ioService) + : m_ioService(std::move(ioService)) + {} + virtual ~MessageCache() = default; + + void insertCache(std::string const& topic, ppc::protocol::Message::Ptr const& msg, + ppc::protocol::ReceiveMsgFunc callback); + HoldingMessageQueue::Ptr pop(std::string const& topic); + +private: + void onTimeout(HoldingMessageQueue::Ptr const& queue); + +private: + int m_holdingMessageMinutes = 30; + std::shared_ptr m_ioService; + /** + * hold the message for the situation that + * gateway receives message from the other side while the task has not been registered. + */ + mutable bcos::SharedMutex x_msgCache; + std::unordered_map m_msgCache; +}; +} // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h b/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h new file mode 100644 index 00000000..e25db02e --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfo.h @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GatewayNodeInfo.h + * @author: yujiechen + * @date 2024-08-26 + */ +#pragma once +#include "ppc-framework/protocol/INodeInfo.h" +#include +namespace ppc::gateway +{ +class GatewayNodeInfo +{ +public: + using Ptr = std::shared_ptr; + GatewayNodeInfo() = default; + virtual ~GatewayNodeInfo() = default; + + // the gateway nodeID + virtual std::string const& p2pNodeID() const = 0; + // the agency + virtual std::string const& agency() const = 0; + // get the node information by nodeID + virtual ppc::protocol::INodeInfo::Ptr nodeInfo(bcos::bytes const& nodeID) const = 0; + virtual bool tryAddNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) = 0; + virtual void removeNodeInfo(bcos::bytes const& nodeID) = 0; + + virtual std::vector chooseRouteByComponent( + bool selectAll, std::string const& component) const = 0; + virtual std::vector chooseRouterByAgency(bool selectAll) const = 0; + virtual std::vector chooseRouterByTopic( + bool selectAll, std::string const& topic) const = 0; + + virtual void encode(bcos::bytes& data) const = 0; + virtual void decode(bcos::bytesConstRef data) = 0; + + virtual void registerTopic(bcos::bytes const& nodeID, std::string const& topic) = 0; + virtual void unRegisterTopic(bcos::bytes const& nodeID, std::string const& topic) = 0; + + virtual std::map nodeList() const = 0; +}; + +class GatewayNodeInfoFactory +{ +public: + using Ptr = std::shared_ptr; + GatewayNodeInfoFactory() = default; + virtual ~GatewayNodeInfoFactory() = default; + + virtual GatewayNodeInfo::Ptr build() const = 0; +}; +struct GatewayNodeInfoCmp +{ + bool operator()(GatewayNodeInfo::Ptr const& _first, GatewayNodeInfo::Ptr const& _second) const + { + // increase order + return _first->p2pNodeID() > _second->p2pNodeID(); + } +}; +using GatewayNodeInfos = std::set; +} // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp b/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp new file mode 100644 index 00000000..f7c9cefe --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.cpp @@ -0,0 +1,206 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GatewayNodeInfoImpl.cpp + * @author: yujiechen + * @date 2024-08-26 + */ +#include "GatewayNodeInfoImpl.h" +#include "ppc-tars-protocol/Common.h" +#include "ppc-tars-protocol/impl/NodeInfoImpl.h" + +using namespace ppctars; +using namespace ppc::protocol; +using namespace ppc::gateway; + + +// the gateway nodeID +std::string const& GatewayNodeInfoImpl::p2pNodeID() const +{ + return m_inner()->p2pNodeID; +} +// the agency +std::string const& GatewayNodeInfoImpl::agency() const +{ + return m_inner()->agency; +} +// get the node information by nodeID +INodeInfo::Ptr GatewayNodeInfoImpl::nodeInfo(bcos::bytes const& nodeID) const +{ + bcos::ReadGuard l(x_nodeList); + if (m_nodeList.count(nodeID)) + { + return m_nodeList.at(nodeID); + } + return nullptr; +} + +bool GatewayNodeInfoImpl::tryAddNodeInfo(INodeInfo::Ptr const& info) +{ + auto nodeID = info->nodeID().toBytes(); + auto existedNodeInfo = nodeInfo(nodeID); + // update the info + if (existedNodeInfo == nullptr || !existedNodeInfo->equal(info)) + { + bcos::WriteGuard l(x_nodeList); + m_nodeList[nodeID] = info; + return true; + } + return false; +} + +void GatewayNodeInfoImpl::removeNodeInfo(bcos::bytes const& nodeID) +{ + // remove the nodeInfo + { + bcos::UpgradableGuard l(x_nodeList); + auto it = m_nodeList.find(nodeID); + if (it == m_nodeList.end()) + { + return; + } + bcos::UpgradeGuard ul(l); + m_nodeList.erase(it); + } + // remove the topic info + { + bcos::UpgradableGuard l(x_topicInfo); + auto it = m_topicInfo.find(nodeID); + if (it != m_topicInfo.end()) + { + bcos::UpgradeGuard ul(l); + m_topicInfo.erase(it); + } + } +} + +std::vector GatewayNodeInfoImpl::chooseRouteByComponent( + bool selectAll, std::string const& component) const +{ + std::vector result; + bcos::ReadGuard l(x_nodeList); + for (auto const& it : m_nodeList) + { + if (it.second->components().count(component)) + { + result.emplace_back(it.second->getFront()); + } + if (!result.empty() && !selectAll) + { + break; + } + } + return result; +} + + +vector GatewayNodeInfoImpl::chooseRouterByAgency(bool selectAll) const +{ + std::vector result; + bcos::ReadGuard l(x_nodeList); + for (auto const& it : m_nodeList) + { + result.emplace_back(it.second->getFront()); + if (!result.empty() && !selectAll) + { + break; + } + } + return result; +} + +std::vector GatewayNodeInfoImpl::chooseRouterByTopic( + bool selectAll, std::string const& topic) const +{ + std::vector result; + bcos::ReadGuard l(x_topicInfo); + for (auto const& it : m_topicInfo) + { + INodeInfo::Ptr selectedNode = nullptr; + if (it.second.count(topic)) + { + selectedNode = nodeInfo(it.first); + } + if (selectedNode != nullptr) + { + result.emplace_back(selectedNode->getFront()); + } + if (!result.empty() && !selectAll) + { + break; + } + } + return result; +} +void GatewayNodeInfoImpl::registerTopic(bcos::bytes const& nodeID, std::string const& topic) +{ + bcos::UpgradableGuard l(x_topicInfo); + if (m_topicInfo.count(nodeID) && m_topicInfo.at(nodeID).count(topic)) + { + return; + } + bcos::UpgradeGuard ul(l); + if (!m_topicInfo.count(nodeID)) + { + m_topicInfo[nodeID] = std::set(); + } + m_topicInfo[nodeID].insert(topic); +} + +void GatewayNodeInfoImpl::unRegisterTopic(bcos::bytes const& nodeID, std::string const& topic) +{ + bcos::UpgradableGuard l(x_topicInfo); + if (!m_topicInfo.count(nodeID) || !m_topicInfo.at(nodeID).count(topic)) + { + return; + } + bcos::UpgradeGuard ul(l); + m_topicInfo[nodeID].erase(topic); +} + +void GatewayNodeInfoImpl::encode(bcos::bytes& data) const +{ + m_inner()->nodeList.clear(); + { + bcos::ReadGuard l(x_nodeList); + // encode nodeList + for (auto const& it : m_nodeList) + { + auto nodeInfo = std::dynamic_pointer_cast(it.second); + m_inner()->nodeList.emplace_back(nodeInfo->inner()); + } + } + tars::TarsOutputStream output; + m_inner()->writeTo(output); + output.getByteBuffer().swap(data); +} + +void GatewayNodeInfoImpl::decode(bcos::bytesConstRef data) +{ + tars::TarsInputStream input; + input.setBuffer((const char*)data.data(), data.size()); + m_inner()->readFrom(input); + { + bcos::WriteGuard l(x_nodeList); + // decode into m_nodeList + m_nodeList.clear(); + for (auto& it : m_inner()->nodeList) + { + auto nodeInfoPtr = + std::make_shared([m_entry = it]() mutable { return &m_entry; }); + m_nodeList.insert(std::make_pair(nodeInfoPtr->nodeID().toBytes(), nodeInfoPtr)); + } + } +} \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h b/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h new file mode 100644 index 00000000..cb0dfc10 --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/router/GatewayNodeInfoImpl.h @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GatewayNodeInfoImpl.h + * @author: yujiechen + * @date 2024-08-26 + */ +#pragma once +#include "GatewayNodeInfo.h" +#include "ppc-tars-protocol/tars/NodeInfo.h" +#include +#include + +namespace ppc::gateway +{ +class GatewayNodeInfoImpl : public GatewayNodeInfo +{ +public: + using Ptr = std::shared_ptr; + GatewayNodeInfoImpl(std::string const& p2pNodeID, std::string const& agency) + : m_inner([inner = ppctars::GatewayNodeInfo()]() mutable { return &inner; }) + { + m_inner()->p2pNodeID = p2pNodeID; + m_inner()->agency = agency; + } + ~GatewayNodeInfoImpl() override = default; + + // the gateway nodeID + std::string const& p2pNodeID() const override; + // the agency + std::string const& agency() const override; + // the node information + + // get the node information by nodeID + ppc::protocol::INodeInfo::Ptr nodeInfo(bcos::bytes const& nodeID) const override; + + void encode(bcos::bytes& data) const override; + void decode(bcos::bytesConstRef data) override; + + bool tryAddNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) override; + void removeNodeInfo(bcos::bytes const& nodeID) override; + + std::vector chooseRouteByComponent( + bool selectAll, std::string const& component) const override; + std::vector chooseRouterByAgency(bool selectAll) const override; + std::vector chooseRouterByTopic( + bool selectAll, std::string const& topic) const override; + + void registerTopic(bcos::bytes const& nodeID, std::string const& topic) override; + void unRegisterTopic(bcos::bytes const& nodeID, std::string const& topic) override; + + std::map nodeList() const override + { + bcos::WriteGuard l(x_nodeList); + return m_nodeList; + } + +private: + std::function m_inner; + // NodeID => nodeInfo + std::map m_nodeList; + mutable bcos::SharedMutex x_nodeList; + + // NodeID=>topics + using Topics = std::set; + std::map m_topicInfo; + mutable bcos::SharedMutex x_topicInfo; +}; + +class GatewayNodeInfoFactoryImpl : public GatewayNodeInfoFactory +{ +public: + using Ptr = std::shared_ptr; + GatewayNodeInfoFactoryImpl(std::string const& p2pNodeID, std::string const& agency) + : m_p2pNodeID(p2pNodeID), m_agency(agency) + {} + ~GatewayNodeInfoFactoryImpl() override = default; + + GatewayNodeInfo::Ptr build() const override + { + return std::make_shared(m_p2pNodeID, m_agency); + } + +private: + std::string m_p2pNodeID; + std::string m_agency; +}; +} // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp b/cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp new file mode 100644 index 00000000..9df4e45f --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.cpp @@ -0,0 +1,122 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file LocalRouter.h + * @author: yujiechen + * @date 2024-08-26 + */ +#include "LocalRouter.h" +#include "ppc-framework/Common.h" +#include "ppc-framework/gateway/GatewayProtocol.h" + +using namespace bcos; +using namespace ppc::protocol; +using namespace ppc::gateway; + +void LocalRouter::registerTopic(bcos::bytesConstRef _nodeID, std::string const& topic) +{ + m_routerInfo->registerTopic(_nodeID.toBytes(), topic); + // try to dispatch the cacheInfo + if (!m_cache) + { + return; + } + auto msgQueue = m_cache->pop(topic); + if (!msgQueue) + { + return; + } + if (msgQueue->timer) + { + msgQueue->timer->cancel(); + } + for (auto const& msgInfo : msgQueue->messages) + { + dispatcherMessage(msgInfo.msg, msgInfo.callback, false); + } +} + +void LocalRouter::unRegisterTopic(bcos::bytesConstRef _nodeID, std::string const& topic) +{ + m_routerInfo->unRegisterTopic(_nodeID.toBytes(), topic); +} + +bool LocalRouter::dispatcherMessage(Message::Ptr const& msg, ReceiveMsgFunc callback, bool holding) +{ + auto frontList = chooseReceiver(msg); + // send success + if (!frontList.empty()) + { + for (auto const& front : frontList) + { + front->onReceiveMessage(msg, callback); + } + return true; + } + if (!holding) + { + return false; + } + // no connection found, cache the topic message and dispatcher later + if (msg->header()->routeType() == (uint16_t)RouteType::ROUTE_THROUGH_TOPIC && m_cache) + { + m_cache->insertCache(msg->header()->optionalField()->topic(), msg, callback); + return true; + } + return false; +} + +std::vector LocalRouter::chooseReceiver( + ppc::protocol::Message::Ptr const& msg) +{ + std::vector receivers; + if (msg->header()->optionalField()->dstInst() != m_routerInfo->agency()) + { + return receivers; + } + bool selectAll = + (msg->header()->packetType() == (uint16_t)GatewayPacketType::BroadcastMessage ? true : + false); + switch (msg->header()->routeType()) + { + case (uint16_t)RouteType::ROUTE_THROUGH_NODEID: + { + auto gatewayInfo = m_routerInfo->nodeInfo(msg->header()->optionalField()->dstNode()); + if (gatewayInfo != nullptr) + { + receivers.emplace_back(gatewayInfo->getFront()); + } + return receivers; + } + case (uint16_t)RouteType::ROUTE_THROUGH_COMPONENT: + { + return m_routerInfo->chooseRouteByComponent( + selectAll, msg->header()->optionalField()->componentType()); + } + case (uint16_t)RouteType::ROUTE_THROUGH_AGENCY: + { + return m_routerInfo->chooseRouterByAgency(selectAll); + } + case (uint16_t)RouteType::ROUTE_THROUGH_TOPIC: + { + return m_routerInfo->chooseRouterByTopic( + selectAll, msg->header()->optionalField()->topic()); + } + default: + BOOST_THROW_EXCEPTION(WeDPRException() << errinfo_comment( + "chooseReceiver failed for unknown routeType, message detail: " + + printMessage(msg))); + } +} diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h b/cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h new file mode 100644 index 00000000..194ef6ab --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/router/LocalRouter.h @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file LocalRouter.h + * @author: yujiechen + * @date 2024-08-26 + */ +#pragma once +#include "../cache/MessageCache.h" +#include "GatewayNodeInfo.h" +#include "ppc-framework/protocol/INodeInfo.h" +#include "ppc-framework/protocol/Message.h" +#include "ppc-framework/protocol/RouteType.h" + +namespace ppc::gateway +{ +class LocalRouter +{ +public: + using Ptr = std::shared_ptr; + LocalRouter(GatewayNodeInfoFactory::Ptr nodeInfoFactory, + ppc::front::IFrontBuilder::Ptr frontBuilder, MessageCache::Ptr msgCache) + : m_routerInfo(std::move(nodeInfoFactory->build())), + m_frontBuilder(std::move(frontBuilder)), + m_cache(std::move(msgCache)) + {} + + virtual ~LocalRouter() = default; + + virtual bool registerNodeInfo(ppc::protocol::INodeInfo::Ptr const& nodeInfo) + { + nodeInfo->setFront(m_frontBuilder->buildClient(nodeInfo->endPoint())); + return m_routerInfo->tryAddNodeInfo(nodeInfo); + } + + virtual void unRegisterNode(bcos::bytes const& nodeID) { m_routerInfo->removeNodeInfo(nodeID); } + + virtual void registerTopic(bcos::bytesConstRef nodeID, std::string const& topic); + virtual void unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic); + + virtual std::vector chooseReceiver( + ppc::protocol::Message::Ptr const& msg); + + // TODO: register component + virtual bool dispatcherMessage(ppc::protocol::Message::Ptr const& msg, + ppc::protocol::ReceiveMsgFunc callback, bool holding = true); + +private: + ppc::front::IFrontBuilder::Ptr m_frontBuilder; + GatewayNodeInfo::Ptr m_routerInfo; + + // NodeID=>topics + using Topics = std::set; + std::map m_topicInfo; + mutable bcos::SharedMutex x_topicInfo; + + MessageCache::Ptr m_cache; +}; +} // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp b/cpp/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp new file mode 100644 index 00000000..a2bccd0a --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.cpp @@ -0,0 +1,138 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file PeerRouterTable.cpp + * @author: yujiechen + * @date 2024-08-27 + */ +#include "PeerRouterTable.h" +#include "ppc-framework/Common.h" +#include + +using namespace bcos; +using namespace ppc; +using namespace ppc::gateway; +using namespace ppc::protocol; + +void PeerRouterTable::updateGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo) +{ + auto nodeList = gatewayInfo->nodeList(); + bcos::WriteGuard l(x_mutex); + for (auto const& it : nodeList) + { + // update nodeID => gatewayInfos + m_nodeID2GatewayInfos[it.first].insert(gatewayInfo); + } + // update agency => gatewayInfos + m_agency2GatewayInfos[gatewayInfo->agency()].insert(gatewayInfo); +} + +GatewayNodeInfos PeerRouterTable::selectRouter( + RouteType const& routeType, Message::Ptr const& msg) const +{ + switch (routeType) + { + case RouteType::ROUTE_THROUGH_NODEID: + return selectRouterByNodeID(msg); + case RouteType::ROUTE_THROUGH_COMPONENT: + return selectRouterByComponent(msg); + case RouteType::ROUTE_THROUGH_AGENCY: + case RouteType::ROUTE_THROUGH_TOPIC: + return selectRouterByAgency(msg); + default: + BOOST_THROW_EXCEPTION(WeDPRException() << errinfo_comment( + "selectRouter failed for encounter unsupported routeType: " + + std::to_string((uint16_t)routeType))); + } +} + +GatewayNodeInfos PeerRouterTable::selectRouterByNodeID(Message::Ptr const& msg) const +{ + GatewayNodeInfos result; + bcos::ReadGuard l(x_mutex); + auto it = m_nodeID2GatewayInfos.find(msg->header()->optionalField()->dstNode()); + // no router found + if (it == m_nodeID2GatewayInfos.end()) + { + return result; + } + return it->second; +} + + +GatewayNodeInfos PeerRouterTable::selectRouterByAgency(Message::Ptr const& msg) const +{ + GatewayNodeInfos result; + bcos::ReadGuard l(x_mutex); + auto it = m_agency2GatewayInfos.find(msg->header()->optionalField()->dstInst()); + // no router found + if (it == m_agency2GatewayInfos.end()) + { + return result; + } + return it->second; +} + +GatewayNodeInfos PeerRouterTable::selectRouterByComponent(Message::Ptr const& msg) const +{ + GatewayNodeInfos result; + bcos::ReadGuard l(x_mutex); + auto it = m_agency2GatewayInfos.find(msg->header()->optionalField()->dstInst()); + // no router found + if (it == m_agency2GatewayInfos.end()) + { + return result; + } + auto const& gatewayInfos = it->second; + // foreach all gateways to find the component + for (auto const& it : gatewayInfos) + { + auto const& nodeListInfo = it->nodeList(); + for (auto const& nodeInfo : nodeListInfo) + { + if (nodeInfo.second->components().count( + msg->header()->optionalField()->componentType())) + { + result.insert(it); + break; + } + } + } + return result; +} + +void PeerRouterTable::asyncBroadcastMessage(ppc::protocol::Message::Ptr const& msg) const +{ + bcos::ReadGuard l(x_mutex); + for (auto const& it : m_agency2GatewayInfos) + { + auto selectedIndex = rand() % it.second.size(); + auto iterator = it.second.begin(); + if (selectedIndex > 0) + { + std::advance(iterator, selectedIndex); + } + auto selectedNode = *iterator; + // ignore self + if (selectedNode->p2pNodeID() == m_service->nodeID()) + { + continue; + } + GATEWAY_LOG(TRACE) << LOG_DESC("asyncBroadcastMessage") + << LOG_KV("nodeID", printP2PIDElegantly(selectedNode->p2pNodeID())) + << LOG_KV("msg", printMessage(msg)); + m_service->asyncSendMessageByNodeID(selectedNode->p2pNodeID(), msg); + } +} diff --git a/cpp/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h b/cpp/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h new file mode 100644 index 00000000..7a433303 --- /dev/null +++ b/cpp/ppc-gateway/ppc-gateway/gateway/router/PeerRouterTable.h @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file PeerRouterTable.h + * @author: yujiechen + * @date 2024-08-27 + */ +#pragma once +#include "GatewayNodeInfo.h" +#include "ppc-framework/protocol/Message.h" +#include "ppc-framework/protocol/RouteType.h" +#include "ppc-gateway/p2p/Service.h" +#include +#include + +namespace ppc::gateway +{ +class PeerRouterTable +{ +public: + using Ptr = std::shared_ptr; + PeerRouterTable(Service::Ptr service) : m_service(std::move(service)) {} + virtual ~PeerRouterTable() = default; + + virtual void updateGatewayInfo(GatewayNodeInfo::Ptr const& gatewayInfo); + virtual GatewayNodeInfos selectRouter( + ppc::protocol::RouteType const& routeType, ppc::protocol::Message::Ptr const& msg) const; + + virtual void asyncBroadcastMessage(ppc::protocol::Message::Ptr const& msg) const; + +private: + virtual GatewayNodeInfos selectRouterByNodeID(ppc::protocol::Message::Ptr const& msg) const; + virtual GatewayNodeInfos selectRouterByComponent(ppc::protocol::Message::Ptr const& msg) const; + virtual GatewayNodeInfos selectRouterByAgency(ppc::protocol::Message::Ptr const& msg) const; + + +private: + Service::Ptr m_service; + // nodeID => p2pNodes + std::map m_nodeID2GatewayInfos; + // agency => p2pNodes + std::map m_agency2GatewayInfos; + mutable bcos::SharedMutex x_mutex; +}; +} // namespace ppc::gateway \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/p2p/Service.cpp b/cpp/ppc-gateway/ppc-gateway/p2p/Service.cpp index 469e5b27..fd36e94e 100644 --- a/cpp/ppc-gateway/ppc-gateway/p2p/Service.cpp +++ b/cpp/ppc-gateway/ppc-gateway/p2p/Service.cpp @@ -296,4 +296,28 @@ void Service::asyncSendMessageByP2PNodeID(uint16_t type, std::string const& dstN message->setPacketType(type); message->setPayload(payload); asyncSendMessageByNodeID(dstNodeID, message, options, callback); +} + +void Service::sendRespMessageBySession(bcos::boostssl::ws::WsSession::Ptr const& session, + bcos::boostssl::MessageFace::Ptr msg, std::shared_ptr&& payload) +{ + auto respMessage = std::dynamic_pointer_cast(m_messageFactory->buildMessage()); + auto requestMsg = std::dynamic_pointer_cast(msg); + if (requestMsg->header() && requestMsg->header()->optionalField()) + { + respMessage->header()->optionalField()->setDstNode( + requestMsg->header()->optionalField()->srcNode()); + respMessage->header()->optionalField()->setSrcNode( + requestMsg->header()->optionalField()->dstNode()); + } + respMessage->header()->setTraceID(requestMsg->header()->traceID()); + respMessage->header()->setRespPacket(); + respMessage->header()->setRouteType(ppc::protocol::RouteType::ROUTE_THROUGH_NODEID); + respMessage->setPayload(std::move(payload)); + + WsSessions sessions; + sessions.emplace_back(session); + WsService::asyncSendMessage(sessions, respMessage); + GATEWAY_LOG(TRACE) << "sendRespMessageBySession" << LOG_KV("resp", printMessage(respMessage)) + << LOG_KV("payload size", payload->size()); } \ No newline at end of file diff --git a/cpp/ppc-gateway/ppc-gateway/p2p/Service.h b/cpp/ppc-gateway/ppc-gateway/p2p/Service.h index a3c8f5ea..e78aff23 100644 --- a/cpp/ppc-gateway/ppc-gateway/p2p/Service.h +++ b/cpp/ppc-gateway/ppc-gateway/p2p/Service.h @@ -36,7 +36,8 @@ class Service : public bcos::boostssl::ws::WsService bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::Options options = bcos::boostssl::ws::Options(), bcos::boostssl::ws::RespCallBack respFunc = bcos::boostssl::ws::RespCallBack()); - virtual void asyncSendMessageByP2PNodeID(uint16_t type, std::string const& dstNodeID, + + virtual void asyncSendMessageByP2PNodeID(uint16_t packetType, std::string const& dstNodeID, std::shared_ptr payload, bcos::boostssl::ws::Options options = bcos::boostssl::ws::Options(), bcos::boostssl::ws::RespCallBack callback = bcos::boostssl::ws::RespCallBack()); @@ -44,10 +45,17 @@ class Service : public bcos::boostssl::ws::WsService virtual void asyncBroadcastMessage(bcos::boostssl::MessageFace::Ptr msg, bcos::boostssl::ws::Options options = bcos::boostssl::ws::Options()); + virtual void sendRespMessageBySession(bcos::boostssl::ws::WsSession::Ptr const& session, + bcos::boostssl::MessageFace::Ptr msg, std::shared_ptr&& payload); + RouterTableFactory::Ptr const& routerTableFactory() const { return m_routerTableFactory; } RouterTableInterface::Ptr const& routerTable() const { return m_routerTable; } std::string const& nodeID() const { return m_nodeID; } + bcos::boostssl::MessageFaceFactory::Ptr const& messageFactory() const + { + return m_messageFactory; + } protected: void onRecvMessage(bcos::boostssl::MessageFace::Ptr _msg, diff --git a/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h b/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h index f99c0182..a1701c9e 100644 --- a/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h +++ b/cpp/ppc-gateway/ppc-gateway/p2p/router/RouterManager.h @@ -28,6 +28,7 @@ namespace ppc::gateway class RouterManager { public: + using Ptr = std::shared_ptr; RouterManager(Service::Ptr service); virtual ~RouterManager() = default; diff --git a/cpp/ppc-protocol/src/v1/MessageHeaderImpl.cpp b/cpp/ppc-protocol/src/v1/MessageHeaderImpl.cpp index 08ba9cdd..f14e1cac 100644 --- a/cpp/ppc-protocol/src/v1/MessageHeaderImpl.cpp +++ b/cpp/ppc-protocol/src/v1/MessageHeaderImpl.cpp @@ -29,9 +29,10 @@ using namespace ppc; void MessageOptionalHeaderImpl::encode(bcos::bytes& buffer) const { // the componentType - uint16_t componentType = - boost::asio::detail::socket_ops::host_to_network_short(m_componentType); - buffer.insert(buffer.end(), (byte*)&componentType, (byte*)&componentType + 2); + uint16_t componentTypeLen = + boost::asio::detail::socket_ops::host_to_network_short(m_componentType.size()); + buffer.insert(buffer.end(), (byte*)&componentTypeLen, (byte*)&componentTypeLen + 2); + buffer.insert(buffer.end(), m_componentType.begin(), m_componentType.end()); // the source nodeID that send the message uint16_t srcNodeLen = boost::asio::detail::socket_ops::host_to_network_short(m_srcNode.size()); buffer.insert(buffer.end(), (byte*)&srcNodeLen, (byte*)&srcNodeLen + 2); @@ -55,13 +56,17 @@ int64_t MessageOptionalHeaderImpl::decode(bcos::bytesConstRef data, uint64_t con // the componentType auto pointer = data.data() + offset; m_componentType = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer)); - pointer += 2; + bcos::bytes componentType; + offset = decodeNetworkBuffer(componentType, data.data(), data.size(), (pointer - data.data())); + m_componentType = std::string(componentType.begin(), componentType.end()); // srcNode - offset = decodeNetworkBuffer(m_srcNode, data.data(), data.size(), (pointer - data.data())); + offset = decodeNetworkBuffer(m_srcNode, data.data(), data.size(), offset); // dstNode offset = decodeNetworkBuffer(m_dstNode, data.data(), data.size(), offset); - // dstInst - offset = decodeNetworkBuffer(m_dstInst, data.data(), data.size(), offset); + // dstInst, TODO: optimize here + bcos::bytes dstInstData; + offset = decodeNetworkBuffer(dstInstData, data.data(), data.size(), offset); + m_dstInst = std::string(dstInstData.begin(), dstInstData.end()); return offset; } @@ -144,3 +149,47 @@ int64_t MessageHeaderImpl::decode(bcos::bytesConstRef data) m_length = offset; return offset; } + +uint16_t MessageHeaderImpl::routeType() const +{ + if (m_ext & (uint16_t)ppc::gateway::GatewayMsgExtFlag::RouteByComponent) + { + return (uint16_t)RouteType::ROUTE_THROUGH_COMPONENT; + } + if (m_ext & (uint16_t)ppc::gateway::GatewayMsgExtFlag::RouteByAgency) + { + return (uint16_t)RouteType::ROUTE_THROUGH_AGENCY; + } + if (m_ext & (uint16_t)ppc::gateway::GatewayMsgExtFlag::RouteByAgency) + { + return (uint16_t)RouteType::ROUTE_THROUGH_AGENCY; + } + if (m_ext & (uint16_t)ppc::gateway::GatewayMsgExtFlag::RouteByTopic) + { + return (uint16_t)RouteType::ROUTE_THROUGH_TOPIC; + } + // default is route though nodeID + return (uint16_t)RouteType::ROUTE_THROUGH_NODEID; +} + +void MessageHeaderImpl::setRouteType(ppc::protocol::RouteType type) +{ + switch (type) + { + case RouteType::ROUTE_THROUGH_NODEID: + m_ext |= (uint16_t)ppc::gateway::GatewayMsgExtFlag::RouteByNodeID; + break; + case RouteType::ROUTE_THROUGH_COMPONENT: + m_ext |= (uint16_t)ppc::gateway::GatewayMsgExtFlag::RouteByComponent; + break; + case RouteType::ROUTE_THROUGH_AGENCY: + m_ext |= (uint16_t)ppc::gateway::GatewayMsgExtFlag::RouteByAgency; + break; + case RouteType::ROUTE_THROUGH_TOPIC: + m_ext |= (uint16_t)ppc::gateway::GatewayMsgExtFlag::RouteByTopic; + break; + default: + BOOST_THROW_EXCEPTION(WeDPRException() << errinfo_comment( + "Invalid route type: " + std::to_string((uint16_t)type))); + } +} \ No newline at end of file diff --git a/cpp/ppc-protocol/src/v1/MessageHeaderImpl.h b/cpp/ppc-protocol/src/v1/MessageHeaderImpl.h index a8dda5a9..9d9c3850 100644 --- a/cpp/ppc-protocol/src/v1/MessageHeaderImpl.h +++ b/cpp/ppc-protocol/src/v1/MessageHeaderImpl.h @@ -61,6 +61,9 @@ class MessageHeaderImpl : public MessageHeader } void setRespPacket() override { m_ext |= (uint16_t)ppc::gateway::GatewayMsgExtFlag::Response; } + uint16_t routeType() const override; + void setRouteType(ppc::protocol::RouteType type) override; + private: // version(2) + packetType(2) + ttl(2) + ext(2) + traceIDLen(2) + srcGwNodeLen(2) + dstGwNode(2) const size_t MESSAGE_MIN_LENGTH = 14; diff --git a/cpp/ppc-protocol/src/v1/MessageImpl.h b/cpp/ppc-protocol/src/v1/MessageImpl.h index 208db1b3..aec9cb3d 100644 --- a/cpp/ppc-protocol/src/v1/MessageImpl.h +++ b/cpp/ppc-protocol/src/v1/MessageImpl.h @@ -20,6 +20,9 @@ #pragma once #include "ppc-framework/Common.h" #include "ppc-framework/protocol/Message.h" +#include +#include +#include namespace ppc::protocol { @@ -75,6 +78,31 @@ class MessageBuilderImpl : public MessageBuilder { return std::make_shared(m_msgHeaderBuilder, m_maxMessageLen, buffer); } + Message::Ptr build(ppc::protocol::RouteType routeType, std::string const& topic, + std::string const& dstInst, bcos::bytes const& dstNodeID, std::string const& componentType, + bcos::bytes&& payload) override + { + auto msg = build(); + msg->header()->setRouteType(routeType); + msg->header()->optionalField()->setDstInst(dstInst); + msg->header()->optionalField()->setDstNode(dstNodeID); + msg->header()->optionalField()->setTopic(topic); + msg->header()->optionalField()->setComponentType(componentType); + msg->setPayload(std::make_shared(std::move(payload))); + return msg; + } + + bcos::boostssl::MessageFace::Ptr buildMessage() override + { + return std::make_shared(m_msgHeaderBuilder, m_maxMessageLen); + } + + std::string newSeq() override + { + std::string seq = boost::uuids::to_string(boost::uuids::random_generator()()); + seq.erase(std::remove(seq.begin(), seq.end(), '-'), seq.end()); + return seq; + } private: MessageHeaderBuilder::Ptr m_msgHeaderBuilder; diff --git a/cpp/ppc-protocol/src/v1/MessagePayloadImpl.cpp b/cpp/ppc-protocol/src/v1/MessagePayloadImpl.cpp index 59397edf..360567b4 100644 --- a/cpp/ppc-protocol/src/v1/MessagePayloadImpl.cpp +++ b/cpp/ppc-protocol/src/v1/MessagePayloadImpl.cpp @@ -31,10 +31,9 @@ int64_t MessagePayloadImpl::encode(bcos::bytes& buffer) const // version uint16_t version = boost::asio::detail::socket_ops::host_to_network_short(m_version); buffer.insert(buffer.end(), (byte*)&version, (byte*)&version + 2); - // topic - uint16_t topicLen = boost::asio::detail::socket_ops::host_to_network_short(m_topic.size()); - buffer.insert(buffer.end(), (byte*)&topicLen, (byte*)&topicLen + 2); - buffer.insert(buffer.end(), m_topic.begin(), m_topic.end()); + // seq + uint16_t seq = boost::asio::detail::socket_ops::host_to_network_short(m_seq); + buffer.insert(buffer.end(), (byte*)&seq, (byte*)&seq + 2); // data uint16_t dataLen = boost::asio::detail::socket_ops::host_to_network_short(m_data.size()); buffer.insert(buffer.end(), (byte*)&dataLen, (byte*)&dataLen + 2); @@ -56,12 +55,10 @@ int64_t MessagePayloadImpl::decode(bcos::bytesConstRef buffer) // the version m_version = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer)); pointer += 2; - // topic - bcos::bytes topicData; - auto offset = - decodeNetworkBuffer(topicData, buffer.data(), buffer.size(), (pointer - buffer.data())); - m_topic = std::string(topicData.begin(), topicData.end()); + // the seq + CHECK_OFFSET_WITH_THROW_EXCEPTION((pointer - buffer.data()), buffer.size()); + m_seq = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer)); + pointer += 2; // data - offset = decodeNetworkBuffer(m_data, buffer.data(), buffer.size(), offset); - return offset; + return decodeNetworkBuffer(m_data, buffer.data(), buffer.size(), (pointer - buffer.data())); } \ No newline at end of file diff --git a/cpp/ppc-protocol/src/v1/MessagePayloadImpl.h b/cpp/ppc-protocol/src/v1/MessagePayloadImpl.h index f7f5a3e8..c85c0589 100644 --- a/cpp/ppc-protocol/src/v1/MessagePayloadImpl.h +++ b/cpp/ppc-protocol/src/v1/MessagePayloadImpl.h @@ -35,6 +35,7 @@ class MessagePayloadImpl : public MessagePayload int64_t decode(bcos::bytesConstRef data) override; private: + // version + seq + dataLen const unsigned int MIN_PAYLOAD_LEN = 6; }; diff --git a/cpp/ppc-tars-protocol/ppc-tars-protocol/impl/NodeInfoImpl.cpp b/cpp/ppc-tars-protocol/ppc-tars-protocol/impl/NodeInfoImpl.cpp new file mode 100644 index 00000000..e8f585d4 --- /dev/null +++ b/cpp/ppc-tars-protocol/ppc-tars-protocol/impl/NodeInfoImpl.cpp @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file NodeInfoImpl.h + * @author: yujiechen + * @date 2024-08-26 + */ + +#include "NodeInfoImpl.h" +#include "../Common.h" + +using namespace ppctars; +using namespace ppc::protocol; + +void NodeInfoImpl::encode(bcos::bytes& data) const +{ + tars::TarsOutputStream output; + m_inner()->writeTo(output); + output.getByteBuffer().swap(data); +} +void NodeInfoImpl::decode(bcos::bytesConstRef data) +{ + tars::TarsInputStream input; + input.setBuffer((const char*)data.data(), data.size()); + m_inner()->readFrom(input); + m_components = + std::set(m_inner()->components.begin(), m_inner()->components.end()); +} \ No newline at end of file diff --git a/cpp/ppc-tars-protocol/ppc-tars-protocol/impl/NodeInfoImpl.h b/cpp/ppc-tars-protocol/ppc-tars-protocol/impl/NodeInfoImpl.h new file mode 100644 index 00000000..ff4e1627 --- /dev/null +++ b/cpp/ppc-tars-protocol/ppc-tars-protocol/impl/NodeInfoImpl.h @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2023 WeDPR. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file NodeInfoImpl.h + * @author: yujiechen + * @date 2024-08-26 + */ +#pragma once +#include "ppc-framework/protocol/INodeInfo.h" +#include "ppc-tars-protocol/tars/NodeInfo.h" +#include +namespace ppc::protocol +{ +// the node information +class NodeInfoImpl : public INodeInfo +{ +public: + using Ptr = std::shared_ptr; + explicit NodeInfoImpl(std::function inner) : m_inner(std::move(inner)) {} + + NodeInfoImpl(bcos::bytesConstRef const& nodeID) + : m_inner([inner = ppctars::NodeInfo()]() mutable { return &inner; }) + { + m_inner()->nodeID = std::vector(nodeID.begin(), nodeID.end()); + } + NodeInfoImpl(bcos::bytesConstRef const& nodeID, std::string const& endPoint) + : NodeInfoImpl(nodeID) + { + m_inner()->endPoint = endPoint; + } + ~NodeInfoImpl() override = default; + + void setComponents(std::vector const& components) override + { + m_components = std::set(components.begin(), components.end()); + m_inner()->components = components; + } + std::set const& components() const override { return m_components; } + + std::string const& endPoint() const override { return m_inner()->endPoint; } + + bcos::bytesConstRef nodeID() const override + { + return {reinterpret_cast(m_inner()->nodeID.data()), + m_inner()->nodeID.size()}; + } + + void encode(bcos::bytes& data) const override; + void decode(bcos::bytesConstRef data) override; + ppctars::NodeInfo const& inner() { return *(m_inner()); } + + void setFront(ppc::front::IFront::Ptr&& front) override { m_front = std::move(front); } + ppc::front::IFront::Ptr const& getFront() const override { return m_front; } + +private: + ppc::front::IFront::Ptr m_front; + std::set m_components; + std::function m_inner; +}; + +class NodeInfoFactory : public INodeInfoFactory +{ +public: + using Ptr = std::shared_ptr; + NodeInfoFactory(bcos::bytesConstRef const& nodeID) : INodeInfoFactory(nodeID.toBytes()) {} + ~NodeInfoFactory() override {} + + INodeInfo::Ptr build() override { return std::make_shared(bcos::ref(m_nodeID)); } + + + INodeInfo::Ptr build(std::string const& endPoint) override + { + return std::make_shared(bcos::ref(m_nodeID), endPoint); + } +}; +} // namespace ppc::protocol \ No newline at end of file diff --git a/cpp/ppc-tars-protocol/ppc-tars-protocol/tars/NodeInfo.tars b/cpp/ppc-tars-protocol/ppc-tars-protocol/tars/NodeInfo.tars new file mode 100644 index 00000000..9a0c67ec --- /dev/null +++ b/cpp/ppc-tars-protocol/ppc-tars-protocol/tars/NodeInfo.tars @@ -0,0 +1,15 @@ +module ppctars +{ + struct NodeInfo + { + 1 require vector nodeID; + 2 require string endPoint; + 3 optional vector components; + }; + struct GatewayNodeInfo + { + 1 require string p2pNodeID; + 2 require string agency; + 3 optional vector nodeList; + }; +};