-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #507 from TeskaLabs/feature/library-libsreg
Libraries registry provider for `asab.library`
- Loading branch information
Showing
6 changed files
with
227 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
import os | ||
import shutil | ||
import filecmp | ||
|
||
|
||
def _remove_path(path):
    """Remove 'path', whether it is a file, a symbolic link or a directory tree."""
    if os.path.islink(path) or not os.path.isdir(path):
        # Symbolic links (even those pointing to directories) must be unlinked;
        # shutil.rmtree() refuses to operate on them.
        os.remove(path)
    else:
        shutil.rmtree(path)


def synchronize_dirs(target, source):
    """
    Synchronize the 'source' directory into the 'target' directory.

    The 'source' directory remains unchanged.

    1. Recursively walk through the "source" directory and compare files with the "target".
    2. If a file exists in "source" but not in "target", copy it.
    3. If a file exists in both but has been modified in "source", copy it to overwrite the one in "target".
    4. If a directory or file exists in "target" but not in "source", remove it.
    5. If an entry is a file on one side and a directory on the other,
       the "target" entry is replaced by the kind found in "source".

    :param target: Path of the directory to be updated; it is created if missing.
    :param source: Path of the directory that serves as the template.
    """

    # Ensure target directory exists
    os.makedirs(target, exist_ok=True)

    # Step 1: Recursively copy files from source to target
    for dirpath, _dirnames, filenames in os.walk(source):
        # Compute relative path to the source base
        relpath = os.path.relpath(dirpath, source)
        target_dir = os.path.join(target, relpath)

        # A file (or a dangling link) occupying the place of a directory
        # has to go away first, otherwise nothing can be copied beneath it.
        if os.path.lexists(target_dir) and not os.path.isdir(target_dir):
            os.remove(target_dir)

        # Create directories in target if they don't exist
        os.makedirs(target_dir, exist_ok=True)

        # Check files and synchronize
        for filename in filenames:
            source_file = os.path.join(dirpath, filename)
            target_file = os.path.join(target_dir, filename)

            # A directory occupying the place of a file must be removed,
            # otherwise shutil.copy2() would copy the file *into* that directory.
            if os.path.isdir(target_file):
                _remove_path(target_file)

            # Copy if the file doesn't exist in target or if it's modified
            if not os.path.exists(target_file) or not filecmp.cmp(source_file, target_file, shallow=False):
                shutil.copy2(source_file, target_file)

    # Step 2: Recursively delete files/folders in target that don't exist in source
    # topdown=False walks depth-first, so children are handled before their parents
    for dirpath, dirnames, filenames in os.walk(target, topdown=False):
        relpath = os.path.relpath(dirpath, target)
        source_dir = os.path.join(source, relpath)

        # Check and remove files not in source
        for filename in filenames:
            if not os.path.exists(os.path.join(source_dir, filename)):
                os.remove(os.path.join(dirpath, filename))

        # Check and remove directories not in source
        for dirname in dirnames:
            if not os.path.exists(os.path.join(source_dir, dirname)):
                _remove_path(os.path.join(dirpath, dirname))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
