1 /*
2 Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 #include "precompiled.hpp"
31 #include <stdlib.h>
32 #include <string.h>
33 #include <cmath>
34
35 #include "ws_protocol.hpp"
36 #include "ws_decoder.hpp"
37 #include "likely.hpp"
38 #include "wire.hpp"
39 #include "err.hpp"
40
ws_decoder_t(size_t bufsize_,int64_t maxmsgsize_,bool zero_copy_,bool must_mask_)41 zmq::ws_decoder_t::ws_decoder_t (size_t bufsize_,
42 int64_t maxmsgsize_,
43 bool zero_copy_,
44 bool must_mask_) :
45 decoder_base_t<ws_decoder_t, shared_message_memory_allocator> (bufsize_),
46 _msg_flags (0),
47 _zero_copy (zero_copy_),
48 _max_msg_size (maxmsgsize_),
49 _must_mask (must_mask_),
50 _size (0)
51 {
52 memset (_tmpbuf, 0, sizeof (_tmpbuf));
53 int rc = _in_progress.init ();
54 errno_assert (rc == 0);
55
56 // At the beginning, read one byte and go to opcode_ready state.
57 next_step (_tmpbuf, 1, &ws_decoder_t::opcode_ready);
58 }
59
~ws_decoder_t()60 zmq::ws_decoder_t::~ws_decoder_t ()
61 {
62 const int rc = _in_progress.close ();
63 errno_assert (rc == 0);
64 }
65
opcode_ready(unsigned char const *)66 int zmq::ws_decoder_t::opcode_ready (unsigned char const *)
67 {
68 const bool final = (_tmpbuf[0] & 0x80) != 0; // final bit
69 if (!final)
70 return -1; // non final messages are not supported
71
72 _opcode = static_cast<zmq::ws_protocol_t::opcode_t> (_tmpbuf[0] & 0xF);
73
74 _msg_flags = 0;
75
76 switch (_opcode) {
77 case zmq::ws_protocol_t::opcode_binary:
78 break;
79 case zmq::ws_protocol_t::opcode_close:
80 _msg_flags = msg_t::command | msg_t::close_cmd;
81 break;
82 case zmq::ws_protocol_t::opcode_ping:
83 _msg_flags = msg_t::ping | msg_t::command;
84 break;
85 case zmq::ws_protocol_t::opcode_pong:
86 _msg_flags = msg_t::pong | msg_t::command;
87 break;
88 default:
89 return -1;
90 }
91
92 next_step (_tmpbuf, 1, &ws_decoder_t::size_first_byte_ready);
93
94 return 0;
95 }
96
size_first_byte_ready(unsigned char const * read_from_)97 int zmq::ws_decoder_t::size_first_byte_ready (unsigned char const *read_from_)
98 {
99 const bool is_masked = (_tmpbuf[0] & 0x80) != 0;
100
101 if (is_masked != _must_mask) // wrong mask value
102 return -1;
103
104 _size = static_cast<uint64_t> (_tmpbuf[0] & 0x7F);
105
106 if (_size < 126) {
107 if (_must_mask)
108 next_step (_tmpbuf, 4, &ws_decoder_t::mask_ready);
109 else if (_opcode == ws_protocol_t::opcode_binary) {
110 if (_size == 0)
111 return -1;
112 next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready);
113 } else
114 return size_ready (read_from_);
115 } else if (_size == 126)
116 next_step (_tmpbuf, 2, &ws_decoder_t::short_size_ready);
117 else
118 next_step (_tmpbuf, 8, &ws_decoder_t::long_size_ready);
119
120 return 0;
121 }
122
123
short_size_ready(unsigned char const * read_from_)124 int zmq::ws_decoder_t::short_size_ready (unsigned char const *read_from_)
125 {
126 _size = (_tmpbuf[0] << 8) | _tmpbuf[1];
127
128 if (_must_mask)
129 next_step (_tmpbuf, 4, &ws_decoder_t::mask_ready);
130 else if (_opcode == ws_protocol_t::opcode_binary) {
131 if (_size == 0)
132 return -1;
133 next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready);
134 } else
135 return size_ready (read_from_);
136
137 return 0;
138 }
139
long_size_ready(unsigned char const * read_from_)140 int zmq::ws_decoder_t::long_size_ready (unsigned char const *read_from_)
141 {
142 // The payload size is encoded as 64-bit unsigned integer.
143 // The most significant byte comes first.
144 _size = get_uint64 (_tmpbuf);
145
146 if (_must_mask)
147 next_step (_tmpbuf, 4, &ws_decoder_t::mask_ready);
148 else if (_opcode == ws_protocol_t::opcode_binary) {
149 if (_size == 0)
150 return -1;
151 next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready);
152 } else
153 return size_ready (read_from_);
154
155 return 0;
156 }
157
mask_ready(unsigned char const * read_from_)158 int zmq::ws_decoder_t::mask_ready (unsigned char const *read_from_)
159 {
160 memcpy (_mask, _tmpbuf, 4);
161
162 if (_opcode == ws_protocol_t::opcode_binary) {
163 if (_size == 0)
164 return -1;
165
166 next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready);
167 } else
168 return size_ready (read_from_);
169
170 return 0;
171 }
172
flags_ready(unsigned char const * read_from_)173 int zmq::ws_decoder_t::flags_ready (unsigned char const *read_from_)
174 {
175 unsigned char flags;
176
177 if (_must_mask)
178 flags = _tmpbuf[0] ^ _mask[0];
179 else
180 flags = _tmpbuf[0];
181
182 if (flags & ws_protocol_t::more_flag)
183 _msg_flags |= msg_t::more;
184 if (flags & ws_protocol_t::command_flag)
185 _msg_flags |= msg_t::command;
186
187 _size--;
188
189 return size_ready (read_from_);
190 }
191
192
size_ready(unsigned char const * read_pos_)193 int zmq::ws_decoder_t::size_ready (unsigned char const *read_pos_)
194 {
195 // Message size must not exceed the maximum allowed size.
196 if (_max_msg_size >= 0)
197 if (unlikely (_size > static_cast<uint64_t> (_max_msg_size))) {
198 errno = EMSGSIZE;
199 return -1;
200 }
201
202 // Message size must fit into size_t data type.
203 if (unlikely (_size != static_cast<size_t> (_size))) {
204 errno = EMSGSIZE;
205 return -1;
206 }
207
208 int rc = _in_progress.close ();
209 assert (rc == 0);
210
211 // the current message can exceed the current buffer. We have to copy the buffer
212 // data into a new message and complete it in the next receive.
213
214 shared_message_memory_allocator &allocator = get_allocator ();
215 if (unlikely (!_zero_copy || allocator.data () > read_pos_
216 || static_cast<size_t> (read_pos_ - allocator.data ())
217 > allocator.size ()
218 || _size > static_cast<size_t> (
219 allocator.data () + allocator.size () - read_pos_))) {
220 // a new message has started, but the size would exceed the pre-allocated arena
221 // (or read_pos_ is in the initial handshake buffer)
222 // this happens every time when a message does not fit completely into the buffer
223 rc = _in_progress.init_size (static_cast<size_t> (_size));
224 } else {
225 // construct message using n bytes from the buffer as storage
226 // increase buffer ref count
227 // if the message will be a large message, pass a valid refcnt memory location as well
228 rc = _in_progress.init (
229 const_cast<unsigned char *> (read_pos_), static_cast<size_t> (_size),
230 shared_message_memory_allocator::call_dec_ref, allocator.buffer (),
231 allocator.provide_content ());
232
233 // For small messages, data has been copied and refcount does not have to be increased
234 if (_in_progress.is_zcmsg ()) {
235 allocator.advance_content ();
236 allocator.inc_ref ();
237 }
238 }
239
240 if (unlikely (rc)) {
241 errno_assert (errno == ENOMEM);
242 rc = _in_progress.init ();
243 errno_assert (rc == 0);
244 errno = ENOMEM;
245 return -1;
246 }
247
248 _in_progress.set_flags (_msg_flags);
249 // this sets read_pos to
250 // the message data address if the data needs to be copied
251 // for small message / messages exceeding the current buffer
252 // or
253 // to the current start address in the buffer because the message
254 // was constructed to use n bytes from the address passed as argument
255 next_step (_in_progress.data (), _in_progress.size (),
256 &ws_decoder_t::message_ready);
257
258 return 0;
259 }
260
message_ready(unsigned char const *)261 int zmq::ws_decoder_t::message_ready (unsigned char const *)
262 {
263 if (_must_mask) {
264 int mask_index = _opcode == ws_protocol_t::opcode_binary ? 1 : 0;
265
266 unsigned char *data =
267 static_cast<unsigned char *> (_in_progress.data ());
268 for (size_t i = 0; i < _size; ++i, mask_index++)
269 data[i] = data[i] ^ _mask[mask_index % 4];
270 }
271
272 // Message is completely read. Signal this to the caller
273 // and prepare to decode next message.
274 next_step (_tmpbuf, 1, &ws_decoder_t::opcode_ready);
275 return 1;
276 }
277