1import json
2import logging
3
4from unittest import mock
5
6from django.conf import settings
7from django.contrib.auth.models import Group, Permission
8from django.core import mail
9from django.core.mail import EmailMultiAlternatives
10from django.test import TestCase, override_settings
11from django.urls import reverse
12from freezegun import freeze_time
13
14from wagtail.core.models import (
15    GroupApprovalTask, Page, Task, TaskState, Workflow, WorkflowPage, WorkflowState, WorkflowTask)
16from wagtail.core.signals import page_published
17from wagtail.tests.testapp.models import SimplePage, SimpleTask
18from wagtail.tests.utils import WagtailTestUtils
19from wagtail.users.models import UserProfile
20
21
def delete_existing_workflows():
    """Delete every workflow-related row so a test can start from an empty set of workflows."""
    # Same order as before: page assignments, workflows, tasks, then workflow/task links.
    for model in (WorkflowPage, Workflow, Task, WorkflowTask):
        model.objects.all().delete()
27
28
class TestWorkflowMenus(TestCase, WagtailTestUtils):
    """Checks which workflow menu links are rendered on the admin home page."""

    # Links to the workflow/task settings listings
    _settings_links = (
        'href="/admin/workflows/list/"',
        'href="/admin/workflows/tasks/index/"',
    )
    # Links to the workflow reports
    _report_links = (
        'href="/admin/reports/workflow/"',
        'href="/admin/reports/workflow_tasks/"',
    )

    def setUp(self):
        # The default login is used by the "admin" tests below
        self.login()

        self.editor = self.create_user(
            username='editor',
            email='editor@email.com',
            password='password',
        )
        Group.objects.get(name='Editors').user_set.add(self.editor)

    def test_workflow_settings_and_reports_menus_are_shown_to_admin(self):
        response = self.client.get('/admin/')
        for link in self._settings_links + self._report_links:
            self.assertContains(response, link)

    def test_workflow_settings_menus_are_not_shown_to_editor(self):
        self.login(user=self.editor)
        response = self.client.get('/admin/')

        # Settings links are hidden from the editor...
        for link in self._settings_links:
            self.assertNotContains(response, link)
        # ...but the report links are still present
        for link in self._report_links:
            self.assertContains(response, link)

    @override_settings(WAGTAIL_WORKFLOW_ENABLED=False)
    def test_workflow_menus_are_hidden_when_workflows_are_disabled(self):
        response = self.client.get('/admin/')
        for link in self._settings_links + self._report_links:
            self.assertNotContains(response, link)
63
64
class TestWorkflowsIndexView(TestCase, WagtailTestUtils):
    """Tests for the workflow listing view (``wagtailadmin_workflows:index``)."""

    def setUp(self):
        delete_existing_workflows()
        self.login()

        self.editor = self.create_user(
            username='editor',
            email='editor@email.com',
            password='password',
        )
        editors = Group.objects.get(name='Editors')
        editors.user_set.add(self.editor)

        self.moderator = self.create_user(
            username='moderator',
            email='moderator@email.com',
            password='password',
        )
        moderators = Group.objects.get(name='Moderators')
        moderators.user_set.add(self.moderator)
        # Moderators additionally receive the "add_workflow" permission (see test_permissions)
        moderators.permissions.add(Permission.objects.get(codename="add_workflow"))

    def get(self, params=None):
        """GET the workflow index view.

        ``params`` is an optional dict of query-string parameters; ``None`` means no parameters.
        """
        # A None default avoids sharing one mutable dict between calls
        query = {} if params is None else params
        return self.client.get(reverse('wagtailadmin_workflows:index'), query)

    def test_simple(self):
        response = self.get()
        self.assertEqual(response.status_code, 200)
        self.assertTemplateUsed(response, 'wagtailadmin/workflows/index.html')

        # Initially there should be no workflows listed
        self.assertContains(response, "There are no enabled workflows.")

        Workflow.objects.create(name="test_workflow", active=True)

        # Now the listing should contain our workflow
        response = self.get()
        self.assertEqual(response.status_code, 200)
        self.assertTemplateUsed(response, 'wagtailadmin/workflows/index.html')
        self.assertNotContains(response, "There are no enabled workflows.")
        self.assertContains(response, "test_workflow")

    def test_deactivated(self):
        Workflow.objects.create(name="test_workflow", active=False)

        # The listing should contain our workflow, as well as marking it as disabled
        response = self.get(params={'show_disabled': 'true'})
        self.assertEqual(response.status_code, 200)
        self.assertNotContains(response, "No workflows have been created.")
        self.assertContains(response, "test_workflow")
        self.assertContains(response, '<span class="status-tag">Disabled</span>', html=True)

        # Without the 'show_disabled' parameter, the disabled workflow should not be displayed
        response = self.get(params={})
        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "There are no enabled workflows.")

    def test_permissions(self):
        # An editor is redirected away with a permission-denied message
        self.login(user=self.editor)
        response = self.get()
        self.assertEqual(response.status_code, 302)
        # Flatten all context layers into one dict to look up the message
        full_context = {key: value for context in response.context for key, value in context.items()}
        self.assertEqual(full_context['message'], 'Sorry, you do not have permission to access this area.')

        # A moderator (with the add_workflow permission) can view the listing
        self.login(user=self.moderator)
        response = self.get()
        self.assertEqual(response.status_code, 200)
133
134
class TestWorkflowsCreateView(TestCase, WagtailTestUtils):
    """Tests for the workflow creation view (``wagtailadmin_workflows:add``)."""

    def setUp(self):
        delete_existing_workflows()
        self.login()
        self.task_1 = SimpleTask.objects.create(name="first_task")
        self.task_2 = SimpleTask.objects.create(name="second_task")

        self.editor = self.create_user(
            username='editor',
            email='editor@email.com',
            password='password',
        )
        editors = Group.objects.get(name='Editors')
        editors.user_set.add(self.editor)

        self.moderator = self.create_user(
            username='moderator',
            email='moderator@email.com',
            password='password',
        )
        moderators = Group.objects.get(name='Moderators')
        moderators.user_set.add(self.moderator)
        moderators.permissions.add(Permission.objects.get(codename="add_workflow"))

        self.root_page = Page.objects.get(depth=1)

    def get(self, params=None):
        """GET the create view; ``params`` is an optional dict of query-string parameters."""
        # A None default avoids sharing one mutable dict between calls
        query = {} if params is None else params
        return self.client.get(reverse('wagtailadmin_workflows:add'), query)

    def post(self, post_data=None):
        """POST to the create view; ``post_data`` is an optional dict of form data."""
        form_data = {} if post_data is None else post_data
        return self.client.post(reverse('wagtailadmin_workflows:add'), form_data)

    def _workflow_post_data(self):
        """Return a fresh form payload: 'test_workflow' with task_1 then task_2, assigned to the root page."""
        return {
            'name': ['test_workflow'],
            'active': ['on'],
            'workflow_tasks-TOTAL_FORMS': ['2'],
            'workflow_tasks-INITIAL_FORMS': ['0'],
            'workflow_tasks-MIN_NUM_FORMS': ['0'],
            'workflow_tasks-MAX_NUM_FORMS': ['1000'],
            'workflow_tasks-0-task': [str(self.task_1.id)],
            'workflow_tasks-0-id': [''],
            'workflow_tasks-0-ORDER': ['1'],
            'workflow_tasks-0-DELETE': [''],
            'workflow_tasks-1-task': [str(self.task_2.id)],
            'workflow_tasks-1-id': [''],
            'workflow_tasks-1-ORDER': ['2'],
            'workflow_tasks-1-DELETE': [''],
            'pages-TOTAL_FORMS': ['2'],
            'pages-INITIAL_FORMS': ['1'],
            'pages-MIN_NUM_FORMS': ['0'],
            'pages-MAX_NUM_FORMS': ['1000'],
            'pages-0-page': [str(self.root_page.id)],
            'pages-0-DELETE': [''],
            'pages-1-page': [''],
            'pages-1-DELETE': [''],
        }

    def test_get(self):
        response = self.get()
        self.assertEqual(response.status_code, 200)
        self.assertTemplateUsed(response, 'wagtailadmin/workflows/create.html')

    def test_post(self):
        response = self.post(self._workflow_post_data())

        # Should redirect back to index
        self.assertRedirects(response, reverse('wagtailadmin_workflows:index'))

        # Check that the workflow was created
        workflows = Workflow.objects.filter(name="test_workflow", active=True)
        self.assertEqual(workflows.count(), 1)

        workflow = workflows.first()

        # Check that the tasks are associated with the workflow
        self.assertEqual([self.task_1.task_ptr, self.task_2.task_ptr], list(workflow.tasks))

        # Check that the tasks have sort_order set on WorkflowTask correctly
        self.assertEqual(WorkflowTask.objects.get(workflow=workflow, task=self.task_1.task_ptr).sort_order, 0)
        self.assertEqual(WorkflowTask.objects.get(workflow=workflow, task=self.task_2.task_ptr).sort_order, 1)

    def test_permissions(self):
        # An editor is redirected away with a permission-denied message
        self.login(user=self.editor)
        response = self.get()
        self.assertEqual(response.status_code, 302)
        # Flatten all context layers into one dict to look up the message
        full_context = {key: value for context in response.context for key, value in context.items()}
        self.assertEqual(full_context['message'], 'Sorry, you do not have permission to access this area.')

        # A moderator (with the add_workflow permission) can open the form
        self.login(user=self.moderator)
        response = self.get()
        self.assertEqual(response.status_code, 200)

    def test_page_already_has_workflow_check(self):
        # The root page is already assigned to another workflow before the form is submitted
        workflow = Workflow.objects.create(name="existing_workflow")
        WorkflowPage.objects.create(workflow=workflow, page=self.root_page)

        response = self.post(self._workflow_post_data())

        # The form is re-rendered with an error on the first page form
        self.assertEqual(response.status_code, 200)
        self.assertFormsetError(response, 'pages_formset', 0, 'page', ["This page already has workflow 'existing_workflow' assigned."])
