1 /*! \file janus_wsevh.c
2 * \author Lorenzo Miniero <lorenzo@meetecho.com>
3 * \copyright GNU General Public License v3
4 * \brief Janus WebSockets EventHandler plugin
5 * \details This is a trivial WebSockets event handler plugin for Janus
6 *
7 * \ingroup eventhandlers
8 * \ref eventhandlers
9 */
10
11 #include "eventhandler.h"
12
13 #include <math.h>
14
15 #include <libwebsockets.h>
16
17 #include "../debug.h"
18 #include "../config.h"
19 #include "../mutex.h"
20 #include "../utils.h"
21 #include "../events.h"
22
23
24 /* Plugin information */
25 #define JANUS_WSEVH_VERSION 1
26 #define JANUS_WSEVH_VERSION_STRING "0.0.1"
27 #define JANUS_WSEVH_DESCRIPTION "This is a trivial WebSockets event handler plugin for Janus."
28 #define JANUS_WSEVH_NAME "JANUS WebSocketsEventHandler plugin"
29 #define JANUS_WSEVH_AUTHOR "Meetecho s.r.l."
30 #define JANUS_WSEVH_PACKAGE "janus.eventhandler.wsevh"
31
32 /* Plugin methods */
33 janus_eventhandler *create(void);
34 int janus_wsevh_init(const char *config_path);
35 void janus_wsevh_destroy(void);
36 int janus_wsevh_get_api_compatibility(void);
37 int janus_wsevh_get_version(void);
38 const char *janus_wsevh_get_version_string(void);
39 const char *janus_wsevh_get_description(void);
40 const char *janus_wsevh_get_name(void);
41 const char *janus_wsevh_get_author(void);
42 const char *janus_wsevh_get_package(void);
43 void janus_wsevh_incoming_event(json_t *event);
44 json_t *janus_wsevh_handle_request(json_t *request);
45
46 /* Event handler setup */
47 static janus_eventhandler janus_wsevh =
48 JANUS_EVENTHANDLER_INIT (
49 .init = janus_wsevh_init,
50 .destroy = janus_wsevh_destroy,
51
52 .get_api_compatibility = janus_wsevh_get_api_compatibility,
53 .get_version = janus_wsevh_get_version,
54 .get_version_string = janus_wsevh_get_version_string,
55 .get_description = janus_wsevh_get_description,
56 .get_name = janus_wsevh_get_name,
57 .get_author = janus_wsevh_get_author,
58 .get_package = janus_wsevh_get_package,
59
60 .incoming_event = janus_wsevh_incoming_event,
61 .handle_request = janus_wsevh_handle_request,
62
63 .events_mask = JANUS_EVENT_TYPE_NONE
64 );
65
66 /* Plugin creator */
create(void)67 janus_eventhandler *create(void) {
68 JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_WSEVH_NAME);
69 return &janus_wsevh;
70 }
71
72 /* Useful stuff */
73 static volatile gint initialized = 0, stopping = 0;
74 static GThread *ws_thread, *handler_thread;
75 static void *janus_wsevh_thread(void *data);
76 static void *janus_wsevh_handler(void *data);
77
78 /* Connection related helper methods */
79 static void janus_wsevh_schedule_connect_attempt(void);
80 static void janus_wsevh_calculate_reconnect_delay_on_fail(void);
81 /* lws_sorted_usec_list_t is defined starting with lws 3.2 */
82 #if !(((LWS_LIBRARY_VERSION_MAJOR == 3 && LWS_LIBRARY_VERSION_MINOR >= 2) || LWS_LIBRARY_VERSION_MAJOR >= 4))
83 #define lws_sorted_usec_list_t void
84 #endif
85 static void janus_wsevh_connect_attempt(lws_sorted_usec_list_t *sul);
86
87 /* Queue of events to handle */
88 static GAsyncQueue *events = NULL;
89 static gboolean group_events = TRUE;
90 static json_t exit_event;
/* Release a queued event: NULL and the exit_event sentinel are not
 * reference-counted events, so they're left alone */
