From 61722dc6ea97af87072e4171ee53d3130e175088 Mon Sep 17 00:00:00 2001
From: CPBridge
Date: Tue, 21 May 2024 17:46:35 +0000
Subject: [PATCH] Add option to process a single file

---
 .../pan_cancer_nuclei_seg/__main__.py         | 356 ++++++++++--------
 1 file changed, 190 insertions(+), 166 deletions(-)

diff --git a/src/idc_annotation_conversion/pan_cancer_nuclei_seg/__main__.py b/src/idc_annotation_conversion/pan_cancer_nuclei_seg/__main__.py
index 49c7184..9d7e132 100644
--- a/src/idc_annotation_conversion/pan_cancer_nuclei_seg/__main__.py
+++ b/src/idc_annotation_conversion/pan_cancer_nuclei_seg/__main__.py
@@ -24,6 +24,7 @@
 
 
 ANNOTATION_BUCKET = "tcia-nuclei-seg"
+ANNOTATION_PREFIX = 'cnn-nuclear-segmentations-2019/data-files/'
 
 
 COLLECTIONS = [
@@ -121,11 +122,19 @@ def get_dicom_web_client(
     help="Collections to use, all by default.",
     show_choices=True,
 )
+@click.option(
+    "-l",
+    "--annotation-blob",
+    help=(
+        "Specify a single annotation blob to process, using its path within "
+        "the bucket."
+    ),
+)
 @click.option(
     "--number",
     "-n",
     type=int,
-    help="Number to process per collection. All by default.",
+    help="Number of annotations to process. All by default.",
 )
 @click.option(
     "--output-dir",
@@ -273,6 +282,7 @@ def run(
     segmentation_type: str,
     dimension_organization_type: str,
     create_pyramid: bool,
+    annotation_blob: Optional[str] = None,
     dicom_archive: Optional[str] = None,
     archive_token_url: Optional[str] = None,
     archive_client_id: Optional[str] = None,
@@ -342,193 +352,207 @@ def run(
 
     errors = []
 
-    # Loop over requested collections
-    for collection in collections:
-        prefix = f'cnn-nuclear-segmentations-2019/data-files/{collection}/'
-
-        if output_dir is not None:
-            collection_dir = output_dir / collection
-            collection_dir.mkdir(exist_ok=True)
+    if annotation_blob is not None:
+        ann_blob = ann_bucket.get_blob(annotation_blob)
+        if ann_blob is None:
+            raise RuntimeError(f"No such blob found: {annotation_blob}")
+        ann_blobs = [ann_blob]
+    else:
+        ann_blobs = []
+
+        # Loop over requested collections
+        for collection in collections:
+            collection_prefix = f'{ANNOTATION_PREFIX}{collection}/'
+
+            ann_blobs.extend(
+                [
+                    b for b in ann_bucket.list_blobs(prefix=collection_prefix)
+                    if b.name.endswith('.svs.tar.gz')
+                ]
+            )
 
-        ann_blobs = [
-            b for b in ann_bucket.list_blobs(prefix=prefix)
-            if b.name.endswith('.svs.tar.gz')
-        ]
         if number is not None:
             ann_blobs = ann_blobs[:number]
 
-        # Loop over annotations in the bucket for this collection
-        for ann_blob in ann_blobs:
+    # Loop over all annotations
+    for ann_blob in ann_blobs:
 
-            image_start_time = time()
+        image_start_time = time()
 
-            # Massage the blob name to derive container information
-            # eg TCGA-05-4244-01Z-00-DX1.d4ff32cd-38cf-40ea-8213-45c2b100ac01
-            filename = (
-                ann_blob.name
-                .replace(prefix, '')
-                .split('/')[0]
-                .replace('.svs.tar.gz', '')
-            )
+        collection = ann_blob.name.split("/")[-3]
+        collection_prefix = f'{ANNOTATION_PREFIX}{collection}/'
 
-            # eg TCGA-05-4244-01Z-00-DX1, d4ff32cd-38cf-40ea-8213-45c2b100ac01
-            if '.' in filename:
-                container_id, _ = filename.split('.')
-            else:
-                container_id = filename
-
-            logging.info(f"Processing container: {container_id}")
-
-            try:
-
-                selection_query = f"""
-                SELECT
-                    gcs_url,
-                    Cast(NumberOfFrames AS int) AS NumberOfFrames
-                FROM
-                    bigquery-public-data.idc_current.dicom_all
-                WHERE
-                    ContainerIdentifier='{container_id}'
-                ORDER BY
-                    NumberOfFrames DESC
-                """
-                selection_result = bq_client.query(selection_query)
-                selection_df = selection_result.result().to_dataframe()
-
-                if len(selection_df) == 0:
-                    # No image found, skip this for now
-                    logging.error(
-                        f"Could not locate image for container {container_id}."
-                    )
-                    continue
-
-                source_images = []
-                for i, url in enumerate(selection_df.gcs_url):
-                    blob_name = "/".join(url.split("/")[3:])
-                    wsi_dcm = cloud_io.read_dataset_from_blob(
-                        bucket=public_bucket,
-                        blob_name=blob_name,
-                    )
-                    source_images.append(wsi_dcm)
+        if output_dir is not None:
+            collection_dir = output_dir / collection
+            collection_dir.mkdir(exist_ok=True)
 
-                    # Store to disk
-                    if output_dir is not None and store_wsi_dicom:
-                        wsi_path = (
-                            collection_dir / f"{container_id}_im_{i}.dcm"
-                        )
-                        wsi_dcm.save_as(wsi_path)
-
-                    # Store to DICOM archive
-                    if dicom_archive is not None:
-                        web_client = get_dicom_web_client(
-                            url=dicom_archive,
-                            token_url=archive_token_url,
-                            client_id=archive_client_id,
-                            client_secret=archive_client_secret,
-                        )
-                        web_client.store_instances([wsi_dcm])
-
-                ann_dcm, seg_dcms = convert_annotations(
-                    annotation_csvs=iter_csvs(ann_blob),
-                    source_images=source_images,
-                    include_segmentation=with_segmentation,
-                    segmentation_type=segmentation_type,
-                    annotation_coordinate_type=annotation_coordinate_type,
-                    dimension_organization_type=dimension_organization_type,
-                    create_pyramid=create_pyramid,
-                    graphic_type=graphic_type,
-                    workers=workers,
-                )
+        # Massage the blob name to derive container information
+        # eg TCGA-05-4244-01Z-00-DX1.d4ff32cd-38cf-40ea-8213-45c2b100ac01
+        filename = (
+            ann_blob.name
+            .replace(collection_prefix, '')
+            .split('/')[0]
+            .replace('.svs.tar.gz', '')
+        )
 
-                # Store objects to bucket
-                if store_bucket:
-                    if output_bucket is None:
-                        data_str = (datetime.date.today())
-                        output_bucket = (
-                            "pan_cancer_nuclei_seg_annotation_"
-                            f"conversion_{data_str}"
-                        )
-                    output_bucket_obj = output_client.bucket(output_bucket)
+        # eg TCGA-05-4244-01Z-00-DX1, d4ff32cd-38cf-40ea-8213-45c2b100ac01
+        if '.' in filename:
+            container_id, _ = filename.split('.')
+        else:
+            container_id = filename
 
-                    if not output_bucket_obj.exists():
-                        output_bucket_obj.create(
-                            location=cloud_config.GCP_DEFAULT_LOCATION
-                        )
+        logging.info(f"Processing container: {container_id}")
 
-                    blob_root = (
-                        "" if output_prefix is None else f"{output_prefix}/"
-                    )
-                    ann_blob_name = (
-                        f"{blob_root}{collection}/{container_id}_ann.dcm"
-                    )
+        try:
 
-                    logging.info(f"Uploading annotation to {ann_blob_name}.")
-                    cloud_io.write_dataset_to_blob(
-                        ann_dcm,
-                        output_bucket_obj,
-                        ann_blob_name,
+            selection_query = f"""
+            SELECT
+                gcs_url,
+                Cast(NumberOfFrames AS int) AS NumberOfFrames
+            FROM
+                bigquery-public-data.idc_current.dicom_all
+            WHERE
+                ContainerIdentifier='{container_id}'
+            ORDER BY
+                NumberOfFrames DESC
+            """
+            selection_result = bq_client.query(selection_query)
+            selection_df = selection_result.result().to_dataframe()
+
+            if len(selection_df) == 0:
+                # No image found, skip this for now
+                logging.error(
+                    f"Could not locate image for container {container_id}."
+                )
+                continue
+
+            source_images = []
+            for i, url in enumerate(selection_df.gcs_url):
+                blob_name = "/".join(url.split("/")[3:])
+                wsi_dcm = cloud_io.read_dataset_from_blob(
+                    bucket=public_bucket,
+                    blob_name=blob_name,
+                )
+                source_images.append(wsi_dcm)
+
+                # Store to disk
+                if output_dir is not None and store_wsi_dicom:
+                    wsi_path = (
+                        collection_dir / f"{container_id}_im_{i}.dcm"
                     )
-                    if with_segmentation:
-                        for s, seg_dcm in enumerate(seg_dcms):
-                            seg_blob_name = (
-                                f"{blob_root}{collection}/{container_id}_seg_{s}.dcm"
-                            )
-                            logging.info(
-                                f"Uploading segmentation to {seg_blob_name}."
-                            )
-                            cloud_io.write_dataset_to_blob(
-                                seg_dcm,
-                                output_bucket_obj,
-                                seg_blob_name,
-                            )
-
-                # Store objects to filesystem
-                if output_dir is not None:
-                    ann_path = collection_dir / f"{container_id}_ann.dcm"
-
-                    logging.info(f"Writing annotation to {str(ann_path)}.")
-                    ann_dcm.save_as(ann_path)
-
-                    if with_segmentation:
-                        for s, seg_dcm in enumerate(seg_dcms):
-                            seg_path = collection_dir / f"{container_id}_seg_{s}.dcm"
-                            logging.info(f"Writing segmentation to {str(seg_path)}.")
-                            seg_dcm.save_as(seg_path)
-
-                # Store objects to DICOM archive
+                    wsi_dcm.save_as(wsi_path)
+
+                # Store to DICOM archive
                 if dicom_archive is not None:
-                    # Recreate client each time to deal with token expiration
                     web_client = get_dicom_web_client(
                         url=dicom_archive,
                         token_url=archive_token_url,
                         client_id=archive_client_id,
                         client_secret=archive_client_secret,
                     )
+                    web_client.store_instances([wsi_dcm])
+
+            ann_dcm, seg_dcms = convert_annotations(
+                annotation_csvs=iter_csvs(ann_blob),
+                source_images=source_images,
+                include_segmentation=with_segmentation,
+                segmentation_type=segmentation_type,
+                annotation_coordinate_type=annotation_coordinate_type,
+                dimension_organization_type=dimension_organization_type,
+                create_pyramid=create_pyramid,
+                graphic_type=graphic_type,
+                workers=workers,
+            )
+
+            # Store objects to bucket
+            if store_bucket:
+                if output_bucket is None:
+                    data_str = (datetime.date.today())
+                    output_bucket = (
+                        "pan_cancer_nuclei_seg_annotation_"
+                        f"conversion_{data_str}"
+                    )
+                output_bucket_obj = output_client.bucket(output_bucket)
+
+                if not output_bucket_obj.exists():
+                    output_bucket_obj.create(
+                        location=cloud_config.GCP_DEFAULT_LOCATION
+                    )
+
+                blob_root = (
+                    "" if output_prefix is None else f"{output_prefix}/"
+                )
+                ann_blob_name = (
+                    f"{blob_root}{collection}/{container_id}_ann.dcm"
+                )
 
-                    logging.info(f"Writing annotation to {dicom_archive}.")
-                    web_client.store_instances([ann_dcm])
-
-                    if with_segmentation:
-                        logging.info(f"Writing segmentation(s) to {dicom_archive}.")
-                        for seg_dcm in seg_dcms:
-                            web_client.store_instances([seg_dcm])
-
-                image_stop_time = time()
-                time_for_image = image_stop_time - image_start_time
-                logging.info(f"Processed {container_id} in {time_for_image:.2f}s")
-
-            except Exception as e:
-                logging.error(f"Error {str(e)}")
-                errors.append(
-                    {
-                        "collection": collection,
-                        "container_id": container_id,
-                        "error_message": str(e),
-                        "datetime": str(datetime.datetime.now()),
-                    }
+                logging.info(f"Uploading annotation to {ann_blob_name}.")
+                cloud_io.write_dataset_to_blob(
+                    ann_dcm,
+                    output_bucket_obj,
+                    ann_blob_name,
                 )
 
-        errors_df = pd.DataFrame(errors)
-        errors_df.to_csv("error_log.csv")
+                if with_segmentation:
+                    for s, seg_dcm in enumerate(seg_dcms):
+                        seg_blob_name = (
+                            f"{blob_root}{collection}/{container_id}_seg_{s}.dcm"
+                        )
+                        logging.info(
+                            f"Uploading segmentation to {seg_blob_name}."
+                        )
+                        cloud_io.write_dataset_to_blob(
+                            seg_dcm,
+                            output_bucket_obj,
+                            seg_blob_name,
+                        )
+
+            # Store objects to filesystem
+            if output_dir is not None:
+                ann_path = collection_dir / f"{container_id}_ann.dcm"
+
+                logging.info(f"Writing annotation to {str(ann_path)}.")
+                ann_dcm.save_as(ann_path)
+
+                if with_segmentation:
+                    for s, seg_dcm in enumerate(seg_dcms):
+                        seg_path = collection_dir / f"{container_id}_seg_{s}.dcm"
+                        logging.info(f"Writing segmentation to {str(seg_path)}.")
+                        seg_dcm.save_as(seg_path)
+
+            # Store objects to DICOM archive
+            if dicom_archive is not None:
+                # Recreate client each time to deal with token expiration
+                web_client = get_dicom_web_client(
+                    url=dicom_archive,
+                    token_url=archive_token_url,
+                    client_id=archive_client_id,
+                    client_secret=archive_client_secret,
+                )
+
+                logging.info(f"Writing annotation to {dicom_archive}.")
+                web_client.store_instances([ann_dcm])
+
+                if with_segmentation:
+                    logging.info(f"Writing segmentation(s) to {dicom_archive}.")
+                    for seg_dcm in seg_dcms:
+                        web_client.store_instances([seg_dcm])
+
+            image_stop_time = time()
+            time_for_image = image_stop_time - image_start_time
+            logging.info(f"Processed {container_id} in {time_for_image:.2f}s")
+
+        except Exception as e:
+            logging.error(f"Error {str(e)}")
+            errors.append(
+                {
+                    "collection": collection,
+                    "container_id": container_id,
+                    "error_message": str(e),
+                    "datetime": str(datetime.datetime.now()),
+                }
+            )
+    errors_df = pd.DataFrame(errors)
+    errors_df.to_csv("error_log.csv")
 
 
 if __name__ == "__main__":
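
Example usage (a sketch; the module invocation and the blob path below are
illustrative assumptions, not values taken from the patch): with the new
-l/--annotation-blob option a single annotation archive can be converted by
giving its path within the annotation bucket, e.g.

    python -m idc_annotation_conversion.pan_cancer_nuclei_seg \
        --annotation-blob "<path/within/bucket/to/annotation>.svs.tar.gz"

Omitting the option keeps the previous behaviour of listing every requested
collection, optionally truncated with --number.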