Skip to content

Commit

Permalink
Remove the :stop-after feature
Browse files Browse the repository at this point in the history
It was causing more trouble than it was worth.  .head(N) should do
the work nicely
  • Loading branch information
lizmat committed Jun 15, 2024
1 parent 7b3eeb2 commit f096bbd
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 113 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.precomp/
/ParaSeq-*
/releases/
/HN/
?
18 changes: 1 addition & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use ParaSeq;
# The 1-millionth prime number: 15485863
say (^Inf).&hyperize.grep(*.is-prime)[999999];
say (^Inf).&hyperize.grep(*.is-prime).skip(999999).head;
say (^Inf).&hyperize(stop-after => 1_000_000).grep(*.is-prime).tail;

# Fetching lines of files, each element containing a List of lines
my @lines = @filenames.&hyperize(1, :!auto).map(*.IO.lines.List);
Expand Down Expand Up @@ -46,9 +45,6 @@ say hyperize(^Inf).grep(*.is-prime)[999999];
# Start with 2000 element batches, and use max 10 workers
say (^Inf).&hyperize(2000, 10).grep(*.is-prime)[999999];

# Stop producing results after 1 million values
say (^Inf).&hyperize(stop-after => 1_000_000).grep(*.is-prime).tail;

# Always work with a batch-size of 1
my @lines = @filenames.&hyperize(1, :!auto).map(*.lines.List);
```
Expand Down Expand Up @@ -81,10 +77,6 @@ The third positional argument indicates the maximum number of worker threads tha

Flag. Defaults to `True`. If specified with a `False` value, then the batch size will **not** be altered from the size (implicitely) specified with the second positional argument.

* :stop-after(N)

Integer value. Defaults to `Inf`, indicating there is **no** maximum number of values that should be delivered. If specified, should be a positive integer value: the produced sequence is then **guaranteed** not to produce more values than the value given.

racify
------

Expand Down Expand Up @@ -552,7 +544,7 @@ Change hypering settings on invocant and returns invocant. Takes the same argume

### is-lazy

Bool. Returns whether the `ParaSeq` iterator should be considered lazy or not. It will be considered lazy if the source iterator is lazy and **no** value has been specified with `:stop-after`.
Bool. Returns whether the `ParaSeq` iterator should be considered lazy or not. It will be considered lazy if the source iterator is lazy.

### processed

Expand All @@ -566,12 +558,6 @@ Int. The number of items produced, as obtained from the `stats`.

A `List` of `ParaStats` objects that were produced, in the order that they were produced. Note that due to the asynchronous nature of stats production and processing, this may be incomplete at any given time.

### stop-after

Int or False. Returns `False` if there is **no limit** on the number of values that can be delivered. Otherwise returns the maximum number of values that will be delivered.

Can also be called as a mutator if an `Int` value is specified (or `False` to inhibit any resul constraint). When used as a mutator, Returns the invocant for easier chaining.

### stopped

Bool. `True` if the processing of this `ParaSeq` has been halted. Otherwise `False`
Expand Down Expand Up @@ -702,8 +688,6 @@ If step 1 and 2 didn't result in a premature return, then a `ParaSeq` object is

* the "auto" flag

* any "stop after" value

* the $*SCHEDULER value

Step 4: an interface method is called
Expand Down
25 changes: 1 addition & 24 deletions doc/ParaSeq.rakudoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use ParaSeq;
# The 1-millionth prime number: 15485863
say (^Inf).&hyperize.grep(*.is-prime)[999999];
say (^Inf).&hyperize.grep(*.is-prime).skip(999999).head;
say (^Inf).&hyperize(stop-after => 1_000_000).grep(*.is-prime).tail;

# Fetching lines of files, each element containing a List of lines
my @lines = @filenames.&hyperize(1, :!auto).map(*.IO.lines.List);
Expand Down Expand Up @@ -51,9 +50,6 @@ say hyperize(^Inf).grep(*.is-prime)[999999];
# Start with 2000 element batches, and use max 10 workers
say (^Inf).&hyperize(2000, 10).grep(*.is-prime)[999999];

# Stop producing results after 1 million values
say (^Inf).&hyperize(stop-after => 1_000_000).grep(*.is-prime).tail;

# Always work with a batch-size of 1
my @lines = @filenames.&hyperize(1, :!auto).map(*.lines.List);

Expand Down Expand Up @@ -100,13 +96,6 @@ Flag. Defaults to C<True>. If specified with a C<False> value, then the
batch size will B<not> be altered from the size (implicitely) specified
with the second positional argument.

=item :stop-after(N)

Integer value. Defaults to C<Inf>, indicating there is B<no> maximum number
of values that should be delivered. If specified, should be a positive
integer value: the produced sequence is then B<guaranteed> not to produce
more values than the value given.

=head2 racify

=begin code :lang<raku>
Expand Down Expand Up @@ -751,8 +740,7 @@ same arguments as C<&hyperize>.
=head3 is-lazy

Bool. Returns whether the C<ParaSeq> iterator should be considered lazy
or not. It will be considered lazy if the source iterator is lazy and
B<no> value has been specified with C<:stop-after>.
or not. It will be considered lazy if the source iterator is lazy.

=head3 processed

Expand All @@ -768,16 +756,6 @@ A C<List> of C<ParaStats> objects that were produced, in the order that
they were produced. Note that due to the asynchronous nature of stats
production and processing, this may be incomplete at any given time.

=head3 stop-after

Int or False. Returns C<False> if there is B<no limit> on the number of
values that can be delivered. Otherwise returns the maximum number of
values that will be delivered.

Can also be called as a mutator if an C<Int> value is specified (or
C<False> to inhibit any resul constraint). When used as a mutator,
Returns the invocant for easier chaining.

=head3 stopped

Bool. C<True> if the processing of this C<ParaSeq> has been halted.
Expand Down Expand Up @@ -942,7 +920,6 @@ C<IterationBuffer> and source iterator, it also contains:
=item the initial batch size
=item the degree (max number of worker threads)
=item the "auto" flag
=item any "stop after" value
=item the $*SCHEDULER value

=head2 Step 4: an interface method is called
Expand Down
80 changes: 19 additions & 61 deletions lib/ParaSeq.rakumod
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,6 @@ my class ParaIterator does Iterator {
has uint $!processed; # number of items processed so far
has uint $!produced; # number of items produced so far
has uint $!nsecs; # nano seconds wallclock used so far
has uint $!todo; # number of items left to deliver
has $!lastsema; # the last semaphore that should be processed
has atomicint $!stop; # stop all processing if 1

Expand All @@ -438,7 +437,6 @@ my class ParaIterator does Iterator {
\pressure,
uint $batch,
uint $auto,
uint $todo,
) {
my $self := nqp::create(self);

Expand All @@ -451,7 +449,6 @@ my class ParaIterator does Iterator {

nqp::bindattr_i($self,ParaIterator,'$!batch', $batch );
nqp::bindattr_i($self,ParaIterator,'$!auto', $auto );
nqp::bindattr_i($self,ParaIterator,'$!todo', $todo );
$self
}

Expand All @@ -464,36 +461,15 @@ my class ParaIterator does Iterator {
# Stop delivering after this buffer
if nqp::eqaddr($next,$!lastsema) {
self.stop; # shut everything down
return $buffer;
}

# Complete shutdown requested
if nqp::atomicload_i($!stop) {
return emptyIB;
}

# Need to monitor number of results to deliver
elsif $!todo {
my int $todo = nqp::sub_i($!todo,nqp::elems($buffer));
if $todo <= 0 {
# Stop all processing, we have enough
self.stop;

# Get rid of all values we don't need
return nqp::splice(
$buffer,
emptyIB,
nqp::add_i(nqp::elems($buffer),$todo),
nqp::neg_i($todo)
);
}
else {
$!todo = $todo;
}
elsif nqp::atomicload_i($!stop) {
nqp::setelems($buffer,0);
}

# Nothing else ready to be delivered
if nqp::isnull(nqp::atpos($!waiting,0)) {
elsif nqp::isnull(nqp::atpos($!waiting,0)) {

# Initiate more work using last batch value calculated
nqp::push($!pressure,$!batch);
Expand Down Expand Up @@ -691,7 +667,6 @@ class ParaSeq does Sequence {
has uint $.batch; # initial batch size, must be > 0
has uint $!auto; # 1 if batch size automatically adjusts
has uint $.degree; # number of CPUs, must be > 1
has uint $!stop-after; # stop after these number of values
has atomicint $!stop; # stop all processing if 1
has atomicint $.discarded; # produced values discarded because of stop

Expand Down Expand Up @@ -720,7 +695,6 @@ class ParaSeq does Sequence {
uint $batch,
uint $auto,
uint $degree,
uint $stop-after,
) is hidden-from-backtrace {
my $self := nqp::create(self);
nqp::bindattr($self, ParaSeq, '$!source', $source );
Expand All @@ -731,7 +705,6 @@ class ParaSeq does Sequence {
nqp::bindattr_i($self, ParaSeq, '$!batch', $batch );
nqp::bindattr_i($self, ParaSeq, '$!auto', $auto );
nqp::bindattr_i($self, ParaSeq, '$!degree', $degree );
nqp::bindattr_i($self, ParaSeq, '$!stop-after', $stop-after);
$self
}

Expand Down Expand Up @@ -782,9 +755,7 @@ class ParaSeq does Sequence {
}

# Set up the result iterator
$!result := ParaIterator.new(
self, $pressure, $!batch, $!auto, $!stop-after
);
$!result := ParaIterator.new(self, $pressure, $!batch, $!auto);

# Batcher logic for fast batching, where no special phaser handling
# is required
Expand Down Expand Up @@ -998,9 +969,8 @@ class ParaSeq does Sequence {
method default-batch() { $default-batch }
method default-degree() { $default-degree }

multi method is-lazy(ParaSeq:D:) {
nqp::hllbool($!source.is-lazy && nqp::not_i($!stop-after))
}
multi method is-lazy(ParaSeq:D:) { $!source.is-lazy }

method is-deterministic(ParaSeq:D:) {
nqp::hllbool($!source.is-deterministic)
}
Expand All @@ -1015,13 +985,6 @@ class ParaSeq does Sequence {
method processed(ParaSeq:D:) { self.stats.map(*.processed).sum }
method produced(ParaSeq:D:) { self.stats.map(*.produced ).sum }

proto method stop-after(|) {*}
multi method stop-after(ParaSeq:D:) { $!stop-after || False }
multi method stop-after(ParaSeq:D: Int() $stop-after) {
$!stop-after = $stop-after > 0 ?? $stop-after !! 0;
self
}

method stopped(ParaSeq:D:) { nqp::hllbool(nqp::atomicload_i($!stop)) }

method threads(ParaSeq:D:) {
Expand Down Expand Up @@ -2154,19 +2117,16 @@ class ParaSeq does Sequence {
proto method hyper(|) {*}

# If the degree is 1, then just return the iterable, nothing to parallelize
multi method hyper(ParaSeq:
\iterable, $, $, 1, $
) is implementation-detail {
multi method hyper(ParaSeq: \iterable, $, $, 1) is implementation-detail {
iterable
}

# Entry point from the subs
multi method hyper(ParaSeq:U:
\iterable,
$batch is copy,
$auto is copy,
$degree is copy,
$stop-after is copy
$batch is copy,
$auto is copy,
$degree is copy,
) is implementation-detail {
$batch := ($batch // $default-batch).Int;
X::Invalid::Value.new(
Expand All @@ -2190,38 +2150,36 @@ class ParaSeq does Sequence {
# Need to actually parallelize, set up ParaSeq object
else {
$auto := ($auto // True).Bool;
$stop-after := ($stop-after // Inf) == Inf ?? 0 !! $stop-after.Int;
self!setup(
BufferIterator.new($buffer, $iterator),
$batch, $auto, $degree, $stop-after
$batch, $auto, $degree
)
}
}

# Change hypering settings along the way
multi method hyper(ParaSeq:D: $batch?, $degree?, :$auto, :$stop-after) {
multi method hyper(ParaSeq:D: $batch?, $degree?, :$auto) {
$degree && $degree == 1
?? self.serial # re-hypering with degree == 1 -> serialize
!! ParaSeq.hyper( # restart, taking over defaults
self,
$batch // $!batch,
$auto // $!auto,
$degree // $!degree,
$stop-after // $!stop-after
$batch // $!batch,
$auto // $!auto,
$degree // $!degree
)
}
}

#- actual interface ------------------------------------------------------------

proto sub hyperize(|) is export {*}
multi sub hyperize(\iterable, $batch?, $degree?, :$auto, :$stop-after) {
ParaSeq.hyper(iterable, $batch, $auto, $degree, $stop-after)
multi sub hyperize(\iterable, $batch?, $degree?, :$auto) {
ParaSeq.hyper(iterable, $batch, $auto, $degree)
}
multi sub hyperize(List:D $list, $size?, $degree?, :$auto, :$stop-after) {
multi sub hyperize(List:D $list, $size?, $degree?, :$auto) {
my uint $batch = $size // $default-batch;
$list.is-lazy || $list.elems > $batch
?? ParaSeq.hyper($list, $batch, $auto, $degree, $stop-after)
?? ParaSeq.hyper($list, $batch, $auto, $degree)
!! $list
}

Expand Down
24 changes: 13 additions & 11 deletions xt/01-chaining.rakutest
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ use paths;
use path-utils;
use ParaSeq;

plan 9;
plan 10;

my uint $stop-after = 1000;
my uint $head = 1000;

sub is-Para($seq, str $tag, :$stop-after = False) is test-assertion {
sub is-Para($seq, str $tag) is test-assertion {
subtest "$tag: check ParaSeq" => {
plan 4;
plan 3;
isa-ok $seq, ParaSeq;
is $seq.stats.elems, 0, "$tag: nothing happened yet";
is-deeply $seq.auto, True, 'auto is set';
is-deeply $seq.stop-after, $stop-after, 'stop-afer is NOT set';
}
}

Expand All @@ -35,16 +34,19 @@ is-Para($seq5, 'lines');
my $seq6 = $seq5.pairs;
is-Para($seq6, 'pairs');

my $seq7 = $seq6.stop-after($stop-after).grep(*.value.contains("expandtab"));
is-Para($seq7, 'grep(expandtab)', :$stop-after);
my $seq7 = $seq6.grep(*.value.contains("expandtab"));
is-Para($seq7, 'grep(expandtab)');

my @result := $seq7.List;
my $seq8 = $seq7.head($head);
is-Para($seq8, "head($head)");

my @result := $seq8.List;
isa-ok @result.are, Pair, 'All pairs';
is @result.elems, $stop-after, 'Did we get all the expected results';
is @result.elems, $head, 'Did we get all the expected results';

#for $seq1, $seq2, $seq3, $seq4, $seq5, $seq6, $seq7 -> $seq is raw {
#for $seq1, $seq2, $seq3, $seq4, $seq5, $seq6, $seq7, $seq8 -> $seq is raw {
# say $seq.VAR.name;
# say "processed: $seq.processed(), produced: $seq.produced()";
# say "processed: $seq.processed(), produced: $seq.produced(), discarded: $seq.discarded()";
# .say for $seq.stats.head(20);
#}

Expand Down
Loading

0 comments on commit f096bbd

Please sign in to comment.