Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Merge pull request #36 from signalfx/context-propagation
Browse files Browse the repository at this point in the history
Add context propagation
  • Loading branch information
owais authored Sep 8, 2020
2 parents 583af57 + 4664a27 commit 3e5df21
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 53 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
*.pyc
.idea
*.iml
.vscode
venv
dist
build
*.egg-info
*.egg-info
18 changes: 16 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ The decorators can be used individually or together.
import signalfx_lambda
@signalfx_lambda.emits_metrics
@signalfx_lambda.emits_metrics()
def handler(event, context):
# your code
Expand All @@ -143,10 +143,24 @@ The decorators can be used individually or together.
import signalfx_lambda
@signalfx_lambda.is_traced
@signalfx_lambda.is_traced()
def handler(event, context):
# your code
3. Optionally, you can tell the wrapper to not auto-create a span but still initialize tracing for manual usage.

This is useful when processing SQS messages and you want each message to tie to the trace from producer that emitted the message.

.. code:: python
import signalfx_lambda
@signalfx_lambda.is_traced(with_span=False)
def handler(event, context):
for record in event.get('Records', []):
with signalfx_lambda.create_span(record, context):
# your code to process record
Step 5: Send custom metrics from a Lambda function
-------------------------------------------------------
Expand Down
9 changes: 4 additions & 5 deletions signalfx_lambda/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@
from .version import name, version


# backwards compatibility
def wrapper(*args, **kwargs):
return metrics.wrapper(*args, **kwargs)


def emits_metrics(*args, **kwargs):
return metrics.wrapper(*args, **kwargs)

Expand All @@ -20,6 +15,10 @@ def is_traced(*args, **kwargs):
return tracing.wrapper(*args, **kwargs)


# backwards compatibility
wrapper = emits_metrics


# less convenient method
def send_metric(counters=[], gauges=[]):
metrics.send_metric(counters, gauges)
Expand Down
42 changes: 22 additions & 20 deletions signalfx_lambda/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,23 +108,25 @@ def call(*args, **kwargs):
return wrapper_decorator


def wrapper(*args, **kwargs):
access_token = utils.get_access_token()
if len(args) == 1 and callable(args[0]):
# plain wrapper with no parameter
# call the wrapper decorator like normally would
decorator = generate_wrapper_decorator(access_token)
return decorator(args[0])
else:
dimensions = kwargs.get('dimensions')
if isinstance(dimensions, dict):
# wrapper with dimension parameter
# assign default dimensions
# then return the wrapper decorator
default_dimensions.update(dimensions)

token = kwargs.get('access_token')
if isinstance(token, six.string_types):
access_token = token

return generate_wrapper_decorator(access_token)
def wrapper():
def inner(*args, **kwargs):
access_token = utils.get_access_token()
if len(args) == 1 and callable(args[0]):
# plain wrapper with no parameter
# call the wrapper decorator like normally would
decorator = generate_wrapper_decorator(access_token)
return decorator(args[0])
else:
dimensions = kwargs.get('dimensions')
if isinstance(dimensions, dict):
# wrapper with dimension parameter
# assign default dimensions
# then return the wrapper decorator
default_dimensions.update(dimensions)

token = kwargs.get('access_token')
if isinstance(token, six.string_types):
access_token = token

return generate_wrapper_decorator(access_token)
return inner
99 changes: 75 additions & 24 deletions signalfx_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,31 @@

from . import utils

def wrapper(func):
@functools.wraps(func)
def call(*args, **kwargs):
context = args[1]
_tracer = None

tracer = init_jaeger_tracer(context)
span_kind_mapping = {
'aws:sqs': ext_tags.SPAN_KIND_CONSUMER,
}

span_tags = utils.get_tracing_fields(context)
span_tags['component'] = 'python-lambda-wrapper'
span_tags[ext_tags.SPAN_KIND] = ext_tags.SPAN_KIND_RPC_SERVER
def wrapper(with_span=True):
def inner(func):
@functools.wraps(func)
def call(event, context):
tracer = init_jaeger_tracer(context)
try:
if with_span:
with create_span(event, context):
# call the original handler
return func(event, context)
else:
return func(event, context)
except BaseException as e:
raise
finally:
tracer.close()

span_prefix = os.getenv('SIGNALFX_SPAN_PREFIX', 'lambda_python_')

try:
with tracer.start_active_span(span_prefix + context.function_name, tags=span_tags) as scope:
# call the original handler
return func(*args, **kwargs)
except BaseException as e:
scope.span.set_tag('error', True)
scope.span.log_kv({'message': e})

raise
finally:
tracer.close()

return call
return call
return inner


def init_jaeger_tracer(context):
Expand All @@ -56,6 +55,58 @@ def init_jaeger_tracer(context):
config = Config(config=tracer_config, service_name=service_name)

tracer = config.new_tracer()
opentracing.tracer = tracer
global _tracer
_tracer = opentracing.tracer = tracer

return tracer


class create_span(object):
def __init__(self, event, context, auto_add_tags=True, operation_name=None):
if not _tracer:
raise RuntimeError((
'tracing has not been initialized. Use signalfx_lambda.is_tracer'
' decorator to initialize tracing'))
self.event = event
self.context = context
self.auto_add_tags = auto_add_tags
self.operation_name = operation_name
self.tracer = _tracer
self.scope = None

def __enter__(self):
headers = self.event.get('headers', self.event.get('attributes', {}))
parent_span = self.tracer.extract(opentracing.Format.HTTP_HEADERS, headers)

span_tags = {}
if self.auto_add_tags:
span_tags = utils.get_tracing_fields(self.context)
span_tags['component'] = 'python-lambda-wrapper'
span_tags[ext_tags.SPAN_KIND] = span_kind_mapping.get(
self.event.get('eventSource'),
ext_tags.SPAN_KIND_RPC_SERVER
)

op_name = self.operation_name
if not op_name:
span_prefix = os.getenv('SIGNALFX_SPAN_PREFIX', 'lambda_python_')
op_name = span_prefix + self.context.function_name

self.scope = self.tracer.start_active_span(
op_name,
tags=span_tags,
child_of=parent_span
)
return self.scope

def __exit__(self, exc_type, exc_val, exc_tb):
if not self.scope:
return

if exc_val:
span = self.scope.span
span.set_tag(ext_tags.ERROR, True)
span.set_tag("sfx.error.message", str(exc_val))
span.set_tag("sfx.error.object", str(exc_val.__class__))
span.set_tag("sfx.error.kind", exc_val.__class__.__name__)
self.scope.close()
2 changes: 1 addition & 1 deletion signalfx_lambda/version.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright (C) 2017 SignalFx, Inc. All rights reserved.

name = 'signalfx_lambda'
version = '0.2.1'
version = '1.0.0beta1'

user_agent = 'signalfx_lambda/' + version

0 comments on commit 3e5df21

Please sign in to comment.