257
258
class TestWorkflowsEditView(TestCase, WagtailTestUtils):
    """Tests for the workflow edit view (``wagtailadmin_workflows:edit``)."""

    def setUp(self):
        delete_existing_workflows()
        self.login()
        self.workflow = Workflow.objects.create(name="workflow_to_edit")
        self.task_1 = SimpleTask.objects.create(name="first_task")
        self.task_2 = SimpleTask.objects.create(name="second_task")
        self.inactive_task = SimpleTask.objects.create(name="inactive_task", active=False)
        # The workflow starts with only task_1 attached, and is assigned to one page
        self.workflow_task = WorkflowTask.objects.create(workflow=self.workflow, task=self.task_1.task_ptr, sort_order=0)
        self.page = Page.objects.first()
        WorkflowPage.objects.create(workflow=self.workflow, page=self.page)

        self.editor = self.create_user(
            username='editor',
            email='editor@email.com',
            password='password',
        )
        editors = Group.objects.get(name='Editors')
        editors.user_set.add(self.editor)

        self.moderator = self.create_user(
            username='moderator',
            email='moderator@email.com',
            password='password',
        )
        moderators = Group.objects.get(name='Moderators')
        moderators.user_set.add(self.moderator)
        moderators.permissions.add(Permission.objects.get(codename="change_workflow"))

    def get(self, params=None):
        """GET the edit view for ``self.workflow``; ``params`` is an optional dict of query parameters."""
        # A None default avoids sharing one mutable dict between calls
        query = {} if params is None else params
        return self.client.get(reverse('wagtailadmin_workflows:edit', args=[self.workflow.id]), query)

    def post(self, post_data=None):
        """POST to the edit view for ``self.workflow``; ``post_data`` is an optional dict of form data."""
        form_data = {} if post_data is None else post_data
        return self.client.post(reverse('wagtailadmin_workflows:edit', args=[self.workflow.id]), form_data)

    def _edit_post_data(self, second_page=''):
        """Return a fresh edit payload keeping task_1, adding task_2 and keeping ``self.page`` assigned.

        ``second_page`` is the value submitted for the second (extra) page form; it is blank by default.
        """
        return {
            'name': [str(self.workflow.name)],
            'active': ['on'],
            'workflow_tasks-TOTAL_FORMS': ['2'],
            'workflow_tasks-INITIAL_FORMS': ['1'],
            'workflow_tasks-MIN_NUM_FORMS': ['0'],
            'workflow_tasks-MAX_NUM_FORMS': ['1000'],
            'workflow_tasks-0-task': [str(self.task_1.id)],
            'workflow_tasks-0-id': [str(self.workflow_task.id)],
            'workflow_tasks-0-ORDER': ['1'],
            'workflow_tasks-0-DELETE': [''],
            'workflow_tasks-1-task': [str(self.task_2.id)],
            'workflow_tasks-1-id': [''],
            'workflow_tasks-1-ORDER': ['2'],
            'workflow_tasks-1-DELETE': [''],
            'pages-TOTAL_FORMS': ['2'],
            'pages-INITIAL_FORMS': ['1'],
            'pages-MIN_NUM_FORMS': ['0'],
            'pages-MAX_NUM_FORMS': ['1000'],
            'pages-0-page': [str(self.page.id)],
            'pages-0-DELETE': [''],
            'pages-1-page': [second_page],
            'pages-1-DELETE': [''],
        }

    def test_get(self):
        response = self.get()
        self.assertEqual(response.status_code, 200)
        self.assertTemplateUsed(response, 'wagtailadmin/workflows/edit.html')

        # Check that the list of pages has the page to which this workflow is assigned
        self.assertContains(response, self.page.title)

    def test_post(self):
        response = self.post(self._edit_post_data())

        # Should redirect back to index
        self.assertRedirects(response, reverse('wagtailadmin_workflows:index'))

        # Check that exactly one active workflow with this name exists after the edit
        workflows = Workflow.objects.filter(name="workflow_to_edit", active=True)
        self.assertEqual(workflows.count(), 1)

        workflow = workflows.first()

        # Check that the tasks are associated with the workflow
        self.assertEqual([self.task_1.task_ptr, self.task_2.task_ptr], list(workflow.tasks))

        # Check that the tasks have sort_order set on WorkflowTask correctly
        self.assertEqual(WorkflowTask.objects.get(workflow=workflow, task=self.task_1.task_ptr).sort_order, 0)
        self.assertEqual(WorkflowTask.objects.get(workflow=workflow, task=self.task_2.task_ptr).sort_order, 1)

    def test_permissions(self):
        # An editor is redirected away with a permission-denied message
        self.login(user=self.editor)
        response = self.get()
        self.assertEqual(response.status_code, 302)
        # Flatten all context layers into one dict to look up the message
        full_context = {key: value for context in response.context for key, value in context.items()}
        self.assertEqual(full_context['message'], 'Sorry, you do not have permission to access this area.')

        # A moderator (with the change_workflow permission) can open the form
        self.login(user=self.moderator)
        response = self.get()
        self.assertEqual(response.status_code, 200)

    def test_duplicate_page_check(self):
        # Submit the same page in both page forms
        response = self.post(self._edit_post_data(second_page=str(self.page.id)))

        self.assertEqual(response.status_code, 200)
        self.assertFormsetError(response, 'pages_formset', None, None, ['You cannot assign this workflow to the same page multiple times.'])

    def test_pages_ignored_if_workflow_disabled(self):
        self.workflow.active = False
        self.workflow.save()
        self.workflow.workflow_pages.all().delete()

        response = self.post(self._edit_post_data())

        # Should redirect back to index
        self.assertRedirects(response, reverse('wagtailadmin_workflows:index'))

        # Check that the pages weren't added to the workflow
        self.workflow.refresh_from_db()
        self.assertFalse(self.workflow.workflow_pages.exists())
421
422
class TestRemoveWorkflow(TestCase, WagtailTestUtils):
    """Tests for removing a workflow from a page (``wagtailadmin_workflows:remove``)."""

    fixtures = ['test.json']

    def setUp(self):
        delete_existing_workflows()
        self.login()
        # One workflow assigned to one page: the assignment the remove view acts on
        self.workflow = Workflow.objects.create(name="workflow")
        self.page = Page.objects.first()
        WorkflowPage.objects.create(workflow=self.workflow, page=self.page)

        self.editor = self.create_user(
            username='editor',
            email='editor@email.com',
            password='password',
        )
        editors = Group.objects.get(name='Editors')
        editors.user_set.add(self.editor)

        self.moderator = self.create_user(
            username='moderator',
            email='moderator@email.com',
            password='password',
        )
        moderators = Group.objects.get(name='Moderators')
        moderators.user_set.add(self.moderator)
        moderators.permissions.add(Permission.objects.get(codename="change_workflow"))

    def post(self, post_data=None):
        """POST to the remove view for ``self.page`` / ``self.workflow``.

        ``post_data`` is an optional dict of form data; ``None`` means an empty body.
        """
        # A None default avoids sharing one mutable dict between calls
        form_data = {} if post_data is None else post_data
        return self.client.post(reverse('wagtailadmin_workflows:remove', args=[self.page.id, self.workflow.id]), form_data)

    def test_post(self):
        # Check that a WorkflowPage instance is removed correctly
        self.post()
        self.assertEqual(WorkflowPage.objects.filter(workflow=self.workflow, page=self.page).count(), 0)

    def test_no_permissions(self):
        self.login(user=self.editor)
        response = self.post()
        self.assertEqual(response.status_code, 302)

    def test_post_with_permission(self):
        self.login(user=self.moderator)
        response = self.post()
        self.assertEqual(response.status_code, 302)
467
468
class TestTaskIndexView(TestCase, WagtailTestUtils):
    """Tests for the task listing view (``wagtailadmin_workflows:task_index``)."""

    def setUp(self):
        delete_existing_workflows()
        self.login()

        self.editor = self.create_user(
            username='editor',
            email='editor@email.com',
            password='password',
        )
        editors = Group.objects.get(name='Editors')
        editors.user_set.add(self.editor)

        self.moderator = self.create_user(
            username='moderator',
            email='moderator@email.com',
            password='password',
        )
        moderators = Group.objects.get(name='Moderators')
        moderators.user_set.add(self.moderator)
        moderators.permissions.add(Permission.objects.get(codename="change_task"))

    def get(self, params=None):
        """GET the task index view.

        ``params`` is an optional dict of query-string parameters; ``None`` means no parameters.
        """
        # A None default avoids sharing one mutable dict between calls
        query = {} if params is None else params
        return self.client.get(reverse('wagtailadmin_workflows:task_index'), query)

    def test_simple(self):
        response = self.get()
        self.assertEqual(response.status_code, 200)
        self.assertTemplateUsed(response, 'wagtailadmin/workflows/task_index.html')

        # Initially there should be no tasks listed
        self.assertContains(response, "There are no enabled tasks")

        SimpleTask.objects.create(name="test_task", active=True)

        # Now the listing should contain our task
        response = self.get()
        self.assertEqual(response.status_code, 200)
        self.assertTemplateUsed(response, 'wagtailadmin/workflows/task_index.html')
        self.assertNotContains(response, "There are no enabled tasks")
        self.assertContains(response, "test_task")

    def test_deactivated(self):
        Task.objects.create(name="test_task", active=False)

        # The listing should contain our task, as well as marking it as disabled
        response = self.get(params={'show_disabled': 'true'})
        self.assertEqual(response.status_code, 200)
        self.assertNotContains(response, "No tasks have been created.")
        self.assertContains(response, "test_task")
        self.assertContains(response, '<span class="status-tag">Disabled</span>', html=True)

        # The listing should not contain the task when the show_disabled query parameter is omitted
        response = self.get(params={})
        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "There are no enabled tasks")
        self.assertNotContains(response, "test_task")

    def test_permissions(self):
        # An editor is redirected away with a permission-denied message
        self.login(user=self.editor)
        response = self.get()
        self.assertEqual(response.status_code, 302)
        # Flatten all context layers into one dict to look up the message
        full_context = {key: value for context in response.context for key, value in context.items()}
        self.assertEqual(full_context['message'], 'Sorry, you do not have permission to access this area.')

        # A moderator (with the change_task permission) can view the listing
        self.login(user=self.moderator)
        response = self.get()
        self.assertEqual(response.status_code, 200)
