1 /*
2 Copyright (c) 2009-2012 250bpm s.r.o.
3 Copyright (c) 2007-2009 iMatix Corporation
4 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
5
6 This file is part of Crossroads I/O project.
7
8 Crossroads I/O is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 3 of the License, or
11 (at your option) any later version.
12
13 Crossroads is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 */
21
22 #include <stdlib.h>
23 #include <string.h>
24 #include <limits>
25
26 #include "decoder.hpp"
27 #include "session_base.hpp"
28 #include "likely.hpp"
29 #include "wire.hpp"
30 #include "err.hpp"
31
decoder_t(size_t bufsize_,uint64_t maxmsgsize_)32 xs::decoder_t::decoder_t (size_t bufsize_, uint64_t maxmsgsize_) :
33 decoder_base_t <decoder_t> (bufsize_),
34 session (NULL),
35 maxmsgsize (maxmsgsize_)
36 {
37 int rc = in_progress.init ();
38 errno_assert (rc == 0);
39
40 // At the beginning, read one byte and go to one_byte_size_ready state.
41 next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
42 }
43
~decoder_t()44 xs::decoder_t::~decoder_t ()
45 {
46 int rc = in_progress.close ();
47 errno_assert (rc == 0);
48 }
49
set_session(session_base_t * session_)50 void xs::decoder_t::set_session (session_base_t *session_)
51 {
52 session = session_;
53 }
54
one_byte_size_ready()55 bool xs::decoder_t::one_byte_size_ready ()
56 {
57 // First byte of size is read. If it is 0xff read 8-byte size.
58 // Otherwise allocate the buffer for message data and read the
59 // message data into it.
60 if (*tmpbuf == 0xff)
61 next_step (tmpbuf, 8, &decoder_t::eight_byte_size_ready);
62 else {
63
64 // There has to be at least one byte (the flags) in the message).
65 if (!*tmpbuf) {
66 decoding_error ();
67 return false;
68 }
69
70 // in_progress is initialised at this point so in theory we should
71 // close it before calling xs_msg_init_size, however, it's a 0-byte
72 // message and thus we can treat it as uninitialised...
73 int rc;
74 if (maxmsgsize >= 0 && (uint64_t) (*tmpbuf - 1) > maxmsgsize) {
75 rc = -1;
76 errno = ENOMEM;
77 }
78 else
79 rc = in_progress.init_size (*tmpbuf - 1);
80 if (rc != 0 && errno == ENOMEM) {
81 rc = in_progress.init ();
82 errno_assert (rc == 0);
83 decoding_error ();
84 return false;
85 }
86 errno_assert (rc == 0);
87
88 next_step (tmpbuf, 1, &decoder_t::flags_ready);
89 }
90 return true;
91 }
92
eight_byte_size_ready()93 bool xs::decoder_t::eight_byte_size_ready ()
94 {
95 // 8-byte payload length is read. Allocate the buffer
96 // for message body and read the message data into it.
97 const uint64_t payload_length = get_uint64 (tmpbuf);
98
99 // There has to be at least one byte (the flags) in the message).
100 if (unlikely (payload_length == 0)) {
101 decoding_error ();
102 return false;
103 }
104
105 // Message size must not exceed the maximum allowed size.
106 if (unlikely (maxmsgsize >= 0 && payload_length - 1 > maxmsgsize)) {
107 decoding_error ();
108 return false;
109 }
110
111 // Message size must fit within range of size_t data type.
112 if (unlikely (payload_length - 1 > std::numeric_limits <size_t>::max ())) {
113 decoding_error ();
114 return false;
115 }
116
117 const size_t msg_size = static_cast <size_t> (payload_length - 1);
118
119 // in_progress is initialised at this point so in theory we should
120 // close it before calling init_size, however, it's a 0-byte
121 // message and thus we can treat it as uninitialised...
122 int rc = in_progress.init_size (msg_size);
123 if (rc != 0) {
124 errno_assert (errno == ENOMEM);
125 rc = in_progress.init ();
126 errno_assert (rc == 0);
127 decoding_error ();
128 return false;
129 }
130 errno_assert (rc == 0);
131
132 next_step (tmpbuf, 1, &decoder_t::flags_ready);
133 return true;
134 }
135
flags_ready()136 bool xs::decoder_t::flags_ready ()
137 {
138 // Store the flags from the wire into the message structure.
139 in_progress.set_flags (tmpbuf [0] & 0x01);
140
141 next_step (in_progress.data (), in_progress.size (),
142 &decoder_t::message_ready);
143
144 return true;
145 }
146
message_ready()147 bool xs::decoder_t::message_ready ()
148 {
149 // Message is completely read. Push it further and start reading
150 // new message. (in_progress is a 0-byte message after this point.)
151 if (unlikely (!session))
152 return false;
153 int rc = session->write (&in_progress);
154 if (unlikely (rc != 0)) {
155 if (errno != EAGAIN)
156 decoding_error ();
157 return false;
158 }
159
160 next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
161 return true;
162 }
163