-
Notifications
You must be signed in to change notification settings - Fork 11
/
Sockets.hs
112 lines (97 loc) · 3.8 KB
/
Sockets.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
module Sockets
( newServer
, Broadcaster
) where
import ClassyPrelude
import Control.Concurrent.Lifted
import Control.Concurrent.Suspend (sDelay)
import Control.Concurrent.Timer
import Control.Monad.Trans.Maybe (runMaybeT)
import qualified Data.Aeson as Aeson
import Data.Unique
import Data.UnixTime (UnixTime, getUnixTime, secondsToUnixDiffTime, diffUnixTime)
import qualified Network.HTTP.Types.URI as URI
import qualified Network.WebSockets as WS
import System.IO.Streams.Attoparsec (ParseException)
import Types
type Broadcaster = Post -> IO ()
data Client = Client { clientIdentifier :: Unique
} deriving (Eq, Ord)
data Beat = Beat { beatLastTime :: UnixTime
, beatConnection :: WS.Connection
}
type ServerState = Map Client Beat
newServerState :: ServerState
newServerState = mempty
addClient :: Client -> Beat -> ServerState -> ServerState
addClient = insertMap
removeClient :: Client -> ServerState -> ServerState
removeClient = deleteMap
broadcast :: Aeson.ToJSON a => a -> ServerState -> IO ()
broadcast message state =
forM_ (toList state) send
where
send Beat{beatConnection} =
WS.sendTextData beatConnection (Aeson.encode message)
ping :: WS.Connection -> IO ()
ping = flip WS.sendPing ("ping" :: ByteString)
heartbeatIntervalSeconds :: Int64
heartbeatIntervalSeconds = 20
heartbeat :: MVar ServerState -> IO ()
heartbeat db = modifyMVar_ db $ \state ->
mapFromList <$> filterM predicate (mapToList state)
where
maximumDelta =
secondsToUnixDiffTime (heartbeatIntervalSeconds * 2)
predicate (_, Beat{beatLastTime, beatConnection}) = do
now <- getUnixTime
if diffUnixTime now beatLastTime > maximumDelta then do
WS.sendClose beatConnection ("pong better" :: ByteString)
pure False
else do
ping beatConnection
pure True
newServer :: Aeson.ToJSON a => Chan a -> IO WS.ServerApp
newServer chan = do
state <- newMVar newServerState
_ <- repeatedTimer (heartbeat state) (sDelay heartbeatIntervalSeconds)
_ <- fork $ getChanContents chan >>= mapM_ (makeBroadcast state)
pure (application state)
where
makeBroadcast db post = readMVar db >>= broadcast post
ifAccept :: WS.PendingConnection -> (WS.Connection -> IO ()) -> IO ()
ifAccept pending callback =
case (URI.decodePath . WS.requestPath . WS.pendingRequest) pending of
([], _) -> WS.acceptRequest pending >>= callback
_ -> WS.rejectRequest pending "You can only connect to / right now."
handleMessages :: IO () -> WS.Connection -> IO ()
handleMessages onPong conn = void $ (runMaybeT . forever) $ do
msg <- lift $ WS.receive conn
case msg of
WS.DataMessage _ _ _ _ -> pure ()
WS.ControlMessage cm -> case cm of
WS.Close _ _ -> mzero
WS.Pong _ -> lift onPong
WS.Ping a -> lift (WS.send conn (WS.ControlMessage (WS.Pong a)))
application_ :: MVar ServerState -> WS.ServerApp
application_ db pending = ifAccept pending $ \conn -> do
clientIdentifier <- newUnique
let client = Client{clientIdentifier}
(`finally` disconnect client) $ do
setTime client conn
handleMessages (setTime client conn) conn
where
withState = modifyMVar_ db
setTime client conn = withState $ \state -> do
beatLastTime <- getUnixTime
let beat = Beat{beatLastTime, beatConnection = conn}
pure $ addClient client beat state
disconnect client = withState (pure . removeClient client)
application :: MVar ServerState -> WS.ServerApp
application db pending =
(handle connectionExceptions . handle parseExceptions) (application_ db pending)
where
parseExceptions =
const $ throwM WS.ConnectionClosed :: ParseException -> IO ()
connectionExceptions =
const $ pure () :: WS.ConnectionException -> IO ()