Threaded log sender #106

Open
wants to merge 1 commit into master
5 changes: 3 additions & 2 deletions journalpump/senders/aws_cloudwatch.py
@@ -1,4 +1,5 @@
from .base import LogSender, SenderInitializationError
from .base import ThreadedLogSender, SenderInitializationError


import boto3
import botocore
@@ -8,7 +9,7 @@
MAX_INIT_TRIES = 3


class AWSCloudWatchSender(LogSender):
class AWSCloudWatchSender(ThreadedLogSender):
def __init__(self, *, config, aws_cloudwatch_logs=None, **kwargs):
super().__init__(config=config, max_send_interval=config.get("max_send_interval", 0.3), **kwargs)
self._logs = aws_cloudwatch_logs
100 changes: 64 additions & 36 deletions journalpump/senders/base.py
@@ -67,7 +67,7 @@ def add_item(self, *, item, cursor):
self.total_size += len(item)


class LogSender(Thread, Tagged):
class LogSender(Tagged):
def __init__(
self,
*,
@@ -81,9 +81,9 @@ def __init__(
tags=None,
msg_buffer_max_length=50000
):
Thread.__init__(self)
Tagged.__init__(self, tags, sender=name)
self.log = logging.getLogger("LogSender:{}".format(reader.name))
self._wait_for = 0.1
self.name = name
self.stats = stats
self.config = config
@@ -106,12 +106,14 @@ def __init__(
self._backoff_attempt = 0
self.log.info("Initialized %s", self.__class__.__name__)

def _backoff(self, *, base=0.5, cap=1800.0):
def _get_backoff_secs(self, *, base=0.5, cap=1000.0):
self._backoff_attempt += 1
t = min(cap, base * 2 ** self._backoff_attempt) / 2
t = random.random() * t + t
self.log.info("Sleeping for %.0f seconds", t)
time.sleep(t)
return t

def _backoff(self, *, base=0.5, cap=1800.0):
raise NotImplementedError()

def refresh_stats(self):
tags = self.make_tags()
@@ -192,48 +194,74 @@ def maintenance_operations(self):
# This can be overridden in the classes that inherit this
pass

def handle_maintenance_operations(self):
try:
# Don't run maintenance operations again immediately if it just failed
if not self.last_maintenance_fail or time.monotonic() - self.last_maintenance_fail > 60:
self.maintenance_operations()
except Exception as ex: # pylint: disable=broad-except
self.log.error("Maintenance operation failed: %r", ex)
self.stats.unexpected_exception(ex=ex, where="maintenance_operation")
self.last_maintenance_fail = time.monotonic()

def should_try_sending_messages(self):
return len(self.msg_buffer) > 1000 or \
time.monotonic() - self.last_send_time > self.max_send_interval

def get_message_bodies_and_cursor(self):
messages = self.msg_buffer.get_items()
ret = []
while messages:
batch_size = len(messages[0][0]) + self.batch_message_overhead
index = 1
while index < len(messages):
item_size = len(messages[index][0]) + self.batch_message_overhead
if batch_size + item_size >= self.max_batch_size:
break
batch_size += item_size
index += 1

batch = messages[:index]
message_bodies = [m[0] for m in batch]
ret.append((message_bodies, batch[-1][1]))
del messages[:index]
return ret


class ThreadedLogSender(Thread, LogSender):
def __init__(self, **kw):
Thread.__init__(self)
LogSender.__init__(self, **kw)

def _backoff(self, *, base=0.5, cap=1800.0):
t = self._get_backoff_secs(base=base, cap=cap)
self.log.info("Sleeping for %.0f seconds", t)
time.sleep(t)

def run(self):
while self.running:
try:
# Don't run maintenance operations again immediately if it just failed
if not self.last_maintenance_fail or time.monotonic() - self.last_maintenance_fail > 60:
self.maintenance_operations()
except Exception as ex: # pylint: disable=broad-except
self.log.error("Maintenance operation failed: %r", ex)
self.stats.unexpected_exception(ex=ex, where="maintenance_operation")
self.last_maintenance_fail = time.monotonic()
if len(self.msg_buffer) > 1000 or \
time.monotonic() - self.last_send_time > self.max_send_interval:
self.handle_maintenance_operations()
if self.should_try_sending_messages():
self.get_and_send_messages()
else:
time.sleep(0.1)
time.sleep(self._wait_for)
self.log.info("Stopping")

def get_and_send_messages(self):
batches = self.get_message_bodies_and_cursor()
msg_count = sum(len(batch[0]) for batch in batches)
self.log.debug("Got %d items from msg_buffer", msg_count)
start_time = time.monotonic()
msg_count = None
try:
messages = self.msg_buffer.get_items()
msg_count = len(messages)
self.log.debug("Got %d items from msg_buffer", msg_count)

while self.running and messages:
batch_size = len(messages[0][0]) + self.batch_message_overhead
index = 1
while index < len(messages):
item_size = len(messages[index][0]) + self.batch_message_overhead
if batch_size + item_size >= self.max_batch_size:
break
batch_size += item_size
index += 1

messages_batch = messages[:index]
message_bodies = [m[0] for m in messages_batch]
if self.send_messages(messages=message_bodies, cursor=messages_batch[-1][1]):
messages = messages[index:]

# pop to free up memory as soon as the send was successful
while batches:
batch = batches.pop(0)
# keep retrying until stopped; backoff is part of the sending mechanism
while self.running and not self.send_messages(messages=batch[0], cursor=batch[1]):
pass
self.log.debug("Sending %d msgs, took %.4fs", msg_count, time.monotonic() - start_time)
self.last_send_time = time.monotonic()
except Exception: # pylint: disable=broad-except
# there is already a broad except handler in send_messages, so why this ?
self.log.exception("Problem sending %r messages", msg_count)
self._backoff()
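
For orientation (a minimal sketch, not taken from this PR): after the split, a concrete sender subclasses ThreadedLogSender and implements send_messages(); the inherited run() loop handles the maintenance checks, batching via get_message_bodies_and_cursor(), retries and backoff. The PrintSender class below and its config default are made up for illustration.

from journalpump.senders.base import ThreadedLogSender


class PrintSender(ThreadedLogSender):  # hypothetical sender, for illustration only
    def __init__(self, *, config, **kwargs):
        super().__init__(config=config, max_send_interval=config.get("max_send_interval", 0.3), **kwargs)

    def send_messages(self, *, messages, cursor):
        # get_and_send_messages() keeps retrying the same batch while this
        # returns a falsy value, so only return True once delivery succeeded
        for message in messages:
            print(message)
        return True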
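
Also for context (not part of the diff): the new _get_backoff_secs() computes a capped exponential delay with jitter. The snippet below reproduces that arithmetic with its defaults, base=0.5 and cap=1000.0; note that ThreadedLogSender._backoff() actually passes cap=1800.0, which raises the ceiling accordingly.

import random


def backoff_secs(attempt, *, base=0.5, cap=1000.0):
    # mirrors LogSender._get_backoff_secs() above
    t = min(cap, base * 2 ** attempt) / 2
    return random.random() * t + t  # uniform in [t, 2 * t)


# attempt 1 sleeps 0.5-1 s, attempt 2 sleeps 1-2 s, attempt 3 sleeps 2-4 s, ...
# from attempt 11 onwards the delay is capped to the 500-1000 s range
for attempt in (1, 2, 3, 11):
    print(attempt, round(backoff_secs(attempt), 2))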
4 changes: 2 additions & 2 deletions journalpump/senders/elasticsearch.py
@@ -1,4 +1,4 @@
from .base import LogSender
from .base import ThreadedLogSender
from io import BytesIO
from journalpump.util import default_json_serialization, get_requests_session

@@ -7,7 +7,7 @@
import time


class ElasticsearchSender(LogSender):
class ElasticsearchSender(ThreadedLogSender):
def __init__(self, *, config, **kwargs):
super().__init__(config=config, max_send_interval=config.get("max_send_interval", 10.0), **kwargs)
self.session_url = self.config.get("elasticsearch_url")
4 changes: 2 additions & 2 deletions journalpump/senders/file.py
@@ -1,7 +1,7 @@
from .base import LogSender
from .base import ThreadedLogSender


class FileSender(LogSender):
class FileSender(ThreadedLogSender):
def __init__(self, *, config, **kwargs):
super().__init__(config=config, max_send_interval=config.get("max_send_interval", 0.3), **kwargs)
self.mark_disconnected()
4 changes: 2 additions & 2 deletions journalpump/senders/google_cloud_logging.py
@@ -1,4 +1,4 @@
from .base import LogSender
from .base import ThreadedLogSender
from googleapiclient.discovery import build
from googleapiclient.errors import Error as GoogleApiClientError
from oauth2client.service_account import ServiceAccountCredentials
@@ -10,7 +10,7 @@
logging.getLogger("googleapiclient.discovery").setLevel(logging.WARNING)


class GoogleCloudLoggingSender(LogSender):
class GoogleCloudLoggingSender(ThreadedLogSender):
_SEVERITY_MAPPING = { # mapping from journald priority to cloud logging severity
7: "DEBUG",
6: "INFO",
4 changes: 2 additions & 2 deletions journalpump/senders/kafka.py
@@ -1,4 +1,4 @@
from .base import LogSender
from .base import ThreadedLogSender
from kafka import errors, KafkaAdminClient, KafkaProducer
from kafka.admin import NewTopic

@@ -23,7 +23,7 @@
logging.getLogger("kafka").setLevel(logging.CRITICAL) # remove client-internal tracebacks from logging output


class KafkaSender(LogSender):
class KafkaSender(ThreadedLogSender):
def __init__(self, *, config, **kwargs):
super().__init__(config=config, max_send_interval=config.get("max_send_interval", 0.3), **kwargs)
self.kafka_producer = None
4 changes: 2 additions & 2 deletions journalpump/senders/logplex.py
@@ -1,11 +1,11 @@
from .base import LogSender
from .base import ThreadedLogSender
from journalpump.util import get_requests_session

import datetime
import json


class LogplexSender(LogSender):
class LogplexSender(ThreadedLogSender):
def __init__(self, *, config, **kwargs):
super().__init__(config=config, max_send_interval=config.get("max_send_interval", 5.0), **kwargs)
self.logplex_input_url = config["logplex_log_input_url"]
4 changes: 2 additions & 2 deletions journalpump/senders/rsyslog.py
@@ -1,4 +1,4 @@
from .base import LogSender
from .base import ThreadedLogSender
from journalpump.rsyslog import SyslogTcpClient

import json
@@ -8,7 +8,7 @@
RSYSLOG_CONN_ERRORS = (socket.timeout, ConnectionRefusedError)


class RsyslogSender(LogSender):
class RsyslogSender(ThreadedLogSender):
def __init__(self, *, config, **kwargs):
super().__init__(config=config, max_send_interval=config.get("max_send_interval", 0.3), **kwargs)
self.rsyslog_client = None
10 changes: 5 additions & 5 deletions journalpump/senders/websocket.py
@@ -1,4 +1,4 @@
from .base import LogSender, MAX_KAFKA_MESSAGE_SIZE
from .base import ThreadedLogSender, MAX_KAFKA_MESSAGE_SIZE
from aiohttp_socks import ProxyConnectionError, ProxyError, ProxyTimeoutError
from aiohttp_socks.utils import Proxy
from concurrent.futures import CancelledError, TimeoutError as ConnectionTimeoutError
@@ -51,9 +51,9 @@ def __init__(
self.stop_event = asyncio.Event()
self.stopped_event = asyncio.Event()
self.running = True
# Send messages as batches, LogSender base class takes care of batch size.
# Send messages as batches, ThreadedLogSender base class takes care of batch size.
# If batching is disabled, do not adjust the max_batch_size member variable,
# so that LogSender will still give us multiple messages in a single send_messages() call.
# so that ThreadedLogSender will still give us multiple messages in a single send_messages() call.
self.batching_enabled = False
if max_batch_size > 0:
self.max_batch_size = max_batch_size
@@ -81,7 +81,7 @@ async def async_send(self, messages):
return False

if self.batching_enabled:
# LogSender has limited the batch size already
# ThreadedLogSender has limited the batch size already
batch = b"\x00".join(messages)
messages = [batch]

@@ -263,7 +263,7 @@ async def comms_channel_loop(self):
self.log.info("Websocket closed")


class WebsocketSender(LogSender):
class WebsocketSender(ThreadedLogSender):
def __init__(self, *, config, **kwargs):
super().__init__(config=config, max_send_interval=config.get("max_send_interval", 1.0), **kwargs)
self.runner = None
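
For illustration (not from the diff): with batching enabled, the WebsocketSender joins the batch that ThreadedLogSender has already limited in size into a single NUL-separated frame, roughly as below; the message bodies are invented.

messages = [b'{"MESSAGE": "first"}', b'{"MESSAGE": "second"}']
batch = b"\x00".join(messages)  # b'{"MESSAGE": "first"}\x00{"MESSAGE": "second"}'
messages = [batch]              # async_send() then sends this single frame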