From 117066de050b89e57d99a388ee4d1f381e4812da Mon Sep 17 00:00:00 2001 From: "a.krantz" Date: Mon, 28 Oct 2024 20:01:17 +0000 Subject: [PATCH 1/2] added sitemap to odsbox --- docs/conf.py | 5 +++++ docs/requirements.txt | 1 + src/odsbox/__init__.py | 2 +- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/conf.py b/docs/conf.py index 0d9c04a..f0b3148 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -38,6 +38,7 @@ "sphinx.ext.ifconfig", "sphinx.ext.viewcode", # Add links to highlighted source code "sphinx.ext.githubpages", + "sphinx_sitemap", "myst_parser", ] @@ -59,6 +60,10 @@ html_context = {"google_site_verification": "M-YV4bEhpyyWVOBQB9VLsSCjKfqO_UpvTBMJ7DS5t_U"} +# for sitemap +html_baseurl = "https://peak-solution.github.io/odsbox/" +sitemap_url_scheme = "{link}" + templates_path = ["_templates"] # Add any paths that contain custom static files (such as style sheets) here, diff --git a/docs/requirements.txt b/docs/requirements.txt index 24cf263..1e9d77b 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,4 +1,5 @@ sphinx sphinx-copybutton sphinx-rtd-theme +sphinx-sitemap myst_parser diff --git a/src/odsbox/__init__.py b/src/odsbox/__init__.py index 8de9971..df48dff 100644 --- a/src/odsbox/__init__.py +++ b/src/odsbox/__init__.py @@ -2,4 +2,4 @@ from __future__ import annotations -__version__ = "1.0.2" +__version__ = "1.0.3" From 06108fbe62658a1a1228f7af1382c314a2f2de0f Mon Sep 17 00:00:00 2001 From: "a.krantz" Date: Wed, 30 Oct 2024 17:29:40 +0000 Subject: [PATCH 2/2] added file_access --- src/odsbox/con_i.py | 118 ++++++++++++++++++- tests/test_con_i_file_access.py | 199 ++++++++++++++++++++++++++++++++ 2 files changed, 313 insertions(+), 4 deletions(-) create mode 100644 tests/test_con_i_file_access.py diff --git a/src/odsbox/con_i.py b/src/odsbox/con_i.py index 7730b95..8da911e 100644 --- a/src/odsbox/con_i.py +++ b/src/odsbox/con_i.py @@ -16,6 +16,7 @@ from __future__ import annotations import logging +import os from typing import List, Tuple import requests @@ -127,7 +128,7 @@ def __call__(self, r): self.__log.debug("ConI: %s", con_i) self.__session = session self.__con_i = con_i - self.__check_result(response) + self.check_requests_response(response) # lets cache the model self.model_read() @@ -165,7 +166,7 @@ def logout(self): self.__session.close() self.__session = None self.__con_i = None - self.__check_result(response) + self.check_requests_response(response) def query_data( self, @@ -468,6 +469,114 @@ def password_update(self, password_update: ods.PasswordUpdate) -> None: """ self.ods_post_request("password-update", password_update) + def file_access(self, file_identifier: ods.FileIdentifier) -> str: + """ + Get file access URL for file content. + + :param ods.FileIdentifier file_identifier: Define content to be accessed. + Might be an AoFile or a DT_BLOB attribute. + :raises requests.HTTPError: If something went wrong. + :raises ValueError: If no file location provided by server. + :return str: The server file URL. + """ + response = self.ods_post_request("file-access", file_identifier) + server_file_url = response.headers.get("location") + if server_file_url is None: + raise ValueError("No file location provided by server!") + return server_file_url + + def file_access_download( + self, + file_identifier: ods.FileIdentifier, + target_file_or_folder: str, + overwrite_existing: bool = False, + default_filename: str = "download.bin", + ) -> str: + """ + Read file content from server. + + :param ods.FileIdentifier file_identifier: Define content to be read. Might be an AoFile or a DT_BLOB attribute. + :param str target_file_or_folder: Path to save the file content to. If pointing to an existing folder. Original + filename will be used. Full path is returned. + :param bool overwrite_existing: If existing files should be overwritten. It defaults to False. + :param str default_filename: Default filename if no filename is provided by server. It defaults to "download.bin". + :raises requests.HTTPError: If something went wrong. + :raises FileExistsError: If file already exists and 'overwrite_existing' is False. + :raises ValueError: If no open session. + :return str: file path of saved file. + """ + server_file_url = self.file_access(file_identifier) + + if self.__session is None: + raise ValueError("No open session!") + file_response = self.__session.get(server_file_url) + self.check_requests_response(file_response) + + target_file_path = target_file_or_folder + if os.path.isdir(target_file_path): + content_disposition = file_response.headers.get( + "Content-Disposition", f'attachment; filename="{default_filename}"' + ) + filename = ( + content_disposition.split("filename=")[1].strip('"') + if "filename=" in content_disposition + else default_filename + ) + target_file_path = os.path.join(target_file_path, filename) + + if not overwrite_existing and os.path.exists(target_file_path): + raise FileExistsError(f"File '{target_file_path}' already exists and 'overwrite_existing' is False.") + + with open(target_file_path, "wb") as file: + file.write(file_response.content) + + return target_file_path + + def file_access_upload( + self, + file_identifier: ods.FileIdentifier, + source_file_path: str, + ) -> None: + """ + Upload file content to server. + + :param ods.FileIdentifier file_identifier: Define content to be written. Might be an AoFile or a DT_BLOB attribute. + :param str source_file_path: Path to the file to be uploaded. + :raises requests.HTTPError: If something went wrong. + :raises FileNotFoundError: If source file was not found. + :raises ValueError: If no open session. + """ + if not os.path.isfile(source_file_path): + raise FileNotFoundError(f"File '{source_file_path}' not found.") + + server_file_url = self.file_access(file_identifier) + + with open(source_file_path, "rb") as file: + if self.__session is None: + raise ValueError("No open session!") + put_response = self.__session.put( + server_file_url, data=file, headers={"Content-Type": "application/octet-stream"} + ) + self.check_requests_response(put_response) + + def file_access_delete( + self, + file_identifier: ods.FileIdentifier, + ) -> None: + """ + Delete file content from server. + + :param ods.FileIdentifier file_identifier: Define content to be deleted. Might be an AoFile or a DT_BLOB attribute. + :raises requests.HTTPError: If something went wrong. + :raises ValueError: If no open session. + """ + server_file_url = self.file_access(file_identifier) + + if self.__session is None: + raise ValueError("No open session!") + delete_response = self.__session.delete(server_file_url) + self.check_requests_response(delete_response) + def ods_post_request( self, relative_url_part: str, message: Message | None = None, timeout: float = 600.0 ) -> requests.Response: @@ -489,10 +598,11 @@ def ods_post_request( data=message.SerializeToString() if message is not None else None, timeout=timeout, ) - self.__check_result(response) + self.check_requests_response(response) return response - def __check_result(self, response: requests.Response): + @staticmethod + def check_requests_response(response: requests.Response): if response.status_code not in (200, 201): response.headers if ( diff --git a/tests/test_con_i_file_access.py b/tests/test_con_i_file_access.py new file mode 100644 index 0000000..3686600 --- /dev/null +++ b/tests/test_con_i_file_access.py @@ -0,0 +1,199 @@ +"""Integration test for ASAM ODS session""" + +import requests +from odsbox.con_i import ConI + +import pytest +import os +from unittest import mock +import tempfile +from odsbox.proto.ods_pb2 import FileIdentifier + + +@pytest.fixture +def con_i(): + return ConI("http://79.140.180.128:10032/api", ("sa", "sa")) + + +def test_file_access_download_success(con_i): + file_identifier = FileIdentifier(aid=4711, iid=1) + target_file = "test_download.bin" + + with mock.patch.object(con_i, "file_access", return_value="http://example.com/file"): + with mock.patch.object(con_i._ConI__session, "get") as mock_get: + mock_response = mock.Mock() + mock_response.content = b"file content" + mock_response.headers = {"Content-Disposition": "attachment; filename=test_download.bin"} + mock_get.return_value = mock_response + + downloaded_file = con_i.file_access_download(file_identifier, target_file, overwrite_existing=True) + + assert downloaded_file == target_file + assert os.path.exists(target_file) + with open(target_file, "rb") as f: + assert f.read() == b"file content" + + os.remove(target_file) + + +def test_file_access_download_success_2(con_i): + file_identifier = FileIdentifier(aid=4711, iid=1) + target_file = "test_download.bin" + + with mock.patch.object(con_i, "file_access", return_value="http://example.com/file"): + with mock.patch.object(con_i._ConI__session, "get") as mock_get: + mock_response = mock.Mock() + mock_response.content = b"file content" + mock_response.headers = {"Content-Disposition": 'attachment; filename="test_download.bin"'} + mock_get.return_value = mock_response + + downloaded_file = con_i.file_access_download(file_identifier, target_file, overwrite_existing=True) + + assert downloaded_file == target_file + assert os.path.exists(target_file) + with open(target_file, "rb") as f: + assert f.read() == b"file content" + + os.remove(target_file) + + +def test_file_access_download_success_3(con_i): + with tempfile.TemporaryDirectory() as temp_dir: + file_identifier = FileIdentifier(aid=4711, iid=1) + target_file = os.path.join(temp_dir, "test_download.bin") + + with mock.patch.object(con_i, "file_access", return_value="http://example.com/file"): + with mock.patch.object(con_i._ConI__session, "get") as mock_get: + mock_response = mock.Mock() + mock_response.content = b"file content" + mock_response.headers = {"Content-Disposition": 'attachment; filename="test_download.bin"'} + mock_get.return_value = mock_response + + downloaded_file = con_i.file_access_download(file_identifier, temp_dir, overwrite_existing=True) + + assert downloaded_file == target_file + assert os.path.exists(target_file) + with open(target_file, "rb") as f: + assert f.read() == b"file content" + + +def test_file_access_download_success_4(con_i): + with tempfile.TemporaryDirectory() as temp_dir: + file_identifier = FileIdentifier(aid=4711, iid=1) + target_file = os.path.join(temp_dir, "download.bin") + + with mock.patch.object(con_i, "file_access", return_value="http://example.com/file"): + with mock.patch.object(con_i._ConI__session, "get") as mock_get: + mock_response = mock.Mock() + mock_response.content = b"file content" + mock_response.headers = {} + mock_get.return_value = mock_response + + downloaded_file = con_i.file_access_download(file_identifier, temp_dir, overwrite_existing=True) + + assert downloaded_file == target_file + assert os.path.exists(target_file) + with open(target_file, "rb") as f: + assert f.read() == b"file content" + + +def test_file_access_download_no_overwrite(con_i): + file_identifier = FileIdentifier(aid=4711, iid=1) + target_file = "test_download_no_overwrite.bin" + + with open(target_file, "wb") as f: + f.write(b"existing content") + + with mock.patch.object(con_i, "file_access", return_value="http://example.com/file"): + with mock.patch.object(con_i._ConI__session, "get") as mock_get: + mock_response = mock.Mock() + mock_response.content = b"new file content" + mock_response.headers = {"Content-Disposition": "attachment; filename=test_download_no_overwrite.bin"} + mock_get.return_value = mock_response + + with pytest.raises(FileExistsError): + con_i.file_access_download(file_identifier, target_file, overwrite_existing=False) + + os.remove(target_file) + + +def test_file_access_download_no_session(con_i): + file_identifier = FileIdentifier(aid=4711, iid=1) + target_file = "test_download_no_session.bin" + + con_i._ConI__session = None + + with pytest.raises(ValueError, match="No open session!"): + con_i.file_access_download(file_identifier, target_file) + + +def test_file_access_upload_success(con_i): + # Create a temporary file to upload + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_file.write(b"Test content") + temp_file_path = temp_file.name + + file_identifier = FileIdentifier(aid=4711, iid=1) + + with mock.patch.object(con_i, "file_access", return_value="http://example.com/upload"): + with mock.patch.object(con_i._ConI__session, "put") as mock_post: + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_post.return_value = mock_response + + con_i.file_access_upload(file_identifier, temp_file_path) + + mock_post.assert_called_once() + assert mock_post.call_args[0][0] == "http://example.com/upload" + + os.remove(temp_file_path) + + +def test_file_access_upload_failure(con_i): + # Create a temporary file to upload + with tempfile.NamedTemporaryFile() as temp_file: + temp_file.write(b"Test content") + temp_file_path = temp_file.name + + file_identifier = FileIdentifier(aid=4711, iid=1) + + with mock.patch.object(con_i, "file_access", return_value="http://example.com/upload"): + with mock.patch.object(con_i._ConI__session, "put") as mock_post: + mock_response = mock.Mock() + mock_response.status_code = 500 + mock_response.headers = {} + mock_response.raise_for_status.side_effect = requests.HTTPError("Failed to upload file") + mock_post.return_value = mock_response + + with pytest.raises(requests.HTTPError, match="Failed to upload file"): + con_i.file_access_upload(file_identifier, temp_file_path) + + +def test_file_access_delete_success(con_i): + file_identifier = FileIdentifier(aid=4711, iid=1) + + with mock.patch.object(con_i, "file_access", return_value="http://example.com/delete"): + with mock.patch.object(con_i._ConI__session, "delete") as mock_delete: + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_delete.return_value = mock_response + + con_i.file_access_delete(file_identifier) + + mock_delete.assert_called_once() + assert mock_delete.call_args[0][0] == "http://example.com/delete" + + +def test_file_access_delete_failure(con_i): + file_identifier = FileIdentifier(aid=4711, iid=1) + + with mock.patch.object(con_i, "file_access", return_value="http://example.com/delete"): + with mock.patch.object(con_i._ConI__session, "delete") as mock_delete: + mock_response = mock.Mock() + mock_response.status_code = 404 + mock_response.headers = {} + mock_response.raise_for_status.side_effect = requests.HTTPError("File not found") + mock_delete.return_value = mock_response + + with pytest.raises(requests.HTTPError, match="File not found"): + con_i.file_access_delete(file_identifier)