Skip to content

Höfling et al. 2024 Dataset

Mouse retinal ganglion cell responses to natural stimuli, originally published in Höfling et al. (2024): A chromatic feature detector in the retina signals visual context changes, eLife.

Dataset: gin.g-node.org/eulerlab/rgc-natstim

Dataloaders

get_dims_for_loader_dict

get_dims_for_loader_dict(
    dataloaders: dict[str, dict[str, Any]],
) -> dict[str, dict[str, tuple[int, ...]] | tuple]

Borrowed from nnfabrik/utility/nn_helpers.py.

Given a dictionary of DataLoaders, returns a dictionary with same keys as the input and shape information (as returned by get_io_dims) on each keyed DataLoader.

PARAMETER DESCRIPTION
dataloaders

Dictionary of dataloaders.

TYPE: dict of DataLoader

RETURNS DESCRIPTION
dict

A dict containing the result of calling get_io_dims for each entry of the input dict

TYPE: dict[str, dict[str, tuple[int, ...]] | tuple]

Source code in openretina/data_io/hoefling_2024/dataloaders.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def get_dims_for_loader_dict(dataloaders: dict[str, dict[str, Any]]) -> dict[str, dict[str, tuple[int, ...]] | tuple]:
    """
    Borrowed from nnfabrik/utility/nn_helpers.py.

    Given a dictionary of DataLoaders, returns a dictionary with same keys as the
    input and shape information (as returned by `get_io_dims`) on each keyed DataLoader.

    Args:
        dataloaders (dict of DataLoader): Dictionary of dataloaders.

    Returns:
        dict: A dict containing the result of calling `get_io_dims` for each entry of the input dict
    """
    return {k: get_io_dims(v) for k, v in dataloaders.items()}

get_io_dims

get_io_dims(
    data_loader,
) -> dict[str, tuple[int, ...]] | tuple

Borrowed from nnfabrik/utility/nn_helpers.py.

Returns the shape of the dataset for each item within an entry returned by the data_loader The DataLoader object must return either a namedtuple, dictionary or a plain tuple. If data_loader entry is a namedtuple or a dictionary, a dictionary with the same keys as the namedtuple/dict item is returned, where values are the shape of the entry. Otherwise, a tuple of shape information is returned.

Note that the first dimension is always the batch dim with size depending on the data_loader configuration.

PARAMETER DESCRIPTION
data_loader

is expected to be a pytorch Dataloader object returning either a namedtuple, dictionary, or a plain tuple.

TYPE: DataLoader

Returns: dict or tuple: If data_loader element is either namedtuple or dictionary, a ditionary of shape information, keyed for each entry of dataset is returned. Otherwise, a tuple of shape information is returned. The first dimension is always the batch dim with size depending on the data_loader configuration.

Source code in openretina/data_io/hoefling_2024/dataloaders.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def get_io_dims(data_loader) -> dict[str, tuple[int, ...]] | tuple:
    """
    Borrowed from nnfabrik/utility/nn_helpers.py.

    Returns the shape of the dataset for each item within an entry returned by the `data_loader`
    The DataLoader object must return either a namedtuple, dictionary or a plain tuple.
    If `data_loader` entry is a namedtuple or a dictionary, a dictionary with the same keys as the
    namedtuple/dict item is returned, where values are the shape of the entry. Otherwise, a tuple of
    shape information is returned.

    Note that the first dimension is always the batch dim with size depending on the data_loader configuration.

    Args:
        data_loader (torch.DataLoader): is expected to be a pytorch Dataloader object returning
            either a namedtuple, dictionary, or a plain tuple.
    Returns:
        dict or tuple: If data_loader element is either namedtuple or dictionary, a ditionary
            of shape information, keyed for each entry of dataset is returned. Otherwise, a tuple
            of shape information is returned. The first dimension is always the batch dim
            with size depending on the data_loader configuration.
    """
    items = next(iter(data_loader))
    if hasattr(items, "_asdict"):  # if it's a named tuple
        items = items._asdict()

    if hasattr(items, "items"):  # if dict like
        return {k: v.shape for k, v in items.items() if isinstance(v, (torch.Tensor, np.ndarray))}
    else:
        return tuple(v.shape for v in items)

filter_nan_collate

filter_nan_collate(batch)

Filters out batches containing NaN values and then calls the default_collate function. Can happen for inferred spikes exported with CASCADE. To be used as a collate_fn in a DataLoader.

PARAMETER DESCRIPTION
batch

A list of tuples representing the batch.

TYPE: list

RETURNS DESCRIPTION

tuple of torch.Tensor: The collated batch after filtering out NaN values.

