# -*- coding: utf-8 -*-
import base64
import functools
import hmac
import io
import logging
import os
import re
import struct
import time

import werkzeug.urls

from odoo import _, api, fields, models
from odoo.addons.base.models.res_users import check_identity
from odoo.exceptions import AccessDenied, UserError
from odoo.http import request, db_list

_logger = logging.getLogger(__name__)

# strips every whitespace character from a string (``compress("ab cd") == "abcd"``)
compress = functools.partial(re.sub, r'\s', '')


class Users(models.Model):
    """Extension of ``res.users`` adding a TOTP secret and the helpers to
    enable, check and disable TOTP-based two-factor authentication.
    """
    _inherit = 'res.users'

    # base32-encoded shared secret; an empty value means 2FA is disabled
    totp_secret = fields.Char(copy=False, groups=fields.NO_ACCESS)
    # computed flag: true when ``totp_secret`` is set
    totp_enabled = fields.Boolean(string="Two-factor authentication", compute='_compute_totp_enabled')

    def __init__(self, pool, cr):
        init_res = super().__init__(pool, cr)
        # append 'totp_enabled' to the class-level SELF_READABLE_FIELDS list
        type(self).SELF_READABLE_FIELDS = self.SELF_READABLE_FIELDS + ['totp_enabled']
        return init_res

    def _mfa_url(self):
        """Return the URL of the second-factor login step.

        The parent's answer takes precedence when it is not ``None``;
        otherwise the TOTP login URL is returned if TOTP is enabled for this
        user, and ``None`` if it is not.
        """
        r = super()._mfa_url()
        if r is not None:
            return r
        if self.totp_enabled:
            return '/web/login/totp'
        return None

    @api.depends('totp_secret')
    def _compute_totp_enabled(self):
        # the secret is read through sudo() records, paired one-to-one with
        # the regular records the flag is written on
        for r, v in zip(self, self.sudo()):
            r.totp_enabled = bool(v.totp_secret)

    def _rpc_api_keys_only(self):
        # 2FA enabled means we can't allow password-based RPC
        self.ensure_one()
        return self.totp_enabled or super()._rpc_api_keys_only()

    def _get_session_token_fields(self):
        # the TOTP secret is added to the set returned by the parent
        return super()._get_session_token_fields() | {'totp_secret'}

    def _totp_check(self, code):
        """Check ``code`` against this user's stored TOTP secret.

        :param int code: authenticator code to verify
        :raises AccessDenied: if no counter in the accepted window yields
                              ``code``
        """
        sudo = self.sudo()
        key = base64.b32decode(sudo.totp_secret)
        match = TOTP(key).match(code)
        if match is None:
            _logger.info("2FA check: FAIL for %s %r", self, self.login)
            raise AccessDenied()
        _logger.info("2FA check: SUCCESS for %s %r", self, self.login)

    def _totp_try_setting(self, secret, code):
        """Try to enable TOTP for the current user with the given secret.

        :param str secret: base32-encoded secret, whitespace and case are
                           normalised before use
        :param int code: authenticator code proving the secret is installed
        :return: ``True`` if the secret was stored, ``False`` if the request
                 was rejected (already enabled, not the current user,
                 malformed secret or non-matching code)
        :rtype: bool
        """
        if self.totp_enabled or self != self.env.user:
            _logger.info("2FA enable: REJECT for %s %r", self, self.login)
            return False

        secret = compress(secret).upper()
        try:
            key = base64.b32decode(secret)
        except ValueError:
            # binascii.Error (a ValueError) is raised for a secret which is
            # not valid base32: reject it instead of letting the error escape
            _logger.info("2FA enable: REJECT INVALID SECRET for %s %r", self, self.login)
            return False

        match = TOTP(key).match(code)
        if match is None:
            _logger.info("2FA enable: REJECT CODE for %s %r", self, self.login)
            return False

        self.sudo().totp_secret = secret
        if request:
            self.flush()
            # update session token so the user does not get logged out (cache cleared by change)
            new_token = self.env.user._compute_session_token(request.session.sid)
            request.session.session_token = new_token

        _logger.info("2FA enable: SUCCESS for %s %r", self, self.login)
        return True

    @check_identity
    def totp_disable(self):
        """Clear the TOTP secret of the users in ``self``.

        Only allowed when ``self`` is the current user, the current user is
        an admin, or the environment is in superuser mode.

        :return: ``False`` when rejected, otherwise a client action
                 displaying a notification and closing the window
        """
        logins = ', '.join(map(repr, self.mapped('login')))
        if not (self == self.env.user or self.env.user._is_admin() or self.env.su):
            _logger.info("2FA disable: REJECT for %s (%s) by uid #%s", self, logins, self.env.user.id)
            return False

        self.sudo().write({'totp_secret': False})
        if request and self == self.env.user:
            self.flush()
            # update session token so the user does not get logged out (cache cleared by change)
            new_token = self.env.user._compute_session_token(request.session.sid)
            request.session.session_token = new_token

        _logger.info("2FA disable: SUCCESS for %s (%s) by uid #%s", self, logins, self.env.user.id)
        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'type': 'warning',
                'message': _("Two-factor authentication disabled for user(s) %s", logins),
                'next': {'type': 'ir.actions.act_window_close'},
            }
        }

    @check_identity
    def totp_enable_wizard(self):
        """Create a setup wizard holding a fresh random secret and return the
        window action opening it.

        :raises UserError: if ``self`` is not the current user, or if TOTP
                           is already enabled
        """
        if self.env.user != self:
            raise UserError(_("Two-factor authentication can only be enabled for yourself"))

        if self.totp_enabled:
            raise UserError(_("Two-factor authentication already enabled"))

        secret_bytes_count = TOTP_SECRET_SIZE // 8
        secret = base64.b32encode(os.urandom(secret_bytes_count)).decode()
        # format secret in groups of 4 characters for readability
        secret = ' '.join(map(''.join, zip(*[iter(secret)]*4)))
        w = self.env['auth_totp.wizard'].create({
            'user_id': self.id,
            'secret': secret,
        })
        return {
            'type': 'ir.actions.act_window',
            'target': 'new',
            'res_model': 'auth_totp.wizard',
            'name': _("Enable Two-Factor Authentication"),
            'res_id': w.id,
            'views': [(False, 'form')],
        }


