Source code for tonic.datasets.dsec

import os
import warnings
from typing import Callable, List, Optional, Union

import h5py
import numpy as np

from tonic.dataset import Dataset
from tonic.download_utils import download_and_extract_archive, download_url, list_files
from tonic.io import make_structured_array


class DSEC(Dataset):
    """`DSEC <https://dsec.ifi.uzh.ch/>`_

    This is a fairly large dataset, so in order to save some disk space, event and
    image zips are deleted after extraction. If your download gets interrupted and
    you are left with a corrupted file on disk, Tonic will not be able to detect
    that and will simply proceed to download files that are not yet on disk. If you
    experience issues loading a particular recording, delete that folder manually
    and Tonic will re-download it the next time.

    Optical flow targets are not available for every recording, so if you select
    optical flow targets, only the subset of 18 training recordings that provide
    them will be selected.

    .. note:: To be able to read this dataset, you will need the `hdf5plugin`, `PIL`
              and `imageio` packages installed.

    Parameters:
        save_to (str): Location to save files to on disk.
        split (str or list of str): Can be 'train', 'test' or a selection of
            individual recordings such as 'interlaken_00_c' or
            ['thun_00_a', 'zurich_city_00_a']. Recordings cannot be mixed across
            the train/test split.
        data_selection (str or list of str): Select which data to load per sample.
            Can be 'events_left', 'events_right', 'images_rectified_left',
            'images_rectified_right', 'image_timestamps' or any combination thereof
            in a list.
        target_selection (str or list of str, optional): Select which targets to
            load per sample. Can be 'disparity_event', 'disparity_image',
            'disparity_timestamps', 'optical_flow_forward_event',
            'optical_flow_forward_timestamps', 'optical_flow_backward_event',
            'optical_flow_backward_timestamps' or a combination thereof in a list.
            Note that optical flow targets are not available for every recording.
        transform (callable, optional): A callable of transforms to apply to the data.
        target_transform (callable, optional): A callable of transforms to apply to
            the targets/labels.
        transforms (callable, optional): A callable of transforms that is applied to
            both data and targets at the same time.
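
    Example:
        A minimal, illustrative instantiation (a sketch only; the save path,
        recording name and selections below are placeholders, not required
        values)::

            dataset = DSEC(
                save_to="./data",
                split="zurich_city_00_a",
                data_selection=["events_left", "image_timestamps"],
                target_selection=["disparity_event", "disparity_timestamps"],
            )
            data, targets = dataset[0]
            events_dict, image_timestamps = data
            disparity_frames, disparity_timestamps = targets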
""" base_url = "https://download.ifi.uzh.ch/rpg/DSEC/" # boolean flag indicates optical flow availability recordings = { "train": { "interlaken_00_c": False, "interlaken_00_d": False, "interlaken_00_e": False, "interlaken_00_f": False, "interlaken_00_g": False, "thun_00_a": True, "zurich_city_00_a": False, "zurich_city_00_b": False, "zurich_city_01_a": True, "zurich_city_01_b": False, "zurich_city_01_c": False, "zurich_city_01_d": False, "zurich_city_01_e": False, "zurich_city_01_f": False, "zurich_city_02_a": True, "zurich_city_02_b": False, "zurich_city_02_c": True, "zurich_city_02_d": True, "zurich_city_02_e": True, "zurich_city_03_a": True, "zurich_city_04_a": False, "zurich_city_04_b": False, "zurich_city_04_c": False, "zurich_city_04_d": False, "zurich_city_04_e": False, "zurich_city_04_f": False, "zurich_city_05_a": True, "zurich_city_05_b": True, "zurich_city_06_a": True, "zurich_city_07_a": True, "zurich_city_08_a": True, "zurich_city_09_a": True, "zurich_city_09_b": False, "zurich_city_09_c": False, "zurich_city_09_d": False, "zurich_city_09_e": False, "zurich_city_10_a": True, "zurich_city_10_b": True, "zurich_city_11_a": True, "zurich_city_11_b": True, "zurich_city_11_c": True, }, "test": [ "thun_01_a", "thun_01_b", "interlaken_00_a", "interlaken_00_b", "interlaken_01_a", "zurich_city_12_a", "zurich_city_13_a", "zurich_city_13_b", "zurich_city_14_a", "zurich_city_14_b", "zurich_city_14_c", "zurich_city_15_a", ], } # that's a combination of the different data available, their # extension when downloaded and their extension when extracted data_names = { "events_left": [".zip", ".h5"], "events_right": [".zip", ".h5"], "image_timestamps": [".txt", ".txt"], "image_exposure_timestamps_left": [".txt", ".txt"], "image_exposure_timestamps_right": [".txt", ".txt"], "images_rectified_left": [".zip", ".png"], "images_rectified_right": [".zip", ".png"], } target_names = { "disparity_event": [".zip", ".png"], "disparity_image": [".zip", ".png"], "disparity_timestamps": [".txt", ".txt"], "optical_flow_forward_event": [".zip", ".png"], "optical_flow_forward_timestamps": [".txt", ".txt"], "optical_flow_backward_event": [".zip", ".png"], "optical_flow_backward_timestamps": [".txt", ".txt"], } sensor_size = (640, 480, 2) dtype = np.dtype([("x", np.int16), ("y", np.int16), ("t", np.int64), ("p", bool)]) ordering = dtype.names def __init__( self, save_to: str, split: Union[str, List[str]], data_selection: Union[str, List[str]], target_selection: Optional[Union[str, List[str]]] = None, transform: Optional[Callable] = None, target_transform: Optional[Callable] = None, transforms: Optional[Callable] = None, ): super().__init__( save_to, transform=transform, target_transform=target_transform, transforms=transforms, ) import imageio imageio.plugins.freeimage.download() if split == "train": self.recording_selection = self.recordings[split].keys() self.train_or_test = split elif split == "test": self.recording_selection = self.recordings[split] self.train_or_test = split else: if not isinstance(split, list): split = [split] for recording in split: if ( recording not in self.recordings["train"] and recording not in self.recordings["test"] ): raise RuntimeError( f"Recording {recording} is neither in train nor in test set." 
) self.recording_selection = split if all([recording in self.recordings["train"] for recording in split]): self.train_or_test = "train" elif all([recording in self.recordings["test"] for recording in split]): self.train_or_test = "test" else: raise RuntimeError("Cannot mix across train/test split.") self.train = self.train_or_test == "train" if isinstance(data_selection, str): data_selection = [data_selection] elif data_selection is None: data_selection = [] self.data_selection = data_selection for data_piece in data_selection: if data_piece not in self.data_names.keys(): raise RuntimeError( f"Selection {data_piece} not available. Please select from the following options: {self.data_names.keys()}." ) if isinstance(target_selection, str): target_selection = [target_selection] elif target_selection is None: target_selection = [] self.target_selection = target_selection if not self.train and len(target_selection) > 0: raise Exception( "You wanted targets for the test set but they are not available." ) for data_piece in target_selection: if data_piece not in self.target_names.keys(): raise RuntimeError( f"Selection {data_piece} not available. Please select from the following options: {self.target_names.keys()}." ) # only take those recordings that have optical flow ground truth if any(["optical_flow" in selection for selection in target_selection]): deselect = [ name for name in self.recording_selection if not self.recordings["train"][name] ] if len(deselect) > 0: warnings.warn( f"Since you asked for optical flow targets, the following recordings without optical flow ground truth are dropped: {deselect}." ) self.recording_selection = [ name for name in self.recording_selection if self.recordings["train"][name] ] self._check_exists(data_selection + target_selection)

    def __getitem__(self, index):
        """
        Returns:
            a tuple of (data, targets), where data holds one entry per item in
            data_selection and targets holds one entry per item in
            target_selection (only available when train=True).
        """
        import hdf5plugin  # necessary to read event files
        import imageio  # necessary to read optical flow pngs
        from PIL import Image  # necessary to read images

        recording = self.recording_selection[index]
        base_folder = os.path.join(self.location_on_system, recording)

        data_tuple = []
        for data_name in self.data_selection:
            full_base_folder = os.path.join(base_folder, data_name)
            if data_name in ["events_left", "events_right"]:
                with h5py.File(full_base_folder + "/events.h5", "r") as file:
                    data = {}
                    data[data_name] = make_structured_array(
                        file["events"]["x"][()],
                        file["events"]["y"][()],
                        file["events"]["t"][()],
                        file["events"]["p"][()],
                        dtype=self.dtype,
                    )
                    data[data_name]["t"] += file["t_offset"][()]
                    data["ms_to_idx"] = file["ms_to_idx"][()]
            elif "images" in data_name:
                images_rectified_filenames = sorted(
                    list_files(full_base_folder, ".png", prefix=True)
                )
                data = np.stack(
                    [np.array(Image.open(file)) for file in images_rectified_filenames]
                )
            elif data_name == "image_timestamps":
                with open(
                    full_base_folder + f"/{recording}_image_timestamps.txt"
                ) as f:
                    data = np.array([int(line) for line in f.readlines()])
            data_tuple.append(data)

        if self.transform is not None:
            data_tuple = self.transform(data_tuple)

        target_tuple = []
        for target_name in self.target_selection:
            full_base_folder = os.path.join(base_folder, target_name)
            if target_name in [
                "disparity_event",
                "disparity_image",
            ]:
                png_filenames = sorted(
                    list_files(full_base_folder, ".png", prefix=True)
                )
                target = np.stack(
                    [np.array(Image.open(file)) for file in png_filenames]
                )
            elif target_name in [
                "optical_flow_forward_event",
                "optical_flow_backward_event",
            ]:
                png_filenames = sorted(
                    list_files(full_base_folder, ".png", prefix=True)
                )
                target = np.array(
                    [
                        imageio.v2.imread(file, format="PNG-FI")
                        for file in png_filenames
                    ]
                ).astype(float)
                # decode the 16 bit png values into pixel displacements
                target[:, :, :, :2] -= 2**15
                target[:, :, :, :2] /= 128
            elif target_name == "disparity_timestamps":
                with open(full_base_folder + f"/{recording}_{target_name}.txt") as f:
                    target = np.array([int(line) for line in f.readlines()])
            elif target_name in [
                "optical_flow_forward_timestamps",
                "optical_flow_backward_timestamps",
            ]:
                with open(full_base_folder + f"/{recording}_{target_name}.txt") as f:
                    lines = f.readlines()
                lines = lines[1:]  # first line is a comment
                # first number is the start timestamp, second number is the stop timestamp
                number_strs = [line.split(", ") for line in lines]
                target = np.array(
                    [(int(start), int(stop)) for start, stop in number_strs]
                )
            target_tuple.append(target)

        if self.target_transform is not None:
            target_tuple = self.target_transform(target_tuple)

        if self.transforms is not None:
            data_tuple, target_tuple = self.transforms(data_tuple, target_tuple)

        return data_tuple, target_tuple
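
    # A hedged usage note for the events dictionary returned above: assuming the
    # DSEC convention that ms_to_idx[ms] holds the index of the first event with a
    # timestamp at or after millisecond ``ms`` of the recording, a time window can
    # be sliced without searching the timestamp array, e.g.
    #
    #     data, _ = dataset[0]      # with data_selection=["events_left"]
    #     events = data[0]["events_left"]
    #     ms_to_idx = data[0]["ms_to_idx"]
    #     window = events[ms_to_idx[100]:ms_to_idx[200]]  # events from ~100 ms to ~200 ms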

    def __len__(self):
        return len(self.recording_selection)
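
    # For orientation: a sketch of the on-disk layout that _check_exists below
    # creates under location_on_system (the recording and frame file names shown
    # here are illustrative, not guaranteed):
    #
    #     <location_on_system>/zurich_city_00_a/
    #         events_left/events.h5
    #         image_timestamps/zurich_city_00_a_image_timestamps.txt
    #         images_rectified_left/000000.png, 000001.png, ...
    #         disparity_event/000000.png, ...
    #
    # Zip archives are extracted into these per-selection folders and deleted
    # afterwards; plain .txt files are downloaded as-is.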

    def _check_exists(self, data_selection: List):
        all_names = {**self.data_names, **self.target_names}
        for recording in self.recording_selection:
            for data_name in data_selection:
                file_folder = os.path.join(
                    self.location_on_system, recording, data_name
                )
                os.makedirs(file_folder, exist_ok=True)
                extension, extracted_file_extension = all_names[data_name]
                file_name = f"{recording}_{data_name + extension}"
                # skip the download if an extracted file of the expected type is already present
                if any(
                    file.endswith(extracted_file_extension)
                    for file in os.listdir(file_folder)
                ):
                    continue
                url = f"{self.base_url}{self.train_or_test}/{recording}/{file_name}"
                if extension == ".zip":
                    download_and_extract_archive(url, file_folder)
                    os.remove(os.path.join(file_folder, file_name))
                else:
                    download_url(url, file_folder)