From e44d8d12ae18bad1185966df0d96474c25d8fad5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jochen=20G=C3=B6rtler?= Date: Thu, 2 Jan 2025 14:43:19 +0100 Subject: [PATCH] feat: add `rpc` calls to remove recordings from catalog --- .../proto/rerun/v0/remote_store.proto | 15 ++ .../re_protos/src/v0/rerun.remote_store.v0.rs | 177 ++++++++++++++++++ 2 files changed, 192 insertions(+) diff --git a/crates/store/re_protos/proto/rerun/v0/remote_store.proto b/crates/store/re_protos/proto/rerun/v0/remote_store.proto index 5d2f68ca37940..a2a54aae468c5 100644 --- a/crates/store/re_protos/proto/rerun/v0/remote_store.proto +++ b/crates/store/re_protos/proto/rerun/v0/remote_store.proto @@ -12,10 +12,25 @@ service StorageNode { // metadata API calls rpc QueryCatalog(QueryCatalogRequest) returns (stream DataframePart) {} rpc UpdateCatalog(UpdateCatalogRequest) returns (UpdateCatalogResponse) {} + + rpc Remove(RemoveRecordingRequest) returns (RemoveRecordingResponse) {} + rpc RemoveAll(RemoveAllRecordingRequest) returns (RemoveAllRecordingResponse) {} + // TODO(zehiko) support registering more than one recording at a time rpc RegisterRecording(RegisterRecordingRequest) returns (DataframePart) {} } +// ---------------- Remove from catalog ------------------ + +message RemoveRecordingRequest { + // unique identifier of the recording + rerun.common.v0.RecordingId recording_id = 1; +} +message RemoveRecordingResponse {} + +message RemoveAllRecordingRequest {} +message RemoveAllRecordingResponse {} + // ---------------- Common response message ------------------ // DataframePart is arrow IPC encoded RecordBatch diff --git a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs index a755b57daac49..6d8873e38a983 100644 --- a/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.remote_store.v0.rs @@ -1,4 +1,56 @@ // This file is @generated by prost-build. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RemoveRecordingRequest { + /// unique identifier of the recording + #[prost(message, optional, tag = "1")] + pub recording_id: ::core::option::Option, +} +impl ::prost::Name for RemoveRecordingRequest { + const NAME: &'static str = "RemoveRecordingRequest"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.RemoveRecordingRequest".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.RemoveRecordingRequest".into() + } +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct RemoveRecordingResponse {} +impl ::prost::Name for RemoveRecordingResponse { + const NAME: &'static str = "RemoveRecordingResponse"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.RemoveRecordingResponse".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.RemoveRecordingResponse".into() + } +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct RemoveAllRecordingRequest {} +impl ::prost::Name for RemoveAllRecordingRequest { + const NAME: &'static str = "RemoveAllRecordingRequest"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.RemoveAllRecordingRequest".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.RemoveAllRecordingRequest".into() + } +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct RemoveAllRecordingResponse {} +impl ::prost::Name for RemoveAllRecordingResponse { + const NAME: &'static str = "RemoveAllRecordingResponse"; + const PACKAGE: &'static str = "rerun.remote_store.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.remote_store.v0.RemoveAllRecordingResponse".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.remote_store.v0.RemoveAllRecordingResponse".into() + } +} /// DataframePart is arrow IPC encoded RecordBatch #[derive(Clone, PartialEq, ::prost::Message)] pub struct DataframePart { @@ -444,6 +496,43 @@ pub mod storage_node_client { )); self.inner.unary(req, path, codec).await } + pub async fn remove( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = + http::uri::PathAndQuery::from_static("/rerun.remote_store.v0.StorageNode/Remove"); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "rerun.remote_store.v0.StorageNode", + "Remove", + )); + self.inner.unary(req, path, codec).await + } + pub async fn remove_all( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/rerun.remote_store.v0.StorageNode/RemoveAll", + ); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "rerun.remote_store.v0.StorageNode", + "RemoveAll", + )); + self.inner.unary(req, path, codec).await + } /// TODO(zehiko) support registering more than one recording at a time pub async fn register_recording( &mut self, @@ -514,6 +603,14 @@ pub mod storage_node_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + async fn remove( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn remove_all( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; /// TODO(zehiko) support registering more than one recording at a time async fn register_recording( &self, @@ -762,6 +859,86 @@ pub mod storage_node_server { }; Box::pin(fut) } + "/rerun.remote_store.v0.StorageNode/Remove" => { + #[allow(non_camel_case_types)] + struct RemoveSvc(pub Arc); + impl tonic::server::UnaryService for RemoveSvc { + type Response = super::RemoveRecordingResponse; + type Future = BoxFuture, tonic::Status>; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = + async move { ::remove(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = RemoveSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/rerun.remote_store.v0.StorageNode/RemoveAll" => { + #[allow(non_camel_case_types)] + struct RemoveAllSvc(pub Arc); + impl + tonic::server::UnaryService + for RemoveAllSvc + { + type Response = super::RemoveAllRecordingResponse; + type Future = BoxFuture, tonic::Status>; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::remove_all(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = RemoveAllSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/rerun.remote_store.v0.StorageNode/RegisterRecording" => { #[allow(non_camel_case_types)] struct RegisterRecordingSvc(pub Arc);