Skip to content

Commit

Permalink
Video Upload API (#221)
Browse files Browse the repository at this point in the history
* Initial async functionality

* Made index an optional arg, switched to items instead of frames

* Update some docs

* Added tests

* Added some docs

* Better doc strings

* docs

* typo fix

* more typos

* Updates to pyproject.toml and CHANGELOG.md
  • Loading branch information
cmpajot authored Feb 18, 2022
1 parent b237e6e commit eac724d
Show file tree
Hide file tree
Showing 11 changed files with 600 additions and 52 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to the [Nucleus Python Client](https://github.com/scaleapi/n
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.6.6](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.6.6) - 2022-02-18

### Added
- Video upload support

## [0.6.5](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.6.5) - 2022-02-16

### Fixed
Expand Down
5 changes: 2 additions & 3 deletions nucleus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
"DatasetItem",
"DatasetItemRetrievalError",
"Frame",
"Frame",
"LidarScene",
"LidarScene",
"VideoScene",
"Model",
"ModelCreationError",
# "MultiCategoryAnnotation", # coming soon!
Expand Down Expand Up @@ -124,7 +123,7 @@
SegmentationPrediction,
)
from .retry_strategy import RetryStrategy
from .scene import Frame, LidarScene
from .scene import Frame, LidarScene, VideoScene
from .slice import Slice
from .upload_response import UploadResponse
from .validate import Validate
Expand Down
5 changes: 5 additions & 0 deletions nucleus/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
ERROR_CODES = "error_codes"
ERROR_ITEMS = "upload_errors"
ERROR_PAYLOAD = "error_payload"
FRAME_RATE_KEY = "frame_rate"
FRAMES_KEY = "frames"
FX_KEY = "fx"
FY_KEY = "fy"
Expand Down Expand Up @@ -101,6 +102,10 @@
UPLOAD_TO_SCALE_KEY = "upload_to_scale"
URL_KEY = "url"
VERTICES_KEY = "vertices"
VIDEO_FRAME_LOCATION_KEY = "video_frame_location"
VIDEO_FRAME_URL_KEY = "video_frame_url"
VIDEO_KEY = "video"
VIDEO_UPLOAD_TYPE_KEY = "video_upload_type"
WIDTH_KEY = "width"
YAW_KEY = "yaw"
W_KEY = "w"
Expand Down
92 changes: 75 additions & 17 deletions nucleus/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
REQUEST_ID_KEY,
SLICE_ID_KEY,
UPDATE_KEY,
VIDEO_UPLOAD_TYPE_KEY,
)
from .data_transfer_object.dataset_info import DatasetInfo
from .data_transfer_object.dataset_size import DatasetSize
Expand All @@ -65,7 +66,7 @@
construct_model_run_creation_payload,
construct_taxonomy_payload,
)
from .scene import LidarScene, Scene, check_all_scene_paths_remote
from .scene import LidarScene, Scene, VideoScene, check_all_scene_paths_remote
from .slice import Slice
from .upload_response import UploadResponse

Expand Down Expand Up @@ -405,16 +406,17 @@ def ingest_tasks(self, task_ids: List[str]) -> dict:

