Skip to content

Commit

Permalink
Add exception catching logic / reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
lizmat committed Jun 22, 2024
1 parent 7bc84f8 commit 95dc350
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 21 deletions.
6 changes: 6 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ Revision history for ParaSeq
- Added ParaIterator.sink-all for those hypering cases where only
the side-effects are interesting
- Added ParaSeq.sink to keep proper sinking semantics
- Added logic to catch and keep any exceptions occurring during
parallel execution, and display them at END time.
- Added :catch named argument to &hyperize/&racify/.hyper to indicate
whether any exceptions should be caught. Defaults to True
- Added .exceptions method that will return a Bag of exceptions that
were caught so far
- Removed some dead code

0.2.3 2024-06-20T19:44:33+02:00
Expand Down
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ The third positional argument indicates the maximum number of worker threads tha

Flag. Defaults to `True`. If specified with a `False` value, then the batch size will **not** be altered from the size (implicitely) specified with the second positional argument.

* :catch / :!catch

Flag. Defaults to `True`. If specified with a `False` value, then any exception that is thrown by any of the parallel worker threads, will be re-thrown immediately, rather than being presented on STDERR at the end of the process.

racify
------

Expand Down Expand Up @@ -518,7 +522,7 @@ ParaSeq

The class of which an instance is returned by `hyperize` and `racify` if the requirements for parallelizing have been met.

Can be used in any situation where a `Seq` could also be used. The following additional methods can also be called for introspection and debugginng. In alphabetical order:
Can be used in any situation where a `Seq` could also be used. The following additional methods can also be called for introspection and debugging. In alphabetical order:

### auto

Expand All @@ -528,6 +532,10 @@ Bool. Returns whether batch sizes will be automatically optimized to provide the

Range. The smallest and largest batch size as a `Range`.

### catch

Bool. Returns whether exceptions in parallel worker threads will be caught.

### default-batch