class TOTPWizard(models.TransientModel):
    """Transient wizard holding a candidate secret, its ``otpauth`` URL and
    QR code, and the verification code typed by the user.
    """
    _name = 'auth_totp.wizard'
    _description = "Two-Factor Setup Wizard"

    user_id = fields.Many2one('res.users', required=True, readonly=True)
    secret = fields.Char(required=True, readonly=True)
    url = fields.Char(store=True, readonly=True, compute='_compute_qrcode')
    qrcode = fields.Binary(
        attachment=False, store=True, readonly=True,
        compute='_compute_qrcode',
    )
    code = fields.Char(string="Verification Code", size=7)

    @api.depends('user_id.login', 'user_id.company_id.display_name', 'secret')
    def _compute_qrcode(self):
        """Compute the ``otpauth://totp/...`` URL of each wizard and its QR
        code as a base64-encoded PNG.
        """
        # imported at call time rather than module level; hoisted out of the
        # loop so the import statement runs once per call
        import qrcode

        # TODO: make "issuer" configurable through config parameter?
        # host name of the current HTTP request, without the port
        global_issuer = request and request.httprequest.host.split(':', 1)[0]
        for w in self:
            issuer = global_issuer or w.user_id.company_id.display_name
            w.url = url = werkzeug.urls.url_unparse((
                'otpauth', 'totp',
                werkzeug.urls.url_quote(f'{issuer}:{w.user_id.login}', safe=':'),
                werkzeug.urls.url_encode({
                    'secret': compress(w.secret),
                    'issuer': issuer,
                    # apparently a lowercase hash name is anathema to google
                    # authenticator (error) and passlib (no token)
                    'algorithm': ALGORITHM.upper(),
                    'digits': DIGITS,
                    'period': TIMESTEP,
                }), ''
            ))

            data = io.BytesIO()
            qrcode.make(url.encode(), box_size=4).save(data, optimise=True, format='PNG')
            w.qrcode = base64.b64encode(data.getvalue()).decode()

    @check_identity
    def enable(self):
        """Validate the typed verification code and enable TOTP for the user.

        :return: a client action displaying a success notification
        :raises UserError: if the code is empty or not numeric, or if the
                           verification fails
        """
        try:
            # an unset field reads as False: fall back to '' so an empty code
            # is reported as a UserError rather than a TypeError from re.sub
            c = int(compress(self.code or ''))
        except ValueError:
            raise UserError(_("The verification code should only contain numbers")) from None
        if self.user_id._totp_try_setting(self.secret, c):
            self.secret = ''  # empty it, because why keep it until GC?
            return {
                'type': 'ir.actions.client',
                'tag': 'display_notification',
                'params': {
                    'type': 'success',
                    'message': _("Two-factor authentication is now enabled."),
                    'next': {'type': 'ir.actions.act_window_close'},
                }
            }
        raise UserError(_('Verification failed, please double-check the 6-digit code'))


