# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 Jens Goepfert
#

import logging
import multiprocessing
import queue
import threading

from photofilmstrip.lib.common.Singleton import Singleton
from photofilmstrip.lib.DestructionManager import Destroyable

from .IVisualJobManager import IVisualJobManager
from .LogVisualJobManager import LogVisualJobManager
from .Worker import Worker, WorkerAbortSignal
from .JobAbortedException import JobAbortedException


def _CurrentThreadName():
    '''
    Returns the name of the calling thread. Only used for log messages.
    '''
    return threading.current_thread().name


class _JobCtxGroup:
    '''
    Handles the processing state of a JobContext and manages a queue with
    JobContexts that are waiting to be processed.

    The instance is a context manager: "with group:" acquires the group lock.
    '''

    def __init__(self, workers):
        '''
        :param workers: list of the workers working for this context group
        '''
        self.__idleQueue = queue.Queue()

        # holds the JobContext that is currently active
        self.__active = None

        # counts how many workers have finished working on the active JobContext
        self.__doneCount = 0

        # set once the active JobContext has finished; replaced by a new
        # Event for every activated JobContext (see SetActive)
        self.__doneEvent = threading.Event()

        # a list with workers working for this context group
        self.__workers = workers

        self.__lock = threading.Lock()

    def Put(self, jobContext):
        '''
        Adds a JobContext to the queue.
        :param jobContext: the JobContext to enqueue, or None as a sentinel
            that makes a blocked worker abort
        '''
        self.__idleQueue.put(jobContext)

    def Get(self):
        '''
        Returns a JobContext from the queue. Blocks if no JobContext is waiting.
        '''
        return self.__idleQueue.get()

    def __enter__(self):
        self.__lock.acquire()
        return self

    def __exit__(self, typ, value, traceback):
        self.__lock.release()

    def Active(self):
        '''
        Returns the currently active JobContext (or None).
        '''
        return self.__active

    def SetActive(self, jobContext):
        '''
        Sets the given JobContext as active. For a non-None JobContext the
        counter of finished workers is reset and a new, unset doneEvent is
        created.

        A new Event object is created instead of clearing the old one. A
        worker that already captured the event of the previous JobContext
        (see DoneEvent) but has not started waiting yet must still find it
        set. Otherwise it would block on the next JobContext, which in turn
        can never complete without that worker.
        :param jobContext: the JobContext to activate, or None
        '''
        self.__active = jobContext
        if jobContext is not None:
            self.__doneCount = 0
            self.__doneEvent = threading.Event()

    def DoneCount(self):
        '''
        Returns the number of finished workers for the active JobContext.
        Only used for logging purposes.
        '''
        return self.__doneCount

    def CheckBusy(self):
        '''
        Returns True if workers are still busy with the active JobContext.
        Should be called while holding the group lock, together with
        IncDoneCount, so the result matches the caller's own increment.
        '''
        return self.__doneCount < len(self.__workers)

    def IncDoneCount(self):
        '''
        Must be called when a worker finished the last workload of the active
        JobContext.
        '''
        self.__doneCount += 1

    def DoneEvent(self):
        '''
        Returns the Event belonging to the active JobContext. It is set once
        that JobContext has been finished. Should be read while holding the
        group lock.
        '''
        return self.__doneEvent

    def SetDoneEvent(self):
        '''
        Sets the done event of the currently active JobContext.
        Prefer setting the Event captured via DoneEvent().
        '''
        self.__doneEvent.set()

    def WaitDoneEvent(self):
        '''
        Blocks on the done event of the currently active JobContext.
        Prefer waiting on the Event captured via DoneEvent().
        '''
        self.__doneEvent.wait()

    def Workers(self):
        '''
        Returns the list of workers of this group.
        '''
        return self.__workers


