diff --git a/.gitignore b/.gitignore index daa7bc6..515e817 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .precomp/ /ParaSeq-* /releases/ +/HN/ ? diff --git a/README.md b/README.md index f8f2807..2e21c4f 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,6 @@ use ParaSeq; # The 1-millionth prime number: 15485863 say (^Inf).&hyperize.grep(*.is-prime)[999999]; say (^Inf).&hyperize.grep(*.is-prime).skip(999999).head; -say (^Inf).&hyperize(stop-after => 1_000_000).grep(*.is-prime).tail; # Fetching lines of files, each element containing a List of lines my @lines = @filenames.&hyperize(1, :!auto).map(*.IO.lines.List); @@ -46,9 +45,6 @@ say hyperize(^Inf).grep(*.is-prime)[999999]; # Start with 2000 element batches, and use max 10 workers say (^Inf).&hyperize(2000, 10).grep(*.is-prime)[999999]; -# Stop producing results after 1 million values -say (^Inf).&hyperize(stop-after => 1_000_000).grep(*.is-prime).tail; - # Always work with a batch-size of 1 my @lines = @filenames.&hyperize(1, :!auto).map(*.lines.List); ``` @@ -81,10 +77,6 @@ The third positional argument indicates the maximum number of worker threads tha Flag. Defaults to `True`. If specified with a `False` value, then the batch size will **not** be altered from the size (implicitely) specified with the second positional argument. - * :stop-after(N) - -Integer value. Defaults to `Inf`, indicating there is **no** maximum number of values that should be delivered. If specified, should be a positive integer value: the produced sequence is then **guaranteed** not to produce more values than the value given. - racify ------ @@ -552,7 +544,7 @@ Change hypering settings on invocant and returns invocant. Takes the same argume ### is-lazy -Bool. Returns whether the `ParaSeq` iterator should be considered lazy or not. It will be considered lazy if the source iterator is lazy and **no** value has been specified with `:stop-after`. +Bool. Returns whether the `ParaSeq` iterator should be considered lazy or not. It will be considered lazy if the source iterator is lazy. ### processed @@ -566,12 +558,6 @@ Int. The number of items produced, as obtained from the `stats`. A `List` of `ParaStats` objects that were produced, in the order that they were produced. Note that due to the asynchronous nature of stats production and processing, this may be incomplete at any given time. -### stop-after - -Int or False. Returns `False` if there is **no limit** on the number of values that can be delivered. Otherwise returns the maximum number of values that will be delivered. - -Can also be called as a mutator if an `Int` value is specified (or `False` to inhibit any resul constraint). When used as a mutator, Returns the invocant for easier chaining. - ### stopped Bool. `True` if the processing of this `ParaSeq` has been halted. Otherwise `False` @@ -702,8 +688,6 @@ If step 1 and 2 didn't result in a premature return, then a `ParaSeq` object is * the "auto" flag - * any "stop after" value - * the $*SCHEDULER value Step 4: an interface method is called diff --git a/doc/ParaSeq.rakudoc b/doc/ParaSeq.rakudoc index 44d4bec..dcd5146 100644 --- a/doc/ParaSeq.rakudoc +++ b/doc/ParaSeq.rakudoc @@ -13,7 +13,6 @@ use ParaSeq; # The 1-millionth prime number: 15485863 say (^Inf).&hyperize.grep(*.is-prime)[999999]; say (^Inf).&hyperize.grep(*.is-prime).skip(999999).head; -say (^Inf).&hyperize(stop-after => 1_000_000).grep(*.is-prime).tail; # Fetching lines of files, each element containing a List of lines my @lines = @filenames.&hyperize(1, :!auto).map(*.IO.lines.List); @@ -51,9 +50,6 @@ say hyperize(^Inf).grep(*.is-prime)[999999]; # Start with 2000 element batches, and use max 10 workers say (^Inf).&hyperize(2000, 10).grep(*.is-prime)[999999]; -# Stop producing results after 1 million values -say (^Inf).&hyperize(stop-after => 1_000_000).grep(*.is-prime).tail; - # Always work with a batch-size of 1 my @lines = @filenames.&hyperize(1, :!auto).map(*.lines.List); @@ -100,13 +96,6 @@ Flag. Defaults to C. If specified with a C value, then the batch size will B be altered from the size (implicitely) specified with the second positional argument. -=item :stop-after(N) - -Integer value. Defaults to C, indicating there is B maximum number -of values that should be delivered. If specified, should be a positive -integer value: the produced sequence is then B not to produce -more values than the value given. - =head2 racify =begin code :lang @@ -751,8 +740,7 @@ same arguments as C<&hyperize>. =head3 is-lazy Bool. Returns whether the C iterator should be considered lazy -or not. It will be considered lazy if the source iterator is lazy and -B value has been specified with C<:stop-after>. +or not. It will be considered lazy if the source iterator is lazy. =head3 processed @@ -768,16 +756,6 @@ A C of C objects that were produced, in the order that they were produced. Note that due to the asynchronous nature of stats production and processing, this may be incomplete at any given time. -=head3 stop-after - -Int or False. Returns C if there is B on the number of -values that can be delivered. Otherwise returns the maximum number of -values that will be delivered. - -Can also be called as a mutator if an C value is specified (or -C to inhibit any resul constraint). When used as a mutator, -Returns the invocant for easier chaining. - =head3 stopped Bool. C if the processing of this C has been halted. @@ -942,7 +920,6 @@ C and source iterator, it also contains: =item the initial batch size =item the degree (max number of worker threads) =item the "auto" flag -=item any "stop after" value =item the $*SCHEDULER value =head2 Step 4: an interface method is called diff --git a/lib/ParaSeq.rakumod b/lib/ParaSeq.rakumod index 4acdda3..33d0c75 100644 --- a/lib/ParaSeq.rakumod +++ b/lib/ParaSeq.rakumod @@ -429,7 +429,6 @@ my class ParaIterator does Iterator { has uint $!processed; # number of items processed so far has uint $!produced; # number of items produced so far has uint $!nsecs; # nano seconds wallclock used so far - has uint $!todo; # number of items left to deliver has $!lastsema; # the last semaphore that should be processed has atomicint $!stop; # stop all processing if 1 @@ -438,7 +437,6 @@ my class ParaIterator does Iterator { \pressure, uint $batch, uint $auto, - uint $todo, ) { my $self := nqp::create(self); @@ -451,7 +449,6 @@ my class ParaIterator does Iterator { nqp::bindattr_i($self,ParaIterator,'$!batch', $batch ); nqp::bindattr_i($self,ParaIterator,'$!auto', $auto ); - nqp::bindattr_i($self,ParaIterator,'$!todo', $todo ); $self } @@ -464,36 +461,15 @@ my class ParaIterator does Iterator { # Stop delivering after this buffer if nqp::eqaddr($next,$!lastsema) { self.stop; # shut everything down - return $buffer; } # Complete shutdown requested - if nqp::atomicload_i($!stop) { - return emptyIB; - } - - # Need to monitor number of results to deliver - elsif $!todo { - my int $todo = nqp::sub_i($!todo,nqp::elems($buffer)); - if $todo <= 0 { - # Stop all processing, we have enough - self.stop; - - # Get rid of all values we don't need - return nqp::splice( - $buffer, - emptyIB, - nqp::add_i(nqp::elems($buffer),$todo), - nqp::neg_i($todo) - ); - } - else { - $!todo = $todo; - } + elsif nqp::atomicload_i($!stop) { + nqp::setelems($buffer,0); } # Nothing else ready to be delivered - if nqp::isnull(nqp::atpos($!waiting,0)) { + elsif nqp::isnull(nqp::atpos($!waiting,0)) { # Initiate more work using last batch value calculated nqp::push($!pressure,$!batch); @@ -691,7 +667,6 @@ class ParaSeq does Sequence { has uint $.batch; # initial batch size, must be > 0 has uint $!auto; # 1 if batch size automatically adjusts has uint $.degree; # number of CPUs, must be > 1 - has uint $!stop-after; # stop after these number of values has atomicint $!stop; # stop all processing if 1 has atomicint $.discarded; # produced values discarded because of stop @@ -720,7 +695,6 @@ class ParaSeq does Sequence { uint $batch, uint $auto, uint $degree, - uint $stop-after, ) is hidden-from-backtrace { my $self := nqp::create(self); nqp::bindattr($self, ParaSeq, '$!source', $source ); @@ -731,7 +705,6 @@ class ParaSeq does Sequence { nqp::bindattr_i($self, ParaSeq, '$!batch', $batch ); nqp::bindattr_i($self, ParaSeq, '$!auto', $auto ); nqp::bindattr_i($self, ParaSeq, '$!degree', $degree ); - nqp::bindattr_i($self, ParaSeq, '$!stop-after', $stop-after); $self } @@ -782,9 +755,7 @@ class ParaSeq does Sequence { } # Set up the result iterator - $!result := ParaIterator.new( - self, $pressure, $!batch, $!auto, $!stop-after - ); + $!result := ParaIterator.new(self, $pressure, $!batch, $!auto); # Batcher logic for fast batching, where no special phaser handling # is required @@ -998,9 +969,8 @@ class ParaSeq does Sequence { method default-batch() { $default-batch } method default-degree() { $default-degree } - multi method is-lazy(ParaSeq:D:) { - nqp::hllbool($!source.is-lazy && nqp::not_i($!stop-after)) - } + multi method is-lazy(ParaSeq:D:) { $!source.is-lazy } + method is-deterministic(ParaSeq:D:) { nqp::hllbool($!source.is-deterministic) } @@ -1015,13 +985,6 @@ class ParaSeq does Sequence { method processed(ParaSeq:D:) { self.stats.map(*.processed).sum } method produced(ParaSeq:D:) { self.stats.map(*.produced ).sum } - proto method stop-after(|) {*} - multi method stop-after(ParaSeq:D:) { $!stop-after || False } - multi method stop-after(ParaSeq:D: Int() $stop-after) { - $!stop-after = $stop-after > 0 ?? $stop-after !! 0; - self - } - method stopped(ParaSeq:D:) { nqp::hllbool(nqp::atomicload_i($!stop)) } method threads(ParaSeq:D:) { @@ -2154,19 +2117,16 @@ class ParaSeq does Sequence { proto method hyper(|) {*} # If the degree is 1, then just return the iterable, nothing to parallelize - multi method hyper(ParaSeq: - \iterable, $, $, 1, $ - ) is implementation-detail { + multi method hyper(ParaSeq: \iterable, $, $, 1) is implementation-detail { iterable } # Entry point from the subs multi method hyper(ParaSeq:U: \iterable, - $batch is copy, - $auto is copy, - $degree is copy, - $stop-after is copy + $batch is copy, + $auto is copy, + $degree is copy, ) is implementation-detail { $batch := ($batch // $default-batch).Int; X::Invalid::Value.new( @@ -2190,24 +2150,22 @@ class ParaSeq does Sequence { # Need to actually parallelize, set up ParaSeq object else { $auto := ($auto // True).Bool; - $stop-after := ($stop-after // Inf) == Inf ?? 0 !! $stop-after.Int; self!setup( BufferIterator.new($buffer, $iterator), - $batch, $auto, $degree, $stop-after + $batch, $auto, $degree ) } } # Change hypering settings along the way - multi method hyper(ParaSeq:D: $batch?, $degree?, :$auto, :$stop-after) { + multi method hyper(ParaSeq:D: $batch?, $degree?, :$auto) { $degree && $degree == 1 ?? self.serial # re-hypering with degree == 1 -> serialize !! ParaSeq.hyper( # restart, taking over defaults self, - $batch // $!batch, - $auto // $!auto, - $degree // $!degree, - $stop-after // $!stop-after + $batch // $!batch, + $auto // $!auto, + $degree // $!degree ) } } @@ -2215,13 +2173,13 @@ class ParaSeq does Sequence { #- actual interface ------------------------------------------------------------ proto sub hyperize(|) is export {*} -multi sub hyperize(\iterable, $batch?, $degree?, :$auto, :$stop-after) { - ParaSeq.hyper(iterable, $batch, $auto, $degree, $stop-after) +multi sub hyperize(\iterable, $batch?, $degree?, :$auto) { + ParaSeq.hyper(iterable, $batch, $auto, $degree) } -multi sub hyperize(List:D $list, $size?, $degree?, :$auto, :$stop-after) { +multi sub hyperize(List:D $list, $size?, $degree?, :$auto) { my uint $batch = $size // $default-batch; $list.is-lazy || $list.elems > $batch - ?? ParaSeq.hyper($list, $batch, $auto, $degree, $stop-after) + ?? ParaSeq.hyper($list, $batch, $auto, $degree) !! $list } diff --git a/xt/01-chaining.rakutest b/xt/01-chaining.rakutest index 690564a..1fad094 100644 --- a/xt/01-chaining.rakutest +++ b/xt/01-chaining.rakutest @@ -3,17 +3,16 @@ use paths; use path-utils; use ParaSeq; -plan 9; +plan 10; -my uint $stop-after = 1000; +my uint $head = 1000; -sub is-Para($seq, str $tag, :$stop-after = False) is test-assertion { +sub is-Para($seq, str $tag) is test-assertion { subtest "$tag: check ParaSeq" => { - plan 4; + plan 3; isa-ok $seq, ParaSeq; is $seq.stats.elems, 0, "$tag: nothing happened yet"; is-deeply $seq.auto, True, 'auto is set'; - is-deeply $seq.stop-after, $stop-after, 'stop-afer is NOT set'; } } @@ -35,16 +34,19 @@ is-Para($seq5, 'lines'); my $seq6 = $seq5.pairs; is-Para($seq6, 'pairs'); -my $seq7 = $seq6.stop-after($stop-after).grep(*.value.contains("expandtab")); -is-Para($seq7, 'grep(expandtab)', :$stop-after); +my $seq7 = $seq6.grep(*.value.contains("expandtab")); +is-Para($seq7, 'grep(expandtab)'); -my @result := $seq7.List; +my $seq8 = $seq7.head($head); +is-Para($seq8, "head($head)"); + +my @result := $seq8.List; isa-ok @result.are, Pair, 'All pairs'; -is @result.elems, $stop-after, 'Did we get all the expected results'; +is @result.elems, $head, 'Did we get all the expected results'; -#for $seq1, $seq2, $seq3, $seq4, $seq5, $seq6, $seq7 -> $seq is raw { +#for $seq1, $seq2, $seq3, $seq4, $seq5, $seq6, $seq7, $seq8 -> $seq is raw { # say $seq.VAR.name; -# say "processed: $seq.processed(), produced: $seq.produced()"; +# say "processed: $seq.processed(), produced: $seq.produced(), discarded: $seq.discarded()"; # .say for $seq.stats.head(20); #} diff --git a/xt/02-HN.rakutest b/xt/02-HN.rakutest new file mode 100644 index 0000000..a0cf472 --- /dev/null +++ b/xt/02-HN.rakutest @@ -0,0 +1,38 @@ +use Test; +use ParaSeq; + +plan 5; + +my $io := $*PROGRAM.parent(2).add("HN/sixteenth.txt"); +sub hypered() { $io.lines.&hyperize } + +sub is-Para($seq, str $tag) is test-assertion { + subtest "$tag: check ParaSeq" => { + plan 3; + isa-ok $seq, ParaSeq; + is $seq.stats.elems, 0, "$tag: nothing happened yet"; + is-deeply $seq.auto, True, 'auto is set'; + } +} + +my $seq1 = $io.lines.&hyperize.pairs; +is-Para($seq1, 'pairs'); + +my $seq2 = $seq1.grep(*.value.contains("Chinese")); +is-Para($seq2, 'grep'); + +my $seq3 = $seq2.head(100); +is-Para($seq3, 'grep'); + +my @result := $seq3.List; +is @result.elems, 100, 'did we get all the results'; + +ok $seq1.produced < @result.tail.key * 1.05, 'Not more than 5% overshoot'; + +#for $seq1, $seq2, $seq3 -> $seq is raw { +# say $seq.VAR.name; +# say "processed: $seq.processed(), produced: $seq.produced(), discarded: $seq.discarded()"; +# .say for $seq.stats.tail(20); +#} + +# vim: expandtab shiftwidth=4