# 160 bits, as recommended by HOTP RFC 4226, section 4, R6.
# Google Auth uses 80 bits by default but supports 160.
TOTP_SECRET_SIZE = 160

# The algorithm (and key URI format) allows customising these parameters but
# google authenticator doesn't support it
# https://github.com/google/google-authenticator/wiki/Key-Uri-Format
ALGORITHM = 'sha1'
DIGITS = 6
TIMESTEP = 30


class TOTP:
    """Time-based one-time password checker for a single secret key."""

    def __init__(self, key):
        """
        :param bytes key: raw (already base32-decoded) shared secret
        """
        self._key = key

    def match(self, code, t=None, window=TIMESTEP):
        """
        :param code: authenticator code to check against this key
        :param int t: current timestamp (seconds)
        :param int window: fuzz window to account for slow fingers, network
                           latency, desynchronised clocks, ..., every code
                           valid between t-window and t+window is considered
                           valid
        :return: the first counter in the window whose HOTP value equals
                 ``code``, or ``None`` if there is none
        """
        if t is None:
            t = time.time()

        # time-step counters covering [t - window, t + window]
        low = int((t - window) / TIMESTEP)
        high = int((t + window) / TIMESTEP) + 1

        return next((
            counter for counter in range(low, high)
            if hotp(self._key, counter) == code
        ), None)


def hotp(secret, counter):
    """Compute the HOTP value of ``counter`` under ``secret``.

    :param bytes secret: raw shared secret used as the HMAC key
    :param int counter: counter value, packed as an unsigned 64-bit integer
    :return: the code, as an integer of at most ``DIGITS`` decimal digits
             (leading zeroes are therefore not represented)
    :rtype: int
    """
    # C is the 64b counter encoded in big-endian
    C = struct.pack(">Q", counter)
    mac = hmac.new(secret, msg=C, digestmod=ALGORITHM).digest()
    # the data offset is the last nibble of the hash
    offset = mac[-1] & 0xF
    # code is the 4 bytes at the offset interpreted as a 31b big-endian uint
    # (31b to avoid sign concerns). This effectively limits digits to 9 and
    # hard-limits it to 10: each digit is normally worth 3.32 bits but the
    # 10th is only worth 1.1 (9 digits encode 29.9 bits).
    code = struct.unpack_from('>I', mac, offset)[0] & 0x7FFFFFFF
    r = code % (10 ** DIGITS)
    # NOTE: use text / bytes instead of int?
    return r