1from builtins import next
2from builtins import str
3import sys
4import unittest
5import re
6import os
7sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
8
9import gc
10from itertools import islice
11from tempfile import mkdtemp
12from shutil import rmtree
13from Exscript.logger import Log, Logger
14from LogTest import FakeError
15from util.reportTest import FakeJob
16
17
18def count(iterable):
19    return sum(1 for _ in iterable)
20
21
22def nth(iterable, n, default=None):
23    "Returns the nth item or a default value"
24    return next(islice(iterable, n, None), default)
25
26
27class LoggerTest(unittest.TestCase):
28    CORRELATE = Logger
29
30    def setUp(self):
31        self.logger = Logger()
32        self.job = FakeJob('fake')
33
34    def tearDown(self):
35        # Needed to make sure that events are disconnected.
36        self.logger = None
37
38    def testConstructor(self):
39        logger = Logger()
40
41    def testGetSucceededActions(self):
42        self.assertEqual(self.logger.get_succeeded_actions(), 0)
43
44        job1 = FakeJob()
45        job2 = FakeJob()
46        self.assertEqual(self.logger.get_succeeded_actions(), 0)
47
48        self.logger.add_log(id(job1), job1.name, 1)
49        self.logger.add_log(id(job2), job2.name, 1)
50        self.assertEqual(self.logger.get_succeeded_actions(), 0)
51
52        self.logger.log_succeeded(id(job1))
53        self.assertEqual(self.logger.get_succeeded_actions(), 1)
54
55        try:
56            raise FakeError()
57        except FakeError:
58            self.logger.log_aborted(id(job2), sys.exc_info())
59        self.assertEqual(self.logger.get_succeeded_actions(), 1)
60
61    def testGetAbortedActions(self):
62        self.assertEqual(self.logger.get_aborted_actions(), 0)
63
64        job = FakeJob()
65        self.assertEqual(self.logger.get_aborted_actions(), 0)
66
67        self.logger.add_log(id(job), job.name, 1)
68        self.assertEqual(self.logger.get_aborted_actions(), 0)
69
70        self.logger.log_succeeded(id(job))
71        self.assertEqual(self.logger.get_aborted_actions(), 0)
72
73        try:
74            raise FakeError()
75        except FakeError:
76            self.logger.log_aborted(id(job), sys.exc_info())
77        self.assertEqual(self.logger.get_aborted_actions(), 1)
78
79    def testGetLogs(self):
80        self.assertEqual(count(self.logger.get_logs()), 0)
81
82        job = FakeJob()
83        self.assertEqual(count(self.logger.get_logs()), 0)
84
85        self.logger.add_log(id(job), job.name, 1)
86        self.assertEqual(count(self.logger.get_logs()), 1)
87        self.assertIsInstance(nth(self.logger.get_logs(), 0), Log)
88
89        self.logger.log(id(job), 'hello world')
90        self.assertEqual(count(self.logger.get_logs()), 1)
91
92        self.logger.log_succeeded(id(job))
93        self.assertEqual(count(self.logger.get_logs()), 1)
94
95        try:
96            raise FakeError()
97        except FakeError:
98            self.logger.log_aborted(id(job), sys.exc_info())
99        self.assertEqual(count(self.logger.get_logs()), 1)
100
101    def testGetSucceededLogs(self):
102        self.assertEqual(count(self.logger.get_succeeded_logs()), 0)
103
104        job = FakeJob()
105        self.assertEqual(count(self.logger.get_succeeded_logs()), 0)
106
107        self.logger.add_log(id(job), job.name, 1)
108        self.assertEqual(count(self.logger.get_succeeded_logs()), 0)
109
110        self.logger.log(id(job), 'hello world')
111        self.assertEqual(count(self.logger.get_succeeded_logs()), 0)
112
113        self.logger.log_succeeded(id(job))
114        self.assertEqual(count(self.logger.get_aborted_logs()), 0)
115        self.assertEqual(count(self.logger.get_succeeded_logs()), 1)
116        self.assertIsInstance(nth(self.logger.get_succeeded_logs(), 0), Log)
117
118    def testGetAbortedLogs(self):
119        self.assertEqual(count(self.logger.get_aborted_logs()), 0)
120
121        job = FakeJob()
122        self.assertEqual(count(self.logger.get_aborted_logs()), 0)
123
124        self.logger.add_log(id(job), job.name, 1)
125        self.assertEqual(count(self.logger.get_aborted_logs()), 0)
126
127        self.logger.log(id(job), 'hello world')
128        self.assertEqual(count(self.logger.get_aborted_logs()), 0)
129
130        try:
131            raise FakeError()
132        except FakeError:
133            self.logger.log_aborted(id(job), sys.exc_info())
134        self.assertEqual(count(self.logger.get_succeeded_logs()), 0)
135        self.assertEqual(count(self.logger.get_aborted_logs()), 1)
136        self.assertIsInstance(nth(self.logger.get_aborted_logs(), 0), Log)
137
138    def testAddLog(self):
139        self.assertEqual(count(self.logger.get_logs()), 0)
140        log = self.logger.add_log(id(self.job), self.job.name, 1)
141        self.assertEqual(count(self.logger.get_logs()), 1)
142        self.assertEqual(str(log), '')
143        return log
144
145    def testLog(self):
146        log = self.testAddLog()
147        self.logger.log(id(self.job), 'hello world')
148        self.assertEqual(str(log), 'hello world')
149        return log
150
151    def testLogAborted(self):
152        log = self.testLog()
153        try:
154            raise FakeError()
155        except Exception:
156            self.logger.log_aborted(id(self.job), sys.exc_info())
157        self.assertIn('FakeError', str(log))
158        return log
159
160    def testLogSucceeded(self):
161        log = self.testLog()
162        self.logger.log_succeeded(id(self.job))
163        self.assertEqual(str(log), 'hello world')
164        return log
165
166
167def suite():
168    return unittest.TestLoader().loadTestsFromTestCase(LoggerTest)
169if __name__ == '__main__':
170    unittest.TextTestRunner(verbosity=2).run(suite())
171