Source code for tonic.functional.to_averaged_timesurface

#! /usr/bin/env python3

from math import ceil

import numpy as np


def _get_ts(event, locmem, time_window, tau, surface_size, decay):
    rho = surface_size // 2
    t_i, t_j = event["t"].astype(np.float32), locmem["t"].astype(np.float32)
    # Starting time stamp, calculated subtracting the time window from the event timestamp.
    t_start = max(0, t_i - time_window)
    # Relative coordinates in the time surfaces.
    ts_x, ts_y = locmem["x"] - event["x"], locmem["y"] - event["y"]
    # Including only the events in the neighbourhood and in the time window.
    mask = np.asarray(
        (np.abs(ts_x) <= rho) & (np.abs(ts_y) <= rho) & (t_j >= t_start)
    ).nonzero()[0]
    # For each event in the local memory that belongs to the spatial and temporal windows and for the current event, a time surface is generated.
    locmem_ts = np.zeros((1 + len(mask), surface_size, surface_size), dtype=np.float32)
    if len(mask) > 0:
        locmem_ts[np.arange(len(mask)), ts_y[mask] + rho, ts_x[mask] + rho] = (
            np.exp(-(t_i - t_j[mask]) / tau)
            if decay == "exp"
            else -(t_i - t_j[mask]) / (3 * tau) + 1
        )
    # Adding the current event time surface.
    locmem_ts[-1, rho, rho] += 1
    # The accumulated time surfaces are returned.
    return np.sum(locmem_ts, axis=0)


def _map_to_locmems(events, sensor_size, cell_size):
    w, h, npols = sensor_size
    wgrid, hgrid = ceil(w / cell_size), ceil(h / cell_size)
    ncells = int(wgrid * hgrid)
    px_to_cell = lambda y, x: int((y // cell_size) * wgrid + x // cell_size)
    # Mapping events to local memories.
    locmems = [[[] for p in range(npols)] for c in range(ncells)]
    for event in events:
        locmems[px_to_cell(int(event["y"]), int(event["x"]))][
            max(0, int(event["p"]))
        ].append(event)
    # Converting the lists in structured NumPy arrays.
    locmems = [
        [
            np.stack(locmems[c][p]) if locmems[c][p] else np.empty((0,))
            for p in range(npols)
        ]
        for c in range(ncells)
    ]
    return locmems


[docs]def to_averaged_timesurface_numpy( events, sensor_size, cell_size, surface_size, time_window, tau, decay, ): """Representation that creates averaged timesurfaces for each event for one recording. Taken from the paper Sironi et al. 2018, HATS: Histograms of averaged time surfaces for robust event-based object classification https://openaccess.thecvf.com/content_cvpr_2018/papers/Sironi_HATS_Histograms_of_CVPR_2018_paper.pdf Parameters: cell_size (int): size of each square in the grid surface_size (int): has to be odd time_window (int): how far back to look for past events for the time averaging. Expressed in microseconds. tau (int): time constant to decay events around occuring event with. Expressed in microseconds. decay (str): can be either 'lin' or 'exp', corresponding to linear or exponential decay. Returns: array of histograms (numpy.Array with shape (n_cells, n_pols, surface_size, surface_size)) """ assert surface_size <= cell_size assert surface_size % 2 != 0 assert "x" and "y" and "t" and "p" in events.dtype.names assert decay == "lin" or decay == "exp" # Organizing the events in cells which are, then, saved as NumPy arrays. locmems = _map_to_locmems(events, sensor_size, cell_size) w, h, npols = sensor_size ncells = int(ceil(w / cell_size) * ceil(h / cell_size)) hist = np.zeros((ncells, npols, surface_size, surface_size), dtype=np.float32) # Now we have fun: we cycle on the local memories and generate the histogram corresponding to each of them. for c in range(ncells): for p in range(npols): hist[c, p, :, :] = np.sum( np.stack( [ _get_ts( locmems[c][p][i], locmems[c][p][:i], time_window, tau, surface_size, decay, ) for i in range(len(locmems[c][p])) ] if locmems[c][p].size > 0 else np.zeros((surface_size, surface_size), dtype=np.float32) ), axis=0, ) / max(1, locmems[c][p].size) return hist