import os
import re
import time
import traceback
import datetime

from supervisor import templating

from supervisor.compat import urllib
from supervisor.compat import urlparse
from supervisor.compat import as_string
from supervisor.compat import PY2
from supervisor.compat import unicode

from supervisor.medusa import producers
from supervisor.medusa.http_server import http_date
from supervisor.medusa.http_server import get_header
from supervisor.medusa.xmlrpc_handler import collector

from supervisor.process import ProcessStates
from supervisor.http import NOT_DONE_YET

from supervisor.options import VERSION
from supervisor.options import make_namespec
from supervisor.options import split_namespec

from supervisor.xmlrpc import SystemNamespaceRPCInterface
from supervisor.xmlrpc import RootRPCInterface
from supervisor.xmlrpc import Faults
from supervisor.xmlrpc import RPCError

from supervisor.rpcinterface import SupervisorNamespaceRPCInterface

class DeferredWebProducer:
    """ A medusa producer that implements a deferred callback; requires
    a subclass of asynchat.async_chat that handles NOT_DONE_YET sentinel """
    CONNECTION = re.compile('Connection: (.*)', re.IGNORECASE)

    def __init__(self, request, callback):
        """ Remember the request and the callable that will eventually
        produce the response.  ``callback`` must expose a numeric
        ``delay`` attribute, which is copied onto the producer. """
        self.callback = callback
        self.request = request
        self.finished = False
        self.delay = float(callback.delay)

    def more(self):
        """ Poll the callback once.

        Returns ``''`` once the producer has finished, NOT_DONE_YET while
        the callback has not produced a response yet, and otherwise
        whatever ``sendresponse`` returns.  If anything raises, the
        traceback is logged and a 500 error is sent on the request. """
        if self.finished:
            return ''
        try:
            response = self.callback()
            if response is NOT_DONE_YET:
                return NOT_DONE_YET

            self.finished = True
            return self.sendresponse(response)

        except:
            # deliberately broad: this is the outermost boundary for a web
            # request, so every failure is logged and turned into a 500
            tb = traceback.format_exc()
            # this should go to the main supervisor log file
            self.request.channel.server.logger.log('Web interface error', tb)
            self.finished = True
            self.request.error(500)

    def sendresponse(self, response):
        """ Copy the headers and body of the ``response`` dict onto the
        request and push the assembled reply onto the channel.

        ``response`` may carry a ``'headers'`` dict and a ``'body'``.  A
        truthy ``Location`` header short-circuits into a 301 with an
        empty body. """

        headers = response.get('headers', {})
        for header in headers:
            self.request[header] = headers[header]

        if 'Content-Type' not in self.request:
            self.request['Content-Type'] = 'text/plain'

        if headers.get('Location'):
            self.request['Content-Length'] = 0
            self.request.error(301)
            return

        body = response.get('body', '')
        # NOTE(review): len() counts characters when body is text; for
        # non-ASCII bodies that may differ from the encoded byte length --
        # confirm how request.push() encodes the body
        self.request['Content-Length'] = len(body)

        self.request.push(body)

        connection = get_header(self.CONNECTION, self.request.header)

        close_it = 0
        wrap_in_chunking = 0

        if self.request.version == '1.0':
            if connection == 'keep-alive':
                if 'Content-Length' not in self.request:
                    close_it = 1
                else:
                    self.request['Connection'] = 'Keep-Alive'
            else:
                close_it = 1
        elif self.request.version == '1.1':
            if connection == 'close':
                close_it = 1
            elif 'Content-Length' not in self.request:
                if 'Transfer-Encoding' in self.request:
                    if not self.request['Transfer-Encoding'] == 'chunked':
                        close_it = 1
                elif self.request.use_chunked:
                    self.request['Transfer-Encoding'] = 'chunked'
                    wrap_in_chunking = 1
                else:
                    close_it = 1
        elif self.request.version is None:
            close_it = 1

        # the Connection header has to be set *before* the reply header is
        # built, otherwise the client is never told that we are closing
        if close_it:
            self.request['Connection'] = 'close'

        outgoing_header = producers.simple_producer(
            self.request.build_reply_header())

        if wrap_in_chunking:
            outgoing_producer = producers.chunked_producer(
                producers.composite_producer(self.request.outgoing)
                )
            # prepend the header
            outgoing_producer = producers.composite_producer(
                [outgoing_header, outgoing_producer]
                )
        else:
            # fix AttributeError: 'unicode' object has no attribute 'more'
            if PY2 and (len(self.request.outgoing) > 0):
                body = self.request.outgoing[0]
                if isinstance(body, unicode):
                    self.request.outgoing[0] = producers.simple_producer(body)

            # prepend the header
            self.request.outgoing.insert(0, outgoing_header)
            outgoing_producer = producers.composite_producer(
                self.request.outgoing)

        # apply a few final transformations to the output
        self.request.channel.push_with_producer(
            # globbing gives us large packets
            producers.globbing_producer(
                # hooking lets us log the number of bytes sent
                producers.hooked_producer(
                    outgoing_producer,
                    self.request.log
                    )
                )
            )

        self.request.channel.current_request = None

        if close_it:
            self.request.channel.close_when_done()

