From 3556c6013f694045836ec39eb49b5368bf4abfe2 Mon Sep 17 00:00:00 2001 From: huanchao Date: Sat, 18 May 2024 13:54:07 +0000 Subject: [PATCH] [Agent] Supports configure L7 log blacklist --- agent/src/common/l7_protocol_info.rs | 4 + agent/src/config/config.rs | 19 +- agent/src/config/handler.rs | 189 +++++++++++++++- agent/src/flow_generator/protocol_logs/dns.rs | 59 ++++- .../flow_generator/protocol_logs/fastcgi.rs | 57 ++++- .../src/flow_generator/protocol_logs/http.rs | 132 ++++++++---- .../flow_generator/protocol_logs/mq/amqp.rs | 115 +++++++--- .../flow_generator/protocol_logs/mq/kafka.rs | 136 ++++++++---- .../flow_generator/protocol_logs/mq/mqtt.rs | 152 ++++++++----- .../flow_generator/protocol_logs/mq/nats.rs | 67 ++++-- .../protocol_logs/mq/openwire.rs | 65 ++++-- .../flow_generator/protocol_logs/mq/pulsar.rs | 61 ++++-- .../flow_generator/protocol_logs/mq/zmtp.rs | 64 ++++-- .../flow_generator/protocol_logs/parser.rs | 2 +- .../flow_generator/protocol_logs/rpc/brpc.rs | 73 +++++-- .../flow_generator/protocol_logs/rpc/dubbo.rs | 58 ++++- .../protocol_logs/rpc/sofa_rpc.rs | 48 ++++- .../flow_generator/protocol_logs/sql/mongo.rs | 63 ++++-- .../flow_generator/protocol_logs/sql/mysql.rs | 45 +++- .../protocol_logs/sql/oracle.rs | 69 +++--- .../protocol_logs/sql/postgresql.rs | 120 ++++++----- .../flow_generator/protocol_logs/sql/redis.rs | 50 ++++- agent/src/flow_generator/protocol_logs/tls.rs | 63 ++++-- agent/src/utils/environment/linux.rs | 2 +- server/agent_config/config.go | 204 +++++++++--------- server/agent_config/example.yaml | 25 +++ 26 files changed, 1436 insertions(+), 506 deletions(-) diff --git a/agent/src/common/l7_protocol_info.rs b/agent/src/common/l7_protocol_info.rs index 9ceb216976a..32e49b8458c 100644 --- a/agent/src/common/l7_protocol_info.rs +++ b/agent/src/common/l7_protocol_info.rs @@ -421,6 +421,10 @@ pub trait L7ProtocolInfoInterface: Into { fn get_request_resource_length(&self) -> usize { 0 } + + fn is_on_blacklist(&self) -> bool { + false + } } impl L7ProtocolInfo { diff --git a/agent/src/config/config.rs b/agent/src/config/config.rs index a0ca305cf31..7c4b3661bf6 100644 --- a/agent/src/config/config.rs +++ b/agent/src/config/config.rs @@ -490,6 +490,14 @@ pub struct L7ProtocolAdvancedFeatures { pub http_endpoint_extraction: HttpEndpointExtraction, pub obfuscate_enabled_protocols: Vec, pub extra_log_fields: ExtraLogFields, + pub unconcerned_dns_nxdomain_response_suffixes: Vec, +} + +#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq)] +pub struct L7LogBlacklist { + pub field_name: String, + pub operator: String, + pub value: String, } #[derive(Clone, Copy, Default, Debug, Deserialize, PartialEq, Eq)] @@ -577,6 +585,7 @@ pub struct YamlConfig { #[serde(rename = "l7-protocol-ports")] // hashmap pub l7_protocol_ports: HashMap, + pub l7_log_blacklist: HashMap>, pub npb_port: u16, // process and socket scan config pub os_proc_root: String, @@ -781,10 +790,11 @@ impl YamlConfig { { c.ebpf.java_symbol_file_refresh_defer_interval = Duration::from_secs(600) } - c.ebpf.off_cpu_profile.min_block = c.ebpf.off_cpu_profile.min_block.clamp( - Duration::from_micros(0), - Duration::from_micros(3600000000), - ); + c.ebpf.off_cpu_profile.min_block = c + .ebpf + .off_cpu_profile + .min_block + .clamp(Duration::from_micros(0), Duration::from_micros(3600000000)); if c.guard_interval < Duration::from_secs(1) || c.guard_interval > Duration::from_secs(3600) { @@ -979,6 +989,7 @@ impl Default for YamlConfig { (String::from("DNS"), String::from(Self::DEFAULT_DNS_PORTS)), (String::from("TLS"), String::from(Self::DEFAULT_TLS_PORTS)), ]), + l7_log_blacklist: HashMap::new(), ebpf: EbpfYamlConfig::default(), npb_port: NPB_DEFAULT_PORT, os_proc_root: "/proc".into(), diff --git a/agent/src/config/handler.rs b/agent/src/config/handler.rs index acc1f6d43de..a344cd10eb8 100755 --- a/agent/src/config/handler.rs +++ b/agent/src/config/handler.rs @@ -43,7 +43,7 @@ use sysinfo::SystemExt; use sysinfo::{CpuRefreshKind, RefreshKind, System}; use tokio::runtime::Runtime; -use super::config::{ExtraLogFields, OracleParseConfig}; +use super::config::{ExtraLogFields, L7LogBlacklist, OracleParseConfig}; #[cfg(any(target_os = "linux", target_os = "android"))] use super::{ config::EbpfYamlConfig, OsProcRegexp, OS_PROC_REGEXP_MATCH_ACTION_ACCEPT, @@ -84,6 +84,7 @@ use crate::{ }; use public::bitmap::Bitmap; +use public::l7_protocol::L7Protocol; use public::proto::{ common::TridentType, trident::{self, CaptureSocketType, Exception, IfMacSource, SocketType, TapMode}, @@ -692,6 +693,165 @@ impl From<&HttpEndpointExtraction> for HttpEndpointTrie { } } +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +enum Operator { + Equal, + Prefix, +} + +#[derive(Clone, Debug, Default, Eq, PartialEq)] +pub struct BlacklistTrieNode { + children: HashMap>, + operator: Option, +} + +impl BlacklistTrieNode { + pub fn is_on_blacklist(&self, input: &str) -> bool { + if input.is_empty() { + return false; + } + let mut node = self; + for c in input.chars() { + node = match node.children.get(&c) { + Some(child) => child, + None => return false, + }; + if let Some(op) = &node.operator { + if op == &Operator::Prefix { + return true; + } + } + } + // If we've reached the end of the input and the last node has an operator, + // it must be because we matched a complete word, not a prefix. + if let Some(o) = node.operator { + o == Operator::Equal + } else { + false + } + } +} + +#[derive(Clone, Debug, Default, Eq, PartialEq)] +pub struct BlacklistTrie { + pub endpoint: BlacklistTrieNode, + pub request_type: BlacklistTrieNode, + pub request_domain: BlacklistTrieNode, + pub request_resource: BlacklistTrieNode, +} + +impl BlacklistTrie { + // Currently, the following field names are supported: + const ENDPOINT: &'static str = "endpoint"; + const REQUEST_TYPE: &'static str = "request_type"; + const REQUEST_DOMAIN: &'static str = "request_domain"; + const REQUEST_RESOURCE: &'static str = "request_resource"; + + // Currently, the following matching operations are supported: + const EQUAL: &'static str = "equal"; + const PREFIX: &'static str = "prefix"; + + pub fn new(blacklists: &Vec) -> Option { + if blacklists.is_empty() { + return None; + } + + let mut b = BlacklistTrie::default(); + for i in blacklists.iter() { + b.insert(i); + } + Some(b) + } + + pub fn insert(&mut self, rule: &L7LogBlacklist) { + let mut node = match rule.field_name.to_ascii_lowercase().as_str() { + Self::ENDPOINT => &mut self.endpoint, + Self::REQUEST_TYPE => &mut self.request_type, + Self::REQUEST_DOMAIN => &mut self.request_domain, + Self::REQUEST_RESOURCE => &mut self.request_resource, + _ => { + warn!("Unsupported field_name: {}, only supports endpoint, request_type, request_domain, request_resource.", rule.field_name.as_str()); + return; + } + }; + + let operator = match rule.operator.to_ascii_lowercase().as_str() { + Self::EQUAL => Operator::Equal, + Self::PREFIX => Operator::Prefix, + _ => { + warn!( + "Unsupported operator: {}, only supports equal, prefix.", + rule.operator.as_str() + ); + return; + } + }; + + for ch in rule.value.chars() { + node = node + .children + .entry(ch) + .or_insert_with(|| Box::new(BlacklistTrieNode::default())); + } + node.operator = Some(operator); + } +} + +#[derive(Clone, Debug, Default, Eq, PartialEq)] +struct DnsNxdomainTrieNode { + children: HashMap>, + unconcerned: bool, +} + +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct DnsNxdomainTrie { + root: DnsNxdomainTrieNode, +} + +impl DnsNxdomainTrie { + pub fn insert(&mut self, rule: &String) { + let mut node = &mut self.root; + // the reversal is because what is matched is the suffix of the domain name + for ch in rule.chars().rev() { + node = node + .children + .entry(ch) + .or_insert_with(|| Box::new(DnsNxdomainTrieNode::default())); + } + node.unconcerned = true; + } + + pub fn is_unconcerned(&self, input: &str) -> bool { + if input.is_empty() { + return false; + } + let mut node = &self.root; + // the reversal is because what is matched is the suffix of the domain name + for c in input.chars().rev() { + match node.children.get(&c) { + Some(child) => { + if child.unconcerned { + return true; + } + node = child.as_ref(); + } + None => { + break; + } + } + } + false + } +} + +impl From<&Vec> for DnsNxdomainTrie { + fn from(v: &Vec) -> Self { + let mut t = Self::default(); + v.iter().for_each(|r| t.insert(r)); + t + } +} + #[derive(Clone, PartialEq, Eq)] pub struct LogParserConfig { pub l7_log_collect_nps_threshold: u64, @@ -702,6 +862,8 @@ pub struct LogParserConfig { pub http_endpoint_disabled: bool, pub http_endpoint_trie: HttpEndpointTrie, pub obfuscate_enabled_protocols: L7ProtocolBitmap, + pub l7_log_blacklist_trie: HashMap, + pub unconcerned_dns_nx_domain_trie: DnsNxdomainTrie, } impl Default for LogParserConfig { @@ -715,6 +877,8 @@ impl Default for LogParserConfig { http_endpoint_disabled: false, http_endpoint_trie: HttpEndpointTrie::new(), obfuscate_enabled_protocols: L7ProtocolBitmap::default(), + l7_log_blacklist_trie: HashMap::new(), + unconcerned_dns_nx_domain_trie: DnsNxdomainTrie::default(), } } } @@ -750,6 +914,11 @@ impl fmt::Debug for LogParserConfig { }) .collect::>(), ) + .field("l7_log_blacklist_trie", &self.l7_log_blacklist_trie) + .field( + "unconcerned_dns_nx_domain_trie", + &self.unconcerned_dns_nx_domain_trie, + ) .finish() } } @@ -1459,6 +1628,24 @@ impl TryFrom<(Config, RuntimeConfig)> for ModuleConfig { .l7_protocol_advanced_features .obfuscate_enabled_protocols, ), + l7_log_blacklist_trie: { + let mut blacklist_trie = HashMap::new(); + for (k, v) in conf.yaml_config.l7_log_blacklist.iter() { + let l7_protocol = L7Protocol::from(k.to_string()); + if l7_protocol == L7Protocol::Unknown { + warn!("Unsupported l7_protocol: {:?}", k); + continue; + } + BlacklistTrie::new(v).map(|x| blacklist_trie.insert(l7_protocol, x)); + } + blacklist_trie + }, + unconcerned_dns_nx_domain_trie: DnsNxdomainTrie::from( + &conf + .yaml_config + .l7_protocol_advanced_features + .unconcerned_dns_nxdomain_response_suffixes, + ), }, debug: DebugConfig { vtap_id: conf.vtap_id as u16, diff --git a/agent/src/flow_generator/protocol_logs/dns.rs b/agent/src/flow_generator/protocol_logs/dns.rs index 82a0895e04c..fa30e35f8f5 100644 --- a/agent/src/flow_generator/protocol_logs/dns.rs +++ b/agent/src/flow_generator/protocol_logs/dns.rs @@ -20,6 +20,7 @@ use super::pb_adapter::{ExtendedInfo, L7ProtocolSendLog, L7Request, L7Response}; use super::{consts::*, value_is_default, AppProtoHead, L7ResponseStatus, LogMessageType}; use crate::common::flow::L7PerfStats; use crate::common::l7_protocol_log::L7ParseResult; +use crate::config::handler::LogParserConfig; use crate::{ common::{ enums::IpProtocol, @@ -65,6 +66,9 @@ pub struct DnsInfo { #[serde(skip)] is_tls: bool, rrt: u64, + + #[serde(skip)] + is_on_blacklist: bool, } impl L7ProtocolInfoInterface for DnsInfo { @@ -104,6 +108,9 @@ impl L7ProtocolInfoInterface for DnsInfo { fn get_request_resource_length(&self) -> usize { self.query_name.len() } + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } impl DnsInfo { @@ -122,6 +129,9 @@ impl DnsInfo { } } self.captured_response_byte = other.captured_response_byte; + if other.is_on_blacklist { + self.is_on_blacklist = other.is_on_blacklist; + } } fn is_query_address(&self) -> bool { @@ -144,6 +154,15 @@ impl DnsInfo { _ => "", } } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::DNS) { + self.is_on_blacklist = t.request_resource.is_on_blacklist(&self.query_name) + || t.request_type.is_on_blacklist(self.get_domain_str()) + || t.request_domain.is_on_blacklist(&self.query_name) + || t.endpoint.is_on_blacklist(&self.query_name); + } + } } impl From for L7ProtocolSendLog { @@ -189,6 +208,7 @@ impl From for L7ProtocolSendLog { #[derive(Default)] pub struct DnsLog { perf_stats: Option, + last_is_on_blacklist: bool, } //解析器接口实现 @@ -206,11 +226,27 @@ impl L7ProtocolParserInterface for DnsLog { fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result { let mut info = DnsInfo::default(); self.parse(payload, &mut info, param)?; - info.cal_rrt(param).map(|rrt| { - info.rrt = rrt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); - }); info.is_tls = param.is_tls(); + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + if info.query_type == DNS_RESPONSE { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + if info.status == L7ResponseStatus::ClientError { + self.perf_stats.as_mut().map(|p| p.inc_req_err()); + } else if info.status == L7ResponseStatus::ServerError { + self.perf_stats.as_mut().map(|p| p.inc_resp_err()); + } + } else { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + info.cal_rrt(param).map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); + } + self.last_is_on_blacklist = info.is_on_blacklist; if param.parse_log { Ok(L7ParseResult::Single(L7ProtocolInfo::DnsInfo(info))) } else { @@ -444,10 +480,8 @@ impl DnsLog { if status_code == 0 { info.status = L7ResponseStatus::Ok; } else if status_code == 1 || status_code == 3 { - self.perf_stats.as_mut().map(|p| p.inc_req_err()); info.status = L7ResponseStatus::ClientError; } else { - self.perf_stats.as_mut().map(|p| p.inc_resp_err()); info.status = L7ResponseStatus::ServerError; } } @@ -487,11 +521,16 @@ impl DnsLog { g_offset = self.decode_resource_record(payload, g_offset, info)?; } - self.perf_stats.as_mut().map(|p| p.inc_resp()); - self.set_status(code, info); + let mut is_unconcerned = false; + if let Some(config) = param.parse_config { + is_unconcerned = config + .unconcerned_dns_nx_domain_trie + .is_unconcerned(&info.answers); + } + if !is_unconcerned { + self.set_status(code, info); + } info.msg_type = LogMessageType::Response; - } else { - self.perf_stats.as_mut().map(|p| p.inc_req()); } set_captured_byte!(info, param); diff --git a/agent/src/flow_generator/protocol_logs/fastcgi.rs b/agent/src/flow_generator/protocol_logs/fastcgi.rs index 03ebc4984c2..854e07d1ad3 100644 --- a/agent/src/flow_generator/protocol_logs/fastcgi.rs +++ b/agent/src/flow_generator/protocol_logs/fastcgi.rs @@ -23,7 +23,7 @@ use crate::common::flow::{L7PerfStats, PacketDirection}; use crate::common::l7_protocol_info::{L7ProtocolInfo, L7ProtocolInfoInterface}; use crate::common::l7_protocol_log::{L7ParseResult, L7ProtocolParserInterface, ParseParam}; use crate::common::meta_packet::EbpfFlags; -use crate::config::handler::L7LogDynamicConfig; +use crate::config::handler::{L7LogDynamicConfig, LogParserConfig}; use crate::flow_generator::protocol_logs::{set_captured_byte, value_is_default}; use crate::flow_generator::{Error, Result}; use crate::HttpLog; @@ -101,6 +101,9 @@ pub struct FastCGIInfo { #[serde(skip)] seq_off: u32, + + #[serde(skip)] + is_on_blacklist: bool, } impl L7ProtocolInfoInterface for FastCGIInfo { @@ -115,6 +118,9 @@ impl L7ProtocolInfoInterface for FastCGIInfo { self.captured_response_byte = info.captured_response_byte; super::swap_if!(self, trace_id, is_empty, info); super::swap_if!(self, span_id, is_empty, info); + if info.is_on_blacklist { + self.is_on_blacklist = info.is_on_blacklist; + } } Ok(()) @@ -147,6 +153,10 @@ impl L7ProtocolInfoInterface for FastCGIInfo { fn get_request_resource_length(&self) -> usize { self.path.len() } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } impl FastCGIInfo { @@ -246,6 +256,19 @@ impl FastCGIInfo { Ok(()) } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::FastCGI) { + self.is_on_blacklist = t.request_resource.is_on_blacklist(&self.path) + || t.request_type.is_on_blacklist(&self.method) + || t.request_domain.is_on_blacklist(&self.host) + || self + .endpoint + .as_ref() + .map(|p| t.endpoint.is_on_blacklist(p)) + .unwrap_or_default(); + } + } } impl From for L7ProtocolSendLog { @@ -326,6 +349,7 @@ impl FastCGIRecord { #[derive(Default)] pub struct FastCGILog { perf_stats: Option, + last_is_on_blacklist: bool, } impl FastCGILog { @@ -334,12 +358,10 @@ impl FastCGILog { && status_code <= HTTP_STATUS_CLIENT_ERROR_MAX { // http客户端请求存在错误 - self.perf_stats.as_mut().map(|p| p.inc_req_err()); info.status = L7ResponseStatus::ClientError; } else if status_code >= HTTP_STATUS_SERVER_ERROR_MIN && status_code <= HTTP_STATUS_SERVER_ERROR_MAX { - self.perf_stats.as_mut().map(|p| p.inc_resp_err()); info.status = L7ResponseStatus::ServerError; } else { info.status = L7ResponseStatus::Ok; @@ -442,7 +464,6 @@ impl L7ProtocolParserInterface for FastCGILog { if info.method.is_empty() { return Err(Error::L7ProtocolUnknown); } - self.perf_stats.as_mut().map(|p| p.inc_req()); } PacketDirection::ServerToClient => { info.msg_type = LogMessageType::Response; @@ -500,15 +521,33 @@ impl L7ProtocolParserInterface for FastCGILog { if info.status_code.is_none() { return Err(Error::L7ProtocolUnknown); } - self.perf_stats.as_mut().map(|p| p.inc_resp()); } } - info.cal_rrt(param).map(|rrt| { - info.rrt = rrt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); - }); info.is_tls = param.is_tls(); set_captured_byte!(info, param); + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + if info.status == L7ResponseStatus::ClientError { + self.perf_stats.as_mut().map(|p| p.inc_req_err()); + } else if info.status == L7ResponseStatus::ServerError { + self.perf_stats.as_mut().map(|p| p.inc_resp_err()); + } + } + } + info.cal_rrt(param).map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); + } + self.last_is_on_blacklist = info.is_on_blacklist; Ok(L7ParseResult::Single(L7ProtocolInfo::FastCGIInfo(info))) } diff --git a/agent/src/flow_generator/protocol_logs/http.rs b/agent/src/flow_generator/protocol_logs/http.rs index 443bfbe12c5..01641acf907 100755 --- a/agent/src/flow_generator/protocol_logs/http.rs +++ b/agent/src/flow_generator/protocol_logs/http.rs @@ -218,6 +218,9 @@ pub struct HttpInfo { #[serde(skip)] attributes: Vec, + + #[serde(skip)] + is_on_blacklist: bool, } impl HttpInfo { @@ -334,12 +337,19 @@ impl L7ProtocolInfoInterface for HttpInfo { fn get_request_resource_length(&self) -> usize { self.path.len() } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } impl HttpInfo { pub fn merge(&mut self, other: &mut Self) -> Result<()> { let other_is_grpc = other.is_grpc(); + if other.is_on_blacklist { + self.is_on_blacklist = other.is_on_blacklist; + } match other.msg_type { // merge with request LogMessageType::Request => { @@ -431,6 +441,19 @@ impl HttpInfo { } None } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&self.proto) { + self.is_on_blacklist = t.request_resource.is_on_blacklist(&self.path) + || t.request_type.is_on_blacklist(self.method.as_str()) + || t.request_domain.is_on_blacklist(&self.host) + || self + .endpoint + .as_ref() + .map(|p| t.endpoint.is_on_blacklist(p)) + .unwrap_or_default(); + } + } } impl From for L7ProtocolSendLog { @@ -519,7 +542,7 @@ impl From for L7ProtocolSendLog { #[derive(Default)] pub struct HttpLog { proto: L7Protocol, - is_tls: bool, + last_is_on_blacklist: bool, perf_stats: Option, http2_req_decoder: Option>, http2_resp_decoder: Option>, @@ -574,7 +597,6 @@ impl L7ProtocolParserInterface for HttpLog { let mut info = HttpInfo::default(); info.proto = self.proto; info.is_tls = param.is_tls(); - self.is_tls = param.is_tls(); if self.perf_stats.is_none() && param.parse_perf { self.perf_stats = Some(L7PerfStats::default()) @@ -602,14 +624,68 @@ impl L7ProtocolParserInterface for HttpLog { } _ => unreachable!(), } - if param.parse_log { - // In uprobe mode, headers are reported in a way different from other modes: - // one payload contains one header. - // Calling wasm plugin on every payload would be wasted effort, - // in this condition the call to the wasm plugin will be skipped. - if param.ebpf_type != EbpfType::GoHttp2Uprobe { - self.wasm_hook(param, payload, &mut info); + // In uprobe mode, headers are reported in a way different from other modes: + // one payload contains one header. + // Calling wasm plugin on every payload would be wasted effort, + // in this condition the call to the wasm plugin will be skipped. + if param.ebpf_type != EbpfType::GoHttp2Uprobe { + self.wasm_hook(param, payload, &mut info); + } + info.set_is_on_blacklist(config); + if !info.is_on_blacklist && !self.last_is_on_blacklist { + match self.proto { + L7Protocol::Http1 => { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.set_status(info.status_code, &mut info); + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } + } + info.cal_rrt(param).map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); + } + L7Protocol::Http2 | L7Protocol::Grpc => match param.ebpf_type { + EbpfType::GoHttp2Uprobe => { + if info.is_req_end { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + if info.is_resp_end { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } + + info.cal_rrt_for_multi_merge_log(param).map(|rrt| { + info.rrt = rrt; + }); + + if info.is_req_end || info.is_resp_end { + self.perf_stats.as_mut().map(|p| p.update_rrt(info.rrt)); + } + } + _ => { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } + } + info.cal_rrt(param).map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); + } + }, + _ => unreachable!(), } + } + self.last_is_on_blacklist = info.is_on_blacklist; + if param.parse_log { if matches!(self.proto, L7Protocol::Http1 | L7Protocol::Http2) && !config.http_endpoint_disabled && info.path.len() > 0 @@ -648,6 +724,7 @@ impl L7ProtocolParserInterface for HttpLog { }, _ => unreachable!(), }; + new_log.last_is_on_blacklist = self.last_is_on_blacklist; new_log.perf_stats = self.perf_stats.take(); new_log.http2_req_decoder = self.http2_req_decoder.take(); new_log.http2_resp_decoder = self.http2_resp_decoder.take(); @@ -789,21 +866,6 @@ impl HttpLog { info: &mut HttpInfo, ) -> Result<()> { self.check_http2_go_uprobe(config, payload, param, info)?; - - if info.is_req_end { - self.perf_stats.as_mut().map(|p| p.inc_req()); - } - if info.is_resp_end { - self.perf_stats.as_mut().map(|p| p.inc_resp()); - } - - info.cal_rrt_for_multi_merge_log(param).map(|rrt| { - info.rrt = rrt; - }); - - if info.is_req_end || info.is_resp_end { - self.perf_stats.as_mut().map(|p| p.update_rrt(info.rrt)); - } Ok(()) } @@ -844,9 +906,6 @@ impl HttpLog { info.status_code = status_code; info.msg_type = LogMessageType::Response; - - self.perf_stats.as_mut().map(|p| p.inc_resp()); - self.set_status(status_code, info); } else { // HTTP请求行:GET /background.png HTTP/1.0 let Ok((method, path, version)) = get_http_request_info(first_line) else { @@ -858,17 +917,8 @@ impl HttpLog { info.version = get_http_request_version(version)?; info.msg_type = LogMessageType::Request; - self.perf_stats.as_mut().map(|p| p.inc_req()); } - info.cal_rrt(param).map(|rrt| { - info.rrt = rrt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); - }); - - if !param.parse_log { - return Ok(()); - } let mut content_length: Option = None; for body_line in headers { let col_index = body_line.find(':'); @@ -1078,16 +1128,6 @@ impl HttpLog { info: &mut HttpInfo, ) -> Result<()> { self.check_http_v2(payload, param, info)?; - - if param.direction == PacketDirection::ClientToServer { - self.perf_stats.as_mut().map(|p| p.inc_req()); - } else { - self.perf_stats.as_mut().map(|p| p.inc_resp()); - } - info.cal_rrt(param).map(|rrt| { - info.rrt = rrt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); - }); set_captured_byte!(info, param); Ok(()) } diff --git a/agent/src/flow_generator/protocol_logs/mq/amqp.rs b/agent/src/flow_generator/protocol_logs/mq/amqp.rs index 9410734f5e3..69ded8f0471 100644 --- a/agent/src/flow_generator/protocol_logs/mq/amqp.rs +++ b/agent/src/flow_generator/protocol_logs/mq/amqp.rs @@ -28,6 +28,7 @@ use crate::{ l7_protocol_log::{L7ParseResult, L7ProtocolParserInterface, ParseParam}, meta_packet::EbpfFlags, }, + config::handler::LogParserConfig, flow_generator::{ error::Result, protocol_logs::{ @@ -292,6 +293,13 @@ pub struct AmqpInfo { req_len: Option, resp_len: Option, resp_code: Option, + + #[serde(skip)] + req_type: Option, + #[serde(skip)] + endpoint: Option, + #[serde(skip)] + is_on_blacklist: bool, } fn slice_to_string(slice: &[u8]) -> String { @@ -682,13 +690,33 @@ impl AmqpInfo { }; Some(slice_to_string(routing_key)) } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::AMQP) { + self.is_on_blacklist = self + .req_type + .as_ref() + .map(|p| t.request_type.is_on_blacklist(p)) + .unwrap_or_default() + || self + .vhost + .as_ref() + .map(|p| t.request_domain.is_on_blacklist(p)) + .unwrap_or_default() + || self + .endpoint + .as_ref() + .map(|p| t.endpoint.is_on_blacklist(p) || t.request_resource.is_on_blacklist(p)) + .unwrap_or_default(); + } + } } #[derive(Default)] pub struct AmqpLog { perf_stats: Option, - vhost: Option, + last_is_on_blacklist: bool, } impl From for L7ProtocolSendLog { @@ -697,18 +725,6 @@ impl From for L7ProtocolSendLog { true => EbpfFlags::TLS.bits(), false => EbpfFlags::NONE.bits(), }; - let req_type = info.get_packet_type(); - let type1_len = info.exchange.as_ref().map_or(0, |r| r.len()) - + info.routing_key.as_ref().map_or(0, |r| r.len()); - let endpoint = if type1_len > 0 { - format!( - "{}.{}", - info.exchange.unwrap_or_default(), - info.routing_key.unwrap_or_default() - ) - } else { - info.queue.unwrap_or_default() - }; let log = L7ProtocolSendLog { captured_request_byte: info.captured_request_byte, captured_response_byte: info.captured_response_byte, @@ -717,10 +733,10 @@ impl From for L7ProtocolSendLog { req_len: info.req_len, resp_len: info.resp_len, req: L7Request { - req_type, + req_type: info.req_type.unwrap_or_default(), domain: info.vhost.unwrap_or_default(), - resource: endpoint.clone(), - endpoint: endpoint.clone(), + resource: info.endpoint.clone().unwrap_or_default(), + endpoint: info.endpoint.unwrap_or_default(), ..Default::default() }, resp: L7Response { @@ -764,6 +780,15 @@ impl L7ProtocolInfoInterface for AmqpInfo { if req.exchange.as_ref().map_or(0, |r| r.len()) == 0 { req.exchange = rsp.exchange.clone(); } + if req.req_type.is_none() { + req.resp_code = rsp.raw_method_id.map(|x| x as i32); + } + if req.endpoint.is_none() { + req.resp_code = rsp.raw_method_id.map(|x| x as i32); + } + if rsp.is_on_blacklist { + req.is_on_blacklist = rsp.is_on_blacklist; + } req.captured_response_byte = rsp.captured_response_byte; } Ok(()) @@ -780,6 +805,27 @@ impl L7ProtocolInfoInterface for AmqpInfo { fn get_request_domain(&self) -> String { self.vhost.clone().unwrap_or_default() } + + fn get_endpoint(&self) -> Option { + let (exchange, exchange_len) = self + .exchange + .as_ref() + .map(|r| (r.as_str(), r.len())) + .unwrap_or(("", 0)); + let (routing_key, routing_key_len) = self + .routing_key + .as_ref() + .map(|r| (r.as_str(), r.len())) + .unwrap_or(("", 0)); + if exchange_len + routing_key_len == 0 { + return self.queue.clone(); + } + Some(format!("{}.{}", exchange, routing_key)) + } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } impl L7ProtocolParserInterface for AmqpLog { @@ -806,6 +852,10 @@ impl L7ProtocolParserInterface for AmqpLog { info.is_protcol_header = true; info.is_tls = param.is_tls(); info.msg_type = info.get_log_message_type(); + if info.msg_type == LogMessageType::Request { + info.req_type = Some(info.get_packet_type()); + info.endpoint = info.get_endpoint(); + } vec.push(L7ProtocolInfo::AmqpInfo(info)); } loop { @@ -896,7 +946,11 @@ impl L7ProtocolParserInterface for AmqpLog { info.msg_type = info.get_log_message_type(); match info.msg_type { - LogMessageType::Request => info.req_len = Some((offset - offset_begin) as u32), + LogMessageType::Request => { + info.req_len = Some((offset - offset_begin) as u32); + info.req_type = Some(info.get_packet_type()); + info.endpoint = info.get_endpoint(); + } LogMessageType::Response => info.resp_len = Some((offset - offset_begin) as u32), _ => {} } @@ -904,23 +958,27 @@ impl L7ProtocolParserInterface for AmqpLog { } for info in &mut vec { if let L7ProtocolInfo::AmqpInfo(info) = info { - if info.msg_type != LogMessageType::Session { - info.cal_rrt(param).map(|rtt| { - info.rtt = rtt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); - }); - } info.is_tls = param.is_tls(); set_captured_byte!(info, param); - match info.msg_type { - LogMessageType::Request => { + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + if info.msg_type == LogMessageType::Request { self.perf_stats.as_mut().map(|p| p.inc_req()); - } - LogMessageType::Response => { + info.cal_rrt(param).map(|rtt| { + info.rtt = rtt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); + }); + } else if info.msg_type == LogMessageType::Response { self.perf_stats.as_mut().map(|p| p.inc_resp()); + info.cal_rrt(param).map(|rtt| { + info.rtt = rtt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); + }); } - _ => {} } + self.last_is_on_blacklist = info.is_on_blacklist; } } if !param.parse_log { @@ -936,6 +994,7 @@ impl L7ProtocolParserInterface for AmqpLog { fn reset(&mut self) { let mut s = Self::default(); + s.last_is_on_blacklist = self.last_is_on_blacklist; s.vhost = self.vhost.take(); s.perf_stats = self.perf_stats.take(); *self = s; diff --git a/agent/src/flow_generator/protocol_logs/mq/kafka.rs b/agent/src/flow_generator/protocol_logs/mq/kafka.rs index f9b3b3462ff..7c406ee473e 100644 --- a/agent/src/flow_generator/protocol_logs/mq/kafka.rs +++ b/agent/src/flow_generator/protocol_logs/mq/kafka.rs @@ -27,7 +27,7 @@ use crate::{ l7_protocol_log::{L7ParseResult, L7ProtocolParserInterface, ParseParam}, meta_packet::EbpfFlags, }, - config::handler::TraceType, + config::handler::{LogParserConfig, TraceType}, flow_generator::{ error::{Error, Result}, protocol_logs::{ @@ -36,8 +36,8 @@ use crate::{ pb_adapter::{ ExtendedInfo, KeyVal, L7ProtocolSendLog, L7Request, L7Response, TraceInfo, }, - set_captured_byte, value_is_default, value_is_negative, AppProtoHead, L7ResponseStatus, - LogMessageType, + set_captured_byte, swap_if, value_is_default, value_is_negative, AppProtoHead, + L7ResponseStatus, LogMessageType, }, }, utils::bytes::{read_i16_be, read_i32_be, read_i64_be, read_u16_be, read_u32_be}, @@ -90,6 +90,14 @@ pub struct KafkaInfo { captured_response_byte: u32, rrt: u64, + #[serde(skip)] + is_on_blacklist: bool, + #[serde(skip)] + resource: Option, + #[serde(skip)] + endpoint: Option, + #[serde(skip)] + command: Option, } impl L7ProtocolInfoInterface for KafkaInfo { @@ -120,25 +128,27 @@ impl L7ProtocolInfoInterface for KafkaInfo { } fn get_endpoint(&self) -> Option { - if self.topic_name.is_empty() { + if self.topic_name.is_empty() || self.partition < 0 { None } else { - Some(self.topic_name.clone()) + Some(format!("{}-{}", self.topic_name, self.partition)) } } fn get_request_resource_length(&self) -> usize { self.topic_name.len() } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } impl KafkaInfo { // https://kafka.apache.org/protocol.html const API_KEY_MAX: u16 = 67; pub fn merge(&mut self, other: &mut Self) { - if self.resp_msg_size.is_none() { - self.resp_msg_size = other.resp_msg_size; - } + swap_if!(self, resp_msg_size, is_none, other); if other.status != L7ResponseStatus::default() { self.status = other.status; } @@ -153,7 +163,13 @@ impl KafkaInfo { } self.msg_type = LogMessageType::Session; self.captured_response_byte = other.captured_response_byte; - crate::flow_generator::protocol_logs::swap_if!(self, topic_name, is_empty, other); + swap_if!(self, topic_name, is_empty, other); + swap_if!(self, resource, is_none, other); + swap_if!(self, endpoint, is_none, other); + swap_if!(self, command, is_none, other); + if other.is_on_blacklist { + self.is_on_blacklist = other.is_on_blacklist; + } } pub fn check(&self) -> bool { @@ -163,7 +179,7 @@ impl KafkaInfo { return self.client_id.len() > 0 && self.client_id.is_ascii(); } - pub fn get_command(&self) -> &'static str { + pub fn get_command(&self) -> Option { let command_str = [ "Produce", "Fetch", @@ -231,40 +247,40 @@ impl KafkaInfo { "AllocateProducerIds", ]; match self.api_key { - 0..=58 => command_str[self.api_key as usize], - _ => "", + 0..=58 => Some(command_str[self.api_key as usize].to_string()), + _ => None, + } + } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::Kafka) { + self.is_on_blacklist = self + .command + .as_ref() + .map(|p| t.request_type.is_on_blacklist(p)) + .unwrap_or_default() + || self + .resource + .as_ref() + .map(|p| t.request_resource.is_on_blacklist(p)) + .unwrap_or_default() + || self + .endpoint + .as_ref() + .map(|p| t.endpoint.is_on_blacklist(p)) + .unwrap_or_default() + || t.request_domain.is_on_blacklist(&self.topic_name); } } } impl From for L7ProtocolSendLog { fn from(f: KafkaInfo) -> Self { - let command_str = f.get_command(); let flags = if f.is_tls { EbpfFlags::TLS.bits() } else { EbpfFlags::NONE.bits() }; - let (resource, endpoint) = match (f.api_key, f.msg_type) { - (KAFKA_FETCH, LogMessageType::Request) | (KAFKA_FETCH, LogMessageType::Session) - if !f.topic_name.is_empty() => - { - ( - format!("{}-{}:{}", f.topic_name, f.partition, f.offset), - format!("{}-{}", f.topic_name, f.partition), - ) - } - (KAFKA_PRODUCE, LogMessageType::Response) - | (KAFKA_PRODUCE, LogMessageType::Session) - if !f.topic_name.is_empty() => - { - ( - format!("{}-{}:{}", f.topic_name, f.partition, f.offset), - format!("{}-{}", f.topic_name, f.partition), - ) - } - _ => (String::new(), String::new()), - }; let mut attributes = vec![]; if !f.group_id.is_empty() { attributes.push(KeyVal { @@ -278,9 +294,9 @@ impl From for L7ProtocolSendLog { req_len: f.req_msg_size, resp_len: f.resp_msg_size, req: L7Request { - req_type: String::from(command_str), - resource, - endpoint, + req_type: f.command.unwrap_or_default(), + resource: f.resource.unwrap_or_default(), + endpoint: f.endpoint.unwrap_or_default(), domain: f.topic_name, ..Default::default() }, @@ -324,6 +340,7 @@ impl From for L7ProtocolSendLog { pub struct KafkaLog { perf_stats: Option, sessions: LruCache, + last_is_on_blacklist: bool, } impl Default for KafkaLog { @@ -331,6 +348,7 @@ impl Default for KafkaLog { Self { perf_stats: None, sessions: LruCache::new(NonZeroUsize::new(Self::MAX_SESSION_PER_FLOW).unwrap()), + last_is_on_blacklist: false, } } } @@ -356,11 +374,47 @@ impl L7ProtocolParserInterface for KafkaLog { let mut info = KafkaInfo::default(); Self::parse(self, payload, param.l4_protocol, param.direction, &mut info)?; info.is_tls = param.is_tls(); - info.cal_rrt(param).map(|rrt| { - info.rrt = rrt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); - }); set_captured_byte!(info, param); + info.resource = match (info.api_key, info.msg_type) { + (KAFKA_FETCH, LogMessageType::Request) | (KAFKA_FETCH, LogMessageType::Session) + if !info.topic_name.is_empty() => + { + Some(format!( + "{}-{}:{}", + info.topic_name, info.partition, info.offset + )) + } + (KAFKA_PRODUCE, LogMessageType::Response) + | (KAFKA_PRODUCE, LogMessageType::Session) + if !info.topic_name.is_empty() => + { + Some(format!( + "{}-{}:{}", + info.topic_name, info.partition, info.offset + )) + } + _ => None, + }; + info.command = info.get_command(); + info.endpoint = info.get_endpoint(); + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } + } + info.cal_rrt(param).map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); + } + self.last_is_on_blacklist = info.is_on_blacklist; if param.parse_log { Ok(L7ParseResult::Single(L7ProtocolInfo::KafkaInfo(info))) } else { @@ -1493,14 +1547,12 @@ impl KafkaLog { return Err(Error::KafkaLogParseFailed); } self.request(payload, false, info)?; - self.perf_stats.as_mut().map(|p| p.inc_req()); } PacketDirection::ServerToClient => { if payload.len() < KAFKA_RESP_HEADER_LEN { return Err(Error::KafkaLogParseFailed); } self.response(payload, info)?; - self.perf_stats.as_mut().map(|p| p.inc_resp()); } } Ok(()) diff --git a/agent/src/flow_generator/protocol_logs/mq/mqtt.rs b/agent/src/flow_generator/protocol_logs/mq/mqtt.rs index bb0326124eb..99d6fd34054 100644 --- a/agent/src/flow_generator/protocol_logs/mq/mqtt.rs +++ b/agent/src/flow_generator/protocol_logs/mq/mqtt.rs @@ -14,7 +14,7 @@ * limitations under the License. */ -use std::fmt; +use std::fmt::{self, Write}; use log::{debug, warn}; use nom::{ @@ -34,12 +34,13 @@ use crate::{ l7_protocol_log::{L7ParseResult, L7ProtocolParserInterface, ParseParam}, meta_packet::EbpfFlags, }, + config::handler::LogParserConfig, flow_generator::{ error::{Error, Result}, protocol_logs::{ pb_adapter::{L7ProtocolSendLog, L7Request, L7Response}, - set_captured_byte, value_is_default, value_is_negative, AppProtoHead, L7ResponseStatus, - LogMessageType, + set_captured_byte, swap_if, value_is_default, value_is_negative, AppProtoHead, + L7ResponseStatus, LogMessageType, }, }, }; @@ -77,6 +78,11 @@ pub struct MqttInfo { captured_response_byte: u32, rrt: u64, + + #[serde(skip)] + is_on_blacklist: bool, + #[serde(skip)] + endpoint: Option, } impl L7ProtocolInfoInterface for MqttInfo { @@ -104,16 +110,39 @@ impl L7ProtocolInfoInterface for MqttInfo { } fn get_endpoint(&self) -> Option { - let endpoint = self.get_endpoint(); - if endpoint.is_empty() { - return None; + match self.pkt_type { + PacketKind::Publish { .. } => { + if let Some(t) = &self.publish_topic { + Some(t.clone()) + } else { + None + } + } + PacketKind::Unsubscribe | PacketKind::Subscribe => { + if let Some(s) = &self.subscribe_topics { + let mut topic_str = String::new(); + for i in s { + let _ = write!(&mut topic_str, "{},", i.name); + } + if !topic_str.is_empty() { + topic_str.pop(); + } + Some(topic_str) + } else { + None + } + } + _ => None, } - Some(endpoint) } fn get_request_domain(&self) -> String { self.client_id.clone().unwrap_or_default() } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } pub fn topics_format(t: &Option>, serializer: S) -> Result @@ -142,35 +171,13 @@ impl Default for MqttInfo { is_tls: false, captured_request_byte: 0, captured_response_byte: 0, + is_on_blacklist: false, + endpoint: None, } } } impl MqttInfo { - fn get_endpoint(&self) -> String { - let mut topic_str = String::new(); - match self.pkt_type { - PacketKind::Publish { .. } => { - if let Some(t) = &self.publish_topic { - return t.clone(); - } - } - PacketKind::Unsubscribe | PacketKind::Subscribe => { - if let Some(s) = &self.subscribe_topics { - for i in s { - topic_str.push_str(format!("{},", i.name).as_str()); - } - if !topic_str.is_empty() { - topic_str.pop(); - } - return topic_str; - } - } - _ => {} - }; - return topic_str; - } - pub fn merge(&mut self, other: &mut Self) { if self.res_msg_size.is_none() { self.res_msg_size = other.res_msg_size; @@ -191,6 +198,10 @@ impl MqttInfo { } _ => (), } + swap_if!(self, endpoint, is_none, other); + if other.is_on_blacklist { + self.is_on_blacklist = other.is_on_blacklist; + } } pub fn get_version_str(&self) -> &'static str { @@ -201,12 +212,27 @@ impl MqttInfo { _ => "", } } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::MQTT) { + self.is_on_blacklist = t.request_type.is_on_blacklist(self.pkt_type.as_str()) + || self + .client_id + .as_ref() + .map(|p: &String| t.request_domain.is_on_blacklist(p)) + .unwrap_or_default() + || self + .endpoint + .as_ref() + .map(|p| t.request_resource.is_on_blacklist(p) || t.endpoint.is_on_blacklist(p)) + .unwrap_or_default(); + } + } } impl From for L7ProtocolSendLog { fn from(f: MqttInfo) -> Self { let version = Some(String::from(f.get_version_str())); - let topic_str = f.get_endpoint(); let flags = if f.is_tls { EbpfFlags::TLS.bits() } else { @@ -216,14 +242,14 @@ impl From for L7ProtocolSendLog { L7ProtocolSendLog { captured_request_byte: f.captured_request_byte, captured_response_byte: f.captured_response_byte, - version: version, + version, req_len: f.req_msg_size, resp_len: f.res_msg_size, req: L7Request { req_type: f.pkt_type.to_string(), domain: f.client_id.unwrap_or_default(), - resource: topic_str.clone(), - endpoint: topic_str, + resource: f.endpoint.clone().unwrap_or_default(), + endpoint: f.endpoint.unwrap_or_default(), ..Default::default() }, resp: L7Response { @@ -242,8 +268,8 @@ pub struct MqttLog { msg_type: LogMessageType, status: L7ResponseStatus, version: u8, - perf_stats: Option, + last_is_on_blacklist: bool, } impl L7ProtocolParserInterface for MqttLog { @@ -263,25 +289,30 @@ impl L7ProtocolParserInterface for MqttLog { for info in infos.iter_mut() { if let L7ProtocolInfo::MqttInfo(info) = info { - if self.msg_type != LogMessageType::Session { - // FIXME due to mqtt not parse and handle packet identity correctly, the rrt is incorrect now. - info.cal_rrt(param).map(|rrt| { - info.rrt = rrt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); - }); - } - info.msg_type = self.msg_type; info.is_tls = param.is_tls(); set_captured_byte!(info, param); - match param.direction { - PacketDirection::ClientToServer => { - self.perf_stats.as_mut().map(|p| p.inc_req()); + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } } - PacketDirection::ServerToClient => { - self.perf_stats.as_mut().map(|p| p.inc_resp()); + if self.msg_type != LogMessageType::Session { + // FIXME due to mqtt not parse and handle packet identity correctly, the rrt is incorrect now. + info.cal_rrt(param).map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); } } + self.last_is_on_blacklist = info.is_on_blacklist; } else { unreachable!() } @@ -303,6 +334,7 @@ impl L7ProtocolParserInterface for MqttLog { fn reset(&mut self) { let mut s = Self::default(); + s.last_is_on_blacklist = self.last_is_on_blacklist; s.version = self.version; s.perf_stats = self.perf_stats.take(); *self = s; @@ -448,6 +480,7 @@ impl MqttLog { } info.status = self.status; + info.endpoint = info.get_endpoint(); if parse_log { infos.push(L7ProtocolInfo::MqttInfo(info)); } @@ -582,6 +615,27 @@ impl Default for PacketKind { } } +impl PacketKind { + fn as_str(&self) -> &'static str { + match self { + Self::Connect => "CONNECT", + Self::Connack => "CONNACK", + Self::Publish { .. } => "PUBLISH", + Self::Puback => "PUBACK", + Self::Pubrec => "PUBREC", + Self::Pubrel => "PUBREL", + Self::Pubcomp => "PUBCOMP", + Self::Subscribe => "SUBSCRIBE", + Self::Suback => "SUBACK", + Self::Unsubscribe => "UNSUBSCRIBE", + Self::Unsuback => "UNSUBACK", + Self::Pingreq => "PINGREQ", + Self::Pingresp => "PINGRESP", + Self::Disconnect => "DISCONNECT", + } + } +} + #[derive(Serialize, Debug, Clone, Copy, PartialEq, Eq)] pub enum QualityOfService { AtMostOnce = 0, diff --git a/agent/src/flow_generator/protocol_logs/mq/nats.rs b/agent/src/flow_generator/protocol_logs/mq/nats.rs index 24c8092fab9..cd6ae8473f1 100644 --- a/agent/src/flow_generator/protocol_logs/mq/nats.rs +++ b/agent/src/flow_generator/protocol_logs/mq/nats.rs @@ -34,7 +34,7 @@ use crate::{ pb_adapter::{ ExtendedInfo, KeyVal, L7ProtocolSendLog, L7Request, L7Response, TraceInfo, }, - set_captured_byte, AppProtoHead, LogMessageType, + set_captured_byte, swap_if, AppProtoHead, LogMessageType, }, }, plugin::wasm::{wasm_plugin::NatsMessage as WasmNatsMessage, WasmData}, @@ -221,14 +221,19 @@ pub struct NatsInfo { captured_request_byte: u32, captured_response_byte: u32, + + #[serde(skip)] + is_on_blacklist: bool, + #[serde(skip)] + endpoint: Option, } #[derive(Default)] pub struct NatsLog { perf_stats: Option, - version: String, server_name: String, + last_is_on_blacklist: bool, } fn slice_split(slice: &[u8], n: usize) -> Option<(&[u8], &[u8])> { @@ -688,6 +693,22 @@ impl NatsInfo { } (trace_id, span_id) } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::NATS) { + self.is_on_blacklist = t.request_type.is_on_blacklist(self.get_name()) + || t.request_domain.is_on_blacklist(&self.server_name) + || self + .get_subject() + .map(|p| t.request_resource.is_on_blacklist(p)) + .unwrap_or_default() + || self + .endpoint + .as_ref() + .map(|p| t.endpoint.is_on_blacklist(p)) + .unwrap_or_default(); + } + } } impl Default for NatsMessage { @@ -709,7 +730,6 @@ impl From for L7ProtocolSendLog { .get_subject() .map(|x| x.to_string()) .unwrap_or_default(); - let endpoint = info.get_endpoint().unwrap_or_default(); let log = L7ProtocolSendLog { captured_request_byte: info.captured_request_byte, captured_response_byte: info.captured_response_byte, @@ -721,7 +741,7 @@ impl From for L7ProtocolSendLog { req_type: name.to_string(), domain: info.server_name, resource: subject, - endpoint, + endpoint: info.endpoint.unwrap_or_default(), ..Default::default() }, resp: L7Response { @@ -770,6 +790,10 @@ impl L7ProtocolInfoInterface for NatsInfo { req.resp_len = rsp.resp_len; } req.captured_response_byte = rsp.captured_response_byte; + swap_if!(req, endpoint, is_none, rsp); + if rsp.is_on_blacklist { + req.is_on_blacklist = rsp.is_on_blacklist; + } } Ok(()) } @@ -785,6 +809,10 @@ impl L7ProtocolInfoInterface for NatsInfo { fn get_request_domain(&self) -> String { self.server_name.clone() } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } impl NatsLog { @@ -872,27 +900,33 @@ impl L7ProtocolParserInterface for NatsLog { for info in &mut vec { if let L7ProtocolInfo::NatsInfo(info) = info { - if info.msg_type != LogMessageType::Session { - info.cal_rrt(param).map(|rtt| { - info.rtt = rtt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); - }); - } - info.is_tls = param.is_tls(); info.version = self.version.clone(); info.server_name = self.server_name.clone(); self.wasm_hook(param, payload, info); set_captured_byte!(info, param); - match param.direction { - PacketDirection::ClientToServer => { - self.perf_stats.as_mut().map(|p| p.inc_req()); + info.endpoint = info.get_endpoint(); + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } } - PacketDirection::ServerToClient => { - self.perf_stats.as_mut().map(|p| p.inc_resp()); + if info.msg_type != LogMessageType::Session { + info.cal_rrt(param).map(|rtt| { + info.rtt = rtt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); + }); } } + self.last_is_on_blacklist = info.is_on_blacklist; } } @@ -921,6 +955,7 @@ impl L7ProtocolParserInterface for NatsLog { fn reset(&mut self) { let mut s = Self::default(); + s.last_is_on_blacklist = self.last_is_on_blacklist; s.version = self.version.clone(); s.server_name = self.server_name.clone(); s.perf_stats = self.perf_stats.take(); diff --git a/agent/src/flow_generator/protocol_logs/mq/openwire.rs b/agent/src/flow_generator/protocol_logs/mq/openwire.rs index ec5be3f7340..26dd9b97612 100644 --- a/agent/src/flow_generator/protocol_logs/mq/openwire.rs +++ b/agent/src/flow_generator/protocol_logs/mq/openwire.rs @@ -11,7 +11,7 @@ use crate::{ l7_protocol_log::{L7ParseResult, L7ProtocolParserInterface, ParseParam}, meta_packet::EbpfFlags, }, - config::handler::{L7LogDynamicConfig, TraceType}, + config::handler::{L7LogDynamicConfig, LogParserConfig, TraceType}, flow_generator::{ error::{Error, Result}, protocol_logs::{ @@ -148,6 +148,13 @@ macro_rules! all_openwire_commands { } } } + impl OpenWireCommand { + fn as_str(&self) -> &'static str { + match self { + $(OpenWireCommand::$variant => stringify!($variant)),* + } + } + } }; } @@ -1653,6 +1660,8 @@ pub struct OpenWireInfo { captured_response_byte: u32, rtt: u64, + #[serde(skip)] + is_on_blacklist: bool, } impl Default for OpenWireInfo { @@ -1683,6 +1692,7 @@ impl Default for OpenWireInfo { rtt: 0, captured_request_byte: 0, captured_response_byte: 0, + is_on_blacklist: false, } } } @@ -1711,6 +1721,25 @@ impl OpenWireInfo { self.err_msg = res.err_msg.clone(); } self.captured_response_byte = res.captured_response_byte; + if res.is_on_blacklist { + self.is_on_blacklist = res.is_on_blacklist; + } + } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::OpenWire) { + self.is_on_blacklist = t.request_type.is_on_blacklist(self.command_type.as_str()) + || self + .topic + .as_ref() + .map(|p| t.request_resource.is_on_blacklist(p) || t.endpoint.is_on_blacklist(p)) + .unwrap_or_default() + || self + .broker_url + .as_ref() + .map(|p| t.request_domain.is_on_blacklist(p)) + .unwrap_or_default(); + } } } @@ -1745,6 +1774,9 @@ impl L7ProtocolInfoInterface for OpenWireInfo { fn get_endpoint(&self) -> Option { self.topic.clone() } + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } impl From for L7ProtocolSendLog { @@ -1828,6 +1860,7 @@ pub struct OpenWireLog { client_next_skip_len: Option, server_next_skip_len: Option, perf_stats: Option, + last_is_on_blacklist: bool, } impl Default for OpenWireLog { @@ -1844,6 +1877,7 @@ impl Default for OpenWireLog { client_next_skip_len: None, server_next_skip_len: None, perf_stats: None, + last_is_on_blacklist: false, } } } @@ -1864,22 +1898,27 @@ impl L7ProtocolParserInterface for OpenWireLog { L7ProtocolInfo::OpenWireInfo(info) => info, _ => return, }; - if info.msg_type != LogMessageType::Session { - info.cal_rrt(param).map(|rtt| { - info.rtt = rtt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); - }); - } set_captured_byte!(info, param); - - match param.direction { - PacketDirection::ClientToServer => { - self.perf_stats.as_mut().map(|p| p.inc_req()); + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } } - PacketDirection::ServerToClient => { - self.perf_stats.as_mut().map(|p| p.inc_resp()); + if info.msg_type != LogMessageType::Session { + info.cal_rrt(param).map(|rtt| { + info.rtt = rtt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); + }); } } + self.last_is_on_blacklist = info.is_on_blacklist; }); if !param.parse_log { diff --git a/agent/src/flow_generator/protocol_logs/mq/pulsar.rs b/agent/src/flow_generator/protocol_logs/mq/pulsar.rs index 430d7e94f45..309d477830e 100644 --- a/agent/src/flow_generator/protocol_logs/mq/pulsar.rs +++ b/agent/src/flow_generator/protocol_logs/mq/pulsar.rs @@ -16,6 +16,7 @@ use crate::{ l7_protocol_log::{L7ParseResult, L7ProtocolParserInterface, ParseParam}, meta_packet::EbpfFlags, }, + config::handler::LogParserConfig, flow_generator::{ error::Result, protocol_logs::{ @@ -127,6 +128,9 @@ pub struct PulsarInfo { captured_request_byte: u32, captured_response_byte: u32, + + #[serde(skip)] + is_on_blacklist: bool, } pub struct PulsarLog { @@ -137,6 +141,8 @@ pub struct PulsarLog { producer_topic: PulsarTopicMap, consumer_topic: PulsarTopicMap, + + last_is_on_blacklist: bool, } impl Default for PulsarLog { @@ -147,6 +153,7 @@ impl Default for PulsarLog { domain: None, producer_topic: PulsarTopicMap::new(), consumer_topic: PulsarTopicMap::new(), + last_is_on_blacklist: false, } } } @@ -725,6 +732,24 @@ impl PulsarInfo { } Some((payload, info)) } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::Pulsar) { + self.is_on_blacklist = t + .request_type + .is_on_blacklist(self.command.r#type().as_str_name()) + || self + .topic + .as_ref() + .map(|p| t.request_resource.is_on_blacklist(p) || t.endpoint.is_on_blacklist(p)) + .unwrap_or_default() + || self + .domain + .as_ref() + .map(|p| t.request_domain.is_on_blacklist(p)) + .unwrap_or_default(); + } + } } impl From for L7ProtocolSendLog { @@ -788,6 +813,9 @@ impl L7ProtocolInfoInterface for PulsarInfo { req.resp_exception = rsp.resp_exception.clone(); } req.captured_response_byte = rsp.captured_response_byte; + if rsp.is_on_blacklist { + req.is_on_blacklist = rsp.is_on_blacklist; + } } Ok(()) } @@ -807,6 +835,10 @@ impl L7ProtocolInfoInterface for PulsarInfo { fn get_request_domain(&self) -> String { self.domain.clone().unwrap_or_default() } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } impl L7ProtocolParserInterface for PulsarLog { @@ -845,23 +877,28 @@ impl L7ProtocolParserInterface for PulsarLog { for info in &mut vec { if let L7ProtocolInfo::PulsarInfo(info) = info { - if info.msg_type != LogMessageType::Session { - info.cal_rrt(param).map(|rtt| { - info.rtt = rtt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); - }); - } - info.is_tls = param.is_tls(); set_captured_byte!(info, param); - match param.direction { - PacketDirection::ClientToServer => { - self.perf_stats.as_mut().map(|p| p.inc_req()); + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } } - PacketDirection::ServerToClient => { - self.perf_stats.as_mut().map(|p| p.inc_resp()); + if info.msg_type != LogMessageType::Session { + info.cal_rrt(param).map(|rtt| { + info.rtt = rtt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); + }); } } + self.last_is_on_blacklist = info.is_on_blacklist; } } diff --git a/agent/src/flow_generator/protocol_logs/mq/zmtp.rs b/agent/src/flow_generator/protocol_logs/mq/zmtp.rs index 24759af6598..d82f09e94aa 100644 --- a/agent/src/flow_generator/protocol_logs/mq/zmtp.rs +++ b/agent/src/flow_generator/protocol_logs/mq/zmtp.rs @@ -5,6 +5,7 @@ use crate::{ l7_protocol_log::{L7ParseResult, L7ProtocolParserInterface, ParseParam}, meta_packet::EbpfFlags, }, + config::handler::LogParserConfig, flow_generator::{ error::{Error, Result}, protocol_logs::{ @@ -86,6 +87,17 @@ impl Default for FrameType { } } +impl FrameType { + pub fn as_str(&self) -> &'static str { + match self { + FrameType::Greeting => "Greeting", + FrameType::Command => "Command", + FrameType::Message => "Message", + FrameType::Unknown => "Unknown", + } + } +} + #[derive(Serialize, Clone, Debug, Default)] pub struct ZmtpInfo { msg_type: LogMessageType, @@ -113,6 +125,9 @@ pub struct ZmtpInfo { attributes: Vec, #[serde(skip)] l7_protocol_str: Option, + + #[serde(skip)] + is_on_blacklist: bool, } impl ZmtpInfo { @@ -132,6 +147,9 @@ impl ZmtpInfo { self.err_msg = res.err_msg.take(); } self.captured_response_byte = res.captured_response_byte; + if res.is_on_blacklist { + self.is_on_blacklist = res.is_on_blacklist; + } } fn wasm_hook(&mut self, param: &ParseParam, payload: &[u8]) { let mut vm_ref = param.wasm_vm.borrow_mut(); @@ -157,6 +175,19 @@ impl ZmtpInfo { } } } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::ZMTP) { + self.is_on_blacklist = t.request_type.is_on_blacklist(self.frame_type.as_str()) + || self + .subscription + .as_ref() + .map(|p| { + t.request_domain.is_on_blacklist(p) || t.request_resource.is_on_blacklist(p) + }) + .unwrap_or_default(); + } + } } impl From for L7ProtocolSendLog { @@ -224,6 +255,9 @@ impl L7ProtocolInfoInterface for ZmtpInfo { fn get_request_domain(&self) -> String { self.subscription.clone().unwrap_or_default() } + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } #[derive(Default)] @@ -235,6 +269,7 @@ pub struct ZmtpLog { mechanism: Option, perf_stats: Option, + last_is_on_blacklist: bool, } fn parse_byte(payload: &[u8]) -> Option<(&[u8], u8)> { @@ -597,21 +632,26 @@ impl L7ProtocolParserInterface for ZmtpLog { L7ProtocolInfo::ZmtpInfo(info) => info, _ => return, }; - info.cal_rrt(param).map(|rtt| { - info.rtt = rtt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); - }); set_captured_byte!(info, param); - match param.direction { - PacketDirection::ClientToServer => { - self.perf_stats.as_mut().map(|p| p.inc_req()); - } - PacketDirection::ServerToClient => { - self.perf_stats.as_mut().map(|p| p.inc_resp()); + info.wasm_hook(param, payload); + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } } + info.cal_rrt(param).map(|rtt| { + info.rtt = rtt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); + }); } - - info.wasm_hook(param, payload); + self.last_is_on_blacklist = info.is_on_blacklist; }); if !param.parse_log { diff --git a/agent/src/flow_generator/protocol_logs/parser.rs b/agent/src/flow_generator/protocol_logs/parser.rs index 0b45bca6a86..4188c7412eb 100644 --- a/agent/src/flow_generator/protocol_logs/parser.rs +++ b/agent/src/flow_generator/protocol_logs/parser.rs @@ -778,7 +778,7 @@ impl SessionQueue { } fn send(&mut self, item: Box) { - if item.l7_info.skip_send() { + if item.l7_info.skip_send() || item.l7_info.is_on_blacklist() { return; } diff --git a/agent/src/flow_generator/protocol_logs/rpc/brpc.rs b/agent/src/flow_generator/protocol_logs/rpc/brpc.rs index e53cf109dea..d06cc3792d4 100644 --- a/agent/src/flow_generator/protocol_logs/rpc/brpc.rs +++ b/agent/src/flow_generator/protocol_logs/rpc/brpc.rs @@ -11,11 +11,12 @@ use crate::{ l7_protocol_log::{L7ParseResult, L7ProtocolParserInterface, ParseParam}, meta_packet::EbpfFlags, }, + config::handler::LogParserConfig, flow_generator::{ error::Result, protocol_logs::{ pb_adapter::{ExtendedInfo, L7ProtocolSendLog, L7Request, L7Response, TraceInfo}, - set_captured_byte, AppProtoHead, L7ResponseStatus, LogMessageType, + set_captured_byte, swap_if, AppProtoHead, L7ResponseStatus, LogMessageType, }, }, utils::bytes::read_u32_be, @@ -49,11 +50,17 @@ pub struct BrpcInfo { captured_request_byte: u32, captured_response_byte: u32, + + #[serde(skip)] + is_on_blacklist: bool, + #[serde(skip)] + endpoint: Option, } #[derive(Default)] pub struct BrpcLog { perf_stats: Option, + last_is_on_blacklist: bool, } impl BrpcInfo { @@ -79,6 +86,7 @@ impl BrpcInfo { info.req_method_name = Some(req.method_name); info.req_log_id = req.log_id; info.req_len = Some(body_size as u32 + 12); + info.endpoint = info.get_endpoint(); info.msg_type = LogMessageType::Request; } else if let Some(resp) = meta.response { info.resp_code = resp.error_code; @@ -130,6 +138,26 @@ impl BrpcInfo { */ self.correlation_id.map(|x| (x >> 32) as u32) } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::Brpc) { + self.is_on_blacklist = self + .req_method_name + .as_ref() + .map(|p| t.request_type.is_on_blacklist(p)) + .unwrap_or_default() + || self + .req_service_name + .as_ref() + .map(|p| t.request_resource.is_on_blacklist(p)) + .unwrap_or_default() + || self + .endpoint + .as_ref() + .map(|p| t.endpoint.is_on_blacklist(p)) + .unwrap_or_default(); + } + } } impl From for L7ProtocolSendLog { @@ -139,8 +167,6 @@ impl From for L7ProtocolSendLog { false => EbpfFlags::NONE.bits(), }; - let endpoint = info.get_endpoint(); - let request_id = info.get_request_id(); let log = L7ProtocolSendLog { @@ -152,7 +178,7 @@ impl From for L7ProtocolSendLog { req: L7Request { req_type: info.req_method_name.unwrap_or_default(), resource: info.req_service_name.unwrap_or_default(), - endpoint: endpoint.unwrap_or_default(), + endpoint: info.endpoint.unwrap_or_default(), ..Default::default() }, resp: L7Response { @@ -194,6 +220,10 @@ impl L7ProtocolInfoInterface for BrpcInfo { if req.resp_exception.is_none() { req.resp_exception = rsp.resp_exception.clone(); } + if rsp.is_on_blacklist { + req.is_on_blacklist = rsp.is_on_blacklist; + } + swap_if!(req, endpoint, is_none, rsp); } Ok(()) } @@ -214,6 +244,10 @@ impl L7ProtocolInfoInterface for BrpcInfo { ) .into() } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } impl L7ProtocolParserInterface for BrpcLog { @@ -245,23 +279,29 @@ impl L7ProtocolParserInterface for BrpcLog { for info in &mut vec { if let L7ProtocolInfo::BrpcInfo(info) = info { - if info.msg_type != LogMessageType::Session { - info.cal_rrt(param).map(|rtt| { - info.rtt = rtt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); - }); - } - info.is_tls = param.is_tls(); set_captured_byte!(info, param); - match param.direction { - PacketDirection::ClientToServer => { - self.perf_stats.as_mut().map(|p| p.inc_req()); + + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } } - PacketDirection::ServerToClient => { - self.perf_stats.as_mut().map(|p| p.inc_resp()); + if info.msg_type != LogMessageType::Session { + info.cal_rrt(param).map(|rtt| { + info.rtt = rtt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rtt)); + }); } } + self.last_is_on_blacklist = info.is_on_blacklist; } } @@ -290,6 +330,7 @@ impl L7ProtocolParserInterface for BrpcLog { fn reset(&mut self) { let mut s = Self::default(); + s.last_is_on_blacklist = self.last_is_on_blacklist; s.perf_stats = self.perf_stats.take(); *self = s; } diff --git a/agent/src/flow_generator/protocol_logs/rpc/dubbo.rs b/agent/src/flow_generator/protocol_logs/rpc/dubbo.rs index a8a04579c4d..9fa27ea8edf 100644 --- a/agent/src/flow_generator/protocol_logs/rpc/dubbo.rs +++ b/agent/src/flow_generator/protocol_logs/rpc/dubbo.rs @@ -24,7 +24,7 @@ use crate::{ l7_protocol_log::{L7ParseResult, L7ProtocolParserInterface, ParseParam}, meta_packet::EbpfFlags, }, - config::handler::{L7LogDynamicConfig, TraceType}, + config::handler::{L7LogDynamicConfig, LogParserConfig, TraceType}, flow_generator::{ error::{Error, Result}, protocol_logs::{ @@ -112,6 +112,11 @@ pub struct DubboInfo { #[serde(skip)] attributes: Vec, + + #[serde(skip)] + is_on_blacklist: bool, + #[serde(skip)] + endpoint: Option, } impl DubboInfo { @@ -146,6 +151,9 @@ impl DubboInfo { if other.captured_response_byte > 0 { self.captured_response_byte = other.captured_response_byte; } + if other.is_on_blacklist { + self.is_on_blacklist = other.is_on_blacklist; + } } fn set_trace_id(&mut self, trace_id: String, trace_type: &TraceType) { @@ -243,6 +251,19 @@ impl DubboInfo { self.attributes.extend(custom.attributes); } } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::Dubbo) { + self.is_on_blacklist =t.request_resource.is_on_blacklist(&self.service_name) + || t.request_type.is_on_blacklist(&self.method_name) + || t.request_domain.is_on_blacklist(&self.service_name) + || self + .endpoint + .as_ref() + .map(|p| t.endpoint.is_on_blacklist(p)) + .unwrap_or_default(); + } + } } impl L7ProtocolInfoInterface for DubboInfo { @@ -284,11 +305,14 @@ impl L7ProtocolInfoInterface for DubboInfo { fn get_request_resource_length(&self) -> usize { self.method_name.len() } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } impl From for L7ProtocolSendLog { fn from(f: DubboInfo) -> Self { - let endpoint = format!("{}/{}", f.service_name, f.method_name); let serial_id_attr = KeyVal { key: "serialization_id".into(), // reference https://github.com/apache/dubbo/blob/3.2/dubbo-serialization/dubbo-serialization-api/src/main/java/org/apache/dubbo/common/serialize/Constants.java @@ -334,7 +358,7 @@ impl From for L7ProtocolSendLog { req: L7Request { resource: f.service_name.clone(), req_type: f.method_name.clone(), - endpoint, + endpoint: f.endpoint.unwrap_or_default(), domain: f.service_name.clone(), }, resp: L7Response { @@ -363,6 +387,7 @@ impl From for L7ProtocolSendLog { #[derive(Default)] pub struct DubboLog { perf_stats: Option, + last_is_on_blacklist: bool, } impl L7ProtocolParserInterface for DubboLog { @@ -393,13 +418,28 @@ impl L7ProtocolParserInterface for DubboLog { let mut info = DubboInfo::default(); self.parse(&config.l7_log_dynamic, payload, &mut info, param)?; info.is_tls = param.is_tls(); - info.cal_rrt(param).map(|rrt| { - info.rrt = rrt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); - }); set_captured_byte!(info, param); + info.endpoint = info.get_endpoint(); + self.wasm_hook(param, payload, &mut info); + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } + } + info.cal_rrt(param).map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); + } + self.last_is_on_blacklist = info.is_on_blacklist; if param.parse_log { - self.wasm_hook(param, payload, &mut info); Ok(L7ParseResult::Single(L7ProtocolInfo::DubboInfo(info))) } else { Ok(L7ParseResult::None) @@ -803,11 +843,9 @@ impl DubboLog { match direction { PacketDirection::ClientToServer => { self.request(&config, payload, &dubbo_header, info); - self.perf_stats.as_mut().map(|p| p.inc_req()); } PacketDirection::ServerToClient => { self.response(&dubbo_header, info); - self.perf_stats.as_mut().map(|p| p.inc_resp()); } } Ok(()) diff --git a/agent/src/flow_generator/protocol_logs/rpc/sofa_rpc.rs b/agent/src/flow_generator/protocol_logs/rpc/sofa_rpc.rs index d979ee5f803..6536fdc6f5b 100644 --- a/agent/src/flow_generator/protocol_logs/rpc/sofa_rpc.rs +++ b/agent/src/flow_generator/protocol_logs/rpc/sofa_rpc.rs @@ -29,10 +29,11 @@ use crate::{ l7_protocol_log::{L7ParseResult, L7ProtocolParserInterface, ParseParam}, meta_packet::EbpfFlags, }, + config::handler::LogParserConfig, flow_generator::{ protocol_logs::{ pb_adapter::{ExtendedInfo, L7ProtocolSendLog, L7Request, L7Response, TraceInfo}, - set_captured_byte, L7ResponseStatus, + set_captured_byte, swap_if, L7ResponseStatus, }, AppProtoHead, Error, HttpLog, LogMessageType, Result, }, @@ -186,6 +187,11 @@ pub struct SofaRpcInfo { resp_code: u16, status: L7ResponseStatus, + + #[serde(skip)] + is_on_blacklist: bool, + #[serde(skip)] + endpoint: Option, } impl SofaRpcInfo { @@ -201,6 +207,18 @@ impl SofaRpcInfo { self.parent_span_id = ctx.parent_span_id; } } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::SofaRPC) { + self.is_on_blacklist = t.request_resource.is_on_blacklist(&self.target_serv) + || t.request_type.is_on_blacklist(&self.method) + || self + .endpoint + .as_ref() + .map(|p| t.endpoint.is_on_blacklist(p)) + .unwrap_or_default(); + } + } } impl L7ProtocolInfoInterface for SofaRpcInfo { @@ -214,6 +232,10 @@ impl L7ProtocolInfoInterface for SofaRpcInfo { self.resp_code = s.resp_code; self.status = s.status; self.captured_response_byte = s.captured_response_byte; + swap_if!(self, endpoint, is_none, s); + if s.is_on_blacklist { + self.is_on_blacklist = s.is_on_blacklist; + } } Ok(()) } @@ -237,6 +259,10 @@ impl L7ProtocolInfoInterface for SofaRpcInfo { None } } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } impl From for L7ProtocolSendLog { @@ -254,7 +280,7 @@ impl From for L7ProtocolSendLog { req: L7Request { req_type: s.method.clone(), resource: s.target_serv.clone(), - endpoint: format!("{}/{}", s.target_serv.clone(), s.method), + endpoint: s.endpoint.unwrap_or_default(), ..Default::default() }, resp: L7Response { @@ -279,15 +305,10 @@ impl From for L7ProtocolSendLog { } } -#[derive(Debug)] +#[derive(Debug, Default)] pub struct SofaRpcLog { perf_stats: Option, -} - -impl Default for SofaRpcLog { - fn default() -> Self { - Self { perf_stats: None } - } + last_is_on_blacklist: bool, } impl L7ProtocolParserInterface for SofaRpcLog { @@ -305,9 +326,16 @@ impl L7ProtocolParserInterface for SofaRpcLog { if !ok { return Ok(L7ParseResult::None); } - self.cal_perf(param, &mut info); + info.endpoint = info.get_endpoint(); info.is_tls = param.is_tls(); set_captured_byte!(info, param); + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + self.cal_perf(param, &mut info); + } + self.last_is_on_blacklist = info.is_on_blacklist; if param.parse_log { Ok(L7ParseResult::Single(L7ProtocolInfo::SofaRpcInfo(info))) } else { diff --git a/agent/src/flow_generator/protocol_logs/sql/mongo.rs b/agent/src/flow_generator/protocol_logs/sql/mongo.rs index 9d316c06c72..99cf1ae3e61 100644 --- a/agent/src/flow_generator/protocol_logs/sql/mongo.rs +++ b/agent/src/flow_generator/protocol_logs/sql/mongo.rs @@ -22,6 +22,7 @@ use serde::Serialize; use super::super::{AppProtoHead, LogMessageType}; use crate::common::flow::L7PerfStats; use crate::common::l7_protocol_log::L7ParseResult; +use crate::config::handler::LogParserConfig; use crate::flow_generator::protocol_logs::set_captured_byte; use crate::{ common::{ @@ -77,6 +78,11 @@ pub struct MongoDBInfo { captured_response_byte: u32, rrt: u64, + + #[serde(skip)] + is_on_blacklist: bool, + #[serde(skip)] + reply_false: bool, } impl L7ProtocolInfoInterface for MongoDBInfo { @@ -106,11 +112,18 @@ impl L7ProtocolInfoInterface for MongoDBInfo { fn get_request_resource_length(&self) -> usize { self.request.len() } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } // 协议文档: https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/ impl MongoDBInfo { fn merge(&mut self, other: &mut Self) { + if other.is_on_blacklist { + self.is_on_blacklist = other.is_on_blacklist; + } match other.msg_type { LogMessageType::Request => { self.req_len = other.req_len; @@ -132,6 +145,13 @@ impl MongoDBInfo { _ => {} } } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::MongoDB) { + self.is_on_blacklist = t.request_resource.is_on_blacklist(&self.request) + || t.request_type.is_on_blacklist(&self.op_code_name); + } + } } impl From for L7ProtocolSendLog { @@ -147,7 +167,7 @@ impl From for L7ProtocolSendLog { req_len: std::option::Option::::from(f.req_len), req: L7Request { req_type: f.op_code_name, - resource: f.request.to_string(), + resource: f.request, ..Default::default() }, resp_len: std::option::Option::::from(f.resp_len), @@ -173,6 +193,7 @@ impl From for L7ProtocolSendLog { pub struct MongoDBLog { info: MongoDBInfo, perf_stats: Option, + last_is_on_blacklist: bool, } impl L7ProtocolParserInterface for MongoDBLog { @@ -200,12 +221,33 @@ impl L7ProtocolParserInterface for MongoDBLog { }; self.parse(payload, param.l4_protocol, param.direction, &mut info)?; - info.cal_rrt(param).map(|rrt| { - info.rrt = rrt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); - }); info.is_tls = param.is_tls(); set_captured_byte!(info, param); + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + match info.msg_type { + LogMessageType::Request => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + LogMessageType::Response => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + if info.response_code > 0 { + self.perf_stats.as_mut().map(|p| p.inc_req_err()); + } + } + _ => {} + } + if info.reply_false { + self.perf_stats.as_mut().map(|p| p.inc_resp_err()); + } + info.cal_rrt(param).map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); + } + self.last_is_on_blacklist = info.is_on_blacklist; if param.parse_log { Ok(L7ParseResult::Single(L7ProtocolInfo::MongoDBInfo(info))) } else { @@ -273,15 +315,11 @@ impl MongoDBLog { info.msg_type = LogMessageType::Request; self.info.req_len = header.length; info.request_id = header.request_id; - self.perf_stats - .as_mut() - .map(|p: &mut L7PerfStats| p.inc_req()); } else { info.msg_type = LogMessageType::Response; self.info.resp_len = header.length; info.request_id = header.response_to; info.response_id = header.request_id; - self.perf_stats.as_mut().map(|p| p.inc_resp()); } // command decode @@ -307,9 +345,6 @@ impl MongoDBLog { msg_body.sections.c_string.unwrap_or(_UNKNOWN.to_string()); } info.response_code = msg_body.sections.doc.get_i32("code").unwrap_or(0); - if info.response_code > 0 { - self.perf_stats.as_mut().map(|p| p.inc_req_err()); - } } _ => { info.request = msg_body.sections.doc.to_string(); @@ -320,9 +355,7 @@ impl MongoDBLog { // "OP_REPLY" let mut msg_body = MongoOpReply::default(); msg_body.decode(&payload[_HEADER_SIZE..])?; - if !msg_body.reply_ok { - self.perf_stats.as_mut().map(|p| p.inc_resp_err()); - } + info.reply_false = !msg_body.reply_ok; info.response = msg_body.doc.to_string(); info.exception = msg_body.response_msg; } diff --git a/agent/src/flow_generator/protocol_logs/sql/mysql.rs b/agent/src/flow_generator/protocol_logs/sql/mysql.rs index 540f73411d1..c6e568f336f 100644 --- a/agent/src/flow_generator/protocol_logs/sql/mysql.rs +++ b/agent/src/flow_generator/protocol_logs/sql/mysql.rs @@ -93,6 +93,9 @@ pub struct MysqlInfo { trace_id: Option, span_id: Option, + + #[serde(skip)] + is_on_blacklist: bool, } impl L7ProtocolInfoInterface for MysqlInfo { @@ -122,6 +125,10 @@ impl L7ProtocolInfoInterface for MysqlInfo { fn get_request_resource_length(&self) -> usize { self.context.len() } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } impl MysqlInfo { @@ -129,6 +136,9 @@ impl MysqlInfo { if self.protocol_version == 0 { self.protocol_version = other.protocol_version } + if other.is_on_blacklist { + self.is_on_blacklist = other.is_on_blacklist; + } match other.msg_type { LogMessageType::Request => { self.command = other.command; @@ -267,6 +277,13 @@ impl MysqlInfo { self.statement_id = read_u32_le(payload) } } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::MySQL) { + self.is_on_blacklist = t.request_resource.is_on_blacklist(&self.context) + || t.request_type.is_on_blacklist(self.get_command_str()); + } + } } impl From for L7ProtocolSendLog { @@ -338,6 +355,8 @@ pub struct MysqlLog { // This field is extracted in the COM_STMT_PREPARE request and calculate based on SQL statements parameter_counter: u32, has_request: bool, + + last_is_on_blacklist: bool, } impl L7ProtocolParserInterface for MysqlLog { @@ -366,12 +385,26 @@ impl L7ProtocolParserInterface for MysqlLog { return Ok(L7ParseResult::None); } set_captured_byte!(info, param); - if info.msg_type != LogMessageType::Session { - info.cal_rrt(param).map(|rrt| { - info.rrt = rrt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); - }); + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } + } + if info.msg_type != LogMessageType::Session { + info.cal_rrt(param).map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); + } } + self.last_is_on_blacklist = info.is_on_blacklist; if param.parse_log { Ok(L7ParseResult::Single(L7ProtocolInfo::MysqlInfo(info))) } else { @@ -600,7 +633,6 @@ impl MysqlLog { COM_PING => {} _ => return Err(Error::MysqlLogParseFailed), } - self.perf_stats.as_mut().map(|p| p.inc_req()); Ok(msg_type) } @@ -678,7 +710,6 @@ impl MysqlLog { } _ => (), } - self.perf_stats.as_mut().map(|p| p.inc_resp()); Ok(()) } diff --git a/agent/src/flow_generator/protocol_logs/sql/oracle.rs b/agent/src/flow_generator/protocol_logs/sql/oracle.rs index ce9e458af72..9f662848088 100644 --- a/agent/src/flow_generator/protocol_logs/sql/oracle.rs +++ b/agent/src/flow_generator/protocol_logs/sql/oracle.rs @@ -17,6 +17,7 @@ use serde::Serialize; use super::super::{value_is_default, LogMessageType}; +use crate::config::handler::LogParserConfig; use crate::flow_generator::protocol_logs::{set_captured_byte, L7ResponseStatus}; use crate::flow_generator::Error; use crate::{ @@ -62,6 +63,9 @@ pub struct OracleInfo { captured_response_byte: u32, pub rrt: u64, + + #[serde(skip)] + is_on_blacklist: bool, } impl OracleInfo { pub fn merge(&mut self, other: &mut Self) { @@ -70,6 +74,9 @@ impl OracleInfo { std::mem::swap(&mut self.error_message, &mut other.error_message); self.status = other.status; self.captured_response_byte = other.captured_response_byte; + if other.is_on_blacklist { + self.is_on_blacklist = other.is_on_blacklist; + } } fn get_req_type(&self) -> String { @@ -89,6 +96,13 @@ impl OracleInfo { _ => "".to_string(), } } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::Oracle) { + self.is_on_blacklist = t.request_resource.is_on_blacklist(&self.sql) + || t.request_type.is_on_blacklist(&self.get_req_type()); + } + } } impl L7ProtocolInfoInterface for OracleInfo { @@ -118,6 +132,10 @@ impl L7ProtocolInfoInterface for OracleInfo { fn get_request_resource_length(&self) -> usize { self.sql.len() } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } impl From for L7ProtocolSendLog { @@ -143,18 +161,11 @@ impl From for L7ProtocolSendLog { } } +#[derive(Default)] pub struct OracleLog { perf_stats: Option, parser: OracleParser, -} - -impl Default for OracleLog { - fn default() -> Self { - Self { - parser: OracleParser::default(), - perf_stats: None, - } - } + last_is_on_blacklist: bool, } impl L7ProtocolParserInterface for OracleLog { @@ -179,11 +190,6 @@ impl L7ProtocolParserInterface for OracleLog { self.perf_stats = Some(L7PerfStats::default()) }; - match param.direction { - PacketDirection::ClientToServer => self.perf_stats.as_mut().map(|p| p.inc_req()), - PacketDirection::ServerToClient => self.perf_stats.as_mut().map(|p| p.inc_resp()), - }; - let mut log_info = OracleInfo { msg_type: param.direction.into(), is_tls: false, @@ -201,22 +207,33 @@ impl L7ProtocolParserInterface for OracleLog { rrt: 0, captured_request_byte: 0, captured_response_byte: 0, + is_on_blacklist: false, }; set_captured_byte!(log_info, param); - match log_info.status { - L7ResponseStatus::ServerError => { - self.perf_stats.as_mut().map(|p| p.inc_resp_err()); - } - L7ResponseStatus::ClientError => { - self.perf_stats.as_mut().map(|p| p.inc_req_err()); + + if let Some(config) = param.parse_config { + log_info.set_is_on_blacklist(config); + } + if !log_info.is_on_blacklist && !self.last_is_on_blacklist { + match param.direction { + PacketDirection::ClientToServer => self.perf_stats.as_mut().map(|p| p.inc_req()), + PacketDirection::ServerToClient => self.perf_stats.as_mut().map(|p| p.inc_resp()), + }; + match log_info.status { + L7ResponseStatus::ServerError => { + self.perf_stats.as_mut().map(|p| p.inc_resp_err()); + } + L7ResponseStatus::ClientError => { + self.perf_stats.as_mut().map(|p| p.inc_req_err()); + } + _ => {} } - _ => {} + log_info.cal_rrt(param).map(|rrt| { + log_info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(log_info.rrt)); + }); } - - log_info.cal_rrt(param).map(|rrt| { - log_info.rrt = rrt; - self.perf_stats.as_mut().map(|p| p.update_rrt(log_info.rrt)); - }); + self.last_is_on_blacklist = log_info.is_on_blacklist; Ok(L7ParseResult::Single(L7ProtocolInfo::OracleInfo(log_info))) } diff --git a/agent/src/flow_generator/protocol_logs/sql/postgresql.rs b/agent/src/flow_generator/protocol_logs/sql/postgresql.rs index 3797079d367..9eeab528651 100644 --- a/agent/src/flow_generator/protocol_logs/sql/postgresql.rs +++ b/agent/src/flow_generator/protocol_logs/sql/postgresql.rs @@ -28,6 +28,7 @@ use crate::{ l7_protocol_log::{L7ParseResult, L7ProtocolParserInterface, ParseParam}, meta_packet::EbpfFlags, }, + config::handler::LogParserConfig, flow_generator::{ protocol_logs::{ pb_adapter::{ExtendedInfo, L7ProtocolSendLog, L7Request, L7Response}, @@ -95,6 +96,21 @@ pub struct PostgreInfo { captured_request_byte: u32, captured_response_byte: u32, + + #[serde(skip)] + is_on_blacklist: bool, + #[serde(skip)] + at_lease_one_block: bool, // is at lease one validate block in payload, prevent miscalculate to other protocol +} + +impl PostgreInfo { + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::PostgreSQL) { + self.is_on_blacklist = t.request_resource.is_on_blacklist(&self.context) + || t.request_type + .is_on_blacklist(get_request_str(self.req_type)); + } + } } impl L7ProtocolInfoInterface for PostgreInfo { @@ -104,6 +120,9 @@ impl L7ProtocolInfoInterface for PostgreInfo { fn merge_log(&mut self, other: &mut L7ProtocolInfo) -> Result<()> { if let L7ProtocolInfo::PostgreInfo(pg) = other { + if pg.is_on_blacklist { + self.is_on_blacklist = pg.is_on_blacklist; + } match pg.msg_type { LogMessageType::Request => { self.req_type = pg.req_type; @@ -178,6 +197,7 @@ impl From for L7ProtocolSendLog { pub struct PostgresqlLog { perf_stats: Option, obfuscate_cache: Option, + last_is_on_blacklist: bool, } impl L7ProtocolParserInterface for PostgresqlLog { @@ -189,7 +209,7 @@ impl L7ProtocolParserInterface for PostgresqlLog { return true; } - self.parse(payload, param, true, &mut info).is_ok() + self.parse(payload, &mut info).is_ok() } fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result { @@ -205,8 +225,41 @@ impl L7ProtocolParserInterface for PostgresqlLog { self.perf_stats = Some(L7PerfStats::default()) }; - self.parse(payload, param, false, &mut info)?; + self.parse(payload, &mut info)?; set_captured_byte!(info, param); + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist + && !self.last_is_on_blacklist + && !info.ignore + && info.at_lease_one_block + { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } + } + match info.status { + L7ResponseStatus::ClientError => { + self.perf_stats.as_mut().map(|p| p.inc_req_err()); + } + L7ResponseStatus::ServerError => { + self.perf_stats.as_mut().map(|p| p.inc_resp_err()); + } + _ => {} + } + if info.at_lease_one_block { + info.cal_rrt(param).map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); + } + } + self.last_is_on_blacklist = info.is_on_blacklist; Ok(if info.ignore || !param.parse_log { L7ParseResult::None } else { @@ -239,16 +292,8 @@ impl PostgresqlLog { } } - fn parse( - &mut self, - payload: &[u8], - param: &ParseParam, - check: bool, - info: &mut PostgreInfo, - ) -> Result<()> { + fn parse(&mut self, payload: &[u8], info: &mut PostgreInfo) -> Result<()> { let mut offset = 0; - // is at lease one validate block in payload, prevent miscalculate to other protocol - let mut at_lease_one_block = false; loop { if offset >= payload.len() { break; @@ -258,29 +303,23 @@ impl PostgresqlLog { offset += len + 5; // len(data) + len 4B + tag 1B let parsed = match info.msg_type { LogMessageType::Request => { - self.on_req_block(tag, &sub_payload[5..5 + len], check, info)? + self.on_req_block(tag, &sub_payload[5..5 + len], info)? } LogMessageType::Response => { - self.on_resp_block(tag, &sub_payload[5..5 + len], check, info)? + self.on_resp_block(tag, &sub_payload[5..5 + len], info)? } _ => unreachable!(), }; - if parsed && !at_lease_one_block { - at_lease_one_block = true; + if parsed && !info.at_lease_one_block { + info.at_lease_one_block = true; } } else { break; } } - if at_lease_one_block { - if !info.ignore && !check { - info.cal_rrt(param).map(|rrt| { - info.rrt = rrt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); - }); - } + if info.at_lease_one_block { return Ok(()); } Err(Error::L7ProtocolUnknown) @@ -292,13 +331,7 @@ impl PostgresqlLog { && read_u64_be(payload) == SSL_REQ } - fn on_req_block( - &mut self, - tag: char, - data: &[u8], - check: bool, - info: &mut PostgreInfo, - ) -> Result { + fn on_req_block(&mut self, tag: char, data: &[u8], info: &mut PostgreInfo) -> Result { match tag { 'Q' => { info.req_type = tag; @@ -308,9 +341,6 @@ impl PostgresqlLog { String::from_utf8_lossy(&m).to_string() }); info.ignore = false; - if !check { - self.perf_stats.as_mut().map(|p| p.inc_req()); - } Ok(true) } 'P' => { @@ -333,9 +363,6 @@ impl PostgresqlLog { String::from_utf8_lossy(&m).to_string() }); if postgresql { - if !check { - self.perf_stats.as_mut().map(|p| p.inc_req()); - } return Ok(true); } } @@ -347,13 +374,7 @@ impl PostgresqlLog { } } - fn on_resp_block( - &mut self, - tag: char, - data: &[u8], - check: bool, - info: &mut PostgreInfo, - ) -> Result { + fn on_resp_block(&mut self, tag: char, data: &[u8], info: &mut PostgreInfo) -> Result { let mut data = data; match tag { 'C' => { @@ -393,9 +414,6 @@ impl PostgresqlLog { } } - if !check { - self.perf_stats.as_mut().map(|p| p.inc_resp()); - } Ok(true) } 'E' => { @@ -426,18 +444,6 @@ impl PostgresqlLog { let (err_desc, status) = get_code_desc(info.result.as_str()); info.error_message = String::from(err_desc); info.status = status; - if !check { - match info.status { - L7ResponseStatus::ClientError => { - self.perf_stats.as_mut().map(|p| p.inc_req_err()); - } - L7ResponseStatus::ServerError => { - self.perf_stats.as_mut().map(|p| p.inc_resp_err()); - } - _ => {} - } - self.perf_stats.as_mut().map(|p| p.inc_resp()); - } return Ok(true); } Err(Error::L7ProtocolUnknown) diff --git a/agent/src/flow_generator/protocol_logs/sql/redis.rs b/agent/src/flow_generator/protocol_logs/sql/redis.rs index 72f486ebeec..02c11d5c0db 100644 --- a/agent/src/flow_generator/protocol_logs/sql/redis.rs +++ b/agent/src/flow_generator/protocol_logs/sql/redis.rs @@ -31,6 +31,7 @@ use crate::{ l7_protocol_log::{L7ParseResult, L7ProtocolParserInterface, ParseParam}, meta_packet::EbpfFlags, }, + config::handler::LogParserConfig, flow_generator::{ error::{Error, Result}, protocol_logs::{ @@ -79,6 +80,9 @@ pub struct RedisInfo { captured_response_byte: u32, rrt: u64, + + #[serde(skip)] + is_on_blacklist: bool, } impl L7ProtocolInfoInterface for RedisInfo { @@ -108,6 +112,10 @@ impl L7ProtocolInfoInterface for RedisInfo { fn get_request_resource_length(&self) -> usize { self.request.len() } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } pub fn vec_u8_to_string(v: &Vec, serializer: S) -> Result @@ -123,8 +131,21 @@ impl RedisInfo { std::mem::swap(&mut self.error, &mut other.error); self.resp_status = other.resp_status; self.captured_response_byte = other.captured_response_byte; + if other.is_on_blacklist { + self.is_on_blacklist = other.is_on_blacklist; + } Ok(()) } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::Redis) { + self.is_on_blacklist = t + .request_resource + .is_on_blacklist(str::from_utf8(&self.request).unwrap_or_default()) + || t.request_type + .is_on_blacklist(str::from_utf8(&self.request_type).unwrap_or_default()); + } + } } impl fmt::Display for RedisInfo { @@ -184,6 +205,7 @@ pub struct RedisLog { has_request: bool, perf_stats: Option, obfuscate: bool, + last_is_on_blacklist: bool, } impl L7ProtocolParserInterface for RedisLog { @@ -211,11 +233,28 @@ impl L7ProtocolParserInterface for RedisLog { param.is_from_ebpf(), &mut info, )?; - info.cal_rrt(param).map(|rrt| { - info.rrt = rrt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); - }); set_captured_byte!(info, param); + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); + } + if !info.is_on_blacklist && !self.last_is_on_blacklist { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + if info.resp_status == L7ResponseStatus::ServerError { + self.perf_stats.as_mut().map(|p| p.inc_resp_err()); + } + } + } + info.cal_rrt(param).map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); + } + self.last_is_on_blacklist = info.is_on_blacklist; if param.parse_log { Ok(L7ParseResult::Single(L7ProtocolInfo::RedisInfo(info))) } else { @@ -250,13 +289,11 @@ impl RedisLog { info.msg_type = LogMessageType::Request; info.request = request.stringify(self.obfuscate); self.has_request = true; - self.perf_stats.as_mut().map(|p| p.inc_req()); } fn fill_response(&mut self, context: Vec, info: &mut RedisInfo) { info.msg_type = LogMessageType::Response; self.has_request = false; - self.perf_stats.as_mut().map(|p| p.inc_resp()); info.resp_status = L7ResponseStatus::Ok; @@ -268,7 +305,6 @@ impl RedisLog { b'-' | b'!' => { info.error = context; info.resp_status = L7ResponseStatus::ServerError; - self.perf_stats.as_mut().map(|p| p.inc_resp_err()); } _ => {} } diff --git a/agent/src/flow_generator/protocol_logs/tls.rs b/agent/src/flow_generator/protocol_logs/tls.rs index 03a53031860..04255a3de1a 100644 --- a/agent/src/flow_generator/protocol_logs/tls.rs +++ b/agent/src/flow_generator/protocol_logs/tls.rs @@ -23,6 +23,7 @@ use super::pb_adapter::{ ExtendedInfo, KeyVal, L7ProtocolSendLog, L7Request, L7Response, MetricKeyVal, }; use super::{set_captured_byte, value_is_default, AppProtoHead, L7ResponseStatus, LogMessageType}; +use crate::config::handler::LogParserConfig; use crate::{ common::{ enums::IpProtocol, @@ -259,6 +260,9 @@ pub struct TlsInfo { rrt: u64, tls_rtt: u64, session_id: Option, + + #[serde(skip)] + is_on_blacklist: bool, } impl L7ProtocolInfoInterface for TlsInfo { @@ -295,10 +299,17 @@ impl L7ProtocolInfoInterface for TlsInfo { fn get_request_resource_length(&self) -> usize { self.request_resource.len() } + + fn is_on_blacklist(&self) -> bool { + self.is_on_blacklist + } } impl TlsInfo { pub fn merge(&mut self, other: &mut Self) { + if other.is_on_blacklist { + self.is_on_blacklist = other.is_on_blacklist; + } match other.msg_type { LogMessageType::Request => { std::mem::swap(&mut self.handshake_protocol, &mut other.handshake_protocol); @@ -335,6 +346,14 @@ impl TlsInfo { _ => {} } } + + fn set_is_on_blacklist(&mut self, config: &LogParserConfig) { + if let Some(t) = config.l7_log_blacklist_trie.get(&L7Protocol::TLS) { + self.is_on_blacklist = t.request_resource.is_on_blacklist(&self.request_resource) + || t.request_type.is_on_blacklist(&self.request_type) + || t.request_domain.is_on_blacklist(&self.request_domain); + } + } } impl From for L7ProtocolSendLog { @@ -448,6 +467,7 @@ impl From for L7ProtocolSendLog { pub struct TlsLog { change_cipher_spec_count: u8, perf_stats: Option, + last_is_on_blacklist: bool, } //解析器接口实现 @@ -468,20 +488,35 @@ impl L7ProtocolParserInterface for TlsLog { fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result { let mut info = TlsInfo::default(); self.parse(payload, &mut info, param)?; - if info.session_id.is_some() { - // Triggered by Client Hello and the last Change cipher spec - info.cal_rrt(param).map(|rtt| { - info.tls_rtt = rtt; - self.perf_stats.as_mut().map(|p| p.update_tls_rtt(rtt)); - }); - info.session_id = None; + + if let Some(config) = param.parse_config { + info.set_is_on_blacklist(config); } - if info.msg_type != LogMessageType::Session { - info.cal_rrt(param).map(|rrt| { - info.rrt = rrt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); - }); + if !info.is_on_blacklist && !self.last_is_on_blacklist { + match param.direction { + PacketDirection::ClientToServer => { + self.perf_stats.as_mut().map(|p| p.inc_req()); + } + PacketDirection::ServerToClient => { + self.perf_stats.as_mut().map(|p| p.inc_resp()); + } + } + if info.session_id.is_some() { + // Triggered by Client Hello and the last Change cipher spec + info.cal_rrt(param).map(|rtt| { + info.tls_rtt = rtt; + self.perf_stats.as_mut().map(|p| p.update_tls_rtt(rtt)); + }); + info.session_id = None; + } + if info.msg_type != LogMessageType::Session { + info.cal_rrt(param).map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); + } } + self.last_is_on_blacklist = info.is_on_blacklist; if param.parse_log { Ok(L7ParseResult::Single(L7ProtocolInfo::TlsInfo(info))) } else { @@ -582,8 +617,6 @@ impl TlsLog { .collect::>() .join("|") .to_string(); - - self.perf_stats.as_mut().map(|p| p.inc_req()); } PacketDirection::ServerToClient => { info.msg_type = LogMessageType::Response; @@ -646,8 +679,6 @@ impl TlsLog { .collect::>() .join("|") .to_string(); - - self.perf_stats.as_mut().map(|p| p.inc_resp()); } } set_captured_byte!(info, param); diff --git a/agent/src/utils/environment/linux.rs b/agent/src/utils/environment/linux.rs index 1f679272533..daa3a8476d4 100644 --- a/agent/src/utils/environment/linux.rs +++ b/agent/src/utils/environment/linux.rs @@ -330,7 +330,7 @@ pub async fn set_docker_resource_limits( .map(|_| ()) .map_err(|e| { Error::Environment(format!( - "set milli_cpu_limit: {}, set memory_limit: {}bytes, update docker container failed: {:?}", + "set cpu_limit: {}, set memory_limit: {}bytes, update docker container failed: {:?}", milli_cpu_limit as f64 / 1000.0, memory_limit, e )) }) diff --git a/server/agent_config/config.go b/server/agent_config/config.go index 11a6095349a..a2e00948512 100644 --- a/server/agent_config/config.go +++ b/server/agent_config/config.go @@ -98,101 +98,102 @@ type AgentGroupConfig struct { } type StaticConfig struct { - ProxyControllerPort *uint16 `yaml:"proxy-controller-port,omitempty"` - LogLevel *string `yaml:"log-level,omitempty"` - Profiler *bool `yaml:"profiler,omitempty"` - AfpacketBlocksEnabled *bool `yaml:"afpacket-blocks-enabled,omitempty"` - AfpacketBlocks *int `yaml:"afpacket-blocks,omitempty"` - AnalyzerRawPacketBlockSize *int `yaml:"analyzer-raw-packet-block-size,omitempty"` - BatchedBufferSizeLimit *int `yaml:"batched-buffer-size-limit,omitempty"` - EnableDebugStats *bool `yaml:"enable-debug-stats,omitempty"` - AnalyzerDedupDisabled *bool `yaml:"analyzer-dedup-disabled,omitempty"` - DefaultTapType *uint32 `yaml:"default-tap-type,omitempty"` - DebugListenPort *uint16 `yaml:"debug-listen-port,omitempty"` - EnableQosBypass *bool `yaml:"enable-qos-bypass,omitempty"` - FastPathMapSize *int `yaml:"fast-path-map-size,omitempty"` - FirstPathLevel *int `yaml:"first-path-level,omitempty"` - LocalDispatcherCount *int `yaml:"local-dispatcher-count,omitempty"` - SrcInterfaces []string `yaml:"src-interfaces,omitempty"` - CloudGatewayTraffic *bool `yaml:"cloud-gateway-traffic,omitempty"` - MirrorTrafficPcp *uint16 `yaml:"mirror-traffic-pcp,omitempty"` - PCap *PCapConfig `yaml:"pcap,omitempty"` - Flow *FlowGeneratorConfig `yaml:"flow,omitempty"` - FlowQueueSize *int `yaml:"flow-queue-size,omitempty"` - QuadrupleQueueSize *int `yaml:"quadruple-queue-size,omitempty"` - AnalyzerQueueSize *int `yaml:"analyzer-queue-size,omitempty"` - DpdkEnabled *bool `yaml:"dpdk-enabled,omitempty"` - LibpcapEnabled *bool `yaml:"libpcap-enabled,omitempty"` - XflowCollector *XflowCollectorConfig `yaml:"xflow-collector,omitempty"` - NpbPort *uint16 `yaml:"npb-port,omitempty"` - VxlanFlags *uint8 `yaml:"vxlan-flags,omitempty"` - IgnoreOverlayVlan *bool `yaml:"ignore-overlay-vlan,omitempty"` - CollectorSenderQueueSize *int `yaml:"collector-sender-queue-size,omitempty"` - CollectorSenderQueueCount *int `yaml:"collector-sender-queue-count,omitempty"` - ToaSenderQueueSize *int `yaml:"toa-sender-queue-size,omitempty"` - ToaLruCacheSize *int `yaml:"toa-lru-cache-size,omitempty"` - FlowSenderQueueSize *int `yaml:"flow-sender-queue-size,omitempty"` - FlowSenderQueueCount *int `yaml:"flow-sender-queue-count,omitempty"` - SecondFlowExtraDelaySecond *string `yaml:"second-flow-extra-delay-second,omitempty"` - PacketDelay *string `yaml:"packet-delay,omitempty"` - Triple *TripleMapConfig `yaml:"triple,omitempty"` - KubernetesPollerType *string `yaml:"kubernetes-poller-type,omitempty"` - DecapErspan *bool `yaml:"decap-erspan,omitempty"` - AnalyzerIp *string `yaml:"analyzer-ip,omitempty"` - AnalyzerPort *uint16 `yaml:"analyzer-port,omitempty"` - KubernetesNamespace *string `yaml:"kubernetes-namespace,omitempty"` - KubernetesAPIListLimit *uint32 `yaml:"kubernetes-api-list-limit,omitempty"` - KubernetesAPIListInterval *string `yaml:"kubernetes-api-list-interval,omitempty"` - KubernetesResources []KubernetesResourceConfig `yaml:"kubernetes-resources,omitempty"` - IngressFlavour *string `yaml:"ingress-flavour,omitempty"` - GrpcBufferSize *int `yaml:"grpc-buffer-size,omitempty"` // 单位:M - L7LogSessionAggrTimeout *string `yaml:"l7-log-session-aggr-timeout,omitempty"` // 单位: s - L7LogSessionSlotCapacity *int `yaml:"l7-log-session-slot-capacity,omitempty"` - TapMacScript *string `yaml:"tap-mac-script,omitempty"` - BpfDisabled *bool `yaml:"bpf-disabled,omitempty"` - L7ProtocolInferenceMaxFailCount *uint64 `yaml:"l7-protocol-inference-max-fail-count,omitempty"` - L7ProtocolInferenceTtl *uint64 `yaml:"l7-protocol-inference-ttl,omitempty"` - OracleParseConfig *OracleConfig `yaml:"oracle-parse-config,omitempty"` - PacketSequenceBlockSize *int `yaml:"packet-sequence-block-size,omitempty"` - PacketSequenceQueueSize *int `yaml:"packet-sequence-queue-size,omitempty"` - PacketSequenceQueueCount *int `yaml:"packet-sequence-queue-count,omitempty"` - PacketSequenceFlag *uint8 `yaml:"packet-sequence-flag,omitempty"` - L7ProtocolEnabled []string `yaml:"l7-protocol-enabled,omitempty"` - StandaloneDataFileSize *uint64 `yaml:"standalone-data-file-size,omitempty"` - StandaloneDataFileDir *string `yaml:"standalone-data-file-dir,omitempty"` - LogFile *string `yaml:"log-file,omitempty"` - ExternalAgentHttpProxyCompressed *bool `yaml:"external-agent-http-proxy-compressed,omitempty"` - FeatureFlags []string `yaml:"feature-flags,omitempty"` - L7ProtocolPorts map[string]string `yaml:"l7-protocol-ports,omitempty"` - L7ProtocolAdvancedFeatures *L7ProtocolAdvancedFeatures `yaml:"l7-protocol-advanced-features,omitempty"` - Ebpf *EbpfConfig `yaml:"ebpf,omitempty"` - OsAppTagExecUser *string `yaml:"os-app-tag-exec-user,omitempty"` - OsAppTagExec []string `yaml:"os-app-tag-exec,omitempty"` - OsProcRoot *string `yaml:"os-proc-root,omitempty"` - OsProcSocketSyncInterval *int `yaml:"os-proc-socket-sync-interval,omitempty"` - OsProcSocketMinLifetime *int `yaml:"os-proc-socket-min-lifetime,omitempty"` - OsProcRegex []*OsProcRegex `yaml:"os-proc-regex,omitempty"` - OsProcSyncEnabled *bool `yaml:"os-proc-sync-enabled,omitempty"` - OsProcSyncTaggedOnly *bool `yaml:"os-proc-sync-tagged-only,omitempty"` - GuardInterval *string `yaml:"guard-interval,omitempty"` - CheckCoreFileDisabled *bool `yaml:"check-core-file-disabled,omitempty"` - SoPlugins []string `yaml:"so-plugins,omitempty"` - MemoryTrimDisabled *bool `yaml:"memory-trim-disabled,omitempty"` - FastPathDisabled *bool `yaml:"fast-path-disabled,omitempty"` - ForwardCapacity *uint32 `yaml:"forward-capacity,omitempty"` - RrtTcpTimeout *string `yaml:"rrt-tcp-timeout,omitempty"` - RrtUdpTimeout *string `yaml:"rrt-udp-timeout,omitempty"` - PrometheusExtraConfig *PrometheusExtraConfig `yaml:"prometheus-extra-config,omitempty"` - ProcessSchedulingPriority *int8 `yaml:"process-scheduling-priority,omitempty"` - CpuAffinity *string `yaml:"cpu-affinity,omitempty"` - ExternalProfileIntegrationDisabled *bool `yaml:"external-profile-integration-disabled,omitempty"` - ExternalTraceIntegrationDisabled *bool `yaml:"external-trace-integration-disabled,omitempty"` - ExternalMetricIntegrationDisabled *bool `yaml:"external-metric-integration-disabled,omitempty"` - ExternalLogIntegrationDisabled *bool `yaml:"external_log_integration_disabled,omitempty"` - NtpMaxInterval *string `yaml:"ntp-max-interval,omitempty"` - NtpMinInterval *string `yaml:"ntp-min-interval,omitempty"` - DispatcherQueue *bool `yaml:"dispatcher-queue,omitempty"` - EbpfCollectorQueueSize *int `yaml:"ebpf-collector-queue-size,omitempty"` + ProxyControllerPort *uint16 `yaml:"proxy-controller-port,omitempty"` + LogLevel *string `yaml:"log-level,omitempty"` + Profiler *bool `yaml:"profiler,omitempty"` + AfpacketBlocksEnabled *bool `yaml:"afpacket-blocks-enabled,omitempty"` + AfpacketBlocks *int `yaml:"afpacket-blocks,omitempty"` + AnalyzerRawPacketBlockSize *int `yaml:"analyzer-raw-packet-block-size,omitempty"` + BatchedBufferSizeLimit *int `yaml:"batched-buffer-size-limit,omitempty"` + EnableDebugStats *bool `yaml:"enable-debug-stats,omitempty"` + AnalyzerDedupDisabled *bool `yaml:"analyzer-dedup-disabled,omitempty"` + DefaultTapType *uint32 `yaml:"default-tap-type,omitempty"` + DebugListenPort *uint16 `yaml:"debug-listen-port,omitempty"` + EnableQosBypass *bool `yaml:"enable-qos-bypass,omitempty"` + FastPathMapSize *int `yaml:"fast-path-map-size,omitempty"` + FirstPathLevel *int `yaml:"first-path-level,omitempty"` + LocalDispatcherCount *int `yaml:"local-dispatcher-count,omitempty"` + SrcInterfaces []string `yaml:"src-interfaces,omitempty"` + CloudGatewayTraffic *bool `yaml:"cloud-gateway-traffic,omitempty"` + MirrorTrafficPcp *uint16 `yaml:"mirror-traffic-pcp,omitempty"` + PCap *PCapConfig `yaml:"pcap,omitempty"` + Flow *FlowGeneratorConfig `yaml:"flow,omitempty"` + FlowQueueSize *int `yaml:"flow-queue-size,omitempty"` + QuadrupleQueueSize *int `yaml:"quadruple-queue-size,omitempty"` + AnalyzerQueueSize *int `yaml:"analyzer-queue-size,omitempty"` + DpdkEnabled *bool `yaml:"dpdk-enabled,omitempty"` + LibpcapEnabled *bool `yaml:"libpcap-enabled,omitempty"` + XflowCollector *XflowCollectorConfig `yaml:"xflow-collector,omitempty"` + NpbPort *uint16 `yaml:"npb-port,omitempty"` + VxlanFlags *uint8 `yaml:"vxlan-flags,omitempty"` + IgnoreOverlayVlan *bool `yaml:"ignore-overlay-vlan,omitempty"` + CollectorSenderQueueSize *int `yaml:"collector-sender-queue-size,omitempty"` + CollectorSenderQueueCount *int `yaml:"collector-sender-queue-count,omitempty"` + ToaSenderQueueSize *int `yaml:"toa-sender-queue-size,omitempty"` + ToaLruCacheSize *int `yaml:"toa-lru-cache-size,omitempty"` + FlowSenderQueueSize *int `yaml:"flow-sender-queue-size,omitempty"` + FlowSenderQueueCount *int `yaml:"flow-sender-queue-count,omitempty"` + SecondFlowExtraDelaySecond *string `yaml:"second-flow-extra-delay-second,omitempty"` + PacketDelay *string `yaml:"packet-delay,omitempty"` + Triple *TripleMapConfig `yaml:"triple,omitempty"` + KubernetesPollerType *string `yaml:"kubernetes-poller-type,omitempty"` + DecapErspan *bool `yaml:"decap-erspan,omitempty"` + AnalyzerIp *string `yaml:"analyzer-ip,omitempty"` + AnalyzerPort *uint16 `yaml:"analyzer-port,omitempty"` + KubernetesNamespace *string `yaml:"kubernetes-namespace,omitempty"` + KubernetesAPIListLimit *uint32 `yaml:"kubernetes-api-list-limit,omitempty"` + KubernetesAPIListInterval *string `yaml:"kubernetes-api-list-interval,omitempty"` + KubernetesResources []KubernetesResourceConfig `yaml:"kubernetes-resources,omitempty"` + IngressFlavour *string `yaml:"ingress-flavour,omitempty"` + GrpcBufferSize *int `yaml:"grpc-buffer-size,omitempty"` // 单位:M + L7LogSessionAggrTimeout *string `yaml:"l7-log-session-aggr-timeout,omitempty"` // 单位: s + L7LogSessionSlotCapacity *int `yaml:"l7-log-session-slot-capacity,omitempty"` + TapMacScript *string `yaml:"tap-mac-script,omitempty"` + BpfDisabled *bool `yaml:"bpf-disabled,omitempty"` + L7ProtocolInferenceMaxFailCount *uint64 `yaml:"l7-protocol-inference-max-fail-count,omitempty"` + L7ProtocolInferenceTtl *uint64 `yaml:"l7-protocol-inference-ttl,omitempty"` + OracleParseConfig *OracleConfig `yaml:"oracle-parse-config,omitempty"` + PacketSequenceBlockSize *int `yaml:"packet-sequence-block-size,omitempty"` + PacketSequenceQueueSize *int `yaml:"packet-sequence-queue-size,omitempty"` + PacketSequenceQueueCount *int `yaml:"packet-sequence-queue-count,omitempty"` + PacketSequenceFlag *uint8 `yaml:"packet-sequence-flag,omitempty"` + L7ProtocolEnabled []string `yaml:"l7-protocol-enabled,omitempty"` + StandaloneDataFileSize *uint64 `yaml:"standalone-data-file-size,omitempty"` + StandaloneDataFileDir *string `yaml:"standalone-data-file-dir,omitempty"` + LogFile *string `yaml:"log-file,omitempty"` + ExternalAgentHttpProxyCompressed *bool `yaml:"external-agent-http-proxy-compressed,omitempty"` + FeatureFlags []string `yaml:"feature-flags,omitempty"` + L7ProtocolPorts map[string]string `yaml:"l7-protocol-ports,omitempty"` + L7LogBlacklist map[string][]*L7LogBlacklist `yaml:"l7-log-blacklist,omitempty"` + L7ProtocolAdvancedFeatures *L7ProtocolAdvancedFeatures `yaml:"l7-protocol-advanced-features,omitempty"` + Ebpf *EbpfConfig `yaml:"ebpf,omitempty"` + OsAppTagExecUser *string `yaml:"os-app-tag-exec-user,omitempty"` + OsAppTagExec []string `yaml:"os-app-tag-exec,omitempty"` + OsProcRoot *string `yaml:"os-proc-root,omitempty"` + OsProcSocketSyncInterval *int `yaml:"os-proc-socket-sync-interval,omitempty"` + OsProcSocketMinLifetime *int `yaml:"os-proc-socket-min-lifetime,omitempty"` + OsProcRegex []*OsProcRegex `yaml:"os-proc-regex,omitempty"` + OsProcSyncEnabled *bool `yaml:"os-proc-sync-enabled,omitempty"` + OsProcSyncTaggedOnly *bool `yaml:"os-proc-sync-tagged-only,omitempty"` + GuardInterval *string `yaml:"guard-interval,omitempty"` + CheckCoreFileDisabled *bool `yaml:"check-core-file-disabled,omitempty"` + SoPlugins []string `yaml:"so-plugins,omitempty"` + MemoryTrimDisabled *bool `yaml:"memory-trim-disabled,omitempty"` + FastPathDisabled *bool `yaml:"fast-path-disabled,omitempty"` + ForwardCapacity *uint32 `yaml:"forward-capacity,omitempty"` + RrtTcpTimeout *string `yaml:"rrt-tcp-timeout,omitempty"` + RrtUdpTimeout *string `yaml:"rrt-udp-timeout,omitempty"` + PrometheusExtraConfig *PrometheusExtraConfig `yaml:"prometheus-extra-config,omitempty"` + ProcessSchedulingPriority *int8 `yaml:"process-scheduling-priority,omitempty"` + CpuAffinity *string `yaml:"cpu-affinity,omitempty"` + ExternalProfileIntegrationDisabled *bool `yaml:"external-profile-integration-disabled,omitempty"` + ExternalTraceIntegrationDisabled *bool `yaml:"external-trace-integration-disabled,omitempty"` + ExternalMetricIntegrationDisabled *bool `yaml:"external-metric-integration-disabled,omitempty"` + ExternalLogIntegrationDisabled *bool `yaml:"external_log_integration_disabled,omitempty"` + NtpMaxInterval *string `yaml:"ntp-max-interval,omitempty"` + NtpMinInterval *string `yaml:"ntp-min-interval,omitempty"` + DispatcherQueue *bool `yaml:"dispatcher-queue,omitempty"` + EbpfCollectorQueueSize *int `yaml:"ebpf-collector-queue-size,omitempty"` } type XflowCollectorConfig struct { @@ -318,10 +319,17 @@ type ExtraLogFields struct { Grpc []ExtraLogFieldsInfo `yaml:"grpc,omitempty"` } +type L7LogBlacklist struct { + FieldName string `yaml:"field-name,omitempty"` + Operator string `yaml:"operator,omitempty"` + Value string `yaml:"value,omitempty"` +} + type L7ProtocolAdvancedFeatures struct { - HttpEndpointExtraction *HttpEndpointExtraction `yaml:"http-endpoint-extraction,omitempty"` - ObfuscateEnabledProtocols []string `yaml:"obfuscate-enabled-protocols,omitempty"` - ExtraLogFields *ExtraLogFields `yaml:"extra-log-fields,omitempty"` + HttpEndpointExtraction *HttpEndpointExtraction `yaml:"http-endpoint-extraction,omitempty"` + ObfuscateEnabledProtocols []string `yaml:"obfuscate-enabled-protocols,omitempty"` + ExtraLogFields *ExtraLogFields `yaml:"extra-log-fields,omitempty"` + UnconcernedDnsNxdomainResponseSuffixes []string `yaml:"unconcerned-dns-nxdomain-response-suffixes,omitempty"` } type OracleConfig struct { diff --git a/server/agent_config/example.yaml b/server/agent_config/example.yaml index bd65bea8e87..8f52c4e89f4 100644 --- a/server/agent_config/example.yaml +++ b/server/agent_config/example.yaml @@ -1065,6 +1065,21 @@ vtap_group_id: g-xxxxxx #"TLS": "443,6443" #"Custom": "1-65535" # plugins + ## l7_flow_log Blacklist + ## Example: + ## l7-log-blacklist: + ## HTTP: + ## - field_name: request_resource # endpoint, request_type, request_domain, request_resource + ## operator: equal # equal, prefix + ## value: somevalue + ## Note: A l7_flow_log blacklist can be configured for each protocol, preventing request logs matching + ## the blacklist from being collected by the agent or included in application performance metrics. + ## It's recommended to only place non-business request logs like heartbeats or health checks in this + ## blacklist. Including business request logs might lead to breaks in the distributed tracing tree. + #l7-log-blacklist: + # HTTP: [] + # HTTP2: [] + ## L7 Protocol Advanced Features #l7-protocol-advanced-features: @@ -1103,6 +1118,16 @@ vtap_group_id: g-xxxxxx # http: [] # http2: [] + ## Unconcerned DNS NXDOMAIN Responses + ## Note: You might not be concerned about certain DNS NXDOMAIN errors and may wish to ignore + ## them. For example, when a K8s Pod tries to resolve an external domain name, it first + ## concatenates it with the internal domain suffix of the cluster and attempts to resolve + ## it. All these attempts will receive an NXDOMAIN reply before it finally requests the + ## original domain name directly, and these errors may not be of concern to you. In such + ## cases, you can configure their `response_result` suffix here, so that the corresponding + ## `response_status` in the l7_flow_log is forcibly set to `Success`. + #unconcerned-dns-nxdomain-response-suffixes: [] + #oracle-parse-config: #is-be: true #int-compress: true