1#   Copyright 2009-2018 Oli Schacher
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15import threading
16import sys
17import time
18import socket
19import logging
20import datetime
21import traceback
22import string
23import os
24from fuglu.localStringEncoding import force_bString, force_uString
25
26
27class ControlServer(object):
28
29    def __init__(self, controller, port=None, address="127.0.0.1"):
30        if port is None:
31            port = "/tmp/fuglu_control.sock"
32
33        if isinstance(port, str):
34            try:
35                port = int(port)
36            except ValueError:
37                pass
38
39        if isinstance(port, int):
40            porttype = "inet"
41            self.logger = logging.getLogger("fuglu.control.%s" % port)
42            self.logger.debug('Starting Control/Info server on port %s' % port)
43        else:
44            porttype = "unix"
45            self.logger = logging.getLogger(
46                "fuglu.control.%s" % os.path.basename(port))
47            self.logger.debug('Starting Control/Info server on %s' % port)
48
49        self.port = port
50        self.controller = controller
51        self.stayalive = 1
52
53        try:
54            if porttype == "inet":
55                addr_f = socket.getaddrinfo(address, 0)[0][0]
56                self._socket = socket.socket(
57                    addr_f, socket.SOCK_STREAM)
58                self._socket.setsockopt(
59                    socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
60                self._socket.bind((address, port))
61            else:
62                try:
63                    os.remove(port)
64                except Exception:
65                    pass
66                self._socket = socket.socket(
67                    socket.AF_UNIX, socket.SOCK_STREAM)
68                self._socket.bind(port)
69
70            self._socket.listen(5)
71        except Exception as e:
72            self.logger.error('Could not start control server: %s' % e)
73            sys.exit(1)
74
75    def shutdown(self):
76        self.stayalive = False
77        self.logger.info("Control Server on port %s shutting down" % self.port)
78        try:
79            self._socket.shutdown(socket.SHUT_RDWR)
80            self._socket.close()
81            time.sleep(3)
82        except Exception:
83            pass
84
85    def serve(self):
86        threading.currentThread().name = 'ControlServer Thread'
87        controller = self.controller
88
89        self.logger.info('Control/Info Server running on port %s' % self.port)
90        while self.stayalive:
91            try:
92                self.logger.debug('Waiting for connection...')
93                nsd = self._socket.accept()
94                if not self.stayalive:
95                    break
96                engine = ControlSession(nsd[0], controller)
97                self.logger.debug('Incoming connection from %s' % str(nsd[1]))
98                engine.handlesession()
99
100            except Exception:
101                fmt = traceback.format_exc()
102                self.logger.error('Exception in serve(): %s' % fmt)
103
104
105class ControlSession(object):
106
107    def __init__(self, socket, controller):
108        self.controller = controller
109        self.socket = socket
110        self.commands = {
111            'workerlist': self.workerlist,
112            'threadlist': self.threadlist,
113            'uptime': self.uptime,
114            'stats': self.stats,
115            'exceptionlist': self.exceptionlist,
116            'netconsole': self.netconsole,
117        }
118        self.logger = logging.getLogger('fuglu.controlsession')
119
120    def handlesession(self):
121        line = force_uString(self.socket.recv(4096)).lower().strip()
122        if line == '':
123            self.socket.close()
124            return
125
126        self.logger.debug('Control Socket command: %s' % line)
127        parts = line.split()
128        answer = self.handle_command(parts[0], parts[1:])
129        self.socket.sendall(force_bString(answer))
130        self.socket.close()
131
132    def handle_command(self, command, args):
133        if command not in self.commands:
134            return "ERR no such command: "+str(command)
135
136        res = self.commands[command](args)
137        return res
138
139    def netconsole(self, args):
140        port = 1337
141        bind = "127.0.0.1"
142        if len(args) > 0:
143            port = int(args[0])
144        if len(args) > 1:
145            bind = args[1]
146        nc_thread = threading.Thread(
147            name='net console', target=self.controller.run_netconsole, args=(port, bind))
148        nc_thread.daemon = True
149        nc_thread.start()
150        return "Python interactive console starting on %s port %s" % (bind, port)
151
152    def workerlist(self, args):
153        """list of mail scanning workers"""
154        threadpool = self.controller.threadpool
155        res=""
156        if threadpool is not None:
157            workerlist = "\n%s" % '\n*******\n'.join(map(repr, threadpool.workers))
158            res += "Total %s worker threads\n%s" % (len(threadpool.workers), workerlist)
159
160        procpool = self.controller.procpool
161        if procpool is not None:
162            childstate_dict = procpool.shared_state
163            workerlist = "\n%s" % '\n*******\n'.join(["%s: %s"%(procname,procstate) for procname,procstate in childstate_dict.items()])
164            res += "Total %s worker processes\n%s" % (len(procpool.workers), workerlist)
165
166        return res
167
168    def threadlist(self, args):
169        """list of all threads"""
170        threads = threading.enumerate()
171        threadinfo = "\n%s" % '\n*******\n'.join(
172            map(lambda t: "name=%s alive=%s daemon=%s" % (t.name, t.is_alive(), t.daemon), threads))
173        res = "Total %s Threads\n%s" % (len(threads), threadinfo)
174        return res
175
176    def uptime(self, args):
177        start = self.controller.started
178        diff = datetime.datetime.now() - start
179        return "Fuglu was started on %s\nUptime: %s" % (start, diff)
180
181    def exceptionlist(self, args):
182        """return last stacktrace"""
183        excstring = ""
184        i = 0
185        for excinfo, thetime, threadinfo in CrashStore.exceptions:
186            i += 1
187            fmt = traceback.format_exception(*excinfo)
188            timestr = datetime.datetime.fromtimestamp(thetime).ctime()
189            excstring = excstring + \
190                "\n[%s] %s : %s\n" % (i, timestr, threadinfo)
191            excstring = excstring + "".join(fmt)
192        return excstring
193
194    def stats(self, args):
195        start = self.controller.started
196        runtime = datetime.datetime.now() - start
197        stats = self.controller.statsthread.stats
198        template = """Fuglu statistics
199---------------
200Uptime:\t\t${uptime}
201Avg scan time:\t${scantime}
202Total msgs:\t${totalcount} in:${incount} out:${outcount}
203Ham:\t\t${hamcount}
204Spam:\t\t${spamcount}
205Virus:\t\t${viruscount}
206Block:\t\t${blockedcount}
207        """
208        renderer = string.Template(template)
209        vrs = dict(
210            uptime=runtime,
211            scantime=stats.scantime(),
212            totalcount=stats.totalcount,
213            hamcount=stats.hamcount,
214            viruscount=stats.viruscount,
215            spamcount=stats.spamcount,
216            incount=stats.incount,
217            outcount=stats.outcount,
218            blockedcount=stats.blockedcount,
219        )
220        res = renderer.safe_substitute(vrs)
221        return res
222
223
224class CrashStore(object):
225    exceptions = []
226
227    @staticmethod
228    def store_exception(exc_info=None, thread=None):
229        if exc_info is None:
230            exc_info = sys.exc_info()
231
232        if thread is None:
233            thread = threading.currentThread()
234
235        name = thread.getName()
236        info = ""
237        if hasattr(thread, 'threadinfo'):
238            info = thread.threadinfo
239        desc = "%s (%s)" % (name, info)
240
241        maxtracebacks = 10
242        CrashStore.exceptions.append((exc_info, time.time(), desc),)
243        while len(CrashStore.exceptions) > maxtracebacks:
244            CrashStore.exceptions.pop(0)
245
246