1# -*- coding: utf-8 -*-
2import json
3import sys
4import warnings
5
6from django.conf import settings
7from django.contrib.auth import get_user_model
8from django.contrib.auth.models import AnonymousUser, Permission
9from django.contrib.sites.models import Site
10from django.core.cache import cache
11from django.core.exceptions import ObjectDoesNotExist
12from django.forms.models import model_to_dict
13from django.template import engines
14from django.template.context import Context
15from django.test import testcases
16from django.test.client import RequestFactory
17from django.urls import reverse
18from django.utils.http import urlencode
19from django.utils.six.moves.urllib.parse import unquote, urljoin
20from django.utils.timezone import now
21from django.utils.translation import activate
22from menus.menu_pool import menu_pool
23
24from cms.api import create_page
25from cms.constants import (
26    PUBLISHER_STATE_DEFAULT,
27    PUBLISHER_STATE_DIRTY,
28    PUBLISHER_STATE_PENDING,
29)
30from cms.plugin_rendering import ContentRenderer, StructureRenderer
31from cms.models import Page
32from cms.models.permissionmodels import (
33    GlobalPagePermission,
34    PagePermission,
35    PageUser,
36)
37from cms.test_utils.util.context_managers import UserLoginContext
38from cms.utils.conf import get_cms_setting
39from cms.utils.permissions import set_current_user
40from cms.utils.urlutils import admin_reverse
41
42
# Hard-coded admin endpoints under the English ("/en/") URL prefix.
# The "%d" / "%s" placeholders are left in place for callers to fill in with
# the ``%`` operator (copy_page, for example, fills "%d" with a page pk).
URL_CMS_PAGE = "/en/admin/cms/page/"
URL_CMS_PAGE_ADD = URL_CMS_PAGE + "add/"
URL_CMS_PAGE_CHANGE_BASE = URL_CMS_PAGE + "%d/"
URL_CMS_PAGE_CHANGE = URL_CMS_PAGE_CHANGE_BASE + "change/"
URL_CMS_PAGE_ADVANCED_CHANGE = URL_CMS_PAGE + "%d/advanced-settings/"
URL_CMS_PAGE_PERMISSION_CHANGE = URL_CMS_PAGE + "%d/permission-settings/"
URL_CMS_PAGE_PERMISSIONS = URL_CMS_PAGE + "%d/permissions/"
URL_CMS_PAGE_PUBLISHED = URL_CMS_PAGE + "published-pages/"
URL_CMS_PAGE_MOVE = URL_CMS_PAGE + "%d/move-page/"
URL_CMS_PAGE_COPY = URL_CMS_PAGE + "%d/copy-page/"
URL_CMS_PAGE_CHANGE_LANGUAGE = URL_CMS_PAGE_CHANGE + "?language=%s"
URL_CMS_PAGE_CHANGE_TEMPLATE = URL_CMS_PAGE_CHANGE + "change-template/"
URL_CMS_PAGE_PUBLISH = URL_CMS_PAGE_CHANGE_BASE + "%s/publish/"
URL_CMS_PAGE_DELETE = URL_CMS_PAGE_CHANGE_BASE + "delete/"

