"""Django admin views for inspecting and managing RQ queues, jobs and workers."""
from __future__ import division

from math import ceil

from django.contrib import admin, messages
from django.contrib.admin.views.decorators import staff_member_required
from django.http import Http404, JsonResponse
from django.shortcuts import redirect, render
from django.urls import reverse
from django.views.decorators.cache import never_cache

from redis.exceptions import ResponseError
from rq import requeue_job
from rq.exceptions import NoSuchJobError
from rq.job import Job, JobStatus
from rq.registry import (
    DeferredJobRegistry,
    FailedJobRegistry,
    FinishedJobRegistry,
    ScheduledJobRegistry,
    StartedJobRegistry,
)
from rq.worker import Worker
from rq.worker_registration import clean_worker_registry

from .queues import get_queue_by_index
from .settings import API_TOKEN
from .utils import get_statistics, get_jobs


# Number of jobs shown on each page of every job listing.
_ITEMS_PER_PAGE = 100


def _get_page(request):
    """Return the 1-based page number requested through ``?page=``.

    Falls back to page 1 when the parameter is missing, is not an integer,
    or is lower than 1, so a malformed query string cannot produce a server
    error or a negative offset.
    """
    try:
        page = int(request.GET.get('page', 1))
    except (TypeError, ValueError):
        return 1
    return max(page, 1)


def _get_page_range(num_jobs):
    """Return the page numbers needed to list ``num_jobs`` jobs.

    An empty list is returned when there are no jobs.
    """
    if num_jobs <= 0:
        return []
    last_page = int(ceil(num_jobs / _ITEMS_PER_PAGE))
    return range(1, last_page + 1)


def _get_registry_page(request, registry):
    """Return ``(num_jobs, page, page_range, job_ids)`` for a job registry.

    ``job_ids`` holds the ids of the requested page only; it is empty when
    the registry is empty.
    """
    num_jobs = len(registry)
    page = _get_page(request)
    job_ids = []
    if num_jobs > 0:
        offset = _ITEMS_PER_PAGE * (page - 1)
        # The end index passed to ``get_job_ids`` is inclusive, hence the -1.
        job_ids = registry.get_job_ids(offset, offset + _ITEMS_PER_PAGE - 1)
    return num_jobs, page, _get_page_range(num_jobs), job_ids


def _fetch_job_or_404(job_id, queue):
    """Fetch a job through the queue's connection or raise ``Http404``."""
    try:
        return Job.fetch(job_id, connection=queue.connection)
    except NoSuchJobError:
        raise Http404("Couldn't find job with this ID: %s" % job_id)


def _render_job_list(request, queue, queue_index, jobs, num_jobs, page,
                     page_range, job_status):
    """Render the shared job listing template for one page of jobs."""
    context_data = {
        **admin.site.each_context(request),
        'queue': queue,
        'queue_index': queue_index,
        'jobs': jobs,
        'num_jobs': num_jobs,
        'page': page,
        'page_range': page_range,
        'job_status': job_status,
    }
    return render(request, 'django_rq/jobs.html', context_data)


@never_cache
@staff_member_required
def stats(request):
    """Render the statistics page (staff only)."""
    context_data = {
        **admin.site.each_context(request),
        **get_statistics(run_maintenance_tasks=True)
    }
    return render(request, 'django_rq/stats.html', context_data)


def stats_json(request, token=None):
    """Return the statistics as JSON.

    Access is granted to staff users, or to any caller presenting a
    non-empty ``token`` equal to ``API_TOKEN``. Everyone else receives a
    JSON error payload instead of the statistics.
    """
    if request.user.is_staff or (token and token == API_TOKEN):
        return JsonResponse(get_statistics())

    return JsonResponse({
        "error": True,
        "description": "Please configure API_TOKEN in settings.py before accessing this view."
    })


