1 /**
2  * gwsocket.c -- An interface to send/recv data from/to Web Socket Server
3  *    _______       _______            __        __
4  *   / ____/ |     / / ___/____  _____/ /_____  / /_
5  *  / / __ | | /| / /\__ \/ __ \/ ___/ //_/ _ \/ __/
6  * / /_/ / | |/ |/ /___/ / /_/ / /__/ ,< /  __/ /_
7  * \____/  |__/|__//____/\____/\___/_/|_|\___/\__/
8  *
9  * The MIT License (MIT)
10  * Copyright (c) 2009-2020 Gerardo Orellana <hello @ goaccess.io>
11  *
12  * Permission is hereby granted, free of charge, to any person obtaining a copy
13  * of this software and associated documentation files (the "Software"), to deal
14  * in the Software without restriction, including without limitation the rights
15  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16  * copies of the Software, and to permit persons to whom the Software is
17  * furnished to do so, subject to the following conditions:
18  *
19  * The above copyright notice and this permission notice shall be included in all
20  * copies or substantial portions of the Software.
21  *
22  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28  * SOFTWARE.
29  */
30 
31 #include <stdio.h>
32 #include <signal.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <errno.h>
36 #include <fcntl.h>
37 #include <sys/stat.h>
38 #include <unistd.h>
39 
40 #include "gwsocket.h"
41 
42 #include "commons.h"
43 #include "error.h"
44 #include "goaccess.h"
45 #include "json.h"
46 #include "parser.h"
47 #include "settings.h"
48 #include "websocket.h"
49 #include "xmalloc.h"
50 
51 /* Allocate memory for a new GWSReader instance.
52  *
53  * On success, the newly allocated GWSReader is returned. */
54 GWSReader *
new_gwsreader(void)55 new_gwsreader (void) {
56   GWSReader *reader = xmalloc (sizeof (GWSReader));
57   memset (reader, 0, sizeof *reader);
58 
59   return reader;
60 }
61 
62 /* Allocate memory for a new GWSWriter instance.
63  *
64  * On success, the newly allocated GWSWriter is returned. */
65 GWSWriter *
new_gwswriter(void)66 new_gwswriter (void) {
67   GWSWriter *writer = xmalloc (sizeof (GWSWriter));
68   memset (writer, 0, sizeof *writer);
69 
70   return writer;
71 }
72 
73 /* Write the JSON data to a pipe.
74  *
75  * If unable to write bytes, -1 is returned.
76  * On success, the number of written bytes is returned . */
77 static int
write_holder(int fd,const char * buf,int len)78 write_holder (int fd, const char *buf, int len) {
79   int i, ret = 0;
80 
81   for (i = 0; i < len;) {
82     ret = write (fd, buf + i, len - i);
83     if (ret < 0) {
84       if (errno == EINTR || errno == EAGAIN)
85         continue;
86       return -1;
87     } else {
88       i += ret;
89     }
90   }
91 
92   return i;
93 }
94 
95 /* Clear an incoming FIFO packet and header data. */
96 static void
clear_fifo_packet(GWSReader * gwserver)97 clear_fifo_packet (GWSReader * gwserver) {
98   memset (gwserver->hdr, 0, sizeof (gwserver->hdr));
99   gwserver->hlen = 0;
100 
101   if (gwserver->packet == NULL)
102     return;
103 
104   if (gwserver->packet->data)
105     free (gwserver->packet->data);
106   free (gwserver->packet);
107   gwserver->packet = NULL;
108 }
109 
110 /* Pack the JSON data into a network byte order and writes it to a
111  * pipe.
112  *
113  * On success, 0 is returned . */
114 int
broadcast_holder(int fd,const char * buf,int len)115 broadcast_holder (int fd, const char *buf, int len) {
116   char *p = NULL, *ptr = NULL;
117 
118   p = calloc (sizeof (uint32_t) * 3, sizeof (char));
119 
120   ptr = p;
121   ptr += pack_uint32 (ptr, 0);
122   ptr += pack_uint32 (ptr, 0x01);
123   ptr += pack_uint32 (ptr, len);
124 
125   write_holder (fd, p, sizeof (uint32_t) * 3);
126   write_holder (fd, buf, len);
127   free (p);
128 
129   return 0;
130 }
131 
132 /* Pack the JSON data into a network byte order and write it to a
133  * pipe.
134  *
135  * On success, 0 is returned . */
136 int
send_holder_to_client(int fd,int listener,const char * buf,int len)137 send_holder_to_client (int fd, int listener, const char *buf, int len) {
138   char *p = NULL, *ptr = NULL;
139 
140   p = calloc (sizeof (uint32_t) * 3, sizeof (char));
141 
142   ptr = p;
143   ptr += pack_uint32 (ptr, listener);
144   ptr += pack_uint32 (ptr, 0x01);
145   ptr += pack_uint32 (ptr, len);
146 
147   write_holder (fd, p, sizeof (uint32_t) * 3);
148   write_holder (fd, buf, len);
149   free (p);
150 
151   return 0;
152 }
153 
154 /* Attempt to read data from the named pipe on strict mode.
155  * Note: For now it only reads on new connections, i.e., onopen.
156  *
157  * If there's less data than requested, 0 is returned
158  * If the thread is done, 1 is returned */
159 int
read_fifo(GWSReader * gwsreader,void (* f)(int))160 read_fifo (GWSReader * gwsreader, void (*f) (int)) {
161   WSPacket **pa = &gwsreader->packet;
162   char *ptr;
163   int bytes = 0, readh = 0, need = 0, fd = gwsreader->fd;
164   uint32_t listener = 0, type = 0, size = 0;
165   struct pollfd fds[] = {
166     {.fd = gwsreader->self_pipe[0],.events = POLLIN},
167     {.fd = gwsreader->fd,.events = POLLIN,},
168   };
169 
170   if (poll (fds, sizeof (fds) / sizeof (fds[0]), -1) == -1) {
171     switch (errno) {
172     case EINTR:
173       break;
174     default:
175       FATAL ("Unable to poll: %s.", strerror (errno));
176     }
177   }
178   /* handle self-pipe trick */
179   if (fds[0].revents & POLLIN)
180     return 1;
181   if (!(fds[1].revents & POLLIN)) {
182     LOG (("No file descriptor set on read_message()\n"));
183     return 0;
184   }
185 
186   readh = gwsreader->hlen;      /* read from header so far */
187   need = HDR_SIZE - readh;      /* need to read */
188   if (need > 0) {
189     if ((bytes = ws_read_fifo (fd, gwsreader->hdr, &gwsreader->hlen, readh, need)) < 0)
190       return 0;
191     if (bytes != need)
192       return 0;
193   }
194 
195   /* unpack size, and type */
196   ptr = gwsreader->hdr;
197   ptr += unpack_uint32 (ptr, &listener);
198   ptr += unpack_uint32 (ptr, &type);
199   ptr += unpack_uint32 (ptr, &size);
200 
201   if ((*pa) == NULL) {
202     (*pa) = xcalloc (1, sizeof (WSPacket));
203     (*pa)->type = type;
204     (*pa)->size = size;
205     (*pa)->data = xcalloc (size, sizeof (char));
206   }
207 
208   readh = (*pa)->len;   /* read from payload so far */
209   need = (*pa)->size - readh;   /* need to read */
210   if (need > 0) {
211     if ((bytes = ws_read_fifo (fd, (*pa)->data, &(*pa)->len, readh, need)) < 0)
212       return 0;
213     if (bytes != need)
214       return 0;
215   }
216   clear_fifo_packet (gwsreader);
217   /* fast forward JSON data to the given client */
218   (*f) (listener);
219 
220   return 0;
221 }
222 
223 /* Callback once a new connection is established
224  *
225  * It writes to a named pipe a header containing the socket, the
226  * message type, the payload's length and the actual payload */
227 static int
onopen(WSPipeOut * pipeout,WSClient * client)228 onopen (WSPipeOut * pipeout, WSClient * client) {
229   uint32_t hsize = sizeof (uint32_t) * 3;
230   char *hdr = calloc (hsize, sizeof (char));
231   char *ptr = hdr;
232 
233   ptr += pack_uint32 (ptr, client->listener);
234   ptr += pack_uint32 (ptr, WS_OPCODE_TEXT);
235   ptr += pack_uint32 (ptr, INET6_ADDRSTRLEN);
236 
237   ws_write_fifo (pipeout, hdr, hsize);
238   ws_write_fifo (pipeout, client->remote_ip, INET6_ADDRSTRLEN);
239   free (hdr);
240 
241   return 0;
242 }
243 
244 /* Done parsing, clear out line and set status message. */
245 void
set_ready_state(void)246 set_ready_state (void) {
247   fprintf (stderr, "\33[2K\r");
248   fprintf (stderr, "%s\n", INFO_WS_READY_FOR_CONN);
249 }
250 
251 /* Open the named pipe where the websocket server writes to.
252  *
253  * If unable to open, -1 is returned.
254  * On success, return the new file descriptor is returned . */
255 int
open_fifoout(void)256 open_fifoout (void) {
257   const char *fifo = conf.fifo_out;
258   int fdfifo;
259 
260   /* open fifo for reading before writing */
261   ws_setfifo (fifo);
262   if ((fdfifo = open (fifo, O_RDWR | O_NONBLOCK)) == -1)
263     return -1;
264 
265   return fdfifo;
266 }
267 
268 /* Open the named pipe where the websocket server reads from.
269  *
270  * If unable to open, -1 is returned.
271  * On success, return the new file descriptor is returned . */
272 int
open_fifoin(void)273 open_fifoin (void) {
274   const char *fifo = conf.fifo_in;
275   int fdfifo;
276 
277   if ((fdfifo = open (fifo, O_WRONLY | O_NONBLOCK)) == -1)
278     return -1;
279 
280   return fdfifo;
281 }
282 
283 /* Set the self-pipe trick to handle poll(2). */
284 void
set_self_pipe(int * self_pipe)285 set_self_pipe (int *self_pipe) {
286   /* Initialize self pipe. */
287   if (pipe (self_pipe) == -1)
288     FATAL ("Unable to create pipe: %s.", strerror (errno));
289 
290   /* make the read and write pipe non-blocking */
291   set_nonblocking (self_pipe[0]);
292   set_nonblocking (self_pipe[1]);
293 }
294 
295 /* Close the WebSocket server and clean up. */
296 void
stop_ws_server(GWSWriter * gwswriter,GWSReader * gwsreader)297 stop_ws_server (GWSWriter * gwswriter, GWSReader * gwsreader) {
298   pthread_t writer, reader;
299   WSServer *server = NULL;
300 
301   if (!gwsreader || !gwswriter)
302     return;
303   if (!(server = gwswriter->server))
304     return;
305 
306   pthread_mutex_lock (&gwsreader->mutex);
307   if ((write (gwsreader->self_pipe[1], "x", 1)) == -1 && errno != EAGAIN)
308     LOG (("Unable to write to self pipe on pipeout.\n"));
309   pthread_mutex_unlock (&gwsreader->mutex);
310 
311   pthread_mutex_lock (&gwswriter->mutex);
312   /* if it fails to write, force stop */
313   if ((write (server->self_pipe[1], "x", 1)) == -1 && errno != EAGAIN)
314     ws_stop (server);
315   pthread_mutex_unlock (&gwswriter->mutex);
316 
317   reader = gwsreader->thread;
318   if (pthread_join (reader, NULL) != 0)
319     LOG (("Unable to join thread gwsreader: %s\n", strerror (errno)));
320 
321   writer = gwswriter->thread;
322   if (pthread_join (writer, NULL) != 0)
323     LOG (("Unable to join thread gwswriter: %s\n", strerror (errno)));
324 }
325 
326 /* Start the WebSocket server and initialize default options. */
327 static void
start_server(void * ptr_data)328 start_server (void *ptr_data) {
329   GWSWriter *writer = (GWSWriter *) ptr_data;
330 
331   writer->server->onopen = onopen;
332   set_self_pipe (writer->server->self_pipe);
333 
334   /* poll(2) will block in here */
335   ws_start (writer->server);
336   fprintf (stderr, "Stopping WebSocket server...\n");
337   ws_stop (writer->server);
338 }
339 
340 /* Read and set the WebSocket config options. */
341 static void
set_ws_opts(void)342 set_ws_opts (void) {
343   ws_set_config_strict (1);
344   if (conf.addr)
345     ws_set_config_host (conf.addr);
346   if (conf.fifo_in)
347     ws_set_config_pipein (conf.fifo_in);
348   if (conf.fifo_out)
349     ws_set_config_pipeout (conf.fifo_out);
350   if (conf.origin)
351     ws_set_config_origin (conf.origin);
352   if (conf.port)
353     ws_set_config_port (conf.port);
354   if (conf.sslcert)
355     ws_set_config_sslcert (conf.sslcert);
356   if (conf.sslkey)
357     ws_set_config_sslkey (conf.sslkey);
358 }
359 
360 /* Setup and start the WebSocket threads. */
361 int
setup_ws_server(GWSWriter * gwswriter,GWSReader * gwsreader)362 setup_ws_server (GWSWriter * gwswriter, GWSReader * gwsreader) {
363   int id;
364   pthread_t *thread;
365 
366   if (pthread_mutex_init (&gwswriter->mutex, NULL))
367     FATAL ("Failed init gwswriter mutex");
368   if (pthread_mutex_init (&gwsreader->mutex, NULL))
369     FATAL ("Failed init gwsreader mutex");
370 
371   /* send WS data thread */
372   thread = &gwswriter->thread;
373 
374   /* pre-init the websocket server, to ensure the FIFOs are created */
375   if ((gwswriter->server = ws_init ("0.0.0.0", "7890", set_ws_opts)) == NULL)
376     FATAL ("Failed init websocket");
377 
378   id = pthread_create (&(*thread), NULL, (void *) &start_server, gwswriter);
379   if (id)
380     FATAL ("Return code from pthread_create(): %d", id);
381 
382   /* read WS data thread */
383   thread = &gwsreader->thread;
384   id = pthread_create (&(*thread), NULL, (void *) &read_client, gwsreader);
385   if (id)
386     FATAL ("Return code from pthread_create(): %d", id);
387 
388   return 0;
389 }
390