Persist cache only after requests to NR returned OK.
asllop committed Jan 16, 2024
1 parent 4efa368 commit cb17f61
Showing 2 changed files with 70 additions and 22 deletions.
src/newrelic_logging/integration.py (36 changes: 30 additions & 6 deletions)
@@ -1,7 +1,7 @@
 import sys
 from .http_session import new_retry_session
 from .newrelic import NewRelic
-from .salesforce import SalesForce, SalesforceApiException
+from .salesforce import SalesForce, SalesforceApiException, DataCache
 from .auth_env import AuthEnv
 from enum import Enum
 
@@ -75,9 +75,9 @@ def run(self):
                     continue
 
                 if self.data_format == DataFormat.LOGS:
-                    self.process_logs(logs, labels)
+                    self.process_logs(logs, labels, client.data_cache)
                 else:
-                    self.process_events(logs, labels)
+                    self.process_events(logs, labels, client.data_cache)
 
     def auth_and_fetch(self, retry, client, oauth_type, sfdc_session):
         if not client.authenticate(oauth_type, sfdc_session):
@@ -109,7 +109,20 @@ def response_empty(logs):
         return logs == None or (len(logs) == 1 and len(logs[0].get("log_entries", [])) == 0)
 
     @staticmethod
-    def process_logs(logs, labels):
+    def persist_data(log_file_id, log_entries, data_cache: DataCache):
+        if log_file_id == '':
+            # Events
+            for log in log_entries:
+                log_id = log.get('attributes', {}).get('Id', '')
+                print(f"---> ID OF EVENT = {log_id}")
+                data_cache.persist_event(log_id)
+        else:
+            # Logs
+            print(f"---> ID OF LOG FILE = {log_file_id}")
+            data_cache.persist_logs(log_file_id)
+
+    @staticmethod
+    def process_logs(logs, labels, data_cache: DataCache):
         nr_session = new_retry_session()
         for log_file_obj in logs:
             log_entries = log_file_obj['log_entries']
@@ -119,17 +132,23 @@ def process_logs(logs, labels):
             payload = [{'common': labels, 'logs': log_entries}]
             log_type = log_file_obj.get('log_type', '')
             log_file_id = log_file_obj.get('Id', '')
+
             status_code = NewRelic.post_logs(nr_session, payload)
+
+            #TODO: remove this fake response
+            status_code = 202
+
             if status_code != 202:
                 print(f'newrelic logs api returned code- {status_code}')
             else:
                 print(f"sent {len(log_entries)} log messages from log file {log_type}/{log_file_id}")
+                Integration.persist_data(log_file_id, log_entries, data_cache)
 
     @staticmethod
-    def process_events(logs, labels):
+    def process_events(logs, labels, data_cache: DataCache):
         nr_session = new_retry_session()
         for log_file_obj in logs:
+            log_file_id = log_file_obj.get('Id', '')
             log_entries = log_file_obj['log_entries']
             if len(log_entries) == 0:
                 continue
@@ -166,9 +185,14 @@ def process_events(logs, labels):
 
             for log_entries_slice in x:
                 status_code = NewRelic.post_events(nr_session, log_entries_slice)
+
+                #TODO: remove this fake response
+                status_code = 200
+
                 if status_code != 200:
                     print(f'newrelic events api returned code- {status_code}')
                 else:
                     log_type = log_file_obj.get('log_type', '')
                     log_file_id = log_file_obj.get('Id', '')
                     print(f"posted {len(log_entries_slice)} events from log file {log_type}/{log_file_id}")
+                    Integration.persist_data(log_file_id, log_entries, data_cache)
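
Both senders now follow the same gate: build the payload, post it, and touch the cache only when the call returns the success code checked above (202 for post_logs, 200 for post_events). A minimal sketch of that pattern, assuming a hypothetical post_fn in place of NewRelic.post_logs and a simplified Cache in place of DataCache; the IDs and labels are illustrative:

from typing import Callable, Dict, List

class Cache:
    # Hypothetical stand-in for DataCache: buffer IDs until the upload is confirmed.
    def __init__(self) -> None:
        self.pending: Dict[str, List[dict]] = {}
        self.persisted: set = set()

    def buffer(self, record_id: str, rows: List[dict]) -> None:
        self.pending[record_id] = rows

    def persist(self, record_id: str) -> None:
        # Only called after New Relic accepted the payload.
        if record_id in self.pending:
            self.persisted.add(record_id)
            del self.pending[record_id]

def send_log_file(post_fn: Callable[[list], int], cache: Cache, labels: dict,
                  log_file_id: str, log_entries: List[dict]) -> bool:
    cache.buffer(log_file_id, log_entries)
    payload = [{'common': labels, 'logs': log_entries}]
    status_code = post_fn(payload)  # e.g. NewRelic.post_logs(session, payload)
    if status_code != 202:
        print(f'newrelic logs api returned code {status_code}')
        return False  # cache left unpersisted, so the file is retried on the next run
    cache.persist(log_file_id)
    return True

# Usage with a fake transport that always accepts:
assert send_log_file(lambda payload: 202, Cache(), {'env': 'example'},
                     'LOGFILE_1', [{'message': 'hello'}])

The point of the ordering is crash safety: if the process dies between the post and the persist, the worst case is a duplicate send on the next run, never a silently dropped log file.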
src/newrelic_logging/salesforce.py (56 changes: 40 additions & 16 deletions)
@@ -76,11 +76,10 @@ def get_instance_url(self) -> str:
 
 # Local cache, to store data before sending it to Redis.
 class DataCache:
-    redis_expire = None
     redis = None
-
-    #TODO: local cache structure
-    #TODO: methods to send data to Redis
+    redis_expire = None
+    cached_events = {}
+    cached_logs = {}
 
     def __init__(self) -> None:
         pass
@@ -89,39 +88,65 @@ def __init__(self, redis, redis_expire) -> None:
         self.redis_expire = redis_expire
         self.redis = redis
 
-    def retrieve_cached_message_list(self, record_id):
+    def persist_logs(self, record_id: str) -> bool:
+        if self.redis:
+            if record_id in self.cached_logs:
+                for row_id in self.cached_logs[record_id]:
+                    self.redis.rpush(record_id, row_id)
+                    if row_id == 'init':
+                        self.set_redis_expire(record_id)
+                del self.cached_logs[record_id]
+                return True
+            else:
+                return False
+        else:
+            return False
+
+    def persist_event(self, record_id: str) -> bool:
+        if self.redis:
+            if record_id in self.cached_events:
+                self.redis.set(record_id, '')
+                self.set_redis_expire(record_id)
+                del self.cached_events[record_id]
+                return True
+            else:
+                return False
+        else:
+            return False
+
+    def retrieve_cached_message_list(self, record_id: str):
         if self.redis:
             cache_key_exists = self.redis.exists(record_id)
             if cache_key_exists:
                 cached_messages = self.redis.lrange(record_id, 0, -1)
                 return cached_messages
             else:
-                self.redis.rpush(record_id, 'init')
-                self.set_redis_expire(record_id)
+                self.cached_logs[record_id] = ['init']
         return None
 
-    def check_cached_id(self, record_id):
+    # Cache event
+    def check_cached_id(self, record_id: str):
         if self.redis:
             if self.redis.exists(record_id):
                 return True
             else:
-                self.redis.set(record_id, '')
-                self.set_redis_expire(record_id)
+                self.cached_events[record_id] = ''
                 return False
         else:
             return False
 
+    # Cache log
     def record_or_skip_row(self, record_id: str, row: dict, cached_messages: dict) -> bool:
+        row_id = row["REQUEST_ID"]
         if self.redis:
-            row_id = row["REQUEST_ID"]
             if cached_messages is not None:
                 row_id_b = row_id.encode('utf-8')
                 if row_id_b in cached_messages:
                     # print(f' debug: dropping message with REQUEST_ID: {row_id}')
                     return True
-                self.redis.rpush(record_id, row_id)
+                self.cached_logs[record_id].append(row_id)
             else:
-                self.redis.rpush(record_id, row_id)
+                self.cached_logs[record_id].append(row_id)
         return False
 
     def set_redis_expire(self, key):
@@ -404,20 +429,19 @@ def parse_csv(self, download_response, record_id, record_event_type, cached_mess
         return rows
 
     def fetch_logs(self, session):
-        print(f"self.query_template = {self.query_template}")
+        print(f"Query object = {self.query_template}")
+
         if type(self.query_template) is list:
             # "query_template" contains a list of objects, each one is a Query object
             queries = self.make_multiple_queries(copy.deepcopy(self.query_template))
             response = self.fetch_logs_from_multiple_req(session, queries)
             self.slide_time_range()
-            # TODO: send data from local cache to Redis
             return response
         else:
             # "query_template" contains a string with the SOQL to run.
             query = self.make_single_query(Query(self.query_template))
             response = self.fetch_logs_from_single_req(session, query)
             self.slide_time_range()
-            # TODO: send data from local cache to Redis
             return response
 
     def fetch_logs_from_multiple_req(self, session, queries: list[Query]):
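
With this change DataCache works in two phases: check_cached_id, record_or_skip_row, and retrieve_cached_message_list only stage IDs in the in-process cached_events/cached_logs dicts, and persist_event/persist_logs copy them into Redis (applying the expiry) once the caller confirms delivery. A compressed sketch of that two-phase shape, assuming any redis.Redis-compatible client; fakeredis is used only to keep the example self-contained, and the class and method names are illustrative:

import fakeredis  # pip install fakeredis; a real redis.Redis works the same way

class TwoPhaseCache:
    # Sketch of the DataCache idea, not the exporter's exact class.
    def __init__(self, redis, expire_seconds: int) -> None:
        self.redis = redis
        self.expire_seconds = expire_seconds
        self.cached_logs = {}  # record_id -> list of staged row ids

    def stage_row(self, record_id: str, row_id: str) -> None:
        # Phase 1: cheap local bookkeeping while rows are processed.
        self.cached_logs.setdefault(record_id, ['init']).append(row_id)

    def persist_logs(self, record_id: str) -> bool:
        # Phase 2: flush to Redis only after the upload succeeded.
        rows = self.cached_logs.pop(record_id, None)
        if rows is None:
            return False
        for row_id in rows:
            self.redis.rpush(record_id, row_id)
        self.redis.expire(record_id, self.expire_seconds)
        return True

cache = TwoPhaseCache(fakeredis.FakeRedis(), expire_seconds=7 * 24 * 3600)
cache.stage_row('LOGFILE_1', 'REQ_123')
assert cache.persist_logs('LOGFILE_1')      # staged rows are now durable in Redis
assert not cache.persist_logs('LOGFILE_1')  # nothing staged, nothing persisted

On failure, persist_logs and persist_event are simply never called: the staged dicts vanish with the process, Redis never sees the IDs, and the next run fetches and deduplicates the same records again.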
