diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 3c785f660..ce015e26b 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -8,7 +8,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"] + python-version: ["3.8", "3.9", "3.10", "3.11"] steps: - uses: actions/checkout@v3 diff --git a/asab/library/dirsync.py b/asab/library/dirsync.py new file mode 100644 index 000000000..858ba4dbe --- /dev/null +++ b/asab/library/dirsync.py @@ -0,0 +1,59 @@ +import os +import shutil +import filecmp + + +def synchronize_dirs(target, source): + ''' + Synchronizes 'source' directory into 'target' directory. + The 'source' directory remains unchanged. + + 1. Recursively walk through the "source" directory and compare files with the "target". + 2. If a file exists in "source" but not in "target", copy it. + 3. If a file exists in both but has been modified in "source", copy it to overwrite the one in "target". + 4. If a directory or file exists in "target" but not in "source", remove it. + ''' + + # Ensure target directory exists + if not os.path.exists(target): + os.makedirs(target) + + # Step 1: Recursively copy files from source to target + for dirpath, dirnames, filenames in os.walk(source): + # Compute relative path to the source base + relpath = os.path.relpath(dirpath, source) + target_dir = os.path.join(target, relpath) + + # Create directories in target if they don't exist + if not os.path.exists(target_dir): + os.makedirs(target_dir) + + # Check files and synchronize + for filename in filenames: + source_file = os.path.join(dirpath, filename) + target_file = os.path.join(target_dir, filename) + + # Copy if the file doesn't exist in target or if it's modified + if not os.path.exists(target_file) or not filecmp.cmp(source_file, target_file, shallow=False): + shutil.copy2(source_file, target_file) + + # Step 2: Recursively delete files/folders in target that don't exist in source + for dirpath, dirnames, filenames in os.walk(target, topdown=False): # topdown=False for depth-first + relpath = os.path.relpath(dirpath, target) + source_dir = os.path.join(source, relpath) + + # Check and remove files not in source + for filename in filenames: + target_file = os.path.join(dirpath, filename) + source_file = os.path.join(source_dir, filename) + + if not os.path.exists(source_file): + os.remove(target_file) + + # Check and remove directories not in source + for dirname in dirnames: + target_subdir = os.path.join(dirpath, dirname) + source_subdir = os.path.join(source_dir, dirname) + + if not os.path.exists(source_subdir): + shutil.rmtree(target_subdir) diff --git a/asab/library/providers/git.py b/asab/library/providers/git.py index 415395874..67532171d 100644 --- a/asab/library/providers/git.py +++ b/asab/library/providers/git.py @@ -217,6 +217,4 @@ def _do_pull(self): return to_publish async def subscribe(self, path): - if not os.path.isdir(self.BasePath + path): - return self.SubscribedPaths.add(path) diff --git a/asab/library/providers/libsreg.py b/asab/library/providers/libsreg.py new file mode 100644 index 000000000..9578630f9 --- /dev/null +++ b/asab/library/providers/libsreg.py @@ -0,0 +1,161 @@ +import os +import logging +import hashlib +import random +import tarfile +import tempfile +import urllib.parse +import shutil + +import aiohttp + +from .filesystem import FileSystemLibraryProvider +from ..dirsync import synchronize_dirs + +# + +L = logging.getLogger(__name__) + +# + + +class LibsRegLibraryProvider(FileSystemLibraryProvider): + """ + Read-only provider to read from remote "library repository". + + It provides an option to specify more servers for more reliable content delivery. + + Example of the configuration: + + ```ini + [library] + providers= + ... + libsreg+https://libsreg.z6.web.core.windows.net,libsreg-secondary.z6.web.core.windows.net/lmio-common-library + ... + ``` + + """ + + def __init__(self, library, path, layer): + + url = urllib.parse.urlparse(path) + assert url.scheme.startswith('libsreg+') + + version = url.fragment + if version == '': + version = 'master' + + archname = url.path[1:] + + self.URLs = ["{scheme}://{netloc}/{archname}/{archname}-{version}.tar.xz".format( + scheme=url.scheme[8:], + netloc=netloc, + archname=archname, + version=version, + ) for netloc in url.netloc.split(',')] + assert len(self.URLs) > 0 + + tempdir = tempfile.gettempdir() + self.RootPath = os.path.join( + tempdir, + "asab.library.libsreg", + ) + + self.RepoPath = os.path.join( + self.RootPath, + hashlib.sha256(self.URLs[0].encode('utf-8')).hexdigest() + ) + + os.makedirs(os.path.join(self.RepoPath), exist_ok=True) + + super().__init__(library, self.RepoPath, layer, set_ready=False) + + self.PullLock = False + + # TODO: Subscribption to changes in the library + self.SubscribedPaths = set() + + self.App.TaskService.schedule(self._periodic_pull(None)) + self.App.PubSub.subscribe("Application.tick/60!", self._periodic_pull) + + async def _periodic_pull(self, event_name): + """ + Changes in remote repository are being pulled every minute. `PullLock` flag ensures that only if previous "pull" has finished, new one can start. + """ + if self.PullLock: + return + + self.PullLock = True + + try: + headers = {} + + # Check for existing E-Tag + etag_fname = os.path.join(self.RepoPath, "etag") + try: + with open(etag_fname, 'r') as f: + etag = f.read().strip() + headers['If-None-Match'] = etag + except FileNotFoundError: + pass + + url = random.choice(self.URLs) + + async with aiohttp.ClientSession() as session: + async with session.get(url, headers=headers) as response: + + if response.status == 200: # The request indicates a new version that we don't have yet + + etag_incoming = response.headers.get('ETag') + + # Download new version + newtarfname = os.path.join(self.RootPath, "new.tar.xz") + with open(newtarfname, 'wb') as ftmp: + while True: + chunk = await response.content.read(16 * 1024) + if not chunk: + break + ftmp.write(chunk) + + # Extract the contents to the temporary directory + temp_extract_dir = os.path.join( + self.RootPath, + "new" + ) + + # Remove temp_extract_dir if it exists (from the last, failed run) + if os.path.exists(temp_extract_dir): + shutil.rmtree(temp_extract_dir) + + # TODO: Remove temp_extract_dir if exists (from last, failed run) + with tarfile.open(newtarfname, mode='r:xz') as tar: + tar.extractall(temp_extract_dir) + + # Synchronize the directories + synchronize_dirs(self.RepoPath, temp_extract_dir) + if etag_incoming is not None: + with open(etag_fname, 'w') as f: + f.write(etag_incoming) + + # Remove temp_extract_dir + if os.path.exists(temp_extract_dir): + shutil.rmtree(temp_extract_dir) + + # Remove newtarfname + if os.path.exists(newtarfname): + os.remove(newtarfname) + + elif response.status == 304: + # The repository has not changed ... + pass + + else: + L.exception("Failed to download the library.", struct_data={"url": url, 'status': response.status}) + + finally: + self.PullLock = False + + + async def subscribe(self, path): + self.SubscribedPaths.add(path) diff --git a/asab/library/service.py b/asab/library/service.py index c2b214e95..323f035f7 100644 --- a/asab/library/service.py +++ b/asab/library/service.py @@ -91,8 +91,10 @@ def __init__(self, app: Application, service_name: str, paths: typing.Union[str, for layer, path in enumerate(paths): # Create library for each layer of paths self._create_library(path, layer) + app.PubSub.subscribe("Application.tick/60!", self._on_tick60) + async def finalize(self, app): while len(self.Libraries) > 0: lib = self.Libraries.pop(-1) @@ -119,6 +121,10 @@ def _create_library(self, path, layer): from .providers.git import GitLibraryProvider library_provider = GitLibraryProvider(self, path, layer) + elif path.startswith('libsreg+'): + from .providers.libsreg import LibsRegLibraryProvider + library_provider = LibsRegLibraryProvider(self, path, layer) + elif path == '' or path.startswith("#") or path.startswith(";"): # This is empty or commented line return diff --git a/setup.py b/setup.py index f9c19fbe2..164256471 100644 --- a/setup.py +++ b/setup.py @@ -62,7 +62,6 @@ def run(self): platforms='any', classifiers=[ 'Development Status :: 5 - Production/Stable', - 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10',