Source code in openretina/data_io/hoefling_2024/dataloaders.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def filter_nan_collate(batch):
    """
    Filters out batches containing NaN values and then calls the default_collate function.
    Can happen for inferred spikes exported with CASCADE.
    To be used as a collate_fn in a DataLoader.

    Args:
        batch (list): A list of tuples representing the batch.

    Returns:
        tuple of torch.Tensor: The collated batch after filtering out NaN values.

    """
    batch = list(filter(lambda x: not np.isnan(x[1]).any(), batch))
    return default_collate(batch)

filter_different_size

filter_different_size(batch)

Filters out batches that do not have the same shape as most of the other batches.

Source code in openretina/data_io/hoefling_2024/dataloaders.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def filter_different_size(batch):
    """
    Filters out batches that do not have the same shape as most of the other batches.
    """
    # Get the shapes of all the elements in the batch
    shapes = [element[1].shape for element in batch]

    # Find the most common shape in the batch
    most_common_shape = max(set(shapes), key=shapes.count)

    # Filter out elements that do not have the most common shape
    filtered_batch = [element for element in batch if element[1].shape == most_common_shape]

    # If the filtered batch is empty, return None
    return default_collate(filtered_batch) if filtered_batch else None

extract_data_info_from_dataloaders

extract_data_info_from_dataloaders(
    dataloaders: dict[str, dict[str, Any]] | dict[str, Any],
) -> dict[str, dict[str, Any]]

Extracts the data_info dictionary from the provided dataloaders. Args: dataloaders: A dictionary of dataloaders for different sessions. Returns: data_info: A dictionary containing input_dimensions, input_channels, and output_dimension for each session, nested with these attributes as the first level keys and sessions as the second level.

Source code in openretina/data_io/hoefling_2024/dataloaders.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def extract_data_info_from_dataloaders(
    dataloaders: dict[str, dict[str, Any]] | dict[str, Any],
) -> dict[str, dict[str, Any]]:
    """
    Extracts the data_info dictionary from the provided dataloaders.
    Args:
        dataloaders: A dictionary of dataloaders for different sessions.
    Returns:
        data_info: A dictionary containing input_dimensions, input_channels, and output_dimension for each session,
                   nested with these attributes as the first level keys and sessions as the second level.
    """
    # Ensure train loader is used if available and not provided directly
    dataloaders = dataloaders.get("train", dataloaders)

    # Obtain the named tuple fields from the first entry of the first dataloader in the dictionary
    in_name, out_name, *_ = next(iter(list(dataloaders.values())[0]))._fields  # type: ignore

    # Get the input and output dimensions for each session
    session_shape_dict = get_dims_for_loader_dict(dataloaders)

    # Initialize the new structure
    data_info: dict[str, dict[str, Any]] = {k: {} for k in session_shape_dict.keys()}

    # Populate the new structure
    for session_key, shapes in session_shape_dict.items():
        data_info[session_key]["input_dimensions"] = shapes[in_name]
        data_info[session_key]["input_channels"] = shapes[in_name][1]
        data_info[session_key]["output_dimension"] = shapes[out_name][-1]
        data_info[session_key]["mean_response"] = np.array(dataloaders[session_key].dataset.mean_response)  # type: ignore
        data_info[session_key]["roi_coords"] = np.array(dataloaders[session_key].dataset.roi_coords)  # type: ignore

    return data_info

Stimuli

movies_from_pickle

movies_from_pickle(
    file_path: str | PathLike,
) -> MoviesTrainTestSplit

Load movie data from a pickle file and return it as a MoviesTrainTestSplit object.

Source code in openretina/data_io/hoefling_2024/stimuli.py
11
12
13
14
15
def movies_from_pickle(file_path: str | os.PathLike) -> MoviesTrainTestSplit:
    """
    Load movie data from a pickle file and return it as a MoviesTrainTestSplit object.
    """
    return MoviesTrainTestSplit.from_pickle(file_path)

get_all_movie_combinations

get_all_movie_combinations(
    movie_train,
    movie_test,
    random_sequences: ndarray,
    validation_clip_indices: list[int],
    num_clips: int = NUM_CLIPS,
    clip_length: int = CLIP_LENGTH,
)

Generates combinations of movie data for 'left' and 'right' perspectives and prepares training, validation, and test datasets. It reorders the training movies based on random sequences and flips the movies for the 'left' perspective.

Parameters: - movie_train: Tensor representing the training movie data. - movie_test: Tensor representing the test movie data. - random_sequences: Numpy array of random sequences for reordering training movies. - val_clip_idx: list of indices for validation clips. Needs to be between 0 and the number of clips.

  • movies: Dictionary with processed movies for 'left' and 'right' perspectives, each containing 'train', 'validation', and 'test' datasets.
