Skip to content

Commit

Permalink
eval: preserve presentation meta for trace graph (#1638)
Browse files Browse the repository at this point in the history
Map back to normal graph URI and generate the config to
support consistent presentation metadata behavior.
  • Loading branch information
brharrington authored Mar 29, 2024
1 parent cb9421b commit be64e22
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class ExprInterpreter(config: Config) {

private val hostRewriter = new HostRewriter(config.getConfig("atlas.eval.host-rewrite"))

/** Evaluate a normal Atlas Graph URI and produce and graph config. */
def eval(uri: Uri): GraphConfig = {
val graphCfg = grapher.toGraphConfig(uri)

Expand All @@ -71,6 +72,26 @@ class ExprInterpreter(config: Config) {
graphCfg.copy(query = rewritten.mkString(","), parsedQuery = Success(rewritten))
}

/**
* Evaluate a time series URI. This could be an normal Atlas Graph URI or a trace
* time series URI. If a non-time series URI is passed in the result will be None.
*/
def evalTimeSeries(uri: Uri): Option[GraphConfig] = {
determineExprType(uri) match {
case ExprType.TIME_SERIES => Some(eval(uri))
case ExprType.TRACE_TIME_SERIES => Some(eval(toGraphUri(uri)))
case _ => None
}
}

/** Convert a trace time series URI to a normal graph URI for generating the config. */
private def toGraphUri(uri: Uri): Uri = {
val ts = evalTraceTimeSeries(uri)
val newExpr = ts.map(_.expr).mkString(",")
val newQuery = uri.query().filterNot(_._1 == "q").prepended("q" -> newExpr)
uri.withQuery(Uri.Query(newQuery*))
}

/**
* Check that data expressions are supported. The streaming path doesn't support
* time shifts, filters, and integral. The filters and integral are excluded because
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,25 +102,23 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter)

// Compute the new set of expressions
recipients = sources
.filter { s =>
exprInterpreter.determineExprType(Uri(s.uri)).isTimeSeriesType
}
.flatMap { s =>
try {
val graphCfg = exprInterpreter.eval(Uri(s.uri))
val exprs = graphCfg.exprs
// Reuse the previous evaluated expression if available. States for the stateful
// expressions are maintained in an IdentityHashMap so if the instances change
// the state will be reset.
exprs.map { e =>
val paletteName =
if (graphCfg.flags.presentationMetadataEnabled) {
val axis = e.axis.getOrElse(0)
Some(graphCfg.flags.axisPalette(graphCfg.settings, axis))
} else {
None
}
previous.getOrElse(e, e) -> ExprInfo(s.id, paletteName)
exprInterpreter.evalTimeSeries(Uri(s.uri)).toList.flatMap { graphCfg =>
val exprs = graphCfg.exprs
// Reuse the previous evaluated expression if available. States for the stateful
// expressions are maintained in an IdentityHashMap so if the instances change
// the state will be reset.
exprs.map { e =>
val paletteName =
if (graphCfg.flags.presentationMetadataEnabled) {
val axis = e.axis.getOrElse(0)
Some(graphCfg.flags.axisPalette(graphCfg.settings, axis))
} else {
None
}
previous.getOrElse(e, e) -> ExprInfo(s.id, paletteName)
}
}
} catch {
case e: Exception =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,6 @@ class EvaluatorSuite extends FunSuite {
"synthetic://test/traces/graph?q=name,cpu,:eq,nf.app,foo,:eq,:and&step=1ms&numStepIntervals=5"
val future = Source.fromPublisher(evaluator.createPublisher(uri)).runWith(Sink.seq)
val result = Await.result(future, scala.concurrent.duration.Duration.Inf)
assertEquals(result.size, 5)
assertEquals(result.size, 10)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -762,4 +762,60 @@ class FinalExprEvalSuite extends FunSuite {
assertEquals(ts.styleMetadata.get.color, Color.RED)
assertEquals(ts.styleMetadata.get.lineStyle, LineStyle.STACK)
}

test("trace time series: aggregate with single datapoint per group") {
val expr = DataExpr.Sum(Query.Equal("name", "rps"))
val tags = Map("name" -> "rps")
val input = List(
sources(ds("a", s"http://atlas/traces/graph?q=app,foo,:eq,$expr,:span-time-series")),
group(0),
group(1, AggrDatapoint(0, step, expr, "i-1", tags, 42.0)),
group(2, AggrDatapoint(0, step, expr, "i-1", tags, 43.0)),
group(3, AggrDatapoint(0, step, expr, "i-1", tags, 44.0))
)

val output = run(input)

val timeseries = output.filter(isTimeSeries)
assertEquals(timeseries.size, 4)
val expectedTimeseries = List(Double.NaN, 42.0, 43.0, 44.0)
timeseries.zip(expectedTimeseries).foreach {
case (env, expectedValue) =>
assertEquals(env.id, "a")
val ts = env.message.asInstanceOf[TimeSeriesMessage]
checkValue(ts, expectedValue)
}

val dataRateMsgs = output.filter(isEvalDataRate).filter(_.id == "a")
assert(dataRateMsgs.size == 3)
val expectedSizes = Array(
Array(
EvalDataSize(1, Map(expr.toString -> 1)),
EvalDataSize(1, Map(expr.toString -> 1)),
EvalDataSize(1)
),
Array(
EvalDataSize(1, Map(expr.toString -> 1)),
EvalDataSize(1, Map(expr.toString -> 1)),
EvalDataSize(1)
),
Array(
EvalDataSize(1, Map(expr.toString -> 1)),
EvalDataSize(1, Map(expr.toString -> 1)),
EvalDataSize(1)
)
)
dataRateMsgs.zipWithIndex.foreach(envAndIndex => {
val rate = getAsEvalDataRate(envAndIndex._1)
val i = envAndIndex._2
checkRate(
rate,
60000 * (i + 1),
60000,
expectedSizes(i)(0),
expectedSizes(i)(1),
expectedSizes(i)(2)
)
})
}
}

0 comments on commit be64e22

Please sign in to comment.