From 8bad017172980b2947d6dad6aaab176fa21c35c6 Mon Sep 17 00:00:00 2001 From: Minkyu Choi Date: Sun, 17 Dec 2023 07:36:07 +0900 Subject: [PATCH] init --- pyproject.toml | 16 +- run_scripts/run_1_synthetic_tlv.py | 114 +++++ .../run_2_synthetic_tlv_until_long_horizon.py | 114 +++++ run_scripts/run_3_real_tlv.py | 88 ++++ tlv_dataset/.DS_Store | Bin 0 -> 6148 bytes tlv_dataset/__init__.py | 2 + tlv_dataset/common/__init__.py | 0 tlv_dataset/common/frame_grouping.py | 71 +++ tlv_dataset/common/ltl_utility.py | 36 ++ tlv_dataset/common/omegaconf.py | 22 + tlv_dataset/common/utility.py | 70 +++ tlv_dataset/data/__init__.py | 5 + tlv_dataset/data/tlv_dataset.py | 65 +++ tlv_dataset/data/tlv_raw_image.py | 113 +++++ tlv_dataset/generator/.DS_Store | Bin 0 -> 6148 bytes tlv_dataset/generator/__init__.py | 1 + tlv_dataset/generator/_base.py | 12 + tlv_dataset/generator/real_tlv_generator.py | 291 +++++++++++ .../generator/synthetic_tlv_generator.py | 455 ++++++++++++++++++ tlv_dataset/label_mapper/__init__.py | 0 .../label_mapper/metadata/imagenet_to_coco.py | 160 ++++++ .../label_mapper/metadata/nuscenes_to_coco.py | 35 ++ .../label_mapper/metadata/waymo_to_coco.py | 6 + tlv_dataset/loader/__init__.py | 84 ++++ tlv_dataset/loader/_base.py | 22 + tlv_dataset/loader/cifar.py | 169 +++++++ tlv_dataset/loader/coco.py | 134 ++++++ tlv_dataset/loader/imagenet.py | 281 +++++++++++ tlv_dataset/loader/nuscenes.py | 315 ++++++++++++ tlv_dataset/loader/waymo.py | 20 + 30 files changed, 2693 insertions(+), 8 deletions(-) create mode 100644 run_scripts/run_1_synthetic_tlv.py create mode 100644 run_scripts/run_2_synthetic_tlv_until_long_horizon.py create mode 100644 run_scripts/run_3_real_tlv.py create mode 100644 tlv_dataset/.DS_Store create mode 100644 tlv_dataset/__init__.py create mode 100644 tlv_dataset/common/__init__.py create mode 100644 tlv_dataset/common/frame_grouping.py create mode 100644 tlv_dataset/common/ltl_utility.py create mode 100644 tlv_dataset/common/omegaconf.py create mode 100644 tlv_dataset/common/utility.py create mode 100644 tlv_dataset/data/__init__.py create mode 100644 tlv_dataset/data/tlv_dataset.py create mode 100644 tlv_dataset/data/tlv_raw_image.py create mode 100644 tlv_dataset/generator/.DS_Store create mode 100644 tlv_dataset/generator/__init__.py create mode 100644 tlv_dataset/generator/_base.py create mode 100644 tlv_dataset/generator/real_tlv_generator.py create mode 100644 tlv_dataset/generator/synthetic_tlv_generator.py create mode 100644 tlv_dataset/label_mapper/__init__.py create mode 100644 tlv_dataset/label_mapper/metadata/imagenet_to_coco.py create mode 100644 tlv_dataset/label_mapper/metadata/nuscenes_to_coco.py create mode 100644 tlv_dataset/label_mapper/metadata/waymo_to_coco.py create mode 100644 tlv_dataset/loader/__init__.py create mode 100644 tlv_dataset/loader/_base.py create mode 100644 tlv_dataset/loader/cifar.py create mode 100644 tlv_dataset/loader/coco.py create mode 100644 tlv_dataset/loader/imagenet.py create mode 100644 tlv_dataset/loader/nuscenes.py create mode 100644 tlv_dataset/loader/waymo.py diff --git a/pyproject.toml b/pyproject.toml index c6a0596..85cb2e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,14 +3,14 @@ requires = ["setuptools"] build-backend = "setuptools.build_meta" [project] -name = "your_project_name" -version = "0.0.1" +name = "tlv_data" +version = "1.0.0" authors = [ - { name="Example Author", email="author@example.com" }, + { name="Minkyu Choi", email="minkyu.choi@utexas.edu" }, ] -description = "A small example 
package" +description = "Temporal Logic Video (TLV) Dataset" readme = "README.md" -requires-python = ">=3.9" +requires-python = ">=3.8" classifiers = [ "Programming Language :: Python :: 3", "License :: OSI Approved :: MIT License", @@ -40,9 +40,9 @@ select = [ "D", # pydocstyle ] -[tool.ruff.pydocstyle] -convention = "google" - ignore = ["ANN101", "ANN102"] extend-exclude = [".venv", "venv", "vscode"] + +[tool.ruff.pydocstyle] +convention = "google" diff --git a/run_scripts/run_1_synthetic_tlv.py b/run_scripts/run_1_synthetic_tlv.py new file mode 100644 index 0000000..4575f8b --- /dev/null +++ b/run_scripts/run_1_synthetic_tlv.py @@ -0,0 +1,114 @@ +import argparse + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + + # - - - - - - - COMMON ARGUMENTS - - - - - - - # + parser.add_argument( + "--type_of_generator", + type=str, + default="synthetic", + choices=["synthetic", "real"], + ) + parser.add_argument( + "--dataloader", + type=str, + default="coco", + choices=[ + "coco", + "imagenet", + "cifar10", + "cifar100", + "waymo", + "nuscenes", + ], + ) + parser.add_argument( + "--save_dir", + type=str, + default="", + ) + # - - - - - - Synthetic Generator - - - - - - # + parser.add_argument( + "--initial_number_of_frame", + type=int, + default=25, + ) + parser.add_argument( + "--max_number_frame", + type=int, + default=200, + ) + parser.add_argument( + "--number_video_per_set_of_frame", + type=int, + default=3, + ) + parser.add_argument( + "--increase_rate", + type=int, + default=3, + ) + parser.add_argument( + "--ltl_logic", + type=str, + default="all", + choices=["all", "F prop1", "G prop1", "prop1 U prop2", "prop1 & prop2"], + ) + parser.add_argument( + "--save_frames", + type=bool, + default=False, + ) + + # Start. + args = parser.parse_args() + + if args.type_of_generator == "synthetic": + from tlv_dataset.generator.synthetic_tlv_generator import ( + SyntheticTLVGenerator, + ) + from tlv_dataset.loader.coco import COCOImageLoader + from tlv_dataset.loader.imagenet import ImageNetDataloader + from tlv_dataset.loader.cifar import Cifar10ImageLoader + from tlv_dataset.loader.cifar import Cifar100ImageLoader + + valid_dataloader = ["coco", "imagenet", "cifar10", "cifar100"] + assert ( + args.dataloader in valid_dataloader + ), "please use valide dataloader for synthetic tlv dataset: coco, imagenet,cifar10, cifar100" + if args.dataloader == "coco": + dataloader = COCOImageLoader() + + elif args.dataloader == "imagenet": + dataloader = ImageNetDataloader() + + elif args.dataloader == "cifar10": + dataloader = Cifar10ImageLoader() + + elif args.dataloader == "cifar100": + dataloader = Cifar100ImageLoader() + + TLV_generator = SyntheticTLVGenerator( + dataloader=dataloader, save_dir=args.save_dir + ) + + if args.ltl_logic == "all": + available_tl = [ + "F prop1", + "G prop1", + "prop1 U prop2", + "prop1 & prop2", + ] + else: + available_tl = [args.ltl_logic] + for tl in available_tl: + TLV_generator.generate( + initial_number_of_frame=args.initial_number_of_frame, + max_number_frame=args.max_number_frame, + number_video_per_set_of_frame=args.number_video_per_set_of_frame, + increase_rate=args.increase_rate, + ltl_logic=tl, + present_prop1_till_prop2=args.present_prop1_till_prop2, + save_frames=args.save_frames, + ) diff --git a/run_scripts/run_2_synthetic_tlv_until_long_horizon.py b/run_scripts/run_2_synthetic_tlv_until_long_horizon.py new file mode 100644 index 0000000..4575f8b --- /dev/null +++ b/run_scripts/run_2_synthetic_tlv_until_long_horizon.py @@ -0,0 +1,114 @@ +import 
diff --git a/run_scripts/run_2_synthetic_tlv_until_long_horizon.py b/run_scripts/run_2_synthetic_tlv_until_long_horizon.py
new file mode 100644
index 0000000..4575f8b
--- /dev/null
+++ b/run_scripts/run_2_synthetic_tlv_until_long_horizon.py
@@ -0,0 +1,114 @@
+import argparse
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+
+    # - - - - - - - COMMON ARGUMENTS - - - - - - - #
+    parser.add_argument(
+        "--type_of_generator",
+        type=str,
+        default="synthetic",
+        choices=["synthetic", "real"],
+    )
+    parser.add_argument(
+        "--dataloader",
+        type=str,
+        default="coco",
+        choices=[
+            "coco",
+            "imagenet",
+            "cifar10",
+            "cifar100",
+            "waymo",
+            "nuscenes",
+        ],
+    )
+    parser.add_argument(
+        "--save_dir",
+        type=str,
+        default="",
+    )
+    # - - - - - - Synthetic Generator - - - - - - #
+    parser.add_argument(
+        "--initial_number_of_frame",
+        type=int,
+        default=25,
+    )
+    parser.add_argument(
+        "--max_number_frame",
+        type=int,
+        default=200,
+    )
+    parser.add_argument(
+        "--number_video_per_set_of_frame",
+        type=int,
+        default=3,
+    )
+    parser.add_argument(
+        "--increase_rate",
+        type=int,
+        default=3,
+    )
+    parser.add_argument(
+        "--ltl_logic",
+        type=str,
+        default="all",
+        choices=["all", "F prop1", "G prop1", "prop1 U prop2", "prop1 & prop2"],
+    )
+    parser.add_argument(
+        "--present_prop1_till_prop2",
+        action="store_true",
+    )
+    parser.add_argument(
+        "--save_frames",
+        action="store_true",
+    )
+
+    # Start.
+    args = parser.parse_args()
+
+    if args.type_of_generator == "synthetic":
+        from tlv_dataset.generator.synthetic_tlv_generator import (
+            SyntheticTLVGenerator,
+        )
+        from tlv_dataset.loader.coco import COCOImageLoader
+        from tlv_dataset.loader.imagenet import ImageNetDataloader
+        from tlv_dataset.loader.cifar import Cifar10ImageLoader
+        from tlv_dataset.loader.cifar import Cifar100ImageLoader
+
+        valid_dataloader = ["coco", "imagenet", "cifar10", "cifar100"]
+        assert (
+            args.dataloader in valid_dataloader
+        ), "please use a valid dataloader for the synthetic tlv dataset: coco, imagenet, cifar10, cifar100"
+        if args.dataloader == "coco":
+            dataloader = COCOImageLoader()
+
+        elif args.dataloader == "imagenet":
+            dataloader = ImageNetDataloader()
+
+        elif args.dataloader == "cifar10":
+            dataloader = Cifar10ImageLoader()
+
+        elif args.dataloader == "cifar100":
+            dataloader = Cifar100ImageLoader()
+
+        TLV_generator = SyntheticTLVGenerator(
+            dataloader=dataloader, save_dir=args.save_dir
+        )
+
+        if args.ltl_logic == "all":
+            available_tl = [
+                "F prop1",
+                "G prop1",
+                "prop1 U prop2",
+                "prop1 & prop2",
+            ]
+        else:
+            available_tl = [args.ltl_logic]
+        for tl in available_tl:
+            TLV_generator.generate_until_time_delta(
+                initial_number_of_frame=args.initial_number_of_frame,
+                max_number_frame=args.max_number_frame,
+                number_video_per_set_of_frame=args.number_video_per_set_of_frame,
+                increase_rate=args.increase_rate,
+                ltl_logic=tl,
+                present_prop1_till_prop2=args.present_prop1_till_prop2,
+                save_frames=args.save_frames,
+            )
diff --git a/run_scripts/run_3_real_tlv.py b/run_scripts/run_3_real_tlv.py
new file mode 100644
index 0000000..5416716
--- /dev/null
+++ b/run_scripts/run_3_real_tlv.py
@@ -0,0 +1,88 @@
+import argparse
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+
+    # - - - - - - - COMMON ARGUMENTS - - - - - - - #
+    parser.add_argument(
+        "--type_of_generator",
+        type=str,
+        default="synthetic",
+        choices=["synthetic", "real"],
+    )
+    parser.add_argument(
+        "--dataloader",
+        type=str,
+        default="coco",
+        choices=[
+            "coco",
+            "imagenet",
+            "cifar10",
+            "cifar100",
+            "waymo",
+            "nuscenes",
+        ],
+    )
+    parser.add_argument(
+        "--save_dir",
+        type=str,
+        default="",
+    )
+    # - - - - - - Real Generator - - - - - - #
+    parser.add_argument(
+        "--unique_propositions",
+        type=str,
+        nargs="*",
+        default=None,
+    )
+    parser.add_argument(
+        "--tlv_data_dir",
+        type=str,
+        default=None,
+    )
+    # - - - - - - Nuscenes Loader - - - - - - #
+    parser.add_argument(
+        "--dataroot",
+        type=str,
+        default=None,
+    )
+    parser.add_argument(
+        "--version",
+        type=str,
+        default="v1.0-mini",
+    )
+    parser.add_argument(
+        "--verbose",
+        action="store_true",
+    )
+
+    # Start.
+    args = parser.parse_args()
+
+    if args.type_of_generator == "real":
+        from tlv_dataset.generator.real_tlv_generator import RealTLVGenerator
+        from tlv_dataset.loader.nuscenes import NuScenesImageLoader
+        from tlv_dataset.loader.waymo import WaymoImageLoader
+
+        valid_dataloader = ["waymo", "nuscenes"]
+        assert (
+            args.dataloader in valid_dataloader
+        ), "please use a valid dataloader for the real tlv dataset: waymo, nuscenes"
+
+        if args.dataloader == "waymo":
+            dataloader = WaymoImageLoader()
+
+        elif args.dataloader == "nuscenes":
+            dataloader = NuScenesImageLoader(
+                dataroot=args.dataroot,
+                version=args.version,
+                verbose=args.verbose,
+            )
+
+        TLV_generator = RealTLVGenerator(
+            dataloader=dataloader,
+            save_dir=args.save_dir,
+            unique_propositions=args.unique_propositions,
+            tlv_data_dir=args.tlv_data_dir,
+        )
+        TLV_generator.generate()
diff --git a/tlv_dataset/.DS_Store b/tlv_dataset/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..0003f3112e6a29601d9f38a00cd39c881c925d1e
GIT binary patch
literal 6148
[binary blob omitted; the hunks for tlv_dataset/__init__.py, tlv_dataset/common/__init__.py, and the header of tlv_dataset/common/frame_grouping.py are truncated in the source]
diff --git a/tlv_dataset/common/frame_grouping.py b/tlv_dataset/common/frame_grouping.py
new file mode 100644
--- /dev/null
+++ b/tlv_dataset/common/frame_grouping.py
@@ -0,0 +1,71 @@
+def combine_consecutive_lists(data):
+    if len(data) > 0:
+        # Normalize data to ensure all elements are lists
+        data = [[x] if not isinstance(x, list) else x for x in data]
+
+        # Sort the data based on the first element of each sublist
+        data.sort(key=lambda x: x[0])
+
+        combined_lists = [data[0]]
+
+        for sublist in data[1:]:
+            # Check if the last number of the previous sublist is consecutive
+            # to the first number of the current sublist
+            if sublist[0] - combined_lists[-1][-1] == 1:
+                # If the current sublist is single-item and the previous sublist
+                # is also single-item, combine them
+                if len(sublist) == len(combined_lists[-1]) == 1:
+                    combined_lists[-1].extend(sublist)
+                # If the current sublist is single-item but the previous sublist
+                # is multi-item, append it
+                elif len(sublist) == 1 and len(combined_lists[-1]) > 1:
+                    combined_lists[-1].append(sublist[0])
+                # Otherwise, start a new group
+                else:
+                    combined_lists.append(sublist)
+            else:
+                combined_lists.append(sublist)
+
+        return combined_lists
+    else:
+        return []
+
+
+def prop1_u_prop2_grouping(my_list):
+    # Group the indices of non-None elements; None acts as a separator.
+    groups = []
+    current_group = []
+
+    for i, elem in enumerate(my_list):
+        if elem is not None:
+            current_group.append(i)
+        elif current_group:
+            groups.append(current_group)
+            current_group = []
+
+    # Append the last group if it exists
+    if current_group:
+        groups.append(current_group)
+    return groups
+
+
+if __name__ == "__main__":
+    # data = [1, 2, [3], [4, 5], 9, 21]
+    # data = [[2, 4, 6], [9, 21]]
+    # data = [[1], [2], [3], [5], [7], [9], [10], [21]]
+    # print(combine_consecutive_lists(data))
+    my_list = [
+        ["book", "toilet"],
+        ["book", "toilet"],
+        ["book", "toilet"],
+        None,
+        ["book", "toilet"],
+        "hello",
+        None,
+        None,
+        None,
+        None,
+        ["book", "toilet"],
+        ["book", "toilet"],
+        None,
+        "hello",
+        None,
+    ]
+    print(prop1_u_prop2_grouping(my_list))
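The two grouping helpers are easiest to read from their outputs. A short traced sketch (values follow directly from the code above):

    from tlv_dataset.common.frame_grouping import (
        combine_consecutive_lists,
        prop1_u_prop2_grouping,
    )

    # Consecutive singleton indices merge into one group; multi-element
    # groups such as [4, 5] always start a group of their own.
    combine_consecutive_lists([1, 2, [3], [4, 5], 9, 21])
    # -> [[1, 2, 3], [4, 5], [9], [21]]

    # Indices of non-None entries are grouped; None acts as a separator.
    prop1_u_prop2_grouping(["a", "b", None, None, "c"])
    # -> [[0, 1], [4]]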
diff --git a/tlv_dataset/common/ltl_utility.py b/tlv_dataset/common/ltl_utility.py
new file mode 100644
index 0000000..1f90f31
--- /dev/null
+++ b/tlv_dataset/common/ltl_utility.py
@@ -0,0 +1,36 @@
+import regex as re
+from stormpy.core import ExplicitQualitativeCheckResult
+
+
+def get_not_operator_mapping(ltl_formula):
+    operator_set = ["F", "G", "U"]
+    not_operator_proposition_list = list()
+    if len(ltl_formula.split("!")) > 2:
+        # More than one "!" operator: not handled.
+        pass
+    else:
+        s = ltl_formula.split("!")[-1]
+        match = re.search(r'"(.*?)"', s)
+        if match:
+            prop_name = match.group(1).strip()
+            if prop_name not in operator_set:
+                not_operator_proposition_list.append(prop_name)
+        else:
+            raise ValueError("No ! match")
+    return not_operator_proposition_list
+
+
+def verification_result_eval(verification_result: ExplicitQualitativeCheckResult):
+    # The stringified result is "true" when the property holds everywhere,
+    # "true, false" when it holds on some states but not others, and
+    # "false" otherwise.
+    verification_result_str = str(verification_result)
+    string_result = verification_result_str.split("{")[-1].split("}")[0]
+    if len(string_result) == 4:
+        if string_result[0] == "t":  # "true"
+            result = True
+        else:
+            result = False
+    elif len(string_result) > 5:
+        # "true, false" -> some true and some false
+        result = "PartialTrue"
+    else:
+        result = False
+    return result
diff --git a/tlv_dataset/common/omegaconf.py b/tlv_dataset/common/omegaconf.py
new file mode 100644
index 0000000..cf3e35b
--- /dev/null
+++ b/tlv_dataset/common/omegaconf.py
@@ -0,0 +1,22 @@
+from __future__ import annotations
+from pathlib import Path
+from omegaconf import DictConfig, ListConfig, OmegaConf
+
+
+def load_config_from_yaml(config_path: str, read_only=False) -> DictConfig:
+    """Load a yaml config file and return a DictConfig object."""
+    config = OmegaConf.load(config_path)
+    if read_only:
+        OmegaConf.set_readonly(config, True)
+    return config
+
+
+def load_config_from_dict(config_dict: dict, read_only=False) -> DictConfig:
+    """Load a dictionary and return a DictConfig object."""
+    config = OmegaConf.create(config_dict)
+    if read_only:
+        OmegaConf.set_readonly(config, True)
+    return config
+
+
+def save_config_to_yaml(config: DictConfig, config_path: str) -> None:
+    """Save a DictConfig object to a yaml file."""
+    with open(config_path, "w") as fp:
+        OmegaConf.save(config=config, f=fp)
\ No newline at end of file
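A quick usage sketch for the three helpers above (the file name is illustrative):

    from tlv_dataset.common.omegaconf import (
        load_config_from_dict,
        load_config_from_yaml,
        save_config_to_yaml,
    )

    cfg = load_config_from_dict({"save_dir": "/tmp/tlv", "max_number_frame": 200})
    save_config_to_yaml(cfg, "tlv_config.yaml")  # illustrative path
    cfg = load_config_from_yaml("tlv_config.yaml", read_only=True)
    # read_only=True makes later assignments raise omegaconf.ReadonlyConfigError.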
+ """ + path = Path(path) + path.mkdir(parents=True, exist_ok=True) + from PIL import Image + + for idx, img in enumerate(frames): + Image.fromarray(img).save(f"{path}/{file_label}_{idx}.png") + + +def save_dict_to_pickle(dict_obj: dict | object, path: str, file_name: str = "data.pkl"): + # Decode the JSON data into a Python object + # data_python = json.loads(dict_obj) + full_path = Path(path) / file_name + # Check if the path exists + if not Path(path).exists(): + # Create the directory, including any necessary parent directories + Path(path).mkdir(parents=True, exist_ok=True) + # Save the Python object using pickle + if full_path.exists(): + random_number = random.randint(0, 99999) + new_file_name = f"{full_path.stem}_{random_number}{full_path.suffix}" + full_path = full_path.parent / new_file_name + with open(full_path, "wb") as file: + pickle.dump(dict_obj, file) + + +def load_pickle_to_dict(path: str, file_name=None) -> dict | object: + if file_name is None: + if isinstance(path, str): + full_path = Path(path) + else: + full_path = path + else: + full_path = Path(path) / file_name + + # Load the Python object using pickle + with open(full_path, "rb") as file: + dict_obj = pickle.load(file) + + return dict_obj diff --git a/tlv_dataset/data/__init__.py b/tlv_dataset/data/__init__.py new file mode 100644 index 0000000..430188e --- /dev/null +++ b/tlv_dataset/data/__init__.py @@ -0,0 +1,5 @@ +from tlv_dataset.data.tlv_dataset import TLVDataset + +from tlv_dataset.data.tlv_raw_image import TLVRawImage, TLVRawImageDataset + +all = [TLVDataset, TLVRawImage, TLVRawImageDataset] diff --git a/tlv_dataset/data/tlv_dataset.py b/tlv_dataset/data/tlv_dataset.py new file mode 100644 index 0000000..79234f8 --- /dev/null +++ b/tlv_dataset/data/tlv_dataset.py @@ -0,0 +1,65 @@ +import dataclasses # noqa: D100 +import random +from pathlib import Path +from typing import Dict, List, Optional + +import cv2 +import numpy as np +import torch +from PIL import Image + +from tlv_dataset.common.frame_grouping import combine_consecutive_lists +from tlv_dataset.common.utility import get_file_or_dir_with_datetime + + +@dataclasses.dataclass +class TLVDataset: + """TLV Dataset Data Class. + + ground_truth (bool): Ground truth answer of LTL condition for frames + ltl_frame (str): LTL formula + number_of_frame (int): Number of frame + frames_of_interest (list): List of frames that satisfy LTL condition + - [[0]] -> Frame 0 satisfy LTL condition; + [[4,5,6,7]] -> Frame 4 to 7 satisfy LTL condition + [[0],[4,5,6,7]] -> Frame 0 and Frame 4 to 7 satisfy LTL condition. + labels_of_frame: list of labels of frame + """ + + ground_truth: bool + ltl_formula: str + proposition: list + number_of_frame: int + frames_of_interest: Optional[List[List[int]]] + labels_of_frames: List[str] + images_of_frames: List[np.ndarray] = dataclasses.field(default_factory=list) + + def __post_init__(self): + """Post init.""" + self.frames_of_interest = combine_consecutive_lists( + data=self.frames_of_interest + ) + + def save_frames( + self, path="/opt/Neuro-Symbolic-Video-Frame-Search/artifacts" + ) -> None: + """Save image to path. + + Args: + path (str, optional): Path to save image. 
+ """ + from PIL import Image + + for idx, img in enumerate(self.images_of_frames): + Image.fromarray(img).save(f"{path}/{idx}.png") + + def save( + self, + save_path: str = "/opt/Neuro-Symbolic-Video-Frame-Search/artifacts", + ) -> None: + """Save the current instance to a pickle file.""" + import pickle + + """Save the current instance to a pickle file.""" + with open(save_path, "wb") as f: + pickle.dump(self, f) diff --git a/tlv_dataset/data/tlv_raw_image.py b/tlv_dataset/data/tlv_raw_image.py new file mode 100644 index 0000000..ea21453 --- /dev/null +++ b/tlv_dataset/data/tlv_raw_image.py @@ -0,0 +1,113 @@ +import dataclasses # noqa: D100 +import random +from pathlib import Path +from typing import Dict, List, Optional + +import cv2 +import numpy as np +import torch +from PIL import Image + +from tlv_dataset.common.frame_grouping import combine_consecutive_lists +from tlv_dataset.common.utility import get_file_or_dir_with_datetime + + +@dataclasses.dataclass +class TLVRawImage: + """Benchmark image frame class.""" + + unique_labels: list + labels: List[List[str]] + images: List[np.ndarray] + + def sample_image_from_label( + self, labels: list, proposition: list + ) -> np.ndarray: + """Sample image from label.""" + image_of_frame = [] + img_to_label = {} + img_to_label_list = {} + for prop in proposition: + # img_to_label[prop] = [i for i, value in enumerate(self.labels) if value == prop] + img_to_label[prop] = [ + i for i, value in enumerate(self.labels) if prop in value + ] + # img_to_label_list[tuple(sorted(proposition))] = [ + # i for i, value in enumerate(self.labels) if all(prop in value for prop in proposition) + # ] + + label_idx = 0 + for label in labels: + if label is None: + while True: + random_idx = random.randrange( + len(self.images) + ) # pick one random image with idx + val = [] + for single_label in self.labels[ + random_idx + ]: # go over all labels of the image + if single_label in proposition: + val.append(True) + if True not in val: + labels[label_idx] = single_label + image_of_frame.append(self.images[random_idx]) + break + else: + # lable available - just get the image + if isinstance(label, str): + # one label in the frame + random_idx = random.choice(img_to_label[label]) + # plt.imshow(self.images[random_idx]) + # plt.axis("off") + # plt.savefig("data_loader_sample_image.png") + image_of_frame.append(self.images[random_idx]) + elif isinstance(label, list): + img_to_label_list[tuple(sorted(label))] = [ + i + for i, value in enumerate(self.labels) + if all(prop in value for prop in label) + ] + random_idx = random.choice( + img_to_label_list[tuple(sorted(label))] + ) + image_of_frame.append(self.images[random_idx]) + label_idx += 1 + return labels, image_of_frame + + +@dataclasses.dataclass +class TLVRawImageDataset: + """Benchmark image frame class with a torchvision dataset for large datasets.""" + + unique_labels: list + labels: List[List[str]] + images: torch.utils.data.Dataset + + def sample_image_from_label( + self, labels: list, proposition: list + ) -> np.ndarray: + """Sample image from label.""" + image_of_frame = [] + img_to_label = {} + for prop in proposition: + # img_to_label[prop] = [i for i, value in enumerate(self.labels) if value == prop] + img_to_label[prop] = [ + i for i, value in enumerate(self.labels) if prop in value + ] + + label_idx = 0 + for label in labels: + if label is None: + while True: + random_idx = random.randrange(len(self.images)) + if self.labels[random_idx] not in proposition: + break + labels[label_idx] = 
diff --git a/tlv_dataset/data/tlv_raw_image.py b/tlv_dataset/data/tlv_raw_image.py
new file mode 100644
index 0000000..ea21453
--- /dev/null
+++ b/tlv_dataset/data/tlv_raw_image.py
@@ -0,0 +1,113 @@
+import dataclasses  # noqa: D100
+import random
+from pathlib import Path
+from typing import Dict, List, Optional
+
+import cv2
+import numpy as np
+import torch
+from PIL import Image
+
+from tlv_dataset.common.frame_grouping import combine_consecutive_lists
+from tlv_dataset.common.utility import get_file_or_dir_with_datetime
+
+
+@dataclasses.dataclass
+class TLVRawImage:
+    """Benchmark image frame class."""
+
+    unique_labels: list
+    labels: List[List[str]]
+    images: List[np.ndarray]
+
+    def sample_image_from_label(
+        self, labels: list, proposition: list
+    ) -> np.ndarray:
+        """Sample an image for each frame label."""
+        image_of_frame = []
+        img_to_label = {}
+        img_to_label_list = {}
+        for prop in proposition:
+            img_to_label[prop] = [
+                i for i, value in enumerate(self.labels) if prop in value
+            ]
+
+        label_idx = 0
+        for label in labels:
+            if label is None:
+                # No proposition required: sample a random image whose labels
+                # contain none of the propositions.
+                while True:
+                    random_idx = random.randrange(len(self.images))
+                    val = []
+                    for single_label in self.labels[random_idx]:
+                        if single_label in proposition:
+                            val.append(True)
+                    if True not in val:
+                        labels[label_idx] = single_label
+                        image_of_frame.append(self.images[random_idx])
+                        break
+            else:
+                # Label available: just get a matching image.
+                if isinstance(label, str):
+                    # One label in the frame.
+                    random_idx = random.choice(img_to_label[label])
+                    image_of_frame.append(self.images[random_idx])
+                elif isinstance(label, list):
+                    # Multiple labels must co-occur in one image.
+                    img_to_label_list[tuple(sorted(label))] = [
+                        i
+                        for i, value in enumerate(self.labels)
+                        if all(prop in value for prop in label)
+                    ]
+                    random_idx = random.choice(
+                        img_to_label_list[tuple(sorted(label))]
+                    )
+                    image_of_frame.append(self.images[random_idx])
+            label_idx += 1
+        return labels, image_of_frame
+
+
+@dataclasses.dataclass
+class TLVRawImageDataset:
+    """Benchmark image frame class backed by a torchvision dataset for large datasets."""
+
+    unique_labels: list
+    labels: List[List[str]]
+    images: torch.utils.data.Dataset
+
+    def sample_image_from_label(
+        self, labels: list, proposition: list
+    ) -> np.ndarray:
+        """Sample an image for each frame label."""
+        image_of_frame = []
+        img_to_label = {}
+        for prop in proposition:
+            img_to_label[prop] = [
+                i for i, value in enumerate(self.labels) if prop in value
+            ]
+
+        label_idx = 0
+        for label in labels:
+            if label is None:
+                while True:
+                    random_idx = random.randrange(len(self.images))
+                    if self.labels[random_idx] not in proposition:
+                        break
+                labels[label_idx] = self.labels[random_idx]
+                image_of_frame.append(self.images[random_idx][0])
+            else:
+                random_idx = random.choice(img_to_label[label])
+                image_of_frame.append(self.images[random_idx][0])
+
+            label_idx += 1
+        return labels, image_of_frame
diff --git a/tlv_dataset/generator/.DS_Store b/tlv_dataset/generator/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..5008ddfcf53c02e82d7eee2e57c38e5672ef89f6
GIT binary patch
literal 6148
[binary blob omitted; the hunks for tlv_dataset/generator/__init__.py and the header of tlv_dataset/generator/_base.py are truncated in the source; _base.py is reconstructed below from its visible tail and the loader/_base.py pattern]
diff --git a/tlv_dataset/generator/_base.py b/tlv_dataset/generator/_base.py
new file mode 100644
--- /dev/null
+++ b/tlv_dataset/generator/_base.py
@@ -0,0 +1,12 @@
+from __future__ import annotations
+
+import abc
+
+
+class DataGenerator(abc.ABC):
+    """Data Generator Abstract Class."""
+
+    @abc.abstractmethod
+    def generate(self) -> any:
+        """Generate data."""
diff --git a/tlv_dataset/generator/real_tlv_generator.py b/tlv_dataset/generator/real_tlv_generator.py
new file mode 100644
index 0000000..8c154ff
--- /dev/null
+++ b/tlv_dataset/generator/real_tlv_generator.py
@@ -0,0 +1,291 @@
+from __future__ import annotations
+
+import copy
+import random
+import re
+from collections import Counter
+from pathlib import Path
+
+from _base import DataGenerator
+from tlv_dataset.common.utility import load_pickle_to_dict, save_dict_to_pickle
+from tlv_dataset.data import TLVDataset
+
+
+class RealTLVGenerator(DataGenerator):
+    def __init__(
+        self,
+        dataloader,
+        save_dir: str,
+        unique_propositions: list = None,
+        tlv_data_dir: str = None,
+    ):
+        """LTL ground truth generator.
+
+        Args:
+            dataloader (TLVImageLoader): Image data loader.
+            save_dir (str): Saving directory.
+            unique_propositions (list): Unique/rare propositions from the dataset.
+                If given, they will be used for the proposition after the "until"
+                operator, e.g. non-unique U unique ->
+                [non-unique, non-unique, non-unique, unique].
+            tlv_data_dir (str): Directory of existing TLV pickle files to relabel.
+        """
+        self._tlv_data_dir = tlv_data_dir
+        if tlv_data_dir is not None:
+            self._tlv_data_dir = Path(tlv_data_dir)
+            self._tlv_file_path = list(self._tlv_data_dir.glob("*.pkl"))
+        self._dataloader = dataloader
+        self._unique_propositions = unique_propositions
+        self._save_dir = Path(save_dir)
+        self._save_dir.mkdir(parents=True, exist_ok=True)
+
+    def generate(self):
+        """Run."""
+        if self._tlv_data_dir is not None:
+            print(f"Total number of files: {len(self._tlv_file_path)}")
+            for file in self._tlv_file_path:
+                benchmark_frame: TLVDataset = load_pickle_to_dict(file)
+                self.generate_ltl_ground_truth(benchmark_frame)
+        else:
+            self._dataloader.loading_data(
+                generate_func=self.generate_ltl_ground_truth
+            )
+
+    def get_label_count(self, lst):
+        """Count occurrences of each label (label lists count as sorted tuples)."""
+        output = Counter()
+
+        for item in lst:
+            if isinstance(item, list):
+                item_tuple = tuple(sorted(item))
+                output[item_tuple] += 1
+            else:
+                output[item] += 1
+
+        return dict(output)
+
+    def evaluate_unique_prop(self, unique_prop, lst):
+        """Return True if unique_prop occurs anywhere in lst."""
+        for item in lst:
+            if isinstance(item, list):
+                if unique_prop in item:
+                    return True
+            elif unique_prop == item:
+                return True
+        return False
+
+    def _class_map(self, prop):
+        map_dict = {
+            "vehicle": "car",
+            "pedestrian": "person",
+            "cyclist": "bicycle",
+            "sign": "traffic_sign",
+        }
+        return map_dict[prop]
+
+    def f_prop1(self, prop1: str, lst: list[list]):
+        ltl_formula = f'F "{self._class_map(prop1)}"'
+        new_prop_set = [self._class_map(prop1)]
+        ltl_ground_truth = []
+        tmp_ground_truth = []
+        for idx, label in enumerate(lst):
+            if isinstance(label, list):
+                if prop1 in label:
+                    tmp_ground_truth.append(idx)
+                    ltl_ground_truth.append(list(set(tmp_ground_truth)))
+                    tmp_ground_truth = []
+        return ltl_formula, new_prop_set, ltl_ground_truth
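+    # Illustrative trace of f_prop1 (not in the original file): each frame
+    # whose label list contains prop1 becomes its own one-frame group.
+    #   lst = [["vehicle"], None, ["vehicle", "sign"], None, ["pedestrian"]]
+    #   f_prop1("vehicle", lst) ->
+    #       ltl_formula      == 'F "car"'
+    #       new_prop_set     == ["car"]
+    #       ltl_ground_truth == [[0], [2]]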
+    def prop1_u_prop2(self, prop1: str, prop2: str, lst: list[list]):
+        ltl_formula = f'"{self._class_map(prop1)}" U "{self._class_map(prop2)}"'
+        new_prop_set = [self._class_map(prop1), self._class_map(prop2)]
+        ltl_ground_truth = []
+        tmp_ground_truth = []
+        for idx, label in enumerate(lst):
+            if isinstance(label, list):
+                if prop1 in label:
+                    if len(tmp_ground_truth) == 0:
+                        tmp_ground_truth.append(idx)
+                    elif prop2 not in label:
+                        tmp_ground_truth.append(idx)
+            if isinstance(label, list):
+                if prop2 in label:
+                    tmp_ground_truth.append(idx)
+                    ltl_ground_truth.append(list(set(tmp_ground_truth)))
+                    tmp_ground_truth = []
+
+        return ltl_formula, new_prop_set, ltl_ground_truth
+
+    def prop1_and_prop2_u_prop3(
+        self, prop1: str, prop2: str, prop3: str, lst: list[list]
+    ):
+        ltl_formula = f'("{self._class_map(prop1)}" & "{self._class_map(prop2)}") U "{self._class_map(prop3)}"'
+        new_prop_set = [
+            self._class_map(prop1),
+            self._class_map(prop2),
+            self._class_map(prop3),
+        ]
+        ltl_ground_truth = []
+        tmp_ground_truth = []
+        for idx, label in enumerate(lst):
+            if isinstance(label, list):
+                if all(elem in label for elem in [prop1, prop2]):
+                    if len(tmp_ground_truth) == 0:
+                        tmp_ground_truth.append(idx)
+                    elif prop3 not in label:
+                        tmp_ground_truth.append(idx)
+
+            if isinstance(label, list):
+                if prop3 in label:
+                    tmp_ground_truth.append(idx)
+                    ltl_ground_truth.append(list(set(tmp_ground_truth)))
+                    tmp_ground_truth = []
+
+        return ltl_formula, new_prop_set, ltl_ground_truth
+
+    def get_unique_propositions(self, labels_of_frames: list) -> list:
+        # TODO: select unique propositions based on data distribution.
+        # The rarest labels (lowest counts) are treated as unique propositions.
+        label_count = self.get_label_count(labels_of_frames)
+        sorted_items = sorted(label_count.items(), key=lambda x: x[1])
+        try:
+            if len(sorted_items) > 3:
+                unique_props = [sorted_items[0][0], sorted_items[1][0]]
+            else:
+                unique_props = [sorted_items[0][0]]
+        except IndexError:
+            unique_props = [sorted_items[0][0]]
+        return unique_props
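+    # Formula selection in generate_ltl_ground_truth below, for reference:
+    # if the rare proposition already occurs within the first
+    # unique_prop_threshold frames, an F "prop" formula is emitted; otherwise
+    # a "prop1" U "prop2" formula is built, plus
+    # ("prop1" & "prop2") U "prop3" when more than two propositions exist.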
+    def generate_ltl_ground_truth(
+        self,
+        benchmark_frame: TLVDataset,
+        unique_prop_threshold: int = 25,
+    ):
+        ltl_formula = {}
+        label_counter = self.get_label_count(benchmark_frame.labels_of_frames)
+        if self._unique_propositions is not None:
+            unique_propositions = self._unique_propositions
+        else:
+            unique_propositions = self.get_unique_propositions(
+                benchmark_frame.labels_of_frames
+            )
+
+        # Start.
+        if any(
+            unique_prop in benchmark_frame.proposition
+            for unique_prop in unique_propositions
+        ):
+            unique_prop = next(
+                (
+                    unique_prop
+                    for unique_prop in unique_propositions
+                    if unique_prop in benchmark_frame.proposition
+                ),
+                None,
+            )
+            # F prop: if the unique proposition shows up too early, emit F prop.
+            if self.evaluate_unique_prop(
+                unique_prop,
+                benchmark_frame.labels_of_frames[:unique_prop_threshold],
+            ):
+                ltl_formula, new_prop_set, frames_of_interest = self.f_prop1(
+                    prop1=unique_prop, lst=benchmark_frame.labels_of_frames
+                )
+                self.update_and_save_benchmark_frame(
+                    ltl_formula,
+                    new_prop_set,
+                    frames_of_interest,
+                    benchmark_frame,
+                    self._save_dir,
+                )
+            else:
+                # prop1 U prop2: remove the unique proposition, then draw
+                # prop1 from the remaining propositions.
+                label = benchmark_frame.proposition
+                label.pop(label.index(unique_prop))
+                (
+                    ltl_formula,
+                    new_prop_set,
+                    frames_of_interest,
+                ) = self.prop1_u_prop2(
+                    prop1=random.choice(label),
+                    prop2=unique_prop,
+                    lst=benchmark_frame.labels_of_frames,
+                )
+                self.update_and_save_benchmark_frame(
+                    ltl_formula,
+                    new_prop_set,
+                    frames_of_interest,
+                    benchmark_frame,
+                    self._save_dir,
+                )
+                # prop1_and_prop2_u_prop3
+                if len(benchmark_frame.proposition) > 2:
+                    label = benchmark_frame.proposition
+                    if unique_prop in label:
+                        label.pop(label.index(unique_prop))
+                    prop1 = random.choice(label)
+                    label.pop(label.index(prop1))
+                    prop2 = random.choice(label)
+                    (
+                        ltl_formula,
+                        new_prop_set,
+                        frames_of_interest,
+                    ) = self.prop1_and_prop2_u_prop3(
+                        prop1=prop1,
+                        prop2=prop2,
+                        prop3=unique_prop,
+                        lst=benchmark_frame.labels_of_frames,
+                    )
+                    self.update_and_save_benchmark_frame(
+                        ltl_formula,
+                        new_prop_set,
+                        frames_of_interest,
+                        benchmark_frame,
+                        self._save_dir,
+                    )
+
+    def label_mapping_function(self, lst):
+        new_label = []
+        try:
+            for item in lst:
+                if isinstance(item, list):
+                    multi_label = []
+                    for item_ in item:
+                        multi_label.append(self._class_map(item_))
+                    new_label.append(multi_label)
+                else:
+                    new_label.append(self._class_map(item))
+        except KeyError:
+            return lst
+        return new_label
+
+    def update_and_save_benchmark_frame(
+        self,
+        ltl_formula,
+        new_prop_set,
+        frames_of_interest,
+        benchmark_frame: TLVDataset,
+        save_dir,
+    ):
+        file_name = f"benchmark_waymo_ltl_{ltl_formula}_{len(benchmark_frame.images_of_frames)}_0.pkl"
+        benchmark_frame_ = copy.deepcopy(benchmark_frame)
+        benchmark_frame_.frames_of_interest = frames_of_interest
+        benchmark_frame_.ltl_formula = ltl_formula
+        benchmark_frame_.proposition = new_prop_set
+        benchmark_frame_.labels_of_frames = self.label_mapping_function(
+            benchmark_frame.labels_of_frames
+        )
+        save_dict_to_pickle(
+            dict_obj=benchmark_frame_, path=save_dir, file_name=file_name
+        )
+ """ + self._data_loader = dataloader + self.data = self._data_loader.data + self._save_dir = Path(save_dir) + self._save_dir.mkdir(parents=True, exist_ok=True) + + def extract_properties(self, s): + # Extract properties, the U operator, and the F operator + properties = re.findall(r'"([^"]*)"', s) + operators = re.findall(r"\bF\b|\bU\b|\bG\b\b!\b", s) + + result = [] + for i, prop in enumerate(properties): + if i < len(operators): + result.append(operators[i]) + result.append(prop) + return "".join(result) + + def sample_proposition( + self, + class_list: list[str], + is_prop_2: bool = False, + is_prop_3: bool = False, + is_and_conditional_op=False, + ) -> str: + """Sample proposition from class list.""" + if is_and_conditional_op: + assert is_prop_2 is True + while True: + sample = random.sample(self.data.labels, 1)[0] + if len(sample) > 1: + if not is_prop_3: + return random.sample(sample, 2) + [None] + else: + prop_1_and_2 = random.sample(sample, 2) + prop_3 = [random.sample(class_list, 2)[0]] + if prop_3[0] not in prop_1_and_2: + return prop_1_and_2 + prop_3 + else: + continue + if is_prop_2 and is_prop_3 is False: + return random.sample(class_list, 2) + [None] + elif is_prop_2 is False and is_prop_3: + return random.sample(class_list, 3) + else: + return [random.sample(class_list, 2)[0], None, None] + + def generate( + self, + initial_number_of_frame: int = 25, + max_number_frame: int = 200, + number_video_per_set_of_frame: int = 3, + increase_rate: int = 25, + ltl_logic: str = "F prop1", + temporal_property: str = "", + conditional_property: str = "", + save_frames: bool = False, + ) -> any: + """Generate data.""" + number_frame = initial_number_of_frame + is_prop_2 = False + is_prop_3 = False + is_and_conditional_op = False + for logic_component in ltl_logic.split(" "): + if logic_component in ["F", "G", "U"]: + temporal_property = logic_component + elif logic_component in ["prop2", "prop2)"]: + is_prop_2 = True + elif logic_component == "prop3": + is_prop_3 = True + elif logic_component in ["!", "&", "|"]: + conditional_property = logic_component + if conditional_property == "&": + is_and_conditional_op = True + + while number_frame < max_number_frame + 1: + for video_idx in range(number_video_per_set_of_frame): + proposition = self.sample_proposition( + class_list=self.data.unique_labels, + is_prop_2=is_prop_2, + is_prop_3=is_prop_3, + is_and_conditional_op=is_and_conditional_op, + ) + ltl_frame = self.ltl_function( + logic_component=ltl_logic.split(" "), + temporal_property=temporal_property, + proposition_1=proposition[0], + proposition_2=proposition[1], + proposition_3=proposition[2], + conditional_property=conditional_property, + number_of_frame=number_frame, + ) + ( + ltl_frame.labels_of_frames, + ltl_frame.images_of_frames, + ) = self.data.sample_image_from_label( + labels=ltl_frame.labels_of_frames, + proposition=ltl_frame.proposition, + ) + self.extract_properties(ltl_frame.ltl_formula) + ltl_frame.save( + save_path=self._save_dir + / f"benchmark_{self._data_loader.name}_ltl_{ltl_frame.ltl_formula}_{number_frame}_{video_idx}.pkl" + ) + + number_frame += increase_rate + if save_frames: + ltl_frame.save_frames() + + def generate_until_time_delta( + self, + initial_number_of_frame=5, + max_number_frame: int = 200, + number_video_per_set_of_frame: int = 3, + increase_rate: int = 1, + ltl_logic: str = "prop1 U prop2", + present_prop1_till_prop2: bool = False, + save_frames: bool = False, + ) -> any: + """Generate data.""" + number_frame = initial_number_of_frame + is_prop_2 = 
+    def generate(
+        self,
+        initial_number_of_frame: int = 25,
+        max_number_frame: int = 200,
+        number_video_per_set_of_frame: int = 3,
+        increase_rate: int = 25,
+        ltl_logic: str = "F prop1",
+        temporal_property: str = "",
+        conditional_property: str = "",
+        save_frames: bool = False,
+    ) -> any:
+        """Generate data."""
+        number_frame = initial_number_of_frame
+        is_prop_2 = False
+        is_prop_3 = False
+        is_and_conditional_op = False
+        for logic_component in ltl_logic.split(" "):
+            if logic_component in ["F", "G", "U"]:
+                temporal_property = logic_component
+            elif logic_component in ["prop2", "prop2)"]:
+                is_prop_2 = True
+            elif logic_component == "prop3":
+                is_prop_3 = True
+            elif logic_component in ["!", "&", "|"]:
+                conditional_property = logic_component
+                if conditional_property == "&":
+                    is_and_conditional_op = True
+
+        while number_frame < max_number_frame + 1:
+            for video_idx in range(number_video_per_set_of_frame):
+                proposition = self.sample_proposition(
+                    class_list=self.data.unique_labels,
+                    is_prop_2=is_prop_2,
+                    is_prop_3=is_prop_3,
+                    is_and_conditional_op=is_and_conditional_op,
+                )
+                ltl_frame = self.ltl_function(
+                    logic_component=ltl_logic.split(" "),
+                    temporal_property=temporal_property,
+                    proposition_1=proposition[0],
+                    proposition_2=proposition[1],
+                    proposition_3=proposition[2],
+                    conditional_property=conditional_property,
+                    number_of_frame=number_frame,
+                )
+                (
+                    ltl_frame.labels_of_frames,
+                    ltl_frame.images_of_frames,
+                ) = self.data.sample_image_from_label(
+                    labels=ltl_frame.labels_of_frames,
+                    proposition=ltl_frame.proposition,
+                )
+                self.extract_properties(ltl_frame.ltl_formula)
+                ltl_frame.save(
+                    save_path=self._save_dir
+                    / f"benchmark_{self._data_loader.name}_ltl_{ltl_frame.ltl_formula}_{number_frame}_{video_idx}.pkl"
+                )
+
+            number_frame += increase_rate
+            if save_frames:
+                ltl_frame.save_frames()
+
+    def generate_until_time_delta(
+        self,
+        initial_number_of_frame=5,
+        max_number_frame: int = 200,
+        number_video_per_set_of_frame: int = 3,
+        increase_rate: int = 1,
+        ltl_logic: str = "prop1 U prop2",
+        present_prop1_till_prop2: bool = False,
+        save_frames: bool = False,
+    ) -> any:
+        """Generate data."""
+        number_frame = initial_number_of_frame
+        is_prop_2 = False
+        is_prop_3 = False
+        is_and_conditional_op = False
+        temporal_property = ""
+        conditional_property = ""
+        for logic_component in ltl_logic.split(" "):
+            if logic_component in ["F", "G", "U"]:
+                temporal_property = logic_component
+            elif logic_component in ["prop2", "prop2)"]:
+                is_prop_2 = True
+            elif logic_component == "prop3":
+                is_prop_3 = True
+            elif logic_component in ["!", "&", "|"]:
+                conditional_property = logic_component
+                if conditional_property == "&":
+                    is_and_conditional_op = True
+
+        while number_frame < max_number_frame + 1:
+            for video_idx in range(number_video_per_set_of_frame):
+                proposition = self.sample_proposition(
+                    class_list=self.data.unique_labels,
+                    is_prop_2=is_prop_2,
+                    is_prop_3=is_prop_3,
+                    is_and_conditional_op=is_and_conditional_op,
+                )
+                ltl_frame = self.until_time_delta_ltl_function(
+                    logic_component=ltl_logic.split(" "),
+                    temporal_property=temporal_property,
+                    proposition_1=proposition[0],
+                    proposition_2=proposition[1],
+                    proposition_3=proposition[2],
+                    conditional_property=conditional_property,
+                    number_of_frame=number_frame,
+                    present_prop1_till_prop2=present_prop1_till_prop2,
+                )
+                (
+                    ltl_frame.labels_of_frames,
+                    ltl_frame.images_of_frames,
+                ) = self.data.sample_image_from_label(
+                    labels=ltl_frame.labels_of_frames,
+                    proposition=ltl_frame.proposition,
+                )
+                self.extract_properties(ltl_frame.ltl_formula)
+                ltl_frame.save(
+                    save_path=self._save_dir
+                    / f"timedelta_{number_frame - (initial_number_of_frame - 1)}_benchmark_{self._data_loader.name}_ltl_{ltl_frame.ltl_formula}_{number_frame}_{video_idx}.pkl"
+                )
+
+            number_frame += increase_rate
+            if save_frames:
+                ltl_frame.save_frames()
+
+    def generate_unique_random_indices_within_range(self, start, end):
+        """Generate a random sequence of unique indices between 'start' and 'end'.
+
+        The length of the sequence will be random.
+        
+ """ + # Determine the length of the sequence (at most the number of elements in the range) + sequence_length = random.randint(1, end - start + 1) + # Generate the sequence + return random.sample(range(start, end + 1), sequence_length) + + def until_time_delta_ltl_function( + self, + logic_component: list[str], + proposition_1: str, + temporal_property: str, + number_of_frame: int, + proposition_2: str | None = None, + proposition_3: str | None = None, + conditional_property: str = "", + present_prop1_till_prop2: bool = False, + initial_number_of_frame=5, + ) -> TLVDataset: + labels_of_frame = [None] * number_of_frame + time_delta = number_of_frame - (initial_number_of_frame - 1) + temp_frames_of_interest = [] + proposition_set = [proposition_1, proposition_2] + ltl_formula = f'"{proposition_1}" U "{proposition_2}"' + + prop2_placeholder = 0 + time_delta + # temp_frames_of_interest.append(0) + labels_of_frame[0] = proposition_1 + # temp_frames_of_interest.append(prop2_placeholder) + labels_of_frame[prop2_placeholder] = proposition_2 + + prop1_range = self.generate_unique_random_indices_within_range( + start=0, end=prop2_placeholder - 1 + ) + # labels_of_frame[0] = proposition_1 + if present_prop1_till_prop2: + prop1_range = range(0, prop2_placeholder - 1) + prop2_range = self.generate_unique_random_indices_within_range( + start=prop2_placeholder + 1, end=number_of_frame - 1 + ) + + p1_indexes = [0] + for p1 in prop1_range: + p1_indexes.append(p1) + labels_of_frame[p1] = proposition_1 + + p2_indexes = [prop2_placeholder] + for p2 in prop2_range: + p2_indexes.append(p2) + labels_of_frame[p2] = proposition_2 + + # p1_random_idx = sorted(p1_random_idx) + p1_indexes = sorted(list(set(p1_indexes))) + temp_frames_of_interest.append(p1_indexes + [prop2_placeholder]) + for p2_idx in p2_indexes: + if p2_idx not in temp_frames_of_interest[0]: + temp_frames_of_interest.append([p2_idx]) + + return TLVDataset( + ground_truth=True, + ltl_formula=ltl_formula, + proposition=proposition_set, + number_of_frame=number_of_frame, + frames_of_interest=temp_frames_of_interest, + labels_of_frames=labels_of_frame, + ) + + def ltl_function( + self, + logic_component: list[str], + proposition_1: str, + temporal_property: str, + number_of_frame: int, + proposition_2: str | None = None, + proposition_3: str | None = None, + conditional_property: str = "", + ) -> TLVDataset: + """LTL function. + + Args: + proposition_1 (str): Proposition 1. + temporal_property (str): Temporal property. + - F: eventuallyProperty + - G: alwaysProperty + - U: untilProperty + number_of_frame (int): Number of frame. + proposition_2 (str, optional): Proposition 2. Defaults to None. + conditional_property (str): Conditional property. + - &: andProperty + - |: orProperty + - !: notProperty. 
+ """ + labels_of_frame = [None] * number_of_frame + temp_frames_of_interest = [] + proposition_set = [] + random_frame_idx_selection = sorted( + list( + set( + [ + random.randint(0, number_of_frame - 3) + for _ in range(int(number_of_frame / 5)) + ] + ) + ) + ) # number_of_frame - 3 to avoid random.range(same_idx,same_idx) + # Single Rule + if proposition_2 is not None: + if proposition_3 is not None: + proposition_set = [proposition_1, proposition_2, proposition_3] + else: + proposition_set = [proposition_1, proposition_2] + if temporal_property == "U": + assert ( + proposition_2 is not None + ), "proposition 2 must be not None" + u_index = logic_component.index("U") + pre_u_index = logic_component[u_index - 1] + post_u_index = logic_component[u_index + 1] + if post_u_index == "prop2": + # TODO: F & G... + ltl_formula = f'"{proposition_1}" {temporal_property} "{proposition_2}"' + post_u_label_idx = [] + for idx in list(set(random_frame_idx_selection)): + temp_frames_of_interest.append(idx) + labels_of_frame[idx] = proposition_1 + if len(post_u_label_idx) > 0: + if idx >= post_u_label_idx[-1]: + prop2_idx = random.randrange( + idx + 1, number_of_frame - 1 + ) + temp_frames_of_interest.append(prop2_idx) + post_u_label_idx.append(prop2_idx) + labels_of_frame[prop2_idx] = proposition_2 + else: + prop2_idx = random.randrange( + idx + 1, number_of_frame - 1 + ) + temp_frames_of_interest.append(prop2_idx) + post_u_label_idx.append(prop2_idx) + labels_of_frame[prop2_idx] = proposition_2 + + temp_frames_of_interest = list(set(temp_frames_of_interest)) + for i, temp_label in enumerate(temp_frames_of_interest): + if temp_label not in post_u_label_idx: + if temp_label > max(post_u_label_idx): + labels_of_frame[temp_label] = None + temp_frames_of_interest.pop( + temp_frames_of_interest.index(temp_label) + ) + + if proposition_2 not in labels_of_frame: + labels_of_frame[-1] = proposition_2 + temp_frames_of_interest.append(len(labels_of_frame) - 1) + temp_frames_of_interest = list(set(temp_frames_of_interest)) + + elif pre_u_index == "prop2" and post_u_index == "prop3": + # TODO + pass + elif pre_u_index == "prop2)" and post_u_index == "prop3": + ltl_formula = f'("{proposition_1}" {logic_component[logic_component.index(pre_u_index)-1]} "{proposition_2}") U "{proposition_3}"' + post_u_label_idx = [] + for idx in list(set(random_frame_idx_selection)): + temp_frames_of_interest.append(idx) + labels_of_frame[idx] = [proposition_1, proposition_2] + if len(post_u_label_idx) > 0: + if idx >= post_u_label_idx[-1]: + prop3_idx = random.randrange( + idx + 1, number_of_frame - 1 + ) + temp_frames_of_interest.append(prop3_idx) + post_u_label_idx.append(prop3_idx) + labels_of_frame[prop3_idx] = proposition_3 + else: + prop3_idx = random.randrange( + idx + 1, number_of_frame - 1 + ) + temp_frames_of_interest.append(prop3_idx) + post_u_label_idx.append(prop3_idx) + labels_of_frame[prop3_idx] = proposition_3 + + temp_frames_of_interest = list(set(temp_frames_of_interest)) + for i, temp_label in enumerate(temp_frames_of_interest): + if temp_label not in post_u_label_idx: + if temp_label > max(post_u_label_idx): + labels_of_frame[temp_label] = None + temp_frames_of_interest.pop( + temp_frames_of_interest.index(temp_label) + ) + + if proposition_3 not in labels_of_frame: + labels_of_frame[-1] = proposition_3 + temp_frames_of_interest.append(len(labels_of_frame) - 1) + temp_frames_of_interest = list(set(temp_frames_of_interest)) + + else: + assert ( + conditional_property is not None + ), "conditional_property must be not 
None" + ltl_formula = f'{temporal_property} "{proposition_1}" {conditional_property} "{proposition_2}"' + + for idx in list(set(random_frame_idx_selection)): + temp_frames_of_interest.append([idx]) + labels_of_frame[idx] = proposition_set + + else: + proposition_set.append(proposition_1) + if conditional_property == "": + ltl_formula = f'{temporal_property} "{proposition_1}"' + else: + assert ( + conditional_property == "!" + ), "conditional_property must be ! with one proposition" + ltl_formula = f'{temporal_property} {conditional_property} "{proposition_1}"' + # 1. F "prop1" + frame_index = [] + if temporal_property == "F": + frame_index = [ + random.randint(0, number_of_frame - 1) + for _ in range(int(number_of_frame / 5)) + ] + for idx in list(set(frame_index)): + temp_frames_of_interest.append([idx]) + labels_of_frame[idx] = proposition_1 + elif temporal_property == "G": + frame_index = [ + random.randint(0, number_of_frame - 1) + for _ in range(int(number_of_frame / 10)) + ] + for idx in list(set(frame_index)): + min_idx = idx - int(number_of_frame / 10) + if min_idx < 0: + min_idx = 0 + if min_idx == idx: + idx = min_idx + 1 + for sub_idx in list(range(min_idx, idx)): + if sub_idx not in temp_frames_of_interest: + temp_frames_of_interest.append(sub_idx) + labels_of_frame[sub_idx] = proposition_1 + if conditional_property == "!": + temp_frames_of_interest = [ + x + for x in range(len(labels_of_frame)) + if x not in temp_frames_of_interest + ] + + # 2. G "prop1" + + # TODO: Make a false case + return TLVDataset( + ground_truth=True, + ltl_formula=ltl_formula, + proposition=proposition_set, + number_of_frame=number_of_frame, + frames_of_interest=temp_frames_of_interest, + labels_of_frames=labels_of_frame, + ) diff --git a/tlv_dataset/label_mapper/__init__.py b/tlv_dataset/label_mapper/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tlv_dataset/label_mapper/metadata/imagenet_to_coco.py b/tlv_dataset/label_mapper/metadata/imagenet_to_coco.py new file mode 100644 index 0000000..4fba3fd --- /dev/null +++ b/tlv_dataset/label_mapper/metadata/imagenet_to_coco.py @@ -0,0 +1,160 @@ +MAPPER_METADATA = { + "person": ["groom", "scuba_diver", "ballplayer"], + "bicycle": ["mountain_bike"], + "car": [ + "sports_car", + "limousine", + "racer", + "convertible", + "jeep", + "cab", + "Model_T", + "amphibian", + "trailer_truck", + "fire_engine", + "garbage_truck", + "tow_truck", + ], + "airplane": ["aircraft_carrier", "airliner", "warplane"], + "bus": ["school_bus", "trolleybus"], + "train": ["steam_locomotive"], + "truck": [ + "trailer_truck", + "fire_engine", + "garbage_truck", + "moving_van", + "tow_truck", + "tank", + ], + "boat": [ + "canoe", + "catamaran", + "yawl", + "gondola", + "paddlewheel", + "schooner", + "pirate", + "airship", + "submarine", + ], + "traffic_light": ["traffic_light"], + "fire_hydrant": [], + "stop_sign": ["street_sign"], + "parking_meter": ["parking_meter"], + "bench": ["park_bench"], + "bird": [ + "cock", + "hen", + "ostrich", + "brambling", + "goldfinch", + "robin", + "bulbul", + "jay", + "magpie", + "chickadee", + "water_ouzel", + "kite", + "bald_eagle", + "vulture", + "great_grey_owl", + "European_fire_salamander", + "common_newt", + "eft", + ], + "cat": [ + "Egyptian_cat", + "tiger_cat", + "Persian_cat", + "Siamese_cat", + "tabby", + "lynx", + ], + "dog": [ + "Chihuahua", + "Japanese_spaniel", + "Maltese_dog", + "Pekinese", + "Shih-Tzu", + "Blenheim_spaniel", + "papillon", + "toy_terrier", + "Rhodesian_ridgeback", + "Afghan_hound", + "basset", + 
"beagle", + "bloodhound", + "bluetick", + ], + "horse": ["sorrel", "zebra", "Appenzeller"], + "sheep": [ + "ram", + "bighorn", + "ibex", + "impala", + "gazelle", + "Arabian_camel", + "llama", + ], + "cow": ["ox", "water_buffalo", "bison"], + "elephant": ["Indian_elephant", "African_elephant"], + "bear": ["brown_bear", "American_black_bear", "ice_bear", "sloth_bear"], + "zebra": ["zebra"], + "giraffe": ["giraffe"], + "backpack": ["backpack"], + "umbrella": ["umbrella"], + "handbag": ["handbag"], + "tie": ["tie", "Windsor_tie"], + "suitcase": ["suitcase"], + "frisbee": [], + "skis": [], + "snowboard": [], + "sports_ball": ["soccer_ball", "rugby_ball", "tennis_ball", "basketball"], + "kite": ["kite"], + "baseball_bat": [], + "baseball_glove": [], + "skateboard": [], + "surfboard": [], + "tennis_racket": [], + "bottle": ["water_bottle", "beer_bottle", "wine_bottle", "pop_bottle"], + "wine_glass": [], + "cup": ["coffee_mug", "cup"], + "fork": [], + "knife": [], + "spoon": [], + "bowl": ["soup_bowl"], + "banana": ["banana"], + "apple": ["Granny_Smith"], + "sandwich": [], + "orange": ["orange"], + "broccoli": ["broccoli"], + "carrot": [], + "hot_dog": ["hotdog"], + "pizza": ["pizza"], + "donut": [], + "cake": [], + "chair": ["rocking_chair", "folding_chair"], + "couch": [], + "potted_plant": [], + "bed": ["four-poster"], + "dining_table": ["dining_table"], + "toilet": ["toilet_seat"], + "tv": ["television"], + "laptop": ["laptop"], + "mouse": ["mouse"], + "remote": ["remote_control"], + "keyboard": ["computer_keyboard", "typewriter_keyboard"], + "cell_phone": ["cellular_telephone"], + "microwave": ["microwave"], + "oven": [], + "toaster": ["toaster"], + "sink": [], + "refrigerator": ["refrigerator"], + "book": ["book_jacket", "comic_book"], + "clock": ["wall_clock", "analog_clock", "digital_clock"], + "vase": ["vase"], + "scissors": [], + "teddy_bear": ["teddy"], + "hair_drier": [], + "toothbrush": ["toothbrush"], +} diff --git a/tlv_dataset/label_mapper/metadata/nuscenes_to_coco.py b/tlv_dataset/label_mapper/metadata/nuscenes_to_coco.py new file mode 100644 index 0000000..74ac3a6 --- /dev/null +++ b/tlv_dataset/label_mapper/metadata/nuscenes_to_coco.py @@ -0,0 +1,35 @@ +MAPPER_METADATA = { + "animal": None, # Assuming a generic animal category + "human.pedestrian.adult": "person", + "human.pedestrian.child": "person", + "human.pedestrian.construction_worker": "person", + "human.pedestrian.personal_mobility": "person", + "human.pedestrian.police_officer": "person", + "human.pedestrian.stroller": "person", + "human.pedestrian.wheelchair": "person", + "movable_object.barrier": None, # No direct equivalent + "movable_object.debris": None, # No direct equivalent + "movable_object.pushable_pullable": None, # No direct equivalent + "movable_object.trafficcone": None, # No direct equivalent + "static_object.bicycle_rack": None, # Possibly 'bicycle' if the bikes are significant in the image + "vehicle.bicycle": "bicycle", + "vehicle.bus.bendy": "bus", + "vehicle.bus.rigid": "bus", + "vehicle.car": "car", + "vehicle.construction": "car", # Broadly categorizing as 'truck' + "vehicle.emergency.ambulance": "car", # Or 'car', depending on size/shape + "vehicle.emergency.police": "car", + "vehicle.motorcycle": "motorcycle", + "vehicle.trailer": "truck", + "vehicle.truck": "truck", + # Additional mappings for other labels + "flat.driveable_surface": None, # No direct equivalent + "flat.other": None, # No direct equivalent + "flat.sidewalk": None, # No direct equivalent + "flat.terrain": None, # No direct 
equivalent + "static.manmade": None, # No direct equivalent, but could consider 'building' if part of your labels + "static.other": None, # No direct equivalent + "static.vegetation": None, # Could be 'tree' or 'plant' if those are part of your labels + "vehicle.ego": "car", # Assuming this is the ego vehicle (the car with the sensor/camera) + "noise": None, # No direct equivalent +} diff --git a/tlv_dataset/label_mapper/metadata/waymo_to_coco.py b/tlv_dataset/label_mapper/metadata/waymo_to_coco.py new file mode 100644 index 0000000..c4ef0d4 --- /dev/null +++ b/tlv_dataset/label_mapper/metadata/waymo_to_coco.py @@ -0,0 +1,6 @@ +MAPPER_METADATA = { + "vehicle": "car", + "pedestrian": "person", + "cyclist": "bicycle", + "sign": None, +} diff --git a/tlv_dataset/loader/__init__.py b/tlv_dataset/loader/__init__.py new file mode 100644 index 0000000..3c1243c --- /dev/null +++ b/tlv_dataset/loader/__init__.py @@ -0,0 +1,84 @@ +"""ns_vfs package.""" + +LABEL_OF_INTEREST = [ + "person", + "bicycle", + "car", + "motorcycle", + "airplane", + "bus", + "train", + "truck", + "boat", + "traffic light", + "fire hydrant", + "stop sign", + "parking meter", + "bench", + "bird", + "cat", + "dog", + "horse", + "sheep", + "cow", + "elephant", + "bear", + "zebra", + "giraffe", + "backpack", + "umbrella", + "handbag", + "tie", + "suitcase", + "frisbee", + "skis", + "snowboard", + "sports ball", + "kite", + "baseball bat", + "baseball glove", + "skateboard", + "surfboard", + "tennis racket", + "bottle", + "wine glass", + "cup", + "fork", + "knife", + "spoon", + "bowl", + "banana", + "apple", + "sandwich", + "orange", + "broccoli", + "carrot", + "hot dog", + "pizza", + "donut", + "cake", + "chair", + "couch", + "potted plant", + "bed", + "dining table", + "toilet", + "tv", + "laptop", + "mouse", + "remote", + "keyboard", + "cell phone", + "microwave", + "oven", + "toaster", + "sink", + "refrigerator", + "book", + "clock", + "vase", + "scissors", + "teddy bear", + "hair drier", + "toothbrush", +] diff --git a/tlv_dataset/loader/_base.py b/tlv_dataset/loader/_base.py new file mode 100644 index 0000000..a763cc3 --- /dev/null +++ b/tlv_dataset/loader/_base.py @@ -0,0 +1,22 @@ +from __future__ import annotations +from tlv_dataset.data import TLVRawImage +import abc + + +class DataLoader(abc.ABC): + """Data Loader Abstract Class.""" + + @abc.abstractmethod + def load_data(self, image_path) -> any: + """Load raw image from image_path.""" + + +class TLVImageLoader(DataLoader): + """Benchmark image loader.""" + + class_labels: list + data: TLVRawImage + + @abc.abstractmethod + def process_data(self, raw_data) -> any: + """Process raw data to BenchmarkRawImage Data Class.""" diff --git a/tlv_dataset/loader/cifar.py b/tlv_dataset/loader/cifar.py new file mode 100644 index 0000000..fa10232 --- /dev/null +++ b/tlv_dataset/loader/cifar.py @@ -0,0 +1,169 @@ +from __future__ import annotations + +from pathlib import Path + +import matplotlib.pyplot as plt +import numpy as np + +from tlv_dataset.data import TLVRawImage +from tlv_dataset.loader import LABEL_OF_INTEREST + +from ._base import TLVImageLoader + + +class Cifar10ImageLoader(TLVImageLoader): + """Load CIFAR 10 image data from file.""" + + def __init__( + self, + cifar_dir_path: str, + batch_id: int | str = 1, + ): + """Load CIFAR image data from file. + + Args: + cifar_dir_path (str): Path to CIFAR image file. + Your cifar_dir_path must be the same as official CIFAR dataset. + website: http://www.cs.toronto.edu/~kriz/cifar.html + batch_id (int | str, optional): Batch ID. 
Defaults to 1. + If "all", load all batches. + """ + self.name = "CIFAR10" + self._cifar_dir_path = Path(cifar_dir_path) + self._batch_id = f"data_batch_{batch_id}" + self._original_class_labels = [ + "airplane", + "automobile", + "bird", + "cat", + "deer", + "dog", + "frog", + "horse", + "ship", + "truck", + ] + self.class_labels = [ + label + for label in self._original_class_labels + if label in LABEL_OF_INTEREST + ] + self.data: TLVRawImage = self.process_data( + raw_data=self.load_data( + data_path=self._cifar_dir_path / self._batch_id + ) + ) + + def process_data(self, raw_data) -> any: + """Process raw data to TLVRawImage Data Class.""" + class_labels = list( + map(lambda x: [self._original_class_labels[x]], raw_data[b"labels"]) + ) + data = raw_data[b"data"].reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1) + plt.imshow(data[4]) + plt.savefig("test_1.png") + return TLVRawImage( + unique_labels=self.class_labels, labels=class_labels, images=data + ) + + def load_data(self, data_path, load_all_batch: bool = False) -> dict: + """Load CIFAR image data from file. + + Args: + data_path (str): Path to CIFAR image file. + Your cifar_dir_path must be the same as official CIFAR dataset. + load_all_batch (bool, optional): Whether to load all batches. Defaults to False. + + Returns: + any: CIFAR image data. + """ + if self._batch_id == "data_batch_all" or load_all_batch: + # TODO: Concatenate all batches + pass + else: + data = self.unpickle(file=data_path) + return data + + def unpickle(self, file): + """Unpickle CIFAR image data.""" + import pickle + + with open(file, "rb") as fo: + dict = pickle.load(fo, encoding="bytes") + return dict + + +class Cifar100ImageLoader(TLVImageLoader): + """Load CIFAR 10 image data from file.""" + + def __init__( + self, + cifar_dir_path: str, + batch_id: int | str = 1, + ): + """Load CIFAR image data from file. + + Args: + cifar_dir_path (str): Path to CIFAR image file. + Your cifar_dir_path must be the same as official CIFAR dataset. + website: http://www.cs.toronto.edu/~kriz/cifar.html + batch_id (int | str, optional): Batch ID. Defaults to 1. + If "all", load all batches. + """ + self.name = "CIFAR100" + self._cifar_dir_path = Path(cifar_dir_path) / "train" + self._raw_data = self.load_data(data_path=self._cifar_dir_path) + self._original_class_labels = self.get_original_class_labels() + self.class_labels = self.class_labels = [ + label + for label in self._original_class_labels + if label in LABEL_OF_INTEREST + ] # YOLO labels + + self.data: TLVRawImage = self.process_data(raw_data=self._raw_data) + + def get_original_class_labels(self): + meta_file = f"{self._cifar_dir_path.parent}/meta" + meta_data = self.unpickle(meta_file) + raw = meta_data[b"fine_label_names"] + return [x.decode("utf-8") for x in raw] + + def process_data(self, raw_data) -> any: + """Process raw data to TLVRawImage Data Class.""" + class_labels = list( + map( + lambda x: [self._original_class_labels[x]], + raw_data[b"fine_labels"], + ) + ) + # class_labels: + # image_idx_1: "label" + # image_idx_2: "label" + data = np.array(raw_data[b"data"]) + data = data.reshape(len(data), 3, 32, 32).transpose(0, 2, 3, 1) + plt.imshow(data[5]) + plt.savefig("data_loader_sample_image.png") + return TLVRawImage( + unique_labels=self.class_labels, labels=class_labels, images=data + ) + + def load_data(self, data_path) -> dict: + """Load CIFAR image data from file. + + Args: + data_path (str): Path to CIFAR image file. + Your cifar_dir_path must be the same as official CIFAR dataset. 
+
+
+class Cifar100ImageLoader(TLVImageLoader):
+    """Load CIFAR-100 image data from file."""
+
+    def __init__(
+        self,
+        cifar_dir_path: str,
+        batch_id: int | str = 1,
+    ):
+        """Load CIFAR image data from file.
+
+        Args:
+            cifar_dir_path (str): Path to the CIFAR image directory.
+                The directory layout must match the official CIFAR dataset.
+                website: http://www.cs.toronto.edu/~kriz/cifar.html
+            batch_id (int | str, optional): Batch ID. Defaults to 1.
+                If "all", load all batches.
+        """
+        self.name = "CIFAR100"
+        self._cifar_dir_path = Path(cifar_dir_path) / "train"
+        self._raw_data = self.load_data(data_path=self._cifar_dir_path)
+        self._original_class_labels = self.get_original_class_labels()
+        self.class_labels = [
+            label
+            for label in self._original_class_labels
+            if label in LABEL_OF_INTEREST
+        ]  # YOLO labels
+
+        self.data: TLVRawImage = self.process_data(raw_data=self._raw_data)
+
+    def get_original_class_labels(self):
+        """Read the fine label names from the CIFAR-100 meta file."""
+        meta_file = f"{self._cifar_dir_path.parent}/meta"
+        meta_data = self.unpickle(meta_file)
+        raw = meta_data[b"fine_label_names"]
+        return [x.decode("utf-8") for x in raw]
+
+    def process_data(self, raw_data) -> any:
+        """Process raw data to TLVRawImage Data Class."""
+        class_labels = list(
+            map(
+                lambda x: [self._original_class_labels[x]],
+                raw_data[b"fine_labels"],
+            )
+        )
+        # class_labels:
+        #   image_idx_1: "label"
+        #   image_idx_2: "label"
+        data = np.array(raw_data[b"data"])
+        data = data.reshape(len(data), 3, 32, 32).transpose(0, 2, 3, 1)
+        # Save a sample image for a quick sanity check.
+        plt.imshow(data[5])
+        plt.savefig("data_loader_sample_image.png")
+        return TLVRawImage(
+            unique_labels=self.class_labels, labels=class_labels, images=data
+        )
+
+    def load_data(self, data_path) -> dict:
+        """Load CIFAR image data from file.
+
+        Args:
+            data_path (str): Path to the CIFAR image file.
+                The directory layout must match the official CIFAR dataset.
+
+        Returns:
+            dict: CIFAR image data.
+        """
+        return self.unpickle(file=data_path)
+
+    def unpickle(self, file):
+        """Unpickle CIFAR image data."""
+        import pickle
+
+        with open(file, "rb") as fo:
+            data_dict = pickle.load(fo, encoding="bytes")
+        return data_dict
diff --git a/tlv_dataset/loader/coco.py b/tlv_dataset/loader/coco.py
new file mode 100644
index 0000000..dcce284
--- /dev/null
+++ b/tlv_dataset/loader/coco.py
@@ -0,0 +1,134 @@
+from pathlib import Path
+
+import cv2
+import matplotlib.pyplot as plt
+from pycocotools.coco import COCO
+
+from tlv_dataset.data import TLVRawImage
+from tlv_dataset.loader import LABEL_OF_INTEREST
+from tlv_dataset.loader._base import TLVImageLoader
+
+
+class COCOImageLoader(TLVImageLoader):
+    """Load COCO image data."""
+
+    def __init__(
+        self, coco_dir_path: str, annotation_file: str, image_dir: str
+    ):
+        """Load COCO image data from file.
+
+        Args:
+            - coco_dir_path (str): Path to COCO dataset directory.
+            - annotation_file (str): Name of the annotation file.
+            - image_dir (str): Name of the image directory.
+        """
+        self.name = "COCO"
+        self._coco_dir_path = Path(coco_dir_path)
+        self._annotation_file = self._coco_dir_path / annotation_file
+        self._image_dir = self._coco_dir_path / image_dir
+        self._coco = COCO(self._annotation_file)
+        self._original_class_labels = [
+            cat["name"].replace(" ", "_") for cat in self._coco.cats.values()
+        ]
+        # Filter on the raw COCO names (LABEL_OF_INTEREST uses spaces),
+        # then normalize spaces to underscores.
+        self.class_labels = [
+            cat["name"].replace(" ", "_")
+            for cat in self._coco.cats.values()
+            if cat["name"] in LABEL_OF_INTEREST
+        ]
+        self.images, self.annotations = self.load_data()
+        self.data: TLVRawImage = self.process_data()
+
+    def load_data(self):
+        """Load COCO image and annotation data.
+
+        Returns:
+            - images (list): A list of image file paths.
+            - annotations (list): A list of annotation data corresponding to the images.
+        """
+        img_ids = self._coco.getImgIds()
+        images = [
+            self._image_dir / self._coco.loadImgs(id)[0]["file_name"]
+            for id in img_ids
+        ]
+        annotations = [
+            self._coco.loadAnns(self._coco.getAnnIds(imgIds=id))
+            for id in img_ids
+        ]
+
+        return images, annotations
+
+    def process_data(self) -> TLVRawImage:
+        """Process raw COCO data to TLVRawImage Data Class."""
+        img_ids = self._coco.getImgIds()
+        images = []
+        class_labels = []
+        # class_labels:
+        #   image_idx_1: ["label_1", "label_2", ...]
+        #   image_idx_2: ["label_1", "label_2", ...]
+        for id in img_ids:
+            images.append(
+                cv2.imread(
+                    str(
+                        self._image_dir
+                        / self._coco.loadImgs(id)[0]["file_name"]
+                    )
+                )[:, :, ::-1]
+            )  # Read it as RGB
+            annotation = self._coco.loadAnns(self._coco.getAnnIds(imgIds=id))
+            labels_per_image = [
+                self._coco.cats[ann["category_id"]]["name"].replace(" ", "_")
+                for ann in annotation
+            ]
+            unique_labels = list(set(labels_per_image))
+            # Keep the image only if it has at least one label and every
+            # label is within the labels of interest.
+            if unique_labels and all(
+                label in self.class_labels for label in unique_labels
+            ):
+                class_labels.append(unique_labels)
+            else:
+                images.pop()
+        assert len(images) == len(class_labels)
+
+        # Plot a sample image
+        # for i in range(10):
+        #     plt.imshow(images[i])
+        #     plt.axis("off")
+        #     plt.savefig(f"{i}_data_loader_sample_image.png")
+
+        return TLVRawImage(
+            unique_labels=self.class_labels, labels=class_labels, images=images
+        )
+
+    def display_sample_image(self, index: int):
+        """Display a sample image with annotations.
+ + Args: + - index (int): The index of the sample image in the dataset. + """ + image_path = str(self.images[index]) + image = cv2.imread(image_path) + image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + ann_ids = self._coco.getAnnIds(imgIds=self._coco.getImgIds()[index]) + anns = self._coco.loadAnns(ann_ids) + plt.imshow(image) + plt.axis("off") + self._coco.showAnns(anns) + plt.show() + + +# # Example usage: +# coco_loader = COCOImageLoader( +# coco_dir_path="/opt/Neuro-Symbolic-Video-Frame-Search/artifacts/data/benchmark_image_dataset/coco", +# annotation_file="annotations/instances_val2017.json", +# image_dir="val2017", +# ) + +# # Display a sample image +# coco_loader.display_sample_image(0) diff --git a/tlv_dataset/loader/imagenet.py b/tlv_dataset/loader/imagenet.py new file mode 100644 index 0000000..3c57b80 --- /dev/null +++ b/tlv_dataset/loader/imagenet.py @@ -0,0 +1,281 @@ +from __future__ import annotations + +import os + +import cv2 +import matplotlib.pyplot as plt +import numpy as np +from torch.utils.data import Dataset + +from tlv_dataset.data import TLVRawImageDataset + +from ._base import TLVImageLoader +from .meta_to_imagenet import META_TO_IMAGENET, filter + + +class ImageNetDS(Dataset): + """ImageNet Dataset.""" + + mapping_number_to_class: dict + class_mapping_dict: dict + class_mapping_dict_number: dict + mapping_class_to_number: dict + classname_classid: dict + + metaclass_imagenetclass: dict + imagenetclass_metaclass: dict + valid_classes: list + + # TODO: Use the filtered meta to imagenet to rd + def __init__( + self, + imagenet_dir_path: str, + type: str = "train", + batch_id: int | str = 1, + target_size: tuple = (224, 224, 3), + map_to_cocometa: bool = False, + ): + """Load ImageNet dataset from file.""" + self.imagenet_path = imagenet_dir_path + mapping_path = imagenet_dir_path + "/LOC_synset_mapping.txt" + + self.class_mapping_dict = {} + self.class_mapping_dict_number = {} + self.mapping_class_to_number = {} + self.mapping_number_to_class = {} + self.classname_classid = {} + i = 0 + + for line in open(mapping_path): + class_name = line[9:].strip().split(", ")[0] + class_name = class_name.replace(" ", "_") + self.class_mapping_dict[line[:9].strip()] = class_name + self.class_mapping_dict_number[i] = class_name + + if class_name in self.classname_classid.keys(): + self.classname_classid[class_name].append(line[:9].strip()) + else: + self.classname_classid[class_name] = [line[:9].strip()] + + self.mapping_class_to_number[line[:9].strip()] = i + self.mapping_number_to_class[i] = line[:9].strip() + i += 1 + + self.length_dataset = 0 + self.image_path = imagenet_dir_path + "/Data/CLS-LOC/" + type + "/" + self._num_images_per_class = {} + + for root in self.class_mapping_dict.keys(): + files = os.listdir(self.image_path + root) + self.length_dataset += len(files) + self._num_images_per_class[root] = len(files) + + self.target_size = target_size + + print( + "loaded imagenet dataset ({}) with {} images and {} classes: ".format( + type, self.length_dataset, len(self.class_mapping_dict.keys()) + ) + ) + + self.map_to_cocometa = map_to_cocometa + + if self.map_to_cocometa: + # Mapped dataset with respect to COCO metaclasses + self.map_data() + print( + "After mapping imagenet dataset({}), we have {} images and {} classes: ".format( + type, + self.length_dataset, + len(self.metaclass_imagenetclass.keys()), + ) + ) + + @property + def class_counts(self): + if self.map_to_cocometa: + return self._num_images_per_metaclass + elif self._num_images_per_class: + # remap keys to 
classnames
+            return {
+                self.class_mapping_dict[k]: v
+                for k, v in self._num_images_per_class.items()
+            }
+
+    def map_data(self):
+        """Map the metadata onto the ImageNet classes."""
+        filtered_meta_data = filter(
+            META_TO_IMAGENET, self.imagenet_path + "/LOC_synset_mapping.txt"
+        )
+
+        self.metaclass_imagenetclass = filtered_meta_data
+        self.imagenetclass_metaclass = {}
+        self.length_dataset = 0
+        self._num_images_per_metaclass = {}
+        for key, val in self.metaclass_imagenetclass.items():
+            num_images = 0
+            for v in val:
+                v_id = self.classname_classid[v][0]
+                self.imagenetclass_metaclass[v] = key
+                self.length_dataset += self._num_images_per_class[v_id]
+                num_images += self._num_images_per_class[v_id]
+
+            self._num_images_per_metaclass[key] = num_images
+        # From the mapped data, evaluate the length of the dataset

+    def __getitem__(self, index):
+        """Get item from dataset."""
+        index_copy = index
+        if not self.map_to_cocometa:
+            # Find the class ID where the index is located
+            class_id = 0
+            while (
+                index
+                >= self._num_images_per_class[
+                    self.mapping_number_to_class[class_id]
+                ]
+            ):
+                index -= self._num_images_per_class[
+                    self.mapping_number_to_class[class_id]
+                ]
+                class_id += 1
+            # Find the image ID within the class
+            class_name = self.mapping_number_to_class[class_id]
+            image_id = os.listdir(self.image_path + class_name)[index]
+            # Load the image
+            image = plt.imread(self.image_path + class_name + "/" + image_id)
+            # Convert to RGB if grayscale
+            if len(image.shape) == 2:
+                image = np.repeat(image[:, :, np.newaxis], 3, axis=2)
+            # Resize
+            image = cv2.resize(image, self.target_size[:2])
+
+            return image, class_id
+        else:
+            # Obtain the metaclass where the index is located
+            metaclass_id = 0
+            metaclassname = list(self.metaclass_imagenetclass.keys())[
+                metaclass_id
+            ]
+            cum_count = 0
+            while index >= self._num_images_per_metaclass[metaclassname]:
+                index -= self._num_images_per_metaclass[metaclassname]
+                cum_count += self._num_images_per_metaclass[metaclassname]
+                metaclass_id += 1
+                metaclassname = list(self.metaclass_imagenetclass.keys())[
+                    metaclass_id
+                ]
+
+            imagenet_class_for_metaclass = self.metaclass_imagenetclass[
+                metaclassname
+            ]
+            index = index_copy - cum_count
+
+            class_id_val = 0
+            class_id = self.classname_classid[
+                imagenet_class_for_metaclass[class_id_val]
+            ][0]
+            while index >= self._num_images_per_class[class_id]:
+                index -= self._num_images_per_class[class_id]
+                class_id_val += 1
+                class_id = self.classname_classid[
+                    imagenet_class_for_metaclass[class_id_val]
+                ][0]
+
+            image_id = os.listdir(self.image_path + class_id)[index]
+
+            image = plt.imread(self.image_path + class_id + "/" + image_id)
+            # Convert to RGB if grayscale
+            if len(image.shape) == 2:
+                image = np.repeat(image[:, :, np.newaxis], 3, axis=2)
+            # Resize
+            image = cv2.resize(image, self.target_size[:2])
+
+            return image, metaclass_id
+
+    def __len__(self):
+        """Get the length of the dataset."""
+        return self.length_dataset
+
+    def class_to_class_number(self, id):
+        """Get the class number from the class ID."""
+        return self.mapping_class_to_number[id]
+
+    def class_number_to_class(self, id):
+        """Get the class ID from the class number."""
+        return self.mapping_number_to_class[id]
+
+    def class_number_to_class_name(self, id):
+        """Get the class name from the class number."""
+        return self.class_mapping_dict_number[id]
+
+    def class_to_class_name(self, id):
+        """Get the class name from the class ID."""
+        return self.class_mapping_dict[id]
+
+    @property
+    def classnames(self):
+        """Class names (metaclasses when mapped to COCO) with images."""
+        if self.map_to_cocometa:
+            keys = []
+            for k, v in self._num_images_per_metaclass.items():
+                if v != 0:
+                    keys.append(k)
+            return keys
+        else:
+            return list(self.class_mapping_dict.keys())
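+
+# How ImageNetDS.__getitem__ resolves a flat index, on toy counts
+# (illustrative only):
+#
+#     counts = {"a": 2, "b": 3}   # images per class
+#     index = 3
+#     for cls, n in counts.items():
+#         if index < n:
+#             break
+#         index -= n
+#     # -> cls == "b", index == 1 (second image of class "b")
+#
+# The mapped branch applies the same walk twice: first over metaclass
+# counts, then over the ImageNet classes inside the chosen metaclass.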
+
+
+class ImageNetDataloader(TLVImageLoader):
+    """Load ImageNet dataset from file."""
+
+    def __init__(
+        self,
+        imagenet_dir_path: str,
+        batch_id: int | str = 1,
+    ):
+        """Load ImageNet dataset from file."""
+        # Create an imagenet dataset
+        self.name = "ImageNet2017-1K"
+        self.imagenet = ImageNetDS(imagenet_dir_path, map_to_cocometa=True)
+        # Get text labels from metadata
+        self.class_labels = list(self.imagenet.classnames)
+        self.data: TLVRawImageDataset = self.process_data(
+            raw_data=self.load_data()
+        )
+
+    def load_data(self) -> dict:
+        """Load the labels of the data.
+
+        Returns:
+            dict: the dataset and its per-image labels.
+        """
+        labels = [0 for _ in range(len(self.imagenet))]
+        mapped_labels = [0 for _ in range(len(self.imagenet))]
+        cum_count = 0
+        for idx, (class_, count) in enumerate(
+            self.imagenet.class_counts.items()
+        ):
+            cum_count += count
+            for j in range(cum_count - count, cum_count):
+                labels[j] = idx
+                mapped_labels[j] = class_
+
+        data = {"dataset": self.imagenet, "labels": mapped_labels}
+        return data
+
+    def process_data(self, raw_data) -> any:
+        """Process raw data to TLVRawImageDataset Data Class."""
+        return TLVRawImageDataset(
+            unique_labels=self.class_labels,
+            labels=raw_data["labels"],
+            images=raw_data["dataset"],
+        )
+
+
+if __name__ == "__main__":
+    image_dir = "/store/datasets/ILSVRC"
+    text = ""
+    for j, line in enumerate(open(image_dir + "/LOC_synset_mapping.txt")):
+        text += f"{j}. {line[9:].strip()} \n"
+    with open("imagenet_classes.txt", "w") as f:
+        f.write(text)
diff --git a/tlv_dataset/loader/nuscenes.py b/tlv_dataset/loader/nuscenes.py
new file mode 100644
index 0000000..daeb736
--- /dev/null
+++ b/tlv_dataset/loader/nuscenes.py
@@ -0,0 +1,315 @@
+from __future__ import annotations
+
+import copy
+
+import cv2
+from nuscenes.nuscenes import NuScenes
+
+from tlv_dataset.common.utility import save_dict_to_pickle
+from tlv_dataset.data import TLVDataset
+from tlv_dataset.label_mapper.metadata.nuscenes_to_coco import MAPPER_METADATA
+
+
+class NuScenesImageLoader:
+    """NuScenes front-camera loader.
+
+    See https://www.nuscenes.org/nuscenes?tutorial=nuscenes
+    """
+
+    def __init__(
+        self,
+        dataroot: str,
+        version: str = "v1.0-mini",
+        save_dir: str = "",
+        verbose: bool = False,
+    ):
+        """Load NuScenes scenes from dataroot and process every scene."""
+        self.name = "NuScenes"
+        self._save_dir = save_dir
+        self._nusc = NuScenes(
+            version=version, dataroot=dataroot, verbose=verbose
+        )
+        self._nuscene: list = self._nusc.scene
+        self._scene_data = self.loading_data()
+
+    def get_label_count(self, lst):
+        """Count label occurrences in a list of per-frame label lists."""
+        output = {}
+
+        for item in lst:
+            if isinstance(item, list):
+                # Count every label inside the per-frame label list.
+                for i in item:
+                    output[i] = output.get(i, 0) + 1
+            else:
+                # Scalar label: count it directly.
+                output[item] = output.get(item, 0) + 1
+
+        return output
+
+    def map_label(self, nuscenes_label):
+        """Map a NuScenes category to its COCO label."""
+        return MAPPER_METADATA.get(
+            nuscenes_label, None
+        )  # Returns None if no mapping exists
+
+    def parse_object_class(self, annotation_tokens: list):
+        """Collect the unique mapped labels for a sample's annotations."""
+        labels = []
+        for annotation_token in annotation_tokens:
+            annotation = self._nusc.get("sample_annotation", annotation_token)
+            label = self.map_label(annotation["category_name"])
+            if label is not None:
+                labels.append(label)
+        return list(set(labels))
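+    # Mapping sketch (illustrative values taken from the mapper metadata):
+    #
+    #     map_label("vehicle.ego")   -> "car"   (kept)
+    #     map_label("static.other")  -> None    (dropped)
+    #     parse_object_class(tokens) -> ["car"] (unique labels only)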
+    def convert_to_tlv_dataset(self, raw_data: any):
+        """Wrap raw scene data in a TLVDataset."""
+        benchmark_frame = TLVDataset(
+            ground_truth=True,
+            ltl_formula="",
+            proposition=[],
+            number_of_frame=len(raw_data["images_of_frame"]),
+            frames_of_interest=[],
+            labels_of_frames=raw_data["labels_of_frame"],
+            images_of_frames=raw_data["images_of_frame"],
+        )
+        return benchmark_frame
+
+    def loading_data(self, generate_func: callable = None):
+        """Load front-camera frames and labels for every scene.
+
+        If generate_func is given, each valid scene is converted to a
+        TLVDataset and passed to it; otherwise the raw scene data is
+        pickled to self._save_dir.
+        """
+        scene_data = {}
+        for scene in self._nuscene:
+            scene_token = scene["token"]
+            scene_data[scene_token] = {
+                "images_of_frame": [],
+                "labels_of_frame": [],
+            }
+            data_validation = True
+            for data in self._nusc.sample:
+                if scene_token == data["scene_token"]:
+                    front_cam_frame = cv2.imread(
+                        self._nusc.get_sample_data_path(
+                            data["data"]["CAM_FRONT"]
+                        )
+                    )
+                    labels = self.parse_object_class(data["anns"])
+                    if front_cam_frame is not None:
+                        scene_data[scene_token]["images_of_frame"].append(
+                            front_cam_frame
+                        )
+                        scene_data[scene_token]["labels_of_frame"].append(
+                            labels
+                        )
+                    else:
+                        data_validation = False
+            if data_validation:
+                if generate_func is not None:
+                    generate_func(
+                        self.convert_to_tlv_dataset(scene_data[scene_token])
+                    )
+                else:
+                    save_dict_to_pickle(
+                        dict_obj=scene_data[scene_token],
+                        path=self._save_dir,
+                        file_name=f"nuscene_{scene_token}.pkl",
+                    )
+
+        return scene_data
+
+    def evaluate_unique_prop(self, unique_prop, lst):
+        """Return True if unique_prop appears anywhere in lst."""
+        for item in lst:
+            if isinstance(item, list):
+                if unique_prop in item:
+                    return True
+            elif unique_prop == item:
+                return True
+        return False
+
+    def prop1_u_prop2(self, prop1: str, prop2: str, lst: list[list]):
+        """Build ground truth for the LTL formula "prop1 U prop2"."""
+        ltl_formula = f'"{str(prop1)}" U "{str(prop2)}"'
+        new_prop_set = [prop1, prop2]
+        ltl_ground_truth = []
+        tmp_ground_truth = []
+        for idx, label in enumerate(lst):
+            if isinstance(label, list):
+                if prop1 in label:
+                    tmp_ground_truth.append(idx)
+
+            if isinstance(label, list):
+                if prop2 in label:
+                    tmp_ground_truth.append(idx)
+                    ltl_ground_truth.append(list(set(tmp_ground_truth)))
+                    tmp_ground_truth = []
+
+        return ltl_formula, new_prop_set, ltl_ground_truth
+
+    def f_prop1(self, prop1: str, lst: list[list]):
+        """Build ground truth for the LTL formula "F prop1"."""
+        ltl_formula = f'F "{str(prop1)}"'
+        new_prop_set = [prop1]
+        ltl_ground_truth = []
+        tmp_ground_truth = []
+        for idx, label in enumerate(lst):
+            if isinstance(label, list):
+                if prop1 in label:
+                    tmp_ground_truth.append(idx)
+                    ltl_ground_truth.append(list(set(tmp_ground_truth)))
+                    tmp_ground_truth = []
+            else:
+                if prop1 == label:
+                    tmp_ground_truth.append(idx)
+                    ltl_ground_truth.append(list(set(tmp_ground_truth)))
+                    tmp_ground_truth = []
+        return ltl_formula, new_prop_set, ltl_ground_truth
+
+    def prop1_and_prop2_u_prop3(
+        self, prop1: str, prop2: str, prop3: str, lst: list[list]
+    ):
+        """Build ground truth for "(prop1 & prop2) U prop3"."""
+        ltl_formula = f'("{str(prop1)}" & "{str(prop2)}") U "{str(prop3)}"'
+        new_prop_set = [prop1, prop2, prop3]
+        ltl_ground_truth = []
+        tmp_ground_truth = []
+        for idx, label in enumerate(lst):
+            if isinstance(label, list):
+                if all(elem in label for elem in [prop1, prop2]):
+                    tmp_ground_truth.append(idx)
+
+            if isinstance(label, list):
+                if prop3 in label:
+                    tmp_ground_truth.append(idx)
+                    ltl_ground_truth.append(list(set(tmp_ground_truth)))
+                    tmp_ground_truth = []
+
+        return ltl_formula, new_prop_set, ltl_ground_truth
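+    # Worked example (illustrative): for per-frame labels
+    #
+    #     lst = [["car"], ["person"], ["car", "person"]]
+    #
+    # f_prop1("person", lst) returns
+    #
+    #     ('F "person"', ["person"], [[1], [2]])
+    #
+    # one index group per frame satisfying the eventuality, which is the
+    # frames_of_interest format consumed by TLVDataset.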
+    def generate_ltl_ground_truth(self, scene_data: dict):
+        """Generate LTL formulas and ground truth for one scene."""
+        benchmark_frame = TLVDataset(
+            ground_truth=True,
+            ltl_formula="",
+            proposition=[],
+            number_of_frame=len(scene_data["images_of_frame"]),
+            frames_of_interest=[],
+            labels_of_frames=scene_data["labels_of_frame"],
+            images_of_frames=scene_data["images_of_frame"],
+        )
+
+        label_count = self.get_label_count(benchmark_frame.labels_of_frames)
+        sorted_items = sorted(label_count.items(), key=lambda x: x[1])
+
+        # Use the rarest labels as "unique" propositions and the most
+        # frequent ones as background propositions.
+        try:
+            if len(sorted_items) == 3:
+                unique_props = [sorted_items[0][0]]
+                highest_count_props = [
+                    sorted_items[1][0],
+                    sorted_items[2][0],
+                ]
+            elif len(sorted_items) > 3:
+                unique_props = [sorted_items[0][0], sorted_items[1][0]]
+                highest_count_props = [sorted_items[2][0], sorted_items[3][0]]
+            else:  # fewer than three distinct labels
+                unique_props = [sorted_items[0][0]]
+                highest_count_props = [sorted_items[1][0]]
+
+            for unique_prop in unique_props:
+                if self.evaluate_unique_prop(
+                    unique_prop, benchmark_frame.labels_of_frames[:5]
+                ):
+                    (
+                        ltl_formula,
+                        new_prop_set,
+                        frames_of_interest,
+                    ) = self.f_prop1(
+                        prop1=str(unique_prop),
+                        lst=benchmark_frame.labels_of_frames,
+                    )
+                    self.update_and_save_benchmark_frame(
+                        ltl_formula,
+                        new_prop_set,
+                        frames_of_interest,
+                        benchmark_frame,
+                        self._save_dir,
+                    )
+                else:
+                    for prop in highest_count_props:
+                        if prop != unique_prop:
+                            (
+                                ltl_formula,
+                                new_prop_set,
+                                frames_of_interest,
+                            ) = self.prop1_u_prop2(
+                                prop1=prop,
+                                prop2=unique_prop,
+                                lst=benchmark_frame.labels_of_frames,
+                            )
+                            self.update_and_save_benchmark_frame(
+                                ltl_formula,
+                                new_prop_set,
+                                frames_of_interest,
+                                benchmark_frame,
+                                self._save_dir,
+                            )
+                    if unique_prop not in highest_count_props:
+                        (
+                            ltl_formula,
+                            new_prop_set,
+                            frames_of_interest,
+                        ) = self.prop1_and_prop2_u_prop3(
+                            prop1=highest_count_props[0],
+                            prop2=highest_count_props[1],
+                            prop3=unique_prop,
+                            lst=benchmark_frame.labels_of_frames,
+                        )
+                        self.update_and_save_benchmark_frame(
+                            ltl_formula,
+                            new_prop_set,
+                            frames_of_interest,
+                            benchmark_frame,
+                            self._save_dir,
+                        )
+        except IndexError:
+            # Fewer than two distinct labels: fall back to a single
+            # eventuality formula.
+            unique_prop = sorted_items[0][0]
+            ltl_formula, new_prop_set, frames_of_interest = self.f_prop1(
+                prop1=unique_prop, lst=benchmark_frame.labels_of_frames
+            )
+            self.update_and_save_benchmark_frame(
+                ltl_formula,
+                new_prop_set,
+                frames_of_interest,
+                benchmark_frame,
+                self._save_dir,
+            )
+
+    def update_and_save_benchmark_frame(
+        self,
+        ltl_formula,
+        new_prop_set,
+        frames_of_interest,
+        benchmark_frame: TLVDataset,
+        save_dir,
+    ):
+        """Copy benchmark_frame, attach the formula, and pickle it."""
+        file_name = f"benchmark_nuscene_ltl_{ltl_formula}_{len(benchmark_frame.images_of_frames)}_0.pkl"
+        benchmark_frame_ = copy.deepcopy(benchmark_frame)
+        benchmark_frame_.frames_of_interest = frames_of_interest
+        benchmark_frame_.ltl_formula = ltl_formula
+        benchmark_frame_.proposition = new_prop_set
+        assert len(benchmark_frame_.images_of_frames) == len(
+            benchmark_frame_.labels_of_frames
+        )
+        assert benchmark_frame_.images_of_frames[0] is not None
+        assert len(benchmark_frame_.frames_of_interest) > 0
+        save_dict_to_pickle(
+            dict_obj=benchmark_frame_, path=save_dir, file_name=file_name
+        )
+
+    @property
+    def all_labels(self) -> list:
+        """All NuScenes categories known to the mapper."""
+        return list(MAPPER_METADATA.keys())
+
+
+if __name__ == "__main__":
+    test = NuScenesImageLoader(
+        version="v1.0-trainval",
+        dataroot="/opt/Neuro-Symbolic-Video-Frame-Search/store/datasets/NUSCENES/train",
+        save_dir="/opt/Neuro-Symbolic-Video-Frame-Search/store/nsvs_artifact/nuscene_video",
+    )
diff --git a/tlv_dataset/loader/waymo.py b/tlv_dataset/loader/waymo.py
new file mode 100644
index 0000000..b58a54b
--- /dev/null
+++ b/tlv_dataset/loader/waymo.py
@@ -0,0 +1,20 @@
+from __future__ import annotations
+
+
+class WaymoImageLoader:
+    """Waymo front-camera loader (not implemented yet)."""
+
+    def __init__(self):
+        self.name = "Waymo"
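Since waymo.py is still a stub, here is one possible shape for it, sketched under the same conventions as NuScenesImageLoader and the waymo_to_coco mapper above. Everything below is an assumption, not part of the patch; in particular, frame_iterator stands in for real Waymo Open Dataset parsing, which this patch does not include.

from tlv_dataset.label_mapper.metadata.waymo_to_coco import MAPPER_METADATA


class WaymoImageLoaderSketch:
    """Hypothetical Waymo loader mirroring the NuScenes flow."""

    def __init__(self, frame_iterator, save_dir: str = ""):
        # frame_iterator should yield (front_camera_image, [raw_labels]).
        self.name = "Waymo"
        self._frames = frame_iterator
        self._save_dir = save_dir

    def map_label(self, waymo_label):
        # None means the class has no COCO equivalent and is dropped,
        # exactly as in NuScenesImageLoader.map_label.
        return MAPPER_METADATA.get(waymo_label, None)

    def loading_data(self):
        """Yield (image, unique mapped labels) pairs per frame."""
        for frame, raw_labels in self._frames:
            mapped = [self.map_label(label) for label in raw_labels]
            yield frame, sorted({m for m in mapped if m is not None})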