1from __future__ import annotations 2 3import contextlib 4import math 5import operator 6import os 7import pickle 8import re 9import sys 10import traceback 11import uuid 12import warnings 13from bisect import bisect 14from collections.abc import ( 15 Collection, 16 Hashable, 17 Iterable, 18 Iterator, 19 Mapping, 20 MutableMapping, 21) 22from functools import partial, reduce, wraps 23from itertools import product, zip_longest 24from numbers import Integral, Number 25from operator import add, mul 26from threading import Lock 27from typing import Any, Sequence 28 29import numpy as np 30from fsspec import get_mapper 31from tlz import accumulate, concat, first, frequencies, groupby, partition 32from tlz.curried import pluck 33 34from .. import compute, config, core, threaded 35from ..base import ( 36 DaskMethodsMixin, 37 compute_as_if_collection, 38 dont_optimize, 39 is_dask_collection, 40 persist, 41 tokenize, 42) 43from ..blockwise import broadcast_dimensions 44from ..context import globalmethod 45from ..core import quote 46from ..delayed import Delayed, delayed 47from ..highlevelgraph import HighLevelGraph 48from ..layers import reshapelist 49from ..sizeof import sizeof 50from ..utils import ( 51 IndexCallable, 52 M, 53 SerializableLock, 54 cached_property, 55 concrete, 56 derived_from, 57 factors, 58 format_bytes, 59 funcname, 60 has_keyword, 61 is_arraylike, 62 is_dataframe_like, 63 is_index_like, 64 is_integer, 65 is_series_like, 66 ndeepmap, 67 ndimlist, 68 parse_bytes, 69 typename, 70) 71from ..widgets import get_template 72from . import chunk 73from .chunk import getitem 74from .chunk_types import is_valid_array_chunk, is_valid_chunk_type 75 76# Keep einsum_lookup and tensordot_lookup here for backwards compatibility 77from .dispatch import concatenate_lookup, einsum_lookup, tensordot_lookup # noqa: F401 78from .numpy_compat import _numpy_120, _Recurser 79from .slicing import cached_cumsum, replace_ellipsis, setitem_array, slice_array 80 81config.update_defaults({"array": {"chunk-size": "128MiB", "rechunk-threshold": 4}}) 82 83unknown_chunk_message = ( 84 "\n\n" 85 "A possible solution: " 86 "https://docs.dask.org/en/latest/array-chunks.html#unknown-chunks\n" 87 "Summary: to compute chunks sizes, use\n\n" 88 " x.compute_chunk_sizes() # for Dask Array `x`\n" 89 " ddf.to_dask_array(lengths=True) # for Dask DataFrame `ddf`" 90) 91 92 93class PerformanceWarning(Warning): 94 """A warning given when bad chunking may cause poor performance""" 95 96 97def getter(a, b, asarray=True, lock=None): 98 if isinstance(b, tuple) and any(x is None for x in b): 99 b2 = tuple(x for x in b if x is not None) 100 b3 = tuple( 101 None if x is None else slice(None, None) 102 for x in b 103 if not isinstance(x, Integral) 104 ) 105 return getter(a, b2, asarray=asarray, lock=lock)[b3] 106 107 if lock: 108 lock.acquire() 109 try: 110 c = a[b] 111 # Below we special-case `np.matrix` to force a conversion to 112 # `np.ndarray` and preserve original Dask behavior for `getter`, 113 # as for all purposes `np.matrix` is array-like and thus 114 # `is_arraylike` evaluates to `True` in that case. 115 if asarray and (not is_arraylike(c) or isinstance(c, np.matrix)): 116 c = np.asarray(c) 117 finally: 118 if lock: 119 lock.release() 120 return c 121 122 123def getter_nofancy(a, b, asarray=True, lock=None): 124 """A simple wrapper around ``getter``. 125 126 Used to indicate to the optimization passes that the backend doesn't 127 support fancy indexing. 128 """ 129 return getter(a, b, asarray=asarray, lock=lock) 130 131 132def getter_inline(a, b, asarray=True, lock=None): 133 """A getter function that optimizations feel comfortable inlining 134 135 Slicing operations with this function may be inlined into a graph, such as 136 in the following rewrite 137 138 **Before** 139 140 >>> a = x[:10] # doctest: +SKIP 141 >>> b = a + 1 # doctest: +SKIP 142 >>> c = a * 2 # doctest: +SKIP 143 144 **After** 145 146 >>> b = x[:10] + 1 # doctest: +SKIP 147 >>> c = x[:10] * 2 # doctest: +SKIP 148 149 This inlining can be relevant to operations when running off of disk. 150 """ 151 return getter(a, b, asarray=asarray, lock=lock) 152 153 154from .optimization import fuse_slice, optimize 155 156# __array_function__ dict for mapping aliases and mismatching names 157_HANDLED_FUNCTIONS = {} 158 159 160def implements(*numpy_functions): 161 """Register an __array_function__ implementation for dask.array.Array 162 163 Register that a function implements the API of a NumPy function (or several 164 NumPy functions in case of aliases) which is handled with 165 ``__array_function__``. 166 167 Parameters 168 ---------- 169 \\*numpy_functions : callables 170 One or more NumPy functions that are handled by ``__array_function__`` 171 and will be mapped by `implements` to a `dask.array` function. 172 """ 173 174 def decorator(dask_func): 175 for numpy_function in numpy_functions: 176 _HANDLED_FUNCTIONS[numpy_function] = dask_func 177 178 return dask_func 179 180 return decorator 181 182 183def _should_delegate(other) -> bool: 184 """Check whether Dask should delegate to the other. 185 This implementation follows NEP-13: 186 https://numpy.org/neps/nep-0013-ufunc-overrides.html#behavior-in-combination-with-python-s-binary-operations 187 """ 188 if hasattr(other, "__array_ufunc__") and other.__array_ufunc__ is None: 189 return True 190 elif ( 191 hasattr(other, "__array_ufunc__") 192 and not is_valid_array_chunk(other) 193 and type(other).__array_ufunc__ is not Array.__array_ufunc__ 194 ): 195 return True 196 return False 197 198 199def check_if_handled_given_other(f): 200 """Check if method is handled by Dask given type of other 201 202 Ensures proper deferral to upcast types in dunder operations without 203 assuming unknown types are automatically downcast types. 204 """ 205 206 @wraps(f) 207 def wrapper(self, other): 208 if _should_delegate(other): 209 return NotImplemented 210 else: 211 return f(self, other) 212 213 return wrapper 214 215 216def slices_from_chunks(chunks): 217 """Translate chunks tuple to a set of slices in product order 218 219 >>> slices_from_chunks(((2, 2), (3, 3, 3))) # doctest: +NORMALIZE_WHITESPACE 220 [(slice(0, 2, None), slice(0, 3, None)), 221 (slice(0, 2, None), slice(3, 6, None)), 222 (slice(0, 2, None), slice(6, 9, None)), 223 (slice(2, 4, None), slice(0, 3, None)), 224 (slice(2, 4, None), slice(3, 6, None)), 225 (slice(2, 4, None), slice(6, 9, None))] 226 """ 227 cumdims = [cached_cumsum(bds, initial_zero=True) for bds in chunks] 228 slices = [ 229 [slice(s, s + dim) for s, dim in zip(starts, shapes)] 230 for starts, shapes in zip(cumdims, chunks) 231 ] 232 return list(product(*slices)) 233 234 235def getem( 236 arr, 237 chunks, 238 getitem=getter, 239 shape=None, 240 out_name=None, 241 lock=False, 242 asarray=True, 243 dtype=None, 244): 245 """Dask getting various chunks from an array-like 246 247 >>> getem('X', chunks=(2, 3), shape=(4, 6)) # doctest: +SKIP 248 {('X', 0, 0): (getter, 'X', (slice(0, 2), slice(0, 3))), 249 ('X', 1, 0): (getter, 'X', (slice(2, 4), slice(0, 3))), 250 ('X', 1, 1): (getter, 'X', (slice(2, 4), slice(3, 6))), 251 ('X', 0, 1): (getter, 'X', (slice(0, 2), slice(3, 6)))} 252 253 >>> getem('X', chunks=((2, 2), (3, 3))) # doctest: +SKIP 254 {('X', 0, 0): (getter, 'X', (slice(0, 2), slice(0, 3))), 255 ('X', 1, 0): (getter, 'X', (slice(2, 4), slice(0, 3))), 256 ('X', 1, 1): (getter, 'X', (slice(2, 4), slice(3, 6))), 257 ('X', 0, 1): (getter, 'X', (slice(0, 2), slice(3, 6)))} 258 """ 259 out_name = out_name or arr 260 chunks = normalize_chunks(chunks, shape, dtype=dtype) 261 keys = product([out_name], *(range(len(bds)) for bds in chunks)) 262 slices = slices_from_chunks(chunks) 263 264 if ( 265 has_keyword(getitem, "asarray") 266 and has_keyword(getitem, "lock") 267 and (not asarray or lock) 268 ): 269 values = [(getitem, arr, x, asarray, lock) for x in slices] 270 else: 271 # Common case, drop extra parameters 272 values = [(getitem, arr, x) for x in slices] 273 274 return dict(zip(keys, values)) 275 276 277def dotmany(A, B, leftfunc=None, rightfunc=None, **kwargs): 278 """Dot product of many aligned chunks 279 280 >>> x = np.array([[1, 2], [1, 2]]) 281 >>> y = np.array([[10, 20], [10, 20]]) 282 >>> dotmany([x, x, x], [y, y, y]) 283 array([[ 90, 180], 284 [ 90, 180]]) 285 286 Optionally pass in functions to apply to the left and right chunks 287 288 >>> dotmany([x, x, x], [y, y, y], rightfunc=np.transpose) 289 array([[150, 150], 290 [150, 150]]) 291 """ 292 if leftfunc: 293 A = map(leftfunc, A) 294 if rightfunc: 295 B = map(rightfunc, B) 296 return sum(map(partial(np.dot, **kwargs), A, B)) 297 298 299def _concatenate2(arrays, axes=[]): 300 """Recursively concatenate nested lists of arrays along axes 301 302 Each entry in axes corresponds to each level of the nested list. The 303 length of axes should correspond to the level of nesting of arrays. 304 If axes is an empty list or tuple, return arrays, or arrays[0] if 305 arrays is a list. 306 307 >>> x = np.array([[1, 2], [3, 4]]) 308 >>> _concatenate2([x, x], axes=[0]) 309 array([[1, 2], 310 [3, 4], 311 [1, 2], 312 [3, 4]]) 313 314 >>> _concatenate2([x, x], axes=[1]) 315 array([[1, 2, 1, 2], 316 [3, 4, 3, 4]]) 317 318 >>> _concatenate2([[x, x], [x, x]], axes=[0, 1]) 319 array([[1, 2, 1, 2], 320 [3, 4, 3, 4], 321 [1, 2, 1, 2], 322 [3, 4, 3, 4]]) 323 324 Supports Iterators 325 >>> _concatenate2(iter([x, x]), axes=[1]) 326 array([[1, 2, 1, 2], 327 [3, 4, 3, 4]]) 328 329 Special Case 330 >>> _concatenate2([x, x], axes=()) 331 array([[1, 2], 332 [3, 4]]) 333 """ 334 if axes == (): 335 if isinstance(arrays, list): 336 return arrays[0] 337 else: 338 return arrays 339 340 if isinstance(arrays, Iterator): 341 arrays = list(arrays) 342 if not isinstance(arrays, (list, tuple)): 343 return arrays 344 if len(axes) > 1: 345 arrays = [_concatenate2(a, axes=axes[1:]) for a in arrays] 346 concatenate = concatenate_lookup.dispatch( 347 type(max(arrays, key=lambda x: getattr(x, "__array_priority__", 0))) 348 ) 349 if isinstance(arrays[0], dict): 350 # Handle concatenation of `dict`s, used as a replacement for structured 351 # arrays when that's not supported by the array library (e.g., CuPy). 352 keys = list(arrays[0].keys()) 353 assert all(list(a.keys()) == keys for a in arrays) 354 ret = dict() 355 for k in keys: 356 ret[k] = concatenate(list(a[k] for a in arrays), axis=axes[0]) 357 return ret 358 else: 359 return concatenate(arrays, axis=axes[0]) 360 361 362def apply_infer_dtype(func, args, kwargs, funcname, suggest_dtype="dtype", nout=None): 363 """ 364 Tries to infer output dtype of ``func`` for a small set of input arguments. 365 366 Parameters 367 ---------- 368 func: Callable 369 Function for which output dtype is to be determined 370 371 args: List of array like 372 Arguments to the function, which would usually be used. Only attributes 373 ``ndim`` and ``dtype`` are used. 374 375 kwargs: dict 376 Additional ``kwargs`` to the ``func`` 377 378 funcname: String 379 Name of calling function to improve potential error messages 380 381 suggest_dtype: None/False or String 382 If not ``None`` adds suggestion to potential error message to specify a dtype 383 via the specified kwarg. Defaults to ``'dtype'``. 384 385 nout: None or Int 386 ``None`` if function returns single output, integer if many. 387 Deafults to ``None``. 388 389 Returns 390 ------- 391 : dtype or List of dtype 392 One or many dtypes (depending on ``nout``) 393 """ 394 from .utils import meta_from_array 395 396 # make sure that every arg is an evaluated array 397 args = [ 398 np.ones_like(meta_from_array(x), shape=((1,) * x.ndim), dtype=x.dtype) 399 if is_arraylike(x) 400 else x 401 for x in args 402 ] 403 try: 404 with np.errstate(all="ignore"): 405 o = func(*args, **kwargs) 406 except Exception as e: 407 exc_type, exc_value, exc_traceback = sys.exc_info() 408 tb = "".join(traceback.format_tb(exc_traceback)) 409 suggest = ( 410 ( 411 "Please specify the dtype explicitly using the " 412 "`{dtype}` kwarg.\n\n".format(dtype=suggest_dtype) 413 ) 414 if suggest_dtype 415 else "" 416 ) 417 msg = ( 418 f"`dtype` inference failed in `{funcname}`.\n\n" 419 f"{suggest}" 420 "Original error is below:\n" 421 "------------------------\n" 422 f"{e!r}\n\n" 423 "Traceback:\n" 424 "---------\n" 425 f"{tb}" 426 ) 427 else: 428 msg = None 429 if msg is not None: 430 raise ValueError(msg) 431 return o.dtype if nout is None else tuple(e.dtype for e in o) 432 433 434def normalize_arg(x): 435 """Normalize user provided arguments to blockwise or map_blocks 436 437 We do a few things: 438 439 1. If they are string literals that might collide with blockwise_token then we 440 quote them 441 2. IF they are large (as defined by sizeof) then we put them into the 442 graph on their own by using dask.delayed 443 """ 444 if is_dask_collection(x): 445 return x 446 elif isinstance(x, str) and re.match(r"_\d+", x): 447 return delayed(x) 448 elif isinstance(x, list) and len(x) >= 10: 449 return delayed(x) 450 elif sizeof(x) > 1e6: 451 return delayed(x) 452 else: 453 return x 454 455 456def _pass_extra_kwargs(func, keys, *args, **kwargs): 457 """Helper for :func:`dask.array.map_blocks` to pass `block_info` or `block_id`. 458 459 For each element of `keys`, a corresponding element of args is changed 460 to a keyword argument with that key, before all arguments re passed on 461 to `func`. 462 """ 463 kwargs.update(zip(keys, args)) 464 return func(*args[len(keys) :], **kwargs) 465 466 467def map_blocks( 468 func, 469 *args, 470 name=None, 471 token=None, 472 dtype=None, 473 chunks=None, 474 drop_axis=[], 475 new_axis=None, 476 meta=None, 477 **kwargs, 478): 479 """Map a function across all blocks of a dask array. 480 481 Note that ``map_blocks`` will attempt to automatically determine the output 482 array type by calling ``func`` on 0-d versions of the inputs. Please refer to 483 the ``meta`` keyword argument below if you expect that the function will not 484 succeed when operating on 0-d arrays. 485 486 Parameters 487 ---------- 488 func : callable 489 Function to apply to every block in the array. 490 args : dask arrays or other objects 491 dtype : np.dtype, optional 492 The ``dtype`` of the output array. It is recommended to provide this. 493 If not provided, will be inferred by applying the function to a small 494 set of fake data. 495 chunks : tuple, optional 496 Chunk shape of resulting blocks if the function does not preserve 497 shape. If not provided, the resulting array is assumed to have the same 498 block structure as the first input array. 499 drop_axis : number or iterable, optional 500 Dimensions lost by the function. 501 new_axis : number or iterable, optional 502 New dimensions created by the function. Note that these are applied 503 after ``drop_axis`` (if present). 504 token : string, optional 505 The key prefix to use for the output array. If not provided, will be 506 determined from the function name. 507 name : string, optional 508 The key name to use for the output array. Note that this fully 509 specifies the output key name, and must be unique. If not provided, 510 will be determined by a hash of the arguments. 511 meta : array-like, optional 512 The ``meta`` of the output array, when specified is expected to be an 513 array of the same type and dtype of that returned when calling ``.compute()`` 514 on the array returned by this function. When not provided, ``meta`` will be 515 inferred by applying the function to a small set of fake data, usually a 516 0-d array. It's important to ensure that ``func`` can successfully complete 517 computation without raising exceptions when 0-d is passed to it, providing 518 ``meta`` will be required otherwise. If the output type is known beforehand 519 (e.g., ``np.ndarray``, ``cupy.ndarray``), an empty array of such type dtype 520 can be passed, for example: ``meta=np.array((), dtype=np.int32)``. 521 **kwargs : 522 Other keyword arguments to pass to function. Values must be constants 523 (not dask.arrays) 524 525 See Also 526 -------- 527 dask.array.blockwise : Generalized operation with control over block alignment. 528 529 Examples 530 -------- 531 >>> import dask.array as da 532 >>> x = da.arange(6, chunks=3) 533 534 >>> x.map_blocks(lambda x: x * 2).compute() 535 array([ 0, 2, 4, 6, 8, 10]) 536 537 The ``da.map_blocks`` function can also accept multiple arrays. 538 539 >>> d = da.arange(5, chunks=2) 540 >>> e = da.arange(5, chunks=2) 541 542 >>> f = da.map_blocks(lambda a, b: a + b**2, d, e) 543 >>> f.compute() 544 array([ 0, 2, 6, 12, 20]) 545 546 If the function changes shape of the blocks then you must provide chunks 547 explicitly. 548 549 >>> y = x.map_blocks(lambda x: x[::2], chunks=((2, 2),)) 550 551 You have a bit of freedom in specifying chunks. If all of the output chunk 552 sizes are the same, you can provide just that chunk size as a single tuple. 553 554 >>> a = da.arange(18, chunks=(6,)) 555 >>> b = a.map_blocks(lambda x: x[:3], chunks=(3,)) 556 557 If the function changes the dimension of the blocks you must specify the 558 created or destroyed dimensions. 559 560 >>> b = a.map_blocks(lambda x: x[None, :, None], chunks=(1, 6, 1), 561 ... new_axis=[0, 2]) 562 563 If ``chunks`` is specified but ``new_axis`` is not, then it is inferred to 564 add the necessary number of axes on the left. 565 566 Map_blocks aligns blocks by block positions without regard to shape. In the 567 following example we have two arrays with the same number of blocks but 568 with different shape and chunk sizes. 569 570 >>> x = da.arange(1000, chunks=(100,)) 571 >>> y = da.arange(100, chunks=(10,)) 572 573 The relevant attribute to match is numblocks. 574 575 >>> x.numblocks 576 (10,) 577 >>> y.numblocks 578 (10,) 579 580 If these match (up to broadcasting rules) then we can map arbitrary 581 functions across blocks 582 583 >>> def func(a, b): 584 ... return np.array([a.max(), b.max()]) 585 586 >>> da.map_blocks(func, x, y, chunks=(2,), dtype='i8') 587 dask.array<func, shape=(20,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray> 588 589 >>> _.compute() 590 array([ 99, 9, 199, 19, 299, 29, 399, 39, 499, 49, 599, 59, 699, 591 69, 799, 79, 899, 89, 999, 99]) 592 593 Your block function get information about where it is in the array by 594 accepting a special ``block_info`` or ``block_id`` keyword argument. 595 596 >>> def func(block_info=None): 597 ... pass 598 599 This will receive the following information: 600 601 >>> block_info # doctest: +SKIP 602 {0: {'shape': (1000,), 603 'num-chunks': (10,), 604 'chunk-location': (4,), 605 'array-location': [(400, 500)]}, 606 None: {'shape': (1000,), 607 'num-chunks': (10,), 608 'chunk-location': (4,), 609 'array-location': [(400, 500)], 610 'chunk-shape': (100,), 611 'dtype': dtype('float64')}} 612 613 For each argument and keyword arguments that are dask arrays (the positions 614 of which are the first index), you will receive the shape of the full 615 array, the number of chunks of the full array in each dimension, the chunk 616 location (for example the fourth chunk over in the first dimension), and 617 the array location (for example the slice corresponding to ``40:50``). The 618 same information is provided for the output, with the key ``None``, plus 619 the shape and dtype that should be returned. 620 621 These features can be combined to synthesize an array from scratch, for 622 example: 623 624 >>> def func(block_info=None): 625 ... loc = block_info[None]['array-location'][0] 626 ... return np.arange(loc[0], loc[1]) 627 628 >>> da.map_blocks(func, chunks=((4, 4),), dtype=np.float_) 629 dask.array<func, shape=(8,), dtype=float64, chunksize=(4,), chunktype=numpy.ndarray> 630 631 >>> _.compute() 632 array([0, 1, 2, 3, 4, 5, 6, 7]) 633 634 ``block_id`` is similar to ``block_info`` but contains only the ``chunk_location``: 635 636 >>> def func(block_id=None): 637 ... pass 638 639 This will receive the following information: 640 641 >>> block_id # doctest: +SKIP 642 (4, 3) 643 644 You may specify the key name prefix of the resulting task in the graph with 645 the optional ``token`` keyword argument. 646 647 >>> x.map_blocks(lambda x: x + 1, name='increment') 648 dask.array<increment, shape=(1000,), dtype=int64, chunksize=(100,), chunktype=numpy.ndarray> 649 650 For functions that may not handle 0-d arrays, it's also possible to specify 651 ``meta`` with an empty array matching the type of the expected result. In 652 the example below, ``func`` will result in an ``IndexError`` when computing 653 ``meta``: 654 655 >>> da.map_blocks(lambda x: x[2], da.random.random(5), meta=np.array(())) 656 dask.array<lambda, shape=(5,), dtype=float64, chunksize=(5,), chunktype=numpy.ndarray> 657 658 Similarly, it's possible to specify a non-NumPy array to ``meta``, and provide 659 a ``dtype``: 660 661 >>> import cupy # doctest: +SKIP 662 >>> rs = da.random.RandomState(RandomState=cupy.random.RandomState) # doctest: +SKIP 663 >>> dt = np.float32 664 >>> da.map_blocks(lambda x: x[2], rs.random(5, dtype=dt), meta=cupy.array((), dtype=dt)) # doctest: +SKIP 665 dask.array<lambda, shape=(5,), dtype=float32, chunksize=(5,), chunktype=cupy.ndarray> 666 """ 667 if not callable(func): 668 msg = ( 669 "First argument must be callable function, not %s\n" 670 "Usage: da.map_blocks(function, x)\n" 671 " or: da.map_blocks(function, x, y, z)" 672 ) 673 raise TypeError(msg % type(func).__name__) 674 if token: 675 warnings.warn("The token= keyword to map_blocks has been moved to name=") 676 name = token 677 678 name = f"{name or funcname(func)}-{tokenize(func, *args, **kwargs)}" 679 new_axes = {} 680 681 if isinstance(drop_axis, Number): 682 drop_axis = [drop_axis] 683 if isinstance(new_axis, Number): 684 new_axis = [new_axis] # TODO: handle new_axis 685 686 arrs = [a for a in args if isinstance(a, Array)] 687 688 argpairs = [ 689 (a, tuple(range(a.ndim))[::-1]) if isinstance(a, Array) else (a, None) 690 for a in args 691 ] 692 if arrs: 693 out_ind = tuple(range(max(a.ndim for a in arrs)))[::-1] 694 else: 695 out_ind = () 696 697 original_kwargs = kwargs 698 699 if dtype is None and meta is None: 700 try: 701 meta = compute_meta(func, dtype, *args, **kwargs) 702 except Exception: 703 pass 704 705 dtype = apply_infer_dtype(func, args, original_kwargs, "map_blocks") 706 707 if drop_axis: 708 ndim_out = len(out_ind) 709 if any(i < -ndim_out or i >= ndim_out for i in drop_axis): 710 raise ValueError( 711 f"drop_axis out of range (drop_axis={drop_axis}, " 712 f"but output is {ndim_out}d)." 713 ) 714 drop_axis = [i % ndim_out for i in drop_axis] 715 out_ind = tuple(x for i, x in enumerate(out_ind) if i not in drop_axis) 716 if new_axis is None and chunks is not None and len(out_ind) < len(chunks): 717 new_axis = range(len(chunks) - len(out_ind)) 718 if new_axis: 719 # new_axis = [x + len(drop_axis) for x in new_axis] 720 out_ind = list(out_ind) 721 for ax in sorted(new_axis): 722 n = len(out_ind) + len(drop_axis) 723 out_ind.insert(ax, n) 724 if chunks is not None: 725 new_axes[n] = chunks[ax] 726 else: 727 new_axes[n] = 1 728 out_ind = tuple(out_ind) 729 if max(new_axis) > max(out_ind): 730 raise ValueError("New_axis values do not fill in all dimensions") 731 732 if chunks is not None: 733 if len(chunks) != len(out_ind): 734 raise ValueError( 735 f"Provided chunks have {len(chunks)} dims; expected {len(out_ind)} dims" 736 ) 737 adjust_chunks = dict(zip(out_ind, chunks)) 738 else: 739 adjust_chunks = None 740 741 out = blockwise( 742 func, 743 out_ind, 744 *concat(argpairs), 745 name=name, 746 new_axes=new_axes, 747 dtype=dtype, 748 concatenate=True, 749 align_arrays=False, 750 adjust_chunks=adjust_chunks, 751 meta=meta, 752 **kwargs, 753 ) 754 755 extra_argpairs = [] 756 extra_names = [] 757 # If func has block_id as an argument, construct an array of block IDs and 758 # prepare to inject it. 759 if has_keyword(func, "block_id"): 760 block_id_name = "block-id-" + out.name 761 block_id_dsk = { 762 (block_id_name,) + block_id: block_id 763 for block_id in product(*(range(len(c)) for c in out.chunks)) 764 } 765 block_id_array = Array( 766 block_id_dsk, 767 block_id_name, 768 chunks=tuple((1,) * len(c) for c in out.chunks), 769 dtype=np.object_, 770 ) 771 extra_argpairs.append((block_id_array, out_ind)) 772 extra_names.append("block_id") 773 774 # If func has block_info as an argument, construct an array of block info 775 # objects and prepare to inject it. 776 if has_keyword(func, "block_info"): 777 starts = {} 778 num_chunks = {} 779 shapes = {} 780 781 for i, (arg, in_ind) in enumerate(argpairs): 782 if in_ind is not None: 783 shapes[i] = arg.shape 784 if drop_axis: 785 # We concatenate along dropped axes, so we need to treat them 786 # as if there is only a single chunk. 787 starts[i] = [ 788 ( 789 cached_cumsum(arg.chunks[j], initial_zero=True) 790 if ind in out_ind 791 else [0, arg.shape[j]] 792 ) 793 for j, ind in enumerate(in_ind) 794 ] 795 num_chunks[i] = tuple(len(s) - 1 for s in starts[i]) 796 else: 797 starts[i] = [ 798 cached_cumsum(c, initial_zero=True) for c in arg.chunks 799 ] 800 num_chunks[i] = arg.numblocks 801 out_starts = [cached_cumsum(c, initial_zero=True) for c in out.chunks] 802 803 block_info_name = "block-info-" + out.name 804 block_info_dsk = {} 805 for block_id in product(*(range(len(c)) for c in out.chunks)): 806 # Get position of chunk, indexed by axis labels 807 location = {out_ind[i]: loc for i, loc in enumerate(block_id)} 808 info = {} 809 for i, shape in shapes.items(): 810 # Compute chunk key in the array, taking broadcasting into 811 # account. We don't directly know which dimensions are 812 # broadcast, but any dimension with only one chunk can be 813 # treated as broadcast. 814 arr_k = tuple( 815 location.get(ind, 0) if num_chunks[i][j] > 1 else 0 816 for j, ind in enumerate(argpairs[i][1]) 817 ) 818 info[i] = { 819 "shape": shape, 820 "num-chunks": num_chunks[i], 821 "array-location": [ 822 (starts[i][ij][j], starts[i][ij][j + 1]) 823 for ij, j in enumerate(arr_k) 824 ], 825 "chunk-location": arr_k, 826 } 827 828 info[None] = { 829 "shape": out.shape, 830 "num-chunks": out.numblocks, 831 "array-location": [ 832 (out_starts[ij][j], out_starts[ij][j + 1]) 833 for ij, j in enumerate(block_id) 834 ], 835 "chunk-location": block_id, 836 "chunk-shape": tuple( 837 out.chunks[ij][j] for ij, j in enumerate(block_id) 838 ), 839 "dtype": dtype, 840 } 841 block_info_dsk[(block_info_name,) + block_id] = info 842 843 block_info = Array( 844 block_info_dsk, 845 block_info_name, 846 chunks=tuple((1,) * len(c) for c in out.chunks), 847 dtype=np.object_, 848 ) 849 extra_argpairs.append((block_info, out_ind)) 850 extra_names.append("block_info") 851 852 if extra_argpairs: 853 # Rewrite the Blockwise layer. It would be nice to find a way to 854 # avoid doing it twice, but it's currently needed to determine 855 # out.chunks from the first pass. Since it constructs a Blockwise 856 # rather than an expanded graph, it shouldn't be too expensive. 857 out = blockwise( 858 _pass_extra_kwargs, 859 out_ind, 860 func, 861 None, 862 tuple(extra_names), 863 None, 864 *concat(extra_argpairs), 865 *concat(argpairs), 866 name=out.name, 867 dtype=out.dtype, 868 concatenate=True, 869 align_arrays=False, 870 adjust_chunks=dict(zip(out_ind, out.chunks)), 871 meta=meta, 872 **kwargs, 873 ) 874 875 return out 876 877 878def broadcast_chunks(*chunkss): 879 """Construct a chunks tuple that broadcasts many chunks tuples 880 881 >>> a = ((5, 5),) 882 >>> b = ((5, 5),) 883 >>> broadcast_chunks(a, b) 884 ((5, 5),) 885 886 >>> a = ((10, 10, 10), (5, 5),) 887 >>> b = ((5, 5),) 888 >>> broadcast_chunks(a, b) 889 ((10, 10, 10), (5, 5)) 890 891 >>> a = ((10, 10, 10), (5, 5),) 892 >>> b = ((1,), (5, 5),) 893 >>> broadcast_chunks(a, b) 894 ((10, 10, 10), (5, 5)) 895 896 >>> a = ((10, 10, 10), (5, 5),) 897 >>> b = ((3, 3,), (5, 5),) 898 >>> broadcast_chunks(a, b) 899 Traceback (most recent call last): 900 ... 901 ValueError: Chunks do not align: [(10, 10, 10), (3, 3)] 902 """ 903 if not chunkss: 904 return () 905 elif len(chunkss) == 1: 906 return chunkss[0] 907 n = max(map(len, chunkss)) 908 chunkss2 = [((1,),) * (n - len(c)) + c for c in chunkss] 909 result = [] 910 for i in range(n): 911 step1 = [c[i] for c in chunkss2] 912 if all(c == (1,) for c in step1): 913 step2 = step1 914 else: 915 step2 = [c for c in step1 if c != (1,)] 916 if len(set(step2)) != 1: 917 raise ValueError("Chunks do not align: %s" % str(step2)) 918 result.append(step2[0]) 919 return tuple(result) 920 921 922def store( 923 sources: Array | Collection[Array], 924 targets, 925 lock: bool | Lock = True, 926 regions: tuple[slice, ...] | Collection[tuple[slice, ...]] | None = None, 927 compute: bool = True, 928 return_stored: bool = False, 929 **kwargs, 930): 931 """Store dask arrays in array-like objects, overwrite data in target 932 933 This stores dask arrays into object that supports numpy-style setitem 934 indexing. It stores values chunk by chunk so that it does not have to 935 fill up memory. For best performance you can align the block size of 936 the storage target with the block size of your array. 937 938 If your data fits in memory then you may prefer calling 939 ``np.array(myarray)`` instead. 940 941 Parameters 942 ---------- 943 944 sources: Array or collection of Arrays 945 targets: array-like or Delayed or collection of array-likes and/or Delayeds 946 These should support setitem syntax ``target[10:20] = ...`` 947 lock: boolean or threading.Lock, optional 948 Whether or not to lock the data stores while storing. 949 Pass True (lock each file individually), False (don't lock) or a 950 particular :class:`threading.Lock` object to be shared among all writes. 951 regions: tuple of slices or collection of tuples of slices 952 Each ``region`` tuple in ``regions`` should be such that 953 ``target[region].shape = source.shape`` 954 for the corresponding source and target in sources and targets, 955 respectively. If this is a tuple, the contents will be assumed to be 956 slices, so do not provide a tuple of tuples. 957 compute: boolean, optional 958 If true compute immediately; return :class:`dask.delayed.Delayed` otherwise. 959 return_stored: boolean, optional 960 Optionally return the stored result (default False). 961 kwargs: 962 Parameters passed to compute/persist (only used if compute=True) 963 964 Returns 965 ------- 966 967 If return_stored=True 968 tuple of Arrays 969 If return_stored=False and compute=True 970 None 971 If return_stored=False and compute=False 972 Delayed 973 974 Examples 975 -------- 976 977 >>> import h5py # doctest: +SKIP 978 >>> f = h5py.File('myfile.hdf5', mode='a') # doctest: +SKIP 979 >>> dset = f.create_dataset('/data', shape=x.shape, 980 ... chunks=x.chunks, 981 ... dtype='f8') # doctest: +SKIP 982 983 >>> store(x, dset) # doctest: +SKIP 984 985 Alternatively store many arrays at the same time 986 987 >>> store([x, y, z], [dset1, dset2, dset3]) # doctest: +SKIP 988 """ 989 990 if isinstance(sources, Array): 991 sources = [sources] 992 targets = [targets] 993 994 if any(not isinstance(s, Array) for s in sources): 995 raise ValueError("All sources must be dask array objects") 996 997 if len(sources) != len(targets): 998 raise ValueError( 999 "Different number of sources [%d] and targets [%d]" 1000 % (len(sources), len(targets)) 1001 ) 1002 1003 if isinstance(regions, tuple) or regions is None: 1004 regions = [regions] 1005 1006 if len(sources) > 1 and len(regions) == 1: 1007 regions *= len(sources) 1008 1009 if len(sources) != len(regions): 1010 raise ValueError( 1011 "Different number of sources [%d] and targets [%d] than regions [%d]" 1012 % (len(sources), len(targets), len(regions)) 1013 ) 1014 1015 # Optimize all sources together 1016 sources_hlg = HighLevelGraph.merge(*[e.__dask_graph__() for e in sources]) 1017 sources_layer = Array.__dask_optimize__( 1018 sources_hlg, list(core.flatten([e.__dask_keys__() for e in sources])) 1019 ) 1020 sources_name = "store-sources-" + tokenize(sources) 1021 layers = {sources_name: sources_layer} 1022 dependencies = {sources_name: set()} 1023 1024 # Optimize all targets together 1025 targets_keys = [] 1026 targets_dsks = [] 1027 for t in targets: 1028 if isinstance(t, Delayed): 1029 targets_keys.append(t.key) 1030 targets_dsks.append(t.__dask_graph__()) 1031 elif is_dask_collection(t): 1032 raise TypeError("Targets must be either Delayed objects or array-likes") 1033 1034 if targets_dsks: 1035 targets_hlg = HighLevelGraph.merge(*targets_dsks) 1036 targets_layer = Delayed.__dask_optimize__(targets_hlg, targets_keys) 1037 targets_name = "store-targets-" + tokenize(targets_keys) 1038 layers[targets_name] = targets_layer 1039 dependencies[targets_name] = set() 1040 1041 load_stored = return_stored and not compute 1042 1043 map_names = [ 1044 "store-map-" + tokenize(s, t if isinstance(t, Delayed) else id(t), r) 1045 for s, t, r in zip(sources, targets, regions) 1046 ] 1047 map_keys = [] 1048 1049 for s, t, n, r in zip(sources, targets, map_names, regions): 1050 map_layer = insert_to_ooc( 1051 keys=s.__dask_keys__(), 1052 chunks=s.chunks, 1053 out=t.key if isinstance(t, Delayed) else t, 1054 name=n, 1055 lock=lock, 1056 region=r, 1057 return_stored=return_stored, 1058 load_stored=load_stored, 1059 ) 1060 layers[n] = map_layer 1061 if isinstance(t, Delayed): 1062 dependencies[n] = {sources_name, targets_name} 1063 else: 1064 dependencies[n] = {sources_name} 1065 map_keys += map_layer.keys() 1066 1067 if return_stored: 1068 store_dsk = HighLevelGraph(layers, dependencies) 1069 load_store_dsk = store_dsk 1070 if compute: 1071 store_dlyds = [Delayed(k, store_dsk) for k in map_keys] 1072 store_dlyds = persist(*store_dlyds, **kwargs) 1073 store_dsk_2 = HighLevelGraph.merge(*[e.dask for e in store_dlyds]) 1074 load_store_dsk = retrieve_from_ooc(map_keys, store_dsk, store_dsk_2) 1075 map_names = ["load-" + n for n in map_names] 1076 1077 return tuple( 1078 Array(load_store_dsk, n, s.chunks, meta=s) 1079 for s, n in zip(sources, map_names) 1080 ) 1081 1082 elif compute: 1083 store_dsk = HighLevelGraph(layers, dependencies) 1084 compute_as_if_collection(Array, store_dsk, map_keys, **kwargs) 1085 return None 1086 1087 else: 1088 key = "store-" + tokenize(map_names) 1089 layers[key] = {key: map_keys} 1090 dependencies[key] = set(map_names) 1091 store_dsk = HighLevelGraph(layers, dependencies) 1092 return Delayed(key, store_dsk) 1093 1094 1095def blockdims_from_blockshape(shape, chunks): 1096 """ 1097 1098 >>> blockdims_from_blockshape((10, 10), (4, 3)) 1099 ((4, 4, 2), (3, 3, 3, 1)) 1100 >>> blockdims_from_blockshape((10, 0), (4, 0)) 1101 ((4, 4, 2), (0,)) 1102 """ 1103 if chunks is None: 1104 raise TypeError("Must supply chunks= keyword argument") 1105 if shape is None: 1106 raise TypeError("Must supply shape= keyword argument") 1107 if np.isnan(sum(shape)) or np.isnan(sum(chunks)): 1108 raise ValueError( 1109 "Array chunk sizes are unknown. shape: %s, chunks: %s%s" 1110 % (shape, chunks, unknown_chunk_message) 1111 ) 1112 if not all(map(is_integer, chunks)): 1113 raise ValueError("chunks can only contain integers.") 1114 if not all(map(is_integer, shape)): 1115 raise ValueError("shape can only contain integers.") 1116 shape = tuple(map(int, shape)) 1117 chunks = tuple(map(int, chunks)) 1118 return tuple( 1119 ((bd,) * (d // bd) + ((d % bd,) if d % bd else ()) if d else (0,)) 1120 for d, bd in zip(shape, chunks) 1121 ) 1122 1123 1124def finalize(results): 1125 if not results: 1126 return concatenate3(results) 1127 results2 = results 1128 while isinstance(results2, (tuple, list)): 1129 if len(results2) > 1: 1130 return concatenate3(results) 1131 else: 1132 results2 = results2[0] 1133 return unpack_singleton(results) 1134 1135 1136CHUNKS_NONE_ERROR_MESSAGE = """ 1137You must specify a chunks= keyword argument. 1138This specifies the chunksize of your array blocks. 1139 1140See the following documentation page for details: 1141 https://docs.dask.org/en/latest/array-creation.html#chunks 1142""".strip() 1143 1144 1145class Array(DaskMethodsMixin): 1146 """Parallel Dask Array 1147 1148 A parallel nd-array comprised of many numpy arrays arranged in a grid. 1149 1150 This constructor is for advanced uses only. For normal use see the 1151 :func:`dask.array.from_array` function. 1152 1153 Parameters 1154 ---------- 1155 dask : dict 1156 Task dependency graph 1157 name : string 1158 Name of array in dask 1159 shape : tuple of ints 1160 Shape of the entire array 1161 chunks: iterable of tuples 1162 block sizes along each dimension 1163 dtype : str or dtype 1164 Typecode or data-type for the new Dask Array 1165 meta : empty ndarray 1166 empty ndarray created with same NumPy backend, ndim and dtype as the 1167 Dask Array being created (overrides dtype) 1168 1169 See Also 1170 -------- 1171 dask.array.from_array 1172 """ 1173 1174 __slots__ = "dask", "__name", "_cached_keys", "__chunks", "_meta", "__dict__" 1175 1176 def __new__(cls, dask, name, chunks, dtype=None, meta=None, shape=None): 1177 self = super().__new__(cls) 1178 assert isinstance(dask, Mapping) 1179 if not isinstance(dask, HighLevelGraph): 1180 dask = HighLevelGraph.from_collections(name, dask, dependencies=()) 1181 self.dask = dask 1182 self._name = str(name) 1183 meta = meta_from_array(meta, dtype=dtype) 1184 1185 if ( 1186 isinstance(chunks, str) 1187 or isinstance(chunks, tuple) 1188 and chunks 1189 and any(isinstance(c, str) for c in chunks) 1190 ): 1191 dt = meta.dtype 1192 else: 1193 dt = None 1194 self._chunks = normalize_chunks(chunks, shape, dtype=dt) 1195 if self.chunks is None: 1196 raise ValueError(CHUNKS_NONE_ERROR_MESSAGE) 1197 self._meta = meta_from_array(meta, ndim=self.ndim, dtype=dtype) 1198 1199 for plugin in config.get("array_plugins", ()): 1200 result = plugin(self) 1201 if result is not None: 1202 self = result 1203 1204 try: 1205 layer = self.dask.layers[name] 1206 except (AttributeError, KeyError): 1207 # self is no longer an Array after applying the plugins, OR 1208 # a plugin replaced the HighLevelGraph with a plain dict, OR 1209 # name is not the top layer's name (this can happen after the layer is 1210 # manipulated, to avoid a collision) 1211 pass 1212 else: 1213 if layer.collection_annotations is None: 1214 layer.collection_annotations = { 1215 "shape": self.shape, 1216 "dtype": self.dtype, 1217 "chunksize": self.chunksize, 1218 "chunks": self.chunks, 1219 "type": typename(type(self)), 1220 "chunk_type": typename(type(self._meta)), 1221 } 1222 else: 1223 layer.collection_annotations.update( 1224 { 1225 "shape": self.shape, 1226 "dtype": self.dtype, 1227 "chunksize": self.chunksize, 1228 "chunks": self.chunks, 1229 "type": typename(type(self)), 1230 "chunk_type": typename(type(self._meta)), 1231 } 1232 ) 1233 1234 return self 1235 1236 def __reduce__(self): 1237 return (Array, (self.dask, self.name, self.chunks, self.dtype)) 1238 1239 def __dask_graph__(self): 1240 return self.dask 1241 1242 def __dask_layers__(self): 1243 return (self.name,) 1244 1245 def __dask_keys__(self): 1246 if self._cached_keys is not None: 1247 return self._cached_keys 1248 1249 name, chunks, numblocks = self.name, self.chunks, self.numblocks 1250 1251 def keys(*args): 1252 if not chunks: 1253 return [(name,)] 1254 ind = len(args) 1255 if ind + 1 == len(numblocks): 1256 result = [(name,) + args + (i,) for i in range(numblocks[ind])] 1257 else: 1258 result = [keys(*(args + (i,))) for i in range(numblocks[ind])] 1259 return result 1260 1261 self._cached_keys = result = keys() 1262 return result 1263 1264 def __dask_tokenize__(self): 1265 return self.name 1266 1267 __dask_optimize__ = globalmethod( 1268 optimize, key="array_optimize", falsey=dont_optimize 1269 ) 1270 __dask_scheduler__ = staticmethod(threaded.get) 1271 1272 def __dask_postcompute__(self): 1273 return finalize, () 1274 1275 def __dask_postpersist__(self): 1276 return self._rebuild, () 1277 1278 def _rebuild(self, dsk, *, rename=None): 1279 name = self._name 1280 if rename: 1281 name = rename.get(name, name) 1282 return Array(dsk, name, self.chunks, self.dtype, self._meta) 1283 1284 def _reset_cache(self, key=None): 1285 """ 1286 Reset cached properties. 1287 1288 Parameters 1289 ---------- 1290 key : str, optional 1291 Remove specified key. The default removes all items. 1292 """ 1293 if key is None: 1294 self.__dict__.clear() 1295 else: 1296 self.__dict__.pop(key, None) 1297 1298 @cached_property 1299 def _key_array(self): 1300 return np.array(self.__dask_keys__(), dtype=object) 1301 1302 @cached_property 1303 def numblocks(self): 1304 return tuple(map(len, self.chunks)) 1305 1306 @cached_property 1307 def npartitions(self): 1308 return reduce(mul, self.numblocks, 1) 1309 1310 def compute_chunk_sizes(self): 1311 """ 1312 Compute the chunk sizes for a Dask array. This is especially useful 1313 when the chunk sizes are unknown (e.g., when indexing one Dask array 1314 with another). 1315 1316 Notes 1317 ----- 1318 This function modifies the Dask array in-place. 1319 1320 Examples 1321 -------- 1322 >>> import dask.array as da 1323 >>> import numpy as np 1324 >>> x = da.from_array([-2, -1, 0, 1, 2], chunks=2) 1325 >>> x.chunks 1326 ((2, 2, 1),) 1327 >>> y = x[x <= 0] 1328 >>> y.chunks 1329 ((nan, nan, nan),) 1330 >>> y.compute_chunk_sizes() # in-place computation 1331 dask.array<getitem, shape=(3,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray> 1332 >>> y.chunks 1333 ((2, 1, 0),) 1334 1335 """ 1336 x = self 1337 chunk_shapes = x.map_blocks( 1338 _get_chunk_shape, 1339 dtype=int, 1340 chunks=tuple(len(c) * (1,) for c in x.chunks) + ((x.ndim,),), 1341 new_axis=x.ndim, 1342 ) 1343 1344 c = [] 1345 for i in range(x.ndim): 1346 s = x.ndim * [0] + [i] 1347 s[i] = slice(None) 1348 s = tuple(s) 1349 1350 c.append(tuple(chunk_shapes[s])) 1351 1352 # `map_blocks` assigns numpy dtypes 1353 # cast chunk dimensions back to python int before returning 1354 x._chunks = tuple( 1355 tuple(int(chunk) for chunk in chunks) for chunks in compute(tuple(c))[0] 1356 ) 1357 return x 1358 1359 @cached_property 1360 def shape(self): 1361 return tuple(cached_cumsum(c, initial_zero=True)[-1] for c in self.chunks) 1362 1363 @property 1364 def chunksize(self): 1365 return tuple(max(c) for c in self.chunks) 1366 1367 @property 1368 def dtype(self): 1369 if isinstance(self._meta, tuple): 1370 dtype = self._meta[0].dtype 1371 else: 1372 dtype = self._meta.dtype 1373 return dtype 1374 1375 @property 1376 def _chunks(self): 1377 """Non-public chunks property. Allows setting a chunk value.""" 1378 return self.__chunks 1379 1380 @_chunks.setter 1381 def _chunks(self, chunks): 1382 self.__chunks = chunks 1383 1384 # When the chunks changes the cached properties that was 1385 # dependent on it needs to be deleted: 1386 for key in ["numblocks", "npartitions", "shape", "ndim", "size", "_key_array"]: 1387 self._reset_cache(key) 1388 1389 @property 1390 def chunks(self): 1391 """Chunks property.""" 1392 return self.__chunks 1393 1394 @chunks.setter 1395 def chunks(self, chunks): 1396 raise TypeError( 1397 "Can not set chunks directly\n\n" 1398 "Please use the rechunk method instead:\n" 1399 f" x.rechunk({chunks})\n\n" 1400 "If trying to avoid unknown chunks, use\n" 1401 " x.compute_chunk_sizes()" 1402 ) 1403 1404 def __len__(self): 1405 if not self.chunks: 1406 raise TypeError("len() of unsized object") 1407 return sum(self.chunks[0]) 1408 1409 def __array_ufunc__(self, numpy_ufunc, method, *inputs, **kwargs): 1410 out = kwargs.get("out", ()) 1411 for x in inputs + out: 1412 if _should_delegate(x): 1413 return NotImplemented 1414 1415 if method == "__call__": 1416 if numpy_ufunc is np.matmul: 1417 from .routines import matmul 1418 1419 # special case until apply_gufunc handles optional dimensions 1420 return matmul(*inputs, **kwargs) 1421 if numpy_ufunc.signature is not None: 1422 from .gufunc import apply_gufunc 1423 1424 return apply_gufunc( 1425 numpy_ufunc, numpy_ufunc.signature, *inputs, **kwargs 1426 ) 1427 if numpy_ufunc.nout > 1: 1428 from . import ufunc 1429 1430 try: 1431 da_ufunc = getattr(ufunc, numpy_ufunc.__name__) 1432 except AttributeError: 1433 return NotImplemented 1434 return da_ufunc(*inputs, **kwargs) 1435 else: 1436 return elemwise(numpy_ufunc, *inputs, **kwargs) 1437 elif method == "outer": 1438 from . import ufunc 1439 1440 try: 1441 da_ufunc = getattr(ufunc, numpy_ufunc.__name__) 1442 except AttributeError: 1443 return NotImplemented 1444 return da_ufunc.outer(*inputs, **kwargs) 1445 else: 1446 return NotImplemented 1447 1448 def __repr__(self): 1449 """ 1450 1451 >>> import dask.array as da 1452 >>> da.ones((10, 10), chunks=(5, 5), dtype='i4') 1453 dask.array<..., shape=(10, 10), dtype=int32, chunksize=(5, 5), chunktype=numpy.ndarray> 1454 """ 1455 chunksize = str(self.chunksize) 1456 name = self.name.rsplit("-", 1)[0] 1457 return ( 1458 "dask.array<{}, shape={}, dtype={}, chunksize={}, chunktype={}.{}>".format( 1459 name, 1460 self.shape, 1461 self.dtype, 1462 chunksize, 1463 type(self._meta).__module__.split(".")[0], 1464 type(self._meta).__name__, 1465 ) 1466 ) 1467 1468 def _repr_html_(self): 1469 try: 1470 grid = self.to_svg(size=config.get("array.svg.size", 120)) 1471 except NotImplementedError: 1472 grid = "" 1473 1474 if "sparse" in typename(type(self._meta)): 1475 nbytes = None 1476 cbytes = None 1477 elif not math.isnan(self.nbytes): 1478 nbytes = format_bytes(self.nbytes) 1479 cbytes = format_bytes(np.prod(self.chunksize) * self.dtype.itemsize) 1480 else: 1481 nbytes = "unknown" 1482 cbytes = "unknown" 1483 1484 return get_template("array.html.j2").render( 1485 array=self, 1486 grid=grid, 1487 nbytes=nbytes, 1488 cbytes=cbytes, 1489 ) 1490 1491 @cached_property 1492 def ndim(self): 1493 return len(self.shape) 1494 1495 @cached_property 1496 def size(self): 1497 """Number of elements in array""" 1498 return reduce(mul, self.shape, 1) 1499 1500 @property 1501 def nbytes(self): 1502 """Number of bytes in array""" 1503 return self.size * self.dtype.itemsize 1504 1505 @property 1506 def itemsize(self): 1507 """Length of one array element in bytes""" 1508 return self.dtype.itemsize 1509 1510 @property 1511 def _name(self): 1512 return self.__name 1513 1514 @_name.setter 1515 def _name(self, val): 1516 self.__name = val 1517 # Clear the key cache when the name is reset 1518 self._cached_keys = None 1519 self._reset_cache("_key_array") 1520 1521 @property 1522 def name(self): 1523 return self.__name 1524 1525 @name.setter 1526 def name(self, val): 1527 raise TypeError( 1528 "Cannot set name directly\n\n" 1529 "Name is used to relate the array to the task graph.\n" 1530 "It is uncommon to need to change it, but if you do\n" 1531 "please set ``._name``" 1532 ) 1533 1534 def __iter__(self): 1535 for i in range(len(self)): 1536 yield self[i] 1537 1538 __array_priority__ = 11 # higher than numpy.ndarray and numpy.matrix 1539 1540 def __array__(self, dtype=None, **kwargs): 1541 x = self.compute() 1542 if dtype and x.dtype != dtype: 1543 x = x.astype(dtype) 1544 if not isinstance(x, np.ndarray): 1545 x = np.array(x) 1546 return x 1547 1548 def __array_function__(self, func, types, args, kwargs): 1549 import dask.array as module 1550 1551 def handle_nonmatching_names(func, args, kwargs): 1552 if func not in _HANDLED_FUNCTIONS: 1553 warnings.warn( 1554 "The `{}` function is not implemented by Dask array. " 1555 "You may want to use the da.map_blocks function " 1556 "or something similar to silence this warning. " 1557 "Your code may stop working in a future release.".format( 1558 func.__module__ + "." + func.__name__ 1559 ), 1560 FutureWarning, 1561 ) 1562 # Need to convert to array object (e.g. numpy.ndarray or 1563 # cupy.ndarray) as needed, so we can call the NumPy function 1564 # again and it gets the chance to dispatch to the right 1565 # implementation. 1566 args, kwargs = compute(args, kwargs) 1567 return func(*args, **kwargs) 1568 1569 return _HANDLED_FUNCTIONS[func](*args, **kwargs) 1570 1571 # First, verify that all types are handled by Dask. Otherwise, return NotImplemented. 1572 if not all(type is Array or is_valid_chunk_type(type) for type in types): 1573 return NotImplemented 1574 1575 # Now try to find a matching function name. If that doesn't work, we may 1576 # be dealing with an alias or a function that's simply not in the Dask API. 1577 # Handle aliases via the _HANDLED_FUNCTIONS dict mapping, and warn otherwise. 1578 for submodule in func.__module__.split(".")[1:]: 1579 try: 1580 module = getattr(module, submodule) 1581 except AttributeError: 1582 return handle_nonmatching_names(func, args, kwargs) 1583 1584 if not hasattr(module, func.__name__): 1585 return handle_nonmatching_names(func, args, kwargs) 1586 1587 da_func = getattr(module, func.__name__) 1588 if da_func is func: 1589 return handle_nonmatching_names(func, args, kwargs) 1590 1591 # If ``like`` is contained in ``da_func``'s signature, add ``like=self`` 1592 # to the kwargs dictionary. 1593 if has_keyword(da_func, "like"): 1594 kwargs["like"] = self 1595 1596 return da_func(*args, **kwargs) 1597 1598 @property 1599 def _elemwise(self): 1600 return elemwise 1601 1602 @wraps(store) 1603 def store(self, target, **kwargs): 1604 r = store([self], [target], **kwargs) 1605 1606 if kwargs.get("return_stored", False): 1607 r = r[0] 1608 1609 return r 1610 1611 def to_svg(self, size=500): 1612 """Convert chunks from Dask Array into an SVG Image 1613 1614 Parameters 1615 ---------- 1616 chunks: tuple 1617 size: int 1618 Rough size of the image 1619 1620 Examples 1621 -------- 1622 >>> x.to_svg(size=500) # doctest: +SKIP 1623 1624 Returns 1625 ------- 1626 text: An svg string depicting the array as a grid of chunks 1627 """ 1628 from .svg import svg 1629 1630 return svg(self.chunks, size=size) 1631 1632 def to_hdf5(self, filename, datapath, **kwargs): 1633 """Store array in HDF5 file 1634 1635 >>> x.to_hdf5('myfile.hdf5', '/x') # doctest: +SKIP 1636 1637 Optionally provide arguments as though to ``h5py.File.create_dataset`` 1638 1639 >>> x.to_hdf5('myfile.hdf5', '/x', compression='lzf', shuffle=True) # doctest: +SKIP 1640 1641 See Also 1642 -------- 1643 da.store 1644 h5py.File.create_dataset 1645 """ 1646 return to_hdf5(filename, datapath, self, **kwargs) 1647 1648 def to_dask_dataframe(self, columns=None, index=None, meta=None): 1649 """Convert dask Array to dask Dataframe 1650 1651 Parameters 1652 ---------- 1653 columns: list or string 1654 list of column names if DataFrame, single string if Series 1655 index : dask.dataframe.Index, optional 1656 An optional *dask* Index to use for the output Series or DataFrame. 1657 1658 The default output index depends on whether the array has any unknown 1659 chunks. If there are any unknown chunks, the output has ``None`` 1660 for all the divisions (one per chunk). If all the chunks are known, 1661 a default index with known divsions is created. 1662 1663 Specifying ``index`` can be useful if you're conforming a Dask Array 1664 to an existing dask Series or DataFrame, and you would like the 1665 indices to match. 1666 meta : object, optional 1667 An optional `meta` parameter can be passed for dask 1668 to specify the concrete dataframe type to use for partitions of 1669 the Dask dataframe. By default, pandas DataFrame is used. 1670 1671 See Also 1672 -------- 1673 dask.dataframe.from_dask_array 1674 """ 1675 from ..dataframe import from_dask_array 1676 1677 return from_dask_array(self, columns=columns, index=index, meta=meta) 1678 1679 def __bool__(self): 1680 if self.size > 1: 1681 raise ValueError( 1682 f"The truth value of a {self.__class__.__name__} is ambiguous. " 1683 "Use a.any() or a.all()." 1684 ) 1685 else: 1686 return bool(self.compute()) 1687 1688 __nonzero__ = __bool__ # python 2 1689 1690 def _scalarfunc(self, cast_type): 1691 if self.size > 1: 1692 raise TypeError("Only length-1 arrays can be converted to Python scalars") 1693 else: 1694 return cast_type(self.compute()) 1695 1696 def __int__(self): 1697 return self._scalarfunc(int) 1698 1699 __long__ = __int__ # python 2 1700 1701 def __float__(self): 1702 return self._scalarfunc(float) 1703 1704 def __complex__(self): 1705 return self._scalarfunc(complex) 1706 1707 def __index__(self): 1708 return self._scalarfunc(operator.index) 1709 1710 def __setitem__(self, key, value): 1711 if value is np.ma.masked: 1712 value = np.ma.masked_all(()) 1713 1714 ## Use the "where" method for cases when key is an Array 1715 if isinstance(key, Array): 1716 from .routines import where 1717 1718 if isinstance(value, Array) and value.ndim > 1: 1719 raise ValueError("boolean index array should have 1 dimension") 1720 try: 1721 y = where(key, value, self) 1722 except ValueError as e: 1723 raise ValueError( 1724 "Boolean index assignment in Dask " 1725 "expects equally shaped arrays.\nExample: da1[da2] = da3 " 1726 "where da1.shape == (4,), da2.shape == (4,) " 1727 "and da3.shape == (4,)." 1728 ) from e 1729 self._meta = y._meta 1730 self.dask = y.dask 1731 self._name = y.name 1732 self._chunks = y.chunks 1733 return 1734 1735 if np.isnan(self.shape).any(): 1736 raise ValueError(f"Arrays chunk sizes are unknown. {unknown_chunk_message}") 1737 1738 # Still here? Then apply the assignment to other type of 1739 # indices via the `setitem_array` function. 1740 value = asanyarray(value) 1741 1742 out = "setitem-" + tokenize(self, key, value) 1743 dsk = setitem_array(out, self, key, value) 1744 1745 meta = meta_from_array(self._meta) 1746 if np.isscalar(meta): 1747 meta = np.array(meta) 1748 1749 graph = HighLevelGraph.from_collections(out, dsk, dependencies=[self]) 1750 y = Array(graph, out, chunks=self.chunks, dtype=self.dtype, meta=meta) 1751 1752 self._meta = y._meta 1753 self.dask = y.dask 1754 self._name = y.name 1755 self._chunks = y.chunks 1756 1757 def __getitem__(self, index): 1758 # Field access, e.g. x['a'] or x[['a', 'b']] 1759 if isinstance(index, str) or ( 1760 isinstance(index, list) and index and all(isinstance(i, str) for i in index) 1761 ): 1762 if isinstance(index, str): 1763 dt = self.dtype[index] 1764 else: 1765 dt = np.dtype( 1766 { 1767 "names": index, 1768 "formats": [self.dtype.fields[name][0] for name in index], 1769 "offsets": [self.dtype.fields[name][1] for name in index], 1770 "itemsize": self.dtype.itemsize, 1771 } 1772 ) 1773 1774 if dt.shape: 1775 new_axis = list(range(self.ndim, self.ndim + len(dt.shape))) 1776 chunks = self.chunks + tuple((i,) for i in dt.shape) 1777 return self.map_blocks( 1778 getitem, index, dtype=dt.base, chunks=chunks, new_axis=new_axis 1779 ) 1780 else: 1781 return self.map_blocks(getitem, index, dtype=dt) 1782 1783 if not isinstance(index, tuple): 1784 index = (index,) 1785 1786 from .slicing import ( 1787 normalize_index, 1788 slice_with_bool_dask_array, 1789 slice_with_int_dask_array, 1790 ) 1791 1792 index2 = normalize_index(index, self.shape) 1793 dependencies = {self.name} 1794 for i in index2: 1795 if isinstance(i, Array): 1796 dependencies.add(i.name) 1797 1798 if any(isinstance(i, Array) and i.dtype.kind in "iu" for i in index2): 1799 self, index2 = slice_with_int_dask_array(self, index2) 1800 if any(isinstance(i, Array) and i.dtype == bool for i in index2): 1801 self, index2 = slice_with_bool_dask_array(self, index2) 1802 1803 if all(isinstance(i, slice) and i == slice(None) for i in index2): 1804 return self 1805 1806 out = "getitem-" + tokenize(self, index2) 1807 dsk, chunks = slice_array(out, self.name, self.chunks, index2, self.itemsize) 1808 1809 graph = HighLevelGraph.from_collections(out, dsk, dependencies=[self]) 1810 1811 meta = meta_from_array(self._meta, ndim=len(chunks)) 1812 if np.isscalar(meta): 1813 meta = np.array(meta) 1814 1815 return Array(graph, out, chunks, meta=meta) 1816 1817 def _vindex(self, key): 1818 if not isinstance(key, tuple): 1819 key = (key,) 1820 if any(k is None for k in key): 1821 raise IndexError( 1822 "vindex does not support indexing with None (np.newaxis), " 1823 "got {}".format(key) 1824 ) 1825 if all(isinstance(k, slice) for k in key): 1826 if all( 1827 k.indices(d) == slice(0, d).indices(d) for k, d in zip(key, self.shape) 1828 ): 1829 return self 1830 raise IndexError( 1831 "vindex requires at least one non-slice to vectorize over " 1832 "when the slices are not over the entire array (i.e, x[:]). " 1833 "Use normal slicing instead when only using slices. Got: {}".format(key) 1834 ) 1835 return _vindex(self, *key) 1836 1837 @property 1838 def vindex(self): 1839 """Vectorized indexing with broadcasting. 1840 1841 This is equivalent to numpy's advanced indexing, using arrays that are 1842 broadcast against each other. This allows for pointwise indexing: 1843 1844 >>> import dask.array as da 1845 >>> x = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]) 1846 >>> x = da.from_array(x, chunks=2) 1847 >>> x.vindex[[0, 1, 2], [0, 1, 2]].compute() 1848 array([1, 5, 9]) 1849 1850 Mixed basic/advanced indexing with slices/arrays is also supported. The 1851 order of dimensions in the result follows those proposed for 1852 `ndarray.vindex <https://github.com/numpy/numpy/pull/6256>`_: 1853 the subspace spanned by arrays is followed by all slices. 1854 1855 Note: ``vindex`` provides more general functionality than standard 1856 indexing, but it also has fewer optimizations and can be significantly 1857 slower. 1858 """ 1859 return IndexCallable(self._vindex) 1860 1861 @property 1862 def blocks(self): 1863 """An array-like interface to the blocks of an array. 1864 1865 This returns a ``Blockview`` object that provides an array-like interface 1866 to the blocks of a dask array. Numpy-style indexing of a ``Blockview`` object 1867 returns a selection of blocks as a new dask array. 1868 1869 You can index ``array.blocks`` like a numpy array of shape 1870 equal to the number of blocks in each dimension, (available as 1871 array.blocks.size). The dimensionality of the output array matches 1872 the dimension of this array, even if integer indices are passed. 1873 Slicing with ``np.newaxis`` or multiple lists is not supported. 1874 1875 Examples 1876 -------- 1877 >>> import dask.array as da 1878 >>> x = da.arange(8, chunks=2) 1879 >>> x.blocks.shape # aliases x.numblocks 1880 (4,) 1881 >>> x.blocks[0].compute() 1882 array([0, 1]) 1883 >>> x.blocks[:3].compute() 1884 array([0, 1, 2, 3, 4, 5]) 1885 >>> x.blocks[::2].compute() 1886 array([0, 1, 4, 5]) 1887 >>> x.blocks[[-1, 0]].compute() 1888 array([6, 7, 0, 1]) 1889 >>> x.blocks.ravel() # doctest: +NORMALIZE_WHITESPACE 1890 [dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>, 1891 dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>, 1892 dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>, 1893 dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>] 1894 1895 Returns 1896 ------- 1897 An instance of ``dask.array.Blockview`` 1898 """ 1899 return BlockView(self) 1900 1901 @property 1902 def partitions(self): 1903 """Slice an array by partitions. Alias of dask array .blocks attribute. 1904 1905 This alias allows you to write agnostic code that works with both 1906 dask arrays and dask dataframes. 1907 1908 This returns a ``Blockview`` object that provides an array-like interface 1909 to the blocks of a dask array. Numpy-style indexing of a ``Blockview`` object 1910 returns a selection of blocks as a new dask array. 1911 1912 You can index ``array.blocks`` like a numpy array of shape 1913 equal to the number of blocks in each dimension, (available as 1914 array.blocks.size). The dimensionality of the output array matches 1915 the dimension of this array, even if integer indices are passed. 1916 Slicing with ``np.newaxis`` or multiple lists is not supported. 1917 1918 Examples 1919 -------- 1920 >>> import dask.array as da 1921 >>> x = da.arange(8, chunks=2) 1922 >>> x.partitions.shape # aliases x.numblocks 1923 (4,) 1924 >>> x.partitions[0].compute() 1925 array([0, 1]) 1926 >>> x.partitions[:3].compute() 1927 array([0, 1, 2, 3, 4, 5]) 1928 >>> x.partitions[::2].compute() 1929 array([0, 1, 4, 5]) 1930 >>> x.partitions[[-1, 0]].compute() 1931 array([6, 7, 0, 1]) 1932 >>> x.partitions.ravel() # doctest: +NORMALIZE_WHITESPACE 1933 [dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>, 1934 dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>, 1935 dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>, 1936 dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>] 1937 1938 Returns 1939 ------- 1940 An instance of ``da.array.Blockview`` 1941 """ 1942 return self.blocks 1943 1944 @derived_from(np.ndarray) 1945 def dot(self, other): 1946 from .routines import tensordot 1947 1948 return tensordot(self, other, axes=((self.ndim - 1,), (other.ndim - 2,))) 1949 1950 @property 1951 def A(self): 1952 return self 1953 1954 @property 1955 def T(self): 1956 return self.transpose() 1957 1958 @derived_from(np.ndarray) 1959 def transpose(self, *axes): 1960 from .routines import transpose 1961 1962 if not axes: 1963 axes = None 1964 elif len(axes) == 1 and isinstance(axes[0], Iterable): 1965 axes = axes[0] 1966 if (axes == tuple(range(self.ndim))) or (axes == tuple(range(-self.ndim, 0))): 1967 # no transpose necessary 1968 return self 1969 else: 1970 return transpose(self, axes=axes) 1971 1972 @derived_from(np.ndarray) 1973 def ravel(self): 1974 from .routines import ravel 1975 1976 return ravel(self) 1977 1978 flatten = ravel 1979 1980 @derived_from(np.ndarray) 1981 def choose(self, choices): 1982 from .routines import choose 1983 1984 return choose(self, choices) 1985 1986 @derived_from(np.ndarray) 1987 def reshape(self, *shape, merge_chunks=True): 1988 """ 1989 .. note:: 1990 1991 See :meth:`dask.array.reshape` for an explanation of 1992 the ``merge_chunks`` keyword. 1993 """ 1994 from .reshape import reshape 1995 1996 if len(shape) == 1 and not isinstance(shape[0], Number): 1997 shape = shape[0] 1998 return reshape(self, shape, merge_chunks=merge_chunks) 1999 2000 def topk(self, k, axis=-1, split_every=None): 2001 """The top k elements of an array. 2002 2003 See :func:`dask.array.topk` for docstring. 2004 2005 """ 2006 from .reductions import topk 2007 2008 return topk(self, k, axis=axis, split_every=split_every) 2009 2010 def argtopk(self, k, axis=-1, split_every=None): 2011 """The indices of the top k elements of an array. 2012 2013 See :func:`dask.array.argtopk` for docstring. 2014 2015 """ 2016 from .reductions import argtopk 2017 2018 return argtopk(self, k, axis=axis, split_every=split_every) 2019 2020 def astype(self, dtype, **kwargs): 2021 """Copy of the array, cast to a specified type. 2022 2023 Parameters 2024 ---------- 2025 dtype : str or dtype 2026 Typecode or data-type to which the array is cast. 2027 casting : {'no', 'equiv', 'safe', 'same_kind', 'unsafe'}, optional 2028 Controls what kind of data casting may occur. Defaults to 'unsafe' 2029 for backwards compatibility. 2030 2031 * 'no' means the data types should not be cast at all. 2032 * 'equiv' means only byte-order changes are allowed. 2033 * 'safe' means only casts which can preserve values are allowed. 2034 * 'same_kind' means only safe casts or casts within a kind, 2035 like float64 to float32, are allowed. 2036 * 'unsafe' means any data conversions may be done. 2037 copy : bool, optional 2038 By default, astype always returns a newly allocated array. If this 2039 is set to False and the `dtype` requirement is satisfied, the input 2040 array is returned instead of a copy. 2041 """ 2042 # Scalars don't take `casting` or `copy` kwargs - as such we only pass 2043 # them to `map_blocks` if specified by user (different than defaults). 2044 extra = set(kwargs) - {"casting", "copy"} 2045 if extra: 2046 raise TypeError( 2047 f"astype does not take the following keyword arguments: {list(extra)}" 2048 ) 2049 casting = kwargs.get("casting", "unsafe") 2050 dtype = np.dtype(dtype) 2051 if self.dtype == dtype: 2052 return self 2053 elif not np.can_cast(self.dtype, dtype, casting=casting): 2054 raise TypeError( 2055 f"Cannot cast array from {self.dtype!r} to {dtype!r} " 2056 f"according to the rule {casting!r}" 2057 ) 2058 return self.map_blocks(chunk.astype, dtype=dtype, astype_dtype=dtype, **kwargs) 2059 2060 def __abs__(self): 2061 return elemwise(operator.abs, self) 2062 2063 @check_if_handled_given_other 2064 def __add__(self, other): 2065 return elemwise(operator.add, self, other) 2066 2067 @check_if_handled_given_other 2068 def __radd__(self, other): 2069 return elemwise(operator.add, other, self) 2070 2071 @check_if_handled_given_other 2072 def __and__(self, other): 2073 return elemwise(operator.and_, self, other) 2074 2075 @check_if_handled_given_other 2076 def __rand__(self, other): 2077 return elemwise(operator.and_, other, self) 2078 2079 @check_if_handled_given_other 2080 def __div__(self, other): 2081 return elemwise(operator.div, self, other) 2082 2083 @check_if_handled_given_other 2084 def __rdiv__(self, other): 2085 return elemwise(operator.div, other, self) 2086 2087 @check_if_handled_given_other 2088 def __eq__(self, other): 2089 return elemwise(operator.eq, self, other) 2090 2091 @check_if_handled_given_other 2092 def __gt__(self, other): 2093 return elemwise(operator.gt, self, other) 2094 2095 @check_if_handled_given_other 2096 def __ge__(self, other): 2097 return elemwise(operator.ge, self, other) 2098 2099 def __invert__(self): 2100 return elemwise(operator.invert, self) 2101 2102 @check_if_handled_given_other 2103 def __lshift__(self, other): 2104 return elemwise(operator.lshift, self, other) 2105 2106 @check_if_handled_given_other 2107 def __rlshift__(self, other): 2108 return elemwise(operator.lshift, other, self) 2109 2110 @check_if_handled_given_other 2111 def __lt__(self, other): 2112 return elemwise(operator.lt, self, other) 2113 2114 @check_if_handled_given_other 2115 def __le__(self, other): 2116 return elemwise(operator.le, self, other) 2117 2118 @check_if_handled_given_other 2119 def __mod__(self, other): 2120 return elemwise(operator.mod, self, other) 2121 2122 @check_if_handled_given_other 2123 def __rmod__(self, other): 2124 return elemwise(operator.mod, other, self) 2125 2126 @check_if_handled_given_other 2127 def __mul__(self, other): 2128 return elemwise(operator.mul, self, other) 2129 2130 @check_if_handled_given_other 2131 def __rmul__(self, other): 2132 return elemwise(operator.mul, other, self) 2133 2134 @check_if_handled_given_other 2135 def __ne__(self, other): 2136 return elemwise(operator.ne, self, other) 2137 2138 def __neg__(self): 2139 return elemwise(operator.neg, self) 2140 2141 @check_if_handled_given_other 2142 def __or__(self, other): 2143 return elemwise(operator.or_, self, other) 2144 2145 def __pos__(self): 2146 return self 2147 2148 @check_if_handled_given_other 2149 def __ror__(self, other): 2150 return elemwise(operator.or_, other, self) 2151 2152 @check_if_handled_given_other 2153 def __pow__(self, other): 2154 return elemwise(operator.pow, self, other) 2155 2156 @check_if_handled_given_other 2157 def __rpow__(self, other): 2158 return elemwise(operator.pow, other, self) 2159 2160 @check_if_handled_given_other 2161 def __rshift__(self, other): 2162 return elemwise(operator.rshift, self, other) 2163 2164 @check_if_handled_given_other 2165 def __rrshift__(self, other): 2166 return elemwise(operator.rshift, other, self) 2167 2168 @check_if_handled_given_other 2169 def __sub__(self, other): 2170 return elemwise(operator.sub, self, other) 2171 2172 @check_if_handled_given_other 2173 def __rsub__(self, other): 2174 return elemwise(operator.sub, other, self) 2175 2176 @check_if_handled_given_other 2177 def __truediv__(self, other): 2178 return elemwise(operator.truediv, self, other) 2179 2180 @check_if_handled_given_other 2181 def __rtruediv__(self, other): 2182 return elemwise(operator.truediv, other, self) 2183 2184 @check_if_handled_given_other 2185 def __floordiv__(self, other): 2186 return elemwise(operator.floordiv, self, other) 2187 2188 @check_if_handled_given_other 2189 def __rfloordiv__(self, other): 2190 return elemwise(operator.floordiv, other, self) 2191 2192 @check_if_handled_given_other 2193 def __xor__(self, other): 2194 return elemwise(operator.xor, self, other) 2195 2196 @check_if_handled_given_other 2197 def __rxor__(self, other): 2198 return elemwise(operator.xor, other, self) 2199 2200 @check_if_handled_given_other 2201 def __matmul__(self, other): 2202 from .routines import matmul 2203 2204 return matmul(self, other) 2205 2206 @check_if_handled_given_other 2207 def __rmatmul__(self, other): 2208 from .routines import matmul 2209 2210 return matmul(other, self) 2211 2212 @check_if_handled_given_other 2213 def __divmod__(self, other): 2214 from .ufunc import divmod 2215 2216 return divmod(self, other) 2217 2218 @check_if_handled_given_other 2219 def __rdivmod__(self, other): 2220 from .ufunc import divmod 2221 2222 return divmod(other, self) 2223 2224 @derived_from(np.ndarray) 2225 def any(self, axis=None, keepdims=False, split_every=None, out=None): 2226 from .reductions import any 2227 2228 return any(self, axis=axis, keepdims=keepdims, split_every=split_every, out=out) 2229 2230 @derived_from(np.ndarray) 2231 def all(self, axis=None, keepdims=False, split_every=None, out=None): 2232 from .reductions import all 2233 2234 return all(self, axis=axis, keepdims=keepdims, split_every=split_every, out=out) 2235 2236 @derived_from(np.ndarray) 2237 def min(self, axis=None, keepdims=False, split_every=None, out=None): 2238 from .reductions import min 2239 2240 return min(self, axis=axis, keepdims=keepdims, split_every=split_every, out=out) 2241 2242 @derived_from(np.ndarray) 2243 def max(self, axis=None, keepdims=False, split_every=None, out=None): 2244 from .reductions import max 2245 2246 return max(self, axis=axis, keepdims=keepdims, split_every=split_every, out=out) 2247 2248 @derived_from(np.ndarray) 2249 def argmin(self, axis=None, split_every=None, out=None): 2250 from .reductions import argmin 2251 2252 return argmin(self, axis=axis, split_every=split_every, out=out) 2253 2254 @derived_from(np.ndarray) 2255 def argmax(self, axis=None, split_every=None, out=None): 2256 from .reductions import argmax 2257 2258 return argmax(self, axis=axis, split_every=split_every, out=out) 2259 2260 @derived_from(np.ndarray) 2261 def sum(self, axis=None, dtype=None, keepdims=False, split_every=None, out=None): 2262 from .reductions import sum 2263 2264 return sum( 2265 self, 2266 axis=axis, 2267 dtype=dtype, 2268 keepdims=keepdims, 2269 split_every=split_every, 2270 out=out, 2271 ) 2272 2273 @derived_from(np.ndarray) 2274 def trace(self, offset=0, axis1=0, axis2=1, dtype=None): 2275 from .reductions import trace 2276 2277 return trace(self, offset=offset, axis1=axis1, axis2=axis2, dtype=dtype) 2278 2279 @derived_from(np.ndarray) 2280 def prod(self, axis=None, dtype=None, keepdims=False, split_every=None, out=None): 2281 from .reductions import prod 2282 2283 return prod( 2284 self, 2285 axis=axis, 2286 dtype=dtype, 2287 keepdims=keepdims, 2288 split_every=split_every, 2289 out=out, 2290 ) 2291 2292 @derived_from(np.ndarray) 2293 def mean(self, axis=None, dtype=None, keepdims=False, split_every=None, out=None): 2294 from .reductions import mean 2295 2296 return mean( 2297 self, 2298 axis=axis, 2299 dtype=dtype, 2300 keepdims=keepdims, 2301 split_every=split_every, 2302 out=out, 2303 ) 2304 2305 @derived_from(np.ndarray) 2306 def std( 2307 self, axis=None, dtype=None, keepdims=False, ddof=0, split_every=None, out=None 2308 ): 2309 from .reductions import std 2310 2311 return std( 2312 self, 2313 axis=axis, 2314 dtype=dtype, 2315 keepdims=keepdims, 2316 ddof=ddof, 2317 split_every=split_every, 2318 out=out, 2319 ) 2320 2321 @derived_from(np.ndarray) 2322 def var( 2323 self, axis=None, dtype=None, keepdims=False, ddof=0, split_every=None, out=None 2324 ): 2325 from .reductions import var 2326 2327 return var( 2328 self, 2329 axis=axis, 2330 dtype=dtype, 2331 keepdims=keepdims, 2332 ddof=ddof, 2333 split_every=split_every, 2334 out=out, 2335 ) 2336 2337 def moment( 2338 self, 2339 order, 2340 axis=None, 2341 dtype=None, 2342 keepdims=False, 2343 ddof=0, 2344 split_every=None, 2345 out=None, 2346 ): 2347 """Calculate the nth centralized moment. 2348 2349 Parameters 2350 ---------- 2351 order : int 2352 Order of the moment that is returned, must be >= 2. 2353 axis : int, optional 2354 Axis along which the central moment is computed. The default is to 2355 compute the moment of the flattened array. 2356 dtype : data-type, optional 2357 Type to use in computing the moment. For arrays of integer type the 2358 default is float64; for arrays of float types it is the same as the 2359 array type. 2360 keepdims : bool, optional 2361 If this is set to True, the axes which are reduced are left in the 2362 result as dimensions with size one. With this option, the result 2363 will broadcast correctly against the original array. 2364 ddof : int, optional 2365 "Delta Degrees of Freedom": the divisor used in the calculation is 2366 N - ddof, where N represents the number of elements. By default 2367 ddof is zero. 2368 2369 Returns 2370 ------- 2371 moment : ndarray 2372 2373 References 2374 ---------- 2375 .. [1] Pebay, Philippe (2008), "Formulas for Robust, One-Pass Parallel 2376 Computation of Covariances and Arbitrary-Order Statistical Moments", 2377 Technical Report SAND2008-6212, Sandia National Laboratories. 2378 2379 """ 2380 2381 from .reductions import moment 2382 2383 return moment( 2384 self, 2385 order, 2386 axis=axis, 2387 dtype=dtype, 2388 keepdims=keepdims, 2389 ddof=ddof, 2390 split_every=split_every, 2391 out=out, 2392 ) 2393 2394 @wraps(map_blocks) 2395 def map_blocks(self, func, *args, **kwargs): 2396 return map_blocks(func, self, *args, **kwargs) 2397 2398 def map_overlap(self, func, depth, boundary=None, trim=True, **kwargs): 2399 """Map a function over blocks of the array with some overlap 2400 2401 We share neighboring zones between blocks of the array, then map a 2402 function, then trim away the neighboring strips. 2403 2404 Note that this function will attempt to automatically determine the output 2405 array type before computing it, please refer to the ``meta`` keyword argument 2406 in :func:`map_blocks <dask.array.core.map_blocks>` if you expect that the function will not succeed when 2407 operating on 0-d arrays. 2408 2409 Parameters 2410 ---------- 2411 func: function 2412 The function to apply to each extended block 2413 depth: int, tuple, or dict 2414 The number of elements that each block should share with its neighbors 2415 If a tuple or dict then this can be different per axis 2416 boundary: str, tuple, dict 2417 How to handle the boundaries. 2418 Values include 'reflect', 'periodic', 'nearest', 'none', 2419 or any constant value like 0 or np.nan 2420 trim: bool 2421 Whether or not to trim ``depth`` elements from each block after 2422 calling the map function. 2423 Set this to False if your mapping function already does this for you 2424 **kwargs: 2425 Other keyword arguments valid in :func:`map_blocks <dask.array.core.map_blocks>`. 2426 2427 Examples 2428 -------- 2429 >>> import dask.array as da 2430 >>> x = np.array([1, 1, 2, 3, 3, 3, 2, 1, 1]) 2431 >>> x = da.from_array(x, chunks=5) 2432 >>> def derivative(x): 2433 ... return x - np.roll(x, 1) 2434 2435 >>> y = x.map_overlap(derivative, depth=1, boundary=0) 2436 >>> y.compute() 2437 array([ 1, 0, 1, 1, 0, 0, -1, -1, 0]) 2438 2439 >>> import dask.array as da 2440 >>> x = np.arange(16).reshape((4, 4)) 2441 >>> d = da.from_array(x, chunks=(2, 2)) 2442 >>> d.map_overlap(lambda x: x + x.size, depth=1).compute() 2443 array([[16, 17, 18, 19], 2444 [20, 21, 22, 23], 2445 [24, 25, 26, 27], 2446 [28, 29, 30, 31]]) 2447 2448 >>> func = lambda x: x + x.size 2449 >>> depth = {0: 1, 1: 1} 2450 >>> boundary = {0: 'reflect', 1: 'none'} 2451 >>> d.map_overlap(func, depth, boundary).compute() # doctest: +NORMALIZE_WHITESPACE 2452 array([[12, 13, 14, 15], 2453 [16, 17, 18, 19], 2454 [20, 21, 22, 23], 2455 [24, 25, 26, 27]]) 2456 2457 >>> x = np.arange(16).reshape((4, 4)) 2458 >>> d = da.from_array(x, chunks=(2, 2)) 2459 >>> y = d.map_overlap(lambda x: x + x[2], depth=1, meta=np.array(())) 2460 >>> y 2461 dask.array<_trim, shape=(4, 4), dtype=float64, chunksize=(2, 2), chunktype=numpy.ndarray> 2462 >>> y.compute() 2463 array([[ 4, 6, 8, 10], 2464 [ 8, 10, 12, 14], 2465 [20, 22, 24, 26], 2466 [24, 26, 28, 30]]) 2467 2468 >>> import cupy # doctest: +SKIP 2469 >>> x = cupy.arange(16).reshape((5, 4)) # doctest: +SKIP 2470 >>> d = da.from_array(x, chunks=(2, 2)) # doctest: +SKIP 2471 >>> y = d.map_overlap(lambda x: x + x[2], depth=1, meta=cupy.array(())) # doctest: +SKIP 2472 >>> y # doctest: +SKIP 2473 dask.array<_trim, shape=(4, 4), dtype=float64, chunksize=(2, 2), chunktype=cupy.ndarray> 2474 >>> y.compute() # doctest: +SKIP 2475 array([[ 4, 6, 8, 10], 2476 [ 8, 10, 12, 14], 2477 [20, 22, 24, 26], 2478 [24, 26, 28, 30]]) 2479 """ 2480 from .overlap import map_overlap 2481 2482 return map_overlap( 2483 func, self, depth=depth, boundary=boundary, trim=trim, **kwargs 2484 ) 2485 2486 @derived_from(np.ndarray) 2487 def cumsum(self, axis, dtype=None, out=None, *, method="sequential"): 2488 """Dask added an additional keyword-only argument ``method``. 2489 2490 method : {'sequential', 'blelloch'}, optional 2491 Choose which method to use to perform the cumsum. Default is 'sequential'. 2492 2493 * 'sequential' performs the cumsum of each prior block before the current block. 2494 * 'blelloch' is a work-efficient parallel cumsum. It exposes parallelism by 2495 first taking the sum of each block and combines the sums via a binary tree. 2496 This method may be faster or more memory efficient depending on workload, 2497 scheduler, and hardware. More benchmarking is necessary. 2498 """ 2499 from .reductions import cumsum 2500 2501 return cumsum(self, axis, dtype, out=out, method=method) 2502 2503 @derived_from(np.ndarray) 2504 def cumprod(self, axis, dtype=None, out=None, *, method="sequential"): 2505 """Dask added an additional keyword-only argument ``method``. 2506 2507 method : {'sequential', 'blelloch'}, optional 2508 Choose which method to use to perform the cumprod. Default is 'sequential'. 2509 2510 * 'sequential' performs the cumprod of each prior block before the current block. 2511 * 'blelloch' is a work-efficient parallel cumprod. It exposes parallelism by first 2512 taking the product of each block and combines the products via a binary tree. 2513 This method may be faster or more memory efficient depending on workload, 2514 scheduler, and hardware. More benchmarking is necessary. 2515 """ 2516 from .reductions import cumprod 2517 2518 return cumprod(self, axis, dtype, out=out, method=method) 2519 2520 @derived_from(np.ndarray) 2521 def squeeze(self, axis=None): 2522 from .routines import squeeze 2523 2524 return squeeze(self, axis) 2525 2526 def rechunk( 2527 self, chunks="auto", threshold=None, block_size_limit=None, balance=False 2528 ): 2529 """See da.rechunk for docstring""" 2530 from .rechunk import rechunk # avoid circular import 2531 2532 return rechunk(self, chunks, threshold, block_size_limit, balance) 2533 2534 @property 2535 def real(self): 2536 from .ufunc import real 2537 2538 return real(self) 2539 2540 @property 2541 def imag(self): 2542 from .ufunc import imag 2543 2544 return imag(self) 2545 2546 def conj(self): 2547 from .ufunc import conj 2548 2549 return conj(self) 2550 2551 @derived_from(np.ndarray) 2552 def clip(self, min=None, max=None): 2553 from .ufunc import clip 2554 2555 return clip(self, min, max) 2556 2557 def view(self, dtype=None, order="C"): 2558 """Get a view of the array as a new data type 2559 2560 Parameters 2561 ---------- 2562 dtype: 2563 The dtype by which to view the array. 2564 The default, None, results in the view having the same data-type 2565 as the original array. 2566 order: string 2567 'C' or 'F' (Fortran) ordering 2568 2569 This reinterprets the bytes of the array under a new dtype. If that 2570 dtype does not have the same size as the original array then the shape 2571 will change. 2572 2573 Beware that both numpy and dask.array can behave oddly when taking 2574 shape-changing views of arrays under Fortran ordering. Under some 2575 versions of NumPy this function will fail when taking shape-changing 2576 views of Fortran ordered arrays if the first dimension has chunks of 2577 size one. 2578 """ 2579 if dtype is None: 2580 dtype = self.dtype 2581 else: 2582 dtype = np.dtype(dtype) 2583 mult = self.dtype.itemsize / dtype.itemsize 2584 2585 if order == "C": 2586 chunks = self.chunks[:-1] + ( 2587 tuple(ensure_int(c * mult) for c in self.chunks[-1]), 2588 ) 2589 elif order == "F": 2590 chunks = ( 2591 tuple(ensure_int(c * mult) for c in self.chunks[0]), 2592 ) + self.chunks[1:] 2593 else: 2594 raise ValueError("Order must be one of 'C' or 'F'") 2595 2596 return self.map_blocks( 2597 chunk.view, dtype, order=order, dtype=dtype, chunks=chunks 2598 ) 2599 2600 @derived_from(np.ndarray) 2601 def swapaxes(self, axis1, axis2): 2602 from .routines import swapaxes 2603 2604 return swapaxes(self, axis1, axis2) 2605 2606 @derived_from(np.ndarray) 2607 def round(self, decimals=0): 2608 from .routines import round 2609 2610 return round(self, decimals=decimals) 2611 2612 def copy(self): 2613 """ 2614 Copy array. This is a no-op for dask.arrays, which are immutable 2615 """ 2616 if self.npartitions == 1: 2617 return self.map_blocks(M.copy) 2618 else: 2619 return Array(self.dask, self.name, self.chunks, meta=self) 2620 2621 def __deepcopy__(self, memo): 2622 c = self.copy() 2623 memo[id(self)] = c 2624 return c 2625 2626 def to_delayed(self, optimize_graph=True): 2627 """Convert into an array of :class:`dask.delayed.Delayed` objects, one per chunk. 2628 2629 Parameters 2630 ---------- 2631 optimize_graph : bool, optional 2632 If True [default], the graph is optimized before converting into 2633 :class:`dask.delayed.Delayed` objects. 2634 2635 See Also 2636 -------- 2637 dask.array.from_delayed 2638 """ 2639 keys = self.__dask_keys__() 2640 graph = self.__dask_graph__() 2641 if optimize_graph: 2642 graph = self.__dask_optimize__(graph, keys) # TODO, don't collape graph 2643 name = "delayed-" + self.name 2644 graph = HighLevelGraph.from_collections(name, graph, dependencies=()) 2645 L = ndeepmap(self.ndim, lambda k: Delayed(k, graph), keys) 2646 return np.array(L, dtype=object) 2647 2648 @derived_from(np.ndarray) 2649 def repeat(self, repeats, axis=None): 2650 from .creation import repeat 2651 2652 return repeat(self, repeats, axis=axis) 2653 2654 @derived_from(np.ndarray) 2655 def nonzero(self): 2656 from .routines import nonzero 2657 2658 return nonzero(self) 2659 2660 def to_zarr(self, *args, **kwargs): 2661 """Save array to the zarr storage format 2662 2663 See https://zarr.readthedocs.io for details about the format. 2664 2665 See function :func:`dask.array.to_zarr` for parameters. 2666 """ 2667 return to_zarr(self, *args, **kwargs) 2668 2669 def to_tiledb(self, uri, *args, **kwargs): 2670 """Save array to the TileDB storage manager 2671 2672 See function :func:`dask.array.to_tiledb` for argument documentation. 2673 2674 See https://docs.tiledb.io for details about the format and engine. 2675 """ 2676 from .tiledb_io import to_tiledb 2677 2678 return to_tiledb(self, uri, *args, **kwargs) 2679 2680 2681def ensure_int(f): 2682 i = int(f) 2683 if i != f: 2684 raise ValueError("Could not coerce %f to integer" % f) 2685 return i 2686 2687 2688def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks=None): 2689 """Normalize chunks to tuple of tuples 2690 2691 This takes in a variety of input types and information and produces a full 2692 tuple-of-tuples result for chunks, suitable to be passed to Array or 2693 rechunk or any other operation that creates a Dask array. 2694 2695 Parameters 2696 ---------- 2697 chunks: tuple, int, dict, or string 2698 The chunks to be normalized. See examples below for more details 2699 shape: Tuple[int] 2700 The shape of the array 2701 limit: int (optional) 2702 The maximum block size to target in bytes, 2703 if freedom is given to choose 2704 dtype: np.dtype 2705 previous_chunks: Tuple[Tuple[int]] optional 2706 Chunks from a previous array that we should use for inspiration when 2707 rechunking auto dimensions. If not provided but auto-chunking exists 2708 then auto-dimensions will prefer square-like chunk shapes. 2709 2710 Examples 2711 -------- 2712 Specify uniform chunk sizes 2713 2714 >>> from dask.array.core import normalize_chunks 2715 >>> normalize_chunks((2, 2), shape=(5, 6)) 2716 ((2, 2, 1), (2, 2, 2)) 2717 2718 Also passes through fully explicit tuple-of-tuples 2719 2720 >>> normalize_chunks(((2, 2, 1), (2, 2, 2)), shape=(5, 6)) 2721 ((2, 2, 1), (2, 2, 2)) 2722 2723 Cleans up lists to tuples 2724 2725 >>> normalize_chunks([[2, 2], [3, 3]]) 2726 ((2, 2), (3, 3)) 2727 2728 Expands integer inputs 10 -> (10, 10) 2729 2730 >>> normalize_chunks(10, shape=(30, 5)) 2731 ((10, 10, 10), (5,)) 2732 2733 Expands dict inputs 2734 2735 >>> normalize_chunks({0: 2, 1: 3}, shape=(6, 6)) 2736 ((2, 2, 2), (3, 3)) 2737 2738 The values -1 and None get mapped to full size 2739 2740 >>> normalize_chunks((5, -1), shape=(10, 10)) 2741 ((5, 5), (10,)) 2742 2743 Use the value "auto" to automatically determine chunk sizes along certain 2744 dimensions. This uses the ``limit=`` and ``dtype=`` keywords to 2745 determine how large to make the chunks. The term "auto" can be used 2746 anywhere an integer can be used. See array chunking documentation for more 2747 information. 2748 2749 >>> normalize_chunks(("auto",), shape=(20,), limit=5, dtype='uint8') 2750 ((5, 5, 5, 5),) 2751 2752 You can also use byte sizes (see :func:`dask.utils.parse_bytes`) in place of 2753 "auto" to ask for a particular size 2754 2755 >>> normalize_chunks("1kiB", shape=(2000,), dtype='float32') 2756 ((250, 250, 250, 250, 250, 250, 250, 250),) 2757 2758 Respects null dimensions 2759 2760 >>> normalize_chunks((), shape=(0, 0)) 2761 ((0,), (0,)) 2762 """ 2763 if dtype and not isinstance(dtype, np.dtype): 2764 dtype = np.dtype(dtype) 2765 if chunks is None: 2766 raise ValueError(CHUNKS_NONE_ERROR_MESSAGE) 2767 if isinstance(chunks, list): 2768 chunks = tuple(chunks) 2769 if isinstance(chunks, (Number, str)): 2770 chunks = (chunks,) * len(shape) 2771 if isinstance(chunks, dict): 2772 chunks = tuple(chunks.get(i, None) for i in range(len(shape))) 2773 if isinstance(chunks, np.ndarray): 2774 chunks = chunks.tolist() 2775 if not chunks and shape and all(s == 0 for s in shape): 2776 chunks = ((0,),) * len(shape) 2777 2778 if ( 2779 shape 2780 and len(shape) == 1 2781 and len(chunks) > 1 2782 and all(isinstance(c, (Number, str)) for c in chunks) 2783 ): 2784 chunks = (chunks,) 2785 2786 if shape and len(chunks) != len(shape): 2787 raise ValueError( 2788 "Chunks and shape must be of the same length/dimension. " 2789 "Got chunks=%s, shape=%s" % (chunks, shape) 2790 ) 2791 if -1 in chunks or None in chunks: 2792 chunks = tuple(s if c == -1 or c is None else c for c, s in zip(chunks, shape)) 2793 2794 # If specifying chunk size in bytes, use that value to set the limit. 2795 # Verify there is only one consistent value of limit or chunk-bytes used. 2796 for c in chunks: 2797 if isinstance(c, str) and c != "auto": 2798 parsed = parse_bytes(c) 2799 if limit is None: 2800 limit = parsed 2801 elif parsed != limit: 2802 raise ValueError( 2803 "Only one consistent value of limit or chunk is allowed." 2804 "Used %s != %s" % (parsed, limit) 2805 ) 2806 # Substitute byte limits with 'auto' now that limit is set. 2807 chunks = tuple("auto" if isinstance(c, str) and c != "auto" else c for c in chunks) 2808 2809 if any(c == "auto" for c in chunks): 2810 chunks = auto_chunks(chunks, shape, limit, dtype, previous_chunks) 2811 2812 if shape is not None: 2813 chunks = tuple(c if c not in {None, -1} else s for c, s in zip(chunks, shape)) 2814 2815 if chunks and shape is not None: 2816 chunks = sum( 2817 ( 2818 blockdims_from_blockshape((s,), (c,)) 2819 if not isinstance(c, (tuple, list)) 2820 else (c,) 2821 for s, c in zip(shape, chunks) 2822 ), 2823 (), 2824 ) 2825 for c in chunks: 2826 if not c: 2827 raise ValueError( 2828 "Empty tuples are not allowed in chunks. Express " 2829 "zero length dimensions with 0(s) in chunks" 2830 ) 2831 2832 if shape is not None: 2833 if len(chunks) != len(shape): 2834 raise ValueError( 2835 "Input array has %d dimensions but the supplied " 2836 "chunks has only %d dimensions" % (len(shape), len(chunks)) 2837 ) 2838 if not all( 2839 c == s or (math.isnan(c) or math.isnan(s)) 2840 for c, s in zip(map(sum, chunks), shape) 2841 ): 2842 raise ValueError( 2843 "Chunks do not add up to shape. " 2844 "Got chunks=%s, shape=%s" % (chunks, shape) 2845 ) 2846 2847 return tuple(tuple(int(x) if not math.isnan(x) else x for x in c) for c in chunks) 2848 2849 2850def _compute_multiplier(limit: int, dtype, largest_block: int, result): 2851 """ 2852 Utility function for auto_chunk, to fin how much larger or smaller the ideal 2853 chunk size is relative to what we have now. 2854 """ 2855 return ( 2856 limit 2857 / dtype.itemsize 2858 / largest_block 2859 / np.prod(list(r if r != 0 else 1 for r in result.values())) 2860 ) 2861 2862 2863def auto_chunks(chunks, shape, limit, dtype, previous_chunks=None): 2864 """Determine automatic chunks 2865 2866 This takes in a chunks value that contains ``"auto"`` values in certain 2867 dimensions and replaces those values with concrete dimension sizes that try 2868 to get chunks to be of a certain size in bytes, provided by the ``limit=`` 2869 keyword. If multiple dimensions are marked as ``"auto"`` then they will 2870 all respond to meet the desired byte limit, trying to respect the aspect 2871 ratio of their dimensions in ``previous_chunks=``, if given. 2872 2873 Parameters 2874 ---------- 2875 chunks: Tuple 2876 A tuple of either dimensions or tuples of explicit chunk dimensions 2877 Some entries should be "auto" 2878 shape: Tuple[int] 2879 limit: int, str 2880 The maximum allowable size of a chunk in bytes 2881 previous_chunks: Tuple[Tuple[int]] 2882 2883 See also 2884 -------- 2885 normalize_chunks: for full docstring and parameters 2886 """ 2887 if previous_chunks is not None: 2888 previous_chunks = tuple( 2889 c if isinstance(c, tuple) else (c,) for c in previous_chunks 2890 ) 2891 chunks = list(chunks) 2892 2893 autos = {i for i, c in enumerate(chunks) if c == "auto"} 2894 if not autos: 2895 return tuple(chunks) 2896 2897 if limit is None: 2898 limit = config.get("array.chunk-size") 2899 if isinstance(limit, str): 2900 limit = parse_bytes(limit) 2901 2902 if dtype is None: 2903 raise TypeError("dtype must be known for auto-chunking") 2904 2905 if dtype.hasobject: 2906 raise NotImplementedError( 2907 "Can not use auto rechunking with object dtype. " 2908 "We are unable to estimate the size in bytes of object data" 2909 ) 2910 2911 for x in tuple(chunks) + tuple(shape): 2912 if ( 2913 isinstance(x, Number) 2914 and np.isnan(x) 2915 or isinstance(x, tuple) 2916 and np.isnan(x).any() 2917 ): 2918 raise ValueError( 2919 "Can not perform automatic rechunking with unknown " 2920 "(nan) chunk sizes.%s" % unknown_chunk_message 2921 ) 2922 2923 limit = max(1, limit) 2924 2925 largest_block = np.prod( 2926 [cs if isinstance(cs, Number) else max(cs) for cs in chunks if cs != "auto"] 2927 ) 2928 2929 if previous_chunks: 2930 # Base ideal ratio on the median chunk size of the previous chunks 2931 result = {a: np.median(previous_chunks[a]) for a in autos} 2932 2933 ideal_shape = [] 2934 for i, s in enumerate(shape): 2935 chunk_frequencies = frequencies(previous_chunks[i]) 2936 mode, count = max(chunk_frequencies.items(), key=lambda kv: kv[1]) 2937 if mode > 1 and count >= len(previous_chunks[i]) / 2: 2938 ideal_shape.append(mode) 2939 else: 2940 ideal_shape.append(s) 2941 2942 # How much larger or smaller the ideal chunk size is relative to what we have now 2943 multiplier = _compute_multiplier(limit, dtype, largest_block, result) 2944 2945 last_multiplier = 0 2946 last_autos = set() 2947 while ( 2948 multiplier != last_multiplier or autos != last_autos 2949 ): # while things change 2950 last_multiplier = multiplier # record previous values 2951 last_autos = set(autos) # record previous values 2952 2953 # Expand or contract each of the dimensions appropriately 2954 for a in sorted(autos): 2955 if ideal_shape[a] == 0: 2956 result[a] = 0 2957 continue 2958 proposed = result[a] * multiplier ** (1 / len(autos)) 2959 if proposed > shape[a]: # we've hit the shape boundary 2960 autos.remove(a) 2961 largest_block *= shape[a] 2962 chunks[a] = shape[a] 2963 del result[a] 2964 else: 2965 result[a] = round_to(proposed, ideal_shape[a]) 2966 2967 # recompute how much multiplier we have left, repeat 2968 multiplier = _compute_multiplier(limit, dtype, largest_block, result) 2969 2970 for k, v in result.items(): 2971 chunks[k] = v 2972 return tuple(chunks) 2973 2974 else: 2975 size = (limit / dtype.itemsize / largest_block) ** (1 / len(autos)) 2976 small = [i for i in autos if shape[i] < size] 2977 if small: 2978 for i in small: 2979 chunks[i] = (shape[i],) 2980 return auto_chunks(chunks, shape, limit, dtype) 2981 2982 for i in autos: 2983 chunks[i] = round_to(size, shape[i]) 2984 2985 return tuple(chunks) 2986 2987 2988def round_to(c, s): 2989 """Return a chunk dimension that is close to an even multiple or factor 2990 2991 We want values for c that are nicely aligned with s. 2992 2993 If c is smaller than s then we want the largest factor of s that is less than the 2994 desired chunk size, but not less than half, which is too much. If no such 2995 factor exists then we just go with the original chunk size and accept an 2996 uneven chunk at the end. 2997 2998 If c is larger than s then we want the largest multiple of s that is still 2999 smaller than c. 3000 """ 3001 if c <= s: 3002 try: 3003 return max(f for f in factors(s) if c / 2 <= f <= c) 3004 except ValueError: # no matching factors within factor of two 3005 return max(1, int(c)) 3006 else: 3007 return c // s * s 3008 3009 3010def _get_chunk_shape(a): 3011 s = np.asarray(a.shape, dtype=int) 3012 return s[len(s) * (None,) + (slice(None),)] 3013 3014 3015def from_array( 3016 x, 3017 chunks="auto", 3018 name=None, 3019 lock=False, 3020 asarray=None, 3021 fancy=True, 3022 getitem=None, 3023 meta=None, 3024 inline_array=False, 3025): 3026 """Create dask array from something that looks like an array. 3027 3028 Input must have a ``.shape``, ``.ndim``, ``.dtype`` and support numpy-style slicing. 3029 3030 Parameters 3031 ---------- 3032 x : array_like 3033 chunks : int, tuple 3034 How to chunk the array. Must be one of the following forms: 3035 3036 - A blocksize like 1000. 3037 - A blockshape like (1000, 1000). 3038 - Explicit sizes of all blocks along all dimensions like 3039 ((1000, 1000, 500), (400, 400)). 3040 - A size in bytes, like "100 MiB" which will choose a uniform 3041 block-like shape 3042 - The word "auto" which acts like the above, but uses a configuration 3043 value ``array.chunk-size`` for the chunk size 3044 3045 -1 or None as a blocksize indicate the size of the corresponding 3046 dimension. 3047 name : str or bool, optional 3048 The key name to use for the array. Defaults to a hash of ``x``. 3049 3050 Hashing is useful if the same value of ``x`` is used to create multiple 3051 arrays, as Dask can then recognise that they're the same and 3052 avoid duplicate computations. However, it can also be slow, and if the 3053 array is not contiguous it is copied for hashing. If the array uses 3054 stride tricks (such as :func:`numpy.broadcast_to` or 3055 :func:`skimage.util.view_as_windows`) to have a larger logical 3056 than physical size, this copy can cause excessive memory usage. 3057 3058 If you don't need the deduplication provided by hashing, use 3059 ``name=False`` to generate a random name instead of hashing, which 3060 avoids the pitfalls described above. Using ``name=True`` is 3061 equivalent to the default. 3062 3063 By default, hashing uses python's standard sha1. This behaviour can be 3064 changed by installing cityhash, xxhash or murmurhash. If installed, 3065 a large-factor speedup can be obtained in the tokenisation step. 3066 3067 .. note:: 3068 3069 Because this ``name`` is used as the key in task graphs, you should 3070 ensure that it uniquely identifies the data contained within. If 3071 you'd like to provide a descriptive name that is still unique, combine 3072 the descriptive name with :func:`dask.base.tokenize` of the 3073 ``array_like``. See :ref:`graphs` for more. 3074 3075 lock : bool or Lock, optional 3076 If ``x`` doesn't support concurrent reads then provide a lock here, or 3077 pass in True to have dask.array create one for you. 3078 asarray : bool, optional 3079 If True then call np.asarray on chunks to convert them to numpy arrays. 3080 If False then chunks are passed through unchanged. 3081 If None (default) then we use True if the ``__array_function__`` method 3082 is undefined. 3083 fancy : bool, optional 3084 If ``x`` doesn't support fancy indexing (e.g. indexing with lists or 3085 arrays) then set to False. Default is True. 3086 meta : Array-like, optional 3087 The metadata for the resulting dask array. This is the kind of array 3088 that will result from slicing the input array. 3089 Defaults to the input array. 3090 inline_array : bool, default False 3091 How to include the array in the task graph. By default 3092 (``inline_array=False``) the array is included in a task by itself, 3093 and each chunk refers to that task by its key. 3094 3095 .. code-block:: python 3096 3097 >>> x = h5py.File("data.h5")["/x"] # doctest: +SKIP 3098 >>> a = da.from_array(x, chunks=500) # doctest: +SKIP 3099 >>> dict(a.dask) # doctest: +SKIP 3100 { 3101 'array-original-<name>': <HDF5 dataset ...>, 3102 ('array-<name>', 0): (getitem, "array-original-<name>", ...), 3103 ('array-<name>', 1): (getitem, "array-original-<name>", ...) 3104 } 3105 3106 With ``inline_array=True``, Dask will instead inline the array directly 3107 in the values of the task graph. 3108 3109 .. code-block:: python 3110 3111 >>> a = da.from_array(x, chunks=500, inline_array=True) # doctest: +SKIP 3112 >>> dict(a.dask) # doctest: +SKIP 3113 { 3114 ('array-<name>', 0): (getitem, <HDF5 dataset ...>, ...), 3115 ('array-<name>', 1): (getitem, <HDF5 dataset ...>, ...) 3116 } 3117 3118 Note that there's no key in the task graph with just the array `x` 3119 anymore. Instead it's placed directly in the values. 3120 3121 The right choice for ``inline_array`` depends on several factors, 3122 including the size of ``x``, how expensive it is to create, which 3123 scheduler you're using, and the pattern of downstream computations. 3124 As a heuristic, ``inline_array=True`` may be the right choice when 3125 the array ``x`` is cheap to serialize and deserialize (since it's 3126 included in the graph many times) and if you're experiencing ordering 3127 issues (see :ref:`order` for more). 3128 3129 This has no effect when ``x`` is a NumPy array. 3130 3131 Examples 3132 -------- 3133 3134 >>> x = h5py.File('...')['/data/path'] # doctest: +SKIP 3135 >>> a = da.from_array(x, chunks=(1000, 1000)) # doctest: +SKIP 3136 3137 If your underlying datastore does not support concurrent reads then include 3138 the ``lock=True`` keyword argument or ``lock=mylock`` if you want multiple 3139 arrays to coordinate around the same lock. 3140 3141 >>> a = da.from_array(x, chunks=(1000, 1000), lock=True) # doctest: +SKIP 3142 3143 If your underlying datastore has a ``.chunks`` attribute (as h5py and zarr 3144 datasets do) then a multiple of that chunk shape will be used if you 3145 do not provide a chunk shape. 3146 3147 >>> a = da.from_array(x, chunks='auto') # doctest: +SKIP 3148 >>> a = da.from_array(x, chunks='100 MiB') # doctest: +SKIP 3149 >>> a = da.from_array(x) # doctest: +SKIP 3150 3151 If providing a name, ensure that it is unique 3152 3153 >>> import dask.base 3154 >>> token = dask.base.tokenize(x) # doctest: +SKIP 3155 >>> a = da.from_array('myarray-' + token) # doctest: +SKIP 3156 3157 NumPy ndarrays are eagerly sliced and then embedded in the graph. 3158 3159 >>> import dask.array 3160 >>> a = dask.array.from_array(np.array([[1, 2], [3, 4]]), chunks=(1,1)) 3161 >>> a.dask[a.name, 0, 0][0] 3162 array([1]) 3163 3164 Chunks with exactly-specified, different sizes can be created. 3165 3166 >>> import numpy as np 3167 >>> import dask.array as da 3168 >>> x = np.random.random((100, 6)) 3169 >>> a = da.from_array(x, chunks=((67, 33), (6,))) 3170 """ 3171 if isinstance(x, Array): 3172 raise ValueError( 3173 "Array is already a dask array. Use 'asarray' or " "'rechunk' instead." 3174 ) 3175 elif is_dask_collection(x): 3176 warnings.warn( 3177 "Passing an object to dask.array.from_array which is already a " 3178 "Dask collection. This can lead to unexpected behavior." 3179 ) 3180 3181 if isinstance(x, (list, tuple, memoryview) + np.ScalarType): 3182 x = np.array(x) 3183 3184 if asarray is None: 3185 asarray = not hasattr(x, "__array_function__") 3186 3187 previous_chunks = getattr(x, "chunks", None) 3188 3189 chunks = normalize_chunks( 3190 chunks, x.shape, dtype=x.dtype, previous_chunks=previous_chunks 3191 ) 3192 3193 if name in (None, True): 3194 token = tokenize(x, chunks) 3195 original_name = "array-original-" + token 3196 name = name or "array-" + token 3197 elif name is False: 3198 original_name = name = "array-" + str(uuid.uuid1()) 3199 else: 3200 original_name = name 3201 3202 if lock is True: 3203 lock = SerializableLock() 3204 3205 is_ndarray = type(x) is np.ndarray 3206 is_single_block = all(len(c) == 1 for c in chunks) 3207 # Always use the getter for h5py etc. Not using isinstance(x, np.ndarray) 3208 # because np.matrix is a subclass of np.ndarray. 3209 if is_ndarray and not is_single_block and not lock: 3210 # eagerly slice numpy arrays to prevent memory blowup 3211 # GH5367, GH5601 3212 slices = slices_from_chunks(chunks) 3213 keys = product([name], *(range(len(bds)) for bds in chunks)) 3214 values = [x[slc] for slc in slices] 3215 dsk = dict(zip(keys, values)) 3216 3217 elif is_ndarray and is_single_block: 3218 # No slicing needed 3219 dsk = {(name,) + (0,) * x.ndim: x} 3220 else: 3221 if getitem is None: 3222 if fancy: 3223 getitem = getter 3224 else: 3225 getitem = getter_nofancy 3226 3227 if inline_array: 3228 get_from = x 3229 else: 3230 get_from = original_name 3231 3232 dsk = getem( 3233 get_from, 3234 chunks, 3235 getitem=getitem, 3236 shape=x.shape, 3237 out_name=name, 3238 lock=lock, 3239 asarray=asarray, 3240 dtype=x.dtype, 3241 ) 3242 if not inline_array: 3243 dsk[original_name] = x 3244 3245 # Workaround for TileDB, its indexing is 1-based, 3246 # and doesn't seems to support 0-length slicing 3247 if x.__class__.__module__.split(".")[0] == "tiledb" and hasattr(x, "_ctx_"): 3248 return Array(dsk, name, chunks, dtype=x.dtype) 3249 3250 if meta is None: 3251 meta = x 3252 3253 return Array(dsk, name, chunks, meta=meta, dtype=getattr(x, "dtype", None)) 3254 3255 3256def from_zarr( 3257 url, 3258 component=None, 3259 storage_options=None, 3260 chunks=None, 3261 name=None, 3262 inline_array=False, 3263 **kwargs, 3264): 3265 """Load array from the zarr storage format 3266 3267 See https://zarr.readthedocs.io for details about the format. 3268 3269 Parameters 3270 ---------- 3271 url: Zarr Array or str or MutableMapping 3272 Location of the data. A URL can include a protocol specifier like s3:// 3273 for remote data. Can also be any MutableMapping instance, which should 3274 be serializable if used in multiple processes. 3275 component: str or None 3276 If the location is a zarr group rather than an array, this is the 3277 subcomponent that should be loaded, something like ``'foo/bar'``. 3278 storage_options: dict 3279 Any additional parameters for the storage backend (ignored for local 3280 paths) 3281 chunks: tuple of ints or tuples of ints 3282 Passed to :func:`dask.array.from_array`, allows setting the chunks on 3283 initialisation, if the chunking scheme in the on-disc dataset is not 3284 optimal for the calculations to follow. 3285 name : str, optional 3286 An optional keyname for the array. Defaults to hashing the input 3287 kwargs: 3288 Passed to :class:`zarr.core.Array`. 3289 inline_array : bool, default False 3290 Whether to inline the zarr Array in the values of the task graph. 3291 See :meth:`dask.array.from_array` for an explanation. 3292 3293 See Also 3294 -------- 3295 from_array 3296 """ 3297 import zarr 3298 3299 storage_options = storage_options or {} 3300 if isinstance(url, zarr.Array): 3301 z = url 3302 elif isinstance(url, (str, os.PathLike)): 3303 if isinstance(url, os.PathLike): 3304 url = os.fspath(url) 3305 mapper = get_mapper(url, **storage_options) 3306 z = zarr.Array(mapper, read_only=True, path=component, **kwargs) 3307 else: 3308 mapper = url 3309 z = zarr.Array(mapper, read_only=True, path=component, **kwargs) 3310 chunks = chunks if chunks is not None else z.chunks 3311 if name is None: 3312 name = "from-zarr-" + tokenize(z, component, storage_options, chunks, **kwargs) 3313 return from_array(z, chunks, name=name, inline_array=inline_array) 3314 3315 3316def to_zarr( 3317 arr, 3318 url, 3319 component=None, 3320 storage_options=None, 3321 overwrite=False, 3322 compute=True, 3323 return_stored=False, 3324 **kwargs, 3325): 3326 """Save array to the zarr storage format 3327 3328 See https://zarr.readthedocs.io for details about the format. 3329 3330 Parameters 3331 ---------- 3332 arr: dask.array 3333 Data to store 3334 url: Zarr Array or str or MutableMapping 3335 Location of the data. A URL can include a protocol specifier like s3:// 3336 for remote data. Can also be any MutableMapping instance, which should 3337 be serializable if used in multiple processes. 3338 component: str or None 3339 If the location is a zarr group rather than an array, this is the 3340 subcomponent that should be created/over-written. 3341 storage_options: dict 3342 Any additional parameters for the storage backend (ignored for local 3343 paths) 3344 overwrite: bool 3345 If given array already exists, overwrite=False will cause an error, 3346 where overwrite=True will replace the existing data. 3347 compute: bool 3348 See :func:`~dask.array.store` for more details. 3349 return_stored: bool 3350 See :func:`~dask.array.store` for more details. 3351 **kwargs: 3352 Passed to the :func:`zarr.creation.create` function, e.g., compression options. 3353 3354 Raises 3355 ------ 3356 ValueError 3357 If ``arr`` has unknown chunk sizes, which is not supported by Zarr. 3358 3359 See Also 3360 -------- 3361 dask.array.store 3362 dask.array.Array.compute_chunk_sizes 3363 3364 """ 3365 import zarr 3366 3367 if np.isnan(arr.shape).any(): 3368 raise ValueError( 3369 "Saving a dask array with unknown chunk sizes is not " 3370 "currently supported by Zarr.%s" % unknown_chunk_message 3371 ) 3372 3373 if isinstance(url, zarr.Array): 3374 z = url 3375 if isinstance(z.store, (dict, MutableMapping)) and "distributed" in config.get( 3376 "scheduler", "" 3377 ): 3378 raise RuntimeError( 3379 "Cannot store into in memory Zarr Array using " 3380 "the Distributed Scheduler." 3381 ) 3382 arr = arr.rechunk(z.chunks) 3383 return arr.store(z, lock=False, compute=compute, return_stored=return_stored) 3384 3385 if not _check_regular_chunks(arr.chunks): 3386 raise ValueError( 3387 "Attempt to save array to zarr with irregular " 3388 "chunking, please call `arr.rechunk(...)` first." 3389 ) 3390 3391 storage_options = storage_options or {} 3392 3393 if isinstance(url, str): 3394 mapper = get_mapper(url, **storage_options) 3395 else: 3396 # assume the object passed is already a mapper 3397 mapper = url 3398 3399 chunks = [c[0] for c in arr.chunks] 3400 3401 z = zarr.create( 3402 shape=arr.shape, 3403 chunks=chunks, 3404 dtype=arr.dtype, 3405 store=mapper, 3406 path=component, 3407 overwrite=overwrite, 3408 **kwargs, 3409 ) 3410 return arr.store(z, lock=False, compute=compute, return_stored=return_stored) 3411 3412 3413def _check_regular_chunks(chunkset): 3414 """Check if the chunks are regular 3415 3416 "Regular" in this context means that along every axis, the chunks all 3417 have the same size, except the last one, which may be smaller 3418 3419 Parameters 3420 ---------- 3421 chunkset: tuple of tuples of ints 3422 From the ``.chunks`` attribute of an ``Array`` 3423 3424 Returns 3425 ------- 3426 True if chunkset passes, else False 3427 3428 Examples 3429 -------- 3430 >>> import dask.array as da 3431 >>> arr = da.zeros(10, chunks=(5, )) 3432 >>> _check_regular_chunks(arr.chunks) 3433 True 3434 3435 >>> arr = da.zeros(10, chunks=((3, 3, 3, 1), )) 3436 >>> _check_regular_chunks(arr.chunks) 3437 True 3438 3439 >>> arr = da.zeros(10, chunks=((3, 1, 3, 3), )) 3440 >>> _check_regular_chunks(arr.chunks) 3441 False 3442 """ 3443 for chunks in chunkset: 3444 if len(chunks) == 1: 3445 continue 3446 if len(set(chunks[:-1])) > 1: 3447 return False 3448 if chunks[-1] > chunks[0]: 3449 return False 3450 return True 3451 3452 3453def from_delayed(value, shape, dtype=None, meta=None, name=None): 3454 """Create a dask array from a dask delayed value 3455 3456 This routine is useful for constructing dask arrays in an ad-hoc fashion 3457 using dask delayed, particularly when combined with stack and concatenate. 3458 3459 The dask array will consist of a single chunk. 3460 3461 Examples 3462 -------- 3463 >>> import dask 3464 >>> import dask.array as da 3465 >>> import numpy as np 3466 >>> value = dask.delayed(np.ones)(5) 3467 >>> array = da.from_delayed(value, (5,), dtype=float) 3468 >>> array 3469 dask.array<from-value, shape=(5,), dtype=float64, chunksize=(5,), chunktype=numpy.ndarray> 3470 >>> array.compute() 3471 array([1., 1., 1., 1., 1.]) 3472 """ 3473 from ..delayed import Delayed, delayed 3474 3475 if not isinstance(value, Delayed) and hasattr(value, "key"): 3476 value = delayed(value) 3477 3478 name = name or "from-value-" + tokenize(value, shape, dtype, meta) 3479 dsk = {(name,) + (0,) * len(shape): value.key} 3480 chunks = tuple((d,) for d in shape) 3481 # TODO: value._key may not be the name of the layer in value.dask 3482 # This should be fixed after we build full expression graphs 3483 graph = HighLevelGraph.from_collections(name, dsk, dependencies=[value]) 3484 return Array(graph, name, chunks, dtype=dtype, meta=meta) 3485 3486 3487def from_func(func, shape, dtype=None, name=None, args=(), kwargs={}): 3488 """Create dask array in a single block by calling a function 3489 3490 Calling the provided function with func(*args, **kwargs) should return a 3491 NumPy array of the indicated shape and dtype. 3492 3493 Examples 3494 -------- 3495 3496 >>> a = from_func(np.arange, (3,), dtype='i8', args=(3,)) 3497 >>> a.compute() 3498 array([0, 1, 2]) 3499 3500 This works particularly well when coupled with dask.array functions like 3501 concatenate and stack: 3502 3503 >>> arrays = [from_func(np.array, (), dtype='i8', args=(n,)) for n in range(5)] 3504 >>> stack(arrays).compute() 3505 array([0, 1, 2, 3, 4]) 3506 """ 3507 name = name or "from_func-" + tokenize(func, shape, dtype, args, kwargs) 3508 if args or kwargs: 3509 func = partial(func, *args, **kwargs) 3510 dsk = {(name,) + (0,) * len(shape): (func,)} 3511 chunks = tuple((i,) for i in shape) 3512 return Array(dsk, name, chunks, dtype) 3513 3514 3515def common_blockdim(blockdims): 3516 """Find the common block dimensions from the list of block dimensions 3517 3518 Currently only implements the simplest possible heuristic: the common 3519 block-dimension is the only one that does not span fully span a dimension. 3520 This is a conservative choice that allows us to avoid potentially very 3521 expensive rechunking. 3522 3523 Assumes that each element of the input block dimensions has all the same 3524 sum (i.e., that they correspond to dimensions of the same size). 3525 3526 Examples 3527 -------- 3528 >>> common_blockdim([(3,), (2, 1)]) 3529 (2, 1) 3530 >>> common_blockdim([(1, 2), (2, 1)]) 3531 (1, 1, 1) 3532 >>> common_blockdim([(2, 2), (3, 1)]) # doctest: +SKIP 3533 Traceback (most recent call last): 3534 ... 3535 ValueError: Chunks do not align 3536 """ 3537 if not any(blockdims): 3538 return () 3539 non_trivial_dims = {d for d in blockdims if len(d) > 1} 3540 if len(non_trivial_dims) == 1: 3541 return first(non_trivial_dims) 3542 if len(non_trivial_dims) == 0: 3543 return max(blockdims, key=first) 3544 3545 if np.isnan(sum(map(sum, blockdims))): 3546 raise ValueError( 3547 "Arrays' chunk sizes (%s) are unknown.\n\n" 3548 "A possible solution:\n" 3549 " x.compute_chunk_sizes()" % blockdims 3550 ) 3551 3552 if len(set(map(sum, non_trivial_dims))) > 1: 3553 raise ValueError("Chunks do not add up to same value", blockdims) 3554 3555 # We have multiple non-trivial chunks on this axis 3556 # e.g. (5, 2) and (4, 3) 3557 3558 # We create a single chunk tuple with the same total length 3559 # that evenly divides both, e.g. (4, 1, 2) 3560 3561 # To accomplish this we walk down all chunk tuples together, finding the 3562 # smallest element, adding it to the output, and subtracting it from all 3563 # other elements and remove the element itself. We stop once we have 3564 # burned through all of the chunk tuples. 3565 # For efficiency's sake we reverse the lists so that we can pop off the end 3566 rchunks = [list(ntd)[::-1] for ntd in non_trivial_dims] 3567 total = sum(first(non_trivial_dims)) 3568 i = 0 3569 3570 out = [] 3571 while i < total: 3572 m = min(c[-1] for c in rchunks) 3573 out.append(m) 3574 for c in rchunks: 3575 c[-1] -= m 3576 if c[-1] == 0: 3577 c.pop() 3578 i += m 3579 3580 return tuple(out) 3581 3582 3583def unify_chunks(*args, **kwargs): 3584 """ 3585 Unify chunks across a sequence of arrays 3586 3587 This utility function is used within other common operations like 3588 :func:`dask.array.core.map_blocks` and :func:`dask.array.core.blockwise`. 3589 It is not commonly used by end-users directly. 3590 3591 Parameters 3592 ---------- 3593 *args: sequence of Array, index pairs 3594 Sequence like (x, 'ij', y, 'jk', z, 'i') 3595 3596 Examples 3597 -------- 3598 >>> import dask.array as da 3599 >>> x = da.ones(10, chunks=((5, 2, 3),)) 3600 >>> y = da.ones(10, chunks=((2, 3, 5),)) 3601 >>> chunkss, arrays = unify_chunks(x, 'i', y, 'i') 3602 >>> chunkss 3603 {'i': (2, 3, 2, 3)} 3604 3605 >>> x = da.ones((100, 10), chunks=(20, 5)) 3606 >>> y = da.ones((10, 100), chunks=(4, 50)) 3607 >>> chunkss, arrays = unify_chunks(x, 'ij', y, 'jk', 'constant', None) 3608 >>> chunkss # doctest: +SKIP 3609 {'k': (50, 50), 'i': (20, 20, 20, 20, 20), 'j': (4, 1, 3, 2)} 3610 3611 >>> unify_chunks(0, None) 3612 ({}, [0]) 3613 3614 Returns 3615 ------- 3616 chunkss : dict 3617 Map like {index: chunks}. 3618 arrays : list 3619 List of rechunked arrays. 3620 3621 See Also 3622 -------- 3623 common_blockdim 3624 """ 3625 if not args: 3626 return {}, [] 3627 3628 arginds = [ 3629 (asanyarray(a) if ind is not None else a, ind) for a, ind in partition(2, args) 3630 ] # [x, ij, y, jk] 3631 warn = kwargs.get("warn", True) 3632 3633 arrays, inds = zip(*arginds) 3634 if all(ind is None for ind in inds): 3635 return {}, list(arrays) 3636 if all(ind == inds[0] for ind in inds) and all( 3637 a.chunks == arrays[0].chunks for a in arrays 3638 ): 3639 return dict(zip(inds[0], arrays[0].chunks)), arrays 3640 3641 nameinds = [] 3642 blockdim_dict = dict() 3643 max_parts = 0 3644 for a, ind in arginds: 3645 if ind is not None: 3646 nameinds.append((a.name, ind)) 3647 blockdim_dict[a.name] = a.chunks 3648 max_parts = max(max_parts, a.npartitions) 3649 else: 3650 nameinds.append((a, ind)) 3651 3652 chunkss = broadcast_dimensions(nameinds, blockdim_dict, consolidate=common_blockdim) 3653 nparts = np.prod(list(map(len, chunkss.values()))) 3654 3655 if warn and nparts and nparts >= max_parts * 10: 3656 warnings.warn( 3657 "Increasing number of chunks by factor of %d" % (nparts / max_parts), 3658 PerformanceWarning, 3659 stacklevel=3, 3660 ) 3661 3662 arrays = [] 3663 for a, i in arginds: 3664 if i is None: 3665 arrays.append(a) 3666 else: 3667 chunks = tuple( 3668 chunkss[j] 3669 if a.shape[n] > 1 3670 else a.shape[n] 3671 if not np.isnan(sum(chunkss[j])) 3672 else None 3673 for n, j in enumerate(i) 3674 ) 3675 if chunks != a.chunks and all(a.chunks): 3676 arrays.append(a.rechunk(chunks)) 3677 else: 3678 arrays.append(a) 3679 return chunkss, arrays 3680 3681 3682def unpack_singleton(x): 3683 """ 3684 3685 >>> unpack_singleton([[[[1]]]]) 3686 1 3687 >>> unpack_singleton(np.array(np.datetime64('2000-01-01'))) 3688 array('2000-01-01', dtype='datetime64[D]') 3689 """ 3690 while isinstance(x, (list, tuple)): 3691 try: 3692 x = x[0] 3693 except (IndexError, TypeError, KeyError): 3694 break 3695 return x 3696 3697 3698def block(arrays, allow_unknown_chunksizes=False): 3699 """ 3700 Assemble an nd-array from nested lists of blocks. 3701 3702 Blocks in the innermost lists are concatenated along the last 3703 dimension (-1), then these are concatenated along the second-last 3704 dimension (-2), and so on until the outermost list is reached 3705 3706 Blocks can be of any dimension, but will not be broadcasted using the normal 3707 rules. Instead, leading axes of size 1 are inserted, to make ``block.ndim`` 3708 the same for all blocks. This is primarily useful for working with scalars, 3709 and means that code like ``block([v, 1])`` is valid, where 3710 ``v.ndim == 1``. 3711 3712 When the nested list is two levels deep, this allows block matrices to be 3713 constructed from their components. 3714 3715 Parameters 3716 ---------- 3717 arrays : nested list of array_like or scalars (but not tuples) 3718 If passed a single ndarray or scalar (a nested list of depth 0), this 3719 is returned unmodified (and not copied). 3720 3721 Elements shapes must match along the appropriate axes (without 3722 broadcasting), but leading 1s will be prepended to the shape as 3723 necessary to make the dimensions match. 3724 3725 allow_unknown_chunksizes: bool 3726 Allow unknown chunksizes, such as come from converting from dask 3727 dataframes. Dask.array is unable to verify that chunks line up. If 3728 data comes from differently aligned sources then this can cause 3729 unexpected results. 3730 3731 Returns 3732 ------- 3733 block_array : ndarray 3734 The array assembled from the given blocks. 3735 3736 The dimensionality of the output is equal to the greatest of: 3737 * the dimensionality of all the inputs 3738 * the depth to which the input list is nested 3739 3740 Raises 3741 ------ 3742 ValueError 3743 * If list depths are mismatched - for instance, ``[[a, b], c]`` is 3744 illegal, and should be spelt ``[[a, b], [c]]`` 3745 * If lists are empty - for instance, ``[[a, b], []]`` 3746 3747 See Also 3748 -------- 3749 concatenate : Join a sequence of arrays together. 3750 stack : Stack arrays in sequence along a new dimension. 3751 hstack : Stack arrays in sequence horizontally (column wise). 3752 vstack : Stack arrays in sequence vertically (row wise). 3753 dstack : Stack arrays in sequence depth wise (along third dimension). 3754 vsplit : Split array into a list of multiple sub-arrays vertically. 3755 3756 Notes 3757 ----- 3758 3759 When called with only scalars, ``block`` is equivalent to an ndarray 3760 call. So ``block([[1, 2], [3, 4]])`` is equivalent to 3761 ``array([[1, 2], [3, 4]])``. 3762 3763 This function does not enforce that the blocks lie on a fixed grid. 3764 ``block([[a, b], [c, d]])`` is not restricted to arrays of the form:: 3765 3766 AAAbb 3767 AAAbb 3768 cccDD 3769 3770 But is also allowed to produce, for some ``a, b, c, d``:: 3771 3772 AAAbb 3773 AAAbb 3774 cDDDD 3775 3776 Since concatenation happens along the last axis first, `block` is _not_ 3777 capable of producing the following directly:: 3778 3779 AAAbb 3780 cccbb 3781 cccDD 3782 3783 Matlab's "square bracket stacking", ``[A, B, ...; p, q, ...]``, is 3784 equivalent to ``block([[A, B, ...], [p, q, ...]])``. 3785 """ 3786 3787 # This was copied almost verbatim from numpy.core.shape_base.block 3788 # See numpy license at https://github.com/numpy/numpy/blob/master/LICENSE.txt 3789 # or NUMPY_LICENSE.txt within this directory 3790 3791 def atleast_nd(x, ndim): 3792 x = asanyarray(x) 3793 diff = max(ndim - x.ndim, 0) 3794 if diff == 0: 3795 return x 3796 else: 3797 return x[(None,) * diff + (Ellipsis,)] 3798 3799 def format_index(index): 3800 return "arrays" + "".join(f"[{i}]" for i in index) 3801 3802 rec = _Recurser(recurse_if=lambda x: type(x) is list) 3803 3804 # ensure that the lists are all matched in depth 3805 list_ndim = None 3806 any_empty = False 3807 for index, value, entering in rec.walk(arrays): 3808 if type(value) is tuple: 3809 # not strictly necessary, but saves us from: 3810 # - more than one way to do things - no point treating tuples like 3811 # lists 3812 # - horribly confusing behaviour that results when tuples are 3813 # treated like ndarray 3814 raise TypeError( 3815 "{} is a tuple. " 3816 "Only lists can be used to arrange blocks, and np.block does " 3817 "not allow implicit conversion from tuple to ndarray.".format( 3818 format_index(index) 3819 ) 3820 ) 3821 if not entering: 3822 curr_depth = len(index) 3823 elif len(value) == 0: 3824 curr_depth = len(index) + 1 3825 any_empty = True 3826 else: 3827 continue 3828 3829 if list_ndim is not None and list_ndim != curr_depth: 3830 raise ValueError( 3831 "List depths are mismatched. First element was at depth {}, " 3832 "but there is an element at depth {} ({})".format( 3833 list_ndim, curr_depth, format_index(index) 3834 ) 3835 ) 3836 list_ndim = curr_depth 3837 3838 # do this here so we catch depth mismatches first 3839 if any_empty: 3840 raise ValueError("Lists cannot be empty") 3841 3842 # convert all the arrays to ndarrays 3843 arrays = rec.map_reduce(arrays, f_map=asanyarray, f_reduce=list) 3844 3845 # determine the maximum dimension of the elements 3846 elem_ndim = rec.map_reduce(arrays, f_map=lambda xi: xi.ndim, f_reduce=max) 3847 ndim = max(list_ndim, elem_ndim) 3848 3849 # first axis to concatenate along 3850 first_axis = ndim - list_ndim 3851 3852 # Make all the elements the same dimension 3853 arrays = rec.map_reduce( 3854 arrays, f_map=lambda xi: atleast_nd(xi, ndim), f_reduce=list 3855 ) 3856 3857 # concatenate innermost lists on the right, outermost on the left 3858 return rec.map_reduce( 3859 arrays, 3860 f_reduce=lambda xs, axis: concatenate( 3861 list(xs), axis=axis, allow_unknown_chunksizes=allow_unknown_chunksizes 3862 ), 3863 f_kwargs=lambda axis: dict(axis=(axis + 1)), 3864 axis=first_axis, 3865 ) 3866 3867 3868def concatenate(seq, axis=0, allow_unknown_chunksizes=False): 3869 """ 3870 Concatenate arrays along an existing axis 3871 3872 Given a sequence of dask Arrays form a new dask Array by stacking them 3873 along an existing dimension (axis=0 by default) 3874 3875 Parameters 3876 ---------- 3877 seq: list of dask.arrays 3878 axis: int 3879 Dimension along which to align all of the arrays 3880 allow_unknown_chunksizes: bool 3881 Allow unknown chunksizes, such as come from converting from dask 3882 dataframes. Dask.array is unable to verify that chunks line up. If 3883 data comes from differently aligned sources then this can cause 3884 unexpected results. 3885 3886 Examples 3887 -------- 3888 3889 Create slices 3890 3891 >>> import dask.array as da 3892 >>> import numpy as np 3893 3894 >>> data = [da.from_array(np.ones((4, 4)), chunks=(2, 2)) 3895 ... for i in range(3)] 3896 3897 >>> x = da.concatenate(data, axis=0) 3898 >>> x.shape 3899 (12, 4) 3900 3901 >>> da.concatenate(data, axis=1).shape 3902 (4, 12) 3903 3904 Result is a new dask Array 3905 3906 See Also 3907 -------- 3908 stack 3909 """ 3910 from . import wrap 3911 3912 seq = [asarray(a, allow_unknown_chunksizes=allow_unknown_chunksizes) for a in seq] 3913 3914 if not seq: 3915 raise ValueError("Need array(s) to concatenate") 3916 3917 seq_metas = [meta_from_array(s) for s in seq] 3918 _concatenate = concatenate_lookup.dispatch( 3919 type(max(seq_metas, key=lambda x: getattr(x, "__array_priority__", 0))) 3920 ) 3921 meta = _concatenate(seq_metas, axis=axis) 3922 3923 # Promote types to match meta 3924 seq = [a.astype(meta.dtype) for a in seq] 3925 3926 # Find output array shape 3927 ndim = len(seq[0].shape) 3928 shape = tuple( 3929 sum(a.shape[i] for a in seq) if i == axis else seq[0].shape[i] 3930 for i in range(ndim) 3931 ) 3932 3933 # Drop empty arrays 3934 seq2 = [a for a in seq if a.size] 3935 if not seq2: 3936 seq2 = seq 3937 3938 if axis < 0: 3939 axis = ndim + axis 3940 if axis >= ndim: 3941 msg = ( 3942 "Axis must be less than than number of dimensions" 3943 "\nData has %d dimensions, but got axis=%d" 3944 ) 3945 raise ValueError(msg % (ndim, axis)) 3946 3947 n = len(seq2) 3948 if n == 0: 3949 try: 3950 return wrap.empty_like(meta, shape=shape, chunks=shape, dtype=meta.dtype) 3951 except TypeError: 3952 return wrap.empty(shape, chunks=shape, dtype=meta.dtype) 3953 elif n == 1: 3954 return seq2[0] 3955 3956 if not allow_unknown_chunksizes and not all( 3957 i == axis or all(x.shape[i] == seq2[0].shape[i] for x in seq2) 3958 for i in range(ndim) 3959 ): 3960 if any(map(np.isnan, seq2[0].shape)): 3961 raise ValueError( 3962 "Tried to concatenate arrays with unknown" 3963 " shape %s.\n\nTwo solutions:\n" 3964 " 1. Force concatenation pass" 3965 " allow_unknown_chunksizes=True.\n" 3966 " 2. Compute shapes with " 3967 "[x.compute_chunk_sizes() for x in seq]" % str(seq2[0].shape) 3968 ) 3969 raise ValueError("Shapes do not align: %s", [x.shape for x in seq2]) 3970 3971 inds = [list(range(ndim)) for i in range(n)] 3972 for i, ind in enumerate(inds): 3973 ind[axis] = -(i + 1) 3974 3975 uc_args = list(concat(zip(seq2, inds))) 3976 _, seq2 = unify_chunks(*uc_args, warn=False) 3977 3978 bds = [a.chunks for a in seq2] 3979 3980 chunks = ( 3981 seq2[0].chunks[:axis] 3982 + (sum((bd[axis] for bd in bds), ()),) 3983 + seq2[0].chunks[axis + 1 :] 3984 ) 3985 3986 cum_dims = [0] + list(accumulate(add, [len(a.chunks[axis]) for a in seq2])) 3987 3988 names = [a.name for a in seq2] 3989 3990 name = "concatenate-" + tokenize(names, axis) 3991 keys = list(product([name], *[range(len(bd)) for bd in chunks])) 3992 3993 values = [ 3994 (names[bisect(cum_dims, key[axis + 1]) - 1],) 3995 + key[1 : axis + 1] 3996 + (key[axis + 1] - cum_dims[bisect(cum_dims, key[axis + 1]) - 1],) 3997 + key[axis + 2 :] 3998 for key in keys 3999 ] 4000 4001 dsk = dict(zip(keys, values)) 4002 graph = HighLevelGraph.from_collections(name, dsk, dependencies=seq2) 4003 4004 return Array(graph, name, chunks, meta=meta) 4005 4006 4007def load_store_chunk(x, out, index, lock, return_stored, load_stored): 4008 """ 4009 A function inserted in a Dask graph for storing a chunk. 4010 4011 Parameters 4012 ---------- 4013 x: array-like 4014 An array (potentially a NumPy one) 4015 out: array-like 4016 Where to store results too. 4017 index: slice-like 4018 Where to store result from ``x`` in ``out``. 4019 lock: Lock-like or False 4020 Lock to use before writing to ``out``. 4021 return_stored: bool 4022 Whether to return ``out``. 4023 load_stored: bool 4024 Whether to return the array stored in ``out``. 4025 Ignored if ``return_stored`` is not ``True``. 4026 4027 Examples 4028 -------- 4029 4030 >>> a = np.ones((5, 6)) 4031 >>> b = np.empty(a.shape) 4032 >>> load_store_chunk(a, b, (slice(None), slice(None)), False, False, False) 4033 """ 4034 4035 result = None 4036 if return_stored and not load_stored: 4037 result = out 4038 4039 if lock: 4040 lock.acquire() 4041 try: 4042 if x is not None: 4043 if is_arraylike(x): 4044 out[index] = x 4045 else: 4046 out[index] = np.asanyarray(x) 4047 if return_stored and load_stored: 4048 result = out[index] 4049 finally: 4050 if lock: 4051 lock.release() 4052 4053 return result 4054 4055 4056def store_chunk(x, out, index, lock, return_stored): 4057 return load_store_chunk(x, out, index, lock, return_stored, False) 4058 4059 4060def load_chunk(out, index, lock): 4061 return load_store_chunk(None, out, index, lock, True, True) 4062 4063 4064def insert_to_ooc( 4065 keys: list, 4066 chunks: tuple[tuple[int, ...], ...], 4067 out, 4068 name: str, 4069 *, 4070 lock: Lock | bool = True, 4071 region: slice | None = None, 4072 return_stored: bool = False, 4073 load_stored: bool = False, 4074) -> dict: 4075 """ 4076 Creates a Dask graph for storing chunks from ``arr`` in ``out``. 4077 4078 Parameters 4079 ---------- 4080 keys: list 4081 Dask keys of the input array 4082 chunks: tuple 4083 Dask chunks of the input array 4084 out: array-like 4085 Where to store results to 4086 name: str 4087 First element of dask keys 4088 lock: Lock-like or bool, optional 4089 Whether to lock or with what (default is ``True``, 4090 which means a :class:`threading.Lock` instance). 4091 region: slice-like, optional 4092 Where in ``out`` to store ``arr``'s results 4093 (default is ``None``, meaning all of ``out``). 4094 return_stored: bool, optional 4095 Whether to return ``out`` 4096 (default is ``False``, meaning ``None`` is returned). 4097 load_stored: bool, optional 4098 Whether to handling loading from ``out`` at the same time. 4099 Ignored if ``return_stored`` is not ``True``. 4100 (default is ``False``, meaning defer to ``return_stored``). 4101 4102 Returns 4103 ------- 4104 dask graph of store operation 4105 4106 Examples 4107 -------- 4108 >>> import dask.array as da 4109 >>> d = da.ones((5, 6), chunks=(2, 3)) 4110 >>> a = np.empty(d.shape) 4111 >>> insert_to_ooc(d.__dask_keys__(), d.chunks, a, "store-123") # doctest: +SKIP 4112 """ 4113 4114 if lock is True: 4115 lock = Lock() 4116 4117 slices = slices_from_chunks(chunks) 4118 if region: 4119 slices = [fuse_slice(region, slc) for slc in slices] 4120 4121 if return_stored and load_stored: 4122 func = load_store_chunk 4123 args = (load_stored,) 4124 else: 4125 func = store_chunk 4126 args = () 4127 4128 dsk = { 4129 (name,) + t[1:]: (func, t, out, slc, lock, return_stored) + args 4130 for t, slc in zip(core.flatten(keys), slices) 4131 } 4132 return dsk 4133 4134 4135def retrieve_from_ooc( 4136 keys: Collection[Hashable], dsk_pre: Mapping, dsk_post: Mapping 4137) -> dict: 4138 """ 4139 Creates a Dask graph for loading stored ``keys`` from ``dsk``. 4140 4141 Parameters 4142 ---------- 4143 keys: Collection 4144 A sequence containing Dask graph keys to load 4145 dsk_pre: Mapping 4146 A Dask graph corresponding to a Dask Array before computation 4147 dsk_post: Mapping 4148 A Dask graph corresponding to a Dask Array after computation 4149 4150 Examples 4151 -------- 4152 >>> import dask.array as da 4153 >>> d = da.ones((5, 6), chunks=(2, 3)) 4154 >>> a = np.empty(d.shape) 4155 >>> g = insert_to_ooc(d.__dask_keys__(), d.chunks, a, "store-123") 4156 >>> retrieve_from_ooc(g.keys(), g, {k: k for k in g.keys()}) # doctest: +SKIP 4157 """ 4158 load_dsk = { 4159 ("load-" + k[0],) + k[1:]: (load_chunk, dsk_post[k]) + dsk_pre[k][3:-1] 4160 for k in keys 4161 } 4162 4163 return load_dsk 4164 4165 4166def asarray( 4167 a, allow_unknown_chunksizes=False, dtype=None, order=None, *, like=None, **kwargs 4168): 4169 """Convert the input to a dask array. 4170 4171 Parameters 4172 ---------- 4173 a : array-like 4174 Input data, in any form that can be converted to a dask array. This 4175 includes lists, lists of tuples, tuples, tuples of tuples, tuples of 4176 lists and ndarrays. 4177 allow_unknown_chunksizes: bool 4178 Allow unknown chunksizes, such as come from converting from dask 4179 dataframes. Dask.array is unable to verify that chunks line up. If 4180 data comes from differently aligned sources then this can cause 4181 unexpected results. 4182 dtype : data-type, optional 4183 By default, the data-type is inferred from the input data. 4184 order : {‘C’, ‘F’, ‘A’, ‘K’}, optional 4185 Memory layout. ‘A’ and ‘K’ depend on the order of input array a. 4186 ‘C’ row-major (C-style), ‘F’ column-major (Fortran-style) memory 4187 representation. ‘A’ (any) means ‘F’ if a is Fortran contiguous, ‘C’ 4188 otherwise ‘K’ (keep) preserve input order. Defaults to ‘C’. 4189 like: array-like 4190 Reference object to allow the creation of Dask arrays with chunks 4191 that are not NumPy arrays. If an array-like passed in as ``like`` 4192 supports the ``__array_function__`` protocol, the chunk type of the 4193 resulting array will be definde by it. In this case, it ensures the 4194 creation of a Dask array compatible with that passed in via this 4195 argument. If ``like`` is a Dask array, the chunk type of the 4196 resulting array will be defined by the chunk type of ``like``. 4197 Requires NumPy 1.20.0 or higher. 4198 4199 Returns 4200 ------- 4201 out : dask array 4202 Dask array interpretation of a. 4203 4204 Examples 4205 -------- 4206 >>> import dask.array as da 4207 >>> import numpy as np 4208 >>> x = np.arange(3) 4209 >>> da.asarray(x) 4210 dask.array<array, shape=(3,), dtype=int64, chunksize=(3,), chunktype=numpy.ndarray> 4211 4212 >>> y = [[1, 2, 3], [4, 5, 6]] 4213 >>> da.asarray(y) 4214 dask.array<array, shape=(2, 3), dtype=int64, chunksize=(2, 3), chunktype=numpy.ndarray> 4215 """ 4216 if like is None: 4217 if isinstance(a, Array): 4218 return a 4219 elif hasattr(a, "to_dask_array"): 4220 return a.to_dask_array() 4221 elif type(a).__module__.split(".")[0] == "xarray" and hasattr(a, "data"): 4222 return asarray(a.data) 4223 elif isinstance(a, (list, tuple)) and any(isinstance(i, Array) for i in a): 4224 return stack(a, allow_unknown_chunksizes=allow_unknown_chunksizes) 4225 elif not isinstance(getattr(a, "shape", None), Iterable): 4226 a = np.asarray(a, dtype=dtype, order=order) 4227 else: 4228 if not _numpy_120: 4229 raise RuntimeError("The use of ``like`` required NumPy >= 1.20") 4230 4231 like_meta = meta_from_array(like) 4232 if isinstance(a, Array): 4233 return a.map_blocks(np.asarray, like=like_meta, dtype=dtype, order=order) 4234 else: 4235 a = np.asarray(a, like=like_meta, dtype=dtype, order=order) 4236 return from_array(a, getitem=getter_inline, **kwargs) 4237 4238 4239def asanyarray(a, dtype=None, order=None, *, like=None): 4240 """Convert the input to a dask array. 4241 4242 Subclasses of ``np.ndarray`` will be passed through as chunks unchanged. 4243 4244 Parameters 4245 ---------- 4246 a : array-like 4247 Input data, in any form that can be converted to a dask array. This 4248 includes lists, lists of tuples, tuples, tuples of tuples, tuples of 4249 lists and ndarrays. 4250 dtype : data-type, optional 4251 By default, the data-type is inferred from the input data. 4252 order : {‘C’, ‘F’, ‘A’, ‘K’}, optional 4253 Memory layout. ‘A’ and ‘K’ depend on the order of input array a. 4254 ‘C’ row-major (C-style), ‘F’ column-major (Fortran-style) memory 4255 representation. ‘A’ (any) means ‘F’ if a is Fortran contiguous, ‘C’ 4256 otherwise ‘K’ (keep) preserve input order. Defaults to ‘C’. 4257 like: array-like 4258 Reference object to allow the creation of Dask arrays with chunks 4259 that are not NumPy arrays. If an array-like passed in as ``like`` 4260 supports the ``__array_function__`` protocol, the chunk type of the 4261 resulting array will be definde by it. In this case, it ensures the 4262 creation of a Dask array compatible with that passed in via this 4263 argument. If ``like`` is a Dask array, the chunk type of the 4264 resulting array will be defined by the chunk type of ``like``. 4265 Requires NumPy 1.20.0 or higher. 4266 4267 Returns 4268 ------- 4269 out : dask array 4270 Dask array interpretation of a. 4271 4272 Examples 4273 -------- 4274 >>> import dask.array as da 4275 >>> import numpy as np 4276 >>> x = np.arange(3) 4277 >>> da.asanyarray(x) 4278 dask.array<array, shape=(3,), dtype=int64, chunksize=(3,), chunktype=numpy.ndarray> 4279 4280 >>> y = [[1, 2, 3], [4, 5, 6]] 4281 >>> da.asanyarray(y) 4282 dask.array<array, shape=(2, 3), dtype=int64, chunksize=(2, 3), chunktype=numpy.ndarray> 4283 """ 4284 if like is None: 4285 if isinstance(a, Array): 4286 return a 4287 elif hasattr(a, "to_dask_array"): 4288 return a.to_dask_array() 4289 elif type(a).__module__.split(".")[0] == "xarray" and hasattr(a, "data"): 4290 return asanyarray(a.data) 4291 elif isinstance(a, (list, tuple)) and any(isinstance(i, Array) for i in a): 4292 return stack(a) 4293 elif not isinstance(getattr(a, "shape", None), Iterable): 4294 a = np.asanyarray(a, dtype=dtype, order=order) 4295 else: 4296 if not _numpy_120: 4297 raise RuntimeError("The use of ``like`` required NumPy >= 1.20") 4298 4299 like_meta = meta_from_array(like) 4300 if isinstance(a, Array): 4301 return a.map_blocks(np.asanyarray, like=like_meta, dtype=dtype, order=order) 4302 else: 4303 a = np.asanyarray(a, like=like_meta, dtype=dtype, order=order) 4304 return from_array(a, chunks=a.shape, getitem=getter_inline, asarray=False) 4305 4306 4307def is_scalar_for_elemwise(arg): 4308 """ 4309 4310 >>> is_scalar_for_elemwise(42) 4311 True 4312 >>> is_scalar_for_elemwise('foo') 4313 True 4314 >>> is_scalar_for_elemwise(True) 4315 True 4316 >>> is_scalar_for_elemwise(np.array(42)) 4317 True 4318 >>> is_scalar_for_elemwise([1, 2, 3]) 4319 True 4320 >>> is_scalar_for_elemwise(np.array([1, 2, 3])) 4321 False 4322 >>> is_scalar_for_elemwise(from_array(np.array(0), chunks=())) 4323 False 4324 >>> is_scalar_for_elemwise(np.dtype('i4')) 4325 True 4326 """ 4327 # the second half of shape_condition is essentially just to ensure that 4328 # dask series / frame are treated as scalars in elemwise. 4329 maybe_shape = getattr(arg, "shape", None) 4330 shape_condition = not isinstance(maybe_shape, Iterable) or any( 4331 is_dask_collection(x) for x in maybe_shape 4332 ) 4333 4334 return ( 4335 np.isscalar(arg) 4336 or shape_condition 4337 or isinstance(arg, np.dtype) 4338 or (isinstance(arg, np.ndarray) and arg.ndim == 0) 4339 ) 4340 4341 4342def broadcast_shapes(*shapes): 4343 """ 4344 Determines output shape from broadcasting arrays. 4345 4346 Parameters 4347 ---------- 4348 shapes : tuples 4349 The shapes of the arguments. 4350 4351 Returns 4352 ------- 4353 output_shape : tuple 4354 4355 Raises 4356 ------ 4357 ValueError 4358 If the input shapes cannot be successfully broadcast together. 4359 """ 4360 if len(shapes) == 1: 4361 return shapes[0] 4362 out = [] 4363 for sizes in zip_longest(*map(reversed, shapes), fillvalue=-1): 4364 if np.isnan(sizes).any(): 4365 dim = np.nan 4366 else: 4367 dim = 0 if 0 in sizes else np.max(sizes) 4368 if any(i not in [-1, 0, 1, dim] and not np.isnan(i) for i in sizes): 4369 raise ValueError( 4370 "operands could not be broadcast together with " 4371 "shapes {}".format(" ".join(map(str, shapes))) 4372 ) 4373 out.append(dim) 4374 return tuple(reversed(out)) 4375 4376 4377def elemwise(op, *args, out=None, **kwargs): 4378 """Apply elementwise function across arguments 4379 4380 Respects broadcasting rules 4381 4382 Parameters 4383 ---------- 4384 out : dask array or None 4385 If out is a dask.array then this overwrites the contents of that array with 4386 the result 4387 4388 Examples 4389 -------- 4390 >>> elemwise(add, x, y) # doctest: +SKIP 4391 >>> elemwise(sin, x) # doctest: +SKIP 4392 >>> elemwise(sin, x, out=dask_array) # doctest: +SKIP 4393 4394 See Also 4395 -------- 4396 blockwise 4397 """ 4398 if not {"name", "dtype"}.issuperset(kwargs): 4399 msg = "%s does not take the following keyword arguments %s" 4400 raise TypeError( 4401 msg % (op.__name__, str(sorted(set(kwargs) - {"name", "dtype"}))) 4402 ) 4403 4404 args = [np.asarray(a) if isinstance(a, (list, tuple)) else a for a in args] 4405 4406 shapes = [] 4407 for arg in args: 4408 shape = getattr(arg, "shape", ()) 4409 if any(is_dask_collection(x) for x in shape): 4410 # Want to exclude Delayed shapes and dd.Scalar 4411 shape = () 4412 shapes.append(shape) 4413 4414 shapes = [s if isinstance(s, Iterable) else () for s in shapes] 4415 out_ndim = len( 4416 broadcast_shapes(*shapes) 4417 ) # Raises ValueError if dimensions mismatch 4418 expr_inds = tuple(range(out_ndim))[::-1] 4419 4420 need_enforce_dtype = False 4421 if "dtype" in kwargs: 4422 need_enforce_dtype = True 4423 dt = kwargs["dtype"] 4424 else: 4425 # We follow NumPy's rules for dtype promotion, which special cases 4426 # scalars and 0d ndarrays (which it considers equivalent) by using 4427 # their values to compute the result dtype: 4428 # https://github.com/numpy/numpy/issues/6240 4429 # We don't inspect the values of 0d dask arrays, because these could 4430 # hold potentially very expensive calculations. Instead, we treat 4431 # them just like other arrays, and if necessary cast the result of op 4432 # to match. 4433 vals = [ 4434 np.empty((1,) * max(1, a.ndim), dtype=a.dtype) 4435 if not is_scalar_for_elemwise(a) 4436 else a 4437 for a in args 4438 ] 4439 try: 4440 dt = apply_infer_dtype(op, vals, {}, "elemwise", suggest_dtype=False) 4441 except Exception: 4442 return NotImplemented 4443 need_enforce_dtype = any( 4444 not is_scalar_for_elemwise(a) and a.ndim == 0 for a in args 4445 ) 4446 4447 name = kwargs.get("name", None) or f"{funcname(op)}-{tokenize(op, dt, *args)}" 4448 4449 blockwise_kwargs = dict(dtype=dt, name=name, token=funcname(op).strip("_")) 4450 if need_enforce_dtype: 4451 blockwise_kwargs["enforce_dtype"] = dt 4452 blockwise_kwargs["enforce_dtype_function"] = op 4453 op = _enforce_dtype 4454 result = blockwise( 4455 op, 4456 expr_inds, 4457 *concat( 4458 (a, tuple(range(a.ndim)[::-1]) if not is_scalar_for_elemwise(a) else None) 4459 for a in args 4460 ), 4461 **blockwise_kwargs, 4462 ) 4463 4464 return handle_out(out, result) 4465 4466 4467def handle_out(out, result): 4468 """Handle out parameters 4469 4470 If out is a dask.array then this overwrites the contents of that array with 4471 the result 4472 """ 4473 if isinstance(out, tuple): 4474 if len(out) == 1: 4475 out = out[0] 4476 elif len(out) > 1: 4477 raise NotImplementedError("The out parameter is not fully supported") 4478 else: 4479 out = None 4480 if isinstance(out, Array): 4481 if out.shape != result.shape: 4482 raise ValueError( 4483 "Mismatched shapes between result and out parameter. " 4484 "out=%s, result=%s" % (str(out.shape), str(result.shape)) 4485 ) 4486 out._chunks = result.chunks 4487 out.dask = result.dask 4488 out._meta = result._meta 4489 out._name = result.name 4490 elif out is not None: 4491 msg = ( 4492 "The out parameter is not fully supported." 4493 " Received type %s, expected Dask Array" % type(out).__name__ 4494 ) 4495 raise NotImplementedError(msg) 4496 else: 4497 return result 4498 4499 4500def _enforce_dtype(*args, **kwargs): 4501 """Calls a function and converts its result to the given dtype. 4502 4503 The parameters have deliberately been given unwieldy names to avoid 4504 clashes with keyword arguments consumed by blockwise 4505 4506 A dtype of `object` is treated as a special case and not enforced, 4507 because it is used as a dummy value in some places when the result will 4508 not be a block in an Array. 4509 4510 Parameters 4511 ---------- 4512 enforce_dtype : dtype 4513 Result dtype 4514 enforce_dtype_function : callable 4515 The wrapped function, which will be passed the remaining arguments 4516 """ 4517 dtype = kwargs.pop("enforce_dtype") 4518 function = kwargs.pop("enforce_dtype_function") 4519 4520 result = function(*args, **kwargs) 4521 if hasattr(result, "dtype") and dtype != result.dtype and dtype != object: 4522 if not np.can_cast(result, dtype, casting="same_kind"): 4523 raise ValueError( 4524 "Inferred dtype from function %r was %r " 4525 "but got %r, which can't be cast using " 4526 "casting='same_kind'" 4527 % (funcname(function), str(dtype), str(result.dtype)) 4528 ) 4529 if np.isscalar(result): 4530 # scalar astype method doesn't take the keyword arguments, so 4531 # have to convert via 0-dimensional array and back. 4532 result = result.astype(dtype) 4533 else: 4534 try: 4535 result = result.astype(dtype, copy=False) 4536 except TypeError: 4537 # Missing copy kwarg 4538 result = result.astype(dtype) 4539 return result 4540 4541 4542def broadcast_to(x, shape, chunks=None, meta=None): 4543 """Broadcast an array to a new shape. 4544 4545 Parameters 4546 ---------- 4547 x : array_like 4548 The array to broadcast. 4549 shape : tuple 4550 The shape of the desired array. 4551 chunks : tuple, optional 4552 If provided, then the result will use these chunks instead of the same 4553 chunks as the source array. Setting chunks explicitly as part of 4554 broadcast_to is more efficient than rechunking afterwards. Chunks are 4555 only allowed to differ from the original shape along dimensions that 4556 are new on the result or have size 1 the input array. 4557 meta : empty ndarray 4558 empty ndarray created with same NumPy backend, ndim and dtype as the 4559 Dask Array being created (overrides dtype) 4560 4561 Returns 4562 ------- 4563 broadcast : dask array 4564 4565 See Also 4566 -------- 4567 :func:`numpy.broadcast_to` 4568 """ 4569 x = asarray(x) 4570 shape = tuple(shape) 4571 4572 if meta is None: 4573 meta = meta_from_array(x) 4574 4575 if x.shape == shape and (chunks is None or chunks == x.chunks): 4576 return x 4577 4578 ndim_new = len(shape) - x.ndim 4579 if ndim_new < 0 or any( 4580 new != old for new, old in zip(shape[ndim_new:], x.shape) if old != 1 4581 ): 4582 raise ValueError(f"cannot broadcast shape {x.shape} to shape {shape}") 4583 4584 if chunks is None: 4585 chunks = tuple((s,) for s in shape[:ndim_new]) + tuple( 4586 bd if old > 1 else (new,) 4587 for bd, old, new in zip(x.chunks, x.shape, shape[ndim_new:]) 4588 ) 4589 else: 4590 chunks = normalize_chunks( 4591 chunks, shape, dtype=x.dtype, previous_chunks=x.chunks 4592 ) 4593 for old_bd, new_bd in zip(x.chunks, chunks[ndim_new:]): 4594 if old_bd != new_bd and old_bd != (1,): 4595 raise ValueError( 4596 "cannot broadcast chunks %s to chunks %s: " 4597 "new chunks must either be along a new " 4598 "dimension or a dimension of size 1" % (x.chunks, chunks) 4599 ) 4600 4601 name = "broadcast_to-" + tokenize(x, shape, chunks) 4602 dsk = {} 4603 4604 enumerated_chunks = product(*(enumerate(bds) for bds in chunks)) 4605 for new_index, chunk_shape in (zip(*ec) for ec in enumerated_chunks): 4606 old_index = tuple( 4607 0 if bd == (1,) else i for bd, i in zip(x.chunks, new_index[ndim_new:]) 4608 ) 4609 old_key = (x.name,) + old_index 4610 new_key = (name,) + new_index 4611 dsk[new_key] = (np.broadcast_to, old_key, quote(chunk_shape)) 4612 4613 graph = HighLevelGraph.from_collections(name, dsk, dependencies=[x]) 4614 return Array(graph, name, chunks, dtype=x.dtype, meta=meta) 4615 4616 4617@derived_from(np) 4618def broadcast_arrays(*args, subok=False): 4619 subok = bool(subok) 4620 4621 to_array = asanyarray if subok else asarray 4622 args = tuple(to_array(e) for e in args) 4623 4624 # Unify uneven chunking 4625 inds = [list(reversed(range(x.ndim))) for x in args] 4626 uc_args = concat(zip(args, inds)) 4627 _, args = unify_chunks(*uc_args, warn=False) 4628 4629 shape = broadcast_shapes(*(e.shape for e in args)) 4630 chunks = broadcast_chunks(*(e.chunks for e in args)) 4631 4632 result = [broadcast_to(e, shape=shape, chunks=chunks) for e in args] 4633 4634 return result 4635 4636 4637def offset_func(func, offset, *args): 4638 """Offsets inputs by offset 4639 4640 >>> double = lambda x: x * 2 4641 >>> f = offset_func(double, (10,)) 4642 >>> f(1) 4643 22 4644 >>> f(300) 4645 620 4646 """ 4647 4648 def _offset(*args): 4649 args2 = list(map(add, args, offset)) 4650 return func(*args2) 4651 4652 with contextlib.suppress(Exception): 4653 _offset.__name__ = "offset_" + func.__name__ 4654 4655 return _offset 4656 4657 4658def chunks_from_arrays(arrays): 4659 """Chunks tuple from nested list of arrays 4660 4661 >>> x = np.array([1, 2]) 4662 >>> chunks_from_arrays([x, x]) 4663 ((2, 2),) 4664 4665 >>> x = np.array([[1, 2]]) 4666 >>> chunks_from_arrays([[x], [x]]) 4667 ((1, 1), (2,)) 4668 4669 >>> x = np.array([[1, 2]]) 4670 >>> chunks_from_arrays([[x, x]]) 4671 ((1,), (2, 2)) 4672 4673 >>> chunks_from_arrays([1, 1]) 4674 ((1, 1),) 4675 """ 4676 if not arrays: 4677 return () 4678 result = [] 4679 dim = 0 4680 4681 def shape(x): 4682 try: 4683 return x.shape 4684 except AttributeError: 4685 return (1,) 4686 4687 while isinstance(arrays, (list, tuple)): 4688 result.append(tuple(shape(deepfirst(a))[dim] for a in arrays)) 4689 arrays = arrays[0] 4690 dim += 1 4691 return tuple(result) 4692 4693 4694def deepfirst(seq): 4695 """First element in a nested list 4696 4697 >>> deepfirst([[[1, 2], [3, 4]], [5, 6], [7, 8]]) 4698 1 4699 """ 4700 if not isinstance(seq, (list, tuple)): 4701 return seq 4702 else: 4703 return deepfirst(seq[0]) 4704 4705 4706def shapelist(a): 4707 """Get the shape of nested list""" 4708 if type(a) is list: 4709 return tuple([len(a)] + list(shapelist(a[0]))) 4710 else: 4711 return () 4712 4713 4714def transposelist(arrays, axes, extradims=0): 4715 """Permute axes of nested list 4716 4717 >>> transposelist([[1,1,1],[1,1,1]], [2,1]) 4718 [[[1, 1], [1, 1], [1, 1]]] 4719 4720 >>> transposelist([[1,1,1],[1,1,1]], [2,1], extradims=1) 4721 [[[[1], [1]], [[1], [1]], [[1], [1]]]] 4722 """ 4723 if len(axes) != ndimlist(arrays): 4724 raise ValueError("Length of axes should equal depth of nested arrays") 4725 if extradims < 0: 4726 raise ValueError("`newdims` should be positive") 4727 if len(axes) > len(set(axes)): 4728 raise ValueError("`axes` should be unique") 4729 4730 ndim = max(axes) + 1 4731 shape = shapelist(arrays) 4732 newshape = [ 4733 shape[axes.index(i)] if i in axes else 1 for i in range(ndim + extradims) 4734 ] 4735 4736 result = list(core.flatten(arrays)) 4737 return reshapelist(newshape, result) 4738 4739 4740def stack(seq, axis=0, allow_unknown_chunksizes=False): 4741 """ 4742 Stack arrays along a new axis 4743 4744 Given a sequence of dask arrays, form a new dask array by stacking them 4745 along a new dimension (axis=0 by default) 4746 4747 Parameters 4748 ---------- 4749 seq: list of dask.arrays 4750 axis: int 4751 Dimension along which to align all of the arrays 4752 allow_unknown_chunksizes: bool 4753 Allow unknown chunksizes, such as come from converting from dask 4754 dataframes. Dask.array is unable to verify that chunks line up. If 4755 data comes from differently aligned sources then this can cause 4756 unexpected results. 4757 4758 Examples 4759 -------- 4760 4761 Create slices 4762 4763 >>> import dask.array as da 4764 >>> import numpy as np 4765 4766 >>> data = [da.from_array(np.ones((4, 4)), chunks=(2, 2)) 4767 ... for i in range(3)] 4768 4769 >>> x = da.stack(data, axis=0) 4770 >>> x.shape 4771 (3, 4, 4) 4772 4773 >>> da.stack(data, axis=1).shape 4774 (4, 3, 4) 4775 4776 >>> da.stack(data, axis=-1).shape 4777 (4, 4, 3) 4778 4779 Result is a new dask Array 4780 4781 See Also 4782 -------- 4783 concatenate 4784 """ 4785 from . import wrap 4786 4787 seq = [asarray(a, allow_unknown_chunksizes=allow_unknown_chunksizes) for a in seq] 4788 4789 if not seq: 4790 raise ValueError("Need array(s) to stack") 4791 if not allow_unknown_chunksizes and not all(x.shape == seq[0].shape for x in seq): 4792 idx = first(i for i in enumerate(seq) if i[1].shape != seq[0].shape) 4793 raise ValueError( 4794 "Stacked arrays must have the same shape. The first array had shape " 4795 f"{seq[0].shape}, while array {idx[0] + 1} has shape {idx[1].shape}." 4796 ) 4797 4798 meta = np.stack([meta_from_array(a) for a in seq], axis=axis) 4799 seq = [x.astype(meta.dtype) for x in seq] 4800 4801 ndim = meta.ndim - 1 4802 if axis < 0: 4803 axis = ndim + axis + 1 4804 shape = tuple( 4805 len(seq) 4806 if i == axis 4807 else (seq[0].shape[i] if i < axis else seq[0].shape[i - 1]) 4808 for i in range(meta.ndim) 4809 ) 4810 4811 seq2 = [a for a in seq if a.size] 4812 if not seq2: 4813 seq2 = seq 4814 4815 n = len(seq2) 4816 if n == 0: 4817 try: 4818 return wrap.empty_like(meta, shape=shape, chunks=shape, dtype=meta.dtype) 4819 except TypeError: 4820 return wrap.empty(shape, chunks=shape, dtype=meta.dtype) 4821 4822 ind = list(range(ndim)) 4823 uc_args = list(concat((x, ind) for x in seq2)) 4824 _, seq2 = unify_chunks(*uc_args) 4825 4826 assert len({a.chunks for a in seq2}) == 1 # same chunks 4827 chunks = seq2[0].chunks[:axis] + ((1,) * n,) + seq2[0].chunks[axis:] 4828 4829 names = [a.name for a in seq2] 4830 name = "stack-" + tokenize(names, axis) 4831 keys = list(product([name], *[range(len(bd)) for bd in chunks])) 4832 4833 inputs = [ 4834 (names[key[axis + 1]],) + key[1 : axis + 1] + key[axis + 2 :] for key in keys 4835 ] 4836 values = [ 4837 ( 4838 getitem, 4839 inp, 4840 (slice(None, None, None),) * axis 4841 + (None,) 4842 + (slice(None, None, None),) * (ndim - axis), 4843 ) 4844 for inp in inputs 4845 ] 4846 4847 layer = dict(zip(keys, values)) 4848 graph = HighLevelGraph.from_collections(name, layer, dependencies=seq2) 4849 4850 return Array(graph, name, chunks, meta=meta) 4851 4852 4853def concatenate3(arrays): 4854 """Recursive np.concatenate 4855 4856 Input should be a nested list of numpy arrays arranged in the order they 4857 should appear in the array itself. Each array should have the same number 4858 of dimensions as the desired output and the nesting of the lists. 4859 4860 >>> x = np.array([[1, 2]]) 4861 >>> concatenate3([[x, x, x], [x, x, x]]) 4862 array([[1, 2, 1, 2, 1, 2], 4863 [1, 2, 1, 2, 1, 2]]) 4864 4865 >>> concatenate3([[x, x], [x, x], [x, x]]) 4866 array([[1, 2, 1, 2], 4867 [1, 2, 1, 2], 4868 [1, 2, 1, 2]]) 4869 """ 4870 # We need this as __array_function__ may not exist on older NumPy versions. 4871 # And to reduce verbosity. 4872 NDARRAY_ARRAY_FUNCTION = getattr(np.ndarray, "__array_function__", None) 4873 4874 arrays = concrete(arrays) 4875 if not arrays: 4876 return np.empty(0) 4877 4878 advanced = max( 4879 core.flatten(arrays, container=(list, tuple)), 4880 key=lambda x: getattr(x, "__array_priority__", 0), 4881 ) 4882 4883 if not all( 4884 NDARRAY_ARRAY_FUNCTION 4885 is getattr(type(arr), "__array_function__", NDARRAY_ARRAY_FUNCTION) 4886 for arr in core.flatten(arrays, container=(list, tuple)) 4887 ): 4888 try: 4889 x = unpack_singleton(arrays) 4890 return _concatenate2(arrays, axes=tuple(range(x.ndim))) 4891 except TypeError: 4892 pass 4893 4894 if concatenate_lookup.dispatch(type(advanced)) is not np.concatenate: 4895 x = unpack_singleton(arrays) 4896 return _concatenate2(arrays, axes=list(range(x.ndim))) 4897 4898 ndim = ndimlist(arrays) 4899 if not ndim: 4900 return arrays 4901 chunks = chunks_from_arrays(arrays) 4902 shape = tuple(map(sum, chunks)) 4903 4904 def dtype(x): 4905 try: 4906 return x.dtype 4907 except AttributeError: 4908 return type(x) 4909 4910 result = np.empty(shape=shape, dtype=dtype(deepfirst(arrays))) 4911 4912 for (idx, arr) in zip( 4913 slices_from_chunks(chunks), core.flatten(arrays, container=(list, tuple)) 4914 ): 4915 if hasattr(arr, "ndim"): 4916 while arr.ndim < ndim: 4917 arr = arr[None, ...] 4918 result[idx] = arr 4919 4920 return result 4921 4922 4923def concatenate_axes(arrays, axes): 4924 """Recursively call np.concatenate along axes""" 4925 if len(axes) != ndimlist(arrays): 4926 raise ValueError("Length of axes should equal depth of nested arrays") 4927 4928 extradims = max(0, deepfirst(arrays).ndim - (max(axes) + 1)) 4929 return concatenate3(transposelist(arrays, axes, extradims=extradims)) 4930 4931 4932def to_hdf5(filename, *args, chunks=True, **kwargs): 4933 """Store arrays in HDF5 file 4934 4935 This saves several dask arrays into several datapaths in an HDF5 file. 4936 It creates the necessary datasets and handles clean file opening/closing. 4937 4938 Parameters 4939 ---------- 4940 chunks: tuple or ``True`` 4941 Chunk shape, or ``True`` to pass the chunks from the dask array. 4942 Defaults to ``True``. 4943 4944 Examples 4945 -------- 4946 4947 >>> da.to_hdf5('myfile.hdf5', '/x', x) # doctest: +SKIP 4948 4949 or 4950 4951 >>> da.to_hdf5('myfile.hdf5', {'/x': x, '/y': y}) # doctest: +SKIP 4952 4953 Optionally provide arguments as though to ``h5py.File.create_dataset`` 4954 4955 >>> da.to_hdf5('myfile.hdf5', '/x', x, compression='lzf', shuffle=True) # doctest: +SKIP 4956 4957 >>> da.to_hdf5('myfile.hdf5', '/x', x, chunks=(10,20,30)) # doctest: +SKIP 4958 4959 This can also be used as a method on a single Array 4960 4961 >>> x.to_hdf5('myfile.hdf5', '/x') # doctest: +SKIP 4962 4963 See Also 4964 -------- 4965 da.store 4966 h5py.File.create_dataset 4967 """ 4968 if len(args) == 1 and isinstance(args[0], dict): 4969 data = args[0] 4970 elif len(args) == 2 and isinstance(args[0], str) and isinstance(args[1], Array): 4971 data = {args[0]: args[1]} 4972 else: 4973 raise ValueError("Please provide {'/data/path': array} dictionary") 4974 4975 import h5py 4976 4977 with h5py.File(filename, mode="a") as f: 4978 dsets = [ 4979 f.require_dataset( 4980 dp, 4981 shape=x.shape, 4982 dtype=x.dtype, 4983 chunks=tuple(c[0] for c in x.chunks) if chunks is True else chunks, 4984 **kwargs, 4985 ) 4986 for dp, x in data.items() 4987 ] 4988 store(list(data.values()), dsets) 4989 4990 4991def interleave_none(a, b): 4992 """ 4993 4994 >>> interleave_none([0, None, 2, None], [1, 3]) 4995 (0, 1, 2, 3) 4996 """ 4997 result = [] 4998 i = j = 0 4999 n = len(a) + len(b) 5000 while i + j < n: 5001 if a[i] is not None: 5002 result.append(a[i]) 5003 i += 1 5004 else: 5005 result.append(b[j]) 5006 i += 1 5007 j += 1 5008 return tuple(result) 5009 5010 5011def keyname(name, i, okey): 5012 """ 5013 5014 >>> keyname('x', 3, [None, None, 0, 2]) 5015 ('x', 3, 0, 2) 5016 """ 5017 return (name, i) + tuple(k for k in okey if k is not None) 5018 5019 5020def _vindex(x, *indexes): 5021 """Point wise indexing with broadcasting. 5022 5023 >>> x = np.arange(56).reshape((7, 8)) 5024 >>> x 5025 array([[ 0, 1, 2, 3, 4, 5, 6, 7], 5026 [ 8, 9, 10, 11, 12, 13, 14, 15], 5027 [16, 17, 18, 19, 20, 21, 22, 23], 5028 [24, 25, 26, 27, 28, 29, 30, 31], 5029 [32, 33, 34, 35, 36, 37, 38, 39], 5030 [40, 41, 42, 43, 44, 45, 46, 47], 5031 [48, 49, 50, 51, 52, 53, 54, 55]]) 5032 5033 >>> d = from_array(x, chunks=(3, 4)) 5034 >>> result = _vindex(d, [0, 1, 6, 0], [0, 1, 0, 7]) 5035 >>> result.compute() 5036 array([ 0, 9, 48, 7]) 5037 """ 5038 indexes = replace_ellipsis(x.ndim, indexes) 5039 5040 nonfancy_indexes = [] 5041 reduced_indexes = [] 5042 for i, ind in enumerate(indexes): 5043 if isinstance(ind, Number): 5044 nonfancy_indexes.append(ind) 5045 elif isinstance(ind, slice): 5046 nonfancy_indexes.append(ind) 5047 reduced_indexes.append(slice(None)) 5048 else: 5049 nonfancy_indexes.append(slice(None)) 5050 reduced_indexes.append(ind) 5051 5052 nonfancy_indexes = tuple(nonfancy_indexes) 5053 reduced_indexes = tuple(reduced_indexes) 5054 5055 x = x[nonfancy_indexes] 5056 5057 array_indexes = {} 5058 for i, (ind, size) in enumerate(zip(reduced_indexes, x.shape)): 5059 if not isinstance(ind, slice): 5060 ind = np.array(ind, copy=True) 5061 if ind.dtype.kind == "b": 5062 raise IndexError("vindex does not support indexing with boolean arrays") 5063 if ((ind >= size) | (ind < -size)).any(): 5064 raise IndexError( 5065 "vindex key has entries out of bounds for " 5066 "indexing along axis %s of size %s: %r" % (i, size, ind) 5067 ) 5068 ind %= size 5069 array_indexes[i] = ind 5070 5071 if array_indexes: 5072 x = _vindex_array(x, array_indexes) 5073 5074 return x 5075 5076 5077def _vindex_array(x, dict_indexes): 5078 """Point wise indexing with only NumPy Arrays.""" 5079 5080 try: 5081 broadcast_indexes = np.broadcast_arrays(*dict_indexes.values()) 5082 except ValueError as e: 5083 # note: error message exactly matches numpy 5084 shapes_str = " ".join(str(a.shape) for a in dict_indexes.values()) 5085 raise IndexError( 5086 "shape mismatch: indexing arrays could not be " 5087 "broadcast together with shapes " + shapes_str 5088 ) from e 5089 broadcast_shape = broadcast_indexes[0].shape 5090 5091 lookup = dict(zip(dict_indexes, broadcast_indexes)) 5092 flat_indexes = [ 5093 lookup[i].ravel().tolist() if i in lookup else None for i in range(x.ndim) 5094 ] 5095 flat_indexes.extend([None] * (x.ndim - len(flat_indexes))) 5096 5097 flat_indexes = [ 5098 list(index) if index is not None else index for index in flat_indexes 5099 ] 5100 bounds = [list(accumulate(add, (0,) + c)) for c in x.chunks] 5101 bounds2 = [b for i, b in zip(flat_indexes, bounds) if i is not None] 5102 axis = _get_axis(flat_indexes) 5103 token = tokenize(x, flat_indexes) 5104 out_name = "vindex-merge-" + token 5105 5106 points = list() 5107 for i, idx in enumerate(zip(*[i for i in flat_indexes if i is not None])): 5108 block_idx = [bisect(b, ind) - 1 for b, ind in zip(bounds2, idx)] 5109 inblock_idx = [ 5110 ind - bounds2[k][j] for k, (ind, j) in enumerate(zip(idx, block_idx)) 5111 ] 5112 points.append((i, tuple(block_idx), tuple(inblock_idx))) 5113 5114 chunks = [c for i, c in zip(flat_indexes, x.chunks) if i is None] 5115 chunks.insert(0, (len(points),) if points else (0,)) 5116 chunks = tuple(chunks) 5117 5118 if points: 5119 per_block = groupby(1, points) 5120 per_block = {k: v for k, v in per_block.items() if v} 5121 5122 other_blocks = list( 5123 product( 5124 *[ 5125 list(range(len(c))) if i is None else [None] 5126 for i, c in zip(flat_indexes, x.chunks) 5127 ] 5128 ) 5129 ) 5130 5131 full_slices = [slice(None, None) if i is None else None for i in flat_indexes] 5132 5133 name = "vindex-slice-" + token 5134 vindex_merge_name = "vindex-merge-" + token 5135 dsk = {} 5136 for okey in other_blocks: 5137 for i, key in enumerate(per_block): 5138 dsk[keyname(name, i, okey)] = ( 5139 _vindex_transpose, 5140 ( 5141 _vindex_slice, 5142 (x.name,) + interleave_none(okey, key), 5143 interleave_none( 5144 full_slices, list(zip(*pluck(2, per_block[key]))) 5145 ), 5146 ), 5147 axis, 5148 ) 5149 dsk[keyname(vindex_merge_name, 0, okey)] = ( 5150 _vindex_merge, 5151 [list(pluck(0, per_block[key])) for key in per_block], 5152 [keyname(name, i, okey) for i in range(len(per_block))], 5153 ) 5154 5155 result_1d = Array( 5156 HighLevelGraph.from_collections(out_name, dsk, dependencies=[x]), 5157 out_name, 5158 chunks, 5159 x.dtype, 5160 meta=x._meta, 5161 ) 5162 return result_1d.reshape(broadcast_shape + result_1d.shape[1:]) 5163 5164 # output has a zero dimension, just create a new zero-shape array with the 5165 # same dtype 5166 from .wrap import empty 5167 5168 result_1d = empty( 5169 tuple(map(sum, chunks)), chunks=chunks, dtype=x.dtype, name=out_name 5170 ) 5171 return result_1d.reshape(broadcast_shape + result_1d.shape[1:]) 5172 5173 5174def _get_axis(indexes): 5175 """Get axis along which point-wise slicing results lie 5176 5177 This is mostly a hack because I can't figure out NumPy's rule on this and 5178 can't be bothered to go reading. 5179 5180 >>> _get_axis([[1, 2], None, [1, 2], None]) 5181 0 5182 >>> _get_axis([None, [1, 2], [1, 2], None]) 5183 1 5184 >>> _get_axis([None, None, [1, 2], [1, 2]]) 5185 2 5186 """ 5187 ndim = len(indexes) 5188 indexes = [slice(None, None) if i is None else [0] for i in indexes] 5189 x = np.empty((2,) * ndim) 5190 x2 = x[tuple(indexes)] 5191 return x2.shape.index(1) 5192 5193 5194def _vindex_slice(block, points): 5195 """Pull out point-wise slices from block""" 5196 points = [p if isinstance(p, slice) else list(p) for p in points] 5197 return block[tuple(points)] 5198 5199 5200def _vindex_transpose(block, axis): 5201 """Rotate block so that points are on the first dimension""" 5202 axes = [axis] + list(range(axis)) + list(range(axis + 1, block.ndim)) 5203 return block.transpose(axes) 5204 5205 5206def _vindex_merge(locations, values): 5207 """ 5208 5209 >>> locations = [0], [2, 1] 5210 >>> values = [np.array([[1, 2, 3]]), 5211 ... np.array([[10, 20, 30], [40, 50, 60]])] 5212 5213 >>> _vindex_merge(locations, values) 5214 array([[ 1, 2, 3], 5215 [40, 50, 60], 5216 [10, 20, 30]]) 5217 """ 5218 locations = list(map(list, locations)) 5219 values = list(values) 5220 5221 n = sum(map(len, locations)) 5222 5223 shape = list(values[0].shape) 5224 shape[0] = n 5225 shape = tuple(shape) 5226 5227 dtype = values[0].dtype 5228 5229 x = np.empty_like(values[0], dtype=dtype, shape=shape) 5230 5231 ind = [slice(None, None) for i in range(x.ndim)] 5232 for loc, val in zip(locations, values): 5233 ind[0] = loc 5234 x[tuple(ind)] = val 5235 5236 return x 5237 5238 5239def to_npy_stack(dirname, x, axis=0): 5240 """Write dask array to a stack of .npy files 5241 5242 This partitions the dask.array along one axis and stores each block along 5243 that axis as a single .npy file in the specified directory 5244 5245 Examples 5246 -------- 5247 >>> x = da.ones((5, 10, 10), chunks=(2, 4, 4)) # doctest: +SKIP 5248 >>> da.to_npy_stack('data/', x, axis=0) # doctest: +SKIP 5249 5250 The ``.npy`` files store numpy arrays for ``x[0:2], x[2:4], and x[4:5]`` 5251 respectively, as is specified by the chunk size along the zeroth axis:: 5252 5253 $ tree data/ 5254 data/ 5255 |-- 0.npy 5256 |-- 1.npy 5257 |-- 2.npy 5258 |-- info 5259 5260 The ``info`` file stores the dtype, chunks, and axis information of the array. 5261 You can load these stacks with the :func:`dask.array.from_npy_stack` function. 5262 5263 >>> y = da.from_npy_stack('data/') # doctest: +SKIP 5264 5265 See Also 5266 -------- 5267 from_npy_stack 5268 """ 5269 5270 chunks = tuple((c if i == axis else (sum(c),)) for i, c in enumerate(x.chunks)) 5271 xx = x.rechunk(chunks) 5272 5273 if not os.path.exists(dirname): 5274 os.mkdir(dirname) 5275 5276 meta = {"chunks": chunks, "dtype": x.dtype, "axis": axis} 5277 5278 with open(os.path.join(dirname, "info"), "wb") as f: 5279 pickle.dump(meta, f) 5280 5281 name = "to-npy-stack-" + str(uuid.uuid1()) 5282 dsk = { 5283 (name, i): (np.save, os.path.join(dirname, "%d.npy" % i), key) 5284 for i, key in enumerate(core.flatten(xx.__dask_keys__())) 5285 } 5286 5287 graph = HighLevelGraph.from_collections(name, dsk, dependencies=[xx]) 5288 compute_as_if_collection(Array, graph, list(dsk)) 5289 5290 5291def from_npy_stack(dirname, mmap_mode="r"): 5292 """Load dask array from stack of npy files 5293 5294 See :func:`dask.array.to_npy_stack` for docstring. 5295 5296 Parameters 5297 ---------- 5298 dirname: string 5299 Directory of .npy files 5300 mmap_mode: (None or 'r') 5301 Read data in memory map mode 5302 """ 5303 with open(os.path.join(dirname, "info"), "rb") as f: 5304 info = pickle.load(f) 5305 5306 dtype = info["dtype"] 5307 chunks = info["chunks"] 5308 axis = info["axis"] 5309 5310 name = "from-npy-stack-%s" % dirname 5311 keys = list(product([name], *[range(len(c)) for c in chunks])) 5312 values = [ 5313 (np.load, os.path.join(dirname, "%d.npy" % i), mmap_mode) 5314 for i in range(len(chunks[axis])) 5315 ] 5316 dsk = dict(zip(keys, values)) 5317 5318 return Array(dsk, name, chunks, dtype) 5319 5320 5321def new_da_object(dsk, name, chunks, meta=None, dtype=None): 5322 """Generic constructor for dask.array or dask.dataframe objects. 5323 5324 Decides the appropriate output class based on the type of `meta` provided. 5325 """ 5326 if is_dataframe_like(meta) or is_series_like(meta) or is_index_like(meta): 5327 from ..dataframe.core import new_dd_object 5328 5329 assert all(len(c) == 1 for c in chunks[1:]) 5330 divisions = [None] * (len(chunks[0]) + 1) 5331 return new_dd_object(dsk, name, meta, divisions) 5332 else: 5333 return Array(dsk, name=name, chunks=chunks, meta=meta, dtype=dtype) 5334 5335 5336class BlockView: 5337 """An array-like interface to the blocks of an array. 5338 5339 ``BlockView`` provides an array-like interface 5340 to the blocks of a dask array. Numpy-style indexing of a 5341 ``BlockView`` returns a selection of blocks as a new dask array. 5342 5343 You can index ``BlockView`` like a numpy array of shape 5344 equal to the number of blocks in each dimension, (available as 5345 array.blocks.size). The dimensionality of the output array matches 5346 the dimension of this array, even if integer indices are passed. 5347 Slicing with ``np.newaxis`` or multiple lists is not supported. 5348 5349 Examples 5350 -------- 5351 >>> import dask.array as da 5352 >>> from dask.array.core import BlockView 5353 >>> x = da.arange(8, chunks=2) 5354 >>> bv = BlockView(x) 5355 >>> bv.shape # aliases x.numblocks 5356 (4,) 5357 >>> bv.size 5358 4 5359 >>> bv[0].compute() 5360 array([0, 1]) 5361 >>> bv[:3].compute() 5362 array([0, 1, 2, 3, 4, 5]) 5363 >>> bv[::2].compute() 5364 array([0, 1, 4, 5]) 5365 >>> bv[[-1, 0]].compute() 5366 array([6, 7, 0, 1]) 5367 >>> bv.ravel() # doctest: +NORMALIZE_WHITESPACE 5368 [dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>, 5369 dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>, 5370 dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>, 5371 dask.array<blocks, shape=(2,), dtype=int64, chunksize=(2,), chunktype=numpy.ndarray>] 5372 5373 Returns 5374 ------- 5375 An instance of ``da.array.Blockview`` 5376 """ 5377 5378 def __init__(self, array: Array) -> BlockView: 5379 self._array = array 5380 5381 def __getitem__( 5382 self, index: int | Sequence[int] | slice | Sequence[slice] 5383 ) -> Array: 5384 from .slicing import normalize_index 5385 5386 if not isinstance(index, tuple): 5387 index = (index,) 5388 if sum(isinstance(ind, (np.ndarray, list)) for ind in index) > 1: 5389 raise ValueError("Can only slice with a single list") 5390 if any(ind is None for ind in index): 5391 raise ValueError("Slicing with np.newaxis or None is not supported") 5392 index = normalize_index(index, self._array.numblocks) 5393 index = tuple(slice(k, k + 1) if isinstance(k, Number) else k for k in index) 5394 5395 name = "blocks-" + tokenize(self._array, index) 5396 5397 new_keys = self._array._key_array[index] 5398 5399 chunks = tuple( 5400 tuple(np.array(c)[i].tolist()) for c, i in zip(self._array.chunks, index) 5401 ) 5402 5403 keys = product(*(range(len(c)) for c in chunks)) 5404 5405 layer = {(name,) + key: tuple(new_keys[key].tolist()) for key in keys} 5406 5407 graph = HighLevelGraph.from_collections(name, layer, dependencies=[self._array]) 5408 return Array(graph, name, chunks, meta=self._array) 5409 5410 def __eq__(self, other: Any) -> bool: 5411 if isinstance(other, BlockView): 5412 return self._array is other._array 5413 else: 5414 return NotImplemented 5415 5416 @property 5417 def size(self) -> int: 5418 """ 5419 The total number of blocks in the array. 5420 """ 5421 return np.prod(self.shape) 5422 5423 @property 5424 def shape(self) -> tuple[int, ...]: 5425 """ 5426 The number of blocks per axis. Alias of ``dask.array.numblocks``. 5427 """ 5428 return self._array.numblocks 5429 5430 def ravel(self) -> list[Array]: 5431 """ 5432 Return a flattened list of all the blocks in the array in C order. 5433 """ 5434 return [self[idx] for idx in np.ndindex(self.shape)] 5435 5436 5437from .blockwise import blockwise 5438from .utils import compute_meta, meta_from_array 5439