Skip to content

Commit

Permalink
Merge pull request #54 from w6d-io/develop
Browse files Browse the repository at this point in the history
ADD] send error to kafka + some fix
  • Loading branch information
nhaquet-w6d authored Feb 19, 2024
2 parents 0d4b3c0 + 9fa9ddb commit 13c6bb1
Show file tree
Hide file tree
Showing 16 changed files with 963 additions and 662 deletions.
823 changes: 497 additions & 326 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 8 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ edition = "2021"
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
axum = "0.6.*"
axum = "0.7.*"
tower = "0.4.13"
tower-http = { version = "0.4.4", features = ["request-id"] }
tokio = { version = "1.34.*", features = ["rt-multi-thread", "macros", "sync"]}
tower-http = { version = "0.5.1", features = ["request-id", "trace"] }
tokio = { version = "1.36.*", features = ["rt-multi-thread", "macros", "sync"]}
serde = "1.0.*"
serde_json = "1.0.*"
rs-utils = {git = "https://github.com/w6d-io/rs-utils",features = ["kratos", "anyhow-rocket"]}
Expand All @@ -22,20 +22,22 @@ hyper = "0.14.23"
ory-kratos-client = "1.0.0"
futures = "0.3.26"
thiserror = "1.0.38"
axum-extra = { version = "0.8.0", features = ["cookie"] }
tonic = "0.10.*"
axum-extra = { version = "0.9.2", features = ["cookie"] }
tonic = "0.11.*"
prost = "0.12.*"
reqwest = "0.11.14"
serde-email = "3.0.0"
uuid = { version = "^1.5", features = ["serde"] }
stream-cancel = "0.8.2"
axum-macros = "0.4.1"

[dependencies.libkafka]
git = "https://github.com/w6d-io/libkafka"
branch = "develop"
features = ["async", "anyhow"]

[build-dependencies]
tonic-build = "0.10.*"
tonic-build = "0.11.*"

[dev-dependencies]
mime = "0.3.17"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.73-bullseye AS build
FROM rust:1.75-bullseye AS build
ARG JOB_TOKEN
ARG JOB_USER
ENV CARGO_NET_GIT_FETCH_WITH_CLI true
Expand Down
67 changes: 38 additions & 29 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{
collections::HashMap,
fmt,
path::{Path, PathBuf},
sync::Arc,
};

use anyhow::{bail, Result};
Expand All @@ -25,47 +26,50 @@ pub const CONFIG_FALLBACK: &str = "test/config.toml";
///structure containing kafka consumer data
#[derive(Deserialize, Clone, Default)]
pub struct Producer {
pub broker: String,
pub topic: String,
pub topics: Vec<String>,

#[serde(skip)]
pub client: Option<KafkaProducer<FutureProducer, DefaultFutureContext>>,
pub clients: Option<HashMap<String, Arc<KafkaProducer<FutureProducer, DefaultFutureContext>>>>,
}

impl fmt::Debug for Producer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let client = match self.client {
Some(_) => "Some(_)",
None => "None",
f.debug_struct("Producer")
.field("topics", &self.topics)
.finish_non_exhaustive()
}
}

impl Producer {
///update the producer Producers if needed.
pub fn update(&mut self, broker: &str) -> Result<()> {
let mut new_producer = HashMap::new();
let producer = match self.clients {
Some(ref mut prod) => prod,
None => &mut new_producer,
};
write!(
f,
"Consumer: {{
brokers: {},
topic: {},
client: {}
}}",
self.broker, self.topic, client
)
for topic in self.topics.iter() {
producer.insert(
topic.to_owned(),
Arc::new(KafkaProducer::<FutureProducer, DefaultFutureContext>::new(
&default_config(broker),
topic,
)?),
);
}
Ok(())
}
}

#[derive(Deserialize, Clone, Default, Debug)]
pub struct Kafka {
pub producers: HashMap<String, Producer>,
pub broker: String,
pub producers: Producer,
}

impl Kafka {
fn update(&mut self) -> Result<&mut Self> {
let producers = &mut self.producers;
for producer in producers.values_mut() {
let new_producer: KafkaProducer<FutureProducer, DefaultFutureContext> =
KafkaProducer::<FutureProducer, DefaultFutureContext>::new(
&default_config(&producer.broker),
&producer.topic,
)?;
producer.client = Some(new_producer);
}
self.producers.update(&self.broker)?;
Ok(self)
}
}
Expand All @@ -89,14 +93,19 @@ pub struct Iam {
pub client: Option<IamClient<Channel>>,
}

#[derive(Deserialize, Clone, Default, Debug)]
pub struct Opa {
pub addr: String,
pub mode: String,
}