import os | ||
import logging | ||
import hashlib | ||
import random | ||
import tarfile | ||
import tempfile | ||
import urllib.parse | ||
import shutil | ||
|
||
import aiohttp | ||
|
||
from .filesystem import FileSystemLibraryProvider | ||
from ..dirsync import synchronize_dirs | ||
|
||
# | ||
|
||
L = logging.getLogger(__name__) | ||
|
||
# | ||
|
||
|
||
class LibsRegLibraryProvider(FileSystemLibraryProvider):
    """
    Read-only provider to read from remote "library repository".

    It provides an option to specify more servers for more reliable content delivery.

    The library is downloaded as a `.tar.xz` archive, unpacked and mirrored into
    a local directory under the system temporary directory; that local directory
    is then served through `FileSystemLibraryProvider`.

    Example of the configuration:

    ```ini
    [library]
    providers=
        ...
        libsreg+https://libsreg.z6.web.core.windows.net,libsreg-secondary.z6.web.core.windows.net/lmio-common-library
        ...
    ```

    The URL fragment, if present, selects the version of the archive
    (`master` when the fragment is empty).
    """

    def __init__(self, library, path, layer):
        """
        Parse the `libsreg+...` URL, prepare the local cache directory and schedule pulls.

        :param library: Passed through to `FileSystemLibraryProvider`.
        :param path: Provider URL, the scheme has to start with `libsreg+`.
        :param layer: Passed through to `FileSystemLibraryProvider`.
        """
        url = urllib.parse.urlparse(path)
        assert url.scheme.startswith('libsreg+')

        version = url.fragment
        if version == '':
            version = 'master'

        # Drop the leading slash of the URL path
        archname = url.path[1:]

        # The network location may list several comma-separated servers,
        # each of them yields one download URL of the same archive.
        self.URLs = ["{scheme}://{netloc}/{archname}/{archname}-{version}.tar.xz".format(
            scheme=url.scheme[len('libsreg+'):],
            netloc=netloc,
            archname=archname,
            version=version,
        ) for netloc in url.netloc.split(',')]
        assert len(self.URLs) > 0

        tempdir = tempfile.gettempdir()
        self.RootPath = os.path.join(
            tempdir,
            "asab.library.libsreg",
        )

        # The local copy is keyed by the hash of the first URL
        self.RepoPath = os.path.join(
            self.RootPath,
            hashlib.sha256(self.URLs[0].encode('utf-8')).hexdigest()
        )

        os.makedirs(self.RepoPath, exist_ok=True)

        # NOTE(review): the provider is created with set_ready=False and nothing
        # in this class marks it ready afterwards - confirm against the base class.
        super().__init__(library, self.RepoPath, layer, set_ready=False)

        self.PullLock = False

        # TODO: Subscription to changes in the library
        self.SubscribedPaths = set()

        # Pull once right away and then on every "Application.tick/60!" message
        self.App.TaskService.schedule(self._periodic_pull(None))
        self.App.PubSub.subscribe("Application.tick/60!", self._periodic_pull)

    async def _periodic_pull(self, event_name):
        """
        Pull changes from the remote repository.

        It is invoked on the "Application.tick/60!" PubSub message.
        `PullLock` flag ensures that only if previous "pull" has finished, new one can start.

        :param event_name: Name of the PubSub message (unused, `None` for the initial pull).
        """
        if self.PullLock:
            return

        self.PullLock = True

        try:
            headers = {}

            # Check for existing E-Tag
            etag_fname = os.path.join(self.RepoPath, "etag")
            try:
                with open(etag_fname, 'r') as f:
                    etag = f.read().strip()
            except FileNotFoundError:
                etag = ''

            # An empty E-Tag would produce a meaningless conditional request
            if etag != '':
                headers['If-None-Match'] = etag

            url = random.choice(self.URLs)

            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers) as response:

                    if response.status == 200:
                        # The response carries a version that we don't have yet
                        await self._download_and_install(response, etag_fname)

                    elif response.status == 304:
                        # The repository has not changed ...
                        pass

                    else:
                        # There is no active exception here, so L.error() (not L.exception()) is used
                        L.error("Failed to download the library.", struct_data={"url": url, 'status': response.status})

        finally:
            self.PullLock = False

    async def _download_and_install(self, response, etag_fname):
        """
        Store the archive from `response`, unpack it and mirror it into `RepoPath`.

        The scratch archive and the scratch directory are removed even when
        the download, the extraction or the synchronization fails.

        :param response: HTTP response with the archive in its body.
        :param etag_fname: Path of the file where the received E-Tag is stored.
        """
        etag_incoming = response.headers.get('ETag')

        # Scratch locations are derived from RepoPath so that several providers
        # sharing RootPath don't overwrite each other's downloads.
        newtarfname = self.RepoPath + ".new.tar.xz"
        temp_extract_dir = self.RepoPath + ".new"

        try:
            # Download new version
            with open(newtarfname, 'wb') as ftmp:
                while True:
                    chunk = await response.content.read(16 * 1024)
                    if not chunk:
                        break
                    ftmp.write(chunk)

            # Remove temp_extract_dir if it exists (from the last, failed run)
            if os.path.exists(temp_extract_dir):
                shutil.rmtree(temp_extract_dir)

            # Extract the contents to the temporary directory
            with tarfile.open(newtarfname, mode='r:xz') as tar:
                if hasattr(tarfile, 'data_filter'):
                    # The archive comes from the network; the 'data' filter rejects
                    # members that would be written outside of the destination.
                    tar.extractall(temp_extract_dir, filter='data')
                else:
                    # Python without extraction filters
                    tar.extractall(temp_extract_dir)

            # Synchronize the directories
            synchronize_dirs(self.RepoPath, temp_extract_dir)

            # The E-Tag is stored only after a successful synchronization,
            # so a failed pull is retried with the next request.
            if etag_incoming is not None:
                with open(etag_fname, 'w') as f:
                    f.write(etag_incoming)

        finally:
            # Remove temp_extract_dir
            if os.path.exists(temp_extract_dir):
                shutil.rmtree(temp_extract_dir)

            # Remove newtarfname
            if os.path.exists(newtarfname):
                os.remove(newtarfname)

    async def subscribe(self, path):
        """
        Remember `path` in `SubscribedPaths`.

        :param path: Library path to be recorded.
        """
        self.SubscribedPaths.add(path)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters