OCaml: run it on 24 cores

As I mentioned in the “Haskell Vs. Go Vs. OCaml Vs.” post, I deliberately restricted the program to a single core to have a fair comparison with the original Perl/Python version. However, having a 24-core machine and using only one core did not seem right to me. So when I got some free time last weekend, I decided to start with the OCaml version. And until multicore OCaml kicks off for real [1], using many OS processes seems to be the only option.

The problem

To quote the problem definition from “Haskell Vs. Go Vs. OCaml Vs.” -

  1. parse command line arguments to gather parameters about DB connection, in-out folders/files and something called device groups

  2. query the required set of devices and relevant metadata from the MySQL database

  3. based on the device set deduce the list of RRD files

  4. for each file in the file set query data points for the 12:00AM - 11:55PM interval of the previous day

  5. dump every data point using human-readable metric name, device name, etc. as a row to the specified .csv file

Steps 1 and 2 (which take milliseconds even on the production data set) make little sense to parallelise, and with step 5 being inherently sequential, I had only steps 3 and 4 to experiment with.

Parmap

The initial search led me to the parmap library. It did seem like a good choice, especially the Parmap.parfold and Parmap.parmapfold functions, up to the moment I closely inspected the function signature (omitting the optional parameters):

val parfold: ('a -> 'b -> 'b) -> 'a sequence -> 'b -> ('b->'b->'b) -> 'b

where ('a -> 'b -> 'b) is a reducing function that runs inside child processes and ('b -> 'b -> 'b) is a concatenating function that glues together partial results in the parent process.
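To make the two roles concrete, here is a minimal sequential model of that shape. It is only a sketch for reasoning about the types and the data flow, not how Parmap is implemented: `model_parfold`, the pre-split `chunks` argument, and the toy `rows_of` reducer are hypothetical names, and the real library forks processes and decides the chunking itself.

```ocaml
(* Sequential model of the parfold shape (hypothetical, not Parmap's code).
   Each inner list in [chunks] stands for the work handed to one child. *)
let model_parfold (f : 'a -> 'b -> 'b) (chunks : 'a list list) (init : 'b)
    (concat : 'b -> 'b -> 'b) : 'b =
  chunks
  (* "child" side: fold one chunk down to a partial result *)
  |> List.map (fun chunk -> List.fold_right f chunk init)
  (* "parent" side: glue the partial results together *)
  |> List.fold_left concat init

(* Toy reducer: every "file" yields two rows. *)
let rows_of file = [ file ^ ":r1"; file ^ ":r2" ]
let reducer file acc = rows_of file @ acc

let all_rows = model_parfold reducer [ [ "a"; "b" ]; [ "c" ] ] [] ( @ )
```

In this model the parent holds every partial result before the final value exists, which is the property that matters for the memory concerns discussed next.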

In my case 'a could be an rrd file from the list of files deduced in step 3 and 'b could be a list of rows in the resulting csv file. So the reducing function would be something like

let reducer (f: rrd_file) (acc: csv_row list) : csv_row list =
    (* get all the data points for file f *)
    let rows = get_csv_rows_for_file f in
       (* concatenate the rows with all the previous rows *)
        rows @ acc

The list concatenation operator xs @ ys has complexity O(length(xs)). Given that the requested interval has at most 287 data points per file (5-minute intervals over one day) and we’ve got approximately 800000/num_of_cores files per child process, the fold operation should be rather efficient.

The problem, however, is the concatenating function 'b -> 'b -> 'b. I think the only sensible choice for lists is the very same concatenation operator, which is associative (and commutative only if the order of the rows does not matter). And in this case we’d have to deal with millions of rows in xs (the resulting file has ~270 million rows) and also keep the whole thing in memory. Had the function had a different signature, e.g. 'b -> 'c -> 'c, I could use something like

let concat rows out_channel =
    dump_rows_to_channel rows out_channel;
    out_channel

thus discarding the no longer needed partial result. Alas, that was not the case, so without writing a single line of code I had to look for another solution.

Pipes. Unix pipes.

As a first take I decided to split the device set into N chunks and then iterate over the result of processing of each chunk in the parent process.

let process_devices n_chunks base_path devices ifaces interval =
 let total_devices = List.length devices in
 let chunk_size = (total_devices+n_chunks-1)/n_chunks in
 let f (sin, pid) =
   IO.copy sin stdout;
   close_in sin;
   ignore @@ Unix.waitpid [] pid;
 in
 devices
 |> List.ntake chunk_size
 |> List.map (spawn_worker base_path ifaces interval)
 |> List.iter f

where spawn_worker starts a child process and returns the read end of the pipe along with the child’s pid:

let spawn_worker base_path ifaces interval devices =
 let (fd_in, fd_out) = Unix.pipe () in
 match Unix.fork () with
 | 0 ->
   (* child process *)
   let sout = Unix.output_of_descr ~cleanup:true fd_out in
   let rrdtool = Unix.open_process "rrdtool -" in
   Unix.close fd_in;
   list_rrd_files base_path devices ifaces rrdtool interval sout;
   ignore (Unix.close_process rrdtool);
   flush sout; close_out sout;
   exit 0

 | pid ->
   (* parent process *)
   Unix.close fd_out;
   (Unix.input_of_descr fd_in, pid)
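For readers without Batteries, the same fork-and-pipe pattern can be sketched with only the standard `Unix` library. This is an illustration under assumptions, not the code used in the post: the `work` callback stands in for the real per-chunk processing, and `read_all` and `drain` are hypothetical helpers. It needs to be linked against the `unix` library.

```ocaml
(* Fork a child that writes its results into a pipe; the parent keeps
   the read end and the child's pid. [work] is a hypothetical callback. *)
let spawn_worker (work : out_channel -> unit) : in_channel * int =
  (* Flush first so buffered output is not duplicated in the child. *)
  flush_all ();
  let fd_in, fd_out = Unix.pipe () in
  match Unix.fork () with
  | 0 ->
    (* child process: only writes *)
    Unix.close fd_in;
    let oc = Unix.out_channel_of_descr fd_out in
    work oc;
    close_out oc;
    exit 0
  | pid ->
    (* parent process: only reads *)
    Unix.close fd_out;
    (Unix.in_channel_of_descr fd_in, pid)

(* Read a channel until end of file. *)
let read_all ic =
  let buf = Buffer.create 4096 in
  let chunk = Bytes.create 4096 in
  let rec loop () =
    let n = input ic chunk 0 (Bytes.length chunk) in
    if n > 0 then begin
      Buffer.add_subbytes buf chunk 0 n;
      loop ()
    end
  in
  loop ();
  Buffer.contents buf

(* Collect one worker's output, then reap the child. *)
let drain (ic, pid) =
  let data = read_all ic in
  close_in ic;
  ignore (Unix.waitpid [] pid);
  data
```

Closing the unused pipe end on each side matters: the parent only sees end of file once every copy of the write end has been closed.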

The list_rrd_files function required only a minimal change to write into sout instead of stdout and to use a resizable buffer to accumulate the intermediate results:

let list_rrd_files base_path devices ifaces rrdtool interval sout =
 let buf = Buffer.create initial_buf_size in
 let g device =
   (* deduce the file set for device and query the data points
      saving the results in `buf` *)
 in
 devices
 |> List.enum
 |> Enum.iter g;
 Buffer.print sout buf
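The chunk size in `process_devices` is a ceiling division, and the split itself is done by Batteries' `List.ntake`. A stdlib-only sketch of the same idea is shown here; `chunk_size` and `chunks_of` are hypothetical helper names, and the sketch assumes `n_chunks` is positive.

```ocaml
(* Ceiling division: the smallest chunk size that covers [total] items
   with at most [n_chunks] chunks. Assumes n_chunks > 0. *)
let chunk_size ~total ~n_chunks = (total + n_chunks - 1) / n_chunks

(* Split [xs] into consecutive chunks of [size] elements;
   the last chunk may be shorter. *)
let chunks_of size xs =
  if size <= 0 then invalid_arg "chunks_of: size must be positive";
  let rec go acc current count = function
    | [] -> List.rev (if current = [] then acc else List.rev current :: acc)
    | x :: rest ->
      if count = size then go (List.rev current :: acc) [ x ] 1 rest
      else go acc (x :: current) (count + 1) rest
  in
  go [] [] 0 xs
```

Note that rounding up can produce fewer chunks than requested: 10 items with `n_chunks` set to 6 give a chunk size of 2 and therefore only 5 workers.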

After testing the changes I ran it on the production data set and got 2min 18s with n_chunks set to 48. It was a great improvement compared to 23 minutes for the single-core version! I also ran it with n_chunks set to 1 to measure the impact of having the extra process: 23min 55s. Hmm… I would imagine that the extra 55s went to memory allocation, copying data between processes, and all those system calls. After all, the resulting csv file occupies 17GB on disk.

Pipes + Parmap

To get a rough picture of the process, I added some timestamps:

(* in the parent process before forking *)
let start_time = Unix.time ()
(* when child process forked *)
Printf.eprintf "spawned worker %d\n" pid;
(* ... *)
(* in the child process, right before `exit` *)
Printf.eprintf "worker %d finished in %f\n" (Unix.getpid()) (Unix.time() -. start_time);

and noticed that the run time of individual processes ranged anywhere from 1s to 120s. That meant that the distribution of files per device is anything but uniform.

I speculated that if I were to better utilise the cores, the run time would necessarily improve. One way to do so was to divide the files evenly across worker processes, but that required the list (more specifically its length) to be known a priori.
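A toy calculation shows why splitting by device can be so lopsided. The per-device file counts below are made-up numbers, and `loads_by_device` and `max_load_by_file` are hypothetical helpers written only for this illustration.

```ocaml
(* [counts] holds the number of rrd files per device, in device order.
   Returns the number of files each worker gets when devices are split
   into consecutive chunks. Assumes n_chunks > 0. *)
let loads_by_device n_chunks counts =
  let chunk_size = (List.length counts + n_chunks - 1) / n_chunks in
  let loads = Array.make n_chunks 0 in
  List.iteri
    (fun i c ->
      let w = i / chunk_size in
      loads.(w) <- loads.(w) + c)
    counts;
  Array.to_list loads

(* Splitting the flat file list instead: no worker gets more than this. *)
let max_load_by_file n_chunks counts =
  let total = List.fold_left ( + ) 0 counts in
  (total + n_chunks - 1) / n_chunks

let counts = [ 1; 1; 100; 2 ] (* made-up file counts for four devices *)
let by_device = loads_by_device 2 counts (* [2; 102] *)
let by_file = max_load_by_file 2 counts (* 52 *)
```

With two workers, one of them ends up with 102 of the 104 files when splitting by device, while splitting the file list caps each worker at 52.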

I didn’t want to lose the ability to run step 3 in parallel. Reasoning that the number of files was significantly smaller than the number of resulting rows, I decided that using Parmap would not incur too much overhead.

The list_rrd_files function was changed to actually list files without any further processing:

let list_rrd_files base_path devices ifaces =
 let glob device rrds =
   let rrds' =
     Printf.sprintf "%s/%s/*/%s/*/*.rrd" base_path device.dc device.id
     |> Glob.glob
     |> Array.enum
     |> Enum.filter_map (rrd_from_path ifaces device)
     |> List.of_enum
   in rrds' @ rrds
 in
 Parmap.parfold glob (Parmap.L devices) [] (@)

and process_devices now split the rrd file list instead of the list of devices:

let process_devices n_chunks base_path devices ifaces interval =
 let rrds = list_rrd_files base_path devices ifaces in
 let total_rrds = List.length rrds in
 let chunk_size = (total_rrds+n_chunks-1)/n_chunks in
 let f (sin, pid) =
   IO.copy sin stdout;
   close_in sin; ignore @@ Unix.waitpid [] pid;
 in
 rrds
 |> List.ntake chunk_size
 |> List.map (spawn_worker interval)
 |> List.iter f

The effect of the change was fantastic: 1min 7s. That is roughly 20 times faster than the 23-minute single-core version.

Future improvements

Could it do even better? I think it could, if not for the run time then in terms of memory consumption. Spawning all those processes and accumulating intermediate results takes approximately 32GB of RAM. While it was not an issue for the beefy machine I ran the tests on, the original VM had only 8GB of RAM and the program would OOM there.

The idea was to process the data points for a single rrd file as soon as they become available. One requirement for the tool is to have the per-file data points in order and without gaps, but we are free to mix results from different devices.

I needed something like this:

let rec process_first_available workers sout =
   match workers with
   | [] ->
     `Done
   | _ ->
     (* `select`, `read_data` and `write` are placeholders *)
     let worker = select workers in
       match read_data worker with
       | Some datapoints -> (* per-file datapoints *)
         write sout datapoints;
         process_first_available workers sout
       | None -> (* end of file, remove the worker from the list *)
         process_first_available (List.remove workers worker) sout

The select function did look like it could be implemented via Unix.select, but the problem was that reading from a file descriptor had byte-level granularity while I needed “data points per file” granularity. That would necessitate keeping intermediate buffers per file descriptor, framing the data using some special byte sequence (e.g. a double line feed), and so on. In other words, it seemed more complicated than I could accomplish over a weekend.
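The framing part of that idea can be sketched on its own, without any file descriptors. The following is only one possible approach under stated assumptions: `split_frames` is a hypothetical helper, each frame (the data points of one file) is terminated by a double line feed, and a blank line never occurs inside a frame. The caller would keep one `pending` string per file descriptor.

```ocaml
(* Split [pending ^ chunk] into complete frames terminated by "\n\n".
   Returns the complete frames (without the terminator) and the
   unterminated tail to carry over to the next read. *)
let split_frames (pending : string) (chunk : string) : string list * string =
  let data = pending ^ chunk in
  let len = String.length data in
  let rec go start i acc =
    if i + 1 >= len then
      (* no terminator can start here: the rest is an incomplete frame *)
      (List.rev acc, String.sub data start (len - start))
    else if data.[i] = '\n' && data.[i + 1] = '\n' then
      go (i + 2) (i + 2) (String.sub data start (i - start) :: acc)
    else go start (i + 1) acc
  in
  go 0 0 []
```

After each read from a descriptor that Unix.select reports as readable, the bytes just read would go in as `chunk`, the returned frames could be written out immediately, and the returned tail would become that descriptor's new `pending` value.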

To sum it up, I was impressed by how easy it was to parallelise the program using processes and by the performance gains achieved. However, I felt like I was re-inventing the wheel, because surely I was not the first one to encounter such a problem.


  1. I am not quite sure about the state of the multicore OCaml project. On the one hand, it seems to have been around for years, and there are a number of papers and presentations floating around. On the other hand, there have been multiple OCaml releases since the project started, and they don’t seem to include multicore support.
