Skip to content

Commit

Permalink
[AGENT] Support "key=value" type mysql trace id
Browse files Browse the repository at this point in the history
  • Loading branch information
rvql authored and sharang committed May 10, 2024
1 parent 0f62396 commit 257c231
Showing 1 changed file with 177 additions and 40 deletions.
217 changes: 177 additions & 40 deletions agent/src/flow_generator/protocol_logs/sql/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,37 +231,20 @@ impl MysqlInfo {
if config.trace_types.is_empty() && config.span_types.is_empty() {
return;
}
debug!("extract id from sql {}", sql);
debug!("extract id from sql {sql}");
'outer: for comment in comment_parser::MysqlCommentParserIter::new(sql) {
trace!("comment={}", comment);
let mut segs = comment.split(":");
let mut value = segs.next();
loop {
let key = value;
value = segs.next();
if value.is_none() {
break;
};

// take last word before ':' and first word after it
let Some(rk) = key.as_ref().unwrap().trim().split_whitespace().last() else {
continue;
};
let Some(rv) = value.as_ref().unwrap().trim().split_whitespace().next() else {
continue;
};
let rk = rk.trim();
let rv = rv.trim();
trace!("key={} value={}", rk, rv);
trace!("comment={comment}");
for (key, value) in KvExtractor::new(comment) {
trace!("key={key} value={value}");
for tt in config.trace_types.iter() {
if tt.check(rk) {
self.trace_id = tt.decode_trace_id(rv).map(|s| s.to_string());
if tt.check(key) {
self.trace_id = tt.decode_trace_id(value).map(|s| s.to_string());
break;
}
}
for st in config.span_types.iter() {
if st.check(rk) {
self.span_id = st.decode_span_id(rv).map(|s| s.to_string());
if st.check(key) {
self.span_id = st.decode_span_id(value).map(|s| s.to_string());
break;
}
}
Expand Down Expand Up @@ -835,6 +818,98 @@ impl MysqlHeader {
}
}

/// Parser state of [`KvExtractor`]: the kind of token expected next.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Token {
    /// Nothing is pending; the next word is taken as a key.
    Key,
    /// A key word was read and a `:` / `=` is expected to follow it.
    Separator,
    /// A key and its separator were read; the next word is the value.
    Value,
}

/// Returns `true` for the characters that end a word inside a comment:
/// ASCII whitespace, `:`, `=` and `,`.
fn is_kv_delimiter(c: char) -> bool {
    c.is_ascii_whitespace() || matches!(c, ':' | '=' | ',')
}

/// Iterator over `(key, value)` pairs found in free-form text such as
/// `key: value` or `k1=v1,k2=v2`.
///
/// `:` and `=` both act as key/value separators, `,` ends a pair, and ASCII
/// whitespace around words is ignored. A word that was yielded as a value is
/// looked at again as a possible key, so `a: b : c` yields `("a", "b")` and
/// then `("b", "c")`.
pub struct KvExtractor<'a> {
    // Yields words with their trailing delimiter still attached.
    split: std::str::SplitInclusive<'a, fn(char) -> bool>,
    // Segment pushed back by `next()` so it gets re-read as a key candidate.
    last_segment: Option<&'a str>,
}

impl<'a> KvExtractor<'a> {
    /// Creates an extractor over `s`. No parsing happens until the iterator
    /// is advanced.
    pub fn new(s: &'a str) -> Self {
        Self {
            split: s.split_inclusive(is_kv_delimiter as fn(char) -> bool),
            last_segment: None,
        }
    }
}

impl<'a> Iterator for KvExtractor<'a> {
    type Item = (&'a str, &'a str);

    /// Returns the next `(key, value)` pair, or `None` once the input is
    /// exhausted. A trailing key without a value is dropped.
    fn next(&mut self) -> Option<Self::Item> {
        // `Token::Value` is only ever entered right after `last_key` was set.
        const KEY_INVARIANT: &str = "a key is always recorded before a value is expected";

        let mut next_token = Token::Key;
        let mut last_key: Option<&'a str> = None;
        loop {
            // A pushed-back segment takes priority over fresh input.
            let seg = self.last_segment.take().or_else(|| self.split.next())?;

            let Some((last, last_char)) = seg.char_indices().last() else {
                continue;
            };
            // Only strip the final character when it really is a delimiter.
            // The last segment of the input may have none, and must then be
            // kept whole even if it is a single character (e.g. the `1` in
            // `a=1`), otherwise that token would be lost.
            let (exp, sep) = if is_kv_delimiter(last_char) {
                (&seg[..last], Some(last_char))
            } else {
                (seg, None)
            };

            match sep {
                Some(',') => {
                    if !exp.is_empty() && next_token == Token::Value {
                        return Some((last_key.expect(KEY_INVARIANT), exp.trim()));
                    }
                    // A comma always ends the current pair.
                    next_token = Token::Key;
                }
                Some(':' | '=') => {
                    if !exp.is_empty() {
                        if next_token == Token::Value {
                            // `k=v:` - yield `v`, then re-read `v:` as a key.
                            self.last_segment = Some(seg);
                            return Some((last_key.expect(KEY_INVARIANT), exp.trim()));
                        }
                        // Discard any previously parsed key.
                        last_key = Some(exp.trim());
                        next_token = Token::Value;
                    } else if next_token == Token::Separator {
                        // Separator detached from its key, e.g. `key \t : value`.
                        debug_assert!(last_key.is_some());
                        next_token = Token::Value;
                    } else {
                        // Separator with no key in front of it: start over.
                        next_token = Token::Key;
                    }
                }
                // Word ended by whitespace, or the undelimited end of input.
                _ => {
                    if exp.is_empty() {
                        continue;
                    }
                    match next_token {
                        Token::Key | Token::Separator => {
                            last_key = Some(exp.trim());
                            next_token = Token::Separator;
                        }
                        Token::Value => {
                            // The value may itself be the key of a following pair.
                            self.last_segment = Some(seg);
                            return Some((last_key.expect(KEY_INVARIANT), exp.trim()));
                        }
                    }
                }
            }
        }
    }
}

// test log parse
#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -1065,21 +1140,26 @@ mod tests {
.start()
.unwrap();
let testcases = vec![
(
"/* traceparent: 00-trace_id-span_id-01 */ SELECT * FROM table",
Some("trace_id"),
Some("span_id"),
),
(
"/* traceparent: traceparent \t : 00-trace_id-span_id-01 */ SELECT * FROM table",
Some("trace_id"),
Some("span_id"),
),
(
" SELECT * FROM table # traceparent: traceparent \ttRaCeId \t: 00-trace_id-span_id-01: traceparent",
Some("00-trace_id-span_id-01"),
None,
),
(
"/* traceparent: 00-trace_id-span_id-01 */ SELECT * FROM table",
Some("trace_id"),
Some("span_id"),
),
(
"/* traceparent: traceparent \t : 00-trace_id-span_id-01 */ SELECT * FROM table",
Some("trace_id"),
Some("span_id"),
),
(
" SELECT * FROM table # traceparent: traceparent \ttRaCeId \t: 00-trace_id-span_id-01: traceparent",
Some("00-trace_id-span_id-01"),
None,
),
(
"/* trcod=VCCMOYF2,svccod=VCCMOF2,jrnno=W557426527, reqseq=124748979092341,chanl=MB,userId=12094710GSOE */ SELECT * FROM table",
Some("W557426527"),
None,
),
];
let mut info = MysqlInfo::default();
let config = L7LogDynamicConfig::new(
Expand All @@ -1088,6 +1168,7 @@ mod tests {
vec![
TraceType::TraceParent,
TraceType::Customize("TraceID".to_owned()),
TraceType::Customize("jrnno".to_owned()),
],
vec![TraceType::TraceParent],
ExtraLogFields::default(),
Expand Down Expand Up @@ -1210,4 +1291,60 @@ mod tests {
);
}
}

#[test]
fn test_kv_extractor() {
    // Each entry pairs a raw comment body with the key/value pairs that
    // `KvExtractor` is expected to yield for it, in order.
    let table: Vec<(&str, Vec<(&str, &str)>)> = vec![
        // Mixed `:` / `=` separators, stray commas and detached separators.
        (
            "safwa: asfew saefa:weaiow dff=ea,dsas=sad , asda=2,: ,a23 =zsda fawa:1,ara :af::, 2e=c:g,",
            vec![
                ("safwa", "asfew"),
                ("saefa", "weaiow"),
                ("dff", "ea"),
                ("dsas", "sad"),
                ("asda", "2"),
                ("a23", "zsda"),
                ("fawa", "1"),
                ("ara", "af"),
                ("2e", "c"),
                ("c", "g"),
            ],
        ),
        // Single `key: value` pair.
        (
            "traceparent: 00-trace_id-span_id-01",
            vec![("traceparent", "00-trace_id-span_id-01")],
        ),
        // A value that is immediately reused as the key of the next pair.
        (
            "traceparent: traceparent \t : 00-trace_id-span_id-01",
            vec![
                ("traceparent", "traceparent"),
                ("traceparent", "00-trace_id-span_id-01"),
            ],
        ),
        // Only the last word before a separator counts as the key.
        (
            " traceparent: traceparent \ttRaCeId \t: 00-trace_id-span_id-01: traceparent",
            vec![
                ("traceparent", "traceparent"),
                ("tRaCeId", "00-trace_id-span_id-01"),
                ("00-trace_id-span_id-01", "traceparent"),
            ],
        ),
        // Comma-separated `key=value` list.
        (
            " trcod=VCCMOYF2,svccod=VCCMOF2,jrnno=W557426527, reqseq=124748979092341,chanl=MB,userId=12094710GSOE",
            vec![
                ("trcod", "VCCMOYF2"),
                ("svccod", "VCCMOF2"),
                ("jrnno", "W557426527"),
                ("reqseq", "124748979092341"),
                ("chanl", "MB"),
                ("userId", "12094710GSOE"),
            ],
        ),
    ];

    for (input, expected) in table {
        let extracted: Vec<_> = KvExtractor::new(input).collect();
        assert_eq!(expected, extracted, "failed in case {input}");
    }
}
}

0 comments on commit 257c231

Please sign in to comment.