From 62cc8cfa235d62e867a72eec70275451d855bdf6 Mon Sep 17 00:00:00 2001
From: garciam
Date: Tue, 29 Oct 2024 13:08:08 +0100
Subject: [PATCH 1/2] do not use size_limit

The adaptor has its own way of measuring request size, which must be
very fast because it is called on every click in the forms. We simplify
our code by removing the size check here.
---
 cdsobs/api_rest/endpoints.py         | 35 ++++++++++++++--------------
 cdsobs/api_rest/models.py            | 12 ----------
 cdsobs/retrieve/api.py               |  6 ++---
 cdsobs/retrieve/retrieve_services.py |  5 +---
 tests/retrieve/test_adaptor.py       |  1 +
 tests/test_http_api.py               | 33 ++++++++++++--------------
 6 files changed, 36 insertions(+), 56 deletions(-)
 delete mode 100644 cdsobs/api_rest/models.py

diff --git a/cdsobs/api_rest/endpoints.py b/cdsobs/api_rest/endpoints.py
index 3cddaf1..d28957d 100644
--- a/cdsobs/api_rest/endpoints.py
+++ b/cdsobs/api_rest/endpoints.py
@@ -6,15 +6,12 @@
 import sqlalchemy.orm
 from fastapi import APIRouter, Depends, HTTPException
 
-from cdsobs.api_rest.models import RetrievePayload
 from cdsobs.cdm.lite import cdm_lite_variables
 from cdsobs.config import CDSObsConfig, validate_config
 from cdsobs.observation_catalogue.repositories.cads_dataset import CadsDatasetRepository
 from cdsobs.observation_catalogue.repositories.catalogue import CatalogueRepository
-from cdsobs.retrieve.retrieve_services import (
-    _get_catalogue_entries,
-    get_urls_and_check_size,
-)
+from cdsobs.retrieve.models import RetrieveArgs
+from cdsobs.retrieve.retrieve_services import _get_catalogue_entries, get_urls
 from cdsobs.service_definition.api import get_service_definition
 from cdsobs.service_definition.service_definition_models import ServiceDefinition
 from cdsobs.storage import S3Client
@@ -31,6 +28,16 @@ class HttpAPISession:
 
 
 def session_gen() -> Iterator[HttpAPISession]:
+    cdsobs_config = get_config()
+    try:
+        catalogue_session = get_database_session(cdsobs_config.catalogue_db.get_url())
+        session = HttpAPISession(cdsobs_config, catalogue_session)
+        yield session
+    finally:
+        session.catalogue_session.close()
+
+
+def get_config() -> CDSObsConfig:
     if "CDSOBS_CONFIG" in os.environ:
         cdsobs_config_yml = Path(os.environ["CDSOBS_CONFIG"])
     else:
@@ -38,12 +45,7 @@ def session_gen() -> Iterator[HttpAPISession]:
     if not Path(cdsobs_config_yml).exists():
         raise ConfigNotFound()
     cdsobs_config = validate_config(cdsobs_config_yml)
-    try:
-        catalogue_session = get_database_session(cdsobs_config.catalogue_db.get_url())
-        session = HttpAPISession(cdsobs_config, catalogue_session)
-        yield session
-    finally:
-        session.catalogue_session.close()
+    return cdsobs_config
 
 
 def make_http_exception(
@@ -58,13 +60,12 @@
     return http_exception
 
 
-@router.post("/get_object_urls_and_check_size")
-def get_object_urls_and_check_size(
-    retrieve_payload: RetrievePayload,
+@router.post("/get_object_urls")
+def get_object_urls(
+    retrieve_args: RetrieveArgs,
     session: Annotated[HttpAPISession, Depends(session_gen)],
 ) -> list[str]:
     # Query the storage to get the URLS of the files that contain the data requested
-    retrieve_args = retrieve_payload.retrieve_args
     try:
         catalogue_repository = CatalogueRepository(session.catalogue_session)
         entries = _get_catalogue_entries(catalogue_repository, retrieve_args)
@@ -78,9 +79,7 @@
     )
     s3client = S3Client.from_config(session.cdsobs_config.s3config)
     try:
-        object_urls = get_urls_and_check_size(
-            entries, retrieve_args, retrieve_payload.config.size_limit, s3client.base
-        )
+        object_urls = get_urls(entries, s3client.base)
     except SizeError as e:
         raise HTTPException(status_code=500, detail=dict(message=f"Error: {e}"))
     except Exception as e:
diff --git a/cdsobs/api_rest/models.py b/cdsobs/api_rest/models.py
deleted file mode 100644
index b7e10e1..0000000
--- a/cdsobs/api_rest/models.py
+++ /dev/null
@@ -1,12 +0,0 @@
-from pydantic import BaseModel
-
-from cdsobs.retrieve.models import RetrieveArgs
-
-
-class RetrieveConfig(BaseModel):
-    size_limit: int = 10000
-
-
-class RetrievePayload(BaseModel):
-    retrieve_args: RetrieveArgs
-    config: RetrieveConfig
diff --git a/cdsobs/retrieve/api.py b/cdsobs/retrieve/api.py
index 675571c..20ce394 100644
--- a/cdsobs/retrieve/api.py
+++ b/cdsobs/retrieve/api.py
@@ -9,7 +9,7 @@
 from cdsobs.retrieve.models import RetrieveArgs
 from cdsobs.retrieve.retrieve_services import (
     _get_catalogue_entries,
-    get_urls_and_check_size,
+    get_urls,
 )
 from cdsobs.service_definition.api import get_service_definition
 from cdsobs.utils.logutils import get_logger
@@ -52,9 +52,7 @@ def retrieve_observations(
     with get_database_session(catalogue_url) as session:
         catalogue_repository = CatalogueRepository(session)
         entries = _get_catalogue_entries(catalogue_repository, retrieve_args)
-        object_urls = get_urls_and_check_size(
-            entries, retrieve_args, size_limit, storage_url
-        )
+        object_urls = get_urls(entries, storage_url)
     global_attributes = get_service_definition(
         retrieve_args.dataset
     ).global_attributes
diff --git a/cdsobs/retrieve/retrieve_services.py b/cdsobs/retrieve/retrieve_services.py
index 23cf04a..aa78335 100644
--- a/cdsobs/retrieve/retrieve_services.py
+++ b/cdsobs/retrieve/retrieve_services.py
@@ -109,10 +109,8 @@ def filter_retrieve_constraints(
     return ConstraintsSchema.from_table(retrieve_table)
 
 
-def get_urls_and_check_size(
+def get_urls(
     entries: Sequence[Catalogue],
-    retrieve_args: RetrieveArgs,
-    size_limit: int,
     storage_url: str,
 ) -> list[str]:
     """
@@ -122,7 +120,6 @@
     are garbage collected.
""" object_urls = [f"{storage_url}/{e.asset}" for e in entries] - _check_data_size(entries, retrieve_args, size_limit) return object_urls diff --git a/tests/retrieve/test_adaptor.py b/tests/retrieve/test_adaptor.py index 48b7e31..fe0044b 100644 --- a/tests/retrieve/test_adaptor.py +++ b/tests/retrieve/test_adaptor.py @@ -130,6 +130,7 @@ def test_adaptor_gnss(tmp_path): "year": ["2000"], "month": ["10"], "day": [f"{i:02d}" for i in range(1, 32)], + "area": ["50", "-10", "20", "10"], } test_form = {} # + "/v1/AUTH_{public_user}" will be needed to work with S3 ceph public urls, but it diff --git a/tests/test_http_api.py b/tests/test_http_api.py index debbfb3..def25b1 100644 --- a/tests/test_http_api.py +++ b/tests/test_http_api.py @@ -17,27 +17,24 @@ def test_session() -> HttpAPISession: app.dependency_overrides[session_gen] = test_session payload = { - "retrieve_args": { - "dataset": "insitu-observations-gnss", - "params": { - "dataset_source": "IGS_R3", - "stations": ["AREQ00PER"], - "latitude_coverage": (-90.0, 0.0), - "longitude_coverage": (-180.0, 0.0), - "format": "netCDF", - "variables": [ - "precipitable_water_column", - "precipitable_water_column_total_uncertainty", - ], - "year": ["2000"], - "month": ["10"], - "day": [f"{i:02d}" for i in range(1, 32)], - }, + "dataset": "insitu-observations-gnss", + "params": { + "dataset_source": "IGS_R3", + "stations": ["AREQ00PER"], + "latitude_coverage": (-90.0, 0.0), + "longitude_coverage": (-180.0, 0.0), + "format": "netCDF", + "variables": [ + "precipitable_water_column", + "precipitable_water_column_total_uncertainty", + ], + "year": ["2000"], + "month": ["10"], + "day": [f"{i:02d}" for i in range(1, 32)], }, - "config": {"size_limit": 1000000}, } - response = client.post("/get_object_urls_and_check_size", json=payload) + response = client.post("/get_object_urls", json=payload) assert response.status_code == 200 assert response.json() == [ "http://127.0.0.1:9000/cds2-obs-dev-insitu-observations-gnss/" From 69304d248b625af673ee993c6682dd84776458bf Mon Sep 17 00:00:00 2001 From: garciam Date: Wed, 30 Oct 2024 10:49:48 +0100 Subject: [PATCH 2/2] fix test --- cdsobs/ingestion/serialize.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/cdsobs/ingestion/serialize.py b/cdsobs/ingestion/serialize.py index 96ee01a..f3f9696 100644 --- a/cdsobs/ingestion/serialize.py +++ b/cdsobs/ingestion/serialize.py @@ -20,7 +20,6 @@ ) from cdsobs.netcdf import ( get_encoding_with_compression, - get_encoding_with_compression_xarray, ) from cdsobs.service_definition.service_definition_models import ServiceDefinition from cdsobs.storage import StorageClient @@ -277,17 +276,21 @@ def batch_to_netcdf( output_dir, f"{new_dataset_name}_{source}_{time_batch.year}_{time_batch.month:02d}.nc", ) - homogenised_data_xr = homogenised_data.to_xarray() - if service_definition.global_attributes is not None: - homogenised_data.attrs = { - **homogenised_data.attrs, - **service_definition.global_attributes, - } - encoding = get_encoding_with_compression_xarray( - homogenised_data_xr, string_transform="str_to_char" + encoding = get_encoding_with_compression( + homogenised_data, string_transform="str_to_char" ) - logger.info(f"Writing de-normalized and CDM mapped data to {output_path}") - homogenised_data_xr.to_netcdf( - output_path, encoding=encoding, engine="netcdf4", format="NETCDF4" + # Encode dates + attrs = dict() + for varname in homogenised_data.columns: + var_series = homogenised_data[varname] + if var_series.dtype.kind == "M": + 
homogenised_data[varname] = datetime_to_seconds(var_series) + attrs[varname] = dict(units=constants.TIME_UNITS) + + write_pandas_to_netcdf( + output_path, + homogenised_data, + encoding, + attrs=service_definition.global_attributes, ) return output_path
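
Note on the new endpoint (PATCH 1/2): the request body is now a plain
RetrieveArgs object, with no "config"/"size_limit" wrapper around it. A
minimal client-side sketch follows; the base URL is an assumption
(wherever the cdsobs HTTP API is deployed) and the payload mirrors
tests/test_http_api.py:

    import requests

    # Assumed base URL for a running cdsobs HTTP API instance.
    base_url = "http://localhost:8000"
    payload = {
        "dataset": "insitu-observations-gnss",
        "params": {
            "dataset_source": "IGS_R3",
            "stations": ["AREQ00PER"],
            "latitude_coverage": [-90.0, 0.0],
            "longitude_coverage": [-180.0, 0.0],
            "format": "netCDF",
            "variables": ["precipitable_water_column"],
            "year": ["2000"],
            "month": ["10"],
            "day": [f"{i:02d}" for i in range(1, 32)],
        },
    }
    # The endpoint returns a plain list of S3 object URLs; any size
    # accounting is now done by the adaptor, not by this service.
    response = requests.post(f"{base_url}/get_object_urls", json=payload)
    response.raise_for_status()
    object_urls = response.json()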
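Note on the date-encoding loop (PATCH 2/2): it relies on the internal
helpers datetime_to_seconds and constants.TIME_UNITS. Below is a
self-contained sketch of the same idea; the epoch and the units string
are assumptions for illustration only, as the real values come from the
cdsobs internals:

    import pandas as pd

    TIME_UNITS = "seconds since 1900-01-01 00:00:00"  # assumed units string

    def datetime_to_seconds_sketch(series: pd.Series) -> pd.Series:
        # Assumed equivalent of datetime_to_seconds: offset from a fixed epoch.
        epoch = pd.Timestamp("1900-01-01")
        return (series - epoch).dt.total_seconds()

    df = pd.DataFrame(
        {"report_timestamp": pd.to_datetime(["2000-10-01", "2000-10-02"])}
    )
    attrs = {}
    for varname in df.columns:
        var_series = df[varname]
        if var_series.dtype.kind == "M":  # numpy datetime64 columns
            df[varname] = datetime_to_seconds_sketch(var_series)
            attrs[varname] = dict(units=TIME_UNITS)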