Skip to content

Commit

Permalink
WIP: debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
thedodd committed May 20, 2024
1 parent b7bee58 commit 113a494
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,17 +318,21 @@ impl BrokerTask {
tracing::trace!("handling broker response for request {}", resp.correlation_id);
let Some(pending) = self.requests.remove(&resp.correlation_id) else { return };
let header_version = pending.api_key.response_header_version(pending.api_version);
let Ok(response_header) = ResponseHeader::decode(&mut resp.body, header_version) else {
if let Some(chan) = pending.chan {
chan.send(BrokerResponse {
id: pending.id,
result: Err(BrokerRequestError {
payload: pending.request.kind,
kind: BrokerErrorKind::MalformedBrokerResponse,
}),
});
let response_header = match ResponseHeader::decode(&mut resp.body, header_version) {
Ok(response_header) => response_header,
Err(err) => {
tracing::error!(error = ?err, "error decoding response header from broker");
if let Some(chan) = pending.chan {
chan.send(BrokerResponse {
id: pending.id,
result: Err(BrokerRequestError {
payload: pending.request.kind,
kind: BrokerErrorKind::MalformedBrokerResponse,
}),
});
}
return;
}
return;
};

// Decode body based on API key.
Expand Down Expand Up @@ -403,17 +407,21 @@ impl BrokerTask {
ApiKey::ListTransactionsKey => ListTransactionsResponse::decode(&mut resp.body, pending.api_version).map(ResponseKind::ListTransactionsResponse),
ApiKey::AllocateProducerIdsKey => AllocateProducerIdsResponse::decode(&mut resp.body, pending.api_version).map(ResponseKind::AllocateProducerIdsResponse),
};
let Ok(response_body) = res else {
if let Some(chan) = pending.chan {
chan.send(BrokerResponse {
id: pending.id,
result: Err(BrokerRequestError {
payload: pending.request.kind,
kind: BrokerErrorKind::MalformedBrokerResponse,
}),
});
let response_body = match res {
Ok(response_body) => response_body,
Err(err) => {
tracing::error!(error = ?err, api_key = ?pending.api_key, "error decoding response body from broker");
if let Some(chan) = pending.chan {
chan.send(BrokerResponse {
id: pending.id,
result: Err(BrokerRequestError {
payload: pending.request.kind,
kind: BrokerErrorKind::MalformedBrokerResponse,
}),
});
}
return;
}
return;
};

// If this is an API versions response, always update our local cache of version info.
Expand Down

0 comments on commit 113a494

Please sign in to comment.