Skip to content

Commit

Permalink
Celery Instrumentation Cleanup (#1128)
Browse files Browse the repository at this point in the history
* Rename files for better conventions

* Move worker config to conftest

* Update existing tests

* Add all celery task running methods to tests

* Patch celery name extraction crash

* Add correct metrics for all tests

* Fix map and starmap validators

* Fully corrected celery tests

* Remove celery tracer instrumentation

* Fix issue with wrappers not being detected by celery

* Add additional logic to celery task naming for mapping tasks

* Patch distributed tracing for new instrumentation point

* Expand celery testing matrix to all supported minor versions

* Remove unused code, and reorganize

* Linting

* Add another comment

* Remove all python 2 testing requirements
  • Loading branch information
TimPansino authored Apr 18, 2024
1 parent 3b46fad commit 7b90b64
Show file tree
Hide file tree
Showing 9 changed files with 423 additions and 116 deletions.
15 changes: 0 additions & 15 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4362,26 +4362,11 @@ def _process_module_builtin_defaults():
"instrument_celery_worker",
)

_process_module_definition(
"celery.execute.trace",
"newrelic.hooks.application_celery",
"instrument_celery_execute_trace",
)
_process_module_definition(
"celery.task.trace",
"newrelic.hooks.application_celery",
"instrument_celery_execute_trace",
)
_process_module_definition(
"celery.app.base",
"newrelic.hooks.application_celery",
"instrument_celery_app_base",
)
_process_module_definition(
"celery.app.trace",
"newrelic.hooks.application_celery",
"instrument_celery_execute_trace",
)
_process_module_definition("billiard.pool", "newrelic.hooks.application_celery", "instrument_billiard_pool")

_process_module_definition("flup.server.cgi", "newrelic.hooks.adapter_flup", "instrument_flup_server_cgi")
Expand Down
110 changes: 49 additions & 61 deletions newrelic/hooks/application_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,43 +28,45 @@
from newrelic.api.message_trace import MessageTrace
from newrelic.api.pre_function import wrap_pre_function
from newrelic.api.transaction import current_transaction
from newrelic.common.object_names import callable_name
from newrelic.common.object_wrapper import FunctionWrapper, wrap_function_wrapper
from newrelic.core.agent import shutdown_agent

UNKNOWN_TASK_NAME = "<Unknown Task>"
MAPPING_TASK_NAMES = {"celery.starmap", "celery.map"}

def CeleryTaskWrapper(wrapped, application=None, name=None):
def wrapper(wrapped, instance, args, kwargs):
transaction = current_transaction(active_only=False)

if callable(name):
# Start Hotfix v2.2.1.
# if instance and inspect.ismethod(wrapped):
# _name = name(instance, *args, **kwargs)
# else:
# _name = name(*args, **kwargs)
def task_name(*args, **kwargs):
# Grab the current task, which can be located in either place
if args:
task = args[0]
elif "task" in kwargs:
task = kwargs["task"]
else:
return UNKNOWN_TASK_NAME # Failsafe

if instance is not None:
_name = name(instance, *args, **kwargs)
else:
_name = name(*args, **kwargs)
# End Hotfix v2.2.1.
# Task can be either a task instance or a signature, which subclasses dict, or an actual dict in some cases.
task_name = getattr(task, "name", None) or task.get("task", UNKNOWN_TASK_NAME)

elif name is None:
_name = callable_name(wrapped)
# Under mapping tasks, the root task name isn't descriptive enough so we append the
# subtask name to differentiate between different mapping tasks
if task_name in MAPPING_TASK_NAMES:
try:
subtask = kwargs["task"]["task"]
task_name = "/".join((task_name, subtask))
except Exception:
pass

else:
_name = name
return task_name

# Helper for obtaining the appropriate application object. If it
# has an activate() method, assume it is a valid application
# object. Don't check by type so we can easily mock it for
# testing if need be.

def _application():
if hasattr(application, "activate"):
return application
return application_instance(application)
def CeleryTaskWrapper(wrapped):
def wrapper(wrapped, instance, args, kwargs):
transaction = current_transaction(active_only=False)

if instance is not None:
_name = task_name(instance, *args, **kwargs)
else:
_name = task_name(*args, **kwargs)

# A Celery Task can be called either outside of a transaction, or
# within the context of an existing transaction. There are 3
Expand Down Expand Up @@ -95,13 +97,14 @@ def _application():
return wrapped(*args, **kwargs)

else:
with BackgroundTask(_application(), _name, "Celery", source=instance) as transaction:
with BackgroundTask(application_instance(), _name, "Celery", source=instance) as transaction:
# Attempt to grab distributed tracing headers
try:
# Headers on earlier versions of Celery may end up as attributes
# on the request context instead of as custom headers. Handle this
# by defaulting to using vars() if headers is not available
headers = getattr(wrapped.request, "headers", None) or vars(wrapped.request)
request = instance.request
headers = getattr(request, "headers", None) or vars(request)

