Skip to content

Commit

Permalink
Add filterMap Sequence operator
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Sep 14, 2024
1 parent b3a2ca3 commit b6ef86e
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 0 deletions.
110 changes: 110 additions & 0 deletions source/concurrency/sequence.d
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,116 @@ auto iotaSequence(T)(T start, T end) {
return iota(start, end).sequence();
}

auto filterMap(Sequence, Fun)(Sequence s, Fun f) {
return FilterMapSequence!(Sequence, Fun)(s, f);
}

struct FilterMapSequence(Sequence, Fun) {
import std.traits : ReturnType;
alias Value = void;
alias Element = Sequence.Element;
Sequence s;
Fun f;
auto connect(Receiver)(return Receiver receiver) @safe return scope {
auto op = s.connect(FilterMapSequenceReceiver!(Fun, Receiver)(f, receiver));
return op;
}
}

struct FilterMapSequenceReceiver(Fun, Receiver) {
Fun fun;
Receiver receiver;
auto setNext(Sender)(Sender sender) {
return FilterMapSequenceNextSender!(Sender, Fun, Receiver)(sender, fun, receiver);
}
auto setValue() {
receiver.setValue();
}
auto setDone() nothrow @safe {
receiver.setDone();
}
auto setError(Throwable t) nothrow @safe {
receiver.setError(t);
}
import concurrency.receiver : ForwardExtensionPoints;
mixin ForwardExtensionPoints!receiver;
}

struct FilterMapSequenceNextSender(Sender, Fun, NextReceiver) {
alias Value = Sender.Value;
Sender sender;
Fun fun;
NextReceiver nextReceiver;
auto connect(Receiver)(return Receiver receiver) @safe return scope {
auto op = FilterMapSequenceNextOp!(Sender, Fun, NextReceiver, Receiver)(sender, fun, nextReceiver, receiver);
return op;
}
}

struct FilterMapSequenceNextOp(Sender, Fun, NextReceiver, Receiver) {
import concurrency.sender : OpType;

alias Op = OpType!(Sender, FilterMapSequenceNextReceiver!(Sender.Value, Fun, NextReceiver, Receiver));
Op op;
FilterMapSequenceNextState!(Fun, NextReceiver, Receiver) state;
this(Sender sender, Fun fun, NextReceiver nextReceiver, Receiver receiver) @trusted {
state = FilterMapSequenceNextState!(Fun, NextReceiver, Receiver)(fun, nextReceiver, receiver);
op = sender.connect(FilterMapSequenceNextReceiver!(Sender.Value, Fun, NextReceiver, Receiver)(&state));
}
@disable this(ref return scope typeof(this) rhs);
@disable this(this);

@disable void opAssign(typeof(this) rhs) nothrow @safe @nogc;
@disable void opAssign(ref typeof(this) rhs) nothrow @safe @nogc;

void start() @trusted scope nothrow {
op.start();
}
}

struct FilterMapSequenceNextState(Fun, NextReceiver, Receiver) {
Fun fun;
NextReceiver nextReceiver;
Receiver receiver;
}

struct FilterMapSequenceNextReceiver(Value, Fun, NextReceiver, Receiver) {
FilterMapSequenceNextState!(Fun, NextReceiver, Receiver)* state;

auto setValue(Value value) {
import concurrency : just;
import concurrency : connectHeap;
auto result = state.fun(value);
if (result.isNone) {
state.receiver.setValue();
} else {
auto sender = state.nextReceiver.setNext(just(result.getSome));
// TODO: put state in FilterMapSequenceNextOp
sender.connectHeap(state.receiver).start();
}
}
auto setError(Throwable t) nothrow @safe {
state.nextReceiver.setError(t);
}
auto setDone() nothrow @safe {
state.nextReceiver.setDone();
}
auto receiver() nothrow @safe {
return &state.nextReceiver;
}
import concurrency.receiver : ForwardExtensionPoints;
mixin ForwardExtensionPoints!receiver;
}

private bool isNone(T)(ref const T t) {
return t.isNull();
}

private auto getSome(T)(ref T t) {
return t.get();
}


// cron - create a sequence like interval but using cron spec

// flatmap{latest,concat} - create a sequence that flattens
Expand Down
11 changes: 11 additions & 0 deletions tests/ut/concurrency/sequence.d
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,14 @@ import unit_threaded;
@safe unittest {
iotaSequence(5, 10).toList().syncWait.value.should == [5,6,7,8,9];
}

@("filterMap")
@safe unittest {
import std.typecons : Nullable;
[1,2,3,4].sequence.filterMap((int i) {
if (i > 2)
return Nullable!(int)(i*3);
else
return Nullable!(int).init;
}).toList().syncWait.value.should == [9,12];
}

0 comments on commit b6ef86e

Please sign in to comment.