1from __future__ import unicode_literals
3import copy
4import importlib
5import threading
6import uuid
7import warnings
9import django
10import six
11from django.apps import apps
12from django.conf import settings
13from django.contrib import admin
14from django.contrib.auth import get_user_model
15from django.core.exceptions import ObjectDoesNotExist
16from django.db import models
17from django.db.models import ManyToManyField, Q
18from django.db.models.fields.proxy import OrderWrt
19from django.forms.models import model_to_dict
20from django.urls import reverse
21from django.utils.text import format_lazy
22from django.utils import timezone
23from simple_history import utils
24from . import exceptions
25from .manager import HistoryDescriptor
26from .signals import post_create_historical_record, pre_create_historical_record
27from .utils import get_change_reason_from_object
29if django.VERSION < (2,):
30    from django.utils.translation import ugettext_lazy as _
31    from django.utils.encoding import smart_text as smart_str
32    from django.utils.encoding import python_2_unicode_compatible
34    from django.utils.translation import gettext_lazy as _
35    from django.utils.encoding import smart_str
37registered_models = {}
40def _default_get_user(request, **kwargs):
41    try:
42        return request.user
43    except AttributeError:
44        return None
47def _history_user_getter(historical_instance):
48    if historical_instance.history_user_id is None:
49        return None
50    User = get_user_model()
51    try:
52        return User.objects.get(pk=historical_instance.history_user_id)
53    except User.DoesNotExist:
54        return None
57def _history_user_setter(historical_instance, user):
58    if user is not None:
59        historical_instance.history_user_id = user.pk
62class HistoricalRecords(object):
63    thread = threading.local()
65    def __init__(
66        self,
67        verbose_name=None,
68        bases=(models.Model,),
69        user_related_name="+",
70        table_name=None,
71        inherit=False,
72        excluded_fields=None,
73        history_id_field=None,
74        history_change_reason_field=None,
75        user_model=None,
76        get_user=_default_get_user,
77        cascade_delete_history=False,
78        custom_model_name=None,
79        app=None,
80        history_user_id_field=None,
81        history_user_getter=_history_user_getter,
82        history_user_setter=_history_user_setter,
83        related_name=None,
84        use_base_model_db=False,
85        user_db_constraint=True,
86    ):
87        self.user_set_verbose_name = verbose_name
88        self.user_related_name = user_related_name
89        self.user_db_constraint = user_db_constraint
90        self.table_name = table_name
91        self.inherit = inherit
92        self.history_id_field = history_id_field
93        self.history_change_reason_field = history_change_reason_field
94        self.user_model = user_model
95        self.get_user = get_user
96        self.cascade_delete_history = cascade_delete_history
97        self.custom_model_name = custom_model_name
98        self.app = app
99        self.user_id_field = history_user_id_field
100        self.user_getter = history_user_getter
101        self.user_setter = history_user_setter
102        self.related_name = related_name
103        self.use_base_model_db = use_base_model_db
105        if excluded_fields is None:
106            excluded_fields = []
107        self.excluded_fields = excluded_fields
108        try:
109            if isinstance(bases, six.string_types):
110                raise TypeError
111            self.bases = (HistoricalChanges,) + tuple(bases)
112        except TypeError:
113            raise TypeError("The `bases` option must be a list or a tuple.")
115    def contribute_to_class(self, cls, name):
116        self.manager_name = name
117        self.module = cls.__module__
118        self.cls = cls
119        models.signals.class_prepared.connect(self.finalize, weak=False)
120        self.add_extra_methods(cls)
122        if cls._meta.abstract and not self.inherit:
123            msg = (
124                "HistoricalRecords added to abstract model ({}) without "
125                "inherit=True".format(self.cls.__name__)
126            )
127            warnings.warn(msg, UserWarning)
129    def add_extra_methods(self, cls):
130        def save_without_historical_record(self, *args, **kwargs):
131            """
132            Save model without saving a historical record
134            Make sure you know what you're doing before you use this method.
135            """
136            self.skip_history_when_saving = True
137            try:
138                ret = self.save(*args, **kwargs)
139            finally:
140                del self.skip_history_when_saving
141            return ret
143        setattr(cls, "save_without_historical_record", save_without_historical_record)
145    def finalize(self, sender, **kwargs):
146        inherited = False
147        if self.cls is not sender:  # set in concrete
148            inherited = self.inherit and issubclass(sender, self.cls)
149            if not inherited:
150                return  # set in abstract
152        if hasattr(sender._meta, "simple_history_manager_attribute"):
153            raise exceptions.MultipleRegistrationsError(
154                "{}.{} registered multiple times for history tracking.".format(
155                    sender._meta.app_label, sender._meta.object_name
156                )
157            )
158        history_model = self.create_history_model(sender, inherited)
159        if inherited:
160            # Make sure history model is in same module as concrete model
161            module = importlib.import_module(history_model.__module__)
162        else:
163            module = importlib.import_module(self.module)
164        setattr(module, history_model.__name__, history_model)
166        # The HistoricalRecords object will be discarded,
167        # so the signal handlers can't use weak references.
168        models.signals.post_save.connect(self.post_save, sender=sender, weak=False)
169        models.signals.post_delete.connect(self.post_delete, sender=sender, weak=False)
171        descriptor = HistoryDescriptor(history_model)
172        setattr(sender, self.manager_name, descriptor)
173        sender._meta.simple_history_manager_attribute = self.manager_name
175    def get_history_model_name(self, model):
176        if not self.custom_model_name:
177            return "Historical{}".format(model._meta.object_name)
178        # Must be trying to use a custom history model name
179        if callable(self.custom_model_name):
180            name = self.custom_model_name(model._meta.object_name)
181        else:
182            #  simple string
183            name = self.custom_model_name
184        # Desired class name cannot be same as the model it is tracking
185        if not (
186            name.lower() == model._meta.object_name.lower()
187            and model.__module__ == self.module
188        ):
189            return name
190        raise ValueError(
191            "The 'custom_model_name' option '{}' evaluates to a name that is the same "
192            "as the model it is tracking. This is not permitted.".format(
193                self.custom_model_name
194            )
195        )
197    def create_history_model(self, model, inherited):
198        """
199        Creates a historical model to associate with the model provided.
200        """
201        attrs = {
202            "__module__": self.module,
203            "_history_excluded_fields": self.excluded_fields,
204        }
206        app_module = "%s.models" % model._meta.app_label
208        if inherited:
209            # inherited use models module
210            attrs["__module__"] = model.__module__
211        elif model.__module__ != self.module:
212            # registered under different app
213            attrs["__module__"] = self.module
214        elif app_module != self.module:
215            # Abuse an internal API because the app registry is loading.
216            app = apps.app_configs[model._meta.app_label]
217            models_module = app.name
218            attrs["__module__"] = models_module
220        fields = self.copy_fields(model)
221        attrs.update(fields)
222        attrs.update(self.get_extra_fields(model, fields))
223        # type in python2 wants str as a first argument
224        attrs.update(Meta=type(str("Meta"), (), self.get_meta_options(model)))
225        if self.table_name is not None:
226            attrs["Meta"].db_table = self.table_name
228        # Set as the default then check for overrides
229        name = self.get_history_model_name(model)
231        registered_models[model._meta.db_table] = model
232        history_model = type(str(name), self.bases, attrs)
233        return (
234            python_2_unicode_compatible(history_model)
235            if django.VERSION < (2,)
236            else history_model
237        )
239    def fields_included(self, model):
240        fields = []
241        for field in model._meta.fields:
242            if field.name not in self.excluded_fields:
243                fields.append(field)
244        return fields
246    def copy_fields(self, model):
247        """
248        Creates copies of the model's original fields, returning
249        a dictionary mapping field name to copied field object.
250        """
251        fields = {}
252        for field in self.fields_included(model):
253            field = copy.copy(field)
254            field.remote_field = copy.copy(field.remote_field)
255            if isinstance(field, OrderWrt):
256                # OrderWrt is a proxy field, switch to a plain IntegerField
257                field.__class__ = models.IntegerField
258            if isinstance(field, models.ForeignKey):
259                old_field = field
260                old_swappable = old_field.swappable
261                old_field.swappable = False
262                try:
263                    _name, _path, args, field_args = old_field.deconstruct()
264                finally:
265                    old_field.swappable = old_swappable
266                if getattr(old_field, "one_to_one", False) or isinstance(
267                    old_field, models.OneToOneField
268                ):
269                    FieldType = models.ForeignKey
270                else:
271                    FieldType = type(old_field)
273                # If field_args['to'] is 'self' then we have a case where the object
274                # has a foreign key to itself. If we pass the historical record's
275                # field to = 'self', the foreign key will point to an historical
276                # record rather than the base record. We can use old_field.model here.
277                if field_args.get("to", None) == "self":
278                    field_args["to"] = old_field.model
280                # Override certain arguments passed when creating the field
281                # so that they work for the historical field.
282                field_args.update(
283                    db_constraint=False,
284                    related_name="+",
285                    null=True,
286                    blank=True,
287                    primary_key=False,
288                    db_index=True,
289                    serialize=True,
290                    unique=False,
291                    on_delete=models.DO_NOTHING,
292                )
293                field = FieldType(*args, **field_args)
294                field.name = old_field.name
295            else:
296                transform_field(field)
297            fields[field.name] = field
298        return fields
300    def _get_history_change_reason_field(self):
301        if self.history_change_reason_field:
302            # User specific field from init
303            history_change_reason_field = self.history_change_reason_field
304        elif getattr(
306        ):
307            # Use text field with no max length, not enforced by DB anyways
308            history_change_reason_field = models.TextField(null=True)
309        else:
310            # Current default, with max length
311            history_change_reason_field = models.CharField(max_length=100, null=True)
313        return history_change_reason_field
315    def _get_history_id_field(self):
316        if self.history_id_field:
317            history_id_field = self.history_id_field
318            history_id_field.primary_key = True
319            history_id_field.editable = False
320        elif getattr(settings, "SIMPLE_HISTORY_HISTORY_ID_USE_UUID", False):
321            history_id_field = models.UUIDField(
322                primary_key=True, default=uuid.uuid4, editable=False
323            )
324        else:
325            history_id_field = models.AutoField(primary_key=True)
327        return history_id_field
329    def _get_history_user_fields(self):
330        if self.user_id_field is not None:
331            # Tracking user using explicit id rather than Django ForeignKey
332            history_user_fields = {
333                "history_user": property(self.user_getter, self.user_setter),
334                "history_user_id": self.user_id_field,
335            }
336        else:
337            user_model = self.user_model or getattr(
338                settings, "AUTH_USER_MODEL", "auth.User"
339            )
341            history_user_fields = {
342                "history_user": models.ForeignKey(
343                    user_model,
344                    null=True,
345                    related_name=self.user_related_name,
346                    on_delete=models.SET_NULL,
347                    db_constraint=self.user_db_constraint,
348                )
349            }
351        return history_user_fields
353    def _get_history_related_field(self, model):
354        if self.related_name:
355            if self.manager_name == self.related_name:
356                raise exceptions.RelatedNameConflictError(
357                    "The related name must not be called like the history manager."
358                )
359            return {
360                "history_relation": models.ForeignKey(
361                    model,
362                    on_delete=models.DO_NOTHING,
363                    related_name=self.related_name,
364                    db_constraint=False,
365                )
366            }
367        else:
368            return {}
370    def get_extra_fields(self, model, fields):
371        """Return dict of extra fields added to the historical record model"""
373        def revert_url(self):
374            """URL for this change in the default admin site."""
375            opts = model._meta
376            app_label, model_name = opts.app_label, opts.model_name
377            return reverse(
378                "%s:%s_%s_simple_history" % (admin.site.name, app_label, model_name),
379                args=[getattr(self, opts.pk.attname), self.history_id],
380            )
382        def get_instance(self):
383            attrs = {
384                field.attname: getattr(self, field.attname) for field in fields.values()
385            }
386            if self._history_excluded_fields:
387                # We don't add ManyToManyFields to this list because they may cause
388                # the subsequent `.get()` call to fail. See #706 for context.
389                excluded_attnames = [
390                    model._meta.get_field(field).attname
391                    for field in self._history_excluded_fields
392                    if not isinstance(model._meta.get_field(field), ManyToManyField)
393                ]
394                try:
395                    values = (
396                        model.objects.filter(pk=getattr(self, model._meta.pk.attname))
397                        .values(*excluded_attnames)
398                        .get()
399                    )
400                except ObjectDoesNotExist:
401                    pass
402                else:
403                    attrs.update(values)
404            return model(**attrs)
406        def get_next_record(self):
407            """
408            Get the next history record for the instance. `None` if last.
409            """
410            history = utils.get_history_manager_for_model(self.instance)
411            return (
412                history.filter(Q(history_date__gt=self.history_date))
413                .order_by("history_date")
414                .first()
415            )
417        def get_prev_record(self):
418            """
419            Get the previous history record for the instance. `None` if first.
420            """
421            history = utils.get_history_manager_for_model(self.instance)
422            return (
423                history.filter(Q(history_date__lt=self.history_date))
424                .order_by("history_date")
425                .last()
426            )
428        def get_default_history_user(instance):
429            """
430            Returns the user specified by `get_user` method for manually creating
431            historical objects
432            """
433            return self.get_history_user(instance)
435        extra_fields = {
436            "history_id": self._get_history_id_field(),
437            "history_date": models.DateTimeField(),
438            "history_change_reason": self._get_history_change_reason_field(),
439            "history_type": models.CharField(
440                max_length=1,
441                choices=(("+", _("Created")), ("~", _("Changed")), ("-", _("Deleted"))),
442            ),
443            "history_object": HistoricalObjectDescriptor(
444                model, self.fields_included(model)
445            ),
446            "instance": property(get_instance),
447            "instance_type": model,
448            "next_record": property(get_next_record),
449            "prev_record": property(get_prev_record),
450            "revert_url": revert_url,
451            "__str__": lambda self: "{} as of {}".format(
452                self.history_object, self.history_date
453            ),
454            "get_default_history_user": staticmethod(get_default_history_user),
455        }
457        extra_fields.update(self._get_history_related_field(model))
458        extra_fields.update(self._get_history_user_fields())
460        return extra_fields
462    def get_meta_options(self, model):
463        """
464        Returns a dictionary of fields that will be added to
465        the Meta inner class of the historical record model.
466        """
467        meta_fields = {
468            "ordering": ("-history_date", "-history_id"),
469            "get_latest_by": "history_date",
470        }
471        if self.user_set_verbose_name:
472            name = self.user_set_verbose_name
473        else:
474            name = format_lazy("historical {}", smart_str(model._meta.verbose_name))
475        meta_fields["verbose_name"] = name
476        if self.app:
477            meta_fields["app_label"] = self.app
478        return meta_fields
480    def post_save(self, instance, created, using=None, **kwargs):
481        if not created and hasattr(instance, "skip_history_when_saving"):
482            return
483        if not kwargs.get("raw", False):
484            self.create_historical_record(instance, created and "+" or "~", using=using)
486    def post_delete(self, instance, using=None, **kwargs):
487        if self.cascade_delete_history:
488            manager = getattr(instance, self.manager_name)
489            manager.using(using).all().delete()
490        else:
491            self.create_historical_record(instance, "-", using=using)
493    def create_historical_record(self, instance, history_type, using=None):
494        using = using if self.use_base_model_db else None
495        history_date = getattr(instance, "_history_date", timezone.now())
496        history_user = self.get_history_user(instance)
497        history_change_reason = get_change_reason_from_object(instance)
498        manager = getattr(instance, self.manager_name)
500        attrs = {}
501        for field in self.fields_included(instance):
502            attrs[field.attname] = getattr(instance, field.attname)
504        relation_field = getattr(manager.model, "history_relation", None)
505        if relation_field is not None:
506            attrs["history_relation"] = instance
508        history_instance = manager.model(
509            history_date=history_date,
510            history_type=history_type,
511            history_user=history_user,
512            history_change_reason=history_change_reason,
513            **attrs
514        )
516        pre_create_historical_record.send(
517            sender=manager.model,
518            instance=instance,
519            history_date=history_date,
520            history_user=history_user,
521            history_change_reason=history_change_reason,
522            history_instance=history_instance,
523            using=using,
524        )
526        history_instance.save(using=using)
528        post_create_historical_record.send(
529            sender=manager.model,
530            instance=instance,
531            history_instance=history_instance,
532            history_date=history_date,
533            history_user=history_user,
534            history_change_reason=history_change_reason,
535            using=using,
536        )
538    def get_history_user(self, instance):
539        """Get the modifying user from instance or middleware."""
540        try:
541            return instance._history_user
542        except AttributeError:
543            request = None
544            try:
545                if self.thread.request.user.is_authenticated:
546                    request = self.thread.request
547            except AttributeError:
548                pass
550        return self.get_user(instance=instance, request=request)
553def transform_field(field):
554    """Customize field appropriately for use in historical model"""
555    field.name = field.attname
556    if isinstance(field, models.BigAutoField):
557        field.__class__ = models.BigIntegerField
558    elif isinstance(field, models.AutoField):
559        field.__class__ = models.IntegerField
561    elif isinstance(field, models.FileField):
562        # Don't copy file, just path.
563        if getattr(settings, "SIMPLE_HISTORY_FILEFIELD_TO_CHARFIELD", False):
564            field.__class__ = models.CharField
565        else:
566            field.__class__ = models.TextField
568    # Historical instance shouldn't change create/update timestamps
569    field.auto_now = False
570    field.auto_now_add = False
572    if field.primary_key or field.unique:
573        # Unique fields can no longer be guaranteed unique,
574        # but they should still be indexed for faster lookups.
575        field.primary_key = False
576        field._unique = False
577        field.db_index = True
578        field.serialize = True
581class HistoricalObjectDescriptor(object):
582    def __init__(self, model, fields_included):
583        self.model = model
584        self.fields_included = fields_included
586    def __get__(self, instance, owner):
587        values = {f.attname: getattr(instance, f.attname) for f in self.fields_included}
588        return self.model(**values)
591class HistoricalChanges(object):
592    def diff_against(self, old_history, excluded_fields=None):
593        if not isinstance(old_history, type(self)):
594            raise TypeError(
595                ("unsupported type(s) for diffing: " "'{}' and '{}'").format(
596                    type(self), type(old_history)
597                )
598            )
599        if excluded_fields is None:
600            excluded_fields = []
601        changes = []
602        changed_fields = []
603        old_values = model_to_dict(old_history.instance)
604        current_values = model_to_dict(self.instance)
605        for field, new_value in current_values.items():
606            if field in excluded_fields:
607                continue
608            if field in old_values:
609                old_value = old_values[field]
610                if old_value != new_value:
611                    change = ModelChange(field, old_value, new_value)
612                    changes.append(change)
613                    changed_fields.append(field)
615        return ModelDelta(changes, changed_fields, old_history, self)
618class ModelChange(object):
619    def __init__(self, field_name, old_value, new_value):
620        self.field = field_name
621        self.old = old_value
622        self.new = new_value
625class ModelDelta(object):
626    def __init__(self, changes, changed_fields, old_record, new_record):
627        self.changes = changes
628        self.changed_fields = changed_fields
629        self.old_record = old_record
630        self.new_record = new_record