Thursday, July 30, 2015

Parallel/Pipelined Conduit

Summary: I wrote a Conduit combinator which makes the upstream and downstream run in parallel. It makes Hoogle database generation faster.

The Hoogle database generation parses lines one-by-one using haskell-src-exts, and then encodes each line and writes it to a file. Using Conduit, that ends up being roughly:

parse =$= write

Conduit ensures that parsing and writing are interleaved, so each line is parsed and written before the next is parsed - ensuring minimal space usage. Recently the FP Complete guys profiled Hoogle database generation and found each of these pieces takes roughly the same amount of time, and together are the bottleneck. Therefore, it seems likely that if we could parse the next line while writing the previous line we should be able to speed up database generation. I think of this as analogous to CPU pipelining, where the next instruction is decoded while the current one is executed.

I came up with the combinator:

 pipelineC :: Int -> Consumer o IO r -> Consumer o IO r

Allowing us to write:

 parse =$= pipelineC 10 write

Given a buffer size 10 (the maximum number of elements in memory simultaneously), and a Consumer (write), produce a new Consumer which is roughly the same but runs in parallel to its upstream (parse).

The Result

When using 2 threads the Hoogle 5 database creation drops from 45s to 30s. The CPU usage during the pipelined stage hovers between 180% and 200%, suggesting the stages are quite well balanced (as the profile suggested). The parsing stage is currently a little slower than the writing, so a buffer of 10 is plenty - increasing the buffer makes no meaningful difference. The reason the drop in total time is only by 33% is that the non-pipelined steps (parsing Cabal files, writing summary information) take about 12s.

Note that Hoogle 5 remains unreleased, but can be tested from the git repo and will hopefully be ready soon.

The Code

The idea is to run the Consumer on a separate thread, and on the main thread keep pulling elements (using await) and pass them to the other thread, without blocking the upstream yield. The only tricky bit is what to do with exceptions. If the consumer thread throws an exception we have to get that back to the main thread so it can be dealt with normally. Fortunately async exceptions fit the bill perfectly. The full code is:

pipelineC :: Int -> Consumer o IO r -> Consumer o IO r
pipelineC buffer sink = do
    sem <- liftIO $ newQSem buffer  -- how many are in flow, to avoid excess memory
    chan <- liftIO newChan          -- the items in flow (type o)
    bar <- liftIO newBarrier        -- the result type (type r)
    me <- liftIO myThreadId
    liftIO $ flip forkFinally (either (throwTo me) (signalBarrier bar)) $ do
        runConduit $
            (whileM $ do
                x <- liftIO $ readChan chan
                liftIO $ signalQSem sem
                whenJust x yield
                return $ isJust x) =$=
            sink
    awaitForever $ \x -> liftIO $ do
        waitQSem sem
        writeChan chan $ Just x
    liftIO $ writeChan chan Nothing
    liftIO $ waitBarrier bar

We are using a channel chan to move elements from producer to consumer, a quantity semaphore sem to limit the number of items in the channel, and a barrier bar to store the return result (see about the barrier type). On the consumer thread we read from the channel and yield to the consumer. On the main thread we awaitForever and write to the channel. At the end we move the result back from the consumer thread to the main thread. The full implementation is in the repo.

Enhancements

I have specialised pipelineC for Consumers that run in the IO monad. Since the Consumer can do IO, and the order of that IO has changed, it isn't exactly equivalent - but relying on such IO timing seems to break the spirit of Conduit anyway. I suspect pipelineC is applicable in some other moands, but am not sure which (ReaderT and ResourceT seem plausible, StateT seems less likely).

Acknowledgements: Thanks to Tom Ellis for helping figure out what type pipelineC should have.

4 comments:

Anonymous said...

Thank you for sharing. I think this is basically the same as buffer and buffer' from Data.Conduit.Async from the package stm-conduit, which are more like buffered versions of ($$) and (=$=).

https://hackage.haskell.org/package/stm-conduit-2.6.1/docs/Data-Conduit-Async.html#v:buffer

Neil Mitchell said...

Bastian: Thanks, yes, I think what I've written is approximately buffer. However, it's a bit hard to tell, because none of the type classes that the constraints require are linked to, so I have no idea what CCatable allows.

Alexey Raga said...

Is there a solution to the termination problem?
I see that both your pipelineC and stm-conduit have basically the same approach: consume the source in a separate thread and push into the channel, then "main" thread consumes from the channel.
Now assume that I want to stop consuming after some point. How do underline threads (the ones that populate the channel) know about it and stop?

Neil Mitchell said...

Alexey: I guess you'd need to use finalisation to kill the thread? Not necessary for my purpose, so I didn't look into it, but I'm sure it's possible.