1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2018 Jens Goepfert
4#
5
6import logging
7import multiprocessing
8import queue
9import threading
10
11from photofilmstrip.lib.common.Singleton import Singleton
12from photofilmstrip.lib.DestructionManager import Destroyable
13
14from .IVisualJobManager import IVisualJobManager
15from .LogVisualJobManager import LogVisualJobManager
16from .Worker import Worker, WorkerAbortSignal
17from .JobAbortedException import JobAbortedException
18
19
class _JobCtxGroup:
    '''
    Keeps track of the JobContext that is currently being processed by a
    group of workers and queues the JobContexts still waiting for their turn.

    The object can be used in a "with" statement: entering acquires an
    internal lock, leaving releases it again.
    '''

    def __init__(self, workers):
        # the workers that are working for this context group
        self.__members = workers

        # lock that is held while the group is used as context manager
        self.__mutex = threading.Lock()

        # JobContexts that are waiting to become the active one
        self.__waiting = queue.Queue()

        # the JobContext that is currently active (None if there is none)
        self.__current = None

        # number of workers that have finished the active JobContext
        self.__finished = 0

        # set as soon as the active JobContext has finished
        self.__allDone = threading.Event()

    def Put(self, jobContext):
        '''
        Appends a JobContext to the queue of waiting JobContexts.
        :param jobContext: the JobContext to enqueue
        '''
        self.__waiting.put(jobContext)

    def Get(self):
        '''
        Takes the next JobContext from the queue and returns it. The call
        blocks as long as the queue is empty.
        '''
        nextCtx = self.__waiting.get()
        return nextCtx

    def __enter__(self):
        self.__mutex.acquire()
        return self

    def __exit__(self, typ, value, traceback):
        self.__mutex.release()

    def Active(self):
        '''
        Returns the JobContext that is active at the moment.
        '''
        return self.__current

    def SetActive(self, jobContext):
        '''
        Makes the given JobContext the active one. Unless None is given, the
        counter of finished workers is reset to zero and the doneEvent is
        cleared.
        :param jobContext: the JobContext to activate or None
        '''
        self.__current = jobContext
        if jobContext is None:
            # counter and event keep their state when the group gets idle
            return
        self.__finished = 0
        self.__allDone.clear()

    def DoneCount(self):
        '''
        Returns how many workers have finished the active JobContext.
        Only used for logging purposes.
        '''
        return self.__finished

    def CheckBusy(self):
        '''
        Returns True as long as fewer workers have finished than the group
        has workers, i.e. the active JobContext is still being worked on.
        '''
        return len(self.__members) > self.__finished

    def IncDoneCount(self):
        '''
        To be called when a worker has finished the last workload of the
        active JobContext.
        '''
        self.__finished = self.__finished + 1

    def SetDoneEvent(self):
        '''
        To be called when the last worker has finished the last workload of
        the active JobContext.
        '''
        self.__allDone.set()

    def WaitDoneEvent(self):
        '''
        Blocks the calling (already finished) worker until the doneEvent
        has been set.
        '''
        self.__allDone.wait()

    def Workers(self):
        '''
        Returns the workers that were passed to the constructor.
        '''
        return self.__members
