1 /*! \file   janus_websockets.c
2  * \author Lorenzo Miniero <lorenzo@meetecho.com>
3  * \copyright GNU General Public License v3
4  * \brief  Janus WebSockets transport plugin
5  * \details  This is an implementation of a WebSockets transport for the
6  * Janus API, using the libwebsockets library (http://libwebsockets.org).
7  * This means that, with the help of this module, browsers or applications
8  * (e.g., nodejs server side implementations) can also make use of
9  * WebSockets to make requests to Janus. In that case, the same
10  * WebSocket can be used for both sending requests and receiving
11  * notifications, without the need for long polls. At the same time,
12  * without the concept of a REST path, requests sent through the
13  * WebSockets interface will need to include, when needed, additional
14  * pieces of information like \c session_id and \c handle_id. That is,
15  * where you'd send a Janus request related to a specific session to the
16  * \c /janus/\<session> path, with WebSockets you'd have to send the same
17  * request with an additional \c session_id field in the JSON payload.
18  * The same applies for the handle. The JavaScript library (janus.js)
19  * implements all of this on the client side automatically.
20  * \note When you create a session using WebSockets, a subscription to
21  * the events related to it is done automatically, so no need for an
22  * explicit request as the GET in the plain HTTP API. Closing a WebSocket
23  * will also destroy all the sessions it created.
24  *
25  * \ingroup transports
26  * \ref transports
27  */
28 
29 #include "transport.h"
30 
31 #include <arpa/inet.h>
32 #include <net/if.h>
33 #include <ifaddrs.h>
34 
35 #include <libwebsockets.h>
36 
37 #include "../debug.h"
38 #include "../apierror.h"
39 #include "../config.h"
40 #include "../mutex.h"
41 #include "../utils.h"
42 
43 
44 /* Transport plugin information */
45 #define JANUS_WEBSOCKETS_VERSION			1
46 #define JANUS_WEBSOCKETS_VERSION_STRING		"0.0.1"
47 #define JANUS_WEBSOCKETS_DESCRIPTION		"This transport plugin adds WebSockets support to the Janus API via libwebsockets."
48 #define JANUS_WEBSOCKETS_NAME				"JANUS WebSockets transport plugin"
49 #define JANUS_WEBSOCKETS_AUTHOR				"Meetecho s.r.l."
50 #define JANUS_WEBSOCKETS_PACKAGE			"janus.transport.websockets"
51 
52 /* Transport methods */
53 janus_transport *create(void);
54 int janus_websockets_init(janus_transport_callbacks *callback, const char *config_path);
55 void janus_websockets_destroy(void);
56 int janus_websockets_get_api_compatibility(void);
57 int janus_websockets_get_version(void);
58 const char *janus_websockets_get_version_string(void);
59 const char *janus_websockets_get_description(void);
60 const char *janus_websockets_get_name(void);
61 const char *janus_websockets_get_author(void);
62 const char *janus_websockets_get_package(void);
63 gboolean janus_websockets_is_janus_api_enabled(void);
64 gboolean janus_websockets_is_admin_api_enabled(void);
65 int janus_websockets_send_message(janus_transport_session *transport, void *request_id, gboolean admin, json_t *message);
66 void janus_websockets_session_created(janus_transport_session *transport, guint64 session_id);
67 void janus_websockets_session_over(janus_transport_session *transport, guint64 session_id, gboolean timeout, gboolean claimed);
68 void janus_websockets_session_claimed(janus_transport_session *transport, guint64 session_id);
69 json_t *janus_websockets_query_transport(json_t *request);
70 
71 
72 /* Transport setup */
73 static janus_transport janus_websockets_transport =
74 	JANUS_TRANSPORT_INIT (
75 		.init = janus_websockets_init,
76 		.destroy = janus_websockets_destroy,
77 
78 		.get_api_compatibility = janus_websockets_get_api_compatibility,
79 		.get_version = janus_websockets_get_version,
80 		.get_version_string = janus_websockets_get_version_string,
81 		.get_description = janus_websockets_get_description,
82 		.get_name = janus_websockets_get_name,
83 		.get_author = janus_websockets_get_author,
84 		.get_package = janus_websockets_get_package,
85 
86 		.is_janus_api_enabled = janus_websockets_is_janus_api_enabled,
87 		.is_admin_api_enabled = janus_websockets_is_admin_api_enabled,
88 
89 		.send_message = janus_websockets_send_message,
90 		.session_created = janus_websockets_session_created,
91 		.session_over = janus_websockets_session_over,
92 		.session_claimed = janus_websockets_session_claimed,
93 
94 		.query_transport = janus_websockets_query_transport,
95 	);
96 
97 /* Transport creator */
create(void)98 janus_transport *create(void) {
99 	JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_WEBSOCKETS_NAME);
100 	return &janus_websockets_transport;
101 }
102 
103 
104 /* Useful stuff */
105 static gint initialized = 0, stopping = 0;
106 static janus_transport_callbacks *gateway = NULL;
107 static gboolean ws_janus_api_enabled = FALSE;
108 static gboolean ws_admin_api_enabled = FALSE;
109 static gboolean notify_events = TRUE;
110 
111 /* Clients maps */
112 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
113 static GHashTable *clients = NULL, *writable_clients = NULL;
114 #endif
115 static janus_mutex writable_mutex;
116 
117 /* JSON serialization options */
118 static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
119 
120 /* Parameter validation (for tweaking and queries via Admin API) */
121 static struct janus_json_parameter request_parameters[] = {
122 	{"request", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}
123 };
124 static struct janus_json_parameter configure_parameters[] = {
125 	{"events", JANUS_JSON_BOOL, 0},
126 	{"json", JSON_STRING, 0},
127 	{"logging", JSON_STRING, 0},
128 };
129 /* Error codes (for the tweaking and queries via Admin API) */
130 #define JANUS_WEBSOCKETS_ERROR_INVALID_REQUEST		411
131 #define JANUS_WEBSOCKETS_ERROR_MISSING_ELEMENT		412
132 #define JANUS_WEBSOCKETS_ERROR_INVALID_ELEMENT		413
133 #define JANUS_WEBSOCKETS_ERROR_UNKNOWN_ERROR		499
134 
135 
136 /* Logging */
137 static int ws_log_level = 0;
janus_websockets_get_level_str(int level)138 static const char *janus_websockets_get_level_str(int level) {
139 	switch(level) {
140 		case LLL_ERR:
141 			return "ERR";
142 		case LLL_WARN:
143 			return "WARN";
144 		case LLL_NOTICE:
145 			return "NOTICE";
146 		case LLL_INFO:
147 			return "INFO";
148 		case LLL_DEBUG:
149 			return "DEBUG";
150 		case LLL_PARSER:
151 			return "PARSER";
152 		case LLL_HEADER:
153 			return "HEADER";
154 		case LLL_EXT:
155 			return "EXT";
156 		case LLL_CLIENT:
157 			return "CLIENT";
158 		case LLL_LATENCY:
159 			return "LATENCY";
160 #if (LWS_LIBRARY_VERSION_MAJOR >= 2 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 3)
161 		case LLL_USER:
162 			return "USER";
163 #endif
164 		case LLL_COUNT:
165 			return "COUNT";
166 		default:
167 			return NULL;
168 	}
169 }
janus_websockets_log_emit_function(int level,const char * line)170 static void janus_websockets_log_emit_function(int level, const char *line) {
171 	/* FIXME Do we want to use different Janus debug levels according to the level here? */
172 	JANUS_LOG(LOG_INFO, "[libwebsockets][%s] %s", janus_websockets_get_level_str(level), line);
173 }
174 
175 /* WebSockets service thread */
176 static GThread *ws_thread = NULL;
177 void *janus_websockets_thread(void *data);
178 
179 
180 /* WebSocket client session */
181 typedef struct janus_websockets_client {
182 	struct lws *wsi;						/* The libwebsockets client instance */
183 	GAsyncQueue *messages;					/* Queue of outgoing messages to push */
184 	char *incoming;							/* Buffer containing the incoming message to process (in case there are fragments) */
185 	unsigned char *buffer;					/* Buffer containing the message to send */
186 	size_t buflen;								/* Length of the buffer (may be resized after re-allocations) */
187 	size_t bufpending;							/* Data an interrupted previous write couldn't send */
188 	size_t bufoffset;							/* Offset from where the interrupted previous write should resume */
189 	volatile gint destroyed;				/* Whether this libwebsockets client instance has been closed */
190 	janus_transport_session *ts;			/* Janus core-transport session */
191 } janus_websockets_client;
192 
193 
194 /* libwebsockets WS context */
195 static struct lws_context *wsc = NULL;
196 /* Callbacks for HTTP-related events (automatically rejected) */
197 static int janus_websockets_callback_http(
198 		struct lws *wsi,
199 		enum lws_callback_reasons reason,
200 		void *user, void *in, size_t len);
201 static int janus_websockets_callback_https(
202 		struct lws *wsi,
203 		enum lws_callback_reasons reason,
204 		void *user, void *in, size_t len);
205 /* Callbacks for WebSockets-related events */
206 static int janus_websockets_callback(
207 		struct lws *wsi,
208 		enum lws_callback_reasons reason,
209 		void *user, void *in, size_t len);
210 static int janus_websockets_callback_secure(
211 		struct lws *wsi,
212 		enum lws_callback_reasons reason,
213 		void *user, void *in, size_t len);
214 static int janus_websockets_admin_callback(
215 		struct lws *wsi,
216 		enum lws_callback_reasons reason,
217 		void *user, void *in, size_t len);
218 static int janus_websockets_admin_callback_secure(
219 		struct lws *wsi,
220 		enum lws_callback_reasons reason,
221 		void *user, void *in, size_t len);
222 /* Protocol mappings */
223 static struct lws_protocols ws_protocols[] = {
224 	{ "http-only", janus_websockets_callback_http, 0, 0 },
225 	{ "janus-protocol", janus_websockets_callback, sizeof(janus_websockets_client), 0 },
226 	{ NULL, NULL, 0 }
227 };
228 static struct lws_protocols sws_protocols[] = {
229 	{ "http-only", janus_websockets_callback_https, 0, 0 },
230 	{ "janus-protocol", janus_websockets_callback_secure, sizeof(janus_websockets_client), 0 },
231 	{ NULL, NULL, 0 }
232 };
233 static struct lws_protocols admin_ws_protocols[] = {
234 	{ "http-only", janus_websockets_callback_http, 0, 0 },
235 	{ "janus-admin-protocol", janus_websockets_admin_callback, sizeof(janus_websockets_client), 0 },
236 	{ NULL, NULL, 0 }
237 };
238 static struct lws_protocols admin_sws_protocols[] = {
239 	{ "http-only", janus_websockets_callback_https, 0, 0 },
240 	{ "janus-admin-protocol", janus_websockets_admin_callback_secure, sizeof(janus_websockets_client), 0 },
241 	{ NULL, NULL, 0 }
242 };
243 /* Helper for debugging reasons */
244 #define CASE_STR(name) case name: return #name
janus_websockets_reason_string(enum lws_callback_reasons reason)245 static const char *janus_websockets_reason_string(enum lws_callback_reasons reason) {
246 	switch(reason) {
247 		CASE_STR(LWS_CALLBACK_ESTABLISHED);
248 		CASE_STR(LWS_CALLBACK_CLIENT_CONNECTION_ERROR);
249 		CASE_STR(LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH);
250 		CASE_STR(LWS_CALLBACK_CLIENT_ESTABLISHED);
251 		CASE_STR(LWS_CALLBACK_CLOSED);
252 		CASE_STR(LWS_CALLBACK_CLOSED_HTTP);
253 		CASE_STR(LWS_CALLBACK_RECEIVE);
254 		CASE_STR(LWS_CALLBACK_CLIENT_RECEIVE);
255 		CASE_STR(LWS_CALLBACK_CLIENT_RECEIVE_PONG);
256 		CASE_STR(LWS_CALLBACK_CLIENT_WRITEABLE);
257 		CASE_STR(LWS_CALLBACK_SERVER_WRITEABLE);
258 		CASE_STR(LWS_CALLBACK_HTTP);
259 		CASE_STR(LWS_CALLBACK_HTTP_BODY);
260 		CASE_STR(LWS_CALLBACK_HTTP_BODY_COMPLETION);
261 		CASE_STR(LWS_CALLBACK_HTTP_FILE_COMPLETION);
262 		CASE_STR(LWS_CALLBACK_HTTP_WRITEABLE);
263 		CASE_STR(LWS_CALLBACK_ADD_HEADERS);
264 		CASE_STR(LWS_CALLBACK_FILTER_NETWORK_CONNECTION);
265 		CASE_STR(LWS_CALLBACK_FILTER_HTTP_CONNECTION);
266 		CASE_STR(LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED);
267 		CASE_STR(LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION);
268 		CASE_STR(LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS);
269 		CASE_STR(LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS);
270 		CASE_STR(LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION);
271 		CASE_STR(LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER);
272 		CASE_STR(LWS_CALLBACK_CONFIRM_EXTENSION_OKAY);
273 		CASE_STR(LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED);
274 		CASE_STR(LWS_CALLBACK_PROTOCOL_INIT);
275 		CASE_STR(LWS_CALLBACK_PROTOCOL_DESTROY);
276 		CASE_STR(LWS_CALLBACK_WSI_CREATE);
277 		CASE_STR(LWS_CALLBACK_WSI_DESTROY);
278 		CASE_STR(LWS_CALLBACK_GET_THREAD_ID);
279 		CASE_STR(LWS_CALLBACK_ADD_POLL_FD);
280 		CASE_STR(LWS_CALLBACK_DEL_POLL_FD);
281 		CASE_STR(LWS_CALLBACK_CHANGE_MODE_POLL_FD);
282 		CASE_STR(LWS_CALLBACK_LOCK_POLL);
283 		CASE_STR(LWS_CALLBACK_UNLOCK_POLL);
284 		CASE_STR(LWS_CALLBACK_OPENSSL_CONTEXT_REQUIRES_PRIVATE_KEY);
285 		CASE_STR(LWS_CALLBACK_USER);
286 		CASE_STR(LWS_CALLBACK_RECEIVE_PONG);
287 		default:
288 			break;
289 	}
290 	return NULL;
291 }
292 
293 #if (LWS_LIBRARY_VERSION_MAJOR >= 4)
294 static lws_retry_bo_t pingpong = { 0 };
295 #endif
296 
297 /* Helper method to return the interface associated with a local IP address */
janus_websockets_get_interface_name(const char * ip)298 static char *janus_websockets_get_interface_name(const char *ip) {
299 	struct ifaddrs *addrs = NULL, *iap = NULL;
300 	if(getifaddrs(&addrs) == -1)
301 		return NULL;
302 	for(iap = addrs; iap != NULL; iap = iap->ifa_next) {
303 		if(iap->ifa_addr && (iap->ifa_flags & IFF_UP)) {
304 			if(iap->ifa_addr->sa_family == AF_INET) {
305 				struct sockaddr_in *sa = (struct sockaddr_in *)(iap->ifa_addr);
306 				char buffer[16];
307 				inet_ntop(iap->ifa_addr->sa_family, (void *)&(sa->sin_addr), buffer, sizeof(buffer));
308 				if(!strcmp(ip, buffer)) {
309 					char *iface = g_strdup(iap->ifa_name);
310 					freeifaddrs(addrs);
311 					return iface;
312 				}
313 			} else if(iap->ifa_addr->sa_family == AF_INET6) {
314 				struct sockaddr_in6 *sa = (struct sockaddr_in6 *)(iap->ifa_addr);
315 				char buffer[48];
316 				inet_ntop(iap->ifa_addr->sa_family, (void *)&(sa->sin6_addr), buffer, sizeof(buffer));
317 				if(!strcmp(ip, buffer)) {
318 					char *iface = g_strdup(iap->ifa_name);
319 					freeifaddrs(addrs);
320 					return iface;
321 				}
322 			}
323 		}
324 	}
325 	freeifaddrs(addrs);
326 	return NULL;
327 }
328 
329 /* Custom Access-Control-Allow-Origin value, if specified */
330 static char *allow_origin = NULL;
331 static gboolean enforce_cors = FALSE;
332 
333 /* WebSockets ACL list for both Janus and Admin API */
334 static GList *janus_websockets_access_list = NULL, *janus_websockets_admin_access_list = NULL;
335 static janus_mutex access_list_mutex;
janus_websockets_allow_address(const char * ip,gboolean admin)336 static void janus_websockets_allow_address(const char *ip, gboolean admin) {
337 	if(ip == NULL)
338 		return;
339 	/* Is this an IP or an interface? */
340 	janus_mutex_lock(&access_list_mutex);
341 	if(!admin)
342 		janus_websockets_access_list = g_list_append(janus_websockets_access_list, (gpointer)ip);
343 	else
344 		janus_websockets_admin_access_list = g_list_append(janus_websockets_admin_access_list, (gpointer)ip);
345 	janus_mutex_unlock(&access_list_mutex);
346 }
janus_websockets_is_allowed(const char * ip,gboolean admin)347 static gboolean janus_websockets_is_allowed(const char *ip, gboolean admin) {
348 	JANUS_LOG(LOG_VERB, "Checking if %s is allowed to contact %s interface\n", ip, admin ? "admin" : "janus");
349 	if(ip == NULL)
350 		return FALSE;
351 	if(!admin && janus_websockets_access_list == NULL) {
352 		JANUS_LOG(LOG_VERB, "Yep\n");
353 		return TRUE;
354 	}
355 	if(admin && janus_websockets_admin_access_list == NULL) {
356 		JANUS_LOG(LOG_VERB, "Yeah\n");
357 		return TRUE;
358 	}
359 	janus_mutex_lock(&access_list_mutex);
360 	GList *temp = admin ? janus_websockets_admin_access_list : janus_websockets_access_list;
361 	while(temp) {
362 		const char *allowed = (const char *)temp->data;
363 		if(allowed != NULL && strstr(ip, allowed)) {
364 			janus_mutex_unlock(&access_list_mutex);
365 			return TRUE;
366 		}
367 		temp = temp->next;
368 	}
369 	janus_mutex_unlock(&access_list_mutex);
370 	JANUS_LOG(LOG_VERB, "Nope...\n");
371 	return FALSE;
372 }
373 
janus_websockets_create_ws_server(janus_config * config,janus_config_container * config_container,janus_config_container * config_certs,const char * prefix,const char * name,struct lws_protocols ws_protocols[],gboolean secure,uint16_t default_port)374 static struct lws_vhost* janus_websockets_create_ws_server(
375 		janus_config *config,
376 		janus_config_container *config_container,
377 		janus_config_container *config_certs,
378 		const char *prefix,
379 		const char *name,
380 		struct lws_protocols ws_protocols[],
381 		gboolean secure,
382 		uint16_t default_port)
383 {
384 	janus_config_item *item;
385 	char item_name[255];
386 #ifdef __FreeBSD__
387 	int ipv4_only = 0;
388 #endif
389 
390 	item = janus_config_get(config, config_container, janus_config_type_item, prefix);
391 	if(!item || !item->value || !janus_is_true(item->value)) {
392 		JANUS_LOG(LOG_VERB, "%s server disabled\n", name);
393 		return NULL;
394 	}
395 
396 	uint16_t wsport = default_port;
397 	g_snprintf(item_name, 255, "%s_port", prefix);
398 	item = janus_config_get(config, config_container, janus_config_type_item, item_name);
399 	if(item && item->value && janus_string_to_uint16(item->value, &wsport) < 0) {
400 		JANUS_LOG(LOG_ERR, "Invalid port (%s), falling back to default\n", item->value);
401 		wsport = default_port;
402 	}
403 
404 	char *interface = NULL;
405 	g_snprintf(item_name, 255, "%s_interface", prefix);
406 	item = janus_config_get(config, config_container, janus_config_type_item, item_name);
407 	if(item && item->value)
408 		interface = (char *)item->value;
409 
410 	char *ip = NULL;
411 	g_snprintf(item_name, 255, "%s_ip", prefix);
412 	item = janus_config_get(config, config_container, janus_config_type_item, item_name);
413 	if(item && item->value) {
414 		ip = (char *)item->value;
415 #ifdef __FreeBSD__
416 		struct in_addr addr;
417 		if(inet_net_pton(AF_INET, ip, &addr, sizeof(addr)) > 0)
418 			ipv4_only = 1;
419 #endif
420 		char *iface = janus_websockets_get_interface_name(ip);
421 		if(iface == NULL) {
422 			JANUS_LOG(LOG_WARN, "No interface associated with %s? Falling back to no interface...\n", ip);
423 		}
424 		ip = iface;
425 	}
426 
427 	g_snprintf(item_name, 255, "%s_unix", prefix);
428 	item = janus_config_get(config, config_container, janus_config_type_item, item_name);
429 #if defined(LWS_USE_UNIX_SOCK) || defined(LWS_WITH_UNIX_SOCK)
430 	char *unixpath = NULL;
431 	if(item && item->value)
432 		unixpath = (char *)item->value;
433 #else
434 	if(item && item->value)
435 		JANUS_LOG(LOG_WARN, "WebSockets option '%s' is not supported because libwebsockets compiled without UNIX sockets\n", item_name);
436 #endif
437 
438 	char *server_pem = NULL;
439 	char *server_key = NULL;
440 	char *password = NULL;
441 	char *ciphers = NULL;
442 
443 	if (secure) {
444 		item = janus_config_get(config, config_certs, janus_config_type_item, "cert_pem");
445 		if(!item || !item->value) {
446 			JANUS_LOG(LOG_FATAL, "Missing certificate/key path\n");
447 			return NULL;
448 		}
449 		server_pem = (char *)item->value;
450 		server_key = (char *)item->value;
451 		item = janus_config_get(config, config_certs, janus_config_type_item, "cert_key");
452 		if(item && item->value)
453 			server_key = (char *)item->value;
454 		item = janus_config_get(config, config_certs, janus_config_type_item, "cert_pwd");
455 		if(item && item->value)
456 			password = (char *)item->value;
457 		JANUS_LOG(LOG_VERB, "Using certificates:\n\t%s\n\t%s\n", server_pem, server_key);
458 		item = janus_config_get(config, config_certs, janus_config_type_item, "ciphers");
459 		if(item && item->value)
460 			ciphers = (char *)item->value;
461 	}
462 
463 	/* Prepare context */
464 	struct lws_context_creation_info info;
465 	memset(&info, 0, sizeof info);
466 #if defined(LWS_USE_UNIX_SOCK) || defined(LWS_WITH_UNIX_SOCK)
467 	info.port = unixpath ? 0 : wsport;
468 	info.iface = unixpath ? unixpath : (ip ? ip : interface);
469 #else
470 	info.port = wsport;
471 	info.iface = ip ? ip : interface;
472 #endif
473 	info.protocols = ws_protocols;
474 	info.extensions = NULL;
475 	info.ssl_cert_filepath = server_pem;
476 	info.ssl_private_key_filepath = server_key;
477 	info.ssl_private_key_password = password;
478 	info.ssl_cipher_list = ciphers;
479 	info.gid = -1;
480 	info.uid = -1;
481 	info.options = 0;
482 
483 	if (server_pem) {
484 #if (LWS_LIBRARY_VERSION_MAJOR == 3 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR > 3)
485 		info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT | LWS_SERVER_OPTION_FAIL_UPON_UNABLE_TO_BIND;
486 #elif LWS_LIBRARY_VERSION_MAJOR >= 2
487 		info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
488 #endif
489 	}
490 
491 #ifdef __FreeBSD__
492 	if (ipv4_only) {
493 		info.options |= LWS_SERVER_OPTION_DISABLE_IPV6;
494 		ipv4_only = 0;
495 	}
496 #endif
497 #if defined(LWS_USE_UNIX_SOCK) || defined(LWS_WITH_UNIX_SOCK)
498 	if (unixpath)
499 		info.options |= LWS_SERVER_OPTION_UNIX_SOCK;
500 #endif
501 #if (LWS_LIBRARY_VERSION_MAJOR == 3 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR > 3)
502 	info.options |= LWS_SERVER_OPTION_FAIL_UPON_UNABLE_TO_BIND;
503 
504 #endif
505 	/* Create the WebSocket context */
506 	struct lws_vhost *vhost = lws_create_vhost(wsc, &info);
507 	if(vhost == NULL) {
508 		JANUS_LOG(LOG_FATAL, "Error creating vhost for %s server...\n", name);
509 #if defined(LWS_USE_UNIX_SOCK) || defined(LWS_WITH_UNIX_SOCK)
510 	} else if (unixpath) {
511 		JANUS_LOG(LOG_INFO, "%s server started (UNIX socket %s)...\n", name, unixpath);
512 #endif
513 	} else {
514 		JANUS_LOG(LOG_INFO, "%s server started (port %d)...\n", name, wsport);
515 	}
516 	g_free(ip);
517 	return vhost;
518 }
519 
520 /* Transport implementation */
janus_websockets_init(janus_transport_callbacks * callback,const char * config_path)521 int janus_websockets_init(janus_transport_callbacks *callback, const char *config_path) {
522 	if(g_atomic_int_get(&stopping)) {
523 		/* Still stopping from before */
524 		return -1;
525 	}
526 	if(callback == NULL || config_path == NULL) {
527 		/* Invalid arguments */
528 		return -1;
529 	}
530 
531 #ifndef LWS_WITH_IPV6
532 	JANUS_LOG(LOG_WARN, "libwebsockets has been built without IPv6 support, will bind to IPv4 only\n");
533 #endif
534 
535 	/* This is the callback we'll need to invoke to contact the Janus core */
536 	gateway = callback;
537 
538 	/* Prepare the common context */
539 	struct lws_context_creation_info wscinfo;
540 	memset(&wscinfo, 0, sizeof wscinfo);
541 	wscinfo.options |= LWS_SERVER_OPTION_EXPLICIT_VHOSTS;
542 
543 	/* We use vhosts on the same context to address both APIs, secure or not */
544 	struct lws_vhost *wss = NULL, *swss = NULL,
545 		*admin_wss = NULL, *admin_swss = NULL;
546 
547 	/* Read configuration */
548 	char filename[255];
549 	g_snprintf(filename, 255, "%s/%s.jcfg", config_path, JANUS_WEBSOCKETS_PACKAGE);
550 	JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
551 	janus_config *config = janus_config_parse(filename);
552 	if(config == NULL) {
553 		JANUS_LOG(LOG_WARN, "Couldn't find .jcfg configuration file (%s), trying .cfg\n", JANUS_WEBSOCKETS_PACKAGE);
554 		g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_WEBSOCKETS_PACKAGE);
555 		JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
556 		config = janus_config_parse(filename);
557 	}
558 	if(config != NULL) {
559 		janus_config_print(config);
560 		janus_config_category *config_general = janus_config_get_create(config, NULL, janus_config_type_category, "general");
561 		janus_config_category *config_admin = janus_config_get_create(config, NULL, janus_config_type_category, "admin");
562 		janus_config_category *config_cors = janus_config_get_create(config, NULL, janus_config_type_category, "cors");
563 		janus_config_category *config_certs = janus_config_get_create(config, NULL, janus_config_type_category, "certificates");
564 
565 		/* Handle configuration */
566 		janus_config_item *item = janus_config_get(config, config_general, janus_config_type_item, "json");
567 		if(item && item->value) {
568 			/* Check how we need to format/serialize the JSON output */
569 			if(!strcasecmp(item->value, "indented")) {
570 				/* Default: indented, we use three spaces for that */
571 				json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
572 			} else if(!strcasecmp(item->value, "plain")) {
573 				/* Not indented and no new lines, but still readable */
574 				json_format = JSON_INDENT(0) | JSON_PRESERVE_ORDER;
575 			} else if(!strcasecmp(item->value, "compact")) {
576 				/* Compact, so no spaces between separators */
577 				json_format = JSON_COMPACT | JSON_PRESERVE_ORDER;
578 			} else {
579 				JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', using default (indented)\n", item->value);
580 				json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
581 			}
582 		}
583 
584 		/* Check if we need to send events to handlers */
585 		janus_config_item *events = janus_config_get(config, config_general, janus_config_type_item, "events");
586 		if(events != NULL && events->value != NULL)
587 			notify_events = janus_is_true(events->value);
588 		if(!notify_events && callback->events_is_enabled()) {
589 			JANUS_LOG(LOG_WARN, "Notification of events to handlers disabled for %s\n", JANUS_WEBSOCKETS_NAME);
590 		}
591 
592 		item = janus_config_get(config, config_general, janus_config_type_item, "ws_logging");
593 		if(item && item->value) {
594 			/* libwebsockets uses a mask to set log levels, as documented here:
595 			 * https://libwebsockets.org/lws-api-doc-master/html/group__log.html */
596 			if(strstr(item->value, "none")) {
597 				/* Disable libwebsockets logging completely (the default) */
598 			} else if(strstr(item->value, "all")) {
599 				/* Enable all libwebsockets logging */
600 				ws_log_level = LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO |
601 					LLL_DEBUG | LLL_PARSER | LLL_HEADER | LLL_EXT |
602 #if (LWS_LIBRARY_VERSION_MAJOR >= 2 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 3)
603 					LLL_CLIENT | LLL_LATENCY | LLL_USER | LLL_COUNT;
604 #else
605 					LLL_CLIENT | LLL_LATENCY | LLL_COUNT;
606 #endif
607 			} else {
608 				/* Only enable some of the properties */
609 				if(strstr(item->value, "err"))
610 					ws_log_level |= LLL_ERR;
611 				if(strstr(item->value, "warn"))
612 					ws_log_level |= LLL_WARN;
613 				if(strstr(item->value, "notice"))
614 					ws_log_level |= LLL_NOTICE;
615 				if(strstr(item->value, "info"))
616 					ws_log_level |= LLL_INFO;
617 				if(strstr(item->value, "debug"))
618 					ws_log_level |= LLL_DEBUG;
619 				if(strstr(item->value, "parser"))
620 					ws_log_level |= LLL_PARSER;
621 				if(strstr(item->value, "header"))
622 					ws_log_level |= LLL_HEADER;
623 				if(strstr(item->value, "ext"))
624 					ws_log_level |= LLL_EXT;
625 				if(strstr(item->value, "client"))
626 					ws_log_level |= LLL_CLIENT;
627 				if(strstr(item->value, "latency"))
628 					ws_log_level |= LLL_LATENCY;
629 #if (LWS_LIBRARY_VERSION_MAJOR >= 2 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 3)
630 				if(strstr(item->value, "user"))
631 					ws_log_level |= LLL_USER;
632 #endif
633 				if(strstr(item->value, "count"))
634 					ws_log_level |= LLL_COUNT;
635 			}
636 		}
637 		JANUS_LOG(LOG_INFO, "libwebsockets logging: %d\n", ws_log_level);
638 		lws_set_log_level(ws_log_level, janus_websockets_log_emit_function);
639 
640 		/* Any ACL for either the Janus or Admin API? */
641 		item = janus_config_get(config, config_general, janus_config_type_item, "ws_acl");
642 		if(item && item->value) {
643 			gchar **list = g_strsplit(item->value, ",", -1);
644 			gchar *index = list[0];
645 			if(index != NULL) {
646 				int i=0;
647 				while(index != NULL) {
648 					if(strlen(index) > 0) {
649 						JANUS_LOG(LOG_INFO, "Adding '%s' to the Janus API allowed list...\n", index);
650 						janus_websockets_allow_address(g_strdup(index), FALSE);
651 					}
652 					i++;
653 					index = list[i];
654 				}
655 			}
656 			g_strfreev(list);
657 			list = NULL;
658 		}
659 		item = janus_config_get(config, config_admin, janus_config_type_item, "admin_ws_acl");
660 		if(item && item->value) {
661 			gchar **list = g_strsplit(item->value, ",", -1);
662 			gchar *index = list[0];
663 			if(index != NULL) {
664 				int i=0;
665 				while(index != NULL) {
666 					if(strlen(index) > 0) {
667 						JANUS_LOG(LOG_INFO, "Adding '%s' to the Admin/monitor allowed list...\n", index);
668 						janus_websockets_allow_address(g_strdup(index), TRUE);
669 					}
670 					i++;
671 					index = list[i];
672 				}
673 			}
674 			g_strfreev(list);
675 			list = NULL;
676 		}
677 
678 		/* Any custom value for the Access-Control-Allow-Origin header? */
679 		item = janus_config_get(config, config_cors, janus_config_type_item, "allow_origin");
680 		if(item && item->value) {
681 			allow_origin = g_strdup(item->value);
682 			JANUS_LOG(LOG_INFO, "Restricting Access-Control-Allow-Origin to '%s'\n", allow_origin);
683 		}
684 		if(allow_origin != NULL) {
685 			item = janus_config_get(config, config_cors, janus_config_type_item, "enforce_cors");
686 			if(item && item->value && janus_is_true(item->value)) {
687 				enforce_cors = TRUE;
688 				JANUS_LOG(LOG_INFO, "Going to enforce CORS by rejecting WebSocket connections\n");
689 			}
690 		}
691 
692 		/* Check if we need to enable the transport level ping/pong mechanism */
693 		int pingpong_trigger = 0, pingpong_timeout = 0;
694 		item = janus_config_get(config, config_general, janus_config_type_item, "pingpong_trigger");
695 		if(item && item->value) {
696 #if (LWS_LIBRARY_VERSION_MAJOR >= 2 && LWS_LIBRARY_VERSION_MINOR >= 1) || (LWS_LIBRARY_VERSION_MAJOR >= 3)
697 			pingpong_trigger = atoi(item->value);
698 			if(pingpong_trigger < 0) {
699 				JANUS_LOG(LOG_WARN, "Invalid value for pingpong_trigger (%d), ignoring...\n", pingpong_trigger);
700 				pingpong_trigger = 0;
701 			}
702 #else
703 			JANUS_LOG(LOG_WARN, "WebSockets ping/pong only supported in libwebsockets >= 2.1\n");
704 #endif
705 		}
706 		item = janus_config_get(config, config_general, janus_config_type_item, "pingpong_timeout");
707 		if(item && item->value) {
708 #if (LWS_LIBRARY_VERSION_MAJOR >= 2 && LWS_LIBRARY_VERSION_MINOR >= 1) || (LWS_LIBRARY_VERSION_MAJOR >= 3)
709 			pingpong_timeout = atoi(item->value);
710 			if(pingpong_timeout < 0) {
711 				JANUS_LOG(LOG_WARN, "Invalid value for pingpong_timeout (%d), ignoring...\n", pingpong_timeout);
712 				pingpong_timeout = 0;
713 			}
714 #else
715 			JANUS_LOG(LOG_WARN, "WebSockets ping/pong only supported in libwebsockets >= 2.1\n");
716 #endif
717 		}
718 		if((pingpong_trigger && !pingpong_timeout) || (!pingpong_trigger && pingpong_timeout)) {
719 			JANUS_LOG(LOG_WARN, "pingpong_trigger and pingpong_timeout not both set, ignoring...\n");
720 		}
721 #if (LWS_LIBRARY_VERSION_MAJOR >= 4)
722 		/* libwebsockets 4 has a different API, that works differently
723 		 * https://github.com/warmcat/libwebsockets/blob/master/READMEs/README.lws_retry.md */
724 		if(pingpong_trigger > 0 && pingpong_timeout > 0) {
725 			pingpong.secs_since_valid_ping = pingpong_trigger;
726 			pingpong.secs_since_valid_hangup = pingpong_trigger + pingpong_timeout;
727 			wscinfo.retry_and_idle_policy = &pingpong;
728 		}
729 #else
730 #if (LWS_LIBRARY_VERSION_MAJOR >= 2 && LWS_LIBRARY_VERSION_MINOR >= 1) || (LWS_LIBRARY_VERSION_MAJOR == 3)
731 		if(pingpong_trigger > 0 && pingpong_timeout > 0) {
732 			wscinfo.ws_ping_pong_interval = pingpong_trigger;
733 			wscinfo.timeout_secs = pingpong_timeout;
734 		}
735 #endif
736 #endif
737 		/* Force single-thread server */
738 		wscinfo.count_threads = 1;
739 
740 		/* Create the base context */
741 		wsc = lws_create_context(&wscinfo);
742 		if(wsc == NULL) {
743 			JANUS_LOG(LOG_ERR, "Error creating libwebsockets context...\n");
744 			janus_config_destroy(config);
745 			return -1;	/* No point in keeping the plugin loaded */
746 		}
747 
748 		/* Setup the Janus API WebSockets server(s) */
749 		wss = janus_websockets_create_ws_server(config, config_general, NULL, "ws",
750 				"Websockets", ws_protocols, FALSE, 8188);
751 		swss = janus_websockets_create_ws_server(config, config_general, config_certs, "wss",
752 				"Secure Websockets", sws_protocols, TRUE, 8989);
753 		/* Do the same for the Admin API, if enabled */
754 		admin_wss = janus_websockets_create_ws_server(config, config_admin, NULL, "admin_ws",
755 				"Admin Websockets", admin_ws_protocols, FALSE, 7188);
756 		admin_swss = janus_websockets_create_ws_server(config, config_admin, config_certs, "admin_wss",
757 				"Secure Admin Websockets", admin_sws_protocols, TRUE, 7989);
758 	}
759 	janus_config_destroy(config);
760 	config = NULL;
761 	if(!wss && !swss && !admin_wss && !admin_swss) {
762 		JANUS_LOG(LOG_WARN, "No WebSockets server started, giving up...\n");
763 		lws_context_destroy(wsc);
764 		return -1;	/* No point in keeping the plugin loaded */
765 	}
766 	ws_janus_api_enabled = wss || swss;
767 	ws_admin_api_enabled = admin_wss || admin_swss;
768 
769 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
770 	clients = g_hash_table_new(NULL, NULL);
771 	writable_clients = g_hash_table_new(NULL, NULL);
772 #endif
773 	janus_mutex_init(&writable_mutex);
774 
775 	g_atomic_int_set(&initialized, 1);
776 
777 	GError *error = NULL;
778 	/* Start the WebSocket service thread */
779 	if(ws_janus_api_enabled || ws_admin_api_enabled) {
780 		ws_thread = g_thread_try_new("ws thread", &janus_websockets_thread, wsc, &error);
781 		if(error != NULL) {
782 			g_atomic_int_set(&initialized, 0);
783 			JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the WebSockets thread...\n",
784 				error->code, error->message ? error->message : "??");
785 			g_error_free(error);
786 			return -1;
787 		}
788 	}
789 
790 	/* Done */
791 	JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_WEBSOCKETS_NAME);
792 	return 0;
793 }
794 
janus_websockets_destroy(void)795 void janus_websockets_destroy(void) {
796 	if(!g_atomic_int_get(&initialized))
797 		return;
798 	g_atomic_int_set(&stopping, 1);
799 #if ((LWS_LIBRARY_VERSION_MAJOR == 3 && LWS_LIBRARY_VERSION_MINOR >= 2) || LWS_LIBRARY_VERSION_MAJOR >= 4)
800 	lws_cancel_service(wsc);
801 #endif
802 
803 	/* Stop the service thread */
804 	if(ws_thread != NULL) {
805 		g_thread_join(ws_thread);
806 		ws_thread = NULL;
807 	}
808 
809 	/* Destroy the context */
810 	if(wsc != NULL) {
811 		lws_context_destroy(wsc);
812 		wsc = NULL;
813 	}
814 
815 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
816 	janus_mutex_lock(&writable_mutex);
817 	g_hash_table_destroy(clients);
818 	clients = NULL;
819 	g_hash_table_destroy(writable_clients);
820 	writable_clients = NULL;
821 	janus_mutex_unlock(&writable_mutex);
822 #endif
823 
824 	g_atomic_int_set(&initialized, 0);
825 	g_atomic_int_set(&stopping, 0);
826 	JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_WEBSOCKETS_NAME);
827 }
828 
janus_websockets_destroy_client(janus_websockets_client * ws_client,struct lws * wsi,const char * log_prefix)829 static void janus_websockets_destroy_client(
830 		janus_websockets_client *ws_client,
831 		struct lws *wsi,
832 		const char *log_prefix) {
833 	if(!ws_client || !ws_client->ts)
834 		return;
835 	janus_mutex_lock(&ws_client->ts->mutex);
836 	if(!g_atomic_int_compare_and_exchange(&ws_client->destroyed, 0, 1)) {
837 		janus_mutex_unlock(&ws_client->ts->mutex);
838 		return;
839 	}
840 	/* Cleanup */
841 	JANUS_LOG(LOG_INFO, "[%s-%p] Destroying WebSocket client\n", log_prefix, wsi);
842 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
843 	janus_mutex_lock(&writable_mutex);
844 	g_hash_table_remove(clients, ws_client);
845 	g_hash_table_remove(writable_clients, ws_client);
846 	janus_mutex_unlock(&writable_mutex);
847 #endif
848 	ws_client->wsi = NULL;
849 	/* Notify handlers about this transport being gone */
850 	if(notify_events && gateway->events_is_enabled()) {
851 		json_t *info = json_object();
852 		json_object_set_new(info, "event", json_string("disconnected"));
853 		gateway->notify_event(&janus_websockets_transport, ws_client->ts, info);
854 	}
855 	ws_client->ts->transport_p = NULL;
856 	/* Remove messages queue too, if needed */
857 	if(ws_client->messages != NULL) {
858 		char *response = NULL;
859 		while((response = g_async_queue_try_pop(ws_client->messages)) != NULL) {
860 			g_free(response);
861 		}
862 		g_async_queue_unref(ws_client->messages);
863 	}
864 	/* ... and the shared buffers */
865 	g_free(ws_client->incoming);
866 	ws_client->incoming = NULL;
867 	g_free(ws_client->buffer);
868 	ws_client->buffer = NULL;
869 	ws_client->buflen = 0;
870 	ws_client->bufpending = 0;
871 	ws_client->bufoffset = 0;
872 	janus_mutex_unlock(&ws_client->ts->mutex);
873 	/* Notify core */
874 	gateway->transport_gone(&janus_websockets_transport, ws_client->ts);
875 	janus_transport_session_destroy(ws_client->ts);
876 }
877 
janus_websockets_get_api_compatibility(void)878 int janus_websockets_get_api_compatibility(void) {
879 	/* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
880 	return JANUS_TRANSPORT_API_VERSION;
881 }
882 
janus_websockets_get_version(void)883 int janus_websockets_get_version(void) {
884 	return JANUS_WEBSOCKETS_VERSION;
885 }
886 
janus_websockets_get_version_string(void)887 const char *janus_websockets_get_version_string(void) {
888 	return JANUS_WEBSOCKETS_VERSION_STRING;
889 }
890 
janus_websockets_get_description(void)891 const char *janus_websockets_get_description(void) {
892 	return JANUS_WEBSOCKETS_DESCRIPTION;
893 }
894 
janus_websockets_get_name(void)895 const char *janus_websockets_get_name(void) {
896 	return JANUS_WEBSOCKETS_NAME;
897 }
898 
janus_websockets_get_author(void)899 const char *janus_websockets_get_author(void) {
900 	return JANUS_WEBSOCKETS_AUTHOR;
901 }
902 
janus_websockets_get_package(void)903 const char *janus_websockets_get_package(void) {
904 	return JANUS_WEBSOCKETS_PACKAGE;
905 }
906 
janus_websockets_is_janus_api_enabled(void)907 gboolean janus_websockets_is_janus_api_enabled(void) {
908 	return ws_janus_api_enabled;
909 }
910 
janus_websockets_is_admin_api_enabled(void)911 gboolean janus_websockets_is_admin_api_enabled(void) {
912 	return ws_admin_api_enabled;
913 }
914 
janus_websockets_send_message(janus_transport_session * transport,void * request_id,gboolean admin,json_t * message)915 int janus_websockets_send_message(janus_transport_session *transport, void *request_id, gboolean admin, json_t *message) {
916 	if(message == NULL)
917 		return -1;
918 	if(transport == NULL || g_atomic_int_get(&transport->destroyed)) {
919 		json_decref(message);
920 		return -1;
921 	}
922 	janus_mutex_lock(&transport->mutex);
923 	janus_websockets_client *client = (janus_websockets_client *)transport->transport_p;
924 	if(!client || !client->wsi || g_atomic_int_get(&client->destroyed)) {
925 		json_decref(message);
926 		janus_mutex_unlock(&transport->mutex);
927 		return -1;
928 	}
929 	/* Convert to string and enqueue */
930 	char *payload = json_dumps(message, json_format);
931 	if(payload == NULL) {
932 		JANUS_LOG(LOG_ERR, "Failed to stringify message...\n");
933 		json_decref(message);
934 		janus_mutex_unlock(&transport->mutex);
935 		return -1;
936 	}
937 	g_async_queue_push(client->messages, payload);
938 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
939 	/* On libwebsockets >= 3.x we use lws_cancel_service */
940 	janus_mutex_lock(&writable_mutex);
941 	if(g_hash_table_lookup(clients, client) == client)
942 		g_hash_table_insert(writable_clients, client, client);
943 	janus_mutex_unlock(&writable_mutex);
944 	lws_cancel_service(wsc);
945 #else
946 	/* On libwebsockets < 3.x we use lws_callback_on_writable */
947 	janus_mutex_lock(&writable_mutex);
948 	lws_callback_on_writable(client->wsi);
949 	janus_mutex_unlock(&writable_mutex);
950 #endif
951 	janus_mutex_unlock(&transport->mutex);
952 	json_decref(message);
953 	return 0;
954 }
955 
janus_websockets_session_created(janus_transport_session * transport,guint64 session_id)956 void janus_websockets_session_created(janus_transport_session *transport, guint64 session_id) {
957 	/* We don't care */
958 }
959 
janus_websockets_session_over(janus_transport_session * transport,guint64 session_id,gboolean timeout,gboolean claimed)960 void janus_websockets_session_over(janus_transport_session *transport, guint64 session_id, gboolean timeout, gboolean claimed) {
961 	/* We don't care either: transport timeouts can be detected using the ping/pong mechanism */
962 }
963 
janus_websockets_session_claimed(janus_transport_session * transport,guint64 session_id)964 void janus_websockets_session_claimed(janus_transport_session *transport, guint64 session_id) {
965 	/* We don't care about this. We should start receiving messages from the core about this session: no action necessary */
966 	/* FIXME Is the above statement accurate? Should we care? Unlike the HTTP transport, there is no hashtable to update */
967 }
968 
janus_websockets_query_transport(json_t * request)969 json_t *janus_websockets_query_transport(json_t *request) {
970 	if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
971 		return NULL;
972 	}
973 	/* We can use this request to dynamically change the behaviour of
974 	 * the transport plugin, and/or query for some specific information */
975 	json_t *response = json_object();
976 	int error_code = 0;
977 	char error_cause[512];
978 	JANUS_VALIDATE_JSON_OBJECT(request, request_parameters,
979 		error_code, error_cause, TRUE,
980 		JANUS_WEBSOCKETS_ERROR_MISSING_ELEMENT, JANUS_WEBSOCKETS_ERROR_INVALID_ELEMENT);
981 	if(error_code != 0)
982 		goto plugin_response;
983 	/* Get the request */
984 	const char *request_text = json_string_value(json_object_get(request, "request"));
985 	if(!strcasecmp(request_text, "configure")) {
986 		/* We only allow for the configuration of some basic properties:
987 		 * changing more complex things (e.g., port to bind to, etc.)
988 		 * would likely require restarting backends, so just too much */
989 		JANUS_VALIDATE_JSON_OBJECT(request, configure_parameters,
990 			error_code, error_cause, TRUE,
991 			JANUS_WEBSOCKETS_ERROR_MISSING_ELEMENT, JANUS_WEBSOCKETS_ERROR_INVALID_ELEMENT);
992 		/* Check if we now need to send events to handlers */
993 		json_object_set_new(response, "result", json_integer(200));
994 		json_t *notes = NULL;
995 		gboolean events = json_is_true(json_object_get(request, "events"));
996 		if(events && !gateway->events_is_enabled()) {
997 			/* Notify that this will be ignored */
998 			notes = json_array();
999 			json_array_append_new(notes, json_string("Event handlers disabled at the core level"));
1000 			json_object_set_new(response, "notes", notes);
1001 		}
1002 		if(events != notify_events) {
1003 			notify_events = events;
1004 			if(!notify_events && gateway->events_is_enabled()) {
1005 				JANUS_LOG(LOG_WARN, "Notification of events to handlers disabled for %s\n", JANUS_WEBSOCKETS_NAME);
1006 			}
1007 		}
1008 		const char *indentation = json_string_value(json_object_get(request, "json"));
1009 		if(indentation != NULL) {
1010 			if(!strcasecmp(indentation, "indented")) {
1011 				/* Default: indented, we use three spaces for that */
1012 				json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
1013 			} else if(!strcasecmp(indentation, "plain")) {
1014 				/* Not indented and no new lines, but still readable */
1015 				json_format = JSON_INDENT(0) | JSON_PRESERVE_ORDER;
1016 			} else if(!strcasecmp(indentation, "compact")) {
1017 				/* Compact, so no spaces between separators */
1018 				json_format = JSON_COMPACT | JSON_PRESERVE_ORDER;
1019 			} else {
1020 				JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', ignoring tweak\n", indentation);
1021 				/* Notify that this will be ignored */
1022 				if(notes == NULL) {
1023 					notes = json_array();
1024 					json_object_set_new(response, "notes", notes);
1025 				}
1026 				json_array_append_new(notes, json_string("Ignored unsupported indentation format"));
1027 			}
1028 		}
1029 		const char *logging = json_string_value(json_object_get(request, "logging"));
1030 		if(logging != NULL) {
1031 			/* libwebsockets uses a mask to set log levels, as documented here:
1032 			 * https://libwebsockets.org/lws-api-doc-master/html/group__log.html */
1033 			if(strstr(logging, "none")) {
1034 				/* Disable libwebsockets logging completely (the default) */
1035 			} else if(strstr(logging, "all")) {
1036 				/* Enable all libwebsockets logging */
1037 				ws_log_level = LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO |
1038 					LLL_DEBUG | LLL_PARSER | LLL_HEADER | LLL_EXT |
1039 #if (LWS_LIBRARY_VERSION_MAJOR >= 2 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 3)
1040 					LLL_CLIENT | LLL_LATENCY | LLL_USER | LLL_COUNT;
1041 #else
1042 					LLL_CLIENT | LLL_LATENCY | LLL_COUNT;
1043 #endif
1044 			} else {
1045 				/* Only enable some of the properties */
1046 				ws_log_level = 0;
1047 				if(strstr(logging, "err"))
1048 					ws_log_level |= LLL_ERR;
1049 				if(strstr(logging, "warn"))
1050 					ws_log_level |= LLL_WARN;
1051 				if(strstr(logging, "notice"))
1052 					ws_log_level |= LLL_NOTICE;
1053 				if(strstr(logging, "info"))
1054 					ws_log_level |= LLL_INFO;
1055 				if(strstr(logging, "debug"))
1056 					ws_log_level |= LLL_DEBUG;
1057 				if(strstr(logging, "parser"))
1058 					ws_log_level |= LLL_PARSER;
1059 				if(strstr(logging, "header"))
1060 					ws_log_level |= LLL_HEADER;
1061 				if(strstr(logging, "ext"))
1062 					ws_log_level |= LLL_EXT;
1063 				if(strstr(logging, "client"))
1064 					ws_log_level |= LLL_CLIENT;
1065 				if(strstr(logging, "latency"))
1066 					ws_log_level |= LLL_LATENCY;
1067 #if (LWS_LIBRARY_VERSION_MAJOR >= 2 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 3)
1068 				if(strstr(logging, "user"))
1069 					ws_log_level |= LLL_USER;
1070 #endif
1071 				if(strstr(logging, "count"))
1072 					ws_log_level |= LLL_COUNT;
1073 			}
1074 			JANUS_LOG(LOG_INFO, "libwebsockets logging: %d\n", ws_log_level);
1075 			lws_set_log_level(ws_log_level, janus_websockets_log_emit_function);
1076 		}
1077 	} else if(!strcasecmp(request_text, "connections")) {
1078 		/* Return the number of active connections currently handled by the plugin */
1079 		json_object_set_new(response, "result", json_integer(200));
1080 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
1081 		janus_mutex_lock(&writable_mutex);
1082 		guint connections = g_hash_table_size(clients);
1083 		janus_mutex_unlock(&writable_mutex);
1084 		json_object_set_new(response, "connections", json_integer(connections));
1085 #endif
1086 	} else {
1087 		JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);
1088 		error_code = JANUS_WEBSOCKETS_ERROR_INVALID_REQUEST;
1089 		g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);
1090 	}
1091 
1092 plugin_response:
1093 		{
1094 			if(error_code != 0) {
1095 				/* Prepare JSON error event */
1096 				json_object_set_new(response, "error_code", json_integer(error_code));
1097 				json_object_set_new(response, "error", json_string(error_cause));
1098 			}
1099 			return response;
1100 		}
1101 }
1102 
1103 
1104 /* Thread */
janus_websockets_thread(void * data)1105 void *janus_websockets_thread(void *data) {
1106 	struct lws_context *service = (struct lws_context *)data;
1107 	if(service == NULL) {
1108 		JANUS_LOG(LOG_ERR, "Invalid service\n");
1109 		return NULL;
1110 	}
1111 
1112 	JANUS_LOG(LOG_INFO, "WebSockets thread started\n");
1113 
1114 	while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
1115 		/* libwebsockets is single thread, we cycle through events here */
1116 		lws_service(service, 50);
1117 	}
1118 
1119 	/* Get rid of the WebSockets server */
1120 	lws_cancel_service(service);
1121 	/* Done */
1122 	JANUS_LOG(LOG_INFO, "WebSockets thread ended\n");
1123 	return NULL;
1124 }
1125 
1126 
1127 /* WebSockets */
janus_websockets_callback_http(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)1128 static int janus_websockets_callback_http(
1129 		struct lws *wsi,
1130 		enum lws_callback_reasons reason,
1131 		void *user, void *in, size_t len)
1132 {
1133 	/* This endpoint cannot be used for HTTP */
1134 	switch(reason) {
1135 		case LWS_CALLBACK_HTTP:
1136 			JANUS_LOG(LOG_VERB, "Rejecting incoming HTTP request on WebSockets endpoint\n");
1137 			lws_return_http_status(wsi, 403, NULL);
1138 			/* Close and free connection */
1139 			return -1;
1140 		case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:
1141 			if (!in) {
1142 				JANUS_LOG(LOG_VERB, "Rejecting incoming HTTP request on WebSockets endpoint: no sub-protocol specified\n");
1143 				return -1;
1144 			}
1145 			break;
1146 		case LWS_CALLBACK_GET_THREAD_ID:
1147 			return (uint64_t)pthread_self();
1148 		default:
1149 			break;
1150 	}
1151 	return 0;
1152 }
1153 
janus_websockets_callback_https(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)1154 static int janus_websockets_callback_https(
1155 		struct lws *wsi,
1156 		enum lws_callback_reasons reason,
1157 		void *user, void *in, size_t len)
1158 {
1159 	/* We just forward the event to the HTTP handler */
1160 	return janus_websockets_callback_http(wsi, reason, user, in, len);
1161 }
1162 
1163 /* Use ~ 2xMTU as chunk size */
1164 #define MESSAGE_CHUNK_SIZE 2800
1165 
1166 /* This callback handles Janus API requests */
janus_websockets_common_callback(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len,gboolean admin)1167 static int janus_websockets_common_callback(
1168 		struct lws *wsi,
1169 		enum lws_callback_reasons reason,
1170 		void *user, void *in, size_t len, gboolean admin)
1171 {
1172 	const char *log_prefix = admin ? "AdminWSS" : "WSS";
1173 	janus_websockets_client *ws_client = (janus_websockets_client *)user;
1174 	switch(reason) {
1175 		case LWS_CALLBACK_ESTABLISHED: {
1176 			/* Is there any filtering we should apply? */
1177 			char ip[256];
1178 #ifdef HAVE_LIBWEBSOCKETS_PEER_SIMPLE
1179 			lws_get_peer_simple(wsi, ip, 256);
1180 			JANUS_LOG(LOG_VERB, "[%s-%p] WebSocket connection opened from %s\n", log_prefix, wsi, ip);
1181 #else
1182 			char name[256];
1183 			lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), name, 256, ip, 256);
1184 			JANUS_LOG(LOG_VERB, "[%s-%p] WebSocket connection opened from %s by %s\n", log_prefix, wsi, ip, name);
1185 #endif
1186 			if(!janus_websockets_is_allowed(ip, admin)) {
1187 				JANUS_LOG(LOG_ERR, "[%s-%p] IP %s is unauthorized to connect to the WebSockets %s API interface\n", log_prefix, wsi, ip, admin ? "Admin" : "Janus");
1188 				/* Close the connection */
1189 				lws_callback_on_writable(wsi);
1190 				return -1;
1191 			}
1192 			JANUS_LOG(LOG_VERB, "[%s-%p] WebSocket connection accepted\n", log_prefix, wsi);
1193 			if(ws_client == NULL) {
1194 				JANUS_LOG(LOG_ERR, "[%s-%p] Invalid WebSocket client instance...\n", log_prefix, wsi);
1195 				return -1;
1196 			}
1197 			/* Prepare the session */
1198 			ws_client->wsi = wsi;
1199 			ws_client->messages = g_async_queue_new();
1200 			ws_client->buffer = NULL;
1201 			ws_client->buflen = 0;
1202 			ws_client->bufpending = 0;
1203 			ws_client->bufoffset = 0;
1204 			g_atomic_int_set(&ws_client->destroyed, 0);
1205 			ws_client->ts = janus_transport_session_create(ws_client, NULL);
1206 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
1207 			janus_mutex_lock(&writable_mutex);
1208 			g_hash_table_insert(clients, ws_client, ws_client);
1209 			janus_mutex_unlock(&writable_mutex);
1210 #endif
1211 			/* Let us know when the WebSocket channel becomes writeable */
1212 			lws_callback_on_writable(wsi);
1213 			JANUS_LOG(LOG_VERB, "[%s-%p]   -- Ready to be used!\n", log_prefix, wsi);
1214 			/* Notify handlers about this new transport */
1215 			if(notify_events && gateway->events_is_enabled()) {
1216 				json_t *info = json_object();
1217 				json_object_set_new(info, "event", json_string("connected"));
1218 				json_object_set_new(info, "admin_api", admin ? json_true() : json_false());
1219 				json_object_set_new(info, "ip", json_string(ip));
1220 				gateway->notify_event(&janus_websockets_transport, ws_client->ts, info);
1221 			}
1222 			return 0;
1223 		}
1224 		case LWS_CALLBACK_ADD_HEADERS: {
1225 			/* If CORS is enabled, check the headers and add our own */
1226 			struct lws_process_html_args *args = (struct lws_process_html_args *)in;
1227 			if(allow_origin == NULL) {
1228 				/* Return a wildcard for the Access-Control-Allow-Origin header */
1229 				if(lws_add_http_header_by_name(wsi,
1230 						(unsigned char *)"Access-Control-Allow-Origin:",
1231 						(unsigned char *)"*", 1,
1232 						(unsigned char **)&args->p,
1233 						(unsigned char *)args->p + args->max_len))
1234 					return 1;
1235 			} else {
1236 				/* Return the configured origin in the header */
1237 				if(lws_add_http_header_by_name(wsi,
1238 						(unsigned char *)"Access-Control-Allow-Origin:",
1239 						(unsigned char *)allow_origin, strlen(allow_origin),
1240 						(unsigned char **)&args->p,
1241 						(unsigned char *)args->p + args->max_len))
1242 					return 1;
1243 				char origin[256], headers[256], methods[256];
1244 				origin[0] = '\0';
1245 				headers[0] = '\0';
1246 				methods[0] = '\0';
1247 				int olen = lws_hdr_total_length(wsi, WSI_TOKEN_ORIGIN);
1248 				if(olen > 0 && olen < 255) {
1249 					lws_hdr_copy(wsi, origin, sizeof(origin), WSI_TOKEN_ORIGIN);
1250 				}
1251 				int hlen = lws_hdr_total_length(wsi, WSI_TOKEN_HTTP_AC_REQUEST_HEADERS);
1252 				if(hlen > 0 && hlen < 255) {
1253 					lws_hdr_copy(wsi, headers, sizeof(headers), WSI_TOKEN_HTTP_AC_REQUEST_HEADERS);
1254 					if(lws_add_http_header_by_name(wsi,
1255 							(unsigned char *)"Access-Control-Allow-Headers:",
1256 							(unsigned char *)headers, strlen(headers),
1257 							(unsigned char **)&args->p,
1258 							(unsigned char *)args->p + args->max_len))
1259 						return 1;
1260 				}
1261 #if (LWS_LIBRARY_VERSION_MAJOR >= 3 && LWS_LIBRARY_VERSION_MINOR >= 2) || (LWS_LIBRARY_VERSION_MAJOR >= 4)
1262 				int mlen = lws_hdr_custom_length(wsi, "Access-Control-Request-Methods", strlen("Access-Control-Request-Methods"));
1263 				if(mlen > 0 && mlen < 255) {
1264 					lws_hdr_custom_copy(wsi, methods, sizeof(methods),
1265 						"Access-Control-Request-Methods", strlen("Access-Control-Request-Methods"));
1266 					if(lws_add_http_header_by_name(wsi,
1267 							(unsigned char *)"Access-Control-Allow-Methods:",
1268 							(unsigned char *)methods, strlen(methods),
1269 							(unsigned char **)&args->p,
1270 							(unsigned char *)args->p + args->max_len))
1271 						return 1;
1272 				}
1273 #endif
1274 				/* WebSockets are not bound by CORS, but we can enforce this */
1275 				if(enforce_cors) {
1276 					if(strlen(origin) == 0 || strstr(origin, allow_origin) != origin) {
1277 						JANUS_LOG(LOG_ERR, "[%s-%p] Invalid origin, rejecting...\n", log_prefix, wsi);
1278 						return -1;
1279 					}
1280 				}
1281 			}
1282 			return 0;
1283 		}
1284 		case LWS_CALLBACK_RECEIVE: {
1285 			JANUS_LOG(LOG_HUGE, "[%s-%p] Got %zu bytes:\n", log_prefix, wsi, len);
1286 			if(ws_client == NULL || ws_client->wsi == NULL) {
1287 				JANUS_LOG(LOG_ERR, "[%s-%p] Invalid WebSocket client instance...\n", log_prefix, wsi);
1288 				return -1;
1289 			}
1290 			if(g_atomic_int_get(&ws_client->destroyed))
1291 				return 0;
1292 #if (LWS_LIBRARY_VERSION_MAJOR >= 4)
1293 			/* Refresh the lws connection validity (avoid sending a ping) */
1294 			lws_validity_confirmed(ws_client->wsi);
1295 #endif
1296 			/* Is this a new message, or part of a fragmented one? */
1297 			const size_t remaining = lws_remaining_packet_payload(wsi);
1298 			if(ws_client->incoming == NULL) {
1299 				JANUS_LOG(LOG_HUGE, "[%s-%p] First fragment: %zu bytes, %zu remaining\n", log_prefix, wsi, len, remaining);
1300 				ws_client->incoming = g_malloc(len+1);
1301 				memcpy(ws_client->incoming, in, len);
1302 				ws_client->incoming[len] = '\0';
1303 				JANUS_LOG(LOG_HUGE, "%s\n", ws_client->incoming);
1304 			} else {
1305 				size_t offset = strlen(ws_client->incoming);
1306 				JANUS_LOG(LOG_HUGE, "[%s-%p] Appending fragment: offset %zu, %zu bytes, %zu remaining\n", log_prefix, wsi, offset, len, remaining);
1307 				ws_client->incoming = g_realloc(ws_client->incoming, offset+len+1);
1308 				memcpy(ws_client->incoming+offset, in, len);
1309 				ws_client->incoming[offset+len] = '\0';
1310 				JANUS_LOG(LOG_HUGE, "%s\n", ws_client->incoming+offset);
1311 			}
1312 			if(remaining > 0 || !lws_is_final_fragment(wsi)) {
1313 				/* Still waiting for some more fragments */
1314 				JANUS_LOG(LOG_HUGE, "[%s-%p] Waiting for more fragments\n", log_prefix, wsi);
1315 				return 0;
1316 			}
1317 			JANUS_LOG(LOG_HUGE, "[%s-%p] Done, parsing message: %zu bytes\n", log_prefix, wsi, strlen(ws_client->incoming));
1318 			/* If we got here, the message is complete: parse the JSON payload */
1319 			json_error_t error;
1320 			json_t *root = json_loads(ws_client->incoming, 0, &error);
1321 			g_free(ws_client->incoming);
1322 			ws_client->incoming = NULL;
1323 			/* Notify the core, passing both the object and, since it may be needed, the error */
1324 			gateway->incoming_request(&janus_websockets_transport, ws_client->ts, NULL, admin, root, &error);
1325 			return 0;
1326 		}
1327 #if (LWS_LIBRARY_VERSION_MAJOR >= 3)
1328 		/* On libwebsockets >= 3.x, we use this event to mark connections as writable in the event loop */
1329 		case LWS_CALLBACK_EVENT_WAIT_CANCELLED: {
1330 			janus_mutex_lock(&writable_mutex);
1331 			/* We iterate on all the clients we marked as writable and act on them */
1332 			GHashTableIter iter;
1333 			gpointer value;
1334 			g_hash_table_iter_init(&iter, writable_clients);
1335 			while(g_hash_table_iter_next(&iter, NULL, &value)) {
1336 				janus_websockets_client *client = value;
1337 				if(client == NULL || client->wsi == NULL)
1338 					continue;
1339 				lws_callback_on_writable(client->wsi);
1340 			}
1341 			g_hash_table_remove_all(writable_clients);
1342 			janus_mutex_unlock(&writable_mutex);
1343 			return 0;
1344 		}
1345 #endif
1346 		case LWS_CALLBACK_SERVER_WRITEABLE: {
1347 			if(ws_client == NULL || ws_client->wsi == NULL) {
1348 				JANUS_LOG(LOG_ERR, "[%s-%p] Invalid WebSocket client instance...\n", log_prefix, wsi);
1349 				return -1;
1350 			}
1351 			if(!g_atomic_int_get(&ws_client->destroyed) && !g_atomic_int_get(&stopping)) {
1352 				janus_mutex_lock(&ws_client->ts->mutex);
1353 
1354 				/* Check if Websockets send pipe is choked */
1355 				if(lws_send_pipe_choked(wsi)) {
1356 					if(ws_client->buffer && ws_client->bufpending > 0 && ws_client->bufoffset > 0) {
1357 						JANUS_LOG(LOG_WARN, "Websockets choked with buffer: %zu, trying again\n", ws_client->bufpending);
1358 						lws_callback_on_writable(wsi);
1359 					} else {
1360 						gint qlen = g_async_queue_length(ws_client->messages);
1361 						JANUS_LOG(LOG_WARN, "Websockets choked with queue: %d, trying again\n", qlen);
1362 						if(qlen > 0) {
1363 							lws_callback_on_writable(wsi);
1364 						}
1365 					}
1366 					janus_mutex_unlock(&ws_client->ts->mutex);
1367 					return 0;
1368 				}
1369 
1370 				/* Check if we have a pending/partial write to complete first */
1371 				if(ws_client->buffer && ws_client->bufpending > 0 && ws_client->bufoffset > 0) {
1372 					JANUS_LOG(LOG_HUGE, "[%s-%p] Completing pending WebSocket write (still need to write last %zu bytes)...\n",
1373 						log_prefix, wsi, ws_client->bufpending);
1374 				} else {
1375 					/* Shoot all the pending messages */
1376 					char *response = g_async_queue_try_pop(ws_client->messages);
1377 					if (!response) {
1378 						/* No messages found */
1379 						janus_mutex_unlock(&ws_client->ts->mutex);
1380 						return 0;
1381 					}
1382 					if (g_atomic_int_get(&ws_client->destroyed) || g_atomic_int_get(&stopping)) {
1383 						free(response);
1384 						janus_mutex_unlock(&ws_client->ts->mutex);
1385 						return 0;
1386 					}
1387 					/* Gotcha! */
1388 					JANUS_LOG(LOG_HUGE, "[%s-%p] Sending WebSocket message (%zu bytes)...\n", log_prefix, wsi, strlen(response));
1389 					size_t buflen = LWS_PRE + strlen(response);
1390 					if (buflen > ws_client->buflen) {
1391 						/* We need a larger shared buffer */
1392 						JANUS_LOG(LOG_HUGE, "[%s-%p] Re-allocating to %zu bytes (was %zu, response is %zu bytes)\n", log_prefix, wsi, buflen, ws_client->buflen, strlen(response));
1393 						ws_client->buflen = buflen;
1394 						ws_client->buffer = g_realloc(ws_client->buffer, buflen);
1395 					}
1396 					memcpy(ws_client->buffer + LWS_PRE, response, strlen(response));
1397 					/* Initialize pending bytes count and buffer offset */
1398 					ws_client->bufpending = strlen(response);
1399 					ws_client->bufoffset = LWS_PRE;
1400 					/* We can get rid of the message */
1401 					free(response);
1402 				}
1403 
1404 				if (g_atomic_int_get(&ws_client->destroyed) || g_atomic_int_get(&stopping)) {
1405 					janus_mutex_unlock(&ws_client->ts->mutex);
1406 					return 0;
1407 				}
1408 
1409 				/* Evaluate amount of data to send according to MESSAGE_CHUNK_SIZE */
1410 				int amount = ws_client->bufpending <= MESSAGE_CHUNK_SIZE ? ws_client->bufpending : MESSAGE_CHUNK_SIZE;
1411 				/* Set fragment flags */
1412 				int flags = lws_write_ws_flags(LWS_WRITE_TEXT, ws_client->bufoffset == LWS_PRE, ws_client->bufpending <= (size_t)amount);
1413 				/* Send the fragment with proper flags */
1414 				int sent = lws_write(wsi, ws_client->buffer + ws_client->bufoffset, (size_t)amount, flags);
1415 				JANUS_LOG(LOG_HUGE, "[%s-%p]   -- First=%d, Last=%d, Requested=%d bytes, Sent=%d bytes, Missing=%zu bytes\n", log_prefix, wsi, ws_client->bufoffset <= LWS_PRE, ws_client->bufpending <= (size_t)amount, amount, sent, ws_client->bufpending - amount);
1416 				if(sent < amount) {
1417 					/* Error on sending, abort operation */
1418 					JANUS_LOG(LOG_ERR, "Websocket sent only %d bytes (expected %d)\n", sent, amount);
1419 					ws_client->bufpending = 0;
1420 					ws_client->bufoffset = 0;
1421 				} else {
1422 					/* Fragment successfully sent, update status */
1423 					ws_client->bufpending -= amount;
1424 					ws_client->bufoffset += amount;
1425 					if(ws_client->bufpending > 0) {
1426 						/* We couldn't send everything in a single write, we'll complete this in the next round */
1427 						JANUS_LOG(LOG_HUGE, "[%s-%p]   -- Couldn't write all bytes (%zu missing), setting offset %zu\n",
1428 							log_prefix, wsi, ws_client->bufpending, ws_client->bufoffset);
1429 					}
1430 				}
1431 				/* Done for this round, check the next response/notification later */
1432 				lws_callback_on_writable(wsi);
1433 				janus_mutex_unlock(&ws_client->ts->mutex);
1434 				return 0;
1435 			}
1436 			return 0;
1437 		}
1438 		case LWS_CALLBACK_CLOSED: {
1439 			JANUS_LOG(LOG_VERB, "[%s-%p] WS connection down, closing\n", log_prefix, wsi);
1440 			janus_websockets_destroy_client(ws_client, wsi, log_prefix);
1441 			JANUS_LOG(LOG_VERB, "[%s-%p]   -- closed\n", log_prefix, wsi);
1442 			return 0;
1443 		}
1444 		case LWS_CALLBACK_WSI_DESTROY: {
1445 			JANUS_LOG(LOG_VERB, "[%s-%p] WS connection down, destroying\n", log_prefix, wsi);
1446 			janus_websockets_destroy_client(ws_client, wsi, log_prefix);
1447 			JANUS_LOG(LOG_VERB, "[%s-%p]   -- destroyed\n", log_prefix, wsi);
1448 			return 0;
1449 		}
1450 		default:
1451 			if(wsi != NULL) {
1452 				JANUS_LOG(LOG_HUGE, "[%s-%p] %d (%s)\n", log_prefix, wsi, reason, janus_websockets_reason_string(reason));
1453 			} else {
1454 				JANUS_LOG(LOG_HUGE, "[%s] %d (%s)\n", log_prefix, reason, janus_websockets_reason_string(reason));
1455 			}
1456 			break;
1457 	}
1458 	return 0;
1459 }
1460 
1461 /* This callback handles Janus API requests */
janus_websockets_callback(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)1462 static int janus_websockets_callback(
1463 		struct lws *wsi,
1464 		enum lws_callback_reasons reason,
1465 		void *user, void *in, size_t len)
1466 {
1467 	return janus_websockets_common_callback(wsi, reason, user, in, len, FALSE);
1468 }
1469 
janus_websockets_callback_secure(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)1470 static int janus_websockets_callback_secure(
1471 		struct lws *wsi,
1472 		enum lws_callback_reasons reason,
1473 		void *user, void *in, size_t len)
1474 {
1475 	/* We just forward the event to the Janus API handler */
1476 	return janus_websockets_callback(wsi, reason, user, in, len);
1477 }
1478 
1479 /* This callback handles Admin API requests */
janus_websockets_admin_callback(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)1480 static int janus_websockets_admin_callback(
1481 		struct lws *wsi,
1482 		enum lws_callback_reasons reason,
1483 		void *user, void *in, size_t len)
1484 {
1485 	return janus_websockets_common_callback(wsi, reason, user, in, len, TRUE);
1486 }
1487 
janus_websockets_admin_callback_secure(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)1488 static int janus_websockets_admin_callback_secure(
1489 		struct lws *wsi,
1490 		enum lws_callback_reasons reason,
1491 		void *user, void *in, size_t len)
1492 {
1493 	/* We just forward the event to the Admin API handler */
1494 	return janus_websockets_admin_callback(wsi, reason, user, in, len);
1495 }
1496