Skip to content

Commit

Permalink
feat: redis storage
Browse files Browse the repository at this point in the history
  • Loading branch information
smotornyuk committed Mar 4, 2024
1 parent 4d5dbc3 commit cced1e8
Show file tree
Hide file tree
Showing 13 changed files with 306 additions and 40 deletions.
38 changes: 38 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
default_install_hook_types:
- pre-commit
- pre-push
- commit-msg

repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
# - id: check-yaml
- id: end-of-file-fixer
stages: [pre-commit]
- id: trailing-whitespace
stages: [pre-commit]
- id: debug-statements
stages: [pre-push]

## Isort
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
name: isort
stages: [pre-commit]

## Black
- repo: https://github.com/psf/black
rev: 23.3.0
hooks:
- id: black
stages: [pre-commit]

# ## Ruff
# - repo: https://github.com/charliermarsh/ruff-pre-commit
# rev: v0.0.260
# hooks:
# - id: ruff
# stages: [pre-commit]
9 changes: 8 additions & 1 deletion ckanext/files/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import ckan.plugins as p
import ckan.plugins.toolkit as tk
from ckan.exceptions import CkanConfigurationException

from ckanext.files import (
cli,
config,
exceptions,
helpers,
interfaces,
shared,
Expand Down Expand Up @@ -39,6 +41,7 @@ def files_get_storage_adapters(self):
adapters = {
"files:fs": storage.FileSystemStorage,
"files:public_fs": storage.PublicFileSystemStorage,
"files:redis": storage.RedisStorage,
}

if hasattr(storage, "GoogleCloudStorage"):
Expand All @@ -56,7 +59,11 @@ def configure(self, config_):

shared.storages.reset()
for name, settings in config.storages().items():
storage = utils.storage_from_settings(settings)
try:
storage = utils.storage_from_settings(settings)
except exceptions.UnknownAdapterError as err:
raise CkanConfigurationException(str(err))

shared.storages.register(name, storage)

# IActions
Expand Down
15 changes: 8 additions & 7 deletions ckanext/files/storage/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
from .base import Capability, Manager, Reader, Storage, Uploader
from .fs import FileSystemStorage, FileSystemUploader, PublicFileSystemStorage
from .fs import FileSystemStorage, PublicFileSystemStorage
from .redis import RedisStorage

try:
from .google_cloud import GoogleCloudStorage
except ImportError:
pass

__all__ = [
"Storage",
"Capability",
"Uploader",
"Manager",
"Reader",
"FileSystemUploader",
"FileSystemStorage",
"PublicFileSystemStorage",
"GoogleCloudStorage",
"Manager",
"PublicFileSystemStorage",
"Reader",
"RedisStorage",
"Storage",
"Uploader",
]
15 changes: 12 additions & 3 deletions ckanext/files/storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,22 @@
from ckanext.files import exceptions, utils

if six.PY3:
from typing_extensions import TYPE_CHECKING, NewType
from typing_extensions import TYPE_CHECKING, NewType, TypedDict

from typing import Any, IO # isort: skip

if TYPE_CHECKING:
from werkzeug.datastructures import FileStorage # isort: skip

MinimalStorageData = TypedDict(
"MinimalStorageData",
{
"content_type": str,
"size": int,
"hash": str,
},
)

CapabilityCluster = NewType("CapabilityCluster", int)
CapabilityUnit = NewType("CapabilityUnit", int)

Expand Down Expand Up @@ -55,7 +64,7 @@ class Uploader(StorageService):

@abc.abstractmethod
def upload(self, name, upload, extras):
# type: (str, FileStorage, dict[str, Any]) -> dict[str, Any]
# type: (str, FileStorage, dict[str, Any]) -> MinimalStorageData
raise NotImplementedError

def initialize_multipart_upload(self, name, extras):
Expand All @@ -67,7 +76,7 @@ def update_multipart_upload(self, upload_data, extras):
raise NotImplementedError

def complete_multipart_upload(self, upload_data, extras):
# type: (dict[str, Any], dict[str, Any]) -> dict[str, Any]
# type: (dict[str, Any], dict[str, Any]) -> MinimalStorageData
raise NotImplementedError


Expand Down
30 changes: 21 additions & 9 deletions ckanext/files/storage/fs.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import hashlib
import logging
import os
import magic
import uuid
import hashlib

import magic
import six
from werkzeug.datastructures import FileStorage

Expand All @@ -14,10 +15,18 @@

if six.PY3:
from typing import Any # isort: skip
from typing_extensions import TypedDict

from .base import MinimalStorageData

FsAdditionalData = TypedDict("FsAdditionalData", {"filename": str})

class FsStorageData(FsAdditionalData, MinimalStorageData):
pass


log = logging.getLogger(__name__)
CHUNK_SIZE = 16_384
CHUNK_SIZE = 16384


class FileSystemUploader(Uploader):
Expand All @@ -27,7 +36,7 @@ class FileSystemUploader(Uploader):
)

def upload(self, name, upload, extras): # pragma: no cover
# type: (str, FileStorage, dict[str, Any]) -> dict[str, Any]
# type: (str, FileStorage, dict[str, Any]) -> FsStorageData
filename = str(uuid.uuid4())
filepath = os.path.join(self.storage.settings["path"], filename)

Expand Down Expand Up @@ -67,7 +76,7 @@ def initialize_multipart_upload(self, name, extras):
if max_size:
utils.ensure_size(upload, max_size)

