From 6835bfc66503b8f9fb456be7dd1851b53a7fc4a1 Mon Sep 17 00:00:00 2001 From: Pavel Borzenkov Date: Mon, 29 Jul 2024 15:18:41 +0200 Subject: [PATCH] corro-client: handle deserialization error correctly Change ID still needs to be updated even if the event fails to deserialize --- crates/corro-client/src/sub.rs | 57 ++++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 13 deletions(-) diff --git a/crates/corro-client/src/sub.rs b/crates/corro-client/src/sub.rs index cb7039dd..25fb4bcd 100644 --- a/crates/corro-client/src/sub.rs +++ b/crates/corro-client/src/sub.rs @@ -8,7 +8,7 @@ use std::{ }; use bytes::{Buf, Bytes, BytesMut}; -use corro_api_types::{ChangeId, TypedQueryEvent}; +use corro_api_types::{ChangeId, QueryEvent, TypedQueryEvent}; use futures::{ready, Future, Stream}; use hyper::{client::HttpConnector, Body}; use pin_project_lite::pin_project; @@ -144,24 +144,34 @@ where Some(Ok(b)) => match serde_json::from_slice(&b) { Ok(evt) => { if let TypedQueryEvent::EndOfQuery { change_id, .. } = &evt { - self.observed_eoq = true; - self.last_change_id = *change_id; + self.handle_eoq(*change_id); } + if let TypedQueryEvent::Change(_, _, _, change_id) = &evt { - match self.last_change_id { - Some(id) if id + 1 != *change_id => { - return Poll::Ready(Some(Err(SubscriptionError::MissedChange { - expected: id + 1, - got: *change_id, - }))) - } - _ => (), + if let Err(e) = self.handle_change(*change_id) { + return Poll::Ready(Some(Err(e))); } - self.last_change_id = Some(*change_id); } + Poll::Ready(Some(Ok(evt))) } - Err(e) => Poll::Ready(Some(Err(e.into()))), + Err(deser_err) => { + // It failed to deserialize, try untyped variant to extract the metadata + if let Ok(evt) = serde_json::from_slice::(&b) { + if let TypedQueryEvent::EndOfQuery { change_id, .. } = &evt { + self.handle_eoq(*change_id); + } + + if let TypedQueryEvent::Change(_, _, _, change_id) = &evt { + if let Err(e) = self.handle_change(*change_id) { + return Poll::Ready(Some(Err(e))); + } + } + } + + // But return the original error anyway (unless this is out-of-order event) + Poll::Ready(Some(Err(deser_err.into()))) + } }, Some(Err(e)) => match e { LinesCodecError::MaxLineLengthExceeded => { @@ -173,6 +183,27 @@ where } } + fn handle_eoq(&mut self, change_id: Option) { + self.observed_eoq = true; + self.last_change_id = change_id; + } + + fn handle_change(&mut self, change_id: ChangeId) -> Result<(), SubscriptionError> { + match self.last_change_id { + Some(id) if id + 1 != change_id => { + return Err(SubscriptionError::MissedChange { + expected: id + 1, + got: change_id, + }) + } + _ => (), + } + + self.last_change_id = Some(change_id); + + Ok(()) + } + fn poll_request( mut self: Pin<&mut Self>, cx: &mut Context<'_>,