Source code for tonic.datasets.visual_place_recognition

import os
from typing import Callable, Optional

import numpy as np
from importRosbag.importRosbag import importRosbag

from tonic.dataset import Dataset
from tonic.download_utils import check_integrity, download_url
from tonic.io import make_structured_array


[docs]class VPR(Dataset): """`Visual Place Recognition <https://zenodo.org/record/4302805>`_ Event-Based Visual Place Recognition With Ensembles of Temporal Windows. Events have (txyp) ordering. .. note:: To be able to read this dataset and its GPS files, you will need the `pynmea2` package installed. :: @article{fischer2020event, title={Event-based visual place recognition with ensembles of temporal windows}, author={Fischer, Tobias and Milford, Michael}, journal={IEEE Robotics and Automation Letters}, volume={5}, number={4}, pages={6924--6931}, year={2020}, publisher={IEEE} } Parameters: save_to (string): Location to save files to on disk. transform (callable, optional): A callable of transforms to apply to the data. target_transform (callable, optional): A callable of transforms to apply to the targets/labels. transforms (callable, optional): A callable of transforms that is applied to both data and labels at the same time. """ base_url = "https://zenodo.org/record/4302805/files/" recordings = [ # recording names and their md5 hash [ ["dvs_vpr_2020-04-21-17-03-03.bag", "04473f623aec6bda3d7eadfecfc1b2ce"], ["20200421_170039-sunset1_concat.nmea", "11f0107a4df845fd315e9134fbee5c1e"], ], [ ["dvs_vpr_2020-04-22-17-24-21.bag", "ca6db080a4054196fe65825bce3db351"], ["20200422_172431-sunset2_concat.nmea", "ff879bf22a9552a6d8500a98cff6c7f9"], ], [ ["dvs_vpr_2020-04-24-15-12-03.bag", "909569732e323ff04c94379a787f2a69"], ["20200424_151015-daytime_concat.nmea", "867fdf43ef393ac7e8de251c1a5cd585"], ], [ ["dvs_vpr_2020-04-27-18-13-29.bag", "e80b6c0434690908d855445792d4de3b"], ["20200427_181204-night_concat.nmea", "441e6673e0dfc8746f76cd646c4aba8d"], ], [ ["dvs_vpr_2020-04-28-09-14-11.bag", "7854ede61c0947adb0f072a041dc3bad"], ["20200428_091154-morning_concat.nmea", "b86af464ceac478711e52ef4271c198c"], ], [ ["dvs_vpr_2020-04-29-06-20-23.bag", "d7ccfeb6539f1e7b077ab4fe6f45193c"], ["20200429_061912-sunrise_concat.nmea", "ec04cf35c10eb5b519b11297adef024b"], ], ] sensor_size = (346, 260, 2) dtype = np.dtype([("t", int), ("x", int), ("y", int), ("p", int)]) ordering = dtype.names def __init__( self, save_to, transform: Optional[Callable] = None, target_transform: Optional[Callable] = None, transforms: Optional[Callable] = None, ): super().__init__( save_to, transform=transform, target_transform=target_transform, transforms=transforms, ) if not self._check_exists(): self.download()
[docs] def __getitem__(self, index): """ Returns: a tuple of (data, target) where data is another tuple of (events, imu, images) and target is gps positional data. """ (bag_filename, _), (gps_filename, _) = self.recordings[index] file_path = os.path.join(self.location_on_system, bag_filename) topics = importRosbag(filePathOrName=file_path, log="ERROR") events = topics["/dvs/events"] events["ts"] -= events["ts"][0] events["ts"] *= 1e6 events = make_structured_array( events["ts"], events["x"], events["y"], events["pol"], dtype=self.dtype ) imu = topics["/dvs/imu"] imu["ts"] = ((imu["ts"] - imu["ts"][0]) * 1e6).astype(int) images = topics["/dvs/image_raw"] incorrect_shape_indices = [ i for i, image in enumerate(images["frames"]) if not (image.shape[0] == 260 and image.shape[1] == 346) ] for ( index ) in ( incorrect_shape_indices ): # fix frame shapes that don't match for some reason shape_diff_x = self.sensor_size[0] - images["frames"][index].shape[1] shape_diff_y = self.sensor_size[1] - images["frames"][index].shape[0] images["frames"][index] = np.pad( images["frames"][index], [(0, shape_diff_y), (0, shape_diff_x), (0, 0)] ) images["frames"] = np.stack(images["frames"]) # errors for some recordings images["ts"] = ((images["ts"] - images["ts"][0]) * 1e6).astype(int) data = events, imu, images targets = self.read_gps_file( os.path.join(self.location_on_system, gps_filename) ) if self.transform is not None: data = self.transform(data) if self.target_transform is not None: targets = self.target_transform(targets) if self.transforms is not None: data, targets = self.transforms(data, targets) return data, targets
[docs] def __len__(self): return len(self.recordings)
[docs] def download(self): for recording in self.recordings: for filename, md5_hash in recording: download_url( self.base_url + filename, self.location_on_system, filename=filename, md5=md5_hash, )
def _check_exists(self): # check if all filenames are correct files_present = list( [ check_integrity(os.path.join(self.location_on_system, filename)) for recording in self.recordings for filename, md5 in recording ] ) return all(files_present) # code taken from https://github.com/Tobias-Fischer/ensemble-event-vpr/blob/master/read_gps.py
[docs] def read_gps_file(self, nmea_file_path): import pynmea2 nmea_file = open(nmea_file_path, encoding="utf-8") latitudes, longitudes, timestamps = [], [], [] first_timestamp = None previous_lat, previous_lon = 0, 0 for line in nmea_file.readlines(): try: msg = pynmea2.parse(line) if first_timestamp is None: first_timestamp = msg.timestamp if msg.sentence_type not in ["GSV", "VTG", "GSA"]: dist_to_prev = np.linalg.norm( np.array([msg.latitude, msg.longitude]) - np.array([previous_lat, previous_lon]) ) if ( msg.latitude != 0 and msg.longitude != 0 and msg.latitude != previous_lat and msg.longitude != previous_lon and dist_to_prev > 0.0001 ): timestamp_diff = ( (msg.timestamp.hour - first_timestamp.hour) * 3600 + (msg.timestamp.minute - first_timestamp.minute) * 60 + (msg.timestamp.second - first_timestamp.second) ) latitudes.append(msg.latitude) longitudes.append(msg.longitude) timestamps.append(timestamp_diff) previous_lat, previous_lon = msg.latitude, msg.longitude except pynmea2.ParseError as e: continue return np.array(np.vstack((latitudes, longitudes, timestamps))).T