538
539
class TestCreateTaskView(TestCase, WagtailTestUtils):
    """Tests for the task creation view (``wagtailadmin_workflows:add_task``)."""

    def setUp(self):
        delete_existing_workflows()
        self.login()

        self.editor = self.create_user(
            username='editor',
            email='editor@email.com',
            password='password',
        )
        editors = Group.objects.get(name='Editors')
        editors.user_set.add(self.editor)

        self.moderator = self.create_user(
            username='moderator',
            email='moderator@email.com',
            password='password',
        )
        moderators = Group.objects.get(name='Moderators')
        moderators.user_set.add(self.moderator)
        moderators.permissions.add(Permission.objects.get(codename="add_task"))

    def get(self, url_kwargs=None, params=None):
        """GET the add-task view.

        ``url_kwargs`` may override the ``app_label`` / ``model_name`` URL arguments, which
        default to those of ``SimpleTask``. ``params`` is an optional dict of query parameters.
        """
        resolved_kwargs = {
            'app_label': SimpleTask._meta.app_label,
            'model_name': SimpleTask._meta.model_name,
        }
        # Merge into a new dict so the caller's url_kwargs is never mutated
        resolved_kwargs.update(url_kwargs or {})
        # A None default avoids sharing one mutable dict between calls
        query = {} if params is None else params
        return self.client.get(reverse('wagtailadmin_workflows:add_task', kwargs=resolved_kwargs), query)

    def post(self, post_data=None):
        """POST to the add-task view for ``SimpleTask``; ``post_data`` is an optional dict of form data."""
        form_data = {} if post_data is None else post_data
        url = reverse(
            'wagtailadmin_workflows:add_task',
            kwargs={'app_label': SimpleTask._meta.app_label, 'model_name': SimpleTask._meta.model_name},
        )
        return self.client.post(url, form_data)

    def test_get(self):
        response = self.get()
        self.assertEqual(response.status_code, 200)
        self.assertTemplateUsed(response, 'wagtailadmin/workflows/create_task.html')

    def test_get_with_non_task_model(self):
        response = self.get(url_kwargs={'app_label': 'wagtailcore', 'model_name': 'Site'})
        self.assertEqual(response.status_code, 404)

    def test_get_with_base_task_model(self):
        response = self.get(url_kwargs={'app_label': 'wagtailcore', 'model_name': 'Task'})
        self.assertEqual(response.status_code, 404)

    def test_post(self):
        response = self.post({'name': 'test_task', 'active': 'on'})

        # Should redirect back to index
        self.assertRedirects(response, reverse('wagtailadmin_workflows:task_index'))

        # Check that the task was created
        tasks = Task.objects.filter(name="test_task", active=True)
        self.assertEqual(tasks.count(), 1)

    def test_permissions(self):
        # An editor is redirected away with a permission-denied message
        self.login(user=self.editor)
        response = self.get()
        self.assertEqual(response.status_code, 302)
        # Flatten all context layers into one dict to look up the message
        full_context = {key: value for context in response.context for key, value in context.items()}
        self.assertEqual(full_context['message'], 'Sorry, you do not have permission to access this area.')

        # A moderator (with the add_task permission) can open the form
        self.login(user=self.moderator)
        response = self.get()
        self.assertEqual(response.status_code, 200)
605
606
class TestSelectTaskTypeView(TestCase, WagtailTestUtils):
    """Tests for the task type chooser (``wagtailadmin_workflows:select_task_type``)."""

    def setUp(self):
        delete_existing_workflows()
        self.login()

    def get(self):
        """GET the select-task-type view."""
        url = reverse('wagtailadmin_workflows:select_task_type')
        return self.client.get(url)

    def test_get(self):
        response = self.get()
        self.assertEqual(response.status_code, 200)
        self.assertTemplateUsed(response, 'wagtailadmin/workflows/select_task_type.html')

        # The page should list SimpleTask and GroupApprovalTask by verbose name,
        # and show GroupApprovalTask's description
        expected_text_sources = (
            SimpleTask.get_verbose_name,
            GroupApprovalTask.get_verbose_name,
            GroupApprovalTask.get_description,
        )
        for text_source in expected_text_sources:
            self.assertContains(response, text_source())
625
626
class TestEditTaskView(TestCase, WagtailTestUtils):
    """Tests for the task edit view (``wagtailadmin_workflows:edit_task``)."""

    def setUp(self):
        delete_existing_workflows()
        self.login()
        self.task = GroupApprovalTask.objects.create(name="test_task")

        self.editor = self.create_user(
            username='editor',
            email='editor@email.com',
            password='password',
        )
        editors = Group.objects.get(name='Editors')
        editors.user_set.add(self.editor)

        self.moderator = self.create_user(
            username='moderator',
            email='moderator@email.com',
            password='password',
        )
        moderators = Group.objects.get(name='Moderators')
        moderators.user_set.add(self.moderator)
        moderators.permissions.add(Permission.objects.get(codename="change_task"))

    def get(self, params=None):
        """GET the edit view for ``self.task``; ``params`` is an optional dict of query parameters."""
        # A None default avoids sharing one mutable dict between calls
        query = {} if params is None else params
        return self.client.get(reverse('wagtailadmin_workflows:edit_task', args=[self.task.id]), query)

    def post(self, post_data=None):
        """POST to the edit view for ``self.task``; ``post_data`` is an optional dict of form data."""
        form_data = {} if post_data is None else post_data
        return self.client.post(reverse('wagtailadmin_workflows:edit_task', args=[self.task.id]), form_data)

    def test_get(self):
        response = self.get()
        self.assertEqual(response.status_code, 200)
        self.assertTemplateUsed(response, 'wagtailadmin/workflows/edit_task.html')

    def test_post(self):
        self.assertEqual(self.task.groups.count(), 0)
        editors = Group.objects.get(name='Editors')

        response = self.post({'name': 'test_task_modified', 'active': 'on', 'groups': [str(editors.id)]})

        # Should redirect back to index
        self.assertRedirects(response, reverse('wagtailadmin_workflows:task_index'))

        # Check that the task was updated
        task = GroupApprovalTask.objects.get(id=self.task.id)

        # The task name cannot be changed
        self.assertEqual(task.name, "test_task")

        # This request should've added a group to the task
        self.assertEqual(task.groups.count(), 1)
        self.assertTrue(task.groups.filter(id=editors.id).exists())

    def test_permissions(self):
        # An editor is redirected away with a permission-denied message
        self.login(user=self.editor)
        response = self.get()
        self.assertEqual(response.status_code, 302)
        # Flatten all context layers into one dict to look up the message
        full_context = {key: value for context in response.context for key, value in context.items()}
        self.assertEqual(full_context['message'], 'Sorry, you do not have permission to access this area.')

        # A moderator (with the change_task permission) can open the form
        self.login(user=self.moderator)
        response = self.get()
        self.assertEqual(response.status_code, 200)