class ViewContext:
    """ Plain attribute bag: every keyword argument becomes an attribute. """
    def __init__(self, **kw):
        self.__dict__.update(kw)

class MeldView:
    """ Base class for views rendered from a template parsed with
    ``templating.parse_xml``.  Subclasses override ``render``. """

    content_type = 'text/html;charset=utf-8'
    delay = .5

    def __init__(self, context):
        """ Parse the template named by ``context.template``; a relative
        path is resolved against the directory of this module. """
        self.context = context
        template = self.context.template
        if not os.path.isabs(template):
            here = os.path.abspath(os.path.dirname(__file__))
            template = os.path.join(here, template)
        self.root = templating.parse_xml(template)
        self.callback = None

    def __call__(self):
        """ Render the view.  Returns NOT_DONE_YET if ``render`` does,
        otherwise the context's response dict with no-cache headers set
        and the rendered body stored under ``'body'``. """
        body = self.render()
        if body is NOT_DONE_YET:
            return NOT_DONE_YET

        response = self.context.response
        headers = response['headers']
        headers['Content-Type'] = self.content_type
        headers['Pragma'] = 'no-cache'
        headers['Cache-Control'] = 'no-cache'
        headers['Expires'] = http_date.build_http_date(0)
        response['body'] = as_string(body)
        return response

    def render(self):
        """ Hook for subclasses; the base implementation returns None. """
        pass

    def clone(self):
        """ Return a clone of the parsed template root. """
        return self.root.clone()

class TailView(MeldView):
    """ View showing the tail of a process' stdout log. """
    def render(self):
        """ Fill the tail template for the process named by the
        ``processname`` form value and return it as a string.

        The optional ``limit`` form value is negated and clamped so that
        it is never greater than -1024; a non-numeric value falls back to
        -1024. """
        supervisord = self.context.supervisord
        form = self.context.form

        if 'processname' not in form:
            tail = 'No process name found'
            processname = None
        else:
            processname = form['processname']
            offset = 0
            limit = form.get('limit', '1024')
            limit = min(-1024, int(limit)*-1 if limit.isdigit() else -1024)
            if not processname:
                tail = 'No process name found'
            else:
                rpcinterface = SupervisorNamespaceRPCInterface(supervisord)
                try:
                    # NOTE(review): the negative ``limit`` is passed as the
                    # second argument and 0 as the third; presumably the RPC
                    # method reads a negative offset as "from the end of the
                    # log" -- confirm against readProcessStdoutLog
                    tail = rpcinterface.readProcessStdoutLog(processname,
                                                             limit, offset)
                except RPCError as e:
                    if e.code == Faults.NO_FILE:
                        tail = 'No file for %s' % processname
                    else:
                        tail = 'ERROR: unexpected rpc fault [%d] %s' % (
                            e.code, e.text)

        root = self.clone()

        title = root.findmeld('title')
        title.content('Supervisor tail of process %s' % processname)
        tailbody = root.findmeld('tailbody')
        tailbody.content(tail)

        refresh_anchor = root.findmeld('refresh_anchor')
        if processname is not None:
            # ``limit`` is always bound here: it is assigned whenever the
            # form contained a 'processname' key
            refresh_anchor.attributes(
                href='tail.html?processname=%s&limit=%s' % (
                    urllib.quote(processname), urllib.quote(str(abs(limit)))
                    )
            )
        else:
            refresh_anchor.deparent()

        return as_string(root.write_xhtmlstring())

