OCaml: run it on 24 cores
As I mentioned in the "Haskell Vs. Go Vs. OCaml Vs." post, I deliberately restricted the program to run on a single core to have a fair comparison with the original Perl/Python version. However, having a 24-core machine and using only one of them did not seem right to me. So when I got some free time I decided to start with the OCaml version. And until multicore OCaml takes off for real 1, using many OS processes seems to be the only option.
The problem
To quote the problem definition from "Haskell Vs. Go Vs. OCaml Vs." -
- parse command line arguments to gather parameters about DB connection, in-out folders/files and something called device groups
- query the required set of devices and relevant metadata from a MySQL database
- based on the device set deduce a set of RRD files
- for each file in the file set query data points for the 12:00AM - 11:55PM interval of the previous day
- dump every data point using human-readable metric name, device name, etc. as a row to a specified .csv file
Steps 1 and 2 take milliseconds even on a production data set, so it makes no sense to parallelise them; and with step 5 being inherently sequential, that left only steps 3 and 4 to experiment with.
Parmap
The initial search led me to the parmap library. It did seem like a good choice, especially the Parmap.parfold and Parmap.parmapfold functions, until I closely inspected the function signature:
```ocaml
val parfold: ('a -> 'b -> 'b) -> 'a sequence -> 'b -> ('b->'b->'b) -> 'b
```
where ('a -> 'b -> 'b)
is a reducing function that runs inside child
processes and ('b -> 'b -> 'b)
is a concatenating function that glues
together partial results in the parent process.
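To make the contract concrete, here is a sequential model of what parfold computes, using only the standard library. The real Parmap.parfold runs each chunk's fold in a forked child process; the chunking below is a simplified stand-in for that:

```ocaml
(* A sequential model of Parmap.parfold's contract: split the input
   into chunks, fold each chunk with [reduce], then glue the partial
   results together with [concat]. Parmap does the same, but folds
   each chunk inside a forked child process. *)
let parfold_model ~chunks (reduce : 'a -> 'b -> 'b) (xs : 'a list)
    (init : 'b) (concat : 'b -> 'b -> 'b) : 'b =
  let size = (List.length xs + chunks - 1) / chunks in
  (* take at most [k] elements off the front of a list *)
  let rec take k = function
    | x :: rest when k > 0 ->
      let hd, tl = take (k - 1) rest in
      (x :: hd, tl)
    | rest -> ([], rest)
  in
  let rec split = function
    | [] -> []
    | xs ->
      let chunk, rest = take size xs in
      chunk :: split rest
  in
  split xs
  |> List.map (fun chunk -> List.fold_right reduce chunk init)
  |> List.fold_left concat init

(* Example: summing 1..10 in 3 "workers" *)
let () =
  let sum = parfold_model ~chunks:3 ( + ) [1;2;3;4;5;6;7;8;9;10] 0 ( + ) in
  Printf.printf "%d\n" sum  (* 55 *)
```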
In my case, 'a could be an rrd file from the list of files deduced in step 3, and 'b could be a list of rows in the resulting csv file. So the reducing function would be something like:
```ocaml
let reducer (f: rrd_file) (acc: csv_row list) : csv_row list =
  (* get all the data points for file f *)
  let rows = get_csv_rows_for_file f in
  (* concatenate the rows with all the previous rows *)
  rows @ acc
```
The list concatenation operator xs @ ys has complexity O(length(xs)). Given that the requested interval has at most 287 data points per file (the number of 5-minute intervals per day) and we've got approximately 800000/num_of_cores files per child process, the fold operation should be fairly efficient.
The problem, however, is the concatenating function 'b -> 'b -> 'b. I think the only natural associative combining function for lists is the very same concatenation operator. And in this case we'd have to deal with millions of rows in xs (the resulting file has ~270 million rows) and also keep the whole thing in memory. Had the function had a different signature, e.g. 'b -> 'c -> 'c, I could use something like:
```ocaml
let concat rows out_channel =
  dump_rows_to_channel rows out_channel;
  out_channel
```
thus discarding the no-longer-needed partial result. Alas, that was not the case, so without writing a single line of code I had to look for another solution.
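To illustrate the idea, here is a sketch of such a streaming combiner with a Buffer.t standing in for the real output channel; dump_rows_to_channel is a hypothetical helper, not part of Parmap:

```ocaml
(* Hypothetical streaming combiner: instead of gluing partial row
   lists together in memory ('b -> 'b -> 'b), each partial result is
   flushed into an output sink and discarded ('b -> 'c -> 'c).
   A Buffer.t stands in for the real out_channel here. *)
let dump_rows_to_channel rows buf =
  List.iter
    (fun row -> Buffer.add_string buf row; Buffer.add_char buf '\n')
    rows

let concat rows buf =
  dump_rows_to_channel rows buf;
  buf  (* the rows themselves can now be garbage-collected *)

(* Folding partial results through the sink keeps only one chunk of
   rows alive at a time. *)
let () =
  let buf = Buffer.create 64 in
  let partials = [ ["a,1"; "a,2"]; ["b,1"] ] in
  let _ = List.fold_left (fun acc rows -> concat rows acc) buf partials in
  print_string (Buffer.contents buf)
```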
Pipes. Unix pipes.
As a first take, I decided to split the device set into N chunks and then iterate over the results of processing each chunk in the parent process:
```ocaml
let process_devices n_chunks base_path devices ifaces interval =
  let total_devices = List.length devices in
  let chunk_size = (total_devices + n_chunks - 1) / n_chunks in
  let f (sin, pid) =
    IO.copy sin stdout;
    close_in sin;
    ignore @@ Unix.waitpid [] pid
  in
  devices
  |> List.ntake chunk_size
  |> List.map (spawn_worker base_path ifaces interval)
  |> List.iter f
```
where spawn_worker
starts a child process and returns the read end of the
pipe along with the child's pid
:
```ocaml
let spawn_worker base_path ifaces interval devices =
  let (fd_in, fd_out) = Unix.pipe () in
  match Unix.fork () with
  | 0 ->
    (* child process *)
    let sout = Unix.output_of_descr ~cleanup:true fd_out in
    let rrdtool = Unix.open_process "rrdtool -" in
    Unix.close fd_in;
    list_rrd_files base_path devices ifaces rrdtool interval sout;
    ignore (Unix.close_process rrdtool);
    flush sout;
    close_out sout;
    exit 0
  | pid ->
    (* parent process *)
    Unix.close fd_out;
    (Unix.input_of_descr fd_in, pid)
```
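Stripped of the application logic, the plumbing spawn_worker relies on boils down to this minimal sketch (using the stdlib's Unix.out_channel_of_descr / Unix.in_channel_of_descr rather than the Batteries variants above):

```ocaml
(* Minimal fork + pipe round trip: the child writes to the pipe's
   write end, the parent reads from the read end and then reaps the
   child. spawn_worker does the same, plus the rrdtool machinery. *)
let run_child_and_read () =
  let fd_in, fd_out = Unix.pipe () in
  match Unix.fork () with
  | 0 ->
    (* child: close the unused read end, write, exit *)
    Unix.close fd_in;
    let sout = Unix.out_channel_of_descr fd_out in
    output_string sout "hello from child\n";
    close_out sout;
    exit 0
  | pid ->
    (* parent: close the unused write end, drain the pipe, reap *)
    Unix.close fd_out;
    let sin = Unix.in_channel_of_descr fd_in in
    let line = input_line sin in
    close_in sin;
    ignore (Unix.waitpid [] pid);
    line

let () = print_endline (run_child_and_read ())
```

Closing the unused ends in each process matters: if the parent kept fd_out open, it would never see end-of-file on the read end.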
The list_rrd_files function required only a minimal change: write into sout instead of stdout and use a resizable buffer to accumulate the intermediate results:
```ocaml
let list_rrd_files base_path devices ifaces rrdtool interval sout =
  let buf = Buffer.create initial_buf_size in
  let g device =
    (* deduce the file set for device and query the data points,
       saving the results in `buf` *)
  in
  devices |> List.enum |> Enum.iter g;
  Buffer.print sout buf
```
After testing the changes I ran it on the production data set and got 2min 18s with n_chunks set to 48. That was a great improvement compared to 23 minutes for the single-core version! I also ran it with n_chunks set to 1 to measure the impact of having the extra process: 23min 55s. Hmm… I would imagine the extra 55s went to memory allocation, data copying between processes, and all those system calls. After all, the resulting csv file occupies 17GB on disk.
Pipes + Parmap
To somewhat visualise the process, I added some timestamps:
```ocaml
(* in the parent process, before forking *)
let start_time = Unix.time ()

(* when a child process is forked *)
Printf.eprintf "spawned worker %d\n" pid;
(* ... *)

(* in the child process, right before `exit` *)
Printf.eprintf "worker %d finished in %f\n"
  (Unix.getpid ()) (Unix.time () -. start_time);
```
and noticed that the run times of individual processes ranged anywhere from 1s to 120s. That meant that the distribution of data per device was anything but uniform.
I speculated that if I utilised the cores more evenly, the run time would improve. One way to do that was to divide the files evenly across the worker processes, but that required the file list (more specifically, its length) to be known a priori.
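The chunking itself is plain ceiling division; a stdlib sketch of it, together with an equivalent of Batteries' List.ntake used above, might look like this:

```ocaml
(* Ceiling division: the chunk-size arithmetic used throughout the
   post. With 800_000 files and 48 chunks, every chunk holds at most
   16_667 files. *)
let chunk_size total n_chunks = (total + n_chunks - 1) / n_chunks

(* A stdlib stand-in for Batteries' List.ntake: split [xs] into
   consecutive chunks of at most [k] elements (k >= 1 assumed). *)
let ntake k xs =
  let rec take k acc = function
    | rest when k = 0 -> (List.rev acc, rest)
    | [] -> (List.rev acc, [])
    | x :: rest -> take (k - 1) (x :: acc) rest
  in
  let rec go = function
    | [] -> []
    | xs ->
      let chunk, rest = take k [] xs in
      chunk :: go rest
  in
  go xs

let () =
  Printf.printf "%d\n" (chunk_size 800_000 48);                  (* 16667 *)
  Printf.printf "%d\n" (List.length (ntake 3 [1;2;3;4;5;6;7]))   (* 3 *)
```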
I didn't want to lose the ability to run step 3 in parallel. Reasoning that the number of files was significantly smaller than the number of resulting rows, I decided that using Parmap would not incur too much overhead.
The list_rrd_files
function was changed to actually list files without
any further processing:
```ocaml
let list_rrd_files base_path devices ifaces =
  let glob device rrds =
    let rrds' =
      Printf.sprintf "%s/%s/*/%s/*/*.rrd" base_path device.dc device.id
      |> Glob.glob
      |> Array.enum
      |> Enum.filter_map (rrd_from_path ifaces device)
      |> List.of_enum
    in
    rrds' @ rrds
  in
  Parmap.parfold glob (Parmap.L devices) [] (@)
```
and process_devices now split the list of rrd files instead of the list of devices:
```ocaml
let process_devices n_chunks base_path devices ifaces interval =
  let rrds = list_rrd_files base_path devices ifaces in
  let total_rrds = List.length rrds in
  let chunk_size = (total_rrds + n_chunks - 1) / n_chunks in
  let f (sin, pid) =
    IO.copy sin stdout;
    close_in sin;
    ignore @@ Unix.waitpid [] pid
  in
  rrds
  |> List.ntake chunk_size
  |> List.map (spawn_worker interval)
  |> List.iter f
```
The effect of the change was fantastic: 1min 7s, roughly a 20x speedup over the single-core version, not far from the 24x core count.
Future improvements
Could it do even better? I think it could, if not in run time then in memory consumption. Spawning all those processes and accumulating the intermediate results takes approximately 32GB of RAM. While that was not an issue on the beefy machine I ran the tests on, the original VM had only 8GB of RAM, and the program would OOM there.
The idea was to process the data points for a single rrd file as soon as they become available. One requirement for the tool is to have per-file data points in order and without gaps, but we are free to mix results from different devices.
I needed something like this:
```ocaml
let rec process_first_available workers sout =
  match workers with
  | [] -> `Done
  | _ ->
    let worker = select workers in
    (match read_data worker with
     | Some datapoints ->
       (* per-file datapoints *)
       write sout datapoints;
       process_first_available workers sout
     | None ->
       (* end of file, remove the worker from the list *)
       process_first_available (List.remove workers worker) sout)
```
The select function did look like it could be implemented via Unix.select, but the problem was that reading from a file descriptor has byte-level granularity, while I needed "data points per file" granularity. That would necessitate keeping an intermediate buffer per file descriptor, framing the data using some special byte sequence (e.g. a double line feed), and so on. In other words, it seemed more complicated than what I could accomplish during those weekends.
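For the record, here is a sketch of what that select loop might look like, with a per-descriptor buffer and a single line feed as the record delimiter. All names are illustrative; this demonstrates the idea, not a finished implementation:

```ocaml
(* Multiplex several pipe read ends with Unix.select, keeping one
   Buffer per descriptor and treating '\n' as the record delimiter.
   Complete records are collected; a descriptor is dropped at EOF. *)
let read_all_records fds =
  let buffers = List.map (fun fd -> (fd, Buffer.create 256)) fds in
  let records = ref [] in
  let rec loop open_fds =
    if open_fds <> [] then begin
      let ready, _, _ = Unix.select open_fds [] [] (-1.0) in
      let still_open =
        List.filter
          (fun fd ->
            if not (List.mem fd ready) then true
            else
              let chunk = Bytes.create 4096 in
              let n = Unix.read fd chunk 0 4096 in
              if n = 0 then (Unix.close fd; false)  (* EOF *)
              else begin
                let buf = List.assoc fd buffers in
                Buffer.add_subbytes buf chunk 0 n;
                (* emit every complete record, keep the trailing
                   incomplete one in the buffer *)
                let parts =
                  String.split_on_char '\n' (Buffer.contents buf) in
                let rec emit = function
                  | [] -> ()
                  | [last] -> Buffer.clear buf; Buffer.add_string buf last
                  | r :: rest -> records := r :: !records; emit rest
                in
                emit parts;
                true
              end)
          open_fds
      in
      loop still_open
    end
  in
  loop fds;
  List.rev !records

(* usage: two "workers" whose newline-framed output is multiplexed *)
let () =
  let mk data =
    let r, w = Unix.pipe () in
    let b = Bytes.of_string data in
    ignore (Unix.write w b 0 (Bytes.length b));
    Unix.close w;
    r
  in
  read_all_records [ mk "a1\na2\n"; mk "b1\n" ]
  |> List.sort compare
  |> List.iter print_endline
```

Per-descriptor order is preserved, which is what the "per-file data points in order and without gaps" requirement needs; the interleaving across descriptors is whatever select delivers.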
To sum it up, I was impressed by how easy it was to parallelise the program using processes, and by the performance gains achieved. However, I felt like I was re-inventing the wheel, because surely I was not the first to encounter such a problem.
Footnotes:
I am not quite sure about the state of the project. On the one hand, it seems like it has been around for years, and there are a number of papers and presentations floating around. On the other hand, there have been multiple OCaml releases since the project started, and they don't seem to include multicore support.