diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..febd3f8
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,150 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+# For a library or package, you might want to ignore these files since the code is
+# intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+# poetry
+poetry.lock
+
+# vscode
+.vscode
+
+# pyenv
+.python-version
+
+# git
+*.patch
diff --git a/README.md b/README.md
index 3b4b592..bbb398f 100644
--- a/README.md
+++ b/README.md
@@ -39,7 +39,9 @@ os.environ["NEW_RELIC_LICENSE_KEY"] = ""
 ```python
 from nr_openai_observability import monitor
 
-monitor.initialization()
+monitor.initialization(
+    application_name="OpenAI observability example"
+)
 ```
 
 #### Code example:
@@ -51,15 +53,21 @@ import os
 
 import openai
 from nr_openai_observability import monitor
 
-monitor.initialization()
+monitor.initialization(
+    application_name="OpenAI observability example"
+)
 openai.api_key = os.getenv("OPENAI_API_KEY")
-openai.Completion.create(
-    model="text-davinci-003",
-    prompt="What is Observability?",
-    max_tokens=20,
-    temperature=0
+response = openai.ChatCompletion.create(
+    model="gpt-3.5-turbo",
+    messages=[
+        {
+            "role": "user",
+            "content": "Write a rhyme about observability",
+        },
+    ],
 )
+print(response["choices"][0]["message"]["content"])
 ```
 
 #### STEP 3: Follow the instruction [here](https://one.newrelic.com/launcher/catalog-pack-details.launcher/?pane=eyJuZXJkbGV0SWQiOiJjYXRhbG9nLXBhY2stZGV0YWlscy5jYXRhbG9nLXBhY2stY29udGVudHMiLCJxdWlja3N0YXJ0SWQiOiI1ZGIyNWRiZC1hNmU5LTQ2ZmMtYTcyOC00Njk3ZjY3N2ZiYzYifQ==) to add the dashboard to your New Relic account.
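Editor's note: the patched surface is wider than the README example shows. The `monitor.py` changes below also wrap `openai.Embedding.create` and the async `acreate` variants. A minimal usage sketch under the same setup as the README (the model names here are illustrative, not mandated by this patch):

```python
import asyncio
import os

import openai
from nr_openai_observability import monitor

monitor.initialization(application_name="OpenAI observability example")
openai.api_key = os.getenv("OPENAI_API_KEY")

# Embedding calls are recorded as LlmEmbedding events once patched.
openai.Embedding.create(
    model="text-embedding-ada-002",
    input="What is observability?",
)


async def main():
    # Async variants are wrapped by _patched_call_async in the same way.
    response = await openai.ChatCompletion.acreate(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Define observability in one sentence."}],
    )
    print(response["choices"][0]["message"]["content"])


asyncio.run(main())
```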
diff --git a/src/nr_openai_observability/build_events.py b/src/nr_openai_observability/build_events.py
new file mode 100644
index 0000000..2d56c98
--- /dev/null
+++ b/src/nr_openai_observability/build_events.py
@@ -0,0 +1,150 @@
+import uuid
+from datetime import datetime
+
+import openai
+
+
+def _build_messages_events(messages, completion_id, model):
+    message_id = str(uuid.uuid4())
+    events = []
+    for index, message in enumerate(messages):
+        curr_message = {
+            "id": message_id,
+            "content": message.get("content")[:4095],
+            "role": message.get("role"),
+            "completion_id": completion_id,
+            "sequence": index,
+            "model": model,
+            "vendor": "openAI",
+            "ingest_source": "PythonSDK",
+        }
+
+        events.append(curr_message)
+
+    return events
+
+
+def _get_rate_limit_data(response_headers):
+    def _get_numeric_header(name):
+        header = response_headers.get(name)
+        return int(header) if header and header.isdigit() else None
+
+    # OpenAI reports rate limits in x-ratelimit-* response headers.
+    return {
+        "ratelimit_limit_requests": _get_numeric_header("x-ratelimit-limit-requests"),
+        "ratelimit_limit_tokens": _get_numeric_header("x-ratelimit-limit-tokens"),
+        "ratelimit_reset_tokens": response_headers.get("x-ratelimit-reset-tokens"),
+        "ratelimit_reset_requests": response_headers.get("x-ratelimit-reset-requests"),
+        "ratelimit_remaining_tokens": _get_numeric_header(
+            "x-ratelimit-remaining-tokens"
+        ),
+        "ratelimit_remaining_requests": _get_numeric_header(
+            "x-ratelimit-remaining-requests"
+        ),
+    }
+
+
+def build_completion_events(response, request, response_headers, response_time):
+    completion_id = str(uuid.uuid4())
+
+    completion = {
+        "id": completion_id,
+        "api_key_last_four_digits": f"sk-{response.api_key[-4:]}",
+        "timestamp": datetime.now(),
+        "response_time": int(response_time * 1000),
+        "request.model": request.get("model") or request.get("engine"),
+        "response.model": response.model,
+        "usage.completion_tokens": response.usage.completion_tokens,
+        "usage.total_tokens": response.usage.total_tokens,
+        "usage.prompt_tokens": response.usage.prompt_tokens,
+        "temperature": request.get("temperature"),
+        "max_tokens": request.get("max_tokens"),
+        "finish_reason": response.choices[0].finish_reason,
+        "api_type": response.api_type,
+        "vendor": "openAI",
+        "ingest_source": "PythonSDK",
+        "number_of_messages": len(request.get("messages", [])) + len(response.choices),
+        "organization": response.organization,
+        "api_version": response_headers.get("openai-version"),
+    }
+
+    completion.update(_get_rate_limit_data(response_headers))
+
+    messages = _build_messages_events(
+        request.get("messages", []) + [response.choices[0].message],
+        completion_id,
+        response.model,
+    )
+
+    return {"messages": messages, "completion": completion}
+
+
+def build_completion_error_events(request, error):
+    completion_id = str(uuid.uuid4())
+
+    completion = {
+        "id": completion_id,
+        "api_key_last_four_digits": f"sk-{openai.api_key[-4:]}",
+        "timestamp": datetime.now(),
+        "request.model": request.get("model") or request.get("engine"),
+        "temperature": request.get("temperature"),
+        "max_tokens": request.get("max_tokens"),
+        "vendor": "openAI",
+        "ingest_source": "PythonSDK",
+        "organization": error.organization,
+        "number_of_messages": len(request.get("messages", [])),
+        "error_status": error.http_status,
+        "error_message": error.error.message,
+        "error_type": error.error.type,
+        "error_code": error.error.code,
+        "error_param": error.error.param,
+    }
+
+    messages = _build_messages_events(
+        request.get("messages", []),
+        completion_id,
+        request.get("model") or request.get("engine"),
+    )
+
+    return {"messages": messages, "completion": completion}
+
+
+def build_embedding_event(response, request, response_headers, response_time):
+    embedding_id = str(uuid.uuid4())
+
+    embedding = {
+        "id": embedding_id,
+        "api_key_last_four_digits": f"sk-{response.api_key[-4:]}",
+        "timestamp": datetime.now(),
+        "response_time": int(response_time * 1000),
+        "request.model": request.get("model") or request.get("engine"),
+        "response.model": response.model,
+        "usage.total_tokens": response.usage.total_tokens,
+        "usage.prompt_tokens": response.usage.prompt_tokens,
+        "api_type": response.api_type,
+        "vendor": "openAI",
+        "ingest_source": "PythonSDK",
+        "organization": response.organization,
+        "api_version": response_headers.get("openai-version"),
+    }
+
+    embedding.update(_get_rate_limit_data(response_headers))
+    return embedding
+
+
+def build_embedding_error_event(request, error):
+    embedding_id = str(uuid.uuid4())
+
+    embedding = {
+        "id": embedding_id,
+        "api_key_last_four_digits": f"sk-{openai.api_key[-4:]}",
+        "timestamp": datetime.now(),
+        "request.model": request.get("model") or request.get("engine"),
+        "vendor": "openAI",
+        "ingest_source": "PythonSDK",
+        "organization": error.organization,
+        "error_status": error.http_status,
+        "error_message": error.error.message,
+        "error_type": error.error.type,
+        "error_code": error.error.code,
+        "error_param": error.error.param,
+    }
+
+    return embedding
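Editor's note: assuming the `x-ratelimit-*` header names used above, `_get_rate_limit_data` parses the numeric limit/remaining headers to integers and passes the reset durations through verbatim. A small illustrative sketch against a hand-built headers dict (all values fabricated):

```python
from nr_openai_observability.build_events import _get_rate_limit_data

# Fabricated response headers in OpenAI's x-ratelimit-* format.
sample_headers = {
    "x-ratelimit-limit-requests": "3500",
    "x-ratelimit-remaining-requests": "3499",
    "x-ratelimit-reset-requests": "17ms",
    "x-ratelimit-limit-tokens": "90000",
    "x-ratelimit-remaining-tokens": "89000",
    "x-ratelimit-reset-tokens": "6m0s",
}

data = _get_rate_limit_data(sample_headers)

# Numeric headers are parsed to int; reset durations stay strings;
# any missing header would come back as None.
assert data["ratelimit_limit_requests"] == 3500
assert data["ratelimit_reset_tokens"] == "6m0s"
```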
{"messages": messages, "completion": completion} + + +def build_embedding_event(response, request, response_headers, response_time): + embedding_id = str(uuid.uuid4()) + + embedding = { + "id": embedding_id, + "api_key_last_four_digits": f"sk-{response.api_key[-4:]}", + "timestamp": datetime.now(), + "response_time": int(response_time * 1000), + "request.model": request.get("model") or request.get("engine"), + "response.model": response.model, + "usage.total_tokens": response.usage.total_tokens, + "usage.prompt_tokens": response.usage.prompt_tokens, + "api_type": response.api_type, + "vendor": "openAI", + "ingest_source": "PythonSDK", + "organization": response.organization, + "api_version": response_headers.get("openai-version"), + } + + embedding.update(_get_rate_limit_data(response_headers)) + return embedding + + +def build_embedding_error_event(request, error): + embedding_id = str(uuid.uuid4()) + + embedding = { + "id": embedding_id, + "api_key_last_four_digits": f"sk-{openai.api_key[-4:]}", + "timestamp": datetime.now(), + "request.model": request.get("model") or request.get("engine"), + "vendor": "openAI", + "ingest_source": "PythonSDK", + "organization": error.organization, + "error_status": error.http_status, + "error_message": error.error.message, + "error_type": error.error.type, + "error_code": error.error.code, + "error_param": error.error.param, + } + + return embedding diff --git a/src/nr_openai_observability/error_handling_decorator.py b/src/nr_openai_observability/error_handling_decorator.py new file mode 100644 index 0000000..4ed4330 --- /dev/null +++ b/src/nr_openai_observability/error_handling_decorator.py @@ -0,0 +1,14 @@ +import logging +from functools import wraps + +logger = logging.getLogger("nr_openai_observability") + + +def handle_errors(func): + @wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as err: + logger.error(f"An error occurred in {func.__name__}: {err}") + return wrapper diff --git a/src/nr_openai_observability/monitor.py b/src/nr_openai_observability/monitor.py index f16f428..381c21b 100644 --- a/src/nr_openai_observability/monitor.py +++ b/src/nr_openai_observability/monitor.py @@ -7,18 +7,60 @@ import openai from newrelic_telemetry_sdk import Event, EventBatch, EventClient, Harvester +from nr_openai_observability.build_events import ( + build_completion_error_events, + build_completion_events, + build_embedding_error_event, + build_embedding_event, +) +from nr_openai_observability.error_handling_decorator import handle_errors + logger = logging.getLogger("nr_openai_observability") -EventName = "OpenAICompletion" +EventName = "LlmCompletion" +MessageEventName = "LlmChatCompletionMessage" +SummeryEventName = "LlmChatCompletionSummary" +EmbeddingEventName = "LlmEmbedding" def _patched_call(original_fn, patched_fn): + if hasattr(original_fn, "is_patched_by_monitor"): + return original_fn + def _inner_patch(*args, **kwargs): + if kwargs.get("stream") is True: + logger.warning( + "stream = True is not supported by nr_openai_observability. 
diff --git a/src/nr_openai_observability/monitor.py b/src/nr_openai_observability/monitor.py
index f16f428..381c21b 100644
--- a/src/nr_openai_observability/monitor.py
+++ b/src/nr_openai_observability/monitor.py
@@ -7,18 +7,60 @@ import openai
 
 from newrelic_telemetry_sdk import Event, EventBatch, EventClient, Harvester
 
+from nr_openai_observability.build_events import (
+    build_completion_error_events,
+    build_completion_events,
+    build_embedding_error_event,
+    build_embedding_event,
+)
+from nr_openai_observability.error_handling_decorator import handle_errors
+
 logger = logging.getLogger("nr_openai_observability")
 
-EventName = "OpenAICompletion"
+EventName = "LlmCompletion"
+MessageEventName = "LlmChatCompletionMessage"
+SummaryEventName = "LlmChatCompletionSummary"
+EmbeddingEventName = "LlmEmbedding"
 
 
 def _patched_call(original_fn, patched_fn):
+    if hasattr(original_fn, "is_patched_by_monitor"):
+        return original_fn
+
     def _inner_patch(*args, **kwargs):
+        if kwargs.get("stream") is True:
+            logger.warning(
+                "stream=True is not supported by nr_openai_observability. Monitoring is skipped for this call."
+            )
+            return original_fn(*args, **kwargs)
+
         try:
             return patched_fn(original_fn, *args, **kwargs)
         except Exception as ex:
             raise ex
 
+    _inner_patch.is_patched_by_monitor = True
+
     return _inner_patch
+
+
+def _patched_call_async(original_fn, patched_fn):
+    if hasattr(original_fn, "is_patched_by_monitor"):
+        return original_fn
+
+    async def _inner_patch(*args, **kwargs):
+        if kwargs.get("stream") is True:
+            logger.warning(
+                "stream=True is not supported by nr_openai_observability. Monitoring is skipped for this call."
+            )
+            return await original_fn(*args, **kwargs)
+        try:
+            return await patched_fn(original_fn, *args, **kwargs)
+        except Exception as ex:
+            raise ex
+
+    _inner_patch.is_patched_by_monitor = True
+
+    return _inner_patch
@@ -29,6 +71,7 @@ def __init__(
         use_logger: Optional[bool] = None,
     ):
         self.use_logger = use_logger if use_logger else False
+        self.headers_by_id: dict = {}
 
     def _set_license_key(
         self,
@@ -49,7 +92,6 @@ def _set_client_host(
         self,
         event_client_host: Optional[str] = None,
     ):
-
         if not isinstance(event_client_host, str) and event_client_host is not None:
             raise TypeError("event_client_host instance type must be str or None")
 
@@ -60,7 +102,7 @@ def _set_client_host(
     def _set_metadata(
         self,
         metadata: Dict[str, Any] = {},
-    ): 
+    ):
         self.metadata = metadata
@@ -74,10 +116,12 @@ def _log(self, msg: str):
 
     def start(
         self,
+        application_name: str,
         license_key: Optional[str] = None,
         metadata: Dict[str, Any] = {},
         event_client_host: Optional[str] = None,
     ):
+        self.application_name = application_name
         self._set_license_key(license_key)
         self._set_metadata(metadata)
         self._set_client_host(event_client_host)
@@ -101,13 +145,121 @@ def _start(self):
         # Why? To send the remaining data...
         atexit.register(self.event_harvester.stop)
 
-    def record_event(self, event_dict: dict, table: str = EventName):
+    def record_event(
+        self,
+        event_dict: dict,
+        table: str = EventName,
+    ):
+        event_dict["applicationName"] = self.application_name
         event_dict.update(self.metadata)
         event = Event(table, event_dict)
         self.event_batch.record(event)
 
 
-def patcher_create(original_fn, *args, **kwargs):
+def patcher_convert_to_openai_object(original_fn, *args, **kwargs):
+    response = original_fn(*args, **kwargs)
+
+    if isinstance(args[0], openai.openai_response.OpenAIResponse):
+        setattr(response, "_nr_response_headers", getattr(args[0], "_headers", {}))
+
+    return response
+
+
+def patcher_create_chat_completion(original_fn, *args, **kwargs):
+    logger.debug(
+        f"Running the original function: '{original_fn.__qualname__}'. args:{args}; kwargs: {kwargs}"
+    )
+
+    result, time_delta = None, None
+    try:
+        timestamp = time.time()
+        result = original_fn(*args, **kwargs)
+        time_delta = time.time() - timestamp
+    except Exception as ex:
+        handle_create_chat_completion(result, kwargs, ex, time_delta)
+        raise ex
+
+    logger.debug(f"Finished running function: '{original_fn.__qualname__}'.")
+
+    return handle_create_chat_completion(result, kwargs, None, time_delta)
+
+
+async def patcher_create_chat_completion_async(original_fn, *args, **kwargs):
+    logger.debug(
+        f"Running the original function: '{original_fn.__qualname__}'. args:{args}; kwargs: {kwargs}"
+    )
+
+    result, time_delta = None, None
+    try:
+        timestamp = time.time()
+        result = await original_fn(*args, **kwargs)
+        time_delta = time.time() - timestamp
+    except Exception as ex:
+        handle_create_chat_completion(result, kwargs, ex, time_delta)
+        raise ex
+
+    logger.debug(f"Finished running function: '{original_fn.__qualname__}'.")
+
+    return handle_create_chat_completion(result, kwargs, None, time_delta)
+
+
+@handle_errors
+def handle_create_chat_completion(response, request, error, response_time):
+    events = None
+    if error:
+        events = build_completion_error_events(request, error)
+    else:
+        events = build_completion_events(
+            response, request, getattr(response, "_nr_response_headers"), response_time
+        )
+        delattr(response, "_nr_response_headers")
+
+    for event in events["messages"]:
+        monitor.record_event(event, MessageEventName)
+    monitor.record_event(events["completion"], SummaryEventName)
+
+    return response
+
+
+async def patcher_create_completion_async(original_fn, *args, **kwargs):
+    logger.debug(
+        f"Running the original function: '{original_fn.__qualname__}'. args:{args}; kwargs: {kwargs}"
+    )
+
+    timestamp = time.time()
+    result = await original_fn(*args, **kwargs)
+    time_delta = time.time() - timestamp
+
+    logger.debug(f"Finished running function: '{original_fn.__qualname__}'.")
+
+    return handle_create_completion(result, time_delta, **kwargs)
+
+
+def patcher_create_completion(original_fn, *args, **kwargs):
+    logger.debug(
+        f"Running the original function: '{original_fn.__qualname__}'. args:{args}; kwargs: {kwargs}"
+    )
+
+    timestamp = time.time()
+    result = original_fn(*args, **kwargs)
+    time_delta = time.time() - timestamp
+
+    logger.debug(f"Finished running function: '{original_fn.__qualname__}'.")
+
+    return handle_create_completion(result, time_delta, **kwargs)
+
+
+@handle_errors
+def handle_create_completion(response, time_delta, **kwargs):
     def flatten_dict(dd, separator=".", prefix="", index=""):
         if len(index):
             index = index + separator
@@ -121,20 +273,8 @@ def flatten_dict(dd, separator=".", prefix="", index=""):
             else {prefix: dd}
         )
 
-    logger.debug(
-        f"Running the original function: '{original_fn.__qualname__}'. args:{args}; kwargs: {kwargs}"
-    )
-
-    timestamp = time.time()
-    result = original_fn(*args, **kwargs)
-    time_delta = time.time() - timestamp
-
-    logger.debug(
-        f"Finished running function: '{original_fn.__qualname__}'. result: {result}"
-    )
-
     choices_payload = {}
-    for i, choice in enumerate(result.get("choices")):
+    for i, choice in enumerate(response.get("choices")):
         choices_payload.update(flatten_dict(choice, prefix="choices", index=str(i)))
 
     logger.debug(dict(**kwargs))
@@ -142,7 +282,7 @@ def flatten_dict(dd, separator=".", prefix="", index=""):
     event_dict = {
         **kwargs,
         "response_time": time_delta,
-        **flatten_dict(result.to_dict_recursive(), separator="."),
+        **flatten_dict(response.to_dict_recursive(), separator="."),
         **choices_payload,
     }
     event_dict.pop("choices")
@@ -151,35 +291,124 @@ def flatten_dict(dd, separator=".", prefix="", index=""):
         event_dict["messages"] = str(kwargs.get("messages"))
 
     logger.debug(f"Reported event dictionary:\n{event_dict}")
-
     monitor.record_event(event_dict)
 
-    return result
+    return response
+
+
+def patcher_create_embedding(original_fn, *args, **kwargs):
+    logger.debug(
+        f"Running the original function: '{original_fn.__qualname__}'. args:{args}; kwargs: {kwargs}"
+    )
+
+    result, time_delta = None, None
+    try:
+        timestamp = time.time()
+        result = original_fn(*args, **kwargs)
+        time_delta = time.time() - timestamp
+    except Exception as ex:
+        handle_create_embedding(result, kwargs, ex, time_delta)
+        raise ex
+
+    logger.debug(f"Finished running function: '{original_fn.__qualname__}'.")
+
+    return handle_create_embedding(result, kwargs, None, time_delta)
+
+
+async def patcher_create_embedding_async(original_fn, *args, **kwargs):
+    logger.debug(
+        f"Running the original function: '{original_fn.__qualname__}'. args:{args}; kwargs: {kwargs}"
+    )
+
+    result, time_delta = None, None
+    try:
+        timestamp = time.time()
+        result = await original_fn(*args, **kwargs)
+        time_delta = time.time() - timestamp
+    except Exception as ex:
+        handle_create_embedding(result, kwargs, ex, time_delta)
+        raise ex
+
+    logger.debug(f"Finished running function: '{original_fn.__qualname__}'.")
+
+    return handle_create_embedding(result, kwargs, None, time_delta)
+
+
+@handle_errors
+def handle_create_embedding(response, request, error, response_time):
+    event = None
+    if error:
+        event = build_embedding_error_event(request, error)
+    else:
+        event = build_embedding_event(
+            response, request, getattr(response, "_nr_response_headers"), response_time
+        )
+        delattr(response, "_nr_response_headers")
+
+    monitor.record_event(event, EmbeddingEventName)
+
+    return response
 
 
 monitor = OpenAIMonitoring()
 
 
 def initialization(
+    application_name: str,
     license_key: Optional[str] = None,
     metadata: Dict[str, Any] = {},
     event_client_host: Optional[str] = None,
 ):
-    monitor.start(license_key, metadata, event_client_host)
+    monitor.start(application_name, license_key, metadata, event_client_host)
     perform_patch()
 
 
 def perform_patch():
+    try:
+        openai.Embedding.create = _patched_call(
+            openai.Embedding.create, patcher_create_embedding
+        )
+    except AttributeError:
+        pass
+
+    try:
+        openai.Embedding.acreate = _patched_call_async(
+            openai.Embedding.acreate, patcher_create_embedding_async
+        )
+    except AttributeError:
+        pass
+
     try:
         openai.Completion.create = _patched_call(
-            openai.Completion.create, patcher_create
+            openai.Completion.create, patcher_create_completion
        )
     except AttributeError:
         pass
 
+    try:
+        openai.Completion.acreate = _patched_call_async(
+            openai.Completion.acreate, patcher_create_completion_async
+        )
+    except AttributeError:
+        pass
+
     try:
         openai.ChatCompletion.create = _patched_call(
-            openai.ChatCompletion.create, patcher_create
+            openai.ChatCompletion.create, patcher_create_chat_completion
         )
     except AttributeError:
         pass
 
+    try:
+        openai.ChatCompletion.acreate = _patched_call_async(
+            openai.ChatCompletion.acreate, patcher_create_chat_completion_async
+        )
+    except AttributeError:
+        pass
+
+    try:
+        openai.util.convert_to_openai_object = _patched_call(
+            openai.util.convert_to_openai_object, patcher_convert_to_openai_object
+        )
+    except AttributeError:
+        pass
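Editor's note: the `is_patched_by_monitor` marker is what makes `perform_patch()` safe to call more than once. A sketch of that guard in isolation (the `shout` helpers below are hypothetical, not part of the patch):

```python
from nr_openai_observability.monitor import _patched_call


def shout(text):
    return text.upper()


def shout_patcher(original_fn, *args, **kwargs):
    print("recording event for", original_fn.__name__)
    return original_fn(*args, **kwargs)


patched_once = _patched_call(shout, shout_patcher)
patched_twice = _patched_call(patched_once, shout_patcher)

# The second call sees the marker and returns the same wrapper,
# so the function is never double-wrapped.
assert patched_once is patched_twice
print(patched_once("hello"))  # prints the log line, then "HELLO"
```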