Skip to content

Commit

Permalink
server: log stats for QUOTA and other errors (#1177)
Browse files Browse the repository at this point in the history
* server: log stats for QUOTA errors

* fix test

* more stats

* remove duplicate column
  • Loading branch information
epoberezkin authored May 28, 2024
1 parent 199f85e commit e55ec07
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 45 deletions.
101 changes: 67 additions & 34 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats
ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, qSub, qSubAuth, qSubDuplicate, qSubProhibited, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats
let interval = 1000000 * logInterval
forever $ do
withFile statsFilePath AppendMode $ \h -> liftIO $ do
Expand All @@ -240,7 +240,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
qDeletedAll' <- atomically $ swapTVar qDeletedAll 0
qDeletedNew' <- atomically $ swapTVar qDeletedNew 0
qDeletedSecured' <- atomically $ swapTVar qDeletedSecured 0
qSub' <- atomically $ swapTVar qSub 0
qSubAuth' <- atomically $ swapTVar qSubAuth 0
qSubDuplicate' <- atomically $ swapTVar qSubDuplicate 0
qSubProhibited' <- atomically $ swapTVar qSubProhibited 0
msgSent' <- atomically $ swapTVar msgSent 0
msgSentAuth' <- atomically $ swapTVar msgSentAuth 0
msgSentQuota' <- atomically $ swapTVar msgSentQuota 0
msgSentLarge' <- atomically $ swapTVar msgSentLarge 0
msgRecv' <- atomically $ swapTVar msgRecv 0
msgExpired' <- atomically $ swapTVar msgExpired 0
ps <- atomically $ periodStatCounts activeQueues ts
Expand Down Expand Up @@ -281,7 +288,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
<> showProxyStats pRelaysOwn'
<> showProxyStats pMsgFwds'
<> showProxyStats pMsgFwdsOwn'
<> [show pMsgFwdsRecv']
<> [ show pMsgFwdsRecv',
show qSub',
show qSubAuth',
show qSubDuplicate',
show qSubProhibited',
show msgSentAuth',
show msgSentQuota',
show msgSentLarge'
]
)
liftIO $ threadDelay' interval
where
Expand Down Expand Up @@ -504,19 +519,25 @@ receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiv
forever $ do
ts <- L.toList <$> liftIO (tGet h)
atomically . writeTVar rcvActiveAt =<< liftIO getSystemTime
(errs, cmds) <- partitionEithers <$> mapM cmdAction ts
stats <- asks serverStats
(errs, cmds) <- partitionEithers <$> mapM (cmdAction stats) ts
write sndQ errs
write rcvQ cmds
where
cmdAction :: SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd))
cmdAction (tAuth, authorized, (corrId, entId, cmdOrError)) =
cmdAction :: ServerStats -> SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd))
cmdAction stats (tAuth, authorized, (corrId, entId, cmdOrError)) =
case cmdOrError of
Left e -> pure $ Left (corrId, entId, ERR e)
Right cmd -> verified <$> verifyTransmission ((,C.cbNonce (bs corrId)) <$> thAuth) tAuth authorized entId cmd
Right cmd -> verified =<< verifyTransmission ((,C.cbNonce (bs corrId)) <$> thAuth) tAuth authorized entId cmd
where
verified = \case
VRVerified qr -> Right (qr, (corrId, entId, cmd))
VRFailed -> Left (corrId, entId, ERR AUTH)
VRVerified qr -> pure $ Right (qr, (corrId, entId, cmd))
VRFailed -> do
case cmd of
Cmd _ SEND {} -> atomically $ modifyTVar' (msgSentAuth stats) (+ 1)
Cmd _ SUB -> atomically $ modifyTVar' (qSubAuth stats) (+ 1)
_ -> pure ()
pure $ Left (corrId, entId, ERR AUTH)
write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty

send :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO ()
Expand Down Expand Up @@ -856,15 +877,19 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi

