Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added header values #51

Merged
merged 13 commits into from
Jun 4, 2024
Merged

Conversation

shubhamdixit863
Copy link
Contributor

This Pr Addresses this issue #48

map ,reduce and sourcetransform proto files have been modified to include header in incoming request.

How it is tested
Tested in local cluster

Signed-off-by: shubham <shubhamdixit863@gmail.com>
Signed-off-by: shubham <shubhamdixit863@gmail.com>
Signed-off-by: shubham <shubhamdixit863@gmail.com>
Copy link
Contributor

@yhl25 yhl25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add support to set headers inside user defined source.

Signed-off-by: shubham <shubhamdixit863@gmail.com>
Signed-off-by: shubham <shubhamdixit863@gmail.com>
@yhl25 yhl25 requested a review from vigith May 28, 2024 14:46
Comment on lines 55 to 58
let offset = self.read_idx.load(Ordering::Relaxed);

let mut headers=HashMap::new();
headers.insert(String::from("key"),String::from("key"));
// send the message to the transmitter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use some UUID instead of (key, key)

Comment on lines 66 to 69
event_time: chrono::offset::Utc::now(),
keys: vec![],
headers:headers.clone()
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to clone?

Copy link
Contributor Author

@shubhamdixit863 shubhamdixit863 May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah cloning is not good ,but sadly due to ownership rules i dnt have any other option except cloning or using Arc ,I have used Arc as of now its thread safe too @yhl25

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we just give the ownership? are we using the headers anywhere else?

Copy link
Contributor Author

@shubhamdixit863 shubhamdixit863 May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem is every for loop iteration will be new scope ,if there was only one iteration it would have worked. @yhl25

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missed the for loop.

Signed-off-by: shubham <shubhamdixit863@gmail.com>
Signed-off-by: shubham <shubhamdixit863@gmail.com>
Copy link
Member

@vigith vigith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run rustfmt please

@@ -52,7 +54,9 @@ pub(crate) mod simple_source {
self.read_idx
.store(self.read_idx.load(Ordering::Relaxed) + 1, Ordering::Relaxed);
let offset = self.read_idx.load(Ordering::Relaxed);

let mut headers=HashMap::new();
headers.insert(String::from( Uuid::new_v4()),String::from( Uuid::new_v4()));
Copy link
Member

@vigith vigith May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is the both key and value, UUID?

src/source.rs Outdated
@@ -320,6 +325,9 @@ mod tests {
async fn read(&self, request: SourceReadRequest, transmitter: Sender<Message>) {
let event_time = Utc::now();
let mut message_offsets = Vec::with_capacity(request.count);
let mut headers=HashMap::new();
headers.insert(String::from( Uuid::new_v4()),String::from( Uuid::new_v4()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

src/source.rs Outdated
@@ -332,6 +340,7 @@ mod tests {
partition_id: 0,
},
keys: vec![],
headers:Arc::clone(&shared_headers), // Cloning the Arc, not the HashMap,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
headers:Arc::clone(&shared_headers), // Cloning the Arc, not the HashMap,
headers:Arc::clone(&shared_headers),

no need of that comment since we can clearly see it is Arc::clone

Signed-off-by: shubham <shubhamdixit863@gmail.com>
Signed-off-by: shubham <shubhamdixit863@gmail.com>
Signed-off-by: shubham <shubhamdixit863@gmail.com>
Comment on lines 57 to 60
let mut headers = HashMap::new();
let header_key=String::from(Uuid::new_v4());
let header_value= String::from("numaflow");
headers.insert(header_key, header_value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let mut headers = HashMap::new();
let header_key=String::from(Uuid::new_v4());
let header_value= String::from("numaflow");
headers.insert(header_key, header_value);
let mut headers = HashMap::new();
let header_key=String::from("x-txn-id");
let header_value= String::from(Uuid::new_v4());
headers.insert(header_key, header_value);

src/source.rs Outdated
Comment on lines 330 to 333
let mut headers = HashMap::new();
let header_key=String::from(Uuid::new_v4());
let header_value = String::from("numaflow");
headers.insert(header_key, header_value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let mut headers = HashMap::new();
let header_key=String::from(Uuid::new_v4());
let header_value = String::from("numaflow");
headers.insert(header_key, header_value);
let mut headers = HashMap::new();
let header_key=String::from("x-txn-id");
let header_value = String::from(Uuid::new_v4());
headers.insert(header_key, header_value);

also move the headers inside the for loop, ideally each message will have different txn-id

Signed-off-by: shubham <shubhamdixit863@gmail.com>
Signed-off-by: shubham <shubhamdixit863@gmail.com>
@yhl25 yhl25 requested a review from vigith June 4, 2024 16:21
@vigith vigith merged commit d8533ce into numaproj:main Jun 4, 2024
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants