From 6a959ed6cd241768822c36034d61f9f6e1af150a Mon Sep 17 00:00:00 2001 From: Elizabeth Mattijsen Date: Wed, 12 Jun 2024 13:16:18 +0200 Subject: [PATCH] Re-imagine infectiousness (Part 1/N) - lose the $!buffer attribute on the ParaSeq class: initially, a BufferIterator will be made as the source: if nothing special happens, then this will produce the values transparently. If a !start *does* happen, the buffer will be zapped from the BufferIterator and become the first buffer to be handled, and the source iterator of the BufferIterator will become the source iterator of the ParaSeq object. This also allows losing the $!parent attribute in BufferIterator - added many invocant constraints - replace the !pass-the-chain method by !re-source: this will no longer create a new ParaSeq object because it is not needed: swapping source/result iterators is enough, as each iterator itself should have all it needs. - the default state of $!result is now nqp::null Sadly this does not fix all issues with infectiousness yet, hence only the Part 1/N --- lib/ParaSeq.rakumod | 177 +++++++++++++++++++++++--------------------- 1 file changed, 91 insertions(+), 86 deletions(-) diff --git a/lib/ParaSeq.rakumod b/lib/ParaSeq.rakumod index df8bc84..86ee46d 100644 --- a/lib/ParaSeq.rakumod +++ b/lib/ParaSeq.rakumod @@ -172,32 +172,29 @@ my class ParaQueue is repr('ConcBlockingQueue') { } # of the values from the buffer, and then from the iterator my class BufferIterator does Iterator { - has $!parent # ParaSeq parent object - handles ; - has $!buffer; # first buffer to produce from - has $!iterator; # iterator to produce from onwards + has $!buffer; # first buffer to produce from + has $.source; # iterator to produce from onwards - method new(\parent, \buffer, \iterator) { + method new(\buffer, \source) { my $self := nqp::create(self); - nqp::bindattr($self,BufferIterator,'$!parent', parent ); - nqp::bindattr($self,BufferIterator,'$!buffer', buffer ); - 
nqp::bindattr($self,BufferIterator,'$!iterator',iterator); + nqp::bindattr($self,BufferIterator,'$!buffer',buffer); + nqp::bindattr($self,BufferIterator,'$!source',source); $self } - method pull-one() { + method pull-one(BufferIterator:D:) { nqp::elems($!buffer) ?? nqp::shift($!buffer) - !! $!iterator.pull-one + !! $!source.pull-one } - method skip-at-least(uint $skipping) { + method skip-at-least(BufferIterator:D: uint $skipping) { # Need to skip more than in initial buffer my uint $elems = nqp::elems($!buffer); if $skipping > $elems { nqp::setelems($!buffer,0); - $!iterator.skip-at-least($skipping - $elems) + $!source.skip-at-least($skipping - $elems) } # Less than in inital buffer, remove so many entries from it @@ -207,13 +204,35 @@ my class BufferIterator does Iterator { } } - method push-all(\target) { + method push-all(BufferIterator:D: \target) { nqp::istype(target,IB) ?? nqp::splice(target,$!buffer,nqp::elems(target),0) !! $!buffer.iterator.push-all(target); nqp::setelems($!buffer,0); - $!iterator.push-all(target) + $!source.push-all(target) + } + + method is-lazy(BufferIterator:D:) { + $!source.is-lazy + } + method is-deterministic(BufferIterator:D:) { + $!source.is-deterministic + } + method is-monotonically-increasing(BufferIterator:D:) { + $!source.is-monotonically-increasing + } + + # If there is a buffer available, return it and reset in object, else Nil + method zap-buffer(BufferIterator:D:) { + my $buffer := $!buffer; + if nqp::elems($buffer) { + $!buffer := nqp::create(IB); + $buffer + } + else { + Nil + } } method stats(--> emptyList) { } @@ -257,7 +276,7 @@ my class SquishIterator does Iterator { } } - method pull-one() { + method pull-one(SquishIterator:D:) { nqp::elems(my $current := $!current) > 1 ?? nqp::shift($current) !! nqp::elems($current) @@ -265,7 +284,7 @@ my class SquishIterator does Iterator { !! 
self!next-batch } - method push-all(\target --> IterationEnd) { + method push-all(SquishIterator:D: \target --> IterationEnd) { my $source := $!source; my &as := $!as; my &with := $!with; @@ -516,7 +535,7 @@ my class ParaIterator does Iterator { $buffer } - method pull-one() { + method pull-one(ParaIterator:D:) { nqp::if( nqp::elems($!current), (my $pulled := nqp::shift($!current)), @@ -533,7 +552,7 @@ my class ParaIterator does Iterator { $pulled } - method skip-at-least(uint $skipping) { + method skip-at-least(ParaIterator:D: uint $skipping) { my $current := $!current; my $queues := $!queues; my uint $toskip = $skipping; @@ -565,7 +584,7 @@ my class ParaIterator does Iterator { proto method push-all(|) {*} # Optimized version appending to an IterationBuffer - multi method push-all(IterationBuffer:D \target) { + multi method push-all(ParaIterator:D: IterationBuffer:D \target) { my $current := $!current; my $queues := $!queues; @@ -584,7 +603,7 @@ my class ParaIterator does Iterator { # Slower generic version that needs to coerce each buffer to a List # to ensure the correct semantics with .append - multi method push-all(\target) { + multi method push-all(ParaIterator:D: \target) { my $current := $!current; my $queues := $!queues; @@ -658,7 +677,6 @@ class ParaStats { #- ParaSeq --------------------------------------------------------------------- # The class containing all of the logic for parallel sequences class ParaSeq does Sequence { - has $!buffer; # first buffer has $!source; # iterator producing source values has $!result; # iterator producing result values has $!SCHEDULER; # $*SCHEDULER to be used @@ -689,25 +707,24 @@ class ParaSeq does Sequence { #- private helper methods ------------------------------------------------------ - # Do error checking and set up object if all ok + # Set up object and return it method !setup( + $source, uint $batch, uint $auto, uint $degree, uint $stop-after, - $buffer is raw, - $source ) is hidden-from-backtrace { my $self 
:= nqp::create(self); + nqp::bindattr($self, ParaSeq, '$!source', $source ); + nqp::bindattr($self, ParaSeq, '$!result', nqp::null ); + nqp::bindattr($self, ParaSeq, '$!SCHEDULER', $*SCHEDULER); + nqp::bindattr($self, ParaSeq, '$!snitcher', nqp::null ); + nqp::bindattr_i($self, ParaSeq, '$!batch', $batch ); nqp::bindattr_i($self, ParaSeq, '$!auto', $auto ); nqp::bindattr_i($self, ParaSeq, '$!degree', $degree ); nqp::bindattr_i($self, ParaSeq, '$!stop-after', $stop-after); - - nqp::bindattr($self, ParaSeq, '$!SCHEDULER', $*SCHEDULER); - nqp::bindattr($self, ParaSeq, '$!snitcher', nqp::null ); - nqp::bindattr($self, ParaSeq, '$!buffer', $buffer ); - nqp::bindattr($self, ParaSeq, '$!source', $source ); $self } @@ -725,10 +742,13 @@ class ParaSeq does Sequence { !! $batch } - # Local copy of first buffer, make sure it is granulized correctly - my $first := $!buffer; - if nqp::isconcrete($!buffer) { - $!buffer := Mu; # release buffer in object + # Get copy of first buffer if possible + my $first; + if nqp::istype($!source,BufferIterator) + && ($first := $!source.zap-buffer) { + + # We can get rid of the BufferIterator now + $!source := $!source.source; nqp::until( nqp::elems($first) %% $granularity || nqp::eqaddr((my $pulled := $!source.pull-one),IE), @@ -872,18 +892,11 @@ class ParaSeq does Sequence { self } - # Entry point in chain, from an Iterator - method !pass-the-chain(\source) { - my $self := nqp::create(self); - nqp::bindattr_i($self, ParaSeq, '$!degree', $!degree ); - nqp::bindattr_i($self, ParaSeq, '$!batch', $!batch ); - nqp::bindattr_i($self, ParaSeq, '$!auto', $!auto ); - nqp::bindattr_i($self, ParaSeq, '$!stop-after', $!stop-after); - - nqp::bindattr($self, ParaSeq, '$!SCHEDULER', $!SCHEDULER); - nqp::bindattr($self, ParaSeq, '$!snitcher', nqp::null ); - nqp::bindattr($self, ParaSeq, '$!source', source ); - $self + # Change the source of this ParaSeq (and reset the result iterator) + method !re-source($source is raw) { + $!source := $source; + $!result := 
nqp::null; + self } # Mark the given queue as done @@ -914,7 +927,7 @@ class ParaSeq does Sequence { nqp::elems($output), nqp::time() - $then ) - ); + ) if nqp::istype($!result,ParaIterator); # Make the produced values available to the result iterator nqp::push($semaphore,$output); @@ -941,15 +954,7 @@ class ParaSeq does Sequence { #- where all the magic happens under the hood ---------------------------------- - # Acts as a normal iterator, producing values from queues that are - # filled asynchronously. If there is no result iterator, then the - # source iterator be recreated using the initial buffer if there is - # one. Otherwise - method iterator(ParaSeq:D:) { - $!result //= nqp::istype($!buffer,IB) - ?? BufferIterator.new(self, $!buffer, $!source) - !! $!source - } + method iterator(ParaSeq:D:) { nqp::ifnull($!result,$!source) } method stop(ParaSeq:D:) { nqp::atomicstore_i($!stop,1); @@ -1689,7 +1694,7 @@ class ParaSeq does Sequence { ) ); - self!pass-the-chain: $final.iterator + self!re-source: $final.iterator } proto method max(|) {*} @@ -1811,7 +1816,7 @@ class ParaSeq does Sequence { SquishIterator.new: self!start(&processor).iterator, &as, &with } - self!pass-the-chain: $iterator + self!re-source: $iterator } #- unique ---------------------------------------------------------------------- @@ -1899,7 +1904,7 @@ class ParaSeq does Sequence { # one thread doing it, and so we've done all the checking that we # need already if nqp::elems($seens) == 1 { - self!pass-the-chain: WhichIterator.new($result) + self!re-source: WhichIterator.new($result) } # Multiple threads processed values @@ -1937,79 +1942,79 @@ class ParaSeq does Sequence { ) ); - self!pass-the-chain: nqp::elems($seen) + self!re-source: nqp::elems($seen) ?? UniqueIterator.new($seen, $result) # need further checks !! 
WhichIterator.new($result) # all ok } } multi method unique(ParaSeq:D: :&as = &identity) { - self!pass-the-chain: self.List.unique(:&as).iterator + self!re-source: self.List.unique(:&as).iterator } #- interfaces that are just infectious ----------------------------------------- proto method collate(|) {*} multi method collate(ParaSeq:D:) { - self!pass-the-chain: self.List.collate.iterator + self!re-source: self.List.collate.iterator } proto method combinations(|) {*} multi method combinations(ParaSeq:D:) { - self!pass-the-chain: self.List.combinations.iterator + self!re-source: self.List.combinations.iterator } multi method combinations(ParaSeq:D: $of) { - self!pass-the-chain: self.List.combinations($of).iterator + self!re-source: self.List.combinations($of).iterator } multi method flat(ParaSeq:D:) { - self!pass-the-chain: self.List.flat.iterator + self!re-source: self.List.flat.iterator } multi method head(ParaSeq:D: $what) { - self!pass-the-chain: self.Seq.head($what).iterator + self!re-source: self.Seq.head($what).iterator } multi method invert(ParaSeq:D:) { - self!pass-the-chain: self.Seq.invert.iterator + self!re-source: self.Seq.invert.iterator } multi method keys(ParaSeq:D:) { - self!pass-the-chain: self.Seq.keys.iterator + self!re-source: self.Seq.keys.iterator } multi method pairup(ParaSeq:D:) { - self!pass-the-chain: self.List.pairup.iterator + self!re-source: self.List.pairup.iterator } multi method permutations(ParaSeq:D:) { - self!pass-the-chain: self.List.permutations.iterator + self!re-source: self.List.permutations.iterator } multi method pick(ParaSeq:D: $what) { - self!pass-the-chain: self.List.pick($what).iterator + self!re-source: self.List.pick($what).iterator } multi method produce(ParaSeq:D: Callable:D $producer) { - self!pass-the-chain: self.Seq.produce($producer).iterator + self!re-source: self.Seq.produce($producer).iterator } proto method repeated(|) {*} multi method repeated(ParaSeq:D: |c) { - self!pass-the-chain: 
self.Seq.repeated(|c).iterator + self!re-source: self.Seq.repeated(|c).iterator } multi method reverse(ParaSeq:D:) { - self!pass-the-chain: + self!re-source: Rakudo::Iterator.ReifiedReverse(self.IterationBuffer, Mu) } multi method roll(ParaSeq:D: $what) { - self!pass-the-chain: self.List.roll($what).iterator + self!re-source: self.List.roll($what).iterator } multi method rotate(ParaSeq:D: Int(Cool) $rotate) { $rotate - ?? self!pass-the-chain( + ?? self!re-source( Rakudo::Iterator.ReifiedRotate($rotate, self.IterationBuffer, Mu) ) !! self @@ -2020,7 +2025,7 @@ class ParaSeq does Sequence { self!batch($size, $partial.Bool) # can be faster } multi method rotor(ParaSeq:D: |c) { - self!pass-the-chain: self.List.rotor(|c).iterator # nah, too difficult + self!re-source: self.List.rotor(|c).iterator # nah, too difficult } proto method skip(|) {*} @@ -2034,37 +2039,37 @@ class ParaSeq does Sequence { } multi method skip(ParaSeq:D: Whatever) { self.stop; - self!pass-the-chain: Rakudo::Iterator.Empty + self!re-source: Rakudo::Iterator.Empty } multi method skip(ParaSeq:D: |c) { - self!pass-the-chain: self.Seq.skip(|c).iterator + self!re-source: self.Seq.skip(|c).iterator } proto method slice(|) {*} multi method slice(ParaSeq:D: |c) { - self!pass-the-chain: self.List.slice(|c).iterator + self!re-source: self.List.slice(|c).iterator } proto method snip(|) {*} multi method snip(ParaSeq:D: |c) { - self!pass-the-chain: self.Seq.snip(|c).iterator + self!re-source: self.Seq.snip(|c).iterator } proto method sort(|) {*} multi method sort(ParaSeq:D:) { - self!pass-the-chain: self.List.sort.iterator + self!re-source: self.List.sort.iterator } multi method sort(ParaSeq:D: Callable:D $how) { - self!pass-the-chain: self.List.sort($how).iterator + self!re-source: self.List.sort($how).iterator } multi method tail(ParaSeq:D: $what) { - self!pass-the-chain: self.List.tail($what).iterator + self!re-source: self.List.tail($what).iterator } proto method toggle(|) {*} multi method toggle(ParaSeq:D: 
|c) { - self!pass-the-chain: self.Seq.toggle(|c).iterator + self!re-source: self.Seq.toggle(|c).iterator } multi method values(ParaSeq:D:) { self } @@ -2151,8 +2156,8 @@ class ParaSeq does Sequence { $auto := ($auto // True).Bool; $stop-after := ($stop-after // Inf) == Inf ?? 0 !! $stop-after.Int; self!setup( - $batch, $auto, $degree, $stop-after, - $buffer, $iterator + BufferIterator.new($buffer, $iterator), + $batch, $auto, $degree, $stop-after ) } }