1from __future__ import unicode_literals
2
3import copy
4import importlib
5import threading
6import uuid
7import warnings
8
9import django
10import six
11from django.apps import apps
12from django.conf import settings
13from django.contrib import admin
14from django.contrib.auth import get_user_model
15from django.core.exceptions import ObjectDoesNotExist
16from django.db import models
17from django.db.models import ManyToManyField, Q
18from django.db.models.fields.proxy import OrderWrt
19from django.forms.models import model_to_dict
20from django.urls import reverse
21from django.utils.text import format_lazy
22from django.utils import timezone
23from simple_history import utils
24from . import exceptions
25from .manager import HistoryDescriptor
26from .signals import post_create_historical_record, pre_create_historical_record
27from .utils import get_change_reason_from_object
28
29if django.VERSION < (2,):
30    from django.utils.translation import ugettext_lazy as _
31    from django.utils.encoding import smart_text as smart_str
32    from django.utils.encoding import python_2_unicode_compatible
33else:
34    from django.utils.translation import gettext_lazy as _
35    from django.utils.encoding import smart_str
36
# Registry of tracked models, keyed by the model's database table name
# (``model._meta.db_table``). ``HistoricalRecords.create_history_model`` adds an
# entry each time it builds a historical model.
registered_models = {}
38
39
40def _default_get_user(request, **kwargs):
41    try:
42        return request.user
43    except AttributeError:
44        return None
45
46
47def _history_user_getter(historical_instance):
48    if historical_instance.history_user_id is None:
49        return None
50    User = get_user_model()
51    try:
52        return User.objects.get(pk=historical_instance.history_user_id)
53    except User.DoesNotExist:
54        return None
55
56
57def _history_user_setter(historical_instance, user):
58    if user is not None:
59        historical_instance.history_user_id = user.pk
60
61
class HistoricalRecords(object):
    """
    Add history tracking to a Django model.

    When the owning model class is prepared, a companion "historical" model is
    built (see ``create_history_model``), ``post_save``/``post_delete`` handlers
    are connected that write one historical row per change, and a
    ``HistoryDescriptor`` for the historical model is installed on the tracked
    model under the attribute name this object was assigned to.
    """

    # Thread-local storage; ``get_history_user`` reads ``thread.request`` from it.
    # NOTE(review): presumably populated by a request middleware - confirm.
    thread = threading.local()

    def __init__(
        self,
        verbose_name=None,
        bases=(models.Model,),
        user_related_name="+",
        table_name=None,
        inherit=False,
        excluded_fields=None,
        history_id_field=None,
        history_change_reason_field=None,
        user_model=None,
        get_user=_default_get_user,
        cascade_delete_history=False,
        custom_model_name=None,
        app=None,
        history_user_id_field=None,
        history_user_getter=_history_user_getter,
        history_user_setter=_history_user_setter,
        related_name=None,
        use_base_model_db=False,
        user_db_constraint=True,
    ):
        """
        Store the configuration used later to build the historical model.

        :param verbose_name: ``verbose_name`` of the historical model; when
            falsy, "historical <model verbose name>" is used.
        :param bases: list or tuple of base classes for the historical model;
            ``HistoricalChanges`` is always prepended.
        :param user_related_name: ``related_name`` of the ``history_user``
            foreign key.
        :param table_name: ``db_table`` of the historical model, if not None.
        :param inherit: when true, subclasses of the owning model are tracked
            as well.
        :param excluded_fields: names of model fields that are not copied to
            the historical model.
        :param history_id_field: field instance to use as the historical
            model's primary key.
        :param history_change_reason_field: field instance to use for
            ``history_change_reason``.
        :param user_model: target of the ``history_user`` foreign key;
            defaults to ``settings.AUTH_USER_MODEL``.
        :param get_user: callable invoked as ``get_user(instance=...,
            request=...)`` to determine the user of a change.
        :param cascade_delete_history: when true, deleting an instance deletes
            its history instead of recording a "-" row.
        :param custom_model_name: name of the historical model, or a callable
            receiving the tracked model's object name and returning the name.
        :param app: ``app_label`` of the historical model, if truthy.
        :param history_user_id_field: field instance stored as
            ``history_user_id``; when given, ``history_user`` is a property
            instead of a foreign key.
        :param history_user_getter: getter of that ``history_user`` property.
        :param history_user_setter: setter of that ``history_user`` property.
        :param related_name: when truthy, a ``history_relation`` foreign key to
            the tracked model is added with this ``related_name``.
        :param use_base_model_db: when true, historical rows are saved with the
            ``using`` alias received by the signal handler.
        :param user_db_constraint: ``db_constraint`` of the ``history_user``
            foreign key.
        :raises TypeError: if ``bases`` is a string or is not iterable.
        """
        self.user_set_verbose_name = verbose_name
        self.user_related_name = user_related_name
        self.user_db_constraint = user_db_constraint
        self.table_name = table_name
        self.inherit = inherit
        self.history_id_field = history_id_field
        self.history_change_reason_field = history_change_reason_field
        self.user_model = user_model
        self.get_user = get_user
        self.cascade_delete_history = cascade_delete_history
        self.custom_model_name = custom_model_name
        self.app = app
        self.user_id_field = history_user_id_field
        self.user_getter = history_user_getter
        self.user_setter = history_user_setter
        self.related_name = related_name
        self.use_base_model_db = use_base_model_db

        if excluded_fields is None:
            excluded_fields = []
        self.excluded_fields = excluded_fields

        bases_error = "The `bases` option must be a list or a tuple."
        # A string is iterable, so ``tuple()`` would silently accept it.
        if isinstance(bases, six.string_types):
            raise TypeError(bases_error)
        try:
            self.bases = (HistoricalChanges,) + tuple(bases)
        except TypeError:
            raise TypeError(bases_error)

    def contribute_to_class(self, cls, name):
        """
        Remember the owning model and attribute name and defer finalization.

        ``finalize`` is connected to ``class_prepared`` so the historical model
        is only built once a model class is prepared. A ``UserWarning`` is
        emitted when ``cls`` is abstract and ``inherit`` is false.
        """
        self.manager_name = name
        self.module = cls.__module__
        self.cls = cls
        # Connected without a sender: ``finalize`` filters on the sender itself.
        models.signals.class_prepared.connect(self.finalize, weak=False)
        self.add_extra_methods(cls)

        if cls._meta.abstract and not self.inherit:
            msg = (
                "HistoricalRecords added to abstract model ({}) without "
                "inherit=True".format(self.cls.__name__)
            )
            warnings.warn(msg, UserWarning)

    def add_extra_methods(self, cls):
        """Attach ``save_without_historical_record`` to ``cls``."""

        def save_without_historical_record(self, *args, **kwargs):
            """
            Save model without saving a historical record

            Make sure you know what you're doing before you use this method.
            """
            # ``post_save`` checks for this attribute to skip the history row.
            self.skip_history_when_saving = True
            try:
                ret = self.save(*args, **kwargs)
            finally:
                del self.skip_history_when_saving
            return ret

        setattr(cls, "save_without_historical_record", save_without_historical_record)

    def finalize(self, sender, **kwargs):
        """
        ``class_prepared`` handler: build and wire up the historical model.

        Does nothing unless ``sender`` is the owning model or, with
        ``inherit`` enabled, a subclass of it.

        :raises exceptions.MultipleRegistrationsError: if ``sender`` already
            has a history manager registered.
        """
        inherited = False
        if self.cls is not sender:  # set in concrete
            inherited = self.inherit and issubclass(sender, self.cls)
            if not inherited:
                return  # set in abstract

        if hasattr(sender._meta, "simple_history_manager_attribute"):
            raise exceptions.MultipleRegistrationsError(
                "{}.{} registered multiple times for history tracking.".format(
                    sender._meta.app_label, sender._meta.object_name
                )
            )
        history_model = self.create_history_model(sender, inherited)
        if inherited:
            # Make sure history model is in same module as concrete model
            module = importlib.import_module(history_model.__module__)
        else:
            module = importlib.import_module(self.module)
        # Expose the generated class as a module attribute.
        setattr(module, history_model.__name__, history_model)

        # The HistoricalRecords object will be discarded,
        # so the signal handlers can't use weak references.
        models.signals.post_save.connect(self.post_save, sender=sender, weak=False)
        models.signals.post_delete.connect(self.post_delete, sender=sender, weak=False)

        descriptor = HistoryDescriptor(history_model)
        setattr(sender, self.manager_name, descriptor)
        sender._meta.simple_history_manager_attribute = self.manager_name

    def get_history_model_name(self, model):
        """
        Return the class name to use for the historical model of ``model``.

        Defaults to ``"Historical<ObjectName>"``. Otherwise ``custom_model_name``
        is used, called with the model's object name when it is callable.

        :raises ValueError: if the custom name equals (case-insensitively) the
            tracked model's name and both live in the same module.
        """
        if not self.custom_model_name:
            return "Historical{}".format(model._meta.object_name)
        # Must be trying to use a custom history model name
        if callable(self.custom_model_name):
            name = self.custom_model_name(model._meta.object_name)
        else:
            #  simple string
            name = self.custom_model_name
        # Desired class name cannot be same as the model it is tracking
        if not (
            name.lower() == model._meta.object_name.lower()
            and model.__module__ == self.module
        ):
            return name
        raise ValueError(
            "The 'custom_model_name' option '{}' evaluates to a name that is the same "
            "as the model it is tracking. This is not permitted.".format(
                self.custom_model_name
            )
        )

    def create_history_model(self, model, inherited):
        """
        Creates a historical model to associate with the model provided.

        The class is assembled from copies of the model's fields, the extra
        history fields and a generated ``Meta``; ``model`` is also recorded in
        ``registered_models`` under its ``db_table``.
        """
        attrs = {
            "__module__": self.module,
            "_history_excluded_fields": self.excluded_fields,
        }

        app_module = "%s.models" % model._meta.app_label

        if inherited:
            # inherited use models module
            attrs["__module__"] = model.__module__
        elif model.__module__ != self.module:
            # registered under different app
            attrs["__module__"] = self.module
        elif app_module != self.module:
            # Abuse an internal API because the app registry is loading.
            app = apps.app_configs[model._meta.app_label]
            models_module = app.name
            attrs["__module__"] = models_module

        fields = self.copy_fields(model)
        attrs.update(fields)
        attrs.update(self.get_extra_fields(model, fields))
        # type in python2 wants str as a first argument
        attrs.update(Meta=type(str("Meta"), (), self.get_meta_options(model)))
        if self.table_name is not None:
            attrs["Meta"].db_table = self.table_name

        # Set as the default then check for overrides
        name = self.get_history_model_name(model)

        registered_models[model._meta.db_table] = model
        history_model = type(str(name), self.bases, attrs)
        return (
            python_2_unicode_compatible(history_model)
            if django.VERSION < (2,)
            else history_model
        )

    def fields_included(self, model):
        """Return the fields of ``model`` whose names are not excluded."""
        return [
            field
            for field in model._meta.fields
            if field.name not in self.excluded_fields
        ]

    def copy_fields(self, model):
        """
        Creates copies of the model's original fields, returning
        a dictionary mapping field name to copied field object.

        Foreign keys (including one-to-one fields, which become plain foreign
        keys) are rebuilt without constraints or uniqueness; every other field
        is adjusted by ``transform_field``.
        """
        fields = {}
        for field in self.fields_included(model):
            field = copy.copy(field)
            field.remote_field = copy.copy(field.remote_field)
            if isinstance(field, OrderWrt):
                # OrderWrt is a proxy field, switch to a plain IntegerField
                field.__class__ = models.IntegerField
            if isinstance(field, models.ForeignKey):
                old_field = field
                # Temporarily disable ``swappable`` while deconstructing, then
                # restore the previous value.
                old_swappable = old_field.swappable
                old_field.swappable = False
                try:
                    _name, _path, args, field_args = old_field.deconstruct()
                finally:
                    old_field.swappable = old_swappable
                if getattr(old_field, "one_to_one", False) or isinstance(
                    old_field, models.OneToOneField
                ):
                    FieldType = models.ForeignKey
                else:
                    FieldType = type(old_field)

                # If field_args['to'] is 'self' then we have a case where the object
                # has a foreign key to itself. If we pass the historical record's
                # field to = 'self', the foreign key will point to an historical
                # record rather than the base record. We can use old_field.model here.
                if field_args.get("to", None) == "self":
                    field_args["to"] = old_field.model

                # Override certain arguments passed when creating the field
                # so that they work for the historical field.
                field_args.update(
                    db_constraint=False,
                    related_name="+",
                    null=True,
                    blank=True,
                    primary_key=False,
                    db_index=True,
                    serialize=True,
                    unique=False,
                    on_delete=models.DO_NOTHING,
                )
                field = FieldType(*args, **field_args)
                field.name = old_field.name
            else:
                transform_field(field)
            fields[field.name] = field
        return fields

    def _get_history_change_reason_field(self):
        """
        Return the field used for ``history_change_reason``.

        Precedence: the field passed to ``__init__``; a nullable ``TextField``
        when ``SIMPLE_HISTORY_HISTORY_CHANGE_REASON_USE_TEXT_FIELD`` is set;
        otherwise a nullable ``CharField(max_length=100)``.
        """
        if self.history_change_reason_field:
            # User specific field from init
            history_change_reason_field = self.history_change_reason_field
        elif getattr(
            settings, "SIMPLE_HISTORY_HISTORY_CHANGE_REASON_USE_TEXT_FIELD", False
        ):
            # Use text field with no max length, not enforced by DB anyways
            history_change_reason_field = models.TextField(null=True)
        else:
            # Current default, with max length
            history_change_reason_field = models.CharField(max_length=100, null=True)

        return history_change_reason_field

    def _get_history_id_field(self):
        """
        Return the primary-key field of the historical model.

        A field passed to ``__init__`` is used after being marked as a
        non-editable primary key (the passed object itself is modified).
        Otherwise a ``UUIDField`` is used when
        ``SIMPLE_HISTORY_HISTORY_ID_USE_UUID`` is set, else an ``AutoField``.
        """
        if self.history_id_field:
            history_id_field = self.history_id_field
            history_id_field.primary_key = True
            history_id_field.editable = False
        elif getattr(settings, "SIMPLE_HISTORY_HISTORY_ID_USE_UUID", False):
            history_id_field = models.UUIDField(
                primary_key=True, default=uuid.uuid4, editable=False
            )
        else:
            history_id_field = models.AutoField(primary_key=True)

        return history_id_field

    def _get_history_user_fields(self):
        """
        Return the attributes that implement ``history_user``.

        With a ``history_user_id_field`` the user is tracked by id and
        ``history_user`` is a property built from the configured getter and
        setter; otherwise it is a nullable foreign key to the user model.
        """
        if self.user_id_field is not None:
            # Tracking user using explicit id rather than Django ForeignKey
            history_user_fields = {
                "history_user": property(self.user_getter, self.user_setter),
                "history_user_id": self.user_id_field,
            }
        else:
            user_model = self.user_model or getattr(
                settings, "AUTH_USER_MODEL", "auth.User"
            )

            history_user_fields = {
                "history_user": models.ForeignKey(
                    user_model,
                    null=True,
                    related_name=self.user_related_name,
                    on_delete=models.SET_NULL,
                    db_constraint=self.user_db_constraint,
                )
            }

        return history_user_fields

    def _get_history_related_field(self, model):
        """
        Return the optional ``history_relation`` foreign key to ``model``.

        An empty dict is returned when no ``related_name`` was configured.

        :raises exceptions.RelatedNameConflictError: if ``related_name`` equals
            the name of the history manager attribute.
        """
        if self.related_name:
            if self.manager_name == self.related_name:
                raise exceptions.RelatedNameConflictError(
                    "The related name must not be called like the history manager."
                )
            return {
                "history_relation": models.ForeignKey(
                    model,
                    on_delete=models.DO_NOTHING,
                    related_name=self.related_name,
                    db_constraint=False,
                )
            }
        else:
            return {}

    def get_extra_fields(self, model, fields):
        """
        Return dict of extra fields added to the historical record model

        :param model: the tracked model class.
        :param fields: the copied fields, as returned by ``copy_fields``.
        """

        def revert_url(self):
            """URL for this change in the default admin site."""
            opts = model._meta
            app_label, model_name = opts.app_label, opts.model_name
            return reverse(
                "%s:%s_%s_simple_history" % (admin.site.name, app_label, model_name),
                args=[getattr(self, opts.pk.attname), self.history_id],
            )

        def get_instance(self):
            """
            Build a tracked-model instance from this historical record.

            Values of excluded (non many-to-many) fields are not stored in the
            history, so they are read from the current row when it exists.
            """
            attrs = {
                field.attname: getattr(self, field.attname) for field in fields.values()
            }
            if self._history_excluded_fields:
                # We don't add ManyToManyFields to this list because they may cause
                # the subsequent `.get()` call to fail. See #706 for context.
                excluded_attnames = []
                for field_name in self._history_excluded_fields:
                    excluded_field = model._meta.get_field(field_name)
                    if not isinstance(excluded_field, ManyToManyField):
                        excluded_attnames.append(excluded_field.attname)
                # Only query when there is something to fetch: ``values()``
                # without arguments returns every column, which would replace
                # the historical values in ``attrs`` with the current ones.
                if excluded_attnames:
                    try:
                        values = (
                            model.objects.filter(
                                pk=getattr(self, model._meta.pk.attname)
                            )
                            .values(*excluded_attnames)
                            .get()
                        )
                    except ObjectDoesNotExist:
                        pass
                    else:
                        attrs.update(values)
            return model(**attrs)

        def get_next_record(self):
            """
            Get the next history record for the instance. `None` if last.
            """
            history = utils.get_history_manager_for_model(self.instance)
            return (
                history.filter(Q(history_date__gt=self.history_date))
                .order_by("history_date")
                .first()
            )

        def get_prev_record(self):
            """
            Get the previous history record for the instance. `None` if first.
            """
            history = utils.get_history_manager_for_model(self.instance)
            return (
                history.filter(Q(history_date__lt=self.history_date))
                .order_by("history_date")
                .last()
            )

        def get_default_history_user(instance):
            """
            Returns the user specified by `get_user` method for manually creating
            historical objects
            """
            # ``self`` here is the enclosing HistoricalRecords object.
            return self.get_history_user(instance)

        extra_fields = {
            "history_id": self._get_history_id_field(),
            "history_date": models.DateTimeField(),
            "history_change_reason": self._get_history_change_reason_field(),
            "history_type": models.CharField(
                max_length=1,
                choices=(("+", _("Created")), ("~", _("Changed")), ("-", _("Deleted"))),
            ),
            "history_object": HistoricalObjectDescriptor(
                model, self.fields_included(model)
            ),
            "instance": property(get_instance),
            "instance_type": model,
            "next_record": property(get_next_record),
            "prev_record": property(get_prev_record),
            "revert_url": revert_url,
            "__str__": lambda self: "{} as of {}".format(
                self.history_object, self.history_date
            ),
            "get_default_history_user": staticmethod(get_default_history_user),
        }

        extra_fields.update(self._get_history_related_field(model))
        extra_fields.update(self._get_history_user_fields())

        return extra_fields

    def get_meta_options(self, model):
        """
        Returns a dictionary of fields that will be added to
        the Meta inner class of the historical record model.
        """
        meta_fields = {
            "ordering": ("-history_date", "-history_id"),
            "get_latest_by": "history_date",
        }
        if self.user_set_verbose_name:
            name = self.user_set_verbose_name
        else:
            name = format_lazy("historical {}", smart_str(model._meta.verbose_name))
        meta_fields["verbose_name"] = name
        if self.app:
            meta_fields["app_label"] = self.app
        return meta_fields

    def post_save(self, instance, created, using=None, **kwargs):
        """
        ``post_save`` handler: record a "+" (created) or "~" (changed) row.

        Nothing is recorded for raw saves, nor for updates of an instance
        carrying the ``skip_history_when_saving`` attribute.
        """
        if not created and hasattr(instance, "skip_history_when_saving"):
            return
        if not kwargs.get("raw", False):
            history_type = "+" if created else "~"
            self.create_historical_record(instance, history_type, using=using)

    def post_delete(self, instance, using=None, **kwargs):
        """
        ``post_delete`` handler.

        Deletes the instance's history when ``cascade_delete_history`` is set,
        otherwise records a "-" (deleted) row.
        """
        if self.cascade_delete_history:
            manager = getattr(instance, self.manager_name)
            manager.using(using).all().delete()
        else:
            self.create_historical_record(instance, "-", using=using)

    def create_historical_record(self, instance, history_type, using=None):
        """
        Create and save a historical row describing ``instance``.

        ``pre_create_historical_record`` is sent before the row is saved and
        ``post_create_historical_record`` after.

        :param instance: the tracked model instance.
        :param history_type: "+", "~" or "-".
        :param using: database alias; ignored unless ``use_base_model_db``
            is true.
        """
        using = using if self.use_base_model_db else None
        # An ``_history_date`` attribute on the instance overrides "now".
        history_date = getattr(instance, "_history_date", timezone.now())
        history_user = self.get_history_user(instance)
        history_change_reason = get_change_reason_from_object(instance)
        manager = getattr(instance, self.manager_name)

        attrs = {}
        for field in self.fields_included(instance):
            attrs[field.attname] = getattr(instance, field.attname)

        # Only present when a ``related_name`` was configured.
        relation_field = getattr(manager.model, "history_relation", None)
        if relation_field is not None:
            attrs["history_relation"] = instance

        history_instance = manager.model(
            history_date=history_date,
            history_type=history_type,
            history_user=history_user,
            history_change_reason=history_change_reason,
            **attrs
        )

        pre_create_historical_record.send(
            sender=manager.model,
            instance=instance,
            history_date=history_date,
            history_user=history_user,
            history_change_reason=history_change_reason,
            history_instance=history_instance,
            using=using,
        )

        history_instance.save(using=using)

        post_create_historical_record.send(
            sender=manager.model,
            instance=instance,
            history_instance=history_instance,
            history_date=history_date,
            history_user=history_user,
            history_change_reason=history_change_reason,
            using=using,
        )

    def get_history_user(self, instance):
        """
        Get the modifying user from instance or middleware.

        ``instance._history_user`` wins when present. Otherwise ``get_user`` is
        called with the thread-local request if that request has an
        authenticated user, or with ``request=None``.
        """
        try:
            return instance._history_user
        except AttributeError:
            pass

        request = None
        try:
            if self.thread.request.user.is_authenticated:
                request = self.thread.request
        except AttributeError:
            # No request stored for this thread, or it carries no user.
            pass

        return self.get_user(instance=instance, request=request)