Source code in openretina/data_io/hoefling_2024/stimuli.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def get_all_movie_combinations(
    movie_train,
    movie_test,
    random_sequences: np.ndarray,
    validation_clip_indices: list[int],
    num_clips: int = NUM_CLIPS,
    clip_length: int = CLIP_LENGTH,
):
    """
    Generates combinations of movie data for 'left' and 'right' perspectives and
    prepares training, validation, and test datasets. It reorders the training
    movies based on random sequences and flips the movies for the 'left' perspective.

    Parameters:
    - movie_train: Tensor representing the training movie data.
    - movie_test: Tensor representing the test movie data.
    - random_sequences: Numpy array of random sequences for reordering training movies.
    - val_clip_idx: list of indices for validation clips. Needs to be between 0 and the number of clips.

    Returns:
    - movies: Dictionary with processed movies for 'left' and 'right' perspectives, each
      containing 'train', 'validation', and 'test' datasets.
    """

    # Generate train, validation, and test datasets
    movie_train_subset, movie_val, movie_test_dict = generate_movie_splits(
        movie_train, {"test": movie_test}, validation_clip_indices, num_clips, clip_length
    )

    # Assemble datasets into the final movies structure using the random sequences
    movies = apply_random_sequences(
        torch.tensor(movie_train, dtype=torch.float),
        movie_train_subset,
        movie_val,
        movie_test_dict["test"],
        random_sequences,
        validation_clip_indices,
        clip_length,
    )

    return movies

gen_start_indices

gen_start_indices(
    random_sequences: ndarray,
    val_clip_idx: list[int],
    clip_length: int,
    chunk_size: int,
    num_clips: int,
)

Generate a list of indices for training chunks while excluding validation clips.

PARAMETER DESCRIPTION
random_sequences

int np array; 108 x 20, giving the ordering of the 108 training clips for the 20 different sequences.

TYPE: ndarray

val_clip_idx

List of integers indicating the clips to be used for validation.

TYPE: list[int]

clip_length

Clip length in frames (5s*30frames/s = 150 frames).

TYPE: int

chunk_size

Temporal chunk size per sample in frames (50).

TYPE: int

num_clips

Total number of training clips (108).

TYPE: int

RETURNS DESCRIPTION
dict

With keys train, validation, and test, and index list as values.

Source code in openretina/data_io/hoefling_2024/stimuli.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def gen_start_indices(
    random_sequences: np.ndarray, val_clip_idx: list[int], clip_length: int, chunk_size: int, num_clips: int
):
    """Generate a list of indices for training chunks while excluding validation clips.

    Args:
        random_sequences: int np array; 108 x 20, giving the ordering of the
            108 training clips for the 20 different sequences.
        val_clip_idx: List of integers indicating the clips to be used for validation.
        clip_length: Clip length in frames (5s*30frames/s = 150 frames).
        chunk_size: Temporal chunk size per sample in frames (50).
        num_clips: Total number of training clips (108).

    Returns:
        dict: With keys train, validation, and test, and index list as values.
    """
    # Validation clip indices are consecutive, because the validation clip and
    # stimuli are already isolated in other functions.
    val_start_idx = list(np.linspace(0, clip_length * (len(val_clip_idx) - 1), len(val_clip_idx), dtype=int))

    start_idx_dict = {"train": {}, "validation": val_start_idx, "test": [0]}

    num_train_clips = num_clips - len(val_clip_idx)

    if random_sequences.shape[1] == 1:
        start_idx_dict["train"] = list(np.arange(0, clip_length * (num_train_clips - 1), chunk_size, dtype=int))
    else:
        for sequence_index in range(random_sequences.shape[1]):
            start_idx_dict["train"][sequence_index] = list(  # type: ignore
                np.arange(0, clip_length * (num_train_clips - 1), chunk_size, dtype=int)
            )

    return start_idx_dict

Responses

NeuronDataSplitHoefling

NeuronDataSplitHoefling(
    neural_responses: ResponsesTrainTestSplit,
    val_clip_idx: Optional[list[int]],
    num_clips: Optional[int],
    clip_length: Optional[int],
    roi_mask: Optional[Float[ndarray, "64 64"]] = None,
    roi_ids: Optional[Float[ndarray, " n_neurons"]] = None,
    scan_sequence_idx: Optional[int] = None,
    random_sequences: Optional[
        Float[ndarray, "n_clips n_sequences"]
    ] = None,
    eye: Optional[Literal["left", "right"]] = None,
    group_assignment: Optional[
        Float[ndarray, " n_neurons"]
    ] = None,
    key: Optional[dict] = None,
    use_base_sequence: Optional[bool] = False,
    **kwargs,
)

