Skip to content

Commit

Permalink
WIP: command rate monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
dpwiz committed May 13, 2024
1 parent 4455b8b commit dbc6ae2
Show file tree
Hide file tree
Showing 10 changed files with 207 additions and 21 deletions.
2 changes: 2 additions & 0 deletions package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ dependencies:
- direct-sqlcipher == 2.3.*
- directory == 1.3.*
- filepath == 1.4.*
- hashable == 1.4.*
- hourglass == 0.2.*
- http-types == 0.12.*
- http2 >= 4.2.2 && < 4.3
Expand All @@ -59,6 +60,7 @@ dependencies:
- network-udp >= 0.0 && < 0.1
- optparse-applicative >= 0.15 && < 0.17
- process == 1.6.*
- psqueues == 0.2.8.*
- random >= 1.1 && < 1.3
- simple-logger == 0.1.*
- socks == 0.6.*
Expand Down
14 changes: 14 additions & 0 deletions simplexmq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ library
, direct-sqlcipher ==2.3.*
, directory ==1.3.*
, filepath ==1.4.*
, hashable ==1.4.*
, hourglass ==0.2.*
, http-types ==0.12.*
, http2 >=4.2.2 && <4.3
Expand All @@ -218,6 +219,7 @@ library
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues ==0.2.8.*
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, socks ==0.6.*
Expand Down Expand Up @@ -278,6 +280,7 @@ executable ntf-server
, direct-sqlcipher ==2.3.*
, directory ==1.3.*
, filepath ==1.4.*
, hashable ==1.4.*
, hourglass ==0.2.*
, http-types ==0.12.*
, http2 >=4.2.2 && <4.3
Expand All @@ -292,6 +295,7 @@ executable ntf-server
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues ==0.2.8.*
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, simplexmq
Expand Down Expand Up @@ -353,6 +357,7 @@ executable smp-agent
, direct-sqlcipher ==2.3.*
, directory ==1.3.*
, filepath ==1.4.*
, hashable ==1.4.*
, hourglass ==0.2.*
, http-types ==0.12.*
, http2 >=4.2.2 && <4.3
Expand All @@ -367,6 +372,7 @@ executable smp-agent
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues ==0.2.8.*
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, simplexmq
Expand Down Expand Up @@ -428,6 +434,7 @@ executable smp-server
, direct-sqlcipher ==2.3.*
, directory ==1.3.*
, filepath ==1.4.*
, hashable ==1.4.*
, hourglass ==0.2.*
, http-types ==0.12.*
, http2 >=4.2.2 && <4.3
Expand All @@ -442,6 +449,7 @@ executable smp-server
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues ==0.2.8.*
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, simplexmq
Expand Down Expand Up @@ -503,6 +511,7 @@ executable xftp
, direct-sqlcipher ==2.3.*
, directory ==1.3.*
, filepath ==1.4.*
, hashable ==1.4.*
, hourglass ==0.2.*
, http-types ==0.12.*
, http2 >=4.2.2 && <4.3
Expand All @@ -517,6 +526,7 @@ executable xftp
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues ==0.2.8.*
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, simplexmq
Expand Down Expand Up @@ -578,6 +588,7 @@ executable xftp-server
, direct-sqlcipher ==2.3.*
, directory ==1.3.*
, filepath ==1.4.*
, hashable ==1.4.*
, hourglass ==0.2.*
, http-types ==0.12.*
, http2 >=4.2.2 && <4.3
Expand All @@ -592,6 +603,7 @@ executable xftp-server
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues ==0.2.8.*
, random >=1.1 && <1.3
, simple-logger ==0.1.*
, simplexmq
Expand Down Expand Up @@ -689,6 +701,7 @@ test-suite simplexmq-test
, directory ==1.3.*
, filepath ==1.4.*
, generic-random ==1.5.*
, hashable ==1.4.*
, hourglass ==0.2.*
, hspec ==2.11.*
, hspec-core ==2.11.*
Expand All @@ -706,6 +719,7 @@ test-suite simplexmq-test
, network-udp ==0.0.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, psqueues ==0.2.8.*
, random >=1.1 && <1.3
, silently ==1.2.*
, simple-logger ==0.1.*
Expand Down
46 changes: 43 additions & 3 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
raceAny_
( serverThread s "server subscribedQ" subscribedQ subscribers subscriptions cancelSub
: serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubscriptions (\_ -> pure ())
: map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg
: map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> rateStatsThread_ cfg <> controlPortThread_ cfg
)
`finally` withLock' (savingLock s) "final" (saveServer False)
where
Expand Down Expand Up @@ -205,6 +205,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
[logServerStats logStatsStartTime interval serverStatsLogFile]
serverStatsThread_ _ = []

