Skip to content

Commit

Permalink
Add slide sequence operator
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Sep 18, 2024
1 parent 705a319 commit 9b3c09b
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 5 deletions.
83 changes: 78 additions & 5 deletions source/concurrency/sequence.d
Original file line number Diff line number Diff line change
Expand Up @@ -1197,16 +1197,89 @@ struct SampleSequenceOp(BaseSequence, TriggerSequence, Receiver) {
}
}

auto slide(Sequence)(Sequence sequence, long window, long step = 1) {
static assert(!is(Sequence.Element == void), "Sequence passed to slide must produce elements.");

import std.exception : enforce;
enforce(window > 0, "window must be greater than 0.");
enforce(step > 0, "step must be greated than 0.");

// cron - create a sequence like interval but using cron spec
return SlideSequence!(Sequence)(sequence, window, step);
}

// flatmap{latest,concat} - create a sequence that flattens
struct SlideSequence(Sequence) {
alias Value = void;
alias Element = Sequence.Element[];

Sequence sequence;
long window, step;

auto connect(Receiver)(return Receiver receiver) @safe return scope {
auto op = SlideSequenceOp!(Sequence, Receiver)(sequence, window, step, receiver);
return op;
}
}

struct SlideSequenceOp(Sequence, Receiver) {
import concurrency.sender : OpType;
alias Element = Sequence.Element;

alias Op = OpType!(FilterMapSequence!(Sequence, Nullable!(Element[]) delegate(Sequence.Element) @safe pure nothrow), Receiver);

long step, pos;
Element[] arr;
Op op;

@disable this(ref return scope typeof(this) rhs);
@disable this(this);

// sample - forward latest from sequence a when sequence b emits
@disable void opAssign(typeof(this) rhs) nothrow @safe @nogc;
@disable void opAssign(ref typeof(this) rhs) nothrow @safe @nogc;

this(Sequence sequence, long window, long step, return Receiver receiver) @trusted return scope {
arr.length = window;
this.step = step;
op = sequence.filterMap(&this.onNext).connect(receiver);
}

// scan - applies accumulator to each value
void start() scope {
op.start();
}

// slide - create sliding window over sequence
Nullable!(Element[]) onNext(Sequence.Element element) @safe {
import std.algorithm : moveAll;
if (pos < 0) {
pos++;
return Nullable!(Element[]).init;
}

if (pos+1 > arr.length) {
if (step < arr.length) {
moveAll(arr[step .. $], arr[0 .. $ - step]);
pos -= step;
} else {
pos = (cast(long)arr.length) - step;
if (pos < 0) {
pos++;
return Nullable!(Element[]).init;
}
}
}

arr[pos] = element;
pos++;

if (pos == arr.length) {
return Nullable!(Element[])(arr.dup);
} else {
return Nullable!(Element[]).init;
}
}
}

// cron - create a sequence like interval but using cron spec

// flatmap{latest,concat} - create a sequence that flattens

// throttling ?

Expand Down
34 changes: 34 additions & 0 deletions tests/ut/concurrency/sequence.d
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,37 @@ import unit_threaded;
driver,
).syncWait.value.should == [1,3,5,7];
}

@("slide.basic")
@safe unittest {
[1,2,3,4,5,6].sequence().slide(3,1).toList().syncWait.value.should == [
[1,2,3],
[2,3,4],
[3,4,5],
[4,5,6]
];
}

@("slide.step.skip")
@safe unittest {
[1,2,3,4,5,6].sequence().slide(3,2).toList().syncWait.value.should == [
[1,2,3],
[3,4,5]
];
}

@("slide.step.full")
@safe unittest {
[1,2,3,4,5,6].sequence().slide(3,3).toList().syncWait.value.should == [
[1,2,3],
[4,5,6]
];
}

@("slide.step.negative")
@safe unittest {
[1,2,3,4,5,6].sequence().slide(2,3).toList().syncWait.value.should == [
[1,2],
[4,5]
];
}

0 comments on commit 9b3c09b

Please sign in to comment.