Skip to content

Commit

Permalink
Merge branch 'main' into feature-improve-pool
Browse files Browse the repository at this point in the history
  • Loading branch information
sharang authored Nov 4, 2024
2 parents 5683545 + 7b88a66 commit 88403e5
Show file tree
Hide file tree
Showing 59 changed files with 8,820 additions and 189 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
### <a id="main"></a>DeepFlow release main

#### Bug Fix
* fix: Mega units in configuration [#8435](https://github.com/deepflowio/deepflow/pull/8435) by [rvql](https://github.com/rvql)
* fix: L4/L7 log store tap types configuration [#8434](https://github.com/deepflowio/deepflow/pull/8434) by [rvql](https://github.com/rvql)
* fix: Invalid type in config cause parse failure [#8433](https://github.com/deepflowio/deepflow/pull/8433) by [rvql](https://github.com/rvql)
* fix: container env supports aggr packets with and without tunnels [#8427](https://github.com/deepflowio/deepflow/pull/8427) by [yuanchaoa](https://github.com/yuanchaoa)
* fix: modify the matching logic of flow_node [#8420](https://github.com/deepflowio/deepflow/pull/8420) by [yuanchaoa](https://github.com/yuanchaoa)
* fix: Compatible with older configurations [#8377](https://github.com/deepflowio/deepflow/pull/8377) by [jin-xiaofeng](https://github.com/jin-xiaofeng)
Expand Down Expand Up @@ -227,6 +230,7 @@
* Fix prometheus data cannot be labeled with universal tags,if slow-decoder is used. [#7100](https://github.com/deepflowio/deepflow/pull/7100)

#### NEW FEATURE
* feat: agent - eBPF Support for the ARM version of Kylin v10 SP2 [#8439](https://github.com/deepflowio/deepflow/pull/8439) by [yinjiping](https://github.com/yinjiping)
* feat: Change bpf map feat to feat_flags to support multi-function [#8424](https://github.com/deepflowio/deepflow/pull/8424) by [rvql](https://github.com/rvql)
* feat: modify agent.proto [#8416](https://github.com/deepflowio/deepflow/pull/8416) by [yuanchaoa](https://github.com/yuanchaoa)
* feat: querier add query cache config [#8404](https://github.com/deepflowio/deepflow/pull/8404) by [xiaochaoren1](https://github.com/xiaochaoren1)
Expand Down
79 changes: 46 additions & 33 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,14 +558,15 @@ impl Default for GolangSpecific {
pub struct Java {
#[serde(with = "humantime_serde")]
pub refresh_defer_duration: Duration,
#[serde(deserialize_with = "deser_usize_with_mega_unit")]
pub max_symbol_file_size: usize,
}

impl Default for Java {
fn default() -> Self {
Self {
refresh_defer_duration: Duration::from_secs(60),
max_symbol_file_size: 10,
max_symbol_file_size: 10 << 20,
}
}
}
Expand Down Expand Up @@ -1806,8 +1807,10 @@ pub struct Processors {
#[serde(default)]
pub struct Limits {
pub max_millicpus: u32,
#[serde(deserialize_with = "deser_u64_with_mega_unit")]
pub max_memory: u64,
pub max_log_backhaul_rate: u32,
#[serde(deserialize_with = "deser_u32_with_mega_unit")]
pub max_local_log_file_size: u32,
#[serde(with = "humantime_serde")]
pub local_log_retention: Duration,
Expand All @@ -1817,9 +1820,9 @@ impl Default for Limits {
fn default() -> Self {
Self {
max_millicpus: 1000,
max_memory: 768,
max_memory: 768 << 20,
max_log_backhaul_rate: 300,
max_local_log_file_size: 1000,
max_local_log_file_size: 1000 << 20,
local_log_retention: Duration::from_secs(300 * 24 * 3600),
}
}
Expand Down Expand Up @@ -1911,6 +1914,7 @@ impl Default for RelativeSysLoad {
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TxThroughput {
#[serde(deserialize_with = "deser_u64_with_mega_unit")]
pub trigger_threshold: u64,
#[serde(with = "humantime_serde")]
pub throughput_monitoring_interval: Duration,
Expand Down Expand Up @@ -1983,6 +1987,7 @@ pub struct Communication {
pub max_escape_duration: Duration,
pub ingester_ip: String,
pub ingester_port: u16,
#[serde(deserialize_with = "deser_usize_with_mega_unit")]
pub grpc_buffer_size: usize,
pub request_via_nat_ip: bool,
pub proxy_controller_ip: String,
Expand All @@ -1998,7 +2003,7 @@ impl Default for Communication {
proxy_controller_port: 30035,
ingester_ip: "".to_string(),
ingester_port: 30033,
grpc_buffer_size: 5,
grpc_buffer_size: 5 << 20,
request_via_nat_ip: false,
}
}
Expand Down Expand Up @@ -2074,14 +2079,15 @@ impl Default for SelfMonitoring {
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct StandaloneMode {
#[serde(deserialize_with = "deser_u32_with_mega_unit")]
pub max_data_file_size: u32,
pub data_file_dir: String,
}

impl Default for StandaloneMode {
fn default() -> Self {
Self {
max_data_file_size: 200,
max_data_file_size: 200 << 20,
data_file_dir: "/var/log/deepflow_agent/".to_string(),
}
}
Expand Down Expand Up @@ -2177,8 +2183,8 @@ impl Default for Socket {
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct FlowLogFilters {
pub l4_capture_network_types: Vec<u16>,
pub l7_capture_network_types: Vec<u16>,
pub l4_capture_network_types: Vec<i16>,
pub l7_capture_network_types: Vec<i16>,
pub l4_ignored_observation_points: Vec<u16>,
pub l7_ignored_observation_points: Vec<u16>,
}
Expand Down Expand Up @@ -2317,6 +2323,7 @@ pub struct Npb {
#[serde(deserialize_with = "parse_maybe_binary_u8")]
pub custom_vxlan_flags: u8,
pub overlay_vlan_header_trimming: bool,
#[serde(deserialize_with = "deser_u64_with_mega_unit")]
pub max_tx_throughput: u64,
}

Expand All @@ -2330,7 +2337,7 @@ impl Default for Npb {
target_port: 4789,
custom_vxlan_flags: 0b1111_1111,
overlay_vlan_header_trimming: false,
max_tx_throughput: 1000,
max_tx_throughput: 1000 << 20,
}
}
}
Expand Down Expand Up @@ -2376,9 +2383,9 @@ impl From<&RuntimeConfig> for UserConfig {
global: Global {
limits: Limits {
max_millicpus: rc.max_millicpus,
max_memory: rc.max_memory,
max_memory: rc.max_memory << 20,
max_log_backhaul_rate: rc.log_threshold,
max_local_log_file_size: rc.log_file_size,
max_local_log_file_size: rc.log_file_size << 20,
local_log_retention: Duration::from_secs(rc.log_retention as u64),
},
alerts: Alerts {
Expand All @@ -2401,7 +2408,7 @@ impl From<&RuntimeConfig> for UserConfig {
.unwrap_or(SystemLoadMetric::Load15),
},
tx_throughput: TxThroughput {
trigger_threshold: rc.server_tx_bandwidth_threshold,
trigger_threshold: rc.server_tx_bandwidth_threshold << 20,
throughput_monitoring_interval: rc.bandwidth_probe_interval,
},
},
Expand Down Expand Up @@ -2432,7 +2439,7 @@ impl From<&RuntimeConfig> for UserConfig {
proxy_controller_port: rc.proxy_controller_port,
ingester_ip: rc.analyzer_ip.clone(),
ingester_port: rc.analyzer_port,
grpc_buffer_size: rc.yaml_config.grpc_buffer_size,
grpc_buffer_size: rc.yaml_config.grpc_buffer_size << 20,
request_via_nat_ip: false, // TODO: This configuration is not used
},
self_monitoring: SelfMonitoring {
Expand All @@ -2453,7 +2460,7 @@ impl From<&RuntimeConfig> for UserConfig {
interval: Duration::from_secs(rc.stats_interval),
},
standalone_mode: StandaloneMode {
max_data_file_size: rc.yaml_config.standalone_data_file_size,
max_data_file_size: rc.yaml_config.standalone_data_file_size << 20,
data_file_dir: rc.yaml_config.standalone_data_file_dir.clone(),
},
common: GlobalCommon {
Expand Down Expand Up @@ -2628,7 +2635,7 @@ impl From<&RuntimeConfig> for UserConfig {
.yaml_config
.ebpf
.java_symbol_file_refresh_defer_interval,
max_symbol_file_size: 10, // TODO: No corresponding configuration is found
..Default::default()
},
},
},
Expand All @@ -2653,7 +2660,6 @@ impl From<&RuntimeConfig> for UserConfig {
extra_netns_regex: rc.extra_netns_regex.clone(),
extra_bpf_filter: rc.capture_bpf.clone(),
vlan_pcp_in_physical_mirror_traffic: rc.yaml_config.mirror_traffic_pcp,
bpf_filter_disabled: false, // TODO: No corresponding configuration is found
tunning: AfPacketTunning {
socket_version: agent::CaptureSocketType::from_str_name(
rc.capture_socket_type.as_str_name(),
Expand Down Expand Up @@ -2838,10 +2844,7 @@ impl From<&RuntimeConfig> for UserConfig {
ingress_flavour: "kubernetes".to_string(), // deprecated
pod_mac_collection_method: rc.yaml_config.kubernetes_poller_type,
},
pull_resource_from_controller: PullResourceFromController {
domain_filter: vec![], // TODO: No corresponding configuration is found
only_kubernetes_pod_ip_in_local_cluster: false, // TODO: No corresponding configuration is found
},
..Default::default()
},
integration: Integration {
enabled: rc.external_agent_http_proxy_enabled,
Expand Down Expand Up @@ -2886,12 +2889,12 @@ impl From<&RuntimeConfig> for UserConfig {
l4_capture_network_types: rc
.l4_log_store_tap_types
.iter()
.map(|t| *t as u16)
.map(|t| *t as i16)
.collect(),
l7_capture_network_types: rc
.l7_log_store_tap_types
.iter()
.map(|t| *t as u16)
.map(|t| *t as i16)
.collect(),
l4_ignored_observation_points: rc
.l4_log_ignore_tap_sides
Expand Down Expand Up @@ -2938,7 +2941,7 @@ impl From<&RuntimeConfig> for UserConfig {
target_port: rc.yaml_config.npb_port,
custom_vxlan_flags: rc.yaml_config.vxlan_flags,
overlay_vlan_header_trimming: rc.yaml_config.ignore_overlay_vlan,
max_tx_throughput: rc.npb_bps_threshold,
max_tx_throughput: rc.npb_bps_threshold << 20,
},
},
processors: Processors {
Expand Down Expand Up @@ -3371,18 +3374,7 @@ impl UserConfig {
}
}

fn modify_unit(&mut self) {
self.global.circuit_breakers.tx_throughput.trigger_threshold <<= 20;
self.global.communication.grpc_buffer_size <<= 20;
self.global.limits.max_memory <<= 20;
self.global.limits.max_local_log_file_size <<= 20;
self.global.standalone_mode.max_data_file_size <<= 20;
self.inputs.proc.symbol_table.java.max_symbol_file_size <<= 20;
self.outputs.npb.max_tx_throughput <<= 20;
}

fn modify(&mut self) {
self.modify_unit();
self.modify_decap_types();
}

Expand Down Expand Up @@ -5098,6 +5090,27 @@ fn resolve_domain(addr: &str) -> Option<String> {
}
}

fn deser_u32_with_mega_unit<'de, D>(deserializer: D) -> Result<u32, D::Error>
where
D: Deserializer<'de>,
{
Ok(u32::deserialize(deserializer)? << 20)
}

fn deser_u64_with_mega_unit<'de, D>(deserializer: D) -> Result<u64, D::Error>
where
D: Deserializer<'de>,
{
Ok(u64::deserialize(deserializer)? << 20)
}

fn deser_usize_with_mega_unit<'de, D>(deserializer: D) -> Result<usize, D::Error>
where
D: Deserializer<'de>,
{
Ok(usize::deserialize(deserializer)? << 20)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
74 changes: 23 additions & 51 deletions agent/src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,20 @@ impl PluginConfig {
}
}

fn generate_tap_types_array(types: &[i16]) -> [bool; 256] {
let mut tap_types = [false; 256];
for &t in types {
if t == -1 {
return [false; 256];
} else if t < 0 || (t as u16) >= u16::from(CaptureNetworkType::Max) {
warn!("invalid tap type: {}", t);
} else {
tap_types[t as usize] = true;
}
}
tap_types
}

#[derive(Clone, PartialEq, Eq)]
pub struct FlowConfig {
pub agent_id: u16,
Expand Down Expand Up @@ -484,23 +498,9 @@ impl From<(&UserConfig, &DynamicConfig)> for FlowConfig {
.physical_mirror
.private_cloud_gateway_traffic,
collector_enabled: conf.outputs.flow_metrics.enabled,
l7_log_tap_types: {
let mut tap_types = [false; 256];
for &t in conf
.outputs
.flow_log
.filters
.l7_capture_network_types
.iter()
{
if (t as u16) >= u16::from(CaptureNetworkType::Max) {
warn!("invalid tap type: {}", t);
} else {
tap_types[t as usize] = true;
}
}
tap_types
},
l7_log_tap_types: generate_tap_types_array(
&conf.outputs.flow_log.filters.l7_capture_network_types,
),
capacity: conf.processors.flow_log.tunning.concurrent_flow_limit,
hash_slots: conf.processors.flow_log.tunning.flow_map_hash_slots,
packet_delay: conf
Expand Down Expand Up @@ -1715,23 +1715,9 @@ impl TryFrom<(Config, UserConfig, DynamicConfig)> for ModuleConfig {
l7_metrics_enabled: conf.outputs.flow_metrics.filters.apm_metrics,
agent_type: conf.global.common.agent_type,
agent_id: dynamic_config.agent_id() as u16,
l4_log_store_tap_types: {
let mut tap_types = [false; 256];
for &t in conf
.outputs
.flow_log
.filters
.l4_capture_network_types
.iter()
{
if (t as u16) >= u16::from(CaptureNetworkType::Max) {
warn!("invalid tap type: {}", t);
} else {
tap_types[t as usize] = true;
}
}
tap_types
},
l4_log_store_tap_types: generate_tap_types_array(
&conf.outputs.flow_log.filters.l4_capture_network_types,
),
l4_log_ignore_tap_sides: {
let mut tap_sides = [false; TapSide::MAX as usize + 1];
for t in conf
Expand Down Expand Up @@ -1998,23 +1984,9 @@ impl TryFrom<(Config, UserConfig, DynamicConfig)> for ModuleConfig {
.session_aggregate_window_duration,
l7_log_packet_size: CAP_LEN_MAX
.min(conf.processors.request_log.tunning.payload_truncation as usize),
l7_log_tap_types: {
let mut tap_types = [false; 256];
for &t in conf
.outputs
.flow_log
.filters
.l7_capture_network_types
.iter()
{
if t >= u16::from(CaptureNetworkType::Max) {
warn!("invalid tap type: {}", t);
} else {
tap_types[t as usize] = true;
}
}
tap_types
},
l7_log_tap_types: generate_tap_types_array(
&conf.outputs.flow_log.filters.l7_capture_network_types,
),
l7_protocol_inference_max_fail_count: conf
.processors
.request_log
Expand Down
16 changes: 13 additions & 3 deletions agent/src/ebpf/kernel/include/protocol_inference.h
Original file line number Diff line number Diff line change
Expand Up @@ -1509,16 +1509,26 @@ static __inline bool mqtt_decoding_length(const __u8 * buffer, int *length,
buffer += 1;
*length = 0;
*lensize = 0;
do {

/*
* Limit the number of loop iterations, ensuring the byte usage remains
* within 32 bytes. This also resolves the issue of loading eBPF bytecode
* on the 4.19.90-25.24.v2101.ky10.aarch64 kernel.
*/
static const int loop_count = 32;
for (int i = 0; i < loop_count; i++) {
digit = buffer[(*lensize)++];
*length += (digit & 127) * multiplier;
multiplier *= 128;

// mqtt 最多用4个字节表示长度
if ((*lensize) > 4)
return false;
} while ((digit & 128) != 0);
return true;
if((digit & 128) == 0)
return true;
}

return false;
}

static __inline bool mqtt_decoding_message_type(const __u8 * buffer,
Expand Down
Loading

0 comments on commit 88403e5

Please sign in to comment.