691
692
class TestSubmitToWorkflow(TestCase, WagtailTestUtils):
    """
    Tests for submitting, resuming, restarting and cancelling a workflow via the page edit view.

    The workflow under test consists of two GroupApprovalTasks, both assigned to the Moderators group.
    """

    def setUp(self):
        """Create the users, a draft page with a saved revision, and a two-task workflow assigned to it."""
        delete_existing_workflows()
        self.submitter = self.create_user(
            username='submitter',
            email='submitter@email.com',
            password='password',
        )
        editors = Group.objects.get(name='Editors')
        editors.user_set.add(self.submitter)
        self.moderator = self.create_user(
            username='moderator',
            email='moderator@email.com',
            password='password',
        )
        moderators = Group.objects.get(name='Moderators')
        moderators.user_set.add(self.moderator)

        self.superuser = self.create_superuser(
            username='superuser',
            email='superuser@email.com',
            password='password',
        )

        # All requests in these tests are made as the submitter
        self.login(user=self.submitter)

        # Create a page
        root_page = Page.objects.get(id=2)
        self.page = SimplePage(
            title="Hello world!",
            slug='hello-world',
            content="hello",
            live=False,
            has_unpublished_changes=True,
        )
        root_page.add_child(instance=self.page)
        self.page.save_revision()

        self.workflow, self.task_1, self.task_2 = self.create_workflow_and_tasks()

        WorkflowPage.objects.create(workflow=self.workflow, page=self.page)

    def create_workflow_and_tasks(self):
        """Create 'test_workflow' with two Moderators-group approval tasks; return (workflow, task_1, task_2)."""
        workflow = Workflow.objects.create(name='test_workflow')
        task_1 = GroupApprovalTask.objects.create(name='test_task_1')
        task_2 = GroupApprovalTask.objects.create(name='test_task_2')
        task_1.groups.set(Group.objects.filter(name='Moderators'))
        task_2.groups.set(Group.objects.filter(name='Moderators'))
        WorkflowTask.objects.create(workflow=workflow, task=task_1, sort_order=1)
        WorkflowTask.objects.create(workflow=workflow, task=task_2, sort_order=2)
        return workflow, task_1, task_2

    def _post_edit_action(self, action):
        """POST the page's current field values to the edit view with the given action flag set."""
        post_data = {
            'title': str(self.page.title),
            'slug': str(self.page.slug),
            'content': str(self.page.content),
            action: "True",
        }
        return self.client.post(reverse('wagtailadmin_pages:edit', args=(self.page.id,)), post_data)

    def _start_workflow_and_reject_second_task(self):
        """
        Start the workflow as the submitter, approve task 1 and reject task 2 as the superuser.

        Returns the refreshed workflow state, after asserting that it sits on task 2 with
        status 'needs changes'.
        """
        self.workflow.start(self.page, user=self.submitter)
        workflow_state = self.page.current_workflow_state
        workflow_state.current_task_state.approve(user=self.superuser)
        workflow_state.refresh_from_db()
        workflow_state.current_task_state.reject(user=self.superuser)
        workflow_state.refresh_from_db()
        self.assertEqual(workflow_state.current_task_state.task.specific, self.task_2)
        self.assertEqual(workflow_state.status, WorkflowState.STATUS_NEEDS_CHANGES)
        return workflow_state

    def submit(self):
        """Submit the page to its workflow through the edit view and return the response."""
        return self._post_edit_action('action-submit')

    def test_submit_for_approval_creates_states(self):
        """Test that WorkflowState and TaskState objects are correctly created when a Page is submitted for approval"""

        self.submit()

        workflow_state = self.page.current_workflow_state

        self.assertIs(type(workflow_state), WorkflowState)
        self.assertEqual(workflow_state.workflow, self.workflow)
        self.assertEqual(workflow_state.status, workflow_state.STATUS_IN_PROGRESS)
        self.assertEqual(workflow_state.requested_by, self.submitter)

        task_state = workflow_state.current_task_state

        self.assertIs(type(task_state), TaskState)
        self.assertEqual(task_state.task.specific, self.task_1)
        self.assertEqual(task_state.status, task_state.STATUS_IN_PROGRESS)

    def test_submit_for_approval_changes_status_in_header_meta(self):
        """Test that the edit view header shows 'Draft' before submission and the awaited task afterwards"""
        edit_url = reverse('wagtailadmin_pages:edit', args=(self.page.id, ))

        response = self.client.get(edit_url)
        self.assertContains(response, 'Draft', count=1)

        # submit for approval
        self.submit()

        response = self.client.get(edit_url)
        workflow_status_url = reverse('wagtailadmin_pages:workflow_status', args=(self.page.id, ))
        self.assertContains(response, workflow_status_url)
        # 'Awaiting' and the task name may be separated by any run of whitespace (including newlines)
        self.assertRegex(
            response.content.decode('utf-8'),
            r'Awaiting\s+{}'.format(self.page.current_workflow_task.name)
        )
        self.assertNotContains(response, 'Draft')

    def test_submit_sends_mail(self):
        """Test that submitting sends the expected emails, and that the workflow email names the submitter"""
        self.submit()
        # 3 emails sent:
        # - to moderator - submitted for approval in moderation stage test_task_1
        # - to superuser - submitted for approval in moderation stage test_task_1
        # - to superuser - submitted to workflow test_workflow
        self.assertEqual(len(mail.outbox), 3)

        # the 'submitted to workflow' email should include the submitter's name
        expected_subject = 'The page "Hello world! (simple page)" has been submitted to workflow "test_workflow"'
        workflow_message = next((msg for msg in mail.outbox if msg.subject == expected_subject), None)

        self.assertIsNotNone(workflow_message)
        self.assertIn(
            'The page "Hello world! (simple page)" has been submitted for moderation to workflow "test_workflow" by submitter',
            workflow_message.body
        )

    @mock.patch.object(EmailMultiAlternatives, 'send', side_effect=IOError('Server down'))
    def test_email_send_error(self, mock_fn):
        """Test that a failure to send notification emails does not crash the submission"""
        # Silence logging while submitting; always restore it, even if the request raises,
        # so that a failure here cannot leave logging disabled for later tests
        logging.disable(logging.CRITICAL)
        try:
            response = self.submit()
        finally:
            logging.disable(logging.NOTSET)

        # An email that fails to send should return a message rather than crash the page
        self.assertEqual(response.status_code, 302)
        response = self.client.get(reverse('wagtailadmin_home'))
        self.assertEqual(response.status_code, 200)

    def test_resume_rejected_workflow(self):
        """Test that submitting a rejected workflow resumes it on the task that rejected it"""
        # test that an existing workflow can be resumed by submitting when rejected
        workflow_state = self._start_workflow_and_reject_second_task()

        self.submit()
        workflow_state.refresh_from_db()

        # check that the same workflow state's status is now in progress
        self.assertEqual(workflow_state.status, WorkflowState.STATUS_IN_PROGRESS)

        # check that the workflow remains on the rejecting task, rather than resetting
        self.assertEqual(workflow_state.current_task_state.task.specific, self.task_2)

    def test_restart_rejected_workflow(self):
        """Test that restarting a rejected workflow cancels it and starts a new one on the first task"""
        # test that an existing workflow can be restarted when rejected
        workflow_state = self._start_workflow_and_reject_second_task()

        self._post_edit_action('action-restart-workflow')
        workflow_state.refresh_from_db()

        # check that the same workflow state's status is now cancelled
        self.assertEqual(workflow_state.status, WorkflowState.STATUS_CANCELLED)

        # check that the new workflow has started on the first task
        new_workflow_state = self.page.current_workflow_state
        self.assertEqual(new_workflow_state.status, WorkflowState.STATUS_IN_PROGRESS)
        self.assertEqual(new_workflow_state.current_task_state.task.specific, self.task_1)

    def test_cancel_workflow(self):
        """Test that the submitter can cancel an in-progress workflow from the edit view"""
        # test that an existing workflow can be cancelled after submission by the submitter
        self.workflow.start(self.page, user=self.submitter)
        workflow_state = self.page.current_workflow_state
        self.assertEqual(workflow_state.current_task_state.task.specific, self.task_1)
        self.assertEqual(workflow_state.status, WorkflowState.STATUS_IN_PROGRESS)

        self._post_edit_action('action-cancel-workflow')
        workflow_state.refresh_from_db()

        # check that the workflow state's status is now cancelled
        self.assertEqual(workflow_state.status, WorkflowState.STATUS_CANCELLED)
        self.assertEqual(workflow_state.current_task_state.status, TaskState.STATUS_CANCELLED)

    def test_email_headers(self):
        """Test that notification emails carry the 'Auto-Submitted: auto-generated' header"""
        # Submit
        self.submit()

        msg_headers = set(mail.outbox[0].message().items())
        headers = {('Auto-Submitted', 'auto-generated')}
        self.assertTrue(headers.issubset(msg_headers), msg='Message is missing the Auto-Submitted header.',)
