From 52f1a89c169efb7c8e03fe633a19277d46ce3631 Mon Sep 17 00:00:00 2001 From: Sebastiaan Koppe Date: Tue, 5 Sep 2023 15:13:50 +0200 Subject: [PATCH] Move timingwheels to use Nullable instead --- source/concurrency/scheduler.d | 2 +- source/concurrency/timingwheels.d | 2 +- tests/ut/concurrency/operations.d | 24 +++++------ tests/ut/concurrency/scheduler.d | 15 +++---- tests/ut/concurrency/sender.d | 3 +- tests/ut/concurrency/stream.d | 69 ++++++++++++++++--------------- 6 files changed, 59 insertions(+), 56 deletions(-) diff --git a/source/concurrency/scheduler.d b/source/concurrency/scheduler.d index 7892566..4083dc0 100644 --- a/source/concurrency/scheduler.d +++ b/source/concurrency/scheduler.d @@ -3,7 +3,7 @@ module concurrency.scheduler; import concurrency.sender : SenderObjectBase, isSender; import core.time : Duration; import concepts; -import mir.algebraic : Nullable, nullable; +import std.typecons : Nullable, nullable; void checkScheduler(T)() { import concurrency.sender : checkSender; diff --git a/source/concurrency/timingwheels.d b/source/concurrency/timingwheels.d index b447cc8..8963bc7 100644 --- a/source/concurrency/timingwheels.d +++ b/source/concurrency/timingwheels.d @@ -26,7 +26,7 @@ import core.memory; import ikod.containers.hashmap; import automem; -import mir.algebraic : Nullable, nullable; +import std.typecons : Nullable, nullable; version(twtesting) { import unit_threaded; diff --git a/tests/ut/concurrency/operations.d b/tests/ut/concurrency/operations.d index 43d33cc..dc39fc5 100644 --- a/tests/ut/concurrency/operations.d +++ b/tests/ut/concurrency/operations.d @@ -331,13 +331,13 @@ unittest { }).retryWhen(Wait()).withScheduler(worker.getScheduler); auto driver = just(worker).then((shared ManualTimeWorker worker) { - worker.timeUntilNextEvent().should == 3.msecs; + worker.timeUntilNextEvent().should == 3.msecs.nullable; worker.advance(3.msecs); - worker.timeUntilNextEvent().should == 3.msecs; + worker.timeUntilNextEvent().should == 3.msecs.nullable; worker.advance(3.msecs); - worker.timeUntilNextEvent().should == 3.msecs; + worker.timeUntilNextEvent().should == 3.msecs.nullable; worker.advance(3.msecs); - worker.timeUntilNextEvent().should == null; + worker.timeUntilNextEvent().isNull.should == true; }); whenAll(sender, driver).syncWait.value.should == 3; @@ -412,11 +412,11 @@ unittest { auto worker = new shared ManualTimeWorker(); auto driver = just(worker).then((shared ManualTimeWorker worker) shared { - worker.timeUntilNextEvent().should == 10.msecs; + worker.timeUntilNextEvent().should == 10.msecs.nullable; worker.advance(5.msecs); - worker.timeUntilNextEvent().should == 5.msecs; + worker.timeUntilNextEvent().should == 5.msecs.nullable; worker.advance(5.msecs); - worker.timeUntilNextEvent().should == null; + worker.timeUntilNextEvent().isNull.should == true; }); auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler); @@ -430,9 +430,9 @@ unittest { auto worker = new shared ManualTimeWorker(); auto source = new StopSource(); auto driver = just(source).then((StopSource source) shared { - worker.timeUntilNextEvent().should == 10.msecs; + worker.timeUntilNextEvent().should == 10.msecs.nullable; source.stop(); - worker.timeUntilNextEvent().should == null; + worker.timeUntilNextEvent().isNull.should == true; }); auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler); @@ -795,11 +795,11 @@ unittest { .repeat(); auto driver = just(worker).then((shared ManualTimeWorker worker) { - worker.timeUntilNextEvent().should == 1.msecs; + worker.timeUntilNextEvent().should == 1.msecs.nullable; worker.advance(1.msecs); - worker.timeUntilNextEvent().should == 1.msecs; + worker.timeUntilNextEvent().should == 1.msecs.nullable; worker.advance(1.msecs); - worker.timeUntilNextEvent().should == 1.msecs; + worker.timeUntilNextEvent().should == 1.msecs.nullable; }); race(base, driver).withScheduler(worker.getScheduler).syncWait().assumeOk; diff --git a/tests/ut/concurrency/scheduler.d b/tests/ut/concurrency/scheduler.d index b4eb86e..b0559d0 100644 --- a/tests/ut/concurrency/scheduler.d +++ b/tests/ut/concurrency/scheduler.d @@ -7,6 +7,7 @@ import unit_threaded; import concurrency.stoptoken; import core.time : msecs; import concurrency.scheduler; +import std.typecons : nullable; @("scheduleAfter") @safe unittest { @@ -39,24 +40,24 @@ unittest { h.atomicOp!"+="(1); }, 5.msecs); - worker.timeUntilNextEvent().should == 5.msecs; + worker.timeUntilNextEvent().should == 5.msecs.nullable; g.should == 0; h.should == 0; worker.advance(4.msecs); - worker.timeUntilNextEvent().should == 1.msecs; + worker.timeUntilNextEvent().should == 1.msecs.nullable; h.should == 0; g.should == 0; worker.advance(1.msecs); - worker.timeUntilNextEvent().should == 5.msecs; + worker.timeUntilNextEvent().should == 5.msecs.nullable; h.should == 1; g.should == 0; worker.advance(5.msecs); h.should == 1; g.should == 1; - worker.timeUntilNextEvent().should == null; + worker.timeUntilNextEvent().isNull.should == true; } @("ManualTimeWorker.cancel") @safe @@ -68,15 +69,15 @@ unittest { auto timer = worker.addTimer((TimerTrigger trigger) shared { g.atomicOp!"+="(1 + (trigger == TimerTrigger.cancel)); }, 10.msecs); - worker.timeUntilNextEvent().should == 10.msecs; + worker.timeUntilNextEvent().should == 10.msecs.nullable; g.should == 0; worker.advance(4.msecs); - worker.timeUntilNextEvent().should == 6.msecs; + worker.timeUntilNextEvent().should == 6.msecs.nullable; g.should == 0; worker.cancelTimer(timer); - worker.timeUntilNextEvent().should == null; + worker.timeUntilNextEvent().isNull.should == true; g.should == 2; } diff --git a/tests/ut/concurrency/sender.d b/tests/ut/concurrency/sender.d index 48524fd..8b9fb80 100644 --- a/tests/ut/concurrency/sender.d +++ b/tests/ut/concurrency/sender.d @@ -7,6 +7,7 @@ import concurrency.operations; import concurrency.receiver; import unit_threaded; import core.atomic : atomicOp; +import std.typecons : nullable; @("syncWait.value") @safe unittest { @@ -310,7 +311,7 @@ unittest { .withScheduler(worker.getScheduler); auto driver = just(worker).then((shared ManualTimeWorker worker) { - worker.timeUntilNextEvent().should == 1.msecs; + worker.timeUntilNextEvent().should == 1.msecs.nullable; worker.advance(1.msecs); }); diff --git a/tests/ut/concurrency/stream.d b/tests/ut/concurrency/stream.d index e847b27..7d81cc7 100644 --- a/tests/ut/concurrency/stream.d +++ b/tests/ut/concurrency/stream.d @@ -6,6 +6,7 @@ import unit_threaded; import concurrency.stoptoken; import core.atomic; import concurrency.thread : ThreadSender; +import std.typecons : nullable; // TODO: it would be good if we can get the Sender .collect returns to be scoped if the delegates are. @@ -30,13 +31,13 @@ unittest { .withScheduler(worker.getScheduler); auto driver = justFrom(() shared { - worker.timeUntilNextEvent().should == 5.msecs; + worker.timeUntilNextEvent().should == 5.msecs.nullable; worker.advance(5.msecs); - worker.timeUntilNextEvent().should == 5.msecs; + worker.timeUntilNextEvent().should == 5.msecs.nullable; worker.advance(5.msecs); - worker.timeUntilNextEvent().should == null; + worker.timeUntilNextEvent().isNull.should == true; }); whenAll(interval, driver).syncWait().assumeOk; @@ -245,31 +246,31 @@ unittest { auto driver = justFrom(() shared { worker.advance(7.msecs); p.atomicLoad.should == 0; - worker.timeUntilNextEvent().should == 3.msecs; + worker.timeUntilNextEvent().should == 3.msecs.nullable; worker.advance(3.msecs); p.atomicLoad.should == 1; - worker.timeUntilNextEvent().should == 4.msecs; + worker.timeUntilNextEvent().should == 4.msecs.nullable; worker.advance(4.msecs); p.atomicLoad.should == 1; - worker.timeUntilNextEvent().should == 6.msecs; + worker.timeUntilNextEvent().should == 6.msecs.nullable; worker.advance(6.msecs); p.atomicLoad.should == 3; - worker.timeUntilNextEvent().should == 1.msecs; + worker.timeUntilNextEvent().should == 1.msecs.nullable; worker.advance(1.msecs); p.atomicLoad.should == 3; - worker.timeUntilNextEvent().should == 7.msecs; + worker.timeUntilNextEvent().should == 7.msecs.nullable; worker.advance(7.msecs); p.atomicLoad.should == 3; - worker.timeUntilNextEvent().should == 2.msecs; + worker.timeUntilNextEvent().should == 2.msecs.nullable; worker.advance(2.msecs); p.atomicLoad.should == 7; - worker.timeUntilNextEvent().should == null; + worker.timeUntilNextEvent().isNull.should == true; }); whenAll(sampler, driver).syncWait().assumeOk; @@ -297,39 +298,39 @@ unittest { auto driver = justFrom(() shared { worker.advance(3.msecs); p.atomicLoad.should == 0; - worker.timeUntilNextEvent().should == 3.msecs; + worker.timeUntilNextEvent().should == 3.msecs.nullable; worker.advance(3.msecs); p.atomicLoad.should == 0; - worker.timeUntilNextEvent().should == 1.msecs; + worker.timeUntilNextEvent().should == 1.msecs.nullable; worker.advance(1.msecs); p.atomicLoad.should == 0; - worker.timeUntilNextEvent().should == 2.msecs; + worker.timeUntilNextEvent().should == 2.msecs.nullable; worker.advance(2.msecs); p.atomicLoad.should == 1; - worker.timeUntilNextEvent().should == 3.msecs; + worker.timeUntilNextEvent().should == 3.msecs.nullable; worker.advance(3.msecs); p.atomicLoad.should == 1; - worker.timeUntilNextEvent().should == 2.msecs; + worker.timeUntilNextEvent().should == 2.msecs.nullable; worker.advance(2.msecs); p.atomicLoad.should == 1; - worker.timeUntilNextEvent().should == 1.msecs; + worker.timeUntilNextEvent().should == 1.msecs.nullable; worker.advance(1.msecs); p.atomicLoad.should == 3; - worker.timeUntilNextEvent().should == 3.msecs; + worker.timeUntilNextEvent().should == 3.msecs.nullable; worker.advance(3.msecs); p.atomicLoad.should == 3; - worker.timeUntilNextEvent().should == 3.msecs; + worker.timeUntilNextEvent().should == 3.msecs.nullable; worker.advance(3.msecs); p.atomicLoad.should == 6; - worker.timeUntilNextEvent().should == null; + worker.timeUntilNextEvent().isNull.should == true; }); whenAll(sampler, driver).syncWait().assumeOk; @@ -375,14 +376,14 @@ unittest { auto driver = justFrom(() shared { p.atomicLoad.should == 0; - worker.timeUntilNextEvent().should == 1.msecs; + worker.timeUntilNextEvent().should == 1.msecs.nullable; foreach (expected; [0, 0, 3, 3, 3, 9, 9, 9, 18, 18, 18, 30]) { worker.advance(1.msecs); p.atomicLoad.should == expected; } - worker.timeUntilNextEvent().should == null; + worker.timeUntilNextEvent().isNull.should == true; }); whenAll(throttled, driver).syncWait().assumeOk; @@ -466,7 +467,7 @@ unittest { worker.advance(1.msecs); p.atomicLoad.should == 5; - worker.timeUntilNextEvent().should == null; + worker.timeUntilNextEvent().isNull.should == true; }); whenAll(throttled, driver).syncWait().assumeOk; @@ -491,31 +492,31 @@ unittest { auto driver = justFrom(() shared { source.emit(1); p.atomicLoad.should == 0; - worker.timeUntilNextEvent().should == 3.msecs; + worker.timeUntilNextEvent().should == 3.msecs.nullable; worker.advance(3.msecs); p.atomicLoad.should == 1; source.emit(2); p.atomicLoad.should == 1; - worker.timeUntilNextEvent().should == 3.msecs; + worker.timeUntilNextEvent().should == 3.msecs.nullable; source.emit(3); p.atomicLoad.should == 1; - worker.timeUntilNextEvent().should == 3.msecs; + worker.timeUntilNextEvent().should == 3.msecs.nullable; worker.advance(1.msecs); p.atomicLoad.should == 1; - worker.timeUntilNextEvent().should == 2.msecs; + worker.timeUntilNextEvent().should == 2.msecs.nullable; source.emit(4); p.atomicLoad.should == 1; - worker.timeUntilNextEvent().should == 3.msecs; + worker.timeUntilNextEvent().should == 3.msecs.nullable; worker.advance(3.msecs); p.atomicLoad.should == 5; - worker.timeUntilNextEvent().should == null; + worker.timeUntilNextEvent().isNull.should == true; }); whenAll(throttled, driver).syncWait().assumeOk; @@ -691,11 +692,11 @@ unittest { auto driver = justFrom(() shared { p.atomicLoad.should == 0; - worker.timeUntilNextEvent().should == 5.msecs; + worker.timeUntilNextEvent().should == 5.msecs.nullable; worker.advance(5.msecs); p.atomicLoad.should == 1; - worker.timeUntilNextEvent().should == 1.msecs; + worker.timeUntilNextEvent().should == 1.msecs.nullable; worker.advance(1.msecs); p.atomicLoad.should == 2; @@ -724,7 +725,7 @@ unittest { worker.advance(1.msecs); p.atomicLoad.should == 10; - worker.timeUntilNextEvent().should == null; + worker.timeUntilNextEvent().isNull.should == true; }); whenAll(sender, driver).syncWait().assumeOk; @@ -753,11 +754,11 @@ unittest { auto driver = justFrom(() shared { p.atomicLoad.should == 0; - worker.timeUntilNextEvent().should == 5.msecs; + worker.timeUntilNextEvent().should == 5.msecs.nullable; worker.advance(5.msecs); p.atomicLoad.should == 0; - worker.timeUntilNextEvent().should == 1.msecs; + worker.timeUntilNextEvent().should == 1.msecs.nullable; worker.advance(1.msecs); p.atomicLoad.should == 0; @@ -804,7 +805,7 @@ unittest { worker.advance(2.msecs); p.atomicLoad.should == 12; - worker.timeUntilNextEvent().should == null; + worker.timeUntilNextEvent().isNull.should == true; }); whenAll(sender, driver).syncWait().assumeOk;