diff --git a/examples/iot-service/backend/src/temperature_production.rs b/examples/iot-service/backend/src/temperature_production.rs index 043f1d4..d400deb 100644 --- a/examples/iot-service/backend/src/temperature_production.rs +++ b/examples/iot-service/backend/src/temperature_production.rs @@ -34,8 +34,6 @@ pub async fn task( let (consumer_filters, consumer_filters_receiver) = watch::channel(vec![]); let (grpc_producer, grpc_producer_receiver) = mpsc::channel(10); - let grpc_flow = GrpcEventFlow::new(entity_type.clone(), grpc_producer); - let (_task_kill_switch, task_kill_switch_receiver) = oneshot::channel(); let task_entity_type = entity_type.clone(); @@ -60,7 +58,7 @@ pub async fn task( let source_provider = CommitLogSourceProvider::new( commit_log, EventEnvelopeMarshaler { - entity_type, + entity_type: entity_type.clone(), events_key_secret_path: Arc::from(events_key_secret_path), secret_store: secret_store.clone(), }, @@ -92,6 +90,7 @@ pub async fn task( // gRPC events to a remote consumer. The handler is a "flowing" one // where an upper limit of the number of envelopes in-flight is set. + let grpc_flow = GrpcEventFlow::new(entity_type, grpc_producer); let producer_filter = |_: &CommitLogEventEnvelope| true; let consumer_filter = Filter::default(); let event_handler = grpc_flow.handler( @@ -100,6 +99,7 @@ pub async fn task( consumer_filter, transformer, ); + akka_projection_rs_storage::run( &secret_store, &offsets_key_secret_path,