1from __future__ import division
2
3from math import ceil
4
5from django.contrib import admin, messages
6from django.contrib.admin.views.decorators import staff_member_required
7from django.http import Http404, JsonResponse
8from django.shortcuts import redirect, render
9from django.urls import reverse
10from django.views.decorators.cache import never_cache
11
12from redis.exceptions import ResponseError
13from rq import requeue_job
14from rq.exceptions import NoSuchJobError
15from rq.job import Job, JobStatus
16from rq.registry import (
17    DeferredJobRegistry,
18    FailedJobRegistry,
19    FinishedJobRegistry,
20    ScheduledJobRegistry,
21    StartedJobRegistry,
22)
23from rq.worker import Worker
24from rq.worker_registration import clean_worker_registry
25
26from .queues import get_queue_by_index
27from .settings import API_TOKEN
28from .utils import get_statistics, get_jobs
29
30
@never_cache
@staff_member_required
def stats(request):
    """Render the RQ statistics overview page.

    The template context is the admin site's per-request context overlaid
    with the mapping returned by
    ``get_statistics(run_maintenance_tasks=True)``.
    """
    template_context = dict(admin.site.each_context(request))
    template_context.update(get_statistics(run_maintenance_tasks=True))
    return render(request, 'django_rq/stats.html', template_context)
39
40
def _api_token_matches(token):
    """Return True when ``token`` is a non-empty string equal to ``API_TOKEN``.

    The comparison uses ``hmac.compare_digest`` so that the time taken does
    not depend on how many leading characters of a guessed token are right.
    """
    # Function-scope import: hmac is standard library and is not imported
    # at the top of this module.
    import hmac

    if not (isinstance(token, str) and isinstance(API_TOKEN, str)):
        return False
    if not token or not API_TOKEN:
        # An empty or unset token must never grant access.
        return False
    return hmac.compare_digest(token.encode('utf-8'), API_TOKEN.encode('utf-8'))


@never_cache
def stats_json(request, token=None):
    """Return queue statistics as JSON for staff users or API token holders.

    Access is granted when ``request.user.is_staff`` is true, or when
    ``token`` is non-empty and equals the configured ``API_TOKEN``. Any other
    caller receives a JSON payload with ``"error": True`` and a description.

    Args:
        request: The incoming HTTP request.
        token: Optional API token supplied by the caller.

    Returns:
        A ``JsonResponse`` holding either ``get_statistics()`` or the error
        payload. The response is marked as non-cacheable, like the other
        views in this module.
    """
    if request.user.is_staff or _api_token_matches(token):
        return JsonResponse(get_statistics())

    return JsonResponse({
        "error": True,
        "description": "Please configure API_TOKEN in settings.py before accessing this view."
    })
49
50
@never_cache
@staff_member_required
def jobs(request, queue_index):
    """List the jobs currently held in a queue, 100 per page.

    Args:
        request: The incoming HTTP request; the optional ``page`` query
            parameter selects the page (1-based).
        queue_index: Index of the queue, as accepted by
            ``get_queue_by_index``.

    Returns:
        The rendered ``django_rq/jobs.html`` page with ``job_status`` set to
        ``'Queued'``.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    items_per_page = 100
    num_jobs = queue.count

    # ``page`` comes straight from the query string: a non-numeric value
    # must not crash the view, and a value below 1 would produce a negative
    # offset. Both cases fall back to the first page.
    try:
        page = max(int(request.GET.get('page', 1)), 1)
    except (TypeError, ValueError):
        page = 1

    if num_jobs > 0:
        last_page = int(ceil(num_jobs / items_per_page))
        page_range = range(1, last_page + 1)
        offset = items_per_page * (page - 1)
        job_list = queue.get_jobs(offset, items_per_page)
    else:
        job_list = []
        page_range = []

    context_data = {
        **admin.site.each_context(request),
        'queue': queue,
        'queue_index': queue_index,
        'jobs': job_list,
        'num_jobs': num_jobs,
        'page': page,
        'page_range': page_range,
        'job_status': 'Queued',
    }
    return render(request, 'django_rq/jobs.html', context_data)
81
82
@never_cache
@staff_member_required
def finished_jobs(request, queue_index):
    """List the jobs in a queue's ``FinishedJobRegistry``, 100 per page.

    Args:
        request: The incoming HTTP request; the optional ``page`` query
            parameter selects the page (1-based).
        queue_index: Index of the queue, as accepted by
            ``get_queue_by_index``.

    Returns:
        The rendered ``django_rq/jobs.html`` page with ``job_status`` set to
        ``'Finished'``.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    registry = FinishedJobRegistry(queue.name, queue.connection)

    items_per_page = 100
    num_jobs = len(registry)
    jobs = []

    # ``page`` comes straight from the query string: a non-numeric value
    # must not crash the view, and a value below 1 would produce a negative
    # offset. Both cases fall back to the first page.
    try:
        page = max(int(request.GET.get('page', 1)), 1)
    except (TypeError, ValueError):
        page = 1

    if num_jobs > 0:
        last_page = int(ceil(num_jobs / items_per_page))
        page_range = range(1, last_page + 1)
        offset = items_per_page * (page - 1)
        # get_job_ids takes an inclusive end index, hence the ``- 1``.
        job_ids = registry.get_job_ids(offset, offset + items_per_page - 1)
        jobs = get_jobs(queue, job_ids, registry)
    else:
        page_range = []

    context_data = {
        **admin.site.each_context(request),
        'queue': queue,
        'queue_index': queue_index,
        'jobs': jobs,
        'num_jobs': num_jobs,
        'page': page,
        'page_range': page_range,
        'job_status': 'Finished',
    }
    return render(request, 'django_rq/jobs.html', context_data)
