#!/usr/local/bin/python3.8

"""=cut
=head1 NAME

celery_tasks - Munin plugin to monitor the number of Celery tasks with specified names.

=head1 REQUIREMENTS

 - Python
 - celery (http://celeryproject.org/)
 - celerymon (http://github.com/ask/celerymon)

Note: don't forget to enable sending of the events on the celery daemon - run it with the --events option

=head1 CONFIGURATION

Default configuration:

  None

You must set the name of at least one task you want to monitor (multiple names are separated by a comma).

For example:

  [celery_tasks]
     env.tasks myapp.tasks.SendEmailTask,myapp2.tasks.FetchUserDataTask

This would monitor the number of tasks named "myapp.tasks.SendEmailTask" and "myapp2.tasks.FetchUserDataTask".

Optionally, env.api_url overrides the base URL of the celerymon web server (default: http://localhost:8989).

=head1 MAGIC MARKERS

  #%# family=manual
  #%# capabilities=autoconf

=head1 AUTHOR

Tomaz Muraus (http://github.com/Kami/munin-celery)

=head1 LICENSE

GPLv2

=cut"""

import os
import sys
import urllib
import urllib.request

try:
    import json
except ImportError:
    # Only a missing module should trigger the fallback; anything else is a
    # real error and must not be hidden.
    import simplejson as json

# Base URL of the celerymon web server; overridable through env.api_url.
API_URL = 'http://localhost:8989'

# Path templates of the celerymon HTTP API, keyed by a short name.
# Entries containing '%s' take one positional argument.
URL_ENDPOINTS = {
    'workers': '/api/worker/',
    'worker_tasks': '/api/worker/%s/tasks',
    'tasks': '/api/task/',
    'task_names': '/api/task/name/',
    'task_details': '/api/task/name/%s',
}

TASK_STATES = (
    'PENDING',
    'RECEIVED',
    'STARTED',
    'SUCCESS',
    'FAILURE',
    'REVOKED',
    'RETRY'
)


def _fetch(url):
    """Return the raw response body of *url*, closing the connection."""
    with urllib.request.urlopen(url) as response:
        return response.read()


def get_data(what, api_url, *args):
    """Fetch and decode one JSON document from the celerymon API.

    Args:
        what: key into URL_ENDPOINTS selecting the endpoint.
        api_url: base URL of the celerymon web server.
        *args: values substituted into the endpoint's '%s' placeholders.

    Returns:
        The decoded JSON payload.

    Raises:
        SystemExit: with status -1 when the web server cannot be reached.
    """
    url = '%s%s' % (api_url, URL_ENDPOINTS[what] % args)
    try:
        payload = _fetch(url)
    except IOError:
        # urllib.error.URLError derives from OSError (IOError is its alias).
        print('Could not connect to the celerymon webserver')
        sys.exit(-1)
    return json.loads(payload.decode('utf-8'))


def check_web_server_status(api_url):
    """Exit with status -1 unless the web server at *api_url* answers."""
    try:
        _fetch(api_url)
    except IOError:
        print('Could not connect to the celerymon webserver')
        sys.exit(-1)


def clean_task_name(task_name):
    """Return *task_name* with every '.' replaced by '_'.

    The result is used as the Munin field name for the task.
    """
    return task_name.replace('.', '_')


# Config
def print_config(task_names):
    """Print the Munin graph configuration for the given task names."""
    print('graph_title Celery tasks')
    print('graph_args --lower-limit 0')
    print('graph_scale no')
    print('graph_vlabel tasks per ${graph_period}')
    print('graph_category cloud')

    for name in task_names:
        field = clean_task_name(name)
        print('%s.label %s' % (field, name))
        print('%s.type DERIVE' % (field))
        print('%s.min 0' % (field))
        print('%s.info number of %s tasks' % (field, name))


# Values
def print_values(task_names=None, api_url=None):
    """Print one '<field>.value <count>' line per task name.

    The count is the number of entries the 'task_details' endpoint returns
    for that task name.

    Args:
        task_names: iterable of task names; None is treated as empty.
        api_url: base URL of the celerymon web server; None selects API_URL.
    """
    if api_url is None:
        api_url = API_URL
    # Iterating the default None would raise TypeError.
    for task_name in task_names or ():
        count = len(get_data('task_details', api_url, task_name))
        print('%s.value %d' % (clean_task_name(task_name), count))


def main():
    """Run the plugin: 'config', 'autoconf', or (no argument) print values."""
    task_names = os.environ.get('tasks', None)
    api_url = os.environ.get('api_url', API_URL)

    check_web_server_status(api_url)

    if not task_names:
        print('You need to define at least one task name')
        sys.exit(-1)

    task_names = task_names.split(',')

    if len(sys.argv) > 1:
        if sys.argv[1] == 'config':
            print_config(task_names)
        elif sys.argv[1] == 'autoconf':
            print('yes')
    else:
        print_values(task_names, api_url)


if __name__ == '__main__':
    main()