Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[16.0][ADD] fs_image: New field to store your images into external filesystems #274

Merged
merged 7 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions fs_attachment/fs_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,13 @@ def from_fs_attachment(cls, attachment: IrAttachment) -> FsStream:
attachment.ensure_one()
if not attachment.fs_filename:
raise ValueError("Attachment is not stored into a filesystem storage")
size = 0
if cls._check_use_x_sendfile(attachment):
fs, _storage, fname = attachment._get_fs_parts()
fs_info = fs.info(fname)
size = fs_info["size"]
return cls(
mimetype=attachment.mimetype,
download_name=attachment.name,
conditional=True,
etag=attachment.checksum,
type="fs",
size=size,
size=attachment.file_size,
last_modified=attachment["__last_update"],
fs_attachment=attachment,
)
Expand Down
20 changes: 6 additions & 14 deletions fs_attachment/models/ir_attachment.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,14 @@ class IrAttachment(models.Model):
def _compute_internal_url(self) -> None:
for rec in self:
filename, extension = os.path.splitext(rec.name)
# determine if the file is an image
pfx = "/web/content"
if rec.mimetype and rec.mimetype.startswith("image/"):
pfx = "/web/image"

if not extension:
extension = mimetypes.guess_extension(rec.mimetype)
rec.internal_url = f"/web/content/{rec.id}/{filename}{extension}"
rec.internal_url = f"{pfx}/{rec.id}/{filename}{extension}"

