diff --git a/elasticapm/transport/base.py b/elasticapm/transport/base.py index 487e13a935..24911c395d 100644 --- a/elasticapm/transport/base.py +++ b/elasticapm/transport/base.py @@ -292,7 +292,8 @@ def close(self) -> None: if not self._flushed.wait(timeout=self._max_flush_time_seconds): logger.error("Closing the transport connection timed out.") - stop_thread = close + def stop_thread(self) -> None: + self.close() def flush(self): """ diff --git a/elasticapm/transport/http.py b/elasticapm/transport/http.py index 17ae50ba31..ed132068c6 100644 --- a/elasticapm/transport/http.py +++ b/elasticapm/transport/http.py @@ -32,6 +32,7 @@ import hashlib import json +import os import re import ssl import urllib.parse @@ -250,6 +251,23 @@ def ca_certs(self): return self._server_ca_cert_file return certifi.where() if (certifi and self.client.config.use_certifi) else None + def close(self): + """ + Take care of being able to shutdown cleanly + :return: + """ + if self._closed or (not self._thread or self._thread.pid != os.getpid()): + return + + self._closed = True + # we are racing against urllib3 ConnectionPool weakref finalizer that would lead to having them closed + # and we hanging waiting for send any eventual queued data + # Force the creation of a new PoolManager so that we are always able to flush + self._http = None + self.queue("close", None) + if not self._flushed.wait(timeout=self._max_flush_time_seconds): + logger.error("Closing the transport connection timed out.") + def version_string_to_tuple(version): if version: diff --git a/tests/transports/test_urllib3.py b/tests/transports/test_urllib3.py index b24408e54f..62a355bde0 100644 --- a/tests/transports/test_urllib3.py +++ b/tests/transports/test_urllib3.py @@ -30,6 +30,7 @@ import os +import time import certifi import mock @@ -517,3 +518,89 @@ def test_fetch_server_info_flat_string(waiting_httpserver, caplog, elasticapm_cl transport.fetch_server_info() assert elasticapm_client.server_version is None assert_any_record_contains(caplog.records, "No version key found in server response") + + +def test_close(waiting_httpserver, elasticapm_client): + elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request + waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"}) + transport = Transport( + waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers + ) + transport.start_thread() + + transport.close() + + assert transport._closed is True + assert transport._flushed.is_set() is True + + +def test_close_does_nothing_if_called_from_another_pid(waiting_httpserver, caplog, elasticapm_client): + elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request + waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"}) + transport = Transport( + waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers + ) + transport.start_thread() + + with mock.patch("os.getpid") as getpid_mock: + getpid_mock.return_value = 0 + transport.close() + time.sleep(0.1) + + assert transport._closed is False + + transport.close() + + +def test_close_can_be_called_multiple_times(waiting_httpserver, caplog, elasticapm_client): + elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request + waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"}) + transport = Transport( + waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers + ) + transport.start_thread() + + with caplog.at_level("INFO", logger="elasticapm.transport.http"): + transport.close() + + assert transport._closed is True + + transport.close() + + +def test_close_timeout_error_without_flushing(waiting_httpserver, caplog, elasticapm_client): + elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request + waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"}) + + with caplog.at_level("INFO", logger="elasticapm.transport.http"): + with mock.patch.object(Transport, "_max_flush_time_seconds", 0): + with mock.patch.object(Transport, "_flush") as flush_mock: + # enough to take more that the timeout + flush_mock.side_effect = lambda: time.sleep(0.1) + transport = Transport( + waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers + ) + transport.start_thread() + transport.close() + + assert transport._flushed.is_set() is False + assert transport._closed is True + record = caplog.records[-1] + assert "Closing the transport connection timed out." in record.msg + + +def test_http_pool_manager_is_recycled_at_stop_thread(waiting_httpserver, caplog, elasticapm_client): + elasticapm_client.server_version = (8, 0, 0) # avoid making server_info request + waiting_httpserver.serve_content(code=202, content="", headers={"Location": "http://example.com/foo"}) + transport = Transport( + waiting_httpserver.url, client=elasticapm_client, headers=elasticapm_client._transport._headers + ) + transport.start_thread() + pool_manager = transport.http + + with caplog.at_level("INFO", logger="elasticapm.transport.http"): + transport.stop_thread() + + assert transport._flushed.is_set() is True + assert pool_manager != transport._http + assert not caplog.records