class StatusView(MeldView):
    """ The main status page: lists every process and handles the
    start/stop/restart/clearlog actions submitted through the form. """
    def actions_for_process(self, process):
        """ Return the list of five action slots for ``process``.

        Each slot is either a dict with ``name``, ``href`` and ``target``
        keys or None when the action is not offered in the process'
        current state. """
        state = process.get_state()
        processname = urllib.quote(make_namespec(process.group.config.name,
                                                 process.config.name))
        start = {
            'name': 'Start',
            'href': 'index.html?processname=%s&action=start' % processname,
            'target': None,
        }
        restart = {
            'name': 'Restart',
            'href': 'index.html?processname=%s&action=restart' % processname,
            'target': None,
        }
        stop = {
            'name': 'Stop',
            'href': 'index.html?processname=%s&action=stop' % processname,
            'target': None,
        }
        clearlog = {
            'name': 'Clear Log',
            'href': 'index.html?processname=%s&action=clearlog' % processname,
            'target': None,
        }
        tailf_stdout = {
            'name': 'Tail -f Stdout',
            'href': 'logtail/%s' % processname,
            'target': '_blank'
        }
        tailf_stderr = {
            'name': 'Tail -f Stderr',
            'href': 'logtail/%s/stderr' % processname,
            'target': '_blank'
        }
        if state == ProcessStates.RUNNING:
            actions = [restart, stop, clearlog, tailf_stdout, tailf_stderr]
        elif state in (ProcessStates.STOPPED, ProcessStates.EXITED,
                       ProcessStates.FATAL):
            actions = [start, None, clearlog, tailf_stdout, tailf_stderr]
        else:
            actions = [None, None, clearlog, tailf_stdout, tailf_stderr]
        return actions

    def css_class_for_state(self, state):
        """ Map a process state to the CSS class used for its status. """
        if state == ProcessStates.RUNNING:
            return 'statusrunning'
        elif state in (ProcessStates.FATAL, ProcessStates.BACKOFF):
            return 'statuserror'
        else:
            return 'statusnominal'

    def make_callback(self, namespec, action):
        """ Build the deferred callable that performs ``action``.

        The returned function carries a ``delay`` attribute and, when
        called, returns either NOT_DONE_YET or the status message to show.
        ``namespec`` is only consulted for the per-process actions
        (start, stop, restart, clearlog).

        Raises ValueError(action) when no branch recognises the action. """
        supervisord = self.context.supervisord

        # the rpc interface code is already written to deal properly in a
        # deferred world, so just use it
        main = ('supervisor', SupervisorNamespaceRPCInterface(supervisord))
        system = ('system', SystemNamespaceRPCInterface([main]))

        rpcinterface = RootRPCInterface([main, system])

        if action:

            if action == 'refresh':
                def donothing():
                    message = 'Page refreshed at %s' % time.ctime()
                    return message
                donothing.delay = 0.05
                return donothing

            elif action == 'stopall':
                callback = rpcinterface.supervisor.stopAllProcesses()
                def stopall():
                    if callback() is NOT_DONE_YET:
                        return NOT_DONE_YET
                    else:
                        return 'All stopped at %s' % time.ctime()
                stopall.delay = 0.05
                return stopall

            elif action == 'restartall':
                callback = rpcinterface.system.multicall(
                    [ {'methodName':'supervisor.stopAllProcesses'},
                      {'methodName':'supervisor.startAllProcesses'} ] )
                def restartall():
                    result = callback()
                    if result is NOT_DONE_YET:
                        return NOT_DONE_YET
                    return 'All restarted at %s' % time.ctime()
                restartall.delay = 0.05
                return restartall

            elif namespec:
                def wrong():
                    return 'No such process named %s' % namespec
                wrong.delay = 0.05
                group_name, process_name = split_namespec(namespec)
                group = supervisord.process_groups.get(group_name)
                if group is None:
                    return wrong
                process = group.processes.get(process_name)
                if process is None:
                    return wrong

                if action == 'start':
                    try:
                        bool_or_callback = (
                            rpcinterface.supervisor.startProcess(namespec)
                            )
                    except RPCError as e:
                        if e.code == Faults.NO_FILE:
                            msg = 'no such file'
                        elif e.code == Faults.NOT_EXECUTABLE:
                            msg = 'file not executable'
                        elif e.code == Faults.ALREADY_STARTED:
                            msg = 'already started'
                        elif e.code == Faults.SPAWN_ERROR:
                            msg = 'spawn error'
                        elif e.code == Faults.ABNORMAL_TERMINATION:
                            msg = 'abnormal termination'
                        else:
                            msg = 'unexpected rpc fault [%d] %s' % (
                                e.code, e.text)
                        def starterr():
                            return 'ERROR: Process %s: %s' % (namespec, msg)
                        starterr.delay = 0.05
                        return starterr

                    if callable(bool_or_callback):
                        def startprocess():
                            try:
                                result = bool_or_callback()
                            except RPCError as e:
                                if e.code == Faults.SPAWN_ERROR:
                                    msg = 'spawn error'
                                elif e.code == Faults.ABNORMAL_TERMINATION:
                                    msg = 'abnormal termination'
                                else:
                                    msg = 'unexpected rpc fault [%d] %s' % (
                                        e.code, e.text)
                                return 'ERROR: Process %s: %s' % (namespec, msg)

                            if result is NOT_DONE_YET:
                                return NOT_DONE_YET
                            return 'Process %s started' % namespec
                        startprocess.delay = 0.05
                        return startprocess
                    else:
                        def startdone():
                            return 'Process %s started' % namespec
                        startdone.delay = 0.05
                        return startdone

                elif action == 'stop':
                    try:
                        bool_or_callback = (
                            rpcinterface.supervisor.stopProcess(namespec)
                            )
                    except RPCError as e:
                        msg = 'unexpected rpc fault [%d] %s' % (e.code, e.text)
                        def stoperr():
                            return msg
                        stoperr.delay = 0.05
                        return stoperr

                    if callable(bool_or_callback):
                        def stopprocess():
                            try:
                                result = bool_or_callback()
                            except RPCError as e:
                                return 'unexpected rpc fault [%d] %s' % (
                                    e.code, e.text)
                            if result is NOT_DONE_YET:
                                return NOT_DONE_YET
                            return 'Process %s stopped' % namespec
                        stopprocess.delay = 0.05
                        return stopprocess
                    else:
                        def stopdone():
                            return 'Process %s stopped' % namespec
                        stopdone.delay = 0.05
                        return stopdone

                elif action == 'restart':
                    results_or_callback = rpcinterface.system.multicall(
                        [ {'methodName':'supervisor.stopProcess',
                           'params': [namespec]},
                          {'methodName':'supervisor.startProcess',
                           'params': [namespec]},
                          ]
                        )
                    if callable(results_or_callback):
                        callback = results_or_callback
                        def restartprocess():
                            results = callback()
                            if results is NOT_DONE_YET:
                                return NOT_DONE_YET
                            return 'Process %s restarted' % namespec
                        restartprocess.delay = 0.05
                        return restartprocess
                    else:
                        def restartdone():
                            return 'Process %s restarted' % namespec
                        restartdone.delay = 0.05
                        return restartdone

                elif action == 'clearlog':
                    try:
                        # only success or failure matters here, the return
                        # value of the call is not used
                        rpcinterface.supervisor.clearProcessLogs(namespec)
                    except RPCError as e:
                        msg = 'unexpected rpc fault [%d] %s' % (e.code, e.text)
                        def clearerr():
                            return msg
                        clearerr.delay = 0.05
                        return clearerr

                    def clearlog():
                        return 'Log for %s cleared' % namespec
                    clearlog.delay = 0.05
                    return clearlog

        raise ValueError(action)

    def render(self):
        """ Render the status page, or drive a pending action.

        When the form carries an ``action``, the first call creates the
        callback and returns NOT_DONE_YET; later calls poll it, and once
        it yields a message a ``Location`` header pointing back at the
        server URL with that message is set on the response.  The page
        itself is returned as a string. """
        form = self.context.form
        response = self.context.response
        processname = form.get('processname')
        action = form.get('action')
        message = form.get('message')

        if action:
            if not self.callback:
                self.callback = self.make_callback(processname, action)
                return NOT_DONE_YET

            else:
                message = self.callback()
                if message is NOT_DONE_YET:
                    return NOT_DONE_YET
                if message is not None:
                    server_url = form['SERVER_URL']
                    location = server_url + "/" + '?message=%s' % urllib.quote(
                        message)
                    response['headers']['Location'] = location

        supervisord = self.context.supervisord
        rpcinterface = RootRPCInterface(
            [('supervisor',
              SupervisorNamespaceRPCInterface(supervisord))]
            )

        processnames = []
        for group in supervisord.process_groups.values():
            for gprocname in group.processes.keys():
                processnames.append((group.config.name, gprocname))

        processnames.sort()

        data = []
        for groupname, processname in processnames:
            actions = self.actions_for_process(
                supervisord.process_groups[groupname].processes[processname])
            sent_name = make_namespec(groupname, processname)
            info = rpcinterface.supervisor.getProcessInfo(sent_name)
            data.append({
                'status':info['statename'],
                'name':processname,
                'group':groupname,
                'actions':actions,
                'state':info['state'],
                'description':info['description'],
                })

        root = self.clone()

        if message is not None:
            statusarea = root.findmeld('statusmessage')
            statusarea.attrib['class'] = 'status_msg'
            statusarea.content(message)

        if data:
            iterator = root.findmeld('tr').repeat(data)
            shaded_tr = False

            for tr_element, item in iterator:
                status_text = tr_element.findmeld('status_text')
                status_text.content(item['status'].lower())
                status_text.attrib['class'] = self.css_class_for_state(
                    item['state'])

                info_text = tr_element.findmeld('info_text')
                info_text.content(item['description'])

                anchor = tr_element.findmeld('name_anchor')
                processname = make_namespec(item['group'], item['name'])
                anchor.attributes(href='tail.html?processname=%s' %
                                  urllib.quote(processname))
                anchor.content(processname)

                actions = item['actions']
                actionitem_td = tr_element.findmeld('actionitem_td')

                for li_element, actionitem in actionitem_td.repeat(actions):
                    anchor = li_element.findmeld('actionitem_anchor')
                    if actionitem is None:
                        anchor.attrib['class'] = 'hidden'
                    else:
                        anchor.attributes(href=actionitem['href'],
                                          name=actionitem['name'])
                        anchor.content(actionitem['name'])
                        if actionitem['target']:
                            anchor.attributes(target=actionitem['target'])
                # every other row gets the 'shade' class
                if shaded_tr:
                    tr_element.attrib['class'] = 'shade'
                shaded_tr = not shaded_tr
        else:
            table = root.findmeld('statustable')
            table.replace('No programs to manage')

        root.findmeld('supervisor_version').content(VERSION)
        copyright_year = str(datetime.date.today().year)
        root.findmeld('copyright_date').content(copyright_year)

        return as_string(root.write_xhtmlstring())