@never_cache
@staff_member_required
def jobs(request, queue_index):
    """List the jobs currently waiting in the queue, one page at a time."""
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    num_jobs = queue.count
    page = _get_page(request)

    if num_jobs > 0:
        offset = _ITEMS_PER_PAGE * (page - 1)
        job_list = queue.get_jobs(offset, _ITEMS_PER_PAGE)
    else:
        job_list = []

    return _render_job_list(
        request, queue, queue_index, job_list, num_jobs, page,
        _get_page_range(num_jobs), 'Queued',
    )


@never_cache
@staff_member_required
def finished_jobs(request, queue_index):
    """List the jobs in the queue's finished job registry."""
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    registry = FinishedJobRegistry(queue.name, queue.connection)
    num_jobs, page, page_range, job_ids = _get_registry_page(request, registry)
    job_list = get_jobs(queue, job_ids, registry) if num_jobs > 0 else []

    return _render_job_list(
        request, queue, queue_index, job_list, num_jobs, page, page_range,
        'Finished',
    )


@never_cache
@staff_member_required
def failed_jobs(request, queue_index):
    """List the jobs in the queue's failed job registry."""
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    registry = FailedJobRegistry(queue.name, queue.connection)
    num_jobs, page, page_range, job_ids = _get_registry_page(request, registry)
    job_list = get_jobs(queue, job_ids, registry) if num_jobs > 0 else []

    return _render_job_list(
        request, queue, queue_index, job_list, num_jobs, page, page_range,
        'Failed',
    )


@never_cache
@staff_member_required
def scheduled_jobs(request, queue_index):
    """List the jobs in the queue's scheduled job registry.

    Each listed job gets a ``scheduled_at`` attribute holding the time
    reported by the registry for that job.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    registry = ScheduledJobRegistry(queue.name, queue.connection)
    num_jobs, page, page_range, job_ids = _get_registry_page(request, registry)

    job_list = []
    if num_jobs > 0:
        job_list = get_jobs(queue, job_ids, registry)
        for job in job_list:
            job.scheduled_at = registry.get_scheduled_time(job)

    return _render_job_list(
        request, queue, queue_index, job_list, num_jobs, page, page_range,
        'Scheduled',
    )


@never_cache
@staff_member_required
def started_jobs(request, queue_index):
    """List the jobs in the queue's started job registry."""
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    registry = StartedJobRegistry(queue.name, queue.connection)
    num_jobs, page, page_range, job_ids = _get_registry_page(request, registry)
    job_list = get_jobs(queue, job_ids, registry) if num_jobs > 0 else []

    return _render_job_list(
        request, queue, queue_index, job_list, num_jobs, page, page_range,
        'Started',
    )


@never_cache
@staff_member_required
def workers(request, queue_index):
    """List the workers that listen on the given queue."""
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    clean_worker_registry(queue)
    all_workers = Worker.all(queue.connection)
    queue_workers = [worker for worker in all_workers
                     if queue.name in worker.queue_names()]

    context_data = {
        **admin.site.each_context(request),
        'queue': queue,
        'queue_index': queue_index,
        'workers': queue_workers,
    }
    return render(request, 'django_rq/workers.html', context_data)


@never_cache
@staff_member_required
def worker_details(request, queue_index, key):
    """Show the details of a single worker.

    Raises:
        Http404: if no worker is found for ``key``.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    worker = Worker.find_by_key(key, connection=queue.connection)
    if worker is None:
        # Without this guard an unknown key fails with AttributeError below.
        raise Http404("Couldn't find worker with this ID: %s" % key)

    # Convert microseconds to milliseconds
    worker.total_working_time = worker.total_working_time / 1000

    queue_names = ', '.join(worker.queue_names())

    context_data = {
        **admin.site.each_context(request),
        'queue': queue,
        'queue_index': queue_index,
        'worker': worker,
        'queue_names': queue_names,
        'job': worker.get_current_job(),
        # NOTE(review): multiplying by 1000 effectively undoes the division
        # above, so this entry carries the value originally reported by the
        # worker - confirm which unit the template expects.
        'total_working_time': worker.total_working_time * 1000
    }
    return render(request, 'django_rq/worker_details.html', context_data)


@never_cache
@staff_member_required
def deferred_jobs(request, queue_index):
    """List the jobs in the queue's deferred job registry.

    Ids whose job can no longer be fetched are left out of the listing.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    registry = DeferredJobRegistry(queue.name, queue.connection)
    num_jobs, page, page_range, job_ids = _get_registry_page(request, registry)

    job_list = []
    for job_id in job_ids:
        try:
            job_list.append(Job.fetch(job_id, connection=queue.connection))
        except NoSuchJobError:
            # The registry still lists the id but the job itself is gone.
            pass

    return _render_job_list(
        request, queue, queue_index, job_list, num_jobs, page, page_range,
        'Deferred',
    )


