From dd12c4f7cbae814f62c48cde6b868a0f60391eee Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Wed, 14 Feb 2024 11:25:03 +0530 Subject: [PATCH 01/24] Add maxThreads 1 benchmarks for parConcat --- .../Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs b/benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs index b234ec30b4..723ffedb17 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs @@ -147,10 +147,10 @@ concatFmapStreamsWith f outer inner n = $ Async.parConcat f $ fmap (sourceUnfoldrM inner) (sourceUnfoldrM outer n) -o_1_space_concatMap :: Int -> (Config -> Config) -> [Benchmark] -o_1_space_concatMap value f = +o_1_space_concatMap :: String -> Int -> (Config -> Config) -> [Benchmark] +o_1_space_concatMap label value f = value2 `seq` - [ bgroup "concat" + [ bgroup ("concat" ++ label) [ benchIO "parConcatMap (n of 1)" (concatMapStreamsWith f value 1) , benchIO "parConcatMap (sqrt x of sqrt x)" @@ -198,7 +198,8 @@ allBenchmarks moduleName wide modifier value = [ bgroup (o_1_space_prefix moduleName) $ concat [ o_1_space_mapping value modifier , o_1_space_concatFoldable value modifier - , o_1_space_concatMap value modifier + , o_1_space_concatMap "" value modifier + , o_1_space_concatMap "-maxThreads-1" value (modifier . Async.maxThreads 1) , o_1_space_joining value modifier ] ++ if wide then [] else o_1_space_outerProduct value modifier , bgroup (o_n_heap_prefix moduleName) $ concat From 0ea617de2af1cc62663abc968cee017806ce182c Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Tue, 13 Feb 2024 13:40:15 +0530 Subject: [PATCH 02/24] Fix work queue for parConcatMap --- src/Streamly/Internal/Data/Stream/Channel/Operations.hs | 8 ++++++-- src/Streamly/Internal/Data/Stream/Concurrent.hs | 9 ++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/Streamly/Internal/Data/Stream/Channel/Operations.hs b/src/Streamly/Internal/Data/Stream/Channel/Operations.hs index de149f240d..f73fd338af 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Operations.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Operations.hs @@ -90,9 +90,13 @@ import Test.Inspection (inspect, hasNoTypeClassesExcept) -- be read back from the SVar using 'fromSVar'. {-# INLINE toChannelK #-} toChannelK :: MonadRunInIO m => Channel m a -> K.StreamK m a -> m () -toChannelK sv m = do +toChannelK chan m = do runIn <- askRunInIO - liftIO $ enqueue sv False (runIn, m) + -- The second argument to enqeue is used in case of lazy on-demand + -- scheduling. See comments in mkEnqueue. By default we enqueue on the + -- inner work q (True). When using concatMap the outer loop is enqueued on + -- the outer work q. + liftIO $ enqueue chan True (runIn, m) -- INLINE for fromStreamK/toStreamK fusion diff --git a/src/Streamly/Internal/Data/Stream/Concurrent.hs b/src/Streamly/Internal/Data/Stream/Concurrent.hs index a8c3bcaa04..cea4f1c8f6 100644 --- a/src/Streamly/Internal/Data/Stream/Concurrent.hs +++ b/src/Streamly/Internal/Data/Stream/Concurrent.hs @@ -295,7 +295,14 @@ mkEnqueue chan runner = do runInIO <- askRunInIO return $ let q stream = do - -- Enqueue the outer loop + -- When using parConcatMap with lazy dispatch we enqueue the + -- outer stream tail and then map a stream generator on the + -- head, which is also queued. If we pick both head and tail + -- with equal priority we may keep blowing up the tail into + -- more and more streams. To avoid that we give preference to + -- the inner streams when picking up for execution. This + -- requires two work queues, one for outer stream and one for + -- inner. Here we enqueue the outer loop stream. liftIO $ enqueue chan False (runInIO, runner q stream) -- XXX In case of eager dispatch we can just directly dispatch -- a worker with the tail stream here rather than first queuing From 184168ca53cd3f4336f8a65fe1a3eacace6afe00 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Wed, 14 Feb 2024 11:26:54 +0530 Subject: [PATCH 03/24] Update docs for stream channels --- src/Streamly/Internal/Data/Channel/Types.hs | 12 +- src/Streamly/Internal/Data/Channel/Worker.hs | 15 +- src/Streamly/Internal/Data/Stream/Channel.hs | 24 +-- .../Internal/Data/Stream/Channel/Append.hs | 41 +++- .../Internal/Data/Stream/Channel/Consumer.hs | 27 ++- .../Data/Stream/Channel/Dispatcher.hs | 49 +++-- .../Data/Stream/Channel/Interleave.hs | 10 +- .../Data/Stream/Channel/Operations.hs | 70 +++++-- .../Internal/Data/Stream/Channel/Type.hs | 181 +++++++++++++----- .../Internal/Data/Stream/Concurrent.hs | 43 +++-- 10 files changed, 353 insertions(+), 119 deletions(-) diff --git a/src/Streamly/Internal/Data/Channel/Types.hs b/src/Streamly/Internal/Data/Channel/Types.hs index 0a758b65fd..74a3d6409c 100644 --- a/src/Streamly/Internal/Data/Channel/Types.hs +++ b/src/Streamly/Internal/Data/Channel/Types.hs @@ -394,6 +394,13 @@ defaultConfig = Config -- execution beyond the limit. -- -- 'Nothing' means there is no limit. +-- +-- Keep in mind that checking this limit all the time has a performance +-- overhead. +-- +-- Known Bugs: currently this works only when rate is specified. +-- Known Bugs: for ordered streams sometimes the actual count is less than +-- expected. maxYields :: Maybe Int64 -> Config -> Config maxYields lim st = st { _yieldLimit = @@ -499,7 +506,10 @@ getStreamLatency = _streamLatency -- XXX Rename to "inspect" --- | Print debug information about the 'Channel' when the stream ends. +-- | Print debug information about the 'Channel' when the stream ends. When the +-- stream does not end normally, the channel debug information is printed when +-- the channel is garbage collected. If you are expecting but not seeing the +-- debug info try adding a 'performMajorGC' before the program ends. -- inspect :: Bool -> Config -> Config inspect flag st = st { _inspect = flag } diff --git a/src/Streamly/Internal/Data/Channel/Worker.hs b/src/Streamly/Internal/Data/Channel/Worker.hs index 1eff9e9849..097314541a 100644 --- a/src/Streamly/Internal/Data/Channel/Worker.hs +++ b/src/Streamly/Internal/Data/Channel/Worker.hs @@ -89,6 +89,7 @@ sendEvent q bell msg = do workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecond64)) workerCollectLatency winfo = do + -- XXX make this unboxed IORef (cnt0, t0) <- readIORef (workerLatencyStart winfo) cnt1 <- readIORef (workerYieldCount winfo) let cnt = cnt1 - cnt0 @@ -318,6 +319,7 @@ updateLatencyAndCheckRate :: -> Count -> IO Bool updateLatencyAndCheckRate workerLimit workerCount rateInfo workerInfo ycnt = do + -- XXX make this an unboxed IORef i <- readIORef (workerPollingInterval rateInfo) -- XXX use generation count to check if the interval has been updated if i /= 0 && (ycnt `mod` i) == 0 @@ -354,6 +356,16 @@ incrWorkerYieldCount workerLimit workerCount rateInfo workerInfo = do -- or when we yield to heap. -- | Add a 'ChildYield' event to the channel's output queue. +-- +-- This is a wrapper over 'sendEvent', it does a few more things: +-- +-- * performs a buffer limit check, returns False if exceeded +-- +-- When rate control is enabled and 'WorkerInfo' is supplied:: +-- +-- * increments the worker yield count +-- * periodically pushes the worker latency stats to the channel +-- * performs a rate limit check, returns False if exceeded {-# INLINE sendYield #-} sendYield :: Limit -- ^ Channel's max buffer limit @@ -394,7 +406,8 @@ workerStopUpdate winfo info = do i <- readIORef (workerPollingInterval info) when (i /= 0) $ workerUpdateLatency info winfo --- | Add a 'ChildStop' event to the channel's output queue. +-- | Add a 'ChildStop' event to the channel's output queue. When rate control +-- is enabled, it pushes the worker latency stats to the channel. {-# INLINABLE sendStop #-} sendStop :: IORef Int -- ^ Channel's current worker count diff --git a/src/Streamly/Internal/Data/Stream/Channel.hs b/src/Streamly/Internal/Data/Stream/Channel.hs index 7f53b1f64b..3758ee125a 100644 --- a/src/Streamly/Internal/Data/Stream/Channel.hs +++ b/src/Streamly/Internal/Data/Stream/Channel.hs @@ -11,19 +11,18 @@ module Streamly.Internal.Data.Stream.Channel module Streamly.Internal.Data.Stream.Channel.Type -- ** Allocation - , newChannel , module Streamly.Internal.Data.Stream.Channel.Append , module Streamly.Internal.Data.Stream.Channel.Interleave + , newChannel + -- ** Event Processing Loop , module Streamly.Internal.Data.Stream.Channel.Dispatcher , module Streamly.Internal.Data.Stream.Channel.Consumer - - -- ** Conversion , module Streamly.Internal.Data.Stream.Channel.Operations -- ** Evaluation - , withChannel , withChannelK + , withChannel -- quiesceChannel -- wait for running tasks but do not schedule any more. ) where @@ -54,14 +53,15 @@ newChannel modifier = then newInterleaveChannel modifier else newAppendChannel modifier --- | Allocate a channel and evaluate the stream using the channel and the --- supplied evaluator function. The evaluator is run in a worker thread. +-- | Allocate a channel and evaluate the stream concurrently using the channel +-- and the supplied evaluator function. The evaluator is run in a worker +-- thread. {-# INLINE withChannelK #-} withChannelK :: MonadAsync m => - (Config -> Config) - -> K.StreamK m a - -> (Channel m b -> K.StreamK m a -> K.StreamK m b) - -> K.StreamK m b + (Config -> Config) -- ^ config modifier + -> K.StreamK m a -- ^ input stream + -> (Channel m b -> K.StreamK m a -> K.StreamK m b) -- ^ stream evaluator + -> K.StreamK m b -- ^ output stream withChannelK modifier input evaluator = K.concatEffect action where @@ -71,8 +71,8 @@ withChannelK modifier input evaluator = K.concatEffect action toChannelK chan (evaluator chan input) return $ fromChannelK chan --- | Allocate a channel and evaluate the stream using the channel and the --- supplied evaluator function. The evaluator is run in a worker thread. +-- | A wrapper over 'withChannelK', converts 'Stream' to 'StreamK' and invokes +-- 'withChannelK'. {-# INLINE withChannel #-} withChannel :: MonadAsync m => (Config -> Config) diff --git a/src/Streamly/Internal/Data/Stream/Channel/Append.hs b/src/Streamly/Internal/Data/Stream/Channel/Append.hs index 069c565050..2878fc257b 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Append.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Append.hs @@ -48,15 +48,33 @@ import Streamly.Internal.Data.Stream.Channel.Type -- Concurrent streams with first-come-first serve results ------------------------------------------------------------------------------ --- Note: For purely right associated expressions this queue should have at most --- one element. It grows to more than one when we have left associcated --- expressions. Large left associated compositions can grow this to a --- large size +-- | We use two queues, one outer and the other inner. When entries are present +-- in both the queues, inner queue is given preference when dequeuing. +-- +-- Normally entries are queued to the inner queue only. The parConcatMap +-- implementation makes use of the outer queue as well. The tail of the outer +-- stream is queued to the outer queue whereas the inner loop streams are +-- queued to the inner queue so that inner streams are given preference, +-- otherwise we might just keep generating streams from the outer stream and +-- not use them fast enough. We need to go depth first rather than breadth +-- first. +-- +-- If we do not use outer and inner distinction there are two problematic +-- cases. The outer stream gets executed faster than inner and may keep adding +-- more entries. When we queue it back on the work queue it may be the first +-- one to be picked if it is on top of the LIFO. +-- +-- Normally, when using parConcatMap the outer queue would have only one entry +-- which is the tail of the outer stream. However, when manually queueing +-- streams on the channel using 'toChannelK' you could queue to outer or inner, +-- in which case outer queue may have multiple entries. +-- {-# INLINE enqueueLIFO #-} enqueueLIFO :: Channel m a + -- | (outer queue, inner queue) -> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)]) - -> Bool + -> Bool -- True means put it on inner queue, otherwise outer -> (RunInIO m, K.StreamK m a) -> IO () enqueueLIFO sv q inner m = do @@ -64,10 +82,19 @@ enqueueLIFO sv q inner m = do if inner then (xs, m : ys) else (m : xs, ys) ringDoorBell (doorBellOnWorkQ sv) (outputDoorBell sv) -data QResult a = QEmpty | QOuter a | QInner a +-- | We need to know whether an entry was dequeued from the outer q or inner q +-- because when we consume it partially and q it back on the q we need to know +-- which q to put it back on. +data QResult a = + QEmpty + | QOuter a -- ^ Entry dequeued from outer q + | QInner a -- ^ Entry dequeued from inner q +-- | Dequeues from inner q first and if it is empty then dequeue from the +-- outer. {-# INLINE dequeue #-} dequeue :: MonadIO m => + -- | (outer queue, inner queue) IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)]) -> m (QResult (RunInIO m, K.StreamK m a)) dequeue qref = @@ -1038,6 +1065,8 @@ getLifoSVar mrun cfg = do -- | Create a new async style concurrent stream evaluation channel. The monad -- state used to run the stream actions is taken from the call site of -- newAppendChannel. +-- +-- This is a low level API, use newChannel instead. {-# INLINABLE newAppendChannel #-} {-# SPECIALIZE newAppendChannel :: (Config -> Config) -> IO (Channel IO a) #-} newAppendChannel :: MonadRunInIO m => (Config -> Config) -> m (Channel m a) diff --git a/src/Streamly/Internal/Data/Stream/Channel/Consumer.hs b/src/Streamly/Internal/Data/Stream/Channel/Consumer.hs index 32f8812849..bf173649ef 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Consumer.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Consumer.hs @@ -8,11 +8,12 @@ -- module Streamly.Internal.Data.Stream.Channel.Consumer ( - -- ** Reading - readOutputQPaced - , readOutputQBounded - , postProcessPaced + -- *** Reading Events + -- | Low level functions used to build 'fromChannelK'. + readOutputQBounded + , readOutputQPaced , postProcessBounded + , postProcessPaced ) where @@ -36,6 +37,11 @@ readOutputQChan sv = do let ss = if svarInspectMode sv then Just (svarStats sv) else Nothing in readOutputQRaw (outputQueue sv) ss +-- | Read the channel's output queue. When there is no output dispatches +-- workers and waits for output (using 'sendWorkerWait'). Always ensures that +-- there is at least one outstanding worker. +-- +-- To be used as 'readOutputQ' function for the channel. readOutputQBounded :: MonadRunInIO m => Bool -> Channel m a -> m [ChildEvent a] readOutputQBounded eagerEval sv = do (list, len) <- liftIO $ readOutputQChan sv @@ -64,6 +70,11 @@ readOutputQBounded eagerEval sv = do sendWorkerWait eagerEval sendWorkerDelay (dispatchWorker 0) sv liftIO (fst `fmap` readOutputQChan sv) +-- | Same as 'readOutputQBounded' but uses 'dispatchWorkerPaced' to +-- dispatch workers with rate control. +-- +-- To be used as 'readOutputQ' function for the channel when rate control is +-- on. readOutputQPaced :: MonadRunInIO m => Channel m a -> m [ChildEvent a] readOutputQPaced sv = do (list, len) <- liftIO $ readOutputQChan sv @@ -82,6 +93,11 @@ readOutputQPaced sv = do sendWorkerWait False sendWorkerDelayPaced dispatchWorkerPaced sv liftIO (fst `fmap` readOutputQChan sv) +-- | If there is work to do dispatch as many workers as the target rate +-- requires. +-- +-- To be used as 'postProcess' function for the channel when rate control is +-- enabled. postProcessPaced :: MonadRunInIO m => Channel m a -> m Bool postProcessPaced sv = do workersDone <- allThreadsDone (workerThreads sv) @@ -101,6 +117,9 @@ postProcessPaced sv = do return r else return False +-- | If there is work to do ensure that we have at least one worker disptached. +-- +-- To be used as 'postProcess' function for the channel. postProcessBounded :: MonadRunInIO m => Channel m a -> m Bool postProcessBounded sv = do workersDone <- allThreadsDone (workerThreads sv) diff --git a/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs b/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs index 49761008ad..0b968d5f17 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs @@ -9,14 +9,16 @@ -- module Streamly.Internal.Data.Stream.Channel.Dispatcher ( - -- ** Dispatching + -- *** Worker Dispatching + -- | Low level functions used to build readOutputQ and postProcess + -- functions. pushWorker , dispatchWorker , dispatchWorkerPaced , sendWorkerWait - , startChannel , sendWorkerDelay , sendWorkerDelayPaced + , startChannel -- XXX bootstrap? ) where @@ -42,14 +44,22 @@ import Streamly.Internal.Data.Stream.Channel.Type -- Dispatching workers ------------------------------------------------------------------------------- +-- | Low level API to create a worker. Forks a thread which executes the +-- 'workLoop' of the channel. {-# NOINLINE pushWorker #-} -pushWorker :: MonadRunInIO m => Count -> Channel m a -> m () +pushWorker :: MonadRunInIO m => + Count -- ^ max yield limit for the worker + -> Channel m a + -> m () pushWorker yieldMax sv = do liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1 when (svarInspectMode sv) $ recordMaxWorkers (workerCount sv) (svarStats sv) -- This allocation matters when significant number of workers are being -- sent. We allocate it only when needed. + -- + -- XXX WorkerInfo is required for maxYields to work even if rate control is + -- not enabled. winfo <- case yieldRateInfo sv of Nothing -> return Nothing @@ -130,8 +140,17 @@ checkMaxBuffer active sv = do (_, n) <- liftIO $ readIORef (outputQueue sv) return $ fromIntegral lim > n + active +-- | Higher level API to dispatch a worker, it uses 'pushWorker' to create a +-- worker. Dispatches a worker only if: +-- +-- * the channel has work to do +-- * max thread count is not reached +-- * max buffer limit is not reached +-- dispatchWorker :: MonadRunInIO m => - Count -> Channel m a -> m Bool + Count -- ^ max yield limit for the worker + -> Channel m a + -> m Bool -- ^ can disptach more workers dispatchWorker yieldCount sv = do -- XXX in case of Ahead streams we should not send more than one worker -- when the work queue is done but heap is not done. @@ -172,11 +191,10 @@ dispatchWorker yieldCount sv = do -- where we keep dispatching and they keep returning. So we must have exactly -- the same logic for not dispatching and for returning. --- | Dispatcher with rate control. The number of workers to be dispatched are --- decided based on the target rate. +-- | Like 'dispatchWorker' but with rate control. The number of workers to be +-- dispatched are decided based on the target rate. Uses 'dispatchWorker' to +-- actually dispatch when required. -- --- This is called when reading the output queue of the channel and after all --- the items read in one batch are all processed. dispatchWorkerPaced :: MonadRunInIO m => Channel m a -> m Bool -- ^ True means can dispatch more @@ -289,12 +307,19 @@ dispatchWorkerPaced sv = do then dispatchN (n - 1) else return False +-- | Dispatches as many workers as it can until output is seen in the event +-- queue of the channel. If the dispatcher function returns 'False' then no +-- more dispatches can be done. If no more dispatches are possible blocks until +-- output arrives in the event queue. +-- +-- When this function returns we are sure that there is some output available. +-- {-# NOINLINE sendWorkerWait #-} sendWorkerWait :: MonadIO m - => Bool - -> (Channel m a -> IO ()) - -> (Channel m a -> m Bool) + => Bool -- ^ 'eager' option is on + -> (Channel m a -> IO ()) -- ^ delay function + -> (Channel m a -> m Bool) -- ^ dispatcher function -> Channel m a -> m () sendWorkerWait eagerEval delay dispatch sv = go @@ -398,9 +423,11 @@ startChannel chan = do then liftIO $ threadDelay maxBound else pushWorker 1 chan +-- | Noop as of now. sendWorkerDelayPaced :: Channel m a -> IO () sendWorkerDelayPaced _ = return () +-- | Noop as of now. sendWorkerDelay :: Channel m a -> IO () sendWorkerDelay _sv = -- XXX we need a better way to handle this than hardcoded delays. The diff --git a/src/Streamly/Internal/Data/Stream/Channel/Interleave.hs b/src/Streamly/Internal/Data/Stream/Channel/Interleave.hs index dcdfa161d4..a981127053 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Interleave.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Interleave.hs @@ -39,7 +39,7 @@ data WorkerStatus = Continue | Suspend -- XXX This is not strictly round-robin as the streams that are faster may -- yield more elements than the ones that are slower. Also, when streams -- suspend due to buffer getting full they get added to the queue in a random --- order. +-- order. Document this under interleaved config option or fix it. {-# INLINE enqueueFIFO #-} enqueueFIFO :: @@ -239,10 +239,12 @@ getFifoSVar mrun cfg = do -- XXX GHC: If instead of MonadAsync we use (MonadIO m, MonadBaseControl IO m) -- constraint we get a 2x perf regression. Need to look into that. --- --- | Create a new async style concurrent stream evaluation channel. The monad --- state used to run the stream actions is taken from the call site of + +-- | Create a new 'interleaved' style concurrent stream evaluation channel. The +-- monad state used to run the stream actions is taken from the call site of -- newInterleaveChannel. +-- +-- This is a low level API, use newChannel instead. {-# INLINABLE newInterleaveChannel #-} {-# SPECIALIZE newInterleaveChannel :: (Config -> Config) -> IO (Channel IO a) #-} newInterleaveChannel :: MonadAsync m => diff --git a/src/Streamly/Internal/Data/Stream/Channel/Operations.hs b/src/Streamly/Internal/Data/Stream/Channel/Operations.hs index f73fd338af..2aedde5ad3 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Operations.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Operations.hs @@ -17,10 +17,13 @@ module Streamly.Internal.Data.Stream.Channel.Operations ( - toChannel - , toChannelK + -- *** Reading Stream + fromChannelK , fromChannel - , fromChannelK + + -- ** Enqueuing Work + , toChannelK + , toChannel ) where @@ -86,8 +89,24 @@ import Test.Inspection (inspect, hasNoTypeClassesExcept) -- XXX Should be a Fold, singleton API could be called joinChannel, or the fold -- can be called joinChannel. --- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then --- be read back from the SVar using 'fromSVar'. +-- | High level function to enqueue a work item on the channel. The fundamental +-- unit of work is a stream. Each stream enqueued on the channel is picked up +-- and evaluated by a worker thread. The worker evaluates the stream it picked +-- up serially. When multiple streams are queued on the channel each stream can +-- be evaluated concurrently by different workers. +-- +-- Note that the items in each stream are not concurrently evaluated, streams +-- are fundamentally serial, therefore, elements in one particular stream will +-- be generated serially one after the other. Only two or more streams can be +-- run concurrently with each other. +-- +-- Items from each evaluated streams are queued to the same output queue of the +-- channel which can be read using 'fromChannelK'. 'toChannelK' can be called +-- multiple times to enqueue multiple streams on the channel. +-- +-- The fundamental unit of work is a stream. If you want to run single actions +-- concurrently, wrap each action into a singleton stream and queue all those +-- streams on the channel. {-# INLINE toChannelK #-} toChannelK :: MonadRunInIO m => Channel m a -> K.StreamK m a -> m () toChannelK chan m = do @@ -100,7 +119,7 @@ toChannelK chan m = do -- INLINE for fromStreamK/toStreamK fusion --- | Send a stream to a given channel for concurrent evaluation. +-- | A wrapper over 'toChannelK' for 'Stream' type. {-# INLINE toChannel #-} toChannel :: MonadRunInIO m => Channel m a -> Stream m a -> m () toChannel chan = toChannelK chan . Stream.toStreamK @@ -184,10 +203,33 @@ inspect $ hasNoTypeClassesExcept 'fromChannelRaw ] #endif --- XXX fromChannel Should not be called multiple times, we can add a --- safeguard for that. Or we can replicate the stream so that we can distribute --- it to multiple consumers. or should we use an explicit dupChannel for that? +-- XXX Add a lock in the channel so that fromChannel cannot be called multiple +-- times. +-- +-- XXX Add an option to block the consumer rather than stopping the stream if +-- the work queue gets over. +-- | Draw a stream from a concurrent channel. The stream consists of the +-- evaluated values from the input streams that were enqueued on the channel +-- using 'toChannelK'. +-- +-- This is the event processing loop for the channel which does two +-- things, (1) dispatch workers, (2) process the events sent by the workers. +-- Workers are dispatched based on the channel's configuration settings. +-- +-- The stream stops and the channel is shutdown if any of the following occurs: +-- +-- * the work queue becomes empty +-- * channel's max yield limit is reached +-- * an exception is thrown by a worker +-- * 'shutdown' is called on the channel +-- +-- Before the channel stops, all the workers are drained and no more workers +-- are dispatched. When the channel is garbage collected a 'ThreadAbort' +-- exception is thrown to all pending workers. If 'inspect' option is enabled +-- then channel's stats are printed on stdout when the channel stops. +-- +-- CAUTION! This API must not be called more than once on a channel. {-# INLINE fromChannelK #-} fromChannelK :: MonadAsync m => Channel m a -> K.StreamK m a fromChannelK sv = @@ -207,19 +249,13 @@ fromChannelK sv = when (svarInspectMode sv) $ do r <- liftIO $ readIORef (svarStopTime (svarStats sv)) when (isNothing r) $ - printSVar (dumpChannel sv) "SVar Garbage Collected" + printSVar (dumpChannel sv) "Channel Garbage Collected" cleanupSVar (workerThreads sv) -- If there are any SVars referenced by this SVar a GC will prompt -- them to be cleaned up quickly. when (svarInspectMode sv) performMajorGC --- | Generate a stream of results from concurrent evaluations from a channel. --- Evaluation of the channel does not start until this API is called. This API --- must not be called more than once on a channel. It kicks off evaluation of --- the channel by dispatching concurrent workers and ensures that as long there --- is work queued on the channel workers are dispatched proportional to the --- demand by the consumer. --- +-- | A wrapper over 'fromChannelK' for 'Stream' type. {-# INLINE fromChannel #-} fromChannel :: MonadAsync m => Channel m a -> Stream m a fromChannel = Stream.fromStreamK . fromChannelK diff --git a/src/Streamly/Internal/Data/Stream/Channel/Type.hs b/src/Streamly/Internal/Data/Stream/Channel/Type.hs index ff7a4f5139..45d5ca85c5 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Type.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Type.hs @@ -38,11 +38,13 @@ module Streamly.Internal.Data.Stream.Channel.Type -- *** Diagnostics , inspect - -- ** Primitives + -- ** Sending Worker Events , yieldWith , stopWith , exceptionWith , shutdown + + -- ** Diagnostics , dumpChannel ) where @@ -74,17 +76,29 @@ import Streamly.Internal.Data.Channel.Types -- | A mutable channel to evaluate multiple streams concurrently and provide -- the combined results as output stream. +-- +-- There are only two actors working on the channel data structure, the event +-- processing loop (single thread), and the workers (multiple threads). Locking +-- notes are provided below for concurrent access. data Channel m a = Channel { + -- XXX Do we need this? We store the runner in the work q, is that enough? + -- This seems to be used only by the 'ordered' stream as of now. + -- | Runner for the monadic actions in the stream. Captures the monad -- state at the point where the channel was created and uses the same -- state to run all actions. svarMrun :: RunInIO m --------------------------------------------------------------------------- - -- FORWARD FLOW: Flow of data from the workers to the consumer + -- Output queue related --------------------------------------------------------------------------- + -- | Maximum size of the 'outputQueue'. The actual worst case buffer could + -- be double of this as the event loop may read the queue and the workers + -- may fill it up even before the event loop has started consuming. + , maxBufferLimit :: Limit + -- XXX For better efficiency we can try a preallocated array type (perhaps -- something like a vector) that allows an O(1) append. That way we will -- avoid constructing and reversing the list. Possibly we can also avoid @@ -98,91 +112,139 @@ data Channel m a = Channel -- XXX We can send a bundle of events of one type coaleseced together in an -- unboxed structure. - -- | (events, count): output queue of the channel. This is where the - -- workers queue the results. + -- | (events, count): worker event queue of the channel. This is where the + -- workers queue the results and other events. -- - -- [LOCKING] Frequently locked. This is locked and updated by workers - -- on each yield, and locked, updated by the consumer thread once in a - -- while for reading. Workers' locking contention may be high if there are - -- a large number of workers. + -- [LOCKING] Frequently locked. This is locked and updated by workers on + -- each yield, and locked, updated by the event loop thread once in a while + -- for reading. Workers' locking contention may be high if there are a + -- large number of workers. , outputQueue :: IORef ([ChildEvent a], Int) - -- | Door bell for workers to wakeup the consumer. + -- | Door bell for workers to wakeup the event loop. -- -- [LOCKING] Infrequently locked. Used only when the 'outputQueue' -- transitions from empty to non-empty, or a work item is queued by a - -- worker to the work queue and 'doorBellOnWorkQ' is set by the consumer. + -- worker to the work queue and 'doorBellOnWorkQ' is set by the event loop. , outputDoorBell :: MVar () -- XXX Can we use IO instead of m here? - , readOutputQ :: m [ChildEvent a] -- XXX remove - , postProcess :: m Bool -- XXX remove - - --------------------------------------------------------------------------- - -- Scheduling -- - --------------------------------------------------------------------------- - -- Combined/aggregate parameters + -- | Function to read the output queue of the channel, depends on the rate + -- control option. + , readOutputQ :: m [ChildEvent a] - -- | This is capped to 'maxBufferLimit' if set to more than that. Otherwise - -- potentially each worker may yield one value to the buffer in the worst - -- case exceeding the requested buffer size. - , maxWorkerLimit :: Limit + -- | Function to invoke after all the events read in a batch are processed + -- i.e. before we go on to read the next batch, depends on the rate control + -- option. + , postProcess :: m Bool - -- | Maximum size of the 'outputQueue'. The actual worst case buffer could - -- be double of this as the consumer may read the queue and the workers may - -- fill it up even before the consumer has started consuming. - , maxBufferLimit :: Limit + --------------------------------------------------------------------------- + -- Work and rate control + --------------------------------------------------------------------------- -- | Tracks how many yields are remaining before the channel stops, used -- when 'maxYields' option is enabled. -- - -- [LOCKING] Read only access by consumer when dispatching a worker. + -- [LOCKING] Read only access by event loop when dispatching a worker. -- Decremented by workers when picking work and undo decrement if the -- worker does not yield a value. , remainingWork :: Maybe (IORef Count) + -- XXX We make this isChannelDone which should not include isQueueDone. + -- + -- | Determine if there is no more work to do. When 'maxYields' is set for + -- the channel we may be done even if the work queue still has work. + , isWorkDone :: IO Bool + -- | Rate control information for the channel used when 'rate' control is -- enabled, , yieldRateInfo :: Maybe YieldRateInfo - , enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO () -- XXX remove - , eagerDispatch :: m () - , isWorkDone :: IO Bool - , isQueueDone :: IO Bool + --------------------------------------------------------------------------- + -- Work queue related + --------------------------------------------------------------------------- -- | When set to True, ring 'outputDoorBell' when a work item is queued on - -- the work queue. + -- the work queue. This is set by the dispatcher before going to sleep. It + -- wants to be woken up whenever the work queue got more work to do so that + -- it can dispatch a worker. , doorBellOnWorkQ :: IORef Bool - , workLoop :: Maybe WorkerInfo -> m () -- XXX remove - -- | Tracks all active worker threads. + -- XXX instead of this we should use a dispatcher setting. + + -- | This is a hook which is invoked whenever the tail of the stream is + -- re-enqueued on the work queue. Normally, this is set to a noop. When + -- 'eager' option is enabled this is set to an unconditional worker + -- dispatch function. This ensures that we eagerly sends a worker as long + -- as there is work to do. + , eagerDispatch :: m () + + -- | Enqueue a stream for evaluation on the channel. The first argument is + -- used only when 'ordered' or 'interleaved' is NOT set. In that case the + -- queue has two priority levels, True means higher priority and False + -- means lower priority. The first element of the tuple is the runner + -- function which is used to run the stream actions in a specific monadic + -- context. + , enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO () + + -- | Determine if the work queue is empty, therefore, there is no more work + -- to do. + , isQueueDone :: IO Bool + + -- | Worker function. It is implicitly aware of the work queue. It dequeues + -- a work item from the queue and runs it. It keeps on doing this in a loop + -- until it determines that it needs to stop. -- - -- [LOCKING] Updated unlocked, only by consumer thread. + -- Normally, the worker stops when the work queue becomes empty or the work + -- rate is higher than the target rate when rate control is enabled. It + -- stops by sending a 'ChildStop' event to the channel + -- + -- When rate control is enabled, the worker is dispatched with a + -- 'WorkerInfo' record which is used by the worker to maintain rate control + -- information and communicate it to the channel. + , workLoop :: Maybe WorkerInfo -> m () + + --------------------------------------------------------------------------- + -- Worker thread accounting + --------------------------------------------------------------------------- + -- + -- | This is capped to 'maxBufferLimit' if set to more than that. Otherwise + -- potentially each worker may yield one value to the buffer in the worst + -- case exceeding the requested buffer size. + , maxWorkerLimit :: Limit + + -- | Tracks all active worker threads. An entry is added by the dispatcher + -- when a worker is dispatched, and removed whenever the event processing + -- loop receives a 'ChildStop' event. + -- + -- [LOCKING] Updated unlocked, only by the event loop thread. , workerThreads :: IORef (Set ThreadId) -- | Total number of active worker threads. -- - -- [LOCKING] Updated locked, by consumer thread when dispatching a worker - -- and by a worker threads when the thread stops. This is read without lock - -- at several places where we want to rely on an approximate value. + -- [LOCKING] Updated locked, by the event loop thread when dispatching a + -- worker and by a worker threads when the thread stops. This is read + -- without lock at several places where we want to rely on an approximate + -- value. , workerCount :: IORef Int -- XXX Can we use IO instead of m here? , accountThread :: ThreadId -> m () - -- | Used when 'ordered' is enabled. + -- | Used when 'ordered' is enabled. This is a lock to stop the workers one + -- at a time. Stopping one might affect whether the other should stop. , workerStopMVar :: MVar () --------------------------------------------------------------------------- - -- cleanup -- + -- Channel cleanup -- --------------------------------------------------------------------------- - -- | IORef to call a cleanup function when the channel is garbage + -- | A weak IORef to call a cleanup function when the channel is garbage -- collected. , svarRef :: Maybe (IORef ()) --------------------------------------------------------------------------- - -- Stats -- + -- Channel Stats -- --------------------------------------------------------------------------- -- | Stats collection. , svarStats :: SVarStats @@ -197,13 +259,26 @@ data Channel m a = Channel , svarCreator :: ThreadId } --- | Yield a value to the channel. Worker latencies are collected in the --- supplied 'WorkerInfo' record, and periodically pushed to the channel's --- 'workerPendingLatency' stat. Worker latencies are updated in the channel --- only if the channel has a 'YieldRateInfo' attached and the +-- | Used by workers to send a value to the channel's output stream. +-- +-- When a worker is dispatched, a 'WorkerInfo' record is supplied to it by the +-- dispatcher. This record contains the timestamp at the time of dispatch. +-- Whenever the worker yields a value, the yield count in the 'WorkerInfo' is +-- incremented. If the channel has rate control enabled, the yield count and +-- time duration is periodically (based on 'workerPollingInterval') pushed to +-- the channel's 'workerPendingLatency' stat. It is done only if the -- 'workerPollingInterval' is non-zero. -- --- Even unregistered workers can use this API. +-- Queues the event but returns 'False' if: +-- +-- * the buffer limit is exceeding +-- * channel yield rate is exceeding (when rate control is enabled and +-- 'WorkerInfo' is available) +-- +-- This is a thread-safe API and can be called by anyone from anywhere. Even a +-- thread that is not registered as a worker with the channel can use it but +-- when rate control is enabled, it might confuse the rate control mechanism if +-- we use workers beyond the knowledge of dispatcher. -- {-# INLINE yieldWith #-} yieldWith :: @@ -221,10 +296,10 @@ yieldWith winfo chan = (outputDoorBell chan) winfo --- | The worker stops yielding and exits. The final update of the collected --- latency stats in 'WorkerInfo' are pushed to the channel. Upon receiving the --- 'ChildStop' event the channel would remove the worker from its set of --- registered workers. +-- | Send a 'ChildStop' event to the channel, used when the worker stops +-- yielding and exits. The final update of the collected latency stats in +-- 'WorkerInfo' is pushed to the channel. Upon receiving the 'ChildStop' event +-- the channel would remove the worker from its set of registered workers. -- -- A worker that uses this API must have been registered on the Channel prior -- to invoking this API. This is usually done by the dispatcher when the @@ -239,13 +314,15 @@ stopWith winfo chan = (outputDoorBell chan) winfo --- | Like 'stopWith' but stops with the specified exception. +-- | Like 'stopWith' but marks the stop event with the specified exception. {-# INLINE exceptionWith #-} exceptionWith :: Maybe WorkerInfo -> Channel m a -> SomeException -> IO () exceptionWith _winfo chan = sendException (outputQueue chan) (outputDoorBell chan) --- | Shutdown the channel. Kill all the registered worker threads. +-- | Send a 'ChildStopChannel' event to shutdown the channel. Upon receiving +-- the event the event processing loop kills all the registered worker threads +-- and stops the channel. {-# INLINABLE shutdown #-} shutdown :: MonadIO m => Channel m a -> m () shutdown chan = liftIO $ do diff --git a/src/Streamly/Internal/Data/Stream/Concurrent.hs b/src/Streamly/Internal/Data/Stream/Concurrent.hs index cea4f1c8f6..acb704f7a4 100644 --- a/src/Streamly/Internal/Data/Stream/Concurrent.hs +++ b/src/Streamly/Internal/Data/Stream/Concurrent.hs @@ -82,6 +82,10 @@ import Streamly.Internal.Control.ForkLifted (forkManaged) import Streamly.Internal.Data.Channel.Dispatcher (modifyThread) import Streamly.Internal.Data.Channel.Worker (sendEvent) import Streamly.Internal.Data.Stream (Stream, Step(..)) +import Streamly.Internal.Data.Stream.Channel + ( Channel(..), newChannel, fromChannel, toChannelK, withChannelK + , withChannel, shutdown + ) import Streamly.Internal.Data.SVar.Type (adaptState) import qualified Streamly.Internal.Data.MutArray as Unboxed @@ -91,7 +95,6 @@ import qualified Streamly.Internal.Data.StreamK as K import Prelude hiding (mapM, sequence, concat, concatMap, zipWith) import Streamly.Internal.Data.Channel.Types -import Streamly.Internal.Data.Stream.Channel -- $setup -- @@ -266,8 +269,8 @@ parTwo modifier stream1 stream2 = -- Used for concurrent evaluation of streams using a Channel. {-# INLINE concatMapDivK #-} concatMapDivK :: Monad m => - (K.StreamK m a -> m ()) - -> (a -> K.StreamK m b) + (K.StreamK m a -> m ()) -- ^ Queue the tail + -> (a -> K.StreamK m b) -- ^ Generate a stream from the head -> K.StreamK m a -> K.StreamK m b concatMapDivK useTail useHead stream = @@ -281,20 +284,33 @@ concatMapDivK useTail useHead stream = -- concat streams ------------------------------------------------------------------------------- --- | A runner function takes a queuing function @q@ and a stream, it splits the --- input stream, queuing the tail and using the head to generate a stream. --- 'mkEnqueue' takes a runner function and generates the queuing function @q@. --- Note that @q@ and the runner are mutually recursive, mkEnqueue ties the knot --- between the two. +-- | 'mkEnqueue chan divider' returns a queuing function @enq@. @enq@ takes a +-- @stream@ and enqueues the stream returned by @divider enq stream@ on the +-- channel. Divider generates an output stream from the head and enqueues the +-- tail on the channel. +-- +-- The returned function @enq@ basically queues two streams on the channel, the +-- first stream is a stream generated from the head element of the +-- input stream, the second stream is a lazy action which when evaluated would +-- recursively do the same thing again for the tail. If we keep on evaluating +-- the second stream, ultimately all the elements in the original stream +-- (@StreamK m a@) would be mapped to individual streams (@StreamK m b@) which +-- are individually queued on the channel. +-- +-- Note that @enq@ and runner are mutually recursive, mkEnqueue ties the +-- knot between the two. +-- {-# INLINE mkEnqueue #-} mkEnqueue :: MonadAsync m => Channel m b + -- | @divider enq stream@ -> ((K.StreamK m a -> m ()) -> K.StreamK m a -> K.StreamK m b) + -- | Queuing function @enq@ -> m (K.StreamK m a -> m ()) mkEnqueue chan runner = do runInIO <- askRunInIO return - $ let q stream = do + $ let f stream = do -- When using parConcatMap with lazy dispatch we enqueue the -- outer stream tail and then map a stream generator on the -- head, which is also queued. If we pick both head and tail @@ -303,14 +319,14 @@ mkEnqueue chan runner = do -- the inner streams when picking up for execution. This -- requires two work queues, one for outer stream and one for -- inner. Here we enqueue the outer loop stream. - liftIO $ enqueue chan False (runInIO, runner q stream) + liftIO $ enqueue chan False (runInIO, runner f stream) -- XXX In case of eager dispatch we can just directly dispatch -- a worker with the tail stream here rather than first queuing -- and then dispatching a worker which dequeues the work. The -- older implementation did a direct dispatch here and its perf -- characterstics looked much better. eagerDispatch chan - in q + in f -- | Takes the head element of the input stream and queues the tail of the -- stream to the channel, then maps the supplied function on the head and @@ -349,6 +365,11 @@ parConcatMapChanKFirst chan f stream = q t return $ K.append (f h) done +-- XXX Move this to the Channel module as an evaluator. Rename to +-- parConcatMapChanK or just parConcatMapK. +-- XXX If we use toChannelK multiple times on a channel make sure the channel +-- does not go away before we use the subsequent ones. + {-# INLINE parConcatMapChanKGeneric #-} parConcatMapChanKGeneric :: MonadAsync m => (Config -> Config) From 9077f3938cb17b28bb5e493a75e073de6bc9cd6e Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Fri, 16 Feb 2024 07:49:29 +0530 Subject: [PATCH 04/24] Export maxYields --- src/Streamly/Internal/Data/Stream/Channel/Type.hs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Streamly/Internal/Data/Stream/Channel/Type.hs b/src/Streamly/Internal/Data/Stream/Channel/Type.hs index 45d5ca85c5..5db1bae389 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Type.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Type.hs @@ -17,6 +17,7 @@ module Streamly.Internal.Data.Stream.Channel.Type -- *** Limits , maxThreads , maxBuffer + , maxYields -- *** Rate Control , Rate(..) From 8e7760fdbc5283294c846beac74a69c8170f7350 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 15 Feb 2024 03:09:01 +0530 Subject: [PATCH 05/24] Rename concatMapDivK to concatMapHeadK --- .../Internal/Data/Stream/Concurrent.hs | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/Streamly/Internal/Data/Stream/Concurrent.hs b/src/Streamly/Internal/Data/Stream/Concurrent.hs index acb704f7a4..fe1e498fd7 100644 --- a/src/Streamly/Internal/Data/Stream/Concurrent.hs +++ b/src/Streamly/Internal/Data/Stream/Concurrent.hs @@ -262,22 +262,23 @@ parTwo modifier stream1 stream2 = -- Evaluator ------------------------------------------------------------------------------- --- | @concatMapDivK useTail useHead stream@, divides the stream in head and --- tail, maps a stream generator on the head and maps an action on the tail of --- a stream. Returns the stream generated by the head. --- --- Used for concurrent evaluation of streams using a Channel. -{-# INLINE concatMapDivK #-} -concatMapDivK :: Monad m => +-- | @concatMapHeadK consumeTail mapHead stream@, maps a stream generation +-- function on the head element and performs a side effect on the tail. +-- +-- Used for concurrent evaluation of streams using a Channel. A worker +-- evaluating the stream would queue the tail and go on to evaluate the head. +-- The tail is picked up by another worker which does the same. +{-# INLINE concatMapHeadK #-} +concatMapHeadK :: Monad m => (K.StreamK m a -> m ()) -- ^ Queue the tail -> (a -> K.StreamK m b) -- ^ Generate a stream from the head -> K.StreamK m a -> K.StreamK m b -concatMapDivK useTail useHead stream = +concatMapHeadK consumeTail mapHead stream = K.mkStream $ \st yld sng stp -> do let foldShared = K.foldStreamShared st yld sng stp - single a = foldShared $ useHead a - yieldk a r = useTail r >> single a + single a = foldShared $ mapHead a + yieldk a r = consumeTail r >> single a in K.foldStreamShared (adaptState st) yieldk single stp stream ------------------------------------------------------------------------------- @@ -338,7 +339,7 @@ mkEnqueue chan runner = do parConcatMapChanK :: MonadAsync m => Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b parConcatMapChanK chan f stream = - let run q = concatMapDivK q f + let run q = concatMapHeadK q f in K.concatMapEffect (`run` stream) (mkEnqueue chan run) -- K.parConcatMap (_appendWithChanK chan) f stream @@ -347,7 +348,7 @@ parConcatMapChanKAny :: MonadAsync m => Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b parConcatMapChanKAny chan f stream = let done = K.nilM (shutdown chan) - run q = concatMapDivK q (\x -> K.append (f x) done) + run q = concatMapHeadK q (\x -> K.append (f x) done) in K.concatMapEffect (`run` stream) (mkEnqueue chan run) {-# INLINE parConcatMapChanKFirst #-} @@ -355,7 +356,7 @@ parConcatMapChanKFirst :: MonadAsync m => Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b parConcatMapChanKFirst chan f stream = let done = K.nilM (shutdown chan) - run q = concatMapDivK q f + run q = concatMapHeadK q f in K.concatEffect $ do res <- K.uncons stream case res of From 6bb838ce38e9c1a0a57e1de913878e7d919012d6 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 15 Feb 2024 03:44:01 +0530 Subject: [PATCH 06/24] Move parConcatMapChanK to the Channel module Update the doc of parConcatMapChanK Update doc of toChannelK --- src/Streamly/Internal/Data/Stream/Channel.hs | 134 ++++++++++++++++- .../Data/Stream/Channel/Operations.hs | 12 +- .../Internal/Data/Stream/Concurrent.hs | 142 +----------------- 3 files changed, 148 insertions(+), 140 deletions(-) diff --git a/src/Streamly/Internal/Data/Stream/Channel.hs b/src/Streamly/Internal/Data/Stream/Channel.hs index 3758ee125a..4a7fbb3c47 100644 --- a/src/Streamly/Internal/Data/Stream/Channel.hs +++ b/src/Streamly/Internal/Data/Stream/Channel.hs @@ -23,17 +23,20 @@ module Streamly.Internal.Data.Stream.Channel -- ** Evaluation , withChannelK , withChannel + , chanConcatMapK -- quiesceChannel -- wait for running tasks but do not schedule any more. ) where import Streamly.Internal.Control.Concurrent (MonadAsync) +import Control.Monad.IO.Class (MonadIO(liftIO)) import Streamly.Internal.Data.Stream (Stream) +import Streamly.Internal.Control.Concurrent (askRunInIO) +import Streamly.Internal.Data.SVar.Type (adaptState) import qualified Streamly.Internal.Data.StreamK as K import Streamly.Internal.Data.Channel.Types - import Streamly.Internal.Data.Stream.Channel.Type import Streamly.Internal.Data.Stream.Channel.Operations import Streamly.Internal.Data.Stream.Channel.Append @@ -82,3 +85,132 @@ withChannel :: MonadAsync m => withChannel modifier input evaluator = let f chan stream = K.fromStream $ evaluator chan (K.toStream stream) in K.toStream $ withChannelK modifier (K.fromStream input) f + +------------------------------------------------------------------------------- +-- Evaluator +------------------------------------------------------------------------------- + +-- | @concatMapHeadK consumeTail mapHead stream@, maps a stream generation +-- function on the head element and performs a side effect on the tail. +-- +-- Used for concurrent evaluation of streams using a Channel. A worker +-- evaluating the stream would queue the tail and go on to evaluate the head. +-- The tail is picked up by another worker which does the same. +{-# INLINE concatMapHeadK #-} +concatMapHeadK :: Monad m => + (K.StreamK m a -> m ()) -- ^ Queue the tail + -> (a -> K.StreamK m b) -- ^ Generate a stream from the head + -> K.StreamK m a + -> K.StreamK m b +concatMapHeadK consumeTail mapHead stream = + K.mkStream $ \st yld sng stp -> do + let foldShared = K.foldStreamShared st yld sng stp + single a = foldShared $ mapHead a + yieldk a r = consumeTail r >> single a + in K.foldStreamShared (adaptState st) yieldk single stp stream + +------------------------------------------------------------------------------- +-- concat streams +------------------------------------------------------------------------------- + +-- | 'mkEnqueue chan f returns a queuing function @enq@. @enq@ takes a +-- @stream@ and enqueues @f enq stream@ on the channel. One example of @f@ is +-- 'concatMapHeadK'. When the enqueued value with 'concatMapHeadK' as @f@ is +-- evaluated, it generates an output stream from the head and enqueues @f enq +-- tail@ on the channel. Thus whenever the enqueued stream is evaluated it +-- generates a stream from the head and queues the tail on the channel. +-- +-- Note that @enq@ and runner are mutually recursive, mkEnqueue ties the +-- knot between the two. +-- +{-# INLINE mkEnqueue #-} +mkEnqueue :: MonadAsync m => + Channel m b + -- | @divider enq stream@ + -> ((K.StreamK m a -> m ()) -> K.StreamK m a -> K.StreamK m b) + -- | Queuing function @enq@ + -> m (K.StreamK m a -> m ()) +mkEnqueue chan runner = do + runInIO <- askRunInIO + return + $ let f stream = do + -- When using parConcatMap with lazy dispatch we enqueue the + -- outer stream tail and then map a stream generator on the + -- head, which is also queued. If we pick both head and tail + -- with equal priority we may keep blowing up the tail into + -- more and more streams. To avoid that we give preference to + -- the inner streams when picking up for execution. This + -- requires two work queues, one for outer stream and one for + -- inner. Here we enqueue the outer loop stream. + liftIO $ enqueue chan False (runInIO, runner f stream) + -- XXX In case of eager dispatch we can just directly dispatch + -- a worker with the tail stream here rather than first queuing + -- and then dispatching a worker which dequeues the work. The + -- older implementation did a direct dispatch here and its perf + -- characterstics looked much better. + eagerDispatch chan + in f + +{-# INLINE parConcatMapChanKAll #-} +parConcatMapChanKAll :: MonadAsync m => + Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b +parConcatMapChanKAll chan f stream = + let run q = concatMapHeadK q f + in K.concatMapEffect (`run` stream) (mkEnqueue chan run) + -- K.parConcatMap (_appendWithChanK chan) f stream + +{-# INLINE parConcatMapChanKAny #-} +parConcatMapChanKAny :: MonadAsync m => + Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b +parConcatMapChanKAny chan f stream = + let done = K.nilM (shutdown chan) + run q = concatMapHeadK q (\x -> K.append (f x) done) + in K.concatMapEffect (`run` stream) (mkEnqueue chan run) + +{-# INLINE parConcatMapChanKFirst #-} +parConcatMapChanKFirst :: MonadAsync m => + Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b +parConcatMapChanKFirst chan f stream = + let done = K.nilM (shutdown chan) + run q = concatMapHeadK q f + in K.concatEffect $ do + res <- K.uncons stream + case res of + Nothing -> return K.nil + Just (h, t) -> do + q <- mkEnqueue chan run + q t + return $ K.append (f h) done + +-- | Make a concurrent stream evaluator from a stream, to be used in +-- 'withChannelK' or 'toChannelK'. Maps a stream generation function on each +-- element of the stream, the evaluation of the map on each element happens +-- concurrently. All the generated streams are merged together in the output of +-- the channel. The scheduling and termination behavior depends on the channel +-- settings. +-- +-- Note that if you queue a stream on the channel using 'toChannelK', it will +-- be picked up by a worker and the worker would evaluate the entire stream +-- serially and emit the results on the channel. However, if you transform the +-- stream using 'parConcatMapChanK' and queue it on the channel, it +-- parallelizes the function map on each element of the stream. The simplest +-- example is @parConcatMapChanK id id@ which is equivalent to evaluating each +-- element of the stream concurrently. +-- +-- A channel worker evaluating this function would enqueue the tail on the +-- channel's work queue and go on to evaluate the head generating an output +-- stream. The tail is picked up by another worker which does the same and so +-- on. +{-# INLINE chanConcatMapK #-} +chanConcatMapK :: MonadAsync m => + (Config -> Config) + -> Channel m b + -> (a -> K.StreamK m b) + -> K.StreamK m a + -> K.StreamK m b +chanConcatMapK modifier chan f stream = do + let cfg = modifier defaultConfig + case getStopWhen cfg of + AllStop -> parConcatMapChanKAll chan f stream + FirstStops -> parConcatMapChanKFirst chan f stream + AnyStops -> parConcatMapChanKAny chan f stream diff --git a/src/Streamly/Internal/Data/Stream/Channel/Operations.hs b/src/Streamly/Internal/Data/Stream/Channel/Operations.hs index 2aedde5ad3..3e97c6b715 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Operations.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Operations.hs @@ -88,6 +88,8 @@ import Test.Inspection (inspect, hasNoTypeClassesExcept) -- XXX Should be a Fold, singleton API could be called joinChannel, or the fold -- can be called joinChannel. +-- XXX If we use toChannelK multiple times on a channel make sure the channel +-- does not go away before we use the subsequent ones. -- | High level function to enqueue a work item on the channel. The fundamental -- unit of work is a stream. Each stream enqueued on the channel is picked up @@ -100,13 +102,17 @@ import Test.Inspection (inspect, hasNoTypeClassesExcept) -- be generated serially one after the other. Only two or more streams can be -- run concurrently with each other. -- +-- See 'chanConcatMapK' for concurrent evaluation of each element of a stream. +-- Alternatively, you can wrap each element of the original stream into a +-- stream generating action and queue all those streams on the channel. Then +-- all of them would be evaluated concurrently. However, that would not be +-- streaming in nature, it would require buffering space for the entire +-- original stream. Prefer 'chanConcatMapK' for larger streams. +-- -- Items from each evaluated streams are queued to the same output queue of the -- channel which can be read using 'fromChannelK'. 'toChannelK' can be called -- multiple times to enqueue multiple streams on the channel. -- --- The fundamental unit of work is a stream. If you want to run single actions --- concurrently, wrap each action into a singleton stream and queue all those --- streams on the channel. {-# INLINE toChannelK #-} toChannelK :: MonadRunInIO m => Channel m a -> K.StreamK m a -> m () toChannelK chan m = do diff --git a/src/Streamly/Internal/Data/Stream/Concurrent.hs b/src/Streamly/Internal/Data/Stream/Concurrent.hs index fe1e498fd7..7c1faa99ce 100644 --- a/src/Streamly/Internal/Data/Stream/Concurrent.hs +++ b/src/Streamly/Internal/Data/Stream/Concurrent.hs @@ -77,16 +77,15 @@ where import Control.Concurrent (myThreadId, killThread) import Control.Monad (void, when) import Control.Monad.IO.Class (MonadIO(liftIO)) -import Streamly.Internal.Control.Concurrent (MonadAsync, askRunInIO) +import Streamly.Internal.Control.Concurrent (MonadAsync) import Streamly.Internal.Control.ForkLifted (forkManaged) import Streamly.Internal.Data.Channel.Dispatcher (modifyThread) import Streamly.Internal.Data.Channel.Worker (sendEvent) import Streamly.Internal.Data.Stream (Stream, Step(..)) import Streamly.Internal.Data.Stream.Channel ( Channel(..), newChannel, fromChannel, toChannelK, withChannelK - , withChannel, shutdown + , withChannel, shutdown, chanConcatMapK ) -import Streamly.Internal.Data.SVar.Type (adaptState) import qualified Streamly.Internal.Data.MutArray as Unboxed import qualified Streamly.Internal.Data.Stream as Stream @@ -258,133 +257,6 @@ parTwo modifier stream1 stream2 = $ appendWithK modifier (Stream.toStreamK stream1) (Stream.toStreamK stream2) -------------------------------------------------------------------------------- --- Evaluator -------------------------------------------------------------------------------- - --- | @concatMapHeadK consumeTail mapHead stream@, maps a stream generation --- function on the head element and performs a side effect on the tail. --- --- Used for concurrent evaluation of streams using a Channel. A worker --- evaluating the stream would queue the tail and go on to evaluate the head. --- The tail is picked up by another worker which does the same. -{-# INLINE concatMapHeadK #-} -concatMapHeadK :: Monad m => - (K.StreamK m a -> m ()) -- ^ Queue the tail - -> (a -> K.StreamK m b) -- ^ Generate a stream from the head - -> K.StreamK m a - -> K.StreamK m b -concatMapHeadK consumeTail mapHead stream = - K.mkStream $ \st yld sng stp -> do - let foldShared = K.foldStreamShared st yld sng stp - single a = foldShared $ mapHead a - yieldk a r = consumeTail r >> single a - in K.foldStreamShared (adaptState st) yieldk single stp stream - -------------------------------------------------------------------------------- --- concat streams -------------------------------------------------------------------------------- - --- | 'mkEnqueue chan divider' returns a queuing function @enq@. @enq@ takes a --- @stream@ and enqueues the stream returned by @divider enq stream@ on the --- channel. Divider generates an output stream from the head and enqueues the --- tail on the channel. --- --- The returned function @enq@ basically queues two streams on the channel, the --- first stream is a stream generated from the head element of the --- input stream, the second stream is a lazy action which when evaluated would --- recursively do the same thing again for the tail. If we keep on evaluating --- the second stream, ultimately all the elements in the original stream --- (@StreamK m a@) would be mapped to individual streams (@StreamK m b@) which --- are individually queued on the channel. --- --- Note that @enq@ and runner are mutually recursive, mkEnqueue ties the --- knot between the two. --- -{-# INLINE mkEnqueue #-} -mkEnqueue :: MonadAsync m => - Channel m b - -- | @divider enq stream@ - -> ((K.StreamK m a -> m ()) -> K.StreamK m a -> K.StreamK m b) - -- | Queuing function @enq@ - -> m (K.StreamK m a -> m ()) -mkEnqueue chan runner = do - runInIO <- askRunInIO - return - $ let f stream = do - -- When using parConcatMap with lazy dispatch we enqueue the - -- outer stream tail and then map a stream generator on the - -- head, which is also queued. If we pick both head and tail - -- with equal priority we may keep blowing up the tail into - -- more and more streams. To avoid that we give preference to - -- the inner streams when picking up for execution. This - -- requires two work queues, one for outer stream and one for - -- inner. Here we enqueue the outer loop stream. - liftIO $ enqueue chan False (runInIO, runner f stream) - -- XXX In case of eager dispatch we can just directly dispatch - -- a worker with the tail stream here rather than first queuing - -- and then dispatching a worker which dequeues the work. The - -- older implementation did a direct dispatch here and its perf - -- characterstics looked much better. - eagerDispatch chan - in f - --- | Takes the head element of the input stream and queues the tail of the --- stream to the channel, then maps the supplied function on the head and --- evaluates the resulting stream. --- --- This function is designed to be used by worker threads on a channel to --- concurrently map and evaluate a stream. -{-# INLINE parConcatMapChanK #-} -parConcatMapChanK :: MonadAsync m => - Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b -parConcatMapChanK chan f stream = - let run q = concatMapHeadK q f - in K.concatMapEffect (`run` stream) (mkEnqueue chan run) - -- K.parConcatMap (_appendWithChanK chan) f stream - -{-# INLINE parConcatMapChanKAny #-} -parConcatMapChanKAny :: MonadAsync m => - Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b -parConcatMapChanKAny chan f stream = - let done = K.nilM (shutdown chan) - run q = concatMapHeadK q (\x -> K.append (f x) done) - in K.concatMapEffect (`run` stream) (mkEnqueue chan run) - -{-# INLINE parConcatMapChanKFirst #-} -parConcatMapChanKFirst :: MonadAsync m => - Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b -parConcatMapChanKFirst chan f stream = - let done = K.nilM (shutdown chan) - run q = concatMapHeadK q f - in K.concatEffect $ do - res <- K.uncons stream - case res of - Nothing -> return K.nil - Just (h, t) -> do - q <- mkEnqueue chan run - q t - return $ K.append (f h) done - --- XXX Move this to the Channel module as an evaluator. Rename to --- parConcatMapChanK or just parConcatMapK. --- XXX If we use toChannelK multiple times on a channel make sure the channel --- does not go away before we use the subsequent ones. - -{-# INLINE parConcatMapChanKGeneric #-} -parConcatMapChanKGeneric :: MonadAsync m => - (Config -> Config) - -> Channel m b - -> (a -> K.StreamK m b) - -> K.StreamK m a - -> K.StreamK m b -parConcatMapChanKGeneric modifier chan f stream = do - let cfg = modifier defaultConfig - case getStopWhen cfg of - AllStop -> parConcatMapChanK chan f stream - FirstStops -> parConcatMapChanKFirst chan f stream - AnyStops -> parConcatMapChanKAny chan f stream - -- XXX Add a deep evaluation variant that evaluates individual elements in the -- generated streams in parallel. @@ -395,7 +267,7 @@ parConcatMapChanKGeneric modifier chan f stream = do parConcatMapK :: MonadAsync m => (Config -> Config) -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b parConcatMapK modifier f input = - let g = parConcatMapChanKGeneric modifier + let g = chanConcatMapK modifier in withChannelK modifier input (`g` f) -- | Map each element of the input to a stream and then concurrently evaluate @@ -673,12 +545,10 @@ parConcatIterate modifier f input = where - iterateStream channel = - parConcatMapChanKGeneric modifier channel (generate channel) + iterateStream chan = chanConcatMapK modifier chan (generate chan) - generate channel x = - -- XXX The channel q should be FIFO for DFS, otherwise it is BFS - x `K.cons` iterateStream channel (Stream.toStreamK $ f x) + -- XXX The channel q should be FIFO for DFS, otherwise it is BFS + generate chan x = x `K.cons` iterateStream chan (Stream.toStreamK $ f x) ------------------------------------------------------------------------------- -- Generate From b2783dbfb71ecabbb489037c1756e7784b5628c1 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 15 Feb 2024 05:28:31 +0530 Subject: [PATCH 07/24] Remove the two queue solution for async streams Since the stream tail is always a single entry in the queue, this cannot help. --- src/Streamly/Internal/Data/Stream/Channel.hs | 10 +- .../Internal/Data/Stream/Channel/Append.hs | 98 +++++-------------- .../Data/Stream/Channel/Interleave.hs | 2 +- .../Data/Stream/Channel/Operations.hs | 6 +- .../Internal/Data/Stream/Channel/Type.hs | 11 +-- 5 files changed, 32 insertions(+), 95 deletions(-) diff --git a/src/Streamly/Internal/Data/Stream/Channel.hs b/src/Streamly/Internal/Data/Stream/Channel.hs index 4a7fbb3c47..e08cbf68c4 100644 --- a/src/Streamly/Internal/Data/Stream/Channel.hs +++ b/src/Streamly/Internal/Data/Stream/Channel.hs @@ -134,15 +134,7 @@ mkEnqueue chan runner = do runInIO <- askRunInIO return $ let f stream = do - -- When using parConcatMap with lazy dispatch we enqueue the - -- outer stream tail and then map a stream generator on the - -- head, which is also queued. If we pick both head and tail - -- with equal priority we may keep blowing up the tail into - -- more and more streams. To avoid that we give preference to - -- the inner streams when picking up for execution. This - -- requires two work queues, one for outer stream and one for - -- inner. Here we enqueue the outer loop stream. - liftIO $ enqueue chan False (runInIO, runner f stream) + liftIO $ enqueue chan (runInIO, runner f stream) -- XXX In case of eager dispatch we can just directly dispatch -- a worker with the tail stream here rather than first queuing -- and then dispatching a worker which dequeues the work. The diff --git a/src/Streamly/Internal/Data/Stream/Channel/Append.hs b/src/Streamly/Internal/Data/Stream/Channel/Append.hs index 2878fc257b..d1d7806ca1 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Append.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Append.hs @@ -48,69 +48,33 @@ import Streamly.Internal.Data.Stream.Channel.Type -- Concurrent streams with first-come-first serve results ------------------------------------------------------------------------------ --- | We use two queues, one outer and the other inner. When entries are present --- in both the queues, inner queue is given preference when dequeuing. --- --- Normally entries are queued to the inner queue only. The parConcatMap --- implementation makes use of the outer queue as well. The tail of the outer --- stream is queued to the outer queue whereas the inner loop streams are --- queued to the inner queue so that inner streams are given preference, --- otherwise we might just keep generating streams from the outer stream and --- not use them fast enough. We need to go depth first rather than breadth --- first. --- --- If we do not use outer and inner distinction there are two problematic --- cases. The outer stream gets executed faster than inner and may keep adding --- more entries. When we queue it back on the work queue it may be the first --- one to be picked if it is on top of the LIFO. --- --- Normally, when using parConcatMap the outer queue would have only one entry --- which is the tail of the outer stream. However, when manually queueing --- streams on the channel using 'toChannelK' you could queue to outer or inner, --- in which case outer queue may have multiple entries. --- {-# INLINE enqueueLIFO #-} enqueueLIFO :: Channel m a - -- | (outer queue, inner queue) - -> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)]) - -> Bool -- True means put it on inner queue, otherwise outer + -> IORef [(RunInIO m, K.StreamK m a)] -> (RunInIO m, K.StreamK m a) -> IO () -enqueueLIFO sv q inner m = do - atomicModifyIORefCAS_ q $ \(xs, ys) -> - if inner then (xs, m : ys) else (m : xs, ys) +enqueueLIFO sv q m = do + atomicModifyIORefCAS_ q (m :) ringDoorBell (doorBellOnWorkQ sv) (outputDoorBell sv) --- | We need to know whether an entry was dequeued from the outer q or inner q --- because when we consume it partially and q it back on the q we need to know --- which q to put it back on. -data QResult a = - QEmpty - | QOuter a -- ^ Entry dequeued from outer q - | QInner a -- ^ Entry dequeued from inner q - --- | Dequeues from inner q first and if it is empty then dequeue from the --- outer. {-# INLINE dequeue #-} dequeue :: MonadIO m => - -- | (outer queue, inner queue) - IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)]) - -> m (QResult (RunInIO m, K.StreamK m a)) + IORef [(RunInIO m, K.StreamK m a)] + -> m (Maybe (RunInIO m, K.StreamK m a)) dequeue qref = liftIO $ atomicModifyIORefCAS qref $ \case - (xs, y : ys) -> ((xs, ys), QInner y) - (x : xs, ys) -> ((xs, ys), QOuter x) - x -> (x, QEmpty) + (x : xs) -> (xs, Just x) + x -> (x, Nothing) data WorkerStatus = Continue | Suspend {-# INLINE workLoopLIFO #-} workLoopLIFO :: MonadRunInIO m - => IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)]) + => IORef [(RunInIO m, K.StreamK m a)] -> Channel m a -> Maybe WorkerInfo -> m () @@ -121,14 +85,10 @@ workLoopLIFO qref sv winfo = run run = do work <- dequeue qref case work of - QEmpty -> - liftIO $ stopWith winfo sv - QInner (RunInIO runin, m) -> - process runin m True - QOuter (RunInIO runin, m) -> - process runin m False - - process runin m inner = do + Nothing -> liftIO $ stopWith winfo sv + Just (RunInIO runin, m) -> process runin m + + process runin m = do -- XXX when we finish we need to send the monadic state back to -- the parent so that the state can be merged back. We capture -- and return the state in the stop continuation. @@ -160,7 +120,7 @@ workLoopLIFO qref sv winfo = run then K.foldStreamShared undefined yieldk single (return Continue) r else do runInIO <- askRunInIO - liftIO $ enqueueLIFO sv qref inner (runInIO, r) + liftIO $ enqueueLIFO sv qref (runInIO, r) return Suspend -- We duplicate workLoop for yield limit and no limit cases because it has @@ -171,7 +131,7 @@ workLoopLIFO qref sv winfo = run {-# INLINE workLoopLIFOLimited #-} workLoopLIFOLimited :: forall m a. MonadRunInIO m - => IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)]) + => IORef [(RunInIO m, K.StreamK m a)] -> Channel m a -> Maybe WorkerInfo -> m () @@ -185,14 +145,10 @@ workLoopLIFOLimited qref sv winfo = run run = do work <- dequeue qref case work of - QEmpty -> - liftIO $ stopWith winfo sv - QInner item -> - process item True - QOuter item -> - process item False - - process item@(RunInIO runin, m) inner = do + Nothing -> liftIO $ stopWith winfo sv + Just item -> process item + + process item@(RunInIO runin, m) = do -- XXX This is just a best effort minimization of concurrency -- to the yield limit. If the stream is made of concurrent -- streams we do not reserve the yield limit in the constituent @@ -216,7 +172,7 @@ workLoopLIFOLimited qref sv winfo = run -- Avoid any side effects, undo the yield limit decrement if we -- never yielded anything. else liftIO $ do - enqueueLIFO sv qref inner item + enqueueLIFO sv qref item incrementYieldLimit (remainingWork sv) stopWith winfo sv @@ -236,7 +192,7 @@ workLoopLIFOLimited qref sv winfo = run else do runInIO <- askRunInIO liftIO $ incrementYieldLimit (remainingWork sv) - liftIO $ enqueueLIFO sv qref inner (runInIO, r) + liftIO $ enqueueLIFO sv qref (runInIO, r) return Suspend ------------------------------------------------------------------------------- @@ -923,10 +879,7 @@ getLifoSVar mrun cfg = do active <- newIORef 0 wfw <- newIORef False running <- newIORef Set.empty - q <- newIORef - ( [] :: [(RunInIO m, K.StreamK m a)] - , [] :: [(RunInIO m, K.StreamK m a)] - ) + q <- newIORef ([] :: [(RunInIO m, K.StreamK m a)]) -- Sequence number is incremented whenever something is de-queued, -- therefore, first sequence number would be 0 aheadQ <- newIORef ([], -1) @@ -943,8 +896,8 @@ getLifoSVar mrun cfg = do -- We are reading it without lock, the result would be reliable only if no -- worker is pending. let isWorkFinished _ = do - (xs, ys) <- readIORef q - return (null xs && null ys) + xs <- readIORef q + return (null xs) let isWorkFinishedLimited sv = do yieldsDone <- @@ -963,7 +916,7 @@ getLifoSVar mrun cfg = do -> (Channel m a -> m [ChildEvent a]) -> (Channel m a -> m Bool) -> (Channel m a -> IO Bool) - -> (IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)]) + -> (IORef [(RunInIO m, K.StreamK m a)] -> Channel m a -> Maybe WorkerInfo -> m()) @@ -984,10 +937,9 @@ getLifoSVar mrun cfg = do then workLoopAhead aheadQ outH sv else wloop q sv , enqueue = - \inner -> if inOrder then enqueueAhead sv aheadQ - else enqueueLIFO sv q inner + else enqueueLIFO sv q , eagerDispatch = when eagerEval $ void $ dispatchWorker 0 sv , isWorkDone = if inOrder diff --git a/src/Streamly/Internal/Data/Stream/Channel/Interleave.hs b/src/Streamly/Internal/Data/Stream/Channel/Interleave.hs index a981127053..3b03a2b396 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Interleave.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Interleave.hs @@ -198,7 +198,7 @@ getFifoSVar mrun cfg = do , postProcess = postProc sv , workerThreads = running , workLoop = wloop q sv - , enqueue = \_ -> enqueueFIFO sv q + , enqueue = enqueueFIFO sv q , eagerDispatch = return () , isWorkDone = workDone sv , isQueueDone = workDone sv diff --git a/src/Streamly/Internal/Data/Stream/Channel/Operations.hs b/src/Streamly/Internal/Data/Stream/Channel/Operations.hs index 3e97c6b715..61f6d95a83 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Operations.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Operations.hs @@ -117,11 +117,7 @@ import Test.Inspection (inspect, hasNoTypeClassesExcept) toChannelK :: MonadRunInIO m => Channel m a -> K.StreamK m a -> m () toChannelK chan m = do runIn <- askRunInIO - -- The second argument to enqeue is used in case of lazy on-demand - -- scheduling. See comments in mkEnqueue. By default we enqueue on the - -- inner work q (True). When using concatMap the outer loop is enqueued on - -- the outer work q. - liftIO $ enqueue chan True (runIn, m) + liftIO $ enqueue chan (runIn, m) -- INLINE for fromStreamK/toStreamK fusion diff --git a/src/Streamly/Internal/Data/Stream/Channel/Type.hs b/src/Streamly/Internal/Data/Stream/Channel/Type.hs index 5db1bae389..20cc39754e 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Type.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Type.hs @@ -181,13 +181,10 @@ data Channel m a = Channel -- as there is work to do. , eagerDispatch :: m () - -- | Enqueue a stream for evaluation on the channel. The first argument is - -- used only when 'ordered' or 'interleaved' is NOT set. In that case the - -- queue has two priority levels, True means higher priority and False - -- means lower priority. The first element of the tuple is the runner - -- function which is used to run the stream actions in a specific monadic - -- context. - , enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO () + -- | Enqueue a stream for evaluation on the channel. The first element of + -- the tuple is the runner function which is used to run the stream actions + -- in a specific monadic context. + , enqueue :: (RunInIO m, StreamK m a) -> IO () -- | Determine if the work queue is empty, therefore, there is no more work -- to do. From 997580a7a6f7f3c386451f5fd081eb7a5c8a9a3b Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 15 Feb 2024 06:09:24 +0530 Subject: [PATCH 08/24] Rename some dispatcher functions --- src/Streamly/Internal/Data/Stream/Channel.hs | 2 +- .../Internal/Data/Stream/Channel/Consumer.hs | 10 +++---- .../Data/Stream/Channel/Dispatcher.hs | 28 +++++++++---------- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/Streamly/Internal/Data/Stream/Channel.hs b/src/Streamly/Internal/Data/Stream/Channel.hs index e08cbf68c4..14897c0db3 100644 --- a/src/Streamly/Internal/Data/Stream/Channel.hs +++ b/src/Streamly/Internal/Data/Stream/Channel.hs @@ -19,11 +19,11 @@ module Streamly.Internal.Data.Stream.Channel , module Streamly.Internal.Data.Stream.Channel.Dispatcher , module Streamly.Internal.Data.Stream.Channel.Consumer , module Streamly.Internal.Data.Stream.Channel.Operations + , chanConcatMapK -- ** Evaluation , withChannelK , withChannel - , chanConcatMapK -- quiesceChannel -- wait for running tasks but do not schedule any more. ) where diff --git a/src/Streamly/Internal/Data/Stream/Channel/Consumer.hs b/src/Streamly/Internal/Data/Stream/Channel/Consumer.hs index bf173649ef..b299a90cbb 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Consumer.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Consumer.hs @@ -63,11 +63,11 @@ readOutputQBounded eagerEval sv = do cnt <- liftIO $ readIORef $ workerCount sv when (cnt <= 0) $ do done <- liftIO $ isWorkDone sv - when (not done) (pushWorker 0 sv) + when (not done) (forkWorker 0 sv) {-# INLINE blockingRead #-} blockingRead = do - sendWorkerWait eagerEval sendWorkerDelay (dispatchWorker 0) sv + dispatchAllWait eagerEval sendWorkerDelay (dispatchWorker 0) sv liftIO (fst `fmap` readOutputQChan sv) -- | Same as 'readOutputQBounded' but uses 'dispatchWorkerPaced' to @@ -90,7 +90,7 @@ readOutputQPaced sv = do {-# INLINE blockingRead #-} blockingRead = do - sendWorkerWait False sendWorkerDelayPaced dispatchWorkerPaced sv + dispatchAllWait False sendWorkerDelayPaced dispatchWorkerPaced sv liftIO (fst `fmap` readOutputQChan sv) -- | If there is work to do dispatch as many workers as the target rate @@ -113,7 +113,7 @@ postProcessPaced sv = do -- finished, therefore we cannot just rely on dispatchWorkerPaced -- which may or may not send a worker. noWorker <- allThreadsDone (workerThreads sv) - when noWorker $ pushWorker 0 sv + when noWorker $ forkWorker 0 sv return r else return False @@ -137,7 +137,7 @@ postProcessBounded sv = do r <- liftIO $ isWorkDone sv -- Note that we need to guarantee a worker, therefore we cannot just -- use dispatchWorker which may or may not send a worker. - when (not r) (pushWorker 0 sv) + when (not r) (forkWorker 0 sv) -- XXX do we need to dispatch many here? -- void $ dispatchWorker sv return r diff --git a/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs b/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs index 0b968d5f17..f339abe742 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs @@ -12,10 +12,10 @@ module Streamly.Internal.Data.Stream.Channel.Dispatcher -- *** Worker Dispatching -- | Low level functions used to build readOutputQ and postProcess -- functions. - pushWorker + forkWorker , dispatchWorker , dispatchWorkerPaced - , sendWorkerWait + , dispatchAllWait , sendWorkerDelay , sendWorkerDelayPaced , startChannel -- XXX bootstrap? @@ -46,12 +46,12 @@ import Streamly.Internal.Data.Stream.Channel.Type -- | Low level API to create a worker. Forks a thread which executes the -- 'workLoop' of the channel. -{-# NOINLINE pushWorker #-} -pushWorker :: MonadRunInIO m => +{-# NOINLINE forkWorker #-} +forkWorker :: MonadRunInIO m => Count -- ^ max yield limit for the worker -> Channel m a -> m () -pushWorker yieldMax sv = do +forkWorker yieldMax sv = do liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1 when (svarInspectMode sv) $ recordMaxWorkers (workerCount sv) (svarStats sv) @@ -140,7 +140,7 @@ checkMaxBuffer active sv = do (_, n) <- liftIO $ readIORef (outputQueue sv) return $ fromIntegral lim > n + active --- | Higher level API to dispatch a worker, it uses 'pushWorker' to create a +-- | Higher level API to dispatch a worker, it uses 'forkWorker' to create a -- worker. Dispatches a worker only if: -- -- * the channel has work to do @@ -176,13 +176,13 @@ dispatchWorker yieldCount sv = do then do r1 <- checkMaxBuffer active sv if r1 - then pushWorker yieldCount sv >> return True + then forkWorker yieldCount sv >> return True else return False else return False else do when (active <= 0) $ do r <- liftIO $ isWorkDone sv - when (not r) $ pushWorker 0 sv + when (not r) $ forkWorker 0 sv return False else return False @@ -314,15 +314,15 @@ dispatchWorkerPaced sv = do -- -- When this function returns we are sure that there is some output available. -- -{-# NOINLINE sendWorkerWait #-} -sendWorkerWait +{-# NOINLINE dispatchAllWait #-} +dispatchAllWait :: MonadIO m => Bool -- ^ 'eager' option is on -> (Channel m a -> IO ()) -- ^ delay function -> (Channel m a -> m Bool) -- ^ dispatcher function -> Channel m a -> m () -sendWorkerWait eagerEval delay dispatch sv = go +dispatchAllWait eagerEval delay dispatch sv = go where @@ -400,7 +400,7 @@ sendWorkerWait eagerEval delay dispatch sv = go $ withDiagMVar (svarInspectMode sv) (dumpChannel sv) - "sendWorkerWait: nothing to do" + "dispatchAllWait: nothing to do" $ takeMVar (outputDoorBell sv) (_, len) <- liftIO $ readIORef (outputQueue sv) if len <= 0 @@ -417,11 +417,11 @@ startChannel :: MonadRunInIO m => Channel m a -> m () startChannel chan = do case yieldRateInfo chan of - Nothing -> pushWorker 0 chan + Nothing -> forkWorker 0 chan Just yinfo -> if svarLatencyTarget yinfo == maxBound then liftIO $ threadDelay maxBound - else pushWorker 1 chan + else forkWorker 1 chan -- | Noop as of now. sendWorkerDelayPaced :: Channel m a -> IO () From fe20741174c949dfb08a12a2343df6419df10e49 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 15 Feb 2024 08:16:29 +0530 Subject: [PATCH 09/24] Add rate benchmarks for new concurrent streams --- .../Streamly/Benchmark/Data/Stream/Rate.hs | 129 +++++++++++++ benchmark/streamly-benchmarks.cabal | 175 ++++++++++-------- 2 files changed, 225 insertions(+), 79 deletions(-) create mode 100644 benchmark/Streamly/Benchmark/Data/Stream/Rate.hs diff --git a/benchmark/Streamly/Benchmark/Data/Stream/Rate.hs b/benchmark/Streamly/Benchmark/Data/Stream/Rate.hs new file mode 100644 index 0000000000..78b4bf0512 --- /dev/null +++ b/benchmark/Streamly/Benchmark/Data/Stream/Rate.hs @@ -0,0 +1,129 @@ +{-# LANGUAGE FlexibleContexts #-} + +-- | +-- Module : Main +-- Copyright : (c) 2018 Composewell Technologies +-- +-- License : BSD3 +-- Maintainer : streamly@composewell.com + +import Stream.Common (benchIOSrc, sourceUnfoldrM) +import Streamly.Data.Stream (Stream) +import Streamly.Internal.Data.Stream.Prelude (MonadAsync, Config) + +import qualified Streamly.Data.Stream.Prelude as Stream + +import Streamly.Benchmark.Common +import Test.Tasty.Bench + +moduleName :: String +moduleName = "Data.Stream.Rate" + +------------------------------------------------------------------------------- +-- Average Rate +------------------------------------------------------------------------------- + +{-# INLINE rateNothing #-} +rateNothing :: MonadAsync m => + ((Config -> Config) -> Stream m Int -> Stream m Int) + -> (Config -> Config) -> Int -> Int -> Stream m Int +rateNothing f cfg value = f (Stream.rate Nothing . cfg) . sourceUnfoldrM value + +{-# INLINE avgRate #-} +avgRate :: MonadAsync m => + ((Config -> Config) -> Stream m Int -> Stream m Int) + -> (Config -> Config) -> Int -> Double -> Int -> Stream m Int +avgRate f cfg value rt = f (Stream.avgRate rt . cfg) . sourceUnfoldrM value + +{- +-- parEval should be maxThreads 1 anyway +{-# INLINE avgRateThreads1 #-} +avgRateThreads1 :: MonadAsync m => + ((Config -> Config) -> Stream m Int -> Stream m Int) + -> (Config -> Config) -> Int -> Double -> Int -> Stream m Int +avgRateThreads1 f cfg value rt = + f (Stream.maxThreads 1 . Stream.avgRate rt . cfg) . sourceUnfoldrM value +-} + +{-# INLINE minRate #-} +minRate :: MonadAsync m => + ((Config -> Config) -> Stream m Int -> Stream m Int) + -> (Config -> Config) -> Int -> Double -> Int -> Stream m Int +minRate f cfg value rt = f (Stream.minRate rt . cfg) . sourceUnfoldrM value + +{-# INLINE maxRate #-} +maxRate :: MonadAsync m => + ((Config -> Config) -> Stream m Int -> Stream m Int) + -> (Config -> Config) -> Int -> Double -> Int -> Stream m Int +maxRate f cfg value rt = f (Stream.minRate rt . cfg) . sourceUnfoldrM value + +{-# INLINE constRate #-} +constRate :: MonadAsync m => + ((Config -> Config) -> Stream m Int -> Stream m Int) + -> (Config -> Config) -> Int -> Double -> Int -> Stream m Int +constRate f cfg value rt = f (Stream.constRate rt . cfg) . sourceUnfoldrM value + +-- XXX arbitrarily large rate should be the same as rate Nothing +-- XXX Add tests for multiworker cases as well - parMapM +o_1_space_async :: Int -> [Benchmark] +o_1_space_async value = + [ bgroup + "default/parEval" + [ bgroup + "avgRate" + -- benchIO "unfoldr" $ toNull + -- benchIOSrc "unfoldrM" (sourceUnfoldrM value) + [ benchIOSrc "Nothing" $ rateNothing Stream.parEval id value + , benchIOSrc "1M" $ avgRate Stream.parEval id value 1000000 + , benchIOSrc "3M" $ avgRate Stream.parEval id value 3000000 + -- , benchIOSrc "10M/maxThreads1" $ avgRateThreads1 Stream.parEval value 10000000 + , benchIOSrc "10M" $ avgRate Stream.parEval id value 10000000 + , benchIOSrc "20M" $ avgRate Stream.parEval id value 20000000 + ] + , bgroup + "minRate" + [ benchIOSrc "1M" $ minRate Stream.parEval id value 1000000 + , benchIOSrc "10M" $ minRate Stream.parEval id value 10000000 + , benchIOSrc "20M" $ minRate Stream.parEval id value 20000000 + ] + , bgroup + "maxRate" + [ -- benchIOSrc "10K" $ maxRate value 10000 + benchIOSrc "10M" $ maxRate Stream.parEval id value 10000000 + ] + , bgroup + "constRate" + [ -- benchIOSrc "10K" $ constRate value 10000 + benchIOSrc "1M" $ constRate Stream.parEval id value 1000000 + , benchIOSrc "10M" $ constRate Stream.parEval id value 10000000 + ] + ] + ] + +o_1_space_ahead :: Int -> [Benchmark] +o_1_space_ahead value = + [ bgroup + "ordered/parEval" + [ benchIOSrc "avgRate/1M" + $ avgRate Stream.parEval (Stream.ordered True) value 1000000 + , benchIOSrc "minRate/1M" + $ minRate Stream.parEval (Stream.ordered True) value 1000000 + , benchIOSrc "maxRate/1M" + $ maxRate Stream.parEval (Stream.ordered True) value 1000000 + , benchIOSrc "constRate/1M" + $ constRate Stream.parEval (Stream.ordered True) value 1000000 + ] + ] + +------------------------------------------------------------------------------- +-- Main +------------------------------------------------------------------------------- + +main :: IO () +main = runWithCLIOpts defaultStreamSize allBenchmarks + + where + + allBenchmarks value = + [ bgroup (o_1_space_prefix moduleName) + $ concat [o_1_space_async value, o_1_space_ahead value]] diff --git a/benchmark/streamly-benchmarks.cabal b/benchmark/streamly-benchmarks.cabal index 748db0d8ee..d60bf4e0d8 100644 --- a/benchmark/streamly-benchmarks.cabal +++ b/benchmark/streamly-benchmarks.cabal @@ -296,6 +296,59 @@ benchmark Data.Stream.StreamDK else ghc-options: +RTS -M2500M -RTS +benchmark Data.StreamD + import: bench-options + type: exitcode-stdio-1.0 + hs-source-dirs: Streamly/Benchmark/Data/Stream + main-is: StreamD.hs + if impl(ghcjs) + buildable: False + +benchmark Data.StreamK + import: bench-options + type: exitcode-stdio-1.0 + hs-source-dirs: Streamly/Benchmark/Data/Stream + main-is: StreamK.hs + if impl(ghcjs) + buildable: False + +benchmark Data.Stream.ToStreamK + import: bench-options + type: exitcode-stdio-1.0 + hs-source-dirs: Streamly/Benchmark/Data/Stream + main-is: ToStreamK.hs + if !flag(dev) || impl(ghcjs) + buildable: False + +benchmark Data.StreamK.Alt + import: bench-options + type: exitcode-stdio-1.0 + hs-source-dirs: Streamly/Benchmark/Data/Stream + main-is: StreamKAlt.hs + if !flag(dev) || impl(ghcjs) + buildable: False + +benchmark Data.Unfold + import: bench-options + type: exitcode-stdio-1.0 + hs-source-dirs: . + main-is: Streamly/Benchmark/Data/Unfold.hs + if flag(use-streamly-core) || impl(ghcjs) + buildable: False + else + buildable: True + +executable nano-bench + import: bench-options + hs-source-dirs: . + main-is: NanoBenchmarks.hs + if !flag(dev) + buildable: False + +------------------------------------------------------------------------------- +-- Concurrent Streams +------------------------------------------------------------------------------- + benchmark Data.Stream.Concurrent import: bench-options-threaded type: exitcode-stdio-1.0 @@ -348,45 +401,23 @@ benchmark Data.Stream.ConcurrentOrdered else buildable: True -benchmark Data.Unbox - import: bench-options - type: exitcode-stdio-1.0 - hs-source-dirs: . - cpp-options: -DUSE_UNBOX - main-is: Streamly/Benchmark/Data/Serialize.hs - -benchmark Data.Unbox.Derive.TH - import: bench-options - type: exitcode-stdio-1.0 - hs-source-dirs: . - cpp-options: -DUSE_UNBOX - cpp-options: -DUSE_TH - main-is: Streamly/Benchmark/Data/Serialize.hs - -benchmark Data.Serialize - import: bench-options +benchmark Data.Stream.Rate + import: bench-options-threaded type: exitcode-stdio-1.0 - hs-source-dirs: . - cpp-options: -DUSE_TH - main-is: Streamly/Benchmark/Data/Serialize.hs - build-depends: QuickCheck, template-haskell + hs-source-dirs: Streamly/Benchmark/Data/Stream/, Streamly/Benchmark/Data/ + main-is: Rate.hs other-modules: - Streamly.Benchmark.Data.Serialize.TH - Streamly.Benchmark.Data.Serialize.RecCompatible - Streamly.Benchmark.Data.Serialize.RecNonCompatible - if flag(limit-build-mem) - ghc-options: +RTS -M1000M -RTS - -benchmark Data.Unfold - import: bench-options - type: exitcode-stdio-1.0 - hs-source-dirs: . - main-is: Streamly/Benchmark/Data/Unfold.hs + Stream.ConcurrentCommon + Stream.Common if flag(use-streamly-core) || impl(ghcjs) buildable: False else buildable: True +------------------------------------------------------------------------------- +-- Folds and Parsers +------------------------------------------------------------------------------- + benchmark Data.Fold import: bench-options type: exitcode-stdio-1.0 @@ -459,49 +490,6 @@ benchmark Data.ParserK.Chunked.Generic buildable: True build-depends: exceptions >= 0.8 && < 0.11 -------------------------------------------------------------------------------- --- Raw Streams -------------------------------------------------------------------------------- - -benchmark Data.StreamD - import: bench-options - type: exitcode-stdio-1.0 - hs-source-dirs: Streamly/Benchmark/Data/Stream - main-is: StreamD.hs - if impl(ghcjs) - buildable: False - -benchmark Data.StreamK - import: bench-options - type: exitcode-stdio-1.0 - hs-source-dirs: Streamly/Benchmark/Data/Stream - main-is: StreamK.hs - if impl(ghcjs) - buildable: False - -benchmark Data.Stream.ToStreamK - import: bench-options - type: exitcode-stdio-1.0 - hs-source-dirs: Streamly/Benchmark/Data/Stream - main-is: ToStreamK.hs - if !flag(dev) || impl(ghcjs) - buildable: False - -benchmark Data.StreamK.Alt - import: bench-options - type: exitcode-stdio-1.0 - hs-source-dirs: Streamly/Benchmark/Data/Stream - main-is: StreamKAlt.hs - if !flag(dev) || impl(ghcjs) - buildable: False - -executable nano-bench - import: bench-options - hs-source-dirs: . - main-is: NanoBenchmarks.hs - if !flag(dev) - buildable: False - ------------------------------------------------------------------------------- -- Streamly.Prelude ------------------------------------------------------------------------------- @@ -608,10 +596,6 @@ benchmark Prelude.Parallel if flag(limit-build-mem) ghc-options: +RTS -M2000M -RTS -------------------------------------------------------------------------------- --- Concurrent Streams -------------------------------------------------------------------------------- - benchmark Prelude.Concurrent import: bench-options-threaded type: exitcode-stdio-1.0 @@ -639,6 +623,39 @@ benchmark Prelude.Rate if !flag(use-prelude) buildable: False +------------------------------------------------------------------------------- +-- Serialization Benchmarks +------------------------------------------------------------------------------- + +benchmark Data.Unbox + import: bench-options + type: exitcode-stdio-1.0 + hs-source-dirs: . + cpp-options: -DUSE_UNBOX + main-is: Streamly/Benchmark/Data/Serialize.hs + +benchmark Data.Unbox.Derive.TH + import: bench-options + type: exitcode-stdio-1.0 + hs-source-dirs: . + cpp-options: -DUSE_UNBOX + cpp-options: -DUSE_TH + main-is: Streamly/Benchmark/Data/Serialize.hs + +benchmark Data.Serialize + import: bench-options + type: exitcode-stdio-1.0 + hs-source-dirs: . + cpp-options: -DUSE_TH + main-is: Streamly/Benchmark/Data/Serialize.hs + build-depends: QuickCheck, template-haskell + other-modules: + Streamly.Benchmark.Data.Serialize.TH + Streamly.Benchmark.Data.Serialize.RecCompatible + Streamly.Benchmark.Data.Serialize.RecNonCompatible + if flag(limit-build-mem) + ghc-options: +RTS -M1000M -RTS + ------------------------------------------------------------------------------- -- Array Benchmarks ------------------------------------------------------------------------------- From 333cc944034d59e78dacaea2e54adbfa978da928 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 15 Feb 2024 12:48:26 +0530 Subject: [PATCH 10/24] Add comparison benchmarks for rate/without rate --- benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs | 1 + benchmark/Streamly/Benchmark/Data/Stream/Rate.hs | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs b/benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs index 723ffedb17..100d215bea 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs @@ -200,6 +200,7 @@ allBenchmarks moduleName wide modifier value = , o_1_space_concatFoldable value modifier , o_1_space_concatMap "" value modifier , o_1_space_concatMap "-maxThreads-1" value (modifier . Async.maxThreads 1) + , o_1_space_concatMap "-rate-Nothing" value (modifier . Async.rate Nothing) , o_1_space_joining value modifier ] ++ if wide then [] else o_1_space_outerProduct value modifier , bgroup (o_n_heap_prefix moduleName) $ concat diff --git a/benchmark/Streamly/Benchmark/Data/Stream/Rate.hs b/benchmark/Streamly/Benchmark/Data/Stream/Rate.hs index 78b4bf0512..1bf139455e 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/Rate.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/Rate.hs @@ -72,8 +72,8 @@ o_1_space_async value = [ bgroup "avgRate" -- benchIO "unfoldr" $ toNull - -- benchIOSrc "unfoldrM" (sourceUnfoldrM value) - [ benchIOSrc "Nothing" $ rateNothing Stream.parEval id value + [ benchIOSrc "baseline" $ sourceUnfoldrM value + , benchIOSrc "Nothing" $ rateNothing Stream.parEval id value , benchIOSrc "1M" $ avgRate Stream.parEval id value 1000000 , benchIOSrc "3M" $ avgRate Stream.parEval id value 3000000 -- , benchIOSrc "10M/maxThreads1" $ avgRateThreads1 Stream.parEval value 10000000 From fd9ca82e5f2b3a91b71ff9296329afe8fe65b475 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 15 Feb 2024 08:16:51 +0530 Subject: [PATCH 11/24] Fix old prelude rate benchmarks --- benchmark/lib/Streamly/Benchmark/Prelude.hs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/benchmark/lib/Streamly/Benchmark/Prelude.hs b/benchmark/lib/Streamly/Benchmark/Prelude.hs index 26164dceab..e9de3b413c 100644 --- a/benchmark/lib/Streamly/Benchmark/Prelude.hs +++ b/benchmark/lib/Streamly/Benchmark/Prelude.hs @@ -62,7 +62,7 @@ module Streamly.Benchmark.Prelude , transformComposeMapM , transformMapM , transformTeeMapM - , transformZipMapM + -- , transformZipMapM ) where @@ -78,7 +78,7 @@ import qualified Data.Foldable as F import qualified Data.List as List import qualified Streamly.Prelude as S import qualified Streamly.Internal.Data.Stream.IsStream as Internal -import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream +import qualified Streamly.Internal.Data.Stream.IsStream as IsStream import qualified Streamly.Internal.Data.Pipe as Pipe import qualified Streamly.Internal.Data.Stream.Serial as Serial @@ -318,9 +318,10 @@ transformTeeMapM t n = composeN n $ t . Internal.transform - (Pipe.mapM (\x -> return (x + 1)) `Pipe.tee` + (Pipe.mapM (\x -> return (x + 1)) `Pipe.teeMerge` Pipe.mapM (\x -> return (x + 2))) +{- {-# INLINE transformZipMapM #-} transformZipMapM :: (S.IsStream t, S.MonadAsync m) @@ -336,6 +337,7 @@ transformZipMapM t n = (+) (Pipe.mapM (\x -> return (x + 1))) (Pipe.mapM (\x -> return (x + 2)))) +-} ------------------------------------------------------------------------------- -- Streams of streams From 8a62be8eecbdec3640076653302b3db898aa1c83 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 15 Feb 2024 11:11:19 +0530 Subject: [PATCH 12/24] Fix build with "dev" flag --- core/src/Streamly/Internal/Data/Array/Type.hs | 7 ++++++- src/Streamly/Internal/Data/SmallArray/Type.hs | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/Streamly/Internal/Data/Array/Type.hs b/core/src/Streamly/Internal/Data/Array/Type.hs index 1d26d4143a..fbc4cf14c1 100644 --- a/core/src/Streamly/Internal/Data/Array/Type.hs +++ b/core/src/Streamly/Internal/Data/Array/Type.hs @@ -329,7 +329,12 @@ isPinned = MA.isPinned . unsafeThaw -- would make a copy on every splice operation, instead use the -- 'fromChunksK' operation to combine n immutable arrays. {-# INLINE splice #-} -splice :: MonadIO m => Array a -> Array a -> m (Array a) +splice :: (MonadIO m +#ifdef DEVBUILD + , Unbox a +#endif + ) + => Array a -> Array a -> m (Array a) splice arr1 arr2 = unsafeFreeze <$> MA.spliceCopy (unsafeThaw arr1) (unsafeThaw arr2) diff --git a/src/Streamly/Internal/Data/SmallArray/Type.hs b/src/Streamly/Internal/Data/SmallArray/Type.hs index 0dc3d40d1b..32b21f6af8 100644 --- a/src/Streamly/Internal/Data/SmallArray/Type.hs +++ b/src/Streamly/Internal/Data/SmallArray/Type.hs @@ -1,4 +1,6 @@ {-# LANGUAGE UnboxedTuples #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE NoMonoLocalBinds #-} -- | -- Module : Data.Primitive.SmallArray From 04f59097f31f582782b6c7fdcd5b2421b268c07e Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 15 Feb 2024 11:46:43 +0530 Subject: [PATCH 13/24] Add rate tests for new concurrent streams --- streamly.cabal | 2 +- test/Streamly/Test/Data/Stream/Rate.hs | 252 +++++++++++++++++++++++++ test/streamly-tests.cabal | 28 ++- 3 files changed, 273 insertions(+), 9 deletions(-) create mode 100644 test/Streamly/Test/Data/Stream/Rate.hs diff --git a/streamly.cabal b/streamly.cabal index 39a3f40593..5ccb87256e 100644 --- a/streamly.cabal +++ b/streamly.cabal @@ -118,7 +118,7 @@ extra-source-files: test/Streamly/Test/Data/Array/Stream.hs test/Streamly/Test/Data/Parser.hs test/Streamly/Test/Data/ParserK.hs - test/Streamly/Test/Data/Stream/Concurrent.hs + test/Streamly/Test/Data/Stream/*.hs test/Streamly/Test/FileSystem/Event.hs test/Streamly/Test/FileSystem/Event/Common.hs test/Streamly/Test/FileSystem/Event/Darwin.hs diff --git a/test/Streamly/Test/Data/Stream/Rate.hs b/test/Streamly/Test/Data/Stream/Rate.hs new file mode 100644 index 0000000000..8e0a2e7beb --- /dev/null +++ b/test/Streamly/Test/Data/Stream/Rate.hs @@ -0,0 +1,252 @@ +-- | +-- Module : Streamly.Test.Data.Stream.Rate +-- Copyright : (c) 2018 Composewell Technologies +-- License : BSD-3-Clause +-- Maintainer : streamly@composewell.com +-- Stability : experimental +-- Portability : GHC + +module Streamly.Test.Data.Stream.Rate (main) where + +import Streamly.Data.Stream.Prelude (Stream, Config) +import Streamly.Internal.Data.Time.Clock (getTime, Clock(..)) +import Streamly.Internal.Data.Time.Units + (NanoSecond64, diffAbsTime64, fromRelTime64) + +import qualified Streamly.Data.Fold as Fold +import qualified Streamly.Data.Stream.Prelude as Stream + +import Control.Concurrent +import Control.Monad +import System.Random +import Test.Hspec + +durationShouldBe :: (Double, Double) -> IO () -> Expectation +durationShouldBe d@(tMin, tMax) action = do + t0 <- getTime Monotonic + action + t1 <- getTime Monotonic + let diff = fromRelTime64 (diffAbsTime64 t1 t0) :: NanoSecond64 + let t = fromIntegral diff / 1e9 + putStrLn $ "Expected: " <> show d <> " Took: " <> show t + (t <= tMax && t >= tMin) `shouldBe` True + +toMicroSecs :: Num a => a -> a +toMicroSecs x = x * 10^(6 :: Int) + +measureRate' :: + String + -> (Config -> Config) + -> Int -- buffers + -> Int -- threads + -> Either Double Int -- either rate or count of actions + -> Int + -> (Double, Double) + -> (Double, Double) + -> Spec +measureRate' + desc modifier buffers threads rval consumerDelay producerDelay expectedRange = do + + let + cfg = + Stream.maxBuffer buffers + . Stream.maxThreads threads + . case rval of + Left r -> Stream.avgRate r + Right _ -> Stream.rate Nothing + . modifier + + threadAction = + case rval of + Left r -> + Stream.take (round $ 10 * r) + . Stream.parRepeatM cfg + Right n -> + Stream.parReplicateM cfg n + + rateDesc = case rval of + Left r -> " rate: " <> show r + Right n -> " count: " <> show n + + it (desc <> rateDesc + <> " buffers: " <> show buffers + <> " threads: " <> show threads + <> ", consumer latency: " <> show consumerDelay + <> ", producer latency: " <> show producerDelay) + $ durationShouldBe expectedRange $ + Stream.fold Fold.drain + $ (if consumerDelay > 0 + then Stream.mapM $ \x -> + threadDelay (toMicroSecs consumerDelay) >> return x + else id) + $ threadAction $ do + let (t1, t2) = producerDelay + r <- if t1 == t2 + then return $ round $ toMicroSecs t1 + else randomRIO ( round $ toMicroSecs t1 + , round $ toMicroSecs t2) + when (r > 0) $ -- do + -- t1 <- getTime Monotonic + threadDelay r + -- t2 <- getTime Monotonic + -- let delta = fromIntegral (toNanoSecs (t2 - t1)) / 1000000000 + -- putStrLn $ "delay took: " <> show delta + -- when (delta > 2) $ do + -- putStrLn $ "delay took high: " <> show delta + return (1 :: Int) + +measureRateVariable :: + String + -> (Config -> Config) + -> Double + -> Int + -> (Double, Double) + -> (Double, Double) + -> Spec +measureRateVariable desc modifier rval = + measureRate' desc modifier (-1) (-1) (Left rval) + +measureRate :: + String + -> (Config -> Config) + -> Double + -> Int + -> Int + -> (Double, Double) + -> Spec +measureRate desc modifier rval consumerDelay producerDelay dur = + let d = fromIntegral producerDelay + in measureRateVariable desc modifier rval consumerDelay (d, d) dur + +measureThreads :: + String + -> (Config -> Config) + -> Int -- threads + -> Int -- count of actions + -> Spec +measureThreads desc modifier threads count = do + let expectedTime = + if threads < 0 + then 1.0 + else fromIntegral count / fromIntegral threads + duration = (expectedTime * 0.9, expectedTime * 1.1) + measureRate' desc modifier (-1) threads (Right count) 0 (1,1) duration + +measureBuffers :: + String + -> (Config -> Config) + -> Int -- buffers + -> Int -- count of actions + -> Spec +measureBuffers desc modifier buffers count = do + let expectedTime = + if buffers < 0 + then 1.0 + else fromIntegral count / fromIntegral buffers + duration = (expectedTime * 0.9, expectedTime * 1.1) + measureRate' desc modifier buffers (-1) (Right count) 0 (1,1) duration + +moduleName :: String +moduleName = "Data.Stream.Rate" + +main :: IO () +main = hspec $ do + describe moduleName $ do + + describe "maxBuffers" $ do + measureBuffers "async" id (-1) 5 + -- XXX this test fails due to a known issue + -- measureBuffers "maxBuffers" id 1 5 + measureBuffers "async" id 5 5 + + describe "maxThreads" $ do + measureThreads "async" id (-1) 5 + measureThreads "async" id 1 5 + measureThreads "async" id 5 5 + + measureThreads "ordered" (Stream.ordered True) (-1) 5 + measureThreads "ordered" (Stream.ordered True) 1 5 + measureThreads "ordered" (Stream.ordered True) 5 5 + + let range = (8,12) + + -- Note that because after the last yield we don't wait, the last period + -- will be effectively shorter. This becomes significant when the rates are + -- lower (1 or lower). For rate 1 we lose 1 second in the end and for rate + -- 10 0.1 second. + let rates = [1, 10, 100, 1000, 10000 +#ifndef __GHCJS__ + , 100000, 1000000 +#endif + ] + in describe "asyncly no consumer delay no producer delay" $ + forM_ rates (\r -> measureRate "async" id r 0 0 range) + + -- XXX try staggering the dispatches to achieve higher rates + -- Producer delay causes a lot of threads to be created, consuming large + -- amounts of memory at higher rates. + let rates = [1, 10, 100 +#if !defined(__GHCJS__) && defined USE_LARGE_MEMORY + 1000, 10000, 25000 +#endif + ] + in describe "asyncly no consumer delay and 1 sec producer delay" $ + forM_ rates (\r -> measureRate "async" id r 0 1 range) + + -- At lower rates (1/10) this is likely to vary quite a bit depending on + -- the spread of random producer latencies generated. + let rates = [1, 10, 100 +#if !defined(__GHCJS__) && defined USE_LARGE_MEMORY + , 1000, 10000, 25000 +#endif + ] + in describe "asyncly, no consumer delay and variable producer delay" $ + forM_ rates $ \r -> + measureRateVariable "async" id r 0 (0.1, 3) range + + let rates = [1, 10, 100, 1000, 10000 +#ifndef __GHCJS__ + , 100000, 1000000 +#endif + ] + in describe "interleaved, no consumer delay no producer delay" $ + forM_ rates (\r -> measureRate "interleaved" (Stream.interleaved True) r 0 0 range) + + let rates = [1, 10, 100, 1000 +#if !defined(__GHCJS__) && defined USE_LARGE_MEMORY + , 10000, 25000 +#endif + ] + in describe "interleaved, no consumer delay and 1 sec producer delay" $ + forM_ rates (\r -> measureRate "interleaved" (Stream.interleaved True) r 0 1 range) + + let rates = [1, 10, 100, 1000, 10000 +#ifndef __GHCJS__ + , 100000, 1000000 +#endif + ] + in describe "ordered, no consumer delay no producer delay" $ + forM_ rates (\r -> measureRate "ordered" (Stream.ordered True) r 0 0 range) + + -- XXX after the change to stop workers when the heap is clearing + -- thi does not work well at a 25000 ops per second, need to fix. + let rates = [1, 10, 100, 1000 +#if !defined(__GHCJS__) && defined USE_LARGE_MEMORY + , 10000, 12500 +#endif + ] + in describe "ordered, no consumer delay and 1 sec producer delay" $ + forM_ rates (\r -> measureRate "ordered" (Stream.ordered True) r 0 1 range) + + describe "async, some consumer delay and 1 sec producer delay" $ do + -- ideally it should take 10 x 1 + 1 seconds + forM_ [1] (\r -> measureRate "async" id r 1 1 (11, 16)) + -- ideally it should take 10 x 2 + 1 seconds + forM_ [1] (\r -> measureRate "async" id r 2 1 (21, 23)) + -- ideally it should take 10 x 3 + 1 seconds + forM_ [1] (\r -> measureRate "async" id r 3 1 (31, 33)) + + describe "ordered, some consumer delay and 1 sec producer delay" $ do + forM_ [1] (\r -> measureRate "ordered" (Stream.ordered True) r 1 1 (11, 16)) + forM_ [1] (\r -> measureRate "ordered" (Stream.ordered True) r 2 1 (21, 23)) + forM_ [1] (\r -> measureRate "ordered" (Stream.ordered True) r 3 1 (31, 33)) diff --git a/test/streamly-tests.cabal b/test/streamly-tests.cabal index 98c7e27635..172b73fed5 100644 --- a/test/streamly-tests.cabal +++ b/test/streamly-tests.cabal @@ -250,6 +250,26 @@ test-suite Data.Stream main-is: Streamly/Test/Data/Stream.hs ghc-options: -main-is Streamly.Test.Data.Stream.main +test-suite Data.Stream.Concurrent + import: test-options + type: exitcode-stdio-1.0 + main-is: Streamly/Test/Data/Stream/Concurrent.hs + ghc-options: -main-is Streamly.Test.Data.Stream.Concurrent.main + if flag(use-streamly-core) + buildable: False + +test-suite Data.Stream.Rate + import:always-optimized + type: exitcode-stdio-1.0 + main-is: Streamly/Test/Data/Stream/Rate.hs + ghc-options: -main-is Streamly.Test.Data.Stream.Rate.main + if flag(dev) + buildable: True + else + buildable: False + if flag(use-streamly-core) + buildable: False + test-suite Data.MutArray import: test-options type: exitcode-stdio-1.0 @@ -461,14 +481,6 @@ test-suite Prelude.Async if flag(use-streamly-core) buildable: False -test-suite Data.Stream.Concurrent - import: test-options - type: exitcode-stdio-1.0 - main-is: Streamly/Test/Data/Stream/Concurrent.hs - ghc-options: -main-is Streamly.Test.Data.Stream.Concurrent.main - if flag(use-streamly-core) - buildable: False - test-suite Prelude.Concurrent import: test-options type: exitcode-stdio-1.0 From ef9474afad4bc3968f580561a3ff907774c9edf8 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sat, 17 Feb 2024 02:09:04 +0530 Subject: [PATCH 14/24] Add low rate examples, maxYields support in rate test --- test/Streamly/Test/Data/Stream/Rate.hs | 107 ++++++++++++++++--------- 1 file changed, 71 insertions(+), 36 deletions(-) diff --git a/test/Streamly/Test/Data/Stream/Rate.hs b/test/Streamly/Test/Data/Stream/Rate.hs index 8e0a2e7beb..aef68b7785 100644 --- a/test/Streamly/Test/Data/Stream/Rate.hs +++ b/test/Streamly/Test/Data/Stream/Rate.hs @@ -8,13 +8,16 @@ module Streamly.Test.Data.Stream.Rate (main) where -import Streamly.Data.Stream.Prelude (Stream, Config) +import Data.Int (Int64) +import Streamly.Data.Stream.Prelude (Config) import Streamly.Internal.Data.Time.Clock (getTime, Clock(..)) import Streamly.Internal.Data.Time.Units (NanoSecond64, diffAbsTime64, fromRelTime64) +import System.Mem (performMajorGC) import qualified Streamly.Data.Fold as Fold -import qualified Streamly.Data.Stream.Prelude as Stream +import qualified Streamly.Data.Stream as Stream +import qualified Streamly.Internal.Data.Stream.Prelude as Stream import Control.Concurrent import Control.Monad @@ -46,40 +49,57 @@ measureRate' :: -> Spec measureRate' desc modifier buffers threads rval consumerDelay producerDelay expectedRange = do - - let - cfg = - Stream.maxBuffer buffers - . Stream.maxThreads threads - . case rval of - Left r -> Stream.avgRate r - Right _ -> Stream.rate Nothing - . modifier - - threadAction = - case rval of - Left r -> - Stream.take (round $ 10 * r) - . Stream.parRepeatM cfg - Right n -> - Stream.parReplicateM cfg n - - rateDesc = case rval of - Left r -> " rate: " <> show r - Right n -> " count: " <> show n - it (desc <> rateDesc <> " buffers: " <> show buffers <> " threads: " <> show threads <> ", consumer latency: " <> show consumerDelay <> ", producer latency: " <> show producerDelay) - $ durationShouldBe expectedRange $ - Stream.fold Fold.drain + runTest + + where + + -- Keep a minimum of 2 for the very low rate cases, otherwise the + -- timing would be 0 because we will finish as soon as the first result + -- arrives. + yieldCount :: Int + yieldCount = case rval of + Left r -> max 2 (round (10 * r)) + Right n -> max 2 n + + rateDesc = (case rval of + Left r -> ", rate: " <> show r <> ", count: " <> show yieldCount + Right n -> ", count: " <> show n) <> "," + + cfg (_n :: Maybe Int64) = + modifier + . Stream.maxBuffer buffers + -- . Stream.inspect True + . Stream.maxThreads threads + . case rval of + Left r -> Stream.avgRate r + Right _ -> Stream.rate Nothing + -- XXX it comes out less than expected for ordered streams at high + -- rates, need to fix. + -- . Stream.maxYields (Just (fromIntegral yieldCount)) + + threadAction f = + case rval of + Left _ -> + Stream.take yieldCount + $ Stream.parMapM (cfg (Just (fromIntegral yieldCount))) f + $ Stream.enumerateFrom (1 :: Int) + Right _ -> + Stream.parReplicateM + (cfg (Just (fromIntegral yieldCount))) yieldCount (f 1) + + runTest = do + durationShouldBe expectedRange $ do + res <- Stream.fold Fold.length $ (if consumerDelay > 0 then Stream.mapM $ \x -> threadDelay (toMicroSecs consumerDelay) >> return x else id) - $ threadAction $ do + $ threadAction $ \_idx -> do let (t1, t2) = producerDelay r <- if t1 == t2 then return $ round $ toMicroSecs t1 @@ -93,7 +113,15 @@ measureRate' -- putStrLn $ "delay took: " <> show delta -- when (delta > 2) $ do -- putStrLn $ "delay took high: " <> show delta + -- putStrLn $ "Done: " ++ show idx return (1 :: Int) + when (res /= yieldCount) $ + error $ "expected yield count: " ++ show yieldCount + ++ " actual: " ++ show res + + -- To ensure that when we use "inspect" option on the channel, GC + -- occurs and cleans up the channel to print the debug info. + performMajorGC measureRateVariable :: String @@ -168,29 +196,36 @@ main = hspec $ do measureThreads "ordered" (Stream.ordered True) 1 5 measureThreads "ordered" (Stream.ordered True) 5 5 + measureThreads "interleaved" (Stream.interleaved True) (-1) 5 + measureThreads "interleaved" (Stream.interleaved True) 1 5 + measureThreads "interleaved" (Stream.interleaved True) 5 5 + + describe "max rate possible (count / time)" $ do + measureRate "async" (Stream.rate Nothing) 1000000 0 0 (0, 1e9) + let range = (8,12) -- Note that because after the last yield we don't wait, the last period -- will be effectively shorter. This becomes significant when the rates are -- lower (1 or lower). For rate 1 we lose 1 second in the end and for rate -- 10 0.1 second. - let rates = [1, 10, 100, 1000, 10000 + let rates = [0.1, 1, 10, 100, 1000, 10000 #ifndef __GHCJS__ , 100000, 1000000 #endif ] - in describe "asyncly no consumer delay no producer delay" $ + in describe "async no consumer delay no producer delay" $ forM_ rates (\r -> measureRate "async" id r 0 0 range) -- XXX try staggering the dispatches to achieve higher rates -- Producer delay causes a lot of threads to be created, consuming large -- amounts of memory at higher rates. - let rates = [1, 10, 100 + let rates = [0.1, 1, 10, 100 #if !defined(__GHCJS__) && defined USE_LARGE_MEMORY 1000, 10000, 25000 #endif ] - in describe "asyncly no consumer delay and 1 sec producer delay" $ + in describe "async no consumer delay and 1 sec producer delay" $ forM_ rates (\r -> measureRate "async" id r 0 1 range) -- At lower rates (1/10) this is likely to vary quite a bit depending on @@ -200,11 +235,11 @@ main = hspec $ do , 1000, 10000, 25000 #endif ] - in describe "asyncly, no consumer delay and variable producer delay" $ + in describe "async, no consumer delay and variable producer delay" $ forM_ rates $ \r -> measureRateVariable "async" id r 0 (0.1, 3) range - let rates = [1, 10, 100, 1000, 10000 + let rates = [0.1, 1, 10, 100, 1000, 10000 #ifndef __GHCJS__ , 100000, 1000000 #endif @@ -212,7 +247,7 @@ main = hspec $ do in describe "interleaved, no consumer delay no producer delay" $ forM_ rates (\r -> measureRate "interleaved" (Stream.interleaved True) r 0 0 range) - let rates = [1, 10, 100, 1000 + let rates = [0.1, 1, 10, 100, 1000 #if !defined(__GHCJS__) && defined USE_LARGE_MEMORY , 10000, 25000 #endif @@ -220,7 +255,7 @@ main = hspec $ do in describe "interleaved, no consumer delay and 1 sec producer delay" $ forM_ rates (\r -> measureRate "interleaved" (Stream.interleaved True) r 0 1 range) - let rates = [1, 10, 100, 1000, 10000 + let rates = [0.1, 1, 10, 100, 1000, 10000 #ifndef __GHCJS__ , 100000, 1000000 #endif @@ -230,7 +265,7 @@ main = hspec $ do -- XXX after the change to stop workers when the heap is clearing -- thi does not work well at a 25000 ops per second, need to fix. - let rates = [1, 10, 100, 1000 + let rates = [0.1, 1, 10, 100, 1000 #if !defined(__GHCJS__) && defined USE_LARGE_MEMORY , 10000, 12500 #endif From 6cbfc49157463171219b1139fce90a484e41aebb Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Fri, 16 Feb 2024 00:41:51 +0530 Subject: [PATCH 15/24] Add stop reason and life-time duration to channel dump --- .../Internal/Data/Channel/Dispatcher.hs | 80 ++++++++++--------- 1 file changed, 44 insertions(+), 36 deletions(-) diff --git a/src/Streamly/Internal/Data/Channel/Dispatcher.hs b/src/Streamly/Internal/Data/Channel/Dispatcher.hs index aa610b4f7f..1a436c72ea 100644 --- a/src/Streamly/Internal/Data/Channel/Dispatcher.hs +++ b/src/Streamly/Internal/Data/Channel/Dispatcher.hs @@ -207,47 +207,55 @@ dumpSVarStats inspecting rateInfo ss = do minLat <- readIORef $ minWorkerLatency ss maxLat <- readIORef $ maxWorkerLatency ss (avgCnt, avgTime) <- readIORef $ avgWorkerLatency ss - (svarCnt, svarGainLossCnt, svarLat) <- case rateInfo of - Nothing -> return (0, 0, 0) + stopTime <- readIORef (svarStopTime ss) + let stopReason = + case stopTime of + Nothing -> "on GC" + Just _ -> "normal" + (svarCnt, svarGainLossCnt, svarLat, interval) <- case rateInfo of + Nothing -> return (0, 0, 0, 0) Just yinfo -> do (cnt, startTime) <- readIORef $ svarAllTimeLatency yinfo + interval <- + case stopTime of + Nothing -> do + now <- getTime Monotonic + return (diffAbsTime64 now startTime) + Just t -> do + return (diffAbsTime64 t startTime) if cnt > 0 then do - t <- readIORef (svarStopTime ss) gl <- readIORef (svarGainedLostYields yinfo) - case t of - Nothing -> do - now <- getTime Monotonic - let interval = diffAbsTime64 now startTime - return (cnt, gl, interval `div` fromIntegral cnt) - Just stopTime -> do - let interval = diffAbsTime64 stopTime startTime - return (cnt, gl, interval `div` fromIntegral cnt) - else return (0, 0, 0) - - return $ unlines - [ "total dispatches = " <> show dispatches - , "max workers = " <> show maxWrk - , "max outQSize = " <> show maxOq - <> (if minLat > 0 - then "\nmin worker latency = " <> showNanoSecond64 minLat - else "") - <> (if maxLat > 0 - then "\nmax worker latency = " <> showNanoSecond64 maxLat - else "") - <> (if avgCnt > 0 - then let lat = avgTime `div` fromIntegral avgCnt - in "\navg worker latency = " <> showNanoSecond64 lat - else "") - <> (if svarLat > 0 - then "\nSVar latency = " <> showRelTime64 svarLat - else "") - <> (if svarCnt > 0 - then "\nSVar yield count = " <> show svarCnt - else "") - <> (if svarGainLossCnt > 0 - then "\nSVar gain/loss yield count = " <> show svarGainLossCnt - else "") + return (cnt, gl, interval `div` fromIntegral cnt, interval) + else return (0, 0, 0, interval) + + return $ concat + [ "stop reason = " <> stopReason + , if interval > 0 + then "\nlife time = " <> showRelTime64 interval + else "" + , "\ntotal dispatches = " <> show dispatches + , "\nmax workers = " <> show maxWrk + , "\nmax outQSize = " <> show maxOq + , if minLat > 0 + then "\nmin worker latency = " <> showNanoSecond64 minLat + else "" + , if maxLat > 0 + then "\nmax worker latency = " <> showNanoSecond64 maxLat + else "" + , if avgCnt > 0 + then let lat = avgTime `div` fromIntegral avgCnt + in "\navg worker latency = " <> showNanoSecond64 lat + else "" + , if svarLat > 0 + then "\nchannel latency = " <> showRelTime64 svarLat + else "" + , if svarCnt > 0 + then "\nchannel yield count = " <> show svarCnt + else "" + , if svarGainLossCnt > 0 + then "\nchannel gain/loss yield count = " <> show svarGainLossCnt + else "" ] ------------------------------------------------------------------------------- From 796352a4581bfb53fa26b80bd888e4793f6d31ba Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Fri, 16 Feb 2024 03:12:06 +0530 Subject: [PATCH 16/24] Remove unnecessry type parameter from AheadHeapEntry --- .../Internal/Data/Stream/Channel/Append.hs | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/src/Streamly/Internal/Data/Stream/Channel/Append.hs b/src/Streamly/Internal/Data/Stream/Channel/Append.hs index d1d7806ca1..692c31d0ae 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Append.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Append.hs @@ -19,6 +19,7 @@ module Streamly.Internal.Data.Stream.Channel.Append ) where +import Debug.Trace import Control.Concurrent (myThreadId) import Control.Concurrent.MVar (newEmptyMVar, newMVar, putMVar, takeMVar) import Control.Exception (assert) @@ -26,7 +27,6 @@ import Control.Monad (when, void) import Control.Monad.IO.Class (MonadIO(liftIO)) import Data.Heap (Heap, Entry(..)) import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef, writeIORef) -import Data.Kind (Type) import GHC.Exts (inline) import Streamly.Internal.Control.Concurrent (MonadRunInIO, RunInIO(..), askRunInIO, restoreM) @@ -250,7 +250,7 @@ enqueueAhead sv q m = do {-# INLINE dequeueAhead #-} dequeueAhead :: MonadIO m - => IORef ([t m a], Int) -> m (Maybe (t m a, Int)) + => IORef ([K.StreamK m a], Int) -> m (Maybe (K.StreamK m a, Int)) dequeueAhead q = liftIO $ atomicModifyIORefCAS q $ \case ([], n) -> (([], n), Nothing) @@ -259,7 +259,7 @@ dequeueAhead q = liftIO $ -- Dequeue only if the seq number matches the expected seq number. {-# INLINE dequeueAheadSeqCheck #-} dequeueAheadSeqCheck :: MonadIO m - => IORef ([t m a], Int) -> Int -> m (Maybe (t m a)) + => IORef ([K.StreamK m a], Int) -> Int -> m (Maybe (K.StreamK m a)) dequeueAheadSeqCheck q seqNo = liftIO $ atomicModifyIORefCAS q $ \case ([], n) -> (([], n), Nothing) @@ -279,20 +279,20 @@ atomicModifyIORef_ :: IORef a -> (a -> a) -> IO () atomicModifyIORef_ ref f = atomicModifyIORef ref $ \x -> (f x, ()) -data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a = +data AheadHeapEntry m a = AheadEntryNull | AheadEntryPure a - | AheadEntryStream (RunInIO m, t m a) + | AheadEntryStream (RunInIO m, K.StreamK m a) -data HeapDequeueResult t m a = +data HeapDequeueResult m a = Clearing | Waiting Int - | Ready (Entry Int (AheadHeapEntry t m a)) + | Ready (Entry Int (AheadHeapEntry m a)) {-# INLINE dequeueFromHeap #-} dequeueFromHeap - :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) - -> IO (HeapDequeueResult t m a) + :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) + -> IO (HeapDequeueResult m a) dequeueFromHeap hpVar = atomicModifyIORef hpVar $ \pair@(hp, snum) -> case snum of @@ -308,9 +308,9 @@ dequeueFromHeap hpVar = {-# INLINE dequeueFromHeapSeq #-} dequeueFromHeapSeq - :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) + :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> Int - -> IO (HeapDequeueResult t m a) + -> IO (HeapDequeueResult m a) dequeueFromHeapSeq hpVar i = atomicModifyIORef hpVar $ \(hp, snum) -> case snum of @@ -332,8 +332,8 @@ heapIsSane snum seqNo = {-# INLINE requeueOnHeapTop #-} requeueOnHeapTop - :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) - -> Entry Int (AheadHeapEntry t m a) + :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) + -> Entry Int (AheadHeapEntry m a) -> Int -> IO () requeueOnHeapTop hpVar ent seqNo = @@ -342,7 +342,7 @@ requeueOnHeapTop hpVar ent seqNo = {-# INLINE updateHeapSeq #-} updateHeapSeq - :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) + :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> Int -> IO () updateHeapSeq hpVar seqNo = @@ -427,7 +427,7 @@ updateHeapSeq hpVar seqNo = {-# INLINE underMaxHeap #-} underMaxHeap :: Channel m a - -> Heap (Entry Int (AheadHeapEntry K.StreamK m a)) + -> Heap (Entry Int (AheadHeapEntry m a)) -> IO Bool underMaxHeap sv hp = do (_, len) <- readIORef (outputQueue sv) @@ -449,7 +449,7 @@ underMaxHeap sv hp = do -- False => continue preStopCheck :: Channel m a - -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)) , Maybe Int) + -> IORef (Heap (Entry Int (AheadHeapEntry m a)) , Maybe Int) -> IO Bool preStopCheck sv heap = -- check the stop condition under a lock before actually @@ -502,10 +502,10 @@ abortExecution sv winfo = do processHeap :: MonadRunInIO m => IORef ([K.StreamK m a], Int) - -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) + -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> Channel m a -> Maybe WorkerInfo - -> AheadHeapEntry K.StreamK m a + -> AheadHeapEntry m a -> Int -> Bool -- we are draining the heap before we stop -> m () @@ -574,6 +574,7 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry -- only in yield continuation where we may have a remaining stream to be -- pushed on the heap. singleStreamFromHeap seqNo a = do + trace "singleStreamFromHeap" (return ()) void $ liftIO $ yieldWith winfo sv a nextHeap seqNo @@ -606,6 +607,7 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry stopWith winfo sv yieldStreamFromHeap seqNo a r = do + trace "yieldStreamFromHeap" (return ()) continue <- liftIO $ yieldWith winfo sv a runStreamWithYieldLimit continue seqNo r @@ -613,7 +615,7 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry drainHeap :: MonadRunInIO m => IORef ([K.StreamK m a], Int) - -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) + -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> Channel m a -> Maybe WorkerInfo -> m () @@ -629,7 +631,7 @@ data HeapStatus = HContinue | HStop processWithoutToken :: MonadRunInIO m => IORef ([K.StreamK m a], Int) - -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) + -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> Channel m a -> Maybe WorkerInfo -> K.StreamK m a @@ -709,7 +711,7 @@ data TokenWorkerStatus = TokenContinue Int | TokenSuspend processWithToken :: MonadRunInIO m => IORef ([K.StreamK m a], Int) - -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) + -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> Channel m a -> Maybe WorkerInfo -> K.StreamK m a @@ -737,6 +739,7 @@ processWithToken q heap sv winfo action sno = do where singleOutput seqNo a = do + trace "singleOutput" (return ()) continue <- liftIO $ yieldWith winfo sv a if continue then return $ TokenContinue (seqNo + 1) @@ -748,6 +751,7 @@ processWithToken q heap sv winfo action sno = do -- incrementing the yield in a stop continuation. Essentiatlly all -- "unstream" calls in this function must increment yield limit on stop. yieldOutput seqNo a r = do + trace "yieldOutput" (return ()) continue <- liftIO $ yieldWith winfo sv a yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv) if continue && yieldLimitOk @@ -818,7 +822,7 @@ processWithToken q heap sv winfo action sno = do workLoopAhead :: MonadRunInIO m => IORef ([K.StreamK m a], Int) - -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) + -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> Channel m a -> Maybe WorkerInfo -> m () From df1ed32155ab4a8854e55fb4b46d922f065a95a0 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sat, 17 Feb 2024 02:05:11 +0530 Subject: [PATCH 17/24] Fix workerPollingInterval --- src/Streamly/Internal/Data/Channel/Dispatcher.hs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Streamly/Internal/Data/Channel/Dispatcher.hs b/src/Streamly/Internal/Data/Channel/Dispatcher.hs index 1a436c72ea..4c3aa96c0e 100644 --- a/src/Streamly/Internal/Data/Channel/Dispatcher.hs +++ b/src/Streamly/Internal/Data/Channel/Dispatcher.hs @@ -70,7 +70,11 @@ minThreadDelay = 1000000 updateWorkerPollingInterval :: YieldRateInfo -> NanoSecond64 -> IO () updateWorkerPollingInterval yinfo latency = do let periodRef = workerPollingInterval yinfo - cnt = max 1 $ minThreadDelay `div` latency + -- This depends on the rate, if the rate is low, latencies are + -- small, by the time we poll it might be too late and we may have + -- yielded too many results. + -- cnt = max 1 $ minThreadDelay `div` latency + cnt = max 1 (latency `div` svarLatencyTarget yinfo) period = min cnt (fromIntegral magicMaxBuffer) writeIORef periodRef (fromIntegral period) From 1c861b14c161b3887f878ed89165c660971a344e Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Fri, 16 Feb 2024 06:33:30 +0530 Subject: [PATCH 18/24] Fix a rate control issue in ordered streams We were consing an evaluated stream element back into the stream and puttting it on heap. This caused the latency of that item to be very low next time it was yielded. --- .../Internal/Data/Stream/Channel/Append.hs | 231 ++++++++++++------ 1 file changed, 153 insertions(+), 78 deletions(-) diff --git a/src/Streamly/Internal/Data/Stream/Channel/Append.hs b/src/Streamly/Internal/Data/Stream/Channel/Append.hs index 692c31d0ae..387599091d 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Append.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Append.hs @@ -19,7 +19,6 @@ module Streamly.Internal.Data.Stream.Channel.Append ) where -import Debug.Trace import Control.Concurrent (myThreadId) import Control.Concurrent.MVar (newEmptyMVar, newMVar, putMVar, takeMVar) import Control.Exception (assert) @@ -280,18 +279,38 @@ atomicModifyIORef_ ref f = atomicModifyIORef ref $ \x -> (f x, ()) data AheadHeapEntry m a = - AheadEntryNull - | AheadEntryPure a - | AheadEntryStream (RunInIO m, K.StreamK m a) + AheadEntryNull -- ^ an empty result, required for sequencing + | AheadEntryPure a -- ^ a yielded value + -- ^ A stream with its head possibly evaluated, and tail unevaluated + | AheadEntryStream (RunInIO m, Maybe a, K.StreamK m a) data HeapDequeueResult m a = + -- | Not dequeued because someone is processing the heap. This is indicated + -- by the second component of the heap IORef tuple being set to 'Nothing'. Clearing + -- | Not dequeued because the seq no. of the top entry is not the next one + -- expected in seqeunce, we have to wait. | Waiting Int + -- | dequeued successfully, the seq no. of the top entry is the next one + -- expected in sequence. | Ready (Entry Int (AheadHeapEntry m a)) +-- | The heap is stored in an IORef along with a sequence number. When the +-- sequence number is set to 'Nothing' it means we are processing the heap. The +-- type of the dequeued entry would be 'Clearing' in this case. When the +-- sequence number in the IORef is set to 'Just' then it is the next expected +-- sequence number. If the dequeued entry matches with this expected sequence +-- number then it is 'Ready' and dequeued otherwise it is 'Waiting'. When we +-- return 'Clearing' or 'Waiting', the heap is not modified i.e. nothing is +-- dequeued. +-- +-- Note, when we have n streams each consisting of multiple items composed with +-- "ordered" execution then the entire stream is treated as one item with the +-- given sequence number and all of its elements are yielded serially. {-# INLINE dequeueFromHeap #-} dequeueFromHeap :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) + -- ^ (heap, Maybe sequence-no). -> IO (HeapDequeueResult m a) dequeueFromHeap hpVar = atomicModifyIORef hpVar $ \pair@(hp, snum) -> @@ -306,6 +325,10 @@ dequeueFromHeap hpVar = else assert (seqNo >= n) (pair, Waiting n) Nothing -> (pair, Waiting n) +-- | Called only when the heap is being processed to transfer entries to output +-- queue. Matches the sequence number of the dequeued entry with the supplied +-- sequence number to determine if the entry is 'Ready' or 'Waiting'. Heap is +-- not modified if we return 'Waiting' i.e. entry is not dequeued. {-# INLINE dequeueFromHeapSeq #-} dequeueFromHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) @@ -498,86 +521,39 @@ abortExecution sv winfo = do -- 2) make the other threads queue and go away if draining is in progress -- -- In both cases we give the drainer a chance to run more often. + +-- | Move entries from the heap to the channel's output queue. Only those +-- entries which are in correct order are transferred. Stop whenever a missing +-- sequence number is encountered. -- +-- We enter this function only when we have verified that the sequence number +-- passed to it is the next expected sequence number. processHeap :: MonadRunInIO m - => IORef ([K.StreamK m a], Int) - -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) + => IORef ([K.StreamK m a], Int) -- ^ work queue + -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -- ^ heap -> Channel m a -> Maybe WorkerInfo - -> AheadHeapEntry m a - -> Int - -> Bool -- we are draining the heap before we stop + -> AheadHeapEntry m a -- heap entry dequeued from top of heap + -> Int -- seq no. of the heap entry, this is the next correct seq no. + -> Bool -- True if we are draining the heap when we are finally stopping -> m () processHeap q heap sv winfo entry sno stopping = loopHeap sno entry where - stopIfNeeded ent seqNo r = do - stopIt <- liftIO $ preStopCheck sv heap - if stopIt - then liftIO $ do - -- put the entry back in the heap and stop - requeueOnHeapTop heap (Entry seqNo ent) seqNo - stopWith winfo sv - else runStreamWithYieldLimit True seqNo r - - loopHeap seqNo ent = - case ent of - AheadEntryNull -> nextHeap seqNo - AheadEntryPure a -> do - -- Use 'send' directly so that we do not account this in worker - -- latency as this will not be the real latency. - -- Don't stop the worker in this case as we are just - -- transferring available results from heap to outputQueue. - void - $ liftIO - $ sendEvent - (outputQueue sv) (outputDoorBell sv) (ChildYield a) - nextHeap seqNo - AheadEntryStream (RunInIO runin, r) -> do - if stopping - then stopIfNeeded ent seqNo r - else do - res <- liftIO $ runin (runStreamWithYieldLimit True seqNo r) - restoreM res - - nextHeap prevSeqNo = do - res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1) - case res of - Ready (Entry seqNo hent) -> loopHeap seqNo hent - Clearing -> liftIO $ stopWith winfo sv - Waiting _ -> - if stopping - then do - r <- liftIO $ preStopCheck sv heap - if r - then liftIO $ stopWith winfo sv - else processWorkQueue prevSeqNo - else inline processWorkQueue prevSeqNo - - processWorkQueue prevSeqNo = do - yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv) - if yieldLimitOk - then do - work <- dequeueAhead q - case work of - Nothing -> liftIO $ stopWith winfo sv - Just (m, seqNo) -> do - if seqNo == prevSeqNo + 1 - then processWithToken q heap sv winfo m seqNo - else processWithoutToken q heap sv winfo m seqNo - else liftIO $ abortExecution sv winfo - -- We do not stop the worker on buffer full here as we want to proceed to -- nextHeap anyway so that we can clear any subsequent entries. We stop -- only in yield continuation where we may have a remaining stream to be -- pushed on the heap. singleStreamFromHeap seqNo a = do - trace "singleStreamFromHeap" (return ()) void $ liftIO $ yieldWith winfo sv a nextHeap seqNo + yieldStreamFromHeap seqNo a r = do + continue <- liftIO $ yieldWith winfo sv a + runStreamWithYieldLimit continue seqNo r + -- XXX when we have an unfinished stream on the heap we cannot account all -- the yields of that stream until it finishes, so if we have picked up -- and executed more actions beyond that in the parent stream and put them @@ -600,16 +576,93 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry r else do runIn <- askRunInIO - let ent = Entry seqNo (AheadEntryStream (runIn, r)) + let ent = Entry seqNo (AheadEntryStream (runIn, Nothing, r)) liftIO $ do requeueOnHeapTop heap ent seqNo incrementYieldLimit (remainingWork sv) stopWith winfo sv - yieldStreamFromHeap seqNo a r = do - trace "yieldStreamFromHeap" (return ()) - continue <- liftIO $ yieldWith winfo sv a - runStreamWithYieldLimit continue seqNo r + processWorkQueue prevSeqNo = do + yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv) + if yieldLimitOk + then do + work <- dequeueAhead q + case work of + Nothing -> liftIO $ stopWith winfo sv + Just (m, seqNo) -> do + if seqNo == prevSeqNo + 1 + then processWithToken q heap sv winfo m seqNo + else processWithoutToken q heap sv winfo m seqNo + else liftIO $ abortExecution sv winfo + + nextHeap prevSeqNo = do + res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1) + case res of + Ready (Entry seqNo hent) -> loopHeap seqNo hent + Clearing -> liftIO $ stopWith winfo sv + Waiting _ -> + if stopping + then do + r <- liftIO $ preStopCheck sv heap + if r + then liftIO $ stopWith winfo sv + else processWorkQueue prevSeqNo + else inline processWorkQueue prevSeqNo + + -- The main loop processing the heap. The seqNo is correct in sequence, + -- this is the one we should be sending to output next. + loopHeap seqNo ent = + case ent of + AheadEntryNull -> nextHeap seqNo + AheadEntryPure a -> do + -- Use 'send' directly so that we do not account this in worker + -- latency as this will not be the real latency. + -- Don't stop the worker in this case as we are just + -- transferring available results from heap to outputQueue. + void + $ liftIO + $ sendEvent + (outputQueue sv) (outputDoorBell sv) (ChildYield a) + nextHeap seqNo + AheadEntryStream (RunInIO runin, Just a, r) -> do + let + action = do + -- XXX deduplicate this code with the same code above + void + $ liftIO + $ sendEvent + (outputQueue sv) (outputDoorBell sv) (ChildYield a) + runStreamWithYieldLimit True seqNo r + go = do + res <- liftIO $ runin action + restoreM res + if stopping + then do + stopIt <- liftIO $ preStopCheck sv heap + if stopIt + then liftIO $ do + -- put the entry back in the heap and stop + requeueOnHeapTop heap (Entry seqNo ent) seqNo + stopWith winfo sv + else go + else go + AheadEntryStream (RunInIO runin, Nothing, r) -> do + -- XXX deuplicate this code with the code above + let + action = runStreamWithYieldLimit True seqNo r + go = do + res <- liftIO $ runin action + restoreM res + if stopping + then do + stopIt <- liftIO $ preStopCheck sv heap + if stopIt + then liftIO $ do + -- put the entry back in the heap and stop + requeueOnHeapTop heap (Entry seqNo ent) seqNo + stopWith winfo sv + else go + else go {-# NOINLINE drainHeap #-} drainHeap @@ -628,6 +681,12 @@ drainHeap q heap sv winfo = do data HeapStatus = HContinue | HStop +-- XXX Rename to processOutOfOrder + +-- | Without token means the worker is working on an item which not the next in +-- sequence, therefore, the output has to be placed on the heap rather than +-- sending it directly to the output queue. +-- processWithoutToken :: MonadRunInIO m => IORef ([K.StreamK m a], Int) @@ -649,11 +708,15 @@ processWithoutToken q heap sv winfo m seqNo = do toHeap AheadEntryNull mrun = runInIO $ svarMrun sv + -- XXX When StreamD streams are converted to StreamK, even for singleton + -- streams we have a yield and a stop. That can cause perf overhead in case + -- of concurrent workers. We should always create streams with a "single" + -- continuation. r <- liftIO $ mrun $ K.foldStreamShared undefined (\a r -> do runIn <- askRunInIO - toHeap $ AheadEntryStream (runIn, K.cons a r)) + toHeap $ AheadEntryStream (runIn, Just a, r)) (toHeap . AheadEntryPure) stopk m @@ -681,6 +744,8 @@ processWithoutToken q heap sv winfo m seqNo = do writeIORef (maxHeapSize $ svarStats sv) (H.size newHp) heapOk <- liftIO $ underMaxHeap sv newHp + + -- XXX Refactor to use join points status <- case yieldRateInfo sv of Nothing -> return HContinue @@ -708,6 +773,14 @@ processWithoutToken q heap sv winfo m seqNo = do data TokenWorkerStatus = TokenContinue Int | TokenSuspend +-- XXX Rename to processInOrder + +-- | With token means this worker is working on an item which is the next in +-- sequence, therefore, it can be yielded directly to the output queue, +-- avoiding the heap. +-- +-- Before suspending the worker has the responsibility to transfer all the +-- in-sequence entries from the heap to the output queue. processWithToken :: MonadRunInIO m => IORef ([K.StreamK m a], Int) @@ -739,7 +812,6 @@ processWithToken q heap sv winfo action sno = do where singleOutput seqNo a = do - trace "singleOutput" (return ()) continue <- liftIO $ yieldWith winfo sv a if continue then return $ TokenContinue (seqNo + 1) @@ -751,7 +823,6 @@ processWithToken q heap sv winfo action sno = do -- incrementing the yield in a stop continuation. Essentiatlly all -- "unstream" calls in this function must increment yield limit on stop. yieldOutput seqNo a r = do - trace "yieldOutput" (return ()) continue <- liftIO $ yieldWith winfo sv a yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv) if continue && yieldLimitOk @@ -766,7 +837,7 @@ processWithToken q heap sv winfo action sno = do r else do runIn <- askRunInIO - let ent = Entry seqNo (AheadEntryStream (runIn, r)) + let ent = Entry seqNo (AheadEntryStream (runIn, Nothing, r)) liftIO $ requeueOnHeapTop heap ent seqNo liftIO $ incrementYieldLimit (remainingWork sv) return TokenSuspend @@ -875,9 +946,13 @@ getLifoSVar :: forall m a. MonadRunInIO m => RunInIO m -> Config -> IO (Channel m a) getLifoSVar mrun cfg = do outQ <- newIORef ([], 0) - -- the second component of the tuple is "Nothing" when heap is being - -- cleared, "Just n" when we are expecting sequence number n to arrive - -- before we can start clearing the heap. + -- The second component of the heap IORef tuple is: + -- + -- * "Nothing" when we are in the process of clearing the heap i.e. when + -- we are procssing the heap and transferring entries from the heap to the + -- output queue + -- * "Just n" when we are expecting sequence number n to arrive before we + -- can start clearing the heap. outH <- newIORef (H.empty, Just 0) outQMv <- newEmptyMVar active <- newIORef 0 From 76c06f9da46b4bc9f2117457153ea7394b28fa65 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sat, 17 Feb 2024 02:07:20 +0530 Subject: [PATCH 19/24] Fix rate control in preStopCheck of ordered stream worker --- src/Streamly/Internal/Data/Stream/Channel/Append.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Streamly/Internal/Data/Stream/Channel/Append.hs b/src/Streamly/Internal/Data/Stream/Channel/Append.hs index 387599091d..a8cd3d8216 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Append.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Append.hs @@ -491,10 +491,10 @@ preStopCheck sv heap = case yieldRateInfo sv of Nothing -> continue Just yinfo -> do - rateOk <- + beyondRate <- isBeyondMaxRate (maxWorkerLimit sv) (workerCount sv) yinfo - if rateOk then continue else stopping + if beyondRate then stopping else continue else stopping abortExecution :: Channel m a -> Maybe WorkerInfo -> IO () From d8f2bb5b3c12c14843b1b457d6cff8ec4c50c4f7 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sat, 17 Feb 2024 01:09:59 +0530 Subject: [PATCH 20/24] Add changelog entry for rate control fix --- docs/User/Project/Changelog.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/User/Project/Changelog.md b/docs/User/Project/Changelog.md index 2d544ef6a9..63ade08881 100644 --- a/docs/User/Project/Changelog.md +++ b/docs/User/Project/Changelog.md @@ -2,6 +2,10 @@ +## unreleased + +* Fix rate control for ordered streams. + ## 0.10.1 (Jan 2024) * Fix TH macros in `Streamly.Data.Stream.MkType` for GHC 9.6 and above. From 39c1717fcf57dd70ae4543ffd59739c078ce4c2b Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sat, 17 Feb 2024 07:16:43 +0530 Subject: [PATCH 21/24] Disable haddock build for 8.6.5 --- .github/workflows/haskell.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/haskell.yml b/.github/workflows/haskell.yml index 4426222659..6ef0e6ff4d 100644 --- a/.github/workflows/haskell.yml +++ b/.github/workflows/haskell.yml @@ -193,6 +193,7 @@ jobs: cabal_project: cabal.project cabal_build_options: "--flag debug --flag -opt" disable_sdist_build: "y" + disable_docs: "y" ignore_error: false # - name: hlint # build: hlint From f7c281de5415a6c4285fdf7fb41c3a2aa7620605 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sat, 17 Feb 2024 07:17:04 +0530 Subject: [PATCH 22/24] Use GHC-9.8.1 for packdiff --- .github/workflows/packdiff.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/packdiff.yml b/.github/workflows/packdiff.yml index c443b7477f..78ea7d5af7 100644 --- a/.github/workflows/packdiff.yml +++ b/.github/workflows/packdiff.yml @@ -11,10 +11,10 @@ jobs: - name: Download ghc run: | - GHCUP_VER=0.1.18.0 + GHCUP_VER=0.1.20.0 curl -sL -o ./ghcup https://downloads.haskell.org/~ghcup/$GHCUP_VER/x86_64-linux-ghcup-$GHCUP_VER chmod +x ./ghcup - GHCVER=8.10.7 + GHCVER=9.8.1 ./ghcup install ghc $GHCVER ./ghcup set ghc $GHCVER cabal update From d5561cc98a690eb4d2a50c33fb3c88d8ea9b3d2e Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sat, 17 Feb 2024 07:50:46 +0530 Subject: [PATCH 23/24] Disable haddock build for ghc-8.8 --- .github/workflows/haskell.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/haskell.yml b/.github/workflows/haskell.yml index 6ef0e6ff4d..0b00f0e932 100644 --- a/.github/workflows/haskell.yml +++ b/.github/workflows/haskell.yml @@ -184,6 +184,7 @@ jobs: cabal_version: 3.6.2.0 cabal_project: cabal.project disable_sdist_build: "y" + disable_docs: "y" ignore_error: false - name: 8.6.5-debug-unoptimized ghc_version: 8.6.5 From 82fcad38a91b8e807683ee7558a9e211c5028351 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sat, 17 Feb 2024 07:51:07 +0530 Subject: [PATCH 24/24] Update packdiff repo commit --- cabal.project.packdiff | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cabal.project.packdiff b/cabal.project.packdiff index ea793c3cef..dd65345073 100644 --- a/cabal.project.packdiff +++ b/cabal.project.packdiff @@ -1,9 +1,9 @@ source-repository-package type: git location: https://github.com/composewell/packdiff.git - tag: b11a50c9c0523382a19eebc0b12efdae194406a9 + tag: c9b07feb25a896bd5364ddb847ce73047a837859 source-repository-package type: git location: https://github.com/composewell/streamly-process.git - tag: d80b860d9d8ea98e4f7f63390442b3155c34dd08 + tag: cc81711774e530601b14d3f34b362941ed80b070