1 /*
2     Copyright (c) 2009-2012 250bpm s.r.o.
3     Copyright (c) 2007-2009 iMatix Corporation
4     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
5 
6     This file is part of Crossroads I/O project.
7 
8     Crossroads I/O is free software; you can redistribute it and/or modify it
9     under the terms of the GNU Lesser General Public License as published by
10     the Free Software Foundation; either version 3 of the License, or
11     (at your option) any later version.
12 
13     Crossroads is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU Lesser General Public License for more details.
17 
18     You should have received a copy of the GNU Lesser General Public License
19     along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 */
21 
22 #include <stdlib.h>
23 #include <string.h>
24 #include <limits>
25 
26 #include "decoder.hpp"
27 #include "session_base.hpp"
28 #include "likely.hpp"
29 #include "wire.hpp"
30 #include "err.hpp"
31 
decoder_t(size_t bufsize_,uint64_t maxmsgsize_)32 xs::decoder_t::decoder_t (size_t bufsize_, uint64_t maxmsgsize_) :
33     decoder_base_t <decoder_t> (bufsize_),
34     session (NULL),
35     maxmsgsize (maxmsgsize_)
36 {
37     int rc = in_progress.init ();
38     errno_assert (rc == 0);
39 
40     //  At the beginning, read one byte and go to one_byte_size_ready state.
41     next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
42 }
43 
~decoder_t()44 xs::decoder_t::~decoder_t ()
45 {
46     int rc = in_progress.close ();
47     errno_assert (rc == 0);
48 }
49 
set_session(session_base_t * session_)50 void xs::decoder_t::set_session (session_base_t *session_)
51 {
52     session = session_;
53 }
54 
one_byte_size_ready()55 bool xs::decoder_t::one_byte_size_ready ()
56 {
57     //  First byte of size is read. If it is 0xff read 8-byte size.
58     //  Otherwise allocate the buffer for message data and read the
59     //  message data into it.
60     if (*tmpbuf == 0xff)
61         next_step (tmpbuf, 8, &decoder_t::eight_byte_size_ready);
62     else {
63 
64         //  There has to be at least one byte (the flags) in the message).
65         if (!*tmpbuf) {
66             decoding_error ();
67             return false;
68         }
69 
70         //  in_progress is initialised at this point so in theory we should
71         //  close it before calling xs_msg_init_size, however, it's a 0-byte
72         //  message and thus we can treat it as uninitialised...
73         int rc;
74         if (maxmsgsize >= 0 && (uint64_t) (*tmpbuf - 1) > maxmsgsize) {
75             rc = -1;
76             errno = ENOMEM;
77         }
78         else
79             rc = in_progress.init_size (*tmpbuf - 1);
80         if (rc != 0 && errno == ENOMEM) {
81             rc = in_progress.init ();
82             errno_assert (rc == 0);
83             decoding_error ();
84             return false;
85         }
86         errno_assert (rc == 0);
87 
88         next_step (tmpbuf, 1, &decoder_t::flags_ready);
89     }
90     return true;
91 }
92 
eight_byte_size_ready()93 bool xs::decoder_t::eight_byte_size_ready ()
94 {
95     //  8-byte payload length is read. Allocate the buffer
96     //  for message body and read the message data into it.
97     const uint64_t payload_length = get_uint64 (tmpbuf);
98 
99     //  There has to be at least one byte (the flags) in the message).
100     if (unlikely (payload_length == 0)) {
101         decoding_error ();
102         return false;
103     }
104 
105     //  Message size must not exceed the maximum allowed size.
106     if (unlikely (maxmsgsize >= 0 && payload_length - 1 > maxmsgsize)) {
107         decoding_error ();
108         return false;
109     }
110 
111     //  Message size must fit within range of size_t data type.
112     if (unlikely (payload_length - 1 > std::numeric_limits <size_t>::max ())) {
113         decoding_error ();
114         return false;
115     }
116 
117     const size_t msg_size = static_cast <size_t> (payload_length - 1);
118 
119     //  in_progress is initialised at this point so in theory we should
120     //  close it before calling init_size, however, it's a 0-byte
121     //  message and thus we can treat it as uninitialised...
122     int rc = in_progress.init_size (msg_size);
123     if (rc != 0) {
124         errno_assert (errno == ENOMEM);
125         rc = in_progress.init ();
126         errno_assert (rc == 0);
127         decoding_error ();
128         return false;
129     }
130     errno_assert (rc == 0);
131 
132     next_step (tmpbuf, 1, &decoder_t::flags_ready);
133     return true;
134 }
135 
flags_ready()136 bool xs::decoder_t::flags_ready ()
137 {
138     //  Store the flags from the wire into the message structure.
139     in_progress.set_flags (tmpbuf [0] & 0x01);
140 
141     next_step (in_progress.data (), in_progress.size (),
142         &decoder_t::message_ready);
143 
144     return true;
145 }
146 
message_ready()147 bool xs::decoder_t::message_ready ()
148 {
149     //  Message is completely read. Push it further and start reading
150     //  new message. (in_progress is a 0-byte message after this point.)
151     if (unlikely (!session))
152         return false;
153     int rc = session->write (&in_progress);
154     if (unlikely (rc != 0)) {
155         if (errno != EAGAIN)
156             decoding_error ();
157         return false;
158     }
159 
160     next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
161     return true;
162 }
163