rateStatsThread_ :: ServerConfig -> [M ()]
rateStatsThread_ ServerConfig {rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} =
[ monitorServerRates bucketWidth, -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection
logServerRates logStatsStartTime logInterval rateStatsLogFile -- log distributions once in a while
]
rateStatsThread_ _ = []

logServerStats :: Int64 -> Int64 -> FilePath -> M ()
logServerStats startAt logInterval statsFilePath = do
labelMyThread "logServerStats"
Expand Down Expand Up @@ -257,6 +264,25 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
]
liftIO $ threadDelay' interval

monitorServerRates :: Int64 -> M ()
monitorServerRates bucketWidth = do
labelMyThread "monitorServerRates"
forever $ do
-- TODO: calculate delay for the next bucket closing time
liftIO $ threadDelay' bucketWidth
-- TODO: collect and reset buckets

logServerRates :: Int64 -> Int64 -> FilePath -> M ()
logServerRates startAt logInterval statsFilePath = do
labelMyThread "logServerStats"
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
let interval = 1000000 * logInterval
forever $ do
-- write the thing
liftIO $ threadDelay' interval

runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M ()
runClient signKey tp h = do
kh <- asks serverIdentity
Expand Down Expand Up @@ -411,13 +437,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
hPutStrLn h "AUTH"

runClientTransport :: Transport c => THandleSMP c 'TServer -> M ()
runClientTransport h@THandle {params = THandleParams {thVersion, sessionId}} = do
runClientTransport h@THandle {connection, params = THandleParams {thVersion, sessionId}} = do
q <- asks $ tbqSize . config
ts <- liftIO getSystemTime
active <- asks clients
nextClientId <- asks clientSeq
c <- atomically $ do
new@Client {clientId} <- newClient nextClientId q thVersion sessionId ts
new@Client {clientId} <- newClient (getPeerId connection) nextClientId q thVersion sessionId ts
modifyTVar' active $ IM.insert clientId new
pure new
s <- asks server
Expand Down Expand Up @@ -643,6 +669,10 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
where
createQueue :: QueueStore -> RcvPublicAuthKey -> RcvPublicDhKey -> SubscriptionMode -> M (Transmission BrokerMsg)
createQueue st recipientKey dhKey subMode = time "NEW" $ do
-- TODO: read client Q rate
-- TODO: read server Q rate for peerId
-- TODO: read global server Q rate
-- TODO: add throttling delay/blackhole request if needed
(rcvPublicDhKey, privDhKey) <- atomically . C.generateKeyPair =<< asks random
let rcvDhSecret = C.dh' dhKey privDhKey
qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey}
Expand Down Expand Up @@ -673,6 +703,9 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
stats <- asks serverStats
atomically $ modifyTVar' (qCreated stats) (+ 1)
atomically $ modifyTVar' (qCount stats) (+ 1)
-- TODO: increment client Q counter
-- TODO: increment current Q counter in IP timeline
-- TODO: increment current Q counter in server timeline
case subMode of
SMOnlyCreate -> pure ()
SMSubscribe -> void $ subscribeQueue qr rId
Expand Down Expand Up @@ -835,6 +868,10 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
case C.maxLenBS msgBody of
Left _ -> pure $ err LARGE_MSG
Right body -> do
-- TODO: read client S rate
-- TODO: read server S rate for peerId
-- TODO: read global server S rate
-- TODO: add throttling delay/blackhole request if needed
msg_ <- time "SEND" $ do
q <- getStoreMsgQueue "SEND" $ recipientId qr
expireMessages q
Expand All @@ -850,6 +887,9 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
atomically $ modifyTVar' (msgSent stats) (+ 1)
atomically $ modifyTVar' (msgCount stats) (+ 1)
atomically $ updatePeriodStats (activeQueues stats) (recipientId qr)
-- TODO: increment client S counter
-- TODO: increment current S counter in IP timeline
-- TODO: increment current S counter in server timeline
pure ok
where
mkMessage :: C.MaxLenBS MaxMessageLen -> M Message
Expand Down
23 changes: 18 additions & 5 deletions src/Simplex/Messaging/Server/Env/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Data.Time.Clock.System (SystemTime)
import Data.X509.Validation (Fingerprint (..))
import Network.Socket (ServiceName)
Expand All @@ -33,7 +34,7 @@ import Simplex.Messaging.Server.Stats
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport, VersionRangeSMP, VersionSMP)
import Simplex.Messaging.Transport (ATransport, PeerId, VersionRangeSMP, VersionSMP)
import Simplex.Messaging.Transport.Server (SocketState, TransportServerConfig, alpn, loadFingerprint, loadTLSServerParams, newSocketState)
import System.IO (IOMode (..))
import System.Mem.Weak (Weak)
Expand Down Expand Up @@ -70,6 +71,10 @@ data ServerConfig = ServerConfig
serverStatsLogFile :: FilePath,
-- | file to save and restore stats
serverStatsBackupFile :: Maybe FilePath,
-- | rate limit monitoring interval / bucket width, seconds
rateStatsInterval :: Maybe Int64,
rateStatsLogFile :: FilePath,
rateStatsBackupFile :: Maybe FilePath,
-- | CA certificate private key is not needed for initialization
caCertificateFile :: FilePath,
privateKeyFile :: FilePath,
Expand Down Expand Up @@ -109,6 +114,8 @@ data Env = Env
storeLog :: Maybe (StoreLog 'WriteMode),
tlsServerParams :: T.ServerParams,
serverStats :: ServerStats,
qCreatedByIp :: Timeline,
msgSentByIp :: Timeline,
sockets :: SocketState,
clientSeq :: TVar Int,
clients :: TVar (IntMap Client)
Expand All @@ -124,6 +131,8 @@ data Server = Server

data Client = Client
{ clientId :: Int,
peerId :: PeerId, -- send updates for this Id to time series
clientStats :: ClientStats, -- capture final values on disconnect
subscriptions :: TMap RecipientId (TVar Sub),
ntfSubscriptions :: TMap NotifierId (),
rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)),
Expand Down Expand Up @@ -155,8 +164,8 @@ newServer = do
savingLock <- createLock
return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, savingLock}

