Public API

Exported

Macros

OhMyThreads.@tasks (Macro)
@tasks for ... end

A macro to parallelize a for loop by spawning a set of tasks that can be run in parallel. The policy of how many tasks to spawn and how to distribute the iteration space among the tasks (and more) can be configured via @set statements in the loop body.

Supports reductions (@set reducer=<reducer function>) and collecting the results (@set collect=true).

Under the hood, the for loop is translated into corresponding parallel tforeach, tmapreduce, or tmap calls.

See also: @set, @local
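
For instance, the reducer example below is, under the hood, roughly equivalent to this direct call (a sketch):

using OhMyThreads: tmapreduce

tmapreduce(sin, +, rand(10))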

Examples

using OhMyThreads: @tasks
using Base.Threads: nthreads, threadid
@tasks for i in 1:3
    println(i)
end
@tasks for x in rand(10)
    @set reducer=+
    sin(x)
end
@tasks for i in 1:5
    @set collect=true
    i^2
end
@tasks for i in 1:100
    @set ntasks=4*nthreads()
    # non-uniform work...
end
@tasks for i in 1:5
    @set scheduler=:static
    println("i=", i, " → ", threadid())
end
@tasks for i in 1:100
    @set begin
        scheduler=:static
        chunksize=10
    end
    println("i=", i, " → ", threadid())
end
OhMyThreads.@set (Macro)
@set name = value

This can be used inside a @tasks for ... end block to specify settings for the parallel execution of the loop.

Multiple settings are supported, either as separate @set statements or via @set begin ... end.

Settings

  • reducer (e.g. reducer=+): Indicates that a reduction should be performed with the provided binary function. See tmapreduce for more information.
  • collect (e.g. collect=true): Indicates that results should be collected (similar to map).

All other settings will be passed on to the underlying parallel functions (e.g. tmapreduce) as keyword arguments. Hence, you may provide whatever these functions accept as keyword arguments. Among others, this includes

  • scheduler (e.g. scheduler=:static): Can be either a Scheduler or a Symbol (e.g. :dynamic, :static, :serial, or :greedy).
  • init (e.g. init=0.0): Initial value to be used in a reduction (requires reducer=...).

Settings such as ntasks, chunksize, and split can be used to tune the scheduling policy (if the selected scheduler supports them).
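
For example, a reduction with an initial value can be written as (a minimal sketch):

using OhMyThreads: @tasks

@tasks for x in rand(10)
    @set begin
        reducer = +
        init = 0.0
    end
    sin(x)
end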

Note that the assignment is hoisted above the loop body, which means that the scope is not the scope of the loop (even though it looks like it) but rather the scope surrounding the loop body. (@macroexpand is a useful tool to inspect the generated code of the @tasks block.)
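
For instance, a setting may reference variables from that surrounding scope (n below is illustrative, not part of the API):

using OhMyThreads: @tasks
using Base.Threads: nthreads

n = nthreads()
@tasks for i in 1:100
    @set ntasks = 2n  # `n` is resolved in the scope surrounding the loop
    # non-uniform work...
end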

OhMyThreads.@local (Macro)
@local name = value

@local name::T = value

Can be used inside a @tasks for ... end block to specify task-local values (TLV) via explicitly typed assignments. These values will be allocated once per task (rather than once per iteration) and can be re-used across different iterations within the same task.

There can only be a single @local block in a @tasks for ... end block. To specify multiple TLVs, use @local begin ... end. Compared to regular assignments, there are some limitations, though: e.g., TLVs can't reference each other.

Examples

using OhMyThreads: @tasks
using OhMyThreads.Tools: taskid

@tasks for i in 1:10
    @set begin
        scheduler=:dynamic
        ntasks=2
    end
    @local x = zeros(3) # TLV

    x .+= 1
    println(taskid(), " -> ", x)
end
@tasks for i in 1:10
    @local begin
        x = rand(Int, 3)
        M = rand(3, 3)
    end
    # ...
end

Task local variables created by @local are by default constrained to their inferred type, but if you need to, you can specify a different type during declaration:

@tasks for i in 1:10
    @local x::Vector{Float64} = some_hard_to_infer_setup_function()
    # ...
end

The right-hand side of the assignment is hoisted outside of the loop body and captured as a closure that is used to initialize the task-local value. This means that the scope of the closure is not the scope of the loop (even though it looks like it) but rather the scope surrounding the loop body. (@macroexpand is a useful tool to inspect the generated code of the @tasks block.)
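
For instance, the right-hand side may use variables from the surrounding scope, but not the loop variable (a sketch; n is illustrative):

using OhMyThreads: @tasks

n = 3
@tasks for i in 1:10
    @local x = zeros(n)  # `n` comes from the surrounding scope; the loop variable `i` is not available here
    x .+= i
end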

OhMyThreads.@only_one (Macro)
@only_one begin ... end

This can be used inside a @tasks for ... end block to mark a region of code to be executed by only one of the parallel tasks (all other tasks skip over this region).

Example

using OhMyThreads: @tasks

@tasks for i in 1:10
    @set ntasks = 10

    println(i, ": before")
    @only_one begin
        println(i, ": only printed by a single task")
        sleep(1)
    end
    println(i, ": after")
end
OhMyThreads.@one_by_one (Macro)
@one_by_one begin ... end

This can be used inside a @tasks for ... end block to mark a region of code to be executed by one parallel task at a time (i.e. exclusive access). The order may be arbitrary and non-deterministic.

Example

using OhMyThreads: @tasks

@tasks for i in 1:10
    @set ntasks = 10

    println(i, ": before")
    @one_by_one begin
        println(i, ": one task at a time")
        sleep(0.5)
    end
    println(i, ": after")
end

Functions

OhMyThreads.tmapreduce (Function)
tmapreduce(f, op, A::AbstractArray...;
           [scheduler::Union{Scheduler, Symbol} = :dynamic],
           [outputtype::Type = Any],
           [init])

A multithreaded function like Base.mapreduce. Perform a reduction over A, applying a single-argument function f to each element, and then combining them with the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.

Example:

using OhMyThreads: tmapreduce

tmapreduce(√, +, [1, 2, 3, 4, 5])

is the parallelized version of sum(√, [1, 2, 3, 4, 5]) in the form

(√1 + √2) + (√3 + √4) + √5

Keyword arguments:

  • scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.
  • outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
  • init: initial value of the reduction. Will be forwarded to mapreduce for the task-local sequential parts of the calculation.

In addition, tmapreduce accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:

tmapreduce(√, +, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)

However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
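
To combine such options with an explicit Scheduler object, pass them to the scheduler constructor instead (a sketch):

using OhMyThreads: tmapreduce, StaticScheduler

tmapreduce(√, +, [1, 2, 3, 4, 5]; scheduler=StaticScheduler(; chunksize=2))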

OhMyThreads.treduce (Function)
treduce(op, A::AbstractArray...;
        [scheduler::Union{Scheduler, Symbol} = :dynamic],
        [outputtype::Type = Any],
        [init])

A multithreaded function like Base.reduce. Perform a reduction over A using the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.

Example:

using OhMyThreads: treduce

treduce(+, [1, 2, 3, 4, 5])

is the parallelized version of sum([1, 2, 3, 4, 5]) in the form

(1 + 2) + (3 + 4) + 5

Keyword arguments:

  • scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.
  • outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
  • init: initial value of the reduction. Will be forwarded to mapreduce for the task-local sequential parts of the calculation.

In addition, treduce accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:

treduce(+, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)

However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).

OhMyThreads.tmap (Function)
tmap(f, [OutputElementType], A::AbstractArray...;
     [scheduler::Union{Scheduler, Symbol} = :dynamic])

A multithreaded function like Base.map. Creates a new container similar to A and fills it in parallel such that the ith element is equal to f(A[i]).

The optional argument OutputElementType will select a specific element type for the returned container, and will generally incur fewer allocations than the version where OutputElementType is not specified.

Example:

using OhMyThreads: tmap

tmap(sin, 1:10)
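
With an explicit output element type (generally fewer allocations; a sketch):

tmap(sin, Float64, 1:10)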

Keyword arguments:

  • scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.

In addition, tmap accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:

tmap(sin, 1:10; chunksize=2, scheduler=:static)

However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).

OhMyThreads.tmap! (Function)
tmap!(f, out, A::AbstractArray...;
      [scheduler::Union{Scheduler, Symbol} = :dynamic])

A multithreaded function like Base.map!. In parallel on multiple tasks, this function assigns out[i] = f(A[i]) for each index i of A and out.
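
Example (a minimal sketch):

using OhMyThreads: tmap!

out = zeros(10)
tmap!(sin, out, 1:10)  # afterwards, out[i] == sin(i)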

Keyword arguments:

  • scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.

In addition, tmap! accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).

OhMyThreads.tforeach (Function)
tforeach(f, A::AbstractArray...;
         [scheduler::Union{Scheduler, Symbol} = :dynamic]) :: Nothing

A multithreaded function like Base.foreach. Apply f to each element of A on multiple parallel tasks, and return nothing. I.e. it is the parallel equivalent of

for x in A
    f(x)
end

Example:

using OhMyThreads: tforeach

tforeach(1:10) do i
    println(i^2)
end

Keyword arguments:

  • scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.

In addition, tforeach accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:

tforeach(1:10; chunksize=2, scheduler=:static) do i
    println(i^2)
end

However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).

OhMyThreads.tcollect (Function)
tcollect([OutputElementType], gen::Union{AbstractArray, Generator{<:AbstractArray}};
         [scheduler::Union{Scheduler, Symbol} = :dynamic])

A multithreaded function like Base.collect. Essentially just calls tmap on the generator function and inputs.

The optional argument OutputElementType will select a specific element type for the returned container, and will generally incur fewer allocations than the version where OutputElementType is not specified.

Example:

using OhMyThreads: tcollect

tcollect(sin(i) for i in 1:10)
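
With an explicit output element type (a sketch):

tcollect(Float64, sin(i) for i in 1:10)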

Keyword arguments:

  • scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.

In addition, tcollect accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:

tcollect(sin(i) for i in 1:10; chunksize=2, scheduler=:static)

However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).

OhMyThreads.treducemap (Function)
treducemap(op, f, A::AbstractArray...;
           [scheduler::Union{Scheduler, Symbol} = :dynamic],
           [outputtype::Type = Any],
           [init])

Like tmapreduce except the order of the f and op arguments is switched. This is sometimes convenient with do-block notation. Perform a reduction over A, applying a single-argument function f to each element, and then combining them with the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.

Example:

using OhMyThreads: treducemap

treducemap(+, √, [1, 2, 3, 4, 5])

is the parallelized version of sum(√, [1, 2, 3, 4, 5]) in the form

(√1 + √2) + (√3 + √4) + √5

Keyword arguments:

  • scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.
  • outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
  • init: initial value of the reduction. Will be forwarded to mapreduce for the task-local sequential parts of the calculation.

In addition, treducemap accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:

treducemap(+, √, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)

However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).


Schedulers

OhMyThreads.Schedulers.DynamicScheduler (Type)
DynamicScheduler (aka :dynamic)

The default dynamic scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. The tasks are assigned to threads by Julia's dynamic scheduler and are non-sticky, that is, they can migrate between threads.

Generally preferred since it is flexible, can provide load balancing, and is composable with other multithreaded code.

Keyword arguments:

  • nchunks::Integer or ntasks::Integer (default nthreads(threadpool)):
    • Determines the number of chunks (and thus also the number of parallel tasks).
    • Increasing nchunks can help with load balancing, but at the expense of creating more overhead. For nchunks <= nthreads() there are not enough chunks for any load balancing.
    • Setting nchunks < nthreads() is an effective way to use only a subset of the available threads.
  • chunksize::Integer (default not set)
    • Specifies the desired chunk size (instead of the number of chunks).
    • The options chunksize and nchunks/ntasks are mutually exclusive (only one may be a positive integer).
  • split::Union{Symbol, OhMyThreads.Split} (default OhMyThreads.Consecutive()):
    • Determines how the collection is divided into chunks (if chunking=true). By default, each chunk consists of contiguous elements and order is maintained.
    • See ChunkSplitters.jl for more details and available options. We also allow users to pass :consecutive in place of Consecutive() and :roundrobin in place of RoundRobin().
    • Beware that for split=OhMyThreads.RoundRobin() the order of elements isn't maintained and a reducer function must not only be associative but also commutative!
  • chunking::Bool (default true):
    • Controls whether input elements are grouped into chunks (true) or not (false).
    • For chunking=false, the arguments nchunks/ntasks, chunksize, and split are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this might spawn many(!) tasks and can be costly!
  • threadpool::Symbol (default :default):
    • Possible options are :default and :interactive.
    • The high-priority pool :interactive should be used very carefully, since tasks on this threadpool should not be allowed to run for a long time without yielding, as this can interfere with heartbeat processes.
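
A usage sketch with an explicitly constructed scheduler (the parameter values are illustrative):

using OhMyThreads: tmapreduce, DynamicScheduler

tmapreduce(sin, +, 1:100; scheduler=DynamicScheduler(; ntasks=8))
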
OhMyThreads.Schedulers.StaticScheduler (Type)
StaticScheduler (aka :static)

A static low-overhead scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. The tasks are statically assigned to threads up front and are made sticky, that is, they are guaranteed to stay on the assigned threads (no task migration).

Can sometimes be more performant than DynamicScheduler when the workload is (close to) uniform and, because of the lower overhead, for small workloads. However, it isn't well composable with other multithreaded code.

Keyword arguments:

  • nchunks::Integer or ntasks::Integer (default nthreads()):
    • Determines the number of chunks (and thus also the number of parallel tasks).
    • Setting nchunks < nthreads() is an effective way to use only a subset of the available threads.
    • For nchunks > nthreads() the chunks will be distributed to the available threads in a round-robin fashion.
  • chunksize::Integer (default not set)
    • Specifies the desired chunk size (instead of the number of chunks).
    • The options chunksize and nchunks/ntasks are mutually exclusive (only one may be non-zero).
  • chunking::Bool (default true):
    • Controls whether input elements are grouped into chunks (true) or not (false).
    • For chunking=false, the arguments nchunks/ntasks, chunksize, and split are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this might spawn many(!) tasks and can be costly!
  • split::Union{Symbol, OhMyThreads.Split} (default OhMyThreads.Consecutive()):
    • Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained.
    • See ChunkSplitters.jl for more details and available options. We also allow users to pass :consecutive in place of Consecutive() and :roundrobin in place of RoundRobin().
    • Beware that for split=OhMyThreads.RoundRobin() the order of elements isn't maintained and a reducer function must not only be associative but also commutative!
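
A usage sketch (chunksize=2 is illustrative):

using OhMyThreads: tmap, StaticScheduler

tmap(sin, 1:10; scheduler=StaticScheduler(; chunksize=2))
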
OhMyThreads.Schedulers.GreedyScheduler (Type)
GreedyScheduler (aka :greedy)

A greedy dynamic scheduler. The elements are put into a shared workqueue and dynamic, non-sticky tasks are spawned to process the elements of the queue, with each task taking a new element from the queue as soon as the previous one is done.

Note that elements are processed in a non-deterministic order, and thus a potential reducing function must be commutative in addition to being associative, or you could get incorrect results!

Can be a good choice for load-balancing slower, uneven computations, but it does carry some additional overhead.

Keyword arguments:

  • ntasks::Int (default nthreads()):
    • Determines the number of parallel tasks to be spawned.
    • Setting ntasks < nthreads() is an effective way to use only a subset of the available threads.
  • chunking::Bool (default false):
    • Controls whether input elements are grouped into chunks (true) or not (false) before being put into the shared workqueue. This can improve performance, especially if there are many iterations, each of which is computationally cheap.
    • If nchunks or chunksize are explicitly specified, chunking will be automatically set to true.
  • nchunks::Integer (default 10 * nthreads()):
    • Determines the number of chunks (that will eventually be put into the shared workqueue).
    • Increasing nchunks can help with load balancing. For nchunks <= nthreads() there are not enough chunks for any load balancing.
  • chunksize::Integer (default not set)
    • Specifies the desired chunk size (instead of the number of chunks).
    • The options chunksize and nchunks are mutually exclusive (only one may be a positive integer).
  • split::Union{Symbol, OhMyThreads.Split} (default OhMyThreads.RoundRobin()):
    • Determines how the collection is divided into chunks (if chunking=true).
    • See ChunkSplitters.jl for more details and available options. We also allow users to pass :consecutive in place of Consecutive() and :roundrobin in place of RoundRobin().
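
A usage sketch for an uneven workload (note that +, as required here, is commutative):

using OhMyThreads: tmapreduce, GreedyScheduler

tmapreduce(i -> sum(sin, 1:i), +, 1:100; scheduler=GreedyScheduler())
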
OhMyThreads.Schedulers.SerialScheduler (Type)
SerialScheduler (aka :serial)

A scheduler for turning off any multithreading and running the code in serial. It aims to make parallel functions such as tmapreduce(sin, +, 1:100) behave like their serial counterparts, e.g. mapreduce(sin, +, 1:100).
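
A usage sketch:

using OhMyThreads: tmapreduce

tmapreduce(sin, +, 1:100; scheduler=:serial)  # same result as mapreduce(sin, +, 1:100)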


Re-exported

OhMyThreads.chunks: see ChunkSplitters.chunks
OhMyThreads.index_chunks: see ChunkSplitters.index_chunks

Public but not exported

OhMyThreads.@spawn: see StableTasks.@spawn
OhMyThreads.@spawnat: see StableTasks.@spawnat
OhMyThreads.@fetch: see StableTasks.@fetch
OhMyThreads.@fetchfrom: see StableTasks.@fetchfrom
OhMyThreads.TaskLocalValue: see TaskLocalValues.TaskLocalValue
OhMyThreads.Split: see ChunkSplitters.Split
OhMyThreads.Consecutive: see ChunkSplitters.Consecutive
OhMyThreads.RoundRobin: see ChunkSplitters.RoundRobin
OhMyThreads.WithTaskLocals (Type)
struct WithTaskLocals{F, TLVs <: Tuple{Vararg{TaskLocalValue}}} <: Function

This callable function-like object is meant to represent a function which closes over some TaskLocalValues. That is, if you do

TLV{T} = TaskLocalValue{T}
f = WithTaskLocals((TLV{Int}(() -> 1), TLV{Int}(() -> 2))) do (x, y)
    z -> (x + y)/z
end

then that is equivalent to

g = let x = TLV{Int}(() -> 1), y = TLV{Int}(() -> 2)
    z -> let x = x[], y=y[]
        (x + y)/z
    end
end

however, the main difference is that you can call promise_task_local on a WithTaskLocals closure in order to turn it into something equivalent to

let x=x[], y=y[]
    z -> (x + y)/z
end

which doesn't have the overhead of accessing the task_local_storage each time the closure is called. This of course will lose the safety advantages of TaskLocalValue, so you should never do f_local = promise_task_local(f) and then pass f_local to some unknown function, because if that unknown function calls f_local on a new task, you'll hit a race condition.

OhMyThreads.promise_task_local (Function)
promise_task_local(f) = f
promise_task_local(f::WithTaskLocals) = f.inner_func(map(x -> x[], f.tasklocals))

Take a WithTaskLocals closure, grab the TaskLocalValues, and pass them to the closure. That is, it turns a WithTaskLocals closure from the equivalent of

TLV{T} = TaskLocalValue{T}
let x = TLV{Int}(() -> 1), y = TLV{Int}(() -> 2)
    z -> let x = x[], y=y[]
        (x + y)/z
    end
end

into the equivalent of

let x = TLV{Int}(() -> 1), y = TLV{Int}(() -> 2)
    let x = x[], y = y[]
        z -> (x + y)/z
    end
end

which doesn't have the overhead of accessing the task_local_storage each time the closure is called. This of course will lose the safety advantages of TaskLocalValue, so you should never do f_local = promise_task_local(f) and then pass f_local to some unknown function, because if that unknown function calls f_local on a new task, you'll hit a race condition.
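
A combined usage sketch (mirrors the definitions above; TaskLocalValue is the re-export from TaskLocalValues.jl):

using OhMyThreads: WithTaskLocals, promise_task_local, TaskLocalValue

TLV{T} = TaskLocalValue{T}
f = WithTaskLocals((TLV{Int}(() -> 1), TLV{Int}(() -> 2))) do (x, y)
    z -> (x + y) / z
end
f_local = promise_task_local(f)  # safe only within the current task
f_local(3.0)  # == 1.0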

OhMyThreads.ChannelLike (Type)
ChannelLike(itr)

This struct wraps an indexable object such that it can be iterated by concurrent tasks in a safe manner similar to a Channel.

ChannelLike(itr) is conceptually similar to:

Channel{eltype(itr)}(length(itr)) do ch
    foreach(i -> put!(ch, i), itr)
end

i.e. creating a channel, put!ing all elements of itr into it and closing it. The advantage is that ChannelLike doesn't copy the data.

Examples

ch = OhMyThreads.ChannelLike(1:5)

@sync for taskid in 1:2
    Threads.@spawn begin
        for i in ch
            println("Task #$taskid processing item $i")
            sleep(1 / i)
        end
    end
end

# output

Task #1 processing item 1
Task #2 processing item 2
Task #2 processing item 3
Task #2 processing item 4
Task #1 processing item 5

Note that ChannelLike is stateful (just like a Channel), so you can't iterate over it twice.

The wrapped iterator must support firstindex(itr)::Int, lastindex(itr)::Int and getindex(itr, ::Int).
