1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3# 4 5""" 6Contains classes used in the SAML ECP profile 7""" 8import logging 9from saml2.client_base import ACTOR, MIME_PAOS 10from saml2.ecp_client import SERVICE 11 12from saml2 import element_to_extension_element 13from saml2 import samlp 14from saml2 import soap 15from saml2 import BINDING_SOAP, BINDING_PAOS 16 17from saml2.profile import paos 18from saml2.profile import ecp 19 20#from saml2.client import Saml2Client 21from saml2.server import Server 22 23from saml2.schema import soapenv 24 25from saml2.response import authn_response 26 27from saml2 import saml 28 29logger = logging.getLogger(__name__) 30 31 32def ecp_capable(headers): 33 if MIME_PAOS in headers["Accept"]: 34 if "PAOS" in headers: 35 if 'ver="%s";"%s"' % (paos.NAMESPACE, 36 SERVICE) in headers["PAOS"]: 37 return True 38 39 return False 40 41 42#noinspection PyUnusedLocal 43def ecp_auth_request( 44 cls, entityid=None, relay_state="", sign=None, sign_alg=None, digest_alg=None 45): 46 """ Makes an authentication request. 47 48 :param entityid: The entity ID of the IdP to send the request to 49 :param relay_state: To where the user should be returned after 50 successfull log in. 51 :param sign: Whether the request should be signed or not. 52 :return: AuthnRequest response 53 """ 54 55 eelist = [] 56 57 # ---------------------------------------- 58 # <paos:Request> 59 # ---------------------------------------- 60 my_url = cls.service_urls(BINDING_PAOS)[0] 61 62 # must_understand and actor according to the standard 63 # 64 paos_request = paos.Request( 65 must_understand="1", 66 actor=ACTOR, 67 response_consumer_url=my_url, 68 service=SERVICE, 69 ) 70 71 eelist.append(element_to_extension_element(paos_request)) 72 73 # ---------------------------------------- 74 # <samlp:AuthnRequest> 75 # ---------------------------------------- 76 77 logger.info("entityid: %s, binding: %s" % (entityid, BINDING_SOAP)) 78 79 location = cls._sso_location(entityid, binding=BINDING_SOAP) 80 req_id, authn_req = cls.create_authn_request( 81 location, 82 binding=BINDING_PAOS, 83 service_url_binding=BINDING_PAOS, 84 sign=sign, 85 sign_alg=sign_alg, 86 digest_alg=digest_alg, 87 ) 88 89 body = soapenv.Body() 90 body.extension_elements = [element_to_extension_element(authn_req)] 91 92 # ---------------------------------------- 93 # <ecp:Request> 94 # ---------------------------------------- 95 96# idp = samlp.IDPEntry( 97# provider_id = "https://idp.example.org/entity", 98# name = "Example identity provider", 99# loc = "https://idp.example.org/saml2/sso", 100# ) 101# 102# idp_list = samlp.IDPList(idp_entry= [idp]) 103 104 idp_list = None 105 ecp_request = ecp.Request( 106 actor=ACTOR, 107 must_understand="1", 108 provider_name=None, 109 issuer=saml.Issuer(text=authn_req.issuer.text), 110 idp_list=idp_list, 111 ) 112 113 eelist.append(element_to_extension_element(ecp_request)) 114 115 # ---------------------------------------- 116 # <ecp:RelayState> 117 # ---------------------------------------- 118 119 relay_state = ecp.RelayState(actor=ACTOR, must_understand="1", text=relay_state) 120 121 eelist.append(element_to_extension_element(relay_state)) 122 123 header = soapenv.Header() 124 header.extension_elements = eelist 125 126 # ---------------------------------------- 127 # The SOAP envelope 128 # ---------------------------------------- 129 130 soap_envelope = soapenv.Envelope(header=header, body=body) 131 132 return req_id, str(soap_envelope) 133 134 135def handle_ecp_authn_response(cls, soap_message, outstanding=None): 136 rdict = soap.class_instances_from_soap_enveloped_saml_thingies( 137 soap_message, [paos, ecp, samlp] 138 ) 139 140 _relay_state = None 141 for item in rdict["header"]: 142 if item.c_tag == "RelayState" and item.c_namespace == ecp.NAMESPACE: 143 _relay_state = item 144 145 response = authn_response( 146 cls.config, cls.service_urls(), outstanding, allow_unsolicited=True 147 ) 148 149 response.loads("%s" % rdict["body"], False, soap_message) 150 response.verify() 151 cls.users.add_information_about_person(response.session_info()) 152 153 return response, _relay_state 154 155 156def ecp_response(target_url, response): 157 158 # ---------------------------------------- 159 # <ecp:Response 160 # ---------------------------------------- 161 162 ecp_response = ecp.Response(assertion_consumer_service_url=target_url) 163 header = soapenv.Header() 164 header.extension_elements = [element_to_extension_element(ecp_response)] 165 166 # ---------------------------------------- 167 # <samlp:Response 168 # ---------------------------------------- 169 170 body = soapenv.Body() 171 body.extension_elements = [element_to_extension_element(response)] 172 173 soap_envelope = soapenv.Envelope(header=header, body=body) 174 175 return "%s" % soap_envelope 176 177 178class ECPServer(Server): 179 """ This deals with what the IdP has to do 180 181 TODO: Still tentative 182 """ 183 def __init__(self, config_file="", config=None, cache=None): 184 Server.__init__(self, config_file, config, cache) 185 186 def parse_ecp_authn_query(self): 187 pass 188 189 def ecp_response(self): 190 191 # ---------------------------------------- 192 # <ecp:Response 193 # ---------------------------------------- 194 target_url = "" 195 196 ecp_response = ecp.Response(assertion_consumer_service_url=target_url) 197 header = soapenv.Body() 198 header.extension_elements = [element_to_extension_element(ecp_response)] 199 200 # ---------------------------------------- 201 # <samlp:Response 202 # ---------------------------------------- 203 204 response = samlp.Response() 205 body = soapenv.Body() 206 body.extension_elements = [element_to_extension_element(response)] 207 208 soap_envelope = soapenv.Envelope(header=header, body=body) 209 210 return "%s" % soap_envelope 211