newClient :: TVar Int -> Natural -> VersionSMP -> ByteString -> SystemTime -> STM Client
newClient nextClientId qSize thVersion sessionId createdAt = do
newClient :: PeerId -> TVar Int -> Natural -> VersionSMP -> ByteString -> SystemTime -> STM Client
newClient peerId nextClientId qSize thVersion sessionId createdAt = do
clientId <- stateTVar nextClientId $ \next -> (next, next + 1)
subscriptions <- TM.empty
ntfSubscriptions <- TM.empty
Expand All @@ -168,7 +177,8 @@ newClient nextClientId qSize thVersion sessionId createdAt = do
connected <- newTVar True
rcvActiveAt <- newTVar createdAt
sndActiveAt <- newTVar createdAt
return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt}
clientStats <- ClientStats <$> newTVar 0 <*> newTVar 0
return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, peerId, clientStats}

newSubscription :: SubscriptionThread -> STM Sub
newSubscription subThread = do
Expand All @@ -189,7 +199,10 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile,
sockets <- atomically newSocketState
clientSeq <- newTVarIO 0
clients <- newTVarIO mempty
return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients}
now <- getPOSIXTime
qCreatedByIp <- atomically $ newTimeline perMinute now
msgSentByIp <- atomically $ newTimeline perMinute now
return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, qCreatedByIp, msgSentByIp, sockets, clientSeq, clients}
where
restoreQueues :: QueueStore -> FilePath -> IO (StoreLog 'WriteMode)
restoreQueues QueueStore {queues, senders, notifiers} f = do
Expand Down
3 changes: 3 additions & 0 deletions src/Simplex/Messaging/Server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ smpServerCLI cfgPath logPath =
logStatsStartTime = 0, -- seconds from 00:00 UTC
serverStatsLogFile = combine logPath "smp-server-stats.daily.log",
serverStatsBackupFile = logStats $> combine logPath "smp-server-stats.log",
rateStatsInterval = Just 60, -- TODO: add to options
rateStatsLogFile = combine logPath "smp-server-rates.daily.log",
rateStatsBackupFile = Just $ combine logPath "smp-server-rates.log",
smpServerVRange = supportedServerSMPRelayVRange,
transportConfig =
defaultTransportServerConfig
Expand Down
Loading

0 comments on commit dbc6ae2

Please sign in to comment.