From 5621c97bf38dbc862a3e8506a61d518e0525abda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20P=C3=A9rez=20Pacheco?= Date: Fri, 7 Jun 2024 10:39:07 +0200 Subject: [PATCH] OpenTelemetry improved (#756) * OpenTelemetry improved * JC comment addressed * Parameters in customSpan for Metrics * Metrics in private function --- .../functional/xef/llm/MetricManagement.kt | 55 +++-- .../xef/llm/assistants/AssistantThread.kt | 109 ++++++--- .../functional/xef/metrics/LogsMetric.kt | 61 ++--- .../xebia/functional/xef/metrics/Metric.kt | 59 ++--- .../xebia/functional/xef/assistants/DSL.kt | 5 +- .../xef/conversation/conversations/Animal.kt | 2 +- .../OpenTelemetryAssistantState.kt | 221 +++++++----------- .../xef/opentelemetry/OpenTelemetryMetric.kt | 48 ++-- .../xef/opentelemetry/OpenTelemetryState.kt | 11 +- 9 files changed, 271 insertions(+), 300 deletions(-) diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/MetricManagement.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/MetricManagement.kt index c98eb8ae6..91d8e948a 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/MetricManagement.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/MetricManagement.kt @@ -1,6 +1,7 @@ package com.xebia.functional.xef.llm import com.xebia.functional.openai.generated.model.CreateChatCompletionResponse +import com.xebia.functional.openai.generated.model.MessageObject import com.xebia.functional.openai.generated.model.RunObject import com.xebia.functional.openai.generated.model.RunStepObject import com.xebia.functional.xef.conversation.Conversation @@ -60,34 +61,48 @@ suspend fun Prompt.addMetrics(conversation: Conversation) { conversation.metric.parameter("openai.chat_completion.functions", functions.map { it.name }) } -suspend fun RunObject.addMetrics(metric: Metric): RunObject { - metric.assistantCreateRun(this) +suspend fun RunObject.addMetrics(metric: Metric, source: String): RunObject { + metric.assistantCreateRun(this, source) return this } -suspend fun RunStepObject.addMetrics(metric: Metric): RunStepObject { - metric.assistantCreateRunStep(this) +suspend fun RunStepObject.addMetrics(metric: Metric, source: String): RunStepObject { + metric.assistantCreateRunStep(this, source) + return this +} + +suspend fun MessageObject.addMetrics(metric: Metric, source: String): MessageObject { + metric.assistantCreatedMessage(this, source) return this } suspend fun RunDelta.addMetrics(metric: Metric): RunDelta { when (this) { - is RunDelta.RunCancelled -> run.addMetrics(metric) - is RunDelta.RunCancelling -> run.addMetrics(metric) - is RunDelta.RunCompleted -> run.addMetrics(metric) - is RunDelta.RunCreated -> run.addMetrics(metric) - is RunDelta.RunExpired -> run.addMetrics(metric) - is RunDelta.RunFailed -> run.addMetrics(metric) - is RunDelta.RunInProgress -> run.addMetrics(metric) - is RunDelta.RunQueued -> run.addMetrics(metric) - is RunDelta.RunRequiresAction -> run.addMetrics(metric) - is RunDelta.RunStepCancelled -> runStep.addMetrics(metric) - is RunDelta.RunStepCompleted -> runStep.addMetrics(metric) - is RunDelta.RunStepCreated -> runStep.addMetrics(metric) - is RunDelta.RunStepExpired -> runStep.addMetrics(metric) - is RunDelta.RunStepFailed -> runStep.addMetrics(metric) - is RunDelta.RunStepInProgress -> runStep.addMetrics(metric) - else -> {} // ignore other cases + is RunDelta.RunCancelled -> run.addMetrics(metric, "RunCancelled") + is RunDelta.RunCancelling -> run.addMetrics(metric, "RunCancelling") + is RunDelta.RunCompleted -> run.addMetrics(metric, "RunCompleted") + is RunDelta.RunCreated -> run.addMetrics(metric, "RunCreated") + is RunDelta.RunExpired -> run.addMetrics(metric, "RunExpired") + is RunDelta.RunFailed -> run.addMetrics(metric, "RunFailed") + is RunDelta.RunInProgress -> run.addMetrics(metric, "RunInProgress") + is RunDelta.RunQueued -> run.addMetrics(metric, "RunQueued") + is RunDelta.RunRequiresAction -> run.addMetrics(metric, "RunRequiresAction") + is RunDelta.RunStepCancelled -> runStep.addMetrics(metric, "RunStepCancelled") + is RunDelta.RunStepCompleted -> runStep.addMetrics(metric, "RunStepCompleted") + is RunDelta.RunStepCreated -> runStep.addMetrics(metric, "RunStepCreated") + is RunDelta.RunStepExpired -> runStep.addMetrics(metric, "RunStepExpired") + is RunDelta.RunStepFailed -> runStep.addMetrics(metric, "RunStepFailed") + is RunDelta.RunStepInProgress -> runStep.addMetrics(metric, "RunStepInProgress") + is RunDelta.MessageCreated -> message.addMetrics(metric, "MessageCreated") + is RunDelta.MessageIncomplete -> message.addMetrics(metric, "MessageIncomplete") + is RunDelta.MessageCompleted -> message.addMetrics(metric, "MessageCompleted") + is RunDelta.MessageInProgress -> message.addMetrics(metric, "MessageInProgress") + is RunDelta.MessageDelta, + is RunDelta.RunIncomplete, + is RunDelta.RunStepDelta, + is RunDelta.RunSubmitToolOutputs, + is RunDelta.ThreadCreated, + is RunDelta.Unknown -> {} } return this } diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt index 37b442da9..934a1b790 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt @@ -76,7 +76,7 @@ class AssistantThread( createRun(CreateRunRequest(assistantId = assistant.assistantId)) suspend fun createRun(request: CreateRunRequest): RunObject = - api.createRun(threadId, request, configure = ::defaultConfig).addMetrics(metric) + api.createRun(threadId, request, configure = ::defaultConfig).addMetrics(metric, "RunCreated") fun createRunStream(assistant: Assistant, request: CreateRunRequest): Flow = flow { api @@ -122,39 +122,42 @@ class AssistantThread( ) } ) - val run = - metric.assistantToolOutputsRun(event.run.id) { - api - .submitToolOuputsToRunStream( - threadId = threadId, - runId = event.run.id, - submitToolOutputsRunRequest = toolOutputsRequest, - configure = ::defaultConfig - ) - .collect { - val delta = RunDelta.fromServerSentEvent(it) - if (delta is RunDelta.RunStepCompleted) { - flowCollector.emit(RunDelta.RunSubmitToolOutputs(toolOutputsRequest)) - } - flowCollector.emit(delta) - } - val run = getRun(event.run.id) - val finalEvent = - when (run.status) { - RunObject.Status.queued -> RunDelta.RunQueued(run) - RunObject.Status.in_progress -> RunDelta.RunInProgress(run) - RunObject.Status.requires_action -> RunDelta.RunRequiresAction(run) - RunObject.Status.cancelling -> RunDelta.RunCancelling(run) - RunObject.Status.cancelled -> RunDelta.RunCancelled(run) - RunObject.Status.failed -> RunDelta.RunFailed(run) - RunObject.Status.completed -> RunDelta.RunCompleted(run) - RunObject.Status.expired -> RunDelta.RunExpired(run) - RunObject.Status.incomplete -> RunDelta.RunIncomplete(run) - } - flowCollector.emit(finalEvent) - run + + api + .submitToolOuputsToRunStream( + threadId = threadId, + runId = event.run.id, + submitToolOutputsRunRequest = toolOutputsRequest, + configure = ::defaultConfig + ) + .collect { + val delta = RunDelta.fromServerSentEvent(it) + + delta.launchMetricsIfNecessary() + + if (delta is RunDelta.RunStepCompleted) { + flowCollector.emit(RunDelta.RunSubmitToolOutputs(toolOutputsRequest)) + } + flowCollector.emit(delta) } + val run = getRun(event.run.id) + val finalEvent = + when (run.status) { + RunObject.Status.queued -> Pair(RunDelta.RunQueued(run), "RunQueued") + RunObject.Status.in_progress -> Pair(RunDelta.RunInProgress(run), "RunInProgress") + RunObject.Status.requires_action -> + Pair(RunDelta.RunRequiresAction(run), "RunRequiresAction") + RunObject.Status.cancelling -> Pair(RunDelta.RunCancelling(run), "RunCancelling") + RunObject.Status.cancelled -> Pair(RunDelta.RunCancelled(run), "RunCancelled") + RunObject.Status.failed -> Pair(RunDelta.RunFailed(run), "RunFailed") + RunObject.Status.completed -> Pair(RunDelta.RunCompleted(run), "RunCompleted") + RunObject.Status.expired -> Pair(RunDelta.RunExpired(run), "RunExpired") + RunObject.Status.incomplete -> Pair(RunDelta.RunIncomplete(run), "RunIncomplete") + } + flowCollector.emit(finalEvent.first) + metric.assistantCreateRun(run, finalEvent.second) + if (run.status == RunObject.Status.requires_action) { takeRequiredAction( depth + 1, @@ -210,6 +213,48 @@ class AssistantThread( is RunStepObjectStepDetails.CaseRunStepDetailsToolCallsObject -> step.value.toolCalls } + private suspend fun RunDelta.launchMetricsIfNecessary() { + launchRunMetricsIfNecessary() + launchRunStepsMetricsIfNecessary() + launchMessageMetricsIfNecessary() + } + + private suspend fun RunDelta.launchRunMetricsIfNecessary() { + when (this) { + is RunDelta.RunCreated -> Pair(run, "RunCreated") + is RunDelta.RunQueued -> Pair(run, "RunQueued") + is RunDelta.RunFailed -> Pair(run, "RunFailed") + is RunDelta.RunCancelled -> Pair(run, "RunCancelled") + is RunDelta.RunCancelling -> Pair(run, "RunCancelling") + is RunDelta.RunExpired -> Pair(run, "RunExpired") + is RunDelta.RunInProgress -> Pair(run, "RunInProgress") + is RunDelta.RunIncomplete -> Pair(run, "RunIncomplete") + else -> null + }?.let { metric.assistantCreateRun(it.first, it.second) } + } + + private suspend fun RunDelta.launchRunStepsMetricsIfNecessary() { + when (this) { + is RunDelta.RunStepCreated -> Pair(runStep, "RunStepCreated") + is RunDelta.RunStepInProgress -> Pair(runStep, "RunStepInProgress") + is RunDelta.RunStepCompleted -> Pair(runStep, "RunStepCompleted") + is RunDelta.RunStepFailed -> Pair(runStep, "RunStepFailed") + is RunDelta.RunStepCancelled -> Pair(runStep, "RunStepCancelled") + is RunDelta.RunStepExpired -> Pair(runStep, "RunStepExpired") + else -> null + }?.let { metric.assistantCreateRunStep(it.first, it.second) } + } + + private suspend fun RunDelta.launchMessageMetricsIfNecessary() { + when (this) { + is RunDelta.MessageCreated -> Pair(message, "MessageCreated") + is RunDelta.MessageInProgress -> Pair(message, "MessageInProgress") + is RunDelta.MessageIncomplete -> Pair(message, "MessageIncomplete") + is RunDelta.MessageCompleted -> Pair(message, "MessageCompleted") + else -> null + }?.let { metric.assistantCreatedMessage(it.first, it.second) } + } + companion object { /** Support for OpenAI-Beta: assistants=v2 */ diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt index b5bd1562f..c64c4d194 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt @@ -18,10 +18,17 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric { private val logger = KotlinLogging.logger {} - override suspend fun customSpan(name: String, block: suspend Metric.() -> A): A { + override suspend fun customSpan( + name: String, + parameters: Map, + block: suspend Metric.() -> A + ): A { val millis = getTimeMillis() logger.at(level) { message = "${writeIndent(numberOfBlocks.get())}> Custom-Span: $name" } numberOfBlocks.incrementAndGet() + parameters.map { (key, value) -> + logger.at(level) { message = "${writeIndent(numberOfBlocks.get())}|-- $key = $value" } + } val output = block() logger.at(level) { message = "${writeIndent(numberOfBlocks.get())}|-- Finished in ${getTimeMillis() - millis} ms" @@ -43,7 +50,7 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric { return output } - override suspend fun assistantCreateRun(runObject: RunObject) { + override suspend fun assistantCreateRun(runObject: RunObject, source: String) { logger.at(level) { this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${runObject.assistantId}" } @@ -58,16 +65,7 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric { } } - override suspend fun assistantCreateRun( - runId: String, - block: suspend Metric.() -> RunObject - ): RunObject { - val output = block() - assistantCreateRun(output) - return output - } - - override suspend fun assistantCreateRunStep(runObject: RunStepObject) { + override suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String) { logger.at(level) { this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${runObject.assistantId}" } @@ -82,44 +80,23 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric { } } - override suspend fun assistantCreatedMessage( - runId: String, - block: suspend Metric.() -> List - ): List { - val output = block() + override suspend fun assistantCreatedMessage(messageObject: MessageObject, source: String) { logger.at(level) { - this.message = "${writeIndent(numberOfBlocks.get())}|-- Size: ${output.size}" - } - return output - } - - override suspend fun assistantCreateRunStep( - runId: String, - block: suspend Metric.() -> RunStepObject - ): RunStepObject { - val output = block() - logger.at(level) { - this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${output.assistantId}" + this.message = + "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${messageObject.assistantId}" } logger.at(level) { - this.message = "${writeIndent(numberOfBlocks.get())}|-- ThreadId: ${output.threadId}" + this.message = "${writeIndent(numberOfBlocks.get())}|-- ThreadId: ${messageObject.threadId}" } logger.at(level) { - this.message = "${writeIndent(numberOfBlocks.get())}|-- RunId: ${output.runId}" + this.message = "${writeIndent(numberOfBlocks.get())}|-- RunId: ${messageObject.id}" } logger.at(level) { - this.message = "${writeIndent(numberOfBlocks.get())}|-- Status: ${output.status.name}" + if (messageObject.status != null) { + this.message = + "${writeIndent(numberOfBlocks.get())}|-- Status: ${messageObject.status!!.name}" + } } - return output - } - - override suspend fun assistantToolOutputsRun( - runId: String, - block: suspend Metric.() -> RunObject - ): RunObject { - val output = block() - assistantCreateRun(output) - return output } override suspend fun event(message: String) { diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt index 62661f070..e7a1f47f0 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt @@ -6,7 +6,11 @@ import com.xebia.functional.openai.generated.model.RunStepObject import com.xebia.functional.xef.prompt.Prompt interface Metric { - suspend fun customSpan(name: String, block: suspend Metric.() -> A): A + suspend fun customSpan( + name: String, + parameters: Map, + block: suspend Metric.() -> A + ): A suspend fun promptSpan(prompt: Prompt, block: suspend Metric.() -> A): A @@ -16,59 +20,32 @@ interface Metric { suspend fun parameter(key: String, values: List) - suspend fun assistantCreateRun(runObject: RunObject) + suspend fun assistantCreateRun(runObject: RunObject, source: String) - suspend fun assistantCreateRun(runId: String, block: suspend Metric.() -> RunObject): RunObject + suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String) - suspend fun assistantCreateRunStep(runObject: RunStepObject) - - suspend fun assistantCreatedMessage( - runId: String, - block: suspend Metric.() -> List - ): List - - suspend fun assistantCreateRunStep( - runId: String, - block: suspend Metric.() -> RunStepObject - ): RunStepObject - - suspend fun assistantToolOutputsRun( - runId: String, - block: suspend Metric.() -> RunObject - ): RunObject + suspend fun assistantCreatedMessage(messageObject: MessageObject, source: String) companion object { val EMPTY: Metric = object : Metric { - override suspend fun customSpan(name: String, block: suspend Metric.() -> A): A = - block() + override suspend fun customSpan( + name: String, + parameters: Map, + block: suspend Metric.() -> A + ): A = block() override suspend fun promptSpan(prompt: Prompt, block: suspend Metric.() -> A): A = block() - override suspend fun assistantCreateRun(runObject: RunObject) {} - - override suspend fun assistantCreateRun( - runId: String, - block: suspend Metric.() -> RunObject - ): RunObject = block() + override suspend fun assistantCreateRun(runObject: RunObject, source: String) {} - override suspend fun assistantCreateRunStep(runObject: RunStepObject) {} + override suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String) {} override suspend fun assistantCreatedMessage( - runId: String, - block: suspend Metric.() -> List - ): List = block() - - override suspend fun assistantCreateRunStep( - runId: String, - block: suspend Metric.() -> RunStepObject - ): RunStepObject = block() - - override suspend fun assistantToolOutputsRun( - runId: String, - block: suspend Metric.() -> RunObject - ): RunObject = block() + messageObject: MessageObject, + source: String + ) {} override suspend fun event(message: String) {} diff --git a/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt b/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt index dbeed2ef6..04c13c081 100644 --- a/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt +++ b/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt @@ -5,7 +5,6 @@ import com.xebia.functional.xef.llm.assistants.Assistant import com.xebia.functional.xef.llm.assistants.AssistantThread import com.xebia.functional.xef.llm.assistants.RunDelta import com.xebia.functional.xef.llm.assistants.Tool -import com.xebia.functional.xef.metrics.Metric import kotlinx.serialization.Serializable @Serializable data class SumInput(val left: Int, val right: Int) @@ -42,12 +41,12 @@ suspend fun main() { // - # cd server/docker/opentelemetry // - # docker-compose up - val metric = Metric.EMPTY + val metric = com.xebia.functional.xef.metrics.Metric.EMPTY // val metric = com.xebia.functional.xef.opentelemetry.OpenTelemetryMetric() val assistant = Assistant( - assistantId = "asst_BwQvmWIbGUMDvCuXOtAFH8B6", + assistantId = "asst_UxczzpJkysC0l424ood87DAk", toolsConfig = listOf(Tool.toolOf(SumTool())) ) val thread = AssistantThread(api = OpenAI(logRequests = false).assistants, metric = metric) diff --git a/examples/src/main/kotlin/com/xebia/functional/xef/conversation/conversations/Animal.kt b/examples/src/main/kotlin/com/xebia/functional/xef/conversation/conversations/Animal.kt index 330450f7b..41a7534d1 100644 --- a/examples/src/main/kotlin/com/xebia/functional/xef/conversation/conversations/Animal.kt +++ b/examples/src/main/kotlin/com/xebia/functional/xef/conversation/conversations/Animal.kt @@ -31,7 +31,7 @@ suspend fun main() { // metric = com.xebia.functional.xef.opentelemetry.OpenTelemetryMetric(), store = LocalVectorStore(openAI.embeddings), ) { - metric.customSpan("Animal Example") { + metric.customSpan("Animal Example", emptyMap()) { val configNoneFromConversation = PromptConfiguration { messagePolicy { addMessagesFromConversation = MessagesFromHistory.NONE } } diff --git a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt index 787e0a871..616c17a75 100644 --- a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt +++ b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt @@ -8,149 +8,107 @@ import io.opentelemetry.context.Context class OpenTelemetryAssistantState(private val tracer: Tracer) { - private val runIds: MutableMap = mutableMapOf() + private val runStartedSource = "RunCreated" - fun runSpan(runObject: RunObject) { + private val runFinishedSources = + setOf("RunCompleted", "RunCancelled", "RunFailed", "RunIncomplete", "RunExpired") - val parentOrRoot: Context = runObject.id.getOrCreateContext() + private val runStepStartedSource = "RunStepCreated" - val currentSpan = - tracer - .spanBuilder(runObject.status.name) - .setParent(parentOrRoot) - .setSpanKind(SpanKind.CLIENT) - .startSpan() + private val runStepFinishedSources = + setOf("RunStepCompleted", "RunStepCancelled", "RunStepFailed", "RunStepExpired") - try { - currentSpan.makeCurrent().use { runObject.setParameters(currentSpan) } - } finally { - currentSpan.end() - } - } + private val messageStartedSource = "MessageCreated" - suspend fun runSpan(runId: String, block: suspend () -> RunObject): RunObject { + private val messageFinishedSources = setOf("MessageCompleted", "MessageIncomplete") - val parentOrRoot: Context = runId.getOrCreateContext() - - val currentSpan = - tracer - .spanBuilder("New Run: $runId") - .setParent(parentOrRoot) - .setSpanKind(SpanKind.CLIENT) - .startSpan() - - return try { - val output = block() - currentSpan.makeCurrent().use { - currentSpan.updateName(output.status.name) - output.setParameters(currentSpan) - } - output - } finally { - currentSpan.end() - } - } + private val runIds: MutableMap = mutableMapOf() - suspend fun toolOutputRunSpan(runId: String, block: suspend () -> RunObject): RunObject { + private val runSpans: MutableMap = mutableMapOf() - val parentOrRoot: Context = runId.getOrCreateContext() + private val runStepsSpans: MutableMap = mutableMapOf() - val currentSpan = - tracer - .spanBuilder("New ToolOutput: $runId") - .setParent(parentOrRoot) - .setSpanKind(SpanKind.CLIENT) - .startSpan() - - return try { - val output = block() - currentSpan.makeCurrent().use { - currentSpan.updateName("ToolOutput: ${output.status.name}") - output.setParameters(currentSpan) - } - output - } finally { - currentSpan.end() - } - } + private val messagesSpans: MutableMap = mutableMapOf() - fun runStepSpan(runObject: RunStepObject) { + fun runSpan(runObject: RunObject, source: String) { - val parentOrRoot: Context = runObject.runId.getOrCreateContext() + val parentOrRoot: Context = runObject.id.getOrCreateContext() val currentSpan = - tracer - .spanBuilder("step ${runObject.status.name} ${runObject.id}") - .setParent(parentOrRoot) - .setSpanKind(SpanKind.CLIENT) - .startSpan() + tracer.spanBuilder(source).setParent(parentOrRoot).setSpanKind(SpanKind.CLIENT).startSpan() + + if (source == runStartedSource) { + runSpans[runObject.id] = currentSpan + } try { currentSpan.makeCurrent().use { runObject.setParameters(currentSpan) } } finally { - currentSpan.end() + if (runFinishedSources.contains(source) && runSpans[runObject.id] != null) { + runSpans[runObject.id]!!.let { + runObject.setParameters(it) + it.updateName("RunCreated -> $source ${runObject.id}") + it.end() + runSpans.remove(runObject.id) + } + } else if (source != runStartedSource) { + currentSpan.end() + } } } - suspend fun runStepSpan(runId: String, block: suspend () -> RunStepObject): RunStepObject { + fun runStepSpan(runStepObject: RunStepObject, source: String) { - val parentOrRoot: Context = runId.getOrCreateContext() + val parentOrRoot: Context = runStepObject.runId.getOrCreateContext() val currentSpan = - tracer - .spanBuilder("New RunStep: $runId") - .setParent(parentOrRoot) - .setSpanKind(SpanKind.CLIENT) - .startSpan() - - return try { - val output = block() - currentSpan.makeCurrent().use { - when (val detail = output.stepDetails) { - is RunStepObjectStepDetails.CaseRunStepDetailsMessageCreationObject -> - currentSpan.updateName("Creating message: ${output.status.name}") - is RunStepObjectStepDetails.CaseRunStepDetailsToolCallsObject -> - currentSpan.updateName( - "Tools: ${detail.value.toolCalls.joinToString { - when (it) { - is RunStepDetailsToolCallsObjectToolCallsInner.CaseRunStepDetailsToolCallsCodeObject -> it.value.type.name - is RunStepDetailsToolCallsObjectToolCallsInner.CaseRunStepDetailsToolCallsFunctionObject -> it.value.function.name ?: "" - is RunStepDetailsToolCallsObjectToolCallsInner.CaseRunStepDetailsToolCallsFileSearchObject -> it.value.type.name - } - }}: ${output.status.name}" - ) + tracer.spanBuilder(source).setParent(parentOrRoot).setSpanKind(SpanKind.CLIENT).startSpan() + + if (source == runStepStartedSource) { + runStepsSpans[runStepObject.id] = currentSpan + } + + try { + currentSpan.makeCurrent().use { runStepObject.setParameters(currentSpan, source) } + } finally { + if (runStepFinishedSources.contains(source) && runStepsSpans[runStepObject.id] != null) { + runStepsSpans[runStepObject.id]!!.let { + runStepObject.setParameters(it, source) + it.updateName("RunStepCreated -> $source ${runStepObject.id}") + it.end() + runStepsSpans.remove(runStepObject.id) } - output.setParameters(currentSpan) + } else if (source != runStepStartedSource) { + currentSpan.end() } - output - } finally { - currentSpan.end() } } - suspend fun createdMessagesSpan( - runId: String, - block: suspend () -> List - ): List { + fun createdMessagesSpan(messageObject: MessageObject, source: String) { + val runId = messageObject.runId ?: return val parentOrRoot: Context = runId.getOrCreateContext() val currentSpan = - tracer - .spanBuilder("New Run: $runId") - .setParent(parentOrRoot) - .setSpanKind(SpanKind.CLIENT) - .startSpan() - - return try { - val output = block() - currentSpan.makeCurrent().use { - currentSpan.updateName("Messages: ${output.size}") - output.setParameters(currentSpan) - } - output + tracer.spanBuilder(source).setParent(parentOrRoot).setSpanKind(SpanKind.CLIENT).startSpan() + + if (source == messageStartedSource) { + messagesSpans[messageObject.id] = currentSpan + } + + try { + currentSpan.makeCurrent().use { messageObject.setParameters(currentSpan, source) } } finally { - currentSpan.end() + if (messageFinishedSources.contains(source) && messagesSpans[messageObject.id] != null) { + messagesSpans[messageObject.id]!!.let { + messageObject.setParameters(it, source) + it.updateName("MessageCreated -> $source ${messageObject.id}") + it.end() + messagesSpans.remove(messageObject.id) + } + } else if (source != messageStartedSource) { + currentSpan.end() + } } } @@ -179,7 +137,8 @@ class OpenTelemetryAssistantState(private val tracer: Tracer) { } } - private fun RunStepObject.setParameters(span: Span) { + private fun RunStepObject.setParameters(span: Span, source: String) { + span.setAttribute("openai.assistant.source", source) span.setAttribute("openai.assistant.type", type.name) span.setAttribute("openai.assistant.thread.id", threadId) span.setAttribute("openai.assistant.assistant.id", assistantId) @@ -224,30 +183,24 @@ class OpenTelemetryAssistantState(private val tracer: Tracer) { } } - private fun List.setParameters(span: Span) { - span.setAttribute("openai.assistant.messages.count", size.toString()) - forEach { - span.setAttribute("openai.assistant.messages.${indexOf(it)}.role", it.role.name) - when (val inner = it.content.firstOrNull()) { - is MessageObjectContentInner.CaseMessageContentImageFileObject -> { - span.setAttribute( - "openai.assistant.messages.${indexOf(it)}.content", - inner.value.imageFile.fileId - ) - } - is MessageObjectContentInner.CaseMessageContentTextObject -> { - span.setAttribute( - "openai.assistant.messages.${indexOf(it)}.content", - inner.value.text.value - ) - } - is MessageObjectContentInner.CaseMessageContentImageUrlObject -> - span.setAttribute( - "openai.assistant.messages.${indexOf(it)}.content", - inner.value.imageUrl.url - ) - null -> {} + private fun MessageObject.setParameters(span: Span, source: String) { + span.setAttribute("openai.assistant.message.source", source) + span.setAttribute("openai.assistant.message.role", role.name) + span.setAttribute("openai.assistant.message.thread.id", threadId) + assistantId?.let { span.setAttribute("openai.assistant.message.assistant.id", it) } + runId?.let { span.setAttribute("openai.assistant.message.run.id", it) } + span.setAttribute("openai.assistant.message.id", id) + status?.let { span.setAttribute("openai.assistant.message.status", it.name) } + when (val inner = content.firstOrNull()) { + is MessageObjectContentInner.CaseMessageContentImageFileObject -> { + span.setAttribute("openai.assistant.message.content", inner.value.imageFile.fileId) + } + is MessageObjectContentInner.CaseMessageContentTextObject -> { + span.setAttribute("openai.assistant.message.content", inner.value.text.value) } + is MessageObjectContentInner.CaseMessageContentImageUrlObject -> + span.setAttribute("openai.assistant.message.content", inner.value.imageUrl.url) + null -> {} } } } diff --git a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt index 3ee6c4728..34f859818 100644 --- a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt +++ b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt @@ -18,11 +18,24 @@ class OpenTelemetryMetric( private val assistantState = OpenTelemetryAssistantState(getTracer()) - override suspend fun customSpan(name: String, block: suspend Metric.() -> A): A = - state.span(name) { block() } + override suspend fun customSpan( + name: String, + parameters: Map, + block: suspend Metric.() -> A + ): A = state.span(name, parameters) { block() } override suspend fun promptSpan(prompt: Prompt, block: suspend Metric.() -> A): A = - state.span("Prompt: ${prompt.messages.lastOrNull()?.contentAsString() ?: "empty"}") { block() } + state.span( + "Prompt: ${prompt.messages.lastOrNull()?.contentAsString() ?: "empty"}", + mapOf( + "prompt" to (prompt.messages.lastOrNull()?.contentAsString() ?: "empty"), + "functions" to (prompt.functions.joinToString { it.name }), + "configuration" to prompt.configuration.toString(), + "model" to prompt.model.toString() + ) + ) { + block() + } override suspend fun event(message: String) { state.event(message) @@ -36,30 +49,15 @@ class OpenTelemetryMetric( state.setAttribute(key, values) } - override suspend fun assistantCreateRun(runObject: RunObject) = assistantState.runSpan(runObject) + override suspend fun assistantCreateRun(runObject: RunObject, source: String) = + assistantState.runSpan(runObject, source) - override suspend fun assistantCreateRun( - runId: String, - block: suspend Metric.() -> RunObject - ): RunObject = assistantState.runSpan(runId) { block() } + override suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String) = + assistantState.runStepSpan(runObject, source) - override suspend fun assistantCreateRunStep(runObject: RunStepObject) = - assistantState.runStepSpan(runObject) - - override suspend fun assistantCreatedMessage( - runId: String, - block: suspend Metric.() -> List - ): List = assistantState.createdMessagesSpan(runId) { block() } - - override suspend fun assistantCreateRunStep( - runId: String, - block: suspend Metric.() -> RunStepObject - ): RunStepObject = assistantState.runStepSpan(runId) { block() } - - override suspend fun assistantToolOutputsRun( - runId: String, - block: suspend Metric.() -> RunObject - ): RunObject = assistantState.toolOutputRunSpan(runId) { block() } + override suspend fun assistantCreatedMessage(messageObject: MessageObject, source: String) { + assistantState.createdMessagesSpan(messageObject, source) + } private fun getTracer(scopeName: String? = null): Tracer = openTelemetry.getTracer(scopeName ?: config.defaultScopeName) diff --git a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryState.kt b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryState.kt index c8a764a01..d3c81035f 100644 --- a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryState.kt +++ b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryState.kt @@ -11,7 +11,11 @@ import kotlinx.coroutines.withContext class OpenTelemetryState(private val tracer: Tracer) { - suspend fun span(name: String, block: suspend (Span) -> A): A { + suspend fun span( + name: String, + parameters: Map, + block: suspend (Span) -> A + ): A { val parentOrRoot = currentCoroutineContext().getOpenTelemetryContext() val currentSpan = @@ -19,7 +23,10 @@ class OpenTelemetryState(private val tracer: Tracer) { return try { withContext(currentSpan.asContextElement()) { - currentSpan.makeCurrent().use { block(currentSpan) } + currentSpan.makeCurrent().use { + parameters.map { (key, value) -> currentSpan.setAttribute(key, value) } + block(currentSpan) + } } } finally { currentSpan.end()