# Copyright 2009-2018 Oli Schacher
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import threading
import sys
import time
import socket
import logging
import datetime
import traceback
import string
import os
from fuglu.localStringEncoding import force_bString, force_uString


class ControlServer(object):
    """Listening socket that hands each incoming connection to a ControlSession.

    The server listens either on a TCP port (when ``port`` is an int or a
    numeric string) or on a unix domain socket (when ``port`` is a path).
    """

    def __init__(self, controller, port=None, address="127.0.0.1"):
        """Create and bind the listening socket.

        Args:
            controller: object passed through to every ControlSession.
            port: TCP port (int or numeric string) or unix socket path.
                Defaults to ``/tmp/fuglu_control.sock``.
            address: bind address, only used for TCP sockets.

        Exits the process with status 1 if the socket cannot be set up.
        """
        if port is None:
            port = "/tmp/fuglu_control.sock"

        # A numeric string selects a TCP port, anything else is a socket path.
        if isinstance(port, str):
            try:
                port = int(port)
            except ValueError:
                pass

        if isinstance(port, int):
            porttype = "inet"
            self.logger = logging.getLogger("fuglu.control.%s" % port)
            self.logger.debug('Starting Control/Info server on port %s' % port)
        else:
            porttype = "unix"
            self.logger = logging.getLogger(
                "fuglu.control.%s" % os.path.basename(port))
            self.logger.debug('Starting Control/Info server on %s' % port)

        self.port = port
        self.controller = controller
        self.stayalive = True

        try:
            if porttype == "inet":
                # Pick the address family (IPv4/IPv6) matching the bind address.
                addr_f = socket.getaddrinfo(address, 0)[0][0]
                self._socket = socket.socket(addr_f, socket.SOCK_STREAM)
                self._socket.setsockopt(
                    socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self._socket.bind((address, port))
            else:
                # Remove a stale socket file; it is fine if none exists.
                try:
                    os.remove(port)
                except OSError:
                    pass
                self._socket = socket.socket(
                    socket.AF_UNIX, socket.SOCK_STREAM)
                self._socket.bind(port)

            self._socket.listen(5)
        except Exception as e:
            self.logger.error('Could not start control server: %s' % e)
            sys.exit(1)

    def shutdown(self):
        """Stop the serve loop and close the listening socket."""
        self.stayalive = False
        self.logger.info("Control Server on port %s shutting down" % self.port)
        try:
            try:
                self._socket.shutdown(socket.SHUT_RDWR)
            finally:
                # Always release the descriptor, even when shutdown() fails
                # (some platforms refuse shutdown() on a listening socket).
                self._socket.close()
            time.sleep(3)
        except Exception as e:
            # Best effort: a failing shutdown must not break the caller.
            self.logger.debug('Ignoring error during shutdown: %s' % e)

    def serve(self):
        """Accept connections and handle one session at a time until shutdown."""
        threading.current_thread().name = 'ControlServer Thread'
        controller = self.controller

        self.logger.info('Control/Info Server running on port %s' % self.port)
        while self.stayalive:
            try:
                self.logger.debug('Waiting for connection...')
                conn, peer = self._socket.accept()
                if not self.stayalive:
                    # Shutdown was requested while waiting: drop the client.
                    conn.close()
                    break
                engine = ControlSession(conn, controller)
                self.logger.debug('Incoming connection from %s' % str(peer))
                engine.handlesession()

            except Exception:
                if not self.stayalive:
                    # accept() failing on the closed socket is the expected
                    # way out of the loop during shutdown, not an error.
                    break
                fmt = traceback.format_exc()
                self.logger.error('Exception in serve(): %s' % fmt)


class ControlSession(object):
    """Handles a single command received on a control connection."""

    def __init__(self, socket, controller):
        """Store the connection and build the command dispatch table.

        Args:
            socket: connected socket object of the client.
            controller: object the commands query for their information.
        """
        self.controller = controller
        self.socket = socket
        self.commands = {
            'workerlist': self.workerlist,
            'threadlist': self.threadlist,
            'uptime': self.uptime,
            'stats': self.stats,
            'exceptionlist': self.exceptionlist,
            'netconsole': self.netconsole,
        }
        self.logger = logging.getLogger('fuglu.controlsession')

    def handlesession(self):
        """Read one command line, send the answer and close the connection.

        The socket is closed even when reading, command execution or
        sending raises; the exception still propagates to the caller.
        """
        try:
            line = force_uString(self.socket.recv(4096)).lower().strip()
            if line == '':
                return

            self.logger.debug('Control Socket command: %s' % line)
            parts = line.split()
            answer = self.handle_command(parts[0], parts[1:])
            self.socket.sendall(force_bString(answer))
        finally:
            self.socket.close()

    def handle_command(self, command, args):
        """Dispatch ``command`` with ``args`` and return the answer string."""
        handler = self.commands.get(command)
        if handler is None:
            return "ERR no such command: "+str(command)
        return handler(args)

    def netconsole(self, args):
        """Start the controller's network console in a daemon thread.

        Args:
            args: optional ``[port, bind]``; defaults are 1337 and 127.0.0.1.

        Returns:
            A status message, or an ``ERR`` message if the port is not numeric.
        """
        port = 1337
        bind = "127.0.0.1"
        if len(args) > 0:
            try:
                port = int(args[0])
            except ValueError:
                return "ERR invalid port: %s" % args[0]
        if len(args) > 1:
            bind = args[1]
        nc_thread = threading.Thread(
            name='net console', target=self.controller.run_netconsole,
            args=(port, bind))
        nc_thread.daemon = True
        nc_thread.start()
        return "Python interactive console starting on %s port %s" % (bind, port)

    def workerlist(self, args):
        """list of mail scanning workers"""
        res = ""
        threadpool = self.controller.threadpool
        if threadpool is not None:
            workerlist = "\n%s" % '\n*******\n'.join(
                repr(worker) for worker in threadpool.workers)
            res += "Total %s worker threads\n%s" % (
                len(threadpool.workers), workerlist)

        procpool = self.controller.procpool
        if procpool is not None:
            childstate_dict = procpool.shared_state
            workerlist = "\n%s" % '\n*******\n'.join(
                "%s: %s" % (procname, procstate)
                for procname, procstate in childstate_dict.items())
            res += "Total %s worker processes\n%s" % (
                len(procpool.workers), workerlist)

        return res

    def threadlist(self, args):
        """list of all threads"""
        threads = threading.enumerate()
        threadinfo = "\n%s" % '\n*******\n'.join(
            "name=%s alive=%s daemon=%s" % (t.name, t.is_alive(), t.daemon)
            for t in threads)
        return "Total %s Threads\n%s" % (len(threads), threadinfo)

    def uptime(self, args):
        """Return the controller's start time and the time elapsed since."""
        start = self.controller.started
        diff = datetime.datetime.now() - start
        return "Fuglu was started on %s\nUptime: %s" % (start, diff)

    def exceptionlist(self, args):
        """return the stacktraces currently held in CrashStore"""
        chunks = []
        for i, (excinfo, thetime, threadinfo) in enumerate(
                CrashStore.exceptions, 1):
            timestr = datetime.datetime.fromtimestamp(thetime).ctime()
            chunks.append("\n[%s] %s : %s\n" % (i, timestr, threadinfo))
            chunks.extend(traceback.format_exception(*excinfo))
        return "".join(chunks)

    def stats(self, args):
        """Render uptime and the counters of the controller's stats thread."""
        start = self.controller.started
        runtime = datetime.datetime.now() - start
        stats = self.controller.statsthread.stats
        template = """Fuglu statistics
---------------
Uptime:\t\t${uptime}
Avg scan time:\t${scantime}
Total msgs:\t${totalcount} in:${incount} out:${outcount}
Ham:\t\t${hamcount}
Spam:\t\t${spamcount}
Virus:\t\t${viruscount}
Block:\t\t${blockedcount}
        """
        renderer = string.Template(template)
        vrs = dict(
            uptime=runtime,
            scantime=stats.scantime(),
            totalcount=stats.totalcount,
            hamcount=stats.hamcount,
            viruscount=stats.viruscount,
            spamcount=stats.spamcount,
            incount=stats.incount,
            outcount=stats.outcount,
            blockedcount=stats.blockedcount,
        )
        return renderer.safe_substitute(vrs)


class CrashStore(object):
    """Process-wide store of the most recent exceptions."""

    # Entries are (exc_info, unix timestamp, thread description) tuples.
    exceptions = []

    @staticmethod
    def store_exception(exc_info=None, thread=None):
        """Record an exception, keeping only the ten most recent entries.

        Args:
            exc_info: exception triple; defaults to ``sys.exc_info()``.
            thread: thread the exception belongs to; defaults to the
                current thread.
        """
        if exc_info is None:
            exc_info = sys.exc_info()

        if thread is None:
            thread = threading.current_thread()

        name = thread.name
        info = ""
        if hasattr(thread, 'threadinfo'):
            info = thread.threadinfo
        desc = "%s (%s)" % (name, info)

        maxtracebacks = 10
        CrashStore.exceptions.append((exc_info, time.time(), desc))
        # Trim in place so references to the list held elsewhere stay valid.
        del CrashStore.exceptions[:-maxtracebacks]