Async OCaml with Lwt
One of the suggestions I got on Reddit when I asked about better approaches to utilising multiple cores was to use Lwt or Async. I had some free time one weekend and started with the Lwt manual. I think it is one of the nicest pieces of documentation I have seen so far in the OCaml world.
The problem at hand is described in the "Run OCaml on 24 cores" and "Haskell Vs. Go Vs. OCaml Vs..." articles. In short, the task is to read metrics stored in over eight hundred thousand RRD files and dump enriched data points into one huge CSV file. Metrics are read by running rrdtool as a long-lived process in pipe mode (rrdtool -), sending it commands and parsing its output.
Before I continue, I should mention that I spent no more than half a day on the Lwt-based solution. It is more than likely that it can be further optimised, or that I simply misunderstood or misused some parts of the library. With that disclaimer in place, let's dive in.
Iterating over the list of files
The list of files to be processed is derived from the result of a database
query. For each row in the query result we build a pattern and feed it into
the glob function imported from libc. The function works somewhat like the
find command: it returns the list of files that match the pattern. However,
the call is blocking, and it was a bit of a head-scratcher for me how to make
it run asynchronously. That was right up to the moment I read the
documentation on the Lwt_preemptive module, which fit that kind of task
perfectly. It runs a computation in a separate thread and fulfills the promise
when the thread finishes. All the dirty work is done by the library. To quote
the manual:
detach f x runs the computation f x in a separate preemptive thread. detach evaluates to an Lwt promise, which is pending until the preemptive thread completes.
It also has a convenient signature, which made it trivial to embed glob calls
into the rest of the program:
val detach : ('a -> 'b) -> 'a -> 'b Lwt.t
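Here is a minimal, self-contained sketch of Lwt_preemptive.detach, not the code from my program. blocking_expand is a hypothetical stand-in for the glob call that only sleeps and fabricates paths. The sketch assumes the lwt.unix library, e.g. (libraries lwt.unix) in a dune file. Lwt_list.map_p starts all detached calls at once and collects their results in order.

```ocaml
(* Minimal sketch of Lwt_preemptive.detach; assumes the lwt.unix
   library, e.g. (libraries lwt.unix) in a dune file. *)

(* Hypothetical stand-in for a blocking call such as glob(3):
   it sleeps to simulate a slow system call, then fabricates paths. *)
let blocking_expand (pattern : string) : string array =
  Unix.sleepf 0.2;
  Array.init 3 (fun i -> Printf.sprintf "%s/%d.rrd" pattern i)

(* Runs blocking_expand in one of Lwt's worker threads; the promise
   stays pending until that thread returns. *)
let expand_async (pattern : string) : string array Lwt.t =
  Lwt_preemptive.detach blocking_expand pattern

let patterns = [ "dc1/dev1"; "dc1/dev2"; "dc2/dev3"; "dc2/dev4" ]

(* map_p starts every detached call at once and keeps results in order *)
let results, elapsed =
  let t0 = Unix.gettimeofday () in
  let r = Lwt_main.run (Lwt_list.map_p expand_async patterns) in
  (r, Unix.gettimeofday () -. t0)

let () =
  List.iter (Array.iter print_endline) results;
  Printf.printf "elapsed: %.2f s\n" elapsed
```

The four 0.2 s calls should overlap as long as Lwt's preemptive pool has enough worker threads (its bounds can be set with Lwt_preemptive.init). In that case the elapsed time should be well under the 0.8 s a sequential loop would take.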
The full listing of the "main" function is as follows:
```ocaml
open Lwt.Infix (* for >>= *)

let iterate_rrd_files base_path devices ifaces pool interval =
  (* function g converts a tuple describing a particular device into
     rrd_t array and processes each element asynchronously *)
  let g (id, host, dc) =
    let process_rrd_array rrd_array =
      rrd_array
      (* query_rrd does the main job *)
      |> Array.map (query_rrd pool interval)
      |> Array.to_list
      |> Lwt.join
    in
    (* a helper function that wraps the glob call *)
    let pattern_to_rrds (pattern : string) : rrd_t array =
      pattern
      |> Glob.glob
      (* process/filter the result of the glob call *)
      |> Array.filter_map (rrd_from_path ifaces (id, host, dc))
    in
    make_pattern (base_path, dc, id)
    |> Lwt_preemptive.detach pattern_to_rrds
    >>= process_rrd_array
  in
  (* concurrently iterate over the list of devices *)
  Lwt_list.iter_p g devices |> Lwt_main.run
```
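The shape of iterate_rrd_files can be tried in isolation: Lwt_list.iter_p over devices, and Lwt.join over each device's files. In this sketch, fake_query and files_of_device are hypothetical stand-ins for query_rrd and the glob step. Lwt_unix.sleep plays the role of waiting on rrdtool. It assumes lwt.unix.

```ocaml
(* Sketch of the nested fan-out in iterate_rrd_files; assumes lwt.unix.
   All names below are hypothetical stand-ins. *)
open Lwt.Infix

let processed = ref 0

(* Stand-in for query_rrd: a non-blocking 0.1 s wait, then a count *)
let fake_query (_file : string) : unit Lwt.t =
  Lwt_unix.sleep 0.1 >|= fun () -> incr processed

(* Stand-in for the glob step: three files per device *)
let files_of_device (dev : string) : string array =
  Array.init 3 (fun i -> Printf.sprintf "%s/if%d.rrd" dev i)

(* Same structure as process_rrd_array: map, to_list, join *)
let process_device (dev : string) : unit Lwt.t =
  files_of_device dev
  |> Array.map fake_query
  |> Array.to_list
  |> Lwt.join

let devices = [ "dev1"; "dev2"; "dev3"; "dev4" ]

let elapsed =
  let t0 = Unix.gettimeofday () in
  Lwt_main.run (Lwt_list.iter_p process_device devices);
  Unix.gettimeofday () -. t0

let () = Printf.printf "%d files in %.2f s\n" !processed elapsed
```

All twelve waits overlap, so the run should take roughly 0.1 s rather than 1.2 s. This overlap happens on a single core: Lwt interleaves promises while they wait on IO, so any CPU-bound work inside fake_query would still run one piece at a time.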
Pool of rrdtool processes
Another interesting feature of Lwt is that it makes it easy to parallelise IO
with a pool of workers (Lwt_pool). In my case, the pool was a pool of rrdtool
processes started in pipe mode (rrdtool -).
```ocaml
let create_rrdtool_pool size =
  Lwt_pool.create size
    ~dispose:(fun rrdtool -> rrdtool#close >|= ignore)
    (fun () -> Lwt_process.open_process ("", [|"rrdtool"; "-"|]) |> Lwt.return)
```
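Here is a sketch of what the pool size actually limits. The pooled resource is just an integer id rather than an rrdtool process, which is a hypothetical simplification. The counters in_use and max_in_use are also only for illustration. It assumes lwt.unix for Lwt_unix.sleep and Lwt_main.run.

```ocaml
(* Sketch: Lwt_pool bounds how many tasks hold a resource at once.
   The int "resource" stands in for an rrdtool process; assumes lwt.unix. *)
open Lwt.Infix

let next_id = ref 0
let in_use = ref 0
let max_in_use = ref 0

(* The pool creates at most 2 resources, lazily, on demand *)
let pool : int Lwt_pool.t =
  Lwt_pool.create 2 (fun () -> incr next_id; Lwt.return !next_id)

(* Each task borrows a resource, holds it for 50 ms, then releases it *)
let task _ =
  Lwt_pool.use pool (fun _id ->
      incr in_use;
      max_in_use := max !max_in_use !in_use;
      Lwt_unix.sleep 0.05 >|= fun () -> decr in_use)

let () =
  Lwt_main.run (Lwt_list.iter_p task (List.init 10 Fun.id));
  Printf.printf "resources created: %d, max concurrent: %d\n"
    !next_id !max_in_use
```

Ten tasks share two resources. The pool creates resources lazily up to the given size, and tasks beyond that wait inside Lwt_pool.use until one is released. For rrdtool, this means at most size processes are ever started, however many queries are in flight.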
I tried tweaking the pool size, varying it from 1 to n_cores; however, the
difference between pool sizes of, e.g., 8 and 16 was negligible. I believe
that was because rrdtool was not the bottleneck, and 8 parallel rrdtool
processes were able to keep up with the rest of the program.
Again, as with Lwt_preemptive, I couldn't help but appreciate how handy the
library was: it had exactly the tool I needed.
Processing rrdtool output
The last piece of the puzzle was to send commands to rrdtool and read its
output. The tricky part was that the output of each query (after some
processing) had to go into the CSV file as a contiguous block:
```ocaml
let query_rrd pool interval rrd =
  (* returns either (Some datapoint_line) Lwt.t or None Lwt.t *)
  let get_line rrdtool () =
    Lwt_io.read_line rrdtool#stdout >|= check_response
  in
  let q rrdtool =
    (* prepare command *)
    let cmd = fmt_cmd rrd.path interval in
    (* write it to the tool's stdin *)
    Lwt_io.write_line rrdtool#stdin cmd
    >>= fun () -> Lwt_io.flush rrdtool#stdin
    (* and read/parse lines till we get to the end of response,
       sending the result directly to stdout *)
    >>= fun () ->
    Lwt_stream.from (get_line rrdtool)
    |> Lwt_stream.iter_s (Lwt_io.write_line Lwt_io.stdout)
  in
  Lwt_pool.use pool q
```
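The check_response helper used in get_line is not shown. Here is one possible sketch. It assumes that in pipe mode rrdtool ends each response with a line starting with "OK " (followed by timing information) and reports failures on a line starting with "ERROR". The exception Rrdtool_error, the take_response helper and the sample lines are hypothetical. Returning None is what makes Lwt_stream.from end the stream for the current response.

```ocaml
(* Hypothetical sketch of check_response; assumes rrdtool's pipe mode
   terminates each response with "OK ..." and reports "ERROR: ...". *)
exception Rrdtool_error of string

let starts_with ~prefix s =
  String.length s >= String.length prefix
  && String.sub s 0 (String.length prefix) = prefix

(* Some line: a data line to pass on; None: end of this response *)
let check_response (line : string) : string option =
  if starts_with ~prefix:"OK " line then None
  else if starts_with ~prefix:"ERROR" line then raise (Rrdtool_error line)
  else Some line

(* Synchronous stand-in for Lwt_stream.from: keep lines up to the terminator *)
let rec take_response = function
  | [] -> []
  | l :: rest -> (
      match check_response l with
      | None -> []
      | Some d -> d :: take_response rest)

(* Hypothetical sample of rrdtool output followed by the next response *)
let sample =
  [ "1700000000: 1.0 2.0"; "1700000300: 1.5 2.5";
    "OK u:0.00 s:0.00 r:0.00"; "1700000600: 3.0 4.0" ]

let block = take_response sample
```

In the Lwt version, raising inside check_response makes the promise returned by get_line fail. That failure then propagates out through Lwt_stream.iter_s.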
It wasn't quite clear to me how to achieve that contiguity in the async world
without converting a list of promises into a promise containing the list of
results. I used Lwt_stream.from/iter_s because they seemed to have the right
signatures. However, the use of Lwt_stream seems to be discouraged, and I was
somewhat lost when choosing the right tool for this particular task. Anyway,
Lwt_stream did the job just fine.
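One caveat, as far as I can tell. Lwt_stream.iter_s on its own keeps a block contiguous only as long as read_line never has to wait in the middle of a response, for example when the whole response is already buffered. Whenever it does wait, another worker's write_line to stdout can slip in between two lines. One way to rule this out without first collecting a list is to hold a mutex for the whole block.

The sketch below is not my program's code. emit_block, the shared Buffer standing in for Lwt_io.stdout, and Lwt.pause standing in for a slow read_line are all hypothetical. It assumes lwt.unix.

```ocaml
(* Sketch: keep each block contiguous by holding a mutex while its lines
   are written. All names are hypothetical; assumes lwt.unix. *)
let out = Buffer.create 256
let out_lock = Lwt_mutex.create ()

(* Lwt.pause () yields after each line, like a read_line that has to wait
   for more rrdtool output; without the lock, blocks could interleave. *)
let emit_block (name : string) (n : int) : unit Lwt.t =
  let lines = List.init n (fun i -> Printf.sprintf "%s:%d" name i) in
  Lwt_mutex.with_lock out_lock (fun () ->
      Lwt_stream.of_list lines
      |> Lwt_stream.iter_s (fun l ->
             Buffer.add_string out (l ^ "\n");
             Lwt.pause ()))

let () =
  (* create the three writers explicitly, then wait for all of them *)
  let pa = emit_block "a" 3 in
  let pb = emit_block "b" 3 in
  let pc = emit_block "c" 3 in
  Lwt_main.run (Lwt.join [ pa; pb; pc ]);
  print_string (Buffer.contents out)
```

The trade-off is that while one worker holds the lock, the others cannot write, so output is serialised. The alternative is to collect each response with Lwt_stream.to_list and write it with a single Lwt_io.write call, at the cost of keeping one response in memory.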
Test results and conclusion
Overall, my experience of using Lwt was nothing short of delightful. It
had all the tools and primitives exactly as I needed them. The
documentation was comprehensive and comprehensible at the same time.
However, the speed-up was not quite what I had hoped for, or what the reply to my Reddit post hinted at. The median run time was 14 min 40 sec. That is about 36% less than the 23 min of the synchronous single-core version (a speed-up of roughly 1.6x), but it pales in comparison with the 1 min 7 sec of the multi-process version described in the Run OCaml on 24 cores post.
Peak memory consumption was 1.6 GB. That is significantly worse than the 100 MB of the single-core version, but 20x better than the 32 GB of the multi-process version.
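For reference, here are the ratios implied by the figures above, computed in OCaml. The only assumption is the memory unit conversion (1 GB taken as 1000 MB), which slightly affects only the single-core memory ratio.

```ocaml
(* Ratios implied by the median run times and peak memory quoted above *)
let single_core = 23. *. 60.          (* 23 min      = 1380 s *)
let lwt_run = 14. *. 60. +. 40.       (* 14 min 40 s =  880 s *)
let multi_process = 60. +. 7.         (* 1 min 7 s   =   67 s *)

let speedup_lwt = single_core /. lwt_run        (* about 1.57x *)
let time_saved = 1. -. lwt_run /. single_core   (* about 0.36  *)
let lwt_vs_multi = lwt_run /. multi_process     (* about 13.1x *)

(* Peak memory in MB, taking 1 GB = 1000 MB *)
let mem_single, mem_lwt, mem_multi = (100., 1600., 32000.)

let () =
  Printf.printf "Lwt vs single core: %.2fx (%.0f%% less time)\n"
    speedup_lwt (100. *. time_saved);
  Printf.printf "multi-process vs Lwt: %.1fx faster\n" lwt_vs_multi;
  Printf.printf "memory: Lwt %.0fx single core; multi-process %.0fx Lwt\n"
    (mem_lwt /. mem_single) (mem_multi /. mem_lwt)
```

The Lwt version needs about 36% less time than the single-core one, while the multi-process version is still roughly 13 times faster than the Lwt one.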
I think OCaml is a great language that provides just the right mix of mathematical rigor and practicality for day-to-day work. Yet the absence of out-of-the-box multicore support makes it hard to justify its use when machines with 24 or even 64 cores are routine. The argument "most programs are IO-bound and would gain more from async IO" holds for 2- or 4-core systems but, as this experiment demonstrated, no longer holds in the realm of modern multi-core systems. And the trend is likely to continue.