1 #include <msgpack.hpp>
2 #include <gtest/gtest.h>
3 #include <sstream>
4 
// Packs the integers 1, 2 and 3 into an sbuffer, then hands the serialized
// bytes to a msgpack::unpacker one byte per iteration and checks that
// unpacker::next(object_handle&) yields the values back in the same order.
TEST(streaming, basic)
{
    msgpack::sbuffer buffer;

    msgpack::packer<msgpack::sbuffer> pk(&buffer);
    pk.pack(1);
    pk.pack(2);
    pk.pack(3);

    const char* input = buffer.data();
    const char* const eof = input + buffer.size();

    msgpack::unpacker pac;
    msgpack::object_handle oh;

    int count = 0;
    while(count < 3) {
        pac.reserve_buffer(32*1024);

        // Copy the next chunk into pac.buffer(). Up to
        // pac.buffer_capacity() bytes may be written; this test
        // deliberately feeds a single byte at a time.
        size_t len = 1;
        memcpy(pac.buffer(), input, len);
        input += len;

        pac.buffer_consumed(len);

        while(pac.next(oh)) {
            msgpack::object obj = oh.get();
            // The n-th decoded object (1-based) must hold the value n.
            ++count;
            EXPECT_EQ(count, obj.as<int>());
            if(count == 3) {
                return;
            }
        }

        // Fatal on purpose: if the input is used up before three objects
        // were decoded, the next memcpy above would read past the end of
        // the packed buffer.
        ASSERT_TRUE(input < eof);
    }
}
50 
51 // obsolete
52 #if MSGPACK_DEFAULT_API_VERSION == 1
53 
// Same scenario as the byte-at-a-time streaming test, but driven through the
// pointer-taking unpacker::next(&oh) overload. The GCC pragmas around the
// call suppress -Wdeprecated-declarations for that one line.
TEST(streaming, basic_pointer)
{
    msgpack::sbuffer buffer;

    msgpack::packer<msgpack::sbuffer> pk(&buffer);
    pk.pack(1);
    pk.pack(2);
    pk.pack(3);

    const char* input = buffer.data();
    const char* const eof = input + buffer.size();

    msgpack::unpacker pac;
    msgpack::object_handle oh;

    int count = 0;
    while(count < 3) {
        pac.reserve_buffer(32*1024);

        // Copy the next chunk into pac.buffer(). Up to
        // pac.buffer_capacity() bytes may be written; this test
        // deliberately feeds a single byte at a time.
        size_t len = 1;
        memcpy(pac.buffer(), input, len);
        input += len;

        pac.buffer_consumed(len);

#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif // (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
        while(pac.next(&oh)) {
#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
#pragma GCC diagnostic pop
#endif // (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2))
            msgpack::object obj = oh.get();
            // The n-th decoded object (1-based) must hold the value n.
            ++count;
            EXPECT_EQ(count, obj.as<int>());
            if(count == 3) {
                return;
            }
        }

        // Fatal on purpose: if the input is used up before three objects
        // were decoded, the next memcpy above would read past the end of
        // the packed buffer.
        ASSERT_TRUE(input < eof);
    }
}
106 
107 #endif // MSGPACK_DEFAULT_API_VERSION == 1
108 
109 #if !defined(MSGPACK_USE_CPP03)
110 
// Checks that an unpacker keeps working across moves: before each one-byte
// chunk the unpacker is move-constructed into a loop-local instance, and
// after the chunk has been processed it is move-assigned back.
TEST(streaming, move)
{
    msgpack::sbuffer buffer;

    msgpack::packer<msgpack::sbuffer> pk(&buffer);
    pk.pack(1);
    pk.pack(2);
    pk.pack(3);

    const char* input = buffer.data();
    const char* const eof = input + buffer.size();

    msgpack::unpacker pac;
    msgpack::object_handle oh;

    int count = 0;
    while(count < 3) {
        msgpack::unpacker pac_in(std::move(pac));
        pac_in.reserve_buffer(32*1024);

        // Copy the next chunk into pac_in.buffer(). Up to
        // pac_in.buffer_capacity() bytes may be written; this test
        // deliberately feeds a single byte at a time.
        size_t len = 1;
        memcpy(pac_in.buffer(), input, len);
        input += len;

        pac_in.buffer_consumed(len);

        while(pac_in.next(oh)) {
            msgpack::object obj = oh.get();
            // The n-th decoded object (1-based) must hold the value n.
            ++count;
            EXPECT_EQ(count, obj.as<int>());
            if(count == 3) {
                return;
            }
        }

        // Fatal on purpose: if the input is used up before three objects
        // were decoded, the next memcpy above would read past the end of
        // the packed buffer.
        ASSERT_TRUE(input < eof);

        // Hand the unpacker back so the next iteration can move from it again.
        pac = std::move(pac_in);
    }
}
158 
159 #endif // !defined(MSGPACK_USE_CPP03)
160 
161 class event_handler {
162 public:
event_handler(std::istream & input)163     event_handler(std::istream& input) : input(input) { }
~event_handler()164     ~event_handler() { }
165 
on_read()166     void on_read()
167     {
168         while(true) {
169             pac.reserve_buffer(32*1024);
170 
171             size_t len = static_cast<size_t>(input.readsome(pac.buffer(), pac.buffer_capacity()));
172 
173             if(len == 0) {
174                 return;
175             }
176 
177             pac.buffer_consumed(len);
178 
179             msgpack::object_handle oh;
180             while(pac.next(oh)) {
181                 on_message(oh.get(), msgpack::move(oh.zone()));
182             }
183 
184             if(pac.message_size() > 10*1024*1024) {
185                 throw std::runtime_error("message is too large");
186             }
187         }
188     }
189 
on_message(msgpack::object obj,msgpack::unique_ptr<msgpack::zone>)190     void on_message(msgpack::object obj, msgpack::unique_ptr<msgpack::zone>)
191     {
192         EXPECT_EQ(expect, obj.as<int>());
193     }
194 
195     int expect;
196 
197 private:
198     std::istream& input;
199     msgpack::unpacker pac;
200 };
201 
// Writes 1, 2 and 3 to a stringstream one at a time; after each write the
// handler is told which value to expect and asked to read the stream.
TEST(streaming, event)
{
    std::stringstream stream;
    msgpack::packer<std::ostream> pk(&stream);

    event_handler handler(stream);

    for(int value = 1; value <= 3; ++value) {
        pk.pack(value);
        handler.expect = value;
        handler.on_read();
    }
}
221 
222 // obsolete
223 #if MSGPACK_DEFAULT_API_VERSION == 1
224 
225 // backward compatibility
// Exercises the older execute()/data()/release_zone()/reset() unpacker
// interface: packs 1, 2 and 3 into a string stream, reads it back through
// readsome() and checks the decoded values in order.
TEST(streaming, basic_compat)
{
    std::ostringstream stream;
    msgpack::packer<std::ostream> pk(&stream);

    pk.pack(1);
    pk.pack(2);
    pk.pack(3);

    std::istringstream input(stream.str());

    msgpack::unpacker pac;

    int count = 0;
    while(count < 3) {
        pac.reserve_buffer(32*1024);

        size_t len = static_cast<size_t>(input.readsome(pac.buffer(), pac.buffer_capacity()));

        // Fatal on purpose: a zero-length read while objects are still
        // missing means no further progress is possible, and without this
        // check the outer loop would spin forever.
        ASSERT_TRUE(len != 0);

        pac.buffer_consumed(len);

        while(pac.execute()) {
            // Take the zone and the object out before resetting the
            // unpacker for the next message.
            msgpack::unique_ptr<msgpack::zone> z(pac.release_zone());
            msgpack::object obj = pac.data();
            pac.reset();

            // The n-th decoded object (1-based) must hold the value n.
            ++count;
            EXPECT_EQ(count, obj.as<int>());
            if(count == 3) {
                return;
            }
        }
    }
}
266 
267 
268 // backward compatibility
269 class event_handler_compat {
270 public:
event_handler_compat(std::istream & input)271     event_handler_compat(std::istream& input) : input(input) { }
~event_handler_compat()272     ~event_handler_compat() { }
273 
on_read()274     void on_read()
275     {
276         while(true) {
277             pac.reserve_buffer(32*1024);
278 
279             size_t len = static_cast<size_t>(input.readsome(pac.buffer(), pac.buffer_capacity()));
280 
281             if(len == 0) {
282                 return;
283             }
284 
285             pac.buffer_consumed(len);
286 
287             while(pac.execute()) {
288                 msgpack::unique_ptr<msgpack::zone> z(pac.release_zone());
289                 msgpack::object obj = pac.data();
290                 pac.reset();
291                 on_message(obj, msgpack::move(z));
292             }
293 
294             if(pac.message_size() > 10*1024*1024) {
295                 throw std::runtime_error("message is too large");
296             }
297         }
298     }
299 
on_message(msgpack::object obj,msgpack::unique_ptr<msgpack::zone>)300     void on_message(msgpack::object obj, msgpack::unique_ptr<msgpack::zone>)
301     {
302         EXPECT_EQ(expect, obj.as<int>());
303     }
304 
305     int expect;
306 
307 private:
308     std::istream& input;
309     msgpack::unpacker pac;
310 };
311 
// Writes 1, 2 and 3 to a stringstream one at a time; after each write the
// compat handler is told which value to expect and asked to read the stream.
TEST(streaming, event_compat)
{
    std::stringstream stream;
    msgpack::packer<std::ostream> pk(&stream);

    event_handler_compat handler(stream);

    for(int value = 1; value <= 3; ++value) {
        pk.pack(value);
        handler.expect = value;
        handler.on_read();
    }
}
331 
#endif // MSGPACK_DEFAULT_API_VERSION == 1
333