Skip to content

Commit

Permalink
Fix/instrumentation (#200)
Browse files Browse the repository at this point in the history
* update: fixed block_state collection
  • Loading branch information
heemankv authored Dec 27, 2024
1 parent a008642 commit 15b321b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Fixed

- refactor: instrumentation
- refactor: instrumentations
- `is_worker_enabled` status check moved from `VerificationFailed` to `Failed`
- refactor: static attributes for telemetry
- refactor: aws setup for Event Bridge
Expand Down
21 changes: 18 additions & 3 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,16 +297,16 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
JobError::Other(OtherError(e))
})?;

let attributes = [
let attributes = vec![
KeyValue::new("operation_job_type", format!("{:?}", job.job_type)),
KeyValue::new("operation_type", "process_job"),
];

tracing::info!(log_type = "completed", category = "general", function_type = "process_job", block_no = %internal_id, "General process job completed for block");
let duration = start.elapsed();
ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes);
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes);
ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes);
register_block_gauge(&job, &attributes)?;
Ok(())
}

Expand Down Expand Up @@ -476,7 +476,7 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let duration = start.elapsed();
ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes);
ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes);
ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes);
register_block_gauge(&job, &attributes)?;
Ok(())
}

Expand All @@ -497,6 +497,21 @@ pub async fn handle_job_failure(id: Uuid, config: Arc<Config>) -> Result<(), Job
.await
}

fn register_block_gauge(job: &JobItem, attributes: &[KeyValue]) -> Result<(), JobError> {
let block_number = if let JobType::StateTransition = job.job_type {
parse_string(
job.external_id
.unwrap_string()
.map_err(|e| JobError::Other(OtherError::from(format!("Could not parse string: {e}"))))?,
)
} else {
parse_string(&job.internal_id)
}?;

ORCHESTRATOR_METRICS.block_gauge.record(block_number, attributes);
Ok(())
}

async fn move_job_to_failed(job: &JobItem, config: Arc<Config>, reason: String) -> Result<(), JobError> {
if job.status == JobStatus::Completed {
tracing::error!(job_id = ?job.id, job_status = ?job.status, "Invalid state exists on DL queue");
Expand Down

0 comments on commit 15b321b

Please sign in to comment.