1""" 2wagtail.core.models is split into submodules for maintainability. All definitions intended as 3public should be imported here (with 'noqa' comments as required) and outside code should continue 4to import them from wagtail.core.models (e.g. `from wagtail.core.models import Site`, not 5`from wagtail.core.models.sites import Site`.) 6 7Submodules should take care to keep the direction of dependencies consistent; where possible they 8should implement low-level generic functionality which is then imported by higher-level models such 9as Page. 10""" 11 12import functools 13import json 14import logging 15import uuid 16 17from io import StringIO 18from urllib.parse import urlparse 19 20from django import forms 21from django.conf import settings 22from django.contrib.auth.models import Group 23from django.contrib.contenttypes.models import ContentType 24from django.core import checks 25from django.core.cache import cache 26from django.core.exceptions import PermissionDenied, ValidationError 27from django.core.handlers.base import BaseHandler 28from django.core.handlers.wsgi import WSGIRequest 29from django.db import models, transaction 30from django.db.models import DEFERRED, Q, Value 31from django.db.models.expressions import OuterRef, Subquery 32from django.db.models.functions import Concat, Substr 33from django.dispatch import receiver 34from django.http import Http404 35from django.template.response import TemplateResponse 36from django.urls import NoReverseMatch, reverse 37from django.utils import timezone 38from django.utils import translation as translation 39from django.utils.cache import patch_cache_control 40from django.utils.encoding import force_str 41from django.utils.functional import cached_property 42from django.utils.module_loading import import_string 43from django.utils.text import capfirst, slugify 44from django.utils.translation import gettext_lazy as _ 45from modelcluster.fields import ParentalKey 46from modelcluster.models import ClusterableModel, get_all_child_relations 47from treebeard.mp_tree import MP_Node 48 49from wagtail.core.fields import StreamField 50from wagtail.core.forms import TaskStateCommentForm 51from wagtail.core.log_actions import page_log_action_registry 52from wagtail.core.query import PageQuerySet 53from wagtail.core.signals import ( 54 page_published, page_unpublished, post_page_move, pre_page_move, pre_validate_delete, 55 task_approved, task_cancelled, task_rejected, task_submitted, workflow_approved, 56 workflow_cancelled, workflow_rejected, workflow_submitted) 57from wagtail.core.treebeard import TreebeardPathFixMixin 58from wagtail.core.url_routing import RouteResult 59from wagtail.core.utils import ( 60 WAGTAIL_APPEND_SLASH, camelcase_to_underscore, find_available_slug, 61 get_supported_content_language_variant, resolve_model_string) 62from wagtail.search import index 63 64from .audit_log import BaseLogEntry, BaseLogEntryManager, LogEntryQuerySet # noqa 65from .collections import ( # noqa 66 BaseCollectionManager, Collection, CollectionManager, CollectionMember, 67 CollectionViewRestriction, GroupCollectionPermission, GroupCollectionPermissionManager, 68 get_root_collection_id) 69from .copying import _copy, _copy_m2m_relations, _extract_field_data # noqa 70from .i18n import ( # noqa 71 BootstrapTranslatableMixin, BootstrapTranslatableModel, Locale, LocaleManager, 72 TranslatableMixin, bootstrap_translatable_model, get_translatable_models) 73from .sites import Site, SiteManager, SiteRootPath # noqa 74from .view_restrictions import BaseViewRestriction 75 76 77logger = logging.getLogger('wagtail.core') 78 79PAGE_TEMPLATE_VAR = 'page' 80COMMENTS_RELATION_NAME = getattr(settings, 'WAGTAIL_COMMENTS_RELATION_NAME', 'comments') 81 82 83@receiver(pre_validate_delete, sender=Locale) 84def reassign_root_page_locale_on_delete(sender, instance, **kwargs): 85 # if we're deleting the locale used on the root page node, reassign that to a new locale first 86 root_page_with_this_locale = Page.objects.filter(depth=1, locale=instance) 87 if root_page_with_this_locale.exists(): 88 # Select the default locale, if one exists and isn't the one being deleted 89 try: 90 new_locale = Locale.get_default() 91 default_locale_is_ok = (new_locale != instance) 92 except (Locale.DoesNotExist, LookupError): 93 default_locale_is_ok = False 94 95 if not default_locale_is_ok: 96 # fall back on any remaining locale 97 new_locale = Locale.all_objects.exclude(pk=instance.pk).first() 98 99 root_page_with_this_locale.update(locale=new_locale) 100 101 102class ParentNotTranslatedError(Exception): 103 """ 104 Raised when a call to Page.copy_for_translation is made but the 105 parent page is not translated and copy_parents is False. 106 """ 107 pass 108 109 110PAGE_MODEL_CLASSES = [] 111 112 113def get_page_models(): 114 """ 115 Returns a list of all non-abstract Page model classes defined in this project. 116 """ 117 return PAGE_MODEL_CLASSES 118 119 120def get_default_page_content_type(): 121 """ 122 Returns the content type to use as a default for pages whose content type 123 has been deleted. 124 """ 125 return ContentType.objects.get_for_model(Page) 126 127 128@functools.lru_cache(maxsize=None) 129def get_streamfield_names(model_class): 130 return tuple( 131 field.name for field in model_class._meta.concrete_fields 132 if isinstance(field, StreamField) 133 ) 134 135 136class BasePageManager(models.Manager): 137 def get_queryset(self): 138 return self._queryset_class(self.model).order_by('path') 139 140 141PageManager = BasePageManager.from_queryset(PageQuerySet) 142 143 144class PageBase(models.base.ModelBase): 145 """Metaclass for Page""" 146 147 def __init__(cls, name, bases, dct): 148 super(PageBase, cls).__init__(name, bases, dct) 149 150 if 'template' not in dct: 151 # Define a default template path derived from the app name and model name 152 cls.template = "%s/%s.html" % (cls._meta.app_label, camelcase_to_underscore(name)) 153 154 if 'ajax_template' not in dct: 155 cls.ajax_template = None 156 157 cls._clean_subpage_models = None # to be filled in on first call to cls.clean_subpage_models 158 cls._clean_parent_page_models = None # to be filled in on first call to cls.clean_parent_page_models 159 160 # All pages should be creatable unless explicitly set otherwise. 161 # This attribute is not inheritable. 162 if 'is_creatable' not in dct: 163 cls.is_creatable = not cls._meta.abstract 164 165 if not cls._meta.abstract: 166 # register this type in the list of page content types 167 PAGE_MODEL_CLASSES.append(cls) 168 169 170class AbstractPage(TranslatableMixin, TreebeardPathFixMixin, MP_Node): 171 """ 172 Abstract superclass for Page. According to Django's inheritance rules, managers set on 173 abstract models are inherited by subclasses, but managers set on concrete models that are extended 174 via multi-table inheritance are not. We therefore need to attach PageManager to an abstract 175 superclass to ensure that it is retained by subclasses of Page. 176 """ 177 objects = PageManager() 178 179 class Meta: 180 abstract = True 181 182 183class Page(AbstractPage, index.Indexed, ClusterableModel, metaclass=PageBase): 184 title = models.CharField( 185 verbose_name=_('title'), 186 max_length=255, 187 help_text=_("The page title as you'd like it to be seen by the public") 188 ) 189 # to reflect title of a current draft in the admin UI 190 draft_title = models.CharField( 191 max_length=255, 192 editable=False 193 ) 194 slug = models.SlugField( 195 verbose_name=_('slug'), 196 allow_unicode=True, 197 max_length=255, 198 help_text=_("The name of the page as it will appear in URLs e.g http://domain.com/blog/[my-slug]/") 199 ) 200 content_type = models.ForeignKey( 201 ContentType, 202 verbose_name=_('content type'), 203 related_name='pages', 204 on_delete=models.SET(get_default_page_content_type) 205 ) 206 live = models.BooleanField(verbose_name=_('live'), default=True, editable=False) 207 has_unpublished_changes = models.BooleanField( 208 verbose_name=_('has unpublished changes'), 209 default=False, 210 editable=False 211 ) 212 url_path = models.TextField(verbose_name=_('URL path'), blank=True, editable=False) 213 owner = models.ForeignKey( 214 settings.AUTH_USER_MODEL, 215 verbose_name=_('owner'), 216 null=True, 217 blank=True, 218 editable=True, 219 on_delete=models.SET_NULL, 220 related_name='owned_pages' 221 ) 222 223 seo_title = models.CharField( 224 verbose_name=_("title tag"), 225 max_length=255, 226 blank=True, 227 help_text=_("The name of the page displayed on search engine results as the clickable headline.") 228 ) 229 230 show_in_menus_default = False 231 show_in_menus = models.BooleanField( 232 verbose_name=_('show in menus'), 233 default=False, 234 help_text=_("Whether a link to this page will appear in automatically generated menus") 235 ) 236 search_description = models.TextField( 237 verbose_name=_('meta description'), 238 blank=True, 239 help_text=_("The descriptive text displayed underneath a headline in search engine results.") 240 ) 241 242 go_live_at = models.DateTimeField( 243 verbose_name=_("go live date/time"), 244 blank=True, 245 null=True 246 ) 247 expire_at = models.DateTimeField( 248 verbose_name=_("expiry date/time"), 249 blank=True, 250 null=True 251 ) 252 expired = models.BooleanField(verbose_name=_('expired'), default=False, editable=False) 253 254 locked = models.BooleanField(verbose_name=_('locked'), default=False, editable=False) 255 locked_at = models.DateTimeField(verbose_name=_('locked at'), null=True, editable=False) 256 locked_by = models.ForeignKey( 257 settings.AUTH_USER_MODEL, 258 verbose_name=_('locked by'), 259 null=True, 260 blank=True, 261 editable=False, 262 on_delete=models.SET_NULL, 263 related_name='locked_pages' 264 ) 265 266 first_published_at = models.DateTimeField( 267 verbose_name=_('first published at'), 268 blank=True, 269 null=True, 270 db_index=True 271 ) 272 last_published_at = models.DateTimeField( 273 verbose_name=_('last published at'), 274 null=True, 275 editable=False 276 ) 277 latest_revision_created_at = models.DateTimeField( 278 verbose_name=_('latest revision created at'), 279 null=True, 280 editable=False 281 ) 282 live_revision = models.ForeignKey( 283 'PageRevision', 284 related_name='+', 285 verbose_name=_('live revision'), 286 on_delete=models.SET_NULL, 287 null=True, 288 blank=True, 289 editable=False 290 ) 291 292 # If non-null, this page is an alias of the linked page 293 # This means the page is kept in sync with the live version 294 # of the linked pages and is not editable by users. 295 alias_of = models.ForeignKey( 296 'self', 297 on_delete=models.SET_NULL, 298 null=True, 299 blank=True, 300 editable=False, 301 related_name='aliases', 302 ) 303 304 search_fields = [ 305 index.SearchField('title', partial_match=True, boost=2), 306 index.AutocompleteField('title'), 307 index.FilterField('title'), 308 index.FilterField('id'), 309 index.FilterField('live'), 310 index.FilterField('owner'), 311 index.FilterField('content_type'), 312 index.FilterField('path'), 313 index.FilterField('depth'), 314 index.FilterField('locked'), 315 index.FilterField('show_in_menus'), 316 index.FilterField('first_published_at'), 317 index.FilterField('last_published_at'), 318 index.FilterField('latest_revision_created_at'), 319 index.FilterField('locale'), 320 index.FilterField('translation_key'), 321 ] 322 323 # Do not allow plain Page instances to be created through the Wagtail admin 324 is_creatable = False 325 326 # Define the maximum number of instances this page type can have. Default to unlimited. 327 max_count = None 328 329 # Define the maximum number of instances this page can have under a specific parent. Default to unlimited. 330 max_count_per_parent = None 331 332 # An array of additional field names that will not be included when a Page is copied. 333 exclude_fields_in_copy = [] 334 default_exclude_fields_in_copy = ['id', 'path', 'depth', 'numchild', 'url_path', 'path', 'index_entries', COMMENTS_RELATION_NAME] 335 336 # Define these attributes early to avoid masking errors. (Issue #3078) 337 # The canonical definition is in wagtailadmin.edit_handlers. 338 content_panels = [] 339 promote_panels = [] 340 settings_panels = [] 341 342 def __init__(self, *args, **kwargs): 343 super().__init__(*args, **kwargs) 344 if not self.id: 345 # this model is being newly created 346 # rather than retrieved from the db; 347 if not self.content_type_id: 348 # set content type to correctly represent the model class 349 # that this was created as 350 self.content_type = ContentType.objects.get_for_model(self) 351 if 'show_in_menus' not in kwargs: 352 # if the value is not set on submit refer to the model setting 353 self.show_in_menus = self.show_in_menus_default 354 355 def __str__(self): 356 return self.title 357 358 @classmethod 359 def get_streamfield_names(cls): 360 return get_streamfield_names(cls) 361 362 def set_url_path(self, parent): 363 """ 364 Populate the url_path field based on this page's slug and the specified parent page. 365 (We pass a parent in here, rather than retrieving it via get_parent, so that we can give 366 new unsaved pages a meaningful URL when previewing them; at that point the page has not 367 been assigned a position in the tree, as far as treebeard is concerned. 368 """ 369 if parent: 370 self.url_path = parent.url_path + self.slug + '/' 371 else: 372 # a page without a parent is the tree root, which always has a url_path of '/' 373 self.url_path = '/' 374 375 return self.url_path 376 377 @staticmethod 378 def _slug_is_available(slug, parent_page, page=None): 379 """ 380 Determine whether the given slug is available for use on a child page of 381 parent_page. If 'page' is passed, the slug is intended for use on that page 382 (and so it will be excluded from the duplicate check). 383 """ 384 if parent_page is None: 385 # the root page's slug can be whatever it likes... 386 return True 387 388 siblings = parent_page.get_children() 389 if page: 390 siblings = siblings.not_page(page) 391 392 return not siblings.filter(slug=slug).exists() 393 394 def _get_autogenerated_slug(self, base_slug): 395 candidate_slug = base_slug 396 suffix = 1 397 parent_page = self.get_parent() 398 399 while not Page._slug_is_available(candidate_slug, parent_page, self): 400 # try with incrementing suffix until we find a slug which is available 401 suffix += 1 402 candidate_slug = "%s-%d" % (base_slug, suffix) 403 404 return candidate_slug 405 406 def get_default_locale(self): 407 """ 408 Finds the default locale to use for this page. 409 410 This will be called just before the initial save. 411 """ 412 parent = self.get_parent() 413 if parent is not None: 414 return ( 415 parent.specific_class.objects.defer().select_related("locale") 416 .get(id=parent.id) 417 .locale 418 ) 419 420 return super().get_default_locale() 421 422 def full_clean(self, *args, **kwargs): 423 # Apply fixups that need to happen before per-field validation occurs 424 425 if not self.slug: 426 # Try to auto-populate slug from title 427 allow_unicode = getattr(settings, 'WAGTAIL_ALLOW_UNICODE_SLUGS', True) 428 base_slug = slugify(self.title, allow_unicode=allow_unicode) 429 430 # only proceed if we get a non-empty base slug back from slugify 431 if base_slug: 432 self.slug = self._get_autogenerated_slug(base_slug) 433 434 if not self.draft_title: 435 self.draft_title = self.title 436 437 # Set the locale 438 if self.locale_id is None: 439 self.locale = self.get_default_locale() 440 441 super().full_clean(*args, **kwargs) 442 443 def clean(self): 444 super().clean() 445 if not Page._slug_is_available(self.slug, self.get_parent(), self): 446 raise ValidationError({'slug': _("This slug is already in use")}) 447 448 def is_site_root(self): 449 """ 450 Returns True if this page is the root of any site. 451 452 This includes translations of site root pages as well. 453 """ 454 # `_is_site_root` may be populated by `annotate_site_root_state` on `PageQuerySet` as a 455 # performance optimisation 456 if hasattr(self, "_is_site_root"): 457 return self._is_site_root 458 459 return Site.objects.filter( 460 root_page__translation_key=self.translation_key 461 ).exists() 462 463 @transaction.atomic 464 # ensure that changes are only committed when we have updated all descendant URL paths, to preserve consistency 465 def save(self, clean=True, user=None, log_action=False, **kwargs): 466 """ 467 Overrides default method behaviour to make additional updates unique to pages, 468 such as updating the ``url_path`` value of descendant page to reflect changes 469 to this page's slug. 470 471 New pages should generally be saved via the ``add_child()`` or ``add_sibling()`` 472 method of an existing page, which will correctly set the ``path`` and ``depth`` 473 fields on the new page before saving it. 474 475 By default, pages are validated using ``full_clean()`` before attempting to 476 save changes to the database, which helps to preserve validity when restoring 477 pages from historic revisions (which might not necessarily reflect the current 478 model state). This validation step can be bypassed by calling the method with 479 ``clean=False``. 480 """ 481 if clean: 482 self.full_clean() 483 484 update_descendant_url_paths = False 485 is_new = self.id is None 486 487 if is_new: 488 # we are creating a record. If we're doing things properly, this should happen 489 # through a treebeard method like add_child, in which case the 'path' field 490 # has been set and so we can safely call get_parent 491 self.set_url_path(self.get_parent()) 492 else: 493 # Check that we are committing the slug to the database 494 # Basically: If update_fields has been specified, and slug is not included, skip this step 495 if not ('update_fields' in kwargs and 'slug' not in kwargs['update_fields']): 496 # see if the slug has changed from the record in the db, in which case we need to 497 # update url_path of self and all descendants 498 old_record = Page.objects.get(id=self.id) 499 if old_record.slug != self.slug: 500 self.set_url_path(self.get_parent()) 501 update_descendant_url_paths = True 502 old_url_path = old_record.url_path 503 new_url_path = self.url_path 504 505 result = super().save(**kwargs) 506 507 if not is_new and update_descendant_url_paths: 508 self._update_descendant_url_paths(old_url_path, new_url_path) 509 510 # Check if this is a root page of any sites and clear the 'wagtail_site_root_paths' key if so 511 # Note: New translations of existing site roots are considered site roots as well, so we must 512 # always check if this page is a site root, even if it's new. 513 if self.is_site_root(): 514 cache.delete('wagtail_site_root_paths') 515 516 # Log 517 if is_new: 518 cls = type(self) 519 logger.info( 520 "Page created: \"%s\" id=%d content_type=%s.%s path=%s", 521 self.title, 522 self.id, 523 cls._meta.app_label, 524 cls.__name__, 525 self.url_path 526 ) 527 528 if log_action is not None: 529 # The default for log_action is False. i.e. don't log unless specifically instructed 530 # Page creation is a special case that we want logged by default, but allow skipping it 531 # explicitly by passing log_action=None 532 if is_new: 533 PageLogEntry.objects.log_action( 534 instance=self, 535 action='wagtail.create', 536 user=user or self.owner, 537 content_changed=True, 538 ) 539 elif log_action: 540 PageLogEntry.objects.log_action( 541 instance=self, 542 action=log_action, 543 user=user 544 ) 545 546 return result 547 548 def delete(self, *args, **kwargs): 549 # Ensure that deletion always happens on an instance of Page, not a specific subclass. This 550 # works around a bug in treebeard <= 3.0 where calling SpecificPage.delete() fails to delete 551 # child pages that are not instances of SpecificPage 552 if type(self) is Page: 553 user = kwargs.pop('user', None) 554 555 def log_deletion(page, user): 556 PageLogEntry.objects.log_action( 557 instance=page, 558 action='wagtail.delete', 559 user=user, 560 deleted=True, 561 ) 562 if self.get_children().exists(): 563 for child in self.get_children(): 564 log_deletion(child.specific, user) 565 log_deletion(self.specific, user) 566 567 # this is a Page instance, so carry on as we were 568 return super().delete(*args, **kwargs) 569 else: 570 # retrieve an actual Page instance and delete that instead of self 571 return Page.objects.get(id=self.id).delete(*args, **kwargs) 572 573 @classmethod 574 def check(cls, **kwargs): 575 errors = super(Page, cls).check(**kwargs) 576 577 # Check that foreign keys from pages are not configured to cascade 578 # This is the default Django behaviour which must be explicitly overridden 579 # to prevent pages disappearing unexpectedly and the tree being corrupted 580 581 # get names of foreign keys pointing to parent classes (such as page_ptr) 582 field_exceptions = [field.name 583 for model in [cls] + list(cls._meta.get_parent_list()) 584 for field in model._meta.parents.values() if field] 585 586 for field in cls._meta.fields: 587 if isinstance(field, models.ForeignKey) and field.name not in field_exceptions: 588 if field.remote_field.on_delete == models.CASCADE: 589 errors.append( 590 checks.Warning( 591 "Field hasn't specified on_delete action", 592 hint="Set on_delete=models.SET_NULL and make sure the field is nullable or set on_delete=models.PROTECT. Wagtail does not allow simple database CASCADE because it will corrupt its tree storage.", 593 obj=field, 594 id='wagtailcore.W001', 595 ) 596 ) 597 598 if not isinstance(cls.objects, PageManager): 599 errors.append( 600 checks.Error( 601 "Manager does not inherit from PageManager", 602 hint="Ensure that custom Page managers inherit from wagtail.core.models.PageManager", 603 obj=cls, 604 id='wagtailcore.E002', 605 ) 606 ) 607 608 try: 609 cls.clean_subpage_models() 610 except (ValueError, LookupError) as e: 611 errors.append( 612 checks.Error( 613 "Invalid subpage_types setting for %s" % cls, 614 hint=str(e), 615 id='wagtailcore.E002' 616 ) 617 ) 618 619 try: 620 cls.clean_parent_page_models() 621 except (ValueError, LookupError) as e: 622 errors.append( 623 checks.Error( 624 "Invalid parent_page_types setting for %s" % cls, 625 hint=str(e), 626 id='wagtailcore.E002' 627 ) 628 ) 629 630 return errors 631 632 def _update_descendant_url_paths(self, old_url_path, new_url_path): 633 ( 634 Page.objects 635 .filter(path__startswith=self.path) 636 .exclude(pk=self.pk) 637 .update( 638 url_path=Concat( 639 Value(new_url_path), 640 Substr('url_path', len(old_url_path) + 1) 641 ) 642 ) 643 ) 644 645 def get_specific(self, deferred=False, copy_attrs=None, copy_attrs_exclude=None): 646 """ 647 .. versionadded:: 2.12 648 649 Return this page in its most specific subclassed form. 650 651 .. versionchanged:: 2.13 652 * When ``copy_attrs`` is not supplied, all known non-field attribute 653 values are copied to the returned object. Previously, no non-field 654 values would be copied. 655 * The ``copy_attrs_exclude`` option was added. 656 657 By default, a database query is made to fetch all field values for the 658 specific object. If you only require access to custom methods or other 659 non-field attributes on the specific object, you can use 660 ``deferred=True`` to avoid this query. However, any attempts to access 661 specific field values from the returned object will trigger additional 662 database queries. 663 664 By default, references to all non-field attribute values are copied 665 from current object to the returned one. This includes: 666 667 * Values set by a queryset, for example: annotations, or values set as 668 a result of using ``select_related()`` or ``prefetch_related()``. 669 * Any ``cached_property`` values that have been evaluated. 670 * Attributes set elsewhere in Python code. 671 672 For fine-grained control over which non-field values are copied to the 673 returned object, you can use ``copy_attrs`` to specify a complete list 674 of attribute names to include. Alternatively, you can use 675 ``copy_attrs_exclude`` to specify a list of attribute names to exclude. 676 677 If called on a page object that is already an instance of the most 678 specific class (e.g. an ``EventPage``), the object will be returned 679 as is, and no database queries or other operations will be triggered. 680 681 If the page was originally created using a page type that has since 682 been removed from the codebase, a generic ``Page`` object will be 683 returned (without any custom field values or other functionality 684 present on the original class). Usually, deleting these pages is the 685 best course of action, but there is currently no safe way for Wagtail 686 to do that at migration time. 687 """ 688 model_class = self.specific_class 689 690 if model_class is None: 691 # The codebase and database are out of sync (e.g. the model exists 692 # on a different git branch and migrations were not applied or 693 # reverted before switching branches). So, the best we can do is 694 # return the page in it's current form. 695 return self 696 697 if isinstance(self, model_class): 698 # self is already the an instance of the most specific class 699 return self 700 701 if deferred: 702 # Generate a tuple of values in the order expected by __init__(), 703 # with missing values substituted with DEFERRED () 704 values = tuple( 705 getattr(self, f.attname, self.pk if f.primary_key else DEFERRED) 706 for f in model_class._meta.concrete_fields 707 ) 708 # Create object from known attribute values 709 specific_obj = model_class(*values) 710 specific_obj._state.adding = self._state.adding 711 else: 712 # Fetch object from database 713 specific_obj = model_class._default_manager.get(id=self.id) 714 715 # Copy non-field attribute values 716 if copy_attrs is not None: 717 for attr in (attr for attr in copy_attrs if attr in self.__dict__): 718 setattr(specific_obj, attr, getattr(self, attr)) 719 else: 720 exclude = copy_attrs_exclude or () 721 for k, v in ( 722 (k, v) for k, v in self.__dict__.items() 723 if k not in exclude 724 ): 725 # only set values that haven't already been set 726 specific_obj.__dict__.setdefault(k, v) 727 728 return specific_obj 729 730 @cached_property 731 def specific(self): 732 """ 733 Returns this page in its most specific subclassed form with all field 734 values fetched from the database. The result is cached in memory. 735 """ 736 return self.get_specific() 737 738 @cached_property 739 def specific_deferred(self): 740 """ 741 .. versionadded:: 2.12 742 743 Returns this page in its most specific subclassed form without any 744 additional field values being fetched from the database. The result 745 is cached in memory. 746 """ 747 return self.get_specific(deferred=True) 748 749 @cached_property 750 def specific_class(self): 751 """ 752 Return the class that this page would be if instantiated in its 753 most specific form. 754 755 If the model class can no longer be found in the codebase, and the 756 relevant ``ContentType`` has been removed by a database migration, 757 the return value will be ``None``. 758 759 If the model class can no longer be found in the codebase, but the 760 relevant ``ContentType`` is still present in the database (usually a 761 result of switching between git branches without running or reverting 762 database migrations beforehand), the return value will be ``None``. 763 """ 764 return self.cached_content_type.model_class() 765 766 @property 767 def cached_content_type(self): 768 """ 769 .. versionadded:: 2.10 770 771 Return this page's ``content_type`` value from the ``ContentType`` 772 model's cached manager, which will avoid a database query if the 773 object is already in memory. 774 """ 775 return ContentType.objects.get_for_id(self.content_type_id) 776 777 @property 778 def localized_draft(self): 779 """ 780 Finds the translation in the current active language. 781 782 If there is no translation in the active language, self is returned. 783 784 Note: This will return translations that are in draft. If you want to exclude 785 these, use the ``.localized`` attribute. 786 """ 787 try: 788 locale = Locale.get_active() 789 except (LookupError, Locale.DoesNotExist): 790 return self 791 792 if locale.id == self.locale_id: 793 return self 794 795 return self.get_translation_or_none(locale) or self 796 797 @property 798 def localized(self): 799 """ 800 Finds the translation in the current active language. 801 802 If there is no translation in the active language, self is returned. 803 804 Note: This will not return the translation if it is in draft. 805 If you want to include drafts, use the ``.localized_draft`` attribute instead. 806 """ 807 localized = self.localized_draft 808 if not localized.live: 809 return self 810 811 return localized 812 813 def route(self, request, path_components): 814 if path_components: 815 # request is for a child of this page 816 child_slug = path_components[0] 817 remaining_components = path_components[1:] 818 819 try: 820 subpage = self.get_children().get(slug=child_slug) 821 except Page.DoesNotExist: 822 raise Http404 823 824 return subpage.specific.route(request, remaining_components) 825 826 else: 827 # request is for this very page 828 if self.live: 829 return RouteResult(self) 830 else: 831 raise Http404 832 833 def get_admin_display_title(self): 834 """ 835 Return the title for this page as it should appear in the admin backend; 836 override this if you wish to display extra contextual information about the page, 837 such as language. By default, returns ``draft_title``. 838 """ 839 # Fall back on title if draft_title is blank (which may happen if the page was created 840 # in a fixture or migration that didn't explicitly handle draft_title) 841 return self.draft_title or self.title 842 843 def save_revision(self, user=None, submitted_for_moderation=False, approved_go_live_at=None, changed=True, 844 log_action=False, previous_revision=None, clean=True): 845 """ 846 Creates and saves a page revision. 847 :param user: the user performing the action 848 :param submitted_for_moderation: indicates whether the page was submitted for moderation 849 :param approved_go_live_at: the date and time the revision is approved to go live 850 :param changed: indicates whether there were any content changes 851 :param log_action: flag for logging the action. Pass False to skip logging. Can be passed an action string. 852 Defaults to 'wagtail.edit' when no 'previous_revision' param is passed, otherwise 'wagtail.revert' 853 :param previous_revision: indicates a revision reversal. Should be set to the previous revision instance 854 :param clean: Set this to False to skip cleaning page content before saving this revision 855 :return: the newly created revision 856 """ 857 # Raise an error if this page is an alias. 858 if self.alias_of_id: 859 raise RuntimeError( 860 "save_revision() was called on an alias page. " 861 "Revisions are not required for alias pages as they are an exact copy of another page." 862 ) 863 864 if clean: 865 self.full_clean() 866 867 new_comments = getattr(self, COMMENTS_RELATION_NAME).filter(pk__isnull=True) 868 for comment in new_comments: 869 # We need to ensure comments have an id in the revision, so positions can be identified correctly 870 comment.save() 871 872 # Create revision 873 revision = self.revisions.create( 874 content_json=self.to_json(), 875 user=user, 876 submitted_for_moderation=submitted_for_moderation, 877 approved_go_live_at=approved_go_live_at, 878 ) 879 880 for comment in new_comments: 881 comment.revision_created = revision 882 883 update_fields = [COMMENTS_RELATION_NAME] 884 885 self.latest_revision_created_at = revision.created_at 886 update_fields.append('latest_revision_created_at') 887 888 self.draft_title = self.title 889 update_fields.append('draft_title') 890 891 if changed: 892 self.has_unpublished_changes = True 893 update_fields.append('has_unpublished_changes') 894 895 if update_fields: 896 # clean=False because the fields we're updating don't need validation 897 self.save(update_fields=update_fields, clean=False) 898 899 # Log 900 logger.info("Page edited: \"%s\" id=%d revision_id=%d", self.title, self.id, revision.id) 901 if log_action: 902 if not previous_revision: 903 PageLogEntry.objects.log_action( 904 instance=self, 905 action=log_action if isinstance(log_action, str) else 'wagtail.edit', 906 user=user, 907 revision=revision, 908 content_changed=changed, 909 ) 910 else: 911 PageLogEntry.objects.log_action( 912 instance=self, 913 action=log_action if isinstance(log_action, str) else 'wagtail.revert', 914 user=user, 915 data={ 916 'revision': { 917 'id': previous_revision.id, 918 'created': previous_revision.created_at.strftime("%d %b %Y %H:%M") 919 } 920 }, 921 revision=revision, 922 content_changed=changed, 923 ) 924 925 if submitted_for_moderation: 926 logger.info("Page submitted for moderation: \"%s\" id=%d revision_id=%d", self.title, self.id, revision.id) 927 928 return revision 929 930 def get_latest_revision(self): 931 return self.revisions.order_by('-created_at', '-id').first() 932 933 def get_latest_revision_as_page(self): 934 if not self.has_unpublished_changes: 935 # Use the live database copy in preference to the revision record, as: 936 # 1) this will pick up any changes that have been made directly to the model, 937 # such as automated data imports; 938 # 2) it ensures that inline child objects pick up real database IDs even if 939 # those are absent from the revision data. (If this wasn't the case, the child 940 # objects would be recreated with new IDs on next publish - see #1853) 941 return self.specific 942 943 latest_revision = self.get_latest_revision() 944 945 if latest_revision: 946 return latest_revision.as_page_object() 947 else: 948 return self.specific 949 950 def update_aliases(self, *, revision=None, user=None, _content_json=None, _updated_ids=None): 951 """ 952 Publishes all aliases that follow this page with the latest content from this page. 953 954 This is called by Wagtail whenever a page with aliases is published. 955 956 :param revision: The revision of the original page that we are updating to (used for logging purposes) 957 :type revision: PageRevision, optional 958 :param user: The user who is publishing (used for logging purposes) 959 :type user: User, optional 960 """ 961 specific_self = self.specific 962 963 # Only compute this if necessary since it's quite a heavy operation 964 if _content_json is None: 965 _content_json = self.to_json() 966 967 # A list of IDs that have already been updated. This is just in case someone has 968 # created an alias loop (which is impossible to do with the UI Wagtail provides) 969 _updated_ids = _updated_ids or [] 970 971 for alias in self.specific_class.objects.filter(alias_of=self).exclude(id__in=_updated_ids): 972 # FIXME: Switch to the same fields that are excluded from copy 973 # We can't do this right now because we can't exclude fields from with_content_json 974 exclude_fields = ['id', 'path', 'depth', 'numchild', 'url_path', 'path', 'index_entries'] 975 976 # Copy field content 977 alias_updated = alias.with_content_json(_content_json) 978 979 # Publish the alias if it's currently in draft 980 alias_updated.live = True 981 alias_updated.has_unpublished_changes = False 982 983 # Copy child relations 984 child_object_map = specific_self.copy_all_child_relations(target=alias_updated, exclude=exclude_fields) 985 986 # Process child objects 987 # This has two jobs: 988 # - If the alias is in a different locale, this updates the 989 # locale of any translatable child objects to match 990 # - If the alias is not a translation of the original, this 991 # changes the translation_key field of all child objects 992 # so they do not clash 993 if child_object_map: 994 alias_is_translation = alias.translation_key == self.translation_key 995 996 def process_child_object(child_object): 997 if isinstance(child_object, TranslatableMixin): 998 # Child object's locale must always match the page 999 child_object.locale = alias_updated.locale 1000 1001 # If the alias isn't a translation of the original page, 1002 # change the child object's translation_keys so they are 1003 # not either 1004 if not alias_is_translation: 1005 child_object.translation_key = uuid.uuid4() 1006 1007 for (rel, previous_id), child_objects in child_object_map.items(): 1008 if previous_id is None: 1009 for child_object in child_objects: 1010 process_child_object(child_object) 1011 else: 1012 process_child_object(child_objects) 1013 1014 # Copy M2M relations 1015 _copy_m2m_relations(specific_self, alias_updated, exclude_fields=exclude_fields) 1016 1017 # Don't change the aliases slug 1018 # Aliases can have their own slugs so they can be siblings of the original 1019 alias_updated.slug = alias.slug 1020 alias_updated.set_url_path(alias_updated.get_parent()) 1021 1022 # Aliases don't have revisions, so update fields that would normally be updated by save_revision 1023 alias_updated.draft_title = alias_updated.title 1024 alias_updated.latest_revision_created_at = self.latest_revision_created_at 1025 1026 alias_updated.save(clean=False) 1027 1028 page_published.send(sender=alias_updated.specific_class, instance=alias_updated, revision=revision, alias=True) 1029 1030 # Log the publish of the alias 1031 PageLogEntry.objects.log_action( 1032 instance=alias_updated, 1033 action='wagtail.publish', 1034 user=user, 1035 ) 1036 1037 # Update any aliases of that alias 1038 1039 # Design note: 1040 # It could be argued that this will be faster if we just changed these alias-of-alias 1041 # pages to all point to the original page and avoid having to update them recursively. 1042 # 1043 # But, it's useful to have a record of how aliases have been chained. 1044 # For example, In Wagtail Localize, we use aliases to create mirrored trees, but those 1045 # trees themselves could have aliases within them. If an alias within a tree is 1046 # converted to a regular page, we want the alias in the mirrored tree to follow that 1047 # new page and stop receiving updates from the original page. 1048 # 1049 # Doing it this way requires an extra lookup query per alias but this is small in 1050 # comparison to the work required to update the alias. 1051 1052 alias.update_aliases(revision=revision, _content_json=_content_json, _updated_ids=_updated_ids) 1053 1054 update_aliases.alters_data = True 1055 1056 def unpublish(self, set_expired=False, commit=True, user=None, log_action=True): 1057 """ 1058 Unpublish the page by setting ``live`` to ``False``. Does nothing if ``live`` is already ``False`` 1059 :param log_action: flag for logging the action. Pass False to skip logging. Can be passed an action string. 1060 Defaults to 'wagtail.unpublish' 1061 """ 1062 if self.live: 1063 self.live = False 1064 self.has_unpublished_changes = True 1065 self.live_revision = None 1066 1067 if set_expired: 1068 self.expired = True 1069 1070 if commit: 1071 # using clean=False to bypass validation 1072 self.save(clean=False) 1073 1074 page_unpublished.send(sender=self.specific_class, instance=self.specific) 1075 1076 if log_action: 1077 PageLogEntry.objects.log_action( 1078 instance=self, 1079 action=log_action if isinstance(log_action, str) else 'wagtail.unpublish', 1080 user=user, 1081 ) 1082 1083 logger.info("Page unpublished: \"%s\" id=%d", self.title, self.id) 1084 1085 self.revisions.update(approved_go_live_at=None) 1086 1087 # Unpublish aliases 1088 for alias in self.aliases.all(): 1089 alias.unpublish() 1090 1091 context_object_name = None 1092 1093 def get_context(self, request, *args, **kwargs): 1094 context = { 1095 PAGE_TEMPLATE_VAR: self, 1096 'self': self, 1097 'request': request, 1098 } 1099 1100 if self.context_object_name: 1101 context[self.context_object_name] = self 1102 1103 return context 1104 1105 def get_template(self, request, *args, **kwargs): 1106 if request.is_ajax(): 1107 return self.ajax_template or self.template 1108 else: 1109 return self.template 1110 1111 def serve(self, request, *args, **kwargs): 1112 request.is_preview = getattr(request, 'is_preview', False) 1113 1114 return TemplateResponse( 1115 request, 1116 self.get_template(request, *args, **kwargs), 1117 self.get_context(request, *args, **kwargs) 1118 ) 1119 1120 def is_navigable(self): 1121 """ 1122 Return true if it's meaningful to browse subpages of this page - 1123 i.e. it currently has subpages, 1124 or it's at the top level (this rule necessary for empty out-of-the-box sites to have working navigation) 1125 """ 1126 return (not self.is_leaf()) or self.depth == 2 1127 1128 def _get_site_root_paths(self, request=None): 1129 """ 1130 Return ``Site.get_site_root_paths()``, using the cached copy on the 1131 request object if available. 1132 """ 1133 # if we have a request, use that to cache site_root_paths; otherwise, use self 1134 cache_object = request if request else self 1135 try: 1136 return cache_object._wagtail_cached_site_root_paths 1137 except AttributeError: 1138 cache_object._wagtail_cached_site_root_paths = Site.get_site_root_paths() 1139 return cache_object._wagtail_cached_site_root_paths 1140 1141 def get_url_parts(self, request=None): 1142 """ 1143 Determine the URL for this page and return it as a tuple of 1144 ``(site_id, site_root_url, page_url_relative_to_site_root)``. 1145 Return None if the page is not routable. 1146 1147 This is used internally by the ``full_url``, ``url``, ``relative_url`` 1148 and ``get_site`` properties and methods; pages with custom URL routing 1149 should override this method in order to have those operations return 1150 the custom URLs. 1151 1152 Accepts an optional keyword argument ``request``, which may be used 1153 to avoid repeated database / cache lookups. Typically, a page model 1154 that overrides ``get_url_parts`` should not need to deal with 1155 ``request`` directly, and should just pass it to the original method 1156 when calling ``super``. 1157 """ 1158 1159 possible_sites = [ 1160 (pk, path, url, language_code) 1161 for pk, path, url, language_code in self._get_site_root_paths(request) 1162 if self.url_path.startswith(path) 1163 ] 1164 1165 if not possible_sites: 1166 return None 1167 1168 site_id, root_path, root_url, language_code = possible_sites[0] 1169 1170 site = Site.find_for_request(request) 1171 if site: 1172 for site_id, root_path, root_url, language_code in possible_sites: 1173 if site_id == site.pk: 1174 break 1175 else: 1176 site_id, root_path, root_url, language_code = possible_sites[0] 1177 1178 use_wagtail_i18n = getattr(settings, 'WAGTAIL_I18N_ENABLED', False) 1179 1180 if use_wagtail_i18n: 1181 # If the active language code is a variant of the page's language, then 1182 # use that instead 1183 # This is used when LANGUAGES contain more languages than WAGTAIL_CONTENT_LANGUAGES 1184 try: 1185 if get_supported_content_language_variant(translation.get_language()) == language_code: 1186 language_code = translation.get_language() 1187 except LookupError: 1188 # active language code is not a recognised content language, so leave 1189 # page's language code unchanged 1190 pass 1191 1192 # The page may not be routable because wagtail_serve is not registered 1193 # This may be the case if Wagtail is used headless 1194 try: 1195 if use_wagtail_i18n: 1196 with translation.override(language_code): 1197 page_path = reverse( 1198 'wagtail_serve', args=(self.url_path[len(root_path):],)) 1199 else: 1200 page_path = reverse( 1201 'wagtail_serve', args=(self.url_path[len(root_path):],)) 1202 except NoReverseMatch: 1203 return (site_id, None, None) 1204 1205 # Remove the trailing slash from the URL reverse generates if 1206 # WAGTAIL_APPEND_SLASH is False and we're not trying to serve 1207 # the root path 1208 if not WAGTAIL_APPEND_SLASH and page_path != '/': 1209 page_path = page_path.rstrip('/') 1210 1211 return (site_id, root_url, page_path) 1212 1213 def get_full_url(self, request=None): 1214 """Return the full URL (including protocol / domain) to this page, or None if it is not routable""" 1215 url_parts = self.get_url_parts(request=request) 1216 1217 if url_parts is None or url_parts[1] is None and url_parts[2] is None: 1218 # page is not routable 1219 return 1220 1221 site_id, root_url, page_path = url_parts 1222 1223 return root_url + page_path 1224 1225 full_url = property(get_full_url) 1226 1227 def get_url(self, request=None, current_site=None): 1228 """ 1229 Return the 'most appropriate' URL for referring to this page from the pages we serve, 1230 within the Wagtail backend and actual website templates; 1231 this is the local URL (starting with '/') if we're only running a single site 1232 (i.e. we know that whatever the current page is being served from, this link will be on the 1233 same domain), and the full URL (with domain) if not. 1234 Return None if the page is not routable. 1235 1236 Accepts an optional but recommended ``request`` keyword argument that, if provided, will 1237 be used to cache site-level URL information (thereby avoiding repeated database / cache 1238 lookups) and, via the ``Site.find_for_request()`` function, determine whether a relative 1239 or full URL is most appropriate. 1240 """ 1241 # ``current_site`` is purposefully undocumented, as one can simply pass the request and get 1242 # a relative URL based on ``Site.find_for_request()``. Nonetheless, support it here to avoid 1243 # copy/pasting the code to the ``relative_url`` method below. 1244 if current_site is None and request is not None: 1245 site = Site.find_for_request(request) 1246 current_site = site 1247 url_parts = self.get_url_parts(request=request) 1248 1249 if url_parts is None or url_parts[1] is None and url_parts[2] is None: 1250 # page is not routable 1251 return 1252 1253 site_id, root_url, page_path = url_parts 1254 1255 # Get number of unique sites in root paths 1256 # Note: there may be more root paths to sites if there are multiple languages 1257 num_sites = len(set(root_path[0] for root_path in self._get_site_root_paths(request))) 1258 1259 if (current_site is not None and site_id == current_site.id) or num_sites == 1: 1260 # the site matches OR we're only running a single site, so a local URL is sufficient 1261 return page_path 1262 else: 1263 return root_url + page_path 1264 1265 url = property(get_url) 1266 1267 def relative_url(self, current_site, request=None): 1268 """ 1269 Return the 'most appropriate' URL for this page taking into account the site we're currently on; 1270 a local URL if the site matches, or a fully qualified one otherwise. 1271 Return None if the page is not routable. 1272 1273 Accepts an optional but recommended ``request`` keyword argument that, if provided, will 1274 be used to cache site-level URL information (thereby avoiding repeated database / cache 1275 lookups). 1276 """ 1277 return self.get_url(request=request, current_site=current_site) 1278 1279 def get_site(self): 1280 """ 1281 Return the Site object that this page belongs to. 1282 """ 1283 1284 url_parts = self.get_url_parts() 1285 1286 if url_parts is None: 1287 # page is not routable 1288 return 1289 1290 site_id, root_url, page_path = url_parts 1291 1292 return Site.objects.get(id=site_id) 1293 1294 @classmethod 1295 def get_indexed_objects(cls): 1296 content_type = ContentType.objects.get_for_model(cls) 1297 return super(Page, cls).get_indexed_objects().filter(content_type=content_type) 1298 1299 def get_indexed_instance(self): 1300 # This is accessed on save by the wagtailsearch signal handler, and in edge 1301 # cases (e.g. loading test fixtures), may be called before the specific instance's 1302 # entry has been created. In those cases, we aren't ready to be indexed yet, so 1303 # return None. 1304 try: 1305 return self.specific 1306 except self.specific_class.DoesNotExist: 1307 return None 1308 1309 @classmethod 1310 def clean_subpage_models(cls): 1311 """ 1312 Returns the list of subpage types, normalised as model classes. 1313 Throws ValueError if any entry in subpage_types cannot be recognised as a model name, 1314 or LookupError if a model does not exist (or is not a Page subclass). 1315 """ 1316 if cls._clean_subpage_models is None: 1317 subpage_types = getattr(cls, 'subpage_types', None) 1318 if subpage_types is None: 1319 # if subpage_types is not specified on the Page class, allow all page types as subpages 1320 cls._clean_subpage_models = get_page_models() 1321 else: 1322 cls._clean_subpage_models = [ 1323 resolve_model_string(model_string, cls._meta.app_label) 1324 for model_string in subpage_types 1325 ] 1326 1327 for model in cls._clean_subpage_models: 1328 if not issubclass(model, Page): 1329 raise LookupError("%s is not a Page subclass" % model) 1330 1331 return cls._clean_subpage_models 1332 1333 @classmethod 1334 def clean_parent_page_models(cls): 1335 """ 1336 Returns the list of parent page types, normalised as model classes. 1337 Throws ValueError if any entry in parent_page_types cannot be recognised as a model name, 1338 or LookupError if a model does not exist (or is not a Page subclass). 1339 """ 1340 1341 if cls._clean_parent_page_models is None: 1342 parent_page_types = getattr(cls, 'parent_page_types', None) 1343 if parent_page_types is None: 1344 # if parent_page_types is not specified on the Page class, allow all page types as subpages 1345 cls._clean_parent_page_models = get_page_models() 1346 else: 1347 cls._clean_parent_page_models = [ 1348 resolve_model_string(model_string, cls._meta.app_label) 1349 for model_string in parent_page_types 1350 ] 1351 1352 for model in cls._clean_parent_page_models: 1353 if not issubclass(model, Page): 1354 raise LookupError("%s is not a Page subclass" % model) 1355 1356 return cls._clean_parent_page_models 1357 1358 @classmethod 1359 def allowed_parent_page_models(cls): 1360 """ 1361 Returns the list of page types that this page type can be a subpage of, 1362 as a list of model classes 1363 """ 1364 return [ 1365 parent_model for parent_model in cls.clean_parent_page_models() 1366 if cls in parent_model.clean_subpage_models() 1367 ] 1368 1369 @classmethod 1370 def allowed_subpage_models(cls): 1371 """ 1372 Returns the list of page types that this page type can have as subpages, 1373 as a list of model classes 1374 """ 1375 return [ 1376 subpage_model for subpage_model in cls.clean_subpage_models() 1377 if cls in subpage_model.clean_parent_page_models() 1378 ] 1379 1380 @classmethod 1381 def creatable_subpage_models(cls): 1382 """ 1383 Returns the list of page types that may be created under this page type, 1384 as a list of model classes 1385 """ 1386 return [ 1387 page_model for page_model in cls.allowed_subpage_models() 1388 if page_model.is_creatable 1389 ] 1390 1391 @classmethod 1392 def can_exist_under(cls, parent): 1393 """ 1394 Checks if this page type can exist as a subpage under a parent page 1395 instance. 1396 1397 See also: :func:`Page.can_create_at` and :func:`Page.can_move_to` 1398 """ 1399 return cls in parent.specific_class.allowed_subpage_models() 1400 1401 @classmethod 1402 def can_create_at(cls, parent): 1403 """ 1404 Checks if this page type can be created as a subpage under a parent 1405 page instance. 1406 """ 1407 can_create = cls.is_creatable and cls.can_exist_under(parent) 1408 1409 if cls.max_count is not None: 1410 can_create = can_create and cls.objects.count() < cls.max_count 1411 1412 if cls.max_count_per_parent is not None: 1413 can_create = can_create and parent.get_children().type(cls).count() < cls.max_count_per_parent 1414 1415 return can_create 1416 1417 def can_move_to(self, parent): 1418 """ 1419 Checks if this page instance can be moved to be a subpage of a parent 1420 page instance. 1421 """ 1422 # Prevent pages from being moved to different language sections 1423 # The only page that can have multi-lingual children is the root page 1424 parent_is_root = parent.depth == 1 1425 if not parent_is_root and parent.locale_id != self.locale_id: 1426 return False 1427 1428 return self.can_exist_under(parent) 1429 1430 @classmethod 1431 def get_verbose_name(cls): 1432 """ 1433 Returns the human-readable "verbose name" of this page model e.g "Blog page". 1434 """ 1435 # This is similar to doing cls._meta.verbose_name.title() 1436 # except this doesn't convert any characters to lowercase 1437 return capfirst(cls._meta.verbose_name) 1438 1439 @property 1440 def status_string(self): 1441 if not self.live: 1442 if self.expired: 1443 return _("expired") 1444 elif self.approved_schedule: 1445 return _("scheduled") 1446 elif self.workflow_in_progress: 1447 return _("in moderation") 1448 else: 1449 return _("draft") 1450 else: 1451 if self.approved_schedule: 1452 return _("live + scheduled") 1453 elif self.workflow_in_progress: 1454 return _("live + in moderation") 1455 elif self.has_unpublished_changes: 1456 return _("live + draft") 1457 else: 1458 return _("live") 1459 1460 @property 1461 def approved_schedule(self): 1462 # `_approved_schedule` may be populated by `annotate_approved_schedule` on `PageQuerySet` as a 1463 # performance optimisation 1464 if hasattr(self, "_approved_schedule"): 1465 return self._approved_schedule 1466 1467 return self.revisions.exclude(approved_go_live_at__isnull=True).exists() 1468 1469 def has_unpublished_subtree(self): 1470 """ 1471 An awkwardly-defined flag used in determining whether unprivileged editors have 1472 permission to delete this article. Returns true if and only if this page is non-live, 1473 and it has no live children. 1474 """ 1475 return (not self.live) and (not self.get_descendants().filter(live=True).exists()) 1476 1477 def move(self, target, pos=None, user=None): 1478 """ 1479 Extension to the treebeard 'move' method to ensure that url_path is updated, 1480 and to emit a 'pre_page_move' and 'post_page_move' signals. 1481 """ 1482 # Determine old and new parents 1483 parent_before = self.get_parent() 1484 if pos in ('first-child', 'last-child', 'sorted-child'): 1485 parent_after = target 1486 else: 1487 parent_after = target.get_parent() 1488 1489 # Determine old and new url_paths 1490 # Fetching new object to avoid affecting `self` 1491 old_self = Page.objects.get(id=self.id) 1492 old_url_path = old_self.url_path 1493 new_url_path = old_self.set_url_path(parent=parent_after) 1494 1495 # Emit pre_page_move signal 1496 pre_page_move.send( 1497 sender=self.specific_class or self.__class__, 1498 instance=self, 1499 parent_page_before=parent_before, 1500 parent_page_after=parent_after, 1501 url_path_before=old_url_path, 1502 url_path_after=new_url_path, 1503 ) 1504 1505 # Only commit when all descendants are properly updated 1506 with transaction.atomic(): 1507 # Allow treebeard to update `path` values 1508 super().move(target, pos=pos) 1509 1510 # Treebeard's move method doesn't actually update the in-memory instance, 1511 # so we need to work with a freshly loaded one now 1512 new_self = Page.objects.get(id=self.id) 1513 new_self.url_path = new_url_path 1514 new_self.save() 1515 1516 # Update descendant paths if url_path has changed 1517 if old_url_path != new_url_path: 1518 new_self._update_descendant_url_paths(old_url_path, new_url_path) 1519 1520 # Emit post_page_move signal 1521 post_page_move.send( 1522 sender=self.specific_class or self.__class__, 1523 instance=new_self, 1524 parent_page_before=parent_before, 1525 parent_page_after=parent_after, 1526 url_path_before=old_url_path, 1527 url_path_after=new_url_path, 1528 ) 1529 1530 # Log 1531 PageLogEntry.objects.log_action( 1532 instance=self, 1533 # Check if page was reordered (reordering doesn't change the parent) 1534 action='wagtail.reorder' if parent_before.id == target.id else 'wagtail.move', 1535 user=user, 1536 data={ 1537 'source': { 1538 'id': parent_before.id, 1539 'title': parent_before.specific_deferred.get_admin_display_title() 1540 }, 1541 'destination': { 1542 'id': parent_after.id, 1543 'title': parent_after.specific_deferred.get_admin_display_title() 1544 } 1545 } 1546 ) 1547 logger.info("Page moved: \"%s\" id=%d path=%s", self.title, self.id, new_url_path) 1548 1549 def copy(self, recursive=False, to=None, update_attrs=None, copy_revisions=True, keep_live=True, user=None, 1550 process_child_object=None, exclude_fields=None, log_action='wagtail.copy', reset_translation_key=True, _mpnode_attrs=None): 1551 """ 1552 Copies a given page 1553 :param log_action flag for logging the action. Pass None to skip logging. 1554 Can be passed an action string. Defaults to 'wagtail.copy' 1555 """ 1556 1557 if self._state.adding: 1558 raise RuntimeError('Page.copy() called on an unsaved page') 1559 1560 exclude_fields = self.default_exclude_fields_in_copy + self.exclude_fields_in_copy + (exclude_fields or []) 1561 specific_self = self.specific 1562 if keep_live: 1563 base_update_attrs = { 1564 'alias_of': None, 1565 } 1566 else: 1567 base_update_attrs = { 1568 'live': False, 1569 'has_unpublished_changes': True, 1570 'live_revision': None, 1571 'first_published_at': None, 1572 'last_published_at': None, 1573 'alias_of': None, 1574 } 1575 1576 if user: 1577 base_update_attrs['owner'] = user 1578 1579 # When we're not copying for translation, we should give the translation_key a new value 1580 if reset_translation_key: 1581 base_update_attrs['translation_key'] = uuid.uuid4() 1582 1583 if update_attrs: 1584 base_update_attrs.update(update_attrs) 1585 1586 page_copy, child_object_map = _copy(specific_self, exclude_fields=exclude_fields, update_attrs=base_update_attrs) 1587 1588 # Save copied child objects and run process_child_object on them if we need to 1589 for (child_relation, old_pk), child_object in child_object_map.items(): 1590 if process_child_object: 1591 process_child_object(specific_self, page_copy, child_relation, child_object) 1592 1593 # When we're not copying for translation, we should give the translation_key a new value for each child object as well 1594 if reset_translation_key and isinstance(child_object, TranslatableMixin): 1595 child_object.translation_key = uuid.uuid4() 1596 1597 # Save the new page 1598 if _mpnode_attrs: 1599 # We've got a tree position already reserved. Perform a quick save 1600 page_copy.path = _mpnode_attrs[0] 1601 page_copy.depth = _mpnode_attrs[1] 1602 page_copy.save(clean=False) 1603 1604 else: 1605 if to: 1606 if recursive and (to == self or to.is_descendant_of(self)): 1607 raise Exception("You cannot copy a tree branch recursively into itself") 1608 page_copy = to.add_child(instance=page_copy) 1609 else: 1610 page_copy = self.add_sibling(instance=page_copy) 1611 1612 _mpnode_attrs = (page_copy.path, page_copy.depth) 1613 1614 _copy_m2m_relations(specific_self, page_copy, exclude_fields=exclude_fields, update_attrs=base_update_attrs) 1615 1616 # Copy revisions 1617 if copy_revisions: 1618 for revision in self.revisions.all(): 1619 revision.pk = None 1620 revision.submitted_for_moderation = False 1621 revision.approved_go_live_at = None 1622 revision.page = page_copy 1623 1624 # Update ID fields in content 1625 revision_content = json.loads(revision.content_json) 1626 revision_content['pk'] = page_copy.pk 1627 1628 for child_relation in get_all_child_relations(specific_self): 1629 accessor_name = child_relation.get_accessor_name() 1630 try: 1631 child_objects = revision_content[accessor_name] 1632 except KeyError: 1633 # KeyErrors are possible if the revision was created 1634 # before this child relation was added to the database 1635 continue 1636 1637 for child_object in child_objects: 1638 child_object[child_relation.field.name] = page_copy.pk 1639 1640 # Remap primary key to copied versions 1641 # If the primary key is not recognised (eg, the child object has been deleted from the database) 1642 # set the primary key to None 1643 copied_child_object = child_object_map.get((child_relation, child_object['pk'])) 1644 child_object['pk'] = copied_child_object.pk if copied_child_object else None 1645 1646 revision.content_json = json.dumps(revision_content) 1647 1648 # Save 1649 revision.save() 1650 1651 # Create a new revision 1652 # This code serves a few purposes: 1653 # * It makes sure update_attrs gets applied to the latest revision 1654 # * It bumps the last_revision_created_at value so the new page gets ordered as if it was just created 1655 # * It sets the user of the new revision so it's possible to see who copied the page by looking at its history 1656 latest_revision = page_copy.get_latest_revision_as_page() 1657 1658 if update_attrs: 1659 for field, value in update_attrs.items(): 1660 setattr(latest_revision, field, value) 1661 1662 latest_revision_as_page_revision = latest_revision.save_revision(user=user, changed=False, clean=False) 1663 if keep_live: 1664 page_copy.live_revision = latest_revision_as_page_revision 1665 page_copy.last_published_at = latest_revision_as_page_revision.created_at 1666 page_copy.first_published_at = latest_revision_as_page_revision.created_at 1667 page_copy.save(clean=False) 1668 1669 if page_copy.live: 1670 page_published.send( 1671 sender=page_copy.specific_class, instance=page_copy, 1672 revision=latest_revision_as_page_revision 1673 ) 1674 1675 # Log 1676 if log_action: 1677 parent = specific_self.get_parent() 1678 PageLogEntry.objects.log_action( 1679 instance=page_copy, 1680 action=log_action, 1681 user=user, 1682 data={ 1683 'page': { 1684 'id': page_copy.id, 1685 'title': page_copy.get_admin_display_title() 1686 }, 1687 'source': {'id': parent.id, 'title': parent.specific_deferred.get_admin_display_title()} if parent else None, 1688 'destination': {'id': to.id, 'title': to.specific_deferred.get_admin_display_title()} if to else None, 1689 'keep_live': page_copy.live and keep_live 1690 }, 1691 ) 1692 if page_copy.live and keep_live: 1693 # Log the publish if the use chose to keep the copied page live 1694 PageLogEntry.objects.log_action( 1695 instance=page_copy, 1696 action='wagtail.publish', 1697 user=user, 1698 revision=latest_revision_as_page_revision, 1699 ) 1700 logger.info("Page copied: \"%s\" id=%d from=%d", page_copy.title, page_copy.id, self.id) 1701 1702 # Copy child pages 1703 if recursive: 1704 numchild = 0 1705 1706 for child_page in self.get_children().specific(): 1707 newdepth = _mpnode_attrs[1] + 1 1708 child_mpnode_attrs = ( 1709 Page._get_path(_mpnode_attrs[0], newdepth, numchild), 1710 newdepth 1711 ) 1712 numchild += 1 1713 child_page.copy( 1714 recursive=True, 1715 to=page_copy, 1716 copy_revisions=copy_revisions, 1717 keep_live=keep_live, 1718 user=user, 1719 process_child_object=process_child_object, 1720 _mpnode_attrs=child_mpnode_attrs 1721 ) 1722 1723 if numchild > 0: 1724 page_copy.numchild = numchild 1725 page_copy.save(clean=False, update_fields=['numchild']) 1726 1727 return page_copy 1728 1729 copy.alters_data = True 1730 1731 def create_alias(self, *, recursive=False, parent=None, update_slug=None, update_locale=None, user=None, log_action='wagtail.create_alias', reset_translation_key=True, _mpnode_attrs=None): 1732 """ 1733 Creates an alias of the given page. 1734 1735 An alias is like a copy, but an alias remains in sync with the original page. They 1736 are not directly editable and do not have revisions. 1737 1738 You can convert an alias into a regular page by setting the .alias_of attibute to None 1739 and creating an initial revision. 1740 1741 :param recursive: create aliases of the page's subtree, defaults to False 1742 :type recursive: boolean, optional 1743 :param parent: The page to create the new alias under 1744 :type parent: Page, optional 1745 :param update_slug: The slug of the new alias page, defaults to the slug of the original page 1746 :type update_slug: string, optional 1747 :param update_locale: The locale of the new alias page, defaults to the locale of the original page 1748 :type update_locale: Locale, optional 1749 :param user: The user who is performing this action. This user would be assigned as the owner of the new page and appear in the audit log 1750 :type user: User, optional 1751 :param log_action: Override the log action with a custom one. or pass None to skip logging, defaults to 'wagtail.create_alias' 1752 :type log_action: string or None, optional 1753 :param reset_translation_key: Generate new translation_keys for the page and any translatable child objects, defaults to False 1754 :type reset_translation_key: boolean, optional 1755 """ 1756 specific_self = self.specific 1757 1758 # FIXME: Switch to the same fields that are excluded from copy 1759 # We can't do this right now because we can't exclude fields from with_content_json 1760 # which we use for updating aliases 1761 exclude_fields = ['id', 'path', 'depth', 'numchild', 'url_path', 'path', 'index_entries'] 1762 1763 update_attrs = { 1764 'alias_of': self, 1765 1766 # Aliases don't have revisions so the draft title should always match the live title 1767 'draft_title': self.title, 1768 1769 # Likewise, an alias page can't have unpublished changes if it's live 1770 'has_unpublished_changes': not self.live, 1771 } 1772 1773 if update_slug: 1774 update_attrs['slug'] = update_slug 1775 1776 if update_locale: 1777 update_attrs['locale'] = update_locale 1778 1779 if user: 1780 update_attrs['owner'] = user 1781 1782 # When we're not copying for translation, we should give the translation_key a new value 1783 if reset_translation_key: 1784 update_attrs['translation_key'] = uuid.uuid4() 1785 1786 alias, child_object_map = _copy(specific_self, update_attrs=update_attrs, exclude_fields=exclude_fields) 1787 1788 # Update any translatable child objects 1789 for (child_relation, old_pk), child_object in child_object_map.items(): 1790 if isinstance(child_object, TranslatableMixin): 1791 if update_locale: 1792 child_object.locale = update_locale 1793 1794 # When we're not copying for translation, we should give the translation_key a new value for each child object as well 1795 if reset_translation_key: 1796 child_object.translation_key = uuid.uuid4() 1797 1798 # Save the new page 1799 if _mpnode_attrs: 1800 # We've got a tree position already reserved. Perform a quick save 1801 alias.path = _mpnode_attrs[0] 1802 alias.depth = _mpnode_attrs[1] 1803 alias.save(clean=False) 1804 1805 else: 1806 if parent: 1807 if recursive and (parent == self or parent.is_descendant_of(self)): 1808 raise Exception("You cannot copy a tree branch recursively into itself") 1809 alias = parent.add_child(instance=alias) 1810 else: 1811 alias = self.add_sibling(instance=alias) 1812 1813 _mpnode_attrs = (alias.path, alias.depth) 1814 1815 _copy_m2m_relations(specific_self, alias, exclude_fields=exclude_fields) 1816 1817 # Log 1818 if log_action: 1819 source_parent = specific_self.get_parent() 1820 PageLogEntry.objects.log_action( 1821 instance=alias, 1822 action=log_action, 1823 user=user, 1824 data={ 1825 'page': { 1826 'id': alias.id, 1827 'title': alias.get_admin_display_title() 1828 }, 1829 'source': {'id': source_parent.id, 'title': source_parent.specific_deferred.get_admin_display_title()} if source_parent else None, 1830 'destination': {'id': parent.id, 'title': parent.specific_deferred.get_admin_display_title()} if parent else None, 1831 }, 1832 ) 1833 if alias.live: 1834 # Log the publish 1835 PageLogEntry.objects.log_action( 1836 instance=alias, 1837 action='wagtail.publish', 1838 user=user, 1839 ) 1840 1841 logger.info("Page alias created: \"%s\" id=%d from=%d", alias.title, alias.id, self.id) 1842 1843 # Copy child pages 1844 if recursive: 1845 numchild = 0 1846 1847 for child_page in self.get_children().specific(): 1848 newdepth = _mpnode_attrs[1] + 1 1849 child_mpnode_attrs = ( 1850 Page._get_path(_mpnode_attrs[0], newdepth, numchild), 1851 newdepth 1852 ) 1853 numchild += 1 1854 child_page.create_alias( 1855 recursive=True, 1856 parent=alias, 1857 update_locale=update_locale, 1858 user=user, 1859 log_action=log_action, 1860 reset_translation_key=reset_translation_key, 1861 _mpnode_attrs=child_mpnode_attrs 1862 ) 1863 1864 if numchild > 0: 1865 alias.numchild = numchild 1866 alias.save(clean=False, update_fields=['numchild']) 1867 1868 return alias 1869 1870 create_alias.alters_data = True 1871 1872 @transaction.atomic 1873 def copy_for_translation(self, locale, copy_parents=False, alias=False, exclude_fields=None): 1874 """ 1875 Creates a copy of this page in the specified locale. 1876 1877 The new page will be created in draft as a child of this page's translated 1878 parent. 1879 1880 For example, if you are translating a blog post from English into French, 1881 this method will look for the French version of the blog index and create 1882 the French translation of the blog post under that. 1883 1884 If this page's parent is not translated into the locale, then a ``ParentNotTranslatedError`` 1885 is raised. You can circumvent this error by passing ``copy_parents=True`` which 1886 copies any parents that are not translated yet. 1887 1888 The ``exclude_fields`` parameter can be used to set any fields to a blank value 1889 in the copy. 1890 1891 Note that this method calls the ``.copy()`` method internally so any fields that 1892 are excluded in ``.exclude_fields_in_copy`` will be excluded from the translation. 1893 """ 1894 # Find the translated version of the parent page to create the new page under 1895 parent = self.get_parent().specific 1896 slug = self.slug 1897 1898 if not parent.is_root(): 1899 try: 1900 translated_parent = parent.get_translation(locale) 1901 except parent.__class__.DoesNotExist: 1902 if not copy_parents: 1903 raise ParentNotTranslatedError 1904 1905 translated_parent = parent.copy_for_translation( 1906 locale, copy_parents=True, alias=True 1907 ) 1908 else: 1909 # Don't duplicate the root page for translation. Create new locale as a sibling 1910 translated_parent = parent 1911 1912 # Append language code to slug as the new page 1913 # will be created in the same section as the existing one 1914 slug += "-" + locale.language_code 1915 1916 # Find available slug for new page 1917 slug = find_available_slug(translated_parent, slug) 1918 1919 if alias: 1920 return self.create_alias( 1921 parent=translated_parent, 1922 update_slug=slug, 1923 update_locale=locale, 1924 reset_translation_key=False, 1925 ) 1926 1927 else: 1928 # Update locale on translatable child objects as well 1929 def process_child_object( 1930 original_page, page_copy, child_relation, child_object 1931 ): 1932 if isinstance(child_object, TranslatableMixin): 1933 child_object.locale = locale 1934 1935 return self.copy( 1936 to=translated_parent, 1937 update_attrs={ 1938 "locale": locale, 1939 "slug": slug, 1940 }, 1941 copy_revisions=False, 1942 keep_live=False, 1943 reset_translation_key=False, 1944 process_child_object=process_child_object, 1945 exclude_fields=exclude_fields, 1946 ) 1947 1948 copy_for_translation.alters_data = True 1949 1950 def permissions_for_user(self, user): 1951 """ 1952 Return a PagePermissionsTester object defining what actions the user can perform on this page 1953 """ 1954 user_perms = UserPagePermissionsProxy(user) 1955 return user_perms.for_page(self) 1956 1957 def make_preview_request(self, original_request=None, preview_mode=None, extra_request_attrs=None): 1958 """ 1959 Simulate a request to this page, by constructing a fake HttpRequest object that is (as far 1960 as possible) representative of a real request to this page's front-end URL, and invoking 1961 serve_preview with that request (and the given preview_mode). 1962 1963 Used for previewing / moderation and any other place where we 1964 want to display a view of this page in the admin interface without going through the regular 1965 page routing logic. 1966 1967 If you pass in a real request object as original_request, additional information (e.g. client IP, cookies) 1968 will be included in the dummy request. 1969 """ 1970 dummy_meta = self._get_dummy_headers(original_request) 1971 request = WSGIRequest(dummy_meta) 1972 1973 # Add a flag to let middleware know that this is a dummy request. 1974 request.is_dummy = True 1975 1976 if extra_request_attrs: 1977 for k, v in extra_request_attrs.items(): 1978 setattr(request, k, v) 1979 1980 page = self 1981 1982 # Build a custom django.core.handlers.BaseHandler subclass that invokes serve_preview as 1983 # the eventual view function called at the end of the middleware chain, rather than going 1984 # through the URL resolver 1985 class Handler(BaseHandler): 1986 def _get_response(self, request): 1987 response = page.serve_preview(request, preview_mode) 1988 if hasattr(response, 'render') and callable(response.render): 1989 response = response.render() 1990 return response 1991 1992 # Invoke this custom handler. 1993 handler = Handler() 1994 handler.load_middleware() 1995 return handler.get_response(request) 1996 1997 def _get_dummy_headers(self, original_request=None): 1998 """ 1999 Return a dict of META information to be included in a faked HttpRequest object to pass to 2000 serve_preview. 2001 """ 2002 url = self._get_dummy_header_url(original_request) 2003 if url: 2004 url_info = urlparse(url) 2005 hostname = url_info.hostname 2006 path = url_info.path 2007 port = url_info.port or (443 if url_info.scheme == 'https' else 80) 2008 scheme = url_info.scheme 2009 else: 2010 # Cannot determine a URL to this page - cobble one together based on 2011 # whatever we find in ALLOWED_HOSTS 2012 try: 2013 hostname = settings.ALLOWED_HOSTS[0] 2014 if hostname == '*': 2015 # '*' is a valid value to find in ALLOWED_HOSTS[0], but it's not a valid domain name. 2016 # So we pretend it isn't there. 2017 raise IndexError 2018 except IndexError: 2019 hostname = 'localhost' 2020 path = '/' 2021 port = 80 2022 scheme = 'http' 2023 2024 http_host = hostname 2025 if port != (443 if scheme == 'https' else 80): 2026 http_host = '%s:%s' % (http_host, port) 2027 dummy_values = { 2028 'REQUEST_METHOD': 'GET', 2029 'PATH_INFO': path, 2030 'SERVER_NAME': hostname, 2031 'SERVER_PORT': port, 2032 'SERVER_PROTOCOL': 'HTTP/1.1', 2033 'HTTP_HOST': http_host, 2034 'wsgi.version': (1, 0), 2035 'wsgi.input': StringIO(), 2036 'wsgi.errors': StringIO(), 2037 'wsgi.url_scheme': scheme, 2038 'wsgi.multithread': True, 2039 'wsgi.multiprocess': True, 2040 'wsgi.run_once': False, 2041 } 2042 2043 # Add important values from the original request object, if it was provided. 2044 HEADERS_FROM_ORIGINAL_REQUEST = [ 2045 'REMOTE_ADDR', 'HTTP_X_FORWARDED_FOR', 'HTTP_COOKIE', 'HTTP_USER_AGENT', 'HTTP_AUTHORIZATION', 2046 'wsgi.version', 'wsgi.multithread', 'wsgi.multiprocess', 'wsgi.run_once', 2047 ] 2048 if settings.SECURE_PROXY_SSL_HEADER: 2049 HEADERS_FROM_ORIGINAL_REQUEST.append(settings.SECURE_PROXY_SSL_HEADER[0]) 2050 if original_request: 2051 for header in HEADERS_FROM_ORIGINAL_REQUEST: 2052 if header in original_request.META: 2053 dummy_values[header] = original_request.META[header] 2054 2055 return dummy_values 2056 2057 def _get_dummy_header_url(self, original_request=None): 2058 """ 2059 Return the URL that _get_dummy_headers() should use to set META headers 2060 for the faked HttpRequest. 2061 """ 2062 return self.full_url 2063 2064 DEFAULT_PREVIEW_MODES = [('', _('Default'))] 2065 2066 @property 2067 def preview_modes(self): 2068 """ 2069 A list of (internal_name, display_name) tuples for the modes in which 2070 this page can be displayed for preview/moderation purposes. Ordinarily a page 2071 will only have one display mode, but subclasses of Page can override this - 2072 for example, a page containing a form might have a default view of the form, 2073 and a post-submission 'thank you' page 2074 """ 2075 return Page.DEFAULT_PREVIEW_MODES 2076 2077 @property 2078 def default_preview_mode(self): 2079 """ 2080 The preview mode to use in workflows that do not give the user the option of selecting a 2081 mode explicitly, e.g. moderator approval. Will raise IndexError if preview_modes is empty 2082 """ 2083 return self.preview_modes[0][0] 2084 2085 def is_previewable(self): 2086 """Returns True if at least one preview mode is specified""" 2087 # It's possible that this will be called from a listing page using a plain Page queryset - 2088 # if so, checking self.preview_modes would incorrectly give us the default set from 2089 # Page.preview_modes. However, accessing self.specific.preview_modes would result in an N+1 2090 # query problem. To avoid this (at least in the general case), we'll call .specific only if 2091 # a check of the property at the class level indicates that preview_modes has been 2092 # overridden from whatever type we're currently in. 2093 page = self 2094 if page.specific_class.preview_modes != type(page).preview_modes: 2095 page = page.specific 2096 2097 return bool(page.preview_modes) 2098 2099 def serve_preview(self, request, mode_name): 2100 """ 2101 Return an HTTP response for use in page previews. Normally this would be equivalent 2102 to self.serve(request), since we obviously want the preview to be indicative of how 2103 it looks on the live site. However, there are a couple of cases where this is not 2104 appropriate, and custom behaviour is required: 2105 2106 1) The page has custom routing logic that derives some additional required 2107 args/kwargs to be passed to serve(). The routing mechanism is bypassed when 2108 previewing, so there's no way to know what args we should pass. In such a case, 2109 the page model needs to implement its own version of serve_preview. 2110 2111 2) The page has several different renderings that we would like to be able to see 2112 when previewing - for example, a form page might have one rendering that displays 2113 the form, and another rendering to display a landing page when the form is posted. 2114 This can be done by setting a custom preview_modes list on the page model - 2115 Wagtail will allow the user to specify one of those modes when previewing, and 2116 pass the chosen mode_name to serve_preview so that the page model can decide how 2117 to render it appropriately. (Page models that do not specify their own preview_modes 2118 list will always receive an empty string as mode_name.) 2119 2120 Any templates rendered during this process should use the 'request' object passed 2121 here - this ensures that request.user and other properties are set appropriately for 2122 the wagtail user bar to be displayed. This request will always be a GET. 2123 """ 2124 request.is_preview = True 2125 2126 response = self.serve(request) 2127 patch_cache_control(response, private=True) 2128 return response 2129 2130 def get_cached_paths(self): 2131 """ 2132 This returns a list of paths to invalidate in a frontend cache 2133 """ 2134 return ['/'] 2135 2136 def get_sitemap_urls(self, request=None): 2137 return [ 2138 { 2139 'location': self.get_full_url(request), 2140 # fall back on latest_revision_created_at if last_published_at is null 2141 # (for backwards compatibility from before last_published_at was added) 2142 'lastmod': (self.last_published_at or self.latest_revision_created_at), 2143 } 2144 ] 2145 2146 def get_static_site_paths(self): 2147 """ 2148 This is a generator of URL paths to feed into a static site generator 2149 Override this if you would like to create static versions of subpages 2150 """ 2151 # Yield path for this page 2152 yield '/' 2153 2154 # Yield paths for child pages 2155 for child in self.get_children().live(): 2156 for path in child.specific.get_static_site_paths(): 2157 yield '/' + child.slug + path 2158 2159 def get_ancestors(self, inclusive=False): 2160 """ 2161 Returns a queryset of the current page's ancestors, starting at the root page 2162 and descending to the parent, or to the current page itself if ``inclusive`` is true. 2163 """ 2164 return Page.objects.ancestor_of(self, inclusive) 2165 2166 def get_descendants(self, inclusive=False): 2167 """ 2168 Returns a queryset of all pages underneath the current page, any number of levels deep. 2169 If ``inclusive`` is true, the current page itself is included in the queryset. 2170 """ 2171 return Page.objects.descendant_of(self, inclusive) 2172 2173 def get_siblings(self, inclusive=True): 2174 """ 2175 Returns a queryset of all other pages with the same parent as the current page. 2176 If ``inclusive`` is true, the current page itself is included in the queryset. 2177 """ 2178 return Page.objects.sibling_of(self, inclusive) 2179 2180 def get_next_siblings(self, inclusive=False): 2181 return self.get_siblings(inclusive).filter(path__gte=self.path).order_by('path') 2182 2183 def get_prev_siblings(self, inclusive=False): 2184 return self.get_siblings(inclusive).filter(path__lte=self.path).order_by('-path') 2185 2186 def get_view_restrictions(self): 2187 """ 2188 Return a query set of all page view restrictions that apply to this page. 2189 2190 This checks the current page and all ancestor pages for page view restrictions. 2191 2192 If any of those pages are aliases, it will resolve them to their source pages 2193 before querying PageViewRestrictions so alias pages use the same view restrictions 2194 as their source page and they cannot have their own. 2195 """ 2196 page_ids_to_check = set() 2197 2198 def add_page_to_check_list(page): 2199 # If the page is an alias, add the source page to the check list instead 2200 if page.alias_of: 2201 add_page_to_check_list(page.alias_of) 2202 else: 2203 page_ids_to_check.add(page.id) 2204 2205 # Check current page for view restrictions 2206 add_page_to_check_list(self) 2207 2208 # Check each ancestor for view restrictions as well 2209 for page in self.get_ancestors().only('alias_of'): 2210 add_page_to_check_list(page) 2211 2212 return PageViewRestriction.objects.filter(page_id__in=page_ids_to_check) 2213 2214 password_required_template = getattr(settings, 'PASSWORD_REQUIRED_TEMPLATE', 'wagtailcore/password_required.html') 2215 2216 def serve_password_required_response(self, request, form, action_url): 2217 """ 2218 Serve a response indicating that the user has been denied access to view this page, 2219 and must supply a password. 2220 form = a Django form object containing the password input 2221 (and zero or more hidden fields that also need to be output on the template) 2222 action_url = URL that this form should be POSTed to 2223 """ 2224 context = self.get_context(request) 2225 context['form'] = form 2226 context['action_url'] = action_url 2227 return TemplateResponse(request, self.password_required_template, context) 2228 2229 def with_content_json(self, content_json): 2230 """ 2231 Returns a new version of the page with field values updated to reflect changes 2232 in the provided ``content_json`` (which usually comes from a previously-saved 2233 page revision). 2234 2235 Certain field values are preserved in order to prevent errors if the returned 2236 page is saved, such as ``id``, ``content_type`` and some tree-related values. 2237 The following field values are also preserved, as they are considered to be 2238 meaningful to the page as a whole, rather than to a specific revision: 2239 2240 * ``draft_title`` 2241 * ``live`` 2242 * ``has_unpublished_changes`` 2243 * ``owner`` 2244 * ``locked`` 2245 * ``locked_by`` 2246 * ``locked_at`` 2247 * ``latest_revision_created_at`` 2248 * ``first_published_at`` 2249 * ``alias_of`` 2250 * ``wagtail_admin_comments`` (COMMENTS_RELATION_NAME) 2251 """ 2252 2253 data = json.loads(content_json) 2254 2255 # Old revisions (pre Wagtail 2.15) may have saved comment data under the name 'comments' 2256 # rather than the current relation name as set by COMMENTS_RELATION_NAME; 2257 # if a 'comments' field exists and looks like our comments model, alter the data to use 2258 # COMMENTS_RELATION_NAME before restoring 2259 if ( 2260 COMMENTS_RELATION_NAME not in data and 'comments' in data 2261 and isinstance(data['comments'], list) and len(data['comments']) 2262 and isinstance(data['comments'][0], dict) and 'contentpath' in data['comments'][0] 2263 ): 2264 data[COMMENTS_RELATION_NAME] = data['comments'] 2265 del data['comments'] 2266 2267 obj = self.specific_class.from_serializable_data(data) 2268 2269 # These should definitely never change between revisions 2270 obj.id = self.id 2271 obj.pk = self.pk 2272 obj.content_type = self.content_type 2273 2274 # Override possibly-outdated tree parameter fields 2275 obj.path = self.path 2276 obj.depth = self.depth 2277 obj.numchild = self.numchild 2278 2279 # Update url_path to reflect potential slug changes, but maintining the page's 2280 # existing tree position 2281 obj.set_url_path(self.get_parent()) 2282 2283 # Ensure other values that are meaningful for the page as a whole (rather than 2284 # to a specific revision) are preserved 2285 obj.draft_title = self.draft_title 2286 obj.live = self.live 2287 obj.has_unpublished_changes = self.has_unpublished_changes 2288 obj.owner = self.owner 2289 obj.locked = self.locked 2290 obj.locked_by = self.locked_by 2291 obj.locked_at = self.locked_at 2292 obj.latest_revision_created_at = self.latest_revision_created_at 2293 obj.first_published_at = self.first_published_at 2294 obj.translation_key = self.translation_key 2295 obj.locale = self.locale 2296 obj.alias_of_id = self.alias_of_id 2297 revision_comments = getattr(obj, COMMENTS_RELATION_NAME) 2298 page_comments = getattr(self, COMMENTS_RELATION_NAME).filter(resolved_at__isnull=True) 2299 for comment in page_comments: 2300 # attempt to retrieve the comment position from the revision's stored version 2301 # of the comment 2302 try: 2303 revision_comment = revision_comments.get(id=comment.id) 2304 comment.position = revision_comment.position 2305 except Comment.DoesNotExist: 2306 pass 2307 setattr(obj, COMMENTS_RELATION_NAME, page_comments) 2308 2309 return obj 2310 2311 @property 2312 def has_workflow(self): 2313 """Returns True if the page or an ancestor has an active workflow assigned, otherwise False""" 2314 if not getattr(settings, 'WAGTAIL_WORKFLOW_ENABLED', True): 2315 return False 2316 return self.get_ancestors(inclusive=True).filter(workflowpage__isnull=False).filter(workflowpage__workflow__active=True).exists() 2317 2318 def get_workflow(self): 2319 """Returns the active workflow assigned to the page or its nearest ancestor""" 2320 if not getattr(settings, 'WAGTAIL_WORKFLOW_ENABLED', True): 2321 return None 2322 2323 if hasattr(self, 'workflowpage') and self.workflowpage.workflow.active: 2324 return self.workflowpage.workflow 2325 else: 2326 try: 2327 workflow = self.get_ancestors().filter(workflowpage__isnull=False).filter(workflowpage__workflow__active=True).order_by( 2328 '-depth').first().workflowpage.workflow 2329 except AttributeError: 2330 workflow = None 2331 return workflow 2332 2333 @property 2334 def workflow_in_progress(self): 2335 """Returns True if a workflow is in progress on the current page, otherwise False""" 2336 if not getattr(settings, 'WAGTAIL_WORKFLOW_ENABLED', True): 2337 return False 2338 2339 # `_current_workflow_states` may be populated by `prefetch_workflow_states` on `PageQuerySet` as a 2340 # performance optimisation 2341 if hasattr(self, "_current_workflow_states"): 2342 for state in self._current_workflow_states: 2343 if state.status == WorkflowState.STATUS_IN_PROGRESS: 2344 return True 2345 return False 2346 2347 return WorkflowState.objects.filter(page=self, status=WorkflowState.STATUS_IN_PROGRESS).exists() 2348 2349 @property 2350 def current_workflow_state(self): 2351 """Returns the in progress or needs changes workflow state on this page, if it exists""" 2352 if not getattr(settings, 'WAGTAIL_WORKFLOW_ENABLED', True): 2353 return None 2354 2355 # `_current_workflow_states` may be populated by `prefetch_workflow_states` on `pagequeryset` as a 2356 # performance optimisation 2357 if hasattr(self, "_current_workflow_states"): 2358 try: 2359 return self._current_workflow_states[0] 2360 except IndexError: 2361 return 2362 2363 try: 2364 return WorkflowState.objects.active().select_related("current_task_state__task").get(page=self) 2365 except WorkflowState.DoesNotExist: 2366 return 2367 2368 @property 2369 def current_workflow_task_state(self): 2370 """Returns (specific class of) the current task state of the workflow on this page, if it exists""" 2371 current_workflow_state = self.current_workflow_state 2372 if current_workflow_state and current_workflow_state.status == WorkflowState.STATUS_IN_PROGRESS and current_workflow_state.current_task_state: 2373 return current_workflow_state.current_task_state.specific 2374 2375 @property 2376 def current_workflow_task(self): 2377 """Returns (specific class of) the current task in progress on this page, if it exists""" 2378 current_workflow_task_state = self.current_workflow_task_state 2379 if current_workflow_task_state: 2380 return current_workflow_task_state.task.specific 2381 2382 class Meta: 2383 verbose_name = _('page') 2384 verbose_name_plural = _('pages') 2385 unique_together = [("translation_key", "locale")] 2386 2387 2388class Orderable(models.Model): 2389 sort_order = models.IntegerField(null=True, blank=True, editable=False) 2390 sort_order_field = 'sort_order' 2391 2392 class Meta: 2393 abstract = True 2394 ordering = ['sort_order'] 2395 2396 2397class SubmittedRevisionsManager(models.Manager): 2398 def get_queryset(self): 2399 return super().get_queryset().filter(submitted_for_moderation=True) 2400 2401 2402class PageRevision(models.Model): 2403 page = models.ForeignKey('Page', verbose_name=_('page'), related_name='revisions', on_delete=models.CASCADE) 2404 submitted_for_moderation = models.BooleanField( 2405 verbose_name=_('submitted for moderation'), 2406 default=False, 2407 db_index=True 2408 ) 2409 created_at = models.DateTimeField(db_index=True, verbose_name=_('created at')) 2410 user = models.ForeignKey( 2411 settings.AUTH_USER_MODEL, verbose_name=_('user'), null=True, blank=True, 2412 on_delete=models.SET_NULL 2413 ) 2414 content_json = models.TextField(verbose_name=_('content JSON')) 2415 approved_go_live_at = models.DateTimeField( 2416 verbose_name=_('approved go live at'), 2417 null=True, 2418 blank=True, 2419 db_index=True 2420 ) 2421 2422 objects = models.Manager() 2423 submitted_revisions = SubmittedRevisionsManager() 2424 2425 def save(self, user=None, *args, **kwargs): 2426 # Set default value for created_at to now 2427 # We cannot use auto_now_add as that will override 2428 # any value that is set before saving 2429 if self.created_at is None: 2430 self.created_at = timezone.now() 2431 2432 super().save(*args, **kwargs) 2433 if self.submitted_for_moderation: 2434 # ensure that all other revisions of this page have the 'submitted for moderation' flag unset 2435 self.page.revisions.exclude(id=self.id).update(submitted_for_moderation=False) 2436 2437 if ( 2438 self.approved_go_live_at is None 2439 and 'update_fields' in kwargs and 'approved_go_live_at' in kwargs['update_fields'] 2440 ): 2441 # Log scheduled revision publish cancellation 2442 page = self.as_page_object() 2443 # go_live_at = kwargs['update_fields'][] 2444 PageLogEntry.objects.log_action( 2445 instance=page, 2446 action='wagtail.schedule.cancel', 2447 data={ 2448 'revision': { 2449 'id': self.id, 2450 'created': self.created_at.strftime("%d %b %Y %H:%M"), 2451 'go_live_at': page.go_live_at.strftime("%d %b %Y %H:%M") if page.go_live_at else None, 2452 } 2453 }, 2454 user=user, 2455 revision=self, 2456 ) 2457 2458 def as_page_object(self): 2459 return self.page.specific.with_content_json(self.content_json) 2460 2461 def approve_moderation(self, user=None): 2462 if self.submitted_for_moderation: 2463 logger.info("Page moderation approved: \"%s\" id=%d revision_id=%d", self.page.title, self.page.id, self.id) 2464 PageLogEntry.objects.log_action( 2465 instance=self.as_page_object(), 2466 action='wagtail.moderation.approve', 2467 user=user, 2468 revision=self, 2469 ) 2470 self.publish() 2471 2472 def reject_moderation(self, user=None): 2473 if self.submitted_for_moderation: 2474 logger.info("Page moderation rejected: \"%s\" id=%d revision_id=%d", self.page.title, self.page.id, self.id) 2475 PageLogEntry.objects.log_action( 2476 instance=self.as_page_object(), 2477 action='wagtail.moderation.reject', 2478 user=user, 2479 revision=self, 2480 ) 2481 self.submitted_for_moderation = False 2482 self.save(update_fields=['submitted_for_moderation']) 2483 2484 def is_latest_revision(self): 2485 if self.id is None: 2486 # special case: a revision without an ID is presumed to be newly-created and is thus 2487 # newer than any revision that might exist in the database 2488 return True 2489 latest_revision = PageRevision.objects.filter(page_id=self.page_id).order_by('-created_at', '-id').first() 2490 return (latest_revision == self) 2491 2492 def delete(self): 2493 # Update revision_created fields for comments that reference the current revision, if applicable. 2494 2495 try: 2496 next_revision = self.get_next() 2497 except PageRevision.DoesNotExist: 2498 next_revision = None 2499 2500 if next_revision: 2501 # move comments created on this revision to the next revision, as they may well still apply if they're unresolved 2502 self.created_comments.all().update(revision_created=next_revision) 2503 2504 return super().delete() 2505 2506 def publish(self, user=None, changed=True, log_action=True, previous_revision=None): 2507 """ 2508 Publishes or schedules revision for publishing. 2509 2510 :param user: the publishing user 2511 :param changed: indicated whether content has changed 2512 :param log_action: 2513 flag for the logging action. Pass False to skip logging. Cannot pass an action string as the method 2514 performs several actions: "publish", "revert" (and publish the reverted revision), 2515 "schedule publishing with a live revision", "schedule revision reversal publishing, with a live revision", 2516 "schedule publishing", "schedule revision reversal publishing" 2517 :param previous_revision: indicates a revision reversal. Should be set to the previous revision instance 2518 """ 2519 page = self.as_page_object() 2520 2521 def log_scheduling_action(revision, user=None, changed=changed): 2522 PageLogEntry.objects.log_action( 2523 instance=page, 2524 action='wagtail.publish.schedule', 2525 user=user, 2526 data={ 2527 'revision': { 2528 'id': revision.id, 2529 'created': revision.created_at.strftime("%d %b %Y %H:%M"), 2530 'go_live_at': page.go_live_at.strftime("%d %b %Y %H:%M"), 2531 'has_live_version': page.live, 2532 } 2533 }, 2534 revision=revision, 2535 content_changed=changed, 2536 ) 2537 2538 if page.go_live_at and page.go_live_at > timezone.now(): 2539 page.has_unpublished_changes = True 2540 # Instead set the approved_go_live_at of this revision 2541 self.approved_go_live_at = page.go_live_at 2542 self.save() 2543 # And clear the the approved_go_live_at of any other revisions 2544 page.revisions.exclude(id=self.id).update(approved_go_live_at=None) 2545 # if we are updating a currently live page skip the rest 2546 if page.live_revision: 2547 # Log scheduled publishing 2548 if log_action: 2549 log_scheduling_action(self, user, changed) 2550 2551 return 2552 # if we have a go_live in the future don't make the page live 2553 page.live = False 2554 else: 2555 page.live = True 2556 # at this point, the page has unpublished changes if and only if there are newer revisions than this one 2557 page.has_unpublished_changes = not self.is_latest_revision() 2558 # If page goes live clear the approved_go_live_at of all revisions 2559 page.revisions.update(approved_go_live_at=None) 2560 page.expired = False # When a page is published it can't be expired 2561 2562 # Set first_published_at, last_published_at and live_revision 2563 # if the page is being published now 2564 if page.live: 2565 now = timezone.now() 2566 page.last_published_at = now 2567 page.live_revision = self 2568 2569 if page.first_published_at is None: 2570 page.first_published_at = now 2571 2572 if previous_revision: 2573 previous_revision_page = previous_revision.as_page_object() 2574 old_page_title = previous_revision_page.title if page.title != previous_revision_page.title else None 2575 else: 2576 try: 2577 previous = self.get_previous() 2578 except PageRevision.DoesNotExist: 2579 previous = None 2580 old_page_title = previous.page.title if previous and page.title != previous.page.title else None 2581 else: 2582 # Unset live_revision if the page is going live in the future 2583 page.live_revision = None 2584 2585 page.save() 2586 2587 for comment in getattr(page, COMMENTS_RELATION_NAME).all().only('position'): 2588 comment.save(update_fields=['position']) 2589 2590 self.submitted_for_moderation = False 2591 page.revisions.update(submitted_for_moderation=False) 2592 2593 workflow_state = page.current_workflow_state 2594 if workflow_state and getattr(settings, 'WAGTAIL_WORKFLOW_CANCEL_ON_PUBLISH', True): 2595 workflow_state.cancel(user=user) 2596 2597 if page.live: 2598 page_published.send(sender=page.specific_class, instance=page.specific, revision=self) 2599 2600 # Update alias pages 2601 page.update_aliases(revision=self, user=user, _content_json=self.content_json) 2602 2603 if log_action: 2604 data = None 2605 if previous_revision: 2606 data = { 2607 'revision': { 2608 'id': previous_revision.id, 2609 'created': previous_revision.created_at.strftime("%d %b %Y %H:%M") 2610 } 2611 } 2612 2613 if old_page_title: 2614 data = data or {} 2615 data['title'] = { 2616 'old': old_page_title, 2617 'new': page.title, 2618 } 2619 2620 PageLogEntry.objects.log_action( 2621 instance=page, 2622 action='wagtail.rename', 2623 user=user, 2624 data=data, 2625 revision=self, 2626 ) 2627 2628 PageLogEntry.objects.log_action( 2629 instance=page, 2630 action=log_action if isinstance(log_action, str) else 'wagtail.publish', 2631 user=user, 2632 data=data, 2633 revision=self, 2634 content_changed=changed, 2635 ) 2636 2637 logger.info("Page published: \"%s\" id=%d revision_id=%d", page.title, page.id, self.id) 2638 elif page.go_live_at: 2639 logger.info( 2640 "Page scheduled for publish: \"%s\" id=%d revision_id=%d go_live_at=%s", 2641 page.title, 2642 page.id, 2643 self.id, 2644 page.go_live_at.isoformat() 2645 ) 2646 2647 if log_action: 2648 log_scheduling_action(self, user, changed) 2649 2650 def get_previous(self): 2651 return self.get_previous_by_created_at(page=self.page) 2652 2653 def get_next(self): 2654 return self.get_next_by_created_at(page=self.page) 2655 2656 def __str__(self): 2657 return '"' + str(self.page) + '" at ' + str(self.created_at) 2658 2659 class Meta: 2660 verbose_name = _('page revision') 2661 verbose_name_plural = _('page revisions') 2662 2663 2664PAGE_PERMISSION_TYPES = [ 2665 ('add', _("Add"), _("Add/edit pages you own")), 2666 ('edit', _("Edit"), _("Edit any page")), 2667 ('publish', _("Publish"), _("Publish any page")), 2668 ('bulk_delete', _("Bulk delete"), _("Delete pages with children")), 2669 ('lock', _("Lock"), _("Lock/unlock pages you've locked")), 2670 ('unlock', _("Unlock"), _("Unlock any page")), 2671] 2672 2673PAGE_PERMISSION_TYPE_CHOICES = [ 2674 (identifier, long_label) 2675 for identifier, short_label, long_label in PAGE_PERMISSION_TYPES 2676] 2677 2678 2679class GroupPagePermission(models.Model): 2680 group = models.ForeignKey(Group, verbose_name=_('group'), related_name='page_permissions', on_delete=models.CASCADE) 2681 page = models.ForeignKey('Page', verbose_name=_('page'), related_name='group_permissions', on_delete=models.CASCADE) 2682 permission_type = models.CharField( 2683 verbose_name=_('permission type'), 2684 max_length=20, 2685 choices=PAGE_PERMISSION_TYPE_CHOICES 2686 ) 2687 2688 class Meta: 2689 unique_together = ('group', 'page', 'permission_type') 2690 verbose_name = _('group page permission') 2691 verbose_name_plural = _('group page permissions') 2692 2693 def __str__(self): 2694 return "Group %d ('%s') has permission '%s' on page %d ('%s')" % ( 2695 self.group.id, self.group, 2696 self.permission_type, 2697 self.page.id, self.page 2698 ) 2699 2700 2701class UserPagePermissionsProxy: 2702 """Helper object that encapsulates all the page permission rules that this user has 2703 across the page hierarchy.""" 2704 2705 def __init__(self, user): 2706 self.user = user 2707 2708 if user.is_active and not user.is_superuser: 2709 self.permissions = GroupPagePermission.objects.filter(group__user=self.user).select_related('page') 2710 2711 def revisions_for_moderation(self): 2712 """Return a queryset of page revisions awaiting moderation that this user has publish permission on""" 2713 2714 # Deal with the trivial cases first... 2715 if not self.user.is_active: 2716 return PageRevision.objects.none() 2717 if self.user.is_superuser: 2718 return PageRevision.submitted_revisions.all() 2719 2720 # get the list of pages for which they have direct publish permission 2721 # (i.e. they can publish any page within this subtree) 2722 publishable_pages_paths = self.permissions.filter( 2723 permission_type='publish' 2724 ).values_list('page__path', flat=True).distinct() 2725 if not publishable_pages_paths: 2726 return PageRevision.objects.none() 2727 2728 # compile a filter expression to apply to the PageRevision.submitted_revisions manager: 2729 # return only those pages whose paths start with one of the publishable_pages paths 2730 only_my_sections = Q(page__path__startswith=publishable_pages_paths[0]) 2731 for page_path in publishable_pages_paths[1:]: 2732 only_my_sections = only_my_sections | Q(page__path__startswith=page_path) 2733 2734 # return the filtered queryset 2735 return PageRevision.submitted_revisions.filter(only_my_sections) 2736 2737 def for_page(self, page): 2738 """Return a PagePermissionTester object that can be used to query whether this user has 2739 permission to perform specific tasks on the given page""" 2740 return PagePermissionTester(self, page) 2741 2742 def explorable_pages(self): 2743 """Return a queryset of pages that the user has access to view in the 2744 explorer (e.g. add/edit/publish permission). Includes all pages with 2745 specific group permissions and also the ancestors of those pages (in 2746 order to enable navigation in the explorer)""" 2747 # Deal with the trivial cases first... 2748 if not self.user.is_active: 2749 return Page.objects.none() 2750 if self.user.is_superuser: 2751 return Page.objects.all() 2752 2753 explorable_pages = Page.objects.none() 2754 2755 # Creates a union queryset of all objects the user has access to add, 2756 # edit and publish 2757 for perm in self.permissions.filter( 2758 Q(permission_type="add") 2759 | Q(permission_type="edit") 2760 | Q(permission_type="publish") 2761 | Q(permission_type="lock") 2762 ): 2763 explorable_pages |= Page.objects.descendant_of( 2764 perm.page, inclusive=True 2765 ) 2766 2767 # For all pages with specific permissions, add their ancestors as 2768 # explorable. This will allow deeply nested pages to be accessed in the 2769 # explorer. For example, in the hierarchy A>B>C>D where the user has 2770 # 'edit' access on D, they will be able to navigate to D without having 2771 # explicit access to A, B or C. 2772 page_permissions = Page.objects.filter(group_permissions__in=self.permissions) 2773 for page in page_permissions: 2774 explorable_pages |= page.get_ancestors() 2775 2776 # Remove unnecessary top-level ancestors that the user has no access to 2777 fca_page = page_permissions.first_common_ancestor() 2778 explorable_pages = explorable_pages.filter(path__startswith=fca_page.path) 2779 2780 return explorable_pages 2781 2782 def editable_pages(self): 2783 """Return a queryset of the pages that this user has permission to edit""" 2784 # Deal with the trivial cases first... 2785 if not self.user.is_active: 2786 return Page.objects.none() 2787 if self.user.is_superuser: 2788 return Page.objects.all() 2789 2790 editable_pages = Page.objects.none() 2791 2792 for perm in self.permissions.filter(permission_type='add'): 2793 # user has edit permission on any subpage of perm.page 2794 # (including perm.page itself) that is owned by them 2795 editable_pages |= Page.objects.descendant_of(perm.page, inclusive=True).filter(owner=self.user) 2796 2797 for perm in self.permissions.filter(permission_type='edit'): 2798 # user has edit permission on any subpage of perm.page 2799 # (including perm.page itself) regardless of owner 2800 editable_pages |= Page.objects.descendant_of(perm.page, inclusive=True) 2801 2802 return editable_pages 2803 2804 def can_edit_pages(self): 2805 """Return True if the user has permission to edit any pages""" 2806 return self.editable_pages().exists() 2807 2808 def publishable_pages(self): 2809 """Return a queryset of the pages that this user has permission to publish""" 2810 # Deal with the trivial cases first... 2811 if not self.user.is_active: 2812 return Page.objects.none() 2813 if self.user.is_superuser: 2814 return Page.objects.all() 2815 2816 publishable_pages = Page.objects.none() 2817 2818 for perm in self.permissions.filter(permission_type='publish'): 2819 # user has publish permission on any subpage of perm.page 2820 # (including perm.page itself) 2821 publishable_pages |= Page.objects.descendant_of(perm.page, inclusive=True) 2822 2823 return publishable_pages 2824 2825 def can_publish_pages(self): 2826 """Return True if the user has permission to publish any pages""" 2827 return self.publishable_pages().exists() 2828 2829 def can_remove_locks(self): 2830 """Returns True if the user has permission to unlock pages they have not locked""" 2831 if self.user.is_superuser: 2832 return True 2833 if not self.user.is_active: 2834 return False 2835 else: 2836 return self.permissions.filter(permission_type='unlock').exists() 2837 2838 2839class PagePermissionTester: 2840 def __init__(self, user_perms, page): 2841 self.user = user_perms.user 2842 self.user_perms = user_perms 2843 self.page = page 2844 self.page_is_root = page.depth == 1 # Equivalent to page.is_root() 2845 2846 if self.user.is_active and not self.user.is_superuser: 2847 self.permissions = set( 2848 perm.permission_type for perm in user_perms.permissions 2849 if self.page.path.startswith(perm.page.path) 2850 ) 2851 2852 def user_has_lock(self): 2853 return self.page.locked_by_id == self.user.pk 2854 2855 def page_locked(self): 2856 current_workflow_task = self.page.current_workflow_task 2857 if current_workflow_task: 2858 if current_workflow_task.page_locked_for_user(self.page, self.user): 2859 return True 2860 2861 if not self.page.locked: 2862 # Page is not locked 2863 return False 2864 2865 if getattr(settings, 'WAGTAILADMIN_GLOBAL_PAGE_EDIT_LOCK', False): 2866 # All locks are global 2867 return True 2868 else: 2869 # Locked only if the current user was not the one who locked the page 2870 return not self.user_has_lock() 2871 2872 def can_add_subpage(self): 2873 if not self.user.is_active: 2874 return False 2875 specific_class = self.page.specific_class 2876 if specific_class is None or not specific_class.creatable_subpage_models(): 2877 return False 2878 return self.user.is_superuser or ('add' in self.permissions) 2879 2880 def can_edit(self): 2881 if not self.user.is_active: 2882 return False 2883 2884 if self.page_is_root: # root node is not a page and can never be edited, even by superusers 2885 return False 2886 2887 if self.user.is_superuser: 2888 return True 2889 2890 if 'edit' in self.permissions: 2891 return True 2892 2893 if 'add' in self.permissions and self.page.owner_id == self.user.pk: 2894 return True 2895 2896 current_workflow_task = self.page.current_workflow_task 2897 if current_workflow_task: 2898 if current_workflow_task.user_can_access_editor(self.page, self.user): 2899 return True 2900 2901 return False 2902 2903 def can_delete(self, ignore_bulk=False): 2904 if not self.user.is_active: 2905 return False 2906 2907 if self.page_is_root: # root node is not a page and can never be deleted, even by superusers 2908 return False 2909 2910 if self.user.is_superuser: 2911 # superusers require no further checks 2912 return True 2913 2914 # if the user does not have bulk_delete permission, they may only delete leaf pages 2915 if 'bulk_delete' not in self.permissions and not self.page.is_leaf() and not ignore_bulk: 2916 return False 2917 2918 if 'edit' in self.permissions: 2919 # if the user does not have publish permission, we also need to confirm that there 2920 # are no published pages here 2921 if 'publish' not in self.permissions: 2922 pages_to_delete = self.page.get_descendants(inclusive=True) 2923 if pages_to_delete.live().exists(): 2924 return False 2925 2926 return True 2927 2928 elif 'add' in self.permissions: 2929 pages_to_delete = self.page.get_descendants(inclusive=True) 2930 if 'publish' in self.permissions: 2931 # we don't care about live state, but all pages must be owned by this user 2932 # (i.e. eliminating pages owned by this user must give us the empty set) 2933 return not pages_to_delete.exclude(owner=self.user).exists() 2934 else: 2935 # all pages must be owned by this user and non-live 2936 # (i.e. eliminating non-live pages owned by this user must give us the empty set) 2937 return not pages_to_delete.exclude(live=False, owner=self.user).exists() 2938 2939 else: 2940 return False 2941 2942 def can_unpublish(self): 2943 if not self.user.is_active: 2944 return False 2945 if (not self.page.live) or self.page_is_root: 2946 return False 2947 if self.page_locked(): 2948 return False 2949 2950 return self.user.is_superuser or ('publish' in self.permissions) 2951 2952 def can_publish(self): 2953 if not self.user.is_active: 2954 return False 2955 if self.page_is_root: 2956 return False 2957 2958 return self.user.is_superuser or ('publish' in self.permissions) 2959 2960 def can_submit_for_moderation(self): 2961 return not self.page_locked() and self.page.has_workflow and not self.page.workflow_in_progress 2962 2963 def can_set_view_restrictions(self): 2964 return self.can_publish() 2965 2966 def can_unschedule(self): 2967 return self.can_publish() 2968 2969 def can_lock(self): 2970 if self.user.is_superuser: 2971 return True 2972 current_workflow_task = self.page.current_workflow_task 2973 if current_workflow_task: 2974 return current_workflow_task.user_can_lock(self.page, self.user) 2975 2976 if 'lock' in self.permissions: 2977 return True 2978 2979 return False 2980 2981 def can_unlock(self): 2982 if self.user.is_superuser: 2983 return True 2984 2985 if self.user_has_lock(): 2986 return True 2987 2988 current_workflow_task = self.page.current_workflow_task 2989 if current_workflow_task: 2990 return current_workflow_task.user_can_unlock(self.page, self.user) 2991 2992 if 'unlock' in self.permissions: 2993 return True 2994 2995 return False 2996 2997 def can_publish_subpage(self): 2998 """ 2999 Niggly special case for creating and publishing a page in one go. 3000 Differs from can_publish in that we want to be able to publish subpages of root, but not 3001 to be able to publish root itself. (Also, can_publish_subpage returns false if the page 3002 does not allow subpages at all.) 3003 """ 3004 if not self.user.is_active: 3005 return False 3006 specific_class = self.page.specific_class 3007 if specific_class is None or not specific_class.creatable_subpage_models(): 3008 return False 3009 3010 return self.user.is_superuser or ('publish' in self.permissions) 3011 3012 def can_reorder_children(self): 3013 """ 3014 Keep reorder permissions the same as publishing, since it immediately affects published pages 3015 (and the use-cases for a non-admin needing to do it are fairly obscure...) 3016 """ 3017 return self.can_publish_subpage() 3018 3019 def can_move(self): 3020 """ 3021 Moving a page should be logically equivalent to deleting and re-adding it (and all its children). 3022 As such, the permission test for 'can this be moved at all?' should be the same as for deletion. 3023 (Further constraints will then apply on where it can be moved *to*.) 3024 """ 3025 return self.can_delete(ignore_bulk=True) 3026 3027 def can_copy(self): 3028 return not self.page_is_root 3029 3030 def can_move_to(self, destination): 3031 # reject the logically impossible cases first 3032 if self.page == destination or destination.is_descendant_of(self.page): 3033 return False 3034 3035 # reject moves that are forbidden by subpage_types / parent_page_types rules 3036 # (these rules apply to superusers too) 3037 if not self.page.specific.can_move_to(destination): 3038 return False 3039 3040 # shortcut the trivial 'everything' / 'nothing' permissions 3041 if not self.user.is_active: 3042 return False 3043 if self.user.is_superuser: 3044 return True 3045 3046 # check that the page can be moved at all 3047 if not self.can_move(): 3048 return False 3049 3050 # Inspect permissions on the destination 3051 destination_perms = self.user_perms.for_page(destination) 3052 3053 # we always need at least add permission in the target 3054 if 'add' not in destination_perms.permissions: 3055 return False 3056 3057 if self.page.live or self.page.get_descendants().filter(live=True).exists(): 3058 # moving this page will entail publishing within the destination section 3059 return ('publish' in destination_perms.permissions) 3060 else: 3061 # no publishing required, so the already-tested 'add' permission is sufficient 3062 return True 3063 3064 def can_copy_to(self, destination, recursive=False): 3065 # reject the logically impossible cases first 3066 # recursive can't copy to the same tree otherwise it will be on infinite loop 3067 if recursive and (self.page == destination or destination.is_descendant_of(self.page)): 3068 return False 3069 3070 # reject inactive users early 3071 if not self.user.is_active: 3072 return False 3073 3074 # reject early if pages of this type cannot be created at the destination 3075 if not self.page.specific_class.can_create_at(destination): 3076 return False 3077 3078 # skip permission checking for super users 3079 if self.user.is_superuser: 3080 return True 3081 3082 # Inspect permissions on the destination 3083 destination_perms = self.user_perms.for_page(destination) 3084 3085 if not destination.specific_class.creatable_subpage_models(): 3086 return False 3087 3088 # we always need at least add permission in the target 3089 if 'add' not in destination_perms.permissions: 3090 return False 3091 3092 return True 3093 3094 def can_view_revisions(self): 3095 return not self.page_is_root 3096 3097 3098class PageViewRestriction(BaseViewRestriction): 3099 page = models.ForeignKey( 3100 'Page', verbose_name=_('page'), related_name='view_restrictions', on_delete=models.CASCADE 3101 ) 3102 3103 passed_view_restrictions_session_key = 'passed_page_view_restrictions' 3104 3105 class Meta: 3106 verbose_name = _('page view restriction') 3107 verbose_name_plural = _('page view restrictions') 3108 3109 def save(self, user=None, **kwargs): 3110 """ 3111 Custom save handler to include logging. 3112 :param user: the user add/updating the view restriction 3113 :param specific_instance: the specific model instance the restriction applies to 3114 """ 3115 specific_instance = self.page.specific 3116 is_new = self.id is None 3117 super().save(**kwargs) 3118 3119 if specific_instance: 3120 PageLogEntry.objects.log_action( 3121 instance=specific_instance, 3122 action='wagtail.view_restriction.create' if is_new else 'wagtail.view_restriction.edit', 3123 user=user, 3124 data={ 3125 'restriction': { 3126 'type': self.restriction_type, 3127 'title': force_str(dict(self.RESTRICTION_CHOICES).get(self.restriction_type)) 3128 } 3129 } 3130 ) 3131 3132 def delete(self, user=None, **kwargs): 3133 """ 3134 Custom delete handler to aid in logging 3135 :param user: the user removing the view restriction 3136 :param specific_instance: the specific model instance the restriction applies to 3137 """ 3138 specific_instance = self.page.specific 3139 if specific_instance: 3140 PageLogEntry.objects.log_action( 3141 instance=specific_instance, 3142 action='wagtail.view_restriction.delete', 3143 user=user, 3144 data={ 3145 'restriction': { 3146 'type': self.restriction_type, 3147 'title': force_str(dict(self.RESTRICTION_CHOICES).get(self.restriction_type)) 3148 } 3149 } 3150 ) 3151 return super().delete(**kwargs) 3152 3153 3154class WorkflowPage(models.Model): 3155 page = models.OneToOneField( 3156 'Page', 3157 verbose_name=_('page'), 3158 on_delete=models.CASCADE, 3159 primary_key=True, 3160 unique=True 3161 ) 3162 workflow = models.ForeignKey( 3163 'Workflow', 3164 related_name='workflow_pages', 3165 verbose_name=_('workflow'), 3166 on_delete=models.CASCADE, 3167 ) 3168 3169 def get_pages(self): 3170 """ 3171 Returns a queryset of pages that are affected by this WorkflowPage link. 3172 3173 This includes all descendants of the page excluding any that have other WorkflowPages. 3174 """ 3175 descendant_pages = Page.objects.descendant_of(self.page, inclusive=True) 3176 descendant_workflow_pages = WorkflowPage.objects.filter(page_id__in=descendant_pages.values_list('id', flat=True)).exclude(pk=self.pk) 3177 3178 for path, depth in descendant_workflow_pages.values_list('page__path', 'page__depth'): 3179 descendant_pages = descendant_pages.exclude(path__startswith=path, depth__gte=depth) 3180 3181 return descendant_pages 3182 3183 class Meta: 3184 verbose_name = _('workflow page') 3185 verbose_name_plural = _('workflow pages') 3186 3187 3188class WorkflowTask(Orderable): 3189 workflow = ParentalKey('Workflow', on_delete=models.CASCADE, verbose_name=_('workflow_tasks'), 3190 related_name='workflow_tasks') 3191 task = models.ForeignKey('Task', on_delete=models.CASCADE, verbose_name=_('task'), related_name='workflow_tasks', 3192 limit_choices_to={'active': True}) 3193 3194 class Meta(Orderable.Meta): 3195 unique_together = [('workflow', 'task')] 3196 verbose_name = _('workflow task order') 3197 verbose_name_plural = _('workflow task orders') 3198 3199 3200class TaskManager(models.Manager): 3201 def active(self): 3202 return self.filter(active=True) 3203 3204 3205class Task(models.Model): 3206 name = models.CharField(max_length=255, verbose_name=_('name')) 3207 content_type = models.ForeignKey( 3208 ContentType, 3209 verbose_name=_('content type'), 3210 related_name='wagtail_tasks', 3211 on_delete=models.CASCADE 3212 ) 3213 active = models.BooleanField(verbose_name=_('active'), default=True, help_text=_( 3214 "Active tasks can be added to workflows. Deactivating a task does not remove it from existing workflows.")) 3215 objects = TaskManager() 3216 3217 admin_form_fields = ['name'] 3218 admin_form_readonly_on_edit_fields = ['name'] 3219 3220 def __init__(self, *args, **kwargs): 3221 super().__init__(*args, **kwargs) 3222 if not self.id: 3223 # this model is being newly created 3224 # rather than retrieved from the db; 3225 if not self.content_type_id: 3226 # set content type to correctly represent the model class 3227 # that this was created as 3228 self.content_type = ContentType.objects.get_for_model(self) 3229 3230 def __str__(self): 3231 return self.name 3232 3233 @property 3234 def workflows(self): 3235 """Returns all ``Workflow`` instances that use this task""" 3236 return Workflow.objects.filter(workflow_tasks__task=self) 3237 3238 @property 3239 def active_workflows(self): 3240 """Return a ``QuerySet``` of active workflows that this task is part of""" 3241 return Workflow.objects.active().filter(workflow_tasks__task=self) 3242 3243 @classmethod 3244 def get_verbose_name(cls): 3245 """ 3246 Returns the human-readable "verbose name" of this task model e.g "Group approval task". 3247 """ 3248 # This is similar to doing cls._meta.verbose_name.title() 3249 # except this doesn't convert any characters to lowercase 3250 return capfirst(cls._meta.verbose_name) 3251 3252 @cached_property 3253 def specific(self): 3254 """ 3255 Return this Task in its most specific subclassed form. 3256 """ 3257 # the ContentType.objects manager keeps a cache, so this should potentially 3258 # avoid a database lookup over doing self.content_type. I think. 3259 content_type = ContentType.objects.get_for_id(self.content_type_id) 3260 model_class = content_type.model_class() 3261 if model_class is None: 3262 # Cannot locate a model class for this content type. This might happen 3263 # if the codebase and database are out of sync (e.g. the model exists 3264 # on a different git branch and we haven't rolled back migrations before 3265 # switching branches); if so, the best we can do is return the page 3266 # unchanged. 3267 return self 3268 elif isinstance(self, model_class): 3269 # self is already the an instance of the most specific class 3270 return self 3271 else: 3272 return content_type.get_object_for_this_type(id=self.id) 3273 3274 task_state_class = None 3275 3276 @classmethod 3277 def get_task_state_class(self): 3278 return self.task_state_class or TaskState 3279 3280 def start(self, workflow_state, user=None): 3281 """Start this task on the provided workflow state by creating an instance of TaskState""" 3282 task_state = self.get_task_state_class()(workflow_state=workflow_state) 3283 task_state.status = TaskState.STATUS_IN_PROGRESS 3284 task_state.page_revision = workflow_state.page.get_latest_revision() 3285 task_state.task = self 3286 task_state.save() 3287 task_submitted.send(sender=task_state.specific.__class__, instance=task_state.specific, user=user) 3288 return task_state 3289 3290 @transaction.atomic 3291 def on_action(self, task_state, user, action_name, **kwargs): 3292 """Performs an action on a task state determined by the ``action_name`` string passed""" 3293 if action_name == 'approve': 3294 task_state.approve(user=user, **kwargs) 3295 elif action_name == 'reject': 3296 task_state.reject(user=user, **kwargs) 3297 3298 def user_can_access_editor(self, page, user): 3299 """Returns True if a user who would not normally be able to access the editor for the page should be able to if the page is currently on this task. 3300 Note that returning False does not remove permissions from users who would otherwise have them.""" 3301 return False 3302 3303 def page_locked_for_user(self, page, user): 3304 """Returns True if the page should be locked to a given user's edits. This can be used to prevent editing by non-reviewers.""" 3305 return False 3306 3307 def user_can_lock(self, page, user): 3308 """Returns True if a user who would not normally be able to lock the page should be able to if the page is currently on this task. 3309 Note that returning False does not remove permissions from users who would otherwise have them.""" 3310 return False 3311 3312 def user_can_unlock(self, page, user): 3313 """Returns True if a user who would not normally be able to unlock the page should be able to if the page is currently on this task. 3314 Note that returning False does not remove permissions from users who would otherwise have them.""" 3315 return False 3316 3317 def get_actions(self, page, user): 3318 """ 3319 Get the list of action strings (name, verbose_name, whether the action requires additional data - see 3320 ``get_form_for_action``) for actions the current user can perform for this task on the given page. 3321 These strings should be the same as those able to be passed to ``on_action`` 3322 """ 3323 return [] 3324 3325 def get_form_for_action(self, action): 3326 return TaskStateCommentForm 3327 3328 def get_template_for_action(self, action): 3329 return '' 3330 3331 def get_task_states_user_can_moderate(self, user, **kwargs): 3332 """Returns a ``QuerySet`` of the task states the current user can moderate""" 3333 return TaskState.objects.none() 3334 3335 @classmethod 3336 def get_description(cls): 3337 """Returns the task description.""" 3338 return '' 3339 3340 @transaction.atomic 3341 def deactivate(self, user=None): 3342 """Set ``active`` to False and cancel all in progress task states linked to this task""" 3343 self.active = False 3344 self.save() 3345 in_progress_states = TaskState.objects.filter(task=self, status=TaskState.STATUS_IN_PROGRESS) 3346 for state in in_progress_states: 3347 state.cancel(user=user) 3348 3349 class Meta: 3350 verbose_name = _('task') 3351 verbose_name_plural = _('tasks') 3352 3353 3354class WorkflowManager(models.Manager): 3355 def active(self): 3356 return self.filter(active=True) 3357 3358 3359class Workflow(ClusterableModel): 3360 name = models.CharField(max_length=255, verbose_name=_('name')) 3361 active = models.BooleanField(verbose_name=_('active'), default=True, help_text=_( 3362 "Active workflows can be added to pages. Deactivating a workflow does not remove it from existing pages.")) 3363 objects = WorkflowManager() 3364 3365 def __str__(self): 3366 return self.name 3367 3368 @property 3369 def tasks(self): 3370 """Returns all ``Task`` instances linked to this workflow""" 3371 return Task.objects.filter(workflow_tasks__workflow=self).order_by('workflow_tasks__sort_order') 3372 3373 @transaction.atomic 3374 def start(self, page, user): 3375 """Initiates a workflow by creating an instance of ``WorkflowState``""" 3376 state = WorkflowState(page=page, workflow=self, status=WorkflowState.STATUS_IN_PROGRESS, requested_by=user) 3377 state.save() 3378 state.update(user=user) 3379 workflow_submitted.send(sender=state.__class__, instance=state, user=user) 3380 3381 next_task_data = None 3382 if state.current_task_state: 3383 next_task_data = { 3384 'id': state.current_task_state.task.id, 3385 'title': state.current_task_state.task.name, 3386 } 3387 PageLogEntry.objects.log_action( 3388 instance=page, 3389 action='wagtail.workflow.start', 3390 data={ 3391 'workflow': { 3392 'id': self.id, 3393 'title': self.name, 3394 'status': state.status, 3395 'next': next_task_data, 3396 'task_state_id': state.current_task_state.id if state.current_task_state else None, 3397 } 3398 }, 3399 revision=page.get_latest_revision(), 3400 user=user, 3401 ) 3402 3403 return state 3404 3405 @transaction.atomic 3406 def deactivate(self, user=None): 3407 """Sets the workflow as inactive, and cancels all in progress instances of ``WorkflowState`` linked to this workflow""" 3408 self.active = False 3409 in_progress_states = WorkflowState.objects.filter(workflow=self, status=WorkflowState.STATUS_IN_PROGRESS) 3410 for state in in_progress_states: 3411 state.cancel(user=user) 3412 WorkflowPage.objects.filter(workflow=self).delete() 3413 self.save() 3414 3415 def all_pages(self): 3416 """ 3417 Returns a queryset of all the pages that this Workflow applies to. 3418 """ 3419 pages = Page.objects.none() 3420 3421 for workflow_page in self.workflow_pages.all(): 3422 pages |= workflow_page.get_pages() 3423 3424 return pages 3425 3426 class Meta: 3427 verbose_name = _('workflow') 3428 verbose_name_plural = _('workflows') 3429 3430 3431class GroupApprovalTask(Task): 3432 groups = models.ManyToManyField(Group, verbose_name=_('groups'), help_text=_('Pages at this step in a workflow will be moderated or approved by these groups of users')) 3433 3434 admin_form_fields = Task.admin_form_fields + ['groups'] 3435 admin_form_widgets = { 3436 'groups': forms.CheckboxSelectMultiple, 3437 } 3438 3439 def start(self, workflow_state, user=None): 3440 if workflow_state.page.locked_by: 3441 # If the person who locked the page isn't in one of the groups, unlock the page 3442 if not workflow_state.page.locked_by.groups.filter(id__in=self.groups.all()).exists(): 3443 workflow_state.page.locked = False 3444 workflow_state.page.locked_by = None 3445 workflow_state.page.locked_at = None 3446 workflow_state.page.save(update_fields=['locked', 'locked_by', 'locked_at']) 3447 3448 return super().start(workflow_state, user=user) 3449 3450 def user_can_access_editor(self, page, user): 3451 return self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser 3452 3453 def page_locked_for_user(self, page, user): 3454 return not (self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser) 3455 3456 def user_can_lock(self, page, user): 3457 return self.groups.filter(id__in=user.groups.all()).exists() 3458 3459 def user_can_unlock(self, page, user): 3460 return False 3461 3462 def get_actions(self, page, user): 3463 if self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser: 3464 return [ 3465 ('reject', _("Request changes"), True), 3466 ('approve', _("Approve"), False), 3467 ('approve', _("Approve with comment"), True), 3468 ] 3469 3470 return [] 3471 3472 def get_task_states_user_can_moderate(self, user, **kwargs): 3473 if self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser: 3474 return TaskState.objects.filter(status=TaskState.STATUS_IN_PROGRESS, task=self.task_ptr) 3475 else: 3476 return TaskState.objects.none() 3477 3478 @classmethod 3479 def get_description(cls): 3480 return _("Members of the chosen Wagtail Groups can approve this task") 3481 3482 class Meta: 3483 verbose_name = _('Group approval task') 3484 verbose_name_plural = _('Group approval tasks') 3485 3486 3487class WorkflowStateManager(models.Manager): 3488 def active(self): 3489 """ 3490 Filters to only STATUS_IN_PROGRESS and STATUS_NEEDS_CHANGES WorkflowStates 3491 """ 3492 return self.filter(Q(status=WorkflowState.STATUS_IN_PROGRESS) | Q(status=WorkflowState.STATUS_NEEDS_CHANGES)) 3493 3494 3495class WorkflowState(models.Model): 3496 """Tracks the status of a started Workflow on a Page.""" 3497 STATUS_IN_PROGRESS = 'in_progress' 3498 STATUS_APPROVED = 'approved' 3499 STATUS_NEEDS_CHANGES = 'needs_changes' 3500 STATUS_CANCELLED = 'cancelled' 3501 STATUS_CHOICES = ( 3502 (STATUS_IN_PROGRESS, _("In progress")), 3503 (STATUS_APPROVED, _("Approved")), 3504 (STATUS_NEEDS_CHANGES, _("Needs changes")), 3505 (STATUS_CANCELLED, _("Cancelled")), 3506 ) 3507 3508 page = models.ForeignKey('Page', on_delete=models.CASCADE, verbose_name=_("page"), related_name='workflow_states') 3509 workflow = models.ForeignKey('Workflow', on_delete=models.CASCADE, verbose_name=_('workflow'), related_name='workflow_states') 3510 status = models.fields.CharField(choices=STATUS_CHOICES, verbose_name=_("status"), max_length=50, default=STATUS_IN_PROGRESS) 3511 created_at = models.DateTimeField(auto_now_add=True, verbose_name=_("created at")) 3512 requested_by = models.ForeignKey(settings.AUTH_USER_MODEL, 3513 verbose_name=_('requested by'), 3514 null=True, 3515 blank=True, 3516 editable=True, 3517 on_delete=models.SET_NULL, 3518 related_name='requested_workflows') 3519 current_task_state = models.OneToOneField('TaskState', on_delete=models.SET_NULL, null=True, blank=True, 3520 verbose_name=_("current task state")) 3521 3522 # allows a custom function to be called on finishing the Workflow successfully. 3523 on_finish = import_string(getattr(settings, 'WAGTAIL_FINISH_WORKFLOW_ACTION', 'wagtail.core.workflows.publish_workflow_state')) 3524 3525 objects = WorkflowStateManager() 3526 3527 def clean(self): 3528 super().clean() 3529 3530 if self.status in (self.STATUS_IN_PROGRESS, self.STATUS_NEEDS_CHANGES): 3531 # The unique constraint is conditional, and so not supported on the MySQL backend - so an additional check is done here 3532 if WorkflowState.objects.active().filter(page=self.page).exclude(pk=self.pk).exists(): 3533 raise ValidationError(_('There may only be one in progress or needs changes workflow state per page.')) 3534 3535 def save(self, *args, **kwargs): 3536 self.full_clean() 3537 return super().save(*args, **kwargs) 3538 3539 def __str__(self): 3540 return _("Workflow '{0}' on Page '{1}': {2}").format(self.workflow, self.page, self.status) 3541 3542 def resume(self, user=None): 3543 """Put a STATUS_NEEDS_CHANGES workflow state back into STATUS_IN_PROGRESS, and restart the current task""" 3544 if self.status != self.STATUS_NEEDS_CHANGES: 3545 raise PermissionDenied 3546 revision = self.current_task_state.page_revision 3547 current_task_state = self.current_task_state 3548 self.current_task_state = None 3549 self.status = self.STATUS_IN_PROGRESS 3550 self.save() 3551 3552 PageLogEntry.objects.log_action( 3553 instance=self.page.specific, 3554 action='wagtail.workflow.resume', 3555 data={ 3556 'workflow': { 3557 'id': self.workflow_id, 3558 'title': self.workflow.name, 3559 'status': self.status, 3560 'task_state_id': current_task_state.id, 3561 'task': { 3562 'id': current_task_state.task.id, 3563 'title': current_task_state.task.name, 3564 }, 3565 } 3566 }, 3567 revision=revision, 3568 user=user, 3569 ) 3570 return self.update(user=user, next_task=current_task_state.task) 3571 3572 def user_can_cancel(self, user): 3573 if self.page.locked and self.page.locked_by != user: 3574 return False 3575 return user == self.requested_by or user == self.page.owner or (self.current_task_state and self.current_task_state.status == self.current_task_state.STATUS_IN_PROGRESS and 'approve' in [action[0] for action in self.current_task_state.task.get_actions(self.page, user)]) 3576 3577 def update(self, user=None, next_task=None): 3578 """Checks the status of the current task, and progresses (or ends) the workflow if appropriate. If the workflow progresses, 3579 next_task will be used to start a specific task next if provided.""" 3580 if self.status != self.STATUS_IN_PROGRESS: 3581 # Updating a completed or cancelled workflow should have no effect 3582 return 3583 try: 3584 current_status = self.current_task_state.status 3585 except AttributeError: 3586 current_status = None 3587 if current_status == TaskState.STATUS_REJECTED: 3588 self.status = self.STATUS_NEEDS_CHANGES 3589 self.save() 3590 workflow_rejected.send(sender=self.__class__, instance=self, user=user) 3591 else: 3592 if not next_task: 3593 next_task = self.get_next_task() 3594 if next_task: 3595 if (not self.current_task_state) or self.current_task_state.status != self.current_task_state.STATUS_IN_PROGRESS: 3596 # if not on a task, or the next task to move to is not the current task (ie current task's status is 3597 # not STATUS_IN_PROGRESS), move to the next task 3598 self.current_task_state = next_task.specific.start(self, user=user) 3599 self.save() 3600 # if task has auto-approved, update the workflow again 3601 if self.current_task_state.status != self.current_task_state.STATUS_IN_PROGRESS: 3602 self.update(user=user) 3603 # otherwise, continue on the current task 3604 else: 3605 # if there is no uncompleted task, finish the workflow. 3606 self.finish(user=user) 3607 3608 @property 3609 def successful_task_states(self): 3610 successful_task_states = self.task_states.filter( 3611 Q(status=TaskState.STATUS_APPROVED) | Q(status=TaskState.STATUS_SKIPPED) 3612 ) 3613 if getattr(settings, "WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT", False): 3614 successful_task_states = successful_task_states.filter(page_revision=self.page.get_latest_revision()) 3615 3616 return successful_task_states 3617 3618 def get_next_task(self): 3619 """Returns the next active task, which has not been either approved or skipped""" 3620 3621 return ( 3622 Task.objects.filter(workflow_tasks__workflow=self.workflow, active=True) 3623 .exclude( 3624 task_states__in=self.successful_task_states 3625 ).order_by('workflow_tasks__sort_order').first() 3626 ) 3627 3628 def cancel(self, user=None): 3629 """Cancels the workflow state""" 3630 if self.status not in (self.STATUS_IN_PROGRESS, self.STATUS_NEEDS_CHANGES): 3631 raise PermissionDenied 3632 self.status = self.STATUS_CANCELLED 3633 self.save() 3634 3635 PageLogEntry.objects.log_action( 3636 instance=self.page.specific, 3637 action='wagtail.workflow.cancel', 3638 data={ 3639 'workflow': { 3640 'id': self.workflow_id, 3641 'title': self.workflow.name, 3642 'status': self.status, 3643 'task_state_id': self.current_task_state.id, 3644 'task': { 3645 'id': self.current_task_state.task.id, 3646 'title': self.current_task_state.task.name, 3647 }, 3648 } 3649 }, 3650 revision=self.current_task_state.page_revision, 3651 user=user, 3652 ) 3653 3654 for state in self.task_states.filter(status=TaskState.STATUS_IN_PROGRESS): 3655 # Cancel all in progress task states 3656 state.specific.cancel(user=user) 3657 workflow_cancelled.send(sender=self.__class__, instance=self, user=user) 3658 3659 @transaction.atomic 3660 def finish(self, user=None): 3661 """Finishes a successful in progress workflow, marking it as approved and performing the ``on_finish`` action""" 3662 if self.status != self.STATUS_IN_PROGRESS: 3663 raise PermissionDenied 3664 self.status = self.STATUS_APPROVED 3665 self.save() 3666 self.on_finish(user=user) 3667 workflow_approved.send(sender=self.__class__, instance=self, user=user) 3668 3669 def copy_approved_task_states_to_revision(self, revision): 3670 """This creates copies of previously approved task states with page_revision set to a different revision.""" 3671 approved_states = TaskState.objects.filter(workflow_state=self, status=TaskState.STATUS_APPROVED) 3672 for state in approved_states: 3673 state.copy(update_attrs={'page_revision': revision}) 3674 3675 def revisions(self): 3676 """Returns all page revisions associated with task states linked to the current workflow state""" 3677 return PageRevision.objects.filter( 3678 page_id=self.page_id, 3679 id__in=self.task_states.values_list('page_revision_id', flat=True) 3680 ).defer('content_json') 3681 3682 def _get_applicable_task_states(self): 3683 """Returns the set of task states whose status applies to the current revision""" 3684 3685 task_states = TaskState.objects.filter(workflow_state_id=self.id) 3686 # If WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT=True, this is only task states created on the current revision 3687 if getattr(settings, "WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT", False): 3688 latest_revision_id = self.revisions().order_by('-created_at', '-id').values_list('id', flat=True).first() 3689 task_states = task_states.filter(page_revision_id=latest_revision_id) 3690 return task_states 3691 3692 def all_tasks_with_status(self): 3693 """ 3694 Returns a list of Task objects that are linked with this workflow state's 3695 workflow. The status of that task in this workflow state is annotated in the 3696 `.status` field. And a displayable version of that status is annotated in the 3697 `.status_display` field. 3698 3699 This is different to querying TaskState as it also returns tasks that haven't 3700 been started yet (so won't have a TaskState). 3701 """ 3702 # Get the set of task states whose status applies to the current revision 3703 task_states = self._get_applicable_task_states() 3704 3705 tasks = list( 3706 self.workflow.tasks.annotate( 3707 status=Subquery( 3708 task_states.filter( 3709 task_id=OuterRef('id'), 3710 ).order_by( 3711 '-started_at', '-id' 3712 ).values('status')[:1] 3713 ), 3714 ) 3715 ) 3716 3717 # Manually annotate status_display 3718 status_choices = dict(TaskState.STATUS_CHOICES) 3719 for task in tasks: 3720 task.status_display = status_choices.get(task.status, _("Not started")) 3721 3722 return tasks 3723 3724 def all_tasks_with_state(self): 3725 """ 3726 Returns a list of Task objects that are linked with this WorkflowState's 3727 workflow, and have the latest task state. 3728 3729 In a "Submit for moderation -> reject at step 1 -> resubmit -> accept" workflow, this ensures 3730 the task list reflects the accept, rather than the reject. 3731 """ 3732 task_states = self._get_applicable_task_states() 3733 3734 tasks = list( 3735 self.workflow.tasks.annotate( 3736 task_state_id=Subquery( 3737 task_states.filter( 3738 task_id=OuterRef('id'), 3739 ).order_by( 3740 '-started_at', '-id' 3741 ).values('id')[:1] 3742 ), 3743 ) 3744 ) 3745 3746 task_states = {task_state.id: task_state for task_state in task_states} 3747 # Manually annotate task_state 3748 for task in tasks: 3749 task.task_state = task_states.get(task.task_state_id) 3750 3751 return tasks 3752 3753 @property 3754 def is_active(self): 3755 return self.status not in [self.STATUS_APPROVED, self.STATUS_CANCELLED] 3756 3757 @property 3758 def is_at_final_task(self): 3759 """Returns the next active task, which has not been either approved or skipped""" 3760 3761 last_task = Task.objects.filter(workflow_tasks__workflow=self.workflow, active=True)\ 3762 .exclude(task_states__in=self.successful_task_states)\ 3763 .order_by('workflow_tasks__sort_order').last() 3764 3765 return self.get_next_task() == last_task 3766 3767 class Meta: 3768 verbose_name = _('Workflow state') 3769 verbose_name_plural = _('Workflow states') 3770 # prevent multiple STATUS_IN_PROGRESS/STATUS_NEEDS_CHANGES workflows for the same page. This is only supported by specific databases (e.g. Postgres, SQL Server), so is checked additionally on save. 3771 constraints = [ 3772 models.UniqueConstraint(fields=['page'], condition=Q(status__in=('in_progress', 'needs_changes')), name='unique_in_progress_workflow') 3773 ] 3774 3775 3776class TaskStateManager(models.Manager): 3777 def reviewable_by(self, user): 3778 tasks = Task.objects.filter(active=True) 3779 states = TaskState.objects.none() 3780 for task in tasks: 3781 states = states | task.specific.get_task_states_user_can_moderate(user=user) 3782 return states 3783 3784 3785class TaskState(models.Model): 3786 """Tracks the status of a given Task for a particular page revision.""" 3787 STATUS_IN_PROGRESS = 'in_progress' 3788 STATUS_APPROVED = 'approved' 3789 STATUS_REJECTED = 'rejected' 3790 STATUS_SKIPPED = 'skipped' 3791 STATUS_CANCELLED = 'cancelled' 3792 STATUS_CHOICES = ( 3793 (STATUS_IN_PROGRESS, _("In progress")), 3794 (STATUS_APPROVED, _("Approved")), 3795 (STATUS_REJECTED, _("Rejected")), 3796 (STATUS_SKIPPED, _("Skipped")), 3797 (STATUS_CANCELLED, _("Cancelled")), 3798 ) 3799 3800 workflow_state = models.ForeignKey('WorkflowState', on_delete=models.CASCADE, verbose_name=_('workflow state'), related_name='task_states') 3801 page_revision = models.ForeignKey('PageRevision', on_delete=models.CASCADE, verbose_name=_('page revision'), related_name='task_states') 3802 task = models.ForeignKey('Task', on_delete=models.CASCADE, verbose_name=_('task'), related_name='task_states') 3803 status = models.fields.CharField(choices=STATUS_CHOICES, verbose_name=_("status"), max_length=50, default=STATUS_IN_PROGRESS) 3804 started_at = models.DateTimeField(verbose_name=_('started at'), auto_now_add=True) 3805 finished_at = models.DateTimeField(verbose_name=_('finished at'), blank=True, null=True) 3806 finished_by = models.ForeignKey( 3807 settings.AUTH_USER_MODEL, 3808 verbose_name=_('finished by'), 3809 null=True, 3810 blank=True, 3811 on_delete=models.SET_NULL, 3812 related_name='finished_task_states' 3813 ) 3814 comment = models.TextField(blank=True) 3815 content_type = models.ForeignKey( 3816 ContentType, 3817 verbose_name=_('content type'), 3818 related_name='wagtail_task_states', 3819 on_delete=models.CASCADE 3820 ) 3821 exclude_fields_in_copy = [] 3822 default_exclude_fields_in_copy = ['id'] 3823 3824 objects = TaskStateManager() 3825 3826 def __init__(self, *args, **kwargs): 3827 super().__init__(*args, **kwargs) 3828 if not self.id: 3829 # this model is being newly created 3830 # rather than retrieved from the db; 3831 if not self.content_type_id: 3832 # set content type to correctly represent the model class 3833 # that this was created as 3834 self.content_type = ContentType.objects.get_for_model(self) 3835 3836 def __str__(self): 3837 return _("Task '{0}' on Page Revision '{1}': {2}").format(self.task, self.page_revision, self.status) 3838 3839 @cached_property 3840 def specific(self): 3841 """ 3842 Return this TaskState in its most specific subclassed form. 3843 """ 3844 # the ContentType.objects manager keeps a cache, so this should potentially 3845 # avoid a database lookup over doing self.content_type. I think. 3846 content_type = ContentType.objects.get_for_id(self.content_type_id) 3847 model_class = content_type.model_class() 3848 if model_class is None: 3849 # Cannot locate a model class for this content type. This might happen 3850 # if the codebase and database are out of sync (e.g. the model exists 3851 # on a different git branch and we haven't rolled back migrations before 3852 # switching branches); if so, the best we can do is return the page 3853 # unchanged. 3854 return self 3855 elif isinstance(self, model_class): 3856 # self is already the an instance of the most specific class 3857 return self 3858 else: 3859 return content_type.get_object_for_this_type(id=self.id) 3860 3861 @transaction.atomic 3862 def approve(self, user=None, update=True, comment=''): 3863 """Approve the task state and update the workflow state""" 3864 if self.status != self.STATUS_IN_PROGRESS: 3865 raise PermissionDenied 3866 self.status = self.STATUS_APPROVED 3867 self.finished_at = timezone.now() 3868 self.finished_by = user 3869 self.comment = comment 3870 self.save() 3871 3872 self.log_state_change_action(user, 'approve') 3873 if update: 3874 self.workflow_state.update(user=user) 3875 task_approved.send(sender=self.specific.__class__, instance=self.specific, user=user) 3876 return self 3877 3878 @transaction.atomic 3879 def reject(self, user=None, update=True, comment=''): 3880 """Reject the task state and update the workflow state""" 3881 if self.status != self.STATUS_IN_PROGRESS: 3882 raise PermissionDenied 3883 self.status = self.STATUS_REJECTED 3884 self.finished_at = timezone.now() 3885 self.finished_by = user 3886 self.comment = comment 3887 self.save() 3888 3889 self.log_state_change_action(user, 'reject') 3890 if update: 3891 self.workflow_state.update(user=user) 3892 task_rejected.send(sender=self.specific.__class__, instance=self.specific, user=user) 3893 3894 return self 3895 3896 @cached_property 3897 def task_type_started_at(self): 3898 """Finds the first chronological started_at for successive TaskStates - ie started_at if the task had not been restarted""" 3899 task_states = TaskState.objects.filter(workflow_state=self.workflow_state).order_by('-started_at').select_related('task') 3900 started_at = None 3901 for task_state in task_states: 3902 if task_state.task == self.task: 3903 started_at = task_state.started_at 3904 elif started_at: 3905 break 3906 return started_at 3907 3908 @transaction.atomic 3909 def cancel(self, user=None, resume=False, comment=''): 3910 """Cancel the task state and update the workflow state. If ``resume`` is set to True, then upon update the workflow state 3911 is passed the current task as ``next_task``, causing it to start a new task state on the current task if possible""" 3912 self.status = self.STATUS_CANCELLED 3913 self.finished_at = timezone.now() 3914 self.comment = comment 3915 self.finished_by = user 3916 self.save() 3917 if resume: 3918 self.workflow_state.update(user=user, next_task=self.task.specific) 3919 else: 3920 self.workflow_state.update(user=user) 3921 task_cancelled.send(sender=self.specific.__class__, instance=self.specific, user=user) 3922 return self 3923 3924 def copy(self, update_attrs=None, exclude_fields=None): 3925 """Copy this task state, excluding the attributes in the ``exclude_fields`` list and updating any attributes to values 3926 specified in the ``update_attrs`` dictionary of ``attribute``: ``new value`` pairs""" 3927 exclude_fields = self.default_exclude_fields_in_copy + self.exclude_fields_in_copy + (exclude_fields or []) 3928 instance, child_object_map = _copy(self.specific, exclude_fields, update_attrs) 3929 instance.save() 3930 _copy_m2m_relations(self, instance, exclude_fields=exclude_fields) 3931 return instance 3932 3933 def get_comment(self): 3934 """ 3935 Returns a string that is displayed in workflow history. 3936 3937 This could be a comment by the reviewer, or generated. 3938 Use mark_safe to return HTML. 3939 """ 3940 return self.comment 3941 3942 def log_state_change_action(self, user, action): 3943 """Log the approval/rejection action""" 3944 page = self.page_revision.as_page_object() 3945 next_task = self.workflow_state.get_next_task() 3946 next_task_data = None 3947 if next_task: 3948 next_task_data = { 3949 'id': next_task.id, 3950 'title': next_task.name 3951 } 3952 PageLogEntry.objects.log_action( 3953 instance=page, 3954 action='wagtail.workflow.{}'.format(action), 3955 user=user, 3956 data={ 3957 'workflow': { 3958 'id': self.workflow_state.workflow.id, 3959 'title': self.workflow_state.workflow.name, 3960 'status': self.status, 3961 'task_state_id': self.id, 3962 'task': { 3963 'id': self.task.id, 3964 'title': self.task.name, 3965 }, 3966 'next': next_task_data, 3967 }, 3968 'comment': self.get_comment() 3969 }, 3970 revision=self.page_revision 3971 ) 3972 3973 class Meta: 3974 verbose_name = _('Task state') 3975 verbose_name_plural = _('Task states') 3976 3977 3978class PageLogEntryManager(BaseLogEntryManager): 3979 3980 def get_instance_title(self, instance): 3981 return instance.specific_deferred.get_admin_display_title() 3982 3983 def log_action(self, instance, action, **kwargs): 3984 kwargs.update(page=instance) 3985 return super().log_action(instance, action, **kwargs) 3986 3987 3988class PageLogEntry(BaseLogEntry): 3989 page = models.ForeignKey( 3990 'wagtailcore.Page', 3991 on_delete=models.DO_NOTHING, 3992 db_constraint=False, 3993 related_name='+' 3994 ) 3995 # Pointer to a specific page revision 3996 revision = models.ForeignKey( 3997 'wagtailcore.PageRevision', 3998 null=True, 3999 blank=True, 4000 on_delete=models.DO_NOTHING, 4001 db_constraint=False, 4002 related_name='+', 4003 ) 4004 4005 objects = PageLogEntryManager() 4006 4007 action_registry = page_log_action_registry 4008 4009 class Meta: 4010 ordering = ['-timestamp', '-id'] 4011 verbose_name = _('page log entry') 4012 verbose_name_plural = _('page log entries') 4013 4014 def __str__(self): 4015 return "PageLogEntry %d: '%s' on '%s' with id %s" % ( 4016 self.pk, self.action, self.object_verbose_name(), self.page_id 4017 ) 4018 4019 @cached_property 4020 def object_id(self): 4021 return self.page_id 4022 4023 4024class Comment(ClusterableModel): 4025 """ 4026 A comment on a field, or a field within a streamfield block 4027 """ 4028 page = ParentalKey(Page, on_delete=models.CASCADE, related_name=COMMENTS_RELATION_NAME) 4029 user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name=COMMENTS_RELATION_NAME) 4030 text = models.TextField() 4031 4032 contentpath = models.TextField() 4033 # This stores the field or field within a streamfield block that the comment is applied on, in the form: 'field', or 'field.block_id.field' 4034 # This must be unchanging across all revisions, so we will not support (current-format) ListBlock or the contents of InlinePanels initially. 4035 4036 position = models.TextField(blank=True) 4037 # This stores the position within a field, to be interpreted by the field's frontend widget. It may change between revisions 4038 4039 created_at = models.DateTimeField(auto_now_add=True) 4040 updated_at = models.DateTimeField(auto_now=True) 4041 4042 revision_created = models.ForeignKey(PageRevision, on_delete=models.CASCADE, related_name='created_comments', null=True, blank=True) 4043 4044 resolved_at = models.DateTimeField(null=True, blank=True) 4045 resolved_by = models.ForeignKey( 4046 settings.AUTH_USER_MODEL, 4047 on_delete=models.SET_NULL, 4048 related_name='comments_resolved', 4049 null=True, 4050 blank=True 4051 ) 4052 4053 class Meta: 4054 verbose_name = _('comment') 4055 verbose_name_plural = _('comments') 4056 4057 def __str__(self): 4058 return "Comment on Page '{0}', left by {1}: '{2}'".format(self.page, self.user, self.text) 4059 4060 def save(self, update_position=False, **kwargs): 4061 # Don't save the position unless specifically instructed to, as the position will normally be retrieved from the revision 4062 update_fields = kwargs.pop('update_fields', None) 4063 if not update_position and (not update_fields or 'position' not in update_fields): 4064 if self.id: 4065 # The instance is already saved; we can use `update_fields` 4066 update_fields = update_fields if update_fields else self._meta.get_fields() 4067 update_fields = [field.name for field in update_fields if field.name not in {'position', 'id'}] 4068 else: 4069 # This is a new instance, we have to preserve and then restore the position via a variable 4070 position = self.position 4071 result = super().save(**kwargs) 4072 self.position = position 4073 return result 4074 return super().save(update_fields=update_fields, **kwargs) 4075 4076 def _log(self, action, page_revision=None, user=None): 4077 PageLogEntry.objects.log_action( 4078 instance=self.page, 4079 action=action, 4080 user=user, 4081 revision=page_revision, 4082 data={ 4083 'comment': { 4084 'id': self.pk, 4085 'contentpath': self.contentpath, 4086 'text': self.text, 4087 } 4088 } 4089 ) 4090 4091 def log_create(self, **kwargs): 4092 self._log('wagtail.comments.create', **kwargs) 4093 4094 def log_edit(self, **kwargs): 4095 self._log('wagtail.comments.edit', **kwargs) 4096 4097 def log_resolve(self, **kwargs): 4098 self._log('wagtail.comments.resolve', **kwargs) 4099 4100 def log_delete(self, **kwargs): 4101 self._log('wagtail.comments.delete', **kwargs) 4102 4103 4104class CommentReply(models.Model): 4105 comment = ParentalKey(Comment, on_delete=models.CASCADE, related_name='replies') 4106 user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name='comment_replies') 4107 text = models.TextField() 4108 created_at = models.DateTimeField(auto_now_add=True) 4109 updated_at = models.DateTimeField(auto_now=True) 4110 4111 class Meta: 4112 verbose_name = _('comment reply') 4113 verbose_name_plural = _('comment replies') 4114 4115 def __str__(self): 4116 return "CommentReply left by '{0}': '{1}'".format(self.user, self.text) 4117 4118 def _log(self, action, page_revision=None, user=None): 4119 PageLogEntry.objects.log_action( 4120 instance=self.comment.page, 4121 action=action, 4122 user=user, 4123 revision=page_revision, 4124 data={ 4125 'comment': { 4126 'id': self.comment.pk, 4127 'contentpath': self.comment.contentpath, 4128 'text': self.comment.text, 4129 }, 4130 'reply': { 4131 'id': self.pk, 4132 'text': self.text, 4133 } 4134 } 4135 ) 4136 4137 def log_create(self, **kwargs): 4138 self._log('wagtail.comments.create_reply', **kwargs) 4139 4140 def log_edit(self, **kwargs): 4141 self._log('wagtail.comments.edit_reply', **kwargs) 4142 4143 def log_delete(self, **kwargs): 4144 self._log('wagtail.comments.delete_reply', **kwargs) 4145 4146 4147class PageSubscription(models.Model): 4148 user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name='page_subscriptions') 4149 page = models.ForeignKey(Page, on_delete=models.CASCADE, related_name='subscribers') 4150 4151 comment_notifications = models.BooleanField() 4152 4153 class Meta: 4154 unique_together = [ 4155 ('page', 'user'), 4156 ] 4157