1from __future__ import absolute_import, print_function 2 3"""SSLServer 4 5Copyright (c) 1999-2002 Ng Pheng Siong. All rights reserved.""" 6 7 8# M2Crypto 9from M2Crypto.SSL import SSLError 10from M2Crypto.SSL.Connection import Connection 11from M2Crypto.SSL.Context import Context # noqa 12# from M2Crypto import six # noqa 13from M2Crypto import util # noqa 14from M2Crypto.six.moves.socketserver import (BaseRequestHandler, BaseServer, 15 TCPServer, ThreadingMixIn) 16import os 17if os.name != 'nt': 18 from M2Crypto.six.moves.socketserver import ForkingMixIn 19from socket import socket # noqa 20from typing import Union # noqa 21 22__all__ = ['SSLServer', 'ForkingSSLServer', 'ThreadingSSLServer'] 23 24 25class SSLServer(TCPServer): 26 def __init__(self, server_address, RequestHandlerClass, ssl_context, # noqa 27 bind_and_activate=True): 28 # type: (util.AddrType, BaseRequestHandler, Context, bool) -> None 29 """ 30 Superclass says: Constructor. May be extended, do not override. 31 This class says: Ho-hum. 32 """ 33 BaseServer.__init__(self, server_address, RequestHandlerClass) 34 self.ssl_ctx = ssl_context 35 self.socket = Connection(self.ssl_ctx) 36 if bind_and_activate: 37 self.server_bind() 38 self.server_activate() 39 40 def handle_request(self): 41 # type: () -> None 42 request = None 43 client_address = None 44 try: 45 request, client_address = self.get_request() 46 if self.verify_request(request, client_address): 47 self.process_request(request, client_address) 48 except SSLError: 49 self.handle_error(request, client_address) 50 51 def handle_error(self, request, client_address): 52 # type: (Union[socket, Connection], util.AddrType) -> None 53 print('-' * 40) 54 import traceback 55 traceback.print_exc() 56 print('-' * 40) 57 58 59class ThreadingSSLServer(ThreadingMixIn, SSLServer): 60 pass 61 62 63if os.name != 'nt': 64 class ForkingSSLServer(ForkingMixIn, SSLServer): 65 pass 66