Skip to content

Commit

Permalink
eval: add event message type (#1629)
Browse files Browse the repository at this point in the history
Update messages parsing to handle simple pass-through
events. They get logged out just like diagnostic
messages.
  • Loading branch information
brharrington authored Mar 26, 2024
1 parent 36a8e80 commit 3709900
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.eval.model

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.JsonNode
import com.netflix.atlas.json.Json
import com.netflix.atlas.json.JsonSupport

/**
* Message type use for events to forward to a consumer.
*/
case class EventMessage(payload: JsonNode) extends JsonSupport {

override def encode(gen: JsonGenerator): Unit = {
gen.writeStartObject()
gen.writeStringField("type", "event")
gen.writeFieldName("payload")
Json.encode(gen, payload)
gen.writeEndObject()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.eval.model

import com.fasterxml.jackson.databind.JsonNode
import com.netflix.atlas.json.JsonSupport

/**
* Raw event data to pass through to the consumer.
*
* @param id
* Identifies the expression that resulted in this datapoint being generated.
* @param payload
* Raw event payload.
*/
case class LwcEvent(id: String, payload: JsonNode) extends JsonSupport {
val `type`: String = "event"
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package com.netflix.atlas.eval.model
import org.apache.pekko.util.ByteString
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.NullNode
import com.netflix.atlas.core.util.SmallHashMap
import com.netflix.atlas.core.util.SortedTagMap
import com.netflix.atlas.json.Json
Expand All @@ -32,6 +34,9 @@ import java.io.ByteArrayOutputStream
*/
object LwcMessages {

// For reading arbitrary json structures for events
private val mapper = Json.newMapper

/**
* Parse the message string into an internal model object based on the type.
*/
Expand Down Expand Up @@ -69,6 +74,9 @@ object LwcMessages {
var tags: Map[String, String] = Map.empty
var value: Double = Double.NaN

// LwcEvent
var payload: JsonNode = NullNode.instance

// LwcDiagnosticMessage
// - id
// - message: DiagnosticMessage
Expand Down Expand Up @@ -96,6 +104,8 @@ object LwcMessages {
case "tags" => tags = parseTags(parser)
case "value" => value = nextDouble(parser)

case "payload" => payload = nextTree(parser)

case "message" =>
val t = parser.nextToken()
if (t == JsonToken.VALUE_STRING)
Expand All @@ -110,6 +120,7 @@ object LwcMessages {
case "expression" => LwcExpression(expression, exprType, step)
case "subscription" => LwcSubscription(expression, metrics)
case "datapoint" => LwcDatapoint(timestamp, id, tags, value)
case "event" => LwcEvent(id, payload)
case "diagnostic" => LwcDiagnosticMessage(id, diagnosticMessage)
case "heartbeat" => LwcHeartbeat(timestamp, step)
case _ => DiagnosticMessage(typeDesc, message, None)
Expand All @@ -119,6 +130,11 @@ object LwcMessages {
}
}

private def nextTree(parser: JsonParser): JsonNode = {
parser.nextToken()
mapper.readTree[JsonNode](parser)
}

private[model] def parseDataExprs(parser: JsonParser): List[LwcDataExpr] = {
val builder = List.newBuilder[LwcDataExpr]
foreachItem(parser) {
Expand Down Expand Up @@ -163,6 +179,7 @@ object LwcMessages {
private val LwcDiagnostic = 3
private val Diagnostic = 4
private val Heartbeat = 5
private val Event = 6

/**
* Encode messages using Jackson's smile format into a ByteString.
Expand Down Expand Up @@ -224,8 +241,12 @@ object LwcMessages {
gen.writeNumber(Heartbeat)
gen.writeNumber(msg.timestamp)
gen.writeNumber(msg.step)
case _ =>
throw new MatchError("foo")
case msg: LwcEvent =>
gen.writeNumber(Event)
gen.writeString(msg.id)
mapper.writeTree(gen, msg.payload)
case msg =>
throw new MatchError(s"$msg")
}
gen.writeEndArray()
} finally {
Expand Down Expand Up @@ -286,6 +307,9 @@ object LwcMessages {
val timestamp = parser.nextLongValue(-1L)
val step = parser.nextLongValue(-1L)
builder += LwcHeartbeat(timestamp, step)
case Event =>
val id = parser.nextTextValue()
builder += LwcEvent(id, nextTree(parser))
case v =>
throw new MatchError(s"invalid type id: $v")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import org.apache.pekko.stream.stage.GraphStageLogic
import org.apache.pekko.stream.stage.InHandler
import org.apache.pekko.stream.stage.OutHandler
import com.netflix.atlas.eval.model.AggrDatapoint
import com.netflix.atlas.eval.model.EventMessage
import com.netflix.atlas.eval.model.LwcDataExpr
import com.netflix.atlas.eval.model.LwcDatapoint
import com.netflix.atlas.eval.model.LwcDiagnosticMessage
import com.netflix.atlas.eval.model.LwcEvent
import com.netflix.atlas.eval.model.LwcHeartbeat
import com.netflix.atlas.eval.model.LwcSubscription

Expand Down Expand Up @@ -54,6 +56,7 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
grab(in).foreach {
case sb: LwcSubscription => updateState(sb)
case dp: LwcDatapoint => builder ++= pushDatapoint(dp)
case ev: LwcEvent => pushEvent(ev)
case dg: LwcDiagnosticMessage => pushDiagnosticMessage(dg)
case hb: LwcHeartbeat => builder += pushHeartbeat(hb)
case _ =>
Expand Down Expand Up @@ -85,6 +88,13 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
}
}

private def pushEvent(event: LwcEvent): Unit = {
state.get(event.id) match {
case Some(sub) => context.log(sub.expr, EventMessage(event.payload))
case None => unknown.increment()
}
}

private def pushDiagnosticMessage(diagMsg: LwcDiagnosticMessage): Unit = {
state.get(diagMsg.id) match {
case Some(sub) => context.log(sub.expr, diagMsg.message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.util.Streams
import com.netflix.atlas.eval.stream.Evaluator.DataSource
import com.netflix.atlas.eval.stream.Evaluator.DataSources
import com.netflix.atlas.json.JsonSupport
import com.netflix.atlas.pekko.AccessLogger
import com.netflix.atlas.pekko.DiagnosticMessage
import com.netflix.atlas.pekko.StreamOps
Expand Down Expand Up @@ -233,7 +234,7 @@ private[stream] class StreamContext(
/**
* Send a diagnostic message to all data sources that use a particular data expression.
*/
def log(expr: DataExpr, msg: DiagnosticMessage): Unit = {
def log(expr: DataExpr, msg: JsonSupport): Unit = {
dataExprMap.get(expr).foreach { ds =>
ds.foreach(s => dsLogger(s, msg))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.atlas.eval.model

import com.fasterxml.jackson.databind.JsonNode
import org.apache.pekko.util.ByteString
import com.netflix.atlas.core.util.Streams
import com.netflix.atlas.json.Json
Expand Down Expand Up @@ -63,6 +64,13 @@ class LwcMessagesSuite extends FunSuite {
assertEquals(actual, expected)
}

test("event") {
val payload = Json.decode[JsonNode]("""{"foo":"bar"}""")
val expected = LwcEvent("123", payload)
val actual = LwcMessages.parse(Json.encode(expected))
assertEquals(actual, expected)
}

test("diagnostic message") {
val expected = DiagnosticMessage.error("something bad happened")
val actual = LwcMessages.parse(Json.encode(expected))
Expand Down Expand Up @@ -130,6 +138,16 @@ class LwcMessagesSuite extends FunSuite {
assertEquals(actual, expected.toList)
}

test("batch: event") {
val expected = (0 until 10).map { i =>
val tags = Map("name" -> "cpu", "node" -> s"i-$i")
val payload = Json.decode[JsonNode](Json.encode(tags))
LwcEvent(s"$i", payload)
}
val actual = LwcMessages.parseBatch(LwcMessages.encodeBatch(expected))
assertEquals(actual, expected.toList)
}

test("batch: lwc diagnostic") {
val expected = (0 until 10).map { i =>
LwcDiagnosticMessage(s"$i", DiagnosticMessage.error("foo"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.netflix.atlas.chart.util.SrcPath
import com.netflix.atlas.eval.model.ArrayData
import com.netflix.atlas.eval.model.LwcDatapoint
import com.netflix.atlas.eval.model.LwcDiagnosticMessage
import com.netflix.atlas.eval.model.LwcEvent
import com.netflix.atlas.eval.model.LwcExpression
import com.netflix.atlas.eval.model.LwcHeartbeat
import com.netflix.atlas.eval.model.LwcMessages
Expand Down Expand Up @@ -117,6 +118,7 @@ class EvaluatorSuite extends FunSuite {
case m: LwcExpression => Option(m.`type`)
case m: LwcSubscription => Option(m.`type`)
case m: LwcDatapoint => Option(m.`type`)
case m: LwcEvent => Option(m.`type`)
case m: LwcDiagnosticMessage => Option(m.`type`)
case m: LwcHeartbeat => Option(m.`type`)
case _ => None
Expand Down

0 comments on commit 3709900

Please sign in to comment.