# -*- encoding: utf-8 -*-
"""
Tests for pika.channel.ContentFrameAssembler

"""
import marshal
import unittest

from pika import channel, exceptions, frame, spec
10
11
class ContentFrameAssemblerTests(unittest.TestCase):
    """Unit tests for ``pika.channel.ContentFrameAssembler``.

    Each test feeds method, header and/or body frames to a fresh assembler
    through ``process()`` and then inspects either the value it returned or
    the assembler's private bookkeeping attributes (``_method_frame``,
    ``_header_frame``, ``_seen_so_far``, ``_body_fragments``).
    """

    def setUp(self):
        """Create a fresh assembler for every test."""
        self.obj = channel.ContentFrameAssembler()

    # -- helpers ----------------------------------------------------------

    def _begin_delivery(self, body_size):
        """Feed a ``Basic.Deliver`` method frame and a content header.

        :param int body_size: body size announced by the header frame
        :returns: the ``(method_frame, header_frame)`` that were processed

        """
        method_frame = frame.Method(1, spec.Basic.Deliver())
        self.obj.process(method_frame)
        # NOTE(review): the BasicProperties class itself (not an instance)
        # is handed to the header frame, exactly as the tests always did.
        header_frame = frame.Header(1, body_size, spec.BasicProperties)
        self.obj.process(header_frame)
        return method_frame, header_frame

    def _reset_after_partial_delivery(self):
        """Feed method, header (10 bytes) and a 6-byte body, then reset."""
        self._begin_delivery(body_size=10)
        self.obj.process(frame.Body(1, b'abc123'))
        self.obj._reset()

    # -- initial state ----------------------------------------------------

    def test_init_method_frame(self):
        """A new assembler holds no method frame."""
        self.assertIsNone(self.obj._method_frame)

    def test_init_header_frame(self):
        """A new assembler holds no header frame."""
        self.assertIsNone(self.obj._header_frame)

    def test_init_seen_so_far(self):
        """A new assembler has counted zero body bytes."""
        self.assertEqual(self.obj._seen_so_far, 0)

    def test_init_body_fragments(self):
        """A new assembler has an empty list of body fragments."""
        self.assertEqual(self.obj._body_fragments, [])

    # -- process() --------------------------------------------------------

    def test_process_with_basic_deliver(self):
        """Processing a Basic.Deliver method frame stores that frame."""
        value = frame.Method(1, spec.Basic.Deliver())
        self.obj.process(value)
        self.assertEqual(self.obj._method_frame, value)

    def test_process_with_content_header(self):
        """Processing a content header frame stores that frame."""
        value = frame.Header(1, 100, spec.BasicProperties)
        self.obj.process(value)
        self.assertEqual(self.obj._header_frame, value)

    def test_process_with_body_frame_partial(self):
        """A body frame's fragment is appended to the fragment list."""
        # NOTE(review): the header is fed before the method frame in this
        # test; the order is kept exactly as it was.
        self.obj.process(frame.Header(1, 100, spec.BasicProperties))
        self.obj.process(frame.Method(1, spec.Basic.Deliver()))
        body_frame = frame.Body(1, b'abc123')
        self.obj.process(body_frame)
        self.assertEqual(self.obj._body_fragments, [body_frame.fragment])

    def test_process_with_full_message(self):
        """The final body frame yields (method, header, body)."""
        method_frame, header_frame = self._begin_delivery(body_size=6)
        response = self.obj.process(frame.Body(1, b'abc123'))
        self.assertEqual(response, (method_frame, header_frame, b'abc123'))

    def test_process_with_body_frame_six_bytes(self):
        """Six body bytes out of ten announced are counted as seen."""
        self._begin_delivery(body_size=10)
        self.obj.process(frame.Body(1, b'abc123'))
        self.assertEqual(self.obj._seen_so_far, 6)

    def test_process_with_body_frame_too_big(self):
        """A body larger than announced raises BodyTooLongError."""
        self._begin_delivery(body_size=6)
        body_frame = frame.Body(1, b'abcd1234')
        # Only the body frame is processed inside the context manager so
        # an error from the earlier frames cannot satisfy the assertion.
        with self.assertRaises(exceptions.BodyTooLongError):
            self.obj.process(body_frame)

    def test_process_with_unexpected_frame_type(self):
        """A Basic.Qos method frame raises UnexpectedFrameError."""
        value = frame.Method(1, spec.Basic.Qos())
        with self.assertRaises(exceptions.UnexpectedFrameError):
            self.obj.process(value)

    # -- _reset() ---------------------------------------------------------

    def test_reset_method_frame(self):
        """_reset() clears the stored method frame."""
        self._reset_after_partial_delivery()
        self.assertIsNone(self.obj._method_frame)

    def test_reset_header_frame(self):
        """_reset() clears the stored header frame."""
        self._reset_after_partial_delivery()
        self.assertIsNone(self.obj._header_frame)

    def test_reset_seen_so_far(self):
        """_reset() zeroes the count of body bytes seen."""
        self._reset_after_partial_delivery()
        self.assertEqual(self.obj._seen_so_far, 0)

    def test_reset_body_fragments(self):
        """_reset() empties the list of body fragments."""
        self._reset_after_partial_delivery()
        self.assertEqual(self.obj._body_fragments, [])

    # -- body value types -------------------------------------------------

    def test_ascii_bytes_body_instance(self):
        """The assembled body of an ASCII payload is a bytes instance."""
        self._begin_delivery(body_size=11)
        _, _, body_value = self.obj.process(frame.Body(1, b'foo-bar-baz'))
        self.assertIsInstance(body_value, bytes)

    def test_ascii_body_value(self):
        """The assembled body equals the ASCII payload that was sent."""
        expectation = b'foo-bar-baz'
        self._begin_delivery(body_size=11)
        _, _, body_value = self.obj.process(frame.Body(1, b'foo-bar-baz'))
        self.assertEqual(body_value, expectation)
        self.assertIsInstance(body_value, bytes)

    def test_binary_non_unicode_value(self):
        """A marshalled (non-text) payload round-trips unchanged."""
        expectation = ('a', 0.8)
        marshalled_body = marshal.dumps(expectation)
        self._begin_delivery(body_size=len(marshalled_body))
        _, _, body_value = self.obj.process(frame.Body(1, marshalled_body))
        self.assertEqual(marshal.loads(body_value), expectation)
152