subscribeQueue :: QueueRec -> RecipientId -> M (Transmission BrokerMsg)
subscribeQueue qr rId = do
stats <- asks serverStats
atomically (TM.lookup rId subscriptions) >>= \case
Nothing ->
Nothing -> do
atomically $ modifyTVar' (qSub stats) (+ 1)
newSub >>= deliver
Just sub ->
readTVarIO sub >>= \case
Sub {subThread = ProhibitSub} ->
Sub {subThread = ProhibitSub} -> do
-- cannot use SUB in the same connection where GET was used
atomically $ modifyTVar' (qSubProhibited stats) (+ 1)
pure (corrId, rId, ERR $ CMD PROHIBITED)
s ->
s -> do
atomically $ modifyTVar' (qSubDuplicate stats) (+ 1)
atomically (tryTakeTMVar $ delivered s) >> deliver sub
where
newSub :: M (TVar Sub)
Expand Down Expand Up @@ -958,29 +983,37 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi

sendMessage :: QueueRec -> MsgFlags -> MsgBody -> M (Transmission BrokerMsg)
sendMessage qr msgFlags msgBody
| B.length msgBody > maxMessageLength thVersion = pure $ err LARGE_MSG
| otherwise = case status qr of
QueueOff -> return $ err AUTH
QueueActive ->
case C.maxLenBS msgBody of
Left _ -> pure $ err LARGE_MSG
Right body -> do
msg_ <- time "SEND" $ do
q <- getStoreMsgQueue "SEND" $ recipientId qr
expireMessages q
atomically . writeMsg q =<< mkMessage body
case msg_ of
Nothing -> pure $ err QUOTA
Just msg -> time "SEND ok" $ do
stats <- asks serverStats
when (notification msgFlags) $ do
atomically . trySendNotification msg =<< asks random
atomically $ modifyTVar' (msgSentNtf stats) (+ 1)
atomically $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr)
atomically $ modifyTVar' (msgSent stats) (+ 1)
atomically $ modifyTVar' (msgCount stats) (+ 1)
atomically $ updatePeriodStats (activeQueues stats) (recipientId qr)
pure ok
| B.length msgBody > maxMessageLength thVersion = do
stats <- asks serverStats
atomically $ modifyTVar' (msgSentLarge stats) (+ 1)
pure $ err LARGE_MSG
| otherwise = do
stats <- asks serverStats
case status qr of
QueueOff -> do
atomically $ modifyTVar' (msgSentAuth stats) (+ 1)
pure $ err AUTH
QueueActive ->
case C.maxLenBS msgBody of
Left _ -> pure $ err LARGE_MSG
Right body -> do
msg_ <- time "SEND" $ do
q <- getStoreMsgQueue "SEND" $ recipientId qr
expireMessages q
atomically . writeMsg q =<< mkMessage body
case msg_ of
Nothing -> do
atomically $ modifyTVar' (msgSentQuota stats) (+ 1)
pure $ err QUOTA
Just msg -> time "SEND ok" $ do
when (notification msgFlags) $ do
atomically . trySendNotification msg =<< asks random
atomically $ modifyTVar' (msgSentNtf stats) (+ 1)
atomically $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr)
atomically $ modifyTVar' (msgSent stats) (+ 1)
atomically $ modifyTVar' (msgCount stats) (+ 1)
atomically $ updatePeriodStats (activeQueues stats) (recipientId qr)
pure ok
where
THandleParams {thVersion} = thParams'
mkMessage :: C.MaxLenBS MaxMessageLen -> M Message
Expand Down
66 changes: 58 additions & 8 deletions src/Simplex/Messaging/Server/Stats.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@ data ServerStats = ServerStats
qDeletedAll :: TVar Int,
qDeletedNew :: TVar Int,
qDeletedSecured :: TVar Int,
qSub :: TVar Int,
qSubAuth :: TVar Int,
qSubDuplicate :: TVar Int,
qSubProhibited :: TVar Int,
msgSent :: TVar Int,
msgSentAuth :: TVar Int,
msgSentQuota :: TVar Int,
msgSentLarge :: TVar Int,
msgRecv :: TVar Int,
msgExpired :: TVar Int,
activeQueues :: PeriodStats RecipientId,
Expand All @@ -49,7 +56,14 @@ data ServerStatsData = ServerStatsData
_qDeletedAll :: Int,
_qDeletedNew :: Int,
_qDeletedSecured :: Int,
_qSub :: Int,
_qSubAuth :: Int,
_qSubDuplicate :: Int,
_qSubProhibited :: Int,
_msgSent :: Int,
_msgSentAuth :: Int,
_msgSentQuota :: Int,
_msgSentLarge :: Int,
_msgRecv :: Int,
_msgExpired :: Int,
_activeQueues :: PeriodStatsData RecipientId,
Expand All @@ -74,7 +88,14 @@ newServerStats ts = do
qDeletedAll <- newTVar 0
qDeletedNew <- newTVar 0
qDeletedSecured <- newTVar 0
qSub <- newTVar 0
qSubAuth <- newTVar 0
qSubDuplicate <- newTVar 0
qSubProhibited <- newTVar 0
msgSent <- newTVar 0
msgSentAuth <- newTVar 0
msgSentQuota <- newTVar 0
msgSentLarge <- newTVar 0
msgRecv <- newTVar 0
msgExpired <- newTVar 0
activeQueues <- newPeriodStats
Expand All @@ -88,7 +109,7 @@ newServerStats ts = do
pMsgFwdsRecv <- newTVar 0
qCount <- newTVar 0
msgCount <- newTVar 0
pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv, qCount, msgCount}
pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, qSub, qSubAuth, qSubDuplicate, qSubProhibited, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv, qCount, msgCount}

