Skip to content

Commit

Permalink
Add resources and JobCount
Browse files Browse the repository at this point in the history
  • Loading branch information
exodist committed Dec 1, 2023
1 parent e828558 commit ae8d143
Show file tree
Hide file tree
Showing 18 changed files with 824 additions and 174 deletions.
13 changes: 7 additions & 6 deletions lib/App/Yath.pm
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,13 @@ sub process_args {
$self->{+ENV_VARS} = $env;
$self->{+OPTION_STATE} = $state;

for my $module (keys %$modules) {
$settings->yath->{plugins}->{$module} //= [$module->can('args_from_settings') ? $module->args_from_settings($settings) : ()] if $module->isa('App::Yath::Plugin');
$settings->renderer->{classes}->{$module} //= [$module->can('args_from_settings') ? $module->args_from_settings($settings) : ()] if $module->isa('App::Yath::Renderer');
$settings->resource->{classes}->{$module} //= [$module->can('args_from_settings') ? $module->args_from_settings($settings) : ()] if $module->isa('App::Yath::Resource');
warn "FIXME renderers and resources (if applicable)";
}
warn "FIXME renderers and resources (if applicable)";
# This is probably not needed.
#for my $module (keys %$modules) {
# $settings->yath->{plugins}->{$module} //= [$module->can('args_from_settings') ? $module->args_from_settings($settings) : ()] if $module->isa('App::Yath::Plugin');
# $settings->renderer->{classes}->{$module} //= [$module->can('args_from_settings') ? $module->args_from_settings($settings) : ()] if $module->isa('App::Yath::Renderer');
# $settings->resource->{classes}->{$module} //= [$module->can('args_from_settings') ? $module->args_from_settings($settings) : ()] if $module->isa('App::Yath::Resource');
#}
}

sub run {
Expand Down
94 changes: 67 additions & 27 deletions lib/App/Yath/Command/start.pm
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ use File::Path qw/remove_tree/;
use parent 'App::Yath::Command';
use Test2::Harness::Util::HashBase qw{
+log_file
+ipc
+runner
+scheduler
+resources
+instance
+collector
};

use Getopt::Yath;
Expand Down Expand Up @@ -79,7 +86,7 @@ sub run {
}
}

my $collector = $self->init_collector();
my $collector = $self->collector();

