diff --git a/db/oracle.sql b/db/oracle.sql index e3fbf0c..de91400 100644 --- a/db/oracle.sql +++ b/db/oracle.sql @@ -55,19 +55,19 @@ create table pd_run_history ( start_date date, end_date date, status varchar2(1 char), - server varchar2(25 char), - user varchar2(25 char), + server_name varchar2(25 char), + user_name varchar2(25 char), pid number(*, 0), - trigger_id number(*, 0), + error_list clob, rerun_id number(*, 0), rerun_seqno number(*, 0), rerun_times number(*, 0), rerun_now varchar2(1 char), rerun_done varchar2(1 char), + trigger_id number(*, 0), + data_dump blob, file_log varchar2(25 char), text_log clob, - text_error clob, - data_dump blob, updated date, constraint pd_run_history_pk primary key (id) ); @@ -95,20 +95,21 @@ end; / create table pd_task_history ( - id number(*, 0), - job_id number(*, 0), - run_id number(*, 0), - task_name varchar2(30 char), - task_date date, - start_date date, - end_date date, - status varchar2(1 char), - records_read number(*, 0), - records_written number(*, 0), - records_error number(*, 0), - result_value number(*, 0), - result_long varchar2(4000 char), - updated date, + id number(*, 0), + job_id number(*, 0), + run_id number(*, 0), + task_name varchar2(30 char), + task_date date, + start_date date, + end_date date, + status varchar2(1 char), + records_read number(*, 0), + records_written number(*, 0), + records_processed number(*, 0), + records_error number(*, 0), + result_value number(*, 0), + result_long varchar2(4000 char), + updated date, constraint pd_task_history_pk primary key (id) ); @@ -126,25 +127,26 @@ end; / create table pd_step_history ( - id number(*, 0), - job_id number(*, 0), - run_id number(*, 0), - task_id number(*, 0), - step_name varchar2(30 char), - step_type varchar2(3 char), - step_a varchar2(30 char), - step_b varchar2(30 char), - step_c varchar2(30 char), - step_date date, - start_date date, - end_date date, - status varchar2(1 char), - records_read number(*, 0), - records_written number(*, 0), - 
records_error number(*, 0), - result_value number(*, 0), - result_long varchar2(4000 char), - updated date, + id number(*, 0), + job_id number(*, 0), + run_id number(*, 0), + task_id number(*, 0), + step_name varchar2(30 char), + step_type varchar2(3 char), + step_a varchar2(30 char), + step_b varchar2(30 char), + step_c varchar2(30 char), + step_date date, + start_date date, + end_date date, + status varchar2(1 char), + records_read number(*, 0), + records_written number(*, 0), + records_processed number(*, 0), + records_error number(*, 0), + result_value number(*, 0), + result_long varchar2(4000 char), + updated date, constraint pd_step_history_pk primary key (id) ); @@ -223,15 +225,15 @@ end; / create table pd_components ( - id varchar2(100 char), - server varchar2(100 char), - user varchar2(100 char), - pid number(*, 0), - url varchar2(100 char), - debug char(1 char), - start_date date, - stop_date date, - status varchar2(1 char), + id varchar2(100 char), + server_name varchar2(100 char), + user_name varchar2(100 char), + pid number(*, 0), + url varchar2(100 char), + debug char(1 char), + start_date date, + stop_date date, + status varchar2(1 char), constraint pd_components_pk primary key (id) ); diff --git a/db/sqlite.sql b/db/sqlite.sql index aa84c13..6f03aef 100644 --- a/db/sqlite.sql +++ b/db/sqlite.sql @@ -41,19 +41,19 @@ create table pd_run_history ( start_date text, end_date text, status text, - server text, - user text, + server_name text, + user_name text, pid integer, - trigger_id integer, + error_list text, rerun_id integer, rerun_seqno integer, rerun_times integer, rerun_now text, rerun_done text, + trigger_id integer, + data_dump blob, file_log text, text_log text, - text_error text, - data_dump blob, updated text ); @@ -67,42 +67,44 @@ create index pd_run_history_status_ix on pd_run_history (status); create table pd_task_history ( - id integer primary key autoincrement, - job_id integer, - run_id integer, - task_name text, - task_date text, - 
start_date text, - end_date text, - status text, - records_read integer, - records_written integer, - records_error integer, - result_value integer, - result_long integer, - updated text + id integer primary key autoincrement, + job_id integer, + run_id integer, + task_name text, + task_date text, + start_date text, + end_date text, + status text, + records_read integer, + records_written integer, + records_processed integer, + records_error integer, + result_value integer, + result_long text, + updated text ); create table pd_step_history ( - id integer primary key autoincrement, - job_id integer, - run_id integer, - task_id integer, - step_name text, - step_type text, - step_a text, - step_b text, - step_c text, - step_date text, - start_date text, - end_date text, - status text, - records_read integer, - records_written integer, - records_error integer, - result_value integer, - result_long text, - updated text + id integer primary key autoincrement, + job_id integer, + run_id integer, + task_id integer, + step_name text, + step_type text, + step_a text, + step_b text, + step_c text, + step_date text, + start_date text, + end_date text, + status text, + records_read integer, + records_written integer, + records_processed integer, + records_error integer, + result_value integer, + result_long text, + updated text ); create table pd_file_log ( @@ -139,15 +141,15 @@ create table pd_sql_log ( ); create table pd_components ( - id text primary key, - server text, - user text, - pid integer, - url text, - debug text, - start_date text, - stop_date text, - status text + id text primary key, + server_name text, + user_name text, + pid integer, + url text, + debug text, + start_date text, + stop_date text, + status text ); insert into pd_components (id) values ('SCHEDULER'); diff --git a/pydin/config.py b/pydin/config.py index fdf46cd..9216019 100644 --- a/pydin/config.py +++ b/pydin/config.py @@ -106,7 +106,7 @@ class Logging(): """Represents logging configurator used 
for special loggers.""" def __init__(self, task=True, step=True, file=False, sql=False, - database=None, schema=None, table=None, **fields): + database=None, schema=None, table=None): self.metadata = sa.MetaData() if task is True: @@ -116,12 +116,11 @@ def __init__(self, task=True, step=True, file=False, sql=False, elif isinstance(task, dict): self.task = self.Task(self, table=task.get('table', table), schema=task.get('schema', schema), - database=task.get('database', database), - **task.get('fields', fields)) + database=task.get('database', database)) else: table = table if not isinstance(task, str) else task self.task = self.Task(self, table=table, schema=schema, - database=database, **fields) + database=database) if step is True: self.step = self.Step(self, table='pd_step_history') @@ -130,8 +129,7 @@ def __init__(self, task=True, step=True, file=False, sql=False, elif isinstance(step, dict): self.step = self.Step(self, table=step.get('table'), schema=step.get('schema', schema), - database=step.get('database', database), - **step.get('fields', {})) + database=step.get('database', database)) else: table = None if not isinstance(step, str) else step self.step = self.Step(self, table=table, schema=schema, @@ -167,8 +165,7 @@ def __init__(self, task=True, step=True, file=False, sql=False, class Task(): """Represents task logger configurator.""" - def __init__(self, logging, database=None, - schema=None, table=None, **fields): + def __init__(self, logging, database=None, schema=None, table=None): self.logging = logging if isinstance(logging, Logging) else None self.database = database if isinstance(database, str) else None if isinstance(database, Database) is True: @@ -180,28 +177,6 @@ def __init__(self, logging, database=None, self.database = module.db self.schema = schema if isinstance(schema, str) else None self.table = table if isinstance(table, str) else None - self.fields = {'records_read': True, - 'records_written': True, - 'records_error': True, - 
'result_value': True, - 'result_long': True, - 'mode': False, - 'server': False, - 'user': False, - 'file_log': False, - 'text_log': False, - 'text_error': False} - self.optional = [key for key, value in self.fields.items() - if value is False] - self.modify(**fields) - pass - - def modify(self, **fields): - """Modify table structure.""" - for key, value in fields.items(): - if key in self.fields.keys(): - if isinstance(value, bool): - self.fields[key] = value pass def create_table(self): @@ -236,22 +211,8 @@ def create_table(self): comment='Short numeric execution result'), sa.Column('result_long', sa.String(512), comment='Long string execution result'), - sa.Column('mode', sa.String(1), - comment='Type of initiation in the task'), - sa.Column('server', sa.String(30), - comment='Host address with process'), - sa.Column('user', sa.String(30), - comment='OS user who launched the process'), - sa.Column('file_log', sa.String(30), - comment='Path to the log file'), - sa.Column('text_log', sa.Text, - comment='Textual log records'), - sa.Column('text_error', sa.Text, - comment='Textual error'), sa.Column('updated', sa.DateTime, comment='Date of this record change')] - columns = [column for column in columns - if self.fields.get(column.name, True) is True] table = sa.Table(name, metadata, *columns, schema=schema) table.create(self.database.engine, checkfirst=True) return table @@ -276,8 +237,7 @@ def setup(self): class Step(): """Represents step logger configurator.""" - def __init__(self, logging, database=None, - schema=None, table=None, **fields): + def __init__(self, logging, database=None, schema=None, table=None): self.logging = logging if isinstance(logging, Logging) else None self.database = database if isinstance(database, str) else None if isinstance(database, Database) is True: @@ -289,28 +249,6 @@ def __init__(self, logging, database=None, self.database = module.db self.schema = schema if isinstance(schema, str) else None self.table = table if 
isinstance(table, str) else None - self.fields = {'records_read': True, - 'records_written': True, - 'records_error': True, - 'result_value': True, - 'result_long': True, - 'mode': False, - 'server': False, - 'user': False, - 'file_log': False, - 'text_log': False, - 'text_error': False} - self.optional = [key for key, value in self.fields.items() - if value is False] - self.modify(**fields) - pass - - def modify(self, **fields): - """Modify table structure.""" - for key, value in fields.items(): - if key in self.fields.keys(): - if isinstance(value, bool): - self.fields[key] = value pass def create_table(self): @@ -355,22 +293,8 @@ def create_table(self): comment='Short numeric result of execution'), sa.Column('result_long', sa.String(512), comment='Long string result of execution'), - sa.Column('mode', sa.String(1), - comment='Type of initiation in the task'), - sa.Column('server', sa.String(30), - comment='Host address with process'), - sa.Column('user', sa.String(30), - comment='OS user who launched the process'), - sa.Column('file_log', sa.String(30), - comment='Path to the log file'), - sa.Column('text_log', sa.Text, - comment='Textual log records'), - sa.Column('text_error', sa.Text, - comment='Textual error'), sa.Column('updated', sa.DateTime, comment='Date of this record change')] - columns = [column for column in columns - if self.fields.get(column.name, True) is True] table = sa.Table(name, metadata, *columns, schema=schema) table.create(self.database.engine, checkfirst=True) return table diff --git a/pydin/core.py b/pydin/core.py index a9eb6f2..8583ae4 100644 --- a/pydin/core.py +++ b/pydin/core.py @@ -71,8 +71,8 @@ def __init__(self, name=None, desc=None, owner=None): self.chargers = [] self.executors = [] - self.server = platform.node() - self.user = os.getlogin() + self.server_name = platform.node() + self.user_name = os.getlogin() self.pid = os.getpid() self.start_date = None self.stop_date = None @@ -845,8 +845,8 @@ def _charge(self, job: Job, tag: 
int): run_date=date, added=dt.datetime.now(), status='Q', - server=self.server, - user=self.user)) + server_name=self.server_name, + user_name=self.user_name)) result = conn.execute(insert) record_id = result.inserted_primary_key[0] logger.debug(f'Created run history record {record_id} for {job}') @@ -871,8 +871,8 @@ def _postpone(self, job: Job, tag: int): run_date=date, added=dt.datetime.now(), status='W', - server=self.server, - user=self.user)) + server_name=self.server_name, + user_name=self.user_name)) result = conn.execute(insert) record_id = result.inserted_primary_key[0] logger.debug(f'Created run history record {record_id} for {job}') @@ -936,7 +936,7 @@ def _execute(self, job: Job, tag: int): table = db.tables.run_history text_error = proc.stderr.read().decode() update = (table.update(). - values(status='E', text_error=text_error). + values(status='E', error_list=text_error). where(table.c.id == record_id)) conn.execute(update) except Exception: @@ -1034,8 +1034,8 @@ def _update_component(self): update = update.values(status='Y', start_date=self.start_date, stop_date=None, - server=self.server, - user=self.user, + server_name=self.server_name, + user_name=self.user_name, pid=self.pid) elif self.status is False: update = update.values(status='N', @@ -1118,8 +1118,8 @@ def __init__(self, id=None, name=None, desc=None, tag=None, date=None, debug=self.debug, alarming=self.alarm, smtp={'recipients': self.email_list}) - self.server = platform.node() - self.user = os.getlogin() + self.server_name = platform.node() + self.user_name = os.getlogin() self.pid = os.getpid() self.file_log = self._parse_file_log() self.text_log = None @@ -1238,13 +1238,16 @@ def text_parameters(self): @property def text_error(self): - """Get textual exception from the last registered error.""" + """Get textual representation of all found errors.""" if self.errors: - err_type, err_value, err_tb = last(self.errors) - return ''.join(tb.format_exception(err_type, err_value, err_tb)) + 
text_list = [] + for err_type, err_value, err_tb in self.errors: + exception = tb.format_exception(err_type, err_value, err_tb) + text = ''.join(exception) + text_list.append(text) + return f'{"":->34}\n'.join(text_list) else: return None - pass @property def pipeline(self): @@ -1406,8 +1409,8 @@ def _start_as_new(self): trigger_id=self.trigger_id, rerun_id=self.initiator_id, rerun_seqno=self.seqno, - server=self.server, - user=self.user, + server_name=self.server_name, + user_name=self.user_name, pid=self.pid, file_log=self.file_log) pass @@ -1418,8 +1421,8 @@ def _start_as_continue(self): self.record.write(run_mode=self.mode, start_date=self.start_date, status=self.status, - server=self.server, - user=self.user, + server_name=self.server_name, + user_name=self.user_name, pid=self.pid, file_log=self.file_log) pass @@ -1441,8 +1444,8 @@ def _start_as_rerun(self): trigger_id=self.trigger_id, rerun_id=self.initiator_id, rerun_seqno=self.seqno, - server=self.server, - user=self.user, + server_name=self.server_name, + user_name=self.user_name, pid=self.pid, file_log=self.file_log) self.initiator.write(rerun_now='Y') @@ -1479,7 +1482,7 @@ def _end(self): self.status = self._done() if not self.errors else self._error() self.end_date = dt.datetime.now() self.record.write(end_date=self.end_date, status=self.status, - text_error=self.text_error, + error_list=self.text_error, data_dump=pickle.dumps(self.data)) if self.initiator_id: rerun_done = 'Y' if self.status == 'D' else None @@ -1632,9 +1635,11 @@ def _trig(self): class Pipeline(): """Represents ETL pipeline.""" - def __init__(self, *nodes, name=None, date=None, logging=None): + def __init__(self, *nodes, name=None, date=None, error_limit=1, + logging=None): self.name = name or self.__class__.__name__ self.date = date + self.error_limit = error_limit self.task = Task() self.steps = {} @@ -1648,6 +1653,7 @@ def __init__(self, *nodes, name=None, date=None, logging=None): self.name = self.job.name if self.job.date: 
self.date = self.job.date + self.logging = logging if isinstance(logging, Logging) else Logging() self.calendar = calendar.Day(self.date) @@ -1804,6 +1810,20 @@ def name(self, value): self._name = None pass + def error_handler(self): + """Process last error.""" + exc_info = sys.exc_info() + if exc_info[0] is not None: + self.errors.add(exc_info) + if isinstance(self, Task): + if self.pipeline.job: + self.pipeline.job.errors.add(exc_info) + if isinstance(self, Step): + if self.pipeline: + self.pipeline.task.errors.add(exc_info) + if self.pipeline.job: + self.pipeline.job.errors.add(exc_info) + pass @@ -1895,9 +1915,10 @@ def __init__(self, task_name=None, pipeline=None): self._status = None self._records_read = 0 self._records_written = 0 - self._records_found = 0 - self._files_found = 0 - self._bytes_found = 0 + self._records_processed = 0 + self._records_error = 0 + self._result_value = 0 + self._result_long = 0 self.errors = set() self.updated = None pass @@ -1984,6 +2005,30 @@ def records_written(self, value): self.logger.table(records_written=self.records_written) pass + @property + def records_processed(self): + """Get number of records processed.""" + return self._records_processed + + @records_processed.setter + def records_processed(self, value): + if isinstance(value, int): + self._records_processed += value + self.logger.table(records_processed=self.records_processed) + pass + + @property + def records_error(self): + """Get number of records with error.""" + return self._records_error + + @records_error.setter + def records_error(self, value): + if isinstance(value, int): + self._records_error += value + self.logger.table(records_error=self.records_error) + pass + @property def result_value(self): """Get short numeric result value.""" @@ -2008,16 +2053,6 @@ def result_long(self, value): self.logger.table(result_long=str(self.result_long)) pass - @property - def text_error(self): - """Get textual exception from the last registered error.""" - if self.errors: - err_type, 
err_value, err_tb = last(self.errors) - return ''.join(tb.format_exception(err_type, err_value, err_tb)) - else: - return None - pass - def setup(self, pipeline): """Configure the task for the given pipeline.""" self.pipeline = pipeline @@ -2139,9 +2174,6 @@ def _end(self): if self.status == 'D': self.end_date = dt.datetime.now() self.logger.table(end_date=self.end_date) - elif self.status == 'E': - if self.pipeline.logging.task.fields.get('text_error'): - self.logger.table(text_error=self.text_error) logger.debug(f'{self} ended') return self.end() @@ -2191,7 +2223,8 @@ def __init__(self, step_name=None, a=None, b=None, c=None, pipeline=None): self._status = None self._records_read = 0 self._records_written = 0 - self._records_found = 0 + self._records_processed = 0 + self._records_error = 0 self._result_value = 0 self._result_long = 0 self.errors = set() @@ -2336,6 +2369,31 @@ def records_written(self, value): self.logger.table(records_written=self.records_written) pass + @property + def records_processed(self): + """Get number of records processed.""" + return self._records_processed + + @records_processed.setter + def records_processed(self, value): + if isinstance(value, int): + self._records_processed += value + self.logger.table(records_processed=self.records_processed) + pass + + @property + def records_error(self): + """Get number of records with error.""" + return self._records_error + + @records_error.setter + def records_error(self, value): + if isinstance(value, int): + self._records_error += value + self.task.records_error = value + self.logger.table(records_error=self.records_error) + pass + @property def result_value(self): """Get short numeric result value.""" @@ -2388,15 +2447,6 @@ def loading(self): else: return False - @property - def text_error(self): - """Get textual exception from the last registered error.""" - if self.errors: - err_type, err_value, err_tb = last(self.errors) - return 
''.join(tb.format_exception(err_type, err_value, err_tb)) - else: - return None - def setup(self, pipeline): """Configure the step for the given pipeline.""" self.pipeline = pipeline @@ -2543,9 +2593,6 @@ def _end(self): if self.status == 'D': self.end_date = dt.datetime.now() self.logger.table(end_date=self.end_date) - elif self.status == 'E': - if self.pipeline.logging.step.fields.get('text_error'): - self.logger.table(text_error=self.text_error) logger.debug(f'{self} ended') pass diff --git a/pydin/models.py b/pydin/models.py index b44f2ed..10962ec 100644 --- a/pydin/models.py +++ b/pydin/models.py @@ -5,6 +5,7 @@ import gzip import json import os +import sys import re import shutil import stat @@ -238,14 +239,22 @@ def to_extractor(self, step, queue): def extractor(self, step, queue): """Extract data.""" logger.info(f'Reading records from {self}...') - for dataset in self.extract(step): - try: - queue.put(dataset) - count = len(dataset) - step.records_read = count - logger.info(f'{step.records_read} records read') - except Exception: - logger.error() + try: + for dataset in self.extract(step): + try: + queue.put(dataset) + step.records_read = len(dataset) + logger.info(f'{step.records_read} records read') + except Exception: + step.records_error = len(dataset) + logger.info(f'{step.records_error} records with error') + logger.error() + step.error_handler() + if len(step.errors) >= step.pipeline.error_limit: + break + except Exception: + logger.error() + step.error_handler() pass def extract(self): @@ -269,28 +278,31 @@ def to_transformer(self, step, input, output): logger.debug(f'Starting {self.thread.name}...') return self.thread.start() - def transformator(self, step, input, output): + def transformator(self, step, input_queue, output_queue): """Transform data.""" logger.info(f'Processing records of {self}...') - records_processed = 0 while True: - if input.empty() is True: + if input_queue.empty() is True: if step.extraction is True: tm.sleep(0.001) continue 
break else: - inputs = input.get() + input_records = input_queue.get() try: - outputs = list(map(self.transform, inputs)) + output_records = list(map(self.transform, input_records)) + output_queue.put(output_records) + step.records_processed = len(output_records) + logger.info(f'{step.records_processed} records processed') except Exception: + step.records_error = len(input_records) + logger.info(f'{step.records_error} records with error') logger.error() + step.error_handler() + if len(step.errors) >= step.pipeline.error_limit: + break else: - output.put(outputs) - count = len(outputs) - records_processed += count - logger.info(f'{records_processed} records processed') - input.task_done() + input_queue.task_done() pass def transform(self): @@ -327,12 +339,16 @@ def loader(self, step, queue): dataset = queue.get() try: result = self.load(step, dataset) + step.records_written = len(coalesce(result, dataset)) + logger.info(f'{step.records_written} records written') except Exception: + step.records_error = len(dataset) + logger.info(f'{step.records_error} records with error') logger.error() + step.error_handler() + if len(step.errors) >= step.pipeline.error_limit: + break else: - count = len(coalesce(result, dataset)) - step.records_written = count - logger.info(f'{step.records_written} records written') queue.task_done() pass @@ -367,6 +383,7 @@ def executor(self, step): setattr(step, 'result_long', result) except Exception: logger.error() + step.error_handler() pass def execute(self): diff --git a/pydin/web.py b/pydin/web.py index de16a43..a928a9a 100644 --- a/pydin/web.py +++ b/pydin/web.py @@ -46,8 +46,8 @@ def __init__(self, host=None, port=None, dev=False): if config['API'].get('host'): self.port = config['API'].get('port') - self.server = platform.node() - self.user = os.getlogin() + self.server_name = platform.node() + self.user_name = os.getlogin() self.start_date = None self.stop_date = None self.status = None @@ -97,7 +97,8 @@ def start(self): update = 
update.values(status='Y', pid=pid, start_date=self.start_date, stop_date=self.stop_date, - server=self.server, user=self.user) + server_name=self.server_name, + user_name=self.user_name) conn.execute(update) pass