Skip to content

Commit

Permalink
Merge pull request #20 from newrelic/feat/redis-config-env
Browse files Browse the repository at this point in the history
feat: support reading redis config from env vars (optionally with prefix)
  • Loading branch information
asllop authored Feb 26, 2024
2 parents dfb2aa1 + f72e41f commit 0c795f7
Show file tree
Hide file tree
Showing 7 changed files with 361 additions and 253 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,7 @@ config.yml
logs/**

#scripts
run.sh
run.sh

#local files
*.local.*
6 changes: 2 additions & 4 deletions src/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from apscheduler.schedulers.background import BlockingScheduler
from pytz import utc
from yaml import Loader, load
from newrelic_logging.env import get_var, var_exists
from newrelic_logging.config import getenv
from newrelic_logging.integration import Integration
from newrelic_logging.telemetry import print_info, print_warn

Expand Down Expand Up @@ -75,10 +75,8 @@ def main():
if not run_as_service:
if 'cron_interval_minutes' in config:
cron_interval = config['cron_interval_minutes']
elif var_exists("CRON_INTERVAL_MINUTES"):
cron_interval = int(get_var("CRON_INTERVAL_MINUTES"))
else:
cron_interval = 60
cron_interval = int(getenv("CRON_INTERVAL_MINUTES", 60))

integration = Integration(config, event_mapping, cron_interval)
integration.run()
Expand Down
157 changes: 157 additions & 0 deletions src/newrelic_logging/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import redis
from datetime import timedelta

from .config import Config
from .telemetry import print_err, print_info


CONFIG_CACHE_ENABLED = 'cache_enabled'
CONFIG_REDIS_HOST = 'redis.host'
CONFIG_REDIS_PORT = 'redis.port'
CONFIG_REDIS_DB_NUMBER = 'redis.db_number'
CONFIG_REDIS_PASSWORD = 'redis.password'
CONFIG_REDIS_USE_SSL = 'redis.ssl'
CONFIG_REDIS_EXPIRE_DAYS = 'redis.expire_days'
DEFAULT_CACHE_ENABLED = False
DEFAULT_REDIS_HOST = 'localhost'
DEFAULT_REDIS_PORT = 6379
DEFAULT_REDIS_DB_NUMBER = 0
DEFAULT_REDIS_EXPIRE_DAYS = 2
DEFAULT_REDIS_SSL = False


# Local cache, to store data before sending it to Redis.
class DataCache:
redis = None
redis_expire = None
cached_events = {}
cached_logs = {}

def __init__(self, redis, redis_expire) -> None:
self.redis = redis
self.redis_expire = redis_expire

def set_redis_expire(self, key):
try:
self.redis.expire(key, timedelta(days=self.redis_expire))
except Exception as e:
print_err(f"Failed setting expire time for key {key}: {e}")
exit(1)

def persist_logs(self, record_id: str) -> bool:
if record_id in self.cached_logs:
for row_id in self.cached_logs[record_id]:
try:
self.redis.rpush(record_id, row_id)
except Exception as e:
print_err(f"Failed pushing record {record_id}: {e}")
exit(1)
# Set expire date for the whole list only once, when it find the first entry ('init')
if row_id == 'init':
self.set_redis_expire(record_id)
del self.cached_logs[record_id]
return True
else:
return False

def persist_event(self, record_id: str) -> bool:
if record_id in self.cached_events:
try:
self.redis.set(record_id, '')
except Exception as e:
print_err(f"Failed setting record {record_id}: {e}")
exit(1)
self.set_redis_expire(record_id)
del self.cached_events[record_id]
return True
else:
return False

def can_skip_downloading_record(self, record_id: str) -> bool:
try:
does_exist = self.redis.exists(record_id)
except Exception as e:
print_err(f"Failed checking record {record_id}: {e}")
exit(1)
if does_exist:
try:
return self.redis.llen(record_id) > 1
except Exception as e:
print_err(f"Failed checking len for record {record_id}: {e}")
exit(1)

return False

def retrieve_cached_message_list(self, record_id: str):
try:
cache_key_exists = self.redis.exists(record_id)
except Exception as e:
print_err(f"Failed checking record {record_id}: {e}")
exit(1)

if cache_key_exists:
try:
cached_messages = self.redis.lrange(record_id, 0, -1)
except Exception as e:
print_err(f"Failed getting list range for record {record_id}: {e}")
exit(1)
return cached_messages
else:
self.cached_logs[record_id] = ['init']

return None

# Cache event
def check_cached_id(self, record_id: str):
try:
does_exist = self.redis.exists(record_id)
except Exception as e:
print_err(f"Failed checking record {record_id}: {e}")
exit(1)

if does_exist:
return True
else:
self.cached_events[record_id] = ''
return False

# Cache log
def record_or_skip_row(self, record_id: str, row: dict, cached_messages: dict) -> bool:
row_id = row["REQUEST_ID"]

if cached_messages is not None:
row_id_b = row_id.encode('utf-8')
if row_id_b in cached_messages:
return True
self.cached_logs[record_id].append(row_id)
else:
self.cached_logs[record_id].append(row_id)

return False


def make_cache(config: Config):
if config.get_bool(CONFIG_CACHE_ENABLED, DEFAULT_CACHE_ENABLED):
host = config.get(CONFIG_REDIS_HOST, DEFAULT_REDIS_HOST)
port = config.get_int(CONFIG_REDIS_PORT, DEFAULT_REDIS_PORT)
db = config.get_int(CONFIG_REDIS_DB_NUMBER, DEFAULT_REDIS_DB_NUMBER)
password = config.get(CONFIG_REDIS_PASSWORD)
ssl = config.get_bool(CONFIG_REDIS_USE_SSL, DEFAULT_REDIS_SSL)
expire_days = config.get_int(CONFIG_REDIS_EXPIRE_DAYS)
password_display = "XXXXXX" if password != None else None

print_info(
f'cache enabled, connecting to redis instance {host}:{port}:{db}, ssl={ssl}, password={password_display}'
)

return DataCache(redis.Redis(
host=host,
port=port,
db=db,
password=password,
ssl=ssl
), expire_days)

print_info('cache disabled')

return None
90 changes: 90 additions & 0 deletions src/newrelic_logging/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import os
import re
from types import SimpleNamespace
from typing import Any


BOOL_TRUE_VALS = ['true', '1', 'on', 'yes']
NOT_FOUND = SimpleNamespace()


def _get_nested_helper(val: Any, arr: list[str] = [], index: int = 0) -> Any:
if index == len(arr):
return NOT_FOUND
elif type(val) is dict:
key = arr[index]
if index == len(arr) - 1:
return val[key] if key in val else NOT_FOUND
return _get_nested_helper(val[key], arr, index + 1) if key in val else NOT_FOUND
elif type(val) is list:
key = arr[index]
if type(key) is int and key >= 0:
if index == len(arr) - 1:
return val[key] if key < len(val) else NOT_FOUND
return _get_nested_helper(val[key], arr, index + 1) if key < len(val) else NOT_FOUND

return NOT_FOUND


def get_nested(d: dict, path: str) -> Any:
return _get_nested_helper(d, path.split('.'))


def getenv(var_name, default = None, prefix = ''):
return os.environ.get(prefix + var_name, default)


def tobool(s):
if s == None:
return False
elif type(s) == bool:
return s
elif type(s) == str:
if s.lower() in BOOL_TRUE_VALS:
return True
return False

return bool(s)


class Config:
def __init__(self, config: dict, prefix: str):
self.config = config
self.prefix = prefix

def __getitem__(self, key):
return self.config[key]

def __setitem__(self, key, value):
self.config[key] = value

def __len__(self):
return len(self.config)

def __contains__(self, key):
return key in self.config

def set_prefix(self, prefix: str) -> None:
self.prefix = prefix

def getenv(self, env_var_name: str, default = None) -> str:
return getenv(env_var_name, default, self.prefix)

def get(self, key: str, default = None) -> Any:
val = get_nested(self.config, key)
if not val == NOT_FOUND:
return val

env_var_name = re.sub(r'[^a-zA-Z0-9_]', '_', key.upper())
val = self.getenv(env_var_name, default)
if not val is False and not val is None:
return val

return default

def get_int(self, key: str, default = None) -> int:
val = self.get(key, default)
return int(val) if val else val

def get_bool(self, key: str, default = None) -> bool:
return tobool(self.get(key, default))
Loading

0 comments on commit 0c795f7

Please sign in to comment.