class JobManager(Singleton, Destroyable):
    '''
    Distributes the workloads of enqueued JobContexts to groups of workers
    and notifies the registered visuals when jobs are registered and removed.
    '''

    DEFAULT_CTXGROUP_ID = "general"

    def __init__(self):
        Destroyable.__init__(self)
        self.__defaultVisual = LogVisualJobManager()
        self.__visuals = [self.__defaultVisual]

        self.__destroying = False
        self.__jobCtxGroups = {}

        self.__logger = logging.getLogger("JobManager")

    def AddVisual(self, visual):
        '''
        Registers a visual. The default (logging) visual is removed as soon
        as another visual is added.
        :param visual: an IVisualJobManager instance
        '''
        assert isinstance(visual, IVisualJobManager)
        if self.__defaultVisual in self.__visuals:
            self.__visuals.remove(self.__defaultVisual)

        if visual not in self.__visuals:
            self.__visuals.append(visual)

    def RemoveVisual(self, visual):
        '''
        Unregisters a visual. Falls back to the default visual when no visual
        is left.
        :param visual: a previously added visual
        '''
        if visual in self.__visuals:
            self.__visuals.remove(visual)

        if not self.__visuals:
            self.__visuals.append(self.__defaultVisual)

    def Init(self, workerCtxGroup=None, workerCount=None):
        '''
        Creates and starts the workers of a context group.
        :param workerCtxGroup: id of the group, defaults to DEFAULT_CTXGROUP_ID
        :param workerCount: number of workers, defaults to the CPU count
        :raises RuntimeError: if the group has already been initialized
        '''
        if workerCtxGroup is None:
            workerCtxGroup = JobManager.DEFAULT_CTXGROUP_ID
        if workerCount is None:
            workerCount = multiprocessing.cpu_count()

        if workerCtxGroup in self.__jobCtxGroups:
            raise RuntimeError("group already initialized")

        workers = []
        for i in range(workerCount):
            self.__logger.debug("creating worker for group %s", workerCtxGroup)
            workers.append(Worker(self, workerCtxGroup, i))

        # register the group before starting the workers; presumably the
        # workers look it up via _GetWorkLoad() right after start
        self.__jobCtxGroups[workerCtxGroup] = _JobCtxGroup(workers)

        for worker in workers:
            worker.start()

    def EnqueueContext(self, jobContext):
        '''
        Queues a JobContext in its context group and registers it with all
        visuals. Errors raised by a visual are logged and ignored.
        :param jobContext: the JobContext to process
        :raises RuntimeError: if the JobContext's group was not initialized
        '''
        groupId = jobContext.GetGroupId()
        jcGroup = self.__jobCtxGroups.get(groupId)
        if jcGroup is None:
            raise RuntimeError("job group %s not available" % groupId)

        self.__logger.debug("%s: register job", jobContext)

        jcGroup.Put(jobContext)

        for visual in self.__visuals:
            try:
                visual.RegisterJob(jobContext)
            except Exception:
                self.__logger.error("RegisterJob for visual <%s> failed",  # IGNORE:W0702
                                    visual, exc_info=1)

    def _GetWorkLoad(self, workerCtxGroup):
        '''
        Retrieves a workload of the given context group.

        If no JobContext is active, blocks (holding the group lock) until one
        is queued and could be started.
        :param workerCtxGroup: id of the context group of the calling worker
        :returns: tuple (active JobContext, workload)
        :raises WorkerAbortSignal: if the manager is being destroyed
        :raises queue.Empty: if the active JobContext has no more workloads;
            raised only after all workers of the group are done with it and
            it has been finalized
        '''
        jcGroup = self.__jobCtxGroups[workerCtxGroup]

        try:
            with jcGroup:
                while jcGroup.Active() is None:
                    jcIdle = jcGroup.Get()
                    if jcIdle is None:
                        # sentinel queued by Destroy()
                        raise WorkerAbortSignal()
                    if self.__StartCtx(jcIdle):
                        jcGroup.SetActive(jcIdle)

                if self.__destroying:
                    # if in destroying state raise queue.Empty() to enter
                    # the except section and get __FinishCtx() called
                    raise queue.Empty()
                jobCtxActive = jcGroup.Active()
                workLoad = jobCtxActive.GetWorkLoad()
                return jobCtxActive, workLoad  # FIXME: no tuple
        except queue.Empty:
            # no more workloads, job done, only __FinishCtx() needs to be done
            # wait for all workers to be done.
            # Counting, the busy check and capturing the done event happen
            # under one lock, so exactly one worker (the last) sees "not
            # busy". No worker can mix up the state of the next JobContext.
            with jcGroup:
                jcGroup.IncDoneCount()
                busy = jcGroup.CheckBusy()
                doneEvent = jcGroup.DoneEvent()

            if busy:
                self.__logger.debug("<%s> block until ready... %s",
                                    _CurrentThreadName(),
                                    jcGroup.DoneCount())
                doneEvent.wait()
                self.__logger.debug("<%s> block released continuing... %s",
                                    _CurrentThreadName(),
                                    jcGroup.DoneCount())
            else:
                try:
                    with jcGroup:
                        jobCtxActive = jcGroup.Active()
                        if jobCtxActive is not None:
                            jcGroup.SetActive(None)
                            self.__FinishCtx(jobCtxActive)
                finally:
                    # always release the waiting workers, even if finalizing
                    # failed, otherwise they would block forever
                    self.__logger.debug("<%s> set done... %s",
                                        _CurrentThreadName(),
                                        jcGroup.DoneCount())
                    doneEvent.set()

            if self.__destroying:
                raise WorkerAbortSignal()
            raise queue.Empty()

    def __StartCtx(self, ctx):
        '''
        Calls ctx._Begin().
        :returns: True if the JobContext was started, False if it raised
            (a JobAbortedException silently, any other error is logged and
            the context is aborted)
        '''
        self.__logger.debug("<%s> starting %s...",
                            _CurrentThreadName(), ctx.GetName())
        try:
            ctx._Begin()  # pylint: disable=protected-access
        except JobAbortedException:
            return False
        except Exception as exc:
            self.__logger.error("<%s> not started %s",  # IGNORE:W0702
                                _CurrentThreadName(), ctx.GetName(), exc_info=1)
            try:
                ctx.Abort("Error: %s" % exc)
            except Exception:
                self.__logger.error("<%s> error while aborting faulty started %s",  # IGNORE:W0702
                                    _CurrentThreadName(), ctx.GetName(), exc_info=1)
            return False

        self.__logger.debug("<%s> started %s",
                            _CurrentThreadName(), ctx.GetName())
        return True

    def __FinishCtx(self, ctx):
        '''
        Calls ctx._Done() and removes the JobContext from all visuals.
        Errors are logged and ignored.
        '''
        self.__logger.debug("<%s> finalizing %s...",
                            _CurrentThreadName(), ctx.GetName())
        try:
            ctx._Done()  # pylint: disable=protected-access
        except Exception:
            self.__logger.error("<%s> error %s",  # IGNORE:W0702
                                _CurrentThreadName(), ctx.GetName(), exc_info=1)
        finally:
            self.__logger.debug("<%s> finished %s",
                                _CurrentThreadName(), ctx.GetName())

        for visual in self.__visuals:
            try:
                visual.RemoveJob(ctx)
            except Exception:
                self.__logger.error("RemoveJob for visual <%s> failed",  # IGNORE:W0702
                                    visual, exc_info=1)

    def Destroy(self):
        '''
        Stops the workers of all context groups. Waits up to 3 seconds per
        worker and logs a warning for each worker still alive afterwards.
        '''
        self.__logger.debug("start destroying")
        self.__destroying = True
        for jcGroup in self.__jobCtxGroups.values():
            for _ in jcGroup.Workers():
                # put invalid jobs in idle queue to release the blocking state
                jcGroup.Put(None)

            for worker in jcGroup.Workers():
                self.__logger.debug("<%s> joining...", worker.name)
                worker.join(3)
                # Thread.isAlive() was removed in Python 3.9; is_alive() works
                # on all Python 3 versions
                if worker.is_alive():
                    self.__logger.warning("<%s> join failed", worker.name)
                else:
                    self.__logger.debug("<%s> joined!", worker.name)

        self.__logger.debug("destroyed")