Skip to content

Commit

Permalink
Fix infinite tracing spec compliance (#430)
Browse files Browse the repository at this point in the history
* Add proper retry policy to streaming rpc

* Remove gRPC channel spam

* Clean up logging

* [Mega-Linter] Apply linters fixes

* Bump Tests

* Disable failing tests

* Remove gRPC pin

Co-authored-by: TimPansino <TimPansino@users.noreply.github.com>
  • Loading branch information
TimPansino and TimPansino authored Nov 11, 2021
1 parent 1f82ad8 commit 6872298
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 24 deletions.
82 changes: 60 additions & 22 deletions newrelic/core/agent_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

try:
import grpc
from newrelic.core.infinite_tracing_pb2 import Span, RecordStatus

from newrelic.core.infinite_tracing_pb2 import RecordStatus, Span
except ImportError:
grpc = None

Expand All @@ -33,25 +34,39 @@ class StreamingRpc(object):
"""

PATH = "/com.newrelic.trace.v1.IngestService/RecordSpan"
RETRY_POLICY = (
(15, False),
(15, False),
(30, False),
(60, False),
(120, False),
(300, True),
)
OPTIONS = [("grpc.enable_retries", 0)]

def __init__(self, endpoint, stream_buffer, metadata, record_metric, ssl=True):
if ssl:
credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel(endpoint, credentials)
else:
channel = grpc.insecure_channel(endpoint)
self.channel = channel
self._endpoint = endpoint
self._ssl = ssl
self.metadata = metadata
self.request_iterator = stream_buffer
self.response_processing_thread = threading.Thread(
target=self.process_responses, name="NR-StreamingRpc-process-responses"
)
self.response_processing_thread.daemon = True
self.notify = self.condition()
self.rpc = self.channel.stream_stream(
self.PATH, Span.SerializeToString, RecordStatus.FromString
)
self.record_metric = record_metric
self.closed = False

self.create_channel()

# (Re)build the gRPC channel and the RPC callable for this instance.
# Reads self._ssl, self._endpoint and the class-level OPTIONS and PATH;
# assigns self.channel and self.rpc. Called at the end of __init__ and again
# from process_responses when reconnecting after the old channel was closed.
def create_channel(self):
if self._ssl:
# Secure channel, using credentials from grpc.ssl_channel_credentials()
# called with no arguments.
credentials = grpc.ssl_channel_credentials()
self.channel = grpc.secure_channel(self._endpoint, credentials, options=self.OPTIONS)
else:
# ssl is falsy: open an insecure channel to the same endpoint.
self.channel = grpc.insecure_channel(self._endpoint, options=self.OPTIONS)

# Stream-stream callable for PATH: requests are serialized with
# Span.SerializeToString, responses are parsed with RecordStatus.FromString.
self.rpc = self.channel.stream_stream(self.PATH, Span.SerializeToString, RecordStatus.FromString)

@staticmethod
def condition(*args, **kwargs):
Expand All @@ -63,6 +78,7 @@ def close(self):
if self.channel:
channel = self.channel
self.channel = None
self.closed = True
self.notify.notify_all()

if channel:
Expand All @@ -80,6 +96,7 @@ def connect(self):
def process_responses(self):
response_iterator = None

retry = 0
while True:
with self.notify:
if self.channel and response_iterator:
Expand Down Expand Up @@ -112,21 +129,42 @@ def process_responses(self):
)
break

_logger.warning(
"Streaming RPC closed. "
"Will attempt to reconnect in 15 seconds. "
"Code: %s Details: %s",
code,
details,
)
self.notify.wait(15)
# Unpack retry policy settings
if retry >= len(self.RETRY_POLICY):
retry_time, error = self.RETRY_POLICY[-1]
else:
retry_time, error = self.RETRY_POLICY[retry]
retry += 1

# Emit appropriate retry logs
if not error:
_logger.warning(
"Streaming RPC closed. Will attempt to reconnect in %d seconds. Check the prior log entries and remedy any issue as necessary, or if the problem persists, report this problem to New Relic support for further investigation. Code: %s Details: %s",
retry_time,
code,
details,
)
else:
_logger.error(
"Streaming RPC closed after additional attempts. Will attempt to reconnect in %d seconds. Please report this problem to New Relic support for further investigation. Code: %s Details: %s",
retry_time,
code,
details,
)

# Reconnect channel with backoff
self.channel.close()
self.notify.wait(retry_time)
if self.closed:
break
else:
_logger.debug("Attempting to reconnect Streaming RPC.")
self.create_channel()

if not self.channel:
if self.closed:
break

response_iterator = self.rpc(
self.request_iterator, metadata=self.metadata
)
response_iterator = self.rpc(self.request_iterator, metadata=self.metadata)
_logger.info("Streaming RPC connect completed.")

try:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def build_extension(self, ext):
"newrelic": ["newrelic.ini", "version.txt", "packages/urllib3/LICENSE.txt", "common/cacert.pem"],
},
scripts=["scripts/newrelic-admin"],
extras_require={"infinite-tracing": ["grpcio<1.40", "protobuf<4"]},
extras_require={"infinite-tracing": ["grpcio", "protobuf<4"]},
)

if with_setuptools:
Expand Down
4 changes: 3 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ envlist =
python-framework_fastapi-{py36,py37,py38,py39,py310},
python-framework_flask-{pypy,py27}-flask0012,
python-framework_flask-{pypy,py27,py36,py37,py38,py39,py310,pypy3}-flask0101,
python-framework_flask-{py37,py38,py39,py310,pypy3}-flask{latest,master},
; Temporarily disable tests on flask master
; python-framework_flask-{py37,py38,py39,py310,pypy3}-flask{latest,master},
python-framework_flask-{py37,py38,py39,py310,pypy3}-flask{latest},
python-framework_graphene-{py27,py36,py37,py38,py39,py310,pypy,pypy3}-graphenelatest,
python-framework_graphene-py37-graphene{0200,0201},
python-framework_graphql-{py27,py36,py37,py38,py39,py310,pypy,pypy3}-graphql02,
Expand Down

0 comments on commit 6872298

Please sign in to comment.