893
894
@freeze_time("2020-03-31 12:00:00")
class TestApproveRejectWorkflow(TestCase, WagtailTestUtils):
    """
    Tests for approving and rejecting a submitted page, and for the workflow report views.

    The workflow under test has a single GroupApprovalTask assigned to the Moderators group.
    Time is frozen so that the reports can be checked for a known date.
    """

    def setUp(self):
        """Create the users, page and workflow, submit the page as the submitter, then log in as the moderator."""
        delete_existing_workflows()
        self.submitter = self.create_user(
            username='submitter',
            first_name='Sebastian',
            last_name='Mitter',
            email='submitter@email.com',
            password='password',
        )
        editors = Group.objects.get(name='Editors')
        editors.user_set.add(self.submitter)
        self.moderator = self.create_user(
            username='moderator',
            email='moderator@email.com',
            password='password',
        )
        moderators = Group.objects.get(name='Moderators')
        moderators.user_set.add(self.moderator)

        self.superuser = self.create_superuser(
            username='superuser',
            email='superuser@email.com',
            password='password',
        )

        self.login(user=self.submitter)

        # Create a page
        root_page = Page.objects.get(id=2)
        self.page = SimplePage(
            title="Hello world!",
            slug='hello-world',
            content="hello",
            live=False,
            has_unpublished_changes=True,
        )
        root_page.add_child(instance=self.page)

        self.workflow, self.task_1 = self.create_workflow_and_tasks()

        WorkflowPage.objects.create(workflow=self.workflow, page=self.page)

        # Submit as the submitter, then act as the moderator for the tests themselves
        self.submit()

        self.login(user=self.moderator)

    def create_workflow_and_tasks(self):
        """Create 'test_workflow' with one Moderators-group approval task; return (workflow, task_1)."""
        workflow = Workflow.objects.create(name='test_workflow')
        task_1 = GroupApprovalTask.objects.create(name='test_task_1')
        task_1.groups.set(Group.objects.filter(name='Moderators'))
        WorkflowTask.objects.create(workflow=workflow, task=task_1, sort_order=1)
        return workflow, task_1

    def submit(self):
        """Submit the page to its workflow through the edit view and return the response."""
        post_data = {
            'title': str(self.page.title),
            'slug': str(self.page.slug),
            'content': str(self.page.content),
            'action-submit': "True",
        }
        return self.client.post(reverse('wagtailadmin_pages:edit', args=(self.page.id,)), post_data)

    def _workflow_action_url(self, action, view_name='wagtailadmin_pages:workflow_action', page_id=None):
        """
        Build the URL of a workflow action view for the page's current task state.

        `page_id` defaults to the test page's id; pass another value to target a different page id.
        """
        if page_id is None:
            page_id = self.page.id
        return reverse(view_name, args=(page_id, action, self.page.current_workflow_task_state.id))

    @override_settings(WAGTAIL_FINISH_WORKFLOW_ACTION='')
    def test_approve_task_and_workflow(self):
        """
        This posts to the approve task view and checks that the page was approved and published
        """
        # Unset WAGTAIL_FINISH_WORKFLOW_ACTION - default action should be to publish
        del settings.WAGTAIL_FINISH_WORKFLOW_ACTION
        # Connect a mock signal handler to page_published signal
        mock_handler = mock.MagicMock()
        page_published.connect(mock_handler)

        try:
            # Post
            self.client.post(self._workflow_action_url('approve'), {'comment': 'my comment'})
        finally:
            # Always detach the mock receiver so it cannot observe signals sent outside this test
            page_published.disconnect(mock_handler)

        # Check that the workflow was approved

        workflow_state = WorkflowState.objects.get(page=self.page, requested_by=self.submitter)

        self.assertEqual(workflow_state.status, workflow_state.STATUS_APPROVED)

        # Check that the task was approved

        task_state = workflow_state.current_task_state

        self.assertEqual(task_state.status, task_state.STATUS_APPROVED)

        # Check that the comment was added to the task state correctly

        self.assertEqual(task_state.comment, 'my comment')

        page = Page.objects.get(id=self.page.id)
        # Page must be live
        self.assertTrue(page.live, "Approving moderation failed to set live=True")
        # Page should now have no unpublished changes
        self.assertFalse(
            page.has_unpublished_changes,
            "Approving moderation failed to set has_unpublished_changes=False"
        )

        # Check that the page_published signal was fired
        self.assertEqual(mock_handler.call_count, 1)
        # Index 2 of a recorded call holds its keyword arguments
        signal_kwargs = mock_handler.mock_calls[0][2]

        self.assertEqual(signal_kwargs['sender'], self.page.specific_class)
        self.assertEqual(signal_kwargs['instance'], self.page)
        self.assertIsInstance(signal_kwargs['instance'], self.page.specific_class)

    def test_workflow_action_get(self):
        """
        This tests that a GET request to the workflow action view (for the approve action) returns a modal with a form for extra data entry:
        adding a comment
        """
        action_url = self._workflow_action_url('approve')
        response = self.client.get(action_url)
        self.assertEqual(response.status_code, 200)
        self.assertTemplateUsed(response, 'wagtailadmin/pages/workflow_action_modal.html')
        html = json.loads(response.content)['html']
        self.assertTagInHTML('<form action="' + action_url + '" method="POST" novalidate>', html)
        self.assertIn('Comment', html)

    def test_workflow_action_view_bad_page_id(self):
        """
        This tests that the workflow action view handles invalid page ids correctly
        """
        # Post
        response = self.client.post(self._workflow_action_url('approve', page_id=127777777777))

        # Check that the user received a 404 response
        self.assertEqual(response.status_code, 404)

    def test_workflow_action_view_not_in_group(self):
        """
        This tests that the workflow action view for a GroupApprovalTask won't allow approval from a user not in the
        specified group/a superuser
        """
        # Remove privileges from user
        self.login(user=self.submitter)

        # Post
        response = self.client.post(self._workflow_action_url('approve'))
        # Check that the user received a permission denied response
        self.assertRedirects(response, '/admin/')

    def test_edit_view_workflow_cancellation_not_in_group(self):
        """
        This tests that the page edit view for a GroupApprovalTask, locked to a user not in the
        specified group/a superuser, still allows the submitter to cancel workflows
        """
        self.login(user=self.submitter)

        # Post
        response = self.client.post(reverse('wagtailadmin_pages:edit', args=(self.page.id, )), {'action-cancel-workflow': 'True'})

        # Check that the user received a 200 response
        self.assertEqual(response.status_code, 200)

        # Check that the workflow state was marked as cancelled
        workflow_state = WorkflowState.objects.get(page=self.page, requested_by=self.submitter)
        self.assertEqual(workflow_state.status, WorkflowState.STATUS_CANCELLED)

    def test_reject_task_and_workflow(self):
        """
        This posts to the reject task view and checks that the page was rejected and not published
        """
        # Post
        self.client.post(self._workflow_action_url('reject'))

        # Check that the workflow was marked as needing changes

        workflow_state = WorkflowState.objects.get(page=self.page, requested_by=self.submitter)

        self.assertEqual(workflow_state.status, workflow_state.STATUS_NEEDS_CHANGES)

        # Check that the task was rejected

        task_state = workflow_state.current_task_state

        self.assertEqual(task_state.status, task_state.STATUS_REJECTED)

        page = Page.objects.get(id=self.page.id)
        # Page must not be live
        self.assertFalse(page.live)

    def test_workflow_action_view_rejection_not_in_group(self):
        """
        This tests that the workflow action view for a GroupApprovalTask won't allow rejection from a user not in the
        specified group/a superuser
        """
        # Remove privileges from user
        self.login(user=self.submitter)

        # Post
        response = self.client.post(self._workflow_action_url('reject'))

        # Check that the user received a permission denied response
        self.assertRedirects(response, '/admin/')

    def test_collect_workflow_action_data_get(self):
        """
        This tests that a GET request to the collect_workflow_action_data view (for the approve action) returns a modal with a form for extra data entry:
        adding a comment
        """
        collect_url = self._workflow_action_url('approve', view_name='wagtailadmin_pages:collect_workflow_action_data')
        response = self.client.get(collect_url)
        self.assertEqual(response.status_code, 200)
        self.assertTemplateUsed(response, 'wagtailadmin/pages/workflow_action_modal.html')
        html = json.loads(response.content)['html']
        self.assertTagInHTML('<form action="' + collect_url + '" method="POST" novalidate>', html)
        self.assertIn('Comment', html)

    def test_collect_workflow_action_data_post(self):
        """
        This tests that a POST request to the collect_workflow_action_data view (for the approve action) returns a modal response with the validated data
        """
        response = self.client.post(
            self._workflow_action_url('approve', view_name='wagtailadmin_pages:collect_workflow_action_data'),
            {'comment': "This is my comment"}
        )
        self.assertEqual(response.status_code, 200)
        response_json = json.loads(response.content)
        self.assertEqual(response_json['step'], 'success')
        self.assertEqual(response_json['cleaned_data'], {'comment': "This is my comment"})

    def test_workflow_action_via_edit_view(self):
        """
        Posting to the 'edit' view with 'action-workflow-action' set should perform the given workflow action in addition to updating page content
        """
        # Post
        self.client.post(reverse('wagtailadmin_pages:edit', args=(self.page.id,)), {
            'title': "This title was edited while approving",
            'slug': str(self.page.slug),
            'content': str(self.page.content),
            'action-workflow-action': "True",
            'workflow-action-name': 'approve',
            'workflow-action-extra-data': '{"comment": "my comment"}'
        })

        # Check that the workflow was approved

        workflow_state = WorkflowState.objects.get(page=self.page, requested_by=self.submitter)

        self.assertEqual(workflow_state.status, workflow_state.STATUS_APPROVED)

        # Check that the task was approved

        task_state = workflow_state.current_task_state

        self.assertEqual(task_state.status, task_state.STATUS_APPROVED)

        # Check that the comment was added to the task state correctly

        self.assertEqual(task_state.comment, 'my comment')

        # Check that page edits made at the same time as the action have been saved
        page = Page.objects.get(id=self.page.id)
        self.assertEqual(page.get_latest_revision_as_page().title, "This title was edited while approving")

    def test_workflow_report(self):
        """Test that the workflow and workflow task reports list the submitted page"""
        response = self.client.get(reverse('wagtailadmin_reports:workflow'))
        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "Hello world!")
        self.assertContains(response, "test_workflow")
        self.assertContains(response, "Sebastian Mitter")
        self.assertContains(response, "March 31, 2020")

        response = self.client.get(reverse('wagtailadmin_reports:workflow_tasks'))
        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "Hello world!")

    def test_workflow_report_filtered(self):
        """Test that the 'reviewable' report filter shows the page to the moderator but not to the submitter"""
        reviewable_filter = {'reviewable': 'true'}

        # the moderator can review the task, so the workflow state should show up even when reports are filtered by reviewable
        response = self.client.get(reverse('wagtailadmin_reports:workflow'), reviewable_filter)
        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "Hello world!")
        self.assertContains(response, "test_workflow")
        self.assertContains(response, "Sebastian Mitter")
        self.assertContains(response, "March 31, 2020")

        response = self.client.get(reverse('wagtailadmin_reports:workflow_tasks'), reviewable_filter)
        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "Hello world!")

        # the submitter cannot review the task, so the workflow state shouldn't show up when reports are filtered by reviewable
        self.login(user=self.submitter)
        response = self.client.get(reverse('wagtailadmin_reports:workflow'), reviewable_filter)
        self.assertEqual(response.status_code, 200)
        self.assertNotContains(response, "Hello world!")
        self.assertNotContains(response, "Sebastian Mitter")
        self.assertNotContains(response, "March 31, 2020")

        response = self.client.get(reverse('wagtailadmin_reports:workflow_tasks'), reviewable_filter)
        self.assertEqual(response.status_code, 200)
        self.assertNotContains(response, "Hello world!")