getServerStatsData :: ServerStats -> STM ServerStatsData
getServerStatsData s = do
Expand All @@ -98,7 +119,14 @@ getServerStatsData s = do
_qDeletedAll <- readTVar $ qDeletedAll s
_qDeletedNew <- readTVar $ qDeletedNew s
_qDeletedSecured <- readTVar $ qDeletedSecured s
_qSub <- readTVar $ qSub s
_qSubAuth <- readTVar $ qSubAuth s
_qSubDuplicate <- readTVar $ qSubDuplicate s
_qSubProhibited <- readTVar $ qSubProhibited s
_msgSent <- readTVar $ msgSent s
_msgSentAuth <- readTVar $ msgSentAuth s
_msgSentQuota <- readTVar $ msgSentQuota s
_msgSentLarge <- readTVar $ msgSentLarge s
_msgRecv <- readTVar $ msgRecv s
_msgExpired <- readTVar $ msgExpired s
_activeQueues <- getPeriodStatsData $ activeQueues s
Expand All @@ -112,7 +140,7 @@ getServerStatsData s = do
_pMsgFwdsRecv <- readTVar $ pMsgFwdsRecv s
_qCount <- readTVar $ qCount s
_msgCount <- readTVar $ msgCount s
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount}
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _qSub, _qSubAuth, _qSubDuplicate, _qSubProhibited, _msgSent, _msgSentAuth, _msgSentQuota, _msgSentLarge, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount}

