# Source code for tonic.datasets.davisdataset

import os
from typing import Callable, List, Optional, Union

import numpy as np
from importRosbag.importRosbag import importRosbag

from tonic.dataset import Dataset
from tonic.download_utils import check_integrity, download_url
from tonic.io import make_structured_array


class DAVISDATA(Dataset):
    """`DAVIS event camera dataset <http://rpg.ifi.uzh.ch/davis_data.html>`_
    ::

        @article{mueggler2017event,
          title={The event-camera dataset and simulator: Event-based data for pose estimation, visual odometry, and SLAM},
          author={Mueggler, Elias and Rebecq, Henri and Gallego, Guillermo and Delbruck, Tobi and Scaramuzza, Davide},
          journal={The International Journal of Robotics Research},
          volume={36},
          number={2},
          pages={142--149},
          year={2017},
          publisher={SAGE Publications Sage UK: London, England}
        }

    Parameters:
        save_to (string): Location to save files to on disk. Will save files in a sub folder 'davis_dataset'.
        recording (string or list of strings): Use the name of the recording or a list thereof to download it,
                                               for example 'dynamic_6dof' or ['slider_far', 'urban']. See project
                                               homepage for a list of available recordings. Can use 'all' to
                                               download all available recordings.
        transform (callable, optional): A callable of transforms to apply to events and/or images.
        target_transform (callable, optional): A callable of transforms to apply to the targets/labels.
        transforms (callable, optional): A callable of transforms that is applied to both data and labels at the
                                         same time.

    Raises:
        RuntimeError: If a requested recording name is not a key of ``recordings``.
    """

    base_url = "http://rpg.ifi.uzh.ch/datasets/davis/"
    recordings = {  # recording names and their md5 hash
        "boxes_6dof": "c919cebc25e564935abfc0a3954bf016",
        "boxes_rotation": "1ec1ef4f354ce1908ed091703c1f98d0",
        "boxes_translation": "4fd11a69015022c72b375a4d000ee8cd",
        "calibration": "6e67817c7b6d2e95ca5796e89d219bac",
        "dynamic_6dof": "16e8cb3b151da15ff1fe6019b517e695",
        "dynamic_rotation": "77a2b0a87582e10e702365a38dc0c93c",
        "dynamic_translation": "2db36fd0c2945cbace48735f0416c506",
        "hdr_boxes": "67661ed4b472189432e35e8849faf200",
        "hdr_poster": "b59e37b616a3eda81184183b12b0dce5",
        "office_spiral": "60fe934450c558aff90ff5ba6b370a05",
        "office_zigzag": "3eba8fc8f42adbfd394b630852ce1f78",
        "outdoors_running": "7db2de811cb22b71fa34abd4ab1bba6b",
        "outdoors_walking": "2ef9b03c87d3c565d30211b7dcfaabc5",
        "poster_6dof": "e42ef11f523a52f11921cdb4a0fca231",
        "poster_rotation": "abcd843a894546775e5dda3560979edf",
        "poster_translation": "bb6736c56ff38f07cbe72613b72d25ed",
        "shapes_6dof": "9b9495ace2dd82881bcc5c0620dd595f",
        "shapes_rotation": "ee436cfe74b545fa25a2534f3ef021df",
        "shapes_translation": "2a805ba32671b237e4e13c023f276be9",
        "slider_close": "9426bbb70334c7b849dd5dd38eb7f2a9",
        "slider_depth": "b38a7a373f170f4b6aeca4f36e06f71a",
        "slider_far": "0f341da6aec0fd1801129e3d3a9981fa",
        "slider_hdr_close": "34cc10dd212ca1fddd3a8584046d5d1c",
        "slider_hdr_far": "c310f4f2d62cdf7b8d1c0a49315fb253",
        "urban": "c22db0b3ecbcbba8d282b0d8c3393851",
    }

    sensor_size = (240, 180, 2)
    dtype = np.dtype([("t", int), ("x", int), ("y", int), ("p", int)])
    ordering = dtype.names
    # NOTE(review): the class docstring mentions a 'davis_dataset' sub folder,
    # but folder_name is empty here -- confirm how the Dataset base class
    # derives ``location_on_system``.
    folder_name = ""

    def __init__(
        self,
        save_to: str,
        recording: Union[str, List[str]],
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None,
        transforms: Optional[Callable] = None,
    ):
        super().__init__(
            save_to,
            transform=transform,
            target_transform=target_transform,
            transforms=transforms,
        )

        # Normalise the request into a fresh list of recording names. Tuples
        # are accepted alongside lists, and the caller's container is copied
        # so later mutation on their side cannot change this dataset.
        if isinstance(recording, (list, tuple)):
            self.selection = list(recording)
        elif isinstance(recording, str) and recording == "all":
            self.selection = list(self.recordings)
        else:
            self.selection = [recording]

        # Validate before touching the disk or the network.
        for name in self.selection:
            if name not in self.recordings:
                raise RuntimeError(
                    "Recording {} is not available or in the wrong format.".format(
                        name
                    )
                )

        if not self._check_exists():
            self.download()

    @staticmethod
    def _rebase_timestamps(timestamps):
        """Shift timestamps to start at zero, scale by 1e6 and cast to int.

        NOTE(review): presumably this converts seconds to integer
        microseconds -- confirm the unit importRosbag reports.
        """
        return ((timestamps - timestamps[0]) * 1e6).astype(int)

    def __getitem__(self, index):
        """
        Returns:
            tuple of (data, target), where data is another tuple of (events, imu, images)
            and target is the opti track ground truth
        """
        filename = os.path.join(
            self.location_on_system, self.selection[index] + ".bag"
        )
        topics = importRosbag(filename, log="ERROR")

        # NOTE(review): every stream below is rebased to its *own* first
        # timestamp, so relative offsets between events, IMU, frames and
        # ground truth are not preserved -- confirm this is intended.

        # Events: rebase and scale in place, then pack into the structured
        # array described by ``self.dtype``.
        events = topics["/dvs/events"]
        events["ts"] -= events["ts"][0]
        events["ts"] *= 1e6
        events = make_structured_array(
            events["ts"], events["x"], events["y"], events["pol"], dtype=self.dtype
        )

        # IMU data is optional; recordings without it yield ``None``.
        if "/dvs/imu" in topics:
            imu = topics["/dvs/imu"]
            imu["ts"] = self._rebase_timestamps(imu["ts"])
        else:
            imu = None

        # Frames are stacked into a single array along a new first axis.
        images = topics["/dvs/image_raw"]
        images["frames"] = np.stack(images["frames"])
        images["ts"] = self._rebase_timestamps(images["ts"])

        data = (events, imu, images)

        # Ground truth is optional as well. Only the topic lookup is guarded,
        # so a malformed topic (e.g. one lacking "ts") surfaces as an error
        # instead of being silently reported as "no target".
        try:
            target = topics["/optitrack/davis"]
        except KeyError:
            target = None
        else:
            target["ts"] = self._rebase_timestamps(target["ts"])

        if self.transform is not None:
            data = self.transform(data)
        if self.target_transform is not None:
            target = self.target_transform(target)
        if self.transforms is not None:
            data, target = self.transforms(data, target)
        return data, target

    def __len__(self):
        """Return the number of selected recordings."""
        return len(self.selection)

    def download(self):
        """Download every selected recording as ``<name>.bag``.

        Each file is requested from ``base_url`` into ``location_on_system``
        and the recording's md5 from ``recordings`` is handed to
        ``download_url``.
        """
        for name in self.selection:
            download_url(
                self.base_url + name + ".bag",
                self.location_on_system,
                filename=name + ".bag",
                md5=self.recordings[name],
            )

    def _check_exists(self):
        """Return True if ``check_integrity`` accepts every selected ``.bag`` file.

        NOTE(review): no md5 is passed to ``check_integrity`` here, so this
        presumably only verifies that the files are present, not that their
        content matches the hashes in ``recordings`` -- confirm whether a
        full hash check on every construction is wanted.
        """
        return all(
            check_integrity(os.path.join(self.location_on_system, name + ".bag"))
            for name in self.selection
        )