1190
1191
class TestNotificationPreferences(TestCase, WagtailTestUtils):
    """
    Tests that workflow notification emails are sent to the right recipients and that
    per-user notification preferences and the superuser setting are respected.

    The workflow under test has a single GroupApprovalTask assigned to the Moderators group,
    which contains two moderators.
    """

    def setUp(self):
        """Create the users and their profiles, a draft page, and a one-task workflow assigned to it."""
        delete_existing_workflows()
        self.submitter = self.create_user(
            username='submitter',
            email='submitter@email.com',
            password='password',
        )
        editors = Group.objects.get(name='Editors')
        editors.user_set.add(self.submitter)
        self.moderator = self.create_user(
            username='moderator',
            email='moderator@email.com',
            password='password',
        )
        self.moderator2 = self.create_user(
            username='moderator2',
            email='moderator2@email.com',
            password='password',
        )
        moderators = Group.objects.get(name='Moderators')
        moderators.user_set.add(self.moderator)
        moderators.user_set.add(self.moderator2)

        self.superuser = self.create_superuser(
            username='superuser',
            email='superuser@email.com',
            password='password',
        )

        self.superuser_profile = UserProfile.get_for_user(self.superuser)
        self.moderator2_profile = UserProfile.get_for_user(self.moderator2)
        self.submitter_profile = UserProfile.get_for_user(self.submitter)

        # Create a page
        root_page = Page.objects.get(id=2)
        self.page = SimplePage(
            title="Hello world!",
            slug='hello-world',
            content="hello",
            live=False,
            has_unpublished_changes=True,
        )
        root_page.add_child(instance=self.page)

        self.workflow, self.task_1 = self.create_workflow_and_tasks()

        WorkflowPage.objects.create(workflow=self.workflow, page=self.page)

    def create_workflow_and_tasks(self):
        """Create 'test_workflow' with one Moderators-group approval task; return (workflow, task_1)."""
        workflow = Workflow.objects.create(name='test_workflow')
        task_1 = GroupApprovalTask.objects.create(name='test_task_1')
        task_1.groups.set(Group.objects.filter(name='Moderators'))
        WorkflowTask.objects.create(workflow=workflow, task=task_1, sort_order=1)
        return workflow, task_1

    def submit(self):
        """Submit the page to its workflow through the edit view and return the response."""
        post_data = {
            'title': str(self.page.title),
            'slug': str(self.page.slug),
            'content': str(self.page.content),
            'action-submit': "True",
        }
        return self.client.post(reverse('wagtailadmin_pages:edit', args=(self.page.id,)), post_data)

    def _post_workflow_action(self, action):
        """POST to the workflow action view for the page's current task state and return the response."""
        url = reverse(
            'wagtailadmin_pages:workflow_action',
            args=(self.page.id, action, self.page.current_workflow_task_state.id)
        )
        return self.client.post(url)

    def approve(self):
        """Approve the page's current task as the logged-in user."""
        return self._post_workflow_action('approve')

    def reject(self):
        """Reject the page's current task as the logged-in user."""
        return self._post_workflow_action('reject')

    def _outbox_emails_with_subject(self, *keywords):
        """Return the outbox emails whose subject contains every one of the given keywords."""
        return [
            email for email in mail.outbox
            if all(keyword in email.subject for keyword in keywords)
        ]

    def _submission_recipients(self, keyword):
        """
        Return (emails, addresses) for outbox emails whose subject contains `keyword`,
        where `addresses` is the flattened list of all their 'to' addresses.
        """
        emails = self._outbox_emails_with_subject(keyword)
        addresses = [address for email in emails for address in email.to]
        return emails, addresses

    def test_vanilla_profile(self):
        """Test that a freshly created profile has 'rejected' and 'approved' notifications switched on"""
        # Check that the vanilla profile has rejected notifications on
        self.assertTrue(self.submitter_profile.rejected_notifications)

        # Check that the vanilla profile has approved notifications on
        self.assertTrue(self.submitter_profile.approved_notifications)

    @override_settings(WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS=True)
    def test_submitted_email_notifications_sent(self):
        """Test that 'submitted' notifications for WorkflowState and TaskState are both sent correctly"""
        self.login(user=self.submitter)
        self.submit()

        self.assertEqual(len(mail.outbox), 4)

        task_submission_emails, task_submission_emailed_addresses = self._submission_recipients("task")
        workflow_submission_emails, workflow_submission_emailed_addresses = self._submission_recipients("workflow")

        self.assertEqual(len(task_submission_emails), 3)
        # the moderator is in the Group assigned to the GroupApproval task, so should get an email
        self.assertIn(self.moderator.email, task_submission_emailed_addresses)
        self.assertIn(self.moderator2.email, task_submission_emailed_addresses)
        # with `WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS`, the superuser should get a task email
        self.assertIn(self.superuser.email, task_submission_emailed_addresses)
        # the submitter triggered this workflow update, so should not get an email
        self.assertNotIn(self.submitter.email, task_submission_emailed_addresses)

        self.assertEqual(len(workflow_submission_emails), 1)
        # the moderator should not get a workflow email
        self.assertNotIn(self.moderator.email, workflow_submission_emailed_addresses)
        self.assertNotIn(self.moderator2.email, workflow_submission_emailed_addresses)
        # with `WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS`, the superuser should get a workflow email
        self.assertIn(self.superuser.email, workflow_submission_emailed_addresses)
        # as the submitter was the triggering user, the submitter should not get an email notification
        self.assertNotIn(self.submitter.email, workflow_submission_emailed_addresses)

    @override_settings(WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS=False)
    def test_submitted_email_notifications_superuser_settings(self):
        """Test that 'submitted' notifications for WorkflowState and TaskState are not sent to superusers if
        `WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS=False`"""
        self.login(user=self.submitter)
        self.submit()

        _, task_submission_emailed_addresses = self._submission_recipients("task")
        _, workflow_submission_emailed_addresses = self._submission_recipients("workflow")

        # with `WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS` off, the superuser should not get a task email
        self.assertNotIn(self.superuser.email, task_submission_emailed_addresses)

        # with `WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS` off, the superuser should not get a workflow email
        self.assertNotIn(self.superuser.email, workflow_submission_emailed_addresses)

    @override_settings(WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS=True)
    def test_submit_notification_preferences_respected(self):
        """Test that users who switched off 'submitted' notifications receive no submission emails"""
        # moderator2 doesn't want emails
        self.moderator2_profile.submitted_notifications = False
        self.moderator2_profile.save()

        # superuser doesn't want emails
        self.superuser_profile.submitted_notifications = False
        self.superuser_profile.save()

        # Submit
        self.login(user=self.submitter)
        self.submit()

        _, workflow_submission_emailed_addresses = self._submission_recipients("workflow")
        _, task_submission_emailed_addresses = self._submission_recipients("task")

        # Check that only one moderator got a task submitted email:
        # the moderator who kept notifications on is emailed, the one who opted out is not
        self.assertIn(self.moderator.email, task_submission_emailed_addresses)
        self.assertNotIn(self.moderator2.email, task_submission_emailed_addresses)

        # Check that the superuser didn't receive a workflow or task email
        self.assertNotIn(self.superuser.email, task_submission_emailed_addresses)
        self.assertNotIn(self.superuser.email, workflow_submission_emailed_addresses)

    def test_approved_notifications(self):
        """Test that the submitter is emailed when the workflow is approved"""
        self.login(user=self.submitter)
        self.submit()
        # Approve
        self.login(user=self.moderator)
        self.approve()

        # Submitter must receive a workflow approved email
        workflow_approved_emails = self._outbox_emails_with_subject("workflow", "approved")
        self.assertEqual(len(workflow_approved_emails), 1)
        self.assertIn(self.submitter.email, workflow_approved_emails[0].to)

    def test_approved_notifications_preferences_respected(self):
        """Test that no 'approved' email is sent when the submitter has switched those notifications off"""
        # Submitter doesn't want 'approved' emails
        self.submitter_profile.approved_notifications = False
        self.submitter_profile.save()

        self.login(user=self.submitter)
        self.submit()
        # Approve
        self.login(user=self.moderator)
        self.approve()

        # Submitter must not receive a workflow approved email, so there should be no emails in workflow_approved_emails
        workflow_approved_emails = self._outbox_emails_with_subject("workflow", "approved")
        self.assertEqual(len(workflow_approved_emails), 0)

    def test_rejected_notifications(self):
        """Test that the submitter is emailed when the workflow is rejected"""
        self.login(user=self.submitter)
        self.submit()
        # Reject
        self.login(user=self.moderator)
        self.reject()

        # Submitter must receive a workflow rejected email
        workflow_rejected_emails = self._outbox_emails_with_subject("workflow", "rejected")
        self.assertEqual(len(workflow_rejected_emails), 1)
        self.assertIn(self.submitter.email, workflow_rejected_emails[0].to)

    def test_rejected_notification_preferences_respected(self):
        """Test that no 'rejected' email is sent when the submitter has switched those notifications off"""
        # Submitter doesn't want 'rejected' emails
        self.submitter_profile.rejected_notifications = False
        self.submitter_profile.save()

        self.login(user=self.submitter)
        self.submit()
        # Reject
        self.login(user=self.moderator)
        self.reject()

        # Submitter must not receive a workflow rejected email
        workflow_rejected_emails = self._outbox_emails_with_subject("workflow", "rejected")
        self.assertEqual(len(workflow_rejected_emails), 0)