@api.depends("fs_filename")
def _compute_fs_url(self) -> None:
Expand Down Expand Up @@ -441,8 +446,6 @@ def _enforce_meaningful_storage_filename(self) -> None:
if self.env["fs.storage"]._must_use_filename_obfuscation(storage):
attachment.fs_filename = filename
continue
if self._is_fs_filename_meaningful(filename):
continue
new_filename = attachment._build_fs_filename()
# we must keep the same full path as the original filename
new_filename_with_path = os.path.join(
Expand Down Expand Up @@ -491,17 +494,6 @@ def _fs_parse_store_fname(
fname = partition[2]
return fs, storage_code, fname

@api.model
def _is_fs_filename_meaningful(self, filename: str) -> bool:
"""Return True if the filename is meaningful
A filename is meaningful if it's formatted as
"""
parsed = self._parse_fs_filename(filename)
if not parsed:
return False
name, res_id, version, extension = parsed
return bool(name and res_id and version is not None and extension)

@api.model
def _parse_fs_filename(self, filename: str) -> tuple[str, int, int, str] | None:
"""Parse the filename and return the name, id, version and extension
Expand Down
70 changes: 70 additions & 0 deletions fs_attachment/models/ir_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
import logging

import werkzeug.http

from odoo import models
from odoo.http import request
from odoo.tools.image import image_process

from ..fs_stream import FsStream

Expand Down Expand Up @@ -39,3 +43,69 @@
if fs_attachment:
return FsStream.from_fs_attachment(fs_attachment)
return super()._record_to_stream(record, field_name)

def _get_image_stream_from(
self,
record,
field_name="raw",
filename=None,
filename_field="name",
mimetype=None,
default_mimetype="image/png",
placeholder=None,
width=0,
height=0,
crop=False,
quality=0,
):
# we need to override this method since if you pass a width or height or
# set crop=True, the stream data must be a bytes object, not a
# file-like object. In the base implementation, the stream data is
# passed to `image_process` method to transform it and this method
# expects a bytes object.
initial_width = width
initial_height = height
initial_crop = crop
if record._name != "ir.attachment" and field_name:
field_def = record._fields[field_name]

Check warning on line 70 in fs_attachment/models/ir_binary.py

View check run for this annotation

Codecov / codecov/patch

fs_attachment/models/ir_binary.py#L70

Added line #L70 was not covered by tests
if field_def.type in ("fs_image", "fs_file"):
value = record[field_name]

Check warning on line 72 in fs_attachment/models/ir_binary.py

View check run for this annotation

Codecov / codecov/patch

fs_attachment/models/ir_binary.py#L72

Added line #L72 was not covered by tests
if value:
record = value.attachment
field_name = "raw"

Check warning on line 75 in fs_attachment/models/ir_binary.py

View check run for this annotation

Codecov / codecov/patch

fs_attachment/models/ir_binary.py#L74-L75

Added lines #L74 - L75 were not covered by tests
stream = super()._get_image_stream_from(
record,
field_name=field_name,
filename=filename,
filename_field=filename_field,
mimetype=mimetype,
default_mimetype=default_mimetype,
placeholder=placeholder,
width=0,
height=0,
crop=False,
quality=quality,
)
modified = werkzeug.http.is_resource_modified(
request.httprequest.environ,
etag=stream.etag,
last_modified=stream.last_modified,
)
if modified and (initial_width or initial_height or initial_crop):
if stream.type == "path":
with open(stream.path, "rb") as file:
stream.type = "data"
stream.path = None
stream.data = file.read()

Check warning on line 99 in fs_attachment/models/ir_binary.py

View check run for this annotation

Codecov / codecov/patch

fs_attachment/models/ir_binary.py#L97-L99

Added lines #L97 - L99 were not covered by tests
elif stream.type == "fs":
stream.data = stream.read()
stream.type = "data"
stream.data = image_process(
stream.data,
size=(initial_width, initial_height),
crop=initial_crop,
quality=quality,
)
stream.size = len(stream.data)

return stream
1 change: 1 addition & 0 deletions fs_attachment/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
from . import test_fs_attachment_file_like_adapter
from . import test_fs_attachment_internal_url
from . import test_fs_storage
from . import test_stream
138 changes: 138 additions & 0 deletions fs_attachment/tests/test_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# Copyright 2023 ACSONE SA/NV (http://acsone.eu).
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import io
import os
import shutil
import tempfile

from PIL import Image

from odoo.tests.common import HttpCase


class TestStream(HttpCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.env = cls.env(context=dict(cls.env.context, tracking_disable=True))
temp_dir = tempfile.mkdtemp()
cls.temp_backend = cls.env["fs.storage"].create(
{
"name": "Temp FS Storage",
"protocol": "file",
"code": "tmp_dir",
"directory_path": temp_dir,
"base_url": "http://my.public.files/",
}
)
cls.temp_dir = temp_dir
cls.content = b"This is a test attachment"
cls.attachment_binary = (
cls.env["ir.attachment"]
.with_context(
storage_location=cls.temp_backend.code,
storage_file_path="test.txt",
)
.create({"name": "test.txt", "raw": cls.content})
)

cls.image = cls._create_image(128, 128)
cls.attachment_image = (
cls.env["ir.attachment"]
.with_context(
storage_location=cls.temp_backend.code,
storage_file_path="test.png",
)
.create({"name": "test.png", "raw": cls.image})
)

@cls.addClassCleanup
def cleanup_tempdir():
shutil.rmtree(temp_dir)

assert cls.attachment_binary.fs_filename
assert cls.attachment_image.fs_filename

def setUp(self):
super().setUp()
# enforce temp_backend field since it seems that they are reset on
# savepoint rollback when managed by server_environment -> TO Be investigated
self.temp_backend.write(
{
"protocol": "file",
"code": "tmp_dir",
"directory_path": self.temp_dir,
"base_url": "http://my.public.files/",
}
)

@classmethod
def tearDownClass(cls):
super().tearDownClass()
for f in os.listdir(cls.temp_dir):
os.remove(os.path.join(cls.temp_dir, f))

@classmethod
def _create_image(cls, width, height, color="#4169E1", img_format="PNG"):
f = io.BytesIO()
Image.new("RGB", (width, height), color).save(f, img_format)
f.seek(0)
return f.read()

def assertDownload(
self, url, headers, assert_status_code, assert_headers, assert_content=None
):
res = self.url_open(url, headers=headers)
res.raise_for_status()
self.assertEqual(res.status_code, assert_status_code)
for header_name, header_value in assert_headers.items():
self.assertEqual(
res.headers.get(header_name),
header_value,
f"Wrong value for header {header_name}",
)
if assert_content:
self.assertEqual(res.content, assert_content, "Wong content")
return res

def test_content_url(self):
self.authenticate("admin", "admin")
url = f"/web/content/{self.attachment_binary.id}"
self.assertDownload(
url,
headers={},
assert_status_code=200,
assert_headers={
"Content-Type": "text/plain; charset=utf-8",
"Content-Disposition": "inline; filename=test.txt",
},
assert_content=self.content,
)

def test_image_url(self):
self.authenticate("admin", "admin")
url = f"/web/image/{self.attachment_image.id}"
self.assertDownload(
url,
headers={},
assert_status_code=200,
assert_headers={
"Content-Type": "image/png",
"Content-Disposition": "inline; filename=test.png",
},
assert_content=self.image,
)

def test_image_url_with_size(self):
self.authenticate("admin", "admin")
url = f"/web/image/{self.attachment_image.id}?width=64&height=64"
res = self.assertDownload(
url,
headers={},
assert_status_code=200,
assert_headers={
"Content-Type": "image/png",
"Content-Disposition": "inline; filename=test.png",
},
)
self.assertEqual(Image.open(io.BytesIO(res.content)).size, (64, 64))
Loading
Loading