Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: command rate monitoring #1143

Draft
wants to merge 18 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dependencies:
- direct-sqlcipher == 2.3.*
- directory == 1.3.*
- filepath == 1.4.*
- hashable == 1.4.*
- hourglass == 0.2.*
- http-types == 0.12.*
- http2 >= 4.2.2 && < 4.3
Expand All @@ -61,6 +62,7 @@ dependencies:
- network-udp >= 0.0 && < 0.1
- optparse-applicative >= 0.15 && < 0.17
- process == 1.6.*
- psqueues == 0.2.8.*
- random >= 1.1 && < 1.3
- simple-logger == 0.1.*
- socks == 0.6.*
Expand Down
14 changes: 14 additions & 0 deletions simplexmq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ library
Simplex.Messaging.Server.QueueStore.QueueInfo
Simplex.Messaging.Server.QueueStore.STM
Simplex.Messaging.Server.Stats
Simplex.Messaging.Server.Stats.Client
Simplex.Messaging.Server.Stats.Timeline
Simplex.Messaging.Server.StoreLog
Simplex.Messaging.ServiceScheme
Simplex.Messaging.Session
Expand Down Expand Up @@ -233,6 +235,7 @@ library
, direct-sqlcipher ==2.3.*
, directory ==1.3.*
, filepath ==1.4.*
, hashable ==1.4.*
, hourglass ==0.2.*
, http-types ==0.12.*
, http2 >=4.2.2 && <4.3
Expand All @@ -247,6 +250,7 @@ library
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues ==0.2.8.*
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, socks ==0.6.*
Expand Down Expand Up @@ -307,6 +311,7 @@ executable ntf-server
, direct-sqlcipher ==2.3.*
, directory ==1.3.*
, filepath ==1.4.*
, hashable ==1.4.*
, hourglass ==0.2.*
, http-types ==0.12.*
, http2 >=4.2.2 && <4.3
Expand All @@ -321,6 +326,7 @@ executable ntf-server
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues ==0.2.8.*
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, simplexmq
Expand Down Expand Up @@ -386,6 +392,7 @@ executable smp-server
, directory ==1.3.*
, file-embed
, filepath ==1.4.*
, hashable ==1.4.*
, hourglass ==0.2.*
, http-types ==0.12.*
, http2 >=4.2.2 && <4.3
Expand All @@ -400,6 +407,7 @@ executable smp-server
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues ==0.2.8.*
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, simplexmq
Expand Down Expand Up @@ -464,6 +472,7 @@ executable xftp
, direct-sqlcipher ==2.3.*
, directory ==1.3.*
, filepath ==1.4.*
, hashable ==1.4.*
, hourglass ==0.2.*
, http-types ==0.12.*
, http2 >=4.2.2 && <4.3
Expand All @@ -478,6 +487,7 @@ executable xftp
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues ==0.2.8.*
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, simplexmq
Expand Down Expand Up @@ -539,6 +549,7 @@ executable xftp-server
, direct-sqlcipher ==2.3.*
, directory ==1.3.*
, filepath ==1.4.*
, hashable ==1.4.*
, hourglass ==0.2.*
, http-types ==0.12.*
, http2 >=4.2.2 && <4.3
Expand All @@ -553,6 +564,7 @@ executable xftp-server
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues ==0.2.8.*
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, simplexmq
Expand Down Expand Up @@ -650,6 +662,7 @@ test-suite simplexmq-test
, directory ==1.3.*
, filepath ==1.4.*
, generic-random ==1.5.*
, hashable ==1.4.*
, hourglass ==0.2.*
, hspec ==2.11.*
, hspec-core ==2.11.*
Expand All @@ -667,6 +680,7 @@ test-suite simplexmq-test
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues ==0.2.8.*
, random >=1.1 && <1.3
, silently ==1.2.*
, simple-logger ==0.1.*
Expand Down
190 changes: 181 additions & 9 deletions src/Simplex/Messaging/Server.hs

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/Simplex/Messaging/Server/Control.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ data ControlProtocol
| CPResume
| CPClients
| CPStats
| CPStatsClients
| CPStatsRTS
| CPThreads
| CPSockets
Expand All @@ -33,6 +34,7 @@ instance StrEncoding ControlProtocol where
CPResume -> "resume"
CPClients -> "clients"
CPStats -> "stats"
CPStatsClients -> "stats-clients"
CPStatsRTS -> "stats-rts"
CPThreads -> "threads"
CPSockets -> "sockets"
Expand All @@ -49,6 +51,7 @@ instance StrEncoding ControlProtocol where
"resume" -> pure CPResume
"clients" -> pure CPClients
"stats" -> pure CPStats
"stats-clients" -> pure CPStatsClients
"stats-rts" -> pure CPStatsRTS
"threads" -> pure CPThreads
"sockets" -> pure CPSockets
Expand Down
33 changes: 28 additions & 5 deletions src/Simplex/Messaging/Server/Env/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (isJust, isNothing)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Data.Time.Clock.System (SystemTime)
import Data.X509.Validation (Fingerprint (..))
import Network.Socket (ServiceName)
Expand All @@ -34,10 +35,12 @@ import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..))
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.Stats
import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsC, ClientStatsId)
import Simplex.Messaging.Server.Stats.Timeline (Timeline, newTimeline, perMinute)
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport, VersionRangeSMP, VersionSMP)
import Simplex.Messaging.Transport (ATransport, PeerId, VersionRangeSMP, VersionSMP)
import Simplex.Messaging.Transport.Server (SocketState, TransportServerConfig, alpn, loadFingerprint, loadTLSServerParams, newSocketState)
import System.IO (IOMode (..))
import System.Mem.Weak (Weak)
Expand Down Expand Up @@ -74,6 +77,12 @@ data ServerConfig = ServerConfig
serverStatsLogFile :: FilePath,
-- | file to save and restore stats
serverStatsBackupFile :: Maybe FilePath,
-- | rate limit monitoring interval / bucket width, seconds
rateStatsInterval :: Maybe Int64,
-- | number of rate limit samples to keep
rateStatsLength :: Int,
rateStatsLogFile :: FilePath,
rateStatsBackupFile :: Maybe FilePath,
-- | CA certificate private key is not needed for initialization
caCertificateFile :: FilePath,
privateKeyFile :: FilePath,
Expand Down Expand Up @@ -123,6 +132,12 @@ data Env = Env
storeLog :: Maybe (StoreLog 'WriteMode),
tlsServerParams :: T.ServerParams,
serverStats :: ServerStats,
qCreatedByIp :: Timeline Int,
msgSentByIp :: Timeline Int,
clientStats :: TVar (IntMap ClientStats), -- transitive session stats
statsClients :: TVar (IntMap ClientStatsId), -- reverse index from sockets
sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their senders
serverRates :: TVar [ClientStatsC (Distribution Int)], -- current (head) + historical distributions extracted from clientStats for logging and assessing ClientStatsData deviations
sockets :: SocketState,
clientSeq :: TVar ClientId,
clients :: TVar (IntMap Client),
Expand All @@ -145,6 +160,7 @@ type ClientId = Int

data Client = Client
{ clientId :: ClientId,
peerId :: PeerId, -- send updates for this Id to time series
subscriptions :: TMap RecipientId (TVar Sub),
ntfSubscriptions :: TMap NotifierId (),
rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)),
Expand Down Expand Up @@ -177,8 +193,8 @@ newServer = do
savingLock <- createLock
return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, savingLock}