1397
1398
class TestDisableViews(TestCase, WagtailTestUtils):
    """Tests for the admin views that enable and disable workflows and tasks."""

    def setUp(self):
        """Create users for each role, a draft page, and a two-task workflow assigned to it."""
        delete_existing_workflows()

        # A member of the Editors group who submits the page
        self.submitter = self.create_user(
            username='submitter',
            email='submitter@email.com',
            password='password',
        )
        Group.objects.get(name='Editors').user_set.add(self.submitter)

        # Two members of the Moderators group
        self.moderator = self.create_user(
            username='moderator',
            email='moderator@email.com',
            password='password',
        )
        self.moderator2 = self.create_user(
            username='moderator2',
            email='moderator2@email.com',
            password='password',
        )
        moderator_group = Group.objects.get(name='Moderators')
        for reviewer in (self.moderator, self.moderator2):
            moderator_group.user_set.add(reviewer)

        self.superuser = self.create_superuser(
            username='superuser',
            email='superuser@email.com',
            password='password',
        )

        # Add an unpublished page beneath the page with id 2
        parent_page = Page.objects.get(id=2)
        self.page = SimplePage(
            title="Hello world!",
            slug='hello-world',
            content="hello",
            live=False,
            has_unpublished_changes=True,
        )
        parent_page.add_child(instance=self.page)

        # Build the workflow and attach it to the new page
        self.workflow, self.task_1, self.task_2 = self.create_workflow_and_tasks()
        WorkflowPage.objects.create(workflow=self.workflow, page=self.page)

    def create_workflow_and_tasks(self):
        """Create 'test_workflow' with two Moderators-group approval tasks, in order.

        Returns a ``(workflow, task_1, task_2)`` tuple.
        """
        workflow = Workflow.objects.create(name='test_workflow')

        # Create both tasks first, each reviewed by the Moderators group
        tasks = []
        for task_name in ('test_task_1', 'test_task_2'):
            task = GroupApprovalTask.objects.create(name=task_name)
            task.groups.set(Group.objects.filter(name='Moderators'))
            tasks.append(task)

        # Then link them to the workflow with sort orders 1 and 2
        for position, task in enumerate(tasks, start=1):
            WorkflowTask.objects.create(workflow=workflow, task=task, sort_order=position)

        first_task, second_task = tasks
        return workflow, first_task, second_task

    def submit(self):
        """POST the page's current field values to the edit view with the 'action-submit' flag."""
        edit_url = reverse('wagtailadmin_pages:edit', args=(self.page.id,))
        form_data = {
            field: str(getattr(self.page, field))
            for field in ('title', 'slug', 'content')
        }
        form_data['action-submit'] = "True"
        return self.client.post(edit_url, form_data)

    def approve(self):
        """POST an 'approve' workflow action for the page's current task state."""
        task_state_id = self.page.current_workflow_task_state.id
        action_url = reverse(
            'wagtailadmin_pages:workflow_action',
            args=(self.page.id, 'approve', task_state_id),
        )
        return self.client.post(action_url)

    def test_disable_workflow(self):
        """Disabling a workflow marks it inactive and cancels its in-progress states."""
        # Start the workflow and approve its current task so it is mid-way through
        self.login(self.submitter)
        self.submit()
        self.login(self.superuser)
        self.approve()

        disable_url = reverse('wagtailadmin_workflows:disable', args=(self.workflow.pk,))
        response = self.client.post(disable_url)
        self.assertEqual(response.status_code, 302)

        # The workflow itself is now inactive
        self.workflow.refresh_from_db()
        self.assertEqual(self.workflow.active, False)

        # The page's workflow state was cancelled rather than left in progress
        workflow_states = WorkflowState.objects.filter(page=self.page, workflow=self.workflow)
        still_running = workflow_states.filter(status=WorkflowState.STATUS_IN_PROGRESS)
        cancelled = workflow_states.filter(status=WorkflowState.STATUS_CANCELLED)
        self.assertEqual(still_running.count(), 0)
        self.assertEqual(cancelled.count(), 1)

        # No task state belonging to this workflow is left in progress either
        running_task_states = TaskState.objects.filter(
            workflow_state__workflow=self.workflow,
            status=TaskState.STATUS_IN_PROGRESS,
        )
        self.assertEqual(running_task_states.count(), 0)

    def test_disable_task(self):
        """Disabling a task marks it inactive, cancels its in-progress states and advances the workflow."""
        self.login(self.submitter)
        self.submit()
        self.login(self.superuser)

        disable_url = reverse('wagtailadmin_workflows:disable_task', args=(self.task_1.pk,))
        response = self.client.post(disable_url)
        self.assertEqual(response.status_code, 302)

        # The task itself is now inactive
        self.task_1.refresh_from_db()
        self.assertEqual(self.task_1.active, False)

        # The task's state on this page was cancelled rather than left in progress
        task_states = TaskState.objects.filter(
            workflow_state__page=self.page,
            task=self.task_1.task_ptr,
        )
        still_running = task_states.filter(status=TaskState.STATUS_IN_PROGRESS)
        cancelled = task_states.filter(status=TaskState.STATUS_CANCELLED)
        self.assertEqual(still_running.count(), 0)
        self.assertEqual(cancelled.count(), 1)

        # Check that the page's WorkflowState has moved on to the next active task
        current_task = self.page.current_workflow_state.current_task_state.task.specific
        self.assertEqual(current_task, self.task_2)

    def test_enable_workflow(self):
        """Posting to the enable view turns an inactive workflow back on."""
        self.login(self.superuser)

        # Start from a disabled workflow
        self.workflow.active = False
        self.workflow.save()

        enable_url = reverse('wagtailadmin_workflows:enable', args=(self.workflow.pk,))
        response = self.client.post(enable_url)
        self.assertEqual(response.status_code, 302)

        self.workflow.refresh_from_db()
        self.assertEqual(self.workflow.active, True)

    def test_enable_task(self):
        """Posting to the enable_task view turns an inactive task back on."""
        self.login(self.superuser)

        # Start from a disabled task
        self.task_1.active = False
        self.task_1.save()

        enable_url = reverse('wagtailadmin_workflows:enable_task', args=(self.task_1.pk,))
        response = self.client.post(enable_url)
        self.assertEqual(response.status_code, 302)

        self.task_1.refresh_from_db()
        self.assertEqual(self.task_1.active, True)
1519
1520
class TestTaskChooserView(TestCase, WagtailTestUtils):
    """Tests for the task chooser view: browsing, searching, paginating and creating tasks."""

    # Templates the chooser view may render
    CHOOSER_TEMPLATE = "wagtailadmin/workflows/task_chooser/chooser.html"
    RESULTS_TEMPLATE = "wagtailadmin/workflows/task_chooser/includes/results.html"
    SELECT_TYPE_TEMPLATE = "wagtailadmin/workflows/task_chooser/includes/select_task_type.html"
    CREATE_FORM_TEMPLATE = "wagtailadmin/workflows/task_chooser/includes/create_form.html"

    def setUp(self):
        """Log in with the default test user before each test."""
        self.login()

    def _chooser_url(self, query_string=''):
        """Return the task chooser URL with ``query_string`` appended verbatim."""
        return reverse('wagtailadmin_workflows:task_chooser') + query_string

    def _assert_task_created_and_chosen(self, response):
        """Check for a 201 response whose JSON describes the newly created task."""
        self.assertEqual(response.status_code, 201)

        # Check that the task was created
        created_task = Task.objects.get(name="Editor approval task", active=True)

        # Check the response JSON
        expected_payload = {
            "step": "task_chosen",
            "result": {
                "id": created_task.id,
                "name": "Editor approval task",
                "edit_url": reverse('wagtailadmin_workflows:edit_task', args=[created_task.id])
            }
        }
        self.assertEqual(response.json(), expected_payload)

    def _assert_task_not_created(self):
        """Check that no active task with the posted name exists."""
        matching_tasks = Task.objects.filter(name="Editor approval task", active=True)
        self.assertFalse(matching_tasks.exists())

    def test_get(self):
        """A plain GET renders the full chooser with the task type selector in the 'new' tab."""
        response = self.client.get(self._chooser_url())

        self.assertEqual(response.status_code, 200)

        self.assertTemplateUsed(response, self.CHOOSER_TEMPLATE)

        # Check that the "select task type" view was shown in the "new" tab
        self.assertTemplateUsed(response, self.SELECT_TYPE_TEMPLATE)
        self.assertTemplateUsed(response, self.RESULTS_TEMPLATE)
        self.assertTemplateNotUsed(response, self.CREATE_FORM_TEMPLATE)
        self.assertFalse(response.context['searchform'].is_searching())

    def test_search(self):
        """A search query renders only the results template and flags the form as searching."""
        response = self.client.get(self._chooser_url('?q=foo'))

        self.assertEqual(response.status_code, 200)

        self.assertTemplateUsed(response, self.RESULTS_TEMPLATE)
        self.assertTemplateNotUsed(response, self.CHOOSER_TEMPLATE)
        self.assertTrue(response.context['searchform'].is_searching())

    def test_pagination(self):
        """Requesting a page number renders only the results template, without searching."""
        response = self.client.get(self._chooser_url('?p=2'))

        self.assertEqual(response.status_code, 200)

        # When pagination is used, only the results template should be rendered
        self.assertTemplateUsed(response, self.RESULTS_TEMPLATE)
        self.assertTemplateNotUsed(response, self.CHOOSER_TEMPLATE)
        self.assertFalse(response.context['searchform'].is_searching())

    def test_get_with_create_model_selected(self):
        """Passing a task model as create_model shows the create form in the 'new' tab."""
        response = self.client.get(self._chooser_url('?create_model=wagtailcore.GroupApprovalTask'))

        self.assertEqual(response.status_code, 200)

        self.assertTemplateUsed(response, self.CHOOSER_TEMPLATE)

        # Check that the "create" view was shown in the "new" tab
        self.assertTemplateUsed(response, self.CREATE_FORM_TEMPLATE)
        self.assertTemplateNotUsed(response, self.SELECT_TYPE_TEMPLATE)

    def test_get_with_non_task_create_model_selected(self):
        """A create_model that is not a task model yields a 404."""
        response = self.client.get(self._chooser_url('?create_model=wagtailcore.Page'))

        self.assertEqual(response.status_code, 404)

    def test_get_with_base_task_create_model_selected(self):
        """The base Task model is rejected as a create_model with a 404."""
        # Task is technically a subclass of itself so we need an extra test for it
        response = self.client.get(self._chooser_url('?create_model=wagtailcore.Task'))

        self.assertEqual(response.status_code, 404)

    @mock.patch('wagtail.admin.views.workflows.get_task_types')
    def test_get_with_single_task_model(self, get_task_types):
        """With only one task type available, the create form is shown without create_model."""
        # When a single task type exists there's no need to specify create_model
        get_task_types.return_value = [GroupApprovalTask]

        response = self.client.get(self._chooser_url())

        self.assertEqual(response.status_code, 200)

        self.assertTemplateUsed(response, self.CHOOSER_TEMPLATE)

        # Check that the "create" view was shown in the "new" tab
        self.assertTemplateUsed(response, self.CREATE_FORM_TEMPLATE)
        self.assertTemplateNotUsed(response, self.SELECT_TYPE_TEMPLATE)

    # POST requests are for creating new tasks

    def get_post_data(self):
        """Return form data for creating a task named 'Editor approval task' for the Editors group."""
        editors_group = Group.objects.get(name="Editors")
        return {
            'create-task-name': 'Editor approval task',
            'create-task-groups': [str(editors_group.id)],
        }

    def test_post_with_create_model_selected(self):
        """Posting with a task model as create_model creates the task and returns it as chosen."""
        response = self.client.post(
            self._chooser_url('?create_model=wagtailcore.GroupApprovalTask'),
            self.get_post_data(),
        )

        self._assert_task_created_and_chosen(response)

    @mock.patch('wagtail.admin.views.workflows.get_task_types')
    def test_post_with_single_task_model(self, get_task_types):
        """With only one task type available, posting without create_model still creates the task."""
        # When a single task type exists there's no need to specify create_model
        get_task_types.return_value = [GroupApprovalTask]

        response = self.client.post(self._chooser_url(), self.get_post_data())

        self._assert_task_created_and_chosen(response)

    def test_post_without_create_model_selected(self):
        """Posting without create_model is answered with a 400 and creates nothing."""
        response = self.client.post(self._chooser_url(), self.get_post_data())

        self.assertEqual(response.status_code, 400)

        # Check that the task wasn't created
        self._assert_task_not_created()

    def test_post_with_non_task_create_model_selected(self):
        """Posting with a non-task create_model is answered with a 404 and creates nothing."""
        response = self.client.post(
            self._chooser_url('?create_model=wagtailcore.Page'),
            self.get_post_data(),
        )

        self.assertEqual(response.status_code, 404)

        # Check that the task wasn't created
        self._assert_task_not_created()

    def test_post_with_base_task_create_model_selected(self):
        """Posting with the base Task model as create_model is answered with a 404 and creates nothing."""
        # Task is technically a subclass of itself so we need an extra test for it
        response = self.client.post(
            self._chooser_url('?create_model=wagtailcore.Task'),
            self.get_post_data(),
        )

        self.assertEqual(response.status_code, 404)

        # Check that the task wasn't created
        self._assert_task_not_created()