Initialize the NeuronData object. Boilerplate class to store neuron data train/test/validation splits before feeding into a dataloader. Customized for compatibility with the data format in Hoefling et al., 2024.

PARAMETER DESCRIPTION
neural_responses

The responses of neurons.

TYPE: ResponsesTrainTestSplit

val_clip_idx

The indices of validation clips.

TYPE: List[int]

num_clips

The number of clips.

TYPE: int

clip_length

The length of each clip.

TYPE: int

roi_mask

region of interest mask of the neurons

TYPE: Optional[Float[ndarray, '64 64']] DEFAULT: None

roi_ids

The IDs of regions of interest (ROIs).

TYPE: Float[ndarray, n_neurons] DEFAULT: None

scan_sequence_idx

The index of the scan sequence.

TYPE: int DEFAULT: None

random_sequences

The random sequences of clips.

TYPE: Float[ndarray, 'n_clips n_sequences'] DEFAULT: None

eye

The eye from which the neuron data is recorded.

TYPE: str DEFAULT: None

group_assignment

The group assignment of neurons.

TYPE: Float[ndarray, n_neurons] DEFAULT: None

key

The key information for the neuron data, includes date, exp_num, experimenter, field_id, stim_id.

TYPE: dict DEFAULT: None

use_base_sequence

Whether to re-order all training responses to use the same "base" sequence.

TYPE: bool DEFAULT: False

Source code in openretina/data_io/hoefling_2024/responses.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def __init__(
    self,
    neural_responses: ResponsesTrainTestSplit,
    val_clip_idx: Optional[list[int]],
    num_clips: Optional[int],
    clip_length: Optional[int],
    roi_mask: Optional[Float[np.ndarray, "64 64"]] = None,
    roi_ids: Optional[Float[np.ndarray, " n_neurons"]] = None,
    scan_sequence_idx: Optional[int] = None,
    random_sequences: Optional[Float[np.ndarray, "n_clips n_sequences"]] = None,
    eye: Optional[Literal["left", "right"]] = None,
    group_assignment: Optional[Float[np.ndarray, " n_neurons"]] = None,
    key: Optional[dict] = None,
    use_base_sequence: Optional[bool] = False,
    **kwargs,
):
    """
    Initialize the NeuronData object.
    Boilerplate class to store neuron data train/test/validation splits before feeding into a dataloader.
    Customized for compatibility with the data format in Hoefling et al., 2024.

    Args:
        neural_responses (ResponsesTrainTestSplit): The responses of neurons.
        val_clip_idx (List[int]): The indices of validation clips.
        num_clips (int): The number of clips.
        clip_length (int): The length of each clip.
        roi_mask: region of interest mask of the neurons
        roi_ids (Float[np.ndarray, "n_neurons"]): The IDs of regions of interest (ROIs).
        scan_sequence_idx (int): The index of the scan sequence.
        random_sequences (Float[np.ndarray, "n_clips n_sequences"]): The random sequences of clips.
        eye (str): The eye from which the neuron data is recorded.
        group_assignment (Float[np.ndarray, "n_neurons"]): The group assignment of neurons.
        key (dict): The key information for the neuron data,
                    includes date, exp_num, experimenter, field_id, stim_id.
        use_base_sequence (bool): Whether to re-order all training responses to use the same "base" sequence.
    """
    self.neural_responses = neural_responses

    self.num_neurons = self.neural_responses.n_neurons

    self.eye = eye if eye is not None else "right"
    self.group_assignment = group_assignment
    self.key = key
    self.roi_ids = roi_ids
    self.roi_coords = (
        np.array(self.transform_roi_mask(roi_mask), dtype=np.float32) if roi_mask is not None else None
    )
    self.scan_sequence_idx = scan_sequence_idx
    self.stim_id = self.neural_responses.stim_id
    self.clip_length = clip_length
    self.num_clips = num_clips
    self.random_sequences = random_sequences if random_sequences is not None else np.array([])
    self.use_base_sequence = use_base_sequence
    self.val_clip_idx = val_clip_idx

    self.responses_train_and_val, self.responses_test, self.test_responses_by_trial = (
        self.neural_responses.train.T,
        self.neural_responses.test_response.T,
        self.neural_responses.test_by_trial.transpose(0, 2, 1)
        if self.neural_responses.test_by_trial is not None
        else None,
    )
    self.responses_train, self.responses_val = self.compute_validation_responses()

