1 /*
2  * Copyright (c) 2013 Balabit
3  * Copyright (c) 2013 Viktor Tusa <tusa@balabit.hu>
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17  *
18  * As an additional exemption you are allowed to compile & link against the
19  * OpenSSL libraries as published by the OpenSSL project. See the file
20  * COPYING for details.
21  *
22  */
23 
24 #include "stomp.h"
25 #include "host-resolve.h"
26 #include "str-utils.h"
27 #include "messages.h"
28 
29 #include <errno.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <stdio.h>
33 #include <poll.h>
34 #include <unistd.h>
35 
36 
37 #define STOMP_PARSE_HEADER 1
38 #define STOMP_PARSE_DATA 2
39 #define STOMP_PARSE_ERROR 0
40 
41 void
stomp_frame_init(stomp_frame * frame,const char * command,int command_len)42 stomp_frame_init(stomp_frame *frame, const char *command, int command_len)
43 {
44   frame->command = g_strndup(command, command_len);
45   frame->headers = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
46   frame->body_length = -1;
47   frame->body = NULL;
48 };
49 
50 void
stomp_frame_add_header(stomp_frame * frame,const char * name,const char * value)51 stomp_frame_add_header(stomp_frame *frame, const char *name, const char *value)
52 {
53   msg_debug("Adding header",
54             evt_tag_str("name", name),
55             evt_tag_str("value", value));
56 
57   g_hash_table_insert(frame->headers, g_strdup(name), g_strdup(value));
58 };
59 
60 void
stomp_frame_add_header_len(stomp_frame * frame,const char * name,int name_len,const char * value,int value_len)61 stomp_frame_add_header_len(stomp_frame *frame, const char *name, int name_len, const char *value, int value_len)
62 {
63   char *name_slice = g_strndup(name, name_len);
64   char *value_slice = g_strndup(value, value_len);
65   msg_debug("Adding header",
66             evt_tag_str("name", name_slice),
67             evt_tag_str("value", value_slice));
68 
69   g_hash_table_insert(frame->headers, name_slice, value_slice);
70 };
71 
72 void
stomp_frame_set_body(stomp_frame * frame,const char * body,int body_len)73 stomp_frame_set_body(stomp_frame *frame, const char *body, int body_len)
74 {
75   frame->body = g_strndup(body, body_len);
76   frame->body_length = body_len;
77 };
78 
79 int
stomp_frame_deinit(stomp_frame * frame)80 stomp_frame_deinit(stomp_frame *frame)
81 {
82   g_hash_table_destroy(frame->headers);
83   g_free(frame->command);
84   g_free(frame->body);
85 
86   return TRUE;
87 }
88 
89 static void
_stomp_connection_free(stomp_connection * conn)90 _stomp_connection_free(stomp_connection *conn)
91 {
92   g_sockaddr_unref(conn->remote_sa);
93   g_free(conn);
94 }
95 
96 int
stomp_connect(stomp_connection ** connection_ref,char * hostname,int port)97 stomp_connect(stomp_connection **connection_ref, char *hostname, int port)
98 {
99   stomp_connection *conn;
100 
101   conn = g_new0(stomp_connection, 1);
102 
103   conn->socket = socket(AF_INET, SOCK_STREAM, 0);
104   if (conn->socket == -1)
105     {
106       msg_error("Failed to create socket!");
107       return FALSE;
108     }
109 
110   if (!resolve_hostname_to_sockaddr(&conn->remote_sa, AF_INET, hostname))
111     {
112       msg_error("Failed to resolve hostname in stomp driver",
113                 evt_tag_str("hostname", hostname));
114 
115       return FALSE;
116     }
117 
118   g_sockaddr_set_port(conn->remote_sa, port);
119   if (!g_connect(conn->socket, conn->remote_sa))
120     {
121       msg_error("Stomp connection failed",
122                 evt_tag_str("host", hostname));
123       _stomp_connection_free(conn);
124       return FALSE;
125     }
126 
127   (*connection_ref) = conn;
128 
129   return TRUE;
130 };
131 
132 int
stomp_disconnect(stomp_connection ** connection_ref)133 stomp_disconnect(stomp_connection **connection_ref)
134 {
135   stomp_connection *conn = *connection_ref;
136 
137   if (!conn)
138     return TRUE;
139 
140   shutdown(conn->socket, SHUT_RDWR);
141   close(conn->socket);
142   _stomp_connection_free(conn);
143   *connection_ref = NULL;
144   return TRUE;
145 };
146 
147 static void
write_header_into_gstring(gpointer key,gpointer value,gpointer userdata)148 write_header_into_gstring(gpointer key, gpointer value, gpointer userdata)
149 {
150   GString *str = (GString *) userdata;
151 
152   if (key == NULL || value == NULL)
153     return;
154 
155   g_string_append(str, key);
156   g_string_append_c(str, ':');
157   g_string_append(str, value);
158   g_string_append_c(str, '\n');
159 }
160 
161 static int
write_gstring_to_socket(int fd,GString * data)162 write_gstring_to_socket(int fd, GString *data)
163 {
164   int res = 0;
165   int remaining = data->len;
166 
167   while ((remaining > 0) && (res >= 0))
168     {
169       res = write(fd, data->str + (data->len - remaining), remaining);
170       if (res > 0)
171         remaining = remaining - res;
172     }
173 
174   if (res < 0)
175     {
176       msg_error("Error happened during write",
177                 evt_tag_error("errno"));
178       return FALSE;
179     }
180 
181   return TRUE;
182 }
183 
184 static int
stomp_read_data(stomp_connection * connection,GString * buffer)185 stomp_read_data(stomp_connection *connection, GString *buffer)
186 {
187   char tmp_buf[4096];
188   int res;
189 
190   res = read(connection->socket, tmp_buf, sizeof(tmp_buf));
191   if (res < 0)
192     return FALSE;
193 
194   g_string_assign_len(buffer, tmp_buf, res);
195   while (res == sizeof(tmp_buf))
196     {
197       res = read(connection->socket, tmp_buf, sizeof(tmp_buf));
198       g_string_append_len(buffer, tmp_buf, res);
199     }
200   return TRUE;
201 }
202 
203 static int
stomp_parse_command(char * buffer,int buflen,stomp_frame * frame,char ** out_pos)204 stomp_parse_command(char *buffer, int buflen, stomp_frame *frame, char **out_pos)
205 {
206   char *pos;
207 
208   pos = g_strstr_len(buffer, buflen, "\n");
209   if (pos == NULL)
210     return STOMP_PARSE_ERROR;
211 
212   stomp_frame_init(frame, buffer, pos - buffer);
213   *out_pos = pos + 1;
214 
215   return STOMP_PARSE_HEADER;
216 }
217 
218 static int
stomp_parse_header(char * buffer,int buflen,stomp_frame * frame,char ** out_pos)219 stomp_parse_header(char *buffer, int buflen, stomp_frame *frame, char **out_pos)
220 {
221   char *pos, *colon;
222 
223   if (buflen <= 1)
224     {
225       *out_pos = buffer;
226       return STOMP_PARSE_DATA;
227     }
228 
229   pos = g_strstr_len(buffer, buflen, "\n");
230   if (!pos)
231     return STOMP_PARSE_ERROR;
232 
233   if (pos == buffer)
234     {
235       *out_pos = pos + 1;
236       return STOMP_PARSE_DATA;
237     }
238 
239   colon = g_strstr_len(buffer, pos - buffer, ":");
240   if (!colon)
241     return STOMP_PARSE_ERROR;
242 
243   stomp_frame_add_header_len(frame, buffer, colon - buffer,
244                              colon + 1, pos - colon - 1);
245   *out_pos = pos + 1;
246 
247   return STOMP_PARSE_HEADER;
248 };
249 
250 int
stomp_parse_frame(GString * data,stomp_frame * frame)251 stomp_parse_frame(GString *data, stomp_frame *frame)
252 {
253   char *pos;
254   int res;
255 
256   res = stomp_parse_command(data->str, data->len, frame, &pos);
257   if (!res)
258     return FALSE;
259 
260   res = stomp_parse_header(pos, data->str + data->len - pos, frame, &pos);
261   while (res == STOMP_PARSE_HEADER)
262     {
263       res = stomp_parse_header(pos, data->str + data->len - pos, frame, &pos);
264     }
265 
266   if (res == STOMP_PARSE_ERROR)
267     return FALSE;
268 
269   frame->body = g_strndup(pos, data->len - (pos - data->str));
270   return TRUE;
271 }
272 
273 int
stomp_receive_frame(stomp_connection * connection,stomp_frame * frame)274 stomp_receive_frame(stomp_connection *connection, stomp_frame *frame)
275 {
276   GString *data = g_string_sized_new(4096);
277 
278   if (!stomp_read_data(connection, data))
279     {
280       g_string_free(data, TRUE);
281       return FALSE;
282     }
283 
284   int res = stomp_parse_frame(data, frame);
285 
286   if (res)
287     msg_debug("Frame received", evt_tag_str("command", frame->command));
288 
289   g_string_free(data, TRUE);
290   return res;
291 }
292 
293 static int
stomp_check_for_frame(stomp_connection * connection)294 stomp_check_for_frame(stomp_connection *connection)
295 {
296   struct pollfd pfd;
297 
298   pfd.fd = connection->socket;
299   pfd.events = POLLIN | POLLPRI;
300 
301   poll(&pfd, 1, 0);
302   if (pfd.revents & ( POLLIN | POLLPRI))
303     {
304       stomp_frame frame;
305 
306       if (!stomp_receive_frame(connection, &frame))
307         return FALSE;
308       if (!strcmp(frame.command, "ERROR"))
309         {
310           msg_error("ERROR frame received from stomp_server");
311           stomp_frame_deinit(&frame);
312           return FALSE;
313         }
314 
315       /* According to stomp protocol, here only ERROR or RECEIPT
316          frames can come, so we missed a RECEIPT frame here, our
317          bad. */
318       stomp_frame_deinit(&frame);
319       return TRUE;
320     }
321 
322   return TRUE;
323 }
324 
325 GString *
create_gstring_from_frame(stomp_frame * frame)326 create_gstring_from_frame(stomp_frame *frame)
327 {
328   GString *data = g_string_new("");
329 
330   g_string_append(data, frame->command);
331   g_string_append_c(data, '\n');
332   g_hash_table_foreach(frame->headers, write_header_into_gstring, data);
333   g_string_append_c(data, '\n');
334   if (frame->body)
335     g_string_append_len(data, frame->body, frame->body_length);
336   g_string_append_c(data, 0);
337   return data;
338 }
339 
340 int
stomp_write(stomp_connection * connection,stomp_frame * frame)341 stomp_write(stomp_connection *connection, stomp_frame *frame)
342 {
343   GString *data;
344 
345   if (!stomp_check_for_frame(connection))
346     return FALSE;
347 
348   data = create_gstring_from_frame(frame);
349   if (!write_gstring_to_socket(connection->socket, data))
350     {
351       msg_error("Write error, partial write");
352       stomp_frame_deinit(frame);
353       g_string_free(data, TRUE);
354       return FALSE;
355     }
356 
357   g_string_free(data, TRUE);
358   stomp_frame_deinit(frame);
359   return TRUE;
360 }
361