From f649c94f8de1f590f5c79fbab112e9b4bf7c2911 Mon Sep 17 00:00:00 2001 From: Diego Ardila Date: Thu, 10 Jun 2021 11:53:27 -0700 Subject: [PATCH 1/8] Kill grequests --- conftest.py | 11 ---- nucleus/__init__.py | 132 ++++++-------------------------------------- pyproject.toml | 1 - 3 files changed, 16 insertions(+), 128 deletions(-) diff --git a/conftest.py b/conftest.py index d4d28964..7207cd4e 100644 --- a/conftest.py +++ b/conftest.py @@ -1,14 +1,3 @@ -# grequests must be imported before any module not designed for reentrancy, -# because it relies on aggressive monkey patching that breaks if done after many -# other common module imports, e.g. ssl. -# -# So we import it before everything else. For details see: -# https://github.com/gevent/gevent/issues/1016#issuecomment-328530533 -# https://github.com/spyoungtech/grequests/issues/8 -import grequests - -################ - import logging import os diff --git a/nucleus/__init__.py b/nucleus/__init__.py index 6aaa6a7e..75662a50 100644 --- a/nucleus/__init__.py +++ b/nucleus/__init__.py @@ -53,25 +53,20 @@ import json import logging import os -import warnings -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union -import grequests import pkg_resources import requests import tqdm import tqdm.notebook as tqdm_notebook -from requests.adapters import HTTPAdapter # pylint: disable=E1101 # TODO: refactor to reduce this file to under 1000 lines. # pylint: disable=C0302 -from requests.packages.urllib3.util.retry import Retry from .annotation import ( BoxAnnotation, PolygonAnnotation, - Segment, SegmentationAnnotation, ) from .constants import ( @@ -145,6 +140,7 @@ def __init__( api_key: str, use_notebook: bool = False, endpoint: str = None, + verify: bool = True, ): self.api_key = api_key self.tqdm_bar = tqdm.tqdm @@ -324,13 +320,13 @@ def populate_dataset( dataset_id: str, dataset_items: List[DatasetItem], batch_size: int = 100, - force: bool = False, + update: bool = False, ): """ Appends images to a dataset with given dataset_id. - Overwrites images on collision if forced. + Overwrites images on collision if updated. 
:param dataset_id: id of a dataset - :param payload: { "items": List[DatasetItem], "force": bool } + :param payload: { "items": List[DatasetItem], "update": bool } :param local: flag if images are stored locally :param batch_size: size of the batch for long payload :return: @@ -373,16 +369,19 @@ def populate_dataset( async_responses: List[Any] = [] for batch in tqdm_local_batches: - payload = construct_append_payload(batch, force) + payload = construct_append_payload(batch, update) responses = self._process_append_requests_local( - dataset_id, payload, force + dataset_id, payload, update ) async_responses.extend(responses) for batch in tqdm_remote_batches: - payload = construct_append_payload(batch, force) + payload = construct_append_payload(batch, update) responses = self._process_append_requests( - dataset_id, payload, force, batch_size, batch_size + dataset_id=dataset_id, + payload=payload, + update=update, + batch_size=batch_size, ) async_responses.extend(responses) @@ -397,20 +396,7 @@ def _process_append_requests_local( payload: dict, update: bool, local_batch_size: int = 10, - size: int = 10, ): - def error(batch_items: dict) -> UploadResponse: - return UploadResponse( - { - DATASET_ID_KEY: dataset_id, - ERROR_ITEMS: len(batch_items), - ERROR_PAYLOAD: batch_items, - } - ) - - def exception_handler(request, exception): - logger.error(exception) - def preprocess_payload(batch): request_payload = [ (ITEMS_KEY, (None, json.dumps(batch), "application/json")) @@ -438,21 +424,14 @@ def preprocess_payload(batch): request_payloads.append(batch_payload) payload_items.append(batch) - async_requests = [ - self._make_grequest( + responses = [ + self.make_request( payload, f"dataset/{dataset_id}/append", - local=True, ) for payload in request_payloads ] - async_responses = grequests.map( - async_requests, - exception_handler=exception_handler, - size=size, - ) - def close_files(request_items): for item in request_items: # file buffer in location [1][1] @@ -463,15 +442,6 @@ def close_files(request_items): for p in request_payloads: close_files(p) - # response object will be None if an error occurred - async_responses = [ - response - if (response and response.status_code == 200) - else error(request_items) - for response, request_items in zip(async_responses, payload_items) - ] - responses.extend(async_responses) - return responses def _process_append_requests( @@ -480,20 +450,7 @@ def _process_append_requests( payload: dict, update: bool, batch_size: int = 20, - size: int = 10, ): - def default_error(payload: dict) -> UploadResponse: - return UploadResponse( - { - DATASET_ID_KEY: dataset_id, - ERROR_ITEMS: len(payload[ITEMS_KEY]), - ERROR_PAYLOAD: payload[ITEMS_KEY], - } - ) - - def exception_handler(request, exception): - logger.error(exception) - items = payload[ITEMS_KEY] payloads = [ # batch_size images per request @@ -501,28 +458,14 @@ def exception_handler(request, exception): for i in range(0, len(items), batch_size) ] - async_requests = [ - self._make_grequest( + return [ + self.make_request( payload, f"dataset/{dataset_id}/append", - local=False, ) for payload in payloads ] - async_responses = grequests.map( - async_requests, exception_handler=exception_handler, size=size - ) - - async_responses = [ - response - if (response and response.status_code == 200) - else default_error(payload) - for response, payload in zip(async_responses, payloads) - ] - - return async_responses - def annotate_dataset( self, dataset_id: str, @@ -1070,49 +1013,6 @@ def delete_custom_index(self, dataset_id: 
str): requests_command=requests.delete, ) - def _make_grequest( - self, - payload: dict, - route: str, - session=None, - requests_command: Callable = grequests.post, - local=True, - ): - """ - makes a grequest to Nucleus endpoint - :param payload: file dict for multipart-formdata - :param route: route for the request - :param session: requests.session - :param requests_command: grequests.post, grequests.get, grequests.delete - :return: An async grequest object - """ - adapter = HTTPAdapter(max_retries=Retry(total=3)) - sess = requests.Session() - sess.mount("https://", adapter) - sess.mount("http://", adapter) - - endpoint = f"{self.endpoint}/{route}" - logger.info("Posting to %s", endpoint) - - if local: - post = requests_command( - endpoint, - session=sess, - files=payload, - auth=(self.api_key, ""), - timeout=DEFAULT_NETWORK_TIMEOUT_SEC, - ) - else: - post = requests_command( - endpoint, - session=sess, - json=payload, - headers={"Content-Type": "application/json"}, - auth=(self.api_key, ""), - timeout=DEFAULT_NETWORK_TIMEOUT_SEC, - ) - return post - def _make_request_raw( self, payload: dict, endpoint: str, requests_command=requests.post ): diff --git a/pyproject.toml b/pyproject.toml index 8f52eaac..f3b692ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,6 @@ packages = [{include="nucleus"}] [tool.poetry.dependencies] python = "^3.6.2" -grequests = "^0.6.0" requests = "^2.25.1" tqdm = "^4.41.0" dataclasses = { version = "^0.7", python = "^3.6.1, <3.7" } From 50b111669d707bc27721a662c1747058f0d62c20 Mon Sep 17 00:00:00 2001 From: Diego Ardila Date: Thu, 10 Jun 2021 14:18:55 -0700 Subject: [PATCH 2/8] local upload passes --- nucleus/__init__.py | 47 +++++++++++++++++++++++++++++---------------- nucleus/dataset.py | 2 +- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/nucleus/__init__.py b/nucleus/__init__.py index 75662a50..762bebd2 100644 --- a/nucleus/__init__.py +++ b/nucleus/__init__.py @@ -68,6 +68,7 @@ BoxAnnotation, PolygonAnnotation, SegmentationAnnotation, + Segment, ) from .constants import ( ANNOTATION_METADATA_SCHEMA_KEY, @@ -386,7 +387,7 @@ def populate_dataset( async_responses.extend(responses) for response in async_responses: - agg_response.update_response(response.json()) + agg_response.update_response(response) return agg_response @@ -394,10 +395,10 @@ def _process_append_requests_local( self, dataset_id: str, payload: dict, - update: bool, + update: bool, # TODO: understand how to pass this in. 
local_batch_size: int = 10, ): - def preprocess_payload(batch): + def get_files(batch): request_payload = [ (ITEMS_KEY, (None, json.dumps(batch), "application/json")) ] @@ -416,20 +417,19 @@ def preprocess_payload(batch): items = payload[ITEMS_KEY] responses: List[Any] = [] - request_payloads = [] + files_per_request = [] payload_items = [] for i in range(0, len(items), local_batch_size): batch = items[i : i + local_batch_size] - batch_payload = preprocess_payload(batch) - request_payloads.append(batch_payload) + files_per_request.append(get_files(batch)) payload_items.append(batch) responses = [ - self.make_request( - payload, - f"dataset/{dataset_id}/append", + self._make_files_request( + files=files, + route=f"dataset/{dataset_id}/append", ) - for payload in request_payloads + for files in files_per_request ] def close_files(request_items): @@ -439,7 +439,7 @@ def close_files(request_items): item[1][1].close() # don't forget to close all open files - for p in request_payloads: + for p in files_per_request: close_files(p) return responses @@ -1013,8 +1013,8 @@ def delete_custom_index(self, dataset_id: str): requests_command=requests.delete, ) - def _make_request_raw( - self, payload: dict, endpoint: str, requests_command=requests.post + def _make_files_request( + self, files, route: str, requests_command=requests.post ): """ Makes a request to Nucleus endpoint. This method returns the raw @@ -1025,18 +1025,22 @@ def _make_request_raw( :param requests_command: requests.post, requests.get, requests.delete :return: response """ + endpoint = f"{self.endpoint}/{route}" + logger.info("Posting to %s", endpoint) response = requests_command( endpoint, - json=payload, - headers={"Content-Type": "application/json"}, + files=files, auth=(self.api_key, ""), timeout=DEFAULT_NETWORK_TIMEOUT_SEC, ) logger.info("API request has response code %s", response.status_code) - return response + if not response.ok: + self.handle_bad_response(endpoint, requests_command, response) + + return response.json() def make_request( self, payload: dict, route: str, requests_command=requests.post @@ -1052,7 +1056,16 @@ def make_request( """ endpoint = f"{self.endpoint}/{route}" - response = self._make_request_raw(payload, endpoint, requests_command) + logger.info("Posting to %s", endpoint) + + response = requests_command( + endpoint, + json=payload, + headers={"Content-Type": "application/json"}, + auth=(self.api_key, ""), + timeout=DEFAULT_NETWORK_TIMEOUT_SEC, + ) + logger.info("API request has response code %s", response.status_code) if not response.ok: self.handle_bad_response(endpoint, requests_command, response) diff --git a/nucleus/dataset.py b/nucleus/dataset.py index d601bf53..a2c627d1 100644 --- a/nucleus/dataset.py +++ b/nucleus/dataset.py @@ -225,7 +225,7 @@ def append( return self._client.populate_dataset( self.id, dataset_items, - force=update, + update=update, batch_size=batch_size, ) From c0261805480dbe53315ea7460add8ba9d826adb9 Mon Sep 17 00:00:00 2001 From: Diego Ardila Date: Thu, 10 Jun 2021 17:55:49 -0700 Subject: [PATCH 3/8] Add docs --- nucleus/__init__.py | 130 +++++++++++++++++++++++++++++--------------- nucleus/errors.py | 24 ++++++-- pyproject.toml | 1 + 3 files changed, 107 insertions(+), 48 deletions(-) diff --git a/nucleus/__init__.py b/nucleus/__init__.py index 762bebd2..da7df303 100644 --- a/nucleus/__init__.py +++ b/nucleus/__init__.py @@ -50,25 +50,23 @@ geometry | dict | Representation of the bounding box in the Box2DGeometry format.\n metadata | dict | An arbitrary metadata blob for the 
annotation.\n """ +import asyncio import json import logging import os from typing import Any, Dict, List, Optional, Union +import aiohttp import pkg_resources import requests import tqdm import tqdm.notebook as tqdm_notebook -# pylint: disable=E1101 -# TODO: refactor to reduce this file to under 1000 lines. -# pylint: disable=C0302 - from .annotation import ( BoxAnnotation, PolygonAnnotation, - SegmentationAnnotation, Segment, + SegmentationAnnotation, ) from .constants import ( ANNOTATION_METADATA_SCHEMA_KEY, @@ -122,6 +120,11 @@ from .slice import Slice from .upload_response import UploadResponse +# pylint: disable=E1101 +# TODO: refactor to reduce this file to under 1000 lines. +# pylint: disable=C0302 + + __version__ = pkg_resources.get_distribution("scale-nucleus").version logger = logging.getLogger(__name__) @@ -141,7 +144,6 @@ def __init__( api_key: str, use_notebook: bool = False, endpoint: str = None, - verify: bool = True, ): self.api_key = api_key self.tqdm_bar = tqdm.tqdm @@ -424,13 +426,12 @@ def get_files(batch): files_per_request.append(get_files(batch)) payload_items.append(batch) - responses = [ - self._make_files_request( - files=files, - route=f"dataset/{dataset_id}/append", + responses = asyncio.run( + self.make_many_files_requests_asynchronously( + files_per_request, + f"dataset/{dataset_id}/append", ) - for files in files_per_request - ] + ) def close_files(request_items): for item in request_items: @@ -444,6 +445,70 @@ def close_files(request_items): return responses + async def make_many_files_requests_asynchronously( + self, files_per_request, route + ): + """ + Makes an async post request with files to a Nucleus endpoint. + + :param files_per_request: A list of lists of tuples (name, (filename, file_pointer, content_type)) + name will become the name by which the multer can build an array. + :param route: route for the request + :return: awaitable list(response) + """ + async with aiohttp.ClientSession() as session: + tasks = [ + asyncio.ensure_future( + self._make_files_request( + files=files, route=route, session=session + ) + ) + for files in files_per_request + ] + return await asyncio.gather(*tasks) + + async def _make_files_request( + self, + files, + route: str, + session: aiohttp.ClientSession, + ): + """ + Makes an async post request with files to a Nucleus endpoint. + + :param files: A list of tuples (filename, file_pointer, file_type) + :param route: route for the request + :param session: Session to use for post. + :return: response + """ + endpoint = f"{self.endpoint}/{route}" + + logger.info("Posting to %s", endpoint) + + form = aiohttp.FormData() + + for file in files: + form.add_field( + name=file[0], + filename=file[1][0], + value=file[1][1], + content_type=file[1][2], + ) + + async with session.post( + endpoint, + data=form, + auth=aiohttp.BasicAuth(self.api_key, ""), + timeout=DEFAULT_NETWORK_TIMEOUT_SEC, + ) as response: + logger.info("API request has response code %s", response.status) + if not response.ok: + self.handle_bad_response( + endpoint, session.post, aiohttp_response=response + ) + + return await response.json() + def _process_append_requests( self, dataset_id: str, @@ -1013,35 +1078,6 @@ def delete_custom_index(self, dataset_id: str): requests_command=requests.delete, ) - def _make_files_request( - self, files, route: str, requests_command=requests.post - ): - """ - Makes a request to Nucleus endpoint. This method returns the raw - requests.Response object which is useful for unit testing. 
- - :param payload: given payload - :param endpoint: endpoint + route for the request - :param requests_command: requests.post, requests.get, requests.delete - :return: response - """ - endpoint = f"{self.endpoint}/{route}" - - logger.info("Posting to %s", endpoint) - - response = requests_command( - endpoint, - files=files, - auth=(self.api_key, ""), - timeout=DEFAULT_NETWORK_TIMEOUT_SEC, - ) - logger.info("API request has response code %s", response.status_code) - - if not response.ok: - self.handle_bad_response(endpoint, requests_command, response) - - return response.json() - def make_request( self, payload: dict, route: str, requests_command=requests.post ) -> dict: @@ -1072,5 +1108,13 @@ def make_request( return response.json() - def handle_bad_response(self, endpoint, requests_command, response): - raise NucleusAPIError(endpoint, requests_command, response) + def handle_bad_response( + self, + endpoint, + requests_command, + requests_response=None, + aiohttp_response=None, + ): + raise NucleusAPIError( + endpoint, requests_command, requests_response, aiohttp_response + ) diff --git a/nucleus/errors.py b/nucleus/errors.py index 58b1b131..30285b88 100644 --- a/nucleus/errors.py +++ b/nucleus/errors.py @@ -25,9 +25,23 @@ def __init__(self, message="Could not retrieve dataset items"): class NucleusAPIError(Exception): - def __init__(self, endpoint, command, response): - message = f"Tried to {command.__name__} {endpoint}, but received {response.status_code}: {response.reason}." - if hasattr(response, "text"): - if response.text: - message += f"\nThe detailed error is:\n{response.text}" + def __init__( + self, endpoint, command, requests_response=None, aiohttp_response=None + ): + + if requests_response is not None: + message = f"Tried to {command.__name__} {endpoint}, but received {requests_response.status_code}: {requests_response.reason}." + if hasattr(requests_response, "text"): + if requests_response.text: + message += ( + f"\nThe detailed error is:\n{requests_response.text}" + ) + + if aiohttp_response is not None: + message = f"Tried to {command.__name__} {endpoint}, but received {aiohttp_response.status}: {aiohttp_response.reason}." + if hasattr(requests_response, "text"): + text = requests_response.text() + if text: + message += f"\nThe detailed error is:\n{text}" + super().__init__(message) diff --git a/pyproject.toml b/pyproject.toml index f3b692ea..efca18bb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ python = "^3.6.2" requests = "^2.25.1" tqdm = "^4.41.0" dataclasses = { version = "^0.7", python = "^3.6.1, <3.7" } +aiohttp = "^3.7.4" [tool.poetry.dev-dependencies] poetry = "^1.1.5" From 47220b9007fe0e80b15d954784ad17fed9760bde Mon Sep 17 00:00:00 2001 From: Diego Ardila Date: Thu, 10 Jun 2021 18:04:01 -0700 Subject: [PATCH 4/8] Add warning --- nucleus/dataset.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/nucleus/dataset.py b/nucleus/dataset.py index a2c627d1..a607b793 100644 --- a/nucleus/dataset.py +++ b/nucleus/dataset.py @@ -30,6 +30,9 @@ from .payload_constructor import construct_model_run_creation_payload +WARN_FOR_LARGE_UPLOAD = 50000 + + class Dataset: """ Nucleus Dataset. You can append images with metadata to your dataset, @@ -211,6 +214,11 @@ def append( """ check_for_duplicate_reference_ids(dataset_items) + if len(dataset_items) > WARN_FOR_LARGE_UPLOAD and not asynchronous: + print( + "WARNING: for large uploads we recommend uploading your images remotely to a cloud storage provider and then using asynchronous=True." 
+ ) + if asynchronous: check_all_paths_remote(dataset_items) request_id = serialize_and_write_to_presigned_url( From a3b58047cbe8cd4669311266db8af1efc2b09d9e Mon Sep 17 00:00:00 2001 From: Diego Ardila Date: Thu, 10 Jun 2021 18:04:49 -0700 Subject: [PATCH 5/8] Bump version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index efca18bb..954e2aa6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ exclude = ''' [tool.poetry] name = "scale-nucleus" -version = "0.1.7" +version = "0.1.8" description = "The official Python client library for Nucleus, the Data Platform for AI" license = "MIT" authors = ["Scale AI Nucleus Team "] From a5447b7576d49a8c6949205a1afb408735643589 Mon Sep 17 00:00:00 2001 From: Diego Ardila Date: Fri, 11 Jun 2021 03:15:48 -0700 Subject: [PATCH 6/8] Bump python version --- .circleci/config.yml | 4 ++-- pyproject.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 41d6530c..5cf445de 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -4,7 +4,7 @@ version: 2.1 jobs: build_test: docker: - - image: python:3.6-slim-buster + - image: python:3.7-slim-buster resource_class: small steps: - checkout # checkout source code to working directory @@ -18,7 +18,7 @@ jobs: - run: name: Black Formatting Check # Only validation, without re-formatting command: | - poetry run black --check -t py36 . + poetry run black --check -t py37 . - run: name: Flake8 Lint Check # Uses setup.cfg for configuration command: | diff --git a/pyproject.toml b/pyproject.toml index 954e2aa6..4e179035 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ documentation = "https://dashboard.scale.com/nucleus/docs/api" packages = [{include="nucleus"}] [tool.poetry.dependencies] -python = "^3.6.2" +python = "^3.7.0" requests = "^2.25.1" tqdm = "^4.41.0" dataclasses = { version = "^0.7", python = "^3.6.1, <3.7" } From 5dda057cf59e9bc4ee88bfded92a0e6ee3dcd074 Mon Sep 17 00:00:00 2001 From: Diego Ardila Date: Fri, 11 Jun 2021 06:43:20 -0700 Subject: [PATCH 7/8] Nan fixes, back to python 3.6 --- .circleci/config.yml | 4 +-- nucleus/__init__.py | 27 +++++++++++++++---- nucleus/annotation.py | 2 +- nucleus/dataset_item.py | 2 +- nucleus/errors.py | 9 +++---- pyproject.toml | 4 +-- tests/test_dataset.py | 57 +++++++++++++++++++++++++---------------- 7 files changed, 67 insertions(+), 38 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5cf445de..41d6530c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -4,7 +4,7 @@ version: 2.1 jobs: build_test: docker: - - image: python:3.7-slim-buster + - image: python:3.6-slim-buster resource_class: small steps: - checkout # checkout source code to working directory @@ -18,7 +18,7 @@ jobs: - run: name: Black Formatting Check # Only validation, without re-formatting command: | - poetry run black --check -t py37 . + poetry run black --check -t py36 . 
- run: name: Flake8 Lint Check # Uses setup.cfg for configuration command: | diff --git a/nucleus/__init__.py b/nucleus/__init__.py index da7df303..7a8e1256 100644 --- a/nucleus/__init__.py +++ b/nucleus/__init__.py @@ -402,7 +402,14 @@ def _process_append_requests_local( ): def get_files(batch): request_payload = [ - (ITEMS_KEY, (None, json.dumps(batch), "application/json")) + ( + ITEMS_KEY, + ( + None, + json.dumps(batch, allow_nan=False), + "application/json", + ), + ) ] for item in batch: image = open( # pylint: disable=R1732 @@ -426,7 +433,8 @@ def get_files(batch): files_per_request.append(get_files(batch)) payload_items.append(batch) - responses = asyncio.run( + loop = asyncio.get_event_loop() + responses = loop.run_until_complete( self.make_many_files_requests_asynchronously( files_per_request, f"dataset/{dataset_id}/append", @@ -476,7 +484,7 @@ async def _make_files_request( """ Makes an async post request with files to a Nucleus endpoint. - :param files: A list of tuples (filename, file_pointer, file_type) + :param files: A list of tuples (name, (filename, file_pointer, file_type)) :param route: route for the request :param session: Session to use for post. :return: response @@ -502,12 +510,21 @@ async def _make_files_request( timeout=DEFAULT_NETWORK_TIMEOUT_SEC, ) as response: logger.info("API request has response code %s", response.status) + + try: + data = await response.json() + except aiohttp.client_exceptions.ContentTypeError: + # In case of 404, the server returns text + data = await response.text() + if not response.ok: self.handle_bad_response( - endpoint, session.post, aiohttp_response=response + endpoint, + session.post, + aiohttp_response=(response.status, response.reason, data), ) - return await response.json() + return data def _process_append_requests( self, diff --git a/nucleus/annotation.py b/nucleus/annotation.py index 91bc0a06..84150e6c 100644 --- a/nucleus/annotation.py +++ b/nucleus/annotation.py @@ -53,7 +53,7 @@ def to_payload(self): ) def to_json(self) -> str: - return json.dumps(self.to_payload()) + return json.dumps(self.to_payload(), allow_nan=False) @dataclass diff --git a/nucleus/dataset_item.py b/nucleus/dataset_item.py index 91fdac52..2b079c13 100644 --- a/nucleus/dataset_item.py +++ b/nucleus/dataset_item.py @@ -52,7 +52,7 @@ def to_payload(self) -> dict: return payload def to_json(self) -> str: - return json.dumps(self.to_payload()) + return json.dumps(self.to_payload(), allow_nan=False) def is_local_path(path: str) -> bool: diff --git a/nucleus/errors.py b/nucleus/errors.py index 30285b88..108c4efe 100644 --- a/nucleus/errors.py +++ b/nucleus/errors.py @@ -38,10 +38,9 @@ def __init__( ) if aiohttp_response is not None: - message = f"Tried to {command.__name__} {endpoint}, but received {aiohttp_response.status}: {aiohttp_response.reason}." - if hasattr(requests_response, "text"): - text = requests_response.text() - if text: - message += f"\nThe detailed error is:\n{text}" + status, reason, data = aiohttp_response + message = f"Tried to {command.__name__} {endpoint}, but received {status}: {reason}." 
+ if data: + message += f"\nThe detailed error is:\n{data}" super().__init__(message) diff --git a/pyproject.toml b/pyproject.toml index 4e179035..b6cf361f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,8 +32,8 @@ documentation = "https://dashboard.scale.com/nucleus/docs/api" packages = [{include="nucleus"}] [tool.poetry.dependencies] -python = "^3.7.0" -requests = "^2.25.1" +python = "^3.6.2" +requests = "^2.23.0" tqdm = "^4.41.0" dataclasses = { version = "^0.7", python = "^3.6.1, <3.7" } aiohttp = "^3.7.4" diff --git a/tests/test_dataset.py b/tests/test_dataset.py index dbb3d03c..4a3b8794 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -1,39 +1,40 @@ +import math +import os + +import pytest +from nucleus import ( + Dataset, + DatasetItem, + NucleusAPIError, + NucleusClient, + UploadResponse, +) from nucleus.annotation import ( BoxAnnotation, PolygonAnnotation, SegmentationAnnotation, ) +from nucleus.constants import ( + DATASET_ID_KEY, + ERROR_ITEMS, + ERROR_PAYLOAD, + IGNORED_ITEMS, + NEW_ITEMS, + UPDATED_ITEMS, +) from nucleus.job import AsyncJob, JobError -import pytest -import os from .helpers import ( + LOCAL_FILENAME, TEST_BOX_ANNOTATIONS, + TEST_DATASET_NAME, + TEST_IMG_URLS, TEST_POLYGON_ANNOTATIONS, TEST_SEGMENTATION_ANNOTATIONS, TEST_SLICE_NAME, - TEST_DATASET_NAME, - TEST_IMG_URLS, - LOCAL_FILENAME, reference_id_from_url, ) -from nucleus import ( - Dataset, - DatasetItem, - UploadResponse, - NucleusClient, - NucleusAPIError, -) -from nucleus.constants import ( - NEW_ITEMS, - UPDATED_ITEMS, - IGNORED_ITEMS, - ERROR_ITEMS, - ERROR_PAYLOAD, - DATASET_ID_KEY, -) - TEST_AUTOTAG_DATASET = "ds_bz43jm2jwm70060b3890" @@ -132,8 +133,20 @@ def check_is_expected_response(response): def test_dataset_append_local(CLIENT, dataset): - ds_items_local = [DatasetItem(image_location=LOCAL_FILENAME)] + ds_items_local_error = [ + DatasetItem(image_location=LOCAL_FILENAME, metadata={"test": math.nan}) + ] + with pytest.raises(ValueError) as e: + dataset.append(ds_items_local_error) + assert "Out of range float values are not JSON compliant" in str( + e.value + ) + ds_items_local = [ + DatasetItem(image_location=LOCAL_FILENAME, metadata={"test": 0}) + ] + response = dataset.append(ds_items_local) + assert isinstance(response, UploadResponse) resp_json = response.json() assert resp_json[DATASET_ID_KEY] == dataset.id From a9890b928755c5d2ba7c2d16aec8c607510b2168 Mon Sep 17 00:00:00 2001 From: Diego Ardila Date: Mon, 14 Jun 2021 15:01:01 -0700 Subject: [PATCH 8/8] change warning copy --- nucleus/dataset.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/nucleus/dataset.py b/nucleus/dataset.py index a607b793..c91c79aa 100644 --- a/nucleus/dataset.py +++ b/nucleus/dataset.py @@ -216,7 +216,10 @@ def append( if len(dataset_items) > WARN_FOR_LARGE_UPLOAD and not asynchronous: print( - "WARNING: for large uploads we recommend uploading your images remotely to a cloud storage provider and then using asynchronous=True." + "Tip: for large uploads, get faster performance by importing your data " + "into Nucleus directly from a cloud storage provider. See " + "https://dashboard.scale.com/nucleus/docs/api?language=python#guide-for-large-ingestions" + " for details." ) if asynchronous:
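Taken together, patches 2, 3, and 7 replace grequests' gevent-based concurrency (and the import-order monkey patching that the deleted conftest.py comment warned about) with explicit asyncio plus aiohttp. The sketch below condenses the final shape of that upload path under stated assumptions: the endpoint, timeout constant, and function names are illustrative rather than the client's real API surface; only the aiohttp and asyncio calls mirror the patches.

import asyncio

import aiohttp

# Assumed value; the real constant lives in nucleus.constants.
DEFAULT_NETWORK_TIMEOUT_SEC = 120


async def _post_files(session, endpoint, api_key, files):
    # files is a list of (name, (filename, file_pointer, content_type))
    # tuples, the same shape requests uses for multipart uploads.
    form = aiohttp.FormData()
    for name, (filename, fp, content_type) in files:
        form.add_field(
            name=name, filename=filename, value=fp, content_type=content_type
        )

    async with session.post(
        endpoint,
        data=form,
        auth=aiohttp.BasicAuth(api_key, ""),
        timeout=DEFAULT_NETWORK_TIMEOUT_SEC,
    ) as response:
        try:
            return await response.json()
        except aiohttp.client_exceptions.ContentTypeError:
            # A 404 or proxy error arrives as text, not JSON; this is the
            # fallback patch 7 adds.
            return await response.text()


async def _post_all(endpoint, api_key, files_per_request):
    # One shared session, one task per batch, gathered concurrently.
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.ensure_future(_post_files(session, endpoint, api_key, files))
            for files in files_per_request
        ]
        return await asyncio.gather(*tasks)


def upload_batches(endpoint, api_key, files_per_request):
    # asyncio.run() does not exist on Python 3.6, which is why patch 7
    # trades it for an explicit event loop.
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(
        _post_all(endpoint, api_key, files_per_request)
    )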
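The files_per_request argument feeding the coroutines above follows the multipart convention from requests: each part is a (field_name, (filename, file_object, content_type)) tuple, and every batch leads with an ITEMS_KEY part carrying the serialized item metadata so the server-side multer handler can pair each binary part with its item. A minimal sketch of building one batch, assuming plain dicts with an image_location key and JPEG content (the real client derives both from DatasetItem, and closes every file pointer once the request returns, as close_files does in patch 2):

import json

ITEMS_KEY = "items"  # assumed to match nucleus.constants.ITEMS_KEY


def get_files(batch):
    # One JSON part describing the whole batch first...
    files = [
        (ITEMS_KEY, (None, json.dumps(batch, allow_nan=False), "application/json"))
    ]
    # ...then one binary part per local image. Reusing the path as the
    # field name lets the server group the file parts back into an array.
    for item in batch:
        path = item["image_location"]
        files.append((path, (path, open(path, "rb"), "image/jpeg")))
    return files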
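Patch 7's "Nan fixes" target a quiet failure mode: by default, json.dumps serializes float("nan") as the bare token NaN, which is not valid JSON, so a payload could leave the client looking successful and then fail or mis-parse on the server. Passing allow_nan=False converts that into an immediate client-side ValueError, which is exactly what the new test_dataset_append_local case asserts:

import json
import math

payload = {"metadata": {"test": math.nan}}

print(json.dumps(payload))  # {"metadata": {"test": NaN}}, not valid JSON

try:
    json.dumps(payload, allow_nan=False)
except ValueError as err:
    print(err)  # Out of range float values are not JSON compliant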
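One detail worth noting in the errors.py evolution: patch 3's aiohttp branch still read from requests_response, which is None on that path, and aiohttp's response.text() is in any case a coroutine that a synchronous __init__ could never await. The first mistake was silent rather than fatal, as the tiny check below illustrates. Patch 7 fixes both problems by awaiting the body inside _make_files_request and passing a plain (status, reason, data) tuple into the exception.

# Why patch 3's bug did not crash: in the aiohttp branch
# requests_response is None, and hasattr(None, "text") is False,
# so the block appending the detailed error body was simply skipped.
assert hasattr(None, "text") is False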