Splitting a string into words and counting them in parallel

We start from the parallel algorithm presented in Guy Steele's 2009 ICFP talk (video). It splits a space-separated string into a list of strings (words). The recurring theme of the talk is to build "singleton solutions" and then merge them using an associative function. We follow this guideline and slightly extend the algorithm.

Watching the talk is the best way to understand the algorithm, but we briefly describe how it works here.

When a contiguous region of the string is processed, we have either already seen at least one space or not. These two states are tracked using the following two types. If no space has been seen so far, we only have a chunk of a possibly larger word (see the example below):

struct Chunk
    s::String
end

If one or more spaces have been seen, the left and right "chunks" (possibly partial words at the region boundaries) have to be tracked separately from the (possibly zero) complete words in between:

struct Segment
    l::String
    A::Vector{String}
    r::String
end

Here is an example taken from the talk:

Segment("Here", ["is", "a"], "")
   |
   |       Segment("lian", [], "strin")
 __|_____            _|______
|        |          |        |
Here is a sesquipedalian string of words
          |________|          |________|
   Chunk("sesquipeda")        Segment("g", ["of"], "words")

We then need a way to merge two results, each of which can independently be in either of the two states above:

⊕(x::Chunk, y::Chunk) = Chunk(x.s * y.s)
⊕(x::Chunk, y::Segment) = Segment(x.s * y.l, y.A, y.r)
⊕(x::Segment, y::Chunk) = Segment(x.l, x.A, x.r * y.s)
⊕(x::Segment, y::Segment) =
    Segment(x.l,
            append!(append!(x.A, maybewordv(x.r * y.l)), y.A),
            y.r)

maybewordv(s::String) = isempty(s) ? String[] : [s]
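As a quick sanity check that ⊕ is associative, the sketch below (plain Julia, no packages) merges the four ten-character pieces of the talk's example sentence in two different groupings. The helper pieces() is hypothetical and exists only for this illustration. The third piece covers "lian strin", so its right chunk is "strin".

```julia
# Self-contained copies of the definitions above.
struct Chunk
    s::String
end

struct Segment
    l::String
    A::Vector{String}
    r::String
end

⊕(x::Chunk, y::Chunk) = Chunk(x.s * y.s)
⊕(x::Chunk, y::Segment) = Segment(x.s * y.l, y.A, y.r)
⊕(x::Segment, y::Chunk) = Segment(x.l, x.A, x.r * y.s)
⊕(x::Segment, y::Segment) =
    Segment(x.l,
            append!(append!(x.A, maybewordv(x.r * y.l)), y.A),
            y.r)

maybewordv(s::String) = isempty(s) ? String[] : [s]

# Hypothetical helper: the four pieces of "Here is a sesquipedalian string of words".
# Built fresh on every call because ⊕ appends to the word vector of its left argument.
pieces() = (Segment("Here", ["is", "a"], ""),
            Chunk("sesquipeda"),
            Segment("lian", String[], "strin"),
            Segment("g", ["of"], "words"))

a, b, c, d = pieces()
linear = ((a ⊕ b) ⊕ c) ⊕ d    # left to right, as a sequential fold would merge them

a, b, c, d = pieces()
tree = (a ⊕ b) ⊕ (c ⊕ d)      # balanced, as a parallel reduction might merge them

# Segment holds a Vector, so compare field by field (== on the struct compares identity).
println((linear.l, linear.A, linear.r))
println((tree.l, tree.A, tree.r))
```

Both groupings yield l = "Here", A = ["is", "a", "sesquipedalian", "string", "of"] and r = "words"; the word "string" only appears once the third and fourth pieces meet.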

The input is a sequence of Chars. Each character has to be converted into a "singleton solution" that ⊕ can merge with an already aggregated solution (or with another singleton):

segment_or_chunk(c::Char) = c == ' ' ? Segment("", [], "") : Chunk(string(c))

Putting them together, we get:

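function collectwords(s::String)
    # An empty Segment as `init` behaves like a space before the first character.
    g = mapfoldl(segment_or_chunk, ⊕, s; init=Segment("", [], ""))
    if g isa Chunk  # unreachable with this `init`; kept so other folds are handled too
        return maybewordv(g.s)
    else
        # Leading chunk, completed words, trailing chunk; empty chunks are dropped.
        return append!(append!(maybewordv(g.l), g.A), maybewordv(g.r))
    end
end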

Let's run a few tests covering some edge cases:

using Test
@testset begin
    @test collectwords("This is a sample") == ["This", "is", "a", "sample"]
    @test collectwords(" Here is another sample ") == ["Here", "is", "another", "sample"]
    @test collectwords("JustOneWord") == ["JustOneWord"]
    @test collectwords(" ") == []
    @test collectwords("") == []
end
Test Summary: | Pass  Total  Time
test set      |    5      5  0.2s
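collectwords uses the sequential mapfoldl, but because ⊕ is associative the same singletons can also be reduced as a tree. The following is a minimal hand-written divide-and-conquer sketch using Threads.@spawn; reduce_tree and collectwords_parallel are hypothetical names, and the sketch only illustrates the idea rather than reproducing how Transducers.jl schedules its parallel reductions.

```julia
# Self-contained copies of the definitions above (word vectors typed as String[]).
struct Chunk
    s::String
end

struct Segment
    l::String
    A::Vector{String}
    r::String
end

⊕(x::Chunk, y::Chunk) = Chunk(x.s * y.s)
⊕(x::Chunk, y::Segment) = Segment(x.s * y.l, y.A, y.r)
⊕(x::Segment, y::Chunk) = Segment(x.l, x.A, x.r * y.s)
⊕(x::Segment, y::Segment) =
    Segment(x.l, append!(append!(x.A, maybewordv(x.r * y.l)), y.A), y.r)

maybewordv(s::String) = isempty(s) ? String[] : [s]

segment_or_chunk(c::Char) = c == ' ' ? Segment("", String[], "") : Chunk(string(c))

# Hypothetical helper: reduce a non-empty run of characters by splitting it in
# half, reducing the left half on a spawned task, and merging the halves with ⊕.
function reduce_tree(cs::AbstractVector{Char}, basesize::Int)
    if length(cs) <= basesize
        acc = segment_or_chunk(cs[1])
        for c in @view cs[2:end]
            acc = acc ⊕ segment_or_chunk(c)
        end
        return acc
    end
    mid = length(cs) ÷ 2
    left = Threads.@spawn reduce_tree(@view(cs[1:mid]), basesize)
    right = reduce_tree(@view(cs[mid+1:end]), basesize)
    return fetch(left) ⊕ right
end

# Hypothetical parallel counterpart of `collectwords`.
function collectwords_parallel(s::String; basesize::Int = 4)
    basesize >= 1 || throw(ArgumentError("basesize must be positive"))
    cs = collect(s)
    isempty(cs) && return String[]
    g = reduce_tree(cs, basesize)
    if g isa Chunk
        return maybewordv(g.s)   # no space anywhere: at most one word
    else
        return [maybewordv(g.l); g.A; maybewordv(g.r)]
    end
end
```

There is no Segment init here, so a space-free input reduces to a Chunk, which is why the Chunk branch is actually needed in this version.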

String-splitting transducer

Let's make it reusable by packaging it into a transducer.

Rather than accumulating words into a vector, we are going to write a transducer that "emits" each word as soon as it is complete. Downstream transducers can then either record every word or only compute aggregate statistics over them. To this end, we replace Segment in the original algorithm with

struct Vacant
    l::String
    r::String
end

and emit the words in the "middle" instead of accumulating them. So, instead of segment_or_chunk, we now have:

vacant_or_chunk(c::Char) = c == ' ' ? Vacant("", "") : Chunk(string(c))

The idea is to create a custom transducer WordsXF that is used as in

... |> Map(vacant_or_chunk) |> WordsXF() |> Filter(!isnothing) |> ...

so that the whole pipeline streams non-empty words downstream. That is, each input character is first processed by vacant_or_chunk, which returns either a Vacant or a Chunk. This is then processed by WordsXF(), which outputs either a word (a String) or nothing. Filtering out the nothings downstream with Filter(!isnothing) keeps the definition of WordsXF simple.

We define a function extract(x::Union{Chunk,Vacant}, y::Union{Chunk,Vacant}) -> (output, state). It plays the same role as ⊕ above, but works with Chunk and Vacant and returns a completed word (or nothing) as output alongside the new state:

extract(x::Chunk, y::Chunk) = nothing, Chunk(x.s * y.s)
extract(x::Chunk, y::Vacant) = nothing, Vacant(x.s * y.l, y.r)
extract(x::Vacant, y::Chunk) = nothing, Vacant(x.l, x.r * y.s)
extract(x::Vacant, y::Vacant) = maybeword(x.r * y.l), Vacant(x.l, y.r)

maybeword(s) = isempty(s) ? nothing : s
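To see what extract does in a sequential fold before involving Transducers.jl, here is a plain loop that mirrors the start, next and complete steps defined below. streamwords is a hypothetical helper: it starts from Chunk(""), records every word that extract emits, and flushes the final state at the end.

```julia
struct Chunk
    s::String
end

struct Vacant
    l::String
    r::String
end

vacant_or_chunk(c::Char) = c == ' ' ? Vacant("", "") : Chunk(string(c))

extract(x::Chunk, y::Chunk) = nothing, Chunk(x.s * y.s)
extract(x::Chunk, y::Vacant) = nothing, Vacant(x.s * y.l, y.r)
extract(x::Vacant, y::Chunk) = nothing, Vacant(x.l, x.r * y.s)
extract(x::Vacant, y::Vacant) = maybeword(x.r * y.l), Vacant(x.l, y.r)

maybeword(s) = isempty(s) ? nothing : s

# Hypothetical helper: returns the words emitted inside the loop and the final word list.
function streamwords(s::String)
    during = String[]            # words emitted while scanning (the "next" step)
    state = Chunk("")            # initial private state (the "start" step)
    for c in s
        word, state = extract(state, vacant_or_chunk(c))
        word === nothing || push!(during, word)
    end
    # Flush the private state (the "complete" step).
    if state isa Vacant
        head = maybeword(state.l)    # text before the first space: goes first
        tail = maybeword(state.r)    # text after the last space: goes last
    else
        head = nothing
        tail = maybeword(state.s)    # no space at all: the whole input is one chunk
    end
    words = String[]
    head === nothing || push!(words, head)
    append!(words, during)
    tail === nothing || push!(words, tail)
    return (during = during, words = words)
end
```

For "ab cd ef", only "cd" is emitted inside the loop: "ab" waits in state.l and "ef" in state.r until the flush, which has to put "ab" in front of the words already emitted.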

Let's wrap this in a Transducer.

using Transducers
using Transducers:
    @next, R_, Transducer, combine, complete, inner, next, start, unwrap, wrap, wrapping

First, we declare a transducer type:

struct WordsXF <: Transducer end

Since this transducer has to keep "unfinished" words as its own private state, we use wrap inside start to prepare the state for it:

Transducers.start(rf::R_{WordsXF}, init) = wrap(rf, Chunk(""), start(inner(rf), init))

Inside next (i.e., the "loop body"), we call extract defined above to combine the input x::Union{Chunk,Vacant} with state::Union{Chunk,Vacant}. The output of extract, either a completed word or nothing, is passed to the inner reducing function:

function Transducers.next(rf::R_{WordsXF}, acc, x)
    wrapping(rf, acc) do state, iacc
        word, state = extract(state, x)
        iacc = next(inner(rf), iacc, word)
        return state, iacc
    end
end

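At the end of a fold, complete is called, and we can flush the unfinished words at this stage. The text before the first space (state.l) has to come before every word already emitted, so we "prepend" it: we feed it into a fresh accumulator of the inner reducing function and then merge that in front of the existing accumulator using combine of the inner reducing function (which assumes it is associative).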

function Transducers.complete(rf::R_{WordsXF}, acc)
    state, iacc = unwrap(rf, acc)
    if state isa Vacant
        pre = @next(inner(rf), start(inner(rf), Init), maybeword(state.l))
        iacc = combine(inner(rf), pre, iacc)  # prepending `state.l`
        iacc = @next(inner(rf), iacc, maybeword(state.r))  # appending `state.r`
    else
        @assert state isa Chunk
        iacc = @next(inner(rf), iacc, maybeword(state.s))
    end
    return complete(inner(rf), iacc)
end

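That's all we need to use this transducer with sequential folds. Parallel reductions also need combine, which is nearly identical to next: it merges the two private states with extract, feeds the resulting output to the left inner accumulator, and then combines the two inner accumulators: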

function Transducers.combine(rf::R_{WordsXF}, a, b)
    ua, ira = unwrap(rf, a)
    ub, irb = unwrap(rf, b)
    word, uc = extract(ua, ub)
    ira = @next(inner(rf), ira, word)
    irc = combine(inner(rf), ira, irb)
    return wrap(rf, uc, irc)
end

wordsxf = opcompose(Map(vacant_or_chunk), WordsXF(), Filter(!isnothing))

Test:

@testset begin
    @test collect(wordsxf, "This is a sample") == ["This", "is", "a", "sample"]
    @test collect(wordsxf, " Here is another sample ") == ["Here", "is", "another", "sample"]
    @test collect(wordsxf, "JustOneWord") == ["JustOneWord"]
    @test collect(wordsxf, " ") == []
    @test collect(wordsxf, "") == []
end
Test Summary: | Pass  Total  Time
test set      |    5      5  0.3s
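The parallel path can also be traced by hand. The sketch below scans two halves of a string independently, merges their private states with extract as combine does, and flushes the result as complete does. scan, combine_halves, finish and splitwords_two are hypothetical helpers written for this illustration, not part of Transducers.jl.

```julia
struct Chunk
    s::String
end

struct Vacant
    l::String
    r::String
end

vacant_or_chunk(c::Char) = c == ' ' ? Vacant("", "") : Chunk(string(c))

extract(x::Chunk, y::Chunk) = nothing, Chunk(x.s * y.s)
extract(x::Chunk, y::Vacant) = nothing, Vacant(x.s * y.l, y.r)
extract(x::Vacant, y::Chunk) = nothing, Vacant(x.l, x.r * y.s)
extract(x::Vacant, y::Vacant) = maybeword(x.r * y.l), Vacant(x.l, y.r)

maybeword(s) = isempty(s) ? nothing : s

# Like `next` over one piece: words emitted so far plus the final private state.
function scan(chars)
    emitted = String[]
    state = Chunk("")
    for c in chars
        word, state = extract(state, vacant_or_chunk(c))
        word === nothing || push!(emitted, word)
    end
    return emitted, state
end

# Like `combine`: the word straddling the boundary (if any) follows the left words.
function combine_halves(wa, sa, wb, sb)
    word, sc = extract(sa, sb)
    words = copy(wa)
    word === nothing || push!(words, word)
    append!(words, wb)
    return words, sc
end

# Like `complete`: prepend the leading chunk and append the trailing one.
function finish(words, state)
    out = String[]
    if state isa Vacant
        head = maybeword(state.l)
        head === nothing || push!(out, head)
        append!(out, words)
        tail = maybeword(state.r)
        tail === nothing || push!(out, tail)
    else
        # No space anywhere, so `words` is empty and the input is a single chunk.
        append!(out, words)
        w = maybeword(state.s)
        w === nothing || push!(out, w)
    end
    return out
end

# Hypothetical driver: split at character k; the left half runs on a spawned task.
function splitwords_two(s::String, k::Int)
    cs = collect(s)
    task = Threads.@spawn scan(cs[1:k])
    wb, sb = scan(cs[k+1:end])
    wa, sa = fetch(task)
    return finish(combine_halves(wa, sa, wb, sb)...)
end
```

The result does not depend on the split position k: a word cut by the split ends up in the left half's trailing chunk and the right half's leading chunk, and extract glues the two back together.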

Word-counting transducer

We can pipe the resulting words into various transducers.

using MicroCollections: SingletonDict

processcount(word) = SingletonDict(word => 1)
countxf = opcompose(wordsxf, Map(processcount))
Map(vacant_or_chunk) 
    WordsXF() 
    Filter(ComposedFunction) 
    Map(processcount)

The transducer countxf constructs a "singleton solution" as a dictionary, which is then accumulated with the associative reducing step function mergewith!!(+) from BangBang.jl:

using BangBang: mergewith!!
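mergewith!! follows BangBang.jl's convention of mutating its first argument when possible. To illustrate only the merging logic, the following sketch uses Base's non-mutating mergewith (Julia 1.5 or later) as a stand-in on a hand-written word list; merge_counts is a hypothetical name.

```julia
words = ["aaa", "bb", "aaa", "aaa", "bb", "bb", "aaa"]

# One singleton dictionary per word, in the spirit of `processcount`.
singletons = [Dict(w => 1) for w in words]

# Stand-in for mergewith!!(+): Base's mergewith returns a new Dict each time.
merge_counts(a, b) = mergewith(+, a, b)

# A left-to-right fold and a two-way split give the same counts because
# merging count dictionaries with + is associative.
sequential = foldl(merge_counts, singletons)
split_in_two = merge_counts(foldl(merge_counts, singletons[1:3]),
                            foldl(merge_counts, singletons[4:end]))
```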

Putting the transducer and reducing function together, we get:

countwords(s; kwargs...) =
    foldxt(mergewith!!(+), countxf, s; init = CopyInit(Dict{String,Int}()), kwargs...)

Side note: We use CopyInit to create a fresh copy of the initial Dict for each sub-reduce, so that tasks running on different threads do not mutate a shared dictionary.
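The following single-threaded sketch shows what can go wrong when two sub-reductions share one mutable initial value and accumulate into it with a mutating merge. Base's mergewith! (Julia 1.5 or later) stands in for mergewith!!, and copy(init) only approximates what CopyInit arranges; running the two "tasks" concurrently would additionally be a data race.

```julia
# Two "tasks" that both start from the same mutable Dict.
shared = Dict{String,Int}()
left  = mergewith!(+, shared, Dict("aaa" => 1))   # task 1
right = mergewith!(+, shared, Dict("bb" => 1))    # task 2
# Both partial results are the same object, so task 1's result
# now also contains task 2's word.

# Giving each task its own copy of the initial value keeps them independent.
init = Dict{String,Int}()
left2  = mergewith!(+, copy(init), Dict("aaa" => 1))
right2 = mergewith!(+, copy(init), Dict("bb" => 1))
total  = mergewith!(+, left2, right2)   # combine step (mutates left2)
```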

Let's run some tests with different values of basesize (length(s) / basesize roughly corresponds to the number of tasks used):

@testset for basesize in [1, 2, 4]
    @test countwords("This is a sample", basesize=basesize) ==
        Dict("This" => 1, "is" => 1, "a" => 1, "sample" => 1)
    @test countwords(" Here is another sample ", basesize=basesize) ==
        Dict("Here" => 1, "is" => 1, "another" => 1, "sample" => 1)
    @test countwords("JustOneWord", basesize=basesize) ==
        Dict("JustOneWord" => 1)
    @test countwords(" ", basesize=basesize) == Dict()
    @test countwords("", basesize=basesize) == Dict()
    @test countwords("aaa bb aaa aaa bb bb aaa", basesize=basesize) ==
        Dict("aaa" => 4, "bb" => 3)
    @test countwords("あああ いい あああ あああ いい いい あああ", basesize=basesize) ==
        Dict("あああ" => 4, "いい" => 3)
end
Test Summary: | Pass  Total  Time
basesize = 1  |    7      7  0.5s
Test Summary: | Pass  Total  Time
basesize = 2  |    7      7  0.0s
Test Summary: | Pass  Total  Time
basesize = 4  |    7      7  0.0s

This page was generated using Literate.jl.