From 9b3c09b2f2ee2f449b67931359d8b7e9ac4b338f Mon Sep 17 00:00:00 2001 From: Sebastiaan Koppe Date: Sun, 1 Sep 2024 20:52:32 +0200 Subject: [PATCH] Add slide sequence operator --- source/concurrency/sequence.d | 83 +++++++++++++++++++++++++++++++-- tests/ut/concurrency/sequence.d | 34 ++++++++++++++ 2 files changed, 112 insertions(+), 5 deletions(-) diff --git a/source/concurrency/sequence.d b/source/concurrency/sequence.d index 3ab9ba6..d0c8bcd 100644 --- a/source/concurrency/sequence.d +++ b/source/concurrency/sequence.d @@ -1197,16 +1197,89 @@ struct SampleSequenceOp(BaseSequence, TriggerSequence, Receiver) { } } +auto slide(Sequence)(Sequence sequence, long window, long step = 1) { + static assert(!is(Sequence.Element == void), "Sequence passed to slide must produce elements."); + + import std.exception : enforce; + enforce(window > 0, "window must be greater than 0."); + enforce(step > 0, "step must be greated than 0."); -// cron - create a sequence like interval but using cron spec + return SlideSequence!(Sequence)(sequence, window, step); +} -// flatmap{latest,concat} - create a sequence that flattens +struct SlideSequence(Sequence) { + alias Value = void; + alias Element = Sequence.Element[]; + + Sequence sequence; + long window, step; + + auto connect(Receiver)(return Receiver receiver) @safe return scope { + auto op = SlideSequenceOp!(Sequence, Receiver)(sequence, window, step, receiver); + return op; + } +} + +struct SlideSequenceOp(Sequence, Receiver) { + import concurrency.sender : OpType; + alias Element = Sequence.Element; + + alias Op = OpType!(FilterMapSequence!(Sequence, Nullable!(Element[]) delegate(Sequence.Element) @safe pure nothrow), Receiver); + + long step, pos; + Element[] arr; + Op op; + + @disable this(ref return scope typeof(this) rhs); + @disable this(this); -// sample - forward latest from sequence a when sequence b emits + @disable void opAssign(typeof(this) rhs) nothrow @safe @nogc; + @disable void opAssign(ref typeof(this) rhs) nothrow @safe @nogc; + + this(Sequence sequence, long window, long step, return Receiver receiver) @trusted return scope { + arr.length = window; + this.step = step; + op = sequence.filterMap(&this.onNext).connect(receiver); + } -// scan - applies accumulator to each value + void start() scope { + op.start(); + } -// slide - create sliding window over sequence + Nullable!(Element[]) onNext(Sequence.Element element) @safe { + import std.algorithm : moveAll; + if (pos < 0) { + pos++; + return Nullable!(Element[]).init; + } + + if (pos+1 > arr.length) { + if (step < arr.length) { + moveAll(arr[step .. $], arr[0 .. $ - step]); + pos -= step; + } else { + pos = (cast(long)arr.length) - step; + if (pos < 0) { + pos++; + return Nullable!(Element[]).init; + } + } + } + + arr[pos] = element; + pos++; + + if (pos == arr.length) { + return Nullable!(Element[])(arr.dup); + } else { + return Nullable!(Element[]).init; + } + } +} + +// cron - create a sequence like interval but using cron spec + +// flatmap{latest,concat} - create a sequence that flattens // throttling ? diff --git a/tests/ut/concurrency/sequence.d b/tests/ut/concurrency/sequence.d index 3c7312f..e3a1480 100644 --- a/tests/ut/concurrency/sequence.d +++ b/tests/ut/concurrency/sequence.d @@ -197,3 +197,37 @@ import unit_threaded; driver, ).syncWait.value.should == [1,3,5,7]; } + +@("slide.basic") +@safe unittest { + [1,2,3,4,5,6].sequence().slide(3,1).toList().syncWait.value.should == [ + [1,2,3], + [2,3,4], + [3,4,5], + [4,5,6] + ]; +} + +@("slide.step.skip") +@safe unittest { + [1,2,3,4,5,6].sequence().slide(3,2).toList().syncWait.value.should == [ + [1,2,3], + [3,4,5] + ]; +} + +@("slide.step.full") +@safe unittest { + [1,2,3,4,5,6].sequence().slide(3,3).toList().syncWait.value.should == [ + [1,2,3], + [4,5,6] + ]; +} + +@("slide.step.negative") +@safe unittest { + [1,2,3,4,5,6].sequence().slide(2,3).toList().syncWait.value.should == [ + [1,2], + [4,5] + ]; +} \ No newline at end of file