Skip to content

Commit

Permalink
Move timingwheels to use Nullable instead
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Sep 5, 2023
1 parent 21a6215 commit 52f1a89
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 56 deletions.
2 changes: 1 addition & 1 deletion source/concurrency/scheduler.d
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module concurrency.scheduler;
import concurrency.sender : SenderObjectBase, isSender;
import core.time : Duration;
import concepts;
import mir.algebraic : Nullable, nullable;
import std.typecons : Nullable, nullable;

void checkScheduler(T)() {
import concurrency.sender : checkSender;
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/timingwheels.d
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import core.memory;

import ikod.containers.hashmap;
import automem;
import mir.algebraic : Nullable, nullable;
import std.typecons : Nullable, nullable;

version(twtesting) {
import unit_threaded;
Expand Down
24 changes: 12 additions & 12 deletions tests/ut/concurrency/operations.d
Original file line number Diff line number Diff line change
Expand Up @@ -331,13 +331,13 @@ unittest {
}).retryWhen(Wait()).withScheduler(worker.getScheduler);

auto driver = just(worker).then((shared ManualTimeWorker worker) {
worker.timeUntilNextEvent().should == 3.msecs;
worker.timeUntilNextEvent().should == 3.msecs.nullable;
worker.advance(3.msecs);
worker.timeUntilNextEvent().should == 3.msecs;
worker.timeUntilNextEvent().should == 3.msecs.nullable;
worker.advance(3.msecs);
worker.timeUntilNextEvent().should == 3.msecs;
worker.timeUntilNextEvent().should == 3.msecs.nullable;
worker.advance(3.msecs);
worker.timeUntilNextEvent().should == null;
worker.timeUntilNextEvent().isNull.should == true;
});

whenAll(sender, driver).syncWait.value.should == 3;
Expand Down Expand Up @@ -412,11 +412,11 @@ unittest {

auto worker = new shared ManualTimeWorker();
auto driver = just(worker).then((shared ManualTimeWorker worker) shared {
worker.timeUntilNextEvent().should == 10.msecs;
worker.timeUntilNextEvent().should == 10.msecs.nullable;
worker.advance(5.msecs);
worker.timeUntilNextEvent().should == 5.msecs;
worker.timeUntilNextEvent().should == 5.msecs.nullable;
worker.advance(5.msecs);
worker.timeUntilNextEvent().should == null;
worker.timeUntilNextEvent().isNull.should == true;
});
auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);

Expand All @@ -430,9 +430,9 @@ unittest {
auto worker = new shared ManualTimeWorker();
auto source = new StopSource();
auto driver = just(source).then((StopSource source) shared {
worker.timeUntilNextEvent().should == 10.msecs;
worker.timeUntilNextEvent().should == 10.msecs.nullable;
source.stop();
worker.timeUntilNextEvent().should == null;
worker.timeUntilNextEvent().isNull.should == true;
});
auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);

Expand Down Expand Up @@ -795,11 +795,11 @@ unittest {
.repeat();

auto driver = just(worker).then((shared ManualTimeWorker worker) {
worker.timeUntilNextEvent().should == 1.msecs;
worker.timeUntilNextEvent().should == 1.msecs.nullable;
worker.advance(1.msecs);
worker.timeUntilNextEvent().should == 1.msecs;
worker.timeUntilNextEvent().should == 1.msecs.nullable;
worker.advance(1.msecs);
worker.timeUntilNextEvent().should == 1.msecs;
worker.timeUntilNextEvent().should == 1.msecs.nullable;
});

race(base, driver).withScheduler(worker.getScheduler).syncWait().assumeOk;
Expand Down
15 changes: 8 additions & 7 deletions tests/ut/concurrency/scheduler.d
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import unit_threaded;
import concurrency.stoptoken;
import core.time : msecs;
import concurrency.scheduler;
import std.typecons : nullable;

@("scheduleAfter") @safe
unittest {
Expand Down Expand Up @@ -39,24 +40,24 @@ unittest {
h.atomicOp!"+="(1);
}, 5.msecs);

worker.timeUntilNextEvent().should == 5.msecs;
worker.timeUntilNextEvent().should == 5.msecs.nullable;
g.should == 0;
h.should == 0;

worker.advance(4.msecs);
worker.timeUntilNextEvent().should == 1.msecs;
worker.timeUntilNextEvent().should == 1.msecs.nullable;
h.should == 0;
g.should == 0;

worker.advance(1.msecs);
worker.timeUntilNextEvent().should == 5.msecs;
worker.timeUntilNextEvent().should == 5.msecs.nullable;
h.should == 1;
g.should == 0;

worker.advance(5.msecs);
h.should == 1;
g.should == 1;
worker.timeUntilNextEvent().should == null;
worker.timeUntilNextEvent().isNull.should == true;
}

@("ManualTimeWorker.cancel") @safe
Expand All @@ -68,15 +69,15 @@ unittest {
auto timer = worker.addTimer((TimerTrigger trigger) shared {
g.atomicOp!"+="(1 + (trigger == TimerTrigger.cancel));
}, 10.msecs);
worker.timeUntilNextEvent().should == 10.msecs;
worker.timeUntilNextEvent().should == 10.msecs.nullable;
g.should == 0;

worker.advance(4.msecs);
worker.timeUntilNextEvent().should == 6.msecs;
worker.timeUntilNextEvent().should == 6.msecs.nullable;
g.should == 0;

worker.cancelTimer(timer);
worker.timeUntilNextEvent().should == null;
worker.timeUntilNextEvent().isNull.should == true;
g.should == 2;
}

Expand Down
3 changes: 2 additions & 1 deletion tests/ut/concurrency/sender.d
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import concurrency.operations;
import concurrency.receiver;
import unit_threaded;
import core.atomic : atomicOp;
import std.typecons : nullable;

@("syncWait.value") @safe
unittest {
Expand Down Expand Up @@ -310,7 +311,7 @@ unittest {
.withScheduler(worker.getScheduler);

auto driver = just(worker).then((shared ManualTimeWorker worker) {
worker.timeUntilNextEvent().should == 1.msecs;
worker.timeUntilNextEvent().should == 1.msecs.nullable;
worker.advance(1.msecs);
});

Expand Down
69 changes: 35 additions & 34 deletions tests/ut/concurrency/stream.d
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import unit_threaded;
import concurrency.stoptoken;
import core.atomic;
import concurrency.thread : ThreadSender;
import std.typecons : nullable;

// TODO: it would be good if we can get the Sender .collect returns to be scoped if the delegates are.

Expand All @@ -30,13 +31,13 @@ unittest {
.withScheduler(worker.getScheduler);

auto driver = justFrom(() shared {
worker.timeUntilNextEvent().should == 5.msecs;
worker.timeUntilNextEvent().should == 5.msecs.nullable;
worker.advance(5.msecs);

worker.timeUntilNextEvent().should == 5.msecs;
worker.timeUntilNextEvent().should == 5.msecs.nullable;
worker.advance(5.msecs);

worker.timeUntilNextEvent().should == null;
worker.timeUntilNextEvent().isNull.should == true;
});

whenAll(interval, driver).syncWait().assumeOk;
Expand Down Expand Up @@ -245,31 +246,31 @@ unittest {
auto driver = justFrom(() shared {
worker.advance(7.msecs);
p.atomicLoad.should == 0;
worker.timeUntilNextEvent().should == 3.msecs;
worker.timeUntilNextEvent().should == 3.msecs.nullable;

worker.advance(3.msecs);
p.atomicLoad.should == 1;
worker.timeUntilNextEvent().should == 4.msecs;
worker.timeUntilNextEvent().should == 4.msecs.nullable;

worker.advance(4.msecs);
p.atomicLoad.should == 1;
worker.timeUntilNextEvent().should == 6.msecs;
worker.timeUntilNextEvent().should == 6.msecs.nullable;

worker.advance(6.msecs);
p.atomicLoad.should == 3;
worker.timeUntilNextEvent().should == 1.msecs;
worker.timeUntilNextEvent().should == 1.msecs.nullable;

worker.advance(1.msecs);
p.atomicLoad.should == 3;
worker.timeUntilNextEvent().should == 7.msecs;
worker.timeUntilNextEvent().should == 7.msecs.nullable;

worker.advance(7.msecs);
p.atomicLoad.should == 3;
worker.timeUntilNextEvent().should == 2.msecs;
worker.timeUntilNextEvent().should == 2.msecs.nullable;

worker.advance(2.msecs);
p.atomicLoad.should == 7;
worker.timeUntilNextEvent().should == null;
worker.timeUntilNextEvent().isNull.should == true;
});

whenAll(sampler, driver).syncWait().assumeOk;
Expand Down Expand Up @@ -297,39 +298,39 @@ unittest {
auto driver = justFrom(() shared {
worker.advance(3.msecs);
p.atomicLoad.should == 0;
worker.timeUntilNextEvent().should == 3.msecs;
worker.timeUntilNextEvent().should == 3.msecs.nullable;

worker.advance(3.msecs);
p.atomicLoad.should == 0;
worker.timeUntilNextEvent().should == 1.msecs;
worker.timeUntilNextEvent().should == 1.msecs.nullable;

worker.advance(1.msecs);
p.atomicLoad.should == 0;
worker.timeUntilNextEvent().should == 2.msecs;
worker.timeUntilNextEvent().should == 2.msecs.nullable;

worker.advance(2.msecs);
p.atomicLoad.should == 1;
worker.timeUntilNextEvent().should == 3.msecs;
worker.timeUntilNextEvent().should == 3.msecs.nullable;

worker.advance(3.msecs);
p.atomicLoad.should == 1;
worker.timeUntilNextEvent().should == 2.msecs;
worker.timeUntilNextEvent().should == 2.msecs.nullable;

worker.advance(2.msecs);
p.atomicLoad.should == 1;
worker.timeUntilNextEvent().should == 1.msecs;
worker.timeUntilNextEvent().should == 1.msecs.nullable;

worker.advance(1.msecs);
p.atomicLoad.should == 3;
worker.timeUntilNextEvent().should == 3.msecs;
worker.timeUntilNextEvent().should == 3.msecs.nullable;

worker.advance(3.msecs);
p.atomicLoad.should == 3;
worker.timeUntilNextEvent().should == 3.msecs;
worker.timeUntilNextEvent().should == 3.msecs.nullable;

worker.advance(3.msecs);
p.atomicLoad.should == 6;
worker.timeUntilNextEvent().should == null;
worker.timeUntilNextEvent().isNull.should == true;
});

whenAll(sampler, driver).syncWait().assumeOk;
Expand Down Expand Up @@ -375,14 +376,14 @@ unittest {

auto driver = justFrom(() shared {
p.atomicLoad.should == 0;
worker.timeUntilNextEvent().should == 1.msecs;
worker.timeUntilNextEvent().should == 1.msecs.nullable;

foreach (expected; [0, 0, 3, 3, 3, 9, 9, 9, 18, 18, 18, 30]) {
worker.advance(1.msecs);
p.atomicLoad.should == expected;
}

worker.timeUntilNextEvent().should == null;
worker.timeUntilNextEvent().isNull.should == true;
});

whenAll(throttled, driver).syncWait().assumeOk;
Expand Down Expand Up @@ -466,7 +467,7 @@ unittest {
worker.advance(1.msecs);
p.atomicLoad.should == 5;

worker.timeUntilNextEvent().should == null;
worker.timeUntilNextEvent().isNull.should == true;
});
whenAll(throttled, driver).syncWait().assumeOk;

Expand All @@ -491,31 +492,31 @@ unittest {
auto driver = justFrom(() shared {
source.emit(1);
p.atomicLoad.should == 0;
worker.timeUntilNextEvent().should == 3.msecs;
worker.timeUntilNextEvent().should == 3.msecs.nullable;

worker.advance(3.msecs);
p.atomicLoad.should == 1;

source.emit(2);
p.atomicLoad.should == 1;
worker.timeUntilNextEvent().should == 3.msecs;
worker.timeUntilNextEvent().should == 3.msecs.nullable;

source.emit(3);
p.atomicLoad.should == 1;
worker.timeUntilNextEvent().should == 3.msecs;
worker.timeUntilNextEvent().should == 3.msecs.nullable;

worker.advance(1.msecs);
p.atomicLoad.should == 1;
worker.timeUntilNextEvent().should == 2.msecs;
worker.timeUntilNextEvent().should == 2.msecs.nullable;

source.emit(4);
p.atomicLoad.should == 1;
worker.timeUntilNextEvent().should == 3.msecs;
worker.timeUntilNextEvent().should == 3.msecs.nullable;

worker.advance(3.msecs);
p.atomicLoad.should == 5;

worker.timeUntilNextEvent().should == null;
worker.timeUntilNextEvent().isNull.should == true;
});
whenAll(throttled, driver).syncWait().assumeOk;

Expand Down Expand Up @@ -691,11 +692,11 @@ unittest {

auto driver = justFrom(() shared {
p.atomicLoad.should == 0;
worker.timeUntilNextEvent().should == 5.msecs;
worker.timeUntilNextEvent().should == 5.msecs.nullable;

worker.advance(5.msecs);
p.atomicLoad.should == 1;
worker.timeUntilNextEvent().should == 1.msecs;
worker.timeUntilNextEvent().should == 1.msecs.nullable;

worker.advance(1.msecs);
p.atomicLoad.should == 2;
Expand Down Expand Up @@ -724,7 +725,7 @@ unittest {
worker.advance(1.msecs);
p.atomicLoad.should == 10;

worker.timeUntilNextEvent().should == null;
worker.timeUntilNextEvent().isNull.should == true;
});
whenAll(sender, driver).syncWait().assumeOk;

Expand Down Expand Up @@ -753,11 +754,11 @@ unittest {

auto driver = justFrom(() shared {
p.atomicLoad.should == 0;
worker.timeUntilNextEvent().should == 5.msecs;
worker.timeUntilNextEvent().should == 5.msecs.nullable;

worker.advance(5.msecs);
p.atomicLoad.should == 0;
worker.timeUntilNextEvent().should == 1.msecs;
worker.timeUntilNextEvent().should == 1.msecs.nullable;

worker.advance(1.msecs);
p.atomicLoad.should == 0;
Expand Down Expand Up @@ -804,7 +805,7 @@ unittest {
worker.advance(2.msecs);
p.atomicLoad.should == 12;

worker.timeUntilNextEvent().should == null;
worker.timeUntilNextEvent().isNull.should == true;
});
whenAll(sender, driver).syncWait().assumeOk;

Expand Down

0 comments on commit 52f1a89

Please sign in to comment.