1# Copyright (C) 2008-2012 AG Projects. See LICENSE for details
2
3from hashlib import md5
4from time import time
5from base64 import b64encode, b64decode
6import random
7
def get_random_data(length):
    """Return `length` random bytes as a byte string.

    The result is used in this module as a secret key and as nonce/cnonce
    material, so it is drawn from the operating system's random source
    (os.urandom) rather than from the predictable generator behind the
    `random` module.
    """
    import os  # local import keeps this function self-contained
    return os.urandom(length)
10
class LoginFailed(Exception):
    """Raised when a digest authentication attempt is rejected."""
13
14def calc_ha1(**parameters):
15    ha1_text = "%(username)s:%(realm)s:%(password)s" % parameters
16    return md5(ha1_text).hexdigest()
17
def calc_ha2_response(**parameters):
    """Return the hex MD5 digest of "method:uri".

    Requires the keyword arguments `method` and `uri`; any other keywords
    are ignored. A missing one raises KeyError.

    Text (unicode) input is encoded as UTF-8 before hashing, so a URI
    containing non-ASCII characters does not make md5() fail.
    """
    ha2_text = "%(method)s:%(uri)s" % parameters
    if not isinstance(ha2_text, bytes):
        # md5() works on bytes; encode explicitly instead of relying on an
        # implicit ASCII conversion.
        ha2_text = ha2_text.encode("utf-8")
    return md5(ha2_text).hexdigest()
21
def calc_ha2_rspauth(**parameters):
    """Return the hex MD5 digest of ":uri" (empty method part).

    Requires the keyword argument `uri`; any other keywords are ignored.
    A missing `uri` raises KeyError.

    Text (unicode) input is encoded as UTF-8 before hashing, so a URI
    containing non-ASCII characters does not make md5() fail.
    """
    ha2_text = ":%(uri)s" % parameters
    if not isinstance(ha2_text, bytes):
        # md5() works on bytes; encode explicitly instead of relying on an
        # implicit ASCII conversion.
        ha2_text = ha2_text.encode("utf-8")
    return md5(ha2_text).hexdigest()
25
26def calc_hash(**parameters):
27    hash_text = "%(ha1)s:%(nonce)s:%(nc)s:%(cnonce)s:auth:%(ha2)s" % parameters
28    return md5(hash_text).hexdigest()
29
def calc_responses(**parameters):
    """Compute the `response` and `rspauth` digests for one request.

    Keyword arguments must supply `method`, `uri`, `nonce`, `nc` and
    `cnonce`, plus either a precomputed `ha1` or the `username`, `realm`
    and `password` from which calc_ha1() derives it.

    Returns a (response, rspauth) tuple of hex digests. Both are produced
    by calc_hash(); they differ only in the HA2 value used
    (calc_ha2_response() versus calc_ha2_rspauth()).
    """
    if "ha1" in parameters:
        # Take it out of the dict so it is not passed twice to calc_hash().
        ha1 = parameters.pop("ha1")
    else:
        ha1 = calc_ha1(**parameters)
    ha2_response = calc_ha2_response(**parameters)
    ha2_rspauth = calc_ha2_rspauth(**parameters)
    response = calc_hash(ha1=ha1, ha2=ha2_response, **parameters)
    rspauth = calc_hash(ha1=ha1, ha2=ha2_rspauth, **parameters)
    return response, rspauth
40
def process_www_authenticate(username, password, method, uri, **parameters):
    """Build the client's answer to a digest authentication challenge.

    `parameters` holds the fields of the challenge. `realm` and `nonce`
    are required; `opaque` is echoed back only when the challenge
    contained it.

    Returns (authorization, rsp_auth): the dict of authorization fields to
    send, and the second digest returned by calc_responses() for the same
    parameters.
    """
    # A new client nonce is generated on every call, so the nonce count is
    # always 1.
    nc = "00000001"
    cnonce = get_random_data(16).encode("hex")
    parameters["username"] = username
    parameters["password"] = password
    parameters["method"] = method
    parameters["uri"] = uri
    response, rsp_auth = calc_responses(nc=nc, cnonce=cnonce, **parameters)
    authorization = {
        "username": username,
        "realm": parameters["realm"],
        "nonce": parameters["nonce"],
        "qop": "auth",
        "nc": nc,
        "cnonce": cnonce,
        "response": response,
    }
    if "opaque" in parameters:
        # opaque is an optional challenge field; a challenge without it
        # must not fail with KeyError.
        authorization["opaque"] = parameters["opaque"]
    return authorization, rsp_auth
