# Source code for tonic.functional.to_frame

import numpy as np

from tonic.slicers import (
    slice_events_by_count,
    slice_events_by_event_bins,
    slice_events_by_time,
    slice_events_by_time_bins,
)


def to_frame_numpy(
    events,
    sensor_size,
    time_window=None,
    event_count=None,
    n_time_bins=None,
    n_event_bins=None,
    overlap=0.0,
    include_incomplete=False,
):
    """Accumulate events to frames by slicing along constant time (time_window),
    constant number of events (event_count) or constant number of frames
    (n_time_bins / n_event_bins).

    Exactly one of time_window, event_count, n_time_bins or n_event_bins must
    be set.

    Parameters:
        events: structured ndarray with at least the fields "x", "t" and "p"
            (optionally "y" for 2D sensors).
        sensor_size: size of the sensor that was used [W,H,P]. If falsy, the
            size is inferred from the maxima of the event coordinates.
        time_window (None): window length in us.
        event_count (None): number of events per frame.
        n_time_bins (None): fixed number of frames, sliced along time axis.
        n_event_bins (None): fixed number of frames, sliced along number of
            events in the recording.
        overlap (0.): overlap between frames defined either in time in us,
            number of events or number of bins, depending on the slicing method.
        include_incomplete (False): if True, includes overhang slice when
            time_window or event_count is specified. Not valid for bin_count
            methods.

    Returns:
        numpy array with dimensions (TxPxHxW), or (TxPxW) when the events have
        no "y" field.

    Raises:
        ValueError: if not exactly one slicing parameter is set, or if a
            single-polarity sensor_size is given but events carry two polarities.
    """
    # NOTE: the original `assert "x" and "t" and "p" in events.dtype.names`
    # only verified "p" — non-empty string literals are truthy, so the "x"
    # and "t" checks were no-ops. Check all three fields explicitly.
    assert all(field in events.dtype.names for field in ("x", "t", "p"))

    # Exactly one slicing strategy must be selected.
    if (
        sum(
            param is not None
            for param in [time_window, event_count, n_time_bins, n_event_bins]
        )
        != 1
    ):
        raise ValueError(
            "Please assign a value to exactly one of the parameters time_window,"
            " event_count, n_time_bins or n_event_bins."
        )

    # Infer sensor geometry from the data when no explicit size is given.
    if not sensor_size:
        sensor_size_x = int(events["x"].max() + 1)
        sensor_size_p = len(np.unique(events["p"]))
        if "y" in events.dtype.names:
            sensor_size_y = int(events["y"].max() + 1)
            sensor_size = (sensor_size_x, sensor_size_y, sensor_size_p)
        else:
            sensor_size = (sensor_size_x, 1, sensor_size_p)

    # Single-polarity sensor: all events must map to channel 0.
    if sensor_size[2] == 1:
        if np.unique(events["p"]).size > 1:
            raise ValueError(
                "Single polarity sensor, but events contain both polarities."
            )
        # NOTE(review): this mutates the caller's events array in place —
        # TODO confirm this side effect is intended.
        events["p"] = 0

    # Compare against None (not truthiness), consistent with the exactly-one
    # check above: a legal value of 0 would otherwise fall through every
    # branch and leave event_slices unbound.
    if time_window is not None:
        event_slices = slice_events_by_time(
            events, time_window, overlap=overlap, include_incomplete=include_incomplete
        )
    elif event_count is not None:
        event_slices = slice_events_by_count(
            events, event_count, overlap=overlap, include_incomplete=include_incomplete
        )
    elif n_time_bins is not None:
        event_slices = slice_events_by_time_bins(events, n_time_bins, overlap=overlap)
    else:
        event_slices = slice_events_by_event_bins(events, n_event_bins, overlap=overlap)

    if "y" in events.dtype.names:
        # 2D sensor: frames are (T, P, H, W); sensor_size[::-1] turns
        # (W, H, P) into (P, H, W). np.add.at accumulates repeated indices.
        frames = np.zeros((len(event_slices), *sensor_size[::-1]), dtype=np.int16)
        for i, event_slice in enumerate(event_slices):
            np.add.at(
                frames,
                (i, event_slice["p"].astype(int), event_slice["y"], event_slice["x"]),
                1,
            )
    else:
        # 1D sensor: frames are (T, P, W).
        frames = np.zeros(
            (len(event_slices), sensor_size[2], sensor_size[0]), dtype=np.int16
        )
        for i, event_slice in enumerate(event_slices):
            np.add.at(frames, (i, event_slice["p"].astype(int), event_slice["x"]), 1)
    return frames