From ecdd723d621071d499bdb5dcbb0bf2124b990242 Mon Sep 17 00:00:00 2001 From: Alex Hall Date: Mon, 23 Dec 2024 18:09:19 +0200 Subject: [PATCH] `capture_request_text_body` (#722) --- logfire/_internal/integrations/httpx.py | 186 +++++++++++++++++------- logfire/_internal/main.py | 8 + tests/otel_integrations/test_httpx.py | 39 ++++- 3 files changed, 176 insertions(+), 57 deletions(-) diff --git a/logfire/_internal/integrations/httpx.py b/logfire/_internal/integrations/httpx.py index 4750fc09..aa327ed2 100644 --- a/logfire/_internal/integrations/httpx.py +++ b/logfire/_internal/integrations/httpx.py @@ -8,7 +8,6 @@ from typing import TYPE_CHECKING, Any, Callable, Literal, Mapping, cast import httpx -import opentelemetry.sdk.trace from logfire._internal.stack_info import warn_at_user_stacklevel from logfire.propagate import attach_context, get_context @@ -70,6 +69,7 @@ def instrument_httpx( client: httpx.Client | httpx.AsyncClient | None, capture_headers: bool, capture_request_json_body: bool, + capture_request_text_body: bool, capture_response_json_body: bool, capture_request_form_data: bool, **kwargs: Any, @@ -109,13 +109,21 @@ def instrument_httpx( async_request_hook = cast('AsyncRequestHook | None', final_kwargs.get('async_request_hook')) async_response_hook = cast('AsyncResponseHook | None', final_kwargs.get('async_response_hook')) final_kwargs['request_hook'] = make_request_hook( - request_hook, should_capture_request_headers, capture_request_json_body, capture_request_form_data + request_hook, + should_capture_request_headers, + capture_request_json_body, + capture_request_text_body, + capture_request_form_data, ) final_kwargs['response_hook'] = make_response_hook( response_hook, should_capture_response_headers, capture_response_json_body, logfire_instance ) final_kwargs['async_request_hook'] = make_async_request_hook( - async_request_hook, should_capture_request_headers, capture_request_json_body, capture_request_form_data + async_request_hook, + should_capture_request_headers, + capture_request_json_body, + capture_request_text_body, + capture_request_form_data, ) final_kwargs['async_response_hook'] = make_async_response_hook( async_response_hook, should_capture_response_headers, capture_response_json_body, logfire_instance @@ -128,7 +136,11 @@ def instrument_httpx( response_hook = cast('ResponseHook | AsyncResponseHook | None', final_kwargs.get('response_hook')) request_hook = make_async_request_hook( - request_hook, should_capture_request_headers, capture_request_json_body, capture_request_form_data + request_hook, + should_capture_request_headers, + capture_request_json_body, + capture_request_text_body, + capture_request_form_data, ) response_hook = make_async_response_hook( response_hook, should_capture_response_headers, capture_response_json_body, logfire_instance @@ -138,7 +150,11 @@ def instrument_httpx( response_hook = cast('ResponseHook | None', final_kwargs.get('response_hook')) request_hook = make_request_hook( - request_hook, should_capture_request_headers, capture_request_json_body, capture_request_form_data + request_hook, + should_capture_request_headers, + capture_request_json_body, + capture_request_text_body, + capture_request_form_data, ) response_hook = make_response_hook( response_hook, should_capture_response_headers, capture_response_json_body, logfire_instance @@ -148,15 +164,107 @@ def instrument_httpx( instrumentor.instrument_client(client, tracer_provider, request_hook, response_hook) # type: ignore[reportArgumentType] +class LogfireHttpxRequestInfo(RequestInfo): + span: Span + + def capture_headers(self): + capture_headers(self.span, self.headers, 'request') + + def capture_body_if_json(self, attr_name: str = 'http.request.body.json'): + if not self.body_is_streaming and self.content_type_is_json: + self.capture_text_as_json(attr_name=attr_name) + + def capture_body_if_text(self, attr_name: str = 'http.request.body.text'): + if not self.body_is_streaming and self.content_type_is_text: + self.span.set_attribute(attr_name, self.text) + + def capture_body_if_form(self, attr_name: str = 'http.request.body.form'): + if not self.content_type_is_form: + return + + data = self.form_data + if not (data and isinstance(data, Mapping)): # pragma: no cover # type: ignore + return + self.set_complex_span_attributes({attr_name: data}) + + def capture_text_as_json(self, attr_name: str = 'http.request.body.json', text: str | None = None): + if text is None: # pragma: no branch + text = self.text + self.set_complex_span_attributes({attr_name: {}}) # Set the JSON schema + self.span.set_attribute(attr_name, text) + + @property + def body_is_streaming(self): + return not isinstance(self.stream, httpx.ByteStream) + + @property + def content_type_header_object(self) -> ContentTypeHeader: + return content_type_header_from_string(self.content_type_header_string) + + @property + def content_type_header_string(self) -> str: + return self.headers.get('content-type', '') + + @property + def content_type_is_json(self): + return is_json_type(self.content_type_header_string) + + @property + def content_type_is_text(self): + return is_text_type(self.content_type_header_string) + + @property + def content_type_is_form(self): + content_type = self.content_type_header_string + return content_type == 'application/x-www-form-urlencoded' + + @property + def content_type_charset(self): + return self.content_type_header_object.params.get('charset', 'utf-8') + + @property + def content(self) -> bytes: + if self.body_is_streaming: # pragma: no cover + raise ValueError('Cannot read content from a streaming body') + return list(self.stream)[0] # type: ignore + + @property + def text(self) -> str: + return decode_body(self.content, self.content_type_charset) + + @property + def form_data(self) -> Mapping[str, Any] | None: + frame = inspect.currentframe().f_back.f_back.f_back # type: ignore + while frame: + if frame.f_code in CODES_FOR_METHODS_WITH_DATA_PARAM: + break + frame = frame.f_back + else: # pragma: no cover + return + + return frame.f_locals.get('data') + + def set_complex_span_attributes(self, attributes: dict[str, Any]): + set_user_attributes_on_raw_span(self.span, attributes) # type: ignore + + def make_request_hook( - hook: RequestHook | None, should_capture_headers: bool, should_capture_json: bool, should_capture_form_data: bool + hook: RequestHook | None, + should_capture_headers: bool, + should_capture_json: bool, + should_capture_text: bool, + should_capture_form_data: bool, ) -> RequestHook | None: - if not should_capture_headers and not should_capture_json and not should_capture_form_data and not hook: + if not (should_capture_headers or should_capture_json or should_capture_text or should_capture_form_data or hook): return None def new_hook(span: Span, request: RequestInfo) -> None: with handle_internal_errors(): - capture_request(request, span, should_capture_headers, should_capture_json, should_capture_form_data) + request = LogfireHttpxRequestInfo(*request) + request.span = span + capture_request( + request, should_capture_headers, should_capture_json, should_capture_text, should_capture_form_data + ) run_hook(hook, span, request) return new_hook @@ -166,32 +274,39 @@ def make_async_request_hook( hook: AsyncRequestHook | RequestHook | None, should_capture_headers: bool, should_capture_json: bool, + should_capture_text: bool, should_capture_form_data: bool, ) -> AsyncRequestHook | None: - if not should_capture_headers and not should_capture_json and not should_capture_form_data and not hook: + if not (should_capture_headers or should_capture_json or should_capture_text or should_capture_form_data or hook): return None async def new_hook(span: Span, request: RequestInfo) -> None: with handle_internal_errors(): - capture_request(request, span, should_capture_headers, should_capture_json, should_capture_form_data) + request = LogfireHttpxRequestInfo(*request) + request.span = span + capture_request( + request, should_capture_headers, should_capture_json, should_capture_text, should_capture_form_data + ) await run_async_hook(hook, span, request) return new_hook def capture_request( - request: RequestInfo, - span: Span, + request: LogfireHttpxRequestInfo, should_capture_headers: bool, should_capture_json: bool, + should_capture_text: bool, should_capture_form_data: bool, ) -> None: if should_capture_headers: - capture_request_headers(span, request) + request.capture_headers() if should_capture_json: - capture_request_body(span, request) + request.capture_body_if_json() + if should_capture_text and not (should_capture_json and request.content_type_is_json): + request.capture_body_if_text() if should_capture_form_data: - capture_request_form_data(span, request) + request.capture_body_if_form() def make_response_hook( @@ -299,10 +414,6 @@ def capture_response_headers(span: Span, response: ResponseInfo) -> None: capture_headers(span, response.headers, 'response') -def capture_request_headers(span: Span, request: RequestInfo) -> None: - capture_headers(span, request.headers, 'request') - - def capture_headers(span: Span, headers: httpx.Headers, request_or_response: Literal['request', 'response']) -> None: span.set_attributes( { @@ -321,23 +432,6 @@ def decode_body(body: bytes, charset: str): return body.decode(charset, errors='replace') -def capture_request_body(span: Span, request: RequestInfo) -> None: - if not isinstance(request.stream, httpx.ByteStream): - return - - content_type_string = request.headers.get('content-type', '') - if not is_json_type(content_type_string): - return - - content_type_header = content_type_header_from_string(content_type_string) - charset = content_type_header.params.get('charset', 'utf-8') - body = decode_body(list(request.stream)[0], charset) - - attr_name = 'http.request.body.json' - set_user_attributes_on_raw_span(span, {attr_name: {}}) # type: ignore - span.set_attribute(attr_name, body) - - CODES_FOR_METHODS_WITH_DATA_PARAM = [ inspect.unwrap(method).__code__ for method in [ @@ -349,26 +443,6 @@ def capture_request_body(span: Span, request: RequestInfo) -> None: ] -def capture_request_form_data(span: Span, request: RequestInfo) -> None: - content_type = request.headers.get('content-type', '') - if content_type != 'application/x-www-form-urlencoded': - return - - frame = inspect.currentframe().f_back.f_back.f_back # type: ignore - while frame: - if frame.f_code in CODES_FOR_METHODS_WITH_DATA_PARAM: - break - frame = frame.f_back - else: # pragma: no cover - return - - data = frame.f_locals.get('data') - if not (data and isinstance(data, Mapping)): # pragma: no cover - return - span = cast(opentelemetry.sdk.trace.Span, span) - set_user_attributes_on_raw_span(span, {'http.request.body.form': data}) - - @lru_cache def content_type_header_from_string(content_type: str) -> ContentTypeHeader: return EmailPolicy.header_factory('content-type', content_type) diff --git a/logfire/_internal/main.py b/logfire/_internal/main.py index b2873d17..186519b3 100644 --- a/logfire/_internal/main.py +++ b/logfire/_internal/main.py @@ -1157,6 +1157,7 @@ def instrument_httpx( client: httpx.Client, *, capture_headers: bool = False, + capture_request_text_body: bool = False, capture_request_json_body: bool = False, capture_response_json_body: bool = False, capture_request_form_data: bool = False, @@ -1170,6 +1171,7 @@ def instrument_httpx( *, capture_headers: bool = False, capture_request_json_body: bool = False, + capture_request_text_body: bool = False, capture_response_json_body: bool = False, capture_request_form_data: bool = False, **kwargs: Unpack[AsyncClientKwargs], @@ -1182,6 +1184,7 @@ def instrument_httpx( *, capture_headers: bool = False, capture_request_json_body: bool = False, + capture_request_text_body: bool = False, capture_response_json_body: bool = False, capture_request_form_data: bool = False, **kwargs: Unpack[HTTPXInstrumentKwargs], @@ -1193,6 +1196,7 @@ def instrument_httpx( *, capture_headers: bool = False, capture_request_json_body: bool = False, + capture_request_text_body: bool = False, capture_response_json_body: bool = False, capture_request_form_data: bool = False, **kwargs: Any, @@ -1212,6 +1216,9 @@ def instrument_httpx( If you don't want to capture all headers, you can customize the headers captured. See the [Capture Headers](https://logfire.pydantic.dev/docs/guides/advanced/capture_headers/) section for more info. + capture_request_text_body: Set to `True` to capture the request text body + if the content type is either `text/*` + or `application/` followed by a known human-readable text format, e.g. XML. capture_request_json_body: Set to `True` to capture the request JSON body. Specifically captures the raw request body whenever the content type is `application/json`. Doesn't check if the body is actually JSON. @@ -1234,6 +1241,7 @@ def instrument_httpx( client, capture_headers=capture_headers, capture_request_json_body=capture_request_json_body, + capture_request_text_body=capture_request_text_body, capture_response_json_body=capture_response_json_body, capture_request_form_data=capture_request_form_data, **kwargs, diff --git a/tests/otel_integrations/test_httpx.py b/tests/otel_integrations/test_httpx.py index 33b56b1e..c53761b6 100644 --- a/tests/otel_integrations/test_httpx.py +++ b/tests/otel_integrations/test_httpx.py @@ -355,6 +355,7 @@ def test_httpx_client_capture_full(exporter: TestExporter): client, capture_request_headers=True, capture_request_json_body=True, + capture_request_text_body=True, capture_response_headers=True, capture_response_json_body=True, ) @@ -448,6 +449,7 @@ async def test_async_httpx_client_capture_full(exporter: TestExporter): client, capture_request_headers=True, capture_request_json_body=True, + capture_request_text_body=True, capture_response_headers=True, capture_response_json_body=True, capture_request_form_data=True, @@ -564,7 +566,7 @@ def test_httpx_client_capture_request_form_data(exporter: TestExporter): assert [code.co_name for code in CODES_FOR_METHODS_WITH_DATA_PARAM] == ['request', 'stream', 'request', 'stream'] with httpx.Client(transport=create_transport()) as client: - logfire.instrument_httpx(client, capture_request_form_data=True) + logfire.instrument_httpx(client, capture_request_form_data=True, capture_request_text_body=True) client.post('https://example.org/', data={'form': 'values'}) assert exporter.exported_spans_as_dict() == snapshot( @@ -598,6 +600,41 @@ def test_httpx_client_capture_request_form_data(exporter: TestExporter): ) +def test_httpx_client_capture_request_text_body(exporter: TestExporter): + with httpx.Client(transport=create_transport()) as client: + logfire.instrument_httpx(client, capture_request_text_body=True) + client.post('https://example.org/', headers={'Content-Type': 'text/plain'}, content='hello') + + assert exporter.exported_spans_as_dict() == snapshot( + [ + { + 'name': 'POST', + 'context': {'trace_id': 1, 'span_id': 1, 'is_remote': False}, + 'parent': None, + 'start_time': 1000000000, + 'end_time': 2000000000, + 'attributes': { + 'http.method': 'POST', + 'http.request.method': 'POST', + 'http.url': 'https://example.org/', + 'url.full': 'https://example.org/', + 'http.host': 'example.org', + 'server.address': 'example.org', + 'network.peer.address': 'example.org', + 'logfire.span_type': 'span', + 'logfire.msg': 'POST /', + 'http.request.body.text': 'hello', + 'http.status_code': 200, + 'http.response.status_code': 200, + 'http.flavor': '1.1', + 'network.protocol.version': '1.1', + 'http.target': '/', + }, + } + ] + ) + + def test_is_json_type(): assert is_json_type('application/json') assert is_json_type(' APPLICATION / JSON ')