diff --git a/lib/App/Yath.pm b/lib/App/Yath.pm index 892e09fda..b6dd7d812 100644 --- a/lib/App/Yath.pm +++ b/lib/App/Yath.pm @@ -214,12 +214,13 @@ sub process_args { $self->{+ENV_VARS} = $env; $self->{+OPTION_STATE} = $state; - for my $module (keys %$modules) { - $settings->yath->{plugins}->{$module} //= [$module->can('args_from_settings') ? $module->args_from_settings($settings) : ()] if $module->isa('App::Yath::Plugin'); - $settings->renderer->{classes}->{$module} //= [$module->can('args_from_settings') ? $module->args_from_settings($settings) : ()] if $module->isa('App::Yath::Renderer'); - $settings->resource->{classes}->{$module} //= [$module->can('args_from_settings') ? $module->args_from_settings($settings) : ()] if $module->isa('App::Yath::Resource'); - warn "FIXME renderers and resources (if applicable)"; - } + warn "FIXME renderers and resources (if applicable)"; + # This is probably not needed. + #for my $module (keys %$modules) { + # $settings->yath->{plugins}->{$module} //= [$module->can('args_from_settings') ? $module->args_from_settings($settings) : ()] if $module->isa('App::Yath::Plugin'); + # $settings->renderer->{classes}->{$module} //= [$module->can('args_from_settings') ? $module->args_from_settings($settings) : ()] if $module->isa('App::Yath::Renderer'); + # $settings->resource->{classes}->{$module} //= [$module->can('args_from_settings') ? $module->args_from_settings($settings) : ()] if $module->isa('App::Yath::Resource'); + #} } sub run { diff --git a/lib/App/Yath/Command/start.pm b/lib/App/Yath/Command/start.pm index 1ecff1266..e922dc8d4 100644 --- a/lib/App/Yath/Command/start.pm +++ b/lib/App/Yath/Command/start.pm @@ -22,6 +22,13 @@ use File::Path qw/remove_tree/; use parent 'App::Yath::Command'; use Test2::Harness::Util::HashBase qw{ +log_file + + +ipc + +runner + +scheduler + +resources + +instance + +collector }; use Getopt::Yath; @@ -79,7 +86,7 @@ sub run { } } - my $collector = $self->init_collector(); + my $collector = $self->collector(); my $pid = fork // die "Could not fork: $!"; if ($pid) { @@ -95,7 +102,7 @@ sub run { else { $0 = "yath-daemon"; $collector->setup_child_output(); - return $self->start_instance(); + $self->instance->run; } return 0; @@ -106,8 +113,11 @@ sub log_file { return $self->{+LOG_FILE} //= File::Spec->catfile($self->settings->harness->workdir, 'log.jsonl'); } -sub init_collector { +sub collector { my $self = shift; + + return $self->{+COLLECTOR} if $self->{+COLLECTOR}; + my $settings = $self->settings; my $out_file = $self->log_file; @@ -122,13 +132,14 @@ sub init_collector { open(my $log, '>', $out_file) or die "Could not open '$out_file' for writing: $!"; $log->autoflush(1); - my $parser = Test2::Harness::Collector::IOParser->new(job_id => 0, job_try => 0, run_id => 0, type => 'runner'); - my $collector = Test2::Harness::Collector->new( - parser => $parser, - job_id => 0, - job_try => 0, - run_id => 0, - output => sub { + my $parser = Test2::Harness::Collector::IOParser->new(job_id => 0, job_try => 0, run_id => 0, type => 'runner'); + return $self->{+COLLECTOR} = Test2::Harness::Collector->new( + parser => $parser, + job_id => 0, + job_try => 0, + run_id => 0, + always_flush => 1, + output => sub { for my $e (@_) { print $log encode_json($e), "\n"; return unless $renderers; @@ -138,51 +149,58 @@ sub init_collector { ); } -sub start_instance { +sub instance { my $self = shift; + return $self->{+INSTANCE} if $self->{+INSTANCE}; + my $settings = $self->settings; - my $ipc = $self->build_ipc(); - my $runner = $self->build_runner(); - my $scheduler = $self->build_scheduler(runner => $runner); + my $ipc = $self->ipc(); + my $runner = $self->runner(); + my $scheduler = $self->scheduler(); + my $resources = $self->resources(); - my $instance = Test2::Harness::Instance->new( + return $self->{+INSTANCE} = Test2::Harness::Instance->new( ipc => $ipc, scheduler => $scheduler, runner => $runner, + resources => $resources, log_file => $self->log_file, ); - - $instance->run; - - return 0; } -sub build_ipc { +sub ipc { my $self = shift; + return $self->{+IPC} if $self->{+IPC}; + my $ipc_s = App::Yath::Options::IPC->vivify_ipc($self->settings); my $ipc = Test2::Harness::IPC::Protocol->new(protocol => $ipc_s->{protocol}); $ipc->start($ipc_s->{address}, $ipc_s->{port}); - return $ipc; + return $self->{+IPC} = $ipc; } -sub build_scheduler { +sub scheduler { my $self = shift; - my %params = @_; + + return $self->{+SCHEDULER} if $self->{+SCHEDULER}; + + my $runner = $self->runner; + my $resources = $self->resources; my $scheduler_s = $self->settings->scheduler; my $class = $scheduler_s->class; require(mod2file($class)); - return $class->new($scheduler_s->all, %params); + return $self->{+SCHEDULER} = $class->new($scheduler_s->all, runner => $runner, resources => $resources); } -sub build_runner { +sub runner { my $self = shift; - my %params = @_; + + return $self->{+RUNNER} if $self->{+RUNNER}; my $settings = $self->settings; my $runner_s = $settings->runner; @@ -191,7 +209,29 @@ sub build_runner { my $ts = Test2::Harness::TestSettings->new($settings->tests->all); - return $class->new($runner_s->all, test_settings => $ts, workdir => $settings->harness->workdir, %params); + return $self->{+RUNNER} = $class->new($runner_s->all, test_settings => $ts, workdir => $settings->harness->workdir); +} + +sub resources { + my $self = shift; + + return $self->{+RESOURCES} if $self->{+RESOURCES}; + + my $settings = $self->settings; + my $res_s = $settings->resource; + my $res_classes = $res_s->classes; + + my @res_class_list = keys %$res_classes; + require(mod2file($_)) for @res_class_list; + + @res_class_list = sort { $a->sort_weight <=> $b->sort_weight } @res_class_list; + + my @resources; + for my $mod (@res_class_list) { + push @resources => $mod->new(@{$res_classes->{$mod}}); + } + + return $self->{+RESOURCES} = \@resources; } 1; diff --git a/lib/App/Yath/Options/Resource.pm b/lib/App/Yath/Options/Resource.pm index 4e5d6e23c..704d4f0b1 100644 --- a/lib/App/Yath/Options/Resource.pm +++ b/lib/App/Yath/Options/Resource.pm @@ -15,15 +15,159 @@ option_group {group => 'resource', category => "Resource Options"} => sub { field => 'classes', alt => ['resource'], - description => 'Specify resources. Use "+" to give a fully qualified module name. Without "+" "App::Yath::Resource::" will be prepended to your argument.', + description => 'Specify resources. Use "+" to give a fully qualified module name. Without "+" "App::Yath::Resource::" and "Test2::Harness::Resource::" will be searched for a matching resource module.', long_examples => [' +My::Resource', ' MyResource,MyOtherResource', ' MyResource=opt1,opt2', ' :{ MyResource :{ opt1 opt2 }: }:', '=:{ MyResource opt1,opt2,... }:'], short_examples => ['MyResource', ' +My::Resource', ' MyResource,MyOtherResource', ' MyResource=opt1,opt2', ' :{ MyResource :{ opt1 opt2 }: }:', '=:{ MyResource opt1,opt2,... }:'], - normalize => sub { fqmod($_[0], 'App::Yath::Resource'), ref($_[1]) ? $_[1] : [split(',', $_[1] // '')] }, + normalize => sub { fqmod($_[0], ['App::Yath::Resource', 'Test2::Harness::Resource']), ref($_[1]) ? $_[1] : [split(',', $_[1] // '')] }, mod_adds_options => 1, ); + + option shared_jobs_config => ( + type => 'Scalar', + default => '.sharedjobslots.yml', + long_examples => [' .sharedjobslots.yml', ' relative/path/.sharedjobslots.yml', ' /absolute/path/.sharedjobslots.yml'], + description => 'Where to look for a shared slot config file. If a filename with no path is provided yath will search the current and all parent directories for the name.', + ); + + option slots => ( + type => 'Scalar', + short => 'j', + default => 1, + alt => ['jobs', 'job_count'], + description => 'Set the number of concurrent jobs to run. Add a :# if you also wish to designate multiple slots per test. 8:2 means 8 slots, but each test gets 2 slots, so 4 tests run concurrently. Tests can find their concurrency assignemnt in the "T2_HARNESS_MY_JOB_CONCURRENCY" environment variable.', + long_examples => [' 4', ' 8:2'], + short_examples => ['4', '8:2'], + from_env_vars => [qw/YATH_JOB_COUNT T2_HARNESS_JOB_COUNT HARNESS_JOB_COUNT/], + clear_env_vars => [qw/YATH_JOB_COUNT T2_HARNESS_JOB_COUNT HARNESS_JOB_COUNT/], + + trigger => sub { + my $opt = shift; + my %params = @_; + + if ($params{action} eq 'set' || $params{action} eq 'initialize') { + my ($val) = @{$params{val}}; + return unless $val =~ m/:/; + my ($jobs, $slots) = split /:/, $val; + @{$params{val}} = ($jobs); + $params{group}->{job_slots} = $slots; + } + }, + ); + + option job_slots => ( + type => 'Scalar', + alt => ['slots_per_job'], + short => 'x', + + description => "This sets the number of slots each job will use (default 1). This is normally set by the ':#' in '-j#:#'.", + from_env_vars => ['T2_HARNESS_JOB_CONCURRENCY'], + clear_env_vars => ['T2_HARNESS_JOB_CONCURRENCY'], + long_examples => [' 2'], + short_examples => ['2'], + + default => sub { + my ($opt, $settings) = @_; + $settings->resource->slots // 1; + }, + ); + + option_post_process \&jobs_post_process; }; +sub jobs_post_process { + my ($options, $state) = @_; + + my $settings = $state->{settings}; + my $resource = $settings->resource; + $resource->field(slots => 1) unless $resource->slots; + $resource->field(job_slots => 1) unless $resource->job_slots; + + my $slots = $resource->slots; + my $job_slots = $resource->job_slots; + + my @args = ( + slots => $slots, + job_slots => $job_slots, + ); + + die "The slots per job (set to $job_slots) must not be larger than the total number of slots (set to $slots).\n" if $job_slots > $slots; + + my %found; + for my $r (keys %{$resource->classes}) { + require(mod2file($r)); + next unless $r->is_job_limiter; + $found{$r}++; + } + + warn "Fix shared slots"; + + if (keys %found) { + unshift @{$resource->classes->{$_} //= []} => @args; + } + else { + require Test2::Harness::Resource::JobCount; + $resource->classes->{'Test2::Harness::Resource::JobCount'} = [@args]; + } +} + +1; + +__END__ +sub jobs_post_process { + my ($settings) = @_; + + my $resource = $settings->resource; + + require Test2::Harness::Runner::Resource::SharedJobSlots::Config; + my $sconf = Test2::Harness::Runner::Resource::SharedJobSlots::Config->find(settings => $settings); + + my %found; + for my $r (@{$runner->resources}) { + require(mod2file($r)); + next unless $r->job_limiter; + $found{$r}++; + } + + if ($sconf && !$found{'Test2::Harness::Runner::Resource::SharedJobSlots'}) { + if (delete $found{'Test2::Harness::Runner::Resource::JobCount'}) { + @{$settings->runner->resources} = grep { $_ ne 'Test2::Harness::Runner::Resource::JobCount' } @{$runner->resources}; + } + + if (!keys %found) { + require Test2::Harness::Runner::Resource::SharedJobSlots; + unshift @{$runner->resources} => 'Test2::Harness::Runner::Resource::SharedJobSlots'; + $found{'Test2::Harness::Runner::Resource::SharedJobSlots'}++; + } + } + elsif (!keys %found) { + require Test2::Harness::Runner::Resource::JobCount; + unshift @{$runner->resources} => 'Test2::Harness::Runner::Resource::JobCount'; + } + + if ($found{'Test2::Harness::Runner::Resource::SharedJobSlots'} && $sconf) { + $runner->field(job_count => $sconf->default_slots_per_run || $sconf->max_slots_per_run) if $runner && !$runner->job_count; + $runner->field(slots_per_job => $sconf->default_slots_per_job || $sconf->max_slots_per_job) if $runner && !$runner->slots_per_job; + + my $run_slots = $runner->job_count; + my $job_slots = $runner->slots_per_job; + + die "Requested job count ($run_slots) exceeds the system shared limit (" . $sconf->max_slots_per_run . ").\n" + if $run_slots > $sconf->max_slots_per_run; + + die "Requested job concurrency ($job_slots) exceeds the system shared limit (" . $sconf->max_slots_per_job . ").\n" + if $job_slots > $sconf->max_slots_per_job; + } + + $runner->field(job_count => 1) if $runner && !$runner->job_count; + $runner->field(slots_per_job => 1) if $runner && !$runner->slots_per_job; + + my $run_slots = $runner->job_count; + my $job_slots = $runner->slots_per_job; + + die "The slots_per_job (set to $job_slots) must not be larger than the job_count (set to $run_slots).\n" if $job_slots > $run_slots; +} + 1; diff --git a/lib/App/Yath/Options/Scheduler.pm b/lib/App/Yath/Options/Scheduler.pm index ba97da4ad..393aa012e 100644 --- a/lib/App/Yath/Options/Scheduler.pm +++ b/lib/App/Yath/Options/Scheduler.pm @@ -24,89 +24,12 @@ option_group {group => 'scheduler', category => 'Scheduler Options'} => sub { normalize => sub { fqmod($_[0], 'Test2::Harness::Scheduler') }, ); - - option shared_jobs_config => ( - type => 'Scalar', - default => '.sharedjobslots.yml', - long_examples => [' .sharedjobslots.yml', ' relative/path/.sharedjobslots.yml', ' /absolute/path/.sharedjobslots.yml'], - description => 'Where to look for a shared slot config file. If a filename with no path is provided yath will search the current and all parent directories for the name.', - ); }; -option_post_process \&scheduler_post_process; - -sub scheduler_post_process { - my ($options, $state) = @_; - - my $settings = $state->{settings}; - my $scheduler = $settings->scheduler; - my $tests = $settings->tests; - - warn "Fix shared job slots"; -} - 1; -warn "Do we need the stuff under __END__?"; - __END__ -sub fix_job_resources { - my ($settings) = @_; - - my $runner = $settings->runner; - - require Test2::Harness::Runner::Resource::SharedJobSlots::Config; - my $sconf = Test2::Harness::Runner::Resource::SharedJobSlots::Config->find(settings => $settings); - - my %found; - for my $r (@{$runner->resources}) { - require(mod2file($r)); - next unless $r->job_limiter; - $found{$r}++; - } - - if ($sconf && !$found{'Test2::Harness::Runner::Resource::SharedJobSlots'}) { - if (delete $found{'Test2::Harness::Runner::Resource::JobCount'}) { - @{$settings->runner->resources} = grep { $_ ne 'Test2::Harness::Runner::Resource::JobCount' } @{$runner->resources}; - } - - if (!keys %found) { - require Test2::Harness::Runner::Resource::SharedJobSlots; - unshift @{$runner->resources} => 'Test2::Harness::Runner::Resource::SharedJobSlots'; - $found{'Test2::Harness::Runner::Resource::SharedJobSlots'}++; - } - } - elsif (!keys %found) { - require Test2::Harness::Runner::Resource::JobCount; - unshift @{$runner->resources} => 'Test2::Harness::Runner::Resource::JobCount'; - } - - if ($found{'Test2::Harness::Runner::Resource::SharedJobSlots'} && $sconf) { - $runner->field(job_count => $sconf->default_slots_per_run || $sconf->max_slots_per_run) if $runner && !$runner->job_count; - $runner->field(slots_per_job => $sconf->default_slots_per_job || $sconf->max_slots_per_job) if $runner && !$runner->slots_per_job; - - my $run_slots = $runner->job_count; - my $job_slots = $runner->slots_per_job; - - die "Requested job count ($run_slots) exceeds the system shared limit (" . $sconf->max_slots_per_run . ").\n" - if $run_slots > $sconf->max_slots_per_run; - - die "Requested job concurrency ($job_slots) exceeds the system shared limit (" . $sconf->max_slots_per_job . ").\n" - if $job_slots > $sconf->max_slots_per_job; - } - - $runner->field(job_count => 1) if $runner && !$runner->job_count; - $runner->field(slots_per_job => 1) if $runner && !$runner->slots_per_job; - - my $run_slots = $runner->job_count; - my $job_slots = $runner->slots_per_job; - - die "The slots_per_job (set to $job_slots) must not be larger than the job_count (set to $run_slots).\n" if $job_slots > $run_slots; -} - - - =pod =encoding UTF-8 diff --git a/lib/App/Yath/Options/Tests.pm b/lib/App/Yath/Options/Tests.pm index 68791487c..579837185 100644 --- a/lib/App/Yath/Options/Tests.pm +++ b/lib/App/Yath/Options/Tests.pm @@ -157,6 +157,7 @@ option_group {group => 'tests', category => 'Test Options', maybe => 1} => sub { description => 'Use the specified file as standard input to ALL tests', trigger => sub { + my $opt = shift; my %params = @_; return unless $params{action} eq 'set'; diff --git a/lib/App/Yath/Resource.pm b/lib/App/Yath/Resource.pm new file mode 100644 index 000000000..6a65e84be --- /dev/null +++ b/lib/App/Yath/Resource.pm @@ -0,0 +1,13 @@ +package App::Yath::Resource; +use stricgt; +use warnings; + +use parent 'Test2::Harnes::Resource'; +use Test2::Harness::Util::HashBase; + +sub init { + my $self = shift; + $self->SUPER::init(); +} + +1; diff --git a/lib/Test2/Harness/Collector.pm b/lib/Test2/Harness/Collector.pm index 17dfc4b5d..3f867ba05 100644 --- a/lib/Test2/Harness/Collector.pm +++ b/lib/Test2/Harness/Collector.pm @@ -37,6 +37,7 @@ use Test2::Harness::Util::HashBase qw{ setsid if $params{setsid}; + + my ($self, $cb, @ipc); + if ($params{job}) { + ($self, $cb, @ipc) = $class->collect_job(%params); + } + elsif ($params{command}) { + ($self, $cb, @ipc) = $class->collect_command(%params); + } + else { + die "Was not given either a 'job' or 'command' to collect"; + } + + $self->{+SKIP} = $params{skip} if $params{skip}; + + open(my $stderr, '>&', \*STDERR) or die "Could not clone STDERR"; + local $SIG{__WARN__} = sub { print $stderr @_ }; + + my $exit; + my $ok = eval { $exit = $self->launch_and_process($cb); 1 } // 0; + + my $err = $@; + + if (!$ok) { + $self->_die($err, no_exit => 1); + print $stderr $err; + print STDERR "Test2 Harness Collector Error: $err"; + return 255; + } + + return 0 unless $params{forward_exit}; + + if ($exit->{sig}) { + delete $SIG{$_} for grep { $SIG{$_} } keys %SIG; + kill($exit->{sig}, $$); + sleep 1; + exit(255); # In case signal cannot be forwarded + } + + exit($exit->{err} // 0); +} + +sub setsid { + POSIX::setsid() or die "Could not setsid: $!"; + my $pid = fork // die "Could not fork: $!"; + exit(0) if $pid; +} + +sub collect_command { + my $class = shift; + my %params = @_; + my $root_pid = $params{root_pid} or die "No root pid"; + my $io_pipes = $params{io_pipes} or die "IO pipes are required"; - # Disconnect from parent group so that a test cannot kill the harness. - if ($params{setsid}) { - POSIX::setsid() or die "Could not setsid: $!"; - my $pid = fork // die "Could not fork: $!"; - exit(0) if $pid; + my ($stdout, $stderr); + $stdout = Atomic::Pipe->from_fh('>&=', \*STDOUT); + $stdout->set_mixed_data_mode(); + + if ($io_pipes > 1) { + $stderr = Atomic::Pipe->from_fh('>&=', \*STDERR); + $stderr->set_mixed_data_mode(); } + my $handler = sub { + for my $e (@_) { + $stdout->write_message(encode_json($e)); + next unless $stderr; + my $event_id = $e->{event_id} or next; + $stderr->write_message(qq/{"event_id":"$event_id"}/); + } + }; + + my $parser = Test2::Harness::Collector::IOParser->new( + job_id => 0, + job_try => 0, + run_id => 0, + type => $params{type} // 'unknown', + name => $params{name} // 'unknown', + tag => $params{tag} // $params{name} // $params{type} // 'unknown', + ); + + return $class->new( + parser => $parser, + job_id => 0, + job_try => 0, + run_id => 0, + root_pid => $root_pid, + output => $handler, + command => $params{command}, + always_flush => 1, + ); +} + +sub collect_job { + my $class = shift; + my %params = @_; + + my $root_pid = $params{root_pid} or die "No root pid"; + my $ts = $params{test_settings} or die "No test_settings provided"; unless (blessed($ts)) { my $tsclass = $ts->{class} // 'Test2::Harness::TestSettings'; @@ -206,7 +301,7 @@ sub collect { my $auditor = Test2::Harness::Collector::Auditor->new(%create_params); my $parser = Test2::Harness::Collector::IOParser::Stream->new(%create_params, type => 'test'); - my $collector = $class->new( + my $self = $class->new( %create_params, %params, parser => $parser, @@ -215,12 +310,9 @@ sub collect { root_pid => $root_pid, ); - open(our $stderr, '>&', \*STDERR) or die "Could not clone STDERR"; - - $SIG{__WARN__} = sub { print $stderr @_ }; - - my $ok = eval { - $collector->launch_and_process(sub { + return ( + $self, + sub { my $pid = shift; $child_pid = $pid; @@ -232,41 +324,43 @@ sub collect { pid => $pid, }, ); - }); + }, + $inst_ipc => $inst_con, + $agg_ipc => $agg_con, + ); +} - 1; - }; +sub event_timeout { my $ts = shift->test_settings or return; $ts->event_timeout } +sub post_exit_timeout { my $ts = shift->test_settings or return; $ts->post_exit_timeout } - my $err = $@; +sub launch_command { + my $self = shift; - if (!$ok) { - $collector->_die($err, no_exit => 1); - print $stderr $err; - print STDERR "Test2 Harness Collector Error: $err"; - return 255; + return [$^X, '-e', "print \"1..0 # SKIP $self->{+SKIP}\\n\""] + if $self->{+SKIP}; + + if(my $job = $self->{+JOB}) { + my $run = $self->{+RUN}; + my $ts = $self->{+TEST_SETTINGS}; + + return $job->launch_command($run, $ts); } - return 0; -} + return $self->{+COMMAND} if $self->{+COMMAND}; -sub event_timeout { my $ts = shift->test_settings or return; $ts->event_timeout } -sub post_exit_timeout { my $ts = shift->test_settings or return; $ts->post_exit_timeout } + die "No command!"; +} sub launch_and_process { my $self = shift; my ($parent_cb) = @_; - my $run = $self->{+RUN}; - my $ts = $self->{+TEST_SETTINGS}; - my $job = $self->{+JOB}; - - $self->setup_child(); - my $pid = start_process(@{$job->launch_command($run, $ts)}); + my $pid = start_process($self->launch_command, sub { $self->setup_child() }); $0 = "yath-collector $pid"; $parent_cb->($pid) if $parent_cb; - $self->process($pid); + return $self->process($pid); } sub _pre_event { @@ -361,7 +455,7 @@ sub setup_child_output { sub setup_child_input { my $self = shift; - my $ts = $self->{+TEST_SETTINGS}; + my $ts = $self->{+TEST_SETTINGS} or return; if (my $in_file = $ts->input_file) { my $in_fh = open_file($in_file, '<') if $in_file; @@ -382,7 +476,9 @@ sub setup_child_input { sub setup_child_env_vars { my $self = shift; - my $ts = $self->{+TEST_SETTINGS}; + my $ts = $self->{+TEST_SETTINGS} or return; + + delete $ENV{T2_HARNESS_PIPE_COUNT}; $ENV{TMPDIR} = $self->tempdir; $ENV{T2_TRACE_STAMPS} = 1; @@ -483,7 +579,7 @@ sub process { $SIG{PIPE} = 'IGNORE'; - my $exit = 0; + my $exit; my $ok = eval { $exit = $self->_process($child_pid); 1 }; my $err = $@; @@ -632,7 +728,7 @@ sub _process { } } - $self->_flush() if $self->{+INTERACTIVE}; + $self->_flush() if $self->{+INTERACTIVE} || $self->{+ALWAYS_FLUSH}; } next if $did_work; @@ -678,6 +774,7 @@ sub _process { $self->_flush(); + $SIG{CHLD} = 'IGNORE'; unless (defined($exit // $exited) || $reap->(WNOHANG)) { $self->_die("Sending 'TERM' signal to process...\n", no_exit => 1); @@ -709,6 +806,8 @@ sub _process { push @$times => shift(@$end_times) - shift(@$start_times); } + my $ret = parse_exit($exit); + $self->_pre_event( stream => 'process', stamp => $exited, @@ -719,7 +818,7 @@ sub _process { harness_job_exit => { job_id => $self->job_id, exit => $exit, - codes => parse_exit($exit), + codes => $ret, stamp => $exited, retry => $self->should_retry($exit), times => $times, @@ -728,7 +827,7 @@ sub _process { }, ); - return $exit ? 1 : 0; + return $ret; } sub should_retry { diff --git a/lib/Test2/Harness/Collector/Preloaded.pm b/lib/Test2/Harness/Collector/Preloaded.pm index 76dd091c2..1d56c37d3 100644 --- a/lib/Test2/Harness/Collector/Preloaded.pm +++ b/lib/Test2/Harness/Collector/Preloaded.pm @@ -39,6 +39,8 @@ sub launch_and_process { my $self = shift; my ($parent_cb, $child_cb) = @_; + return $self->SUPER::launch_and_process(@_) if $self->{+SKIP}; + my $run = $self->{+RUN}; my $job = $self->{+JOB}; my $ts = $self->{+TEST_SETTINGS}; diff --git a/lib/Test2/Harness/IPC/Util.pm b/lib/Test2/Harness/IPC/Util.pm index 579e16ab5..281c376e8 100644 --- a/lib/Test2/Harness/IPC/Util.pm +++ b/lib/Test2/Harness/IPC/Util.pm @@ -88,13 +88,18 @@ sub swap_io { sub start_collected_process { my %params = @_; + my $post_fork = delete $params{post_fork}; + my %seen; my $pid = start_process( - $^X, # Call current perl - (map { ("-I$_") } grep { -d $_ && !$seen{$_}++ } @INC), # Use the dev libs specified - '-mTest2::Harness::Collector', # Load Collector - '-e' => 'exit(Test2::Harness::Collector->collect($ARGV[0]))', # Run it. - encode_json(\%params), # json data for what to do + [ + $^X, # Call current perl + (map { ("-I$_") } grep { -d $_ && !$seen{$_}++ } @INC), # Use the dev libs specified + '-mTest2::Harness::Collector', # Load Collector + '-e' => 'exit(Test2::Harness::Collector->collect($ARGV[0]))', # Run it. + encode_json(\%params), # json data for what to do + ], + $post_fork, ); return $pid unless $params{setsid}; @@ -111,13 +116,15 @@ sub start_collected_process { } sub start_process { - my @cmd = @_; + my ($cmd, $post_fork) = @_; my $pid = fork // die "Could not fork: $!"; return $pid if $pid; + $post_fork->() if $post_fork; + no warnings "exec"; - my $ok = eval { exec(@cmd); 1 }; + my $ok = eval { exec(@$cmd); 1 }; my $err = $@; print STDERR "Failed to exec ($!) $@\n"; POSIX::_exit(255); diff --git a/lib/Test2/Harness/Instance.pm b/lib/Test2/Harness/Instance.pm index c31fdae77..f9e4c096e 100644 --- a/lib/Test2/Harness/Instance.pm +++ b/lib/Test2/Harness/Instance.pm @@ -19,10 +19,10 @@ use Test2::Harness::Instance::Request; use Test2::Harness::Instance::Response; use Test2::Harness::Util::HashBase qw{ - scheduler->start($self->{+IPC}); + my $scheduler = $self->scheduler; + $scheduler->start($self->{+IPC}); + + for my $res (@{$self->{+RESOURCES} //= []}) { + next unless $res->spawns_process; + $res->spawn_process(instance => $self, scheduler => $scheduler); + } ipc_loop( ipcs => $self->{+IPC}, diff --git a/lib/Test2/Harness/Resource.pm b/lib/Test2/Harness/Resource.pm new file mode 100644 index 000000000..5df8a9dfb --- /dev/null +++ b/lib/Test2/Harness/Resource.pm @@ -0,0 +1,198 @@ +package Test2::Harness::Resource; +use strict; +use warnings; + +use Carp qw/croak/; + +use Term::Table; + +use Test2::Util::Times qw/render_duration/; + +use Test2::Harness::Util qw/parse_exit/; +use Test2::Harness::IPC::Util qw/start_collected_process ipc_connect/; +use Test2::Harness::Util::JSON qw/decode_json encode_json/; +use Test2::Harness::Util::UUID qw/gen_uuid/; + +use Test2::Harness::Util::HashBase qw{ + {+RESOURCE_ID} //= gen_uuid(); +} + +sub spawns_process { 0 } + +sub is_job_limiter { 0 } + +sub setup { } +sub tick { } +sub refresh { } +sub discharge { } +sub cleanup { } + +sub subprocess_args { () } + +sub resource_name { 'Resource' } +sub resource_io_tag { 'RESOURCE' } + +sub job_limiter_max { croak "'$_[0]' does not implement 'job_limiter_max'" } +sub job_limiter_at_max { croak "'$_[0]' does not implement 'job_limiter_at_max'" } +sub available { croak "'$_[0]' does not implement 'available'" } +sub assign { croak "'$_[0]' does not implement 'assign'" } +sub release { croak "'$_[0]' does not implement 'release'" } +sub subprocess_run { croak "'$_[0]' does not implement 'subprocess_run'" } + +sub sort_weight { + my $class = shift; + return 100 if $class->is_job_limiter; + return 50; +} + +sub subprocess_exited { + my $self = shift; + my %params = @_; + + my $pid = $params{pid}; + my $exit = $params{exit}; + my $scheduler = $params{scheduler}; + + my $x = parse_exit($exit); + + warn "'$self' sub-process '$pid' exited (Code: $x->{err}, Signal: $x->{sig})" + if $exit; +} + +sub spawn_class { + my $self = shift; + return ref($self) || $self; +} + +sub spawn_command { + my $self = shift; + my %params = @_; + + my $class = $self->spawn_class; + my $instance = $params{instance}; + + my %seen; + return ( + $^X, # Call current perl + (map { ("-I$_") } grep { -d $_ && !$seen{$_}++ } @INC), # Use the dev libs specified + "-m$class", # Load Resource + '-e' => "exit($class->_subprocess_run(\$ARGV[0]))", # Run it. + encode_json({parent_pid => $$, $self->subprocess_args}), # json data + ); +} + +sub spawn_process { + my $self = shift; + my %params = @_; + + my $scheduler = $params{scheduler} or die "'scheduler' is required"; + + my @spawn_cmd = $self->spawn_command(%params); + + my $pid = start_collected_process( + instance_ipc => $params{instance_ipc}, + io_pipes => $ENV{T2_HARNESS_PIPE_COUNT}, + command => \@spawn_cmd, + root_pid => $$, + type => 'resource', + name => $self->resourse_name, + tag => $self->resource_io_tag, + setsid => 0, + forward_exit => 1, + ); + + $scheduler->register_child($pid => sub { $self->subprocess_exited(@_) }); + + return $pid; +} + +sub _subprocess_run { + my $class = shift; + my ($json) = @_; + + STDOUT->autoflush(1); + STDERR->autoflush(1); + + my $params = decode_json($json); + + $0 = "yath-resource-" . $class->resourse_name(%$params); + + $class->subprocess_run(%$params); + + exit 0; +} + +sub status_data { () } + +sub status_lines { + my $self = shift; + + my $data = $self->status_data || return; + return unless @$data; + + my $out = ""; + + for my $group (@$data) { + my $gout = "\n"; + $gout .= "**** $group->{title} ****\n\n" if defined $group->{title}; + + for my $table (@{$group->{tables} || []}) { + my $rows = $table->{rows}; + + if (my $format = $table->{format}) { + my $rows2 = []; + + for my $row (@$rows) { + my $row2 = []; + for (my $i = 0; $i < @$row; $i++) { + my $val = $row->[$i]; + my $fmt = $format->[$i]; + + $val = defined($val) ? render_duration($val) : '--' + if $fmt && $fmt eq 'duration'; + + push @$row2 => $val; + } + push @$rows2 => $row2; + } + + $rows = $rows2; + } + + next unless $rows && @$rows; + + my $tt = Term::Table->new( + header => $table->{header}, + rows => $rows, + + sanitize => 1, + collapse => 1, + auto_columns => 1, + + %{$table->{term_table_opts} || {}}, + ); + + $gout .= "** $table->{title} **\n" if defined $table->{title}; + $gout .= "$_\n" for $tt->render; + $gout .= "\n"; + } + + if ($group->{lines} && @{$group->{lines}}) { + $gout .= "$_\n" for @{$group->{lines}}; + $gout .= "\n"; + } + + $out .= $gout; + } + + return $out; +} + +1; diff --git a/lib/Test2/Harness/Resource/JobCount.pm b/lib/Test2/Harness/Resource/JobCount.pm new file mode 100644 index 000000000..f9944f8c5 --- /dev/null +++ b/lib/Test2/Harness/Resource/JobCount.pm @@ -0,0 +1,154 @@ +package Test2::Harness::Resource::JobCount; +use strict; +use warnings; + +use Carp qw/croak/; +use List::Util qw/max min/; + +use parent 'Test2::Harness::Resource'; +use Test2::Harness::Util::HashBase qw{ + {+SLOTS} } +sub job_limiter_at_max { $_[0]->{+SLOTS} <= $_[0]->{+USED} ? 1 : 0 } + +sub init { + my $self = shift; + $self->SUPER::init(); + + die "'slots' is a require attribute and must be set higher to 0" unless $self->{+SLOTS}; + die "'job_slots' is a require attribute and must be set higher to 0" unless $self->{+JOB_SLOTS}; + + $self->{+USED} = 0; + $self->{+ASSIGNMENTS} = {}; +} + +# Always applicable +sub applicable { 1 } + +sub available { + my $self = shift; + my ($id, $job) = @_; + + my $run_count = $self->{+JOB_SLOTS}; + my $min_slots = $job->test_file->check_min_slots // 1; + my $max_slots = $job->test_file->check_max_slots // $min_slots; + + return -1 if $run_count < $min_slots; + return -1 if $self->{+SLOTS} < $min_slots; + + my $free = $self->{+SLOTS} - $self->{+USED}; + return 0 if $free < 1; + return 0 if $free < $min_slots; + + return min($max_slots, $free); +} + +sub assign { + my $self = shift; + my ($id, $job, $env) = @_; + + croak "'env' hash was not provided" unless $env; + + my $count = $self->available($id, $job); + + $self->{+USED} += $count; + $self->{+ASSIGNMENTS}->{$id} = { + job => $job, + count => $count, + }; + + $env->{T2_HARNESS_MY_JOB_CONCURRENCY} = $count; + + return $env; +} + +sub release { + my $self = shift; + my ($id, $job) = @_; + + my $assign = delete $self->{+ASSIGNMENTS}->{$id} or die "Invalid release ID: $id"; + my $count = $assign->{count}; + + $self->{+USED} -= $count; + + return $id; +} + +sub status_data { () } + +sub status_lines { +# my $self = shift; +# +# my $data = $self->status_data || return; +# return unless @$data; +# +# my $out = ""; +# +# for my $group (@$data) { +# my $gout = "\n"; +# $gout .= "**** $group->{title} ****\n\n" if defined $group->{title}; +# +# for my $table (@{$group->{tables} || []}) { +# my $rows = $table->{rows}; +# +# if (my $format = $table->{format}) { +# my $rows2 = []; +# +# for my $row (@$rows) { +# my $row2 = []; +# for (my $i = 0; $i < @$row; $i++) { +# my $val = $row->[$i]; +# my $fmt = $format->[$i]; +# +# $val = defined($val) ? render_duration($val) : '--' +# if $fmt && $fmt eq 'duration'; +# +# push @$row2 => $val; +# } +# push @$rows2 => $row2; +# } +# +# $rows = $rows2; +# } +# +# next unless $rows && @$rows; +# +# my $tt = Term::Table->new( +# header => $table->{header}, +# rows => $rows, +# +# sanitize => 1, +# collapse => 1, +# auto_columns => 1, +# +# %{$table->{term_table_opts} || {}}, +# ); +# +# $gout .= "** $table->{title} **\n" if defined $table->{title}; +# $gout .= "$_\n" for $tt->render; +# $gout .= "\n"; +# } +# +# if ($group->{lines} && @{$group->{lines}}) { +# $gout .= "$_\n" for @{$group->{lines}}; +# $gout .= "\n"; +# } +# +# $out .= $gout; +# } +# +# return $out; +} + +1; diff --git a/lib/Test2/Harness/Run/Job.pm b/lib/Test2/Harness/Run/Job.pm index 97ea0b3d6..ec7f8e3d5 100644 --- a/lib/Test2/Harness/Run/Job.pm +++ b/lib/Test2/Harness/Run/Job.pm @@ -35,6 +35,13 @@ sub try { return scalar(@{$self->{+RESULTS}}); } +sub resource_id { + my $self = shift; + my $job_id = $self->{+JOB_ID}; + my $try = $self->try // 0; + return "${job_id}:${try}"; +} + sub launch_command { my $self = shift; my ($run, $ts) = @_; diff --git a/lib/Test2/Harness/Runner.pm b/lib/Test2/Harness/Runner.pm index 7d6e6f645..364bf2845 100644 --- a/lib/Test2/Harness/Runner.pm +++ b/lib/Test2/Harness/Runner.pm @@ -46,7 +46,7 @@ sub job_update { } sub job_launch_data { my $self = shift; - my ($run, $job) = @_; + my ($run, $job, $env, $skip) = @_; my $run_id = $run->{run_id}; @@ -56,11 +56,15 @@ sub job_launch_data { $job->test_file->test_settings ); + my $env_ref = $ts->env_vars; + %$env_ref = (%$env_ref, %$env); + my $workdir = $self->{+WORKDIR}; return ( workdir => $self->{+WORKDIR}, run => $run->data_no_jobs, + skip => $skip, job => $job, test_settings => $ts, root_pid => $$, @@ -68,13 +72,24 @@ sub job_launch_data { ); } +sub skip_job { + my $self = shift; + my ($run, $job, $env, $skip) = @_; + + $skip //= "Unknown reason"; + + return 1 if eval { start_collected_process($self->job_launch_data($run, $job, $env, $skip)); 1 }; + warn $@; + return 0; +} + sub launch_job { my $self = shift; - my ($stage, $run, $job) = @_; + my ($stage, $run, $job, $env) = @_; croak "Invalid stage '$stage'" unless $stage eq 'NONE'; - return 1 if eval { start_collected_process($self->job_launch_data($run, $job)); 1 }; + return 1 if eval { start_collected_process($self->job_launch_data($run, $job, $env)); 1 }; warn $@; return 0; } diff --git a/lib/Test2/Harness/Runner/Preloading.pm b/lib/Test2/Harness/Runner/Preloading.pm index 5d401ce6a..6ad6e4c8c 100644 --- a/lib/Test2/Harness/Runner/Preloading.pm +++ b/lib/Test2/Harness/Runner/Preloading.pm @@ -10,7 +10,6 @@ use Scalar::Util qw/blessed/; use Test2::Util qw/IS_WIN32/; use Test2::Harness::Util qw/parse_exit mod2file/; -use Test2::Harness::IPC::Util qw/start_process/; use Test2::Harness::Util::JSON qw/encode_json/; use Test2::Harness::Preload(); @@ -175,9 +174,9 @@ sub start_base_stage { sub launch_job { my $self = shift; - my ($stage, $run, $job) = @_; + my ($stage, $run, $job, $env) = @_; - my %job_launch_data = $self->job_launch_data($run, $job); + my %job_launch_data = $self->job_launch_data($run, $job, $env); my $ts = $job_launch_data{test_settings}; my $can_fork = 1; diff --git a/lib/Test2/Harness/Runner/Preloading/Stage.pm b/lib/Test2/Harness/Runner/Preloading/Stage.pm index 450760187..d894361a6 100644 --- a/lib/Test2/Harness/Runner/Preloading/Stage.pm +++ b/lib/Test2/Harness/Runner/Preloading/Stage.pm @@ -89,14 +89,14 @@ sub launch { my $pkg = __PACKAGE__; my %seen; - my $pid = start_process( + my $pid = start_process([ $^X, # Call current perl (map { ("-I$_") } grep { -d $_ && !$seen{$_}++ } @INC), # Use the dev libs specified (map { ("-I$_") } grep { -d $_ && !$seen{$_}++ } @{$ts->includes}), # Use the test libs "-m${class}=start", # Load Stage '-e' => "\$${pkg}\::ERROR ? die \$${pkg}\::ERROR : exit(\$${pkg}\::EXIT // 255)", # Run it. encode_json(\%params), # json data for job - ); + ]); return $pid; } diff --git a/lib/Test2/Harness/Scheduler.pm b/lib/Test2/Harness/Scheduler.pm index 21120ea05..a1b7c7150 100644 --- a/lib/Test2/Harness/Scheduler.pm +++ b/lib/Test2/Harness/Scheduler.pm @@ -8,6 +8,7 @@ use Carp qw/confess/; use Test2::Harness::Util::HashBase qw{ runner + resources }; sub init { } diff --git a/lib/Test2/Harness/Scheduler/Default.pm b/lib/Test2/Harness/Scheduler/Default.pm index 52af734c8..f10472d9c 100644 --- a/lib/Test2/Harness/Scheduler/Default.pm +++ b/lib/Test2/Harness/Scheduler/Default.pm @@ -294,12 +294,26 @@ sub advance { return unless $self->runner->ready; - my ($run, $job, $stage, $cat, $dur, $confl, $job_set) = $self->next_job() or return; + my ($run, $job, $stage, $cat, $dur, $confl, $job_set, $skip, $resources) = $self->next_job() or return; - my $ok = $self->runner->launch_job($stage, $run, $job); + my $res_id = $job->resource_id; + + my $ok; + if ($skip) { + @$resources = grep { $_->is_job_limiter } @$resources; + my $env = {}; + $_->assign($res_id, $job, $env) for @$resources; + $ok = $self->runner->skip_job($run, $job, $env, $skip); + } + else { + my $env = {}; + $_->assign($res_id, $job, $env) for @$resources; + $ok = $self->runner->launch_job($stage, $run, $job, $env); + } # If the job could not be started unless ($ok) { + $_->release($res_id, $job) for @$resources; $job_set->{$job->job_id} = $job; return 1; } @@ -312,6 +326,8 @@ sub advance { cleanup => sub { my $scheduler = shift; + $_->release($res_id, $job) for @{$resources}; + $scheduler->{+RUNNING}->{categories}->{$cat}--; $scheduler->{+RUNNING}->{durations}->{$dur}--; $scheduler->{+RUNNING}->{conflicts}->{$_}-- for @{$confl || []}; @@ -327,6 +343,7 @@ sub advance { } $job = undef; $run = undef; + $resources = undef; }, }; @@ -364,7 +381,8 @@ sub duration_order { [qw/long medium short/] } sub next_job { my $self = shift; - my $running = $self->{+RUNNING}; + my $resources = $self->{+RESOURCES}; + my $running = $self->{+RUNNING}; my $stages = $self->runner->stage_sets; my $cat_order = $self->category_order; @@ -391,15 +409,37 @@ sub next_job { for my $confl (qw/conflict none/) { my $search = $search->{$confl} or next; - for my $job_id (keys %$search) { + JOB: for my $job_id (keys %$search) { my $job = $search->{$job_id}; # Skip if conflicting tests are running my $confl = $job->test_file->conflicts_list; next if first { $running->{conflicts}->{$_} } @$confl; + my $res_id = $job->resource_id; + + my $skip; + my @use_resources; + for my $res (@$resources) { + next unless $res->applicable($res_id, $job); + my $av = $res->available($res_id, $job); + + warn "FIXME: skip test"; + if ($av < 0) { + my $comma = $skip ? 1 : 0; + $skip //= "The following resources are permanently unavailable: "; + $skip .= ', ' if $comma; + $skip .= $res->resource_name; + next; + } + + next JOB unless $av || $skip; + + push @use_resources => $res; + } + delete $search->{$job_id}; - return ($run, $job, $run_by_stage, $cat, $dur, $confl, $search); + return ($run, $job, $run_by_stage, $cat, $dur, $confl, $search, $skip, \@use_resources); } } }