Skip to content

Commit

Permalink
lwc-events: support value extraction for traces (#1648)
Browse files Browse the repository at this point in the history
Update trace time series to support extracting a particular
value from the event the same as event time series.
  • Loading branch information
brharrington authored Apr 3, 2024
1 parent 28feb66 commit f16a7f0
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {

@volatile private var traceHandlers: Map[Subscription, TraceQuery.SpanFilter] = Map.empty

@volatile private var traceHandlersTS: Map[Subscription, TraceQuery.SpanTimeSeries] = Map.empty
@volatile private var traceHandlersTS: Map[Subscription, TraceTimeSeries] = Map.empty

protected def sync(subscriptions: Subscriptions): Unit = {
val diff = Subscriptions.diff(currentSubs, subscriptions)
Expand All @@ -50,7 +50,7 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
// Pass-through events
diff.added.events.foreach { sub =>
val expr = ExprUtils.parseEventExpr(sub.expression)
val q = removeValueClause(expr.query)
val q = ExprUtils.toSpectatorQuery(removeValueClause(expr.query))
val handler = expr match {
case EventExpr.Raw(_) => EventHandler(sub, e => List(e))
case EventExpr.Table(_, cs) => EventHandler(sub, e => List(LwcEvent.Row(e, cs)))
Expand All @@ -64,7 +64,7 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
diff.added.timeSeries.foreach { sub =>
val expr = ExprUtils.parseDataExpr(sub.expression)
val converter = DatapointConverter(sub.id, expr, clock, sub.step, submit)
val q = removeValueClause(expr.query)
val q = ExprUtils.toSpectatorQuery(removeValueClause(expr.query))
val handler = EventHandler(
sub,
event => {
Expand Down Expand Up @@ -94,7 +94,7 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
val tq = ExprUtils.parseTraceTimeSeriesQuery(sub.expression)
val dataExpr = tq.expr.expr.dataExprs.head
val converter = DatapointConverter(sub.id, dataExpr, clock, sub.step, submit)
val q = removeValueClause(dataExpr.query)
val q = ExprUtils.toSpectatorQuery(removeValueClause(dataExpr.query))
val handler = EventHandler(
sub,
event => {
Expand All @@ -113,7 +113,9 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
}
diff.removed.traceTimeSeries.foreach(sub => subHandlers.remove(sub))
traceHandlersTS = subscriptions.traceTimeSeries.map { sub =>
sub -> ExprUtils.parseTraceTimeSeriesQuery(sub.expression)
val tq = ExprUtils.parseTraceTimeSeriesQuery(sub.expression)
val tts = TraceTimeSeries(tq.q, removeValueClause(tq.expr.expr.dataExprs.head.query))
sub -> tts
}.toMap

handlers = flushableHandlers.result()
Expand All @@ -127,13 +129,13 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
}
}

private def removeValueClause(query: Query): SpectatorQuery = {
private def removeValueClause(query: Query): Query = {
val q = query
.rewrite {
case kq: Query.KeyQuery if kq.k == "value" => Query.True
}
.asInstanceOf[Query]
ExprUtils.toSpectatorQuery(Query.simplify(q, ignore = true))
Query.simplify(q, ignore = true)
}

override def process(event: LwcEvent): Unit = {
Expand Down Expand Up @@ -162,8 +164,7 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
}
traceHandlersTS.foreachEntry { (sub, tq) =>
if (TraceMatcher.matches(tq.q, trace)) {
val f = tq.expr.expr.dataExprs.head.query
val filtered = trace.filter(event => ExprUtils.matches(f, event.tagValue))
val filtered = trace.filter(event => ExprUtils.matches(tq.f, event.tagValue))
if (filtered.nonEmpty) {
val handlerMeta = subHandlers.get(sub)
if (handlerMeta != null) {
Expand Down Expand Up @@ -193,4 +194,6 @@ object AbstractLwcEventClient {
}
}
}

private case class TraceTimeSeries(q: TraceQuery, f: Query)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import com.netflix.atlas.core.model.EventExpr
import com.netflix.atlas.core.model.EventVocabulary
import com.netflix.atlas.core.model.ModelExtractors
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.model.StyleExpr
import com.netflix.atlas.core.model.TraceQuery
import com.netflix.atlas.core.model.TraceVocabulary
import com.netflix.atlas.core.stacklang.Interpreter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,28 @@ class LwcEventClientSuite extends FunSuite {
val vs = output.result()
assertEquals(vs.size, 1)
}

test("trace analytics, basic aggregate extract value") {
val subs = Subscriptions.fromTypedList(
List(
Subscription(
"1",
step,
"app,www,:eq,value,duration,:eq,:span-time-series",
Subscriptions.TraceTimeSeries
)
)
)
val output = List.newBuilder[String]
val client = LwcEventClient(subs, output.addOne, clock)
client.processTrace(Seq(new TestSpan(sampleSpan)))
clock.setWallTime(step)
client.process(LwcEvent.HeartbeatLwcEvent(step))
val vs = output.result()
assertEquals(vs.size, 1)
assert(vs.forall(_.contains(""""tags":{"value":"duration"}""")))
assert(vs.forall(_.contains(""""value":8.4""")))
}
}

object LwcEventClientSuite {
Expand Down

0 comments on commit f16a7f0

Please sign in to comment.