Skip to content

Commit

Permalink
chore: review and FIXME
Browse files Browse the repository at this point in the history
  • Loading branch information
vigith committed Jun 3, 2024
1 parent 786a72e commit 2370fd7
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use std::path::PathBuf;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc::{channel, Sender};
use tokio_stream::StreamExt;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::{async_trait, Request, Response, Status};

use crate::error::Error;
Expand Down Expand Up @@ -322,7 +322,6 @@ where
.send(Err(error.clone().into()))
.await
.expect("send to response channel failed");
panic!("Error while performing reduce operation: {:?}", error);
}
});

Expand All @@ -346,10 +345,11 @@ where
if task_set.tasks.contains_key(&keys.join(KEY_JOIN_DELIMITER)) {
task_set.write_to_task(keys, rr).await;
} else {
task_set.create(keys, rr).await;
task_set.create_and_write(keys, rr).await;
}
}
Err(e) => {
// FIXME: who is receiving this error?
return Err(Status::internal(format!("Stream error: {}", e)));
}
}
Expand Down Expand Up @@ -488,7 +488,7 @@ where

/// Creates a new task with the given keys and `ReduceRequest`.
/// It creates a new reducer, starts it in a new task, and adds the task to the task set.
async fn create(&mut self, keys: Vec<String>, rr: proto::ReduceRequest) {
async fn create_and_write(&mut self, keys: Vec<String>, rr: proto::ReduceRequest) {
let (reduce_request, interval_window) = match self.validate_and_extract(rr).await {
Some(value) => value,
None => return,
Expand Down Expand Up @@ -722,15 +722,15 @@ impl<C> Server<C> {

#[cfg(test)]
mod tests {
use std::{error::Error, time::Duration};
use std::path::PathBuf;
use std::{error::Error, time::Duration};

use prost_types::Timestamp;
use tempfile::TempDir;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Request;
use tonic::transport::Uri;
use tonic::Request;
use tower::service_fn;

use crate::reduce;
Expand Down

0 comments on commit 2370fd7

Please sign in to comment.