static void janus_wsevh_event_free(json_t *event) {
	if(event != NULL && event != &exit_event)
		json_decref(event);
}
96
97 /* JSON serialization options */
98 static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
99
100
101 /* Parameter validation (for tweaking via Admin API) */
102 static struct janus_json_parameter request_parameters[] = {
103 {"request", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}
104 };
105 static struct janus_json_parameter tweak_parameters[] = {
106 {"events", JSON_STRING, 0},
107 {"grouping", JANUS_JSON_BOOL, 0}
108 };
/* Error codes (for the tweaking via Admin API) */
110 #define JANUS_WSEVH_ERROR_INVALID_REQUEST 411
111 #define JANUS_WSEVH_ERROR_MISSING_ELEMENT 412
112 #define JANUS_WSEVH_ERROR_INVALID_ELEMENT 413
113 #define JANUS_WSEVH_ERROR_UNKNOWN_ERROR 499
114
115 /* Logging */
116 static int wsevh_log_level = 0;
janus_wsevh_get_level_str(int level)117 static const char *janus_wsevh_get_level_str(int level) {
118 switch(level) {
119 case LLL_ERR:
120 return "ERR";
121 case LLL_WARN:
122 return "WARN";
123 case LLL_NOTICE:
124 return "NOTICE";
125 case LLL_INFO:
126 return "INFO";
127 case LLL_DEBUG:
128 return "DEBUG";
129 case LLL_PARSER:
130 return "PARSER";
131 case LLL_HEADER:
132 return "HEADER";
133 case LLL_EXT:
134 return "EXT";
135 case LLL_CLIENT:
136 return "CLIENT";
137 case LLL_LATENCY:
138 return "LATENCY";
139 #if (LWS_LIBRARY_VERSION_MAJOR >= 2 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 3)
140 case LLL_USER:
141 return "USER";
142 #endif
143 case LLL_COUNT:
144 return "COUNT";
145 default:
146 return NULL;
147 }
148 }
janus_wsevh_log_emit_function(int level,const char * line)149 static void janus_wsevh_log_emit_function(int level, const char *line) {
150 /* FIXME Do we want to use different Janus debug levels according to the level here? */
151 JANUS_LOG(LOG_INFO, "[libwebsockets][wsevh][%s] %s", janus_wsevh_get_level_str(level), line);
152 }
153
154
155 /* WebSockets properties */
156 static char *backend = NULL;
157 static const char *protocol = NULL, *address = NULL;
158 static char path[256];
159 static int port = 0;
160 static struct lws_context *context = NULL;
161 #if ((LWS_LIBRARY_VERSION_MAJOR == 3 && LWS_LIBRARY_VERSION_MINOR >= 2) || LWS_LIBRARY_VERSION_MAJOR >= 4)
162 static lws_sorted_usec_list_t sul_stagger = { 0 };
163 #endif
164 static gint64 disconnected = 0;
165 static gboolean reconnect = FALSE;
166 static int reconnect_delay = 0;
167 #define JANUS_WSEVH_MAX_RETRY_SECS 8
168
169 typedef struct janus_wsevh_client {
170 struct lws *wsi; /* The libwebsockets client instance */
171 unsigned char *buffer; /* Buffer containing the message to send */
172 int buflen; /* Length of the buffer (may be resized after re-allocations) */
173 int bufpending; /* Data an interrupted previous write couldn't send */
174 int bufoffset; /* Offset from where the interrupted previous write should resume */
175 janus_mutex mutex; /* Mutex to lock/unlock this instance */
176 } janus_wsevh_client;
177 static janus_wsevh_client *ws_client = NULL;
178 static struct lws *wsi = NULL;
179 static GAsyncQueue *messages = NULL; /* Queue of outgoing messages to push */
180 static janus_mutex writable_mutex;
181
182 static int janus_wsevh_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
183 static struct lws_protocols protocols[] = {
184 { "janus-event-handlers", janus_wsevh_callback, sizeof(janus_wsevh_client), 0 }, /* Subprotocol will be configurable */
185 { NULL, NULL, 0, 0 }
186 };
187 static const struct lws_extension exts[] = {
188 #ifndef LWS_WITHOUT_EXTENSIONS
189 { "permessage-deflate", lws_extension_callback_pm_deflate, "permessage-deflate; client_max_window_bits" },
190 { "deflate-frame", lws_extension_callback_pm_deflate, "deflate_frame" },
191 #endif
192 { NULL, NULL, NULL }
193 };
194
195 /* WebSockets error management */
196 #define CASE_STR(name) case name: return #name
janus_wsevh_reason_string(enum lws_callback_reasons reason)197 static const char *janus_wsevh_reason_string(enum lws_callback_reasons reason) {
198 switch(reason) {
199 CASE_STR(LWS_CALLBACK_ESTABLISHED);
200 CASE_STR(LWS_CALLBACK_CLIENT_CONNECTION_ERROR);
201 CASE_STR(LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH);
202 CASE_STR(LWS_CALLBACK_CLIENT_ESTABLISHED);
203 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
204 CASE_STR(LWS_CALLBACK_CLIENT_CLOSED);
205 #endif
206 CASE_STR(LWS_CALLBACK_CLOSED);
207 CASE_STR(LWS_CALLBACK_CLOSED_HTTP);
208 CASE_STR(LWS_CALLBACK_RECEIVE);
209 CASE_STR(LWS_CALLBACK_CLIENT_RECEIVE);
210 CASE_STR(LWS_CALLBACK_CLIENT_RECEIVE_PONG);
211 CASE_STR(LWS_CALLBACK_CLIENT_WRITEABLE);
212 CASE_STR(LWS_CALLBACK_SERVER_WRITEABLE);
213 CASE_STR(LWS_CALLBACK_HTTP);
214 CASE_STR(LWS_CALLBACK_HTTP_BODY);
215 CASE_STR(LWS_CALLBACK_HTTP_BODY_COMPLETION);
216 CASE_STR(LWS_CALLBACK_HTTP_FILE_COMPLETION);
217 CASE_STR(LWS_CALLBACK_HTTP_WRITEABLE);
218 CASE_STR(LWS_CALLBACK_FILTER_NETWORK_CONNECTION);
219 CASE_STR(LWS_CALLBACK_FILTER_HTTP_CONNECTION);
220 CASE_STR(LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED);
221 CASE_STR(LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION);
222 CASE_STR(LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS);
223 CASE_STR(LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS);
224 CASE_STR(LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION);
225 CASE_STR(LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER);
226 CASE_STR(LWS_CALLBACK_CONFIRM_EXTENSION_OKAY);
227 CASE_STR(LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED);
228 CASE_STR(LWS_CALLBACK_PROTOCOL_INIT);
229 CASE_STR(LWS_CALLBACK_PROTOCOL_DESTROY);
230 CASE_STR(LWS_CALLBACK_WSI_CREATE);
231 CASE_STR(LWS_CALLBACK_WSI_DESTROY);
232 CASE_STR(LWS_CALLBACK_GET_THREAD_ID);
233 CASE_STR(LWS_CALLBACK_ADD_POLL_FD);
234 CASE_STR(LWS_CALLBACK_DEL_POLL_FD);
235 CASE_STR(LWS_CALLBACK_CHANGE_MODE_POLL_FD);
236 CASE_STR(LWS_CALLBACK_LOCK_POLL);
237 CASE_STR(LWS_CALLBACK_UNLOCK_POLL);
238 CASE_STR(LWS_CALLBACK_OPENSSL_CONTEXT_REQUIRES_PRIVATE_KEY);
239 CASE_STR(LWS_CALLBACK_USER);
240 default:
241 break;
242 }
243 return NULL;
244 }
245
246
247 /* Plugin implementation */
janus_wsevh_init(const char * config_path)248 int janus_wsevh_init(const char *config_path) {
249 gboolean success = TRUE;
250 if(g_atomic_int_get(&stopping)) {
251 /* Still stopping from before */
252 return -1;
253 }
254 if(config_path == NULL) {
255 /* Invalid arguments */
256 return -1;
257 }
258 /* Read configuration */
259 char filename[255];
260 g_snprintf(filename, 255, "%s/%s.jcfg", config_path, JANUS_WSEVH_PACKAGE);
261 JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
262 janus_config *config = janus_config_parse(filename);
263 if(config == NULL) {
264 JANUS_LOG(LOG_WARN, "Couldn't find .jcfg configuration file (%s), trying .cfg\n", JANUS_WSEVH_PACKAGE);
265 g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_WSEVH_PACKAGE);
266 JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
267 config = janus_config_parse(filename);
268 }
269 if(config != NULL)
270 janus_config_print(config);
271 janus_config_category *config_general = janus_config_get_create(config, NULL, janus_config_type_category, "general");
272
273 /* Setup the event handler, if required */
274 janus_config_item *item = janus_config_get(config, config_general, janus_config_type_item, "enabled");
275 if(!item || !item->value || !janus_is_true(item->value)) {
276 JANUS_LOG(LOG_WARN, "WebSockets event handler disabled\n");
277 goto error;
278 }
279
280 item = janus_config_get(config, config_general, janus_config_type_item, "json");
281 if(item && item->value) {
282 /* Check how we need to format/serialize the JSON output */
283 if(!strcasecmp(item->value, "indented")) {
284 /* Default: indented, we use three spaces for that */
285 json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
286 } else if(!strcasecmp(item->value, "plain")) {
287 /* Not indented and no new lines, but still readable */
288 json_format = JSON_INDENT(0) | JSON_PRESERVE_ORDER;
289 } else if(!strcasecmp(item->value, "compact")) {
290 /* Compact, so no spaces between separators */
291 json_format = JSON_COMPACT | JSON_PRESERVE_ORDER;
292 } else {
293 JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', using default (indented)\n", item->value);
294 json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
295 }
296 }
297
298 item = janus_config_get(config, config_general, janus_config_type_item, "ws_logging");
299 if(item && item->value) {
300 /* libwebsockets uses a mask to set log levels, as documented here:
301 * https://libwebsockets.org/lws-api-doc-master/html/group__log.html */
302 if(strstr(item->value, "none")) {
303 /* Disable libwebsockets logging completely (the default) */
304 } else if(strstr(item->value, "all")) {
305 /* Enable all libwebsockets logging */
306 wsevh_log_level = LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO |
307 LLL_DEBUG | LLL_PARSER | LLL_HEADER | LLL_EXT |
308 #if (LWS_LIBRARY_VERSION_MAJOR >= 2 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 3)
309 LLL_CLIENT | LLL_LATENCY | LLL_USER | LLL_COUNT;
310 #else
311 LLL_CLIENT | LLL_LATENCY | LLL_COUNT;
312 #endif
313 } else {
314 /* Only enable some of the properties */
315 if(strstr(item->value, "err"))
316 wsevh_log_level |= LLL_ERR;
317 if(strstr(item->value, "warn"))
318 wsevh_log_level |= LLL_WARN;
319 if(strstr(item->value, "notice"))
320 wsevh_log_level |= LLL_NOTICE;
321 if(strstr(item->value, "info"))
322 wsevh_log_level |= LLL_INFO;
323 if(strstr(item->value, "debug"))
324 wsevh_log_level |= LLL_DEBUG;
325 if(strstr(item->value, "parser"))
326 wsevh_log_level |= LLL_PARSER;
327 if(strstr(item->value, "header"))
328 wsevh_log_level |= LLL_HEADER;
329 if(strstr(item->value, "ext"))
330 wsevh_log_level |= LLL_EXT;
331 if(strstr(item->value, "client"))
332 wsevh_log_level |= LLL_CLIENT;
333 if(strstr(item->value, "latency"))
334 wsevh_log_level |= LLL_LATENCY;
335 #if (LWS_LIBRARY_VERSION_MAJOR >= 2 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 3)
336 if(strstr(item->value, "user"))
337 wsevh_log_level |= LLL_USER;
338 #endif
339 if(strstr(item->value, "count"))
340 wsevh_log_level |= LLL_COUNT;
341 }
342 }
343 if(wsevh_log_level > 0)
344 JANUS_LOG(LOG_INFO, "WebSockets event handler libwebsockets logging: %d\n", wsevh_log_level);
345 lws_set_log_level(wsevh_log_level, janus_wsevh_log_emit_function);
346
347 /* Which events should we subscribe to? */
348 item = janus_config_get(config, config_general, janus_config_type_item, "events");
349 if(item && item->value)
350 janus_events_edit_events_mask(item->value, &janus_wsevh.events_mask);
351
352 /* Is grouping of events ok? */
353 item = janus_config_get(config, config_general, janus_config_type_item, "grouping");
354 if(item && item->value)
355 group_events = janus_is_true(item->value);
356
357 /* Handle the rest of the configuration, starting from the server details */
358 item = janus_config_get(config, config_general, janus_config_type_item, "backend");
359 if(item && item->value)
360 backend = g_strdup(item->value);
361 if(backend == NULL) {
362 JANUS_LOG(LOG_FATAL, "Missing WebSockets backend\n");
363 goto error;
364 }
365 const char *p = NULL;
366 if(lws_parse_uri(backend, &protocol, &address, &port, &p)) {
367 JANUS_LOG(LOG_FATAL, "Error parsing address\n");
368 goto error;
369 }
370 if((strcasecmp(protocol, "ws") && strcasecmp(protocol, "wss")) || !strlen(address)) {
371 JANUS_LOG(LOG_FATAL, "Invalid address (only ws:// and wss:// addresses are supported)\n");
372 JANUS_LOG(LOG_FATAL, " -- Protocol: %s\n", protocol);
373 JANUS_LOG(LOG_FATAL, " -- Address: %s\n", address);
374 JANUS_LOG(LOG_FATAL, " -- Path: %s\n", p);
375 goto error;
376 }
377 path[0] = '/';
378 if(strlen(p) > 1)
379 g_strlcpy(path + 1, p, sizeof(path)-2);
380 /* Before connecting, let's check if the server expects a subprotocol */
381 item = janus_config_get(config, config_general, janus_config_type_item, "subprotocol");
382 if(item && item->value)
383 protocols[0].name = g_strdup(item->value);
384
385 /* Connect */
386 gboolean secure = !strcasecmp(protocol, "wss");
387 struct lws_context_creation_info info = { 0 };
388 info.port = CONTEXT_PORT_NO_LISTEN;
389 info.protocols = protocols;
390 info.gid = -1;
391 info.uid = -1;
392 #if ((LWS_LIBRARY_VERSION_MAJOR == 4 && LWS_LIBRARY_VERSION_MINOR >= 1) || LWS_LIBRARY_VERSION_MAJOR >= 5)
393 info.connect_timeout_secs = 5;
394 #endif
395 if(secure)
396 info.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
397 context = lws_create_context(&info);
398 if(context == NULL) {
399 JANUS_LOG(LOG_FATAL, "Creating libwebsocket context failed\n");
400 goto error;
401 }
402 janus_wsevh_connect_attempt(NULL);
403 if(wsi == NULL) {
404 JANUS_LOG(LOG_FATAL, "Error initializing WebSocket connection\n");
405 goto error;
406 }
407 janus_mutex_init(&writable_mutex);
408
409 /* Initialize the events queue */
410 events = g_async_queue_new_full((GDestroyNotify) janus_wsevh_event_free);
411 messages = g_async_queue_new();
412 g_atomic_int_set(&initialized, 1);
413
414 /* Start a thread to handle the WebSockets event loop */
415 GError *error = NULL;
416 ws_thread = g_thread_try_new("janus wsevh client", janus_wsevh_thread, NULL, &error);
417 if(error != NULL) {
418 g_atomic_int_set(&initialized, 0);
419 JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the WebSocketsEventHandler client thread...\n",
420 error->code, error->message ? error->message : "??");
421 g_error_free(error);
422 goto error;
423 }
424 /* Start another thread to handle incoming events */
425 error = NULL;
426 handler_thread = g_thread_try_new("janus wsevh handler", janus_wsevh_handler, NULL, &error);
427 if(error != NULL) {
428 g_atomic_int_set(&initialized, 0);
429 JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the WebSocketsEventHandler handler thread...\n",
430 error->code, error->message ? error->message : "??");
431 g_error_free(error);
432 goto error;
433 }
434
435 /* Done */
436 JANUS_LOG(LOG_INFO, "Setup of WebSockets event handler completed\n");
437 goto done;
438
439 error:
440 /* If we got here, something went wrong */
441 success = FALSE;
442 /* Fall through */
443 done:
444 if(config)
445 janus_config_destroy(config);
446 if(!success) {
447 return -1;
448 }
449 JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_WSEVH_NAME);
450 return 0;
451 }
452
janus_wsevh_destroy(void)453 void janus_wsevh_destroy(void) {
454 if(!g_atomic_int_get(&initialized))
455 return;
456 g_atomic_int_set(&stopping, 1);
457 #if ((LWS_LIBRARY_VERSION_MAJOR == 3 && LWS_LIBRARY_VERSION_MINOR >= 2) || LWS_LIBRARY_VERSION_MAJOR >= 4)
458 lws_cancel_service(context);
459 #endif
460
461 if(ws_thread != NULL) {
462 g_thread_join(ws_thread);
463 ws_thread = NULL;
464 }
465
466 g_async_queue_push(events, &exit_event);
467 if(handler_thread != NULL) {
468 g_thread_join(handler_thread);
469 handler_thread = NULL;
470 }
471 g_async_queue_unref(events);
472 events = NULL;
473
474 char *message = NULL;
475 while((message = g_async_queue_try_pop(messages)) != NULL) {
476 g_free(message);
477 }
478 g_async_queue_unref(messages);
479
480 g_atomic_int_set(&initialized, 0);
481 g_atomic_int_set(&stopping, 0);
482 JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_WSEVH_NAME);
483 }
484
janus_wsevh_get_api_compatibility(void)485 int janus_wsevh_get_api_compatibility(void) {
486 /* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
487 return JANUS_EVENTHANDLER_API_VERSION;
488 }
489
janus_wsevh_get_version(void)490 int janus_wsevh_get_version(void) {
491 return JANUS_WSEVH_VERSION;
492 }
493
janus_wsevh_get_version_string(void)494 const char *janus_wsevh_get_version_string(void) {
495 return JANUS_WSEVH_VERSION_STRING;
496 }
497
janus_wsevh_get_description(void)498 const char *janus_wsevh_get_description(void) {
499 return JANUS_WSEVH_DESCRIPTION;
500 }
501
janus_wsevh_get_name(void)502 const char *janus_wsevh_get_name(void) {
503 return JANUS_WSEVH_NAME;
504 }
505
janus_wsevh_get_author(void)506 const char *janus_wsevh_get_author(void) {
507 return JANUS_WSEVH_AUTHOR;
508 }
509
janus_wsevh_get_package(void)510 const char *janus_wsevh_get_package(void) {
511 return JANUS_WSEVH_PACKAGE;
512 }
513
/* Callback invoked to notify a new event: the event is only referenced and
 * queued here, the handler thread is the one that will take care of it */
void janus_wsevh_incoming_event(json_t *event) {
	/* Janus is closing or the plugin is */
	if(g_atomic_int_get(&stopping))
		return;
	if(!g_atomic_int_get(&initialized))
		return;

	/* Do NOT handle the event here in this callback! Since Janus notifies you right
	 * away when something happens, these events are triggered from working threads and
	 * not some sort of message bus. As such, performing I/O or network operations in
	 * here could dangerously slow Janus down. Let's just reference and enqueue the event,
	 * and handle it in our own thread: the event contains a monotonic time indicator of
	 * when the event actually happened on this machine, so that, if relevant, we can compute
	 * any delay in the actual event processing ourselves. */
	g_async_queue_push(events, json_incref(event));
}
530
/* Handle a request addressed to this handler: the only request supported
 * is "tweak", which can change the events mask ("events") and/or whether
 * events are grouped ("grouping"). Returns NULL when the plugin is stopping
 * or not initialized, otherwise a new JSON object containing either
 * "result" (200) or "error_code" and "error" */
json_t *janus_wsevh_handle_request(json_t *request) {
	if(g_atomic_int_get(&stopping))
		return NULL;
	if(!g_atomic_int_get(&initialized))
		return NULL;

	/* We can use these requests to apply tweaks to the logic */
	int error_code = 0;
	char error_cause[512];
	JANUS_VALIDATE_JSON_OBJECT(request, request_parameters,
		error_code, error_cause, TRUE,
		JANUS_WSEVH_ERROR_MISSING_ELEMENT, JANUS_WSEVH_ERROR_INVALID_ELEMENT);
	if(error_code == 0) {
		/* Get the request */
		const char *request_text = json_string_value(json_object_get(request, "request"));
		if(strcasecmp(request_text, "tweak") != 0) {
			/* We only support a request to tweak the current settings */
			JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
			error_code = JANUS_WSEVH_ERROR_INVALID_REQUEST;
			g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
		} else {
			JANUS_VALIDATE_JSON_OBJECT(request, tweak_parameters,
				error_code, error_cause, TRUE,
				JANUS_WSEVH_ERROR_MISSING_ELEMENT, JANUS_WSEVH_ERROR_INVALID_ELEMENT);
			if(error_code == 0) {
				/* Events */
				json_t *new_events = json_object_get(request, "events");
				if(new_events)
					janus_events_edit_events_mask(json_string_value(new_events), &janus_wsevh.events_mask);
				/* Grouping */
				json_t *new_grouping = json_object_get(request, "grouping");
				if(new_grouping)
					group_events = json_is_true(new_grouping);
			}
		}
	}

	/* Prepare the response, which is either a success or a JSON error event */
	json_t *response = json_object();
	if(error_code != 0) {
		json_object_set_new(response, "error_code", json_integer(error_code));
		json_object_set_new(response, "error", json_string(error_cause));
	} else {
		json_object_set_new(response, "result", json_integer(200));
	}
	return response;
}
578
579 #if (LWS_LIBRARY_VERSION_MAJOR >= 3 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 4)
580 /* Websocket thread loop for websocket library newer than 3.2
581 * The reconnect is handled in a dedicated lws scheduler janus_wsevh_schedule_connect_attempt */
janus_wsevh_thread(void * data)582 static void *janus_wsevh_thread(void *data) {
583 JANUS_LOG(LOG_VERB, "Joining WebSocketsEventHandler (lws>=3.2) client thread\n");
584 int nLast = 0;
585 while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
586 int n = lws_service(context, 0);
587 if((n < 0 || nLast < 0) && nLast != n) {
588 JANUS_LOG(LOG_ERR, "lws_service returned %d\n", n);
589 nLast = n;
590 }
591 }
592 lws_context_destroy(context);
593 JANUS_LOG(LOG_VERB, "Leaving WebSocketsEventHandler (lws>=3.2) client thread\n");
594 return NULL;
595 }
596 #else
597 /* Websocket thread loop for websocket library prior to (less than) 3.2
598 * The reconnect is handled in the loop for lws < 3.2 */
janus_wsevh_thread(void * data)599 static void *janus_wsevh_thread(void *data) {
600 JANUS_LOG(LOG_VERB, "Joining WebSocketsEventHandler (lws<3.2) client thread\n");
601 while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
602 /* Loop until we have to stop */
603 if(!reconnect) {
604 lws_service(context, 50);
605 } else {
606 /* We should reconnect, get rid of the previous context */
607 if(reconnect_delay > 0) {
608 /* Wait a few seconds before retrying */
609 gint64 now = janus_get_monotonic_time();
610 if((now-disconnected) < (gint64)reconnect_delay*G_USEC_PER_SEC) {
611 /* Try again later */
612 g_usleep(100000);
613 continue;
614 }
615 }
616 ws_client = NULL;
617 janus_wsevh_connect_attempt(NULL);
618 if(!wsi) {
619 janus_wsevh_calculate_reconnect_delay_on_fail();
620 JANUS_LOG(LOG_WARN, "WebSocketsEventHandler: Error attempting connection... (next retry in %ds)\n", reconnect_delay);
621 }
622 }
623 }
624 lws_context_destroy(context);
625 JANUS_LOG(LOG_VERB, "Leaving WebSocketsEventHandler (lws<3.2) client thread\n");
626 return NULL;
627 }
628 #endif
629
630 /* Thread to handle incoming events */
janus_wsevh_handler(void * data)631 static void *janus_wsevh_handler(void *data) {
632 JANUS_LOG(LOG_VERB, "Joining WebSocketsEventHandler handler thread\n");
633 json_t *event = NULL, *output = NULL;
634 char *event_text = NULL;
635 int count = 0, max = group_events ? 100 : 1;
636
637 while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
638
639 event = g_async_queue_pop(events);
640 if(event == &exit_event)
641 break;
642 count = 0;
643 output = NULL;
644
645 while(TRUE) {
646 /* Handle event: just for fun, let's see how long it took for us to take care of this */
647 json_t *created = json_object_get(event, "timestamp");
648 if(created && json_is_integer(created)) {
649 gint64 then = json_integer_value(created);
650 gint64 now = janus_get_monotonic_time();
651 JANUS_LOG(LOG_DBG, "Handled event after %"SCNu64" us\n", now-then);
652 }
653 if(!group_events) {
654 /* We're done here, we just need a single event */
655 output = event;
656 break;
657 }
658 /* If we got here, we're grouping */
659 if(output == NULL)
660 output = json_array();
661 json_array_append_new(output, event);
662 /* Never group more than a maximum number of events, though, or we might stay here forever */
663 count++;
664 if(count == max)
665 break;
666 event = g_async_queue_try_pop(events);
667 if(event == NULL || event == &exit_event)
668 break;
669 }
670
671 if(!g_atomic_int_get(&stopping)) {
672 /* Since this a simple plugin, it does the same for all events: so just convert to string... */
673 event_text = json_dumps(output, json_format);
674 if(event_text == NULL) {
675 JANUS_LOG(LOG_WARN, "Failed to stringify event, event lost...\n");
676 /* Nothing we can do... get rid of the event */
677 json_decref(output);
678 output = NULL;
679 continue;
680 }
681 g_async_queue_push(messages, event_text);
682 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
683 if(context != NULL)
684 lws_cancel_service(context);
685 #else
686 /* On libwebsockets < 3.x we use lws_callback_on_writable */
687 janus_mutex_lock(&writable_mutex);
688 if(wsi != NULL)
689 lws_callback_on_writable(wsi);
690 janus_mutex_unlock(&writable_mutex);
691 #endif
692 }
693
694 /* Done, let's unref the event */
695 json_decref(output);
696 output = NULL;
697 }
698 JANUS_LOG(LOG_VERB, "Leaving WebSocketsEventHandler handler thread\n");
699 return NULL;
700 }
701
janus_wsevh_callback(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)702 static int janus_wsevh_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) {
703 switch(reason) {
704 case LWS_CALLBACK_CLIENT_ESTABLISHED: {
705 /* Prepare the session */
706 if(ws_client == NULL)
707 ws_client = (janus_wsevh_client *)user;
708 ws_client->wsi = wsi;
709 ws_client->buffer = NULL;
710 ws_client->buflen = 0;
711 ws_client->bufpending = 0;
712 ws_client->bufoffset = 0;
713 reconnect_delay = 0;
714 reconnect = FALSE;
715 janus_mutex_init(&ws_client->mutex);
716 lws_callback_on_writable(wsi);
717 JANUS_LOG(LOG_INFO, "WebSocketsEventHandler connected\n");
718 return 0;
719 }
720 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: {
721 janus_wsevh_calculate_reconnect_delay_on_fail();
722 JANUS_LOG(LOG_ERR, "WebSocketsEventHandler: Error connecting to backend (%s) (next retry in %ds)\n",
723 in ? (char *)in : "unknown error",
724 reconnect_delay);
725 disconnected = janus_get_monotonic_time();
726 reconnect = TRUE;
727 janus_wsevh_schedule_connect_attempt();
728 return 1;
729 }
730 case LWS_CALLBACK_CLIENT_RECEIVE: {
731 /* We don't care */
732 return 0;
733 }
734 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
735 /* On libwebsockets >= 3.x, we use this event to mark connections as writable in the event loop */
736 case LWS_CALLBACK_EVENT_WAIT_CANCELLED: {
737 if(ws_client != NULL && ws_client->wsi != NULL)
738 lws_callback_on_writable(ws_client->wsi);
739 return 0;
740 }
741 #endif
742 case LWS_CALLBACK_CLIENT_WRITEABLE: {
743 if(ws_client == NULL || ws_client->wsi == NULL) {
744 JANUS_LOG(LOG_ERR, "Invalid WebSocket client instance...\n");
745 return -1;
746 }
747 if(!g_atomic_int_get(&stopping)) {
748 janus_mutex_lock(&ws_client->mutex);
749 /* Check if we have a pending/partial write to complete first */
750 if(ws_client->buffer && ws_client->bufpending > 0 && ws_client->bufoffset > 0
751 && !g_atomic_int_get(&stopping)) {
752 JANUS_LOG(LOG_VERB, "Completing pending WebSocket write (still need to write last %d bytes)...\n",
753 ws_client->bufpending);
754 int sent = lws_write(wsi, ws_client->buffer + ws_client->bufoffset, ws_client->bufpending, LWS_WRITE_TEXT);
755 JANUS_LOG(LOG_VERB, " -- Sent %d/%d bytes\n", sent, ws_client->bufpending);
756 if(sent > -1 && sent < ws_client->bufpending) {
757 /* We still couldn't send everything that was left, we'll try and complete this in the next round */
758 ws_client->bufpending -= sent;
759 ws_client->bufoffset += sent;
760 } else {
761 /* Clear the pending/partial write queue */
762 ws_client->bufpending = 0;
763 ws_client->bufoffset = 0;
764 }
765 /* Done for this round, check the next response/notification later */
766 lws_callback_on_writable(wsi);
767 janus_mutex_unlock(&ws_client->mutex);
768 return 0;
769 }
770 /* Shoot all the pending messages */
771 char *event = g_async_queue_try_pop(messages);
772 if(event && !g_atomic_int_get(&stopping)) {
773 /* Gotcha! */
774 int buflen = LWS_PRE + strlen(event);
775 if(ws_client->buffer == NULL) {
776 /* Let's allocate a shared buffer */
777 JANUS_LOG(LOG_VERB, "Allocating %d bytes (event is %zu bytes)\n", buflen, strlen(event));
778 ws_client->buflen = buflen;
779 ws_client->buffer = g_malloc0(buflen);
780 } else if(buflen > ws_client->buflen) {
781 /* We need a larger shared buffer */
782 JANUS_LOG(LOG_VERB, "Re-allocating to %d bytes (was %d, event is %zu bytes)\n",
783 buflen, ws_client->buflen, strlen(event));
784 ws_client->buflen = buflen;
785 ws_client->buffer = g_realloc(ws_client->buffer, buflen);
786 }
787 memcpy(ws_client->buffer + LWS_PRE, event, strlen(event));
788 JANUS_LOG(LOG_VERB, "Sending WebSocket message (%zu bytes)...\n", strlen(event));
789 int sent = lws_write(wsi, ws_client->buffer + LWS_PRE, strlen(event), LWS_WRITE_TEXT);
790 JANUS_LOG(LOG_VERB, " -- Sent %d/%zu bytes\n", sent, strlen(event));
791 if(sent > -1 && sent < (int)strlen(event)) {
792 /* We couldn't send everything in a single write, we'll complete this in the next round */
793 ws_client->bufpending = strlen(event) - sent;
794 ws_client->bufoffset = LWS_PRE + sent;
795 JANUS_LOG(LOG_VERB, " -- Couldn't write all bytes (%d missing), setting offset %d\n",
796 ws_client->bufpending, ws_client->bufoffset);
797 }
798 /* We can get rid of the message */
799 free(event);
800 /* Done for this round, check the next response/notification later */
801 lws_callback_on_writable(wsi);
802 janus_mutex_unlock(&ws_client->mutex);
803 return 0;
804 }
805 janus_mutex_unlock(&ws_client->mutex);
806 }
807 return 0;
808 }
809 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
810 case LWS_CALLBACK_CLIENT_CLOSED: {
811 #else
812 case LWS_CALLBACK_CLOSED: {
813 #endif
814 reconnect_delay = 1;
815 JANUS_LOG(LOG_INFO, "Connection to WebSocketsEventHandler backend closed (next connection attempt in %ds)\n", reconnect_delay);
816 if(ws_client != NULL) {
817 /* Cleanup */
818 janus_mutex_lock(&ws_client->mutex);
819 JANUS_LOG(LOG_INFO, "Destroying WebSocketsEventHandler client\n");
820 ws_client->wsi = NULL;
821 /* Free the shared buffers */
822 g_free(ws_client->buffer);
823 ws_client->buffer = NULL;
824 ws_client->buflen = 0;
825 ws_client->bufpending = 0;
826 ws_client->bufoffset = 0;
827 janus_mutex_unlock(&ws_client->mutex);
828 }
829 /* Check if we should reconnect */
830 ws_client = NULL;
831 wsi = NULL;
832 disconnected = janus_get_monotonic_time();
833 reconnect = TRUE;
834 janus_wsevh_schedule_connect_attempt();
835 return 0;
836 }
837 default:
838 if(wsi)
839 JANUS_LOG(LOG_HUGE, "%d (%s)\n", reason, janus_wsevh_reason_string(reason));
840 break;
841 }
842 return 0;
843 }
844
845 /* Implements the connecting attempt to the backend websocket server
846 * sets the connection result (lws_client_connect_info) to static wsi */
847 static void janus_wsevh_connect_attempt(lws_sorted_usec_list_t *sul) {
848 struct lws_client_connect_info i = { 0 };
849 i.host = address;
850 i.origin = address;
851 i.address = address;
852 i.port = port;
853 i.path = path;
854 i.context = context;
855 if(!strcasecmp(protocol, "wss"))
856 i.ssl_connection = 1;
857 i.ietf_version_or_minus_one = -1;
858 i.client_exts = exts;
859 i.protocol = protocols[0].name;
860 JANUS_LOG(LOG_INFO, "WebSocketsEventHandler: Connecting to backend websocket server %s:%d...\n", address, port);
861 wsi = lws_client_connect_via_info(&i);
862 if(!wsi) {
863 /* As we specified a callback pointer in the context the NULL result is unlikely to happen */
864 disconnected = janus_get_monotonic_time();
865 reconnect = TRUE;
866 JANUS_LOG(LOG_ERR, "WebSocketsEventHandler: Connecting to backend websocket server %s:%d failed\n", address, port);
867 return;
868 }
869 reconnect = FALSE;
870 }
871
872 /* Adopts the reconnect_delay value in case of an error
873 * Increases the value up to JANUS_WSEVH_MAX_RETRY_SECS */
874 static void janus_wsevh_calculate_reconnect_delay_on_fail(void) {
875 if(reconnect_delay == 0)
876 reconnect_delay = 1;
877 else if(reconnect_delay < JANUS_WSEVH_MAX_RETRY_SECS) {
878 reconnect_delay += reconnect_delay;
879 if(reconnect_delay > JANUS_WSEVH_MAX_RETRY_SECS)
880 reconnect_delay = JANUS_WSEVH_MAX_RETRY_SECS;
881 }
882 }
883
/* Schedule a connection attempt on the libwebsockets scheduler, to happen
 * reconnect_delay seconds from now. This does nothing when building against
 * libwebsockets older than 3.2, where the scheduler is not used */
static void janus_wsevh_schedule_connect_attempt(void) {
#if (LWS_LIBRARY_VERSION_MAJOR == 3 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 4)
	lws_sul_schedule(context, 0, &sul_stagger,
		janus_wsevh_connect_attempt, reconnect_delay * LWS_US_PER_SEC);
#endif
}
890