import os
from collections.abc import Callable
from typing import Any
import numpy as np
from tonic.download_utils import extract_archive
from tonic.io import make_structured_array
[docs]class NERDD:
"""`NeRDD <https://github.com/MagriniGabriele/NeRDD>`_
Neuromorphic Event-based Reactive Driving Dataset.
Parameters:
root (string): Location to save files to on disk.
transforms (callable, optional): A callable of transforms that is applied to both data and
labels at the same time.
"""
filename = "NeRDD.zip"
folder_name = "DATA"
sensor_size = (1280, 720, 2)
dtype = np.dtype([("t", np.int32), ("x", np.int16), ("y", np.int16), ("p", bool)])
ordering = dtype.names
def __init__(
self,
root: str,
transforms: Callable | None = None,
):
self.data = []
self.targets = []
self.location_on_system = root
self.transforms = transforms
if not self._check_extracted():
if not self._check_zip_exists():
raise FileNotFoundError(
f"The dataset archive '{self.filename}' was not found in {self.location_on_system}. "
"Please download it manually and place it in the specified directory."
)
self._extract_archive()
self._load_dataset_structure()
def _check_zip_exists(self) -> bool:
"""Check if the zip file exists in the specified directory."""
return os.path.isfile(os.path.join(self.location_on_system, self.filename))
def _check_extracted(self) -> bool:
"""Check if the dataset has been extracted and contains at least 10 .npz files."""
data_path = os.path.join(self.location_on_system, self.folder_name)
if not os.path.isdir(data_path):
return False
npz_files = []
for root, _, files in os.walk(data_path):
npz_files.extend([f for f in files if f.endswith(".npz")])
return len(npz_files) == 115
def _extract_archive(self):
"""Extract the dataset archive."""
print(f"Extracting {self.filename}...")
archive_path = os.path.join(self.location_on_system, self.filename)
extract_archive(archive_path)
print(f"Extraction complete. Files are now in {self.location_on_system}.")
def _load_dataset_structure(self):
"""Load the dataset files and their corresponding labels."""
data_path = os.path.join(self.location_on_system, self.folder_name)
archives = [
f
for f in os.listdir(data_path)
if os.path.isdir(os.path.join(data_path, f))
]
for archive in archives:
archive_path = os.path.join(data_path, archive)
scenes = [
f
for f in os.listdir(archive_path)
if os.path.isdir(os.path.join(archive_path, f))
]
for scene in scenes:
scene_path = os.path.join(archive_path, scene)
event_file = os.path.join(scene_path, "Event", "output_events.npz")
label_file = os.path.join(scene_path, "ev_rgb_coordinates.txt")
if os.path.exists(event_file) and os.path.exists(label_file):
self.data.append(
(event_file, label_file, int(archive[8:]), int(scene))
)
else:
print(
f"Skipping scene {scene} in archive {archive}: missing files."
)
[docs] def __getitem__(self, index: int) -> tuple[Any, Any]:
"""
Returns:
(events, target) where target is index of the target class.
"""
event_file, label_file, archive, scene = self.data[index]
events = np.load(event_file)["data"]
events = make_structured_array(
events[:, 3], # t
events[:, 0], # x
events[:, 1], # y
events[:, 2], # p
dtype=self.dtype,
)
with open(label_file) as f:
bboxes = []
for line in f.readlines():
timestamp, bbox_str = line.strip().split(":")
bbox_coords = list(map(float, bbox_str.split(",")))
bboxes.append([float(timestamp)] + bbox_coords)
bboxes = np.array(bboxes, dtype=np.float32)
targets = {
"archive": archive,
"scene": scene,
"bboxes": bboxes,
}
# Apply transforms if provided
if self.transforms is not None:
events, targets = self.transforms(events, targets)
return events, targets
[docs] def __len__(self):
return len(self.data)