From dbc6ae2a478441992fb247cb12deceeb601d65ed Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Fri, 10 May 2024 18:09:49 +0300 Subject: [PATCH 01/13] WIP: command rate monitoring --- package.yaml | 2 + simplexmq.cabal | 14 ++++ src/Simplex/Messaging/Server.hs | 46 ++++++++++- src/Simplex/Messaging/Server/Env/STM.hs | 23 ++++-- src/Simplex/Messaging/Server/Main.hs | 3 + src/Simplex/Messaging/Server/Stats.hs | 81 ++++++++++++++++++- src/Simplex/Messaging/Transport.hs | 36 +++++++-- src/Simplex/Messaging/Transport/Server.hs | 5 +- src/Simplex/Messaging/Transport/WebSockets.hs | 15 ++-- tests/SMPClient.hs | 3 + 10 files changed, 207 insertions(+), 21 deletions(-) diff --git a/package.yaml b/package.yaml index 02a088e23..2caefad0a 100644 --- a/package.yaml +++ b/package.yaml @@ -45,6 +45,7 @@ dependencies: - direct-sqlcipher == 2.3.* - directory == 1.3.* - filepath == 1.4.* + - hashable == 1.4.* - hourglass == 0.2.* - http-types == 0.12.* - http2 >= 4.2.2 && < 4.3 @@ -59,6 +60,7 @@ dependencies: - network-udp >= 0.0 && < 0.1 - optparse-applicative >= 0.15 && < 0.17 - process == 1.6.* + - psqueues == 0.2.8.* - random >= 1.1 && < 1.3 - simple-logger == 0.1.* - socks == 0.6.* diff --git a/simplexmq.cabal b/simplexmq.cabal index 3366cb0b8..f8dd2d8c6 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -204,6 +204,7 @@ library , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -218,6 +219,7 @@ library , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , socks ==0.6.* @@ -278,6 +280,7 @@ executable ntf-server , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -292,6 +295,7 @@ executable ntf-server , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -353,6 +357,7 @@ executable smp-agent , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -367,6 +372,7 @@ executable smp-agent , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -428,6 +434,7 @@ executable smp-server , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -442,6 +449,7 @@ executable smp-server , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -503,6 +511,7 @@ executable xftp , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -517,6 +526,7 @@ executable xftp , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -578,6 +588,7 @@ executable xftp-server , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -592,6 +603,7 @@ executable xftp-server , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -689,6 +701,7 @@ test-suite simplexmq-test , directory ==1.3.* , filepath ==1.4.* , generic-random ==1.5.* + , hashable ==1.4.* , hourglass ==0.2.* , hspec ==2.11.* , hspec-core ==2.11.* @@ -706,6 +719,7 @@ test-suite simplexmq-test , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , silently ==1.2.* , simple-logger ==0.1.* diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index a09759814..d62e79987 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -128,7 +128,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do raceAny_ ( serverThread s "server subscribedQ" subscribedQ subscribers subscriptions cancelSub : serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubscriptions (\_ -> pure ()) - : map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg + : map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> rateStatsThread_ cfg <> controlPortThread_ cfg ) `finally` withLock' (savingLock s) "final" (saveServer False) where @@ -205,6 +205,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do [logServerStats logStatsStartTime interval serverStatsLogFile] serverStatsThread_ _ = [] + rateStatsThread_ :: ServerConfig -> [M ()] + rateStatsThread_ ServerConfig {rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} = + [ monitorServerRates bucketWidth, -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection + logServerRates logStatsStartTime logInterval rateStatsLogFile -- log distributions once in a while + ] + rateStatsThread_ _ = [] + logServerStats :: Int64 -> Int64 -> FilePath -> M () logServerStats startAt logInterval statsFilePath = do labelMyThread "logServerStats" @@ -257,6 +264,25 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do ] liftIO $ threadDelay' interval + monitorServerRates :: Int64 -> M () + monitorServerRates bucketWidth = do + labelMyThread "monitorServerRates" + forever $ do + -- TODO: calculate delay for the next bucket closing time + liftIO $ threadDelay' bucketWidth + -- TODO: collect and reset buckets + + logServerRates :: Int64 -> Int64 -> FilePath -> M () + logServerRates startAt logInterval statsFilePath = do + labelMyThread "logServerStats" + initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime + liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath + liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) + let interval = 1000000 * logInterval + forever $ do + -- write the thing + liftIO $ threadDelay' interval + runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M () runClient signKey tp h = do kh <- asks serverIdentity @@ -411,13 +437,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do hPutStrLn h "AUTH" runClientTransport :: Transport c => THandleSMP c 'TServer -> M () -runClientTransport h@THandle {params = THandleParams {thVersion, sessionId}} = do +runClientTransport h@THandle {connection, params = THandleParams {thVersion, sessionId}} = do q <- asks $ tbqSize . config ts <- liftIO getSystemTime active <- asks clients nextClientId <- asks clientSeq c <- atomically $ do - new@Client {clientId} <- newClient nextClientId q thVersion sessionId ts + new@Client {clientId} <- newClient (getPeerId connection) nextClientId q thVersion sessionId ts modifyTVar' active $ IM.insert clientId new pure new s <- asks server @@ -643,6 +669,10 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv where createQueue :: QueueStore -> RcvPublicAuthKey -> RcvPublicDhKey -> SubscriptionMode -> M (Transmission BrokerMsg) createQueue st recipientKey dhKey subMode = time "NEW" $ do + -- TODO: read client Q rate + -- TODO: read server Q rate for peerId + -- TODO: read global server Q rate + -- TODO: add throttling delay/blackhole request if needed (rcvPublicDhKey, privDhKey) <- atomically . C.generateKeyPair =<< asks random let rcvDhSecret = C.dh' dhKey privDhKey qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey} @@ -673,6 +703,9 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv stats <- asks serverStats atomically $ modifyTVar' (qCreated stats) (+ 1) atomically $ modifyTVar' (qCount stats) (+ 1) + -- TODO: increment client Q counter + -- TODO: increment current Q counter in IP timeline + -- TODO: increment current Q counter in server timeline case subMode of SMOnlyCreate -> pure () SMSubscribe -> void $ subscribeQueue qr rId @@ -835,6 +868,10 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv case C.maxLenBS msgBody of Left _ -> pure $ err LARGE_MSG Right body -> do + -- TODO: read client S rate + -- TODO: read server S rate for peerId + -- TODO: read global server S rate + -- TODO: add throttling delay/blackhole request if needed msg_ <- time "SEND" $ do q <- getStoreMsgQueue "SEND" $ recipientId qr expireMessages q @@ -850,6 +887,9 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv atomically $ modifyTVar' (msgSent stats) (+ 1) atomically $ modifyTVar' (msgCount stats) (+ 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) + -- TODO: increment client S counter + -- TODO: increment current S counter in IP timeline + -- TODO: increment current S counter in server timeline pure ok where mkMessage :: C.MaxLenBS MaxMessageLen -> M Message diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index bd8262f07..74017346c 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -16,6 +16,7 @@ import Data.List.NonEmpty (NonEmpty) import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Time.Clock (getCurrentTime) +import Data.Time.Clock.POSIX (getPOSIXTime) import Data.Time.Clock.System (SystemTime) import Data.X509.Validation (Fingerprint (..)) import Network.Socket (ServiceName) @@ -33,7 +34,7 @@ import Simplex.Messaging.Server.Stats import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Transport (ATransport, VersionRangeSMP, VersionSMP) +import Simplex.Messaging.Transport (ATransport, PeerId, VersionRangeSMP, VersionSMP) import Simplex.Messaging.Transport.Server (SocketState, TransportServerConfig, alpn, loadFingerprint, loadTLSServerParams, newSocketState) import System.IO (IOMode (..)) import System.Mem.Weak (Weak) @@ -70,6 +71,10 @@ data ServerConfig = ServerConfig serverStatsLogFile :: FilePath, -- | file to save and restore stats serverStatsBackupFile :: Maybe FilePath, + -- | rate limit monitoring interval / bucket width, seconds + rateStatsInterval :: Maybe Int64, + rateStatsLogFile :: FilePath, + rateStatsBackupFile :: Maybe FilePath, -- | CA certificate private key is not needed for initialization caCertificateFile :: FilePath, privateKeyFile :: FilePath, @@ -109,6 +114,8 @@ data Env = Env storeLog :: Maybe (StoreLog 'WriteMode), tlsServerParams :: T.ServerParams, serverStats :: ServerStats, + qCreatedByIp :: Timeline, + msgSentByIp :: Timeline, sockets :: SocketState, clientSeq :: TVar Int, clients :: TVar (IntMap Client) @@ -124,6 +131,8 @@ data Server = Server data Client = Client { clientId :: Int, + peerId :: PeerId, -- send updates for this Id to time series + clientStats :: ClientStats, -- capture final values on disconnect subscriptions :: TMap RecipientId (TVar Sub), ntfSubscriptions :: TMap NotifierId (), rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)), @@ -155,8 +164,8 @@ newServer = do savingLock <- createLock return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, savingLock} -newClient :: TVar Int -> Natural -> VersionSMP -> ByteString -> SystemTime -> STM Client -newClient nextClientId qSize thVersion sessionId createdAt = do +newClient :: PeerId -> TVar Int -> Natural -> VersionSMP -> ByteString -> SystemTime -> STM Client +newClient peerId nextClientId qSize thVersion sessionId createdAt = do clientId <- stateTVar nextClientId $ \next -> (next, next + 1) subscriptions <- TM.empty ntfSubscriptions <- TM.empty @@ -168,7 +177,8 @@ newClient nextClientId qSize thVersion sessionId createdAt = do connected <- newTVar True rcvActiveAt <- newTVar createdAt sndActiveAt <- newTVar createdAt - return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt} + clientStats <- ClientStats <$> newTVar 0 <*> newTVar 0 + return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, peerId, clientStats} newSubscription :: SubscriptionThread -> STM Sub newSubscription subThread = do @@ -189,7 +199,10 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile, sockets <- atomically newSocketState clientSeq <- newTVarIO 0 clients <- newTVarIO mempty - return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients} + now <- getPOSIXTime + qCreatedByIp <- atomically $ newTimeline perMinute now + msgSentByIp <- atomically $ newTimeline perMinute now + return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, qCreatedByIp, msgSentByIp, sockets, clientSeq, clients} where restoreQueues :: QueueStore -> FilePath -> IO (StoreLog 'WriteMode) restoreQueues QueueStore {queues, senders, notifiers} f = do diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index d75d02812..6b6d555f3 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -208,6 +208,9 @@ smpServerCLI cfgPath logPath = logStatsStartTime = 0, -- seconds from 00:00 UTC serverStatsLogFile = combine logPath "smp-server-stats.daily.log", serverStatsBackupFile = logStats $> combine logPath "smp-server-stats.log", + rateStatsInterval = Just 60, -- TODO: add to options + rateStatsLogFile = combine logPath "smp-server-rates.daily.log", + rateStatsBackupFile = Just $ combine logPath "smp-server-rates.log", smpServerVRange = supportedServerSMPRelayVRange, transportConfig = defaultTransportServerConfig diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 0b4c677c2..7a80715b3 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -10,11 +10,18 @@ module Simplex.Messaging.Server.Stats where import Control.Applicative (optional, (<|>)) import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B +import Data.IntMap (IntMap) +import qualified Data.IntMap.Strict as IM +import Data.IntPSQ (IntPSQ) +import qualified Data.IntPSQ as IP +import Data.Monoid (getSum) import Data.Set (Set) import qualified Data.Set as S import Data.Time.Calendar.Month (pattern MonthDay) import Data.Time.Calendar.OrdinalDate (mondayStartWeek) -import Data.Time.Clock (UTCTime (..)) +import Data.Time.Clock (NominalDiffTime, UTCTime (..)) +import Data.Time.Clock.POSIX (POSIXTime) +import Data.Word (Word32) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (RecipientId) import UnliftIO.STM @@ -231,3 +238,75 @@ updatePeriodStats stats pId = do updatePeriod month where updatePeriod pSel = modifyTVar' (pSel stats) (S.insert pId) + +data ClientStats = ClientStats + { qCreatedClient :: TVar Int, + msgSentClient :: TVar Int + } + +-- may be combined with session duration to produce average rates (q/s, msg/s) +data ClientStatsData = ClientStatsData + { _qCreatedClient :: Int, + _msgSentClient :: Int + } + +type Timeline = (TVar SparseSeries, Current) + +newTimeline :: QuantFun -> POSIXTime -> STM Timeline +newTimeline quantF now = (,current) <$> newTVar IP.empty + where + current :: Current + current = (quantF, quantF now, mempty) + +-- Sparse timeseries with 1 second resolution (or more coarse): +-- priotity - time/bucket +-- key -- PeerId +-- value -- final counter value of the bucket that was current +-- May be combined with bucket width to produce rolling rates. +type SparseSeries = IntPSQ BucketId Int + +-- POSIXTime, or quantized +type BucketId = Word32 + +type QuantFun = POSIXTime -> BucketId + +-- Current bucket that gets filled +type Current = (QuantFun, BucketId, IntMap (TVar Int)) + +perSecond :: POSIXTime -> BucketId +perSecond = truncate + +perMinute :: POSIXTime -> BucketId +perMinute = (60 `secondsWidth`) + +secondsWidth :: NominalDiffTime -> POSIXTime -> BucketId +secondsWidth w t = truncate $ t / w + +finishCurrent :: POSIXTime -> Timeline -> STM Timeline +finishCurrent now (series, current) = error "TODO: read/reset current, push into series, evict minimal when it falls out of scope" + +type WindowData = IntMap Int -- PeerId -> counter + +window :: BucketId -> BucketId -> SparseSeries -> WindowData +window = error "TODO: pick elements inside the range and drop bucket ids" + +-- counter -> occurences +type Histogram = IntMap Int + +histogram :: WindowData -> Histogram +histogram = fmap getSum . IM.fromListWith (<>) . map (,1) . IM.elems + +distribution :: Histogram -> Distribution Int +distribution = error "TODO: unroll histogram, sample elements at percentiles" + +data Distribution a = Distribution + { minimal :: a, + bottom50p :: a, + top50p :: a, + top20p :: a, + top10p :: a, + top5p :: a, + top1p :: a, + maximal :: a + } + deriving (Show) diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index e1d383b5a..90c19ab17 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -54,6 +54,9 @@ module Simplex.Messaging.Transport ATransport (..), TransportPeer (..), getServerVerifyKey, + PeerId, + clientPeerId, + addrPeerId, -- * TLS Transport TLS (..), @@ -95,12 +98,14 @@ import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Default (def) import Data.Functor (($>)) +import Data.Hashable (hash) import Data.Version (showVersion) import Data.Word (Word16) import qualified Data.X509 as X import qualified Data.X509.Validation as XV import GHC.IO.Handle.Internals (ioe_EOF) import Network.Socket +import qualified Network.Socket.Address as SA import qualified Network.TLS as T import qualified Network.TLS.Extra as TE import qualified Paths_simplexmq as SMQ @@ -196,13 +201,15 @@ class Transport c where transportConfig :: c -> TransportConfig -- | Upgrade server TLS context to connection (used in the server) - getServerConnection :: TransportConfig -> X.CertificateChain -> T.Context -> IO c + getServerConnection :: PeerId -> TransportConfig -> X.CertificateChain -> T.Context -> IO c -- | Upgrade client TLS context to connection (used in the client) getClientConnection :: TransportConfig -> X.CertificateChain -> T.Context -> IO c getServerCerts :: c -> X.CertificateChain + getPeerId :: c -> PeerId + -- | tls-unique channel binding per RFC5929 tlsUnique :: c -> SessionId @@ -243,6 +250,7 @@ getServerVerifyKey c = data TLS = TLS { tlsContext :: T.Context, tlsPeer :: TransportPeer, + tlsPeerId :: PeerId, tlsUniq :: ByteString, tlsBuffer :: TBuffer, tlsALPN :: Maybe ALPN, @@ -261,13 +269,13 @@ connectTLS host_ TransportConfig {logTLSErrors} params sock = logThrow e = putStrLn ("TLS error" <> host <> ": " <> show e) >> E.throwIO e host = maybe "" (\h -> " (" <> h <> ")") host_ -getTLS :: TransportPeer -> TransportConfig -> X.CertificateChain -> T.Context -> IO TLS -getTLS tlsPeer cfg tlsServerCerts cxt = withTlsUnique tlsPeer cxt newTLS +getTLS :: TransportPeer -> PeerId -> TransportConfig -> X.CertificateChain -> T.Context -> IO TLS +getTLS tlsPeer tlsPeerId cfg tlsServerCerts cxt = withTlsUnique tlsPeer cxt newTLS where newTLS tlsUniq = do tlsBuffer <- atomically newTBuffer tlsALPN <- T.getNegotiatedProtocol cxt - pure TLS {tlsContext = cxt, tlsALPN, tlsTransportConfig = cfg, tlsServerCerts, tlsPeer, tlsUniq, tlsBuffer} + pure TLS {tlsContext = cxt, tlsPeerId, tlsALPN, tlsTransportConfig = cfg, tlsServerCerts, tlsPeer, tlsUniq, tlsBuffer} withTlsUnique :: TransportPeer -> T.Context -> (ByteString -> IO c) -> IO c withTlsUnique peer cxt f = @@ -301,7 +309,8 @@ instance Transport TLS where transportPeer = tlsPeer transportConfig = tlsTransportConfig getServerConnection = getTLS TServer - getClientConnection = getTLS TClient + getClientConnection = getTLS TClient 0 + getPeerId = tlsPeerId getServerCerts = tlsServerCerts getSessionALPN = tlsALPN tlsUnique = tlsUniq @@ -545,6 +554,23 @@ smpTHandle c = THandle {connection = c, params} where params = THandleParams {sessionId = tlsUnique c, blockSize = smpBlockSize, thVersion = VersionSMP 0, thAuth = Nothing, implySessId = False, batch = True} +-- | Stats key, hashed from IPs, circuits etc. We don't want to keep actual identities, just attach counters to them. +type PeerId = Int -- XXX: perhaps more fields needed if we want subnet escalation + +clientPeerId :: Socket -> IO PeerId +clientPeerId = fmap addrPeerId . SA.getPeerName + +addrPeerId :: SockAddr -> PeerId +addrPeerId peer = hash peer6 -- XXX: for extra paranoia can be salted with a seed randomized on server start + where + -- ingore ports and fluff, normalize to ipv6 address space + -- most of the ipv6 is unused as clients get /64 subnets for a few IPs, so 128bit to 64bit hashing is ok for using as intmap keys + peer6 = case peer of + SockAddrInet _port a -> embed4in6 a + SockAddrInet6 _port _flow a _scope -> a + SockAddrUnix _path -> error "use for TOR circuits?" + embed4in6 v4 = (0, 0, 0xFFFF, v4) -- RFC4038: the IPv6 address ::FFFF:x.y.z.w represents the IPv4 address x.y.z.w. + $(J.deriveJSON (sumTypeJSON id) ''HandshakeError) $(J.deriveJSON (sumTypeJSON $ dropPrefix "TE") ''TransportError) diff --git a/src/Simplex/Messaging/Transport/Server.hs b/src/Simplex/Messaging/Transport/Server.hs index 145b438e0..744664c80 100644 --- a/src/Simplex/Messaging/Transport/Server.hs +++ b/src/Simplex/Messaging/Transport/Server.hs @@ -19,7 +19,7 @@ module Simplex.Messaging.Transport.Server loadTLSServerParams, loadFingerprint, smpServerHandshake, - tlsServerCredentials + tlsServerCredentials, ) where @@ -95,8 +95,9 @@ runTransportServerSocketState ss started getSocket threadLabel serverParams cfg tCfg = serverTransportConfig cfg setup conn = timeout (tlsSetupTimeout cfg) $ do labelMyThread $ threadLabel <> "/setup" + peerId <- clientPeerId conn tls <- connectTLS Nothing tCfg serverParams conn - getServerConnection tCfg (fst $ tlsServerCredentials serverParams) tls + getServerConnection peerId tCfg (fst $ tlsServerCredentials serverParams) tls tlsServerCredentials :: T.ServerParams -> (X.CertificateChain, X.PrivKey) tlsServerCredentials serverParams = case T.sharedCredentials $ T.serverShared serverParams of diff --git a/src/Simplex/Messaging/Transport/WebSockets.hs b/src/Simplex/Messaging/Transport/WebSockets.hs index 0883fcc28..817cc3796 100644 --- a/src/Simplex/Messaging/Transport/WebSockets.hs +++ b/src/Simplex/Messaging/Transport/WebSockets.hs @@ -20,6 +20,7 @@ import Simplex.Messaging.Transport TransportConfig (..), TransportError (..), TransportPeer (..), + PeerId, closeTLS, smpBlockSize, withTlsUnique, @@ -28,6 +29,7 @@ import Simplex.Messaging.Transport.Buffer (trimCR) data WS = WS { wsPeer :: TransportPeer, + wsPeerId :: PeerId, tlsUniq :: ByteString, wsALPN :: Maybe ALPN, wsStream :: Stream, @@ -54,11 +56,14 @@ instance Transport WS where transportConfig :: WS -> TransportConfig transportConfig = wsTransportConfig - getServerConnection :: TransportConfig -> X.CertificateChain -> T.Context -> IO WS + getServerConnection :: PeerId -> TransportConfig -> X.CertificateChain -> T.Context -> IO WS getServerConnection = getWS TServer getClientConnection :: TransportConfig -> X.CertificateChain -> T.Context -> IO WS - getClientConnection = getWS TClient + getClientConnection = getWS TClient 0 + + getPeerId :: WS -> PeerId + getPeerId = wsPeerId getServerCerts :: WS -> X.CertificateChain getServerCerts = wsServerCerts @@ -89,14 +94,14 @@ instance Transport WS where then E.throwIO TEBadBlock else pure $ B.init s -getWS :: TransportPeer -> TransportConfig -> X.CertificateChain -> T.Context -> IO WS -getWS wsPeer cfg wsServerCerts cxt = withTlsUnique wsPeer cxt connectWS +getWS :: TransportPeer -> PeerId -> TransportConfig -> X.CertificateChain -> T.Context -> IO WS +getWS wsPeer wsPeerId cfg wsServerCerts cxt = withTlsUnique wsPeer cxt connectWS where connectWS tlsUniq = do s <- makeTLSContextStream cxt wsConnection <- connectPeer wsPeer s wsALPN <- T.getNegotiatedProtocol cxt - pure $ WS {wsPeer, tlsUniq, wsALPN, wsStream = s, wsConnection, wsTransportConfig = cfg, wsServerCerts} + pure $ WS {wsPeer, wsPeerId, tlsUniq, wsALPN, wsStream = s, wsConnection, wsTransportConfig = cfg, wsServerCerts} connectPeer :: TransportPeer -> Stream -> IO Connection connectPeer TServer = acceptClientRequest connectPeer TClient = sendClientRequest diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index ae9baeb3c..7cb00de83 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -107,6 +107,9 @@ cfg = logStatsStartTime = 0, serverStatsLogFile = "tests/smp-server-stats.daily.log", serverStatsBackupFile = Nothing, + rateStatsInterval = Nothing, + rateStatsLogFile = "", + rateStatsBackupFile = Nothing, caCertificateFile = "tests/fixtures/ca.crt", privateKeyFile = "tests/fixtures/server.key", certificateFile = "tests/fixtures/server.crt", From 2fcc8d0107b7ad8858612000ca97b31f500c7a72 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Mon, 13 May 2024 16:35:19 +0300 Subject: [PATCH 02/13] expand ClientStats --- simplexmq.cabal | 2 + src/Simplex/Messaging/Server/Env/STM.hs | 6 +- src/Simplex/Messaging/Server/Stats.hs | 72 ------------ src/Simplex/Messaging/Server/Stats/Client.hs | 110 ++++++++++++++++++ .../Messaging/Server/Stats/Timeline.hs | 88 ++++++++++++++ 5 files changed, 204 insertions(+), 74 deletions(-) create mode 100644 src/Simplex/Messaging/Server/Stats/Client.hs create mode 100644 src/Simplex/Messaging/Server/Stats/Timeline.hs diff --git a/simplexmq.cabal b/simplexmq.cabal index f8dd2d8c6..27c59e5dc 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -145,6 +145,8 @@ library Simplex.Messaging.Server.QueueStore Simplex.Messaging.Server.QueueStore.STM Simplex.Messaging.Server.Stats + Simplex.Messaging.Server.Stats.Client + Simplex.Messaging.Server.Stats.Timeline Simplex.Messaging.Server.StoreLog Simplex.Messaging.ServiceScheme Simplex.Messaging.Session diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 74017346c..6ab1b1baa 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -17,7 +17,7 @@ import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Time.Clock (getCurrentTime) import Data.Time.Clock.POSIX (getPOSIXTime) -import Data.Time.Clock.System (SystemTime) +import Data.Time.Clock.System (SystemTime, systemToUTCTime) import Data.X509.Validation (Fingerprint (..)) import Network.Socket (ServiceName) import qualified Network.TLS as T @@ -31,6 +31,8 @@ import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..)) import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.Stats +import Simplex.Messaging.Server.Stats.Client (ClientStats, newClientStats) +import Simplex.Messaging.Server.Stats.Timeline (Timeline, newTimeline, perMinute) import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -177,7 +179,7 @@ newClient peerId nextClientId qSize thVersion sessionId createdAt = do connected <- newTVar True rcvActiveAt <- newTVar createdAt sndActiveAt <- newTVar createdAt - clientStats <- ClientStats <$> newTVar 0 <*> newTVar 0 + clientStats <- newClientStats newTVar (systemToUTCTime createdAt) return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, peerId, clientStats} newSubscription :: SubscriptionThread -> STM Sub diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 7a80715b3..53f977f8f 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -238,75 +238,3 @@ updatePeriodStats stats pId = do updatePeriod month where updatePeriod pSel = modifyTVar' (pSel stats) (S.insert pId) - -data ClientStats = ClientStats - { qCreatedClient :: TVar Int, - msgSentClient :: TVar Int - } - --- may be combined with session duration to produce average rates (q/s, msg/s) -data ClientStatsData = ClientStatsData - { _qCreatedClient :: Int, - _msgSentClient :: Int - } - -type Timeline = (TVar SparseSeries, Current) - -newTimeline :: QuantFun -> POSIXTime -> STM Timeline -newTimeline quantF now = (,current) <$> newTVar IP.empty - where - current :: Current - current = (quantF, quantF now, mempty) - --- Sparse timeseries with 1 second resolution (or more coarse): --- priotity - time/bucket --- key -- PeerId --- value -- final counter value of the bucket that was current --- May be combined with bucket width to produce rolling rates. -type SparseSeries = IntPSQ BucketId Int - --- POSIXTime, or quantized -type BucketId = Word32 - -type QuantFun = POSIXTime -> BucketId - --- Current bucket that gets filled -type Current = (QuantFun, BucketId, IntMap (TVar Int)) - -perSecond :: POSIXTime -> BucketId -perSecond = truncate - -perMinute :: POSIXTime -> BucketId -perMinute = (60 `secondsWidth`) - -secondsWidth :: NominalDiffTime -> POSIXTime -> BucketId -secondsWidth w t = truncate $ t / w - -finishCurrent :: POSIXTime -> Timeline -> STM Timeline -finishCurrent now (series, current) = error "TODO: read/reset current, push into series, evict minimal when it falls out of scope" - -type WindowData = IntMap Int -- PeerId -> counter - -window :: BucketId -> BucketId -> SparseSeries -> WindowData -window = error "TODO: pick elements inside the range and drop bucket ids" - --- counter -> occurences -type Histogram = IntMap Int - -histogram :: WindowData -> Histogram -histogram = fmap getSum . IM.fromListWith (<>) . map (,1) . IM.elems - -distribution :: Histogram -> Distribution Int -distribution = error "TODO: unroll histogram, sample elements at percentiles" - -data Distribution a = Distribution - { minimal :: a, - bottom50p :: a, - top50p :: a, - top20p :: a, - top10p :: a, - top5p :: a, - top1p :: a, - maximal :: a - } - deriving (Show) diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs new file mode 100644 index 000000000..22a29a5d2 --- /dev/null +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -0,0 +1,110 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} + +module Simplex.Messaging.Server.Stats.Client where + +import Control.Applicative (optional, (<|>)) +import qualified Data.Attoparsec.ByteString.Char8 as A +import qualified Data.ByteString.Char8 as B +import Data.IntMap (IntMap) +import qualified Data.IntMap.Strict as IM +import Data.IntPSQ (IntPSQ) +import qualified Data.IntPSQ as IP +import Data.Monoid (getSum) +import Data.Set (Set) +import qualified Data.Set as S +import Data.Time.Calendar.Month (pattern MonthDay) +import Data.Time.Calendar.OrdinalDate (mondayStartWeek) +import Data.Time.Clock (NominalDiffTime, UTCTime (..)) +import Data.Time.Clock.POSIX (POSIXTime) +import Data.Word (Word32) +import Simplex.Messaging.Encoding.String +import Simplex.Messaging.Protocol (RecipientId) +import Simplex.Messaging.Transport (PeerId) +import UnliftIO.STM + +data ClientStats = ClientStats + { peerAddresses :: TVar (Set PeerId), + socketCount :: TVar Int, + createdAt :: TVar UTCTime, + updatedAt :: TVar UTCTime, + qCreated :: TVar (Set RecipientId), + qSentSigned :: TVar (Set RecipientId), + msgSentSigned :: TVar Int, + msgSentUnsigned :: TVar Int, + msgSentViaProxy :: TVar Int, + msgDeliveredSigned :: TVar Int + } + +-- may be combined with session duration to produce average rates (q/s, msg/s) +data ClientStatsData = ClientStatsData + { _peerAddresses :: Set PeerId, + _socketCount :: Int, + _createdAt :: UTCTime, + _updatedAt :: UTCTime, + _qCreated :: Set RecipientId, + _qSentSigned :: Set RecipientId, + _msgSentSigned :: Int, + _msgSentUnsigned :: Int, + _msgSentViaProxy :: Int, + _msgDeliveredSigned :: Int + } + +newClientStats :: Monad m => (forall a. a -> m (TVar a)) -> UTCTime -> m ClientStats +newClientStats newF ts = do + peerAddresses <- newF mempty + socketCount <- newF 0 + createdAt <- newF ts + updatedAt <- newF ts + qCreated <- newF mempty + qSentSigned <- newF mempty + msgSentSigned <- newF 0 + msgSentUnsigned <- newF 0 + msgSentViaProxy <- newF 0 + msgDeliveredSigned <- newF 0 + pure + ClientStats + { peerAddresses, + socketCount, + createdAt, + updatedAt, + qCreated, + qSentSigned, + msgSentSigned, + msgSentUnsigned, + msgSentViaProxy, + msgDeliveredSigned + } +{-# INLINE newClientStats #-} + +readClientStatsData :: Monad m => (forall a. TVar a -> m a) -> ClientStats -> m ClientStatsData +readClientStatsData readF cs = do + _peerAddresses <- readF $ peerAddresses cs + _socketCount <- readF $ socketCount cs + _createdAt <- readF $ createdAt cs + _updatedAt <- readF $ updatedAt cs + _qCreated <- readF $ qCreated cs + _qSentSigned <- readF $ qSentSigned cs + _msgSentSigned <- readF $ msgSentSigned cs + _msgSentUnsigned <- readF $ msgSentUnsigned cs + _msgSentViaProxy <- readF $ msgSentViaProxy cs + _msgDeliveredSigned <- readF $ msgDeliveredSigned cs + pure + ClientStatsData + { _peerAddresses, + _socketCount, + _createdAt, + _updatedAt, + _qCreated, + _qSentSigned, + _msgSentSigned, + _msgSentUnsigned, + _msgSentViaProxy, + _msgDeliveredSigned + } +{-# INLINE readClientStatsData #-} diff --git a/src/Simplex/Messaging/Server/Stats/Timeline.hs b/src/Simplex/Messaging/Server/Stats/Timeline.hs new file mode 100644 index 000000000..112823f4e --- /dev/null +++ b/src/Simplex/Messaging/Server/Stats/Timeline.hs @@ -0,0 +1,88 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} + +module Simplex.Messaging.Server.Stats.Timeline where + +import Control.Applicative (optional, (<|>)) +import qualified Data.Attoparsec.ByteString.Char8 as A +import qualified Data.ByteString.Char8 as B +import Data.IntMap (IntMap) +import qualified Data.IntMap.Strict as IM +import Data.IntPSQ (IntPSQ) +import qualified Data.IntPSQ as IP +import Data.Monoid (getSum) +import Data.Set (Set) +import qualified Data.Set as S +import Data.Time.Calendar.Month (pattern MonthDay) +import Data.Time.Calendar.OrdinalDate (mondayStartWeek) +import Data.Time.Clock (NominalDiffTime, UTCTime (..)) +import Data.Time.Clock.POSIX (POSIXTime) +import Data.Word (Word32) +import Simplex.Messaging.Encoding.String +import Simplex.Messaging.Protocol (RecipientId) +import UnliftIO.STM + +type Timeline = (TVar SparseSeries, Current) + +newTimeline :: QuantFun -> POSIXTime -> STM Timeline +newTimeline quantF now = (,current) <$> newTVar IP.empty + where + current :: Current + current = (quantF, quantF now, mempty) + +-- Sparse timeseries with 1 second resolution (or more coarse): +-- priotity - time/bucket +-- key -- PeerId +-- value -- final counter value of the bucket that was current +-- May be combined with bucket width to produce rolling rates. +type SparseSeries = IntPSQ BucketId Int + +-- POSIXTime, or quantized +type BucketId = Word32 + +type QuantFun = POSIXTime -> BucketId + +-- Current bucket that gets filled +type Current = (QuantFun, BucketId, IntMap (TVar Int)) + +perSecond :: POSIXTime -> BucketId +perSecond = truncate + +perMinute :: POSIXTime -> BucketId +perMinute = (60 `secondsWidth`) + +secondsWidth :: NominalDiffTime -> POSIXTime -> BucketId +secondsWidth w t = truncate $ t / w + +finishCurrent :: POSIXTime -> Timeline -> STM Timeline +finishCurrent now (series, current) = error "TODO: read/reset current, push into series, evict minimal when it falls out of scope" + +type WindowData = IntMap Int -- PeerId -> counter + +window :: BucketId -> BucketId -> SparseSeries -> WindowData +window = error "TODO: pick elements inside the range and drop bucket ids" + +-- counter -> occurences +type Histogram = IntMap Int + +histogram :: WindowData -> Histogram +histogram = fmap getSum . IM.fromListWith (<>) . map (,1) . IM.elems + +distribution :: Histogram -> Distribution Int +distribution = error "TODO: unroll histogram, sample elements at percentiles" + +data Distribution a = Distribution + { minimal :: a, + bottom50p :: a, + top50p :: a, + top20p :: a, + top10p :: a, + top5p :: a, + top1p :: a, + maximal :: a + } + deriving (Show) From 3d9e5a501ecdc12358a551bbf4f714d7798dc93e Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Mon, 13 May 2024 22:34:33 +0300 Subject: [PATCH 03/13] draft SEND stats update --- src/Simplex/Messaging/Server.hs | 49 +++++++++++++++++-- src/Simplex/Messaging/Server/Env/STM.hs | 23 ++++++--- src/Simplex/Messaging/Server/Stats/Client.hs | 19 ++++--- .../Messaging/Server/Stats/Timeline.hs | 11 +++-- 4 files changed, 79 insertions(+), 23 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index d62e79987..bda4a4572 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -80,6 +80,7 @@ import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.STM as QS import Simplex.Messaging.Server.Stats +import qualified Simplex.Messaging.Server.Stats.Client as CS import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -440,10 +441,14 @@ runClientTransport :: Transport c => THandleSMP c 'TServer -> M () runClientTransport h@THandle {connection, params = THandleParams {thVersion, sessionId}} = do q <- asks $ tbqSize . config ts <- liftIO getSystemTime + statsIds <- asks statsClients active <- asks clients nextClientId <- asks clientSeq + let peerId = getPeerId connection + skipStats = False -- TODO: check peerId c <- atomically $ do - new@Client {clientId} <- newClient (getPeerId connection) nextClientId q thVersion sessionId ts + new@Client {clientId} <- newClient peerId nextClientId q thVersion sessionId ts + unless skipStats $ modifyTVar' statsIds $ IM.insert clientId clientId -- until merged, its own fresh id is its stats id modifyTVar' active $ IM.insert clientId new pure new s <- asks server @@ -631,7 +636,7 @@ dummyKeyX25519 :: C.PublicKey 'C.X25519 dummyKeyX25519 = "MCowBQYDK2VuAyEA4JGSMYht18H4mas/jHeBwfcM7jLwNYJNOAhi2/g4RXg=" client :: Client -> Server -> M () -client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Server {subscribedQ, ntfSubscribedQ, notifiers} = do +client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Server {subscribedQ, ntfSubscribedQ, notifiers} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" forever $ atomically (readTBQueue rcvQ) @@ -887,11 +892,49 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv atomically $ modifyTVar' (msgSent stats) (+ 1) atomically $ modifyTVar' (msgCount stats) (+ 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) - -- TODO: increment client S counter + + onwers' <- asks sendSignedClients -- TMap RecipientId (TVar ClientStatsId) + statIds' <- asks statsClients -- TVar (IntMap ClientStatsId) + stats' <- asks clientStats -- TVar (IntMap ClientStats) + now <- liftIO getCurrentTime + atomically $ case senderKey qr of + Nothing -> do + -- unsecured queue, no merging + currentStatsId_ <- IM.lookup clientId <$> readTVar statIds' + forM_ currentStatsId_ $ \statsId -> do + cs <- getClientStats stats' statsId now + -- XXX: perhaps only merging has to be atomic, with the var on hands, it could be a round of smaller transactions + modifyTVar' (CS.msgSentUnsigned cs) (+ 1) + Just _ -> do + -- secured queue, merging is possible + currentStatsId_ <- IM.lookup clientId <$> readTVar statIds' + forM_ currentStatsId_ $ \currentStatsId -> do + owners <- readTVar onwers' + statsId <- forM (M.lookup (recipientId qr) owners) readTVar >>= \case + Just ownerId | ownerId == currentStatsId -> pure ownerId -- keep going + Just olderSessionId -> do + -- TODO: merge client stats + pure olderSessionId + -- olderSessionId <$ mergeClientStats owners olderSessionId currentStatsId + Nothing -> do -- claim queue ownership (should've happened on NEW instead) + newOwner <- newTVar currentStatsId + writeTVar onwers' $ M.insert (recipientId qr) newOwner owners + pure currentStatsId + cs <- getClientStats stats' statsId now + modifyTVar' (CS.msgSentSigned cs) (+ 1) -- TODO: increment current S counter in IP timeline -- TODO: increment current S counter in server timeline pure ok where + getClientStats stats' statsId now = do + stats <- readTVar stats' + case IM.lookup statsId stats of + Nothing -> do + new <- CS.newClientStats newTVar peerId now + writeTVar stats' $ IM.insert statsId new stats + pure new + Just cs -> cs <$ writeTVar (CS.updatedAt cs) now + mkMessage :: C.MaxLenBS MaxMessageLen -> M Message mkMessage body = do msgId <- randomId =<< asks (msgIdBytes . config) diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 6ab1b1baa..28792a0b1 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -15,8 +15,10 @@ import qualified Data.IntMap.Strict as IM import Data.List.NonEmpty (NonEmpty) import Data.Map.Strict (Map) import qualified Data.Map.Strict as M +import Data.IntPSQ (IntPSQ) +import qualified Data.IntPSQ as IP import Data.Time.Clock (getCurrentTime) -import Data.Time.Clock.POSIX (getPOSIXTime) +import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime) import Data.Time.Clock.System (SystemTime, systemToUTCTime) import Data.X509.Validation (Fingerprint (..)) import Network.Socket (ServiceName) @@ -31,7 +33,7 @@ import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..)) import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.Stats -import Simplex.Messaging.Server.Stats.Client (ClientStats, newClientStats) +import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsId, newClientStats) import Simplex.Messaging.Server.Stats.Timeline (Timeline, newTimeline, perMinute) import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) @@ -116,8 +118,11 @@ data Env = Env storeLog :: Maybe (StoreLog 'WriteMode), tlsServerParams :: T.ServerParams, serverStats :: ServerStats, - qCreatedByIp :: Timeline, - msgSentByIp :: Timeline, + qCreatedByIp :: Timeline Int, + msgSentByIp :: Timeline Int, + clientStats :: TVar (IntMap ClientStats), -- transitive session stats + statsClients :: TVar (IntMap ClientStatsId), -- reverse index from active clients + sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their owners sockets :: SocketState, clientSeq :: TVar Int, clients :: TVar (IntMap Client) @@ -134,7 +139,7 @@ data Server = Server data Client = Client { clientId :: Int, peerId :: PeerId, -- send updates for this Id to time series - clientStats :: ClientStats, -- capture final values on disconnect + -- socketStats :: ClientStats, -- TODO: measure and export histogram on disconnect subscriptions :: TMap RecipientId (TVar Sub), ntfSubscriptions :: TMap NotifierId (), rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)), @@ -179,8 +184,7 @@ newClient peerId nextClientId qSize thVersion sessionId createdAt = do connected <- newTVar True rcvActiveAt <- newTVar createdAt sndActiveAt <- newTVar createdAt - clientStats <- newClientStats newTVar (systemToUTCTime createdAt) - return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, peerId, clientStats} + return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, peerId} newSubscription :: SubscriptionThread -> STM Sub newSubscription subThread = do @@ -204,7 +208,10 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile, now <- getPOSIXTime qCreatedByIp <- atomically $ newTimeline perMinute now msgSentByIp <- atomically $ newTimeline perMinute now - return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, qCreatedByIp, msgSentByIp, sockets, clientSeq, clients} + clientStats <- newTVarIO mempty + statsClients <- newTVarIO mempty + sendSignedClients <- newTVarIO mempty + return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, qCreatedByIp, msgSentByIp, clientStats, statsClients, sendSignedClients, sockets, clientSeq, clients} where restoreQueues :: QueueStore -> FilePath -> IO (StoreLog 'WriteMode) restoreQueues QueueStore {queues, senders, notifiers} f = do diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs index 22a29a5d2..d5badcaca 100644 --- a/src/Simplex/Messaging/Server/Stats/Client.hs +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -15,6 +15,8 @@ import Data.IntMap (IntMap) import qualified Data.IntMap.Strict as IM import Data.IntPSQ (IntPSQ) import qualified Data.IntPSQ as IP +import Data.IntSet (IntSet) +import qualified Data.IntSet as IS import Data.Monoid (getSum) import Data.Set (Set) import qualified Data.Set as S @@ -28,13 +30,16 @@ import Simplex.Messaging.Protocol (RecipientId) import Simplex.Messaging.Transport (PeerId) import UnliftIO.STM +-- | Ephemeral client ID across reconnects +type ClientStatsId = Int + data ClientStats = ClientStats - { peerAddresses :: TVar (Set PeerId), + { peerAddresses :: TVar IntSet, -- cumulative set of used PeerIds socketCount :: TVar Int, createdAt :: TVar UTCTime, updatedAt :: TVar UTCTime, - qCreated :: TVar (Set RecipientId), - qSentSigned :: TVar (Set RecipientId), + qCreated :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs + qSentSigned :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs msgSentSigned :: TVar Int, msgSentUnsigned :: TVar Int, msgSentViaProxy :: TVar Int, @@ -43,7 +48,7 @@ data ClientStats = ClientStats -- may be combined with session duration to produce average rates (q/s, msg/s) data ClientStatsData = ClientStatsData - { _peerAddresses :: Set PeerId, + { _peerAddresses :: IntSet, _socketCount :: Int, _createdAt :: UTCTime, _updatedAt :: UTCTime, @@ -55,9 +60,9 @@ data ClientStatsData = ClientStatsData _msgDeliveredSigned :: Int } -newClientStats :: Monad m => (forall a. a -> m (TVar a)) -> UTCTime -> m ClientStats -newClientStats newF ts = do - peerAddresses <- newF mempty +newClientStats :: Monad m => (forall a. a -> m (TVar a)) -> PeerId -> UTCTime -> m ClientStats +newClientStats newF peerId ts = do + peerAddresses <- newF $ IS.singleton peerId socketCount <- newF 0 createdAt <- newF ts updatedAt <- newF ts diff --git a/src/Simplex/Messaging/Server/Stats/Timeline.hs b/src/Simplex/Messaging/Server/Stats/Timeline.hs index 112823f4e..77bf7ba97 100644 --- a/src/Simplex/Messaging/Server/Stats/Timeline.hs +++ b/src/Simplex/Messaging/Server/Stats/Timeline.hs @@ -26,12 +26,13 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (RecipientId) import UnliftIO.STM -type Timeline = (TVar SparseSeries, Current) +-- A time series of counters with an active head +type Timeline a = (TVar SparseSeries, Current a) -newTimeline :: QuantFun -> POSIXTime -> STM Timeline +newTimeline :: forall a. QuantFun -> POSIXTime -> STM (Timeline a) newTimeline quantF now = (,current) <$> newTVar IP.empty where - current :: Current + current :: Current a current = (quantF, quantF now, mempty) -- Sparse timeseries with 1 second resolution (or more coarse): @@ -47,7 +48,7 @@ type BucketId = Word32 type QuantFun = POSIXTime -> BucketId -- Current bucket that gets filled -type Current = (QuantFun, BucketId, IntMap (TVar Int)) +type Current a = (QuantFun, BucketId, IntMap (TVar a)) perSecond :: POSIXTime -> BucketId perSecond = truncate @@ -58,7 +59,7 @@ perMinute = (60 `secondsWidth`) secondsWidth :: NominalDiffTime -> POSIXTime -> BucketId secondsWidth w t = truncate $ t / w -finishCurrent :: POSIXTime -> Timeline -> STM Timeline +finishCurrent :: POSIXTime -> Timeline a -> STM (Timeline a) finishCurrent now (series, current) = error "TODO: read/reset current, push into series, evict minimal when it falls out of scope" type WindowData = IntMap Int -- PeerId -> counter From e639a85bcc7d174ec5e654c01a87fa1d97d3cdc8 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Tue, 14 May 2024 21:49:27 +0300 Subject: [PATCH 04/13] stats merging on sendSigned --- src/Simplex/Messaging/Server.hs | 71 +++++++++++++------- src/Simplex/Messaging/Server/Stats/Client.hs | 28 ++++++++ 2 files changed, 76 insertions(+), 23 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index bda4a4572..50cc14991 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -38,6 +38,7 @@ module Simplex.Messaging.Server ) where +import GHC.Conc (unsafeIOToSTM) import Control.Logger.Simple import Control.Monad import Control.Monad.Except @@ -52,11 +53,14 @@ import qualified Data.ByteString.Lazy.Char8 as LB import Data.Either (fromRight, partitionEithers) import Data.Functor (($>)) import Data.Int (Int64) +import Data.IntMap.Strict (IntMap) import qualified Data.IntMap.Strict as IM import Data.List (intercalate, mapAccumR) import Data.List.NonEmpty (NonEmpty) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M +import Data.Set (Set) +import qualified Data.Set as S import Data.Maybe (isNothing) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) @@ -893,39 +897,45 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd atomically $ modifyTVar' (msgCount stats) (+ 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) - onwers' <- asks sendSignedClients -- TMap RecipientId (TVar ClientStatsId) + logDebug $ "Senders gonna send..." + senders' <- asks sendSignedClients -- TMap RecipientId (TVar ClientStatsId) statIds' <- asks statsClients -- TVar (IntMap ClientStatsId) stats' <- asks clientStats -- TVar (IntMap ClientStats) now <- liftIO getCurrentTime atomically $ case senderKey qr of - Nothing -> do + Nothing -> withClientStatId statIds' $ \statsId -> do -- unsecured queue, no merging - currentStatsId_ <- IM.lookup clientId <$> readTVar statIds' - forM_ currentStatsId_ $ \statsId -> do - cs <- getClientStats stats' statsId now - -- XXX: perhaps only merging has to be atomic, with the var on hands, it could be a round of smaller transactions - modifyTVar' (CS.msgSentUnsigned cs) (+ 1) - Just _ -> do + cs <- getClientStats stats' statsId now + -- XXX: perhaps only merging has to be atomic, with the var on hands, it could be a round of smaller transactions + modifyTVar' (CS.msgSentUnsigned cs) (+ 1) + Just _ -> withClientStatId statIds' $ \currentStatsId -> do -- secured queue, merging is possible - currentStatsId_ <- IM.lookup clientId <$> readTVar statIds' - forM_ currentStatsId_ $ \currentStatsId -> do - owners <- readTVar onwers' - statsId <- forM (M.lookup (recipientId qr) owners) readTVar >>= \case - Just ownerId | ownerId == currentStatsId -> pure ownerId -- keep going - Just olderSessionId -> do - -- TODO: merge client stats - pure olderSessionId - -- olderSessionId <$ mergeClientStats owners olderSessionId currentStatsId - Nothing -> do -- claim queue ownership (should've happened on NEW instead) - newOwner <- newTVar currentStatsId - writeTVar onwers' $ M.insert (recipientId qr) newOwner owners - pure currentStatsId - cs <- getClientStats stats' statsId now - modifyTVar' (CS.msgSentSigned cs) (+ 1) + senders <- readTVar senders' + statsId <- case M.lookup (recipientId qr) senders of + Nothing -> do -- claim queue ownership (should've happened on NEW instead) + unsafeIOToSTM . logNote $ "Needs claiming: " <> tshow (strEncode $ recipientId qr, currentStatsId) + newOwner <- newTVar currentStatsId + writeTVar senders' $ M.insert (recipientId qr) newOwner senders + pure currentStatsId + Just owner -> do + prevStatsId <- readTVar owner + unless (prevStatsId == currentStatsId) $ do + unsafeIOToSTM . logNote $ "Needs merge: " <> tshow (currentStatsId, prevStatsId) + modifyTVar' statIds' $ IM.insert clientId prevStatsId + qsToUpdate <- mergeClientStats stats' prevStatsId currentStatsId + unsafeIOToSTM . logNote $ "Queues to transfer: " <> tshow (currentStatsId, prevStatsId, qsToUpdate) + unless (S.null qsToUpdate) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k owner os) senders qsToUpdate + pure prevStatsId + cs <- getClientStats stats' statsId now + modifyTVar' (CS.msgSentSigned cs) (+ 1) + unsafeIOToSTM . logWarn $ "msgSentSigned +1 for " <> tshow (clientId, currentStatsId, statsId) -- TODO: increment current S counter in IP timeline -- TODO: increment current S counter in server timeline pure ok where + -- missing clientId entry means the client is exempt from stats + withClientStatId statIds' action = readTVar statIds' >>= mapM_ action . IM.lookup clientId + getClientStats stats' statsId now = do stats <- readTVar stats' case IM.lookup statsId stats of @@ -935,6 +945,21 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd pure new Just cs -> cs <$ writeTVar (CS.updatedAt cs) now + mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId) + mergeClientStats stats' prevId curId = do + stats <- readTVar stats' + case (IM.lookup prevId stats, IM.lookup curId stats) of + (_, Nothing) -> pure mempty + (Nothing, Just cur@CS.ClientStats {qCreated}) -> do + writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats) + readTVar qCreated + (Just prev, Just cur) -> do + curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur + prevData <- CS.readClientStatsData readTVar prev + CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData + writeTVar stats' $ IM.delete curId stats + pure _qCreated + mkMessage :: C.MaxLenBS MaxMessageLen -> M Message mkMessage body = do msgId <- randomId =<< asks (msgIdBytes . config) diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs index d5badcaca..717b30e63 100644 --- a/src/Simplex/Messaging/Server/Stats/Client.hs +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -113,3 +113,31 @@ readClientStatsData readF cs = do _msgDeliveredSigned } {-# INLINE readClientStatsData #-} + +writeClientStatsData :: ClientStats -> ClientStatsData -> STM () +writeClientStatsData cs csd = do + writeTVar (peerAddresses cs) (_peerAddresses csd) + writeTVar (socketCount cs) (_socketCount csd) + writeTVar (createdAt cs) (_createdAt csd) + writeTVar (updatedAt cs) (_updatedAt csd) + writeTVar (qCreated cs) (_qCreated csd) + writeTVar (qSentSigned cs) (_qSentSigned csd) + writeTVar (msgSentSigned cs) (_msgSentSigned csd) + writeTVar (msgSentUnsigned cs) (_msgSentUnsigned csd) + writeTVar (msgSentViaProxy cs) (_msgSentViaProxy csd) + writeTVar (msgDeliveredSigned cs) (_msgDeliveredSigned csd) + +mergeClientStatsData :: ClientStatsData -> ClientStatsData -> ClientStatsData +mergeClientStatsData a b = + ClientStatsData + { _peerAddresses = _peerAddresses a <> _peerAddresses b, + _socketCount = _socketCount a + _socketCount b, + _createdAt = min (_createdAt a) (_createdAt b), + _updatedAt = max (_updatedAt a) (_updatedAt b), + _qCreated = _qCreated a <> _qCreated b, + _qSentSigned = _qSentSigned a <> _qSentSigned b, + _msgSentSigned = _msgSentSigned a + _msgSentSigned b, + _msgSentUnsigned = _msgSentUnsigned a + _msgSentUnsigned b, + _msgSentViaProxy = _msgSentViaProxy a + _msgSentViaProxy b, + _msgDeliveredSigned = _msgDeliveredSigned a + _msgDeliveredSigned b + } From ed88441cbc8f099db6a3b1cef0cc91635fff09e2 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Wed, 15 May 2024 14:12:46 +0300 Subject: [PATCH 05/13] collect clientstats --- src/Simplex/Messaging/Server.hs | 97 +++++++++++--------- src/Simplex/Messaging/Server/Env/STM.hs | 4 +- src/Simplex/Messaging/Server/Stats/Client.hs | 20 +--- 3 files changed, 59 insertions(+), 62 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 50cc14991..63f6e2a3a 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -445,14 +445,14 @@ runClientTransport :: Transport c => THandleSMP c 'TServer -> M () runClientTransport h@THandle {connection, params = THandleParams {thVersion, sessionId}} = do q <- asks $ tbqSize . config ts <- liftIO getSystemTime - statsIds <- asks statsClients active <- asks clients nextClientId <- asks clientSeq let peerId = getPeerId connection skipStats = False -- TODO: check peerId + statsIds' <- asks statsClients c <- atomically $ do new@Client {clientId} <- newClient peerId nextClientId q thVersion sessionId ts - unless skipStats $ modifyTVar' statsIds $ IM.insert clientId clientId -- until merged, its own fresh id is its stats id + unless skipStats $ modifyTVar' statsIds' $ IM.insert clientId clientId -- until merged, its own fresh id is its stats id modifyTVar' active $ IM.insert clientId new pure new s <- asks server @@ -477,6 +477,7 @@ clientDisconnected c@Client {clientId, subscriptions, connected, sessionId, endT atomically $ modifyTVar' srvSubs $ \cs -> M.foldrWithKey (\sub _ -> M.update deleteCurrentClient sub) cs subs asks clients >>= atomically . (`modifyTVar'` IM.delete clientId) + asks statsClients >>= atomically . (`modifyTVar'` IM.delete clientId) tIds <- atomically $ swapTVar endThreads IM.empty liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds where @@ -712,7 +713,13 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd stats <- asks serverStats atomically $ modifyTVar' (qCreated stats) (+ 1) atomically $ modifyTVar' (qCount stats) (+ 1) - -- TODO: increment client Q counter + + now <- liftIO getCurrentTime + statsIds' <- asks statsClients -- TVar (IntMap ClientStatsId) + stats' <- asks clientStats -- TVar (IntMap ClientStats) + atomically $ withClientStatId statsIds' $ \statsId -> do + cs <- getClientStats stats' statsId now + modifyTVar' (CS.qCreated cs) $ S.insert rId -- TODO: increment current Q counter in IP timeline -- TODO: increment current Q counter in server timeline case subMode of @@ -867,6 +874,13 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd when (notification msgFlags) $ do atomically $ modifyTVar' (msgRecvNtf stats) (+ 1) atomically $ updatePeriodStats (activeQueuesNtf stats) queueId + senders' <- asks sendSignedClients + stats' <- asks clientStats + atomically $ do + sender_ <- mapM readTVar =<< TM.lookup (recipientId qr) senders' + forM_ sender_ $ \statsId -> do + cs_ <- IM.lookup statsId <$> readTVar stats' + forM_ cs_ $ \cs -> modifyTVar' (CS.msgDeliveredSigned cs) (+ 1) sendMessage :: QueueRec -> MsgFlags -> MsgBody -> M (Transmission BrokerMsg) sendMessage qr msgFlags msgBody @@ -899,67 +913,37 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd logDebug $ "Senders gonna send..." senders' <- asks sendSignedClients -- TMap RecipientId (TVar ClientStatsId) - statIds' <- asks statsClients -- TVar (IntMap ClientStatsId) + statsIds' <- asks statsClients -- TVar (IntMap ClientStatsId) stats' <- asks clientStats -- TVar (IntMap ClientStats) now <- liftIO getCurrentTime atomically $ case senderKey qr of - Nothing -> withClientStatId statIds' $ \statsId -> do + Nothing -> withClientStatId statsIds' $ \statsId -> do -- unsecured queue, no merging cs <- getClientStats stats' statsId now -- XXX: perhaps only merging has to be atomic, with the var on hands, it could be a round of smaller transactions modifyTVar' (CS.msgSentUnsigned cs) (+ 1) - Just _ -> withClientStatId statIds' $ \currentStatsId -> do + Just _secured -> withClientStatId statsIds' $ \currentStatsId -> do -- secured queue, merging is possible senders <- readTVar senders' statsId <- case M.lookup (recipientId qr) senders of - Nothing -> do -- claim queue ownership (should've happened on NEW instead) - unsafeIOToSTM . logNote $ "Needs claiming: " <> tshow (strEncode $ recipientId qr, currentStatsId) + Nothing -> do newOwner <- newTVar currentStatsId writeTVar senders' $ M.insert (recipientId qr) newOwner senders pure currentStatsId - Just owner -> do - prevStatsId <- readTVar owner + Just sender -> do + prevStatsId <- readTVar sender unless (prevStatsId == currentStatsId) $ do - unsafeIOToSTM . logNote $ "Needs merge: " <> tshow (currentStatsId, prevStatsId) - modifyTVar' statIds' $ IM.insert clientId prevStatsId - qsToUpdate <- mergeClientStats stats' prevStatsId currentStatsId - unsafeIOToSTM . logNote $ "Queues to transfer: " <> tshow (currentStatsId, prevStatsId, qsToUpdate) - unless (S.null qsToUpdate) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k owner os) senders qsToUpdate + modifyTVar' statsIds' $ IM.insert clientId prevStatsId + qsToTransfer <- mergeClientStats stats' prevStatsId currentStatsId + unless (S.null qsToTransfer) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k sender os) senders qsToTransfer pure prevStatsId cs <- getClientStats stats' statsId now + modifyTVar' (CS.qSentSigned cs) $ S.insert (recipientId qr) modifyTVar' (CS.msgSentSigned cs) (+ 1) - unsafeIOToSTM . logWarn $ "msgSentSigned +1 for " <> tshow (clientId, currentStatsId, statsId) -- TODO: increment current S counter in IP timeline -- TODO: increment current S counter in server timeline pure ok where - -- missing clientId entry means the client is exempt from stats - withClientStatId statIds' action = readTVar statIds' >>= mapM_ action . IM.lookup clientId - - getClientStats stats' statsId now = do - stats <- readTVar stats' - case IM.lookup statsId stats of - Nothing -> do - new <- CS.newClientStats newTVar peerId now - writeTVar stats' $ IM.insert statsId new stats - pure new - Just cs -> cs <$ writeTVar (CS.updatedAt cs) now - - mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId) - mergeClientStats stats' prevId curId = do - stats <- readTVar stats' - case (IM.lookup prevId stats, IM.lookup curId stats) of - (_, Nothing) -> pure mempty - (Nothing, Just cur@CS.ClientStats {qCreated}) -> do - writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats) - readTVar qCreated - (Just prev, Just cur) -> do - curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur - prevData <- CS.readClientStatsData readTVar prev - CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData - writeTVar stats' $ IM.delete curId stats - pure _qCreated - mkMessage :: C.MaxLenBS MaxMessageLen -> M Message mkMessage body = do msgId <- randomId =<< asks (msgIdBytes . config) @@ -1063,6 +1047,33 @@ client clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, snd okResp :: Either ErrorType () -> Transmission BrokerMsg okResp = either err $ const ok + -- missing clientId entry means the client is exempt from stats + withClientStatId statsIds' action = readTVar statsIds' >>= mapM_ action . IM.lookup clientId + + getClientStats stats' statsId now = do + stats <- readTVar stats' + case IM.lookup statsId stats of + Nothing -> do + new <- CS.newClientStats newTVar peerId now + writeTVar stats' $ IM.insert statsId new stats + pure new + Just cs -> cs <$ writeTVar (CS.updatedAt cs) now + + mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId) + mergeClientStats stats' prevId curId = do + stats <- readTVar stats' + case (IM.lookup prevId stats, IM.lookup curId stats) of + (_, Nothing) -> pure mempty + (Nothing, Just cur@CS.ClientStats {qCreated}) -> do + writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats) + readTVar qCreated + (Just prev, Just cur) -> do + curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur + prevData <- CS.readClientStatsData readTVar prev + CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData + writeTVar stats' $ IM.delete curId stats + pure _qCreated + updateDeletedStats :: QueueRec -> M () updateDeletedStats q = do stats <- asks serverStats diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 28792a0b1..3e4ab2c29 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -121,8 +121,8 @@ data Env = Env qCreatedByIp :: Timeline Int, msgSentByIp :: Timeline Int, clientStats :: TVar (IntMap ClientStats), -- transitive session stats - statsClients :: TVar (IntMap ClientStatsId), -- reverse index from active clients - sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their owners + statsClients :: TVar (IntMap ClientStatsId), -- reverse index from sockets + sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their senders sockets :: SocketState, clientSeq :: TVar Int, clients :: TVar (IntMap Client) diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs index 717b30e63..285e2b255 100644 --- a/src/Simplex/Messaging/Server/Stats/Client.hs +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -8,24 +8,10 @@ module Simplex.Messaging.Server.Stats.Client where -import Control.Applicative (optional, (<|>)) -import qualified Data.Attoparsec.ByteString.Char8 as A -import qualified Data.ByteString.Char8 as B -import Data.IntMap (IntMap) -import qualified Data.IntMap.Strict as IM -import Data.IntPSQ (IntPSQ) -import qualified Data.IntPSQ as IP import Data.IntSet (IntSet) import qualified Data.IntSet as IS -import Data.Monoid (getSum) import Data.Set (Set) -import qualified Data.Set as S -import Data.Time.Calendar.Month (pattern MonthDay) -import Data.Time.Calendar.OrdinalDate (mondayStartWeek) -import Data.Time.Clock (NominalDiffTime, UTCTime (..)) -import Data.Time.Clock.POSIX (POSIXTime) -import Data.Word (Word32) -import Simplex.Messaging.Encoding.String +import Data.Time.Clock (UTCTime (..)) import Simplex.Messaging.Protocol (RecipientId) import Simplex.Messaging.Transport (PeerId) import UnliftIO.STM @@ -38,11 +24,11 @@ data ClientStats = ClientStats socketCount :: TVar Int, createdAt :: TVar UTCTime, updatedAt :: TVar UTCTime, - qCreated :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs + qCreated :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs, for dumping into suspicous qSentSigned :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs msgSentSigned :: TVar Int, msgSentUnsigned :: TVar Int, - msgSentViaProxy :: TVar Int, + msgSentViaProxy :: TVar Int, -- TODO msgDeliveredSigned :: TVar Int } From e19f50b66c97e55991558453063efb68a15a167f Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Wed, 15 May 2024 16:33:25 +0300 Subject: [PATCH 06/13] add proxy counters --- src/Simplex/Messaging/Server.hs | 140 ++++++++++--------- src/Simplex/Messaging/Server/Env/STM.hs | 8 +- src/Simplex/Messaging/Server/Stats/Client.hs | 42 ++++-- 3 files changed, 106 insertions(+), 84 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index d7eb652ce..cfd05b87f 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -680,8 +680,13 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, ServerConfig {allowSMPProxy, newQueueBasicAuth} <- asks config pure $ allowSMPProxy && maybe True ((== auth) . Just) newQueueBasicAuth getRelay = do + withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.proxyRelaysRequested cs) (+ 1) ProxyAgent {smpAgent} <- asks proxyAgent - liftIO $ proxyResp <$> runExceptT (getSMPServerClient' smpAgent srv) `catch` (pure . Left . PCEIOError) + r <- liftIO $ proxyResp <$> runExceptT (getSMPServerClient' smpAgent srv) `catch` (pure . Left . PCEIOError) + case r of + PKEY {} -> withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.proxyRelaysConnected cs) (+ 1) + _ -> pure () + pure r where proxyResp = \case Left err -> ERR $ smpProxyError err @@ -698,9 +703,13 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, ProxyAgent {smpAgent} <- asks proxyAgent atomically (lookupSMPServerClient smpAgent sessId) >>= \case Just smp - | v >= sendingProxySMPVersion -> - liftIO $ either (ERR . smpProxyError) PRES <$> + | v >= sendingProxySMPVersion -> do + r <- liftIO $ either (ERR . smpProxyError) PRES <$> runExceptT (forwardSMPMessage smp corrId fwdV pubKey encBlock) `catchError` (pure . Left . PCEIOError) + case r of + PRES {} -> withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.msgSentViaProxy cs) (+ 1) + _ -> pure () + pure r | otherwise -> pure . ERR $ transportErr TEVersion where THandleParams {thVersion = v} = thParams smp @@ -773,13 +782,7 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, stats <- asks serverStats atomically $ modifyTVar' (qCreated stats) (+ 1) atomically $ modifyTVar' (qCount stats) (+ 1) - - now <- liftIO getCurrentTime - statsIds' <- asks statsClients -- TVar (IntMap ClientStatsId) - stats' <- asks clientStats -- TVar (IntMap ClientStats) - atomically $ withClientStatId statsIds' $ \statsId -> do - cs <- getClientStats stats' statsId now - modifyTVar' (CS.qCreated cs) $ S.insert rId + withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.qCreated cs) $ S.insert rId -- TODO: increment current Q counter in IP timeline -- TODO: increment current Q counter in server timeline case subMode of @@ -970,36 +973,12 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, atomically $ modifyTVar' (msgSent stats) (+ 1) atomically $ modifyTVar' (msgCount stats) (+ 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) - - logDebug $ "Senders gonna send..." - senders' <- asks sendSignedClients -- TMap RecipientId (TVar ClientStatsId) - statsIds' <- asks statsClients -- TVar (IntMap ClientStatsId) - stats' <- asks clientStats -- TVar (IntMap ClientStats) - now <- liftIO getCurrentTime - atomically $ case senderKey qr of - Nothing -> withClientStatId statsIds' $ \statsId -> do - -- unsecured queue, no merging - cs <- getClientStats stats' statsId now - -- XXX: perhaps only merging has to be atomic, with the var on hands, it could be a round of smaller transactions - modifyTVar' (CS.msgSentUnsigned cs) (+ 1) - Just _secured -> withClientStatId statsIds' $ \currentStatsId -> do - -- secured queue, merging is possible - senders <- readTVar senders' - statsId <- case M.lookup (recipientId qr) senders of - Nothing -> do - newOwner <- newTVar currentStatsId - writeTVar senders' $ M.insert (recipientId qr) newOwner senders - pure currentStatsId - Just sender -> do - prevStatsId <- readTVar sender - unless (prevStatsId == currentStatsId) $ do - modifyTVar' statsIds' $ IM.insert clientId prevStatsId - qsToTransfer <- mergeClientStats stats' prevStatsId currentStatsId - unless (S.null qsToTransfer) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k sender os) senders qsToTransfer - pure prevStatsId - cs <- getClientStats stats' statsId now - modifyTVar' (CS.qSentSigned cs) $ S.insert (recipientId qr) - modifyTVar' (CS.msgSentSigned cs) (+ 1) + case senderKey qr of + Nothing -> withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.msgSentUnsigned cs) (+ 1) + Just _secured -> do + withMergedClientStatsId qr $ \cs -> do + atomically $ modifyTVar' (CS.qSentSigned cs) $ S.insert (recipientId qr) + atomically $ modifyTVar' (CS.msgSentSigned cs) (+ 1) -- TODO: increment current S counter in IP timeline -- TODO: increment current S counter in server timeline pure ok @@ -1158,32 +1137,61 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, okResp :: Either ErrorType () -> Transmission BrokerMsg okResp = either err $ const ok - -- missing clientId entry means the client is exempt from stats - withClientStatId statsIds' action = readTVar statsIds' >>= mapM_ action . IM.lookup clientId - - getClientStats stats' statsId now = do - stats <- readTVar stats' - case IM.lookup statsId stats of - Nothing -> do - new <- CS.newClientStats newTVar peerId now - writeTVar stats' $ IM.insert statsId new stats - pure new - Just cs -> cs <$ writeTVar (CS.updatedAt cs) now - - mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId) - mergeClientStats stats' prevId curId = do - stats <- readTVar stats' - case (IM.lookup prevId stats, IM.lookup curId stats) of - (_, Nothing) -> pure mempty - (Nothing, Just cur@CS.ClientStats {qCreated}) -> do - writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats) - readTVar qCreated - (Just prev, Just cur) -> do - curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur - prevData <- CS.readClientStatsData readTVar prev - CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData - writeTVar stats' $ IM.delete curId stats - pure _qCreated + -- missing clientId entry means the client is exempt from stats + withClientStatsId_ statsIds' getCS = IM.lookup clientId <$> readTVar statsIds' >>= mapM getCS + + withClientStatsId updateCS = do + statsIds' <- asks statsClients + stats' <- asks clientStats + now <- liftIO getCurrentTime + atomically (withClientStatsId_ statsIds' $ getClientStats stats' now) >>= mapM_ updateCS + + getClientStats stats' now statsId = do + stats <- readTVar stats' + case IM.lookup statsId stats of + Nothing -> do + new <- CS.newClientStats newTVar peerId now + writeTVar stats' $ IM.insert statsId new stats + pure new + Just cs -> cs <$ writeTVar (CS.updatedAt cs) now + + withMergedClientStatsId qr updateCS = do + senders' <- asks sendSignedClients + statsIds' <- asks statsClients + stats' <- asks clientStats + now <- liftIO getCurrentTime + atomically (withClientStatsId_ statsIds' $ getMergeClientStats senders' statsIds' stats' now qr) >>= mapM_ updateCS + + getMergeClientStats senders' statsIds' stats' now qr currentStatsId = do + senders <- readTVar senders' + statsId <- case M.lookup (recipientId qr) senders of + Nothing -> do + newOwner <- newTVar currentStatsId + writeTVar senders' $ M.insert (recipientId qr) newOwner senders + pure currentStatsId + Just sender -> do + prevStatsId <- readTVar sender + unless (prevStatsId == currentStatsId) $ do + modifyTVar' statsIds' $ IM.insert clientId prevStatsId + qsToTransfer <- mergeClientStats stats' prevStatsId currentStatsId + unless (S.null qsToTransfer) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k sender os) senders qsToTransfer + pure prevStatsId + getClientStats stats' now statsId + + mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId) + mergeClientStats stats' prevId curId = do + stats <- readTVar stats' + case (IM.lookup prevId stats, IM.lookup curId stats) of + (_, Nothing) -> pure mempty + (Nothing, Just cur@CS.ClientStats {qCreated}) -> do + writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats) + readTVar qCreated + (Just prev, Just cur) -> do + curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur + prevData <- CS.readClientStatsData readTVar prev + CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData + writeTVar stats' $ IM.delete curId stats + pure _qCreated updateDeletedStats :: QueueRec -> M () updateDeletedStats q = do diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 1f0a045f5..2f02d78c5 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -15,11 +15,9 @@ import qualified Data.IntMap.Strict as IM import Data.List.NonEmpty (NonEmpty) import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.IntPSQ (IntPSQ) -import qualified Data.IntPSQ as IP import Data.Time.Clock (getCurrentTime) -import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime) -import Data.Time.Clock.System (SystemTime, systemToUTCTime) +import Data.Time.Clock.POSIX (getPOSIXTime) +import Data.Time.Clock.System (SystemTime) import Data.X509.Validation (Fingerprint (..)) import Network.Socket (ServiceName) import qualified Network.TLS as T @@ -34,7 +32,7 @@ import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..)) import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.Stats -import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsId, newClientStats) +import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsId) import Simplex.Messaging.Server.Stats.Timeline (Timeline, newTimeline, perMinute) import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs index 285e2b255..03722256e 100644 --- a/src/Simplex/Messaging/Server/Stats/Client.hs +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -28,8 +28,10 @@ data ClientStats = ClientStats qSentSigned :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs msgSentSigned :: TVar Int, msgSentUnsigned :: TVar Int, - msgSentViaProxy :: TVar Int, -- TODO - msgDeliveredSigned :: TVar Int + msgDeliveredSigned :: TVar Int, + proxyRelaysRequested :: TVar Int, + proxyRelaysConnected :: TVar Int, + msgSentViaProxy :: TVar Int } -- may be combined with session duration to produce average rates (q/s, msg/s) @@ -42,8 +44,10 @@ data ClientStatsData = ClientStatsData _qSentSigned :: Set RecipientId, _msgSentSigned :: Int, _msgSentUnsigned :: Int, - _msgSentViaProxy :: Int, - _msgDeliveredSigned :: Int + _msgDeliveredSigned :: Int, + _proxyRelaysRequested :: Int, + _proxyRelaysConnected :: Int, + _msgSentViaProxy :: Int } newClientStats :: Monad m => (forall a. a -> m (TVar a)) -> PeerId -> UTCTime -> m ClientStats @@ -56,8 +60,10 @@ newClientStats newF peerId ts = do qSentSigned <- newF mempty msgSentSigned <- newF 0 msgSentUnsigned <- newF 0 - msgSentViaProxy <- newF 0 msgDeliveredSigned <- newF 0 + proxyRelaysRequested <- newF 0 + proxyRelaysConnected <- newF 0 + msgSentViaProxy <- newF 0 pure ClientStats { peerAddresses, @@ -68,8 +74,10 @@ newClientStats newF peerId ts = do qSentSigned, msgSentSigned, msgSentUnsigned, - msgSentViaProxy, - msgDeliveredSigned + msgDeliveredSigned, + proxyRelaysRequested, + proxyRelaysConnected, + msgSentViaProxy } {-# INLINE newClientStats #-} @@ -83,8 +91,10 @@ readClientStatsData readF cs = do _qSentSigned <- readF $ qSentSigned cs _msgSentSigned <- readF $ msgSentSigned cs _msgSentUnsigned <- readF $ msgSentUnsigned cs - _msgSentViaProxy <- readF $ msgSentViaProxy cs _msgDeliveredSigned <- readF $ msgDeliveredSigned cs + _proxyRelaysRequested <- readF $ proxyRelaysRequested cs + _proxyRelaysConnected <- readF $ proxyRelaysConnected cs + _msgSentViaProxy <- readF $ msgSentViaProxy cs pure ClientStatsData { _peerAddresses, @@ -95,8 +105,10 @@ readClientStatsData readF cs = do _qSentSigned, _msgSentSigned, _msgSentUnsigned, - _msgSentViaProxy, - _msgDeliveredSigned + _msgDeliveredSigned, + _proxyRelaysRequested, + _proxyRelaysConnected, + _msgSentViaProxy } {-# INLINE readClientStatsData #-} @@ -110,8 +122,10 @@ writeClientStatsData cs csd = do writeTVar (qSentSigned cs) (_qSentSigned csd) writeTVar (msgSentSigned cs) (_msgSentSigned csd) writeTVar (msgSentUnsigned cs) (_msgSentUnsigned csd) - writeTVar (msgSentViaProxy cs) (_msgSentViaProxy csd) writeTVar (msgDeliveredSigned cs) (_msgDeliveredSigned csd) + writeTVar (proxyRelaysRequested cs) (_proxyRelaysRequested csd) + writeTVar (proxyRelaysConnected cs) (_proxyRelaysConnected csd) + writeTVar (msgSentViaProxy cs) (_msgSentViaProxy csd) mergeClientStatsData :: ClientStatsData -> ClientStatsData -> ClientStatsData mergeClientStatsData a b = @@ -124,6 +138,8 @@ mergeClientStatsData a b = _qSentSigned = _qSentSigned a <> _qSentSigned b, _msgSentSigned = _msgSentSigned a + _msgSentSigned b, _msgSentUnsigned = _msgSentUnsigned a + _msgSentUnsigned b, - _msgSentViaProxy = _msgSentViaProxy a + _msgSentViaProxy b, - _msgDeliveredSigned = _msgDeliveredSigned a + _msgDeliveredSigned b + _msgDeliveredSigned = _msgDeliveredSigned a + _msgDeliveredSigned b, + _proxyRelaysRequested = _proxyRelaysRequested a + _proxyRelaysRequested b, + _proxyRelaysConnected = _proxyRelaysConnected a + _proxyRelaysConnected b, + _msgSentViaProxy = _msgSentViaProxy a + _msgSentViaProxy b } From 4b6ab5e89ef72adb6f33a7c011b56721cce6e426 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Wed, 15 May 2024 16:58:48 +0300 Subject: [PATCH 07/13] add control port stats-clients --- src/Simplex/Messaging/Server.hs | 8 ++++++++ src/Simplex/Messaging/Server/Control.hs | 3 +++ src/Simplex/Messaging/Server/Stats/Client.hs | 2 +- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index cfd05b87f..a977025f6 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -54,6 +54,7 @@ import Data.Functor (($>)) import Data.Int (Int64) import Data.IntMap.Strict (IntMap) import qualified Data.IntMap.Strict as IM +import qualified Data.IntSet as IS import Data.List (intercalate, mapAccumR) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L @@ -390,6 +391,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do where putStat :: Show a => String -> TVar a -> IO () putStat label var = readTVarIO var >>= \v -> hPutStrLn h $ label <> ": " <> show v + CPStatsClients -> withAdminRole $ do + stats' <- unliftIO u (asks clientStats) >>= readTVarIO + B.hPutStr h "peerAddresses,socketCount,createdAt,updatedAt,qCreated,qSentSigned,msgSentSigned,msgSentUnsigned,msgDeliveredSigned,proxyRelaysRequested,proxyRelaysConnected,msgSentViaProxy\n" + forM_ stats' $ \cs -> do + csd <- CS.readClientStatsData readTVarIO cs + let CS.ClientStatsData {_peerAddresses, _socketCount, _createdAt, _updatedAt, _qCreated, _qSentSigned, _msgSentSigned, _msgSentUnsigned, _msgDeliveredSigned, _proxyRelaysRequested, _proxyRelaysConnected, _msgSentViaProxy} = csd + B.hPutStrLn h $ B.intercalate "," [bshow $ IS.size _peerAddresses, bshow _socketCount, strEncode _createdAt, strEncode _updatedAt, bshow $ S.size _qCreated, bshow $ S.size _qSentSigned, bshow _msgSentSigned, bshow _msgSentUnsigned, bshow _msgDeliveredSigned, bshow _proxyRelaysRequested, bshow _proxyRelaysConnected, bshow _msgSentViaProxy] CPStatsRTS -> getRTSStats >>= hPrint h CPThreads -> withAdminRole $ do #if MIN_VERSION_base(4,18,0) diff --git a/src/Simplex/Messaging/Server/Control.hs b/src/Simplex/Messaging/Server/Control.hs index 9463fa777..bf703832e 100644 --- a/src/Simplex/Messaging/Server/Control.hs +++ b/src/Simplex/Messaging/Server/Control.hs @@ -16,6 +16,7 @@ data ControlProtocol | CPResume | CPClients | CPStats + | CPStatsClients | CPStatsRTS | CPThreads | CPSockets @@ -33,6 +34,7 @@ instance StrEncoding ControlProtocol where CPResume -> "resume" CPClients -> "clients" CPStats -> "stats" + CPStatsClients -> "stats-clients" CPStatsRTS -> "stats-rts" CPThreads -> "threads" CPSockets -> "sockets" @@ -49,6 +51,7 @@ instance StrEncoding ControlProtocol where "resume" -> pure CPResume "clients" -> pure CPClients "stats" -> pure CPStats + "stats-clients" -> pure CPStatsClients "stats-rts" -> pure CPStatsRTS "threads" -> pure CPThreads "sockets" -> pure CPSockets diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs index 03722256e..095f9f49c 100644 --- a/src/Simplex/Messaging/Server/Stats/Client.hs +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -53,7 +53,7 @@ data ClientStatsData = ClientStatsData newClientStats :: Monad m => (forall a. a -> m (TVar a)) -> PeerId -> UTCTime -> m ClientStats newClientStats newF peerId ts = do peerAddresses <- newF $ IS.singleton peerId - socketCount <- newF 0 + socketCount <- newF 1 createdAt <- newF ts updatedAt <- newF ts qCreated <- newF mempty From f884ecc3ab57c8447bc0e71304858792ba88872d Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Wed, 15 May 2024 23:39:57 +0300 Subject: [PATCH 08/13] collect distributions over counters --- src/Simplex/Messaging/Server.hs | 33 +++++++++-- src/Simplex/Messaging/Server/Stats.hs | 56 +++++++++++++++++-- src/Simplex/Messaging/Server/Stats/Client.hs | 31 ++++++++++ .../Messaging/Server/Stats/Timeline.hs | 35 +----------- 4 files changed, 113 insertions(+), 42 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index a977025f6..e0e98ef3d 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -231,7 +231,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do rateStatsThread_ :: ServerConfig -> [M ()] rateStatsThread_ ServerConfig {rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} = - [ monitorServerRates bucketWidth, -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection + [ monitorServerRates (bucketWidth * 1000000), -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection logServerRates logStatsStartTime logInterval rateStatsLogFile -- log distributions once in a while ] rateStatsThread_ _ = [] @@ -291,16 +291,41 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do monitorServerRates :: Int64 -> M () monitorServerRates bucketWidth = do labelMyThread "monitorServerRates" - forever $ do + stats' <- asks clientStats + liftIO . forever $ do + -- now <- getCurrentTime -- TODO: calculate delay for the next bucket closing time - liftIO $ threadDelay' bucketWidth + threadDelay' bucketWidth -- TODO: collect and reset buckets + stats <- readTVarIO stats' >>= mapM (CS.readClientStatsData readTVarIO) + logNote . tshow $ fmap (distribution . histogram) $ collect stats + where + collect :: IntMap CS.ClientStatsData -> CS.ClientStatsC (IntMap Int) + collect stats = IM.foldlWithKey' toColumns (CS.clientStatsC IM.empty) stats + where + toColumns acc statsId csd = + CS.ClientStatsC + { peerAddressesC = IS.size _peerAddresses +> CS.peerAddressesC acc, + socketCountC = _socketCount +> CS.socketCountC acc, + -- created/updated skpped + qCreatedC = S.size _qCreated +> CS.qCreatedC acc, + qSentSignedC = S.size _qSentSigned +> CS.qSentSignedC acc, + msgSentSignedC = _msgSentSigned +> CS.msgSentSignedC acc, + msgSentUnsignedC = _msgSentUnsigned +> CS.msgSentUnsignedC acc, + msgDeliveredSignedC = _msgDeliveredSigned +> CS.msgDeliveredSignedC acc, + proxyRelaysRequestedC = _proxyRelaysRequested +> CS.proxyRelaysRequestedC acc, + proxyRelaysConnectedC = _proxyRelaysConnected +> CS.proxyRelaysConnectedC acc, + msgSentViaProxyC = _msgSentViaProxy +> CS.msgSentViaProxyC acc + } + where + (+>) = IM.insertWith (+) statsId + CS.ClientStatsData {_peerAddresses, _socketCount, _qCreated, _qSentSigned, _msgSentSigned, _msgSentUnsigned, _msgDeliveredSigned, _proxyRelaysRequested, _proxyRelaysConnected, _msgSentViaProxy} = csd logServerRates :: Int64 -> Int64 -> FilePath -> M () logServerRates startAt logInterval statsFilePath = do labelMyThread "logServerStats" initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime - liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath + liftIO $ putStrLn $ "server rates log enabled: " <> statsFilePath liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) let interval = 1000000 * logInterval forever $ do diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 53f977f8f..ece54cfcf 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE DeriveFoldable #-} +{-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} @@ -10,18 +12,16 @@ module Simplex.Messaging.Server.Stats where import Control.Applicative (optional, (<|>)) import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B +import Data.Foldable (toList) import Data.IntMap (IntMap) import qualified Data.IntMap.Strict as IM -import Data.IntPSQ (IntPSQ) -import qualified Data.IntPSQ as IP -import Data.Monoid (getSum) +import Data.List (find) +import Data.Maybe (listToMaybe) import Data.Set (Set) import qualified Data.Set as S import Data.Time.Calendar.Month (pattern MonthDay) import Data.Time.Calendar.OrdinalDate (mondayStartWeek) -import Data.Time.Clock (NominalDiffTime, UTCTime (..)) -import Data.Time.Clock.POSIX (POSIXTime) -import Data.Word (Word32) +import Data.Time.Clock (UTCTime (..)) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (RecipientId) import UnliftIO.STM @@ -238,3 +238,47 @@ updatePeriodStats stats pId = do updatePeriod month where updatePeriod pSel = modifyTVar' (pSel stats) (S.insert pId) + +-- counter -> occurences +newtype Histogram = Histogram (IntMap Int) + deriving (Show) + +histogram :: Foldable t => t Int -> Histogram +histogram = Histogram . IM.fromListWith (+) . map (,1) . toList +{-# INLINE histogram #-} + +distribution :: Histogram -> Distribution (Maybe Int) +distribution h = Distribution + { minimal = fst <$> listToMaybe cdf', + bottom50p = bot 0.5, -- std median + top50p = top 0.5, + top20p = top 0.2, + top10p = top 0.1, + top5p = top 0.05, + top1p = top 0.01, + maximal = fst <$> listToMaybe rcdf' + } + where + bot p = fmap fst $ find (\(_, p') -> p' >= p) cdf' + top p = fmap fst $ find (\(_, p') -> p' <= 1 - p) rcdf' + cdf' = cdf h + rcdf' = reverse cdf' -- allow find to work from the smaller end + +cdf :: Histogram -> [(Int, Float)] +cdf (Histogram h) = map (\(v, cc) -> (v, fromIntegral cc / total)) . scanl1 cumulative $ IM.assocs h + where + total :: Float + total = fromIntegral $ sum h + cumulative (_, acc) (v, c) = (v, acc + c) + +data Distribution a = Distribution + { minimal :: a, + bottom50p :: a, + top50p :: a, + top20p :: a, + top10p :: a, + top5p :: a, + top1p :: a, + maximal :: a + } + deriving (Show, Functor, Foldable) diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs index 095f9f49c..82c86f326 100644 --- a/src/Simplex/Messaging/Server/Stats/Client.hs +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} @@ -143,3 +144,33 @@ mergeClientStatsData a b = _proxyRelaysConnected = _proxyRelaysConnected a + _proxyRelaysConnected b, _msgSentViaProxy = _msgSentViaProxy a + _msgSentViaProxy b } + +-- | A column-based collection of ClientStats-related data. +data ClientStatsC a = ClientStatsC + { peerAddressesC :: a, + socketCountC :: a, + qCreatedC :: a, + qSentSignedC :: a, + msgSentSignedC :: a, + msgSentUnsignedC :: a, + msgDeliveredSignedC :: a, + proxyRelaysRequestedC :: a, + proxyRelaysConnectedC :: a, + msgSentViaProxyC :: a + } + deriving (Show, Functor) + +clientStatsC :: a -> ClientStatsC a +clientStatsC x = ClientStatsC + { peerAddressesC = x, + socketCountC = x, + qCreatedC = x, + qSentSignedC = x, + msgSentSignedC = x, + msgSentUnsignedC = x, + msgDeliveredSignedC = x, + proxyRelaysRequestedC = x, + proxyRelaysConnectedC = x, + msgSentViaProxyC = x + } +{-# INLINE clientStatsC #-} diff --git a/src/Simplex/Messaging/Server/Stats/Timeline.hs b/src/Simplex/Messaging/Server/Stats/Timeline.hs index 77bf7ba97..504f82f08 100644 --- a/src/Simplex/Messaging/Server/Stats/Timeline.hs +++ b/src/Simplex/Messaging/Server/Stats/Timeline.hs @@ -7,23 +7,15 @@ module Simplex.Messaging.Server.Stats.Timeline where -import Control.Applicative (optional, (<|>)) -import qualified Data.Attoparsec.ByteString.Char8 as A -import qualified Data.ByteString.Char8 as B import Data.IntMap (IntMap) import qualified Data.IntMap.Strict as IM import Data.IntPSQ (IntPSQ) import qualified Data.IntPSQ as IP -import Data.Monoid (getSum) -import Data.Set (Set) -import qualified Data.Set as S -import Data.Time.Calendar.Month (pattern MonthDay) -import Data.Time.Calendar.OrdinalDate (mondayStartWeek) -import Data.Time.Clock (NominalDiffTime, UTCTime (..)) +import Data.List (find, sortOn) +import Data.Maybe (listToMaybe) +import Data.Time.Clock (NominalDiffTime) import Data.Time.Clock.POSIX (POSIXTime) import Data.Word (Word32) -import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Protocol (RecipientId) import UnliftIO.STM -- A time series of counters with an active head @@ -66,24 +58,3 @@ type WindowData = IntMap Int -- PeerId -> counter window :: BucketId -> BucketId -> SparseSeries -> WindowData window = error "TODO: pick elements inside the range and drop bucket ids" - --- counter -> occurences -type Histogram = IntMap Int - -histogram :: WindowData -> Histogram -histogram = fmap getSum . IM.fromListWith (<>) . map (,1) . IM.elems - -distribution :: Histogram -> Distribution Int -distribution = error "TODO: unroll histogram, sample elements at percentiles" - -data Distribution a = Distribution - { minimal :: a, - bottom50p :: a, - top50p :: a, - top20p :: a, - top10p :: a, - top5p :: a, - top1p :: a, - maximal :: a - } - deriving (Show) From ba7bcef399bbc454faef10ca7f7ce88286423aa3 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Thu, 16 May 2024 21:00:05 +0300 Subject: [PATCH 09/13] write distributions to files --- src/Simplex/Messaging/Server.hs | 36 +++++++++++----- src/Simplex/Messaging/Server/Env/STM.hs | 8 +++- src/Simplex/Messaging/Server/Main.hs | 1 + src/Simplex/Messaging/Server/Stats.hs | 39 +++++++++++------ src/Simplex/Messaging/Server/Stats/Client.hs | 45 ++++++++++++++------ tests/SMPClient.hs | 1 + 6 files changed, 93 insertions(+), 37 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index e0e98ef3d..cf0ffd4bc 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -61,7 +61,7 @@ import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M import Data.Set (Set) import qualified Data.Set as S -import Data.Maybe (isNothing) +import Data.Maybe (isNothing, listToMaybe) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) @@ -96,12 +96,13 @@ import Simplex.Messaging.Transport.Server import Simplex.Messaging.Util import Simplex.Messaging.Version import System.Exit (exitFailure) +import System.FilePath (takeDirectory) import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode) import System.Mem.Weak (deRefWeak) import UnliftIO (timeout) import UnliftIO.Async (mapConcurrently) import UnliftIO.Concurrent -import UnliftIO.Directory (doesFileExist, renameFile) +import UnliftIO.Directory (createDirectoryIfMissing, doesFileExist, renameFile) import UnliftIO.Exception import UnliftIO.IO import UnliftIO.STM @@ -230,9 +231,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do serverStatsThread_ _ = [] rateStatsThread_ :: ServerConfig -> [M ()] - rateStatsThread_ ServerConfig {rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} = - [ monitorServerRates (bucketWidth * 1000000), -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection - logServerRates logStatsStartTime logInterval rateStatsLogFile -- log distributions once in a while + rateStatsThread_ ServerConfig {rateStatsLength = nBuckets, rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} = + [ monitorServerRates nBuckets (bucketWidth * 1000000), -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection + logServerRates logStatsStartTime logInterval rateStatsLogFile -- log current distributions once in a while ] rateStatsThread_ _ = [] @@ -288,17 +289,19 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do ] liftIO $ threadDelay' interval - monitorServerRates :: Int64 -> M () - monitorServerRates bucketWidth = do + monitorServerRates :: Int -> Int64 -> M () + monitorServerRates nBuckets bucketWidth = do labelMyThread "monitorServerRates" stats' <- asks clientStats + rates' <- asks serverRates liftIO . forever $ do -- now <- getCurrentTime -- TODO: calculate delay for the next bucket closing time threadDelay' bucketWidth -- TODO: collect and reset buckets stats <- readTVarIO stats' >>= mapM (CS.readClientStatsData readTVarIO) - logNote . tshow $ fmap (distribution . histogram) $ collect stats + let !rates = distribution . histogram <$> collect stats + atomically . modifyTVar' rates' $ (rates :) . take nBuckets where collect :: IntMap CS.ClientStatsData -> CS.ClientStatsC (IntMap Int) collect stats = IM.foldlWithKey' toColumns (CS.clientStatsC IM.empty) stats @@ -324,13 +327,26 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do logServerRates :: Int64 -> Int64 -> FilePath -> M () logServerRates startAt logInterval statsFilePath = do labelMyThread "logServerStats" + liftIO . unlessM (doesFileExist statsFilePath) $ do + createDirectoryIfMissing True (takeDirectory statsFilePath) + B.writeFile statsFilePath $ B.intercalate "," csvLabels <> "\n" initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server rates log enabled: " <> statsFilePath liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) let interval = 1000000 * logInterval - forever $ do + rates' <- asks serverRates + liftIO . forever $ do -- write the thing - liftIO $ threadDelay' interval + threadDelay' interval + rates <- readTVarIO rates' + forM_ (listToMaybe rates) $ \cs -> do + ts <- getCurrentTime + let values = concatMap (concatMap $ pure . maybe "0" bshow) cs + withFile statsFilePath AppendMode $ \h -> liftIO $ do + hSetBuffering h LineBuffering + B.hPut h $ B.intercalate "," (strEncode ts : values) <> "\n" + where + csvLabels = "ts" : concatMap (\s -> concatMap (\d -> [s <> "." <> d]) distributionLabels) CS.clientStatsLabels runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M () runClient signKey tp h = do diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 2f02d78c5..c33b3abd8 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -32,7 +32,7 @@ import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..)) import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.Stats -import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsId) +import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsC, ClientStatsId) import Simplex.Messaging.Server.Stats.Timeline (Timeline, newTimeline, perMinute) import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) @@ -76,6 +76,8 @@ data ServerConfig = ServerConfig serverStatsBackupFile :: Maybe FilePath, -- | rate limit monitoring interval / bucket width, seconds rateStatsInterval :: Maybe Int64, + -- | number of rate limit samples to keep + rateStatsLength :: Int, rateStatsLogFile :: FilePath, rateStatsBackupFile :: Maybe FilePath, -- | CA certificate private key is not needed for initialization @@ -124,6 +126,7 @@ data Env = Env clientStats :: TVar (IntMap ClientStats), -- transitive session stats statsClients :: TVar (IntMap ClientStatsId), -- reverse index from sockets sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their senders + serverRates :: TVar [ClientStatsC (Distribution (Maybe Int))], -- current (head) + historical distributions extracted from clientStats for logging and assessing ClientStatsData deviations sockets :: SocketState, clientSeq :: TVar ClientId, clients :: TVar (IntMap Client), @@ -219,7 +222,8 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile, clientStats <- newTVarIO mempty statsClients <- newTVarIO mempty sendSignedClients <- newTVarIO mempty - return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients, proxyAgent, qCreatedByIp, msgSentByIp, clientStats, statsClients, sendSignedClients} + serverRates <- newTVarIO mempty + return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients, proxyAgent, qCreatedByIp, msgSentByIp, clientStats, statsClients, sendSignedClients, serverRates} where restoreQueues :: QueueStore -> FilePath -> IO (StoreLog 'WriteMode) restoreQueues QueueStore {queues, senders, notifiers} f = do diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 5ebb82ebb..2e4aba88a 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -225,6 +225,7 @@ smpServerCLI cfgPath logPath = serverStatsLogFile = combine logPath "smp-server-stats.daily.log", serverStatsBackupFile = logStats $> combine logPath "smp-server-stats.log", rateStatsInterval = Just 60, -- TODO: add to options + rateStatsLength = 0, -- Just (24 * 60), -- TODO: add to options rateStatsLogFile = combine logPath "smp-server-rates.daily.log", rateStatsBackupFile = Just $ combine logPath "smp-server-rates.log", smpServerVRange = supportedServerSMPRelayVRange, diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index ece54cfcf..d6db73188 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -1,5 +1,5 @@ -{-# LANGUAGE DeriveFoldable #-} {-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveTraversable #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} @@ -19,6 +19,7 @@ import Data.List (find) import Data.Maybe (listToMaybe) import Data.Set (Set) import qualified Data.Set as S +import Data.String (IsString) import Data.Time.Calendar.Month (pattern MonthDay) import Data.Time.Calendar.OrdinalDate (mondayStartWeek) import Data.Time.Clock (UTCTime (..)) @@ -248,16 +249,17 @@ histogram = Histogram . IM.fromListWith (+) . map (,1) . toList {-# INLINE histogram #-} distribution :: Histogram -> Distribution (Maybe Int) -distribution h = Distribution - { minimal = fst <$> listToMaybe cdf', - bottom50p = bot 0.5, -- std median - top50p = top 0.5, - top20p = top 0.2, - top10p = top 0.1, - top5p = top 0.05, - top1p = top 0.01, - maximal = fst <$> listToMaybe rcdf' - } +distribution h = + Distribution + { minimal = fst <$> listToMaybe cdf', + bottom50p = bot 0.5, -- std median + top50p = top 0.5, + top20p = top 0.2, + top10p = top 0.1, + top5p = top 0.05, + top1p = top 0.01, + maximal = fst <$> listToMaybe rcdf' + } where bot p = fmap fst $ find (\(_, p') -> p' >= p) cdf' top p = fmap fst $ find (\(_, p') -> p' <= 1 - p) rcdf' @@ -281,4 +283,17 @@ data Distribution a = Distribution top1p :: a, maximal :: a } - deriving (Show, Functor, Foldable) + deriving (Show, Functor, Foldable, Traversable) + +distributionLabels :: IsString a => Distribution a +distributionLabels = + Distribution + { minimal = "minimal", + bottom50p = "bottom50p", + top50p = "top50p", + top20p = "top20p", + top10p = "top10p", + top5p = "top5p", + top1p = "top1p", + maximal = "maximal" + } diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs index 82c86f326..a45984011 100644 --- a/src/Simplex/Messaging/Server/Stats/Client.hs +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -1,4 +1,5 @@ {-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveTraversable #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} @@ -12,6 +13,7 @@ module Simplex.Messaging.Server.Stats.Client where import Data.IntSet (IntSet) import qualified Data.IntSet as IS import Data.Set (Set) +import Data.String (IsString) import Data.Time.Clock (UTCTime (..)) import Simplex.Messaging.Protocol (RecipientId) import Simplex.Messaging.Transport (PeerId) @@ -158,19 +160,36 @@ data ClientStatsC a = ClientStatsC proxyRelaysConnectedC :: a, msgSentViaProxyC :: a } - deriving (Show, Functor) + deriving (Show, Functor, Foldable, Traversable) clientStatsC :: a -> ClientStatsC a -clientStatsC x = ClientStatsC - { peerAddressesC = x, - socketCountC = x, - qCreatedC = x, - qSentSignedC = x, - msgSentSignedC = x, - msgSentUnsignedC = x, - msgDeliveredSignedC = x, - proxyRelaysRequestedC = x, - proxyRelaysConnectedC = x, - msgSentViaProxyC = x - } +clientStatsC x = + ClientStatsC + { peerAddressesC = x, + socketCountC = x, + qCreatedC = x, + qSentSignedC = x, + msgSentSignedC = x, + msgSentUnsignedC = x, + msgDeliveredSignedC = x, + proxyRelaysRequestedC = x, + proxyRelaysConnectedC = x, + msgSentViaProxyC = x + } {-# INLINE clientStatsC #-} + +clientStatsLabels :: IsString a => ClientStatsC a +clientStatsLabels = + ClientStatsC + { peerAddressesC = "peerAddresses", + socketCountC = "socketCount", + qCreatedC = "qCreated", + qSentSignedC = "qSentSigned", + msgSentSignedC = "msgSentSigned", + msgSentUnsignedC = "msgSentUnsigned", + msgDeliveredSignedC = "msgDeliveredSigned", + proxyRelaysRequestedC = "proxyRelaysRequested", + proxyRelaysConnectedC = "proxyRelaysConnected", + msgSentViaProxyC = "msgSentViaProxy" + } +{-# INLINE clientStatsLabels #-} diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 0affc4d88..d79ce2e73 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -113,6 +113,7 @@ cfg = serverStatsLogFile = "tests/smp-server-stats.daily.log", serverStatsBackupFile = Nothing, rateStatsInterval = Nothing, + rateStatsLength = 0, rateStatsLogFile = "", rateStatsBackupFile = Nothing, caCertificateFile = "tests/fixtures/ca.crt", From da660be6e7c6e51fd52e8f67d29080abb9d11140 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Tue, 21 May 2024 12:16:14 +0300 Subject: [PATCH 10/13] fix lint --- src/Simplex/Messaging/Server.hs | 2 +- src/Simplex/Messaging/Server/Stats.hs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 868ca3c80..6ad1c0aa8 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -321,7 +321,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do atomically . modifyTVar' rates' $ (rates :) . take nBuckets where collect :: IntMap CS.ClientStatsData -> CS.ClientStatsC (IntMap Int) - collect stats = IM.foldlWithKey' toColumns (CS.clientStatsC IM.empty) stats + collect = IM.foldlWithKey' toColumns (CS.clientStatsC IM.empty) where toColumns acc statsId csd = CS.ClientStatsC diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index d92c58693..758964829 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -380,8 +380,8 @@ distribution h = maximal = fst <$> listToMaybe rcdf' } where - bot p = fmap fst $ find (\(_, p') -> p' >= p) cdf' - top p = fmap fst $ find (\(_, p') -> p' <= 1 - p) rcdf' + bot p = fst <$> find (\(_, p') -> p' >= p) cdf' + top p = fst <$> find (\(_, p') -> p' <= 1 - p) rcdf' cdf' = cdf h rcdf' = reverse cdf' -- allow find to work from the smaller end From f5c2e0f19a69ae28a98232e13a68b08ede3a010e Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Fri, 14 Jun 2024 17:00:13 +0300 Subject: [PATCH 11/13] remove Maybe --- src/Simplex/Messaging/Server.hs | 3 ++- src/Simplex/Messaging/Server/Env/STM.hs | 2 +- src/Simplex/Messaging/Server/Stats.hs | 14 +++++++------- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 84cfd14da..7ec133b91 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -52,6 +52,7 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Either (fromRight, partitionEithers) +import Data.Foldable (toList) import Data.Functor (($>)) import Data.Int (Int64) import Data.IntMap.Strict (IntMap) @@ -369,7 +370,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do rates <- readTVarIO rates' forM_ (listToMaybe rates) $ \cs -> do ts <- getCurrentTime - let values = concatMap (concatMap $ pure . maybe "0" bshow) cs + let values = concatMap (map bshow . toList) cs withFile statsFilePath AppendMode $ \h -> liftIO $ do hSetBuffering h LineBuffering B.hPut h $ B.intercalate "," (strEncode ts : values) <> "\n" diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index aba2d6bef..6c7179161 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -137,7 +137,7 @@ data Env = Env clientStats :: TVar (IntMap ClientStats), -- transitive session stats statsClients :: TVar (IntMap ClientStatsId), -- reverse index from sockets sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their senders - serverRates :: TVar [ClientStatsC (Distribution (Maybe Int))], -- current (head) + historical distributions extracted from clientStats for logging and assessing ClientStatsData deviations + serverRates :: TVar [ClientStatsC (Distribution Int)], -- current (head) + historical distributions extracted from clientStats for logging and assessing ClientStatsData deviations sockets :: SocketState, clientSeq :: TVar ClientId, clients :: TVar (IntMap Client), diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 336e39408..08c2be841 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -129,7 +129,7 @@ getServerStatsData s = do _qDeletedSecured <- readTVar $ qDeletedSecured s _qSub <- readTVar $ qSub s _qSubAuth <- readTVar $ qSubAuth s - _qSubDuplicate <- readTVar $ qSubDuplicate s + _qSubDuplicate <- readTVar $ qSubDuplicate s _qSubProhibited <- readTVar $ qSubProhibited s _msgSent <- readTVar $ msgSent s _msgSentAuth <- readTVar $ msgSentAuth s @@ -159,7 +159,7 @@ setServerStats s d = do writeTVar (qDeletedNew s) $! _qDeletedNew d writeTVar (qDeletedSecured s) $! _qDeletedSecured d writeTVar (qSub s) $! _qSub d - writeTVar (qSubAuth s) $! _qSubAuth d + writeTVar (qSubAuth s) $! _qSubAuth d writeTVar (qSubDuplicate s) $! _qSubDuplicate d writeTVar (qSubProhibited s) $! _qSubProhibited d writeTVar (msgSent s) $! _msgSent d @@ -417,21 +417,21 @@ histogram :: Foldable t => t Int -> Histogram histogram = Histogram . IM.fromListWith (+) . map (,1) . toList {-# INLINE histogram #-} -distribution :: Histogram -> Distribution (Maybe Int) +distribution :: Histogram -> Distribution Int distribution h = Distribution - { minimal = fst <$> listToMaybe cdf', + { minimal = maybe 0 fst $ listToMaybe cdf', bottom50p = bot 0.5, -- std median top50p = top 0.5, top20p = top 0.2, top10p = top 0.1, top5p = top 0.05, top1p = top 0.01, - maximal = fst <$> listToMaybe rcdf' + maximal = maybe 0 fst $ listToMaybe rcdf' } where - bot p = fst <$> find (\(_, p') -> p' >= p) cdf' - top p = fst <$> find (\(_, p') -> p' <= 1 - p) rcdf' + bot p = maybe 0 fst $ find (\(_, p') -> p' >= p) cdf' + top p = maybe 0 fst $ find (\(_, p') -> p' <= 1 - p) rcdf' cdf' = cdf h rcdf' = reverse cdf' -- allow find to work from the smaller end From dcb8e33ccb0c7a23af5a0b683393300c9dbefb66 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Fri, 14 Jun 2024 18:16:31 +0300 Subject: [PATCH 12/13] fix initial delay --- src/Simplex/Messaging/Server.hs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 7ec133b91..605c0f837 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -232,7 +232,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do rateStatsThread_ :: ServerConfig -> [M ()] rateStatsThread_ ServerConfig {rateStatsLength = nBuckets, rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} = - [ monitorServerRates nBuckets (bucketWidth * 1000000), -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection + [ monitorServerRates nBuckets bucketWidth, -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection logServerRates logStatsStartTime logInterval rateStatsLogFile -- log current distributions once in a while ] rateStatsThread_ _ = [] @@ -326,8 +326,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do liftIO . forever $ do -- now <- getCurrentTime -- TODO: calculate delay for the next bucket closing time - threadDelay' bucketWidth - -- TODO: collect and reset buckets + threadDelay' $ bucketWidth * 1000000 stats <- readTVarIO stats' >>= mapM (CS.readClientStatsData readTVarIO) let !rates = distribution . histogram <$> collect stats atomically . modifyTVar' rates' $ (rates :) . take nBuckets @@ -366,7 +365,6 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do rates' <- asks serverRates liftIO . forever $ do -- write the thing - threadDelay' interval rates <- readTVarIO rates' forM_ (listToMaybe rates) $ \cs -> do ts <- getCurrentTime @@ -374,6 +372,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do withFile statsFilePath AppendMode $ \h -> liftIO $ do hSetBuffering h LineBuffering B.hPut h $ B.intercalate "," (strEncode ts : values) <> "\n" + threadDelay' interval where csvLabels = "ts" : concatMap (\s -> concatMap (\d -> [s <> "." <> d]) distributionLabels) CS.clientStatsLabels From 76bd21e086c15f09cb5be466e1b68ee7e69284a0 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Mon, 17 Jun 2024 11:41:11 +0300 Subject: [PATCH 13/13] strict timeline tail --- src/Simplex/Messaging/Server.hs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 605c0f837..9b7433471 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -329,7 +329,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do threadDelay' $ bucketWidth * 1000000 stats <- readTVarIO stats' >>= mapM (CS.readClientStatsData readTVarIO) let !rates = distribution . histogram <$> collect stats - atomically . modifyTVar' rates' $ (rates :) . take nBuckets + atomically . modifyTVar' rates' $ \old -> + let timeline = take nBuckets old + in length timeline `seq` rates : timeline where collect :: IntMap CS.ClientStatsData -> CS.ClientStatsC (IntMap Int) collect = IM.foldlWithKey' toColumns (CS.clientStatsC IM.empty)