Skip to content

Commit

Permalink
Migrate the runner to produce the new RunnerOutput proto.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 672564115
  • Loading branch information
ksteuck authored and copybara-github committed Sep 9, 2024
1 parent 035a0fd commit 4104226
Show file tree
Hide file tree
Showing 21 changed files with 520 additions and 293 deletions.
1 change: 1 addition & 0 deletions common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ cc_library(
":snapshot_enums",
":snapshot_types",
"@silifuzz//proto:snapshot_cc_proto",
"@silifuzz//proto:snapshot_execution_result_cc_proto",
"@silifuzz//util:checks",
"@silifuzz//util:misc_util",
"@silifuzz//util:platform",
Expand Down
21 changes: 20 additions & 1 deletion common/snapshot_enums.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Endpoint final {
using SigCause = snapshot_types::SigCause;
using enum snapshot_types::SigCause;

// Enpoint that is the first occurrence of reaching the instruction_address.
// Endpoint that is the first occurrence of reaching the instruction_address.
// I.e. type() == kInstruction.
explicit Endpoint(Address instruction_address);

Expand Down Expand Up @@ -254,6 +254,25 @@ enum class MakerStopReason {
kSignal,
};

// Status codes for runner execution.
// Parallels RunnerOutput::ExecutionResult::StatusCode proto.
enum class RunnerExecutionStatusCode {
kInternalError = 0,
kOk = 1,
kUnhandledSignal = 2,
kMmapFailed = 3,
kOverlappingMappings = 4,
kInitialChecksumMismatch = 5,
kSnapshotFailed = 6,
};

// Parallels RunnerOutput::ExecutionResult::ChecksumStatus proto.
enum class RunnerPostfailureChecksumStatus {
kNotChecked = 0,
kMatch = 1,
kMismatch = 2,
};

} // namespace snapshot_types

template <>
Expand Down
36 changes: 36 additions & 0 deletions common/snapshot_proto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "./common/snapshot.h"
#include "./common/snapshot_enums.h"
#include "./proto/snapshot.pb.h"
#include "./proto/snapshot_execution_result.pb.h"
#include "./util/checks.h"
#include "./util/misc_util.h"
#include "./util/platform.h"
Expand Down Expand Up @@ -133,6 +134,41 @@ static_assert(ToInt(PlatformId::kAmdRyzenV3000) ==
static_assert(ToInt(PlatformId::kArmNeoverseV2) ==
ToInt(proto::PlatformId::ARM_NEOVERSE_V2));

static_assert(ToInt(snapshot_types::RunnerExecutionStatusCode::kOk) ==
ToInt(proto::RunnerOutput::ExecutionResult::OK));
static_assert(
ToInt(snapshot_types::RunnerExecutionStatusCode::kInternalError) ==
ToInt(proto::RunnerOutput::ExecutionResult::INTERNAL_ERROR));
static_assert(
ToInt(snapshot_types::RunnerExecutionStatusCode::kUnhandledSignal) ==
ToInt(proto::RunnerOutput::ExecutionResult::UNHANDLED_SIGNAL));
static_assert(ToInt(snapshot_types::RunnerExecutionStatusCode::kMmapFailed) ==
ToInt(proto::RunnerOutput::ExecutionResult::MMAP_FAILED));
static_assert(
ToInt(snapshot_types::RunnerExecutionStatusCode::kOverlappingMappings) ==
ToInt(proto::RunnerOutput::ExecutionResult::OVERLAPPING_MAPPINGS));
static_assert(
ToInt(
snapshot_types::RunnerExecutionStatusCode::kInitialChecksumMismatch) ==
ToInt(proto::RunnerOutput::ExecutionResult::INITIAL_CHECKSUM_MISMATCH));
static_assert(
ToInt(snapshot_types::RunnerExecutionStatusCode::kSnapshotFailed) ==
ToInt(proto::RunnerOutput::ExecutionResult::SNAPSHOT_FAILED));

static_assert(
ToInt(snapshot_types::RunnerPostfailureChecksumStatus::kNotChecked) ==
ToInt(proto::RunnerOutput::ChecksumStatus::
RunnerOutput_ChecksumStatus_NOT_CHECKED));

static_assert(ToInt(snapshot_types::RunnerPostfailureChecksumStatus::kMatch) ==
ToInt(proto::RunnerOutput::ChecksumStatus::
RunnerOutput_ChecksumStatus_MATCH));

static_assert(
ToInt(snapshot_types::RunnerPostfailureChecksumStatus::kMismatch) ==
ToInt(proto::RunnerOutput::ChecksumStatus::
RunnerOutput_ChecksumStatus_MISMATCH));

static_assert(proto::PlatformId_MAX < 64,
"PlatformId_MAX is too large to fit in EndState::platforms");

Expand Down
3 changes: 1 addition & 2 deletions orchestrator/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ cc_library(
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/base:log_severity",
"@com_google_absl//absl/log:check",
"@com_google_absl//absl/status",
"@com_google_absl//absl/status:statusor",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/synchronization",
Expand Down Expand Up @@ -115,11 +114,11 @@ cc_test(
deps = [
":binary_log_channel",
":result_collector",
"@silifuzz//common:snapshot_enums",
"@silifuzz//proto:binary_log_entry_cc_proto",
"@silifuzz//proto:snapshot_execution_result_cc_proto",
"@silifuzz//runner/driver:runner_driver",
"@silifuzz//util/testing:status_macros",
"@com_google_absl//absl/strings:string_view",
"@com_google_absl//absl/time",
"@com_google_googletest//:gtest_main",
],
Expand Down
48 changes: 45 additions & 3 deletions orchestrator/orchestrator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def setUpClass(cls):
super(OrchestratorTest, cls).setUpClass()
corpus_paths = [_ENDS_AS_EXPECTED_CORPUS_PATH, _RUNAWAY_CORPUS_PATH]
for i, original_path in enumerate(corpus_paths):
contents = open(original_path, 'rb').read()
with open(original_path, 'rb') as f:
contents = f.read()
# Compress one corpus only. The orchestrator can load
# both corpora with and without compression.
is_compressed = i > 0
Expand Down Expand Up @@ -217,7 +218,9 @@ def test_snap_failure(self):
(err_log, returncode) = self.run_orchestrator(
['snap_fail'], extra_args=['--enable_v1_compat_logging']
)
self.assertEqual(returncode, 1)
self.assertEqual(
returncode, 1, msg='Expected EXIT_FAILURE ' + '\n'.join(err_log)
)
latest_entry = self._parse_v1_log(err_log)
self.assertGreater(
int(latest_entry['issues_detected']),
Expand Down Expand Up @@ -287,11 +290,50 @@ def test_duration(self):
err_log,
[
'corpus: ends_as_expected_corpus',
'error: Runner killed by signal 14',
'Runner killed by signal 14',
'exit_status: internal_error',
],
)

def test_checksum_mismatch(self):
(err_log, returncode) = self.run_orchestrator(['checksum_mismatch'])
self.assertEqual(
returncode, 1, msg='Expected EXIT_FAILURE ' + '\n'.join(err_log)
)
self.assertStrSeqContainsAll(
err_log,
[
'Snapshot checksum mismatch',
],
)

def test_no_execution_result(self):
(err_log, returncode) = self.run_orchestrator(['no_execution_result'])
self.assertEqual(returncode, 0)
self.assertStrSeqContainsAll(
err_log,
[
'Missing required execution_result',
],
)

def test_mmap_failed(self):
(err_log, returncode) = self.run_orchestrator(['mmap_failed'])
self.assertEqual(
returncode,
0,
msg=(
'Expected no failure from the orchestrator if the runner fails to'
' map memory'
),
)
self.assertStrSeqContainsAll(
err_log,
[
'failed to map memory',
],
)

def test_watchdog(self):
(err_log, returncode) = self.run_orchestrator(
['ignore_alarm', 'sleep100'],
Expand Down
48 changes: 29 additions & 19 deletions orchestrator/result_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ absl::StatusOr<proto::BinaryLogEntry> RunResultToSnapshotExecutionResult(
proto::BinaryLogEntry entry;
proto::SnapshotExecutionResult *snapshot_execution_result =
entry.mutable_snapshot_execution_result();
snapshot_execution_result->set_snapshot_id(run_result.snapshot_id());
snapshot_execution_result->set_snapshot_id(run_result.failed_snapshot_id());
RETURN_IF_NOT_OK(PlayerResultProto::ToProto(
run_result.player_result(),
run_result.failed_player_result(),
*snapshot_execution_result->mutable_player_result()));

snapshot_execution_result->set_hostname(std::string(ShortHostname()));
Expand Down Expand Up @@ -123,8 +123,8 @@ void LogV1SingleSnapFailure(const RunnerDriver::RunResult &run_result) {
absl::GetFlag(FLAGS_enable_v1_compat_logging);
if (!enable_v1_compat_logging) return;
std::cerr << "Silifuzz detected issue on CPU "
<< run_result.player_result().cpu_id << " running snapshot "
<< run_result.snapshot_id() << '\n';
<< run_result.failed_player_result().cpu_id << " running snapshot "
<< run_result.failed_snapshot_id() << std::endl;
}

// Logs V1-style summary e.g.
Expand All @@ -151,7 +151,7 @@ void LogV1CompatSummary(const Summary &summary, absl::Duration elapsed,
<< ", snapshot_execution_errors = 0"
<< ", runaway_count = " << summary.num_runaway_snapshots
<< ", max_rss_kb = " << max_rss_kb
<< ", had_checker_misconfigurations = false}" << '\n';
<< ", had_checker_misconfigurations = false}" << std::endl;
}

} // namespace
Expand All @@ -175,23 +175,33 @@ bool ResultCollector::operator()(const RunnerDriver::RunResult &result) {
max_rss_kb_ = std::max(max_rss_kb_, MaxRunnerRssSizeBytes(getpid()) / 1024);
bool should_stop = false;
if (!result.success()) {
if (result.player_result().outcome ==
snapshot_types::PlaybackOutcome::kExecutionRunaway) {
summary_.num_runaway_snapshots++;
if (!options_.report_runaways_as_errors) return false;
if (!result.execution_result().ok()) {
// TODO(ksteuck) Report the error to `binary_log_producer_`.
VLOG_INFO(1, result.execution_result().DebugString());
}
++summary_.num_failed_snapshots;
LogV1SingleSnapFailure(result);
absl::StatusOr<proto::BinaryLogEntry> entry_or =
RunResultToSnapshotExecutionResult(result, absl::Now(), session_id_);
if (entry_or.ok()) {
if (binary_log_producer_) {
if (absl::Status s = binary_log_producer_->Send(*entry_or); !s.ok()) {
LOG_ERROR(s.message());
if (result.has_failed_player_result()) {
// Currently, only explicitly failed snapshots count towards the overall
// failure status.
// TODO(ksteuck) Consider what non-successful execution_result codes
// should be considered as failures.
if (result.failed_player_result().outcome ==
snapshot_types::PlaybackOutcome::kExecutionRunaway) {
summary_.num_runaway_snapshots++;
if (!options_.report_runaways_as_errors) return false;
}
++summary_.num_failed_snapshots;
LogV1SingleSnapFailure(result);
absl::StatusOr<proto::BinaryLogEntry> entry =
RunResultToSnapshotExecutionResult(result, absl::Now(), session_id_);
if (entry.ok()) {
if (binary_log_producer_) {
if (absl::Status s = binary_log_producer_->Send(*entry); !s.ok()) {
LOG_ERROR(s.message());
}
}
} else {
LOG_ERROR(entry.status().message());
}
} else {
LOG_ERROR(entry_or.status().message());
}
should_stop = (--options_.fail_after_n_errors <= 0);
}
Expand Down
23 changes: 14 additions & 9 deletions orchestrator/result_collector_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,34 @@
#include <unistd.h>

#include "gtest/gtest.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "./common/snapshot_enums.h"
#include "./orchestrator/binary_log_channel.h"
#include "./proto/binary_log_entry.pb.h"
#include "./proto/snapshot_execution_result.pb.h"
#include "./runner/driver/runner_driver.h"
#include "./util/testing/status_macros.h"

namespace silifuzz {
namespace {

using snapshot_types::PlaybackOutcome;
class RunResultPeer {
public:
static RunnerDriver::RunResult SnapshotFailed(absl::string_view snapshot_id) {
RunnerDriver::PlayerResult result = {
.outcome = PlaybackOutcome::kExecutionMisbehave};
return RunnerDriver::RunResult(
RunnerDriver::ExecutionResult::SnapshotFailed(snapshot_id), result, {},
snapshot_id);
}
};

namespace {
TEST(ResultCollector, Simple) {
ResultCollector collector(-1, absl::Now(), {});
collector(RunnerDriver::RunResult::Successful({}));
ASSERT_EQ(collector.summary().play_count, 1);
ASSERT_EQ(collector.summary().num_failed_snapshots, 0);
RunnerDriver::PlayerResult result = {
.outcome = PlaybackOutcome::kExecutionMisbehave};
collector(RunnerDriver::RunResult(result, {}, "snap_id"));
collector(RunResultPeer::SnapshotFailed("snap_id"));
ASSERT_EQ(collector.summary().play_count, 2);
ASSERT_EQ(collector.summary().num_failed_snapshots, 1);
}
Expand All @@ -48,9 +55,7 @@ TEST(ResultCollector, BinaryLogging) {
{
ResultCollector collector(pipefd[1], absl::Now(), {});
collector(RunnerDriver::RunResult::Successful({}));
RunnerDriver::PlayerResult result = {
.outcome = PlaybackOutcome::kExecutionMisbehave};
collector(RunnerDriver::RunResult(result, {}, "snap_id"));
collector(RunResultPeer::SnapshotFailed("snap_id"));
// Let collector go out of scope which closes the write end of the pipe.
// This way the Receive() below does not block if logging misbehaves.
}
Expand Down
35 changes: 19 additions & 16 deletions orchestrator/silifuzz_orchestrator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

#include "absl/base/log_severity.h"
#include "absl/log/check.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
Expand All @@ -38,16 +37,17 @@ namespace silifuzz {

namespace {
// Returns a string representation of StatusOr<RunResult>.
std::string RunResultToDebugString(
const absl::StatusOr<RunnerDriver::RunResult> &run_result_or) {
if (run_result_or.ok()) {
if (run_result_or->success()) {
return "ok";
} else {
std::string RunResultToDebugString(const RunnerDriver::RunResult &run_result) {
if (run_result.success()) {
return "ok";
}
switch (run_result.execution_result().code) {
case RunnerDriver::ExecutionResult::Code::kInternalError:
return "internal_error";
case RunnerDriver::ExecutionResult::Code::kSnapshotFailed:
return "snap_fail";
}
} else {
return "internal_error";
default:
return "unknown";
}
}
} // namespace
Expand Down Expand Up @@ -173,22 +173,25 @@ void RunnerThread(ExecutionContext *ctx, const RunnerThreadArgs &args) {
const InMemoryShard &shard = args.corpora->shards[shard_idx];
RunnerDriver driver =
RunnerDriver::ReadingRunner(args.runner, shard.file_path, shard.name);
absl::StatusOr<RunnerDriver::RunResult> run_result_or =
driver.Run(runner_options);
RunnerDriver::RunResult run_result = driver.Run(runner_options);

absl::Duration elapsed_time = absl::Now() - start_time;

std::string log_msg = absl::StrCat(
"T", args.thread_idx, " cpu: ", args.runner_options.cpu(),
" corpus: ", shard.name, " time: ", absl::ToInt64Seconds(elapsed_time),
" exit_status: ", RunResultToDebugString(run_result_or));
if (!run_result_or.ok()) {
LOG_ERROR(log_msg, " error: ", run_result_or.status().message());
" exit_status: ", RunResultToDebugString(run_result));
if (!run_result.execution_result().ok()) {
LOG_ERROR(log_msg, " ", run_result.execution_result().DebugString());
if (run_result.postfailure_checksum_status() ==
RunnerPostfailureChecksumStatus::kMismatch) {
LOG_ERROR("Snapshot checksum mismatch");
}
} else {
VLOG_INFO(0, log_msg);
}

if (!ctx->OfferRunResult(std::move(run_result_or))) {
if (!ctx->OfferRunResult(std::move(run_result))) {
LOG_ERROR(
"T", args.thread_idx,
" Result processing queue is stuck, some results won't be logged");
Expand Down
Loading

0 comments on commit 4104226

Please sign in to comment.