1# Copyright (C) 2008-2012 AG Projects. See LICENSE for details 2 3from hashlib import md5 4from time import time 5from base64 import b64encode, b64decode 6import random 7 8def get_random_data(length): 9 return ''.join(chr(random.randint(0, 255)) for x in xrange(length)) 10 11class LoginFailed(Exception): 12 pass 13 14def calc_ha1(**parameters): 15 ha1_text = "%(username)s:%(realm)s:%(password)s" % parameters 16 return md5(ha1_text).hexdigest() 17 18def calc_ha2_response(**parameters): 19 ha2_text = "%(method)s:%(uri)s" % parameters 20 return md5(ha2_text).hexdigest() 21 22def calc_ha2_rspauth(**parameters): 23 ha2_text = ":%(uri)s" % parameters 24 return md5(ha2_text).hexdigest() 25 26def calc_hash(**parameters): 27 hash_text = "%(ha1)s:%(nonce)s:%(nc)s:%(cnonce)s:auth:%(ha2)s" % parameters 28 return md5(hash_text).hexdigest() 29 30def calc_responses(**parameters): 31 if parameters.has_key("ha1"): 32 ha1 = parameters.pop("ha1") 33 else: 34 ha1 = calc_ha1(**parameters) 35 ha2_response = calc_ha2_response(**parameters) 36 ha2_rspauth = calc_ha2_rspauth(**parameters) 37 response = calc_hash(ha1 = ha1, ha2 = ha2_response, **parameters) 38 rspauth = calc_hash(ha1 = ha1, ha2 = ha2_rspauth, **parameters) 39 return response, rspauth 40 41def process_www_authenticate(username, password, method, uri, **parameters): 42 nc = "00000001" 43 cnonce = get_random_data(16).encode("hex") 44 parameters["username"] = username 45 parameters["password"] = password 46 parameters["method"] = method 47 parameters["uri"] = uri 48 response, rsp_auth = calc_responses(nc = nc, cnonce = cnonce, **parameters) 49 authorization = {} 50 authorization["username"] = username 51 authorization["realm"] = parameters["realm"] 52 authorization["nonce"] = parameters["nonce"] 53 authorization["qop"] = "auth" 54 authorization["nc"] = nc 55 authorization["cnonce"] = cnonce 56 authorization["response"] = response 57 authorization["opaque"] = parameters["opaque"] 58 return authorization, rsp_auth 59 60class AuthChallenger(object): 61 62 def __init__(self, expire_time): 63 self.expire_time = expire_time 64 self.key = get_random_data(16) 65 66 def generate_www_authenticate(self, realm, peer_ip): 67 www_authenticate = {} 68 www_authenticate["realm"] = realm 69 www_authenticate["qop"] = "auth" 70 nonce = get_random_data(16) + "%.3f:%s" % (time(), peer_ip) 71 www_authenticate["nonce"] = b64encode(nonce) 72 opaque = md5(nonce + self.key) 73 www_authenticate["opaque"] = opaque.hexdigest() 74 return www_authenticate 75 76 def process_authorization_ha1(self, ha1, method, uri, peer_ip, **parameters): 77 parameters["method"] = method 78 parameters["uri"] = uri 79 try: 80 nonce = parameters["nonce"] 81 opaque = parameters["opaque"] 82 response = parameters["response"] 83 except IndexError, e: 84 raise LoginFailed("Parameter not present: %s", e.message) 85 try: 86 expected_response, rspauth = calc_responses(ha1 = ha1, **parameters) 87 except: 88 raise 89 #raise LoginFailed("Parameters error") 90 if response != expected_response: 91 raise LoginFailed("Incorrect password") 92 try: 93 nonce_dec = b64decode(nonce) 94 issued, nonce_ip = nonce_dec[16:].split(":", 1) 95 issued = float(issued) 96 except: 97 raise LoginFailed("Could not decode nonce") 98 if nonce_ip != peer_ip: 99 raise LoginFailed("This challenge was not issued to you") 100 expected_opaque = md5(nonce_dec + self.key).hexdigest() 101 if opaque != expected_opaque: 102 raise LoginFailed("This nonce/opaque combination was not issued by me") 103 if issued + self.expire_time < time(): 104 raise LoginFailed("This challenge has expired") 105 authentication_info = {} 106 authentication_info["qop"] = "auth" 107 authentication_info["cnonce"] = parameters["cnonce"] 108 authentication_info["nc"] = parameters["nc"] 109 authentication_info["rspauth"] = rspauth 110 return authentication_info 111 112 def process_authorization_password(self, password, method, uri, peer_ip, **parameters): 113 ha1 = calc_ha1(password = password, **parameters) 114 return self.process_authorization_ha1(ha1, method, uri, peer_ip, **parameters) 115