# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import inspect
import warnings
import zlib
from functools import partial
from uuid import uuid4

from rq.compat import as_text, decode_redis_hash, string_types, text_type

from .connections import resolve_connection
from .exceptions import InvalidJobDependency, NoSuchJobError, UnpickleError
from .local import LocalStack
from .utils import (enum, import_attribute, parse_timeout, str_to_date,
                    utcformat, utcnow)

try:
    import cPickle as pickle
except ImportError:  # noqa  # pragma: no cover
    import pickle


# Serialize pickle dumps using the highest pickle protocol (binary, default
# uses ascii)
dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
loads = pickle.loads

# Enumeration of the lifecycle states a job can be in. Stored as the
# 'status' field of the job's Redis hash.
JobStatus = enum(
    'JobStatus',
    QUEUED='queued',
    FINISHED='finished',
    FAILED='failed',
    STARTED='started',
    DEFERRED='deferred',
    SCHEDULED='scheduled',
)

# Sentinel value to mark that some of our lazily evaluated properties have not
# yet been evaluated.
UNEVALUATED = object()


def unpickle(pickled_string):
    """Unpickles a string, but raises a unified UnpickleError in case anything
    fails.

    This is a helper method to not have to deal with the fact that `loads()`
    potentially raises many types of exceptions (e.g. AttributeError,
    IndexError, TypeError, KeyError, etc.)
    """
    try:
        obj = loads(pickled_string)
    except Exception as e:
        # Wrap whatever went wrong, keeping the offending payload and the
        # original exception for diagnosis.
        raise UnpickleError('Could not unpickle', pickled_string, e)
    return obj


def cancel_job(job_id, connection=None):
    """Cancels the job with the given job ID, preventing execution. Discards
    any job info (i.e. it can't be requeued later).
    """
    Job.fetch(job_id, connection=connection).cancel()


def get_current_job(connection=None, job_class=None):
    """Returns the Job instance that is currently being executed.
    If this function is invoked from outside a job context, None is returned.

    NOTE(review): the ``connection`` argument is accepted but unused here —
    the current job is looked up on the thread/greenlet-local stack.
    """
    if job_class:
        warnings.warn("job_class argument for get_current_job is deprecated.",
                      DeprecationWarning)
    return _job_stack.top


def requeue_job(job_id, connection):
    """Fetches the job with the given ID and requeues it via its
    FailedJobRegistry. Returns the result of ``job.requeue()``."""
    job = Job.fetch(job_id, connection=connection)
    return job.requeue()


