Skip to content

Commit

Permalink
Add Optional OpenTelemetry Instrumentation
Browse files Browse the repository at this point in the history
So we get spans around `once` mostly.
  • Loading branch information
chrisguitarguy committed Jul 15, 2024
1 parent 0caa2a1 commit 3c6474e
Show file tree
Hide file tree
Showing 6 changed files with 529 additions and 3 deletions.
26 changes: 23 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,22 @@
},
"require-dev": {
"phpunit/phpunit": "^9.5",
"symfony/phpunit-bridge": "^5.4"
"symfony/phpunit-bridge": "^5.4",
"open-telemetry/api": "^1.0",
"open-telemetry/context": "^1.0",
"open-telemetry/sem-conv": "^1.25",
"open-telemetry/sdk": "^1.0",
"symfony/http-client": "^7.1",
"nyholm/psr7": "^1.8"
},
"suggest": {
"pmg/queue-pheanstalk": "Power pmg/queue with Beanstalkd"
"pmg/queue-pheanstalk": "Power pmg/queue with Beanstalkd",
"open-telemetry/api": "enables open telemetry auto instrumentation",
"open-telemetry/context": "enables open telemetry auto instrumentation",
"open-telemetry/sem-conv": "enables open telemetry auto instrumentation"
},
"conflict": {
"open-telemetry/sem-conv": "<1.25"
},
"autoload": {
"psr-4": {
Expand All @@ -29,13 +41,21 @@
"test/unit/",
"test/integration/"
]
}
},
"files": [
"src/Otel/_register.php"
]
},
"extra": {
"branch-alias": {
"dev-master": "4.0-dev",
"dev-version-3": "3.0-dev",
"dev-version-2": "2.0-dev"
}
},
"config": {
"allow-plugins": {
"php-http/discovery": true
}
}
}
188 changes: 188 additions & 0 deletions src/Otel/PmgQueueInstrumentation.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
<?php declare(strict_types=1);

/**
* This file is part of PMG\Queue
*
* Copyright (c) PMG <https://www.pmg.com>
*
* For full copyright information see the LICENSE file distributed
* with this source code.
*
* @license http://opensource.org/licenses/Apache-2.0 Apache-2.0
*/

namespace PMG\Queue\Otel;

use function OpenTelemetry\Instrumentation\hook;
use OpenTelemetry\API\Instrumentation\CachedInstrumentation;
use OpenTelemetry\API\Trace\Span;
use OpenTelemetry\API\Trace\SpanBuilderInterface;
use OpenTelemetry\API\Trace\SpanKind;
use OpenTelemetry\API\Trace\StatusCode;
use OpenTelemetry\Context\Context;
use OpenTelemetry\SemConv\TraceAttributes;
use PMG\Queue\Consumer;
use PMG\Queue\Driver;
use PMG\Queue\Envelope;

final class PmgQueueInstrumentation
{
public const NAME = 'pmg-queue';
public const INSTRUMENTATION_NAME = 'com.pmg.opentelemetry.'.self::NAME;

// these two are in semconv, but have not yet maded it to the PHP SDK
// type is generic and defined in semconv where name is system specific
public const OPERATION_TYPE = 'messaging.operation.type';
public const OPERATION_NAME = 'messaging.operation.name';

public static bool $registered = false;

public static function register(): bool
{
if (self::$registered) {
return false;
}

if (!extension_loaded('opentelemetry')) {
return false;
}

self::$registered = true;

$instrumentation = new CachedInstrumentation(self::INSTRUMENTATION_NAME);

hook(
Consumer::class,
'once',
pre: static function (
Consumer $consumer,
array $params,
string $class,
string $function,
?string $filename,
?int $lineno,
) use ($instrumentation): array {
$queueName = $params[0];
assert(is_string($queueName));

$builder = $instrumentation
->tracer()
->spanBuilder($queueName.' receive')
->setSpanKind(SpanKind::KIND_CONSUMER)
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)
->setAttribute(TraceAttributes::MESSAGING_DESTINATION_NAME, $queueName)
->setAttribute(self::OPERATION_TYPE, 'receive') // generic
->setAttribute(self::OPERATION_NAME, 'once') // system specific
;

$parent = Context::getCurrent();
$span = $builder
->setParent($parent)
->startSpan();

$context = $span->storeInContext($parent);
Context::storage()->attach($context);

return $params;
},
post: static function (
Consumer $consumer,
array $params,
mixed $result,
?\Throwable $exception
): void {
$scope = Context::storage()->scope();
if (null === $scope) {
return;
}

$queueName = $params[0];
assert(is_string($queueName));

$scope->detach();
$span = Span::fromContext($scope->context());

if (null !== $exception) {
$span->recordException($exception, [
TraceAttributes::EXCEPTION_ESCAPED => true,
]);
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
} elseif ($result === false) {
$span->setStatus(StatusCode::STATUS_ERROR, 'Message was not handled successfully');
}

$span->end();
}
);

hook(
Driver::class,
'enqueue',
pre: static function (
Driver $bus,
array $params,
string $class,
string $function,
?string $filename,
?int $lineno,
) use ($instrumentation): array {
$queueName = $params[0];
assert(is_string($queueName));

$message = $params[1];
assert(is_object($message));

$builder = $instrumentation
->tracer()
->spanBuilder($queueName.' publish')
->setSpanKind(SpanKind::KIND_PRODUCER)
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)
->setAttribute(TraceAttributes::MESSAGING_DESTINATION_NAME, $queueName)
->setAttribute(self::OPERATION_TYPE, 'publish')
->setAttribute(self::OPERATION_NAME, 'enqueue')
;

$parent = Context::getCurrent();
$span = $builder
->setParent($parent)
->startSpan();

$context = $span->storeInContext($parent);
Context::storage()->attach($context);

return $params;
},
post: static function (
Driver $driver,
array $params,
?Envelope $envelope,
?\Throwable $exception
): void {
$scope = Context::storage()->scope();
if (null === $scope) {
return;
}

$scope->detach();
$span = Span::fromContext($scope->context());

if (null !== $exception) {
$span->recordException($exception, [
TraceAttributes::EXCEPTION_ESCAPED => true,
]);
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
}

$span->end();
}
);

return self::$registered;
}
}
36 changes: 36 additions & 0 deletions src/Otel/_register.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php declare(strict_types=1);

/**
* This file is part of PMG\Queue
*
* Copyright (c) PMG <https://www.pmg.com>
*
* For full copyright information see the LICENSE file distributed
* with this source code.
*
* @license http://opensource.org/licenses/Apache-2.0 Apache-2.0
*/

use PMG\Queue\Otel\PmgQueueInstrumentation;
use OpenTelemetry\API\Trace\Span;
use OpenTelemetry\Context\Context;
use OpenTelemetry\SemConv\TraceAttributes;
use OpenTelemetry\SDK\Sdk;

// look for deps and if we have them all we'll load the instrumentation.
if (
!extension_loaded('opentelemetry') ||
!class_exists(Span::class) ||
!class_exists(Context::class) ||
!interface_exists(TraceAttributes::class)
) {
return;
}


// allow disabling instrumentation via the SDK's supported environment variables
if (class_exists(Sdk::class) && Sdk::isInstrumentationDisabled(PmgQueueInstrumentation::NAME)) {
return;
}

PmgQueueInstrumentation::register();
Loading

0 comments on commit 3c6474e

Please sign in to comment.