diff --git a/source/concurrency/sequence.d b/source/concurrency/sequence.d index 1ed5497..8fb9ed7 100644 --- a/source/concurrency/sequence.d +++ b/source/concurrency/sequence.d @@ -941,6 +941,116 @@ auto iotaSequence(T)(T start, T end) { return iota(start, end).sequence(); } +auto filterMap(Sequence, Fun)(Sequence s, Fun f) { + return FilterMapSequence!(Sequence, Fun)(s, f); +} + +struct FilterMapSequence(Sequence, Fun) { + import std.traits : ReturnType; + alias Value = void; + alias Element = Sequence.Element; + Sequence s; + Fun f; + auto connect(Receiver)(return Receiver receiver) @safe return scope { + auto op = s.connect(FilterMapSequenceReceiver!(Fun, Receiver)(f, receiver)); + return op; + } +} + +struct FilterMapSequenceReceiver(Fun, Receiver) { + Fun fun; + Receiver receiver; + auto setNext(Sender)(Sender sender) { + return FilterMapSequenceNextSender!(Sender, Fun, Receiver)(sender, fun, receiver); + } + auto setValue() { + receiver.setValue(); + } + auto setDone() nothrow @safe { + receiver.setDone(); + } + auto setError(Throwable t) nothrow @safe { + receiver.setError(t); + } + import concurrency.receiver : ForwardExtensionPoints; + mixin ForwardExtensionPoints!receiver; +} + +struct FilterMapSequenceNextSender(Sender, Fun, NextReceiver) { + alias Value = Sender.Value; + Sender sender; + Fun fun; + NextReceiver nextReceiver; + auto connect(Receiver)(return Receiver receiver) @safe return scope { + auto op = FilterMapSequenceNextOp!(Sender, Fun, NextReceiver, Receiver)(sender, fun, nextReceiver, receiver); + return op; + } +} + +struct FilterMapSequenceNextOp(Sender, Fun, NextReceiver, Receiver) { + import concurrency.sender : OpType; + + alias Op = OpType!(Sender, FilterMapSequenceNextReceiver!(Sender.Value, Fun, NextReceiver, Receiver)); + Op op; + FilterMapSequenceNextState!(Fun, NextReceiver, Receiver) state; + this(Sender sender, Fun fun, NextReceiver nextReceiver, Receiver receiver) @trusted { + state = FilterMapSequenceNextState!(Fun, NextReceiver, Receiver)(fun, nextReceiver, receiver); + op = sender.connect(FilterMapSequenceNextReceiver!(Sender.Value, Fun, NextReceiver, Receiver)(&state)); + } + @disable this(ref return scope typeof(this) rhs); + @disable this(this); + + @disable void opAssign(typeof(this) rhs) nothrow @safe @nogc; + @disable void opAssign(ref typeof(this) rhs) nothrow @safe @nogc; + + void start() @trusted scope nothrow { + op.start(); + } +} + +struct FilterMapSequenceNextState(Fun, NextReceiver, Receiver) { + Fun fun; + NextReceiver nextReceiver; + Receiver receiver; +} + +struct FilterMapSequenceNextReceiver(Value, Fun, NextReceiver, Receiver) { + FilterMapSequenceNextState!(Fun, NextReceiver, Receiver)* state; + + auto setValue(Value value) { + import concurrency : just; + import concurrency : connectHeap; + auto result = state.fun(value); + if (result.isNone) { + state.receiver.setValue(); + } else { + auto sender = state.nextReceiver.setNext(just(result.getSome)); + // TODO: put state in FilterMapSequenceNextOp + sender.connectHeap(state.receiver).start(); + } + } + auto setError(Throwable t) nothrow @safe { + state.nextReceiver.setError(t); + } + auto setDone() nothrow @safe { + state.nextReceiver.setDone(); + } + auto receiver() nothrow @safe { + return &state.nextReceiver; + } + import concurrency.receiver : ForwardExtensionPoints; + mixin ForwardExtensionPoints!receiver; +} + +private bool isNone(T)(ref const T t) { + return t.isNull(); +} + +private auto getSome(T)(ref T t) { + return t.get(); +} + + // cron - create a sequence like interval but using cron spec // flatmap{latest,concat} - create a sequence that flattens diff --git a/tests/ut/concurrency/sequence.d b/tests/ut/concurrency/sequence.d index 0a520af..0c25aee 100644 --- a/tests/ut/concurrency/sequence.d +++ b/tests/ut/concurrency/sequence.d @@ -144,3 +144,14 @@ import unit_threaded; @safe unittest { iotaSequence(5, 10).toList().syncWait.value.should == [5,6,7,8,9]; } + +@("filterMap") +@safe unittest { + import std.typecons : Nullable; + [1,2,3,4].sequence.filterMap((int i) { + if (i > 2) + return Nullable!(int)(i*3); + else + return Nullable!(int).init; + }).toList().syncWait.value.should == [9,12]; +}