frawk provides a rudimentary model of parallelism that can speed up many simple scripts with little or no modification. One relatively unusual aspect of frawk's implementation is that it can achieve nontrivial speedups even when the input is only a single file or stream. The first part of this doc explains the architecture that frawk uses to read formats like CSV in a parallel-friendly manner. While simple aggregations do not need to be modified to run successfully in parallel, running a script in this mode can change its meaning. The second part provides an overview of the semantics of a frawk script when it is run in parallel.
Note: frawk only supports parallel execution for CSV, TSV, scripts that only split by whitespace, and scripts that only use a unique, single-byte field separator and single-byte record separator. In time, this limitation may be relaxed, but other formats are unlikely to support the same level of performance with record-level parallelism.
Why expand the language in this way in the first place? After all, existing tools like GNU Parallel already provide a succinct means of running shell commands in parallel. I have two reasons for this, the second being more important than the first.
- While existing solutions are quite succinct, some simple aggregations can be even easier to write in a single frawk program.
- While it is relatively straightforward to run a command in parallel across multiple input files, existing tools have a hard time achieving parallelism within a single input file.
frawk supports a file-per-worker model of parallelism using the -pf option, but I think its support for record-level parallelism (under the -pr option) is more interesting.
Consider the CSV format. Parallelizing CSV parsing is a difficult task because a parser must take a different action based on whether characters like commas occur inside a quoted field. Most existing solutions I have come across construct a state machine that a parser steps through on a byte-by-byte basis.
Recent approaches to parsing JSON and CSV instead perform a first pass to write out a sequence of locations of structural characters within the input. Relevant structural characters in CSV are , characters that are not inside a quoted field, " characters, and \n and \r\n sequences. The remaining parsing task may still need to step through a state machine of some kind, but this state machine only executes once per structural character, not once per byte in the input. Because this first phase can be implemented extremely cheaply using SIMD instructions, this approach achieves substantial end-to-end performance gains on recent CPUs.
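To make the two phases concrete, here is a minimal Python sketch. It is not frawk's implementation: a plain byte loop stands in for the SIMD pass, and the names structural_offsets, unquote and parse_csv are made up for illustration. The point is only that the second phase takes one step per recorded offset rather than one step per input byte.

```python
def structural_offsets(data: bytes) -> list[int]:
    """Phase 1: offsets of quotes, newlines, and commas outside quoted fields.

    A scalar loop standing in for the SIMD pass described above.
    """
    offsets = []
    in_quotes = False
    for i, byte in enumerate(data):
        if byte == 0x22:  # '"'
            in_quotes = not in_quotes
            offsets.append(i)
        elif byte == 0x0A:  # '\n' is recorded even inside quotes; phase 2 decides
            offsets.append(i)
        elif byte == 0x2C and not in_quotes:  # ','
            offsets.append(i)
    return offsets


def unquote(raw: bytes) -> str:
    """Strip surrounding quotes and collapse doubled quotes."""
    text = raw.decode("utf-8")
    if len(text) >= 2 and text[0] == '"' and text[-1] == '"':
        text = text[1:-1].replace('""', '"')
    return text


def parse_csv(data: bytes) -> list[list[str]]:
    """Phase 2: one state-machine step per structural offset."""
    records, fields = [], []
    start, in_quotes = 0, False
    for off in structural_offsets(data):
        byte = data[off]
        if byte == 0x22:
            in_quotes = not in_quotes
        elif in_quotes:
            continue  # a newline inside a quoted field is ordinary data
        else:
            raw = data[start:off]
            if byte == 0x0A and raw.endswith(b"\r"):
                raw = raw[:-1]  # treat \r\n as a single record separator
            fields.append(unquote(raw))
            start = off + 1
            if byte == 0x0A:
                records.append(fields)
                fields = []
    if fields or start < len(data):  # final record without a trailing newline
        fields.append(unquote(data[start:]))
        records.append(fields)
    return records
```

For example, parse_csv(b'a,"b,c",d\r\n') visits only five offsets (two commas, two quotes, one newline) for an eleven-byte input.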
frawk implements this approach for scripts with CSV, TSV and single-byte-separator inputs. Not only does this approach provide high performance for all scripts that consume input in this form, the separation of parsing into two phases provides us with an opportunity to parallelize the reading of a single CSV file. A single thread performs an initial pass on a chunk of input data to discover structural characters. It then locates a relevant record separator and sends that chunk off to a worker thread. That worker thread can then finish the parsing task at its own pace.
This architecture doesn't scale perfectly (I've seen diminishing marginal returns after 4-6 workers, depending on the machine), but it scales well enough to process CSV files at >2GB/s on my laptop, which is much faster than I have been able to process CSV otherwise. The performance doc provides measurements of the speedups that different frawk scripts achieve when run this way, as well as comparisons to other tools performing the same task.
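The chunk-then-dispatch structure can be sketched in Python as follows. This is an illustration under assumptions, not frawk's code: record_aligned_chunks, parse_chunk and parallel_parse are hypothetical names, Python's csv module stands in for the second parsing phase, and because of Python's global interpreter lock the sketch shows the shape of the pipeline rather than its performance.

```python
import csv
import io
from concurrent.futures import ThreadPoolExecutor


def record_aligned_chunks(data: str, target: int):
    """Yield chunks of at least `target` characters that end at a newline outside quotes."""
    start, in_quotes = 0, False
    for i, ch in enumerate(data):
        if ch == '"':
            in_quotes = not in_quotes
        elif ch == "\n" and not in_quotes and i + 1 - start >= target:
            yield data[start:i + 1]
            start = i + 1
    if start < len(data):
        yield data[start:]  # whatever is left after the last cut


def parse_chunk(chunk: str) -> list[list[str]]:
    """Worker-side parsing; each chunk holds only whole records."""
    return list(csv.reader(io.StringIO(chunk, newline="")))


def parallel_parse(data: str, target: int = 16, workers: int = 4) -> list[list[str]]:
    """One thread cuts the input; a pool of workers parses the pieces."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        parts = pool.map(parse_chunk, record_aligned_chunks(data, target))
        return [row for part in parts for row in part]
```

Because every chunk ends on a record boundary, a worker never needs to know what came before its chunk, which is what lets the workers proceed independently.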
frawk supports a limited notion of parallelism suitable for performing simple aggregations or transformations on textual data. The goal of frawk's parallelism in its current form is to facilitate parallelizing frawk scripts that are already embarrassingly parallel. While there are many possible directions to go from here, this strikes me as a modest but useful first step.
Like Awk, frawk executes programs as a sequence of "patterns" and "actions." Actions are blocks of code that are executed if a pattern matches. Most patterns are tested against each successive line of input, with the exception being the BEGIN and END patterns, whose corresponding actions are executed before and after any input is read, respectively.
To be precise, Awk scripts that only have a BEGIN pattern never read any input outside of explicit getline calls.
When frawk is passed the -pr or -pf command-line options, it compiles the program in parallel mode. In this mode, the frawk program is broken into three "stages":
- The BEGIN block is executed by a single thread.
- The main loop (i.e. pattern/action pairs aside from BEGIN and END) is executed independently in parallel by a configurable (via the -j flag) number of worker threads. Variables mentioned in both the BEGIN block and the main loop are copied to the worker threads.
- The END block is executed by a single thread after all worker threads terminate. Variables mentioned in both the END block and the main loop are copied from each worker thread and aggregated before being accessed by the thread executing the END block.
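The three stages can be modeled roughly as below. This is a Python sketch of the execution model only: run_stages and its callback parameters are hypothetical names, and a dictionary stands in for the variables shared between blocks.

```python
import copy
from concurrent.futures import ThreadPoolExecutor


def run_stages(begin, main, end, merge, chunks, jobs=4):
    """Model of parallel mode: BEGIN once, main loop per worker, END once."""
    shared = begin()  # stage 1: a single thread runs BEGIN

    def worker(chunk):
        local = copy.deepcopy(shared)  # each worker gets its own copy of the variables
        for record in chunk:
            main(local, record)
        return local

    with ThreadPoolExecutor(max_workers=jobs) as pool:  # stage 2: workers
        results = list(pool.map(worker, chunks))

    return end(merge(results))  # stage 3: aggregate, then a single thread runs END


# Example program: count records, in the spirit of `{ n++ } END { print n }`.
def begin():
    return {"n": 0}


def main(state, record):
    state["n"] += 1


def merge(states):
    return {"n": sum(s["n"] for s in states)}


def end(state):
    return state["n"]


total = run_stages(begin, main, end, merge, [["a", "b"], ["c"], ["d", "e", "f"]])
```

Here jobs plays the role of the -j flag, and merge is where the aggregation of worker-local variables happens before END sees them.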
Here is a simple "select" script written in frawk that extracts the 2nd column of an input source:
{ print $2; }
Running this script with the -pr option will run it in parallel, with a dynamic number of worker threads each getting a roughly equal portion of the input. The output of the parallel script will be the same as an invocation without the -pr option, up to a reordering of the output rows. A similar workload in the performance doc gets close to a 2x speedup in record-oriented parallel mode, despite the fact that writes to output files are all serialized and all input records come from a single file.
Implicit Aggregations
Variables that are referenced in both the main loop and END block of a script are implicitly aggregated across all worker threads. These values are aggregated according to rules that are a bit arbitrary, but as we shall see, you always have the option of performing the aggregation explicitly.
- Scalars are aggregated differently based on their type. Integer and floating-point values are summed. String variables are aggregated by picking an arbitrary non-empty representative value from one of the worker threads.
- Maps are aggregated by performing a union of the underlying sets of key/value pairs, with overlapping values being aggregated according to the corresponding scalar rule.
This, among other things, means that simple aggregations like sums:
{ SUM += $1 }
END { print SUM }
Or group-by's:
{ HIST[$1]++ }
END {
    for (k in HIST) {
        print k, HIST[k];
    }
}
Produce equivalent output when run in serial and parallel modes, with the usual caveats about map iteration ordering (undefined) and floating point addition (it is not associative).
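As a rough model of the implicit rules, here is a short Python sketch. The function names are hypothetical, and the string rule picks the first non-empty value where frawk only promises an arbitrary one. It checks that merging per-worker histograms gives the same result as a serial group-by.

```python
from collections import Counter


def merge_scalar(a, b):
    """Numbers are summed; for strings, keep a non-empty representative."""
    if isinstance(a, (int, float)) and isinstance(b, (int, float)):
        return a + b
    return a if a != "" else b  # assumption: "arbitrary" modeled as "first non-empty"


def merge_map(a, b):
    """Union of key/value pairs; overlapping keys use the scalar rule."""
    out = dict(a)
    for key, value in b.items():
        out[key] = merge_scalar(out[key], value) if key in out else value
    return out


def parallel_histogram(chunks):
    """Each chunk plays the role of one worker's share of the input."""
    merged = {}
    for chunk in chunks:
        merged = merge_map(merged, dict(Counter(chunk)))
    return merged
```

For instance, parallel_histogram([["a", "b", "a"], ["b", "c"], ["a"]]) counts the same values as Counter(["a", "b", "a", "b", "c", "a"]), whichever way the input happens to be split.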
Other Aggregations
While useful, the aggregations that happen by default are not universally applicable. Consider the embarrassingly parallel task of finding the maximum (lexicographic) value of a particular column:
{
    # The first record seeds max; later records keep the larger value.
    if (NR==1) {
        max = $2;
    } else {
        max = max >= $2 ? max : $2;
    }
}
END {
    print max;
}
This script is no longer correct if it is run in parallel. In parallel, the aggregation rules dictate that it will simply return the maximum value observed by one of the worker threads, which need not be the maximum over the whole input. To aggregate explicitly, worker threads are provided with a PID variable which takes on a positive integer value counting up from 1, with each thread receiving a unique PID. Note that PID values aren't always consecutive. This, combined with the implicit aggregation for maps, lets us write an explicit max aggregation.
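{
    # Each worker only ever writes to its own slot, max[PID].
    if (NR==1) {
        max[PID] = $2;
    } else {
        max[PID] = max[PID] >= $2 ? max[PID] : $2;
    }
}
END {
    # max now holds one entry per worker; reduce them to a single value.
    for (pid in max) {
        if (!i++) {
            max_val = max[pid]
        } else {
            max_val = max_val > max[pid] ? max_val : max[pid]
        }
    }
    print max_val;
}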
Because the repeated map references are both annoying to write and inefficient to execute, frawk has a PREPARE block which executes in each worker thread at the end of its input:
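{
    if (NR==1) {
        max = $2;
    } else {
        max = max >= $2 ? max : $2;
    }
}
# Runs once per worker, after that worker has consumed its input.
PREPARE { max_map[PID] = max }
END {
    # Reduce the per-worker maxima to a single value.
    for (pid in max_map) {
        v = max_map[pid]
        max = max > v ? max : v
    }
    print max;
}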
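The same pattern can be simulated in Python. This is only a model: the workers run one after another, PIDs are assumed to be 1, 2, 3, ... for simplicity (frawk does not promise consecutive values), and worker_max and explicit_max are hypothetical names. Each worker computes a local maximum, a PREPARE-like step publishes it under the worker's PID, and an END-like step reduces the map.

```python
def worker_max(records):
    """Main loop of one worker: running lexicographic maximum."""
    best = None
    for rec in records:
        best = rec if best is None or rec > best else best
    return best


def explicit_max(chunks):
    """Return (global maximum, per-PID map) for the given per-worker chunks."""
    max_map = {}
    for pid, chunk in enumerate(chunks, start=1):  # assumed PIDs: 1, 2, 3, ...
        local = worker_max(chunk)
        if local is not None:
            max_map[pid] = local  # PREPARE-like step: publish once per worker
    result = max(max_map.values()) if max_map else None  # END-like reduction
    return result, max_map


result, max_map = explicit_max([["pear", "apple"], ["zebra"], ["mango", "kiwi"]])
```

Any single entry of max_map is what the implicit string rule could have returned for a shared max variable, so only one of the three possible answers here ("zebra") is the true maximum.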
For a more involved example of an explicit aggregation, see the "Statistics" benchmark in the performance doc.