expunge homegrown logger in favour of python standard logging
nickzoic committed Aug 9, 2024
1 parent 5ad98aa commit 38ab26d
Showing 35 changed files with 289 additions and 469 deletions.
countess/core/cmd.py (19 changes: 11 additions & 8 deletions)
@@ -1,20 +1,23 @@
+import logging
+import logging.handlers
 import multiprocessing
 import sys
 import time
 
 from .config import read_config
-from .logger import ConsoleLogger
+
+logging_queue: multiprocessing.Queue = multiprocessing.Queue()
+logging.getLogger().addHandler(logging.handlers.QueueHandler(logging_queue))
+logging.getLogger().setLevel(logging.INFO)
+
+logging.handlers.QueueListener(logging_queue, logging.StreamHandler())
+
 start_time = time.time()
 
 
 def process_ini(config_filename):
-    logger = ConsoleLogger()
-
-    graph = read_config(
-        config_filename,
-        logger=logger,
-    )
-    graph.run(logger)
+    graph = read_config(config_filename)
+    graph.run()
 
 
 def run(argv):
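The new cmd.py wiring is the standard library's queue-based logging pattern: producers push LogRecords through a QueueHandler, and a single QueueListener drains them onto real handlers. A minimal self-contained sketch of that pattern follows; note that a QueueListener only forwards records once its start() method has been called, so this sketch keeps a reference to it (the names below are illustrative, not part of this commit):

    import logging
    import logging.handlers
    import multiprocessing

    # Producers push LogRecords onto a queue instead of writing directly.
    log_queue: multiprocessing.Queue = multiprocessing.Queue()
    logging.getLogger().addHandler(logging.handlers.QueueHandler(log_queue))
    logging.getLogger().setLevel(logging.INFO)

    # One listener drains the queue onto real handlers.
    listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
    listener.start()  # no records are forwarded until the listener starts

    logging.getLogger("demo").info("hello from the queue")
    listener.stop()  # flush and stop the listener thread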
countess/core/config.py (2 changes: 0 additions & 2 deletions)
@@ -5,7 +5,6 @@
 import sys
 from configparser import ConfigParser
 
-from countess.core.logger import ConsoleLogger, Logger
 from countess.core.pipeline import PipelineGraph, PipelineNode
 from countess.core.plugins import load_plugin
 
@@ -59,7 +58,6 @@ def read_config_dict(name: str, base_dir: str, config_dict: dict) -> PipelineNode
 
 def read_config(
     filename: str,
-    logger: Logger = ConsoleLogger(),
 ) -> PipelineGraph:
     """Reads `filenames` and returns a PipelineGraph"""
 
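Dropping the logger: Logger = ConsoleLogger() default also removes a classic Python pitfall: default argument values are evaluated once, when the def statement runs, so every call to read_config shared one ConsoleLogger built at import time. A sketch of the general behaviour, using a hypothetical Collector class rather than the deleted ConsoleLogger:

    class Collector:
        def __init__(self) -> None:
            self.items: list[str] = []

    def record(item: str, sink: Collector = Collector()) -> None:
        # The default Collector is created once, at definition time,
        # and shared by every call that omits `sink`.
        sink.items.append(item)

    record("a")
    record("b")
    # Both calls appended to the same shared default instance:
    assert record.__defaults__[0].items == ["a", "b"]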
countess/core/logger.py (86 changes: 0 additions & 86 deletions)

This file was deleted.

countess/core/pipeline.py (52 changes: 27 additions & 25 deletions)
@@ -1,14 +1,16 @@
+import logging
 import re
 import time
 from queue import Empty, Queue
 from threading import Thread
 from typing import Any, Optional
 
-from countess.core.logger import Logger
 from countess.core.plugins import BasePlugin, FileInputPlugin, ProcessPlugin, get_plugin_classes
 
 PRERUN_ROW_LIMIT = 100000
 
+logger = logging.getLogger(__name__)
+
 
 class SentinelQueue(Queue):
 
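pipeline.py now follows the usual stdlib idiom: one module-level logger named after the module, fetched with logging.getLogger(__name__). Because loggers form a dot-separated hierarchy, records propagate up to the root logger, so the handler installed in cmd.py applies here without any logger object being passed around. A small sketch of the idiom (function name is illustrative):

    import logging

    # In countess/core/pipeline.py this evaluates to "countess.core.pipeline".
    logger = logging.getLogger(__name__)

    def do_work() -> None:
        # No logger parameter needed: records propagate to root handlers.
        logger.info("starting work")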
@@ -117,23 +119,23 @@ def finish_output(self):
         for queue in self.output_queues:
             queue.finish()
 
-    def run_multithread(self, queue: SentinelQueue, name: str, logger: Logger, row_limit: Optional[int] = None):
+    def run_multithread(self, queue: SentinelQueue, name: str, row_limit: Optional[int] = None):
         assert isinstance(self.plugin, ProcessPlugin)
         for data_in in queue:
             self.counter_in += 1
-            self.plugin.preprocess(data_in, name, logger)
-            self.queue_output(self.plugin.process(data_in, name, logger))
+            self.plugin.preprocess(data_in, name)
+            self.queue_output(self.plugin.process(data_in, name))
 
-    def run_subthread(self, queue: SentinelQueue, name: str, logger: Logger, row_limit: Optional[int] = None):
+    def run_subthread(self, queue: SentinelQueue, name: str, row_limit: Optional[int] = None):
         assert isinstance(self.plugin, ProcessPlugin)
 
         for data_in in queue:
             self.counter_in += 1
-            self.plugin.preprocess(data_in, name, logger)
-            self.queue_output(self.plugin.process(data_in, name, logger))
-            self.queue_output(self.plugin.finished(name, logger))
+            self.plugin.preprocess(data_in, name)
+            self.queue_output(self.plugin.process(data_in, name))
+            self.queue_output(self.plugin.finished(name))
 
-    def run_thread(self, logger: Logger, row_limit: Optional[int] = None):
+    def run_thread(self, row_limit: Optional[int] = None):
         """For each PipelineNode, this is run in its own thread."""
         assert isinstance(self.plugin, (ProcessPlugin, FileInputPlugin))
 
@@ -146,15 +148,15 @@ def run_thread(self, logger: Logger, row_limit: Optional[int] = None):
             only_parent_node = list(self.parent_nodes)[0]
             only_parent_queue = only_parent_node.add_output_queue()
             subthreads = [
-                Thread(target=self.run_multithread, args=(only_parent_queue, only_parent_node.name, logger, row_limit))
+                Thread(target=self.run_multithread, args=(only_parent_queue, only_parent_node.name, row_limit))
                 for _ in range(0, 4)
             ]
             for subthread in subthreads:
                 subthread.start()
             for subthread in subthreads:
                 subthread.join()
 
-            self.queue_output(self.plugin.finished(only_parent_node.name, logger))
+            self.queue_output(self.plugin.finished(only_parent_node.name))
 
         elif len(self.parent_nodes) > 1:
             assert isinstance(self.plugin, ProcessPlugin)
@@ -163,7 +165,7 @@ def run_thread(self, logger: Logger, row_limit: Optional[int] = None):
             subthreads = [
                 Thread(
                     target=self.run_subthread,
-                    args=(parent_node.add_output_queue(), parent_node.name, logger, row_limit),
+                    args=(parent_node.add_output_queue(), parent_node.name, row_limit),
                 )
                 for parent_node in self.parent_nodes
             ]
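Removing the logger argument from the Thread targets is safe because stdlib logging is thread-safe: each handler serializes emit() behind an internal lock, so many worker threads can share the one module-level logger. A minimal sketch (worker and the format string are illustrative):

    import logging
    import threading

    logging.basicConfig(level=logging.INFO,
                        format="%(threadName)s %(name)s: %(message)s")
    logger = logging.getLogger(__name__)

    def worker(n: int) -> None:
        logger.info("processing chunk %d", n)  # safe from any thread

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()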
@@ -172,38 +174,38 @@ def run_thread(self, logger: Logger, row_limit: Optional[int] = None):
         for subthread in subthreads:
             subthread.join()
 
-        self.queue_output(self.plugin.finalize(logger))
+        self.queue_output(self.plugin.finalize())
         self.finish_output()
 
-    def load_config(self, logger: Logger):
+    def load_config(self):
         assert isinstance(self.plugin, BasePlugin)
         if self.config:
             for key, val, base_dir in self.config:
                 try:
                     self.plugin.set_parameter(key, val, base_dir)
                 except (KeyError, ValueError):
-                    logger.warning(f"Parameter {key}={val} Not Found")
+                    logger.warning("Parameter %s=%s Not Found", key, val)
             self.config = None
 
-    def prerun(self, logger: Logger, row_limit=PRERUN_ROW_LIMIT):
+    def prerun(self, row_limit=PRERUN_ROW_LIMIT):
         if not self.plugin:
             return
-        self.load_config(logger)
+        self.load_config()
         if self.is_dirty:
             assert isinstance(self.plugin, (ProcessPlugin, FileInputPlugin))
             self.result = []
             self.plugin.prepare([node.name for node in self.parent_nodes], row_limit)
 
             for parent_node in self.parent_nodes:
                 assert isinstance(self.plugin, ProcessPlugin)
-                parent_node.prerun(logger, row_limit)
+                parent_node.prerun(row_limit)
                 if parent_node.result:
                     for data_in in parent_node.result:
-                        self.plugin.preprocess(data_in, parent_node.name, logger)
+                        self.plugin.preprocess(data_in, parent_node.name)
                     for data_in in parent_node.result:
-                        self.result += list(self.plugin.process(data_in, parent_node.name, logger))
-                self.result += list(self.plugin.finished(parent_node.name, logger))
-            self.result += list(self.plugin.finalize(logger))
+                        self.result += list(self.plugin.process(data_in, parent_node.name))
+                self.result += list(self.plugin.finished(parent_node.name))
+            self.result += list(self.plugin.finalize())
             self.is_dirty = False
 
     def mark_dirty(self):
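Note that the warning call also switches from an f-string to logging's deferred %-style arguments: the message is only interpolated if a handler actually emits the record, and filters see the raw template plus arguments rather than a pre-built string. Compared side by side:

    import logging

    logger = logging.getLogger(__name__)
    key, val = "foo", 1

    # Eager: the f-string is formatted even if WARNING is filtered out.
    logger.warning(f"Parameter {key}={val} Not Found")

    # Deferred: interpolation happens only when the record is emitted.
    logger.warning("Parameter %s=%s Not Found", key, val)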
@@ -294,11 +296,11 @@ def traverse_nodes_backwards(self):
                    yield node
                    found_nodes.add(node)
 
-    def run(self, logger):
+    def run(self):
         threads_and_nodes = []
         for node in self.traverse_nodes_backwards():
-            node.load_config(logger)
-            threads_and_nodes.append((Thread(target=node.run_thread, args=(logger,)), node))
+            node.load_config()
+            threads_and_nodes.append((Thread(target=node.run_thread), node))
 
         for thread, _ in threads_and_nodes:
             thread.start()