1666
1667
class TestTaskChooserChosenView(TestCase, WagtailTestUtils):
    """Tests for the view that reports a chosen task back to the chooser as JSON."""

    def setUp(self):
        """Clear existing workflow data, log in, and create a single task to choose."""
        delete_existing_workflows()
        self.login()
        self.task = SimpleTask.objects.create(name="test_task")

    def test_get(self):
        """The view answers with a JSON 'task_chosen' step describing the task."""
        task_id = self.task.id
        chosen_url = reverse('wagtailadmin_workflows:task_chosen', args=[task_id])
        response = self.client.get(chosen_url)

        self.assertEqual(response.status_code, 200)
        self.assertEqual(response['Content-Type'], 'application/json')

        expected_payload = {
            'step': 'task_chosen',
            'result': {
                'id': task_id,
                'name': 'test_task',
                'edit_url': reverse('wagtailadmin_workflows:edit_task', args=[task_id]),
            },
        }
        self.assertEqual(response.json(), expected_payload)
1687
1688
class TestWorkflowUsageView(TestCase, WagtailTestUtils):
    """Tests for the view listing the pages a workflow is used by."""

    def setUp(self):
        """Log in and add a child page that is assigned to a second workflow."""
        self.login()

        # Workflow.objects.get() only succeeds while exactly one workflow exists,
        # so this must run before the second workflow is created below
        self.workflow = Workflow.objects.get()

        self.root_page = Page.objects.get(depth=1)
        self.home_page = Page.objects.get(depth=2)

        # A page under the home page, attached to its own separate workflow
        other_page = SimplePage(title="Another page", content="I'm another page")
        self.child_page_with_another_workflow = self.home_page.add_child(instance=other_page)
        self.another_workflow = Workflow.objects.create(name="Another workflow")
        self.another_workflow.workflow_pages.create(page=self.child_page_with_another_workflow)

    def test_get(self):
        """The usage list includes the root and home pages but not the page on another workflow."""
        usage_url = reverse('wagtailadmin_workflows:usage', args=[self.workflow.id])
        response = self.client.get(usage_url)

        self.assertEqual(response.status_code, 200)

        listed_page_ids = {page.id for page in response.context['used_by'].object_list}
        for expected_page in (self.root_page, self.home_page):
            self.assertIn(expected_page.id, listed_page_ids)
        self.assertNotIn(self.child_page_with_another_workflow.id, listed_page_ids)
1710
1711
1712@freeze_time("2020-06-01 12:00:00")
1713class TestWorkflowStatus(TestCase, WagtailTestUtils):
1714    def setUp(self):
1715        delete_existing_workflows()
1716        self.submitter = self.create_user(
1717            username='submitter',
1718            email='submitter@email.com',
1719            password='password',
1720        )
1721        editors = Group.objects.get(name='Editors')
1722        editors.user_set.add(self.submitter)
1723        self.moderator = self.create_user(
1724            username='moderator',
1725            email='moderator@email.com',
1726            password='password',
1727        )
1728        moderators = Group.objects.get(name='Moderators')
1729        moderators.user_set.add(self.moderator)
1730
1731        self.superuser = self.create_superuser(
1732            username='superuser',
1733            email='superuser@email.com',
1734            password='password',
1735        )
1736
1737        self.login(self.superuser)
1738
1739        # Create a page
1740        root_page = Page.objects.get(id=2)
1741        self.page = SimplePage(
1742            title="Hello world!",
1743            slug='hello-world',
1744            content="hello",
1745            live=False,
1746            has_unpublished_changes=True,
1747        )
1748        root_page.add_child(instance=self.page)
1749
1750        self.workflow, self.task_1, self.task_2 = self.create_workflow_and_tasks()
1751
1752        WorkflowPage.objects.create(workflow=self.workflow, page=self.page)
1753
1754        self.edit_url = reverse('wagtailadmin_pages:edit', args=(self.page.id, ))
1755
1756    def create_workflow_and_tasks(self):
1757        workflow = Workflow.objects.create(name='test_workflow')
1758        task_1 = GroupApprovalTask.objects.create(name='test_task_1')
1759        task_1.groups.set(Group.objects.filter(name='Moderators'))
1760        WorkflowTask.objects.create(workflow=workflow, task=task_1, sort_order=1)
1761
1762        task_2 = GroupApprovalTask.objects.create(name='test_task_2')
1763        task_2.groups.set(Group.objects.filter(name='Editors'))
1764        WorkflowTask.objects.create(workflow=workflow, task=task_2, sort_order=2)
1765        return workflow, task_1, task_2
1766
1767    def submit(self, action='action-submit'):
1768        post_data = {
1769            'title': str(self.page.title),
1770            'slug': str(self.page.slug),
1771            'content': str(self.page.content),
1772            action: "True",
1773        }
1774        return self.client.post(self.edit_url, post_data)
1775
1776    def workflow_action(self, action):
1777        post_data = {
1778            'action': action,
1779            'comment': 'good work' if action == 'approve' else 'needs some changes',
1780            'next': self.edit_url
1781        }
1782        return self.client.post(
1783            reverse('wagtailadmin_pages:workflow_action', args=(self.page.id, action, self.page.current_workflow_task_state.id)),
1784            post_data,
1785            follow=True
1786        )
1787
1788    def test_workflow_status_modal(self):
1789        workflow_status_url = reverse('wagtailadmin_pages:workflow_status', args=(self.page.id, ))
1790
1791        # The page workflow status view should return permission denied when the page is but a draft
1792        response = self.client.get(workflow_status_url)
1793        self.assertRedirects(response, '/admin/')
1794
1795        # Submit for moderation
1796        self.submit()
1797
1798        response = self.client.get(workflow_status_url)
1799        self.assertEqual(response.status_code, 200)
1800        html = response.json().get('html')
1801        self.assertIn(self.task_1.name, html)
1802        self.assertIn('{}: In progress'.format(self.task_1.name), html)
1803        self.assertIn(self.task_2.name, html)
1804        self.assertIn('{}: Not started'.format(self.task_2.name), html)
1805        self.assertIn(reverse('wagtailadmin_pages:history', args=(self.page.id, )), html)
1806
1807        self.assertTemplateUsed(response, 'wagtailadmin/workflows/workflow_status.html')
1808
1809    def test_status_through_workflow_cycle(self):
1810        self.login(self.superuser)
1811        response = self.client.get(self.edit_url)
1812        self.assertContains(response, 'Draft', 1)
1813
1814        self.page.save_revision()
1815        response = self.client.get(self.edit_url)
1816        self.assertContains(response, 'Draft saved', 1)
1817
1818        self.submit()
1819        response = self.client.get(self.edit_url)
1820        self.assertRegex(response.content.decode('utf-8'), r'Awaiting[\s|\n]+{}'.format(self.task_1.name))
1821
1822        response = self.workflow_action('approve')
1823        self.assertRegex(response.content.decode('utf-8'), r'Awaiting[\s|\n]+{}'.format(self.task_2.name))
1824
1825        response = self.workflow_action('reject')
1826        self.assertContains(response, 'Changes requested')
1827
1828        # resubmit
1829        self.submit()
1830        response = self.client.get(self.edit_url)
1831        self.assertRegex(response.content.decode('utf-8'), r'Awaiting[\s|\n]+{}'.format(self.task_2.name))
1832
1833        response = self.workflow_action('approve')
1834        self.assertContains(response, 'Published')
1835
1836    def test_status_after_cancel(self):
1837        # start workflow, then cancel
1838        self.submit()
1839        self.submit('action-cancel-workflow')
1840        response = self.client.get(self.edit_url)
1841        self.assertContains(response, 'Draft saved')
1842
1843    def test_status_after_restart(self):
1844        self.submit()
1845        response = self.workflow_action('approve')
1846        self.assertRegex(response.content.decode('utf-8'), r'Awaiting[\s|\n]+{}'.format(self.task_2.name))
1847        self.workflow_action('reject')
1848        self.submit('action-restart-workflow')
1849        response = self.client.get(self.edit_url)
1850        self.assertRegex(response.content.decode('utf-8'), r'Awaiting[\s|\n]+{}'.format(self.task_1.name))
1851
1852    def test_workflow_status_modal_task_comments(self):
1853        workflow_status_url = reverse('wagtailadmin_pages:workflow_status', args=(self.page.id,))
1854
1855        self.submit()
1856        self.workflow_action('reject')
1857
1858        response = self.client.get(workflow_status_url)
1859        self.assertIn('needs some changes', response.json().get('html'))
1860
1861        self.submit()
1862        self.workflow_action('approve')
1863        response = self.client.get(workflow_status_url)
1864        self.assertIn('good work', response.json().get('html'))
1865
1866    def test_workflow_edit_locked_message(self):
1867        self.submit()
1868        self.login(self.submitter)
1869        response = self.client.get(self.edit_url)
1870
1871        needle = "This page is awaiting <b>\'test_task_1\'</b> in the <b>\'test_workflow\'</b> workflow. Only reviewers for this task can edit the page."
1872        self.assertTagInHTML(needle, str(response.content), count=1)
1873
1874        self.login(self.moderator)
1875        response = self.client.get(self.edit_url)
1876        self.assertNotInHTML(needle, str(response.content))
1877