diff --git a/cdsobs/api_rest/endpoints.py b/cdsobs/api_rest/endpoints.py index ab22d02..5a364bd 100644 --- a/cdsobs/api_rest/endpoints.py +++ b/cdsobs/api_rest/endpoints.py @@ -6,15 +6,12 @@ import sqlalchemy.orm from fastapi import APIRouter, Depends, HTTPException -from cdsobs.api_rest.models import RetrievePayload from cdsobs.cdm.lite import cdm_lite_variables from cdsobs.config import CDSObsConfig, validate_config from cdsobs.observation_catalogue.repositories.cads_dataset import CadsDatasetRepository from cdsobs.observation_catalogue.repositories.catalogue import CatalogueRepository -from cdsobs.retrieve.retrieve_services import ( - _get_catalogue_entries, - get_urls_and_check_size, -) +from cdsobs.retrieve.models import RetrieveArgs +from cdsobs.retrieve.retrieve_services import _get_catalogue_entries, get_urls from cdsobs.service_definition.api import get_service_definition from cdsobs.service_definition.service_definition_models import ServiceDefinition from cdsobs.storage import S3Client @@ -31,6 +28,16 @@ class HttpAPISession: def session_gen() -> Iterator[HttpAPISession]: + cdsobs_config = get_config() + try: + catalogue_session = get_database_session(cdsobs_config.catalogue_db.get_url()) + session = HttpAPISession(cdsobs_config, catalogue_session) + yield session + finally: + session.catalogue_session.close() + + +def get_config() -> CDSObsConfig: if "CDSOBS_CONFIG" in os.environ: cdsobs_config_yml = Path(os.environ["CDSOBS_CONFIG"]) else: @@ -38,12 +45,7 @@ def session_gen() -> Iterator[HttpAPISession]: if not Path(cdsobs_config_yml).exists(): raise ConfigNotFound() cdsobs_config = validate_config(cdsobs_config_yml) - try: - catalogue_session = get_database_session(cdsobs_config.catalogue_db.get_url()) - session = HttpAPISession(cdsobs_config, catalogue_session) - yield session - finally: - session.catalogue_session.close() + return cdsobs_config def make_http_exception( @@ -58,13 +60,12 @@ def make_http_exception( return http_exception -@router.post("/get_object_urls_and_check_size") -def get_object_urls_and_check_size( - retrieve_payload: RetrievePayload, +@router.post("/get_object_urls") +def get_object_urls( + retrieve_args: RetrieveArgs, session: Annotated[HttpAPISession, Depends(session_gen)], ) -> list[str]: # Query the storage to get the URLS of the files that contain the data requested - retrieve_args = retrieve_payload.retrieve_args try: catalogue_repository = CatalogueRepository(session.catalogue_session) entries = _get_catalogue_entries(catalogue_repository, retrieve_args) @@ -78,9 +79,7 @@ def get_object_urls_and_check_size( ) s3client = S3Client.from_config(session.cdsobs_config.s3config) try: - object_urls = get_urls_and_check_size( - entries, retrieve_args, retrieve_payload.config.size_limit, s3client.base - ) + object_urls = get_urls(entries, s3client.base) except SizeError as e: raise HTTPException(status_code=500, detail=dict(message=f"Error: {e}")) except Exception as e: diff --git a/cdsobs/api_rest/models.py b/cdsobs/api_rest/models.py deleted file mode 100644 index b7e10e1..0000000 --- a/cdsobs/api_rest/models.py +++ /dev/null @@ -1,12 +0,0 @@ -from pydantic import BaseModel - -from cdsobs.retrieve.models import RetrieveArgs - - -class RetrieveConfig(BaseModel): - size_limit: int = 10000 - - -class RetrievePayload(BaseModel): - retrieve_args: RetrieveArgs - config: RetrieveConfig diff --git a/cdsobs/ingestion/serialize.py b/cdsobs/ingestion/serialize.py index 6a4f7dd..f3f9696 100644 --- a/cdsobs/ingestion/serialize.py +++ b/cdsobs/ingestion/serialize.py @@ -20,7 +20,6 @@ ) from cdsobs.netcdf import ( get_encoding_with_compression, - get_encoding_with_compression_xarray, ) from cdsobs.service_definition.service_definition_models import ServiceDefinition from cdsobs.storage import StorageClient @@ -277,20 +276,21 @@ def batch_to_netcdf( output_dir, f"{new_dataset_name}_{source}_{time_batch.year}_{time_batch.month:02d}.nc", ) - for field in homogenised_data: - if homogenised_data[field].dtype == "string": - homogenised_data[field] = homogenised_data[field].str.encode("UTF-8") - homogenised_data_xr = homogenised_data.to_xarray() - if service_definition.global_attributes is not None: - homogenised_data.attrs = { - **homogenised_data.attrs, - **service_definition.global_attributes, - } - encoding = get_encoding_with_compression_xarray( - homogenised_data_xr, string_transform="str_to_char" + encoding = get_encoding_with_compression( + homogenised_data, string_transform="str_to_char" ) - logger.info(f"Writing de-normalized and CDM mapped data to {output_path}") - homogenised_data_xr.to_netcdf( - output_path, encoding=encoding, engine="netcdf4", format="NETCDF4" + # Encode dates + attrs = dict() + for varname in homogenised_data.columns: + var_series = homogenised_data[varname] + if var_series.dtype.kind == "M": + homogenised_data[varname] = datetime_to_seconds(var_series) + attrs[varname] = dict(units=constants.TIME_UNITS) + + write_pandas_to_netcdf( + output_path, + homogenised_data, + encoding, + attrs=service_definition.global_attributes, ) return output_path diff --git a/cdsobs/retrieve/api.py b/cdsobs/retrieve/api.py index 675571c..20ce394 100644 --- a/cdsobs/retrieve/api.py +++ b/cdsobs/retrieve/api.py @@ -9,7 +9,7 @@ from cdsobs.retrieve.models import RetrieveArgs from cdsobs.retrieve.retrieve_services import ( _get_catalogue_entries, - get_urls_and_check_size, + get_urls, ) from cdsobs.service_definition.api import get_service_definition from cdsobs.utils.logutils import get_logger @@ -52,9 +52,7 @@ def retrieve_observations( with get_database_session(catalogue_url) as session: catalogue_repository = CatalogueRepository(session) entries = _get_catalogue_entries(catalogue_repository, retrieve_args) - object_urls = get_urls_and_check_size( - entries, retrieve_args, size_limit, storage_url - ) + object_urls = get_urls(entries, storage_url) global_attributes = get_service_definition( retrieve_args.dataset ).global_attributes diff --git a/cdsobs/retrieve/retrieve_services.py b/cdsobs/retrieve/retrieve_services.py index 23cf04a..aa78335 100644 --- a/cdsobs/retrieve/retrieve_services.py +++ b/cdsobs/retrieve/retrieve_services.py @@ -109,10 +109,8 @@ def filter_retrieve_constraints( return ConstraintsSchema.from_table(retrieve_table) -def get_urls_and_check_size( +def get_urls( entries: Sequence[Catalogue], - retrieve_args: RetrieveArgs, - size_limit: int, storage_url: str, ) -> list[str]: """ @@ -122,7 +120,6 @@ def get_urls_and_check_size( are garbage collected. """ object_urls = [f"{storage_url}/{e.asset}" for e in entries] - _check_data_size(entries, retrieve_args, size_limit) return object_urls diff --git a/tests/retrieve/test_adaptor.py b/tests/retrieve/test_adaptor.py index 48b7e31..fe0044b 100644 --- a/tests/retrieve/test_adaptor.py +++ b/tests/retrieve/test_adaptor.py @@ -130,6 +130,7 @@ def test_adaptor_gnss(tmp_path): "year": ["2000"], "month": ["10"], "day": [f"{i:02d}" for i in range(1, 32)], + "area": ["50", "-10", "20", "10"], } test_form = {} # + "/v1/AUTH_{public_user}" will be needed to work with S3 ceph public urls, but it diff --git a/tests/test_http_api.py b/tests/test_http_api.py index 2d4e0b5..cd8a958 100644 --- a/tests/test_http_api.py +++ b/tests/test_http_api.py @@ -17,27 +17,24 @@ def test_session() -> HttpAPISession: app.dependency_overrides[session_gen] = test_session payload = { - "retrieve_args": { - "dataset": "insitu-observations-gnss", - "params": { - "dataset_source": "IGS_R3", - "stations": ["AREQ00PER"], - "latitude_coverage": (-90.0, 0.0), - "longitude_coverage": (-180.0, 0.0), - "format": "netCDF", - "variables": [ - "precipitable_water_column", - "precipitable_water_column_total_uncertainty", - ], - "year": ["2000"], - "month": ["10"], - "day": [f"{i:02d}" for i in range(1, 32)], - }, + "dataset": "insitu-observations-gnss", + "params": { + "dataset_source": "IGS_R3", + "stations": ["AREQ00PER"], + "latitude_coverage": (-90.0, 0.0), + "longitude_coverage": (-180.0, 0.0), + "format": "netCDF", + "variables": [ + "precipitable_water_column", + "precipitable_water_column_total_uncertainty", + ], + "year": ["2000"], + "month": ["10"], + "day": [f"{i:02d}" for i in range(1, 32)], }, - "config": {"size_limit": 1000000}, } - response = client.post("/get_object_urls_and_check_size", json=payload) + response = client.post("/get_object_urls", json=payload) assert response.status_code == 200 assert response.json() == [ "http://127.0.0.1:9000/cds2-obs-dev-insitu-observations-gnss/"