result = self.upload(name, upload, data)
result = dict(self.upload(name, upload, data))
result["size"] = data["size"]

return result
Expand Down Expand Up @@ -104,7 +113,7 @@ def update_multipart_upload(self, upload_data, extras):
return upload_data

def complete_multipart_upload(self, upload_data, extras):
# type: (dict[str, Any], dict[str, Any]) -> dict[str, Any]
# type: (dict[str, Any], dict[str, Any]) -> FsStorageData
filepath = os.path.join(self.storage.settings["path"], upload_data["filename"])
size = os.path.getsize(filepath)
if size != upload_data["size"]:
Expand All @@ -127,9 +136,12 @@ def complete_multipart_upload(self, upload_data, extras):
md5.update(chunk)
chunk = src.read(CHUNK_SIZE)

upload_data["hash"] = md5.hexdigest()
upload_data["content_type"] = content_type
return upload_data
return {
"filename": upload_data["filename"],
"content_type": content_type,
"size": upload_data["size"],
"hash": md5.hexdigest(),
}


class FileSystemManager(Manager):
Expand Down
20 changes: 15 additions & 5 deletions ckanext/files/storage/google_cloud.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import base64
import os
import re
import uuid
import base64

import requests
import six
from google.cloud.storage import Client
Expand All @@ -16,8 +17,17 @@
if six.PY3:
from typing import TYPE_CHECKING

from typing_extensions import TypedDict

from .base import MinimalStorageData

from typing import Any # isort: skip

GCAdditionalData = TypedDict("GCAdditionalData", {"filename": str})

class GCStorageData(GCAdditionalData, MinimalStorageData):
pass

if TYPE_CHECKING:
from werkzeug.datastructures import FileStorage # isort: skip

Expand All @@ -34,7 +44,7 @@ class GoogleCloudUploader(Uploader):
)

def upload(self, name, upload, extras): # pragma: no cover
# type: (str, FileStorage, dict[str, Any]) -> dict[str, Any]
# type: (str, FileStorage, dict[str, Any]) -> GCStorageData
filename = str(uuid.uuid4())
filepath = os.path.join(self.storage.settings["path"], filename)

Expand All @@ -46,7 +56,7 @@ def upload(self, name, upload, extras): # pragma: no cover
"filename": filename,
"content_type": upload.content_type,
"hash": filehash,
"size": blob.size,
"size": blob.size or upload.content_length,
}

def initialize_multipart_upload(self, name, extras):
Expand Down Expand Up @@ -74,7 +84,7 @@ def initialize_multipart_upload(self, name, extras):
if max_size and data["size"] > max_size:
raise exceptions.LargeUploadError(data["size"], max_size)

url = blob.create_resumable_upload_session(size=data["size"])
url = blob.create_resumable_upload_session(size=data["size"]) # type: str

return {"session_url": url, "size": data["size"], "uploaded": 0}

Expand Down Expand Up @@ -132,7 +142,7 @@ def update_multipart_upload(self, upload_data, extras):
return upload_data

def complete_multipart_upload(self, upload_data, extras):
# type: (dict[str, Any], dict[str, Any]) -> dict[str, Any]
# type: (dict[str, Any], dict[str, Any]) -> GCStorageData
if upload_data["uploaded"] != upload_data["size"]:
raise tk.ValidationError(
{
Expand Down
79 changes: 79 additions & 0 deletions ckanext/files/storage/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import hashlib
import uuid

import six

import ckan.plugins.toolkit as tk
from ckan.lib.redis import connect_to_redis

from ckanext.files import utils

from .base import Capability, Manager, Storage, Uploader

if six.PY3:
from typing import Any # isort: skip
from werkzeug.datastructures import FileStorage # isort: skip
from typing_extensions import TypedDict

from .base import MinimalStorageData

RedisAdditionalData = TypedDict("RedisAdditionalData", {"filename": str})

class RedisStorageData(RedisAdditionalData, MinimalStorageData):
pass


class RedisUploader(Uploader):
storage = None # type: RedisStorage # pyright: ignore

required_options = ["prefix"]
capabilities = utils.combine_capabilities(Capability.CREATE)

def upload(self, name, upload, extras): # pragma: no cover
# type: (str, FileStorage, dict[str, Any]) -> RedisStorageData

filename = str(uuid.uuid4())
key = self.storage.settings["prefix"] + filename

md5 = hashlib.md5()
content = upload.stream.read()
md5.update(content)

self.storage.redis.set(key, content)

return {
"filename": filename,
"content_type": upload.content_type,
"size": len(content),
"hash": md5.hexdigest(),
}


class RedisManager(Manager):
storage = None # type: RedisStorage # pyright: ignore

required_options = ["prefix"]
capabilities = utils.combine_capabilities(Capability.REMOVE)

def remove(self, data):
# type: (dict[str, Any]) -> bool
key = self.storage.settings["prefix"] + data["filename"]
self.storage.redis.delete(key)
return True


class RedisStorage(Storage):
def make_uploader(self):
return RedisUploader(self)

def make_manager(self):
return RedisManager(self)

def __init__(self, **settings):
# type: (**Any) -> None

settings.setdefault(
"prefix", "ckanext:files:{}:file_content:".format(tk.config["ckan.site_id"])
)
super(RedisStorage, self).__init__(**settings)
self.redis = connect_to_redis()
Loading

0 comments on commit cced1e8

Please sign in to comment.