117
118
@never_cache
@staff_member_required
def failed_jobs(request, queue_index):
    """List the jobs in a queue's ``FailedJobRegistry``, 100 per page.

    Args:
        request: The incoming HTTP request; the optional ``page`` query
            parameter selects the page (1-based).
        queue_index: Index of the queue, as accepted by
            ``get_queue_by_index``.

    Returns:
        The rendered ``django_rq/jobs.html`` page with ``job_status`` set to
        ``'Failed'``.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    registry = FailedJobRegistry(queue.name, queue.connection)

    items_per_page = 100
    num_jobs = len(registry)
    jobs = []

    # ``page`` comes straight from the query string: a non-numeric value
    # must not crash the view, and a value below 1 would produce a negative
    # offset. Both cases fall back to the first page.
    try:
        page = max(int(request.GET.get('page', 1)), 1)
    except (TypeError, ValueError):
        page = 1

    if num_jobs > 0:
        last_page = int(ceil(num_jobs / items_per_page))
        page_range = range(1, last_page + 1)
        offset = items_per_page * (page - 1)
        # get_job_ids takes an inclusive end index, hence the ``- 1``.
        job_ids = registry.get_job_ids(offset, offset + items_per_page - 1)
        jobs = get_jobs(queue, job_ids, registry)
    else:
        page_range = []

    context_data = {
        **admin.site.each_context(request),
        'queue': queue,
        'queue_index': queue_index,
        'jobs': jobs,
        'num_jobs': num_jobs,
        'page': page,
        'page_range': page_range,
        'job_status': 'Failed',
    }
    return render(request, 'django_rq/jobs.html', context_data)
153
154
@never_cache
@staff_member_required
def scheduled_jobs(request, queue_index):
    """List the jobs in a queue's ``ScheduledJobRegistry``, 100 per page.

    Each listed job gets a ``scheduled_at`` attribute holding the value of
    ``registry.get_scheduled_time(job)`` for use by the template.

    Args:
        request: The incoming HTTP request; the optional ``page`` query
            parameter selects the page (1-based).
        queue_index: Index of the queue, as accepted by
            ``get_queue_by_index``.

    Returns:
        The rendered ``django_rq/jobs.html`` page with ``job_status`` set to
        ``'Scheduled'``.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    registry = ScheduledJobRegistry(queue.name, queue.connection)

    items_per_page = 100
    num_jobs = len(registry)
    jobs = []

    # ``page`` comes straight from the query string: a non-numeric value
    # must not crash the view, and a value below 1 would produce a negative
    # offset. Both cases fall back to the first page.
    try:
        page = max(int(request.GET.get('page', 1)), 1)
    except (TypeError, ValueError):
        page = 1

    if num_jobs > 0:
        last_page = int(ceil(num_jobs / items_per_page))
        page_range = range(1, last_page + 1)
        offset = items_per_page * (page - 1)
        # get_job_ids takes an inclusive end index, hence the ``- 1``.
        job_ids = registry.get_job_ids(offset, offset + items_per_page - 1)

        jobs = get_jobs(queue, job_ids, registry)
        for job in jobs:
            job.scheduled_at = registry.get_scheduled_time(job)
    else:
        page_range = []

    context_data = {
        **admin.site.each_context(request),
        'queue': queue,
        'queue_index': queue_index,
        'jobs': jobs,
        'num_jobs': num_jobs,
        'page': page,
        'page_range': page_range,
        'job_status': 'Scheduled',
    }
    return render(request, 'django_rq/jobs.html', context_data)
