Source code for minian.initialization

import functools as fct
import itertools as itt
import os
from typing import Optional, Tuple, Union

import cv2
import dask as da
import dask.array as darr
import networkx as nx
import numpy as np
import pandas as pd
import sparse
import xarray as xr
from scipy.ndimage.measurements import label
from scipy.signal import butter, lfilter
from scipy.sparse import csc_matrix
from scipy.stats import kstest, zscore
from skimage.morphology import disk
from sklearn.mixture import GaussianMixture
from sklearn.neighbors import KDTree, radius_neighbors_graph

from .cnmf import adj_corr, filt_fft, graph_optimize_corr, label_connected
from .utilities import local_extreme, med_baseline, save_minian, sps_lstsq


[docs]def seeds_init( varr: xr.DataArray, wnd_size=500, method="rolling", stp_size=200, nchunk=100, max_wnd=10, diff_thres=2, ): """ Generate over-complete set of seeds by finding local maxima across frames. This function computes the maximum intensity projection of a subset of frames and finds the local maxima. The subsetting use either a rolling window or random sampling of frames. `wnd_size` `stp_size` and `nchunk` controls different aspects of the subsetting. `max_wnd` and `diff_thres` controls how local maxima are computed. The set of all local maxima found in this process constitutes an overly-complete set of seeds, representing putative locations of cells. Parameters ---------- varr : xr.DataArray Input movie data. Should have dimensions "frame", "height" and "width". wnd_size : int, optional Number of frames in each chunk, for which a max projection will be calculated. By default `500`. method : str, optional Either `"rolling"` or `"random"`. Controls whether to use rolling window or random sampling of frames to construct chunks. By default `"rolling"`. stp_size : int, optional Number of frames between the center of each chunk when stepping through the data with rolling windows. Only used if `method is "rolling"`. By default `200`. nchunk : int, optional Number of chunks to sample randomly. Only used if `method is "random"`. By default `100`. max_wnd : int, optional Radius (in pixels) of the disk window used for computing local maxima. Local maximas are defined as pixels with maximum intensity in such a window. By default `10`. diff_thres : int, optional Intensity threshold for the difference between local maxima and its neighbours. Any local maxima that is not birghter than its neighbor (defined by the same disk window) by `diff_thres` intensity values will be filtered out. By default `2`. Returns ------- seeds : pd.DataFrame Seeds dataframe with each seed as a row. Has column "height" and "width" which are location of the seeds. Also has column "seeds" which is an integer showing how many chunks where the seed is considered a local maxima. """ int_path = os.environ["MINIAN_INTERMEDIATE"] print("constructing chunks") idx_fm = varr.coords["frame"] nfm = len(idx_fm) if method == "rolling": nstp = np.ceil(nfm / stp_size) + 1 centers = np.linspace(0, nfm - 1, int(nstp)) hwnd = np.ceil(wnd_size / 2) max_idx = list( map( lambda c: slice( int(np.floor(c - hwnd).clip(0)), int(np.ceil(c + hwnd)) ), centers, ) ) elif method == "random": max_idx = [np.random.randint(0, nfm - 1, wnd_size) for _ in range(nchunk)] print("computing max projections") res = [max_proj_frame(varr, cur_idx) for cur_idx in max_idx] max_res = xr.concat(res, "sample") max_res = save_minian(max_res.rename("max_res"), int_path, overwrite=True) print("calculating local maximum") loc_max = xr.apply_ufunc( local_max_roll, max_res, input_core_dims=[["height", "width"]], output_core_dims=[["height", "width"]], vectorize=True, dask="parallelized", output_dtypes=[np.uint8], kwargs=dict(k0=2, k1=max_wnd, diff=diff_thres), ).sum("sample") seeds = ( loc_max.where(loc_max > 0).rename("seeds").to_dataframe().dropna().reset_index() ) return seeds[["height", "width", "seeds"]]
[docs]def max_proj_frame(varr: xr.DataArray, idx: np.ndarray) -> xr.DataArray: """ Compute max projection on a given subset of frames. Parameters ---------- varr : xr.DataArray The input movie data containing all frames. idx : np.ndarray The subset of frames to use to compute max projection. Returns ------- max_proj : xr.DataArray The max projection. """ return varr.isel(frame=idx).max("frame")
[docs]def local_max_roll( fm: np.ndarray, k0: int, k1: int, diff: Union[int, float] ) -> np.ndarray: """ Compute local maxima of a frame with a range of kernel size. This function wraps around :func:`minian.utilities.local_extreme` and compute local maxima of the input frame with kernels of size ranging from `k0` to `k1`. It then takes the union of all the local maxima, and additionally merge all the connecting local maxima by using the middle pixel. Parameters ---------- fm : np.ndarray The input frame. k0 : int The lower bound (inclusive) of the range of kernel sizes. k1 : int The upper bound (inclusive) of the range of kernel sizes. diff : Union[int, float] Intensity threshold for the difference between local maxima and its neighbours, passed to :func:`minian.utilities.local_extreme`. Returns ------- max_res : np.ndarray The image of local maxima. Has same shape as `fm`, and 1 at local maxima. """ max_ls = [] for ksize in range(k0, k1): selem = disk(ksize) fm_max = local_extreme(fm, selem, diff=diff) max_ls.append(fm_max) lmax = (np.stack(max_ls, axis=0).sum(axis=0) > 0).astype(np.uint8) nlab, max_lab = cv2.connectedComponents(lmax) max_res = np.zeros_like(lmax) for lb in range(1, nlab): area = max_lab == lb if np.sum(area) > 1: crds = tuple(int(np.median(c)) for c in np.where(area)) max_res[crds] = 1 else: max_res[np.where(area)] = 1 return max_res
[docs]def gmm_refine( varr: xr.DataArray, seeds: pd.DataFrame, q=(0.1, 99.9), n_components=2, valid_components=1, mean_mask=True, ) -> Tuple[pd.DataFrame, xr.DataArray, GaussianMixture]: """ Filter seeds by fitting a GMM to peak-to-peak values. This function assume that the distribution of peak-to-peak values of fluorescence across all seeds can be model by a Gaussian Mixture Model (GMM) with different means. It computes peak-to-peak value for all the seeds, then fit a GMM with `n_components` to the distribution, and filter out the seeds belonging to the `n_components - valid_components` number of gaussians with lower means. Parameters ---------- varr : xr.DataArray The input movie data. Should have dimension "spatial" and "frame". seeds : pd.DataFrame The input over-complete set of seeds to be filtered. q : tuple, optional Percentile to use to compute the peak-to-peak values. For a given seed with corresponding fluorescent fluctuation `f`, the peak-to-peak value for that seed is computed as `np.percentile(f, q[1]) - np.percentile(f, q[0])`. By default `(0.1, 99.9)`. n_components : int, optional Number of components (Gaussians) in the GMM model. By default `2`. valid_components : int, optional Number of components (Gaussians) to be considered as modeling the distribution of peak-to-peak values of valid seeds. Should be smaller than `n_components`. By default `1`. mean_mask : bool, optional Whether to apply additional criteria where a seed is valid only if its peak-to-peak value exceeds the mean of the lowest gaussian distribution. Only useful in corner cases where the distribution of the gaussian heavily overlap. By default `True`. Returns ------- seeds : pd.DataFrame The resulting seeds dataframe with an additional column "mask_gmm", indicating whether the seed is considered valid by this function. If the column already exists in input `seeds` it will be overwritten. varr_pv : xr.DataArray The computed peak-to-peak values for each seeds. gmm : GaussianMixture The fitted GMM model object. See Also ------- sklearn.mixture.GaussianMixture """ print("selecting seeds") varr_sub = varr.sel(spatial=[tuple(hw) for hw in seeds[["height", "width"]].values]) print("computing peak-valley values") varr_valley = xr.apply_ufunc( np.percentile, varr_sub.chunk(dict(frame=-1)), input_core_dims=[["frame"]], kwargs=dict(q=q[0], axis=-1), dask="parallelized", output_dtypes=[varr_sub.dtype], ) varr_peak = xr.apply_ufunc( np.percentile, varr_sub.chunk(dict(frame=-1)), input_core_dims=[["frame"]], kwargs=dict(q=q[1], axis=-1), dask="parallelized", output_dtypes=[varr_sub.dtype], ) varr_pv = varr_peak - varr_valley varr_pv = varr_pv.compute() print("fitting GMM models") dat = varr_pv.values.reshape(-1, 1) gmm = GaussianMixture(n_components=n_components) gmm.fit(dat) idg = np.argsort(gmm.means_.reshape(-1))[-valid_components:] idx_valid = np.isin(gmm.predict(dat), idg) if mean_mask: idx_mean = dat > np.sort(gmm.means_)[0] idx_valid = np.logical_and(idx_mean.squeeze(), idx_valid) seeds["mask_gmm"] = idx_valid return seeds, varr_pv, gmm
[docs]def pnr_refine( varr: xr.DataArray, seeds: pd.DataFrame, noise_freq=0.25, thres: Union[float, str] = 1.5, q=(0.1, 99.9), med_wnd: Optional[int] = None, ) -> Tuple[pd.DataFrame, xr.DataArray, Optional[GaussianMixture]]: """ Filter seeds by thresholding peak-to-noise ratio. For each input seed, the noise is defined as high-pass filtered fluorescence trace of the seed. The peak-to-noise ratio (pnr) of that seed is then defined as the ratio between the peak-to-peak value of the originial fluorescence trace and that of the noise trace. Optionally, if abrupt changes in baseline fluorescence is expected, then the baseline can be estimated by median-filtering the fluorescence trace and subtracted from the original trace before computing the peak-to-noise ratio. In addition, if a hard threshold of pnr is not desired, then a Gaussian Mixture Model with 2 components can be fitted to the distribution of pnr across all seeds, and only seeds with pnr belonging to the higher-mean Gaussian will be considered valide. Parameters ---------- varr : xr.DataArray Input movie data, should have dimensions "height", "width" and "frame". seeds : pd.DataFrame The input over-complete set of seeds to be filtered. noise_freq : float, optional Cut-off frequency for the high-pass filter used to define noise, specified as fraction of sampling frequency. By default `0.25`. thres : Union[float, str], optional Threshold of the peak-to-noise ratio. If `"auto"` then a :class:`GMM <sklearn.mixture.GaussianMixture>` will be fit to the distribution of pnr. By default `1.5`. q : tuple, optional Percentile to use to compute the peak-to-peak values. For a given fluorescence fluctuation `f`, the peak-to-peak value for that seed is computed as `np.percentile(f, q[1]) - np.percentile(f, q[0])`. By default `(0.1, 99.9)`. med_wnd : int, optional Size of the median filter window to remove baseline. If `None` then no filtering will be done. By default `None`. Returns ------- seeds : pd.DataFrame The resulting seeds dataframe with an additional column "mask_pnr", indicating whether the seed is considered valid by this function. If the column already exists in input `seeds` it will be overwritten. pnr : xr.DataArray The computed peak-to-noise ratio for each seeds. gmm : GaussianMixture, optional The GMM model object fitted to the distribution of pnr. Will be `None` unless `thres` is `"auto"`. """ print("selecting seeds") # vectorized indexing on dask arrays produce a single chunk. # to memory issue, split seeds into 128 chunks, with chunk size no greater than 100 chk_size = min(int(len(seeds) / 128), 100) vsub_ls = [] for _, seed_sub in seeds.groupby(np.arange(len(seeds)) // chk_size): vsub = varr.sel( height=seed_sub["height"].to_xarray(), width=seed_sub["width"].to_xarray() ) vsub_ls.append(vsub) varr_sub = xr.concat(vsub_ls, "index") if med_wnd: print("removing baseline") varr = xr.apply_ufunc( med_baseline, varr_sub, input_core_dims=[["frame"]], output_core_dims=[["frame"]], dask="parallelized", kwargs={"wnd": med_wnd}, vectorize=True, output_dtypes=[varr.dtype], ) print("computing peak-noise ratio") pnr = xr.apply_ufunc( pnr_perseed, varr_sub, input_core_dims=[["frame"]], output_core_dims=[[]], kwargs={"freq": noise_freq, "q": q}, vectorize=True, dask="parallelized", output_dtypes=[float], ).compute() if thres == "auto": gmm = GaussianMixture(n_components=2) gmm.fit(np.nan_to_num(pnr.values.reshape(-1, 1))) idg = np.argsort(gmm.means_.reshape(-1))[-1] idx_valid = np.isin(gmm.predict(pnr.values.reshape(-1, 1)), idg) seeds["mask_pnr"] = idx_valid else: mask = pnr > thres mask_df = mask.to_pandas().rename("mask_pnr") seeds["mask_pnr"] = mask_df gmm = None return seeds, pnr, gmm
[docs]def ptp_q(a: np.ndarray, q: tuple) -> float: """ Compute peak-to-peak value of input with percentile values. Parameters ---------- a : np.ndarray Input array. q : tuple Tuple specifying low and high percentile values. Returns ------- ptp : float The peak-to-peak value. """ return np.percentile(a, q[1]) - np.percentile(a, q[0])
[docs]def pnr_perseed(a: np.ndarray, freq: float, q: tuple) -> float: """ Compute peak-to-noise ratio of a given timeseries. Parameters ---------- a : np.ndarray Input timeseries. freq : float Cut-off frequency of the high-pass filtering used to define noise. q : tuple Percentile used to compute peak-to-peak values. Returns ------- pnr : float Peak-to-noise ratio. See Also ------- pnr_refine : for definition of peak-to-noise ratio """ ptp = ptp_q(a, q) a = filt_fft(a, freq, btype="high") ptp_noise = ptp_q(a, q) return ptp / ptp_noise
[docs]def intensity_refine( varr: xr.DataArray, seeds: pd.DataFrame, thres_mul=2 ) -> pd.DataFrame: """ Filter seeds by thresholding the intensity of their corresponding pixels in the max projection of the movie. This function generate a histogram of the max projection by spliting the intensity into bins of roughly 10 pixels. Then the intensity threshold is defined as the intensity of the peak of the histogram times `thres_mul`. Parameters ---------- varr : xr.DataArray Input movie data. Should have dimensions "height", "width" and "frame". seeds : pd.DataFrame The input over-complete set of seeds to be filtered. thres_mul : int, optional Scalar multiplied to the intensity value corresponding to the peak of max projection histogram. By default `2`, which can be interpreted as "seeds are only valid if they are more than twice as bright as the majority of the pixels". Returns ------- seeds : pd.DataFrame The resulting seeds dataframe with an additional column "mask_int", indicating whether the seed is considered valid by this function. """ try: fm_max = varr.max("frame") except ValueError: print("using input as max projection") fm_max = varr bins = np.around(fm_max.sizes["height"] * fm_max.sizes["width"] / 10).astype(int) hist, edges = np.histogram(fm_max, bins=bins) try: thres = edges[int(np.around(np.argmax(hist) * thres_mul))] except IndexError: print("threshold out of bound, returning input") return seeds mask = (fm_max > thres).stack(spatial=["height", "width"]) mask_df = mask.to_pandas().rename("mask_int").reset_index() seeds = pd.merge(seeds, mask_df, on=["height", "width"], how="left") return seeds
[docs]def ks_refine(varr: xr.DataArray, seeds: pd.DataFrame, sig=0.01) -> pd.DataFrame: """ Filter the seeds using Kolmogorov-Smirnov (KS) test. This function assume that the valid seeds’ fluorescence across frames notionally follows a bimodal distribution: with a large normal distribution representing baseline activity, and a second peak representing when the seed/cell is active. KS allows to discard the seeds where the null-hypothesis (i.e. the fluorescence intensity is simply a normal distribution) is rejected at `sig`. Parameters ---------- varr : xr.DataArray Input movie data. Should have dimensions "height", "width" and "frame". seeds : pd.DataFrame The input over-complete set of seeds to be filtered. sig : float, optional The significance threshold to reject null-hypothesis. By default `0.01`. Returns ------- seeds : pd.DataFrame The resulting seeds dataframe with an additional column "mask_ks", indicating whether the seed is considered valid by this function. If the column already exists in input `seeds` it will be overwritten. """ print("selecting seeds") # vectorized indexing on dask arrays produce a single chunk. # to memory issue, split seeds into 128 chunks, with chunk size no greater than 100 chk_size = min(int(len(seeds) / 128), 100) vsub_ls = [] for _, seed_sub in seeds.groupby(np.arange(len(seeds)) // chk_size): vsub = varr.sel( height=seed_sub["height"].to_xarray(), width=seed_sub["width"].to_xarray() ) vsub_ls.append(vsub) varr_sub = xr.concat(vsub_ls, "index") print("performing KS test") ks = xr.apply_ufunc( ks_perseed, varr_sub, input_core_dims=[["frame"]], output_core_dims=[[]], vectorize=True, dask="parallelized", output_dtypes=[float], ).compute() ks = (ks < sig).to_pandas().rename("mask_ks") seeds["mask_ks"] = ks return seeds
[docs]def ks_perseed(a: np.ndarray) -> float: """ Perform KS test on input and return the p-value. Parameters ---------- a : np.ndarray Input data. Returns ------- p : float The p-value of the KS test. See Also ------- scipy.stats.kstest """ a = zscore(a) return kstest(a, "norm")[1]
[docs]def seeds_merge( varr: xr.DataArray, max_proj: xr.DataArray, seeds: pd.DataFrame, thres_dist=5, thres_corr=0.6, noise_freq: Optional[float] = None, ) -> pd.DataFrame: """ Merge seeds based on spatial distance and temporal correlation of their activities. This function build an adjacency matrix by thresholding spatial distance between seeds and temporal correlation between activities of seeds. It then merge seeds using the adjacency matrix by only keeping the seed with maximum intensity in the max projection within each connected group of seeds. The merge is therefore transitive. Parameters ---------- varr : xr.DataArray Input movie data. Should have dimension "height", "width" and "frame". max_proj : xr.DataArray Max projection of the movie data. seeds : pd.DataFrame Dataframe of seeds to be merged. thres_dist : int, optional Threshold of distance between seeds in pixel. By default `5`. thres_corr : float, optional Threshold of temporal correlation between activities of seeds. By default `0.6`. noise_freq : float, optional Cut-off frequency for optional smoothing of activities before computing the correlation. If `None` then no smoothing will be done. By default `None`. Returns ------- seeds : pd.DataFrame The resulting seeds dataframe with an additional column "mask_mrg", indicating whether the seed should be kept after the merge. If the column already exists in input `seeds` it will be overwritten. """ print("computing distance") nng = radius_neighbors_graph(seeds[["height", "width"]], thres_dist) print("computing correlations") adj = adj_corr(varr, nng, seeds[["height", "width"]], noise_freq) print("merging seeds") adj = adj > thres_corr adj = adj + adj.T labels = label_connected(adj, only_connected=True) iso = np.where(labels < 0)[0] seeds_final = set(iso.tolist()) for cur_cmp in np.unique(labels): if cur_cmp < 0: continue cur_smp = np.where(labels == cur_cmp)[0] cur_max = np.array( [ max_proj.sel( height=seeds.iloc[s]["height"], width=seeds.iloc[s]["width"] ) for s in cur_smp ] ) max_seed = cur_smp[np.argmax(cur_max)] seeds_final.add(max_seed) seeds["mask_mrg"] = False seeds.loc[list(seeds_final), "mask_mrg"] = True return seeds
[docs]def initA( varr: xr.DataArray, seeds: pd.DataFrame, thres_corr=0.8, wnd=10, noise_freq: Optional[float] = None, ) -> xr.DataArray: """ Initialize spatial footprints from seeds. For each input seed, this function compute the correlation between the fluorescence activity of the seed and those of its neighboring pixels up to `wnd` pixels. It then set all correlation below `thres_corr` to zero, and use the resulting correlation image as the resutling spatial footprint of the seed. Parameters ---------- varr : xr.DataArray Input movie data. Should have dimension "height", "width" and "frame". seeds : pd.DataFrame Dataframe of seeds. thres_corr : float, optional Threshold of correlation, below which the values will be set to zero in the resulting spatial footprints. By default `0.8`. wnd : int, optional Radius (in pixels) of a disk window within which correlation will be computed for each seed. By default `10`. noise_freq : float, optional Cut-off frequency for optional smoothing of activities before computing the correlation. If `None` then no smoothing will be done. By default `None`. Returns ------- A : xr.DataArray The initial estimation of spatial footprint for each cell. Should have dimensions ("unit_id", "height", "width"). See Also ------- minian.cnmf.graph_optimize_corr : for how the correlation are computed in an out-of-core fashion """ print("optimizing computation graph") nod_df = pd.DataFrame( np.array( list(itt.product(varr.coords["height"].values, varr.coords["width"].values)) ), columns=["height", "width"], ).merge(seeds.reset_index(), how="outer", on=["height", "width"]) seed_df = nod_df[nod_df["index"].notnull()] nn_tree = KDTree(nod_df[["height", "width"]], leaf_size=(2 * wnd) ** 2) nns_arr = nn_tree.query_radius(seed_df[["height", "width"]], r=wnd) sdg = nx.Graph() sdg.add_nodes_from( [ (i, d) for i, d in enumerate( nod_df[["height", "width", "index"]].to_dict("records") ) ] ) for isd, nns in enumerate(nns_arr): cur_sd = seed_df.index[isd] sdg.add_edges_from([(cur_sd, n) for n in nns if n != cur_sd]) sdg.remove_nodes_from(list(nx.isolates(sdg))) sdg = nx.convert_node_labels_to_integers(sdg) corr_df = graph_optimize_corr(varr, sdg, noise_freq) print("building spatial matrix") corr_df = corr_df[corr_df["corr"] > thres_corr] nod_df = pd.DataFrame.from_dict(dict(sdg.nodes(data=True)), orient="index") seed_df = nod_df[nod_df["index"].notnull()].astype({"index": int}) A_ls = [] ih_dict = ( varr.coords["height"] .to_series() .reset_index(drop=True) .reset_index() .set_index("height")["index"] .to_dict() ) iw_dict = ( varr.coords["width"] .to_series() .reset_index(drop=True) .reset_index() .set_index("width")["index"] .to_dict() ) Ashape = (varr.sizes["height"], varr.sizes["width"]) for seed_id, sd in seed_df.iterrows(): src_corr = corr_df[corr_df["target"] == seed_id].copy() src_nods = nod_df.loc[src_corr["source"]] src_corr["height"], src_corr["width"] = ( src_nods["height"].values, src_nods["width"].values, ) tgt_corr = corr_df[corr_df["source"] == seed_id].copy() tgt_nods = nod_df.loc[tgt_corr["target"]] tgt_corr["height"], tgt_corr["width"] = ( tgt_nods["height"].values, tgt_nods["width"].values, ) cur_corr = pd.concat([src_corr, tgt_corr]).append( {"corr": 1, "height": sd["height"], "width": sd["width"]}, ignore_index=True ) cur_corr["iheight"] = cur_corr["height"].map(ih_dict) cur_corr["iwidth"] = cur_corr["width"].map(iw_dict) cur_A = darr.array( sparse.COO( cur_corr[["iheight", "iwidth"]].T, cur_corr["corr"], shape=Ashape ) ) A_ls.append(cur_A) A = xr.DataArray( darr.stack(A_ls).map_blocks(lambda a: a.todense(), dtype=float), dims=["unit_id", "height", "width"], coords={ "unit_id": seed_df["index"].values, "height": varr.coords["height"].values, "width": varr.coords["width"].values, }, ) return A
[docs]def initC(varr: xr.DataArray, A: xr.DataArray) -> xr.DataArray: """ Initialize temporal component given spatial footprints. The temporal component is computed as the least-square solution between the input movie and the spatial footprints over the "height" and "width" dimensions. Parameters ---------- varr : xr.DataArray Input movie data. Should have dimensions ("height", "width", "frame"). A : xr.DataArray Spatial footprints of cells. Should have dimensions ("unit_id", "height", "width"). Returns ------- C : xr.DataArray The initial estimation of temporal components for each cell. Should have dimensions ("unit_id", "frame"). """ uids = A.coords["unit_id"] fms = varr.coords["frame"] A = ( A.stack(spatial=["height", "width"]) .transpose("spatial", "unit_id") .data.map_blocks(csc_matrix) .rechunk(-1) .persist() ) varr = varr.stack(spatial=["height", "width"]).transpose("frame", "spatial").data C = sps_lstsq(A, varr, iter_lim=10) C = xr.DataArray( C, dims=["frame", "unit_id"], coords={"unit_id": uids, "frame": fms} ).transpose("unit_id", "frame") return C
[docs]@darr.as_gufunc(signature="(h, w)->(h, w)", output_dtypes=int, allow_rechunk=True) def da_label(im: np.ndarray) -> np.ndarray: """ Label connected features in a 2d array. Parameters ---------- im : np.ndarray Input array. Returns ------- label : np.ndarray Label array. Should have same shape as input `im`. See Also ------- scipy.ndimage.label """ return label(im)[0]