----
Jobs
----

Overview
========

Jobs and jobboards are a **novel** concept that TaskFlow provides to allow for
automatic ownership transfer of workflows between capable owners (those owners
usually then use :doc:`engines <engines>` to complete the workflow). They
provide the necessary semantics to atomically transfer a job from a producer
to a consumer in a reliable and fault tolerant manner. They are modeled on the
concept used to post and acquire work in the physical world (typically a job
listing in a newspaper or online website serves a similar role).

**TLDR:** It's similar to a queue, but consumers lock items on the queue when
claiming them, and only remove them from the queue when they're done with the
work. If the consumer fails, the lock is *automatically* released and the item
is back on the queue for further consumption.

.. note::

    For more details, please visit the `paradigm shift`_ page.

Definitions
===========

Jobs
  A :py:class:`job <taskflow.jobs.base.Job>` consists of a unique identifier,
  a name, and a reference to a :py:class:`logbook
  <taskflow.persistence.models.LogBook>` which contains the details of the
  work that has been (or still needs to be) completed to finish that job.

Jobboards
  A :py:class:`jobboard <taskflow.jobs.base.JobBoard>` is responsible for
  managing the posting, ownership, and delivery of jobs. It acts as the
  location where jobs can be posted, claimed and searched for; typically by
  iteration or notification.  Jobboards may be backed by different *capable*
  implementations (each with potentially differing configuration) but all
  jobboards implement the same interface and semantics so that the backend
  usage is as transparent as possible. This allows deployers or developers of a
  service that uses TaskFlow to select a jobboard implementation that fits
  their setup (and their intended usage) best.

High level architecture
=======================

.. figure:: img/jobboard.png
   :height: 350px
   :align: right

   **Note:** This diagram shows the high-level architecture of the zookeeper
   implementation (further parts of this documentation refer to it as well);
   other implementations will typically have different architectures.

Features
========

- High availability

  - Guarantees workflow forward progress by transferring partially complete
    work or work that has not been started to entities which can either resume
    the previously partially completed work or begin initial work to ensure
    that the workflow as a whole progresses (where progressing implies
    transitioning through the workflow :doc:`patterns <patterns>` and
    :doc:`atoms <atoms>` and completing their associated
    :doc:`states <states>` transitions).

- Atomic transfer and single ownership

  - Ensures that only one workflow is managed (aka owned) by a single owner at
    a time in an atomic manner (including when the workflow is transferred to
    an owner that is resuming some other failed owner's work). This avoids
    contention and ensures a workflow is managed by one and only one entity at
    a time.
  - *Note:* this does not mean that the owner needs to run the
    workflow itself; instead said owner could use an engine that runs the
    work in a distributed manner to ensure that the workflow progresses.

- Separation of workflow construction and execution

  - Jobs can be created with logbooks that contain a specification of the work
    to be done by an entity (such as an API server). The job then can be
    completed by an entity that is watching that jobboard (not necessarily the
    API server itself). This creates a disconnection between work
    formation and work completion that is useful for scaling out horizontally.

- Asynchronous completion

  - When, for example, an API server posts a job for completion to a
    jobboard, that API server can return a *tracking* identifier to the user
    calling the API service. This *tracking* identifier can be used by the
    user to poll for status (similar in concept to a shipping *tracking*
    identifier created by FedEx or UPS); a sketch of this pattern follows
    this list.

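A minimal sketch of this asynchronous completion pattern might look like the
following (the handler shape and the ``make_and_save_logbook`` helper are
hypothetical; ``board`` and ``persistence`` are assumed to have been created
as shown in the usage examples below):

.. code-block:: python

    def create_resource(request, board, persistence):
        # Record the work to be done in a logbook and post it as a job
        # for some capable entity to eventually claim and complete.
        book = make_and_save_logbook(persistence)
        job = board.post("create-resource", book)
        # Hand the job uuid back to the caller as a *tracking* identifier;
        # the caller can later poll for status using this identifier.
        return {"tracking_id": job.uuid}
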
Usage
=====

All jobboards are classes that implement the same interface, and of course
it is possible to import them and create instances of them just like with any
other class in Python. But the easier (and recommended) way of creating
jobboards is by using the :py:meth:`fetch() <taskflow.jobs.backends.fetch>`
function which uses entrypoints (internally using `stevedore`_) to fetch and
configure your backend.

Using this function, the typical creation of a jobboard (and an example
posting of a job) might look like:

.. code-block:: python

    from taskflow.persistence import backends as persistence_backends
    from taskflow.jobs import backends as job_backends

    ...
    persistence = persistence_backends.fetch({
        "connection": "mysql",
        "user": ...,
        "password": ...,
    })
    book = make_and_save_logbook(persistence)
    board = job_backends.fetch('my-board', {
        "board": "zookeeper",
    }, persistence=persistence)
    job = board.post("my-first-job", book)
    ...

Consumption of jobs is similarly achieved by creating a jobboard and using
the iteration functionality to find and claim jobs (and eventually consume
them). The typical usage of a jobboard for consumption (and work completion)
might look like:

.. code-block:: python

    import logging
    import time

    from taskflow import exceptions as exc
    from taskflow.persistence import backends as persistence_backends
    from taskflow.jobs import backends as job_backends

    LOG = logging.getLogger(__name__)

    ...
    my_name = 'worker-1'
    coffee_break_time = 60
    persistence = persistence_backends.fetch({
        "connection": "mysql",
        "user": ...,
        "password": ...,
    })
    board = job_backends.fetch('my-board', {
        "board": "zookeeper",
    }, persistence=persistence)
    while True:
        my_job = None
        for job in board.iterjobs(only_unclaimed=True):
            try:
                board.claim(job, my_name)
            except exc.UnclaimableJob:
                pass
            else:
                my_job = job
                break
        if my_job is not None:
            try:
                perform_job(my_job)
            except Exception:
                LOG.exception("I failed performing job: %s", my_job)
                board.abandon(my_job, my_name)
            else:
                # I finished it, now cleanup.
                board.consume(my_job, my_name)
                persistence.get_connection().destroy_logbook(my_job.book.uuid)
        time.sleep(coffee_break_time)
    ...

There are a few ways to provide arguments to the flow. The first option is to
add a ``store`` to the flowdetail object in the
:py:class:`logbook <taskflow.persistence.models.LogBook>`.

You can also provide a ``store`` in the
:py:class:`job <taskflow.jobs.base.Job>` itself when posting it to the
jobboard. If both ``store`` values are found, they will be combined,
with the :py:class:`job <taskflow.jobs.base.Job>` ``store``
overriding the :py:class:`logbook <taskflow.persistence.models.LogBook>`
``store``.

.. code-block:: python

    from oslo_utils import uuidutils

    from taskflow import engines
    from taskflow.persistence import backends as persistence_backends
    from taskflow.persistence import models
    from taskflow.jobs import backends as job_backends


    ...
    persistence = persistence_backends.fetch({
        "connection": "mysql",
        "user": ...,
        "password": ...,
    })
    board = job_backends.fetch('my-board', {
        "board": "zookeeper",
    }, persistence=persistence)

    book = models.LogBook('my-book', uuidutils.generate_uuid())

    flow_detail = models.FlowDetail('my-job', uuidutils.generate_uuid())
    book.add(flow_detail)

    connection = persistence.get_connection()
    connection.save_logbook(book)

    flow_detail.meta['store'] = {'a': 1, 'c': 3}

    job_details = {
        "flow_uuid": flow_detail.uuid,
        "store": {'a': 2, 'b': 1}
    }

    # 'flow_factory' is the (user provided) function that builds the flow.
    engines.save_factory_details(flow_detail, flow_factory,
                                 factory_args=[],
                                 factory_kwargs={},
                                 backend=persistence)

    board.connect()
    job = board.post('my-job', book=book, details=job_details)

    # the flow global parameters are now the combined store values
    # {'a': 2, 'b': 1, 'c': 3}
    ...

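On the consuming side, the saved factory details let a claimer rebuild the
engine from the referenced flow detail. The following is a minimal sketch
(assuming ``job`` was claimed as in the consumption example above; the two
``store`` sources are combined by hand here for illustration, a task a
conductor would normally perform for you):

.. code-block:: python

    from taskflow import engines

    # Locate the flow detail that the job's details point at.
    flow_detail = job.book.find(job.details["flow_uuid"])

    # Combine the two possible 'store' sources, with the job's store
    # taking priority over the flow detail's store (the rule above).
    store = dict(flow_detail.meta.get("store", {}))
    store.update(job.details.get("store", {}))

    # Re-create the engine from the saved factory details and run it.
    engine = engines.load_from_detail(flow_detail, store=store,
                                      backend=persistence)
    engine.run()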

Types
=====

Zookeeper
---------

**Board type**: ``'zookeeper'``

Uses `zookeeper`_ to provide the jobboard capabilities and semantics by using
a zookeeper directory, ephemeral and non-ephemeral nodes, and watches.

Additional *kwarg* parameters:

* ``client``: a class that provides a ``kazoo.client.KazooClient``-like
  interface; it will be used for zookeeper interactions. Sharing clients
  between jobboard instances will likely provide better scalability and can
  help avoid creating too many open connections to a set of zookeeper servers.
* ``persistence``: a class that provides a :doc:`persistence <persistence>`
  backend interface; it will be used for loading job logbooks, either for
  usage at runtime or for introspection before a job is claimed.

Additional *configuration* parameters:

* ``path``: the root zookeeper path to store job information (*defaults* to
  ``/taskflow/jobs``)
* ``hosts``: the list of zookeeper hosts to connect to (*defaults* to
  ``localhost:2181``); only used if a client is not provided.
* ``timeout``: the timeout used when performing operations with zookeeper;
  only used if a client is not provided.
* ``handler``: a class that provides a ``kazoo.handlers``-like interface; it
  will be used internally by `kazoo`_ to perform asynchronous operations,
  which is useful when your program uses eventlet and you want to instruct
  kazoo to use an eventlet compatible handler.
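
For example, fetching a board with a non-default path, explicit hosts and a
timeout might look like the following (a minimal sketch; the values shown are
placeholders and ``persistence`` is assumed to be an already fetched
persistence backend):

.. code-block:: python

    from taskflow.jobs import backends as job_backends

    # Fetch a zookeeper backed jobboard using the configuration
    # parameters described above (all values here are placeholders).
    board = job_backends.fetch('my-board', {
        "board": "zookeeper",
        "path": "/taskflow/my-jobs",
        "hosts": ["zk1:2181", "zk2:2181", "zk3:2181"],
        "timeout": 10.0,
    }, persistence=persistence)
    board.connect()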

.. note::

    See :py:class:`~taskflow.jobs.backends.impl_zookeeper.ZookeeperJobBoard`
    for implementation details.

Redis
-----

**Board type**: ``'redis'``

Uses `redis`_ to provide the jobboard capabilities and semantics by using
a redis hash data structure and individual job ownership keys (that can
optionally expire after a given amount of time).
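
Fetching a redis backed board follows the same pattern as the zookeeper
examples above; only the board type changes (a minimal sketch; any redis
specific connection options are implementation specific and omitted here,
and ``persistence`` is assumed to be an already fetched persistence backend):

.. code-block:: python

    from taskflow.jobs import backends as job_backends

    # Fetch a redis backed jobboard; posting, claiming and consuming
    # then work through the same jobboard interface shown earlier.
    board = job_backends.fetch('my-board', {
        "board": "redis",
    }, persistence=persistence)
    board.connect()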

.. note::

    See :py:class:`~taskflow.jobs.backends.impl_redis.RedisJobBoard`
    for implementation details.

Considerations
==============

Some usage considerations should be kept in mind when using a jobboard, to
make sure it's used in a safe and reliable manner. Eventually we hope to make
these non-issues, but for now they are worth mentioning.

Dual-engine jobs
----------------

**What:** Since atoms and engines are not currently `preemptable`_, we cannot
force an engine (or the threads/remote workers... it is using to run) to stop
working on an atom once it has started working on it (it is generally bad
behavior to force code to stop without its consent anyway), short of doing a
``kill -9`` on the running interpreter. This can cause problems, since the
only points at which an engine can notice that it no longer owns a claim are
the :doc:`state <states>` changes that occur (transitioning to a new atom or
recording a result, for example); upon noticing the claim has been lost the
engine can immediately stop doing further work. The effect is that when a
claim is lost, another engine can immediately attempt to acquire it and
*could* begin working on the unfinished tasks that the previous engine may
also still be executing (since that engine is not yet aware that it has
*lost* the claim).

**TLDR:** not `preemptable`_, possible to become aware of losing a claim
after the fact (at the next state change), another engine could have acquired
the claim by then, therefore both would be *working* on a job.

**Alleviate by:**

#. Ensure your atoms are `idempotent`_; this allows an engine that may be
   re-executing the same atom to continue without causing any
   conflicts/problems (idempotency guarantees this). A sketch of such an
   atom follows this list.
#. On claiming jobs that have been claimed previously, enforce a policy that
   runs before the job's workflow begins to execute (possibly prior to an
   engine beginning the job's work) and that ensures any prior work has been
   rolled back before continuing rolling forward. For example:

   * Rolling back the last atom/set of atoms that finished.
   * Rolling back the last state change that occurred.

#. Delay claiming partially completed work by adding a wait period (to allow
   the previous engine to coalesce) before working on a partially completed job
   (combine this with the prior suggestions and *most* dual-engine issues
   should be avoided).
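
As an illustration of the first point, a minimal sketch of an idempotent task
might look like the following (``volume_exists`` and ``create_volume`` are
hypothetical helpers wrapping whatever service actually stores volumes):

.. code-block:: python

    from taskflow import task


    class CreateVolume(task.Task):
        """Creates a volume only if it does not already exist.

        Because execute() checks before acting, a second engine
        re-running this task (for example after a lost claim) converges
        to the same result instead of conflicting with prior work.
        """

        def execute(self, volume_name):
            # 'volume_exists' and 'create_volume' are hypothetical
            # helpers; re-running this method is safe.
            if not volume_exists(volume_name):
                create_volume(volume_name)
            return volume_name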

.. _idempotent: https://en.wikipedia.org/wiki/Idempotence
.. _preemptable: https://en.wikipedia.org/wiki/Preemption_%28computing%29

Interfaces
==========

.. automodule:: taskflow.jobs.base
.. automodule:: taskflow.jobs.backends

Implementations
===============

Zookeeper
---------

.. automodule:: taskflow.jobs.backends.impl_zookeeper

Redis
-----

.. automodule:: taskflow.jobs.backends.impl_redis

Hierarchy
=========

.. inheritance-diagram::
    taskflow.jobs.base
    taskflow.jobs.backends.impl_redis
    taskflow.jobs.backends.impl_zookeeper
    :parts: 1

.. _paradigm shift: https://wiki.openstack.org/wiki/TaskFlow/Paradigm_shifts#Workflow_ownership_transfer
.. _zookeeper: http://zookeeper.apache.org/
.. _kazoo: https://kazoo.readthedocs.io/en/latest/
.. _stevedore: https://docs.openstack.org/stevedore/latest
.. _redis: https://redis.io/