Working with larger datasets and multiple data

Some datasets are gigabytes in size and contain multiple data types and targets. In this tutorial we look at how to slice the different data types of a long recording into smaller chunks at the same time steps, and how to cache those slices to disk for efficient loading. As an example we will work with the DAVIS dataset. It is convenient because we can pick and download single recordings of a few hundred MB for the purpose of this tutorial, but the lessons learned scale to larger datasets as well. One recording contains a tuple of data (events, imu, images). Let's start by downloading it. This tutorial also works with the Visual Place Recognition (VPR) dataset, but be aware that it is much larger at ~74 GB.

import tonic
import numpy as np
dataset = tonic.datasets.DAVISDATA(
    save_to="./data", recording=["shapes_6dof", "shapes_rotation"]
)

Not only do we want to slice the events of this recording, we also want to slice the imu and image data at the same time steps. For that we write a custom slicing method that implements the tonic.slicers.Slicer protocol. That means we need to implement at least get_slice_metadata and slice_with_metadata; because Slicer is a protocol, no subclassing is required.

from dataclasses import dataclass
from typing import Any, List, Tuple


@dataclass
class MultiDataSlicer:
    time_window: float
    overlap: float = 0.0
    include_incomplete: bool = False

    # this method receives all the data for one recording/sample.
    # Based on the timestamps in there, we'll work out the boundaries
    # of slices, in this case according to a time window. This method
    # is called once per sample.
    def get_slice_metadata(self, data, targets):
        events, imu, images = data
        min_ts = min(min(events["t"]), min(imu["ts"]), min(images["ts"]))
        max_ts = max(max(events["t"]), max(imu["ts"]), max(images["ts"]))
        stride = self.time_window - self.overlap
        if self.include_incomplete:
            n_slices = int(np.ceil(((max_ts - min_ts) - self.time_window) / stride) + 1)
        else:
            n_slices = int(
                np.floor(((max_ts - min_ts) - self.time_window) / stride) + 1
            )
        window_start_times = np.arange(n_slices) * stride + min_ts
        window_end_times = window_start_times + self.time_window
        return list(zip(window_start_times, window_end_times))

    # Even if we are only interested in a single slice, the data is still stored in a file for the
    # whole recording. To access that slice, we thus need to load the whole recording and then pick
    # the part of it that we are interested in. This method receives the whole data recording and
    # metadata about where a slice starts and stops. This can be timestamps, indices or other things.
    # In this example we just copy the targets for each new slice by passing them along.
    @staticmethod
    def slice_with_metadata(
        data: Tuple[Any], targets: Tuple[Any], metadata: List[Tuple[int, int]]
    ):
        events, imu, images = data  # this is data for a whole recording
        start, stop = metadata[0]
        event_slice = events[np.logical_and(events["t"] >= start, events["t"] < stop)]
        # compute each boolean mask once and reuse it for all fields
        imu_mask = np.logical_and(imu["ts"] >= start, imu["ts"] < stop)
        imu_slice = {
            key: imu[key][imu_mask] for key in ("ts", "rotQ", "angV", "acc", "mag")
        }
        image_mask = np.logical_and(images["ts"] >= start, images["ts"] < stop)
        image_slice = {key: images[key][image_mask] for key in ("ts", "frames")}
        return (event_slice, imu_slice, image_slice), targets
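To build intuition for the windowing arithmetic in get_slice_metadata, here is a standalone sketch that reproduces the boundary computation for a hypothetical 1-second recording (the timestamps and window size below are made up for illustration):

```python
import numpy as np

# hypothetical recording spanning 0..1_000_000 microseconds
min_ts, max_ts = 0, 1_000_000
time_window, overlap = 200_000, 0.0
stride = time_window - overlap

# same formula as get_slice_metadata with include_incomplete=False
n_slices = int(np.floor(((max_ts - min_ts) - time_window) / stride) + 1)
starts = np.arange(n_slices) * stride + min_ts
ends = starts + time_window

print(n_slices)            # 5 non-overlapping 200 ms windows
print(starts[0], ends[0])  # first window covers [0, 200000)
```

With a non-zero overlap the stride shrinks, so consecutive windows share their edges; the floor/ceil switch only decides whether a trailing partial window is kept.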

Now that we specified how our recording should be chunked, we’ll wrap our dataset in a SlicedDataset class, where we pass our MultiDataSlicer object. To showcase a common use case, we’ll also specify a ToFrame transform which will be applied to every slice after loading it.

from tonic import SlicedDataset
import tonic.transforms as transforms

# the time length of one slice, in microseconds
slicing_time_window = 200000
slicer = MultiDataSlicer(time_window=slicing_time_window)

# bin events in a slice to frames
frame_transform = transforms.ToFrame(sensor_size=dataset.sensor_size, time_window=2000)


def custom_transform(data):
    events, imu, images = data
    return (frame_transform(events), imu, images)
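The ToFrame transform accumulates events into dense frames per time window. The core idea can be sketched in plain numpy on a toy structured event array (the event values below are made up; this is a simplified illustration, not Tonic's implementation):

```python
import numpy as np

# toy events with fields (t, x, y, p); timestamps in microseconds
dtype = np.dtype([("t", "<i8"), ("x", "<i8"), ("y", "<i8"), ("p", "<i8")])
events = np.zeros(6, dtype=dtype)
events["t"] = [0, 500, 1500, 1900, 2100, 3900]
events["x"] = [0, 1, 1, 0, 1, 0]
events["p"] = [0, 1, 0, 1, 1, 0]

time_window = 2000  # microseconds per frame
width, height, n_pol = 2, 1, 2

# assign each event to a frame index, then count events per (frame, polarity, y, x)
frame_idx = (events["t"] - events["t"][0]) // time_window
n_frames = int(frame_idx.max()) + 1
frames = np.zeros((n_frames, n_pol, height, width), dtype=np.int64)
np.add.at(frames, (frame_idx, events["p"], events["y"], events["x"]), 1)
print(frames.shape)  # (2, 2, 1, 2)
```

With our settings above, a 200 ms slice binned at 2 ms per frame yields on the order of 100 frames per slice.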

Because it is quite expensive to compute the metadata for a large dataset, we’ll also provide a path where it is stored.

%%time
sliced_dataset = SlicedDataset(
    dataset,
    slicer=slicer,
    transform=custom_transform,
    metadata_path="./metadata/large_datasets",
)
print(f"Cut a dataset of {len(dataset)} recording into {len(sliced_dataset)} slices.")

The next time we instantiate this SlicedDataset, the slicing metadata will simply be loaded from disk, which speeds up instantiation considerably.

%%time
sliced_dataset = SlicedDataset(
    dataset,
    slicer=slicer,
    transform=custom_transform,
    metadata_path="./metadata/large_datasets",
)

In a last step, we are going to make use of caching. This is important to avoid loading the whole recording whenever we want a single slice. When we wrap our sliced dataset in a CachedDataset, the data for one slice is written to disk the first time it is loaded. The next time we want that same slice, we can just load it from disk, where it sits in an efficient format.

from tonic import SlicedDataset, CachedDataset

cached_dataset = CachedDataset(sliced_dataset, cache_path="./cache/large_datasets")

The first time we access a sliced sample, under the hood Tonic loads the whole recording, slices it according to metadata, applies transforms and eventually writes it to a cache directory before returning the results.
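The underlying cache-on-first-read pattern is simple. Here is a minimal sketch of the idea; SimpleDiskCache is a hypothetical name for illustration, and unlike Tonic's CachedDataset it uses plain pickle files rather than compressed HDF5:

```python
import os
import pickle
import tempfile

class SimpleDiskCache:
    """Illustrative cache-on-first-access wrapper (not Tonic's implementation)."""

    def __init__(self, dataset, cache_path):
        self.dataset = dataset
        self.cache_path = cache_path
        os.makedirs(cache_path, exist_ok=True)

    def __getitem__(self, index):
        file_path = os.path.join(self.cache_path, f"{index}.pkl")
        if os.path.exists(file_path):  # cache hit: read the stored slice
            with open(file_path, "rb") as f:
                return pickle.load(f)
        sample = self.dataset[index]   # cache miss: compute, then store for next time
        with open(file_path, "wb") as f:
            pickle.dump(sample, f)
        return sample

    def __len__(self):
        return len(self.dataset)

# usage with a stand-in "dataset" (a plain list of samples)
cache_dir = tempfile.mkdtemp()
cached = SimpleDiskCache([("sample", i) for i in range(3)], cache_dir)
print(cached[1])  # first access computes and writes; later accesses read the file
```

The expensive part (loading and slicing the whole recording, applying transforms) only happens on a cache miss, which is exactly what we time below.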

%%time
# first time access
(event_frames, imu, images), targets = cached_dataset[1]

Let’s verify that the data looks alright:

print(
    f"Event frames have a shape of {event_frames.shape},\nimages for this slice have a shape of {images['frames'].shape} and go from {images['ts'][0]} to {images['ts'][-1]} microseconds\nand imu time stamps range from {imu['ts'][0]} to {imu['ts'][-1]} microseconds."
)

Next time we access this particular sample, it will be faster:

%%time
# second time access
(event_frames, imu, images), targets = cached_dataset[1]

Last but not least, we check the disk footprint of a single slice in the cache. During caching, Tonic uses lightweight lzf compression, which can save a lot of space.

from pathlib import Path

print(
    f"One slice takes {sum(p.stat().st_size for p in Path('./cache/large_datasets').rglob('*'))/1e6:.2f} MB on disk."
)

That's pretty good for some 100 event frames, plus imu and image data.