From 959e2b5797881cbf5e90a342c2c71228e119c98b Mon Sep 17 00:00:00 2001 From: t3eHawk Date: Thu, 5 Jan 2023 06:11:52 +0300 Subject: [PATCH] Rework FileManager Model --- db/oracle.sql | 30 ++-- db/sqlite.sql | 10 +- pydin/__init__.py | 4 +- pydin/core.py | 101 +++++++++++++- pydin/models.py | 349 +++++++++++++++++++++------------------------- 5 files changed, 284 insertions(+), 210 deletions(-) diff --git a/db/oracle.sql b/db/oracle.sql index 076bfda..5362f63 100644 --- a/db/oracle.sql +++ b/db/oracle.sql @@ -130,8 +130,8 @@ create table pd_task_history ( records_error number(*, 0), files_read number(*, 0), files_written number(*, 0), - volume_read number(*, 0), - volume_written number(*, 0), + bytes_read number(*, 0), + bytes_written number(*, 0), result_value number(*, 0), result_long varchar2(4000 char), updated date, @@ -171,8 +171,8 @@ create table pd_step_history ( records_error number(*, 0), files_read number(*, 0), files_written number(*, 0), - volume_read number(*, 0), - volume_written number(*, 0), + bytes_read number(*, 0), + bytes_written number(*, 0), result_value number(*, 0), result_long varchar2(4000 char), updated date, @@ -193,17 +193,17 @@ end; / create table pd_file_log ( - id number(*, 0), - job_id number(*, 0), - run_id number(*, 0), - task_id number(*, 0), - step_id number(*, 0), - server varchar2(50 char), - file_name varchar2(200 char), - file_date date, - file_size number(*, 0), - start_date date, - end_date date, + id number(*, 0), + job_id number(*, 0), + run_id number(*, 0), + task_id number(*, 0), + step_id number(*, 0), + server_name varchar2(50 char), + file_name varchar2(200 char), + file_date date, + file_size number(*, 0), + start_date date, + end_date date, constraint pd_file_log_pk primary key (id) ); diff --git a/db/sqlite.sql b/db/sqlite.sql index b6d056e..0ee17f8 100644 --- a/db/sqlite.sql +++ b/db/sqlite.sql @@ -100,8 +100,8 @@ create table pd_task_history ( records_error integer, files_read integer, files_written integer, - volume_read integer, - volume_written integer, + bytes_read integer, + bytes_written integer, result_value integer, result_long text, updated text @@ -127,8 +127,8 @@ create table pd_step_history ( records_error integer, files_read integer, files_written integer, - volume_read integer, - volume_written integer, + bytes_read integer, + bytes_written integer, result_value integer, result_long text, updated text @@ -140,7 +140,7 @@ create table pd_file_log ( run_id integer, task_id integer, step_id integer, - server text, + server_name text, file_name text, file_date text, file_size integer, diff --git a/pydin/__init__.py b/pydin/__init__.py index b406761..335ffbf 100644 --- a/pydin/__init__.py +++ b/pydin/__init__.py @@ -8,7 +8,7 @@ from .models import Mapper from .models import Table, SQL, Select, Insert from .models import CSV, JSON, XML -from .models import Files, FileManager +from .models import Filenames, FileManager from .fields import run_id, task_id, step_id, process_id @@ -23,5 +23,5 @@ __maintainer__ = __author__ __all__ = [Driver, Manager, Server, Scheduler, Job, Pipeline, Mapper, - Table, SQL, Select, Insert, CSV, JSON, XML, Files, FileManager, + Table, SQL, Select, Insert, CSV, JSON, XML, Filenames, FileManager, run_id, task_id, step_id, process_id] diff --git a/pydin/core.py b/pydin/core.py index bf8849b..5117186 100644 --- a/pydin/core.py +++ b/pydin/core.py @@ -2093,6 +2093,10 @@ def __init__(self, task_name=None, pipeline=None): self._records_written = 0 self._records_processed = 0 self._records_error = 0 + self._files_read = 0 + self._files_written = 0 + self._bytes_read = 0 + self._bytes_written = 0 self._result_value = 0 self._result_long = 0 self.errors = set() @@ -2205,6 +2209,50 @@ def records_error(self, value): self.logger.table(records_error=self.records_error) pass + @property + def files_read(self): + """Get number of files read.""" + return self._files_read + + @files_read.setter + def files_read(self, value): + if isinstance(value, int): + self._files_read += value + self.logger.table(files_read=self.files_read) + + @property + def files_written(self): + """Get number of files written.""" + return self._files_written + + @files_written.setter + def files_written(self, value): + if isinstance(value, int): + self._files_written += value + self.logger.table(files_written=self.files_written) + + @property + def bytes_read(self): + """Get number of bytes read.""" + return self._bytes_read + + @bytes_read.setter + def bytes_read(self, value): + if isinstance(value, int): + self._bytes_read += value + self.logger.table(bytes_read=self.bytes_read) + + @property + def bytes_written(self): + """Get number of bytes written.""" + return self._bytes_written + + @bytes_written.setter + def bytes_written(self, value): + if isinstance(value, int): + self._bytes_written += value + self.logger.table(bytes_written=self.bytes_written) + @property def result_value(self): """Get short numeric result value.""" @@ -2413,6 +2461,10 @@ def __init__(self, step_name=None, a=None, b=None, c=None, pipeline=None): self._records_written = 0 self._records_processed = 0 self._records_error = 0 + self._files_read = 0 + self._files_written = 0 + self._bytes_read = 0 + self._bytes_written = 0 self._result_value = 0 self._result_long = 0 self.errors = set() @@ -2576,13 +2628,60 @@ def records_error(self): @records_error.setter def records_error(self, value): - print('Hello there!') if isinstance(value, int): self._records_error += value self.task.records_error = value self.logger.table(records_error=self.records_error) pass + @property + def files_read(self): + """Get number of files read.""" + return self._files_read + + @files_read.setter + def files_read(self, value): + if isinstance(value, int): + self._files_read += value + self.task.files_read = value + self.logger.table(files_read=self.files_read) + + @property + def files_written(self): + """Get number of files written.""" + return self._files_written + + @files_written.setter + def files_written(self, value): + if isinstance(value, int): + self._files_written += value + self.task.files_written = value + self.logger.table(files_written=self.files_written) + + @property + def bytes_read(self): + """Get number of bytes read.""" + return self._bytes_read + + @bytes_read.setter + def bytes_read(self, value): + if isinstance(value, int): + self._bytes_read += value + self.task.bytes_read = value + self.logger.table(bytes_read=self.bytes_read) + + @property + def bytes_written(self): + """Get number of bytes written.""" + return self._bytes_written + + @bytes_written.setter + def bytes_written(self, value): + if isinstance(value, int): + self._bytes_written += value + self.task.bytes_written = value + self.logger.table(bytes_written=self.bytes_written) + @property def result_value(self): """Get short numeric result value.""" diff --git a/pydin/models.py b/pydin/models.py index f8404be..3ae7b85 100644 --- a/pydin/models.py +++ b/pydin/models.py @@ -1409,15 +1409,16 @@ def load(self, step, dataset): pass -class Files(Extractable, Model): +class Files(Model): """Represents file sequence as ETL model.""" - def configure(self, path=None, mask=None, created=None, recursive=None): + def configure(self, path=None, mask=None, created=None, recursive=None, + server_name=None): self.path = path self.mask = mask self.created = created self.recursive = recursive - pass + self.server_name = server_name @property def path(self): @@ -1428,7 +1429,6 @@ def path(self): def path(self, value): if isinstance(value, str) or value is None: self._path = value - pass @property def recursive(self): @@ -1439,7 +1439,6 @@ def recursive(self): def recursive(self, value): if isinstance(value, bool) or value is None: self._recursive = value - pass @property def mask(self): @@ -1448,9 +1447,10 @@ def mask(self): @mask.setter def mask(self, value): - if isinstance(value, str) or value is None: + if isinstance(value, str): self._mask = value - pass + elif value is None: + self._mask = '*' @property def created(self): @@ -1461,15 +1461,31 @@ def created(self): def created(self, value): if isinstance(value, (str, (dict, list, tuple))) or value is None: self._created = value - pass + + @property + def server_name(self): + """Get file source server name.""" + return self._server_name + + @server_name.setter + def server_name(self, value): + if isinstance(value, str) or value is None: + self._server_name = value + + @property + def file_server(self): + """Get file source connection proxy.""" + if not self.server_name: + return self.server + else: + return connector.receive(self.server_name) def read(self): """Read full list of files.""" for record in self.walk(): path = record['path'] - logger.debug(f'{path} found') + logger.debug(f'File {path} found') yield record - pass def filter(self): """Generate filtered list of files.""" @@ -1479,21 +1495,21 @@ def filter(self): for record in self.read(): path = record['path'] mtime = record['mtime'] - if self.mask is not None: - if re.match(self.mask, os.path.basename(path)) is None: - logger.debug(f'{path} filtered by mask') + if self.mask and self.mask != '*': + filename = os.path.basename(path) + if not re.match(self.mask, filename): + logger.debug(f'File {path} filtered by mask') continue if mtime < date_from or mtime > date_to: - logger.debug(f'{path} filtered by date') + logger.debug(f'File {path} filtered by date') continue - logger.debug(f'{path} matched') + logger.debug(f'File {path} matched') yield record - pass def walk(self, dir=None): """Generate list of necessary files.""" dir = dir or self.path - if isinstance(self.server, Localhost) is True: + if isinstance(self.file_server, Localhost) is True: dir = os.path.normpath(dir) if os.path.exists(dir) is True: for file in os.listdir(dir): @@ -1505,14 +1521,14 @@ def walk(self, dir=None): elif isfile is True or isdir is True: root = os.path.normpath(self.path) relpath = os.path.relpath(dir, self.path) - row = dict(server=self.server, path=path, + row = dict(server=self.file_server, path=path, root=root, dir=relpath, file=file, isdir=isdir, isfile=isfile, mtime=mtime, size=size) yield row - elif self.server.sftp is True: - if self.server.exists(dir) is True: - conn = self.server.connect() + elif self.file_server.sftp is True: + if self.file_server.exists(dir) is True: + conn = self.file_server.connect() for stats in conn.sftp.listdir_attr(dir): isdir, isfile, mtime, size = self.explore(stats) if isdir is True and self.recursive is True: @@ -1520,7 +1536,7 @@ def walk(self, dir=None): elif isfile is True or isdir is True: path = conn.sftp.normalize(f'{dir}/{stats.filename}') relpath = os.path.relpath(dir, self.path) - row = dict(server=self.server, path=path, + row = dict(server=self.file_server, path=path, root=self.path, dir=relpath, file=stats.filename, isdir=isdir, isfile=isfile, @@ -1530,9 +1546,9 @@ def walk(self, dir=None): # FTP support is pretty poor. So here not all FTP server will return # modification time and script will certainly fail in case of directory # found. - elif self.server.ftp is True: - if self.server.exists(dir) is True: - conn = self.server.connect() + elif self.file_server.ftp is True: + if self.file_server.exists(dir) is True: + conn = self.file_server.connect() for path in conn.ftp.nlst(): filename = os.path.basename(path) relpath = '.' @@ -1541,12 +1557,11 @@ def walk(self, dir=None): mtime = conn.ftp.sendcmd(f'MDTM {path}')[4:] mtime = dt.datetime.strptime(mtime, '%Y%m%d%H%M%S') size = conn.ftp.size(path) - row = dict(host=self.server, path=path, + row = dict(server=self.file_server, path=path, root=self.path, dir=relpath, filename=filename, isdir=isdir, isfile=isfile, mtime=mtime, size=size) yield row - pass def explore(self, stats): """Get main properties from file statistics.""" @@ -1571,30 +1586,44 @@ def between(self): elif isinstance(self.created, str) is True: date_from = eval(self.created, {'calendar': today}) date_to = dt.datetime.max + elif self.date_field: + date_from = self.date_from + date_to = self.date_to else: date_from = dt.datetime.min date_to = dt.datetime.max return (date_from, date_to) + pass + + +class Filenames(Files, Extractable, Model): + """Represents file names as ETL model.""" + def extract(self, step): """Extract data with file description.""" - yield list(self.filter()) + yield from self.filter() pass -class FileManager(Loadable, Model): +class FileManager(Files, Executable, Model): """Represents file manager as ETL model.""" - def configure(self, action='copy', dest=None, nodir=False, tempname=False, - zip=False, unzip=False): + def configure(self, path=None, mask=None, created=None, recursive=None, + action='copy', dest=None, zip=False, unzip=False, + nodir=False, tempname=False, server_name=None): + self.path = path + self.mask = mask + self.created = created + self.recursive = recursive self.action = action self.dest = dest - self.nodir = nodir - self.tempname = tempname self.zip = zip self.unzip = unzip - pass + self.nodir = nodir + self.tempname = tempname + self.server_name = server_name @property def action(self): @@ -1604,8 +1633,12 @@ def action(self): @action.setter def action(self, value): if isinstance(value, str) or value is None: + if isinstance(value, str): + requires = ['copy', 'move', 'delete'] + if value not in requires: + message = f'action must be {requires}, not {value}' + raise AttributeError(message) self._action = value - pass @property def dest(self): @@ -1623,7 +1656,6 @@ def dest(self, values): self._dest = [value for value in values] elif values is None: self._dest = [] - pass @property def nodir(self): @@ -1634,7 +1666,6 @@ def nodir(self): def nodir(self, value): if isinstance(value, bool) or value is None: self._nodir = value - pass @property def tempname(self): @@ -1645,7 +1676,6 @@ def tempname(self): def tempname(self, value): if isinstance(value, (str, bool)) or value is None: self._tempname = value - pass @property def zip(self): @@ -1656,7 +1686,6 @@ def zip(self): def zip(self, value): if isinstance(value, bool) or value is None: self._zip = value - pass @property def unzip(self): @@ -1667,7 +1696,6 @@ def unzip(self): def unzip(self, value): if isinstance(value, bool) or value is None: self._unzip = value - pass def createlog(self): """Generate special logger.""" @@ -1676,138 +1704,129 @@ def createlog(self): def startlog(self, step, fileinfo): """Start special logging.""" self.logger.root.table.new() + server_name = fileinfo['server'].host + file_name = fileinfo['file'] + file_date = fileinfo['mtime'] + file_size = fileinfo['size'] self.logger.table(job_id=self.job.id if self.job else None, run_id=self.job.record_id if self.job else None, task_id=self.task.id, step_id=step.id, - server=fileinfo['server'].host, - file_name=fileinfo['file'], - file_date=fileinfo['mtime'], - file_size=fileinfo['size'], + server_name=server_name, + file_name=file_name, + file_date=file_date, + file_size=file_size, start_date=dt.datetime.now()) - pass def endlog(self): """End special logging.""" self.logger.table(end_date=dt.datetime.now()) - pass def process(self, fileinfo): """Perform requested action for particular file.""" - server = fileinfo['server'] path = fileinfo['path'] - logger.debug(f'Processing {path}') + logger.debug(f'File {path} processing...') try: # Source and target are local. if ( isinstance(self.server, Localhost) - and isinstance(server, Localhost) + and isinstance(self.file_server, Localhost) ): - logger.debug('On localhost by OS') if self.action in ('copy', 'move'): self._copy_on_localhost(fileinfo) if self.action in ('move', 'delete'): self._delete_on_localhost(fileinfo) # Source and target are the same remote, SSH enabled. elif ( - server == self.server - and server.ssh is True + self.file_server == self.server + and self.file_server.ssh is True ): - logger.debug('On remote by SSH') if self.action in ('copy', 'move'): self._copy_on_remote_by_ssh(fileinfo) if self.action in ('move', 'delete'): self._delete_on_remote_by_ssh(fileinfo) # Source and target are the same remote, SFTP enabled. elif ( - server == self.server - and server.sftp is True + self.file_server == self.server + and self.file_server.sftp is True ): - logger.debug('On remote by SFTP and OS') if self.action in ('copy', 'move'): self._copy_on_remote_by_sftp(fileinfo) if self.action in ('move', 'delete'): self._delete_on_remote_by_sftp(fileinfo) # Source and target are the same remote, FTP enabled. elif ( - server == self.server - and server.ftp is True + self.file_server == self.server + and self.file_server.ftp is True ): - logger.debug('On remote by FTP and OS') if self.action in ('copy', 'move'): self._copy_on_remote_by_ftp(fileinfo) if self.action in ('move', 'delete'): self._delete_on_remote_by_ftp(fileinfo) # Source is local, target is remote, SFTP enabled. elif ( - server != self.server - and isinstance(server, Localhost) + self.file_server != self.server + and isinstance(self.file_server, Localhost) and isinstance(self.server, Server) and self.server.sftp is True ): - logger.debug('To remote by SFTP') if self.action in ('copy', 'move'): self._copy_from_localhost_to_remote_by_sftp(fileinfo) if self.action in ('move', 'delete'): self._delete_on_localhost(fileinfo) # Source is remote, target is local, SFTP enabled. elif ( - server != self.server - and isinstance(server, Server) + self.file_server != self.server + and isinstance(self.file_server, Server) and isinstance(self.server, Localhost) - and server.sftp is True + and self.file_server.sftp is True ): - logger.debug('To localhost by SFTP') if self.action in ('copy', 'move'): self._copy_from_remote_to_localhost_by_sftp(fileinfo) if self.action in ('move', 'delete'): self._delete_on_remote_by_sftp(fileinfo) # Source is local, target is remote, FTP enabled. elif ( - server != self.server - and isinstance(server, Localhost) + self.file_server != self.server + and isinstance(self.file_server, Localhost) and isinstance(self.server, Server) and self.server.ftp is True ): - logger.debug('To remote by FTP') if self.action in ('copy', 'move'): self._copy_from_localhost_to_remote_by_ftp(fileinfo) if self.action in ('move', 'delete'): self._delete_on_localhost(fileinfo) # Source is remote, target is local, FTP enabled. elif ( - server != self.server - and isinstance(server, Server) is True + self.file_server != self.server + and isinstance(self.file_server, Server) is True and isinstance(self.server, Localhost) is True - and server.ftp is True + and self.file_server.ftp is True ): - logger.debug('To localhost by FTP') if self.action in ('copy', 'move'): self._copy_from_remote_to_localhost_by_ftp(fileinfo) if self.action in ('move', 'delete'): self._delete_on_remote_by_ftp(fileinfo) # Source and target are different remotes, SFTP on both enabled. elif ( - server != self.server - and isinstance(server, Server) + self.file_server != self.server + and isinstance(self.file_server, Server) and isinstance(self.server, Server) - and server.sftp is True + and self.file_server.sftp is True and self.server.sftp is True ): - logger.debug('From remote by SFTP to remote by SFTP') if self.action in ('copy', 'move'): self._copy_from_remote_to_remote_by_sftp(fileinfo) if self.action in ('move', 'delete'): self._delete_on_remote_by_sftp(fileinfo) # Source and target are different remotes, FTP on both enabled. elif ( - server != self.server - and isinstance(server, Server) + self.file_server != self.server + and isinstance(self.file_server, Server) and isinstance(self.server, Server) - and server.ftp is True + and self.file_server.ftp is True and self.server.ftp is True ): - logger.debug('From remote by FTP to remote by FTP') if self.action in ('copy', 'move'): self._copy_from_remote_to_remote_by_ftp(fileinfo) if self.action in ('move', 'delete'): @@ -1815,13 +1834,12 @@ def process(self, fileinfo): # Source and target are different remotes, FTP enabled on source, # SFTP enabled on target. elif ( - server != self.server - and isinstance(server, Server) + self.file_server != self.server + and isinstance(self.file_server, Server) and isinstance(self.server, Server) - and server.ftp is True + and self.file_server.ftp is True and self.server.sftp is True ): - logger.debug('From remote by FTP to remote by SFTP') if self.action in ('copy', 'move'): self._copy_from_remote_by_ftp_to_remote_by_sftp(fileinfo) if self.action in ('move', 'delete'): @@ -1829,38 +1847,38 @@ def process(self, fileinfo): # Source and target are different remotes, SFTP enabled on source, # FTP enabled on target. elif ( - server != self.server - and isinstance(server, Server) + self.file_server != self.server + and isinstance(self.file_server, Server) and isinstance(self.server, Server) - and server.sftp is True + and self.file_server.sftp is True and self.server.ftp is True ): - logger.debug('From remote by SFTP to remote by FTP') if self.action in ('copy', 'move'): self._copy_from_remote_by_sftp_to_remote_by_ftp(fileinfo) if self.action in ('move', 'delete'): self._delete_on_remote_by_sftp(fileinfo) except Exception: logger.warning() - pass - pass def prepare(self): """Prepare model for ETL operation.""" self.logger = self.createlog() - pass - def load(self, step, dataset): - """Load files into file manager and process them.""" - for fileinfo in dataset: + def execute(self, step): + """Execute file manager action.""" + for fileinfo in self.filter(): + files_number = 1 + bytes_volume = fileinfo['size'] + step.files_read = files_number + step.bytes_read = bytes_volume self.startlog(step, fileinfo) self.process(fileinfo) self.endlog() - pass + step.files_written = files_number + step.bytes_written = bytes_volume def _copy_on_localhost(self, fileinfo): for path in self.dest: - logger.debug(f'To {path}') if fileinfo['isfile'] is True: source = fileinfo['path'] dir = fileinfo['dir'] if self.nodir is False else '.' @@ -1877,7 +1895,7 @@ def _copy_on_localhost(self, fileinfo): fho.write(fhi.read()) if temp is not None: os.rename(temp, dest) - logger.info(f'Unzipped {source} to {dest}') + logger.info(f'File {source} unzipped to {dest}') continue if self.zip == 'gz' or self.zip is True: dest = f'{dest}.gz' @@ -1887,21 +1905,18 @@ def _copy_on_localhost(self, fileinfo): fho.write(fhi.read()) if temp is not None: os.rename(temp, dest) - logger.info(f'Zipped {source} to {dest}') + logger.info(f'File {source} zipped to {dest}') continue temp = f'{dest}.tmp' if self.tempname is True else None shutil.copyfile(source, temp or dest) if temp is not None: os.rename(temp, dest) - logger.info(f'Copied {source} to {dest}') - pass + logger.info(f'File {source} copied to {dest}') def _copy_on_remote_by_ssh(self, fileinfo): - remote = fileinfo['server'] - conn = remote.connect() + conn = self.file_server.connect() for path in self.dest: if fileinfo['isfile'] is True: - logger.debug(f'To {path}') source = fileinfo['path'] dir = fileinfo['dir'] if self.nodir is False else '.' file = fileinfo['file'] @@ -1914,8 +1929,7 @@ def _copy_on_remote_by_ssh(self, fileinfo): conn.execute(f'gunzip -c {source} > {temp or dest}') if temp is not None: conn.execute(f'mv {temp} {dest}') - logger.info(f'Unzipped {remote}:{source} ' - f'to {remote}:{dest}') + logger.info(f'File {source} unzipped to {dest}') continue if self.zip == 'gz' or self.zip is True: dest = f'{dest}.gz' @@ -1923,33 +1937,28 @@ def _copy_on_remote_by_ssh(self, fileinfo): conn.execute(f'gzip -c {source} > {temp or dest}') if temp is not None: conn.execute(f'mv {temp} {dest}') - logger.info(f'Zipped {remote}:{source} to {remote}:{dest}') + logger.info(f'File {source} zipped to {dest}') continue temp = f'{dest}.tmp' if self.tempname is True else None conn.execute(f'cp {source} {temp or dest}') if temp is not None: conn.execute(f'mv {temp} {dest}') - logger.info(f'Copied {conn}:{source} to {conn}:{dest}') - pass + logger.info(f'File {source} copied to {dest}') def _copy_on_remote_by_sftp(self, fileinfo): self._copy_from_remote_to_remote_by_sftp(fileinfo) - pass def _copy_on_remote_by_ftp(self, fileinfo): self._copy_from_remote_to_remote_by_ftp(fileinfo) - pass def _copy_from_remote_to_localhost_by_sftp(self, fileinfo): isfile = fileinfo['isfile'] if isfile is True: - remote = fileinfo['server'] - conn = remote.connect() + conn = self.file_server.connect() source = fileinfo['path'] dir = fileinfo['dir'] if self.nodir is False else '.' file = fileinfo['file'] for path in self.dest: - logger.debug(f'To {path}') dest = os.path.normpath(os.path.join(path, dir)) if os.path.exists(dest) is False: os.makedirs(dest) @@ -1965,7 +1974,7 @@ def _copy_from_remote_to_localhost_by_sftp(self, fileinfo): shutil.copyfileobj(gz, fho) if temp is not None: os.rename(temp, dest) - logger.info(f'Unzipped {remote}:{source} to {dest}') + logger.info(f'File {source} unzipped to {dest}') continue if self.zip == 'gz' or self.zip is True: dest = f'{dest}.gz' @@ -1974,27 +1983,24 @@ def _copy_from_remote_to_localhost_by_sftp(self, fileinfo): conn.sftp.getfo(source, fho) if temp is not None: os.rename(temp, dest) - logger.info(f'Zipped {remote}:{source} to {dest}') + logger.info(f'File {source} zipped to {dest}') continue temp = f'{dest}.tmp' if self.tempname is True else None conn.sftp.get(source, temp or dest) if temp is not None: os.rename(temp, dest) - logger.info(f'Copied {remote}:{source} to {dest}') - pass + logger.info(f'File {source} copied to {dest}') def _copy_from_localhost_to_remote_by_sftp(self, fileinfo): isfile = fileinfo['isfile'] if isfile is True: - remote = self.server - conn = remote.connect() + conn = self.server.connect() source = fileinfo['path'] dir = fileinfo['dir'] if self.nodir is False else '.' file = fileinfo['file'] for path in self.dest: - logger.debug(f'To {path}') dest = conn.sftp.normalize(f'{path}/{dir}') - if remote.exists(dest) is False: + if self.server.exists(dest) is False: conn.sftp.mkdir(dest) dest = conn.sftp.normalize(f'{dest}/{file}') if self.unzip == 'gz' or self.unzip is True: @@ -2004,7 +2010,7 @@ def _copy_from_localhost_to_remote_by_sftp(self, fileinfo): conn.sftp.putfo(fhi, temp or dest) if temp is not None: conn.sftp.rename(temp, dest) - logger.info(f'Unzipped {source} to {remote}:{dest}') + logger.info(f'File {source} unzipped to {dest}') continue if self.zip == 'gz' or self.zip is True: dest = f'{dest}.gz' @@ -2018,27 +2024,25 @@ def _copy_from_localhost_to_remote_by_sftp(self, fileinfo): conn.sftp.putfo(fho, temp or dest) if temp is not None: conn.sftp.rename(temp, dest) - logger.info(f'Zipped {source} to {remote}:{dest}') + logger.info(f'File {source} zipped to {dest}') continue temp = f'{dest}.tmp' if self.tempname is True else None conn.sftp.put(source, temp or dest) if temp is not None: conn.sftp.rename(temp, dest) - logger.info(f'Copied {source} to {remote}:{dest}') - pass + logger.info(f'File {source} copied to {dest}') def _copy_from_remote_to_remote_by_sftp(self, fileinfo): isfile = fileinfo['isfile'] if isfile is True: - remote_in = fileinfo['server'] - conn_in = remote_in.connect() + remote_in = self.file_server remote_out = self.server + conn_in = remote_in.connect() conn_out = remote_out.connect() source = fileinfo['path'] dir = fileinfo['dir'] if self.nodir is False else '.' file = fileinfo['file'] for path in self.dest: - logger.debug(f'To {path}') dest = conn_out.sftp.normalize(f'{path}/{dir}') if remote_out.exists(dest) is False: conn_out.sftp.mkdir(dest) @@ -2053,8 +2057,7 @@ def _copy_from_remote_to_remote_by_sftp(self, fileinfo): conn_out.sftp.putfo(gz, temp or dest) if temp is not None: conn_out.sftp.rename(temp, dest) - logger.info(f'Unzipped {remote_in}:{source} to ' - f'{remote_out}:{dest}') + logger.info(f'File {source} unzipped to {dest}') continue if self.zip == 'gz' or self.zip is True: dest = f'{dest}.gz' @@ -2067,8 +2070,7 @@ def _copy_from_remote_to_remote_by_sftp(self, fileinfo): conn_out.sftp.putfo(th, temp or dest) if temp is not None: conn_out.sftp.rename(temp, dest) - logger.info(f'Zipped {remote_in}:{source} to ' - f'{remote_out}:{dest}') + logger.info(f'File {source} zipped to {dest}') continue temp = f'{dest}.tmp' if self.tempname is True else None with tempfile.TemporaryFile() as th: @@ -2077,22 +2079,18 @@ def _copy_from_remote_to_remote_by_sftp(self, fileinfo): conn_out.sftp.putfo(th, temp or dest) if temp is not None: conn_out.sftp.rename(temp, dest) - logger.info(f'Copied {remote_in}:{source} to ' - f'{remote_out}:{dest}') - pass + logger.info(f'File {source} copied to {dest}') def _copy_from_localhost_to_remote_by_ftp(self, fileinfo): isfile = fileinfo['isfile'] if isfile is True: - remote = self.server - conn = remote.connect() + conn = self.server.connect() source = fileinfo['path'] dir = fileinfo['dir'] if self.nodir is False else '.' file = fileinfo['file'] for path in self.dest: - logger.debug(f'To {path}') dest = f'{path}/{dir}' - if remote.exists(dest) is False: + if self.server.exists(dest) is False: conn.ftp.mkd(dest) dest = f'{dest}/{file}' if self.unzip == 'gz' or self.unzip is True: @@ -2105,7 +2103,7 @@ def _copy_from_localhost_to_remote_by_ftp(self, fileinfo): conn.ftp.storbinary(f'STOR {temp or dest}', fho) if temp is not None: conn.ftp.rename(temp, dest) - logger.info(f'Unzipped {source} to {remote}:{dest}') + logger.info(f'File {source} unzipped to {dest}') continue if self.zip == 'gz' or self.zip is True: dest = f'{dest}.gz' @@ -2119,26 +2117,23 @@ def _copy_from_localhost_to_remote_by_ftp(self, fileinfo): conn.ftp.storbinary(f'STOR {temp or dest}', fho) if temp is not None: conn.ftp.rename(temp, dest) - logger.info(f'Zipped {source} to {remote}:{dest}') + logger.info(f'File {source} zipped to {dest}') continue temp = f'{dest}.tmp' if self.tempname is True else None with open(source, 'rb') as fhi: conn.ftp.storbinary(f'STOR {temp or dest}', fhi) if temp is not None: conn.ftp.rename(temp, dest) - logger.info(f'Copied {source} to {remote}:{dest}') - pass + logger.info(f'File {source} copied to {dest}') def _copy_from_remote_to_localhost_by_ftp(self, fileinfo): isfile = fileinfo['isfile'] if isfile is True: - remote = fileinfo['server'] - conn = remote.connect() + conn = self.file_server.connect() source = fileinfo['path'] dir = fileinfo['dir'] if self.nodir is False else '.' file = fileinfo['file'] for path in self.dest: - logger.debug(f'To {path}') dest = os.path.normpath(os.path.join(path, dir)) if os.path.exists(dest) is False: os.makedirs(dest) @@ -2154,7 +2149,7 @@ def _copy_from_remote_to_localhost_by_ftp(self, fileinfo): shutil.copyfileobj(gz, fho) if temp is not None: os.rename(temp, dest) - logger.info(f'Unzipped {remote}:{source} to {dest}') + logger.info(f'File {source} unzipped to {dest}') continue if self.zip == 'gz' or self.zip is True: dest = f'{dest}.gz' @@ -2166,28 +2161,26 @@ def _copy_from_remote_to_localhost_by_ftp(self, fileinfo): shutil.copyfileobj(fhi, fho) if temp is not None: os.rename(temp, dest) - logger.info(f'Zipped {remote}:{source} to {dest}') + logger.info(f'File {source} zipped to {dest}') continue temp = f'{dest}.tmp' if self.tempname is True else None with open(temp or dest, 'wb') as fho: conn.ftp.retrbinary(f'RETR {source}', fho.write) if temp is not None: os.rename(temp, dest) - logger.info(f'Copied {remote}:{source} to {dest}') - pass + logger.info(f'File {source} copied to {dest}') def _copy_from_remote_to_remote_by_ftp(self, fileinfo): isfile = fileinfo['isfile'] if isfile is True: - remote_in = fileinfo['server'] - conn_in = remote_in.connect() + remote_in = self.file_server remote_out = self.server + conn_in = remote_in.connect() conn_out = remote_out.connect() source = fileinfo['path'] dir = fileinfo['dir'] if self.nodir is False else '.' file = fileinfo['file'] for path in self.dest: - logger.debug(f'To {path}') dest = f'{path}/{dir}' if remote_out.exists(dest) is False: conn_out.ftp.mkd(dest) @@ -2202,8 +2195,7 @@ def _copy_from_remote_to_remote_by_ftp(self, fileinfo): conn_out.ftp.storbinary(f'STOR {temp or dest}', gz) if temp is not None: conn_out.ftp.rename(temp, dest) - logger.info(f'Unzipped {remote_in}:{source} to ' - f'{remote_out}:{dest}') + logger.info(f'File {source} unzipped to {dest}') continue if self.zip == 'gz' or self.zip is True: dest = f'{dest}.gz' @@ -2216,8 +2208,7 @@ def _copy_from_remote_to_remote_by_ftp(self, fileinfo): conn_out.ftp.storbinary(f'STOR {temp or dest}', th) if temp is not None: conn_out.ftp.rename(temp, dest) - logger.info(f'Zipped {remote_in}:{source} to ' - f'{remote_out}:{dest}') + logger.info(f'File {source} zipped to {dest}') continue temp = f'{dest}.tmp' if self.tempname is True else None with tempfile.TemporaryFile() as th: @@ -2226,22 +2217,19 @@ def _copy_from_remote_to_remote_by_ftp(self, fileinfo): conn_out.ftp.storbinary(f'STOR {temp or dest}', th) if temp is not None: conn_out.ftp.rename(temp, dest) - logger.info(f'Copied {remote_in}:{source} ' - f'to {remote_out}:{dest}') - pass + logger.info(f'File {source} copied to {dest}') def _copy_from_remote_by_ftp_to_remote_by_sftp(self, fileinfo): isfile = fileinfo['isfile'] if isfile is True: - remote_in = fileinfo['server'] - conn_in = remote_in.connect() + remote_in = self.file_server remote_out = self.server + conn_in = remote_in.connect() conn_out = remote_out.connect() source = fileinfo['path'] dir = fileinfo['dir'] if self.nodir is False else '.' file = fileinfo['file'] for path in self.dest: - logger.debug(f'To {path}') dest = conn_out.sftp.normalize(f'{path}/{dir}') if remote_out.exists(dest) is False: conn_out.sftp.mkdir(dest) @@ -2256,8 +2244,7 @@ def _copy_from_remote_by_ftp_to_remote_by_sftp(self, fileinfo): conn_out.sftp.putfo(gz, temp or dest) if temp is not None: conn_out.sftp.rename(temp, dest) - logger.info(f'Unzipped {remote_in}:{source} to ' - f'{remote_out}:{dest}') + logger.info(f'File {source} unzipped to {dest}') continue if self.zip == 'gz' or self.zip is True: dest = f'{dest}.gz' @@ -2270,8 +2257,7 @@ def _copy_from_remote_by_ftp_to_remote_by_sftp(self, fileinfo): conn_out.sftp.putfo(fhi, temp or dest) if temp is not None: conn_out.sftp.rename(temp, dest) - logger.info(f'Zipped {remote_in}:{source} to ' - f'{remote_out}:{dest}') + logger.info(f'File {source} zipped to {dest}') continue temp = f'{dest}.tmp' if self.tempname is True else None with tempfile.TemporaryFile() as th: @@ -2280,9 +2266,7 @@ def _copy_from_remote_by_ftp_to_remote_by_sftp(self, fileinfo): conn_out.sftp.putfo(th, temp or dest) if temp is not None: conn_out.sftp.rename(temp, dest) - logger.info(f'Copied {remote_in}:{source} ' - f'to {remote_out}:{dest}') - pass + logger.info(f'File {source} copied to {dest}') def _copy_from_remote_by_sftp_to_remote_by_ftp(self, fileinfo): isfile = fileinfo['isfile'] @@ -2295,7 +2279,6 @@ def _copy_from_remote_by_sftp_to_remote_by_ftp(self, fileinfo): dir = fileinfo['dir'] if self.nodir is False else '.' file = fileinfo['file'] for path in self.dest: - logger.debug(f'To {path}') dest = f'{path}/{dir}' if remote_out.exists(dest) is False: conn_out.ftp.mkd(dest) @@ -2310,8 +2293,7 @@ def _copy_from_remote_by_sftp_to_remote_by_ftp(self, fileinfo): conn_out.ftp.storbinary(f'STOR {temp or dest}', gz) if temp is not None: conn_out.ftp.rename(temp, dest) - logger.info(f'Unzipped {remote_in}:{source} to ' - f'{remote_out}:{dest}') + logger.info(f'File {source} unzipped to {dest}') continue if self.zip == 'gz' or self.zip is True: dest = f'{dest}.gz' @@ -2324,8 +2306,7 @@ def _copy_from_remote_by_sftp_to_remote_by_ftp(self, fileinfo): conn_out.ftp.storbinary(f'STOR {temp or dest}', fho) if temp is not None: conn_out.ftp.rename(temp, dest) - logger.info(f'Zipped {remote_in}:{source} to ' - f'{remote_out}:{dest}') + logger.info(f'File {source} zipped to {dest}') continue temp = f'{dest}.tmp' if self.tempname is True else None with tempfile.TemporaryFile() as th: @@ -2334,17 +2315,14 @@ def _copy_from_remote_by_sftp_to_remote_by_ftp(self, fileinfo): conn_out.ftp.storbinary(f'STOR {dest}', th) if temp is not None: conn_out.ftp.rename(temp, dest) - logger.info(f'Copied {remote_in}:{source} ' - f'to {remote_out}:{dest}') - pass + logger.info(f'File {source} copied to {dest}') def _delete_on_localhost(self, fileinfo): path = fileinfo['path'] isfile = fileinfo['isfile'] if isfile is True: os.remove(path) - logger.info(f'Deleted {path}') - pass + logger.info(f'File {path} deleted') def _delete_on_remote_by_ssh(self, fileinfo): remote = fileinfo['server'] @@ -2352,8 +2330,7 @@ def _delete_on_remote_by_ssh(self, fileinfo): isfile = fileinfo['isfile'] if isfile is True: remote.execute(f'rm -f {path}') - logger.info(f'Deleted {remote}:{path}') - pass + logger.info(f'File {path} deleted') def _delete_on_remote_by_sftp(self, fileinfo): remote = fileinfo['server'] @@ -2361,8 +2338,7 @@ def _delete_on_remote_by_sftp(self, fileinfo): isfile = fileinfo['isfile'] if isfile is True: remote.sftp.remove(path) - logger.info(f'Deleted {remote}:{path}') - pass + logger.info(f'File {path} deleted') def _delete_on_remote_by_ftp(self, fileinfo): remote = fileinfo['server'] @@ -2370,7 +2346,6 @@ def _delete_on_remote_by_ftp(self, fileinfo): isfile = fileinfo['isfile'] if isfile is True: remote.ftp.delete(path) - logger.info(f'Deleted {remote}:{path}') - pass + logger.info(f'File {path} deleted') pass