From 4664a278468d60801ff3cbd1aeba5ba8435ef584 Mon Sep 17 00:00:00 2001 From: Owais Lone Date: Mon, 7 Sep 2020 03:43:12 +0530 Subject: [PATCH] Add context propagation New features and breaking changes: 1. HTTP context propagation The wrapper now automatically tries to extract B3 tracing headers from incoming HTTP requests and uses the extracted span context as the parent span when creating new spans. 2. Allows users to initialize the tracing without auto-creating the span The decorator now must be called and accepts an optional argument `with_span=` which defaults to `True`. When set to `False`, the wrapper still initializes and flushes the tracer but does not auto create spans. 3. Removed signalfx_lambda.wrapper Since we are breaking backward compatibility here and releaing a breaking version according to semver, we also removed the depracated wrapper decorator. --- .gitignore | 4 +- README.rst | 18 ++++++- signalfx_lambda/__init__.py | 9 ++-- signalfx_lambda/metrics.py | 42 ++++++++-------- signalfx_lambda/tracing.py | 99 ++++++++++++++++++++++++++++--------- signalfx_lambda/version.py | 2 +- 6 files changed, 121 insertions(+), 53 deletions(-) diff --git a/.gitignore b/.gitignore index 21504c1..09472a1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ *.pyc .idea *.iml +.vscode +venv dist build -*.egg-info \ No newline at end of file +*.egg-info diff --git a/README.rst b/README.rst index 5c9a530..45e707d 100644 --- a/README.rst +++ b/README.rst @@ -133,7 +133,7 @@ The decorators can be used individually or together. import signalfx_lambda - @signalfx_lambda.emits_metrics + @signalfx_lambda.emits_metrics() def handler(event, context): # your code @@ -143,10 +143,24 @@ The decorators can be used individually or together. import signalfx_lambda - @signalfx_lambda.is_traced + @signalfx_lambda.is_traced() def handler(event, context): # your code +3. Optionally, you can tell the wrapper to not auto-create a span but still initialize tracing for manual usage. + +This is useful when processing SQS messages and you want each message to tie to the trace from producer that emitted the message. + +.. code:: python + + import signalfx_lambda + + @signalfx_lambda.is_traced(with_span=False) + def handler(event, context): + for record in event.get('Records', []): + with signalfx_lambda.create_span(record, context): + # your code to process record + Step 5: Send custom metrics from a Lambda function ------------------------------------------------------- diff --git a/signalfx_lambda/__init__.py b/signalfx_lambda/__init__.py index fe5b3d1..8b6a7ec 100644 --- a/signalfx_lambda/__init__.py +++ b/signalfx_lambda/__init__.py @@ -7,11 +7,6 @@ from .version import name, version -# backwards compatibility -def wrapper(*args, **kwargs): - return metrics.wrapper(*args, **kwargs) - - def emits_metrics(*args, **kwargs): return metrics.wrapper(*args, **kwargs) @@ -20,6 +15,10 @@ def is_traced(*args, **kwargs): return tracing.wrapper(*args, **kwargs) +# backwards compatibility +wrapper = emits_metrics + + # less convenient method def send_metric(counters=[], gauges=[]): metrics.send_metric(counters, gauges) diff --git a/signalfx_lambda/metrics.py b/signalfx_lambda/metrics.py index cc050d8..1bb2d72 100644 --- a/signalfx_lambda/metrics.py +++ b/signalfx_lambda/metrics.py @@ -108,23 +108,25 @@ def call(*args, **kwargs): return wrapper_decorator -def wrapper(*args, **kwargs): - access_token = utils.get_access_token() - if len(args) == 1 and callable(args[0]): - # plain wrapper with no parameter - # call the wrapper decorator like normally would - decorator = generate_wrapper_decorator(access_token) - return decorator(args[0]) - else: - dimensions = kwargs.get('dimensions') - if isinstance(dimensions, dict): - # wrapper with dimension parameter - # assign default dimensions - # then return the wrapper decorator - default_dimensions.update(dimensions) - - token = kwargs.get('access_token') - if isinstance(token, six.string_types): - access_token = token - - return generate_wrapper_decorator(access_token) +def wrapper(): + def inner(*args, **kwargs): + access_token = utils.get_access_token() + if len(args) == 1 and callable(args[0]): + # plain wrapper with no parameter + # call the wrapper decorator like normally would + decorator = generate_wrapper_decorator(access_token) + return decorator(args[0]) + else: + dimensions = kwargs.get('dimensions') + if isinstance(dimensions, dict): + # wrapper with dimension parameter + # assign default dimensions + # then return the wrapper decorator + default_dimensions.update(dimensions) + + token = kwargs.get('access_token') + if isinstance(token, six.string_types): + access_token = token + + return generate_wrapper_decorator(access_token) + return inner diff --git a/signalfx_lambda/tracing.py b/signalfx_lambda/tracing.py index 8e2444a..ea07ece 100644 --- a/signalfx_lambda/tracing.py +++ b/signalfx_lambda/tracing.py @@ -6,32 +6,31 @@ from . import utils -def wrapper(func): - @functools.wraps(func) - def call(*args, **kwargs): - context = args[1] +_tracer = None - tracer = init_jaeger_tracer(context) +span_kind_mapping = { + 'aws:sqs': ext_tags.SPAN_KIND_CONSUMER, +} - span_tags = utils.get_tracing_fields(context) - span_tags['component'] = 'python-lambda-wrapper' - span_tags[ext_tags.SPAN_KIND] = ext_tags.SPAN_KIND_RPC_SERVER +def wrapper(with_span=True): + def inner(func): + @functools.wraps(func) + def call(event, context): + tracer = init_jaeger_tracer(context) + try: + if with_span: + with create_span(event, context): + # call the original handler + return func(event, context) + else: + return func(event, context) + except BaseException as e: + raise + finally: + tracer.close() - span_prefix = os.getenv('SIGNALFX_SPAN_PREFIX', 'lambda_python_') - - try: - with tracer.start_active_span(span_prefix + context.function_name, tags=span_tags) as scope: - # call the original handler - return func(*args, **kwargs) - except BaseException as e: - scope.span.set_tag('error', True) - scope.span.log_kv({'message': e}) - - raise - finally: - tracer.close() - - return call + return call + return inner def init_jaeger_tracer(context): @@ -56,6 +55,58 @@ def init_jaeger_tracer(context): config = Config(config=tracer_config, service_name=service_name) tracer = config.new_tracer() - opentracing.tracer = tracer + global _tracer + _tracer = opentracing.tracer = tracer return tracer + + +class create_span(object): + def __init__(self, event, context, auto_add_tags=True, operation_name=None): + if not _tracer: + raise RuntimeError(( + 'tracing has not been initialized. Use signalfx_lambda.is_tracer' + ' decorator to initialize tracing')) + self.event = event + self.context = context + self.auto_add_tags = auto_add_tags + self.operation_name = operation_name + self.tracer = _tracer + self.scope = None + + def __enter__(self): + headers = self.event.get('headers', self.event.get('attributes', {})) + parent_span = self.tracer.extract(opentracing.Format.HTTP_HEADERS, headers) + + span_tags = {} + if self.auto_add_tags: + span_tags = utils.get_tracing_fields(self.context) + span_tags['component'] = 'python-lambda-wrapper' + span_tags[ext_tags.SPAN_KIND] = span_kind_mapping.get( + self.event.get('eventSource'), + ext_tags.SPAN_KIND_RPC_SERVER + ) + + op_name = self.operation_name + if not op_name: + span_prefix = os.getenv('SIGNALFX_SPAN_PREFIX', 'lambda_python_') + op_name = span_prefix + self.context.function_name + + self.scope = self.tracer.start_active_span( + op_name, + tags=span_tags, + child_of=parent_span + ) + return self.scope + + def __exit__(self, exc_type, exc_val, exc_tb): + if not self.scope: + return + + if exc_val: + span = self.scope.span + span.set_tag(ext_tags.ERROR, True) + span.set_tag("sfx.error.message", str(exc_val)) + span.set_tag("sfx.error.object", str(exc_val.__class__)) + span.set_tag("sfx.error.kind", exc_val.__class__.__name__) + self.scope.close() diff --git a/signalfx_lambda/version.py b/signalfx_lambda/version.py index 0c1088d..183ecc5 100644 --- a/signalfx_lambda/version.py +++ b/signalfx_lambda/version.py @@ -1,6 +1,6 @@ # Copyright (C) 2017 SignalFx, Inc. All rights reserved. name = 'signalfx_lambda' -version = '0.2.1' +version = '1.0.0beta1' user_agent = 'signalfx_lambda/' + version