## Sunday, June 29, 2014

### Optimisation with Continuations

Summary: Continuations are confusing. Here we solve a simple problem (that is at the heart of the Shake build system) using continuations.

Imagine we are given two `IO a` computations, and want to run them both to completion, returning the first `a` value as soon as it is produced (let's ignore exceptions). Writing that in Haskell isn't too hard:

``````
parallel :: IO a -> IO a -> IO a
parallel t1 t2 = do
    once <- newOnce
    var <- newEmptyMVar
    forkIO $ t1 >>= once . putMVar var
    forkIO $ t2 >>= once . putMVar var
    readMVar var
``````

We create an empty variable `var` with `newEmptyMVar`, fire off two threads with `forkIO` to run the computations which write their results to `var`, and finish by reading as soon as a value is available with `readMVar`. We use a utility `newOnce` to ensure that only one of the threads calls `putMVar`, defined as:

``````
newOnce :: IO (IO () -> IO ())
newOnce = do
    run <- newMVar True
    return $ \act -> do
        b <- modifyMVar run $ \b -> return (False, b)
        when b act
``````

Calling `newOnce` produces a function that given an action will either run it (the first time) or ignore it (every time after). Using `newOnce` we only call `putMVar` for the first thread to complete.
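Putting the two definitions together gives a runnable sketch. The race below, with a hypothetical slow/fast pair of computations, shows the first result winning:

```haskell
import Control.Concurrent
import Control.Monad (when)

newOnce :: IO (IO () -> IO ())
newOnce = do
    run <- newMVar True
    return $ \act -> do
        b <- modifyMVar run $ \b -> return (False, b)
        when b act

parallel :: IO a -> IO a -> IO a
parallel t1 t2 = do
    once <- newOnce
    var <- newEmptyMVar
    forkIO $ t1 >>= once . putMVar var
    forkIO $ t2 >>= once . putMVar var
    readMVar var

main :: IO ()
main = do
    -- the delayed computation loses the race, so only "fast" is returned
    r <- parallel (threadDelay 100000 >> return "slow") (return "fast")
    putStrLn r
```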

This solution works, and Shake does something roughly equivalent (but much more complex) in its main scheduler. However, this solution has a drawback: it uses two additional threads. Can we make do with only one additional thread?

For the problem above, running the computations to completion without retrying, you can't avoid two additional threads. To use only one additional thread and still run in parallel you must run one of the operations on the calling thread - but if whatever you run on the additional thread finishes first, there's no way to move the other computation off the calling thread and return immediately. However, we can define:

``````
type C a = (a -> IO ()) -> IO ()
``````

Comparing `IO a` to `C a`, instead of returning an `a`, we get given a function to pass the `a` to (known as a continuation). We still "give back" the `a`, but not as a return value; instead we pass it onwards to a function. We assume that the continuation is called exactly once. We can define `parallel` on `C`:
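For intuition, here's a toy `C` computation (names made up for illustration) that "returns" its result by invoking the continuation:

```haskell
type C a = (a -> IO ()) -> IO ()

-- a computation that produces 42, delivered via the continuation k
answer :: C Int
answer k = k 42

main :: IO ()
main = answer print   -- prints 42
```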

``````
parallel :: C a -> C a -> C a
parallel t1 t2 k = do
    once <- newOnce
    forkIO $ t1 (once . k)
    t2 (once . k)
``````

This definition takes the two computations to run (`t1` and `t2`), plus the continuation `k`. We fork a separate thread to run `t1`, but run `t2` on the calling thread, using only one additional thread. While the `parallel` function won't return until after `t2` completes, subsequent processing using the `a` value will continue as soon as either finishes.
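As a quick check (a sketch, with made-up slow/fast computations), the continuation fires as soon as the faster side finishes:

```haskell
import Control.Concurrent
import Control.Monad (when)

type C a = (a -> IO ()) -> IO ()

newOnce :: IO (IO () -> IO ())
newOnce = do
    run <- newMVar True
    return $ \act -> do
        b <- modifyMVar run $ \b -> return (False, b)
        when b act

parallel :: C a -> C a -> C a
parallel t1 t2 k = do
    once <- newOnce
    forkIO $ t1 (once . k)
    t2 (once . k)

main :: IO ()
main = do
    done <- newEmptyMVar
    let slow k = threadDelay 100000 >> k "slow"
        fast k = k "fast"
    -- the continuation runs once, with whichever result arrives first
    parallel slow fast (putMVar done)
    r <- takeMVar done
    putStrLn r
```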

Looking at the `transformers` package, we see `Control.Monad.Trans.Cont` contains `ContT`, which is defined as:

``````
newtype ContT r m a = ContT {runContT :: (a -> m r) -> m r}
``````

If we use `r` for `()` and `IO` for `m` then we get the same type as `C`. We can redefine `C` as:

``````
type C a = ContT () IO a
``````
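One benefit of reusing `ContT` (a property of the library, not relied on above) is that `C` picks up a `Monad` instance for free, so continuation-passing code can be written with ordinary do-notation. A small sketch, with a hypothetical `double` step:

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Cont

type C a = ContT () IO a

-- a hypothetical step in the C monad
double :: Int -> C Int
double x = return (x * 2)

pipeline :: C Int
pipeline = do
    a <- double 3          -- do-notation sequences the continuations
    b <- double a
    liftIO $ print b       -- prints 12
    return b

main :: IO ()
main = runContT pipeline (\_ -> return ())
```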

The changes to `parallel` just involve wrapping with `ContT` and unwrapping with `runContT`:

``````
parallel :: C a -> C a -> C a
parallel t1 t2 = ContT $ \k -> do
    once <- newOnce
    forkIO $ runContT t1 (once . k)
    runContT t2 (once . k)
``````

Now that we've defined our `parallel` function in terms of `C`, it is useful to be able to convert between `C` and `IO`:

``````
toC :: IO a -> C a
toC = liftIO

fromC :: C a -> IO a
fromC c = do
    var <- newEmptyMVar
    forkIO $ runContT c $ putMVar var
    readMVar var
``````

The `toC` function is already defined by `ContT` as `liftIO`. The `fromC` function needs to change from calling a callback on any thread, to returning a value on this thread, which we can do with a `forkIO` and `MVar`. Given `parallel` on `IO` takes two additional threads, and `parallel` on `C` takes only one, it's not too surprising that converting `IO` to `C` requires an additional thread.
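Tying the pieces together (a sketch reusing the definitions above, assuming the `transformers` package that ships with GHC), a round trip through `C` behaves like the original `IO` version while forking only once in `fromC` and once in `parallel`:

```haskell
import Control.Concurrent
import Control.Monad (when)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Cont

type C a = ContT () IO a

newOnce :: IO (IO () -> IO ())
newOnce = do
    run <- newMVar True
    return $ \act -> do
        b <- modifyMVar run $ \b -> return (False, b)
        when b act

parallel :: C a -> C a -> C a
parallel t1 t2 = ContT $ \k -> do
    once <- newOnce
    forkIO $ runContT t1 (once . k)
    runContT t2 (once . k)

toC :: IO a -> C a
toC = liftIO

fromC :: C a -> IO a
fromC c = do
    var <- newEmptyMVar
    forkIO $ runContT c $ putMVar var
    readMVar var

main :: IO ()
main = do
    -- the faster computation's result is delivered first
    r <- fromC $ parallel (toC $ threadDelay 100000 >> return "slow")
                          (toC $ return "fast")
    putStrLn r
```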

Threads in Haskell are very cheap, and many people won't care about one additional thread. However, each thread comes with a stack, which takes memory. The stack starts off small (1Kb) and grows/shrinks in 32Kb chunks, but once it exceeds 1Kb it never drops below 32Kb. For certain tasks (e.g. Shake build rules) some operation will often take a little over 1Kb of stack. Since each active rule (started but not finished) needs to maintain a stack, and huge build systems can have 30K active rules, you can get over 1Gb of stack memory. While stacks and threads are cheap, they aren't free.

**The plan for Shake**

Shake currently has one thread per active rule, and blocks that thread until all dependencies have rebuilt. The plan is to switch to continuations and only have one thread per rule executing in parallel. This change will not require any code changes to Shake-based build systems; it should simply reduce memory usage. Until then, huge build systems may wish to pass `+RTS -kc8K`, which can save several hundred megabytes of memory.

Anonymous said...

Is this type signature messed up? It looks like it's missing the type for k to me:

parallel :: C a -> C a -> C a
parallel t1 t2 k = do ...

Neil Mitchell said...

C a is a type alias for (a -> IO ()) -> IO (), so you can rewrite it as:

parallel :: C a -> C a -> (a -> IO ()) -> IO ()

Now you can see where the k comes from.

Thomas Schilling said...

When I built my clone of Shake (that was shortly before you published your personal rewrite) I ended up using an explicit data type to represent the current state of a rule. Some of those did include a continuation.

https://github.com/nominolo/cake/blob/master/src/Development/Cake/Core.hs#L732

Neil Mitchell said...

Thomas: Very interesting to see. Did it work in practice? How did you cope with exceptions? Any general hard-won advice?

Thomas Schilling said...

It's been too long ago to remember the details.

I mostly did it because that was the most understandable way to design it. Actually, I looked at Max's Openshake and couldn't really make sense of its inner workings. So, after thinking about how it should work conceptually, explicitly enumerating the states each Action could be in seemed the easiest approach to follow (also for potential contributors).

I don't think I ever finished the code. Looking through it, exception handling was never fully implemented, so I don't know if it can be made to work correctly.

The key driver is https://github.com/nominolo/cake/blob/master/src/Development/Cake/Core.hs#L460

Maybe you can see some fatal flaw in it :)

J. Maessen said...

This observation – that running the continuation on the first thread to complete lets you use one less thread – is very similar to how the work-stealing scheduler in Cilk works. There we want to run both pieces of work to completion, but the first thread to finish wants to go off and find other work to do. So we actually steal the continuation of pending work.

Sébastien Bocq said...

I also thought forkIO would be cheap enough that you wouldn't have to resort to CPS in Haskell.

This reminds me of a project I did in Scala a few years ago to support user-level green threads. Scala being impure, CPS with effects can be directly expressed as:
`(a => ()) => ()`

To cope with user-level threads and exceptions, the signature had to be changed to:
`((t, a) => ()) => ()`
where `t` is a user-level thread. It is used to schedule continuations back sequentially (only for processing results!) and it also acts as an escape hatch in case of exception, by implementing the function to handle them:
`Exception => ()`.

After, I went on adding automatic resource control, and so on... If you are curious about it, the details are explained in the paper here:
https://github.com/molecule-labs/molecule#publication