roi2readout

roi2readout(
    single_roi_mask,
    roi_mask_pixelsize=2,
    readout_mask_pixelsize=50,
    x_offset=2.75,
    y_offset=2.75,
)

Maps a roi mask of a single roi from recording coordinates to model readout coordinates.

PARAMETER DESCRIPTION
single_roi_mask

2d array with nonzero values indicating the pixels of the current roi.

roi_mask_pixelsize

Size of a pixel in the roi mask in um.

DEFAULT: 2

readout_mask_pixelsize

Size of a pixel in the readout mask in um.

DEFAULT: 50

x_offset

X offset indicating the start of the recording field in readout mask.

DEFAULT: 2.75

y_offset

Y offset indicating the start of the recording field in readout mask.

DEFAULT: 2.75

Source code in openretina/data_io/hoefling_2024/responses.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def roi2readout(
    self,
    single_roi_mask,
    roi_mask_pixelsize=2,
    readout_mask_pixelsize=50,
    x_offset=2.75,
    y_offset=2.75,
):
    """Maps a roi mask of a single roi from recording coordinates to model readout coordinates.

    Args:
        single_roi_mask: 2d array with nonzero values indicating the pixels of the current roi.
        roi_mask_pixelsize: Size of a pixel in the roi mask in um.
        readout_mask_pixelsize: Size of a pixel in the readout mask in um.
        x_offset: X offset indicating the start of the recording field in readout mask.
        y_offset: Y offset indicating the start of the recording field in readout mask.
    """
    pixel_factor = readout_mask_pixelsize / roi_mask_pixelsize
    y, x = np.nonzero(single_roi_mask)
    y_trans, x_trans = y / pixel_factor, x / pixel_factor
    y_trans += y_offset
    x_trans += x_offset
    x_trans = x_trans.mean()
    y_trans = y_trans.mean()
    coords = np.asarray(
        [
            self.map_to_range(max=8, val=y_trans),
            self.map_to_range(max=8, val=x_trans),
        ],
        dtype=np.float32,
    )
    return coords

upsample_traces

upsample_traces(
    triggertimes,
    traces,
    tracestimes,
    stim_id: int,
    target_fr: int = 30,
    norm_by_std: bool = True,
) -> ndarray

Upsamples the traces based on the stimulus type.

PARAMETER DESCRIPTION
triggertimes

List of trigger times.

TYPE: list

traces

List of traces.

TYPE: list

tracestimes

List of trace times.

TYPE: list

stim_id

Stimulus ID.

TYPE: int

target_fr

Target frame rate for upsampling. Default is 30.

TYPE: int DEFAULT: 30

RETURNS DESCRIPTION
ndarray

numpy.ndarray: Upsampled responses.

RAISES DESCRIPTION
NotImplementedError

If the stimulus ID is not implemented.

Source code in openretina/data_io/hoefling_2024/responses.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def upsample_traces(
    triggertimes,
    traces,
    tracestimes,
    stim_id: int,
    target_fr: int = 30,
    norm_by_std: bool = True,
) -> np.ndarray:
    """
    Upsamples the traces based on the stimulus type.

    Args:
        triggertimes (list): List of trigger times.
        traces (list): List of traces.
        tracestimes (list): List of trace times.
        stim_id (int): Stimulus ID.
        target_fr (int, optional): Target frame rate for upsampling. Default is 30.

    Returns:
        numpy.ndarray: Upsampled responses.

    Raises:
        NotImplementedError: If the stimulus ID is not implemented.
    """
    if stim_id == 1:
        # Chirp: each chirp has two triggers, one at the start and one 5s later, after a 2s OFF and 3s full field ON.
        # We need only the first trigger of each chirp for the upsampling.
        # 32.98999999 is the total chirp duration in seconds. Should be 33 but there is a small discrepancy
        chirp_starts = triggertimes[::2]
        upsampled_triggertimes = _upsample_triggertimes(32.98999999, 33, chirp_starts, target_fr)
    elif stim_id == 2:
        # Moving bar: each bar has one trigger at the start of the bar stim. Bar duration is 4s.
        # It is a bit more because each trigger has a duration of 3 frames at 60Hz, so around 50 ms.
        upsampled_triggertimes = _upsample_triggertimes(4.054001, 4.1, triggertimes, target_fr)
    elif stim_id == 5:
        # Movie stimulus
        # 4.966666 is the time between triggers in the movie stimulus.
        # It is not exactly 5s because it is not a perfect world :)
        upsampled_triggertimes = _upsample_triggertimes(4.9666667, 5, triggertimes, target_fr)
    else:
        raise NotImplementedError(f"Stimulus ID {stim_id} not implemented")

    upsampled_responses = np.zeros((traces.shape[0], len(upsampled_triggertimes)))
    for i in range(traces.shape[0]):
        upsampled_responses[i] = np.interp(upsampled_triggertimes, tracestimes[i].ravel(), traces[i].ravel())

    if norm_by_std:
        upsampled_responses = upsampled_responses / np.std(
            upsampled_responses, axis=1, keepdims=True
        )  # normalize response std

    return upsampled_responses

