-
Notifications
You must be signed in to change notification settings - Fork 328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
+ Monix Task and BIO support and Cats IO improvements #879
base: master
Are you sure you want to change the base?
Changes from all commits
cb9bd97
5c67e86
9349835
b2c9a72
4196984
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
package kamon.instrumentation.cats.io | ||
|
||
import cats.effect.{ExitCase, Sync} | ||
import cats.implicits._ | ||
import kamon.Kamon | ||
import kamon.tag.TagSet | ||
import kamon.trace.Span | ||
|
||
object Tracing { | ||
|
||
/** | ||
* Wraps the effect `fa` in a new span with the provided name and tags. The created span is marked as finished after | ||
* the effect is completed or cancelled. | ||
* | ||
* @param name the span name | ||
* @param tags the collection of tags to apply to the span | ||
* @param takeSamplingDecision if true, it ensures that a Sampling Decision is taken in case none has been taken so far | ||
* @param fa the effect to execute | ||
* @tparam F the effect type | ||
* @tparam A the value produced in the effect F | ||
* @return the same effect wrapped within a named span | ||
*/ | ||
def operationName[F[_] : Sync, A](name: String, tags: Map[String, Any] = Map.empty, takeSamplingDecision: Boolean = true)(fa: F[A]): F[A] = { | ||
val F = implicitly[Sync[F]] | ||
buildSpan(name, tags).flatMap { span => | ||
val ctx = Kamon.currentContext() | ||
val scope = Kamon.storeContext(ctx.withEntry(Span.Key, span)) | ||
F.guaranteeCase(fa) { | ||
case ExitCase.Completed => F.delay { | ||
finishSpan(span, takeSamplingDecision) | ||
scope.close() | ||
} | ||
case ExitCase.Error(err) => F.delay { | ||
failSpan(span, err, takeSamplingDecision) | ||
scope.close() | ||
} | ||
case ExitCase.Canceled => F.delay { | ||
finishSpan(span.tag("cancel", value = true), takeSamplingDecision) | ||
scope.close() | ||
} | ||
} | ||
} | ||
} | ||
|
||
private def buildSpan[F[_]](name: String, tags: Map[String, Any])(implicit F: Sync[F]): F[Span] = | ||
F.delay( | ||
Kamon | ||
.serverSpanBuilder(name, "cats-effect") | ||
.asChildOf(Kamon.currentSpan()) | ||
.tagMetrics(TagSet.from(tags)) | ||
.start() | ||
) | ||
|
||
private def finishSpan(span: Span, takeSamplingDecision: Boolean): Span = { | ||
if (takeSamplingDecision) span.takeSamplingDecision() | ||
span.finish() | ||
span | ||
} | ||
|
||
private def failSpan(span: Span, err: Throwable, takeSamplingDecision: Boolean): Span = { | ||
if (err.getMessage == null) span.fail(err) | ||
else span.fail(err.getMessage, err) | ||
|
||
finishSpan(span, takeSamplingDecision) | ||
} | ||
|
||
object Implicits { | ||
|
||
final class KamonOps[F[_] : Sync, A](fa: F[A]) { | ||
/** | ||
* Wraps the effect in a new span with the provided name and tags. The created span is marked as finished after | ||
* the effect is completed or cancelled. | ||
* | ||
* @param name the span name | ||
* @param tags the collection of tags to apply to the span | ||
* @param takeSamplingDecision if true, it ensures that a Sampling Decision is taken in case none has been taken so far | ||
* @return the same effect wrapped within a named span | ||
*/ | ||
def named(name: String, tags: Map[String, Any] = Map.empty, takeSamplingDecision: Boolean = true): F[A] = | ||
operationName(name, tags, takeSamplingDecision)(fa) | ||
} | ||
|
||
implicit final def kamonTracingSyntax[F[_] : Sync, A](fa: F[A]): KamonOps[F, A] = new KamonOps(fa) | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
package kamon.instrumentation.cats.io | ||
|
||
import java.util.concurrent.Executors | ||
|
||
import cats.effect.{Async, ContextShift, Effect, LiftIO, Timer} | ||
import cats.implicits._ | ||
import kamon.Kamon | ||
import kamon.context.Context | ||
import kamon.instrumentation.cats.io.Tracing.Implicits._ | ||
import kamon.tag.Lookups.plain | ||
import kamon.testkit.TestSpanReporter | ||
import kamon.trace.Span | ||
import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures} | ||
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Inspectors, Matchers, WordSpec} | ||
|
||
import scala.concurrent.ExecutionContext | ||
import scala.concurrent.ExecutionContext.global | ||
import scala.concurrent.duration.Duration | ||
|
||
// NOTE: We have this test just to ensure that the Context propagation is working, but starting with Kamon 2.0 there | ||
// is no need to have explicit Runnable/Callable instrumentation because the instrumentation brought by the | ||
// kamon-executors module should take care of all non-JDK Runnable/Callable implementations. | ||
abstract class AbstractCatsEffectInstrumentationSpec[F[_]: LiftIO](effectName: String)(implicit F: Effect[F]) | ||
extends WordSpec | ||
with ScalaFutures | ||
with Matchers | ||
with PatienceConfiguration | ||
with TestSpanReporter | ||
with Inspectors | ||
with Eventually | ||
with BeforeAndAfterAll | ||
with BeforeAndAfter { | ||
|
||
implicit def contextShift: ContextShift[F] | ||
|
||
implicit def timer: Timer[F] | ||
|
||
private val customExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd go with a fixed ThreadPool to ensure the test are running with multiple threads |
||
|
||
override protected def afterAll(): Unit = { | ||
customExecutionContext.shutdown() | ||
shutdownTestSpanReporter() | ||
super.afterAll() | ||
} | ||
|
||
before { | ||
Kamon.storeContext(Context.Empty) | ||
testSpanReporter().clear() | ||
} | ||
|
||
after { | ||
Kamon.storeContext(Context.Empty) | ||
testSpanReporter().clear() | ||
} | ||
|
||
s"A Cats Effect $effectName" should { | ||
"capture the active span available when created" which { | ||
"must be available across asynchronous boundaries" in { | ||
val context = Context.of("key", "value") | ||
|
||
val contextTagF: F[String] = | ||
for { | ||
scope <- F.delay(Kamon.storeContext(context)) | ||
_ <- Async.shift[F](customExecutionContext) | ||
len <- F.delay("Hello Kamon!").map(_.length) | ||
_ <- F.pure(len.toString) | ||
_ <- timer.sleep(Duration.Zero) | ||
_ <- Async.shift[F](global) | ||
tagValue <- F.delay(Kamon.currentContext().getTag(plain("key"))) | ||
_ <- F.delay(scope.close()) | ||
} yield tagValue | ||
|
||
val contextTag = F.toIO(contextTagF).unsafeRunSync() | ||
contextTag shouldEqual "value" | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should test cancellation and errors? Would be good to run multiple IOs in parallel (with shift inserted in between) to ensure everything works. |
||
|
||
"nest spans correctly" in { | ||
// the test expects the following span tree, but for some reason, it doesn't work: | ||
// - root | ||
// - 1 (value = 1) | ||
// - 2 (value = 2) | ||
// - 3 (value = 3) | ||
val rootSpan = for { | ||
rootAndScope <- F.delay { | ||
val span = Kamon.spanBuilder("root").start() | ||
val ctx = Kamon.storeContext(Kamon.currentContext().withEntry(Span.Key, span)) | ||
(span, ctx) | ||
} | ||
(root, scope) = rootAndScope | ||
_ <- (1L to 3L) | ||
.toList | ||
.traverse { idx => | ||
F.delay(idx).named(idx.toString, Map("value" -> idx)) | ||
} | ||
_ <- F.delay { | ||
root.finish() | ||
scope.close() | ||
} | ||
} yield root | ||
|
||
val root = F.toIO(rootSpan).unsafeRunSync() | ||
|
||
eventually { | ||
testSpanReporter().spans().size shouldEqual 4 | ||
testSpanReporter().spans().map(_.operationName).toSet shouldEqual Set("root", "1", "2", "3") | ||
} | ||
|
||
val childrenSpans = testSpanReporter().spans().filter(_.id.string != root.id.string) | ||
forAll(childrenSpans) { span => | ||
span.parentId.string shouldEqual root.id.string | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
package kamon.instrumentation.cats.io | ||
|
||
import cats.effect.{ContextShift, IO, Timer} | ||
|
||
import scala.concurrent.ExecutionContext.global | ||
|
||
class CatsIOInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[IO]("IO") { | ||
|
||
override implicit def contextShift: ContextShift[IO] = IO.contextShift(global) | ||
|
||
override implicit def timer: Timer[IO] = IO.timer(global) | ||
} |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
############################################# | ||
# Kamon Monix Reference Configuration # | ||
############################################# | ||
|
||
kanela.modules { | ||
executor-service { | ||
within += "monix.eval..*" | ||
within += "monix.execution..*" | ||
within += "monix.bio..*" | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package kamon.instrumentation.monix | ||
|
||
import cats.effect.{ContextShift, Timer} | ||
import kamon.instrumentation.cats.io.AbstractCatsEffectInstrumentationSpec | ||
import monix.bio.{IO, Task} | ||
import monix.execution.Scheduler.Implicits.global | ||
|
||
class MonixBIOInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[Task]("Monix Bifunctor IO") { | ||
|
||
override implicit def contextShift: ContextShift[Task] = IO.contextShift | ||
|
||
override implicit def timer: Timer[Task] = IO.timer | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package kamon.instrumentation.monix | ||
|
||
import cats.effect.{ContextShift, Timer} | ||
import kamon.instrumentation.cats.io.AbstractCatsEffectInstrumentationSpec | ||
import monix.eval.Task | ||
import monix.execution.Scheduler.Implicits.global | ||
|
||
class MonixInstrumentationSpec extends AbstractCatsEffectInstrumentationSpec[Task]("Monix Task") { | ||
|
||
override implicit def contextShift: ContextShift[Task] = Task.contextShift | ||
|
||
override implicit def timer: Timer[Task] = Task.timer | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we take
(implicit F: Sync[F])
like buildSpan?