----
Jobs
----

Overview
========

Jobs and jobboards are a **novel** concept that TaskFlow provides to allow for
automatic ownership transfer of workflows between capable owners (those owners
usually then use :doc:`engines <engines>` to complete the workflow). They
provide the necessary semantics to be able to atomically transfer a job from a
producer to a consumer in a reliable and fault-tolerant manner. They are
modeled on the concept used to post and acquire work in the physical world
(typically a job listing in a newspaper or on an online website serves a
similar role).

**TLDR:** It's similar to a queue, but consumers lock items on the queue when
claiming them, and only remove them from the queue when they're done with the
work. If the consumer fails, the lock is *automatically* released and the item
is back on the queue for further consumption.

.. note::

   Please visit the `paradigm shift`_ page for more details.

Definitions
===========

Jobs
    A :py:class:`job <taskflow.jobs.base.Job>` consists of a unique identifier,
    a name, and a reference to a :py:class:`logbook
    <taskflow.persistence.models.LogBook>` which contains the details of the
    work that has been or will be completed to finish the work created for
    that job.

Jobboards
    A :py:class:`jobboard <taskflow.jobs.base.JobBoard>` is responsible for
    managing the posting, ownership, and delivery of jobs. It acts as the
    location where jobs can be posted, claimed and searched for; typically by
    iteration or notification. Jobboards may be backed by different *capable*
    implementations (each with potentially differing configuration) but all
    jobboards implement the same interface and semantics so that the backend
    usage is as transparent as possible.
    This allows deployers or developers of a
    service that uses TaskFlow to select a jobboard implementation that fits
    their setup (and their intended usage) best.

High level architecture
=======================

.. figure:: img/jobboard.png
   :height: 350px
   :align: right

   **Note:** This diagram shows the high-level architecture (and further
   parts of this documentation also refer to it) of the zookeeper
   implementation (other implementations will typically have
   different architectures).

Features
========

- High availability

  - Guarantees workflow forward progress by transferring partially complete
    work or work that has not been started to entities which can either resume
    the previously partially completed work or begin initial work to ensure
    that the workflow as a whole progresses (where progressing implies
    transitioning through the workflow :doc:`patterns <patterns>` and
    :doc:`atoms <atoms>` and completing their associated
    :doc:`states <states>` transitions).

- Atomic transfer and single ownership

  - Ensures that only one workflow is managed (aka owned) by a single owner at
    a time in an atomic manner (including when the workflow is transferred to
    an owner that is resuming some other failed owner's work). This avoids
    contention and ensures a workflow is managed by one and only one entity at
    a time.
  - *Note:* this does not mean that the owner needs to run the
    workflow itself; instead said owner could use an engine that runs the
    work in a distributed manner to ensure that the workflow progresses.

- Separation of workflow construction and execution

  - Jobs can be created with logbooks that contain a specification of the work
    to be done by an entity (such as an API server). The job can then be
    completed by an entity that is watching that jobboard (not necessarily the
    API server itself).
    This creates a disconnection between work
    formation and work completion that is useful for scaling out horizontally.

- Asynchronous completion

  - When, for example, an API server posts a job for completion to a
    jobboard, that API server can return a *tracking* identifier to the user
    calling the API service. This *tracking* identifier can be used by the
    user to poll for status (similar in concept to a shipping *tracking*
    identifier created by FedEx or UPS).

Usage
=====

All jobboards are mere classes that implement the same interface, and of
course it is possible to import them and create instances of them just like
with any other class in Python. But the easier (and recommended) way to
create jobboards is to use the :py:meth:`fetch()
<taskflow.jobs.backends.fetch>` function, which uses entrypoints (internally
using `stevedore`_) to fetch and configure your backend.

Using this function the typical creation of a jobboard (and an example posting
of a job) might look like:

.. code-block:: python

    from taskflow.persistence import backends as persistence_backends
    from taskflow.jobs import backends as job_backends

    ...
    persistence = persistence_backends.fetch({
        "connection": "mysql",
        "user": ...,
        "password": ...,
    })
    book = make_and_save_logbook(persistence)
    board = job_backends.fetch('my-board', {
        "board": "zookeeper",
    }, persistence=persistence)
    job = board.post("my-first-job", book)
    ...

Consumption of jobs is similarly achieved by creating a jobboard and using
the iteration functionality to find and claim jobs (and eventually consume
them). The typical usage of a jobboard for consumption (and work completion)
might look like:

.. code-block:: python

    import time

    from taskflow import exceptions as exc
    from taskflow.persistence import backends as persistence_backends
    from taskflow.jobs import backends as job_backends

    ...
    my_name = 'worker-1'
    coffee_break_time = 60
    persistence = persistence_backends.fetch({
        "connection": "mysql",
        "user": ...,
        "password": ...,
    })
    board = job_backends.fetch('my-board', {
        "board": "zookeeper",
    }, persistence=persistence)
    while True:
        my_job = None
        for job in board.iterjobs(only_unclaimed=True):
            try:
                board.claim(job, my_name)
            except exc.UnclaimableJob:
                pass
            else:
                my_job = job
                break
        if my_job is not None:
            try:
                perform_job(my_job)
            except Exception:
                LOG.exception("I failed performing job: %s", my_job)
                board.abandon(my_job, my_name)
            else:
                # I finished it, now cleanup.
                board.consume(my_job)
                persistence.get_connection().destroy_logbook(my_job.book.uuid)
        time.sleep(coffee_break_time)
    ...

There are a few ways to provide arguments to the flow. The first option is to
add a ``store`` to the flowdetail object in the
:py:class:`logbook <taskflow.persistence.models.LogBook>`.

You can also provide a ``store`` in the
:py:class:`job <taskflow.jobs.base.Job>` itself when posting it to the
jobboard. If both ``store`` values are found, they will be combined,
with the :py:class:`job <taskflow.jobs.base.Job>` ``store``
overriding the :py:class:`logbook <taskflow.persistence.models.LogBook>`
``store``.

.. code-block:: python

    from oslo_utils import uuidutils

    from taskflow import engines
    from taskflow.persistence import backends as persistence_backends
    from taskflow.persistence import models
    from taskflow.jobs import backends as job_backends


    ...
    persistence = persistence_backends.fetch({
        "connection": "mysql",
        "user": ...,
        "password": ...,
    })
    board = job_backends.fetch('my-board', {
        "board": "zookeeper",
    }, persistence=persistence)

    book = models.LogBook('my-book', uuidutils.generate_uuid())

    flow_detail = models.FlowDetail('my-job', uuidutils.generate_uuid())
    book.add(flow_detail)

    connection = persistence.get_connection()
    connection.save_logbook(book)

    flow_detail.meta['store'] = {'a': 1, 'c': 3}

    job_details = {
        "flow_uuid": flow_detail.uuid,
        "store": {'a': 2, 'b': 1}
    }

    engines.save_factory_details(flow_detail, flow_factory,
                                 factory_args=[],
                                 factory_kwargs={},
                                 backend=persistence)

    jobboard = get_jobboard(zk_client)
    jobboard.connect()
    job = jobboard.post('my-job', book=book, details=job_details)

    # The flow global parameters are now the combined store values
    # {'a': 2, 'b': 1, 'c': 3}
    ...


Types
=====

Zookeeper
---------

**Board type**: ``'zookeeper'``

Uses `zookeeper`_ to provide the jobboard capabilities and semantics by using
a zookeeper directory, ephemeral and non-ephemeral nodes, and watches.

Additional *kwarg* parameters:

* ``client``: a class that provides a ``kazoo.client.KazooClient``-like
  interface; it will be used for zookeeper interactions. Sharing clients
  between jobboard instances will likely provide better scalability and can
  help avoid creating too many open connections to a set of zookeeper servers.
* ``persistence``: a class that provides a :doc:`persistence <persistence>`
  backend interface; it will be used for loading a job's logbook for usage at
  runtime or for introspection before the job is claimed.
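To make the client-sharing advice concrete, the sketch below assembles a
configuration dictionary for a zookeeper-backed board. The znode path and
host names are hypothetical, and the ``fetch()`` call that would consume the
dictionary is shown only in a comment, since running it requires a reachable
zookeeper ensemble:

.. code-block:: python

    # A hypothetical configuration for a zookeeper-backed jobboard; the
    # path and host names below are made up for illustration only.
    zk_conf = {
        "board": "zookeeper",         # selects the zookeeper implementation
        "path": "/taskflow/my-jobs",  # root znode used to store job information
        "hosts": ["zk1:2181", "zk2:2181", "zk3:2181"],  # ignored if a client is given
        "timeout": 10.0,              # zookeeper operation timeout (seconds)
    }

    # A single shared kazoo client avoids opening too many connections to
    # the zookeeper servers when several boards are created, e.g.:
    #
    #   board = job_backends.fetch('my-board', zk_conf,
    #                              client=shared_client,
    #                              persistence=persistence)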
Additional *configuration* parameters:

* ``path``: the root zookeeper path to store job information (*defaults* to
  ``/taskflow/jobs``)
* ``hosts``: the list of zookeeper hosts to connect to (*defaults* to
  ``localhost:2181``); only used if a client is not provided.
* ``timeout``: the timeout used when performing operations with zookeeper;
  only used if a client is not provided.
* ``handler``: a class that provides a ``kazoo.handlers``-like interface; it
  will be used internally by `kazoo`_ to perform asynchronous operations.
  This is useful when your program uses eventlet and you want to instruct
  kazoo to use an eventlet-compatible handler.

.. note::

   See :py:class:`~taskflow.jobs.backends.impl_zookeeper.ZookeeperJobBoard`
   for implementation details.

Redis
-----

**Board type**: ``'redis'``

Uses `redis`_ to provide the jobboard capabilities and semantics by using
a redis hash data structure and individual job ownership keys (that can
optionally expire after a given amount of time).

.. note::

   See :py:class:`~taskflow.jobs.backends.impl_redis.RedisJobBoard`
   for implementation details.

Considerations
==============

Some considerations should be kept in mind when using a jobboard to make sure
it's used in a safe and reliable manner. Eventually we hope to make these
non-issues but for now they are worth mentioning.

Dual-engine jobs
----------------

**What:** Since atoms and engines are not currently `preemptable`_ we can not
force an engine (or the threads/remote workers... it is using to run) to stop
working on an atom (it is generally bad behavior to force code to stop without
its consent anyway) if it has already started working on an atom (short of
doing a ``kill -9`` on the running interpreter).
This could cause problems
since the only points where an engine can notice that it no longer owns a
claim are at :doc:`state <states>` changes (transitioning to a new atom or
recording a result, for example), upon which the engine can immediately stop
doing further work. The effect this causes is that when a claim is lost,
another engine can immediately attempt to acquire that claim and *could*
begin working on the unfinished tasks that the previous engine may also still
be executing (since that engine is not yet aware that it has *lost* the
claim).

**TLDR:** not `preemptable`_, possible to become aware of losing a claim
after the fact (at the next state change), another engine could have acquired
the claim by then, therefore both would be *working* on a job.

**Alleviate by:**

#. Ensure your atoms are `idempotent`_; this will allow an engine that may be
   executing the same atom to continue executing without causing
   any conflicts/problems (idempotency guarantees this).
#. On claiming jobs that have been claimed previously, enforce a policy that
   happens before the job's workflow begins to execute (possibly prior to an
   engine beginning the job's work) that ensures that any prior work has been
   rolled back before continuing rolling forward. For example:

   * Rolling back the last atom/set of atoms that finished.
   * Rolling back the last state change that occurred.

#. Delay claiming partially completed work by adding a wait period (to allow
   the previous engine to quiesce) before working on a partially completed job
   (combine this with the prior suggestions and *most* dual-engine issues
   should be avoided).

.. _idempotent: https://en.wikipedia.org/wiki/Idempotence
.. _preemptable: https://en.wikipedia.org/wiki/Preemption_%28computing%29

Interfaces
==========

.. automodule:: taskflow.jobs.base
.. automodule:: taskflow.jobs.backends

Implementations
===============

Zookeeper
---------

.. automodule:: taskflow.jobs.backends.impl_zookeeper

Redis
-----

.. automodule:: taskflow.jobs.backends.impl_redis

Hierarchy
=========

.. inheritance-diagram::
   taskflow.jobs.base
   taskflow.jobs.backends.impl_redis
   taskflow.jobs.backends.impl_zookeeper
   :parts: 1

.. _paradigm shift: https://wiki.openstack.org/wiki/TaskFlow/Paradigm_shifts#Workflow_ownership_transfer
.. _zookeeper: http://zookeeper.apache.org/
.. _kazoo: https://kazoo.readthedocs.io/en/latest/
.. _stevedore: https://docs.openstack.org/stevedore/latest
.. _redis: https://redis.io/