diff --git a/lib/ParaSeq.rakumod b/lib/ParaSeq.rakumod index df8bc84..86ee46d 100644 --- a/lib/ParaSeq.rakumod +++ b/lib/ParaSeq.rakumod @@ -172,32 +172,29 @@ my class ParaQueue is repr('ConcBlockingQueue') { } # of the values from the buffer, and then from the iterator my class BufferIterator does Iterator { - has $!parent # ParaSeq parent object - handles ; - has $!buffer; # first buffer to produce from - has $!iterator; # iterator to produce from onwards + has $!buffer; # first buffer to produce from + has $.source; # iterator to produce from onwards - method new(\parent, \buffer, \iterator) { + method new(\buffer, \source) { my $self := nqp::create(self); - nqp::bindattr($self,BufferIterator,'$!parent', parent ); - nqp::bindattr($self,BufferIterator,'$!buffer', buffer ); - nqp::bindattr($self,BufferIterator,'$!iterator',iterator); + nqp::bindattr($self,BufferIterator,'$!buffer',buffer); + nqp::bindattr($self,BufferIterator,'$!source',source); $self } - method pull-one() { + method pull-one(BufferIterator:D:) { nqp::elems($!buffer) ?? nqp::shift($!buffer) - !! $!iterator.pull-one + !! $!source.pull-one } - method skip-at-least(uint $skipping) { + method skip-at-least(BufferIterator:D: uint $skipping) { # Need to skip more than in initial buffer my uint $elems = nqp::elems($!buffer); if $skipping > $elems { nqp::setelems($!buffer,0); - $!iterator.skip-at-least($skipping - $elems) + $!source.skip-at-least($skipping - $elems) } # Less than in inital buffer, remove so many entries from it @@ -207,13 +204,35 @@ my class BufferIterator does Iterator { } } - method push-all(\target) { + method push-all(BufferIterator:D: \target) { nqp::istype(target,IB) ?? nqp::splice(target,$!buffer,nqp::elems(target),0) !! $!buffer.iterator.push-all(target); nqp::setelems($!buffer,0); - $!iterator.push-all(target) + $!source.push-all(target) + } + + method is-lazy(BufferIterator:D:) { + $!source.is-lazy + } + method is-deterministic(BufferIterator:D:) { + $!source.is-deterministic + } + method is-monotonically-increasing(BufferIterator:D:) { + $!source.is-monotonically-increasing + } + + # If there is a buffer available, return it and reset in object, else Nil + method zap-buffer(BufferIterator:D:) { + my $buffer := $!buffer; + if nqp::elems($buffer) { + $!buffer := nqp::create(IB); + $buffer + } + else { + Nil + } } method stats(--> emptyList) { } @@ -257,7 +276,7 @@ my class SquishIterator does Iterator { } } - method pull-one() { + method pull-one(SquishIterator:D:) { nqp::elems(my $current := $!current) > 1 ?? nqp::shift($current) !! nqp::elems($current) @@ -265,7 +284,7 @@ my class SquishIterator does Iterator { !! self!next-batch } - method push-all(\target --> IterationEnd) { + method push-all(SquishIterator:D: \target --> IterationEnd) { my $source := $!source; my &as := $!as; my &with := $!with; @@ -516,7 +535,7 @@ my class ParaIterator does Iterator { $buffer } - method pull-one() { + method pull-one(ParaIterator:D:) { nqp::if( nqp::elems($!current), (my $pulled := nqp::shift($!current)), @@ -533,7 +552,7 @@ my class ParaIterator does Iterator { $pulled } - method skip-at-least(uint $skipping) { + method skip-at-least(ParaIterator:D: uint $skipping) { my $current := $!current; my $queues := $!queues; my uint $toskip = $skipping; @@ -565,7 +584,7 @@ my class ParaIterator does Iterator { proto method push-all(|) {*} # Optimized version appending to an IterationBuffer - multi method push-all(IterationBuffer:D \target) { + multi method push-all(ParaIterator:D: IterationBuffer:D \target) { my $current := $!current; my $queues := $!queues; @@ -584,7 +603,7 @@ my class ParaIterator does Iterator { # Slower generic version that needs to coerce each buffer to a List # to ensure the correct semantics with .append - multi method push-all(\target) { + multi method push-all(ParaIterator:D: \target) { my $current := $!current; my $queues := $!queues; @@ -658,7 +677,6 @@ class ParaStats { #- ParaSeq --------------------------------------------------------------------- # The class containing all of the logic for parallel sequences class ParaSeq does Sequence { - has $!buffer; # first buffer has $!source; # iterator producing source values has $!result; # iterator producing result values has $!SCHEDULER; # $*SCHEDULER to be used @@ -689,25 +707,24 @@ class ParaSeq does Sequence { #- private helper methods ------------------------------------------------------ - # Do error checking and set up object if all ok + # Set up object and return it method !setup( + $source, uint $batch, uint $auto, uint $degree, uint $stop-after, - $buffer is raw, - $source ) is hidden-from-backtrace { my $self := nqp::create(self); + nqp::bindattr($self, ParaSeq, '$!source', $source ); + nqp::bindattr($self, ParaSeq, '$!result', nqp::null ); + nqp::bindattr($self, ParaSeq, '$!SCHEDULER', $*SCHEDULER); + nqp::bindattr($self, ParaSeq, '$!snitcher', nqp::null ); + nqp::bindattr_i($self, ParaSeq, '$!batch', $batch ); nqp::bindattr_i($self, ParaSeq, '$!auto', $auto ); nqp::bindattr_i($self, ParaSeq, '$!degree', $degree ); nqp::bindattr_i($self, ParaSeq, '$!stop-after', $stop-after); - - nqp::bindattr($self, ParaSeq, '$!SCHEDULER', $*SCHEDULER); - nqp::bindattr($self, ParaSeq, '$!snitcher', nqp::null ); - nqp::bindattr($self, ParaSeq, '$!buffer', $buffer ); - nqp::bindattr($self, ParaSeq, '$!source', $source ); $self } @@ -725,10 +742,13 @@ class ParaSeq does Sequence { !! $batch } - # Local copy of first buffer, make sure it is granulized correctly - my $first := $!buffer; - if nqp::isconcrete($!buffer) { - $!buffer := Mu; # release buffer in object + # Get copy of first buffer if possible + my $first; + if nqp::istype($!source,BufferIterator) + && ($first := $!source.zap-buffer) { + + # We can get rid of the BufferIterator now + $!source := $!source.source; nqp::until( nqp::elems($first) %% $granularity || nqp::eqaddr((my $pulled := $!source.pull-one),IE), @@ -872,18 +892,11 @@ class ParaSeq does Sequence { self } - # Entry point in chain, from an Iterator - method !pass-the-chain(\source) { - my $self := nqp::create(self); - nqp::bindattr_i($self, ParaSeq, '$!degree', $!degree ); - nqp::bindattr_i($self, ParaSeq, '$!batch', $!batch ); - nqp::bindattr_i($self, ParaSeq, '$!auto', $!auto ); - nqp::bindattr_i($self, ParaSeq, '$!stop-after', $!stop-after); - - nqp::bindattr($self, ParaSeq, '$!SCHEDULER', $!SCHEDULER); - nqp::bindattr($self, ParaSeq, '$!snitcher', nqp::null ); - nqp::bindattr($self, ParaSeq, '$!source', source ); - $self + # Change the source of this ParaSeq (and reset the result iterator) + method !re-source($source is raw) { + $!source := $source; + $!result := nqp::null; + self } # Mark the given queue as done @@ -914,7 +927,7 @@ class ParaSeq does Sequence { nqp::elems($output), nqp::time() - $then ) - ); + ) if nqp::istype($!result,ParaIterator); # Make the produced values available to the result iterator nqp::push($semaphore,$output); @@ -941,15 +954,7 @@ class ParaSeq does Sequence { #- where all the magic happens under the hood ---------------------------------- - # Acts as a normal iterator, producing values from queues that are - # filled asynchronously. If there is no result iterator, then the - # source iterator be recreated using the initial buffer if there is - # one. Otherwise - method iterator(ParaSeq:D:) { - $!result //= nqp::istype($!buffer,IB) - ?? BufferIterator.new(self, $!buffer, $!source) - !! $!source - } + method iterator(ParaSeq:D:) { nqp::ifnull($!result,$!source) } method stop(ParaSeq:D:) { nqp::atomicstore_i($!stop,1); @@ -1689,7 +1694,7 @@ class ParaSeq does Sequence { ) ); - self!pass-the-chain: $final.iterator + self!re-source: $final.iterator } proto method max(|) {*} @@ -1811,7 +1816,7 @@ class ParaSeq does Sequence { SquishIterator.new: self!start(&processor).iterator, &as, &with } - self!pass-the-chain: $iterator + self!re-source: $iterator } #- unique ---------------------------------------------------------------------- @@ -1899,7 +1904,7 @@ class ParaSeq does Sequence { # one thread doing it, and so we've done all the checking that we # need already if nqp::elems($seens) == 1 { - self!pass-the-chain: WhichIterator.new($result) + self!re-source: WhichIterator.new($result) } # Multiple threads processed values @@ -1937,79 +1942,79 @@ class ParaSeq does Sequence { ) ); - self!pass-the-chain: nqp::elems($seen) + self!re-source: nqp::elems($seen) ?? UniqueIterator.new($seen, $result) # need further checks !! WhichIterator.new($result) # all ok } } multi method unique(ParaSeq:D: :&as = &identity) { - self!pass-the-chain: self.List.unique(:&as).iterator + self!re-source: self.List.unique(:&as).iterator } #- interfaces that are just infectious ----------------------------------------- proto method collate(|) {*} multi method collate(ParaSeq:D:) { - self!pass-the-chain: self.List.collate.iterator + self!re-source: self.List.collate.iterator } proto method combinations(|) {*} multi method combinations(ParaSeq:D:) { - self!pass-the-chain: self.List.combinations.iterator + self!re-source: self.List.combinations.iterator } multi method combinations(ParaSeq:D: $of) { - self!pass-the-chain: self.List.combinations($of).iterator + self!re-source: self.List.combinations($of).iterator } multi method flat(ParaSeq:D:) { - self!pass-the-chain: self.List.flat.iterator + self!re-source: self.List.flat.iterator } multi method head(ParaSeq:D: $what) { - self!pass-the-chain: self.Seq.head($what).iterator + self!re-source: self.Seq.head($what).iterator } multi method invert(ParaSeq:D:) { - self!pass-the-chain: self.Seq.invert.iterator + self!re-source: self.Seq.invert.iterator } multi method keys(ParaSeq:D:) { - self!pass-the-chain: self.Seq.keys.iterator + self!re-source: self.Seq.keys.iterator } multi method pairup(ParaSeq:D:) { - self!pass-the-chain: self.List.pairup.iterator + self!re-source: self.List.pairup.iterator } multi method permutations(ParaSeq:D:) { - self!pass-the-chain: self.List.permutations.iterator + self!re-source: self.List.permutations.iterator } multi method pick(ParaSeq:D: $what) { - self!pass-the-chain: self.List.pick($what).iterator + self!re-source: self.List.pick($what).iterator } multi method produce(ParaSeq:D: Callable:D $producer) { - self!pass-the-chain: self.Seq.produce($producer).iterator + self!re-source: self.Seq.produce($producer).iterator } proto method repeated(|) {*} multi method repeated(ParaSeq:D: |c) { - self!pass-the-chain: self.Seq.repeated(|c).iterator + self!re-source: self.Seq.repeated(|c).iterator } multi method reverse(ParaSeq:D:) { - self!pass-the-chain: + self!re-source: Rakudo::Iterator.ReifiedReverse(self.IterationBuffer, Mu) } multi method roll(ParaSeq:D: $what) { - self!pass-the-chain: self.List.roll($what).iterator + self!re-source: self.List.roll($what).iterator } multi method rotate(ParaSeq:D: Int(Cool) $rotate) { $rotate - ?? self!pass-the-chain( + ?? self!re-source( Rakudo::Iterator.ReifiedRotate($rotate, self.IterationBuffer, Mu) ) !! self @@ -2020,7 +2025,7 @@ class ParaSeq does Sequence { self!batch($size, $partial.Bool) # can be faster } multi method rotor(ParaSeq:D: |c) { - self!pass-the-chain: self.List.rotor(|c).iterator # nah, too difficult + self!re-source: self.List.rotor(|c).iterator # nah, too difficult } proto method skip(|) {*} @@ -2034,37 +2039,37 @@ class ParaSeq does Sequence { } multi method skip(ParaSeq:D: Whatever) { self.stop; - self!pass-the-chain: Rakudo::Iterator.Empty + self!re-source: Rakudo::Iterator.Empty } multi method skip(ParaSeq:D: |c) { - self!pass-the-chain: self.Seq.skip(|c).iterator + self!re-source: self.Seq.skip(|c).iterator } proto method slice(|) {*} multi method slice(ParaSeq:D: |c) { - self!pass-the-chain: self.List.slice(|c).iterator + self!re-source: self.List.slice(|c).iterator } proto method snip(|) {*} multi method snip(ParaSeq:D: |c) { - self!pass-the-chain: self.Seq.snip(|c).iterator + self!re-source: self.Seq.snip(|c).iterator } proto method sort(|) {*} multi method sort(ParaSeq:D:) { - self!pass-the-chain: self.List.sort.iterator + self!re-source: self.List.sort.iterator } multi method sort(ParaSeq:D: Callable:D $how) { - self!pass-the-chain: self.List.sort($how).iterator + self!re-source: self.List.sort($how).iterator } multi method tail(ParaSeq:D: $what) { - self!pass-the-chain: self.List.tail($what).iterator + self!re-source: self.List.tail($what).iterator } proto method toggle(|) {*} multi method toggle(ParaSeq:D: |c) { - self!pass-the-chain: self.Seq.toggle(|c).iterator + self!re-source: self.Seq.toggle(|c).iterator } multi method values(ParaSeq:D:) { self } @@ -2151,8 +2156,8 @@ class ParaSeq does Sequence { $auto := ($auto // True).Bool; $stop-after := ($stop-after // Inf) == Inf ?? 0 !! $stop-after.Int; self!setup( - $batch, $auto, $degree, $stop-after, - $buffer, $iterator + BufferIterator.new($buffer, $iterator), + $batch, $auto, $degree, $stop-after ) } }