settings = transaction.settings
if headers is not None and settings is not None:
Expand All @@ -128,20 +131,30 @@ def _application():
# instrumentation via FunctionWrapper() relies on __call__ being called which
# in turn executes the wrapper() function defined above. Since the micro
# optimization bypasses __call__ method it breaks our instrumentation of
# celery. To circumvent this problem, we added a run() attribute to our
# celery.
#
# For versions of celery 2.5.3 to 2.5.5+
# Celery has included a monkey-patching provision which did not perform this
# optimization on functions that were monkey-patched. Unfortunately, our
# wrappers are too transparent for celery to detect that they've even been
# monkey-patched. To circumvent this, we set the __module__ of our wrapped task
# to this file which causes celery to properly detect that it has been patched.
#
# For versions of celery 2.5.3 to 2.5.5
# To circumvent this problem, we added a run() attribute to our
# FunctionWrapper which points to our __call__ method. This causes Celery
# to execute our __call__ method which in turn applies the wrapper
# correctly before executing the task.
#
# This is only a problem in Celery versions 2.5.3 to 2.5.5. The later
# versions included a monkey-patching provision which did not perform this
# optimization on functions that were monkey-patched.

class TaskWrapper(FunctionWrapper):
def run(self, *args, **kwargs):
return self.__call__(*args, **kwargs)

return TaskWrapper(wrapped, wrapper)
wrapped_task = TaskWrapper(wrapped, wrapper)
# Reset __module__ to be less transparent so celery detects our monkey-patching
wrapped_task.__module__ = CeleryTaskWrapper.__module__

return wrapped_task


def instrument_celery_app_task(module):
Expand All @@ -162,11 +175,8 @@ def instrument_celery_app_task(module):
# the task doesn't pass through it. For Celery 2.5+ need to wrap
# the tracer instead.

def task_name(task, *args, **kwargs):
return task.name

if module.BaseTask.__module__ == module.__name__:
module.BaseTask.__call__ = CeleryTaskWrapper(module.BaseTask.__call__, name=task_name)
module.BaseTask.__call__ = CeleryTaskWrapper(module.BaseTask.__call__)


def wrap_Celery_send_task(wrapped, instance, args, kwargs):
Expand Down Expand Up @@ -195,28 +205,6 @@ def instrument_celery_app_base(module):
wrap_function_wrapper(module, "Celery.send_task", wrap_Celery_send_task)


def instrument_celery_execute_trace(module):
    """Wrap ``module.build_tracer`` so tasks are wrapped when a tracer is built.

    Registered in newrelic/config.py for 'celery.execute.trace',
    'celery.task.trace' and 'celery.app.trace'. Modules that do not expose
    ``build_tracer`` are left untouched.
    """
    if not hasattr(module, "build_tracer"):
        return

    # In Celery 2.5+ the background task entry point has to be wrapped at
    # the moment the tracer is created. Per the original note, 'build_tracer'
    # lived under 'celery.execute' in Celery 2.5 and moved to
    # 'celery.task.trace' in Celery 3.0.
    original_build_tracer = module.build_tracer

    def build_tracer(name, task, *args, **kwargs):
        # A falsy task is looked up by name in the module's task registry.
        resolved_task = task or module.tasks[name]
        wrapped_task = CeleryTaskWrapper(resolved_task, name=name)
        return original_build_tracer(name, wrapped_task, *args, **kwargs)

    module.build_tracer = build_tracer


def instrument_celery_worker(module):
# Triggered for 'celery.worker' and 'celery.concurrency.processes'.

Expand Down
7 changes: 6 additions & 1 deletion tests/application_celery/_target_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from celery import Celery
from celery import Celery, shared_task
from testing_support.validators.validate_distributed_trace_accepted import (
validate_distributed_trace_accepted,
)
Expand Down Expand Up @@ -44,6 +44,11 @@ def nested_add(x, y):
return add(x, y)


@shared_task
def shared_task_add(x, y):
    """Task declared with the ``shared_task`` decorator that returns ``x + y``."""
    total = x + y
    return total


