1from docker.utils.json_stream import json_splitter, stream_as_text, json_stream 2 3 4class TestJsonSplitter: 5 6 def test_json_splitter_no_object(self): 7 data = '{"foo": "bar' 8 assert json_splitter(data) is None 9 10 def test_json_splitter_with_object(self): 11 data = '{"foo": "bar"}\n \n{"next": "obj"}' 12 assert json_splitter(data) == ({'foo': 'bar'}, '{"next": "obj"}') 13 14 def test_json_splitter_leading_whitespace(self): 15 data = '\n \r{"foo": "bar"}\n\n {"next": "obj"}' 16 assert json_splitter(data) == ({'foo': 'bar'}, '{"next": "obj"}') 17 18 19class TestStreamAsText: 20 21 def test_stream_with_non_utf_unicode_character(self): 22 stream = [b'\xed\xf3\xf3'] 23 output, = stream_as_text(stream) 24 assert output == '���' 25 26 def test_stream_with_utf_character(self): 27 stream = ['ěĝ'.encode()] 28 output, = stream_as_text(stream) 29 assert output == 'ěĝ' 30 31 32class TestJsonStream: 33 34 def test_with_falsy_entries(self): 35 stream = [ 36 '{"one": "two"}\n{}\n', 37 "[1, 2, 3]\n[]\n", 38 ] 39 output = list(json_stream(stream)) 40 assert output == [ 41 {'one': 'two'}, 42 {}, 43 [1, 2, 3], 44 [], 45 ] 46 47 def test_with_leading_whitespace(self): 48 stream = [ 49 '\n \r\n {"one": "two"}{"x": 1}', 50 ' {"three": "four"}\t\t{"x": 2}' 51 ] 52 output = list(json_stream(stream)) 53 assert output == [ 54 {'one': 'two'}, 55 {'x': 1}, 56 {'three': 'four'}, 57 {'x': 2} 58 ] 59