1from datetime import timedelta
2
3from django.apps import apps
4from django.conf import settings
5from django.core.exceptions import ObjectDoesNotExist
6from django.db import models
7from django.utils import timezone
8from django.utils.functional import cached_property
9
10from .util import random_number_token
11
12
13class DeviceManager(models.Manager):
14    """
15    The :class:`~django.db.models.Manager` object installed as
16    ``Device.objects``.
17    """
18    def devices_for_user(self, user, confirmed=None):
19        """
20        Returns a queryset for all devices of this class that belong to the
21        given user.
22
23        :param user: The user.
24        :type user: :class:`~django.contrib.auth.models.User`
25
26        :param confirmed: If ``None``, all matching devices are returned.
27            Otherwise, this can be any true or false value to limit the query
28            to confirmed or unconfirmed devices, respectively.
29        """
30        devices = self.model.objects.filter(user=user)
31        if confirmed is not None:
32            devices = devices.filter(confirmed=bool(confirmed))
33
34        return devices
35
36
37class Device(models.Model):
38    """
39    Abstract base model for a :term:`device` attached to a user. Plugins must
40    subclass this to define their OTP models.
41
42    .. _unsaved_device_warning:
43
44    .. warning::
45
46        OTP devices are inherently stateful. For example, verifying a token is
47        logically a mutating operation on the device, which may involve
48        incrementing a counter or otherwise consuming a token. A device must be
49        committed to the database before it can be used in any way.
50
51    .. attribute:: user
52
53        *ForeignKey*: Foreign key to your user model, as configured by
54        :setting:`AUTH_USER_MODEL` (:class:`~django.contrib.auth.models.User`
55        by default).
56
57    .. attribute:: name
58
59        *CharField*: A human-readable name to help the user identify their
60        devices.
61
62    .. attribute:: confirmed
63
64        *BooleanField*: A boolean value that tells us whether this device has
65        been confirmed as valid. It defaults to ``True``, but subclasses or
66        individual deployments can force it to ``False`` if they wish to create
67        a device and then ask the user for confirmation. As a rule, built-in
68        APIs that enumerate devices will only include those that are confirmed.
69
70    .. attribute:: objects
71
72        A :class:`~django_otp.models.DeviceManager`.
73    """
74    user = models.ForeignKey(getattr(settings, 'AUTH_USER_MODEL', 'auth.User'), help_text="The user that this device belongs to.", on_delete=models.CASCADE)
75    name = models.CharField(max_length=64, help_text="The human-readable name of this device.")
76    confirmed = models.BooleanField(default=True, help_text="Is this device ready for use?")
77
78    objects = DeviceManager()
79
80    class Meta:
81        abstract = True
82
83    def __str__(self):
84        try:
85            user = self.user
86        except ObjectDoesNotExist:
87            user = None
88
89        return "{0} ({1})".format(self.name, user)
90
91    @property
92    def persistent_id(self):
93        return '{0}/{1}'.format(self.model_label(), self.id)
94
95    @classmethod
96    def model_label(cls):
97        """
98        Returns an identifier for this Django model class.
99
100        This is just the standard "<app_label>.<model_name>" form.
101
102        """
103        return '{0}.{1}'.format(cls._meta.app_label, cls._meta.model_name)
104
105    @classmethod
106    def from_persistent_id(cls, persistent_id):
107        """
108        Loads a device from its persistent id::
109
110            device == Device.from_persistent_id(device.persistent_id)
111
112        """
113        device = None
114
115        try:
116            model_label, device_id = persistent_id.rsplit('/', 1)
117            app_label, model_name = model_label.split('.')
118
119            device_cls = apps.get_model(app_label, model_name)
120            if issubclass(device_cls, Device):
121                device = device_cls.objects.filter(id=int(device_id)).first()
122        except (ValueError, LookupError):
123            pass
124
125        return device
126
127    def is_interactive(self):
128        """
129        Returns ``True`` if this is an interactive device. The default
130        implementation returns ``True`` if
131        :meth:`~django_otp.models.Device.generate_challenge` has been
132        overridden, but subclasses are welcome to provide smarter
133        implementations.
134
135        :rtype: bool
136        """
137        return not hasattr(self.generate_challenge, 'stub')
138
139    def generate_challenge(self):
140        """
141        Generates a challenge value that the user will need to produce a token.
142        This method is permitted to have side effects, such as transmitting
143        information to the user through some other channel (email or SMS,
144        perhaps). And, of course, some devices may need to commit the
145        challenge to the database.
146
147        :returns: A message to the user. This should be a string that fits
148            comfortably in the template ``'OTP Challenge: {0}'``. This may
149            return ``None`` if this device is not interactive.
150        :rtype: string or ``None``
151
152        :raises: Any :exc:`~exceptions.Exception` is permitted. Callers should
153            trap ``Exception`` and report it to the user.
154        """
155        return None
156    generate_challenge.stub = True
157
158    def verify_is_allowed(self):
159        """
160        Checks whether it is permissible to call :meth:`verify_token`. If it is
161        allowed, returns ``(True, None)``. Otherwise returns ``(False,
162        data_dict)``, where ``data_dict`` contains extra information, defined
163        by the implementation.
164
165        This method can be used to implement throttling or locking, for
166        example. Client code should check this method before calling
167        :meth:`verify_token` and report problems to the user.
168
169        To report specific problems, the data dictionary can return include a
170        ``'reason'`` member with a value from the constants in
171        :class:`VerifyNotAllowed`. Otherwise, an ``'error_message'`` member
172        should be provided with an error message.
173
174        :meth:`verify_token` should also call this method and return False if
175        verification is not allowed.
176
177        :rtype: (bool, dict or ``None``)
178
179        """
180        return (True, None)
181
182    def verify_token(self, token):
183        """
184        Verifies a token. As a rule, the token should no longer be valid if
185        this returns ``True``.
186
187        :param string token: The OTP token provided by the user.
188        :rtype: bool
189        """
190        return False
191
192
193class SideChannelDevice(Device):
194    """
195    Abstract base model for a side-channel :term:`device` attached to a user.
196
197    This model implements token generation, verification and expiration, so the
198    concrete devices only have to implement delivery.
199
200    """
201    token = models.CharField(max_length=16, blank=True, null=True)
202    valid_until = models.DateTimeField(
203        default=timezone.now,
204        help_text="The timestamp of the moment of expiry of the saved token."
205    )
206
207    class Meta:
208        abstract = True
209
210    def generate_token(self, length=6, valid_secs=300, commit=True):
211        """
212        Generates a token of the specified length, then sets it on the model
213        and sets the expiration of the token on the model.
214
215        Pass 'commit=False' to avoid calling self.save().
216
217        :param int length: Number of decimal digits in the generated token.
218        :param int valid_secs: Amount of seconds the token should be valid.
219        :param bool commit: Whether to autosave the generated token.
220
221        """
222        self.token = random_number_token(length)
223        self.valid_until = timezone.now() + timedelta(seconds=valid_secs)
224        if commit:
225            self.save()
226
227    def verify_token(self, token):
228        """
229        Verifies a token by content and expiry.
230
231        On success, the token is cleared and the device saved.
232
233        :param string token: The OTP token provided by the user.
234        :rtype: bool
235
236        """
237        _now = timezone.now()
238
239        if (self.token is not None) and (token == self.token) and (_now < self.valid_until):
240            self.token = None
241            self.valid_until = _now
242            self.save()
243
244            return True
245        else:
246            return False
247
248
249class VerifyNotAllowed:
250    """
251    Constants that may be returned in the ``reason`` member of the extra
252    information dictionary returned by
253    :meth:`~django_otp.models.Device.verify_is_allowed`
254
255    .. data:: N_FAILED_ATTEMPTS
256
257       Indicates that verification is disallowed because of ``n`` successive
258       failed attempts. The data dictionary should include the value of ``n``
259       in member ``failure_count``
260
261    """
262    N_FAILED_ATTEMPTS = 'N_FAILED_ATTEMPTS'
263
264
265class ThrottlingMixin(models.Model):
266    """
267    Mixin class for models that need throttling behaviour. Implements
268    exponential back-off.
269    """
270    # This mixin is not publicly documented, but is used internally to avoid
271    # code duplication. Subclasses must implement get_throttle_factor(), and
272    # must use the verify_is_allowed(), throttle_reset() and
273    # throttle_increment() methods from within their verify_token() method.
274    throttling_failure_timestamp = models.DateTimeField(
275        null=True, blank=True, default=None,
276        help_text="A timestamp of the last failed verification attempt. Null if last attempt succeeded."
277    )
278    throttling_failure_count = models.PositiveIntegerField(
279        default=0, help_text="Number of successive failed attempts."
280    )
281
282    def verify_is_allowed(self):
283        """
284        If verification is allowed, returns ``(True, None)``.
285        Otherwise, returns ``(False, data_dict)``.
286
287        ``data_dict`` contains further information. Currently it can be::
288
289            {'reason': VerifyNotAllowed.N_FAILED_ATTEMPTS,
290             'failure_count': n
291            }
292
293        where ``n`` is the number of successive failures. See
294        :class:`~django_otp.models.VerifyNotAllowed`.
295        """
296        if (self.throttling_enabled and
297                self.throttling_failure_count > 0 and
298                self.throttling_failure_timestamp is not None):
299            now = timezone.now()
300            delay = (now - self.throttling_failure_timestamp).total_seconds()
301            # Required delays should be 1, 2, 4, 8 ...
302            delay_required = self.get_throttle_factor() * (2 ** (self.throttling_failure_count - 1))
303            if delay < delay_required:
304                return (False,
305                        {'reason': VerifyNotAllowed.N_FAILED_ATTEMPTS,
306                         'failure_count': self.throttling_failure_count,
307                         'locked_until': now + timedelta(seconds=delay_required)}
308                        )
309
310        return super().verify_is_allowed()
311
312    def throttle_reset(self, commit=True):
313        """
314        Call this method to reset throttling (normally when a verify attempt
315        succeeded).
316
317        Pass 'commit=False' to avoid calling self.save().
318
319        """
320        self.throttling_failure_timestamp = None
321        self.throttling_failure_count = 0
322        if commit:
323            self.save()
324
325    def throttle_increment(self, commit=True):
326        """
327        Call this method to increase throttling (normally when a verify attempt
328        failed).
329
330        Pass 'commit=False' to avoid calling self.save().
331
332        """
333        self.throttling_failure_timestamp = timezone.now()
334        self.throttling_failure_count += 1
335        if commit:
336            self.save()
337
338    @cached_property
339    def throttling_enabled(self):
340        return self.get_throttle_factor() > 0
341
342    def get_throttle_factor(self):  # pragma: no cover
343        raise NotImplementedError()
344
345    class Meta:
346        abstract = True
347