upsample_all_responses

upsample_all_responses(
    data_dict: dict,
    response_type: Literal[
        "natural", "chirp", "mb"
    ] = "natural",
    trace_type: Literal[
        "spikes", "raw", "preprocessed", "detrended"
    ] = "spikes",
    d_qi: Optional[float] = None,
    chirp_qi: Optional[float] = None,
    qi_logic: Literal["and", "or"] = "or",
    scale_traces: float = 1.0,
    norm_by_std: bool = True,
) -> dict

Converts inferred spikes into final responses by upsampling the traces of all sessions of a given response_type. This is to match the framerate used in the stimulus presentation.

PARAMETER DESCRIPTION
data_dict

A dictionary containing the data.

TYPE: dict

response_type

The type of response. Defaults to "natural".

TYPE: str DEFAULT: 'natural'

RETURNS DESCRIPTION
dict

The updated data dictionary with final responses.

TYPE: dict

Raises: NotImplementedError: If the conversion is not yet implemented for the given response type.

Source code in openretina/data_io/hoefling_2024/responses.py
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
def upsample_all_responses(
    data_dict: dict,
    response_type: Literal["natural", "chirp", "mb"] = "natural",
    trace_type: Literal["spikes", "raw", "preprocessed", "detrended"] = "spikes",
    d_qi: Optional[float] = None,
    chirp_qi: Optional[float] = None,
    qi_logic: Literal["and", "or"] = "or",
    scale_traces: float = 1.0,
    norm_by_std: bool = True,
) -> dict:
    """
    Converts inferred spikes into final responses by upsampling the traces of all sessions of a given response_type.
    This is to match the framerate used in the stimulus presentation.

    Args:
        data_dict (dict): A dictionary containing the data.
        response_type (str, optional): The type of response. Defaults to "natural".

    Returns:
        dict: The updated data dictionary with final responses.
    Raises:
        NotImplementedError: If the conversion is not yet implemented for the given response type.
    """

    new_data_dict = deepcopy(data_dict)

    stim_id = STIMULI_IDS.get(response_type, None)
    if stim_id is None:
        raise NotImplementedError(f"Conversion not yet implemented for response type {response_type}")

    for field in tqdm(
        new_data_dict.keys(),
        desc=f"Upsampling {response_type} {trace_type} traces to get final responses.",
    ):
        if trace_type == "detrended":
            raw_traces = new_data_dict[field][f"{response_type}_raw_traces"]
            smoothed_traces = new_data_dict[field][f"{response_type}_smoothed_traces"]
            traces = raw_traces - smoothed_traces
        elif trace_type == "spikes":
            try:
                traces = new_data_dict[field][f"{response_type}_inferred_spikes"]
            except KeyError:
                # For new data format
                traces = new_data_dict[field][f"{response_type}_spikes"]
        else:
            traces = new_data_dict[field][f"{response_type}_{trace_type}_traces"]

        triggertimes = new_data_dict[field][f"{response_type}_trigger_times"][0]

        try:
            tracestimes = new_data_dict[field][f"{response_type}_traces_times"]
        except KeyError:
            # New djimaing exports have a different save format for trace_times
            traces_t0 = np.tile(
                np.atleast_1d(new_data_dict[field][f"{response_type}_traces_t0"].squeeze())[:, None],
                (1, traces.shape[1]),
            )
            traces_dt = np.tile(
                np.atleast_1d(new_data_dict[field][f"{response_type}_traces_dt"].squeeze())[:, None],
                (1, traces.shape[1]),
            )
            tracestimes = np.tile(np.arange(traces.shape[1]), reps=(traces.shape[0], 1)) * traces_dt + traces_t0

        upsampled_traces = (
            upsample_traces(
                triggertimes=triggertimes,
                traces=traces,
                tracestimes=tracestimes,
                stim_id=stim_id,
                norm_by_std=norm_by_std,
            )
            * scale_traces
        )

        new_data_dict[field][f"{response_type}_responses_final"] = upsampled_traces

        if "responses_final" in new_data_dict[field]:
            warnings.warn(
                f"You seem to already have computed `responses_final` for a "
                f"stim_id of {new_data_dict[field]['stim_id']}. "
                f"Overwriting with {stim_id} ({response_type})."
            )
        new_data_dict[field]["responses_final"] = upsampled_traces
        new_data_dict[field]["stim_id"] = stim_id

    d_qi = d_qi if d_qi is not None else 0.0
    chirp_qi = chirp_qi if chirp_qi is not None else 0.0
    # Apply quality indices mask only if requested
    if d_qi > 0.0 or chirp_qi > 0.0:
        new_data_dict = _apply_qi_mask(new_data_dict, ["d", "chirp"], [d_qi, chirp_qi], qi_logic)

    return new_data_dict

