minian.utilities module

class minian.utilities.TaskAnnotation

Bases: distributed.diagnostics.plugin.SchedulerPlugin

Custom SchedulerPlugin that implements per-task annotation. The annotations are applied according to the module constant ANNOTATIONS.

update_graph(scheduler, client, tasks, **kwargs)

Run when a new graph or new tasks enter the scheduler.

minian.utilities.check_key(key, pat)

Check whether key contains the pattern pat.

Parameters
  • key (Union[str, tuple]) – Input key. If a tuple, the first element is used for the check.

  • pat (str) – Pattern to check.

Returns

bool – Whether key contains pattern.

minian.utilities.check_pat(key, pat_ls)

Check whether key contains any pattern in a list.

Parameters
  • key (Union[str, tuple]) – Input key. If a tuple, the first element is used for the check.

  • pat_ls (List[str]) – List of patterns to check.

Returns

bool – Whether key contains any pattern in the list.
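
The two key-matching helpers above can be sketched as follows. This is a minimal illustration, not minian's code: it assumes that "contains" means a regular-expression search (re.search) and that tuple keys are matched on their first element, as documented. The names check_key_sketch and check_pat_sketch are hypothetical.

```python
import re
from typing import List, Union


def check_key_sketch(key: Union[str, tuple], pat: str) -> bool:
    """Hypothetical stand-in for check_key: does the key name contain `pat`?"""
    # Array task keys are tuples like ("name-token", 0, 1); only the name is checked.
    name = key[0] if isinstance(key, tuple) else key
    # Assumption: patterns are regular expressions searched anywhere in the name.
    return re.search(pat, name) is not None


def check_pat_sketch(key: Union[str, tuple], pat_ls: List[str]) -> bool:
    """Hypothetical stand-in for check_pat: does the key contain any of the patterns?"""
    return any(check_key_sketch(key, pat) for pat in pat_ls)


print(check_key_sketch(("ks_perseed-1a2b", 0, 3), "ks_perseed"))  # True
print(check_pat_sketch("smooth_corr-9f8e", ["pnr_perseed", "ks_perseed"]))  # False
```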

minian.utilities.custom_arr_optimize(dsk, keys, fast_funcs=[<function getter_inline>, <function getter>, <built-in function getitem>, <class 'zarr.core.Array'>, <function astype>, <function concatenate_axes>, <function _vindex_slice>, <function _vindex_merge>, <function _vindex_transpose>], inline_patterns=[], rename_dict=None, rewrite_dict=None, keep_patterns=[])

Customized implementation of array optimization function.

Parameters
  • dsk (dict) – Input dask task graph.

  • keys (list) – Output task keys.

  • fast_funcs (list, optional) – List of fast functions to be inlined. By default FAST_FUNCTIONS.

  • inline_patterns (list, optional) – List of patterns of task keys to be inlined. By default [].

  • rename_dict (dict, optional) – Dictionary mapping old task keys to new ones. Only used during fusing of tasks. By default None.

  • rewrite_dict (dict, optional) – Dictionary mapping old task key substrings to new ones. Applied at the end of optimization to all task keys. By default None.

  • keep_patterns (list, optional) – List of patterns of task keys that should be preserved during optimization. By default [].

Returns

dsk (dict) – Optimized dask graph.

See also

Optimization, dask.array.optimization.optimize

minian.utilities.custom_delay_optimize(dsk, keys, fast_functions=[], inline_patterns=[], **kwargs)

Custom optimization function for delayed tasks.

By default, only task fusion is carried out.

Parameters
  • dsk (dict) – Input dask task graph.

  • keys (list) – Output task keys.

  • fast_functions (list, optional) – List of fast functions to be inlined. By default [].

  • inline_patterns (list, optional) – List of patterns of task keys to be inlined. By default [].

Returns

dsk (dict) – Optimized dask graph.

minian.utilities.custom_fused_keys_renamer(keys, max_fused_key_length=120, rename_dict=None)

Custom implementation to create new keys for fused tasks.

Uses the custom split_key() implementation.

Parameters
  • keys (list) – List of task keys that should be fused together.

  • max_fused_key_length (int, optional) – Used to limit the maximum string length for each renamed key. If None, there is no limit. By default 120.

  • rename_dict (dict, optional) – Dictionary used to rename keys during fuse. By default None.

Returns

fused_key (str) – The fused task key.

minian.utilities.factors(x)

Compute all factors of an integer.

Parameters

x (int) – The input integer.

Returns

factors (List[int]) – List of factors of x.
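
A minimal sketch of what "all factors" means, assuming positive divisors returned in ascending order (the ordering of minian's actual result is not documented here). factors_sketch is a hypothetical name; it only tests candidates up to the square root and pairs each divisor i with x // i.

```python
from typing import List


def factors_sketch(x: int) -> List[int]:
    """Hypothetical stand-in for factors(): all positive divisors of x, ascending."""
    if x < 1:
        raise ValueError("x must be a positive integer")
    small, large = [], []
    i = 1
    # Divisors come in pairs (i, x // i), so only i <= sqrt(x) needs testing.
    while i * i <= x:
        if x % i == 0:
            small.append(i)
            if i != x // i:  # avoid listing a square root twice
                large.append(x // i)
        i += 1
    return small + large[::-1]


print(factors_sketch(12))  # [1, 2, 3, 4, 6, 12]
```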

minian.utilities.get_chk(arr)

Get the chunks of an xr.DataArray.

Parameters

arr (xr.DataArray) – The input xr.DataArray.

Returns

chk (dict) – Dictionary mapping dimension names to chunks.

minian.utilities.get_chunksize(arr)

Get the chunk size of an xr.DataArray.

Parameters

arr (xr.DataArray) – The input xr.DataArray.

Returns

chk (dict) – Dictionary mapping dimension names to chunk sizes.
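
The difference between get_chk() and get_chunksize() can be illustrated without xarray. A dask array records, per dimension, the tuple of all block lengths, while dask's Array.chunksize reports the largest block length per dimension. The sketch below assumes regular chunking along each dimension; regular_chunks is a hypothetical helper, not minian code.

```python
from typing import Dict, Tuple


def regular_chunks(length: int, chunksize: int) -> Tuple[int, ...]:
    """Split a dimension of `length` into blocks of at most `chunksize` (hypothetical)."""
    n_full, rest = divmod(length, chunksize)
    return (chunksize,) * n_full + ((rest,) if rest else ())


shape = {"frame": 250, "height": 480, "width": 752}
requested = {"frame": 100, "height": 480, "width": 752}

# All block lengths per dimension, analogous to what get_chk reports.
chk: Dict[str, Tuple[int, ...]] = {d: regular_chunks(shape[d], requested[d]) for d in shape}
# Largest block per dimension, analogous to what get_chunksize reports.
chunksize = {d: max(c) for d, c in chk.items()}

print(chk)        # {'frame': (100, 100, 50), 'height': (480,), 'width': (752,)}
print(chunksize)  # {'frame': 100, 'height': 480, 'width': 752}
```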

minian.utilities.get_keys_pat(pat, keys, return_all=False)

Filter a list of task keys by pattern.

Parameters
  • pat (str) – Pattern to check.

  • keys (list) – List of keys to be filtered.

  • return_all (bool, optional) – Whether to return all keys matching pat. If False then only the first match will be returned. By default False.

Returns

keys (Union[list, str]) – If return_all is True then a list of keys will be returned. Otherwise only one key will be returned.
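
A minimal sketch of the filtering behaviour described above, assuming string keys and the same regular-expression search as check_key. get_keys_pat_sketch is a hypothetical name. What the real function does when nothing matches is not documented; this sketch simply raises IndexError in that case.

```python
import re
from typing import List, Union


def get_keys_pat_sketch(pat: str, keys: List[str], return_all: bool = False) -> Union[List[str], str]:
    """Hypothetical stand-in for get_keys_pat() on string keys."""
    matches = [k for k in keys if re.search(pat, k)]
    if return_all:
        return matches
    # Only the first match is returned (IndexError if there is none, in this sketch).
    return matches[0]


keys = ["load_avi_ffmpeg-a1", "transform_perframe-b2", "transform_perframe-c3"]
print(get_keys_pat_sketch("transform_perframe", keys))  # 'transform_perframe-b2'
print(get_keys_pat_sketch("transform_perframe", keys, return_all=True))
# ['transform_perframe-b2', 'transform_perframe-c3']
```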

minian.utilities.get_optimal_chk(arr, dim_grp=[('frame',), ('height', 'width')], csize=256, dtype=None)

Compute the optimal chunk size across all dimensions of the input array.

This function uses dask's auto-chunking mechanism to determine the optimal chunk size of an array. Unlike directly passing “auto” as the chunk size, it understands which dimensions are usually chunked together, with the help of dim_grp. It also supports computing chunks for a custom dtype and an explicit chunk-size requirement.

Parameters
  • arr (xr.DataArray) – The input array to estimate for chunk size.

  • dim_grp (list, optional) – List of tuples specifying which dimensions are usually chunked together during computation. For each tuple in the list, it is assumed that only the dimensions in the tuple will be chunked, while all other dimensions in the input arr will not be chunked. Each dimension in the input arr should appear once and only once across the list. By default [(“frame”,), (“height”, “width”)].

  • csize (int, optional) – The desired space each chunk should occupy, specified in MB. By default 256.

  • dtype (type, optional) – The datatype of arr during the actual computation, if it differs from the current arr.dtype. By default None.

Returns

chk (dict) – Dictionary mapping dimension names to chunk sizes.
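
The arithmetic behind csize and dim_grp can be sketched as follows. This is a simplified approximation, not dask's auto-chunking algorithm (which also rounds to "nice" sizes and balances block shapes): dimensions outside the current group are kept whole, and the remaining per-chunk element budget is spread evenly over the grouped dimensions. It assumes 1 MB = 2**20 bytes; approx_chunks is a hypothetical name.

```python
import math
from typing import Dict, Sequence


def approx_chunks(
    shape: Dict[str, int], group: Sequence[str], itemsize: int, csize_mb: float = 256
) -> Dict[str, int]:
    """Approximate chunk sizes when only the dimensions in `group` are chunked."""
    budget = csize_mb * 2**20 / itemsize  # number of elements allowed per chunk
    # Dimensions outside the group stay whole, so they consume part of the budget.
    whole = math.prod(n for d, n in shape.items() if d not in group)
    per_dim = (budget / whole) ** (1 / len(group))  # spread evenly over grouped dims
    return {
        d: (min(n, max(1, int(per_dim))) if d in group else n) for d, n in shape.items()
    }


shape = {"frame": 1000, "height": 512, "width": 512}
# float64 data (8 bytes per element) with the default csize of 256 MB:
print(approx_chunks(shape, ["frame"], itemsize=8))
# {'frame': 128, 'height': 512, 'width': 512}
print(approx_chunks(shape, ["height", "width"], itemsize=8))
# {'frame': 1000, 'height': 183, 'width': 183}
```

The first call shows why the dtype argument matters: with 4-byte float32 data the same budget would hold twice as many frames per chunk.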

minian.utilities.inline_pattern(dsk, pat_ls, inline_constants)

Inline tasks whose keys match certain patterns.

Parameters
  • dsk (dict) – Input dask graph.

  • pat_ls (List[str]) – List of patterns to check.

  • inline_constants (bool) – Whether to inline constants.

Returns

dsk (dict) – Dask graph with keys inlined.

minian.utilities.load_avi_ffmpeg(fname, h, w, f)

Load an avi video using ffmpeg.

This function directly invokes ffmpeg through the python-ffmpeg wrapper and retrieves the data from the output buffer.

Parameters
  • fname (str) – The filename of the video to load.

  • h (int) – The height of the video.

  • w (int) – The width of the video.

  • f (int) – The number of frames in the video.

Returns

arr (np.ndarray) – The resulting array. Has shape (f, h, w).
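
The last step, turning ffmpeg's raw output buffer into an (f, h, w) array, can be sketched in pure Python. It assumes ffmpeg was asked for 8-bit grayscale raw video (one byte per pixel, row-major), which this page does not state. With NumPy the same step would be np.frombuffer(buf, np.uint8).reshape(f, h, w). split_raw_frames is a hypothetical name.

```python
from typing import List


def split_raw_frames(buf: bytes, h: int, w: int, f: int) -> List[List[List[int]]]:
    """Split a raw 8-bit grayscale buffer into f frames of h rows by w columns."""
    frame_size = h * w
    if len(buf) != f * frame_size:
        raise ValueError(f"expected {f * frame_size} bytes, got {len(buf)}")
    frames = []
    for i in range(f):
        frame = buf[i * frame_size : (i + 1) * frame_size]
        # Pixels are stored row by row, w pixels per row.
        frames.append([list(frame[r * w : (r + 1) * w]) for r in range(h)])
    return frames


buf = bytes(range(12))  # 2 frames of 2 x 3 pixels
frames = split_raw_frames(buf, h=2, w=3, f=2)
print(frames[1])  # [[6, 7, 8], [9, 10, 11]]
```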

minian.utilities.load_avi_lazy(fname)

Lazily load an avi video.

This function constructs a single delayed task that loads the video as a whole.

Parameters

fname (str) – The filename of the video to load.

Returns

arr (darr.array) – The array representation of the video.

minian.utilities.load_tif_lazy(fname)

Lazy load a tif stack of images.

Parameters

fname (str) – The filename of the tif stack to load.

Returns

arr (darr.array) – Resulting dask array representation of the tif stack.

minian.utilities.load_tif_perframe(fname, fid)

Load a single image from a tif stack.

Parameters
  • fname (str) – The filename of the tif stack.

  • fid (int) – The index of the image to load.

Returns

arr (np.ndarray) – Array representation of the image.

minian.utilities.load_videos(vpath, pattern='msCam[0-9]+\\.avi$', dtype=<class 'numpy.float64'>, downsample=None, downsample_strategy='subset', post_process=None)

Load multiple videos in a folder and return an xr.DataArray.

Load videos from the folder specified in vpath according to the regex pattern, then concatenate them and return an xr.DataArray representation of the concatenated videos. The videos are sorted by filename with natsort.natsorted() before concatenation. Optionally, the data can be downsampled, and the user can pass in a custom callable to post-process the result.

Parameters
  • vpath (str) – The path containing the videos to load.

  • pattern (regexp, optional) – The regexp matching the filenames of the videos. By default r”msCam[0-9]+\.avi$”, which matches filenames such as “msCam1.avi”: “msCam” followed by one or more digits, and then “.avi” at the end of the name.

  • dtype (Union[str, type], optional) – Datatype of the resulting DataArray, by default np.float64.

  • downsample (dict, optional) – A dictionary mapping dimension names to an integer downsampling factor. The dimension names should be one of “height”, “width” or “frame”. By default None.

  • downsample_strategy (str, optional) – How the downsampling should be done. Only used if downsample is not None. Either “subset” where data points are taken at an interval specified in downsample, or “mean” where mean will be taken over data within each interval. By default “subset”.

  • post_process (Callable, optional) – A user-supplied custom function to post-process the resulting array. Four arguments will be passed to the function: the resulting DataArray varr, the input path vpath, the list of matched video filenames vlist, and the list of DataArray before concatenation varr_list. The function should output another valid DataArray. In other words, the function should have signature f(varr: xr.DataArray, vpath: str, vlist: List[str], varr_list: List[xr.DataArray]) -> xr.DataArray. By default None.

Returns

varr (xr.DataArray) – The resulting array representation of the input movie. Should have dimensions (“frame”, “height”, “width”).

Raises
  • FileNotFoundError – if no files under vpath match pattern

  • ValueError – if the matched files do not have extension “.avi”, “.mkv” or “.tif”

  • NotImplementedError – if downsample_strategy is not “subset” or “mean”
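
The file selection and the two downsampling strategies can be illustrated with the standard library alone. The natural_key helper is a simplified, hypothetical substitute for natsort.natsorted() (it only handles names that share the same text/number layout), and downsample_1d is a hypothetical one-dimensional toy, not how minian downsamples xr.DataArray objects. In this toy version, a trailing block shorter than the factor is still averaged under “mean”; the real behaviour at the boundary is not documented here.

```python
import re
from typing import List


def natural_key(name: str):
    """Sort key treating digit runs as integers, so msCam2 sorts before msCam10."""
    return [int(tok) if tok.isdigit() else tok for tok in re.split(r"(\d+)", name)]


files = ["msCam10.avi", "msCam2.avi", "msCam1.avi", "behavCam1.avi", "msCam3.avi.bak"]
# Default pattern: "msCam", one or more digits, then ".avi" at the end of the name.
pattern = r"msCam[0-9]+\.avi$"
vlist = sorted((f for f in files if re.search(pattern, f)), key=natural_key)
print(vlist)  # ['msCam1.avi', 'msCam2.avi', 'msCam10.avi']


def downsample_1d(values: List[float], factor: int, strategy: str = "subset") -> List[float]:
    """Toy 1-D version of the "subset" and "mean" strategies (hypothetical helper)."""
    if strategy == "subset":
        return values[::factor]  # keep every `factor`-th sample
    if strategy == "mean":
        # Average each consecutive block of `factor` samples.
        blocks = (values[i : i + factor] for i in range(0, len(values), factor))
        return [sum(b) / len(b) for b in blocks]
    raise NotImplementedError(strategy)


print(downsample_1d([1, 2, 3, 4, 5, 6], 2))          # [1, 3, 5]
print(downsample_1d([1, 2, 3, 4, 5, 6], 2, "mean"))  # [1.5, 3.5, 5.5]
```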

minian.utilities.local_extreme(fm, k, etype='max', diff=0)

Find local extrema of a 2D array.

Parameters
  • fm (np.ndarray) – The input 2d array.

  • k (np.ndarray) – Structuring element defining the locality of the result, passed as kernel to cv2.erode() and cv2.dilate().

  • etype (str, optional) – Type of local extreme. Either “min” or “max”. By default “max”.

  • diff (int, optional) – Threshold of difference between local extreme and its neighbours. By default 0.

Returns

fm_ext (np.ndarray) – The returned 2d array whose non-zero elements represent the location of local extremes.

Raises

ValueError – if etype is not “min” or “max”
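
One reading of the description above, sketched in pure Python: a pixel is a local maximum when it equals the maximum of its neighbourhood (the value a grey-scale dilation such as cv2.dilate() produces there), and diff additionally requires the neighbourhood's max-min range to exceed the threshold. How k and diff interact in minian is an assumption here, and a square neighbourhood of radius r, clipped at the border, stands in for the structuring element k. local_extreme_sketch is a hypothetical name.

```python
from typing import List


def local_extreme_sketch(
    fm: List[List[float]], r: int = 1, etype: str = "max", diff: float = 0
) -> List[List[int]]:
    """Mark local extrema of a 2D list using a (2r+1) x (2r+1) square neighbourhood."""
    if etype not in ("min", "max"):
        raise ValueError("etype must be 'min' or 'max'")
    h, w = len(fm), len(fm[0])
    out = [[0] * w for _ in range(h)]
    for i in range(h):
        for j in range(w):
            # Neighbourhood clipped at the array border.
            nb = [fm[y][x] for y in range(max(0, i - r), min(h, i + r + 1))
                  for x in range(max(0, j - r), min(w, j + r + 1))]
            hi, lo = max(nb), min(nb)  # dilation and erosion values at (i, j)
            target = hi if etype == "max" else lo
            if fm[i][j] == target and hi - lo > diff:
                out[i][j] = 1
    return out


fm = [
    [0, 0, 0, 0],
    [0, 5, 0, 0],
    [0, 0, 0, 3],
    [0, 0, 0, 0],
]
print(local_extreme_sketch(fm))
# [[0, 0, 0, 0], [0, 1, 0, 0], [0, 0, 0, 1], [0, 0, 0, 0]]
```

With diff=4 the peak of height 3 would no longer qualify, because its neighbourhood range (3) does not exceed the threshold.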

minian.utilities.med_baseline(a, wnd)

Subtract baseline from a timeseries as estimated by median-filtering the timeseries.

Parameters
  • a (np.ndarray) – Input timeseries.

  • wnd (int) – Window size of the median filter. This parameter is passed as size to scipy.ndimage.filters.median_filter().

Returns

a (np.ndarray) – Timeseries with baseline subtracted.
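
A minimal sketch of median-baseline subtraction on a plain list, assuming an odd wnd. It differs from scipy.ndimage.median_filter() at the edges: scipy reflects the signal by default (mode="reflect"), while this sketch simply clips the window. med_baseline_sketch is a hypothetical name.

```python
import statistics
from typing import List


def med_baseline_sketch(a: List[float], wnd: int) -> List[float]:
    """Subtract a running-median baseline (window clipped at the edges)."""
    half = wnd // 2
    baseline = [statistics.median(a[max(0, i - half) : i + half + 1]) for i in range(len(a))]
    return [float(x - b) for x, b in zip(a, baseline)]


trace = [1, 1, 1, 5, 1, 1, 1]  # flat baseline with a single transient
print(med_baseline_sketch(trace, wnd=3))  # [0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0]
```

The median ignores the short transient, so the baseline stays at 1 and the transient survives the subtraction.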

minian.utilities.open_minian(dpath, post_process=None, return_dict=False)

Load an existing minian dataset.

If dpath is a file, it is assumed that the full dataset is saved as a single file, and this function will directly call xarray.open_dataset() on dpath. Otherwise, if dpath is a directory, it is assumed that the dataset is saved as a directory of zarr arrays, as produced by save_minian(). This function will then iterate through all the directories under dpath and load each of them as an xr.DataArray with the zarr backend, so the user must make sure every directory under dpath can be loaded this way. The loaded arrays are combined into either an xr.Dataset or a dict. Optionally, a user-supplied custom function can be used to post-process the resulting xr.Dataset.

Parameters
  • dpath (str) – The path to the minian dataset that should be loaded.

  • post_process (Callable, optional) – User-supplied function to post process the dataset. Only used if return_dict is False. Two arguments will be passed to the function: the resulting dataset ds and the data path dpath. In other words the function should have signature f(ds: xr.Dataset, dpath: str) -> xr.Dataset. By default None.

  • return_dict (bool, optional) – Whether to combine the DataArrays into a dictionary, using the .name attribute of each as the key. Otherwise, the DataArrays are combined using xr.merge(…, compat=”no_conflicts”), which implicitly aligns them over all dimensions, so make sure the coordinates are compatible and will not produce large NaN-padded results. Only used if dpath is a directory; otherwise an xr.Dataset is always returned. By default False.

Returns

ds (Union[dict, xr.Dataset]) – The resulting dataset. If return_dict is True it will be a dict, otherwise an xr.Dataset.

See also

xarray.open_zarr

for how each directory will be loaded as xr.DataArray

xarray.merge

for how the xr.DataArray will be merged as xr.Dataset

minian.utilities.open_minian_mf(dpath, index_dims, result_format='xarray', pattern='minian$', sub_dirs=[], exclude=True, **kwargs)

Open multiple minian datasets across multiple directories.

This function recursively walks through the directories under dpath and tries to load minian datasets from all directories matching pattern. It then combines them based on index_dims into either an xr.Dataset or a pd.DataFrame. Optionally, a subset of paths can be specified so that they are either excluded or white-listed. Additional keyword arguments are passed directly to open_minian().

Parameters
  • dpath (str) – The root folder containing all datasets to be loaded.

  • index_dims (List[str]) – List of dimensions that can be used to index and merge multiple datasets. All loaded datasets should have unique coordinates in the listed dimensions.

  • result_format (str, optional) – If “xarray”, the results are merged together recursively along each dimension listed in index_dims. Users should make sure the coordinates are compatible and that merging will not generate large NaN-padded results. If “pandas”, a pd.DataFrame is returned, with columns corresponding to index_dims that uniquely identify each dataset, plus an additional column named “minian” of object dtype pointing to the loaded minian dataset objects. By default “xarray”.

  • pattern (regexp, optional) – Pattern of minian dataset directory names. By default r”minian$”.

  • sub_dirs (List[str], optional) – A list of sub-directories under dpath. Useful if only a subset of datasets under dpath should be recursively loaded. By default [].

  • exclude (bool, optional) – Whether to exclude directories listed under sub_dirs. If True, any minian datasets under those specified in sub_dirs will be ignored. If False, only the datasets under those specified in sub_dirs will be loaded (they must still be under dpath). By default True.

Returns

ds (Union[xr.Dataset, pd.DataFrame]) – The resulting combined datasets. If result_format is “xarray”, an xr.Dataset is returned; otherwise a pd.DataFrame is returned.

Raises

NotImplementedError – if result_format is not “xarray” or “pandas”
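
The directory discovery step, including the sub_dirs / exclude semantics, can be sketched with the standard library. This is not minian's implementation: it only collects matching directory paths and does not load or merge anything. It assumes pattern is searched with re.search against each directory's base name. find_minian_dirs is a hypothetical name.

```python
import os
import re
import tempfile
from typing import List, Sequence


def find_minian_dirs(
    dpath: str, pattern: str = r"minian$", sub_dirs: Sequence[str] = (), exclude: bool = True
) -> List[str]:
    """Collect directories under dpath whose names match pattern, honouring sub_dirs."""
    subs = [os.path.join(dpath, s) for s in sub_dirs]
    found = []
    for root, dirs, _ in os.walk(dpath):
        for d in dirs:
            if not re.search(pattern, d):
                continue
            full = os.path.join(root, d)
            under_sub = any(os.path.commonpath([full, s]) == s for s in subs)
            # exclude=True keeps dirs outside sub_dirs; exclude=False keeps only those inside.
            if under_sub != exclude:
                found.append(full)
    return sorted(found)


with tempfile.TemporaryDirectory() as root:
    for p in ["alpha/s1/minian", "alpha/s2/minian", "beta/s1/minian", "beta/s1/other"]:
        os.makedirs(os.path.join(root, *p.split("/")))

    def rel(paths):
        return [os.path.relpath(p, root).replace(os.sep, "/") for p in paths]

    everything = rel(find_minian_dirs(root))
    without_beta = rel(find_minian_dirs(root, sub_dirs=["beta"]))
    only_beta = rel(find_minian_dirs(root, sub_dirs=["beta"], exclude=False))

print(everything)    # ['alpha/s1/minian', 'alpha/s2/minian', 'beta/s1/minian']
print(without_beta)  # ['alpha/s1/minian', 'alpha/s2/minian']
print(only_beta)     # ['beta/s1/minian']
```

Note that r”minian$” also matches names such as “old_minian”; anchoring the pattern with ^ would avoid that if it matters.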

minian.utilities.optimize_chunk(arr, chk)

Rechunk an xr.DataArray with constrained “rechunk-merge” tasks.

Parameters
  • arr (xr.DataArray) – The array to be rechunked.

  • chk (dict) – The desired chunk size.

Returns

arr_chk (xr.DataArray) – The rechunked array.

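minian.utilities.rechunk_like(x, y)

Rechunk the input x such that its chunks are compatible with y.

Parameters
  • x (xr.DataArray) – The array to be rechunked.

  • y (xr.DataArray) – The array whose chunks x should be made compatible with.

Returns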

x_chk (xr.DataArray) – The rechunked x.

minian.utilities.rewrite_key(key, rwdict)

Rewrite a task key according to rwdict.

Parameters
  • key (Union[str, tuple]) – Input task key.

  • rwdict (dict) – Dictionary mapping old task key substrings to new ones. Every key of this dictionary that exists in the input key will be substituted.

Returns

key (str) – The new key.

Raises

ValueError – if input key is neither str nor tuple
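
A minimal sketch of the substitution described above, assuming plain substring replacement applied in dictionary order. The documented return type is str; how the real function treats tuple keys is therefore an assumption here (this sketch rewrites only the name and keeps the chunk indices). rewrite_key_sketch and the example key names are hypothetical.

```python
from typing import Dict, Union


def rewrite_key_sketch(key: Union[str, tuple], rwdict: Dict[str, str]) -> Union[str, tuple]:
    """Hypothetical stand-in for rewrite_key(): substring substitution on the task name."""
    if isinstance(key, tuple):
        name = key[0]
    elif isinstance(key, str):
        name = key
    else:
        raise ValueError(f"key must be either str or tuple: {key!r}")
    for old, new in rwdict.items():
        name = name.replace(old, new)  # assumption: plain substring replacement
    # Assumption: for tuple keys, only the name changes and the chunk indices are kept.
    return (name,) + key[1:] if isinstance(key, tuple) else name


print(rewrite_key_sketch("fused-old_name-abc", {"old_name": "new_name"}))  # fused-new_name-abc
print(rewrite_key_sketch(("getitem-xyz", 0, 2), {"getitem": "subset"}))  # ('subset-xyz', 0, 2)
```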

minian.utilities.save_minian(var, dpath, meta_dict=None, overwrite=False, chunks=None, compute=True, mem_limit='500MB')

Save an xr.DataArray with the zarr storage backend following minian conventions.

This function stores an arbitrary xr.DataArray under dpath with the zarr backend. A separate folder is created under dpath, named var.name + “.zarr”. Optionally, metadata can be retrieved from the directory hierarchy and added as coordinates of the xr.DataArray. In addition, if chunks is given, the result is rechunked on disk using rechunker.rechunk().

Parameters
  • var (xr.DataArray) – The array to be saved.

  • dpath (str) – The path to the minian dataset directory.

  • meta_dict (dict, optional) – How metadata should be retrieved from directory hierarchy. The keys should be negative integers representing directory level relative to dpath (so -1 means the immediate parent directory of dpath), and values should be the name of dimensions represented by the corresponding level of directory. The actual coordinate value of the dimensions will be the directory name of corresponding level. By default None.

  • overwrite (bool, optional) – Whether to overwrite the result on disk. By default False.

  • chunks (dict, optional) – A dictionary specifying the desired chunk size. The chunk size should be specified using the Chunks convention, except that the “auto” specification is not supported. The rechunking operation will be carried out with on-disk algorithms using rechunker.rechunk(). By default None.

  • compute (bool, optional) – Whether to compute var and save it immediately. By default True.

  • mem_limit (str, optional) – The memory limit for the on-disk rechunking algorithm, passed to rechunker.rechunk(). Only used if chunks is not None. By default “500MB”.

Returns

var (xr.DataArray) – The array representation of the saved result. If compute is True, the returned array only contains delayed tasks for loading the on-disk zarr arrays. Otherwise, all computation leading to the input var is preserved in the result.

Examples

The following will save the variable var to directory /spatial_memory/alpha/learning1/minian/important_array.zarr, with the additional coordinates: {“session”: “learning1”, “animal”: “alpha”, “experiment”: “spatial_memory”}.

>>> save_minian(
...     var.rename("important_array"),
...     "/spatial_memory/alpha/learning1/minian",
...     {-1: "session", -2: "animal", -3: "experiment"},
... )

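
The directory-level arithmetic in the example above can be reproduced with pathlib. This hypothetical helper only illustrates how negative levels in meta_dict map to directory names (level -1 is the immediate parent of dpath) and where the .zarr folder ends up; it is not minian's code and only handles POSIX-style paths.

```python
from pathlib import PurePosixPath
from typing import Dict


def meta_from_path(dpath: str, meta_dict: Dict[int, str]) -> Dict[str, str]:
    """Map negative directory levels (relative to dpath) to coordinate values."""
    parts = PurePosixPath(dpath).parts
    # Level -1 is the parent of dpath, i.e. parts[-2]; level n maps to parts[n - 1].
    return {dim: parts[level - 1] for level, dim in meta_dict.items()}


dpath = "/spatial_memory/alpha/learning1/minian"
meta = meta_from_path(dpath, {-1: "session", -2: "animal", -3: "experiment"})
print(meta)
# {'session': 'learning1', 'animal': 'alpha', 'experiment': 'spatial_memory'}
store = PurePosixPath(dpath) / ("important_array" + ".zarr")
print(store)  # /spatial_memory/alpha/learning1/minian/important_array.zarr
```
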
minian.utilities.split_key(key, rename_dict=None)

Split, rename and filter task keys.

This is a custom implementation that only keeps keys found in ANNOTATIONS.

Parameters
  • key (Union[tuple, str]) – The input task key.

  • rename_dict (dict, optional) – Dictionary used to rename keys. By default None.

Returns

new_key (str) – New key.

minian.utilities.unique_keys(keys)

Return only the unique keys in a list of task keys.

Dask task keys for arrays are usually tuples representing chunked operations. This function ignores the chunk indices and only returns unique keys.

Parameters

keys (list) – List of dask keys.

Returns

unique (np.ndarray) – Unique keys.
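
A minimal sketch of collapsing chunked array keys to their names. It assumes that tuple keys are identified by their first element; it returns a sorted list instead of an np.ndarray (np.unique also returns its result sorted). unique_keys_sketch and the example keys are hypothetical.

```python
from typing import List


def unique_keys_sketch(keys: list) -> List[str]:
    """Collapse chunked keys like ("name", i, j) to their unique names, sorted."""
    names = {k[0] if isinstance(k, tuple) else k for k in keys}
    return sorted(names)


keys = [("getitem-a1", 0, 0), ("getitem-a1", 0, 1), ("sum-b2", 0), "finalize-c3"]
print(unique_keys_sketch(keys))  # ['finalize-c3', 'getitem-a1', 'sum-b2']
```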

minian.utilities.xrconcat_recursive(var, dims)

Recursively concatenate xr.DataArray over multiple dimensions.

Parameters
  • var (Union[dict, list]) – Either a dict or a list of xr.DataArray to be concatenated. If a dict, its keys should be tuples of the same length as dims, whose elements are the coordinates that uniquely identify each xr.DataArray. If a list, each xr.DataArray should contain valid coordinates for each dimension specified in dims.

  • dims (List[str]) – Dimensions to be concatenated over.

Returns

ds (xr.Dataset) – The concatenated dataset.

Raises

NotImplementedError – if input var is neither a dict nor a list

minian.utilities.ANNOTATIONS = {'est_motion_chunk': {'resources': {'MEM': 1}}, 'from-zarr-store': {'resources': {'MEM': 1}}, 'ks_perseed': {'resources': {'MEM': 0.5}}, 'load_avi_ffmpeg': {'resources': {'MEM': 1}}, 'merge_restricted': {'resources': {'MEM': 1}}, 'pnr_perseed': {'resources': {'MEM': 0.5}}, 'smooth_corr': {'resources': {'MEM': 1}}, 'tensordot_restricted': {'resources': {'MEM': 1}}, 'transform_perframe': {'resources': {'MEM': 0.5}}, 'update_spatial_block': {'resources': {'MEM': 1}}, 'update_temporal_block': {'resources': {'MEM': 1}}, 'vectorize_noise_fft': {'resources': {'MEM': 1}}, 'vectorize_noise_welch': {'resources': {'MEM': 1}}}

Dask annotations that should be applied to each task.

This is a dict mapping task names (actually patterns) to a dict of dask annotations that should be applied to the matching tasks. It is mainly used to constrain the number of tasks that each worker can hold in memory concurrently.

See also

Worker Resources
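
How a task key might be matched against ANNOTATIONS can be sketched as below, assuming the same pattern search as check_key and that the first matching pattern wins. annotation_for and ANNOTATIONS_EXCERPT are hypothetical; the excerpt copies three entries from the constant above. In dask.distributed, a task that requires a resource only runs on a worker with enough of that resource free (workers declare resources, e.g. with the --resources "MEM=1" option), which is what caps per-worker concurrency here.

```python
import re
from typing import Dict, Optional

# Excerpt of the ANNOTATIONS constant documented above.
ANNOTATIONS_EXCERPT: Dict[str, dict] = {
    "ks_perseed": {"resources": {"MEM": 0.5}},
    "load_avi_ffmpeg": {"resources": {"MEM": 1}},
    "smooth_corr": {"resources": {"MEM": 1}},
}


def annotation_for(key, annotations: Dict[str, dict]) -> Optional[dict]:
    """Return the annotation of the first pattern found in the task name, if any."""
    name = key[0] if isinstance(key, tuple) else key
    for pat, ann in annotations.items():
        if re.search(pat, name):
            return ann
    return None


print(annotation_for(("ks_perseed-3f2a", 7), ANNOTATIONS_EXCERPT))  # {'resources': {'MEM': 0.5}}
print(annotation_for("getitem-99", ANNOTATIONS_EXCERPT))  # None

# A worker advertising MEM=1 can run at most one MEM=1 task,
# or two MEM=0.5 tasks, at the same time.
worker_mem = 1.0
print(int(worker_mem // 0.5))  # 2
```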

minian.utilities.FAST_FUNCTIONS = [<function getter_inline>, <function getter>, <built-in function getitem>, <class 'zarr.core.Array'>, <function astype>, <function concatenate_axes>, <function _vindex_slice>, <function _vindex_merge>, <function _vindex_transpose>]

List of fast functions that should be inlined during optimization.

See also

Optimization