192
193
@never_cache
@staff_member_required
def started_jobs(request, queue_index):
    """List the jobs in a queue's ``StartedJobRegistry``, 100 per page.

    Args:
        request: The incoming HTTP request; the optional ``page`` query
            parameter selects the page (1-based).
        queue_index: Index of the queue, as accepted by
            ``get_queue_by_index``.

    Returns:
        The rendered ``django_rq/jobs.html`` page with ``job_status`` set to
        ``'Started'``.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    registry = StartedJobRegistry(queue.name, queue.connection)

    items_per_page = 100
    num_jobs = len(registry)
    jobs = []

    # ``page`` comes straight from the query string: a non-numeric value
    # must not crash the view, and a value below 1 would produce a negative
    # offset. Both cases fall back to the first page.
    try:
        page = max(int(request.GET.get('page', 1)), 1)
    except (TypeError, ValueError):
        page = 1

    if num_jobs > 0:
        last_page = int(ceil(num_jobs / items_per_page))
        page_range = range(1, last_page + 1)
        offset = items_per_page * (page - 1)
        # get_job_ids takes an inclusive end index, hence the ``- 1``.
        job_ids = registry.get_job_ids(offset, offset + items_per_page - 1)
        jobs = get_jobs(queue, job_ids, registry)
    else:
        page_range = []

    context_data = {
        **admin.site.each_context(request),
        'queue': queue,
        'queue_index': queue_index,
        'jobs': jobs,
        'num_jobs': num_jobs,
        'page': page,
        'page_range': page_range,
        'job_status': 'Started',
    }
    return render(request, 'django_rq/jobs.html', context_data)
228
229
@never_cache
@staff_member_required
def workers(request, queue_index):
    """Show the workers that listen on the selected queue.

    ``clean_worker_registry`` is called for the queue first; the workers
    returned by ``Worker.all`` for the queue's connection are then narrowed
    to those whose ``queue_names()`` include this queue's name.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    clean_worker_registry(queue)

    queue_workers = []
    for candidate in Worker.all(queue.connection):
        if queue.name in candidate.queue_names():
            queue_workers.append(candidate)

    template_context = dict(admin.site.each_context(request))
    template_context.update({
        'queue': queue,
        'queue_index': queue_index,
        'workers': queue_workers,
    })
    return render(request, 'django_rq/workers.html', template_context)
