Public API

Exported

Macros

OhMyThreads.@tasks (Macro)
@tasks for ... end

A macro to parallelize a for loop by spawning a set of tasks that can be run in parallel. The policy of how many tasks to spawn and how to distribute the iteration space among the tasks (and more) can be configured via @set statements in the loop body.

Supports reductions (@set reducer=<reducer function>) and collecting the results (@set collect=true).

Under the hood, the for loop is translated into corresponding parallel tforeach, tmapreduce, or tmap calls.

See also: @set, @local

Examples

using OhMyThreads: @tasks
using Base.Threads: nthreads, threadid
@tasks for i in 1:3
    println(i)
end
@tasks for x in rand(10)
    @set reducer=+
    sin(x)
end
@tasks for i in 1:5
    @set collect=true
    i^2
end
@tasks for i in 1:100
    @set ntasks=4*nthreads()
    # non-uniform work...
end
@tasks for i in 1:5
    @set scheduler=:static
    println("i=", i, " → ", threadid())
end
@tasks for i in 1:100
    @set begin
        scheduler=:static
        chunksize=10
    end
    println("i=", i, " → ", threadid())
end
OhMyThreads.@set (Macro)
@set name = value

This can be used inside a @tasks for ... end block to specify settings for the parallel execution of the loop.

Multiple settings are supported, either as separate @set statements or via @set begin ... end.

Settings

  • reducer (e.g. reducer=+): Indicates that a reduction should be performed with the provided binary function. See tmapreduce for more information.
  • collect (e.g. collect=true): Indicates that results should be collected (similar to map).

All other settings will be passed on to the underlying parallel functions (e.g. tmapreduce) as keyword arguments. Hence, you may provide whatever these functions accept as keyword arguments. Among others, this includes

  • scheduler (e.g. scheduler=:static): Can be either a Scheduler or a Symbol (e.g. :dynamic, :static, :serial, or :greedy).
  • init (e.g. init=0.0): Initial value to be used in a reduction (requires reducer=...).

Settings like ntasks, chunksize, and split can be used to tune the scheduling policy (if the selected scheduler supports them).
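
For instance, combining the reducer and init settings documented above:

using OhMyThreads: @tasks

@tasks for x in rand(100)
    @set begin
        reducer=+
        init=0.0
    end
    sin(x)
end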

OhMyThreads.@local (Macro)
@local name = value

@local name::T = value

Can be used inside a @tasks for ... end block to specify task-local values (TLVs). These values will be allocated once per task (rather than once per iteration) and can be re-used across the iterations handled by the same task.

There can only be a single @local block in a @tasks for ... end block. To specify multiple TLVs, use @local begin ... end. Compared to regular assignments, there are some limitations though, e.g. TLVs can't reference each other.

Examples

using OhMyThreads: @tasks
using OhMyThreads.Tools: taskid

@tasks for i in 1:10
    @set begin
        scheduler=:dynamic
        ntasks=2
    end
    @local x = zeros(3) # TLV

    x .+= 1
    println(taskid(), " -> ", x)
end
@tasks for i in 1:10
    @local begin
        x = rand(Int, 3)
        M = rand(3, 3)
    end
    # ...
end

Task-local variables created by @local are by default constrained to their inferred type, but if you need to, you can specify a different type during declaration:

@tasks for i in 1:10
    @local x::Vector{Float64} = some_hard_to_infer_setup_function()
    # ...
end
OhMyThreads.@only_one (Macro)
@only_one begin ... end

This can be used inside a @tasks for ... end block to mark a region of code to be executed by only one of the parallel tasks (all other tasks skip over this region).

Example

using OhMyThreads: @tasks

@tasks for i in 1:10
    @set ntasks = 10

    println(i, ": before")
    @only_one begin
        println(i, ": only printed by a single task")
        sleep(1)
    end
    println(i, ": after")
end
OhMyThreads.@one_by_one (Macro)
@one_by_one begin ... end

This can be used inside a @tasks for ... end block to mark a region of code to be executed by one parallel task at a time (i.e. exclusive access). The order may be arbitrary and non-deterministic.

Example

using OhMyThreads: @tasks

@tasks for i in 1:10
    @set ntasks = 10

    println(i, ": before")
    @one_by_one begin
        println(i, ": one task at a time")
        sleep(0.5)
    end
    println(i, ": after")
end

Functions

OhMyThreads.tmapreduce (Function)
tmapreduce(f, op, A::AbstractArray...;
           [scheduler::Union{Scheduler, Symbol} = :dynamic],
           [outputtype::Type = Any],
           [init])

A multithreaded function like Base.mapreduce. Perform a reduction over A, applying a single-argument function f to each element, and then combining them with the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.

Example:

using OhMyThreads: tmapreduce

tmapreduce(√, +, [1, 2, 3, 4, 5])

is the parallelized version of sum(√, [1, 2, 3, 4, 5]) in the form

(√1 + √2) + (√3 + √4) + √5

Keyword arguments:

  • scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.
  • outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
  • init: initial value of the reduction. Will be forwarded to mapreduce for the task-local sequential parts of the calculation.

In addition, tmapreduce accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:

tmapreduce(√, +, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)

However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).

OhMyThreads.treduce (Function)
treduce(op, A::AbstractArray...;
        [scheduler::Union{Scheduler, Symbol} = :dynamic],
        [outputtype::Type = Any],
        [init])

A multithreaded function like Base.reduce. Perform a reduction over A using the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.

Example:

using OhMyThreads: treduce

treduce(+, [1, 2, 3, 4, 5])

is the parallelized version of sum([1, 2, 3, 4, 5]) in the form

(1 + 2) + (3 + 4) + 5

Keyword arguments:

  • scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.
  • outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
  • init: initial value of the reduction. Will be forwarded to mapreduce for the task-local sequential parts of the calculation.

In addition, treduce accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:

treduce(+, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)

However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).

OhMyThreads.tmap (Function)
tmap(f, [OutputElementType], A::AbstractArray...;
     [scheduler::Union{Scheduler, Symbol} = :dynamic])

A multithreaded function like Base.map. Creates a new container similar to A and fills it in parallel so that the ith element is equal to f(A[i]).

The optional argument OutputElementType will select a specific element type for the returned container, and will generally incur fewer allocations than the version where OutputElementType is not specified.

Example:

using OhMyThreads: tmap

tmap(sin, 1:10)
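
To fix the element type of the result up front (which, as noted above, generally incurs fewer allocations), pass it as the optional second argument:

tmap(sin, Float64, 1:10)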

Keyword arguments:

  • scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.

In addition, tmap accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:

tmap(sin, 1:10; chunksize=2, scheduler=:static)

However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).

OhMyThreads.tmap! (Function)
tmap!(f, out, A::AbstractArray...;
      [scheduler::Union{Scheduler, Symbol} = :dynamic])

A multithreaded function like Base.map!. In parallel on multiple tasks, this function assigns out[i] = f(A[i]) for each index i of A and out.
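
For illustration, a minimal sketch that writes into a preallocated output array:

using OhMyThreads: tmap!

out = zeros(10)
tmap!(sin, out, 1:10)  # afterwards, out[i] == sin(i)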

Keyword arguments:

  • scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.

In addition, tmap! accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).

OhMyThreads.tforeach (Function)
tforeach(f, A::AbstractArray...;
         [scheduler::Union{Scheduler, Symbol} = :dynamic]) :: Nothing

A multithreaded function like Base.foreach. Apply f to each element of A on multiple parallel tasks, and return nothing. That is, it is the parallel equivalent of

for x in A
    f(x)
end

Example:

using OhMyThreads: tforeach

tforeach(1:10) do i
    println(i^2)
end

Keyword arguments:

  • scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.

In addition, tforeach accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:

tforeach(1:10; chunksize=2, scheduler=:static) do i
    println(i^2)
end

However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).

OhMyThreads.tcollect (Function)
tcollect([OutputElementType], gen::Union{AbstractArray, Generator{<:AbstractArray}};
         [scheduler::Union{Scheduler, Symbol} = :dynamic])

A multithreaded function like Base.collect. Essentially just calls tmap on the generator function and inputs.

The optional argument OutputElementType will select a specific element type for the returned container, and will generally incur fewer allocations than the version where OutputElementType is not specified.

Example:

using OhMyThreads: tcollect

tcollect(sin(i) for i in 1:10)
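
As with tmap, the element type of the result can be selected explicitly via the optional first argument:

tcollect(Float64, (sin(i) for i in 1:10))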

Keyword arguments:

  • scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.

In addition, tcollect accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:

tcollect(sin(i) for i in 1:10; chunksize=2, scheduler=:static)

However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).

OhMyThreads.treducemap (Function)
treducemap(op, f, A::AbstractArray...;
           [scheduler::Union{Scheduler, Symbol} = :dynamic],
           [outputtype::Type = Any],
           [init])

Like tmapreduce except the order of the f and op arguments is switched. This is sometimes convenient with do-block notation. Perform a reduction over A, applying a single-argument function f to each element, and then combining them with the two-argument function op.

Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.

Example:

using OhMyThreads: treducemap

treducemap(+, √, [1, 2, 3, 4, 5])

is the parallelized version of sum(√, [1, 2, 3, 4, 5]) in the form

(√1 + √2) + (√3 + √4) + √5

Keyword arguments:

  • scheduler::Union{Scheduler, Symbol} (default :dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. See Scheduler for more information on the available schedulers.
  • outputtype::Type (default Any): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
  • init: initial value of the reduction. Will be forwarded to mapreduce for the task-local sequential parts of the calculation.

In addition, treducemap accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:

treducemap(+, √, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)

However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).


Schedulers

OhMyThreads.Schedulers.DynamicScheduler (Type)
DynamicScheduler (aka :dynamic)

The default dynamic scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. The tasks are assigned to threads by Julia's dynamic scheduler and are non-sticky, that is, they can migrate between threads.

Generally preferred since it is flexible, can provide load balancing, and is composable with other multithreaded code.

Keyword arguments:

  • nchunks::Integer or ntasks::Integer (default nthreads(threadpool)):
    • Determines the number of chunks (and thus also the number of parallel tasks).
    • Increasing nchunks can help with load balancing, but at the expense of creating more overhead (see the example below). For nchunks <= nthreads() there are not enough chunks for any load balancing.
    • Setting nchunks < nthreads() is an effective way to use only a subset of the available threads.
  • chunksize::Integer (default not set)
    • Specifies the desired chunk size (instead of the number of chunks).
    • The options chunksize and nchunks/ntasks are mutually exclusive (only one may be a positive integer).
  • split::Symbol (default :batch):
    • Determines how the collection is divided into chunks (if chunking=true). By default, each chunk consists of contiguous elements and order is maintained.
    • See ChunkSplitters.jl for more details and available options.
    • Beware that for split=:scatter the order of elements isn't maintained and a reducer function must not only be associative but also commutative!
  • chunking::Bool (default true):
    • Controls whether input elements are grouped into chunks (true) or not (false).
    • For chunking=false, the arguments nchunks/ntasks, chunksize, and split are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this might spawn many(!) tasks and can be costly!
  • threadpool::Symbol (default :default):
    • Possible options are :default and :interactive.
    • The high-priority pool :interactive should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without yielding as it can interfere with heartbeat processes.
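
For illustration, here is a minimal sketch of the two equivalent ways to tune the scheduler, as referenced in the nchunks bullet above (using the OhMyThreads.Schedulers path from the heading):

using OhMyThreads: tmapreduce
using OhMyThreads.Schedulers: DynamicScheduler
using Base.Threads: nthreads

# tuning options passed through the Symbol interface...
tmapreduce(sin, +, 1:100; scheduler=:dynamic, nchunks=4 * nthreads())

# ...or via the explicit constructor (options go to the constructor, not the call)
tmapreduce(sin, +, 1:100; scheduler=DynamicScheduler(; nchunks=4 * nthreads()))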
OhMyThreads.Schedulers.StaticScheduler (Type)
StaticScheduler (aka :static)

A static low-overhead scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. The tasks are statically assigned to threads up front and are made sticky, that is, they are guaranteed to stay on the assigned threads (no task migration).

Can sometimes be more performant than DynamicScheduler when the workload is (close to) uniform and, because of the lower overhead, for small workloads. However, it doesn't compose well with other multithreaded code.

Keyword arguments:

  • nchunks::Integer or ntasks::Integer (default nthreads()):
    • Determines the number of chunks (and thus also the number of parallel tasks).
    • Setting nchunks < nthreads() is an effective way to use only a subset of the available threads.
    • For nchunks > nthreads() the chunks will be distributed to the available threads in a round-robin fashion.
  • chunksize::Integer (default not set)
    • Specifies the desired chunk size (instead of the number of chunks).
    • The options chunksize and nchunks/ntasks are mutually exclusive (only one may be non-zero).
  • chunking::Bool (default true):
    • Controls whether input elements are grouped into chunks (true) or not (false).
    • For chunking=false, the arguments nchunks/ntasks, chunksize, and split are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this might spawn many(!) tasks and can be costly!
  • split::Symbol (default :batch):
    • Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained.
    • See ChunkSplitters.jl for more details and available options.
    • Beware that for split=:scatter the order of elements isn't maintained and a reducer function must not only be associative but also commutative!
OhMyThreads.Schedulers.GreedyScheduler (Type)
GreedyScheduler (aka :greedy)

A greedy dynamic scheduler. The elements of the collection are first put into a Channel and then dynamic, non-sticky tasks are spawned to process the channel content in parallel.

Note that elements are processed in a non-deterministic order, and thus a potential reducing function must be commutative in addition to being associative, or you could get incorrect results!

Can be a good choice for load-balancing slower, uneven computations, but it does carry some additional overhead.
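
For illustration, a minimal sketch with a non-uniform workload (note that + is commutative, as required here):

using OhMyThreads: tmapreduce

work(i) = sum(sqrt, 1:i)  # cost grows with i, so equal-length chunks would be imbalanced
tmapreduce(work, +, 1:1000; scheduler=:greedy)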

Keyword arguments:

  • ntasks::Int (default nthreads()):
    • Determines the number of parallel tasks to be spawned.
    • Setting ntasks < nthreads() is an effective way to use only a subset of the available threads.
  • chunking::Bool (default false):
    • Controls whether input elements are grouped into chunks (true) or not (false) before being put into the channel. This can improve performance, especially if there are many iterations, each of which is computationally cheap.
    • If nchunks or chunksize are explicitly specified, chunking will be automatically set to true.
  • nchunks::Integer (default 10 * nthreads()):
    • Determines the number of chunks (that will eventually be put into the channel).
    • Increasing nchunks can help with load balancing. For nchunks <= nthreads() there are not enough chunks for any load balancing.
  • chunksize::Integer (default not set)
    • Specifies the desired chunk size (instead of the number of chunks).
    • The options chunksize and nchunks are mutually exclusive (only one may be a positive integer).
  • split::Symbol (default :scatter):
    • Determines how the collection is divided into chunks (if chunking=true).
    • See ChunkSplitters.jl for more details and available options.
OhMyThreads.Schedulers.SerialScheduler (Type)
SerialScheduler (aka :serial)

A scheduler for turning off any multithreading and running the code serially. It aims to make parallel functions such as tmapreduce(sin, +, 1:100) behave like their serial counterparts, e.g. mapreduce(sin, +, 1:100).
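
For example, the following runs entirely serially, which can be handy for debugging or for comparing against a parallel run:

using OhMyThreads: tmapreduce

tmapreduce(sin, +, 1:100; scheduler=:serial)  # behaves like mapreduce(sin, +, 1:100)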


Non-Exported

OhMyThreads.@spawn: see StableTasks.jl
OhMyThreads.@spawnat: see StableTasks.jl
OhMyThreads.@fetch: see StableTasks.jl
OhMyThreads.@fetchfrom: see StableTasks.jl
OhMyThreads.chunks: see ChunkSplitters.jl
OhMyThreads.TaskLocalValue: see TaskLocalValues.jl
OhMyThreads.WithTaskLocals (Type)
struct WithTaskLocals{F, TLVs <: Tuple{Vararg{TaskLocalValue}}} <: Function

This callable function-like object is meant to represent a function which closes over some TaskLocalValues. That is, if you do

TLV{T} = TaskLocalValue{T}
f = WithTaskLocals((TLV{Int}(() -> 1), TLV{Int}(() -> 2))) do (x, y)
    z -> (x + y)/z
end

then that is equivalent to

g = let x = TLV{Int}(() -> 1), y = TLV{Int}(() -> 2)
    z -> let x = x[], y=y[]
        (x + y)/z
    end
end

however, the main difference is that you can call promise_task_local on a WithTaskLocals closure in order to turn it into something equivalent to

let x=x[], y=y[]
    z -> (x + y)/z
end

which doesn't have the overhead of accessing the task_local_storage each time the closure is called. This of course will lose the safety advantages of TaskLocalValue, so you should never do f_local = promise_task_local(f) and then pass f_local to some unknown function, because if that unknown function calls f_local on a new task, you'll hit a race condition.
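
To sketch the intended pattern (a hypothetical example; it assumes the OhMyThreads.@spawn and OhMyThreads.TaskLocalValue bindings listed above): resolve the task-local values once per task, then reuse the plain closure in a hot loop.

using OhMyThreads: WithTaskLocals, promise_task_local, TaskLocalValue, @spawn

TLV = TaskLocalValue
f = WithTaskLocals((TLV{Vector{Float64}}(() -> zeros(3)),)) do (buf,)
    z -> sum(buf) + z
end

t = @spawn begin
    f_local = promise_task_local(f)  # dereference the task-local values once for this task
    s = 0.0
    for z in 1:1000
        s += f_local(z)  # no task_local_storage lookup per call
    end
    s
end
fetch(t)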

OhMyThreads.promise_task_local (Function)
promise_task_local(f) = f
promise_task_local(f::WithTaskLocals) = f.inner_func(map(x -> x[], f.tasklocals))

Take a WithTaskLocals closure, grab the TaskLocalValues, and pass them to the closure. That is, it turns a WithTaskLocals closure from the equivalent of

TLV{T} = TaskLocalValue{T}
let x = TLV{Int}(() -> 1), y = TLV{Int}(() -> 2)
    z -> let x = x[], y=y[]
        (x + y)/z
    end
end

into the equivalent of

let x = TLV{Int}(() -> 1), y = TLV{Int}(() -> 2)
    let x = x[], y = y[]
        z -> (x + y)/z
    end
end

which doesn't have the overhead of accessing the task_local_storage each time the closure is called. This of course will lose the safety advantages of TaskLocalValue, so you should never do f_local = promise_task_local(f) and then pass f_local to some unknown function, because if that unknown function calls f_local on a new task, you'll hit a race condition.
