
Höfling et al. 2024 Dataset

Dataloaders

extract_data_info_from_dataloaders(dataloaders)

Extracts the data_info dictionary from the provided dataloaders.

Parameters:

- dataloaders: A dictionary of dataloaders for different sessions.

Returns:

- data_info: A dictionary containing input_dimensions, input_channels, and output_dimension for each session, nested with sessions as the first-level keys and these attributes as the second level.

Source code in openretina/data_io/hoefling_2024/dataloaders.py
def extract_data_info_from_dataloaders(
    dataloaders: dict[str, dict[str, Any]] | dict[str, Any],
) -> dict[str, dict[str, Any]]:
    """
    Extracts the data_info dictionary from the provided dataloaders.
    Args:
        dataloaders: A dictionary of dataloaders for different sessions.
    Returns:
        data_info: A dictionary containing input_dimensions, input_channels, and output_dimension for each session,
                   nested with sessions as the first level keys and these attributes as the second level.
    """
    # Ensure train loader is used if available and not provided directly
    dataloaders = dataloaders.get("train", dataloaders)

    # Obtain the named tuple fields from the first entry of the first dataloader in the dictionary
    in_name, out_name, *_ = next(iter(list(dataloaders.values())[0]))._fields  # type: ignore

    # Get the input and output dimensions for each session
    session_shape_dict = get_dims_for_loader_dict(dataloaders)

    # Initialize the new structure
    data_info: dict[str, dict[str, Any]] = {k: {} for k in session_shape_dict.keys()}

    # Populate the new structure
    for session_key, shapes in session_shape_dict.items():
        data_info[session_key]["input_dimensions"] = shapes[in_name]
        data_info[session_key]["input_channels"] = shapes[in_name][1]
        data_info[session_key]["output_dimension"] = shapes[out_name][-1]
        data_info[session_key]["mean_response"] = np.array(dataloaders[session_key].dataset.mean_response)  # type: ignore
        data_info[session_key]["roi_coords"] = np.array(dataloaders[session_key].dataset.roi_coords)  # type: ignore

    return data_info
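
Below is a minimal, hypothetical sketch of calling this function. The session name, tensor shapes, and the toy dataset (including its mean_response and roi_coords attributes, which the function reads) are made up for illustration; real dataloaders come from the Hoefling et al. 2024 pipeline.

from collections import namedtuple

import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset

from openretina.data_io.hoefling_2024.dataloaders import extract_data_info_from_dataloaders

Batch = namedtuple("Batch", ["inputs", "targets"])


class ToySession(Dataset):
    """Toy dataset yielding (movie_chunk, response_chunk) named tuples of fixed shape."""

    def __init__(self, n_samples: int = 8, n_neurons: int = 42):
        self.n_samples = n_samples
        self.n_neurons = n_neurons
        # Attributes read by extract_data_info_from_dataloaders
        self.mean_response = np.zeros(n_neurons, dtype=np.float32)
        self.roi_coords = np.zeros((n_neurons, 2), dtype=np.float32)

    def __len__(self):
        return self.n_samples

    def __getitem__(self, idx):
        movie = torch.zeros(2, 50, 18, 16)  # channels, time, height, width
        responses = torch.zeros(50, self.n_neurons)  # time, neurons
        return Batch(movie, responses)


dataloaders = {"train": {"session_0": DataLoader(ToySession(), batch_size=4)}}
data_info = extract_data_info_from_dataloaders(dataloaders)
print(data_info["session_0"]["input_dimensions"])  # torch.Size([4, 2, 50, 18, 16])
print(data_info["session_0"]["input_channels"])    # 2
print(data_info["session_0"]["output_dimension"])  # 42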

filter_different_size(batch)

Filters out batch elements that do not have the same shape as the majority of elements in the batch.

Source code in openretina/data_io/hoefling_2024/dataloaders.py
def filter_different_size(batch):
    """
    Filters out batches that do not have the same shape as most of the other batches.
    """
    # Get the shapes of all the elements in the batch
    shapes = [element[1].shape for element in batch]

    # Find the most common shape in the batch
    most_common_shape = max(set(shapes), key=shapes.count)

    # Filter out elements that do not have the most common shape
    filtered_batch = [element for element in batch if element[1].shape == most_common_shape]

    # If the filtered batch is empty, return None
    return default_collate(filtered_batch) if filtered_batch else None
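
A minimal sketch of using filter_different_size as a collate_fn; the toy samples below are synthetic, with one response of a different length that gets dropped at collate time.

import torch
from torch.utils.data import DataLoader

from openretina.data_io.hoefling_2024.dataloaders import filter_different_size

samples = [
    (torch.zeros(3), torch.ones(50)),
    (torch.zeros(3), torch.ones(50)),
    (torch.zeros(3), torch.ones(40)),  # odd one out, filtered away
]
loader = DataLoader(samples, batch_size=3, collate_fn=filter_different_size)
inputs, responses = next(iter(loader))
print(responses.shape)  # torch.Size([2, 50])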

filter_nan_collate(batch)

Filters out batch elements containing NaN values and then calls the default_collate function. NaN values can occur in inferred spikes exported with CASCADE. To be used as a collate_fn in a DataLoader.

Parameters:

- batch (list, required): A list of tuples representing the batch.

Returns:

- tuple of torch.Tensor: The collated batch after filtering out NaN values.

Source code in openretina/data_io/hoefling_2024/dataloaders.py
def filter_nan_collate(batch):
    """
    Filters out batches containing NaN values and then calls the default_collate function.
    Can happen for inferred spikes exported with CASCADE.
    To be used as a collate_fn in a DataLoader.

    Args:
        batch (list): A list of tuples representing the batch.

    Returns:
        tuple of torch.Tensor: The collated batch after filtering out NaN values.

    """
    batch = list(filter(lambda x: not np.isnan(x[1]).any(), batch))
    return default_collate(batch)
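
A similar sketch for filter_nan_collate, again with synthetic samples; the sample whose response contains NaNs is dropped before collation.

import torch
from torch.utils.data import DataLoader

from openretina.data_io.hoefling_2024.dataloaders import filter_nan_collate

samples = [
    (torch.zeros(3), torch.ones(5)),
    (torch.zeros(3), torch.full((5,), float("nan"))),  # dropped
    (torch.zeros(3), torch.ones(5)),
]
loader = DataLoader(samples, batch_size=3, collate_fn=filter_nan_collate)
inputs, responses = next(iter(loader))
print(responses.shape)  # torch.Size([2, 5])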

get_dims_for_loader_dict(dataloaders)

Borrowed from nnfabrik/utility/nn_helpers.py.

Given a dictionary of DataLoaders, returns a dictionary with same keys as the input and shape information (as returned by get_io_dims) on each keyed DataLoader.

Parameters:

- dataloaders (dict of DataLoader, required): Dictionary of dataloaders.

Returns:

- dict[str, dict[str, tuple[int, ...]] | tuple]: A dict containing the result of calling get_io_dims for each entry of the input dict.

Source code in openretina/data_io/hoefling_2024/dataloaders.py
def get_dims_for_loader_dict(dataloaders: dict[str, dict[str, Any]]) -> dict[str, dict[str, tuple[int, ...]] | tuple]:
    """
    Borrowed from nnfabrik/utility/nn_helpers.py.

    Given a dictionary of DataLoaders, returns a dictionary with same keys as the
    input and shape information (as returned by `get_io_dims`) on each keyed DataLoader.

    Args:
        dataloaders (dict of DataLoader): Dictionary of dataloaders.

    Returns:
        dict: A dict containing the result of calling `get_io_dims` for each entry of the input dict
    """
    return {k: get_io_dims(v) for k, v in dataloaders.items()}

get_io_dims(data_loader)

Borrowed from nnfabrik/utility/nn_helpers.py.

Returns the shape of the dataset for each item within an entry returned by the data_loader. The DataLoader object must return either a namedtuple, a dictionary, or a plain tuple. If the data_loader entry is a namedtuple or a dictionary, a dictionary with the same keys as the namedtuple/dict item is returned, where values are the shapes of the entries. Otherwise, a tuple of shape information is returned.

Note that the first dimension is always the batch dim with size depending on the data_loader configuration.

Parameters:

- data_loader (DataLoader, required): Expected to be a PyTorch DataLoader object returning either a namedtuple, a dictionary, or a plain tuple.

Returns:

- dict or tuple: If the data_loader element is a namedtuple or a dictionary, a dictionary of shape information keyed by each entry of the dataset is returned. Otherwise, a tuple of shape information is returned. The first dimension is always the batch dim, with size depending on the data_loader configuration.

Source code in openretina/data_io/hoefling_2024/dataloaders.py
def get_io_dims(data_loader) -> dict[str, tuple[int, ...]] | tuple:
    """
    Borrowed from nnfabrik/utility/nn_helpers.py.

    Returns the shape of the dataset for each item within an entry returned by the `data_loader`
    The DataLoader object must return either a namedtuple, dictionary or a plain tuple.
    If `data_loader` entry is a namedtuple or a dictionary, a dictionary with the same keys as the
    namedtuple/dict item is returned, where values are the shape of the entry. Otherwise, a tuple of
    shape information is returned.

    Note that the first dimension is always the batch dim with size depending on the data_loader configuration.

    Args:
        data_loader (torch.DataLoader): is expected to be a pytorch Dataloader object returning
            either a namedtuple, dictionary, or a plain tuple.
    Returns:
        dict or tuple: If data_loader element is either namedtuple or dictionary, a ditionary
            of shape information, keyed for each entry of dataset is returned. Otherwise, a tuple
            of shape information is returned. The first dimension is always the batch dim
            with size depending on the data_loader configuration.
    """
    items = next(iter(data_loader))
    if hasattr(items, "_asdict"):  # if it's a named tuple
        items = items._asdict()

    if hasattr(items, "items"):  # if dict like
        return {k: v.shape for k, v in items.items() if isinstance(v, (torch.Tensor, np.ndarray))}
    else:
        return tuple(v.shape for v in items)
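
A small sketch of both helpers on a toy loader that yields plain tuples (shapes are arbitrary); with tuples, get_io_dims returns a tuple of shapes rather than a dict.

import torch
from torch.utils.data import DataLoader

from openretina.data_io.hoefling_2024.dataloaders import get_dims_for_loader_dict, get_io_dims

samples = [(torch.zeros(2, 50, 18, 16), torch.zeros(50, 42)) for _ in range(8)]
loader = DataLoader(samples, batch_size=4)

print(get_io_dims(loader))
# (torch.Size([4, 2, 50, 18, 16]), torch.Size([4, 50, 42]))

print(get_dims_for_loader_dict({"session_0": loader}))
# {'session_0': (torch.Size([4, 2, 50, 18, 16]), torch.Size([4, 50, 42]))}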

Stimuli

gen_start_indices(random_sequences, val_clip_idx, clip_length, chunk_size, num_clips)

Optimized function to generate a list of indices for training chunks while excluding validation clips.

Parameters:

- random_sequences: int np array, 108 x 20, giving the ordering of the 108 training clips for the 20 different sequences.
- val_clip_idx: list of integers indicating the clips to be used for validation.
- clip_length: clip length in frames (5 s * 30 frames/s = 150 frames).
- chunk_size: temporal chunk size per sample in frames (50).
- num_clips: total number of training clips (108).

Returns:

- dict with keys "train", "validation", and "test", and index lists as values.

Source code in openretina/data_io/hoefling_2024/stimuli.py
def gen_start_indices(
    random_sequences: np.ndarray, val_clip_idx: list[int], clip_length: int, chunk_size: int, num_clips: int
):
    """
    Optimized function to generate a list of indices for training chunks while
    excluding validation clips.

    :param random_sequences: int np array; 108 x 20, giving the ordering of the
                             108 training clips for the 20 different sequences
    :param val_clip_idx:     list of integers indicating the clips to be used
                             for validation
    :param clip_length:      clip length in frames (5s*30frames/s = 150 frames)
    :param chunk_size:       temporal chunk size per sample in frames (50)
    :param num_clips:        total number of training clips (108)
    :return: dict; with keys train, validation, and test, and index list as
             values
    """
    # Validation clip indices are consecutive, because the validation clip and
    # stimuli are already isolated in other functions.
    val_start_idx = list(np.linspace(0, clip_length * (len(val_clip_idx) - 1), len(val_clip_idx), dtype=int))

    start_idx_dict = {"train": {}, "validation": val_start_idx, "test": [0]}

    num_train_clips = num_clips - len(val_clip_idx)

    if random_sequences.shape[1] == 1:
        start_idx_dict["train"] = list(np.arange(0, clip_length * (num_train_clips - 1), chunk_size, dtype=int))
    else:
        for sequence_index in range(random_sequences.shape[1]):
            start_idx_dict["train"][sequence_index] = list(  # type: ignore
                np.arange(0, clip_length * (num_train_clips - 1), chunk_size, dtype=int)
            )

    return start_idx_dict
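
A minimal sketch with the parameter values quoted in the docstring (108 clips of 150 frames, 50-frame chunks, 20 random sequences); the random permutations stand in for the real sequence orderings.

import numpy as np

from openretina.data_io.hoefling_2024.stimuli import gen_start_indices

rng = np.random.default_rng(0)
random_sequences = np.stack([rng.permutation(108) for _ in range(20)], axis=1)  # 108 x 20
val_clip_idx = [0, 5, 10, 15, 20]

start_indices = gen_start_indices(
    random_sequences=random_sequences,
    val_clip_idx=val_clip_idx,
    clip_length=150,
    chunk_size=50,
    num_clips=108,
)
print(start_indices["validation"])    # starts of the 5 validation clips: 0, 150, 300, 450, 600
print(start_indices["train"][0][:3])  # per-sequence training chunk starts: 0, 50, 100
print(start_indices["test"])          # [0]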

get_all_movie_combinations(movie_train, movie_test, random_sequences, validation_clip_indices, num_clips=NUM_CLIPS, clip_length=CLIP_LENGTH)

Generates combinations of movie data for 'left' and 'right' perspectives and prepares training, validation, and test datasets. It reorders the training movies based on random sequences and flips the movies for the 'left' perspective.

Parameters:

- movie_train: Tensor representing the training movie data.
- movie_test: Tensor representing the test movie data.
- random_sequences: Numpy array of random sequences for reordering training movies.
- validation_clip_indices: List of indices for validation clips. Each index needs to be between 0 and the number of clips.

Returns:

- movies: Dictionary with processed movies for 'left' and 'right' perspectives, each containing 'train', 'validation', and 'test' datasets.
Source code in openretina/data_io/hoefling_2024/stimuli.py
def get_all_movie_combinations(
    movie_train,
    movie_test,
    random_sequences: np.ndarray,
    validation_clip_indices: list[int],
    num_clips: int = NUM_CLIPS,
    clip_length: int = CLIP_LENGTH,
):
    """
    Generates combinations of movie data for 'left' and 'right' perspectives and
    prepares training, validation, and test datasets. It reorders the training
    movies based on random sequences and flips the movies for the 'left' perspective.

    Parameters:
    - movie_train: Tensor representing the training movie data.
    - movie_test: Tensor representing the test movie data.
    - random_sequences: Numpy array of random sequences for reordering training movies.
    - val_clip_idx: list of indices for validation clips. Needs to be between 0 and the number of clips.

    Returns:
    - movies: Dictionary with processed movies for 'left' and 'right' perspectives, each
      containing 'train', 'validation', and 'test' datasets.
    """

    # Generate train, validation, and test datasets
    movie_train_subset, movie_val, movie_test_dict = generate_movie_splits(
        movie_train, {"test": movie_test}, validation_clip_indices, num_clips, clip_length
    )

    # Assemble datasets into the final movies structure using the random sequences
    movies = apply_random_sequences(
        torch.tensor(movie_train, dtype=torch.float),
        movie_train_subset,
        movie_val,
        movie_test_dict["test"],
        random_sequences,
        validation_clip_indices,
        clip_length,
    )

    return movies

movies_from_pickle(file_path)

Load movie data from a pickle file and return it as a MoviesTrainTestSplit object.

Source code in openretina/data_io/hoefling_2024/stimuli.py
def movies_from_pickle(file_path: str | os.PathLike) -> MoviesTrainTestSplit:
    """
    Load movie data from a pickle file and return it as a MoviesTrainTestSplit object.
    """
    return MoviesTrainTestSplit.from_pickle(file_path)
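
Usage is a one-liner; the path below is a placeholder for wherever the movie pickle from the Hoefling et al. 2024 data is stored.

from openretina.data_io.hoefling_2024.stimuli import movies_from_pickle

movies = movies_from_pickle("path/to/hoefling_2024_movies.pkl")  # placeholder path
print(type(movies).__name__)  # MoviesTrainTestSplit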

Responses

NeuronDataSplitHoefling

Source code in openretina/data_io/hoefling_2024/responses.py
class NeuronDataSplitHoefling:
    def __init__(
        self,
        neural_responses: ResponsesTrainTestSplit,
        val_clip_idx: Optional[list[int]],
        num_clips: Optional[int],
        clip_length: Optional[int],
        roi_mask: Optional[Float[np.ndarray, "64 64"]] = None,
        roi_ids: Optional[Float[np.ndarray, " n_neurons"]] = None,
        scan_sequence_idx: Optional[int] = None,
        random_sequences: Optional[Float[np.ndarray, "n_clips n_sequences"]] = None,
        eye: Optional[Literal["left", "right"]] = None,
        group_assignment: Optional[Float[np.ndarray, " n_neurons"]] = None,
        key: Optional[dict] = None,
        use_base_sequence: Optional[bool] = False,
        **kwargs,
    ):
        """
        Initialize the NeuronData object.
        Boilerplate class to store neuron data train/test/validation splits before feeding into a dataloader.
        Customized for compatibility with the data format in Hoefling et al., 2024.

        Args:
            eye (str): The eye from which the neuron data is recorded.
            group_assignment (Float[np.ndarray, "n_neurons"]): The group assignment of neurons.
            key (dict): The key information for the neuron data,
                        includes date, exp_num, experimenter, field_id, stim_id.
            responses_final (Float[np.ndarray, "n_neurons n_timepoints"]): The responses of neurons.
            roi_coords (Float[np.ndarray, "n_neurons 2"]): The coordinates of regions of interest (ROIs).
            roi_ids (Float[np.ndarray, "n_neurons"]): The IDs of regions of interest (ROIs).
            scan_sequence_idx (int): The index of the scan sequence.
            stim_id (int): The ID of the stimulus. 5 is mouse natural scenes.
            random_sequences (Float[np.ndarray, "n_clips n_sequences"]): The random sequences of clips.
            val_clip_idx (List[int]): The indices of validation clips.
            num_clips (int): The number of clips.
            clip_length (int): The length of each clip.
            use_base_sequence (bool): Whether to re-order all training responses to use the same "base" sequence.
        """
        self.neural_responses = neural_responses

        self.num_neurons = self.neural_responses.n_neurons

        self.eye = eye if eye is not None else "right"
        self.group_assignment = group_assignment
        self.key = key
        self.roi_ids = roi_ids
        self.roi_coords = (
            np.array(self.transform_roi_mask(roi_mask), dtype=np.float32) if roi_mask is not None else None
        )
        self.scan_sequence_idx = scan_sequence_idx
        self.stim_id = self.neural_responses.stim_id
        self.clip_length = clip_length
        self.num_clips = num_clips
        self.random_sequences = random_sequences if random_sequences is not None else np.array([])
        self.use_base_sequence = use_base_sequence
        self.val_clip_idx = val_clip_idx

        self.responses_train_and_val, self.responses_test, self.test_responses_by_trial = (
            self.neural_responses.train.T,
            self.neural_responses.test_response.T,
            self.neural_responses.test_by_trial.transpose(0, 2, 1)
            if self.neural_responses.test_by_trial is not None
            else None,
        )
        self.responses_train, self.responses_val = self.compute_validation_responses()

    @property
    def response_dict(self):
        return {
            "train": torch.tensor(self.responses_train).to(torch.float),
            "validation": torch.tensor(self.responses_val).to(torch.float),
            "test": {
                "avg": torch.tensor(self.responses_test).to(torch.float),
                "by_trial": torch.tensor(self.test_responses_by_trial),
            },
        }

    @no_type_check
    def compute_validation_responses(self) -> tuple[np.ndarray, np.ndarray]:
        if self.stim_id in ["mb", "chirp"]:
            # Chirp and moving bar
            responses_val = np.array(np.nan)
            responses_train = self.responses_train_and_val
        else:
            movie_ordering = (
                np.arange(self.num_clips)
                if (len(self.random_sequences) == 0 or self.scan_sequence_idx is None)
                else self.random_sequences[:, self.scan_sequence_idx]
            )

            # Initialise validation responses

            base_movie_sorting = np.argsort(movie_ordering)

            validation_mask = np.ones_like(self.responses_train_and_val, dtype=bool)
            responses_val = np.zeros([len(self.val_clip_idx) * self.clip_length, self.num_neurons])

            # Compute validation responses and remove sections from training responses

            for i, ind1 in enumerate(self.val_clip_idx):
                grab_index = base_movie_sorting[ind1]
                responses_val[i * self.clip_length : (i + 1) * self.clip_length, :] = self.responses_train_and_val[
                    grab_index * self.clip_length : (grab_index + 1) * self.clip_length,
                    :,
                ]
                validation_mask[
                    (grab_index * self.clip_length) : (grab_index + 1) * self.clip_length,
                    :,
                ] = False

            if self.use_base_sequence:
                # Reorder training responses to use the same "base" sequence, which follows the numbering of clips.
                # This way all training responses are wrt the same order of clips, which can be useful in analysis.
                train_clip_idx = [i for i in range(self.num_clips) if i not in self.val_clip_idx]
                responses_train = np.zeros([len(train_clip_idx) * self.clip_length, self.num_neurons])
                for i, train_idx in enumerate(train_clip_idx):
                    grab_index = base_movie_sorting[train_idx]
                    responses_train[i * self.clip_length : (i + 1) * self.clip_length, :] = (
                        self.responses_train_and_val[
                            grab_index * self.clip_length : (grab_index + 1) * self.clip_length,
                            :,
                        ]
                    )
            else:
                responses_train = self.responses_train_and_val[validation_mask].reshape(-1, self.num_neurons)

        return responses_train, responses_val

    def transform_roi_mask(self, roi_mask):
        roi_coords = np.zeros((len(self.roi_ids), 2))
        for i, roi_id in enumerate(self.roi_ids):
            single_roi_mask = np.zeros_like(roi_mask)
            single_roi_mask[roi_mask == -roi_id] = 1
            roi_coords[i] = self.roi2readout(single_roi_mask)
        return roi_coords

    def roi2readout(
        self,
        single_roi_mask,
        roi_mask_pixelsize=2,
        readout_mask_pixelsize=50,
        x_offset=2.75,
        y_offset=2.75,
    ):
        """
        Maps a roi mask of a single roi from recording coordinates to model
        readout coordinates
        :param single_roi_mask: 2d array with nonzero values indicating the pixels
                of the current roi
        :param roi_mask_pixelsize: size of a pixel in the roi mask in um
        :param readout_mask_pixelsize: size of a pixel in the readout mask in um
        :param x_offset: x offset indicating the start of the recording field in readout mask
        :param y_offset: y offset indicating the start of the recording field in readout mask
        :return:
        """
        pixel_factor = readout_mask_pixelsize / roi_mask_pixelsize
        y, x = np.nonzero(single_roi_mask)
        y_trans, x_trans = y / pixel_factor, x / pixel_factor
        y_trans += y_offset
        x_trans += x_offset
        x_trans = x_trans.mean()
        y_trans = y_trans.mean()
        coords = np.asarray(
            [
                self.map_to_range(max=8, val=y_trans),
                self.map_to_range(max=8, val=x_trans),
            ],
            dtype=np.float32,
        )
        return coords

    def map_to_range(self, max, val):
        val = val / max
        val = val - 0.5
        val = val * 2
        return val

__init__(neural_responses, val_clip_idx, num_clips, clip_length, roi_mask=None, roi_ids=None, scan_sequence_idx=None, random_sequences=None, eye=None, group_assignment=None, key=None, use_base_sequence=False, **kwargs)

Initialize the NeuronData object. Boilerplate class to store neuron data train/test/validation splits before feeding into a dataloader. Customized for compatibility with the data format in Hoefling et al., 2024.

Parameters:

- eye (str, default None): The eye from which the neuron data is recorded.
- group_assignment (Float[ndarray, "n_neurons"], default None): The group assignment of neurons.
- key (dict, default None): The key information for the neuron data; includes date, exp_num, experimenter, field_id, stim_id.
- responses_final (Float[ndarray, "n_neurons n_timepoints"], required): The responses of neurons.
- roi_coords (Float[ndarray, "n_neurons 2"], required): The coordinates of regions of interest (ROIs).
- roi_ids (Float[ndarray, "n_neurons"], default None): The IDs of regions of interest (ROIs).
- scan_sequence_idx (int, default None): The index of the scan sequence.
- stim_id (int, required): The ID of the stimulus. 5 is mouse natural scenes.
- random_sequences (Float[ndarray, "n_clips n_sequences"], default None): The random sequences of clips.
- val_clip_idx (List[int], required): The indices of validation clips.
- num_clips (int, required): The number of clips.
- clip_length (int, required): The length of each clip.
- use_base_sequence (bool, default False): Whether to re-order all training responses to use the same "base" sequence.
Source code in openretina/data_io/hoefling_2024/responses.py
def __init__(
    self,
    neural_responses: ResponsesTrainTestSplit,
    val_clip_idx: Optional[list[int]],
    num_clips: Optional[int],
    clip_length: Optional[int],
    roi_mask: Optional[Float[np.ndarray, "64 64"]] = None,
    roi_ids: Optional[Float[np.ndarray, " n_neurons"]] = None,
    scan_sequence_idx: Optional[int] = None,
    random_sequences: Optional[Float[np.ndarray, "n_clips n_sequences"]] = None,
    eye: Optional[Literal["left", "right"]] = None,
    group_assignment: Optional[Float[np.ndarray, " n_neurons"]] = None,
    key: Optional[dict] = None,
    use_base_sequence: Optional[bool] = False,
    **kwargs,
):
    """
    Initialize the NeuronData object.
    Boilerplate class to store neuron data train/test/validation splits before feeding into a dataloader.
    Customized for compatibility with the data format in Hoefling et al., 2024.

    Args:
        eye (str): The eye from which the neuron data is recorded.
        group_assignment (Float[np.ndarray, "n_neurons"]): The group assignment of neurons.
        key (dict): The key information for the neuron data,
                    includes date, exp_num, experimenter, field_id, stim_id.
        responses_final (Float[np.ndarray, "n_neurons n_timepoints"]): The responses of neurons.
        roi_coords (Float[np.ndarray, "n_neurons 2"]): The coordinates of regions of interest (ROIs).
        roi_ids (Float[np.ndarray, "n_neurons"]): The IDs of regions of interest (ROIs).
        scan_sequence_idx (int): The index of the scan sequence.
        stim_id (int): The ID of the stimulus. 5 is mouse natural scenes.
        random_sequences (Float[np.ndarray, "n_clips n_sequences"]): The random sequences of clips.
        val_clip_idx (List[int]): The indices of validation clips.
        num_clips (int): The number of clips.
        clip_length (int): The length of each clip.
        use_base_sequence (bool): Whether to re-order all training responses to use the same "base" sequence.
    """
    self.neural_responses = neural_responses

    self.num_neurons = self.neural_responses.n_neurons

    self.eye = eye if eye is not None else "right"
    self.group_assignment = group_assignment
    self.key = key
    self.roi_ids = roi_ids
    self.roi_coords = (
        np.array(self.transform_roi_mask(roi_mask), dtype=np.float32) if roi_mask is not None else None
    )
    self.scan_sequence_idx = scan_sequence_idx
    self.stim_id = self.neural_responses.stim_id
    self.clip_length = clip_length
    self.num_clips = num_clips
    self.random_sequences = random_sequences if random_sequences is not None else np.array([])
    self.use_base_sequence = use_base_sequence
    self.val_clip_idx = val_clip_idx

    self.responses_train_and_val, self.responses_test, self.test_responses_by_trial = (
        self.neural_responses.train.T,
        self.neural_responses.test_response.T,
        self.neural_responses.test_by_trial.transpose(0, 2, 1)
        if self.neural_responses.test_by_trial is not None
        else None,
    )
    self.responses_train, self.responses_val = self.compute_validation_responses()

roi2readout(single_roi_mask, roi_mask_pixelsize=2, readout_mask_pixelsize=50, x_offset=2.75, y_offset=2.75)

Maps a ROI mask of a single ROI from recording coordinates to model readout coordinates.

Parameters:

- single_roi_mask: 2d array with nonzero values indicating the pixels of the current ROI.
- roi_mask_pixelsize: size of a pixel in the ROI mask in um.
- readout_mask_pixelsize: size of a pixel in the readout mask in um.
- x_offset: x offset indicating the start of the recording field in the readout mask.
- y_offset: y offset indicating the start of the recording field in the readout mask.

Source code in openretina/data_io/hoefling_2024/responses.py
def roi2readout(
    self,
    single_roi_mask,
    roi_mask_pixelsize=2,
    readout_mask_pixelsize=50,
    x_offset=2.75,
    y_offset=2.75,
):
    """
    Maps a roi mask of a single roi from recording coordinates to model
    readout coordinates
    :param single_roi_mask: 2d array with nonzero values indicating the pixels
            of the current roi
    :param roi_mask_pixelsize: size of a pixel in the roi mask in um
    :param readout_mask_pixelsize: size of a pixel in the readout mask in um
    :param x_offset: x offset indicating the start of the recording field in readout mask
    :param y_offset: y offset indicating the start of the recording field in readout mask
    :return:
    """
    pixel_factor = readout_mask_pixelsize / roi_mask_pixelsize
    y, x = np.nonzero(single_roi_mask)
    y_trans, x_trans = y / pixel_factor, x / pixel_factor
    y_trans += y_offset
    x_trans += x_offset
    x_trans = x_trans.mean()
    y_trans = y_trans.mean()
    coords = np.asarray(
        [
            self.map_to_range(max=8, val=y_trans),
            self.map_to_range(max=8, val=x_trans),
        ],
        dtype=np.float32,
    )
    return coords
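
As a worked example of the mapping (standalone arithmetic mirroring the method, with a hypothetical ROI pixel at mask position y=100, x=60 and the default pixel sizes and offsets):

pixel_factor = 50 / 2                # readout_mask_pixelsize / roi_mask_pixelsize = 25
y_trans = 100 / pixel_factor + 2.75  # 6.75
x_trans = 60 / pixel_factor + 2.75   # 5.15
# map_to_range(max=8, val): divide by 8, shift by -0.5, scale by 2 -> roughly [-1, 1]
y_readout = (y_trans / 8 - 0.5) * 2  # 0.6875
x_readout = (x_trans / 8 - 0.5) * 2  # 0.2875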

filter_responses(all_responses, filter_cell_types=False, cell_types_list=None, chirp_qi=0.35, d_qi=0.6, qi_logic='or', filter_counts=True, count_threshold=10, classifier_confidence=0.25, verbose=False)

This function processes the input dictionary of neuron responses, applying various filters to exclude unwanted data based on the provided parameters. It can filter by cell types, quality indices, classifier confidence, and the number of responding cells, while also providing verbose output for tracking the filtering process. Note: default arguments are from the Hoefling et al., 2024 paper.

Parameters:

- all_responses (Dict[str, dict], required): A dictionary containing neuron response data.
- filter_cell_types (bool): Whether to filter by specific cell types. Defaults to False.
- cell_types_list (Optional[List[int] | int]): List or single value of cell types to filter. Defaults to None.
- chirp_qi (float): Quality index threshold for chirp responses. Defaults to 0.35.
- d_qi (float): Quality index threshold for d responses. Defaults to 0.6.
- qi_logic (Literal["and", "or"]): The logic used to combine the different quality indices. Defaults to "or".
- filter_counts (bool): Whether to filter based on response counts. Defaults to True.
- count_threshold (int): Minimum number of responding cells required per field. Defaults to 10.
- classifier_confidence (float): Minimum confidence level for classifier responses. Defaults to 0.25.
- verbose (bool): If True, prints detailed filtering information. Defaults to False.

Returns:

- Dict[str, dict]: A filtered dictionary of neuron responses that meet the specified criteria.

Source code in openretina/data_io/hoefling_2024/responses.py
def filter_responses(
    all_responses: dict[str, dict],
    filter_cell_types: bool = False,
    cell_types_list: Optional[list[int] | int] = None,
    chirp_qi: float = 0.35,
    d_qi: float = 0.6,
    qi_logic: Literal["and", "or"] = "or",
    filter_counts: bool = True,
    count_threshold: int = 10,
    classifier_confidence: float = 0.25,
    verbose: bool = False,
) -> dict[str, dict]:
    """
    This function processes the input dictionary of neuron responses, applying various filters
    to exclude unwanted data based on the provided parameters. It can filter by cell types,
    quality indices, classifier confidence, and the number of responding cells, while also
    providing verbose output for tracking the filtering process.
    Note: default arguments are from the Hoefling et al., 2024 paper.

    Args:
        all_responses (Dict[str, dict]): A dictionary containing neuron response data.
        filter_cell_types (bool, optional): Whether to filter by specific cell types. Defaults to False.
        cell_types_list (Optional[List[int] | int], optional): List or single value of cell types to filter.
                                                                Defaults to None.
        chirp_qi (float, optional): Quality index threshold for chirp responses. Defaults to 0.35.
        d_qi (float, optional): Quality index threshold for d responses. Defaults to 0.6.
        qi_logic (Literal["and", "or"], optional): The logic to combine different quality indices. Defaults to "and".
        filter_counts (bool, optional): Whether to filter based on response counts. Defaults to True.
        count_threshold (int, optional): Minimum number of responding cells required. Defaults to 10.
        classifier_confidence (float, optional): Minimum confidence level for classifier responses. Defaults to 0.3.
        verbose (bool, optional): If True, prints detailed filtering information. Defaults to False.

    Returns:
        Dict[str, dict]: A filtered dictionary of neuron responses that meet the specified criteria.
    """

    def print_verbose(message):
        if verbose:
            print(message)

    def get_n_neurons(all_responses) -> int:
        return sum(len(field["group_assignment"]) for field in all_responses.values())

    original_neuron_count = get_n_neurons(all_responses)
    print_verbose(f"Original dataset contains {original_neuron_count} neurons over {len(all_responses)} fields")
    print_verbose(" ------------------------------------ ")

    # Filter by cell types
    if filter_cell_types and cell_types_list is not None:
        all_rgcs_responses_ct_filtered = _mask_by_cell_type(all_responses, cell_types_list)
        print_verbose(
            f"Dropped {len(all_responses) - len(all_rgcs_responses_ct_filtered)} fields that did not "
            f"contain the target cell types ({len(all_rgcs_responses_ct_filtered)} remaining)"
        )
        dropped_n_cell_types = original_neuron_count - get_n_neurons(all_rgcs_responses_ct_filtered)
        print_verbose(
            f"Overall, dropped {dropped_n_cell_types} neurons of non-target cell types "
            f"(-{(dropped_n_cell_types) / original_neuron_count:.2%})."
        )
        print_verbose(" ------------------------------------ ")
    else:
        all_rgcs_responses_ct_filtered = deepcopy(all_responses)

    count_before_checks = get_n_neurons(all_rgcs_responses_ct_filtered)

    # Apply quality checks
    all_rgcs_responses_ct_filtered = _apply_qi_mask(
        all_rgcs_responses_ct_filtered, ["d", "chirp"], [d_qi, chirp_qi], qi_logic
    )

    dropped_n_qi = count_before_checks - get_n_neurons(all_rgcs_responses_ct_filtered)

    print_verbose(
        f"Dropped {len(all_responses) - len(all_rgcs_responses_ct_filtered)} fields with quality indices "
        f"below threshold ({len(all_rgcs_responses_ct_filtered)} remaining)"
    )
    print_verbose(
        f"Overall, dropped {dropped_n_qi} neurons over quality checks (-{dropped_n_qi / count_before_checks:.2%})."
    )
    print_verbose(" ------------------------------------ ")

    # Filter by classifier confidence
    if classifier_confidence is not None and classifier_confidence > 0.0:
        all_rgcs_responses_confidence_filtered = _mask_by_classifier_confidence(
            all_rgcs_responses_ct_filtered, classifier_confidence
        )
        dropped_n_classifier = get_n_neurons(all_rgcs_responses_ct_filtered) - get_n_neurons(
            all_rgcs_responses_confidence_filtered
        )
        print_verbose(
            f"Dropped {len(all_rgcs_responses_ct_filtered) - len(all_rgcs_responses_confidence_filtered)} fields with "
            f"classifier confidences below {classifier_confidence}"
        )
        print_verbose(
            f"Overall, dropped {dropped_n_classifier} neurons with classifier confidences below {classifier_confidence}"
            f" (-{dropped_n_classifier / get_n_neurons(all_rgcs_responses_ct_filtered):.2%})."
        )
        print_verbose(" ------------------------------------ ")
    else:
        all_rgcs_responses_confidence_filtered = all_rgcs_responses_ct_filtered

    # Filter by low counts
    if filter_counts:
        all_rgcs_responses = {
            k: v
            for k, v in all_rgcs_responses_confidence_filtered.items()
            if len(v["group_assignment"]) > count_threshold
        }
        dropped_n_counts = get_n_neurons(all_rgcs_responses_confidence_filtered) - get_n_neurons(all_rgcs_responses)
        print_verbose(
            f"Dropped {len(all_rgcs_responses_confidence_filtered) - len(all_rgcs_responses)} fields with less than "
            f"{count_threshold} responding cells ({len(all_rgcs_responses)} remaining)"
        )
        print_verbose(
            f"Overall, dropped {dropped_n_counts} neurons in fields with less than {count_threshold} responding cells "
            f"(-{dropped_n_counts / get_n_neurons(all_rgcs_responses_confidence_filtered):.2%})."
        )
    else:
        all_rgcs_responses = all_rgcs_responses_confidence_filtered

    print_verbose(" ------------------------------------ ")
    print_verbose(
        f"Final dataset contains {get_n_neurons(all_rgcs_responses)} neurons over {len(all_rgcs_responses)} fields"
    )
    final_n_dropped = original_neuron_count - get_n_neurons(all_rgcs_responses)
    print(f"Total number of cells dropped: {final_n_dropped} (-{(final_n_dropped) / original_neuron_count:.2%})")

    # Clean up RAM
    del all_rgcs_responses_ct_filtered
    del all_rgcs_responses_confidence_filtered

    return all_rgcs_responses
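
A hedged usage sketch with the paper's default thresholds; all_responses is assumed to be the per-field response dictionary produced earlier in the Hoefling et al. 2024 loading pipeline, and the cell-type list is a hypothetical example.

from openretina.data_io.hoefling_2024.responses import filter_responses

filtered_responses = filter_responses(
    all_responses,                       # assumed: per-field response dict from the loading step
    filter_cell_types=True,
    cell_types_list=list(range(1, 33)),  # hypothetical: keep cell types 1-32
    chirp_qi=0.35,
    d_qi=0.6,
    qi_logic="or",
    filter_counts=True,
    count_threshold=10,
    classifier_confidence=0.25,
    verbose=True,
)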

upsample_all_responses(data_dict, response_type='natural', trace_type='spikes', d_qi=None, chirp_qi=None, qi_logic='or', scale_traces=1.0, norm_by_std=True)

Converts inferred spikes into final responses by upsampling the traces of all sessions of a given response_type. This is to match the framerate used in the stimulus presentation.

Parameters:

- data_dict (dict, required): A dictionary containing the data.
- response_type (str): The type of response. Defaults to "natural".

Returns:

- dict: The updated data dictionary with final responses.

Raises:

- NotImplementedError: If the conversion is not yet implemented for the given response type.

Source code in openretina/data_io/hoefling_2024/responses.py
def upsample_all_responses(
    data_dict: dict,
    response_type: Literal["natural", "chirp", "mb"] = "natural",
    trace_type: Literal["spikes", "raw", "preprocessed", "detrended"] = "spikes",
    d_qi: Optional[float] = None,
    chirp_qi: Optional[float] = None,
    qi_logic: Literal["and", "or"] = "or",
    scale_traces: float = 1.0,
    norm_by_std: bool = True,
):
    """
    Converts inferred spikes into final responses by upsampling the traces of all sessions of a given response_type.
    This is to match the framerate used in the stimulus presentation.

    Args:
        data_dict (dict): A dictionary containing the data.
        response_type (str, optional): The type of response. Defaults to "natural".

    Returns:
        dict: The updated data dictionary with final responses.
    Raises:
        NotImplementedError: If the conversion is not yet implemented for the given response type.
    """

    new_data_dict = deepcopy(data_dict)

    stim_id = STIMULI_IDS.get(response_type, None)
    if stim_id is None:
        raise NotImplementedError(f"Conversion not yet implemented for response type {response_type}")

    for field in tqdm(
        new_data_dict.keys(),
        desc=f"Upsampling {response_type} {trace_type} traces to get final responses.",
    ):
        if trace_type == "detrended":
            raw_traces = new_data_dict[field][f"{response_type}_raw_traces"]
            smoothed_traces = new_data_dict[field][f"{response_type}_smoothed_traces"]
            traces = raw_traces - smoothed_traces
        elif trace_type == "spikes":
            try:
                traces = new_data_dict[field][f"{response_type}_inferred_spikes"]
            except KeyError:
                # For new data format
                traces = new_data_dict[field][f"{response_type}_spikes"]
        else:
            traces = new_data_dict[field][f"{response_type}_{trace_type}_traces"]

        triggertimes = new_data_dict[field][f"{response_type}_trigger_times"][0]

        try:
            tracestimes = new_data_dict[field][f"{response_type}_traces_times"]
        except KeyError:
            # New djimaing exports have a different save format for trace_times
            traces_t0 = np.tile(
                np.atleast_1d(new_data_dict[field][f"{response_type}_traces_t0"].squeeze())[:, None],
                (1, traces.shape[1]),
            )
            traces_dt = np.tile(
                np.atleast_1d(new_data_dict[field][f"{response_type}_traces_dt"].squeeze())[:, None],
                (1, traces.shape[1]),
            )
            tracestimes = np.tile(np.arange(traces.shape[1]), reps=(traces.shape[0], 1)) * traces_dt + traces_t0

        upsampled_traces = (
            upsample_traces(
                triggertimes=triggertimes,
                traces=traces,
                tracestimes=tracestimes,
                stim_id=stim_id,
                norm_by_std=norm_by_std,
            )
            * scale_traces
        )

        new_data_dict[field][f"{response_type}_responses_final"] = upsampled_traces

        if "responses_final" in new_data_dict[field]:
            warnings.warn(
                f"You seem to already have computed `responses_final` for a "
                f"stim_id of {new_data_dict[field]['stim_id']}. "
                f"Overwriting with {stim_id} ({response_type})."
            )
        new_data_dict[field]["responses_final"] = upsampled_traces
        new_data_dict[field]["stim_id"] = stim_id

    d_qi = d_qi if d_qi is not None else 0.0
    chirp_qi = chirp_qi if chirp_qi is not None else 0.0
    # Apply quality indices mask only if requested
    if d_qi > 0.0 or chirp_qi > 0.0:
        new_data_dict = _apply_qi_mask(new_data_dict, ["d", "chirp"], [d_qi, chirp_qi], qi_logic)

    return new_data_dict
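
A hedged usage sketch; data_dict is assumed to be the per-field export loaded earlier in the pipeline, and the keyword values simply restate the defaults.

from openretina.data_io.hoefling_2024.responses import upsample_all_responses

upsampled_data_dict = upsample_all_responses(
    data_dict,                 # assumed: per-field data dict from the loading step
    response_type="natural",
    trace_type="spikes",
    qi_logic="or",
    scale_traces=1.0,
    norm_by_std=True,
)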

upsample_traces(triggertimes, traces, tracestimes, stim_id, target_fr=30, norm_by_std=True)

Upsamples the traces based on the stimulus type.

Parameters:

- triggertimes (list, required): List of trigger times.
- traces (list, required): List of traces.
- tracestimes (list, required): List of trace times.
- stim_id (int, required): Stimulus ID.
- target_fr (int): Target frame rate for upsampling. Defaults to 30.

Returns:

- numpy.ndarray: Upsampled responses.

Raises:

- NotImplementedError: If the stimulus ID is not implemented.

Source code in openretina/data_io/hoefling_2024/responses.py
def upsample_traces(
    triggertimes,
    traces,
    tracestimes,
    stim_id: int,
    target_fr: int = 30,
    norm_by_std: bool = True,
) -> np.ndarray:
    """
    Upsamples the traces based on the stimulus type.

    Args:
        triggertimes (list): List of trigger times.
        traces (list): List of traces.
        tracestimes (list): List of trace times.
        stim_id (int): Stimulus ID.
        target_fr (int, optional): Target frame rate for upsampling. Default is 30.

    Returns:
        numpy.ndarray: Upsampled responses.

    Raises:
        NotImplementedError: If the stimulus ID is not implemented.
    """
    if stim_id == 1:
        # Chirp: each chirp has two triggers, one at the start and one 5s later, after a 2s OFF and 3s full field ON.
        # We need only the first trigger of each chirp for the upsampling.
        # 32.98999999 is the total chirp duration in seconds. Should be 33 but there is a small discrepancy
        chirp_starts = triggertimes[::2]
        upsampled_triggertimes = _upsample_triggertimes(32.98999999, 33, chirp_starts, target_fr)
    elif stim_id == 2:
        # Moving bar: each bar has one trigger at the start of the bar stim. Bar duration is 4s.
        # It is a bit more because each trigger has a duration of 3 frames at 60Hz, so around 50 ms.
        upsampled_triggertimes = _upsample_triggertimes(4.054001, 4.1, triggertimes, target_fr)
    elif stim_id == 5:
        # Movie stimulus
        # 4.966666 is the time between triggers in the movie stimulus.
        # It is not exactly 5s because it is not a perfect world :)
        upsampled_triggertimes = _upsample_triggertimes(4.9666667, 5, triggertimes, target_fr)
    else:
        raise NotImplementedError(f"Stimulus ID {stim_id} not implemented")

    upsampled_responses = np.zeros((traces.shape[0], len(upsampled_triggertimes)))
    for i in range(traces.shape[0]):
        upsampled_responses[i] = np.interp(upsampled_triggertimes, tracestimes[i].ravel(), traces[i].ravel())

    if norm_by_std:
        upsampled_responses = upsampled_responses / np.std(
            upsampled_responses, axis=1, keepdims=True
        )  # normalize response std

    return upsampled_responses
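
A synthetic sketch for the movie stimulus (stim_id=5); the recording rate, trigger spacing, and random traces are made up, so only the call pattern and output shape (n_neurons x upsampled timepoints) are meant to be informative.

import numpy as np

from openretina.data_io.hoefling_2024.responses import upsample_traces

rng = np.random.default_rng(0)
n_neurons, n_frames, rec_fr = 3, 2000, 7.8  # hypothetical recording at ~7.8 Hz
traces = rng.random((n_neurons, n_frames))
tracestimes = np.tile(np.arange(n_frames) / rec_fr, (n_neurons, 1))  # one row of timestamps per neuron
triggertimes = np.arange(0.0, n_frames / rec_fr - 10.0, 4.9666667)   # movie triggers every ~4.97 s

upsampled = upsample_traces(
    triggertimes=triggertimes,
    traces=traces,
    tracestimes=tracestimes,
    stim_id=5,  # movie stimulus
    target_fr=30,
    norm_by_std=True,
)
print(upsampled.shape)  # (3, ~len(triggertimes) * 150) at 30 Hz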

Constants