1from __future__ import unicode_literals 2 3import copy 4import importlib 5import threading 6import uuid 7import warnings 8 9import django 10import six 11from django.apps import apps 12from django.conf import settings 13from django.contrib import admin 14from django.contrib.auth import get_user_model 15from django.core.exceptions import ObjectDoesNotExist 16from django.db import models 17from django.db.models import ManyToManyField, Q 18from django.db.models.fields.proxy import OrderWrt 19from django.forms.models import model_to_dict 20from django.urls import reverse 21from django.utils.text import format_lazy 22from django.utils import timezone 23from simple_history import utils 24from . import exceptions 25from .manager import HistoryDescriptor 26from .signals import post_create_historical_record, pre_create_historical_record 27from .utils import get_change_reason_from_object 28 29if django.VERSION < (2,): 30 from django.utils.translation import ugettext_lazy as _ 31 from django.utils.encoding import smart_text as smart_str 32 from django.utils.encoding import python_2_unicode_compatible 33else: 34 from django.utils.translation import gettext_lazy as _ 35 from django.utils.encoding import smart_str 36 37registered_models = {} 38 39 40def _default_get_user(request, **kwargs): 41 try: 42 return request.user 43 except AttributeError: 44 return None 45 46 47def _history_user_getter(historical_instance): 48 if historical_instance.history_user_id is None: 49 return None 50 User = get_user_model() 51 try: 52 return User.objects.get(pk=historical_instance.history_user_id) 53 except User.DoesNotExist: 54 return None 55 56 57def _history_user_setter(historical_instance, user): 58 if user is not None: 59 historical_instance.history_user_id = user.pk 60 61 62class HistoricalRecords(object): 63 thread = threading.local() 64 65 def __init__( 66 self, 67 verbose_name=None, 68 bases=(models.Model,), 69 user_related_name="+", 70 table_name=None, 71 inherit=False, 72 excluded_fields=None, 73 history_id_field=None, 74 history_change_reason_field=None, 75 user_model=None, 76 get_user=_default_get_user, 77 cascade_delete_history=False, 78 custom_model_name=None, 79 app=None, 80 history_user_id_field=None, 81 history_user_getter=_history_user_getter, 82 history_user_setter=_history_user_setter, 83 related_name=None, 84 use_base_model_db=False, 85 user_db_constraint=True, 86 ): 87 self.user_set_verbose_name = verbose_name 88 self.user_related_name = user_related_name 89 self.user_db_constraint = user_db_constraint 90 self.table_name = table_name 91 self.inherit = inherit 92 self.history_id_field = history_id_field 93 self.history_change_reason_field = history_change_reason_field 94 self.user_model = user_model 95 self.get_user = get_user 96 self.cascade_delete_history = cascade_delete_history 97 self.custom_model_name = custom_model_name 98 self.app = app 99 self.user_id_field = history_user_id_field 100 self.user_getter = history_user_getter 101 self.user_setter = history_user_setter 102 self.related_name = related_name 103 self.use_base_model_db = use_base_model_db 104 105 if excluded_fields is None: 106 excluded_fields = [] 107 self.excluded_fields = excluded_fields 108 try: 109 if isinstance(bases, six.string_types): 110 raise TypeError 111 self.bases = (HistoricalChanges,) + tuple(bases) 112 except TypeError: 113 raise TypeError("The `bases` option must be a list or a tuple.") 114 115 def contribute_to_class(self, cls, name): 116 self.manager_name = name 117 self.module = cls.__module__ 118 self.cls = cls 119 models.signals.class_prepared.connect(self.finalize, weak=False) 120 self.add_extra_methods(cls) 121 122 if cls._meta.abstract and not self.inherit: 123 msg = ( 124 "HistoricalRecords added to abstract model ({}) without " 125 "inherit=True".format(self.cls.__name__) 126 ) 127 warnings.warn(msg, UserWarning) 128 129 def add_extra_methods(self, cls): 130 def save_without_historical_record(self, *args, **kwargs): 131 """ 132 Save model without saving a historical record 133 134 Make sure you know what you're doing before you use this method. 135 """ 136 self.skip_history_when_saving = True 137 try: 138 ret = self.save(*args, **kwargs) 139 finally: 140 del self.skip_history_when_saving 141 return ret 142 143 setattr(cls, "save_without_historical_record", save_without_historical_record) 144 145 def finalize(self, sender, **kwargs): 146 inherited = False 147 if self.cls is not sender: # set in concrete 148 inherited = self.inherit and issubclass(sender, self.cls) 149 if not inherited: 150 return # set in abstract 151 152 if hasattr(sender._meta, "simple_history_manager_attribute"): 153 raise exceptions.MultipleRegistrationsError( 154 "{}.{} registered multiple times for history tracking.".format( 155 sender._meta.app_label, sender._meta.object_name 156 ) 157 ) 158 history_model = self.create_history_model(sender, inherited) 159 if inherited: 160 # Make sure history model is in same module as concrete model 161 module = importlib.import_module(history_model.__module__) 162 else: 163 module = importlib.import_module(self.module) 164 setattr(module, history_model.__name__, history_model) 165 166 # The HistoricalRecords object will be discarded, 167 # so the signal handlers can't use weak references. 168 models.signals.post_save.connect(self.post_save, sender=sender, weak=False) 169 models.signals.post_delete.connect(self.post_delete, sender=sender, weak=False) 170 171 descriptor = HistoryDescriptor(history_model) 172 setattr(sender, self.manager_name, descriptor) 173 sender._meta.simple_history_manager_attribute = self.manager_name 174 175 def get_history_model_name(self, model): 176 if not self.custom_model_name: 177 return "Historical{}".format(model._meta.object_name) 178 # Must be trying to use a custom history model name 179 if callable(self.custom_model_name): 180 name = self.custom_model_name(model._meta.object_name) 181 else: 182 # simple string 183 name = self.custom_model_name 184 # Desired class name cannot be same as the model it is tracking 185 if not ( 186 name.lower() == model._meta.object_name.lower() 187 and model.__module__ == self.module 188 ): 189 return name 190 raise ValueError( 191 "The 'custom_model_name' option '{}' evaluates to a name that is the same " 192 "as the model it is tracking. This is not permitted.".format( 193 self.custom_model_name 194 ) 195 ) 196 197 def create_history_model(self, model, inherited): 198 """ 199 Creates a historical model to associate with the model provided. 200 """ 201 attrs = { 202 "__module__": self.module, 203 "_history_excluded_fields": self.excluded_fields, 204 } 205 206 app_module = "%s.models" % model._meta.app_label 207 208 if inherited: 209 # inherited use models module 210 attrs["__module__"] = model.__module__ 211 elif model.__module__ != self.module: 212 # registered under different app 213 attrs["__module__"] = self.module 214 elif app_module != self.module: 215 # Abuse an internal API because the app registry is loading. 216 app = apps.app_configs[model._meta.app_label] 217 models_module = app.name 218 attrs["__module__"] = models_module 219 220 fields = self.copy_fields(model) 221 attrs.update(fields) 222 attrs.update(self.get_extra_fields(model, fields)) 223 # type in python2 wants str as a first argument 224 attrs.update(Meta=type(str("Meta"), (), self.get_meta_options(model))) 225 if self.table_name is not None: 226 attrs["Meta"].db_table = self.table_name 227 228 # Set as the default then check for overrides 229 name = self.get_history_model_name(model) 230 231 registered_models[model._meta.db_table] = model 232 history_model = type(str(name), self.bases, attrs) 233 return ( 234 python_2_unicode_compatible(history_model) 235 if django.VERSION < (2,) 236 else history_model 237 ) 238 239 def fields_included(self, model): 240 fields = [] 241 for field in model._meta.fields: 242 if field.name not in self.excluded_fields: 243 fields.append(field) 244 return fields 245 246 def copy_fields(self, model): 247 """ 248 Creates copies of the model's original fields, returning 249 a dictionary mapping field name to copied field object. 250 """ 251 fields = {} 252 for field in self.fields_included(model): 253 field = copy.copy(field) 254 field.remote_field = copy.copy(field.remote_field) 255 if isinstance(field, OrderWrt): 256 # OrderWrt is a proxy field, switch to a plain IntegerField 257 field.__class__ = models.IntegerField 258 if isinstance(field, models.ForeignKey): 259 old_field = field 260 old_swappable = old_field.swappable 261 old_field.swappable = False 262 try: 263 _name, _path, args, field_args = old_field.deconstruct() 264 finally: 265 old_field.swappable = old_swappable 266 if getattr(old_field, "one_to_one", False) or isinstance( 267 old_field, models.OneToOneField 268 ): 269 FieldType = models.ForeignKey 270 else: 271 FieldType = type(old_field) 272 273 # If field_args['to'] is 'self' then we have a case where the object 274 # has a foreign key to itself. If we pass the historical record's 275 # field to = 'self', the foreign key will point to an historical 276 # record rather than the base record. We can use old_field.model here. 277 if field_args.get("to", None) == "self": 278 field_args["to"] = old_field.model 279 280 # Override certain arguments passed when creating the field 281 # so that they work for the historical field. 282 field_args.update( 283 db_constraint=False, 284 related_name="+", 285 null=True, 286 blank=True, 287 primary_key=False, 288 db_index=True, 289 serialize=True, 290 unique=False, 291 on_delete=models.DO_NOTHING, 292 ) 293 field = FieldType(*args, **field_args) 294 field.name = old_field.name 295 else: 296 transform_field(field) 297 fields[field.name] = field 298 return fields 299 300 def _get_history_change_reason_field(self): 301 if self.history_change_reason_field: 302 # User specific field from init 303 history_change_reason_field = self.history_change_reason_field 304 elif getattr( 305 settings, "SIMPLE_HISTORY_HISTORY_CHANGE_REASON_USE_TEXT_FIELD", False 306 ): 307 # Use text field with no max length, not enforced by DB anyways 308 history_change_reason_field = models.TextField(null=True) 309 else: 310 # Current default, with max length 311 history_change_reason_field = models.CharField(max_length=100, null=True) 312 313 return history_change_reason_field 314 315 def _get_history_id_field(self): 316 if self.history_id_field: 317 history_id_field = self.history_id_field 318 history_id_field.primary_key = True 319 history_id_field.editable = False 320 elif getattr(settings, "SIMPLE_HISTORY_HISTORY_ID_USE_UUID", False): 321 history_id_field = models.UUIDField( 322 primary_key=True, default=uuid.uuid4, editable=False 323 ) 324 else: 325 history_id_field = models.AutoField(primary_key=True) 326 327 return history_id_field 328 329 def _get_history_user_fields(self): 330 if self.user_id_field is not None: 331 # Tracking user using explicit id rather than Django ForeignKey 332 history_user_fields = { 333 "history_user": property(self.user_getter, self.user_setter), 334 "history_user_id": self.user_id_field, 335 } 336 else: 337 user_model = self.user_model or getattr( 338 settings, "AUTH_USER_MODEL", "auth.User" 339 ) 340 341 history_user_fields = { 342 "history_user": models.ForeignKey( 343 user_model, 344 null=True, 345 related_name=self.user_related_name, 346 on_delete=models.SET_NULL, 347 db_constraint=self.user_db_constraint, 348 ) 349 } 350 351 return history_user_fields 352 353 def _get_history_related_field(self, model): 354 if self.related_name: 355 if self.manager_name == self.related_name: 356 raise exceptions.RelatedNameConflictError( 357 "The related name must not be called like the history manager." 358 ) 359 return { 360 "history_relation": models.ForeignKey( 361 model, 362 on_delete=models.DO_NOTHING, 363 related_name=self.related_name, 364 db_constraint=False, 365 ) 366 } 367 else: 368 return {} 369 370 def get_extra_fields(self, model, fields): 371 """Return dict of extra fields added to the historical record model""" 372 373 def revert_url(self): 374 """URL for this change in the default admin site.""" 375 opts = model._meta 376 app_label, model_name = opts.app_label, opts.model_name 377 return reverse( 378 "%s:%s_%s_simple_history" % (admin.site.name, app_label, model_name), 379 args=[getattr(self, opts.pk.attname), self.history_id], 380 ) 381 382 def get_instance(self): 383 attrs = { 384 field.attname: getattr(self, field.attname) for field in fields.values() 385 } 386 if self._history_excluded_fields: 387 # We don't add ManyToManyFields to this list because they may cause 388 # the subsequent `.get()` call to fail. See #706 for context. 389 excluded_attnames = [ 390 model._meta.get_field(field).attname 391 for field in self._history_excluded_fields 392 if not isinstance(model._meta.get_field(field), ManyToManyField) 393 ] 394 try: 395 values = ( 396 model.objects.filter(pk=getattr(self, model._meta.pk.attname)) 397 .values(*excluded_attnames) 398 .get() 399 ) 400 except ObjectDoesNotExist: 401 pass 402 else: 403 attrs.update(values) 404 return model(**attrs) 405 406 def get_next_record(self): 407 """ 408 Get the next history record for the instance. `None` if last. 409 """ 410 history = utils.get_history_manager_for_model(self.instance) 411 return ( 412 history.filter(Q(history_date__gt=self.history_date)) 413 .order_by("history_date") 414 .first() 415 ) 416 417 def get_prev_record(self): 418 """ 419 Get the previous history record for the instance. `None` if first. 420 """ 421 history = utils.get_history_manager_for_model(self.instance) 422 return ( 423 history.filter(Q(history_date__lt=self.history_date)) 424 .order_by("history_date") 425 .last() 426 ) 427 428 def get_default_history_user(instance): 429 """ 430 Returns the user specified by `get_user` method for manually creating 431 historical objects 432 """ 433 return self.get_history_user(instance) 434 435 extra_fields = { 436 "history_id": self._get_history_id_field(), 437 "history_date": models.DateTimeField(), 438 "history_change_reason": self._get_history_change_reason_field(), 439 "history_type": models.CharField( 440 max_length=1, 441 choices=(("+", _("Created")), ("~", _("Changed")), ("-", _("Deleted"))), 442 ), 443 "history_object": HistoricalObjectDescriptor( 444 model, self.fields_included(model) 445 ), 446 "instance": property(get_instance), 447 "instance_type": model, 448 "next_record": property(get_next_record), 449 "prev_record": property(get_prev_record), 450 "revert_url": revert_url, 451 "__str__": lambda self: "{} as of {}".format( 452 self.history_object, self.history_date 453 ), 454 "get_default_history_user": staticmethod(get_default_history_user), 455 } 456 457 extra_fields.update(self._get_history_related_field(model)) 458 extra_fields.update(self._get_history_user_fields()) 459 460 return extra_fields 461 462 def get_meta_options(self, model): 463 """ 464 Returns a dictionary of fields that will be added to 465 the Meta inner class of the historical record model. 466 """ 467 meta_fields = { 468 "ordering": ("-history_date", "-history_id"), 469 "get_latest_by": "history_date", 470 } 471 if self.user_set_verbose_name: 472 name = self.user_set_verbose_name 473 else: 474 name = format_lazy("historical {}", smart_str(model._meta.verbose_name)) 475 meta_fields["verbose_name"] = name 476 if self.app: 477 meta_fields["app_label"] = self.app 478 return meta_fields 479 480 def post_save(self, instance, created, using=None, **kwargs): 481 if not created and hasattr(instance, "skip_history_when_saving"): 482 return 483 if not kwargs.get("raw", False): 484 self.create_historical_record(instance, created and "+" or "~", using=using) 485 486 def post_delete(self, instance, using=None, **kwargs): 487 if self.cascade_delete_history: 488 manager = getattr(instance, self.manager_name) 489 manager.using(using).all().delete() 490 else: 491 self.create_historical_record(instance, "-", using=using) 492 493 def create_historical_record(self, instance, history_type, using=None): 494 using = using if self.use_base_model_db else None 495 history_date = getattr(instance, "_history_date", timezone.now()) 496 history_user = self.get_history_user(instance) 497 history_change_reason = get_change_reason_from_object(instance) 498 manager = getattr(instance, self.manager_name) 499 500 attrs = {} 501 for field in self.fields_included(instance): 502 attrs[field.attname] = getattr(instance, field.attname) 503 504 relation_field = getattr(manager.model, "history_relation", None) 505 if relation_field is not None: 506 attrs["history_relation"] = instance 507 508 history_instance = manager.model( 509 history_date=history_date, 510 history_type=history_type, 511 history_user=history_user, 512 history_change_reason=history_change_reason, 513 **attrs 514 ) 515 516 pre_create_historical_record.send( 517 sender=manager.model, 518 instance=instance, 519 history_date=history_date, 520 history_user=history_user, 521 history_change_reason=history_change_reason, 522 history_instance=history_instance, 523 using=using, 524 ) 525 526 history_instance.save(using=using) 527 528 post_create_historical_record.send( 529 sender=manager.model, 530 instance=instance, 531 history_instance=history_instance, 532 history_date=history_date, 533 history_user=history_user, 534 history_change_reason=history_change_reason, 535 using=using, 536 ) 537 538 def get_history_user(self, instance): 539 """Get the modifying user from instance or middleware.""" 540 try: 541 return instance._history_user 542 except AttributeError: 543 request = None 544 try: 545 if self.thread.request.user.is_authenticated: 546 request = self.thread.request 547 except AttributeError: 548 pass 549 550 return self.get_user(instance=instance, request=request) 551 552 553def transform_field(field): 554 """Customize field appropriately for use in historical model""" 555 field.name = field.attname 556 if isinstance(field, models.BigAutoField): 557 field.__class__ = models.BigIntegerField 558 elif isinstance(field, models.AutoField): 559 field.__class__ = models.IntegerField 560 561 elif isinstance(field, models.FileField): 562 # Don't copy file, just path. 563 if getattr(settings, "SIMPLE_HISTORY_FILEFIELD_TO_CHARFIELD", False): 564 field.__class__ = models.CharField 565 else: 566 field.__class__ = models.TextField 567 568 # Historical instance shouldn't change create/update timestamps 569 field.auto_now = False 570 field.auto_now_add = False 571 572 if field.primary_key or field.unique: 573 # Unique fields can no longer be guaranteed unique, 574 # but they should still be indexed for faster lookups. 575 field.primary_key = False 576 field._unique = False 577 field.db_index = True 578 field.serialize = True 579 580 581class HistoricalObjectDescriptor(object): 582 def __init__(self, model, fields_included): 583 self.model = model 584 self.fields_included = fields_included 585 586 def __get__(self, instance, owner): 587 values = {f.attname: getattr(instance, f.attname) for f in self.fields_included} 588 return self.model(**values) 589 590 591class HistoricalChanges(object): 592 def diff_against(self, old_history, excluded_fields=None): 593 if not isinstance(old_history, type(self)): 594 raise TypeError( 595 ("unsupported type(s) for diffing: " "'{}' and '{}'").format( 596 type(self), type(old_history) 597 ) 598 ) 599 if excluded_fields is None: 600 excluded_fields = [] 601 changes = [] 602 changed_fields = [] 603 old_values = model_to_dict(old_history.instance) 604 current_values = model_to_dict(self.instance) 605 for field, new_value in current_values.items(): 606 if field in excluded_fields: 607 continue 608 if field in old_values: 609 old_value = old_values[field] 610 if old_value != new_value: 611 change = ModelChange(field, old_value, new_value) 612 changes.append(change) 613 changed_fields.append(field) 614 615 return ModelDelta(changes, changed_fields, old_history, self) 616 617 618class ModelChange(object): 619 def __init__(self, field_name, old_value, new_value): 620 self.field = field_name 621 self.old = old_value 622 self.new = new_value 623 624 625class ModelDelta(object): 626 def __init__(self, changes, changed_fields, old_record, new_record): 627 self.changes = changes 628 self.changed_fields = changed_fields 629 self.old_record = old_record 630 self.new_record = new_record 631