newClient :: TVar ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> STM Client
newClient nextClientId qSize thVersion sessionId createdAt = do
newClient :: PeerId -> TVar ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> STM Client
newClient peerId nextClientId qSize thVersion sessionId createdAt = do
clientId <- stateTVar nextClientId $ \next -> (next, next + 1)
subscriptions <- TM.empty
ntfSubscriptions <- TM.empty
Expand All @@ -191,7 +207,7 @@ newClient nextClientId qSize thVersion sessionId createdAt = do
connected <- newTVar True
rcvActiveAt <- newTVar createdAt
sndActiveAt <- newTVar createdAt
return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, procThreads, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt}
return Client {peerId, clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, procThreads, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt}

newSubscription :: SubscriptionThread -> STM Sub
newSubscription subThread = do
Expand All @@ -213,7 +229,14 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile,
clientSeq <- newTVarIO 0
clients <- newTVarIO mempty
proxyAgent <- atomically $ newSMPProxyAgent smpAgentCfg random
pure Env {config, serverInfo, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients, proxyAgent}
now <- getPOSIXTime
qCreatedByIp <- atomically $ newTimeline perMinute now
msgSentByIp <- atomically $ newTimeline perMinute now
clientStats <- newTVarIO mempty
statsClients <- newTVarIO mempty
sendSignedClients <- newTVarIO mempty
serverRates <- newTVarIO mempty
return Env {config, serverInfo, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients, proxyAgent, qCreatedByIp, msgSentByIp, clientStats, statsClients, sendSignedClients, serverRates}
where
restoreQueues :: QueueStore -> FilePath -> IO (StoreLog 'WriteMode)
restoreQueues QueueStore {queues, senders, notifiers} f = do
Expand Down
4 changes: 4 additions & 0 deletions src/Simplex/Messaging/Server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ smpServerCLI_ generateSite serveStaticFiles cfgPath logPath =
logStatsStartTime = 0, -- seconds from 00:00 UTC
serverStatsLogFile = combine logPath "smp-server-stats.daily.log",
serverStatsBackupFile = logStats $> combine logPath "smp-server-stats.log",
rateStatsInterval = Just 60, -- TODO: add to options
rateStatsLength = 0, -- Just (24 * 60), -- TODO: add to options
rateStatsLogFile = combine logPath "smp-server-rates.daily.log",
rateStatsBackupFile = Just $ combine logPath "smp-server-rates.log",
smpServerVRange = supportedServerSMPRelayVRange,
transportConfig =
defaultTransportServerConfig
Expand Down
70 changes: 68 additions & 2 deletions src/Simplex/Messaging/Server/Stats.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE DeriveTraversable #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
Expand All @@ -10,8 +12,14 @@ module Simplex.Messaging.Server.Stats where
import Control.Applicative (optional, (<|>))
import qualified Data.Attoparsec.ByteString.Char8 as A
import qualified Data.ByteString.Char8 as B
import Data.Foldable (toList)
import Data.IntMap (IntMap)
import qualified Data.IntMap.Strict as IM
import Data.List (find)
import Data.Maybe (listToMaybe)
import Data.Set (Set)
import qualified Data.Set as S
import Data.String (IsString)
import Data.Time.Calendar.Month (pattern MonthDay)
import Data.Time.Calendar.OrdinalDate (mondayStartWeek)
import Data.Time.Clock (UTCTime (..))
Expand Down Expand Up @@ -121,7 +129,7 @@ getServerStatsData s = do
_qDeletedSecured <- readTVar $ qDeletedSecured s
_qSub <- readTVar $ qSub s
_qSubAuth <- readTVar $ qSubAuth s
_qSubDuplicate <- readTVar $ qSubDuplicate s
_qSubDuplicate <- readTVar $ qSubDuplicate s
_qSubProhibited <- readTVar $ qSubProhibited s
_msgSent <- readTVar $ msgSent s
_msgSentAuth <- readTVar $ msgSentAuth s
Expand Down Expand Up @@ -151,7 +159,7 @@ setServerStats s d = do
writeTVar (qDeletedNew s) $! _qDeletedNew d
writeTVar (qDeletedSecured s) $! _qDeletedSecured d
writeTVar (qSub s) $! _qSub d
writeTVar (qSubAuth s) $! _qSubAuth d
writeTVar (qSubAuth s) $! _qSubAuth d
writeTVar (qSubDuplicate s) $! _qSubDuplicate d
writeTVar (qSubProhibited s) $! _qSubProhibited d
writeTVar (msgSent s) $! _msgSent d
Expand Down Expand Up @@ -400,3 +408,61 @@ instance StrEncoding ProxyStatsData where
_pErrorsCompat <- "errorsCompat=" *> strP <* A.endOfLine
_pErrorsOther <- "errorsOther=" *> strP
pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther}

