diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/EventMessage.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/EventMessage.scala new file mode 100644 index 000000000..dcaf536d3 --- /dev/null +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/EventMessage.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.eval.model + +import com.fasterxml.jackson.core.JsonGenerator +import com.fasterxml.jackson.databind.JsonNode +import com.netflix.atlas.json.Json +import com.netflix.atlas.json.JsonSupport + +/** + * Message type use for events to forward to a consumer. + */ +case class EventMessage(payload: JsonNode) extends JsonSupport { + + override def encode(gen: JsonGenerator): Unit = { + gen.writeStartObject() + gen.writeStringField("type", "event") + gen.writeFieldName("payload") + Json.encode(gen, payload) + gen.writeEndObject() + } +} diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcEvent.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcEvent.scala new file mode 100644 index 000000000..134da8cdf --- /dev/null +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcEvent.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.eval.model + +import com.fasterxml.jackson.databind.JsonNode +import com.netflix.atlas.json.JsonSupport + +/** + * Raw event data to pass through to the consumer. + * + * @param id + * Identifies the expression that resulted in this datapoint being generated. + * @param payload + * Raw event payload. + */ +case class LwcEvent(id: String, payload: JsonNode) extends JsonSupport { + val `type`: String = "event" +} diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala index b2b6aeb6d..72e7fca9e 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala @@ -18,6 +18,8 @@ package com.netflix.atlas.eval.model import org.apache.pekko.util.ByteString import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.NullNode import com.netflix.atlas.core.util.SmallHashMap import com.netflix.atlas.core.util.SortedTagMap import com.netflix.atlas.json.Json @@ -32,6 +34,9 @@ import java.io.ByteArrayOutputStream */ object LwcMessages { + // For reading arbitrary json structures for events + private val mapper = Json.newMapper + /** * Parse the message string into an internal model object based on the type. */ @@ -69,6 +74,9 @@ object LwcMessages { var tags: Map[String, String] = Map.empty var value: Double = Double.NaN + // LwcEvent + var payload: JsonNode = NullNode.instance + // LwcDiagnosticMessage // - id // - message: DiagnosticMessage @@ -96,6 +104,8 @@ object LwcMessages { case "tags" => tags = parseTags(parser) case "value" => value = nextDouble(parser) + case "payload" => payload = nextTree(parser) + case "message" => val t = parser.nextToken() if (t == JsonToken.VALUE_STRING) @@ -110,6 +120,7 @@ object LwcMessages { case "expression" => LwcExpression(expression, exprType, step) case "subscription" => LwcSubscription(expression, metrics) case "datapoint" => LwcDatapoint(timestamp, id, tags, value) + case "event" => LwcEvent(id, payload) case "diagnostic" => LwcDiagnosticMessage(id, diagnosticMessage) case "heartbeat" => LwcHeartbeat(timestamp, step) case _ => DiagnosticMessage(typeDesc, message, None) @@ -119,6 +130,11 @@ object LwcMessages { } } + private def nextTree(parser: JsonParser): JsonNode = { + parser.nextToken() + mapper.readTree[JsonNode](parser) + } + private[model] def parseDataExprs(parser: JsonParser): List[LwcDataExpr] = { val builder = List.newBuilder[LwcDataExpr] foreachItem(parser) { @@ -163,6 +179,7 @@ object LwcMessages { private val LwcDiagnostic = 3 private val Diagnostic = 4 private val Heartbeat = 5 + private val Event = 6 /** * Encode messages using Jackson's smile format into a ByteString. @@ -224,8 +241,12 @@ object LwcMessages { gen.writeNumber(Heartbeat) gen.writeNumber(msg.timestamp) gen.writeNumber(msg.step) - case _ => - throw new MatchError("foo") + case msg: LwcEvent => + gen.writeNumber(Event) + gen.writeString(msg.id) + mapper.writeTree(gen, msg.payload) + case msg => + throw new MatchError(s"$msg") } gen.writeEndArray() } finally { @@ -286,6 +307,9 @@ object LwcMessages { val timestamp = parser.nextLongValue(-1L) val step = parser.nextLongValue(-1L) builder += LwcHeartbeat(timestamp, step) + case Event => + val id = parser.nextTextValue() + builder += LwcEvent(id, nextTree(parser)) case v => throw new MatchError(s"invalid type id: $v") } diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala index 071ffc0a4..594aa5c17 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala @@ -24,9 +24,11 @@ import org.apache.pekko.stream.stage.GraphStageLogic import org.apache.pekko.stream.stage.InHandler import org.apache.pekko.stream.stage.OutHandler import com.netflix.atlas.eval.model.AggrDatapoint +import com.netflix.atlas.eval.model.EventMessage import com.netflix.atlas.eval.model.LwcDataExpr import com.netflix.atlas.eval.model.LwcDatapoint import com.netflix.atlas.eval.model.LwcDiagnosticMessage +import com.netflix.atlas.eval.model.LwcEvent import com.netflix.atlas.eval.model.LwcHeartbeat import com.netflix.atlas.eval.model.LwcSubscription @@ -54,6 +56,7 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext) grab(in).foreach { case sb: LwcSubscription => updateState(sb) case dp: LwcDatapoint => builder ++= pushDatapoint(dp) + case ev: LwcEvent => pushEvent(ev) case dg: LwcDiagnosticMessage => pushDiagnosticMessage(dg) case hb: LwcHeartbeat => builder += pushHeartbeat(hb) case _ => @@ -85,6 +88,13 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext) } } + private def pushEvent(event: LwcEvent): Unit = { + state.get(event.id) match { + case Some(sub) => context.log(sub.expr, EventMessage(event.payload)) + case None => unknown.increment() + } + } + private def pushDiagnosticMessage(diagMsg: LwcDiagnosticMessage): Unit = { state.get(diagMsg.id) match { case Some(sub) => context.log(sub.expr, diagMsg.message) diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/StreamContext.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/StreamContext.scala index c23f9a7e3..0c079a075 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/StreamContext.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/StreamContext.scala @@ -33,6 +33,7 @@ import com.netflix.atlas.core.model.Query import com.netflix.atlas.core.util.Streams import com.netflix.atlas.eval.stream.Evaluator.DataSource import com.netflix.atlas.eval.stream.Evaluator.DataSources +import com.netflix.atlas.json.JsonSupport import com.netflix.atlas.pekko.AccessLogger import com.netflix.atlas.pekko.DiagnosticMessage import com.netflix.atlas.pekko.StreamOps @@ -233,7 +234,7 @@ private[stream] class StreamContext( /** * Send a diagnostic message to all data sources that use a particular data expression. */ - def log(expr: DataExpr, msg: DiagnosticMessage): Unit = { + def log(expr: DataExpr, msg: JsonSupport): Unit = { dataExprMap.get(expr).foreach { ds => ds.foreach(s => dsLogger(s, msg)) } diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala index 7ecbd9193..5026352cf 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala @@ -15,6 +15,7 @@ */ package com.netflix.atlas.eval.model +import com.fasterxml.jackson.databind.JsonNode import org.apache.pekko.util.ByteString import com.netflix.atlas.core.util.Streams import com.netflix.atlas.json.Json @@ -63,6 +64,13 @@ class LwcMessagesSuite extends FunSuite { assertEquals(actual, expected) } + test("event") { + val payload = Json.decode[JsonNode]("""{"foo":"bar"}""") + val expected = LwcEvent("123", payload) + val actual = LwcMessages.parse(Json.encode(expected)) + assertEquals(actual, expected) + } + test("diagnostic message") { val expected = DiagnosticMessage.error("something bad happened") val actual = LwcMessages.parse(Json.encode(expected)) @@ -130,6 +138,16 @@ class LwcMessagesSuite extends FunSuite { assertEquals(actual, expected.toList) } + test("batch: event") { + val expected = (0 until 10).map { i => + val tags = Map("name" -> "cpu", "node" -> s"i-$i") + val payload = Json.decode[JsonNode](Json.encode(tags)) + LwcEvent(s"$i", payload) + } + val actual = LwcMessages.parseBatch(LwcMessages.encodeBatch(expected)) + assertEquals(actual, expected.toList) + } + test("batch: lwc diagnostic") { val expected = (0 until 10).map { i => LwcDiagnosticMessage(s"$i", DiagnosticMessage.error("foo")) diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala index 2525f80f4..aff00075e 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala @@ -29,6 +29,7 @@ import com.netflix.atlas.chart.util.SrcPath import com.netflix.atlas.eval.model.ArrayData import com.netflix.atlas.eval.model.LwcDatapoint import com.netflix.atlas.eval.model.LwcDiagnosticMessage +import com.netflix.atlas.eval.model.LwcEvent import com.netflix.atlas.eval.model.LwcExpression import com.netflix.atlas.eval.model.LwcHeartbeat import com.netflix.atlas.eval.model.LwcMessages @@ -117,6 +118,7 @@ class EvaluatorSuite extends FunSuite { case m: LwcExpression => Option(m.`type`) case m: LwcSubscription => Option(m.`type`) case m: LwcDatapoint => Option(m.`type`) + case m: LwcEvent => Option(m.`type`) case m: LwcDiagnosticMessage => Option(m.`type`) case m: LwcHeartbeat => Option(m.`type`) case _ => None