Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrency: update docs, fix dev build, fix rate control, fix/add benchmarks/tests #2729

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
dd12c4f
Add maxThreads 1 benchmarks for parConcat
harendra-kumar Feb 14, 2024
0ea617d
Fix work queue for parConcatMap
harendra-kumar Feb 13, 2024
184168c
Update docs for stream channels
harendra-kumar Feb 14, 2024
9077f39
Export maxYields
harendra-kumar Feb 16, 2024
8e7760f
Rename concatMapDivK to concatMapHeadK
harendra-kumar Feb 14, 2024
6bb838c
Move parConcatMapChanK to the Channel module
harendra-kumar Feb 14, 2024
b2783db
Remove the two queue solution for async streams
harendra-kumar Feb 14, 2024
997580a
Rename some dispatcher functions
harendra-kumar Feb 15, 2024
fe20741
Add rate benchmarks for new concurrent streams
harendra-kumar Feb 15, 2024
333cc94
Add comparison benchmarks for rate/without rate
harendra-kumar Feb 15, 2024
fd9ca82
Fix old prelude rate benchmarks
harendra-kumar Feb 15, 2024
8a62be8
Fix build with "dev" flag
harendra-kumar Feb 15, 2024
04f5909
Add rate tests for new concurrent streams
harendra-kumar Feb 15, 2024
ef9474a
Add low rate examples, maxYields support in rate test
harendra-kumar Feb 16, 2024
6cbfc49
Add stop reason and life-time duration to channel dump
harendra-kumar Feb 15, 2024
796352a
Remove unnecessry type parameter from AheadHeapEntry
harendra-kumar Feb 15, 2024
df1ed32
Fix workerPollingInterval
harendra-kumar Feb 16, 2024
1c861b1
Fix a rate control issue in ordered streams
harendra-kumar Feb 16, 2024
76c06f9
Fix rate control in preStopCheck of ordered stream worker
harendra-kumar Feb 16, 2024
d8f2bb5
Add changelog entry for rate control fix
harendra-kumar Feb 16, 2024
39c1717
Disable haddock build for 8.6.5
harendra-kumar Feb 17, 2024
f7c281d
Use GHC-9.8.1 for packdiff
harendra-kumar Feb 17, 2024
d5561cc
Disable haddock build for ghc-8.8
harendra-kumar Feb 17, 2024
82fcad3
Update packdiff repo commit
harendra-kumar Feb 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/haskell.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ jobs:
cabal_version: 3.6.2.0
cabal_project: cabal.project
disable_sdist_build: "y"
disable_docs: "y"
ignore_error: false
- name: 8.6.5-debug-unoptimized
ghc_version: 8.6.5
Expand All @@ -193,6 +194,7 @@ jobs:
cabal_project: cabal.project
cabal_build_options: "--flag debug --flag -opt"
disable_sdist_build: "y"
disable_docs: "y"
ignore_error: false
# - name: hlint
# build: hlint
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/packdiff.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ jobs:

- name: Download ghc
run: |
GHCUP_VER=0.1.18.0
GHCUP_VER=0.1.20.0
curl -sL -o ./ghcup https://downloads.haskell.org/~ghcup/$GHCUP_VER/x86_64-linux-ghcup-$GHCUP_VER
chmod +x ./ghcup
GHCVER=8.10.7
GHCVER=9.8.1
./ghcup install ghc $GHCVER
./ghcup set ghc $GHCVER
cabal update
Expand Down
10 changes: 6 additions & 4 deletions benchmark/Streamly/Benchmark/Data/Stream/ConcurrentCommon.hs
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,10 @@ concatFmapStreamsWith f outer inner n =
$ Async.parConcat f
$ fmap (sourceUnfoldrM inner) (sourceUnfoldrM outer n)

