minian.utilities module#
- class minian.utilities.TaskAnnotation[source]#
Bases:
SchedulerPluginCustom SchedulerPlugin that implemented per-task level annotation. The annotations are applied according to the module constant
ANNOTATIONS.- update_graph(scheduler, client, tasks, **kwargs)[source]#
Run when a new graph / tasks enter the scheduler
- Parameters:
scheduler – The Scheduler instance.
client – The unique Client id.
keys – The keys the Client is interested in when calling update_graph.
tasks – All the keys submitted through update_graph (including those not directly requested by the Client)
annotations –
Fully resolved annotations as applied to the tasks in the format:
{ "annotation": { key: value, ... }, ... }
priority – Calculated priorities as assigned to the tasks.
stimulus_id – ID of the stimulus causing the graph update
**kwargs – It is recommended to allow plugins to accept more parameters to ensure future compatibility.
- minian.utilities.custom_arr_optimize(dsk, keys, fast_funcs=[<function getter_inline>, <function getter>, <built-in function getitem>, <class 'zarr.core.Array'>, <function astype>, <function concatenate_axes>, <function _vindex_merge>], inline_patterns=None, rename_dict=None, rewrite_dict=None, keep_patterns=None, **kwargs)[source]#
Customized implementation of array optimization function.
- Parameters:
dsk (dict) – Input dask task graph.
keys (list) – Output task keys.
fast_funcs (list, optional) – list of fast functions to be inlined. By default
FAST_FUNCTIONS.inline_patterns (list, optional) – list of patterns of task keys to be inlined. By default [].
rename_dict (dict, optional) – Dictionary mapping old task key substrings to new ones. Treated as a synonym of rewrite_dict (applied post-hoc as aliased renames for dependency-link safety on dask >=2025). By default None.
rewrite_dict (dict, optional) – Dictionary mapping old task key substrings to new ones. Applied at the end of optimization to all task keys. By default None.
keep_patterns (list, optional) – list of patterns of task keys that should be preserved during optimization. By default [].
- Returns:
dsk (dict) – Optimized dask graph.
See also
Optimization dask.array.optimization.optimize
- minian.utilities.custom_delay_optimize(dsk, keys, fast_functions=None, inline_patterns=None, **kwargs)[source]#
Custom optimization functions for delayed tasks.
By default only fusing of tasks will be carried out.
- Parameters:
- Returns:
dsk (dict) – Optimized dask graph.
- minian.utilities.custom_fused_keys_renamer(keys, max_fused_key_length=120, rename_dict=None)[source]#
Custom implmentation to create new keys for fuse tasks.
Uses custom split_key implementation.
- Parameters:
- Returns:
fused_key (str) – The fused task key.
See also
- minian.utilities.factors(x)[source]#
Compute all factors of an interger.
- Parameters:
x (int) – Input
- Returns:
factors (list[int]) – list of factors of x.
- minian.utilities.get_chk(arr)[source]#
Get chunks of a xr.DataArray.
- Parameters:
arr (xr.DataArray) – The input xr.DataArray
- Returns:
chk (dict) – Dictionary mapping dimension names to chunks.
- minian.utilities.get_chunksize(arr)[source]#
Get chunk size of a xr.DataArray.
- Parameters:
arr (xr.DataArray) – The input xr.DataArray.
- Returns:
chk (dict) – Dictionary mapping dimension names to chunk sizes.
- minian.utilities.get_keys_pat(pat, keys, return_all=False)[source]#
Filter a list of task keys by pattern.
- Parameters:
- Returns:
keys (Union[list, str]) – If return_all is True then a list of keys will be returned. Otherwise only one key will be returned.
- minian.utilities.get_optimal_chk(arr, dim_grp=None, csize=256, dtype=None)[source]#
Compute the optimal chunk size across all dimensions of the input array.
This function use dask autochunking mechanism to determine the optimal chunk size of an array. The difference between this and directly using “auto” as chunksize is that it understands which dimensions are usually chunked together with the help of dim_grp. It also support computing chunks for custom dtype and explicit requirement of chunk size.
- Parameters:
arr (xr.DataArray) – The input array to estimate for chunk size.
dim_grp (list, optional) – list of tuples specifying which dimensions are usually chunked together during computation. For each tuple in the list, it is assumed that only dimensions in the tuple will be chunked while all other dimensions in the input arr will not be chunked. Each dimensions in the input arr should appear once and only once across the list. By default [(“frame”,), (“height”, “width”)].
csize (int, optional) – The desired space each chunk should occupy, specified in MB. By default 256.
dtype (type, optional) – The datatype of arr during actual computation in case that will be different from the current arr.dtype. By default None.
- Returns:
chk (dict) – Dictionary mapping dimension names to chunk sizes.
- minian.utilities.inline_pattern(dsk, pat_ls, inline_constants)[source]#
Inline tasks whose keys match certain patterns.
- Parameters:
- Returns:
dsk (dict) – Dask graph with keys inlined.
See also
- minian.utilities.local_extreme(fm, k, etype='max', diff=0)[source]#
Find local extreme of a 2d array.
- Parameters:
fm (np.ndarray) – The input 2d array.
k (np.ndarray) – Structuring element defining the locality of the result, passed as kernel to
cv2.erode()andcv2.dilate().etype (str, optional) – Type of local extreme. Either “min” or “max”. By default “max”.
diff (int, optional) – Threshold of difference between local extreme and its neighbours. By default 0.
- Returns:
fm_ext (np.ndarray) – The returned 2d array whose non-zero elements represent the location of local extremes.
- Raises:
ValueError – if etype is not “min” or “max”
- minian.utilities.med_baseline(a, wnd)[source]#
Subtract baseline from a timeseries as estimated by median-filtering the timeseries.
- Parameters:
a (np.ndarray) – Input timeseries.
wnd (int) – Window size of the median filter. This parameter is passed as size to
scipy.ndimage.filters.median_filter().
- Returns:
a (np.ndarray) – Timeseries with baseline subtracted.
- minian.utilities.open_minian(dpath, post_process=None, return_dict=False)[source]#
Load an existing minian dataset.
If dpath is a file, then it is assumed that the full dataset is saved as a single file, and this function will directly call
xarray.open_dataset()on dpath. Otherwise if dpath is a directory, then it is assumed that the dataset is saved as a directory of zarr arrays, as produced bysave_minian(). This function will then iterate through all the directories under input dpath and load them as xr.DataArray with zarr backend, so it is important that the user make sure every directory under dpath can be load this way. The loaded arrays will be combined as either a xr.Dataset or a dict. Optionally a user-supplied custom function can be used to post process the resulting xr.Dataset.- Parameters:
dpath (str) – The path to the minian dataset that should be loaded.
post_process (Callable, optional) – User-supplied function to post process the dataset. Only used if return_dict is False. Two arguments will be passed to the function: the resulting dataset ds and the data path dpath. In other words the function should have signature f(ds: xr.Dataset, dpath: str) -> xr.Dataset. By default None.
return_dict (bool, optional) – Whether to combine the DataArray as dictionary, where the .name attribute will be used as key. Otherwise the DataArray will be combined using xr.merge(…, compat=”no_conflicts”), which will implicitly align the DataArray over all dimensions, so it is important to make sure the coordinates are compatible and will not result in creation of large NaN-padded results. Only used if dpath is a directory, otherwise a xr.Dataset is always returned. By default False.
- Returns:
ds (Union[dict, xr.Dataset]) – The resulting dataset. If return_dict is True it will be a dict, otherwise a xr.Dataset.
See also
xarray.open_zarrfor how each directory will be loaded as xr.DataArray
xarray.mergefor how the xr.DataArray will be merged as xr.Dataset
- minian.utilities.open_minian_mf(dpath, index_dims, result_format='xarray', pattern='minian$', sub_dirs=None, exclude=True, **kwargs)[source]#
Open multiple minian datasets across multiple directories.
This function recursively walks through directories under dpath and try to load minian datasets from all directories matching pattern. It will then combine them based on index_dims into either a xr.Dataset object or a pd.DataFrame. Optionally a subset of paths can be specified, so that they can either be excluded or white-listed. Additional keyword arguments will be passed directly to
open_minian().- Parameters:
dpath (str) – The root folder containing all datasets to be loaded.
index_dims (list[str]) – list of dimensions that can be used to index and merge multiple datasets. All loaded datasets should have unique coordinates in the listed dimensions.
result_format (str, optional) – If “xarray”, the result will be merged together recursively along each dimensions listed in index_dims. Users should make sure the coordinates are compatible and the merging will not cause generation of large NaN-padded results. If “pandas”, then a pd.DataFrame is returned, with columns corresponding to index_dims uniquely identify each dataset, and an additional column named “minian” of object dtype pointing to the loaded minian dataset objects. By default “xarray”.
pattern (regexp, optional) – Pattern of minian dataset directory names. By default r”minian$”.
sub_dirs (list[str], optional) – A list of sub-directories under dpath. Useful if only a subset of datasets under dpath should be recursively loaded. By default [].
exclude (bool, optional) – Whether to exclude directories listed under sub_dirs. If True, then any minian datasets under those specified in sub_dirs will be ignored. If False, then only the datasets under those specified in sub_dirs will be loaded (they still have to be under dpath though). by default True.
- Returns:
ds (Union[xr.Dataset, pd.DataFrame]) – The resulting combined datasets. If result_format is “xarray”, then a xr.Dataset will be returned, otherwise a pd.DataFrame will be returned.
- Raises:
NotImplementedError – if result_format is not “xarray” or “pandas”
- minian.utilities.optimize_chunk(arr, chk)[source]#
Rechunk a xr.DataArray with constrained “rechunk-merge” tasks.
- Parameters:
arr (xr.DataArray) – The array to be rechunked.
chk (dict) – The desired chunk size.
- Returns:
arr_chk (xr.DataArray) – The rechunked array.
- minian.utilities.rechunk_like(x, y)[source]#
Rechunk the input x such that its chunks are compatible with y.
- Parameters:
x (xr.DataArray) – The array to be rechunked.
y (xr.DataArray) – The array where chunk information are extracted.
- Returns:
x_chk (xr.DataArray) – The rechunked x.
- minian.utilities.rewrite_key(key, rwdict)[source]#
Rewrite a task key according to rwdict.
- Parameters:
- Returns:
key (str) – The new key.
- Raises:
ValueError – if input key is neither str or tuple
- minian.utilities.save_minian(var, dpath, meta_dict=None, overwrite=False, chunks=None, compute=True, mem_limit='500MB')[source]#
Save a xr.DataArray with zarr storage backend following minian conventions.
This function will store arbitrary xr.DataArray into dpath with zarr backend. A separate folder will be created under dpath, with folder name var.name + “.zarr”. Optionally metadata can be retrieved from directory hierarchy and added as coordinates of the xr.DataArray. In addition, an on-disk rechunking of the result can be performed using
rechunker.rechunk()if chunks are given.- Parameters:
var (xr.DataArray) – The array to be saved.
dpath (str) – The path to the minian dataset directory.
meta_dict (dict, optional) – How metadata should be retrieved from the directory hierarchy. The keys should be the name of the dimension to assign, and the values should be negative integers representing the directory level relative to dpath (so -1 means the immediate parent directory of dpath). The coordinate value will be the directory name of the corresponding level. For example {“session”: -1, “animal”: -2}. By default None.
overwrite (bool, optional) – Whether to overwrite the result on disk. By default False.
chunks (dict, optional) – A dictionary specifying the desired chunk size. The chunk size should be specified using Chunks convention, except the “auto” specifiication is not supported. The rechunking operation will be carried out with on-disk algorithms using
rechunker.rechunk(). By default None.compute (bool, optional) – Whether to compute var and save it immediately. By default True.
mem_limit (str, optional) – The memory limit for the on-disk rechunking algorithm, passed to
rechunker.rechunk(). Only used if chunks is not None. By default “500MB”.
- Returns:
var (xr.DataArray) – The array representation of saving result. If compute is True, then the returned array will only contain delayed task of loading the on-disk zarr arrays. Otherwise all computation leading to the input var will be preserved in the result.
Examples
The following will save the variable var to directory /spatial_memory/alpha/learning1/minian/important_array.zarr, with the additional coordinates: {“session”: “learning1”, “animal”: “alpha”, “experiment”: “spatial_memory”}.
>>> save_minian( ... var.rename("important_array"), ... "/spatial_memory/alpha/learning1/minian", ... {"session": -1, "animal": -2, "experiment": -3}, ... )
- minian.utilities.split_key(key, rename_dict=None)[source]#
Split, rename and filter task keys.
This is custom implementation that only keeps keys found in
ANNOTATIONS.
- minian.utilities.unique_keys(keys)[source]#
Returns only unique keys in a list of task keys.
Dask task keys regarding arrays are usually tuples representing chunked operations. This function ignore different chunks and only return unique keys.
- Parameters:
keys (list) – list of dask keys.
- Returns:
unique (np.ndarray) – Unique keys.
- minian.utilities.update_meta(dpath, pattern='^minian$', meta_dict=None)[source]#
Permanently update the metadata of saved minian datasets in place.
This function walks dpath and, for every dataset directory whose name matches pattern, re-derives metadata coordinates from the directory hierarchy (following the same convention as
save_minian()) and adds them to the on-disk zarr stores. Only the small coordinate arrays are written, so the variables’ data chunks are left untouched. It is useful as a recovery tool when datasets were originally saved without meta_dict, e.g. before a cross-registration workflow that relies on session/animal coordinates.- Parameters:
dpath (str) – A path containing any number of minian datasets nested under it.
pattern (str, optional) – Regular expression matched against directory names to identify minian dataset directories. By default r”^minian$”.
meta_dict (dict, optional) – How metadata should be retrieved from the directory hierarchy. The keys should be the name of the dimension to assign, and the values should be negative integers representing the directory level relative to the dataset directory (so -1 means its immediate parent directory). The coordinate value will be the directory name of the corresponding level. For example {“session”: -1, “animal”: -2}. By default None.
See also
save_minianfor how meta_dict is applied when first saving a dataset.
- minian.utilities.xrconcat_recursive(var, dims)[source]#
Recursively concatenate xr.DataArray over multiple dimensions.
- Parameters:
var (Union[dict, list]) – Either a dict or a list of xr.DataArray to be concatenated. If a dict then keys should be tuple, with length same as the length of dims and values corresponding to the coordinates that uniquely identify each xr.DataArray. If a list then each xr.DataArray should contain valid coordinates for each dimensions specified in dims.
- Returns:
ds (xr.Dataset) – The concatenated dataset.
- Raises:
NotImplementedError – if input var is neither a dict nor a list
- minian.utilities.ANNOTATIONS = {'est_motion_chunk': {'resources': {'MEM': 1}}, 'from-zarr-store': {'resources': {'MEM': 1}}, 'ks_perseed': {'resources': {'MEM': 0.5}}, 'load_avi_ffmpeg': {'resources': {'MEM': 1}}, 'merge_restricted': {'resources': {'MEM': 1}}, 'pnr_perseed': {'resources': {'MEM': 0.5}}, 'smooth_corr': {'resources': {'MEM': 1}}, 'tensordot_restricted': {'resources': {'MEM': 1}}, 'transform_perframe': {'resources': {'MEM': 0.5}}, 'update_spatial_block': {'resources': {'MEM': 1}}, 'update_temporal_block': {'resources': {'MEM': 1}}, 'vectorize_noise_fft': {'resources': {'MEM': 1}}, 'vectorize_noise_welch': {'resources': {'MEM': 1}}}#
Dask annotations that should be applied to each task.
This is a dict mapping task names (actually patterns) to a dict of dask annotations that should be applied to the tasks. It is mainly used to constrain number of tasks that can be concurrently in memory for each worker.
See also
- minian.utilities.FAST_FUNCTIONS = [<function getter_inline>, <function getter>, <built-in function getitem>, <class 'zarr.core.Array'>, <function astype>, <function concatenate_axes>, <function _vindex_merge>]#
list of fast functions that should be inlined during optimization.
See also