Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Libraries registry provider for asab.library #507

Merged
merged 25 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
python-version: ["3.8", "3.9", "3.10", "3.11"]

steps:
- uses: actions/checkout@v3
Expand Down
59 changes: 59 additions & 0 deletions asab/library/dirsync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import os
import shutil
import filecmp


def synchronize_dirs(target, source):
	'''
	Synchronizes 'source' directory into 'target' directory.
	The 'source' directory remains unchanged.

	1. Recursively walk through the "source" directory and compare files with the "target".
	2. If a file exists in "source" but not in "target", copy it.
	3. If a file exists in both but has been modified in "source", copy it to overwrite the one in "target".
	4. If a directory or file exists in "target" but not in "source", remove it.

	A path that exists as a file in one tree and as a directory in the other is
	resolved in favour of "source": the conflicting entry in "target" is removed
	before the copy.
	'''

	# Ensure target directory exists
	os.makedirs(target, exist_ok=True)

	# Step 1: Recursively copy files from source to target
	for dirpath, dirnames, filenames in os.walk(source):
		# Compute relative path to the source base
		relpath = os.path.relpath(dirpath, source)
		target_dir = os.path.join(target, relpath)

		# If the target path is occupied by a regular file (or symlink) where a
		# directory is needed, remove it so the directory can be created.
		if os.path.exists(target_dir) and not os.path.isdir(target_dir):
			os.remove(target_dir)

		# Create directories in target if they don't exist
		os.makedirs(target_dir, exist_ok=True)

		# Check files and synchronize
		for filename in filenames:
			source_file = os.path.join(dirpath, filename)
			target_file = os.path.join(target_dir, filename)

			# If a directory is in the way of a file, remove it (source wins).
			if os.path.isdir(target_file):
				shutil.rmtree(target_file)

			# Copy if the file doesn't exist in target or if it's modified
			if not os.path.exists(target_file) or not filecmp.cmp(source_file, target_file, shallow=False):
				shutil.copy2(source_file, target_file)

	# Step 2: Recursively delete files/folders in target that don't exist in source
	for dirpath, dirnames, filenames in os.walk(target, topdown=False):  # topdown=False for depth-first
		relpath = os.path.relpath(dirpath, target)
		source_dir = os.path.join(source, relpath)

		# Check and remove files not in source
		for filename in filenames:
			target_file = os.path.join(dirpath, filename)
			source_file = os.path.join(source_dir, filename)

			if not os.path.exists(source_file):
				os.remove(target_file)

		# Check and remove directories not in source
		for dirname in dirnames:
			target_subdir = os.path.join(dirpath, dirname)
			source_subdir = os.path.join(source_dir, dirname)

			if not os.path.exists(source_subdir):
				shutil.rmtree(target_subdir)
2 changes: 0 additions & 2 deletions asab/library/providers/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,4 @@ def _do_pull(self):
return to_publish

async def subscribe(self, path):
if not os.path.isdir(self.BasePath + path):
return
self.SubscribedPaths.add(path)
139 changes: 139 additions & 0 deletions asab/library/providers/libsreg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import os
import logging
import hashlib
import random
import tarfile
import tempfile
import urllib.parse

import aiohttp

from .filesystem import FileSystemLibraryProvider
from ..dirsync import synchronize_dirs

#

L = logging.getLogger(__name__)

#


