diff --git a/README.md b/README.md index 53825e2..b12d3b5 100644 --- a/README.md +++ b/README.md @@ -184,7 +184,7 @@ Queries for `EventLogFile` requiere the following fields to be present: - `LogDate` - `LogFile` -For queries of other events only the `Id` field is requiered. +For queries of other event types there is no minimum set of attributes requiered, but they will only be cached (when `cache_enabled` is `True`) if `Id` is present. ## Usage diff --git a/config.yml.sample b/config.yml.sample index 2f104d2..258a7eb 100644 --- a/config.yml.sample +++ b/config.yml.sample @@ -38,6 +38,15 @@ queries: [ rename_timestamp: actualTimestamp, api_ver: "58.0" }, + { + query: "SELECT EventName, EventType, UsageType, Client, Value, StartDate, EndDate FROM PlatformEventUsageMetric WHERE TimeSegment='FifteenMinutes' AND StartDate >= {start_date} AND EndDate <= {now}", + env: { + now: "now()", + start_date: "now(timedelta(minutes=-60))" + }, + api_ver: "58.0", + timestamp_attr: StartDate, + } ] newrelic: data_format: "events" diff --git a/src/newrelic_logging/auth_env.py b/src/newrelic_logging/auth_env.py index 84258d9..bee6c57 100644 --- a/src/newrelic_logging/auth_env.py +++ b/src/newrelic_logging/auth_env.py @@ -57,4 +57,21 @@ def get(self, var_name, default): # Can raise exception return os.environ[var_name] else: - return os.environ.get(var_name, default) \ No newline at end of file + return os.environ.get(var_name, default) + +class Auth: + access_token = None + instance_url = None + # Never used, maybe in the future + token_type = None + + def __init__(self, access_token: str, instance_url: str, token_type: str) -> None: + self.access_token = access_token + self.instance_url = instance_url + self.token_type = token_type + + def get_access_token(self) -> str: + return self.access_token + + def get_instance_url(self) -> str: + return self.instance_url \ No newline at end of file diff --git a/src/newrelic_logging/integration.py b/src/newrelic_logging/integration.py index 9b0dd6e..93e6e29 100644 --- a/src/newrelic_logging/integration.py +++ b/src/newrelic_logging/integration.py @@ -115,11 +115,11 @@ def cache_processed_data(log_file_id, log_entries, data_cache: DataCache): # Events for log in log_entries: log_id = log.get('attributes', {}).get('Id', '') - print(f"---> ID OF EVENT = {log_id}") + #print(f"---> ID OF EVENT = {log_id}") data_cache.persist_event(log_id) else: # Logs - print(f"---> ID OF LOG FILE = {log_file_id}") + #print(f"---> ID OF LOG FILE = {log_file_id}") data_cache.persist_logs(log_file_id) @staticmethod diff --git a/src/newrelic_logging/query.py b/src/newrelic_logging/query.py new file mode 100644 index 0000000..d3799f1 --- /dev/null +++ b/src/newrelic_logging/query.py @@ -0,0 +1,22 @@ +class Query: + query = None + env = None + + def __init__(self, query) -> None: + if type(query) == dict: + self.query = query.get("query", "") + query.pop('query', None) + self.env = query + elif type(query) == str: + self.query = query + self.env = {} + + def get_query(self) -> str: + return self.query + + def set_query(self, query: str) -> None: + self.query = query + + def get_env(self) -> dict: + return self.env + \ No newline at end of file diff --git a/src/newrelic_logging/query_env.py b/src/newrelic_logging/query_env.py new file mode 100644 index 0000000..c5fb13d --- /dev/null +++ b/src/newrelic_logging/query_env.py @@ -0,0 +1,31 @@ +from .query import Query +import string + +def sandbox(code): + __import__ = None + __loader__ = None + __build_class__ = None + exec = None + + from datetime import datetime, timedelta + + def sf_time(t: datetime): + return t.isoformat(timespec='milliseconds') + "Z" + + def now(delta: timedelta = None): + if delta: + return sf_time(datetime.utcnow() + delta) + else: + return sf_time(datetime.utcnow()) + + try: + return eval(code) + except Exception as e: + return e + +def substitute(args: dict, query_template: str, env: dict) -> str: + for key, command in env.items(): + args[key] = sandbox(command) + for key, val in args.items(): + query_template = query_template.replace('{' + key + '}', val) + return query_template diff --git a/src/newrelic_logging/salesforce.py b/src/newrelic_logging/salesforce.py index 3a01720..827ae6d 100644 --- a/src/newrelic_logging/salesforce.py +++ b/src/newrelic_logging/salesforce.py @@ -9,6 +9,9 @@ import redis from requests import RequestException import copy +from .query_env import substitute +from .auth_env import Auth +from .query import Query class LoginException(Exception): pass @@ -35,45 +38,6 @@ def base64_url_encode(json_obj): encoded_str = str(encoded_bytes, 'utf-8') return encoded_str -class Query: - query = None - env = None - - def __init__(self, query) -> None: - if type(query) == dict: - self.query = query.get("query", "") - query.pop('query', None) - self.env = query - elif type(query) == str: - self.query = query - self.env = {} - - def get_query(self) -> str: - return self.query - - def set_query(self, query: str) -> None: - self.query = query - - def get_env(self) -> dict: - return self.env - -class Auth: - access_token = None - instance_url = None - # Never used, maybe in the future - token_type = None - - def __init__(self, access_token: str, instance_url: str, token_type: str) -> None: - self.access_token = access_token - self.instance_url = instance_url - self.token_type = token_type - - def get_access_token(self) -> str: - return self.access_token - - def get_instance_url(self) -> str: - return self.instance_url - # Local cache, to store data before sending it to Redis. class DataCache: redis = None @@ -371,10 +335,16 @@ def make_single_query(self, query_obj: Query) -> Query: to_timestamp = (datetime.utcnow() - timedelta(minutes=self.time_lag_minutes)).isoformat( timespec='milliseconds') + "Z" from_timestamp = self.last_to_timestamp - query_template = query_obj.get_query() - query = query_template.format(to_timestamp=to_timestamp, from_timestamp=from_timestamp, - log_interval_type=self.generation_interval) + + env = copy.deepcopy(query_obj.get_env().get('env', {})) + args = { + 'to_timestamp': to_timestamp, + 'from_timestamp': from_timestamp, + 'log_interval_type': self.generation_interval + } + query = substitute(args, query_obj.get_query(), env) query = query.replace(' ', '+') + query_obj.set_query(query) return query_obj @@ -495,11 +465,11 @@ def build_log_from_event(self, records, query: Query): def pack_event_into_log(self, rows, query: Query): log_entries = [] for row in rows: - record_id = row['Id'] - - if self.data_cache.check_cached_id(record_id): - # Record cached, skip it - continue + if 'Id' in row: + record_id = row['Id'] + if self.data_cache.check_cached_id(record_id): + # Record cached, skip it + continue timestamp_attr = query.get_env().get("timestamp_attr", "CreatedDate") if timestamp_attr in row: