1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3# 4 5""" 6Suppport for the client part of the SAML2.0 SOAP binding. 7""" 8import logging 9import re 10 11from saml2 import create_class_from_element_tree 12from saml2.samlp import NAMESPACE as SAMLP_NAMESPACE 13from saml2.schema import soapenv 14 15try: 16 from xml.etree import cElementTree as ElementTree 17except ImportError: 18 try: 19 import cElementTree as ElementTree 20 except ImportError: 21 #noinspection PyUnresolvedReferences 22 from elementtree import ElementTree 23import defusedxml.ElementTree 24 25 26logger = logging.getLogger(__name__) 27 28 29class XmlParseError(Exception): 30 pass 31 32 33class WrongMessageType(Exception): 34 pass 35 36 37def parse_soap_enveloped_saml_response(text): 38 tags = ['{%s}Response' % SAMLP_NAMESPACE, 39 '{%s}LogoutResponse' % SAMLP_NAMESPACE] 40 return parse_soap_enveloped_saml_thingy(text, tags) 41 42 43def parse_soap_enveloped_saml_logout_response(text): 44 tags = ['{%s}Response' % SAMLP_NAMESPACE, 45 '{%s}LogoutResponse' % SAMLP_NAMESPACE] 46 return parse_soap_enveloped_saml_thingy(text, tags) 47 48 49def parse_soap_enveloped_saml_attribute_query(text): 50 expected_tag = '{%s}AttributeQuery' % SAMLP_NAMESPACE 51 return parse_soap_enveloped_saml_thingy(text, [expected_tag]) 52 53 54def parse_soap_enveloped_saml_attribute_response(text): 55 tags = ['{%s}Response' % SAMLP_NAMESPACE, 56 '{%s}AttributeResponse' % SAMLP_NAMESPACE] 57 return parse_soap_enveloped_saml_thingy(text, tags) 58 59 60def parse_soap_enveloped_saml_logout_request(text): 61 expected_tag = '{%s}LogoutRequest' % SAMLP_NAMESPACE 62 return parse_soap_enveloped_saml_thingy(text, [expected_tag]) 63 64 65def parse_soap_enveloped_saml_authn_request(text): 66 expected_tag = '{%s}AuthnRequest' % SAMLP_NAMESPACE 67 return parse_soap_enveloped_saml_thingy(text, [expected_tag]) 68 69 70def parse_soap_enveloped_saml_artifact_resolve(text): 71 expected_tag = '{%s}ArtifactResolve' % SAMLP_NAMESPACE 72 return parse_soap_enveloped_saml_thingy(text, [expected_tag]) 73 74 75def parse_soap_enveloped_saml_artifact_response(text): 76 expected_tag = '{%s}ArtifactResponse' % SAMLP_NAMESPACE 77 return parse_soap_enveloped_saml_thingy(text, [expected_tag]) 78 79 80def parse_soap_enveloped_saml_name_id_mapping_request(text): 81 expected_tag = '{%s}NameIDMappingRequest' % SAMLP_NAMESPACE 82 return parse_soap_enveloped_saml_thingy(text, [expected_tag]) 83 84 85def parse_soap_enveloped_saml_name_id_mapping_response(text): 86 expected_tag = '{%s}NameIDMappingResponse' % SAMLP_NAMESPACE 87 return parse_soap_enveloped_saml_thingy(text, [expected_tag]) 88 89 90def parse_soap_enveloped_saml_manage_name_id_request(text): 91 expected_tag = '{%s}ManageNameIDRequest' % SAMLP_NAMESPACE 92 return parse_soap_enveloped_saml_thingy(text, [expected_tag]) 93 94 95def parse_soap_enveloped_saml_manage_name_id_response(text): 96 expected_tag = '{%s}ManageNameIDResponse' % SAMLP_NAMESPACE 97 return parse_soap_enveloped_saml_thingy(text, [expected_tag]) 98 99 100def parse_soap_enveloped_saml_assertion_id_request(text): 101 expected_tag = '{%s}AssertionIDRequest' % SAMLP_NAMESPACE 102 return parse_soap_enveloped_saml_thingy(text, [expected_tag]) 103 104 105def parse_soap_enveloped_saml_assertion_id_response(text): 106 tags = ['{%s}Response' % SAMLP_NAMESPACE, 107 '{%s}AssertionIDResponse' % SAMLP_NAMESPACE] 108 return parse_soap_enveloped_saml_thingy(text, tags) 109 110 111def parse_soap_enveloped_saml_authn_query(text): 112 expected_tag = '{%s}AuthnQuery' % SAMLP_NAMESPACE 113 return parse_soap_enveloped_saml_thingy(text, [expected_tag]) 114 115 116def parse_soap_enveloped_saml_authn_query_response(text): 117 tags = ['{%s}Response' % SAMLP_NAMESPACE] 118 return parse_soap_enveloped_saml_thingy(text, tags) 119 120 121def parse_soap_enveloped_saml_authn_response(text): 122 tags = ['{%s}Response' % SAMLP_NAMESPACE] 123 return parse_soap_enveloped_saml_thingy(text, tags) 124 125 126#def parse_soap_enveloped_saml_logout_response(text): 127# expected_tag = '{%s}LogoutResponse' % SAMLP_NAMESPACE 128# return parse_soap_enveloped_saml_thingy(text, [expected_tag]) 129 130def parse_soap_enveloped_saml_thingy(text, expected_tags): 131 """Parses a SOAP enveloped SAML thing and returns the thing as 132 a string. 133 134 :param text: The SOAP object as XML string 135 :param expected_tags: What the tag of the SAML thingy is expected to be. 136 :return: SAML thingy as a string 137 """ 138 envelope = defusedxml.ElementTree.fromstring(text) 139 140 envelope_tag = "{%s}Envelope" % soapenv.NAMESPACE 141 if envelope.tag != envelope_tag: 142 raise ValueError( 143 "Invalid envelope tag '{invalid}' should be '{valid}'".format( 144 invalid=envelope.tag, valid=envelope_tag 145 ) 146 ) 147 148 if len(envelope) < 1: 149 raise Exception("No items in envelope.") 150 151 body = None 152 for part in envelope: 153 if part.tag == '{%s}Body' % soapenv.NAMESPACE: 154 n_children = len(part) 155 if n_children != 1: 156 raise Exception( 157 "Expected a single child element, found {n}".format(n=n_children) 158 ) 159 body = part 160 break 161 162 if body is None: 163 return "" 164 165 saml_part = body[0] 166 if saml_part.tag in expected_tags: 167 return ElementTree.tostring(saml_part, encoding="UTF-8") 168 else: 169 raise WrongMessageType("Was '%s' expected one of %s" % (saml_part.tag, 170 expected_tags)) 171 172 173NS_AND_TAG = re.compile(r"\{([^}]+)\}(.*)") 174 175 176def instanciate_class(item, modules): 177 m = NS_AND_TAG.match(item.tag) 178 ns, tag = m.groups() 179 for module in modules: 180 if module.NAMESPACE == ns: 181 try: 182 target = module.ELEMENT_BY_TAG[tag] 183 return create_class_from_element_tree(target, item) 184 except KeyError: 185 continue 186 raise Exception("Unknown class: ns='%s', tag='%s'" % (ns, tag)) 187 188 189def class_instances_from_soap_enveloped_saml_thingies(text, modules): 190 """Parses a SOAP enveloped header and body SAML thing and returns the 191 thing as a dictionary class instance. 192 193 :param text: The SOAP object as XML 194 :param modules: modules representing xsd schemas 195 :return: The body and headers as class instances 196 """ 197 try: 198 envelope = defusedxml.ElementTree.fromstring(text) 199 except Exception as exc: 200 raise XmlParseError("%s" % exc) 201 202 envelope_tag = "{%s}Envelope" % soapenv.NAMESPACE 203 if envelope.tag != envelope_tag: 204 raise ValueError( 205 "Invalid envelope tag '{invalid}' should be '{valid}'".format( 206 invalid=envelope.tag, valid=envelope_tag 207 ) 208 ) 209 210 if len(envelope) < 1: 211 raise Exception("No items in envelope.") 212 213 env = {"header": [], "body": None} 214 215 for part in envelope: 216 if part.tag == '{%s}Body' % soapenv.NAMESPACE: 217 if len(envelope) < 1: 218 raise Exception("No items in envelope part.") 219 env["body"] = instanciate_class(part[0], modules) 220 elif part.tag == "{%s}Header" % soapenv.NAMESPACE: 221 for item in part: 222 env["header"].append(instanciate_class(item, modules)) 223 224 return env 225 226 227def open_soap_envelope(text): 228 """ 229 230 :param text: SOAP message 231 :return: dictionary with two keys "body"/"header" 232 """ 233 try: 234 envelope = defusedxml.ElementTree.fromstring(text) 235 except Exception as exc: 236 raise XmlParseError("%s" % exc) 237 238 envelope_tag = "{%s}Envelope" % soapenv.NAMESPACE 239 if envelope.tag != envelope_tag: 240 raise ValueError( 241 "Invalid envelope tag '{invalid}' should be '{valid}'".format( 242 invalid=envelope.tag, valid=envelope_tag 243 ) 244 ) 245 246 if len(envelope) < 1: 247 raise Exception("No items in envelope.") 248 249 content = {"header": [], "body": None} 250 251 for part in envelope: 252 if part.tag == '{%s}Body' % soapenv.NAMESPACE: 253 if len(envelope) < 1: 254 raise Exception("No items in envelope part.") 255 content["body"] = ElementTree.tostring(part[0], encoding="UTF-8") 256 elif part.tag == "{%s}Header" % soapenv.NAMESPACE: 257 for item in part: 258 _str = ElementTree.tostring(item, encoding="UTF-8") 259 content["header"].append(_str) 260 261 return content 262 263 264def make_soap_enveloped_saml_thingy(thingy, headers=None): 265 """ Returns a soap envelope containing a SAML request 266 as a text string. 267 268 :param thingy: The SAML thingy 269 :return: The SOAP envelope as a string 270 """ 271 soap_envelope = soapenv.Envelope() 272 273 if headers: 274 _header = soapenv.Header() 275 _header.add_extension_elements(headers) 276 soap_envelope.header = _header 277 278 soap_envelope.body = soapenv.Body() 279 soap_envelope.body.add_extension_element(thingy) 280 281 return "%s" % soap_envelope 282 283 284def soap_fault(message=None, actor=None, code=None, detail=None): 285 """ Create a SOAP Fault message 286 287 :param message: Human readable error message 288 :param actor: Who discovered the error 289 :param code: Error code 290 :param detail: More specific error message 291 :return: A SOAP Fault message as a string 292 """ 293 _string = _actor = _code = _detail = None 294 295 if message: 296 _string = soapenv.Fault_faultstring(text=message) 297 if actor: 298 _actor = soapenv.Fault_faultactor(text=actor) 299 if code: 300 _code = soapenv.Fault_faultcode(text=code) 301 if detail: 302 _detail = soapenv.Fault_detail(text=detail) 303 304 fault = soapenv.Fault( 305 faultcode=_code, 306 faultstring=_string, 307 faultactor=_actor, 308 detail=_detail, 309 ) 310 311 return "%s" % fault 312