class LibsRegLibraryProvider(FileSystemLibraryProvider):
	"""
	Read-only provider to read from remote "library repository".

	It provides an option to specify more servers for more reliable content delivery.

	Example of the configuration:

	```ini
	[library]
	providers=
		...
		libsreg+https://libsreg.z6.web.core.windows.net,libsreg-secondary.z6.web.core.windows.net/lmio-common-library
		...
	```

	"""

	def __init__(self, library, path, layer):

		url = urllib.parse.urlparse(path)
		assert url.scheme.startswith('libsreg+')

		# The URL fragment selects the archive version; default to 'master'.
		version = url.fragment
		if version == '':
			version = 'master'

		# Strip the leading '/' from the URL path to obtain the archive name.
		archname = url.path[1:]

		# Build one download URL per configured server (comma-separated netloc).
		# 'libsreg+' (8 chars) is stripped from the scheme to get the real one.
		self.URLs = ["{scheme}://{netloc}/{archname}/{archname}-{version}.tar.xz".format(
			scheme=url.scheme[8:],
			netloc=netloc,
			archname=archname,
			version=version,
		) for netloc in url.netloc.split(',')]
		assert len(self.URLs) > 0

		# Local cache directory is derived from the first URL so that the same
		# configuration maps to the same directory across restarts.
		tempdir = tempfile.gettempdir()
		self.RepoPath = os.path.join(
			tempdir,
			"asab.library.libsreg",
			hashlib.sha256(self.URLs[0].encode('utf-8')).hexdigest()
		)

		os.makedirs(self.RepoPath, exist_ok=True)

		super().__init__(library, self.RepoPath, layer, set_ready=False)

		self.PullLock = False

		# TODO: Subscription to changes in the library
		self.SubscribedPaths = set()

		self.App.TaskService.schedule(self._periodic_pull(None))
		self.App.PubSub.subscribe("Application.tick/60!", self._periodic_pull)

	async def _periodic_pull(self, event_name):
		"""
		Changes in remote repository are being pulled every minute. `PullLock` flag ensures that only if previous "pull" has finished, new one can start.
		"""
		if self.PullLock:
			return

		self.PullLock = True

		try:
			headers = {}

			# Check for existing E-Tag
			etag_fname = os.path.join(self.RepoPath, "etag")
			try:
				with open(etag_fname, 'r') as f:
					etag = f.read().strip()
					headers['If-None-Match'] = etag
			except FileNotFoundError:
				pass

			# Pick a random server to spread the load across mirrors.
			url = random.choice(self.URLs)

			async with aiohttp.ClientSession() as session:
				async with session.get(url, headers=headers) as response:

					if response.status == 200:  # The request indicates a new version that we don't have yet

						etag_incoming = response.headers.get('ETag')

						# Create a separate temporary directory for extraction
						with tempfile.TemporaryDirectory() as temp_extract_dir:
							fname = os.path.join(temp_extract_dir, "new.tar.xz")
							with open(fname, 'wb') as ftmp:
								while True:
									chunk = await response.content.read(16 * 1024)
									if not chunk:
										break
									ftmp.write(chunk)

							# Extract the contents to the temporary directory
							# NOTE(review): extractall() trusts member paths; a malicious
							# archive could escape temp_extract_dir ("tar slip"). Consider
							# tarfile's `filter='data'` (Python 3.12+) — confirm servers are trusted.
							with tarfile.open(fname, mode='r:xz') as tar:
								tar.extractall(temp_extract_dir)

							# Synchronize the directories
							synchronize_dirs(self.RepoPath, temp_extract_dir)

							# Persist the new E-Tag only after a successful sync,
							# so a failed sync is retried on the next pull.
							if etag_incoming is not None:
								with open(etag_fname, 'w') as f:
									f.write(etag_incoming)

					elif response.status == 304:
						# The repository has not changed ...
						pass

					else:
						# Not in an `except` block, so `L.exception` would log a bogus
						# "NoneType: None" traceback; `L.error` is the correct level here.
						L.error("Failed to download the library.", struct_data={"url": url, 'status': response.status})

		finally:
			self.PullLock = False


	async def subscribe(self, path):
		# Record the subscribed path; change notification is not implemented yet (see TODO above).
		self.SubscribedPaths.add(path)
6 changes: 6 additions & 0 deletions asab/library/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ def __init__(self, app: Application, service_name: str, paths: typing.Union[str,
for layer, path in enumerate(paths):
# Create library for each layer of paths
self._create_library(path, layer)

app.PubSub.subscribe("Application.tick/60!", self._on_tick60)


async def finalize(self, app):
while len(self.Libraries) > 0:
lib = self.Libraries.pop(-1)
Expand All @@ -119,6 +121,10 @@ def _create_library(self, path, layer):
from .providers.git import GitLibraryProvider
library_provider = GitLibraryProvider(self, path, layer)

elif path.startswith('libsreg+'):
from .providers.libsreg import LibsRegLibraryProvider
library_provider = LibsRegLibraryProvider(self, path, layer)

elif path == '' or path.startswith("#") or path.startswith(";"):
# This is empty or commented line
return
Expand Down
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ def run(self):
platforms='any',
classifiers=[
'Development Status :: 5 - Production/Stable',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
Expand Down