import os
from glob import glob
from io import BytesIO
from numbers import Number
from typing import (
    TYPE_CHECKING,
    Callable,
    Dict,
    Hashable,
    Iterable,
    Mapping,
    MutableMapping,
    Optional,
    Tuple,
    Union,
)

import numpy as np

from .. import backends, coding, conventions
from ..core import indexing
from ..core.combine import (
    _infer_concat_order_from_positions,
    _nested_combine,
    combine_by_coords,
)
from ..core.dataarray import DataArray
from ..core.dataset import Dataset, _get_chunk, _maybe_chunk
from ..core.utils import is_remote_uri
from . import plugins
from .common import AbstractDataStore, ArrayWriter, _normalize_path
from .locks import _get_scheduler

if TYPE_CHECKING:
    try:
        from dask.delayed import Delayed
    except ImportError:
        Delayed = None


DATAARRAY_NAME = "__xarray_dataarray_name__"
DATAARRAY_VARIABLE = "__xarray_dataarray_variable__"

ENGINES = {
    "netcdf4": backends.NetCDF4DataStore.open,
    "scipy": backends.ScipyDataStore,
    "pydap": backends.PydapDataStore.open,
    "h5netcdf": backends.H5NetCDFStore.open,
    "pynio": backends.NioDataStore,
    "pseudonetcdf": backends.PseudoNetCDFDataStore.open,
    "cfgrib": backends.CfGribDataStore,
    "zarr": backends.ZarrStore.open_group,
}


def _get_default_engine_remote_uri():
    try:
        import netCDF4  # noqa: F401

        engine = "netcdf4"
    except ImportError:  # pragma: no cover
        try:
            import pydap  # noqa: F401

            engine = "pydap"
        except ImportError:
            raise ValueError(
                "netCDF4 or pydap is required for accessing "
                "remote datasets via OPeNDAP"
            )
    return engine


def _get_default_engine_gz():
    try:
        import scipy  # noqa: F401

        engine = "scipy"
    except ImportError:  # pragma: no cover
        raise ValueError("scipy is required for accessing .gz files")
    return engine


def _get_default_engine_netcdf():
    try:
        import netCDF4  # noqa: F401

        engine = "netcdf4"
    except ImportError:  # pragma: no cover
        try:
            import scipy.io.netcdf  # noqa: F401

            engine = "scipy"
        except ImportError:
            raise ValueError(
                "cannot read or write netCDF files without "
                "netCDF4-python or scipy installed"
            )
    return engine


def _get_default_engine(path: str, allow_remote: bool = False):
    if allow_remote and is_remote_uri(path):
        return _get_default_engine_remote_uri()
    elif path.endswith(".gz"):
        return _get_default_engine_gz()
    else:
        return _get_default_engine_netcdf()


def _validate_dataset_names(dataset):
    """DataArray.name and Dataset keys must be a string or None"""

    def check_name(name):
        if isinstance(name, str):
            if not name:
                raise ValueError(
                    f"Invalid name {name!r} for DataArray or Dataset key: "
                    "string must be length 1 or greater for "
                    "serialization to netCDF files"
                )
        elif name is not None:
            raise TypeError(
                f"Invalid name {name!r} for DataArray or Dataset key: "
                "must be either a string or None for serialization to netCDF "
                "files"
            )

    for k in dataset.variables:
        check_name(k)


def _validate_attrs(dataset, invalid_netcdf=False):
    """`attrs` must have a string key and a value which is either: a number,
    a string, an ndarray, a list/tuple of numbers/strings, or a numpy.bool_.

    Notes
    -----
    A numpy.bool_ is only allowed when using the h5netcdf engine with
    `invalid_netcdf=True`.
    """

    valid_types = (str, Number, np.ndarray, np.number, list, tuple)
    if invalid_netcdf:
        valid_types += (np.bool_,)

    def check_attr(name, value, valid_types):
        if isinstance(name, str):
            if not name:
                raise ValueError(
                    f"Invalid name for attr {name!r}: string must be "
                    "length 1 or greater for serialization to "
                    "netCDF files"
                )
        else:
            raise TypeError(
                f"Invalid name for attr: {name!r} must be a string for "
                "serialization to netCDF files"
            )

        if not isinstance(value, valid_types):
            raise TypeError(
                f"Invalid value for attr {name!r}: {value!r}. For serialization to "
                "netCDF files, its value must be of one of the following types: "
                f"{', '.join([vtype.__name__ for vtype in valid_types])}"
            )

    # Check attrs on the dataset itself
    for k, v in dataset.attrs.items():
        check_attr(k, v, valid_types)

    # Check attrs on each variable within the dataset
    for variable in dataset.variables.values():
        for k, v in variable.attrs.items():
            check_attr(k, v, valid_types)


def _resolve_decoders_kwargs(decode_cf, open_backend_dataset_parameters, **decoders):
    for d in list(decoders):
        if decode_cf is False and d in open_backend_dataset_parameters:
            decoders[d] = False
        if decoders[d] is None:
            decoders.pop(d)
    return decoders


def _get_mtime(filename_or_obj):
    # if passed an actual file path, augment the token with
    # the file modification time
    mtime = None

    try:
        path = os.fspath(filename_or_obj)
    except TypeError:
        path = None

    if path and not is_remote_uri(path):
        mtime = os.path.getmtime(filename_or_obj)

    return mtime


def _protect_dataset_variables_inplace(dataset, cache):
    for name, variable in dataset.variables.items():
        if name not in variable.dims:
            # no need to protect IndexVariable objects
            data = indexing.CopyOnWriteArray(variable._data)
            if cache:
                data = indexing.MemoryCachedArray(data)
            variable.data = data


def _finalize_store(write, store):
    """Finalize this store by explicitly syncing and closing"""
    del write  # ensure writing is done first
    store.close()


def load_dataset(filename_or_obj, **kwargs):
    """Open, load into memory, and close a Dataset from a file or file-like
    object.

    This is a thin wrapper around :py:meth:`~xarray.open_dataset`. It differs
    from `open_dataset` in that it loads the Dataset into memory, closes the
    file, and returns the Dataset. In contrast, `open_dataset` keeps the file
    handle open and lazy loads its contents. All parameters are passed directly
    to `open_dataset`. See that documentation for further details.

    Returns
    -------
    dataset : Dataset
        The newly created Dataset.

    See Also
    --------
    open_dataset
    """
    if "cache" in kwargs:
        raise TypeError("cache has no effect in this context")

    with open_dataset(filename_or_obj, **kwargs) as ds:
        return ds.load()


def load_dataarray(filename_or_obj, **kwargs):
    """Open, load into memory, and close a DataArray from a file or file-like
    object containing a single data variable.

    This is a thin wrapper around :py:meth:`~xarray.open_dataarray`. It differs
    from `open_dataarray` in that it loads the DataArray into memory, closes the
    file, and returns the DataArray. In contrast, `open_dataarray` keeps the file
    handle open and lazy loads its contents. All parameters are passed directly
    to `open_dataarray`. See that documentation for further details.

    Returns
    -------
    dataarray : DataArray
        The newly created DataArray.

    See Also
    --------
    open_dataarray
    """
    if "cache" in kwargs:
        raise TypeError("cache has no effect in this context")

    with open_dataarray(filename_or_obj, **kwargs) as da:
        return da.load()


def _chunk_ds(
    backend_ds,
    filename_or_obj,
    engine,
    chunks,
    overwrite_encoded_chunks,
    **extra_tokens,
):
    from dask.base import tokenize

    mtime = _get_mtime(filename_or_obj)
    token = tokenize(filename_or_obj, mtime, engine, chunks, **extra_tokens)
    name_prefix = f"open_dataset-{token}"

    variables = {}
    for name, var in backend_ds.variables.items():
        var_chunks = _get_chunk(var, chunks)
        variables[name] = _maybe_chunk(
            name,
            var,
            var_chunks,
            overwrite_encoded_chunks=overwrite_encoded_chunks,
            name_prefix=name_prefix,
            token=token,
        )
    return backend_ds._replace(variables)


def _dataset_from_backend_dataset(
    backend_ds,
    filename_or_obj,
    engine,
    chunks,
    cache,
    overwrite_encoded_chunks,
    **extra_tokens,
):
    if not isinstance(chunks, (int, dict)) and chunks not in {None, "auto"}:
        raise ValueError(
            f"chunks must be an int, dict, 'auto', or None. Instead found {chunks}."
        )

    _protect_dataset_variables_inplace(backend_ds, cache)
    if chunks is None:
        ds = backend_ds
    else:
        ds = _chunk_ds(
            backend_ds,
            filename_or_obj,
            engine,
            chunks,
            overwrite_encoded_chunks,
            **extra_tokens,
        )

    ds.set_close(backend_ds._close)

    # Ensure source filename always stored in dataset object (GH issue #2550)
    if "source" not in ds.encoding and isinstance(filename_or_obj, str):
        ds.encoding["source"] = filename_or_obj

    return ds


def open_dataset(
    filename_or_obj,
    *args,
    engine=None,
    chunks=None,
    cache=None,
    decode_cf=None,
    mask_and_scale=None,
    decode_times=None,
    decode_timedelta=None,
    use_cftime=None,
    concat_characters=None,
    decode_coords=None,
    drop_variables=None,
    backend_kwargs=None,
    **kwargs,
):
    """Open and decode a dataset from a file or file-like object.

    Parameters
    ----------
    filename_or_obj : str, Path, file-like or DataStore
        Strings and Path objects are interpreted as a path to a netCDF file
        or an OpenDAP URL and opened with python-netCDF4, unless the filename
        ends with .gz, in which case the file is gunzipped and opened with
        scipy.io.netcdf (only netCDF3 supported). Byte-strings or file-like
        objects are opened by scipy.io.netcdf (netCDF3) or h5py (netCDF4/HDF).
    engine : {"netcdf4", "scipy", "pydap", "h5netcdf", "pynio", "cfgrib", \
        "pseudonetcdf", "zarr"} or subclass of xarray.backends.BackendEntrypoint, optional
        Engine to use when reading files. If not provided, the default engine
        is chosen based on available dependencies, with a preference for
        "netcdf4". A custom backend class (a subclass of ``BackendEntrypoint``)
        can also be used.
    chunks : int or dict, optional
        If chunks is provided, it is used to load the new dataset into dask
        arrays. ``chunks=-1`` loads the dataset with dask using a single
        chunk for all arrays. ``chunks={}`` loads the dataset with dask using
        engine preferred chunks if exposed by the backend, otherwise with
        a single chunk for all arrays.
        ``chunks='auto'`` will use dask ``auto`` chunking taking into account the
        engine preferred chunks. See dask chunking for more details.
    cache : bool, optional
        If True, cache data loaded from the underlying datastore in memory as
        NumPy arrays when accessed to avoid reading from the underlying
        datastore multiple times. Defaults to True unless you specify the `chunks`
        argument to use dask, in which case it defaults to False. Does not
        change the behavior of coordinates corresponding to dimensions, which
        always load their data from disk into a ``pandas.Index``.
    decode_cf : bool, optional
        Whether to decode these variables, assuming they were saved according
        to CF conventions.
    mask_and_scale : bool, optional
        If True, replace array values equal to `_FillValue` with NA and scale
        values according to the formula `original_values * scale_factor +
        add_offset`, where `_FillValue`, `scale_factor` and `add_offset` are
        taken from variable attributes (if they exist). If the `_FillValue` or
        `missing_value` attribute contains multiple values a warning will be
        issued and all array values matching one of the multiple values will
        be replaced by NA. mask_and_scale defaults to True except for the
        pseudonetcdf backend. This keyword may not be supported by all the backends.
    decode_times : bool, optional
        If True, decode times encoded in the standard NetCDF datetime format
        into datetime objects. Otherwise, leave them encoded as numbers.
        This keyword may not be supported by all the backends.
    decode_timedelta : bool, optional
        If True, decode variables and coordinates with time units in
        {"days", "hours", "minutes", "seconds", "milliseconds", "microseconds"}
        into timedelta objects. If False, leave them encoded as numbers.
        If None (default), assume the same value of decode_time.
        This keyword may not be supported by all the backends.
    use_cftime: bool, optional
        Only relevant if encoded dates come from a standard calendar
        (e.g. "gregorian", "proleptic_gregorian", "standard", or not
        specified). If None (default), attempt to decode times to
        ``np.datetime64[ns]`` objects; if this is not possible, decode times to
        ``cftime.datetime`` objects. If True, always decode times to
        ``cftime.datetime`` objects, regardless of whether or not they can be
        represented using ``np.datetime64[ns]`` objects. If False, always
        decode times to ``np.datetime64[ns]`` objects; if this is not possible
        raise an error. This keyword may not be supported by all the backends.
    concat_characters : bool, optional
        If True, concatenate along the last dimension of character arrays to
        form string arrays. Dimensions will only be concatenated over (and
        removed) if they have no corresponding variable and if they are only
        used as the last dimension of character arrays.
        This keyword may not be supported by all the backends.
    decode_coords : bool or {"coordinates", "all"}, optional
        Controls which variables are set as coordinate variables:

        - "coordinates" or True: Set variables referred to in the
          ``'coordinates'`` attribute of the datasets or individual variables
          as coordinate variables.
        - "all": Set variables referred to in ``'grid_mapping'``, ``'bounds'`` and
          other attributes as coordinate variables.
    drop_variables: str or iterable, optional
        A variable or list of variables to exclude from being parsed from the
        dataset. This may be useful to drop variables with problems or
        inconsistent values.
    backend_kwargs: dict
        Additional keyword arguments passed on to the engine open function,
        equivalent to `**kwargs`.
    **kwargs: dict
        Additional keyword arguments passed on to the engine open function.
        For example:

        - 'group': path to the netCDF4 group in the given file to open given as
          a str, supported by "netcdf4", "h5netcdf", "zarr".
        - 'lock': resource lock to use when reading data from disk. Only
          relevant when using dask or another form of parallelism. By default,
          appropriate locks are chosen to safely read and write files with the
          currently active dask scheduler. Supported by "netcdf4", "h5netcdf",
          "scipy", "pynio", "pseudonetcdf", "cfgrib".

        See engine open function for kwargs accepted by each specific engine.

    Returns
    -------
    dataset : Dataset
        The newly created dataset.

    Notes
    -----
    ``open_dataset`` opens the file with read-only access. When you modify
    values of a Dataset, even one linked to files on disk, only the in-memory
    copy you are manipulating in xarray is modified: the original file on disk
    is never touched.

    See Also
    --------
    open_mfdataset
    """
    if len(args) > 0:
        raise TypeError(
            "open_dataset() takes only 1 positional argument starting from version 0.18.0, "
            "all other options must be passed as keyword arguments"
        )

    if cache is None:
        cache = chunks is None

    if backend_kwargs is not None:
        kwargs.update(backend_kwargs)

    if engine is None:
        engine = plugins.guess_engine(filename_or_obj)

    backend = plugins.get_backend(engine)

    decoders = _resolve_decoders_kwargs(
        decode_cf,
        open_backend_dataset_parameters=backend.open_dataset_parameters,
        mask_and_scale=mask_and_scale,
        decode_times=decode_times,
        decode_timedelta=decode_timedelta,
        concat_characters=concat_characters,
        use_cftime=use_cftime,
        decode_coords=decode_coords,
    )

    overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
    backend_ds = backend.open_dataset(
        filename_or_obj,
        drop_variables=drop_variables,
        **decoders,
        **kwargs,
    )
    ds = _dataset_from_backend_dataset(
        backend_ds,
        filename_or_obj,
        engine,
        chunks,
        cache,
        overwrite_encoded_chunks,
        drop_variables=drop_variables,
        **decoders,
        **kwargs,
    )
    return ds


def open_dataarray(
    filename_or_obj,
    *args,
    engine=None,
    chunks=None,
    cache=None,
    decode_cf=None,
    mask_and_scale=None,
    decode_times=None,
    decode_timedelta=None,
    use_cftime=None,
    concat_characters=None,
    decode_coords=None,
    drop_variables=None,
    backend_kwargs=None,
    **kwargs,
):
    """Open a DataArray from a file or file-like object containing a single
    data variable.

    This is designed to read netCDF files with only one data variable. If
    multiple variables are present then a ValueError is raised.

    Parameters
    ----------
    filename_or_obj : str, Path, file-like or DataStore
        Strings and Path objects are interpreted as a path to a netCDF file
        or an OpenDAP URL and opened with python-netCDF4, unless the filename
        ends with .gz, in which case the file is gunzipped and opened with
        scipy.io.netcdf (only netCDF3 supported). Byte-strings or file-like
        objects are opened by scipy.io.netcdf (netCDF3) or h5py (netCDF4/HDF).
    engine : {"netcdf4", "scipy", "pydap", "h5netcdf", "pynio", "cfgrib", \
        "pseudonetcdf", "zarr"}, optional
        Engine to use when reading files. If not provided, the default engine
        is chosen based on available dependencies, with a preference for
        "netcdf4".
    chunks : int or dict, optional
        If chunks is provided, it is used to load the new dataset into dask
        arrays. ``chunks=-1`` loads the dataset with dask using a single
        chunk for all arrays. ``chunks={}`` loads the dataset with dask using
        engine preferred chunks if exposed by the backend, otherwise with
        a single chunk for all arrays.
        ``chunks='auto'`` will use dask ``auto`` chunking taking into account the
        engine preferred chunks. See dask chunking for more details.
    cache : bool, optional
        If True, cache data loaded from the underlying datastore in memory as
        NumPy arrays when accessed to avoid reading from the underlying
        datastore multiple times. Defaults to True unless you specify the `chunks`
        argument to use dask, in which case it defaults to False. Does not
        change the behavior of coordinates corresponding to dimensions, which
        always load their data from disk into a ``pandas.Index``.
    decode_cf : bool, optional
        Whether to decode these variables, assuming they were saved according
        to CF conventions.
    mask_and_scale : bool, optional
        If True, replace array values equal to `_FillValue` with NA and scale
        values according to the formula `original_values * scale_factor +
        add_offset`, where `_FillValue`, `scale_factor` and `add_offset` are
        taken from variable attributes (if they exist). If the `_FillValue` or
        `missing_value` attribute contains multiple values a warning will be
        issued and all array values matching one of the multiple values will
        be replaced by NA. mask_and_scale defaults to True except for the
        pseudonetcdf backend. This keyword may not be supported by all the backends.
    decode_times : bool, optional
        If True, decode times encoded in the standard NetCDF datetime format
        into datetime objects. Otherwise, leave them encoded as numbers.
        This keyword may not be supported by all the backends.
    decode_timedelta : bool, optional
        If True, decode variables and coordinates with time units in
        {"days", "hours", "minutes", "seconds", "milliseconds", "microseconds"}
        into timedelta objects. If False, leave them encoded as numbers.
        If None (default), assume the same value of decode_time.
        This keyword may not be supported by all the backends.
    use_cftime: bool, optional
        Only relevant if encoded dates come from a standard calendar
        (e.g. "gregorian", "proleptic_gregorian", "standard", or not
        specified). If None (default), attempt to decode times to
        ``np.datetime64[ns]`` objects; if this is not possible, decode times to
        ``cftime.datetime`` objects. If True, always decode times to
        ``cftime.datetime`` objects, regardless of whether or not they can be
        represented using ``np.datetime64[ns]`` objects. If False, always
        decode times to ``np.datetime64[ns]`` objects; if this is not possible
        raise an error. This keyword may not be supported by all the backends.
    concat_characters : bool, optional
        If True, concatenate along the last dimension of character arrays to
        form string arrays. Dimensions will only be concatenated over (and
        removed) if they have no corresponding variable and if they are only
        used as the last dimension of character arrays.
        This keyword may not be supported by all the backends.
    decode_coords : bool or {"coordinates", "all"}, optional
        Controls which variables are set as coordinate variables:

        - "coordinates" or True: Set variables referred to in the
          ``'coordinates'`` attribute of the datasets or individual variables
          as coordinate variables.
        - "all": Set variables referred to in ``'grid_mapping'``, ``'bounds'`` and
          other attributes as coordinate variables.
    drop_variables: str or iterable, optional
        A variable or list of variables to exclude from being parsed from the
        dataset. This may be useful to drop variables with problems or
        inconsistent values.
    backend_kwargs: dict
        Additional keyword arguments passed on to the engine open function,
        equivalent to `**kwargs`.
    **kwargs: dict
        Additional keyword arguments passed on to the engine open function.
        For example:

        - 'group': path to the netCDF4 group in the given file to open given as
          a str, supported by "netcdf4", "h5netcdf", "zarr".
        - 'lock': resource lock to use when reading data from disk. Only
          relevant when using dask or another form of parallelism. By default,
          appropriate locks are chosen to safely read and write files with the
          currently active dask scheduler. Supported by "netcdf4", "h5netcdf",
          "scipy", "pynio", "pseudonetcdf", "cfgrib".

        See engine open function for kwargs accepted by each specific engine.

    Notes
    -----
    This is designed to be fully compatible with `DataArray.to_netcdf`. Saving
    using `DataArray.to_netcdf` and then loading with this function will
    produce an identical result.

    All parameters are passed directly to `xarray.open_dataset`. See that
    documentation for further details.

    See Also
    --------
    open_dataset
    """
    if len(args) > 0:
        raise TypeError(
            "open_dataarray() takes only 1 positional argument starting from version 0.18.0, "
            "all other options must be passed as keyword arguments"
        )

    dataset = open_dataset(
        filename_or_obj,
        decode_cf=decode_cf,
        mask_and_scale=mask_and_scale,
        decode_times=decode_times,
        concat_characters=concat_characters,
        decode_coords=decode_coords,
        engine=engine,
        chunks=chunks,
        cache=cache,
        drop_variables=drop_variables,
        backend_kwargs=backend_kwargs,
        use_cftime=use_cftime,
        decode_timedelta=decode_timedelta,
        **kwargs,
    )

    if len(dataset.data_vars) != 1:
        raise ValueError(
            "Given file dataset contains more than one data "
            "variable. Please read with xarray.open_dataset and "
            "then select the variable you want."
        )
    else:
        (data_array,) = dataset.data_vars.values()

    data_array.set_close(dataset._close)

    # Reset names if they were changed during saving
    # to ensure that we can 'roundtrip' perfectly
    if DATAARRAY_NAME in dataset.attrs:
        data_array.name = dataset.attrs[DATAARRAY_NAME]
        del dataset.attrs[DATAARRAY_NAME]

    if data_array.name == DATAARRAY_VARIABLE:
        data_array.name = None

    return data_array


def open_mfdataset(
    paths,
    chunks=None,
    concat_dim=None,
    compat="no_conflicts",
    preprocess=None,
    engine=None,
    data_vars="all",
    coords="different",
    combine="by_coords",
    parallel=False,
    join="outer",
    attrs_file=None,
    combine_attrs="override",
    **kwargs,
):
    """Open multiple files as a single dataset.

    If combine='by_coords' then the function ``combine_by_coords`` is used to combine
    the datasets into one before returning the result, and if combine='nested' then
    ``combine_nested`` is used. The filepaths must be structured according to which
    combining function is used, the details of which are given in the documentation for
    ``combine_by_coords`` and ``combine_nested``. By default ``combine='by_coords'``
    will be used. Requires dask to be installed. See documentation for
    details on dask [1]_. Global attributes from the ``attrs_file`` are used
    for the combined dataset.

    Parameters
    ----------
    paths : str or sequence
        Either a string glob in the form ``"path/to/my/files/*.nc"`` or an explicit list of
        files to open. Paths can be given as strings or as pathlib Paths. If
        concatenation along more than one dimension is desired, then ``paths`` must be a
        nested list-of-lists (see ``combine_nested`` for details). (A string glob will
        be expanded to a 1-dimensional list.)
    chunks : int or dict, optional
        Dictionary with keys given by dimension names and values given by chunk sizes.
        In general, these should divide the dimensions of each dataset. If int, chunk
        each dimension by ``chunks``. By default, chunks will be chosen to load entire
        input files into memory at once. This has a major impact on performance: please
        see the full documentation for more details [2]_.
    concat_dim : str, or list of str, DataArray, Index or None, optional
        Dimensions to concatenate files along. You only need to provide this argument
        if ``combine='nested'``, and if any of the dimensions along which you want to
        concatenate is not a dimension in the original datasets, e.g., if you want to
        stack a collection of 2D arrays along a third dimension. Set
        ``concat_dim=[..., None, ...]`` explicitly to disable concatenation along a
        particular dimension. Default is None, which for a 1D list of filepaths is
        equivalent to opening the files separately and then merging them with
        ``xarray.merge``.
    combine : {"by_coords", "nested"}, optional
        Whether ``xarray.combine_by_coords`` or ``xarray.combine_nested`` is used to
        combine all the data. Default is to use ``xarray.combine_by_coords``.
    compat : {"identical", "equals", "broadcast_equals", \
              "no_conflicts", "override"}, optional
        String indicating how to compare variables of the same name for
        potential conflicts when merging:

        * "broadcast_equals": all values must be equal when variables are
          broadcast against each other to ensure common dimensions.
        * "equals": all values and dimensions must be the same.
        * "identical": all values, dimensions and attributes must be the
          same.
        * "no_conflicts": only values which are not null in both datasets
          must be equal. The returned dataset then contains the combination
          of all non-null values.
        * "override": skip comparing and pick variable from first dataset

    preprocess : callable, optional
        If provided, call this function on each dataset prior to concatenation.
        You can find the file-name from which each dataset was loaded in
        ``ds.encoding["source"]``.
    engine : {"netcdf4", "scipy", "pydap", "h5netcdf", "pynio", "cfgrib", "zarr"}, \
        optional
        Engine to use when reading files. If not provided, the default engine
        is chosen based on available dependencies, with a preference for
        "netcdf4".
    data_vars : {"minimal", "different", "all"} or list of str, optional
        These data variables will be concatenated together:
          * "minimal": Only data variables in which the dimension already
            appears are included.
          * "different": Data variables which are not equal (ignoring
            attributes) across all datasets are also concatenated (as well as
            all for which dimension already appears). Beware: this option may
            load the data payload of data variables into memory if they are not
            already loaded.
          * "all": All data variables will be concatenated.
          * list of str: The listed data variables will be concatenated, in
            addition to the "minimal" data variables.
    coords : {"minimal", "different", "all"} or list of str, optional
        These coordinate variables will be concatenated together:
          * "minimal": Only coordinates in which the dimension already appears
            are included.
          * "different": Coordinates which are not equal (ignoring attributes)
            across all datasets are also concatenated (as well as all for which
            dimension already appears). Beware: this option may load the data
            payload of coordinate variables into memory if they are not already
            loaded.
          * "all": All coordinate variables will be concatenated, except
            those corresponding to other dimensions.
          * list of str: The listed coordinate variables will be concatenated,
            in addition to the "minimal" coordinates.
    parallel : bool, optional
        If True, the open and preprocess steps of this function will be
        performed in parallel using ``dask.delayed``. Default is False.
    join : {"outer", "inner", "left", "right", "exact", "override"}, optional
        String indicating how to combine differing indexes
        (excluding concat_dim) in objects

        - "outer": use the union of object indexes
        - "inner": use the intersection of object indexes
        - "left": use indexes from the first object with each dimension
        - "right": use indexes from the last object with each dimension
        - "exact": instead of aligning, raise `ValueError` when indexes to be
          aligned are not equal
        - "override": if indexes are of same size, rewrite indexes to be
          those of the first object with that dimension. Indexes for the same
          dimension must have the same size in all objects.
    attrs_file : str or path-like, optional
        Path of the file used to read global attributes from.
        By default global attributes are read from the first file provided,
        with wildcard matches sorted by filename.
    **kwargs : optional
        Additional arguments passed on to :py:func:`xarray.open_dataset`.

    Returns
    -------
    xarray.Dataset

    Notes
    -----
    ``open_mfdataset`` opens files with read-only access. When you modify values
    of a Dataset, even one linked to files on disk, only the in-memory copy you
    are manipulating in xarray is modified: the original file on disk is never
    touched.

    See Also
    --------
    combine_by_coords
    combine_nested
    open_dataset

    References
    ----------

    .. [1] http://xarray.pydata.org/en/stable/dask.html
    .. [2] http://xarray.pydata.org/en/stable/dask.html#chunking-and-performance
    """
    if isinstance(paths, str):
        if is_remote_uri(paths) and engine == "zarr":
            try:
                from fsspec.core import get_fs_token_paths
            except ImportError as e:
                raise ImportError(
                    "The use of remote URLs for opening zarr requires the package fsspec"
                ) from e

            fs, _, _ = get_fs_token_paths(
                paths,
                mode="rb",
                storage_options=kwargs.get("backend_kwargs", {}).get(
                    "storage_options", {}
                ),
                expand=False,
            )
            paths = fs.glob(fs._strip_protocol(paths))  # finds directories
            paths = [fs.get_mapper(path) for path in paths]
        elif is_remote_uri(paths):
            raise ValueError(
                "cannot do wild-card matching for paths that are remote URLs "
                f"unless engine='zarr' is specified. Got paths: {paths}. "
                "Instead, supply paths as an explicit list of strings."
            )
        else:
            paths = sorted(glob(_normalize_path(paths)))
    elif isinstance(paths, os.PathLike):
        paths = [os.fspath(paths)]
    else:
        paths = [os.fspath(p) if isinstance(p, os.PathLike) else p for p in paths]

    if not paths:
        raise OSError("no files to open")

    if combine == "nested":
        if isinstance(concat_dim, (str, DataArray)) or concat_dim is None:
            concat_dim = [concat_dim]

        # This creates a flat list which is easier to iterate over, whilst
        # encoding the originally-supplied structure as "ids".
        # The "ids" are not used at all if combine='by_coords'.
        combined_ids_paths = _infer_concat_order_from_positions(paths)
        ids, paths = (
            list(combined_ids_paths.keys()),
            list(combined_ids_paths.values()),
        )
    elif combine == "by_coords" and concat_dim is not None:
        raise ValueError(
            "When combine='by_coords', passing a value for `concat_dim` has no "
            "effect. To manually combine along a specific dimension you should "
            "instead specify combine='nested' along with a value for `concat_dim`.",
        )

    open_kwargs = dict(engine=engine, chunks=chunks or {}, **kwargs)

    if parallel:
        import dask

        # wrap the open_dataset, getattr, and preprocess with delayed
        open_ = dask.delayed(open_dataset)
        getattr_ = dask.delayed(getattr)
        if preprocess is not None:
            preprocess = dask.delayed(preprocess)
    else:
        open_ = open_dataset
        getattr_ = getattr

    datasets = [open_(p, **open_kwargs) for p in paths]
    closers = [getattr_(ds, "_close") for ds in datasets]
    if preprocess is not None:
        datasets = [preprocess(ds) for ds in datasets]

    if parallel:
        # calling compute here will return the datasets/file_objs lists,
        # the underlying datasets will still be stored as dask arrays
        datasets, closers = dask.compute(datasets, closers)

    # Combine all datasets, closing them in case of a ValueError
    try:
        if combine == "nested":
            # Combine the nested list by successive concat and merge operations
            # along each dimension, using structure given by "ids"
            combined = _nested_combine(
                datasets,
                concat_dims=concat_dim,
                compat=compat,
                data_vars=data_vars,
                coords=coords,
                ids=ids,
                join=join,
                combine_attrs=combine_attrs,
            )
        elif combine == "by_coords":
            # Redo ordering from coordinates, ignoring how they were ordered
            # previously
            combined = combine_by_coords(
                datasets,
                compat=compat,
                data_vars=data_vars,
                coords=coords,
                join=join,
                combine_attrs=combine_attrs,
            )
        else:
            raise ValueError(
                "{} is an invalid option for the keyword argument"
                " ``combine``".format(combine)
            )
    except ValueError:
        for ds in datasets:
            ds.close()
        raise

    def multi_file_closer():
        for closer in closers:
            closer()

    combined.set_close(multi_file_closer)

    # read global attributes from the attrs_file or from the first dataset
    if attrs_file is not None:
        if isinstance(attrs_file, os.PathLike):
            attrs_file = os.fspath(attrs_file)
        combined.attrs = datasets[paths.index(attrs_file)].attrs

    return combined


WRITEABLE_STORES: Dict[str, Callable] = {
    "netcdf4": backends.NetCDF4DataStore.open,
    "scipy": backends.ScipyDataStore,
    "h5netcdf": backends.H5NetCDFStore.open,
}


def to_netcdf(
    dataset: Dataset,
    path_or_file=None,
    mode: str = "w",
    format: str = None,
    group: str = None,
    engine: str = None,
    encoding: Mapping = None,
    unlimited_dims: Iterable[Hashable] = None,
    compute: bool = True,
    multifile: bool = False,
    invalid_netcdf: bool = False,
) -> Union[Tuple[ArrayWriter, AbstractDataStore], bytes, "Delayed", None]:
    """This function creates an appropriate datastore for writing a dataset to
    disk as a netCDF file.

    See `Dataset.to_netcdf` for full API docs.

    The ``multifile`` argument is only for the private use of save_mfdataset.
    """
    if isinstance(path_or_file, os.PathLike):
        path_or_file = os.fspath(path_or_file)

    if encoding is None:
        encoding = {}

    if path_or_file is None:
        if engine is None:
            engine = "scipy"
        elif engine != "scipy":
            raise ValueError(
                "invalid engine for creating bytes with "
                f"to_netcdf: {engine!r}. Only the default engine "
                "or engine='scipy' is supported"
            )
        if not compute:
            raise NotImplementedError(
                "to_netcdf() with compute=False is not yet implemented when "
                "returning bytes"
            )
    elif isinstance(path_or_file, str):
        if engine is None:
            engine = _get_default_engine(path_or_file)
        path_or_file = _normalize_path(path_or_file)
    else:  # file-like object
        engine = "scipy"

    # validate Dataset keys, DataArray names, and attr keys/values
    _validate_dataset_names(dataset)
    _validate_attrs(dataset, invalid_netcdf=invalid_netcdf and engine == "h5netcdf")

    try:
        store_open = WRITEABLE_STORES[engine]
    except KeyError:
        raise ValueError(f"unrecognized engine for to_netcdf: {engine!r}")

    if format is not None:
        format = format.upper()

    # handle scheduler specific logic
    scheduler = _get_scheduler()
    have_chunks = any(v.chunks for v in dataset.variables.values())

    autoclose = have_chunks and scheduler in ["distributed", "multiprocessing"]
    if autoclose and engine == "scipy":
        raise NotImplementedError(
            f"Writing netCDF files with the {engine} backend "
            f"is not currently supported with dask's {scheduler} scheduler"
        )

    target = path_or_file if path_or_file is not None else BytesIO()
    kwargs = dict(autoclose=True) if autoclose else {}
    if invalid_netcdf:
        if engine == "h5netcdf":
            kwargs["invalid_netcdf"] = invalid_netcdf
        else:
            raise ValueError(
                f"unrecognized option 'invalid_netcdf' for engine {engine}"
            )
    store = store_open(target, mode, format, group, **kwargs)

    if unlimited_dims is None:
        unlimited_dims = dataset.encoding.get("unlimited_dims", None)
    if unlimited_dims is not None:
        if isinstance(unlimited_dims, str) or not isinstance(unlimited_dims, Iterable):
            unlimited_dims = [unlimited_dims]
        else:
            unlimited_dims = list(unlimited_dims)

    writer = ArrayWriter()

    # TODO: figure out how to refactor this logic (here and in save_mfdataset)
    # to avoid this mess of conditionals
    try:
        # TODO: allow this work (setting up the file for writing array data)
        # to be parallelized with dask
        dump_to_store(
            dataset, store, writer, encoding=encoding, unlimited_dims=unlimited_dims
        )
        if autoclose:
            store.close()

        if multifile:
            return writer, store

        writes = writer.sync(compute=compute)

        if path_or_file is None:
            store.sync()
            return target.getvalue()
    finally:
        if not multifile and compute:
            store.close()

    if not compute:
        import dask

        return dask.delayed(_finalize_store)(writes, store)
    return None


def dump_to_store(
    dataset, store, writer=None, encoder=None, encoding=None, unlimited_dims=None
):
    """Store dataset contents to a backends.*DataStore object."""
    if writer is None:
        writer = ArrayWriter()

    if encoding is None:
        encoding = {}

    variables, attrs = conventions.encode_dataset_coordinates(dataset)

    check_encoding = set()
    for k, enc in encoding.items():
        # no need to shallow copy the variable again; that already happened
        # in encode_dataset_coordinates
        variables[k].encoding = enc
        check_encoding.add(k)

    if encoder:
        variables, attrs = encoder(variables, attrs)

    store.store(variables, attrs, check_encoding, writer, unlimited_dims=unlimited_dims)


def save_mfdataset(
    datasets, paths, mode="w", format=None, groups=None, engine=None, compute=True
):
    """Write multiple datasets to disk as netCDF files simultaneously.

    This function is intended for use with datasets consisting of dask.array
    objects, in which case it can write the multiple datasets to disk
    simultaneously using a shared thread pool.

    When not using dask, it is no different than calling ``to_netcdf``
    repeatedly.

    Parameters
    ----------
    datasets : list of Dataset
        List of datasets to save.
    paths : list of str or list of path-like objects
        List of paths to which to save each corresponding dataset.
    mode : {"w", "a"}, optional
        Write ("w") or append ("a") mode. If mode="w", any existing file at
        these locations will be overwritten.
    format : {"NETCDF4", "NETCDF4_CLASSIC", "NETCDF3_64BIT", \
              "NETCDF3_CLASSIC"}, optional

        File format for the resulting netCDF file:

        * NETCDF4: Data is stored in an HDF5 file, using netCDF4 API
          features.
        * NETCDF4_CLASSIC: Data is stored in an HDF5 file, using only
          netCDF 3 compatible API features.
        * NETCDF3_64BIT: 64-bit offset version of the netCDF 3 file format,
          which fully supports 2+ GB files, but is only compatible with
          clients linked against netCDF version 3.6.0 or later.
        * NETCDF3_CLASSIC: The classic netCDF 3 file format. It does not
          handle 2+ GB files very well.

        All formats are supported by the netCDF4-python library.
        scipy.io.netcdf only supports the last two formats.

        The default format is NETCDF4 if you are saving a file to disk and
        have the netCDF4-python library available. Otherwise, xarray falls
        back to using scipy to write netCDF files and defaults to the
        NETCDF3_64BIT format (scipy does not support netCDF4).
    groups : list of str, optional
        Paths to the netCDF4 group in each corresponding file to which to save
        datasets (only works for format="NETCDF4"). The groups will be created
        if necessary.
    engine : {"netcdf4", "scipy", "h5netcdf"}, optional
        Engine to use when writing netCDF files. If not provided, the
        default engine is chosen based on available dependencies, with a
        preference for "netcdf4" if writing to a file on disk.
        See `Dataset.to_netcdf` for additional information.
    compute : bool
        If true compute immediately, otherwise return a
        ``dask.delayed.Delayed`` object that can be computed later.

    Examples
    --------

    Save a dataset into one netCDF per year of data:

    >>> ds = xr.Dataset(
    ...     {"a": ("time", np.linspace(0, 1, 48))},
    ...     coords={"time": pd.date_range("2010-01-01", freq="M", periods=48)},
    ... )
    >>> ds
    <xarray.Dataset>
    Dimensions:  (time: 48)
    Coordinates:
      * time     (time) datetime64[ns] 2010-01-31 2010-02-28 ... 2013-12-31
    Data variables:
        a        (time) float64 0.0 0.02128 0.04255 0.06383 ... 0.9574 0.9787 1.0
    >>> years, datasets = zip(*ds.groupby("time.year"))
    >>> paths = [f"{y}.nc" for y in years]
    >>> xr.save_mfdataset(datasets, paths)
    """
    if mode == "w" and len(set(paths)) < len(paths):
        raise ValueError(
            "cannot use mode='w' when writing multiple datasets to the same path"
        )

    for obj in datasets:
        if not isinstance(obj, Dataset):
            raise TypeError(
                "save_mfdataset only supports writing Dataset "
                f"objects, received type {type(obj)}"
            )

    if groups is None:
        groups = [None] * len(datasets)

    if len({len(datasets), len(paths), len(groups)}) > 1:
        raise ValueError(
            "must supply lists of the same length for the "
            "datasets, paths and groups arguments to "
            "save_mfdataset"
        )

    writers, stores = zip(
        *[
            to_netcdf(
                ds, path, mode, format, group, engine, compute=compute, multifile=True
            )
            for ds, path, group in zip(datasets, paths, groups)
        ]
    )

    try:
        writes = [w.sync(compute=compute) for w in writers]
    finally:
        if compute:
            for store in stores:
                store.close()

    if not compute:
        import dask

        return dask.delayed(
            [dask.delayed(_finalize_store)(w, s) for w, s in zip(writes, stores)]
        )


def _validate_region(ds, region):
    if not isinstance(region, dict):
        raise TypeError(f"``region`` must be a dict, got {type(region)}")

    for k, v in region.items():
        if k not in ds.dims:
            raise ValueError(
                f"all keys in ``region`` are not in Dataset dimensions, got "
                f"{list(region)} and {list(ds.dims)}"
            )
        if not isinstance(v, slice):
            raise TypeError(
                "all values in ``region`` must be slice objects, got "
                f"region={region}"
            )
        if v.step not in {1, None}:
            raise ValueError(
                "step on all slices in ``region`` must be 1 or None, got "
                f"region={region}"
            )

    non_matching_vars = [
        k for k, v in ds.variables.items() if not set(region).intersection(v.dims)
    ]
    if non_matching_vars:
        raise ValueError(
            f"when setting `region` explicitly in to_zarr(), all "
            f"variables in the dataset to write must have at least "
            f"one dimension in common with the region's dimensions "
            f"{list(region.keys())}, but that is not "
            f"the case for some variables here. To drop these variables "
            f"from this dataset before exporting to zarr, write: "
            f".drop({non_matching_vars!r})"
        )


def _validate_datatypes_for_zarr_append(dataset):
    """Each data variable must have a dtype that zarr can serialize and append to."""

    def check_dtype(var):
        if (
            not np.issubdtype(var.dtype, np.number)
            and not np.issubdtype(var.dtype, np.datetime64)
            and not np.issubdtype(var.dtype, np.bool_)
            and not coding.strings.is_unicode_dtype(var.dtype)
            and not var.dtype == object
        ):
            # and not re.match('^bytes[1-9]+$', var.dtype.name)):
            raise ValueError(
                "Invalid dtype for data variable: {} "
                "dtype must be a subtype of number, "
                "datetime, bool, a fixed sized string, "
                "a fixed size unicode string or an "
                "object".format(var)
            )

    for k in dataset.data_vars.values():
        check_dtype(k)


def to_zarr(
    dataset: Dataset,
    store: Union[MutableMapping, str, os.PathLike] = None,
    chunk_store=None,
    mode: str = None,
    synchronizer=None,
    group: str = None,
    encoding: Mapping = None,
    compute: bool = True,
    consolidated: Optional[bool] = None,
    append_dim: Hashable = None,
    region: Mapping[str, slice] = None,
    safe_chunks: bool = True,
    storage_options: Dict[str, str] = None,
):
    """This function creates an appropriate datastore for writing a dataset to
    a zarr store.

    See `Dataset.to_zarr` for full API docs.
    """

    # Load empty arrays to avoid bug saving zero length dimensions (Issue #5741)
    for v in dataset.variables.values():
        if v.size == 0:
            v.load()

    # expand str and path-like arguments
    store = _normalize_path(store)
    chunk_store = _normalize_path(chunk_store)

    if storage_options is None:
        mapper = store
        chunk_mapper = chunk_store
    else:
        from fsspec import get_mapper

        if not isinstance(store, str):
            raise ValueError(
                f"store must be a string to use storage_options. Got {type(store)}"
            )
        mapper = get_mapper(store, **storage_options)
        if chunk_store is not None:
            chunk_mapper = get_mapper(chunk_store, **storage_options)
        else:
            chunk_mapper = chunk_store

    if encoding is None:
        encoding = {}

    if mode is None:
        if append_dim is not None:
            mode = "a"
        elif region is not None:
            mode = "r+"
        else:
            mode = "w-"

    if mode != "a" and append_dim is not None:
        raise ValueError("cannot set append_dim unless mode='a' or mode=None")

    if mode not in ["a", "r+"] and region is not None:
        raise ValueError("cannot set region unless mode='a', mode='r+' or mode=None")

    if mode not in ["w", "w-", "a", "r+"]:
        raise ValueError(
            "The only supported options for mode are 'w', "
            f"'w-', 'a' and 'r+', but mode={mode!r}"
        )

    # validate Dataset keys, DataArray names, and attr keys/values
    _validate_dataset_names(dataset)
    _validate_attrs(dataset)

    if region is not None:
        _validate_region(dataset, region)
        if append_dim is not None and append_dim in region:
            raise ValueError(
                f"cannot list the same dimension in both ``append_dim`` and "
                f"``region`` with to_zarr(), got {append_dim} in both"
            )

    if mode == "r+":
        already_consolidated = consolidated
        consolidate_on_close = False
    else:
        already_consolidated = False
        consolidate_on_close = consolidated or consolidated is None
    zstore = backends.ZarrStore.open_group(
        store=mapper,
        mode=mode,
        synchronizer=synchronizer,
        group=group,
        consolidated=already_consolidated,
        consolidate_on_close=consolidate_on_close,
        chunk_store=chunk_mapper,
        append_dim=append_dim,
        write_region=region,
        safe_chunks=safe_chunks,
        stacklevel=4,  # for Dataset.to_zarr()
    )

    if mode in ["a", "r+"]:
        _validate_datatypes_for_zarr_append(dataset)
        if append_dim is not None:
            existing_dims = zstore.get_dimensions()
            if append_dim not in existing_dims:
                raise ValueError(
                    f"append_dim={append_dim!r} does not match any existing "
                    f"dataset dimensions {existing_dims}"
                )
        existing_var_names = set(zstore.zarr_group.array_keys())
        for var_name in existing_var_names:
            if var_name in encoding.keys():
                raise ValueError(
                    f"variable {var_name!r} already exists, but encoding was provided"
                )
        if mode == "r+":
            new_names = [k for k in dataset.variables if k not in existing_var_names]
            if new_names:
                raise ValueError(
                    f"dataset contains non-pre-existing variables {new_names}, "
                    "which is not allowed in ``xarray.Dataset.to_zarr()`` with "
                    "mode='r+'. To allow writing new variables, set mode='a'."
                )

    writer = ArrayWriter()
    # TODO: figure out how to properly handle unlimited_dims
    dump_to_store(dataset, zstore, writer, encoding=encoding)
    writes = writer.sync(compute=compute)

    if compute:
        _finalize_store(writes, zstore)
    else:
        import dask

        return dask.delayed(_finalize_store)(writes, zstore)

    return zstore
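

# ----------------------------------------------------------------------------
# Minimal usage sketch (illustrative only; not part of the public API above).
# It exercises save_mfdataset, open_dataset and open_mfdataset as defined in
# this module. The file names "example_0.nc" / "example_1.nc" are hypothetical,
# and the sketch assumes a netCDF backend (netCDF4 or scipy) plus dask are
# installed in the environment.
# ----------------------------------------------------------------------------
if __name__ == "__main__":
    # Two small datasets covering disjoint ranges of the "x" coordinate, so
    # combine_by_coords can concatenate them along "x" when reading back.
    ds0 = Dataset({"a": ("x", np.arange(0, 10))}, coords={"x": np.arange(0, 10)})
    ds1 = Dataset({"a": ("x", np.arange(10, 20))}, coords={"x": np.arange(10, 20)})

    example_paths = ["example_0.nc", "example_1.nc"]  # hypothetical output paths
    save_mfdataset([ds0, ds1], example_paths)

    # Read a single file eagerly ...
    with open_dataset(example_paths[0]) as single:
        print(single)

    # ... and both files lazily as one combined dataset (requires dask).
    with open_mfdataset(example_paths, combine="by_coords") as combined:
        print(combined["a"].mean().values)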