1# process.py -- A very simple wrapper around QProcess 2# 3# Copyright (c) 2012 by Wilbert Berendsen 4# 5# This program is free software; you can redistribute it and/or 6# modify it under the terms of the GNU General Public License 7# as published by the Free Software Foundation; either version 2 8# of the License, or (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program; if not, write to the Free Software 17# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 18# See http://www.gnu.org/licenses/ for more information. 19 20""" 21A (multithreaded) Job queue 22""" 23 24from enum import Enum 25import collections 26import time 27 28from PyQt5.QtCore import QObject 29 30import app 31import signals 32 33 34class RunnerBusyException(Exception): 35 """Raised when a Runner is asked to start a job (without the force=True 36 keyword argument) while having already a running job.""" 37 pass 38 39 40class Runner(QObject): 41 """A Runner in the stack of a JobQueue. 42 43 Responsible for executing a single job.Job instance. 44 Receives a job through the start command from the queue, reports 45 back about completion of the job. Any further actions are 46 managed by the queue. 47 48 References are kept to the queue and the job as well as the index 49 in the queue. 50 """ 51 52 def __init__(self, queue, index): 53 super(Runner, self).__init__() 54 self._queue = queue 55 self._index = index 56 self._job = None 57 self._completed = 0 58 59 def abort(self): 60 """Aborts a running job if any.""" 61 if self._job and self._job.is_running(): 62 self._job.abort() 63 64 def completed(self): 65 """Return the number of jobs completed during the Runner's lifetime.""" 66 return self._completed 67 68 def index(self): 69 """Return the index of the Runner in the JobQueue.""" 70 return self._index 71 72 def is_running(self): 73 return self._job and self._job.is_running() 74 75 def job(self): 76 return self._job 77 78 def job_done(self): 79 """Count job, notify queue, remove reference to Job object.""" 80 self._completed += 1 81 job = self._job 82 self._job = None 83 self._queue.job_completed(self, job) 84 85 def start(self, j, force=False): 86 """Start a given job. 87 If currently a job is running either abort that 88 or raise an exception.""" 89 if self._job and self.is_running(): 90 if force: 91 self.abort() 92 else: 93 raise RunnerBusyException( 94 _("Job is already running. Wait for completion.")) 95 self._job = j 96 j.set_runner(self) 97 j.done.connect(self.job_done) 98 j.start() 99 100 101class AbstractQueue(QObject): 102 """Common interface for the different queue types used in the JobQueue. 103 The various queue classes are very lightweight wrappers around the 104 corresponding concepts and base objects, with the only reason to 105 provide a transparent interface for used in JobQueue.""" 106 107 def __init__(self): 108 super(AbstractQueue, self).__init__() 109 110 def clear(self): 111 """Remove all entries from the queue.""" 112 raise NotImplementedError 113 114 def empty(self): 115 """Return True if there are no queued items.""" 116 return self.length() == 0 117 118 def length(self): 119 """Return the length of the queue. Only has to be overridden 120 when the data structure doesn't support len().""" 121 return len(self._queue) 122 123 def push(self, j): 124 """Add a job to the queue.""" 125 raise NotImplementedError 126 127 def pop(self): 128 """Remove and return the next job.""" 129 raise NotImplementedError 130 131 132class AbstractStackQueue(AbstractQueue): 133 """Common ancestor for LIFO and FIFO queues""" 134 135 def __init__(self): 136 super(AbstractStackQueue, self).__init__() 137 self._queue = collections.deque() 138 139 def clear(self): 140 """Remove all entries from the queue.""" 141 self._queue.clear() 142 143 def pop(self): 144 return self._queue.pop() 145 146 147class Queue(AbstractStackQueue): 148 """First-in-first-out queue (default operation).""" 149 150 def push(self, j): 151 self._queue.appendleft(j) 152 153 154class Stack(AbstractStackQueue): 155 """Last-in-first-out queue, or stack.""" 156 157 def push(self, j): 158 self._queue.append(j) 159 160 161class PriorityQueue(AbstractQueue): 162 """Priority queue, always popping the job with the highest priority. 163 164 Uses Job's priority() property (which defaults to 1) and a transparent 165 insert count to determine order of popping jobs. If jobs have the same 166 priority they will be served first-in-first-out.""" 167 168 def __init__(self): 169 super(PriorityQueue, self).__init__() 170 self._queue = [] 171 self._insert_count = 0 172 173 def clear(self): 174 """Remove all entries from the queue.""" 175 self._queue = [] 176 177 def push(self, j): 178 """Add a job to the queue. retrieve the priority from the job, 179 add an autoincrement value for comparing jobs with identical 180 priority.""" 181 from heapq import heappush 182 heappush(self._queue, (j.priority(), self._insert_count, j)) 183 self._insert_count += 1 184 185 def pop(self): 186 """Return the correct part of the tuplet 187 (1st: priority, 2nd: insert order).""" 188 from heapq import heappop 189 return heappop(self._queue)[2] 190 191 192class JobQueueException(Exception): 193 """Abstract base exception for JobQueue related exceptions.""" 194 pass 195 196 197class JobQueueStateException(JobQueueException): 198 """Raised when an operation is not allowed in the current 199 state of the queue.""" 200 pass 201 202 203class QueueStatus(Enum): 204 INACTIVE = 0 205 STARTED = 1 206 PAUSED = 2 207 EMPTY = 3 208 IDLE = 4 209 FINISHED = 5 210 ABORTED = 6 211 212 213class QueueMode(Enum): 214 """Running mode of the JobQueue. 215 CONTINUOUS means that the queue can be idle, waiting for new jobs, 216 SINGLE means that it will be considered finished when running empty.""" 217 CONTINUOUS = 0 218 SINGLE = 1 219 220 221class JobQueue(QObject): 222 """A (multiThreaded) Job Queue. 223 224 Manages a given number of "runners" which can process one job 225 at a time each. 226 227 An arbitrary number of job.Job() instances (or any descendants) 228 can be added to the queue, which are distributed in parallel 229 to the runners. 230 231 A special case is a queue with only one runner. This can be used 232 to ensure that asynchronous jobs can run in sequence, either to 233 ensure that only one such job runs at a time (e.g. to force 234 long-running stuff to the background) or to ensure subsequent 235 jobs can use the results of earlier ones. 236 237 The queue supports pause(), resume() and abort() operations. 238 It iterates through the states 239 QueueStatus.INACTIVE 240 STARTED 241 PAUSED (remaining jobs won't be started) 242 EMPTY (no *new* jobs are queued) 243 IDLE (no new jobs available, all jobs completed) 244 FINISHED 245 ABORTED 246 247 A Queue can work in two modes: QueueMode.CONTINUOUS (default) and 248 QueueMode.SINGLE. In single mode the queue is considered finished once 249 there are no jobs available while a continuous queue switches to 250 IDLE in that case. A CONTINUOUS queue is always considered to be 251 active (idle when empty) and doesn't have to be explicitly started. 252 253 If a 'capacity' is passed to the queue it has the notion of "full", 254 otherwise an unlimited number of jobs can be enqueued. 255 256 By default an internal FIFO (First in, first out) Queue is used 257 as the underlying data structure, but Stack and PriorityQueue are 258 available through the keyword command as well. 259 """ 260 261 started = signals.Signal() 262 paused = signals.Signal() 263 resumed = signals.Signal() 264 emptied = signals.Signal() # emitted when last job is popped 265 idle = signals.Signal() # emitted when waiting for new jobs 266 # after the last job has been completed 267 finished = signals.Signal() 268 aborted = signals.Signal() 269 # The following three signals emit the corresponding job as argument. 270 job_added = signals.Signal() 271 job_done = signals.Signal() # emitted when a job has been completed. 272 # When this is emitted, the queue's state has been 273 # updated (other than with the *job's* signal) 274 job_started = signals.Signal() 275 276 def __init__(self, 277 queue_class=Queue, 278 queue_mode=QueueMode.CONTINUOUS, 279 num_runners=1, 280 tick_interval=1000, 281 capacity=None): 282 super(JobQueue, self).__init__() 283 self._state = QueueStatus.INACTIVE 284 self._queue_mode = queue_mode 285 self._starttime = None 286 self._endtime = None 287 self._completed = 0 288 self._queue = queue_class() 289 self._capacity = capacity 290 self._runners = [Runner(self, i) for i in range(num_runners)] 291 292 if queue_mode == QueueMode.CONTINUOUS: 293 self.start() 294 295 def abort(self, force=True): 296 """Abort the execution of the queued jobs. 297 If force=True running jobs are aborted, otherwise 298 only the queue is cleared, allowing running jobs to finish.""" 299 if self.state() in [ 300 QueueStatus.FINISHED, 301 QueueStatus.ABORTED 302 ]: 303 raise JobQueueStateException( 304 _("Inactive Job Queue can't be aborted") 305 ) 306 self.set_state(QueueStatus.ABORTED) 307 self.set_queue_mode(QueueMode.SINGLE) 308 self._queue.clear() 309 if force: 310 for runner in self._runners: 311 if runner: 312 # ignore runners that have already been set to None 313 runner.abort() 314 self.aborted.emit() 315 316 def add_job(self, job): 317 """Enqueue a new job to the queue. 318 319 Some checks are made to determine the validity of adding a new job. 320 If the queue hasn't started yet or is in pause the job is simply 321 pushed to the queue, otherwise it will be determined whether an 322 idle runner is available to start with the job immediately. 323 """ 324 if self.full(): 325 raise IndexError(_("Job Queue full")) 326 if self.state() in [QueueStatus.FINISHED, QueueStatus.ABORTED]: 327 raise JobQueueStateException( 328 _("Can't add job to finished/aborted queue.") 329 ) 330 elif self.state() in [QueueStatus.INACTIVE, QueueStatus.PAUSED]: 331 self._queue.push(job) 332 self.job_added.emit(job) 333 else: 334 runner = self.idle_runner() 335 if runner: 336 self.job_added.emit(job) 337 runner.start(job) 338 self.job_started.emit(job) 339 else: 340 self._queue.push(job) 341 self.job_added.emit(job) 342 self.set_state( 343 QueueStatus.EMPTY if self._queue.empty() 344 else QueueStatus.STARTED) 345 346 def completed(self, runner=-1): 347 """Return the number of completed jobs, 348 either for a given runner or the sum of all runners.""" 349 if runner >= 0: 350 return self._runners[runner].completed() 351 else: 352 result = 0 353 for i in range(len(self._runners)): 354 result += self._runners[i].completed() 355 return result 356 357 def full(self): 358 """Returns True if a maximum capacity is set and used.""" 359 if not self._capacity: 360 return False 361 return self._queue.length() == self._capacity 362 363 def is_idle(self): 364 """Returns True if all Runners are idle.""" 365 for runner in self._runners: 366 if runner.is_running(): 367 return False 368 return True 369 370 def is_running(self): 371 """Return True if the queue is 'live'.""" 372 return self.state() not in ([ 373 QueueStatus.INACTIVE, 374 QueueStatus.FINISHED, 375 QueueStatus.ABORTED 376 ]) 377 378 def idle_runner(self): 379 """Returns the first idle Runner object, 380 or None if all are busy.""" 381 for runner in self._runners: 382 if not runner.is_running(): 383 return runner 384 return None 385 386 def job_completed(self, runner, job): 387 """Called by a runner once its job has completed. 388 389 Manage behaviour at that point, depending on the 390 queue's state and mode. 391 """ 392 if self.state() == QueueStatus.STARTED: 393 runner.start(self.pop()) 394 elif self.state() == QueueStatus.PAUSED: 395 # If a SINGLE queue completes the last job while in PAUSE mode 396 # it can be considered finished. 397 if ( 398 self._queue.empty() 399 and self.is_idle() 400 and self.queue_mode() == QueueMode.SINGLE 401 ): 402 self.queue_finished() 403 elif self.is_idle(): 404 # last runner has completed its job and queue is empty. 405 # Either finish queue or set to IDLE. 406 if self.queue_mode() == QueueMode.SINGLE: 407 self.queue_finished() 408 else: 409 self.set_state(QueueStatus.IDLE) 410 self.idle.emit() 411 self.job_done.emit(job) 412 413 def pause(self): 414 """Pauses the execution of the queue. 415 Running jobs are allowed to finish, but no new jobs will be started. 416 (The actual behaviour is implemented in job_completed().)""" 417 if self.state() in [ 418 QueueStatus.INACTIVE, 419 QueueStatus.FINISHED, 420 QueueStatus.ABORTED 421 ]: 422 raise JobQueueStateException( 423 _("Non-running Job Queue can't be paused.") 424 ) 425 self.set_state(QueueStatus.PAUSED) 426 self.paused.emit() 427 428 def pop(self): 429 """Return and remove the next Job. 430 Raises Exception if empty.""" 431 if self.state() == QueueStatus.EMPTY: 432 raise IndexError("Job Queue is empty.") 433 if self.state() != QueueStatus.STARTED: 434 raise JobQueueStateException( 435 _("Can't pop job from non-started Job Queue") 436 ) 437 j = self._queue.pop() 438 if self._queue.empty(): 439 self.set_state(QueueStatus.EMPTY) 440 self.emptied.emit() 441 return j 442 443 def queue_finished(self): 444 """Called when the last job has been completed and the queue 445 is in SINGLE mode.""" 446 if self.state() != QueueStatus.ABORTED: 447 self.set_state(QueueStatus.FINISHED) 448 self.finished.emit() 449 450 def queue_mode(self): 451 return self._queue_mode 452 453 def resume(self): 454 """Resume the queue from PAUSED state by trying to start 455 all runners.""" 456 if not self.state() == QueueStatus.PAUSED: 457 raise JobQueueStateException( 458 _("Job Queue not paused, can't resume.") 459 ) 460 self._start() 461 self.resumed.emit() 462 463 def set_idle(self): 464 """Set status to IDLE if all runners are in idle mode.""" 465 for runner in self._runners: 466 if runner.is_running(): 467 return 468 self.set_state(QueueStatus.IDLE) 469 self.idle.emit() 470 471 def set_queue_mode(self, mode): 472 self._queue_mode = mode 473 474 def size(self): 475 """Return the number of unstarted jobs.""" 476 return self._queue.length() 477 478 def _start(self): 479 """Set the state to started and ask all runners to start.""" 480 if self.state() in [QueueStatus.FINISHED, QueueStatus.ABORTED]: 481 raise JobQueueStateException( 482 _("Can't (re)start a finished/aborted Job Queue.") 483 ) 484 elif self.state() == QueueStatus.STARTED: 485 raise JobQueueStateException(_("Queue already started.")) 486 if self.state() in [ 487 QueueStatus.INACTIVE, 488 QueueStatus.PAUSED 489 ]: 490 self.set_state(QueueStatus.STARTED) 491 492 if ( 493 self.state() == QueueStatus.EMPTY 494 and self.queue_mode() == QueueMode.SINGLE 495 ): 496 raise IndexError(_("Can't start SINGLE-mode empty queue")) 497 if self._queue.empty(): 498 self.set_state(QueueStatus.IDLE) 499 else: 500 self.set_state(QueueStatus.STARTED) 501 for runner in self._runners: 502 if self._queue.empty(): 503 break 504 if not runner.is_running(): 505 runner.start(self.pop()) 506 507 def start(self): 508 """Start processing of the queue.""" 509 if self.is_running(): 510 raise JobQueueStateException( 511 _("Can't 'start' an active Job Queue.")) 512 self._starttime = time.time() 513 self._start() 514 self.started.emit() 515 516 def set_state(self, state): 517 self._state = state 518 519 def state(self): 520 return self._state 521 522 523class GlobalJobQueue(QObject): 524 """The application-wide Job Queue that dispatches jobs to runners 525 and subordinate queues. 526 """ 527 528 def __init__(self): 529 self.load_settings() 530 self._crawler = JobQueue() 531 self._engraver = JobQueue() 532 self._generic = JobQueue() 533 self._queues = { 534 'crawl': self._crawler, 535 'engrave': self._engraver, 536 'generic': self._generic 537 } 538 app.settingsChanged.connect(self.settings_changed) 539 app.aboutToQuit.connect(self.about_to_quit) 540 541 def about_to_quit(self): 542 # TODO: 543 # - is any queue active? 544 # - should jobs be aborted? 545 # - or only the queues be emptied? 546 # - _crawer can always be aborted immediately 547 pass 548 549 def add_job(self, j, target='engrave'): 550 """Add a job to the specified job queue.""" 551 target_queue = self._queues.get(target, None) 552 if not target_queue: 553 raise ValueError(_("Invalid job queue target: {name}").format(name=target)) 554 target_queue.add_job(j) 555 556 def load_settings(self): 557 # TODO: Load settings and create the JobQueues accordingly 558 pass 559 560 def settings_changed(self): 561 # TODO: If multicore-related settings have changed update the queues 562 pass 563