1# -*- encoding: utf-8 -*- 2""" 3Tests for pika.channel.ContentFrameAssembler 4 5""" 6import marshal 7import unittest 8 9from pika import channel, exceptions, frame, spec 10 11 12class ContentFrameAssemblerTests(unittest.TestCase): 13 def setUp(self): 14 self.obj = channel.ContentFrameAssembler() 15 16 def test_init_method_frame(self): 17 self.assertEqual(self.obj._method_frame, None) 18 19 def test_init_header_frame(self): 20 self.assertEqual(self.obj._header_frame, None) 21 22 def test_init_seen_so_far(self): 23 self.assertEqual(self.obj._seen_so_far, 0) 24 25 def test_init_body_fragments(self): 26 self.assertEqual(self.obj._body_fragments, list()) 27 28 def test_process_with_basic_deliver(self): 29 value = frame.Method(1, spec.Basic.Deliver()) 30 self.obj.process(value) 31 self.assertEqual(self.obj._method_frame, value) 32 33 def test_process_with_content_header(self): 34 value = frame.Header(1, 100, spec.BasicProperties) 35 self.obj.process(value) 36 self.assertEqual(self.obj._header_frame, value) 37 38 def test_process_with_body_frame_partial(self): 39 value = frame.Header(1, 100, spec.BasicProperties) 40 self.obj.process(value) 41 value = frame.Method(1, spec.Basic.Deliver()) 42 self.obj.process(value) 43 value = frame.Body(1, b'abc123') 44 self.obj.process(value) 45 self.assertEqual(self.obj._body_fragments, [value.fragment]) 46 47 def test_process_with_full_message(self): 48 method_frame = frame.Method(1, spec.Basic.Deliver()) 49 self.obj.process(method_frame) 50 header_frame = frame.Header(1, 6, spec.BasicProperties) 51 self.obj.process(header_frame) 52 body_frame = frame.Body(1, b'abc123') 53 response = self.obj.process(body_frame) 54 self.assertEqual(response, (method_frame, header_frame, b'abc123')) 55 56 def test_process_with_body_frame_six_bytes(self): 57 method_frame = frame.Method(1, spec.Basic.Deliver()) 58 self.obj.process(method_frame) 59 header_frame = frame.Header(1, 10, spec.BasicProperties) 60 self.obj.process(header_frame) 61 body_frame = frame.Body(1, b'abc123') 62 self.obj.process(body_frame) 63 self.assertEqual(self.obj._seen_so_far, 6) 64 65 def test_process_with_body_frame_too_big(self): 66 method_frame = frame.Method(1, spec.Basic.Deliver()) 67 self.obj.process(method_frame) 68 header_frame = frame.Header(1, 6, spec.BasicProperties) 69 self.obj.process(header_frame) 70 body_frame = frame.Body(1, b'abcd1234') 71 self.assertRaises(exceptions.BodyTooLongError, self.obj.process, 72 body_frame) 73 74 def test_process_with_unexpected_frame_type(self): 75 value = frame.Method(1, spec.Basic.Qos()) 76 self.assertRaises(exceptions.UnexpectedFrameError, self.obj.process, 77 value) 78 79 def test_reset_method_frame(self): 80 method_frame = frame.Method(1, spec.Basic.Deliver()) 81 self.obj.process(method_frame) 82 header_frame = frame.Header(1, 10, spec.BasicProperties) 83 self.obj.process(header_frame) 84 body_frame = frame.Body(1, b'abc123') 85 self.obj.process(body_frame) 86 self.obj._reset() 87 self.assertEqual(self.obj._method_frame, None) 88 89 def test_reset_header_frame(self): 90 method_frame = frame.Method(1, spec.Basic.Deliver()) 91 self.obj.process(method_frame) 92 header_frame = frame.Header(1, 10, spec.BasicProperties) 93 self.obj.process(header_frame) 94 body_frame = frame.Body(1, b'abc123') 95 self.obj.process(body_frame) 96 self.obj._reset() 97 self.assertEqual(self.obj._header_frame, None) 98 99 def test_reset_seen_so_far(self): 100 method_frame = frame.Method(1, spec.Basic.Deliver()) 101 self.obj.process(method_frame) 102 header_frame = frame.Header(1, 10, spec.BasicProperties) 103 self.obj.process(header_frame) 104 body_frame = frame.Body(1, b'abc123') 105 self.obj.process(body_frame) 106 self.obj._reset() 107 self.assertEqual(self.obj._seen_so_far, 0) 108 109 def test_reset_body_fragments(self): 110 method_frame = frame.Method(1, spec.Basic.Deliver()) 111 self.obj.process(method_frame) 112 header_frame = frame.Header(1, 10, spec.BasicProperties) 113 self.obj.process(header_frame) 114 body_frame = frame.Body(1, b'abc123') 115 self.obj.process(body_frame) 116 self.obj._reset() 117 self.assertEqual(self.obj._body_fragments, list()) 118 119 def test_ascii_bytes_body_instance(self): 120 method_frame = frame.Method(1, spec.Basic.Deliver()) 121 self.obj.process(method_frame) 122 header_frame = frame.Header(1, 11, spec.BasicProperties) 123 self.obj.process(header_frame) 124 body_frame = frame.Body(1, b'foo-bar-baz') 125 method_frame, header_frame, body_value = self.obj.process(body_frame) 126 self.assertIsInstance(body_value, bytes) 127 128 def test_ascii_body_value(self): 129 expectation = b'foo-bar-baz' 130 self.obj = channel.ContentFrameAssembler() 131 method_frame = frame.Method(1, spec.Basic.Deliver()) 132 self.obj.process(method_frame) 133 header_frame = frame.Header(1, 11, spec.BasicProperties) 134 self.obj.process(header_frame) 135 body_frame = frame.Body(1, b'foo-bar-baz') 136 method_frame, header_frame, body_value = self.obj.process(body_frame) 137 self.assertEqual(body_value, expectation) 138 self.assertIsInstance(body_value, bytes) 139 140 def test_binary_non_unicode_value(self): 141 expectation = ('a', 0.8) 142 self.obj = channel.ContentFrameAssembler() 143 method_frame = frame.Method(1, spec.Basic.Deliver()) 144 self.obj.process(method_frame) 145 marshalled_body = marshal.dumps(expectation) 146 header_frame = frame.Header(1, len(marshalled_body), 147 spec.BasicProperties) 148 self.obj.process(header_frame) 149 body_frame = frame.Body(1, marshalled_body) 150 method_frame, header_frame, body_value = self.obj.process(body_frame) 151 self.assertEqual(marshal.loads(body_value), expectation) 152