1 #include <msgpack.hpp>
2 #include <gtest/gtest.h>
3 #include <sstream>
4
// Packs the integers 1, 2 and 3 into an sbuffer, then feeds the packed bytes
// to a msgpack::unpacker one byte per iteration and checks that
// unpacker::next(object_handle&) yields 1, 2 and 3 in that order.
TEST(streaming, basic)
{
    msgpack::sbuffer buffer;

    msgpack::packer<msgpack::sbuffer> pk(&buffer);
    pk.pack(1);
    pk.pack(2);
    pk.pack(3);

    const char* input = buffer.data();
    const char* const eof = input + buffer.size();

    msgpack::unpacker pac;
    msgpack::object_handle oh;

    int count = 0;
    while(count < 3) {
        // Fatal check *before* the copy: if the packed bytes run out while
        // fewer than three objects have been decoded, abort the test here
        // instead of reading past the end of `buffer` (a non-fatal EXPECT
        // would let the loop keep going).
        ASSERT_TRUE(input < eof);

        pac.reserve_buffer(32*1024);

        // Copy the next chunk into pac.buffer(). This test deliberately
        // feeds a single byte at a time.
        size_t len = 1;
        memcpy(pac.buffer(), input, len);
        input += len;

        // Tell the unpacker how many bytes were just written to its buffer.
        pac.buffer_consumed(len);

        // Drain every object that is complete so far.
        while(pac.next(oh)) {
            msgpack::object obj = oh.get();
            switch(count++) {
            case 0:
                EXPECT_EQ(1, obj.as<int>());
                break;
            case 1:
                EXPECT_EQ(2, obj.as<int>());
                break;
            case 2:
                EXPECT_EQ(3, obj.as<int>());
                return; // all three objects seen
            }
        }
    }
}
50
51 // obsolete
52 #if MSGPACK_DEFAULT_API_VERSION == 1
53
// Same scenario as the by-reference streaming test, but drives the unpacker
// through the pointer overload unpacker::next(object_handle*). The call is
// wrapped in a GCC diagnostic push/pop that silences
// -Wdeprecated-declarations for that one call.
TEST(streaming, basic_pointer)
{
    msgpack::sbuffer buffer;

    msgpack::packer<msgpack::sbuffer> pk(&buffer);
    pk.pack(1);
    pk.pack(2);
    pk.pack(3);

    const char* input = buffer.data();
    const char* const eof = input + buffer.size();

    msgpack::unpacker pac;
    msgpack::object_handle oh;

    int count = 0;
    while(count < 3) {
        // Fatal check *before* the copy: if the packed bytes run out while
        // fewer than three objects have been decoded, abort the test here
        // instead of reading past the end of `buffer`.
        ASSERT_TRUE(input < eof);

        pac.reserve_buffer(32*1024);

        // Copy the next chunk into pac.buffer(). This test deliberately
        // feeds a single byte at a time.
        size_t len = 1;
        memcpy(pac.buffer(), input, len);
        input += len;

        // Tell the unpacker how many bytes were just written to its buffer.
        pac.buffer_consumed(len);

#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif // (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
        while(pac.next(&oh)) {
#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
#pragma GCC diagnostic pop
#endif // (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
            msgpack::object obj = oh.get();
            switch(count++) {
            case 0:
                EXPECT_EQ(1, obj.as<int>());
                break;
            case 1:
                EXPECT_EQ(2, obj.as<int>());
                break;
            case 2:
                EXPECT_EQ(3, obj.as<int>());
                return; // all three objects seen
            }
        }
    }
}
106
107 #endif // MSGPACK_DEFAULT_API_VERSION == 1
108
109 #if !defined(MSGPACK_USE_CPP03)
110
// Checks that a msgpack::unpacker keeps working across moves: on every
// iteration the unpacker is move-constructed into a local, fed one byte,
// drained, and then move-assigned back. The decoded values must still be
// 1, 2 and 3 in order.
TEST(streaming, move)
{
    msgpack::sbuffer buffer;

    msgpack::packer<msgpack::sbuffer> pk(&buffer);
    pk.pack(1);
    pk.pack(2);
    pk.pack(3);

    const char* input = buffer.data();
    const char* const eof = input + buffer.size();

    msgpack::unpacker pac;
    msgpack::object_handle oh;

    int count = 0;
    while(count < 3) {
        // Fatal check *before* the copy: if the packed bytes run out while
        // fewer than three objects have been decoded, abort the test here
        // instead of reading past the end of `buffer`.
        ASSERT_TRUE(input < eof);

        // Take over the state accumulated so far.
        msgpack::unpacker pac_in(std::move(pac));
        pac_in.reserve_buffer(32*1024);

        // Copy the next chunk into pac_in.buffer(). This test deliberately
        // feeds a single byte at a time.
        size_t len = 1;
        memcpy(pac_in.buffer(), input, len);
        input += len;

        // Tell the unpacker how many bytes were just written to its buffer.
        pac_in.buffer_consumed(len);

        // Drain every object that is complete so far.
        while(pac_in.next(oh)) {
            msgpack::object obj = oh.get();
            switch(count++) {
            case 0:
                EXPECT_EQ(1, obj.as<int>());
                break;
            case 1:
                EXPECT_EQ(2, obj.as<int>());
                break;
            case 2:
                EXPECT_EQ(3, obj.as<int>());
                return; // all three objects seen
            }
        }

        // Hand the state back so the next iteration can move from `pac` again.
        pac = std::move(pac_in);
    }
}
158
159 #endif // !defined(MSGPACK_USE_CPP03)
160
161 class event_handler {
162 public:
event_handler(std::istream & input)163 event_handler(std::istream& input) : input(input) { }
~event_handler()164 ~event_handler() { }
165
on_read()166 void on_read()
167 {
168 while(true) {
169 pac.reserve_buffer(32*1024);
170
171 size_t len = static_cast<size_t>(input.readsome(pac.buffer(), pac.buffer_capacity()));
172
173 if(len == 0) {
174 return;
175 }
176
177 pac.buffer_consumed(len);
178
179 msgpack::object_handle oh;
180 while(pac.next(oh)) {
181 on_message(oh.get(), msgpack::move(oh.zone()));
182 }
183
184 if(pac.message_size() > 10*1024*1024) {
185 throw std::runtime_error("message is too large");
186 }
187 }
188 }
189
on_message(msgpack::object obj,msgpack::unique_ptr<msgpack::zone>)190 void on_message(msgpack::object obj, msgpack::unique_ptr<msgpack::zone>)
191 {
192 EXPECT_EQ(expect, obj.as<int>());
193 }
194
195 int expect;
196
197 private:
198 std::istream& input;
199 msgpack::unpacker pac;
200 };
201
// Packs 1, 2 and 3 into a shared stringstream one value at a time; after each
// pack the handler is told which value to expect and asked to read the stream.
TEST(streaming, event)
{
    std::stringstream stream;
    msgpack::packer<std::ostream> pk(&stream);

    event_handler handler(stream);

    for(int value = 1; value <= 3; ++value) {
        pk.pack(value);
        handler.expect = value;
        handler.on_read();
    }
}
221
222 // obsolete
223 #if MSGPACK_DEFAULT_API_VERSION == 1
224
225 // backward compatibility
// Exercises the old execute()/release_zone()/data()/reset() unpacker API:
// three integers are packed into a string stream, read back with readsome()
// and must decode to 1, 2 and 3 in order.
TEST(streaming, basic_compat)
{
    std::ostringstream stream;
    msgpack::packer<std::ostream> pk(&stream);

    pk.pack(1);
    pk.pack(2);
    pk.pack(3);

    std::istringstream input(stream.str());

    msgpack::unpacker pac;

    int count = 0;
    while(count < 3) {
        pac.reserve_buffer(32*1024);

        size_t len = static_cast<size_t>(input.readsome(pac.buffer(), pac.buffer_capacity()));

        // The stream has nothing more to give but fewer than three objects
        // were decoded: fail now rather than looping forever on empty reads.
        ASSERT_NE(static_cast<size_t>(0), len);

        // Tell the unpacker how many bytes were just written to its buffer.
        pac.buffer_consumed(len);

        while(pac.execute()) {
            // Take the zone and the object out of the unpacker before reset()
            // prepares it for the next message.
            msgpack::unique_ptr<msgpack::zone> z(pac.release_zone());
            msgpack::object obj = pac.data();
            pac.reset();

            switch(count++) {
            case 0:
                EXPECT_EQ(1, obj.as<int>());
                break;
            case 1:
                EXPECT_EQ(2, obj.as<int>());
                break;
            case 2:
                EXPECT_EQ(3, obj.as<int>());
                return; // all three objects seen
            }
        }
    }
}
266
267
268 // backward compatibility
269 class event_handler_compat {
270 public:
event_handler_compat(std::istream & input)271 event_handler_compat(std::istream& input) : input(input) { }
~event_handler_compat()272 ~event_handler_compat() { }
273
on_read()274 void on_read()
275 {
276 while(true) {
277 pac.reserve_buffer(32*1024);
278
279 size_t len = static_cast<size_t>(input.readsome(pac.buffer(), pac.buffer_capacity()));
280
281 if(len == 0) {
282 return;
283 }
284
285 pac.buffer_consumed(len);
286
287 while(pac.execute()) {
288 msgpack::unique_ptr<msgpack::zone> z(pac.release_zone());
289 msgpack::object obj = pac.data();
290 pac.reset();
291 on_message(obj, msgpack::move(z));
292 }
293
294 if(pac.message_size() > 10*1024*1024) {
295 throw std::runtime_error("message is too large");
296 }
297 }
298 }
299
on_message(msgpack::object obj,msgpack::unique_ptr<msgpack::zone>)300 void on_message(msgpack::object obj, msgpack::unique_ptr<msgpack::zone>)
301 {
302 EXPECT_EQ(expect, obj.as<int>());
303 }
304
305 int expect;
306
307 private:
308 std::istream& input;
309 msgpack::unpacker pac;
310 };
311
// Packs 1, 2 and 3 into a shared stringstream one value at a time; after each
// pack the compat handler is told which value to expect and asked to read the
// stream.
TEST(streaming, event_compat)
{
    std::stringstream stream;
    msgpack::packer<std::ostream> pk(&stream);

    event_handler_compat handler(stream);

    for(int value = 1; value <= 3; ++value) {
        pk.pack(value);
        handler.expect = value;
        handler.on_read();
    }
}
331
#endif // MSGPACK_DEFAULT_API_VERSION == 1
333