def append(
self,
items: Union[Sequence[DatasetItem], Sequence[LidarScene]],
items: Union[
Sequence[DatasetItem], Sequence[LidarScene], Sequence[VideoScene]
],
update: bool = False,
batch_size: int = 20,
asynchronous: bool = False,
) -> Union[Dict[Any, Any], AsyncJob, UploadResponse]:
"""Appends items or scenes to a dataset.
.. note::
Datasets can only accept one of :class:`DatasetItems <DatasetItem>`
or :class:`Scenes <LidarScene>`, never both.
Datasets can only accept one of DatasetItems or Scenes, never both.
This behavior is set during Dataset :meth:`creation
<NucleusClient.create_dataset>` with the ``is_scene`` flag.
Expand Down Expand Up @@ -478,13 +480,14 @@ def append(
Union[ \
Sequence[:class:`DatasetItem`], \
Sequence[:class:`LidarScene`] \
Sequence[:class:`VideoScene`]
]): List of items or scenes to upload.
batch_size: Size of the batch for larger uploads. Default is 20.
update: Whether or not to overwrite metadata on reference ID collision.
Default is False.
asynchronous: Whether or not to process the upload asynchronously (and
return an :class:`AsyncJob` object). This is highly encouraged for
3D data to drastically increase throughput. Default is False.
return an :class:`AsyncJob` object). This is required when uploading
scenes. Default is False.
Returns:
For scenes
Expand All @@ -508,17 +511,26 @@ def append(
dataset_items = [
item for item in items if isinstance(item, DatasetItem)
]
scenes = [item for item in items if isinstance(item, LidarScene)]
if dataset_items and scenes:
lidar_scenes = [item for item in items if isinstance(item, LidarScene)]
video_scenes = [item for item in items if isinstance(item, VideoScene)]
if dataset_items and (lidar_scenes or video_scenes):
raise Exception(
"You must append either DatasetItems or Scenes to the dataset."
)
if scenes:
if lidar_scenes:
assert (
asynchronous
), "In order to avoid timeouts, you must set asynchronous=True when uploading scenes."
), "In order to avoid timeouts, you must set asynchronous=True when uploading 3D scenes."

return self._append_scenes(scenes, update, asynchronous)
return self._append_scenes(lidar_scenes, update, asynchronous)
if video_scenes:
assert (
asynchronous
), "In order to avoid timeouts, you must set asynchronous=True when uploading videos."

return self._append_video_scenes(
video_scenes, update, asynchronous
)

check_for_duplicate_reference_ids(dataset_items)

Expand Down Expand Up @@ -601,6 +613,51 @@ def _append_scenes(
)
return response

def _append_video_scenes(
    self,
    scenes: List[VideoScene],
    update: Optional[bool] = False,
    asynchronous: Optional[bool] = False,
) -> Union[dict, AsyncJob]:
    """Uploads a batch of :class:`VideoScene` objects to this dataset.

    TODO: make private in favor of Dataset.append invocation

    Parameters:
        scenes: Video scenes to upload. Each scene is validated via
            ``scene.validate()`` before any request is made.
        update: Whether to overwrite metadata on reference ID collision.
            Default is False.
        asynchronous: Whether to process the upload asynchronously and
            return an :class:`AsyncJob`. Strongly recommended for video
            uploads, which are likely to time out synchronously.
            Default is False.

    Returns:
        An :class:`AsyncJob` when ``asynchronous`` is True, otherwise the
        raw response dict from the synchronous upload request.

    Raises:
        Exception: If this dataset was not created with ``is_scene=True``.
    """
    if not self.is_scene:
        raise Exception(
            "Your dataset is not a scene dataset but only supports single dataset items. "
            "In order to be able to add scenes, please create another dataset with "
            "client.create_dataset(<dataset_name>, is_scene=True) or add the scenes to "
            "an existing scene dataset."
        )

    for scene in scenes:
        scene.validate()

    if not asynchronous:
        # BUG FIX: the original adjacent string literals were missing
        # separating spaces, printing "uploadrequests" and "seehttps://...".
        print(
            "WARNING: Processing videos usually takes several seconds. As a result, "
            "synchronous video scene upload requests are likely to timeout. For "
            "large uploads, we recommend using the flag asynchronous=True "
            "to avoid HTTP timeouts. Please see "
            "https://dashboard.scale.com/nucleus/docs/api?language=python#guide-for-large-ingestions"
            " for details."
        )

    if asynchronous:
        # TODO check_all_scene_paths_remote(scenes)
        request_id = serialize_and_write_to_presigned_url(
            scenes, self.id, self._client
        )
        response = self._client.make_request(
            payload={REQUEST_ID_KEY: request_id, UPDATE_KEY: update},
            route=f"{self.id}/upload_video_scenes?async=1",
        )
        return AsyncJob.from_json(response, self._client)

    # Synchronous path: single blocking request with the full payload.
    payload = construct_append_scenes_payload(scenes, update)
    response = self._client.make_request(
        payload=payload,
        route=f"{self.id}/upload_video_scenes",
    )
    return response

def iloc(self, i: int) -> dict:
"""Retrieves dataset item by absolute numerical index.
Expand Down Expand Up @@ -1082,13 +1139,14 @@ def get_scene(self, reference_id: str) -> Scene:
:class:`Scene<LidarScene>`: A scene object containing frames, which
in turn contain pointcloud or image items.
"""
return LidarScene.from_json(
self._client.make_request(
payload=None,
route=f"dataset/{self.id}/scene/{reference_id}",
requests_command=requests.get,
)
response = self._client.make_request(
payload=None,
route=f"dataset/{self.id}/scene/{reference_id}",
requests_command=requests.get,
)
if VIDEO_UPLOAD_TYPE_KEY in response:
return VideoScene.from_json(response)
return LidarScene.from_json(response)

def export_predictions(self, model):
"""Fetches all predictions of a model that were uploaded to the dataset.
Expand Down
67 changes: 45 additions & 22 deletions nucleus/dataset_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
TYPE_KEY,
UPLOAD_TO_SCALE_KEY,
URL_KEY,
VIDEO_FRAME_URL_KEY,
W_KEY,
X_KEY,
Y_KEY,
Expand Down Expand Up @@ -120,34 +121,42 @@ def to_payload(self) -> dict:
class DatasetItemType(Enum):
IMAGE = "image"
POINTCLOUD = "pointcloud"
VIDEO = "video"


