from datetime import timedelta

from django.apps import apps
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.utils import timezone
from django.utils.functional import cached_property

from .util import random_number_token


class DeviceManager(models.Manager):
    """
    The :class:`~django.db.models.Manager` object installed as
    ``Device.objects``.
    """
    def devices_for_user(self, user, confirmed=None):
        """
        Returns a queryset for all devices of this class that belong to the
        given user.

        :param user: The user.
        :type user: :class:`~django.contrib.auth.models.User`

        :param confirmed: If ``None``, all matching devices are returned.
            Otherwise, this can be any true or false value to limit the query
            to confirmed or unconfirmed devices, respectively.
        """
        devices = self.model.objects.filter(user=user)
        if confirmed is not None:
            devices = devices.filter(confirmed=bool(confirmed))

        return devices


class Device(models.Model):
    """
    Abstract base model for a :term:`device` attached to a user. Plugins must
    subclass this to define their OTP models.

    .. _unsaved_device_warning:

    .. warning::

        OTP devices are inherently stateful. For example, verifying a token is
        logically a mutating operation on the device, which may involve
        incrementing a counter or otherwise consuming a token. A device must be
        committed to the database before it can be used in any way.

    .. attribute:: user

        *ForeignKey*: Foreign key to your user model, as configured by
        :setting:`AUTH_USER_MODEL` (:class:`~django.contrib.auth.models.User`
        by default).

    .. attribute:: name

        *CharField*: A human-readable name to help the user identify their
        devices.

    .. attribute:: confirmed

        *BooleanField*: A boolean value that tells us whether this device has
        been confirmed as valid. It defaults to ``True``, but subclasses or
        individual deployments can force it to ``False`` if they wish to create
        a device and then ask the user for confirmation. As a rule, built-in
        APIs that enumerate devices will only include those that are confirmed.

    .. attribute:: objects

        A :class:`~django_otp.models.DeviceManager`.
    """
    user = models.ForeignKey(
        getattr(settings, 'AUTH_USER_MODEL', 'auth.User'),
        help_text="The user that this device belongs to.",
        on_delete=models.CASCADE,
    )
    name = models.CharField(max_length=64, help_text="The human-readable name of this device.")
    confirmed = models.BooleanField(default=True, help_text="Is this device ready for use?")

    objects = DeviceManager()

    class Meta:
        abstract = True

    def __str__(self):
        # The related user row may be missing; fall back to None rather than
        # letting string conversion raise.
        try:
            user = self.user
        except ObjectDoesNotExist:
            user = None

        return "{0} ({1})".format(self.name, user)

    @property
    def persistent_id(self):
        """
        A string of the form ``"<app_label>.<model_name>/<id>"`` identifying
        this device. It can be passed to :meth:`from_persistent_id` to load
        the device again.
        """
        return '{0}/{1}'.format(self.model_label(), self.id)

    @classmethod
    def model_label(cls):
        """
        Returns an identifier for this Django model class.

        This is just the standard "<app_label>.<model_name>" form.

        """
        return '{0}.{1}'.format(cls._meta.app_label, cls._meta.model_name)

    @classmethod
    def from_persistent_id(cls, persistent_id):
        """
        Loads a device from its persistent id::

            device == Device.from_persistent_id(device.persistent_id)

        :param string persistent_id: A value previously obtained from
            :attr:`persistent_id`.
        :returns: The matching device, or ``None`` if the id is malformed,
            names an unknown model or a model that is not a
            :class:`Device`, or no such row exists.

        """
        device = None

        try:
            model_label, device_id = persistent_id.rsplit('/', 1)
            app_label, model_name = model_label.split('.')

            device_cls = apps.get_model(app_label, model_name)
            # Only ever load Device subclasses, whatever model the id names.
            if issubclass(device_cls, Device):
                device = device_cls.objects.filter(id=int(device_id)).first()
        except (ValueError, LookupError):
            # Malformed id (bad split / non-integer pk) or unknown model.
            pass

        return device

    def is_interactive(self):
        """
        Returns ``True`` if this is an interactive device. The default
        implementation returns ``True`` if
        :meth:`~django_otp.models.Device.generate_challenge` has been
        overridden, but subclasses are welcome to provide smarter
        implementations.

        :rtype: bool
        """
        # The base implementation below is tagged with a ``stub`` attribute;
        # an overriding method will not carry it.
        return not hasattr(self.generate_challenge, 'stub')

    def generate_challenge(self):
        """
        Generates a challenge value that the user will need to produce a token.
        This method is permitted to have side effects, such as transmitting
        information to the user through some other channel (email or SMS,
        perhaps). And, of course, some devices may need to commit the
        challenge to the database.

        :returns: A message to the user. This should be a string that fits
            comfortably in the template ``'OTP Challenge: {0}'``. This may
            return ``None`` if this device is not interactive.
        :rtype: string or ``None``

        :raises: Any :exc:`~exceptions.Exception` is permitted. Callers should
            trap ``Exception`` and report it to the user.
        """
        return None
    generate_challenge.stub = True

    def verify_is_allowed(self):
        """
        Checks whether it is permissible to call :meth:`verify_token`. If it is
        allowed, returns ``(True, None)``. Otherwise returns ``(False,
        data_dict)``, where ``data_dict`` contains extra information, defined
        by the implementation.

        This method can be used to implement throttling or locking, for
        example. Client code should check this method before calling
        :meth:`verify_token` and report problems to the user.

        To report specific problems, the data dictionary can include a
        ``'reason'`` member with a value from the constants in
        :class:`VerifyNotAllowed`. Otherwise, an ``'error_message'`` member
        should be provided with an error message.

        :meth:`verify_token` should also call this method and return False if
        verification is not allowed.

        :rtype: (bool, dict or ``None``)

        """
        return (True, None)

    def verify_token(self, token):
        """
        Verifies a token. As a rule, the token should no longer be valid if
        this returns ``True``.

        :param string token: The OTP token provided by the user.
        :rtype: bool
        """
        return False