-- | Frequency table keyed by counter value: each key is an observed counter
-- value and the associated element is the number of occurrences of that value.
newtype Histogram = Histogram (IntMap Int)
  deriving (Show)

-- | Build a 'Histogram' from any foldable collection of counter values.
-- Every element contributes one occurrence to the bucket keyed by its value;
-- repeated values are accumulated with @(+)@.
histogram :: Foldable t => t Int -> Histogram
histogram values = Histogram $ IM.fromListWith (+) [(v, 1) | v <- toList values]
{-# INLINE histogram #-}

-- | Summarise a 'Histogram' as a 'Distribution' of threshold values.
--
-- The thresholds are read off the cumulative distribution produced by 'cdf':
--
-- * 'minimal' / 'maximal' are the first and last bucket values;
-- * 'bottom50p' is the first value (ascending) whose cumulative share is at
--   least 0.5;
-- * @topNp@ is the first value (descending) whose cumulative share is at most
--   @1 - N/100@.
--
-- Every field falls back to 0 when no bucket qualifies, including for an
-- empty histogram.
-- NOTE(review): for the @top*@ fields that fallback can be smaller than
-- 'minimal' (e.g. a histogram with a single bucket) -- confirm this is intended.
distribution :: Histogram -> Distribution Int
distribution h =
  Distribution
    { minimal = edge ascending,
      bottom50p = reaching 0.5, -- std median
      top50p = leaving 0.5,
      top20p = leaving 0.2,
      top10p = leaving 0.1,
      top5p = leaving 0.05,
      top1p = leaving 0.01,
      maximal = edge descending
    }
  where
    -- cumulative shares ordered by ascending bucket value
    ascending = cdf h
    -- same pairs, largest bucket value first, so that find scans from the top
    descending = reverse ascending
    -- value of the first bucket of a list, if there is one
    edge buckets = valueOrZero $ listToMaybe buckets
    -- first value (ascending) whose cumulative share is at least the given share
    reaching share = valueOrZero $ find ((>= share) . snd) ascending
    -- first value (descending) whose cumulative share leaves at least the given share above it
    leaving share = valueOrZero $ find ((<= 1 - share) . snd) descending
    valueOrZero found = maybe 0 fst found

-- | Cumulative distribution of a 'Histogram'.
--
-- Returns one pair per bucket, in ascending order of bucket value; the second
-- component is the share of all occurrences that fall into this bucket or any
-- smaller one. An empty histogram yields an empty list.
cdf :: Histogram -> [(Int, Float)]
cdf (Histogram counts) = zipWith share (IM.keys counts) runningCounts
  where
    -- occurrences accumulated up to and including each bucket
    runningCounts = scanl1 (+) (IM.elems counts)
    share v upTo = (v, fromIntegral upTo / grandTotal)
    grandTotal :: Float
    grandTotal = fromIntegral (sum counts)

-- | Fixed set of summary slots describing a distribution.
--
-- The type parameter lets the same shape carry different payloads: the
-- 'distribution' function fills it with 'Int' thresholds, while
-- 'distributionLabels' fills it with the textual name of each slot.
-- Field order matters: it determines the order of the derived 'Foldable' and
-- 'Traversable' instances.
data Distribution a = Distribution
  { -- | smallest bucket (as filled by 'distribution')
    minimal :: a,
    -- | first bucket, ascending, whose cumulative share reaches 50%
    bottom50p :: a,
    -- | first bucket, descending, whose cumulative share is at most 50%
    top50p :: a,
    -- | first bucket, descending, whose cumulative share is at most 80%
    top20p :: a,
    -- | first bucket, descending, whose cumulative share is at most 90%
    top10p :: a,
    -- | first bucket, descending, whose cumulative share is at most 95%
    top5p :: a,
    -- | first bucket, descending, whose cumulative share is at most 99%
    top1p :: a,
    -- | largest bucket (as filled by 'distribution')
    maximal :: a
  }
  deriving (Show, Functor, Foldable, Traversable)

-- | A 'Distribution' whose every slot holds the name of its own field, for
-- any string-like type.
-- Each literal is spelled exactly like the field it is assigned to.
distributionLabels :: IsString a => Distribution a
distributionLabels =
  Distribution
    { minimal = "minimal",
      bottom50p = "bottom50p",
      top50p = "top50p",
      top20p = "top20p",
      top10p = "top10p",
      top5p = "top5p",
      top1p = "top1p",
      maximal = "maximal"
    }
Loading
Loading