247
248
@never_cache
@staff_member_required
def worker_details(request, queue_index, key):
    """Show the detail page for a single worker.

    Args:
        request: The incoming HTTP request.
        queue_index: Index of the queue, as accepted by
            ``get_queue_by_index``.
        key: Worker key passed to ``Worker.find_by_key``.

    Returns:
        The rendered ``django_rq/worker_details.html`` page.

    Raises:
        Http404: If no worker can be found for ``key``.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    # NOTE(review): RQ's find_by_key is understood to return None for an
    # unknown worker and to raise ValueError for a key that is not a worker
    # key. Both are answered with a 404 instead of an unhandled error;
    # confirm against the installed RQ version.
    try:
        worker = Worker.find_by_key(key, connection=queue.connection)
    except ValueError:
        worker = None
    if worker is None:
        raise Http404("Couldn't find worker with this key: %s" % key)

    # Convert microseconds to milliseconds
    worker.total_working_time = worker.total_working_time / 1000

    queue_names = ', '.join(worker.queue_names())

    context_data = {
        **admin.site.each_context(request),
        'queue': queue,
        'queue_index': queue_index,
        'worker': worker,
        'queue_names': queue_names,
        'job': worker.get_current_job(),
        # The scaled attribute multiplied back by 1000, i.e. the original unit.
        'total_working_time': worker.total_working_time * 1000
    }
    return render(request, 'django_rq/worker_details.html', context_data)
270
271
@never_cache
@staff_member_required
def deferred_jobs(request, queue_index):
    """List the jobs in a queue's ``DeferredJobRegistry``, 100 per page.

    Job ids whose job can no longer be fetched (``NoSuchJobError``) are
    silently left out of the listing.

    Args:
        request: The incoming HTTP request; the optional ``page`` query
            parameter selects the page (1-based).
        queue_index: Index of the queue, as accepted by
            ``get_queue_by_index``.

    Returns:
        The rendered ``django_rq/jobs.html`` page with ``job_status`` set to
        ``'Deferred'``.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    registry = DeferredJobRegistry(queue.name, queue.connection)

    items_per_page = 100
    num_jobs = len(registry)
    jobs = []

    # ``page`` comes straight from the query string: a non-numeric value
    # must not crash the view, and a value below 1 would produce a negative
    # offset. Both cases fall back to the first page.
    try:
        page = max(int(request.GET.get('page', 1)), 1)
    except (TypeError, ValueError):
        page = 1

    if num_jobs > 0:
        last_page = int(ceil(num_jobs / items_per_page))
        page_range = range(1, last_page + 1)
        offset = items_per_page * (page - 1)
        # get_job_ids takes an inclusive end index, hence the ``- 1``.
        job_ids = registry.get_job_ids(offset, offset + items_per_page - 1)

        for job_id in job_ids:
            try:
                jobs.append(Job.fetch(job_id, connection=queue.connection))
            except NoSuchJobError:
                # The registry still lists the id but the job is gone.
                pass
    else:
        page_range = []

    context_data = {
        **admin.site.each_context(request),
        'queue': queue,
        'queue_index': queue_index,
        'jobs': jobs,
        'num_jobs': num_jobs,
        'page': page,
        'page_range': page_range,
        'job_status': 'Deferred',
    }
    return render(request, 'django_rq/jobs.html', context_data)
311
312
@never_cache
@staff_member_required
def job_detail(request, queue_index, job_id):
    """Show the detail page for a single job.

    Args:
        request: The incoming HTTP request.
        queue_index: Index of the queue, as accepted by
            ``get_queue_by_index``.
        job_id: Id of the job to display.

    Returns:
        The rendered ``django_rq/job_detail.html`` page. The context flag
        ``data_is_valid`` is False when reading ``job.func_name`` raises.

    Raises:
        Http404: If no job with ``job_id`` exists on the queue's connection.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    try:
        job = Job.fetch(job_id, connection=queue.connection)
    except NoSuchJobError:
        raise Http404("Couldn't find job with this ID: %s" % job_id)

    # Reading func_name is used as a probe for whether the job's stored data
    # is usable. Catch Exception rather than using a bare ``except`` so that
    # SystemExit and KeyboardInterrupt still propagate.
    try:
        job.func_name
        data_is_valid = True
    except Exception:
        data_is_valid = False

    context_data = {
        **admin.site.each_context(request),
        'queue_index': queue_index,
        'job': job,
        'dependency_id': job._dependency_id,
        'queue': queue,
        'data_is_valid': data_is_valid
    }
    return render(request, 'django_rq/job_detail.html', context_data)
338
339
@never_cache
@staff_member_required
def delete_job(request, queue_index, job_id):
    """Confirm (GET) or perform (POST) the deletion of a single job.

    On POST the job id is removed from the queue's list, the job itself is
    deleted, and the user is redirected to the queue's job list.

    Args:
        request: The incoming HTTP request.
        queue_index: Index of the queue, as accepted by
            ``get_queue_by_index``.
        job_id: Id of the job to delete.

    Returns:
        A redirect to ``rq_jobs`` after a POST, otherwise the rendered
        ``django_rq/delete_job.html`` confirmation page.

    Raises:
        Http404: If no job with ``job_id`` exists on the queue's connection.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    # A stale or mistyped job id should give a 404, as in job_detail, not an
    # unhandled NoSuchJobError.
    try:
        job = Job.fetch(job_id, connection=queue.connection)
    except NoSuchJobError:
        raise Http404("Couldn't find job with this ID: %s" % job_id)

    if request.method == 'POST':
        # Remove job id from queue and delete the actual job
        queue.connection.lrem(queue.key, 0, job.id)
        job.delete()
        messages.info(request, 'You have successfully deleted %s' % job.id)
        return redirect('rq_jobs', queue_index)

    context_data = {
        **admin.site.each_context(request),
        'queue_index': queue_index,
        'job': job,
        'queue': queue,
    }
    return render(request, 'django_rq/delete_job.html', context_data)
