1# -*- coding: utf-8 -*-
2import base64
3import functools
4import hmac
5import io
6import logging
7import os
8import re
9import struct
10import time
11
12import werkzeug.urls
13
14from odoo import _, api, fields, models
15from odoo.addons.base.models.res_users import check_identity
16from odoo.exceptions import AccessDenied, UserError
17from odoo.http import request, db_list
18
19_logger = logging.getLogger(__name__)
20
21compress = functools.partial(re.sub, r'\s', '')
22class Users(models.Model):
23    _inherit = 'res.users'
24
25    totp_secret = fields.Char(copy=False, groups=fields.NO_ACCESS)
26    totp_enabled = fields.Boolean(string="Two-factor authentication", compute='_compute_totp_enabled')
27
28    def __init__(self, pool, cr):
29        init_res = super().__init__(pool, cr)
30        type(self).SELF_READABLE_FIELDS = self.SELF_READABLE_FIELDS + ['totp_enabled']
31        return init_res
32
33    def _mfa_url(self):
34        r = super()._mfa_url()
35        if r is not None:
36            return r
37        if self.totp_enabled:
38            return '/web/login/totp'
39
40    @api.depends('totp_secret')
41    def _compute_totp_enabled(self):
42        for r, v in zip(self, self.sudo()):
43            r.totp_enabled = bool(v.totp_secret)
44
45    def _rpc_api_keys_only(self):
46        # 2FA enabled means we can't allow password-based RPC
47        self.ensure_one()
48        return self.totp_enabled or super()._rpc_api_keys_only()
49
50    def _get_session_token_fields(self):
51        return super()._get_session_token_fields() | {'totp_secret'}
52
53    def _totp_check(self, code):
54        sudo = self.sudo()
55        key = base64.b32decode(sudo.totp_secret)
56        match = TOTP(key).match(code)
57        if match is None:
58            _logger.info("2FA check: FAIL for %s %r", self, self.login)
59            raise AccessDenied()
60        _logger.info("2FA check: SUCCESS for %s %r", self, self.login)
61
62    def _totp_try_setting(self, secret, code):
63        if self.totp_enabled or self != self.env.user:
64            _logger.info("2FA enable: REJECT for %s %r", self, self.login)
65            return False
66
67        secret = compress(secret).upper()
68        match = TOTP(base64.b32decode(secret)).match(code)
69        if match is None:
70            _logger.info("2FA enable: REJECT CODE for %s %r", self, self.login)
71            return False
72
73        self.sudo().totp_secret = secret
74        if request:
75            self.flush()
76            # update session token so the user does not get logged out (cache cleared by change)
77            new_token = self.env.user._compute_session_token(request.session.sid)
78            request.session.session_token = new_token
79
80        _logger.info("2FA enable: SUCCESS for %s %r", self, self.login)
81        return True
82
83    @check_identity
84    def totp_disable(self):
85        logins = ', '.join(map(repr, self.mapped('login')))
86        if not (self == self.env.user or self.env.user._is_admin() or self.env.su):
87            _logger.info("2FA disable: REJECT for %s (%s) by uid #%s", self, logins, self.env.user.id)
88            return False
89
90        self.sudo().write({'totp_secret': False})
91        if request and self == self.env.user:
92            self.flush()
93            # update session token so the user does not get logged out (cache cleared by change)
94            new_token = self.env.user._compute_session_token(request.session.sid)
95            request.session.session_token = new_token
96
97        _logger.info("2FA disable: SUCCESS for %s (%s) by uid #%s", self, logins, self.env.user.id)
98        return {
99            'type': 'ir.actions.client',
100            'tag': 'display_notification',
101            'params': {
102                'type': 'warning',
103                'message': _("Two-factor authentication disabled for user(s) %s", logins),
104                'next': {'type': 'ir.actions.act_window_close'},
105            }
106        }
107
108    @check_identity
109    def totp_enable_wizard(self):
110        if self.env.user != self:
111            raise UserError(_("Two-factor authentication can only be enabled for yourself"))
112
113        if self.totp_enabled:
114            raise UserError(_("Two-factor authentication already enabled"))
115
116        secret_bytes_count = TOTP_SECRET_SIZE // 8
117        secret = base64.b32encode(os.urandom(secret_bytes_count)).decode()
118        # format secret in groups of 4 characters for readability
119        secret = ' '.join(map(''.join, zip(*[iter(secret)]*4)))
120        w = self.env['auth_totp.wizard'].create({
121            'user_id': self.id,
122            'secret': secret,
123        })
124        return {
125            'type': 'ir.actions.act_window',
126            'target': 'new',
127            'res_model': 'auth_totp.wizard',
128            'name': _("Enable Two-Factor Authentication"),
129            'res_id': w.id,
130            'views': [(False, 'form')],
131        }
132
133class TOTPWizard(models.TransientModel):
134    _name = 'auth_totp.wizard'
135    _description = "Two-Factor Setup Wizard"
136
137    user_id = fields.Many2one('res.users', required=True, readonly=True)
138    secret = fields.Char(required=True, readonly=True)
139    url = fields.Char(store=True, readonly=True, compute='_compute_qrcode')
140    qrcode = fields.Binary(
141        attachment=False, store=True, readonly=True,
142        compute='_compute_qrcode',
143    )
144    code = fields.Char(string="Verification Code", size=7)
145
146    @api.depends('user_id.login', 'user_id.company_id.display_name', 'secret')
147    def _compute_qrcode(self):
148        # TODO: make "issuer" configurable through config parameter?
149        global_issuer = request and request.httprequest.host.split(':', 1)[0]
150        for w in self:
151            issuer = global_issuer or w.user_id.company_id.display_name
152            w.url = url = werkzeug.urls.url_unparse((
153                'otpauth', 'totp',
154                werkzeug.urls.url_quote(f'{issuer}:{w.user_id.login}', safe=':'),
155                werkzeug.urls.url_encode({
156                    'secret': compress(w.secret),
157                    'issuer': issuer,
158                    # apparently a lowercase hash name is anathema to google
159                    # authenticator (error) and passlib (no token)
160                    'algorithm': ALGORITHM.upper(),
161                    'digits': DIGITS,
162                    'period': TIMESTEP,
163                }), ''
164            ))
165
166            data = io.BytesIO()
167            import qrcode
168            qrcode.make(url.encode(), box_size=4).save(data, optimise=True, format='PNG')
169            w.qrcode = base64.b64encode(data.getvalue()).decode()
170
171    @check_identity
172    def enable(self):
173        try:
174            c = int(compress(self.code))
175        except ValueError:
176            raise UserError(_("The verification code should only contain numbers"))
177        if self.user_id._totp_try_setting(self.secret, c):
178            self.secret = '' # empty it, because why keep it until GC?
179            return {
180                'type': 'ir.actions.client',
181                'tag': 'display_notification',
182                'params': {
183                    'type': 'success',
184                    'message': _("Two-factor authentication is now enabled."),
185                    'next': {'type': 'ir.actions.act_window_close'},
186                }
187            }
188        raise UserError(_('Verification failed, please double-check the 6-digit code'))
189
190# 160 bits, as recommended by HOTP RFC 4226, section 4, R6.
191# Google Auth uses 80 bits by default but supports 160.
192TOTP_SECRET_SIZE = 160
193
194# The algorithm (and key URI format) allows customising these parameters but
195# google authenticator doesn't support it
196# https://github.com/google/google-authenticator/wiki/Key-Uri-Format
197ALGORITHM = 'sha1'
198DIGITS = 6
199TIMESTEP = 30
200
201class TOTP:
202    def __init__(self, key):
203        self._key = key
204
205    def match(self, code, t=None, window=TIMESTEP):
206        """
207        :param code: authenticator code to check against this key
208        :param int t: current timestamp (seconds)
209        :param int window: fuzz window to account for slow fingers, network
210                           latency, desynchronised clocks, ..., every code
211                           valid between t-window an t+window is considered
212                           valid
213        """
214        if t is None:
215            t = time.time()
216
217        low = int((t - window) / TIMESTEP)
218        high = int((t + window) / TIMESTEP) + 1
219
220        return next((
221            counter for counter in range(low, high)
222            if hotp(self._key, counter) == code
223        ), None)
224
225def hotp(secret, counter):
226    # C is the 64b counter encoded in big-endian
227    C = struct.pack(">Q", counter)
228    mac = hmac.new(secret, msg=C, digestmod=ALGORITHM).digest()
229    # the data offset is the last nibble of the hash
230    offset = mac[-1] & 0xF
231    # code is the 4 bytes at the offset interpreted as a 31b big-endian uint
232    # (31b to avoid sign concerns). This effectively limits digits to 9 and
233    # hard-limits it to 10: each digit is normally worth 3.32 bits but the
234    # 10th is only worth 1.1 (9 digits encode 29.9 bits).
235    code = struct.unpack_from('>I', mac, offset)[0] & 0x7FFFFFFF
236    r = code % (10 ** DIGITS)
237    # NOTE: use text / bytes instead of int?
238    return r
239