Skip to content

Commit

Permalink
fixme
Browse files Browse the repository at this point in the history
  • Loading branch information
exodist committed Apr 27, 2024
1 parent 9a68b39 commit 1e745ea
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 99 deletions.
44 changes: 23 additions & 21 deletions lib/App/Yath/Command/db/importer.pm
Original file line number Diff line number Diff line change
@@ -1,36 +1,38 @@
package App::Yath::Command::ui::importer;
package App::Yath::Command::db::importer;
use strict;
use warnings;

our $VERSION = '2.000000';

use Test2::Harness::UI;
use Test2::Harness::UI::Config;
use Test2::Harness::UI::Importer;
sub summary { "Start an importer process that will wait for uploaded logs to import" }
sub description { "Start an importer process that will wait for uploaded logs to import" }
sub group { "db" }

sub summary { "No Summary" }
sub description { "No Description" }
sub group { "ui" }
use Test2::Harness::Schema::Loader;

sub cli_args { }
use Test2::Harness::Schema::Util qw/schema_config_from_settings/;

sub run {
use parent 'App::Yath::Command';
use Getopt::Yath;

include_options(
'App::Yath::Options::DB',
);

my ($dsn, $user, $pass, $seed) = @ARGV;
sub run {
my $self = shift;

$user ||= '';
$pass ||= '';
my $settings = $self->settings;
my $config = schema_config_from_settings($settings);

srand($seed) if $seed;
$SIG{INT} = sub { exit 0 };
$SIG{TERM} = sub { exit 0 };

my $config = Test2::Harness::UI::Config->new(
dbi_dsn => $dsn,
dbi_user => $user,
dbi_pass => $pass,
);
Test2::Harness::UI::Importer->new(config => $config)->run;
}

1;

$SIG{INT} = sub { exit 0 };
$SIG{TERM} = sub { exit 0 };
__END__
Test2::Harness::UI::Importer->new(config => $config)->run;
=head1 POD IS AUTO-GENERATED
49 changes: 18 additions & 31 deletions lib/App/Yath/Command/db/publish.pm
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package App::Yath::Command::publish;
package App::Yath::Command::db::publish;
use strict;
use warnings;

Expand All @@ -7,26 +7,27 @@ our $VERSION = '2.000000';
use IO::Uncompress::Bunzip2 qw($Bunzip2Error);
use IO::Uncompress::Gunzip qw($GunzipError);

use Test2::Harness::UI::Util qw/ config_from_settings /;
use Test2::Harness::Schema::Util qw/schema_config_from_settings/;
use Test2::Harness::Util::JSON qw/decode_json/;

use Test2::Harness::Renderer::UIDB;
use Test2::Harness::Schema::RunProcessor;

use parent 'App::Yath::Command';
use Test2::Harness::Util::HashBase;

use Getopt::Yath;
include_options(
'App::Yath::Options::DB',
'App::Yath::Options::Upload',
);

sub summary { "Use the YathUIDB plugin to publish a log file" }
sub summary { "Publish a log file directly to a yath database" }

sub group { 'log' }

sub cli_args { "[--] event_log.jsonl[.gz|.bz2]" }

sub description {
return <<" EOT";
EOT
}
sub description { "Publish a log file directly to a yath database" }

sub run {
my $self = shift;
Expand All @@ -40,8 +41,6 @@ sub run {
die "'$file' is not a valid log file" unless -f $file;
die "'$file' does not look like a log file" unless $file =~ m/\.jsonl(\.(gz|bz2))?$/;

die "The YathUIDB plugin is required" unless $settings->check_prefix('yathui-db');

my $fh;
if ($file =~ m/\.bz2$/) {
$fh = IO::Uncompress::Bunzip2->new($file) or die "Could not open bz2 file: $Bunzip2Error";
Expand All @@ -53,22 +52,18 @@ sub run {
open($fh, '<', $file) or die "Could not open log file: $!";
}

my $config = config_from_settings($settings);
my $config = schema_config_from_settings($settings);

my $ydb = $self->settings->prefix('yathui-db');
my $yath = $settings->yathui;
my $user = $yath->user || $ENV{USER};

my $renderer = Test2::Harness::Renderer::UIDB->new(
config => $config,
settings => $settings,
user => $user,
);
my $ydb = $settings->db;
my $yup = $settings->upload;
my $user = $yup->user || $ENV{USER};

my $is_term = -t STDOUT ? 1 : 0;

print "\n" if $is_term;

my $cb = Test2::Harness::Schema::RunProcessor->process_lines($settings);

local $| = 1;
while (my $line = <$fh>) {
my $ln = $.;
Expand All @@ -78,20 +73,12 @@ sub run {

next if $line =~ m/^null$/ims;

my $ok = eval {
my $event = decode_json($line);
$renderer->_render_event($event);
1;
};
my $err = $@;
next if $ok;

die "Error processing log on line $ln: $err";
$cb->($line);
}

print "Upload Complete\n";
$cb->();

$renderer->finish();
print "Upload Complete\n";

return 0;
}
Expand Down
28 changes: 3 additions & 25 deletions lib/App/Yath/Command/db/sweeper.pm
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package App::Yath::Command::ui::sweeper;
package App::Yath::Command::db::sweeper;
use strict;
use warnings;

Expand Down Expand Up @@ -103,36 +103,14 @@ sub run {
my $config = schema_config_from_settings($settings);

my $sweeper = Test2::Harness::Schema::Sweeper->new(
$settings->sweeper->all,
interval => $settings->sweeper->interval,
);

$sweeper->sweep(coverage => 0);
$sweeper->sweep($settings->sweeper->all);
}

1;

__END__
=head1 POD IS AUTO-GENERATED
if (grep { m/^-+(h(?:elp)?|\?)$/ } @ARGV) {
print "Usage: $0 'DSN' ['USER'] ['PASSWORD'] 'INTERVAL'\nDSN and Interval are required, sql username and password are optional.\n";
exit 0;
}
my $dsn = shift @ARGV // die "Must provide a DSN as the first command line argument";
my $interval = pop @ARGV // die "Must provide an sql interval value (Example: '2 day') as the final command line argument";
my ($user, $pass) = @ARGV;
$interval //= "10 day";
my $config = Test2::Harness::UI::Config->new(
dbi_dsn => $dsn,
dbi_user => $user // '',
dbi_pass => $pass // '',
single_user => 1,
show_user => 0,
);
91 changes: 69 additions & 22 deletions lib/Test2/Harness/Schema/RunProcessor.pm
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,50 @@ sub process_stdin {
my $class = shift;
my ($settings) = @_;

return $class->process_handle(\*STDIN, $settings);
}


sub process_handle {
my $class = shift;
my ($fh, $settings) = @_;

my $cb = $class->process_lines($settings);

while (my $line = <$fh>) {
$cb->($line);
}
}

sub process_lines {
my $class = shift;
my ($settings) = @_;

my $done = 0;
my ($next, $last);
return sub {
my $line = shift;

croak "Call to process lines callback after an undef line" if $done;

if (!defined($line)) {
$done++;
$last->();
return;
}
elsif ($next) {
$next->($line);
}
else {
$next = $class->_process_first_line($line, $settings);
}
};
}

sub _process_first_line {
my $class = shift;
my ($line, $settings) = @_;

my $run;
my $self;
my $config = schema_config_from_settings($settings);
Expand All @@ -69,7 +113,7 @@ sub process_stdin {
$dbh->{mysql_auto_reconnect} = 1 if $Test2::Harness::UI::Schema::LOADED =~ m/mysql/i;
}

my $e = decode_json(scalar <STDIN>);
my $e = decode_json(scalar $line);
my $f = $e->{facet_data};

my $run_id;
Expand Down Expand Up @@ -112,34 +156,37 @@ sub process_stdin {
print STDOUT $links;
}

my $ok = eval {
local $SIG{INT} = sub { $self->set_signal('INT'); die "Caught Signal 'INT'\n"; };
local $SIG{TERM} = sub { $self->set_signal('TERM'); die "Caught Signal 'TERM'\n"; };
my $int = $SIG{INT};
my $term = $SIG{TERM};

while (my $json = <STDIN>) {
my $e = decode_json($json);
$SIG{INT} = sub { $self->set_signal('INT'); die "Caught Signal 'INT'\n"; };
$SIG{TERM} = sub { $self->set_signal('TERM'); die "Caught Signal 'TERM'\n"; };

next if eval { $self->process_event($e); 1 };
return (
sub {
my $line = shift;

return if eval {
my $e = decode_json($line);
$self->process_event($e);
1;
};
my $err = $@;

die $err if $self->{+SIGNAL};
warn "Error sending event(s) to database:\n====\n$err\n====\n";
}

1;
};
my $err = $@;

unless ($ok) {
my $run = $self->{+RUN} or return;
$run->update({status => 'canceled', error => $err});
}

$self->finish;

print STDOUT $links if $links;
die $err if $self->{+SIGNAL};
my $run = $self->{+RUN} or return;
$run->update({status => 'canceled', error => $err});
},
sub {
$self->finish;
print STDOUT $links if $links;

return 0;
$SIG{INT} = $int;
$SIG{TERM} = $term;
}
);
}

sub trim_error {
Expand Down

0 comments on commit 1e745ea

Please sign in to comment.