1 /*! \file   janus_wsevh.c
2  * \author Lorenzo Miniero <lorenzo@meetecho.com>
3  * \copyright GNU General Public License v3
4  * \brief  Janus WebSockets EventHandler plugin
5  * \details  This is a trivial WebSockets event handler plugin for Janus
6  *
7  * \ingroup eventhandlers
8  * \ref eventhandlers
9  */
10 
11 #include "eventhandler.h"
12 
13 #include <math.h>
14 
15 #include <libwebsockets.h>
16 
17 #include "../debug.h"
18 #include "../config.h"
19 #include "../mutex.h"
20 #include "../utils.h"
21 #include "../events.h"
22 
23 
24 /* Plugin information */
25 #define JANUS_WSEVH_VERSION			1
26 #define JANUS_WSEVH_VERSION_STRING	"0.0.1"
27 #define JANUS_WSEVH_DESCRIPTION		"This is a trivial WebSockets event handler plugin for Janus."
28 #define JANUS_WSEVH_NAME			"JANUS WebSocketsEventHandler plugin"
29 #define JANUS_WSEVH_AUTHOR			"Meetecho s.r.l."
30 #define JANUS_WSEVH_PACKAGE			"janus.eventhandler.wsevh"
31 
32 /* Plugin methods */
33 janus_eventhandler *create(void);
34 int janus_wsevh_init(const char *config_path);
35 void janus_wsevh_destroy(void);
36 int janus_wsevh_get_api_compatibility(void);
37 int janus_wsevh_get_version(void);
38 const char *janus_wsevh_get_version_string(void);
39 const char *janus_wsevh_get_description(void);
40 const char *janus_wsevh_get_name(void);
41 const char *janus_wsevh_get_author(void);
42 const char *janus_wsevh_get_package(void);
43 void janus_wsevh_incoming_event(json_t *event);
44 json_t *janus_wsevh_handle_request(json_t *request);
45 
46 /* Event handler setup */
47 static janus_eventhandler janus_wsevh =
48 	JANUS_EVENTHANDLER_INIT (
49 		.init = janus_wsevh_init,
50 		.destroy = janus_wsevh_destroy,
51 
52 		.get_api_compatibility = janus_wsevh_get_api_compatibility,
53 		.get_version = janus_wsevh_get_version,
54 		.get_version_string = janus_wsevh_get_version_string,
55 		.get_description = janus_wsevh_get_description,
56 		.get_name = janus_wsevh_get_name,
57 		.get_author = janus_wsevh_get_author,
58 		.get_package = janus_wsevh_get_package,
59 
60 		.incoming_event = janus_wsevh_incoming_event,
61 		.handle_request = janus_wsevh_handle_request,
62 
63 		.events_mask = JANUS_EVENT_TYPE_NONE
64 	);
65 
66 /* Plugin creator */
create(void)67 janus_eventhandler *create(void) {
68 	JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_WSEVH_NAME);
69 	return &janus_wsevh;
70 }
71 
72 /* Useful stuff */
73 static volatile gint initialized = 0, stopping = 0;
74 static GThread *ws_thread, *handler_thread;
75 static void *janus_wsevh_thread(void *data);
76 static void *janus_wsevh_handler(void *data);
77 
78 /* Connection related helper methods */
79 static void janus_wsevh_schedule_connect_attempt(void);
80 static void janus_wsevh_calculate_reconnect_delay_on_fail(void);
81 /* lws_sorted_usec_list_t is defined starting with lws 3.2 */
82 #if !(((LWS_LIBRARY_VERSION_MAJOR == 3 && LWS_LIBRARY_VERSION_MINOR >= 2) || LWS_LIBRARY_VERSION_MAJOR >= 4))
83 	#define lws_sorted_usec_list_t void
84 #endif
85 static void janus_wsevh_connect_attempt(lws_sorted_usec_list_t *sul);
86 
87 /* Queue of events to handle */
88 static GAsyncQueue *events = NULL;
89 static gboolean group_events = TRUE;
90 static json_t exit_event;
janus_wsevh_event_free(json_t * event)91 static void janus_wsevh_event_free(json_t *event) {
92 	if(!event || event == &exit_event)
93 		return;
94 	json_decref(event);
95 }
96 
97 /* JSON serialization options */
98 static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
99 
100 
101 /* Parameter validation (for tweaking via Admin API) */
102 static struct janus_json_parameter request_parameters[] = {
103 	{"request", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}
104 };
105 static struct janus_json_parameter tweak_parameters[] = {
106 	{"events", JSON_STRING, 0},
107 	{"grouping", JANUS_JSON_BOOL, 0}
108 };
109 /* Error codes (for the tweaking via Admin API */
110 #define JANUS_WSEVH_ERROR_INVALID_REQUEST		411
111 #define JANUS_WSEVH_ERROR_MISSING_ELEMENT		412
112 #define JANUS_WSEVH_ERROR_INVALID_ELEMENT		413
113 #define JANUS_WSEVH_ERROR_UNKNOWN_ERROR			499
114 
115 /* Logging */
116 static int wsevh_log_level = 0;
janus_wsevh_get_level_str(int level)117 static const char *janus_wsevh_get_level_str(int level) {
118 	switch(level) {
119 		case LLL_ERR:
120 			return "ERR";
121 		case LLL_WARN:
122 			return "WARN";
123 		case LLL_NOTICE:
124 			return "NOTICE";
125 		case LLL_INFO:
126 			return "INFO";
127 		case LLL_DEBUG:
128 			return "DEBUG";
129 		case LLL_PARSER:
130 			return "PARSER";
131 		case LLL_HEADER:
132 			return "HEADER";
133 		case LLL_EXT:
134 			return "EXT";
135 		case LLL_CLIENT:
136 			return "CLIENT";
137 		case LLL_LATENCY:
138 			return "LATENCY";
139 #if (LWS_LIBRARY_VERSION_MAJOR >= 2 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 3)
140 		case LLL_USER:
141 			return "USER";
142 #endif
143 		case LLL_COUNT:
144 			return "COUNT";
145 		default:
146 			return NULL;
147 	}
148 }
janus_wsevh_log_emit_function(int level,const char * line)149 static void janus_wsevh_log_emit_function(int level, const char *line) {
150 	/* FIXME Do we want to use different Janus debug levels according to the level here? */
151 	JANUS_LOG(LOG_INFO, "[libwebsockets][wsevh][%s] %s", janus_wsevh_get_level_str(level), line);
152 }
153 
154 
155 /* WebSockets properties */
156 static char *backend = NULL;
157 static const char *protocol = NULL, *address = NULL;
158 static char path[256];
159 static int port = 0;
160 static struct lws_context *context = NULL;
161 #if ((LWS_LIBRARY_VERSION_MAJOR == 3 && LWS_LIBRARY_VERSION_MINOR >= 2) || LWS_LIBRARY_VERSION_MAJOR >= 4)
162 static lws_sorted_usec_list_t sul_stagger = { 0 };
163 #endif
164 static gint64 disconnected = 0;
165 static gboolean reconnect = FALSE;
166 static int reconnect_delay = 0;
167 #define JANUS_WSEVH_MAX_RETRY_SECS	8
168 
169 typedef struct janus_wsevh_client {
170 	struct lws *wsi;		/* The libwebsockets client instance */
171 	unsigned char *buffer;	/* Buffer containing the message to send */
172 	int buflen;				/* Length of the buffer (may be resized after re-allocations) */
173 	int bufpending;			/* Data an interrupted previous write couldn't send */
174 	int bufoffset;			/* Offset from where the interrupted previous write should resume */
175 	janus_mutex mutex;		/* Mutex to lock/unlock this instance */
176 } janus_wsevh_client;
177 static janus_wsevh_client *ws_client = NULL;
178 static struct lws *wsi = NULL;
179 static GAsyncQueue *messages = NULL;	/* Queue of outgoing messages to push */
180 static janus_mutex writable_mutex;
181 
182 static int janus_wsevh_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
183 static struct lws_protocols protocols[] = {
184 	{ "janus-event-handlers", janus_wsevh_callback, sizeof(janus_wsevh_client), 0 },	/* Subprotocol will be configurable */
185 	{ NULL, NULL, 0, 0 }
186 };
187 static const struct lws_extension exts[] = {
188 #ifndef LWS_WITHOUT_EXTENSIONS
189 	{ "permessage-deflate", lws_extension_callback_pm_deflate, "permessage-deflate; client_max_window_bits" },
190 	{ "deflate-frame", lws_extension_callback_pm_deflate, "deflate_frame" },
191 #endif
192 	{ NULL, NULL, NULL }
193 };
194 
195 /* WebSockets error management */
196 #define CASE_STR(name) case name: return #name
janus_wsevh_reason_string(enum lws_callback_reasons reason)197 static const char *janus_wsevh_reason_string(enum lws_callback_reasons reason) {
198 	switch(reason) {
199 		CASE_STR(LWS_CALLBACK_ESTABLISHED);
200 		CASE_STR(LWS_CALLBACK_CLIENT_CONNECTION_ERROR);
201 		CASE_STR(LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH);
202 		CASE_STR(LWS_CALLBACK_CLIENT_ESTABLISHED);
203 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
204 		CASE_STR(LWS_CALLBACK_CLIENT_CLOSED);
205 #endif
206 		CASE_STR(LWS_CALLBACK_CLOSED);
207 		CASE_STR(LWS_CALLBACK_CLOSED_HTTP);
208 		CASE_STR(LWS_CALLBACK_RECEIVE);
209 		CASE_STR(LWS_CALLBACK_CLIENT_RECEIVE);
210 		CASE_STR(LWS_CALLBACK_CLIENT_RECEIVE_PONG);
211 		CASE_STR(LWS_CALLBACK_CLIENT_WRITEABLE);
212 		CASE_STR(LWS_CALLBACK_SERVER_WRITEABLE);
213 		CASE_STR(LWS_CALLBACK_HTTP);
214 		CASE_STR(LWS_CALLBACK_HTTP_BODY);
215 		CASE_STR(LWS_CALLBACK_HTTP_BODY_COMPLETION);
216 		CASE_STR(LWS_CALLBACK_HTTP_FILE_COMPLETION);
217 		CASE_STR(LWS_CALLBACK_HTTP_WRITEABLE);
218 		CASE_STR(LWS_CALLBACK_FILTER_NETWORK_CONNECTION);
219 		CASE_STR(LWS_CALLBACK_FILTER_HTTP_CONNECTION);
220 		CASE_STR(LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED);
221 		CASE_STR(LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION);
222 		CASE_STR(LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS);
223 		CASE_STR(LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS);
224 		CASE_STR(LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION);
225 		CASE_STR(LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER);
226 		CASE_STR(LWS_CALLBACK_CONFIRM_EXTENSION_OKAY);
227 		CASE_STR(LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED);
228 		CASE_STR(LWS_CALLBACK_PROTOCOL_INIT);
229 		CASE_STR(LWS_CALLBACK_PROTOCOL_DESTROY);
230 		CASE_STR(LWS_CALLBACK_WSI_CREATE);
231 		CASE_STR(LWS_CALLBACK_WSI_DESTROY);
232 		CASE_STR(LWS_CALLBACK_GET_THREAD_ID);
233 		CASE_STR(LWS_CALLBACK_ADD_POLL_FD);
234 		CASE_STR(LWS_CALLBACK_DEL_POLL_FD);
235 		CASE_STR(LWS_CALLBACK_CHANGE_MODE_POLL_FD);
236 		CASE_STR(LWS_CALLBACK_LOCK_POLL);
237 		CASE_STR(LWS_CALLBACK_UNLOCK_POLL);
238 		CASE_STR(LWS_CALLBACK_OPENSSL_CONTEXT_REQUIRES_PRIVATE_KEY);
239 		CASE_STR(LWS_CALLBACK_USER);
240 		default:
241 			break;
242 	}
243 	return NULL;
244 }
245 
246 
247 /* Plugin implementation */
janus_wsevh_init(const char * config_path)248 int janus_wsevh_init(const char *config_path) {
249 	gboolean success = TRUE;
250 	if(g_atomic_int_get(&stopping)) {
251 		/* Still stopping from before */
252 		return -1;
253 	}
254 	if(config_path == NULL) {
255 		/* Invalid arguments */
256 		return -1;
257 	}
258 	/* Read configuration */
259 	char filename[255];
260 	g_snprintf(filename, 255, "%s/%s.jcfg", config_path, JANUS_WSEVH_PACKAGE);
261 	JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
262 	janus_config *config = janus_config_parse(filename);
263 	if(config == NULL) {
264 		JANUS_LOG(LOG_WARN, "Couldn't find .jcfg configuration file (%s), trying .cfg\n", JANUS_WSEVH_PACKAGE);
265 		g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_WSEVH_PACKAGE);
266 		JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
267 		config = janus_config_parse(filename);
268 	}
269 	if(config != NULL)
270 		janus_config_print(config);
271 	janus_config_category *config_general = janus_config_get_create(config, NULL, janus_config_type_category, "general");
272 
273 	/* Setup the event handler, if required */
274 	janus_config_item *item = janus_config_get(config, config_general, janus_config_type_item, "enabled");
275 	if(!item || !item->value || !janus_is_true(item->value)) {
276 		JANUS_LOG(LOG_WARN, "WebSockets event handler disabled\n");
277 		goto error;
278 	}
279 
280 	item = janus_config_get(config, config_general, janus_config_type_item, "json");
281 	if(item && item->value) {
282 		/* Check how we need to format/serialize the JSON output */
283 		if(!strcasecmp(item->value, "indented")) {
284 			/* Default: indented, we use three spaces for that */
285 			json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
286 		} else if(!strcasecmp(item->value, "plain")) {
287 			/* Not indented and no new lines, but still readable */
288 			json_format = JSON_INDENT(0) | JSON_PRESERVE_ORDER;
289 		} else if(!strcasecmp(item->value, "compact")) {
290 			/* Compact, so no spaces between separators */
291 			json_format = JSON_COMPACT | JSON_PRESERVE_ORDER;
292 		} else {
293 			JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', using default (indented)\n", item->value);
294 			json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
295 		}
296 	}
297 
298 	item = janus_config_get(config, config_general, janus_config_type_item, "ws_logging");
299 	if(item && item->value) {
300 		/* libwebsockets uses a mask to set log levels, as documented here:
301 		 * https://libwebsockets.org/lws-api-doc-master/html/group__log.html */
302 		if(strstr(item->value, "none")) {
303 			/* Disable libwebsockets logging completely (the default) */
304 		} else if(strstr(item->value, "all")) {
305 			/* Enable all libwebsockets logging */
306 			wsevh_log_level = LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO |
307 				LLL_DEBUG | LLL_PARSER | LLL_HEADER | LLL_EXT |
308 #if (LWS_LIBRARY_VERSION_MAJOR >= 2 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 3)
309 				LLL_CLIENT | LLL_LATENCY | LLL_USER | LLL_COUNT;
310 #else
311 				LLL_CLIENT | LLL_LATENCY | LLL_COUNT;
312 #endif
313 		} else {
314 			/* Only enable some of the properties */
315 			if(strstr(item->value, "err"))
316 				wsevh_log_level |= LLL_ERR;
317 			if(strstr(item->value, "warn"))
318 				wsevh_log_level |= LLL_WARN;
319 			if(strstr(item->value, "notice"))
320 				wsevh_log_level |= LLL_NOTICE;
321 			if(strstr(item->value, "info"))
322 				wsevh_log_level |= LLL_INFO;
323 			if(strstr(item->value, "debug"))
324 				wsevh_log_level |= LLL_DEBUG;
325 			if(strstr(item->value, "parser"))
326 				wsevh_log_level |= LLL_PARSER;
327 			if(strstr(item->value, "header"))
328 				wsevh_log_level |= LLL_HEADER;
329 			if(strstr(item->value, "ext"))
330 				wsevh_log_level |= LLL_EXT;
331 			if(strstr(item->value, "client"))
332 				wsevh_log_level |= LLL_CLIENT;
333 			if(strstr(item->value, "latency"))
334 				wsevh_log_level |= LLL_LATENCY;
335 #if (LWS_LIBRARY_VERSION_MAJOR >= 2 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 3)
336 			if(strstr(item->value, "user"))
337 				wsevh_log_level |= LLL_USER;
338 #endif
339 			if(strstr(item->value, "count"))
340 				wsevh_log_level |= LLL_COUNT;
341 		}
342 	}
343 	if(wsevh_log_level > 0)
344 		JANUS_LOG(LOG_INFO, "WebSockets event handler libwebsockets logging: %d\n", wsevh_log_level);
345 	lws_set_log_level(wsevh_log_level, janus_wsevh_log_emit_function);
346 
347 	/* Which events should we subscribe to? */
348 	item = janus_config_get(config, config_general, janus_config_type_item, "events");
349 	if(item && item->value)
350 		janus_events_edit_events_mask(item->value, &janus_wsevh.events_mask);
351 
352 	/* Is grouping of events ok? */
353 	item = janus_config_get(config, config_general, janus_config_type_item, "grouping");
354 	if(item && item->value)
355 		group_events = janus_is_true(item->value);
356 
357 	/* Handle the rest of the configuration, starting from the server details */
358 	item = janus_config_get(config, config_general, janus_config_type_item, "backend");
359 	if(item && item->value)
360 		backend = g_strdup(item->value);
361 	if(backend == NULL) {
362 		JANUS_LOG(LOG_FATAL, "Missing WebSockets backend\n");
363 		goto error;
364 	}
365 	const char *p = NULL;
366 	if(lws_parse_uri(backend, &protocol, &address, &port, &p)) {
367 		JANUS_LOG(LOG_FATAL, "Error parsing address\n");
368 		goto error;
369 	}
370 	if((strcasecmp(protocol, "ws") && strcasecmp(protocol, "wss")) || !strlen(address)) {
371 		JANUS_LOG(LOG_FATAL, "Invalid address (only ws:// and wss:// addresses are supported)\n");
372 		JANUS_LOG(LOG_FATAL, "  -- Protocol: %s\n", protocol);
373 		JANUS_LOG(LOG_FATAL, "  -- Address:  %s\n", address);
374 		JANUS_LOG(LOG_FATAL, "  -- Path:     %s\n", p);
375 		goto error;
376 	}
377 	path[0] = '/';
378 	if(strlen(p) > 1)
379 		g_strlcpy(path + 1, p, sizeof(path)-2);
380 	/* Before connecting, let's check if the server expects a subprotocol */
381 	item = janus_config_get(config, config_general, janus_config_type_item, "subprotocol");
382 	if(item && item->value)
383 		protocols[0].name = g_strdup(item->value);
384 
385 	/* Connect */
386 	gboolean secure = !strcasecmp(protocol, "wss");
387 	struct lws_context_creation_info info = { 0 };
388 	info.port = CONTEXT_PORT_NO_LISTEN;
389 	info.protocols = protocols;
390 	info.gid = -1;
391 	info.uid = -1;
392 #if ((LWS_LIBRARY_VERSION_MAJOR == 4 && LWS_LIBRARY_VERSION_MINOR >= 1) || LWS_LIBRARY_VERSION_MAJOR >= 5)
393 	info.connect_timeout_secs = 5;
394 #endif
395 	if(secure)
396 		info.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
397 	context = lws_create_context(&info);
398 	if(context == NULL) {
399 		JANUS_LOG(LOG_FATAL, "Creating libwebsocket context failed\n");
400 		goto error;
401 	}
402 	janus_wsevh_connect_attempt(NULL);
403 	if(wsi == NULL) {
404 		JANUS_LOG(LOG_FATAL, "Error initializing WebSocket connection\n");
405 		goto error;
406 	}
407 	janus_mutex_init(&writable_mutex);
408 
409 	/* Initialize the events queue */
410 	events = g_async_queue_new_full((GDestroyNotify) janus_wsevh_event_free);
411 	messages = g_async_queue_new();
412 	g_atomic_int_set(&initialized, 1);
413 
414 	/* Start a thread to handle the WebSockets event loop */
415 	GError *error = NULL;
416 	ws_thread = g_thread_try_new("janus wsevh client", janus_wsevh_thread, NULL, &error);
417 	if(error != NULL) {
418 		g_atomic_int_set(&initialized, 0);
419 		JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the WebSocketsEventHandler client thread...\n",
420 			error->code, error->message ? error->message : "??");
421 		g_error_free(error);
422 		goto error;
423 	}
424 	/* Start another thread to handle incoming events */
425 	error = NULL;
426 	handler_thread = g_thread_try_new("janus wsevh handler", janus_wsevh_handler, NULL, &error);
427 	if(error != NULL) {
428 		g_atomic_int_set(&initialized, 0);
429 		JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the WebSocketsEventHandler handler thread...\n",
430 			error->code, error->message ? error->message : "??");
431 		g_error_free(error);
432 		goto error;
433 	}
434 
435 	/* Done */
436 	JANUS_LOG(LOG_INFO, "Setup of WebSockets event handler completed\n");
437 	goto done;
438 
439 error:
440 	/* If we got here, something went wrong */
441 	success = FALSE;
442 	/* Fall through */
443 done:
444 	if(config)
445 		janus_config_destroy(config);
446 	if(!success) {
447 		return -1;
448 	}
449 	JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_WSEVH_NAME);
450 	return 0;
451 }
452 
janus_wsevh_destroy(void)453 void janus_wsevh_destroy(void) {
454 	if(!g_atomic_int_get(&initialized))
455 		return;
456 	g_atomic_int_set(&stopping, 1);
457 #if ((LWS_LIBRARY_VERSION_MAJOR == 3 && LWS_LIBRARY_VERSION_MINOR >= 2) || LWS_LIBRARY_VERSION_MAJOR >= 4)
458 	lws_cancel_service(context);
459 #endif
460 
461 	if(ws_thread != NULL) {
462 		g_thread_join(ws_thread);
463 		ws_thread = NULL;
464 	}
465 
466 	g_async_queue_push(events, &exit_event);
467 	if(handler_thread != NULL) {
468 		g_thread_join(handler_thread);
469 		handler_thread = NULL;
470 	}
471 	g_async_queue_unref(events);
472 	events = NULL;
473 
474 	char *message = NULL;
475 	while((message = g_async_queue_try_pop(messages)) != NULL) {
476 		g_free(message);
477 	}
478 	g_async_queue_unref(messages);
479 
480 	g_atomic_int_set(&initialized, 0);
481 	g_atomic_int_set(&stopping, 0);
482 	JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_WSEVH_NAME);
483 }
484 
janus_wsevh_get_api_compatibility(void)485 int janus_wsevh_get_api_compatibility(void) {
486 	/* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
487 	return JANUS_EVENTHANDLER_API_VERSION;
488 }
489 
janus_wsevh_get_version(void)490 int janus_wsevh_get_version(void) {
491 	return JANUS_WSEVH_VERSION;
492 }
493 
janus_wsevh_get_version_string(void)494 const char *janus_wsevh_get_version_string(void) {
495 	return JANUS_WSEVH_VERSION_STRING;
496 }
497 
janus_wsevh_get_description(void)498 const char *janus_wsevh_get_description(void) {
499 	return JANUS_WSEVH_DESCRIPTION;
500 }
501 
janus_wsevh_get_name(void)502 const char *janus_wsevh_get_name(void) {
503 	return JANUS_WSEVH_NAME;
504 }
505 
janus_wsevh_get_author(void)506 const char *janus_wsevh_get_author(void) {
507 	return JANUS_WSEVH_AUTHOR;
508 }
509 
janus_wsevh_get_package(void)510 const char *janus_wsevh_get_package(void) {
511 	return JANUS_WSEVH_PACKAGE;
512 }
513 
/* Entry point for events notified by the core: the event is only referenced
 * and queued here, the handler thread is the one that actually processes it */
void janus_wsevh_incoming_event(json_t *event) {
	/* Nothing to do if Janus is closing, or the plugin is */
	gboolean running = !g_atomic_int_get(&stopping) && g_atomic_int_get(&initialized);
	if(running) {
		/* Do NOT handle the event here in this callback! Janus notifies us right away
		 * when something happens, which means these events are triggered from working
		 * threads and not from some sort of message bus: performing I/O or network
		 * operations here could dangerously slow Janus down. We just take a reference
		 * and enqueue the event, so that it's handled in our own thread: the event
		 * contains a monotonic time indicator of when it actually happened on this
		 * machine, so that, if relevant, we can compute any processing delay ourselves. */
		json_incref(event);
		g_async_queue_push(events, event);
	}
}
530 
/* Handle a request addressed to this plugin: the only one supported is "tweak",
 * which can edit the mask of subscribed events ("events") and toggle the grouping
 * of events ("grouping").
 * Returns NULL if the plugin is stopping or not initialized; otherwise a new JSON
 * object with "result" set to 200 on success, or "error_code" and "error" on failure */
json_t *janus_wsevh_handle_request(json_t *request) {
	if(g_atomic_int_get(&stopping))
		return NULL;
	if(!g_atomic_int_get(&initialized))
		return NULL;
	/* We can use these requests to apply tweaks to the logic */
	int error_code = 0;
	char error_cause[512];
	JANUS_VALIDATE_JSON_OBJECT(request, request_parameters,
		error_code, error_cause, TRUE,
		JANUS_WSEVH_ERROR_MISSING_ELEMENT, JANUS_WSEVH_ERROR_INVALID_ELEMENT);
	if(error_code == 0) {
		/* Get the request */
		const char *request_text = json_string_value(json_object_get(request, "request"));
		if(strcasecmp(request_text, "tweak") != 0) {
			/* We only support a request to tweak the current settings */
			JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
			error_code = JANUS_WSEVH_ERROR_INVALID_REQUEST;
			g_snprintf(error_cause, sizeof(error_cause), "Unknown request '%s'", request_text);
		} else {
			JANUS_VALIDATE_JSON_OBJECT(request, tweak_parameters,
				error_code, error_cause, TRUE,
				JANUS_WSEVH_ERROR_MISSING_ELEMENT, JANUS_WSEVH_ERROR_INVALID_ELEMENT);
			if(error_code == 0) {
				/* Events */
				json_t *mask = json_object_get(request, "events");
				if(mask != NULL)
					janus_events_edit_events_mask(json_string_value(mask), &janus_wsevh.events_mask);
				/* Grouping */
				json_t *grouping = json_object_get(request, "grouping");
				if(grouping != NULL)
					group_events = json_is_true(grouping);
			}
		}
	}

	/* Prepare the response, which will be either a success or a JSON error event */
	json_t *response = json_object();
	if(error_code != 0) {
		json_object_set_new(response, "error_code", json_integer(error_code));
		json_object_set_new(response, "error", json_string(error_cause));
	} else {
		json_object_set_new(response, "result", json_integer(200));
	}
	return response;
}
578 
579 #if (LWS_LIBRARY_VERSION_MAJOR >= 3 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 4)
580 /* Websocket thread loop for websocket library newer than 3.2
581  * The reconnect is handled in a dedicated lws scheduler janus_wsevh_schedule_connect_attempt */
janus_wsevh_thread(void * data)582 static void *janus_wsevh_thread(void *data) {
583 	JANUS_LOG(LOG_VERB, "Joining WebSocketsEventHandler (lws>=3.2) client thread\n");
584 	int nLast = 0;
585 	while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
586 		int n = lws_service(context, 0);
587 		if((n < 0 || nLast < 0) && nLast != n) {
588 			JANUS_LOG(LOG_ERR, "lws_service returned %d\n", n);
589 			nLast = n;
590 		}
591 	}
592 	lws_context_destroy(context);
593 	JANUS_LOG(LOG_VERB, "Leaving WebSocketsEventHandler (lws>=3.2) client thread\n");
594 	return NULL;
595 }
596 #else
597 /* Websocket thread loop for websocket library prior to (less than) 3.2
598  * The reconnect is handled in the loop for lws < 3.2 */
janus_wsevh_thread(void * data)599 static void *janus_wsevh_thread(void *data) {
600 	JANUS_LOG(LOG_VERB, "Joining WebSocketsEventHandler (lws<3.2) client thread\n");
601 	while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
602 		/* Loop until we have to stop */
603 		if(!reconnect) {
604 			lws_service(context, 50);
605 		} else {
606 			/* We should reconnect, get rid of the previous context */
607 			if(reconnect_delay > 0) {
608 				/* Wait a few seconds before retrying */
609 				gint64 now = janus_get_monotonic_time();
610 				if((now-disconnected) < (gint64)reconnect_delay*G_USEC_PER_SEC) {
611 					/* Try again later */
612 					g_usleep(100000);
613 					continue;
614 				}
615 			}
616 			ws_client = NULL;
617 			janus_wsevh_connect_attempt(NULL);
618 			if(!wsi) {
619 				janus_wsevh_calculate_reconnect_delay_on_fail();
620 				JANUS_LOG(LOG_WARN, "WebSocketsEventHandler: Error attempting connection... (next retry in %ds)\n", reconnect_delay);
621 			}
622 		}
623 	}
624 	lws_context_destroy(context);
625 	JANUS_LOG(LOG_VERB, "Leaving WebSocketsEventHandler (lws<3.2) client thread\n");
626 	return NULL;
627 }
628 #endif
629 
630 /* Thread to handle incoming events */
janus_wsevh_handler(void * data)631 static void *janus_wsevh_handler(void *data) {
632 	JANUS_LOG(LOG_VERB, "Joining WebSocketsEventHandler handler thread\n");
633 	json_t *event = NULL, *output = NULL;
634 	char *event_text = NULL;
635 	int count = 0, max = group_events ? 100 : 1;
636 
637 	while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
638 
639 		event = g_async_queue_pop(events);
640 		if(event == &exit_event)
641 			break;
642 		count = 0;
643 		output = NULL;
644 
645 		while(TRUE) {
646 			/* Handle event: just for fun, let's see how long it took for us to take care of this */
647 			json_t *created = json_object_get(event, "timestamp");
648 			if(created && json_is_integer(created)) {
649 				gint64 then = json_integer_value(created);
650 				gint64 now = janus_get_monotonic_time();
651 				JANUS_LOG(LOG_DBG, "Handled event after %"SCNu64" us\n", now-then);
652 			}
653 			if(!group_events) {
654 				/* We're done here, we just need a single event */
655 				output = event;
656 				break;
657 			}
658 			/* If we got here, we're grouping */
659 			if(output == NULL)
660 				output = json_array();
661 			json_array_append_new(output, event);
662 			/* Never group more than a maximum number of events, though, or we might stay here forever */
663 			count++;
664 			if(count == max)
665 				break;
666 			event = g_async_queue_try_pop(events);
667 			if(event == NULL || event == &exit_event)
668 				break;
669 		}
670 
671 		if(!g_atomic_int_get(&stopping)) {
672 			/* Since this a simple plugin, it does the same for all events: so just convert to string... */
673 			event_text = json_dumps(output, json_format);
674 			if(event_text == NULL) {
675 				JANUS_LOG(LOG_WARN, "Failed to stringify event, event lost...\n");
676 				/* Nothing we can do... get rid of the event */
677 				json_decref(output);
678 				output = NULL;
679 				continue;
680 			}
681 			g_async_queue_push(messages, event_text);
682 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
683 			if(context != NULL)
684 				lws_cancel_service(context);
685 #else
686 			/* On libwebsockets < 3.x we use lws_callback_on_writable */
687 			janus_mutex_lock(&writable_mutex);
688 			if(wsi != NULL)
689 				lws_callback_on_writable(wsi);
690 			janus_mutex_unlock(&writable_mutex);
691 #endif
692 		}
693 
694 		/* Done, let's unref the event */
695 		json_decref(output);
696 		output = NULL;
697 	}
698 	JANUS_LOG(LOG_VERB, "Leaving WebSocketsEventHandler handler thread\n");
699 	return NULL;
700 }
701 
janus_wsevh_callback(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)702 static int janus_wsevh_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) {
703 	switch(reason) {
704 		case LWS_CALLBACK_CLIENT_ESTABLISHED: {
705 			/* Prepare the session */
706 			if(ws_client == NULL)
707 				ws_client = (janus_wsevh_client *)user;
708 			ws_client->wsi = wsi;
709 			ws_client->buffer = NULL;
710 			ws_client->buflen = 0;
711 			ws_client->bufpending = 0;
712 			ws_client->bufoffset = 0;
713 			reconnect_delay = 0;
714 			reconnect = FALSE;
715 			janus_mutex_init(&ws_client->mutex);
716 			lws_callback_on_writable(wsi);
717 			JANUS_LOG(LOG_INFO, "WebSocketsEventHandler connected\n");
718 			return 0;
719 		}
720 		case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: {
721 			janus_wsevh_calculate_reconnect_delay_on_fail();
722 			JANUS_LOG(LOG_ERR, "WebSocketsEventHandler: Error connecting to backend (%s) (next retry in %ds)\n",
723 				in ? (char *)in : "unknown error",
724 				reconnect_delay);
725 			disconnected = janus_get_monotonic_time();
726 			reconnect = TRUE;
727 			janus_wsevh_schedule_connect_attempt();
728 			return 1;
729 		}
730 		case LWS_CALLBACK_CLIENT_RECEIVE: {
731 			/* We don't care */
732 			return 0;
733 		}
734 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
735 		/* On libwebsockets >= 3.x, we use this event to mark connections as writable in the event loop */
736 		case LWS_CALLBACK_EVENT_WAIT_CANCELLED: {
737 			if(ws_client != NULL && ws_client->wsi != NULL)
738 				lws_callback_on_writable(ws_client->wsi);
739 			return 0;
740 		}
741 #endif
742 		case LWS_CALLBACK_CLIENT_WRITEABLE: {
743 			if(ws_client == NULL || ws_client->wsi == NULL) {
744 				JANUS_LOG(LOG_ERR, "Invalid WebSocket client instance...\n");
745 				return -1;
746 			}
747 			if(!g_atomic_int_get(&stopping)) {
748 				janus_mutex_lock(&ws_client->mutex);
749 				/* Check if we have a pending/partial write to complete first */
750 				if(ws_client->buffer && ws_client->bufpending > 0 && ws_client->bufoffset > 0
751 						&& !g_atomic_int_get(&stopping)) {
752 					JANUS_LOG(LOG_VERB, "Completing pending WebSocket write (still need to write last %d bytes)...\n",
753 						ws_client->bufpending);
754 					int sent = lws_write(wsi, ws_client->buffer + ws_client->bufoffset, ws_client->bufpending, LWS_WRITE_TEXT);
755 					JANUS_LOG(LOG_VERB, "  -- Sent %d/%d bytes\n", sent, ws_client->bufpending);
756 					if(sent > -1 && sent < ws_client->bufpending) {
757 						/* We still couldn't send everything that was left, we'll try and complete this in the next round */
758 						ws_client->bufpending -= sent;
759 						ws_client->bufoffset += sent;
760 					} else {
761 						/* Clear the pending/partial write queue */
762 						ws_client->bufpending = 0;
763 						ws_client->bufoffset = 0;
764 					}
765 					/* Done for this round, check the next response/notification later */
766 					lws_callback_on_writable(wsi);
767 					janus_mutex_unlock(&ws_client->mutex);
768 					return 0;
769 				}
770 				/* Shoot all the pending messages */
771 				char *event = g_async_queue_try_pop(messages);
772 				if(event && !g_atomic_int_get(&stopping)) {
773 					/* Gotcha! */
774 					int buflen = LWS_PRE + strlen(event);
775 					if(ws_client->buffer == NULL) {
776 						/* Let's allocate a shared buffer */
777 						JANUS_LOG(LOG_VERB, "Allocating %d bytes (event is %zu bytes)\n", buflen, strlen(event));
778 						ws_client->buflen = buflen;
779 						ws_client->buffer = g_malloc0(buflen);
780 					} else if(buflen > ws_client->buflen) {
781 						/* We need a larger shared buffer */
782 						JANUS_LOG(LOG_VERB, "Re-allocating to %d bytes (was %d, event is %zu bytes)\n",
783 							buflen, ws_client->buflen, strlen(event));
784 						ws_client->buflen = buflen;
785 						ws_client->buffer = g_realloc(ws_client->buffer, buflen);
786 					}
787 					memcpy(ws_client->buffer + LWS_PRE, event, strlen(event));
788 					JANUS_LOG(LOG_VERB, "Sending WebSocket message (%zu bytes)...\n", strlen(event));
789 					int sent = lws_write(wsi, ws_client->buffer + LWS_PRE, strlen(event), LWS_WRITE_TEXT);
790 					JANUS_LOG(LOG_VERB, "  -- Sent %d/%zu bytes\n", sent, strlen(event));
791 					if(sent > -1 && sent < (int)strlen(event)) {
792 						/* We couldn't send everything in a single write, we'll complete this in the next round */
793 						ws_client->bufpending = strlen(event) - sent;
794 						ws_client->bufoffset = LWS_PRE + sent;
795 						JANUS_LOG(LOG_VERB, "  -- Couldn't write all bytes (%d missing), setting offset %d\n",
796 							ws_client->bufpending, ws_client->bufoffset);
797 					}
798 					/* We can get rid of the message */
799 					free(event);
800 					/* Done for this round, check the next response/notification later */
801 					lws_callback_on_writable(wsi);
802 					janus_mutex_unlock(&ws_client->mutex);
803 					return 0;
804 				}
805 				janus_mutex_unlock(&ws_client->mutex);
806 			}
807 			return 0;
808 		}
809 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
810 		case LWS_CALLBACK_CLIENT_CLOSED: {
811 #else
812 		case LWS_CALLBACK_CLOSED: {
813 #endif
814 			reconnect_delay = 1;
815 			JANUS_LOG(LOG_INFO, "Connection to WebSocketsEventHandler backend closed (next connection attempt in %ds)\n", reconnect_delay);
816 			if(ws_client != NULL) {
817 				/* Cleanup */
818 				janus_mutex_lock(&ws_client->mutex);
819 				JANUS_LOG(LOG_INFO, "Destroying WebSocketsEventHandler client\n");
820 				ws_client->wsi = NULL;
821 				/* Free the shared buffers */
822 				g_free(ws_client->buffer);
823 				ws_client->buffer = NULL;
824 				ws_client->buflen = 0;
825 				ws_client->bufpending = 0;
826 				ws_client->bufoffset = 0;
827 				janus_mutex_unlock(&ws_client->mutex);
828 			}
829 			/* Check if we should reconnect */
830 			ws_client = NULL;
831 			wsi = NULL;
832 			disconnected = janus_get_monotonic_time();
833 			reconnect = TRUE;
834 			janus_wsevh_schedule_connect_attempt();
835 			return 0;
836 		}
837 		default:
838 			if(wsi)
839 				JANUS_LOG(LOG_HUGE, "%d (%s)\n", reason, janus_wsevh_reason_string(reason));
840 			break;
841 	}
842 	return 0;
843 }
844 
845 /* Implements the connecting attempt to the backend websocket server
846  * sets the connection result (lws_client_connect_info) to static wsi */
847 static void janus_wsevh_connect_attempt(lws_sorted_usec_list_t *sul) {
848 	struct lws_client_connect_info i = { 0 };
849 	i.host = address;
850 	i.origin = address;
851 	i.address = address;
852 	i.port = port;
853 	i.path = path;
854 	i.context = context;
855 	if(!strcasecmp(protocol, "wss"))
856 		i.ssl_connection = 1;
857 	i.ietf_version_or_minus_one = -1;
858 	i.client_exts = exts;
859 	i.protocol = protocols[0].name;
860 	JANUS_LOG(LOG_INFO, "WebSocketsEventHandler: Connecting to backend websocket server %s:%d...\n", address, port);
861 	wsi = lws_client_connect_via_info(&i);
862 	if(!wsi) {
863 		/* As we specified a callback pointer in the context the NULL result is unlikely to happen */
864 		disconnected = janus_get_monotonic_time();
865 		reconnect = TRUE;
866 		JANUS_LOG(LOG_ERR, "WebSocketsEventHandler: Connecting to backend websocket server %s:%d failed\n", address, port);
867 		return;
868 	}
869 	reconnect = FALSE;
870 }
871 
872 /* Adopts the reconnect_delay value in case of an error
873  * Increases the value up to JANUS_WSEVH_MAX_RETRY_SECS */
874 static void janus_wsevh_calculate_reconnect_delay_on_fail(void) {
875 	if(reconnect_delay == 0)
876 		reconnect_delay = 1;
877 	else if(reconnect_delay < JANUS_WSEVH_MAX_RETRY_SECS) {
878 		reconnect_delay += reconnect_delay;
879 		if(reconnect_delay > JANUS_WSEVH_MAX_RETRY_SECS)
880 			reconnect_delay = JANUS_WSEVH_MAX_RETRY_SECS;
881 	}
882 }
883 
/* Schedules a connect attempt using the lws scheduler, so that
 * janus_wsevh_connect_attempt is invoked after reconnect_delay seconds.
 * On libwebsockets older than 3.2 this does nothing, since there the
 * reconnection is handled in the client thread loop instead */
static void janus_wsevh_schedule_connect_attempt(void) {
#if (LWS_LIBRARY_VERSION_MAJOR >= 3 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 4)
	const lws_usec_t delay_us = reconnect_delay * LWS_US_PER_SEC;
	lws_sul_schedule(context, 0, &sul_stagger, janus_wsevh_connect_attempt, delay_us);
#endif
}
890