Skip to content

Commit

Permalink
feat: support api pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
sdewitt-newrelic committed Jul 2, 2024
1 parent 584f08a commit 83931b6
Show file tree
Hide file tree
Showing 9 changed files with 966 additions and 163 deletions.
14 changes: 12 additions & 2 deletions src/newrelic_logging/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,25 @@ def query(self, session: Session, soql: str, api_ver: str = None) -> dict:
if not api_ver is None:
ver = api_ver

# @TODO handle pagination

return get(
self.authenticator,
session,
f'/services/data/v{ver}/query?q={soql}',
lambda response : response.json()
)

def query_more(
self,
session: Session,
next_records_url: str,
) -> dict:
return get(
self.authenticator,
session,
next_records_url,
lambda response : response.json()
)

def get_log_file(
self,
session: Session,
Expand Down
46 changes: 44 additions & 2 deletions src/newrelic_logging/query/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,25 @@

from ..api import Api
from ..config import Config
from ..telemetry import print_info
from ..telemetry import print_info, print_warn
from ..util import \
get_iso_date_with_offset, \
substitute


def is_valid_records_response(response: dict) -> bool:
return response is not None and \
'records' in response and \
isinstance(response['records'], list)


def has_more_records(response: dict) -> bool:
return 'done' in response and \
'nextRecordsUrl' in response and \
not response['done'] and \
not response['nextRecordsUrl'] == ''


class Query:
def __init__(
self,
Expand All @@ -34,7 +47,36 @@ def execute(
session: Session,
):
print_info(f'Running query {self.query}...')
return self.api.query(session, self.query, self.api_ver)
response = self.api.query(session, self.query, self.api_ver)

if not is_valid_records_response(response):
print_warn(f'no records returned for query {self.query}')
return

done = False
while not done:
yield from response['records']

if not has_more_records(response):
done = True
continue

next_records_url = response['nextRecordsUrl']

print_info(
f'Retrieving more query results using {next_records_url}...'
)

response = self.api.query_more(
session,
next_records_url,
)

if not is_valid_records_response(response):
print_warn(
f'no records returned for next records URL {next_records_url}'
)
done = True


class QueryFactory:
Expand Down
29 changes: 15 additions & 14 deletions src/newrelic_logging/query/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
get_iso_date_with_offset, \
get_log_line_timestamp, \
is_logfile_response, \
process_query_result
process_query_result, \
regenerator


DEFAULT_CHUNK_SIZE = 4096
Expand Down Expand Up @@ -292,10 +293,10 @@ def process_log_record(
def process_query_records(
self,
query: Query,
records: list[dict],
iter,
):
return transform_query_records(
records,
iter,
query,
self.data_cache,
)
Expand All @@ -304,10 +305,16 @@ def process_records(
self,
session: Session,
query: Query,
records: list[dict],
iter,
):
if is_logfile_response(records):
for record in records:
first = next(iter, None)
if first is None:
return

reiter = regenerator([first], iter)

if is_logfile_response(first):
for record in reiter:
if 'LogFile' in record:
yield from self.process_log_record(
session,
Expand All @@ -320,7 +327,7 @@ def process_records(

return

yield from self.process_query_records(query, records)
yield from self.process_query_records(query, reiter)

# Flush the cache
if self.data_cache:
Expand All @@ -347,16 +354,10 @@ def execute(
self.generation_interval,
)

response = query.execute(session)

if not response or not 'records' in response:
print_warn(f'no records returned for query {query.query}')
continue

yield from self.process_records(
session,
query,
response['records'],
query.execute(session),
)

self.slide_time_range()
Expand Down
10 changes: 6 additions & 4 deletions src/newrelic_logging/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
PRIMITIVE_TYPES = (str, int, float, bool, type(None))


def is_logfile_response(records):
if len(records) > 0:
return 'LogFile' in records[0]
def is_logfile_response(record):
return 'LogFile' in record

return True

def regenerator(items: list[Any], itr):
yield from items
yield from itr


def generate_record_id(id_keys: list[str], record: dict) -> str:
Expand Down
23 changes: 20 additions & 3 deletions src/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(
self.limits_result = limits_result
self.soql = None
self.query_api_ver = None
self.next_records_url = None
self.limits_api_ver = None
self.log_file_path = None
self.chunk_size = None
Expand All @@ -65,6 +66,21 @@ def query(self, session: Session, soql: str, api_ver: str = None) -> dict:

return self.query_result

def query_more(
self,
session: Session,
next_records_url: str,
) -> dict:
self.next_records_url = next_records_url

if self.raise_error:
raise SalesforceApiException()

if self.raise_login_error:
raise LoginException()

return self.query_result[next_records_url]

def get_log_file(
self,
session: Session,
Expand Down Expand Up @@ -242,7 +258,7 @@ def __init__(
self.raise_error = raise_error
self.wrapped = None
if 'results' in config:
self.result = { 'records': config['results'] }
self.result = config['results']
elif wrapped:
self.wrapped = wrapped
else:
Expand Down Expand Up @@ -270,9 +286,10 @@ def execute(self, session: Session = None):
self.executed = True

if self.wrapped:
return self.wrapped.execute(session)
yield from self.wrapped.execute(session)
return

return self.result
yield from self.result


class QueryFactoryStub:
Expand Down
122 changes: 122 additions & 0 deletions src/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,128 @@ def test_query_raises_salesforce_exception_if_get_does(self):
self.assertEqual(session.headers['Authorization'], 'Bearer 123456')
self.assertFalse(session.stream)

def test_query_more_requests_correct_url_with_access_token_and_returns_json_response_on_success(self):
'''
query_more() calls the correct next records URL with the access token and returns a JSON response when no errors occur
given: an authenticator
and given: an api version
and given: a session
and given: a next records URL
when: query_more() is called
then: session.get() is called with correct URL and access token
and: stream is set to False
and when: session.get() response status code is 200
then: returns the JSON response
'''

# setup
auth = AuthenticatorStub(
instance_url='https://my.salesforce.test',
access_token='123456',
)
session = SessionStub()
session.response = ResponseStub(200, 'OK', '{"foo": "bar"}', [] )

# execute
sf_api = api.Api(auth, '55.0')
resp = sf_api.query_more(
session,
'/services/data/v55.0/query/01gRO0000016PIAYA2-500'
)

# verify

self.assertEqual(
session.url,
f'https://my.salesforce.test/services/data/v55.0/query/01gRO0000016PIAYA2-500',
)
self.assertTrue('Authorization' in session.headers)
self.assertEqual(session.headers['Authorization'], 'Bearer 123456')
self.assertFalse(session.stream)
self.assertIsNotNone(resp)
self.assertTrue(type(resp) is dict)
self.assertTrue('foo' in resp)
self.assertEqual(resp['foo'], 'bar')

def test_query_more_raises_login_exception_if_get_does(self):
'''
query_more() calls the correct next records URL with the access token and raises LoginException if get does
given: an authenticator
and given: an api version
and given: a session
and given: a next records URL
when: query_more() is called
then: session.get() is called with correct URL and access token
and: stream is set to False
and when: session.get() response status code is 401
and when: authenticator raises a LoginException
then: query_more() raises a LoginException
'''

# setup
auth = AuthenticatorStub(
instance_url='https://my.salesforce.test',
access_token='123456',
raise_login_error=True
)
session = SessionStub()
session.response = ResponseStub(401, 'Unauthorized', '{"foo": "bar"}', [] )

# execute / verify
with self.assertRaises(LoginException) as _:
sf_api = api.Api(auth, '55.0')
_ = sf_api.query_more(
session,
'/services/data/v55.0/query/01gRO0000016PIAYA2-500',
)

self.assertEqual(
session.url,
f'https://my.salesforce.test/services/data/v55.0/query/01gRO0000016PIAYA2-500',
)
self.assertTrue('Authorization' in session.headers)
self.assertEqual(session.headers['Authorization'], 'Bearer 123456')
self.assertFalse(session.stream)

def test_query_raises_salesforce_exception_if_get_does(self):
'''
query_more() calls the correct next records URL with the access token and raises SalesforceApiException if get does
given: an authenticator
and given: an api version
and given: a session
and given: a next records URL
when: query_more() is called
then: session.get() is called with correct URL and access token
and: stream is set to False
and when: session.get() response status code is not 200 or 401
and when: get() raises a SalesforceApiException
then: query_more() raises a SalesforceApiException
'''

# setup
auth = AuthenticatorStub(
instance_url='https://my.salesforce.test',
access_token='123456',
)
session = SessionStub()
session.response = ResponseStub(500, 'ServerError', '{"foo": "bar"}', [] )

# execute / verify
with self.assertRaises(SalesforceApiException) as _:
sf_api = api.Api(auth, '55.0')
_ = sf_api.query_more(
session,
'/services/data/v55.0/query/01gRO0000016PIAYA2-500',
)

self.assertEqual(
session.url,
f'https://my.salesforce.test/services/data/v55.0/query/01gRO0000016PIAYA2-500',
)
self.assertTrue('Authorization' in session.headers)
self.assertEqual(session.headers['Authorization'], 'Bearer 123456')
self.assertFalse(session.stream)

def test_get_log_file_requests_correct_url_with_access_token_and_returns_generator_on_success(self):
'''
get_log_file() calls the correct url with the access token and returns a generator iterator
Expand Down
Loading

0 comments on commit 83931b6

Please sign in to comment.