From fd1edd3dfeffc4e1289999533557e41dc65132c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20B=C3=A4renz?= Date: Thu, 18 Apr 2024 21:38:16 +0200 Subject: [PATCH] WIP resbuf initial --- automaton/src/Data/Stream/Internal.hs | 1 + .../FRP/Rhine/Reactimation/ClockErasure.hs | 15 ++--- rhine/src/FRP/Rhine/ResamplingBuffer.hs | 29 +++++---- rhine/src/FRP/Rhine/ResamplingBuffer/ClSF.hs | 25 ++++---- .../src/FRP/Rhine/ResamplingBuffer/Collect.hs | 9 +-- rhine/src/FRP/Rhine/ResamplingBuffer/FIFO.hs | 14 +++-- .../FRP/Rhine/ResamplingBuffer/KeepLast.hs | 3 +- rhine/src/FRP/Rhine/ResamplingBuffer/LIFO.hs | 13 ++-- .../FRP/Rhine/ResamplingBuffer/Timeless.hs | 21 +++---- rhine/src/FRP/Rhine/ResamplingBuffer/Util.hs | 60 +++++++++++-------- 10 files changed, 104 insertions(+), 86 deletions(-) diff --git a/automaton/src/Data/Stream/Internal.hs b/automaton/src/Data/Stream/Internal.hs index 6885da2e1..ca662233d 100644 --- a/automaton/src/Data/Stream/Internal.hs +++ b/automaton/src/Data/Stream/Internal.hs @@ -1,6 +1,7 @@ {-# LANGUAGE RankNTypes #-} {-# LANGUAGE StrictData #-} +-- | Helper functions and types for Data.Stream. You will typically not need them. module Data.Stream.Internal where -- | A strict tuple type diff --git a/rhine/src/FRP/Rhine/Reactimation/ClockErasure.hs b/rhine/src/FRP/Rhine/Reactimation/ClockErasure.hs index 7f9caeafb..846962dbb 100644 --- a/rhine/src/FRP/Rhine/Reactimation/ClockErasure.hs +++ b/rhine/src/FRP/Rhine/Reactimation/ClockErasure.hs @@ -15,6 +15,7 @@ import Control.Monad (join) -- automaton import Data.Automaton.Trans.Reader +import Data.Stream.Result (Result (..)) -- rhine import FRP.Rhine.ClSF hiding (runReaderS) @@ -98,17 +99,17 @@ eraseClockSN initialTime (Precompose clsf sn) = proc (time, tag, aMaybe) -> do bMaybe <- mapMaybeS $ eraseClockClSF (inProxy proxy) initialTime clsf -< (time,,) <$> inTag proxy tag <*> aMaybe eraseClockSN initialTime sn -< (time, tag, bMaybe) -eraseClockSN initialTime (Feedback buf0 sn) = +eraseClockSN initialTime (Feedback ResamplingBuffer {buffer, put, get} sn) = let proxy = toClockProxy sn in - feedback buf0 $ proc ((time, tag, aMaybe), buf) -> do + feedback buffer $ proc ((time, tag, aMaybe), buf) -> do (cMaybe, buf') <- case inTag proxy tag of Nothing -> do returnA -< (Nothing, buf) Just tagIn -> do timeInfo <- genTimeInfo (inProxy proxy) initialTime -< (time, tagIn) - (c, buf') <- arrM $ uncurry get -< (buf, timeInfo) + Result buf' c <- arrM $ uncurry get -< (timeInfo, buf) returnA -< (Just c, buf') bdMaybe <- eraseClockSN initialTime sn -< (time, tag, (,) <$> aMaybe <*> cMaybe) case (,) <$> outTag proxy tag <*> bdMaybe of @@ -116,7 +117,7 @@ eraseClockSN initialTime (Feedback buf0 sn) = returnA -< (Nothing, buf') Just (tagOut, (b, d)) -> do timeInfo <- genTimeInfo (outProxy proxy) initialTime -< (time, tagOut) - buf'' <- arrM $ uncurry $ uncurry put -< ((buf', timeInfo), d) + buf'' <- arrM $ uncurry $ uncurry put -< ((timeInfo, d), buf') returnA -< (Just b, buf'') eraseClockSN initialTime (FirstResampling sn buf) = let @@ -149,14 +150,14 @@ eraseClockResBuf :: Time cl1 -> ResBuf m cl1 cl2 a b -> Automaton m (Either (Time cl1, Tag cl1, a) (Time cl2, Tag cl2)) (Maybe b) -eraseClockResBuf proxy1 proxy2 initialTime resBuf0 = feedback resBuf0 $ proc (input, resBuf) -> do +eraseClockResBuf proxy1 proxy2 initialTime ResamplingBuffer {buffer, put, get} = feedback buffer $ proc (input, resBuf) -> do case input of Left (time1, tag1, a) -> do timeInfo1 <- genTimeInfo proxy1 initialTime -< (time1, tag1) - resBuf' <- arrM (uncurry $ uncurry put) -< ((resBuf, timeInfo1), a) + resBuf' <- arrM (uncurry $ uncurry put) -< ((timeInfo1, a), resBuf) returnA -< (Nothing, resBuf') Right (time2, tag2) -> do timeInfo2 <- genTimeInfo proxy2 initialTime -< (time2, tag2) - (b, resBuf') <- arrM (uncurry get) -< (resBuf, timeInfo2) + Result resBuf' b <- arrM (uncurry get) -< (timeInfo2, resBuf) returnA -< (Just b, resBuf') {-# INLINE eraseClockResBuf #-} diff --git a/rhine/src/FRP/Rhine/ResamplingBuffer.hs b/rhine/src/FRP/Rhine/ResamplingBuffer.hs index 57e6fafde..698228603 100644 --- a/rhine/src/FRP/Rhine/ResamplingBuffer.hs +++ b/rhine/src/FRP/Rhine/ResamplingBuffer.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeFamilies #-} @@ -15,10 +16,8 @@ module FRP.Rhine.ResamplingBuffer ( ) where --- base -import Control.Arrow - -- rhine +import Data.Stream.Result import FRP.Rhine.Clock -- A quick note on naming conventions, to whoever cares: @@ -39,18 +38,23 @@ or specific to certain clocks. * 'a': The input type * 'b': The output type -} -data ResamplingBuffer m cla clb a b = ResamplingBuffer - { put :: +data ResamplingBuffer m cla clb a b = forall s. + ResamplingBuffer + { buffer :: s + -- ^ The internal state of the buffer. + , put :: TimeInfo cla -> a -> - m (ResamplingBuffer m cla clb a b) + s -> + m s -- ^ Store one input value of type 'a' at a given time stamp, - -- and return a continuation. + -- and return an updated state. , get :: TimeInfo clb -> - m (b, ResamplingBuffer m cla clb a b) + s -> + m (Result s b) -- ^ Retrieve one output value of type 'b' at a given time stamp, - -- and a continuation. + -- and an updated state. } -- | A type synonym to allow for abbreviation. @@ -62,8 +66,9 @@ hoistResamplingBuffer :: (forall c. m1 c -> m2 c) -> ResamplingBuffer m1 cla clb a b -> ResamplingBuffer m2 cla clb a b -hoistResamplingBuffer hoist ResamplingBuffer {..} = +hoistResamplingBuffer morph ResamplingBuffer {..} = ResamplingBuffer - { put = (((hoistResamplingBuffer hoist <$>) . hoist) .) . put - , get = (second (hoistResamplingBuffer hoist) <$>) . hoist . get + { put = ((morph .) .) . put + , get = (morph .) . get + , buffer } diff --git a/rhine/src/FRP/Rhine/ResamplingBuffer/ClSF.hs b/rhine/src/FRP/Rhine/ResamplingBuffer/ClSF.hs index 7c965ed0d..e2d87cdf6 100644 --- a/rhine/src/FRP/Rhine/ResamplingBuffer/ClSF.hs +++ b/rhine/src/FRP/Rhine/ResamplingBuffer/ClSF.hs @@ -1,16 +1,16 @@ -{-# LANGUAGE RecordWildCards #-} - {- | Collect and process all incoming values statefully and with time stamps. -} module FRP.Rhine.ResamplingBuffer.ClSF where -- transformers -import Control.Monad.Trans.Reader (runReaderT) +import Control.Monad.Trans.Reader (ReaderT, runReaderT) -- automaton import Data.Automaton -import Data.Stream.Result +import Data.Stream +import Data.Stream.Optimized (toStreamT) +import Data.Stream.Result (mapResultState) -- rhine import FRP.Rhine.ClSF.Core @@ -30,16 +30,15 @@ clsfBuffer :: -- The list will contain the /newest/ element in the head. ClSF m cl2 [(TimeInfo cl1, a)] b -> ResamplingBuffer m cl1 cl2 a b -clsfBuffer = clsfBuffer' [] +clsfBuffer = clsfBuffer' . toStreamT . getAutomaton where clsfBuffer' :: (Monad m) => - [(TimeInfo cl1, a)] -> - ClSF m cl2 [(TimeInfo cl1, a)] b -> + StreamT (ReaderT [(TimeInfo cl1, a)] (ReaderT (TimeInfo cl2) m)) b -> ResamplingBuffer m cl1 cl2 a b - clsfBuffer' as automaton = ResamplingBuffer {..} - where - put ti1 a = return $ clsfBuffer' ((ti1, a) : as) automaton - get ti2 = do - Result automaton' b <- runReaderT (stepAutomaton automaton as) ti2 - return (b, clsfBuffer automaton') + clsfBuffer' StreamT {state, step} = + ResamplingBuffer + { buffer = (state, []) + , put = \ti1 a (s, as) -> return (s, (ti1, a) : as) + , get = \ti2 (s, as) -> mapResultState (,[]) <$> runReaderT (runReaderT (step s) as) ti2 + } diff --git a/rhine/src/FRP/Rhine/ResamplingBuffer/Collect.hs b/rhine/src/FRP/Rhine/ResamplingBuffer/Collect.hs index 465f4f62a..ada05bfcc 100644 --- a/rhine/src/FRP/Rhine/ResamplingBuffer/Collect.hs +++ b/rhine/src/FRP/Rhine/ResamplingBuffer/Collect.hs @@ -11,6 +11,7 @@ module FRP.Rhine.ResamplingBuffer.Collect where import Data.Sequence -- rhine +import Data.Stream.Result (Result (..)) import FRP.Rhine.ResamplingBuffer import FRP.Rhine.ResamplingBuffer.Timeless @@ -21,7 +22,7 @@ collect :: (Monad m) => ResamplingBuffer m cl1 cl2 a [a] collect = timelessResamplingBuffer AsyncMealy {..} [] where amPut as a = return $ a : as - amGet as = return (as, []) + amGet as = return $! Result [] as {- | Reimplementation of 'collect' with sequences, which gives a performance benefit if the sequence needs to be reversed or searched. @@ -30,7 +31,7 @@ collectSequence :: (Monad m) => ResamplingBuffer m cl1 cl2 a (Seq a) collectSequence = timelessResamplingBuffer AsyncMealy {..} empty where amPut as a = return $ a <| as - amGet as = return (as, empty) + amGet as = return $! Result empty as {- | 'pureBuffer' collects all input values lazily in a list and processes it when output is required. @@ -41,7 +42,7 @@ pureBuffer :: (Monad m) => ([a] -> b) -> ResamplingBuffer m cl1 cl2 a b pureBuffer f = timelessResamplingBuffer AsyncMealy {..} [] where amPut as a = return (a : as) - amGet as = return (f as, []) + amGet as = return $! Result [] $! f as -- TODO Test whether strictness works here, or consider using deepSeq @@ -58,4 +59,4 @@ foldBuffer :: foldBuffer f = timelessResamplingBuffer AsyncMealy {..} where amPut b a = let !b' = f a b in return b' - amGet b = return (b, b) + amGet b = return $! Result b b diff --git a/rhine/src/FRP/Rhine/ResamplingBuffer/FIFO.hs b/rhine/src/FRP/Rhine/ResamplingBuffer/FIFO.hs index 073b92b95..71276d959 100644 --- a/rhine/src/FRP/Rhine/ResamplingBuffer/FIFO.hs +++ b/rhine/src/FRP/Rhine/ResamplingBuffer/FIFO.hs @@ -12,6 +12,8 @@ import Prelude hiding (length, take) import Data.Sequence -- rhine + +import Data.Stream.Result (Result (..)) import FRP.Rhine.ResamplingBuffer import FRP.Rhine.ResamplingBuffer.Timeless @@ -25,8 +27,8 @@ fifoUnbounded = timelessResamplingBuffer AsyncMealy {..} empty where amPut as a = return $ a <| as amGet as = case viewr as of - EmptyR -> return (Nothing, empty) - as' :> a -> return (Just a, as') + EmptyR -> return $! Result empty Nothing + as' :> a -> return $! Result as' (Just a) {- | A bounded FIFO buffer that forgets the oldest values when the size is above a given threshold. If the buffer is empty, it will return 'Nothing'. @@ -36,8 +38,8 @@ fifoBounded threshold = timelessResamplingBuffer AsyncMealy {..} empty where amPut as a = return $ take threshold $ a <| as amGet as = case viewr as of - EmptyR -> return (Nothing, empty) - as' :> a -> return (Just a, as') + EmptyR -> return $! Result empty Nothing + as' :> a -> return $! Result as' (Just a) -- | An unbounded FIFO buffer that also returns its current size. fifoWatch :: (Monad m) => ResamplingBuffer m cl1 cl2 a (Maybe a, Int) @@ -45,5 +47,5 @@ fifoWatch = timelessResamplingBuffer AsyncMealy {..} empty where amPut as a = return $ a <| as amGet as = case viewr as of - EmptyR -> return ((Nothing, 0), empty) - as' :> a -> return ((Just a, length as'), as') + EmptyR -> return $! Result empty (Nothing, 0) + as' :> a -> return $! Result as' (Just a, length as') diff --git a/rhine/src/FRP/Rhine/ResamplingBuffer/KeepLast.hs b/rhine/src/FRP/Rhine/ResamplingBuffer/KeepLast.hs index 491210e59..d1a210798 100644 --- a/rhine/src/FRP/Rhine/ResamplingBuffer/KeepLast.hs +++ b/rhine/src/FRP/Rhine/ResamplingBuffer/KeepLast.hs @@ -5,6 +5,7 @@ A buffer keeping the last value, or zero-order hold. -} module FRP.Rhine.ResamplingBuffer.KeepLast where +import Data.Stream.Result (Result (..)) import FRP.Rhine.ResamplingBuffer import FRP.Rhine.ResamplingBuffer.Timeless @@ -16,5 +17,5 @@ import FRP.Rhine.ResamplingBuffer.Timeless keepLast :: (Monad m) => a -> ResamplingBuffer m cl1 cl2 a a keepLast = timelessResamplingBuffer AsyncMealy {..} where - amGet a = return (a, a) + amGet a = return $! Result a a amPut _ = return diff --git a/rhine/src/FRP/Rhine/ResamplingBuffer/LIFO.hs b/rhine/src/FRP/Rhine/ResamplingBuffer/LIFO.hs index 92a61412b..c55a74b43 100644 --- a/rhine/src/FRP/Rhine/ResamplingBuffer/LIFO.hs +++ b/rhine/src/FRP/Rhine/ResamplingBuffer/LIFO.hs @@ -12,6 +12,7 @@ import Prelude hiding (length, take) import Data.Sequence -- rhine +import Data.Stream.Result (Result (..)) import FRP.Rhine.ResamplingBuffer import FRP.Rhine.ResamplingBuffer.Timeless @@ -25,8 +26,8 @@ lifoUnbounded = timelessResamplingBuffer AsyncMealy {..} empty where amPut as a = return $ a <| as amGet as = case viewl as of - EmptyL -> return (Nothing, empty) - a :< as' -> return (Just a, as') + EmptyL -> return $! Result empty Nothing + a :< as' -> return $! Result as' (Just a) {- | A bounded LIFO buffer that forgets the oldest values when the size is above a given threshold. If the buffer is empty, it will return 'Nothing'. @@ -36,8 +37,8 @@ lifoBounded threshold = timelessResamplingBuffer AsyncMealy {..} empty where amPut as a = return $ take threshold $ a <| as amGet as = case viewl as of - EmptyL -> return (Nothing, empty) - a :< as' -> return (Just a, as') + EmptyL -> return $! Result empty Nothing + a :< as' -> return $! Result as' (Just a) -- | An unbounded LIFO buffer that also returns its current size. lifoWatch :: (Monad m) => ResamplingBuffer m cl1 cl2 a (Maybe a, Int) @@ -45,5 +46,5 @@ lifoWatch = timelessResamplingBuffer AsyncMealy {..} empty where amPut as a = return $ a <| as amGet as = case viewl as of - EmptyL -> return ((Nothing, 0), empty) - a :< as' -> return ((Just a, length as'), as') + EmptyL -> return $! Result empty (Nothing, 0) + a :< as' -> return $! Result as' (Just a, length as') diff --git a/rhine/src/FRP/Rhine/ResamplingBuffer/Timeless.hs b/rhine/src/FRP/Rhine/ResamplingBuffer/Timeless.hs index 767b1e288..341c99155 100644 --- a/rhine/src/FRP/Rhine/ResamplingBuffer/Timeless.hs +++ b/rhine/src/FRP/Rhine/ResamplingBuffer/Timeless.hs @@ -6,6 +6,7 @@ These are used in many other modules implementing 'ResamplingBuffer's. -} module FRP.Rhine.ResamplingBuffer.Timeless where +import Data.Stream.Result import FRP.Rhine.ResamplingBuffer {- | An asynchronous, effectful Mealy machine description. @@ -16,7 +17,7 @@ import FRP.Rhine.ResamplingBuffer data AsyncMealy m s a b = AsyncMealy { amPut :: s -> a -> m s -- ^ Given the previous state and an input value, return the new state. - , amGet :: s -> m (b, s) + , amGet :: s -> m (Result s b) -- ^ Given the previous state, return an output value and a new state. } {- FOURMOLU_ENABLE -} @@ -30,21 +31,15 @@ data AsyncMealy m s a b = AsyncMealy -} timelessResamplingBuffer :: (Monad m) => - AsyncMealy m s a b -> -- The asynchronous Mealy machine from which the buffer is built - + -- | The asynchronous Mealy machine from which the buffer is built + AsyncMealy m s a b -> -- | The initial state s -> ResamplingBuffer m cl1 cl2 a b -timelessResamplingBuffer AsyncMealy {..} = go +timelessResamplingBuffer AsyncMealy {..} buffer = ResamplingBuffer {..} where - go s = - let - put _ a = go <$> amPut s a - get _ = do - (b, s') <- amGet s - return (b, go s') - in - ResamplingBuffer {..} + put _ a s = amPut s a + get _ = amGet -- | A resampling buffer that only accepts and emits units. trivialResamplingBuffer :: (Monad m) => ResamplingBuffer m cl1 cl2 () () @@ -52,6 +47,6 @@ trivialResamplingBuffer = timelessResamplingBuffer AsyncMealy { amPut = const (const (return ())) - , amGet = const (return ((), ())) + , amGet = const (return $! Result () ()) } () diff --git a/rhine/src/FRP/Rhine/ResamplingBuffer/Util.hs b/rhine/src/FRP/Rhine/ResamplingBuffer/Util.hs index 08c058e18..e46b0c9bf 100644 --- a/rhine/src/FRP/Rhine/ResamplingBuffer/Util.hs +++ b/rhine/src/FRP/Rhine/ResamplingBuffer/Util.hs @@ -9,10 +9,13 @@ module FRP.Rhine.ResamplingBuffer.Util where import Control.Monad.Trans.Reader (runReaderT) -- automaton -import Data.Stream.Result (Result (..)) +import Data.Stream (StreamT (..)) +import Data.Stream.Internal (JointState (..)) +import Data.Stream.Optimized (toStreamT) +import Data.Stream.Result (Result (..), mapResultState) -- rhine -import FRP.Rhine.ClSF +import FRP.Rhine.ClSF hiding (step) import FRP.Rhine.Clock import FRP.Rhine.ResamplingBuffer @@ -28,13 +31,16 @@ infix 2 >>-^ ResamplingBuffer m cl1 cl2 a b -> ClSF m cl2 b c -> ResamplingBuffer m cl1 cl2 a c -resBuf >>-^ clsf = ResamplingBuffer put_ get_ +resbuf >>-^ clsf = helper resbuf $ toStreamT $ getAutomaton clsf where - put_ theTimeInfo a = (>>-^ clsf) <$> put resBuf theTimeInfo a - get_ theTimeInfo = do - (b, resBuf') <- get resBuf theTimeInfo - Result clsf' c <- stepAutomaton clsf b `runReaderT` theTimeInfo - return (c, resBuf' >>-^ clsf') + helper ResamplingBuffer { buffer, put, get} StreamT { state, step} = ResamplingBuffer + { buffer = JointState buffer state, + put = \theTimeInfo a (JointState b s) -> (`JointState` s) <$> put theTimeInfo a b + , get = \theTimeInfo (JointState b s) -> do + Result b' b <- get theTimeInfo b + Result s' c <- step s `runReaderT` b `runReaderT` theTimeInfo + return $! Result (JointState b' s') c + } infix 1 ^->> @@ -44,13 +50,17 @@ infix 1 ^->> ClSF m cl1 a b -> ResamplingBuffer m cl1 cl2 b c -> ResamplingBuffer m cl1 cl2 a c -clsf ^->> resBuf = ResamplingBuffer put_ get_ +clsf ^->> resBuf = helper (toStreamT (getAutomaton clsf)) resBuf where - put_ theTimeInfo a = do - Result clsf' b <- stepAutomaton clsf a `runReaderT` theTimeInfo - resBuf' <- put resBuf theTimeInfo b - return $ clsf' ^->> resBuf' - get_ theTimeInfo = second (clsf ^->>) <$> get resBuf theTimeInfo + helper StreamT {state, step} ResamplingBuffer {buffer, put, get} = ResamplingBuffer + { + buffer = JointState buffer state + , put = \theTimeInfo a (JointState buf s) -> do + Result s' b <- step s `runReaderT` a `runReaderT` theTimeInfo + buf' <- put theTimeInfo b buf + return $! JointState buf' s' + , get = \theTimeInfo (JointState buf s) -> mapResultState (`JointState` s) <$> get theTimeInfo buf + } infixl 4 *-* @@ -60,16 +70,18 @@ infixl 4 *-* ResamplingBuffer m cl1 cl2 a b -> ResamplingBuffer m cl1 cl2 c d -> ResamplingBuffer m cl1 cl2 (a, c) (b, d) -resBuf1 *-* resBuf2 = ResamplingBuffer put_ get_ - where - put_ theTimeInfo (a, c) = do - resBuf1' <- put resBuf1 theTimeInfo a - resBuf2' <- put resBuf2 theTimeInfo c - return $ resBuf1' *-* resBuf2' - get_ theTimeInfo = do - (b, resBuf1') <- get resBuf1 theTimeInfo - (d, resBuf2') <- get resBuf2 theTimeInfo - return ((b, d), resBuf1' *-* resBuf2') +ResamplingBuffer buf1 put1 get1 *-* ResamplingBuffer buf2 put2 get2 = ResamplingBuffer + { + buffer = JointState buf1 buf2 + , put = \theTimeInfo (a, c) (JointState s1 s2) -> do + s1' <- put1 theTimeInfo a s1 + s2' <- put2 theTimeInfo c s2 + return $! JointState s1' s2' + , get = \theTimeInfo (JointState s1 s2) -> do + Result s1' b <- get1 theTimeInfo s1 + Result s2' d <- get2 theTimeInfo s2 + return $! Result (JointState s1' s2') (b, d) + } infixl 4 &-&