361
362
@never_cache
@staff_member_required
def requeue_job_view(request, queue_index, job_id):
    """Confirm (GET) or perform (POST) the requeueing of a single job.

    Args:
        request: The incoming HTTP request.
        queue_index: Index of the queue, as accepted by
            ``get_queue_by_index``.
        job_id: Id of the job to requeue.

    Returns:
        A redirect to ``rq_job_detail`` after a POST, otherwise the rendered
        ``django_rq/delete_job.html`` page (this view uses the same template
        as ``delete_job``).

    Raises:
        Http404: If no job with ``job_id`` exists on the queue's connection.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    # A stale or mistyped job id should give a 404, as in job_detail, not an
    # unhandled NoSuchJobError.
    try:
        job = Job.fetch(job_id, connection=queue.connection)
    except NoSuchJobError:
        raise Http404("Couldn't find job with this ID: %s" % job_id)

    if request.method == 'POST':
        requeue_job(job_id, connection=queue.connection)
        messages.info(request, 'You have successfully requeued %s' % job.id)
        return redirect('rq_job_detail', queue_index, job_id)

    context_data = {
        **admin.site.each_context(request),
        'queue_index': queue_index,
        'job': job,
        'queue': queue,
    }
    return render(request, 'django_rq/delete_job.html', context_data)
382
383
@never_cache
@staff_member_required
def clear_queue(request, queue_index):
    """Confirm (GET) or perform (POST) emptying a queue.

    Args:
        request: The incoming HTTP request.
        queue_index: Index of the queue, as accepted by
            ``get_queue_by_index``.

    Returns:
        A redirect to ``rq_jobs`` after a POST, otherwise the rendered
        ``django_rq/clear_queue.html`` confirmation page.

    Raises:
        redis.exceptions.ResponseError: Re-raised when ``queue.empty()``
            fails for a reason other than missing EVALSHA support.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    if request.method == 'POST':
        try:
            queue.empty()
            messages.info(request, 'You have successfully cleared the queue %s' % queue.name)
        except ResponseError as e:
            # Python 3 exceptions have no ``.message`` attribute, so inspect
            # the string form of the error instead.
            if 'EVALSHA' in str(e):
                messages.error(request, 'This action is not supported on Redis versions < 2.6.0, please use the bulk delete command instead')
            else:
                # Bare ``raise`` keeps the original traceback.
                raise
        return redirect('rq_jobs', queue_index)

    context_data = {
        **admin.site.each_context(request),
        'queue_index': queue_index,
        'queue': queue,
    }
    return render(request, 'django_rq/clear_queue.html', context_data)
407
408
@never_cache
@staff_member_required
def requeue_all(request, queue_index):
    """Confirm (GET) or perform (POST) requeueing every failed job of a queue.

    On POST each id in the queue's ``FailedJobRegistry`` is passed to
    ``requeue_job``. Ids that raise ``NoSuchJobError`` are skipped and not
    counted in the success message.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    failed_registry = FailedJobRegistry(queue=queue)

    if request.method != 'POST':
        # No confirmation yet: show how many jobs would be requeued.
        return render(request, 'django_rq/requeue_all.html', {
            **admin.site.each_context(request),
            'queue_index': queue_index,
            'queue': queue,
            'total_jobs': len(failed_registry),
        })

    # Confirmation received
    requeued = 0
    for failed_id in failed_registry.get_job_ids():
        try:
            requeue_job(failed_id, connection=queue.connection)
        except NoSuchJobError:
            continue
        requeued += 1

    messages.info(request, 'You have successfully requeued %d jobs!' % requeued)
    return redirect('rq_jobs', queue_index)
438
439
@never_cache
@staff_member_required
def confirm_action(request, queue_index):
    """Show the confirmation page for a bulk action on selected jobs.

    The page is rendered only for a POST that carries both an ``action`` and
    at least one ``_selected_action`` value. Every other request is
    redirected to the referring page, or to the queue's job list when no
    referer is present.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)

    next_url = request.META.get('HTTP_REFERER')
    if not next_url:
        next_url = reverse('rq_jobs', args=[queue_index])

    has_action = request.method == 'POST' and request.POST.get('action', False)
    if not (has_action and request.POST.get('_selected_action', False)):
        return redirect(next_url)

    # confirm action
    template_context = dict(admin.site.each_context(request))
    template_context.update({
        'queue_index': queue_index,
        'action': request.POST['action'],
        'job_ids': request.POST.getlist('_selected_action'),
        'queue': queue,
        'next_url': next_url,
    })
    return render(request, 'django_rq/confirm_action.html', template_context)
