1from __future__ import annotations
2
3import contextlib
4import math
5import operator
6import os
7import pickle
8import re
9import sys
10import traceback
11import uuid
12import warnings
13from bisect import bisect
14from collections.abc import (
15    Collection,
16    Hashable,
17    Iterable,
18    Iterator,
19    Mapping,
20    MutableMapping,
21)
22from functools import partial, reduce, wraps
23from itertools import product, zip_longest
24from numbers import Integral, Number
25from operator import add, mul
26from threading import Lock
27from typing import Any, Sequence
28
29import numpy as np
30from fsspec import get_mapper
31from tlz import accumulate, concat, first, frequencies, groupby, partition
32from tlz.curried import pluck
33
34from .. import compute, config, core, threaded
35from ..base import (
36    DaskMethodsMixin,
37    compute_as_if_collection,
38    dont_optimize,
39    is_dask_collection,
40    persist,
41    tokenize,
42)
43from ..blockwise import broadcast_dimensions
44from ..context import globalmethod
45from ..core import quote
46from ..delayed import Delayed, delayed
47from ..highlevelgraph import HighLevelGraph
48from ..layers import reshapelist
49from ..sizeof import sizeof
50from ..utils import (
51    IndexCallable,
52    M,
53    SerializableLock,
54    cached_property,
55    concrete,
56    derived_from,
57    factors,
58    format_bytes,
59    funcname,
60    has_keyword,
61    is_arraylike,
62    is_dataframe_like,
63    is_index_like,
64    is_integer,
65    is_series_like,
66    ndeepmap,
67    ndimlist,
68    parse_bytes,
69    typename,
70)
71from ..widgets import get_template
72from . import chunk
73from .chunk import getitem
74from .chunk_types import is_valid_array_chunk, is_valid_chunk_type
75
76# Keep einsum_lookup and tensordot_lookup here for backwards compatibility
77from .dispatch import concatenate_lookup, einsum_lookup, tensordot_lookup  # noqa: F401
78from .numpy_compat import _numpy_120, _Recurser
79from .slicing import cached_cumsum, replace_ellipsis, setitem_array, slice_array
80
81config.update_defaults({"array": {"chunk-size": "128MiB", "rechunk-threshold": 4}})
82
83unknown_chunk_message = (
84    "\n\n"
85    "A possible solution: "
86    "https://docs.dask.org/en/latest/array-chunks.html#unknown-chunks\n"
87    "Summary: to compute chunks sizes, use\n\n"
88    "   x.compute_chunk_sizes()  # for Dask Array `x`\n"
89    "   ddf.to_dask_array(lengths=True)  # for Dask DataFrame `ddf`"
90)
91
92
93class PerformanceWarning(Warning):
94    """A warning given when bad chunking may cause poor performance"""
95
96
97def getter(a, b, asarray=True, lock=None):
98    if isinstance(b, tuple) and any(x is None for x in b):
99        b2 = tuple(x for x in b if x is not None)
100        b3 = tuple(
101            None if x is None else slice(None, None)
102            for x in b
103            if not isinstance(x, Integral)
104        )
105        return getter(a, b2, asarray=asarray, lock=lock)[b3]
106
107    if lock:
108        lock.acquire()
109    try:
110        c = a[b]
111        # Below we special-case `np.matrix` to force a conversion to
112        # `np.ndarray` and preserve original Dask behavior for `getter`,
113        # as for all purposes `np.matrix` is array-like and thus
114        # `is_arraylike` evaluates to `True` in that case.
115        if asarray and (not is_arraylike(c) or isinstance(c, np.matrix)):
116            c = np.asarray(c)
117    finally:
118        if lock:
119            lock.release()
120    return c
121
122
123def getter_nofancy(a, b, asarray=True, lock=None):
124    """A simple wrapper around ``getter``.
125
126    Used to indicate to the optimization passes that the backend doesn't
127    support fancy indexing.
128    """
129    return getter(a, b, asarray=asarray, lock=lock)
130
131
132def getter_inline(a, b, asarray=True, lock=None):
133    """A getter function that optimizations feel comfortable inlining
134
135    Slicing operations with this function may be inlined into a graph, such as
136    in the following rewrite
137
138    **Before**
139
140    >>> a = x[:10]  # doctest: +SKIP
141    >>> b = a + 1  # doctest: +SKIP
142    >>> c = a * 2  # doctest: +SKIP
143
144    **After**
145
146    >>> b = x[:10] + 1  # doctest: +SKIP
147    >>> c = x[:10] * 2  # doctest: +SKIP
148
149    This inlining can be relevant to operations when running off of disk.
150    """
151    return getter(a, b, asarray=asarray, lock=lock)
152
153
154from .optimization import fuse_slice, optimize
155
156# __array_function__ dict for mapping aliases and mismatching names
157_HANDLED_FUNCTIONS = {}
158
159
160def implements(*numpy_functions):
161    """Register an __array_function__ implementation for dask.array.Array
162
163    Register that a function implements the API of a NumPy function (or several
164    NumPy functions in case of aliases) which is handled with
165    ``__array_function__``.
166
167    Parameters
168    ----------
169    \\*numpy_functions : callables
170        One or more NumPy functions that are handled by ``__array_function__``
171        and will be mapped by `implements` to a `dask.array` function.
172    """
173
174    def decorator(dask_func):
175        for numpy_function in numpy_functions:
176            _HANDLED_FUNCTIONS[numpy_function] = dask_func
177
178        return dask_func
179
180    return decorator
181
182
183def _should_delegate(other) -> bool:
184    """Check whether Dask should delegate to the other.
185    This implementation follows NEP-13:
186    https://numpy.org/neps/nep-0013-ufunc-overrides.html#behavior-in-combination-with-python-s-binary-operations
187    """
188    if hasattr(other, "__array_ufunc__") and other.__array_ufunc__ is None:
189        return True
190    elif (
191        hasattr(other, "__array_ufunc__")
192        and not is_valid_array_chunk(other)
193        and type(other).__array_ufunc__ is not Array.__array_ufunc__
194    ):
195        return True
196    return False
197
198
199def check_if_handled_given_other(f):
200    """Check if method is handled by Dask given type of other
201
202    Ensures proper deferral to upcast types in dunder operations without
203    assuming unknown types are automatically downcast types.
204    """
205
206    @wraps(f)
207    def wrapper(self, other):
208        if _should_delegate(other):
209            return NotImplemented
210        else:
211            return f(self, other)
212
213    return wrapper
214
215
216def slices_from_chunks(chunks):
217    """Translate chunks tuple to a set of slices in product order
218
219    >>> slices_from_chunks(((2, 2), (3, 3, 3)))  # doctest: +NORMALIZE_WHITESPACE
220     [(slice(0, 2, None), slice(0, 3, None)),
221      (slice(0, 2, None), slice(3, 6, None)),
222      (slice(0, 2, None), slice(6, 9, None)),
223      (slice(2, 4, None), slice(0, 3, None)),
224      (slice(2, 4, None), slice(3, 6, None)),
225      (slice(2, 4, None), slice(6, 9, None))]
226    """
227    cumdims = [cached_cumsum(bds, initial_zero=True) for bds in chunks]
228    slices = [
229        [slice(s, s + dim) for s, dim in zip(starts, shapes)]
230        for starts, shapes in zip(cumdims, chunks)
231    ]
232    return list(product(*slices))
233
234
235def getem(
236    arr,
237    chunks,
238    getitem=getter,
239    shape=None,
240    out_name=None,
241    lock=False,
242    asarray=True,
243    dtype=None,
244):
245    """Dask getting various chunks from an array-like
246
247    >>> getem('X', chunks=(2, 3), shape=(4, 6))  # doctest: +SKIP
248    {('X', 0, 0): (getter, 'X', (slice(0, 2), slice(0, 3))),
249     ('X', 1, 0): (getter, 'X', (slice(2, 4), slice(0, 3))),
250     ('X', 1, 1): (getter, 'X', (slice(2, 4), slice(3, 6))),
251     ('X', 0, 1): (getter, 'X', (slice(0, 2), slice(3, 6)))}
252
253    >>> getem('X', chunks=((2, 2), (3, 3)))  # doctest: +SKIP
254    {('X', 0, 0): (getter, 'X', (slice(0, 2), slice(0, 3))),
255     ('X', 1, 0): (getter, 'X', (slice(2, 4), slice(0, 3))),
256     ('X', 1, 1): (getter, 'X', (slice(2, 4), slice(3, 6))),
257     ('X', 0, 1): (getter, 'X', (slice(0, 2), slice(3, 6)))}
258    """
259    out_name = out_name or arr
260    chunks = normalize_chunks(chunks, shape, dtype=dtype)
261    keys = product([out_name], *(range(len(bds)) for bds in chunks))
262    slices = slices_from_chunks(chunks)
263
264    if (
265        has_keyword(getitem, "asarray")
266        and has_keyword(getitem, "lock")
267        and (not asarray or lock)
268    ):
269        values = [(getitem, arr, x, asarray, lock) for x in slices]
270    else:
271        # Common case, drop extra parameters
272        values = [(getitem, arr, x) for x in slices]
273
274    return dict(zip(keys, values))
275
276
277def dotmany(A, B, leftfunc=None, rightfunc=None, **kwargs):
278    """Dot product of many aligned chunks
279
280    >>> x = np.array([[1, 2], [1, 2]])
281    >>> y = np.array([[10, 20], [10, 20]])
282    >>> dotmany([x, x, x], [y, y, y])
283    array([[ 90, 180],
284           [ 90, 180]])
285
286    Optionally pass in functions to apply to the left and right chunks
287
288    >>> dotmany([x, x, x], [y, y, y], rightfunc=np.transpose)
289    array([[150, 150],
290           [150, 150]])
291    """
292    if leftfunc:
293        A = map(leftfunc, A)
294    if rightfunc:
295        B = map(rightfunc, B)
296    return sum(map(partial(np.dot, **kwargs), A, B))
297
298
299def _concatenate2(arrays, axes=[]):
300    """Recursively concatenate nested lists of arrays along axes
301
302    Each entry in axes corresponds to each level of the nested list.  The
303    length of axes should correspond to the level of nesting of arrays.
304    If axes is an empty list or tuple, return arrays, or arrays[0] if
305    arrays is a list.
306
307    >>> x = np.array([[1, 2], [3, 4]])
308    >>> _concatenate2([x, x], axes=[0])
309    array([[1, 2],
310           [3, 4],
311           [1, 2],
312           [3, 4]])
313
314    >>> _concatenate2([x, x], axes=[1])
315    array([[1, 2, 1, 2],
316           [3, 4, 3, 4]])
317
318    >>> _concatenate2([[x, x], [x, x]], axes=[0, 1])
319    array([[1, 2, 1, 2],
320           [3, 4, 3, 4],
321           [1, 2, 1, 2],
322           [3, 4, 3, 4]])
323
324    Supports Iterators
325    >>> _concatenate2(iter([x, x]), axes=[1])
326    array([[1, 2, 1, 2],
327           [3, 4, 3, 4]])
328
329    Special Case
330    >>> _concatenate2([x, x], axes=())
331    array([[1, 2],
332           [3, 4]])
333    """
334    if axes == ():
335        if isinstance(arrays, list):
336            return arrays[0]
337        else:
338            return arrays
339
340    if isinstance(arrays, Iterator):
341        arrays = list(arrays)
342    if not isinstance(arrays, (list, tuple)):
343        return arrays
344    if len(axes) > 1:
345        arrays = [_concatenate2(a, axes=axes[1:]) for a in arrays]
346    concatenate = concatenate_lookup.dispatch(
347        type(max(arrays, key=lambda x: getattr(x, "__array_priority__", 0)))
348    )
349    if isinstance(arrays[0], dict):
350        # Handle concatenation of `dict`s, used as a replacement for structured
351        # arrays when that's not supported by the array library (e.g., CuPy).
352        keys = list(arrays[0].keys())
353        assert all(list(a.keys()) == keys for a in arrays)
354        ret = dict()
355        for k in keys:
356            ret[k] = concatenate(list(a[k] for a in arrays), axis=axes[0])
357        return ret
358    else:
359        return concatenate(arrays, axis=axes[0])
360
361
362def apply_infer_dtype(func, args, kwargs, funcname, suggest_dtype="dtype", nout=None):
363    """
364    Tries to infer output dtype of ``func`` for a small set of input arguments.
365
366    Parameters
367    ----------
368    func: Callable
369        Function for which output dtype is to be determined
370
371    args: List of array like
372        Arguments to the function, which would usually be used. Only attributes
373        ``ndim`` and ``dtype`` are used.
374
375    kwargs: dict
376        Additional ``kwargs`` to the ``func``
377
378    funcname: String
379        Name of calling function to improve potential error messages
380
381    suggest_dtype: None/False or String
382        If not ``None`` adds suggestion to potential error message to specify a dtype
383        via the specified kwarg. Defaults to ``'dtype'``.
384
385    nout: None or Int
386        ``None`` if function returns single output, integer if many.
387        Deafults to ``None``.
388
389    Returns
390    -------
391    : dtype or List of dtype
392        One or many dtypes (depending on ``nout``)
393    """
394    from .utils import meta_from_array
395
396    # make sure that every arg is an evaluated array
397    args = [
398        np.ones_like(meta_from_array(x), shape=((1,) * x.ndim), dtype=x.dtype)
399        if is_arraylike(x)
400        else x
401        for x in args
402    ]
403    try:
404        with np.errstate(all="ignore"):
405            o = func(*args, **kwargs)
406    except Exception as e:
407        exc_type, exc_value, exc_traceback = sys.exc_info()
408        tb = "".join(traceback.format_tb(exc_traceback))
409        suggest = (
410            (
411                "Please specify the dtype explicitly using the "
412                "`{dtype}` kwarg.\n\n".format(dtype=suggest_dtype)
413            )
414            if suggest_dtype
415            else ""
416        )
417        msg = (
418            f"`dtype` inference failed in `{funcname}`.\n\n"
419            f"{suggest}"
420            "Original error is below:\n"
421            "------------------------\n"
422            f"{e!r}\n\n"
423            "Traceback:\n"
424            "---------\n"
425            f"{tb}"
426        )
427    else:
428        msg = None
429    if msg is not None:
430        raise ValueError(msg)
431    return o.dtype if nout is None else tuple(e.dtype for e in o)
432
433
434def normalize_arg(x):
435    """Normalize user provided arguments to blockwise or map_blocks
436
437    We do a few things:
438
439    1.  If they are string literals that might collide with blockwise_token then we
440        quote them
441    2.  IF they are large (as defined by sizeof) then we put them into the
442        graph on their own by using dask.delayed
443    """
444    if is_dask_collection(x):
445        return x
446    elif isinstance(x, str) and re.match(r"_\d+", x):
447        return delayed(x)
448    elif isinstance(x, list) and len(x) >= 10:
449        return delayed(x)
450    elif sizeof(x) > 1e6:
451        return delayed(x)
452    else:
453        return x
454
455
456def _pass_extra_kwargs(func, keys, *args, **kwargs):
457    """Helper for :func:`dask.array.map_blocks` to pass `block_info` or `block_id`.
458
459    For each element of `keys`, a corresponding element of args is changed
460    to a keyword argument with that key, before all arguments re passed on
461    to `func`.
462    """
463    kwargs.update(zip(keys, args))
464    return func(*args[len(keys) :], **kwargs)
465
466
467def map_blocks(
468    func,
469    *args,
470    name=None,
471    token=None,
472    dtype=None,
473    chunks=None,
474    drop_axis=[],
475    new_axis=None,
476    meta=None,
477    **kwargs,
478):
479    """Map a function across all blocks of a dask array.
480
481    Note that ``map_blocks`` will attempt to automatically determine the output
482    array type by calling ``func`` on 0-d versions of the inputs. Please refer to
483    the ``meta`` keyword argument below if you expect that the function will not
484    succeed when operating on 0-d arrays.
485
486    Parameters
487    ----------
488    func : callable
489        Function to apply to every block in the array.
490    args : dask arrays or other objects
491    dtype : np.dtype, optional
492        The ``dtype`` of the output array. It is recommended to provide this.
493        If not provided, will be inferred by applying the function to a small
494        set of fake data.
495    chunks : tuple, optional
496        Chunk shape of resulting blocks if the function does not preserve
497        shape. If not provided, the resulting array is assumed to have the same
498        block structure as the first input array.
499    drop_axis : number or iterable, optional
500        Dimensions lost by the function.
501    new_axis : number or iterable, optional
502        New dimensions created by the function. Note that these are applied
503        after ``drop_axis`` (if present).
504    token : string, optional
505        The key prefix to use for the output array. If not provided, will be
506        determined from the function name.
507    name : string, optional
508        The key name to use for the output array. Note that this fully
509        specifies the output key name, and must be unique. If not provided,
510        will be determined by a hash of the arguments.
511    meta : array-like, optional
512        The ``meta`` of the output array, when specified is expected to be an
513        array of the same type and dtype of that returned when calling ``.compute()``
514        on the array returned by this function. When not provided, ``meta`` will be
515        inferred by applying the function to a small set of fake data, usually a
516        0-d array. It's important to ensure that ``func`` can successfully complete
517        computation without raising exceptions when 0-d is passed to it, providing
518        ``meta`` will be required otherwise. If the output type is known beforehand
519        (e.g., ``np.ndarray``, ``cupy.ndarray``), an empty array of such type dtype
520        can be passed, for example: ``meta=np.array((), dtype=np.int32)``.
521    **kwargs :
522        Other keyword arguments to pass to function. Values must be constants
523        (not dask.arrays)
524
525    See Also
526    --------
527    dask.array.blockwise : Generalized operation with control over block alignment.
528
529    Examples
530    --------
531    >>> import dask.array as da
532    >>> x = da.arange(6, chunks=3)
533
534    >>> x.map_blocks(lambda x: x * 2).compute()
535    array([ 0,  2,  4,  6,  8, 10])
536
537    The ``da.map_blocks`` function can also accept multiple arrays.
538
539    >>> d = da.arange(5, chunks=2)
540    >>> e = da.arange(5, chunks=2)
541
542    >>> f = da.map_blocks(lambda a, b: a + b**2, d, e)
543    >>> f.compute()
544    array([ 0,  2,  6, 12, 20])
545
546    If the function changes shape of the blocks then you must provide chunks
547    explicitly.
548
549    >>> y = x.map_blocks(lambda x: x[::2], chunks=((2, 2),))
550
551    You have a bit of freedom in specifying chunks.  If all of the output chunk
552    sizes are the same, you can provide just that chunk size as a single tuple.
553
554    >>> a = da.arange(18, chunks=(6,))
555    >>> b = a.map_blocks(lambda x: x[:3], chunks=(3,))
556
557    If the function changes the dimension of the blocks you must specify the
558    created or destroyed dimensions.
559
560    >>> b = a.map_blocks(lambda x: x[None, :, None], chunks=(1, 6, 1),
561    ...                  new_axis=[0, 2])
562
563    If ``chunks`` is specified but ``new_axis`` is not, then it is inferred to
564    add the necessary number of axes on the left.
565
566    Map_blocks aligns blocks by block positions without regard to shape. In the
567    following example we have two arrays with the same number of blocks but
568    with different shape and chunk sizes.
569
570    >>> x = da.arange(1000, chunks=(100,))
571    >>> y = da.arange(100, chunks=(10,))
572
573    The relevant attribute to match is numblocks.
574
575    >>> x.numblocks
576    (10,)
577    >>> y.numblocks
578    (10,)
579
580    If these match (up to broadcasting rules) then we can map arbitrary
581    functions across blocks
582
583    >>> def func(a, b):
584    ...     return np.array([a.max(), b.max()])
585
586    >>> da.map_blocks(func, x, y, chunks=(2,), dtype='i8')
587    dask.array<func, shape=(20,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>
588
589    >>> _.compute()
590    array([ 99,   9, 199,  19, 299,  29, 399,  39, 499,  49, 599,  59, 699,
591            69, 799,  79, 899,  89, 999,  99])
592
593    Your block function get information about where it is in the array by
594    accepting a special ``block_info`` or ``block_id`` keyword argument.
595
596    >>> def func(block_info=None):
597    ...     pass
598
599    This will receive the following information:
600
601    >>> block_info  # doctest: +SKIP
602    {0: {'shape': (1000,),
603         'num-chunks': (10,),
604         'chunk-location': (4,),
605         'array-location': [(400, 500)]},
606     None: {'shape': (1000,),
607            'num-chunks': (10,),
608            'chunk-location': (4,),
609            'array-location': [(400, 500)],
610            'chunk-shape': (100,),
611            'dtype': dtype('float64')}}
612
613    For each argument and keyword arguments that are dask arrays (the positions
614    of which are the first index), you will receive the shape of the full
615    array, the number of chunks of the full array in each dimension, the chunk
616    location (for example the fourth chunk over in the first dimension), and
617    the array location (for example the slice corresponding to ``40:50``). The
618    same information is provided for the output, with the key ``None``, plus
619    the shape and dtype that should be returned.
620
621    These features can be combined to synthesize an array from scratch, for
622    example:
623
624    >>> def func(block_info=None):
625    ...     loc = block_info[None]['array-location'][0]
626    ...     return np.arange(loc[0], loc[1])
627
628    >>> da.map_blocks(func, chunks=((4, 4),), dtype=np.float_)
629    dask.array<func, shape=(8,), dtype=float64, chunksize=(4,), chunktype=numpy.ndarray>
630
631    >>> _.compute()
632    array([0, 1, 2, 3, 4, 5, 6, 7])
633
634    ``block_id`` is similar to ``block_info`` but contains only the ``chunk_location``:
635
636    >>> def func(block_id=None):
637    ...     pass
638
639    This will receive the following information:
640
641    >>> block_id  # doctest: +SKIP
642    (4, 3)
643
644    You may specify the key name prefix of the resulting task in the graph with
645    the optional ``token`` keyword argument.
646
647    >>> x.map_blocks(lambda x: x + 1, name='increment')
648    dask.array<increment, shape=(1000,), dtype=int64, chunksize=(100,), chunktype=numpy.ndarray>
649
650    For functions that may not handle 0-d arrays, it's also possible to specify
651    ``meta`` with an empty array matching the type of the expected result. In
652    the example below, ``func`` will result in an ``IndexError`` when computing
653    ``meta``:
654
655    >>> da.map_blocks(lambda x: x[2], da.random.random(5), meta=np.array(()))
656    dask.array<lambda, shape=(5,), dtype=float64, chunksize=(5,), chunktype=numpy.ndarray>
657
658    Similarly, it's possible to specify a non-NumPy array to ``meta``, and provide
659    a ``dtype``:
660
661    >>> import cupy  # doctest: +SKIP
662    >>> rs = da.random.RandomState(RandomState=cupy.random.RandomState)  # doctest: +SKIP
663    >>> dt = np.float32
664    >>> da.map_blocks(lambda x: x[2], rs.random(5, dtype=dt), meta=cupy.array((), dtype=dt))  # doctest: +SKIP
665    dask.array<lambda, shape=(5,), dtype=float32, chunksize=(5,), chunktype=cupy.ndarray>
666    """
667    if not callable(func):
668        msg = (
669            "First argument must be callable function, not %s\n"
670            "Usage:   da.map_blocks(function, x)\n"
671            "   or:   da.map_blocks(function, x, y, z)"
672        )
673        raise TypeError(msg % type(func).__name__)
674    if token:
675        warnings.warn("The token= keyword to map_blocks has been moved to name=")
676        name = token
677
678    name = f"{name or funcname(func)}-{tokenize(func, *args, **kwargs)}"
679    new_axes = {}
680
681    if isinstance(drop_axis, Number):
682        drop_axis = [drop_axis]
683    if isinstance(new_axis, Number):
684        new_axis = [new_axis]  # TODO: handle new_axis
685
686    arrs = [a for a in args if isinstance(a, Array)]
687
688    argpairs = [
689        (a, tuple(range(a.ndim))[::-1]) if isinstance(a, Array) else (a, None)
690        for a in args
691    ]
692    if arrs:
693        out_ind = tuple(range(max(a.ndim for a in arrs)))[::-1]
694    else:
695        out_ind = ()
696
697    original_kwargs = kwargs
698
699    if dtype is None and meta is None:
700        try:
701            meta = compute_meta(func, dtype, *args, **kwargs)
702        except Exception:
703            pass
704
705        dtype = apply_infer_dtype(func, args, original_kwargs, "map_blocks")
706
707    if drop_axis:
708        ndim_out = len(out_ind)
709        if any(i < -ndim_out or i >= ndim_out for i in drop_axis):
710            raise ValueError(
711                f"drop_axis out of range (drop_axis={drop_axis}, "
712                f"but output is {ndim_out}d)."
713            )
714        drop_axis = [i % ndim_out for i in drop_axis]
715        out_ind = tuple(x for i, x in enumerate(out_ind) if i not in drop_axis)
716    if new_axis is None and chunks is not None and len(out_ind) < len(chunks):
717        new_axis = range(len(chunks) - len(out_ind))
718    if new_axis:
719        # new_axis = [x + len(drop_axis) for x in new_axis]
720        out_ind = list(out_ind)
721        for ax in sorted(new_axis):
722            n = len(out_ind) + len(drop_axis)
723            out_ind.insert(ax, n)
724            if chunks is not None:
725                new_axes[n] = chunks[ax]
726            else:
727                new_axes[n] = 1
728        out_ind = tuple(out_ind)
729        if max(new_axis) > max(out_ind):
730            raise ValueError("New_axis values do not fill in all dimensions")
731
732    if chunks is not None:
733        if len(chunks) != len(out_ind):
734            raise ValueError(
735                f"Provided chunks have {len(chunks)} dims; expected {len(out_ind)} dims"
736            )
737        adjust_chunks = dict(zip(out_ind, chunks))
738    else:
739        adjust_chunks = None
740
741    out = blockwise(
742        func,
743        out_ind,
744        *concat(argpairs),
745        name=name,
746        new_axes=new_axes,
747        dtype=dtype,
748        concatenate=True,
749        align_arrays=False,
750        adjust_chunks=adjust_chunks,
751        meta=meta,
752        **kwargs,
753    )
754
755    extra_argpairs = []
756    extra_names = []
757    # If func has block_id as an argument, construct an array of block IDs and
758    # prepare to inject it.
759    if has_keyword(func, "block_id"):
760        block_id_name = "block-id-" + out.name
761        block_id_dsk = {
762            (block_id_name,) + block_id: block_id
763            for block_id in product(*(range(len(c)) for c in out.chunks))
764        }
765        block_id_array = Array(
766            block_id_dsk,
767            block_id_name,
768            chunks=tuple((1,) * len(c) for c in out.chunks),
769            dtype=np.object_,
770        )
771        extra_argpairs.append((block_id_array, out_ind))
772        extra_names.append("block_id")
773
774    # If func has block_info as an argument, construct an array of block info
775    # objects and prepare to inject it.
776    if has_keyword(func, "block_info"):
777        starts = {}
778        num_chunks = {}
779        shapes = {}
780
781        for i, (arg, in_ind) in enumerate(argpairs):
782            if in_ind is not None:
783                shapes[i] = arg.shape
784                if drop_axis:
785                    # We concatenate along dropped axes, so we need to treat them
786                    # as if there is only a single chunk.
787                    starts[i] = [
788                        (
789                            cached_cumsum(arg.chunks[j], initial_zero=True)
790                            if ind in out_ind
791                            else [0, arg.shape[j]]
792                        )
793                        for j, ind in enumerate(in_ind)
794                    ]
795                    num_chunks[i] = tuple(len(s) - 1 for s in starts[i])
796                else:
797                    starts[i] = [
798                        cached_cumsum(c, initial_zero=True) for c in arg.chunks
799                    ]
800                    num_chunks[i] = arg.numblocks
801        out_starts = [cached_cumsum(c, initial_zero=True) for c in out.chunks]
802
803        block_info_name = "block-info-" + out.name
804        block_info_dsk = {}
805        for block_id in product(*(range(len(c)) for c in out.chunks)):
806            # Get position of chunk, indexed by axis labels
807            location = {out_ind[i]: loc for i, loc in enumerate(block_id)}
808            info = {}
809            for i, shape in shapes.items():
810                # Compute chunk key in the array, taking broadcasting into
811                # account. We don't directly know which dimensions are
812                # broadcast, but any dimension with only one chunk can be
813                # treated as broadcast.
814                arr_k = tuple(
815                    location.get(ind, 0) if num_chunks[i][j] > 1 else 0
816                    for j, ind in enumerate(argpairs[i][1])
817                )
818                info[i] = {
819                    "shape": shape,
820                    "num-chunks": num_chunks[i],
821                    "array-location": [
822                        (starts[i][ij][j], starts[i][ij][j + 1])
823                        for ij, j in enumerate(arr_k)
824                    ],
825                    "chunk-location": arr_k,
826                }
827
828            info[None] = {
829                "shape": out.shape,
830                "num-chunks": out.numblocks,
831                "array-location": [
832                    (out_starts[ij][j], out_starts[ij][j + 1])
833                    for ij, j in enumerate(block_id)
834                ],
835                "chunk-location": block_id,
836                "chunk-shape": tuple(
837                    out.chunks[ij][j] for ij, j in enumerate(block_id)
838                ),
839                "dtype": dtype,
840            }
841            block_info_dsk[(block_info_name,) + block_id] = info
842
843        block_info = Array(
844            block_info_dsk,
845            block_info_name,
846            chunks=tuple((1,) * len(c) for c in out.chunks),
847            dtype=np.object_,
848        )
849        extra_argpairs.append((block_info, out_ind))
850        extra_names.append("block_info")
851
852    if extra_argpairs:
853        # Rewrite the Blockwise layer. It would be nice to find a way to
854        # avoid doing it twice, but it's currently needed to determine
855        # out.chunks from the first pass. Since it constructs a Blockwise
856        # rather than an expanded graph, it shouldn't be too expensive.
857        out = blockwise(
858            _pass_extra_kwargs,
859            out_ind,
860            func,
861            None,
862            tuple(extra_names),
863            None,
864            *concat(extra_argpairs),
865            *concat(argpairs),
866            name=out.name,
867            dtype=out.dtype,
868            concatenate=True,
869            align_arrays=False,
870            adjust_chunks=dict(zip(out_ind, out.chunks)),
871            meta=meta,
872            **kwargs,
873        )
874
875    return out
876
877
878def broadcast_chunks(*chunkss):
879    """Construct a chunks tuple that broadcasts many chunks tuples
880
881    >>> a = ((5, 5),)
882    >>> b = ((5, 5),)
883    >>> broadcast_chunks(a, b)
884    ((5, 5),)
885
886    >>> a = ((10, 10, 10), (5, 5),)
887    >>> b = ((5, 5),)
888    >>> broadcast_chunks(a, b)
889    ((10, 10, 10), (5, 5))
890
891    >>> a = ((10, 10, 10), (5, 5),)
892    >>> b = ((1,), (5, 5),)
893    >>> broadcast_chunks(a, b)
894    ((10, 10, 10), (5, 5))
895
896    >>> a = ((10, 10, 10), (5, 5),)
897    >>> b = ((3, 3,), (5, 5),)
898    >>> broadcast_chunks(a, b)
899    Traceback (most recent call last):
900        ...
901    ValueError: Chunks do not align: [(10, 10, 10), (3, 3)]
902    """
903    if not chunkss:
904        return ()
905    elif len(chunkss) == 1:
906        return chunkss[0]
907    n = max(map(len, chunkss))
908    chunkss2 = [((1,),) * (n - len(c)) + c for c in chunkss]
909    result = []
910    for i in range(n):
911        step1 = [c[i] for c in chunkss2]
912        if all(c == (1,) for c in step1):
913            step2 = step1
914        else:
915            step2 = [c for c in step1 if c != (1,)]
916        if len(set(step2)) != 1:
917            raise ValueError("Chunks do not align: %s" % str(step2))
918        result.append(step2[0])
919    return tuple(result)
920
921
922def store(
923    sources: Array | Collection[Array],
924    targets,
925    lock: bool | Lock = True,
926    regions: tuple[slice, ...] | Collection[tuple[slice, ...]] | None = None,
927    compute: bool = True,
928    return_stored: bool = False,
929    **kwargs,
930):
931    """Store dask arrays in array-like objects, overwrite data in target
932
933    This stores dask arrays into object that supports numpy-style setitem
934    indexing.  It stores values chunk by chunk so that it does not have to
935    fill up memory.  For best performance you can align the block size of
936    the storage target with the block size of your array.
937
938    If your data fits in memory then you may prefer calling
939    ``np.array(myarray)`` instead.
940
941    Parameters
942    ----------
943
944    sources: Array or collection of Arrays
945    targets: array-like or Delayed or collection of array-likes and/or Delayeds
946        These should support setitem syntax ``target[10:20] = ...``
947    lock: boolean or threading.Lock, optional
948        Whether or not to lock the data stores while storing.
949        Pass True (lock each file individually), False (don't lock) or a
950        particular :class:`threading.Lock` object to be shared among all writes.
951    regions: tuple of slices or collection of tuples of slices
952        Each ``region`` tuple in ``regions`` should be such that
953        ``target[region].shape = source.shape``
954        for the corresponding source and target in sources and targets,
955        respectively. If this is a tuple, the contents will be assumed to be
956        slices, so do not provide a tuple of tuples.
957    compute: boolean, optional
958        If true compute immediately; return :class:`dask.delayed.Delayed` otherwise.
959    return_stored: boolean, optional
960        Optionally return the stored result (default False).
961    kwargs:
962        Parameters passed to compute/persist (only used if compute=True)
963
964    Returns
965    -------
966
967    If return_stored=True
968        tuple of Arrays
969    If return_stored=False and compute=True
970        None
971    If return_stored=False and compute=False
972        Delayed
973
974    Examples
975    --------
976
977    >>> import h5py  # doctest: +SKIP
978    >>> f = h5py.File('myfile.hdf5', mode='a')  # doctest: +SKIP
979    >>> dset = f.create_dataset('/data', shape=x.shape,
980    ...                                  chunks=x.chunks,
981    ...                                  dtype='f8')  # doctest: +SKIP
982
983    >>> store(x, dset)  # doctest: +SKIP
984
985    Alternatively store many arrays at the same time
986
987    >>> store([x, y, z], [dset1, dset2, dset3])  # doctest: +SKIP
988    """
989
990    if isinstance(sources, Array):
991        sources = [sources]
992        targets = [targets]
993
994    if any(not isinstance(s, Array) for s in sources):
995        raise ValueError("All sources must be dask array objects")
996
997    if len(sources) != len(targets):
998        raise ValueError(
999            "Different number of sources [%d] and targets [%d]"
1000            % (len(sources), len(targets))
1001        )
1002
1003    if isinstance(regions, tuple) or regions is None:
1004        regions = [regions]
1005
1006    if len(sources) > 1 and len(regions) == 1:
1007        regions *= len(sources)
1008
1009    if len(sources) != len(regions):
1010        raise ValueError(
1011            "Different number of sources [%d] and targets [%d] than regions [%d]"
1012            % (len(sources), len(targets), len(regions))
1013        )
1014
1015    # Optimize all sources together
1016    sources_hlg = HighLevelGraph.merge(*[e.__dask_graph__() for e in sources])
1017    sources_layer = Array.__dask_optimize__(
1018        sources_hlg, list(core.flatten([e.__dask_keys__() for e in sources]))
1019    )
1020    sources_name = "store-sources-" + tokenize(sources)
1021    layers = {sources_name: sources_layer}
1022    dependencies = {sources_name: set()}
1023
1024    # Optimize all targets together
1025    targets_keys = []
1026    targets_dsks = []
1027    for t in targets:
1028        if isinstance(t, Delayed):
1029            targets_keys.append(t.key)
1030            targets_dsks.append(t.__dask_graph__())
1031        elif is_dask_collection(t):
1032            raise TypeError("Targets must be either Delayed objects or array-likes")
1033
1034    if targets_dsks:
1035        targets_hlg = HighLevelGraph.merge(*targets_dsks)
1036        targets_layer = Delayed.__dask_optimize__(targets_hlg, targets_keys)
1037        targets_name = "store-targets-" + tokenize(targets_keys)
1038        layers[targets_name] = targets_layer
1039        dependencies[targets_name] = set()
1040
1041    load_stored = return_stored and not compute
1042
1043    map_names = [
1044        "store-map-" + tokenize(s, t if isinstance(t, Delayed) else id(t), r)
1045        for s, t, r in zip(sources, targets, regions)
1046    ]
1047    map_keys = []
1048
1049    for s, t, n, r in zip(sources, targets, map_names, regions):
1050        map_layer = insert_to_ooc(
1051            keys=s.__dask_keys__(),
1052            chunks=s.chunks,
1053            out=t.key if isinstance(t, Delayed) else t,
1054            name=n,
1055            lock=lock,
1056            region=r,
1057            return_stored=return_stored,
1058            load_stored=load_stored,
1059        )
1060        layers[n] = map_layer
1061        if isinstance(t, Delayed):
1062            dependencies[n] = {sources_name, targets_name}
1063        else:
1064            dependencies[n] = {sources_name}
1065        map_keys += map_layer.keys()
1066
1067    if return_stored:
1068        store_dsk = HighLevelGraph(layers, dependencies)
1069        load_store_dsk = store_dsk
1070        if compute:
1071            store_dlyds = [Delayed(k, store_dsk) for k in map_keys]
1072            store_dlyds = persist(*store_dlyds, **kwargs)
1073            store_dsk_2 = HighLevelGraph.merge(*[e.dask for e in store_dlyds])
1074            load_store_dsk = retrieve_from_ooc(map_keys, store_dsk, store_dsk_2)
1075            map_names = ["load-" + n for n in map_names]
1076
1077        return tuple(
1078            Array(load_store_dsk, n, s.chunks, meta=s)
1079            for s, n in zip(sources, map_names)
1080        )
1081
1082    elif compute:
1083        store_dsk = HighLevelGraph(layers, dependencies)
1084        compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
1085        return None
1086
1087    else:
1088        key = "store-" + tokenize(map_names)
1089        layers[key] = {key: map_keys}
1090        dependencies[key] = set(map_names)
1091        store_dsk = HighLevelGraph(layers, dependencies)
1092        return Delayed(key, store_dsk)
1093
1094
1095def blockdims_from_blockshape(shape, chunks):
1096    """
1097
1098    >>> blockdims_from_blockshape((10, 10), (4, 3))
1099    ((4, 4, 2), (3, 3, 3, 1))
1100    >>> blockdims_from_blockshape((10, 0), (4, 0))
1101    ((4, 4, 2), (0,))
1102    """
1103    if chunks is None:
1104        raise TypeError("Must supply chunks= keyword argument")
1105    if shape is None:
1106        raise TypeError("Must supply shape= keyword argument")
1107    if np.isnan(sum(shape)) or np.isnan(sum(chunks)):
1108        raise ValueError(
1109            "Array chunk sizes are unknown. shape: %s, chunks: %s%s"
1110            % (shape, chunks, unknown_chunk_message)
1111        )
1112    if not all(map(is_integer, chunks)):
1113        raise ValueError("chunks can only contain integers.")
1114    if not all(map(is_integer, shape)):
1115        raise ValueError("shape can only contain integers.")
1116    shape = tuple(map(int, shape))
1117    chunks = tuple(map(int, chunks))
1118    return tuple(
1119        ((bd,) * (d // bd) + ((d % bd,) if d % bd else ()) if d else (0,))
1120        for d, bd in zip(shape, chunks)
1121    )
1122
1123
1124def finalize(results):
1125    if not results:
1126        return concatenate3(results)
1127    results2 = results
1128    while isinstance(results2, (tuple, list)):
1129        if len(results2) > 1:
1130            return concatenate3(results)
1131        else:
1132            results2 = results2[0]
1133    return unpack_singleton(results)
1134
1135
1136CHUNKS_NONE_ERROR_MESSAGE = """
1137You must specify a chunks= keyword argument.
1138This specifies the chunksize of your array blocks.
1139
1140See the following documentation page for details:
1141  https://docs.dask.org/en/latest/array-creation.html#chunks
1142""".strip()
1143
1144
1145class Array(DaskMethodsMixin):
1146    """Parallel Dask Array
1147
1148    A parallel nd-array comprised of many numpy arrays arranged in a grid.
1149
1150    This constructor is for advanced uses only.  For normal use see the
1151    :func:`dask.array.from_array` function.
1152
1153    Parameters
1154    ----------
1155    dask : dict
1156        Task dependency graph
1157    name : string
1158        Name of array in dask
1159    shape : tuple of ints
1160        Shape of the entire array
1161    chunks: iterable of tuples
1162        block sizes along each dimension
1163    dtype : str or dtype
1164        Typecode or data-type for the new Dask Array
1165    meta : empty ndarray
1166        empty ndarray created with same NumPy backend, ndim and dtype as the
1167        Dask Array being created (overrides dtype)
1168
1169    See Also
1170    --------
1171    dask.array.from_array
1172    """
1173
1174    __slots__ = "dask", "__name", "_cached_keys", "__chunks", "_meta", "__dict__"
1175
1176    def __new__(cls, dask, name, chunks, dtype=None, meta=None, shape=None):
1177        self = super().__new__(cls)
1178        assert isinstance(dask, Mapping)
1179        if not isinstance(dask, HighLevelGraph):
1180            dask = HighLevelGraph.from_collections(name, dask, dependencies=())
1181        self.dask = dask
1182        self._name = str(name)
1183        meta = meta_from_array(meta, dtype=dtype)
1184
1185        if (
1186            isinstance(chunks, str)
1187            or isinstance(chunks, tuple)
1188            and chunks
1189            and any(isinstance(c, str) for c in chunks)
1190        ):
1191            dt = meta.dtype
1192        else:
1193            dt = None
1194        self._chunks = normalize_chunks(chunks, shape, dtype=dt)
1195        if self.chunks is None:
1196            raise ValueError(CHUNKS_NONE_ERROR_MESSAGE)
1197        self._meta = meta_from_array(meta, ndim=self.ndim, dtype=dtype)
1198
1199        for plugin in config.get("array_plugins", ()):
1200            result = plugin(self)
1201            if result is not None:
1202                self = result
1203
1204        try:
1205            layer = self.dask.layers[name]
1206        except (AttributeError, KeyError):
1207            # self is no longer an Array after applying the plugins, OR
1208            # a plugin replaced the HighLevelGraph with a plain dict, OR
1209            # name is not the top layer's name (this can happen after the layer is
1210            # manipulated, to avoid a collision)
1211            pass
1212        else:
1213            if layer.collection_annotations is None:
1214                layer.collection_annotations = {
1215                    "shape": self.shape,
1216                    "dtype": self.dtype,
1217                    "chunksize": self.chunksize,
1218                    "chunks": self.chunks,
1219                    "type": typename(type(self)),
1220                    "chunk_type": typename(type(self._meta)),
1221                }
1222            else:
1223                layer.collection_annotations.update(
1224                    {
1225                        "shape": self.shape,
1226                        "dtype": self.dtype,
1227                        "chunksize": self.chunksize,
1228                        "chunks": self.chunks,
1229                        "type": typename(type(self)),
1230                        "chunk_type": typename(type(self._meta)),
1231                    }
1232                )
1233
1234        return self
1235
1236    def __reduce__(self):
1237        return (Array, (self.dask, self.name, self.chunks, self.dtype))
1238
1239    def __dask_graph__(self):
1240        return self.dask
1241
1242    def __dask_layers__(self):
1243        return (self.name,)
1244
1245    def __dask_keys__(self):
1246        if self._cached_keys is not None:
1247            return self._cached_keys
1248
1249        name, chunks, numblocks = self.name, self.chunks, self.numblocks
1250
1251        def keys(*args):
1252            if not chunks:
1253                return [(name,)]
1254            ind = len(args)
1255            if ind + 1 == len(numblocks):
1256                result = [(name,) + args + (i,) for i in range(numblocks[ind])]
1257            else:
1258                result = [keys(*(args + (i,))) for i in range(numblocks[ind])]
1259            return result
1260
1261        self._cached_keys = result = keys()
1262        return result
1263
1264    def __dask_tokenize__(self):
1265        return self.name
1266
1267    __dask_optimize__ = globalmethod(
1268        optimize, key="array_optimize", falsey=dont_optimize
1269    )
1270    __dask_scheduler__ = staticmethod(threaded.get)
1271
1272    def __dask_postcompute__(self):
1273        return finalize, ()
1274
1275    def __dask_postpersist__(self):
1276        return self._rebuild, ()
1277
1278    def _rebuild(self, dsk, *, rename=None):
1279        name = self._name
1280        if rename:
1281            name = rename.get(name, name)
1282        return Array(dsk, name, self.chunks, self.dtype, self._meta)
1283
1284    def _reset_cache(self, key=None):
1285        """
1286        Reset cached properties.
1287
1288        Parameters
1289        ----------
1290        key : str, optional
1291            Remove specified key. The default removes all items.
1292        """
1293        if key is None:
1294            self.__dict__.clear()
1295        else:
1296            self.__dict__.pop(key, None)
1297
1298    @cached_property
1299    def _key_array(self):
1300        return np.array(self.__dask_keys__(), dtype=object)
1301
1302    @cached_property
1303    def numblocks(self):
1304        return tuple(map(len, self.chunks))
1305
1306    @cached_property
1307    def npartitions(self):
1308        return reduce(mul, self.numblocks, 1)
1309
1310    def compute_chunk_sizes(self):
1311        """
1312        Compute the chunk sizes for a Dask array. This is especially useful
1313        when the chunk sizes are unknown (e.g., when indexing one Dask array
1314        with another).
1315
1316        Notes
1317        -----
1318        This function modifies the Dask array in-place.
1319
1320        Examples
1321        --------
1322        >>> import dask.array as da
1323        >>> import numpy as np
1324        >>> x = da.from_array([-2, -1, 0, 1, 2], chunks=2)
1325        >>> x.chunks
1326        ((2, 2, 1),)
1327        >>> y = x[x <= 0]
1328        >>> y.chunks
1329        ((nan, nan, nan),)
1330        >>> y.compute_chunk_sizes()  # in-place computation
1331        dask.array<getitem, shape=(3,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>
1332        >>> y.chunks
1333        ((2, 1, 0),)
1334
1335        """
1336        x = self
1337        chunk_shapes = x.map_blocks(
1338            _get_chunk_shape,
1339            dtype=int,
1340            chunks=tuple(len(c) * (1,) for c in x.chunks) + ((x.ndim,),),
1341            new_axis=x.ndim,
1342        )
1343
1344        c = []
1345        for i in range(x.ndim):
1346            s = x.ndim * [0] + [i]
1347            s[i] = slice(None)
1348            s = tuple(s)
1349
1350            c.append(tuple(chunk_shapes[s]))
1351
1352        # `map_blocks` assigns numpy dtypes
1353        # cast chunk dimensions back to python int before returning
1354        x._chunks = tuple(
1355            tuple(int(chunk) for chunk in chunks) for chunks in compute(tuple(c))[0]
1356        )
1357        return x
1358
1359    @cached_property
1360    def shape(self):
1361        return tuple(cached_cumsum(c, initial_zero=True)[-1] for c in self.chunks)
1362
1363    @property
1364    def chunksize(self):
1365        return tuple(max(c) for c in self.chunks)
1366
1367    @property
1368    def dtype(self):
1369        if isinstance(self._meta, tuple):
1370            dtype = self._meta[0].dtype
1371        else:
1372            dtype = self._meta.dtype
1373        return dtype
1374
1375    @property
1376    def _chunks(self):
1377        """Non-public chunks property. Allows setting a chunk value."""
1378        return self.__chunks
1379
1380    @_chunks.setter
1381    def _chunks(self, chunks):
1382        self.__chunks = chunks
1383
1384        # When the chunks changes the cached properties that was
1385        # dependent on it needs to be deleted:
1386        for key in ["numblocks", "npartitions", "shape", "ndim", "size", "_key_array"]:
1387            self._reset_cache(key)
1388
1389    @property
1390    def chunks(self):
1391        """Chunks property."""
1392        return self.__chunks
1393
1394    @chunks.setter
1395    def chunks(self, chunks):
1396        raise TypeError(
1397            "Can not set chunks directly\n\n"
1398            "Please use the rechunk method instead:\n"
1399            f"  x.rechunk({chunks})\n\n"
1400            "If trying to avoid unknown chunks, use\n"
1401            "  x.compute_chunk_sizes()"
1402        )
1403
1404    def __len__(self):
1405        if not self.chunks:
1406            raise TypeError("len() of unsized object")
1407        return sum(self.chunks[0])
1408
1409    def __array_ufunc__(self, numpy_ufunc, method, *inputs, **kwargs):
1410        out = kwargs.get("out", ())
1411        for x in inputs + out:
1412            if _should_delegate(x):
1413                return NotImplemented
1414
1415        if method == "__call__":
1416            if numpy_ufunc is np.matmul:
1417                from .routines import matmul
1418
1419                # special case until apply_gufunc handles optional dimensions
1420                return matmul(*inputs, **kwargs)
1421            if numpy_ufunc.signature is not None:
1422                from .gufunc import apply_gufunc
1423
1424                return apply_gufunc(
1425                    numpy_ufunc, numpy_ufunc.signature, *inputs, **kwargs
1426                )
1427            if numpy_ufunc.nout > 1:
1428                from . import ufunc
1429
1430                try:
1431                    da_ufunc = getattr(ufunc, numpy_ufunc.__name__)
1432                except AttributeError:
1433                    return NotImplemented
1434                return da_ufunc(*inputs, **kwargs)
1435            else:
1436                return elemwise(numpy_ufunc, *inputs, **kwargs)
1437        elif method == "outer":
1438            from . import ufunc
1439
1440            try:
1441                da_ufunc = getattr(ufunc, numpy_ufunc.__name__)
1442            except AttributeError:
1443                return NotImplemented
1444            return da_ufunc.outer(*inputs, **kwargs)
1445        else:
1446            return NotImplemented
1447
1448    def __repr__(self):
1449        """
1450
1451        >>> import dask.array as da
1452        >>> da.ones((10, 10), chunks=(5, 5), dtype='i4')
1453        dask.array<..., shape=(10, 10), dtype=int32, chunksize=(5, 5), chunktype=numpy.ndarray>
1454        """
1455        chunksize = str(self.chunksize)
1456        name = self.name.rsplit("-", 1)[0]
1457        return (
1458            "dask.array<{}, shape={}, dtype={}, chunksize={}, chunktype={}.{}>".format(
1459                name,
1460                self.shape,
1461                self.dtype,
1462                chunksize,
1463                type(self._meta).__module__.split(".")[0],
1464                type(self._meta).__name__,
1465            )
1466        )
1467
1468    def _repr_html_(self):
1469        try:
1470            grid = self.to_svg(size=config.get("array.svg.size", 120))
1471        except NotImplementedError:
1472            grid = ""
1473
1474        if "sparse" in typename(type(self._meta)):
1475            nbytes = None
1476            cbytes = None
1477        elif not math.isnan(self.nbytes):
1478            nbytes = format_bytes(self.nbytes)
1479            cbytes = format_bytes(np.prod(self.chunksize) * self.dtype.itemsize)
1480        else:
1481            nbytes = "unknown"
1482            cbytes = "unknown"
1483
1484        return get_template("array.html.j2").render(
1485            array=self,
1486            grid=grid,
1487            nbytes=nbytes,
1488            cbytes=cbytes,
1489        )
1490
1491    @cached_property
1492    def ndim(self):
1493        return len(self.shape)
1494
1495    @cached_property
1496    def size(self):
1497        """Number of elements in array"""
1498        return reduce(mul, self.shape, 1)
1499
1500    @property
1501    def nbytes(self):
1502        """Number of bytes in array"""
1503        return self.size * self.dtype.itemsize
1504
1505    @property
1506    def itemsize(self):
1507        """Length of one array element in bytes"""
1508        return self.dtype.itemsize
1509
1510    @property
1511    def _name(self):
1512        return self.__name
1513
1514    @_name.setter
1515    def _name(self, val):
1516        self.__name = val
1517        # Clear the key cache when the name is reset
1518        self._cached_keys = None
1519        self._reset_cache("_key_array")
1520
1521    @property
1522    def name(self):
1523        return self.__name
1524
1525    @name.setter
1526    def name(self, val):
1527        raise TypeError(
1528            "Cannot set name directly\n\n"
1529            "Name is used to relate the array to the task graph.\n"
1530            "It is uncommon to need to change it, but if you do\n"
1531            "please set ``._name``"
1532        )
1533
1534    def __iter__(self):
1535        for i in range(len(self)):
1536            yield self[i]
1537
1538    __array_priority__ = 11  # higher than numpy.ndarray and numpy.matrix
1539
1540    def __array__(self, dtype=None, **kwargs):
1541        x = self.compute()
1542        if dtype and x.dtype != dtype:
1543            x = x.astype(dtype)
1544        if not isinstance(x, np.ndarray):
1545            x = np.array(x)
1546        return x
1547
1548    def __array_function__(self, func, types, args, kwargs):
1549        import dask.array as module
1550
1551        def handle_nonmatching_names(func, args, kwargs):
1552            if func not in _HANDLED_FUNCTIONS:
1553                warnings.warn(
1554                    "The `{}` function is not implemented by Dask array. "
1555                    "You may want to use the da.map_blocks function "
1556                    "or something similar to silence this warning. "
1557                    "Your code may stop working in a future release.".format(
1558                        func.__module__ + "." + func.__name__
1559                    ),
1560                    FutureWarning,
1561                )
1562                # Need to convert to array object (e.g. numpy.ndarray or
1563                # cupy.ndarray) as needed, so we can call the NumPy function
1564                # again and it gets the chance to dispatch to the right
1565                # implementation.
1566                args, kwargs = compute(args, kwargs)
1567                return func(*args, **kwargs)
1568
1569            return _HANDLED_FUNCTIONS[func](*args, **kwargs)
1570
1571        # First, verify that all types are handled by Dask. Otherwise, return NotImplemented.
1572        if not all(type is Array or is_valid_chunk_type(type) for type in types):
1573            return NotImplemented
1574
1575        # Now try to find a matching function name.  If that doesn't work, we may
1576        # be dealing with an alias or a function that's simply not in the Dask API.
1577        # Handle aliases via the _HANDLED_FUNCTIONS dict mapping, and warn otherwise.
1578        for submodule in func.__module__.split(".")[1:]:
1579            try:
1580                module = getattr(module, submodule)
1581            except AttributeError:
1582                return handle_nonmatching_names(func, args, kwargs)
1583
1584        if not hasattr(module, func.__name__):
1585            return handle_nonmatching_names(func, args, kwargs)
1586
1587        da_func = getattr(module, func.__name__)
1588        if da_func is func:
1589            return handle_nonmatching_names(func, args, kwargs)
1590
1591        # If ``like`` is contained in ``da_func``'s signature, add ``like=self``
1592        # to the kwargs dictionary.
1593        if has_keyword(da_func, "like"):
1594            kwargs["like"] = self
1595
1596        return da_func(*args, **kwargs)
1597
1598    @property
1599    def _elemwise(self):
1600        return elemwise
1601
1602    @wraps(store)
1603    def store(self, target, **kwargs):
1604        r = store([self], [target], **kwargs)
1605
1606        if kwargs.get("return_stored", False):
1607            r = r[0]
1608
1609        return r
1610
1611    def to_svg(self, size=500):
1612        """Convert chunks from Dask Array into an SVG Image
1613
1614        Parameters
1615        ----------
1616        chunks: tuple
1617        size: int
1618            Rough size of the image
1619
1620        Examples
1621        --------
1622        >>> x.to_svg(size=500)  # doctest: +SKIP
1623
1624        Returns
1625        -------
1626        text: An svg string depicting the array as a grid of chunks
1627        """
1628        from .svg import svg
1629
1630        return svg(self.chunks, size=size)
1631
1632    def to_hdf5(self, filename, datapath, **kwargs):
1633        """Store array in HDF5 file
1634
1635        >>> x.to_hdf5('myfile.hdf5', '/x')  # doctest: +SKIP
1636
1637        Optionally provide arguments as though to ``h5py.File.create_dataset``
1638
1639        >>> x.to_hdf5('myfile.hdf5', '/x', compression='lzf', shuffle=True)  # doctest: +SKIP
1640
1641        See Also
1642        --------
1643        da.store
1644        h5py.File.create_dataset
1645        """
1646        return to_hdf5(filename, datapath, self, **kwargs)
1647
1648    def to_dask_dataframe(self, columns=None, index=None, meta=None):
1649        """Convert dask Array to dask Dataframe
1650
1651        Parameters
1652        ----------
1653        columns: list or string
1654            list of column names if DataFrame, single string if Series
1655        index : dask.dataframe.Index, optional
1656            An optional *dask* Index to use for the output Series or DataFrame.
1657
1658            The default output index depends on whether the array has any unknown
1659            chunks. If there are any unknown chunks, the output has ``None``
1660            for all the divisions (one per chunk). If all the chunks are known,
1661            a default index with known divsions is created.
1662
1663            Specifying ``index`` can be useful if you're conforming a Dask Array
1664            to an existing dask Series or DataFrame, and you would like the
1665            indices to match.
1666        meta : object, optional
1667            An optional `meta` parameter can be passed for dask
1668            to specify the concrete dataframe type to use for partitions of
1669            the Dask dataframe. By default, pandas DataFrame is used.
1670
1671        See Also
1672        --------
1673        dask.dataframe.from_dask_array
1674        """
1675        from ..dataframe import from_dask_array
1676
1677        return from_dask_array(self, columns=columns, index=index, meta=meta)
1678
1679    def __bool__(self):
1680        if self.size > 1:
1681            raise ValueError(
1682                f"The truth value of a {self.__class__.__name__} is ambiguous. "
1683                "Use a.any() or a.all()."
1684            )
1685        else:
1686            return bool(self.compute())
1687
1688    __nonzero__ = __bool__  # python 2
1689
1690    def _scalarfunc(self, cast_type):
1691        if self.size > 1:
1692            raise TypeError("Only length-1 arrays can be converted to Python scalars")
1693        else:
1694            return cast_type(self.compute())
1695
1696    def __int__(self):
1697        return self._scalarfunc(int)
1698
1699    __long__ = __int__  # python 2
1700
1701    def __float__(self):
1702        return self._scalarfunc(float)
1703
1704    def __complex__(self):
1705        return self._scalarfunc(complex)
1706
1707    def __index__(self):
1708        return self._scalarfunc(operator.index)
1709
1710    def __setitem__(self, key, value):
1711        if value is np.ma.masked:
1712            value = np.ma.masked_all(())
1713
1714        ## Use the "where" method for cases when key is an Array
1715        if isinstance(key, Array):
1716            from .routines import where
1717
1718            if isinstance(value, Array) and value.ndim > 1:
1719                raise ValueError("boolean index array should have 1 dimension")
1720            try:
1721                y = where(key, value, self)
1722            except ValueError as e:
1723                raise ValueError(
1724                    "Boolean index assignment in Dask "
1725                    "expects equally shaped arrays.\nExample: da1[da2] = da3 "
1726                    "where da1.shape == (4,), da2.shape == (4,) "
1727                    "and da3.shape == (4,)."
1728                ) from e
1729            self._meta = y._meta
1730            self.dask = y.dask
1731            self._name = y.name
1732            self._chunks = y.chunks
1733            return
1734
1735        if np.isnan(self.shape).any():
1736            raise ValueError(f"Arrays chunk sizes are unknown. {unknown_chunk_message}")
1737
1738        # Still here? Then apply the assignment to other type of
1739        # indices via the `setitem_array` function.
1740        value = asanyarray(value)
1741
1742        out = "setitem-" + tokenize(self, key, value)
1743        dsk = setitem_array(out, self, key, value)
1744
1745        meta = meta_from_array(self._meta)
1746        if np.isscalar(meta):
1747            meta = np.array(meta)
1748
1749        graph = HighLevelGraph.from_collections(out, dsk, dependencies=[self])
1750        y = Array(graph, out, chunks=self.chunks, dtype=self.dtype, meta=meta)
1751
1752        self._meta = y._meta
1753        self.dask = y.dask
1754        self._name = y.name
1755        self._chunks = y.chunks
1756
1757    def __getitem__(self, index):
1758        # Field access, e.g. x['a'] or x[['a', 'b']]
1759        if isinstance(index, str) or (
1760            isinstance(index, list) and index and all(isinstance(i, str) for i in index)
1761        ):
1762            if isinstance(index, str):
1763                dt = self.dtype[index]
1764            else:
1765                dt = np.dtype(
1766                    {
1767                        "names": index,
1768                        "formats": [self.dtype.fields[name][0] for name in index],
1769                        "offsets": [self.dtype.fields[name][1] for name in index],
1770                        "itemsize": self.dtype.itemsize,
1771                    }
1772                )
1773
1774            if dt.shape:
1775                new_axis = list(range(self.ndim, self.ndim + len(dt.shape)))
1776                chunks = self.chunks + tuple((i,) for i in dt.shape)
1777                return self.map_blocks(
1778                    getitem, index, dtype=dt.base, chunks=chunks, new_axis=new_axis
1779                )
1780            else:
1781                return self.map_blocks(getitem, index, dtype=dt)
1782
1783        if not isinstance(index, tuple):
1784            index = (index,)
1785
1786        from .slicing import (
1787            normalize_index,
1788            slice_with_bool_dask_array,
1789            slice_with_int_dask_array,
1790        )
1791
1792        index2 = normalize_index(index, self.shape)
1793        dependencies = {self.name}
1794        for i in index2:
1795            if isinstance(i, Array):
1796                dependencies.add(i.name)
1797
1798        if any(isinstance(i, Array) and i.dtype.kind in "iu" for i in index2):
1799            self, index2 = slice_with_int_dask_array(self, index2)
1800        if any(isinstance(i, Array) and i.dtype == bool for i in index2):
1801            self, index2 = slice_with_bool_dask_array(self, index2)
1802
1803        if all(isinstance(i, slice) and i == slice(None) for i in index2):
1804            return self
1805
1806        out = "getitem-" + tokenize(self, index2)
1807        dsk, chunks = slice_array(out, self.name, self.chunks, index2, self.itemsize)
1808
1809        graph = HighLevelGraph.from_collections(out, dsk, dependencies=[self])
1810
1811        meta = meta_from_array(self._meta, ndim=len(chunks))
1812        if np.isscalar(meta):
1813            meta = np.array(meta)
1814
1815        return Array(graph, out, chunks, meta=meta)
1816
1817    def _vindex(self, key):
1818        if not isinstance(key, tuple):
1819            key = (key,)
1820        if any(k is None for k in key):
1821            raise IndexError(
1822                "vindex does not support indexing with None (np.newaxis), "
1823                "got {}".format(key)
1824            )
1825        if all(isinstance(k, slice) for k in key):
1826            if all(
1827                k.indices(d) == slice(0, d).indices(d) for k, d in zip(key, self.shape)
1828            ):
1829                return self
1830            raise IndexError(
1831                "vindex requires at least one non-slice to vectorize over "
1832                "when the slices are not over the entire array (i.e, x[:]). "
1833                "Use normal slicing instead when only using slices. Got: {}".format(key)
1834            )
1835        return _vindex(self, *key)
1836
1837    @property
1838    def vindex(self):
1839        """Vectorized indexing with broadcasting.
1840
1841        This is equivalent to numpy's advanced indexing, using arrays that are
1842        broadcast against each other. This allows for pointwise indexing:
1843
1844        >>> import dask.array as da
1845        >>> x = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
1846        >>> x = da.from_array(x, chunks=2)
1847        >>> x.vindex[[0, 1, 2], [0, 1, 2]].compute()
1848        array([1, 5, 9])
1849
1850        Mixed basic/advanced indexing with slices/arrays is also supported. The
1851        order of dimensions in the result follows those proposed for
1852        `ndarray.vindex <https://github.com/numpy/numpy/pull/6256>`_:
1853        the subspace spanned by arrays is followed by all slices.
1854
1855        Note: ``vindex`` provides more general functionality than standard
1856        indexing, but it also has fewer optimizations and can be significantly
1857        slower.
1858        """
1859        return IndexCallable(self._vindex)
1860
1861    @property
1862    def blocks(self):
1863        """An array-like interface to the blocks of an array.
1864
1865        This returns a ``Blockview`` object that provides an array-like interface
1866        to the blocks of a dask array.  Numpy-style indexing of a ``Blockview`` object
1867        returns a selection of blocks as a new dask array.
1868
1869        You can index ``array.blocks`` like a numpy array of shape
1870        equal to the number of blocks in each dimension, (available as
1871        array.blocks.size).  The dimensionality of the output array matches
1872        the dimension of this array, even if integer indices are passed.
1873        Slicing with ``np.newaxis`` or multiple lists is not supported.
1874
1875        Examples
1876        --------
1877        >>> import dask.array as da
1878        >>> x = da.arange(8, chunks=2)
1879        >>> x.blocks.shape # aliases x.numblocks
1880        (4,)
1881        >>> x.blocks[0].compute()
1882        array([0, 1])
1883        >>> x.blocks[:3].compute()
1884        array([0, 1, 2, 3, 4, 5])
1885        >>> x.blocks[::2].compute()
1886        array([0, 1, 4, 5])
1887        >>> x.blocks[[-1, 0]].compute()
1888        array([6, 7, 0, 1])
1889        >>> x.blocks.ravel() # doctest: +NORMALIZE_WHITESPACE
1890        [dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>,
1891         dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>,
1892         dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>,
1893         dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>]
1894
1895        Returns
1896        -------
1897        An instance of ``dask.array.Blockview``
1898        """
1899        return BlockView(self)
1900
1901    @property
1902    def partitions(self):
1903        """Slice an array by partitions. Alias of dask array .blocks attribute.
1904
1905        This alias allows you to write agnostic code that works with both
1906        dask arrays and dask dataframes.
1907
1908        This returns a ``Blockview`` object that provides an array-like interface
1909        to the blocks of a dask array.  Numpy-style indexing of a ``Blockview`` object
1910        returns a selection of blocks as a new dask array.
1911
1912        You can index ``array.blocks`` like a numpy array of shape
1913        equal to the number of blocks in each dimension, (available as
1914        array.blocks.size).  The dimensionality of the output array matches
1915        the dimension of this array, even if integer indices are passed.
1916        Slicing with ``np.newaxis`` or multiple lists is not supported.
1917
1918        Examples
1919        --------
1920        >>> import dask.array as da
1921        >>> x = da.arange(8, chunks=2)
1922        >>> x.partitions.shape # aliases x.numblocks
1923        (4,)
1924        >>> x.partitions[0].compute()
1925        array([0, 1])
1926        >>> x.partitions[:3].compute()
1927        array([0, 1, 2, 3, 4, 5])
1928        >>> x.partitions[::2].compute()
1929        array([0, 1, 4, 5])
1930        >>> x.partitions[[-1, 0]].compute()
1931        array([6, 7, 0, 1])
1932        >>> x.partitions.ravel() # doctest: +NORMALIZE_WHITESPACE
1933        [dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>,
1934         dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>,
1935         dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>,
1936         dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>]
1937
1938        Returns
1939        -------
1940        An instance of ``da.array.Blockview``
1941        """
1942        return self.blocks
1943
1944    @derived_from(np.ndarray)
1945    def dot(self, other):
1946        from .routines import tensordot
1947
1948        return tensordot(self, other, axes=((self.ndim - 1,), (other.ndim - 2,)))
1949
1950    @property
1951    def A(self):
1952        return self
1953
1954    @property
1955    def T(self):
1956        return self.transpose()
1957
1958    @derived_from(np.ndarray)
1959    def transpose(self, *axes):
1960        from .routines import transpose
1961
1962        if not axes:
1963            axes = None
1964        elif len(axes) == 1 and isinstance(axes[0], Iterable):
1965            axes = axes[0]
1966        if (axes == tuple(range(self.ndim))) or (axes == tuple(range(-self.ndim, 0))):
1967            # no transpose necessary
1968            return self
1969        else:
1970            return transpose(self, axes=axes)
1971
1972    @derived_from(np.ndarray)
1973    def ravel(self):
1974        from .routines import ravel
1975
1976        return ravel(self)
1977
1978    flatten = ravel
1979
1980    @derived_from(np.ndarray)
1981    def choose(self, choices):
1982        from .routines import choose
1983
1984        return choose(self, choices)
1985
1986    @derived_from(np.ndarray)
1987    def reshape(self, *shape, merge_chunks=True):
1988        """
1989        .. note::
1990
1991           See :meth:`dask.array.reshape` for an explanation of
1992           the ``merge_chunks`` keyword.
1993        """
1994        from .reshape import reshape
1995
1996        if len(shape) == 1 and not isinstance(shape[0], Number):
1997            shape = shape[0]
1998        return reshape(self, shape, merge_chunks=merge_chunks)
1999
2000    def topk(self, k, axis=-1, split_every=None):
2001        """The top k elements of an array.
2002
2003        See :func:`dask.array.topk` for docstring.
2004
2005        """
2006        from .reductions import topk
2007
2008        return topk(self, k, axis=axis, split_every=split_every)
2009
2010    def argtopk(self, k, axis=-1, split_every=None):
2011        """The indices of the top k elements of an array.
2012
2013        See :func:`dask.array.argtopk` for docstring.
2014
2015        """
2016        from .reductions import argtopk
2017
2018        return argtopk(self, k, axis=axis, split_every=split_every)
2019
2020    def astype(self, dtype, **kwargs):
2021        """Copy of the array, cast to a specified type.
2022
2023        Parameters
2024        ----------
2025        dtype : str or dtype
2026            Typecode or data-type to which the array is cast.
2027        casting : {'no', 'equiv', 'safe', 'same_kind', 'unsafe'}, optional
2028            Controls what kind of data casting may occur. Defaults to 'unsafe'
2029            for backwards compatibility.
2030
2031            * 'no' means the data types should not be cast at all.
2032            * 'equiv' means only byte-order changes are allowed.
2033            * 'safe' means only casts which can preserve values are allowed.
2034            * 'same_kind' means only safe casts or casts within a kind,
2035                like float64 to float32, are allowed.
2036            * 'unsafe' means any data conversions may be done.
2037        copy : bool, optional
2038            By default, astype always returns a newly allocated array. If this
2039            is set to False and the `dtype` requirement is satisfied, the input
2040            array is returned instead of a copy.
2041        """
2042        # Scalars don't take `casting` or `copy` kwargs - as such we only pass
2043        # them to `map_blocks` if specified by user (different than defaults).
2044        extra = set(kwargs) - {"casting", "copy"}
2045        if extra:
2046            raise TypeError(
2047                f"astype does not take the following keyword arguments: {list(extra)}"
2048            )
2049        casting = kwargs.get("casting", "unsafe")
2050        dtype = np.dtype(dtype)
2051        if self.dtype == dtype:
2052            return self
2053        elif not np.can_cast(self.dtype, dtype, casting=casting):
2054            raise TypeError(
2055                f"Cannot cast array from {self.dtype!r} to {dtype!r} "
2056                f"according to the rule {casting!r}"
2057            )
2058        return self.map_blocks(chunk.astype, dtype=dtype, astype_dtype=dtype, **kwargs)
2059
2060    def __abs__(self):
2061        return elemwise(operator.abs, self)
2062
2063    @check_if_handled_given_other
2064    def __add__(self, other):
2065        return elemwise(operator.add, self, other)
2066
2067    @check_if_handled_given_other
2068    def __radd__(self, other):
2069        return elemwise(operator.add, other, self)
2070
2071    @check_if_handled_given_other
2072    def __and__(self, other):
2073        return elemwise(operator.and_, self, other)
2074
2075    @check_if_handled_given_other
2076    def __rand__(self, other):
2077        return elemwise(operator.and_, other, self)
2078
2079    @check_if_handled_given_other
2080    def __div__(self, other):
2081        return elemwise(operator.div, self, other)
2082
2083    @check_if_handled_given_other
2084    def __rdiv__(self, other):
2085        return elemwise(operator.div, other, self)
2086
2087    @check_if_handled_given_other
2088    def __eq__(self, other):
2089        return elemwise(operator.eq, self, other)
2090
2091    @check_if_handled_given_other
2092    def __gt__(self, other):
2093        return elemwise(operator.gt, self, other)
2094
2095    @check_if_handled_given_other
2096    def __ge__(self, other):
2097        return elemwise(operator.ge, self, other)
2098
2099    def __invert__(self):
2100        return elemwise(operator.invert, self)
2101
2102    @check_if_handled_given_other
2103    def __lshift__(self, other):
2104        return elemwise(operator.lshift, self, other)
2105
2106    @check_if_handled_given_other
2107    def __rlshift__(self, other):
2108        return elemwise(operator.lshift, other, self)
2109
2110    @check_if_handled_given_other
2111    def __lt__(self, other):
2112        return elemwise(operator.lt, self, other)
2113
2114    @check_if_handled_given_other
2115    def __le__(self, other):
2116        return elemwise(operator.le, self, other)
2117
2118    @check_if_handled_given_other
2119    def __mod__(self, other):
2120        return elemwise(operator.mod, self, other)
2121
2122    @check_if_handled_given_other
2123    def __rmod__(self, other):
2124        return elemwise(operator.mod, other, self)
2125
2126    @check_if_handled_given_other
2127    def __mul__(self, other):
2128        return elemwise(operator.mul, self, other)
2129
2130    @check_if_handled_given_other
2131    def __rmul__(self, other):
2132        return elemwise(operator.mul, other, self)
2133
2134    @check_if_handled_given_other
2135    def __ne__(self, other):
2136        return elemwise(operator.ne, self, other)
2137
2138    def __neg__(self):
2139        return elemwise(operator.neg, self)
2140
2141    @check_if_handled_given_other
2142    def __or__(self, other):
2143        return elemwise(operator.or_, self, other)
2144
2145    def __pos__(self):
2146        return self
2147
2148    @check_if_handled_given_other
2149    def __ror__(self, other):
2150        return elemwise(operator.or_, other, self)
2151
2152    @check_if_handled_given_other
2153    def __pow__(self, other):
2154        return elemwise(operator.pow, self, other)
2155
2156    @check_if_handled_given_other
2157    def __rpow__(self, other):
2158        return elemwise(operator.pow, other, self)
2159
2160    @check_if_handled_given_other
2161    def __rshift__(self, other):
2162        return elemwise(operator.rshift, self, other)
2163
2164    @check_if_handled_given_other
2165    def __rrshift__(self, other):
2166        return elemwise(operator.rshift, other, self)
2167
2168    @check_if_handled_given_other
2169    def __sub__(self, other):
2170        return elemwise(operator.sub, self, other)
2171
2172    @check_if_handled_given_other
2173    def __rsub__(self, other):
2174        return elemwise(operator.sub, other, self)
2175
2176    @check_if_handled_given_other
2177    def __truediv__(self, other):
2178        return elemwise(operator.truediv, self, other)
2179
2180    @check_if_handled_given_other
2181    def __rtruediv__(self, other):
2182        return elemwise(operator.truediv, other, self)
2183
2184    @check_if_handled_given_other
2185    def __floordiv__(self, other):
2186        return elemwise(operator.floordiv, self, other)
2187
2188    @check_if_handled_given_other
2189    def __rfloordiv__(self, other):
2190        return elemwise(operator.floordiv, other, self)
2191
2192    @check_if_handled_given_other
2193    def __xor__(self, other):
2194        return elemwise(operator.xor, self, other)
2195
2196    @check_if_handled_given_other
2197    def __rxor__(self, other):
2198        return elemwise(operator.xor, other, self)
2199
2200    @check_if_handled_given_other
2201    def __matmul__(self, other):
2202        from .routines import matmul
2203
2204        return matmul(self, other)
2205
2206    @check_if_handled_given_other
2207    def __rmatmul__(self, other):
2208        from .routines import matmul
2209
2210        return matmul(other, self)
2211
2212    @check_if_handled_given_other
2213    def __divmod__(self, other):
2214        from .ufunc import divmod
2215
2216        return divmod(self, other)
2217
2218    @check_if_handled_given_other
2219    def __rdivmod__(self, other):
2220        from .ufunc import divmod
2221
2222        return divmod(other, self)
2223
2224    @derived_from(np.ndarray)
2225    def any(self, axis=None, keepdims=False, split_every=None, out=None):
2226        from .reductions import any
2227
2228        return any(self, axis=axis, keepdims=keepdims, split_every=split_every, out=out)
2229
2230    @derived_from(np.ndarray)
2231    def all(self, axis=None, keepdims=False, split_every=None, out=None):
2232        from .reductions import all
2233
2234        return all(self, axis=axis, keepdims=keepdims, split_every=split_every, out=out)
2235
2236    @derived_from(np.ndarray)
2237    def min(self, axis=None, keepdims=False, split_every=None, out=None):
2238        from .reductions import min
2239
2240        return min(self, axis=axis, keepdims=keepdims, split_every=split_every, out=out)
2241
2242    @derived_from(np.ndarray)
2243    def max(self, axis=None, keepdims=False, split_every=None, out=None):
2244        from .reductions import max
2245
2246        return max(self, axis=axis, keepdims=keepdims, split_every=split_every, out=out)
2247
2248    @derived_from(np.ndarray)
2249    def argmin(self, axis=None, split_every=None, out=None):
2250        from .reductions import argmin
2251
2252        return argmin(self, axis=axis, split_every=split_every, out=out)
2253
2254    @derived_from(np.ndarray)
2255    def argmax(self, axis=None, split_every=None, out=None):
2256        from .reductions import argmax
2257
2258        return argmax(self, axis=axis, split_every=split_every, out=out)
2259
2260    @derived_from(np.ndarray)
2261    def sum(self, axis=None, dtype=None, keepdims=False, split_every=None, out=None):
2262        from .reductions import sum
2263
2264        return sum(
2265            self,
2266            axis=axis,
2267            dtype=dtype,
2268            keepdims=keepdims,
2269            split_every=split_every,
2270            out=out,
2271        )
2272
2273    @derived_from(np.ndarray)
2274    def trace(self, offset=0, axis1=0, axis2=1, dtype=None):
2275        from .reductions import trace
2276
2277        return trace(self, offset=offset, axis1=axis1, axis2=axis2, dtype=dtype)
2278
2279    @derived_from(np.ndarray)
2280    def prod(self, axis=None, dtype=None, keepdims=False, split_every=None, out=None):
2281        from .reductions import prod
2282
2283        return prod(
2284            self,
2285            axis=axis,
2286            dtype=dtype,
2287            keepdims=keepdims,
2288            split_every=split_every,
2289            out=out,
2290        )
2291
2292    @derived_from(np.ndarray)
2293    def mean(self, axis=None, dtype=None, keepdims=False, split_every=None, out=None):
2294        from .reductions import mean
2295
2296        return mean(
2297            self,
2298            axis=axis,
2299            dtype=dtype,
2300            keepdims=keepdims,
2301            split_every=split_every,
2302            out=out,
2303        )
2304
2305    @derived_from(np.ndarray)
2306    def std(
2307        self, axis=None, dtype=None, keepdims=False, ddof=0, split_every=None, out=None
2308    ):
2309        from .reductions import std
2310
2311        return std(
2312            self,
2313            axis=axis,
2314            dtype=dtype,
2315            keepdims=keepdims,
2316            ddof=ddof,
2317            split_every=split_every,
2318            out=out,
2319        )
2320
2321    @derived_from(np.ndarray)
2322    def var(
2323        self, axis=None, dtype=None, keepdims=False, ddof=0, split_every=None, out=None
2324    ):
2325        from .reductions import var
2326
2327        return var(
2328            self,
2329            axis=axis,
2330            dtype=dtype,
2331            keepdims=keepdims,
2332            ddof=ddof,
2333            split_every=split_every,
2334            out=out,
2335        )
2336
2337    def moment(
2338        self,
2339        order,
2340        axis=None,
2341        dtype=None,
2342        keepdims=False,
2343        ddof=0,
2344        split_every=None,
2345        out=None,
2346    ):
2347        """Calculate the nth centralized moment.
2348
2349        Parameters
2350        ----------
2351        order : int
2352            Order of the moment that is returned, must be >= 2.
2353        axis : int, optional
2354            Axis along which the central moment is computed. The default is to
2355            compute the moment of the flattened array.
2356        dtype : data-type, optional
2357            Type to use in computing the moment. For arrays of integer type the
2358            default is float64; for arrays of float types it is the same as the
2359            array type.
2360        keepdims : bool, optional
2361            If this is set to True, the axes which are reduced are left in the
2362            result as dimensions with size one. With this option, the result
2363            will broadcast correctly against the original array.
2364        ddof : int, optional
2365            "Delta Degrees of Freedom": the divisor used in the calculation is
2366            N - ddof, where N represents the number of elements. By default
2367            ddof is zero.
2368
2369        Returns
2370        -------
2371        moment : ndarray
2372
2373        References
2374        ----------
2375        .. [1] Pebay, Philippe (2008), "Formulas for Robust, One-Pass Parallel
2376           Computation of Covariances and Arbitrary-Order Statistical Moments",
2377           Technical Report SAND2008-6212, Sandia National Laboratories.
2378
2379        """
2380
2381        from .reductions import moment
2382
2383        return moment(
2384            self,
2385            order,
2386            axis=axis,
2387            dtype=dtype,
2388            keepdims=keepdims,
2389            ddof=ddof,
2390            split_every=split_every,
2391            out=out,
2392        )
2393
2394    @wraps(map_blocks)
2395    def map_blocks(self, func, *args, **kwargs):
2396        return map_blocks(func, self, *args, **kwargs)
2397
2398    def map_overlap(self, func, depth, boundary=None, trim=True, **kwargs):
2399        """Map a function over blocks of the array with some overlap
2400
2401        We share neighboring zones between blocks of the array, then map a
2402        function, then trim away the neighboring strips.
2403
2404        Note that this function will attempt to automatically determine the output
2405        array type before computing it, please refer to the ``meta`` keyword argument
2406        in :func:`map_blocks <dask.array.core.map_blocks>` if you expect that the function will not succeed when
2407        operating on 0-d arrays.
2408
2409        Parameters
2410        ----------
2411        func: function
2412            The function to apply to each extended block
2413        depth: int, tuple, or dict
2414            The number of elements that each block should share with its neighbors
2415            If a tuple or dict then this can be different per axis
2416        boundary: str, tuple, dict
2417            How to handle the boundaries.
2418            Values include 'reflect', 'periodic', 'nearest', 'none',
2419            or any constant value like 0 or np.nan
2420        trim: bool
2421            Whether or not to trim ``depth`` elements from each block after
2422            calling the map function.
2423            Set this to False if your mapping function already does this for you
2424        **kwargs:
2425            Other keyword arguments valid in :func:`map_blocks <dask.array.core.map_blocks>`.
2426
2427        Examples
2428        --------
2429        >>> import dask.array as da
2430        >>> x = np.array([1, 1, 2, 3, 3, 3, 2, 1, 1])
2431        >>> x = da.from_array(x, chunks=5)
2432        >>> def derivative(x):
2433        ...     return x - np.roll(x, 1)
2434
2435        >>> y = x.map_overlap(derivative, depth=1, boundary=0)
2436        >>> y.compute()
2437        array([ 1,  0,  1,  1,  0,  0, -1, -1,  0])
2438
2439        >>> import dask.array as da
2440        >>> x = np.arange(16).reshape((4, 4))
2441        >>> d = da.from_array(x, chunks=(2, 2))
2442        >>> d.map_overlap(lambda x: x + x.size, depth=1).compute()
2443        array([[16, 17, 18, 19],
2444               [20, 21, 22, 23],
2445               [24, 25, 26, 27],
2446               [28, 29, 30, 31]])
2447
2448        >>> func = lambda x: x + x.size
2449        >>> depth = {0: 1, 1: 1}
2450        >>> boundary = {0: 'reflect', 1: 'none'}
2451        >>> d.map_overlap(func, depth, boundary).compute()  # doctest: +NORMALIZE_WHITESPACE
2452        array([[12,  13,  14,  15],
2453               [16,  17,  18,  19],
2454               [20,  21,  22,  23],
2455               [24,  25,  26,  27]])
2456
2457        >>> x = np.arange(16).reshape((4, 4))
2458        >>> d = da.from_array(x, chunks=(2, 2))
2459        >>> y = d.map_overlap(lambda x: x + x[2], depth=1, meta=np.array(()))
2460        >>> y
2461        dask.array<_trim, shape=(4, 4), dtype=float64, chunksize=(2, 2), chunktype=numpy.ndarray>
2462        >>> y.compute()
2463        array([[ 4,  6,  8, 10],
2464               [ 8, 10, 12, 14],
2465               [20, 22, 24, 26],
2466               [24, 26, 28, 30]])
2467
2468        >>> import cupy  # doctest: +SKIP
2469        >>> x = cupy.arange(16).reshape((5, 4))  # doctest: +SKIP
2470        >>> d = da.from_array(x, chunks=(2, 2))  # doctest: +SKIP
2471        >>> y = d.map_overlap(lambda x: x + x[2], depth=1, meta=cupy.array(()))  # doctest: +SKIP
2472        >>> y  # doctest: +SKIP
2473        dask.array<_trim, shape=(4, 4), dtype=float64, chunksize=(2, 2), chunktype=cupy.ndarray>
2474        >>> y.compute()  # doctest: +SKIP
2475        array([[ 4,  6,  8, 10],
2476               [ 8, 10, 12, 14],
2477               [20, 22, 24, 26],
2478               [24, 26, 28, 30]])
2479        """
2480        from .overlap import map_overlap
2481
2482        return map_overlap(
2483            func, self, depth=depth, boundary=boundary, trim=trim, **kwargs
2484        )
2485
2486    @derived_from(np.ndarray)
2487    def cumsum(self, axis, dtype=None, out=None, *, method="sequential"):
2488        """Dask added an additional keyword-only argument ``method``.
2489
2490        method : {'sequential', 'blelloch'}, optional
2491            Choose which method to use to perform the cumsum.  Default is 'sequential'.
2492
2493            * 'sequential' performs the cumsum of each prior block before the current block.
2494            * 'blelloch' is a work-efficient parallel cumsum.  It exposes parallelism by
2495              first taking the sum of each block and combines the sums via a binary tree.
2496              This method may be faster or more memory efficient depending on workload,
2497              scheduler, and hardware.  More benchmarking is necessary.
2498        """
2499        from .reductions import cumsum
2500
2501        return cumsum(self, axis, dtype, out=out, method=method)
2502
2503    @derived_from(np.ndarray)
2504    def cumprod(self, axis, dtype=None, out=None, *, method="sequential"):
2505        """Dask added an additional keyword-only argument ``method``.
2506
2507        method : {'sequential', 'blelloch'}, optional
2508            Choose which method to use to perform the cumprod.  Default is 'sequential'.
2509
2510            * 'sequential' performs the cumprod of each prior block before the current block.
2511            * 'blelloch' is a work-efficient parallel cumprod.  It exposes parallelism by first
2512              taking the product of each block and combines the products via a binary tree.
2513              This method may be faster or more memory efficient depending on workload,
2514              scheduler, and hardware.  More benchmarking is necessary.
2515        """
2516        from .reductions import cumprod
2517
2518        return cumprod(self, axis, dtype, out=out, method=method)
2519
2520    @derived_from(np.ndarray)
2521    def squeeze(self, axis=None):
2522        from .routines import squeeze
2523
2524        return squeeze(self, axis)
2525
2526    def rechunk(
2527        self, chunks="auto", threshold=None, block_size_limit=None, balance=False
2528    ):
2529        """See da.rechunk for docstring"""
2530        from .rechunk import rechunk  # avoid circular import
2531
2532        return rechunk(self, chunks, threshold, block_size_limit, balance)
2533
2534    @property
2535    def real(self):
2536        from .ufunc import real
2537
2538        return real(self)
2539
2540    @property
2541    def imag(self):
2542        from .ufunc import imag
2543
2544        return imag(self)
2545
2546    def conj(self):
2547        from .ufunc import conj
2548
2549        return conj(self)
2550
2551    @derived_from(np.ndarray)
2552    def clip(self, min=None, max=None):
2553        from .ufunc import clip
2554
2555        return clip(self, min, max)
2556
2557    def view(self, dtype=None, order="C"):
2558        """Get a view of the array as a new data type
2559
2560        Parameters
2561        ----------
2562        dtype:
2563            The dtype by which to view the array.
2564            The default, None, results in the view having the same data-type
2565            as the original array.
2566        order: string
2567            'C' or 'F' (Fortran) ordering
2568
2569        This reinterprets the bytes of the array under a new dtype.  If that
2570        dtype does not have the same size as the original array then the shape
2571        will change.
2572
2573        Beware that both numpy and dask.array can behave oddly when taking
2574        shape-changing views of arrays under Fortran ordering.  Under some
2575        versions of NumPy this function will fail when taking shape-changing
2576        views of Fortran ordered arrays if the first dimension has chunks of
2577        size one.
2578        """
2579        if dtype is None:
2580            dtype = self.dtype
2581        else:
2582            dtype = np.dtype(dtype)
2583        mult = self.dtype.itemsize / dtype.itemsize
2584
2585        if order == "C":
2586            chunks = self.chunks[:-1] + (
2587                tuple(ensure_int(c * mult) for c in self.chunks[-1]),
2588            )
2589        elif order == "F":
2590            chunks = (
2591                tuple(ensure_int(c * mult) for c in self.chunks[0]),
2592            ) + self.chunks[1:]
2593        else:
2594            raise ValueError("Order must be one of 'C' or 'F'")
2595
2596        return self.map_blocks(
2597            chunk.view, dtype, order=order, dtype=dtype, chunks=chunks
2598        )
2599
2600    @derived_from(np.ndarray)
2601    def swapaxes(self, axis1, axis2):
2602        from .routines import swapaxes
2603
2604        return swapaxes(self, axis1, axis2)
2605
2606    @derived_from(np.ndarray)
2607    def round(self, decimals=0):
2608        from .routines import round
2609
2610        return round(self, decimals=decimals)
2611
2612    def copy(self):
2613        """
2614        Copy array.  This is a no-op for dask.arrays, which are immutable
2615        """
2616        if self.npartitions == 1:
2617            return self.map_blocks(M.copy)
2618        else:
2619            return Array(self.dask, self.name, self.chunks, meta=self)
2620
2621    def __deepcopy__(self, memo):
2622        c = self.copy()
2623        memo[id(self)] = c
2624        return c
2625
2626    def to_delayed(self, optimize_graph=True):
2627        """Convert into an array of :class:`dask.delayed.Delayed` objects, one per chunk.
2628
2629        Parameters
2630        ----------
2631        optimize_graph : bool, optional
2632            If True [default], the graph is optimized before converting into
2633            :class:`dask.delayed.Delayed` objects.
2634
2635        See Also
2636        --------
2637        dask.array.from_delayed
2638        """
2639        keys = self.__dask_keys__()
2640        graph = self.__dask_graph__()
2641        if optimize_graph:
2642            graph = self.__dask_optimize__(graph, keys)  # TODO, don't collape graph
2643            name = "delayed-" + self.name
2644            graph = HighLevelGraph.from_collections(name, graph, dependencies=())
2645        L = ndeepmap(self.ndim, lambda k: Delayed(k, graph), keys)
2646        return np.array(L, dtype=object)
2647
2648    @derived_from(np.ndarray)
2649    def repeat(self, repeats, axis=None):
2650        from .creation import repeat
2651
2652        return repeat(self, repeats, axis=axis)
2653
2654    @derived_from(np.ndarray)
2655    def nonzero(self):
2656        from .routines import nonzero
2657
2658        return nonzero(self)
2659
2660    def to_zarr(self, *args, **kwargs):
2661        """Save array to the zarr storage format
2662
2663        See https://zarr.readthedocs.io for details about the format.
2664
2665        See function :func:`dask.array.to_zarr` for parameters.
2666        """
2667        return to_zarr(self, *args, **kwargs)
2668
2669    def to_tiledb(self, uri, *args, **kwargs):
2670        """Save array to the TileDB storage manager
2671
2672        See function :func:`dask.array.to_tiledb` for argument documentation.
2673
2674        See https://docs.tiledb.io for details about the format and engine.
2675        """
2676        from .tiledb_io import to_tiledb
2677
2678        return to_tiledb(self, uri, *args, **kwargs)
2679
2680
2681def ensure_int(f):
2682    i = int(f)
2683    if i != f:
2684        raise ValueError("Could not coerce %f to integer" % f)
2685    return i
2686
2687
2688def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks=None):
2689    """Normalize chunks to tuple of tuples
2690
2691    This takes in a variety of input types and information and produces a full
2692    tuple-of-tuples result for chunks, suitable to be passed to Array or
2693    rechunk or any other operation that creates a Dask array.
2694
2695    Parameters
2696    ----------
2697    chunks: tuple, int, dict, or string
2698        The chunks to be normalized.  See examples below for more details
2699    shape: Tuple[int]
2700        The shape of the array
2701    limit: int (optional)
2702        The maximum block size to target in bytes,
2703        if freedom is given to choose
2704    dtype: np.dtype
2705    previous_chunks: Tuple[Tuple[int]] optional
2706        Chunks from a previous array that we should use for inspiration when
2707        rechunking auto dimensions.  If not provided but auto-chunking exists
2708        then auto-dimensions will prefer square-like chunk shapes.
2709
2710    Examples
2711    --------
2712    Specify uniform chunk sizes
2713
2714    >>> from dask.array.core import normalize_chunks
2715    >>> normalize_chunks((2, 2), shape=(5, 6))
2716    ((2, 2, 1), (2, 2, 2))
2717
2718    Also passes through fully explicit tuple-of-tuples
2719
2720    >>> normalize_chunks(((2, 2, 1), (2, 2, 2)), shape=(5, 6))
2721    ((2, 2, 1), (2, 2, 2))
2722
2723    Cleans up lists to tuples
2724
2725    >>> normalize_chunks([[2, 2], [3, 3]])
2726    ((2, 2), (3, 3))
2727
2728    Expands integer inputs 10 -> (10, 10)
2729
2730    >>> normalize_chunks(10, shape=(30, 5))
2731    ((10, 10, 10), (5,))
2732
2733    Expands dict inputs
2734
2735    >>> normalize_chunks({0: 2, 1: 3}, shape=(6, 6))
2736    ((2, 2, 2), (3, 3))
2737
2738    The values -1 and None get mapped to full size
2739
2740    >>> normalize_chunks((5, -1), shape=(10, 10))
2741    ((5, 5), (10,))
2742
2743    Use the value "auto" to automatically determine chunk sizes along certain
2744    dimensions.  This uses the ``limit=`` and ``dtype=`` keywords to
2745    determine how large to make the chunks.  The term "auto" can be used
2746    anywhere an integer can be used.  See array chunking documentation for more
2747    information.
2748
2749    >>> normalize_chunks(("auto",), shape=(20,), limit=5, dtype='uint8')
2750    ((5, 5, 5, 5),)
2751
2752    You can also use byte sizes (see :func:`dask.utils.parse_bytes`) in place of
2753    "auto" to ask for a particular size
2754
2755    >>> normalize_chunks("1kiB", shape=(2000,), dtype='float32')
2756    ((250, 250, 250, 250, 250, 250, 250, 250),)
2757
2758    Respects null dimensions
2759
2760    >>> normalize_chunks((), shape=(0, 0))
2761    ((0,), (0,))
2762    """
2763    if dtype and not isinstance(dtype, np.dtype):
2764        dtype = np.dtype(dtype)
2765    if chunks is None:
2766        raise ValueError(CHUNKS_NONE_ERROR_MESSAGE)
2767    if isinstance(chunks, list):
2768        chunks = tuple(chunks)
2769    if isinstance(chunks, (Number, str)):
2770        chunks = (chunks,) * len(shape)
2771    if isinstance(chunks, dict):
2772        chunks = tuple(chunks.get(i, None) for i in range(len(shape)))
2773    if isinstance(chunks, np.ndarray):
2774        chunks = chunks.tolist()
2775    if not chunks and shape and all(s == 0 for s in shape):
2776        chunks = ((0,),) * len(shape)
2777
2778    if (
2779        shape
2780        and len(shape) == 1
2781        and len(chunks) > 1
2782        and all(isinstance(c, (Number, str)) for c in chunks)
2783    ):
2784        chunks = (chunks,)
2785
2786    if shape and len(chunks) != len(shape):
2787        raise ValueError(
2788            "Chunks and shape must be of the same length/dimension. "
2789            "Got chunks=%s, shape=%s" % (chunks, shape)
2790        )
2791    if -1 in chunks or None in chunks:
2792        chunks = tuple(s if c == -1 or c is None else c for c, s in zip(chunks, shape))
2793
2794    # If specifying chunk size in bytes, use that value to set the limit.
2795    # Verify there is only one consistent value of limit or chunk-bytes used.
2796    for c in chunks:
2797        if isinstance(c, str) and c != "auto":
2798            parsed = parse_bytes(c)
2799            if limit is None:
2800                limit = parsed
2801            elif parsed != limit:
2802                raise ValueError(
2803                    "Only one consistent value of limit or chunk is allowed."
2804                    "Used %s != %s" % (parsed, limit)
2805                )
2806    # Substitute byte limits with 'auto' now that limit is set.
2807    chunks = tuple("auto" if isinstance(c, str) and c != "auto" else c for c in chunks)
2808
2809    if any(c == "auto" for c in chunks):
2810        chunks = auto_chunks(chunks, shape, limit, dtype, previous_chunks)
2811
2812    if shape is not None:
2813        chunks = tuple(c if c not in {None, -1} else s for c, s in zip(chunks, shape))
2814
2815    if chunks and shape is not None:
2816        chunks = sum(
2817            (
2818                blockdims_from_blockshape((s,), (c,))
2819                if not isinstance(c, (tuple, list))
2820                else (c,)
2821                for s, c in zip(shape, chunks)
2822            ),
2823            (),
2824        )
2825    for c in chunks:
2826        if not c:
2827            raise ValueError(
2828                "Empty tuples are not allowed in chunks. Express "
2829                "zero length dimensions with 0(s) in chunks"
2830            )
2831
2832    if shape is not None:
2833        if len(chunks) != len(shape):
2834            raise ValueError(
2835                "Input array has %d dimensions but the supplied "
2836                "chunks has only %d dimensions" % (len(shape), len(chunks))
2837            )
2838        if not all(
2839            c == s or (math.isnan(c) or math.isnan(s))
2840            for c, s in zip(map(sum, chunks), shape)
2841        ):
2842            raise ValueError(
2843                "Chunks do not add up to shape. "
2844                "Got chunks=%s, shape=%s" % (chunks, shape)
2845            )
2846
2847    return tuple(tuple(int(x) if not math.isnan(x) else x for x in c) for c in chunks)
2848
2849
2850def _compute_multiplier(limit: int, dtype, largest_block: int, result):
2851    """
2852    Utility function for auto_chunk, to fin how much larger or smaller the ideal
2853    chunk size is relative to what we have now.
2854    """
2855    return (
2856        limit
2857        / dtype.itemsize
2858        / largest_block
2859        / np.prod(list(r if r != 0 else 1 for r in result.values()))
2860    )
2861
2862
2863def auto_chunks(chunks, shape, limit, dtype, previous_chunks=None):
2864    """Determine automatic chunks
2865
2866    This takes in a chunks value that contains ``"auto"`` values in certain
2867    dimensions and replaces those values with concrete dimension sizes that try
2868    to get chunks to be of a certain size in bytes, provided by the ``limit=``
2869    keyword.  If multiple dimensions are marked as ``"auto"`` then they will
2870    all respond to meet the desired byte limit, trying to respect the aspect
2871    ratio of their dimensions in ``previous_chunks=``, if given.
2872
2873    Parameters
2874    ----------
2875    chunks: Tuple
2876        A tuple of either dimensions or tuples of explicit chunk dimensions
2877        Some entries should be "auto"
2878    shape: Tuple[int]
2879    limit: int, str
2880        The maximum allowable size of a chunk in bytes
2881    previous_chunks: Tuple[Tuple[int]]
2882
2883    See also
2884    --------
2885    normalize_chunks: for full docstring and parameters
2886    """
2887    if previous_chunks is not None:
2888        previous_chunks = tuple(
2889            c if isinstance(c, tuple) else (c,) for c in previous_chunks
2890        )
2891    chunks = list(chunks)
2892
2893    autos = {i for i, c in enumerate(chunks) if c == "auto"}
2894    if not autos:
2895        return tuple(chunks)
2896
2897    if limit is None:
2898        limit = config.get("array.chunk-size")
2899    if isinstance(limit, str):
2900        limit = parse_bytes(limit)
2901
2902    if dtype is None:
2903        raise TypeError("dtype must be known for auto-chunking")
2904
2905    if dtype.hasobject:
2906        raise NotImplementedError(
2907            "Can not use auto rechunking with object dtype. "
2908            "We are unable to estimate the size in bytes of object data"
2909        )
2910
2911    for x in tuple(chunks) + tuple(shape):
2912        if (
2913            isinstance(x, Number)
2914            and np.isnan(x)
2915            or isinstance(x, tuple)
2916            and np.isnan(x).any()
2917        ):
2918            raise ValueError(
2919                "Can not perform automatic rechunking with unknown "
2920                "(nan) chunk sizes.%s" % unknown_chunk_message
2921            )
2922
2923    limit = max(1, limit)
2924
2925    largest_block = np.prod(
2926        [cs if isinstance(cs, Number) else max(cs) for cs in chunks if cs != "auto"]
2927    )
2928
2929    if previous_chunks:
2930        # Base ideal ratio on the median chunk size of the previous chunks
2931        result = {a: np.median(previous_chunks[a]) for a in autos}
2932
2933        ideal_shape = []
2934        for i, s in enumerate(shape):
2935            chunk_frequencies = frequencies(previous_chunks[i])
2936            mode, count = max(chunk_frequencies.items(), key=lambda kv: kv[1])
2937            if mode > 1 and count >= len(previous_chunks[i]) / 2:
2938                ideal_shape.append(mode)
2939            else:
2940                ideal_shape.append(s)
2941
2942        # How much larger or smaller the ideal chunk size is relative to what we have now
2943        multiplier = _compute_multiplier(limit, dtype, largest_block, result)
2944
2945        last_multiplier = 0
2946        last_autos = set()
2947        while (
2948            multiplier != last_multiplier or autos != last_autos
2949        ):  # while things change
2950            last_multiplier = multiplier  # record previous values
2951            last_autos = set(autos)  # record previous values
2952
2953            # Expand or contract each of the dimensions appropriately
2954            for a in sorted(autos):
2955                if ideal_shape[a] == 0:
2956                    result[a] = 0
2957                    continue
2958                proposed = result[a] * multiplier ** (1 / len(autos))
2959                if proposed > shape[a]:  # we've hit the shape boundary
2960                    autos.remove(a)
2961                    largest_block *= shape[a]
2962                    chunks[a] = shape[a]
2963                    del result[a]
2964                else:
2965                    result[a] = round_to(proposed, ideal_shape[a])
2966
2967            # recompute how much multiplier we have left, repeat
2968            multiplier = _compute_multiplier(limit, dtype, largest_block, result)
2969
2970        for k, v in result.items():
2971            chunks[k] = v
2972        return tuple(chunks)
2973
2974    else:
2975        size = (limit / dtype.itemsize / largest_block) ** (1 / len(autos))
2976        small = [i for i in autos if shape[i] < size]
2977        if small:
2978            for i in small:
2979                chunks[i] = (shape[i],)
2980            return auto_chunks(chunks, shape, limit, dtype)
2981
2982        for i in autos:
2983            chunks[i] = round_to(size, shape[i])
2984
2985        return tuple(chunks)
2986
2987
2988def round_to(c, s):
2989    """Return a chunk dimension that is close to an even multiple or factor
2990
2991    We want values for c that are nicely aligned with s.
2992
2993    If c is smaller than s then we want the largest factor of s that is less than the
2994    desired chunk size, but not less than half, which is too much.  If no such
2995    factor exists then we just go with the original chunk size and accept an
2996    uneven chunk at the end.
2997
2998    If c is larger than s then we want the largest multiple of s that is still
2999    smaller than c.
3000    """
3001    if c <= s:
3002        try:
3003            return max(f for f in factors(s) if c / 2 <= f <= c)
3004        except ValueError:  # no matching factors within factor of two
3005            return max(1, int(c))
3006    else:
3007        return c // s * s
3008
3009
3010def _get_chunk_shape(a):
3011    s = np.asarray(a.shape, dtype=int)
3012    return s[len(s) * (None,) + (slice(None),)]
3013
3014
3015def from_array(
3016    x,
3017    chunks="auto",
3018    name=None,
3019    lock=False,
3020    asarray=None,
3021    fancy=True,
3022    getitem=None,
3023    meta=None,
3024    inline_array=False,
3025):
3026    """Create dask array from something that looks like an array.
3027
3028    Input must have a ``.shape``, ``.ndim``, ``.dtype`` and support numpy-style slicing.
3029
3030    Parameters
3031    ----------
3032    x : array_like
3033    chunks : int, tuple
3034        How to chunk the array. Must be one of the following forms:
3035
3036        - A blocksize like 1000.
3037        - A blockshape like (1000, 1000).
3038        - Explicit sizes of all blocks along all dimensions like
3039          ((1000, 1000, 500), (400, 400)).
3040        - A size in bytes, like "100 MiB" which will choose a uniform
3041          block-like shape
3042        - The word "auto" which acts like the above, but uses a configuration
3043          value ``array.chunk-size`` for the chunk size
3044
3045        -1 or None as a blocksize indicate the size of the corresponding
3046        dimension.
3047    name : str or bool, optional
3048        The key name to use for the array. Defaults to a hash of ``x``.
3049
3050        Hashing is useful if the same value of ``x`` is used to create multiple
3051        arrays, as Dask can then recognise that they're the same and
3052        avoid duplicate computations. However, it can also be slow, and if the
3053        array is not contiguous it is copied for hashing. If the array uses
3054        stride tricks (such as :func:`numpy.broadcast_to` or
3055        :func:`skimage.util.view_as_windows`) to have a larger logical
3056        than physical size, this copy can cause excessive memory usage.
3057
3058        If you don't need the deduplication provided by hashing, use
3059        ``name=False`` to generate a random name instead of hashing, which
3060        avoids the pitfalls described above. Using ``name=True`` is
3061        equivalent to the default.
3062
3063        By default, hashing uses python's standard sha1. This behaviour can be
3064        changed by installing cityhash, xxhash or murmurhash. If installed,
3065        a large-factor speedup can be obtained in the tokenisation step.
3066
3067        .. note::
3068
3069           Because this ``name`` is used as the key in task graphs, you should
3070           ensure that it uniquely identifies the data contained within. If
3071           you'd like to provide a descriptive name that is still unique, combine
3072           the descriptive name with :func:`dask.base.tokenize` of the
3073           ``array_like``. See :ref:`graphs` for more.
3074
3075    lock : bool or Lock, optional
3076        If ``x`` doesn't support concurrent reads then provide a lock here, or
3077        pass in True to have dask.array create one for you.
3078    asarray : bool, optional
3079        If True then call np.asarray on chunks to convert them to numpy arrays.
3080        If False then chunks are passed through unchanged.
3081        If None (default) then we use True if the ``__array_function__`` method
3082        is undefined.
3083    fancy : bool, optional
3084        If ``x`` doesn't support fancy indexing (e.g. indexing with lists or
3085        arrays) then set to False. Default is True.
3086    meta : Array-like, optional
3087        The metadata for the resulting dask array.  This is the kind of array
3088        that will result from slicing the input array.
3089        Defaults to the input array.
3090    inline_array : bool, default False
3091        How to include the array in the task graph. By default
3092        (``inline_array=False``) the array is included in a task by itself,
3093        and each chunk refers to that task by its key.
3094
3095        .. code-block:: python
3096
3097           >>> x = h5py.File("data.h5")["/x"]  # doctest: +SKIP
3098           >>> a = da.from_array(x, chunks=500)  # doctest: +SKIP
3099           >>> dict(a.dask)  # doctest: +SKIP
3100           {
3101              'array-original-<name>': <HDF5 dataset ...>,
3102              ('array-<name>', 0): (getitem, "array-original-<name>", ...),
3103              ('array-<name>', 1): (getitem, "array-original-<name>", ...)
3104           }
3105
3106        With ``inline_array=True``, Dask will instead inline the array directly
3107        in the values of the task graph.
3108
3109        .. code-block:: python
3110
3111           >>> a = da.from_array(x, chunks=500, inline_array=True)  # doctest: +SKIP
3112           >>> dict(a.dask)  # doctest: +SKIP
3113           {
3114              ('array-<name>', 0): (getitem, <HDF5 dataset ...>, ...),
3115              ('array-<name>', 1): (getitem, <HDF5 dataset ...>, ...)
3116           }
3117
3118        Note that there's no key in the task graph with just the array `x`
3119        anymore. Instead it's placed directly in the values.
3120
3121        The right choice for ``inline_array`` depends on several factors,
3122        including the size of ``x``, how expensive it is to create, which
3123        scheduler you're using, and the pattern of downstream computations.
3124        As a heuristic, ``inline_array=True`` may be the right choice when
3125        the array ``x`` is cheap to serialize and deserialize (since it's
3126        included in the graph many times) and if you're experiencing ordering
3127        issues (see :ref:`order` for more).
3128
3129        This has no effect when ``x`` is a NumPy array.
3130
3131    Examples
3132    --------
3133
3134    >>> x = h5py.File('...')['/data/path']  # doctest: +SKIP
3135    >>> a = da.from_array(x, chunks=(1000, 1000))  # doctest: +SKIP
3136
3137    If your underlying datastore does not support concurrent reads then include
3138    the ``lock=True`` keyword argument or ``lock=mylock`` if you want multiple
3139    arrays to coordinate around the same lock.
3140
3141    >>> a = da.from_array(x, chunks=(1000, 1000), lock=True)  # doctest: +SKIP
3142
3143    If your underlying datastore has a ``.chunks`` attribute (as h5py and zarr
3144    datasets do) then a multiple of that chunk shape will be used if you
3145    do not provide a chunk shape.
3146
3147    >>> a = da.from_array(x, chunks='auto')  # doctest: +SKIP
3148    >>> a = da.from_array(x, chunks='100 MiB')  # doctest: +SKIP
3149    >>> a = da.from_array(x)  # doctest: +SKIP
3150
3151    If providing a name, ensure that it is unique
3152
3153    >>> import dask.base
3154    >>> token = dask.base.tokenize(x)  # doctest: +SKIP
3155    >>> a = da.from_array('myarray-' + token)  # doctest: +SKIP
3156
3157    NumPy ndarrays are eagerly sliced and then embedded in the graph.
3158
3159    >>> import dask.array
3160    >>> a = dask.array.from_array(np.array([[1, 2], [3, 4]]), chunks=(1,1))
3161    >>> a.dask[a.name, 0, 0][0]
3162    array([1])
3163
3164    Chunks with exactly-specified, different sizes can be created.
3165
3166    >>> import numpy as np
3167    >>> import dask.array as da
3168    >>> x = np.random.random((100, 6))
3169    >>> a = da.from_array(x, chunks=((67, 33), (6,)))
3170    """
3171    if isinstance(x, Array):
3172        raise ValueError(
3173            "Array is already a dask array. Use 'asarray' or " "'rechunk' instead."
3174        )
3175    elif is_dask_collection(x):
3176        warnings.warn(
3177            "Passing an object to dask.array.from_array which is already a "
3178            "Dask collection. This can lead to unexpected behavior."
3179        )
3180
3181    if isinstance(x, (list, tuple, memoryview) + np.ScalarType):
3182        x = np.array(x)
3183
3184    if asarray is None:
3185        asarray = not hasattr(x, "__array_function__")
3186
3187    previous_chunks = getattr(x, "chunks", None)
3188
3189    chunks = normalize_chunks(
3190        chunks, x.shape, dtype=x.dtype, previous_chunks=previous_chunks
3191    )
3192
3193    if name in (None, True):
3194        token = tokenize(x, chunks)
3195        original_name = "array-original-" + token
3196        name = name or "array-" + token
3197    elif name is False:
3198        original_name = name = "array-" + str(uuid.uuid1())
3199    else:
3200        original_name = name
3201
3202    if lock is True:
3203        lock = SerializableLock()
3204
3205    is_ndarray = type(x) is np.ndarray
3206    is_single_block = all(len(c) == 1 for c in chunks)
3207    # Always use the getter for h5py etc. Not using isinstance(x, np.ndarray)
3208    # because np.matrix is a subclass of np.ndarray.
3209    if is_ndarray and not is_single_block and not lock:
3210        # eagerly slice numpy arrays to prevent memory blowup
3211        # GH5367, GH5601
3212        slices = slices_from_chunks(chunks)
3213        keys = product([name], *(range(len(bds)) for bds in chunks))
3214        values = [x[slc] for slc in slices]
3215        dsk = dict(zip(keys, values))
3216
3217    elif is_ndarray and is_single_block:
3218        # No slicing needed
3219        dsk = {(name,) + (0,) * x.ndim: x}
3220    else:
3221        if getitem is None:
3222            if fancy:
3223                getitem = getter
3224            else:
3225                getitem = getter_nofancy
3226
3227        if inline_array:
3228            get_from = x
3229        else:
3230            get_from = original_name
3231
3232        dsk = getem(
3233            get_from,
3234            chunks,
3235            getitem=getitem,
3236            shape=x.shape,
3237            out_name=name,
3238            lock=lock,
3239            asarray=asarray,
3240            dtype=x.dtype,
3241        )
3242        if not inline_array:
3243            dsk[original_name] = x
3244
3245    # Workaround for TileDB, its indexing is 1-based,
3246    # and doesn't seems to support 0-length slicing
3247    if x.__class__.__module__.split(".")[0] == "tiledb" and hasattr(x, "_ctx_"):
3248        return Array(dsk, name, chunks, dtype=x.dtype)
3249
3250    if meta is None:
3251        meta = x
3252
3253    return Array(dsk, name, chunks, meta=meta, dtype=getattr(x, "dtype", None))
3254
3255
3256def from_zarr(
3257    url,
3258    component=None,
3259    storage_options=None,
3260    chunks=None,
3261    name=None,
3262    inline_array=False,
3263    **kwargs,
3264):
3265    """Load array from the zarr storage format
3266
3267    See https://zarr.readthedocs.io for details about the format.
3268
3269    Parameters
3270    ----------
3271    url: Zarr Array or str or MutableMapping
3272        Location of the data. A URL can include a protocol specifier like s3://
3273        for remote data. Can also be any MutableMapping instance, which should
3274        be serializable if used in multiple processes.
3275    component: str or None
3276        If the location is a zarr group rather than an array, this is the
3277        subcomponent that should be loaded, something like ``'foo/bar'``.
3278    storage_options: dict
3279        Any additional parameters for the storage backend (ignored for local
3280        paths)
3281    chunks: tuple of ints or tuples of ints
3282        Passed to :func:`dask.array.from_array`, allows setting the chunks on
3283        initialisation, if the chunking scheme in the on-disc dataset is not
3284        optimal for the calculations to follow.
3285    name : str, optional
3286         An optional keyname for the array.  Defaults to hashing the input
3287    kwargs:
3288        Passed to :class:`zarr.core.Array`.
3289    inline_array : bool, default False
3290        Whether to inline the zarr Array in the values of the task graph.
3291        See :meth:`dask.array.from_array` for an explanation.
3292
3293    See Also
3294    --------
3295    from_array
3296    """
3297    import zarr
3298
3299    storage_options = storage_options or {}
3300    if isinstance(url, zarr.Array):
3301        z = url
3302    elif isinstance(url, (str, os.PathLike)):
3303        if isinstance(url, os.PathLike):
3304            url = os.fspath(url)
3305        mapper = get_mapper(url, **storage_options)
3306        z = zarr.Array(mapper, read_only=True, path=component, **kwargs)
3307    else:
3308        mapper = url
3309        z = zarr.Array(mapper, read_only=True, path=component, **kwargs)
3310    chunks = chunks if chunks is not None else z.chunks
3311    if name is None:
3312        name = "from-zarr-" + tokenize(z, component, storage_options, chunks, **kwargs)
3313    return from_array(z, chunks, name=name, inline_array=inline_array)
3314
3315
3316def to_zarr(
3317    arr,
3318    url,
3319    component=None,
3320    storage_options=None,
3321    overwrite=False,
3322    compute=True,
3323    return_stored=False,
3324    **kwargs,
3325):
3326    """Save array to the zarr storage format
3327
3328    See https://zarr.readthedocs.io for details about the format.
3329
3330    Parameters
3331    ----------
3332    arr: dask.array
3333        Data to store
3334    url: Zarr Array or str or MutableMapping
3335        Location of the data. A URL can include a protocol specifier like s3://
3336        for remote data. Can also be any MutableMapping instance, which should
3337        be serializable if used in multiple processes.
3338    component: str or None
3339        If the location is a zarr group rather than an array, this is the
3340        subcomponent that should be created/over-written.
3341    storage_options: dict
3342        Any additional parameters for the storage backend (ignored for local
3343        paths)
3344    overwrite: bool
3345        If given array already exists, overwrite=False will cause an error,
3346        where overwrite=True will replace the existing data.
3347    compute: bool
3348        See :func:`~dask.array.store` for more details.
3349    return_stored: bool
3350        See :func:`~dask.array.store` for more details.
3351    **kwargs:
3352        Passed to the :func:`zarr.creation.create` function, e.g., compression options.
3353
3354    Raises
3355    ------
3356    ValueError
3357        If ``arr`` has unknown chunk sizes, which is not supported by Zarr.
3358
3359    See Also
3360    --------
3361    dask.array.store
3362    dask.array.Array.compute_chunk_sizes
3363
3364    """
3365    import zarr
3366
3367    if np.isnan(arr.shape).any():
3368        raise ValueError(
3369            "Saving a dask array with unknown chunk sizes is not "
3370            "currently supported by Zarr.%s" % unknown_chunk_message
3371        )
3372
3373    if isinstance(url, zarr.Array):
3374        z = url
3375        if isinstance(z.store, (dict, MutableMapping)) and "distributed" in config.get(
3376            "scheduler", ""
3377        ):
3378            raise RuntimeError(
3379                "Cannot store into in memory Zarr Array using "
3380                "the Distributed Scheduler."
3381            )
3382        arr = arr.rechunk(z.chunks)
3383        return arr.store(z, lock=False, compute=compute, return_stored=return_stored)
3384
3385    if not _check_regular_chunks(arr.chunks):
3386        raise ValueError(
3387            "Attempt to save array to zarr with irregular "
3388            "chunking, please call `arr.rechunk(...)` first."
3389        )
3390
3391    storage_options = storage_options or {}
3392
3393    if isinstance(url, str):
3394        mapper = get_mapper(url, **storage_options)
3395    else:
3396        # assume the object passed is already a mapper
3397        mapper = url
3398
3399    chunks = [c[0] for c in arr.chunks]
3400
3401    z = zarr.create(
3402        shape=arr.shape,
3403        chunks=chunks,
3404        dtype=arr.dtype,
3405        store=mapper,
3406        path=component,
3407        overwrite=overwrite,
3408        **kwargs,
3409    )
3410    return arr.store(z, lock=False, compute=compute, return_stored=return_stored)
3411
3412
3413def _check_regular_chunks(chunkset):
3414    """Check if the chunks are regular
3415
3416    "Regular" in this context means that along every axis, the chunks all
3417    have the same size, except the last one, which may be smaller
3418
3419    Parameters
3420    ----------
3421    chunkset: tuple of tuples of ints
3422        From the ``.chunks`` attribute of an ``Array``
3423
3424    Returns
3425    -------
3426    True if chunkset passes, else False
3427
3428    Examples
3429    --------
3430    >>> import dask.array as da
3431    >>> arr = da.zeros(10, chunks=(5, ))
3432    >>> _check_regular_chunks(arr.chunks)
3433    True
3434
3435    >>> arr = da.zeros(10, chunks=((3, 3, 3, 1), ))
3436    >>> _check_regular_chunks(arr.chunks)
3437    True
3438
3439    >>> arr = da.zeros(10, chunks=((3, 1, 3, 3), ))
3440    >>> _check_regular_chunks(arr.chunks)
3441    False
3442    """
3443    for chunks in chunkset:
3444        if len(chunks) == 1:
3445            continue
3446        if len(set(chunks[:-1])) > 1:
3447            return False
3448        if chunks[-1] > chunks[0]:
3449            return False
3450    return True
3451
3452
3453def from_delayed(value, shape, dtype=None, meta=None, name=None):
3454    """Create a dask array from a dask delayed value
3455
3456    This routine is useful for constructing dask arrays in an ad-hoc fashion
3457    using dask delayed, particularly when combined with stack and concatenate.
3458
3459    The dask array will consist of a single chunk.
3460
3461    Examples
3462    --------
3463    >>> import dask
3464    >>> import dask.array as da
3465    >>> import numpy as np
3466    >>> value = dask.delayed(np.ones)(5)
3467    >>> array = da.from_delayed(value, (5,), dtype=float)
3468    >>> array
3469    dask.array<from-value, shape=(5,), dtype=float64, chunksize=(5,), chunktype=numpy.ndarray>
3470    >>> array.compute()
3471    array([1., 1., 1., 1., 1.])
3472    """
3473    from ..delayed import Delayed, delayed
3474
3475    if not isinstance(value, Delayed) and hasattr(value, "key"):
3476        value = delayed(value)
3477
3478    name = name or "from-value-" + tokenize(value, shape, dtype, meta)
3479    dsk = {(name,) + (0,) * len(shape): value.key}
3480    chunks = tuple((d,) for d in shape)
3481    # TODO: value._key may not be the name of the layer in value.dask
3482    # This should be fixed after we build full expression graphs
3483    graph = HighLevelGraph.from_collections(name, dsk, dependencies=[value])
3484    return Array(graph, name, chunks, dtype=dtype, meta=meta)
3485
3486
3487def from_func(func, shape, dtype=None, name=None, args=(), kwargs={}):
3488    """Create dask array in a single block by calling a function
3489
3490    Calling the provided function with func(*args, **kwargs) should return a
3491    NumPy array of the indicated shape and dtype.
3492
3493    Examples
3494    --------
3495
3496    >>> a = from_func(np.arange, (3,), dtype='i8', args=(3,))
3497    >>> a.compute()
3498    array([0, 1, 2])
3499
3500    This works particularly well when coupled with dask.array functions like
3501    concatenate and stack:
3502
3503    >>> arrays = [from_func(np.array, (), dtype='i8', args=(n,)) for n in range(5)]
3504    >>> stack(arrays).compute()
3505    array([0, 1, 2, 3, 4])
3506    """
3507    name = name or "from_func-" + tokenize(func, shape, dtype, args, kwargs)
3508    if args or kwargs:
3509        func = partial(func, *args, **kwargs)
3510    dsk = {(name,) + (0,) * len(shape): (func,)}
3511    chunks = tuple((i,) for i in shape)
3512    return Array(dsk, name, chunks, dtype)
3513
3514
3515def common_blockdim(blockdims):
3516    """Find the common block dimensions from the list of block dimensions
3517
3518    Currently only implements the simplest possible heuristic: the common
3519    block-dimension is the only one that does not span fully span a dimension.
3520    This is a conservative choice that allows us to avoid potentially very
3521    expensive rechunking.
3522
3523    Assumes that each element of the input block dimensions has all the same
3524    sum (i.e., that they correspond to dimensions of the same size).
3525
3526    Examples
3527    --------
3528    >>> common_blockdim([(3,), (2, 1)])
3529    (2, 1)
3530    >>> common_blockdim([(1, 2), (2, 1)])
3531    (1, 1, 1)
3532    >>> common_blockdim([(2, 2), (3, 1)])  # doctest: +SKIP
3533    Traceback (most recent call last):
3534        ...
3535    ValueError: Chunks do not align
3536    """
3537    if not any(blockdims):
3538        return ()
3539    non_trivial_dims = {d for d in blockdims if len(d) > 1}
3540    if len(non_trivial_dims) == 1:
3541        return first(non_trivial_dims)
3542    if len(non_trivial_dims) == 0:
3543        return max(blockdims, key=first)
3544
3545    if np.isnan(sum(map(sum, blockdims))):
3546        raise ValueError(
3547            "Arrays' chunk sizes (%s) are unknown.\n\n"
3548            "A possible solution:\n"
3549            "  x.compute_chunk_sizes()" % blockdims
3550        )
3551
3552    if len(set(map(sum, non_trivial_dims))) > 1:
3553        raise ValueError("Chunks do not add up to same value", blockdims)
3554
3555    # We have multiple non-trivial chunks on this axis
3556    # e.g. (5, 2) and (4, 3)
3557
3558    # We create a single chunk tuple with the same total length
3559    # that evenly divides both, e.g. (4, 1, 2)
3560
3561    # To accomplish this we walk down all chunk tuples together, finding the
3562    # smallest element, adding it to the output, and subtracting it from all
3563    # other elements and remove the element itself.  We stop once we have
3564    # burned through all of the chunk tuples.
3565    # For efficiency's sake we reverse the lists so that we can pop off the end
3566    rchunks = [list(ntd)[::-1] for ntd in non_trivial_dims]
3567    total = sum(first(non_trivial_dims))
3568    i = 0
3569
3570    out = []
3571    while i < total:
3572        m = min(c[-1] for c in rchunks)
3573        out.append(m)
3574        for c in rchunks:
3575            c[-1] -= m
3576            if c[-1] == 0:
3577                c.pop()
3578        i += m
3579
3580    return tuple(out)
3581
3582
3583def unify_chunks(*args, **kwargs):
3584    """
3585    Unify chunks across a sequence of arrays
3586
3587    This utility function is used within other common operations like
3588    :func:`dask.array.core.map_blocks` and :func:`dask.array.core.blockwise`.
3589    It is not commonly used by end-users directly.
3590
3591    Parameters
3592    ----------
3593    *args: sequence of Array, index pairs
3594        Sequence like (x, 'ij', y, 'jk', z, 'i')
3595
3596    Examples
3597    --------
3598    >>> import dask.array as da
3599    >>> x = da.ones(10, chunks=((5, 2, 3),))
3600    >>> y = da.ones(10, chunks=((2, 3, 5),))
3601    >>> chunkss, arrays = unify_chunks(x, 'i', y, 'i')
3602    >>> chunkss
3603    {'i': (2, 3, 2, 3)}
3604
3605    >>> x = da.ones((100, 10), chunks=(20, 5))
3606    >>> y = da.ones((10, 100), chunks=(4, 50))
3607    >>> chunkss, arrays = unify_chunks(x, 'ij', y, 'jk', 'constant', None)
3608    >>> chunkss  # doctest: +SKIP
3609    {'k': (50, 50), 'i': (20, 20, 20, 20, 20), 'j': (4, 1, 3, 2)}
3610
3611    >>> unify_chunks(0, None)
3612    ({}, [0])
3613
3614    Returns
3615    -------
3616    chunkss : dict
3617        Map like {index: chunks}.
3618    arrays : list
3619        List of rechunked arrays.
3620
3621    See Also
3622    --------
3623    common_blockdim
3624    """
3625    if not args:
3626        return {}, []
3627
3628    arginds = [
3629        (asanyarray(a) if ind is not None else a, ind) for a, ind in partition(2, args)
3630    ]  # [x, ij, y, jk]
3631    warn = kwargs.get("warn", True)
3632
3633    arrays, inds = zip(*arginds)
3634    if all(ind is None for ind in inds):
3635        return {}, list(arrays)
3636    if all(ind == inds[0] for ind in inds) and all(
3637        a.chunks == arrays[0].chunks for a in arrays
3638    ):
3639        return dict(zip(inds[0], arrays[0].chunks)), arrays
3640
3641    nameinds = []
3642    blockdim_dict = dict()
3643    max_parts = 0
3644    for a, ind in arginds:
3645        if ind is not None:
3646            nameinds.append((a.name, ind))
3647            blockdim_dict[a.name] = a.chunks
3648            max_parts = max(max_parts, a.npartitions)
3649        else:
3650            nameinds.append((a, ind))
3651
3652    chunkss = broadcast_dimensions(nameinds, blockdim_dict, consolidate=common_blockdim)
3653    nparts = np.prod(list(map(len, chunkss.values())))
3654
3655    if warn and nparts and nparts >= max_parts * 10:
3656        warnings.warn(
3657            "Increasing number of chunks by factor of %d" % (nparts / max_parts),
3658            PerformanceWarning,
3659            stacklevel=3,
3660        )
3661
3662    arrays = []
3663    for a, i in arginds:
3664        if i is None:
3665            arrays.append(a)
3666        else:
3667            chunks = tuple(
3668                chunkss[j]
3669                if a.shape[n] > 1
3670                else a.shape[n]
3671                if not np.isnan(sum(chunkss[j]))
3672                else None
3673                for n, j in enumerate(i)
3674            )
3675            if chunks != a.chunks and all(a.chunks):
3676                arrays.append(a.rechunk(chunks))
3677            else:
3678                arrays.append(a)
3679    return chunkss, arrays
3680
3681
3682def unpack_singleton(x):
3683    """
3684
3685    >>> unpack_singleton([[[[1]]]])
3686    1
3687    >>> unpack_singleton(np.array(np.datetime64('2000-01-01')))
3688    array('2000-01-01', dtype='datetime64[D]')
3689    """
3690    while isinstance(x, (list, tuple)):
3691        try:
3692            x = x[0]
3693        except (IndexError, TypeError, KeyError):
3694            break
3695    return x
3696
3697
3698def block(arrays, allow_unknown_chunksizes=False):
3699    """
3700    Assemble an nd-array from nested lists of blocks.
3701
3702    Blocks in the innermost lists are concatenated along the last
3703    dimension (-1), then these are concatenated along the second-last
3704    dimension (-2), and so on until the outermost list is reached
3705
3706    Blocks can be of any dimension, but will not be broadcasted using the normal
3707    rules. Instead, leading axes of size 1 are inserted, to make ``block.ndim``
3708    the same for all blocks. This is primarily useful for working with scalars,
3709    and means that code like ``block([v, 1])`` is valid, where
3710    ``v.ndim == 1``.
3711
3712    When the nested list is two levels deep, this allows block matrices to be
3713    constructed from their components.
3714
3715    Parameters
3716    ----------
3717    arrays : nested list of array_like or scalars (but not tuples)
3718        If passed a single ndarray or scalar (a nested list of depth 0), this
3719        is returned unmodified (and not copied).
3720
3721        Elements shapes must match along the appropriate axes (without
3722        broadcasting), but leading 1s will be prepended to the shape as
3723        necessary to make the dimensions match.
3724
3725    allow_unknown_chunksizes: bool
3726        Allow unknown chunksizes, such as come from converting from dask
3727        dataframes.  Dask.array is unable to verify that chunks line up.  If
3728        data comes from differently aligned sources then this can cause
3729        unexpected results.
3730
3731    Returns
3732    -------
3733    block_array : ndarray
3734        The array assembled from the given blocks.
3735
3736        The dimensionality of the output is equal to the greatest of:
3737        * the dimensionality of all the inputs
3738        * the depth to which the input list is nested
3739
3740    Raises
3741    ------
3742    ValueError
3743        * If list depths are mismatched - for instance, ``[[a, b], c]`` is
3744          illegal, and should be spelt ``[[a, b], [c]]``
3745        * If lists are empty - for instance, ``[[a, b], []]``
3746
3747    See Also
3748    --------
3749    concatenate : Join a sequence of arrays together.
3750    stack : Stack arrays in sequence along a new dimension.
3751    hstack : Stack arrays in sequence horizontally (column wise).
3752    vstack : Stack arrays in sequence vertically (row wise).
3753    dstack : Stack arrays in sequence depth wise (along third dimension).
3754    vsplit : Split array into a list of multiple sub-arrays vertically.
3755
3756    Notes
3757    -----
3758
3759    When called with only scalars, ``block`` is equivalent to an ndarray
3760    call. So ``block([[1, 2], [3, 4]])`` is equivalent to
3761    ``array([[1, 2], [3, 4]])``.
3762
3763    This function does not enforce that the blocks lie on a fixed grid.
3764    ``block([[a, b], [c, d]])`` is not restricted to arrays of the form::
3765
3766        AAAbb
3767        AAAbb
3768        cccDD
3769
3770    But is also allowed to produce, for some ``a, b, c, d``::
3771
3772        AAAbb
3773        AAAbb
3774        cDDDD
3775
3776    Since concatenation happens along the last axis first, `block` is _not_
3777    capable of producing the following directly::
3778
3779        AAAbb
3780        cccbb
3781        cccDD
3782
3783    Matlab's "square bracket stacking", ``[A, B, ...; p, q, ...]``, is
3784    equivalent to ``block([[A, B, ...], [p, q, ...]])``.
3785    """
3786
3787    # This was copied almost verbatim from numpy.core.shape_base.block
3788    # See numpy license at https://github.com/numpy/numpy/blob/master/LICENSE.txt
3789    # or NUMPY_LICENSE.txt within this directory
3790
3791    def atleast_nd(x, ndim):
3792        x = asanyarray(x)
3793        diff = max(ndim - x.ndim, 0)
3794        if diff == 0:
3795            return x
3796        else:
3797            return x[(None,) * diff + (Ellipsis,)]
3798
3799    def format_index(index):
3800        return "arrays" + "".join(f"[{i}]" for i in index)
3801
3802    rec = _Recurser(recurse_if=lambda x: type(x) is list)
3803
3804    # ensure that the lists are all matched in depth
3805    list_ndim = None
3806    any_empty = False
3807    for index, value, entering in rec.walk(arrays):
3808        if type(value) is tuple:
3809            # not strictly necessary, but saves us from:
3810            #  - more than one way to do things - no point treating tuples like
3811            #    lists
3812            #  - horribly confusing behaviour that results when tuples are
3813            #    treated like ndarray
3814            raise TypeError(
3815                "{} is a tuple. "
3816                "Only lists can be used to arrange blocks, and np.block does "
3817                "not allow implicit conversion from tuple to ndarray.".format(
3818                    format_index(index)
3819                )
3820            )
3821        if not entering:
3822            curr_depth = len(index)
3823        elif len(value) == 0:
3824            curr_depth = len(index) + 1
3825            any_empty = True
3826        else:
3827            continue
3828
3829        if list_ndim is not None and list_ndim != curr_depth:
3830            raise ValueError(
3831                "List depths are mismatched. First element was at depth {}, "
3832                "but there is an element at depth {} ({})".format(
3833                    list_ndim, curr_depth, format_index(index)
3834                )
3835            )
3836        list_ndim = curr_depth
3837
3838    # do this here so we catch depth mismatches first
3839    if any_empty:
3840        raise ValueError("Lists cannot be empty")
3841
3842    # convert all the arrays to ndarrays
3843    arrays = rec.map_reduce(arrays, f_map=asanyarray, f_reduce=list)
3844
3845    # determine the maximum dimension of the elements
3846    elem_ndim = rec.map_reduce(arrays, f_map=lambda xi: xi.ndim, f_reduce=max)
3847    ndim = max(list_ndim, elem_ndim)
3848
3849    # first axis to concatenate along
3850    first_axis = ndim - list_ndim
3851
3852    # Make all the elements the same dimension
3853    arrays = rec.map_reduce(
3854        arrays, f_map=lambda xi: atleast_nd(xi, ndim), f_reduce=list
3855    )
3856
3857    # concatenate innermost lists on the right, outermost on the left
3858    return rec.map_reduce(
3859        arrays,
3860        f_reduce=lambda xs, axis: concatenate(
3861            list(xs), axis=axis, allow_unknown_chunksizes=allow_unknown_chunksizes
3862        ),
3863        f_kwargs=lambda axis: dict(axis=(axis + 1)),
3864        axis=first_axis,
3865    )
3866
3867
3868def concatenate(seq, axis=0, allow_unknown_chunksizes=False):
3869    """
3870    Concatenate arrays along an existing axis
3871
3872    Given a sequence of dask Arrays form a new dask Array by stacking them
3873    along an existing dimension (axis=0 by default)
3874
3875    Parameters
3876    ----------
3877    seq: list of dask.arrays
3878    axis: int
3879        Dimension along which to align all of the arrays
3880    allow_unknown_chunksizes: bool
3881        Allow unknown chunksizes, such as come from converting from dask
3882        dataframes.  Dask.array is unable to verify that chunks line up.  If
3883        data comes from differently aligned sources then this can cause
3884        unexpected results.
3885
3886    Examples
3887    --------
3888
3889    Create slices
3890
3891    >>> import dask.array as da
3892    >>> import numpy as np
3893
3894    >>> data = [da.from_array(np.ones((4, 4)), chunks=(2, 2))
3895    ...          for i in range(3)]
3896
3897    >>> x = da.concatenate(data, axis=0)
3898    >>> x.shape
3899    (12, 4)
3900
3901    >>> da.concatenate(data, axis=1).shape
3902    (4, 12)
3903
3904    Result is a new dask Array
3905
3906    See Also
3907    --------
3908    stack
3909    """
3910    from . import wrap
3911
3912    seq = [asarray(a, allow_unknown_chunksizes=allow_unknown_chunksizes) for a in seq]
3913
3914    if not seq:
3915        raise ValueError("Need array(s) to concatenate")
3916
3917    seq_metas = [meta_from_array(s) for s in seq]
3918    _concatenate = concatenate_lookup.dispatch(
3919        type(max(seq_metas, key=lambda x: getattr(x, "__array_priority__", 0)))
3920    )
3921    meta = _concatenate(seq_metas, axis=axis)
3922
3923    # Promote types to match meta
3924    seq = [a.astype(meta.dtype) for a in seq]
3925
3926    # Find output array shape
3927    ndim = len(seq[0].shape)
3928    shape = tuple(
3929        sum(a.shape[i] for a in seq) if i == axis else seq[0].shape[i]
3930        for i in range(ndim)
3931    )
3932
3933    # Drop empty arrays
3934    seq2 = [a for a in seq if a.size]
3935    if not seq2:
3936        seq2 = seq
3937
3938    if axis < 0:
3939        axis = ndim + axis
3940    if axis >= ndim:
3941        msg = (
3942            "Axis must be less than than number of dimensions"
3943            "\nData has %d dimensions, but got axis=%d"
3944        )
3945        raise ValueError(msg % (ndim, axis))
3946
3947    n = len(seq2)
3948    if n == 0:
3949        try:
3950            return wrap.empty_like(meta, shape=shape, chunks=shape, dtype=meta.dtype)
3951        except TypeError:
3952            return wrap.empty(shape, chunks=shape, dtype=meta.dtype)
3953    elif n == 1:
3954        return seq2[0]
3955
3956    if not allow_unknown_chunksizes and not all(
3957        i == axis or all(x.shape[i] == seq2[0].shape[i] for x in seq2)
3958        for i in range(ndim)
3959    ):
3960        if any(map(np.isnan, seq2[0].shape)):
3961            raise ValueError(
3962                "Tried to concatenate arrays with unknown"
3963                " shape %s.\n\nTwo solutions:\n"
3964                "  1. Force concatenation pass"
3965                " allow_unknown_chunksizes=True.\n"
3966                "  2. Compute shapes with "
3967                "[x.compute_chunk_sizes() for x in seq]" % str(seq2[0].shape)
3968            )
3969        raise ValueError("Shapes do not align: %s", [x.shape for x in seq2])
3970
3971    inds = [list(range(ndim)) for i in range(n)]
3972    for i, ind in enumerate(inds):
3973        ind[axis] = -(i + 1)
3974
3975    uc_args = list(concat(zip(seq2, inds)))
3976    _, seq2 = unify_chunks(*uc_args, warn=False)
3977
3978    bds = [a.chunks for a in seq2]
3979
3980    chunks = (
3981        seq2[0].chunks[:axis]
3982        + (sum((bd[axis] for bd in bds), ()),)
3983        + seq2[0].chunks[axis + 1 :]
3984    )
3985
3986    cum_dims = [0] + list(accumulate(add, [len(a.chunks[axis]) for a in seq2]))
3987
3988    names = [a.name for a in seq2]
3989
3990    name = "concatenate-" + tokenize(names, axis)
3991    keys = list(product([name], *[range(len(bd)) for bd in chunks]))
3992
3993    values = [
3994        (names[bisect(cum_dims, key[axis + 1]) - 1],)
3995        + key[1 : axis + 1]
3996        + (key[axis + 1] - cum_dims[bisect(cum_dims, key[axis + 1]) - 1],)
3997        + key[axis + 2 :]
3998        for key in keys
3999    ]
4000
4001    dsk = dict(zip(keys, values))
4002    graph = HighLevelGraph.from_collections(name, dsk, dependencies=seq2)
4003
4004    return Array(graph, name, chunks, meta=meta)
4005
4006
4007def load_store_chunk(x, out, index, lock, return_stored, load_stored):
4008    """
4009    A function inserted in a Dask graph for storing a chunk.
4010
4011    Parameters
4012    ----------
4013    x: array-like
4014        An array (potentially a NumPy one)
4015    out: array-like
4016        Where to store results too.
4017    index: slice-like
4018        Where to store result from ``x`` in ``out``.
4019    lock: Lock-like or False
4020        Lock to use before writing to ``out``.
4021    return_stored: bool
4022        Whether to return ``out``.
4023    load_stored: bool
4024        Whether to return the array stored in ``out``.
4025        Ignored if ``return_stored`` is not ``True``.
4026
4027    Examples
4028    --------
4029
4030    >>> a = np.ones((5, 6))
4031    >>> b = np.empty(a.shape)
4032    >>> load_store_chunk(a, b, (slice(None), slice(None)), False, False, False)
4033    """
4034
4035    result = None
4036    if return_stored and not load_stored:
4037        result = out
4038
4039    if lock:
4040        lock.acquire()
4041    try:
4042        if x is not None:
4043            if is_arraylike(x):
4044                out[index] = x
4045            else:
4046                out[index] = np.asanyarray(x)
4047        if return_stored and load_stored:
4048            result = out[index]
4049    finally:
4050        if lock:
4051            lock.release()
4052
4053    return result
4054
4055
4056def store_chunk(x, out, index, lock, return_stored):
4057    return load_store_chunk(x, out, index, lock, return_stored, False)
4058
4059
4060def load_chunk(out, index, lock):
4061    return load_store_chunk(None, out, index, lock, True, True)
4062
4063
4064def insert_to_ooc(
4065    keys: list,
4066    chunks: tuple[tuple[int, ...], ...],
4067    out,
4068    name: str,
4069    *,
4070    lock: Lock | bool = True,
4071    region: slice | None = None,
4072    return_stored: bool = False,
4073    load_stored: bool = False,
4074) -> dict:
4075    """
4076    Creates a Dask graph for storing chunks from ``arr`` in ``out``.
4077
4078    Parameters
4079    ----------
4080    keys: list
4081        Dask keys of the input array
4082    chunks: tuple
4083        Dask chunks of the input array
4084    out: array-like
4085        Where to store results to
4086    name: str
4087        First element of dask keys
4088    lock: Lock-like or bool, optional
4089        Whether to lock or with what (default is ``True``,
4090        which means a :class:`threading.Lock` instance).
4091    region: slice-like, optional
4092        Where in ``out`` to store ``arr``'s results
4093        (default is ``None``, meaning all of ``out``).
4094    return_stored: bool, optional
4095        Whether to return ``out``
4096        (default is ``False``, meaning ``None`` is returned).
4097    load_stored: bool, optional
4098        Whether to handling loading from ``out`` at the same time.
4099        Ignored if ``return_stored`` is not ``True``.
4100        (default is ``False``, meaning defer to ``return_stored``).
4101
4102    Returns
4103    -------
4104    dask graph of store operation
4105
4106    Examples
4107    --------
4108    >>> import dask.array as da
4109    >>> d = da.ones((5, 6), chunks=(2, 3))
4110    >>> a = np.empty(d.shape)
4111    >>> insert_to_ooc(d.__dask_keys__(), d.chunks, a, "store-123")  # doctest: +SKIP
4112    """
4113
4114    if lock is True:
4115        lock = Lock()
4116
4117    slices = slices_from_chunks(chunks)
4118    if region:
4119        slices = [fuse_slice(region, slc) for slc in slices]
4120
4121    if return_stored and load_stored:
4122        func = load_store_chunk
4123        args = (load_stored,)
4124    else:
4125        func = store_chunk
4126        args = ()
4127
4128    dsk = {
4129        (name,) + t[1:]: (func, t, out, slc, lock, return_stored) + args
4130        for t, slc in zip(core.flatten(keys), slices)
4131    }
4132    return dsk
4133
4134
4135def retrieve_from_ooc(
4136    keys: Collection[Hashable], dsk_pre: Mapping, dsk_post: Mapping
4137) -> dict:
4138    """
4139    Creates a Dask graph for loading stored ``keys`` from ``dsk``.
4140
4141    Parameters
4142    ----------
4143    keys: Collection
4144        A sequence containing Dask graph keys to load
4145    dsk_pre: Mapping
4146        A Dask graph corresponding to a Dask Array before computation
4147    dsk_post: Mapping
4148        A Dask graph corresponding to a Dask Array after computation
4149
4150    Examples
4151    --------
4152    >>> import dask.array as da
4153    >>> d = da.ones((5, 6), chunks=(2, 3))
4154    >>> a = np.empty(d.shape)
4155    >>> g = insert_to_ooc(d.__dask_keys__(), d.chunks, a, "store-123")
4156    >>> retrieve_from_ooc(g.keys(), g, {k: k for k in g.keys()})  # doctest: +SKIP
4157    """
4158    load_dsk = {
4159        ("load-" + k[0],) + k[1:]: (load_chunk, dsk_post[k]) + dsk_pre[k][3:-1]
4160        for k in keys
4161    }
4162
4163    return load_dsk
4164
4165
4166def asarray(
4167    a, allow_unknown_chunksizes=False, dtype=None, order=None, *, like=None, **kwargs
4168):
4169    """Convert the input to a dask array.
4170
4171    Parameters
4172    ----------
4173    a : array-like
4174        Input data, in any form that can be converted to a dask array. This
4175        includes lists, lists of tuples, tuples, tuples of tuples, tuples of
4176        lists and ndarrays.
4177    allow_unknown_chunksizes: bool
4178        Allow unknown chunksizes, such as come from converting from dask
4179        dataframes.  Dask.array is unable to verify that chunks line up.  If
4180        data comes from differently aligned sources then this can cause
4181        unexpected results.
4182    dtype : data-type, optional
4183        By default, the data-type is inferred from the input data.
4184    order : {‘C’, ‘F’, ‘A’, ‘K’}, optional
4185        Memory layout. ‘A’ and ‘K’ depend on the order of input array a.
4186        ‘C’ row-major (C-style), ‘F’ column-major (Fortran-style) memory
4187        representation. ‘A’ (any) means ‘F’ if a is Fortran contiguous, ‘C’
4188        otherwise ‘K’ (keep) preserve input order. Defaults to ‘C’.
4189    like: array-like
4190        Reference object to allow the creation of Dask arrays with chunks
4191        that are not NumPy arrays. If an array-like passed in as ``like``
4192        supports the ``__array_function__`` protocol, the chunk type of the
4193        resulting array will be definde by it. In this case, it ensures the
4194        creation of a Dask array compatible with that passed in via this
4195        argument. If ``like`` is a Dask array, the chunk type of the
4196        resulting array will be defined by the chunk type of ``like``.
4197        Requires NumPy 1.20.0 or higher.
4198
4199    Returns
4200    -------
4201    out : dask array
4202        Dask array interpretation of a.
4203
4204    Examples
4205    --------
4206    >>> import dask.array as da
4207    >>> import numpy as np
4208    >>> x = np.arange(3)
4209    >>> da.asarray(x)
4210    dask.array<array, shape=(3,), dtype=int64, chunksize=(3,), chunktype=numpy.ndarray>
4211
4212    >>> y = [[1, 2, 3], [4, 5, 6]]
4213    >>> da.asarray(y)
4214    dask.array<array, shape=(2, 3), dtype=int64, chunksize=(2, 3), chunktype=numpy.ndarray>
4215    """
4216    if like is None:
4217        if isinstance(a, Array):
4218            return a
4219        elif hasattr(a, "to_dask_array"):
4220            return a.to_dask_array()
4221        elif type(a).__module__.split(".")[0] == "xarray" and hasattr(a, "data"):
4222            return asarray(a.data)
4223        elif isinstance(a, (list, tuple)) and any(isinstance(i, Array) for i in a):
4224            return stack(a, allow_unknown_chunksizes=allow_unknown_chunksizes)
4225        elif not isinstance(getattr(a, "shape", None), Iterable):
4226            a = np.asarray(a, dtype=dtype, order=order)
4227    else:
4228        if not _numpy_120:
4229            raise RuntimeError("The use of ``like`` required NumPy >= 1.20")
4230
4231        like_meta = meta_from_array(like)
4232        if isinstance(a, Array):
4233            return a.map_blocks(np.asarray, like=like_meta, dtype=dtype, order=order)
4234        else:
4235            a = np.asarray(a, like=like_meta, dtype=dtype, order=order)
4236    return from_array(a, getitem=getter_inline, **kwargs)
4237
4238
4239def asanyarray(a, dtype=None, order=None, *, like=None):
4240    """Convert the input to a dask array.
4241
4242    Subclasses of ``np.ndarray`` will be passed through as chunks unchanged.
4243
4244    Parameters
4245    ----------
4246    a : array-like
4247        Input data, in any form that can be converted to a dask array. This
4248        includes lists, lists of tuples, tuples, tuples of tuples, tuples of
4249        lists and ndarrays.
4250    dtype : data-type, optional
4251        By default, the data-type is inferred from the input data.
4252    order : {‘C’, ‘F’, ‘A’, ‘K’}, optional
4253        Memory layout. ‘A’ and ‘K’ depend on the order of input array a.
4254        ‘C’ row-major (C-style), ‘F’ column-major (Fortran-style) memory
4255        representation. ‘A’ (any) means ‘F’ if a is Fortran contiguous, ‘C’
4256        otherwise ‘K’ (keep) preserve input order. Defaults to ‘C’.
4257    like: array-like
4258        Reference object to allow the creation of Dask arrays with chunks
4259        that are not NumPy arrays. If an array-like passed in as ``like``
4260        supports the ``__array_function__`` protocol, the chunk type of the
4261        resulting array will be definde by it. In this case, it ensures the
4262        creation of a Dask array compatible with that passed in via this
4263        argument. If ``like`` is a Dask array, the chunk type of the
4264        resulting array will be defined by the chunk type of ``like``.
4265        Requires NumPy 1.20.0 or higher.
4266
4267    Returns
4268    -------
4269    out : dask array
4270        Dask array interpretation of a.
4271
4272    Examples
4273    --------
4274    >>> import dask.array as da
4275    >>> import numpy as np
4276    >>> x = np.arange(3)
4277    >>> da.asanyarray(x)
4278    dask.array<array, shape=(3,), dtype=int64, chunksize=(3,), chunktype=numpy.ndarray>
4279
4280    >>> y = [[1, 2, 3], [4, 5, 6]]
4281    >>> da.asanyarray(y)
4282    dask.array<array, shape=(2, 3), dtype=int64, chunksize=(2, 3), chunktype=numpy.ndarray>
4283    """
4284    if like is None:
4285        if isinstance(a, Array):
4286            return a
4287        elif hasattr(a, "to_dask_array"):
4288            return a.to_dask_array()
4289        elif type(a).__module__.split(".")[0] == "xarray" and hasattr(a, "data"):
4290            return asanyarray(a.data)
4291        elif isinstance(a, (list, tuple)) and any(isinstance(i, Array) for i in a):
4292            return stack(a)
4293        elif not isinstance(getattr(a, "shape", None), Iterable):
4294            a = np.asanyarray(a, dtype=dtype, order=order)
4295    else:
4296        if not _numpy_120:
4297            raise RuntimeError("The use of ``like`` required NumPy >= 1.20")
4298
4299        like_meta = meta_from_array(like)
4300        if isinstance(a, Array):
4301            return a.map_blocks(np.asanyarray, like=like_meta, dtype=dtype, order=order)
4302        else:
4303            a = np.asanyarray(a, like=like_meta, dtype=dtype, order=order)
4304    return from_array(a, chunks=a.shape, getitem=getter_inline, asarray=False)
4305
4306
4307def is_scalar_for_elemwise(arg):
4308    """
4309
4310    >>> is_scalar_for_elemwise(42)
4311    True
4312    >>> is_scalar_for_elemwise('foo')
4313    True
4314    >>> is_scalar_for_elemwise(True)
4315    True
4316    >>> is_scalar_for_elemwise(np.array(42))
4317    True
4318    >>> is_scalar_for_elemwise([1, 2, 3])
4319    True
4320    >>> is_scalar_for_elemwise(np.array([1, 2, 3]))
4321    False
4322    >>> is_scalar_for_elemwise(from_array(np.array(0), chunks=()))
4323    False
4324    >>> is_scalar_for_elemwise(np.dtype('i4'))
4325    True
4326    """
4327    # the second half of shape_condition is essentially just to ensure that
4328    # dask series / frame are treated as scalars in elemwise.
4329    maybe_shape = getattr(arg, "shape", None)
4330    shape_condition = not isinstance(maybe_shape, Iterable) or any(
4331        is_dask_collection(x) for x in maybe_shape
4332    )
4333
4334    return (
4335        np.isscalar(arg)
4336        or shape_condition
4337        or isinstance(arg, np.dtype)
4338        or (isinstance(arg, np.ndarray) and arg.ndim == 0)
4339    )
4340
4341
4342def broadcast_shapes(*shapes):
4343    """
4344    Determines output shape from broadcasting arrays.
4345
4346    Parameters
4347    ----------
4348    shapes : tuples
4349        The shapes of the arguments.
4350
4351    Returns
4352    -------
4353    output_shape : tuple
4354
4355    Raises
4356    ------
4357    ValueError
4358        If the input shapes cannot be successfully broadcast together.
4359    """
4360    if len(shapes) == 1:
4361        return shapes[0]
4362    out = []
4363    for sizes in zip_longest(*map(reversed, shapes), fillvalue=-1):
4364        if np.isnan(sizes).any():
4365            dim = np.nan
4366        else:
4367            dim = 0 if 0 in sizes else np.max(sizes)
4368        if any(i not in [-1, 0, 1, dim] and not np.isnan(i) for i in sizes):
4369            raise ValueError(
4370                "operands could not be broadcast together with "
4371                "shapes {}".format(" ".join(map(str, shapes)))
4372            )
4373        out.append(dim)
4374    return tuple(reversed(out))
4375
4376
4377def elemwise(op, *args, out=None, **kwargs):
4378    """Apply elementwise function across arguments
4379
4380    Respects broadcasting rules
4381
4382    Parameters
4383    ----------
4384    out : dask array or None
4385        If out is a dask.array then this overwrites the contents of that array with
4386        the result
4387
4388    Examples
4389    --------
4390    >>> elemwise(add, x, y)  # doctest: +SKIP
4391    >>> elemwise(sin, x)  # doctest: +SKIP
4392    >>> elemwise(sin, x, out=dask_array)  # doctest: +SKIP
4393
4394    See Also
4395    --------
4396    blockwise
4397    """
4398    if not {"name", "dtype"}.issuperset(kwargs):
4399        msg = "%s does not take the following keyword arguments %s"
4400        raise TypeError(
4401            msg % (op.__name__, str(sorted(set(kwargs) - {"name", "dtype"})))
4402        )
4403
4404    args = [np.asarray(a) if isinstance(a, (list, tuple)) else a for a in args]
4405
4406    shapes = []
4407    for arg in args:
4408        shape = getattr(arg, "shape", ())
4409        if any(is_dask_collection(x) for x in shape):
4410            # Want to exclude Delayed shapes and dd.Scalar
4411            shape = ()
4412        shapes.append(shape)
4413
4414    shapes = [s if isinstance(s, Iterable) else () for s in shapes]
4415    out_ndim = len(
4416        broadcast_shapes(*shapes)
4417    )  # Raises ValueError if dimensions mismatch
4418    expr_inds = tuple(range(out_ndim))[::-1]
4419
4420    need_enforce_dtype = False
4421    if "dtype" in kwargs:
4422        need_enforce_dtype = True
4423        dt = kwargs["dtype"]
4424    else:
4425        # We follow NumPy's rules for dtype promotion, which special cases
4426        # scalars and 0d ndarrays (which it considers equivalent) by using
4427        # their values to compute the result dtype:
4428        # https://github.com/numpy/numpy/issues/6240
4429        # We don't inspect the values of 0d dask arrays, because these could
4430        # hold potentially very expensive calculations. Instead, we treat
4431        # them just like other arrays, and if necessary cast the result of op
4432        # to match.
4433        vals = [
4434            np.empty((1,) * max(1, a.ndim), dtype=a.dtype)
4435            if not is_scalar_for_elemwise(a)
4436            else a
4437            for a in args
4438        ]
4439        try:
4440            dt = apply_infer_dtype(op, vals, {}, "elemwise", suggest_dtype=False)
4441        except Exception:
4442            return NotImplemented
4443        need_enforce_dtype = any(
4444            not is_scalar_for_elemwise(a) and a.ndim == 0 for a in args
4445        )
4446
4447    name = kwargs.get("name", None) or f"{funcname(op)}-{tokenize(op, dt, *args)}"
4448
4449    blockwise_kwargs = dict(dtype=dt, name=name, token=funcname(op).strip("_"))
4450    if need_enforce_dtype:
4451        blockwise_kwargs["enforce_dtype"] = dt
4452        blockwise_kwargs["enforce_dtype_function"] = op
4453        op = _enforce_dtype
4454    result = blockwise(
4455        op,
4456        expr_inds,
4457        *concat(
4458            (a, tuple(range(a.ndim)[::-1]) if not is_scalar_for_elemwise(a) else None)
4459            for a in args
4460        ),
4461        **blockwise_kwargs,
4462    )
4463
4464    return handle_out(out, result)
4465
4466
4467def handle_out(out, result):
4468    """Handle out parameters
4469
4470    If out is a dask.array then this overwrites the contents of that array with
4471    the result
4472    """
4473    if isinstance(out, tuple):
4474        if len(out) == 1:
4475            out = out[0]
4476        elif len(out) > 1:
4477            raise NotImplementedError("The out parameter is not fully supported")
4478        else:
4479            out = None
4480    if isinstance(out, Array):
4481        if out.shape != result.shape:
4482            raise ValueError(
4483                "Mismatched shapes between result and out parameter. "
4484                "out=%s, result=%s" % (str(out.shape), str(result.shape))
4485            )
4486        out._chunks = result.chunks
4487        out.dask = result.dask
4488        out._meta = result._meta
4489        out._name = result.name
4490    elif out is not None:
4491        msg = (
4492            "The out parameter is not fully supported."
4493            " Received type %s, expected Dask Array" % type(out).__name__
4494        )
4495        raise NotImplementedError(msg)
4496    else:
4497        return result
4498
4499
4500def _enforce_dtype(*args, **kwargs):
4501    """Calls a function and converts its result to the given dtype.
4502
4503    The parameters have deliberately been given unwieldy names to avoid
4504    clashes with keyword arguments consumed by blockwise
4505
4506    A dtype of `object` is treated as a special case and not enforced,
4507    because it is used as a dummy value in some places when the result will
4508    not be a block in an Array.
4509
4510    Parameters
4511    ----------
4512    enforce_dtype : dtype
4513        Result dtype
4514    enforce_dtype_function : callable
4515        The wrapped function, which will be passed the remaining arguments
4516    """
4517    dtype = kwargs.pop("enforce_dtype")
4518    function = kwargs.pop("enforce_dtype_function")
4519
4520    result = function(*args, **kwargs)
4521    if hasattr(result, "dtype") and dtype != result.dtype and dtype != object:
4522        if not np.can_cast(result, dtype, casting="same_kind"):
4523            raise ValueError(
4524                "Inferred dtype from function %r was %r "
4525                "but got %r, which can't be cast using "
4526                "casting='same_kind'"
4527                % (funcname(function), str(dtype), str(result.dtype))
4528            )
4529        if np.isscalar(result):
4530            # scalar astype method doesn't take the keyword arguments, so
4531            # have to convert via 0-dimensional array and back.
4532            result = result.astype(dtype)
4533        else:
4534            try:
4535                result = result.astype(dtype, copy=False)
4536            except TypeError:
4537                # Missing copy kwarg
4538                result = result.astype(dtype)
4539    return result
4540
4541
4542def broadcast_to(x, shape, chunks=None, meta=None):
4543    """Broadcast an array to a new shape.
4544
4545    Parameters
4546    ----------
4547    x : array_like
4548        The array to broadcast.
4549    shape : tuple
4550        The shape of the desired array.
4551    chunks : tuple, optional
4552        If provided, then the result will use these chunks instead of the same
4553        chunks as the source array. Setting chunks explicitly as part of
4554        broadcast_to is more efficient than rechunking afterwards. Chunks are
4555        only allowed to differ from the original shape along dimensions that
4556        are new on the result or have size 1 the input array.
4557    meta : empty ndarray
4558        empty ndarray created with same NumPy backend, ndim and dtype as the
4559        Dask Array being created (overrides dtype)
4560
4561    Returns
4562    -------
4563    broadcast : dask array
4564
4565    See Also
4566    --------
4567    :func:`numpy.broadcast_to`
4568    """
4569    x = asarray(x)
4570    shape = tuple(shape)
4571
4572    if meta is None:
4573        meta = meta_from_array(x)
4574
4575    if x.shape == shape and (chunks is None or chunks == x.chunks):
4576        return x
4577
4578    ndim_new = len(shape) - x.ndim
4579    if ndim_new < 0 or any(
4580        new != old for new, old in zip(shape[ndim_new:], x.shape) if old != 1
4581    ):
4582        raise ValueError(f"cannot broadcast shape {x.shape} to shape {shape}")
4583
4584    if chunks is None:
4585        chunks = tuple((s,) for s in shape[:ndim_new]) + tuple(
4586            bd if old > 1 else (new,)
4587            for bd, old, new in zip(x.chunks, x.shape, shape[ndim_new:])
4588        )
4589    else:
4590        chunks = normalize_chunks(
4591            chunks, shape, dtype=x.dtype, previous_chunks=x.chunks
4592        )
4593        for old_bd, new_bd in zip(x.chunks, chunks[ndim_new:]):
4594            if old_bd != new_bd and old_bd != (1,):
4595                raise ValueError(
4596                    "cannot broadcast chunks %s to chunks %s: "
4597                    "new chunks must either be along a new "
4598                    "dimension or a dimension of size 1" % (x.chunks, chunks)
4599                )
4600
4601    name = "broadcast_to-" + tokenize(x, shape, chunks)
4602    dsk = {}
4603
4604    enumerated_chunks = product(*(enumerate(bds) for bds in chunks))
4605    for new_index, chunk_shape in (zip(*ec) for ec in enumerated_chunks):
4606        old_index = tuple(
4607            0 if bd == (1,) else i for bd, i in zip(x.chunks, new_index[ndim_new:])
4608        )
4609        old_key = (x.name,) + old_index
4610        new_key = (name,) + new_index
4611        dsk[new_key] = (np.broadcast_to, old_key, quote(chunk_shape))
4612
4613    graph = HighLevelGraph.from_collections(name, dsk, dependencies=[x])
4614    return Array(graph, name, chunks, dtype=x.dtype, meta=meta)
4615
4616
4617@derived_from(np)
4618def broadcast_arrays(*args, subok=False):
4619    subok = bool(subok)
4620
4621    to_array = asanyarray if subok else asarray
4622    args = tuple(to_array(e) for e in args)
4623
4624    # Unify uneven chunking
4625    inds = [list(reversed(range(x.ndim))) for x in args]
4626    uc_args = concat(zip(args, inds))
4627    _, args = unify_chunks(*uc_args, warn=False)
4628
4629    shape = broadcast_shapes(*(e.shape for e in args))
4630    chunks = broadcast_chunks(*(e.chunks for e in args))
4631
4632    result = [broadcast_to(e, shape=shape, chunks=chunks) for e in args]
4633
4634    return result
4635
4636
4637def offset_func(func, offset, *args):
4638    """Offsets inputs by offset
4639
4640    >>> double = lambda x: x * 2
4641    >>> f = offset_func(double, (10,))
4642    >>> f(1)
4643    22
4644    >>> f(300)
4645    620
4646    """
4647
4648    def _offset(*args):
4649        args2 = list(map(add, args, offset))
4650        return func(*args2)
4651
4652    with contextlib.suppress(Exception):
4653        _offset.__name__ = "offset_" + func.__name__
4654
4655    return _offset
4656
4657
4658def chunks_from_arrays(arrays):
4659    """Chunks tuple from nested list of arrays
4660
4661    >>> x = np.array([1, 2])
4662    >>> chunks_from_arrays([x, x])
4663    ((2, 2),)
4664
4665    >>> x = np.array([[1, 2]])
4666    >>> chunks_from_arrays([[x], [x]])
4667    ((1, 1), (2,))
4668
4669    >>> x = np.array([[1, 2]])
4670    >>> chunks_from_arrays([[x, x]])
4671    ((1,), (2, 2))
4672
4673    >>> chunks_from_arrays([1, 1])
4674    ((1, 1),)
4675    """
4676    if not arrays:
4677        return ()
4678    result = []
4679    dim = 0
4680
4681    def shape(x):
4682        try:
4683            return x.shape
4684        except AttributeError:
4685            return (1,)
4686
4687    while isinstance(arrays, (list, tuple)):
4688        result.append(tuple(shape(deepfirst(a))[dim] for a in arrays))
4689        arrays = arrays[0]
4690        dim += 1
4691    return tuple(result)
4692
4693
4694def deepfirst(seq):
4695    """First element in a nested list
4696
4697    >>> deepfirst([[[1, 2], [3, 4]], [5, 6], [7, 8]])
4698    1
4699    """
4700    if not isinstance(seq, (list, tuple)):
4701        return seq
4702    else:
4703        return deepfirst(seq[0])
4704
4705
4706def shapelist(a):
4707    """Get the shape of nested list"""
4708    if type(a) is list:
4709        return tuple([len(a)] + list(shapelist(a[0])))
4710    else:
4711        return ()
4712
4713
4714def transposelist(arrays, axes, extradims=0):
4715    """Permute axes of nested list
4716
4717    >>> transposelist([[1,1,1],[1,1,1]], [2,1])
4718    [[[1, 1], [1, 1], [1, 1]]]
4719
4720    >>> transposelist([[1,1,1],[1,1,1]], [2,1], extradims=1)
4721    [[[[1], [1]], [[1], [1]], [[1], [1]]]]
4722    """
4723    if len(axes) != ndimlist(arrays):
4724        raise ValueError("Length of axes should equal depth of nested arrays")
4725    if extradims < 0:
4726        raise ValueError("`newdims` should be positive")
4727    if len(axes) > len(set(axes)):
4728        raise ValueError("`axes` should be unique")
4729
4730    ndim = max(axes) + 1
4731    shape = shapelist(arrays)
4732    newshape = [
4733        shape[axes.index(i)] if i in axes else 1 for i in range(ndim + extradims)
4734    ]
4735
4736    result = list(core.flatten(arrays))
4737    return reshapelist(newshape, result)
4738
4739
4740def stack(seq, axis=0, allow_unknown_chunksizes=False):
4741    """
4742    Stack arrays along a new axis
4743
4744    Given a sequence of dask arrays, form a new dask array by stacking them
4745    along a new dimension (axis=0 by default)
4746
4747    Parameters
4748    ----------
4749    seq: list of dask.arrays
4750    axis: int
4751        Dimension along which to align all of the arrays
4752    allow_unknown_chunksizes: bool
4753        Allow unknown chunksizes, such as come from converting from dask
4754        dataframes.  Dask.array is unable to verify that chunks line up.  If
4755        data comes from differently aligned sources then this can cause
4756        unexpected results.
4757
4758    Examples
4759    --------
4760
4761    Create slices
4762
4763    >>> import dask.array as da
4764    >>> import numpy as np
4765
4766    >>> data = [da.from_array(np.ones((4, 4)), chunks=(2, 2))
4767    ...         for i in range(3)]
4768
4769    >>> x = da.stack(data, axis=0)
4770    >>> x.shape
4771    (3, 4, 4)
4772
4773    >>> da.stack(data, axis=1).shape
4774    (4, 3, 4)
4775
4776    >>> da.stack(data, axis=-1).shape
4777    (4, 4, 3)
4778
4779    Result is a new dask Array
4780
4781    See Also
4782    --------
4783    concatenate
4784    """
4785    from . import wrap
4786
4787    seq = [asarray(a, allow_unknown_chunksizes=allow_unknown_chunksizes) for a in seq]
4788
4789    if not seq:
4790        raise ValueError("Need array(s) to stack")
4791    if not allow_unknown_chunksizes and not all(x.shape == seq[0].shape for x in seq):
4792        idx = first(i for i in enumerate(seq) if i[1].shape != seq[0].shape)
4793        raise ValueError(
4794            "Stacked arrays must have the same shape. The first array had shape "
4795            f"{seq[0].shape}, while array {idx[0] + 1} has shape {idx[1].shape}."
4796        )
4797
4798    meta = np.stack([meta_from_array(a) for a in seq], axis=axis)
4799    seq = [x.astype(meta.dtype) for x in seq]
4800
4801    ndim = meta.ndim - 1
4802    if axis < 0:
4803        axis = ndim + axis + 1
4804    shape = tuple(
4805        len(seq)
4806        if i == axis
4807        else (seq[0].shape[i] if i < axis else seq[0].shape[i - 1])
4808        for i in range(meta.ndim)
4809    )
4810
4811    seq2 = [a for a in seq if a.size]
4812    if not seq2:
4813        seq2 = seq
4814
4815    n = len(seq2)
4816    if n == 0:
4817        try:
4818            return wrap.empty_like(meta, shape=shape, chunks=shape, dtype=meta.dtype)
4819        except TypeError:
4820            return wrap.empty(shape, chunks=shape, dtype=meta.dtype)
4821
4822    ind = list(range(ndim))
4823    uc_args = list(concat((x, ind) for x in seq2))
4824    _, seq2 = unify_chunks(*uc_args)
4825
4826    assert len({a.chunks for a in seq2}) == 1  # same chunks
4827    chunks = seq2[0].chunks[:axis] + ((1,) * n,) + seq2[0].chunks[axis:]
4828
4829    names = [a.name for a in seq2]
4830    name = "stack-" + tokenize(names, axis)
4831    keys = list(product([name], *[range(len(bd)) for bd in chunks]))
4832
4833    inputs = [
4834        (names[key[axis + 1]],) + key[1 : axis + 1] + key[axis + 2 :] for key in keys
4835    ]
4836    values = [
4837        (
4838            getitem,
4839            inp,
4840            (slice(None, None, None),) * axis
4841            + (None,)
4842            + (slice(None, None, None),) * (ndim - axis),
4843        )
4844        for inp in inputs
4845    ]
4846
4847    layer = dict(zip(keys, values))
4848    graph = HighLevelGraph.from_collections(name, layer, dependencies=seq2)
4849
4850    return Array(graph, name, chunks, meta=meta)
4851
4852
4853def concatenate3(arrays):
4854    """Recursive np.concatenate
4855
4856    Input should be a nested list of numpy arrays arranged in the order they
4857    should appear in the array itself.  Each array should have the same number
4858    of dimensions as the desired output and the nesting of the lists.
4859
4860    >>> x = np.array([[1, 2]])
4861    >>> concatenate3([[x, x, x], [x, x, x]])
4862    array([[1, 2, 1, 2, 1, 2],
4863           [1, 2, 1, 2, 1, 2]])
4864
4865    >>> concatenate3([[x, x], [x, x], [x, x]])
4866    array([[1, 2, 1, 2],
4867           [1, 2, 1, 2],
4868           [1, 2, 1, 2]])
4869    """
4870    # We need this as __array_function__ may not exist on older NumPy versions.
4871    # And to reduce verbosity.
4872    NDARRAY_ARRAY_FUNCTION = getattr(np.ndarray, "__array_function__", None)
4873
4874    arrays = concrete(arrays)
4875    if not arrays:
4876        return np.empty(0)
4877
4878    advanced = max(
4879        core.flatten(arrays, container=(list, tuple)),
4880        key=lambda x: getattr(x, "__array_priority__", 0),
4881    )
4882
4883    if not all(
4884        NDARRAY_ARRAY_FUNCTION
4885        is getattr(type(arr), "__array_function__", NDARRAY_ARRAY_FUNCTION)
4886        for arr in core.flatten(arrays, container=(list, tuple))
4887    ):
4888        try:
4889            x = unpack_singleton(arrays)
4890            return _concatenate2(arrays, axes=tuple(range(x.ndim)))
4891        except TypeError:
4892            pass
4893
4894    if concatenate_lookup.dispatch(type(advanced)) is not np.concatenate:
4895        x = unpack_singleton(arrays)
4896        return _concatenate2(arrays, axes=list(range(x.ndim)))
4897
4898    ndim = ndimlist(arrays)
4899    if not ndim:
4900        return arrays
4901    chunks = chunks_from_arrays(arrays)
4902    shape = tuple(map(sum, chunks))
4903
4904    def dtype(x):
4905        try:
4906            return x.dtype
4907        except AttributeError:
4908            return type(x)
4909
4910    result = np.empty(shape=shape, dtype=dtype(deepfirst(arrays)))
4911
4912    for (idx, arr) in zip(
4913        slices_from_chunks(chunks), core.flatten(arrays, container=(list, tuple))
4914    ):
4915        if hasattr(arr, "ndim"):
4916            while arr.ndim < ndim:
4917                arr = arr[None, ...]
4918        result[idx] = arr
4919
4920    return result
4921
4922
4923def concatenate_axes(arrays, axes):
4924    """Recursively call np.concatenate along axes"""
4925    if len(axes) != ndimlist(arrays):
4926        raise ValueError("Length of axes should equal depth of nested arrays")
4927
4928    extradims = max(0, deepfirst(arrays).ndim - (max(axes) + 1))
4929    return concatenate3(transposelist(arrays, axes, extradims=extradims))
4930
4931
4932def to_hdf5(filename, *args, chunks=True, **kwargs):
4933    """Store arrays in HDF5 file
4934
4935    This saves several dask arrays into several datapaths in an HDF5 file.
4936    It creates the necessary datasets and handles clean file opening/closing.
4937
4938    Parameters
4939    ----------
4940    chunks: tuple or ``True``
4941        Chunk shape, or ``True`` to pass the chunks from the dask array.
4942        Defaults to ``True``.
4943
4944    Examples
4945    --------
4946
4947    >>> da.to_hdf5('myfile.hdf5', '/x', x)  # doctest: +SKIP
4948
4949    or
4950
4951    >>> da.to_hdf5('myfile.hdf5', {'/x': x, '/y': y})  # doctest: +SKIP
4952
4953    Optionally provide arguments as though to ``h5py.File.create_dataset``
4954
4955    >>> da.to_hdf5('myfile.hdf5', '/x', x, compression='lzf', shuffle=True)  # doctest: +SKIP
4956
4957    >>> da.to_hdf5('myfile.hdf5', '/x', x, chunks=(10,20,30))  # doctest: +SKIP
4958
4959    This can also be used as a method on a single Array
4960
4961    >>> x.to_hdf5('myfile.hdf5', '/x')  # doctest: +SKIP
4962
4963    See Also
4964    --------
4965    da.store
4966    h5py.File.create_dataset
4967    """
4968    if len(args) == 1 and isinstance(args[0], dict):
4969        data = args[0]
4970    elif len(args) == 2 and isinstance(args[0], str) and isinstance(args[1], Array):
4971        data = {args[0]: args[1]}
4972    else:
4973        raise ValueError("Please provide {'/data/path': array} dictionary")
4974
4975    import h5py
4976
4977    with h5py.File(filename, mode="a") as f:
4978        dsets = [
4979            f.require_dataset(
4980                dp,
4981                shape=x.shape,
4982                dtype=x.dtype,
4983                chunks=tuple(c[0] for c in x.chunks) if chunks is True else chunks,
4984                **kwargs,
4985            )
4986            for dp, x in data.items()
4987        ]
4988        store(list(data.values()), dsets)
4989
4990
4991def interleave_none(a, b):
4992    """
4993
4994    >>> interleave_none([0, None, 2, None], [1, 3])
4995    (0, 1, 2, 3)
4996    """
4997    result = []
4998    i = j = 0
4999    n = len(a) + len(b)
5000    while i + j < n:
5001        if a[i] is not None:
5002            result.append(a[i])
5003            i += 1
5004        else:
5005            result.append(b[j])
5006            i += 1
5007            j += 1
5008    return tuple(result)
5009
5010
5011def keyname(name, i, okey):
5012    """
5013
5014    >>> keyname('x', 3, [None, None, 0, 2])
5015    ('x', 3, 0, 2)
5016    """
5017    return (name, i) + tuple(k for k in okey if k is not None)
5018
5019
5020def _vindex(x, *indexes):
5021    """Point wise indexing with broadcasting.
5022
5023    >>> x = np.arange(56).reshape((7, 8))
5024    >>> x
5025    array([[ 0,  1,  2,  3,  4,  5,  6,  7],
5026           [ 8,  9, 10, 11, 12, 13, 14, 15],
5027           [16, 17, 18, 19, 20, 21, 22, 23],
5028           [24, 25, 26, 27, 28, 29, 30, 31],
5029           [32, 33, 34, 35, 36, 37, 38, 39],
5030           [40, 41, 42, 43, 44, 45, 46, 47],
5031           [48, 49, 50, 51, 52, 53, 54, 55]])
5032
5033    >>> d = from_array(x, chunks=(3, 4))
5034    >>> result = _vindex(d, [0, 1, 6, 0], [0, 1, 0, 7])
5035    >>> result.compute()
5036    array([ 0,  9, 48,  7])
5037    """
5038    indexes = replace_ellipsis(x.ndim, indexes)
5039
5040    nonfancy_indexes = []
5041    reduced_indexes = []
5042    for i, ind in enumerate(indexes):
5043        if isinstance(ind, Number):
5044            nonfancy_indexes.append(ind)
5045        elif isinstance(ind, slice):
5046            nonfancy_indexes.append(ind)
5047            reduced_indexes.append(slice(None))
5048        else:
5049            nonfancy_indexes.append(slice(None))
5050            reduced_indexes.append(ind)
5051
5052    nonfancy_indexes = tuple(nonfancy_indexes)
5053    reduced_indexes = tuple(reduced_indexes)
5054
5055    x = x[nonfancy_indexes]
5056
5057    array_indexes = {}
5058    for i, (ind, size) in enumerate(zip(reduced_indexes, x.shape)):
5059        if not isinstance(ind, slice):
5060            ind = np.array(ind, copy=True)
5061            if ind.dtype.kind == "b":
5062                raise IndexError("vindex does not support indexing with boolean arrays")
5063            if ((ind >= size) | (ind < -size)).any():
5064                raise IndexError(
5065                    "vindex key has entries out of bounds for "
5066                    "indexing along axis %s of size %s: %r" % (i, size, ind)
5067                )
5068            ind %= size
5069            array_indexes[i] = ind
5070
5071    if array_indexes:
5072        x = _vindex_array(x, array_indexes)
5073
5074    return x
5075
5076
5077def _vindex_array(x, dict_indexes):
5078    """Point wise indexing with only NumPy Arrays."""
5079
5080    try:
5081        broadcast_indexes = np.broadcast_arrays(*dict_indexes.values())
5082    except ValueError as e:
5083        # note: error message exactly matches numpy
5084        shapes_str = " ".join(str(a.shape) for a in dict_indexes.values())
5085        raise IndexError(
5086            "shape mismatch: indexing arrays could not be "
5087            "broadcast together with shapes " + shapes_str
5088        ) from e
5089    broadcast_shape = broadcast_indexes[0].shape
5090
5091    lookup = dict(zip(dict_indexes, broadcast_indexes))
5092    flat_indexes = [
5093        lookup[i].ravel().tolist() if i in lookup else None for i in range(x.ndim)
5094    ]
5095    flat_indexes.extend([None] * (x.ndim - len(flat_indexes)))
5096
5097    flat_indexes = [
5098        list(index) if index is not None else index for index in flat_indexes
5099    ]
5100    bounds = [list(accumulate(add, (0,) + c)) for c in x.chunks]
5101    bounds2 = [b for i, b in zip(flat_indexes, bounds) if i is not None]
5102    axis = _get_axis(flat_indexes)
5103    token = tokenize(x, flat_indexes)
5104    out_name = "vindex-merge-" + token
5105
5106    points = list()
5107    for i, idx in enumerate(zip(*[i for i in flat_indexes if i is not None])):
5108        block_idx = [bisect(b, ind) - 1 for b, ind in zip(bounds2, idx)]
5109        inblock_idx = [
5110            ind - bounds2[k][j] for k, (ind, j) in enumerate(zip(idx, block_idx))
5111        ]
5112        points.append((i, tuple(block_idx), tuple(inblock_idx)))
5113
5114    chunks = [c for i, c in zip(flat_indexes, x.chunks) if i is None]
5115    chunks.insert(0, (len(points),) if points else (0,))
5116    chunks = tuple(chunks)
5117
5118    if points:
5119        per_block = groupby(1, points)
5120        per_block = {k: v for k, v in per_block.items() if v}
5121
5122        other_blocks = list(
5123            product(
5124                *[
5125                    list(range(len(c))) if i is None else [None]
5126                    for i, c in zip(flat_indexes, x.chunks)
5127                ]
5128            )
5129        )
5130
5131        full_slices = [slice(None, None) if i is None else None for i in flat_indexes]
5132
5133        name = "vindex-slice-" + token
5134        vindex_merge_name = "vindex-merge-" + token
5135        dsk = {}
5136        for okey in other_blocks:
5137            for i, key in enumerate(per_block):
5138                dsk[keyname(name, i, okey)] = (
5139                    _vindex_transpose,
5140                    (
5141                        _vindex_slice,
5142                        (x.name,) + interleave_none(okey, key),
5143                        interleave_none(
5144                            full_slices, list(zip(*pluck(2, per_block[key])))
5145                        ),
5146                    ),
5147                    axis,
5148                )
5149            dsk[keyname(vindex_merge_name, 0, okey)] = (
5150                _vindex_merge,
5151                [list(pluck(0, per_block[key])) for key in per_block],
5152                [keyname(name, i, okey) for i in range(len(per_block))],
5153            )
5154
5155        result_1d = Array(
5156            HighLevelGraph.from_collections(out_name, dsk, dependencies=[x]),
5157            out_name,
5158            chunks,
5159            x.dtype,
5160            meta=x._meta,
5161        )
5162        return result_1d.reshape(broadcast_shape + result_1d.shape[1:])
5163
5164    # output has a zero dimension, just create a new zero-shape array with the
5165    # same dtype
5166    from .wrap import empty
5167
5168    result_1d = empty(
5169        tuple(map(sum, chunks)), chunks=chunks, dtype=x.dtype, name=out_name
5170    )
5171    return result_1d.reshape(broadcast_shape + result_1d.shape[1:])
5172
5173
5174def _get_axis(indexes):
5175    """Get axis along which point-wise slicing results lie
5176
5177    This is mostly a hack because I can't figure out NumPy's rule on this and
5178    can't be bothered to go reading.
5179
5180    >>> _get_axis([[1, 2], None, [1, 2], None])
5181    0
5182    >>> _get_axis([None, [1, 2], [1, 2], None])
5183    1
5184    >>> _get_axis([None, None, [1, 2], [1, 2]])
5185    2
5186    """
5187    ndim = len(indexes)
5188    indexes = [slice(None, None) if i is None else [0] for i in indexes]
5189    x = np.empty((2,) * ndim)
5190    x2 = x[tuple(indexes)]
5191    return x2.shape.index(1)
5192
5193
5194def _vindex_slice(block, points):
5195    """Pull out point-wise slices from block"""
5196    points = [p if isinstance(p, slice) else list(p) for p in points]
5197    return block[tuple(points)]
5198
5199
5200def _vindex_transpose(block, axis):
5201    """Rotate block so that points are on the first dimension"""
5202    axes = [axis] + list(range(axis)) + list(range(axis + 1, block.ndim))
5203    return block.transpose(axes)
5204
5205
5206def _vindex_merge(locations, values):
5207    """
5208
5209    >>> locations = [0], [2, 1]
5210    >>> values = [np.array([[1, 2, 3]]),
5211    ...           np.array([[10, 20, 30], [40, 50, 60]])]
5212
5213    >>> _vindex_merge(locations, values)
5214    array([[ 1,  2,  3],
5215           [40, 50, 60],
5216           [10, 20, 30]])
5217    """
5218    locations = list(map(list, locations))
5219    values = list(values)
5220
5221    n = sum(map(len, locations))
5222
5223    shape = list(values[0].shape)
5224    shape[0] = n
5225    shape = tuple(shape)
5226
5227    dtype = values[0].dtype
5228
5229    x = np.empty_like(values[0], dtype=dtype, shape=shape)
5230
5231    ind = [slice(None, None) for i in range(x.ndim)]
5232    for loc, val in zip(locations, values):
5233        ind[0] = loc
5234        x[tuple(ind)] = val
5235
5236    return x
5237
5238
5239def to_npy_stack(dirname, x, axis=0):
5240    """Write dask array to a stack of .npy files
5241
5242    This partitions the dask.array along one axis and stores each block along
5243    that axis as a single .npy file in the specified directory
5244
5245    Examples
5246    --------
5247    >>> x = da.ones((5, 10, 10), chunks=(2, 4, 4))  # doctest: +SKIP
5248    >>> da.to_npy_stack('data/', x, axis=0)  # doctest: +SKIP
5249
5250    The ``.npy`` files store numpy arrays for ``x[0:2], x[2:4], and x[4:5]``
5251    respectively, as is specified by the chunk size along the zeroth axis::
5252
5253        $ tree data/
5254        data/
5255        |-- 0.npy
5256        |-- 1.npy
5257        |-- 2.npy
5258        |-- info
5259
5260    The ``info`` file stores the dtype, chunks, and axis information of the array.
5261    You can load these stacks with the :func:`dask.array.from_npy_stack` function.
5262
5263    >>> y = da.from_npy_stack('data/')  # doctest: +SKIP
5264
5265    See Also
5266    --------
5267    from_npy_stack
5268    """
5269
5270    chunks = tuple((c if i == axis else (sum(c),)) for i, c in enumerate(x.chunks))
5271    xx = x.rechunk(chunks)
5272
5273    if not os.path.exists(dirname):
5274        os.mkdir(dirname)
5275
5276    meta = {"chunks": chunks, "dtype": x.dtype, "axis": axis}
5277
5278    with open(os.path.join(dirname, "info"), "wb") as f:
5279        pickle.dump(meta, f)
5280
5281    name = "to-npy-stack-" + str(uuid.uuid1())
5282    dsk = {
5283        (name, i): (np.save, os.path.join(dirname, "%d.npy" % i), key)
5284        for i, key in enumerate(core.flatten(xx.__dask_keys__()))
5285    }
5286
5287    graph = HighLevelGraph.from_collections(name, dsk, dependencies=[xx])
5288    compute_as_if_collection(Array, graph, list(dsk))
5289
5290
5291def from_npy_stack(dirname, mmap_mode="r"):
5292    """Load dask array from stack of npy files
5293
5294    See :func:`dask.array.to_npy_stack` for docstring.
5295
5296    Parameters
5297    ----------
5298    dirname: string
5299        Directory of .npy files
5300    mmap_mode: (None or 'r')
5301        Read data in memory map mode
5302    """
5303    with open(os.path.join(dirname, "info"), "rb") as f:
5304        info = pickle.load(f)
5305
5306    dtype = info["dtype"]
5307    chunks = info["chunks"]
5308    axis = info["axis"]
5309
5310    name = "from-npy-stack-%s" % dirname
5311    keys = list(product([name], *[range(len(c)) for c in chunks]))
5312    values = [
5313        (np.load, os.path.join(dirname, "%d.npy" % i), mmap_mode)
5314        for i in range(len(chunks[axis]))
5315    ]
5316    dsk = dict(zip(keys, values))
5317
5318    return Array(dsk, name, chunks, dtype)
5319
5320
5321def new_da_object(dsk, name, chunks, meta=None, dtype=None):
5322    """Generic constructor for dask.array or dask.dataframe objects.
5323
5324    Decides the appropriate output class based on the type of `meta` provided.
5325    """
5326    if is_dataframe_like(meta) or is_series_like(meta) or is_index_like(meta):
5327        from ..dataframe.core import new_dd_object
5328
5329        assert all(len(c) == 1 for c in chunks[1:])
5330        divisions = [None] * (len(chunks[0]) + 1)
5331        return new_dd_object(dsk, name, meta, divisions)
5332    else:
5333        return Array(dsk, name=name, chunks=chunks, meta=meta, dtype=dtype)
5334
5335
5336class BlockView:
5337    """An array-like interface to the blocks of an array.
5338
5339    ``BlockView`` provides an array-like interface
5340    to the blocks of a dask array.  Numpy-style indexing of a
5341     ``BlockView`` returns a selection of blocks as a new dask array.
5342
5343    You can index ``BlockView`` like a numpy array of shape
5344    equal to the number of blocks in each dimension, (available as
5345    array.blocks.size).  The dimensionality of the output array matches
5346    the dimension of this array, even if integer indices are passed.
5347    Slicing with ``np.newaxis`` or multiple lists is not supported.
5348
5349    Examples
5350    --------
5351    >>> import dask.array as da
5352    >>> from dask.array.core import BlockView
5353    >>> x = da.arange(8, chunks=2)
5354    >>> bv = BlockView(x)
5355    >>> bv.shape # aliases x.numblocks
5356    (4,)
5357    >>> bv.size
5358    4
5359    >>> bv[0].compute()
5360    array([0, 1])
5361    >>> bv[:3].compute()
5362    array([0, 1, 2, 3, 4, 5])
5363    >>> bv[::2].compute()
5364    array([0, 1, 4, 5])
5365    >>> bv[[-1, 0]].compute()
5366    array([6, 7, 0, 1])
5367    >>> bv.ravel()  # doctest: +NORMALIZE_WHITESPACE
5368    [dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>,
5369     dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>,
5370     dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>,
5371     dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>]
5372
5373    Returns
5374    -------
5375    An instance of ``da.array.Blockview``
5376    """
5377
5378    def __init__(self, array: Array) -> BlockView:
5379        self._array = array
5380
5381    def __getitem__(
5382        self, index: int | Sequence[int] | slice | Sequence[slice]
5383    ) -> Array:
5384        from .slicing import normalize_index
5385
5386        if not isinstance(index, tuple):
5387            index = (index,)
5388        if sum(isinstance(ind, (np.ndarray, list)) for ind in index) > 1:
5389            raise ValueError("Can only slice with a single list")
5390        if any(ind is None for ind in index):
5391            raise ValueError("Slicing with np.newaxis or None is not supported")
5392        index = normalize_index(index, self._array.numblocks)
5393        index = tuple(slice(k, k + 1) if isinstance(k, Number) else k for k in index)
5394
5395        name = "blocks-" + tokenize(self._array, index)
5396
5397        new_keys = self._array._key_array[index]
5398
5399        chunks = tuple(
5400            tuple(np.array(c)[i].tolist()) for c, i in zip(self._array.chunks, index)
5401        )
5402
5403        keys = product(*(range(len(c)) for c in chunks))
5404
5405        layer = {(name,) + key: tuple(new_keys[key].tolist()) for key in keys}
5406
5407        graph = HighLevelGraph.from_collections(name, layer, dependencies=[self._array])
5408        return Array(graph, name, chunks, meta=self._array)
5409
5410    def __eq__(self, other: Any) -> bool:
5411        if isinstance(other, BlockView):
5412            return self._array is other._array
5413        else:
5414            return NotImplemented
5415
5416    @property
5417    def size(self) -> int:
5418        """
5419        The total number of blocks in the array.
5420        """
5421        return np.prod(self.shape)
5422
5423    @property
5424    def shape(self) -> tuple[int, ...]:
5425        """
5426        The number of blocks per axis. Alias of ``dask.array.numblocks``.
5427        """
5428        return self._array.numblocks
5429
5430    def ravel(self) -> list[Array]:
5431        """
5432        Return a flattened list of all the blocks in the array in C order.
5433        """
5434        return [self[idx] for idx in np.ndindex(self.shape)]
5435
5436
5437from .blockwise import blockwise
5438from .utils import compute_meta, meta_from_array
5439