Transducers and Transducible processes
Transducible processes
Transducers.foldxl — Function
foldxl(step, xf::Transducer, reducible; init, simd) :: T
foldxl(step, reducible; init, simd) :: T
foldl(step, xf::Transducer, reducible; init, simd) :: T
foldl(step, ed::Eduction; init, simd) :: T
transduce(xf, step, init, reducible, [executor]; simd) :: Union{T, Reduced{T}}
eXtended left fold. Compose transducer xf with reducing step function step and reduce reducible using it.
transduce differs from foldxl in two ways: it returns Reduced{T} if the transducer xf or step aborts the reduction, and it does not automatically wrap step in Completing.
This API is modeled after transduce in Clojure.
For parallel versions, see foldxt and foldxd.
See also: Empty result handling.
Arguments
xf::Transducer: A transducer.
step: A callable which accepts 1 and 2 arguments. If it only accepts 2 arguments, wrap it with Completing to "add" the 1-argument form (i.e., the complete protocol).
reducible: A reducible object (array, dictionary, any iterator, etc.).
executor: Specify an executor. See SequentialEx.
init: An initial value fed to the first argument of the reducing step function step. This argument can be omitted for well-known binary operations like + or *. Supported binary operations are listed in the InitialValues.jl documentation. When Init (not the result of Init, such as Init(*)) is given, it is automatically "instantiated" as Init(step) (where step is appropriately unwrapped if step is a Completing). See Empty result handling in the manual for more information.
simd: If true or :ivdep, enable SIMD using Base.@simd. If :ivdep, use the @simd ivdep for ... end variant. Read the Julia manual entry for Base.@simd to understand when it is appropriate to use this option. For example, simd = :ivdep must not be used with stateful transducers like Scan. If false (default), Base.@simd is not used.
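The difference between transduce and foldxl noted above can be sketched as follows. This is a minimal illustration rather than part of the original docstring; it assumes that Take aborts the fold once it has passed two items, and the names r and s are ours.

```julia
using Transducers

# transduce does not wrap the step function automatically,
# so `+` is wrapped with Completing explicitly.
r = transduce(Take(2), Completing(+), 0, 1:5)

# Take aborted the fold, so the result is still wrapped in Reduced.
was_aborted = r isa Reduced
value = unreduced(r)   # unwrap the accumulated value

# foldxl wraps the step function and unwraps the result for us.
s = foldxl(+, Take(2), 1:5; init = 0)
```

Appending |> unreduced to a transduce call is a simple way to get the plain value whether or not the fold terminated early.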
Examples
julia> using Transducers
julia> foldl(Filter(isodd), 1:4, init=0.0) do state, input
@show state, input
state + input
end
(state, input) = (0.0, 1)
(state, input) = (1.0, 3)
4.0
julia> foldxl(+, 1:5 |> Filter(isodd))
9
julia> 1:5 |> Filter(isodd) |> foldxl(+)
9
Since TeeRF requires the extended fold protocol, foldl(TeeRF(min, max), [5, 2, 6, 8, 3]) does not work, while foldxl does:
julia> foldxl(TeeRF(min, max), [5, 2, 6, 8, 3])
(2, 8)
The unary method of foldxl is useful when combined with |>:
julia> (1:5, 4:-1:1) |> Cat() |> Filter(isodd) |> Enumerate() |> foldxl() do a, b
a[2] < b[2] ? b : a
end
(3, 5)
Transducers.transduce — Function
transduce(xf, step, init, reducible) :: Union{T, Reduced{T}}
See foldxl.
Base.foldl — Function
foldl(step, xf::Transducer, reducible; init, simd) :: T
foldl(step, ed::Eduction; init, simd) :: T
See foldxl.
Base.foreach — Function
foreach(eff, xf::Transducer, reducible; simd)
foreach(eff, ed::Eduction; simd)
Feed the results of xf processing items in reducible into a unary function eff. This is useful when the primary computation at the bottom is the side-effect. It is equivalent to foreach(eff, eduction(xf, reducible)). Note that
foreach(eduction(xf, coll)) do x
...
end
can be more efficient than
for x in eduction(xf, coll)
...
end
as the former does not have to translate the transducer protocol to the iterator protocol.
foreach supports all constructs in the native for loop as well as the enhancements [julia_issue_22891] to break with a value (break D(x) below) and append the else clause (E(x) below).
This native for loop
ans = for x in xs
A(x)
B(x) && break
C(x) && break D(x)
else
E(x)
end
can be written as
ans = foreach(Map(identity), xs) do x
A(x)
B(x) && return reduced()
C(x) && return reduced(D(x))
x # required for passing `x` to `E(x)` below
end |> ifunreduced() do x
E(x)
end
See: mapfoldl, reduced, ifunreduced.
As of version 0.3, foreach returns what the do block (eff function) returns as-is. This was required for supporting "for-else" (|> ifunreduced). Previously, it only supported break-with-value and always applied unreduced before returning.
Examples
julia> using Transducers
julia> foreach(eduction(Filter(isodd), 1:4)) do input
@show input
end
input = 1
input = 3
3
julia> foreach(Filter(!ismissing), [1, missing, 2, 3]) do input
@show input
if iseven(input)
return reduced()
end
end
input = 1
input = 2
Reduced(nothing)
It is often useful to append |> unreduced to unwrap Reduced in the final result (note that |> here is the standard function application, not the transducer composition).
julia> foreach(Filter(!ismissing), [1, missing, 2, 3]) do input
reduced("got $input")
end |> unreduced
"got 1"
Combining break-with-value and for-else is useful for triggering an action after (e.g.) some kind of membership test has failed:
julia> has2(xs) = foreach(Filter(!ismissing), xs) do input
input == 2 && reduced(true)
end |> ifunreduced() do input
@show input
false
end;
julia> has2([1, missing, 2, 3])
true
julia> has2([1, missing])
input = false
false
However, note the output input = false in the last example. This is due to how && works in Julia:
julia> false && "otherwise"
false
Thus, pure membership testing functions like has2 above can be written in a more concise manner:
julia> simpler_has2(xs) = foreach(Filter(!ismissing), xs) do input
input == 2 && reduced(true)
end |> unreduced;
julia> simpler_has2([1, missing, 2, 3])
true
julia> simpler_has2([1, missing])
false
Transducers.foldxt — Function
foldxt(step, xf, reducible; [init, simd, basesize, stoppable, nestlevel]) :: T
eXtended threaded fold (reduce). This is a multi-threaded reduce based on the extended fold protocol defined in Transducers.jl.
The "bottom" reduction function step(::T, ::T) :: T must be associative and init must be its identity element.
Transducers composing xf must be stateless (e.g., Map, Filter, Cat, etc.) except for ScanEmit. Note that Scan is not supported (although possible in theory). Early termination requires Julia ≥ 1.3.
Use tcollect or tcopy to collect results into a container.
See also: Parallel processing tutorial, foldxl, foldxd.
Keyword Arguments
basesize::Integer = amount(reducible) ÷ nthreads(): A size of chunk in reducible that is processed by each worker. A smaller size may be required when:
stoppable::Bool: [This option usually does not have to be set manually.] The threaded fold executed in the "stoppable" mode used for optimizing reduction with reduced has a slight overhead if reduced is not used. This mode can be disabled by passing stoppable = false. It is usually automatically detected and set appropriately. Note that this option is purely for optimization and does not affect the result value.
nestlevel::Union{Integer,Val}: Specify how many inner Cat (flatten) transducers to be multi-threaded (using TCat). It must be a positive integer, Val of positive integer, or Val(:inf). Val(:inf) means to use multi-threading for all Cat transducers. Note that Cat transducer should be statically known. That is to say, the fold implementation sees two Cats in ... |> Map(f) |> Cat() |> Cat() but only one Cat in ... |> Map(x -> f(x) |> Cat()) |> Cat() even though they are semantically identical.
For other keyword arguments, see foldl.
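The requirement that step be associative and init be its identity element can be illustrated with a small sketch. It is an illustration under our own choice of inputs: + with init = 0 satisfies both conditions, and basesize = 2 is picked only to force several chunks on a short input.

```julia
using Transducers

# `+` is associative and 0 is its identity, so the result does not
# depend on how the input is split into chunks.
squares_sum = foldxt(+, Map(x -> x^2), 1:10; init = 0, basesize = 2)

# The sequential fold over the same transducer gives the same value.
sequential_sum = foldxl(+, Map(x -> x^2), 1:10; init = 0)
```

With a non-associative step (for example, subtraction), the two calls are not guaranteed to agree.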
Examples
julia> using Transducers
julia> foldxt(+, 1:3 |> Map(exp) |> Map(log))
6.0
julia> using BangBang: append!!
julia> foldxt(append!!, Map(x -> 1:x), 1:2; basesize=1, init=Union{}[])
3-element Vector{Int64}:
1
1
2
julia> 1:5 |> Filter(isodd) |> foldxt(+)
9
julia> foldxt(TeeRF(min, max), [5, 2, 6, 8, 3])
(2, 8)
Transducers.foldxd — Function
foldxd(step, xform::Transducer, array; [init, simd, basesize, threads_basesize, pool])
eXtended distributed fold (reduce). This is a distributed reduce based on the extended fold protocol defined in Transducers.jl.
Unlike foldxt, early termination by reduced is not supported yet.
Use dcollect or dcopy to collect results into a container.
See also: Parallel processing tutorial, foldxl, foldxt.
Keyword Arguments
pool::AbstractWorkerPool: Passed to Distributed.remotecall.
basesize::Integer = amount(array) ÷ nworkers(): A size of chunk in array that is processed by each worker. A smaller size may be required when computation time for processing each item can fluctuate a lot.
threads_basesize::Integer = basesize ÷ nthreads(): A size of chunk in array that is processed by each task in each worker process. The default setting assumes that the number of threads used in all workers is the same. For a heterogeneous setup where each worker process has a different number of threads, it may be required to use smaller threads_basesize and basesize to get good performance.
For other keyword arguments, see foldl.
Examples
julia> using Transducers
julia> foldxd(+, 1:3 |> Map(exp) |> Map(log))
6.0
julia> 1:5 |> Filter(isodd) |> foldxd(+)
9
julia> foldxd(TeeRF(min, max), [5, 2, 6, 8, 3])
(2, 8)
Transducers.dtransduce — Function
dtransduce(xform::Transducer, step, init, array; [simd, basesize, threads_basesize, pool])
Transducers.eduction — Function
eduction(xf::Transducer, coll)
xf(coll)
coll |> xf
Create an iterable and reducible object.
This API is modeled after eduction in Clojure.
Even though eduction returns an iterable, it is highly recommended to use the foldl-based method provided by Transducers.jl when the performance is important.
Examples
julia> using Transducers
julia> for x in 1:1000 |> Filter(isodd) |> Take(3) # slow
@show x
end
x = 1
x = 3
x = 5
julia> foreach(1:1000 |> Filter(isodd) |> Take(3)) do x # better
@show x
end;
x = 1
x = 3
x = 5
eduction(iterator::Iterators.Generator)
eduction(iterator::Iterators.Filter)
eduction(iterator::Iterators.Flatten)
Convert an iterator to an eduction. The iterators that are typically used in generator comprehensions are supported.
Examples
julia> using Transducers
julia> iter = (y for x in 1:10 if x % 2 == 0 for y in (x, x + 1));
julia> ed = eduction(iter);
julia> collect(iter) == collect(ed)
true
Base.map! — Function
map!(xf::Transducer, dest, src; simd)
Feed src to transducer xf, storing the result in dest. Collections dest and src must have the same shape. Transducer xf may contain filtering transducers. If some entries of src are skipped, the corresponding entries in dest will be unchanged. Transducer xf must not contain any expansive transducers such as MapCat.
See also copy!.
Examples
julia> using Transducers
julia> xs = collect(1:5)
ys = zero(xs)
map!(Filter(isodd), ys, xs)
5-element Vector{Int64}:
1
0
3
0
5
julia> ans === ys
true
Base.copy! — Function
copy!(xf::Transducer, dest, src)
Feed src to transducer xf, storing the result in dest. Collections dest and src may have the same shape. Source src must be iterable. Destination dest must implement empty! and push!.
See also map!.
Examples
julia> using Transducers
julia> copy!(opcompose(PartitionBy(x -> x ÷ 3), Map(sum)), Int[], 1:10)
4-element Vector{Int64}:
3
12
21
19
Base.copy — Function
copy(xf::Transducer, T, foldable) :: Union{T, Empty{T}}
copy(xf::Transducer, foldable::T) :: Union{T, Empty{T}}
copy([T,] eduction::Eduction) :: Union{T, Empty{T}}
Process foldable with a transducer xf and then create a container of type T filled with the result. Return BangBang.Empty{T} if the transducer does not produce anything. (This is because there is no consistent interface to create an empty container given its type and not all containers support creating an empty container.)
For parallel versions, see tcopy and dcopy.
Examples
julia> using Transducers
using BangBang: Empty
julia> copy(Map(x -> x => x^2), Dict, 2:2)
Dict{Int64, Int64} with 1 entry:
2 => 4
julia> @assert copy(Filter(_ -> false), Set, 1:1) === Empty(Set)
julia> using TypedTables
julia> @assert copy(Map(x -> (a=x, b=x^2)), Table, 1:1) == Table(a=[1], b=[1])
julia> using StructArrays
julia> @assert copy(Map(x -> (a=x, b=x^2)), StructVector, 1:1) == StructVector(a=[1], b=[1])
julia> using DataFrames
julia> @assert copy(
Map(x -> (A = x.a + 1, B = x.b + 1)),
DataFrame(a = [1], b = [2]),
) == DataFrame(A = [2], B = [3])
Transducers.tcopy — Function
tcopy(xf::Transducer, T, reducible; basesize) :: Union{T, Empty{T}}
tcopy(xf::Transducer, reducible::T; basesize) :: Union{T, Empty{T}}
tcopy([T,] itr; basesize) :: Union{T, Empty{T}}
Thread-based parallel version of copy. Keyword arguments are passed to foldxt.
See also: Parallel processing tutorial (especially Example: parallel collect).
Examples
julia> using Transducers
julia> tcopy(Map(x -> x => x^2), Dict, 2:2)
Dict{Int64, Int64} with 1 entry:
2 => 4
julia> using TypedTables
julia> @assert tcopy(Map(x -> (a=x,)), Table, 1:1) == Table(a=[1])
julia> using StructArrays
julia> @assert tcopy(Map(x -> (a=x,)), StructVector, 1:1) == StructVector(a=[1])
tcopy works with iterator comprehensions and eductions (unlike copy, there is no need for manual conversion with eduction):
julia> table = StructVector(a = [1, 2, 3], b = [5, 6, 7]);
julia> @assert tcopy(
(A = row.a + 1, B = row.b - 1) for row in table if isodd(row.a)
) == StructVector(A = [2, 4], B = [4, 6])
julia> @assert tcopy(
DataFrame,
(A = row.a + 1, B = row.b - 1) for row in table if isodd(row.a)
) == DataFrame(A = [2, 4], B = [4, 6])
julia> @assert table |>
Filter(row -> isodd(row.a)) |> Map(row -> (A = row.a + 1, B = row.b - 1)) |>
tcopy == StructVector(A = [2, 4], B = [4, 6])
If you have Cat or MapCat at the end of the transducer, consider using foldxt directly:
julia> using Transducers
using DataFrames
julia> @assert tcopy(
DataFrame,
1:2 |> Map(x -> DataFrame(a = [x])) |> MapCat(eachrow);
basesize = 1,
) == DataFrame(a = [1, 2])
julia> using BangBang: Empty, append!!
julia> @assert foldxt(
append!!,
Map(x -> DataFrame(a = [x])),
1:2;
basesize = 1,
# init = Empty(DataFrame),
) == DataFrame(a = [1, 2])
Note that the above snippet assumes that it is OK to mutate the dataframe returned by the transducer. Use init = Empty(DataFrame) if this is not the case.
This approach of using foldxt works with other containers; e.g., with TypedTables.Table:
julia> using TypedTables
julia> @assert foldxt(
append!!,
Map(x -> Table(a = [x])),
1:2;
basesize = 1,
# init = Empty(Table),
) == Table(a = [1, 2])
Transducers.dcopy — Function
dcopy(xf::Transducer, T, reducible; [basesize, threads_basesize]) :: Union{T, Empty{T}}
dcopy(xf::Transducer, reducible::T; [basesize, threads_basesize]) :: Union{T, Empty{T}}
dcopy([T,] itr; [basesize, threads_basesize]) :: Union{T, Empty{T}}
Distributed.jl-based parallel version of copy. Keyword arguments are passed to foldxd. For examples, see tcopy.
See also: Parallel processing tutorial (especially Example: parallel collect).
Base.append! — Function
append!(xf::Transducer, dest, src) -> dest
This API is modeled after into in Clojure.
The performance of append!(dest, src::Eduction) is poor. Use append!! instead if the two-argument form is preferred.
Examples
julia> using Transducers
julia> append!(Drop(2), [-1, -2], 1:5)
5-element Vector{Int64}:
-1
-2
3
4
5
BangBang.append!! — Function
BangBang.append!!(xf::Transducer, dest, src) -> dest′
BangBang.append!!(dest, src::Eduction) -> dest′
Mutate-or-widen version of append!.
Performance optimization for append!!(dest, src::Eduction) requires version 0.4.37.
Examples
julia> using Transducers, BangBang
julia> append!!(opcompose(Drop(2), Map(x -> x + 0.0)), [-1, -2], 1:5)
5-element Vector{Float64}:
-1.0
-2.0
3.0
4.0
5.0
Base.collect — Function
collect(xf::Transducer, itr) :: Vector
collect(ed::Eduction) :: Vector
Process an iterable itr using a transducer xf and collect the result into a Vector.
For parallel versions, see tcollect and dcollect.
Examples
julia> using Transducers
julia> collect(Interpose(missing), 1:3)
5-element Vector{Union{Missing, Int64}}:
1
missing
2
missing
3
Transducers.tcollect — Function
tcollect(xf::Transducer, reducible; basesize) :: Union{Vector, Empty{Vector}}
tcollect(itr; basesize) :: Union{Vector, Empty{Vector}}
Thread-based parallel version of collect. This is just a short-hand notation of tcopy(xf, Vector, reducible). Use tcopy to get a container other than a Vector.
See also: Parallel processing tutorial (especially Example: parallel collect).
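The short-hand relationship between tcollect and tcopy can be checked directly. This is a small sketch; the variable names and the choice of basesize = 1 (to force one chunk per element) are ours.

```julia
using Transducers

xf = Map(x -> x^2)

# Both calls run the same threaded fold and collect into a Vector.
via_tcollect = tcollect(xf, 1:4; basesize = 1)
via_tcopy = tcopy(xf, Vector, 1:4; basesize = 1)

same_result = via_tcollect == via_tcopy
```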
Examples
julia> using Transducers
julia> tcollect(Map(x -> x^2), 1:2)
2-element Vector{Int64}:
1
4
julia> tcollect(x^2 for x in 1:2)
2-element Vector{Int64}:
1
4
Transducers.dcollect — Function
dcollect(xf::Transducer, reducible; [basesize, threads_basesize]) :: Union{Vector, Empty{Vector}}
dcollect(itr; [basesize, threads_basesize]) :: Union{Vector, Empty{Vector}}
Distributed.jl-based parallel version of collect. This is just a short-hand notation of dcopy(xf, Vector, reducible). Use dcopy to get a container other than a Vector.
See also: Parallel processing tutorial (especially Example: parallel collect).
Base.Channel — Type
Channel(xf::Transducer, itr; kwargs...)
Channel(ed::Eduction; kwargs...)
Pipe items from an iterable itr processed by the transducer xf through a channel. Channel(xf, itr) and Channel(eduction(xf, itr)) are equivalent. Note that itr itself can be a Channel.
Keyword arguments are passed to Channel(function; kwargs...).
Examples
julia> using Transducers
julia> ch1 = Channel(Filter(isodd), 1:5);
julia> ch2 = Channel(Map(x -> 2x - 1), ch1);
julia> ed = eduction(Map(x -> 1:x), ch2);
julia> ch3 = Channel(Cat(), ed);
julia> foreach(PartitionBy(isequal(1)), ch3) do input
@show input
end;
input = [1, 1]
input = [2, 3, 4, 5]
input = [1]
input = [2, 3, 4, 5, 6, 7, 8, 9]
Experimental transducible processes
Transducers.channel_unordered — Function
channel_unordered(xf, input; eltype, size, ntasks, basesize) :: Channel{eltype}
channel_unordered(itr; eltype, size, ntasks, basesize) :: Channel{eltype}
Provide elements in input processed by a transducer xf through a Channel.
Unary method channel_unordered(itr) produces a Channel that provides elements in the input iterator itr with possibly different order. Iterator comprehensions and eductions can be passed as the input itr.
Use append_unordered! to send outputs to an existing channel.
This API is experimental. Backward incompatible change, including the removal of this API, is more likely to occur than other parts of this package.
Keyword Arguments
eltype::Type: Element type of the returned channel.
size: The size of Channel. A non-negative Int or Inf.
ntasks::Int: Number of concurrent tasks. Defaults to Threads.nthreads().
basesize: The "batch size" of the processing; i.e., the number of elements to be processed in a single task. Defaults to 1.
Examples
julia> using Transducers: Map, channel_unordered
julia> sort!(collect(channel_unordered(Map(x -> x + 1), 1:3)))
3-element Vector{Any}:
2
3
4
julia> sort!(collect(channel_unordered(x + 1 for x in 1:3 if isodd(x))))
2-element Vector{Any}:
2
4
Transducers.append_unordered! — Function
append_unordered!(output, xf, input; ntasks, basesize)
append_unordered!(output, itr; ntasks, basesize)
Process input elements through a transducer xf and then push! them into output in undefined order.
Binary method append_unordered!(output, itr) is like append!(output, itr) but without order guarantee. Iterator comprehensions and eductions can be passed as the input itr.
output (typically a Channel) must implement thread-safe push!(output, x) method.
See also channel_unordered.
This API is experimental. Backward incompatible change, including the removal of this API, is more likely to occur than other parts of this package.
Examples
julia> using Transducers: Map, append_unordered!
julia> input = Channel(Map(identity), 1:3);
julia> output = Channel{Int}(0);
julia> task = @async try
append_unordered!(output, Map(x -> x + 1), input)
finally
close(output)
end;
julia> sort!(collect(output))
3-element Vector{Int64}:
2
3
4
julia> input = Channel(Map(identity), 1:3);
julia> output = Channel{Int}(0);
julia> task = @async try
append_unordered!(output, (y for x in input if isodd(x) for y in 1:x))
finally
close(output)
end;
julia> sort!(collect(output))
4-element Vector{Int64}:
1
1
2
3
Transducers
Transducers.Transducer — Type
Transducer
The abstract type for transducers.
A transducer xf can be used as both iterator transformation xf(itr) and reducing function transformation xf'(rf).
See also adjoint for xf'(rf).
The call overload xf(itr) requires Julia 1.3 or later. For older Julia versions, use eduction.
Examples
julia> using Transducers
julia> xs = Map(inv)(2:2:4)
2-element StepRange{Int64, Int64} |>
Map(inv)
julia> collect(xs)
2-element Vector{Float64}:
0.5
0.25
julia> rf = Map(inv)'(+)
Reduction(
Map(inv),
BottomRF(
+))
julia> rf(1, 4) # +(1, inv(4))
1.25
CompositionsBase.:⨟ — Function
g ⨟ f
opcompose(g, f)
The opposite composition operator defined as
g ⨟ f = f ∘ g
⨟(f) = f
⨟(fs...) = ∘(reverse(fs)...)
Base.:∘ — Function
f ⨟ g
g ∘ f
opcompose(f, g)
compose(g, f)
Composition of transducers.
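As a small sketch of the composition order, opcompose(f, g) applies f to the input first and g second, which matches the left-to-right reading of a |> pipeline. The transducers and variable names below are chosen only for illustration.

```julia
using Transducers

# Filter runs first, then Map: keep odd numbers and multiply them by 10.
xf = opcompose(Filter(isodd), Map(x -> 10x))
via_opcompose = collect(xf, 1:5)

# The same processing written as a pipeline.
via_pipeline = 1:5 |> Filter(isodd) |> Map(x -> 10x) |> collect
```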
Base.adjoint — Function
xf'
xf'(rf₁) is a shortcut for calling reducingfunction(xf, rf₁).
More precisely, the adjoint xf′ of a transducer xf is a reducing function transform rf₁ -> rf₂. That is to say, xf' is a function that maps a reducing function rf₁ to another reducing function rf₂.
Examples
julia> using Transducers
julia> y = Map(inv)'(+)(10, 2)
10.5
julia> y == +(10, inv(2))
true
Transducers.Broadcasting — Type
Broadcasting()
Broadcast inner reducing function over elements in the input. Roughly speaking, it transforms the inner reducing function op to op′(a, b) = op.(a, b). However, it has better memory usage and better initial value handling.
If the input is an array, the array created at the first iteration is reused if it can hold the element types of subsequent iterations. Otherwise, the array type is widened as needed.
If init passed to the fold function is a lazy "initializer" object such as OnInit, it is initialized independently for each item in the first input array. This makes using Broadcasting for (possibly) in-place functions safe.
Examples
julia> using Transducers
julia> foldl(+, Broadcasting(), [[1, 2], [3, 4], 5])
2-element Vector{Int64}:
9
11
julia> foldl(+, Broadcasting(), [(0,), [1], [2.0], [3 + 0im]])
1-element Vector{ComplexF64}:
6.0 + 0.0im
julia> foldl(
*,
[[[1], [10, 100]], [[2], [20, 200]], [[3], [30, 300]]] |>
Broadcasting() |> Broadcasting(),
)
2-element Vector{Vector{Int64}}:
[6]
[6000, 6000000]
When processing nested data structures (e.g., array-of-arrays) and mutating the accumulator in-place, be careful not to share accumulators across the items in the input. For example, this is a bad example:
julia> add!(a, b) = a .+= b;
julia> foldl(add!, Broadcasting(), [[[1], [2, 3]], [[4, 5], 6]];
init = Ref([0, 0]))
2-element Vector{Vector{Int64}}:
[13, 15]
[13, 15]
julia> ans[1] === ans[2] # they are the same object
true
Use OnInit to initialize a new array with each item in the input:
julia> foldl(add!, Broadcasting(), [[[1], [2, 3]], [[4, 5], 6]];
init = OnInit(() -> [0, 0]))
2-element Vector{Vector{Int64}}:
[5, 6]
[8, 9]
julia> ans == [
add!(add!([0, 0], [1]), [4, 5]),
add!(add!([0, 0], [2, 3]), 6),
]
true
Transducers.Cat — Type
Cat()
Concatenate/flatten nested iterators.
This API is modeled after cat in Clojure.
Examples
julia> using Transducers
julia> collect(Cat(), [[1, 2], [3], [4, 5]]) == 1:5
true
Transducers.Consecutive — Type
Consecutive(size, step = size)
Consecutive(size; step = size)
Sliding window of width size and interval step. Yields tuples.
This transducer is like Partition but feeds tuples to the downstream transducers instead of vectors.
If step == 1, this transducer supports parallel reduction for any collections; i.e., Consecutive(size, 1)'(op) is associative if op is associative.
Currently, in parallel folds, Consecutive(size, 1) cannot be used with reducing functions that can produce a Reduced.
If step > 1, this transducer can, in principle, support parallel reduction if the input collection allows random access (e.g., arrays). However, this feature is not implemented yet.
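The contrast with Partition can be sketched as below. This is an illustration under one assumption: that Partition hands a reused window to the downstream, which is why Map(copy) is inserted before collecting. The variable names are ours.

```julia
using Transducers

# Consecutive yields immutable tuples, so they can be collected as-is.
tuples = 1:6 |> Consecutive(3) |> collect

# Partition yields vector-like windows; copy each one before storing it
# (assumption: the window may be backed by a reused buffer).
vectors = 1:6 |> Partition(3) |> Map(copy) |> collect
```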
Examples
julia> using Transducers
julia> 1:8 |> Consecutive(3) |> collect
2-element Vector{Tuple{Int64, Int64, Int64}}:
(1, 2, 3)
(4, 5, 6)
julia> 1:8 |> Consecutive(3; step = 1) |> collect
6-element Vector{Tuple{Int64, Int64, Int64}}:
(1, 2, 3)
(2, 3, 4)
(3, 4, 5)
(4, 5, 6)
(5, 6, 7)
(6, 7, 8)
Transducers.Count — Type
Count([start[, step]])
Generate a sequence start, start + step, start + step + step, and so on.
Note that input is ignored. To use the input in the downstream reduction steps, use Zip.
start defaults to 1 and step defaults to oneunit(start).
See also: Iterators.countfrom, Enumerate.
Examples
julia> using Transducers
julia> collect(Zip(Map(identity), Count()), -3:-1)
3-element Vector{Tuple{Int64, Int64}}:
(-3, 1)
(-2, 2)
(-1, 3)
julia> using Dates
julia> 1:3 |> Zip(Map(identity), Count(Day(1))) |> MapSplat(*) |> collect ==
[Day(1), Day(4), Day(9)]
true
Transducers.Dedupe — Type
Dedupe()
Dedupe(eq)
De-duplicate consecutive items. The comparison operator that identifies duplicates can be specified by the eq parameter, which defaults to == (equal).
This API is modeled after dedupe in Clojure.
Examples
julia> using Transducers
julia> collect(Dedupe(), [1, 1, 2, 1, 3, 3, 2])
5-element Vector{Int64}:
1
2
1
3
2
Transducers.Drop — Type
Drop(n)
Drop first n items.
This API is modeled after drop in Clojure.
Examples
julia> using Transducers
julia> 1:5 |> Drop(3) |> collect
2-element Vector{Int64}:
4
5
Transducers.DropLast — Type
DropLast(n)
Drop last n items.
This API is modeled after drop-last in Clojure.
Examples
julia> using Transducers
julia> 1:5 |> DropLast(2) |> collect
3-element Vector{Int64}:
1
2
3
julia> 1:1 |> DropLast(2) |> collect == []
true
julia> 1:0 |> DropLast(2) |> collect == []
true
Transducers.DropWhile — Type
DropWhile(pred)
Drop items while pred returns true consecutively. It becomes a no-op after pred returns false.
This API is modeled after drop-while in Clojure.
Examples
julia> using Transducers
julia> collect(DropWhile(x -> x < 3), [1:5; 1:2])
5-element Vector{Int64}:
3
4
5
1
2
Transducers.Enumerate — Type
Enumerate([start[, step]])
Transducer variant of Base.enumerate. The start and step arguments are optional and have the same meaning as in Count.
Examples
julia> using Transducers
julia> collect(Enumerate(), ["A", "B", "C"])
3-element Vector{Tuple{Int64, String}}:
(1, "A")
(2, "B")
(3, "C")
julia> start=2; step=3;
julia> collect(Enumerate(start, step), ["A", "B", "C"])
3-element Vector{Tuple{Int64, String}}:
(2, "A")
(5, "B")
(8, "C")
Transducers.Filter — Type
Filter(pred)
Skip items for which pred evaluates to false.
This API is modeled after filter in Clojure.
Examples
julia> using Transducers
julia> 1:3 |> Filter(iseven) |> collect
1-element Vector{Int64}:
2
Transducers.FlagFirst — Type
FlagFirst()
Output (isfirst, input) where isfirst::Bool is true only for the first iteration and input is the original input.
See also: IterTools.flagfirst
Examples
julia> using Transducers
julia> collect(FlagFirst(), 1:3)
3-element Vector{Tuple{Bool, Int64}}:
(1, 1)
(0, 2)
(0, 3)
Transducers.GroupBy — Type
GroupBy(key, xf::Transducer, [step = right, [init]])
GroupBy(key, rf, [init])
Group the input stream by a function key and then fan-out each group of key-value pairs to the eduction xf'(step). This is similar to the groupby relational database operation.
For example
[1,2,1,2,3] |> GroupBy(string, Map(last)'(+)) |> foldxl(right)
returns a result equivalent to Dict("1"=>2, "2"=>4, "3"=>3) while
[1,2,1,2,3] |> GroupBy(string, Map(last) ⨟ Map(Transducers.SingletonVector), append!!) |> foldxl(right)
returns a result equivalent to Dict("1"=>[1,1], "2"=>[2,2], "3"=>[3]).
Alternatively, one can provide a reducing function directly, though this is disfavored since it prevents results from being combined with Transducers.combine and therefore cannot be used with foldxt or foldxd. For example, if GroupBy is used as in:
xs |> Map(upstream) |> GroupBy(key, rf, init) |> Map(downstream)
then the function signatures would be:
upstream(_) :: V
key(::V) :: K
rf(::Y, ::Pair{K, V}) :: Y
downstream(::Dict{K, Y})
That is to say,
Output of the upstream is fed into the function key that produces the group key (of type K).
For each new group key, a new transducible process is started with the initial state init :: Y. Pass an OnInit or CopyInit object to init for creating a dedicated (possibly mutable) state for each group.
After one "nested" reducing function rf is called, the intermediate result dictionary (of type Dict{K, Y}) accumulating the current and all preceding results is then fed into the downstream.
See also groupreduce in SplitApplyCombine.jl.
Examples
julia> [1,2,3,4] |> GroupBy(iseven, Map(last)'(+)) |> foldxl(right)
Transducers.GroupByViewDict{Bool,Int64,…}(...):
0 => 4
1 => 6
julia> using Transducers: SingletonDict;
julia> using BangBang; # for merge!!
julia> x = [(a="A", b=1, c=1), (a="B", b=2, c=2), (a="A", b=3, c=3)];
julia> inner = Map(last) ⨟ Map() do ξ
SingletonDict(ξ.b => ξ.c)
end;
julia> x |> GroupBy(ξ -> ξ.a, inner, merge!!) |> foldxl(right)
Transducers.GroupByViewDict{String,Dict{Int64, Int64},…}(...):
"B" => Dict(2=>2)
"A" => Dict(3=>3, 1=>1)
Note that the reduction stops if one of the groups returns a reduced. This can be used, for example, to find out whether there is a group with a sum greater than 3 and stop the computation as soon as it is found:
julia> result = transduce(
GroupBy(
string,
Map(last) ⨟ Scan(+) ⨟ ReduceIf(x -> x > 3),
),
right,
nothing,
[1, 2, 1, 2, 3],
);
julia> result isa Reduced
true
julia> unreduced(result)
Transducers.GroupByViewDict{String,Any,…}(...):
"1" => 2
"2" => 4
Transducers.Interpose — Type
Interpose(sep)
Interleave input items with a sep.
This API is modeled after interpose in Clojure.
Examples
julia> using Transducers
julia> collect(Interpose(missing), 1:3)
5-element Vector{Union{Missing, Int64}}:
1
missing
2
missing
3
Transducers.Iterated — Type
Iterated(f, init)
Generate a sequence init, f(init), f(f(init)), f(f(f(init))), and so on.
Note that input is ignored. To use the input in the downstream reduction steps, use Zip.
Pass OnInit or CopyInit object to init for creating a dedicated (possibly mutable) state for each fold.
The idea is taken from IterTools.iterated.
Examples
julia> using Transducers
julia> collect(Iterated(x -> 2x, 1), 1:5)
5-element Vector{Int64}:
1
2
4
8
16
julia> collect(Zip(Map(identity), Iterated(x -> 2x, 1)), 1:5)
5-element Vector{Tuple{Int64, Int64}}:
(1, 1)
(2, 2)
(3, 4)
(4, 8)
(5, 16)
Transducers.KeepSomething — Type
KeepSomething(f = identity)
Pass non-nothing output of f to the inner reducing step after possibly unwrapping Some.
This API is modeled after keep in Clojure.
Examples
julia> using Transducers
julia> xf = KeepSomething() do x
           if x == 0
               :zero
           elseif x == 1
               Some(:one)
           elseif x == 2
               Some(nothing)
           end
       end;
julia> collect(xf, 0:3)
3-element Vector{Union{Nothing, Symbol}}:
:zero
:one
nothing
Note that NotA(Nothing) can be used to avoid automatically unwrapping Some:
julia> [Some(1), 2, nothing] |> KeepSomething() |> collect
2-element Vector{Int64}:
1
2
julia> [Some(1), 2, nothing] |> NotA(Nothing) |> collect
2-element Vector{Any}:
Some(1)
2
Transducers.Map — Type
Map(f)
Apply unary function f to each input and pass the result to the inner reducing step.
This API is modeled after map in Clojure.
Examples
julia> using Transducers
julia> collect(Map(x -> 2x), 1:3)
3-element Vector{Int64}:
2
4
6
Transducers.MapCat — Type
MapCat(f)
Concatenate the outputs of f, which is expected to return an iterable.
This API is modeled after mapcat in Clojure.
Examples
julia> using Transducers
julia> collect(MapCat(x -> 1:x), 1:3)
6-element Vector{Int64}:
1
1
2
1
2
3
Transducers.MapSplat — Type
MapSplat(f)
Like Map(f) but calls f(input...) for each input and then passes the result to the inner reducing step.
Examples
julia> using Transducers
julia> collect(MapSplat(*), zip(1:3, 10:10:30))
3-element Vector{Int64}:
10
40
90
Transducers.NondeterministicThreading — Type
NondeterministicThreading(; basesize, ntasks = nthreads())
Parallelize the inner reducing function using ntasks tasks.
Given
#                 ,-- Not parallelized
#          ______/__
foldxl(rf, xs |> xfo |> NondeterministicThreading() |> xfi)
#      ==                                              ===
#      Parallelized                                    Parallelized
the inner transducer xfi and the reducing function rf are parallelized but the outer transducer xfo and the iteration over data collection xs are not parallelized.
The scheduling of the tasks (hence the call tree of the inner reducing function) is non-deterministic. This means that the result is deterministic only if the inner reducing function is exactly associative. If the inner reducing function is only approximately associative (e.g., addition of floating point numbers), the result of the reduction is likely to differ from run to run.
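To make "approximately associative" concrete, the following minimal sketch regroups the same three Float64 terms in plain Julia. It does not use any Transducers.jl API, and the variable names are illustrative only.

```julia
# Regrouping the same three Float64 terms changes the rounded result,
# which is why a non-deterministic call tree can change a floating point sum.
grouped_left = (0.1 + 0.2) + 0.3
grouped_right = 0.1 + (0.2 + 0.3)

@assert grouped_left != grouped_right          # not exactly associative
@assert isapprox(grouped_left, grouped_right)  # but approximately associative

# Integer addition is exactly associative: every grouping gives the same sum.
@assert (1 + 2) + 3 == 1 + (2 + 3)
```

With an exactly associative function such as integer addition, the grouping chosen by the scheduler does not affect the result.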
The outer transducers and the iteration are processed sequentially. In particular, the data collection does not have to implement SplittablesBase.halve.
Currently, the default basesize is 1. However, it may be changed in the future (e.g. it may be automatically tuned at run-time).
Keyword Arguments
basesize::Integer: The number of input elements to be accumulated in a buffer before being sent to a task.
ntasks::Integer: The number of tasks @spawned. The default value is Threads.nthreads(). A number larger than Threads.nthreads() may be useful if the inner reducing function contains I/O and does not consume too many resources (e.g., memory).
Examples
julia> using Transducers
julia> collect(1:4 |> Filter(isodd))
2-element Vector{Int64}:
1
3
julia> collect(1:4 |> NondeterministicThreading() |> Filter(isodd))
2-element Vector{Int64}:
1
3
Transducers.NotA — Type
NotA(T)
Skip items of type T. Unlike Filter(!ismissing), downstream transducers can have correct type information for NotA(Missing).
See also: OfType
Examples
julia> using Transducers
julia> [1, missing, 2] |> NotA(Missing) |> collect
2-element Vector{Int64}:
1
2
Transducers.OfType — Type
OfType(T)
Include only items of type T.
See also: NotA
Examples
julia> using Transducers
julia> [1, missing, 2] |> OfType(Int) |> collect
2-element Vector{Int64}:
1
2
Transducers.Partition — Type
Partition(size, step = size, flush = false)
Partition(size; step = size, flush = false)
Sliding window of width size and interval step. Yields vectors.
Note: step = size is the default. Hence, the default behavior is non-overlapping windows.
For small size, see Consecutive for a similar transducer that yields tuples instead.
The vector passed to the inner reducing function is valid only during its immediate reduction step. It must be reduced immediately or copied.
This API is modeled after partition-all in Clojure.
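The two safe ways of consuming a window mentioned above (reduce it immediately, or copy it) can be sketched as follows. This is an illustrative example that only uses the calls shown elsewhere in this section; the variable names are arbitrary.

```julia
using Transducers

# Reduce each window immediately: Map(sum) consumes the buffer during the
# reduction step, so no copy is needed.
window_sums = 1:8 |> Partition(3) |> Map(sum) |> collect
@assert window_sums == [6, 15]

# Copy each window when it has to outlive the reduction step,
# e.g., when the windows themselves are collected.
windows = 1:8 |> Partition(3; flush = true) |> Map(copy) |> collect
@assert windows == [[1, 2, 3], [4, 5, 6], [7, 8]]
```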
Examples
julia> using Transducers
julia> 1:8 |> Partition(3) |> Map(copy) |> collect
2-element Vector{Vector{Int64}}:
[1, 2, 3]
[4, 5, 6]
julia> 1:8 |> Partition(3; flush=true) |> Map(copy) |> collect
3-element Vector{Vector{Int64}}:
[1, 2, 3]
[4, 5, 6]
[7, 8]
julia> 1:8 |> Partition(3; step=1) |> Map(copy) |> collect
6-element Vector{Vector{Int64}}:
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
[6, 7, 8]
Transducers.PartitionBy — Type
PartitionBy(f)
Group the input sequence into chunks of consecutive items for which f returns the same value.
The vector passed to the inner reducing function is valid only during its immediate reduction step. It must be reduced immediately or copied.
This API is modeled after partition-by in Clojure.
Examples
julia> using Transducers
julia> 1:9 |> PartitionBy(x -> (x + 1) ÷ 3) |> Map(copy) |> collect
4-element Vector{UnitRange{Int64}}:
1:1
2:4
5:7
8:9
Transducers.ReduceIf — Type
ReduceIf(pred)
Stop fold when pred(x) returns true for the output x of the upstream transducer.
Examples
julia> using Transducers
julia> foldl(right, ReduceIf(x -> x == 3), 1:10)
3
Transducers.ReducePartitionBy — Type
ReducePartitionBy(f, rf, [init])
Reduce partitions determined by isequal on the output value of f with an associative reducing function rf. Partitions are reduced on-the-fly and no intermediate arrays are allocated.
Examples
Consider the input 1:6 "keyed" by a function x -> x ÷ 3:
julia> map(x -> x ÷ 3, 1:6)
6-element Vector{Int64}:
0
0
1
1
1
2
i.e., there are three partitions with the key values 0, 1, and 2. We can use ReducePartitionBy to compute, e.g., the length and the sum of each partition by:
julia> using Transducers
julia> 1:6 |> ReducePartitionBy(x -> x ÷ 3, Map(_ -> 1)'(+)) |> collect
3-element Vector{Int64}:
2
3
1
julia> 1:6 |> ReducePartitionBy(x -> x ÷ 3, +) |> collect
3-element Vector{Int64}:
3
12
6
Transducers.Replace — Type
Replace(assoc)
Replace each input with the value in the associative container assoc (e.g., a dictionary, array, string) if it matches a key/index. Otherwise output the input as-is.
This API is modeled after replace in Clojure.
Examples
julia> using Transducers
julia> collect(Replace(Dict('a' => 'A')), "abc")
3-element Vector{Char}:
'A': ASCII/Unicode U+0041 (category Lu: Letter, uppercase)
'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
julia> collect(Replace([:a, :b, :c]), 0:4)
5-element Vector{Any}:
0
:a
:b
:c
4
julia> collect(Replace("abc"), 0:4)
5-element Vector{Any}:
0
'a': ASCII/Unicode U+0061 (category Ll: Letter, lowercase)
'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
4
Transducers.Scan — Type
Scan(f, [init = Init])
Accumulate input with binary function f and pass the accumulated result so far to the inner reduction step.
The inner reducing step receives the sequence y₁, y₂, y₃, ..., yₙ, ... when the sequence x₁, x₂, x₃, ..., xₙ, ... is fed to Scan(f).
y₁ = f(init, x₁)
y₂ = f(y₁, x₂)
y₃ = f(y₂, x₃)
...
yₙ = f(yₙ₋₁, xₙ)
This is a generalized version of the prefix sum, also known as cumulative sum, inclusive scan, or scan.
Note that the associativity of f is not required when the transducer is used in a process that guarantees an order, such as foldl.
Unless f is a function with known identity element such as +, *, min, max, and append!, the initial state init must be provided.
Pass an OnInit or CopyInit object as init to create a dedicated (possibly mutable) state for each fold.
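As a rough illustration of the recurrence yₙ = f(yₙ₋₁, xₙ), the sketch below compares Scan with Base.cumsum and with a hand-written loop. The helper scan_by_hand is hypothetical and exists only for this comparison; it is not part of Transducers.jl.

```julia
using Transducers

# Scan(+) is an inclusive prefix sum, so it should agree with Base.cumsum.
xs = 1:4
@assert collect(Scan(+), xs) == cumsum(xs)

# Hypothetical helper spelling out the recurrence with an explicit init.
function scan_by_hand(f, init, xs)
    acc = init
    out = typeof(init)[]
    for x in xs
        acc = f(acc, x)   # yₙ = f(yₙ₋₁, xₙ)
        push!(out, acc)
    end
    return out
end

@assert collect(Scan(*, 10), 1:3) == scan_by_hand(*, 10, 1:3)
```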
Examples
julia> using Transducers
julia> collect(Scan(*), 1:3)
3-element Vector{Int64}:
1
2
6
julia> 1:3 |> Map(x -> x + im) |> Scan(*) |> collect
3-element Vector{Complex{Int64}}:
1 + 1im
1 + 3im
0 + 10im
julia> collect(Scan(*, 10), 1:3)
3-element Vector{Int64}:
10
20
60
Transducers.ScanEmit — Type
ScanEmit(f, init[, onlast])
Accumulate input x with a function f with the call signature (u, x) -> (y, u) and pass the result y to the inner reduction step.
The inner reducing step receives the sequence y₁, y₂, y₃, ..., yₙ, ... computed as follows
u₀ = init
y₁, u₁ = f(u₀, x₁)
y₂, u₂ = f(u₁, x₂)
y₃, u₃ = f(u₂, x₃)
...
yₙ, uₙ = f(uₙ₋₁, xₙ)
...
yₒₒ = onlast(uₒₒ)
when the sequence x₁, x₂, x₃, ..., xₙ, ... is fed to ScanEmit(f).
Pass an OnInit or CopyInit object as init to create a dedicated (possibly mutable) state for each fold.
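A small sketch of the (u, x) -> (y, u) protocol: the emitted value y and the carried state u are computed separately, here to produce differences between consecutive inputs. The function name diff_step is an assumption made for this example.

```julia
using Transducers

# diff_step returns (y, u′): emit the difference to the previous input
# and carry the current input as the next state.
diff_step(u, x) = (x - u, x)

diffs = collect(ScanEmit(diff_step, 0), [1, 3, 6])
@assert diffs == [1, 2, 3]   # 1 - 0, 3 - 1, 6 - 3
```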
Examples
julia> using Transducers
julia> collect(ScanEmit(tuple, 0), 1:3)
3-element Vector{Int64}:
0
1
2
Transducers.TCat — Type
TCat(basesize::Integer)
Threaded version of Cat (concatenate/flatten).
To use this transducer, all the downstream (inner) transducers must be stateless (or of type ScanEmit) and the reducing function must be associative. See also: Parallel processing tutorial.
Note that the upstream (outer) transducers need not be stateless as long as they are called with a non-parallel reduction such as foldl and collect.
Examples
julia> using Transducers
julia> 1:3 |> Map(x -> 1:x) |> TCat(1) |> tcollect
6-element Vector{Int64}:
1
1
2
1
2
3
julia> 1:3 |> Scan(+) |> Map(x -> 1:x) |> TCat(1) |> collect
10-element Vector{Int64}:
1
1
2
3
1
2
3
4
5
6
Transducers.Take — Type
Take(n)
Take n items from the input sequence.
This API is modeled after take in Clojure.
Examples
julia> using Transducers
julia> 1:10 |> Take(2) |> collect
2-element Vector{Int64}:
1
2
julia> 1:2 |> Take(5) |> collect
2-element Vector{Int64}:
1
2
Using a low-level API, it's possible to distinguish whether the output is truncated or not:
julia> transduce(Take(3), Completing(push!!), Init, 1:2)
2-element Vector{Int64}:
1
2
julia> transduce(Take(3), Completing(push!!), Init, 1:4)
Reduced([1, 2, 3])
julia> transduce(Take(3), Completing(push!!), Init, 1:0)
InitialValue(push!!)
See also transduce, Reduced and Completing.
Transducers.TakeLast — Type
TakeLast(n)
Take last n items from the input sequence.
Examples
julia> using Transducers
julia> 1:10 |> TakeLast(2) |> collect
2-element Vector{Int64}:
9
10
julia> 1:2 |> TakeLast(5) |> collect
2-element Vector{Int64}:
1
2
Transducers.TakeNth — Type
TakeNth(n)
Output every n-th item to the inner reducing step.
This API is modeled after take-nth in Clojure.
Examples
julia> using Transducers
julia> 1:9 |> TakeNth(3) |> collect
3-element Vector{Int64}:
1
4
7
Transducers.TakeWhile — Type
TakeWhile(pred)
Take items while pred returns true. Abort the reduction when pred returns false for the first time.
This API is modeled after take-while in Clojure.
Examples
julia> using Transducers
julia> [1, 2, 3, 1, 2] |> TakeWhile(x -> x < 3) |> collect
2-element Vector{Int64}:
1
2
Transducers.Unique — Type
Unique(by = identity)
Pass only unseen items to the inner reducing step.
Items are distinguished by the output of the function by when given.
This API is modeled after distinct in Clojure.
Examples
julia> using Transducers
julia> [1, 1, 2, -1, 3, 3, 2] |> Unique() |> collect
4-element Vector{Int64}:
1
2
-1
3
julia> [1, 1, 2, -1, 3, 3, 2] |> Unique(x -> x^2) |> collect
3-element Vector{Int64}:
1
2
3
Transducers.Zip — Method
Zip(xforms...)
Zip outputs of transducers xforms in a tuple and pass it to the inner reduction step.
Head transducers drive tail transducers. Be careful when using it with transducers other than Map, especially the contractive ones like PartitionBy and the expansive ones like MapCat.
Examples
julia> using Transducers
julia> collect(Zip(Map(identity), Map(x -> 10x), Map(x -> 100x)), 1:3)
3-element Vector{Tuple{Int64, Int64, Int64}}:
(1, 10, 100)
(2, 20, 200)
(3, 30, 300)
Experimental transducers
Transducers.ZipSource — Type
ZipSource(xform::Transducer)
Branch input into two "flows", inject one into xform and then merge (zip) the output of xform with the original (source) input.
This API is experimental. Backward incompatible changes, including the removal of this API, are more likely to occur than in other parts of this package.
To illustrate how it works, consider the following usage
collection |> xf0 |> ZipSource(xf1) |> xf2
where xf0, xf1, and xf2 are some transducers. Schematically, the output yn from xfn flows as follows:
xf0      xf1                       xf2
---- y0 ------ y1 ---.-- (y0, y1) ----->
      |              |
       `-------------'
Examples
julia> using Transducers
using Transducers: ZipSource
julia> collect(ZipSource(opcompose(Filter(isodd), Map(x -> x + 1))), 1:5)
3-element Vector{Tuple{Int64, Int64}}:
(1, 2)
(3, 4)
(5, 6)
Transducers.GetIndex — Type
GetIndex(array)
GetIndex{inbounds}(array)
Transform an integer input i to array[i].
This API is experimental. Backward incompatible changes, including the removal of this API, are more likely to occur than in other parts of this package.
Examples
julia> using Transducers
using Transducers: GetIndex
julia> collect(GetIndex('a':'z'), [2, 3, 4])
3-element Vector{Char}:
'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
'd': ASCII/Unicode U+0064 (category Ll: Letter, lowercase)
julia> collect(GetIndex{true}('a':'z'), [2, 3, 4]) # Inbounds
3-element Vector{Char}:
'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
'd': ASCII/Unicode U+0064 (category Ll: Letter, lowercase)
Transducers.SetIndex — Type
SetIndex(array)
SetIndex{inbounds}(array)
Perform array[i] = v for each input pair (i, v).
This API is experimental. Backward incompatible changes, including the removal of this API, are more likely to occur than in other parts of this package.
Examples
julia> using Transducers
using Transducers: SetIndex
julia> ys = zeros(3);
julia> foldl(first ∘ tuple, SetIndex(ys), [(1, 11.1), (3, 33.3)], init=nothing)
julia> ys
3-element Vector{Float64}:
11.1
0.0
33.3
Transducers.Inject — Type
Inject(iterator)
Inject the output from iterator to the stream processed by the inner reduction step.
This API is experimental. Backward incompatible changes, including the removal of this API, are more likely to occur than in other parts of this package.
Examples
julia> using Transducers
using Transducers: Inject
julia> collect(Inject(Iterators.cycle("hello")), 1:8)
8-element Vector{Tuple{Int64, Char}}:
(1, 'h')
(2, 'e')
(3, 'l')
(4, 'l')
(5, 'o')
(6, 'h')
(7, 'e')
(8, 'l')
julia> collect(Inject(Iterators.repeated([1 2])), 1:4)
4-element Vector{Tuple{Int64, Matrix{Int64}}}:
(1, [1 2])
(2, [1 2])
(3, [1 2])
(4, [1 2])
julia> collect(Inject(Iterators.product(1:2, 3:5)), 1:100)
6-element Vector{Tuple{Int64, Tuple{Int64, Int64}}}:
(1, (1, 3))
(2, (2, 3))
(3, (1, 4))
(4, (2, 4))
(5, (1, 5))
(6, (2, 5))
Other reducing function combinators
Transducers.TeeRF — Type
TeeRF(reducing_functions::Tuple)
TeeRF(reducing_functions...)
Combine multiple reducing functions into a new reducing function that "multicasts" the input to multiple reducing functions.
Roughly speaking, TeeRF(op₁, op₂, ..., opₙ) is equivalent to
((a₁, a₂, ..., aₙ), x) -> (op₁(a₁, x), op₂(a₂, x), ..., opₙ(aₙ, x))
For combine, it behaves like ProductRF.
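The "multicast" behavior can be sketched by computing several statistics in a single pass. In the example below, count_rf is an illustrative name for a reducing function built with reducingfunction; the expected values follow from the rough equivalence stated above.

```julia
using Transducers

xs = [5, 2, 6, 8, 3]

# Each input is passed to both min and max, so one pass yields both extrema.
@assert foldl(TeeRF(min, max), Map(identity), xs) == extrema(xs)

# Sum and count in one pass: count_rf maps every input to 1 before adding.
count_rf = reducingfunction(Map(_ -> 1), +)
total, n = foldl(TeeRF(+, count_rf), Map(identity), xs)
@assert (total, n) == (24, 5)
```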
Examples
julia> using Transducers
julia> extrema′(xs, xf = Map(identity)) = foldl(TeeRF(min, max), xf, xs);
julia> extrema′([5, 2, 6, 8, 3])
(2, 8)
Note that the input is considered empty unless all reducing functions call their bottom reducing functions. Specify init to obtain results even when the input collection is empty or all filtered out.
julia> filtering_max = Filter(isodd)'(max);
julia> foldl(TeeRF(min, filtering_max), Map(identity), [5, 2, 6, 8, 3])
(2, 5)
julia> foldl(TeeRF(min, filtering_max), Map(identity), 2:2:8)
ERROR: EmptyResultError: ...
julia> foldl(TeeRF(min, filtering_max), Map(identity), 2:2:8; init = Init)
(2, InitialValue(max))
Transducers.ProductRF — Type
ProductRF(reducing_functions::Tuple)
ProductRF(reducing_functions...)
Combine N reducing functions into a new reducing function that works on N-tuples. The i-th reducing function receives the i-th element of the input tuple.
Roughly speaking, ProductRF(op₁, op₂, ..., opₙ) is equivalent to
((a₁, a₂, ..., aₙ), (b₁, b₂, ..., bₙ)) -> (op₁(a₁, b₁), op₂(a₂, b₂), ..., opₙ(aₙ, bₙ))
Examples
Like TeeRF, ProductRF can be used to drive multiple reducing functions. ProductRF is more "low-level" in the sense that TeeRF can be defined in terms of ProductRF (the other direction is much harder):
julia> using Transducers
julia> TeeRF′(fs...) = reducingfunction(
Map(x -> ntuple(_ -> x, length(fs))),
ProductRF(fs...),
);
julia> foldl(TeeRF′(min, max), Map(identity), [5, 2, 6, 8, 3])
(2, 8)
ProductRF may be useful for handling a pre-existing stream whose item type is already a tuple:
julia> foldl(ProductRF(&, +), Map(x -> (isodd(x), x)), [5, 2, 6, 8, 3])
(false, 24)
julia> foldl(TeeRF(reducingfunction(Map(isodd), &), +), Map(identity), [5, 2, 6, 8, 3])
(false, 24)
Transducers.wheninit — Function
wheninit(oninit, rf) -> rf′
wheninit(oninit) -> rf -> rf′
whenstart(start, rf) -> rf′
whenstart(start) -> rf -> rf′
whencomplete(complete, rf) -> rf′
whencomplete(complete) -> rf -> rf′
whencombine(combine, rf) -> rf′
whencombine(combine) -> rf -> rf′
Add initialization/completion/merging phases to an arbitrary reducing function.
The functions passed to those combinators are used as follows in foldl:
init′ = oninit()  # if oninit is given; otherwise standard `init`-preprocessing
acc = start(init′)
for x in collection
    acc = rf(acc, x)
end
result = acc
return complete(result)
In foldxt, a collection is split into multiple parts and the above foldl, except for complete, is run on each of them, yielding multiple results which are combined by repeatedly calling combine(result_1, result_2). Note that this allows a non-associative function for next while combine must be associative.
See also next, start, complete, and combine.
Arguments
rf: reducing function
oninit: nullary function that generates an initial value for rf
start: unary function that pre-processes the initial value for rf
complete: unary function that post-processes the accumulator
combine: (approximately) associative binary function for combining multiple results of rf (before they are post-processed by complete).
Extended help
Examples
An example of using a non-associative reducing function in foldxt:
julia> using Transducers
julia> collector! = push! |> whencombine(append!) |> wheninit(() -> []);
julia> foldxt(collector!, Filter(isodd), 1:5; basesize = 1)
3-element Vector{Any}:
1
3
5
A more "tightly" typed vector can be returned by using the BangBang.jl interface:
julia> collector!! = push!! |> whencombine(append!!) |> wheninit(Vector{Union{}});
julia> foldxt(collector!!, Filter(isodd), 1:5; basesize = 1)
3-element Vector{Int64}:
1
3
5
An online averaging algorithm can be implemented, e.g., by:
julia> averaging = function add_average((sum, count), x)
(sum + x, count + 1)
end |> wheninit() do
(Init(+), 0)
end |> whencombine() do (sum1, count1), (sum2, count2)
(sum1 + sum2), (count1 + count2)
end |> whencomplete() do (sum, count)
sum / count
end;
julia> foldl(averaging, Filter(isodd), 1:5)
3.0
julia> foldxt(averaging, Filter(isodd), 1:50; basesize = 1)
25.0
An alternative implementation is to use Map to construct a singleton solution and then merge it into the accumulated solution:
julia> averaging2 = function merge_average((sum1, count1), (sum2, count2))
(sum1 + sum2, count1 + count2)
end |> whencomplete() do (sum, count)
sum / count
end |> Map() do x
(x, 1)
end'; # `'` here is important;
julia> foldl(averaging2, Filter(isodd), 1:5)
3.0
julia> foldxt(averaging2, Filter(isodd), 1:50; basesize = 1)
25.0
Transducers.whenstart — Function
See wheninit
Transducers.whencomplete — Function
See wheninit
Transducers.whencombine — Function
See wheninit
Early termination
Transducers.Reduced — Type
Reduced
The type signaling transducible processes to abort.
Examples
julia> using Transducers
julia> function step_demo(y, x)
if x > 5
return reduced(y)
else
return y + x
end
end;
julia> result = transduce(Map(identity), Completing(step_demo), 0, 1:10)
Reduced(15)
julia> result isa Reduced
true
julia> unreduced(result)
15
julia> result = transduce(Map(identity), Completing(step_demo), 0, 1:4)
10
julia> result isa Reduced
false
julia> unreduced(result)
10
Transducers.reduced — Function
reduced([x = nothing])
Stop the transducible process with the final value x (default: nothing). Return x as-is if it already is a reduced value.
This API is modeled after ensure-reduced in Clojure.
Examples
julia> using Transducers
julia> foldl(Enumerate(), "abcdef"; init=0) do y, (i, x)
if x == 'd'
return reduced(y)
end
return y + i
end
6
julia> foreach(Enumerate(), "abc") do (i, x)
println(i, ' ', x)
if x == 'b'
return reduced()
end
end;
1 a
2 b
Transducers.unreduced — Function
unreduced(x)
Unwrap x if it is a Reduced; do nothing otherwise.
This API is modeled after unreduced in Clojure.
Transducers.ifunreduced — Function
ifunreduced(f, [x])
Equivalent to unreduced(x) if x is a Reduced; otherwise run f(x). Return a curried version if x is not provided.
See: foreach.
Examples
julia> using Transducers
julia> 1 |> ifunreduced() do x
println("called with x = ", x)
end
called with x = 1
julia> reduced(1) |> ifunreduced() do x
println("called with x = ", x)
end
1
Notice that nothing is printed in the last example.
Implementation
ifunreduced(f) = x -> ifunreduced(f, x)
ifunreduced(f, x::Reduced) = unreduced(x)
ifunreduced(f, x) = f(x)
Executors
Transducers.SequentialEx — Type
SequentialEx(; simd)
Sequential fold executor. It can be passed to APIs from packages such as Folds.jl and FLoops.jl to run the algorithm sequentially.
See also: foldxl, ThreadedEx and DistributedEx.
Keyword Arguments
simd: If true or :ivdep, enable SIMD using Base.@simd. If :ivdep, use the @simd ivdep for ... end variant. Read the Julia manual entry for Base.@simd to understand when it is appropriate to use this option. For example, simd = :ivdep must not be used with stateful transducers like Scan. If false (default), Base.@simd is not used.
Examples
julia> using Folds
julia> Folds.sum(1:3, SequentialEx())
6
Transducers.ThreadedEx — Type
ThreadedEx(; basesize, stoppable, nestlevel, simd)
Multi-threaded fold executor. This is the default [1] parallel executor used by Folds.jl and FLoops.jl.
See also: foldxt, SequentialEx and DistributedEx.
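As a minimal sketch of how the sequential and threaded folds relate, the example below runs the same associative reduction through foldxl and foldxt and checks that the results agree. The basesize value is an arbitrary choice for illustration.

```julia
using Transducers

xf = Map(x -> x^2)

# Sequential fold.
sequential = foldxl(+, xf, 1:10)

# Threaded fold; a small basesize splits the input into several chunks.
threaded = foldxt(+, xf, 1:10; basesize = 2)

# Integer addition is associative, so both strategies give the same sum.
@assert sequential == threaded == 385
```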
Keyword Arguments
basesize::Integer = amount(reducible) ÷ nthreads(): A size of chunk in reducible that is processed by each worker. A smaller size may be required when, e.g., the computation time for processing each item fluctuates a lot.
stoppable::Bool: [This option usually does not have to be set manually.] The threaded fold executed in the "stoppable" mode used for optimizing reduction with reduced has a slight overhead if reduced is not used. This mode can be disabled by passing stoppable = false. It is usually automatically detected and set appropriately. Note that this option is purely for optimization and does not affect the result value.
nestlevel::Union{Integer,Val}: Specify how many inner Cat (flatten) transducers are to be multi-threaded (using TCat). It must be a positive integer, a Val of a positive integer, or Val(:inf). Val(:inf) means to use multi-threading for all Cat transducers. Note that the Cat transducers should be statically known. That is to say, the fold implementation sees two Cats in ... |> Map(f) |> Cat() |> Cat() but only one Cat in ... |> Map(x -> f(x) |> Cat()) |> Cat() even though they are semantically identical.
simd: If true or :ivdep, enable SIMD using Base.@simd. If :ivdep, use the @simd ivdep for ... end variant. Read the Julia manual entry for Base.@simd to understand when it is appropriate to use this option. For example, simd = :ivdep must not be used with stateful transducers like Scan. If false (default), Base.@simd is not used.
Examples
julia> using Folds
julia> Folds.sum(1:3, ThreadedEx(basesize = 1))
6
Transducers.DistributedEx — Type
DistributedEx(; pool, basesize, threads_basesize, simd)
Distributed fold executor. It can be passed to APIs from packages such as Folds.jl and FLoops.jl to run the algorithm in a distributed manner.
See also: foldxd, SequentialEx and ThreadedEx.
Keyword Arguments
pool::AbstractWorkerPool: Passed to Distributed.remotecall.
basesize::Integer = amount(array) ÷ nworkers(): A size of chunk in array that is processed by each worker. A smaller size may be required when computation time for processing each item can fluctuate a lot.
threads_basesize::Integer = basesize ÷ nthreads(): A size of chunk in array that is processed by each task in each worker process. The default setting assumes that the number of threads is the same in all workers. For a heterogeneous setup where each worker process has a different number of threads, it may be required to use a smaller threads_basesize and basesize to get good performance.
simd: If true or :ivdep, enable SIMD using Base.@simd. If :ivdep, use the @simd ivdep for ... end variant. Read the Julia manual entry for Base.@simd to understand when it is appropriate to use this option. For example, simd = :ivdep must not be used with stateful transducers like Scan. If false (default), Base.@simd is not used.
Examples
julia> using Folds
julia> Folds.sum(1:3, DistributedEx())
6
Transducers.PreferParallel — Type
PreferParallel(; simd, basesize)
A "placeholder" executor that indicates preference to parallel execution.
This lets the input data collection decide the preferred execution strategy (e.g., CUDAEx for CuArray when FoldsCUDA.jl is available), assuming that the reducing function is associative. The default executor is ThreadedEx. As an optional feature, some input data collections (e.g., AbstractChannel) support automatically demoting the execution strategy to SequentialEx. An error is thrown if the automatic detection fails.
Miscellaneous
Transducers.SplitBy — Type
SplitBy(f; [keepend = false,] [keepempty = false,])
Split input collection into chunks delimited by the elements on which f returns true. This can be used to implement parallel and lazy versions of functions like eachline and split.
If keepend is true (or Val(true)), include the "delimiter"/end element at the end of each chunk. If keepempty is true (or Val(true)), include empty chunks. When keepend is true, the value of keepempty is irrelevant since the chunks cannot be empty (i.e., it at least contains the end).
The input collection (xs in SplitBy(...)(xs)) has to support eachindex and view or SubString.
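A short sketch of the keepend and keepempty semantics on a small vector, reducing each chunk immediately with Map(sum) or Map(length). The input and the variable names are illustrative assumptions.

```julia
using Transducers

xs = [1, 2, 0, 3, 0, 0, 4]

# Default options: delimiters are dropped and the empty chunk between
# the two consecutive zeros is skipped.
chunk_sums = xs |> SplitBy(iszero) |> Map(sum) |> collect
@assert chunk_sums == [3, 3, 4]

# keepend = true: every chunk ends with its delimiter, so the second zero
# forms a chunk of its own.
chunk_lengths = xs |> SplitBy(iszero; keepend = true) |> Map(length) |> collect
@assert chunk_lengths == [3, 2, 1, 1]
```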
Extended Help
Examples
For demonstration, consider the following input stream and SplitBy(iszero; ...) used with the following options:
 input     keepend=false     keepend=false     keepend=true
           keepempty=false   keepempty=true

   1           `.                `.                `.
   2            | y1              | y1              | y1
   3           ,'                ,'                 |
   0                                               ,'
   1           `.                `.                `.
   2            | y2              | y2              | y2
   3            |                 |                 |
   4           ,'                ,'                 |
   0                          __ y3                ,'
   0                                                ] y3
   1           `.                `.                `.
   2            | y3              | y4              | y4
In the above diagram, yi (i = 1, 2, 3, 4) are the outputs of SplitBy. This can be checked in the REPL easily as follows. (Note: we are using Map(collect) for cleaner printing; it's not required unless the arrays are mutated downstream.)
julia> using Transducers
julia> xs = [1, 2, 3, 0, 1, 2, 3, 4, 0, 0, 1, 2]; # input
julia> xs |> SplitBy(iszero) |> Map(collect) |> collect
3-element Vector{Vector{Int64}}:
[1, 2, 3]
[1, 2, 3, 4]
[1, 2]
julia> xs |> SplitBy(iszero; keepempty = true) |> Map(collect) |> collect
4-element Vector{Vector{Int64}}:
[1, 2, 3]
[1, 2, 3, 4]
[]
[1, 2]
julia> xs |> SplitBy(iszero; keepend = true) |> Map(collect) |> collect
4-element Vector{Vector{Int64}}:
[1, 2, 3, 0]
[1, 2, 3, 4, 0]
[0]
[1, 2]
Transducers.Transducer — Method
Transducer(iterator::Iterators.Generator)
Transducer(iterator::Iterators.Filter)
Transducer(iterator::Iterators.Flatten)
Extract the "processing" part of an iterator as a Transducer. The "data source" iterator (i.e., xs in (f(x) for x in xs)) is ignored and nothing must be used as a placeholder (i.e., (f(x) for x in nothing)).
See also eduction.
Examples
julia> using Transducers
julia> xf1 = Transducer(2x for x in nothing if x % 2 == 0);
julia> xf2 = opcompose(Filter(x -> x % 2 == 0), Map(x -> 2x)); # equivalent
julia> xs = 1:10
collect(xf1, xs) == collect(xf2, xs)
true
Transducers.reducingfunction — Function
reducingfunction(xf, step; simd)
xf'(step; simd)
Apply transducer xf to the reducing function step to create a new reducing function.
Be careful when using reducingfunction with stateful transducers like Scan with mutable init (e.g., Scan(push!, [])). See more in Examples below.
Arguments
xf::Transducer: A transducer.
step: A callable which accepts 1 and 2 arguments. If it only accepts 2 arguments, wrap it with Completing to "add" the 1-argument form (i.e., the complete protocol).
Keyword Arguments
simd: false, true, or :ivdep. See maybe_usesimd.
Examples
julia> using Transducers
julia> rf = reducingfunction(Map(x -> x + 1), *);
julia> rf(10, 2) === 10 * (2 + 1)
true
Warning: Be careful when using reducingfunction with stateful transducers
Stateful transducers themselves in Transducers.jl are not inherently broken with reducingfunction. However, they can produce incorrect results when combined with mutable states:
julia> scan_state = [];
julia> rf_bad = opcompose(Scan(push!, scan_state), Cat())'(string);
julia> transduce(rf_bad, "", 1:3)
"112123"
The first run works. However, observe that the vector scan_state is not empty anymore:
julia> scan_state
3-element Vector{Any}:
1
2
3
Thus, the second run produces an incorrect result:
julia> transduce(rf_bad, "", 1:3)
"123112312123123"
One way to solve this issue is to use CopyInit or OnInit.
julia> scan_state = CopyInit([])
CopyInit(Any[])
julia> rf_good = opcompose(Scan(push!, scan_state), Cat())'(string);
julia> transduce(rf_good, "", 1:3)
"112123"
julia> scan_state
CopyInit(Any[])
julia> transduce(rf_good, "", 1:3)
"112123"
Transducers.Completing — Type
Completing(function)
Wrap a function to add a no-op complete protocol. Use it when passing a function without a unary method to transduce etc.
This API is modeled after completing in Clojure.
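A minimal usage sketch: the function add below (an illustrative name) only has a binary method, so it is wrapped with Completing before being handed to transduce.

```julia
using Transducers

# `add` has no unary method, i.e., it does not implement the complete protocol.
add(a, b) = a + b

# Completing supplies a no-op completion step, so transduce can use `add`.
total = transduce(Map(x -> 2x), Completing(add), 0, 1:3)
@assert total == 12   # 2 + 4 + 6
```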
Transducers.Init — Constant
Init
Init(op) :: InitialValues.InitialValue
The canonical initializer; i.e., a singleton placeholder usable for the init argument of foldl for binary functions with known initial values.
When init = Init is passed to foldl etc., Init(op) is called for the bottom reducing function op during the start phase. Init(op) returns InitialValue(op) which acts as the canonical initial value of op.
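The sketch below illustrates, under the description above, how the value returned by Init(op) acts as an identity element for op and how it surfaces as the result when nothing reaches the reducing function.

```julia
using Transducers

# Init(op) behaves like the identity element of op.
@assert Init(+) + 5 == 5
@assert Init(*) * 5 == 5
@assert min(Init(min), 5) == 5

# With init = Init, a non-empty fold returns an ordinary value ...
@assert foldl(+, 1:3 |> Filter(isodd); init = Init) == 4

# ... while a fold in which everything is filtered out returns Init(+) itself.
@assert foldl(+, 2:2:4 |> Filter(isodd); init = Init) === Init(+)
```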
Examples
julia> using Transducers
julia> foldl(+, 1:3 |> Filter(isodd); init = Init)
4
julia> foldl(+, 2:2:4 |> Filter(isodd); init = Init)
InitialValue(+)
Extended help
Note that op passed to foldl etc. must be known to InitialValues.jl:
julia> unknown_op(a, b) = a + b;
julia> foldl(unknown_op, 2:2:4 |> Filter(isodd); init = Init)
ERROR: IdentityNotDefinedError: `init = Init` is specified but the identity element `InitialValue(op)` is not defined for
op = unknown_op
[...]
InitialValues.asmonoid can be used to wrap a binary function to add ("adjoin") the identity value to its domain:
julia> using InitialValues: asmonoid
julia> foldl(asmonoid(unknown_op), 2:2:4 |> Filter(isodd); init = Init)
InitialValue(::InitialValues.AdjoinIdentity{typeof(unknown_op)})
When start(rf, Init) is called with a composite reducing function rf, Init(rf₀) is called for the bottom reducing function rf₀ of rf:
julia> rf = Take(3)'(+); # `+` is the bottom reducing function;
julia> acc = Transducers.start(rf, Init);
julia> Transducers.unwrap(rf, acc)
(3, InitialValue(+))
Transducers.OnInit — Type
OnInit(f)
Call a callable f to create an initial value.
See also CopyInit.
OnInit or CopyInit must be used whenever using in-place reduction with foldxt etc.
Examples
julia> using Transducers
julia> xf1 = Scan(push!, [])
Scan(push!, Any[])
julia> foldl(right, xf1, 1:3)
3-element Vector{Any}:
1
2
3
julia> xf1
Scan(push!, Any[1, 2, 3])
Notice that the array is stored in xf1 and mutated in-place. As a result, the second run of foldl contains the results from the first run:
julia> foldl(right, xf1, 10:11)
5-element Vector{Any}:
1
2
3
10
11
This may not be desired. To avoid this behavior, create an OnInit object which takes a factory function to create a new initial value.
julia> xf2 = Scan(push!, OnInit(() -> []))
Scan(push!, OnInit(##9#10()))
julia> foldl(right, xf2, 1:3)
3-element Vector{Any}:
1
2
3
julia> foldl(right, xf2, [10.0, 11.0])
2-element Vector{Any}:
10.0
11.0
The keyword argument init for transducible processes also accepts an OnInit:
julia> foldl(push!, Map(identity), "abc"; init=OnInit(() -> []))
3-element Vector{Any}:
'a': ASCII/Unicode U+0061 (category Ll: Letter, lowercase)
'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
To create a copy of a mutable object, CopyInit is easier to use.
However, a more powerful and generic pattern is to use push!! from BangBang.jl and initialize init with Union{}[] so that it automatically finds the minimal element type.
julia> using BangBang
julia> foldl(push!!, Map(identity), "abc"; init=Union{}[])
3-element Vector{Char}:
'a': ASCII/Unicode U+0061 (category Ll: Letter, lowercase)
'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
Transducers.CopyInit — Type
CopyInit(value)
This is equivalent to OnInit(() -> deepcopy(value)).
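The stated equivalence can be checked with a short sketch. The variable names `template`, `a`, and `b` are made up for illustration. Both forms should produce the same result and leave the original object unmodified.

```julia
using Transducers

template = Int[]

# Two ways of asking for a fresh deep copy of `template` per fold.
a = foldl(push!, Map(x -> 2x), 1:3; init=CopyInit(template))
b = foldl(push!, Map(x -> 2x), 1:3; init=OnInit(() -> deepcopy(template)))

# Both folds worked on copies, so `template` itself stays empty.
isempty(template)
```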
Examples
julia> using Transducers
julia> init = CopyInit([]);
julia> foldl(push!, Map(identity), 1:3; init=init)
3-element Vector{Any}:
1
2
3
julia> foldl(push!, Map(identity), 1:3; init=init) # `init` can be reused
3-element Vector{Any}:
1
2
3
Transducers.right — Function
right([l, ]r) -> r
It is simply defined as
right(l, r) = r
right(r) = r
This function is meant to be used as the step argument for foldl etc. to extract the last output of the transducers.
Initial value must be manually specified. In 0.2, it was automatically set to nothing.
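A small sketch of passing init explicitly: when at least one item reaches right, the fold returns the last such item; when nothing gets through, the fold returns init unchanged. The names `last_square` and `fallback` are illustrative.

```julia
using Transducers

# The last value produced by the transducer is returned.
last_square = foldl(right, Map(x -> x^2), 1:4; init=nothing)

# No element passes the filter, so `init` is returned as-is.
fallback = foldl(right, Filter(x -> x > 10), 1:4; init=nothing)
```

Using a sentinel such as `nothing` for init makes the "no output" case easy to detect at the call site.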
Examples
julia> using Transducers
julia> foldl(right, Take(5), 1:10)
5
julia> foldl(right, Drop(5), 1:3; init=0) # using `init` as the default value
0
Transducers.setinput — Function
setinput(ed::Eduction, coll)
Set the input collection of eduction ed to coll.
Previously, setinput combined with eduction was the recommended way to use transducers in a type-stable manner. As of v0.3, all the foldl-like functions and eduction are type stable in many cases, so this workaround is no longer necessary.
Examples
julia> using Transducers
julia> ed = eduction(Map(x -> 2x), Float64[]);
julia> xs = ones(2, 3);
julia> foldl(+, setinput(ed, xs))
12.0
Transducers.AdHocFoldable — Type
AdHocFoldable(foldl, [collection = nothing])
Provide a different way to fold collection without creating a wrapper type.
Arguments
foldl::Function: a function that implements __foldl__.
collection: a collection passed to the last argument of foldl.
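As a minimal sketch of the calling convention described above, the hypothetical helper `everyother` (not part of Transducers.jl) folds over every second element of a vector. The function passed to AdHocFoldable receives the reducing function, the accumulator, and the collection, feeds items through @next, and finishes with complete.

```julia
using Transducers
using Transducers: @next, complete

# Hypothetical helper: visit only the 1st, 3rd, 5th, ... elements of `xs`.
everyother(xs::AbstractVector) =
    AdHocFoldable(xs) do rf, acc, xs
        for i in firstindex(xs):2:lastindex(xs)
            # `@next` returns early if the reduction was terminated.
            acc = @next(rf, acc, xs[i])
        end
        return complete(rf, acc)
    end

ys = collect(Map(x -> 10x), everyother([1, 2, 3, 4, 5]))
```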
Examples
julia> using Transducers
       using Transducers: @next, complete
       using ArgCheck
julia> function uppertriangle(A::AbstractMatrix)
           @argcheck !Base.has_offset_axes(A)
           return AdHocFoldable(A) do rf, acc, A
               for j in 1:size(A, 2), i in 1:min(j, size(A, 1))
                   acc = @next(rf, acc, @inbounds A[i, j])
               end
               return complete(rf, acc)
           end
       end;
julia> A = reshape(1:6, (3, 2))
3×2 reshape(::UnitRange{Int64}, 3, 2) with eltype Int64:
1 4
2 5
3 6
julia> collect(Map(identity), uppertriangle(A))
3-element Vector{Int64}:
1
4
5
julia> function circularwindows(xs::AbstractVector, h::Integer)
           @argcheck !Base.has_offset_axes(xs)
           @argcheck h >= 0
           @argcheck 2 * h + 1 <= length(xs)
           return AdHocFoldable(xs) do rf, acc, xs
               buffer = similar(xs, 2 * h + 1)
               @inbounds for i in 1:h
                   buffer[1:h - i + 1] .= @view xs[end - h + i:end]
                   buffer[h - i + 2:end] .= @view xs[1:h + i]
                   acc = @next(rf, acc, buffer)
               end
               for i in h + 1:length(xs) - h
                   acc = @next(rf, acc, @inbounds @view xs[i - h:i + h])
               end
               @inbounds for i in 1:h
                   buffer[1:end - i] .= @view xs[end - 2 * h + i:end]
                   buffer[end - i + 1:end] .= @view xs[1:i]
                   acc = @next(rf, acc, buffer)
               end
               return complete(rf, acc)
           end
       end;
julia> collect(Map(collect), circularwindows(1:9, 2))
9-element Vector{Vector{Int64}}:
[8, 9, 1, 2, 3]
[9, 1, 2, 3, 4]
[1, 2, 3, 4, 5]
[2, 3, 4, 5, 6]
[3, 4, 5, 6, 7]
[4, 5, 6, 7, 8]
[5, 6, 7, 8, 9]
[6, 7, 8, 9, 1]
[7, 8, 9, 1, 2]
julia> expressions(str::AbstractString; kwargs...) =
           AdHocFoldable(str) do rf, val, str
               pos = 1
               while true
                   expr, pos = Meta.parse(str, pos;
                                          raise = false,
                                          depwarn = false,
                                          kwargs...)
                   expr === nothing && break
                   val = @next(rf, val, expr)
               end
               return complete(rf, val)
           end;
julia> collect(Map(identity), expressions("""
x = 1
y = 2
"""))
2-element Vector{Expr}:
:(x = 1)
:(y = 2)
julia> counting = AdHocFoldable() do rf, acc, _
           i = 0
           while true
               i += 1
               acc = @next(rf, acc, i)
           end
       end;
julia> foreach(counting) do i
           @show i;
           i == 3 && return reduced()
       end;
i = 1
i = 2
i = 3
Transducers.withprogress — Function
withprogress(foldable) -> foldable′
Wrap a foldable so that progress is shown in a logging-based progress meter (e.g., Juno) during foldl, foldxt, foldxd, etc.
For parallel reduction such as foldxt and foldxd, reasonably small basesize and threads_basesize (for foldxd) must be used to ensure that progress information is updated frequently. However, it may slow down the computation if basesize is too small.
Keyword Arguments
interval::Real: Minimum interval (in seconds) for how often progress is logged.
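The basesize trade-off mentioned above can be sketched as follows. The value `basesize=100` is an arbitrary choice for illustration, not a recommendation: it splits the 1,000-element input into chunks small enough for progress to be reported regularly without creating one task per element. Whether a meter is actually displayed depends on the active logger.

```julia
using Transducers

# Parallel sum with progress reporting; `basesize` controls the chunk size
# handled by each task and hence how often progress can be updated.
total = foldxt(+, Map(identity), withprogress(1:1_000; interval=1e-3); basesize=100)
```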
Examples
julia> using Transducers
julia> xf = Map() do x
           sleep(0.01)
           x
       end;
julia> foldl(+, xf, withprogress(1:100; interval=1e-3)) # see progress meter
5050
In foldl and foldxt, withprogress can be nested. This is not supported in foldxd.
julia> xf = opcompose(
           MapCat() do x
               withprogress(1:x; interval=1e-3)  # nested progress
           end,
           Map() do x
               sleep(0.5)
               x
           end,
       );
julia> foldxt(+, xf, withprogress(1:10; interval=1e-3); basesize=1)
220
Base.mapfoldl — Function
mapfoldl(xf::Transducer, step, reducible; init, simd)
mapfoldl(::Transducer, rf, itr) is deprecated. Use foldl(rf, ::Transducer, itr) if you do not need to call single-argument rf on complete. Use foldl(whencomplete(rf, rf), ::Transducer, itr) to call the single-argument method of rf on complete.
Like foldl but step is not automatically wrapped by Completing.
Examples
julia> using Transducers
julia> function step_demo(state, input)
           @show state, input
           state + input
       end;
julia> function step_demo(state)
           println("Finishing with state = ", state)
           state
       end;
julia> mapfoldl(Filter(isodd), step_demo, 1:4, init=0.0)
(state, input) = (0.0, 1)
(state, input) = (1.0, 3)
Finishing with state = 4.0
4.0
Base.mapreduce — Function
mapreduce(xf, step, reducible; init, simd)
mapreduce(::Transducer, rf, itr) is deprecated. Use foldxt(rf, ::Transducer, itr) if you do not need to call single-argument rf on complete. Use foldxt(whencomplete(rf, rf), ::Transducer, itr) to call the single-argument method of rf on complete.
Like foldxt but step is not automatically wrapped by Completing.
- julia_issue_22891: See also: break with value + loop else clauses (JuliaLang/julia#22891)
- 1: More specifically, Folds.jl and FLoops.jl use PreferParallel, which in turn defaults to ThreadedEx.