1#
2# This file is part of the GROMACS molecular simulation package.
3#
4# Copyright (c) 2019,2020, by the GROMACS development team, led by
5# Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
6# and including many others, as listed in the AUTHORS file in the
7# top-level source directory and at http://www.gromacs.org.
8#
9# GROMACS is free software; you can redistribute it and/or
10# modify it under the terms of the GNU Lesser General Public License
11# as published by the Free Software Foundation; either version 2.1
12# of the License, or (at your option) any later version.
13#
14# GROMACS is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17# Lesser General Public License for more details.
18#
19# You should have received a copy of the GNU Lesser General Public
20# License along with GROMACS; if not, see
21# http://www.gnu.org/licenses, or write to the Free Software Foundation,
22# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
23#
24# If you want to redistribute modifications to GROMACS, please
25# consider that scientific software is very special. Version
26# control is crucial - bugs must be traceable. We will be happy to
27# consider code for inclusion in the official distribution, but
28# derived work must not be called official GROMACS. Details are found
29# in the README & COPYING files - if they are missing, get the
30# official version at http://www.gromacs.org.
31#
32# To help us fund GROMACS development, we humbly ask that you cite
33# the research papers on the package. Check out http://www.gromacs.org.
34
35"""Define gmxapi-compliant Operations
36
37Provide decorators and base classes to generate and validate gmxapi Operations.
38
39Nodes in a work graph are created as instances of Operations. An Operation factory
40accepts well-defined inputs as key word arguments. The object returned by such
41a factory is a handle to the node in the work graph. It's ``output`` attribute
42is a collection of the Operation's results.
43
44function_wrapper(...) produces a wrapper that converts a function to an Operation
45factory. The Operation is defined when the wrapper is called. The Operation is
46instantiated when the factory is called. The function is executed when the Operation
47instance is run.
48
49The framework ensures that an Operation instance is executed no more than once.
50"""
51
52__all__ = ['computed_result',
53           'function_wrapper',
54           ]
55
56import abc
57import collections
58import functools
59import inspect
60import typing
61import weakref
62from contextlib import contextmanager
63
64import gmxapi as gmx
65from gmxapi import datamodel
66from gmxapi import exceptions
67from gmxapi import logger as root_logger
68from gmxapi.abc import OperationImplementation, MutableResource, Node
69from gmxapi.typing import _Context, ResultTypeVar, SourceTypeVar, valid_result_types, valid_source_types
70from gmxapi.typing import Future as _Future
71
72# Initialize module-level logger
73logger = root_logger.getChild('operation')
74logger.info('Importing {}'.format(__name__))
75
76
77class ResultDescription:
78    """Describe what will be returned when `result()` is called."""
79
80    def __init__(self, dtype=None, width=1):
81        assert isinstance(dtype, type)
82        assert issubclass(dtype, valid_result_types)
83        assert isinstance(width, int)
84        self._dtype = dtype
85        self._width = width
86
87    @property
88    def dtype(self) -> type:
89        """node output type"""
90        return self._dtype
91
92    @property
93    def width(self) -> int:
94        """ensemble width"""
95        return self._width
96
97    def __repr__(self):
98        return '{}(dtype={}, width={})'.format(self.__class__.__name__, self.dtype, self.width)
99
100
101class OutputData(object):
102    """Encapsulate the description and storage of a data output."""
103
104    def __init__(self, name: str, description: ResultDescription):
105        assert name != ''
106        self._name = name
107        assert isinstance(description, ResultDescription)
108        self._description = description
109        self._done = [False] * self._description.width
110        self._data = [None] * self._description.width
111
112    @property
113    def name(self):
114        """The name of the published output."""
115        return self._name
116
117    # TODO: Change to regular member function and add ensemble member arg.
118    @property
119    def done(self):
120        """Ensemble completion status for this output."""
121        return all(self._done)
122
123    def data(self, member: int = None):
124        """Access the raw data for localized output for the ensemble or the specified member."""
125        if not self.done:
126            raise exceptions.ApiError('Attempt to read before data has been published.')
127        if self._data is None or None in self._data:
128            raise exceptions.ApiError('Data marked "done" but contains null value.')
129        if member is not None:
130            return self._data[member]
131        else:
132            return self._data
133
134    def set(self, value, member: int):
135        """Set local data and mark as completed.
136
137        Used internally to maintain the data store.
138        """
139        if self._description.dtype == datamodel.NDArray:
140            self._data[member] = datamodel.ndarray(value)
141        else:
142            self._data[member] = self._description.dtype(value)
143        self._done[member] = True
144
145    def reset(self):
146        """Reinitialize the data store.
147
148        Note:
149            This is a workaround until the execution model is more developed.
150
151        Todo:
152            Remove this method when all operation handles provide factories to
153            reinstantiate on new Contexts and/or with new inputs.
154
155        """
156        self._done = [False] * self._description.width
157        self._data = [None] * self._description.width
158
159
160class EnsembleDataSource(gmx.abc.EnsembleDataSource):
161    """A single source of data with ensemble data flow annotations.
162
163    Note that data sources may be Futures.
164    """
165
166    def __init__(self, source=None, width=1, dtype=None):
167        self.source = source
168        self.width = width
169        self.dtype = dtype
170
171    def node(self, member: int):
172        return self.source[member]
173
174    def reset(self):
175        protocols = ('reset', '_reset')
176        for protocol in protocols:
177            if hasattr(self.source, protocol):
178                getattr(self.source, protocol)()
179
180
181class DataSourceCollection(collections.OrderedDict):
182    """Store and describe input data handles for an operation.
183
184    When created from InputCollectionDescription.bind(), the DataSourceCollection
185    has had default values inserted.
186
187    Note: DataSourceCollection is the input resource collection type for NodeBuilder.
188    To use with the NodeBuilder interface, first create the collection, then
189    iteratively add its resources.
190
191    TODO: We should probably normalize the collection aspect of input. Is this a
192          single input? Are all inputs added as collections? Should this be split
193          to a standard iterable? Does a resource factory always produce a mapping?
194    """
195
196    def __init__(self, **kwargs):
197        """Initialize from key/value pairs of named data sources.
198
199        Data sources may be any of the basic gmxapi data types, gmxapi Futures
200        of those types, or gmxapi ensemble data bundles of the above.
201        """
202        super(DataSourceCollection, self).__init__()
203        for name, value in kwargs.items():
204            self[name] = value
205
206    def __setitem__(self, key: str, value: SourceTypeVar) -> None:
207        if not isinstance(key, str):
208            raise exceptions.TypeError('Data must be named with str type.')
209        # TODO: Encapsulate handling of proferred data sources to Context details.
210        # Preprocessed input should be self-describing gmxapi data types. Structured
211        # input must be recursively (depth-first) converted to gmxapi data types.
212        # TODO: Handle gmxapi Futures stored as dictionary elements!
213        if not isinstance(value, valid_source_types):
214            if isinstance(value, collections.abc.Iterable):
215                # Iterables here are treated as arrays, but we do not have a robust typing system.
216                # Warning: In the initial implementation, the iterable may contain Future objects.
217                # TODO: (#2993) Revisit as we sort out data shape and Future protocol.
218                try:
219                    value = datamodel.ndarray(value)
220                except (exceptions.ValueError, exceptions.TypeError) as e:
221                    raise exceptions.TypeError('Iterable could not be converted to NDArray: {}'.format(value)) from e
222            elif hasattr(value, 'result'):
223                # A Future object.
224                pass
225            else:
226                raise exceptions.ApiError('Cannot process data source {}'.format(value))
227        super().__setitem__(key, value)
228
229    def reset(self):
230        """Reset all sources in the collection."""
231        for source in self.values():
232            if hasattr(source, 'reset'):
233                source.reset()
234            if hasattr(source, '_reset'):
235                source._reset()
236
237    def __hash__(self):
238        """Provide some sort of unique identifier.
239
240        We need a more deterministic fingerprinting scheme with well-specified
241        uniqueness semantics, but right now we just need something reasonably
242        unique.
243        """
244        hashed_keys_and_values = tuple(hash(entity) for item in self.items() for entity in item)
245        return hash(hashed_keys_and_values)
246
247
248def computed_result(function):
249    """Decorate a function to get a helper that produces an object with Result behavior.
250
251    When called, the new function produces an ImmediateResult object.
252
253    The new function has the same signature as the original function, but can accept
254    gmxapi data proxies, assuming the provided proxy objects represent types
255    compatible with the original signature.
256
257    Calls to `result()` return the value that `function` would return when executed
258    in the local context with the inputs fully resolved.
259
260    The API does not specify when input data dependencies will be resolved
261    or when the wrapped function will be executed. That is, ``@computed_result``
262    functions may force immediate resolution of data dependencies and/or may
263    be called more than once to satisfy dependent operation inputs.
264    """
265
266    # Attempt to inspect function signature. Introspection can fail, so we catch
267    # the various exceptions. We re-raise as ApiError, indicating a bug, because
268    # we should do more to handle this intelligently and provide better user
269    # feedback.
270    # TODO: Figure out what to do with exceptions where this introspection
271    #  and rebinding won't work.
272    # ref: https://docs.python.org/3/library/inspect.html#introspecting-callables-with-the-signature-object
273    try:
274        sig = inspect.signature(function)
275    except TypeError as T:
276        raise exceptions.ApiError('Can not inspect type of provided function argument.') from T
277    except ValueError as V:
278        raise exceptions.ApiError('Can not inspect provided function signature.') from V
279
280    wrapped_function = function_wrapper()(function)
281
282    @functools.wraps(function)
283    def new_function(*args, **kwargs):
284        # The signature of the new function will accept abstractions
285        # of whatever types it originally accepted. This wrapper must
286        # * Create a mapping to the original call signature from `input`
287        # * Add handling for typed abstractions in wrapper function.
288        # * Process arguments to the wrapper function into `input`
289
290        # 1. Inspect the return annotation to determine valid gmxapi type(s)
291        # 2. Generate a Result object advertising the correct type, bound to the
292        #    Input and implementing function.
293        # 3. Transform the result() data to the correct type.
294
295        # TODO: (FR3+) create a serializable data structure for inputs discovered
296        #  from function introspection.
297
298        for name, param in sig.parameters.items():
299            assert not param.kind == param.POSITIONAL_ONLY
300        bound_arguments = sig.bind(*args, **kwargs)
301        handle = wrapped_function(**bound_arguments.arguments)
302        output = handle.output
303        # TODO: Find a type hinting / generic way to assert output attributes.
304        return output.data
305
306    return new_function
307
308
309class OutputCollectionDescription(collections.OrderedDict):
310    def __init__(self, **kwargs):
311        """Create the output description for an operation node from a dictionary of names and types."""
312        outputs = []
313        for name, flavor in kwargs.items():
314            if not isinstance(name, str):
315                raise exceptions.TypeError('Output descriptions are keyed by Python strings.')
316            # Multidimensional outputs are explicitly NDArray
317            if issubclass(flavor, (list, tuple)):
318                flavor = datamodel.NDArray
319            assert issubclass(flavor, valid_result_types)
320            outputs.append((name, flavor))
321        super().__init__(outputs)
322
323
324class InputCollectionDescription(collections.OrderedDict):
325    """Describe acceptable inputs for an Operation.
326
327    Generally, an InputCollectionDescription is an aspect of the public API by
328    which an Operation expresses its possible inputs. This class includes details
329    of the Python package.
330
331    Keyword Arguments:
332        parameters : A sequence of named parameter descriptions.
333
334    Parameter descriptions are objects containing an `annotation` attribute
335    declaring the data type of the parameter and, optionally, a `default`
336    attribute declaring a default value for the parameter.
337
338    Instances can be used as an ordered map of parameter names to gmxapi data types.
339
340    Analogous to inspect.Signature, but generalized for gmxapi Operations.
341    Additional notable differences: typing is normalized at initialization, and
342    the bind() method does not return an object that can be directly used as
343    function input. The object returned from bind() is used to construct a data
344    graph Edge for subsequent execution.
345    """
346
347    def __init__(self, parameters: typing.Iterable[typing.Tuple[str, inspect.Parameter]]):
348        """Create the input description for an operation node from a dictionary of names and types."""
349        inputs = []
350        for name, param in parameters:
351            if not isinstance(name, str):
352                raise exceptions.TypeError('Input descriptions are keyed by Python strings.')
353            # Multidimensional inputs are explicitly NDArray
354            dtype = param.annotation
355            if issubclass(dtype, collections.abc.Iterable) \
356                    and not issubclass(dtype, (str, bytes, collections.abc.Mapping)):
357                # TODO: we can relax this with some more input conditioning.
358                if dtype != datamodel.NDArray:
359                    raise exceptions.UsageError(
360                        'Cannot accept input type {}. Sequence type inputs must use NDArray.'.format(param))
361            assert issubclass(dtype, valid_result_types)
362            if hasattr(param, 'kind'):
363                disallowed = any([param.kind == param.POSITIONAL_ONLY,
364                                  param.kind == param.VAR_POSITIONAL,
365                                  param.kind == param.VAR_KEYWORD])
366                if disallowed:
367                    raise exceptions.ProtocolError(
368                        'Cannot wrap function. Operations must have well-defined parameter names.')
369                kind = param.kind
370            else:
371                kind = inspect.Parameter.POSITIONAL_OR_KEYWORD
372            if hasattr(param, 'default'):
373                default = param.default
374            else:
375                default = inspect.Parameter.empty
376            inputs.append(inspect.Parameter(name, kind, default=default, annotation=dtype))
377        super().__init__([(input.name, input.annotation) for input in inputs])
378        self.signature = inspect.Signature(inputs)
379
380    @staticmethod
381    def from_function(function):
382        """Inspect a function to be wrapped.
383
384        Used internally by gmxapi.operation.function_wrapper()
385
386            Raises:
387                exceptions.ProtocolError if function signature cannot be determined to be valid.
388
389            Returns:
390                InputCollectionDescription for the function input signature.
391        """
392        # First, inspect the function.
393        assert callable(function)
394        signature = inspect.signature(function)
395        # The function must have clear and static input schema
396        # Make sure that all parameters have clear names, whether or not they are used in a call.
397        for name, param in signature.parameters.items():
398            disallowed = any([param.kind == param.POSITIONAL_ONLY,
399                              param.kind == param.VAR_POSITIONAL,
400                              param.kind == param.VAR_KEYWORD])
401            if disallowed:
402                raise exceptions.ProtocolError(
403                    'Cannot wrap function. Operations must have well-defined parameter names.')
404            if param.name == 'input':
405                raise exceptions.ProtocolError('Function signature includes the (reserved) "input" keyword argument.')
406        description = collections.OrderedDict()
407        for param in signature.parameters.values():
408            if param.name == 'output':
409                # Wrapped functions may accept the output parameter to publish results, but
410                # that is not part of the Operation input signature.
411                continue
412            if param.annotation == param.empty:
413                if param.default == param.empty or param.default is None:
414                    raise exceptions.ProtocolError('Could not infer parameter type for {}'.format(param.name))
415                dtype = type(param.default)
416                if isinstance(dtype, collections.abc.Iterable) \
417                        and not isinstance(dtype, (str, bytes, collections.abc.Mapping)):
418                    dtype = datamodel.NDArray
419            else:
420                dtype = param.annotation
421            description[param.name] = param.replace(annotation=dtype)
422        return InputCollectionDescription(description.items())
423
424    def bind(self, *args, **kwargs) -> DataSourceCollection:
425        """Create a compatible DataSourceCollection from provided arguments.
426
427        Pre-process input and function signature to get named input arguments.
428
429        This is a helper function to allow calling code to characterize the
430        arguments in a Python function call with hints from the factory that is
431        initializing an operation. Its most useful functionality is to  allows a
432        factory to accept positional arguments where named inputs are usually
433        required. It also allows data sources to participate in multiple
434        DataSourceCollections with minimal constraints.
435
436        Note that the returned object has had data populated from any defaults
437        described in the InputCollectionDescription.
438
439        See wrapped_function_runner() and describe_function_input().
440        """
441        # For convenience, accept *args, but convert to **kwargs to pass to Operation.
442        # Factory accepts an unadvertised `input` keyword argument that is used as a default kwargs dict.
443        # If present, kwargs['input'] is treated as an input "pack" providing _default_ values.
444        input_kwargs = collections.OrderedDict()
445        if 'input' in kwargs:
446            provided_input = kwargs.pop('input')
447            if provided_input is not None:
448                input_kwargs.update(provided_input)
449        # `function` may accept an `output` keyword argument that should not be supplied to the factory.
450        for key, value in kwargs.items():
451            if key == 'output':
452                raise exceptions.UsageError('Invalid keyword argument: output (reserved).')
453            input_kwargs[key] = value
454        try:
455            bound_arguments = self.signature.bind_partial(*args, **input_kwargs)
456        except TypeError as e:
457            raise exceptions.UsageError('Could not bind operation parameters to function signature.') from e
458        assert 'output' not in bound_arguments.arguments
459        bound_arguments.apply_defaults()
460        assert 'input' not in bound_arguments.arguments
461        input_kwargs = collections.OrderedDict([pair for pair in bound_arguments.arguments.items()])
462        if 'output' in input_kwargs:
463            input_kwargs.pop('output')
464        return DataSourceCollection(**input_kwargs)
465
466
467class ProxyDataDescriptor(object):
468    """Base class for data descriptors used in DataProxyBase subclasses.
469
470    Subclasses should either not define __init__ or should call the base class
471    __init__ explicitly: super().__init__(self, name, dtype)
472    """
473
474    def __init__(self, name: str, dtype: ResultTypeVar = None):
475        self._name = name
476        # TODO: We should not allow dtype==None, but we currently have a weak data
477        #  model that does not allow good support of structured Futures.
478        if dtype is not None:
479            assert isinstance(dtype, type)
480            assert issubclass(dtype, valid_result_types)
481        self._dtype = dtype
482
483
484class DataProxyMeta(abc.ABCMeta):
485    # Key word arguments consumed by __prepare__
486    _prepare_keywords = ('descriptors',)
487
488    @classmethod
489    def __prepare__(mcs, name, bases, descriptors: collections.abc.Mapping = None):
490        """Allow dynamic sub-classing.
491
492        DataProxy class definitions are collections of data descriptors. This
493        class method allows subclasses to give the descriptor names and type(s)
494        in the class declaration as arguments instead of as class attribute
495        assignments.
496
497            class MyProxy(DataProxyBase, descriptors={name: MyDescriptor() for name in datanames}): pass
498
499        Note:
500            If we are only using this metaclass for the __prepare__ hook by the
501            time we require Python >= 3.6, we could reimplement __prepare__ as
502            DataProxyBase.__init_subclass__ and remove this metaclass.
503        """
504        if descriptors is None:
505            return {}
506        elif isinstance(descriptors, tuple):
507            namespace = collections.OrderedDict([(d._name, d) for d in descriptors])
508            return namespace
509        else:
510            assert isinstance(descriptors, collections.abc.Mapping)
511            return descriptors
512
513    def __new__(cls, name, bases: typing.Iterable, namespace, **kwargs):
514        for key in kwargs:
515            if key not in DataProxyMeta._prepare_keywords:
516                raise exceptions.ApiError('Unexpected class creation keyword: {}'.format(key))
517        # See note about DataProxyBase._reserved.
518        if '_reserved' not in namespace and not any(hasattr(base, '_reserved') for base in bases):
519            raise exceptions.ApiError(
520                'We currently expect DataProxy classes to provide a list of reserved attribute names.')
521        for key in namespace:
522            # Here we can check conformance with naming and typing rules.
523            assert isinstance(key, str)
524            if key.startswith('__'):
525                # Skip non-public attributes.
526                continue
527            descriptor = namespace[key]
528            # The purpose of the current data proxies is to serve as a limited namespace
529            # containing only descriptors of a certain type. In the future, these proxies
530            # may be flattened into a facet of a richer OperationHandle class
531            # (this metaclass may become a decorator on an implementation class),
532            # but for now we check that the class is being used according to the
533            # documented framework. A nearer term update could be to restrict the
534            # type of the data descriptor:
535            # TODO: Use a member type of the derived cls (or a mix-in base) to specify a particular
536            #  ProxyDataDescriptor subclass.
537            # Also, see note about DataProxyBase._reserved
538            if not isinstance(descriptor, ProxyDataDescriptor):
539                if key not in namespace['_reserved'] and not any(key in getattr(base, '_reserved') for base in
540                                                                 bases if hasattr(base, '_reserved')):
541                    raise exceptions.ApiError('Unexpected data proxy attribute {}: {}'.format(key, repr(descriptor)))
542            else:
543                assert isinstance(descriptor, ProxyDataDescriptor)
544                if not isinstance(descriptor._name, str) or descriptor._name == '':
545                    descriptor._name = key
546                else:
547                    if descriptor._name != key:
548                        raise exceptions.ApiError(
549                            'Descriptor internal name {} does not match attribute name {}'.format(
550                                descriptor._name, key))
551        return super().__new__(cls, name, bases, namespace)
552
553    # TODO: This keyword argument stripping is not necessary in more recent Python versions.
554    # When Python minimum required version is increased, check if we can remove this.
555    def __init__(cls, name, bases, namespace, **kwargs):
556        for key in kwargs:
557            if key not in DataProxyMeta._prepare_keywords:
558                raise exceptions.ApiError('Unexpected class initialization keyword: {}'.format(key))
559        super().__init__(name, bases, namespace)
560
561    # TODO: See if we can use __dir__ in the metaclass to help hint class attributes for better tab completion.
562    #  Ref: https://ipython.readthedocs.io/en/stable/config/integrating.html#tab-completion
563    # def __dir__(self) -> Iterable[str]:
564    #     return super().__dir__()
565
566
567class DataProxyBase(collections.abc.Mapping, metaclass=DataProxyMeta):
568    """Limited interface to managed resources.
569
570    Inherit from DataProxy to specialize an interface to an ``instance``.
571    In the derived class, either do not define ``__init__`` or be sure to
572    initialize the super class (DataProxy) with an instance of the object
573    to be proxied.
574
575    A class deriving from DataProxyBase allows its instances to provide a namespace
576    for proxies to named data by defining attributes that are data descriptors
577    (subclasses of ProxyDataDescriptor).
578    The ProxyDataDescriptors are accessed as attributes of the
579    data proxy instance or by iterating on items(). Attributes that are not
580    ProxyDataDescriptors are possible, but will not be returned by items() which
581    is a necessary part of gmxapi execution protocol.
582
583    Acts as an owning handle to the resources provide by ``instance``,
584    preventing the reference count of ``instance`` from going to zero for the
585    lifetime of the proxy object.
586
587    When sub-classing DataProxyBase, data descriptors can be passed as a mapping
588    to the ``descriptors`` key word argument in the class declaration. This
589    allows data proxy subclasses to be easily defined dynamically.
590
591        mydescriptors = {'foo': Publisher('foo', int), 'data': Publisher('data', float)}
592        ...
593        class MyDataProxy(DataProxyBase, descriptors=mydescriptors): pass
594        assert hasattr(MyDataProxy, 'foo')
595
596    """
597    # This class attribute (which subclasses are free to replace to augment) is an
598    # indication of a problem with the current data model. If we are allowing
599    # reserved words that would otherwise be valid data names, there is not a
600    # compelling reason for separate data proxy classes: we throw away the assertion
601    # that we are preparing a clean namespace and we could have accomplished the
602    # class responsibilities in the Operation handle with just descriptor classes.
603    # If we want the clean namespace, we should figure out how to keep this interface
604    # from growing and/or have some "hidden" internal interface.
605    _reserved = ('ensemble_width', 'items', '_reserved')
606
607    # This class can be expanded to be the attachment point for a metaclass for
608    # data proxies such as PublishingDataProxy or OutputDataProxy, which may be
609    # defined very dynamically and concisely as a set of Descriptors and a type()
610    # call.
611    # If development in this direction does not materialize, then this base
612    # class is not very useful and should be removed.
613    def __init__(self, instance: 'SourceResource', client_id: int = None):
614        """Get partial ownership of a resource provider.
615
616        Arguments:
617            instance : resource-owning object
618            client_id : identifier for client holding the resource handle (e.g. ensemble member id)
619
620        If client_id is not provided, the proxy scope is for all clients.
621        """
622        # TODO: Decide whether _resource_instance is public or not.
623        # Note: currently commonly needed for subclass implementations.
624        self._resource_instance = instance
625        # Developer note subclasses should handle self._client_identifier == None
626        self._client_identifier = client_id
627        # Collection is fixed by the time of instance creation, so cache it.
628        self.__keys = tuple([key for key, _ in self.items()])
629        self.__length = len(self.__keys)
630
631    @property
632    def ensemble_width(self) -> int:
633        return self._resource_instance.width()
634
635    @classmethod
636    def items(cls):
637        """Generator for tuples of attribute name and descriptor instance.
638
639        This almost certainly doesn't do quite what we want...
640        """
641        for name, value in cls.__dict__.items():
642            if isinstance(value, ProxyDataDescriptor):
643                yield name, value
644
645    def __getitem__(self, k):
646        if hasattr(self, k):
647            return getattr(self, k)
648
649    def __len__(self):
650        return self.__length
651
652    def __iter__(self):
653        for key in self.__keys:
654            yield key
655
656
657class Publisher(ProxyDataDescriptor):
658    """Data descriptor for write access to a specific named data resource.
659
660    For a wrapped function receiving an ``output`` argument, provides the
661    accessors for an attribute on the object passed as ``output``. Maps
662    read and write access by the wrapped function to appropriate details of
663    the resource manager.
664
665    Used internally to implement settable attributes on PublishingDataProxy.
666    Allows PublishingDataProxy to be dynamically defined in the scope of the
667    operation.function_wrapper closure. Each named output is represented by
668    an instance of Publisher in the PublishingDataProxy class definition for
669    the operation.
670
671    Ref: https://docs.python.org/3/reference/datamodel.html#implementing-descriptors
672
673    Collaborations:
674    Relies on implementation details of ResourceManager.
675    """
676
677    def __get__(self, instance: DataProxyBase, owner):
678        if instance is None:
679            # The current access has come through the class attribute of owner class
680            return self
681        resource_manager = instance._resource_instance
682        client_id = instance._client_identifier
683        # TODO: Fix API scope.
684        # Either this class is a detail of the same implementation as ResourceManager,
685        # or we need to enforce that instance._resource_instance provides _data (or equivalent)
686        assert isinstance(resource_manager, ResourceManager)
687        if client_id is None:
688            return getattr(resource_manager._data, self._name)
689        else:
690            return getattr(resource_manager._data, self._name)[client_id]
691
692    def __set__(self, instance: DataProxyBase, value):
693        resource_manager = instance._resource_instance
694        # TODO: Fix API scope.
695        # Either this class is a detail of the same implementation as ResourceManager,
696        # or we need to enforce that instance._resource_instance provides _data (or equivalent)
697        assert isinstance(resource_manager, ResourceManager)
698        client_id = instance._client_identifier
699        resource_manager.set_result(name=self._name, value=value, member=client_id)
700
701    def __repr__(self):
702        return '{}(name={}, dtype={})'.format(self.__class__.__name__,
703                                              self._name,
704                                              self._dtype.__qualname__)
705
706
707def define_publishing_data_proxy(output_description) -> typing.Type[DataProxyBase]:
708    """Returns a class definition for a PublishingDataProxy for the provided output description."""
709    # This dynamic type creation hides collaborations with things like make_datastore.
710    # We should encapsulate these relationships in Context details, explicit collaborations
711    # between specific operations and Contexts, and in groups of Operation definition helpers.
712
713    descriptors = collections.OrderedDict([(name, Publisher(name)) for name in output_description])
714
715    class PublishingDataProxy(DataProxyBase, descriptors=descriptors):
716        """Handler for write access to the `output` of an operation.
717
718        Acts as a sort of PublisherCollection.
719        """
720
721    return PublishingDataProxy
722
723
724# get symbols we can use to annotate input and output types more specifically.
725_OutputDataProxyType = typing.TypeVar('_OutputDataProxyType', bound=DataProxyBase)
726_PublishingDataProxyType = typing.TypeVar('_PublishingDataProxyType', bound=DataProxyBase)
727# Currently, the ClientID type is an integer, but this may change.
728ClientID = typing.NewType('ClientID', int)
729
730
731class _Resources(typing.Generic[_PublishingDataProxyType]):
732    pass
733
734
735# TODO: Why generic in publishingdataproxytype?
736class SourceResource(typing.Generic[_OutputDataProxyType, _PublishingDataProxyType]):
737    """Resource Manager for a data provider.
738
739    Supports Future instances in a particular context.
740    """
741
742    # Note: ResourceManager members not yet included:
743    # future(), _data, set_result.
744
745    # This might not belong here. Maybe separate out for a OperationHandleManager?
746    @abc.abstractmethod
747    def data(self) -> _OutputDataProxyType:
748        """Get the output data proxy."""
749        # Warning: this should probably be renamed, but "output_data_proxy" is already
750        # a member in at least one derived class.
751        ...
752
753    @abc.abstractmethod
754    def is_done(self, name: str) -> bool:
755        return False
756
757    @abc.abstractmethod
758    def get(self, name: str) -> 'OutputData':
759        ...
760
761    @abc.abstractmethod
762    def update_output(self):
763        """Bring the _data member up to date and local."""
764        pass
765
766    @abc.abstractmethod
767    def reset(self):
768        """Recursively reinitialize resources.
769
770        Set the resource manager to its initialized state.
771        All outputs are marked not "done".
772        All inputs supporting the interface have ``_reset()`` called on them.
773        """
774
775    @abc.abstractmethod
776    def width(self) -> int:
777        """Ensemble width of the managed resources."""
778        ...
779
780    @abc.abstractmethod
781    def future(self, name: str, description: ResultDescription) -> 'Future':
782        """Get a Future handle for managed data.
783
784        Resource managers owned by subclasses of gmx.operation.Context provide
785        this method to get references to output data.
786
787        In addition to the interface described by gmx.abc.Future, returned objects
788        provide the interface described by gmx.operation.Future.
789        """
790
791
792class StaticSourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
793    """Provide the resource manager interface for local static data.
794
795    Allow data transformations on the proxied resource.
796
797    Keyword Args:
798        proxied_data: A gmxapi supported data object.
799        width: Size of (one-dimensional) shaped data produced by function.
800        function: Transformation to perform on the managed data.
801
802    The callable passed as ``function`` must accept a single argument. The
803    argument will be an iterable when proxied_data represents an ensemble,
804    or an object of the same type as proxied_data otherwise.
805    """
806
807    def __init__(self, *, name: str, proxied_data, width: int, function: typing.Callable):
808        assert not isinstance(proxied_data, Future)
809        if hasattr(proxied_data, 'width'):
810            # Ensemble data source
811            assert hasattr(proxied_data, 'source')
812            self._result = function(proxied_data.source)
813        else:
814            self._result = function(proxied_data)
815        if width > 1:
816            if isinstance(self._result, (str, bytes)):
817                # In this case, do not implicitly broadcast
818                raise exceptions.ValueError('"function" produced data incompatible with "width".')
819            else:
820                if not isinstance(self._result, collections.abc.Iterable):
821                    raise exceptions.DataShapeError(
822                        'Expected iterable of size {} but "function" result is not iterable.')
823            data = list(self._result)
824            size = len(data)
825            if len(data) != width:
826                raise exceptions.DataShapeError(
827                    'Expected iterable of size {} but "function" produced a {} of size {}'.format(width, type(data),
828                                                                                                  size))
829            dtype = type(data[0])
830        else:
831            if width != 1:
832                raise exceptions.ValueError('width must be an integer 1 or greater.')
833            dtype = type(self._result)
834            if issubclass(dtype, (list, tuple)):
835                dtype = datamodel.NDArray
836                data = [datamodel.ndarray(self._result)]
837            elif isinstance(self._result, collections.abc.Iterable):
838                if not isinstance(self._result, (str, bytes, dict)):
839                    raise exceptions.ValueError(
840                        'Expecting width 1 but "function" produced iterable type {}.'.format(type(self._result)))
841                else:
842                    dtype = str
843                    data = [str(self._result)]
844            else:
845                data = [self._result]
846        description = ResultDescription(dtype=dtype, width=width)
847        self._data = OutputData(name=name, description=description)
848        for member in range(width):
849            self._data.set(data[member], member=member)
850
851        output_collection_description = OutputCollectionDescription(**{name: dtype})
852        self.output_data_proxy = define_output_data_proxy(output_description=output_collection_description)
853
854    def is_done(self, name: str) -> bool:
855        return True
856
857    def get(self, name: str) -> 'OutputData':
858        assert self._data.name == name
859        return self._data
860
861    def data(self) -> _OutputDataProxyType:
862        return self.output_data_proxy(self)
863
864    def width(self) -> int:
865        # TODO: It looks like the OutputData ResultDescription probably belongs
866        #  in the public interface.
867        return self._data._description.width
868
869    def update_output(self):
870        pass
871
872    def reset(self):
873        pass
874
875    def future(self, name: str, description: ResultDescription) -> 'Future':
876        return Future(self, name, description=description)
877
878
879class ProxyResourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
880    """Act as a resource manager for data managed by another resource manager.
881
882    Allow data transformations on the proxied resource.
883
884    Keyword Args:
885        proxied_future: An object implementing the Future interface.
886        width: Size of (one-dimensional) shaped data produced by function.
887        function: Transformation to perform on the result of proxied_future.
888
889    The callable passed as ``function`` must accept a single argument, which will
890    be an iterable when proxied_future represents an ensemble, or an object of
891    type proxied_future.description.dtype otherwise.
892    """
893
894    def __init__(self, proxied_future: 'Future', width: int, function: typing.Callable):
895        self._done = False
896        self._proxied_future = proxied_future
897        self._width = width
898        self.name = self._proxied_future.name
899        self._result = None
900        assert callable(function)
901        self.function = function
902
903    def width(self) -> int:
904        return self._width
905
906    def reset(self):
907        self._done = False
908        self._proxied_future._reset()
909        self._result = None
910
911    def is_done(self, name: str) -> bool:
912        return self._done
913
914    def get(self, name: str):
915        if name != self.name:
916            raise exceptions.ValueError('Request for unknown data.')
917        if not self.is_done(name):
918            raise exceptions.ProtocolError('Data not ready.')
919        result = self.function(self._result)
920        if self._width != 1:
921            # TODO Fix this typing nightmare:
922            #  ResultDescription should be fully knowable and defined when the resource manager is initialized.
923            data = OutputData(name=self.name, description=ResultDescription(dtype=type(result[0]), width=self._width))
924            for member, value in enumerate(result):
925                data.set(value, member)
926        else:
927            data = OutputData(name=self.name, description=ResultDescription(dtype=type(result), width=self._width))
928            data.set(result, 0)
929        return data
930
931    def update_output(self):
932        self._result = self._proxied_future.result()
933        self._done = True
934
935    def data(self) -> _OutputDataProxyType:
936        raise exceptions.ApiError('ProxyResourceManager cannot yet manage a full OutputDataProxy.')
937
938    def future(self, name: str, description: ResultDescription):
939        return Future(self, name, description=description)
940
941
942class AbstractOperation(typing.Generic[_OutputDataProxyType]):
943    """Client interface to an operation instance (graph node).
944
945    Note that this is a generic abstract class. Subclasses should provide a
946    class subscript to help static type checkers.
947    """
948
949    @abc.abstractmethod
950    def run(self):
951        """Assert execution of an operation.
952
953        After calling run(), the operation results are guaranteed to be available
954        in the local context.
955        """
956
957    @property
958    @abc.abstractmethod
959    def output(self) -> _OutputDataProxyType:
960        """Get a proxy collection to the output of the operation.
961
962        Developer note: The 'output' property exists to isolate the namespace of
963        output data from other operation handle attributes and we should consider
964        whether it is actually necessary or helpful. To facilitate its possible
965        future removal, do not enrich its interface beyond that of a collection
966        of OutputDescriptor attributes.
967        """
968        ...
969
970
971class OperationRegistryKey(collections.namedtuple('OperationRegistryKey', 'namespace name'), collections.abc.Hashable):
972    """Helper class to define the key type for OperationRegistry."""
973
974    def __hash__(self):
975        return hash((self.namespace, self.name))
976
977    def __eq__(self, other):
978        """Test equivalence rather than identity.
979
980        Note: Use `is` to test identity.
981        """
982        return other.namespace == self.namespace and other.name == self.name
983
984    def __str__(self):
985        return '.'.join([self.namespace, self.name])
986
987    def __repr__(self):
988        return '{}(namespace={}, name={})'.format(self.__qualname__, self.namespace, self.name)
989
990
991def _make_registry_key(*args) -> OperationRegistryKey:
992    """Normalize input to an OperationRegistryKey.
993
994    Used to implement OperationRegistry.__getitem__(), which catches and converts
995    the various exceptions this helper function can produce.
996    """
997    if len(args) > 1:
998        return OperationRegistryKey(*args)
999    else:
1000        if len(args) != 1:
1001            raise exceptions.UsageError('Empty index value passed to OperationRegistry instance[].')
1002        item = args[0]
1003    if isinstance(item, OperationRegistryKey):
1004        return item
1005    if isinstance(item, str):
1006        namespace, name = item.rsplit(sep='.', maxsplit=1)
1007        return OperationRegistryKey(namespace=namespace, name=name)
1008    # Item could be a class object or an instance object...
1009    if hasattr(item, 'namespace') and hasattr(item, 'name'):
1010        if callable(item.namespace):
1011            namespace = item.namespace()
1012        else:
1013            namespace = item.namespace
1014        if callable(item.name):
1015            name = item.name()
1016        else:
1017            name = item.name
1018        return OperationRegistryKey(namespace=namespace, name=name)
1019    raise exceptions.ValueError('Not a usable OperationRegistryKey: {}'.format(item))
1020
1021
1022class OperationRegistry(collections.UserDict):
1023    """Helper class to map identifiers to Operation implementation instances.
1024
1025    This is an implementation detail of gmxapi.operation and should not be used from
1026    outside of the package until a stable interface can be specified.
1027    """
1028
1029    @typing.overload
1030    def __getitem__(self, item: OperationRegistryKey):
1031        ...
1032
1033    @typing.overload
1034    def __getitem__(self, item: str):
1035        ...
1036
1037    def __getitem__(self, *args):
1038        """Fetch the requested operation registrant.
1039
1040        The getter is overloaded to support look-ups in multiple forms.
1041
1042        The key can be given in the following forms.
1043        * As a period-delimited string of the form "namespace.operation".
1044        * As an OperationRegistryKey object.
1045        * As a sequence of positional arguments accepted by OperationRegistryKey.
1046        * As an object conforming to the OperationRegistryKey interface.
1047        """
1048        try:
1049            item = _make_registry_key(*args)
1050        except exceptions.Error as e:
1051            raise exceptions.TypeError('Could not interpret key as a OperationRegistryKey.') from e
1052        return self.data[item]
1053
1054
1055# Module level data store for locating operation implementations at run time.
1056# TODO: This may make sense as instance data of a root Context instance, but we
1057#  don't have a stable interface for communicating between Contexts yet.
1058# Alternatively, it could be held as class data in a RegisteredOperation class,
1059# but note that the "register" member function would behave less like the abc.ABCMeta
1060# support for "virtual subclassing" and more like the generic function machinery
1061# of e.g. functools.singledispatch.
1062_operation_registry = OperationRegistry()
1063
1064
1065def _register_operation(cls: typing.Type[OperationImplementation]):
1066    assert isinstance(cls, type)
1067    assert issubclass(cls, OperationImplementation)
1068    operation = _make_registry_key(cls)
1069    if operation in _operation_registry:
1070        full_name = str(operation)
1071        raise exceptions.ProtocolError('Attempting to redefine operation {}.'.format(full_name))
1072    _operation_registry[operation] = cls
1073
1074
1075# TODO: replace with a generic function that we dispatch on so the type checker can infer a return type.
1076def _get_operation_director(operation, context: gmx.abc.Context):
1077    """
1078
1079    :param operation:
1080    :param context:
1081    :return: gmxapi.abc.OperationDirector
1082    """
1083    registrant = _operation_registry[operation]
1084    director = registrant.director(context=context)
1085    return director
1086
1087
1088class InputDescription(abc.ABC):
1089    """Node input description for gmxapi.operation module.
1090
1091    Provides the support needed to understand operation inputs in gmxapi.operation
1092    module Contexts.
1093
1094    .. todo:: Implementation base class with heritable behavior and/or helpers to
1095              compose this functionality from more normative description of
1096              operation inputs. This will probably become a facet of the ResourceDirector
1097              when specialized for gmxapi.operation.Context.
1098    """
1099
1100    @abc.abstractmethod
1101    def signature(self) -> InputCollectionDescription:
1102        """Mapping of named inputs and input type.
1103
1104        Used to determine valid inputs before an Operation node is created.
1105
1106        Collaborations:
1107            Related to the operation resource factory for this context.
1108
1109        ..  todo::
1110            Better unification of this protocol, InputCollectionDescription, and
1111            resource factory.
1112            Note, also, that the *bind* method of the returned InputCollectionDescription
1113            serves as the resource factory for input to the node builder.
1114        """
1115        ...
1116
1117    @abc.abstractmethod
1118    def make_uid(self, input: 'DataEdge') -> str:
1119        """The unique identity of an operation node tags the output with respect to the input.
1120
1121        Combines information on the Operation details and the input to uniquely
1122        identify the Operation node.
1123
1124        Arguments:
1125            input : A (collection of) data source(s) that can provide Fingerprints.
1126
1127        Used internally by the Context to manage ownership of data sources, to
1128        locate resources for nodes in work graphs, and to manage serialization,
1129        deserialization, and checkpointing of the work graph.
1130
1131        The UID is a detail of the generic Operation that _should_ be independent
1132        of the Context details to allow the framework to manage when and where
1133        an operation is executed.
1134
1135        TODO: We probably don't want to allow Operations to single-handedly determine their
1136         own uniqueness, but they probably should participate in the determination with the Context.
1137
1138        TODO: Context implementations should be allowed to optimize handling of
1139         equivalent operations in different sessions or work graphs, but we do not
1140         yet guarantee that UIDs are globally unique!
1141
1142        To be refined...
1143        """
1144        ...
1145
1146
1147class ConcreteInputDescription(InputDescription):
1148    """Simple composed InputDescription."""
1149
1150    def __init__(self,
1151                 input_signature: InputCollectionDescription,
1152                 uid_helper: typing.Callable[['DataEdge'], str]
1153                 ):
1154        self._input_signature_description = input_signature
1155        self._uid_helper = uid_helper
1156
1157    def signature(self) -> InputCollectionDescription:
1158        return self._input_signature_description
1159
1160    def make_uid(self, input: 'DataEdge') -> str:
1161        return self._uid_helper(input)
1162
1163
1164class OperationMeta(abc.ABCMeta):
1165    """Metaclass to manage the definition of Operation implementation classes.
1166
1167    Note that this metaclass can be superseded by `__init_subclass__()` when
1168    the minimum Python version is increased to Python 3.6+.
1169    """
1170
1171    def __new__(meta, name, bases, class_dict):
1172        cls = super().__new__(meta, name, bases, class_dict)
1173        # Register subclasses, but not the base class.
1174        if issubclass(cls, OperationImplementation) and cls is not OperationImplementation:
1175            # TODO: Remove OperationDetailsBase layer and this extra check.
1176            # Note: we do not yet register the Operations built dynamically because we
1177            # don't have a clear definition of unique implementations yet. For instance,
1178            # a new OperationDetails class is defined for each call to gmx.join_arrays
1179            # TODO: Properly register and reuse Operations defined dynamically
1180            #  through function_wrapper (currently encompassed by OperationDetailsBase subclasses)
1181            if name != 'OperationDetailsBase':
1182                if OperationDetailsBase not in bases:
1183                    _register_operation(cls)
1184        return cls
1185
1186
1187class OperationDetailsBase(OperationImplementation, InputDescription,
1188                           metaclass=OperationMeta):
1189    """Abstract base class for Operation details in this module's Python Context.
1190
1191    Provides necessary interface for use with gmxapi.operation.ResourceManager.
1192    Separates the details of an Operation from those of the ResourceManager in
1193    a given Context.
1194
1195    OperationDetails classes are almost stateless, serving mainly to compose implementation
1196    details. Instances (operation objects) provide the Context-dependent interfaces
1197    for a specific node in a work graph.
1198
1199    OperationDetails subclasses are created dynamically by function_wrapper and
1200    make_operation.
1201
1202    Developer note: when subclassing, note that the ResourceManager is responsible
1203    for managing Operation state. Do not add instance data members related to
1204    computation or output state.
1205
1206    TODO: determine what is acceptable instance data and/or initialization information.
1207    Note that currently the subclass in function_wrapper has _no_ initialization input,
1208    but does not yet handle input-dependent output specification or node fingerprinting.
1209    It seems likely that instance initialization will require some characterization of
1210    supplied input, but nothing else. Even that much is not necessary if the instance
1211    is completely stateless, but that would require additional parameters to the member
1212    functions. However, an instance should be tied to a specific ResourceManager and
1213    Context, so weak references to these would be reasonable.
1214    """
1215
1216    @abc.abstractmethod
1217    def output_description(self) -> OutputCollectionDescription:
1218        """Mapping of available outputs and types for an existing Operation node."""
1219        ...
1220
1221    @abc.abstractmethod
1222    def publishing_data_proxy(self, *,
1223                              instance: SourceResource[typing.Any, _PublishingDataProxyType],
1224                              client_id) -> _PublishingDataProxyType:
1225        """Factory for Operation output publishing resources.
1226
1227        Used internally when the operation is run with resources provided by instance."""
1228        ...
1229
1230    @abc.abstractmethod
1231    def output_data_proxy(self,
1232                          instance: SourceResource[_OutputDataProxyType, typing.Any]
1233                          ) -> _OutputDataProxyType:
1234        """Get an object that can provide Futures for output data managed by instance."""
1235        ...
1236
1237    @abc.abstractmethod
1238    def __call__(self, resources: _Resources):
1239        """Execute the operation with provided resources.
1240
1241        Resources are prepared in an execution context with aid of resource_director()
1242
1243        After the first call, output data has been published and is trivially
1244        available through the output_data_proxy()
1245        """
1246        ...
1247
1248    @classmethod
1249    @abc.abstractmethod
1250    def resource_director(cls, *, input, output: _PublishingDataProxyType) -> _Resources[_PublishingDataProxyType]:
1251        """a Director factory that helps build the Session Resources for the function.
1252
1253        The Session launcher provides the director with all of the resources previously
1254        requested/negotiated/registered by the Operation. The director uses details of
1255        the operation to build the resources object required by the operation runner.
1256
1257        For the Python Context, the protocol is for the Context to call the
1258        resource_director instance method, passing input and output containers.
1259        (See, for example, gmxapi.operation.PyFunctionRunnerResources)
1260        """
1261        ...
1262
1263    # TODO: Don't run the director. Just return the correct callable.
1264    @classmethod
1265    def operation_director(cls, *args, context: 'Context', label=None, **kwargs) -> AbstractOperation:
1266        """Dispatching Director for adding a work node.
1267
1268        A Director for input of a particular sort knows how to reconcile
1269        input with the requirements of the Operation and Context node builder.
1270        The Director (using a less flexible / more standard interface)
1271        builds the operation node using a node builder provided by the Context.
1272
1273        This is essentially the creation method, instead of __init__, but the
1274        object is created and owned by the framework, and the caller receives
1275        an OperationHandle instead of a reference to an instance of cls.
1276
1277        # TODO: We need a way to compose this functionality for arbitrary Contexts.
1278        # That likely requires traits on the Contexts, and registration of Context
1279        # implementations. It seems likely that an Operation will register Director
1280        # implementations on import, and dispatching will be moved to the Context
1281        # implementations, which can either find an appropriate OperationDirector
1282        # or raise a compatibility error. To avoid requirements on import order of
1283        # Operations and Context implementations, we can change this to a non-abstract
1284        # dispatching method, requiring registration in the global gmxapi.context
1285        # module, or get rid of this method and use something like pkg_resources
1286        # "entry point" groups for independent registration of Directors and Contexts,
1287        # each annotated with relevant traits. E.g.:
1288        # https://setuptools.readthedocs.io/en/latest/setuptools.html#dynamic-discovery-of-services-and-plugins
1289        """
1290        if not isinstance(context, Context):
1291            raise exceptions.UsageError('Context instance needed for dispatch.')
1292        # TODO: use Context characteristics rather than isinstance checks.
1293        if isinstance(context, ModuleContext):
1294            construct = OperationDirector(*args, operation_details=cls, context=context, label=label, **kwargs)
1295            return construct()
1296        elif isinstance(context, SubgraphContext):
1297            construct = OperationDirector(*args, operation_details=cls, context=context, label=label, **kwargs)
1298            return construct()
1299        else:
1300            raise exceptions.ApiError('Cannot dispatch operation_director for context {}'.format(context))
1301
1302
1303# TODO: Implement observer pattern for edge->node data flow.
1304# Step 0: implement subject interface subscribe()
1305# Step 1: implement subject interface get_state()
1306# Step 2: implement observer interface update()
1307# Step 3: implement subject interface notify()
1308# Step 4: implement observer hook to support substantial change in source that
1309#         invalidates downstream fingerprinting, such as a new subgraph iteration.
1310# class Observer(object):
1311#     """Abstract base class for data observers."""
1312#     def rebind(self, edge: DataEdge):
1313#         """Recreate the Operation at the consuming end of the DataEdge."""
1314
1315
1316class Future(_Future):
1317    """gmxapi data handle.
1318
1319    Future is currently more than a Future right now. (should be corrected / clarified.)
1320    Future is also a facade to other features of the data provider.
1321
1322    Future objects are the most atomic interface between Contexts. User scripts
1323    may hold Futures from which they extract data with result(). Operation output
1324    used as input for another Operation can be decomposed such that the Operation
1325    factory has only Future objects in its input.
1326
1327    TODO: ``subscribe`` method allows consumers to bind as Observers.
1328
1329    TODO: extract the abstract class for input inspection?
1330    Currently abstraction is handled through SourceResource subclassing.
1331
1332    Attributes:
1333        description (ResultDescription): Describes the result to be obtained from this Future.
1334
1335    """
1336
1337    def __init__(self, resource_manager: SourceResource, name: str, description: ResultDescription):
1338        self.name = name
1339        if not isinstance(description, ResultDescription):
1340            raise exceptions.ValueError('Need description of requested data.')
1341        self.description = description  # type: ResultDescription
1342        self.resource_manager = resource_manager
1343
1344        # Deprecated. We should not "reset" futures, but reconstitute them, but we
1345        # need to move the data model to a subscription-based system so that we can
1346        # make Futures properly immutable and issue new ones across subgraph iterations.
1347        self._number_of_resets = 0
1348
1349    def __eq__(self, other):
1350        # This function is defined because we have defined __hash__().
1351        # Before customizing __eq__(), recall that Python objects that compare
1352        # equal should hash to the same value.
1353        # Please keep the two functions semantically correct.
1354        return object.__eq__(self, other)
1355
1356    def __hash__(self):
1357        # We cannot properly determine equivalency beyond the scope of a ResourceManager instance
1358        # without more developed data flow fingerprinting.
1359        return hash((id(self.resource_manager), self.name, self.description, self._number_of_resets))
1360
1361    def __str__(self):
1362        return '<Future: name={}, description={}>'.format(self.name, self.description)
1363
1364    def result(self) -> ResultTypeVar:
1365        """Fetch data to the caller's Context.
1366
1367        Returns an object of the concrete type specified according to
1368        the operation that produces this Result.
1369
1370        Ensemble data are returned as a list. Scalar results or results from single
1371        member ensembles are returned as scalars.
1372        """
1373        self.resource_manager.update_output()
1374        # Return ownership of concrete data
1375        handle = self.resource_manager.get(self.name)
1376
1377        # For intuitive use in non-ensemble cases, we represent data as bare scalars
1378        # when possible. It is easier for users to cast scalars to lists of length 1
1379        # than to introspect their own code to determine if a list of length 1 is
1380        # part of an ensemble or not. The data model will become clearer as we
1381        # develop more robust handling of multidimensional data and data flow topologies.
1382        # In the future,
1383        # we may distinguish between data of shape () and shape (1,), but we will need
1384        # to be careful with semantics. We are already starting to adopt a rule-of-thumb
1385        # that data objects assume the minimum dimensionality necessary unless told
1386        # otherwise, and we could make that a hard rule if it doesn't make other things
1387        # too difficult.
1388        if self.description.width == 1:
1389            return handle.data(member=0)
1390        else:
1391            return handle.data()
1392
1393    def _reset(self):
1394        """Mark the Future "not done" to allow reexecution.
1395
1396        Invalidates cached results, resets "done" markers in data sources, and
1397        triggers _reset recursively.
1398
1399        Note: this is a hack that is inconsistent with the plan of unique mappings
1400        of inputs to outputs, but allows a quick prototype for looping operations.
1401        """
1402        self._number_of_resets += 1
1403        self.resource_manager.reset()
1404
1405    @property
1406    def dtype(self):
1407        return self.description.dtype
1408
1409    def __getitem__(self, item):
1410        """Get a more limited view on the Future."""
1411        description = ResultDescription(dtype=self.dtype, width=self.description.width)
1412        # TODO: Use explicit typing when we have more thorough typing.
1413        description._dtype = None
1414        if self.description.width == 1:
1415            proxy = ProxyResourceManager(self,
1416                                         width=description.width,
1417                                         function=lambda value, key=item: value[key])
1418        else:
1419            proxy = ProxyResourceManager(self,
1420                                         width=description.width,
1421                                         function=lambda value, key=item:
1422                                         [subscriptable[key] for subscriptable in value])
1423        return proxy.future(self.name, description=description)
1424
1425
1426class OutputDataDescriptor(ProxyDataDescriptor):
1427    """Read-only data descriptor for proxied access to output data.
1428
1429    Knows how to get a Future from the resource manager.
1430    """
1431
1432    # TODO: Reconcile the internal implementation details with the visibility and
1433    #  usages of this class.
1434
1435    def __get__(self, proxy: DataProxyBase, owner):
1436        if proxy is None:
1437            # Access through class attribute of owner class
1438            return self
1439        result_description = ResultDescription(dtype=self._dtype, width=proxy.ensemble_width)
1440
1441        return proxy._resource_instance.future(name=self._name, description=result_description)
1442
1443
1444class MutableResourceDescriptor(ProxyDataDescriptor):
1445    """Accessor for rich binding interfaces.
1446
1447    Allows operations to access resources beyond the scope of the current
1448    resource manager. Used by operations whose interactions are more complicated
1449    than standard typed data flow at the scope of the current Context.
1450
1451    Instead of a Future interface, the returned object is a MutableResource with
1452    which a subscriber can collaborate with lower-level protocols.
1453    """
1454
1455    def __get__(self, proxy: DataProxyBase, owner) -> typing.Union[MutableResource, 'MutableResourceDescriptor']:
1456        if proxy is None:
1457            # Access through class attribute of owner class. We don't have a
1458            # specified use case for that, so allow inspection of the data
1459            # descriptor instance, itself.
1460            return self
1461        # TODO: implement.
1462        # The protocol for MD extension plugins requires that the simulation operation
1463        # subscribe to the plugin. Then the Context allows the plugin to access the
1464        # MdRunner interface as the simulation is launched.
1465        # The protocol for modify_input and for mdrun to consume the TPR payload
1466        # of read_tpr or modify_input should allow us to use the gmxapi 0.0.7
1467        # WorkSpec to configure and launch a simulation, which we can do by feeding
1468        # forward and building a fused operation at the mdrun node. The information
1469        # fed forward can just be references to the inputs and parameters of the
1470        # earlier operations, with annotations so that we know the intended behavior.
1471
1472
1473def define_output_data_proxy(output_description: OutputCollectionDescription) -> typing.Type[DataProxyBase]:
1474    descriptors = {name: OutputDataDescriptor(name, description) for name, description in output_description.items()}
1475
1476    class OutputDataProxy(DataProxyBase, descriptors=descriptors):
1477        """Handler for read access to the `output` member of an operation handle.
1478
1479        Acts as a sort of ResultCollection.
1480
1481        A ResourceManager creates an OutputDataProxy instance at initialization to
1482        provide the ``output`` property of an operation handle.
1483        """
1484
1485    # Note: the OutputDataProxy has an inherent ensemble shape in the context
1486    # in which it is used, but that is an instance characteristic, not part of this type definition.
1487    # TODO: (FR5) The current tool does not support topology changing operations.
1488    return OutputDataProxy
1489
1490
1491# Encapsulate the description of the input data flow.
1492PyFuncInput = collections.namedtuple('Input', ('args', 'kwargs', 'dependencies'))
1493
1494
1495class SinkTerminal(object):
1496    """Operation input end of a data edge.
1497
1498    In addition to the information in an InputCollectionDescription, includes
1499    topological information for the Operation node (ensemble width).
1500
1501    Collaborations: Required for creation of a DataEdge. Created with knowledge
1502    of a DataSourceCollection instance and a InputCollectionDescription.
1503    """
1504
1505    # TODO: This clearly maps to a Builder pattern.
1506    # I think we want to get the sink terminal builder from a factory parameterized by InputCollectionDescription,
1507    # add data source collections, and then build the sink terminal for the data edge.
1508    def __init__(self, input_collection_description: InputCollectionDescription):
1509        """Define an appropriate data sink for a new operation node.
1510
1511        Resolve data sources and input description to determine connectability,
1512        topology, and any necessary implicit data transformations.
1513
1514        :param input_collection_description: Available inputs for Operation
1515        :return: Fully formed description of the Sink terminal for a data edge to be created.
1516
1517        Collaborations: Execution Context implementation.
1518        """
1519        self.ensemble_width = 1
1520        self.inputs = input_collection_description
1521
1522    def __str__(self):
1523        return '<SinkTerminal: ensemble_width={}>'.format(self.ensemble_width)
1524
1525    def update_width(self, width: int):
1526        if not isinstance(width, int):
1527            try:
1528                width = int(width)
1529            except TypeError:
1530                raise exceptions.TypeError('Need an integer width > 0.')
1531        if width < 1:
1532            raise exceptions.ValueError('Nonsensical ensemble width: {}'.format(int(width)))
1533        if self.ensemble_width != 1:
1534            if width != self.ensemble_width:
1535                raise exceptions.ValueError(
1536                    'Cannot change ensemble width {} to width {}.'.format(self.ensemble_width, width))
1537        self.ensemble_width = width
1538
1539    def update(self, data_source_collection: DataSourceCollection):
1540        """Update the SinkTerminal with the proposed data provider."""
1541        for name, sink_dtype in self.inputs.items():
1542            if name not in data_source_collection:
1543                # If/when we accept data from multiple sources, we'll need some additional sanity checking.
1544                if not hasattr(self.inputs.signature.parameters[name], 'default'):
1545                    raise exceptions.UsageError('No data or default for {}'.format(name))
1546            else:
1547                # With a single data source, we need data to be in the source or have a default
1548                assert name in data_source_collection
1549                assert issubclass(sink_dtype, valid_result_types)
1550                source = data_source_collection[name]
1551                logger.debug('Updating Sink for source {}: {}.'.format(name, source))
1552                if isinstance(source, sink_dtype):
1553                    logger.debug('Source matches sink. No update necessary.')
1554                    continue
1555                else:
1556                    if isinstance(source, collections.abc.Iterable) and not isinstance(source, (
1557                            str, bytes, collections.abc.Mapping)):
1558                        assert isinstance(source, datamodel.NDArray)
1559                        if sink_dtype != datamodel.NDArray:
1560                            # Source is NDArray, but sink is not. Implicitly scatter.
1561                            self.update_width(len(source))
1562                        continue
1563                    if hasattr(source, 'description'):
1564                        source_description = typing.cast(ResultDescription, source.description)
1565                        source_dtype = source_description.dtype
1566                        assert isinstance(sink_dtype, type)
1567                        # TODO: Handle typing of Future slices when we have a better data model.
1568                        if source_dtype is not None:
1569                            assert isinstance(source_dtype, type)
1570                            if not issubclass(source_dtype, sink_dtype):
1571                                raise exceptions.TypeError('Expected {} but got {}.'.format(sink_dtype, source_dtype))
1572                        source_width = source.description.width
1573                        self.update_width(source_width)
1574
1575
1576class DataEdge(object):
1577    """State and description of a data flow edge.
1578
1579    A DataEdge connects a data source collection to a data sink. A sink is an
1580    input or collection of inputs of an operation (or fused operation). An operation's
1581    inputs may be fed from multiple data source collections, but an operation
1582    cannot be fully instantiated until all of its inputs are bound, so the DataEdge
1583    is instantiated at the same time the operation is instantiated because the
1584    required topology of a graph edge may be determined by the required topology
1585    of another graph edge.
1586
1587    A data edge has a well-defined topology only when it is terminated by both
1588    a source and sink. Creation requires that a source collection is compared to
1589    a sink description.
1590
1591    Calling code initiates edge creation by passing well-described data sources
1592    to an operation factory. The data sources may be annotated with explicit scatter
1593    or gather commands.
1594
1595    The resource manager for the new operation determines the
1596    required shape of the sink to handle all of the offered input.
1597
1598    Broadcasting
1599    and transformations of the data sources are then determined and the edge is
1600    established.
1601
1602    At that point, the fingerprint of the input data at each operation
1603    becomes available to the resource manager for the operation. The fingerprint
1604    has sufficient information for the resource manager of the operation to
1605    request and receive data through the execution context.
1606
1607    Instantiating operations and data edges implicitly involves collaboration with
1608    a Context instance. The state of a given Context or the availability of a
1609    default Context through a module function may affect the ability to instantiate
1610    an operation or edge. In other words, behavior may be different for connections
1611    being made in the scripting environment versus the running Session, and implementation
1612    details can determine whether or not new operations or data flow can occur in
1613    different code environments.
1614    """
1615
1616    class ConstantResolver(object):
1617        def __init__(self, value):
1618            self.value = value
1619
1620        def __call__(self, member=None):
1621            return self.value
1622
1623    def __init__(self, source_collection: DataSourceCollection, sink_terminal: SinkTerminal):
1624        # Adapters are callables that transform a source and node ID to local data.
1625        # Every key in the sink has an adapter.
1626        self.adapters = {}
1627        self.source_collection = source_collection
1628        self.sink_terminal = sink_terminal
1629        for name in sink_terminal.inputs:
1630            if name not in source_collection:
1631                if hasattr(sink_terminal.inputs[name], 'default'):
1632                    self.adapters[name] = self.ConstantResolver(sink_terminal.inputs[name])
1633                else:
1634                    # TODO: Initialize with multiple DataSourceCollections?
1635                    raise exceptions.ValueError('No source or default for required input "{}".'.format(name))
1636            else:
1637                source = source_collection[name]
1638                sink = sink_terminal.inputs[name]
1639                if isinstance(source, (str, bool, int, float, dict)):
1640                    if issubclass(sink, (str, bool, int, float, dict)):
1641                        self.adapters[name] = self.ConstantResolver(source)
1642                    else:
1643                        assert issubclass(sink, datamodel.NDArray)
1644                        self.adapters[name] = self.ConstantResolver(datamodel.ndarray([source]))
1645                elif isinstance(source, datamodel.NDArray):
1646                    if issubclass(sink, datamodel.NDArray):
1647                        # TODO: shape checking
1648                        # Implicit broadcast may not be what is intended
1649                        self.adapters[name] = self.ConstantResolver(source)
1650                    else:
1651                        if source.shape[0] != sink_terminal.ensemble_width:
1652                            raise exceptions.ValueError(
1653                                'Implicit broadcast could not match array source to ensemble sink')
1654                        else:
1655                            self.adapters[name] = lambda member, source=source: source[member]
1656                elif hasattr(source, 'result'):
1657                    # Handle data futures...
1658                    # If the Future is part of an ensemble, result() will return a list.
1659                    # Otherwise, it will return a single object.
1660                    ensemble_width = source.description.width
1661                    # TODO: subscribe to futures so results can be pushed.
1662                    if ensemble_width == 1:
1663                        self.adapters[name] = lambda member, source=source: source.result()
1664                    else:
1665                        self.adapters[name] = lambda member, source=source: source.result()[member]
1666                else:
1667                    assert isinstance(source, EnsembleDataSource)
1668                    self.adapters[name] = lambda member, source=source: source.node(member)
1669
1670    def __str__(self):
1671        return '<DataEdge: source_collection={}, sink_terminal={}>'.format(self.source_collection, self.sink_terminal)
1672
1673    def reset(self):
1674        self.source_collection.reset()
1675
1676    def resolve(self, key: str, member: int):
1677        return self.adapters[key](member=member)
1678
1679    def sink(self, node: int) -> dict:
1680        """Consume data for the specified sink terminal node.
1681
1682        Run-time utility delivers data from the bound data source(s) for the
1683        specified terminal that was configured when the edge was created.
1684
1685        Terminal node is identified by a member index number.
1686
1687        Returns:
1688            A Python dictionary of the provided inputs as local data (not Future).
1689        """
1690        results = {}
1691        sink_ports = self.sink_terminal.inputs
1692        for key in sink_ports:
1693            results[key] = self.resolve(key, node)
1694        return results
1695
1696
1697class ResourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]):
1698    """Provides data publication and subscription services.
1699
1700        Owns the data published by the operation implementation or served to consumers.
1701        Mediates read and write access to the managed data streams.
1702
1703        This ResourceManager implementation is defined in conjunction with a
1704        run-time definition of an Operation that wraps a Python callable (function).
1705        ResourceManager is instantiated with a reference to the callable.
1706
1707        When the Operation is run, the resource manager prepares resources for the wrapped
1708        function. Inputs provided to the Operation factory are provided to the
1709        function as keyword arguments. The wrapped function publishes its output
1710        through the (additional) ``output`` key word argument. This argument is
1711        a short-lived resource, prepared by the ResourceManager, with writable
1712        attributes named in the call to function_wrapper().
1713
1714        After the Operation has run and the outputs published, the data managed
1715        by the ResourceManager is marked "done."
1716
1717        Protocols:
1718
1719        The data() method produces a read-only collection of outputs named for
1720        the Operation when the Operation's ``output`` attribute is accessed.
1721
1722        publishing_resources() can be called once during the ResourceManager lifetime
1723        to provide the ``output`` object for the wrapped function. (Used by update_output().)
1724
1725        update_output() brings the managed output data up-to-date with the input
1726        when the Operation results are needed. If the Operation has not run, an
1727        execution session is prepared with input and output arguments for the
1728        wrapped Python callable. Output is publishable only during this session.
1729
1730    TODO: This functionality should evolve to be a facet of Context implementations.
1731     There should be no more than one ResourceManager instance per work graph
1732     node in a Context. This will soon be at odds with letting the ResourceManager
1733     be owned by an operation instance handle.
1734    TODO: The publisher and data objects can be more strongly defined through
1735     interaction between the Context and clients.
1736
1737    Design notes:
1738
1739    The normative pattern for updating data is to execute a node in the work
1740    graph, passing Resources for an execution Session to an operation runner.
1741    The resources and runner are dependent on the implementation details of
1742    the operation and the execution context, so logical execution may look
1743    like the following.
1744
1745        resource_builder = ResourcesBuilder()
1746        runner_builder = RunnerBuilder()
1747        input_resource_director = input_resource_factory.director(input)
1748        output_resource_director = publishing_resource_factory.director(output)
1749        input_resource_director(resource_builder, runner_builder)
1750        output_resource_director(resource_builder, runner_builder)
1751        resources = resource_builder.build()
1752        runner = runner_builder.build()
1753        runner(resources)
1754
1755    Only the final line is intended to be literal. The preceding code, if it
1756    exists in entirety, may be spread across several code comments.
1757
1758    TODO: Data should be pushed, not pulled.
1759    Early implementations executed operation code and extracted results directly.
1760    While we need to be able to "wait for" results and alert the data provider that
1761    we are ready for input, we want to defer execution management and data flow to
1762    the framework.
1763    """
1764
1765    @contextmanager
1766    def __publishing_context(self, ensemble_member=0) -> typing.Iterator[_PublishingDataProxyType]:
1767        """Get a context manager for resolving the data dependencies of this node.
1768
1769        The returned object is a Python context manager (used to open a `with` block)
1770        to define the scope in which the operation's output can be published.
1771        'output' type resources can be published exactly once, and only while the
1772        publishing context is active. (See operation.function_wrapper())
1773
1774        Used internally to implement ResourceManager.publishing_resources()
1775
1776        Responsibilities of the context manager are to:
1777            * (TODO) Make sure dependencies are resolved.
1778            * Make sure outputs are marked 'done' when leaving the context.
1779
1780        """
1781
1782        # TODO:
1783        # if self._data.done():
1784        #     raise exceptions.ProtocolError('Resources have already been published.')
1785
1786        # I don't think we want the OperationDetails to need to know about ensemble data,
1787        # (though the should probably be allowed to), so we may need a separate interface
1788        # for the resource manager with built-in scope-limiting to a single ensemble member.
1789        # Right now, one Operation handle owns one ResourceManager (which takes care of
1790        # the ensemble details), which owns one OperationDetails (which has no ensemble knowledge).
1791        # It is the responsibility of the calling code to make sure the PublishingDataProxy
1792        # gets used correctly.
1793
1794        # ref: https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager
1795        if self._done[ensemble_member]:
1796            raise exceptions.ProtocolError('Attempting to publish {}[{}] more than once.'.format(self.operation_id, ensemble_member))
1797
1798        try:
1799            resource = self.__publishing_data_proxy(instance=weakref.proxy(self),
1800                                                    client_id=ensemble_member)
1801        except Exception as e:
1802            logger.debug('Publishing context could not be created due to {}'.format(e))
1803            raise e
1804
1805        yield resource
1806        # Note: The remaining lines are skipped if an exception occurs in the `with` block
1807        # for the contextmanager suite, which effectively raises at the line after 'yield'.
1808        logger.debug('Published output for {} member {}'.format(self.operation_id, ensemble_member))
1809        self._done[ensemble_member] = True
1810
1811    def __init__(self, *,
1812                 source: DataEdge,
1813                 operation_id,
1814                 output_description: OutputCollectionDescription,
1815                 output_data_proxy: typing.Type[_OutputDataProxyType],
1816                 publishing_data_proxy: typing.Type[_PublishingDataProxyType],
1817                 resource_factory,
1818                 runner_director,
1819                 output_context: 'Context'):
1820        """Initialize a resource manager for the inputs and outputs of an operation.
1821        """
1822        # Note: This implementation assumes there is one ResourceManager instance per data source,
1823        # so we only stash the inputs and dependency information for a single set of resources.
1824        # TODO: validate input_fingerprint as its interface becomes clear.
1825        self._input_edge = source
1826        self.ensemble_width = self._input_edge.sink_terminal.ensemble_width
1827
1828        # Node UID.
1829        self.operation_id = operation_id
1830
1831        if isinstance(output_context, Context):
1832            self._output_context = output_context
1833        else:
1834            message = 'Provide an instance of gmxapi.operation.Context for output_context'
1835            raise exceptions.UsageError(message)
1836        assert self._output_context is not None
1837
1838        self._output_data_proxy = output_data_proxy
1839        assert self._output_data_proxy is not None
1840        assert callable(self._output_data_proxy)
1841
1842        self._output_description = output_description
1843        assert self._output_description is not None
1844
1845        self.__publishing_data_proxy = publishing_data_proxy
1846        assert self.__publishing_data_proxy is not None
1847        assert callable(self.__publishing_data_proxy)
1848
1849        self._runner_director = runner_director
1850        assert self._runner_director is not None
1851        self._resource_factory = resource_factory
1852        assert self._resource_factory is not None
1853
1854        self._data = _make_datastore(output_description=self._output_description,
1855                                     ensemble_width=self.ensemble_width)
1856
1857        # We store a rereference to the publishing context manager implementation
1858        # in a data structure that can only produce one per Python interpreter
1859        # (using list.pop()).
1860        # TODO: reimplement as a data descriptor
1861        #  so that PublishingDataProxy does not need a bound circular reference.
1862        self.__publishing_resources = [self.__publishing_context]
1863
1864        self._done = [False] * self.ensemble_width
1865        self.__operation_entrance_counter = 0
1866
1867    def width(self) -> int:
1868        return self.ensemble_width
1869
1870    def reset(self):
1871        self.__operation_entrance_counter = 0
1872        self._done = [False] * self.ensemble_width
1873        self.__publishing_resources = [self.__publishing_context]
1874        for data in self._data.values():
1875            data.reset()
1876        self._input_edge.reset()
1877        assert self.__operation_entrance_counter == 0
1878
1879    def done(self, member=None):
1880        if member is None:
1881            return all(self._done)
1882        else:
1883            return self._done[member]
1884
1885    def set_result(self, name, value, member: int):
1886        if not isinstance(value, (str, bytes)):
1887            try:
1888                for item in value:
1889                    # In this specification, it is antithetical to publish Futures.
1890                    if hasattr(item, 'result'):
1891                        raise exceptions.ApiError('Operation produced Future instead of real output.')
1892            except TypeError:
1893                # Ignore when `item` is not iterable.
1894                pass
1895        self._data[name].set(value=value, member=member)
1896
1897    def is_done(self, name):
1898        return self._data[name].done
1899
1900    def get(self, name: str):
1901        """
1902
1903        Raises exceptions.ProtocolError if requested data is not local yet.
1904        Raises exceptions.ValueError if data is requested for an unknown name.
1905        """
1906        if name not in self._data:
1907            raise exceptions.ValueError('Request for unknown data.')
1908        if not self.is_done(name):
1909            raise exceptions.ProtocolError('Data not ready.')
1910        assert isinstance(self._data[name], OutputData)
1911        return self._data[name]
1912
1913    # TODO: Normalize. This is the no-argument, no-return callable member of an
1914    #  operation handle that dispatches to another Context (the operation implementation).
1915    # TODO: Allow update of a single ensemble member. As written, this always updates all ensemble members. That can
1916    #  be the default behavior, but we don't want to require non-local updates in all cases.
1917    def update_output(self):
1918        """Bring the output of the bound operation up to date.
1919
1920        Execute the bound operation once if and only if it has not
1921        yet been run in the lifetime of this resource manager.
1922
1923        Used internally to implement Futures for the local operation
1924        associated with this resource manager.
1925
1926        Raises:
1927            exceptions.ApiError if operation runner fails to publish output.
1928
1929        TODO: More comprehensive error handling for operations that fail to execute.
1930
1931        TODO: We need a different implementation for an operation whose output
1932         is served by multiple resource managers. E.g. an operation whose output
1933         is available across the ensemble, but which should only be executed on
1934         a single ensemble member.
1935        """
1936        # This code is not intended to be reentrant. We make a modest attempt to
1937        # catch unexpected reentrance, but this is not (yet) intended to be a thread-safe
1938        # resource manager implementation.
1939        # TODO: Handle checking just the ensemble members this resource manager is responsible for.
1940        # TODO: Replace with a managed observer pattern. Update once when input is available in the Context.
1941        if not self.done():
1942            # Note: This check could also be encapsulated in a run_once decorator that
1943            # could even set a data descriptor to change behavior.
1944            self.__operation_entrance_counter += 1
1945            if self.__operation_entrance_counter > 1:
1946                raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')
1947            if not self.done():
1948                # Note! This is a detail of the ResourceManager in a SerialContext
1949                # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
1950                #  operation's library/implementation context.
1951                publishing_resources = self.publishing_resources()
1952                for i in range(self.ensemble_width):
1953                    # TODO: rewrite the following expression as a call to a resource factory.
1954                    # TODO: Consider whether the resource_factory behavior should be normalized
1955                    #  to always use `with` blocks to indicate the lifetime of a resource handle.
1956                    #  That implies that an operation handle can expire, but the operation handle
1957                    #  could be "yield"ed
1958                    #  from within the `with` block to keep the resource scope alive until the resulting
1959                    #  generator is exhausted. Not sure what that looks like or what the use case would be.
1960                    with self.local_input(i) as input:
1961                        # Note: Resources are marked "done" by the publishing system
1962                        # before the following context manager finishes exiting.
1963                        with publishing_resources(ensemble_member=i) as output:
1964                            # self._runner(*input.args, output=output, **input.kwargs)
1965                            ####
1966                            # Here we can make _runner a thing that accepts session resources, and
1967                            # is created by specializable builders. Separate out the expression of
1968                            # inputs.
1969                            #
1970                            # resource_builder = OperationDetails.ResourcesBuilder(context)
1971                            # runner_builder = OperationDetails.RunnerBuilder(context)
1972                            # input_resource_director = self._input_resource_factory.director(input)
1973                            # output_resource_director = self._publishing_resource_factory.director(output)
1974                            # input_resource_director(resource_builder, runner_builder)
1975                            # output_resource_director(resource_builder, runner_builder)
1976                            # resources = resource_builder.build()
1977                            # runner = runner_builder.build()
1978                            # runner(resources)
1979                            #
1980                            # This resource factory signature might need to be inverted or broken up
1981                            # into a builder for consistency. I.e.
1982                            # option 1: Make the input and output resources with separate factories and add_resource on
1983                            # the runner builder.
1984                            # option 2: Pass resource_builder to input_director and then output_director.
1985                            error_message = 'Got {} while executing {} for operation {}.'
1986                            try:
1987                                resources = self._resource_factory(input=input, output=output)
1988                            except exceptions.TypeError as e:
1989                                message = error_message.format(e, self._resource_factory, self.operation_id)
1990                                raise exceptions.ApiError(message) from e
1991
1992                            runner = self._runner_director(resources)
1993                            try:
1994                                runner()
1995                            except Exception as e:
1996                                message = error_message.format(e, runner, self.operation_id)
1997                                raise exceptions.ApiError(message) from e
1998            if not self.done():
1999                message = 'update_output implementation failed to update all outputs for {}.'
2000                message = message.format(self.operation_id)
2001                raise exceptions.ApiError(message)
2002
2003    def future(self, name: str, description: ResultDescription):
2004        """Retrieve a Future for a named output.
2005
2006        Provide a description of the expected result to check for compatibility or
2007        implicit topological conversion.
2008
2009        TODO: (FR5+) Normalize this part of the interface between operation definitions and
2010         resource managers.
2011        """
2012        if not isinstance(name, str) or name not in self._data:
2013            raise exceptions.ValueError('"name" argument must name an output.')
2014        assert description is not None
2015        requested_dtype = description.dtype
2016        available_dtype = self._data[name]._description.dtype
2017        if requested_dtype != available_dtype:
2018            # TODO: framework to check for implicit conversions
2019            message = 'Requested Future of type {} is not compatible with available type {}.'
2020            message = message.format(requested_dtype, available_dtype)
2021            raise exceptions.ApiError(message)
2022        return Future(self, name, description)
2023
2024    def data(self) -> _OutputDataProxyType:
2025        """Get an adapter to the output resources to access results."""
2026        return self._output_data_proxy(self)
2027
2028    @contextmanager
2029    def local_input(self, member: int = None):
2030        """In an API session, get a handle to fully resolved locally available input data.
2031
2032        Execution dependencies are resolved on creation of the context manager. Input data
2033        becomes available in the ``as`` object when entering the context manager, which
2034        becomes invalid after exiting the context manager. Resources allocated to hold the
2035        input data may be released when exiting the context manager.
2036
2037        It is left as an implementation detail whether the context manager is reusable and
2038        under what circumstances one may be obtained.
2039        """
2040        # Localize data
2041        kwargs = self._input_edge.sink(node=member)
2042        assert 'input' not in kwargs
2043
2044        # Check that we have real data
2045        for key, value in kwargs.items():
2046            assert not hasattr(value, 'result')
2047            assert not hasattr(value, 'run')
2048            value_list = []
2049            if isinstance(value, list):
2050                value_list = value
2051            if isinstance(value, datamodel.NDArray):
2052                value_list = value._values
2053            if isinstance(value, collections.abc.Mapping):
2054                value_list = value.values()
2055            assert not isinstance(value_list, Future)
2056            assert not hasattr(value_list, 'result')
2057            assert not hasattr(value_list, 'run')
2058            for item in value_list:
2059                assert not hasattr(item, 'result')
2060
2061        input_pack = InputPack(kwargs=kwargs)
2062
2063        # Prepare input data structure
2064        # Note: we use 'yield' instead of 'return' for the protocol expected by
2065        # the @contextmanager decorator
2066        yield input_pack
2067
2068    def publishing_resources(self):
2069        """Get a context manager for resolving the data dependencies of this node.
2070
2071        Use the returned object as a Python context manager.
2072        'output' type resources can be published exactly once, and only while the
2073        publishing context is active.
2074
2075        Write access to publishing resources can be granted exactly once during the
2076        resource manager lifetime and conveys exclusive access.
2077        """
2078        return self.__publishing_resources.pop()
2079
2080
2081class PyFunctionRunnerResources(collections.UserDict):
2082    """Runtime resources for Python functions.
2083
2084    Produced by a ResourceDirector for a particular Operation.
2085    """
2086
2087    def output(self):
2088        if 'output' in self:
2089            return self['output']
2090        else:
2091            return None
2092
2093    def input(self):
2094        return {key: value for key, value in self.items() if key != 'output'}
2095
2096
2097class PyFunctionRunner(abc.ABC):
2098    def __init__(self, *, function: typing.Callable, output_description: OutputCollectionDescription):
2099        assert callable(function)
2100        self.function = function
2101        self.output_description = output_description
2102
2103    @abc.abstractmethod
2104    def __call__(self, resources: PyFunctionRunnerResources):
2105        self.function(output=resources.output(), **resources.input())
2106
2107
2108class CapturedOutputRunner(PyFunctionRunner):
2109    """Function runner that captures return value as output.data"""
2110
2111    def __call__(self, resources: PyFunctionRunnerResources):
2112        resources['output'].data = self.function(**resources.input())
2113
2114
2115class OutputParameterRunner(PyFunctionRunner):
2116    """Function runner that uses output parameter to let function publish output."""
2117
2118    def __call__(self, resources: PyFunctionRunnerResources):
2119        self.function(**resources)
2120
2121
2122def wrapped_function_runner(function, output_description: OutputCollectionDescription = None) -> PyFunctionRunner:
2123    """Get an adapter for a function to be wrapped.
2124
2125    If the function does not accept a publishing data proxy as an `output`
2126    key word argument, the returned object has a `capture_output` attribute that
2127    must be re-assigned by the calling code before calling the runner. `capture_output`
2128    must be assigned to be a callable that will receive the output of the wrapped
2129    function.
2130
2131    Returns:
2132        Callable with a signature `__call__(*args, **kwargs)` and no return value
2133
2134    Collaborations:
2135        OperationDetails.resource_director assigns the `capture_output` member of the returned object.
2136    """
2137    assert callable(function)
2138    signature = inspect.signature(function)
2139
2140    # Implementation note: this function dispatches an implementation with the
2141    # logic below. A better factoring would be a "chain of responsibility" in
2142    # which the concrete Runners would be tried in sequence and determine internally
2143    # whether to create a runner, raise an error, or defer.
2144
2145    # Determine output details for proper dispatching.
2146    # First check for signature with output parameter.
2147    # TODO FR4: standardize typing
2148    if 'output' in signature.parameters:
2149        if not isinstance(output_description, OutputCollectionDescription):
2150            if not isinstance(output_description, collections.abc.Mapping):
2151                raise exceptions.UsageError(
2152                    'Function passes output through call argument, but output is not described.')
2153            return OutputParameterRunner(
2154                function=function,
2155                output_description=OutputCollectionDescription(**output_description))
2156        else:
2157            return OutputParameterRunner(function=function,
2158                                         output_description=output_description)
2159    # Next try output_description parameter or function return annotation.
2160    else:
2161        if isinstance(output_description, OutputCollectionDescription):
2162            return_type = output_description['data'].gmxapi_datatype
2163        elif output_description is not None:
2164            # output_description should be None for inferred output or
2165            # a singular mapping of the key 'data' to a gmxapi type.
2166            if not isinstance(output_description, collections.abc.Mapping) \
2167                    or set(output_description.keys()) != {'data'}:
2168                raise exceptions.ApiError(
2169                    'invalid output description for wrapped function: {}'.format(output_description))
2170            if signature.return_annotation != signature.empty:
2171                if signature.return_annotation != output_description['data']:
2172                    raise exceptions.ApiError(
2173                        'Wrapped function with return-value-capture provided with non-matching output description.')
2174            return_type = output_description['data']
2175        else:
2176            # Use return type inferred from function signature.
2177            return_type = signature.return_annotation
2178        if return_type == signature.empty or return_type is None:
2179            raise exceptions.ApiError('No return annotation or output_description for {}'.format(function))
2180        return CapturedOutputRunner(function=function,
2181                                    output_description=OutputCollectionDescription(data=return_type))
2182
2183
2184# TODO: Refactor in terms of reference to a node in a Context.
2185#  ResourceManager is an implementation detail of how the Context
2186#  manages a node.
2187class OperationHandle(AbstractOperation[_OutputDataProxyType]):
2188    """Generic Operation handle for dynamically defined operations.
2189
2190    Define a gmxapi Operation for the functionality being wrapped by the enclosing code.
2191
2192    An Operation type definition encapsulates description of allowed inputs
2193    of an Operation. An Operation instance represents a node in a work graph
2194    with uniquely fingerprinted inputs and well-defined output. The implementation
2195    of the operation is a collaboration with the resource managers resolving
2196    data flow for output Futures, which may depend on the execution context.
2197    """
2198
2199    def __init__(self, resource_manager: SourceResource[_OutputDataProxyType, typing.Any]):
2200        """Initialization defines the unique input requirements of a work graph node.
2201
2202        Initialization parameters map to the parameters of the wrapped function with
2203        addition(s) to support gmxapi data flow and deferred execution.
2204
2205        If provided, an ``input`` keyword argument is interpreted as a parameter pack
2206        of base input. Inputs also present as standalone keyword arguments override
2207        values in ``input``.
2208
2209        Inputs that are handles to gmxapi operations or outputs induce data flow
2210        dependencies that the framework promises to satisfy before the Operation
2211        executes and produces output.
2212        """
2213        # TODO: When the resource manager can be kept alive by an enclosing or
2214        #  module-level Context, convert to a weakref.
2215        self.__resource_manager = resource_manager
2216        # The unique identifier for the operation node allows the Context implementation
2217        # to manage the state of the handle. Reproducibility of node_uid is TBD, but
2218        # it must be unique in a Context where it references a different operation node.
2219        self.node_uid = None
2220
2221    @property
2222    def output(self) -> _OutputDataProxyType:
2223        # TODO: We can configure `output` as a data descriptor
2224        #  instead of a property so that we can get more information
2225        #  from the class attribute before creating an instance of OperationDetails.OutputDataProxy.
2226        # The C++ equivalence would probably be a templated free function for examining traits.
2227        return self.__resource_manager.data()
2228
2229    def run(self):
2230        """Make a single attempt to resolve data flow conditions.
2231
2232        This is a public method, but should not need to be called by users. Instead,
2233        just use the `output` data proxy for result handles, or force data flow to be
2234        resolved with the `result` methods on the result handles.
2235
2236        `run()` may be useful to try to trigger computation (such as for remotely
2237        dispatched work) without retrieving results locally right away.
2238
2239        `run()` is also useful internally as a facade to the Context implementation details
2240        that allow `result()` calls to ask for their data dependencies to be resolved.
2241        Typically, `run()` will cause results to be published to subscribing operations as
2242        they are calculated, so the `run()` hook allows execution dependency to be slightly
2243        decoupled from data dependency, as well as to allow some optimizations or to allow
2244        data flow to be resolved opportunistically. `result()` should not call `run()`
2245        directly, but should cause the resource manager / Context implementation to process
2246        the data flow graph.
2247
2248        In one conception, `run()` can have a return value that supports control flow
2249        by itself being either runnable or not. The idea would be to support
2250        fault tolerance, implementations that require multiple iterations / triggers
2251        to complete, or looping operations.
2252        """
2253        # Note: `run()` is a synonym for `resolve` or `update` or whatever we choose
2254        #  to generically describe the request to bring a node up-to-date: i.e. the
2255        #  non-returning callable on the object produced by a director.
2256        self.__resource_manager.update_output()
2257
2258
2259class OperationPlaceholder(AbstractOperation):
2260    """Placeholder for Operation handle during subgraph definition."""
2261
2262    def __init__(self, subgraph_resource_manager):
2263        ...
2264
2265    def run(self):
2266        raise exceptions.UsageError('This placeholder operation handle is not in an executable context.')
2267
2268    @property
2269    def output(self):
2270        """Allow subgraph components to be connected without instantiating actual operations."""
2271        if not isinstance(current_context(), SubgraphContext):
2272            raise exceptions.UsageError('Invalid access to subgraph internals.')
2273
2274
2275_HandleType = typing.TypeVar('_HandleType', bound=gmx.abc.OperationReference)
2276
2277
2278class NodeBuilder(gmx.abc.NodeBuilder):
2279    """Add an operation node to be managed by a Context.
2280
2281    The NodeBuilder interface implies minimal internal logic, and instead
2282    specifies the set of information that must or may be provided to construct
2283    a node.
2284    """
2285
2286    def __init__(self,
2287                 context: 'Context',
2288                 operation,
2289                 label: typing.Optional[str] = None):
2290        """Initialize the base NodeBuilder for gmxapi.operation module Nodes.
2291
2292        TODO:
2293            Convert the *operation* argument to be the registrant in the Operation registry.
2294            Requires confirmation of conforming behavior for dynamically defined operations.
2295
2296        """
2297        self.context = context
2298        self.label = label
2299        try:
2300            key = _make_registry_key(operation)
2301        except Exception as e:
2302            error = 'Could not create an operation registry key from {}'.format(operation)
2303            raise exceptions.ValueError(error) from e
2304        else:
2305            # TODO: sensibly handle dynamically defined operations.
2306            if key not in _operation_registry and not issubclass(operation, OperationDetailsBase):
2307                error = '{} must be initialized with a registered operation. Got {}.'
2308                raise exceptions.ValueError(error.format(__class__.__qualname__, operation))
2309        self.sources = DataSourceCollection()
2310        self._input_description = None
2311        self._resource_factory = None
2312        self._runner_director = None
2313        self._handle = None
2314        self._output_factory = None
2315
2316        self._resource_manager = ResourceManager
2317
2318    def set_input_description(self, input_description: InputDescription):
2319        self._input_description = input_description
2320
2321    def set_resource_factory(self, factory):
2322        self._resource_factory = factory
2323
2324    def set_runner_director(self, factory):
2325        self._runner_director = factory
2326
2327    def set_handle(self, factory):
2328        self._handle = factory
2329
2330    def set_output_factory(self, factory: 'OutputFactory'):
2331        self._output_factory = factory
2332
2333    def set_resource_manager(self, implementation: typing.Type[ResourceManager]):
2334        """Allow overriding the default ResourceManager implementation.
2335
2336        This is a workaround until we figure out what parts of the ResourceManager
2337        could be composed, registered separately with the Context, or be customized
2338        through other dispatching. One likely part of the solution is for clients
2339        of the NodeBuilder to assert requirements of the Context.
2340        """
2341        assert issubclass(implementation, ResourceManager)
2342        self._resource_manager = implementation
2343
2344    # TODO: Let the Context use the handle factory to produce a dynamically typed handle,
2345    #  and figure out the right way to annotate the return value.
2346    def build(self):
2347        """Create node and return a handle of the appropriate type."""
2348
2349        # Check for the ability to instantiate operations.
2350        missing_details = list()
2351        for builder_resource in ['input_description',
2352                                 'resource_factory',
2353                                 'runner_director',
2354                                 'handle',
2355                                 'output_factory']:
2356            detail = '_' + builder_resource
2357            if getattr(self, detail, None) is None:
2358                missing_details.append(builder_resource)
2359        if len(missing_details) > 0:
2360            raise exceptions.UsageError(
2361                'Missing details needed for operation node: {}'.format(
2362                    ', '.join(missing_details)
2363                ))
2364
2365        assert hasattr(self._input_description, 'signature')
2366        input_sink = SinkTerminal(self._input_description.signature())
2367        input_sink.update(self.sources)
2368        logger.debug('SinkTerminal configured: {}'.format(SinkTerminal))
2369        edge = DataEdge(self.sources, input_sink)
2370        logger.debug('Created data edge {} with Sink {}'.format(edge, edge.sink_terminal))
2371        # TODO: Fingerprinting: Each operation instance has unique output based on the unique input.
2372        #            input_data_fingerprint = edge.fingerprint()
2373
2374        # Set up output proxy.
2375        assert hasattr(self._input_description, 'make_uid')
2376        uid = self._input_description.make_uid(edge)
2377        # TODO: ResourceManager should fetch the relevant factories from the Context
2378        #  instead of getting an OperationDetails instance.
2379        output_data_proxy = self._output_factory.output_proxy()
2380        output_description = self._output_factory.output_description()
2381        publishing_data_proxy = self._output_factory.publishing_data_proxy()
2382        manager = self._resource_manager(output_context=self.context,
2383                                         source=edge,
2384                                         operation_id=uid,
2385                                         output_data_proxy=output_data_proxy,
2386                                         output_description=output_description,
2387                                         publishing_data_proxy=publishing_data_proxy,
2388                                         resource_factory=self._resource_factory,
2389                                         runner_director=self._runner_director)
2390        self.context.work_graph[uid] = manager
2391        # TODO: Replace with a call to Node.handle()
2392        handle = self._handle(self.context.work_graph[uid])
2393        handle.node_uid = uid
2394        return handle
2395
2396    def add_input(self, name, source):
2397        # TODO: We can move some input checking here as the data model matures.
2398        self.sources[name] = source
2399
2400
2401class InputPack(object):
2402    """Input container for data sources provided to resource factories.
2403
2404    When gmxapi.operation Contexts provide run time inputs to operations,
2405    instances of this class are provided to the operation's registered
2406    Resource factory.
2407
2408    Attributes:
2409        kwargs (dict): collection of named data sources.
2410    """
2411
2412    def __init__(self, kwargs: typing.Mapping[str, SourceTypeVar]):
2413        self.kwargs = kwargs
2414
2415
2416class Context(gmx.abc.Context):
2417    """API Context.
2418
2419    All gmxapi data and operations are owned by a Context instance. The Context
2420    manages the details of how work is run and how data is managed.
2421
2422    Context implementations are not required to inherit from gmxapi.context.Context,
2423    but this class definition serves to specify the current Context API.
2424
2425    If subclassing is used to implement new Contexts, be sure to initialize the
2426    base class when providing a new __init__
2427    """
2428
2429    def node(self, node_id) -> Node:
2430        if node_id in self.labels:
2431            return self.labels[node_id]
2432        elif node_id in self.work_graph:
2433            return self.work_graph[node_id]
2434        else:
2435            raise exceptions.ValueError('Could not find a node identified by {}'.format(node_id))
2436
2437    def __init__(self):
2438        self.operations = dict()
2439        self.labels = dict()
2440        self.work_graph = collections.OrderedDict()
2441
2442    @abc.abstractmethod
2443    def node_builder(self, *, operation,
2444                     label: typing.Optional[str] = None) -> NodeBuilder:
2445        """Get a builder for a new work graph node.
2446
2447        Nodes are elements of computational work, with resources and execution
2448        managed by the Context. The Context handles parallelism resources, data
2449        placement, work scheduling, and data flow / execution dependencies.
2450
2451        This method is used by Operation director code and helper functions to
2452        add work to the graph.
2453
2454        Arguments:
2455            operation: a registered gmxapi operation
2456            label: optional user-provided identifier to provide human-readable node locators.
2457
2458        """
2459        ...
2460    # TODO: *node()* accessor.
2461    # @abc.abstractmethod
2462    # def node(self, node_identifier) -> AbstractOperation:
2463    #     ...
2464
2465
2466class ModuleNodeBuilder(NodeBuilder):
2467    """Builder for work nodes in gmxapi.operation.ModuleContext."""
2468
2469
2470class ModuleContext(Context):
2471    """Context implementation for the gmxapi.operation module.
2472
2473    """
2474    __version__ = 0
2475
2476    def node_builder(self, operation, label=None) -> NodeBuilder:
2477        """Get a builder for a new work node to add an operation in this context."""
2478        if label is not None:
2479            if label in self.labels:
2480                raise exceptions.ValueError('Label {} is already in use.'.format(label))
2481            else:
2482                # The builder should update the labeled node when it is done.
2483                self.labels[label] = None
2484
2485        return ModuleNodeBuilder(context=weakref.proxy(self), operation=operation, label=label)
2486
2487
2488# Context stack.
2489__current_context = [ModuleContext()]
2490
2491
2492def current_context() -> Context:
2493    """Get a reference to the currently active Context.
2494
2495    The imported gmxapi.context module maintains some state for the convenience
2496    of the scripting environment. Internally, all gmxapi activity occurs under
2497    the management of an API Context, explicitly or implicitly. Some actions or
2498    code constructs will generate separate contexts or sub-contexts. This utility
2499    command retrieves a reference to the currently active Context.
2500    """
2501    return __current_context[-1]
2502
2503
2504def push_context(context) -> Context:
2505    """Enter a sub-context by pushing a context to the global context stack.
2506    """
2507    __current_context.append(context)
2508    return current_context()
2509
2510
2511def pop_context() -> Context:
2512    """Exit the current Context by popping it from the stack."""
2513    return __current_context.pop()
2514
2515
2516class OutputFactory(object):
2517    """Encapsulate the details of Operation output implementation in the gmxapi.operation Context.
2518
2519    Currently, OutputFactory objects are containers that compose functionality
2520    with which to implement the required internal interface.
2521    """
2522
2523    def __init__(self, *,
2524                 output_proxy: typing.Callable[[SourceResource], _OutputDataProxyType],
2525                 output_description: OutputCollectionDescription,
2526                 publishing_data_proxy: typing.Callable[[SourceResource, ClientID], _PublishingDataProxyType]):
2527        """Package the output details for an operation.
2528
2529        Arguments:
2530            output_proxy: factory to produce the *output* facet of an operation instance (node)
2531            output_description: fully formed output description
2532            publishing_data_proxy: factory to produce the run time output publishing resources
2533
2534        """
2535        if not callable(output_proxy):
2536            raise exceptions.ValueError('output_proxy argument must be a callable.')
2537        if not callable(publishing_data_proxy):
2538            raise exceptions.ValueError('publishing_data_proxy argument must be a callable.')
2539        if not isinstance(output_description, OutputCollectionDescription):
2540            raise exceptions.ValueError('output_description must be an instance of '
2541                                        'gmxapi.operation.OutputCollectionDescription')
2542        self._output_proxy = output_proxy
2543        self._output_description = output_description
2544        self._publishing_data_proxy = publishing_data_proxy
2545
2546    def output_proxy(self) -> typing.Callable[[_OutputDataProxyType, SourceResource], _OutputDataProxyType]:
2547        return self._output_proxy
2548
2549    def output_description(self) -> OutputCollectionDescription:
2550        return self._output_description
2551
2552    def publishing_data_proxy(self) -> typing.Callable[[SourceResource, ClientID],
2553                                                       _PublishingDataProxyType]:
2554        return self._publishing_data_proxy
2555
2556
2557# TODO: Refactor in terms of gmx.abc.OperationDirector[_Op, gmx.operation.Context]
2558# Normalizing this OperationDirector may require other updates to the function_wrapper facilities.
2559class OperationDirector(object):
2560    """Direct the construction of an operation node in the gmxapi.operation module Context.
2561
2562    Collaboration: used by OperationDetails.operation_director, which
2563    will likely dispatch to different implementations depending on
2564    requirements of work or context.
2565    """
2566
2567    def __init__(self,
2568                 *args,
2569                 operation_details: typing.Type[OperationDetailsBase],
2570                 context: Context,
2571                 label=None,
2572                 **kwargs):
2573        self.operation_details = operation_details
2574        self.context = weakref.proxy(context)
2575        self.args = args
2576        self.kwargs = kwargs
2577        self.label = label
2578
2579    def __call__(self) -> AbstractOperation:
2580        builder = self.context.node_builder(operation=self.operation_details, label=self.label)
2581
2582        builder.set_resource_factory(self.operation_details.resource_director)
2583        builder.set_input_description(self.operation_details)
2584        builder.set_handle(OperationHandle)
2585
2586        operation_details = self.operation_details()
2587        node_input_factory = operation_details.signature().bind
2588        data_source_collection = node_input_factory(*self.args, **self.kwargs)
2589        for name, source in data_source_collection.items():
2590            builder.add_input(name, source)
2591
2592        def runner_director(resources):
2593            def runner():
2594                operation_details(resources)
2595            return runner
2596
2597        builder.set_runner_director(runner_director)
2598        # Note that the call-backs held by OutputFactory cannot be annotated with
2599        # key word arguments under PEP 484, producing a weak warning in some cases.
2600        # We can consider more in the future how to balance call-back simplicity,
2601        # type annotation, and key word explicitness in helper functions like these.
2602        output_factory = OutputFactory(output_description=operation_details.output_description(),
2603                                       output_proxy=operation_details.output_data_proxy,
2604                                       publishing_data_proxy=operation_details.publishing_data_proxy)
2605        builder.set_output_factory(output_factory)
2606
2607        handle = builder.build()
2608        return handle
2609
2610
2611def _make_datastore(output_description: OutputCollectionDescription, ensemble_width: int):
2612    """Create the data store for an operation with the described output.
2613
2614    Create a container to hold the resources for an operation node.
2615    Used internally by the resource manager when setting up the node.
2616    Evolution of the C++ framework for creating the Operation SessionResources
2617    object will inform the future of this and the resource_director method, but
2618    this data store is how the Context manages output data sources for resources
2619    that it manages.
2620    """
2621
2622    datastore = collections.OrderedDict()
2623    for name, dtype in output_description.items():
2624        assert isinstance(dtype, type)
2625        result_description = ResultDescription(dtype=dtype, width=ensemble_width)
2626        datastore[name] = OutputData(name=name, description=result_description)
2627    return datastore
2628
2629
2630# TODO: For outputs, distinguish between "results" and "events".
2631#  Both are published to the resource manager in the same way, but the relationship
2632#  with subscribers is potentially different.
2633def function_wrapper(output: dict = None):
2634    # Suppress warnings in the example code.
2635    # noinspection PyUnresolvedReferences
2636    """Generate a decorator for wrapped functions with signature manipulation.
2637
2638    New function accepts the same arguments, with additional arguments required by
2639    the API.
2640
2641    The new function returns an object with an ``output`` attribute containing the named outputs.
2642
2643    Example:
2644
2645        >>> @function_wrapper(output={'spam': str, 'foo': str})
2646        ... def myfunc(parameter: str = None, output=None):
2647        ...    output.spam = parameter
2648        ...    output.foo = parameter + ' ' + parameter
2649        ...
2650        >>> operation1 = myfunc(parameter='spam spam')
2651        >>> assert operation1.output.spam.result() == 'spam spam'
2652        >>> assert operation1.output.foo.result() == 'spam spam spam spam'
2653
2654    Arguments:
2655        output (dict): output names and types
2656
2657    If ``output`` is provided to the wrapper, a data structure will be passed to
2658    the wrapped functions with the named attributes so that the function can easily
2659    publish multiple named results. Otherwise, the ``output`` of the generated operation
2660    will just capture the return value of the wrapped function.
2661
2662    .. todo:: gmxapi typing stub file(s).
2663              The way this wrapper uses parameter annotations is not completely
2664              compatible with static type checking (PEP 484). If we decide to
2665              keep the convenience functionality by which operation details are
2666              inferred from parameter annotations, we should provide a separate
2667              stub file (.pyi) to support static type checking of the API.
2668    """
2669
2670    if output is not None and not isinstance(output, collections.abc.Mapping):
2671        raise exceptions.TypeError('If provided, `output` argument must be a mapping of data names to types.')
2672
2673    # TODO: (FR5+) gmxapi operations need to allow a context-dependent way to generate an implementation with input.
2674    # This function wrapper reproduces the wrapped function's kwargs, but does not allow chaining a
2675    # dynamic `input` kwarg and does not dispatch according to a `context` kwarg. We should allow
2676    # a default implementation and registration of alternate implementations. We don't have to do that
2677    # with functools.singledispatch, but we could, if we add yet another layer to generate a wrapper
2678    # that takes the context as the first argument. (`singledispatch` inspects the first argument rather
2679    # that a named argument)
2680
2681    # Implementation note: The closure of the current function is used to
2682    # dynamically define several classes that support the operation to be
2683    # created by the returned decorator.
2684
2685    def decorator(function) -> typing.Callable:
2686        # Explicitly capture `function` and `output` references.
2687        provided_output_map = output
2688
2689        # Note: Allow operations to be defined entirely in template headers to facilitate
2690        # compile-time optimization of fused operations. Consider what distinction, if any,
2691        # exists between a fused operation and a more basic operation. Probably it amounts
2692        # to aspects related to interaction with the Context that get combined in a fused
2693        # operation, such as the resource director, builder, etc.
2694        class OperationDetails(OperationDetailsBase):
2695            # Warning: function.__qualname__ is not rigorous since function may be in a local scope.
2696            # TODO: Improve base identifier.
2697            # Suggest registering directly in the Context instead of in this local class definition.
2698            __basename = '.'.join((str(function.__module__), function.__qualname__))
2699            __last_uid = 0
2700            _input_signature_description = InputCollectionDescription.from_function(function)
2701            # TODO: Separate the class and instance logic for the runner.
2702            # Logically, the runner is a detail of a context-specific implementation class,
2703            # though the output is not generally fully knowable until an instance is initialized
2704            # for a certain input fingerprint.
2705            # Note: We are almost at a point where this class can be subsumed into two
2706            # possible return types for wrapped_function_runner, acting as an operation helper.
2707            _runner = wrapped_function_runner(function, provided_output_map)
2708            _output_description = _runner.output_description
2709            _output_data_proxy_type = define_output_data_proxy(_output_description)
2710            _publishing_data_proxy_type = define_publishing_data_proxy(_output_description)
2711            _SourceResource = SourceResource[_output_data_proxy_type, _publishing_data_proxy_type]
2712
2713            @classmethod
2714            def name(cls) -> str:
2715                return cls.__basename.split('.')[-1]
2716
2717            @classmethod
2718            def namespace(cls) -> str:
2719                return cls.__basename.rstrip('.' + cls.name())
2720
2721            @classmethod
2722            def director(cls, context: _Context):
2723                return cls.operation_director
2724
2725            @classmethod
2726            def signature(cls) -> InputCollectionDescription:
2727                """Mapping of named inputs and input type.
2728
2729                Used to determine valid inputs before an Operation node is created.
2730
2731                Overrides OperationDetailsBase.signature() to provide an
2732                implementation for the bound operation.
2733                """
2734                return cls._input_signature_description
2735
2736            def output_description(self) -> OutputCollectionDescription:
2737                """Mapping of available outputs and types for an existing Operation node.
2738
2739                Overrides OperationDetailsBase.output_description() to provide an
2740                implementation for the bound operation.
2741                """
2742                return self._output_description
2743
2744            def publishing_data_proxy(self, *,
2745                                      instance: _SourceResource,
2746                                      client_id: int
2747                                      ) -> _publishing_data_proxy_type:
2748                """Factory for Operation output publishing resources.
2749
2750                Used internally when the operation is run with resources provided by instance.
2751
2752                Overrides OperationDetailsBase.publishing_data_proxy() to provide an
2753                implementation for the bound operation.
2754                """
2755                assert isinstance(instance, ResourceManager)
2756                return self._publishing_data_proxy_type(instance=instance, client_id=client_id)
2757
2758            def output_data_proxy(self, instance: _SourceResource) -> _output_data_proxy_type:
2759                assert isinstance(instance, ResourceManager)
2760                return self._output_data_proxy_type(instance=instance)
2761
2762            def __call__(self, resources: PyFunctionRunnerResources):
2763                """Execute the operation with provided resources.
2764
2765                Resources are prepared in an execution context with aid of resource_director()
2766
2767                After the first call, output data has been published and is trivially
2768                available through the output_data_proxy()
2769
2770                Overrides OperationDetailsBase.__call__().
2771                """
2772                self._runner(resources)
2773
2774            @classmethod
2775            def make_uid(cls, input):
2776                """The unique identity of an operation node tags the output with respect to the input.
2777
2778                Combines information on the Operation details and the input to uniquely
2779                identify the Operation node.
2780
2781                Arguments:
2782                    input : A (collection of) data source(s) that can provide Fingerprints.
2783
2784                Used internally by the Context to manage ownership of data sources, to
2785                locate resources for nodes in work graphs, and to manage serialization,
2786                deserialization, and checkpointing of the work graph.
2787
2788                The UID is a detail of the generic Operation that _should_ be independent
2789                of the Context details to allow the framework to manage when and where
2790                an operation is executed.
2791
2792                Design notes on further refinement:
2793                    TODO: Operations should not single-handedly determine their own uniqueness
2794                    (but they should participate in the determination with the Context).
2795
2796                    Context implementations should be allowed to optimize handling of
2797                    equivalent operations in different sessions or work graphs, but we do not
2798                    yet TODO: guarantee that UIDs are globally unique!
2799
2800                    The UID should uniquely indicate an operation node based on that node's input.
2801                    We need input fingerprinting to identify equivalent nodes in a work graph
2802                    or distinguish nodes across work graphs.
2803
2804                """
2805                uid = str(cls.__basename) + str(cls.__last_uid)
2806                cls.__last_uid += 1
2807                return uid
2808
2809            @classmethod
2810            def resource_director(cls, *, input=None,
2811                                  output: _publishing_data_proxy_type = None) -> PyFunctionRunnerResources:
2812                """a Director factory that helps build the Session Resources for the function.
2813
2814                The Session launcher provides the director with all of the resources previously
2815                requested/negotiated/registered by the Operation. The director uses details of
2816                the operation to build the resources object required by the operation runner.
2817
2818                For the Python Context, the protocol is for the Context to call the
2819                resource_director instance method, passing input and output containers.
2820
2821                Raises:
2822                    exceptions.TypeError if provided resource type does not match input signature.
2823                """
2824                resources = PyFunctionRunnerResources()
2825                resources.update(input.kwargs)
2826                resources.update({'output': output})
2827
2828                # TODO: Remove this hack when we can better handle Futures of Containers and Future slicing.
2829                for name in resources:
2830                    if isinstance(resources[name], (list, tuple)):
2831                        resources[name] = datamodel.ndarray(resources[name])
2832
2833                # Check data compatibility
2834                for name, value in resources.items():
2835                    if name != 'output':
2836                        expected = cls.signature()[name]
2837                        got = type(value)
2838                        if got != expected:
2839                            raise exceptions.TypeError(
2840                                'Expected {} but got {} for {} resource {}.'.format(expected,
2841                                                                                    got,
2842                                                                                    cls.__basename,
2843                                                                                    name))
2844                return resources
2845
2846        # TODO: (FR4) Update annotations with gmxapi data types. E.g. return -> Future.
2847        @functools.wraps(function)
2848        def helper(*args, context=None, **kwargs):
2849            # Description of the Operation input (and output) occurs in the
2850            # decorator closure. By the time this factory is (dynamically) defined,
2851            # the OperationDetails and ResourceManager are well defined, but not
2852            # yet instantiated.
2853            # Inspection of the offered input occurs when this factory is called,
2854            # and OperationDetails, ResourceManager, and Operation are instantiated.
2855
2856            # This operation factory is specialized for the default package Context.
2857            if context is None:
2858                context = current_context()
2859            else:
2860                raise exceptions.ApiError('Non-default context handling not implemented.')
2861
2862            # This calls a dispatching function that may not be able to reconcile the input
2863            # and Context capabilities. This is the place to handle various exceptions for
2864            # whatever reasons this reconciliation cannot occur.
2865            handle = OperationDetails.operation_director(*args, context=context, label=None, **kwargs)
2866
2867            # TODO: NOW: The input fingerprint describes the provided input
2868            # as (a) ensemble input, (b) static, (c) future. By the time the
2869            # operation is instantiated, the topology of the node is known.
2870            # When compared to the InputCollectionDescription, the data compatibility
2871            # can be determined.
2872
2873            return handle
2874
2875        # to do: The factory itself needs to be able to register a factory with
2876        # the context that will be responsible for the Operation handle.
2877        # The factories need to be able to serve as dispatchers for themselves,
2878        # since an operation in one context may need to be reconstituted in a
2879        # different context.
2880        # The dispatching factory produces a director for a Context,
2881        # which will register a factory with the operation in that context.
2882
2883        # The factory function has a DirectorFactory. Director instances talk to a NodeBuilder for a Context to
2884        # get handles to new operation nodes managed by the context. Part of that process includes registering
2885        # a DirectorFactory with the Context.
2886        return helper
2887
2888    return decorator
2889
2890
2891class GraphVariableDescriptor(object):
2892    def __init__(self, name: str = None, dtype=None, default=None):
2893        self.name = name
2894        self.dtype = dtype
2895        self.default = default
2896        self.state = None
2897
2898    @property
2899    def internal_name(self):
2900        try:
2901            return '_' + self.name
2902        except TypeError:
2903            return None
2904
2905    def __get__(self, instance, owner):
2906        if instance is None:
2907            # Access is through the class attribute of the owning class.
2908            # Allows the descriptor itself to be inspected or reconfigured after
2909            # class definition.
2910            # TODO: There is probably some usage checking that can be performed here.
2911            return self
2912        try:
2913            value = getattr(instance, self.internal_name)
2914        except AttributeError:
2915            value = self.default
2916            # Lazily initialize the instance value from the class default.
2917            if value is not None:
2918                try:
2919                    setattr(instance, self.internal_name, value)
2920                except Exception as e:
2921                    message = 'Could not assign default value to {} attribute of {}'.format(
2922                        self.internal_name,
2923                        instance)
2924                    raise exceptions.ApiError(message) from e
2925        return value
2926
2927    # Implementation note: If we have a version of the descriptor class with no `__set__` method,
2928    # it is a non-data descriptor that can be overridden by a data descriptor on an instance.
2929    # This could be one way to handle the polymorphism associated with state changes.
2930    def __set__(self, instance, value):
2931        if instance._editing:
2932            # Update the internal connections defining the subgraph.
2933            setattr(instance, self.internal_name, value)
2934        else:
2935            raise AttributeError('{} not assignable on {}'.format(self.name, instance))
2936
2937
2938class GraphMeta(type):
2939    """Meta-class for gmxapi data flow graphs and subgraphs.
2940
2941    Used to implement ``subgraph`` as GraphMeta.__new__(...).
2942    Also allows subgraphs to be defined as Python class definitions by inheriting
2943    from Subgraph or by using the ``metaclass=GraphMeta`` hint in the class
2944    statement arguments.
2945
2946    The Python class creation protocol allows Subgraphs to be defined in as follows.
2947
2948    See the Subgraph class documentation for customization of instances through
2949    the Python context manager protocol.
2950    """
2951    _prepare_keywords = ('variables',)
2952
2953    # TODO: Python 3.7.2 introduces typing.OrderedDict
2954    # In practice, we are using collections.OrderedDict, but we should use the generic
2955    # ABC from the typing module to avoid being overly restrictive with type hints.
2956    try:
2957        from typing import OrderedDict
2958    except ImportError:
2959        from collections import OrderedDict
2960
2961    @classmethod
2962    def __prepare__(mcs, name, bases, variables: OrderedDict = None, **kwargs):
2963        """Prepare the class namespace.
2964
2965        Keyword Args:
2966              variables: mapping of persistent graph variables to type / default value (optional)
2967        """
2968        # Python runs this before executing the class body of Subgraph or its
2969        # subclasses. This is our chance to handle key word arguments given in the
2970        # class declaration.
2971
2972        if kwargs is not None:
2973            for keyword in kwargs:
2974                raise exceptions.UsageError('Unexpected key word argument: {}'.format(keyword))
2975
2976        namespace = collections.OrderedDict()
2977
2978        if variables is not None:
2979            if isinstance(variables, collections.abc.Mapping):
2980                for name, value in variables.items():
2981                    if isinstance(value, type):
2982                        dtype = value
2983                        if hasattr(value, 'default'):
2984                            default = value.default
2985                        else:
2986                            default = None
2987                    else:
2988                        default = value
2989                        if hasattr(default, 'dtype'):
2990                            dtype = default.dtype
2991                        else:
2992                            dtype = type(default)
2993                    namespace[name] = GraphVariableDescriptor(name, default=default, dtype=dtype)
2994                    # Note: we are not currently using the hook used by `inspect`
2995                    # to annotate with the class that defined the attribute.
2996                    # namespace[name].__objclass__ = mcs
2997                    assert not hasattr(namespace[name], '__objclass__')
2998            else:
2999                raise exceptions.ValueError('"variables" must be a mapping of graph variables to types or defaults.')
3000
3001        return namespace
3002
3003    def __new__(cls, name, bases, namespace, **kwargs):
3004        for key in kwargs:
3005            if key not in GraphMeta._prepare_keywords:
3006                raise exceptions.ApiError('Unexpected class creation keyword: {}'.format(key))
3007        return type.__new__(cls, name, bases, namespace)
3008
3009    # TODO: This is keyword argument stripping is not necessary in more recent Python versions.
3010    # When Python minimum required version is increased, check if we can remove this.
3011    def __init__(cls, name, bases, namespace, **kwargs):
3012        for key in kwargs:
3013            if key not in GraphMeta._prepare_keywords:
3014                raise exceptions.ApiError('Unexpected class initialization keyword: {}'.format(key))
3015        super().__init__(name, bases, namespace)
3016
3017
3018class SubgraphNodeBuilder(NodeBuilder):
3019
3020    def __init__(self,
3021                 context: 'SubgraphContext',
3022                 operation,
3023                 label: typing.Optional[str] = None):
3024        super().__init__(context, operation, label)
3025
3026    def add_input(self, name: str, source):
3027        """Add an input resource for the Node under construction.
3028
3029        Extends NodeBuilder.add_input()
3030        """
3031        # Inspect inputs.
3032        #  * Are they from outside the subgraph?
3033        #  * Subgraph variables?
3034        #  * Subgraph internal nodes?
3035        # Inputs from outside the subgraph are (provisionally) subgraph inputs.
3036        # Inputs that are subgraph variables or subgraph internal nodes mark operations that will need to be re-run.
3037        # For first implementation, let all operations be recreated, but we need to
3038        # manage the right input proxies.
3039        # For zeroeth implementation, try just tracking the entities that need a reset() method called.
3040        assert isinstance(self.context, SubgraphContext)
3041        if hasattr(source, 'reset'):
3042            self.context.add_resetter(source.reset)
3043        elif hasattr(source, '_reset'):
3044            self.context.add_resetter(source._reset)
3045        super().add_input(name, source)
3046
3047    def build(self) -> OperationPlaceholder:
3048        """Get a reference to the internal node in the subgraph definition.
3049
3050        In the SubgraphContext, these handles cannot represent uniquely identifiable
3051        results. They are placeholders for relative positions in graphs that are
3052        not defined until the subgraph is being executed.
3053
3054        Such references should be tracked and invalidated when exiting the
3055        subgraph context. Within the subgraph context, they are used to establish
3056        the recipe for updating the subgraph's outputs or persistent data during
3057        execution.
3058        """
3059        # Placeholder handles in the subgraph definition don't have real resource managers.
3060        # Check for the ability to instantiate operations.
3061        handle = super().build()
3062        # handle = OperationPlaceholder()
3063        return typing.cast(OperationPlaceholder, handle)
3064
3065
3066class SubgraphContext(Context):
3067    """Provide a Python context manager in which to set up a graph of operations.
3068
3069    Allows operations to be configured without adding nodes to the global execution
3070    context.
3071    """
3072
3073    def __init__(self):
3074        super().__init__()
3075        self.resetters = set()
3076
3077    def node_builder(self, operation, label=None) -> NodeBuilder:
3078        if label is not None:
3079            if label in self.labels:
3080                raise exceptions.ValueError('Label {} is already in use.'.format(label))
3081            else:
3082                # The builder should update the labeled node when it is done.
3083                self.labels[label] = None
3084
3085        return SubgraphNodeBuilder(context=weakref.proxy(self), operation=operation, label=label)
3086
3087    def add_resetter(self, function):
3088        assert callable(function)
3089        self.resetters.add(function)
3090
3091
3092class Subgraph(object, metaclass=GraphMeta):
3093    """
3094
3095    When subclassing from Subgraph, aspects of the subgraph's data ports can be
3096    specified with keyword arguments in the class statement. Example::
3097
3098        >>> class MySubgraph(Subgraph, variables={'int_with_default': 1, 'boolData': bool}): pass
3099        ...
3100
3101    The key word *variables* is used in the class declaration to map the types
3102    of subgraph Variables with (optional) default values.
3103
3104    Execution model:
3105        Subgraph execution must follow a well-defined protocol in order to sensibly
3106        resolve data Futures at predictable points. Note that subgraphs act as operations
3107        that can be automatically redefined at run time (in limited cases), such as
3108        to automatically form iteration chains to implement "while" loops. We refer
3109        to one copy / iteration / generation of a subgraph as an "element" below.
3110
3111        When a subgraph element begins execution, each of its variables with an
3112        "updated" state from the previous iteration has the "updated" state moved
3113        to the new element's "initial" state, and the "updated" state is voided.
3114
3115        Subgraph.next() appends an element of the subgraph to the chain. Subsequent
3116        calls to Subgraph.run() bring the new outputs up to date (and then call next()).
3117        Thus, for a subgraph with an output called ``is_converged``, calling
3118        ``while (not subgraph.is_converged): subgraph.run()`` has the desired effect,
3119        but is likely suboptimal for execution. Instead use the gmxapi while_loop.
3120
3121        If the subgraph is not currently executing, it can be in one of two states:
3122        "editing" or "ready". In the "ready" state, class and instance Variables
3123        cannot be assigned, but can be read through the data descriptors. In this
3124        state, the descriptors only have a single state.
3125
3126        If "editing," variables on the class object can be assigned to update the
3127        data flow defined for the subgraph. While "editing", reading or writing
3128        instance Variables is undefined behavior.
3129
3130        When "editing" begins, each variable is readable as a proxy to the "initial"
3131        state in an element. Assignments while "editing" put variables in temporary
3132        states accessible only during editing.
3133        When "editing" finishes, the "updated" output data source of each
3134        variable for the element is set, if appropriate.
3135
3136        # TODO: Use a get_context to allow operation factories or accessors to mark
3137        #  references for update/annotation when exiting the 'with' block.
3138    """
3139
3140
3141class SubgraphBuilder(object):
3142    """Helper for defining new Subgraphs.
3143
3144    Manages a Python context in which to define a new Subgraph type. Can be used
3145    in a Python ``with`` construct exactly once to provide the Subgraph class body.
3146    When the ``with`` block is exited (or ``build()`` is called explicitly), a
3147    new type instance becomes available. Subsequent calls to SubgraphBuilder.__call__(self, ...)
3148    are dispatched to the Subgraph constructor.
3149
3150    Outside of the ``with`` block, read access to data members is proxied to
3151    descriptors on the built Subgraph.
3152
3153    Instances of SubgraphBuilder are returned by the ``subgraph()`` utility function.
3154    """
3155
3156    def __init__(self, variables):
3157        self.__dict__.update({'variables': variables,
3158                              '_staging': collections.OrderedDict(),
3159                              '_editing': False,
3160                              '_subgraph_context': None,
3161                              '_subgraph_instance': None,
3162                              '_fused_operation': None,
3163                              '_factory': None})
3164        # Return a placeholder that we can update during iteration.
3165        # Long term, this is probably implemented with data descriptors
3166        # that will be moved to a new Subgraph type object.
3167        for name in self.variables:
3168            if not isinstance(self.variables[name], Future):
3169                self.variables[name] = gmx.make_constant(self.variables[name])
3170
3171        # class MySubgraph(Subgraph, variables=variables):
3172        #     pass
3173        #
3174        # self._subgraph_instance = MySubgraph()
3175
3176    def __getattr__(self, item):
3177        if self._editing:
3178            if item in self.variables:
3179                if item in self._staging:
3180                    logger.debug('Read access to intermediate value of subgraph variable {}'.format(item))
3181                    return self._staging[item]
3182                else:
3183                    logger.debug('Read access to subgraph variable {}'.format(item))
3184                    return self.variables[item]
3185            else:
3186                raise AttributeError('Invalid attribute: {}'.format(item))
3187        else:
3188            # TODO: this is not quite the described interface...
3189            return lambda obj: obj.values[item]
3190
3191    def __setattr__(self, key, value):
3192        """Part of the builder interface."""
3193        if key in self.__dict__:
3194            self.__dict__[key] = value
3195        else:
3196            if self._editing:
3197                self.add_update(key, value)
3198            else:
3199                raise exceptions.UsageError('Subgraph is not in an editable state.')
3200
3201    def add_update(self, key, value):
3202        """Add a variable update to the internal subgraph."""
3203        if key not in self.variables:
3204            raise AttributeError('No such attribute: {}'.format(key))
3205        if not self._editing:
3206            raise exceptions.UsageError('Subgraph is not in an editable state.')
3207        # Else, stage the potential final value for the iteration.
3208        logger.debug('Staging subgraph update {} = {}'.format(key, value))
3209        # Return a placeholder that we can update during iteration.
3210        # Long term, this is probably implemented with data descriptors
3211        # that will be moved to a new Subgraph type object.
3212        if not isinstance(value, Future):
3213            value = gmx.make_constant(value)
3214        self._staging[key] = value
3215        self._staging.move_to_end(key)
3216
3217    def __enter__(self):
3218        """Enter a Context managed by the subgraph to capture operation additions.
3219
3220        Allows the internal data flow of the subgraph to be defined in the same
3221        manner as the default work graph while the Python context manager is active.
3222
3223        The subgraph Context is activated when entering a ``with`` block and
3224        finalized at the end of the block.
3225        """
3226        # While editing the subgraph in the SubgraphContext, we need __get__ and __set__
3227        # data descriptor access on the Subgraph subclass type, but outside of the
3228        # context manager, the descriptor should be non-writable and more opaque,
3229        # while the instance should be readable, but not writable.
3230        self.__dict__['_editing'] = True
3231        # TODO: this probably needs to be configured with variables...
3232        self.__dict__['_subgraph_context'] = SubgraphContext()
3233        push_context(self._subgraph_context)
3234        return self
3235
3236    def build(self):
3237        """Build the subgraph by defining some new operations.
3238
3239        Examine the subgraph variables. Variables with handles to data sources
3240        in the SubgraphContext represent work that needs to be executed on
3241        a subgraph execution or iteration. Variables with handles to data sources
3242        outside of the subgraph represent work that needs to be executed once
3243        on only the first iteration to initialize the subgraph.
3244
3245        Construct a factory for the fused operation that performs the work to
3246        update the variables on a single iteration.
3247
3248        Construct a factory for the fused operation that performs the work to
3249        update the variables on subsequent iterations and which will be fed
3250        the outputs of a previous iteration. Both of the generated operations
3251        have the same output signature.
3252
3253        Construct and wrap the generator function to recursively append work
3254        to the graph and update until condition is satisfied.
3255
3256        TODO: Explore how to drop work from the graph once there are no more
3257         references to its output, including check-point machinery.
3258        """
3259        logger.debug('Finalizing subgraph definition.')
3260        inputs = collections.OrderedDict()
3261        for key, value in self.variables.items():
3262            # TODO: What if we don't want to provide default values?
3263            inputs[key] = value
3264
3265        updates = self._staging
3266
3267        class Subgraph(object):
3268            def __init__(self, input_futures, update_sources):
3269                self.values = collections.OrderedDict([(key, value.result()) for key, value in input_futures.items()])
3270                logger.debug('subgraph initialized with {}'.format(
3271                    ', '.join(['{}: {}'.format(key, value) for key, value in self.values.items()])))
3272                self.futures = collections.OrderedDict([(key, value) for key, value in input_futures.items()])
3273                self.update_sources = collections.OrderedDict([(key, value) for key, value in update_sources.items()])
3274                logger.debug('Subgraph updates staged:')
3275                for update, source in self.update_sources.items():
3276                    logger.debug('    {} = {}'.format(update, source))
3277
3278            def run(self):
3279                for name in self.update_sources:
3280                    result = self.update_sources[name].result()
3281                    logger.debug('Update: {} = {}'.format(name, result))
3282                    self.values[name] = result
3283                # Replace the data sources in the futures.
3284                for name in self.update_sources:
3285                    self.futures[name].resource_manager = gmx.make_constant(self.values[name]).resource_manager
3286                for name in self.update_sources:
3287                    self.update_sources[name]._reset()
3288
3289        subgraph = Subgraph(inputs, updates)
3290
3291        return lambda subgraph=subgraph: subgraph
3292
3293    def __exit__(self, exc_type, exc_val, exc_tb):
3294        """End the Subgraph editing session and finalize the Subgraph build.
3295
3296        After exiting, this instance forwards __call__() to a factory for an
3297        operation that carries out the work in the subgraph with inputs bound
3298        in the current context as defined by ``variables``.
3299        """
3300        self._factory = self.build()
3301
3302        context = pop_context()
3303        assert context is self._subgraph_context
3304        self.__dict__['_editing'] = False
3305        # Returning False causes exceptions in the `with` block to be reraised.
3306        # Remember to switch this to return True if we want to transform or suppress
3307        # such an exception (we probably do).
3308        if exc_type is not None:
3309            logger.error('Got exception {} while editing subgraph {}.'.format(exc_val, self))
3310            logger.debug('Subgraph exception traceback: \n{}'.format(exc_tb))
3311        return False
3312
3313    def __call__(self):
3314        # TODO: After build() has been called, this should dispatch to a factory
3315        #  that returns an OperationHandle.
3316        return self._factory()
3317
3318
3319def while_loop(*, operation, condition, max_iteration=10):
3320    """Generate and run a chain of operations such that condition evaluates True.
3321
3322    Returns and operation instance that acts like a single node in the current
3323    work graph, but which is a proxy to the operation at the end of a dynamically generated chain
3324    of operations. At run time, condition is evaluated for the last element in
3325    the current chain. If condition evaluates False, the chain is extended and
3326    the next element is executed. When condition evaluates True, the object
3327    returned by ``while_loop`` becomes a proxy for the last element in the chain.
3328
3329    Equivalent to calling operation.while(condition), where available.
3330
3331    Arguments:
3332        operation: a callable that produces an instance of an operation when called with no arguments.
3333        condition: a callable that accepts an object (returned by ``operation``) that returns a boolean.
3334        max_iteration: execute the loop no more than this many times (default 10)
3335
3336    Warning:
3337        *max_iteration* is provided in part to minimize the cost of bugs in early
3338        versions of this software. The default value may be changed or
3339        removed on short notice.
3340
3341    Warning:
3342        The protocol by which ``while_loop`` interacts with ``operation`` and ``condition``
3343        is very unstable right now. Please refer to this documentation when installing new
3344        versions of the package.
3345
3346    Protocol:
3347        Warning:
3348            This protocol will be changed before the 0.1 API is finalized.
3349
3350        When called, ``while_loop`` calls ``operation`` without arguments
3351        and captures the return value captured as ``_operation``.
3352        The object produced by ``operation()`` must have a ``reset``,
3353        a ``run`` method, and an ``output`` attribute.
3354
3355        This is inspected
3356        to determine the output data proxy for the operation produced by the call
3357        to ``while_loop``. When that operation is called, it does the equivalent of
3358
3359            while(condition(self._operation)):
3360                self._operation.reset()
3361                self._operation.run()
3362
3363        Then, the output data proxy of ``self`` is updated with the results from
3364        self._operation.output.
3365
3366    """
3367    # In the first implementation, Subgraph is NOT and OperationHandle.
3368    # if not isinstance(obj, AbstractOperationHandle):
3369    #     raise exceptions.UsageError(
3370    #     '"operation" key word argument must be a callable that produces an Operation handle.')
3371    # outputs = {}
3372    # for name, descriptor in obj.output.items():
3373    #     outputs[name] = descriptor._dtype
3374
3375    # 1. Get the initial inputs.
3376    # 2. Initialize the subgraph with the initial inputs.
3377    # 3. Run the subgraph.
3378    # 4. Get the outputs.
3379    # 5. Initialize the subgraph with the outputs.
3380    # 6. Go to 3 if condition is not met.
3381
3382    obj = operation()
3383    assert hasattr(obj, 'values')
3384    outputs = collections.OrderedDict([(key, type(value)) for key, value in obj.values.items()])
3385
3386    @function_wrapper(output=outputs)
3387    def run_loop(output: OutputCollectionDescription):
3388        iteration = 0
3389        obj = operation()
3390        logger.debug('Created object {}'.format(obj))
3391        logger.debug(', '.join(['{}: {}'.format(key, obj.values[key]) for key in obj.values]))
3392        logger.debug('Condition: {}'.format(condition(obj)))
3393        while (condition(obj)):
3394            logger.debug('Running iteration {}'.format(iteration))
3395            obj.run()
3396            logger.debug(
3397                ', '.join(['{}: {}'.format(key, obj.values[key]) for key in obj.values]))
3398            logger.debug('Condition: {}'.format(condition(obj)))
3399            iteration += 1
3400            if iteration > max_iteration:
3401                break
3402        for name in outputs:
3403            setattr(output, name, obj.values[name])
3404
3405        return obj
3406
3407    return run_loop
3408
3409
3410def subgraph(variables=None):
3411    """Allow operations to be configured in a sub-context.
3412
3413    The object returned functions as a Python context manager. When entering the
3414    context manager (the beginning of the ``with`` block), the object has an
3415    attribute for each of the named ``variables``. Reading from these variables
3416    gets a proxy for the initial value or its update from a previous loop iteration.
3417    At the end of the ``with`` block, any values or data flows assigned to these
3418    attributes become the output for an iteration.
3419
3420    After leaving the ``with`` block, the variables are no longer assignable, but
3421    can be called as bound methods to get the current value of a variable.
3422
3423    When the object is run, operations bound to the variables are ``reset`` and
3424    run to update the variables.
3425    """
3426    # Implementation note:
3427    # A Subgraph (type) has a subgraph context associated with it. The subgraph's
3428    # ability to capture operation additions is implemented in terms of the
3429    # subgraph context.
3430    logger.debug('Declare a new subgraph with variables {}'.format(variables))
3431
3432    return SubgraphBuilder(variables)
3433
3434
3435@computed_result
3436def join_arrays(*, front: datamodel.NDArray = (), back: datamodel.NDArray = ()) -> datamodel.NDArray:
3437    """Operation that consumes two sequences and produces a concatenated single sequence.
3438
3439    Note that the exact signature of the operation is not determined until this
3440    helper is called. Helper functions may dispatch to factories for different
3441    operations based on the inputs. In this case, the dtype and shape of the
3442    inputs determines dtype and shape of the output. An operation instance must
3443    have strongly typed output, but the input must be strongly typed on an
3444    object definition so that a Context can make runtime decisions about
3445    dispatching work and data before instantiating.
3446    # TODO: elaborate and clarify.
3447    # TODO: check type and shape.
3448    # TODO: figure out a better annotation.
3449    """
3450    # TODO: (FR4) Returned list should be an NDArray.
3451    if isinstance(front, (str, bytes)) or isinstance(back, (str, bytes)):
3452        raise exceptions.ValueError('Input must be a pair of lists.')
3453    assert isinstance(front, datamodel.NDArray)
3454    assert isinstance(back, datamodel.NDArray)
3455    new_list = list(front._values)
3456    new_list.extend(back._values)
3457    return datamodel.NDArray(new_list)
3458
3459
3460# TODO: Constrain
3461Scalar = typing.TypeVar('Scalar')
3462
3463
3464def concatenate_lists(sublists: list = ()) -> _Future[gmx.datamodel.NDArray]:
3465    """Combine data sources into a single list.
3466
3467    A trivial data flow restructuring operation.
3468    """
3469    if isinstance(sublists, (str, bytes)):
3470        raise exceptions.ValueError('Input must be a list of lists.')
3471    if len(sublists) == 0:
3472        return datamodel.ndarray([])
3473    else:
3474        # TODO: Fix the data model so that this can type-check properly.
3475        return join_arrays(front=sublists[0],
3476                           back=typing.cast(datamodel.NDArray,
3477                                            concatenate_lists(sublists[1:])))
3478
3479
3480def make_constant(value: Scalar) -> _Future:
3481    """Provide a predetermined value at run time.
3482
3483    This is a trivial operation that provides a (typed) value, primarily for
3484    internally use to manage gmxapi data flow.
3485
3486    Accepts a value of any type. The object returned has a definite type and
3487    provides same interface as other gmxapi outputs. Additional constraints or
3488    guarantees on data type may appear in future versions.
3489    """
3490    dtype = type(value)
3491    source = StaticSourceManager(name='data', proxied_data=value, width=1, function=lambda x: x)
3492    description = ResultDescription(dtype=dtype, width=1)
3493    future = Future(source, 'data', description=description)
3494    return future
3495
3496
3497def logical_not(value: bool) -> _Future:
3498    """Boolean negation.
3499
3500    If the argument is a gmxapi compatible Data or Future object, a new View or
3501    Future is created that proxies the boolean opposite of the input.
3502
3503    If the argument is a callable, logical_not returns a wrapper function that
3504    returns a Future for the logical opposite of the callable's result.
3505    """
3506    # TODO: Small data transformations like this don't need to be formal Operations.
3507    # This could be essentially a data annotation that affects the resolver in a
3508    # DataEdge. As an API detail, coding for different Contexts and optimizations
3509    # within those Context implementations could be simplified.
3510    operation = function_wrapper(output={'data': bool})(lambda data=bool(): not bool(data))
3511    return operation(data=value).output.data
3512