1# Note: This docstring is also used by this script's command line help. 2"""A one-stop helper for desktop app to acquire an authorization code. 3 4It starts a web server to listen redirect_uri, waiting for auth code. 5It optionally opens a browser window to guide a human user to manually login. 6After obtaining an auth code, the web server will automatically shut down. 7""" 8import logging 9import socket 10from string import Template 11 12try: # Python 3 13 from http.server import HTTPServer, BaseHTTPRequestHandler 14 from urllib.parse import urlparse, parse_qs, urlencode 15except ImportError: # Fall back to Python 2 16 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler 17 from urlparse import urlparse, parse_qs 18 from urllib import urlencode 19 20 21logger = logging.getLogger(__name__) 22 23 24def obtain_auth_code(listen_port, auth_uri=None): # Historically only used in testing 25 with AuthCodeReceiver(port=listen_port) as receiver: 26 return receiver.get_auth_response( 27 auth_uri=auth_uri, 28 welcome_template="""<html><body> 29 Open this link to <a href='$auth_uri'>Sign In</a> 30 (You may want to use incognito window) 31 <hr><a href='$abort_uri'>Abort</a> 32 </body></html>""", 33 ).get("code") 34 35 36def _browse(auth_uri): # throws ImportError, possibly webbrowser.Error in future 37 import webbrowser # Lazy import. Some distro may not have this. 38 return webbrowser.open(auth_uri) # Use default browser. Customizable by $BROWSER 39 40 41def _qs2kv(qs): 42 """Flatten parse_qs()'s single-item lists into the item itself""" 43 return {k: v[0] if isinstance(v, list) and len(v) == 1 else v 44 for k, v in qs.items()} 45 46 47class _AuthCodeHandler(BaseHTTPRequestHandler): 48 def do_GET(self): 49 # For flexibility, we choose to not check self.path matching redirect_uri 50 #assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP') 51 qs = parse_qs(urlparse(self.path).query) 52 if qs.get('code') or qs.get("error"): # So, it is an auth response 53 self.server.auth_response = _qs2kv(qs) 54 logger.debug("Got auth response: %s", self.server.auth_response) 55 template = (self.server.success_template 56 if "code" in qs else self.server.error_template) 57 self._send_full_response( 58 template.safe_substitute(**self.server.auth_response)) 59 # NOTE: Don't do self.server.shutdown() here. It'll halt the server. 60 else: 61 self._send_full_response(self.server.welcome_page) 62 63 def _send_full_response(self, body, is_ok=True): 64 self.send_response(200 if is_ok else 400) 65 content_type = 'text/html' if body.startswith('<') else 'text/plain' 66 self.send_header('Content-type', content_type) 67 self.end_headers() 68 self.wfile.write(body.encode("utf-8")) 69 70 def log_message(self, format, *args): 71 logger.debug(format, *args) # To override the default log-to-stderr behavior 72 73 74class _AuthCodeHttpServer(HTTPServer): 75 def handle_timeout(self): 76 # It will be triggered when no request comes in self.timeout seconds. 77 # See https://docs.python.org/3/library/socketserver.html#socketserver.BaseServer.handle_timeout 78 raise RuntimeError("Timeout. No auth response arrived.") # Terminates this server 79 # We choose to not call self.server_close() here, 80 # because it would cause a socket.error exception in handle_request(), 81 # and likely end up the server being server_close() twice. 82 83 84class _AuthCodeHttpServer6(_AuthCodeHttpServer): 85 address_family = socket.AF_INET6 86 87 88class AuthCodeReceiver(object): 89 # This class has (rather than is) an _AuthCodeHttpServer, so it does not leak API 90 def __init__(self, port=None): 91 """Create a Receiver waiting for incoming auth response. 92 93 :param port: 94 The local web server will listen at http://...:<port> 95 You need to use the same port when you register with your app. 96 If your Identity Provider supports dynamic port, you can use port=0 here. 97 Port 0 means to use an arbitrary unused port, per this official example: 98 https://docs.python.org/2.7/library/socketserver.html#asynchronous-mixins 99 """ 100 address = "127.0.0.1" # Hardcode, for now, Not sure what to expose, yet. 101 # Per RFC 8252 (https://tools.ietf.org/html/rfc8252#section-8.3): 102 # * Clients should listen on the loopback network interface only. 103 # (It is not recommended to use "" shortcut to bind all addr.) 104 # * the use of localhost is NOT RECOMMENDED. 105 # (Use) the loopback IP literal 106 # rather than localhost avoids inadvertently listening on network 107 # interfaces other than the loopback interface. 108 # Note: 109 # When this server physically listens to a specific IP (as it should), 110 # you will still be able to specify your redirect_uri using either 111 # IP (e.g. 127.0.0.1) or localhost, whichever matches your registration. 112 Server = _AuthCodeHttpServer6 if ":" in address else _AuthCodeHttpServer 113 # TODO: But, it would treat "localhost" or "" as IPv4. 114 # If pressed, we might just expose a family parameter to caller. 115 self._server = Server((address, port or 0), _AuthCodeHandler) 116 117 def get_port(self): 118 """The port this server actually listening to""" 119 # https://docs.python.org/2.7/library/socketserver.html#SocketServer.BaseServer.server_address 120 return self._server.server_address[1] 121 122 def get_auth_response(self, auth_uri=None, timeout=None, state=None, 123 welcome_template=None, success_template=None, error_template=None, 124 auth_uri_callback=None, 125 ): 126 """Wait and return the auth response. Raise RuntimeError when timeout. 127 128 :param str auth_uri: 129 If provided, this function will try to open a local browser. 130 :param int timeout: In seconds. None means wait indefinitely. 131 :param str state: 132 You may provide the state you used in auth_uri, 133 then we will use it to validate incoming response. 134 :param str welcome_template: 135 If provided, your end user will see it instead of the auth_uri. 136 When present, it shall be a plaintext or html template following 137 `Python Template string syntax <https://docs.python.org/3/library/string.html#template-strings>`_, 138 and include some of these placeholders: $auth_uri and $abort_uri. 139 :param str success_template: 140 The page will be displayed when authentication was largely successful. 141 Placeholders can be any of these: 142 https://tools.ietf.org/html/rfc6749#section-5.1 143 :param str error_template: 144 The page will be displayed when authentication encountered error. 145 Placeholders can be any of these: 146 https://tools.ietf.org/html/rfc6749#section-5.2 147 :param callable auth_uri_callback: 148 A function with the shape of lambda auth_uri: ... 149 When a browser was unable to be launch, this function will be called, 150 so that the app could tell user to manually visit the auth_uri. 151 :return: 152 The auth response of the first leg of Auth Code flow, 153 typically {"code": "...", "state": "..."} or {"error": "...", ...} 154 See https://tools.ietf.org/html/rfc6749#section-4.1.2 155 and https://openid.net/specs/openid-connect-core-1_0.html#AuthResponse 156 Returns None when the state was mismatched, or when timeout occurred. 157 """ 158 welcome_uri = "http://localhost:{p}".format(p=self.get_port()) 159 abort_uri = "{loc}?error=abort".format(loc=welcome_uri) 160 logger.debug("Abort by visit %s", abort_uri) 161 self._server.welcome_page = Template(welcome_template or "").safe_substitute( 162 auth_uri=auth_uri, abort_uri=abort_uri) 163 if auth_uri: # Now attempt to open a local browser to visit it 164 _uri = welcome_uri if welcome_template else auth_uri 165 logger.info("Open a browser on this device to visit: %s" % _uri) 166 browser_opened = False 167 try: 168 browser_opened = _browse(_uri) 169 except: # Had to use broad except, because the potential 170 # webbrowser.Error is purposely undefined outside of _browse(). 171 # Absorb and proceed. Because browser could be manually run elsewhere. 172 logger.exception("_browse(...) unsuccessful") 173 if not browser_opened: 174 if not auth_uri_callback: 175 logger.warning( 176 "Found no browser in current environment. " 177 "If this program is being run inside a container " 178 "which has access to host network " 179 "(i.e. started by `docker run --net=host -it ...`), " 180 "you can use browser on host to visit the following link. " 181 "Otherwise, this auth attempt would either timeout " 182 "(current timeout setting is {timeout}) " 183 "or be aborted by CTRL+C. Auth URI: {auth_uri}".format( 184 auth_uri=_uri, timeout=timeout)) 185 else: # Then it is the auth_uri_callback()'s job to inform the user 186 auth_uri_callback(_uri) 187 188 self._server.success_template = Template(success_template or 189 "Authentication completed. You can close this window now.") 190 self._server.error_template = Template(error_template or 191 "Authentication failed. $error: $error_description. ($error_uri)") 192 193 self._server.timeout = timeout # Otherwise its handle_timeout() won't work 194 self._server.auth_response = {} # Shared with _AuthCodeHandler 195 while True: 196 # Derived from 197 # https://docs.python.org/2/library/basehttpserver.html#more-examples 198 self._server.handle_request() 199 if self._server.auth_response: 200 if state and state != self._server.auth_response.get("state"): 201 logger.debug("State mismatch. Ignoring this noise.") 202 else: 203 break 204 return self._server.auth_response 205 206 def close(self): 207 """Either call this eventually; or use the entire class as context manager""" 208 self._server.server_close() 209 210 def __enter__(self): 211 return self 212 213 def __exit__(self, exc_type, exc_val, exc_tb): 214 self.close() 215 216# Note: Manually use or test this module by: 217# python -m path.to.this.file -h 218if __name__ == '__main__': 219 import argparse, json 220 from .oauth2 import Client 221 logging.basicConfig(level=logging.INFO) 222 p = parser = argparse.ArgumentParser( 223 formatter_class=argparse.ArgumentDefaultsHelpFormatter, 224 description=__doc__ + "The auth code received will be shown at stdout.") 225 p.add_argument( 226 '--endpoint', help="The auth endpoint for your app.", 227 default="https://login.microsoftonline.com/common/oauth2/v2.0/authorize") 228 p.add_argument('client_id', help="The client_id of your application") 229 p.add_argument('--port', type=int, default=0, help="The port in redirect_uri") 230 p.add_argument('--host', default="127.0.0.1", help="The host of redirect_uri") 231 p.add_argument('--scope', default=None, help="The scope list") 232 args = parser.parse_args() 233 client = Client({"authorization_endpoint": args.endpoint}, args.client_id) 234 with AuthCodeReceiver(port=args.port) as receiver: 235 flow = client.initiate_auth_code_flow( 236 scope=args.scope.split() if args.scope else None, 237 redirect_uri="http://{h}:{p}".format(h=args.host, p=receiver.get_port()), 238 ) 239 print(json.dumps(receiver.get_auth_response( 240 auth_uri=flow["auth_uri"], 241 welcome_template= 242 "<a href='$auth_uri'>Sign In</a>, or <a href='$abort_uri'>Abort</a", 243 error_template="Oh no. $error", 244 success_template="Oh yeah. Got $code", 245 timeout=60, 246 state=flow["state"], # Optional 247 ), indent=4)) 248 249