461
462
@never_cache
@staff_member_required
def actions(request, queue_index):
    """Run a confirmed bulk action (``delete`` or ``requeue``) on jobs.

    The action runs only for a POST that carries an ``action`` and at least
    one ``job_ids`` value. Jobs that no longer exist are skipped, and the
    success message reports how many jobs were actually processed.

    Args:
        request: The incoming HTTP request.
        queue_index: Index of the queue, as accepted by
            ``get_queue_by_index``.

    Returns:
        A redirect to the posted ``next_url``, or to the queue's job list
        when none was posted.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    # NOTE(review): next_url is taken from POST data without validation;
    # consider restricting it to same-host URLs.
    next_url = request.POST.get('next_url') or reverse('rq_jobs', args=[queue_index])

    if request.method == 'POST' and request.POST.get('action', False):
        # do confirmed action
        if request.POST.get('job_ids', False):
            job_ids = request.POST.getlist('job_ids')
            action = request.POST['action']

            if action == 'delete':
                deleted = 0
                for job_id in job_ids:
                    try:
                        job = Job.fetch(job_id, connection=queue.connection)
                    except NoSuchJobError:
                        # Already gone: skip it rather than abort the batch.
                        continue
                    # Remove job id from queue and delete the actual job
                    queue.connection.lrem(queue.key, 0, job.id)
                    job.delete()
                    deleted += 1
                messages.info(request, 'You have successfully deleted %s jobs!' % deleted)
            elif action == 'requeue':
                requeued = 0
                for job_id in job_ids:
                    try:
                        requeue_job(job_id, connection=queue.connection)
                    except NoSuchJobError:
                        # Same policy as requeue_all: skip missing jobs.
                        continue
                    requeued += 1
                messages.info(request, 'You have successfully requeued %d  jobs!' % requeued)

    return redirect(next_url)
488
489
@never_cache
@staff_member_required
def enqueue_job(request, queue_index, job_id):
    """Confirm (GET) or perform (POST) enqueueing a deferred or finished job.

    On POST the job is enqueued on the selected queue and then removed from
    the registry that matched its status before enqueueing
    (``DeferredJobRegistry`` or ``FinishedJobRegistry``).

    Args:
        request: The incoming HTTP request.
        queue_index: Index of the queue, as accepted by
            ``get_queue_by_index``.
        job_id: Id of the job to enqueue.

    Returns:
        A redirect to ``rq_job_detail`` after a POST, otherwise the rendered
        ``django_rq/delete_job.html`` page (this view uses the same template
        as ``delete_job``).

    Raises:
        Http404: If no job with ``job_id`` exists on the queue's connection.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    # A stale or mistyped job id should give a 404, as in job_detail, not an
    # unhandled NoSuchJobError.
    try:
        job = Job.fetch(job_id, connection=queue.connection)
    except NoSuchJobError:
        raise Http404("Couldn't find job with this ID: %s" % job_id)

    if request.method == 'POST':
        # Read the status BEFORE enqueueing. NOTE(review): enqueue_job is
        # expected to change the job's status, so reading it afterwards
        # would no longer show which registry held the job. Confirm against
        # the installed RQ version.
        previous_status = job.get_status()
        queue.enqueue_job(job)

        # Remove job from correct registry if needed
        if previous_status == JobStatus.DEFERRED:
            registry = DeferredJobRegistry(queue.name, queue.connection)
            registry.remove(job)
        elif previous_status == JobStatus.FINISHED:
            registry = FinishedJobRegistry(queue.name, queue.connection)
            registry.remove(job)

        messages.info(request, 'You have successfully enqueued %s' % job.id)
        return redirect('rq_job_detail', queue_index, job_id)

    context_data = {
        **admin.site.each_context(request),
        'queue_index': queue_index,
        'job': job,
        'queue': queue,
    }
    return render(request, 'django_rq/delete_job.html', context_data)
520