@never_cache
@staff_member_required
def job_detail(request, queue_index, job_id):
    """Show the details of a single job.

    Raises:
        Http404: if the job does not exist.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    job = _fetch_job_or_404(job_id, queue)

    # Accessing ``func_name`` can fail when the stored job data cannot be
    # read back; the template is told whether the data is usable.
    try:
        job.func_name
        data_is_valid = True
    except Exception:
        data_is_valid = False

    context_data = {
        **admin.site.each_context(request),
        'queue_index': queue_index,
        'job': job,
        'dependency_id': job._dependency_id,
        'queue': queue,
        'data_is_valid': data_is_valid
    }
    return render(request, 'django_rq/job_detail.html', context_data)


@never_cache
@staff_member_required
def delete_job(request, queue_index, job_id):
    """Ask for confirmation (GET) and delete a job (POST).

    Raises:
        Http404: if the job does not exist.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    job = _fetch_job_or_404(job_id, queue)

    if request.method == 'POST':
        # Remove job id from queue and delete the actual job
        queue.connection.lrem(queue.key, 0, job.id)
        job.delete()
        messages.info(request, 'You have successfully deleted %s' % job.id)
        return redirect('rq_jobs', queue_index)

    context_data = {
        **admin.site.each_context(request),
        'queue_index': queue_index,
        'job': job,
        'queue': queue,
    }
    return render(request, 'django_rq/delete_job.html', context_data)


@never_cache
@staff_member_required
def requeue_job_view(request, queue_index, job_id):
    """Ask for confirmation (GET) and requeue a job (POST).

    Raises:
        Http404: if the job does not exist.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    job = _fetch_job_or_404(job_id, queue)

    if request.method == 'POST':
        requeue_job(job_id, connection=queue.connection)
        messages.info(request, 'You have successfully requeued %s' % job.id)
        return redirect('rq_job_detail', queue_index, job_id)

    context_data = {
        **admin.site.each_context(request),
        'queue_index': queue_index,
        'job': job,
        'queue': queue,
    }
    # The confirmation page reuses the delete_job template.
    return render(request, 'django_rq/delete_job.html', context_data)


@never_cache
@staff_member_required
def clear_queue(request, queue_index):
    """Ask for confirmation (GET) and empty the queue (POST)."""
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    if request.method == 'POST':
        try:
            queue.empty()
            messages.info(request, 'You have successfully cleared the queue %s' % queue.name)
        except ResponseError as e:
            # Python 3 exceptions have no ``message`` attribute, so the
            # error text is taken from ``str(e)``.
            if 'EVALSHA' in str(e):
                messages.error(
                    request,
                    'This action is not supported on Redis versions < 2.6.0, '
                    'please use the bulk delete command instead'
                )
            else:
                raise
        return redirect('rq_jobs', queue_index)

    context_data = {
        **admin.site.each_context(request),
        'queue_index': queue_index,
        'queue': queue,
    }
    return render(request, 'django_rq/clear_queue.html', context_data)


@never_cache
@staff_member_required
def requeue_all(request, queue_index):
    """Ask for confirmation (GET) and requeue every failed job (POST).

    Jobs that no longer exist are skipped and not counted in the success
    message.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    registry = FailedJobRegistry(queue=queue)

    if request.method == 'POST':
        job_ids = registry.get_job_ids()
        count = 0
        # Confirmation received
        for job_id in job_ids:
            try:
                requeue_job(job_id, connection=queue.connection)
                count += 1
            except NoSuchJobError:
                pass

        messages.info(request, 'You have successfully requeued %d jobs!' % count)
        return redirect('rq_jobs', queue_index)

    context_data = {
        **admin.site.each_context(request),
        'queue_index': queue_index,
        'queue': queue,
        'total_jobs': len(registry),
    }

    return render(request, 'django_rq/requeue_all.html', context_data)


