Public API
Exported
Macros
OhMyThreads.@tasks — Macro@tasks for ... endA macro to parallelize a for loop by spawning a set of tasks that can be run in parallel. The policy of how many tasks to spawn and how to distribute the iteration space among the tasks (and more) can be configured via @set statements in the loop body.
Supports reductions (@set reducer=<reducer function>) and collecting the results (@set collect=true).
Under the hood, the for loop is translated into corresponding parallel tforeach, tmapreduce, or tmap calls.
Examples
using OhMyThreads: @tasks@tasks for i in 1:3
println(i)
end@tasks for x in rand(10)
@set reducer=+
sin(x)
end@tasks for i in 1:5
@set collect=true
i^2
end@tasks for i in 1:100
@set ntasks=4*nthreads()
# non-uniform work...
end@tasks for i in 1:5
@set scheduler=:static
println("i=", i, " → ", threadid())
end@tasks for i in 1:100
@set begin
scheduler=:static
chunksize=10
end
println("i=", i, " → ", threadid())
endOhMyThreads.@set — Macro@set name = valueThis can be used inside a @tasks for ... end block to specify settings for the parallel execution of the loop.
Multiple settings are supported, either as separate @set statements or via @set begin ... end.
Settings
reducer(e.g.reducer=+): Indicates that a reduction should be performed with the provided binary function. Seetmapreducefor more information.collect(e.g.collect=true): Indicates that results should be collected (similar tomap).
All other settings will be passed on to the underlying parallel functions (e.g. tmapreduce) as keyword arguments. Hence, you may provide whatever these functions accept as keyword arguments. Among others, this includes
scheduler(e.g.scheduler=:static): Can be either aScheduleror aSymbol(e.g.:dynamic,:static,:serial, or:greedy).init(e.g.init=0.0): Initial value to be used in a reduction (requiresreducer=...).
Settings like ntasks, chunksize, and split etc. can be used to tune the scheduling policy (if the selected scheduler supports it).
Note that the assignment is hoisted above the loop body which means that the scope is not the scope of the loop (even though it looks like it) but rather the scope surrounding the loop body. (@macroexpand is a useful tool to inspect the generated code of the @tasks block.)
OhMyThreads.@local — Macro@local name = value
@local name::T = valueCan be used inside a @tasks for ... end block to specify task-local values (TLV) via explicitly typed assignments. These values will be allocated once per task (rather than once per iteration) and can be re-used between different task-local iterations.
There can only be a single @local block in a @tasks for ... end block. To specify multiple TLVs, use @local begin ... end. Compared to regular assignments, there are some limitations though, e.g. TLVs can't reference each other.
Examples
using OhMyThreads: @tasks
using OhMyThreads.Tools: taskid
@tasks for i in 1:10
@set begin
scheduler=:dynamic
ntasks=2
end
@local x = zeros(3) # TLV
x .+= 1
println(taskid(), " -> ", x)
end@tasks for i in 1:10
@local begin
x = rand(Int, 3)
M = rand(3, 3)
end
# ...
endTask local variables created by @local are by default constrained to their inferred type, but if you need to, you can specify a different type during declaration:
@tasks for i in 1:10
@local x::Vector{Float64} = some_hard_to_infer_setup_function()
# ...
endThe right hand side of the assignment is hoisted outside of the loop body and captured as a closure used to initialize the task local value. This means that the scope of the closure is not the scope of the loop (even though it looks like it) but rather the scope surrounding the loop body. (@macroexpand is a useful tool to inspect the generated code of the @tasks block.)
OhMyThreads.@only_one — Macro@only_one begin ... endThis can be used inside a @tasks for ... end block to mark a region of code to be executed by only one of the parallel tasks (all other tasks skip over this region).
Example
using OhMyThreads: @tasks
@tasks for i in 1:10
@set ntasks = 10
println(i, ": before")
@only_one begin
println(i, ": only printed by a single task")
sleep(1)
end
println(i, ": after")
endOhMyThreads.@one_by_one — Macro@one_by_one begin ... endThis can be used inside a @tasks for ... end block to mark a region of code to be executed by one parallel task at a time (i.e. exclusive access). The order may be arbitrary and non-deterministic.
Example
using OhMyThreads: @tasks
@tasks for i in 1:10
@set ntasks = 10
println(i, ": before")
@one_by_one begin
println(i, ": one task at a time")
sleep(0.5)
end
println(i, ": after")
endOhMyThreads.@allow_boxed_captures — Macro@allow_boxed_captures exprBy default, OhMyThreads.jl will detect and error on multithreaded code which references local variables which are 'boxed' – something that happens if the variable could be re-bound in multiple scopes. This process can cause very sublte bugs in multithreaded code by creating silent race conditions, e.g.
let
function wrong()
tmap(1:10) do i
A = i # define A for the first time (lexically)
sleep(rand()/10)
A # user is trying to reference local A only
end
end
@show wrong()
A = 1 # boxed! this hoists "A" to the same variable as in `wrong` but presumably the user wanted a new one
endIn this example, you might expect to get [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], but you would actually observe incorrect results because A is 'boxed'. The fix for this would be to write something like
let
function right()
tmap(1:10) do i
local A = i
sleep(rand()/10)
A
end
end
@show right()
A = 1
endHowever, if you are really sure you want to bypass OhMyThreads's error mechanism, you can use @allow_boxed_captures to wrap code you believe is okay, e.g.
julia> let A = 1
@allow_boxed_captures tmap(1:10) do i
A = i
sleep(rand()/10)
A # race condition!
end
end
10-element Vector{Int64}:
4
2
7
2
2
8
6
8
7
2This is a dynamically scoped construct, so this effect will apply to all nested code inside of expr.
See also @disallow_boxed_captures
OhMyThreads.@disallow_boxed_captures — Macro@disallow_boxed_captures exprDisable the effect of @allow_boxed_captures for any code in expr.
This is a dynamically scoped construct, so this effect will apply to all nested code inside of expr.
See also @disallow_boxed_captures
OhMyThreads.@localize — Macro@localize args... expr
Writing
@localize x y z expris equivalent to writing
let x=x, y=y, z=z
expr
endThis is useful for avoiding the boxing of captured variables when working with closures.
See https://juliafolds2.github.io/OhMyThreads.jl/stable/literate/boxing/boxing/ for more information about boxed variables.
Functions
OhMyThreads.tmapreduce — Functiontmapreduce(f, op, A::AbstractArray...;
[scheduler::Union{Scheduler, Symbol} = :dynamic],
[outputtype::Type = Any],
[init])A multithreaded function like Base.mapreduce. Perform a reduction over A, applying a single-argument function f to each element, and then combining them with the two-argument function op.
Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.
Example:
using OhMyThreads: tmapreduce
tmapreduce(√, +, [1, 2, 3, 4, 5])is the parallelized version of sum(√, [1, 2, 3, 4, 5]) in the form
(√1 + √2) + (√3 + √4) + √5Keyword arguments:
scheduler::Union{Scheduler, Symbol}(default:dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. SeeSchedulerfor more information on the available schedulers.outputtype::Type(defaultAny): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.init: initial value of the reduction. Will be forwarded tomapreducefor the task-local sequential parts of the calculation.
In addition, tmapreduce accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:
tmapreduce(√, +, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
OhMyThreads.treduce — Functiontreduce(op, A::AbstractArray...;
[scheduler::Union{Scheduler, Symbol} = :dynamic],
[outputtype::Type = Any],
[init])A multithreaded function like Base.reduce. Perform a reduction over A using the two-argument function op.
Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.
Example:
using OhMyThreads: treduce
treduce(+, [1, 2, 3, 4, 5])is the parallelized version of sum([1, 2, 3, 4, 5]) in the form
(1 + 2) + (3 + 4) + 5Keyword arguments:
scheduler::Union{Scheduler, Symbol}(default:dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. SeeSchedulerfor more information on the available schedulers.outputtype::Type(defaultAny): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.init: initial value of the reduction. Will be forwarded tomapreducefor the task-local sequential parts of the calculation.
In addition, treduce accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:
treduce(+, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
OhMyThreads.tmap — Functiontmap(f, [OutputElementType], A::AbstractArray...;
[scheduler::Union{Scheduler, Symbol} = :dynamic])A multithreaded function like Base.map. Create a new container similar to A and fills it in parallel such that the ith element is equal to f(A[i]).
The optional argument OutputElementType will select a specific element type for the returned container, and will generally incur fewer allocations than the version where OutputElementType is not specified.
Example:
using OhMyThreads: tmap
tmap(sin, 1:10)Keyword arguments:
scheduler::Union{Scheduler, Symbol}(default:dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. SeeSchedulerfor more information on the available schedulers.
In addition, tmap accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:
tmap(sin, 1:10; chunksize=2, scheduler=:static)However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
OhMyThreads.tmap! — Functiontmap!(f, out, A::AbstractArray...;
[scheduler::Union{Scheduler, Symbol} = :dynamic])A multithreaded function like Base.map!. In parallel on multiple tasks, this function assigns each element of out[i] = f(A[i]) for each index i of A and out.
Keyword arguments:
scheduler::Union{Scheduler, Symbol}(default:dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. SeeSchedulerfor more information on the available schedulers.
In addition, tmap! accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
OhMyThreads.tforeach — Functiontforeach(f, A::AbstractArray...;
[scheduler::Union{Scheduler, Symbol} = :dynamic]) :: NothingA multithreaded function like Base.foreach. Apply f to each element of A on multiple parallel tasks, and return nothing. I.e. it is the parallel equivalent of
for x in A
f(x)
endExample:
using OhMyThreads: tforeach
tforeach(1:10) do i
println(i^2)
endKeyword arguments:
scheduler::Union{Scheduler, Symbol}(default:dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. SeeSchedulerfor more information on the available schedulers.
In addition, tforeach accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:
tforeach(1:10; chunksize=2, scheduler=:static) do i
println(i^2)
endHowever, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
OhMyThreads.tcollect — Functiontcollect([OutputElementType], gen::Union{AbstractArray, Generator{<:AbstractArray}};
[scheduler::Union{Scheduler, Symbol} = :dynamic])A multithreaded function like Base.collect. Essentially just calls tmap on the generator function and inputs.
The optional argument OutputElementType will select a specific element type for the returned container, and will generally incur fewer allocations than the version where OutputElementType is not specified.
Example:
using OhMyThreads: tcollect
tcollect(sin(i) for i in 1:10)Keyword arguments:
scheduler::Union{Scheduler, Symbol}(default:dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. SeeSchedulerfor more information on the available schedulers.
In addition, tcollect accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:
tcollect(sin(i) for i in 1:10; chunksize=2, scheduler=:static)However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
OhMyThreads.treducemap — Functiontreducemap(op, f, A::AbstractArray...;
[scheduler::Union{Scheduler, Symbol} = :dynamic],
[outputtype::Type = Any],
[init])Like tmapreduce except the order of the f and op arguments are switched. This is sometimes convenient with do-block notation. Perform a reduction over A, applying a single-argument function f to each element, and then combining them with the two-argument function op.
Note that op must be an associative function, in the sense that op(a, op(b, c)) ≈ op(op(a, b), c). If op is not (approximately) associative, you will get undefined results.
Example:
using OhMyThreads: treducemap
treducemap(+, √, [1, 2, 3, 4, 5])is the parallelized version of sum(√, [1, 2, 3, 4, 5]) in the form
(√1 + √2) + (√3 + √4) + √5Keyword arguments:
scheduler::Union{Scheduler, Symbol}(default:dynamic): determines how the computation is divided into parallel tasks and how these are scheduled. SeeSchedulerfor more information on the available schedulers.outputtype::Type(defaultAny): will work as the asserted output type of parallel calculations. We use StableTasks.jl to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.init: initial value of the reduction. Will be forwarded tomapreducefor the task-local sequential parts of the calculation.
In addition, treducemap accepts all keyword arguments that are supported by the selected scheduler. They will simply be passed on to the corresponding Scheduler constructor. Example:
treducemap(+, √, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)However, to avoid ambiguity, this is currently only supported for scheduler::Symbol (but not for scheduler::Scheduler).
Schedulers
OhMyThreads.Schedulers.Scheduler — TypeSupertype for all available schedulers:
DynamicScheduler: default dynamic schedulerStaticScheduler: low-overhead static schedulerGreedyScheduler: greedy load-balancing schedulerSerialScheduler: serial (non-parallel) execution
OhMyThreads.Schedulers.DynamicScheduler — TypeDynamicScheduler (aka :dynamic)The default dynamic scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. The tasks are assigned to threads by Julia's dynamic scheduler and are non-sticky, that is, they can migrate between threads.
Generally preferred since it is flexible, can provide load balancing, and is composable with other multithreaded code.
Keyword arguments:
nchunks::Integerorntasks::Integer(defaultnthreads(threadpool)):- Determines the number of chunks (and thus also the number of parallel tasks).
- Increasing
nchunkscan help with load balancing, but at the expense of creating more overhead. Fornchunks <= nthreads()there are not enough chunks for any load balancing. - Setting
nchunks < nthreads()is an effective way to use only a subset of the available threads.
chunksize::Integer(default not set)- Specifies the desired chunk size (instead of the number of chunks).
- The options
chunksizeandnchunks/ntasksare mutually exclusive (only one may be a positive integer).
minchunksize::Union{Integer, Nothing}(defaultnothing)- Sets a lower bound on the size of chunks. This argument takes priority over
nchunks, sotreduce(+, 1:10; nchunks=10, minchunksize=5)will only operate on2chunks for example.
- Sets a lower bound on the size of chunks. This argument takes priority over
split::Union{Symbol, OhMyThreads.Split}(defaultOhMyThreads.Consecutive()):- Determines how the collection is divided into chunks (if chunking=true). By default, each chunk consists of contiguous elements and order is maintained.
- See ChunkSplitters.jl for more details and available options. We also allow users to pass
:consecutivein place ofConsecutive(), and:roundrobinin place ofRoundRobin() - Beware that for
split=OhMyThreads.RoundRobin()the order of elements isn't maintained and a reducer function must not only be associative but also commutative!
chunking::Bool(defaulttrue):- Controls whether input elements are grouped into chunks (
true) or not (false). - For
chunking=false, the argumentsnchunks/ntasks,chunksize, andsplitare ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this might spawn many(!) tasks and can be costly!
- Controls whether input elements are grouped into chunks (
threadpool::Symbol(default:default):- Possible options are
:defaultand:interactive. - The high-priority pool
:interactiveshould be used very carefully since tasks on this threadpool should not be allowed to run for a long time withoutyielding as it can interfere with heartbeat processes.
- Possible options are
OhMyThreads.Schedulers.StaticScheduler — TypeStaticScheduler (aka :static)A static low-overhead scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. The tasks are statically assigned to threads up front and are made sticky, that is, they are guaranteed to stay on the assigned threads (no task migration).
Can sometimes be more performant than DynamicScheduler when the workload is (close to) uniform and, because of the lower overhead, for small workloads. Isn't well composable with other multithreaded code though.
Keyword arguments:
nchunks::Integerorntasks::Integer(defaultnthreads()):- Determines the number of chunks (and thus also the number of parallel tasks).
- Setting
nchunks < nthreads()is an effective way to use only a subset of the available threads. - For
nchunks > nthreads()the chunks will be distributed to the available threads in a round-robin fashion.
chunksize::Integer(default not set)- Specifies the desired chunk size (instead of the number of chunks).
- The options
chunksizeandnchunks/ntasksare mutually exclusive (only one may be non-zero).
minchunksize::Union{Integer, Nothing}(defaultnothing)- Sets a lower bound on the size of chunks. This argument takes priority over
nchunks, sotreduce(+, 1:10; nchunks=10, minchunksize=5)will only operate on2chunks for example.
- Sets a lower bound on the size of chunks. This argument takes priority over
chunking::Bool(defaulttrue):- Controls whether input elements are grouped into chunks (
true) or not (false). - For
chunking=false, the argumentsnchunks/ntasks,chunksize, andsplitare ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this might spawn many(!) tasks and can be costly!
- Controls whether input elements are grouped into chunks (
split::Union{Symbol, OhMyThreads.Split}(defaultOhMyThreads.Consecutive()):- Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained.
- See ChunkSplitters.jl for more details and available options. We also allow users to pass
:consecutivein place ofConsecutive(), and:roundrobinin place ofRoundRobin() - Beware that for
split=OhMyThreads.RoundRobin()the order of elements isn't maintained and a reducer function must not only be associative but also commutative!
OhMyThreads.Schedulers.GreedyScheduler — TypeGreedyScheduler (aka :greedy)A greedy dynamic scheduler. The elements are put into a shared workqueue and dynamic, non-sticky, tasks are spawned to process the elements of the queue with each task taking a new element from the queue as soon as the previous one is done.
Note that elements are processed in a non-deterministic order, and thus a potential reducing function must be commutative in addition to being associative, or you could get incorrect results!
Can be good choice for load-balancing slower, uneven computations, but does carry some additional overhead.
Keyword arguments:
ntasks::Int(defaultnthreads()):- Determines the number of parallel tasks to be spawned.
- Setting
ntasks < nthreads()is an effective way to use only a subset of the available threads.
chunking::Bool(defaultfalse):- Controls whether input elements are grouped into chunks (
true) or not (false) before put into the shared workqueue. This can improve the performance especially if there are many iterations each of which are computationally cheap. - If
nchunksorchunksizeare explicitly specified,chunkingwill be automatically set totrue.
- Controls whether input elements are grouped into chunks (
nchunks::Integer(default10 * nthreads()):- Determines the number of chunks (that will eventually be put into the shared workqueue).
- Increasing
nchunkscan help with load balancing. Fornchunks <= nthreads()there are not enough chunks for any load balancing.
chunksize::Integer(default not set)- Specifies the desired chunk size (instead of the number of chunks).
- The options
chunksizeandnchunksare mutually exclusive (only one may be a positive integer).
minchunksize::Union{Integer, Nothing}(defaultnothing)- Sets a lower bound on the size of chunks. This argument takes priority over
nchunks, sotreduce(+, 1:10; nchunks=10, minchunksize=5)will only operate on2chunks for example.
- Sets a lower bound on the size of chunks. This argument takes priority over
split::Union{Symbol, OhMyThreads.Split}(defaultOhMyThreads.RoundRobin()):- Determines how the collection is divided into chunks (if chunking=true).
- See ChunkSplitters.jl for more details and available options. We also allow users to pass
:consecutivein place ofConsecutive(), and:roundrobinin place ofRoundRobin()
OhMyThreads.Schedulers.SerialScheduler — TypeSerialScheduler (aka :serial)A scheduler for turning off any multithreading and running the code in serial. It aims to make parallel functions like, e.g., tmapreduce(sin, +, 1:100) behave like their serial counterparts, e.g., mapreduce(sin, +, 1:100).
Re-exported
OhMyThreads.chunks | see ChunkSplitters.chunks |
OhMyThreads.index_chunks | see ChunkSplitters.index_chunks |
Public but not exported
OhMyThreads.@spawn | see StableTasks.@spawn |
OhMyThreads.@spawnat | see StableTasks.@spawnat |
OhMyThreads.@fetch | see StableTasks.@fetch |
OhMyThreads.@fetchfrom | see StableTasks.@fetchfrom |
OhMyThreads.TaskLocalValue | see TaskLocalValues.TaskLocalValue |
OhMyThreads.Split | see ChunkSplitters.Split |
OhMyThreads.Consecutive | see ChunkSplitters.Consecutive |
OhMyThreads.RoundRobin | see ChunkSplitters.RoundRobin |
OhMyThreads.WithTaskLocals — Typestruct WithTaskLocals{F, TLVs <: Tuple{Vararg{TaskLocalValue}}} <: FunctionThis callable function-like object is meant to represent a function which closes over some TaskLocalValues. This is, if you do
TLV{T} = TaskLocalValue{T}
f = WithTaskLocals((TLV{Int}(() -> 1), TLV{Int}(() -> 2))) do (x, y)
z -> (x + y)/z
endthen that is equivalent to
g = let x = TLV{Int}(() -> 1), y = TLV{Int}(() -> 2)
z -> let x = x[], y=y[]
(x + y)/z
end
endhowever, the main difference is that you can call promise_task_local on a WithTaskLocals closure in order to turn it into something equivalent to
let x=x[], y=y[]
z -> (x + y)/z
endwhich doesn't have the overhead of accessing the task_local_storage each time the closure is called. This of course will lose the safety advantages of TaskLocalValue, so you should never do f_local = promise_task_local(f) and then pass f_local to some unknown function, because if that unknown function calls f_local on a new task, you'll hit a race condition.
OhMyThreads.promise_task_local — Functionpromise_task_local(f) = f
promise_task_local(f::WithTaskLocals) = f.inner_func(map(x -> x[], f.tasklocals))Take a WithTaskLocals closure, grab the TaskLocalValues, and passs them to the closure. That is, it turns a WithTaskLocals closure from the equivalent of
TLV{T} = TaskLocalValue{T}
let x = TLV{Int}(() -> 1), y = TLV{Int}(() -> 2)
z -> let x = x[], y=y[]
(x + y)/z
end
endinto the equivalent of
let x = TLV{Int}(() -> 1), y = TLV{Int}(() -> 2)
let x = x[], y = y[]
z -> (x + y)/z
end
endwhich doesn't have the overhead of accessing the task_local_storage each time the closure is called. This of course will lose the safety advantages of TaskLocalValue, so you should never do f_local = promise_task_local(f) and then pass f_local to some unknown function, because if that unknown function calls f_local on a new task, you'll hit a race condition. ```
OhMyThreads.ChannelLike — TypeChannelLike(itr)This struct wraps an indexable object such that it can be iterated by concurrent tasks in a safe manner similar to a Channel.
ChannelLike(itr) is conceptually similar to:
Channel{eltype(itr)}(length(itr)) do ch
foreach(i -> put!(ch, i), itr)
endi.e. creating a channel, put!ing all elements of itr into it and closing it. The advantage is that ChannelLike doesn't copy the data.
Examples
ch = OhMyThreads.ChannelLike(1:5)
@sync for taskid in 1:2
Threads.@spawn begin
for i in ch
println("Task #$taskid processing item $i")
sleep(1 / i)
end
end
end
# output
Task #1 processing item 1
Task #2 processing item 2
Task #2 processing item 3
Task #2 processing item 4
Task #1 processing item 5Note that ChannelLike is stateful (just like a Channel), so you can't iterate over it twice.
The wrapped iterator must support firstindex(itr)::Int, lastindex(itr)::Int and getindex(itr, ::Int).