1# Copyright (c) 2008-2012 testtools developers. See LICENSE for details.
2
3import io
4import os
5import tempfile
6import unittest
7
8from testtools import TestCase, skipUnless
9from testtools.compat import (
10    _b,
11    )
12from testtools.content import (
13    attach_file,
14    Content,
15    content_from_file,
16    content_from_stream,
17    JSON,
18    json_content,
19    StackLinesContent,
20    StacktraceContent,
21    TracebackContent,
22    text_content,
23    )
24from testtools.content_type import (
25    ContentType,
26    UTF8_TEXT,
27    )
28from testtools.matchers import (
29    Equals,
30    MatchesException,
31    Raises,
32    raises,
33    )
34from testtools.tests.helpers import an_exc_info
35
36
37raises_value_error = Raises(MatchesException(ValueError))
38
39
40class TestContent(TestCase):
41
42    def test___init___None_errors(self):
43        self.assertThat(lambda: Content(None, None), raises_value_error)
44        self.assertThat(
45            lambda: Content(None, lambda: ["traceback"]), raises_value_error)
46        self.assertThat(
47            lambda: Content(ContentType("text", "traceback"), None),
48            raises_value_error)
49
50    def test___init___sets_ivars(self):
51        content_type = ContentType("foo", "bar")
52        content = Content(content_type, lambda: ["bytes"])
53        self.assertEqual(content_type, content.content_type)
54        self.assertEqual(["bytes"], list(content.iter_bytes()))
55
56    def test___eq__(self):
57        content_type = ContentType("foo", "bar")
58        one_chunk = lambda: [_b("bytes")]
59        two_chunk = lambda: [_b("by"), _b("tes")]
60        content1 = Content(content_type, one_chunk)
61        content2 = Content(content_type, one_chunk)
62        content3 = Content(content_type, two_chunk)
63        content4 = Content(content_type, lambda: [_b("by"), _b("te")])
64        content5 = Content(ContentType("f", "b"), two_chunk)
65        self.assertEqual(content1, content2)
66        self.assertEqual(content1, content3)
67        self.assertNotEqual(content1, content4)
68        self.assertNotEqual(content1, content5)
69
70    def test___repr__(self):
71        content = Content(ContentType("application", "octet-stream"),
72            lambda: [_b("\x00bin"), _b("ary\xff")])
73        self.assertIn("\\x00binary\\xff", repr(content))
74
75    def test_iter_text_not_text_errors(self):
76        content_type = ContentType("foo", "bar")
77        content = Content(content_type, lambda: ["bytes"])
78        self.assertThat(content.iter_text, raises_value_error)
79
80    def test_iter_text_decodes(self):
81        content_type = ContentType("text", "strange", {"charset": "utf8"})
82        content = Content(
83            content_type, lambda: ["bytes\xea".encode()])
84        self.assertEqual(["bytes\xea"], list(content.iter_text()))
85
86    def test_iter_text_default_charset_iso_8859_1(self):
87        content_type = ContentType("text", "strange")
88        text = "bytes\xea"
89        iso_version = text.encode("ISO-8859-1")
90        content = Content(content_type, lambda: [iso_version])
91        self.assertEqual([text], list(content.iter_text()))
92
93    def test_as_text(self):
94        content_type = ContentType("text", "strange", {"charset": "utf8"})
95        content = Content(
96            content_type, lambda: ["bytes\xea".encode()])
97        self.assertEqual("bytes\xea", content.as_text())
98
99    def test_from_file(self):
100        fd, path = tempfile.mkstemp()
101        self.addCleanup(os.remove, path)
102        os.write(fd, _b('some data'))
103        os.close(fd)
104        content = content_from_file(path, UTF8_TEXT, chunk_size=2)
105        self.assertThat(
106            list(content.iter_bytes()),
107            Equals([_b('so'), _b('me'), _b(' d'), _b('at'), _b('a')]))
108
109    def test_from_nonexistent_file(self):
110        directory = tempfile.mkdtemp()
111        nonexistent = os.path.join(directory, 'nonexistent-file')
112        content = content_from_file(nonexistent)
113        self.assertThat(content.iter_bytes, raises(IOError))
114
115    def test_from_file_default_type(self):
116        content = content_from_file('/nonexistent/path')
117        self.assertThat(content.content_type, Equals(UTF8_TEXT))
118
119    def test_from_file_eager_loading(self):
120        fd, path = tempfile.mkstemp()
121        os.write(fd, _b('some data'))
122        os.close(fd)
123        content = content_from_file(path, UTF8_TEXT, buffer_now=True)
124        os.remove(path)
125        self.assertThat(
126            ''.join(content.iter_text()), Equals('some data'))
127
128    def test_from_file_with_simple_seek(self):
129        f = tempfile.NamedTemporaryFile()
130        f.write(_b('some data'))
131        f.flush()
132        self.addCleanup(f.close)
133        content = content_from_file(
134            f.name, UTF8_TEXT, chunk_size=50, seek_offset=5)
135        self.assertThat(
136            list(content.iter_bytes()), Equals([_b('data')]))
137
138    def test_from_file_with_whence_seek(self):
139        f = tempfile.NamedTemporaryFile()
140        f.write(_b('some data'))
141        f.flush()
142        self.addCleanup(f.close)
143        content = content_from_file(
144            f.name, UTF8_TEXT, chunk_size=50, seek_offset=-4, seek_whence=2)
145        self.assertThat(
146            list(content.iter_bytes()), Equals([_b('data')]))
147
148    def test_from_stream(self):
149        data = io.StringIO('some data')
150        content = content_from_stream(data, UTF8_TEXT, chunk_size=2)
151        self.assertThat(
152            list(content.iter_bytes()), Equals(['so', 'me', ' d', 'at', 'a']))
153
154    def test_from_stream_default_type(self):
155        data = io.StringIO('some data')
156        content = content_from_stream(data)
157        self.assertThat(content.content_type, Equals(UTF8_TEXT))
158
159    def test_from_stream_eager_loading(self):
160        fd, path = tempfile.mkstemp()
161        self.addCleanup(os.remove, path)
162        self.addCleanup(os.close, fd)
163        os.write(fd, _b('some data'))
164        stream = open(path, 'rb')
165        self.addCleanup(stream.close)
166        content = content_from_stream(stream, UTF8_TEXT, buffer_now=True)
167        os.write(fd, _b('more data'))
168        self.assertThat(
169            ''.join(content.iter_text()), Equals('some data'))
170
171    def test_from_stream_with_simple_seek(self):
172        data = io.BytesIO(_b('some data'))
173        content = content_from_stream(
174            data, UTF8_TEXT, chunk_size=50, seek_offset=5)
175        self.assertThat(
176            list(content.iter_bytes()), Equals([_b('data')]))
177
178    def test_from_stream_with_whence_seek(self):
179        data = io.BytesIO(_b('some data'))
180        content = content_from_stream(
181            data, UTF8_TEXT, chunk_size=50, seek_offset=-4, seek_whence=2)
182        self.assertThat(
183            list(content.iter_bytes()), Equals([_b('data')]))
184
185    def test_from_text(self):
186        data = "some data"
187        expected = Content(UTF8_TEXT, lambda: [data.encode('utf8')])
188        self.assertEqual(expected, text_content(data))
189
190    def test_text_content_raises_TypeError_when_passed_bytes(self):
191        data = _b("Some Bytes")
192        self.assertRaises(TypeError, text_content, data)
193
194    def test_text_content_raises_TypeError_when_passed_non_text(self):
195        bad_values = (None, list(), dict(), 42, 1.23)
196        for value in bad_values:
197            self.assertThat(
198                lambda: text_content(value),
199                raises(
200                    TypeError("text_content must be given text, not '%s'." %
201                        type(value).__name__)
202                ),
203            )
204
205    def test_json_content(self):
206        data = {'foo': 'bar'}
207        expected = Content(JSON, lambda: [_b('{"foo": "bar"}')])
208        self.assertEqual(expected, json_content(data))
209
210
211class TestStackLinesContent(TestCase):
212
213    def _get_stack_line_and_expected_output(self):
214        stack_lines = [
215            ('/path/to/file', 42, 'some_function', 'print("Hello World")'),
216        ]
217        expected = '  File "/path/to/file", line 42, in some_function\n' \
218                   '    print("Hello World")\n'
219        return stack_lines, expected
220
221    def test_single_stack_line(self):
222        stack_lines, expected = self._get_stack_line_and_expected_output()
223        actual = StackLinesContent(stack_lines).as_text()
224
225        self.assertEqual(expected, actual)
226
227    def test_prefix_content(self):
228        stack_lines, expected = self._get_stack_line_and_expected_output()
229        prefix = self.getUniqueString() + '\n'
230        content = StackLinesContent(stack_lines, prefix_content=prefix)
231        actual = content.as_text()
232        expected = prefix  + expected
233
234        self.assertEqual(expected, actual)
235
236    def test_postfix_content(self):
237        stack_lines, expected = self._get_stack_line_and_expected_output()
238        postfix = '\n' + self.getUniqueString()
239        content = StackLinesContent(stack_lines, postfix_content=postfix)
240        actual = content.as_text()
241        expected = expected + postfix
242
243        self.assertEqual(expected, actual)
244
245    def test___init___sets_content_type(self):
246        stack_lines, expected = self._get_stack_line_and_expected_output()
247        content = StackLinesContent(stack_lines)
248        expected_content_type = ContentType("text", "x-traceback",
249            {"language": "python", "charset": "utf8"})
250
251        self.assertEqual(expected_content_type, content.content_type)
252
253
254class TestTracebackContent(TestCase):
255
256    def test___init___None_errors(self):
257        self.assertThat(
258            lambda: TracebackContent(None, None), raises_value_error)
259
260    def test___init___sets_ivars(self):
261        content = TracebackContent(an_exc_info, self)
262        content_type = ContentType("text", "x-traceback",
263            {"language": "python", "charset": "utf8"})
264        self.assertEqual(content_type, content.content_type)
265        result = unittest.TestResult()
266        expected = result._exc_info_to_string(an_exc_info, self)
267        self.assertEqual(expected, ''.join(list(content.iter_text())))
268
269
270class TestStacktraceContent(TestCase):
271
272    def test___init___sets_ivars(self):
273        content = StacktraceContent()
274        content_type = ContentType("text", "x-traceback",
275            {"language": "python", "charset": "utf8"})
276        self.assertEqual(content_type, content.content_type)
277
278    def test_prefix_is_used(self):
279        prefix = self.getUniqueString()
280        actual = StacktraceContent(prefix_content=prefix).as_text()
281        self.assertTrue(actual.startswith(prefix))
282
283    def test_postfix_is_used(self):
284        postfix = self.getUniqueString()
285        actual = StacktraceContent(postfix_content=postfix).as_text()
286        self.assertTrue(actual.endswith(postfix))
287
288    def test_top_frame_is_skipped_when_no_stack_is_specified(self):
289        actual = StacktraceContent().as_text()
290        self.assertTrue('testtools/content.py' not in actual)
291
292
293class TestAttachFile(TestCase):
294
295    def make_file(self, data):
296        # GZ 2011-04-21: This helper could be useful for methods above trying
297        #                to use mkstemp, but should handle write failures and
298        #                always close the fd. There must be a better way.
299        fd, path = tempfile.mkstemp()
300        self.addCleanup(os.remove, path)
301        os.write(fd, _b(data))
302        os.close(fd)
303        return path
304
305    def test_simple(self):
306        class SomeTest(TestCase):
307            def test_foo(self):
308                pass
309        test = SomeTest('test_foo')
310        data = 'some data'
311        path = self.make_file(data)
312        my_content = text_content(data)
313        attach_file(test, path, name='foo')
314        self.assertEqual({'foo': my_content}, test.getDetails())
315
316    def test_optional_name(self):
317        # If no name is provided, attach_file just uses the base name of the
318        # file.
319        class SomeTest(TestCase):
320            def test_foo(self):
321                pass
322        test = SomeTest('test_foo')
323        path = self.make_file('some data')
324        base_path = os.path.basename(path)
325        attach_file(test, path)
326        self.assertEqual([base_path], list(test.getDetails()))
327
328    def test_lazy_read(self):
329        class SomeTest(TestCase):
330            def test_foo(self):
331                pass
332        test = SomeTest('test_foo')
333        path = self.make_file('some data')
334        attach_file(test, path, name='foo', buffer_now=False)
335        content = test.getDetails()['foo']
336        content_file = open(path, 'w')
337        content_file.write('new data')
338        content_file.close()
339        self.assertEqual(''.join(content.iter_text()), 'new data')
340
341    def test_eager_read_by_default(self):
342        class SomeTest(TestCase):
343            def test_foo(self):
344                pass
345        test = SomeTest('test_foo')
346        path = self.make_file('some data')
347        attach_file(test, path, name='foo')
348        content = test.getDetails()['foo']
349        content_file = open(path, 'w')
350        content_file.write('new data')
351        content_file.close()
352        self.assertEqual(''.join(content.iter_text()), 'some data')
353
354
355def test_suite():
356    from unittest import TestLoader
357    return TestLoader().loadTestsFromName(__name__)
358