1import os
2from glob import glob
3from io import BytesIO
4from numbers import Number
5from typing import (
6    TYPE_CHECKING,
7    Callable,
8    Dict,
9    Hashable,
10    Iterable,
11    Mapping,
12    MutableMapping,
13    Optional,
14    Tuple,
15    Union,
16)
17
18import numpy as np
19
20from .. import backends, coding, conventions
21from ..core import indexing
22from ..core.combine import (
23    _infer_concat_order_from_positions,
24    _nested_combine,
25    combine_by_coords,
26)
27from ..core.dataarray import DataArray
28from ..core.dataset import Dataset, _get_chunk, _maybe_chunk
29from ..core.utils import is_remote_uri
30from . import plugins
31from .common import AbstractDataStore, ArrayWriter, _normalize_path
32from .locks import _get_scheduler
33
34if TYPE_CHECKING:
35    try:
36        from dask.delayed import Delayed
37    except ImportError:
38        Delayed = None
39
40
41DATAARRAY_NAME = "__xarray_dataarray_name__"
42DATAARRAY_VARIABLE = "__xarray_dataarray_variable__"
43
44ENGINES = {
45    "netcdf4": backends.NetCDF4DataStore.open,
46    "scipy": backends.ScipyDataStore,
47    "pydap": backends.PydapDataStore.open,
48    "h5netcdf": backends.H5NetCDFStore.open,
49    "pynio": backends.NioDataStore,
50    "pseudonetcdf": backends.PseudoNetCDFDataStore.open,
51    "cfgrib": backends.CfGribDataStore,
52    "zarr": backends.ZarrStore.open_group,
53}
54
55
56def _get_default_engine_remote_uri():
57    try:
58        import netCDF4  # noqa: F401
59
60        engine = "netcdf4"
61    except ImportError:  # pragma: no cover
62        try:
63            import pydap  # noqa: F401
64
65            engine = "pydap"
66        except ImportError:
67            raise ValueError(
68                "netCDF4 or pydap is required for accessing "
69                "remote datasets via OPeNDAP"
70            )
71    return engine
72
73
74def _get_default_engine_gz():
75    try:
76        import scipy  # noqa: F401
77
78        engine = "scipy"
79    except ImportError:  # pragma: no cover
80        raise ValueError("scipy is required for accessing .gz files")
81    return engine
82
83
84def _get_default_engine_netcdf():
85    try:
86        import netCDF4  # noqa: F401
87
88        engine = "netcdf4"
89    except ImportError:  # pragma: no cover
90        try:
91            import scipy.io.netcdf  # noqa: F401
92
93            engine = "scipy"
94        except ImportError:
95            raise ValueError(
96                "cannot read or write netCDF files without "
97                "netCDF4-python or scipy installed"
98            )
99    return engine
100
101
102def _get_default_engine(path: str, allow_remote: bool = False):
103    if allow_remote and is_remote_uri(path):
104        return _get_default_engine_remote_uri()
105    elif path.endswith(".gz"):
106        return _get_default_engine_gz()
107    else:
108        return _get_default_engine_netcdf()
109
110
111def _validate_dataset_names(dataset):
112    """DataArray.name and Dataset keys must be a string or None"""
113
114    def check_name(name):
115        if isinstance(name, str):
116            if not name:
117                raise ValueError(
118                    f"Invalid name {name!r} for DataArray or Dataset key: "
119                    "string must be length 1 or greater for "
120                    "serialization to netCDF files"
121                )
122        elif name is not None:
123            raise TypeError(
124                f"Invalid name {name!r} for DataArray or Dataset key: "
125                "must be either a string or None for serialization to netCDF "
126                "files"
127            )
128
129    for k in dataset.variables:
130        check_name(k)
131
132
133def _validate_attrs(dataset, invalid_netcdf=False):
134    """`attrs` must have a string key and a value which is either: a number,
135    a string, an ndarray, a list/tuple of numbers/strings, or a numpy.bool_.
136
137    Notes
138    -----
139    A numpy.bool_ is only allowed when using the h5netcdf engine with
140    `invalid_netcdf=True`.
141    """
142
143    valid_types = (str, Number, np.ndarray, np.number, list, tuple)
144    if invalid_netcdf:
145        valid_types += (np.bool_,)
146
147    def check_attr(name, value, valid_types):
148        if isinstance(name, str):
149            if not name:
150                raise ValueError(
151                    f"Invalid name for attr {name!r}: string must be "
152                    "length 1 or greater for serialization to "
153                    "netCDF files"
154                )
155        else:
156            raise TypeError(
157                f"Invalid name for attr: {name!r} must be a string for "
158                "serialization to netCDF files"
159            )
160
161        if not isinstance(value, valid_types):
162            raise TypeError(
163                f"Invalid value for attr {name!r}: {value!r}. For serialization to "
164                "netCDF files, its value must be of one of the following types: "
165                f"{', '.join([vtype.__name__ for vtype in valid_types])}"
166            )
167
168    # Check attrs on the dataset itself
169    for k, v in dataset.attrs.items():
170        check_attr(k, v, valid_types)
171
172    # Check attrs on each variable within the dataset
173    for variable in dataset.variables.values():
174        for k, v in variable.attrs.items():
175            check_attr(k, v, valid_types)
176
177
178def _resolve_decoders_kwargs(decode_cf, open_backend_dataset_parameters, **decoders):
179    for d in list(decoders):
180        if decode_cf is False and d in open_backend_dataset_parameters:
181            decoders[d] = False
182        if decoders[d] is None:
183            decoders.pop(d)
184    return decoders
185
186
187def _get_mtime(filename_or_obj):
188    # if passed an actual file path, augment the token with
189    # the file modification time
190    mtime = None
191
192    try:
193        path = os.fspath(filename_or_obj)
194    except TypeError:
195        path = None
196
197    if path and not is_remote_uri(path):
198        mtime = os.path.getmtime(filename_or_obj)
199
200    return mtime
201
202
203def _protect_dataset_variables_inplace(dataset, cache):
204    for name, variable in dataset.variables.items():
205        if name not in variable.dims:
206            # no need to protect IndexVariable objects
207            data = indexing.CopyOnWriteArray(variable._data)
208            if cache:
209                data = indexing.MemoryCachedArray(data)
210            variable.data = data
211
212
213def _finalize_store(write, store):
214    """Finalize this store by explicitly syncing and closing"""
215    del write  # ensure writing is done first
216    store.close()
217
218
219def load_dataset(filename_or_obj, **kwargs):
220    """Open, load into memory, and close a Dataset from a file or file-like
221    object.
222
223    This is a thin wrapper around :py:meth:`~xarray.open_dataset`. It differs
224    from `open_dataset` in that it loads the Dataset into memory, closes the
225    file, and returns the Dataset. In contrast, `open_dataset` keeps the file
226    handle open and lazy loads its contents. All parameters are passed directly
227    to `open_dataset`. See that documentation for further details.
228
229    Returns
230    -------
231    dataset : Dataset
232        The newly created Dataset.
233
234    See Also
235    --------
236    open_dataset
237    """
238    if "cache" in kwargs:
239        raise TypeError("cache has no effect in this context")
240
241    with open_dataset(filename_or_obj, **kwargs) as ds:
242        return ds.load()
243
244
245def load_dataarray(filename_or_obj, **kwargs):
246    """Open, load into memory, and close a DataArray from a file or file-like
247    object containing a single data variable.
248
249    This is a thin wrapper around :py:meth:`~xarray.open_dataarray`. It differs
250    from `open_dataarray` in that it loads the Dataset into memory, closes the
251    file, and returns the Dataset. In contrast, `open_dataarray` keeps the file
252    handle open and lazy loads its contents. All parameters are passed directly
253    to `open_dataarray`. See that documentation for further details.
254
255    Returns
256    -------
257    datarray : DataArray
258        The newly created DataArray.
259
260    See Also
261    --------
262    open_dataarray
263    """
264    if "cache" in kwargs:
265        raise TypeError("cache has no effect in this context")
266
267    with open_dataarray(filename_or_obj, **kwargs) as da:
268        return da.load()
269
270
271def _chunk_ds(
272    backend_ds,
273    filename_or_obj,
274    engine,
275    chunks,
276    overwrite_encoded_chunks,
277    **extra_tokens,
278):
279    from dask.base import tokenize
280
281    mtime = _get_mtime(filename_or_obj)
282    token = tokenize(filename_or_obj, mtime, engine, chunks, **extra_tokens)
283    name_prefix = f"open_dataset-{token}"
284
285    variables = {}
286    for name, var in backend_ds.variables.items():
287        var_chunks = _get_chunk(var, chunks)
288        variables[name] = _maybe_chunk(
289            name,
290            var,
291            var_chunks,
292            overwrite_encoded_chunks=overwrite_encoded_chunks,
293            name_prefix=name_prefix,
294            token=token,
295        )
296    return backend_ds._replace(variables)
297
298
299def _dataset_from_backend_dataset(
300    backend_ds,
301    filename_or_obj,
302    engine,
303    chunks,
304    cache,
305    overwrite_encoded_chunks,
306    **extra_tokens,
307):
308    if not isinstance(chunks, (int, dict)) and chunks not in {None, "auto"}:
309        raise ValueError(
310            f"chunks must be an int, dict, 'auto', or None. Instead found {chunks}."
311        )
312
313    _protect_dataset_variables_inplace(backend_ds, cache)
314    if chunks is None:
315        ds = backend_ds
316    else:
317        ds = _chunk_ds(
318            backend_ds,
319            filename_or_obj,
320            engine,
321            chunks,
322            overwrite_encoded_chunks,
323            **extra_tokens,
324        )
325
326    ds.set_close(backend_ds._close)
327
328    # Ensure source filename always stored in dataset object (GH issue #2550)
329    if "source" not in ds.encoding and isinstance(filename_or_obj, str):
330        ds.encoding["source"] = filename_or_obj
331
332    return ds
333
334
335def open_dataset(
336    filename_or_obj,
337    *args,
338    engine=None,
339    chunks=None,
340    cache=None,
341    decode_cf=None,
342    mask_and_scale=None,
343    decode_times=None,
344    decode_timedelta=None,
345    use_cftime=None,
346    concat_characters=None,
347    decode_coords=None,
348    drop_variables=None,
349    backend_kwargs=None,
350    **kwargs,
351):
352    """Open and decode a dataset from a file or file-like object.
353
354    Parameters
355    ----------
356    filename_or_obj : str, Path, file-like or DataStore
357        Strings and Path objects are interpreted as a path to a netCDF file
358        or an OpenDAP URL and opened with python-netCDF4, unless the filename
359        ends with .gz, in which case the file is gunzipped and opened with
360        scipy.io.netcdf (only netCDF3 supported). Byte-strings or file-like
361        objects are opened by scipy.io.netcdf (netCDF3) or h5py (netCDF4/HDF).
362    engine : {"netcdf4", "scipy", "pydap", "h5netcdf", "pynio", "cfgrib", \
363        "pseudonetcdf", "zarr"} or subclass of xarray.backends.BackendEntrypoint, optional
364        Engine to use when reading files. If not provided, the default engine
365        is chosen based on available dependencies, with a preference for
366        "netcdf4". A custom backend class (a subclass of ``BackendEntrypoint``)
367        can also be used.
368    chunks : int or dict, optional
369        If chunks is provided, it is used to load the new dataset into dask
370        arrays. ``chunks=-1`` loads the dataset with dask using a single
371        chunk for all arrays. ``chunks={}`` loads the dataset with dask using
372        engine preferred chunks if exposed by the backend, otherwise with
373        a single chunk for all arrays.
374        ``chunks='auto'`` will use dask ``auto`` chunking taking into account the
375        engine preferred chunks. See dask chunking for more details.
376    cache : bool, optional
377        If True, cache data loaded from the underlying datastore in memory as
378        NumPy arrays when accessed to avoid reading from the underlying data-
379        store multiple times. Defaults to True unless you specify the `chunks`
380        argument to use dask, in which case it defaults to False. Does not
381        change the behavior of coordinates corresponding to dimensions, which
382        always load their data from disk into a ``pandas.Index``.
383    decode_cf : bool, optional
384        Whether to decode these variables, assuming they were saved according
385        to CF conventions.
386    mask_and_scale : bool, optional
387        If True, replace array values equal to `_FillValue` with NA and scale
388        values according to the formula `original_values * scale_factor +
389        add_offset`, where `_FillValue`, `scale_factor` and `add_offset` are
390        taken from variable attributes (if they exist).  If the `_FillValue` or
391        `missing_value` attribute contains multiple values a warning will be
392        issued and all array values matching one of the multiple values will
393        be replaced by NA. mask_and_scale defaults to True except for the
394        pseudonetcdf backend. This keyword may not be supported by all the backends.
395    decode_times : bool, optional
396        If True, decode times encoded in the standard NetCDF datetime format
397        into datetime objects. Otherwise, leave them encoded as numbers.
398        This keyword may not be supported by all the backends.
399    decode_timedelta : bool, optional
400        If True, decode variables and coordinates with time units in
401        {"days", "hours", "minutes", "seconds", "milliseconds", "microseconds"}
402        into timedelta objects. If False, leave them encoded as numbers.
403        If None (default), assume the same value of decode_time.
404        This keyword may not be supported by all the backends.
405    use_cftime: bool, optional
406        Only relevant if encoded dates come from a standard calendar
407        (e.g. "gregorian", "proleptic_gregorian", "standard", or not
408        specified).  If None (default), attempt to decode times to
409        ``np.datetime64[ns]`` objects; if this is not possible, decode times to
410        ``cftime.datetime`` objects. If True, always decode times to
411        ``cftime.datetime`` objects, regardless of whether or not they can be
412        represented using ``np.datetime64[ns]`` objects.  If False, always
413        decode times to ``np.datetime64[ns]`` objects; if this is not possible
414        raise an error. This keyword may not be supported by all the backends.
415    concat_characters : bool, optional
416        If True, concatenate along the last dimension of character arrays to
417        form string arrays. Dimensions will only be concatenated over (and
418        removed) if they have no corresponding variable and if they are only
419        used as the last dimension of character arrays.
420        This keyword may not be supported by all the backends.
421    decode_coords : bool or {"coordinates", "all"}, optional
422        Controls which variables are set as coordinate variables:
423
424        - "coordinates" or True: Set variables referred to in the
425          ``'coordinates'`` attribute of the datasets or individual variables
426          as coordinate variables.
427        - "all": Set variables referred to in  ``'grid_mapping'``, ``'bounds'`` and
428          other attributes as coordinate variables.
429    drop_variables: str or iterable, optional
430        A variable or list of variables to exclude from being parsed from the
431        dataset. This may be useful to drop variables with problems or
432        inconsistent values.
433    backend_kwargs: dict
434        Additional keyword arguments passed on to the engine open function,
435        equivalent to `**kwargs`.
436    **kwargs: dict
437        Additional keyword arguments passed on to the engine open function.
438        For example:
439
440        - 'group': path to the netCDF4 group in the given file to open given as
441          a str,supported by "netcdf4", "h5netcdf", "zarr".
442        - 'lock': resource lock to use when reading data from disk. Only
443          relevant when using dask or another form of parallelism. By default,
444          appropriate locks are chosen to safely read and write files with the
445          currently active dask scheduler. Supported by "netcdf4", "h5netcdf",
446          "scipy", "pynio", "pseudonetcdf", "cfgrib".
447
448        See engine open function for kwargs accepted by each specific engine.
449
450    Returns
451    -------
452    dataset : Dataset
453        The newly created dataset.
454
455    Notes
456    -----
457    ``open_dataset`` opens the file with read-only access. When you modify
458    values of a Dataset, even one linked to files on disk, only the in-memory
459    copy you are manipulating in xarray is modified: the original file on disk
460    is never touched.
461
462    See Also
463    --------
464    open_mfdataset
465    """
466    if len(args) > 0:
467        raise TypeError(
468            "open_dataset() takes only 1 positional argument starting from version 0.18.0, "
469            "all other options must be passed as keyword arguments"
470        )
471
472    if cache is None:
473        cache = chunks is None
474
475    if backend_kwargs is not None:
476        kwargs.update(backend_kwargs)
477
478    if engine is None:
479        engine = plugins.guess_engine(filename_or_obj)
480
481    backend = plugins.get_backend(engine)
482
483    decoders = _resolve_decoders_kwargs(
484        decode_cf,
485        open_backend_dataset_parameters=backend.open_dataset_parameters,
486        mask_and_scale=mask_and_scale,
487        decode_times=decode_times,
488        decode_timedelta=decode_timedelta,
489        concat_characters=concat_characters,
490        use_cftime=use_cftime,
491        decode_coords=decode_coords,
492    )
493
494    overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
495    backend_ds = backend.open_dataset(
496        filename_or_obj,
497        drop_variables=drop_variables,
498        **decoders,
499        **kwargs,
500    )
501    ds = _dataset_from_backend_dataset(
502        backend_ds,
503        filename_or_obj,
504        engine,
505        chunks,
506        cache,
507        overwrite_encoded_chunks,
508        drop_variables=drop_variables,
509        **decoders,
510        **kwargs,
511    )
512    return ds
513
514
515def open_dataarray(
516    filename_or_obj,
517    *args,
518    engine=None,
519    chunks=None,
520    cache=None,
521    decode_cf=None,
522    mask_and_scale=None,
523    decode_times=None,
524    decode_timedelta=None,
525    use_cftime=None,
526    concat_characters=None,
527    decode_coords=None,
528    drop_variables=None,
529    backend_kwargs=None,
530    **kwargs,
531):
532    """Open an DataArray from a file or file-like object containing a single
533    data variable.
534
535    This is designed to read netCDF files with only one data variable. If
536    multiple variables are present then a ValueError is raised.
537
538    Parameters
539    ----------
540    filename_or_obj : str, Path, file-like or DataStore
541        Strings and Path objects are interpreted as a path to a netCDF file
542        or an OpenDAP URL and opened with python-netCDF4, unless the filename
543        ends with .gz, in which case the file is gunzipped and opened with
544        scipy.io.netcdf (only netCDF3 supported). Byte-strings or file-like
545        objects are opened by scipy.io.netcdf (netCDF3) or h5py (netCDF4/HDF).
546    engine : {"netcdf4", "scipy", "pydap", "h5netcdf", "pynio", "cfgrib", \
547        "pseudonetcdf", "zarr"}, optional
548        Engine to use when reading files. If not provided, the default engine
549        is chosen based on available dependencies, with a preference for
550        "netcdf4".
551    chunks : int or dict, optional
552        If chunks is provided, it is used to load the new dataset into dask
553        arrays. ``chunks=-1`` loads the dataset with dask using a single
554        chunk for all arrays. `chunks={}`` loads the dataset with dask using
555        engine preferred chunks if exposed by the backend, otherwise with
556        a single chunk for all arrays.
557        ``chunks='auto'`` will use dask ``auto`` chunking taking into account the
558        engine preferred chunks. See dask chunking for more details.
559    cache : bool, optional
560        If True, cache data loaded from the underlying datastore in memory as
561        NumPy arrays when accessed to avoid reading from the underlying data-
562        store multiple times. Defaults to True unless you specify the `chunks`
563        argument to use dask, in which case it defaults to False. Does not
564        change the behavior of coordinates corresponding to dimensions, which
565        always load their data from disk into a ``pandas.Index``.
566    decode_cf : bool, optional
567        Whether to decode these variables, assuming they were saved according
568        to CF conventions.
569    mask_and_scale : bool, optional
570        If True, replace array values equal to `_FillValue` with NA and scale
571        values according to the formula `original_values * scale_factor +
572        add_offset`, where `_FillValue`, `scale_factor` and `add_offset` are
573        taken from variable attributes (if they exist).  If the `_FillValue` or
574        `missing_value` attribute contains multiple values a warning will be
575        issued and all array values matching one of the multiple values will
576        be replaced by NA. mask_and_scale defaults to True except for the
577        pseudonetcdf backend. This keyword may not be supported by all the backends.
578    decode_times : bool, optional
579        If True, decode times encoded in the standard NetCDF datetime format
580        into datetime objects. Otherwise, leave them encoded as numbers.
581        This keyword may not be supported by all the backends.
582    decode_timedelta : bool, optional
583        If True, decode variables and coordinates with time units in
584        {"days", "hours", "minutes", "seconds", "milliseconds", "microseconds"}
585        into timedelta objects. If False, leave them encoded as numbers.
586        If None (default), assume the same value of decode_time.
587        This keyword may not be supported by all the backends.
588    use_cftime: bool, optional
589        Only relevant if encoded dates come from a standard calendar
590        (e.g. "gregorian", "proleptic_gregorian", "standard", or not
591        specified).  If None (default), attempt to decode times to
592        ``np.datetime64[ns]`` objects; if this is not possible, decode times to
593        ``cftime.datetime`` objects. If True, always decode times to
594        ``cftime.datetime`` objects, regardless of whether or not they can be
595        represented using ``np.datetime64[ns]`` objects.  If False, always
596        decode times to ``np.datetime64[ns]`` objects; if this is not possible
597        raise an error. This keyword may not be supported by all the backends.
598    concat_characters : bool, optional
599        If True, concatenate along the last dimension of character arrays to
600        form string arrays. Dimensions will only be concatenated over (and
601        removed) if they have no corresponding variable and if they are only
602        used as the last dimension of character arrays.
603        This keyword may not be supported by all the backends.
604    decode_coords : bool or {"coordinates", "all"}, optional
605        Controls which variables are set as coordinate variables:
606
607        - "coordinates" or True: Set variables referred to in the
608          ``'coordinates'`` attribute of the datasets or individual variables
609          as coordinate variables.
610        - "all": Set variables referred to in  ``'grid_mapping'``, ``'bounds'`` and
611          other attributes as coordinate variables.
612    drop_variables: str or iterable, optional
613        A variable or list of variables to exclude from being parsed from the
614        dataset. This may be useful to drop variables with problems or
615        inconsistent values.
616    backend_kwargs: dict
617        Additional keyword arguments passed on to the engine open function,
618        equivalent to `**kwargs`.
619    **kwargs: dict
620        Additional keyword arguments passed on to the engine open function.
621        For example:
622
623        - 'group': path to the netCDF4 group in the given file to open given as
624          a str,supported by "netcdf4", "h5netcdf", "zarr".
625        - 'lock': resource lock to use when reading data from disk. Only
626          relevant when using dask or another form of parallelism. By default,
627          appropriate locks are chosen to safely read and write files with the
628          currently active dask scheduler. Supported by "netcdf4", "h5netcdf",
629          "scipy", "pynio", "pseudonetcdf", "cfgrib".
630
631        See engine open function for kwargs accepted by each specific engine.
632
633    Notes
634    -----
635    This is designed to be fully compatible with `DataArray.to_netcdf`. Saving
636    using `DataArray.to_netcdf` and then loading with this function will
637    produce an identical result.
638
639    All parameters are passed directly to `xarray.open_dataset`. See that
640    documentation for further details.
641
642    See also
643    --------
644    open_dataset
645    """
646    if len(args) > 0:
647        raise TypeError(
648            "open_dataarray() takes only 1 positional argument starting from version 0.18.0, "
649            "all other options must be passed as keyword arguments"
650        )
651
652    dataset = open_dataset(
653        filename_or_obj,
654        decode_cf=decode_cf,
655        mask_and_scale=mask_and_scale,
656        decode_times=decode_times,
657        concat_characters=concat_characters,
658        decode_coords=decode_coords,
659        engine=engine,
660        chunks=chunks,
661        cache=cache,
662        drop_variables=drop_variables,
663        backend_kwargs=backend_kwargs,
664        use_cftime=use_cftime,
665        decode_timedelta=decode_timedelta,
666        **kwargs,
667    )
668
669    if len(dataset.data_vars) != 1:
670        raise ValueError(
671            "Given file dataset contains more than one data "
672            "variable. Please read with xarray.open_dataset and "
673            "then select the variable you want."
674        )
675    else:
676        (data_array,) = dataset.data_vars.values()
677
678    data_array.set_close(dataset._close)
679
680    # Reset names if they were changed during saving
681    # to ensure that we can 'roundtrip' perfectly
682    if DATAARRAY_NAME in dataset.attrs:
683        data_array.name = dataset.attrs[DATAARRAY_NAME]
684        del dataset.attrs[DATAARRAY_NAME]
685
686    if data_array.name == DATAARRAY_VARIABLE:
687        data_array.name = None
688
689    return data_array
690
691
692def open_mfdataset(
693    paths,
694    chunks=None,
695    concat_dim=None,
696    compat="no_conflicts",
697    preprocess=None,
698    engine=None,
699    data_vars="all",
700    coords="different",
701    combine="by_coords",
702    parallel=False,
703    join="outer",
704    attrs_file=None,
705    combine_attrs="override",
706    **kwargs,
707):
708    """Open multiple files as a single dataset.
709
710    If combine='by_coords' then the function ``combine_by_coords`` is used to combine
711    the datasets into one before returning the result, and if combine='nested' then
712    ``combine_nested`` is used. The filepaths must be structured according to which
713    combining function is used, the details of which are given in the documentation for
714    ``combine_by_coords`` and ``combine_nested``. By default ``combine='by_coords'``
715    will be used. Requires dask to be installed. See documentation for
716    details on dask [1]_. Global attributes from the ``attrs_file`` are used
717    for the combined dataset.
718
719    Parameters
720    ----------
721    paths : str or sequence
722        Either a string glob in the form ``"path/to/my/files/*.nc"`` or an explicit list of
723        files to open. Paths can be given as strings or as pathlib Paths. If
724        concatenation along more than one dimension is desired, then ``paths`` must be a
725        nested list-of-lists (see ``combine_nested`` for details). (A string glob will
726        be expanded to a 1-dimensional list.)
727    chunks : int or dict, optional
728        Dictionary with keys given by dimension names and values given by chunk sizes.
729        In general, these should divide the dimensions of each dataset. If int, chunk
730        each dimension by ``chunks``. By default, chunks will be chosen to load entire
731        input files into memory at once. This has a major impact on performance: please
732        see the full documentation for more details [2]_.
733    concat_dim : str, or list of str, DataArray, Index or None, optional
734        Dimensions to concatenate files along.  You only need to provide this argument
735        if ``combine='nested'``, and if any of the dimensions along which you want to
736        concatenate is not a dimension in the original datasets, e.g., if you want to
737        stack a collection of 2D arrays along a third dimension. Set
738        ``concat_dim=[..., None, ...]`` explicitly to disable concatenation along a
739        particular dimension. Default is None, which for a 1D list of filepaths is
740        equivalent to opening the files separately and then merging them with
741        ``xarray.merge``.
742    combine : {"by_coords", "nested"}, optional
743        Whether ``xarray.combine_by_coords`` or ``xarray.combine_nested`` is used to
744        combine all the data. Default is to use ``xarray.combine_by_coords``.
745    compat : {"identical", "equals", "broadcast_equals", \
746              "no_conflicts", "override"}, optional
747        String indicating how to compare variables of the same name for
748        potential conflicts when merging:
749
750         * "broadcast_equals": all values must be equal when variables are
751           broadcast against each other to ensure common dimensions.
752         * "equals": all values and dimensions must be the same.
753         * "identical": all values, dimensions and attributes must be the
754           same.
755         * "no_conflicts": only values which are not null in both datasets
756           must be equal. The returned dataset then contains the combination
757           of all non-null values.
758         * "override": skip comparing and pick variable from first dataset
759
760    preprocess : callable, optional
761        If provided, call this function on each dataset prior to concatenation.
762        You can find the file-name from which each dataset was loaded in
763        ``ds.encoding["source"]``.
764    engine : {"netcdf4", "scipy", "pydap", "h5netcdf", "pynio", "cfgrib", "zarr"}, \
765        optional
766        Engine to use when reading files. If not provided, the default engine
767        is chosen based on available dependencies, with a preference for
768        "netcdf4".
769    data_vars : {"minimal", "different", "all"} or list of str, optional
770        These data variables will be concatenated together:
771          * "minimal": Only data variables in which the dimension already
772            appears are included.
773          * "different": Data variables which are not equal (ignoring
774            attributes) across all datasets are also concatenated (as well as
775            all for which dimension already appears). Beware: this option may
776            load the data payload of data variables into memory if they are not
777            already loaded.
778          * "all": All data variables will be concatenated.
779          * list of str: The listed data variables will be concatenated, in
780            addition to the "minimal" data variables.
781    coords : {"minimal", "different", "all"} or list of str, optional
782        These coordinate variables will be concatenated together:
783         * "minimal": Only coordinates in which the dimension already appears
784           are included.
785         * "different": Coordinates which are not equal (ignoring attributes)
786           across all datasets are also concatenated (as well as all for which
787           dimension already appears). Beware: this option may load the data
788           payload of coordinate variables into memory if they are not already
789           loaded.
790         * "all": All coordinate variables will be concatenated, except
791           those corresponding to other dimensions.
792         * list of str: The listed coordinate variables will be concatenated,
793           in addition the "minimal" coordinates.
794    parallel : bool, optional
795        If True, the open and preprocess steps of this function will be
796        performed in parallel using ``dask.delayed``. Default is False.
797    join : {"outer", "inner", "left", "right", "exact, "override"}, optional
798        String indicating how to combine differing indexes
799        (excluding concat_dim) in objects
800
801        - "outer": use the union of object indexes
802        - "inner": use the intersection of object indexes
803        - "left": use indexes from the first object with each dimension
804        - "right": use indexes from the last object with each dimension
805        - "exact": instead of aligning, raise `ValueError` when indexes to be
806          aligned are not equal
807        - "override": if indexes are of same size, rewrite indexes to be
808          those of the first object with that dimension. Indexes for the same
809          dimension must have the same size in all objects.
810    attrs_file : str or path-like, optional
811        Path of the file used to read global attributes from.
812        By default global attributes are read from the first file provided,
813        with wildcard matches sorted by filename.
814    **kwargs : optional
815        Additional arguments passed on to :py:func:`xarray.open_dataset`.
816
817    Returns
818    -------
819    xarray.Dataset
820
821    Notes
822    -----
823    ``open_mfdataset`` opens files with read-only access. When you modify values
824    of a Dataset, even one linked to files on disk, only the in-memory copy you
825    are manipulating in xarray is modified: the original file on disk is never
826    touched.
827
828    See Also
829    --------
830    combine_by_coords
831    combine_nested
832    open_dataset
833
834    References
835    ----------
836
837    .. [1] http://xarray.pydata.org/en/stable/dask.html
838    .. [2] http://xarray.pydata.org/en/stable/dask.html#chunking-and-performance
839    """
840    if isinstance(paths, str):
841        if is_remote_uri(paths) and engine == "zarr":
842            try:
843                from fsspec.core import get_fs_token_paths
844            except ImportError as e:
845                raise ImportError(
846                    "The use of remote URLs for opening zarr requires the package fsspec"
847                ) from e
848
849            fs, _, _ = get_fs_token_paths(
850                paths,
851                mode="rb",
852                storage_options=kwargs.get("backend_kwargs", {}).get(
853                    "storage_options", {}
854                ),
855                expand=False,
856            )
857            paths = fs.glob(fs._strip_protocol(paths))  # finds directories
858            paths = [fs.get_mapper(path) for path in paths]
859        elif is_remote_uri(paths):
860            raise ValueError(
861                "cannot do wild-card matching for paths that are remote URLs "
862                f"unless engine='zarr' is specified. Got paths: {paths}. "
863                "Instead, supply paths as an explicit list of strings."
864            )
865        else:
866            paths = sorted(glob(_normalize_path(paths)))
867    elif isinstance(paths, os.PathLike):
868        paths = [os.fspath(paths)]
869    else:
870        paths = [os.fspath(p) if isinstance(p, os.PathLike) else p for p in paths]
871
872    if not paths:
873        raise OSError("no files to open")
874
875    if combine == "nested":
876        if isinstance(concat_dim, (str, DataArray)) or concat_dim is None:
877            concat_dim = [concat_dim]
878
879        # This creates a flat list which is easier to iterate over, whilst
880        # encoding the originally-supplied structure as "ids".
881        # The "ids" are not used at all if combine='by_coords`.
882        combined_ids_paths = _infer_concat_order_from_positions(paths)
883        ids, paths = (
884            list(combined_ids_paths.keys()),
885            list(combined_ids_paths.values()),
886        )
887    elif combine == "by_coords" and concat_dim is not None:
888        raise ValueError(
889            "When combine='by_coords', passing a value for `concat_dim` has no "
890            "effect. To manually combine along a specific dimension you should "
891            "instead specify combine='nested' along with a value for `concat_dim`.",
892        )
893
894    open_kwargs = dict(engine=engine, chunks=chunks or {}, **kwargs)
895
896    if parallel:
897        import dask
898
899        # wrap the open_dataset, getattr, and preprocess with delayed
900        open_ = dask.delayed(open_dataset)
901        getattr_ = dask.delayed(getattr)
902        if preprocess is not None:
903            preprocess = dask.delayed(preprocess)
904    else:
905        open_ = open_dataset
906        getattr_ = getattr
907
908    datasets = [open_(p, **open_kwargs) for p in paths]
909    closers = [getattr_(ds, "_close") for ds in datasets]
910    if preprocess is not None:
911        datasets = [preprocess(ds) for ds in datasets]
912
913    if parallel:
914        # calling compute here will return the datasets/file_objs lists,
915        # the underlying datasets will still be stored as dask arrays
916        datasets, closers = dask.compute(datasets, closers)
917
918    # Combine all datasets, closing them in case of a ValueError
919    try:
920        if combine == "nested":
921            # Combined nested list by successive concat and merge operations
922            # along each dimension, using structure given by "ids"
923            combined = _nested_combine(
924                datasets,
925                concat_dims=concat_dim,
926                compat=compat,
927                data_vars=data_vars,
928                coords=coords,
929                ids=ids,
930                join=join,
931                combine_attrs=combine_attrs,
932            )
933        elif combine == "by_coords":
934            # Redo ordering from coordinates, ignoring how they were ordered
935            # previously
936            combined = combine_by_coords(
937                datasets,
938                compat=compat,
939                data_vars=data_vars,
940                coords=coords,
941                join=join,
942                combine_attrs=combine_attrs,
943            )
944        else:
945            raise ValueError(
946                "{} is an invalid option for the keyword argument"
947                " ``combine``".format(combine)
948            )
949    except ValueError:
950        for ds in datasets:
951            ds.close()
952        raise
953
954    def multi_file_closer():
955        for closer in closers:
956            closer()
957
958    combined.set_close(multi_file_closer)
959
960    # read global attributes from the attrs_file or from the first dataset
961    if attrs_file is not None:
962        if isinstance(attrs_file, os.PathLike):
963            attrs_file = os.fspath(attrs_file)
964        combined.attrs = datasets[paths.index(attrs_file)].attrs
965
966    return combined
967
968
969WRITEABLE_STORES: Dict[str, Callable] = {
970    "netcdf4": backends.NetCDF4DataStore.open,
971    "scipy": backends.ScipyDataStore,
972    "h5netcdf": backends.H5NetCDFStore.open,
973}
974
975
976def to_netcdf(
977    dataset: Dataset,
978    path_or_file=None,
979    mode: str = "w",
980    format: str = None,
981    group: str = None,
982    engine: str = None,
983    encoding: Mapping = None,
984    unlimited_dims: Iterable[Hashable] = None,
985    compute: bool = True,
986    multifile: bool = False,
987    invalid_netcdf: bool = False,
988) -> Union[Tuple[ArrayWriter, AbstractDataStore], bytes, "Delayed", None]:
989    """This function creates an appropriate datastore for writing a dataset to
990    disk as a netCDF file
991
992    See `Dataset.to_netcdf` for full API docs.
993
994    The ``multifile`` argument is only for the private use of save_mfdataset.
995    """
996    if isinstance(path_or_file, os.PathLike):
997        path_or_file = os.fspath(path_or_file)
998
999    if encoding is None:
1000        encoding = {}
1001
1002    if path_or_file is None:
1003        if engine is None:
1004            engine = "scipy"
1005        elif engine != "scipy":
1006            raise ValueError(
1007                "invalid engine for creating bytes with "
1008                f"to_netcdf: {engine!r}. Only the default engine "
1009                "or engine='scipy' is supported"
1010            )
1011        if not compute:
1012            raise NotImplementedError(
1013                "to_netcdf() with compute=False is not yet implemented when "
1014                "returning bytes"
1015            )
1016    elif isinstance(path_or_file, str):
1017        if engine is None:
1018            engine = _get_default_engine(path_or_file)
1019        path_or_file = _normalize_path(path_or_file)
1020    else:  # file-like object
1021        engine = "scipy"
1022
1023    # validate Dataset keys, DataArray names, and attr keys/values
1024    _validate_dataset_names(dataset)
1025    _validate_attrs(dataset, invalid_netcdf=invalid_netcdf and engine == "h5netcdf")
1026
1027    try:
1028        store_open = WRITEABLE_STORES[engine]
1029    except KeyError:
1030        raise ValueError(f"unrecognized engine for to_netcdf: {engine!r}")
1031
1032    if format is not None:
1033        format = format.upper()
1034
1035    # handle scheduler specific logic
1036    scheduler = _get_scheduler()
1037    have_chunks = any(v.chunks for v in dataset.variables.values())
1038
1039    autoclose = have_chunks and scheduler in ["distributed", "multiprocessing"]
1040    if autoclose and engine == "scipy":
1041        raise NotImplementedError(
1042            f"Writing netCDF files with the {engine} backend "
1043            f"is not currently supported with dask's {scheduler} scheduler"
1044        )
1045
1046    target = path_or_file if path_or_file is not None else BytesIO()
1047    kwargs = dict(autoclose=True) if autoclose else {}
1048    if invalid_netcdf:
1049        if engine == "h5netcdf":
1050            kwargs["invalid_netcdf"] = invalid_netcdf
1051        else:
1052            raise ValueError(
1053                f"unrecognized option 'invalid_netcdf' for engine {engine}"
1054            )
1055    store = store_open(target, mode, format, group, **kwargs)
1056
1057    if unlimited_dims is None:
1058        unlimited_dims = dataset.encoding.get("unlimited_dims", None)
1059    if unlimited_dims is not None:
1060        if isinstance(unlimited_dims, str) or not isinstance(unlimited_dims, Iterable):
1061            unlimited_dims = [unlimited_dims]
1062        else:
1063            unlimited_dims = list(unlimited_dims)
1064
1065    writer = ArrayWriter()
1066
1067    # TODO: figure out how to refactor this logic (here and in save_mfdataset)
1068    # to avoid this mess of conditionals
1069    try:
1070        # TODO: allow this work (setting up the file for writing array data)
1071        # to be parallelized with dask
1072        dump_to_store(
1073            dataset, store, writer, encoding=encoding, unlimited_dims=unlimited_dims
1074        )
1075        if autoclose:
1076            store.close()
1077
1078        if multifile:
1079            return writer, store
1080
1081        writes = writer.sync(compute=compute)
1082
1083        if path_or_file is None:
1084            store.sync()
1085            return target.getvalue()
1086    finally:
1087        if not multifile and compute:
1088            store.close()
1089
1090    if not compute:
1091        import dask
1092
1093        return dask.delayed(_finalize_store)(writes, store)
1094    return None
1095
1096
1097def dump_to_store(
1098    dataset, store, writer=None, encoder=None, encoding=None, unlimited_dims=None
1099):
1100    """Store dataset contents to a backends.*DataStore object."""
1101    if writer is None:
1102        writer = ArrayWriter()
1103
1104    if encoding is None:
1105        encoding = {}
1106
1107    variables, attrs = conventions.encode_dataset_coordinates(dataset)
1108
1109    check_encoding = set()
1110    for k, enc in encoding.items():
1111        # no need to shallow copy the variable again; that already happened
1112        # in encode_dataset_coordinates
1113        variables[k].encoding = enc
1114        check_encoding.add(k)
1115
1116    if encoder:
1117        variables, attrs = encoder(variables, attrs)
1118
1119    store.store(variables, attrs, check_encoding, writer, unlimited_dims=unlimited_dims)
1120
1121
1122def save_mfdataset(
1123    datasets, paths, mode="w", format=None, groups=None, engine=None, compute=True
1124):
1125    """Write multiple datasets to disk as netCDF files simultaneously.
1126
1127    This function is intended for use with datasets consisting of dask.array
1128    objects, in which case it can write the multiple datasets to disk
1129    simultaneously using a shared thread pool.
1130
1131    When not using dask, it is no different than calling ``to_netcdf``
1132    repeatedly.
1133
1134    Parameters
1135    ----------
1136    datasets : list of Dataset
1137        List of datasets to save.
1138    paths : list of str or list of path-like objects
1139        List of paths to which to save each corresponding dataset.
1140    mode : {"w", "a"}, optional
1141        Write ("w") or append ("a") mode. If mode="w", any existing file at
1142        these locations will be overwritten.
1143    format : {"NETCDF4", "NETCDF4_CLASSIC", "NETCDF3_64BIT", \
1144              "NETCDF3_CLASSIC"}, optional
1145
1146        File format for the resulting netCDF file:
1147
1148        * NETCDF4: Data is stored in an HDF5 file, using netCDF4 API
1149          features.
1150        * NETCDF4_CLASSIC: Data is stored in an HDF5 file, using only
1151          netCDF 3 compatible API features.
1152        * NETCDF3_64BIT: 64-bit offset version of the netCDF 3 file format,
1153          which fully supports 2+ GB files, but is only compatible with
1154          clients linked against netCDF version 3.6.0 or later.
1155        * NETCDF3_CLASSIC: The classic netCDF 3 file format. It does not
1156          handle 2+ GB files very well.
1157
1158        All formats are supported by the netCDF4-python library.
1159        scipy.io.netcdf only supports the last two formats.
1160
1161        The default format is NETCDF4 if you are saving a file to disk and
1162        have the netCDF4-python library available. Otherwise, xarray falls
1163        back to using scipy to write netCDF files and defaults to the
1164        NETCDF3_64BIT format (scipy does not support netCDF4).
1165    groups : list of str, optional
1166        Paths to the netCDF4 group in each corresponding file to which to save
1167        datasets (only works for format="NETCDF4"). The groups will be created
1168        if necessary.
1169    engine : {"netcdf4", "scipy", "h5netcdf"}, optional
1170        Engine to use when writing netCDF files. If not provided, the
1171        default engine is chosen based on available dependencies, with a
1172        preference for "netcdf4" if writing to a file on disk.
1173        See `Dataset.to_netcdf` for additional information.
1174    compute : bool
1175        If true compute immediately, otherwise return a
1176        ``dask.delayed.Delayed`` object that can be computed later.
1177
1178    Examples
1179    --------
1180
1181    Save a dataset into one netCDF per year of data:
1182
1183    >>> ds = xr.Dataset(
1184    ...     {"a": ("time", np.linspace(0, 1, 48))},
1185    ...     coords={"time": pd.date_range("2010-01-01", freq="M", periods=48)},
1186    ... )
1187    >>> ds
1188    <xarray.Dataset>
1189    Dimensions:  (time: 48)
1190    Coordinates:
1191      * time     (time) datetime64[ns] 2010-01-31 2010-02-28 ... 2013-12-31
1192    Data variables:
1193        a        (time) float64 0.0 0.02128 0.04255 0.06383 ... 0.9574 0.9787 1.0
1194    >>> years, datasets = zip(*ds.groupby("time.year"))
1195    >>> paths = [f"{y}.nc" for y in years]
1196    >>> xr.save_mfdataset(datasets, paths)
1197    """
1198    if mode == "w" and len(set(paths)) < len(paths):
1199        raise ValueError(
1200            "cannot use mode='w' when writing multiple datasets to the same path"
1201        )
1202
1203    for obj in datasets:
1204        if not isinstance(obj, Dataset):
1205            raise TypeError(
1206                "save_mfdataset only supports writing Dataset "
1207                f"objects, received type {type(obj)}"
1208            )
1209
1210    if groups is None:
1211        groups = [None] * len(datasets)
1212
1213    if len({len(datasets), len(paths), len(groups)}) > 1:
1214        raise ValueError(
1215            "must supply lists of the same length for the "
1216            "datasets, paths and groups arguments to "
1217            "save_mfdataset"
1218        )
1219
1220    writers, stores = zip(
1221        *[
1222            to_netcdf(
1223                ds, path, mode, format, group, engine, compute=compute, multifile=True
1224            )
1225            for ds, path, group in zip(datasets, paths, groups)
1226        ]
1227    )
1228
1229    try:
1230        writes = [w.sync(compute=compute) for w in writers]
1231    finally:
1232        if compute:
1233            for store in stores:
1234                store.close()
1235
1236    if not compute:
1237        import dask
1238
1239        return dask.delayed(
1240            [dask.delayed(_finalize_store)(w, s) for w, s in zip(writes, stores)]
1241        )
1242
1243
1244def _validate_region(ds, region):
1245    if not isinstance(region, dict):
1246        raise TypeError(f"``region`` must be a dict, got {type(region)}")
1247
1248    for k, v in region.items():
1249        if k not in ds.dims:
1250            raise ValueError(
1251                f"all keys in ``region`` are not in Dataset dimensions, got "
1252                f"{list(region)} and {list(ds.dims)}"
1253            )
1254        if not isinstance(v, slice):
1255            raise TypeError(
1256                "all values in ``region`` must be slice objects, got "
1257                f"region={region}"
1258            )
1259        if v.step not in {1, None}:
1260            raise ValueError(
1261                "step on all slices in ``region`` must be 1 or None, got "
1262                f"region={region}"
1263            )
1264
1265    non_matching_vars = [
1266        k for k, v in ds.variables.items() if not set(region).intersection(v.dims)
1267    ]
1268    if non_matching_vars:
1269        raise ValueError(
1270            f"when setting `region` explicitly in to_zarr(), all "
1271            f"variables in the dataset to write must have at least "
1272            f"one dimension in common with the region's dimensions "
1273            f"{list(region.keys())}, but that is not "
1274            f"the case for some variables here. To drop these variables "
1275            f"from this dataset before exporting to zarr, write: "
1276            f".drop({non_matching_vars!r})"
1277        )
1278
1279
1280def _validate_datatypes_for_zarr_append(dataset):
1281    """DataArray.name and Dataset keys must be a string or None"""
1282
1283    def check_dtype(var):
1284        if (
1285            not np.issubdtype(var.dtype, np.number)
1286            and not np.issubdtype(var.dtype, np.datetime64)
1287            and not np.issubdtype(var.dtype, np.bool_)
1288            and not coding.strings.is_unicode_dtype(var.dtype)
1289            and not var.dtype == object
1290        ):
1291            # and not re.match('^bytes[1-9]+$', var.dtype.name)):
1292            raise ValueError(
1293                "Invalid dtype for data variable: {} "
1294                "dtype must be a subtype of number, "
1295                "datetime, bool, a fixed sized string, "
1296                "a fixed size unicode string or an "
1297                "object".format(var)
1298            )
1299
1300    for k in dataset.data_vars.values():
1301        check_dtype(k)
1302
1303
1304def to_zarr(
1305    dataset: Dataset,
1306    store: Union[MutableMapping, str, os.PathLike] = None,
1307    chunk_store=None,
1308    mode: str = None,
1309    synchronizer=None,
1310    group: str = None,
1311    encoding: Mapping = None,
1312    compute: bool = True,
1313    consolidated: Optional[bool] = None,
1314    append_dim: Hashable = None,
1315    region: Mapping[str, slice] = None,
1316    safe_chunks: bool = True,
1317    storage_options: Dict[str, str] = None,
1318):
1319    """This function creates an appropriate datastore for writing a dataset to
1320    a zarr ztore
1321
1322    See `Dataset.to_zarr` for full API docs.
1323    """
1324
1325    # Load empty arrays to avoid bug saving zero length dimensions (Issue #5741)
1326    for v in dataset.variables.values():
1327        if v.size == 0:
1328            v.load()
1329
1330    # expand str and path-like arguments
1331    store = _normalize_path(store)
1332    chunk_store = _normalize_path(chunk_store)
1333
1334    if storage_options is None:
1335        mapper = store
1336        chunk_mapper = chunk_store
1337    else:
1338        from fsspec import get_mapper
1339
1340        if not isinstance(store, str):
1341            raise ValueError(
1342                f"store must be a string to use storage_options. Got {type(store)}"
1343            )
1344        mapper = get_mapper(store, **storage_options)
1345        if chunk_store is not None:
1346            chunk_mapper = get_mapper(chunk_store, **storage_options)
1347        else:
1348            chunk_mapper = chunk_store
1349
1350    if encoding is None:
1351        encoding = {}
1352
1353    if mode is None:
1354        if append_dim is not None:
1355            mode = "a"
1356        elif region is not None:
1357            mode = "r+"
1358        else:
1359            mode = "w-"
1360
1361    if mode != "a" and append_dim is not None:
1362        raise ValueError("cannot set append_dim unless mode='a' or mode=None")
1363
1364    if mode not in ["a", "r+"] and region is not None:
1365        raise ValueError("cannot set region unless mode='a', mode='r+' or mode=None")
1366
1367    if mode not in ["w", "w-", "a", "r+"]:
1368        raise ValueError(
1369            "The only supported options for mode are 'w', "
1370            f"'w-', 'a' and 'r+', but mode={mode!r}"
1371        )
1372
1373    # validate Dataset keys, DataArray names, and attr keys/values
1374    _validate_dataset_names(dataset)
1375    _validate_attrs(dataset)
1376
1377    if region is not None:
1378        _validate_region(dataset, region)
1379        if append_dim is not None and append_dim in region:
1380            raise ValueError(
1381                f"cannot list the same dimension in both ``append_dim`` and "
1382                f"``region`` with to_zarr(), got {append_dim} in both"
1383            )
1384
1385    if mode == "r+":
1386        already_consolidated = consolidated
1387        consolidate_on_close = False
1388    else:
1389        already_consolidated = False
1390        consolidate_on_close = consolidated or consolidated is None
1391    zstore = backends.ZarrStore.open_group(
1392        store=mapper,
1393        mode=mode,
1394        synchronizer=synchronizer,
1395        group=group,
1396        consolidated=already_consolidated,
1397        consolidate_on_close=consolidate_on_close,
1398        chunk_store=chunk_mapper,
1399        append_dim=append_dim,
1400        write_region=region,
1401        safe_chunks=safe_chunks,
1402        stacklevel=4,  # for Dataset.to_zarr()
1403    )
1404
1405    if mode in ["a", "r+"]:
1406        _validate_datatypes_for_zarr_append(dataset)
1407        if append_dim is not None:
1408            existing_dims = zstore.get_dimensions()
1409            if append_dim not in existing_dims:
1410                raise ValueError(
1411                    f"append_dim={append_dim!r} does not match any existing "
1412                    f"dataset dimensions {existing_dims}"
1413                )
1414        existing_var_names = set(zstore.zarr_group.array_keys())
1415        for var_name in existing_var_names:
1416            if var_name in encoding.keys():
1417                raise ValueError(
1418                    f"variable {var_name!r} already exists, but encoding was provided"
1419                )
1420        if mode == "r+":
1421            new_names = [k for k in dataset.variables if k not in existing_var_names]
1422            if new_names:
1423                raise ValueError(
1424                    f"dataset contains non-pre-existing variables {new_names}, "
1425                    "which is not allowed in ``xarray.Dataset.to_zarr()`` with "
1426                    "mode='r+'. To allow writing new variables, set mode='a'."
1427                )
1428
1429    writer = ArrayWriter()
1430    # TODO: figure out how to properly handle unlimited_dims
1431    dump_to_store(dataset, zstore, writer, encoding=encoding)
1432    writes = writer.sync(compute=compute)
1433
1434    if compute:
1435        _finalize_store(writes, zstore)
1436    else:
1437        import dask
1438
1439        return dask.delayed(_finalize_store)(writes, zstore)
1440
1441    return zstore
1442