1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4
5"""
6Contains a class that can do SAML ECP Authentication for other python
7programs.
8"""
9
10from six.moves import http_cookiejar as cookielib
11import logging
12
13from saml2 import soap
14from saml2 import saml
15from saml2 import samlp
16from saml2 import SAMLError
17from saml2 import BINDING_SOAP
18from saml2.client_base import MIME_PAOS
19from saml2.config import Config
20from saml2.entity import Entity
21from saml2.httpbase import set_list2dict, dict2set_list
22
23from saml2.profile import paos
24from saml2.profile import ecp
25
26from saml2.mdstore import MetadataStore
27from saml2.s_utils import BadRequest
28
29SERVICE = "urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp"
30PAOS_HEADER_INFO = 'ver="%s";"%s"' % (paos.NAMESPACE, SERVICE)
31
32logger = logging.getLogger(__name__)
33
34
35class Client(Entity):
36    """ECP-aware client that works on the client (application) side.
37
38    You can use this class when you want to login user through
39    ECP-aware SP and IdP.
40    """
41
42    def __init__(self, user, passwd, sp="", idp=None, metadata_file=None,
43                 xmlsec_binary=None, verbose=0, ca_certs="",
44                 disable_ssl_certificate_validation=True, key_file=None,
45                 cert_file=None, config=None):
46        """
47        :param user: user name
48        :param passwd: user password
49        :param sp: The SP URL
50        :param idp: The IdP PAOS endpoint
51        :param metadata_file: Where the metadata file is if used
52        :param xmlsec_binary: Where the xmlsec1 binary can be found (*)
53        :param verbose: Chatty or not
54        :param ca_certs: is the path of a file containing root CA certificates
55            for SSL server certificate validation (*)
56        :param disable_ssl_certificate_validation: If
57            disable_ssl_certificate_validation is true, SSL cert validation
58            will not be performed (*)
59        :param key_file: Private key filename (*)
60        :param cert_file: Certificate filename (*)
61        :param config: Config() instance, overrides all the parameters marked
62            with an asterisk (*) above
63        """
64        if not config:
65            config = Config()
66            config.disable_ssl_certificate_validation = \
67                disable_ssl_certificate_validation
68            config.key_file = key_file
69            config.cert_file = cert_file
70            config.ca_certs = ca_certs
71            config.xmlsec_binary = xmlsec_binary
72
73        Entity.__init__(self, "sp", config)
74        self._idp = idp
75        self._sp = sp
76        self.user = user
77        self.passwd = passwd
78        self._verbose = verbose
79
80        if metadata_file:
81            self._metadata = MetadataStore([saml, samlp], None, config)
82            self._metadata.load("local", metadata_file)
83            logger.debug("Loaded metadata from '%s'", metadata_file)
84        else:
85            self._metadata = None
86
87        self.metadata = self._metadata
88
89        self.cookie_handler = None
90
91        self.done_ecp = False
92        self.cookie_jar = cookielib.LWPCookieJar()
93
94    def phase2(
95        self,
96        authn_request,
97        rc_url,
98        idp_entity_id,
99        headers=None,
100        sign=False,
101        sign_alg=None,
102        **kwargs,
103    ):
104        """
105        Doing the second phase of the ECP conversation, the conversation
106        with the IdP happens.
107
108        :param authn_request: The AuthenticationRequest
109        :param rc_url: The assertion consumer service url of the SP
110        :param idp_entity_id: The EntityID of the IdP
111        :param headers: Possible extra headers
112        :param sign: If the message should be signed
113        :return: The response from the IdP
114        """
115
116        _, destination = self.pick_binding(
117            "single_sign_on_service", [BINDING_SOAP], "idpsso", entity_id=idp_entity_id
118        )
119
120        ht_args = self.apply_binding(
121            BINDING_SOAP, authn_request, destination, sign=sign, sigalg=sign_alg
122        )
123
124        if headers:
125            ht_args["headers"].extend(headers)
126
127        logger.debug("[P2] Sending request: %s", ht_args["data"])
128
129        # POST the request to the IdP
130        response = self.send(**ht_args)
131
132        logger.debug("[P2] Got IdP response: %s", response)
133
134        if response.status_code != 200:
135            raise SAMLError(
136                "Request to IdP failed ({status}): {text}".format(
137                    status=response.status_code, text=response.text
138                )
139            )
140
141        # SAMLP response in a SOAP envelope body, ecp response in headers
142        respdict = self.parse_soap_message(response.text)
143
144        if respdict is None:
145            raise SAMLError("Unexpected reply from the IdP")
146
147        logger.debug("[P2] IdP response dict: %s", respdict)
148
149        idp_response = respdict["body"]
150
151        expected_tag = "Response"
152        if idp_response.c_tag != expected_tag:
153            raise ValueError(
154                "Invalid Response tag '{invalid}' should be '{valid}'".format(
155                    invalid=idp_response.c_tag, valid=expected_tag
156                )
157            )
158
159        logger.debug("[P2] IdP AUTHN response: %s", idp_response)
160
161        _ecp_response = None
162        for item in respdict["header"]:
163            if item.c_tag == "Response" and item.c_namespace == ecp.NAMESPACE:
164                _ecp_response = item
165
166        _acs_url = _ecp_response.assertion_consumer_service_url
167        if rc_url != _acs_url:
168            error = ("response_consumer_url '%s' does not match" % rc_url,
169                     "assertion_consumer_service_url '%s" % _acs_url)
170            # Send an error message to the SP
171            _ = self.send(rc_url, "POST", data=soap.soap_fault(error))
172            # Raise an exception so the user knows something went wrong
173            raise SAMLError(error)
174
175        return idp_response
176
177    @staticmethod
178    def parse_sp_ecp_response(respdict):
179        if respdict is None:
180            raise SAMLError("Unexpected reply from the SP")
181
182        logger.debug("[P1] SP response dict: %s", respdict)
183
184        # AuthnRequest in the body or not
185        authn_request = respdict["body"]
186
187        expected_tag = "AuthnRequest"
188        if authn_request.c_tag != expected_tag:
189            raise ValueError(
190                "Invalid AuthnRequest tag '{invalid}' should be '{valid}'".format(
191                    invalid=authn_request.c_tag, valid=expected_tag
192                )
193            )
194
195        # ecp.RelayState among headers
196        _relay_state = None
197        _paos_request = None
198        for item in respdict["header"]:
199            if item.c_tag == "RelayState" and item.c_namespace == ecp.NAMESPACE:
200                _relay_state = item
201            if item.c_tag == "Request" and item.c_namespace == paos.NAMESPACE:
202                _paos_request = item
203
204        if _paos_request is None:
205            raise BadRequest("Missing request")
206
207        _rc_url = _paos_request.response_consumer_url
208
209        return {
210            "authn_request": authn_request,
211            "rc_url": _rc_url,
212            "relay_state": _relay_state,
213        }
214
215    def ecp_conversation(self, respdict, idp_entity_id=None):
216        """
217
218        :param respdict:
219        :param idp_entity_id:
220        :return:
221        """
222
223        args = self.parse_sp_ecp_response(respdict)
224
225        # **********************
226        # Phase 2 - talk to the IdP
227        # **********************
228
229        idp_response = self.phase2(idp_entity_id=idp_entity_id, **args)
230
231        # **********************************
232        # Phase 3 - back to the SP
233        # **********************************
234
235        ht_args = self.use_soap(idp_response, args["rc_url"], [args["relay_state"]])
236        ht_args["headers"][0] = ('Content-Type', MIME_PAOS)
237        logger.debug("[P3] Post to SP: %s", ht_args["data"])
238
239        # POST the package from the IdP to the SP
240        response = self.send(**ht_args)
241
242        if response.status_code == 302:
243            # ignore where the SP is redirecting us to and go for the
244            # url I started off with.
245            pass
246        else:
247            raise SAMLError("Error POSTing package to SP: %s" % response.text)
248
249        logger.debug("[P3] SP response: %s", response.text)
250
251        self.done_ecp = True
252        logger.debug("Done ECP")
253
254        return None
255
256    @staticmethod
257    def add_paos_headers(headers=None):
258        if headers:
259            headers = set_list2dict(headers)
260            headers["PAOS"] = PAOS_HEADER_INFO
261            if "Accept" in headers:
262                headers["Accept"] += ";%s" % MIME_PAOS
263            elif "accept" in headers:
264                headers["Accept"] = headers["accept"]
265                headers["Accept"] += ";%s" % MIME_PAOS
266                del headers["accept"]
267            headers = dict2set_list(headers)
268        else:
269            headers = [
270                ('Accept', 'text/html; %s' % MIME_PAOS),
271                ('PAOS', PAOS_HEADER_INFO)
272            ]
273
274        return headers
275
276    def operation(self, url, idp_entity_id, op, **opargs):
277        """
278        This is the method that should be used by someone that wants
279        to authenticate using SAML ECP
280
281        :param url: The page that access is sought for
282        :param idp_entity_id: The entity ID of the IdP that should be
283            used for authentication
284        :param op: Which HTTP operation (GET/POST/PUT/DELETE)
285        :param opargs: Arguments to the HTTP call
286        :return: The page
287        """
288        sp_url = self._sp
289
290        # ********************************************
291        # Phase 1 - First conversation with the SP
292        # ********************************************
293        # headers needed to indicate to the SP that I'm ECP enabled
294
295        opargs["headers"] = self.add_paos_headers(opargs["headers"])
296        response = self.send(sp_url, op, **opargs)
297        logger.debug("[Op] SP response: %s" % response)
298        print(response.text)
299
300        if response.status_code != 200:
301            raise SAMLError(
302                "Request to SP failed: %s" % response.text)
303
304        # The response might be a AuthnRequest instance in a SOAP envelope
305        # body. If so it's the start of the ECP conversation
306        # Two SOAP header blocks; paos:Request and ecp:Request
307        # may also contain a ecp:RelayState SOAP header block
308        # If channel-binding was part of the PAOS header any number of
309        # <cb:ChannelBindings> header blocks may also be present
310        # if 'holder-of-key' option then one or more <ecp:SubjectConfirmation>
311        # header blocks may also be present
312        try:
313            respdict = self.parse_soap_message(response.text)
314            self.ecp_conversation(respdict, idp_entity_id)
315
316            # should by now be authenticated so this should go smoothly
317            response = self.send(url, op, **opargs)
318        except (soap.XmlParseError, AssertionError, KeyError):
319            raise
320
321        if response.status_code >= 400:
322            raise SAMLError("Error performing operation: %s" % (
323                response.text,))
324
325        return response
326
327    # different HTTP operations
328    def delete(self, url=None, idp_entity_id=None):
329        return self.operation(url, idp_entity_id, "DELETE")
330
331    def get(self, url=None, idp_entity_id=None, headers=None):
332        return self.operation(url, idp_entity_id, "GET", headers=headers)
333
334    def post(self, url=None, data="", idp_entity_id=None, headers=None):
335        return self.operation(url, idp_entity_id, "POST", data=data,
336                              headers=headers)
337
338    def put(self, url=None, data="", idp_entity_id=None, headers=None):
339        return self.operation(url, idp_entity_id, "PUT", data=data,
340                              headers=headers)
341