Skip to content

Commit

Permalink
lwc-events: fix sync for time series (#1646)
Browse files Browse the repository at this point in the history
Ensure that unchanged handlers are added to the set that
will get flushed based on the heartbeat.
  • Loading branch information
brharrington authored Apr 2, 2024
1 parent 44199c6 commit 47bd1af
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
subHandlers.put(sub, q -> handler)
flushableHandlers += handler
}
diff.unchanged.timeSeries.foreach { sub =>
val handlerMeta = subHandlers.get(sub)
if (handlerMeta != null)
flushableHandlers += handlerMeta._2
}
diff.removed.timeSeries.foreach(removeSubscription)

// Trace pass-through
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
package com.netflix.atlas.lwc.events

import com.netflix.atlas.core.util.SortedTagMap
import com.netflix.atlas.json.Json
import com.netflix.spectator.api.Clock
import com.netflix.spectator.api.ManualClock
import munit.FunSuite

import java.io.StringWriter
import scala.util.Using

class LwcEventClientSuite extends FunSuite {

import LwcEventSuite.*
import LwcEventClientSuite.*

private val clock = new ManualClock()
private val step = 5_000L
Expand Down Expand Up @@ -121,6 +127,41 @@ class LwcEventClientSuite extends FunSuite {
assert(output.result().isEmpty)
}

test("analytics, sync") {
val subs = Subscriptions.fromTypedList(
List(
Subscription("1", step, "app,foo,:eq,:sum", Subscriptions.TimeSeries),
Subscription("2", step, "app,www,:eq,:sum", Subscriptions.TimeSeries)
)
)
val output = List.newBuilder[String]
val client = TestLwcEventClient(subs, output.addOne, clock)
client.process(sampleLwcEvent)
clock.setWallTime(step)
client.process(LwcEvent.HeartbeatLwcEvent(step))
assertEquals(output.result().size, 2)

// Sync expressions, same set
(2 until 10).foreach { i =>
output.clear()
client.sync(subs)
client.process(sampleLwcEvent)
clock.setWallTime(step * i)
client.process(LwcEvent.HeartbeatLwcEvent(step * i))
assertEquals(output.result().size, 2)
}

// Sync expressions, subset
(10 until 20).foreach { i =>
output.clear()
client.sync(subs.copy(timeSeries = subs.timeSeries.tail))
client.process(sampleLwcEvent)
clock.setWallTime(step * i)
client.process(LwcEvent.HeartbeatLwcEvent(step * i))
assertEquals(output.result().size, 1)
}
}

test("trace analytics, basic aggregate") {
val subs = Subscriptions.fromTypedList(
List(
Expand All @@ -136,3 +177,32 @@ class LwcEventClientSuite extends FunSuite {
assertEquals(vs.size, 1)
}
}

object LwcEventClientSuite {

case class TestLwcEventClient(
subscriptions: Subscriptions,
consumer: String => Unit,
clock: Clock
) extends AbstractLwcEventClient(clock) {

sync(subscriptions)

override def sync(subscriptions: Subscriptions): Unit = {
super.sync(subscriptions)
}

override def submit(id: String, event: LwcEvent): Unit = {
Using.resource(new StringWriter()) { w =>
Using.resource(Json.newJsonGenerator(w)) { gen =>
gen.writeStartObject()
gen.writeStringField("id", id)
gen.writeFieldName("event")
event.encode(gen)
gen.writeEndObject()
}
consumer(s"data: ${w.toString}")
}
}
}
}

0 comments on commit 47bd1af

Please sign in to comment.