1from docker.utils.json_stream import json_splitter, stream_as_text, json_stream
2
3
4class TestJsonSplitter:
5
6    def test_json_splitter_no_object(self):
7        data = '{"foo": "bar'
8        assert json_splitter(data) is None
9
10    def test_json_splitter_with_object(self):
11        data = '{"foo": "bar"}\n  \n{"next": "obj"}'
12        assert json_splitter(data) == ({'foo': 'bar'}, '{"next": "obj"}')
13
14    def test_json_splitter_leading_whitespace(self):
15        data = '\n   \r{"foo": "bar"}\n\n   {"next": "obj"}'
16        assert json_splitter(data) == ({'foo': 'bar'}, '{"next": "obj"}')
17
18
19class TestStreamAsText:
20
21    def test_stream_with_non_utf_unicode_character(self):
22        stream = [b'\xed\xf3\xf3']
23        output, = stream_as_text(stream)
24        assert output == '���'
25
26    def test_stream_with_utf_character(self):
27        stream = ['ěĝ'.encode()]
28        output, = stream_as_text(stream)
29        assert output == 'ěĝ'
30
31
32class TestJsonStream:
33
34    def test_with_falsy_entries(self):
35        stream = [
36            '{"one": "two"}\n{}\n',
37            "[1, 2, 3]\n[]\n",
38        ]
39        output = list(json_stream(stream))
40        assert output == [
41            {'one': 'two'},
42            {},
43            [1, 2, 3],
44            [],
45        ]
46
47    def test_with_leading_whitespace(self):
48        stream = [
49            '\n  \r\n  {"one": "two"}{"x": 1}',
50            '  {"three": "four"}\t\t{"x": 2}'
51        ]
52        output = list(json_stream(stream))
53        assert output == [
54            {'one': 'two'},
55            {'x': 1},
56            {'three': 'four'},
57            {'x': 2}
58        ]
59