Implement generic env vars for queries.
asllop committed Jan 16, 2024
1 parent 3251d0d commit 0befcbf
Showing 7 changed files with 100 additions and 51 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -184,7 +184,7 @@ Queries for `EventLogFile` require the following fields to be present:
- `LogDate`
- `LogFile`

For queries of other events only the `Id` field is required.
For queries of other event types there is no minimum set of required attributes, but records will only be cached (when `cache_enabled` is `True`) if `Id` is present.

## Usage

9 changes: 9 additions & 0 deletions config.yml.sample
@@ -38,6 +38,15 @@ queries: [
rename_timestamp: actualTimestamp,
api_ver: "58.0"
},
{
query: "SELECT EventName, EventType, UsageType, Client, Value, StartDate, EndDate FROM PlatformEventUsageMetric WHERE TimeSegment='FifteenMinutes' AND StartDate >= {start_date} AND EndDate <= {now}",
env: {
now: "now()",
start_date: "now(timedelta(minutes=-60))"
},
api_ver: "58.0",
timestamp_attr: StartDate,
}
]
newrelic:
data_format: "events"
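
For reference, a minimal sketch (not part of this commit) of what the two `env` entries in the sample above resolve to: each value is a Python expression evaluated with `now()` and `timedelta` in scope, and the result replaces the matching `{placeholder}` in the query string. Timestamps below are illustrative.

```python
from datetime import datetime, timedelta

# Same formatting the integration uses: UTC ISO-8601 with millisecond precision.
def sf_time(t: datetime) -> str:
    return t.isoformat(timespec='milliseconds') + "Z"

# now        <- "now()"
# start_date <- "now(timedelta(minutes=-60))"
now = sf_time(datetime.utcnow())
start_date = sf_time(datetime.utcnow() + timedelta(minutes=-60))

query = (
    "SELECT EventName, EventType, UsageType, Client, Value, StartDate, EndDate "
    "FROM PlatformEventUsageMetric WHERE TimeSegment='FifteenMinutes' "
    "AND StartDate >= {start_date} AND EndDate <= {now}"
).format(start_date=start_date, now=now)
# e.g. "... StartDate >= 2024-01-16T11:00:00.000Z AND EndDate <= 2024-01-16T12:00:00.000Z"
```
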
19 changes: 18 additions & 1 deletion src/newrelic_logging/auth_env.py
@@ -57,4 +57,21 @@ def get(self, var_name, default):
# Can raise exception
return os.environ[var_name]
else:
return os.environ.get(var_name, default)
return os.environ.get(var_name, default)

class Auth:
access_token = None
instance_url = None
# Never used, maybe in the future
token_type = None

def __init__(self, access_token: str, instance_url: str, token_type: str) -> None:
self.access_token = access_token
self.instance_url = instance_url
self.token_type = token_type

def get_access_token(self) -> str:
return self.access_token

def get_instance_url(self) -> str:
return self.instance_url
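
A minimal usage sketch for the relocated `Auth` holder (token and URL are placeholders, not from the commit):

```python
auth = Auth(
    access_token="00Dxx0000000000!AQEA_example_token",  # placeholder value
    instance_url="https://example.my.salesforce.com",   # placeholder value
    token_type="Bearer",
)
auth.get_access_token()   # -> the token passed in
auth.get_instance_url()   # -> "https://example.my.salesforce.com"
```
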
4 changes: 2 additions & 2 deletions src/newrelic_logging/integration.py
@@ -115,11 +115,11 @@ def cache_processed_data(log_file_id, log_entries, data_cache: DataCache):
# Events
for log in log_entries:
log_id = log.get('attributes', {}).get('Id', '')
print(f"---> ID OF EVENT = {log_id}")
#print(f"---> ID OF EVENT = {log_id}")
data_cache.persist_event(log_id)
else:
# Logs
print(f"---> ID OF LOG FILE = {log_file_id}")
#print(f"---> ID OF LOG FILE = {log_file_id}")
data_cache.persist_logs(log_file_id)

@staticmethod
22 changes: 22 additions & 0 deletions src/newrelic_logging/query.py
@@ -0,0 +1,22 @@
class Query:
query = None
env = None

def __init__(self, query) -> None:
if type(query) == dict:
self.query = query.get("query", "")
query.pop('query', None)
self.env = query
elif type(query) == str:
self.query = query
self.env = {}

def get_query(self) -> str:
return self.query

def set_query(self, query: str) -> None:
self.query = query

def get_env(self) -> dict:
return self.env
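
A usage sketch for the new `Query` wrapper (illustrative values, assuming the package imports as `newrelic_logging`):

```python
from newrelic_logging.query import Query

# Dict form: the "query" key is popped and everything else becomes the env.
q = Query({
    'query': "SELECT Id FROM Account WHERE CreatedDate >= {from_timestamp}",
    'api_ver': "58.0",
    'timestamp_attr': "CreatedDate",
})
q.get_query()  # "SELECT Id FROM Account WHERE CreatedDate >= {from_timestamp}"
q.get_env()    # {'api_ver': '58.0', 'timestamp_attr': 'CreatedDate'}

# String form: the env defaults to an empty dict.
Query("SELECT Id FROM Account").get_env()  # {}
```
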

31 changes: 31 additions & 0 deletions src/newrelic_logging/query_env.py
@@ -0,0 +1,31 @@
from .query import Query
import string

def sandbox(code):
__import__ = None
__loader__ = None
__build_class__ = None
exec = None

from datetime import datetime, timedelta

def sf_time(t: datetime):
return t.isoformat(timespec='milliseconds') + "Z"

def now(delta: timedelta = None):
if delta:
return sf_time(datetime.utcnow() + delta)
else:
return sf_time(datetime.utcnow())

try:
return eval(code)
except Exception as e:
return e

def substitute(args: dict, query_template: str, env: dict) -> str:
for key, command in env.items():
args[key] = sandbox(command)
for key, val in args.items():
query_template = query_template.replace('{' + key + '}', val)
return query_template
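
A sketch of how `substitute` combines precomputed args with a sandboxed `env` (timestamps and the package import path `newrelic_logging` are assumptions): values in `args` are used verbatim, values in `env` are evaluated by `sandbox()` first, then both replace their `{placeholders}`.

```python
from newrelic_logging.query_env import substitute

args = {'from_timestamp': "2024-01-16T11:00:00.000Z"}  # used as-is
env = {'now': "now()"}                                 # evaluated in the sandbox

query = substitute(
    args,
    "SELECT Id FROM Account WHERE CreatedDate >= {from_timestamp} AND CreatedDate <= {now}",
    env,
)
# e.g. "SELECT Id FROM Account WHERE CreatedDate >= 2024-01-16T11:00:00.000Z "
#      "AND CreatedDate <= 2024-01-16T12:00:00.000Z"
```
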
64 changes: 17 additions & 47 deletions src/newrelic_logging/salesforce.py
@@ -9,6 +9,9 @@
import redis
from requests import RequestException
import copy
from .query_env import substitute
from .auth_env import Auth
from .query import Query

class LoginException(Exception):
pass
@@ -35,45 +38,6 @@ def base64_url_encode(json_obj):
encoded_str = str(encoded_bytes, 'utf-8')
return encoded_str

class Query:
query = None
env = None

def __init__(self, query) -> None:
if type(query) == dict:
self.query = query.get("query", "")
query.pop('query', None)
self.env = query
elif type(query) == str:
self.query = query
self.env = {}

def get_query(self) -> str:
return self.query

def set_query(self, query: str) -> None:
self.query = query

def get_env(self) -> dict:
return self.env

class Auth:
access_token = None
instance_url = None
# Never used, maybe in the future
token_type = None

def __init__(self, access_token: str, instance_url: str, token_type: str) -> None:
self.access_token = access_token
self.instance_url = instance_url
self.token_type = token_type

def get_access_token(self) -> str:
return self.access_token

def get_instance_url(self) -> str:
return self.instance_url

# Local cache, to store data before sending it to Redis.
class DataCache:
redis = None
@@ -371,10 +335,16 @@ def make_single_query(self, query_obj: Query) -> Query:
to_timestamp = (datetime.utcnow() - timedelta(minutes=self.time_lag_minutes)).isoformat(
timespec='milliseconds') + "Z"
from_timestamp = self.last_to_timestamp
query_template = query_obj.get_query()
query = query_template.format(to_timestamp=to_timestamp, from_timestamp=from_timestamp,
log_interval_type=self.generation_interval)

env = copy.deepcopy(query_obj.get_env().get('env', {}))
args = {
'to_timestamp': to_timestamp,
'from_timestamp': from_timestamp,
'log_interval_type': self.generation_interval
}
query = substitute(args, query_obj.get_query(), env)
query = query.replace(' ', '+')

query_obj.set_query(query)
return query_obj
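
As a rough illustration (not part of the commit) of what the reworked `make_single_query` now does: the three built-in args are merged with the query's own `env` before substitution, and the result is URL-encoded by replacing spaces with `+`. The template and values below are illustrative.

```python
args = {
    'to_timestamp': "2024-01-16T11:55:00.000Z",    # utcnow() minus time_lag_minutes
    'from_timestamp': "2024-01-16T11:40:00.000Z",  # self.last_to_timestamp
    'log_interval_type': "Hourly",                 # self.generation_interval
}
template = ("SELECT Id, CreatedDate, LogDate, LogFile FROM EventLogFile "
            "WHERE CreatedDate >= {from_timestamp} AND CreatedDate < {to_timestamp} "
            "AND Interval = '{log_interval_type}'")

query = substitute(args, template, {})  # a per-query env could add more placeholders
query = query.replace(' ', '+')
# "SELECT+Id,+CreatedDate,+LogDate,+LogFile+FROM+EventLogFile+WHERE+CreatedDate+>=+..."
```
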

@@ -495,11 +465,11 @@ def build_log_from_event(self, records, query: Query):
def pack_event_into_log(self, rows, query: Query):
log_entries = []
for row in rows:
record_id = row['Id']

if self.data_cache.check_cached_id(record_id):
# Record cached, skip it
continue
if 'Id' in row:
record_id = row['Id']
if self.data_cache.check_cached_id(record_id):
# Record cached, skip it
continue

timestamp_attr = query.get_env().get("timestamp_attr", "CreatedDate")
if timestamp_attr in row:
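
A sketch of the effect of the new guard in `pack_event_into_log` (rows are illustrative, not from the commit): rows that carry an `Id` are still checked against the cache and skipped if already seen, while rows without an `Id` (such as `PlatformEventUsageMetric` results) no longer cause a lookup on a missing key; the timestamp is read from the query's `timestamp_attr` when set, falling back to `CreatedDate`.

```python
event_log_row = {'Id': '0AT000000000001AAA', 'CreatedDate': '2024-01-16T12:00:00.000Z'}
usage_row = {'StartDate': '2024-01-16T11:45:00.000Z', 'Value': 42}   # no Id field

for row in (event_log_row, usage_row):
    if 'Id' in row:
        record_id = row['Id']
        # cache check and skip happen only for rows that actually have an Id
```
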
