Skip to content

Commit

Permalink
server: more efficient response to batched subscriptions (#1141)
Browse files Browse the repository at this point in the history
* server: more efficient response to batched subscriptions

* add sndMsgQ for interleaving messages with replies

* remove redundant liftIO

* refactor

* refactor2

* rename

* fix

* diff

* remove comment

* remove comment

---------

Co-authored-by: Alexander Bondarenko <[email protected]>
  • Loading branch information
epoberezkin and dpwiz authored May 10, 2024
1 parent dc11143 commit 727fd8b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 21 deletions.
50 changes: 30 additions & 20 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
hPutStrLn h "AUTH"

runClientTransport :: Transport c => THandleSMP c 'TServer -> M ()
runClientTransport th@THandle {params = THandleParams {thVersion, sessionId}} = do
runClientTransport h@THandle {params = THandleParams {thVersion, sessionId}} = do
q <- asks $ tbqSize . config
ts <- liftIO getSystemTime
active <- asks clients
Expand All @@ -422,11 +422,12 @@ runClientTransport th@THandle {params = THandleParams {thVersion, sessionId}} =
pure new
s <- asks server
expCfg <- asks $ inactiveClientExpiration . config
th <- newMVar h -- put TH under a fair lock to interleave messages and command responses
labelMyThread . B.unpack $ "client $" <> encode sessionId
raceAny_ ([liftIO $ send th c, client c s, receive th c] <> disconnectThread_ c expCfg)
raceAny_ ([liftIO $ send th c, liftIO $ sendMsg th c, client c s, receive h c] <> disconnectThread_ c expCfg)
`finally` clientDisconnected c
where
disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport th (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)]
disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)]
disconnectThread_ _ _ = []
noSubscriptions c = atomically $ (&&) <$> TM.null (subscriptions c) <*> TM.null (ntfSubscriptions c)

Expand Down Expand Up @@ -459,10 +460,10 @@ cancelSub sub =
_ -> return ()

receive :: Transport c => THandleSMP c 'TServer -> Client -> M ()
receive th@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do
receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive"
forever $ do
ts <- L.toList <$> liftIO (tGet th)
ts <- L.toList <$> liftIO (tGet h)
atomically . writeTVar rcvActiveAt =<< liftIO getSystemTime
as <- partitionEithers <$> mapM cmdAction ts
write sndQ $ fst as
Expand All @@ -479,33 +480,41 @@ receive th@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActi
VRFailed -> Left (corrId, queueId, ERR AUTH)
write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty

send :: Transport c => THandleSMP c 'TServer -> Client -> IO ()
send h@THandle {params} Client {sndQ, sessionId, sndActiveAt} = do
send :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO ()
send th c@Client {sndQ, msgQ, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send"
forever $ do
sendTransmissions =<< atomically (readTBQueue sndQ)
atomically . writeTVar sndActiveAt =<< liftIO getSystemTime
forever $ atomically (readTBQueue sndQ) >>= sendTransmissions
where
sendTransmissions :: NonEmpty (Transmission BrokerMsg) -> IO ()
sendTransmissions ts
| L.length ts <= 2 = tSend ts
| L.length ts <= 2 = tSend th c ts
| otherwise = do
let (msgs, ts') = mapAccumR splitMessages [] ts
let (msgs_, ts') = mapAccumR splitMessages [] ts
-- If the request had batched subscriptions (L.length ts > 2)
-- this will reply OK to all SUBs in the first batched transmission,
-- to reduce client timeouts.
tSend ts'
tSend th c ts'
-- After that all messages will be sent in separate transmissions,
-- without any client response timeouts.
mapM_ tSend (L.nonEmpty msgs)
-- without any client response timeouts, and allowing them to interleave
-- with other requests responses.
mapM_ (atomically . writeTBQueue msgQ) $ L.nonEmpty msgs_
where
splitMessages :: [Transmission BrokerMsg] -> Transmission BrokerMsg -> ([Transmission BrokerMsg], Transmission BrokerMsg)
splitMessages msgs t@(corrId, entId, cmd) = case cmd of
-- replace MSG response with OK, accumulating MSG in a separate list.
MSG {} -> ((CorrId "", entId, cmd) : msgs, (corrId, entId, OK))
_ -> (msgs, t)
tSend :: NonEmpty (Transmission BrokerMsg) -> IO ()
tSend = void . tPut h . L.map (\t -> Right (Nothing, encodeTransmission params t))

sendMsg :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO ()
sendMsg th c@Client {msgQ, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " sendMsg"
forever $ atomically (readTBQueue msgQ) >>= mapM_ (\t -> tSend th c [t])

tSend :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> NonEmpty (Transmission BrokerMsg) -> IO ()
tSend th Client {sndActiveAt} ts = do
withMVar th $ \h@THandle {params} ->
void . tPut h $ L.map (\t -> Right (Nothing, encodeTransmission params t)) ts
atomically . writeTVar sndActiveAt =<< liftIO getSystemTime

disconnectTransport :: Transport c => THandle v c 'TServer -> TVar SystemTime -> TVar SystemTime -> ExpirationConfig -> IO Bool -> IO ()
disconnectTransport THandle {connection, params = THandleParams {sessionId}} rcvActiveAt sndActiveAt expCfg noSubscriptions = do
Expand Down Expand Up @@ -989,9 +998,10 @@ saveServerMessages keepMsgs = asks (storeMsgsFile . config) >>= mapM_ saveMessag
>>= mapM_ (B.hPutStrLn h . strEncode . MLRv3 rId)

restoreServerMessages :: M Int
restoreServerMessages = asks (storeMsgsFile . config) >>= \case
Just f -> ifM (doesFileExist f) (restoreMessages f) (pure 0)
Nothing -> pure 0
restoreServerMessages =
asks (storeMsgsFile . config) >>= \case
Just f -> ifM (doesFileExist f) (restoreMessages f) (pure 0)
Nothing -> pure 0
where
restoreMessages f = do
logInfo $ "restoring messages from file " <> T.pack f
Expand Down
4 changes: 3 additions & 1 deletion src/Simplex/Messaging/Server/Env/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ data Client = Client
ntfSubscriptions :: TMap NotifierId (),
rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)),
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
endThreads :: TVar (IntMap (Weak ThreadId)),
endThreadSeq :: TVar Int,
thVersion :: VersionSMP,
Expand Down Expand Up @@ -161,12 +162,13 @@ newClient nextClientId qSize thVersion sessionId createdAt = do
ntfSubscriptions <- TM.empty
rcvQ <- newTBQueue qSize
sndQ <- newTBQueue qSize
msgQ <- newTBQueue qSize
endThreads <- newTVar IM.empty
endThreadSeq <- newTVar 0
connected <- newTVar True
rcvActiveAt <- newTVar createdAt
sndActiveAt <- newTVar createdAt
return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt}
return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt}

newSubscription :: SubscriptionThread -> STM Sub
newSubscription subThread = do
Expand Down

0 comments on commit 727fd8b

Please sign in to comment.