@dataclass # pylint: disable=R0902
class DatasetItem: # pylint: disable=R0902
"""A dataset item is an image or pointcloud that has associated metadata.
"""A dataset item is an image, pointcloud or video frame that has associated metadata.
Note: for 3D data, please include a :class:`CameraParams` object under a key named
"camera_params" within the metadata dictionary. This will allow for projecting
3D annotations to any image within a scene.
Args:
image_location (Optional[str]): Required if pointcloud_location not present: The
location containing the image for the given row of data. This can be a
local path, or a remote URL. Remote formats supported include any URL
(``http://`` or ``https://``) or URIs for AWS S3, Azure, or GCS
(i.e. ``s3://``, ``gcs://``).
pointcloud_location (Optional[str]): Required if image_location not
present: The remote URL containing the pointcloud JSON. Remote
formats supported include any URL (``http://`` or ``https://``) or
URIs for AWS S3, Azure, or GCS (i.e. ``s3://``, ``gcs://``).
image_location (Optional[str]): Required if pointcloud_location and
video_frame_location are not present: The location containing the image for
the given row of data. This can be a local path, or a remote URL. Remote
formats supported include any URL (``http://`` or ``https://``) or URIs for
AWS S3, Azure, or GCS (i.e. ``s3://``, ``gcs://``).
pointcloud_location (Optional[str]): Required if image_location and
video_frame_location are not present: The remote URL containing the
pointcloud JSON. Remote formats supported include any URL (``http://``
or ``https://``) or URIs for AWS S3, Azure, or GCS (i.e. ``s3://``,
``gcs://``).
video_frame_location (Optional[str]): Required if image_location and
pointcloud_location are not present: The remote URL containing the
video frame image. Remote formats supported include any URL (``http://``
or ``https://``) or URIs for AWS S3, Azure, or GCS (i.e. ``s3://``,
``gcs://``).
reference_id (Optional[str]): A user-specified identifier to reference the
item.
metadata (Optional[dict]): Extra information about the particular
dataset item. ints, floats, string values will be made searchable in
the query bar by the key in this dict For example, ``{"animal":
the query bar by the key in this dict. For example, ``{"animal":
"dog"}`` will become searchable via ``metadata.animal = "dog"``.
Categorical data can be passed as a string and will be treated
Expand Down Expand Up @@ -190,9 +199,10 @@ class DatasetItem: # pylint: disable=R0902
upload_to_scale (Optional[bool]): Set this to false in order to use
`privacy mode <https://nucleus.scale.com/docs/privacy-mode>`_.
Setting this to false means the actual data within the item (i.e. the
image or pointcloud) will not be uploaded to scale meaning that you can
send in links that are only accessible to certain users, and not to Scale.
Setting this to false means the actual data within the item will not be
uploaded to scale meaning that you can send in links that are only accessible
to certain users, and not to Scale. Skipping upload to Scale is currently only
implemented for images.
"""

image_location: Optional[str] = None
Expand All @@ -202,23 +212,33 @@ class DatasetItem: # pylint: disable=R0902
metadata: Optional[dict] = None
pointcloud_location: Optional[str] = None
upload_to_scale: Optional[bool] = True
video_frame_location: Optional[str] = None

def __post_init__(self):
assert self.reference_id != "DUMMY_VALUE", "reference_id is required."
assert bool(self.image_location) != bool(
self.pointcloud_location
), "Must specify exactly one of the image_location, pointcloud_location parameters"
if self.pointcloud_location and not self.upload_to_scale:
assert (
bool(self.image_location)
+ bool(self.pointcloud_location)
+ bool(self.video_frame_location)
== 1
), "Must specify exactly one of the image_location, pointcloud_location, video_frame_location parameters"
if (
self.pointcloud_location or self.video_frame_location
) and not self.upload_to_scale:
raise NotImplementedError(
"Skipping upload to Scale is not currently implemented for pointclouds."
"Skipping upload to Scale is not currently implemented for pointclouds and videos."
)
self.local = (
is_local_path(self.image_location) if self.image_location else None
)
self.type = (
DatasetItemType.IMAGE
if self.image_location
else DatasetItemType.POINTCLOUD
else (
DatasetItemType.POINTCLOUD
if self.pointcloud_location
else DatasetItemType.VIDEO
)
)
camera_params = (
self.metadata.get(CAMERA_PARAMS_KEY, None)
Expand All @@ -238,6 +258,7 @@ def from_json(cls, payload: dict):
return cls(
image_location=image_url,
pointcloud_location=payload.get(POINTCLOUD_URL_KEY, None),
video_frame_location=payload.get(VIDEO_FRAME_URL_KEY, None),
reference_id=payload.get(REFERENCE_ID_KEY, None),
metadata=payload.get(METADATA_KEY, {}),
upload_to_scale=payload.get(UPLOAD_TO_SCALE_KEY, True),
Expand All @@ -260,13 +281,15 @@ def to_payload(self, is_scene=False) -> dict:
payload[URL_KEY] = self.image_location
elif self.pointcloud_location:
payload[URL_KEY] = self.pointcloud_location
elif self.video_frame_location:
payload[URL_KEY] = self.video_frame_location
payload[TYPE_KEY] = self.type.value
if self.camera_params:
payload[CAMERA_PARAMS_KEY] = self.camera_params.to_payload()
else:
assert (
self.image_location
), "Must specify image_location for DatasetItems not in a LidarScene"
), "Must specify image_location for DatasetItems not in a LidarScene or VideoScene"
payload[IMAGE_URL_KEY] = self.image_location
payload[UPLOAD_TO_SCALE_KEY] = self.upload_to_scale

Expand Down
5 changes: 3 additions & 2 deletions nucleus/payload_constructor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
PolygonPrediction,
SegmentationPrediction,
)
from .scene import LidarScene
from .scene import LidarScene, VideoScene


def construct_append_payload(
Expand All @@ -50,7 +50,8 @@ def construct_append_payload(


def construct_append_scenes_payload(
scene_list: List[LidarScene], update: Optional[bool] = False
scene_list: Union[List[LidarScene], List[VideoScene]],
update: Optional[bool] = False,
) -> dict:
scenes = []
for scene in scene_list:
Expand Down
Loading

0 comments on commit eac724d

Please sign in to comment.