1 /**
2 * gwsocket.c -- An interface to send/recv data from/to Web Socket Server
3 * _______ _______ __ __
4 * / ____/ | / / ___/____ _____/ /_____ / /_
5 * / / __ | | /| / /\__ \/ __ \/ ___/ //_/ _ \/ __/
6 * / /_/ / | |/ |/ /___/ / /_/ / /__/ ,< / __/ /_
7 * \____/ |__/|__//____/\____/\___/_/|_|\___/\__/
8 *
9 * The MIT License (MIT)
10 * Copyright (c) 2009-2020 Gerardo Orellana <hello @ goaccess.io>
11 *
12 * Permission is hereby granted, free of charge, to any person obtaining a copy
13 * of this software and associated documentation files (the "Software"), to deal
14 * in the Software without restriction, including without limitation the rights
15 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 * copies of the Software, and to permit persons to whom the Software is
17 * furnished to do so, subject to the following conditions:
18 *
19 * The above copyright notice and this permission notice shall be included in all
20 * copies or substantial portions of the Software.
21 *
22 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 * SOFTWARE.
29 */
30
31 #include <stdio.h>
32 #include <signal.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <errno.h>
36 #include <fcntl.h>
37 #include <sys/stat.h>
38 #include <unistd.h>
39
40 #include "gwsocket.h"
41
42 #include "commons.h"
43 #include "error.h"
44 #include "goaccess.h"
45 #include "json.h"
46 #include "parser.h"
47 #include "settings.h"
48 #include "websocket.h"
49 #include "xmalloc.h"
50
51 /* Allocate memory for a new GWSReader instance.
52 *
53 * On success, the newly allocated GWSReader is returned. */
54 GWSReader *
new_gwsreader(void)55 new_gwsreader (void) {
56 GWSReader *reader = xmalloc (sizeof (GWSReader));
57 memset (reader, 0, sizeof *reader);
58
59 return reader;
60 }
61
62 /* Allocate memory for a new GWSWriter instance.
63 *
64 * On success, the newly allocated GWSWriter is returned. */
65 GWSWriter *
new_gwswriter(void)66 new_gwswriter (void) {
67 GWSWriter *writer = xmalloc (sizeof (GWSWriter));
68 memset (writer, 0, sizeof *writer);
69
70 return writer;
71 }
72
73 /* Write the JSON data to a pipe.
74 *
75 * If unable to write bytes, -1 is returned.
76 * On success, the number of written bytes is returned . */
77 static int
write_holder(int fd,const char * buf,int len)78 write_holder (int fd, const char *buf, int len) {
79 int i, ret = 0;
80
81 for (i = 0; i < len;) {
82 ret = write (fd, buf + i, len - i);
83 if (ret < 0) {
84 if (errno == EINTR || errno == EAGAIN)
85 continue;
86 return -1;
87 } else {
88 i += ret;
89 }
90 }
91
92 return i;
93 }
94
95 /* Clear an incoming FIFO packet and header data. */
96 static void
clear_fifo_packet(GWSReader * gwserver)97 clear_fifo_packet (GWSReader * gwserver) {
98 memset (gwserver->hdr, 0, sizeof (gwserver->hdr));
99 gwserver->hlen = 0;
100
101 if (gwserver->packet == NULL)
102 return;
103
104 if (gwserver->packet->data)
105 free (gwserver->packet->data);
106 free (gwserver->packet);
107 gwserver->packet = NULL;
108 }
109
110 /* Pack the JSON data into a network byte order and writes it to a
111 * pipe.
112 *
113 * On success, 0 is returned . */
114 int
broadcast_holder(int fd,const char * buf,int len)115 broadcast_holder (int fd, const char *buf, int len) {
116 char *p = NULL, *ptr = NULL;
117
118 p = calloc (sizeof (uint32_t) * 3, sizeof (char));
119
120 ptr = p;
121 ptr += pack_uint32 (ptr, 0);
122 ptr += pack_uint32 (ptr, 0x01);
123 ptr += pack_uint32 (ptr, len);
124
125 write_holder (fd, p, sizeof (uint32_t) * 3);
126 write_holder (fd, buf, len);
127 free (p);
128
129 return 0;
130 }
131
132 /* Pack the JSON data into a network byte order and write it to a
133 * pipe.
134 *
135 * On success, 0 is returned . */
136 int
send_holder_to_client(int fd,int listener,const char * buf,int len)137 send_holder_to_client (int fd, int listener, const char *buf, int len) {
138 char *p = NULL, *ptr = NULL;
139
140 p = calloc (sizeof (uint32_t) * 3, sizeof (char));
141
142 ptr = p;
143 ptr += pack_uint32 (ptr, listener);
144 ptr += pack_uint32 (ptr, 0x01);
145 ptr += pack_uint32 (ptr, len);
146
147 write_holder (fd, p, sizeof (uint32_t) * 3);
148 write_holder (fd, buf, len);
149 free (p);
150
151 return 0;
152 }
153
154 /* Attempt to read data from the named pipe on strict mode.
155 * Note: For now it only reads on new connections, i.e., onopen.
156 *
157 * If there's less data than requested, 0 is returned
158 * If the thread is done, 1 is returned */
159 int
read_fifo(GWSReader * gwsreader,void (* f)(int))160 read_fifo (GWSReader * gwsreader, void (*f) (int)) {
161 WSPacket **pa = &gwsreader->packet;
162 char *ptr;
163 int bytes = 0, readh = 0, need = 0, fd = gwsreader->fd;
164 uint32_t listener = 0, type = 0, size = 0;
165 struct pollfd fds[] = {
166 {.fd = gwsreader->self_pipe[0],.events = POLLIN},
167 {.fd = gwsreader->fd,.events = POLLIN,},
168 };
169
170 if (poll (fds, sizeof (fds) / sizeof (fds[0]), -1) == -1) {
171 switch (errno) {
172 case EINTR:
173 break;
174 default:
175 FATAL ("Unable to poll: %s.", strerror (errno));
176 }
177 }
178 /* handle self-pipe trick */
179 if (fds[0].revents & POLLIN)
180 return 1;
181 if (!(fds[1].revents & POLLIN)) {
182 LOG (("No file descriptor set on read_message()\n"));
183 return 0;
184 }
185
186 readh = gwsreader->hlen; /* read from header so far */
187 need = HDR_SIZE - readh; /* need to read */
188 if (need > 0) {
189 if ((bytes = ws_read_fifo (fd, gwsreader->hdr, &gwsreader->hlen, readh, need)) < 0)
190 return 0;
191 if (bytes != need)
192 return 0;
193 }
194
195 /* unpack size, and type */
196 ptr = gwsreader->hdr;
197 ptr += unpack_uint32 (ptr, &listener);
198 ptr += unpack_uint32 (ptr, &type);
199 ptr += unpack_uint32 (ptr, &size);
200
201 if ((*pa) == NULL) {
202 (*pa) = xcalloc (1, sizeof (WSPacket));
203 (*pa)->type = type;
204 (*pa)->size = size;
205 (*pa)->data = xcalloc (size, sizeof (char));
206 }
207
208 readh = (*pa)->len; /* read from payload so far */
209 need = (*pa)->size - readh; /* need to read */
210 if (need > 0) {
211 if ((bytes = ws_read_fifo (fd, (*pa)->data, &(*pa)->len, readh, need)) < 0)
212 return 0;
213 if (bytes != need)
214 return 0;
215 }
216 clear_fifo_packet (gwsreader);
217 /* fast forward JSON data to the given client */
218 (*f) (listener);
219
220 return 0;
221 }
222
223 /* Callback once a new connection is established
224 *
225 * It writes to a named pipe a header containing the socket, the
226 * message type, the payload's length and the actual payload */
227 static int
onopen(WSPipeOut * pipeout,WSClient * client)228 onopen (WSPipeOut * pipeout, WSClient * client) {
229 uint32_t hsize = sizeof (uint32_t) * 3;
230 char *hdr = calloc (hsize, sizeof (char));
231 char *ptr = hdr;
232
233 ptr += pack_uint32 (ptr, client->listener);
234 ptr += pack_uint32 (ptr, WS_OPCODE_TEXT);
235 ptr += pack_uint32 (ptr, INET6_ADDRSTRLEN);
236
237 ws_write_fifo (pipeout, hdr, hsize);
238 ws_write_fifo (pipeout, client->remote_ip, INET6_ADDRSTRLEN);
239 free (hdr);
240
241 return 0;
242 }
243
244 /* Done parsing, clear out line and set status message. */
245 void
set_ready_state(void)246 set_ready_state (void) {
247 fprintf (stderr, "\33[2K\r");
248 fprintf (stderr, "%s\n", INFO_WS_READY_FOR_CONN);
249 }
250
251 /* Open the named pipe where the websocket server writes to.
252 *
253 * If unable to open, -1 is returned.
254 * On success, return the new file descriptor is returned . */
255 int
open_fifoout(void)256 open_fifoout (void) {
257 const char *fifo = conf.fifo_out;
258 int fdfifo;
259
260 /* open fifo for reading before writing */
261 ws_setfifo (fifo);
262 if ((fdfifo = open (fifo, O_RDWR | O_NONBLOCK)) == -1)
263 return -1;
264
265 return fdfifo;
266 }
267
268 /* Open the named pipe where the websocket server reads from.
269 *
270 * If unable to open, -1 is returned.
271 * On success, return the new file descriptor is returned . */
272 int
open_fifoin(void)273 open_fifoin (void) {
274 const char *fifo = conf.fifo_in;
275 int fdfifo;
276
277 if ((fdfifo = open (fifo, O_WRONLY | O_NONBLOCK)) == -1)
278 return -1;
279
280 return fdfifo;
281 }
282
283 /* Set the self-pipe trick to handle poll(2). */
284 void
set_self_pipe(int * self_pipe)285 set_self_pipe (int *self_pipe) {
286 /* Initialize self pipe. */
287 if (pipe (self_pipe) == -1)
288 FATAL ("Unable to create pipe: %s.", strerror (errno));
289
290 /* make the read and write pipe non-blocking */
291 set_nonblocking (self_pipe[0]);
292 set_nonblocking (self_pipe[1]);
293 }
294
295 /* Close the WebSocket server and clean up. */
296 void
stop_ws_server(GWSWriter * gwswriter,GWSReader * gwsreader)297 stop_ws_server (GWSWriter * gwswriter, GWSReader * gwsreader) {
298 pthread_t writer, reader;
299 WSServer *server = NULL;
300
301 if (!gwsreader || !gwswriter)
302 return;
303 if (!(server = gwswriter->server))
304 return;
305
306 pthread_mutex_lock (&gwsreader->mutex);
307 if ((write (gwsreader->self_pipe[1], "x", 1)) == -1 && errno != EAGAIN)
308 LOG (("Unable to write to self pipe on pipeout.\n"));
309 pthread_mutex_unlock (&gwsreader->mutex);
310
311 pthread_mutex_lock (&gwswriter->mutex);
312 /* if it fails to write, force stop */
313 if ((write (server->self_pipe[1], "x", 1)) == -1 && errno != EAGAIN)
314 ws_stop (server);
315 pthread_mutex_unlock (&gwswriter->mutex);
316
317 reader = gwsreader->thread;
318 if (pthread_join (reader, NULL) != 0)
319 LOG (("Unable to join thread gwsreader: %s\n", strerror (errno)));
320
321 writer = gwswriter->thread;
322 if (pthread_join (writer, NULL) != 0)
323 LOG (("Unable to join thread gwswriter: %s\n", strerror (errno)));
324 }
325
326 /* Start the WebSocket server and initialize default options. */
327 static void
start_server(void * ptr_data)328 start_server (void *ptr_data) {
329 GWSWriter *writer = (GWSWriter *) ptr_data;
330
331 writer->server->onopen = onopen;
332 set_self_pipe (writer->server->self_pipe);
333
334 /* poll(2) will block in here */
335 ws_start (writer->server);
336 fprintf (stderr, "Stopping WebSocket server...\n");
337 ws_stop (writer->server);
338 }
339
340 /* Read and set the WebSocket config options. */
341 static void
set_ws_opts(void)342 set_ws_opts (void) {
343 ws_set_config_strict (1);
344 if (conf.addr)
345 ws_set_config_host (conf.addr);
346 if (conf.fifo_in)
347 ws_set_config_pipein (conf.fifo_in);
348 if (conf.fifo_out)
349 ws_set_config_pipeout (conf.fifo_out);
350 if (conf.origin)
351 ws_set_config_origin (conf.origin);
352 if (conf.port)
353 ws_set_config_port (conf.port);
354 if (conf.sslcert)
355 ws_set_config_sslcert (conf.sslcert);
356 if (conf.sslkey)
357 ws_set_config_sslkey (conf.sslkey);
358 }
359
360 /* Setup and start the WebSocket threads. */
361 int
setup_ws_server(GWSWriter * gwswriter,GWSReader * gwsreader)362 setup_ws_server (GWSWriter * gwswriter, GWSReader * gwsreader) {
363 int id;
364 pthread_t *thread;
365
366 if (pthread_mutex_init (&gwswriter->mutex, NULL))
367 FATAL ("Failed init gwswriter mutex");
368 if (pthread_mutex_init (&gwsreader->mutex, NULL))
369 FATAL ("Failed init gwsreader mutex");
370
371 /* send WS data thread */
372 thread = &gwswriter->thread;
373
374 /* pre-init the websocket server, to ensure the FIFOs are created */
375 if ((gwswriter->server = ws_init ("0.0.0.0", "7890", set_ws_opts)) == NULL)
376 FATAL ("Failed init websocket");
377
378 id = pthread_create (&(*thread), NULL, (void *) &start_server, gwswriter);
379 if (id)
380 FATAL ("Return code from pthread_create(): %d", id);
381
382 /* read WS data thread */
383 thread = &gwsreader->thread;
384 id = pthread_create (&(*thread), NULL, (void *) &read_client, gwsreader);
385 if (id)
386 FATAL ("Return code from pthread_create(): %d", id);
387
388 return 0;
389 }
390