Tuesday, September 22, 2015

Three Space Leaks

Summary: Using the technique from the previous post, here are three space leaks I found.

Every large Haskell program almost inevitably contains space leaks. This post examines three space leaks I found while experimenting with a space-leak detection algorithm. The first two space leaks have obvious causes, but I remain mystified by the third.

Hoogle leak 1

The motivation for looking at space leak detection tools was that Hoogle 5 kept suffering space leaks. Since Hoogle 5 is run on a virtual machine with only 1Gb of RAM, a space leak will often cause it to use the swap file for the heap, and destroy performance. I applied the detection techniques to the hoogle generate command (which generates the databases), which told me that writeDuplicates took over 100K of stack. The body of writeDuplicates is:

xs <- return $ map (second snd) $ sortOn (fst . snd) $ Map.toList $
    Map.fromListWith (\(x1,x2) (y1,y2) -> (min x1 y1, x2 ++ y2))
                     [(s,(p,[t])) | (p,(t,s)) <- zip [0::Int ..] xs]
storeWrite store TypesDuplicates $ jaggedFromList $ map (reverse . snd) xs
return $ map fst xs

I don't expect readers to understand the purpose of the code, but it is interesting to consider if you can spot the space leak, and if you'd have realised so while writing the code.

In order to narrow down the exact line, I inserted evaluate $ rnf ... between each line, along with print statements. For example:

print "step 1"
evaluate $ rnf xs
print "step 2"
xs <- return $ map (second snd) $ sortOn (fst . snd) $ Map.toList $
    Map.fromListWith (\(x1,x2) (y1,y2) -> (min x1 y1, x2 ++ y2))
                     [(s,(p,[t])) | (p,(t,s)) <- zip [0::Int ..] xs]
evaluate $ rnf xs
print "step 3"   
storeWrite store TypesDuplicates $ jaggedFromList $ map (reverse . snd) xs
print "step 4"
let res = map fst xs
evaluate $ rnf res
print "step 5"
return res

(Debugging tip: always use print for debugging and never for real code, that way getting rid of all debugging output is easy.) It failed after printing step 2, but before printing step 3. Pulling each subexpression out and repeating the evaluate/rnf pattern I reduced the expression to:

Map.fromListWith (\(x1,x2) (y1,y2) -> (min x1 y1, x2 ++ y2)) xs

The fromListWith function essentially performs a foldl over values with duplicate keys. I was using Data.Map.Strict, meaning the fold was strict, like foldl'. However, the result is a pair, so forcing the accumulator only forces the pair itself, not the first component, which contains a space leak. I effectively build up min x1 (min x1 (min x1 ... in the heap, which would run faster and take less memory if reduced eagerly. I solved the problem with:

Map.fromListWith (\(x1,x2) (y1,y2) -> (, x2 ++ y2) $! min x1 y1) xs

After that the stack limit could be reduced a bit. Originally fixed in commit 102966ec, then refined in 940412cf.
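
To see the difference in isolation, here is a minimal sketch that puts the leaky and fixed combining functions side by side. The Entry type, the function names and the sample data are hypothetical stand-ins, not Hoogle's real types. The second version needs the TupleSections extension.

```haskell
{-# LANGUAGE TupleSections #-}
import qualified Data.Map.Strict as Map

-- Hypothetical stand-in for the real key and value types.
type Entry = (String, (Int, [Int]))

-- Leaky: the strict map forces the pair, but `min x1 y1` inside it stays a thunk.
mergeLazy :: [Entry] -> Map.Map String (Int, [Int])
mergeLazy = Map.fromListWith (\(x1,x2) (y1,y2) -> (min x1 y1, x2 ++ y2))

-- Fixed: `$!` evaluates the minimum before the pair is constructed.
mergeStrict :: [Entry] -> Map.Map String (Int, [Int])
mergeStrict = Map.fromListWith (\(x1,x2) (y1,y2) -> (, x2 ++ y2) $! min x1 y1)

-- Hypothetical sample data with one duplicated key.
sample :: [Entry]
sample = [("a",(3,[1])), ("a",(1,[2])), ("b",(5,[3]))]
```

Both functions produce the same map. The only difference is when the minimum is computed, which is what decides whether a chain of thunks accumulates for keys with many duplicates.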

Hoogle leak 2

The next space leak appeared in the function:

spreadNames (reverse . sortOn snd -> xs@((_,limit):_)) =
    check $ f (99 + genericLength xs) maxBound xs
    where
        check xs | all (isCon . snd) xs && length (nubOrd $ map snd xs) == length xs = xs
                 | otherwise = error "Invalid spreadNames"

        -- I can only assign values between mn and mx inclusive
        f :: Word16 -> Word16 -> [(a, Int)] -> [(a, Name)]
        f !mn !mx [] = []
        f mn mx ((a,i):xs) = (a, Name real) : f (mn-1) (real-1) xs
            where real = fromIntegral $ max mn $ min mx ideal
                  ideal = mn + floor (fromIntegral (min commonNameThreshold i) * fromIntegral (mx - mn) / fromIntegral (min commonNameThreshold limit))

I had already added ! in the definition of f when writing it, on the grounds it was likely a candidate for space leaks (an accumulating map), so was immediately suspicious that I hadn't got it right. However, adding bang patterns near real made no difference, so I tried systematically reducing the bug.

Since this code isn't in IO, the evaluate technique from the previous leak doesn't work. Fortunately, using seq works, but is a bit more fiddly. To check the argument expression (reverse . sortOn) wasn't leaking I made the change:

spreadNames (reverse . sortOn snd -> xs@((_,limit):_)) =
    rnf xs `seq` trace "passed xs" (check $ f (99 + genericLength xs) maxBound xs)

I was slightly worried that the GHC optimiser may break the delicate seq/trace due to imprecise exceptions, but at -O0 that didn't happen. Successive attempts at testing different subexpressions eventually led to genericLength xs, which in this case returns a Word16. The definition of genericLength reads:

genericLength []        =  0
genericLength (_:l)     =  1 + genericLength l

Alas, a very obvious space leak. In addition, the base library provides two rules:

  "genericLengthInt"     genericLength = (strictGenericLength :: [a] -> Int);
  "genericLengthInteger" genericLength = (strictGenericLength :: [a] -> Integer);

If you use genericLength on Int or Integer then it is replaced with a strict version without a space leak - but on Word16 the space leak remains. To solve this space leak I replaced genericLength xs with fromIntegral (length xs) in commit 12c46e93, which worked. After that change, the Hoogle test suite can be run with 1Kb of stack - a test that has been added to the continuous integration.
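
As a sketch of the two ways around the leak, assuming the result type is a machine word such as Word16: either count with a strict left fold at the target type, or count at Int and convert once at the end (the replacement described above). The names lengthStrict and lengthWord16 are hypothetical.

```haskell
import Data.List (foldl')
import Data.Word (Word16)

-- Strict count at any numeric type: foldl' forces the counter to weak head
-- normal form at each step, which is fully evaluated for types like Word16.
lengthStrict :: Num n => [a] -> n
lengthStrict = foldl' (\n _ -> n + 1) 0

-- Count at Int using the Prelude's length, then convert once.
lengthWord16 :: [a] -> Word16
lengthWord16 = fromIntegral . length
```

Note that both wrap around for lists longer than 65535 elements, since Word16 arithmetic is modulo 65536.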

Shake leak

After solving the space leak from the original post, I was then able to run the entire test suite with 1Kb stack on my Windows machine. I made that an RTS option to the Cabal test suite, and my Linux continuous integration started failing. Further experimentation on a Linux VM showed that:

  • The entire test failed at 50K, but succeeded at 100K.
  • The excessive stack usage could be replicated with only two of the tests - the tar test followed by the benchmark test. The tar test is incredibly simple and likely any of the tests before benchmark would have triggered the issue.
  • The tests succeeded in 1K if running benchmark followed by tar.

The initial assumption was that some CAF was being partially evaluated or created by the first test, and then used by the second, but I have yet to find any evidence of that. Applying -xc suggested a handful of possible sites (as Shake catches and rethrows exceptions), but the one that eventually led to a fix was extractFileTime, defined as:

extractFileTime x = ceiling $ modificationTimeHiRes x * 1e4

And called from:

getFileInfo x = handleBool isDoesNotExistError (const $ return Nothing) $ do
    s <- getFileStatus $ unpackU_ x
    return $ Just (fileInfo $ extractFileTime s, fileInfo $ fromIntegral $ fileSize s)

There is a small (constant sized) space leak here - the result does not force extractFileTime, but returns a pair containing thunks. In fact, getFileStatus from the unix library allocates a ForeignPtr to store s, so by not forcing the pair we cause the ForeignPtr to live much longer than would be otherwise required. The fix from commit 2ee36a99 is simple:

getFileInfo x = handleBool isDoesNotExistError (const $ return Nothing) $ do
    s <- getFileStatus $ unpackU_ x
    a <- evaluate $ fileInfo $ extractFileTime s
    b <- evaluate $ fileInfo $ fromIntegral $ fileSize s
    return $! Just $! (a, b)
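
The same pattern can be tried without the unix library. The sketch below uses a hypothetical Status record in place of the real file status, and hypothetical names infoLazy and infoStrict. It only illustrates the shape of the fix, and does not reproduce the ForeignPtr behaviour.

```haskell
import Control.Exception (evaluate)

-- Hypothetical stand-in for a resource-backed value such as a file status.
data Status = Status { modTime :: Double, size :: Integer }

-- Lazy: both components are thunks that keep `s` alive until demanded.
infoLazy :: Status -> IO (Maybe (Int, Int))
infoLazy s = return $ Just (ceiling $ modTime s * 1e4, fromIntegral $ size s)

-- Strict: each component is evaluated while `s` is still in use,
-- so nothing in the result refers back to it.
infoStrict :: Status -> IO (Maybe (Int, Int))
infoStrict s = do
    a <- evaluate $ ceiling $ modTime s * 1e4
    b <- evaluate $ fromIntegral $ size s
    return $! Just $! (a, b)
```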

Afterwards the entire Shake test suite can be run in 1K. Since getFileInfo is different on Windows vs Linux, I understand why the space leak doesn't occur on Windows. What I still don't understand is:

  • How does running one test first cause the space leak in the second test?
  • How does what looks like a small space leak result in over 49K additional stack space?
  • Is the fact that ForeignPtr is involved behind the scenes somehow relevant?

I welcome any insights.

Monday, September 21, 2015

Detecting Space Leaks

Summary: Below is a technique for easily detecting space leaks. It's even found a space leak in the base library.

Every large Haskell program almost inevitably contains space leaks. Space leaks are often difficult to detect, but relatively easy to fix once detected (typically insert a !). Working with Tom Ellis, we found a fairly simple method to detect such leaks. These ideas have detected four space leaks so far, including one in the base library maximumBy function, which has now been fixed. For an introduction to space leaks, see this article.

Our approach is based around the observation that most space leaks result in an excess use of stack. If you look for the part of the program that results in the largest stack usage, that is the most likely space leak, and the one that should be investigated first.


Given a program, and a representative run (e.g. the test suite, a suitable input file):

  • Compile the program for profiling, e.g. ghc --make Main.hs -rtsopts -prof -auto-all.
  • Run the program with a specific stack size, e.g. ./Main +RTS -K100K to run with a 100Kb stack.
  • Increase/decrease the stack size until you have determined the minimum stack for which the program succeeds, e.g. -K33K.
  • Reduce the stack by a small amount and rerun with -xc, e.g. ./Main +RTS -K32K -xc.
  • The -xc run will print out the stack trace on every exception. Look for the one which says stack overflow (likely the last one) and look at the stack trace to determine roughly where the leak is.
  • Attempt to fix the space leak, confirm by rerunning with -K32K.
  • Repeat until the test works with a small stack, typically -K1K.
  • Add something to your test suite to ensure that if a space leak is ever introduced then it fails, e.g. ghc-options: -with-rtsopts=-K1K in Cabal.
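
To practise the steps above on something small, a deliberately leaky function is enough. This is a minimal sketch with hypothetical names. It assumes compilation at -O0, since with optimisation the strictness analyser may well remove the leak.

```haskell
import Data.List (foldl')

-- Leaky at -O0: foldl builds a chain of (+) thunks, and forcing the final
-- result then needs stack proportional to the length of the list.
leakySum :: [Int] -> Int
leakySum = foldl (+) 0

-- Constant stack: foldl' forces the accumulator at every step.
strictSum :: [Int] -> Int
strictSum = foldl' (+) 0
```

Adding a main such as print (leakySum [1..1000000]) and running with a small -K value would be expected to overflow the stack, while the strictSum version should not. The exact threshold will depend on the GHC version and flags.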

I have followed these steps for Shake, Hoogle and HLint, all of which now contain -K1K in the test suite or test scripts.

Example: Testing on Shake

Applying these techniques to the Shake test suite, I used the run ./shake-test self test, which compiles Shake using Shake. Initially it failed at -K32K, and the stack trace produced by -xc was:

*** Exception (reporting due to +RTS -xc): (THUNK_STATIC), stack trace:
  called from Development.Shake.Profile.writeProfile,
  called from Development.Shake.Core.run.\.\.\,
  called from Development.Shake.Core.run.\.\,
  called from Development.Shake.Database.withDatabase.\,
  called from Development.Shake.Storage.withStorage.continue.\,
  called from Development.Shake.Storage.flushThread,
  called from Development.Shake.Storage.withStorage.continue,
  called from Development.Shake.Storage.withStorage.\,
  called from General.FileLock.withLockFile.\,
  called from General.FileLock.withLockFile,
  called from Development.Shake.Storage.withStorage,
  called from Development.Shake.Database.withDatabase,
  called from Development.Shake.Core.run.\,
  called from General.Cleanup.withCleanup,
  called from Development.Shake.Core.lineBuffering,
  called from Development.Shake.Core.run,
  called from Development.Shake.Shake.shake,
  called from Development.Shake.Args.shakeArgsWith,
  called from Test.Type.shakeWithClean,
  called from Test.Type.shaken.\,
  called from Test.Type.noTest,
  called from Test.Type.shaken,
  called from Test.Self.main,
  called from Test.main,
  called from :Main.CAF:main
stack overflow

Looking at the generateSummary function, it takes complete profile information and reduces it to a handful of summary lines. As a typical example, one line of the output is generated with the code:

let f xs = if null xs then "0s" else (\(a,b) -> showDuration a ++ " (" ++ b ++ ")") $ maximumBy (compare `on` fst) xs in
    "* The longest rule takes " ++ f (map (prfExecution &&& prfName) xs) ++
    ", and the longest traced command takes " ++ f (map (prfTime &&& prfCommand) $ concatMap prfTraces xs) ++ "."

Most of the code is map, maximum and sum in various combinations. By commenting out pieces I was able to still produce the space leak using maximumBy alone. By reimplementing maximumBy in terms of foldl', the leak went away. Small benchmarks showed this space leak was a regression in GHC 7.10, which I reported as GHC ticket 10830. To fix Shake, I added the helper:

maximumBy' cmp = foldl1' $ \x y -> if cmp x y == GT then x else y

After switching to maximumBy' I was able to reduce the stack to -K1K. While this space leak was not problematic in practice (it's rarely used code which isn't performance sensitive), it's still nice to fix. I modified the Shake test suite to pass -K1K so if I ever regress I'll get an immediate notification. (Shake actually had one additional Linux-only space leak, also now fixed, but that's a tale for a future post.)


This method has found several space leaks - two in Shake and two in Hoogle (I also ran it on HLint, which had no failures). However, there are a number of caveats:

  • GHC's strictness analyser often removes space leaks by making accumulators strict, so -O2 tends to remove some space leaks, and profiling may reinsert them by blocking optimisations. I currently check my code using -O0, but using libraries I depend on with whatever optimisation they install with by default. By ensuring optimisations do not remove space leaks, it is less likely that minor code tweaks will introduce space leaks due to missed optimisations.
  • The stack trace produced by -xc omits duplicate adjacent elements, which is often the interesting information when debugging a stack overflow. In practice, it's a little inconvenient, but not terrible. Having GHC provide repetition counts (e.g. Main.recurse *12) would be useful.
  • The stack traces don't include entries for things in imported libraries, which is unfortunate, and often means the location of the error is a 20 line function instead of the exact subexpression. The lack of such information makes fixing leaks take a little longer.
  • The -xc flag prints stack information on all exceptions, which are often numerous. Lots of IO operations make use of exceptions even when they succeed. As a result, it's often easier to run without -xc to figure out the stack limit, then turn -xc on. Usually the stack overflow exception is near the end.
  • There are sometimes a handful of exceptions after the stack overflow, as various layers of the program catch and rethrow the exception. For programs that catch exceptions and rethrow them somewhat later (e.g. Shake), that can sometimes result in a large number of exceptions to wade through. It would be useful if GHC had an option to filter -xc to only certain types of exception.
  • Some functions in the base libraries are both reasonable to use and have linear stack usage - notably mapM. For the case of mapM in particular you may wish to switch to a constant stack version while investigating space leaks.
  • This technique catches a large class of space leaks, but certainly not all. As an example, given a Map Key LargeValue, if you remove a single Key but don't force the Map, it will leak a LargeValue. When the Map is forced it will take only a single stack entry, and thus not be detected as a leak. However, this technique would have detected a previous Shake space leak, as it involved repeated calls to delete.


If anyone manages to find space leaks using this technique we would be keen to know. I have previously told people that there are many advantages to lazy programming languages, but that space leaks are the big disadvantage. With the technique above, I feel confident that I can now reduce the number of space leaks in my code.


Pepe Iborra suggested two tips to make this trick even more useful:

  • Instead of -xc, I find it's much better to catch StackOverflow exceptions in main, and then print the stack trace using GHC.Stack.currentCallStack.
  • For imported libraries, you can cabal unpack them and extend the .cabal descriptor Library section with a ghc-prof-options entry that enables -auto-all.
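
The first tip might look something like the sketch below. The wrapper name reportStackOverflow is hypothetical. It assumes the program is compiled with profiling, since currentCallStack returns an empty list otherwise, and how informative the stack is at the point the handler runs is not something this sketch verifies.

```haskell
import Control.Exception (AsyncException (StackOverflow), catch, throwIO)
import GHC.Stack (currentCallStack)

-- Hypothetical wrapper for the body of main: report the cost-centre stack
-- when a StackOverflow arrives, then rethrow so the program still fails.
reportStackOverflow :: IO a -> IO a
reportStackOverflow act = act `catch` \e -> case e of
    StackOverflow -> do
        stack <- currentCallStack   -- empty unless compiled with -prof
        putStrLn $ unlines $ "stack overflow, called from:" : stack
        throwIO e
    _ -> throwIO e                  -- other async exceptions pass through
```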

Monday, September 14, 2015

Making sequence/mapM for IO take O(1) stack

Summary: We have a version of mapM for IO that takes O(1) stack and is faster than the standard Haskell/GHC one for long lists.

The standard Haskell/GHC base library sequence function in IO takes O(n) stack space. However, working with Tom Ellis, we came up with a version that takes O(1) stack space. Our version is slower at reasonable sizes, but faster at huge sizes (100,000+ elements). The standard definition of sequence (when specialised for both IO and []) is equivalent to:

sequence :: [IO a] -> IO [a]
sequence [] = return []
sequence (x:xs) = do y <- x; ys <- sequence xs; return (y:ys)

Or, when rewritten inlining the monadic bind and opening up the internals of GHC's IO type, becomes:

sequence :: [IO a] -> IO [a]
sequence [] = IO $ \r -> (# r, [] #)
sequence (x:xs) = IO $ \r -> case unIO x r of
    (# r, y #) -> case unIO (sequence xs) r of
        (# r, ys #) -> (# r, y:ys #)

For those not familiar with IO, it is internally defined as:

newtype IO a = IO (State# RealWorld -> (# State# RealWorld, a #))

Each IO action takes a RealWorld token and returns a RealWorld token, which ensures that IO actions run in order. See here for a full tutorial.

Our observation was that this version requires O(n) stack space, as each recursive call is performed inside a case. The algorithm proceeds in two phases:

  • First, it traverses the input list, evaluating each action and pushing y on the stack.
  • After reaching the [] at the end of the list, it traverses the stack constructing the output list.

By constructing the list directly on the heap we can avoid the extra copy and use O(1) stack. Our version is:

sequenceIO :: [IO a] -> IO [a]
sequenceIO xs = do
        ys <- IO $ \r -> (# r, apply r xs #)
        evaluate $ demand ys
        return ys
    where
        apply r [] = []
        apply r (IO x:xs) = case x r of
            (# r, y #) -> y : apply r xs

        demand [] = ()
        demand (x:xs) = demand xs

Here the two traversals are explicit:

  • First, we traverse the list using apply. Note that we pass the RealWorld token down the list (ensuring the items happen in the right order), but we do not pass it back.
  • To ensure all the IO actions performed during apply happen before we return any of the list, we then demand the list, ensuring the [] element has been forced.

Both these traversals use O(1) stack. The first runs the actions and constructs the list. The second ensures evaluation has happened before we continue. The trick in this algorithm is:

ys <- IO $ \r -> (# r, apply r xs #)

Here we cheat by duplicating r, which is potentially unsafe. This line does not evaluate apply, merely returns a thunk which when evaluated will force apply, and cause the IO to happen during evaluation, at somewhat unspecified times. To regain well-defined evaluation order we force the result of apply on the next line using demand.
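
For comparison, here is a sketch of a different and much simpler way to get O(1) stack without touching the RealWorld token: accumulate the results in reverse and reverse the list at the end. This is not the sequenceIO above, and no claim is made about its relative speed. The name sequenceRev is hypothetical.

```haskell
-- Run the actions left to right, consing each result onto an accumulator.
-- The recursive call is in tail position, so the stack does not grow.
sequenceRev :: [IO a] -> IO [a]
sequenceRev = go []
  where
    go acc []     = return (reverse acc)
    go acc (x:xs) = do
        y <- x
        go (y : acc) xs
```

Like sequenceIO, this still traverses the results twice and still needs O(n) heap for the output list.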


We benchmarked using GHC 7.10.2, comparing the default sequence (which has identical performance to the specialised monomorphic variant at the top of this post), and our sequenceIO. We benchmarked at different lengths of lists. Our sequenceIO is twice as slow at short lists, draws even around 10,000-100,000 elements, and goes 40% faster by 1,000,000 elements.

Our algorithm saves allocating stack, at the cost of iterating through the list twice. It is likely that by tuning the stack allocation flags the standard algorithm would be faster everywhere.

Using sequence at large sizes

Despite improved performance at large size, we would not encourage people to use sequence or mapM at such large sizes - these functions still require O(n) memory. Instead:

  • If you are iterating over the elements, consider fusing this stage with subsequent stages, so that each element is processed one-by-one. The conduit and pipes libraries both help with that approach.
  • If you are reducing the elements, e.g. performing a sum, consider fusing the mapM and sum using something like foldM.
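
As a sketch of the second suggestion, the function below fuses a mapM with a sum using foldM, so the intermediate list of results is never built. The name sumMapM is hypothetical, and the accumulator is fixed to Int to keep the example small.

```haskell
import Control.Monad (foldM)

-- Run an action per element and add each result to a running total.
sumMapM :: Monad m => (a -> m Int) -> [a] -> m Int
sumMapM f = foldM step 0
  where
    step acc x = do
        y <- f x
        return $! acc + y   -- force the total so thunks do not pile up
```

In IO this behaves like fmap sum (mapM f xs) without holding all the results in memory at once.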

For common operations, such as a concat after a mapM, an obvious definition of concatMapM is:

concatMapM :: Monad m => (a -> m [b]) -> [a] -> m [b]
concatMapM f = liftM concat . mapM f

But if the result of the argument is regularly [], then a more efficient version is:

concatMapM op = foldr f (return [])
    where f x xs = do x <- op x
                      if null x then xs else liftM (x ++) xs

For lists 10,000 elements long, using the function const (return []), this definition is about 4x faster. Version 1.4.2 of the extra library uses this approach for both concatMapM and mapMaybeM.
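
Applying the same idea to mapMaybeM might look like the sketch below. It is written by analogy with the concatMapM above, and is not copied from the extra library's source.

```haskell
import Control.Monad (liftM)

-- Keep only the Just results, skipping the list append entirely
-- whenever the action returns Nothing.
mapMaybeM :: Monad m => (a -> m (Maybe b)) -> [a] -> m [b]
mapMaybeM op = foldr f (return [])
  where
    f x xs = do
        r <- op x
        case r of
            Nothing -> xs
            Just y  -> liftM (y :) xs
```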

Saturday, August 15, 2015

Testing is never enough

Summary: Testing shows the presence, not the absence of bugs.

Recently, someone suggested to me that, thanks to test suites, things like changing compiler version or versions of library dependencies were "no big deal". If dependency changes still result in a passing test suite, then they have caused no harm. I disagree, and fortunately for me, Dijkstra explains it far more eloquently than I ever could:

Testing shows the presence, not the absence of bugs. Dijkstra (1969)

While a test suite can give you confidence in changes you make, it does not provide guarantees. Below are just a few reasons why.

The test suite does not cover all the code

For any reasonably sized code base (> 100 lines), covering all the lines of code is difficult. There are a number of factors that mean a test suite is unlikely to provide 100% coverage:

  • Producing tests is a resource intensive activity, and most projects do not have the necessary manpower to test everything.
  • Sometimes there is no good way to test simple sugar functions - the definition is a specification of what the function should do.
  • Testing corner cases is difficult. As the corners get more obscure, the difficulty increases.
  • Testing error conditions is even harder. Some error conditions have code to deal with them, but are believed to be unreachable.

The test suite does not cover all the ways through the code

Assuming the test suite really does cover every line of the code, making it cover every path through the code is almost certainly computationally infeasible. Consider a program taking a handful of boolean options. While it might be feasible to test each individual option in the true and false states, testing every state in conjunction with every other state requires an exponential amount of time. For programs with loops, testing every number of loop iterations is likely to be highly time consuming.
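
The arithmetic behind the boolean options is easy to check. The sketch below uses hypothetical names to enumerate every combined setting of n options, and compares that with the number of runs needed to test each option on its own.

```haskell
import Control.Monad (replicateM)

-- Every assignment of False/True to n independent boolean options.
allSettings :: Int -> [[Bool]]
allSettings n = replicateM n [False, True]

-- Testing each option alone in both states needs only 2 * n runs.
individualRuns :: Int -> Int
individualRuns n = 2 * n
```

For 10 options that is 20 individual runs against 1024 combined settings, and each extra option doubles the second number.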

There is plenty of code you can't see

Even if you cover every line of source code, the compiler may still thwart your valiant efforts. Optimising compilers like to inline code (make copies of it) and specialise code (freeze in some details that would otherwise be dynamic). After such transformations, the compiler might spot undefined behaviour (something almost all C/C++ programs contain) and make modifications that break your code. You might have tested all the source code, but you have not tested all the code generated by the compiler. If you are writing in C/C++, and undefined behaviour and optimisation doesn't scare you a lot, you should read this LLVM article series.

Functions have huge inputs

Testing functions typically involves supplying their input and inspecting their output. Usually the input space is too large to enumerate - which is likely to be the case even if your function takes in an integer. As soon as your function takes a string or array, enumeration is definitely infeasible. Often you can pick cases at which the code is likely to go wrong (0, 1, -1, maxBound) - but maybe it only fails for Carmichael numbers. Random testing can help, and is always advisable, but the effort to deploy random testing is typically quite a bit higher than input/output samples, and it is no panacea.

Functions are not functions

Testing functions usually assumes they really are functions, which depend only on their input. In pure functional languages that is mostly true, but in C/C++ it is less common. For example, functions that have an internal cache might behave differently under parallelism, especially if their cache is not managed properly. Functions may rely on global variables, so they might perform correctly until some seemingly unrelated operation is performed. Even Haskell programs are likely to depend on global state such as the FPU flags, which may be changed unexpectedly by other code.

In my experience, the non-functional nature of functions is one of the biggest practical difficulties, and is also a common place where dependency changes cause frustration. Buggy code can work successfully for years until an improved memory allocator allows a race condition to be hit.

Performance testing is hard

Even if your code gives the correct results, it may take too long or use too much memory. Alas, testing for resource usage is difficult. Resource numbers, especially runtime, are often highly variable between runs - more so if tests are run on shared hardware or make use of parallelism. Every dependency change is likely to have some impact on resource usage, perhaps as dependencies themselves choose to trade time for memory. Spotting erroneous variations often requires a human to make a judgement call.

What is the solution?

Tests help, and are valuable, and you should aim to test as much as you can. But for any reasonably sized program, your tests will never be complete, and the program will always contain unknown bugs. Most likely someone using your code will stumble across one of these bugs. In this case, it's often possible (and indeed, highly desirable) to add a new test case specifically designed to spot this error. Bugs have a habit of recurring, and a bug that happens twice is just embarrassing.

Thinking back to dependency versions, there is often strength in numbers. If all your users are on the same version of all the dependencies, then any bug that is very common is likely to be found by at least one user and fixed for all.

Thinking more generally, it is clear that many of these issues are somewhat ameliorated by pure functional programming. I consider testability and robustness to be one of the great strengths of Haskell.

Monday, August 10, 2015

Upcoming talk to the Cambridge UK Meetup, Thursday 13 Aug (Shake 'n' Bake)

I'll be talking at the Cambridge NonDysFunctional Programmers Meetup this coming Thursday (13 Aug 2015). Doors open at 7:00pm with talk 7:30-8:30pm, followed by beer/food. I'll be talking about Shake 'n' Bake. The abstract is:

Shake is a Haskell build system, an alternative to Make, but with more powerful and accurate dependencies. I'll cover how to build things with Shake, and why I laugh at non-Monadic build systems (which covers most things that aren't Shake). Shake is an industrial quality library, with a website at http://shakebuild.com.

Bake is a Haskell continuous integration system, an alternative to Travis/Jenkins, but designed for large semi-trusted teams. Bake guarantees that all code arriving in your master branch passes all tests on all platforms, while using as few resources as possible, allowing you to have hours of tests, 100's of commits a day and only a few lonely test servers. Bake is held together with duct tape.

I look forward to seeing people there.

Thursday, July 30, 2015

Parallel/Pipelined Conduit

Summary: I wrote a Conduit combinator which makes the upstream and downstream run in parallel. It makes Hoogle database generation faster.

The Hoogle database generation parses lines one-by-one using haskell-src-exts, and then encodes each line and writes it to a file. Using Conduit, that ends up being roughly:

parse =$= write

Conduit ensures that parsing and writing are interleaved, so each line is parsed and written before the next is parsed - ensuring minimal space usage. Recently the FP Complete guys profiled Hoogle database generation and found each of these pieces takes roughly the same amount of time, and together are the bottleneck. Therefore, it seems likely that if we could parse the next line while writing the previous line we should be able to speed up database generation. I think of this as analogous to CPU pipelining, where the next instruction is decoded while the current one is executed.

I came up with the combinator:

 pipelineC :: Int -> Consumer o IO r -> Consumer o IO r

Allowing us to write:

 parse =$= pipelineC 10 write

Given a buffer size 10 (the maximum number of elements in memory simultaneously), and a Consumer (write), produce a new Consumer which is roughly the same but runs in parallel to its upstream (parse).

The Result

When using 2 threads the Hoogle 5 database creation drops from 45s to 30s. The CPU usage during the pipelined stage hovers between 180% and 200%, suggesting the stages are quite well balanced (as the profile suggested). The parsing stage is currently a little slower than the writing, so a buffer of 10 is plenty - increasing the buffer makes no meaningful difference. The reason the drop in total time is only by 33% is that the non-pipelined steps (parsing Cabal files, writing summary information) take about 12s.

Note that Hoogle 5 remains unreleased, but can be tested from the git repo and will hopefully be ready soon.

The Code

The idea is to run the Consumer on a separate thread, and on the main thread keep pulling elements (using await) and pass them to the other thread, without blocking the upstream yield. The only tricky bit is what to do with exceptions. If the consumer thread throws an exception we have to get that back to the main thread so it can be dealt with normally. Fortunately async exceptions fit the bill perfectly. The full code is:

pipelineC :: Int -> Consumer o IO r -> Consumer o IO r
pipelineC buffer sink = do
    sem <- liftIO $ newQSem buffer  -- how many are in flow, to avoid excess memory
    chan <- liftIO newChan          -- the items in flow (type o)
    bar <- liftIO newBarrier        -- the result type (type r)
    me <- liftIO myThreadId
    liftIO $ flip forkFinally (either (throwTo me) (signalBarrier bar)) $ do
        runConduit $
            (whileM $ do
                x <- liftIO $ readChan chan
                liftIO $ signalQSem sem
                whenJust x yield
                return $ isJust x) =$=
            sink
    awaitForever $ \x -> liftIO $ do
        waitQSem sem
        writeChan chan $ Just x
    liftIO $ writeChan chan Nothing
    liftIO $ waitBarrier bar

We are using a channel chan to move elements from producer to consumer, a quantity semaphore sem to limit the number of items in the channel, and a barrier bar to store the return result (see about the barrier type). On the consumer thread we read from the channel and yield to the consumer. On the main thread we awaitForever and write to the channel. At the end we move the result back from the consumer thread to the main thread. The full implementation is in the repo.
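
The same channel, semaphore and result-slot arrangement can be tried without Conduit. The sketch below is a simplified analogue using only base, with a plain list as the upstream, a callback as the consumer and an MVar in place of the barrier. The name pipelined is hypothetical, and this is not the code from the repo.

```haskell
import Control.Concurrent (forkFinally, myThreadId, throwTo)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)

-- Run 'consume' on its own thread, feeding it 'items' from the calling
-- thread with at most 'buffer' items queued at any time.
pipelined :: Int -> (a -> IO ()) -> [a] -> IO ()
pipelined buffer consume items = do
    sem  <- newQSem buffer      -- free slots in the queue
    chan <- newChan             -- Just item, or Nothing for end of input
    done <- newEmptyMVar        -- filled when the consumer finishes
    me   <- myThreadId
    let loop = do
            x <- readChan chan
            signalQSem sem      -- slot is freed as soon as the item is taken
            case x of
                Nothing -> return ()
                Just v  -> consume v >> loop
    -- On failure, rethrow the consumer's exception on the calling thread.
    _ <- forkFinally loop (either (throwTo me) (putMVar done))
    mapM_ (\v -> waitQSem sem >> writeChan chan (Just v)) items
    writeChan chan Nothing
    takeMVar done
```

Items reach the consumer in their original order, since there is a single channel and a single consumer thread.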


I have specialised pipelineC for Consumers that run in the IO monad. Since the Consumer can do IO, and the order of that IO has changed, it isn't exactly equivalent - but relying on such IO timing seems to break the spirit of Conduit anyway. I suspect pipelineC is applicable in some other monads, but am not sure which (ReaderT and ResourceT seem plausible, StateT seems less likely).

Acknowledgements: Thanks to Tom Ellis for helping figure out what type pipelineC should have.

Sunday, July 19, 2015

Thoughts on Conduits

Summary: I'm growing increasingly fond of the Conduit library. Here I give my intuitions and some hints I'd have found useful.

Recently I've been working on converting the Hoogle database generation to use the Conduit abstraction, in an effort to reduce the memory and improve the speed. It worked - database generation has gone from 2Gb of RAM to 320Mb, and time has dropped from several minutes (or > 15 mins on memory constrained machines) to 45s. These changes are all in the context of Hoogle 5, which should hopefully be out in a month or so.

The bit that I've converted to Conduit is something that takes in a tarball of one text file per Hackage package, namely the Haddock output with one definition per line (this 22Mb file). It processes each definition, saves it to a single binary file (with compression and some processing), and returns some compact information about the definition for later processing. I don't expect the process to run in constant space as it is accumulating some return information, but it is important that most of the memory used by one definition is released before the next definition. I originally tried lazy IO, and while it somewhat worked, it was hard to abstract properly and very prone to space leaks. Converting to Conduit was relatively easy and is simpler and more robust.

The Conduit model

My mental model for a conduit Conduit a m b is roughly a function [a] -> m [b] - a values go in and b values come out (but interleaved with the monadic actions). More concretely you ask for an a with await and give back a b with yield, doing stuff in the middle in the m Monad.

A piece of conduit is always either running (doing its actual work), waiting after a yield for the downstream person to ask for more results (with await), or waiting after an await for the upstream person to give the value (with yield). You can think of a conduit as making explicit the demand-order inherent in lazy evaluation.
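
As a rough illustration of that mental model, here is a minimal sketch (not code from Hoogle). The names doubleC, asListFunction and doubled are hypothetical, and it assumes only the Data.Conduit and Data.Conduit.List modules from the conduit package. Newer conduit releases prefer .| over =$=, but the idea is the same.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

-- Hypothetical example conduit: ask for an Int with await, give back its
-- double with yield, and stop when upstream has nothing left.
doubleC :: Monad m => ConduitM Int Int m ()
doubleC = do
    mx <- await
    case mx of
        Nothing -> return ()                 -- upstream is exhausted
        Just x  -> yield (x * 2) >> doubleC  -- emit one value, then loop

-- The "[a] -> m [b]" view: feed a list in, collect everything that comes out.
asListFunction :: Monad m => ConduitM a b m () -> [a] -> m [b]
asListFunction c xs = runConduit $ CL.sourceList xs =$= c =$= CL.consume

doubled :: [Int]
doubled = runIdentity $ asListFunction doubleC [1, 2, 3]

main :: IO ()
main = print doubled  -- [2,4,6]
```

Each await is a point where doubleC waits on upstream, and each yield is a point where it waits on downstream, which is the running/waiting split described above.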

Things to know about Conduit

I think it's fair to say Conduit shows its history - this is good for people who have been using it for a while (your code doesn't keep breaking), but bad for people learning it (there are lots of things you don't care about). Here are some notes I made:

  • The Data.Conduit module in the conduit library is not the right module to use - it seems generally accepted to use the Conduit module from the conduit-combinators package. However, I decided to build my own conduit-combinators style replacement in the Hoogle tree, see General.Conduit - the standard Conduit module has a lot of dependencies, and a lot of generalisations.
  • Don't use Source or Sink - use Producer and Consumer - the former are just a convenient way to get confusing error messages.
  • Don't use =$ or $=, always use =$= between conduits. The =$= operator is essentially flip (.).
  • Given a Conduit you can run it with runConduit. Alternatively, given a =$= b =$= c you can replace any of the =$= with $$ to run the Conduit as well. I find that a bit ugly, and have stuck to runConduit.
  • Conduit and ConduitM have their type arguments in different orders, which is just confusing. However, generally I use either Conduit (a connector) or Producer (something with a result). You rarely need something with a result and a return value.
  • You call await to see if a value is available to process. The most common bug I've had with conduits is forgetting to make the function processing items recursive - usually you want awaitForever, not just await.
  • The ByteString lines Conduit function was accidentally O(n^2) - I spotted and fixed that. Using difference lists does not necessarily make your code O(n)!
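
To make the await versus awaitForever point concrete, here is a small sketch under the same assumptions as before (Data.Conduit plus Data.Conduit.List). The conduits incrementOnce and incrementAll and the helper run are hypothetical names made up for this example.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (Identity, runIdentity)

-- The bug: a single await handles one item, then the conduit finishes,
-- so everything after the first item is silently dropped.
incrementOnce :: Monad m => ConduitM Int Int m ()
incrementOnce = do
    mx <- await
    case mx of
        Nothing -> return ()
        Just x  -> yield (x + 1)

-- The fix: awaitForever reruns the handler for every item.
incrementAll :: Monad m => ConduitM Int Int m ()
incrementAll = awaitForever $ \x -> yield (x + 1)

-- Run a conduit over a fixed input, using runConduit with =$= throughout.
run :: ConduitM Int Int Identity () -> [Int]
run c = runIdentity $ runConduit $ CL.sourceList [1, 2, 3] =$= c =$= CL.consume

main :: IO ()
main = do
    print (run incrementOnce)  -- [2]
    print (run incrementAll)   -- [2,3,4]
```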

Useful functions

When using Conduit I found a number of functions seemingly missing, so defined them myself.

First up is countC which counts the number of items that are consumed. Just a simple definition on top of sumC.

countC :: (Monad m, Num c) => Consumer a m c
countC = sumC <| mapC (const 1)
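
A self-contained sketch of how countC might be used. Two assumptions here: I take <| to mean right-to-left fusion (a helper, not a standard conduit operator), so this version is written with =$= instead, and I spell the Consumer type out as ConduitM to avoid needing RankNTypes. It assumes the Conduit module (from conduit-combinators at the time of writing); itemCount is a hypothetical name.

```haskell
import Conduit
import Data.Functor.Identity (runIdentity)

-- Same idea as above: turn every item into a 1, then sum the ones.
countC :: (Monad m, Num c) => ConduitM a o m c
countC = mapC (const 1) =$= sumC

-- Count the characters flowing out of a small producer.
itemCount :: Int
itemCount = runIdentity $ runConduit $ yieldMany "hello" =$= countC

main :: IO ()
main = print itemCount  -- 5
```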

While I recommend awaitForever in preference to await, it's occasionally useful to have awaitJust as the single-step awaitForever, if you are doing your own recursion.

awaitJust :: Monad m => (i -> Conduit i m o) -> Conduit i m o
awaitJust act = do
    x <- await
    whenJust x act

I regularly find zipFrom i = zip [i..] very useful in strict languages, and since Conduit can be viewed as a strict version of lazy lists (through very foggy glasses) it's no surprise a Conduit version is also useful.

zipFromC :: (Monad m, Enum c) => c -> Conduit a m (c, a)
zipFromC !i = awaitJust $ \a -> do
    yield (i,a)
    zipFromC (succ i)
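
Here is a sketch that puts awaitJust and zipFromC together in one runnable file. It assumes only Data.Conduit and Data.Conduit.List, swaps whenJust (which comes from the extra package) for maybe so there is no extra dependency, and writes the types with ConduitM directly. The name numbered is hypothetical.

```haskell
{-# LANGUAGE BangPatterns #-}
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

-- Single-step awaitForever: run the action on the next item, if there is one.
awaitJust :: Monad m => (i -> ConduitM i o m ()) -> ConduitM i o m ()
awaitJust act = await >>= maybe (return ()) act

-- Pair each item with a counter; the bang keeps the counter evaluated
-- rather than building up a chain of succ thunks.
zipFromC :: (Monad m, Enum c) => c -> ConduitM a (c, a) m ()
zipFromC !i = awaitJust $ \a -> do
    yield (i, a)
    zipFromC (succ i)

numbered :: [(Int, Char)]
numbered = runIdentity $ runConduit $
    CL.sourceList "abc" =$= zipFromC 1 =$= CL.consume

main :: IO ()
main = print numbered  -- [(1,'a'),(2,'b'),(3,'c')]
```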

Finally, it's useful to zip two conduits. I was surprised how fiddly that was with the standard operators (you have to use newtype wrappers and an Applicative instance), but a simple |$| definition hides that complexity away.

(|$|) :: Monad m => ConduitM i o m r1 -> ConduitM i o m r2 -> ConduitM i o m (r1,r2)
(|$|) a b = getZipConduit $ (,) <$> ZipConduit a <*> ZipConduit b
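
As a usage sketch, |$| lets two consumers see the same stream in a single pass. The example below assumes Data.Conduit (for ZipConduit) and Data.Conduit.List (for fold and sourceList); sumAndCount is a hypothetical name.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

(|$|) :: Monad m => ConduitM i o m r1 -> ConduitM i o m r2 -> ConduitM i o m (r1, r2)
(|$|) a b = getZipConduit $ (,) <$> ZipConduit a <*> ZipConduit b

-- Both folds receive every item, so the sum and the count come from one
-- traversal of the input.
sumAndCount :: (Int, Int)
sumAndCount = runIdentity $ runConduit $
    CL.sourceList [1 .. 10] =$= (CL.fold (+) 0 |$| CL.fold (\n _ -> n + 1) 0)

main :: IO ()
main = print sumAndCount  -- (55,10)
```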

Why not Pipes?

I am curious if all Pipes users get asked "Why not use Conduit?", or if this FAQ is asymmetrical?

I realise pipes are billed as the more "principled" choice for this type of programming, but I've yet to see anywhere Conduit seems fundamentally unprincipled. I use WAI/Warp and http-conduit, so learning Conduit gives me some help there.