class SideChannelDevice(Device):
    """
    Abstract base model for a side-channel :term:`device` attached to a user.

    This model implements token generation, verification and expiration, so the
    concrete devices only have to implement delivery.

    """
    token = models.CharField(max_length=16, blank=True, null=True)
    valid_until = models.DateTimeField(
        default=timezone.now,
        help_text="The timestamp of the moment of expiry of the saved token."
    )

    class Meta:
        abstract = True

    def generate_token(self, length=6, valid_secs=300, commit=True):
        """
        Generates a token of the specified length, then sets it on the model
        and sets the expiration of the token on the model.

        Pass 'commit=False' to avoid calling self.save().

        :param int length: Number of decimal digits in the generated token.
        :param int valid_secs: Amount of seconds the token should be valid.
        :param bool commit: Whether to autosave the generated token.

        """
        self.token = random_number_token(length)
        self.valid_until = timezone.now() + timedelta(seconds=valid_secs)
        if commit:
            self.save()

    def verify_token(self, token):
        """
        Verifies a token by content and expiry.

        On success, the token is cleared and the device saved.

        :param string token: The OTP token provided by the user.
        :rtype: bool

        """
        _now = timezone.now()

        if (self.token is not None) and (token == self.token) and (_now < self.valid_until):
            # Consume the token so that it cannot be used a second time.
            self.token = None
            self.valid_until = _now
            self.save()

            return True
        else:
            return False


class VerifyNotAllowed:
    """
    Constants that may be returned in the ``reason`` member of the extra
    information dictionary returned by
    :meth:`~django_otp.models.Device.verify_is_allowed`

    .. data:: N_FAILED_ATTEMPTS

       Indicates that verification is disallowed because of ``n`` successive
       failed attempts. The data dictionary should include the value of ``n``
       in member ``failure_count``

    """
    N_FAILED_ATTEMPTS = 'N_FAILED_ATTEMPTS'


class ThrottlingMixin(models.Model):
    """
    Mixin class for models that need throttling behaviour. Implements
    exponential back-off.
    """
    # This mixin is not publicly documented, but is used internally to avoid
    # code duplication. Subclasses must implement get_throttle_factor(), and
    # must use the verify_is_allowed(), throttle_reset() and
    # throttle_increment() methods from within their verify_token() method.
    throttling_failure_timestamp = models.DateTimeField(
        null=True, blank=True, default=None,
        help_text="A timestamp of the last failed verification attempt. Null if last attempt succeeded."
    )
    throttling_failure_count = models.PositiveIntegerField(
        default=0, help_text="Number of successive failed attempts."
    )

    def verify_is_allowed(self):
        """
        If verification is allowed, returns ``(True, None)``.
        Otherwise, returns ``(False, data_dict)``.

        ``data_dict`` contains further information. Currently it can be::

            {'reason': VerifyNotAllowed.N_FAILED_ATTEMPTS,
             'failure_count': n,
             'locked_until': t
            }

        where ``n`` is the number of successive failures and ``t`` is the
        moment at which verification becomes allowed again (the time of the
        last failure plus the required delay). See
        :class:`~django_otp.models.VerifyNotAllowed`.
        """
        if (self.throttling_enabled and
                self.throttling_failure_count > 0 and
                self.throttling_failure_timestamp is not None):
            last_failure = self.throttling_failure_timestamp
            now = timezone.now()
            delay = (now - last_failure).total_seconds()
            # Required delays should be 1, 2, 4, 8 ... times the throttle
            # factor, doubling with each successive failure.
            delay_required = self.get_throttle_factor() * (2 ** (self.throttling_failure_count - 1))
            if delay < delay_required:
                # The lock is measured from the last failure, not from the
                # moment of this check; otherwise the reported expiry would
                # slide forward every time the caller asks.
                locked_until = last_failure + timedelta(seconds=delay_required)
                return (False,
                        {'reason': VerifyNotAllowed.N_FAILED_ATTEMPTS,
                         'failure_count': self.throttling_failure_count,
                         'locked_until': locked_until}
                        )

        return super().verify_is_allowed()

    def throttle_reset(self, commit=True):
        """
        Call this method to reset throttling (normally when a verify attempt
        succeeded).

        Pass 'commit=False' to avoid calling self.save().

        """
        self.throttling_failure_timestamp = None
        self.throttling_failure_count = 0
        if commit:
            self.save()

    def throttle_increment(self, commit=True):
        """
        Call this method to increase throttling (normally when a verify attempt
        failed).

        Pass 'commit=False' to avoid calling self.save().

        """
        self.throttling_failure_timestamp = timezone.now()
        self.throttling_failure_count += 1
        if commit:
            self.save()

    @cached_property
    def throttling_enabled(self):
        """
        ``True`` if :meth:`get_throttle_factor` returns a value greater than
        zero. The result is cached on the instance after the first access.
        """
        return self.get_throttle_factor() > 0

    def get_throttle_factor(self):  # pragma: no cover
        """
        Returns the multiplier, in seconds, applied to the exponential
        back-off delay. Must be implemented by subclasses; a value that is
        not greater than zero disables throttling.
        """
        raise NotImplementedError()

    class Meta:
        abstract = True