116
117
class JobManager(Singleton, Destroyable):
    '''
    Hands enqueued JobContexts over to groups of workers and informs the
    registered visuals when a job is registered or removed.

    Every context group owns a fixed set of workers (see Init()). Workers
    fetch their workloads through _GetWorkLoad().

    NOTE(review): the workers are used through the threading.Thread API
    (start, join, name, is_alive); this assumes Worker is a subclass of
    threading.Thread - confirm against Worker.
    '''

    DEFAULT_CTXGROUP_ID = "general"

    def __init__(self):
        Destroyable.__init__(self)
        # fallback visual that is used while no other visual is registered
        self.__defaultVisual = LogVisualJobManager()
        self.__visuals = [self.__defaultVisual]

        # set by Destroy(); makes _GetWorkLoad() raise WorkerAbortSignal
        self.__destroying = False
        # maps a context group id to its _JobCtxGroup
        self.__jobCtxGroups = {}

        self.__logger = logging.getLogger("JobManager")

    def AddVisual(self, visual):
        '''
        Registers a visual. The default visual is dropped as soon as a
        visual is added.
        :param visual: an IVisualJobManager instance
        '''
        assert isinstance(visual, IVisualJobManager)
        if self.__defaultVisual in self.__visuals:
            self.__visuals.remove(self.__defaultVisual)

        if visual not in self.__visuals:
            self.__visuals.append(visual)

    def RemoveVisual(self, visual):
        '''
        Unregisters a visual. If no visual is left the default visual is
        installed again.
        :param visual: the visual to remove
        '''
        if visual in self.__visuals:
            self.__visuals.remove(visual)

        if not self.__visuals:
            self.__visuals.append(self.__defaultVisual)

    def Init(self, workerCtxGroup=None, workerCount=None):
        '''
        Creates a context group with its workers and starts the workers.
        :param workerCtxGroup: id of the group, defaults to
                               DEFAULT_CTXGROUP_ID
        :param workerCount: number of workers, defaults to
                            multiprocessing.cpu_count()
        :raises RuntimeError: if the group has already been initialized
        '''
        if workerCtxGroup is None:
            workerCtxGroup = JobManager.DEFAULT_CTXGROUP_ID
        if workerCount is None:
            workerCount = multiprocessing.cpu_count()

        if workerCtxGroup in self.__jobCtxGroups:
            raise RuntimeError("group already initialized")

        workers = []
        for workerIdx in range(workerCount):
            self.__logger.debug("creating worker for group %s", workerCtxGroup)
            workers.append(Worker(self, workerCtxGroup, workerIdx))

        # the group must be registered before the workers are started
        # because _GetWorkLoad() looks the group up by its id
        jcGroup = _JobCtxGroup(workers)
        self.__jobCtxGroups[workerCtxGroup] = jcGroup

        for worker in workers:
            worker.start()

    def EnqueueContext(self, jobContext):
        '''
        Puts a JobContext in the queue of its context group and registers
        it at all visuals. A failing visual is logged and does not affect
        the other visuals.
        :param jobContext: the JobContext to process
        :raises RuntimeError: if the group of the JobContext has not been
                              initialized
        '''
        groupId = jobContext.GetGroupId()
        if groupId not in self.__jobCtxGroups:
            raise RuntimeError("job group %s not available" % groupId)

        self.__logger.debug("%s: register job", jobContext)

        jcGroup = self.__jobCtxGroups[groupId]
        jcGroup.Put(jobContext)

        for visual in self.__visuals:
            try:
                visual.RegisterJob(jobContext)
            except Exception:
                self.__logger.error("RegisterJob for visual <%s> failed",  # IGNORE:W0702
                    visual, exc_info=1)

    def _GetWorkLoad(self, workerCtxGroup):
        '''
        Retrieves a workload of the given context group.

        If the group has no active JobContext, queued JobContexts are
        fetched (blocking, while the group lock is held) until one could
        be started.

        :param workerCtxGroup: id of the context group of the calling worker
        :return: tuple of the active JobContext and its next workload
        :raises KeyError: if the context group is unknown
        :raises WorkerAbortSignal: if a None entry was fetched from the
                                   queue or the manager is being destroyed
        :raises queue.Empty: if the active JobContext has no more workloads
                             (raised after the finish handling is done)
        '''
        jcGroup = self.__jobCtxGroups[workerCtxGroup]

        try:
            with jcGroup:
                while jcGroup.Active() is None:
                    jcIdle = jcGroup.Get()
                    if jcIdle is None:
                        # Destroy() puts None in the queue to release
                        # workers blocking in Get()
                        raise WorkerAbortSignal()
                    if self.__StartCtx(jcIdle):
                        jcGroup.SetActive(jcIdle)

                if self.__destroying:
                    # if in destroying state raise Queue.Empty() to enter
                    # the except section and get FinishCtx() called
                    raise queue.Empty()
                jobCtxActive = jcGroup.Active()
                workLoad = jobCtxActive.GetWorkLoad()
                return jobCtxActive, workLoad  # FIXME: no tuple
        except queue.Empty:
            # no more workloads, job done, only __FinishCtx() needs to be done
            # wait for all workers to be done
            threadName = threading.current_thread().name
            with jcGroup:
                jcGroup.IncDoneCount()

            if jcGroup.CheckBusy():
                # other workers are still busy with the active JobContext
                self.__logger.debug("<%s> block until ready... %s",
                                    threadName,
                                    jcGroup.DoneCount())
                jcGroup.WaitDoneEvent()
                self.__logger.debug("<%s> block released continuing... %s",
                                    threadName,
                                    jcGroup.DoneCount())
            else:
                with jcGroup:
                    jobCtxActive = jcGroup.Active()
                    # may already be None if another worker finished it
                    if jobCtxActive is not None:
                        jcGroup.SetActive(None)
                        self.__FinishCtx(jobCtxActive)

                self.__logger.debug("<%s> set done... %s",
                                    threadName,
                                    jcGroup.DoneCount())
                jcGroup.SetDoneEvent()

            if self.__destroying:
                raise WorkerAbortSignal()
            else:
                raise queue.Empty()

    def __StartCtx(self, ctx):
        '''
        Calls _Begin() of the given JobContext.
        :param ctx: the JobContext to start
        :return: True if _Begin() succeeded. False if it raised
                 JobAbortedException or any other Exception; in the latter
                 case the JobContext gets aborted with the error message.
        '''
        threadName = threading.current_thread().name
        self.__logger.debug("<%s> starting %s...",
                            threadName, ctx.GetName())
        try:
            ctx._Begin()  # pylint: disable=protected-access
        except JobAbortedException:
            return False
        except Exception as exc:
            self.__logger.error("<%s> not started %s",  # IGNORE:W0702
                                threadName, ctx.GetName(), exc_info=1)
            try:
                ctx.Abort("Error: %s" % exc)
            except:
                # deliberately broad: a failing Abort() is only logged
                self.__logger.error("<%s> error while aborting faulty started %s",  # IGNORE:W0702
                                    threadName, ctx.GetName(), exc_info=1)
            return False

        self.__logger.debug("<%s> started %s",
                            threadName, ctx.GetName())
        return True

    def __FinishCtx(self, ctx):
        '''
        Calls _Done() of the given JobContext and removes the job from all
        visuals. Errors are logged and not propagated.
        :param ctx: the JobContext to finish
        '''
        threadName = threading.current_thread().name
        self.__logger.debug("<%s> finalizing %s...",
                            threadName, ctx.GetName())
        try:
            ctx._Done()  # pylint: disable=protected-access
        except:
            # deliberately broad: _GetWorkLoad() calls SetDoneEvent() after
            # this method, which would be skipped if an error escaped
            self.__logger.error("<%s> error %s",  # IGNORE:W0702
                                threadName, ctx.GetName(), exc_info=1)
        finally:
            self.__logger.debug("<%s> finished %s",
                                threadName, ctx.GetName())

        for visual in self.__visuals:
            try:
                visual.RemoveJob(ctx)
            except Exception:
                self.__logger.error("RemoveJob for visual <%s> failed",  # IGNORE:W0702
                    visual, exc_info=1)

    def Destroy(self):
        '''
        Switches to destroying state, releases the workers of all groups
        and joins every worker with a timeout of 3 seconds. Workers that
        are still alive afterwards are only reported with a warning.
        '''
        self.__logger.debug("start destroying")
        self.__destroying = True
        for jcGroup in self.__jobCtxGroups.values():
            workers = jcGroup.Workers()
            for _ in workers:
                # put invalid jobs in idle queue to release the blocking state
                jcGroup.Put(None)

            for worker in workers:
                self.__logger.debug("<%s> joining...", worker.name)
                worker.join(3)
                # Thread.isAlive() was removed in Python 3.9
                if worker.is_alive():
                    self.__logger.warning("<%s> join failed", worker.name)
                else:
                    self.__logger.debug("<%s> joined!", worker.name)

        self.__logger.debug("destroyed")
298
299