class Job(object):
    """A Job is just a convenient datastructure to pass around job (meta) data.
    """
    # Prefix of every per-job Redis hash key, e.g. 'rq:job:<job_id>'.
    redis_job_namespace_prefix = 'rq:job:'

    # Job construction
    @classmethod
    def create(cls, func, args=None, kwargs=None, connection=None,
               result_ttl=None, ttl=None, status=None, description=None,
               depends_on=None, timeout=None, id=None, origin=None, meta=None,
               failure_ttl=None):
        """Creates a new Job instance for the given function, arguments, and
        keyword arguments.

        ``func`` may be a plain function, a bound method, an import-path
        string (``'module.func'``), or a callable class instance; anything
        else raises TypeError. ``args`` must be a tuple or list and
        ``kwargs`` a dict (both default to empty). ``depends_on`` may be a
        Job instance or a job ID.
        """
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}

        if not isinstance(args, (tuple, list)):
            raise TypeError('{0!r} is not a valid args list'.format(args))
        if not isinstance(kwargs, dict):
            raise TypeError('{0!r} is not a valid kwargs dict'.format(kwargs))

        job = cls(connection=connection)
        if id is not None:
            job.set_id(id)

        if origin is not None:
            job.origin = origin

        # Set the core job tuple properties
        job._instance = None
        if inspect.ismethod(func):
            # Bound method: remember the instance, store only the method name.
            job._instance = func.__self__
            job._func_name = func.__name__
        elif inspect.isfunction(func) or inspect.isbuiltin(func):
            # Free function: store a dotted import path so workers can import it.
            job._func_name = '{0}.{1}'.format(func.__module__, func.__name__)
        elif isinstance(func, string_types):
            job._func_name = as_text(func)
        elif not inspect.isclass(func) and hasattr(func, '__call__'):  # a callable class instance
            job._instance = func
            job._func_name = '__call__'
        else:
            raise TypeError('Expected a callable or a string, but got: {0}'.format(func))
        job._args = args
        job._kwargs = kwargs

        # Extra meta data
        job.description = description or job.get_call_string()
        job.result_ttl = parse_timeout(result_ttl)
        job.failure_ttl = parse_timeout(failure_ttl)
        job.ttl = parse_timeout(ttl)
        job.timeout = parse_timeout(timeout)
        job._status = status
        job.meta = meta or {}

        # dependency could be job instance or id
        if depends_on is not None:
            job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on]
        return job

    def get_status(self, refresh=True):
        """Returns the job's status; by default re-reads it from Redis,
        otherwise returns the locally cached value."""
        if refresh:
            self._status = as_text(self.connection.hget(self.key, 'status'))

        return self._status

    def set_status(self, status, pipeline=None):
        """Sets the job status both locally and in the Redis hash.
        Writes go through ``pipeline`` when one is supplied."""
        self._status = status
        connection = pipeline or self.connection
        connection.hset(self.key, 'status', self._status)

    @property
    def is_finished(self):
        return self.get_status() == JobStatus.FINISHED

    @property
    def is_queued(self):
        return self.get_status() == JobStatus.QUEUED

    @property
    def is_failed(self):
        return self.get_status() == JobStatus.FAILED

    @property
    def is_started(self):
        return self.get_status() == JobStatus.STARTED

    @property
    def is_deferred(self):
        return self.get_status() == JobStatus.DEFERRED

    @property
    def is_scheduled(self):
        return self.get_status() == JobStatus.SCHEDULED

    @property
    def _dependency_id(self):
        """Returns the first item in self._dependency_ids. Present to
        preserve compatibility with third-party packages.
        """
        if self._dependency_ids:
            return self._dependency_ids[0]

    @property
    def dependency(self):
        """Returns a job's dependency. To avoid repeated Redis fetches, we cache
        job.dependency as job._dependency.
        """
        if not self._dependency_ids:
            return None
        if hasattr(self, '_dependency'):
            return self._dependency
        job = self.fetch(self._dependency_ids[0], connection=self.connection)
        self._dependency = job
        return job

    @property
    def dependent_ids(self):
        """Returns a list of ids of jobs whose execution depends on this
        job's successful execution."""
        return list(map(as_text, self.connection.smembers(self.dependents_key)))

    @property
    def func(self):
        """Resolves and returns the actual callable for this job: an
        attribute of ``instance`` when one is set, otherwise imported
        from the dotted ``func_name`` path. None if there is no name."""
        func_name = self.func_name
        if func_name is None:
            return None

        if self.instance:
            return getattr(self.instance, func_name)

        return import_attribute(self.func_name)

    def _unpickle_data(self):
        # Populate the four lazy fields from the pickled 'data' payload.
        self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data)

    @property
    def data(self):
        """The pickled (func_name, instance, args, kwargs) tuple; rebuilt
        lazily from the individual fields when it has been invalidated."""
        if self._data is UNEVALUATED:
            if self._func_name is UNEVALUATED:
                raise ValueError('Cannot build the job data')

            if self._instance is UNEVALUATED:
                self._instance = None

            if self._args is UNEVALUATED:
                self._args = ()

            if self._kwargs is UNEVALUATED:
                self._kwargs = {}

            job_tuple = self._func_name, self._instance, self._args, self._kwargs
            self._data = dumps(job_tuple)
        return self._data

    @data.setter
    def data(self, value):
        # Setting raw data invalidates all four derived fields; they are
        # re-unpickled on first access.
        self._data = value
        self._func_name = UNEVALUATED
        self._instance = UNEVALUATED
        self._args = UNEVALUATED
        self._kwargs = UNEVALUATED

    @property
    def func_name(self):
        if self._func_name is UNEVALUATED:
            self._unpickle_data()
        return self._func_name

    @func_name.setter
    def func_name(self, value):
        self._func_name = value
        # Invalidate the cached pickled payload; rebuilt on next access.
        self._data = UNEVALUATED

    @property
    def instance(self):
        if self._instance is UNEVALUATED:
            self._unpickle_data()
        return self._instance

    @instance.setter
    def instance(self, value):
        self._instance = value
        # Invalidate the cached pickled payload; rebuilt on next access.
        self._data = UNEVALUATED
