Parallel processing tutorial
See also: Overview of parallel processing in Transducers.jl
Quick examples
Sequential processing
```julia
using Transducers

xs = randn(10_000_000)
foldl(+, Map(sin), xs)
```
```
346.22356705699764
```
Thread-based parallelism
Just replace foldl with foldxt to make use of multiple cores:
```julia
foldxt(+, Map(sin), xs)
```
```
346.22356705699764
```
(On my laptop, a 4-core machine, I start seeing some speedup around length(xs) ≥ 100_000 for this transducer and reducing function.)
Process-based parallelism
```julia
using Distributed
# addprocs(4)  # add worker processes
foldxd(+, Map(sin), xs)
```
```
346.22356705699764
```
(Note: with foldxd, there is likely no speedup for a light-weight computation on large input data like this.)
Parallel processing with iterator comprehensions
You can also use parallel processing functions in Transducers.jl such as foldxt, foldxd, tcollect, dcollect, tcopy and dcopy with iterator comprehensions:
```julia
foldxt(+, (sin(x) for x in xs if abs(x) < 1); basesize = 500_000)
```
```
-1895.5171345635522
```
```julia
foldxt(+, (x * y for x in 1:3, y in 1:3))
```
```
36
```
```julia
tcollect(sin(x) for x in xs if abs(x) < 1)
```
```
6824898-element Vector{Float64}:
  0.8252565580389531
 -0.8300617544717868
  0.7844822964655624
 -0.03279724661903634
 -0.5652961462871117
  0.6887793785253857
 -0.7716536093391957
  0.6487898919899266
  0.7652939345558171
  0.08558627635059872
  ⋮
  0.2146817304246814
  0.328242104232756
 -0.5339099169603772
  0.3830588696681145
  0.61734715936978
  0.24392177051582176
 -0.44584895972634175
  0.1631736265575484
 -0.7620652279182762
```
```julia
using StructArrays: StructVector

table = StructVector(a = [1, 2, 3], b = [5, 6, 7])
tcopy((A = row.a + 1, B = row.b - 1) for row in table if isodd(row.a))
```
```
2-element StructArray(::Vector{Int64}, ::Vector{Int64}) with eltype @NamedTuple{A::Int64, B::Int64}:
 (A = 2, B = 4)
 (A = 4, B = 6)
```
When can I use foldxt and foldxd?
Requirement 1: Associative reducing step function
Parallel reductions such as foldxt and foldxd require an associative reducing step function. Recall that associativity means that the grouping of evaluations of a binary operator does not matter:
```julia
op = +             # for example
a, b, c = 1, 2, 3  # for example
@assert op(op(a, b), c) == op(a, op(b, c))
```
Given this property, computations like a + b + c + d can be done with different "groupings":
```
a + b + c + d

= ((a + b) + c) + d          +
                            / \
                           +   d
                          / \
                         +   c      foldl-like grouping
                        / \
                       a   b

= (a + b) + (c + d)          +
                            / \
                           /   \
                          /     \   reduce-like grouping
                         +       +
                        / \     / \
                       a   b   c   d
```
Notice that, in the last grouping, the computation of c + d does not have to wait for the result of a + b. This is why we need associativity for parallel execution.
Do not confuse associativity with commutativity op(a, b) = op(b, a). For example, matrix multiplication *(::Matrix, ::Matrix) is associative but not commutative in general. However, since foldxt only requires associativity, it is valid to use foldxt(*, xf, matrices).
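To make this concrete, here is a minimal Base-only check, with arbitrarily chosen example matrices, that matrix multiplication is associative but not commutative:

```julia
# Small example matrices (arbitrary choices, for illustration only).
A = [1 2; 3 4]
B = [0 1; 1 0]
C = [2 0; 0 3]

# Associative: the grouping does not matter...
@assert (A * B) * C == A * (B * C)

# ...but not commutative: the order does matter.
@assert A * B != B * A
```

So even an order-sensitive (non-commutative) operation is fine for foldxt, as long as regrouping it is safe.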
As the reducing function + is associative, it can be used with foldxt (and foldxd):
```julia
foldxt(+, Map(identity), 1:10; init = 0, basesize = 1)
```
```
55
```
and the result is the same as the sequential version:
```julia
foldl(+, Map(identity), 1:10; init = 0)
```
```
55
```
Note: basesize here forces foldxt to avoid falling back to foldl for a small container such as 1:10.
On the other hand, the binary function - is not associative. Thus, foldxt cannot be used in place of foldl (they produce different results):
```julia
foldxt(-, Map(identity), 1:10; init = 0, basesize = 1)
```
```
-5
```
```julia
foldl(-, Map(identity), 1:10; init = 0)
```
```
-55
```
Requirement 2: Stateless transducers
Parallel reductions only work with stateless transducers such as Map, Filter, and Cat; you will get an error when using stateful transducers such as Scan with foldxt or foldxd:
```julia
foldxt(+, Scan(+), 1:10; basesize = 1)
```
```
ERROR: Stateful transducer Scan(+) does not support `combine`
```
Stateful transducers cannot be used with foldxt because it is impossible to start processing the input collection from the middle when the transducer needs to know all previous elements (i.e., when it is stateful).
ScanEmit is a stateful transducer but it is assumed that it is used in a context that outputs can be treated as stateless (see: Splitting a string into words and counting them in parallel).
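The distinction can be illustrated with plain Base Julia (no Transducers required): a stateless per-element transformation can be computed chunk-by-chunk, while a stateful running sum cannot, unless extra state is carried over between chunks, which is essentially what the ScanEmit-style approach exploits:

```julia
xs = collect(1:10)
chunks = [1:5, 6:10]

# Stateless (like Map): processing each chunk independently gives the same
# result as processing the whole input at once.
@assert reduce(vcat, [map(x -> x^2, xs[c]) for c in chunks]) == map(x -> x^2, xs)

# Stateful (like Scan): the running sum of the second chunk alone is NOT the
# corresponding part of the full running sum...
@assert cumsum(xs[6:10]) != cumsum(xs)[6:10]

# ...unless we carry over the state (the sum of everything before the chunk):
@assert cumsum(xs[6:10]) .+ sum(xs[1:5]) == cumsum(xs)[6:10]
```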
Example: parallel collect
Suppose (pretend) there is a compute-heavy transducer:
```julia
xf_compute = opcompose(Filter(!ismissing), Map(x -> x^2))
```
Transducers.jl supports applying this to an input container and then collecting the results into another container. It can be done sequentially (collect, copy, etc.) or in parallel using threads (tcollect, tcopy) or multiple processes (dcollect, dcopy). For example:
```julia
xs = [abs(x) > 1 ? missing : x for x in randn(10_000)]
y1 = collect(xf_compute, xs)
```
Doing this in parallel is as easy as using tcollect or dcollect. However, it is easy to do this manually, too:
```julia
using BangBang: append!!

singleton_vector(x) = [x]
y2 = foldxt(append!!, xs |> xf_compute |> Map(singleton_vector))
@assert y1 == y2
```
This code illustrates the common pattern in parallel processing:
1. Put a result from the transducer in a "singleton solution". Here, it is [x].
2. Then "merge" the (singleton) solution into the existing one. This is done by append!! in the above example.
To illustrate how foldxt(append!!, xs |> ... |> Map(singleton_vector)) works, let's create a reducing function that records the arguments and returned values of append!!:
```julia
chan = Channel(Inf)
```
As the arguments and output may be mutated later, we use copy to record snapshots of their values at the moment append!! is called:
```julia
function append_and_log!!(a, b)
    a0 = copy(a)
    b0 = copy(b)
    c = append!!(a, b)
    put!(chan, (a0, b0) => copy(c))
    return c
end
```
This function can be used instead of append!!. Let's try a simpler and shorter example. This is equivalent to collect(1:4):
```julia
foldxt(append_and_log!!, Map(singleton_vector), 1:4; basesize = 1, init = Union{}[])
```
```
4-element Vector{Int64}:
 1
 2
 3
 4
```
(See below for why we are using init = Union{}[] here.)
Here is the list of arguments and returned values of append!! in this reduction:
```julia
records = Pair[]
while isready(chan)
    push!(records, take!(chan))
end
records
```
```
7-element Vector{Pair}:
         (Union{}[], [1]) => [1]
         (Union{}[], [3]) => [3]
         (Union{}[], [2]) => [2]
         (Union{}[], [4]) => [4]
             ([1], [2]) => [1, 2]
             ([3], [4]) => [3, 4]
 ([1, 2], [3, 4]) => [1, 2, 3, 4]
```
These recorded inputs and outputs of append!! show that its "call tree" is:
```
  [1,2,3,4]  <------------- append!!([1,2], [3,4]) == [1,2,3,4]
   /     \
[1,2]   [3,4]  <----------- append!!([3], [4]) == [3, 4]
 / \     / \
[1] [2] [3] [4]  <--------- append!!([], [4]) == [4]
/ \  / \  / \  / \
[] [1] [] [2] [] [3] [] [4]
```
Compare this to the example a + b + c + d above.
Optimization and generic container handling
The above usage of foldxt is not quite efficient, as singleton_vector allocates small objects on the heap. Thus, it makes sense to use immutable objects for the singleton solutions so that the Julia compiler can eliminate the allocation of the intermediate singleton solutions. Here, this can be done by simply using SVector instead of singleton_vector:
```julia
using StaticArrays: SVector

foldxt(append!!, Map(SVector), 1:4)
```
```
4-element StaticArraysCore.SVector{4, Int64} with indices SOneTo(4):
 1
 2
 3
 4
```
However, notice that the return value is a static vector. This is not ideal when the input collection is large. The output collection type can be specified by init. We can simply use init = Union{}[] in this case:
```julia
foldxt(append!!, Map(SVector), 1:4; init = Union{}[])
```
```
4-element Vector{Int64}:
 1
 2
 3
 4
```
Note that passing a Vector to init of foldxt is usually a wrong choice, as it would mean that the same object is simultaneously mutated by different threads. However, since Vector{Union{}} cannot hold any element (there is no object of type Union{}), using Union{}[] for init is an exception, and it is a good way to indicate that the output vector should use the "smallest" eltype required. That is to say, append!! widens the vector "just enough" to fit the resulting elements.
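As a rough illustration of this widening behavior, here is a Base-only sketch, with vcat standing in for the out-of-place behavior of append!!:

```julia
# Start from the "smallest" eltype: a vector that cannot hold any element.
acc = Union{}[]

# Appending integers widens the eltype just enough to hold them...
acc = vcat(acc, [1, 2])
@assert eltype(acc) == Int

# ...and appending a Float64 widens it again.
acc = vcat(acc, [0.5])
@assert eltype(acc) == Float64
@assert acc == [1.0, 2.0, 0.5]
```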
For generic containers (e.g., various table types), use BangBang.Empty as the empty initial value. This is useful for creating a table object such as DataFrame as the result of parallel processing:
```julia
using BangBang: Empty
using DataFrames: DataFrame

foldxt(append!!, Map(x -> SVector((a = x,))), 1:4; init = Empty(DataFrame))
```
```
4×1 DataFrame
 Row │ a
     │ Int64
─────┼───────
   1 │     1
   2 │     2
   3 │     3
   4 │     4
```
It is slightly more tricky to make this approach work with other table types such as StructArrays and TypedTables. Use tcopy or dcopy to work with generic containers.
Example: ad-hoc histogram
The following example counts the number of occurrences of each leading digit in a distribution of random numbers. First, let's create "singleton solutions" using transducers:
```julia
xs = 1_000_000 * randn(10_000_000)
dicts1 = xs |> Map(abs) |> Filter(x -> x > 1) |> Map() do x
    y = digits(floor(Int, x))[end]
    Dict(y => 1)
end
```
The singleton solutions can be merged using mergewith!(+, a, b). Conveniently, mergewith!(+) is the curried form (args...) -> mergewith!(+, args...):
```julia
rf! = mergewith!(+)
rf!(Dict(:a => 1, :b => 2), Dict(:b => 3, :c => 4))
```
```
Dict{Symbol, Int64} with 3 entries:
  :a => 1
  :b => 5
  :c => 4
```
This is the form of binary function appropriate for foldl and foldxt.
Note that it is OK to use in-place function mergewith! here because the dictionary passed as a is created by Dict(y => 1) and not shared by anyone. When there is no such guarantee, passing init = OnInit(Dict{Int,Int}) is a good option. Note that passing init = Dict{Int,Int}() to foldxt is not correct as multiple tasks would share and try to mutate the same dictionary this way.
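Here is a small Base-only sketch of the hazard, using foldl and two hypothetical "chunks" to mimic what parallel tasks would do:

```julia
# Fold a chunk of numbers into a count dictionary, starting from `init`.
count_chunk(init, chunk) =
    foldl((d, x) -> mergewith!(+, d, Dict(x => 1)), chunk; init = init)

# Wrong: both chunks fold into the SAME dictionary, so the per-chunk results
# are cross-contaminated (and racy under real threads).
shared = Dict{Int,Int}()
r1 = count_chunk(shared, [1, 1, 2])
r2 = count_chunk(shared, [2, 3])
@assert r1 === r2 === shared  # all the same object!

# Right: a fresh dictionary per chunk, which is what passing
# init = OnInit(Dict{Int,Int}) arranges for each task.
r3 = count_chunk(Dict{Int,Int}(), [1, 1, 2])
r4 = count_chunk(Dict{Int,Int}(), [2, 3])
@assert r3 == Dict(1 => 2, 2 => 1)
@assert r4 == Dict(2 => 1, 3 => 1)
```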
Let's try this with parallel foldxt:
```julia
counts1 = foldxt(mergewith!(+), dicts1)
```
Compare the result with foldl:
```julia
counts2 = foldl(mergewith!(+), dicts1)
@assert counts1 == counts2
```
Hopefully the result is close to Benford's law:
```julia
let n = sum(values(counts1))
    sort!(keys(counts1) .=> values(counts1) ./ n)
end
```
```
9-element Vector{Pair{Int64, Float64}}:
 1 => 0.3596507236856513
 2 => 0.12888261599435438
 3 => 0.08648987784089006
 4 => 0.08100887290798561
 5 => 0.07744176969759273
 6 => 0.07343096608786948
 7 => 0.06915306223775601
 8 => 0.0643175578858021
 9 => 0.0596245536620983
```
Since we are counting only nine elements, it is actually better to use a fixed-size container such as a tuple in this case:
```julia
dicts2 = xs |> Map(abs) |> Filter(x -> x > 1) |> Map() do x
    y = digits(floor(Int, x))[end]
    ntuple(i -> i == y, 9)
end

counts3 = foldxt(dicts2; init = ntuple(_ -> 0, 9)) do a, b
    map(+, a, b)
end

@assert Dict(zip(1:9, counts3)) == counts1
```
Note that, as tuples are immutable, it is valid to pass one as the init of foldxt.
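This is safe because combining immutable accumulators creates new values instead of mutating the shared init; a minimal Base-only check:

```julia
init = ntuple(_ -> 0, 9)                  # shared initial accumulator
a = map(+, init, ntuple(i -> i == 3, 9))  # "merge" one singleton solution
b = map(+, init, ntuple(i -> i == 7, 9))  # another task can reuse `init` safely

@assert init == ntuple(_ -> 0, 9)  # init is untouched
@assert a[3] == 1 && b[7] == 1
```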
MicroCollections.jl for efficient singleton solution
When the appropriate "bins" are not known, the mergewith!(+)-based strategy is more appropriate. However, it is not ideal to allocate a small container like Dict(y => 1) on the heap for each iteration. MicroCollections.jl provides singleton (and empty) containers designed for this use case. The SingletonDict is "upcast" to the mutable Dict in the first invocation when merged with BangBang.jl functions:
```julia
using BangBang: mergewith!!
using MicroCollections: SingletonDict

acc1 = mergewith!!(+, SingletonDict(:a => 1), SingletonDict(:b => 1))
```
```
Dict(:a => 1, :b => 1)
```
This dictionary is reused in the subsequent iterations:
```julia
acc2 = mergewith!!(+, acc1, SingletonDict(:b => 1))
```
```
Dict(:a => 1, :b => 2)
```
```julia
acc3 = mergewith!!(+, acc2, SingletonDict(:c => 1))
```
```
Dict(:a => 1, :b => 2, :c => 1)
```
The first result is reused across these iterations (within a single thread):
```julia
@assert acc1 === acc2 === acc3
```
Finally, Dicts from different threads are merged using the same function mergewith!!(+):
```julia
acc4 = Dict(:a => 5, :c => 3)  # from a different thread
acc5 = mergewith!!(+, acc3, acc4)
```
```
Dict(:a => 6, :b => 2, :c => 4)
```
Thus, dicts1 can be optimized simply by replacing Dict(y => 1) with SingletonDict(y => 1):
```julia
dicts3 = xs |> Map(abs) |> Filter(x -> x > 1) |> Map() do x
    y = digits(floor(Int, x))[end]
    SingletonDict(y => 1)
end

counts4 = foldxt(mergewith!!(+), dicts3)
@assert counts1 == counts4
```
Example: early termination
Find the first element that is a multiple of three:
```julia
foldxt(ReduceIf(x -> x % 3 == 0), 1:10; init = nothing, basesize = 1) do _, x
    # # Uncomment for demo:
    # x == 3 ? sleep(0.1) : @show x  # give other tasks a chance to finish first
    return x
end
```
```
3
```
This snippet always returns 3, even though the reductions for x = 6 and x = 9 may finish first.
This page was generated using Literate.jl.