@never_cache
@staff_member_required
def confirm_action(request, queue_index):
    """Render the confirmation page for a bulk action on selected jobs.

    Redirects back to the referring page (or the queue's job list when no
    referer is sent) unless the request is a POST carrying both an
    ``action`` and at least one ``_selected_action`` value.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    next_url = request.META.get('HTTP_REFERER') or reverse('rq_jobs', args=[queue_index])

    if request.method == 'POST' and request.POST.get('action', False):
        # confirm action
        if request.POST.get('_selected_action', False):
            context_data = {
                **admin.site.each_context(request),
                'queue_index': queue_index,
                'action': request.POST['action'],
                'job_ids': request.POST.getlist('_selected_action'),
                'queue': queue,
                'next_url': next_url,
            }
            return render(request, 'django_rq/confirm_action.html', context_data)

    return redirect(next_url)


@never_cache
@staff_member_required
def actions(request, queue_index):
    """Run a confirmed bulk action (``delete`` or ``requeue``) on jobs.

    Jobs that no longer exist are skipped, and the success message reports
    how many jobs were actually processed. The response always redirects to
    the posted ``next_url``, or to the queue's job list when none is given.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    # NOTE(review): ``next_url`` comes straight from the POST data and is not
    # validated before redirecting - consider restricting it to this site.
    next_url = request.POST.get('next_url') or reverse('rq_jobs', args=[queue_index])

    if request.method == 'POST' and request.POST.get('action', False):
        # do confirmed action
        if request.POST.get('job_ids', False):
            job_ids = request.POST.getlist('job_ids')
            action = request.POST['action']
            count = 0

            if action == 'delete':
                for job_id in job_ids:
                    try:
                        job = Job.fetch(job_id, connection=queue.connection)
                    except NoSuchJobError:
                        continue
                    # Remove job id from queue and delete the actual job
                    queue.connection.lrem(queue.key, 0, job.id)
                    job.delete()
                    count += 1
                messages.info(request, 'You have successfully deleted %s jobs!' % count)
            elif action == 'requeue':
                for job_id in job_ids:
                    try:
                        requeue_job(job_id, connection=queue.connection)
                    except NoSuchJobError:
                        continue
                    count += 1
                messages.info(request, 'You have successfully requeued %d jobs!' % count)

    return redirect(next_url)


@never_cache
@staff_member_required
def enqueue_job(request, queue_index, job_id):
    """Ask for confirmation (GET) and enqueue a deferred job (POST).

    Raises:
        Http404: if the job does not exist.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    job = _fetch_job_or_404(job_id, queue)

    if request.method == 'POST':
        # Read the status before enqueueing: enqueueing is expected to mark
        # the job as queued, so checking afterwards would never find the
        # registry the job has to be removed from.
        previous_status = job.get_status()
        queue.enqueue_job(job)

        # Remove job from correct registry if needed
        if previous_status == JobStatus.DEFERRED:
            registry = DeferredJobRegistry(queue.name, queue.connection)
            registry.remove(job)
        elif previous_status == JobStatus.FINISHED:
            registry = FinishedJobRegistry(queue.name, queue.connection)
            registry.remove(job)

        messages.info(request, 'You have successfully enqueued %s' % job.id)
        return redirect('rq_job_detail', queue_index, job_id)

    context_data = {
        **admin.site.each_context(request),
        'queue_index': queue_index,
        'job': job,
        'queue': queue,
    }
    # The confirmation page reuses the delete_job template.
    return render(request, 'django_rq/delete_job.html', context_data)