filter_responses

filter_responses(
    all_responses: dict[str, dict],
    filter_cell_types: bool = False,
    cell_types_list: Optional[list[int] | int] = None,
    chirp_qi: float = 0.35,
    d_qi: float = 0.6,
    qi_logic: Literal["and", "or"] = "or",
    filter_counts: bool = True,
    count_threshold: int = 10,
    classifier_confidence: float = 0.25,
    verbose: bool = False,
) -> dict[str, dict]

This function processes the input dictionary of neuron responses, applying various filters to exclude unwanted data based on the provided parameters. It can filter by cell types, quality indices, classifier confidence, and the number of responding cells, while also providing verbose output for tracking the filtering process. Note: default arguments are from the Hoefling et al., 2024 paper.

PARAMETER DESCRIPTION
all_responses

A dictionary containing neuron response data.

TYPE: Dict[str, dict]

filter_cell_types

Whether to filter by specific cell types. Defaults to False.

TYPE: bool DEFAULT: False

cell_types_list

List or single value of cell types to filter. Defaults to None.

TYPE: Optional[List[int] | int] DEFAULT: None

chirp_qi

Quality index threshold for chirp responses. Defaults to 0.35.

TYPE: float DEFAULT: 0.35

d_qi

Quality index threshold for d responses. Defaults to 0.6.

TYPE: float DEFAULT: 0.6

qi_logic

The logic to combine different quality indices. Defaults to "and".

TYPE: Literal['and', 'or'] DEFAULT: 'or'

filter_counts

Whether to filter based on response counts. Defaults to True.

TYPE: bool DEFAULT: True

count_threshold

Minimum number of responding cells required. Defaults to 10.

TYPE: int DEFAULT: 10

classifier_confidence

Minimum confidence level for classifier responses. Defaults to 0.3.

TYPE: float DEFAULT: 0.25

verbose

If True, prints detailed filtering information. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[str, dict]

Dict[str, dict]: A filtered dictionary of neuron responses that meet the specified criteria.

