# -*- coding: utf-8 -*-

#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import logging
import os

import six

from taskflow import exceptions
from taskflow.listeners import base
from taskflow import states

LOG = logging.getLogger(__name__)


class CheckingClaimListener(base.Listener):
    """Listener that verifies a job's claim is still held during execution.

    This listener ties together three things: an engine, the job whose work
    that engine is running, and the jobboard the job was claimed from. Attach
    it to an engine's notification system *after* the job has been claimed.
    From then on, *every* task or flow state change the engine reports
    triggers a check that the job is still claimed by the expected owner. If
    the claim turns out to be gone, an associated loss handler (the custom
    ``on_job_loss`` one, or the default, which suspends the engine) is
    invoked to decide how to react to this *hopefully* exceptional case.

    NOTE(harlowja): this may create more traffic than desired to the
    jobboard backend (zookeeper or other), since the amount of state change
    per task and flow is non-zero (and checking during each state change will
    result in quite a few calls to that management system to check the job's
    claim status); this could be later optimized to check less (or only check
    on a smaller set of states).

    NOTE(harlowja): a custom ``on_job_loss`` callback must accept three
    positional arguments: the engine currently being run, the task/flow
    state that triggered the check, and the details the engine sent to its
    listeners for inspection.
    """

    def __init__(self, engine, job, board, owner, on_job_loss=None):
        """Create a listener guarding ``job``'s claim on ``board``.

        :param engine: engine whose notifications trigger claim checks
        :param job: the (already claimed) job being worked on
        :param board: jobboard used to look up the job's current owner
        :param owner: owner the claim is expected to belong to
        :param on_job_loss: optional callable invoked as
            ``on_job_loss(engine, state, details)`` when the claim is found
            to be lost; defaults to suspending the engine
        :raises ValueError: if ``on_job_loss`` is given but is not callable
        """
        super(CheckingClaimListener, self).__init__(engine)
        self._job = job
        self._board = board
        self._owner = owner
        if on_job_loss is None:
            handler = self._suspend_engine_on_loss
        elif six.callable(on_job_loss):
            handler = on_job_loss
        else:
            raise ValueError("Custom 'on_job_loss' handler must be callable")
        self._on_job_loss = handler

    def _suspend_engine_on_loss(self, engine, state, details):
        """Default loss handler: ask ``engine`` to suspend (best effort).

        ``state`` and ``details`` are accepted only to match the handler
        signature and are ignored. A ``TaskFlowException`` raised while
        suspending is logged as a warning instead of being propagated.
        """
        try:
            engine.suspend()
        except exceptions.TaskFlowException as err:
            LOG.warning("Failed suspending engine '%s', (previously owned by"
                        " '%s'):%s%s", engine, self._owner, os.linesep,
                        err.pformat())

    def _flow_receiver(self, state, details):
        """Run the claim check for a flow state-change notification."""
        self._claim_checker(state, details)

    def _task_receiver(self, state, details):
        """Run the claim check for a task state-change notification."""
        self._claim_checker(state, details)

    def _has_been_lost(self):
        """Return ``True`` if this owner no longer holds the job's claim.

        The claim counts as lost when reading the job's state or looking up
        its owner raises ``NotFound``/``JobFailure``, when the job is
        ``UNCLAIMED``, or when its current owner differs from ours.
        """
        try:
            current_state = self._job.state
            current_owner = self._board.find_owner(self._job)
        except (exceptions.NotFound, exceptions.JobFailure):
            # A job that vanished or cannot be inspected is treated as lost.
            return True
        unclaimed = current_state == states.UNCLAIMED
        return bool(unclaimed or self._owner != current_owner)

    def _claim_checker(self, state, details):
        """Verify the claim; on loss, log a warning and call the handler.

        The handler receives ``self._engine`` (assumed to be the engine
        handed to ``base.Listener.__init__``), plus ``state`` and
        ``details`` exactly as they were delivered to this listener.
        """
        if self._has_been_lost():
            LOG.warning("Job '%s' has lost its claim"
                        " (previously owned by '%s')",
                        self._job, self._owner)
            self._on_job_loss(self._engine, state, details)
        else:
            LOG.debug("Job '%s' is still claimed (actively owned by '%s')",
                      self._job, self._owner)