59
class AuthChallenger(object):
    """Issue digest authentication challenges and verify the answers.

    No per-challenge state is stored. The nonce carries its issue time and
    the peer IP, and `opaque` is an MD5 over the nonce and a random key
    private to this instance; recomputing it is how a returned nonce is
    recognized as one this instance issued.
    """

    def __init__(self, expire_time):
        """Create a challenger whose challenges expire after `expire_time` seconds."""
        self.expire_time = expire_time
        # Secret mixed into every opaque value.
        self.key = get_random_data(16)

    def generate_www_authenticate(self, realm, peer_ip):
        """Return the fields of a new challenge for `peer_ip` as a dict.

        The dict has the keys `realm`, `qop` (always "auth"), `nonce`
        (base64 encoded) and `opaque`.
        """
        # Nonce layout: 16 random bytes followed by "<issue time>:<peer ip>".
        nonce = get_random_data(16) + "%.3f:%s" % (time(), peer_ip)
        return {
            "realm": realm,
            "qop": "auth",
            "nonce": b64encode(nonce),
            "opaque": md5(nonce + self.key).hexdigest(),
        }

    def process_authorization_ha1(self, ha1, method, uri, peer_ip, **parameters):
        """Verify a client's digest answer against a known HA1 value.

        `parameters` are the fields of the client's authorization; `nonce`,
        `opaque`, `response`, `nc` and `cnonce` are required.

        Returns a dict with the keys `qop`, `cnonce`, `nc` and `rspauth`.

        Raises LoginFailed when a field is missing or unusable, when the
        response digest does not match, or when the nonce cannot be decoded,
        was issued to a different IP, was not issued by this instance, or
        has expired.
        """
        parameters["method"] = method
        parameters["uri"] = uri
        try:
            nonce = parameters["nonce"]
            opaque = parameters["opaque"]
            response = parameters["response"]
        except KeyError as e:
            # A missing dict key raises KeyError, not IndexError.
            raise LoginFailed("Parameter not present: %s" % e.args[0])
        try:
            expected_response, rspauth = calc_responses(ha1=ha1, **parameters)
        except (KeyError, TypeError):
            # KeyError: a field needed for the digest (e.g. nc, cnonce) is
            # missing. TypeError: a client field collides with a keyword
            # used internally (e.g. ha1). Both are bad client input.
            raise LoginFailed("Parameters error")
        if response != expected_response:
            raise LoginFailed("Incorrect password")
        try:
            nonce_dec = b64decode(nonce)
            # Skip the 16 random bytes; the rest is "<issue time>:<peer ip>".
            issued, nonce_ip = nonce_dec[16:].split(":", 1)
            issued = float(issued)
        except (TypeError, ValueError):
            raise LoginFailed("Could not decode nonce")
        if nonce_ip != peer_ip:
            raise LoginFailed("This challenge was not issued to you")
        expected_opaque = md5(nonce_dec + self.key).hexdigest()
        if opaque != expected_opaque:
            raise LoginFailed("This nonce/opaque combination was not issued by me")
        if issued + self.expire_time < time():
            raise LoginFailed("This challenge has expired")
        return {
            "qop": "auth",
            "cnonce": parameters["cnonce"],
            "nc": parameters["nc"],
            "rspauth": rspauth,
        }

    def process_authorization_password(self, password, method, uri, peer_ip, **parameters):
        """Verify a client's digest answer against a plaintext password.

        Derives HA1 from `password` and the `username` and `realm` fields
        in `parameters`, then defers to process_authorization_ha1() and
        returns its result.

        Raises LoginFailed under the same conditions as
        process_authorization_ha1(), and also when HA1 cannot be derived
        from the supplied fields.
        """
        try:
            ha1 = calc_ha1(password=password, **parameters)
        except (KeyError, TypeError):
            # KeyError: username or realm missing. TypeError: the client
            # supplied its own "password" field, colliding with the keyword.
            raise LoginFailed("Parameters error")
        return self.process_authorization_ha1(ha1, method, uri, peer_ip, **parameters)
115