Skip to content

Commit

Permalink
remove client from servers subscribers map after client disconnection (
Browse files Browse the repository at this point in the history
…#228)

Co-authored-by: Efim Poberezkin <[email protected]>
  • Loading branch information
epoberezkin and spaced4ndy authored Dec 17, 2021
1 parent f15067c commit 5f7fe8b
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 22 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
branches:
- master
- stable
tags:
- "v*"
pull_request:
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
[![GitHub build](https://github.com/simplex-chat/simplexmq/workflows/build/badge.svg)](https://github.com/simplex-chat/simplexmq/actions?query=workflow%3Abuild)
[![GitHub release](https://img.shields.io/github/v/release/simplex-chat/simplexmq)](https://github.com/simplex-chat/simplexmq/releases)

📢 **v0.5.1 brings a hotfix to the server's subscription management logic, to apply it log in to your server via SSH and run the following command. If you have store log enabled for your server, information about already established queues will be preserved.** If you're doing a custom installation instead of Linode or DigitalOcean you may have to change the path for binary download.

```sh
systemctl stop smp-server && curl -L -o /opt/simplex/bin/smp-server https://github.com/simplex-chat/simplexmq/releases/download/v0.5.1/smp-server-ubuntu-20_04-x86-64 && chmod +x /opt/simplex/bin/smp-server && systemctl start smp-server
```

## Message broker for unidirectional (simplex) queues

SimpleXMQ is a message broker for managing message queues and sending messages over public network. It consists of SMP server, SMP client library and SMP agent that implement [SMP protocol](https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md) for client-server communication and [SMP agent protocol](https://github.com/simplex-chat/simplexmq/blob/master/protocol/agent-protocol.md) to manage duplex connections via simplex queues on multiple SMP servers.
Expand Down
1 change: 1 addition & 0 deletions apps/smp-server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ serverConfig :: ServerConfig
serverConfig =
ServerConfig
{ tbqSize = 16,
serverTbqSize = 128,
msgQueueQuota = 256,
queueIdBytes = 12,
msgIdBytes = 6,
Expand Down
49 changes: 35 additions & 14 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,25 @@ runSMPServerBlocking started cfg@ServerConfig {transports} = do
runServer (tcpPort, ATransport t) = runTransportServer started tcpPort (runClient t)

serverThread :: MonadUnliftIO m' => Server -> m' ()
serverThread Server {subscribedQ, subscribers} = forever . atomically $ do
(rId, clnt) <- readTBQueue subscribedQ
cs <- readTVar subscribers
case M.lookup rId cs of
Just Client {rcvQ} -> writeTBQueue rcvQ (CorrId B.empty, rId, Cmd SBroker END)
Nothing -> return ()
writeTVar subscribers $ M.insert rId clnt cs
serverThread Server {subscribedQ, subscribers} = forever $ do
atomically updateSubscribers >>= \case
Just (rId, Client {rcvQ}) ->
void . forkIO . atomically $
writeTBQueue rcvQ (CorrId "", rId, Cmd SBroker END)
_ -> pure ()
where
updateSubscribers :: STM (Maybe (RecipientId, Client))
updateSubscribers = do
(rId, c) <- readTBQueue subscribedQ
stateTVar subscribers (\cs -> (M.lookup rId cs, M.insert rId c cs)) >>= \case
Just c' -> clientToBeNotified rId c c'
_ -> pure Nothing
clientToBeNotified :: RecipientId -> Client -> Client -> STM (Maybe (RecipientId, Client))
clientToBeNotified rId c c'@Client {connected}
| clientId c /= clientId c' = do
yes <- readTVar connected
pure $ if yes then Just (rId, c') else Nothing
| otherwise = pure Nothing

runClient :: (Transport c, MonadUnliftIO m, MonadReader Env m) => TProxy c -> c -> m ()
runClient _ h = do
Expand All @@ -98,14 +110,23 @@ runClient _ h = do
runClientTransport :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> m ()
runClientTransport th = do
q <- asks $ tbqSize . config
c <- atomically $ newClient q
s <- asks server
c <- atomically $ newClient s q
raceAny_ [send th c, client c s, receive th c]
`finally` cancelSubscribers c

cancelSubscribers :: MonadUnliftIO m => Client -> m ()
cancelSubscribers Client {subscriptions} =
readTVarIO subscriptions >>= mapM_ cancelSub
`finally` clientDisconnected c

clientDisconnected :: (MonadUnliftIO m, MonadReader Env m) => Client -> m ()
clientDisconnected c@Client {subscriptions, connected} = do
atomically $ writeTVar connected False
subs <- readTVarIO subscriptions
mapM_ cancelSub subs
cs <- asks $ subscribers . server
atomically . mapM_ (modifyTVar cs . M.update deleteCurrentClient) $ M.keys subs
where
deleteCurrentClient :: Client -> Maybe Client
deleteCurrentClient c'
| clientId c == clientId c' = Nothing
| otherwise = Just c'

cancelSub :: MonadUnliftIO m => Sub -> m ()
cancelSub = \case
Expand Down Expand Up @@ -326,7 +347,7 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
subscriber :: MsgQueue -> m ()
subscriber q = atomically $ do
msg <- peekMsg q
writeTBQueue sndQ $ mkResp (CorrId B.empty) rId (msgCmd msg)
writeTBQueue sndQ $ mkResp (CorrId "") rId (msgCmd msg)
setSub (\s -> s {subThread = NoSub})
void setDelivered

Expand Down
22 changes: 15 additions & 7 deletions src/Simplex/Messaging/Server/Env/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
module Simplex.Messaging.Server.Env.STM where

import Control.Concurrent (ThreadId)
import Control.Concurrent.STM (stateTVar)
import Control.Monad.IO.Unlift
import Crypto.Random
import Data.Map.Strict (Map)
Expand All @@ -25,6 +26,7 @@ import UnliftIO.STM
data ServerConfig = ServerConfig
{ transports :: [(ServiceName, ATransport)],
tbqSize :: Natural,
serverTbqSize :: Natural,
msgQueueQuota :: Natural,
queueIdBytes :: Int,
msgIdBytes :: Int,
Expand All @@ -46,13 +48,16 @@ data Env = Env

data Server = Server
{ subscribedQ :: TBQueue (RecipientId, Client),
subscribers :: TVar (Map RecipientId Client)
subscribers :: TVar (Map RecipientId Client),
nextClientId :: TVar Natural
}

data Client = Client
{ subscriptions :: TVar (Map RecipientId Sub),
rcvQ :: TBQueue Transmission,
sndQ :: TBQueue Transmission
sndQ :: TBQueue Transmission,
clientId :: Natural,
connected :: TVar Bool
}

data SubscriptionThread = NoSub | SubPending | SubThread ThreadId
Expand All @@ -66,14 +71,17 @@ newServer :: Natural -> STM Server
newServer qSize = do
subscribedQ <- newTBQueue qSize
subscribers <- newTVar M.empty
return Server {subscribedQ, subscribers}
nextClientId <- newTVar 0
return Server {subscribedQ, subscribers, nextClientId}

newClient :: Natural -> STM Client
newClient qSize = do
newClient :: Server -> Natural -> STM Client
newClient Server {nextClientId} qSize = do
subscriptions <- newTVar M.empty
rcvQ <- newTBQueue qSize
sndQ <- newTBQueue qSize
return Client {subscriptions, rcvQ, sndQ}
clientId <- stateTVar nextClientId $ \i -> (i, i + 1)
connected <- newTVar True
return Client {subscriptions, rcvQ, sndQ, clientId, connected}

newSubscription :: STM Sub
newSubscription = do
Expand All @@ -82,7 +90,7 @@ newSubscription = do

newEnv :: forall m. (MonadUnliftIO m, MonadRandom m) => ServerConfig -> m Env
newEnv config = do
server <- atomically $ newServer (tbqSize config)
server <- atomically $ newServer (serverTbqSize config)
queueStore <- atomically newQueueStore
msgStore <- atomically newMsgStore
idsDrg <- drgNew >>= newTVarIO
Expand Down
1 change: 1 addition & 0 deletions tests/SMPClient.hs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ cfg =
ServerConfig
{ transports = undefined,
tbqSize = 1,
serverTbqSize = 1,
msgQueueQuota = 4,
queueIdBytes = 12,
msgIdBytes = 6,
Expand Down
19 changes: 18 additions & 1 deletion tests/ServerTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module ServerTests where
import Control.Concurrent (ThreadId, killThread)
import Control.Concurrent.STM
import Control.Exception (SomeException, try)
import Control.Monad.Except (forM_, runExceptT)
import Control.Monad.Except (forM, forM_, runExceptT)
import Data.ByteString.Base64
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
Expand All @@ -34,6 +34,7 @@ serverTests t = do
describe "SMP queues" do
describe "NEW and KEY commands, SEND messages" $ testCreateSecure t
describe "NEW, OFF and DEL commands, SEND messages" $ testCreateDelete t
describe "Stress test" $ stressTest t
describe "SMP messages" do
describe "duplex communication over 2 SMP connections" $ testDuplex t
describe "switch subscription to another SMP queue" $ testSwitchSub t
Expand Down Expand Up @@ -179,6 +180,22 @@ testCreateDelete (ATransport t) =
Resp "cdab" _ err10 <- signSendRecv rh rKey ("cdab", rId, "SUB")
(err10, ERR AUTH) #== "rejects SUB when deleted"

stressTest :: ATransport -> Spec
stressTest (ATransport t) =
it "should create many queues, disconnect and re-connect" $
smpTest3 t $ \h1 h2 h3 -> do
(rPub, rKey) <- C.generateKeyPair rsaKeySize
rIds <- forM [1 .. 50 :: Int] . const $ do
Resp "" "" (IDS rId _) <- signSendRecv h1 rKey ("", "", "NEW " <> C.serializePubKey rPub)
pure rId
let subscribeQueues h = forM_ rIds $ \rId -> do
Resp "" rId' OK <- signSendRecv h rKey ("", rId, "SUB")
rId' `shouldBe` rId
closeConnection $ connection h1
subscribeQueues h2
closeConnection $ connection h2
subscribeQueues h3

testDuplex :: ATransport -> Spec
testDuplex (ATransport t) =
it "should create 2 simplex connections and exchange messages" $
Expand Down

0 comments on commit 5f7fe8b

Please sign in to comment.