1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2016-2021 Edgewall Software
4# All rights reserved.
5#
6# This software is licensed as described in the file COPYING, which
7# you should have received as part of this distribution. The terms
8# are also available at https://trac.edgewall.org/wiki/TracLicense.
9#
10# This software consists of voluntary contributions made by many
11# individuals. For the exact contribution history, see the revision
12# history and logs, available at https://trac.edgewall.org/log/.
13
14import unittest
15from datetime import datetime
16
17from trac.notification.model import Subscription
18from trac.test import EnvironmentStub, MockRequest
19from trac.util.datefmt import to_utimestamp, utc
20
21
22class SubscriptionTestCase(unittest.TestCase):
23
24    def setUp(self):
25        self.env = EnvironmentStub()
26
27    def tearDown(self):
28        self.env.reset_db()
29
30    def _add_subscriber(self, req, class_, distributor='email',
31                        format='text/plain', adverb='always'):
32        session = req.session
33        args = {'sid': session.sid, 'authenticated': session.authenticated,
34                'distributor': distributor, 'format': format, 'adverb': adverb,
35                'class': class_}
36        return Subscription.add(self.env, args)
37
38    def _insert_rows(self):
39        rows = [
40            ('joe', 1, 'email', 'text/plain', 1, 'always', 'EmailSubscriber1'),
41            ('joe', 1, 'email', 'text/html',  2, 'always', 'EmailSubscriber2'),
42            ('joe', 1, 'email', 'text/plain', 3, 'always', 'EmailSubscriber3'),
43            ('joe', 1, 'xmpp',  'text/html',  1, 'always', 'XmppSubscriber1'),
44            ('joe', 1, 'xmpp',  'text/plain', 2, 'never',  'XmppSubscriber2'),
45            ('joe', 1, 'xmpp',  'text/html',  3, 'never',  'XmppSubscriber3'),
46            ('joe', 1, 'irc',   'text/plain', 1, 'never',  'IrcSubscriber1'),
47            ('joe', 1, 'irc',   'text/plain', 2, 'never',  'IrcSubscriber2'),
48            ('joe', 1, 'irc',   'text/plain', 3, 'never',  'IrcSubscriber3'),
49            ('jes', 1, 'email', 'text/html',  1, 'always', 'EmailSubscriber1'),
50            ('jes', 1, 'email', 'text/plain', 2, 'never',  'EmailSubscriber2'),
51            ('jes', 1, 'email', 'text/html',  3, 'always', 'EmailSubscriber3'),
52            ('jan', 1, 'xmpp',  'text/plain', 1, 'always', 'XmppSubscriber1'),
53            ('jan', 1, 'xmpp',  'text/html',  2, 'never',  'XmppSubscriber2'),
54            ('jan', 1, 'xmpp',  'text/plain', 3, 'never',  'XmppSubscriber3'),
55            ('jim', 1, 'irc',   'text/html',  1, 'always', 'IrcSubscriber1'),
56            ('jim', 1, 'irc',   'text/plain', 2, 'never',  'IrcSubscriber2'),
57            ('jim', 1, 'irc',   'text/html',  3, 'always', 'IrcSubscriber3'),
58        ]
59        ts = to_utimestamp(datetime(2016, 2, 3, 12, 34, 56, 987654, utc))
60        with self.env.db_transaction as db:
61            cursor = db.cursor()
62            cursor.executemany("""
63                INSERT INTO notify_subscription (
64                    time, changetime, sid, authenticated, distributor,
65                    format, priority, adverb, class)
66                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""",
67                [(ts + idx, ts + idx * 2) + row for idx, row
68                                                in enumerate(rows)])
69
70    def _props(self, items, name):
71        return [item[name] for item in items]
72
73    def test_add(self):
74        req = MockRequest(self.env, authname='joe')
75        with self.env.db_transaction:
76            self._add_subscriber(req, 'TicketSubscriber1', format='text/html')
77            self._add_subscriber(req, 'TicketSubscriber2')
78            self._add_subscriber(req, 'TicketSubscriber3', format='text/html')
79            self._add_subscriber(req, 'XmppSubscriber1', distributor='xmpp',
80                                 adverb='never')
81        self.assertEqual(
82            [('joe', 1, 'email', 'text/html', 1, 'always',
83              'TicketSubscriber1'),
84             ('joe', 1, 'email', 'text/plain', 2, 'always',
85              'TicketSubscriber2'),
86             ('joe', 1, 'email', 'text/html', 3, 'always',
87              'TicketSubscriber3'),
88             ('joe', 1, 'xmpp',  'text/plain', 1, 'never',
89              'XmppSubscriber1')],
90            self.env.db_query("""\
91                SELECT sid, authenticated, distributor, format, priority,
92                       adverb, class
93                FROM notify_subscription
94                WHERE sid=%s AND authenticated=%s
95                ORDER BY distributor, priority""", ('joe', 1)))
96
97    def test_delete(self):
98        req = MockRequest(self.env, authname='joe')
99        with self.env.db_transaction:
100            ids = [self._add_subscriber(req, 'TicketSubscriber1'),
101                   self._add_subscriber(req, 'TicketSubscriber2'),
102                   self._add_subscriber(req, 'TicketSubscriber3'),
103                   self._add_subscriber(req, 'XmppSubscriber1',
104                                        distributor='xmpp', adverb='never'),
105                   self._add_subscriber(req, 'XmppSubscriber2',
106                                        distributor='xmpp')]
107        self.assertEqual(5, self.env.db_query("""\
108            SELECT COUNT(*) FROM notify_subscription
109            WHERE sid=%s AND authenticated=%s""", ('joe', 1))[0][0])
110
111        Subscription.delete(self.env, ids[1])
112        rows = self.env.db_query("""\
113            SELECT id, distributor, priority, class FROM notify_subscription
114            WHERE sid=%s AND authenticated=%s
115            ORDER BY distributor, priority""", ('joe', 1))
116        self.assertEqual((ids[0], 'email', 1, 'TicketSubscriber1'), rows[0])
117        self.assertEqual((ids[2], 'email', 2, 'TicketSubscriber3'), rows[1])
118        self.assertEqual((ids[3], 'xmpp', 1, 'XmppSubscriber1'), rows[2])
119        self.assertEqual((ids[4], 'xmpp', 2, 'XmppSubscriber2'), rows[3])
120        self.assertEqual(4, len(rows))
121
122    def test_find_by_sid_and_distributor(self):
123        self._insert_rows()
124        items = Subscription.find_by_sid_and_distributor(self.env, 'joe', True,
125                                                         'xmpp')
126        self.assertEqual(['joe'] * 3, self._props(items, 'sid'))
127        self.assertEqual([1] * 3, self._props(items, 'authenticated'))
128        self.assertEqual(['xmpp'] * 3, self._props(items, 'distributor'))
129        self.assertEqual(['text/html', 'text/plain', 'text/html'],
130                         self._props(items, 'format'))
131        self.assertEqual([1, 2, 3], self._props(items, 'priority'))
132        self.assertEqual(['always', 'never', 'never'], self._props(items, 'adverb'))
133        self.assertEqual(['XmppSubscriber1', 'XmppSubscriber2',
134                          'XmppSubscriber3'], self._props(items, 'class'))
135
136    def test_find_by_sids_and_class(self):
137        self._insert_rows()
138        sids = [('joe', True), ('jes', True), ('jan', True), ('jim', True)]
139        items = Subscription.find_by_sids_and_class(self.env, sids,
140                                                    'IrcSubscriber3')
141        self.assertEqual(['joe', 'jim'], self._props(items, 'sid'))
142        self.assertEqual([1] * 2, self._props(items, 'authenticated'))
143        self.assertEqual(['irc'] * 2, self._props(items, 'distributor'))
144        self.assertEqual(['text/plain', 'text/html'],
145                         self._props(items, 'format'))
146        self.assertEqual([3, 3], self._props(items, 'priority'))
147        self.assertEqual(['never', 'always'], self._props(items, 'adverb'))
148        self.assertEqual(['IrcSubscriber3', 'IrcSubscriber3'],
149                         self._props(items, 'class'))
150
151    def test_move(self):
152        def query_subs():
153            return self.env.db_query("""\
154                SELECT distributor, priority, class
155                FROM notify_subscription WHERE sid=%s AND authenticated=%s
156                ORDER BY distributor, priority""", ('joe', 1))
157
158        req = MockRequest(self.env, authname='joe')
159        with self.env.db_transaction:
160            rule_ids = {}
161            for class_, distributor in [('EmailSubscriber1', 'email'),
162                                        ('EmailSubscriber2', 'email'),
163                                        ('EmailSubscriber3', 'email'),
164                                        ('EmailSubscriber4', 'email'),
165                                        ('XmppSubscriber1',  'xmpp'),
166                                        ('XmppSubscriber2',  'xmpp')]:
167                rule_ids[(class_, distributor)] = \
168                    self._add_subscriber(req, class_, distributor)
169        self.assertEqual([('email', 1, 'EmailSubscriber1'),
170                          ('email', 2, 'EmailSubscriber2'),
171                          ('email', 3, 'EmailSubscriber3'),
172                          ('email', 4, 'EmailSubscriber4'),
173                          ('xmpp',  1, 'XmppSubscriber1'),
174                          ('xmpp',  2, 'XmppSubscriber2'),
175                         ], query_subs())
176
177        Subscription.move(self.env, rule_ids[('EmailSubscriber3', 'email')], 1)
178        self.assertEqual([('email', 1, 'EmailSubscriber3'),
179                          ('email', 2, 'EmailSubscriber1'),
180                          ('email', 3, 'EmailSubscriber2'),
181                          ('email', 4, 'EmailSubscriber4'),
182                          ('xmpp',  1, 'XmppSubscriber1'),
183                          ('xmpp',  2, 'XmppSubscriber2'),
184                         ], query_subs())
185
186        Subscription.move(self.env, rule_ids[('EmailSubscriber1', 'email')], 4)
187        self.assertEqual([('email', 1, 'EmailSubscriber3'),
188                          ('email', 2, 'EmailSubscriber2'),
189                          ('email', 3, 'EmailSubscriber4'),
190                          ('email', 4, 'EmailSubscriber1'),
191                          ('xmpp',  1, 'XmppSubscriber1'),
192                          ('xmpp',  2, 'XmppSubscriber2'),
193                         ], query_subs())
194
195        Subscription.move(self.env, rule_ids[('EmailSubscriber3', 'email')], 3)
196        self.assertEqual([('email', 1, 'EmailSubscriber2'),
197                          ('email', 2, 'EmailSubscriber4'),
198                          ('email', 3, 'EmailSubscriber3'),
199                          ('email', 4, 'EmailSubscriber1'),
200                          ('xmpp',  1, 'XmppSubscriber1'),
201                          ('xmpp',  2, 'XmppSubscriber2'),
202                         ], query_subs())
203
204    def test_replace_all(self):
205        def query(sid, authenticated):
206            return self.env.db_query("""\
207                SELECT distributor, format, priority, adverb, class
208                FROM notify_subscription
209                WHERE sid=%s AND authenticated=%s
210                ORDER BY distributor, priority""", (sid, authenticated))
211
212        req = MockRequest(self.env, authname='joe')
213        sess = req.session
214        items = [
215            ('email', 'text/plain', 'always', 'TicketSubscriber1'),
216            ('email', 'text/html',  'always', 'TicketSubscriber2'),
217            ('email', 'text/html',  'always', 'TicketSubscriber3'),
218            ('xmpp',  'text/html',  'never',  'XmppSubscriber1'),
219            ('xmpp',  'text/plain', 'always', 'XmppSubscriber2'),
220        ]
221        items = [dict(zip(('distributor', 'format', 'adverb', 'class'), item))
222                 for item in items]
223        Subscription.replace_all(self.env, sess.sid, sess.authenticated, items)
224        rows = query('joe', 1)
225        expected = [
226            ('email', 'text/plain', 1, 'always', 'TicketSubscriber1'),
227            ('email', 'text/html',  2, 'always', 'TicketSubscriber2'),
228            ('email', 'text/html',  3, 'always', 'TicketSubscriber3'),
229            ('xmpp',  'text/html',  1, 'never',  'XmppSubscriber1'),
230            ('xmpp',  'text/plain', 2, 'always', 'XmppSubscriber2'),
231        ]
232        self.assertEqual(expected, rows)
233
234        items = [
235            ('email', 'text/plain', 'never',  'TicketSubscriber3'),
236            ('xmpp',  'text/html',  'always', 'XmppSubscriber1'),
237        ]
238        items = [dict(zip(('distributor', 'format', 'adverb', 'class'), item))
239                 for item in items]
240        Subscription.replace_all(self.env, sess.sid, sess.authenticated, items)
241        rows = query('joe', 1)
242        expected = [
243            ('email', 'text/plain', 1, 'never',  'TicketSubscriber3'),
244            ('xmpp',  'text/html',  1, 'always', 'XmppSubscriber1'),
245        ]
246        self.assertEqual(expected, rows)
247
248        Subscription.replace_all(self.env, sess.sid, sess.authenticated, [])
249        self.assertEqual([], query('joe', 1))
250
251    def test_update_format_by_distributor_and_sid(self):
252        self._insert_rows()
253        Subscription.update_format_by_distributor_and_sid(
254            self.env, 'email', 'joe', True, 'application/pdf')
255        rows = self.env.db_query("""\
256            SELECT distributor, format, priority, adverb, class
257            FROM notify_subscription
258            WHERE sid=%s AND authenticated=%s
259            ORDER BY distributor, priority""", ('joe', 1))
260        expected = [
261            ('email', 'application/pdf', 1, 'always', 'EmailSubscriber1'),
262            ('email', 'application/pdf', 2, 'always', 'EmailSubscriber2'),
263            ('email', 'application/pdf', 3, 'always', 'EmailSubscriber3'),
264            ('irc',   'text/plain',      1, 'never',  'IrcSubscriber1'),
265            ('irc',   'text/plain',      2, 'never',  'IrcSubscriber2'),
266            ('irc',   'text/plain',      3, 'never',  'IrcSubscriber3'),
267            ('xmpp',  'text/html',       1, 'always', 'XmppSubscriber1'),
268            ('xmpp',  'text/plain',      2, 'never',  'XmppSubscriber2'),
269            ('xmpp',  'text/html',       3, 'never',  'XmppSubscriber3'),
270        ]
271        self.assertEqual(expected, rows)
272
273
274def test_suite():
275    suite = unittest.TestSuite()
276    suite.addTest(unittest.makeSuite(SubscriptionTestCase))
277    return suite
278
279
280if __name__ == '__main__':
281    unittest.main(defaultTest='test_suite')
282