import json
import math
import os
import shutil
from typing import List

from TF_merge_images import get_images_from_dir


# Updated to allow all 4 image types.
def remove_items_not_found_in_every_dataset(json_file, lab_file, hill_dir=None, slope_dir=None,
                                            htmc_dir=None, naip_dir=None) -> None:
    """
    Args:
        json_file: A JSON database whose 'images' and 'annotations' entries reference the image files below
        lab_file: Directory of label text files describing the images
        hill_dir: Directory of hill images
        slope_dir: Directory of slope images
        htmc_dir: Directory of HTMC images
        naip_dir: Directory of NAIP images

    Returns:
        None. Modifies each of the above locations so they only contain items present in all of them.
    """
    with open(json_file, 'r') as f:
        database = json.load(f)

    htmc_imgs: List[str] = get_images_from_dir(htmc_dir) if htmc_dir is not None else None
    hill_imgs: List[str] = get_images_from_dir(hill_dir) if hill_dir is not None else None
    slope_imgs: List[str] = get_images_from_dir(slope_dir) if slope_dir is not None else None
    naip_imgs: List[str] = get_images_from_dir(naip_dir) if naip_dir is not None else None

    # Find all image prefixes (file names) for files we have downloaded.
    # Index 4 assumes the fixed directory depth used by the dataset layout.
    htmc_img_prefixes: List[str] = [x.split('/')[4] for x in htmc_imgs] if htmc_imgs is not None else None
    hill_img_prefixes: List[str] = [x.split('/')[4] for x in hill_imgs] if hill_imgs is not None else None
    slope_img_prefixes: List[str] = [x.split('/')[4] for x in slope_imgs] if slope_imgs is not None else None
    naip_img_prefixes: List[str] = [x.split('/')[4] for x in naip_imgs] if naip_imgs is not None else None

    prefixes = []
    types = []
    if htmc_img_prefixes is not None:
        prefixes.append(htmc_img_prefixes)
        types.append(htmc_dir)
    if hill_img_prefixes is not None:
        prefixes.append(hill_img_prefixes)
        types.append(hill_dir)
    if slope_img_prefixes is not None:
        prefixes.append(slope_img_prefixes)
        types.append(slope_dir)
    if naip_img_prefixes is not None:
        prefixes.append(naip_img_prefixes)
        types.append(naip_dir)

    # The set intersection gives the image names shared by every supplied directory.
    image_names = list(set(prefixes.pop()).intersection(*map(set, prefixes)))

    # For each image directory, remove any image not found in the intersection.
    for image_dir in types:
        for root, dirs, files in os.walk(image_dir):
            for name in files:
                path = os.path.join(root, name)
                if name not in image_names:
                    print("Removing ", name)
                    os.remove(path)

    print("Images: ", image_names)

    # Drop JSON image entries whose files were removed, remembering their ids so the
    # matching annotations can be dropped as well. Build new lists rather than deleting
    # entries while iterating, which would skip elements.
    list_of_image_ids = []
    kept_images = []
    for image_entry in database['images']:
        if image_entry['file_name'] not in image_names:
            print(f"Removing {image_entry['file_name']} from the JSON")
            list_of_image_ids.append(image_entry['id'])
        else:
            kept_images.append(image_entry)
    database['images'] = kept_images

    database['annotations'] = [annotation for annotation in database['annotations']
                               if annotation['image_id'] not in list_of_image_ids]

    with open(json_file, 'w') as f:
        json.dump(database, f)

    print("Images after: ", database['images'])

    # Remove label text files whose corresponding image was deleted.
    for item in os.listdir(lab_file):
        if item.replace('.txt', '.jpg') not in image_names and item != "dataset.json":
            print("Removing ", item, " from lab folder")
            os.remove(f"{lab_file}/{item}")


def add_annotation_to_json(id_to_append_to: int, annotation: str, json_file: str):
    with open(json_file, 'r') as f:
        database = json.load(f)

    index_to_append_at = -1
    annotation_list = database['annotations']
    # print(annotation_list[0]['id'])
    # Find the index of the annotation entry with the matching id.
    for idx, existing_annotation in enumerate(annotation_list):
        if existing_annotation['id'] == id_to_append_to:
            index_to_append_at = idx
    print("Index to append at: ", index_to_append_at)
