1# 2# This file is part of the GROMACS molecular simulation package. 3# 4# Copyright (c) 2019,2020, by the GROMACS development team, led by 5# Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl, 6# and including many others, as listed in the AUTHORS file in the 7# top-level source directory and at http://www.gromacs.org. 8# 9# GROMACS is free software; you can redistribute it and/or 10# modify it under the terms of the GNU Lesser General Public License 11# as published by the Free Software Foundation; either version 2.1 12# of the License, or (at your option) any later version. 13# 14# GROMACS is distributed in the hope that it will be useful, 15# but WITHOUT ANY WARRANTY; without even the implied warranty of 16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 17# Lesser General Public License for more details. 18# 19# You should have received a copy of the GNU Lesser General Public 20# License along with GROMACS; if not, see 21# http://www.gnu.org/licenses, or write to the Free Software Foundation, 22# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 23# 24# If you want to redistribute modifications to GROMACS, please 25# consider that scientific software is very special. Version 26# control is crucial - bugs must be traceable. We will be happy to 27# consider code for inclusion in the official distribution, but 28# derived work must not be called official GROMACS. Details are found 29# in the README & COPYING files - if they are missing, get the 30# official version at http://www.gromacs.org. 31# 32# To help us fund GROMACS development, we humbly ask that you cite 33# the research papers on the package. Check out http://www.gromacs.org. 34 35"""Define gmxapi-compliant Operations 36 37Provide decorators and base classes to generate and validate gmxapi Operations. 38 39Nodes in a work graph are created as instances of Operations. An Operation factory 40accepts well-defined inputs as key word arguments. The object returned by such 41a factory is a handle to the node in the work graph. It's ``output`` attribute 42is a collection of the Operation's results. 43 44function_wrapper(...) produces a wrapper that converts a function to an Operation 45factory. The Operation is defined when the wrapper is called. The Operation is 46instantiated when the factory is called. The function is executed when the Operation 47instance is run. 48 49The framework ensures that an Operation instance is executed no more than once. 50""" 51 52__all__ = ['computed_result', 53 'function_wrapper', 54 ] 55 56import abc 57import collections 58import functools 59import inspect 60import typing 61import weakref 62from contextlib import contextmanager 63 64import gmxapi as gmx 65from gmxapi import datamodel 66from gmxapi import exceptions 67from gmxapi import logger as root_logger 68from gmxapi.abc import OperationImplementation, MutableResource, Node 69from gmxapi.typing import _Context, ResultTypeVar, SourceTypeVar, valid_result_types, valid_source_types 70from gmxapi.typing import Future as _Future 71 72# Initialize module-level logger 73logger = root_logger.getChild('operation') 74logger.info('Importing {}'.format(__name__)) 75 76 77class ResultDescription: 78 """Describe what will be returned when `result()` is called.""" 79 80 def __init__(self, dtype=None, width=1): 81 assert isinstance(dtype, type) 82 assert issubclass(dtype, valid_result_types) 83 assert isinstance(width, int) 84 self._dtype = dtype 85 self._width = width 86 87 @property 88 def dtype(self) -> type: 89 """node output type""" 90 return self._dtype 91 92 @property 93 def width(self) -> int: 94 """ensemble width""" 95 return self._width 96 97 def __repr__(self): 98 return '{}(dtype={}, width={})'.format(self.__class__.__name__, self.dtype, self.width) 99 100 101class OutputData(object): 102 """Encapsulate the description and storage of a data output.""" 103 104 def __init__(self, name: str, description: ResultDescription): 105 assert name != '' 106 self._name = name 107 assert isinstance(description, ResultDescription) 108 self._description = description 109 self._done = [False] * self._description.width 110 self._data = [None] * self._description.width 111 112 @property 113 def name(self): 114 """The name of the published output.""" 115 return self._name 116 117 # TODO: Change to regular member function and add ensemble member arg. 118 @property 119 def done(self): 120 """Ensemble completion status for this output.""" 121 return all(self._done) 122 123 def data(self, member: int = None): 124 """Access the raw data for localized output for the ensemble or the specified member.""" 125 if not self.done: 126 raise exceptions.ApiError('Attempt to read before data has been published.') 127 if self._data is None or None in self._data: 128 raise exceptions.ApiError('Data marked "done" but contains null value.') 129 if member is not None: 130 return self._data[member] 131 else: 132 return self._data 133 134 def set(self, value, member: int): 135 """Set local data and mark as completed. 136 137 Used internally to maintain the data store. 138 """ 139 if self._description.dtype == datamodel.NDArray: 140 self._data[member] = datamodel.ndarray(value) 141 else: 142 self._data[member] = self._description.dtype(value) 143 self._done[member] = True 144 145 def reset(self): 146 """Reinitialize the data store. 147 148 Note: 149 This is a workaround until the execution model is more developed. 150 151 Todo: 152 Remove this method when all operation handles provide factories to 153 reinstantiate on new Contexts and/or with new inputs. 154 155 """ 156 self._done = [False] * self._description.width 157 self._data = [None] * self._description.width 158 159 160class EnsembleDataSource(gmx.abc.EnsembleDataSource): 161 """A single source of data with ensemble data flow annotations. 162 163 Note that data sources may be Futures. 164 """ 165 166 def __init__(self, source=None, width=1, dtype=None): 167 self.source = source 168 self.width = width 169 self.dtype = dtype 170 171 def node(self, member: int): 172 return self.source[member] 173 174 def reset(self): 175 protocols = ('reset', '_reset') 176 for protocol in protocols: 177 if hasattr(self.source, protocol): 178 getattr(self.source, protocol)() 179 180 181class DataSourceCollection(collections.OrderedDict): 182 """Store and describe input data handles for an operation. 183 184 When created from InputCollectionDescription.bind(), the DataSourceCollection 185 has had default values inserted. 186 187 Note: DataSourceCollection is the input resource collection type for NodeBuilder. 188 To use with the NodeBuilder interface, first create the collection, then 189 iteratively add its resources. 190 191 TODO: We should probably normalize the collection aspect of input. Is this a 192 single input? Are all inputs added as collections? Should this be split 193 to a standard iterable? Does a resource factory always produce a mapping? 194 """ 195 196 def __init__(self, **kwargs): 197 """Initialize from key/value pairs of named data sources. 198 199 Data sources may be any of the basic gmxapi data types, gmxapi Futures 200 of those types, or gmxapi ensemble data bundles of the above. 201 """ 202 super(DataSourceCollection, self).__init__() 203 for name, value in kwargs.items(): 204 self[name] = value 205 206 def __setitem__(self, key: str, value: SourceTypeVar) -> None: 207 if not isinstance(key, str): 208 raise exceptions.TypeError('Data must be named with str type.') 209 # TODO: Encapsulate handling of proferred data sources to Context details. 210 # Preprocessed input should be self-describing gmxapi data types. Structured 211 # input must be recursively (depth-first) converted to gmxapi data types. 212 # TODO: Handle gmxapi Futures stored as dictionary elements! 213 if not isinstance(value, valid_source_types): 214 if isinstance(value, collections.abc.Iterable): 215 # Iterables here are treated as arrays, but we do not have a robust typing system. 216 # Warning: In the initial implementation, the iterable may contain Future objects. 217 # TODO: (#2993) Revisit as we sort out data shape and Future protocol. 218 try: 219 value = datamodel.ndarray(value) 220 except (exceptions.ValueError, exceptions.TypeError) as e: 221 raise exceptions.TypeError('Iterable could not be converted to NDArray: {}'.format(value)) from e 222 elif hasattr(value, 'result'): 223 # A Future object. 224 pass 225 else: 226 raise exceptions.ApiError('Cannot process data source {}'.format(value)) 227 super().__setitem__(key, value) 228 229 def reset(self): 230 """Reset all sources in the collection.""" 231 for source in self.values(): 232 if hasattr(source, 'reset'): 233 source.reset() 234 if hasattr(source, '_reset'): 235 source._reset() 236 237 def __hash__(self): 238 """Provide some sort of unique identifier. 239 240 We need a more deterministic fingerprinting scheme with well-specified 241 uniqueness semantics, but right now we just need something reasonably 242 unique. 243 """ 244 hashed_keys_and_values = tuple(hash(entity) for item in self.items() for entity in item) 245 return hash(hashed_keys_and_values) 246 247 248def computed_result(function): 249 """Decorate a function to get a helper that produces an object with Result behavior. 250 251 When called, the new function produces an ImmediateResult object. 252 253 The new function has the same signature as the original function, but can accept 254 gmxapi data proxies, assuming the provided proxy objects represent types 255 compatible with the original signature. 256 257 Calls to `result()` return the value that `function` would return when executed 258 in the local context with the inputs fully resolved. 259 260 The API does not specify when input data dependencies will be resolved 261 or when the wrapped function will be executed. That is, ``@computed_result`` 262 functions may force immediate resolution of data dependencies and/or may 263 be called more than once to satisfy dependent operation inputs. 264 """ 265 266 # Attempt to inspect function signature. Introspection can fail, so we catch 267 # the various exceptions. We re-raise as ApiError, indicating a bug, because 268 # we should do more to handle this intelligently and provide better user 269 # feedback. 270 # TODO: Figure out what to do with exceptions where this introspection 271 # and rebinding won't work. 272 # ref: https://docs.python.org/3/library/inspect.html#introspecting-callables-with-the-signature-object 273 try: 274 sig = inspect.signature(function) 275 except TypeError as T: 276 raise exceptions.ApiError('Can not inspect type of provided function argument.') from T 277 except ValueError as V: 278 raise exceptions.ApiError('Can not inspect provided function signature.') from V 279 280 wrapped_function = function_wrapper()(function) 281 282 @functools.wraps(function) 283 def new_function(*args, **kwargs): 284 # The signature of the new function will accept abstractions 285 # of whatever types it originally accepted. This wrapper must 286 # * Create a mapping to the original call signature from `input` 287 # * Add handling for typed abstractions in wrapper function. 288 # * Process arguments to the wrapper function into `input` 289 290 # 1. Inspect the return annotation to determine valid gmxapi type(s) 291 # 2. Generate a Result object advertising the correct type, bound to the 292 # Input and implementing function. 293 # 3. Transform the result() data to the correct type. 294 295 # TODO: (FR3+) create a serializable data structure for inputs discovered 296 # from function introspection. 297 298 for name, param in sig.parameters.items(): 299 assert not param.kind == param.POSITIONAL_ONLY 300 bound_arguments = sig.bind(*args, **kwargs) 301 handle = wrapped_function(**bound_arguments.arguments) 302 output = handle.output 303 # TODO: Find a type hinting / generic way to assert output attributes. 304 return output.data 305 306 return new_function 307 308 309class OutputCollectionDescription(collections.OrderedDict): 310 def __init__(self, **kwargs): 311 """Create the output description for an operation node from a dictionary of names and types.""" 312 outputs = [] 313 for name, flavor in kwargs.items(): 314 if not isinstance(name, str): 315 raise exceptions.TypeError('Output descriptions are keyed by Python strings.') 316 # Multidimensional outputs are explicitly NDArray 317 if issubclass(flavor, (list, tuple)): 318 flavor = datamodel.NDArray 319 assert issubclass(flavor, valid_result_types) 320 outputs.append((name, flavor)) 321 super().__init__(outputs) 322 323 324class InputCollectionDescription(collections.OrderedDict): 325 """Describe acceptable inputs for an Operation. 326 327 Generally, an InputCollectionDescription is an aspect of the public API by 328 which an Operation expresses its possible inputs. This class includes details 329 of the Python package. 330 331 Keyword Arguments: 332 parameters : A sequence of named parameter descriptions. 333 334 Parameter descriptions are objects containing an `annotation` attribute 335 declaring the data type of the parameter and, optionally, a `default` 336 attribute declaring a default value for the parameter. 337 338 Instances can be used as an ordered map of parameter names to gmxapi data types. 339 340 Analogous to inspect.Signature, but generalized for gmxapi Operations. 341 Additional notable differences: typing is normalized at initialization, and 342 the bind() method does not return an object that can be directly used as 343 function input. The object returned from bind() is used to construct a data 344 graph Edge for subsequent execution. 345 """ 346 347 def __init__(self, parameters: typing.Iterable[typing.Tuple[str, inspect.Parameter]]): 348 """Create the input description for an operation node from a dictionary of names and types.""" 349 inputs = [] 350 for name, param in parameters: 351 if not isinstance(name, str): 352 raise exceptions.TypeError('Input descriptions are keyed by Python strings.') 353 # Multidimensional inputs are explicitly NDArray 354 dtype = param.annotation 355 if issubclass(dtype, collections.abc.Iterable) \ 356 and not issubclass(dtype, (str, bytes, collections.abc.Mapping)): 357 # TODO: we can relax this with some more input conditioning. 358 if dtype != datamodel.NDArray: 359 raise exceptions.UsageError( 360 'Cannot accept input type {}. Sequence type inputs must use NDArray.'.format(param)) 361 assert issubclass(dtype, valid_result_types) 362 if hasattr(param, 'kind'): 363 disallowed = any([param.kind == param.POSITIONAL_ONLY, 364 param.kind == param.VAR_POSITIONAL, 365 param.kind == param.VAR_KEYWORD]) 366 if disallowed: 367 raise exceptions.ProtocolError( 368 'Cannot wrap function. Operations must have well-defined parameter names.') 369 kind = param.kind 370 else: 371 kind = inspect.Parameter.POSITIONAL_OR_KEYWORD 372 if hasattr(param, 'default'): 373 default = param.default 374 else: 375 default = inspect.Parameter.empty 376 inputs.append(inspect.Parameter(name, kind, default=default, annotation=dtype)) 377 super().__init__([(input.name, input.annotation) for input in inputs]) 378 self.signature = inspect.Signature(inputs) 379 380 @staticmethod 381 def from_function(function): 382 """Inspect a function to be wrapped. 383 384 Used internally by gmxapi.operation.function_wrapper() 385 386 Raises: 387 exceptions.ProtocolError if function signature cannot be determined to be valid. 388 389 Returns: 390 InputCollectionDescription for the function input signature. 391 """ 392 # First, inspect the function. 393 assert callable(function) 394 signature = inspect.signature(function) 395 # The function must have clear and static input schema 396 # Make sure that all parameters have clear names, whether or not they are used in a call. 397 for name, param in signature.parameters.items(): 398 disallowed = any([param.kind == param.POSITIONAL_ONLY, 399 param.kind == param.VAR_POSITIONAL, 400 param.kind == param.VAR_KEYWORD]) 401 if disallowed: 402 raise exceptions.ProtocolError( 403 'Cannot wrap function. Operations must have well-defined parameter names.') 404 if param.name == 'input': 405 raise exceptions.ProtocolError('Function signature includes the (reserved) "input" keyword argument.') 406 description = collections.OrderedDict() 407 for param in signature.parameters.values(): 408 if param.name == 'output': 409 # Wrapped functions may accept the output parameter to publish results, but 410 # that is not part of the Operation input signature. 411 continue 412 if param.annotation == param.empty: 413 if param.default == param.empty or param.default is None: 414 raise exceptions.ProtocolError('Could not infer parameter type for {}'.format(param.name)) 415 dtype = type(param.default) 416 if isinstance(dtype, collections.abc.Iterable) \ 417 and not isinstance(dtype, (str, bytes, collections.abc.Mapping)): 418 dtype = datamodel.NDArray 419 else: 420 dtype = param.annotation 421 description[param.name] = param.replace(annotation=dtype) 422 return InputCollectionDescription(description.items()) 423 424 def bind(self, *args, **kwargs) -> DataSourceCollection: 425 """Create a compatible DataSourceCollection from provided arguments. 426 427 Pre-process input and function signature to get named input arguments. 428 429 This is a helper function to allow calling code to characterize the 430 arguments in a Python function call with hints from the factory that is 431 initializing an operation. Its most useful functionality is to allows a 432 factory to accept positional arguments where named inputs are usually 433 required. It also allows data sources to participate in multiple 434 DataSourceCollections with minimal constraints. 435 436 Note that the returned object has had data populated from any defaults 437 described in the InputCollectionDescription. 438 439 See wrapped_function_runner() and describe_function_input(). 440 """ 441 # For convenience, accept *args, but convert to **kwargs to pass to Operation. 442 # Factory accepts an unadvertised `input` keyword argument that is used as a default kwargs dict. 443 # If present, kwargs['input'] is treated as an input "pack" providing _default_ values. 444 input_kwargs = collections.OrderedDict() 445 if 'input' in kwargs: 446 provided_input = kwargs.pop('input') 447 if provided_input is not None: 448 input_kwargs.update(provided_input) 449 # `function` may accept an `output` keyword argument that should not be supplied to the factory. 450 for key, value in kwargs.items(): 451 if key == 'output': 452 raise exceptions.UsageError('Invalid keyword argument: output (reserved).') 453 input_kwargs[key] = value 454 try: 455 bound_arguments = self.signature.bind_partial(*args, **input_kwargs) 456 except TypeError as e: 457 raise exceptions.UsageError('Could not bind operation parameters to function signature.') from e 458 assert 'output' not in bound_arguments.arguments 459 bound_arguments.apply_defaults() 460 assert 'input' not in bound_arguments.arguments 461 input_kwargs = collections.OrderedDict([pair for pair in bound_arguments.arguments.items()]) 462 if 'output' in input_kwargs: 463 input_kwargs.pop('output') 464 return DataSourceCollection(**input_kwargs) 465 466 467class ProxyDataDescriptor(object): 468 """Base class for data descriptors used in DataProxyBase subclasses. 469 470 Subclasses should either not define __init__ or should call the base class 471 __init__ explicitly: super().__init__(self, name, dtype) 472 """ 473 474 def __init__(self, name: str, dtype: ResultTypeVar = None): 475 self._name = name 476 # TODO: We should not allow dtype==None, but we currently have a weak data 477 # model that does not allow good support of structured Futures. 478 if dtype is not None: 479 assert isinstance(dtype, type) 480 assert issubclass(dtype, valid_result_types) 481 self._dtype = dtype 482 483 484class DataProxyMeta(abc.ABCMeta): 485 # Key word arguments consumed by __prepare__ 486 _prepare_keywords = ('descriptors',) 487 488 @classmethod 489 def __prepare__(mcs, name, bases, descriptors: collections.abc.Mapping = None): 490 """Allow dynamic sub-classing. 491 492 DataProxy class definitions are collections of data descriptors. This 493 class method allows subclasses to give the descriptor names and type(s) 494 in the class declaration as arguments instead of as class attribute 495 assignments. 496 497 class MyProxy(DataProxyBase, descriptors={name: MyDescriptor() for name in datanames}): pass 498 499 Note: 500 If we are only using this metaclass for the __prepare__ hook by the 501 time we require Python >= 3.6, we could reimplement __prepare__ as 502 DataProxyBase.__init_subclass__ and remove this metaclass. 503 """ 504 if descriptors is None: 505 return {} 506 elif isinstance(descriptors, tuple): 507 namespace = collections.OrderedDict([(d._name, d) for d in descriptors]) 508 return namespace 509 else: 510 assert isinstance(descriptors, collections.abc.Mapping) 511 return descriptors 512 513 def __new__(cls, name, bases: typing.Iterable, namespace, **kwargs): 514 for key in kwargs: 515 if key not in DataProxyMeta._prepare_keywords: 516 raise exceptions.ApiError('Unexpected class creation keyword: {}'.format(key)) 517 # See note about DataProxyBase._reserved. 518 if '_reserved' not in namespace and not any(hasattr(base, '_reserved') for base in bases): 519 raise exceptions.ApiError( 520 'We currently expect DataProxy classes to provide a list of reserved attribute names.') 521 for key in namespace: 522 # Here we can check conformance with naming and typing rules. 523 assert isinstance(key, str) 524 if key.startswith('__'): 525 # Skip non-public attributes. 526 continue 527 descriptor = namespace[key] 528 # The purpose of the current data proxies is to serve as a limited namespace 529 # containing only descriptors of a certain type. In the future, these proxies 530 # may be flattened into a facet of a richer OperationHandle class 531 # (this metaclass may become a decorator on an implementation class), 532 # but for now we check that the class is being used according to the 533 # documented framework. A nearer term update could be to restrict the 534 # type of the data descriptor: 535 # TODO: Use a member type of the derived cls (or a mix-in base) to specify a particular 536 # ProxyDataDescriptor subclass. 537 # Also, see note about DataProxyBase._reserved 538 if not isinstance(descriptor, ProxyDataDescriptor): 539 if key not in namespace['_reserved'] and not any(key in getattr(base, '_reserved') for base in 540 bases if hasattr(base, '_reserved')): 541 raise exceptions.ApiError('Unexpected data proxy attribute {}: {}'.format(key, repr(descriptor))) 542 else: 543 assert isinstance(descriptor, ProxyDataDescriptor) 544 if not isinstance(descriptor._name, str) or descriptor._name == '': 545 descriptor._name = key 546 else: 547 if descriptor._name != key: 548 raise exceptions.ApiError( 549 'Descriptor internal name {} does not match attribute name {}'.format( 550 descriptor._name, key)) 551 return super().__new__(cls, name, bases, namespace) 552 553 # TODO: This keyword argument stripping is not necessary in more recent Python versions. 554 # When Python minimum required version is increased, check if we can remove this. 555 def __init__(cls, name, bases, namespace, **kwargs): 556 for key in kwargs: 557 if key not in DataProxyMeta._prepare_keywords: 558 raise exceptions.ApiError('Unexpected class initialization keyword: {}'.format(key)) 559 super().__init__(name, bases, namespace) 560 561 # TODO: See if we can use __dir__ in the metaclass to help hint class attributes for better tab completion. 562 # Ref: https://ipython.readthedocs.io/en/stable/config/integrating.html#tab-completion 563 # def __dir__(self) -> Iterable[str]: 564 # return super().__dir__() 565 566 567class DataProxyBase(collections.abc.Mapping, metaclass=DataProxyMeta): 568 """Limited interface to managed resources. 569 570 Inherit from DataProxy to specialize an interface to an ``instance``. 571 In the derived class, either do not define ``__init__`` or be sure to 572 initialize the super class (DataProxy) with an instance of the object 573 to be proxied. 574 575 A class deriving from DataProxyBase allows its instances to provide a namespace 576 for proxies to named data by defining attributes that are data descriptors 577 (subclasses of ProxyDataDescriptor). 578 The ProxyDataDescriptors are accessed as attributes of the 579 data proxy instance or by iterating on items(). Attributes that are not 580 ProxyDataDescriptors are possible, but will not be returned by items() which 581 is a necessary part of gmxapi execution protocol. 582 583 Acts as an owning handle to the resources provide by ``instance``, 584 preventing the reference count of ``instance`` from going to zero for the 585 lifetime of the proxy object. 586 587 When sub-classing DataProxyBase, data descriptors can be passed as a mapping 588 to the ``descriptors`` key word argument in the class declaration. This 589 allows data proxy subclasses to be easily defined dynamically. 590 591 mydescriptors = {'foo': Publisher('foo', int), 'data': Publisher('data', float)} 592 ... 593 class MyDataProxy(DataProxyBase, descriptors=mydescriptors): pass 594 assert hasattr(MyDataProxy, 'foo') 595 596 """ 597 # This class attribute (which subclasses are free to replace to augment) is an 598 # indication of a problem with the current data model. If we are allowing 599 # reserved words that would otherwise be valid data names, there is not a 600 # compelling reason for separate data proxy classes: we throw away the assertion 601 # that we are preparing a clean namespace and we could have accomplished the 602 # class responsibilities in the Operation handle with just descriptor classes. 603 # If we want the clean namespace, we should figure out how to keep this interface 604 # from growing and/or have some "hidden" internal interface. 605 _reserved = ('ensemble_width', 'items', '_reserved') 606 607 # This class can be expanded to be the attachment point for a metaclass for 608 # data proxies such as PublishingDataProxy or OutputDataProxy, which may be 609 # defined very dynamically and concisely as a set of Descriptors and a type() 610 # call. 611 # If development in this direction does not materialize, then this base 612 # class is not very useful and should be removed. 613 def __init__(self, instance: 'SourceResource', client_id: int = None): 614 """Get partial ownership of a resource provider. 615 616 Arguments: 617 instance : resource-owning object 618 client_id : identifier for client holding the resource handle (e.g. ensemble member id) 619 620 If client_id is not provided, the proxy scope is for all clients. 621 """ 622 # TODO: Decide whether _resource_instance is public or not. 623 # Note: currently commonly needed for subclass implementations. 624 self._resource_instance = instance 625 # Developer note subclasses should handle self._client_identifier == None 626 self._client_identifier = client_id 627 # Collection is fixed by the time of instance creation, so cache it. 628 self.__keys = tuple([key for key, _ in self.items()]) 629 self.__length = len(self.__keys) 630 631 @property 632 def ensemble_width(self) -> int: 633 return self._resource_instance.width() 634 635 @classmethod 636 def items(cls): 637 """Generator for tuples of attribute name and descriptor instance. 638 639 This almost certainly doesn't do quite what we want... 640 """ 641 for name, value in cls.__dict__.items(): 642 if isinstance(value, ProxyDataDescriptor): 643 yield name, value 644 645 def __getitem__(self, k): 646 if hasattr(self, k): 647 return getattr(self, k) 648 649 def __len__(self): 650 return self.__length 651 652 def __iter__(self): 653 for key in self.__keys: 654 yield key 655 656 657class Publisher(ProxyDataDescriptor): 658 """Data descriptor for write access to a specific named data resource. 659 660 For a wrapped function receiving an ``output`` argument, provides the 661 accessors for an attribute on the object passed as ``output``. Maps 662 read and write access by the wrapped function to appropriate details of 663 the resource manager. 664 665 Used internally to implement settable attributes on PublishingDataProxy. 666 Allows PublishingDataProxy to be dynamically defined in the scope of the 667 operation.function_wrapper closure. Each named output is represented by 668 an instance of Publisher in the PublishingDataProxy class definition for 669 the operation. 670 671 Ref: https://docs.python.org/3/reference/datamodel.html#implementing-descriptors 672 673 Collaborations: 674 Relies on implementation details of ResourceManager. 675 """ 676 677 def __get__(self, instance: DataProxyBase, owner): 678 if instance is None: 679 # The current access has come through the class attribute of owner class 680 return self 681 resource_manager = instance._resource_instance 682 client_id = instance._client_identifier 683 # TODO: Fix API scope. 684 # Either this class is a detail of the same implementation as ResourceManager, 685 # or we need to enforce that instance._resource_instance provides _data (or equivalent) 686 assert isinstance(resource_manager, ResourceManager) 687 if client_id is None: 688 return getattr(resource_manager._data, self._name) 689 else: 690 return getattr(resource_manager._data, self._name)[client_id] 691 692 def __set__(self, instance: DataProxyBase, value): 693 resource_manager = instance._resource_instance 694 # TODO: Fix API scope. 695 # Either this class is a detail of the same implementation as ResourceManager, 696 # or we need to enforce that instance._resource_instance provides _data (or equivalent) 697 assert isinstance(resource_manager, ResourceManager) 698 client_id = instance._client_identifier 699 resource_manager.set_result(name=self._name, value=value, member=client_id) 700 701 def __repr__(self): 702 return '{}(name={}, dtype={})'.format(self.__class__.__name__, 703 self._name, 704 self._dtype.__qualname__) 705 706 707def define_publishing_data_proxy(output_description) -> typing.Type[DataProxyBase]: 708 """Returns a class definition for a PublishingDataProxy for the provided output description.""" 709 # This dynamic type creation hides collaborations with things like make_datastore. 710 # We should encapsulate these relationships in Context details, explicit collaborations 711 # between specific operations and Contexts, and in groups of Operation definition helpers. 712 713 descriptors = collections.OrderedDict([(name, Publisher(name)) for name in output_description]) 714 715 class PublishingDataProxy(DataProxyBase, descriptors=descriptors): 716 """Handler for write access to the `output` of an operation. 717 718 Acts as a sort of PublisherCollection. 719 """ 720 721 return PublishingDataProxy 722 723 724# get symbols we can use to annotate input and output types more specifically. 725_OutputDataProxyType = typing.TypeVar('_OutputDataProxyType', bound=DataProxyBase) 726_PublishingDataProxyType = typing.TypeVar('_PublishingDataProxyType', bound=DataProxyBase) 727# Currently, the ClientID type is an integer, but this may change. 728ClientID = typing.NewType('ClientID', int) 729 730 731class _Resources(typing.Generic[_PublishingDataProxyType]): 732 pass 733 734 735# TODO: Why generic in publishingdataproxytype? 736class SourceResource(typing.Generic[_OutputDataProxyType, _PublishingDataProxyType]): 737 """Resource Manager for a data provider. 738 739 Supports Future instances in a particular context. 740 """ 741 742 # Note: ResourceManager members not yet included: 743 # future(), _data, set_result. 744 745 # This might not belong here. Maybe separate out for a OperationHandleManager? 746 @abc.abstractmethod 747 def data(self) -> _OutputDataProxyType: 748 """Get the output data proxy.""" 749 # Warning: this should probably be renamed, but "output_data_proxy" is already 750 # a member in at least one derived class. 751 ... 752 753 @abc.abstractmethod 754 def is_done(self, name: str) -> bool: 755 return False 756 757 @abc.abstractmethod 758 def get(self, name: str) -> 'OutputData': 759 ... 760 761 @abc.abstractmethod 762 def update_output(self): 763 """Bring the _data member up to date and local.""" 764 pass 765 766 @abc.abstractmethod 767 def reset(self): 768 """Recursively reinitialize resources. 769 770 Set the resource manager to its initialized state. 771 All outputs are marked not "done". 772 All inputs supporting the interface have ``_reset()`` called on them. 773 """ 774 775 @abc.abstractmethod 776 def width(self) -> int: 777 """Ensemble width of the managed resources.""" 778 ... 779 780 @abc.abstractmethod 781 def future(self, name: str, description: ResultDescription) -> 'Future': 782 """Get a Future handle for managed data. 783 784 Resource managers owned by subclasses of gmx.operation.Context provide 785 this method to get references to output data. 786 787 In addition to the interface described by gmx.abc.Future, returned objects 788 provide the interface described by gmx.operation.Future. 789 """ 790 791 792class StaticSourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]): 793 """Provide the resource manager interface for local static data. 794 795 Allow data transformations on the proxied resource. 796 797 Keyword Args: 798 proxied_data: A gmxapi supported data object. 799 width: Size of (one-dimensional) shaped data produced by function. 800 function: Transformation to perform on the managed data. 801 802 The callable passed as ``function`` must accept a single argument. The 803 argument will be an iterable when proxied_data represents an ensemble, 804 or an object of the same type as proxied_data otherwise. 805 """ 806 807 def __init__(self, *, name: str, proxied_data, width: int, function: typing.Callable): 808 assert not isinstance(proxied_data, Future) 809 if hasattr(proxied_data, 'width'): 810 # Ensemble data source 811 assert hasattr(proxied_data, 'source') 812 self._result = function(proxied_data.source) 813 else: 814 self._result = function(proxied_data) 815 if width > 1: 816 if isinstance(self._result, (str, bytes)): 817 # In this case, do not implicitly broadcast 818 raise exceptions.ValueError('"function" produced data incompatible with "width".') 819 else: 820 if not isinstance(self._result, collections.abc.Iterable): 821 raise exceptions.DataShapeError( 822 'Expected iterable of size {} but "function" result is not iterable.') 823 data = list(self._result) 824 size = len(data) 825 if len(data) != width: 826 raise exceptions.DataShapeError( 827 'Expected iterable of size {} but "function" produced a {} of size {}'.format(width, type(data), 828 size)) 829 dtype = type(data[0]) 830 else: 831 if width != 1: 832 raise exceptions.ValueError('width must be an integer 1 or greater.') 833 dtype = type(self._result) 834 if issubclass(dtype, (list, tuple)): 835 dtype = datamodel.NDArray 836 data = [datamodel.ndarray(self._result)] 837 elif isinstance(self._result, collections.abc.Iterable): 838 if not isinstance(self._result, (str, bytes, dict)): 839 raise exceptions.ValueError( 840 'Expecting width 1 but "function" produced iterable type {}.'.format(type(self._result))) 841 else: 842 dtype = str 843 data = [str(self._result)] 844 else: 845 data = [self._result] 846 description = ResultDescription(dtype=dtype, width=width) 847 self._data = OutputData(name=name, description=description) 848 for member in range(width): 849 self._data.set(data[member], member=member) 850 851 output_collection_description = OutputCollectionDescription(**{name: dtype}) 852 self.output_data_proxy = define_output_data_proxy(output_description=output_collection_description) 853 854 def is_done(self, name: str) -> bool: 855 return True 856 857 def get(self, name: str) -> 'OutputData': 858 assert self._data.name == name 859 return self._data 860 861 def data(self) -> _OutputDataProxyType: 862 return self.output_data_proxy(self) 863 864 def width(self) -> int: 865 # TODO: It looks like the OutputData ResultDescription probably belongs 866 # in the public interface. 867 return self._data._description.width 868 869 def update_output(self): 870 pass 871 872 def reset(self): 873 pass 874 875 def future(self, name: str, description: ResultDescription) -> 'Future': 876 return Future(self, name, description=description) 877 878 879class ProxyResourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]): 880 """Act as a resource manager for data managed by another resource manager. 881 882 Allow data transformations on the proxied resource. 883 884 Keyword Args: 885 proxied_future: An object implementing the Future interface. 886 width: Size of (one-dimensional) shaped data produced by function. 887 function: Transformation to perform on the result of proxied_future. 888 889 The callable passed as ``function`` must accept a single argument, which will 890 be an iterable when proxied_future represents an ensemble, or an object of 891 type proxied_future.description.dtype otherwise. 892 """ 893 894 def __init__(self, proxied_future: 'Future', width: int, function: typing.Callable): 895 self._done = False 896 self._proxied_future = proxied_future 897 self._width = width 898 self.name = self._proxied_future.name 899 self._result = None 900 assert callable(function) 901 self.function = function 902 903 def width(self) -> int: 904 return self._width 905 906 def reset(self): 907 self._done = False 908 self._proxied_future._reset() 909 self._result = None 910 911 def is_done(self, name: str) -> bool: 912 return self._done 913 914 def get(self, name: str): 915 if name != self.name: 916 raise exceptions.ValueError('Request for unknown data.') 917 if not self.is_done(name): 918 raise exceptions.ProtocolError('Data not ready.') 919 result = self.function(self._result) 920 if self._width != 1: 921 # TODO Fix this typing nightmare: 922 # ResultDescription should be fully knowable and defined when the resource manager is initialized. 923 data = OutputData(name=self.name, description=ResultDescription(dtype=type(result[0]), width=self._width)) 924 for member, value in enumerate(result): 925 data.set(value, member) 926 else: 927 data = OutputData(name=self.name, description=ResultDescription(dtype=type(result), width=self._width)) 928 data.set(result, 0) 929 return data 930 931 def update_output(self): 932 self._result = self._proxied_future.result() 933 self._done = True 934 935 def data(self) -> _OutputDataProxyType: 936 raise exceptions.ApiError('ProxyResourceManager cannot yet manage a full OutputDataProxy.') 937 938 def future(self, name: str, description: ResultDescription): 939 return Future(self, name, description=description) 940 941 942class AbstractOperation(typing.Generic[_OutputDataProxyType]): 943 """Client interface to an operation instance (graph node). 944 945 Note that this is a generic abstract class. Subclasses should provide a 946 class subscript to help static type checkers. 947 """ 948 949 @abc.abstractmethod 950 def run(self): 951 """Assert execution of an operation. 952 953 After calling run(), the operation results are guaranteed to be available 954 in the local context. 955 """ 956 957 @property 958 @abc.abstractmethod 959 def output(self) -> _OutputDataProxyType: 960 """Get a proxy collection to the output of the operation. 961 962 Developer note: The 'output' property exists to isolate the namespace of 963 output data from other operation handle attributes and we should consider 964 whether it is actually necessary or helpful. To facilitate its possible 965 future removal, do not enrich its interface beyond that of a collection 966 of OutputDescriptor attributes. 967 """ 968 ... 969 970 971class OperationRegistryKey(collections.namedtuple('OperationRegistryKey', 'namespace name'), collections.abc.Hashable): 972 """Helper class to define the key type for OperationRegistry.""" 973 974 def __hash__(self): 975 return hash((self.namespace, self.name)) 976 977 def __eq__(self, other): 978 """Test equivalence rather than identity. 979 980 Note: Use `is` to test identity. 981 """ 982 return other.namespace == self.namespace and other.name == self.name 983 984 def __str__(self): 985 return '.'.join([self.namespace, self.name]) 986 987 def __repr__(self): 988 return '{}(namespace={}, name={})'.format(self.__qualname__, self.namespace, self.name) 989 990 991def _make_registry_key(*args) -> OperationRegistryKey: 992 """Normalize input to an OperationRegistryKey. 993 994 Used to implement OperationRegistry.__getitem__(), which catches and converts 995 the various exceptions this helper function can produce. 996 """ 997 if len(args) > 1: 998 return OperationRegistryKey(*args) 999 else: 1000 if len(args) != 1: 1001 raise exceptions.UsageError('Empty index value passed to OperationRegistry instance[].') 1002 item = args[0] 1003 if isinstance(item, OperationRegistryKey): 1004 return item 1005 if isinstance(item, str): 1006 namespace, name = item.rsplit(sep='.', maxsplit=1) 1007 return OperationRegistryKey(namespace=namespace, name=name) 1008 # Item could be a class object or an instance object... 1009 if hasattr(item, 'namespace') and hasattr(item, 'name'): 1010 if callable(item.namespace): 1011 namespace = item.namespace() 1012 else: 1013 namespace = item.namespace 1014 if callable(item.name): 1015 name = item.name() 1016 else: 1017 name = item.name 1018 return OperationRegistryKey(namespace=namespace, name=name) 1019 raise exceptions.ValueError('Not a usable OperationRegistryKey: {}'.format(item)) 1020 1021 1022class OperationRegistry(collections.UserDict): 1023 """Helper class to map identifiers to Operation implementation instances. 1024 1025 This is an implementation detail of gmxapi.operation and should not be used from 1026 outside of the package until a stable interface can be specified. 1027 """ 1028 1029 @typing.overload 1030 def __getitem__(self, item: OperationRegistryKey): 1031 ... 1032 1033 @typing.overload 1034 def __getitem__(self, item: str): 1035 ... 1036 1037 def __getitem__(self, *args): 1038 """Fetch the requested operation registrant. 1039 1040 The getter is overloaded to support look-ups in multiple forms. 1041 1042 The key can be given in the following forms. 1043 * As a period-delimited string of the form "namespace.operation". 1044 * As an OperationRegistryKey object. 1045 * As a sequence of positional arguments accepted by OperationRegistryKey. 1046 * As an object conforming to the OperationRegistryKey interface. 1047 """ 1048 try: 1049 item = _make_registry_key(*args) 1050 except exceptions.Error as e: 1051 raise exceptions.TypeError('Could not interpret key as a OperationRegistryKey.') from e 1052 return self.data[item] 1053 1054 1055# Module level data store for locating operation implementations at run time. 1056# TODO: This may make sense as instance data of a root Context instance, but we 1057# don't have a stable interface for communicating between Contexts yet. 1058# Alternatively, it could be held as class data in a RegisteredOperation class, 1059# but note that the "register" member function would behave less like the abc.ABCMeta 1060# support for "virtual subclassing" and more like the generic function machinery 1061# of e.g. functools.singledispatch. 1062_operation_registry = OperationRegistry() 1063 1064 1065def _register_operation(cls: typing.Type[OperationImplementation]): 1066 assert isinstance(cls, type) 1067 assert issubclass(cls, OperationImplementation) 1068 operation = _make_registry_key(cls) 1069 if operation in _operation_registry: 1070 full_name = str(operation) 1071 raise exceptions.ProtocolError('Attempting to redefine operation {}.'.format(full_name)) 1072 _operation_registry[operation] = cls 1073 1074 1075# TODO: replace with a generic function that we dispatch on so the type checker can infer a return type. 1076def _get_operation_director(operation, context: gmx.abc.Context): 1077 """ 1078 1079 :param operation: 1080 :param context: 1081 :return: gmxapi.abc.OperationDirector 1082 """ 1083 registrant = _operation_registry[operation] 1084 director = registrant.director(context=context) 1085 return director 1086 1087 1088class InputDescription(abc.ABC): 1089 """Node input description for gmxapi.operation module. 1090 1091 Provides the support needed to understand operation inputs in gmxapi.operation 1092 module Contexts. 1093 1094 .. todo:: Implementation base class with heritable behavior and/or helpers to 1095 compose this functionality from more normative description of 1096 operation inputs. This will probably become a facet of the ResourceDirector 1097 when specialized for gmxapi.operation.Context. 1098 """ 1099 1100 @abc.abstractmethod 1101 def signature(self) -> InputCollectionDescription: 1102 """Mapping of named inputs and input type. 1103 1104 Used to determine valid inputs before an Operation node is created. 1105 1106 Collaborations: 1107 Related to the operation resource factory for this context. 1108 1109 .. todo:: 1110 Better unification of this protocol, InputCollectionDescription, and 1111 resource factory. 1112 Note, also, that the *bind* method of the returned InputCollectionDescription 1113 serves as the resource factory for input to the node builder. 1114 """ 1115 ... 1116 1117 @abc.abstractmethod 1118 def make_uid(self, input: 'DataEdge') -> str: 1119 """The unique identity of an operation node tags the output with respect to the input. 1120 1121 Combines information on the Operation details and the input to uniquely 1122 identify the Operation node. 1123 1124 Arguments: 1125 input : A (collection of) data source(s) that can provide Fingerprints. 1126 1127 Used internally by the Context to manage ownership of data sources, to 1128 locate resources for nodes in work graphs, and to manage serialization, 1129 deserialization, and checkpointing of the work graph. 1130 1131 The UID is a detail of the generic Operation that _should_ be independent 1132 of the Context details to allow the framework to manage when and where 1133 an operation is executed. 1134 1135 TODO: We probably don't want to allow Operations to single-handedly determine their 1136 own uniqueness, but they probably should participate in the determination with the Context. 1137 1138 TODO: Context implementations should be allowed to optimize handling of 1139 equivalent operations in different sessions or work graphs, but we do not 1140 yet guarantee that UIDs are globally unique! 1141 1142 To be refined... 1143 """ 1144 ... 1145 1146 1147class ConcreteInputDescription(InputDescription): 1148 """Simple composed InputDescription.""" 1149 1150 def __init__(self, 1151 input_signature: InputCollectionDescription, 1152 uid_helper: typing.Callable[['DataEdge'], str] 1153 ): 1154 self._input_signature_description = input_signature 1155 self._uid_helper = uid_helper 1156 1157 def signature(self) -> InputCollectionDescription: 1158 return self._input_signature_description 1159 1160 def make_uid(self, input: 'DataEdge') -> str: 1161 return self._uid_helper(input) 1162 1163 1164class OperationMeta(abc.ABCMeta): 1165 """Metaclass to manage the definition of Operation implementation classes. 1166 1167 Note that this metaclass can be superseded by `__init_subclass__()` when 1168 the minimum Python version is increased to Python 3.6+. 1169 """ 1170 1171 def __new__(meta, name, bases, class_dict): 1172 cls = super().__new__(meta, name, bases, class_dict) 1173 # Register subclasses, but not the base class. 1174 if issubclass(cls, OperationImplementation) and cls is not OperationImplementation: 1175 # TODO: Remove OperationDetailsBase layer and this extra check. 1176 # Note: we do not yet register the Operations built dynamically because we 1177 # don't have a clear definition of unique implementations yet. For instance, 1178 # a new OperationDetails class is defined for each call to gmx.join_arrays 1179 # TODO: Properly register and reuse Operations defined dynamically 1180 # through function_wrapper (currently encompassed by OperationDetailsBase subclasses) 1181 if name != 'OperationDetailsBase': 1182 if OperationDetailsBase not in bases: 1183 _register_operation(cls) 1184 return cls 1185 1186 1187class OperationDetailsBase(OperationImplementation, InputDescription, 1188 metaclass=OperationMeta): 1189 """Abstract base class for Operation details in this module's Python Context. 1190 1191 Provides necessary interface for use with gmxapi.operation.ResourceManager. 1192 Separates the details of an Operation from those of the ResourceManager in 1193 a given Context. 1194 1195 OperationDetails classes are almost stateless, serving mainly to compose implementation 1196 details. Instances (operation objects) provide the Context-dependent interfaces 1197 for a specific node in a work graph. 1198 1199 OperationDetails subclasses are created dynamically by function_wrapper and 1200 make_operation. 1201 1202 Developer note: when subclassing, note that the ResourceManager is responsible 1203 for managing Operation state. Do not add instance data members related to 1204 computation or output state. 1205 1206 TODO: determine what is acceptable instance data and/or initialization information. 1207 Note that currently the subclass in function_wrapper has _no_ initialization input, 1208 but does not yet handle input-dependent output specification or node fingerprinting. 1209 It seems likely that instance initialization will require some characterization of 1210 supplied input, but nothing else. Even that much is not necessary if the instance 1211 is completely stateless, but that would require additional parameters to the member 1212 functions. However, an instance should be tied to a specific ResourceManager and 1213 Context, so weak references to these would be reasonable. 1214 """ 1215 1216 @abc.abstractmethod 1217 def output_description(self) -> OutputCollectionDescription: 1218 """Mapping of available outputs and types for an existing Operation node.""" 1219 ... 1220 1221 @abc.abstractmethod 1222 def publishing_data_proxy(self, *, 1223 instance: SourceResource[typing.Any, _PublishingDataProxyType], 1224 client_id) -> _PublishingDataProxyType: 1225 """Factory for Operation output publishing resources. 1226 1227 Used internally when the operation is run with resources provided by instance.""" 1228 ... 1229 1230 @abc.abstractmethod 1231 def output_data_proxy(self, 1232 instance: SourceResource[_OutputDataProxyType, typing.Any] 1233 ) -> _OutputDataProxyType: 1234 """Get an object that can provide Futures for output data managed by instance.""" 1235 ... 1236 1237 @abc.abstractmethod 1238 def __call__(self, resources: _Resources): 1239 """Execute the operation with provided resources. 1240 1241 Resources are prepared in an execution context with aid of resource_director() 1242 1243 After the first call, output data has been published and is trivially 1244 available through the output_data_proxy() 1245 """ 1246 ... 1247 1248 @classmethod 1249 @abc.abstractmethod 1250 def resource_director(cls, *, input, output: _PublishingDataProxyType) -> _Resources[_PublishingDataProxyType]: 1251 """a Director factory that helps build the Session Resources for the function. 1252 1253 The Session launcher provides the director with all of the resources previously 1254 requested/negotiated/registered by the Operation. The director uses details of 1255 the operation to build the resources object required by the operation runner. 1256 1257 For the Python Context, the protocol is for the Context to call the 1258 resource_director instance method, passing input and output containers. 1259 (See, for example, gmxapi.operation.PyFunctionRunnerResources) 1260 """ 1261 ... 1262 1263 # TODO: Don't run the director. Just return the correct callable. 1264 @classmethod 1265 def operation_director(cls, *args, context: 'Context', label=None, **kwargs) -> AbstractOperation: 1266 """Dispatching Director for adding a work node. 1267 1268 A Director for input of a particular sort knows how to reconcile 1269 input with the requirements of the Operation and Context node builder. 1270 The Director (using a less flexible / more standard interface) 1271 builds the operation node using a node builder provided by the Context. 1272 1273 This is essentially the creation method, instead of __init__, but the 1274 object is created and owned by the framework, and the caller receives 1275 an OperationHandle instead of a reference to an instance of cls. 1276 1277 # TODO: We need a way to compose this functionality for arbitrary Contexts. 1278 # That likely requires traits on the Contexts, and registration of Context 1279 # implementations. It seems likely that an Operation will register Director 1280 # implementations on import, and dispatching will be moved to the Context 1281 # implementations, which can either find an appropriate OperationDirector 1282 # or raise a compatibility error. To avoid requirements on import order of 1283 # Operations and Context implementations, we can change this to a non-abstract 1284 # dispatching method, requiring registration in the global gmxapi.context 1285 # module, or get rid of this method and use something like pkg_resources 1286 # "entry point" groups for independent registration of Directors and Contexts, 1287 # each annotated with relevant traits. E.g.: 1288 # https://setuptools.readthedocs.io/en/latest/setuptools.html#dynamic-discovery-of-services-and-plugins 1289 """ 1290 if not isinstance(context, Context): 1291 raise exceptions.UsageError('Context instance needed for dispatch.') 1292 # TODO: use Context characteristics rather than isinstance checks. 1293 if isinstance(context, ModuleContext): 1294 construct = OperationDirector(*args, operation_details=cls, context=context, label=label, **kwargs) 1295 return construct() 1296 elif isinstance(context, SubgraphContext): 1297 construct = OperationDirector(*args, operation_details=cls, context=context, label=label, **kwargs) 1298 return construct() 1299 else: 1300 raise exceptions.ApiError('Cannot dispatch operation_director for context {}'.format(context)) 1301 1302 1303# TODO: Implement observer pattern for edge->node data flow. 1304# Step 0: implement subject interface subscribe() 1305# Step 1: implement subject interface get_state() 1306# Step 2: implement observer interface update() 1307# Step 3: implement subject interface notify() 1308# Step 4: implement observer hook to support substantial change in source that 1309# invalidates downstream fingerprinting, such as a new subgraph iteration. 1310# class Observer(object): 1311# """Abstract base class for data observers.""" 1312# def rebind(self, edge: DataEdge): 1313# """Recreate the Operation at the consuming end of the DataEdge.""" 1314 1315 1316class Future(_Future): 1317 """gmxapi data handle. 1318 1319 Future is currently more than a Future right now. (should be corrected / clarified.) 1320 Future is also a facade to other features of the data provider. 1321 1322 Future objects are the most atomic interface between Contexts. User scripts 1323 may hold Futures from which they extract data with result(). Operation output 1324 used as input for another Operation can be decomposed such that the Operation 1325 factory has only Future objects in its input. 1326 1327 TODO: ``subscribe`` method allows consumers to bind as Observers. 1328 1329 TODO: extract the abstract class for input inspection? 1330 Currently abstraction is handled through SourceResource subclassing. 1331 1332 Attributes: 1333 description (ResultDescription): Describes the result to be obtained from this Future. 1334 1335 """ 1336 1337 def __init__(self, resource_manager: SourceResource, name: str, description: ResultDescription): 1338 self.name = name 1339 if not isinstance(description, ResultDescription): 1340 raise exceptions.ValueError('Need description of requested data.') 1341 self.description = description # type: ResultDescription 1342 self.resource_manager = resource_manager 1343 1344 # Deprecated. We should not "reset" futures, but reconstitute them, but we 1345 # need to move the data model to a subscription-based system so that we can 1346 # make Futures properly immutable and issue new ones across subgraph iterations. 1347 self._number_of_resets = 0 1348 1349 def __eq__(self, other): 1350 # This function is defined because we have defined __hash__(). 1351 # Before customizing __eq__(), recall that Python objects that compare 1352 # equal should hash to the same value. 1353 # Please keep the two functions semantically correct. 1354 return object.__eq__(self, other) 1355 1356 def __hash__(self): 1357 # We cannot properly determine equivalency beyond the scope of a ResourceManager instance 1358 # without more developed data flow fingerprinting. 1359 return hash((id(self.resource_manager), self.name, self.description, self._number_of_resets)) 1360 1361 def __str__(self): 1362 return '<Future: name={}, description={}>'.format(self.name, self.description) 1363 1364 def result(self) -> ResultTypeVar: 1365 """Fetch data to the caller's Context. 1366 1367 Returns an object of the concrete type specified according to 1368 the operation that produces this Result. 1369 1370 Ensemble data are returned as a list. Scalar results or results from single 1371 member ensembles are returned as scalars. 1372 """ 1373 self.resource_manager.update_output() 1374 # Return ownership of concrete data 1375 handle = self.resource_manager.get(self.name) 1376 1377 # For intuitive use in non-ensemble cases, we represent data as bare scalars 1378 # when possible. It is easier for users to cast scalars to lists of length 1 1379 # than to introspect their own code to determine if a list of length 1 is 1380 # part of an ensemble or not. The data model will become clearer as we 1381 # develop more robust handling of multidimensional data and data flow topologies. 1382 # In the future, 1383 # we may distinguish between data of shape () and shape (1,), but we will need 1384 # to be careful with semantics. We are already starting to adopt a rule-of-thumb 1385 # that data objects assume the minimum dimensionality necessary unless told 1386 # otherwise, and we could make that a hard rule if it doesn't make other things 1387 # too difficult. 1388 if self.description.width == 1: 1389 return handle.data(member=0) 1390 else: 1391 return handle.data() 1392 1393 def _reset(self): 1394 """Mark the Future "not done" to allow reexecution. 1395 1396 Invalidates cached results, resets "done" markers in data sources, and 1397 triggers _reset recursively. 1398 1399 Note: this is a hack that is inconsistent with the plan of unique mappings 1400 of inputs to outputs, but allows a quick prototype for looping operations. 1401 """ 1402 self._number_of_resets += 1 1403 self.resource_manager.reset() 1404 1405 @property 1406 def dtype(self): 1407 return self.description.dtype 1408 1409 def __getitem__(self, item): 1410 """Get a more limited view on the Future.""" 1411 description = ResultDescription(dtype=self.dtype, width=self.description.width) 1412 # TODO: Use explicit typing when we have more thorough typing. 1413 description._dtype = None 1414 if self.description.width == 1: 1415 proxy = ProxyResourceManager(self, 1416 width=description.width, 1417 function=lambda value, key=item: value[key]) 1418 else: 1419 proxy = ProxyResourceManager(self, 1420 width=description.width, 1421 function=lambda value, key=item: 1422 [subscriptable[key] for subscriptable in value]) 1423 return proxy.future(self.name, description=description) 1424 1425 1426class OutputDataDescriptor(ProxyDataDescriptor): 1427 """Read-only data descriptor for proxied access to output data. 1428 1429 Knows how to get a Future from the resource manager. 1430 """ 1431 1432 # TODO: Reconcile the internal implementation details with the visibility and 1433 # usages of this class. 1434 1435 def __get__(self, proxy: DataProxyBase, owner): 1436 if proxy is None: 1437 # Access through class attribute of owner class 1438 return self 1439 result_description = ResultDescription(dtype=self._dtype, width=proxy.ensemble_width) 1440 1441 return proxy._resource_instance.future(name=self._name, description=result_description) 1442 1443 1444class MutableResourceDescriptor(ProxyDataDescriptor): 1445 """Accessor for rich binding interfaces. 1446 1447 Allows operations to access resources beyond the scope of the current 1448 resource manager. Used by operations whose interactions are more complicated 1449 than standard typed data flow at the scope of the current Context. 1450 1451 Instead of a Future interface, the returned object is a MutableResource with 1452 which a subscriber can collaborate with lower-level protocols. 1453 """ 1454 1455 def __get__(self, proxy: DataProxyBase, owner) -> typing.Union[MutableResource, 'MutableResourceDescriptor']: 1456 if proxy is None: 1457 # Access through class attribute of owner class. We don't have a 1458 # specified use case for that, so allow inspection of the data 1459 # descriptor instance, itself. 1460 return self 1461 # TODO: implement. 1462 # The protocol for MD extension plugins requires that the simulation operation 1463 # subscribe to the plugin. Then the Context allows the plugin to access the 1464 # MdRunner interface as the simulation is launched. 1465 # The protocol for modify_input and for mdrun to consume the TPR payload 1466 # of read_tpr or modify_input should allow us to use the gmxapi 0.0.7 1467 # WorkSpec to configure and launch a simulation, which we can do by feeding 1468 # forward and building a fused operation at the mdrun node. The information 1469 # fed forward can just be references to the inputs and parameters of the 1470 # earlier operations, with annotations so that we know the intended behavior. 1471 1472 1473def define_output_data_proxy(output_description: OutputCollectionDescription) -> typing.Type[DataProxyBase]: 1474 descriptors = {name: OutputDataDescriptor(name, description) for name, description in output_description.items()} 1475 1476 class OutputDataProxy(DataProxyBase, descriptors=descriptors): 1477 """Handler for read access to the `output` member of an operation handle. 1478 1479 Acts as a sort of ResultCollection. 1480 1481 A ResourceManager creates an OutputDataProxy instance at initialization to 1482 provide the ``output`` property of an operation handle. 1483 """ 1484 1485 # Note: the OutputDataProxy has an inherent ensemble shape in the context 1486 # in which it is used, but that is an instance characteristic, not part of this type definition. 1487 # TODO: (FR5) The current tool does not support topology changing operations. 1488 return OutputDataProxy 1489 1490 1491# Encapsulate the description of the input data flow. 1492PyFuncInput = collections.namedtuple('Input', ('args', 'kwargs', 'dependencies')) 1493 1494 1495class SinkTerminal(object): 1496 """Operation input end of a data edge. 1497 1498 In addition to the information in an InputCollectionDescription, includes 1499 topological information for the Operation node (ensemble width). 1500 1501 Collaborations: Required for creation of a DataEdge. Created with knowledge 1502 of a DataSourceCollection instance and a InputCollectionDescription. 1503 """ 1504 1505 # TODO: This clearly maps to a Builder pattern. 1506 # I think we want to get the sink terminal builder from a factory parameterized by InputCollectionDescription, 1507 # add data source collections, and then build the sink terminal for the data edge. 1508 def __init__(self, input_collection_description: InputCollectionDescription): 1509 """Define an appropriate data sink for a new operation node. 1510 1511 Resolve data sources and input description to determine connectability, 1512 topology, and any necessary implicit data transformations. 1513 1514 :param input_collection_description: Available inputs for Operation 1515 :return: Fully formed description of the Sink terminal for a data edge to be created. 1516 1517 Collaborations: Execution Context implementation. 1518 """ 1519 self.ensemble_width = 1 1520 self.inputs = input_collection_description 1521 1522 def __str__(self): 1523 return '<SinkTerminal: ensemble_width={}>'.format(self.ensemble_width) 1524 1525 def update_width(self, width: int): 1526 if not isinstance(width, int): 1527 try: 1528 width = int(width) 1529 except TypeError: 1530 raise exceptions.TypeError('Need an integer width > 0.') 1531 if width < 1: 1532 raise exceptions.ValueError('Nonsensical ensemble width: {}'.format(int(width))) 1533 if self.ensemble_width != 1: 1534 if width != self.ensemble_width: 1535 raise exceptions.ValueError( 1536 'Cannot change ensemble width {} to width {}.'.format(self.ensemble_width, width)) 1537 self.ensemble_width = width 1538 1539 def update(self, data_source_collection: DataSourceCollection): 1540 """Update the SinkTerminal with the proposed data provider.""" 1541 for name, sink_dtype in self.inputs.items(): 1542 if name not in data_source_collection: 1543 # If/when we accept data from multiple sources, we'll need some additional sanity checking. 1544 if not hasattr(self.inputs.signature.parameters[name], 'default'): 1545 raise exceptions.UsageError('No data or default for {}'.format(name)) 1546 else: 1547 # With a single data source, we need data to be in the source or have a default 1548 assert name in data_source_collection 1549 assert issubclass(sink_dtype, valid_result_types) 1550 source = data_source_collection[name] 1551 logger.debug('Updating Sink for source {}: {}.'.format(name, source)) 1552 if isinstance(source, sink_dtype): 1553 logger.debug('Source matches sink. No update necessary.') 1554 continue 1555 else: 1556 if isinstance(source, collections.abc.Iterable) and not isinstance(source, ( 1557 str, bytes, collections.abc.Mapping)): 1558 assert isinstance(source, datamodel.NDArray) 1559 if sink_dtype != datamodel.NDArray: 1560 # Source is NDArray, but sink is not. Implicitly scatter. 1561 self.update_width(len(source)) 1562 continue 1563 if hasattr(source, 'description'): 1564 source_description = typing.cast(ResultDescription, source.description) 1565 source_dtype = source_description.dtype 1566 assert isinstance(sink_dtype, type) 1567 # TODO: Handle typing of Future slices when we have a better data model. 1568 if source_dtype is not None: 1569 assert isinstance(source_dtype, type) 1570 if not issubclass(source_dtype, sink_dtype): 1571 raise exceptions.TypeError('Expected {} but got {}.'.format(sink_dtype, source_dtype)) 1572 source_width = source.description.width 1573 self.update_width(source_width) 1574 1575 1576class DataEdge(object): 1577 """State and description of a data flow edge. 1578 1579 A DataEdge connects a data source collection to a data sink. A sink is an 1580 input or collection of inputs of an operation (or fused operation). An operation's 1581 inputs may be fed from multiple data source collections, but an operation 1582 cannot be fully instantiated until all of its inputs are bound, so the DataEdge 1583 is instantiated at the same time the operation is instantiated because the 1584 required topology of a graph edge may be determined by the required topology 1585 of another graph edge. 1586 1587 A data edge has a well-defined topology only when it is terminated by both 1588 a source and sink. Creation requires that a source collection is compared to 1589 a sink description. 1590 1591 Calling code initiates edge creation by passing well-described data sources 1592 to an operation factory. The data sources may be annotated with explicit scatter 1593 or gather commands. 1594 1595 The resource manager for the new operation determines the 1596 required shape of the sink to handle all of the offered input. 1597 1598 Broadcasting 1599 and transformations of the data sources are then determined and the edge is 1600 established. 1601 1602 At that point, the fingerprint of the input data at each operation 1603 becomes available to the resource manager for the operation. The fingerprint 1604 has sufficient information for the resource manager of the operation to 1605 request and receive data through the execution context. 1606 1607 Instantiating operations and data edges implicitly involves collaboration with 1608 a Context instance. The state of a given Context or the availability of a 1609 default Context through a module function may affect the ability to instantiate 1610 an operation or edge. In other words, behavior may be different for connections 1611 being made in the scripting environment versus the running Session, and implementation 1612 details can determine whether or not new operations or data flow can occur in 1613 different code environments. 1614 """ 1615 1616 class ConstantResolver(object): 1617 def __init__(self, value): 1618 self.value = value 1619 1620 def __call__(self, member=None): 1621 return self.value 1622 1623 def __init__(self, source_collection: DataSourceCollection, sink_terminal: SinkTerminal): 1624 # Adapters are callables that transform a source and node ID to local data. 1625 # Every key in the sink has an adapter. 1626 self.adapters = {} 1627 self.source_collection = source_collection 1628 self.sink_terminal = sink_terminal 1629 for name in sink_terminal.inputs: 1630 if name not in source_collection: 1631 if hasattr(sink_terminal.inputs[name], 'default'): 1632 self.adapters[name] = self.ConstantResolver(sink_terminal.inputs[name]) 1633 else: 1634 # TODO: Initialize with multiple DataSourceCollections? 1635 raise exceptions.ValueError('No source or default for required input "{}".'.format(name)) 1636 else: 1637 source = source_collection[name] 1638 sink = sink_terminal.inputs[name] 1639 if isinstance(source, (str, bool, int, float, dict)): 1640 if issubclass(sink, (str, bool, int, float, dict)): 1641 self.adapters[name] = self.ConstantResolver(source) 1642 else: 1643 assert issubclass(sink, datamodel.NDArray) 1644 self.adapters[name] = self.ConstantResolver(datamodel.ndarray([source])) 1645 elif isinstance(source, datamodel.NDArray): 1646 if issubclass(sink, datamodel.NDArray): 1647 # TODO: shape checking 1648 # Implicit broadcast may not be what is intended 1649 self.adapters[name] = self.ConstantResolver(source) 1650 else: 1651 if source.shape[0] != sink_terminal.ensemble_width: 1652 raise exceptions.ValueError( 1653 'Implicit broadcast could not match array source to ensemble sink') 1654 else: 1655 self.adapters[name] = lambda member, source=source: source[member] 1656 elif hasattr(source, 'result'): 1657 # Handle data futures... 1658 # If the Future is part of an ensemble, result() will return a list. 1659 # Otherwise, it will return a single object. 1660 ensemble_width = source.description.width 1661 # TODO: subscribe to futures so results can be pushed. 1662 if ensemble_width == 1: 1663 self.adapters[name] = lambda member, source=source: source.result() 1664 else: 1665 self.adapters[name] = lambda member, source=source: source.result()[member] 1666 else: 1667 assert isinstance(source, EnsembleDataSource) 1668 self.adapters[name] = lambda member, source=source: source.node(member) 1669 1670 def __str__(self): 1671 return '<DataEdge: source_collection={}, sink_terminal={}>'.format(self.source_collection, self.sink_terminal) 1672 1673 def reset(self): 1674 self.source_collection.reset() 1675 1676 def resolve(self, key: str, member: int): 1677 return self.adapters[key](member=member) 1678 1679 def sink(self, node: int) -> dict: 1680 """Consume data for the specified sink terminal node. 1681 1682 Run-time utility delivers data from the bound data source(s) for the 1683 specified terminal that was configured when the edge was created. 1684 1685 Terminal node is identified by a member index number. 1686 1687 Returns: 1688 A Python dictionary of the provided inputs as local data (not Future). 1689 """ 1690 results = {} 1691 sink_ports = self.sink_terminal.inputs 1692 for key in sink_ports: 1693 results[key] = self.resolve(key, node) 1694 return results 1695 1696 1697class ResourceManager(SourceResource[_OutputDataProxyType, _PublishingDataProxyType]): 1698 """Provides data publication and subscription services. 1699 1700 Owns the data published by the operation implementation or served to consumers. 1701 Mediates read and write access to the managed data streams. 1702 1703 This ResourceManager implementation is defined in conjunction with a 1704 run-time definition of an Operation that wraps a Python callable (function). 1705 ResourceManager is instantiated with a reference to the callable. 1706 1707 When the Operation is run, the resource manager prepares resources for the wrapped 1708 function. Inputs provided to the Operation factory are provided to the 1709 function as keyword arguments. The wrapped function publishes its output 1710 through the (additional) ``output`` key word argument. This argument is 1711 a short-lived resource, prepared by the ResourceManager, with writable 1712 attributes named in the call to function_wrapper(). 1713 1714 After the Operation has run and the outputs published, the data managed 1715 by the ResourceManager is marked "done." 1716 1717 Protocols: 1718 1719 The data() method produces a read-only collection of outputs named for 1720 the Operation when the Operation's ``output`` attribute is accessed. 1721 1722 publishing_resources() can be called once during the ResourceManager lifetime 1723 to provide the ``output`` object for the wrapped function. (Used by update_output().) 1724 1725 update_output() brings the managed output data up-to-date with the input 1726 when the Operation results are needed. If the Operation has not run, an 1727 execution session is prepared with input and output arguments for the 1728 wrapped Python callable. Output is publishable only during this session. 1729 1730 TODO: This functionality should evolve to be a facet of Context implementations. 1731 There should be no more than one ResourceManager instance per work graph 1732 node in a Context. This will soon be at odds with letting the ResourceManager 1733 be owned by an operation instance handle. 1734 TODO: The publisher and data objects can be more strongly defined through 1735 interaction between the Context and clients. 1736 1737 Design notes: 1738 1739 The normative pattern for updating data is to execute a node in the work 1740 graph, passing Resources for an execution Session to an operation runner. 1741 The resources and runner are dependent on the implementation details of 1742 the operation and the execution context, so logical execution may look 1743 like the following. 1744 1745 resource_builder = ResourcesBuilder() 1746 runner_builder = RunnerBuilder() 1747 input_resource_director = input_resource_factory.director(input) 1748 output_resource_director = publishing_resource_factory.director(output) 1749 input_resource_director(resource_builder, runner_builder) 1750 output_resource_director(resource_builder, runner_builder) 1751 resources = resource_builder.build() 1752 runner = runner_builder.build() 1753 runner(resources) 1754 1755 Only the final line is intended to be literal. The preceding code, if it 1756 exists in entirety, may be spread across several code comments. 1757 1758 TODO: Data should be pushed, not pulled. 1759 Early implementations executed operation code and extracted results directly. 1760 While we need to be able to "wait for" results and alert the data provider that 1761 we are ready for input, we want to defer execution management and data flow to 1762 the framework. 1763 """ 1764 1765 @contextmanager 1766 def __publishing_context(self, ensemble_member=0) -> typing.Iterator[_PublishingDataProxyType]: 1767 """Get a context manager for resolving the data dependencies of this node. 1768 1769 The returned object is a Python context manager (used to open a `with` block) 1770 to define the scope in which the operation's output can be published. 1771 'output' type resources can be published exactly once, and only while the 1772 publishing context is active. (See operation.function_wrapper()) 1773 1774 Used internally to implement ResourceManager.publishing_resources() 1775 1776 Responsibilities of the context manager are to: 1777 * (TODO) Make sure dependencies are resolved. 1778 * Make sure outputs are marked 'done' when leaving the context. 1779 1780 """ 1781 1782 # TODO: 1783 # if self._data.done(): 1784 # raise exceptions.ProtocolError('Resources have already been published.') 1785 1786 # I don't think we want the OperationDetails to need to know about ensemble data, 1787 # (though the should probably be allowed to), so we may need a separate interface 1788 # for the resource manager with built-in scope-limiting to a single ensemble member. 1789 # Right now, one Operation handle owns one ResourceManager (which takes care of 1790 # the ensemble details), which owns one OperationDetails (which has no ensemble knowledge). 1791 # It is the responsibility of the calling code to make sure the PublishingDataProxy 1792 # gets used correctly. 1793 1794 # ref: https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager 1795 if self._done[ensemble_member]: 1796 raise exceptions.ProtocolError('Attempting to publish {}[{}] more than once.'.format(self.operation_id, ensemble_member)) 1797 1798 try: 1799 resource = self.__publishing_data_proxy(instance=weakref.proxy(self), 1800 client_id=ensemble_member) 1801 except Exception as e: 1802 logger.debug('Publishing context could not be created due to {}'.format(e)) 1803 raise e 1804 1805 yield resource 1806 # Note: The remaining lines are skipped if an exception occurs in the `with` block 1807 # for the contextmanager suite, which effectively raises at the line after 'yield'. 1808 logger.debug('Published output for {} member {}'.format(self.operation_id, ensemble_member)) 1809 self._done[ensemble_member] = True 1810 1811 def __init__(self, *, 1812 source: DataEdge, 1813 operation_id, 1814 output_description: OutputCollectionDescription, 1815 output_data_proxy: typing.Type[_OutputDataProxyType], 1816 publishing_data_proxy: typing.Type[_PublishingDataProxyType], 1817 resource_factory, 1818 runner_director, 1819 output_context: 'Context'): 1820 """Initialize a resource manager for the inputs and outputs of an operation. 1821 """ 1822 # Note: This implementation assumes there is one ResourceManager instance per data source, 1823 # so we only stash the inputs and dependency information for a single set of resources. 1824 # TODO: validate input_fingerprint as its interface becomes clear. 1825 self._input_edge = source 1826 self.ensemble_width = self._input_edge.sink_terminal.ensemble_width 1827 1828 # Node UID. 1829 self.operation_id = operation_id 1830 1831 if isinstance(output_context, Context): 1832 self._output_context = output_context 1833 else: 1834 message = 'Provide an instance of gmxapi.operation.Context for output_context' 1835 raise exceptions.UsageError(message) 1836 assert self._output_context is not None 1837 1838 self._output_data_proxy = output_data_proxy 1839 assert self._output_data_proxy is not None 1840 assert callable(self._output_data_proxy) 1841 1842 self._output_description = output_description 1843 assert self._output_description is not None 1844 1845 self.__publishing_data_proxy = publishing_data_proxy 1846 assert self.__publishing_data_proxy is not None 1847 assert callable(self.__publishing_data_proxy) 1848 1849 self._runner_director = runner_director 1850 assert self._runner_director is not None 1851 self._resource_factory = resource_factory 1852 assert self._resource_factory is not None 1853 1854 self._data = _make_datastore(output_description=self._output_description, 1855 ensemble_width=self.ensemble_width) 1856 1857 # We store a rereference to the publishing context manager implementation 1858 # in a data structure that can only produce one per Python interpreter 1859 # (using list.pop()). 1860 # TODO: reimplement as a data descriptor 1861 # so that PublishingDataProxy does not need a bound circular reference. 1862 self.__publishing_resources = [self.__publishing_context] 1863 1864 self._done = [False] * self.ensemble_width 1865 self.__operation_entrance_counter = 0 1866 1867 def width(self) -> int: 1868 return self.ensemble_width 1869 1870 def reset(self): 1871 self.__operation_entrance_counter = 0 1872 self._done = [False] * self.ensemble_width 1873 self.__publishing_resources = [self.__publishing_context] 1874 for data in self._data.values(): 1875 data.reset() 1876 self._input_edge.reset() 1877 assert self.__operation_entrance_counter == 0 1878 1879 def done(self, member=None): 1880 if member is None: 1881 return all(self._done) 1882 else: 1883 return self._done[member] 1884 1885 def set_result(self, name, value, member: int): 1886 if not isinstance(value, (str, bytes)): 1887 try: 1888 for item in value: 1889 # In this specification, it is antithetical to publish Futures. 1890 if hasattr(item, 'result'): 1891 raise exceptions.ApiError('Operation produced Future instead of real output.') 1892 except TypeError: 1893 # Ignore when `item` is not iterable. 1894 pass 1895 self._data[name].set(value=value, member=member) 1896 1897 def is_done(self, name): 1898 return self._data[name].done 1899 1900 def get(self, name: str): 1901 """ 1902 1903 Raises exceptions.ProtocolError if requested data is not local yet. 1904 Raises exceptions.ValueError if data is requested for an unknown name. 1905 """ 1906 if name not in self._data: 1907 raise exceptions.ValueError('Request for unknown data.') 1908 if not self.is_done(name): 1909 raise exceptions.ProtocolError('Data not ready.') 1910 assert isinstance(self._data[name], OutputData) 1911 return self._data[name] 1912 1913 # TODO: Normalize. This is the no-argument, no-return callable member of an 1914 # operation handle that dispatches to another Context (the operation implementation). 1915 # TODO: Allow update of a single ensemble member. As written, this always updates all ensemble members. That can 1916 # be the default behavior, but we don't want to require non-local updates in all cases. 1917 def update_output(self): 1918 """Bring the output of the bound operation up to date. 1919 1920 Execute the bound operation once if and only if it has not 1921 yet been run in the lifetime of this resource manager. 1922 1923 Used internally to implement Futures for the local operation 1924 associated with this resource manager. 1925 1926 Raises: 1927 exceptions.ApiError if operation runner fails to publish output. 1928 1929 TODO: More comprehensive error handling for operations that fail to execute. 1930 1931 TODO: We need a different implementation for an operation whose output 1932 is served by multiple resource managers. E.g. an operation whose output 1933 is available across the ensemble, but which should only be executed on 1934 a single ensemble member. 1935 """ 1936 # This code is not intended to be reentrant. We make a modest attempt to 1937 # catch unexpected reentrance, but this is not (yet) intended to be a thread-safe 1938 # resource manager implementation. 1939 # TODO: Handle checking just the ensemble members this resource manager is responsible for. 1940 # TODO: Replace with a managed observer pattern. Update once when input is available in the Context. 1941 if not self.done(): 1942 # Note: This check could also be encapsulated in a run_once decorator that 1943 # could even set a data descriptor to change behavior. 1944 self.__operation_entrance_counter += 1 1945 if self.__operation_entrance_counter > 1: 1946 raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.') 1947 if not self.done(): 1948 # Note! This is a detail of the ResourceManager in a SerialContext 1949 # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the 1950 # operation's library/implementation context. 1951 publishing_resources = self.publishing_resources() 1952 for i in range(self.ensemble_width): 1953 # TODO: rewrite the following expression as a call to a resource factory. 1954 # TODO: Consider whether the resource_factory behavior should be normalized 1955 # to always use `with` blocks to indicate the lifetime of a resource handle. 1956 # That implies that an operation handle can expire, but the operation handle 1957 # could be "yield"ed 1958 # from within the `with` block to keep the resource scope alive until the resulting 1959 # generator is exhausted. Not sure what that looks like or what the use case would be. 1960 with self.local_input(i) as input: 1961 # Note: Resources are marked "done" by the publishing system 1962 # before the following context manager finishes exiting. 1963 with publishing_resources(ensemble_member=i) as output: 1964 # self._runner(*input.args, output=output, **input.kwargs) 1965 #### 1966 # Here we can make _runner a thing that accepts session resources, and 1967 # is created by specializable builders. Separate out the expression of 1968 # inputs. 1969 # 1970 # resource_builder = OperationDetails.ResourcesBuilder(context) 1971 # runner_builder = OperationDetails.RunnerBuilder(context) 1972 # input_resource_director = self._input_resource_factory.director(input) 1973 # output_resource_director = self._publishing_resource_factory.director(output) 1974 # input_resource_director(resource_builder, runner_builder) 1975 # output_resource_director(resource_builder, runner_builder) 1976 # resources = resource_builder.build() 1977 # runner = runner_builder.build() 1978 # runner(resources) 1979 # 1980 # This resource factory signature might need to be inverted or broken up 1981 # into a builder for consistency. I.e. 1982 # option 1: Make the input and output resources with separate factories and add_resource on 1983 # the runner builder. 1984 # option 2: Pass resource_builder to input_director and then output_director. 1985 error_message = 'Got {} while executing {} for operation {}.' 1986 try: 1987 resources = self._resource_factory(input=input, output=output) 1988 except exceptions.TypeError as e: 1989 message = error_message.format(e, self._resource_factory, self.operation_id) 1990 raise exceptions.ApiError(message) from e 1991 1992 runner = self._runner_director(resources) 1993 try: 1994 runner() 1995 except Exception as e: 1996 message = error_message.format(e, runner, self.operation_id) 1997 raise exceptions.ApiError(message) from e 1998 if not self.done(): 1999 message = 'update_output implementation failed to update all outputs for {}.' 2000 message = message.format(self.operation_id) 2001 raise exceptions.ApiError(message) 2002 2003 def future(self, name: str, description: ResultDescription): 2004 """Retrieve a Future for a named output. 2005 2006 Provide a description of the expected result to check for compatibility or 2007 implicit topological conversion. 2008 2009 TODO: (FR5+) Normalize this part of the interface between operation definitions and 2010 resource managers. 2011 """ 2012 if not isinstance(name, str) or name not in self._data: 2013 raise exceptions.ValueError('"name" argument must name an output.') 2014 assert description is not None 2015 requested_dtype = description.dtype 2016 available_dtype = self._data[name]._description.dtype 2017 if requested_dtype != available_dtype: 2018 # TODO: framework to check for implicit conversions 2019 message = 'Requested Future of type {} is not compatible with available type {}.' 2020 message = message.format(requested_dtype, available_dtype) 2021 raise exceptions.ApiError(message) 2022 return Future(self, name, description) 2023 2024 def data(self) -> _OutputDataProxyType: 2025 """Get an adapter to the output resources to access results.""" 2026 return self._output_data_proxy(self) 2027 2028 @contextmanager 2029 def local_input(self, member: int = None): 2030 """In an API session, get a handle to fully resolved locally available input data. 2031 2032 Execution dependencies are resolved on creation of the context manager. Input data 2033 becomes available in the ``as`` object when entering the context manager, which 2034 becomes invalid after exiting the context manager. Resources allocated to hold the 2035 input data may be released when exiting the context manager. 2036 2037 It is left as an implementation detail whether the context manager is reusable and 2038 under what circumstances one may be obtained. 2039 """ 2040 # Localize data 2041 kwargs = self._input_edge.sink(node=member) 2042 assert 'input' not in kwargs 2043 2044 # Check that we have real data 2045 for key, value in kwargs.items(): 2046 assert not hasattr(value, 'result') 2047 assert not hasattr(value, 'run') 2048 value_list = [] 2049 if isinstance(value, list): 2050 value_list = value 2051 if isinstance(value, datamodel.NDArray): 2052 value_list = value._values 2053 if isinstance(value, collections.abc.Mapping): 2054 value_list = value.values() 2055 assert not isinstance(value_list, Future) 2056 assert not hasattr(value_list, 'result') 2057 assert not hasattr(value_list, 'run') 2058 for item in value_list: 2059 assert not hasattr(item, 'result') 2060 2061 input_pack = InputPack(kwargs=kwargs) 2062 2063 # Prepare input data structure 2064 # Note: we use 'yield' instead of 'return' for the protocol expected by 2065 # the @contextmanager decorator 2066 yield input_pack 2067 2068 def publishing_resources(self): 2069 """Get a context manager for resolving the data dependencies of this node. 2070 2071 Use the returned object as a Python context manager. 2072 'output' type resources can be published exactly once, and only while the 2073 publishing context is active. 2074 2075 Write access to publishing resources can be granted exactly once during the 2076 resource manager lifetime and conveys exclusive access. 2077 """ 2078 return self.__publishing_resources.pop() 2079 2080 2081class PyFunctionRunnerResources(collections.UserDict): 2082 """Runtime resources for Python functions. 2083 2084 Produced by a ResourceDirector for a particular Operation. 2085 """ 2086 2087 def output(self): 2088 if 'output' in self: 2089 return self['output'] 2090 else: 2091 return None 2092 2093 def input(self): 2094 return {key: value for key, value in self.items() if key != 'output'} 2095 2096 2097class PyFunctionRunner(abc.ABC): 2098 def __init__(self, *, function: typing.Callable, output_description: OutputCollectionDescription): 2099 assert callable(function) 2100 self.function = function 2101 self.output_description = output_description 2102 2103 @abc.abstractmethod 2104 def __call__(self, resources: PyFunctionRunnerResources): 2105 self.function(output=resources.output(), **resources.input()) 2106 2107 2108class CapturedOutputRunner(PyFunctionRunner): 2109 """Function runner that captures return value as output.data""" 2110 2111 def __call__(self, resources: PyFunctionRunnerResources): 2112 resources['output'].data = self.function(**resources.input()) 2113 2114 2115class OutputParameterRunner(PyFunctionRunner): 2116 """Function runner that uses output parameter to let function publish output.""" 2117 2118 def __call__(self, resources: PyFunctionRunnerResources): 2119 self.function(**resources) 2120 2121 2122def wrapped_function_runner(function, output_description: OutputCollectionDescription = None) -> PyFunctionRunner: 2123 """Get an adapter for a function to be wrapped. 2124 2125 If the function does not accept a publishing data proxy as an `output` 2126 key word argument, the returned object has a `capture_output` attribute that 2127 must be re-assigned by the calling code before calling the runner. `capture_output` 2128 must be assigned to be a callable that will receive the output of the wrapped 2129 function. 2130 2131 Returns: 2132 Callable with a signature `__call__(*args, **kwargs)` and no return value 2133 2134 Collaborations: 2135 OperationDetails.resource_director assigns the `capture_output` member of the returned object. 2136 """ 2137 assert callable(function) 2138 signature = inspect.signature(function) 2139 2140 # Implementation note: this function dispatches an implementation with the 2141 # logic below. A better factoring would be a "chain of responsibility" in 2142 # which the concrete Runners would be tried in sequence and determine internally 2143 # whether to create a runner, raise an error, or defer. 2144 2145 # Determine output details for proper dispatching. 2146 # First check for signature with output parameter. 2147 # TODO FR4: standardize typing 2148 if 'output' in signature.parameters: 2149 if not isinstance(output_description, OutputCollectionDescription): 2150 if not isinstance(output_description, collections.abc.Mapping): 2151 raise exceptions.UsageError( 2152 'Function passes output through call argument, but output is not described.') 2153 return OutputParameterRunner( 2154 function=function, 2155 output_description=OutputCollectionDescription(**output_description)) 2156 else: 2157 return OutputParameterRunner(function=function, 2158 output_description=output_description) 2159 # Next try output_description parameter or function return annotation. 2160 else: 2161 if isinstance(output_description, OutputCollectionDescription): 2162 return_type = output_description['data'].gmxapi_datatype 2163 elif output_description is not None: 2164 # output_description should be None for inferred output or 2165 # a singular mapping of the key 'data' to a gmxapi type. 2166 if not isinstance(output_description, collections.abc.Mapping) \ 2167 or set(output_description.keys()) != {'data'}: 2168 raise exceptions.ApiError( 2169 'invalid output description for wrapped function: {}'.format(output_description)) 2170 if signature.return_annotation != signature.empty: 2171 if signature.return_annotation != output_description['data']: 2172 raise exceptions.ApiError( 2173 'Wrapped function with return-value-capture provided with non-matching output description.') 2174 return_type = output_description['data'] 2175 else: 2176 # Use return type inferred from function signature. 2177 return_type = signature.return_annotation 2178 if return_type == signature.empty or return_type is None: 2179 raise exceptions.ApiError('No return annotation or output_description for {}'.format(function)) 2180 return CapturedOutputRunner(function=function, 2181 output_description=OutputCollectionDescription(data=return_type)) 2182 2183 2184# TODO: Refactor in terms of reference to a node in a Context. 2185# ResourceManager is an implementation detail of how the Context 2186# manages a node. 2187class OperationHandle(AbstractOperation[_OutputDataProxyType]): 2188 """Generic Operation handle for dynamically defined operations. 2189 2190 Define a gmxapi Operation for the functionality being wrapped by the enclosing code. 2191 2192 An Operation type definition encapsulates description of allowed inputs 2193 of an Operation. An Operation instance represents a node in a work graph 2194 with uniquely fingerprinted inputs and well-defined output. The implementation 2195 of the operation is a collaboration with the resource managers resolving 2196 data flow for output Futures, which may depend on the execution context. 2197 """ 2198 2199 def __init__(self, resource_manager: SourceResource[_OutputDataProxyType, typing.Any]): 2200 """Initialization defines the unique input requirements of a work graph node. 2201 2202 Initialization parameters map to the parameters of the wrapped function with 2203 addition(s) to support gmxapi data flow and deferred execution. 2204 2205 If provided, an ``input`` keyword argument is interpreted as a parameter pack 2206 of base input. Inputs also present as standalone keyword arguments override 2207 values in ``input``. 2208 2209 Inputs that are handles to gmxapi operations or outputs induce data flow 2210 dependencies that the framework promises to satisfy before the Operation 2211 executes and produces output. 2212 """ 2213 # TODO: When the resource manager can be kept alive by an enclosing or 2214 # module-level Context, convert to a weakref. 2215 self.__resource_manager = resource_manager 2216 # The unique identifier for the operation node allows the Context implementation 2217 # to manage the state of the handle. Reproducibility of node_uid is TBD, but 2218 # it must be unique in a Context where it references a different operation node. 2219 self.node_uid = None 2220 2221 @property 2222 def output(self) -> _OutputDataProxyType: 2223 # TODO: We can configure `output` as a data descriptor 2224 # instead of a property so that we can get more information 2225 # from the class attribute before creating an instance of OperationDetails.OutputDataProxy. 2226 # The C++ equivalence would probably be a templated free function for examining traits. 2227 return self.__resource_manager.data() 2228 2229 def run(self): 2230 """Make a single attempt to resolve data flow conditions. 2231 2232 This is a public method, but should not need to be called by users. Instead, 2233 just use the `output` data proxy for result handles, or force data flow to be 2234 resolved with the `result` methods on the result handles. 2235 2236 `run()` may be useful to try to trigger computation (such as for remotely 2237 dispatched work) without retrieving results locally right away. 2238 2239 `run()` is also useful internally as a facade to the Context implementation details 2240 that allow `result()` calls to ask for their data dependencies to be resolved. 2241 Typically, `run()` will cause results to be published to subscribing operations as 2242 they are calculated, so the `run()` hook allows execution dependency to be slightly 2243 decoupled from data dependency, as well as to allow some optimizations or to allow 2244 data flow to be resolved opportunistically. `result()` should not call `run()` 2245 directly, but should cause the resource manager / Context implementation to process 2246 the data flow graph. 2247 2248 In one conception, `run()` can have a return value that supports control flow 2249 by itself being either runnable or not. The idea would be to support 2250 fault tolerance, implementations that require multiple iterations / triggers 2251 to complete, or looping operations. 2252 """ 2253 # Note: `run()` is a synonym for `resolve` or `update` or whatever we choose 2254 # to generically describe the request to bring a node up-to-date: i.e. the 2255 # non-returning callable on the object produced by a director. 2256 self.__resource_manager.update_output() 2257 2258 2259class OperationPlaceholder(AbstractOperation): 2260 """Placeholder for Operation handle during subgraph definition.""" 2261 2262 def __init__(self, subgraph_resource_manager): 2263 ... 2264 2265 def run(self): 2266 raise exceptions.UsageError('This placeholder operation handle is not in an executable context.') 2267 2268 @property 2269 def output(self): 2270 """Allow subgraph components to be connected without instantiating actual operations.""" 2271 if not isinstance(current_context(), SubgraphContext): 2272 raise exceptions.UsageError('Invalid access to subgraph internals.') 2273 2274 2275_HandleType = typing.TypeVar('_HandleType', bound=gmx.abc.OperationReference) 2276 2277 2278class NodeBuilder(gmx.abc.NodeBuilder): 2279 """Add an operation node to be managed by a Context. 2280 2281 The NodeBuilder interface implies minimal internal logic, and instead 2282 specifies the set of information that must or may be provided to construct 2283 a node. 2284 """ 2285 2286 def __init__(self, 2287 context: 'Context', 2288 operation, 2289 label: typing.Optional[str] = None): 2290 """Initialize the base NodeBuilder for gmxapi.operation module Nodes. 2291 2292 TODO: 2293 Convert the *operation* argument to be the registrant in the Operation registry. 2294 Requires confirmation of conforming behavior for dynamically defined operations. 2295 2296 """ 2297 self.context = context 2298 self.label = label 2299 try: 2300 key = _make_registry_key(operation) 2301 except Exception as e: 2302 error = 'Could not create an operation registry key from {}'.format(operation) 2303 raise exceptions.ValueError(error) from e 2304 else: 2305 # TODO: sensibly handle dynamically defined operations. 2306 if key not in _operation_registry and not issubclass(operation, OperationDetailsBase): 2307 error = '{} must be initialized with a registered operation. Got {}.' 2308 raise exceptions.ValueError(error.format(__class__.__qualname__, operation)) 2309 self.sources = DataSourceCollection() 2310 self._input_description = None 2311 self._resource_factory = None 2312 self._runner_director = None 2313 self._handle = None 2314 self._output_factory = None 2315 2316 self._resource_manager = ResourceManager 2317 2318 def set_input_description(self, input_description: InputDescription): 2319 self._input_description = input_description 2320 2321 def set_resource_factory(self, factory): 2322 self._resource_factory = factory 2323 2324 def set_runner_director(self, factory): 2325 self._runner_director = factory 2326 2327 def set_handle(self, factory): 2328 self._handle = factory 2329 2330 def set_output_factory(self, factory: 'OutputFactory'): 2331 self._output_factory = factory 2332 2333 def set_resource_manager(self, implementation: typing.Type[ResourceManager]): 2334 """Allow overriding the default ResourceManager implementation. 2335 2336 This is a workaround until we figure out what parts of the ResourceManager 2337 could be composed, registered separately with the Context, or be customized 2338 through other dispatching. One likely part of the solution is for clients 2339 of the NodeBuilder to assert requirements of the Context. 2340 """ 2341 assert issubclass(implementation, ResourceManager) 2342 self._resource_manager = implementation 2343 2344 # TODO: Let the Context use the handle factory to produce a dynamically typed handle, 2345 # and figure out the right way to annotate the return value. 2346 def build(self): 2347 """Create node and return a handle of the appropriate type.""" 2348 2349 # Check for the ability to instantiate operations. 2350 missing_details = list() 2351 for builder_resource in ['input_description', 2352 'resource_factory', 2353 'runner_director', 2354 'handle', 2355 'output_factory']: 2356 detail = '_' + builder_resource 2357 if getattr(self, detail, None) is None: 2358 missing_details.append(builder_resource) 2359 if len(missing_details) > 0: 2360 raise exceptions.UsageError( 2361 'Missing details needed for operation node: {}'.format( 2362 ', '.join(missing_details) 2363 )) 2364 2365 assert hasattr(self._input_description, 'signature') 2366 input_sink = SinkTerminal(self._input_description.signature()) 2367 input_sink.update(self.sources) 2368 logger.debug('SinkTerminal configured: {}'.format(SinkTerminal)) 2369 edge = DataEdge(self.sources, input_sink) 2370 logger.debug('Created data edge {} with Sink {}'.format(edge, edge.sink_terminal)) 2371 # TODO: Fingerprinting: Each operation instance has unique output based on the unique input. 2372 # input_data_fingerprint = edge.fingerprint() 2373 2374 # Set up output proxy. 2375 assert hasattr(self._input_description, 'make_uid') 2376 uid = self._input_description.make_uid(edge) 2377 # TODO: ResourceManager should fetch the relevant factories from the Context 2378 # instead of getting an OperationDetails instance. 2379 output_data_proxy = self._output_factory.output_proxy() 2380 output_description = self._output_factory.output_description() 2381 publishing_data_proxy = self._output_factory.publishing_data_proxy() 2382 manager = self._resource_manager(output_context=self.context, 2383 source=edge, 2384 operation_id=uid, 2385 output_data_proxy=output_data_proxy, 2386 output_description=output_description, 2387 publishing_data_proxy=publishing_data_proxy, 2388 resource_factory=self._resource_factory, 2389 runner_director=self._runner_director) 2390 self.context.work_graph[uid] = manager 2391 # TODO: Replace with a call to Node.handle() 2392 handle = self._handle(self.context.work_graph[uid]) 2393 handle.node_uid = uid 2394 return handle 2395 2396 def add_input(self, name, source): 2397 # TODO: We can move some input checking here as the data model matures. 2398 self.sources[name] = source 2399 2400 2401class InputPack(object): 2402 """Input container for data sources provided to resource factories. 2403 2404 When gmxapi.operation Contexts provide run time inputs to operations, 2405 instances of this class are provided to the operation's registered 2406 Resource factory. 2407 2408 Attributes: 2409 kwargs (dict): collection of named data sources. 2410 """ 2411 2412 def __init__(self, kwargs: typing.Mapping[str, SourceTypeVar]): 2413 self.kwargs = kwargs 2414 2415 2416class Context(gmx.abc.Context): 2417 """API Context. 2418 2419 All gmxapi data and operations are owned by a Context instance. The Context 2420 manages the details of how work is run and how data is managed. 2421 2422 Context implementations are not required to inherit from gmxapi.context.Context, 2423 but this class definition serves to specify the current Context API. 2424 2425 If subclassing is used to implement new Contexts, be sure to initialize the 2426 base class when providing a new __init__ 2427 """ 2428 2429 def node(self, node_id) -> Node: 2430 if node_id in self.labels: 2431 return self.labels[node_id] 2432 elif node_id in self.work_graph: 2433 return self.work_graph[node_id] 2434 else: 2435 raise exceptions.ValueError('Could not find a node identified by {}'.format(node_id)) 2436 2437 def __init__(self): 2438 self.operations = dict() 2439 self.labels = dict() 2440 self.work_graph = collections.OrderedDict() 2441 2442 @abc.abstractmethod 2443 def node_builder(self, *, operation, 2444 label: typing.Optional[str] = None) -> NodeBuilder: 2445 """Get a builder for a new work graph node. 2446 2447 Nodes are elements of computational work, with resources and execution 2448 managed by the Context. The Context handles parallelism resources, data 2449 placement, work scheduling, and data flow / execution dependencies. 2450 2451 This method is used by Operation director code and helper functions to 2452 add work to the graph. 2453 2454 Arguments: 2455 operation: a registered gmxapi operation 2456 label: optional user-provided identifier to provide human-readable node locators. 2457 2458 """ 2459 ... 2460 # TODO: *node()* accessor. 2461 # @abc.abstractmethod 2462 # def node(self, node_identifier) -> AbstractOperation: 2463 # ... 2464 2465 2466class ModuleNodeBuilder(NodeBuilder): 2467 """Builder for work nodes in gmxapi.operation.ModuleContext.""" 2468 2469 2470class ModuleContext(Context): 2471 """Context implementation for the gmxapi.operation module. 2472 2473 """ 2474 __version__ = 0 2475 2476 def node_builder(self, operation, label=None) -> NodeBuilder: 2477 """Get a builder for a new work node to add an operation in this context.""" 2478 if label is not None: 2479 if label in self.labels: 2480 raise exceptions.ValueError('Label {} is already in use.'.format(label)) 2481 else: 2482 # The builder should update the labeled node when it is done. 2483 self.labels[label] = None 2484 2485 return ModuleNodeBuilder(context=weakref.proxy(self), operation=operation, label=label) 2486 2487 2488# Context stack. 2489__current_context = [ModuleContext()] 2490 2491 2492def current_context() -> Context: 2493 """Get a reference to the currently active Context. 2494 2495 The imported gmxapi.context module maintains some state for the convenience 2496 of the scripting environment. Internally, all gmxapi activity occurs under 2497 the management of an API Context, explicitly or implicitly. Some actions or 2498 code constructs will generate separate contexts or sub-contexts. This utility 2499 command retrieves a reference to the currently active Context. 2500 """ 2501 return __current_context[-1] 2502 2503 2504def push_context(context) -> Context: 2505 """Enter a sub-context by pushing a context to the global context stack. 2506 """ 2507 __current_context.append(context) 2508 return current_context() 2509 2510 2511def pop_context() -> Context: 2512 """Exit the current Context by popping it from the stack.""" 2513 return __current_context.pop() 2514 2515 2516class OutputFactory(object): 2517 """Encapsulate the details of Operation output implementation in the gmxapi.operation Context. 2518 2519 Currently, OutputFactory objects are containers that compose functionality 2520 with which to implement the required internal interface. 2521 """ 2522 2523 def __init__(self, *, 2524 output_proxy: typing.Callable[[SourceResource], _OutputDataProxyType], 2525 output_description: OutputCollectionDescription, 2526 publishing_data_proxy: typing.Callable[[SourceResource, ClientID], _PublishingDataProxyType]): 2527 """Package the output details for an operation. 2528 2529 Arguments: 2530 output_proxy: factory to produce the *output* facet of an operation instance (node) 2531 output_description: fully formed output description 2532 publishing_data_proxy: factory to produce the run time output publishing resources 2533 2534 """ 2535 if not callable(output_proxy): 2536 raise exceptions.ValueError('output_proxy argument must be a callable.') 2537 if not callable(publishing_data_proxy): 2538 raise exceptions.ValueError('publishing_data_proxy argument must be a callable.') 2539 if not isinstance(output_description, OutputCollectionDescription): 2540 raise exceptions.ValueError('output_description must be an instance of ' 2541 'gmxapi.operation.OutputCollectionDescription') 2542 self._output_proxy = output_proxy 2543 self._output_description = output_description 2544 self._publishing_data_proxy = publishing_data_proxy 2545 2546 def output_proxy(self) -> typing.Callable[[_OutputDataProxyType, SourceResource], _OutputDataProxyType]: 2547 return self._output_proxy 2548 2549 def output_description(self) -> OutputCollectionDescription: 2550 return self._output_description 2551 2552 def publishing_data_proxy(self) -> typing.Callable[[SourceResource, ClientID], 2553 _PublishingDataProxyType]: 2554 return self._publishing_data_proxy 2555 2556 2557# TODO: Refactor in terms of gmx.abc.OperationDirector[_Op, gmx.operation.Context] 2558# Normalizing this OperationDirector may require other updates to the function_wrapper facilities. 2559class OperationDirector(object): 2560 """Direct the construction of an operation node in the gmxapi.operation module Context. 2561 2562 Collaboration: used by OperationDetails.operation_director, which 2563 will likely dispatch to different implementations depending on 2564 requirements of work or context. 2565 """ 2566 2567 def __init__(self, 2568 *args, 2569 operation_details: typing.Type[OperationDetailsBase], 2570 context: Context, 2571 label=None, 2572 **kwargs): 2573 self.operation_details = operation_details 2574 self.context = weakref.proxy(context) 2575 self.args = args 2576 self.kwargs = kwargs 2577 self.label = label 2578 2579 def __call__(self) -> AbstractOperation: 2580 builder = self.context.node_builder(operation=self.operation_details, label=self.label) 2581 2582 builder.set_resource_factory(self.operation_details.resource_director) 2583 builder.set_input_description(self.operation_details) 2584 builder.set_handle(OperationHandle) 2585 2586 operation_details = self.operation_details() 2587 node_input_factory = operation_details.signature().bind 2588 data_source_collection = node_input_factory(*self.args, **self.kwargs) 2589 for name, source in data_source_collection.items(): 2590 builder.add_input(name, source) 2591 2592 def runner_director(resources): 2593 def runner(): 2594 operation_details(resources) 2595 return runner 2596 2597 builder.set_runner_director(runner_director) 2598 # Note that the call-backs held by OutputFactory cannot be annotated with 2599 # key word arguments under PEP 484, producing a weak warning in some cases. 2600 # We can consider more in the future how to balance call-back simplicity, 2601 # type annotation, and key word explicitness in helper functions like these. 2602 output_factory = OutputFactory(output_description=operation_details.output_description(), 2603 output_proxy=operation_details.output_data_proxy, 2604 publishing_data_proxy=operation_details.publishing_data_proxy) 2605 builder.set_output_factory(output_factory) 2606 2607 handle = builder.build() 2608 return handle 2609 2610 2611def _make_datastore(output_description: OutputCollectionDescription, ensemble_width: int): 2612 """Create the data store for an operation with the described output. 2613 2614 Create a container to hold the resources for an operation node. 2615 Used internally by the resource manager when setting up the node. 2616 Evolution of the C++ framework for creating the Operation SessionResources 2617 object will inform the future of this and the resource_director method, but 2618 this data store is how the Context manages output data sources for resources 2619 that it manages. 2620 """ 2621 2622 datastore = collections.OrderedDict() 2623 for name, dtype in output_description.items(): 2624 assert isinstance(dtype, type) 2625 result_description = ResultDescription(dtype=dtype, width=ensemble_width) 2626 datastore[name] = OutputData(name=name, description=result_description) 2627 return datastore 2628 2629 2630# TODO: For outputs, distinguish between "results" and "events". 2631# Both are published to the resource manager in the same way, but the relationship 2632# with subscribers is potentially different. 2633def function_wrapper(output: dict = None): 2634 # Suppress warnings in the example code. 2635 # noinspection PyUnresolvedReferences 2636 """Generate a decorator for wrapped functions with signature manipulation. 2637 2638 New function accepts the same arguments, with additional arguments required by 2639 the API. 2640 2641 The new function returns an object with an ``output`` attribute containing the named outputs. 2642 2643 Example: 2644 2645 >>> @function_wrapper(output={'spam': str, 'foo': str}) 2646 ... def myfunc(parameter: str = None, output=None): 2647 ... output.spam = parameter 2648 ... output.foo = parameter + ' ' + parameter 2649 ... 2650 >>> operation1 = myfunc(parameter='spam spam') 2651 >>> assert operation1.output.spam.result() == 'spam spam' 2652 >>> assert operation1.output.foo.result() == 'spam spam spam spam' 2653 2654 Arguments: 2655 output (dict): output names and types 2656 2657 If ``output`` is provided to the wrapper, a data structure will be passed to 2658 the wrapped functions with the named attributes so that the function can easily 2659 publish multiple named results. Otherwise, the ``output`` of the generated operation 2660 will just capture the return value of the wrapped function. 2661 2662 .. todo:: gmxapi typing stub file(s). 2663 The way this wrapper uses parameter annotations is not completely 2664 compatible with static type checking (PEP 484). If we decide to 2665 keep the convenience functionality by which operation details are 2666 inferred from parameter annotations, we should provide a separate 2667 stub file (.pyi) to support static type checking of the API. 2668 """ 2669 2670 if output is not None and not isinstance(output, collections.abc.Mapping): 2671 raise exceptions.TypeError('If provided, `output` argument must be a mapping of data names to types.') 2672 2673 # TODO: (FR5+) gmxapi operations need to allow a context-dependent way to generate an implementation with input. 2674 # This function wrapper reproduces the wrapped function's kwargs, but does not allow chaining a 2675 # dynamic `input` kwarg and does not dispatch according to a `context` kwarg. We should allow 2676 # a default implementation and registration of alternate implementations. We don't have to do that 2677 # with functools.singledispatch, but we could, if we add yet another layer to generate a wrapper 2678 # that takes the context as the first argument. (`singledispatch` inspects the first argument rather 2679 # that a named argument) 2680 2681 # Implementation note: The closure of the current function is used to 2682 # dynamically define several classes that support the operation to be 2683 # created by the returned decorator. 2684 2685 def decorator(function) -> typing.Callable: 2686 # Explicitly capture `function` and `output` references. 2687 provided_output_map = output 2688 2689 # Note: Allow operations to be defined entirely in template headers to facilitate 2690 # compile-time optimization of fused operations. Consider what distinction, if any, 2691 # exists between a fused operation and a more basic operation. Probably it amounts 2692 # to aspects related to interaction with the Context that get combined in a fused 2693 # operation, such as the resource director, builder, etc. 2694 class OperationDetails(OperationDetailsBase): 2695 # Warning: function.__qualname__ is not rigorous since function may be in a local scope. 2696 # TODO: Improve base identifier. 2697 # Suggest registering directly in the Context instead of in this local class definition. 2698 __basename = '.'.join((str(function.__module__), function.__qualname__)) 2699 __last_uid = 0 2700 _input_signature_description = InputCollectionDescription.from_function(function) 2701 # TODO: Separate the class and instance logic for the runner. 2702 # Logically, the runner is a detail of a context-specific implementation class, 2703 # though the output is not generally fully knowable until an instance is initialized 2704 # for a certain input fingerprint. 2705 # Note: We are almost at a point where this class can be subsumed into two 2706 # possible return types for wrapped_function_runner, acting as an operation helper. 2707 _runner = wrapped_function_runner(function, provided_output_map) 2708 _output_description = _runner.output_description 2709 _output_data_proxy_type = define_output_data_proxy(_output_description) 2710 _publishing_data_proxy_type = define_publishing_data_proxy(_output_description) 2711 _SourceResource = SourceResource[_output_data_proxy_type, _publishing_data_proxy_type] 2712 2713 @classmethod 2714 def name(cls) -> str: 2715 return cls.__basename.split('.')[-1] 2716 2717 @classmethod 2718 def namespace(cls) -> str: 2719 return cls.__basename.rstrip('.' + cls.name()) 2720 2721 @classmethod 2722 def director(cls, context: _Context): 2723 return cls.operation_director 2724 2725 @classmethod 2726 def signature(cls) -> InputCollectionDescription: 2727 """Mapping of named inputs and input type. 2728 2729 Used to determine valid inputs before an Operation node is created. 2730 2731 Overrides OperationDetailsBase.signature() to provide an 2732 implementation for the bound operation. 2733 """ 2734 return cls._input_signature_description 2735 2736 def output_description(self) -> OutputCollectionDescription: 2737 """Mapping of available outputs and types for an existing Operation node. 2738 2739 Overrides OperationDetailsBase.output_description() to provide an 2740 implementation for the bound operation. 2741 """ 2742 return self._output_description 2743 2744 def publishing_data_proxy(self, *, 2745 instance: _SourceResource, 2746 client_id: int 2747 ) -> _publishing_data_proxy_type: 2748 """Factory for Operation output publishing resources. 2749 2750 Used internally when the operation is run with resources provided by instance. 2751 2752 Overrides OperationDetailsBase.publishing_data_proxy() to provide an 2753 implementation for the bound operation. 2754 """ 2755 assert isinstance(instance, ResourceManager) 2756 return self._publishing_data_proxy_type(instance=instance, client_id=client_id) 2757 2758 def output_data_proxy(self, instance: _SourceResource) -> _output_data_proxy_type: 2759 assert isinstance(instance, ResourceManager) 2760 return self._output_data_proxy_type(instance=instance) 2761 2762 def __call__(self, resources: PyFunctionRunnerResources): 2763 """Execute the operation with provided resources. 2764 2765 Resources are prepared in an execution context with aid of resource_director() 2766 2767 After the first call, output data has been published and is trivially 2768 available through the output_data_proxy() 2769 2770 Overrides OperationDetailsBase.__call__(). 2771 """ 2772 self._runner(resources) 2773 2774 @classmethod 2775 def make_uid(cls, input): 2776 """The unique identity of an operation node tags the output with respect to the input. 2777 2778 Combines information on the Operation details and the input to uniquely 2779 identify the Operation node. 2780 2781 Arguments: 2782 input : A (collection of) data source(s) that can provide Fingerprints. 2783 2784 Used internally by the Context to manage ownership of data sources, to 2785 locate resources for nodes in work graphs, and to manage serialization, 2786 deserialization, and checkpointing of the work graph. 2787 2788 The UID is a detail of the generic Operation that _should_ be independent 2789 of the Context details to allow the framework to manage when and where 2790 an operation is executed. 2791 2792 Design notes on further refinement: 2793 TODO: Operations should not single-handedly determine their own uniqueness 2794 (but they should participate in the determination with the Context). 2795 2796 Context implementations should be allowed to optimize handling of 2797 equivalent operations in different sessions or work graphs, but we do not 2798 yet TODO: guarantee that UIDs are globally unique! 2799 2800 The UID should uniquely indicate an operation node based on that node's input. 2801 We need input fingerprinting to identify equivalent nodes in a work graph 2802 or distinguish nodes across work graphs. 2803 2804 """ 2805 uid = str(cls.__basename) + str(cls.__last_uid) 2806 cls.__last_uid += 1 2807 return uid 2808 2809 @classmethod 2810 def resource_director(cls, *, input=None, 2811 output: _publishing_data_proxy_type = None) -> PyFunctionRunnerResources: 2812 """a Director factory that helps build the Session Resources for the function. 2813 2814 The Session launcher provides the director with all of the resources previously 2815 requested/negotiated/registered by the Operation. The director uses details of 2816 the operation to build the resources object required by the operation runner. 2817 2818 For the Python Context, the protocol is for the Context to call the 2819 resource_director instance method, passing input and output containers. 2820 2821 Raises: 2822 exceptions.TypeError if provided resource type does not match input signature. 2823 """ 2824 resources = PyFunctionRunnerResources() 2825 resources.update(input.kwargs) 2826 resources.update({'output': output}) 2827 2828 # TODO: Remove this hack when we can better handle Futures of Containers and Future slicing. 2829 for name in resources: 2830 if isinstance(resources[name], (list, tuple)): 2831 resources[name] = datamodel.ndarray(resources[name]) 2832 2833 # Check data compatibility 2834 for name, value in resources.items(): 2835 if name != 'output': 2836 expected = cls.signature()[name] 2837 got = type(value) 2838 if got != expected: 2839 raise exceptions.TypeError( 2840 'Expected {} but got {} for {} resource {}.'.format(expected, 2841 got, 2842 cls.__basename, 2843 name)) 2844 return resources 2845 2846 # TODO: (FR4) Update annotations with gmxapi data types. E.g. return -> Future. 2847 @functools.wraps(function) 2848 def helper(*args, context=None, **kwargs): 2849 # Description of the Operation input (and output) occurs in the 2850 # decorator closure. By the time this factory is (dynamically) defined, 2851 # the OperationDetails and ResourceManager are well defined, but not 2852 # yet instantiated. 2853 # Inspection of the offered input occurs when this factory is called, 2854 # and OperationDetails, ResourceManager, and Operation are instantiated. 2855 2856 # This operation factory is specialized for the default package Context. 2857 if context is None: 2858 context = current_context() 2859 else: 2860 raise exceptions.ApiError('Non-default context handling not implemented.') 2861 2862 # This calls a dispatching function that may not be able to reconcile the input 2863 # and Context capabilities. This is the place to handle various exceptions for 2864 # whatever reasons this reconciliation cannot occur. 2865 handle = OperationDetails.operation_director(*args, context=context, label=None, **kwargs) 2866 2867 # TODO: NOW: The input fingerprint describes the provided input 2868 # as (a) ensemble input, (b) static, (c) future. By the time the 2869 # operation is instantiated, the topology of the node is known. 2870 # When compared to the InputCollectionDescription, the data compatibility 2871 # can be determined. 2872 2873 return handle 2874 2875 # to do: The factory itself needs to be able to register a factory with 2876 # the context that will be responsible for the Operation handle. 2877 # The factories need to be able to serve as dispatchers for themselves, 2878 # since an operation in one context may need to be reconstituted in a 2879 # different context. 2880 # The dispatching factory produces a director for a Context, 2881 # which will register a factory with the operation in that context. 2882 2883 # The factory function has a DirectorFactory. Director instances talk to a NodeBuilder for a Context to 2884 # get handles to new operation nodes managed by the context. Part of that process includes registering 2885 # a DirectorFactory with the Context. 2886 return helper 2887 2888 return decorator 2889 2890 2891class GraphVariableDescriptor(object): 2892 def __init__(self, name: str = None, dtype=None, default=None): 2893 self.name = name 2894 self.dtype = dtype 2895 self.default = default 2896 self.state = None 2897 2898 @property 2899 def internal_name(self): 2900 try: 2901 return '_' + self.name 2902 except TypeError: 2903 return None 2904 2905 def __get__(self, instance, owner): 2906 if instance is None: 2907 # Access is through the class attribute of the owning class. 2908 # Allows the descriptor itself to be inspected or reconfigured after 2909 # class definition. 2910 # TODO: There is probably some usage checking that can be performed here. 2911 return self 2912 try: 2913 value = getattr(instance, self.internal_name) 2914 except AttributeError: 2915 value = self.default 2916 # Lazily initialize the instance value from the class default. 2917 if value is not None: 2918 try: 2919 setattr(instance, self.internal_name, value) 2920 except Exception as e: 2921 message = 'Could not assign default value to {} attribute of {}'.format( 2922 self.internal_name, 2923 instance) 2924 raise exceptions.ApiError(message) from e 2925 return value 2926 2927 # Implementation note: If we have a version of the descriptor class with no `__set__` method, 2928 # it is a non-data descriptor that can be overridden by a data descriptor on an instance. 2929 # This could be one way to handle the polymorphism associated with state changes. 2930 def __set__(self, instance, value): 2931 if instance._editing: 2932 # Update the internal connections defining the subgraph. 2933 setattr(instance, self.internal_name, value) 2934 else: 2935 raise AttributeError('{} not assignable on {}'.format(self.name, instance)) 2936 2937 2938class GraphMeta(type): 2939 """Meta-class for gmxapi data flow graphs and subgraphs. 2940 2941 Used to implement ``subgraph`` as GraphMeta.__new__(...). 2942 Also allows subgraphs to be defined as Python class definitions by inheriting 2943 from Subgraph or by using the ``metaclass=GraphMeta`` hint in the class 2944 statement arguments. 2945 2946 The Python class creation protocol allows Subgraphs to be defined in as follows. 2947 2948 See the Subgraph class documentation for customization of instances through 2949 the Python context manager protocol. 2950 """ 2951 _prepare_keywords = ('variables',) 2952 2953 # TODO: Python 3.7.2 introduces typing.OrderedDict 2954 # In practice, we are using collections.OrderedDict, but we should use the generic 2955 # ABC from the typing module to avoid being overly restrictive with type hints. 2956 try: 2957 from typing import OrderedDict 2958 except ImportError: 2959 from collections import OrderedDict 2960 2961 @classmethod 2962 def __prepare__(mcs, name, bases, variables: OrderedDict = None, **kwargs): 2963 """Prepare the class namespace. 2964 2965 Keyword Args: 2966 variables: mapping of persistent graph variables to type / default value (optional) 2967 """ 2968 # Python runs this before executing the class body of Subgraph or its 2969 # subclasses. This is our chance to handle key word arguments given in the 2970 # class declaration. 2971 2972 if kwargs is not None: 2973 for keyword in kwargs: 2974 raise exceptions.UsageError('Unexpected key word argument: {}'.format(keyword)) 2975 2976 namespace = collections.OrderedDict() 2977 2978 if variables is not None: 2979 if isinstance(variables, collections.abc.Mapping): 2980 for name, value in variables.items(): 2981 if isinstance(value, type): 2982 dtype = value 2983 if hasattr(value, 'default'): 2984 default = value.default 2985 else: 2986 default = None 2987 else: 2988 default = value 2989 if hasattr(default, 'dtype'): 2990 dtype = default.dtype 2991 else: 2992 dtype = type(default) 2993 namespace[name] = GraphVariableDescriptor(name, default=default, dtype=dtype) 2994 # Note: we are not currently using the hook used by `inspect` 2995 # to annotate with the class that defined the attribute. 2996 # namespace[name].__objclass__ = mcs 2997 assert not hasattr(namespace[name], '__objclass__') 2998 else: 2999 raise exceptions.ValueError('"variables" must be a mapping of graph variables to types or defaults.') 3000 3001 return namespace 3002 3003 def __new__(cls, name, bases, namespace, **kwargs): 3004 for key in kwargs: 3005 if key not in GraphMeta._prepare_keywords: 3006 raise exceptions.ApiError('Unexpected class creation keyword: {}'.format(key)) 3007 return type.__new__(cls, name, bases, namespace) 3008 3009 # TODO: This is keyword argument stripping is not necessary in more recent Python versions. 3010 # When Python minimum required version is increased, check if we can remove this. 3011 def __init__(cls, name, bases, namespace, **kwargs): 3012 for key in kwargs: 3013 if key not in GraphMeta._prepare_keywords: 3014 raise exceptions.ApiError('Unexpected class initialization keyword: {}'.format(key)) 3015 super().__init__(name, bases, namespace) 3016 3017 3018class SubgraphNodeBuilder(NodeBuilder): 3019 3020 def __init__(self, 3021 context: 'SubgraphContext', 3022 operation, 3023 label: typing.Optional[str] = None): 3024 super().__init__(context, operation, label) 3025 3026 def add_input(self, name: str, source): 3027 """Add an input resource for the Node under construction. 3028 3029 Extends NodeBuilder.add_input() 3030 """ 3031 # Inspect inputs. 3032 # * Are they from outside the subgraph? 3033 # * Subgraph variables? 3034 # * Subgraph internal nodes? 3035 # Inputs from outside the subgraph are (provisionally) subgraph inputs. 3036 # Inputs that are subgraph variables or subgraph internal nodes mark operations that will need to be re-run. 3037 # For first implementation, let all operations be recreated, but we need to 3038 # manage the right input proxies. 3039 # For zeroeth implementation, try just tracking the entities that need a reset() method called. 3040 assert isinstance(self.context, SubgraphContext) 3041 if hasattr(source, 'reset'): 3042 self.context.add_resetter(source.reset) 3043 elif hasattr(source, '_reset'): 3044 self.context.add_resetter(source._reset) 3045 super().add_input(name, source) 3046 3047 def build(self) -> OperationPlaceholder: 3048 """Get a reference to the internal node in the subgraph definition. 3049 3050 In the SubgraphContext, these handles cannot represent uniquely identifiable 3051 results. They are placeholders for relative positions in graphs that are 3052 not defined until the subgraph is being executed. 3053 3054 Such references should be tracked and invalidated when exiting the 3055 subgraph context. Within the subgraph context, they are used to establish 3056 the recipe for updating the subgraph's outputs or persistent data during 3057 execution. 3058 """ 3059 # Placeholder handles in the subgraph definition don't have real resource managers. 3060 # Check for the ability to instantiate operations. 3061 handle = super().build() 3062 # handle = OperationPlaceholder() 3063 return typing.cast(OperationPlaceholder, handle) 3064 3065 3066class SubgraphContext(Context): 3067 """Provide a Python context manager in which to set up a graph of operations. 3068 3069 Allows operations to be configured without adding nodes to the global execution 3070 context. 3071 """ 3072 3073 def __init__(self): 3074 super().__init__() 3075 self.resetters = set() 3076 3077 def node_builder(self, operation, label=None) -> NodeBuilder: 3078 if label is not None: 3079 if label in self.labels: 3080 raise exceptions.ValueError('Label {} is already in use.'.format(label)) 3081 else: 3082 # The builder should update the labeled node when it is done. 3083 self.labels[label] = None 3084 3085 return SubgraphNodeBuilder(context=weakref.proxy(self), operation=operation, label=label) 3086 3087 def add_resetter(self, function): 3088 assert callable(function) 3089 self.resetters.add(function) 3090 3091 3092class Subgraph(object, metaclass=GraphMeta): 3093 """ 3094 3095 When subclassing from Subgraph, aspects of the subgraph's data ports can be 3096 specified with keyword arguments in the class statement. Example:: 3097 3098 >>> class MySubgraph(Subgraph, variables={'int_with_default': 1, 'boolData': bool}): pass 3099 ... 3100 3101 The key word *variables* is used in the class declaration to map the types 3102 of subgraph Variables with (optional) default values. 3103 3104 Execution model: 3105 Subgraph execution must follow a well-defined protocol in order to sensibly 3106 resolve data Futures at predictable points. Note that subgraphs act as operations 3107 that can be automatically redefined at run time (in limited cases), such as 3108 to automatically form iteration chains to implement "while" loops. We refer 3109 to one copy / iteration / generation of a subgraph as an "element" below. 3110 3111 When a subgraph element begins execution, each of its variables with an 3112 "updated" state from the previous iteration has the "updated" state moved 3113 to the new element's "initial" state, and the "updated" state is voided. 3114 3115 Subgraph.next() appends an element of the subgraph to the chain. Subsequent 3116 calls to Subgraph.run() bring the new outputs up to date (and then call next()). 3117 Thus, for a subgraph with an output called ``is_converged``, calling 3118 ``while (not subgraph.is_converged): subgraph.run()`` has the desired effect, 3119 but is likely suboptimal for execution. Instead use the gmxapi while_loop. 3120 3121 If the subgraph is not currently executing, it can be in one of two states: 3122 "editing" or "ready". In the "ready" state, class and instance Variables 3123 cannot be assigned, but can be read through the data descriptors. In this 3124 state, the descriptors only have a single state. 3125 3126 If "editing," variables on the class object can be assigned to update the 3127 data flow defined for the subgraph. While "editing", reading or writing 3128 instance Variables is undefined behavior. 3129 3130 When "editing" begins, each variable is readable as a proxy to the "initial" 3131 state in an element. Assignments while "editing" put variables in temporary 3132 states accessible only during editing. 3133 When "editing" finishes, the "updated" output data source of each 3134 variable for the element is set, if appropriate. 3135 3136 # TODO: Use a get_context to allow operation factories or accessors to mark 3137 # references for update/annotation when exiting the 'with' block. 3138 """ 3139 3140 3141class SubgraphBuilder(object): 3142 """Helper for defining new Subgraphs. 3143 3144 Manages a Python context in which to define a new Subgraph type. Can be used 3145 in a Python ``with`` construct exactly once to provide the Subgraph class body. 3146 When the ``with`` block is exited (or ``build()`` is called explicitly), a 3147 new type instance becomes available. Subsequent calls to SubgraphBuilder.__call__(self, ...) 3148 are dispatched to the Subgraph constructor. 3149 3150 Outside of the ``with`` block, read access to data members is proxied to 3151 descriptors on the built Subgraph. 3152 3153 Instances of SubgraphBuilder are returned by the ``subgraph()`` utility function. 3154 """ 3155 3156 def __init__(self, variables): 3157 self.__dict__.update({'variables': variables, 3158 '_staging': collections.OrderedDict(), 3159 '_editing': False, 3160 '_subgraph_context': None, 3161 '_subgraph_instance': None, 3162 '_fused_operation': None, 3163 '_factory': None}) 3164 # Return a placeholder that we can update during iteration. 3165 # Long term, this is probably implemented with data descriptors 3166 # that will be moved to a new Subgraph type object. 3167 for name in self.variables: 3168 if not isinstance(self.variables[name], Future): 3169 self.variables[name] = gmx.make_constant(self.variables[name]) 3170 3171 # class MySubgraph(Subgraph, variables=variables): 3172 # pass 3173 # 3174 # self._subgraph_instance = MySubgraph() 3175 3176 def __getattr__(self, item): 3177 if self._editing: 3178 if item in self.variables: 3179 if item in self._staging: 3180 logger.debug('Read access to intermediate value of subgraph variable {}'.format(item)) 3181 return self._staging[item] 3182 else: 3183 logger.debug('Read access to subgraph variable {}'.format(item)) 3184 return self.variables[item] 3185 else: 3186 raise AttributeError('Invalid attribute: {}'.format(item)) 3187 else: 3188 # TODO: this is not quite the described interface... 3189 return lambda obj: obj.values[item] 3190 3191 def __setattr__(self, key, value): 3192 """Part of the builder interface.""" 3193 if key in self.__dict__: 3194 self.__dict__[key] = value 3195 else: 3196 if self._editing: 3197 self.add_update(key, value) 3198 else: 3199 raise exceptions.UsageError('Subgraph is not in an editable state.') 3200 3201 def add_update(self, key, value): 3202 """Add a variable update to the internal subgraph.""" 3203 if key not in self.variables: 3204 raise AttributeError('No such attribute: {}'.format(key)) 3205 if not self._editing: 3206 raise exceptions.UsageError('Subgraph is not in an editable state.') 3207 # Else, stage the potential final value for the iteration. 3208 logger.debug('Staging subgraph update {} = {}'.format(key, value)) 3209 # Return a placeholder that we can update during iteration. 3210 # Long term, this is probably implemented with data descriptors 3211 # that will be moved to a new Subgraph type object. 3212 if not isinstance(value, Future): 3213 value = gmx.make_constant(value) 3214 self._staging[key] = value 3215 self._staging.move_to_end(key) 3216 3217 def __enter__(self): 3218 """Enter a Context managed by the subgraph to capture operation additions. 3219 3220 Allows the internal data flow of the subgraph to be defined in the same 3221 manner as the default work graph while the Python context manager is active. 3222 3223 The subgraph Context is activated when entering a ``with`` block and 3224 finalized at the end of the block. 3225 """ 3226 # While editing the subgraph in the SubgraphContext, we need __get__ and __set__ 3227 # data descriptor access on the Subgraph subclass type, but outside of the 3228 # context manager, the descriptor should be non-writable and more opaque, 3229 # while the instance should be readable, but not writable. 3230 self.__dict__['_editing'] = True 3231 # TODO: this probably needs to be configured with variables... 3232 self.__dict__['_subgraph_context'] = SubgraphContext() 3233 push_context(self._subgraph_context) 3234 return self 3235 3236 def build(self): 3237 """Build the subgraph by defining some new operations. 3238 3239 Examine the subgraph variables. Variables with handles to data sources 3240 in the SubgraphContext represent work that needs to be executed on 3241 a subgraph execution or iteration. Variables with handles to data sources 3242 outside of the subgraph represent work that needs to be executed once 3243 on only the first iteration to initialize the subgraph. 3244 3245 Construct a factory for the fused operation that performs the work to 3246 update the variables on a single iteration. 3247 3248 Construct a factory for the fused operation that performs the work to 3249 update the variables on subsequent iterations and which will be fed 3250 the outputs of a previous iteration. Both of the generated operations 3251 have the same output signature. 3252 3253 Construct and wrap the generator function to recursively append work 3254 to the graph and update until condition is satisfied. 3255 3256 TODO: Explore how to drop work from the graph once there are no more 3257 references to its output, including check-point machinery. 3258 """ 3259 logger.debug('Finalizing subgraph definition.') 3260 inputs = collections.OrderedDict() 3261 for key, value in self.variables.items(): 3262 # TODO: What if we don't want to provide default values? 3263 inputs[key] = value 3264 3265 updates = self._staging 3266 3267 class Subgraph(object): 3268 def __init__(self, input_futures, update_sources): 3269 self.values = collections.OrderedDict([(key, value.result()) for key, value in input_futures.items()]) 3270 logger.debug('subgraph initialized with {}'.format( 3271 ', '.join(['{}: {}'.format(key, value) for key, value in self.values.items()]))) 3272 self.futures = collections.OrderedDict([(key, value) for key, value in input_futures.items()]) 3273 self.update_sources = collections.OrderedDict([(key, value) for key, value in update_sources.items()]) 3274 logger.debug('Subgraph updates staged:') 3275 for update, source in self.update_sources.items(): 3276 logger.debug(' {} = {}'.format(update, source)) 3277 3278 def run(self): 3279 for name in self.update_sources: 3280 result = self.update_sources[name].result() 3281 logger.debug('Update: {} = {}'.format(name, result)) 3282 self.values[name] = result 3283 # Replace the data sources in the futures. 3284 for name in self.update_sources: 3285 self.futures[name].resource_manager = gmx.make_constant(self.values[name]).resource_manager 3286 for name in self.update_sources: 3287 self.update_sources[name]._reset() 3288 3289 subgraph = Subgraph(inputs, updates) 3290 3291 return lambda subgraph=subgraph: subgraph 3292 3293 def __exit__(self, exc_type, exc_val, exc_tb): 3294 """End the Subgraph editing session and finalize the Subgraph build. 3295 3296 After exiting, this instance forwards __call__() to a factory for an 3297 operation that carries out the work in the subgraph with inputs bound 3298 in the current context as defined by ``variables``. 3299 """ 3300 self._factory = self.build() 3301 3302 context = pop_context() 3303 assert context is self._subgraph_context 3304 self.__dict__['_editing'] = False 3305 # Returning False causes exceptions in the `with` block to be reraised. 3306 # Remember to switch this to return True if we want to transform or suppress 3307 # such an exception (we probably do). 3308 if exc_type is not None: 3309 logger.error('Got exception {} while editing subgraph {}.'.format(exc_val, self)) 3310 logger.debug('Subgraph exception traceback: \n{}'.format(exc_tb)) 3311 return False 3312 3313 def __call__(self): 3314 # TODO: After build() has been called, this should dispatch to a factory 3315 # that returns an OperationHandle. 3316 return self._factory() 3317 3318 3319def while_loop(*, operation, condition, max_iteration=10): 3320 """Generate and run a chain of operations such that condition evaluates True. 3321 3322 Returns and operation instance that acts like a single node in the current 3323 work graph, but which is a proxy to the operation at the end of a dynamically generated chain 3324 of operations. At run time, condition is evaluated for the last element in 3325 the current chain. If condition evaluates False, the chain is extended and 3326 the next element is executed. When condition evaluates True, the object 3327 returned by ``while_loop`` becomes a proxy for the last element in the chain. 3328 3329 Equivalent to calling operation.while(condition), where available. 3330 3331 Arguments: 3332 operation: a callable that produces an instance of an operation when called with no arguments. 3333 condition: a callable that accepts an object (returned by ``operation``) that returns a boolean. 3334 max_iteration: execute the loop no more than this many times (default 10) 3335 3336 Warning: 3337 *max_iteration* is provided in part to minimize the cost of bugs in early 3338 versions of this software. The default value may be changed or 3339 removed on short notice. 3340 3341 Warning: 3342 The protocol by which ``while_loop`` interacts with ``operation`` and ``condition`` 3343 is very unstable right now. Please refer to this documentation when installing new 3344 versions of the package. 3345 3346 Protocol: 3347 Warning: 3348 This protocol will be changed before the 0.1 API is finalized. 3349 3350 When called, ``while_loop`` calls ``operation`` without arguments 3351 and captures the return value captured as ``_operation``. 3352 The object produced by ``operation()`` must have a ``reset``, 3353 a ``run`` method, and an ``output`` attribute. 3354 3355 This is inspected 3356 to determine the output data proxy for the operation produced by the call 3357 to ``while_loop``. When that operation is called, it does the equivalent of 3358 3359 while(condition(self._operation)): 3360 self._operation.reset() 3361 self._operation.run() 3362 3363 Then, the output data proxy of ``self`` is updated with the results from 3364 self._operation.output. 3365 3366 """ 3367 # In the first implementation, Subgraph is NOT and OperationHandle. 3368 # if not isinstance(obj, AbstractOperationHandle): 3369 # raise exceptions.UsageError( 3370 # '"operation" key word argument must be a callable that produces an Operation handle.') 3371 # outputs = {} 3372 # for name, descriptor in obj.output.items(): 3373 # outputs[name] = descriptor._dtype 3374 3375 # 1. Get the initial inputs. 3376 # 2. Initialize the subgraph with the initial inputs. 3377 # 3. Run the subgraph. 3378 # 4. Get the outputs. 3379 # 5. Initialize the subgraph with the outputs. 3380 # 6. Go to 3 if condition is not met. 3381 3382 obj = operation() 3383 assert hasattr(obj, 'values') 3384 outputs = collections.OrderedDict([(key, type(value)) for key, value in obj.values.items()]) 3385 3386 @function_wrapper(output=outputs) 3387 def run_loop(output: OutputCollectionDescription): 3388 iteration = 0 3389 obj = operation() 3390 logger.debug('Created object {}'.format(obj)) 3391 logger.debug(', '.join(['{}: {}'.format(key, obj.values[key]) for key in obj.values])) 3392 logger.debug('Condition: {}'.format(condition(obj))) 3393 while (condition(obj)): 3394 logger.debug('Running iteration {}'.format(iteration)) 3395 obj.run() 3396 logger.debug( 3397 ', '.join(['{}: {}'.format(key, obj.values[key]) for key in obj.values])) 3398 logger.debug('Condition: {}'.format(condition(obj))) 3399 iteration += 1 3400 if iteration > max_iteration: 3401 break 3402 for name in outputs: 3403 setattr(output, name, obj.values[name]) 3404 3405 return obj 3406 3407 return run_loop 3408 3409 3410def subgraph(variables=None): 3411 """Allow operations to be configured in a sub-context. 3412 3413 The object returned functions as a Python context manager. When entering the 3414 context manager (the beginning of the ``with`` block), the object has an 3415 attribute for each of the named ``variables``. Reading from these variables 3416 gets a proxy for the initial value or its update from a previous loop iteration. 3417 At the end of the ``with`` block, any values or data flows assigned to these 3418 attributes become the output for an iteration. 3419 3420 After leaving the ``with`` block, the variables are no longer assignable, but 3421 can be called as bound methods to get the current value of a variable. 3422 3423 When the object is run, operations bound to the variables are ``reset`` and 3424 run to update the variables. 3425 """ 3426 # Implementation note: 3427 # A Subgraph (type) has a subgraph context associated with it. The subgraph's 3428 # ability to capture operation additions is implemented in terms of the 3429 # subgraph context. 3430 logger.debug('Declare a new subgraph with variables {}'.format(variables)) 3431 3432 return SubgraphBuilder(variables) 3433 3434 3435@computed_result 3436def join_arrays(*, front: datamodel.NDArray = (), back: datamodel.NDArray = ()) -> datamodel.NDArray: 3437 """Operation that consumes two sequences and produces a concatenated single sequence. 3438 3439 Note that the exact signature of the operation is not determined until this 3440 helper is called. Helper functions may dispatch to factories for different 3441 operations based on the inputs. In this case, the dtype and shape of the 3442 inputs determines dtype and shape of the output. An operation instance must 3443 have strongly typed output, but the input must be strongly typed on an 3444 object definition so that a Context can make runtime decisions about 3445 dispatching work and data before instantiating. 3446 # TODO: elaborate and clarify. 3447 # TODO: check type and shape. 3448 # TODO: figure out a better annotation. 3449 """ 3450 # TODO: (FR4) Returned list should be an NDArray. 3451 if isinstance(front, (str, bytes)) or isinstance(back, (str, bytes)): 3452 raise exceptions.ValueError('Input must be a pair of lists.') 3453 assert isinstance(front, datamodel.NDArray) 3454 assert isinstance(back, datamodel.NDArray) 3455 new_list = list(front._values) 3456 new_list.extend(back._values) 3457 return datamodel.NDArray(new_list) 3458 3459 3460# TODO: Constrain 3461Scalar = typing.TypeVar('Scalar') 3462 3463 3464def concatenate_lists(sublists: list = ()) -> _Future[gmx.datamodel.NDArray]: 3465 """Combine data sources into a single list. 3466 3467 A trivial data flow restructuring operation. 3468 """ 3469 if isinstance(sublists, (str, bytes)): 3470 raise exceptions.ValueError('Input must be a list of lists.') 3471 if len(sublists) == 0: 3472 return datamodel.ndarray([]) 3473 else: 3474 # TODO: Fix the data model so that this can type-check properly. 3475 return join_arrays(front=sublists[0], 3476 back=typing.cast(datamodel.NDArray, 3477 concatenate_lists(sublists[1:]))) 3478 3479 3480def make_constant(value: Scalar) -> _Future: 3481 """Provide a predetermined value at run time. 3482 3483 This is a trivial operation that provides a (typed) value, primarily for 3484 internally use to manage gmxapi data flow. 3485 3486 Accepts a value of any type. The object returned has a definite type and 3487 provides same interface as other gmxapi outputs. Additional constraints or 3488 guarantees on data type may appear in future versions. 3489 """ 3490 dtype = type(value) 3491 source = StaticSourceManager(name='data', proxied_data=value, width=1, function=lambda x: x) 3492 description = ResultDescription(dtype=dtype, width=1) 3493 future = Future(source, 'data', description=description) 3494 return future 3495 3496 3497def logical_not(value: bool) -> _Future: 3498 """Boolean negation. 3499 3500 If the argument is a gmxapi compatible Data or Future object, a new View or 3501 Future is created that proxies the boolean opposite of the input. 3502 3503 If the argument is a callable, logical_not returns a wrapper function that 3504 returns a Future for the logical opposite of the callable's result. 3505 """ 3506 # TODO: Small data transformations like this don't need to be formal Operations. 3507 # This could be essentially a data annotation that affects the resolver in a 3508 # DataEdge. As an API detail, coding for different Contexts and optimizations 3509 # within those Context implementations could be simplified. 3510 operation = function_wrapper(output={'data': bool})(lambda data=bool(): not bool(data)) 3511 return operation(data=value).output.data 3512