@property 270 def args(self): 271 if self._args is UNEVALUATED: 272 self._unpickle_data() 273 return self._args 274 275 @args.setter 276 def args(self, value): 277 self._args = value 278 self._data = UNEVALUATED 279 280 @property 281 def kwargs(self): 282 if self._kwargs is UNEVALUATED: 283 self._unpickle_data() 284 return self._kwargs 285 286 @kwargs.setter 287 def kwargs(self, value): 288 self._kwargs = value 289 self._data = UNEVALUATED 290 291 @classmethod 292 def exists(cls, job_id, connection=None): 293 """Returns whether a job hash exists for the given job ID.""" 294 conn = resolve_connection(connection) 295 return conn.exists(cls.key_for(job_id)) 296 297 @classmethod 298 def fetch(cls, id, connection=None): 299 """Fetches a persisted job from its corresponding Redis key and 300 instantiates it. 301 """ 302 job = cls(id, connection=connection) 303 job.refresh() 304 return job 305 306 @classmethod 307 def fetch_many(cls, job_ids, connection): 308 """ 309 Bulk version of Job.fetch 310 311 For any job_ids which a job does not exist, the corresponding item in 312 the returned list will be None. 
313 """ 314 with connection.pipeline() as pipeline: 315 for job_id in job_ids: 316 pipeline.hgetall(cls.key_for(job_id)) 317 results = pipeline.execute() 318 319 jobs = [] 320 for i, job_id in enumerate(job_ids): 321 if results[i]: 322 job = cls(job_id, connection=connection) 323 job.restore(results[i]) 324 jobs.append(job) 325 else: 326 jobs.append(None) 327 328 return jobs 329 330 def __init__(self, id=None, connection=None): 331 self.connection = resolve_connection(connection) 332 self._id = id 333 self.created_at = utcnow() 334 self._data = UNEVALUATED 335 self._func_name = UNEVALUATED 336 self._instance = UNEVALUATED 337 self._args = UNEVALUATED 338 self._kwargs = UNEVALUATED 339 self.description = None 340 self.origin = None 341 self.enqueued_at = None 342 self.started_at = None 343 self.ended_at = None 344 self._result = None 345 self.exc_info = None 346 self.timeout = None 347 self.result_ttl = None 348 self.failure_ttl = None 349 self.ttl = None 350 self._status = None 351 self._dependency_ids = [] 352 self.meta = {} 353 354 def __repr__(self): # noqa # pragma: no cover 355 return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, 356 self._id, 357 self.enqueued_at) 358 359 def __str__(self): 360 return '<{0} {1}: {2}>'.format(self.__class__.__name__, 361 self.id, 362 self.description) 363 364 # Job equality 365 def __eq__(self, other): # noqa 366 return isinstance(other, self.__class__) and self.id == other.id 367 368 def __hash__(self): # pragma: no cover 369 return hash(self.id) 370 371 # Data access 372 def get_id(self): # noqa 373 """The job ID for this job instance. Generates an ID lazily the 374 first time the ID is requested. 
375 """ 376 if self._id is None: 377 self._id = text_type(uuid4()) 378 return self._id 379 380 def set_id(self, value): 381 """Sets a job ID for the given job.""" 382 if not isinstance(value, string_types): 383 raise TypeError('id must be a string, not {0}'.format(type(value))) 384 self._id = value 385 386 id = property(get_id, set_id) 387 388 @classmethod 389 def key_for(cls, job_id): 390 """The Redis key that is used to store job hash under.""" 391 return (cls.redis_job_namespace_prefix + job_id).encode('utf-8') 392 393 @classmethod 394 def dependents_key_for(cls, job_id): 395 """The Redis key that is used to store job dependents hash under.""" 396 return '{0}{1}:dependents'.format(cls.redis_job_namespace_prefix, job_id) 397 398 @property 399 def key(self): 400 """The Redis key that is used to store job hash under.""" 401 return self.key_for(self.id) 402 403 @property 404 def dependents_key(self): 405 """The Redis key that is used to store job dependents hash under.""" 406 return self.dependents_key_for(self.id) 407 408 @property 409 def dependencies_key(self): 410 return '{0}:{1}:dependencies'.format(self.redis_job_namespace_prefix, self.id) 411 412 def fetch_dependencies(self, watch=False, pipeline=None): 413 """ 414 Fetch all of a job's dependencies. If a pipeline is supplied, and 415 watch is true, then set WATCH on all the keys of all dependencies. 416 417 Returned jobs will use self's connection, not the pipeline supplied. 418 """ 419 connection = pipeline if pipeline is not None else self.connection 420 421 if watch and self._dependency_ids: 422 connection.watch(*self._dependency_ids) 423 424 jobs = self.fetch_many(self._dependency_ids, connection=self.connection) 425 426 for i, job in enumerate(jobs): 427 if not job: 428 raise NoSuchJobError('Dependency {0} does not exist'.format(self._dependency_ids[i])) 429 430 return jobs 431 432 433 @property 434 def result(self): 435 """Returns the return value of the job. 
436 437 Initially, right after enqueueing a job, the return value will be 438 None. But when the job has been executed, and had a return value or 439 exception, this will return that value or exception. 440 441 Note that, when the job has no return value (i.e. returns None), the 442 ReadOnlyJob object is useless, as the result won't be written back to 443 Redis. 444 445 Also note that you cannot draw the conclusion that a job has _not_ 446 been executed when its return value is None, since return values 447 written back to Redis will expire after a given amount of time (500 448 seconds by default). 449 """ 450 if self._result is None: 451 rv = self.connection.hget(self.key, 'result') 452 if rv is not None: 453 # cache the result 454 self._result = loads(rv) 455 return self._result 456 457 """Backwards-compatibility accessor property `return_value`.""" 458 return_value = result 459 460 def restore(self, raw_data): 461 """Overwrite properties with the provided values stored in Redis""" 462 obj = decode_redis_hash(raw_data) 463 try: 464 raw_data = obj['data'] 465 except KeyError: 466 raise NoSuchJobError('Unexpected job format: {0}'.format(obj)) 467 468 try: 469 self.data = zlib.decompress(raw_data) 470 except zlib.error: 471 # Fallback to uncompressed string 472 self.data = raw_data 473 474 self.created_at = str_to_date(obj.get('created_at')) 475 self.origin = as_text(obj.get('origin')) 476 self.description = as_text(obj.get('description')) 477 self.enqueued_at = str_to_date(obj.get('enqueued_at')) 478 self.started_at = str_to_date(obj.get('started_at')) 479 self.ended_at = str_to_date(obj.get('ended_at')) 480 result = obj.get('result') 481 if result: 482 try: 483 self._result = unpickle(obj.get('result')) 484 except UnpickleError: 485 self._result = 'Unpickleable return value' 486 self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None 487 self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa 488 
self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa 489 self._status = as_text(obj.get('status')) if obj.get('status') else None 490 491 dependency_id = obj.get('dependency_id', None) 492 self._dependency_ids = [as_text(dependency_id)] if dependency_id else [] 493 494 self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None 495 self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} 496 497 raw_exc_info = obj.get('exc_info') 498 if raw_exc_info: 499 try: 500 self.exc_info = as_text(zlib.decompress(raw_exc_info)) 501 except zlib.error: 502 # Fallback to uncompressed string 503 self.exc_info = as_text(raw_exc_info) 504 505 # Persistence 506 def refresh(self): # noqa 507 """Overwrite the current instance's properties with the values in the 508 corresponding Redis key. 509 510 Will raise a NoSuchJobError if no corresponding Redis key exists. 511 """ 512 data = self.connection.hgetall(self.key) 513 if not data: 514 raise NoSuchJobError('No such job: {0}'.format(self.key)) 515 self.restore(data) 516 517 def to_dict(self, include_meta=True): 518 """ 519 Returns a serialization of the current job instance 520 521 You can exclude serializing the `meta` dictionary by setting 522 `include_meta=False`. 
523 """ 524 obj = {} 525 obj['created_at'] = utcformat(self.created_at or utcnow()) 526 obj['data'] = zlib.compress(self.data) 527 528 if self.origin is not None: 529 obj['origin'] = self.origin 530 if self.description is not None: 531 obj['description'] = self.description 532 if self.enqueued_at is not None: 533 obj['enqueued_at'] = utcformat(self.enqueued_at) 534 if self.started_at is not None: 535 obj['started_at'] = utcformat(self.started_at) 536 if self.ended_at is not None: 537 obj['ended_at'] = utcformat(self.ended_at) 538 if self._result is not None: 539 try: 540 obj['result'] = dumps(self._result) 541 except: 542 obj['result'] = 'Unpickleable return value' 543 if self.exc_info is not None: 544 obj['exc_info'] = zlib.compress(str(self.exc_info).encode('utf-8')) 545 if self.timeout is not None: 546 obj['timeout'] = self.timeout 547 if self.result_ttl is not None: 548 obj['result_ttl'] = self.result_ttl 549 if self.failure_ttl is not None: 550 obj['failure_ttl'] = self.failure_ttl 551 if self._status is not None: 552 obj['status'] = self._status 553 if self._dependency_ids: 554 obj['dependency_id'] = self._dependency_ids[0] 555 if self.meta and include_meta: 556 obj['meta'] = dumps(self.meta) 557 if self.ttl: 558 obj['ttl'] = self.ttl 559 560 return obj 561 562 def save(self, pipeline=None, include_meta=True): 563 """ 564 Dumps the current job instance to its corresponding Redis key. 565 566 Exclude saving the `meta` dictionary by setting 567 `include_meta=False`. This is useful to prevent clobbering 568 user metadata without an expensive `refresh()` call first. 569 570 Redis key persistence may be altered by `cleanup()` method. 
571 """ 572 key = self.key 573 connection = pipeline if pipeline is not None else self.connection 574 575 connection.hmset(key, self.to_dict(include_meta=include_meta)) 576 577 def save_meta(self): 578 """Stores job meta from the job instance to the corresponding Redis key.""" 579 meta = dumps(self.meta) 580 self.connection.hset(self.key, 'meta', meta) 581 582 def cancel(self, pipeline=None): 583 """Cancels the given job, which will prevent the job from ever being 584 ran (or inspected). 585 586 This method merely exists as a high-level API call to cancel jobs 587 without worrying about the internals required to implement job 588 cancellation. 589 """ 590 from .queue import Queue 591 pipeline = pipeline or self.connection.pipeline() 592 if self.origin: 593 q = Queue(name=self.origin, connection=self.connection) 594 q.remove(self, pipeline=pipeline) 595 pipeline.execute() 596 597 def requeue(self): 598 """Requeues job.""" 599 self.failed_job_registry.requeue(self) 600 601 def delete(self, pipeline=None, remove_from_queue=True, 602 delete_dependents=False): 603 """Cancels the job and deletes the job hash from Redis. 
Jobs depending 604 on this job can optionally be deleted as well.""" 605 if remove_from_queue: 606 self.cancel(pipeline=pipeline) 607 connection = pipeline if pipeline is not None else self.connection 608 609 if self.is_finished: 610 from .registry import FinishedJobRegistry 611 registry = FinishedJobRegistry(self.origin, 612 connection=self.connection, 613 job_class=self.__class__) 614 registry.remove(self, pipeline=pipeline) 615 616 elif self.is_deferred: 617 from .registry import DeferredJobRegistry 618 registry = DeferredJobRegistry(self.origin, 619 connection=self.connection, 620 job_class=self.__class__) 621 registry.remove(self, pipeline=pipeline) 622 623 elif self.is_started: 624 from .registry import StartedJobRegistry 625 registry = StartedJobRegistry(self.origin, 626 connection=self.connection, 627 job_class=self.__class__) 628 registry.remove(self, pipeline=pipeline) 629 630 elif self.is_scheduled: 631 from .registry import ScheduledJobRegistry 632 registry = ScheduledJobRegistry(self.origin, 633 connection=self.connection, 634 job_class=self.__class__) 635 registry.remove(self, pipeline=pipeline) 636 637 elif self.is_failed: 638 self.failed_job_registry.remove(self, pipeline=pipeline) 639 640 if delete_dependents: 641 self.delete_dependents(pipeline=pipeline) 642 643 connection.delete(self.key, self.dependents_key, self.dependencies_key) 644 645 def delete_dependents(self, pipeline=None): 646 """Delete jobs depending on this job.""" 647 connection = pipeline if pipeline is not None else self.connection 648 for dependent_id in self.dependent_ids: 649 try: 650 job = Job.fetch(dependent_id, connection=self.connection) 651 job.delete(pipeline=pipeline, 652 remove_from_queue=False) 653 except NoSuchJobError: 654 # It could be that the dependent job was never saved to redis 655 pass 656 connection.delete(self.dependents_key) 657 658 # Job execution 659 def perform(self): # noqa 660 """Invokes the job function with the job arguments.""" 661 
self.connection.persist(self.key) 662 _job_stack.push(self) 663 try: 664 self._result = self._execute() 665 finally: 666 assert self is _job_stack.pop() 667 return self._result 668 669 def _execute(self): 670 return self.func(*self.args, **self.kwargs) 671 672 def get_ttl(self, default_ttl=None): 673 """Returns ttl for a job that determines how long a job will be 674 persisted. In the future, this method will also be responsible 675 for determining ttl for repeated jobs. 676 """ 677 return default_ttl if self.ttl is None else self.ttl 678 679 def get_result_ttl(self, default_ttl=None): 680 """Returns ttl for a job that determines how long a jobs result will 681 be persisted. In the future, this method will also be responsible 682 for determining ttl for repeated jobs. 683 """ 684 return default_ttl if self.result_ttl is None else self.result_ttl 685 686 # Representation 687 def get_call_string(self): # noqa 688 """Returns a string representation of the call, formatted as a regular 689 Python function invocation statement. 690 """ 691 if self.func_name is None: 692 return None 693 694 arg_list = [as_text(repr(arg)) for arg in self.args] 695 696 kwargs = ['{0}={1}'.format(k, as_text(repr(v))) for k, v in self.kwargs.items()] 697 # Sort here because python 3.3 & 3.4 makes different call_string 698 arg_list += sorted(kwargs) 699 args = ', '.join(arg_list) 700 701 return '{0}({1})'.format(self.func_name, args) 702 703 def cleanup(self, ttl=None, pipeline=None, remove_from_queue=True): 704 """Prepare job for eventual deletion (if needed). This method is usually 705 called after successful execution. How long we persist the job and its 706 result depends on the value of ttl: 707 - If ttl is 0, cleanup the job immediately. 708 - If it's a positive number, set the job to expire in X seconds. 
709 - If ttl is negative, don't set an expiry to it (persist 710 forever) 711 """ 712 if ttl == 0: 713 self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue) 714 elif not ttl: 715 return 716 elif ttl > 0: 717 connection = pipeline if pipeline is not None else self.connection 718 connection.expire(self.key, ttl) 719 connection.expire(self.dependents_key, ttl) 720 connection.expire(self.dependencies_key, ttl) 721 722 @property 723 def failed_job_registry(self): 724 from .registry import FailedJobRegistry 725 return FailedJobRegistry(self.origin, connection=self.connection, 726 job_class=self.__class__) 727 728 def register_dependency(self, pipeline=None): 729 """Jobs may have dependencies. Jobs are enqueued only if the job they 730 depend on is successfully performed. We record this relation as 731 a reverse dependency (a Redis set), with a key that looks something 732 like: 733 734 rq:job:job_id:dependents = {'job_id_1', 'job_id_2'} 735 736 This method adds the job in its dependency's dependents set 737 and adds the job to DeferredJobRegistry. 738 """ 739 from .registry import DeferredJobRegistry 740 741 registry = DeferredJobRegistry(self.origin, 742 connection=self.connection, 743 job_class=self.__class__) 744 registry.add(self, pipeline=pipeline) 745 746 connection = pipeline if pipeline is not None else self.connection 747 748 for dependency_id in self._dependency_ids: 749 dependents_key = self.dependents_key_for(dependency_id) 750 connection.sadd(dependents_key, self.id) 751 connection.sadd(self.dependencies_key, dependency_id) 752 753_job_stack = LocalStack() 754