my $pid = fork // die "Could not fork: $!";
if ($pid) {
Expand All @@ -95,7 +102,7 @@ sub run {
else {
$0 = "yath-daemon";
$collector->setup_child_output();
return $self->start_instance();
$self->instance->run;
}

return 0;
Expand All @@ -106,8 +113,11 @@ sub log_file {
return $self->{+LOG_FILE} //= File::Spec->catfile($self->settings->harness->workdir, 'log.jsonl');
}

sub init_collector {
sub collector {
my $self = shift;

return $self->{+COLLECTOR} if $self->{+COLLECTOR};

my $settings = $self->settings;

my $out_file = $self->log_file;
Expand All @@ -122,13 +132,14 @@ sub init_collector {
open(my $log, '>', $out_file) or die "Could not open '$out_file' for writing: $!";
$log->autoflush(1);

my $parser = Test2::Harness::Collector::IOParser->new(job_id => 0, job_try => 0, run_id => 0, type => 'runner');
my $collector = Test2::Harness::Collector->new(
parser => $parser,
job_id => 0,
job_try => 0,
run_id => 0,
output => sub {
my $parser = Test2::Harness::Collector::IOParser->new(job_id => 0, job_try => 0, run_id => 0, type => 'runner');
return $self->{+COLLECTOR} = Test2::Harness::Collector->new(
parser => $parser,
job_id => 0,
job_try => 0,
run_id => 0,
always_flush => 1,
output => sub {
for my $e (@_) {
print $log encode_json($e), "\n";
return unless $renderers;
Expand All @@ -138,51 +149,58 @@ sub init_collector {
);
}

sub start_instance {
sub instance {
my $self = shift;

return $self->{+INSTANCE} if $self->{+INSTANCE};

my $settings = $self->settings;

my $ipc = $self->build_ipc();
my $runner = $self->build_runner();
my $scheduler = $self->build_scheduler(runner => $runner);
my $ipc = $self->ipc();
my $runner = $self->runner();
my $scheduler = $self->scheduler();
my $resources = $self->resources();

my $instance = Test2::Harness::Instance->new(
return $self->{+INSTANCE} = Test2::Harness::Instance->new(
ipc => $ipc,
scheduler => $scheduler,
runner => $runner,
resources => $resources,
log_file => $self->log_file,
);

$instance->run;

return 0;
}

sub build_ipc {
sub ipc {
my $self = shift;

return $self->{+IPC} if $self->{+IPC};

my $ipc_s = App::Yath::Options::IPC->vivify_ipc($self->settings);
my $ipc = Test2::Harness::IPC::Protocol->new(protocol => $ipc_s->{protocol});
$ipc->start($ipc_s->{address}, $ipc_s->{port});

return $ipc;
return $self->{+IPC} = $ipc;
}

sub build_scheduler {
sub scheduler {
my $self = shift;
my %params = @_;

return $self->{+SCHEDULER} if $self->{+SCHEDULER};

my $runner = $self->runner;
my $resources = $self->resources;

my $scheduler_s = $self->settings->scheduler;
my $class = $scheduler_s->class;
require(mod2file($class));

return $class->new($scheduler_s->all, %params);
return $self->{+SCHEDULER} = $class->new($scheduler_s->all, runner => $runner, resources => $resources);
}

sub build_runner {
sub runner {
my $self = shift;
my %params = @_;

return $self->{+RUNNER} if $self->{+RUNNER};

my $settings = $self->settings;
my $runner_s = $settings->runner;
Expand All @@ -191,7 +209,29 @@ sub build_runner {

my $ts = Test2::Harness::TestSettings->new($settings->tests->all);

return $class->new($runner_s->all, test_settings => $ts, workdir => $settings->harness->workdir, %params);
return $self->{+RUNNER} = $class->new($runner_s->all, test_settings => $ts, workdir => $settings->harness->workdir);
}

sub resources {
my $self = shift;

return $self->{+RESOURCES} if $self->{+RESOURCES};

my $settings = $self->settings;
my $res_s = $settings->resource;
my $res_classes = $res_s->classes;

my @res_class_list = keys %$res_classes;
require(mod2file($_)) for @res_class_list;

@res_class_list = sort { $a->sort_weight <=> $b->sort_weight } @res_class_list;

my @resources;
for my $mod (@res_class_list) {
push @resources => $mod->new(@{$res_classes->{$mod}});
}

return $self->{+RESOURCES} = \@resources;
}

1;
148 changes: 146 additions & 2 deletions lib/App/Yath/Options/Resource.pm
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,159 @@ option_group {group => 'resource', category => "Resource Options"} => sub {
field => 'classes',
alt => ['resource'],

description => 'Specify resources. Use "+" to give a fully qualified module name. Without "+" "App::Yath::Resource::" will be prepended to your argument.',
description => 'Specify resources. Use "+" to give a fully qualified module name. Without "+" "App::Yath::Resource::" and "Test2::Harness::Resource::" will be searched for a matching resource module.',

long_examples => [' +My::Resource', ' MyResource,MyOtherResource', ' MyResource=opt1,opt2', ' :{ MyResource :{ opt1 opt2 }: }:', '=:{ MyResource opt1,opt2,... }:'],
short_examples => ['MyResource', ' +My::Resource', ' MyResource,MyOtherResource', ' MyResource=opt1,opt2', ' :{ MyResource :{ opt1 opt2 }: }:', '=:{ MyResource opt1,opt2,... }:'],

normalize => sub { fqmod($_[0], 'App::Yath::Resource'), ref($_[1]) ? $_[1] : [split(',', $_[1] // '')] },
normalize => sub { fqmod($_[0], ['App::Yath::Resource', 'Test2::Harness::Resource']), ref($_[1]) ? $_[1] : [split(',', $_[1] // '')] },

mod_adds_options => 1,
);

option shared_jobs_config => (
type => 'Scalar',
default => '.sharedjobslots.yml',
long_examples => [' .sharedjobslots.yml', ' relative/path/.sharedjobslots.yml', ' /absolute/path/.sharedjobslots.yml'],
description => 'Where to look for a shared slot config file. If a filename with no path is provided yath will search the current and all parent directories for the name.',
);

option slots => (
type => 'Scalar',
short => 'j',
default => 1,
alt => ['jobs', 'job_count'],
description => 'Set the number of concurrent jobs to run. Add a :# if you also wish to designate multiple slots per test. 8:2 means 8 slots, but each test gets 2 slots, so 4 tests run concurrently. Tests can find their concurrency assignemnt in the "T2_HARNESS_MY_JOB_CONCURRENCY" environment variable.',
long_examples => [' 4', ' 8:2'],
short_examples => ['4', '8:2'],
from_env_vars => [qw/YATH_JOB_COUNT T2_HARNESS_JOB_COUNT HARNESS_JOB_COUNT/],
clear_env_vars => [qw/YATH_JOB_COUNT T2_HARNESS_JOB_COUNT HARNESS_JOB_COUNT/],

trigger => sub {
my $opt = shift;
my %params = @_;

if ($params{action} eq 'set' || $params{action} eq 'initialize') {
my ($val) = @{$params{val}};
return unless $val =~ m/:/;
my ($jobs, $slots) = split /:/, $val;
@{$params{val}} = ($jobs);
$params{group}->{job_slots} = $slots;
}
},
);

option job_slots => (
type => 'Scalar',
alt => ['slots_per_job'],
short => 'x',

description => "This sets the number of slots each job will use (default 1). This is normally set by the ':#' in '-j#:#'.",
from_env_vars => ['T2_HARNESS_JOB_CONCURRENCY'],
clear_env_vars => ['T2_HARNESS_JOB_CONCURRENCY'],
long_examples => [' 2'],
short_examples => ['2'],

default => sub {
my ($opt, $settings) = @_;
$settings->resource->slots // 1;
},
);

option_post_process \&jobs_post_process;
};

sub jobs_post_process {
my ($options, $state) = @_;

my $settings = $state->{settings};
my $resource = $settings->resource;
$resource->field(slots => 1) unless $resource->slots;
$resource->field(job_slots => 1) unless $resource->job_slots;

my $slots = $resource->slots;
my $job_slots = $resource->job_slots;

my @args = (
slots => $slots,
job_slots => $job_slots,
);

die "The slots per job (set to $job_slots) must not be larger than the total number of slots (set to $slots).\n" if $job_slots > $slots;

my %found;
for my $r (keys %{$resource->classes}) {
require(mod2file($r));
next unless $r->is_job_limiter;
$found{$r}++;
}

warn "Fix shared slots";

if (keys %found) {
unshift @{$resource->classes->{$_} //= []} => @args;
}
else {
require Test2::Harness::Resource::JobCount;
$resource->classes->{'Test2::Harness::Resource::JobCount'} = [@args];
}
}

1;

__END__
sub jobs_post_process {
my ($settings) = @_;
my $resource = $settings->resource;
require Test2::Harness::Runner::Resource::SharedJobSlots::Config;
my $sconf = Test2::Harness::Runner::Resource::SharedJobSlots::Config->find(settings => $settings);
my %found;
for my $r (@{$runner->resources}) {
require(mod2file($r));
next unless $r->job_limiter;
$found{$r}++;
}
if ($sconf && !$found{'Test2::Harness::Runner::Resource::SharedJobSlots'}) {
if (delete $found{'Test2::Harness::Runner::Resource::JobCount'}) {
@{$settings->runner->resources} = grep { $_ ne 'Test2::Harness::Runner::Resource::JobCount' } @{$runner->resources};
}
if (!keys %found) {
require Test2::Harness::Runner::Resource::SharedJobSlots;
unshift @{$runner->resources} => 'Test2::Harness::Runner::Resource::SharedJobSlots';
$found{'Test2::Harness::Runner::Resource::SharedJobSlots'}++;
}
}
elsif (!keys %found) {
require Test2::Harness::Runner::Resource::JobCount;
unshift @{$runner->resources} => 'Test2::Harness::Runner::Resource::JobCount';
}
if ($found{'Test2::Harness::Runner::Resource::SharedJobSlots'} && $sconf) {
$runner->field(job_count => $sconf->default_slots_per_run || $sconf->max_slots_per_run) if $runner && !$runner->job_count;
$runner->field(slots_per_job => $sconf->default_slots_per_job || $sconf->max_slots_per_job) if $runner && !$runner->slots_per_job;
my $run_slots = $runner->job_count;
my $job_slots = $runner->slots_per_job;
die "Requested job count ($run_slots) exceeds the system shared limit (" . $sconf->max_slots_per_run . ").\n"
if $run_slots > $sconf->max_slots_per_run;
die "Requested job concurrency ($job_slots) exceeds the system shared limit (" . $sconf->max_slots_per_job . ").\n"
if $job_slots > $sconf->max_slots_per_job;
}
$runner->field(job_count => 1) if $runner && !$runner->job_count;
$runner->field(slots_per_job => 1) if $runner && !$runner->slots_per_job;
my $run_slots = $runner->job_count;
my $job_slots = $runner->slots_per_job;
die "The slots_per_job (set to $job_slots) must not be larger than the job_count (set to $run_slots).\n" if $job_slots > $run_slots;
}
1;
Loading

0 comments on commit ae8d143

Please sign in to comment.