///structure containing the configuaration of the application
#[derive(Deserialize, Clone, Default, Debug)]
pub struct SiriusConfig {
// pub prefix: String,
pub mode: String,
pub service: Service,
pub iam: Iam,
pub opa: String,
pub opa: Opa,
pub kratos: Kratos,
pub kafka: Kafka,
#[serde(skip)]
Expand Down Expand Up @@ -149,15 +158,15 @@ mod test_config {
#[tokio::test]
async fn test_update_valid() {
let mut config = SiriusConfig::default();
config.set_path("test/config.toml");
config.set_path("tests/config.toml");
let res = config.update().await;
assert!(res.is_ok())
}

#[tokio::test]
async fn test_update_not_valid() {
let mut config = SiriusConfig::default();
config.set_path("test/not_config.toml");
config.set_path("tests/not_config.toml");
let res = config.update().await;
assert!(res.is_err())
}
Expand Down
50 changes: 21 additions & 29 deletions src/controller/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tracing::{debug, error, info};

use crate::config::SiriusConfig;

fn populate_set(projects: &mut HashSet<String>, mut data: Value, request_id: &str) -> Result<()> {
fn populate_set(projects: &mut HashSet<String>, mut data: Value) -> Result<()> {
let data = data.take();
match data {
Value::Object(map) => {
Expand All @@ -28,37 +28,32 @@ fn populate_set(projects: &mut HashSet<String>, mut data: Value, request_id: &st
}
}
}
_ => bail!("{request_id}: This should be a map or an array!"),
_ => bail!("This should be a map or an array!"),
}
Ok(())
}

fn extract_projects(
projects: &mut HashSet<String>,
mut data: Value,
request_id: &str,
) -> Result<()> {
fn extract_projects(projects: &mut HashSet<String>, mut data: Value) -> Result<()> {
debug!("{data:?}");
let data = data
.as_object_mut()
.ok_or_else(|| anyhow!("this should be a map!"))?;
if !data.is_empty() {
for (_, val) in data.into_iter() {
if let Some(proj) = val.get_mut("project") {
populate_set(projects, proj.take(), request_id)?;
populate_set(projects, proj.take())?;
}
}
}
Ok(())
}

pub async fn list_project_controller(
request_id: &str,
identity: Identity,
config: SiriusConfig,
config: &SiriusConfig,
) -> Result<HashSet<String>> {
let mut projects = HashSet::new();
let meta = match &config.mode as &str {
let meta = match &config.opa.mode as &str {
"admin" => identity.metadata_admin,
"public" => identity.metadata_public,
"trait" => identity.traits,
Expand All @@ -68,33 +63,32 @@ pub async fn list_project_controller(
let mut metadata = match meta {
Some(mut metadata) => metadata.take(),
None => {
error!("{request_id}: no metadata in this user!");
bail!("{request_id}: no metadata in this user!")
error!("no metadata in this user!");
bail!("no metadata in this user!")
}
};
if let Some(data) = metadata.get_mut("project") {
info!("{request_id}: extracting project from project");
populate_set(&mut projects, data.take(), request_id)?;
info!("extracting project from project");
populate_set(&mut projects, data.take())?;
}
if let Some(group) = metadata.get_mut("group") {
info!("{request_id}: extracting project from group");
extract_projects(&mut projects, group.take(), request_id)?;
info!("extracting project from group");
extract_projects(&mut projects, group.take())?;
}
if let Some(orga) = metadata.get_mut("organisation") {
info!("{request_id}: extracting project from orga");
extract_projects(&mut projects, orga.take(), request_id)?;
info!("extracting project from orga");
extract_projects(&mut projects, orga.take())?;
}
Ok(projects)
}

pub async fn list_controller(
request_id: &str,
identity: Identity,
data_type: &str,
config: SiriusConfig,
config: &SiriusConfig,
) -> Result<HashMap<String, String>> {
let mut projects = HashMap::new();
let meta = match &config.mode as &str {
let meta = match &config.opa.mode as &str {
"admin" => &identity.metadata_admin,
"public" => &identity.metadata_public,
"trait" => &identity.traits,
Expand All @@ -104,23 +98,21 @@ pub async fn list_controller(
let metadata = match meta {
Some(metadata) => metadata,
None => {
error!("{request_id}: no metadata in this user!");
bail!("{request_id}: no metadata in this user!")
error!("no metadata in this user!");
bail!("no metadata in this user!")
}
};
if let Some(data) = metadata.get(data_type) {
info!("{request_id}: estracting: {data_type}");
info!("estracting: {data_type}");
let data = data
.as_object()
.ok_or_else(|| anyhow!("this should be a map!"))?;
if !data.is_empty() {
for (uuid, map) in data.iter() {
let val = map
.get("name")
.ok_or_else(|| anyhow!("{request_id}: no name found !"))?;
let val = map.get("name").ok_or_else(|| anyhow!("no name found !"))?;
let name = val
.as_str()
.ok_or_else(|| anyhow!("{request_id}: this should be a string!"))?;
.ok_or_else(|| anyhow!("this should be a string!"))?;
projects.insert(uuid.to_owned(), name.to_owned());
}
}
Expand Down
Loading

0 comments on commit 13c6bb1

Please sign in to comment.