# Plugin endpoints: the first group hangs directly off the page admin root,
# the "_PAGE_" variants are nested below a "%d/" segment.
URL_CMS_PLUGIN_ADD = URL_CMS_PAGE + "add-plugin/"
URL_CMS_PLUGIN_EDIT = URL_CMS_PAGE + "edit-plugin/"
URL_CMS_PLUGIN_MOVE = URL_CMS_PAGE + "move-plugin/"
URL_CMS_PLUGIN_PAGE_MOVE = URL_CMS_PAGE_CHANGE_BASE + "move-plugin/"
URL_CMS_PLUGIN_PAGE_ADD = URL_CMS_PAGE_CHANGE_BASE + "add-plugin/"
URL_CMS_PLUGIN_REMOVE = URL_CMS_PAGE + "delete-plugin/"
URL_CMS_PLUGIN_DELETE = URL_CMS_PAGE + "delete-plugin/%s/"
URL_CMS_PLUGINS_COPY = URL_CMS_PAGE + "copy-plugins/"
URL_CMS_TRANSLATION_DELETE = URL_CMS_PAGE_CHANGE_BASE + "delete-translation/"
URL_CMS_USERSETTINGS = "/en/admin/cms/usersettings/"
67
68
69class _Warning(object):
70    def __init__(self, message, category, filename, lineno):
71        self.message = message
72        self.category = category
73        self.filename = filename
74        self.lineno = lineno
75
76
77def _collectWarnings(observeWarning, f, *args, **kwargs):
78    def showWarning(message, category, filename, lineno, file=None, line=None):
79        assert isinstance(message, Warning)
80        observeWarning(_Warning(
81            message.args[0], category, filename, lineno))
82
83    # Disable the per-module cache for every module otherwise if the warning
84    # which the caller is expecting us to collect was already emitted it won't
85    # be re-emitted by the call to f which happens below.
86    for v in sys.modules.values():
87        if v is not None:
88            try:
89                v.__warningregistry__ = None
90            except:
91                # Don't specify a particular exception type to handle in case
92                # some wacky object raises some wacky exception in response to
93                # the setattr attempt.
94                pass
95
96    origFilters = warnings.filters[:]
97    origShow = warnings.showwarning
98    warnings.simplefilter('always')
99    try:
100        warnings.showwarning = showWarning
101        result = f(*args, **kwargs)
102    finally:
103        warnings.filters[:] = origFilters
104        warnings.showwarning = origShow
105    return result
106
107
108class BaseCMSTestCase(object):
109    counter = 1
110
111    def _fixture_setup(self):
112        super(BaseCMSTestCase, self)._fixture_setup()
113        self.create_fixtures()
114        activate("en")
115
116    def create_fixtures(self):
117        pass
118
119    def _post_teardown(self):
120        menu_pool.clear()
121        cache.clear()
122        super(BaseCMSTestCase, self)._post_teardown()
123        set_current_user(None)
124
125    def login_user_context(self, user):
126        return UserLoginContext(self, user)
127
128    def get_permission(self, codename):
129        return Permission.objects.get(codename=codename)
130
131    def add_permission(self, user, codename):
132        user.user_permissions.add(self.get_permission(codename))
133
134    def remove_permission(self, user, codename):
135        user.user_permissions.remove(Permission.objects.get(codename=codename))
136
137    def add_global_permission(self, user, **kwargs):
138        options = {
139            'can_add': False,
140            'can_change': False,
141            'can_delete': False,
142            'can_change_advanced_settings': False,
143            'can_publish': False,
144            'can_change_permissions': False,
145            'can_move_page': False,
146            'can_recover_page': False,
147            'user': user,
148        }
149        options.update(**kwargs)
150
151        gpp = GlobalPagePermission.objects.create(**options)
152        gpp.sites.set(Site.objects.all())
153        return gpp
154
155    def add_page_permission(self, user, page, **kwargs):
156        options = {
157            'can_add': False,
158            'can_change': False,
159            'can_delete': False,
160            'can_change_advanced_settings': False,
161            'can_publish': False,
162            'can_change_permissions': False,
163            'can_move_page': False,
164            'page': page,
165            'user': user,
166        }
167        options.update(**kwargs)
168
169        pp = PagePermission.objects.create(**options)
170        pp.sites = Site.objects.all()
171        return pp
172
173    def _create_user(self, username, is_staff=False, is_superuser=False,
174                     is_active=True, add_default_permissions=False, permissions=None):
175        """
176        Use this method to create users.
177
178        Default permissions on page and text plugin are added if creating a
179        non-superuser and `add_default_permissions` is set.
180
181        Set `permissions` parameter to an iterable of permission codes to add
182        custom permissios.
183        """
184        User = get_user_model()
185
186        fields = dict(email=username + '@django-cms.org', last_login=now(),
187                      is_staff=is_staff, is_active=is_active, is_superuser=is_superuser
188        )
189
190        # Check for special case where email is used as username
191        if (get_user_model().USERNAME_FIELD != 'email'):
192            fields[get_user_model().USERNAME_FIELD] = username
193
194        user = User(**fields)
195
196        user.set_password(getattr(user, get_user_model().USERNAME_FIELD))
197        user.save()
198        if is_staff and not is_superuser and add_default_permissions:
199            user.user_permissions.add(Permission.objects.get(codename='add_text'))
200            user.user_permissions.add(Permission.objects.get(codename='delete_text'))
201            user.user_permissions.add(Permission.objects.get(codename='change_text'))
202            user.user_permissions.add(Permission.objects.get(codename='publish_page'))
203
204            user.user_permissions.add(Permission.objects.get(codename='add_page'))
205            user.user_permissions.add(Permission.objects.get(codename='change_page'))
206            user.user_permissions.add(Permission.objects.get(codename='delete_page'))
207        if is_staff and not is_superuser and permissions:
208            for permission in permissions:
209                user.user_permissions.add(Permission.objects.get(codename=permission))
210        return user
211
212    def get_superuser(self):
213        try:
214            query = dict()
215
216            if get_user_model().USERNAME_FIELD != "email":
217                query[get_user_model().USERNAME_FIELD] = "admin"
218            else:
219                query[get_user_model().USERNAME_FIELD] = "admin@django-cms.org"
220
221            admin = get_user_model().objects.get(**query)
222        except get_user_model().DoesNotExist:
223            admin = self._create_user("admin", is_staff=True, is_superuser=True)
224        return admin
225
226    def get_staff_user_with_no_permissions(self):
227        """
228        Used in security tests
229        """
230        staff = self._create_user("staff", is_staff=True, is_superuser=False)
231        return staff
232
233    def get_staff_user_with_std_permissions(self):
234        """
235        This is a non superuser staff
236        """
237        staff = self._create_user("staff", is_staff=True, is_superuser=False,
238                                  add_default_permissions=True)
239        return staff
240
241    def get_standard_user(self):
242        """
243        Used in security tests
244        """
245        standard = self._create_user("standard", is_staff=False, is_superuser=False)
246        return standard
247
248    def get_staff_page_user(self, created_by=None):
249        if not created_by:
250            created_by = self.get_superuser()
251
252        parent_link_field = list(PageUser._meta.parents.values())[0]
253        user = self._create_user(
254            'perms-testuser',
255            is_staff=True,
256            is_superuser=False,
257        )
258        data = model_to_dict(user, exclude=['groups', 'user_permissions'])
259        data[parent_link_field.name] = user
260        data['created_by'] = created_by
261        return PageUser.objects.create(**data)
262
263    def get_new_page_data(self, parent_id=''):
264        page_data = {
265            'title': 'test page %d' % self.counter,
266            'slug': 'test-page-%d' % self.counter,
267            'parent_node': parent_id,
268        }
269        # required only if user haves can_change_permission
270        self.counter += 1
271        return page_data
272
273    def get_new_page_data_dbfields(self, parent=None, site=None,
274                                   language=None,
275                                   template='nav_playground.html', ):
276        page_data = {
277            'title': 'test page %d' % self.counter,
278            'slug': 'test-page-%d' % self.counter,
279            'language': settings.LANGUAGES[0][0] if not language else language,
280            'template': template,
281            'parent': parent if parent else None,
282            'site': site if site else Site.objects.get_current(),
283        }
284        self.counter = self.counter + 1
285        return page_data
286
287    def get_pagedata_from_dbfields(self, page_data):
288        """Converts data created by get_new_page_data_dbfields to data
289        created from get_new_page_data so you can switch between test cases
290        in api.create_page and client.post"""
291        page_data['site'] = page_data['site'].id
292        page_data['parent'] = page_data['parent'].id if page_data['parent'] else ''
293        # required only if user haves can_change_permission
294        page_data['pagepermission_set-TOTAL_FORMS'] = 0
295        page_data['pagepermission_set-INITIAL_FORMS'] = 0
296        page_data['pagepermission_set-MAX_NUM_FORMS'] = 0
297        page_data['pagepermission_set-2-TOTAL_FORMS'] = 0
298        page_data['pagepermission_set-2-INITIAL_FORMS'] = 0
299        page_data['pagepermission_set-2-MAX_NUM_FORMS'] = 0
300        return page_data
301
302    def print_page_structure(self, qs):
303        """Just a helper to see the page struct.
304        """
305        for page in qs.order_by('path'):
306            ident = "  " * page.level
307            print(u"%s%s (%s), path: %s, depth: %s, numchild: %s" % (ident, page,
308            page.pk, page.path, page.depth, page.numchild))
309
310    def print_node_structure(self, nodes, *extra):
311        def _rec(nodes, level=0):
312            ident = level * '  '
313            for node in nodes:
314                raw_attrs = [(bit, getattr(node, bit, node.attr.get(bit, "unknown"))) for bit in extra]
315                attrs = ', '.join(['%s: %r' % data for data in raw_attrs])
316                print(u"%s%s: %s" % (ident, node.title, attrs))
317                _rec(node.children, level + 1)
318
319        _rec(nodes)
320
321    def assertObjectExist(self, qs, **filter):
322        try:
323            return qs.get(**filter)
324        except ObjectDoesNotExist:
325            pass
326        raise self.failureException("ObjectDoesNotExist raised for filter %s" % filter)
327
328    def assertObjectDoesNotExist(self, qs, **filter):
329        try:
330            qs.get(**filter)
331        except ObjectDoesNotExist:
332            return
333        raise self.failureException("ObjectDoesNotExist not raised for filter %s" % filter)
334
335    def copy_page(self, page, target_page, position=0, target_site=None):
336        from cms.utils.page import get_available_slug
337
338        if target_site is None:
339            target_site = target_page.node.site
340
341        data = {
342            'position': position,
343            'target': target_page.pk,
344            'source_site': page.node.site_id,
345            'copy_permissions': 'on',
346            'copy_moderation': 'on',
347        }
348        source_translation = page.title_set.all()[0]
349        parent_translation = target_page.title_set.all()[0]
350        language = source_translation.language
351        copied_page_path = source_translation.get_path_for_base(parent_translation.path)
352        new_page_slug = get_available_slug(target_site, copied_page_path, language)
353
354        with self.settings(SITE_ID=target_site.pk):
355            response = self.client.post(URL_CMS_PAGE + "%d/copy-page/" % page.pk, data)
356        self.assertEqual(response.status_code, 200)
357        response_data = json.loads(response.content.decode('utf8'))
358        copied_page = self.assertObjectExist(
359            Page.objects.all(),
360            pk=response_data['id'],
361        )
362        self.assertObjectExist(copied_page.title_set.filter(language=language), slug=new_page_slug)
363        page._clear_node_cache()
364        target_page._clear_node_cache()
365        return copied_page
366
367    def create_homepage(self, *args, **kwargs):
368        homepage = create_page(*args, **kwargs)
369        homepage.set_as_homepage()
370        return homepage.reload()
371
372    def move_page(self, page, target_page, position="first-child"):
373        page.move_page(target_page.node, position)
374        return self.reload_page(page)
375
376    def reload_page(self, page):
377        """
378        Returns a fresh instance of the page from the database
379        """
380        return self.reload(page)
381
382    def reload(self, obj):
383        return obj.__class__.objects.get(pk=obj.pk)
384
385    def get_pages_root(self):
386        return unquote(reverse("pages-root"))
387
388    def get_context(self, path=None, page=None):
389        if not path:
390            path = self.get_pages_root()
391        context = {}
392        request = self.get_request(path, page=page)
393        context['request'] = request
394        return Context(context)
395
396    def get_content_renderer(self, request=None):
397        request = request or self.get_request()
398        return ContentRenderer(request)
399
400    def get_structure_renderer(self, request=None):
401        request = request or self.get_request()
402        return StructureRenderer(request)
403
404    def get_request(self, path=None, language=None, post_data=None, enforce_csrf_checks=False, page=None):
405        factory = RequestFactory()
406
407        if not path:
408            path = self.get_pages_root()
409
410        if not language:
411            if settings.USE_I18N:
412                language = settings.LANGUAGES[0][0]
413            else:
414                language = settings.LANGUAGE_CODE
415
416        if post_data:
417            request = factory.post(path, post_data)
418        else:
419            request = factory.get(path)
420        request.session = self.client.session
421        request.user = getattr(self, 'user', AnonymousUser())
422        request.LANGUAGE_CODE = language
423        request._dont_enforce_csrf_checks = not enforce_csrf_checks
424        if page:
425            request.current_page = page
426        else:
427            request.current_page = None
428
429        class MockStorage(object):
430
431            def __len__(self):
432                return 0
433
434            def __iter__(self):
435                return iter([])
436
437            def add(self, level, message, extra_tags=''):
438                pass
439
440            def update(self, response):
441                pass
442
443        request._messages = MockStorage()
444        return request
445
446    def failUnlessWarns(self, category, message, f, *args, **kwargs):
447        warningsShown = []
448        result = _collectWarnings(warningsShown.append, f, *args, **kwargs)
449
450        if not warningsShown:
451            self.fail("No warnings emitted")
452        first = warningsShown[0]
453        for other in warningsShown[1:]:
454            if ((other.message, other.category)
455                != (first.message, first.category)):
456                self.fail("Can't handle different warnings")
457        self.assertEqual(first.message, message)
458        self.assertTrue(first.category is category)
459
460        return result
461
462    assertWarns = failUnlessWarns
463
464    def load_template_from_string(self, template):
465        return engines['django'].from_string(template)
466
467    def get_template(self, template):
468        return engines['django'].get_template(template)
469
470    def render_template_obj(self, template, context, request):
471        template_obj = self.load_template_from_string(template)
472        return template_obj.render(context, request)
473
474    def apphook_clear(self):
475        from cms.apphook_pool import apphook_pool
476        for name, label in list(apphook_pool.get_apphooks()):
477            if apphook_pool.apps[name].__class__.__module__ in sys.modules:
478                del sys.modules[apphook_pool.apps[name].__class__.__module__]
479        apphook_pool.clear()
480
481    def get_admin_url(self, model, action, *args):
482        opts = model._meta
483        url_name = "{}_{}_{}".format(opts.app_label, opts.model_name, action)
484        return admin_reverse(url_name, args=args)
485
486    def get_permissions_test_page(self):
487        admin = self.get_superuser()
488        create_page(
489            "home",
490            "nav_playground.html",
491            "en",
492            created_by=admin,
493            published=True,
494        )
495        page = create_page(
496            "permissions",
497            "nav_playground.html",
498            "en",
499            created_by=admin,
500            published=True,
501            reverse_id='permissions',
502        )
503        return page
504
505    def get_plugin_model(self, plugin_type):
506        from cms.plugin_pool import plugin_pool
507
508        return plugin_pool.get_plugin(plugin_type).model
509
510    def get_add_plugin_uri(self, placeholder, plugin_type, language='en', parent=None):
511        if placeholder.page:
512            path = placeholder.page.get_absolute_url(language)
513        else:
514            path = '/{}/'.format(language)
515
516        endpoint = placeholder.get_add_url()
517        data = {
518            'plugin_type': plugin_type,
519            'placeholder_id': placeholder.pk,
520            'plugin_language': language,
521            'cms_path': path,
522        }
523
524        if parent:
525            data['plugin_parent'] = parent.pk
526        return endpoint + '?' + urlencode(data)
527
528    def get_change_plugin_uri(self, plugin, container=None, language=None):
529        container = container or Page
530        language = language or 'en'
531
532        if plugin.page:
533            path = plugin.page.get_absolute_url(language)
534        else:
535            path = '/{}/'.format(language)
536
537        endpoint = self.get_admin_url(container, 'edit_plugin', plugin.pk)
538        endpoint += '?' + urlencode({'cms_path': path})
539        return endpoint
540
541    def get_move_plugin_uri(self, plugin, container=None, language=None):
542        container = container or Page
543        language = language or 'en'
544
545        if plugin.page:
546            path = plugin.page.get_absolute_url(language)
547        else:
548            path = '/{}/'.format(language)
549
550        endpoint = self.get_admin_url(container, 'move_plugin')
551        endpoint += '?' + urlencode({'cms_path': path})
552        return endpoint
553
554    def get_copy_plugin_uri(self, plugin, container=None, language=None):
555        container = container or Page
556        language = language or 'en'
557
558        if plugin.page:
559            path = plugin.page.get_absolute_url(language)
560        else:
561            path = '/{}/'.format(language)
562
563        endpoint = self.get_admin_url(container, 'copy_plugins')
564        endpoint += '?' + urlencode({'cms_path': path})
565        return endpoint
566
567    def get_copy_placeholder_uri(self, placeholder, container=None, language=None):
568        container = container or Page
569        language = language or 'en'
570
571        if placeholder.page:
572            path = placeholder.page.get_absolute_url(language)
573        else:
574            path = '/{}/'.format(language)
575
576        endpoint = self.get_admin_url(container, 'copy_plugins')
577        endpoint += '?' + urlencode({'cms_path': path})
578        return endpoint
579
580    def get_delete_plugin_uri(self, plugin, container=None, language=None):
581        container = container or Page
582        language = language or 'en'
583
584        if plugin.page:
585            path = plugin.page.get_absolute_url(language)
586        else:
587            path = '/{}/'.format(language)
588
589        endpoint = self.get_admin_url(container, 'delete_plugin', plugin.pk)
590        endpoint += '?' + urlencode({'cms_path': path})
591        return endpoint
592
593    def get_clear_placeholder_url(self, placeholder, container=None, language=None):
594        container = container or Page
595        language = language or 'en'
596
597        if placeholder.page:
598            path = placeholder.page.get_absolute_url(language)
599        else:
600            path = '/{}/'.format(language)
601
602        endpoint = self.get_admin_url(
603            container,
604            'clear_placeholder',
605            placeholder.pk,
606        )
607        endpoint += '?' + urlencode({
608            'language': language,
609            'cms_path': path,
610        })
611        return endpoint
612
613    def get_edit_on_url(self, url):
614        return '{}?{}'.format(url, get_cms_setting('CMS_TOOLBAR_URL__EDIT_ON'))
615
616    def get_edit_off_url(self, url):
617        return '{}?{}'.format(url, get_cms_setting('CMS_TOOLBAR_URL__EDIT_OFF'))
618
619    def get_obj_structure_url(self, url):
620        return '{}?{}'.format(url, get_cms_setting('TOOLBAR_URL__BUILD'))
621
622    def get_toolbar_disable_url(self, url):
623        return '{}?{}'.format(url, get_cms_setting('TOOLBAR_URL__DISABLE'))
624
625
class CMSTestCase(BaseCMSTestCase, testcases.TestCase):
    """
    Django ``TestCase`` combined with the CMS helpers of ``BaseCMSTestCase``
    plus assertions about the publisher state of a page.

    All assertions inspect the 'en' language. When handed a draft page, the
    first three also check its ``publisher_public`` counterpart.
    """

    def assertPending(self, page):
        """
        Assert that ``page`` is pending: not published, linked to a public
        page; a draft additionally has a published title and the PENDING
        publisher state, a public page an unpublished title.
        """
        is_draft = page.publisher_is_draft
        self.assertFalse(page.is_published('en'))
        self.assertTrue(bool(page.publisher_public_id))
        if not is_draft:
            # public
            self.assertFalse(page.get_title_obj('en').published)
            return
        # draft
        self.assertTrue(page.get_title_obj('en').published)
        self.assertEqual(page.get_publisher_state("en"), PUBLISHER_STATE_PENDING)
        self.assertPending(page.publisher_public)

    def assertPublished(self, page):
        """
        Assert that ``page`` is published with a published title and a link
        to a public page; a draft must also be in the DEFAULT publisher
        state.
        """
        is_draft = page.publisher_is_draft
        self.assertTrue(page.is_published('en'))
        self.assertTrue(page.get_title_obj('en').published)
        self.assertTrue(bool(page.publisher_public_id))
        if is_draft:
            self.assertEqual(page.get_publisher_state("en"), PUBLISHER_STATE_DEFAULT)
            self.assertPublished(page.publisher_public)

    def assertUnpublished(self, page):
        """
        Assert that ``page`` is unpublished: not published, linked to a
        public page, title not published; a draft must also be in the DIRTY
        publisher state.
        """
        is_draft = page.publisher_is_draft
        self.assertFalse(page.is_published('en'))
        self.assertTrue(bool(page.publisher_public_id))
        self.assertFalse(page.get_title_obj('en').published)
        if is_draft:
            self.assertEqual(page.get_publisher_state("en"), PUBLISHER_STATE_DIRTY)
            self.assertUnpublished(page.publisher_public)

    def assertNeverPublished(self, page):
        """
        Assert that ``page`` is a draft without any public counterpart, not
        published and in the DIRTY publisher state.
        """
        self.assertTrue(page.publisher_is_draft)
        published = page.is_published('en')
        self.assertFalse(published)
        self.assertIsNone(page.publisher_public)
        state = page.get_publisher_state("en")
        self.assertEqual(state, PUBLISHER_STATE_DIRTY)
675
676
class TransactionCMSTestCase(CMSTestCase, testcases.TransactionTestCase):
    """
    ``CMSTestCase`` variant that also lists Django's
    ``TransactionTestCase`` among its bases; adds nothing of its own.
    """
679