1# -*- coding: utf-8 -*-
2
3#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
4#
5#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6#    not use this file except in compliance with the License. You may obtain
7#    a copy of the License at
8#
9#         http://www.apache.org/licenses/LICENSE-2.0
10#
11#    Unless required by applicable law or agreed to in writing, software
12#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14#    License for the specific language governing permissions and limitations
15#    under the License.
16
17from taskflow.engines.action_engine import engine
18from taskflow.engines.worker_based import executor
19from taskflow.engines.worker_based import protocol as pr
20
21
22class WorkerBasedActionEngine(engine.ActionEngine):
23    """Worker based action engine.
24
25    Specific backend options (extracted from provided engine options):
26
27    :param exchange: broker exchange exchange name in which executor / worker
28                     communication is performed
29    :param url: broker connection url (see format in kombu documentation)
30    :param topics: list of workers topics to communicate with (this will also
31                   be learned by listening to the notifications that workers
32                   emit).
33    :param transport: transport to be used (e.g. amqp, memory, etc.)
34    :param transition_timeout: numeric value (or None for infinite) to wait
35                               for submitted remote requests to transition out
36                               of the (PENDING, WAITING) request states. When
37                               expired the associated task the request was made
38                               for will have its result become a
39                               :py:class:`~taskflow.exceptions.RequestTimeout`
40                               exception instead of its normally returned
41                               value (or raised exception).
42    :param transport_options: transport specific options (see:
43                              http://kombu.readthedocs.org/ for what these
44                              options imply and are expected to be)
45    :param retry_options: retry specific options
46                          (see: :py:attr:`~.proxy.Proxy.DEFAULT_RETRY_OPTIONS`)
47    :param worker_expiry: numeric value (or negative/zero/None for
48                          infinite) that defines the number of seconds to
49                          continue to send messages to workers that
50                          have **not** responded back to a prior
51                          notification/ping request (this defaults
52                          to 60 seconds).
53    """
54
55    def __init__(self, flow, flow_detail, backend, options):
56        super(WorkerBasedActionEngine, self).__init__(flow, flow_detail,
57                                                      backend, options)
58        # This ensures that any provided executor will be validated before
59        # we get to far in the compilation/execution pipeline...
60        self._task_executor = self._fetch_task_executor(self._options,
61                                                        self._flow_detail)
62
63    @classmethod
64    def _fetch_task_executor(cls, options, flow_detail):
65        try:
66            e = options['executor']
67            if not isinstance(e, executor.WorkerTaskExecutor):
68                raise TypeError("Expected an instance of type '%s' instead of"
69                                " type '%s' for 'executor' option"
70                                % (executor.WorkerTaskExecutor, type(e)))
71            return e
72        except KeyError:
73            return executor.WorkerTaskExecutor(
74                uuid=flow_detail.uuid,
75                url=options.get('url'),
76                exchange=options.get('exchange', 'default'),
77                retry_options=options.get('retry_options'),
78                topics=options.get('topics', []),
79                transport=options.get('transport'),
80                transport_options=options.get('transport_options'),
81                transition_timeout=options.get('transition_timeout',
82                                               pr.REQUEST_TIMEOUT),
83                worker_expiry=options.get('worker_expiry',
84                                          pr.EXPIRES_AFTER),
85            )
86