Async OCaml with Lwt

One of the suggestions I got on Reddit when I asked about better approaches to utilising multiple cores was to use Lwt or Async. I found some free time one weekend and started by reading the Lwt manual. I think it is one of the nicest pieces of documentation I have seen so far in the OCaml world.

The problem at hand is described in the Run OCaml on 24 cores and Haskell Vs. Go Vs. OCaml Vs.. articles. In a few words, the problem is to read metrics stored in over eight hundred thousand RRD files and dump enriched data points into a single CSV file. Reading metrics is done by running rrdtool in daemon mode, passing it commands and parsing its output.

Before I continue, I should mention that I spent no more than half a day on the Lwt-based solution, so it is more than likely that it can be further optimised or that I simply misunderstood/misused some parts of the library. With that disclaimer in place, let’s dive in.

Iterating over the list of files

The list of files to be processed is derived from the result of a database query. For each row in the query result we build a pattern and feed it to the glob function imported from libc. The function acts somewhat like the find command: it returns the list of files that match the pattern.

However, the call is blocking, and it was a bit of a head-scratcher for me how to make it run asynchronously. That lasted right up to the moment I read the documentation of the Lwt_preemptive module, which fitted that kind of task perfectly. It runs a computation in a separate thread and fulfills the promise when the thread finishes! All the “dirty” work is done by the library. To quote the manual:

detach f x runs the computation f x in a separate preemptive thread. detach evaluates to an Lwt promise, which is pending until the preemptive thread completes.

And it has a nice signature that made it trivial to embed glob calls into the rest of the program:

val detach : ('a -> 'b) -> 'a -> 'b Lwt.t
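For instance, a blocking call can be lifted into the Lwt world in a single line. A minimal sketch (Glob.glob stands for the blocking libc binding used below; all_matches is a hypothetical helper):

```ocaml
(* Sketch: lift the blocking glob call into Lwt by running it in a
   separate preemptive thread. *)
let glob_async (pattern : string) : string array Lwt.t =
  Lwt_preemptive.detach Glob.glob pattern

(* Several globs can then run concurrently while the main thread
   keeps other promises progressing. *)
let all_matches (patterns : string list) : string array list Lwt.t =
  Lwt_list.map_p glob_async patterns
```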

The full listing of the “main” function is as follows:

let iterate_rrd_files base_path devices ifaces pool interval =
    (* function g converts a tuple describing a particular
     device into rrd_t array and processes each of the
     elements asynchronously *)
    let g (id, host, dc) =
        let process_rrd_array rrd_array =
            rrd_array
            (* query_rrd does the main job *)
            |> Array.map (query_rrd pool interval)
            |> Array.to_list
            |> Lwt.join
        in
        (* a helper function that wraps the glob call *)
        let pattern_to_rrds (pattern : string) : rrd_t array =
            pattern
            |> Glob.glob
            (* process/filter the result of the glob call *)
            |> Array.filter_map (rrd_from_path ifaces (id, host, dc))
        in
        make_pattern (base_path, dc, id)
        |> Lwt_preemptive.detach pattern_to_rrds
        >>= process_rrd_array
    in
    (* concurrently iterate over the list of devices *)
    Lwt_list.iter_p g devices |> Lwt_main.run
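The choice of Lwt_list.iter_p matters here: the _p variant starts a promise for every element at once, whereas the _s variant waits for each element to finish before starting the next. A minimal sketch of the difference (process_device is a placeholder of type device -> unit Lwt.t):

```ocaml
(* Sketch: _p runs element promises concurrently, _s sequentially. *)
let concurrent devices = Lwt_list.iter_p process_device devices
let sequential devices = Lwt_list.iter_s process_device devices
```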

Pool of rrdtool processes

Another interesting feature of Lwt is that it lets you parallelise IO by creating a pool of workers. In my case it was a pool of rrdtool processes started in daemon mode.

let create_rrdtool_pool size =
    Lwt_pool.create size
      ~dispose: (fun rrdtool -> rrdtool#close >|= ignore)
      (fun () ->
          Lwt_process.open_process ("", [|"rrdtool"; "-"|])
          |> Lwt.return)
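Lwt_pool.create also accepts a ~validate callback that is run before a pooled resource is handed out again. A sketch of how the pool above could check that its rrdtool process is still alive (rrdtool#state is part of the Lwt_process process class):

```ocaml
(* Sketch: the same pool with a ~validate check that rejects members
   whose underlying process has exited. *)
let create_rrdtool_pool size =
  Lwt_pool.create size
    ~validate:(fun rrdtool ->
      Lwt.return (rrdtool#state = Lwt_process.Running))
    ~dispose:(fun rrdtool -> rrdtool#close >|= ignore)
    (fun () ->
      Lwt_process.open_process ("", [|"rrdtool"; "-"|])
      |> Lwt.return)
```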

I tried to tweak the size of the pool, varying it from 1 to n_cores; however, the difference between pool sizes of e.g. 8 and 16 was negligible. I believe that was because rrdtool was not the bottleneck and an 8x parallelisation was able to keep up with the rest of the program.

And again, as with Lwt_preemptive, I couldn’t help but appreciate how handy the library was, having exactly the tool I needed.

Processing rrdtool output

The last piece of the solution was to send commands to rrdtool and read its output. The tricky part here was that the output (after some processing) had to go into the CSV file as a contiguous block:

let query_rrd pool interval rrd =
    (* returns a (datapoint_line option) Lwt.t; None marks the end of the response *)
    let get_line rrdtool () =
        Lwt_io.read_line rrdtool#stdout >|= check_response
    in
    let q rrdtool =
        (* prepare command *)
        let cmd = fmt_cmd rrd.path interval in
        (* write it to the tool's stdin *)
        Lwt_io.write_line rrdtool#stdin cmd
        >>= fun () -> Lwt_io.flush rrdtool#stdin
        (* and read/parse lines till we get to the end of response
           sending the result directly to stdout *)
        >>= fun () ->
              Lwt_stream.from (get_line rrdtool)
              |> Lwt_stream.iter_s (Lwt_io.write_line Lwt_io.stdout)
    in
    Lwt_pool.use pool q

It wasn’t quite clear to me how to achieve that contiguity in the async world without converting a list of promises into a promise containing the result list. I used Lwt_stream.from/iter_s because it seemed to have the right signature. However, the use of Lwt_stream seems to be discouraged, and I was somewhat lost in choosing the right tool for this particular task. Anyway, Lwt_stream did the job just fine.
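One Lwt_stream-free alternative is a plain recursive loop that chains reads until the end-of-response marker. A sketch, reusing the get_line helper from the listing above:

```ocaml
(* Sketch: read-and-print loop without Lwt_stream. get_line returns
   a (line option) Lwt.t, where None marks the end of the response. *)
let rec drain_response rrdtool =
  get_line rrdtool () >>= function
  | None -> Lwt.return_unit
  | Some line ->
      Lwt_io.write_line Lwt_io.stdout line >>= fun () ->
      drain_response rrdtool
```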

Test results and conclusion

Overall, my experience of using Lwt was nothing short of delightful. It had all the tools and primitives exactly as and how I needed them. The documentation was comprehensive and comprehensible at the same time.

However, the speed-up was not quite what I had hoped for and what the replies to my Reddit post hinted at. The median run time is 14 min 40 sec, which is about 36% faster than the synchronous single-core version (23 min) but pales in comparison with the 1 min 7 sec of the multi-process version described in the Run OCaml on 24 cores post.

Memory consumption peaked at 1.6 GB, which was significantly worse than the 100 MB of the single-core version but 20x better than the 32 GB of the multi-process version.

I think OCaml is a great language that provides just the right mix of mathematical rigor and practicality needed for day-to-day work, yet the absence of out-of-the-box multicore support makes it hard to justify its use when 24 or even 64 cores are routinely available. The argument that “most programs are IO-bound and would gain more from using async IO” holds for 2- or 4-core systems, but, as the experiment clearly demonstrated, it is no longer true in the realm of modern multicore systems. And the trend is likely to continue.