#!/usr/bin/env python
# -*- coding: utf-8 -*-
#

"""
Contains a class that can do SAML ECP Authentication for other python
programs.
"""

from six.moves import http_cookiejar as cookielib
import logging

from saml2 import soap
from saml2 import saml
from saml2 import samlp
from saml2 import SAMLError
from saml2 import BINDING_SOAP
from saml2.client_base import MIME_PAOS
from saml2.config import Config
from saml2.entity import Entity
from saml2.httpbase import set_list2dict, dict2set_list

from saml2.profile import paos
from saml2.profile import ecp

from saml2.mdstore import MetadataStore
from saml2.s_utils import BadRequest

SERVICE = "urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp"
PAOS_HEADER_INFO = 'ver="%s";"%s"' % (paos.NAMESPACE, SERVICE)

logger = logging.getLogger(__name__)


class Client(Entity):
    """ECP-aware client that works on the client (application) side.

    Use this class when you want to log a user in through an ECP-aware
    SP and IdP.
    """

    def __init__(self, user, passwd, sp="", idp=None, metadata_file=None,
                 xmlsec_binary=None, verbose=0, ca_certs="",
                 disable_ssl_certificate_validation=True, key_file=None,
                 cert_file=None, config=None):
        """
        :param user: user name
        :param passwd: user password
        :param sp: The SP URL
        :param idp: The IdP PAOS endpoint
        :param metadata_file: Where the metadata file is if used
        :param xmlsec_binary: Where the xmlsec1 binary can be found (*)
        :param verbose: Chatty or not
        :param ca_certs: is the path of a file containing root CA certificates
            for SSL server certificate validation (*)
        :param disable_ssl_certificate_validation: If
            disable_ssl_certificate_validation is true, SSL cert validation
            will not be performed (*). NOTE: the default is True, so
            validation is off unless the caller asks for it.
        :param key_file: Private key filename (*)
        :param cert_file: Certificate filename (*)
        :param config: Config() instance, overrides all the parameters marked
            with an asterisk (*) above
        """
        if not config:
            config = Config()
            config.disable_ssl_certificate_validation = \
                disable_ssl_certificate_validation
            config.key_file = key_file
            config.cert_file = cert_file
            config.ca_certs = ca_certs
            config.xmlsec_binary = xmlsec_binary

        Entity.__init__(self, "sp", config)
        self._idp = idp
        self._sp = sp
        self.user = user
        self.passwd = passwd
        self._verbose = verbose

        if metadata_file:
            self._metadata = MetadataStore([saml, samlp], None, config)
            self._metadata.load("local", metadata_file)
            logger.debug("Loaded metadata from '%s'", metadata_file)
            self.metadata = self._metadata
        else:
            self._metadata = None
            # Only replace the metadata when a file was actually loaded.
            # NOTE(review): presumably Entity.__init__ installs metadata
            # from `config`; blindly assigning None here would discard it.
            # Falls back to None when the base class set nothing.
            self.metadata = getattr(self, "metadata", None)

        self.cookie_handler = None

        self.done_ecp = False
        self.cookie_jar = cookielib.LWPCookieJar()

    def phase2(
        self,
        authn_request,
        rc_url,
        idp_entity_id,
        headers=None,
        sign=False,
        sign_alg=None,
        **kwargs,
    ):
        """
        Run the second phase of the ECP conversation: the exchange with
        the IdP.

        :param authn_request: The AuthenticationRequest
        :param rc_url: The assertion consumer service url of the SP
        :param idp_entity_id: The EntityID of the IdP
        :param headers: Possible extra headers
        :param sign: If the message should be signed
        :param sign_alg: Signature algorithm, forwarded to apply_binding
            as ``sigalg``
        :param kwargs: Ignored; lets callers pass the whole dict returned
            by parse_sp_ecp_response (which also carries "relay_state")
        :return: The response from the IdP
        :raises SAMLError: if the IdP does not answer with HTTP 200, the
            reply cannot be parsed, the ecp:Response header is missing or
            its assertion consumer service url differs from ``rc_url``
        :raises ValueError: if the SOAP body is not a Response element
        """

        _, destination = self.pick_binding(
            "single_sign_on_service", [BINDING_SOAP], "idpsso",
            entity_id=idp_entity_id
        )

        ht_args = self.apply_binding(
            BINDING_SOAP, authn_request, destination, sign=sign,
            sigalg=sign_alg
        )

        if headers:
            ht_args["headers"].extend(headers)

        logger.debug("[P2] Sending request: %s", ht_args["data"])

        # POST the request to the IdP
        response = self.send(**ht_args)

        logger.debug("[P2] Got IdP response: %s", response)

        if response.status_code != 200:
            raise SAMLError(
                "Request to IdP failed ({status}): {text}".format(
                    status=response.status_code, text=response.text
                )
            )

        # SAMLP response in a SOAP envelope body, ecp response in headers
        respdict = self.parse_soap_message(response.text)

        if respdict is None:
            raise SAMLError("Unexpected reply from the IdP")

        logger.debug("[P2] IdP response dict: %s", respdict)

        idp_response = respdict["body"]

        expected_tag = "Response"
        if idp_response.c_tag != expected_tag:
            raise ValueError(
                "Invalid Response tag '{invalid}' should be '{valid}'".format(
                    invalid=idp_response.c_tag, valid=expected_tag
                )
            )

        logger.debug("[P2] IdP AUTHN response: %s", idp_response)

        # If several ecp:Response headers are present the last one wins.
        _ecp_response = None
        for item in respdict["header"]:
            if item.c_tag == "Response" and item.c_namespace == ecp.NAMESPACE:
                _ecp_response = item

        if _ecp_response is None:
            # Without this guard the attribute access below would fail
            # with an unhelpful AttributeError on None.
            raise SAMLError(
                "Missing ecp:Response header in the reply from the IdP")

        _acs_url = _ecp_response.assertion_consumer_service_url
        if rc_url != _acs_url:
            # A single string: it is used both as the SOAP fault text and
            # as the exception message.
            error = (
                "response_consumer_url '%s' does not match "
                "assertion_consumer_service_url '%s'" % (rc_url, _acs_url)
            )
            # Send an error message to the SP
            self.send(rc_url, "POST", data=soap.soap_fault(error))
            # Raise an exception so the user knows something went wrong
            raise SAMLError(error)

        return idp_response

    @staticmethod
    def parse_sp_ecp_response(respdict):
        """
        Pick the pieces needed for the IdP exchange out of the parsed SOAP
        message the SP answered with.

        :param respdict: Parsed SOAP message with "body" and "header"
            entries, or None
        :return: dict with the keys "authn_request", "rc_url" and
            "relay_state" ("relay_state" is None if the SP sent none)
        :raises SAMLError: if ``respdict`` is None
        :raises ValueError: if the SOAP body is not an AuthnRequest
        :raises BadRequest: if the paos:Request header is missing
        """
        if respdict is None:
            raise SAMLError("Unexpected reply from the SP")

        logger.debug("[P1] SP response dict: %s", respdict)

        # AuthnRequest in the body or not
        authn_request = respdict["body"]

        expected_tag = "AuthnRequest"
        if authn_request.c_tag != expected_tag:
            raise ValueError(
                "Invalid AuthnRequest tag '{invalid}' should be "
                "'{valid}'".format(
                    invalid=authn_request.c_tag, valid=expected_tag
                )
            )

        # ecp.RelayState and paos.Request among headers; the last
        # occurrence of each wins.
        _relay_state = None
        _paos_request = None
        for item in respdict["header"]:
            if item.c_tag == "RelayState" and \
                    item.c_namespace == ecp.NAMESPACE:
                _relay_state = item
            if item.c_tag == "Request" and item.c_namespace == paos.NAMESPACE:
                _paos_request = item

        if _paos_request is None:
            raise BadRequest("Missing request")

        _rc_url = _paos_request.response_consumer_url

        return {
            "authn_request": authn_request,
            "rc_url": _rc_url,
            "relay_state": _relay_state,
        }

    def ecp_conversation(self, respdict, idp_entity_id=None):
        """
        Carry out phases 2 and 3 of the ECP conversation: forward the SP's
        AuthnRequest to the IdP and post the IdP's response back to the SP.

        :param respdict: Parsed SOAP message received from the SP
        :param idp_entity_id: The EntityID of the IdP to authenticate at
        :return: None; ``self.done_ecp`` is set to True on success
        :raises SAMLError: if the SP does not answer the final POST with
            HTTP 302 (see also phase2 and parse_sp_ecp_response)
        """

        args = self.parse_sp_ecp_response(respdict)

        # **********************
        # Phase 2 - talk to the IdP
        # **********************

        idp_response = self.phase2(idp_entity_id=idp_entity_id, **args)

        # **********************************
        # Phase 3 - back to the SP
        # **********************************

        # The relay state is optional; do not hand a None entry over as a
        # SOAP header block.
        # NOTE(review): assumes use_soap accepts an empty header list.
        relay_state = args["relay_state"]
        soap_headers = [relay_state] if relay_state is not None else []

        ht_args = self.use_soap(idp_response, args["rc_url"], soap_headers)
        ht_args["headers"][0] = ('Content-Type', MIME_PAOS)
        logger.debug("[P3] Post to SP: %s", ht_args["data"])

        # POST the package from the IdP to the SP
        response = self.send(**ht_args)

        # A 302 is the expected outcome: ignore where the SP is
        # redirecting us to and go for the url I started off with.
        if response.status_code != 302:
            raise SAMLError("Error POSTing package to SP: %s" % response.text)

        logger.debug("[P3] SP response: %s", response.text)

        self.done_ecp = True
        logger.debug("Done ECP")

        return None

    @staticmethod
    def add_paos_headers(headers=None):
        """
        Return ``headers`` extended with the PAOS and Accept headers that
        tell the SP this client is ECP enabled.

        :param headers: Optional list of (name, value) tuples
        :return: A list of (name, value) tuples that always contains a
            "PAOS" header and an "Accept" header mentioning MIME_PAOS
        """
        if not headers:
            return [
                ('Accept', 'text/html; %s' % MIME_PAOS),
                ('PAOS', PAOS_HEADER_INFO),
            ]

        headers = set_list2dict(headers)
        headers["PAOS"] = PAOS_HEADER_INFO
        if "Accept" in headers:
            headers["Accept"] += ";%s" % MIME_PAOS
        elif "accept" in headers:
            # Normalise the lower-case spelling to "Accept"
            headers["Accept"] = "%s;%s" % (headers.pop("accept"), MIME_PAOS)
        else:
            # Caller supplied headers but no Accept header: add the same
            # default that is used when no headers are given at all.
            headers["Accept"] = 'text/html; %s' % MIME_PAOS

        return dict2set_list(headers)

    def operation(self, url, idp_entity_id, op, **opargs):
        """
        This is the method that should be used by someone that wants
        to authenticate using SAML ECP

        :param url: The page that access is sought for
        :param idp_entity_id: The entity ID of the IdP that should be
            used for authentication
        :param op: Which HTTP operation (GET/POST/PUT/DELETE)
        :param opargs: Arguments to the HTTP call
        :return: The page
        :raises SAMLError: if the SP does not answer the first request
            with HTTP 200, if the ECP conversation fails, or if the final
            request returns a status of 400 or above
        """
        sp_url = self._sp

        # ********************************************
        # Phase 1 - First conversation with the SP
        # ********************************************
        # headers needed to indicate to the SP that I'm ECP enabled

        # "headers" is optional (delete() passes none)
        opargs["headers"] = self.add_paos_headers(opargs.get("headers"))
        response = self.send(sp_url, op, **opargs)
        logger.debug("[Op] SP response: %s", response)

        if response.status_code != 200:
            raise SAMLError(
                "Request to SP failed: %s" % response.text)

        # The response might be a AuthnRequest instance in a SOAP envelope
        # body. If so it's the start of the ECP conversation
        # Two SOAP header blocks; paos:Request and ecp:Request
        # may also contain a ecp:RelayState SOAP header block
        # If channel-binding was part of the PAOS header any number of
        # <cb:ChannelBindings> header blocks may also be present
        # if 'holder-of-key' option then one or more <ecp:SubjectConfirmation>
        # header blocks may also be present
        respdict = self.parse_soap_message(response.text)
        self.ecp_conversation(respdict, idp_entity_id)

        # should by now be authenticated so this should go smoothly
        response = self.send(url, op, **opargs)

        if response.status_code >= 400:
            raise SAMLError("Error performing operation: %s" % (
                response.text,))

        return response

    # different HTTP operations
    def delete(self, url=None, idp_entity_id=None):
        """Perform an ECP-authenticated HTTP DELETE, see operation()."""
        return self.operation(url, idp_entity_id, "DELETE")

    def get(self, url=None, idp_entity_id=None, headers=None):
        """Perform an ECP-authenticated HTTP GET, see operation()."""
        return self.operation(url, idp_entity_id, "GET", headers=headers)

    def post(self, url=None, data="", idp_entity_id=None, headers=None):
        """Perform an ECP-authenticated HTTP POST, see operation()."""
        return self.operation(url, idp_entity_id, "POST", data=data,
                              headers=headers)

    def put(self, url=None, data="", idp_entity_id=None, headers=None):
        """Perform an ECP-authenticated HTTP PUT, see operation()."""
        return self.operation(url, idp_entity_id, "PUT", data=data,
                              headers=headers)