Public API
Exported
Macros
OhMyThreads.@tasks — Macro

@tasks for ... end

A macro to parallelize a for loop by spawning a set of tasks that can be run in parallel. The policy of how many tasks to spawn and how to distribute the iteration space among the tasks (and more) can be configured via @set statements in the loop body.

Supports reductions (@set reducer=<reducer function>) and collecting the results (@set collect=true).

Under the hood, the for loop is translated into corresponding parallel tforeach, tmapreduce, or tmap calls.
Examples
using OhMyThreads: @tasks
using Base.Threads: nthreads, threadid

@tasks for i in 1:3
    println(i)
end

@tasks for x in rand(10)
    @set reducer=+
    sin(x)
end

@tasks for i in 1:5
    @set collect=true
    i^2
end

@tasks for i in 1:100
    @set ntasks=4*nthreads()
    # non-uniform work...
end

@tasks for i in 1:5
    @set scheduler=:static
    println("i=", i, " → ", threadid())
end

@tasks for i in 1:100
    @set begin
        scheduler=:static
        chunksize=10
    end
    println("i=", i, " → ", threadid())
end
OhMyThreads.@set — Macro

@set name = value

This can be used inside a @tasks for ... end block to specify settings for the parallel execution of the loop.

Multiple settings are supported, either as separate @set statements or via @set begin ... end.
Settings
- reducer (e.g. reducer=+): Indicates that a reduction should be performed with the provided binary function. See tmapreduce for more information.
- collect (e.g. collect=true): Indicates that results should be collected (similar to map).
All other settings will be passed on to the underlying parallel functions (e.g. tmapreduce) as keyword arguments. Hence, you may provide whatever these functions accept as keyword arguments. Among others, this includes
- scheduler (e.g. scheduler=:static): Can be either a Scheduler or a Symbol (e.g. :dynamic, :static, :serial, or :greedy).
- init (e.g. init=0.0): Initial value to be used in a reduction (requires reducer=...).
Settings like ntasks, chunksize, and split can be used to tune the scheduling policy (if the selected scheduler supports them).
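For instance, combining a reducer with an explicit initial value (a minimal sketch based on the settings above):

using OhMyThreads: @tasks

@tasks for x in 1:10
    @set begin
        reducer = +
        init = 0.0
    end
    x / 2
end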
Note that the assignment is hoisted above the loop body, which means that the scope is not the scope of the loop (even though it looks like it) but rather the scope surrounding the loop body. (@macroexpand is a useful tool to inspect the generated code of the @tasks block.)
OhMyThreads.@local — Macro

@local name = value
@local name::T = value

Can be used inside a @tasks for ... end block to specify task-local values (TLV) via explicitly typed assignments. These values will be allocated once per task (rather than once per iteration) and can be re-used between different task-local iterations.

There can only be a single @local block in a @tasks for ... end block. To specify multiple TLVs, use @local begin ... end. Compared to regular assignments, there are some limitations though, e.g. TLVs can't reference each other.
Examples
using OhMyThreads: @tasks
using OhMyThreads.Tools: taskid

@tasks for i in 1:10
    @set begin
        scheduler=:dynamic
        ntasks=2
    end
    @local x = zeros(3) # TLV
    x .+= 1
    println(taskid(), " -> ", x)
end

@tasks for i in 1:10
    @local begin
        x = rand(Int, 3)
        M = rand(3, 3)
    end
    # ...
end
Task-local variables created by @local are by default constrained to their inferred type, but if you need to, you can specify a different type during declaration:

@tasks for i in 1:10
    @local x::Vector{Float64} = some_hard_to_infer_setup_function()
    # ...
end
The right-hand side of the assignment is hoisted outside of the loop body and captured as a closure that is used to initialize the task-local value. This means that the scope of the closure is not the scope of the loop (even though it looks like it) but rather the scope surrounding the loop body. (@macroexpand is a useful tool to inspect the generated code of the @tasks block.)
OhMyThreads.@only_one — Macro

@only_one begin ... end

This can be used inside a @tasks for ... end block to mark a region of code to be executed by only one of the parallel tasks (all other tasks skip over this region).
Example
using OhMyThreads: @tasks

@tasks for i in 1:10
    @set ntasks = 10

    println(i, ": before")
    @only_one begin
        println(i, ": only printed by a single task")
        sleep(1)
    end
    println(i, ": after")
end
OhMyThreads.@one_by_one — Macro

@one_by_one begin ... end

This can be used inside a @tasks for ... end block to mark a region of code to be executed by one parallel task at a time (i.e. exclusive access). The order may be arbitrary and non-deterministic.
Example
using OhMyThreads: @tasks

@tasks for i in 1:10
    @set ntasks = 10

    println(i, ": before")
    @one_by_one begin
        println(i, ": one task at a time")
        sleep(0.5)
    end
    println(i, ": after")
end
Functions
OhMyThreads.tmapreduce — Function

tmapreduce(f, op, A::AbstractArray...;
           [scheduler::Union{Scheduler, Symbol} = :dynamic],
           [outputtype::Type = Any],
           [init])

A multithreaded function like Base.mapreduce. Perform a reduction over A, applying a single-argument function f to each element, and then combining them with the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.
Example:
using OhMyThreads: tmapreduce
tmapreduce(√, +, [1, 2, 3, 4, 5])
is the parallelized version of sum(√, [1, 2, 3, 4, 5]) in the form
(√1 + √2) + (√3 + √4) + √5
Keyword arguments:
- scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.
- outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
- init: initial value of the reduction. Will be forwarded to mapreduce for the task-local sequential parts of the calculation.
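For instance, a reduction with an explicit initial value (a minimal sketch):

tmapreduce(√, +, [1, 2, 3, 4, 5]; init=0.0)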
In addition, tmapreduce accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:
tmapreduce(√, +, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)
However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
OhMyThreads.treduce — Function

treduce(op, A::AbstractArray...;
        [scheduler::Union{Scheduler, Symbol} = :dynamic],
        [outputtype::Type = Any],
        [init])

A multithreaded function like Base.reduce. Perform a reduction over A using the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.
Example:
using OhMyThreads: treduce
treduce(+, [1, 2, 3, 4, 5])
is the parallelized version of sum([1, 2, 3, 4, 5]) in the form
(1 + 2) + (3 + 4) + 5
Keyword arguments:
- scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.
- outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
- init: initial value of the reduction. Will be forwarded to mapreduce for the task-local sequential parts of the calculation.
In addition, treduce accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:
treduce(+, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)
However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
OhMyThreads.tmap — Function

tmap(f, [OutputElementType], A::AbstractArray...;
     [scheduler::Union{Scheduler, Symbol} = :dynamic])

A multithreaded function like Base.map. Creates a new container similar to A and fills it in parallel such that the i-th element is equal to f(A[i]).

The optional argument OutputElementType will select a specific element type for the returned container, and will generally incur fewer allocations than the version where OutputElementType is not specified.
Example:
using OhMyThreads: tmap
tmap(sin, 1:10)
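Passing the optional OutputElementType explicitly (a minimal sketch):

tmap(sin, Float64, 1:10) # returns a Vector{Float64}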
Keyword arguments:
- scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.
In addition, tmap accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:
tmap(sin, 1:10; chunksize=2, scheduler=:static)
However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
OhMyThreads.tmap! — Function

tmap!(f, out, A::AbstractArray...;
      [scheduler::Union{Scheduler, Symbol} = :dynamic])

A multithreaded function like Base.map!. In parallel on multiple tasks, this function assigns out[i] = f(A[i]) for each index i of A and out.
Keyword arguments:
- scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.
In addition, tmap! accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
OhMyThreads.tforeach — Function

tforeach(f, A::AbstractArray...;
         [scheduler::Union{Scheduler, Symbol} = :dynamic]) :: Nothing

A multithreaded function like Base.foreach. Apply f to each element of A on multiple parallel tasks, and return nothing. I.e. it is the parallel equivalent of

for x in A
    f(x)
end
Example:
using OhMyThreads: tforeach

tforeach(1:10) do i
    println(i^2)
end
Keyword arguments:
- scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.
In addition, tforeach accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:
tforeach(1:10; chunksize=2, scheduler=:static) do i
    println(i^2)
end
However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
OhMyThreads.tcollect — Function

tcollect([OutputElementType], gen::Union{AbstractArray, Generator{<:AbstractArray}};
         [scheduler::Union{Scheduler, Symbol} = :dynamic])

A multithreaded function like Base.collect. Essentially just calls tmap on the generator function and inputs.

The optional argument OutputElementType will select a specific element type for the returned container, and will generally incur fewer allocations than the version where OutputElementType is not specified.
Example:
using OhMyThreads: tcollect
tcollect(sin(i) for i in 1:10)
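Passing the optional OutputElementType explicitly (a minimal sketch; note the parentheses around the generator):

tcollect(Float64, (sin(i) for i in 1:10))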
Keyword arguments:
- scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.
In addition, tcollect accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:
tcollect(sin(i) for i in 1:10; chunksize=2, scheduler=:static)
However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
OhMyThreads.treducemap — Function

treducemap(op, f, A::AbstractArray...;
           [scheduler::Union{Scheduler, Symbol} = :dynamic],
           [outputtype::Type = Any],
           [init])

Like tmapreduce except the order of the f and op arguments is switched. This is sometimes convenient with do-block notation. Perform a reduction over A, applying a single-argument function f to each element, and then combining them with the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.
Example:
using OhMyThreads: treducemap
treducemap(+, √, [1, 2, 3, 4, 5])
is the parallelized version of sum(√, [1, 2, 3, 4, 5]) in the form
(√1 + √2) + (√3 + √4) + √5
Keyword arguments:
- scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.
- outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
- init: initial value of the reduction. Will be forwarded to mapreduce for the task-local sequential parts of the calculation.
In addition, treducemap accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:
treducemap(+, √, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)
However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
Schedulers
OhMyThreads.Schedulers.Scheduler — Type

Supertype for all available schedulers:

- DynamicScheduler: default dynamic scheduler
- StaticScheduler: low-overhead static scheduler
- GreedyScheduler: greedy load-balancing scheduler
- SerialScheduler: serial (non-parallel) execution
OhMyThreads.Schedulers.DynamicScheduler — Type

DynamicScheduler (aka :dynamic)

The default dynamic scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. The tasks are assigned to threads by Julia's dynamic scheduler and are non-sticky, that is, they can migrate between threads.
Generally preferred since it is flexible, can provide load balancing, and is composable with other multithreaded code.
Keyword arguments:
- nchunks::Integer or ntasks::Integer (default nthreads(threadpool)):
  - Determines the number of chunks (and thus also the number of parallel tasks).
  - Increasing nchunks can help with load balancing, but at the expense of creating more overhead. For nchunks <= nthreads() there are not enough chunks for any load balancing.
  - Setting nchunks < nthreads() is an effective way to use only a subset of the available threads.
- chunksize::Integer (default not set):
  - Specifies the desired chunk size (instead of the number of chunks).
  - The options chunksize and nchunks/ntasks are mutually exclusive (only one may be a positive integer).
- split::Union{Symbol, OhMyThreads.Split} (default OhMyThreads.Consecutive()):
  - Determines how the collection is divided into chunks (if chunking=true). By default, each chunk consists of contiguous elements and order is maintained.
  - See ChunkSplitters.jl for more details and available options. We also allow users to pass :consecutive in place of Consecutive(), and :roundrobin in place of RoundRobin().
  - Beware that for split=OhMyThreads.RoundRobin() the order of elements isn't maintained and a reducer function must not only be associative but also commutative!
- chunking::Bool (default true):
  - Controls whether input elements are grouped into chunks (true) or not (false).
  - For chunking=false, the arguments nchunks/ntasks, chunksize, and split are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this might spawn many(!) tasks and can be costly!
- threadpool::Symbol (default :default):
  - Possible options are :default and :interactive.
  - The high-priority pool :interactive should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without yielding as it can interfere with heartbeat processes.
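For illustration, a minimal sketch that constructs a configured DynamicScheduler and passes it explicitly:

using OhMyThreads: tmap
using OhMyThreads.Schedulers: DynamicScheduler

tmap(sin, 1:100; scheduler=DynamicScheduler(; nchunks=8, split=:consecutive))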
OhMyThreads.Schedulers.StaticScheduler — Type

StaticScheduler (aka :static)

A static low-overhead scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. The tasks are statically assigned to threads up front and are made sticky, that is, they are guaranteed to stay on the assigned threads (no task migration).

Can sometimes be more performant than DynamicScheduler when the workload is (close to) uniform and, because of the lower overhead, for small workloads. Isn't well composable with other multithreaded code though.
Keyword arguments:
- nchunks::Integer or ntasks::Integer (default nthreads()):
  - Determines the number of chunks (and thus also the number of parallel tasks).
  - Setting nchunks < nthreads() is an effective way to use only a subset of the available threads.
  - For nchunks > nthreads() the chunks will be distributed to the available threads in a round-robin fashion.
- chunksize::Integer (default not set):
  - Specifies the desired chunk size (instead of the number of chunks).
  - The options chunksize and nchunks/ntasks are mutually exclusive (only one may be non-zero).
- chunking::Bool (default true):
  - Controls whether input elements are grouped into chunks (true) or not (false).
  - For chunking=false, the arguments nchunks/ntasks, chunksize, and split are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this might spawn many(!) tasks and can be costly!
- split::Union{Symbol, OhMyThreads.Split} (default OhMyThreads.Consecutive()):
  - Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained.
  - See ChunkSplitters.jl for more details and available options. We also allow users to pass :consecutive in place of Consecutive(), and :roundrobin in place of RoundRobin().
  - Beware that for split=OhMyThreads.RoundRobin() the order of elements isn't maintained and a reducer function must not only be associative but also commutative!
OhMyThreads.Schedulers.GreedyScheduler — Type

GreedyScheduler (aka :greedy)

A greedy dynamic scheduler. The elements are put into a shared workqueue and dynamic, non-sticky tasks are spawned to process the elements of the queue, with each task taking a new element from the queue as soon as the previous one is done.

Note that elements are processed in a non-deterministic order, and thus a potential reducing function must be commutative in addition to being associative, or you could get incorrect results!

Can be a good choice for load-balancing slower, uneven computations, but does carry some additional overhead.
Keyword arguments:
- ntasks::Int (default nthreads()):
  - Determines the number of parallel tasks to be spawned.
  - Setting ntasks < nthreads() is an effective way to use only a subset of the available threads.
- chunking::Bool (default false):
  - Controls whether input elements are grouped into chunks (true) or not (false) before being put into the shared workqueue. This can improve performance, especially if there are many iterations, each of which is computationally cheap.
  - If nchunks or chunksize are explicitly specified, chunking will be automatically set to true.
- nchunks::Integer (default 10 * nthreads()):
  - Determines the number of chunks (that will eventually be put into the shared workqueue).
  - Increasing nchunks can help with load balancing. For nchunks <= nthreads() there are not enough chunks for any load balancing.
- chunksize::Integer (default not set):
  - Specifies the desired chunk size (instead of the number of chunks).
  - The options chunksize and nchunks are mutually exclusive (only one may be a positive integer).
- split::Union{Symbol, OhMyThreads.Split} (default OhMyThreads.RoundRobin()):
  - Determines how the collection is divided into chunks (if chunking=true).
  - See ChunkSplitters.jl for more details and available options. We also allow users to pass :consecutive in place of Consecutive(), and :roundrobin in place of RoundRobin().
OhMyThreads.Schedulers.SerialScheduler — Type

SerialScheduler (aka :serial)

A scheduler for turning off any multithreading and running the code in serial. It aims to make parallel functions like, e.g., tmapreduce(sin, +, 1:100) behave like their serial counterparts, e.g., mapreduce(sin, +, 1:100).
Re-exported
- OhMyThreads.chunks: see ChunkSplitters.chunks
- OhMyThreads.index_chunks: see ChunkSplitters.index_chunks
Public but not exported
- OhMyThreads.@spawn: see StableTasks.@spawn
- OhMyThreads.@spawnat: see StableTasks.@spawnat
- OhMyThreads.@fetch: see StableTasks.@fetch
- OhMyThreads.@fetchfrom: see StableTasks.@fetchfrom
- OhMyThreads.TaskLocalValue: see TaskLocalValues.TaskLocalValue
- OhMyThreads.Split: see ChunkSplitters.Split
- OhMyThreads.Consecutive: see ChunkSplitters.Consecutive
- OhMyThreads.RoundRobin: see ChunkSplitters.RoundRobin
OhMyThreads.WithTaskLocals — Type

struct WithTaskLocals{F, TLVs <: Tuple{Vararg{TaskLocalValue}}} <: Function

This callable function-like object is meant to represent a function which closes over some TaskLocalValues. That is, if you do

TLV{T} = TaskLocalValue{T}
f = WithTaskLocals((TLV{Int}(() -> 1), TLV{Int}(() -> 2))) do (x, y)
    z -> (x + y)/z
end
then that is equivalent to
g = let x = TLV{Int}(() -> 1), y = TLV{Int}(() -> 2)
    z -> let x = x[], y = y[]
        (x + y)/z
    end
end
however, the main difference is that you can call promise_task_local on a WithTaskLocals closure in order to turn it into something equivalent to

let x = x[], y = y[]
    z -> (x + y)/z
end
which doesn't have the overhead of accessing the task_local_storage each time the closure is called. This of course will lose the safety advantages of TaskLocalValue, so you should never do f_local = promise_task_local(f) and then pass f_local to some unknown function, because if that unknown function calls f_local on a new task, you'll hit a race condition.
OhMyThreads.promise_task_local — Function

promise_task_local(f) = f
promise_task_local(f::WithTaskLocals) = f.inner_func(map(x -> x[], f.tasklocals))

Take a WithTaskLocals closure, grab the TaskLocalValues, and pass them to the closure. That is, it turns a WithTaskLocals closure from the equivalent of
TLV{T} = TaskLocalValue{T}
let x = TLV{Int}(() -> 1), y = TLV{Int}(() -> 2)
    z -> let x = x[], y = y[]
        (x + y)/z
    end
end
into the equivalent of
let x = TLV{Int}(() -> 1), y = TLV{Int}(() -> 2)
    let x = x[], y = y[]
        z -> (x + y)/z
    end
end
which doesn't have the overhead of accessing the task_local_storage each time the closure is called. This of course will lose the safety advantages of TaskLocalValue, so you should never do f_local = promise_task_local(f) and then pass f_local to some unknown function, because if that unknown function calls f_local on a new task, you'll hit a race condition.
OhMyThreads.ChannelLike — Type

ChannelLike(itr)

This struct wraps an indexable object such that it can be iterated by concurrent tasks in a safe manner similar to a Channel.

ChannelLike(itr) is conceptually similar to:

Channel{eltype(itr)}(length(itr)) do ch
    foreach(i -> put!(ch, i), itr)
end

i.e. creating a channel, put!-ing all elements of itr into it, and closing it. The advantage is that ChannelLike doesn't copy the data.
Examples
ch = OhMyThreads.ChannelLike(1:5)
@sync for taskid in 1:2
    Threads.@spawn begin
        for i in ch
            println("Task #$taskid processing item $i")
            sleep(1 / i)
        end
    end
end
# output
Task #1 processing item 1
Task #2 processing item 2
Task #2 processing item 3
Task #2 processing item 4
Task #1 processing item 5
Note that ChannelLike is stateful (just like a Channel), so you can't iterate over it twice.

The wrapped iterator must support firstindex(itr)::Int, lastindex(itr)::Int, and getindex(itr, ::Int).