Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support dd integration #8538

Merged
merged 2 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agent/crates/public/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub enum SendMessageType {
ApplicationLog = 17,
SyslogDetail = 18,
SkyWalking = 19,
Datadog = 20,
}

impl fmt::Display for SendMessageType {
Expand All @@ -80,6 +81,7 @@ impl fmt::Display for SendMessageType {
Self::ApplicationLog => write!(f, "application_log"),
Self::SyslogDetail => write!(f, "syslog_detail"),
Self::SkyWalking => write!(f, "skywalking"),
Self::Datadog => write!(f, "datadog"),
}
}
}
2 changes: 1 addition & 1 deletion agent/plugins/integration_skywalking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use public::{
use std::net::SocketAddr;

#[derive(Debug, PartialEq)]
pub struct SkyWalkingExtra(pub flow_log::SkyWalkingExtra);
pub struct SkyWalkingExtra(pub flow_log::ThirdPartyTrace);

impl Sendable for SkyWalkingExtra {
fn encode(self, _: &mut Vec<u8>) -> Result<usize, EncodeError> {
Expand Down
70 changes: 70 additions & 0 deletions agent/src/integration_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ use public::{
l7_protocol::L7Protocol,
proto::{
agent::Exception,
flow_log,
integration::opentelemetry::proto::{
common::v1::{
any_value::Value::{IntValue, StringValue},
Expand Down Expand Up @@ -226,6 +227,19 @@ async fn aggregate_with_catch_exception(
})
}

#[derive(Debug, PartialEq)]
pub struct Datadog(flow_log::ThirdPartyTrace);

impl Sendable for Datadog {
fn encode(self, buf: &mut Vec<u8>) -> Result<usize, prost::EncodeError> {
self.0.encode(buf).map(|_| self.0.encoded_len())
}

fn message_type(&self) -> SendMessageType {
SendMessageType::Datadog
}
}

// for log capture from vector
#[derive(Debug, PartialEq)]
pub struct ApplicationLog(Vec<u8>);
Expand Down Expand Up @@ -599,6 +613,7 @@ async fn handler(
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
skywalking_sender: DebugSender<SkyWalkingExtra>,
datadog_sender: DebugSender<Datadog>,
exception_handler: ExceptionHandler,
compressed: bool,
profile_compressed: bool,
Expand Down Expand Up @@ -862,6 +877,37 @@ async fn handler(
)
.await)
}
(
&Method::POST,
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.114.0/receiver/datadogreceiver/README.md?plain=1#L65
"/api/v0.2/traces" | "/v0.3/traces" | "/v0.4/traces" | "/v0.5/traces" | "/v0.7/traces",
) => {
if external_trace_integration_disabled {
return Ok(Response::builder().body(Body::empty()).unwrap());
}
let (part, body) = req.into_parts();
let whole_body = match aggregate_with_catch_exception(body, &exception_handler).await {
Ok(b) => b,
Err(e) => {
return Ok(e);
}
};

let mut third_party_data = flow_log::ThirdPartyTrace::default();
parse_dd_headers(&part.headers, &mut third_party_data);
third_party_data.data = decode_metric(whole_body, &part.headers)?;
third_party_data.uri = part.uri.path().to_string();
third_party_data.peer_ip = match peer_addr.ip() {
IpAddr::V4(ip4) => ip4.octets().to_vec(),
IpAddr::V6(ip6) => ip6.octets().to_vec(),
};

if let Err(e) = datadog_sender.send(Datadog(third_party_data)) {
warn!("datadog_sender failed to send data, because {:?}", e);
}

Ok(Response::builder().body(Body::empty()).unwrap())
}
// Return the 404 Not Found for other routes.
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
Expand Down Expand Up @@ -904,6 +950,23 @@ fn parse_profile_query(query: &str, profile: &mut metric::Profile) {
};
}

fn parse_dd_headers(headers: &HeaderMap, third_party_data: &mut flow_log::ThirdPartyTrace) {
for key in vec![
"Datadog-Meta-Lang", // headers.lang
"Datadog-Meta-Lang-Version", // headers.lang_version
"Datadog-Meta-Tracer-Version", // headers.tracer_version
"Datadog-Container-Id", // headers.container_id
"Content-Type", // for decode format validate
] {
if let Some(value) = headers.get(key) {
third_party_data.extend_keys.push(key.to_string());
third_party_data
.extend_values
.push(value.to_str().unwrap_or_default().to_string());
}
}
}

#[derive(Default)]
struct CompressedMetric {
compressed: AtomicU64, // unit (bytes)
Expand Down Expand Up @@ -963,6 +1026,7 @@ pub struct MetricServer {
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
skywalking_sender: DebugSender<SkyWalkingExtra>,
datadog_sender: DebugSender<Datadog>,
port: Arc<AtomicU16>,
exception_handler: ExceptionHandler,
server_shutdown_tx: Mutex<Option<mpsc::Sender<()>>>,
Expand Down Expand Up @@ -991,6 +1055,7 @@ impl MetricServer {
profile_sender: DebugSender<Profile>,
application_log_sender: DebugSender<ApplicationLog>,
skywalking_sender: DebugSender<SkyWalkingExtra>,
datadog_sender: DebugSender<Datadog>,
port: u16,
exception_handler: ExceptionHandler,
compressed: bool,
Expand Down Expand Up @@ -1020,6 +1085,7 @@ impl MetricServer {
profile_sender,
application_log_sender,
skywalking_sender,
datadog_sender,
port: Arc::new(AtomicU16::new(port)),
exception_handler,
server_shutdown_tx: Default::default(),
Expand Down Expand Up @@ -1070,6 +1136,7 @@ impl MetricServer {
let profile_sender = self.profile_sender.clone();
let application_log_sender = self.application_log_sender.clone();
let skywalking_sender = self.skywalking_sender.clone();
let datadog_sender = self.datadog_sender.clone();
let port = self.port.clone();
let monitor_port = Arc::new(AtomicU16::new(port.load(Ordering::Acquire)));
let (mon_tx, mon_rx) = oneshot::channel();
Expand Down Expand Up @@ -1143,6 +1210,7 @@ impl MetricServer {
let profile_sender = profile_sender.clone();
let application_log_sender = application_log_sender.clone();
let skywalking_sender = skywalking_sender.clone();
let datadog_sender = datadog_sender.clone();
let exception_handler_inner = exception_handler.clone();
let counter = counter.clone();
let compressed = compressed.clone();
Expand All @@ -1161,6 +1229,7 @@ impl MetricServer {
let profile_sender = profile_sender.clone();
let application_log_sender = application_log_sender.clone();
let skywalking_sender = skywalking_sender.clone();
let datadog_sender = datadog_sender.clone();
let exception_handler = exception_handler_inner.clone();
let peer_addr = conn.remote_addr();
let counter = counter.clone();
Expand All @@ -1185,6 +1254,7 @@ impl MetricServer {
profile_sender.clone(),
application_log_sender.clone(),
skywalking_sender.clone(),
datadog_sender.clone(),
exception_handler.clone(),
compressed.load(Ordering::Relaxed),
profile_compressed.load(Ordering::Relaxed),
Expand Down
37 changes: 35 additions & 2 deletions agent/src/trident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ use crate::{
},
handler::{NpbBuilder, PacketHandlerBuilder},
integration_collector::{
ApplicationLog, BoxedPrometheusExtra, MetricServer, OpenTelemetry, OpenTelemetryCompressed,
Profile, TelegrafMetric,
ApplicationLog, BoxedPrometheusExtra, Datadog, MetricServer, OpenTelemetry,
OpenTelemetryCompressed, Profile, TelegrafMetric,
},
metric::document::BoxedDocument,
monitor::Monitor,
Expand Down Expand Up @@ -1572,6 +1572,7 @@ pub struct AgentComponents {
pub proc_event_uniform_sender: UniformSenderThread<BoxedProcEvents>,
pub application_log_uniform_sender: UniformSenderThread<ApplicationLog>,
pub skywalking_uniform_sender: UniformSenderThread<SkyWalkingExtra>,
pub datadog_uniform_sender: UniformSenderThread<Datadog>,
pub exception_handler: ExceptionHandler,
pub proto_log_sender: DebugSender<BoxAppProtoLogsData>,
pub pcap_batch_sender: DebugSender<BoxedPcapBatch>,
Expand Down Expand Up @@ -2491,6 +2492,32 @@ impl AgentComponents {
None,
);

let datadog_queue_name = "1-datadog-to-sender";
let (datadog_sender, datadog_receiver, counter) = queue::bounded_with_debug(
user_config
.processors
.flow_log
.tunning
.flow_aggregator_queue_size,
datadog_queue_name,
&queue_debugger,
);
stats_collector.register_countable(
&QueueStats {
module: datadog_queue_name,
..Default::default()
},
Countable::Owned(Box::new(counter)),
);
let datadog_uniform_sender = UniformSenderThread::new(
datadog_queue_name,
Arc::new(datadog_receiver),
config_handler.sender(),
stats_collector.clone(),
exception_handler.clone(),
None,
);

let ebpf_dispatcher_id = dispatcher_components.len();
#[cfg(any(target_os = "linux", target_os = "android"))]
let mut ebpf_dispatcher_component = None;
Expand Down Expand Up @@ -2735,6 +2762,7 @@ impl AgentComponents {
profile_sender,
application_log_sender,
skywalking_sender,
datadog_sender,
candidate_config.metric_server.port,
exception_handler.clone(),
candidate_config.metric_server.compressed,
Expand Down Expand Up @@ -2820,6 +2848,7 @@ impl AgentComponents {
proc_event_uniform_sender,
application_log_uniform_sender,
skywalking_uniform_sender,
datadog_uniform_sender,
capture_mode: candidate_config.capture_mode,
packet_sequence_uniform_output, // Enterprise Edition Feature: packet-sequence
packet_sequence_uniform_sender, // Enterprise Edition Feature: packet-sequence
Expand Down Expand Up @@ -2909,6 +2938,7 @@ impl AgentComponents {
self.proc_event_uniform_sender.start();
self.application_log_uniform_sender.start();
self.skywalking_uniform_sender.start();
self.datadog_uniform_sender.start();
if self.config.metric_server.enabled {
self.metrics_server_component.start();
}
Expand Down Expand Up @@ -2984,6 +3014,9 @@ impl AgentComponents {
if let Some(h) = self.skywalking_uniform_sender.notify_stop() {
join_handles.push(h);
}
if let Some(h) = self.datadog_uniform_sender.notify_stop() {
join_handles.push(h);
}
// Enterprise Edition Feature: packet-sequence
if let Some(h) = self.packet_sequence_uniform_sender.notify_stop() {
join_handles.push(h);
Expand Down
4 changes: 3 additions & 1 deletion message/flow_log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,10 @@ message MqttTopic {
int32 qos = 2; // -1 mean not exist qos
}

message SkyWalkingExtra {
message ThirdPartyTrace {
bytes data = 1;
bytes peer_ip = 2;
string uri = 3;
repeated string extend_keys = 4;
repeated string extend_values = 5;
}
2 changes: 2 additions & 0 deletions server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ replace (
github.com/deepflowio/deepflow/server/controller/http/service/configuration => ./controller/http/service/configuration
github.com/deepflowio/deepflow/server/controller/monitor/license => ./controller/monitor/license
github.com/deepflowio/deepflow/server/ingester/config/configdefaults => ./ingester/config/configdefaults
github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import => ./ingester/flow_log/log_data/dd_import
github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import => ./ingester/flow_log/log_data/sw_import
github.com/deepflowio/deepflow/server/libs/logger/blocker => ./libs/logger/blocker
github.com/deepflowio/deepflow/server/querier/app/distributed_tracing/service/tracemap => ./querier/app/distributed_tracing/service/tracemap
Expand Down Expand Up @@ -106,6 +107,7 @@ require (
github.com/bytedance/sonic v1.12.5
github.com/deepflowio/deepflow/server/controller/http/appender v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/controller/http/service/agentlicense v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/dd_import v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/ingester/flow_log/log_data/sw_import v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/libs/logger/blocker v0.0.0-20240822020041-cdaf0f82ce6f
github.com/deepflowio/deepflow/server/querier/app/distributed_tracing/service/tracemap v0.0.0-00010101000000-000000000000
Expand Down
Loading
Loading