o_1_space_concatMap :: Int -> (Config -> Config) -> [Benchmark]
o_1_space_concatMap value f =
o_1_space_concatMap :: String -> Int -> (Config -> Config) -> [Benchmark]
o_1_space_concatMap label value f =
value2 `seq`
[ bgroup "concat"
[ bgroup ("concat" ++ label)
[ benchIO "parConcatMap (n of 1)"
(concatMapStreamsWith f value 1)
, benchIO "parConcatMap (sqrt x of sqrt x)"
Expand Down Expand Up @@ -198,7 +198,9 @@ allBenchmarks moduleName wide modifier value =
[ bgroup (o_1_space_prefix moduleName) $ concat
[ o_1_space_mapping value modifier
, o_1_space_concatFoldable value modifier
, o_1_space_concatMap value modifier
, o_1_space_concatMap "" value modifier
, o_1_space_concatMap "-maxThreads-1" value (modifier . Async.maxThreads 1)
, o_1_space_concatMap "-rate-Nothing" value (modifier . Async.rate Nothing)
, o_1_space_joining value modifier
] ++ if wide then [] else o_1_space_outerProduct value modifier
, bgroup (o_n_heap_prefix moduleName) $ concat
Expand Down
129 changes: 129 additions & 0 deletions benchmark/Streamly/Benchmark/Data/Stream/Rate.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
{-# LANGUAGE FlexibleContexts #-}

-- |
-- Module : Main
-- Copyright : (c) 2018 Composewell Technologies
--
-- License : BSD3
-- Maintainer : [email protected]

import Stream.Common (benchIOSrc, sourceUnfoldrM)
import Streamly.Data.Stream (Stream)
import Streamly.Internal.Data.Stream.Prelude (MonadAsync, Config)

import qualified Streamly.Data.Stream.Prelude as Stream

import Streamly.Benchmark.Common
import Test.Tasty.Bench

moduleName :: String
moduleName = "Data.Stream.Rate"

-------------------------------------------------------------------------------
-- Average Rate
-------------------------------------------------------------------------------

{-# INLINE rateNothing #-}
rateNothing :: MonadAsync m =>
((Config -> Config) -> Stream m Int -> Stream m Int)
-> (Config -> Config) -> Int -> Int -> Stream m Int
rateNothing f cfg value = f (Stream.rate Nothing . cfg) . sourceUnfoldrM value

{-# INLINE avgRate #-}
avgRate :: MonadAsync m =>
((Config -> Config) -> Stream m Int -> Stream m Int)
-> (Config -> Config) -> Int -> Double -> Int -> Stream m Int
avgRate f cfg value rt = f (Stream.avgRate rt . cfg) . sourceUnfoldrM value

{-
-- parEval should be maxThreads 1 anyway
{-# INLINE avgRateThreads1 #-}
avgRateThreads1 :: MonadAsync m =>
((Config -> Config) -> Stream m Int -> Stream m Int)
-> (Config -> Config) -> Int -> Double -> Int -> Stream m Int
avgRateThreads1 f cfg value rt =
f (Stream.maxThreads 1 . Stream.avgRate rt . cfg) . sourceUnfoldrM value
-}

{-# INLINE minRate #-}
minRate :: MonadAsync m =>
((Config -> Config) -> Stream m Int -> Stream m Int)
-> (Config -> Config) -> Int -> Double -> Int -> Stream m Int
minRate f cfg value rt = f (Stream.minRate rt . cfg) . sourceUnfoldrM value

{-# INLINE maxRate #-}
maxRate :: MonadAsync m =>
((Config -> Config) -> Stream m Int -> Stream m Int)
-> (Config -> Config) -> Int -> Double -> Int -> Stream m Int
maxRate f cfg value rt = f (Stream.minRate rt . cfg) . sourceUnfoldrM value

{-# INLINE constRate #-}
constRate :: MonadAsync m =>
((Config -> Config) -> Stream m Int -> Stream m Int)
-> (Config -> Config) -> Int -> Double -> Int -> Stream m Int
constRate f cfg value rt = f (Stream.constRate rt . cfg) . sourceUnfoldrM value

-- XXX arbitrarily large rate should be the same as rate Nothing
-- XXX Add tests for multiworker cases as well - parMapM
o_1_space_async :: Int -> [Benchmark]
o_1_space_async value =
[ bgroup
"default/parEval"
[ bgroup
"avgRate"
-- benchIO "unfoldr" $ toNull
[ benchIOSrc "baseline" $ sourceUnfoldrM value
, benchIOSrc "Nothing" $ rateNothing Stream.parEval id value
, benchIOSrc "1M" $ avgRate Stream.parEval id value 1000000
, benchIOSrc "3M" $ avgRate Stream.parEval id value 3000000
-- , benchIOSrc "10M/maxThreads1" $ avgRateThreads1 Stream.parEval value 10000000
, benchIOSrc "10M" $ avgRate Stream.parEval id value 10000000
, benchIOSrc "20M" $ avgRate Stream.parEval id value 20000000
]
, bgroup
"minRate"
[ benchIOSrc "1M" $ minRate Stream.parEval id value 1000000
, benchIOSrc "10M" $ minRate Stream.parEval id value 10000000
, benchIOSrc "20M" $ minRate Stream.parEval id value 20000000
]
, bgroup
"maxRate"
[ -- benchIOSrc "10K" $ maxRate value 10000
benchIOSrc "10M" $ maxRate Stream.parEval id value 10000000
]
, bgroup
"constRate"
[ -- benchIOSrc "10K" $ constRate value 10000
benchIOSrc "1M" $ constRate Stream.parEval id value 1000000
, benchIOSrc "10M" $ constRate Stream.parEval id value 10000000
]
]
]

o_1_space_ahead :: Int -> [Benchmark]
o_1_space_ahead value =
[ bgroup
"ordered/parEval"
[ benchIOSrc "avgRate/1M"
$ avgRate Stream.parEval (Stream.ordered True) value 1000000
, benchIOSrc "minRate/1M"
$ minRate Stream.parEval (Stream.ordered True) value 1000000
, benchIOSrc "maxRate/1M"
$ maxRate Stream.parEval (Stream.ordered True) value 1000000
, benchIOSrc "constRate/1M"
$ constRate Stream.parEval (Stream.ordered True) value 1000000
]
]

-------------------------------------------------------------------------------
-- Main
-------------------------------------------------------------------------------

main :: IO ()
main = runWithCLIOpts defaultStreamSize allBenchmarks

where

allBenchmarks value =
[ bgroup (o_1_space_prefix moduleName)
$ concat [o_1_space_async value, o_1_space_ahead value]]
8 changes: 5 additions & 3 deletions benchmark/lib/Streamly/Benchmark/Prelude.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ module Streamly.Benchmark.Prelude
, transformComposeMapM
, transformMapM
, transformTeeMapM
, transformZipMapM
-- , transformZipMapM
)
where

Expand All @@ -78,7 +78,7 @@ import qualified Data.Foldable as F
import qualified Data.List as List
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Stream.IsStream as Internal
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.IsStream as IsStream
import qualified Streamly.Internal.Data.Pipe as Pipe
import qualified Streamly.Internal.Data.Stream.Serial as Serial

Expand Down Expand Up @@ -318,9 +318,10 @@ transformTeeMapM t n =
composeN n $
t .
Internal.transform
(Pipe.mapM (\x -> return (x + 1)) `Pipe.tee`
(Pipe.mapM (\x -> return (x + 1)) `Pipe.teeMerge`
Pipe.mapM (\x -> return (x + 2)))

{-
{-# INLINE transformZipMapM #-}
transformZipMapM ::
(S.IsStream t, S.MonadAsync m)
Expand All @@ -336,6 +337,7 @@ transformZipMapM t n =
(+)
(Pipe.mapM (\x -> return (x + 1)))
(Pipe.mapM (\x -> return (x + 2))))
-}

-------------------------------------------------------------------------------
-- Streams of streams
Expand Down
Loading
Loading