setServerStats :: ServerStats -> ServerStatsData -> STM ()
setServerStats s d = do
Expand All @@ -122,7 +150,14 @@ setServerStats s d = do
writeTVar (qDeletedAll s) $! _qDeletedAll d
writeTVar (qDeletedNew s) $! _qDeletedNew d
writeTVar (qDeletedSecured s) $! _qDeletedSecured d
writeTVar (qSub s) $! _qSub d
writeTVar (qSubAuth s) $! _qSubAuth d
writeTVar (qSubDuplicate s) $! _qSubDuplicate d
writeTVar (qSubProhibited s) $! _qSubProhibited d
writeTVar (msgSent s) $! _msgSent d
writeTVar (msgSentAuth s) $! _msgSentAuth d
writeTVar (msgSentQuota s) $! _msgSentQuota d
writeTVar (msgSentLarge s) $! _msgSentLarge d
writeTVar (msgRecv s) $! _msgRecv d
writeTVar (msgExpired s) $! _msgExpired d
setPeriodStats (activeQueues s) (_activeQueues d)
Expand All @@ -147,7 +182,14 @@ instance StrEncoding ServerStatsData where
"qDeletedNew=" <> strEncode (_qDeletedNew d),
"qDeletedSecured=" <> strEncode (_qDeletedSecured d),
"qCount=" <> strEncode (_qCount d),
"qSub=" <> strEncode (_qSub d),
"qSubAuth=" <> strEncode (_qSubAuth d),
"qSubDuplicate=" <> strEncode (_qSubDuplicate d),
"qSubProhibited=" <> strEncode (_qSubProhibited d),
"msgSent=" <> strEncode (_msgSent d),
"msgSentAuth=" <> strEncode (_msgSentAuth d),
"msgSentQuota=" <> strEncode (_msgSentQuota d),
"msgSentLarge=" <> strEncode (_msgSentLarge d),
"msgRecv=" <> strEncode (_msgRecv d),
"msgExpired=" <> strEncode (_msgExpired d),
"msgSentNtf=" <> strEncode (_msgSentNtf d),
Expand All @@ -173,12 +215,19 @@ instance StrEncoding ServerStatsData where
(_qDeletedAll, _qDeletedNew, _qDeletedSecured) <-
(,0,0) <$> ("qDeleted=" *> strP <* A.endOfLine)
<|> ((,,) <$> ("qDeletedAll=" *> strP <* A.endOfLine) <*> ("qDeletedNew=" *> strP <* A.endOfLine) <*> ("qDeletedSecured=" *> strP <* A.endOfLine))
_qCount <- "qCount=" *> strP <* A.endOfLine <|> pure 0
_qCount <- opt "qCount="
_qSub <- opt "qSub="
_qSubAuth <- opt "qSubAuth="
_qSubDuplicate <- opt "qSubDuplicate="
_qSubProhibited <- opt "qSubProhibited="
_msgSent <- "msgSent=" *> strP <* A.endOfLine
_msgSentAuth <- opt "msgSentAuth="
_msgSentQuota <- opt "msgSentQuota="
_msgSentLarge <- opt "msgSentLarge="
_msgRecv <- "msgRecv=" *> strP <* A.endOfLine
_msgExpired <- "msgExpired=" *> strP <* A.endOfLine <|> pure 0
_msgSentNtf <- "msgSentNtf=" *> strP <* A.endOfLine <|> pure 0
_msgRecvNtf <- "msgRecvNtf=" *> strP <* A.endOfLine <|> pure 0
_msgExpired <- opt "msgExpired="
_msgSentNtf <- opt "msgSentNtf="
_msgRecvNtf <- opt "msgRecvNtf="
_activeQueues <-
optional ("activeQueues:" <* A.endOfLine) >>= \case
Just _ -> strP <* optional A.endOfLine
Expand All @@ -195,9 +244,10 @@ instance StrEncoding ServerStatsData where
_pRelaysOwn <- proxyStatsP "pRelaysOwn:"
_pMsgFwds <- proxyStatsP "pMsgFwds:"
_pMsgFwdsOwn <- proxyStatsP "pMsgFwdsOwn:"
_pMsgFwdsRecv <- "pMsgFwdsRecv=" *> strP <* A.endOfLine <|> pure 0
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount = 0}
_pMsgFwdsRecv <- opt "pMsgFwdsRecv="
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _qSub, _qSubAuth, _qSubDuplicate, _qSubProhibited, _msgSent, _msgSentAuth, _msgSentQuota, _msgSentLarge, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount = 0}
where
opt s = A.string s *> strP <* A.endOfLine <|> pure 0
proxyStatsP key =
optional (A.string key >> A.endOfLine) >>= \case
Just _ -> strP <* optional A.endOfLine
Expand Down
6 changes: 3 additions & 3 deletions tests/ServerTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ testRestoreMessages at@(ATransport t) =

logSize testStoreLogFile `shouldReturn` 2
logSize testStoreMsgsFile `shouldReturn` 5
logSize testServerStatsBackupFile `shouldReturn` 45
logSize testServerStatsBackupFile `shouldReturn` 52
Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats1 [rId] 5 1

Expand All @@ -626,7 +626,7 @@ testRestoreMessages at@(ATransport t) =
logSize testStoreLogFile `shouldReturn` 1
-- the last message is not removed because it was not ACK'd
logSize testStoreMsgsFile `shouldReturn` 3
logSize testServerStatsBackupFile `shouldReturn` 45
logSize testServerStatsBackupFile `shouldReturn` 52
Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats2 [rId] 5 3

Expand All @@ -645,7 +645,7 @@ testRestoreMessages at@(ATransport t) =

logSize testStoreLogFile `shouldReturn` 1
logSize testStoreMsgsFile `shouldReturn` 0
logSize testServerStatsBackupFile `shouldReturn` 45
logSize testServerStatsBackupFile `shouldReturn` 52
Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats3 [rId] 5 5

Expand Down

0 comments on commit e55ec07

Please sign in to comment.