Parallel processing tutorial
See also: Overview of parallel processing in Transducers.jl
Quick examples
Sequential processing
using Transducers
xs = randn(10_000_000)
foldl(+, Map(sin), xs)
346.22356705699764
Thread-based parallelism
Just replace foldl with foldxt to make use of multiple cores:
foldxt(+, Map(sin), xs)
346.22356705699764
(On my laptop (a 4-core machine), I start seeing some speedup around length(xs) ≥ 100_000 for this transducer and reducing function.)
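To get a rough idea of the speedup on your own machine, you can simply time both versions (a minimal sketch; remember that the first call also measures compilation time, so time a second call):
@time foldl(+, Map(sin), xs)   # sequential baseline
@time foldxt(+, Map(sin), xs)  # multi-threaded version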
Process-based parallelism
using Distributed
#addprocs(4) # add worker processes
foldxd(+, Map(sin), xs)
346.22356705699764
(Note: there is likely no speedup when using foldxd for a light-weight computation on large input data like this.)
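A typical setup looks roughly like the following (a hedged sketch; how you provision workers depends on your environment, and loading Transducers on every worker with @everywhere is an assumption about that setup, not something required by this tutorial):
using Distributed
addprocs(4)                    # spawn 4 worker processes
@everywhere using Transducers  # make the package available on the workers (assumption)
foldxd(+, Map(sin), xs)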
Parallel processing with iterator comprehensions
You can also use parallel processing functions in Transducers.jl such as foldxt, foldxd, tcollect, dcollect, tcopy, and dcopy with iterator comprehensions:
foldxt(+, (sin(x) for x in xs if abs(x) < 1); basesize = 500_000)
-1895.5171345635522
foldxt(+, (x * y for x in 1:3, y in 1:3))
36
tcollect(sin(x) for x in xs if abs(x) < 1)
6824898-element Vector{Float64}:
0.8252565580389531
-0.8300617544717868
0.7844822964655624
-0.03279724661903634
-0.5652961462871117
0.6887793785253857
-0.7716536093391957
0.6487898919899266
0.7652939345558171
0.08558627635059872
⋮
0.2146817304246814
0.328242104232756
-0.5339099169603772
0.3830588696681145
0.61734715936978
0.24392177051582176
-0.44584895972634175
0.1631736265575484
-0.7620652279182762
using StructArrays: StructVector
table = StructVector(a = [1, 2, 3], b = [5, 6, 7])
tcopy((A = row.a + 1, B = row.b - 1) for row in table if isodd(row.a))
2-element StructArray(::Vector{Int64}, ::Vector{Int64}) with eltype @NamedTuple{A::Int64, B::Int64}:
(A = 2, B = 4)
(A = 4, B = 6)
When can I use foldxt and foldxd?
Requirement 1: Associative reducing step function
Parallel reductions such as foldxt and foldxd require an associative reducing step function. Recall that associativity means that the grouping of evaluations of a binary operator does not matter:
op = + # for example
a, b, c = 1, 2, 3 # for example
@assert op(op(a, b), c) == op(a, op(b, c))
Given this property, computations like a + b + c + d can be done with different "groupings":

a + b + c + d
  = ((a + b) + c) + d     +
                         / \
                        +   d
                       / \
                      +   c      foldl-like grouping
                     / \
                    a   b

  = (a + b) + (c + d)     +
                        /   \
                       /     \
                      /       \    reduce-like grouping
                     +         +
                    / \       / \
                   a   b     c   d
Notice that, in the last grouping, the computation of c + d does not have to wait for the result of a + b. This is why we need associativity for parallel execution.
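Here is a minimal sketch of this idea using raw tasks (not how Transducers.jl is implemented internally): the two halves are computed concurrently and then combined.
a, b, c, d = 1, 2, 3, 4
left = Threads.@spawn a + b    # one task computes a + b
right = Threads.@spawn c + d   # another task computes c + d
@assert fetch(left) + fetch(right) == ((a + b) + c) + d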
Do not confuse associativity with commutativity op(a, b) = op(b, a). For example, matrix multiplication *(::Matrix, ::Matrix) is associative but not commutative in general. However, since foldxt only requires associativity, it is valid to use foldxt(*, xf, matrices).
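For a quick check (a hedged sketch; matrices here is a hypothetical input), the threaded product matches the sequential one up to floating-point rounding even though * is not commutative:
matrices = [rand(2, 2) for _ in 1:8]   # hypothetical input
@assert foldxt(*, Map(identity), matrices; basesize = 1) ≈ foldl(*, Map(identity), matrices)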
As the reducing function + is associative, it can be used with foldxt (and foldxd):
foldxt(+, Map(identity), 1:10; init = 0, basesize = 1)
55
and the result is the same as the sequential version:
foldl(+, Map(identity), 1:10; init = 0)
55
Note: basesize is specified here to force foldxt to avoid falling back to foldl for a small container such as 1:10.
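Roughly speaking, basesize controls the length of the chunk that each task processes sequentially before the partial results are combined (a hedged sketch of the idea, not an exact description of the scheduler):
foldxt(+, Map(identity), 1:10; init = 0, basesize = 5)  # roughly two basecases: 1:5 and 6:10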
On the other hand, the binary function - is not associative. Thus, foldxt cannot be used instead of foldl (they produce different results):
foldxt(-, Map(identity), 1:10; init = 0, basesize = 1)
-5
foldl(-, Map(identity), 1:10; init = 0)
-55
Requirement 2: Stateless transducers
Parallel reductions only work with stateless transducers such as Map, Filter, Cat, etc., and you will get an error when using stateful transducers such as Scan with foldxt or foldxd:
foldxt(+, Scan(+), 1:10; basesize = 1)
ERROR: Stateful transducer Scan(+) does not support `combine`
Stateful transducers cannot be used with foldxt because it is impossible to start processing the input collection from the middle when the transducer needs to know all previous elements (= stateful).
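For comparison, a stateful transducer such as Scan works fine with the sequential foldl (a small sketch):
foldl(+, Scan(+), 1:10)  # sums the running prefix sums 1, 3, 6, ..., 55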
ScanEmit is a stateful transducer, but it is assumed to be used in a context where its outputs can be treated as stateless (see: Splitting a string into words and counting them in parallel).
Example: parallel collect
Suppose (pretend) there is a compute-heavy transducer:
xf_compute = opcompose(Filter(!ismissing), Map(x -> x^2))
Transducers.jl supports applying this to an input container and then collecting the results into another container. It can be done sequentially (collect, copy, etc.), in parallel using threads (tcollect, tcopy), or using multiple processes (dcollect, dcopy). For example:
xs = [abs(x) > 1 ? missing : x for x in randn(10_000)]
y1 = collect(xf_compute, xs)
Doing this in parallel is as easy as using tcollect or dcollect. However, it is easy to do this manually, too:
using BangBang: append!!
singleton_vector(x) = [x]
y2 = foldxt(append!!, xs |> xf_compute |> Map(singleton_vector))
@assert y1 == y2
This code illustrates the common pattern in parallel processing:
1. Put a result from the transducer in a "singleton solution". Here, it is [x].
2. "Merge" the (singleton) solution into the existing one. This is done by append!! in the above example.
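The same pattern works with other singleton solutions and merge functions. For example (a hedged sketch, not part of the original example), we can collect the results into a Set by merging singleton sets with BangBang's union!!:
using BangBang: union!!
y3 = foldxt(union!!, xs |> xf_compute |> Map(x -> Set([x])))  # singleton solution: Set([x])
@assert y3 == Set(y1)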
To illustrate how foldxt(append!!, xs |> ... |> Map(singleton_vector)) works, let's create a reducing function that records the arguments and returned values of append!!. As the arguments and the output may be mutated later, we use copy to record snapshots of their values at the moment of each call:
chan = Channel(Inf)
function append_and_log!!(a, b)
    a0 = copy(a)
    b0 = copy(b)
    c = append!!(a, b)
    put!(chan, (a0, b0) => copy(c))
    return c
end
This function can be used instead of append!!. Let's try a simpler and shorter example. This is equivalent to collect(1:4):
foldxt(append_and_log!!, Map(singleton_vector), 1:4; basesize = 1, init = Union{}[])
4-element Vector{Int64}:
1
2
3
4
(See below for why we are using init = Union{}[] here.)
Here is the list of arguments and returned values of append!! in this reduction:
records = Pair[]
while isready(chan)
    push!(records, take!(chan))
end
records
7-element Vector{Pair}:
(Union{}[], [1]) => [1]
(Union{}[], [3]) => [3]
(Union{}[], [2]) => [2]
(Union{}[], [4]) => [4]
([1], [2]) => [1, 2]
([3], [4]) => [3, 4]
([1, 2], [3, 4]) => [1, 2, 3, 4]
These recorded inputs and outputs of append!! show that its "call tree" is:
          [1,2,3,4]          <------------- append!!([1,2], [3,4]) == [1,2,3,4]
           /     \
    [1,2]           [3,4]    <------- append!!([3], [4]) == [3, 4]
    /   \           /   \
 [1]     [2]     [3]     [4]     <---- append!!([], [4]) == [4]
 / \     / \     / \     / \
[] [1]  [] [2]  [] [3]  [] [4]
Compare this to the example a + b + c + d above.
Optimization and generic container handling
The above usage of foldxt is not very efficient because singleton_vector allocates many small objects on the heap. Thus, it makes sense to use immutable objects for the singleton solutions so that the Julia compiler can eliminate the allocation of the intermediate singleton solutions. Here, this can be done simply by using SVector instead of singleton_vector:
using StaticArrays: SVector
foldxt(append!!, Map(SVector), 1:4)
4-element StaticArraysCore.SVector{4, Int64} with indices SOneTo(4):
1
2
3
4
However, notice that the return value is a static vector. This is not ideal when the input collection is large. The output collection type can be specified by init. We can simply use init = Union{}[] in this case:
foldxt(append!!, Map(SVector), 1:4; init = Union{}[])
4-element Vector{Int64}:
1
2
3
4
Note that passing a Vector to init of foldxt is usually the wrong choice, as it would mean that the same object is simultaneously mutated by different threads. However, since Vector{Union{}} cannot have any elements (as there are no objects of type Union{}), using Union{}[] for init is an exception and a good way to indicate that the output vector should use the "smallest" eltype required. That is to say, append!! widens the vector "just enough" to fit the resulting elements.
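Here is a small sketch (hypothetical values) of this widening behavior, assuming append!! widens the element type only as much as needed:
using BangBang: append!!
append!!(Union{}[], [1, 2])  # the empty Vector{Union{}} becomes a Vector{Int64}
append!!([1, 2], [3.5])      # widens to a Vector{Float64} to fit the new element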
For generic containers (e.g., various table types), use BangBang.Empty as the empty initial value. This is useful for creating a table object such as a DataFrame as the result of parallel processing:
using BangBang: Empty
using DataFrames: DataFrame
foldxt(append!!, Map(x -> SVector((a = x,))), 1:4; init = Empty(DataFrame))
4×1 DataFrame
Row │ a
│ Int64
─────┼───────
1 │ 1
2 │ 2
3 │ 3
4 │ 4
It is slightly trickier to make this approach work with other table types such as StructArrays and TypedTables. Use tcopy or dcopy to work with generic containers.
Example: ad-hoc histogram
The following example counts the number of occurrences of each leading digit in a distribution of random numbers. First, let's create "singleton solutions" using transducers:
xs = 1_000_000 * randn(10_000_000)
dicts1 = xs |> Map(abs) |> Filter(x -> x > 1) |> Map() do x
    y = digits(floor(Int, x))[end]
    Dict(y => 1)
end
The singleton solutions can be merged using mergewith!(+, a, b). Conveniently, mergewith!(+) is the curried form (args...) -> mergewith!(+, args...):
rf! = mergewith!(+)
rf!(Dict(:a => 1, :b => 2), Dict(:b => 3, :c => 4))
Dict{Symbol, Int64} with 3 entries:
:a => 1
:b => 5
:c => 4
This is the form of binary function appropriate for foldl and foldxt.
Note that it is OK to use the in-place function mergewith! here because the dictionary passed as a is created by Dict(y => 1) and is not shared with anything else. When there is no such guarantee, passing init = OnInit(Dict{Int,Int}) is a good option. Note that passing init = Dict{Int,Int}() to foldxt is not correct, as multiple tasks would share and try to mutate the same dictionary this way.
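For example (a hedged sketch), OnInit(Dict{Int,Int}) makes each basecase start from its own freshly constructed dictionary:
counts0 = foldxt(mergewith!(+), dicts1; init = OnInit(Dict{Int,Int}))  # each task merges into a fresh Dict{Int,Int}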
Let's try this with the parallel foldxt:
counts1 = foldxt(mergewith!(+), dicts1)
Compare the result with foldl:
counts2 = foldl(mergewith!(+), dicts1)
@assert counts1 == counts2
Hopefully the result is close to Benford's law:
let n = sum(values(counts1))
    sort!(keys(counts1) .=> values(counts1) ./ n)
end
9-element Vector{Pair{Int64, Float64}}:
1 => 0.3596507236856513
2 => 0.12888261599435438
3 => 0.08648987784089006
4 => 0.08100887290798561
5 => 0.07744176969759273
6 => 0.07343096608786948
7 => 0.06915306223775601
8 => 0.0643175578858021
9 => 0.0596245536620983
Since we are counting only nine elements, it is actually better to use a fixed-size container such as a tuple in this case:
dicts2 = xs |> Map(abs) |> Filter(x -> x > 1) |> Map() do x
    y = digits(floor(Int, x))[end]
    ntuple(i -> i == y, 9)
end
counts3 = foldxt(dicts2; init = ntuple(_ -> 0, 9)) do a, b
    map(+, a, b)
end
@assert Dict(zip(1:9, counts3)) == counts1
Note that, as tuples are immutable, it is valid to pass one as the init of foldxt.
MicroCollections.jl for efficient singleton solution
When the appropriate "bins" are not known, the mergewith!(+)-based strategy is more appropriate. However, it is not ideal to allocate a small container like Dict(y => 1) on the heap for each iteration. MicroCollections.jl provides singleton (and empty) containers that are designed for this use case. The SingletonDict is "upcast" to the mutable Dict in the first invocation when merged with BangBang.jl functions:
using BangBang: mergewith!!
using MicroCollections: SingletonDict
acc1 = mergewith!!(+, SingletonDict(:a => 1), SingletonDict(:b => 1))
Dict(:a => 1, :b => 1)
This dictionary is reused in the subsequent iterations:
acc2 = mergewith!!(+, acc1, SingletonDict(:b => 1))
Dict(:a => 1, :b => 2)
acc3 = mergewith!!(+, acc2, SingletonDict(:c => 1))
Dict(:a => 1, :b => 2, :c => 1)
The first result is reused across these iterations (within a single thread).
@assert acc1 === acc2 === acc3
Finally, Dicts from different threads are merged using the same function mergewith!!(+):
acc4 = Dict(:a => 5, :c => 3) # from different thread
acc5 = mergewith!!(+, acc3, acc4)
Dict(:a => 6, :b => 2, :c => 4)
Thus, dicts1 can be optimized simply by replacing Dict(y => 1) with SingletonDict(y => 1):
dicts3 = xs |> Map(abs) |> Filter(x -> x > 1) |> Map() do x
    y = digits(floor(Int, x))[end]
    SingletonDict(y => 1)
end
counts4 = foldxt(mergewith!!(+), dicts3)
@assert counts1 == counts4
Example: early termination
Find the first element that is a multiple of three:
foldxt(ReduceIf(x -> x % 3 == 0), 1:10; init = nothing, basesize = 1) do _, x
    # # Uncomment for demo:
    # x == 3 ? sleep(0.1) : @show x  # give other tasks a chance to finish first
    return x
end
3
This snippet always returns 3, even though the reductions for 6 and 9 may finish first.
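For comparison, the sequential foldl with ReduceIf behaves the same way (a small sketch):
foldl(ReduceIf(x -> x % 3 == 0), 1:10; init = nothing) do _, x
    return x
end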
This page was generated using Literate.jl.