Skip to content

Commit

Permalink
fix push_by_topic
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Nov 5, 2024
1 parent 241fd1a commit e801e17
Showing 1 changed file with 18 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ void LocalRouter::registerTopic(bcos::bytesConstRef _nodeID, std::string const&
}
for (auto const& msgInfo : msgQueue->messages)
{
LOCAL_ROUTER_LOG(INFO) << LOG_DESC("registerTopic, dispatcher the holding msg queue")
<< LOG_KV("topic", topic) << LOG_KV("nodeID", printNodeID(_nodeID));
dispatcherMessage(msgInfo.msg, msgInfo.callback, false);
}
}
Expand All @@ -80,6 +82,15 @@ bool LocalRouter::dispatcherMessage(
P2PMessage::Ptr const& msg, ReceiveMsgFunc callback, bool holding)
{
auto frontList = chooseReceiver(msg);
auto commonCallback = [](bcos::Error::Ptr error) {
if (!error || error->errorCode() == 0)
{
return;
}
LOCAL_ROUTER_LOG(WARNING) << LOG_DESC("dispatcherMessage to front failed")
<< LOG_KV("code", error->errorCode())
<< LOG_KV("msg", error->errorMessage());
};
// find the front
if (!frontList.empty())
{
Expand All @@ -93,15 +104,7 @@ bool LocalRouter::dispatcherMessage(
}
else
{
front->onReceiveMessage(msg->msg(), [](bcos::Error::Ptr error) {
if (!error || error->errorCode() == 0)
{
return;
}
LOCAL_ROUTER_LOG(WARNING) << LOG_DESC("dispatcherMessage to front failed")
<< LOG_KV("code", error->errorCode())
<< LOG_KV("msg", error->errorMessage());
});
front->onReceiveMessage(msg->msg(), commonCallback);
}
i++;
}
Expand All @@ -122,7 +125,12 @@ bool LocalRouter::dispatcherMessage(
// no connection found, cache the topic message and dispatcher later
if (msg->header()->routeType() == (uint16_t)RouteType::ROUTE_THROUGH_TOPIC && m_cache)
{
m_cache->insertCache(msg->header()->optionalField()->topic(), msg, callback);
// send response when hodling the message
if (callback)
{
callback(nullptr);
}
m_cache->insertCache(msg->header()->optionalField()->topic(), msg, commonCallback);
return true;
}
return false;
Expand Down

0 comments on commit e801e17

Please sign in to comment.