minian.utilities module¶
-
class
minian.utilities.
TaskAnnotation
[source]¶ Bases:
distributed.diagnostics.plugin.SchedulerPlugin
Custom SchedulerPlugin that implements per-task annotation. The annotations are applied according to the module constant
ANNOTATIONS
.
-
minian.utilities.
custom_arr_optimize
(dsk, keys, fast_funcs=[<function getter_inline>, <function getter>, <built-in function getitem>, <class 'zarr.core.Array'>, <function astype>, <function concatenate_axes>, <function _vindex_slice>, <function _vindex_merge>, <function _vindex_transpose>], inline_patterns=[], rename_dict=None, rewrite_dict=None, keep_patterns=[])[source]¶ Customized implementation of array optimization function.
- Parameters
dsk (dict) – Input dask task graph.
keys (list) – Output task keys.
fast_funcs (list, optional) – List of fast functions to be inlined. By default
FAST_FUNCTIONS
.
inline_patterns (list, optional) – List of patterns of task keys to be inlined. By default [].
rename_dict (dict, optional) – Dictionary mapping old task keys to new ones. Only used during fusing of tasks. By default None.
rewrite_dict (dict, optional) – Dictionary mapping old task key substrings to new ones. Applied at the end of optimization to all task keys. By default None.
keep_patterns (list, optional) – List of patterns of task keys that should be preserved during optimization. By default [].
- Returns
dsk (dict) – Optimized dask graph.
See also
Optimization dask.array.optimization.optimize
-
minian.utilities.
custom_delay_optimize
(dsk, keys, fast_functions=[], inline_patterns=[], **kwargs)[source]¶ Custom optimization functions for delayed tasks.
By default only fusing of tasks will be carried out.
- Parameters
- Returns
dsk (dict) – Optimized dask graph.
-
minian.utilities.
custom_fused_keys_renamer
(keys, max_fused_key_length=120, rename_dict=None)[source]¶ Custom implementation to create new keys for fused tasks.
Uses the custom split_key implementation.
- Parameters
- Returns
fused_key (str) – The fused task key.
See also
-
minian.utilities.
factors
(x)[source]¶ Compute all factors of an integer.
- Parameters
x (int) – Input integer.
- Returns
factors (List[int]) – List of factors of x.
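As an illustration of what "all factors" means here, the following is a minimal standalone sketch. The name all_factors is hypothetical, the sketch assumes a positive input, and it is not the implementation used by minian.

```python
from typing import List


def all_factors(x: int) -> List[int]:
    """Return every positive divisor of x in ascending order (illustrative sketch)."""
    if x < 1:
        raise ValueError("x must be a positive integer")
    small, large = [], []
    i = 1
    # Divisors come in pairs (i, x // i), so it is enough to scan up to sqrt(x).
    while i * i <= x:
        if x % i == 0:
            small.append(i)
            if i != x // i:
                large.append(x // i)
        i += 1
    return small + large[::-1]


print(all_factors(12))  # [1, 2, 3, 4, 6, 12]
```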
-
minian.utilities.
get_chk
(arr)[source]¶ Get chunks of a xr.DataArray.
- Parameters
arr (xr.DataArray) – The input xr.DataArray
- Returns
chk (dict) – Dictionary mapping dimension names to chunks.
-
minian.utilities.
get_chunksize
(arr)[source]¶ Get chunk size of a xr.DataArray.
- Parameters
arr (xr.DataArray) – The input xr.DataArray.
- Returns
chk (dict) – Dictionary mapping dimension names to chunk sizes.
-
minian.utilities.
get_keys_pat
(pat, keys, return_all=False)[source]¶ Filter a list of task keys by pattern.
- Parameters
- Returns
keys (Union[list, str]) – If return_all is True then a list of keys will be returned. Otherwise only one key will be returned.
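The return behaviour described above can be sketched as follows. This is only an illustration: the name filter_keys is hypothetical, and the sketch assumes that keys are plain strings, that matching is done with re.search, and that the single-key case returns the first match. None of these details are confirmed by this page.

```python
import re
from typing import List, Union


def filter_keys(pat: str, keys: List[str], return_all: bool = False) -> Union[List[str], str]:
    """Filter task keys by a regex pattern (illustrative sketch)."""
    # Assumption: a key matches if the pattern is found anywhere in it.
    matched = [k for k in keys if re.search(pat, k)]
    if return_all:
        return matched
    # Assumption: return the first match; raises IndexError if nothing matched.
    return matched[0]


task_keys = ["load_avi_ffmpeg-1a2b", "smooth_corr-3c4d", "load_avi_ffmpeg-5e6f"]
print(filter_keys("load_avi", task_keys, return_all=True))
print(filter_keys("smooth", task_keys))
```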
-
minian.utilities.
get_optimal_chk
(arr, dim_grp=[('frame'), ('height', 'width')], csize=256, dtype=None)[source]¶ Compute the optimal chunk size across all dimensions of the input array.
This function uses the dask autochunking mechanism to determine the optimal chunk size of an array. The difference between this and directly using “auto” as chunksize is that it understands which dimensions are usually chunked together with the help of dim_grp. It also supports computing chunks for a custom dtype and an explicit chunk size requirement.
- Parameters
arr (xr.DataArray) – The input array to estimate for chunk size.
dim_grp (list, optional) – List of tuples specifying which dimensions are usually chunked together during computation. For each tuple in the list, it is assumed that only dimensions in the tuple will be chunked while all other dimensions in the input arr will not be chunked. Each dimension in the input arr should appear once and only once across the list. By default [(“frame”,), (“height”, “width”)].
csize (int, optional) – The desired space each chunk should occupy, specified in MB. By default 256.
dtype (type, optional) – The datatype of arr during actual computation in case that will be different from the current arr.dtype. By default None.
- Returns
chk (dict) – Dictionary mapping dimension names to chunk sizes.
-
minian.utilities.
inline_pattern
(dsk, pat_ls, inline_constants)[source]¶ Inline tasks whose keys match certain patterns.
- Parameters
- Returns
dsk (dict) – Dask graph with keys inlined.
See also
-
minian.utilities.
load_avi_ffmpeg
(fname, h, w, f)[source]¶ Load an avi video using ffmpeg.
This function directly invokes ffmpeg using the python-ffmpeg wrapper and retrieves the data from the buffer.
-
minian.utilities.
load_avi_lazy
(fname)[source]¶ Lazy load an avi video.
This function constructs a single delayed task for loading the video as a whole.
- Parameters
fname (str) – The filename of the video to load.
- Returns
arr (darr.array) – The array representation of the video.
-
minian.utilities.
load_tif_lazy
(fname)[source]¶ Lazy load a tif stack of images.
- Parameters
fname (str) – The filename of the tif stack to load.
- Returns
arr (darr.array) – Resulting dask array representation of the tif stack.
-
minian.utilities.
load_videos
(vpath, pattern='msCam[0-9]+\\.avi$', dtype=<class 'numpy.float64'>, downsample=None, downsample_strategy='subset', post_process=None)[source]¶ Load multiple videos in a folder and return a xr.DataArray.
Load videos from the folder specified in vpath and according to the regex pattern, then concatenate them together and return a xr.DataArray representation of the concatenated videos. The videos are sorted by filenames with
natsort.natsorted()
before concatenation. Optionally the data can be downsampled, and the user can pass in a custom callable to post-process the result.
- Parameters
vpath (str) – The path containing the videos to load.
pattern (regexp, optional) – The regexp matching the filenames of the videos. By default r”msCam[0-9]+\.avi$”, which can be interpreted as filenames starting with “msCam” followed by at least one digit, and then followed by “.avi”.
dtype (Union[str, type], optional) – Datatype of the resulting DataArray, by default np.float64.
downsample (dict, optional) – A dictionary mapping dimension names to an integer downsampling factor. The dimension names should be one of “height”, “width” or “frame”. By default None.
downsample_strategy (str, optional) – How the downsampling should be done. Only used if downsample is not None. Either “subset” where data points are taken at an interval specified in downsample, or “mean” where mean will be taken over data within each interval. By default “subset”.
post_process (Callable, optional) – A user-supplied custom function to post-process the resulting array. Four arguments will be passed to the function: the resulting DataArray varr, the input path vpath, the list of matched video filenames vlist, and the list of DataArray before concatenation varr_list. The function should output another valid DataArray. In other words, the function should have signature f(varr: xr.DataArray, vpath: str, vlist: List[str], varr_list: List[xr.DataArray]) -> xr.DataArray. By default None.
- Returns
varr (xr.DataArray) – The resulting array representation of the input movie. Should have dimensions (“frame”, “height”, “width”).
- Raises
FileNotFoundError – if no files under vpath match the pattern pattern
ValueError – if the matched files do not have extension “.avi”, “.mkv” or “.tif”
NotImplementedError – if downsample_strategy is not “subset” or “mean”
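To make the file selection and ordering concrete, here is a small standard-library sketch of how the default pattern filters a folder listing and why natural sorting matters. The filenames are made up, natural_key is a hypothetical stand-in for natsort.natsorted(), and the use of re.search is an assumption about how the pattern is applied.

```python
import re

# Default pattern from the signature of load_videos.
PATTERN = r"msCam[0-9]+\.avi$"


def natural_key(name):
    """Split a name into text and integer runs so that "10" sorts after "2"."""
    return [int(tok) if tok.isdigit() else tok for tok in re.split(r"([0-9]+)", name)]


# Hypothetical folder listing.
files = ["msCam10.avi", "msCam2.avi", "msCam1.avi", "behavCam1.avi", "msCam3.avi.bak", "notes.txt"]

matched = [f for f in files if re.search(PATTERN, f)]
ordered = sorted(matched, key=natural_key)

print(sorted(matched))  # ['msCam1.avi', 'msCam10.avi', 'msCam2.avi']  (plain string sort)
print(ordered)          # ['msCam1.avi', 'msCam2.avi', 'msCam10.avi']  (natural sort)
```

A plain string sort would place msCam10.avi before msCam2.avi and scramble the frame order after concatenation, which is presumably why the videos are sorted naturally.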
-
minian.utilities.
local_extreme
(fm, k, etype='max', diff=0)[source]¶ Find local extremes of a 2d array.
- Parameters
fm (np.ndarray) – The input 2d array.
k (np.ndarray) – Structuring element defining the locality of the result, passed as kernel to
cv2.erode()
and
cv2.dilate()
.
etype (str, optional) – Type of local extreme. Either “min” or “max”. By default “max”.
diff (int, optional) – Threshold of difference between local extreme and its neighbours. By default 0.
- Returns
fm_ext (np.ndarray) – The returned 2d array whose non-zero elements represent the location of local extremes.
- Raises
ValueError – if etype is not “min” or “max”
-
minian.utilities.
med_baseline
(a, wnd)[source]¶ Subtract baseline from a timeseries as estimated by median-filtering the timeseries.
- Parameters
a (np.ndarray) – Input timeseries.
wnd (int) – Window size of the median filter. This parameter is passed as size to
scipy.ndimage.filters.median_filter()
.
- Returns
a (np.ndarray) – Timeseries with baseline subtracted.
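The idea of estimating a baseline with a median filter and subtracting it can be sketched in pure Python as below. This is not the minian implementation: subtract_median_baseline is a hypothetical name, and the window is simply truncated at the edges, which differs from the boundary handling of the scipy median filter.

```python
from statistics import median
from typing import List, Sequence


def subtract_median_baseline(a: Sequence[float], wnd: int) -> List[float]:
    """Subtract a running-median baseline from a timeseries (illustrative sketch)."""
    half = wnd // 2
    out = []
    for i, value in enumerate(a):
        # Assumption: the window is truncated at the array edges.
        lo = max(0, i - half)
        hi = min(len(a), i + half + 1)
        out.append(value - median(a[lo:hi]))
    return out


# A trace with a step in baseline (1 -> 2) and two brief transients.
trace = [1.0, 1.0, 6.0, 1.0, 1.0, 2.0, 2.0, 7.0, 2.0, 2.0]
print(subtract_median_baseline(trace, 3))
# [0.0, 0.0, 5.0, 0.0, 0.0, 0.0, 0.0, 5.0, 0.0, 0.0]
```

The transients survive with their amplitude relative to the local baseline, while the slow step from 1 to 2 is removed.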
-
minian.utilities.
open_minian
(dpath, post_process=None, return_dict=False)[source]¶ Load an existing minian dataset.
If dpath is a file, then it is assumed that the full dataset is saved as a single file, and this function will directly call
xarray.open_dataset()
on dpath. Otherwise if dpath is a directory, then it is assumed that the dataset is saved as a directory of zarr arrays, as produced by
save_minian()
. This function will then iterate through all the directories under input dpath and load them as xr.DataArray with zarr backend, so it is important that the user makes sure every directory under dpath can be loaded this way. The loaded arrays will be combined as either a xr.Dataset or a dict. Optionally a user-supplied custom function can be used to post process the resulting xr.Dataset.
- Parameters
dpath (str) – The path to the minian dataset that should be loaded.
post_process (Callable, optional) – User-supplied function to post process the dataset. Only used if return_dict is False. Two arguments will be passed to the function: the resulting dataset ds and the data path dpath. In other words the function should have signature f(ds: xr.Dataset, dpath: str) -> xr.Dataset. By default None.
return_dict (bool, optional) – Whether to combine the DataArray as dictionary, where the .name attribute will be used as key. Otherwise the DataArray will be combined using xr.merge(…, compat=”no_conflicts”), which will implicitly align the DataArray over all dimensions, so it is important to make sure the coordinates are compatible and will not result in creation of large NaN-padded results. Only used if dpath is a directory, otherwise a xr.Dataset is always returned. By default False.
- Returns
ds (Union[dict, xr.Dataset]) – The resulting dataset. If return_dict is True it will be a dict, otherwise a xr.Dataset.
See also
xarray.open_zarr
for how each directory will be loaded as xr.DataArray
xarray.merge
for how the xr.DataArray will be merged as xr.Dataset
-
minian.utilities.
open_minian_mf
(dpath, index_dims, result_format='xarray', pattern='minian$', sub_dirs=[], exclude=True, **kwargs)[source]¶ Open multiple minian datasets across multiple directories.
This function recursively walks through directories under dpath and tries to load minian datasets from all directories matching pattern. It will then combine them based on index_dims into either a xr.Dataset object or a pd.DataFrame. Optionally a subset of paths can be specified, so that they can either be excluded or white-listed. Additional keyword arguments will be passed directly to
open_minian()
.
- Parameters
dpath (str) – The root folder containing all datasets to be loaded.
index_dims (List[str]) – List of dimensions that can be used to index and merge multiple datasets. All loaded datasets should have unique coordinates in the listed dimensions.
result_format (str, optional) – If “xarray”, the result will be merged together recursively along each dimension listed in index_dims. Users should make sure the coordinates are compatible and the merging will not cause generation of large NaN-padded results. If “pandas”, then a pd.DataFrame is returned, with columns corresponding to index_dims that uniquely identify each dataset, and an additional column named “minian” of object dtype pointing to the loaded minian dataset objects. By default “xarray”.
pattern (regexp, optional) – Pattern of minian dataset directory names. By default r”minian$”.
sub_dirs (List[str], optional) – A list of sub-directories under dpath. Useful if only a subset of datasets under dpath should be recursively loaded. By default [].
exclude (bool, optional) – Whether to exclude directories listed under sub_dirs. If True, then any minian datasets under those specified in sub_dirs will be ignored. If False, then only the datasets under those specified in sub_dirs will be loaded (they still have to be under dpath though). By default True.
- Returns
ds (Union[xr.Dataset, pd.DataFrame]) – The resulting combined datasets. If result_format is “xarray”, then a xr.Dataset will be returned, otherwise a pd.DataFrame will be returned.
- Raises
NotImplementedError – if result_format is not “xarray” or “pandas”
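The interaction between pattern, sub_dirs and exclude can be sketched with the standard library as follows. This only illustrates the directory selection step described above, not the loading or merging. The helper find_dataset_dirs and the folder layout are hypothetical, and matching directory names with re.search is an assumption.

```python
import os
import re
import tempfile


def find_dataset_dirs(dpath, pattern=r"minian$", sub_dirs=(), exclude=True):
    """Collect directories under dpath whose names match pattern (illustrative sketch)."""
    sub_paths = [os.path.normpath(os.path.join(dpath, s)) for s in sub_dirs]
    found = []
    for root, dirs, _files in os.walk(dpath):
        for d in dirs:
            if not re.search(pattern, d):
                continue
            full = os.path.normpath(os.path.join(root, d))
            under_sub = any(full == s or full.startswith(s + os.sep) for s in sub_paths)
            # exclude=True keeps paths outside sub_dirs, exclude=False keeps only those inside.
            if under_sub != exclude:
                found.append(full)
    return sorted(found)


with tempfile.TemporaryDirectory() as root:
    for rel in ("alpha/s1/minian", "alpha/s2/minian", "beta/s1/minian"):
        os.makedirs(os.path.join(root, *rel.split("/")))
    everything = [os.path.relpath(p, root) for p in find_dataset_dirs(root)]
    without_beta = [os.path.relpath(p, root) for p in find_dataset_dirs(root, sub_dirs=["beta"])]
    only_beta = [
        os.path.relpath(p, root)
        for p in find_dataset_dirs(root, sub_dirs=["beta"], exclude=False)
    ]

print(everything)
print(without_beta)
print(only_beta)
```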
-
minian.utilities.
optimize_chunk
(arr, chk)[source]¶ Rechunk a xr.DataArray with constrained “rechunk-merge” tasks.
- Parameters
arr (xr.DataArray) – The array to be rechunked.
chk (dict) – The desired chunk size.
- Returns
arr_chk (xr.DataArray) – The rechunked array.
-
minian.utilities.
rechunk_like
(x, y)[source]¶ Rechunk the input x such that its chunks are compatible with y.
- Parameters
x (xr.DataArray) – The array to be rechunked.
y (xr.DataArray) – The array from which chunk information is extracted.
- Returns
x_chk (xr.DataArray) – The rechunked x.
-
minian.utilities.
rewrite_key
(key, rwdict)[source]¶ Rewrite a task key according to rwdict.
- Parameters
- Returns
key (str) – The new key.
- Raises
ValueError – if input key is neither str nor tuple
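A rough sketch of substring-based key rewriting is shown below. The parameter list for this function is not given on this page, so the details are assumptions: rewrite_task_key is a hypothetical name, rwdict is taken to map old substrings to new ones (as described for rewrite_dict in custom_arr_optimize), and tuple keys are assumed to keep their chunk indices. The mapping used in the example is made up.

```python
def rewrite_task_key(key, rwdict):
    """Replace substrings in a task key name according to rwdict (illustrative sketch)."""
    if isinstance(key, str):
        name = key
    elif isinstance(key, tuple):
        # Assumption: the first element of a tuple key is the task name.
        name = key[0]
    else:
        raise ValueError("key must be either str or tuple, got {}".format(type(key)))
    for old, new in rwdict.items():
        name = name.replace(old, new)
    if isinstance(key, tuple):
        # Assumption: chunk indices are kept unchanged.
        return (name,) + key[1:]
    return name


rules = {"rechunk-merge": "merge_restricted"}  # hypothetical mapping
print(rewrite_task_key("rechunk-merge-abc", rules))          # merge_restricted-abc
print(rewrite_task_key(("rechunk-merge-abc", 0, 1), rules))  # ('merge_restricted-abc', 0, 1)
```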
-
minian.utilities.
save_minian
(var, dpath, meta_dict=None, overwrite=False, chunks=None, compute=True, mem_limit='500MB')[source]¶ Save a xr.DataArray with zarr storage backend following minian conventions.
This function will store arbitrary xr.DataArray into dpath with zarr backend. A separate folder will be created under dpath, with folder name var.name + “.zarr”. Optionally metadata can be retrieved from directory hierarchy and added as coordinates of the xr.DataArray. In addition, an on-disk rechunking of the result can be performed using
rechunker.rechunk()
if chunks are given.
- Parameters
var (xr.DataArray) – The array to be saved.
dpath (str) – The path to the minian dataset directory.
meta_dict (dict, optional) – How metadata should be retrieved from directory hierarchy. The keys should be negative integers representing directory level relative to dpath (so -1 means the immediate parent directory of dpath), and values should be the name of dimensions represented by the corresponding level of directory. The actual coordinate value of the dimensions will be the directory name of corresponding level. By default None.
overwrite (bool, optional) – Whether to overwrite the result on disk. By default False.
chunks (dict, optional) – A dictionary specifying the desired chunk size. The chunk size should be specified using Chunks convention, except the “auto” specification is not supported. The rechunking operation will be carried out with on-disk algorithms using
rechunker.rechunk()
. By default None.
compute (bool, optional) – Whether to compute var and save it immediately. By default True.
mem_limit (str, optional) – The memory limit for the on-disk rechunking algorithm, passed to
rechunker.rechunk()
. Only used if chunks is not None. By default “500MB”.
- Returns
var (xr.DataArray) – The array representation of the saved result. If compute is True, then the returned array will only contain delayed tasks that load the on-disk zarr arrays. Otherwise all computation leading to the input var will be preserved in the result.
Examples
The following will save the variable var to directory /spatial_memory/alpha/learning1/minian/important_array.zarr, with the additional coordinates: {“session”: “learning1”, “animal”: “alpha”, “experiment”: “spatial_memory”}.
>>> save_minian(
...     var.rename("important_array"),
...     "/spatial_memory/alpha/learning1/minian",
...     {-1: "session", -2: "animal", -3: "experiment"},
... )
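The way meta_dict levels map onto directory names can be checked with a short standalone sketch. The helper coords_from_path is hypothetical and only mirrors the rule stated for meta_dict (level -1 is the immediate parent of dpath). It is not the code used by save_minian.

```python
from pathlib import PurePosixPath


def coords_from_path(dpath, meta_dict):
    """Derive coordinate values from the directory hierarchy (illustrative sketch)."""
    parts = PurePosixPath(dpath).parts
    # parts[-1] is dpath itself, so level -1 (the immediate parent) is parts[-2].
    return {dim: parts[level - 1] for level, dim in meta_dict.items()}


coords = coords_from_path(
    "/spatial_memory/alpha/learning1/minian",
    {-1: "session", -2: "animal", -3: "experiment"},
)
print(coords)
# {'session': 'learning1', 'animal': 'alpha', 'experiment': 'spatial_memory'}
```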
-
minian.utilities.
split_key
(key, rename_dict=None)[source]¶ Split, rename and filter task keys.
This is a custom implementation that only keeps keys found in
ANNOTATIONS
.
-
minian.utilities.
unique_keys
(keys)[source]¶ Returns only unique keys in a list of task keys.
Dask task keys for arrays are usually tuples representing chunked operations. This function ignores the different chunks and only returns unique keys.
- Parameters
keys (list) – List of dask keys.
- Returns
unique (np.ndarray) – Unique keys.
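The effect of ignoring chunks can be sketched as follows. This is an illustration only: unique_task_names is a hypothetical name, the first element of a tuple key is assumed to be the task name, and the sketch returns a sorted list rather than the np.ndarray documented above.

```python
def unique_task_names(keys):
    """Collapse chunk-level task keys into unique task names (illustrative sketch)."""
    # Assumption: tuple keys look like (name, chunk_index, ...); plain keys are kept as-is.
    names = {k[0] if isinstance(k, tuple) else k for k in keys}
    return sorted(names)


graph_keys = [("load-abc", 0, 0), ("load-abc", 0, 1), ("sum-def", 0), "finalize-ghi"]
print(unique_task_names(graph_keys))  # ['finalize-ghi', 'load-abc', 'sum-def']
```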
-
minian.utilities.
xrconcat_recursive
(var, dims)[source]¶ Recursively concatenate xr.DataArray over multiple dimensions.
- Parameters
var (Union[dict, list]) – Either a dict or a list of xr.DataArray to be concatenated. If a dict then keys should be tuples, with length same as the length of dims and values corresponding to the coordinates that uniquely identify each xr.DataArray. If a list then each xr.DataArray should contain valid coordinates for each dimension specified in dims.
dims (List[str]) – Dimensions to be concatenated over.
- Returns
ds (xr.Dataset) – The concatenated dataset.
- Raises
NotImplementedError – if input var is neither a dict nor a list
-
minian.utilities.
ANNOTATIONS
= {'est_motion_chunk': {'resources': {'MEM': 1}}, 'from-zarr-store': {'resources': {'MEM': 1}}, 'ks_perseed': {'resources': {'MEM': 0.5}}, 'load_avi_ffmpeg': {'resources': {'MEM': 1}}, 'merge_restricted': {'resources': {'MEM': 1}}, 'pnr_perseed': {'resources': {'MEM': 0.5}}, 'smooth_corr': {'resources': {'MEM': 1}}, 'tensordot_restricted': {'resources': {'MEM': 1}}, 'transform_perframe': {'resources': {'MEM': 0.5}}, 'update_spatial_block': {'resources': {'MEM': 1}}, 'update_temporal_block': {'resources': {'MEM': 1}}, 'vectorize_noise_fft': {'resources': {'MEM': 1}}, 'vectorize_noise_welch': {'resources': {'MEM': 1}}}¶ Dask annotations that should be applied to each task.
This is a dict mapping task names (actually patterns) to a dict of dask annotations that should be applied to the tasks. It is mainly used to constrain the number of tasks that can be concurrently in memory on each worker.
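How a pattern-to-annotation mapping of this shape could be looked up for a given task key is sketched below. The helper annotation_for and the example keys are hypothetical, only two entries of the constant are copied for brevity, and matching with re.search is an assumption. How the TaskAnnotation plugin actually applies the annotations is not shown on this page.

```python
import re

# Two entries copied from ANNOTATIONS for illustration.
ANNOTATIONS_SUBSET = {
    "load_avi_ffmpeg": {"resources": {"MEM": 1}},
    "ks_perseed": {"resources": {"MEM": 0.5}},
}


def annotation_for(key, annotations):
    """Return the annotation of the first pattern found in the key name, or None."""
    # Assumption: tuple keys carry the task name as their first element.
    name = key[0] if isinstance(key, tuple) else key
    for pattern, annotation in annotations.items():
        if re.search(pattern, name):
            return annotation
    return None


print(annotation_for(("load_avi_ffmpeg-1a2b", 0), ANNOTATIONS_SUBSET))  # {'resources': {'MEM': 1}}
print(annotation_for("ks_perseed-3c4d", ANNOTATIONS_SUBSET))            # {'resources': {'MEM': 0.5}}
print(annotation_for("sum-5e6f", ANNOTATIONS_SUBSET))                   # None
```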
See also
-
minian.utilities.
FAST_FUNCTIONS
= [<function getter_inline>, <function getter>, <built-in function getitem>, <class 'zarr.core.Array'>, <function astype>, <function concatenate_axes>, <function _vindex_slice>, <function _vindex_merge>, <function _vindex_transpose>]¶ List of fast functions that should be inlined during optimization.
See also