551
552
def transform_field(field):
    """
    Adapt a copied model field, in place, for use on a historical model.

    The field is renamed to its ``attname``; auto-incrementing fields become
    plain integer fields; file fields become text (or char) fields; automatic
    timestamps are switched off; and primary-key/unique fields lose those
    constraints while staying indexed.
    """
    field.name = field.attname

    # Pick the plain field class that replaces a "special" one, if any.
    if isinstance(field, models.BigAutoField):
        plain_class = models.BigIntegerField
    elif isinstance(field, models.AutoField):
        plain_class = models.IntegerField
    elif isinstance(field, models.FileField):
        # Don't copy file, just path.
        as_char = getattr(settings, "SIMPLE_HISTORY_FILEFIELD_TO_CHARFIELD", False)
        plain_class = models.CharField if as_char else models.TextField
    else:
        plain_class = None
    if plain_class is not None:
        field.__class__ = plain_class

    # Historical instance shouldn't change create/update timestamps
    field.auto_now = field.auto_now_add = False

    if not (field.primary_key or field.unique):
        return

    # Unique fields can no longer be guaranteed unique,
    # but they should still be indexed for faster lookups.
    field.primary_key = False
    field._unique = False
    field.db_index = True
    field.serialize = True
579
580
class HistoricalObjectDescriptor(object):
    """
    Descriptor that builds a ``model`` instance from a historical record.

    On attribute access, the ``attname`` value of every field in
    ``fields_included`` is read from the record and passed to ``model`` as
    keyword arguments.
    """

    def __init__(self, model, fields_included):
        """
        :param model: callable (the tracked model class) used to build the
            returned object.
        :param fields_included: iterable of fields; only their ``attname``
            attribute is used.
        """
        self.model = model
        self.fields_included = fields_included

    def __get__(self, instance, owner):
        """
        Return a new ``model`` object populated from ``instance``.

        When accessed on the class (``instance`` is None) the descriptor itself
        is returned instead of raising ``AttributeError``.
        """
        if instance is None:
            return self
        values = {f.attname: getattr(instance, f.attname) for f in self.fields_included}
        return self.model(**values)
