1# encoding: utf-8
2from __future__ import absolute_import
3from __future__ import unicode_literals
4
5from compose import utils
6
7
class TestJsonSplitter(object):
    """Exercise ``utils.json_splitter`` on truncated and multi-object buffers."""

    def test_json_splitter_no_object(self):
        """A buffer holding only an unterminated object is expected to give None."""
        truncated = '{"foo": "bar'
        result = utils.json_splitter(truncated)
        assert result is None

    def test_json_splitter_with_object(self):
        """The first object is returned together with the remaining text."""
        buffered = '{"foo": "bar"}\n  \n{"next": "obj"}'
        # The expected remainder starts directly at the next object.
        expected = ({'foo': 'bar'}, '{"next": "obj"}')
        assert utils.json_splitter(buffered) == expected

    def test_json_splitter_leading_whitespace(self):
        """Whitespace ahead of the first object should not affect the result."""
        buffered = '\n   \r{"foo": "bar"}\n\n   {"next": "obj"}'
        expected = ({'foo': 'bar'}, '{"next": "obj"}')
        assert utils.json_splitter(buffered) == expected
21
22
class TestStreamAsText(object):
    """Exercise ``utils.stream_as_text`` with single-chunk byte streams."""

    def test_stream_with_non_utf_unicode_character(self):
        """Bytes that are not valid UTF-8 are expected as replacement characters."""
        chunks = [b'\xed\xf3\xf3']
        # Unpacking also checks that exactly one item is produced.
        (decoded,) = utils.stream_as_text(chunks)
        assert decoded == '���'

    def test_stream_with_utf_character(self):
        """UTF-8 encoded text is expected to round-trip unchanged."""
        chunks = ['ěĝ'.encode('utf-8')]
        # Unpacking also checks that exactly one item is produced.
        (decoded,) = utils.stream_as_text(chunks)
        assert decoded == 'ěĝ'
34
35
class TestJsonStream(object):
    """Exercise ``utils.json_stream`` over chunks holding several JSON values."""

    def test_with_falsy_entries(self):
        """Empty dicts and lists are falsy but must still appear in the output."""
        chunks = [
            '{"one": "two"}\n{}\n',
            "[1, 2, 3]\n[]\n",
        ]
        expected = [{'one': 'two'}, {}, [1, 2, 3], []]
        decoded = list(utils.json_stream(chunks))
        assert decoded == expected

    def test_with_leading_whitespace(self):
        """Whitespace before or between objects should not change what is decoded."""
        chunks = [
            '\n  \r\n  {"one": "two"}{"x": 1}',
            '  {"three": "four"}\t\t{"x": 2}'
        ]
        expected = [{'one': 'two'}, {'x': 1}, {'three': 'four'}, {'x': 2}]
        decoded = list(utils.json_stream(chunks))
        assert decoded == expected
63
64
class TestParseBytes(object):
    """Exercise ``utils.parse_bytes`` with string and integer inputs."""

    def test_parse_bytes(self):
        """Check a suffixed size, a plain int, an unparsable string and a digit string."""
        parse = utils.parse_bytes
        # 'kb' is expected to scale the number by 1024.
        assert parse('123kb') == 123 * 1024
        assert parse(123) == 123
        # Input that cannot be parsed is expected to yield None.
        assert parse('foobar') is None
        assert parse('123') == 123
71
72
class TestMoreItertools(object):
    """Exercise the itertools-style helpers exposed by ``utils``."""

    def test_unique_everseen(self):
        """Duplicates are dropped, first-seen order kept, with or without a key."""
        expected = [2, 1]
        # Each entry holds the extra positional arguments for one call:
        # no key, the builtin ``hash``, and a string-building key function.
        optional_key_args = (
            (),
            (hash,),
            (lambda x: 'key_%s' % x,),
        )
        for key_args in optional_key_args:
            # A fresh input list is built for every call.
            deduped = list(utils.unique_everseen([2, 1, 2, 1], *key_args))
            assert deduped == expected
79