Merge pull request #25 from UTAustin-SwarmLab/test-ltl-scripts
For release 0.0.4
Showing 38 changed files with 1,937 additions and 357 deletions.
@@ -1,3 +1,7 @@
#ns_vfs_custom
test_scripts/runs/
artifacts/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
@@ -0,0 +1,10 @@
from pathlib import Path


def get_available_benchmark_video(path_to_directory: str):
    if isinstance(path_to_directory, str):
        directory_path = Path(path_to_directory)
        return list(directory_path.glob("*.pkl"))
    else:
        directory_path = path_to_directory
        return list(directory_path.rglob("*.pkl"))
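A note on the helper above: a string argument is wrapped in a Path and only the top level of the directory is searched with glob, while a Path argument is searched recursively with rglob. A minimal usage sketch (the directory path below is hypothetical, not part of this commit):

    from pathlib import Path

    # Top-level search only: a string argument goes through glob("*.pkl")
    top_level_files = get_available_benchmark_video("artifacts/benchmark_frame_video")

    # Recursive search: a Path argument goes through rglob("*.pkl")
    all_files = get_available_benchmark_video(Path("artifacts/benchmark_frame_video"))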
experiments/evaluate_benchmark_frame_video_with_manual_confidence.py (164 additions, 0 deletions)
@@ -0,0 +1,164 @@
from __future__ import annotations

import csv
from pathlib import Path

from ns_vfs.common.utility import save_frames
from ns_vfs.config.loader import load_config
from ns_vfs.data.frame import BenchmarkLTLFrame, FramesofInterest
from ns_vfs.frame_searcher import FrameSearcher
from ns_vfs.model.vision.grounding_dino import GroundingDino
from ns_vfs.processor.benchmark_video_processor import BenchmarkVideoFrameProcessor
from ns_vfs.video_to_automaton import VideotoAutomaton


def get_frames(frames_of_interest: list, benchmark_video: BenchmarkLTLFrame):
    output = []
    for frame_set in frames_of_interest:
        if len(frame_set) == 1:
            idx = frame_set[0]
            output.append([benchmark_video.images_of_frames[idx]])
        else:
            idx_first, idx_last = frame_set[0], frame_set[-1]
            tmp_list = []
            for idx in range(idx_first, idx_last + 1):
                tmp_list.append(benchmark_video.images_of_frames[idx])
            output.append(tmp_list)
    return output


def evaluate_frame_of_interest(
    benchmark_video_file: str,
    benchmark_video: BenchmarkLTLFrame,
    frame_of_interest: FramesofInterest,
    directory_path: str,
):
    result = dict()
    dir_path = Path(directory_path) / benchmark_video_file.name.split(".pkl")[0]
    dir_path.mkdir(parents=True, exist_ok=True)

    true_foi_list = benchmark_video.frames_of_interest
    total_num_true_foi = len(true_foi_list)

    num_of_matching_frame_set = sum(1 for a, b in zip(true_foi_list, frame_of_interest.foi_list) if a == b)
    frame_set_accuracy = num_of_matching_frame_set / total_num_true_foi

    # matching_accuracy
    flattened_true_foi = set([item for sublist in true_foi_list for item in sublist])
    flattened_predicted_foi = set([item for sublist in frame_of_interest.foi_list for item in sublist])
    true_positive_set = flattened_true_foi.intersection(flattened_predicted_foi)
    false_positive_set = flattened_predicted_foi.difference(flattened_true_foi)
    false_negatives = flattened_true_foi.difference(flattened_predicted_foi)

result["ltl_formula"] = benchmark_video.ltl_formula | ||
result["total_number_of_frame"] = len(benchmark_video.labels_of_frames) | ||
result["exact_frame_accuracy"] = frame_set_accuracy | ||
result["num_true_positive"] = len(true_positive_set) | ||
result["num_false_positive"] = len(false_positive_set) | ||
result["precision"] = len(true_positive_set) / len(true_positive_set) + len(false_positive_set) | ||
result["recall"] = len(true_positive_set) / len(true_positive_set) + len(false_negatives) | ||
|
||
result["groud_truth_frame"] = benchmark_video.frames_of_interest | ||
result["predicted_frame"] = frame_of_interest.foi_list | ||
|
||
result["total_number_of_framer_of_interest"] = len(benchmark_video.frames_of_interest) | ||
result["total_number_of_frame"] = len(benchmark_video.labels_of_frames) | ||
|
||
    for i, frame_image_set in enumerate(get_frames(frame_of_interest.foi_list, benchmark_video)):
        path = Path(directory_path) / benchmark_video_file.name.split(".pkl")[0] / f"video_frame_{i}"
        save_frames(frames=frame_image_set, path=path, file_label="predicted_frame")

    # save_dict_to_pickle(
    #     path=Path(directory_path) / benchmark_video_file.name.split(".pkl")[0],
    #     dict_obj=result,
    #     file_name="result.pkl",
    # )
    # Append results to a shared CSV; write the header only when the file is first created.
    csv_file_name = Path(directory_path) / "data.csv"
    write_header = not csv_file_name.exists()

    with open(csv_file_name, mode="a", newline="") as file:
        writer = csv.DictWriter(file, fieldnames=result.keys())
        if write_header:
            writer.writeheader()
        writer.writerow(result)

    acc_file = Path(directory_path) / "accuracy.txt"
    with acc_file.open("a") as f:
        f.write(
            f"""{result["ltl_formula"]} - total num frame: {result["total_number_of_frame"]} - exact_frame_accuracy: {result["exact_frame_accuracy"]}
            num_true_positive: {result["num_true_positive"]}, num_false_positive: {result["num_false_positive"]},
            precision: {result["precision"]} recall: {result["recall"]}\n"""
        )


def get_available_benchmark_video(path_to_directory: str):
    if isinstance(path_to_directory, str):
        directory_path = Path(path_to_directory)
        return list(directory_path.glob("*.pkl"))
    else:
        directory_path = path_to_directory
        return list(directory_path.rglob("*.pkl"))


if __name__ == "__main__":
    config = load_config()
    benchmark_frame_video_root_dir = Path(
        "/opt/Neuro-Symbolic-Video-Frame-Search/artifacts/benchmark_frame_video/"
    )

    benchmark_image_set_dir = [x for x in benchmark_frame_video_root_dir.iterdir() if x.is_dir()]

    for benchmark_name_dir in benchmark_image_set_dir:
        ltl_video_dir_set = [x for x in benchmark_name_dir.iterdir() if x.is_dir()]
        if len(ltl_video_dir_set) > 0:
            print(f"--processing {benchmark_name_dir.name}--")
            print(f"number of ltl rules: {len(ltl_video_dir_set)}")
            for ltl_video_dir in ltl_video_dir_set:
                benchmark_video_file_list = get_available_benchmark_video(ltl_video_dir)
                print(f"number of examples of {ltl_video_dir.name}: {len(benchmark_video_file_list)}")

                for benchmark_video_file in benchmark_video_file_list:
                    benchmark_video_processor = BenchmarkVideoFrameProcessor(
                        video_path=benchmark_video_file,
                        artifact_dir=config.VERSION_AND_PATH.ARTIFACTS_PATH,
                        manual_confidence_probability=1.0,
                    )

                    benchmark_img_frame: BenchmarkLTLFrame = benchmark_video_processor.benchmark_image_frames

                    video_automata_builder = VideotoAutomaton(
                        detector=GroundingDino(
                            config=config.GROUNDING_DINO,
                            weight_path=config.GROUNDING_DINO.GROUNDING_DINO_CHECKPOINT_PATH,
                            config_path=config.GROUNDING_DINO.GROUNDING_DINO_CONFIG_PATH,
                        ),
                        video_processor=benchmark_video_processor,
                        artifact_dir=config.VERSION_AND_PATH.ARTIFACTS_PATH,
                        proposition_set=benchmark_img_frame.proposition,
                        save_annotation=False,  # TODO: Debug only
                        save_image=False,  # TODO: Debug only
                        ltl_formula=f"P>=0.80 [{benchmark_img_frame.ltl_formula}]",
                        verbose=False,
                        manual_confidence_probability=1.0,
                    )
                    frame_searcher = FrameSearcher(
                        video_automata_builder=video_automata_builder,
                        video_processor=benchmark_video_processor,
                    )

                    frame_of_interest = frame_searcher.search()

                    evaluate_frame_of_interest(
                        benchmark_video_file=benchmark_video_file,
                        benchmark_video=benchmark_img_frame,
                        frame_of_interest=frame_of_interest,
                        directory_path="/opt/Neuro-Symbolic-Video-Frame-Search/artifacts/benchmark_eval_results",
                    )
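For reference, the evaluation above reduces to standard set-based precision and recall over flattened frame indices, plus an exact-match accuracy over frame groups. A minimal, self-contained sketch with toy frame lists (illustrative values only, not part of this commit):

    true_foi = [[0, 1], [4], [7, 8]]  # ground-truth frames of interest
    pred_foi = [[0, 1, 2], [7]]       # predicted frames of interest

    true_set = {i for group in true_foi for i in group}  # {0, 1, 4, 7, 8}
    pred_set = {i for group in pred_foi for i in group}  # {0, 1, 2, 7}

    tp = true_set & pred_set  # {0, 1, 7}
    fp = pred_set - true_set  # {2}
    fn = true_set - pred_set  # {4, 8}

    precision = len(tp) / (len(tp) + len(fp))  # 3 / 4 = 0.75
    recall = len(tp) / (len(tp) + len(fn))     # 3 / 5 = 0.60
    print(precision, recall)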
experiments/experiment_1_frame_search_with_benchmark_video.ipynb (129 additions, 0 deletions)
@@ -0,0 +1,129 @@
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "import csv\n",
    "from pathlib import Path\n",
    "\n",
    "from ns_vfs.common.utility import save_frames\n",
    "from ns_vfs.config.loader import load_config\n",
    "from ns_vfs.data.frame import BenchmarkLTLFrame, FramesofInterest\n",
    "from ns_vfs.frame_searcher import FrameSearcher\n",
    "from ns_vfs.model.vision.grounding_dino import GroundingDino\n",
    "from ns_vfs.processor.benchmark_video_processor import BenchmarkVideoFrameProcessor\n",
    "from ns_vfs.video_to_automaton import VideotoAutomaton\n",
    "from common import get_available_benchmark_video\n",
    "from ns_vfs.model.vision.yolo import Yolo"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Global Variable**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "config = load_config()\n",
    "benchmark_frame_video_root_dir = Path(\n",
    "    \"/opt/Neuro-Symbolic-Video-Frame-Search/artifacts/benchmark_frame_video/\"\n",
    ")\n",
    "benchmark_image_set_dir = [x for x in benchmark_frame_video_root_dir.iterdir() if x.is_dir()]\n",
    "cv_model_list = [\"grounding_dino\", \"yolo\"]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Local Variable for the experiment**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "\"\"\"\n",
    "It goes over all available benchmark videos and searches for frames of interest with each cv detection model.\n",
    "\"\"\"\n",
    "for benchmark_name_dir in benchmark_image_set_dir:\n",
    "    ltl_video_dir_set = [x for x in benchmark_name_dir.iterdir() if x.is_dir()]\n",
    "    if len(ltl_video_dir_set) > 0:\n",
    "        print(f\"--processing {benchmark_name_dir.name}--\")\n",
    "        print(f\"number of ltl rules: {len(ltl_video_dir_set)}\")\n",
    "        for ltl_video_dir in ltl_video_dir_set:\n",
    "            benchmark_video_file_list = get_available_benchmark_video(ltl_video_dir)\n",
    "            print(f\"number of examples of {ltl_video_dir.name}: {len(benchmark_video_file_list)}\")\n",
    "\n",
    "            for benchmark_video_file in benchmark_video_file_list:\n",
    "                for cv_model in cv_model_list:\n",
    "                    if cv_model == \"yolo\":\n",
    "                        cv_detection_model = Yolo(\n",
    "                            config=config.YOLO,\n",
    "                            weight_path=config.YOLO.YOLO_CHECKPOINT_PATH,\n",
    "                        )\n",
    "                    elif cv_model == \"grounding_dino\":\n",
    "                        cv_detection_model = GroundingDino(\n",
    "                            config=config.GROUNDING_DINO,\n",
    "                            weight_path=config.GROUNDING_DINO.GROUNDING_DINO_CHECKPOINT_PATH,\n",
    "                            config_path=config.GROUNDING_DINO.GROUNDING_DINO_CONFIG_PATH,\n",
    "                        )\n",
    "                    benchmark_video_processor = BenchmarkVideoFrameProcessor(\n",
    "                        video_path=benchmark_video_file,\n",
    "                        artifact_dir=config.VERSION_AND_PATH.ARTIFACTS_PATH,\n",
    "                        manual_confidence_probability=1.0,\n",
    "                    )\n",
    "\n",
    "                    benchmark_img_frame: BenchmarkLTLFrame = benchmark_video_processor.benchmark_image_frames\n",
    "\n",
    "                    video_automata_builder = VideotoAutomaton(\n",
    "                        detector=cv_detection_model,\n",
    "                        video_processor=benchmark_video_processor,\n",
    "                        artifact_dir=config.VERSION_AND_PATH.ARTIFACTS_PATH,\n",
    "                        proposition_set=benchmark_img_frame.proposition,\n",
    "                        save_annotation=False,  # TODO: Debug only\n",
    "                        save_image=False,  # TODO: Debug only\n",
    "                        ltl_formula=f\"P>=0.80 [{benchmark_img_frame.ltl_formula}]\",\n",
    "                        verbose=False,\n",
    "                    )\n",
    "                    frame_searcher = FrameSearcher(\n",
    "                        video_automata_builder=video_automata_builder,\n",
    "                        video_processor=benchmark_video_processor,\n",
    "                    )\n",
    "\n",
    "                    frame_of_interest = frame_searcher.search()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.10"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
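The notebook cell above picks the detector with an inline if/elif over cv_model_list. A hypothetical refactor sketch (not part of this commit) that pulls the selection into a small helper, using only the constructors and config fields already shown in the diff:

    # Hypothetical helper, illustrative only: choose a detector by name.
    def build_detector(cv_model: str, config):
        if cv_model == "yolo":
            return Yolo(config=config.YOLO, weight_path=config.YOLO.YOLO_CHECKPOINT_PATH)
        elif cv_model == "grounding_dino":
            return GroundingDino(
                config=config.GROUNDING_DINO,
                weight_path=config.GROUNDING_DINO.GROUNDING_DINO_CHECKPOINT_PATH,
                config_path=config.GROUNDING_DINO.GROUNDING_DINO_CONFIG_PATH,
            )
        raise ValueError(f"unknown cv model: {cv_model}")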
@@ -0,0 +1,35 @@
def combine_consecutive_lists(data):
    if len(data) > 0:
        # Normalize data to ensure all elements are lists
        data = [[x] if not isinstance(x, list) else x for x in data]

        # Sort the data based on the first element of each sublist
        data.sort(key=lambda x: x[0])

        combined_lists = [data[0]]

        for sublist in data[1:]:
            # Check if the last number of the previous sublist is consecutive to the first number of the current sublist
            if sublist[0] - combined_lists[-1][-1] == 1:
                # If the current sublist is single-item and the previous sublist is also single-item, combine them
                if len(sublist) == len(combined_lists[-1]) == 1:
                    combined_lists[-1].extend(sublist)
                # If the current sublist is single-item but the previous sublist is multi-item, append it
                elif len(sublist) == 1 and len(combined_lists[-1]) > 1:
                    combined_lists[-1].append(sublist[0])
                # Otherwise, start a new group
                else:
                    combined_lists.append(sublist)
            else:
                combined_lists.append(sublist)

        return combined_lists
    else:
        return []


if __name__ == "__main__":
    data = [1, 2, [3], [4, 5], 9, 21]
    # data = [[2, 4, 6], [9, 21]]
    # data = [[1], [2], [3], [5], [7], [9], [10], [21]]
    print(combine_consecutive_lists(data))
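Tracing the __main__ example above: the scalars 1, 2, 9, and 21 are first wrapped into single-item lists, consecutive single frames are merged, and multi-item groups are kept as separate runs, so the script should print:

    [[1, 2, 3], [4, 5], [9], [21]]

Note that [4, 5] stays its own group even though 4 follows 3: when the incoming sublist already has more than one element, the else branch starts a new group rather than merging.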