589
590
class HistoricalChanges(object):
    """Mixin giving historical records the ability to be compared."""

    def diff_against(self, old_history, excluded_fields=None):
        """
        Compare this record with ``old_history`` and return a ``ModelDelta``.

        Both records are converted with ``model_to_dict`` on their
        ``instance``; a field is reported when it is present in both dicts,
        is not listed in ``excluded_fields`` and its values differ.

        :raises TypeError: if ``old_history`` is not an instance of this
            record's type.
        """
        if not isinstance(old_history, type(self)):
            message = "unsupported type(s) for diffing: '{}' and '{}'".format(
                type(self), type(old_history)
            )
            raise TypeError(message)

        skipped = [] if excluded_fields is None else excluded_fields
        previous_state = model_to_dict(old_history.instance)
        current_state = model_to_dict(self.instance)

        changes, changed_fields = [], []
        for name, current in current_state.items():
            if name in skipped or name not in previous_state:
                continue
            previous = previous_state[name]
            if previous == current:
                continue
            changes.append(ModelChange(name, previous, current))
            changed_fields.append(name)

        return ModelDelta(changes, changed_fields, old_history, self)
616
617
class ModelChange(object):
    """A single field change: the field name with its old and new values."""

    def __init__(self, field_name, old_value, new_value):
        # Exposed as ``field``, ``old`` and ``new``.
        self.field, self.old, self.new = field_name, old_value, new_value
623
624
class ModelDelta(object):
    """The set of changes between an old and a new record."""

    def __init__(self, changes, changed_fields, old_record, new_record):
        # The list of changes and the parallel list of changed field names.
        self.changes, self.changed_fields = changes, changed_fields
        # The two records that were compared.
        self.old_record, self.new_record = old_record, new_record
631