diff --git a/Changes b/Changes index 0929279..4013591 100644 --- a/Changes +++ b/Changes @@ -8,6 +8,12 @@ Revision history for ParaSeq - Added ParaIterator.sink-all for those hypering cases where only the side-effects are interesting - Added ParaSeq.sink to keep proper sinking semantics + - Added logic to catch and keep any exceptions occurring during + parallel execution, and display them at END time. + - Added :catch named argument to &hyperize/&racify/.hyper to indicate + whether any exceptions should be caught. Defaults to True + - Added .exceptions method that will return a Bag of exceptions that + were caught so far - Removed some dead code 0.2.3 2024-06-20T19:44:33+02:00 diff --git a/README.md b/README.md index 65e935e..7439f07 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,10 @@ The third positional argument indicates the maximum number of worker threads tha Flag. Defaults to `True`. If specified with a `False` value, then the batch size will **not** be altered from the size (implicitely) specified with the second positional argument. + * :catch / :!catch + +Flag. Defaults to `True`. If specified with a `False` value, then any exception that is thrown by any of the parallel worker threads, will be re-thrown immediately, rather than being presented on STDERR at the end of the process. + racify ------ @@ -518,7 +522,7 @@ ParaSeq The class of which an instance is returned by `hyperize` and `racify` if the requirements for parallelizing have been met. -Can be used in any situation where a `Seq` could also be used. The following additional methods can also be called for introspection and debugginng. In alphabetical order: +Can be used in any situation where a `Seq` could also be used. The following additional methods can also be called for introspection and debugging. In alphabetical order: ### auto @@ -528,6 +532,10 @@ Bool. Returns whether batch sizes will be automatically optimized to provide the Range. The smallest and largest batch size as a `Range`. +### catch + +Bool. Returns whether exceptions in parallel worker threads will be caught. + ### default-batch ```raku @@ -550,6 +558,12 @@ say ParaSeq.defaultdegreebatch; # 3 Int. The default maximum number of worker threads to be used. Currently set to the number of available CPUs minus one. Can also be used to change the default maximum number of worker threads by assigning to it. +### exceptions + +Bag. Returns a `Bag` of exceptions that were caught in parallel processing **so far**, keyed to the `.gist` of the execution error (and the value being the number of times they occurred). + +By default, any caught exceptions will be printed on STDERR at the end of the process. Calling this method effectively clears the caught exceptions, so that it can also be used to inhibit showing any execution errors. + ### hyper Change hypering settings on invocant and returns invocant. Takes the same arguments as `&hyperize`. Additionally takes a `:racing` named argument, to indicate `racing` or `hypering` semantics. diff --git a/doc/ParaSeq.rakudoc b/doc/ParaSeq.rakudoc index d2e46b0..6d361cb 100644 --- a/doc/ParaSeq.rakudoc +++ b/doc/ParaSeq.rakudoc @@ -96,6 +96,13 @@ Flag. Defaults to C. If specified with a C value, then the batch size will B be altered from the size (implicitely) specified with the second positional argument. +=item :catch / :!catch + +Flag. Defaults to C. If specified with a C value, then any +exception that is thrown by any of the parallel worker threads, will be +re-thrown immediately, rather than being presented on STDERR at the end +of the process. + =head2 racify =begin code :lang @@ -707,7 +714,7 @@ if the requirements for parallelizing have been met. Can be used in any situation where a C could also be used. The following additional methods can also be called for introspection and -debugginng. In alphabetical order: +debugging. In alphabetical order: =head3 auto @@ -718,6 +725,11 @@ provide the best throughput. Range. The smallest and largest batch size as a C. +=head3 catch + +Bool. Returns whether exceptions in parallel worker threads will be +caught. + =head3 default-batch =begin code :lang @@ -747,6 +759,16 @@ Int. The default maximum number of worker threads to be used. Currently set to the number of available CPUs minus one. Can also be used to change the default maximum number of worker threads by assigning to it. +=head3 exceptions + +Bag. Returns a C of exceptions that were caught in parallel processing +B, keyed to the C<.gist> of the execution error (and the value being +the number of times they occurred). + +By default, any caught exceptions will be printed on STDERR at the end of +the process. Calling this method effectively clears the caught exceptions, +so that it can also be used to inhibit showing any execution errors. + =head3 hyper Change hypering settings on invocant and returns invocant. Takes the diff --git a/lib/ParaSeq.rakumod b/lib/ParaSeq.rakumod index 385a155..9d4f14d 100644 --- a/lib/ParaSeq.rakumod +++ b/lib/ParaSeq.rakumod @@ -33,13 +33,39 @@ my constant IE = IterationEnd; my constant emptyIB = nqp::create(IB); my constant emptyList = emptyIB.List; +# A separator line +my constant separator = "-" x 80; + #- exception handling ---------------------------------------------------------- my $exceptions := nqp::create(ParaQueue); +# Create a Bag of the exceptions so far +my sub bag-the-exceptions() { + my $buffer := nqp::create(IB); + nqp::while( + nqp::elems($exceptions), + nqp::push($buffer, nqp::shift($exceptions)) + ); + $buffer.List.map(*.gist.chomp).Bag +} + +# Show the exceptions that weren't bagged before END { - if nqp::elems($exceptions) -> uint $elems { - say "Caught $elems exceptions:"; + if nqp::elems($exceptions) -> uint $total { + my $bagged := bag-the-exceptions; + my uint $elems = $bagged.elems; + my str $s = $elems == 1 ?? "" !! "s"; + + my $ERR := $*ERR; + $ERR.say: + "Caught $elems unique exception$s (out of $total) in hypered code:"; + for $bagged.sort(-*.value) { + $ERR.say: separator; + $ERR.print(.value ~ "x: ") if .value > 1; + $ERR.say: .key; + } + $ERR.say: separator; } } @@ -716,6 +742,7 @@ class ParaSeq is Seq { has uint $!racing; # 1 = racing, 0 = hypering has uint $!batch; # initial batch size, must be > 0 has uint $!auto; # 1 if batch size automatically adjusts + has uint $!catch; # 1 if exceptions should be caught has uint $!degree; # number of CPUs, must be > 1 #- "last" handling ------------------------------------------------------------- @@ -738,7 +765,12 @@ class ParaSeq is Seq { #- private helper methods ------------------------------------------------------ method !cue(&callable) { - $!SCHEDULER.cue: &callable; + $!catch + ?? $!SCHEDULER.cue: &callable, :catch({ + nqp::push($exceptions,$_); + .resume + }) + !! $!SCHEDULER.cue: &callable } # Set up object and return it @@ -747,6 +779,7 @@ class ParaSeq is Seq { uint $racing, uint $batch, uint $auto, + uint $catch, uint $degree, ) is hidden-from-backtrace { my $self := nqp::create(self); @@ -758,6 +791,7 @@ class ParaSeq is Seq { nqp::bindattr_i($self, ParaSeq, '$!racing', $racing); nqp::bindattr_i($self, ParaSeq, '$!batch', $batch ); nqp::bindattr_i($self, ParaSeq, '$!auto', $auto ); + nqp::bindattr_i($self, ParaSeq, '$!catch', $catch ); nqp::bindattr_i($self, ParaSeq, '$!degree', $degree); $self } @@ -1007,9 +1041,13 @@ class ParaSeq is Seq { self.stats.map(*.processed).minmax } + method catch(ParaSeq:D:) { nqp::hllbool($!catch) } + method default-batch() is raw { $default-batch } method default-degree() is raw { $default-degree } + method exceptions() { bag-the-exceptions } + multi method is-lazy(ParaSeq:D:) { $!source.is-lazy } method is-deterministic(ParaSeq:D:) { @@ -2127,7 +2165,7 @@ class ParaSeq is Seq { proto method hyper(|) {*} # If the degree is 1, then just return the iterable, nothing to parallelize - multi method hyper(ParaSeq: \iterable, $,$,$, 1) is implementation-detail { + multi method hyper(ParaSeq: \iterable,$,$,$,$, 1) is implementation-detail { iterable } @@ -2137,6 +2175,7 @@ class ParaSeq is Seq { $racing, $batch is copy, $auto is copy, + $catch is copy, $degree is copy, ) is implementation-detail { $batch := ($batch // $default-batch).Int; @@ -2160,16 +2199,19 @@ class ParaSeq is Seq { # Need to actually parallelize, set up ParaSeq object else { - $auto := ($auto // True).Bool; + $auto := ($auto // True).Bool; + $catch := ($catch // True).Bool; self!setup( BufferIterator.new($buffer, $iterator), - $racing, $batch, $auto, $degree + $racing, $batch, $auto, $catch, $degree ) } } # Change hypering settings along the way - multi method hyper(ParaSeq:D: $batch?, $degree?, :$auto, :$racing) { + multi method hyper(ParaSeq:D: + $batch?, $degree?, :$auto, :$catch, :$racing + ) { $degree && $degree == 1 ?? self.serial # re-hypering with degree == 1 -> serialize !! ParaSeq.hyper( # restart, taking over defaults @@ -2177,6 +2219,7 @@ class ParaSeq is Seq { $racing // $!racing, $batch // $!batch, $auto // $!auto, + $catch // $!catch, $degree // $!degree ) } @@ -2185,24 +2228,24 @@ class ParaSeq is Seq { #- actual interface ------------------------------------------------------------ proto sub hyperize(|) is export {*} -multi sub hyperize(\iterable, $batch?, $degree?, :$auto) { - ParaSeq.hyper(iterable, 0, $batch, $auto, $degree) +multi sub hyperize(\iterable, $batch?, $degree?, :$auto, :$catch) { + ParaSeq.hyper(iterable, 0, $batch, $auto, $catch, $degree) } -multi sub hyperize(List:D $list, $size?, $degree?, :$auto) { +multi sub hyperize(List:D $list, $size?, $degree?, :$auto, :$catch) { my uint $batch = $size // $default-batch; $list.is-lazy || $list.elems > $batch - ?? ParaSeq.hyper($list, 0, $batch, $auto, $degree) + ?? ParaSeq.hyper($list, 0, $batch, $auto, $catch, $degree) !! $list } proto sub racify(|) is export {*} -multi sub racify(\iterable, $batch?, $degree?, :$auto) { - ParaSeq.hyper(iterable, 1, $batch, $auto, $degree) +multi sub racify(\iterable, $batch?, $degree?, :$auto, :$catch) { + ParaSeq.hyper(iterable, 1, $batch, $auto, $catch, $degree) } -multi sub racify(List:D $list, $size?, $degree?, :$auto) { +multi sub racify(List:D $list, $size?, $degree?, :$auto, :$catch) { my uint $batch = $size // $default-batch; $list.is-lazy || $list.elems > $batch - ?? ParaSeq.hyper($list, 1, $batch, $auto, $degree) + ?? ParaSeq.hyper($list, 1, $batch, $auto, $catch, $degree) !! $list } diff --git a/t/01-basic.rakutest b/t/01-basic.rakutest index dea151c..84626ad 100644 --- a/t/01-basic.rakutest +++ b/t/01-basic.rakutest @@ -1,10 +1,7 @@ use Test; use ParaSeq; -plan 24; - -ok &hyperize, 'was hyperize exported'; -ok &racify, 'was racify exported'; +plan 30; my constant $elems = 200000; my constant $batch = 16; @@ -15,6 +12,9 @@ my constant %mapped = @list.map: $mapper; isa-ok @list.&hyperize, ParaSeq, 'did hyperize produce a ParaSeq'; isa-ok @list.&racify, ParaSeq, 'did racify produce a ParaSeq'; +ok &hyperize, 'was hyperize exported'; +ok &racify, 'was racify exported'; + for 1, ParaSeq.default-degree { is-deeply @list.&hyperize($batch, $_).Array, [@list], "did .Array produce ok with degree $_"; @@ -37,6 +37,17 @@ for 1, ParaSeq.default-degree { "did .map(mapper) produce ok with degree $_"; } +my $s := (^20).&hyperize.grep({ die "Bazinga" if .is-prime }); +is-deeply $s.auto, True, 'is :auto reported ok'; +is-deeply $s.catch, True, 'is :catch reported ok'; +is-deeply $s.List, (), 'no values where produced'; + +my $b := $s.exceptions; +is $b.elems, 1, 'did we get 1 type of exception'; + +ok $b.keys.head.starts-with("Bazinga"), 'did we get the right error message'; +is $b.values.head, 8, 'did we get the right amount'; + is ParaSeq.default-batch, 16, "is the default batch size correct"; ParaSeq.default-batch = 1024; is ParaSeq.default-batch, 1024, "is the updated default batch size correct";