# Multithreading
The iterators `chunks` and `index_chunks` can be very useful in combination with `@spawn` and `@threads` for task-based multithreading. Let's see how we can use them together.
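Before diving in, here is a minimal sketch of what the two iterators provide (the exact element and range types depend on the input and the package version): `chunks` iterates over the elements of each chunk, while `index_chunks` iterates over the corresponding indices into the input.

```julia
# Illustrative sketch: `chunks` yields the elements of each chunk,
# `index_chunks` yields the corresponding indices into the input.
using ChunkSplitters: chunks, index_chunks

x = collect(1:7)
for c in chunks(x; n=3)
    @show c       # a chunk of elements of x (e.g. a view)
end
for idxs in index_chunks(x; n=3)
    @show idxs    # the indices of x belonging to this chunk
end
```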
## Example: Parallel summation
```julia-repl
julia> using ChunkSplitters: chunks

julia> using Base.Threads: nthreads, @spawn

julia> function parallel_sum(f, x; n=nthreads())
           tasks = map(chunks(x; n=n)) do c
               @spawn sum(f, c)
           end
           return sum(fetch, tasks)
       end
parallel_sum (generic function with 1 method)

julia> x = rand(10^5);

julia> parallel_sum(identity, x) ≈ sum(identity, x) # true
true

julia> using BenchmarkTools

julia> @btime sum(x -> log(x)^7, $x);
  938.583 μs (0 allocations: 0 bytes)

julia> @btime parallel_sum(x -> log(x)^7, $x; n=Threads.nthreads());
  321.083 μs (44 allocations: 3.42 KiB)
```
Note that by chunking `x` we can readily control how many tasks we use for the parallelisation. One reason why this is useful is that it avoids the (large) overhead we would have to pay if we simply spawned `length(x)` tasks:
```julia-repl
julia> @btime parallel_sum(x -> log(x)^7, $x; n=length(x)); # equivalent to no chunking
  40.259 ms (700006 allocations: 54.17 MiB)
```
Another reason why chunking is useful is that by setting `n <= nthreads()` we can make `parallel_sum` use only a subset of the available Julia threads:
```julia-repl
julia> @btime parallel_sum(x -> log(x)^7, $x; n=2); # use only 2 tasks/threads
  506.875 μs (16 allocations: 1.20 KiB)
```
Lastly, as we'll discuss further below, the ability to control the elements-to-task mapping allows you to tune load balancing for non-uniform workloads.
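The examples above use `chunks`; as a side note, `index_chunks` fits naturally whenever each task should write results into disjoint slices of a preallocated output. The following is a minimal sketch (the function name `parallel_map!` is ours, not part of the package):

```julia
using ChunkSplitters: index_chunks
using Base.Threads: nthreads, @spawn

# Sketch of a parallel in-place map: each task fills its own disjoint
# slice of the output, so no synchronization between tasks is needed.
function parallel_map!(f, y, x; n=nthreads())
    tasks = map(index_chunks(x; n=n)) do idxs
        @spawn @views y[idxs] .= f.(x[idxs])
    end
    foreach(wait, tasks)
    return y
end

x = rand(10^5);
y = similar(x);
parallel_map!(x -> log(x)^7, y, x) ≈ map(x -> log(x)^7, x)  # true
```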
## `@threads` and `enumerate`
If you try to rewrite the parallel summation example above to use `@threads` instead of `@spawn`, you might realize that it won't work with `chunks` alone. The reason is that we need to store the partial (chunk-)sums in a vector, and to write to the correct slot of this vector, each task needs a chunk index. The natural solution is to use `enumerate(chunks(...))`, which gives us the necessary chunk indices alongside the chunks. And in fact, this works:
```julia-repl
julia> using ChunkSplitters: chunks

julia> using Base.Threads: nthreads, @threads

julia> function parallel_sum(f, x; n=nthreads())
           psums = Vector{eltype(x)}(undef, n)
           @threads for (i, c) in enumerate(chunks(x; n=n))
               psums[i] = sum(f, c)
           end
           return sum(psums)
       end
parallel_sum (generic function with 1 method)

julia> x = rand(10^5);

julia> parallel_sum(identity, x) ≈ sum(identity, x) # true
true

julia> using BenchmarkTools

julia> @btime sum(x -> log(x)^7, $x);
  936.625 μs (0 allocations: 0 bytes)

julia> @btime parallel_sum(x -> log(x)^7, $x; n=Threads.nthreads());
  319.000 μs (35 allocations: 3.42 KiB)
```
However, this only works because we actively support it. In general, `@threads` isn't compatible with `enumerate`:
```julia-repl
julia> @threads for (i, x) in enumerate(1:10)
           @show i, x
       end
ERROR: TaskFailedException

    nested task error: MethodError: no method matching firstindex(::Base.Iterators.Enumerate{UnitRange{Int64}})
[...]
```
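If you hit this limitation with some other iterator, a generic (if somewhat wasteful) workaround is to materialize the enumeration with `collect`, which yields an indexable vector that `@threads` can split:

```julia
using Base.Threads: @threads

# Workaround: `collect` materializes the enumerated pairs into a
# Vector, which supports the indexing that `@threads` requires.
@threads for (i, x) in collect(enumerate(1:10))
    @show i, x
end
```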
## Dynamic load balancing
Consider the following function:

```julia
f_nonuniform(x) = sum(abs2, rand() for _ in 1:(2^14*x))
```
The workload and computational cost of `f_nonuniform(x)` are non-uniform and increase linearly with `x`:
```julia-repl
julia> using UnicodePlots

julia> xs = 1:2^7;

julia> workload = 2^14 .* xs;

julia> lineplot(xs, workload; xlabel="x", ylabel="∝ workload of f_nonuniform(x)", xlim=(1, 2^7), ylim=(minimum(workload), maximum(workload)))
                              ┌────────────────────────────────────────┐
                    2 097 152 │⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣀⠔⠋│
                              │⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⡠⠚⠁⠀⠀│
                              │⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⡠⠖⠉⠀⠀⠀⠀⠀│
                              │⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⠴⠋⠀⠀⠀⠀⠀⠀⠀⠀│
                              │⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⡤⠚⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀│
                              │⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣠⠔⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀│
                              │⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣀⠔⠊⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀│
⠀∝ workload of f_nonuniform(x) │⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⡠⠊⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀│
                              │⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⡠⠔⠉⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀│
                              │⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⠔⠋⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀│
                              │⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⡤⠚⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀│
                              │⠀⠀⠀⠀⠀⠀⠀⠀⣠⠖⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀│
                              │⠀⠀⠀⠀⠀⣀⠴⠊⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀│
                              │⠀⠀⢀⡤⠊⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀│
                       16 384 │⣠⠔⠉⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀│
                              └────────────────────────────────────────┘
                              ⠀1⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀128⠀
                              ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀x⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
```
Let's now reconsider our `parallel_sum` implementation from above and see how it handles the non-uniform workload for different values of `n`.
```julia-repl
julia> using ChunkSplitters, Base.Threads, BenchmarkTools

julia> xs = 1:512;

julia> f_nonuniform(x) = sum(abs2, rand() for _ in 1:(2^14*x))
f_nonuniform (generic function with 1 method)

julia> function parallel_sum(f, x; n=nthreads(), split=Consecutive())
           tasks = map(chunks(x; n=n, split=split)) do c
               @spawn sum(f, c)
           end
           return sum(fetch, tasks)
       end
parallel_sum (generic function with 1 method)

julia> @btime parallel_sum($f_nonuniform, $xs; n=nthreads());
  861.248 ms (45 allocations: 3.39 KiB)

julia> @btime parallel_sum($f_nonuniform, $xs; n=2*nthreads());
  683.033 ms (86 allocations: 6.59 KiB)

julia> @btime parallel_sum($f_nonuniform, $xs; n=4*nthreads());
  638.611 ms (170 allocations: 13.06 KiB)

julia> @btime parallel_sum($f_nonuniform, $xs; n=8*nthreads());
  585.203 ms (338 allocations: 26.02 KiB)

julia> @btime parallel_sum($f_nonuniform, $xs; n=16*nthreads());
  567.806 ms (674 allocations: 51.88 KiB)

julia> @btime parallel_sum($f_nonuniform, $xs; n=32*nthreads());
  557.139 ms (1346 allocations: 103.67 KiB)

julia> @btime parallel_sum($f_nonuniform, $xs; n=64*nthreads());
  556.886 ms (2690 allocations: 207.33 KiB)

julia> @btime parallel_sum($f_nonuniform, $xs; n=128*nthreads());
  558.612 ms (3586 allocations: 276.33 KiB)
```
We notice that, up to some point, increasing the number of tasks beyond `nthreads()` improves the runtime. The reason is that we give the dynamic scheduler more freedom to balance the increasing number of tasks/chunks among threads. Compare this to `n=nthreads()`, where there is only one task/chunk per thread and no flexibility for load balancing at all. On the other hand, we can also see the downside of creating more tasks: the number and size of allocations increases.

From this we learn that, in general, a balance must be found between more tasks (→ better load balancing) and not too many tasks (→ fewer allocations and less overhead).
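As a rough illustration of this trade-off (the heuristic below is our own assumption, not a recommendation from the package), one might cap the task count at a small multiple of the thread count:

```julia
using Base.Threads: nthreads

# Hypothetical heuristic: a few tasks per thread for load-balancing
# flexibility, but never more tasks than there are elements.
ntasks(x; per_thread=8) = min(length(x), per_thread * nthreads())

# Usage with the parallel_sum from above:
# parallel_sum(f_nonuniform, xs; n=ntasks(xs))
```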
"Load balancing" via RoundRobin
Apart from increasing the number of tasks/chunks to improve dynamic load balancing, we can also statically distribute the workload more evenly among tasks by choosing `split=RoundRobin()`. This way, each task/chunk receives workload from everywhere along the linear curve plotted above, which effectively balances the load.
Let's demonstrate and benchmark this effect for the case `n=nthreads()`, which essentially corresponds to dynamic load balancing being turned off.
```julia-repl
julia> @btime parallel_sum($f_nonuniform, $xs; n=nthreads(), split=Consecutive());
  857.914 ms (44 allocations: 3.36 KiB)

julia> @btime parallel_sum($f_nonuniform, $xs; n=nthreads(), split=RoundRobin());
  566.818 ms (45 allocations: 3.39 KiB)
```
Note that with `RoundRobin()` we obtain a runtime comparable to that of `n=16*nthreads()` with `Consecutive()` (dynamic load balancing, see above). At the same time, the `RoundRobin()` variant is more efficient in terms of allocations.
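One caveat worth keeping in mind (our remark): `RoundRobin()` changes which elements end up in which chunk, so it is only appropriate for reductions whose result doesn't depend on element order, like the sums here. A quick sanity check:

```julia
using ChunkSplitters: chunks, RoundRobin

x = rand(10^5);
# Order-independent reduction: summing round-robin chunks still gives
# the overall sum (up to floating-point rounding).
sum(sum, chunks(x; n=4, split=RoundRobin())) ≈ sum(x)  # true
```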
## `@threads :static`
The strategy of using `RoundRobin()` as a means of achieving static load balancing also works with `@threads`, and even with `@threads :static`.
```julia-repl
julia> function parallel_sum_atthreads(f, x; n=nthreads(), split=Consecutive())
           psums = zeros(Float64, n)
           @threads :static for (i, c) in enumerate(chunks(x; n=n, split=split))
               psums[i] = sum(f, c)
           end
           return sum(psums)
       end
parallel_sum_atthreads (generic function with 1 method)

julia> @btime parallel_sum_atthreads($f_nonuniform, $xs; n=nthreads(), split=Consecutive());
  850.475 ms (35 allocations: 3.53 KiB)

julia> @btime parallel_sum_atthreads($f_nonuniform, $xs; n=nthreads(), split=RoundRobin());
  567.175 ms (35 allocations: 3.53 KiB)
```