Skip to content

Commit

Permalink
feat: add Kinesis integration & support (#603)
Browse files Browse the repository at this point in the history
AWS Kinesis doesn't allow out of the box Auth header support,
instead all the custom (user provided) headers are sent under
the header `x-amz-firehose-common-attributes`, with value as
{"commonAttributes":{"Authorization":"Basic <<base64 encoded 
credentials","X-P-Stream":"<>","Content-Type":"application/json"}}

This PR adds a mechanism to look for the kinesis custom header
in the actix middleware and update the contained headers to proper
top level headers

fixes #602
  • Loading branch information
nikhilsinhaparseable authored Jan 8, 2024
1 parent ce58c6c commit 238b9ae
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 0 deletions.
2 changes: 2 additions & 0 deletions server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub mod livetail;
const PREFIX_TAGS: &str = "x-p-tag-";
const PREFIX_META: &str = "x-p-meta-";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const AUTHORIZATION_KEY: &str = "authorization";
const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes";
const SEPARATOR: char = '^';

const OIDC_SCOPE: &str = "openid profile email";
Expand Down
45 changes: 45 additions & 0 deletions server/src/handlers/http/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,35 @@ use std::future::{ready, Ready};
use actix_web::{
dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform},
error::{ErrorBadRequest, ErrorForbidden, ErrorUnauthorized},
http::header::{self, HeaderName},
Error, Route,
};
use futures_util::future::LocalBoxFuture;

use crate::handlers::{AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, STREAM_NAME_HEADER_KEY};
use crate::{
option::CONFIG,
rbac::Users,
rbac::{self, role::Action},
utils::actix::extract_session_key,
};

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct Message {
#[serde(rename = "commonAttributes")]
common_attributes: CommonAttributes,
}

#[derive(Serialize, Deserialize, Debug)]
struct CommonAttributes {
#[serde(rename = "Authorization")]
authorization: String,
#[serde(rename = "X-P-Stream")]
x_p_stream: String,
}

pub trait RouteExt {
fn authorize(self, action: Action) -> Self;
fn authorize_for_stream(self, action: Action) -> Self;
Expand Down Expand Up @@ -108,6 +126,33 @@ where
forward_ready!(service);

fn call(&self, mut req: ServiceRequest) -> Self::Future {
/*Below section is added to extract the Authorization and X-P-Stream headers from x-amz-firehose-common-attributes custom header
when request is made from Kinesis Firehose.
For requests made from other clients, no change.
## Section start */
if let Some((_, kinesis_common_attributes)) = req
.request()
.headers()
.iter()
.find(|&(key, _)| key == KINESIS_COMMON_ATTRIBUTES_KEY)
{
let attribute_value: &str = kinesis_common_attributes.to_str().unwrap();
let message: Message = serde_json::from_str(attribute_value).unwrap();
req.headers_mut().insert(
HeaderName::from_static(AUTHORIZATION_KEY),
header::HeaderValue::from_str(&message.common_attributes.authorization.clone())
.unwrap(),
);
req.headers_mut().insert(
HeaderName::from_static(STREAM_NAME_HEADER_KEY),
header::HeaderValue::from_str(&message.common_attributes.x_p_stream.clone())
.unwrap(),
);
}

/* ## Section end */

let auth_result: Result<_, Error> = (self.auth_method)(&mut req, self.action);
let fut = self.service.call(req);
Box::pin(async move {
Expand Down

0 comments on commit 238b9ae

Please sign in to comment.