class OKView:
    """ Template-less view that always answers with the body 'OK'. """
    delay = 0
    def __init__(self, context):
        self.context = context

    def __call__(self):
        return {'body':'OK'}

# maps the request path (leading slashes stripped) to its template and
# view class
VIEWS = {
    'index.html': {
          'template':'ui/status.html',
          'view':StatusView
          },
    'tail.html': {
           'template':'ui/tail.html',
           'view':TailView,
           },
    'ok.html': {
           'template':None,
           'view':OKView,
           },
    }


class supervisor_ui_handler:
    """ HTTP request handler that dispatches GET/POST requests to the
    views registered in VIEWS. """
    IDENT = 'Supervisor Web UI HTTP Request Handler'

    def __init__(self, supervisord):
        self.supervisord = supervisord

    def match(self, request):
        """ Return True when ``request`` is a GET or POST whose path
        (leading slashes stripped, empty meaning 'index.html') names a
        registered view, False otherwise. """
        if request.command not in ('POST', 'GET'):
            return False

        path, params, query, fragment = request.split_uri()

        while path.startswith('/'):
            path = path[1:]

        if not path:
            path = 'index.html'

        return path in VIEWS

    def handle_request(self, request):
        """ For a POST, attach a collector to the request and return; any
        other request is continued immediately with empty data. """
        if request.command == 'POST':
            request.collector = collector(self, request)
        else:
            self.continue_request('', request)

    def continue_request(self, data, request):
        """ Build the form dict from the CGI environment, the query string
        and the url-encoded ``data``, then push a DeferredWebProducer
        wrapping the matching view onto the request's channel. """
        form = {}
        cgi_env = request.cgi_environment()
        form.update(cgi_env)
        if 'QUERY_STRING' not in form:
            form['QUERY_STRING'] = ''

        query = form['QUERY_STRING']

        # we only handle x-www-form-urlencoded values from POSTs
        form_urlencoded = urlparse.parse_qsl(data)
        query_data = urlparse.parse_qs(query)

        for k, v in query_data.items():
            # ignore dupes
            form[k] = v[0]

        for k, v in form_urlencoded:
            # ignore dupes
            form[k] = v

        form['SERVER_URL'] = request.get_server_url()

        path = form['PATH_INFO']
        # strip off all leading slashes
        while path and path[0] == '/':
            path = path[1:]
        if not path:
            path = 'index.html'

        viewinfo = VIEWS.get(path)
        if viewinfo is None:
            # this should never happen if our match method works
            return

        response = {'headers': {}}

        viewclass = viewinfo['view']
        viewtemplate = viewinfo['template']
        context = ViewContext(template=viewtemplate,
                              request = request,
                              form = form,
                              response = response,
                              supervisord=self.supervisord)
        view = viewclass(context)
        pushproducer = request.channel.push_with_producer
        pushproducer(DeferredWebProducer(request, view))
