1# process.py -- A very simple wrapper around QProcess
2#
3# Copyright (c) 2012 by Wilbert Berendsen
4#
5# This program is free software; you can redistribute it and/or
6# modify it under the terms of the GNU General Public License
7# as published by the Free Software Foundation; either version 2
8# of the License, or (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program; if not, write to the Free Software
17# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18# See http://www.gnu.org/licenses/ for more information.
19
20"""
21A (multithreaded) Job queue
22"""
23
24from enum import Enum
25import collections
26import time
27
28from PyQt5.QtCore import QObject
29
30import app
31import signals
32
33
34class RunnerBusyException(Exception):
35    """Raised when a Runner is asked to start a job (without the force=True
36    keyword argument) while having already a running job."""
37    pass
38
39
40class Runner(QObject):
41    """A Runner in the stack of a JobQueue.
42
43    Responsible for executing a single job.Job instance.
44    Receives a job through the start command from the queue, reports
45    back about completion of the job. Any further actions are
46    managed by the queue.
47
48    References are kept to the queue and the job as well as the index
49    in the queue.
50    """
51
52    def __init__(self, queue, index):
53        super(Runner, self).__init__()
54        self._queue = queue
55        self._index = index
56        self._job = None
57        self._completed = 0
58
59    def abort(self):
60        """Aborts a running job if any."""
61        if self._job and self._job.is_running():
62            self._job.abort()
63
64    def completed(self):
65        """Return the number of jobs completed during the Runner's lifetime."""
66        return self._completed
67
68    def index(self):
69        """Return the index of the Runner in the JobQueue."""
70        return self._index
71
72    def is_running(self):
73        return self._job and self._job.is_running()
74
75    def job(self):
76        return self._job
77
78    def job_done(self):
79        """Count job, notify queue, remove reference to Job object."""
80        self._completed += 1
81        job = self._job
82        self._job = None
83        self._queue.job_completed(self, job)
84
85    def start(self, j, force=False):
86        """Start a given job.
87        If currently a job is running either abort that
88        or raise an exception."""
89        if self._job and self.is_running():
90            if force:
91                self.abort()
92            else:
93                raise RunnerBusyException(
94                    _("Job is already running. Wait for completion."))
95        self._job = j
96        j.set_runner(self)
97        j.done.connect(self.job_done)
98        j.start()
99
100
101class AbstractQueue(QObject):
102    """Common interface for the different queue types used in the JobQueue.
103    The various queue classes are very lightweight wrappers around the
104    corresponding concepts and base objects, with the only reason to
105    provide a transparent interface for used in JobQueue."""
106
107    def __init__(self):
108        super(AbstractQueue, self).__init__()
109
110    def clear(self):
111        """Remove all entries from the queue."""
112        raise NotImplementedError
113
114    def empty(self):
115        """Return True if there are no queued items."""
116        return self.length() == 0
117
118    def length(self):
119        """Return the length of the queue. Only has to be overridden
120        when the data structure doesn't support len()."""
121        return len(self._queue)
122
123    def push(self, j):
124        """Add a job to the queue."""
125        raise NotImplementedError
126
127    def pop(self):
128        """Remove and return the next job."""
129        raise NotImplementedError
130
131
132class AbstractStackQueue(AbstractQueue):
133    """Common ancestor for LIFO and FIFO queues"""
134
135    def __init__(self):
136        super(AbstractStackQueue, self).__init__()
137        self._queue = collections.deque()
138
139    def clear(self):
140        """Remove all entries from the queue."""
141        self._queue.clear()
142
143    def pop(self):
144        return self._queue.pop()
145
146
147class Queue(AbstractStackQueue):
148    """First-in-first-out queue (default operation)."""
149
150    def push(self, j):
151        self._queue.appendleft(j)
152
153
154class Stack(AbstractStackQueue):
155    """Last-in-first-out queue, or stack."""
156
157    def push(self, j):
158        self._queue.append(j)
159
160
161class PriorityQueue(AbstractQueue):
162    """Priority queue, always popping the job with the highest priority.
163
164    Uses Job's priority() property (which defaults to 1) and a transparent
165    insert count to determine order of popping jobs. If jobs have the same
166    priority they will be served first-in-first-out."""
167
168    def __init__(self):
169        super(PriorityQueue, self).__init__()
170        self._queue = []
171        self._insert_count = 0
172
173    def clear(self):
174        """Remove all entries from the queue."""
175        self._queue = []
176
177    def push(self, j):
178        """Add a job to the queue. retrieve the priority from the job,
179        add an autoincrement value for comparing jobs with identical
180        priority."""
181        from heapq import heappush
182        heappush(self._queue, (j.priority(), self._insert_count, j))
183        self._insert_count += 1
184
185    def pop(self):
186        """Return the correct part of the tuplet
187        (1st: priority, 2nd: insert order)."""
188        from heapq import heappop
189        return heappop(self._queue)[2]
190
191
192class JobQueueException(Exception):
193    """Abstract base exception for JobQueue related exceptions."""
194    pass
195
196
197class JobQueueStateException(JobQueueException):
198    """Raised when an operation is not allowed in the current
199    state of the queue."""
200    pass
201
202
203class QueueStatus(Enum):
204    INACTIVE = 0
205    STARTED = 1
206    PAUSED = 2
207    EMPTY = 3
208    IDLE = 4
209    FINISHED = 5
210    ABORTED = 6
211
212
213class QueueMode(Enum):
214    """Running mode of the JobQueue.
215    CONTINUOUS means that the queue can be idle, waiting for new jobs,
216    SINGLE means that it will be considered finished when running empty."""
217    CONTINUOUS = 0
218    SINGLE = 1
219
220
221class JobQueue(QObject):
222    """A (multiThreaded) Job Queue.
223
224    Manages a given number of "runners" which can process one job
225    at a time each.
226
227    An arbitrary number of job.Job() instances (or any descendants)
228    can be added to the queue, which are distributed in parallel
229    to the runners.
230
231    A special case is a queue with only one runner. This can be used
232    to ensure that asynchronous jobs can run in sequence, either to
233    ensure that only one such job runs at a time (e.g. to force
234    long-running stuff to the background) or to ensure subsequent
235    jobs can use the results of earlier ones.
236
237    The queue supports pause(), resume() and abort() operations.
238    It iterates through the states
239    QueueStatus.INACTIVE
240                STARTED
241                PAUSED (remaining jobs won't be started)
242                EMPTY (no *new* jobs are queued)
243                IDLE (no new jobs available, all jobs completed)
244                FINISHED
245                ABORTED
246
247    A Queue can work in two modes: QueueMode.CONTINUOUS (default) and
248    QueueMode.SINGLE. In single mode the queue is considered finished once
249    there are no jobs available while a continuous queue switches to
250    IDLE in that case. A CONTINUOUS queue is always considered to be
251    active (idle when empty) and doesn't have to be explicitly started.
252
253    If a 'capacity' is passed to the queue it has the notion of "full",
254    otherwise an unlimited number of jobs can be enqueued.
255
256    By default an internal FIFO (First in, first out) Queue is used
257    as the underlying data structure, but Stack and PriorityQueue are
258    available through the keyword command as well.
259    """
260
261    started = signals.Signal()
262    paused = signals.Signal()
263    resumed = signals.Signal()
264    emptied = signals.Signal()  # emitted when last job is popped
265    idle = signals.Signal()  # emitted when waiting for new jobs
266                             # after the last job has been completed
267    finished = signals.Signal()
268    aborted = signals.Signal()
269    # The following three signals emit the corresponding job as argument.
270    job_added = signals.Signal()
271    job_done = signals.Signal()  # emitted when a job has been completed.
272               # When this is emitted, the queue's state has been
273               # updated (other than with the *job's* signal)
274    job_started = signals.Signal()
275
276    def __init__(self,
277                 queue_class=Queue,
278                 queue_mode=QueueMode.CONTINUOUS,
279                 num_runners=1,
280                 tick_interval=1000,
281                 capacity=None):
282        super(JobQueue, self).__init__()
283        self._state = QueueStatus.INACTIVE
284        self._queue_mode = queue_mode
285        self._starttime = None
286        self._endtime = None
287        self._completed = 0
288        self._queue = queue_class()
289        self._capacity = capacity
290        self._runners = [Runner(self, i) for i in range(num_runners)]
291
292        if queue_mode == QueueMode.CONTINUOUS:
293            self.start()
294
295    def abort(self, force=True):
296        """Abort the execution of the queued jobs.
297        If force=True running jobs are aborted, otherwise
298        only the queue is cleared, allowing running jobs to finish."""
299        if self.state() in [
300            QueueStatus.FINISHED,
301            QueueStatus.ABORTED
302        ]:
303            raise JobQueueStateException(
304                _("Inactive Job Queue can't be aborted")
305            )
306        self.set_state(QueueStatus.ABORTED)
307        self.set_queue_mode(QueueMode.SINGLE)
308        self._queue.clear()
309        if force:
310            for runner in self._runners:
311                if runner:
312                    # ignore runners that have already been set to None
313                    runner.abort()
314        self.aborted.emit()
315
316    def add_job(self, job):
317        """Enqueue a new job to the queue.
318
319        Some checks are made to determine the validity of adding a new job.
320        If the queue hasn't started yet or is in pause the job is simply
321        pushed to the queue, otherwise it will be determined whether an
322        idle runner is available to start with the job immediately.
323        """
324        if self.full():
325            raise IndexError(_("Job Queue full"))
326        if self.state() in [QueueStatus.FINISHED, QueueStatus.ABORTED]:
327            raise JobQueueStateException(
328                _("Can't add job to finished/aborted queue.")
329            )
330        elif self.state() in [QueueStatus.INACTIVE, QueueStatus.PAUSED]:
331            self._queue.push(job)
332            self.job_added.emit(job)
333        else:
334            runner = self.idle_runner()
335            if runner:
336                self.job_added.emit(job)
337                runner.start(job)
338                self.job_started.emit(job)
339            else:
340                self._queue.push(job)
341                self.job_added.emit(job)
342            self.set_state(
343                QueueStatus.EMPTY if self._queue.empty()
344                else QueueStatus.STARTED)
345
346    def completed(self, runner=-1):
347        """Return the number of completed jobs,
348        either for a given runner or the sum of all runners."""
349        if runner >= 0:
350            return self._runners[runner].completed()
351        else:
352            result = 0
353            for i in range(len(self._runners)):
354                result += self._runners[i].completed()
355            return result
356
357    def full(self):
358        """Returns True if a maximum capacity is set and used."""
359        if not self._capacity:
360            return False
361        return self._queue.length() == self._capacity
362
363    def is_idle(self):
364        """Returns True if all Runners are idle."""
365        for runner in self._runners:
366            if runner.is_running():
367                return False
368        return True
369
370    def is_running(self):
371        """Return True if the queue is 'live'."""
372        return self.state() not in ([
373            QueueStatus.INACTIVE,
374            QueueStatus.FINISHED,
375            QueueStatus.ABORTED
376        ])
377
378    def idle_runner(self):
379        """Returns the first idle Runner object,
380        or None if all are busy."""
381        for runner in self._runners:
382            if not runner.is_running():
383                return runner
384        return None
385
386    def job_completed(self, runner, job):
387        """Called by a runner once its job has completed.
388
389        Manage behaviour at that point, depending on the
390        queue's state and mode.
391        """
392        if self.state() == QueueStatus.STARTED:
393            runner.start(self.pop())
394        elif self.state() == QueueStatus.PAUSED:
395            # If a SINGLE queue completes the last job while in PAUSE mode
396            # it can be considered finished.
397            if (
398                self._queue.empty()
399                and self.is_idle()
400                and self.queue_mode() == QueueMode.SINGLE
401            ):
402                self.queue_finished()
403        elif self.is_idle():
404            # last runner has completed its job and queue is empty.
405            # Either finish queue or set to IDLE.
406            if self.queue_mode() == QueueMode.SINGLE:
407                self.queue_finished()
408            else:
409                self.set_state(QueueStatus.IDLE)
410                self.idle.emit()
411        self.job_done.emit(job)
412
413    def pause(self):
414        """Pauses the execution of the queue.
415        Running jobs are allowed to finish, but no new jobs will be started.
416        (The actual behaviour is implemented in job_completed().)"""
417        if self.state() in [
418            QueueStatus.INACTIVE,
419            QueueStatus.FINISHED,
420            QueueStatus.ABORTED
421        ]:
422            raise JobQueueStateException(
423                _("Non-running Job Queue can't be paused.")
424            )
425        self.set_state(QueueStatus.PAUSED)
426        self.paused.emit()
427
428    def pop(self):
429        """Return and remove the next Job.
430        Raises Exception if empty."""
431        if self.state() == QueueStatus.EMPTY:
432            raise IndexError("Job Queue is empty.")
433        if self.state() != QueueStatus.STARTED:
434            raise JobQueueStateException(
435                _("Can't pop job from non-started Job Queue")
436            )
437        j = self._queue.pop()
438        if self._queue.empty():
439            self.set_state(QueueStatus.EMPTY)
440            self.emptied.emit()
441        return j
442
443    def queue_finished(self):
444        """Called when the last job has been completed and the queue
445        is in SINGLE mode."""
446        if self.state() != QueueStatus.ABORTED:
447            self.set_state(QueueStatus.FINISHED)
448        self.finished.emit()
449
450    def queue_mode(self):
451        return self._queue_mode
452
453    def resume(self):
454        """Resume the queue from PAUSED state by trying to start
455        all runners."""
456        if not self.state() == QueueStatus.PAUSED:
457            raise JobQueueStateException(
458                _("Job Queue not paused, can't resume.")
459            )
460        self._start()
461        self.resumed.emit()
462
463    def set_idle(self):
464        """Set status to IDLE if all runners are in idle mode."""
465        for runner in self._runners:
466            if runner.is_running():
467                return
468        self.set_state(QueueStatus.IDLE)
469        self.idle.emit()
470
471    def set_queue_mode(self, mode):
472        self._queue_mode = mode
473
474    def size(self):
475        """Return the number of unstarted jobs."""
476        return self._queue.length()
477
478    def _start(self):
479        """Set the state to started and ask all runners to start."""
480        if self.state() in [QueueStatus.FINISHED, QueueStatus.ABORTED]:
481            raise JobQueueStateException(
482                _("Can't (re)start a finished/aborted Job Queue.")
483            )
484        elif self.state() == QueueStatus.STARTED:
485            raise JobQueueStateException(_("Queue already started."))
486        if self.state() in [
487            QueueStatus.INACTIVE,
488            QueueStatus.PAUSED
489        ]:
490            self.set_state(QueueStatus.STARTED)
491
492        if (
493            self.state() == QueueStatus.EMPTY
494            and self.queue_mode() == QueueMode.SINGLE
495        ):
496            raise IndexError(_("Can't start SINGLE-mode empty queue"))
497        if self._queue.empty():
498            self.set_state(QueueStatus.IDLE)
499        else:
500            self.set_state(QueueStatus.STARTED)
501            for runner in self._runners:
502                if self._queue.empty():
503                    break
504                if not runner.is_running():
505                    runner.start(self.pop())
506
507    def start(self):
508        """Start processing of the queue."""
509        if self.is_running():
510            raise JobQueueStateException(
511                _("Can't 'start' an active Job Queue."))
512        self._starttime = time.time()
513        self._start()
514        self.started.emit()
515
516    def set_state(self, state):
517        self._state = state
518
519    def state(self):
520        return self._state
521
522
523class GlobalJobQueue(QObject):
524    """The application-wide Job Queue that dispatches jobs to runners
525    and subordinate queues.
526    """
527
528    def __init__(self):
529        self.load_settings()
530        self._crawler = JobQueue()
531        self._engraver = JobQueue()
532        self._generic = JobQueue()
533        self._queues = {
534            'crawl': self._crawler,
535            'engrave': self._engraver,
536            'generic': self._generic
537        }
538        app.settingsChanged.connect(self.settings_changed)
539        app.aboutToQuit.connect(self.about_to_quit)
540
541    def about_to_quit(self):
542        # TODO:
543        # - is any queue active?
544        # - should jobs be aborted?
545        # - or only the queues be emptied?
546        # - _crawer can always be aborted immediately
547        pass
548
549    def add_job(self, j, target='engrave'):
550        """Add a job to the specified job queue."""
551        target_queue = self._queues.get(target, None)
552        if not target_queue:
553            raise ValueError(_("Invalid job queue target: {name}").format(name=target))
554        target_queue.add_job(j)
555
556    def load_settings(self):
557        # TODO: Load settings and create the JobQueues accordingly
558        pass
559
560    def settings_changed(self):
561        # TODO: If multicore-related settings have changed update the queues
562        pass
563