Skip to content

Commit

Permalink
rustfmt
Browse files Browse the repository at this point in the history
  • Loading branch information
brayniac committed Oct 19, 2023
1 parent 7b33373 commit f0b42a2
Showing 1 changed file with 18 additions and 20 deletions.
38 changes: 18 additions & 20 deletions src/pubsub/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,27 +162,25 @@ async fn subscriber_task(client: Arc<StreamConsumer>, topics: Vec<String>) {
while RUNNING.load(Ordering::Relaxed) {
match client.recv().await {
Ok(message) => match message.payload_view::<[u8]>() {
Some(Ok(message)) => {
match validator.validate(&mut message.to_owned()) {
Err(ValidationError::Unexpected) => {
error!("pubsub: invalid message received");
RESPONSE_EX.increment();
PUBSUB_RECEIVE_INVALID.increment();
continue;
}
Err(ValidationError::Corrupted) => {
error!("pubsub: corrupt message received");
PUBSUB_RECEIVE.increment();
PUBSUB_RECEIVE_CORRUPT.increment();
continue;
}
Ok(latency) => {
let _ = PUBSUB_LATENCY.increment(latency);
PUBSUB_RECEIVE.increment();
PUBSUB_RECEIVE_OK.increment();
}
Some(Ok(message)) => match validator.validate(&mut message.to_owned()) {
Err(ValidationError::Unexpected) => {
error!("pubsub: invalid message received");
RESPONSE_EX.increment();
PUBSUB_RECEIVE_INVALID.increment();
continue;
}
}
Err(ValidationError::Corrupted) => {
error!("pubsub: corrupt message received");
PUBSUB_RECEIVE.increment();
PUBSUB_RECEIVE_CORRUPT.increment();
continue;
}
Ok(latency) => {
let _ = PUBSUB_LATENCY.increment(latency);
PUBSUB_RECEIVE.increment();
PUBSUB_RECEIVE_OK.increment();
}
},
Some(Err(e)) => {
error!("Error in deserializing the message:{:?}", e);
PUBSUB_RECEIVE.increment();
Expand Down

0 comments on commit f0b42a2

Please sign in to comment.