```raku
Expand All @@ -550,6 +558,12 @@ say ParaSeq.defaultdegreebatch; # 3

Int. The default maximum number of worker threads to be used. Currently set to the number of available CPUs minus one. Can also be used to change the default maximum number of worker threads by assigning to it.

### exceptions

Bag. Returns a `Bag` of exceptions that were caught in parallel processing **so far**, keyed to the `.gist` of the execution error (and the value being the number of times they occurred).

By default, any caught exceptions will be printed on STDERR at the end of the process. Calling this method effectively clears the caught exceptions, so that it can also be used to inhibit showing any execution errors.

### hyper

Change hypering settings on invocant and returns invocant. Takes the same arguments as `&hyperize`. Additionally takes a `:racing` named argument, to indicate `racing` or `hypering` semantics.
Expand Down
24 changes: 23 additions & 1 deletion doc/ParaSeq.rakudoc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ Flag. Defaults to C<True>. If specified with a C<False> value, then the
batch size will B<not> be altered from the size (implicitely) specified
with the second positional argument.

=item :catch / :!catch

Flag. Defaults to C<True>. If specified with a C<False> value, then any
exception that is thrown by any of the parallel worker threads, will be
re-thrown immediately, rather than being presented on STDERR at the end
of the process.

=head2 racify

=begin code :lang<raku>
Expand Down Expand Up @@ -707,7 +714,7 @@ if the requirements for parallelizing have been met.

Can be used in any situation where a C<Seq> could also be used. The
following additional methods can also be called for introspection and
debugginng. In alphabetical order:
debugging. In alphabetical order:

=head3 auto

Expand All @@ -718,6 +725,11 @@ provide the best throughput.

Range. The smallest and largest batch size as a C<Range>.

=head3 catch

Bool. Returns whether exceptions in parallel worker threads will be
caught.

=head3 default-batch

=begin code :lang<raku>
Expand Down Expand Up @@ -747,6 +759,16 @@ Int. The default maximum number of worker threads to be used. Currently
set to the number of available CPUs minus one. Can also be used to change
the default maximum number of worker threads by assigning to it.

=head3 exceptions

Bag. Returns a C<Bag> of exceptions that were caught in parallel processing
B<so far>, keyed to the C<.gist> of the execution error (and the value being
the number of times they occurred).

By default, any caught exceptions will be printed on STDERR at the end of
the process. Calling this method effectively clears the caught exceptions,
so that it can also be used to inhibit showing any execution errors.

=head3 hyper

Change hypering settings on invocant and returns invocant. Takes the
Expand Down
73 changes: 58 additions & 15 deletions lib/ParaSeq.rakumod
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,39 @@ my constant IE = IterationEnd;
my constant emptyIB = nqp::create(IB);
my constant emptyList = emptyIB.List;

# A separator line
my constant separator = "-" x 80;

#- exception handling ----------------------------------------------------------

my $exceptions := nqp::create(ParaQueue);

# Create a Bag of the exceptions so far
my sub bag-the-exceptions() {
my $buffer := nqp::create(IB);
nqp::while(
nqp::elems($exceptions),
nqp::push($buffer, nqp::shift($exceptions))
);
$buffer.List.map(*.gist.chomp).Bag
}

# Show the exceptions that weren't bagged before
END {
if nqp::elems($exceptions) -> uint $elems {
say "Caught $elems exceptions:";
if nqp::elems($exceptions) -> uint $total {
my $bagged := bag-the-exceptions;
my uint $elems = $bagged.elems;
my str $s = $elems == 1 ?? "" !! "s";

my $ERR := $*ERR;
$ERR.say:
"Caught $elems unique exception$s (out of $total) in hypered code:";
for $bagged.sort(-*.value) {
$ERR.say: separator;
$ERR.print(.value ~ "x: ") if .value > 1;
$ERR.say: .key;
}
$ERR.say: separator;
}
}

Expand Down Expand Up @@ -716,6 +742,7 @@ class ParaSeq is Seq {
has uint $!racing; # 1 = racing, 0 = hypering
has uint $!batch; # initial batch size, must be > 0
has uint $!auto; # 1 if batch size automatically adjusts
has uint $!catch; # 1 if exceptions should be caught
has uint $!degree; # number of CPUs, must be > 1

#- "last" handling -------------------------------------------------------------
Expand All @@ -738,7 +765,12 @@ class ParaSeq is Seq {
#- private helper methods ------------------------------------------------------

method !cue(&callable) {
$!SCHEDULER.cue: &callable;
$!catch
?? $!SCHEDULER.cue: &callable, :catch({
nqp::push($exceptions,$_);
.resume
})
!! $!SCHEDULER.cue: &callable
}

# Set up object and return it
Expand All @@ -747,6 +779,7 @@ class ParaSeq is Seq {
uint $racing,
uint $batch,
uint $auto,
uint $catch,
uint $degree,
) is hidden-from-backtrace {
my $self := nqp::create(self);
Expand All @@ -758,6 +791,7 @@ class ParaSeq is Seq {
nqp::bindattr_i($self, ParaSeq, '$!racing', $racing);
nqp::bindattr_i($self, ParaSeq, '$!batch', $batch );
nqp::bindattr_i($self, ParaSeq, '$!auto', $auto );
nqp::bindattr_i($self, ParaSeq, '$!catch', $catch );
nqp::bindattr_i($self, ParaSeq, '$!degree', $degree);
$self
}
Expand Down Expand Up @@ -1007,9 +1041,13 @@ class ParaSeq is Seq {
self.stats.map(*.processed).minmax
}

method catch(ParaSeq:D:) { nqp::hllbool($!catch) }

method default-batch() is raw { $default-batch }
method default-degree() is raw { $default-degree }

method exceptions() { bag-the-exceptions }

multi method is-lazy(ParaSeq:D:) { $!source.is-lazy }

method is-deterministic(ParaSeq:D:) {
Expand Down Expand Up @@ -2127,7 +2165,7 @@ class ParaSeq is Seq {
proto method hyper(|) {*}

# If the degree is 1, then just return the iterable, nothing to parallelize
multi method hyper(ParaSeq: \iterable, $,$,$, 1) is implementation-detail {
multi method hyper(ParaSeq: \iterable,$,$,$,$, 1) is implementation-detail {
iterable
}

Expand All @@ -2137,6 +2175,7 @@ class ParaSeq is Seq {
$racing,
$batch is copy,
$auto is copy,
$catch is copy,
$degree is copy,
) is implementation-detail {
$batch := ($batch // $default-batch).Int;
Expand All @@ -2160,23 +2199,27 @@ class ParaSeq is Seq {

# Need to actually parallelize, set up ParaSeq object
else {
$auto := ($auto // True).Bool;
$auto := ($auto // True).Bool;
$catch := ($catch // True).Bool;
self!setup(
BufferIterator.new($buffer, $iterator),
$racing, $batch, $auto, $degree
$racing, $batch, $auto, $catch, $degree
)
}
}

# Change hypering settings along the way
multi method hyper(ParaSeq:D: $batch?, $degree?, :$auto, :$racing) {
multi method hyper(ParaSeq:D:
$batch?, $degree?, :$auto, :$catch, :$racing
) {
$degree && $degree == 1
?? self.serial # re-hypering with degree == 1 -> serialize
!! ParaSeq.hyper( # restart, taking over defaults
self,
$racing // $!racing,
$batch // $!batch,
$auto // $!auto,
$catch // $!catch,
$degree // $!degree
)
}
Expand All @@ -2185,24 +2228,24 @@ class ParaSeq is Seq {
#- actual interface ------------------------------------------------------------

proto sub hyperize(|) is export {*}
multi sub hyperize(\iterable, $batch?, $degree?, :$auto) {
ParaSeq.hyper(iterable, 0, $batch, $auto, $degree)
multi sub hyperize(\iterable, $batch?, $degree?, :$auto, :$catch) {
ParaSeq.hyper(iterable, 0, $batch, $auto, $catch, $degree)
}
multi sub hyperize(List:D $list, $size?, $degree?, :$auto) {
multi sub hyperize(List:D $list, $size?, $degree?, :$auto, :$catch) {
my uint $batch = $size // $default-batch;
$list.is-lazy || $list.elems > $batch
?? ParaSeq.hyper($list, 0, $batch, $auto, $degree)
?? ParaSeq.hyper($list, 0, $batch, $auto, $catch, $degree)
!! $list
}

proto sub racify(|) is export {*}
multi sub racify(\iterable, $batch?, $degree?, :$auto) {
ParaSeq.hyper(iterable, 1, $batch, $auto, $degree)
multi sub racify(\iterable, $batch?, $degree?, :$auto, :$catch) {
ParaSeq.hyper(iterable, 1, $batch, $auto, $catch, $degree)
}
multi sub racify(List:D $list, $size?, $degree?, :$auto) {
multi sub racify(List:D $list, $size?, $degree?, :$auto, :$catch) {
my uint $batch = $size // $default-batch;
$list.is-lazy || $list.elems > $batch
?? ParaSeq.hyper($list, 1, $batch, $auto, $degree)
?? ParaSeq.hyper($list, 1, $batch, $auto, $catch, $degree)
!! $list
}

Expand Down
19 changes: 15 additions & 4 deletions t/01-basic.rakutest
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use Test;
use ParaSeq;

plan 24;

ok &hyperize, 'was hyperize exported';
ok &racify, 'was racify exported';
plan 30;

my constant $elems = 200000;
my constant $batch = 16;
Expand All @@ -15,6 +12,9 @@ my constant %mapped = @list.map: $mapper;
isa-ok @list.&hyperize, ParaSeq, 'did hyperize produce a ParaSeq';
isa-ok @list.&racify, ParaSeq, 'did racify produce a ParaSeq';

ok &hyperize, 'was hyperize exported';
ok &racify, 'was racify exported';

for 1, ParaSeq.default-degree {
is-deeply @list.&hyperize($batch, $_).Array, [@list],
"did .Array produce ok with degree $_";
Expand All @@ -37,6 +37,17 @@ for 1, ParaSeq.default-degree {
"did .map(mapper) produce ok with degree $_";
}

my $s := (^20).&hyperize.grep({ die "Bazinga" if .is-prime });
is-deeply $s.auto, True, 'is :auto reported ok';
is-deeply $s.catch, True, 'is :catch reported ok';
is-deeply $s.List, (), 'no values where produced';

my $b := $s.exceptions;
is $b.elems, 1, 'did we get 1 type of exception';

ok $b.keys.head.starts-with("Bazinga"), 'did we get the right error message';
is $b.values.head, 8, 'did we get the right amount';

is ParaSeq.default-batch, 16, "is the default batch size correct";
ParaSeq.default-batch = 1024;
is ParaSeq.default-batch, 1024, "is the updated default batch size correct";
Expand Down

0 comments on commit 95dc350

Please sign in to comment.