def adjust_all_bounding_boxes(x_inc: int, y_inc: int, json_file: str) -> None:
    """
    Purpose: Modify all bounding boxes within a given file. Can be used to model
    performance with different bounding box sizes.

    Args:
        x_inc: Amount to increase the width of the bbox by
        y_inc: Amount to increase the height of the bbox by
        json_file: The JSON file containing the annotations to adjust

    Returns:
        None. Modifies the given JSON file.
    """
    with open(json_file, 'r') as f:
        database = json.load(f)

    # Assumes COCO-style bounding boxes stored as [x, y, width, height].
    for annotation in database['annotations']:
        annotation['bbox'][2] += x_inc
        annotation['bbox'][3] += y_inc

    with open(json_file, 'w') as f:
        json.dump(database, f)


def split_dataset(training_ratio: float):
    file_counter = 0
    test_dataset = "../dataset/test_set"
    training_dataset = "../dataset/training_set"

    # Create the test_set and training_set directory trees if they do not exist.
    if not os.path.isdir(test_dataset):
        os.mkdir(test_dataset)
    for dir_name in ["htmc", "hill", "slope"]:
        dataset = test_dataset + "/" + dir_name
        if not os.path.isdir(dataset):
            os.mkdir(dataset)

    if not os.path.isdir(training_dataset):
        os.mkdir(training_dataset)
    for dir_name in ["htmc", "hill", "slope"]:
        dataset = training_dataset + "/" + dir_name
        if not os.path.isdir(dataset):
            os.mkdir(dataset)

    directory = '../dataset/'
    images = '../dataset/test/htmc'
    label_file = directory + "test/lab/dataset.json"

    if not os.path.isdir("../dataset/test_set/lab"):
        os.mkdir("../dataset/test_set/lab")
    if not os.path.isdir("../dataset/training_set/lab"):
        os.mkdir("../dataset/training_set/lab")

    # Copy the label database into both splits; each copy is pruned later by
    # remove_items_not_found_in_every_dataset.
    test_label_location = test_dataset + "/lab/dataset.json"
    shutil.copy(label_file, test_label_location)
    training_label_location = training_dataset + "/lab/dataset.json"
    shutil.copy(label_file, training_label_location)

    for filename in os.listdir(images):
        file_counter += 1
    number_of_training_files = math.floor(file_counter * training_ratio)

    # The first training_ratio fraction of files goes to the training set, the rest to the test set.
    for index, filename in enumerate(os.listdir(images)):
        if index < number_of_training_files:
            for path in ["htmc", "slope", "hill"]:
                src_path = directory + "test/" + path + "/" + filename
                dest_path = directory + "training_set/" + path + "/" + filename
                shutil.copy(src_path, dest_path)
        else:
            for path in ["htmc", "slope", "hill"]:
                src_path = directory + "test/" + path + "/" + filename
                dest_path = directory + "test_set/" + path + "/" + filename
                shutil.copy(src_path, dest_path)


def dataset_modify_pipeline():
    split_dataset(0.8)
    remove_items_not_found_in_every_dataset("../dataset/test_set/lab/dataset.json",
                                            "../dataset/test_set/lab",
                                            hill_dir="../dataset/test_set/hill",
                                            slope_dir="../dataset/test_set/slope",
                                            htmc_dir="../dataset/test_set/htmc")
    remove_items_not_found_in_every_dataset("../dataset/training_set/lab/dataset.json",
                                            "../dataset/training_set/lab",
                                            hill_dir="../dataset/training_set/hill",
                                            slope_dir="../dataset/training_set/slope",
                                            htmc_dir="../dataset/training_set/htmc")


if __name__ == "__main__":
    # add_annotation_to_json(1, "Blue", "../dataset/test/lab/dataset.json")
    # remove_items_not_found_in_every_dataset(json_database, lab_dir, hill_dir, slope_dir, htmc_dir)
    dataset_modify_pipeline()