1import unittest
2
3class TestTM(unittest.TestCase):
4    def _getTargetClass(self):
5        from repoze.tm import TM
6        return TM
7
8    def _start_response(self, status, headers, exc_info=None):
9        pass
10
11    def _makeOne(self, app, commit_veto=None):
12        return self._getTargetClass()(app, commit_veto)
13
14    def test_ekey_inserted(self):
15        app = DummyApplication()
16        tm = self._makeOne(app)
17        tm.transaction = DummyTransactionModule()
18        from repoze.tm import ekey
19        env = {}
20        result = [chunk for chunk in tm(env, self._start_response)]
21        self.assertEqual(result, ['hello'])
22        self.assertTrue(ekey in env)
23
24    def test_committed(self):
25        app = DummyApplication()
26        tm = self._makeOne(app)
27        transaction = DummyTransactionModule()
28        tm.transaction = transaction
29        result = [chunk for chunk in tm({}, self._start_response)]
30        self.assertEqual(result, ['hello'])
31        self.assertEqual(transaction.committed, True)
32        self.assertEqual(transaction.aborted, False)
33
34    def test_aborted_via_doom(self):
35        app = DummyApplication()
36        tm = self._makeOne(app)
37        transaction = DummyTransactionModule(doom=True)
38        tm.transaction = transaction
39        result = [chunk for chunk in tm({}, self._start_response)]
40        self.assertEqual(result, ['hello'])
41        self.assertEqual(transaction.committed, False)
42        self.assertEqual(transaction.aborted, True)
43
44    def test_aborted_via_exception(self):
45        app = DummyApplication(exception=True)
46        tm = self._makeOne(app)
47        transaction = DummyTransactionModule()
48        tm.transaction = transaction
49        def execute_request():
50            [chunk for chunk in tm({}, self._start_response)]
51
52        self.assertRaises(ValueError, execute_request)
53        self.assertEqual(transaction.committed, False)
54        self.assertEqual(transaction.aborted, True)
55
56    def test_aborted_via_exception_and_doom(self):
57        app = DummyApplication(exception=True)
58        tm = self._makeOne(app)
59        transaction = DummyTransactionModule(doom=True)
60        tm.transaction = transaction
61        def execute_request():
62            [chunk for chunk in tm({}, self._start_response)]
63
64        self.assertRaises(ValueError, execute_request)
65        self.assertEqual(transaction.committed, False)
66        self.assertEqual(transaction.aborted, True)
67
68    def test_aborted_via_commit_veto(self):
69        app = DummyApplication(status="403 Forbidden")
70        def commit_veto(environ, status, headers):
71            self.assertTrue(isinstance(environ, dict),
72                            "environ is not passed properly")
73            self.assertTrue(isinstance(headers, list),
74                            "headers are not passed properly")
75            self.assertTrue(isinstance(status, str),
76                            "status is not passed properly")
77            return not (200 <= int(status.split()[0]) < 400)
78        tm = self._makeOne(app, commit_veto)
79        transaction = DummyTransactionModule()
80        tm.transaction = transaction
81        [chunk for chunk in tm({}, self._start_response)]
82        self.assertEqual(transaction.committed, False)
83        self.assertEqual(transaction.aborted, True) # ici
84
85    def test_committed_via_commit_veto_exception(self):
86        app = DummyApplication(status="403 Forbidden")
87        def commit_veto(environ, status, headers):
88            return None
89        tm = self._makeOne(app, commit_veto)
90        transaction = DummyTransactionModule()
91        tm.transaction = transaction
92        [chunk for chunk in tm({}, self._start_response)]
93        self.assertEqual(transaction.committed, True)
94        self.assertEqual(transaction.aborted, False)
95
96    def test_aborted_via_commit_veto_exception(self):
97        app = DummyApplication(status="403 Forbidden")
98        def commit_veto(environ, status, headers):
99            raise ValueError('foo')
100        tm = self._makeOne(app, commit_veto)
101        transaction = DummyTransactionModule()
102        tm.transaction = transaction
103        def execute_request():
104            [chunk for chunk in tm({}, self._start_response)]
105
106        self.assertRaises(ValueError, execute_request)
107        self.assertEqual(transaction.committed, False)
108        self.assertEqual(transaction.aborted, True)
109
110    def test_cleanup_on_commit(self):
111        from repoze.tm import after_end
112        dummycalled = []
113        def dummy():
114            dummycalled.append(True)
115        env = {}
116        app = DummyApplication()
117        tm = self._makeOne(app)
118        transaction = DummyTransactionModule()
119        setattr(transaction, after_end.key, [dummy])
120        tm.transaction = transaction
121        [chunk for chunk in tm(env, self._start_response)]
122        self.assertEqual(transaction.committed, True)
123        self.assertEqual(transaction.aborted, False)
124        self.assertEqual(dummycalled, [True])
125
126    def test_cleanup_on_abort(self):
127        from repoze.tm import after_end
128        dummycalled = []
129        def dummy():
130            dummycalled.append(True)
131        env = {}
132        app = DummyApplication(exception=True)
133        tm = self._makeOne(app)
134        transaction = DummyTransactionModule()
135        setattr(transaction, after_end.key, [dummy])
136        tm.transaction = transaction
137        def execute_request():
138            [chunk for chunk in tm(env, self._start_response)]
139
140        self.assertRaises(ValueError, execute_request)
141        self.assertEqual(transaction.committed, False)
142        self.assertEqual(transaction.aborted, True)
143        self.assertEqual(dummycalled, [True])
144
145class TestAfterEnd(unittest.TestCase):
146    def _getTargetClass(self):
147        from repoze.tm import AfterEnd
148        return AfterEnd
149
150    def _makeOne(self):
151        return self._getTargetClass()()
152
153    def test_register(self):
154        registry = self._makeOne()
155        func = lambda *x: None
156        txn = Dummy()
157        registry.register(func, txn)
158        self.assertEqual(getattr(txn, registry.key), [func])
159
160    def test_unregister_exists(self):
161        registry = self._makeOne()
162        func = lambda *x: None
163        txn = Dummy()
164        registry.register(func, txn)
165        self.assertEqual(getattr(txn, registry.key), [func])
166        registry.unregister(func, txn)
167        self.assertFalse(hasattr(txn, registry.key))
168
169    def test_unregister_notexists(self):
170        registry = self._makeOne()
171        func = lambda *x: None
172        txn = Dummy()
173        setattr(txn, registry.key, [None])
174        registry.unregister(func, txn)
175        self.assertEqual(getattr(txn, registry.key), [None])
176
177    def test_unregister_funcs_is_None(self):
178        registry = self._makeOne()
179        func = lambda *x: None
180        txn = Dummy()
181        self.assertEqual(registry.unregister(func, txn), None)
182
183class UtilityFunctionTests(unittest.TestCase):
184    def test_isActive(self):
185        from repoze.tm import ekey
186        from repoze.tm import isActive
187        self.assertEqual(isActive({ekey:True}), True)
188        self.assertEqual(isActive({}), False)
189
190class TestMakeTM(unittest.TestCase):
191    def test_make_tm_withveto(self):
192        from repoze.tm import make_tm
193        tm = make_tm(DummyApplication(), {}, 'repoze.tm.tests:fakeveto')
194        self.assertEqual(tm.commit_veto, fakeveto)
195
196    def test_make_tm_noveto(self):
197        from repoze.tm import make_tm
198        tm = make_tm(DummyApplication(), {}, None)
199        self.assertEqual(tm.commit_veto, None)
200
201class Test_default_commit_veto(unittest.TestCase):
202    def _callFUT(self, status, headers=()):
203        from repoze.tm import default_commit_veto
204        return default_commit_veto(None, status, headers)
205
206    def test_it_true_5XX(self):
207        self.assertTrue(self._callFUT('500 Server Error'))
208        self.assertTrue(self._callFUT('503 Service Unavailable'))
209
210    def test_it_true_4XX(self):
211        self.assertTrue(self._callFUT('400 Bad Request'))
212        self.assertTrue(self._callFUT('411 Length Required'))
213
214    def test_it_false_2XX(self):
215        self.assertFalse(self._callFUT('200 OK'))
216        self.assertFalse(self._callFUT('201 Created'))
217
218    def test_it_false_3XX(self):
219        self.assertFalse(self._callFUT('301 Moved Permanently'))
220        self.assertFalse(self._callFUT('302 Found'))
221
222    def test_it_true_x_tm_abort_specific(self):
223        self.assertTrue(self._callFUT('200 OK', [('X-Tm-Abort', True)]))
224
225    def test_it_false_x_tm_commit(self):
226        self.assertFalse(self._callFUT('200 OK', [('X-Tm', 'commit')]))
227
228    def test_it_true_x_tm_abort(self):
229        self.assertTrue(self._callFUT('200 OK', [('X-Tm', 'abort')]))
230
231    def test_it_true_x_tm_anythingelse(self):
232        self.assertTrue(self._callFUT('200 OK', [('X-Tm', '')]))
233
234    def test_x_tm_generic_precedes_x_tm_abort_specific(self):
235        self.assertFalse(self._callFUT('200 OK', [('X-Tm', 'commit'),
236                                                  ('X-Tm-Abort', True)]))
237
238def fakeveto(environ, status, headers):
239    """ """
240
241class DummyTransactionModule:
242    begun = False
243    committed = False
244    aborted = False
245    def __init__(self, doom=False):
246        self.doom = doom
247
248    def begin(self):
249        self.begun = True
250
251    def get(self):
252        return self
253
254    def commit(self):
255        self.committed = True
256
257    def abort(self):
258        self.aborted = True
259
260    def isDoomed(self):
261        return self.doom
262
263class Dummy:
264    pass
265
266class DummyApplication:
267    def __init__(self, exception=False, status="200 OK"):
268        self.exception = exception
269        self.status = status
270
271    def __call__(self, environ, start_response):
272        start_response(self.status, [], None)
273        if self.exception:
274            raise ValueError('raising')
275        return ['hello']
276
277