Skip to content

Commit

Permalink
WIP resbuf initial
Browse files Browse the repository at this point in the history
  • Loading branch information
turion committed Apr 19, 2024
1 parent 001ef49 commit 1927977
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 80 deletions.
15 changes: 8 additions & 7 deletions rhine/src/FRP/Rhine/Reactimation/ClockErasure.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import FRP.Rhine.Clock.Proxy
import FRP.Rhine.Clock.Util
import FRP.Rhine.ResamplingBuffer
import FRP.Rhine.SN
import Data.Automaton.Result (Result(..))

{- | Run a clocked signal function as a monadic stream function,
accepting the timestamps and tags as explicit inputs.
Expand Down Expand Up @@ -96,25 +97,25 @@ eraseClockSN initialTime (Precompose clsf sn) =
proc (time, tag, aMaybe) -> do
bMaybe <- mapMaybeS $ eraseClockClSF (inProxy proxy) initialTime clsf -< (time,,) <$> inTag proxy tag <*> aMaybe
eraseClockSN initialTime sn -< (time, tag, bMaybe)
eraseClockSN initialTime (Feedback buf0 sn) =
eraseClockSN initialTime (Feedback ResamplingBuffer {buffer, put, get} sn) =
let
proxy = toClockProxy sn
in
feedback buf0 $ proc ((time, tag, aMaybe), buf) -> do
feedback buffer $ proc ((time, tag, aMaybe), buf) -> do
(cMaybe, buf') <- case inTag proxy tag of
Nothing -> do
returnA -< (Nothing, buf)
Just tagIn -> do
timeInfo <- genTimeInfo (inProxy proxy) initialTime -< (time, tagIn)
(c, buf') <- arrM $ uncurry get -< (buf, timeInfo)
Result buf' c <- arrM $ uncurry get -< (timeInfo, buf)
returnA -< (Just c, buf')
bdMaybe <- eraseClockSN initialTime sn -< (time, tag, (,) <$> aMaybe <*> cMaybe)
case (,) <$> outTag proxy tag <*> bdMaybe of
Nothing -> do
returnA -< (Nothing, buf')
Just (tagOut, (b, d)) -> do
timeInfo <- genTimeInfo (outProxy proxy) initialTime -< (time, tagOut)
buf'' <- arrM $ uncurry $ uncurry put -< ((buf', timeInfo), d)
buf'' <- arrM $ uncurry $ uncurry put -< ((timeInfo, d), buf')
returnA -< (Just b, buf'')
eraseClockSN initialTime (FirstResampling sn buf) =
let
Expand Down Expand Up @@ -147,14 +148,14 @@ eraseClockResBuf ::
Time cl1 ->
ResBuf m cl1 cl2 a b ->
MSF m (Either (Time cl1, Tag cl1, a) (Time cl2, Tag cl2)) (Maybe b)
eraseClockResBuf proxy1 proxy2 initialTime resBuf0 = feedback resBuf0 $ proc (input, resBuf) -> do
eraseClockResBuf proxy1 proxy2 initialTime ResamplingBuffer {buffer, put, get} = feedback buffer $ proc (input, resBuf) -> do
case input of
Left (time1, tag1, a) -> do
timeInfo1 <- genTimeInfo proxy1 initialTime -< (time1, tag1)
resBuf' <- arrM (uncurry $ uncurry put) -< ((resBuf, timeInfo1), a)
resBuf' <- arrM (uncurry $ uncurry put) -< ((timeInfo1, a), resBuf)
returnA -< (Nothing, resBuf')
Right (time2, tag2) -> do
timeInfo2 <- genTimeInfo proxy2 initialTime -< (time2, tag2)
(b, resBuf') <- arrM (uncurry get) -< (resBuf, timeInfo2)
Result resBuf' b <- arrM (uncurry get) -< (timeInfo2, resBuf)
returnA -< (Just b, resBuf')
{-# INLINE eraseClockResBuf #-}
24 changes: 14 additions & 10 deletions rhine/src/FRP/Rhine/ResamplingBuffer.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ExistentialQuantification #-}

{- |
This module introduces 'ResamplingBuffer's,
Expand All @@ -15,10 +16,8 @@ module FRP.Rhine.ResamplingBuffer (
)
where

-- base
import Control.Arrow

-- rhine
import Data.Automaton.Result
import FRP.Rhine.Clock

-- A quick note on naming conventions, to whoever cares:
Expand All @@ -39,16 +38,20 @@ or specific to certain clocks.
* 'a': The input type
* 'b': The output type
-}
data ResamplingBuffer m cla clb a b = ResamplingBuffer
{ put ::
data ResamplingBuffer m cla clb a b = forall s . ResamplingBuffer
{ buffer :: s
-- ^ The internal state of the buffer.
, put ::
TimeInfo cla ->
a ->
m (ResamplingBuffer m cla clb a b)
s ->
m s
-- ^ Store one input value of type 'a' at a given time stamp,
-- and return a continuation.
, get ::
TimeInfo clb ->
m (b, ResamplingBuffer m cla clb a b)
s ->
m (Result s b)
-- ^ Retrieve one output value of type 'b' at a given time stamp,
-- and a continuation.
}
Expand All @@ -62,8 +65,9 @@ hoistResamplingBuffer ::
(forall c. m1 c -> m2 c) ->
ResamplingBuffer m1 cla clb a b ->
ResamplingBuffer m2 cla clb a b
hoistResamplingBuffer hoist ResamplingBuffer {..} =
hoistResamplingBuffer morph ResamplingBuffer {..} =
ResamplingBuffer
{ put = (((hoistResamplingBuffer hoist <$>) . hoist) .) . put
, get = (second (hoistResamplingBuffer hoist) <$>) . hoist . get
{ put = ((morph .) .) . put
, get = (morph .) . get
, buffer
}
22 changes: 12 additions & 10 deletions rhine/src/FRP/Rhine/ResamplingBuffer/ClSF.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ Collect and process all incoming values statefully and with time stamps.
module FRP.Rhine.ResamplingBuffer.ClSF where

-- transformers
import Control.Monad.Trans.Reader (runReaderT)
import Control.Monad.Trans.Reader (runReaderT, ReaderT)

-- rhine
import Data.Automaton.MSF
import FRP.Rhine.ClSF.Core
import FRP.Rhine.ResamplingBuffer
import Data.Automaton.Optimized (toAutomatonT)
import Data.Automaton
import Data.Automaton.Result (Result(..), mapResultState)

{- | Given a clocked signal function that accepts
a varying number of timestamped inputs (a list),
Expand All @@ -27,16 +30,15 @@ clsfBuffer ::
-- The list will contain the /newest/ element in the head.
ClSF m cl2 [(TimeInfo cl1, a)] b ->
ResamplingBuffer m cl1 cl2 a b
clsfBuffer = clsfBuffer' []
clsfBuffer = clsfBuffer' . toAutomatonT . getMSF
where
clsfBuffer' ::
(Monad m) =>
[(TimeInfo cl1, a)] ->
ClSF m cl2 [(TimeInfo cl1, a)] b ->
AutomatonT (ReaderT [(TimeInfo cl1, a)] (ReaderT(TimeInfo cl2) m)) b ->
ResamplingBuffer m cl1 cl2 a b
clsfBuffer' as msf = ResamplingBuffer {..}
where
put ti1 a = return $ clsfBuffer' ((ti1, a) : as) msf
get ti2 = do
StrictTuple b msf' <- runReaderT (stepMSF msf as) ti2
return (b, clsfBuffer msf')
clsfBuffer' AutomatonT {state, step} = ResamplingBuffer
{ buffer = (state, [])
,
put = \ti1 a (s, as) -> return (s, (ti1, a) : as)
, get = \ti2 (s, as) -> mapResultState (, []) <$> runReaderT (runReaderT (step s) as) ti2
}
9 changes: 5 additions & 4 deletions rhine/src/FRP/Rhine/ResamplingBuffer/Collect.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import Data.Sequence
-- rhine
import FRP.Rhine.ResamplingBuffer
import FRP.Rhine.ResamplingBuffer.Timeless
import Data.Automaton.Result (Result(..))

{- | Collects all input in a list, with the newest element at the head,
which is returned and emptied upon `get`.
Expand All @@ -21,7 +22,7 @@ collect :: (Monad m) => ResamplingBuffer m cl1 cl2 a [a]
collect = timelessResamplingBuffer AsyncMealy {..} []
where
amPut as a = return $ a : as
amGet as = return (as, [])
amGet as = return $! Result [] as

{- | Reimplementation of 'collect' with sequences,
which gives a performance benefit if the sequence needs to be reversed or searched.
Expand All @@ -30,7 +31,7 @@ collectSequence :: (Monad m) => ResamplingBuffer m cl1 cl2 a (Seq a)
collectSequence = timelessResamplingBuffer AsyncMealy {..} empty
where
amPut as a = return $ a <| as
amGet as = return (as, empty)
amGet as = return $! Result empty as

{- | 'pureBuffer' collects all input values lazily in a list
and processes it when output is required.
Expand All @@ -41,7 +42,7 @@ pureBuffer :: (Monad m) => ([a] -> b) -> ResamplingBuffer m cl1 cl2 a b
pureBuffer f = timelessResamplingBuffer AsyncMealy {..} []
where
amPut as a = return (a : as)
amGet as = return (f as, [])
amGet as = return $! Result [] $! f as

-- TODO Test whether strictness works here, or consider using deepSeq

Expand All @@ -58,4 +59,4 @@ foldBuffer ::
foldBuffer f = timelessResamplingBuffer AsyncMealy {..}
where
amPut b a = let !b' = f a b in return b'
amGet b = return (b, b)
amGet b = return $! Result b b
13 changes: 7 additions & 6 deletions rhine/src/FRP/Rhine/ResamplingBuffer/FIFO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import Data.Sequence
-- rhine
import FRP.Rhine.ResamplingBuffer
import FRP.Rhine.ResamplingBuffer.Timeless
import Data.Automaton.Result (Result(..))

-- * FIFO (first-in-first-out) buffers

Expand All @@ -25,8 +26,8 @@ fifoUnbounded = timelessResamplingBuffer AsyncMealy {..} empty
where
amPut as a = return $ a <| as
amGet as = case viewr as of
EmptyR -> return (Nothing, empty)
as' :> a -> return (Just a, as')
EmptyR -> return $! Result empty Nothing
as' :> a -> return $! Result as' $! Just a

{- | A bounded FIFO buffer that forgets the oldest values when the size is above a given threshold.
If the buffer is empty, it will return 'Nothing'.
Expand All @@ -36,14 +37,14 @@ fifoBounded threshold = timelessResamplingBuffer AsyncMealy {..} empty
where
amPut as a = return $ take threshold $ a <| as
amGet as = case viewr as of
EmptyR -> return (Nothing, empty)
as' :> a -> return (Just a, as')
EmptyR -> return $! Result empty Nothing
as' :> a -> return $! Result as' (Just a)

-- | An unbounded FIFO buffer that also returns its current size.
fifoWatch :: (Monad m) => ResamplingBuffer m cl1 cl2 a (Maybe a, Int)
fifoWatch = timelessResamplingBuffer AsyncMealy {..} empty
where
amPut as a = return $ a <| as
amGet as = case viewr as of
EmptyR -> return ((Nothing, 0), empty)
as' :> a -> return ((Just a, length as'), as')
EmptyR -> return $!Result empty (Nothing, 0)
as' :> a -> return $!Result as' (Just a, length as')
3 changes: 2 additions & 1 deletion rhine/src/FRP/Rhine/ResamplingBuffer/KeepLast.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module FRP.Rhine.ResamplingBuffer.KeepLast where

import FRP.Rhine.ResamplingBuffer
import FRP.Rhine.ResamplingBuffer.Timeless
import Data.Automaton.Result (Result(..))

{- | Always keeps the last input value,
or in case of no input an initialisation value.
Expand All @@ -16,5 +17,5 @@ import FRP.Rhine.ResamplingBuffer.Timeless
keepLast :: (Monad m) => a -> ResamplingBuffer m cl1 cl2 a a
keepLast = timelessResamplingBuffer AsyncMealy {..}
where
amGet a = return (a, a)
amGet a = return $! Result a a
amPut _ = return
13 changes: 7 additions & 6 deletions rhine/src/FRP/Rhine/ResamplingBuffer/LIFO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import Data.Sequence
-- rhine
import FRP.Rhine.ResamplingBuffer
import FRP.Rhine.ResamplingBuffer.Timeless
import Data.Automaton.Result (Result(..))

-- * LIFO (last-in-first-out) buffers

Expand All @@ -25,8 +26,8 @@ lifoUnbounded = timelessResamplingBuffer AsyncMealy {..} empty
where
amPut as a = return $ a <| as
amGet as = case viewl as of
EmptyL -> return (Nothing, empty)
a :< as' -> return (Just a, as')
EmptyL -> return $! Result empty Nothing
a :< as' -> return $! Result as' (Just a)

{- | A bounded LIFO buffer that forgets the oldest values when the size is above a given threshold.
If the buffer is empty, it will return 'Nothing'.
Expand All @@ -36,14 +37,14 @@ lifoBounded threshold = timelessResamplingBuffer AsyncMealy {..} empty
where
amPut as a = return $ take threshold $ a <| as
amGet as = case viewl as of
EmptyL -> return (Nothing, empty)
a :< as' -> return (Just a, as')
EmptyL -> return $!Result empty Nothing
a :< as' -> return $!Result as' (Just a)

-- | An unbounded LIFO buffer that also returns its current size.
lifoWatch :: (Monad m) => ResamplingBuffer m cl1 cl2 a (Maybe a, Int)
lifoWatch = timelessResamplingBuffer AsyncMealy {..} empty
where
amPut as a = return $ a <| as
amGet as = case viewl as of
EmptyL -> return ((Nothing, 0), empty)
a :< as' -> return ((Just a, length as'), as')
EmptyL -> return $! Result empty (Nothing, 0)
a :< as' -> return $! Result as' (Just a, length as')
21 changes: 8 additions & 13 deletions rhine/src/FRP/Rhine/ResamplingBuffer/Timeless.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ These are used in many other modules implementing 'ResamplingBuffer's.
module FRP.Rhine.ResamplingBuffer.Timeless where

import FRP.Rhine.ResamplingBuffer
import Data.Automaton.Result

{- | An asynchronous, effectful Mealy machine description.
(Input and output do not happen simultaneously.)
Expand All @@ -16,7 +17,7 @@ import FRP.Rhine.ResamplingBuffer
data AsyncMealy m s a b = AsyncMealy
{ amPut :: s -> a -> m s
-- ^ Given the previous state and an input value, return the new state.
, amGet :: s -> m (b, s)
, amGet :: s -> m (Result s b)
-- ^ Given the previous state, return an output value and a new state.
}
{- FOURMOLU_ENABLE -}
Expand All @@ -30,28 +31,22 @@ data AsyncMealy m s a b = AsyncMealy
-}
timelessResamplingBuffer ::
(Monad m) =>
AsyncMealy m s a b -> -- The asynchronous Mealy machine from which the buffer is built

-- | The asynchronous Mealy machine from which the buffer is built
AsyncMealy m s a b ->
-- | The initial state
s ->
ResamplingBuffer m cl1 cl2 a b
timelessResamplingBuffer AsyncMealy {..} = go
timelessResamplingBuffer AsyncMealy {..} buffer = ResamplingBuffer {..}
where
go s =
let
put _ a = go <$> amPut s a
get _ = do
(b, s') <- amGet s
return (b, go s')
in
ResamplingBuffer {..}
put _ a s = amPut s a
get _ = amGet

-- | A resampling buffer that only accepts and emits units.
trivialResamplingBuffer :: (Monad m) => ResamplingBuffer m cl1 cl2 () ()
trivialResamplingBuffer =
timelessResamplingBuffer
AsyncMealy
{ amPut = const (const (return ()))
, amGet = const (return ((), ()))
, amGet = const (return $! Result () ())
}
()
Loading

0 comments on commit 1927977

Please sign in to comment.