Summary: I wrote a Conduit combinator which makes the upstream and downstream run in parallel. It makes Hoogle database generation faster.
The Hoogle database generation parses lines one-by-one using haskell-src-exts, and then encodes each line and writes it to a file. Using Conduit, that ends up being roughly:
parse =$= write
Conduit ensures that parsing and writing are interleaved, so each line is parsed and written before the next is parsed - ensuring minimal space usage. Recently the FP Complete guys profiled Hoogle database generation and found each of these pieces takes roughly the same amount of time, and together are the bottleneck. Therefore, it seems likely that if we could parse the next line while writing the previous line we should be able to speed up database generation. I think of this as analogous to CPU pipelining, where the next instruction is decoded while the current one is executed.
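As a toy illustration (parse and write below are hypothetical stand-ins that just print), the interleaving means each element flows all the way through before the next one is produced:

import Control.Monad.IO.Class (liftIO)
import Data.Conduit

-- Prints "parse line 1", "write line 1", "parse line 2", "write line 2", ...
-- showing that only one element is in flight at a time.
interleaved :: IO ()
interleaved = runConduit $ parse =$= write
    where
        parse = mapM_ (\l -> liftIO (putStrLn ("parse " ++ l)) >> yield l)
                      ["line 1", "line 2", "line 3"]
        write = awaitForever $ \l -> liftIO $ putStrLn ("write " ++ l)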
I came up with the combinator:
pipelineC :: Int -> Consumer o IO r -> Consumer o IO r
Allowing us to write:
parse =$= pipelineC 10 write
Given a buffer size of 10 (the maximum number of elements in memory simultaneously) and a Consumer (write), pipelineC produces a new Consumer which is roughly the same but runs in parallel to its upstream (parse).
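As a rough illustration (a hypothetical sketch, with slowParse standing in for real parsing and a consumer that just sleeps), the buffered consumer overlaps with its upstream, so the pipeline takes roughly the time of the slower stage rather than the sum of both:

import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (liftIO)
import Data.Conduit

-- Each stage sleeps ~1ms per element; with pipelineC the two stages overlap,
-- so 100 elements take roughly 0.1s instead of 0.2s.
example :: IO ()
example = runConduit $
        slowParse =$= pipelineC 10 (awaitForever $ \_ -> liftIO $ threadDelay 1000)
    where
        slowParse = mapM_ (\i -> liftIO (threadDelay 1000) >> yield i) [1 .. 100 :: Int]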
The Result
When using 2 threads, Hoogle 5 database creation drops from 45s to 30s. CPU usage during the pipelined stage hovers between 180% and 200%, suggesting the stages are quite well balanced (as the profile suggested). The parsing stage is currently a little slower than the writing, so a buffer of 10 is plenty - increasing the buffer makes no meaningful difference. The total time drops by only 33% because the non-pipelined steps (parsing Cabal files, writing summary information) take about 12s.
Note that Hoogle 5 remains unreleased, but can be tested from the git repo and will hopefully be ready soon.
The Code
The idea is to run the Consumer on a separate thread, while the main thread keeps pulling elements (using await) and passing them to the other thread, without blocking the upstream yield. The only tricky bit is what to do with exceptions: if the consumer thread throws an exception, we have to get it back to the main thread so it can be dealt with normally. Fortunately, async exceptions fit the bill perfectly. The full code is:
{-# LANGUAGE RankNTypes #-}  -- Consumer is a forall-containing type synonym
-- Imports needed to compile: whileM, whenJust and the Barrier operations
-- come from the extra package; everything else is base or conduit.
import Control.Concurrent hiding (yield)
import Control.Concurrent.Extra (newBarrier, signalBarrier, waitBarrier)
import Control.Monad.Extra (whenJust, whileM)
import Control.Monad.IO.Class (liftIO)
import Data.Conduit
import Data.Maybe (isJust)

pipelineC :: Int -> Consumer o IO r -> Consumer o IO r
pipelineC buffer sink = do
    sem <- liftIO $ newQSem buffer  -- how many are in flow, to avoid excess memory
    chan <- liftIO newChan          -- the items in flow (type o)
    bar <- liftIO newBarrier        -- the result type (type r)
    me <- liftIO myThreadId
    liftIO $ flip forkFinally (either (throwTo me) (signalBarrier bar)) $ do
        runConduit $
            (whileM $ do
                x <- liftIO $ readChan chan
                liftIO $ signalQSem sem
                whenJust x yield
                return $ isJust x) =$=
            sink
    awaitForever $ \x -> liftIO $ do
        waitQSem sem
        writeChan chan $ Just x
    liftIO $ writeChan chan Nothing
    liftIO $ waitBarrier bar
We are using a channel chan to move elements from producer to consumer, a quantity semaphore sem to limit the number of items in the channel, and a barrier bar to store the return result (a sketch of the barrier type is given below). On the consumer thread we read from the channel and yield to the consumer. On the main thread we awaitForever and write to the channel. At the end we move the result back from the consumer thread to the main thread. The full implementation is in the repo.
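For reference, the barrier is a one-shot variable: the consumer thread writes it exactly once and the main thread then reads it. A minimal sketch in terms of an MVar (a full version lives in the extra package, in Control.Concurrent.Extra) would be:

import Control.Concurrent.MVar

-- One-shot synchronisation variable: signalled exactly once, then readable
-- any number of times. Minimal sketch only; see the extra package for a
-- complete version.
newtype Barrier a = Barrier (MVar a)

newBarrier :: IO (Barrier a)
newBarrier = fmap Barrier newEmptyMVar

signalBarrier :: Barrier a -> a -> IO ()
signalBarrier (Barrier var) = putMVar var

waitBarrier :: Barrier a -> IO a
waitBarrier (Barrier var) = readMVar var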
Enhancements
I have specialised pipelineC for Consumers that run in the IO monad. Since the Consumer can do IO, and the order of that IO has changed, it isn't exactly equivalent - but relying on such IO timing seems to break the spirit of Conduit anyway. I suspect pipelineC is applicable in some other monads, but am not sure which (ReaderT and ResourceT seem plausible, StateT seems less likely).
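For example, a ReaderT-based Consumer could plausibly be handled by hoisting it into IO first - a hypothetical sketch using conduit's transPipe, which works for ReaderT because the environment never changes (the same trick would be wrong for StateT, since the two threads would each see their own copy of the state):

{-# LANGUAGE RankNTypes #-}

import Control.Monad.Trans.Reader (ReaderT, runReaderT)
import Data.Conduit

-- Hypothetical helper: fix the environment up front, hoist the consumer into
-- IO with transPipe, and reuse the IO-specialised pipelineC.
pipelineReaderC :: Int -> env -> Consumer o (ReaderT env IO) r -> Consumer o IO r
pipelineReaderC buffer env sink = pipelineC buffer (transPipe (`runReaderT` env) sink)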
Acknowledgements: Thanks to Tom Ellis for helping figure out what type pipelineC should have.
4 comments:
Thank you for sharing. I think this is basically the same as buffer and buffer' from Data.Conduit.Async in the stm-conduit package, which are more like buffered versions of ($$) and (=$=).
https://hackage.haskell.org/package/stm-conduit-2.6.1/docs/Data-Conduit-Async.html#v:buffer
Bastian: Thanks, yes, I think what I've written is approximately buffer. However, it's a bit hard to tell, because none of the type classes that the constraints require are linked to, so I have no idea what CCatable allows.
Is there a solution to the termination problem?
I see that both your pipelineC and stm-conduit have basically the same approach: consume the source in a separate thread and push into the channel, then "main" thread consumes from the channel.
Now assume that I want to stop consuming after some point. How do the underlying threads (the ones that populate the channel) know about it and stop?
Alexey: I guess you'd need to use finalisation to kill the thread? Not necessary for my purpose, so I didn't look into it, but I'm sure it's possible.