@app.task
@validate_distributed_trace_accepted(transport_type="AMQP")
def assert_dt():
Expand Down
21 changes: 21 additions & 0 deletions tests/application_celery/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from testing_support.fixtures import ( # noqa: F401; pylint: disable=W0611
collector_agent_registration_fixture,
collector_available_fixture,
Expand All @@ -27,3 +28,23 @@
collector_agent_registration = collector_agent_registration_fixture(
app_name="Python Agent Test (application_celery)", default_settings=_default_settings
)


@pytest.fixture(scope="session")
def celery_config():
    """Settings the celery pytest plugin uses to configure the Celery instance."""
    settings = {}
    settings["broker_url"] = "memory://"
    settings["result_backend"] = "cache+memory://"
    return settings


@pytest.fixture(scope="session")
def celery_worker_parameters():
    """Options the celery pytest plugin uses to configure the worker instance."""
    worker_options = dict(shutdown_timeout=120)
    return worker_options


@pytest.fixture(scope="session", autouse=True)
def celery_worker_available(celery_session_worker):
    """Autouse session fixture that requests ``celery_session_worker`` and yields it."""
    worker = celery_session_worker
    yield worker
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from _target_application import add, nested_add, tsum
from _target_application import add, nested_add, shared_task_add, tsum
from testing_support.validators.validate_code_level_metrics import (
validate_code_level_metrics,
)
Expand All @@ -28,7 +28,7 @@


@validate_transaction_metrics(
name="test_celery:test_celery_task_as_function_trace",
name="test_application:test_celery_task_as_function_trace",
scoped_metrics=[("Function/_target_application.add", 1)],
background_task=True,
)
Expand Down Expand Up @@ -58,7 +58,7 @@ def test_celery_task_as_background_task():


@validate_transaction_metrics(
name="test_celery:test_celery_tasks_multiple_function_traces",
name="test_application:test_celery_tasks_multiple_function_traces",
scoped_metrics=[("Function/_target_application.add", 1), ("Function/_target_application.tsum", 1)],
background_task=True,
)
Expand Down Expand Up @@ -90,7 +90,7 @@ def test_celery_tasks_ignore_transaction():


@validate_transaction_metrics(
name="test_celery:test_celery_tasks_end_transaction",
name="test_application:test_celery_tasks_end_transaction",
scoped_metrics=[("Function/_target_application.add", 1)],
background_task=True,
)
Expand Down Expand Up @@ -126,3 +126,18 @@ def test_celery_nested_tasks():

add_result = nested_add(1, 2)
assert add_result == 3


@validate_transaction_metrics(
    name="_target_application.shared_task_add", group="Celery", scoped_metrics=[], background_task=True
)
@validate_code_level_metrics("_target_application", "shared_task_add")
def test_celery_shared_task_as_background_task():
    """
    Invoke shared_task_add() with no enclosing transaction. The validators above
    expect a background transaction in the 'Celery' group named after the task,
    plus code level metrics for shared_task_add().
    """
    total = shared_task_add(3, 4)
    assert total == 7
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from _target_application import add, assert_dt
from testing_support.fixtures import override_application_settings
from testing_support.validators.validate_transaction_count import (
Expand All @@ -23,29 +22,8 @@
)

from newrelic.api.background_task import background_task
from newrelic.packages import six

skip_if_py2 = pytest.mark.skipif(
six.PY2, reason="Celery has no pytest plugin for Python 2, making testing very difficult."
)


@pytest.fixture(scope="module")
def celery_config():
# Used by celery pytest plugin to configure Celery instance
return {
"broker_url": "memory://",
"result_backend": "cache+memory://",
}


@pytest.fixture(scope="module")
def celery_worker_parameters():
# Used by celery pytest plugin to configure worker instance
return {"shutdown_timeout": 120}


@skip_if_py2
@validate_transaction_metrics(
name="_target_application.assert_dt",
group="Celery",
Expand All @@ -57,20 +35,17 @@ def celery_worker_parameters():
index=-2,
)
@validate_transaction_metrics(
name="test_celery_distributed_tracing:test_celery_task_distributed_tracing_enabled",
name="test_distributed_tracing:test_celery_task_distributed_tracing_enabled",
background_task=True,
)
@validate_transaction_count(2)
@background_task()
def test_celery_task_distributed_tracing_enabled(celery_worker):
def test_celery_task_distributed_tracing_enabled():
result = assert_dt.apply_async()
while not result.ready():
pass
result = result.result
result = result.get()
assert result == 1


@skip_if_py2
@override_application_settings({"distributed_tracing.enabled": False})
@validate_transaction_metrics(
name="_target_application.add",
Expand All @@ -83,14 +58,12 @@ def test_celery_task_distributed_tracing_enabled(celery_worker):
index=-2,
)
@validate_transaction_metrics(
name="test_celery_distributed_tracing:test_celery_task_distributed_tracing_disabled",
name="test_distributed_tracing:test_celery_task_distributed_tracing_disabled",
background_task=True,
)
@validate_transaction_count(2)
@background_task()
def test_celery_task_distributed_tracing_disabled(celery_worker):
def test_celery_task_distributed_tracing_disabled():
result = add.apply_async((1, 2))
while not result.ready():
pass
result = result.result
result = result.get()
assert result == 3
Loading

0 comments on commit 7b90b64

Please sign in to comment.