Source code in openretina/data_io/hoefling_2024/responses.py
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
def filter_responses(
    all_responses: dict[str, dict],
    filter_cell_types: bool = False,
    cell_types_list: Optional[list[int] | int] = None,
    chirp_qi: float = 0.35,
    d_qi: float = 0.6,
    qi_logic: Literal["and", "or"] = "or",
    filter_counts: bool = True,
    count_threshold: int = 10,
    classifier_confidence: float = 0.25,
    verbose: bool = False,
) -> dict[str, dict]:
    """
    This function processes the input dictionary of neuron responses, applying various filters
    to exclude unwanted data based on the provided parameters. It can filter by cell types,
    quality indices, classifier confidence, and the number of responding cells, while also
    providing verbose output for tracking the filtering process.
    Note: default arguments are from the Hoefling et al., 2024 paper.

    Args:
        all_responses (Dict[str, dict]): A dictionary containing neuron response data.
        filter_cell_types (bool, optional): Whether to filter by specific cell types. Defaults to False.
        cell_types_list (Optional[List[int] | int], optional): List or single value of cell types to filter.
                                                                Defaults to None.
        chirp_qi (float, optional): Quality index threshold for chirp responses. Defaults to 0.35.
        d_qi (float, optional): Quality index threshold for d responses. Defaults to 0.6.
        qi_logic (Literal["and", "or"], optional): The logic to combine different quality indices. Defaults to "and".
        filter_counts (bool, optional): Whether to filter based on response counts. Defaults to True.
        count_threshold (int, optional): Minimum number of responding cells required. Defaults to 10.
        classifier_confidence (float, optional): Minimum confidence level for classifier responses. Defaults to 0.3.
        verbose (bool, optional): If True, prints detailed filtering information. Defaults to False.

    Returns:
        Dict[str, dict]: A filtered dictionary of neuron responses that meet the specified criteria.
    """

    def print_verbose(message):
        if verbose:
            print(message)

    def get_n_neurons(all_responses) -> int:
        return sum(len(field["group_assignment"]) for field in all_responses.values())

    original_neuron_count = get_n_neurons(all_responses)
    print_verbose(f"Original dataset contains {original_neuron_count} neurons over {len(all_responses)} fields")
    print_verbose(" ------------------------------------ ")

    # Filter by cell types
    if filter_cell_types and cell_types_list is not None:
        all_rgcs_responses_ct_filtered = _mask_by_cell_type(all_responses, cell_types_list)
        print_verbose(
            f"Dropped {len(all_responses) - len(all_rgcs_responses_ct_filtered)} fields that did not "
            f"contain the target cell types ({len(all_rgcs_responses_ct_filtered)} remaining)"
        )
        dropped_n_cell_types = original_neuron_count - get_n_neurons(all_rgcs_responses_ct_filtered)
        print_verbose(
            f"Overall, dropped {dropped_n_cell_types} neurons of non-target cell types "
            f"(-{(dropped_n_cell_types) / original_neuron_count:.2%})."
        )
        print_verbose(" ------------------------------------ ")
    else:
        all_rgcs_responses_ct_filtered = deepcopy(all_responses)

    count_before_checks = get_n_neurons(all_rgcs_responses_ct_filtered)

    # Apply quality checks
    all_rgcs_responses_ct_filtered = _apply_qi_mask(
        all_rgcs_responses_ct_filtered, ["d", "chirp"], [d_qi, chirp_qi], qi_logic
    )

    dropped_n_qi = count_before_checks - get_n_neurons(all_rgcs_responses_ct_filtered)

    print_verbose(
        f"Dropped {len(all_responses) - len(all_rgcs_responses_ct_filtered)} fields with quality indices "
        f"below threshold ({len(all_rgcs_responses_ct_filtered)} remaining)"
    )
    print_verbose(
        f"Overall, dropped {dropped_n_qi} neurons over quality checks (-{dropped_n_qi / count_before_checks:.2%})."
    )
    print_verbose(" ------------------------------------ ")

    # Filter by classifier confidence
    if classifier_confidence is not None and classifier_confidence > 0.0:
        all_rgcs_responses_confidence_filtered = _mask_by_classifier_confidence(
            all_rgcs_responses_ct_filtered, classifier_confidence
        )
        dropped_n_classifier = get_n_neurons(all_rgcs_responses_ct_filtered) - get_n_neurons(
            all_rgcs_responses_confidence_filtered
        )
        print_verbose(
            f"Dropped {len(all_rgcs_responses_ct_filtered) - len(all_rgcs_responses_confidence_filtered)} fields with "
            f"classifier confidences below {classifier_confidence}"
        )
        print_verbose(
            f"Overall, dropped {dropped_n_classifier} neurons with classifier confidences below {classifier_confidence}"
            f" (-{dropped_n_classifier / get_n_neurons(all_rgcs_responses_ct_filtered):.2%})."
        )
        print_verbose(" ------------------------------------ ")
    else:
        all_rgcs_responses_confidence_filtered = all_rgcs_responses_ct_filtered

    # Filter by low counts
    if filter_counts:
        all_rgcs_responses = {
            k: v
            for k, v in all_rgcs_responses_confidence_filtered.items()
            if len(v["group_assignment"]) > count_threshold
        }
        dropped_n_counts = get_n_neurons(all_rgcs_responses_confidence_filtered) - get_n_neurons(all_rgcs_responses)
        print_verbose(
            f"Dropped {len(all_rgcs_responses_confidence_filtered) - len(all_rgcs_responses)} fields with less than "
            f"{count_threshold} responding cells ({len(all_rgcs_responses)} remaining)"
        )
        print_verbose(
            f"Overall, dropped {dropped_n_counts} neurons in fields with less than {count_threshold} responding cells "
            f"(-{dropped_n_counts / get_n_neurons(all_rgcs_responses_confidence_filtered):.2%})."
        )
    else:
        all_rgcs_responses = all_rgcs_responses_confidence_filtered

    print_verbose(" ------------------------------------ ")
    print_verbose(
        f"Final dataset contains {get_n_neurons(all_rgcs_responses)} neurons over {len(all_rgcs_responses)} fields"
    )
    final_n_dropped = original_neuron_count - get_n_neurons(all_rgcs_responses)
    print(f"Total number of cells dropped: {final_n_dropped} (-{(final_n_dropped) / original_neuron_count:.2%})")

    # Clean up RAM
    del all_rgcs_responses_ct_filtered
    del all_rgcs_responses_confidence_filtered

    return all_rgcs_responses

Constants