1 /*
2  * mod_rayo for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3  * Copyright (C) 2013-2015, Grasshopper
4  *
5  * Version: MPL 1.1
6  *
7  * The contents of this file are subject to the Mozilla Public License Version
8  * 1.1 (the "License"); you may not use this file except in compliance with
9  * the License. You may obtain a copy of the License at
10  * http://www.mozilla.org/MPL/
11  *
12  * Software distributed under the License is distributed on an "AS IS" basis,
13  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14  * for the specific language governing rights and limitations under the
15  * License.
16  *
17  * The Original Code is mod_rayo for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18  *
19  * The Initial Developer of the Original Code is Grasshopper
20  * Portions created by the Initial Developer are Copyright (C)
21  * the Initial Developer. All Rights Reserved.
22  *
23  * Contributor(s):
24  * Chris Rienzo <chris.rienzo@grasshopper.com>
25  *
26  * xmpp_streams.c -- XMPP s2s and c2s streams
27  *
28  */
29 #include <switch.h>
30 #include <iksemel.h>
31 
32 #include <openssl/ssl.h>
33 
34 #include "xmpp_streams.h"
35 #include "iks_helpers.h"
36 #include "sasl.h"
37 
38 #define MAX_QUEUE_LEN 25000
39 
40 /**
41  * Context for all streams
42  */
43 struct xmpp_stream_context {
44 	/** memory pool to use */
45 	switch_memory_pool_t *pool;
46 	/** domain for this context */
47 	const char *domain;
48 	/** synchronizes access to streams and routes hashes */
49 	switch_mutex_t *streams_mutex;
50 	/** map of stream JID to routable stream */
51 	switch_hash_t *routes;
52 	/** map of stream ID to stream */
53 	switch_hash_t *streams;
54 	/** map of user ID to password */
55 	switch_hash_t *users;
56 	/** shared secret for server dialback */
57 	const char *dialback_secret;
58 	/** callback when a new resource is bound */
59 	xmpp_stream_bind_callback bind_callback;
60 	/** callback when a new stream is ready */
61 	xmpp_stream_ready_callback ready_callback;
62 	/** callback when a stream is destroyed */
63 	xmpp_stream_destroy_callback destroy_callback;
64 	/** callback when a stanza is received */
65 	xmpp_stream_recv_callback recv_callback;
66 	/** context shutdown flag */
67 	int shutdown;
68 	/** prevents context shutdown until all threads are finished */
69 	switch_thread_rwlock_t *shutdown_rwlock;
70 	/** path to cert PEM file */
71 	const char *cert_pem_file;
72 	/** path to key PEM file */
73 	const char *key_pem_file;
74 };
75 
/**
 * Possible states of a stream
 */
enum xmpp_stream_state {
	XSS_CONNECT = 0,    /**< new connection */
	XSS_SECURE,         /**< encrypted comms established */
	XSS_AUTHENTICATED,  /**< remote party authenticated */
	XSS_RESOURCE_BOUND, /**< client resource bound */
	XSS_READY,          /**< ready to accept requests */
	XSS_SHUTDOWN,       /**< terminating stream */
	XSS_ERROR,          /**< unrecoverable error */
	XSS_DESTROY         /**< destroyed */
};
97 
98 /**
99  * A client/server stream connection
100  */
101 struct xmpp_stream {
102 	/** stream state */
103 	enum xmpp_stream_state state;
104 	/** true if server-to-server connection */
105 	int s2s;
106 	/** true if incoming connection */
107 	int incoming;
108 	/** Jabber ID of remote party */
109 	char *jid;
110 	/** stream ID */
111 	char *id;
112 	/** stream pool */
113 	switch_memory_pool_t *pool;
114 	/** address of this stream */
115 	const char *address;
116 	/** port of this stream */
117 	int port;
118 	/** synchronizes access to this stream */
119 	switch_mutex_t *mutex;
120 	/** socket to remote party */
121 	switch_socket_t *socket;
122 	/** socket poll descriptor */
123 	switch_pollfd_t *pollfd;
124 	/** XML stream parser */
125 	iksparser *parser;
126 	/** outbound message queue */
127 	switch_queue_t *msg_queue;
128 	/** true if no activity last poll */
129 	int idle;
130 	/** context for this stream */
131 	struct xmpp_stream_context *context;
132 	/** user private data */
133 	void *user_private;
134 };
135 
136 /**
137  * A socket listening for new connections
138  */
139 struct xmpp_listener {
140 	/** listener pool */
141 	switch_memory_pool_t *pool;
142 	/** listen address */
143 	char *addr;
144 	/** listen port */
145 	switch_port_t port;
146 	/** access control list */
147 	const char *acl;
148 	/** listen socket */
149 	switch_socket_t *socket;
150 	/** pollset for listen socket */
151 	switch_pollfd_t *read_pollfd;
152 	/** true if server to server connections only */
153 	int s2s;
154 	/** context for new streams */
155 	struct xmpp_stream_context *context;
156 };
157 
/* stream ID helpers, declared ahead of their first use in this file */
static void xmpp_stream_set_id(struct xmpp_stream *stream, const char *id);
static void xmpp_stream_new_id(struct xmpp_stream *stream);
160 
161 /**
162  * Convert xmpp stream state to string
163  * @param state the xmpp stream state
164  * @return the string value of state or "UNKNOWN"
165  */
xmpp_stream_state_to_string(enum xmpp_stream_state state)166 static const char *xmpp_stream_state_to_string(enum xmpp_stream_state state)
167 {
168 	switch(state) {
169 		case XSS_CONNECT: return "CONNECT";
170 		case XSS_SECURE: return "SECURE";
171 		case XSS_AUTHENTICATED: return "AUTHENTICATED";
172 		case XSS_RESOURCE_BOUND: return "RESOURCE_BOUND";
173 		case XSS_READY: return "READY";
174 		case XSS_SHUTDOWN: return "SHUTDOWN";
175 		case XSS_ERROR: return "ERROR";
176 		case XSS_DESTROY: return "DESTROY";
177 	}
178 	return "UNKNOWN";
179 }
180 
181 /**
182  * Handle XMPP stream logging callback
183  * @param user_data the xmpp stream
184  * @param data the log message
185  * @param size of the log message
186  * @param is_incoming true if this is a log for a received message
187  */
on_stream_log(void * user_data,const char * data,size_t size,int is_incoming)188 static void on_stream_log(void *user_data, const char *data, size_t size, int is_incoming)
189 {
190 	if (size > 0) {
191 		struct xmpp_stream *stream = (struct xmpp_stream *)user_data;
192 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, %s_%s %s %s\n", stream->jid, stream->address, stream->port, stream->s2s ? "s2s" : "c2s",
193 			stream->incoming ? "in" : "out", is_incoming ? "RECV" : "SEND", data);
194 	}
195 }
196 
197 /**
198  * Send stanza to stream.
199  */
xmpp_stream_stanza_send(struct xmpp_stream * stream,iks * msg)200 static void xmpp_stream_stanza_send(struct xmpp_stream *stream, iks *msg)
201 {
202 	/* send directly if client or outbound s2s stream */
203 	if (!stream->s2s || !stream->incoming) {
204 		iks_send(stream->parser, msg);
205 		iks_delete(msg);
206 	} else {
207 		/* route message to outbound server stream */
208 		xmpp_stream_context_send(stream->context, stream->jid, msg);
209 		iks_delete(msg);
210 	}
211 }
212 
213 /**
214  * Attach stream to connected socket
215  * @param stream the stream
216  * @param socket the connected socket
217  */
xmpp_stream_set_socket(struct xmpp_stream * stream,switch_socket_t * socket)218 static void xmpp_stream_set_socket(struct xmpp_stream *stream, switch_socket_t *socket)
219 {
220 	stream->socket = socket;
221 	switch_socket_create_pollset(&stream->pollfd, stream->socket, SWITCH_POLLIN | SWITCH_POLLERR, stream->pool);
222 
223 	/* connect XMPP stream parser to socket */
224 	{
225 		switch_os_socket_t os_socket;
226 		switch_os_sock_get(&os_socket, stream->socket);
227 		iks_connect_fd(stream->parser, os_socket);
228 		/* TODO connect error checking */
229 	}
230 }
231 
232 /**
233  * Assign a new ID to the stream
234  * @param stream the stream
235  */
xmpp_stream_new_id(struct xmpp_stream * stream)236 static void xmpp_stream_new_id(struct xmpp_stream *stream)
237 {
238 	char id[SWITCH_UUID_FORMATTED_LENGTH + 1] = { 0 };
239 	switch_uuid_str(id, sizeof(id));
240 	xmpp_stream_set_id(stream, id);
241 }
242 
243 /**
244  * Send session reply to server <stream> after auth is done
245  * @param stream the xmpp stream
246  */
xmpp_send_server_header_features(struct xmpp_stream * stream)247 static void xmpp_send_server_header_features(struct xmpp_stream *stream)
248 {
249 	struct xmpp_stream_context *context = stream->context;
250 	char *header = switch_mprintf(
251 		"<stream:stream xmlns='"IKS_NS_SERVER"' xmlns:db='"IKS_NS_XMPP_DIALBACK"'"
252 		" from='%s' id='%s' xml:lang='en' version='1.0'"
253 		" xmlns:stream='"IKS_NS_XMPP_STREAMS"'><stream:features>"
254 		"</stream:features>", context->domain, stream->id);
255 
256 	iks_send_raw(stream->parser, header);
257 	free(header);
258 }
259 
260 /**
261  * Send bind + session reply to client <stream>
262  * @param stream the xmpp stream
263  */
xmpp_send_client_header_bind(struct xmpp_stream * stream)264 static void xmpp_send_client_header_bind(struct xmpp_stream *stream)
265 {
266 	struct xmpp_stream_context *context = stream->context;
267 	char *header = switch_mprintf(
268 		"<stream:stream xmlns='"IKS_NS_CLIENT"' xmlns:db='"IKS_NS_XMPP_DIALBACK"'"
269 		" from='%s' id='%s' xml:lang='en' version='1.0'"
270 		" xmlns:stream='"IKS_NS_XMPP_STREAMS"'><stream:features>"
271 		"<bind xmlns='"IKS_NS_XMPP_BIND"'/>"
272 		"<session xmlns='"IKS_NS_XMPP_SESSION"'/>"
273 		"</stream:features>", context->domain, stream->id);
274 
275 	iks_send_raw(stream->parser, header);
276 	free(header);
277 }
278 
279 /**
280  * Handle <presence> message callback
281  * @param stream the stream
282  * @param node the presence message
283  */
on_stream_presence(struct xmpp_stream * stream,iks * node)284 static void on_stream_presence(struct xmpp_stream *stream, iks *node)
285 {
286 	struct xmpp_stream_context *context = stream->context;
287 	const char *from = iks_find_attrib(node, "from");
288 
289 	switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, presence, state = %s\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state));
290 
291 	if (!from) {
292 		if (stream->s2s) {
293 			/* from is required in s2s connections */
294 			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, no presence from JID\n", stream->jid, stream->address, stream->port);
295 			return;
296 		}
297 
298 		/* use stream JID if a c2s connection */
299 		from = stream->jid;
300 		if (zstr(from)) {
301 			/* error */
302 			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, no presence from JID\n", stream->jid, stream->address, stream->port);
303 			return;
304 		}
305 		iks_insert_attrib(node, "from", from);
306 	}
307 	if (context->recv_callback) {
308 		context->recv_callback(stream, node);
309 	}
310 }
311 
312 /**
313  * Send <success> reply to xmpp stream <auth>
314  * @param stream the xmpp stream.
315  */
xmpp_send_auth_success(struct xmpp_stream * stream)316 static void xmpp_send_auth_success(struct xmpp_stream *stream)
317 {
318 	iks_send_raw(stream->parser, "<success xmlns='"IKS_NS_XMPP_SASL"'/>");
319 }
320 
321 /**
322  * Send <failure> reply to xmpp client <auth>
323  * @param stream the xmpp stream to use.
324  * @param reason the reason for failure
325  */
xmpp_send_auth_failure(struct xmpp_stream * stream,const char * reason)326 static void xmpp_send_auth_failure(struct xmpp_stream *stream, const char *reason)
327 {
328 	char *reply = switch_mprintf("<failure xmlns='"IKS_NS_XMPP_SASL"'>"
329 		"<%s/></failure>", reason);
330 	iks_send_raw(stream->parser, reply);
331 	free(reply);
332 }
333 
334 /**
335  * Validate username and password
336  * @param authzid authorization id
337  * @param authcid authentication id
338  * @param password
339  * @return 1 if authenticated
340  */
verify_plain_auth(struct xmpp_stream_context * context,const char * authzid,const char * authcid,const char * password)341 static int verify_plain_auth(struct xmpp_stream_context *context, const char *authzid, const char *authcid, const char *password)
342 {
343 	char *correct_password;
344 	if (zstr(authzid) || zstr(authcid) || zstr(password)) {
345 		return 0;
346 	}
347 	correct_password = switch_core_hash_find(context->users, authcid);
348 	return !zstr(correct_password) && !strcmp(correct_password, password);
349 }
350 
351 /**
352  * Send sasl reply to xmpp <stream>
353  * @param stream the xmpp stream
354  */
xmpp_send_client_header_auth(struct xmpp_stream * stream)355 static void xmpp_send_client_header_auth(struct xmpp_stream *stream)
356 {
357 	struct xmpp_stream_context *context = stream->context;
358 	char *header = switch_mprintf(
359 		"<stream:stream xmlns='"IKS_NS_CLIENT"' xmlns:db='"IKS_NS_XMPP_DIALBACK"'"
360 		" from='%s' id='%s' xml:lang='en' version='1.0'"
361 		" xmlns:stream='"IKS_NS_XMPP_STREAMS"'><stream:features>"
362 		"<mechanisms xmlns='"IKS_NS_XMPP_SASL"'>"
363 		"<mechanism>PLAIN</mechanism>"
364 		"</mechanisms></stream:features>", context->domain, stream->id);
365 	iks_send_raw(stream->parser, header);
366 	free(header);
367 }
368 
369 /**
370  * Send sasl + starttls reply to xmpp <stream>
371  * @param stream the xmpp stream
372  */
xmpp_send_client_header_tls(struct xmpp_stream * stream)373 static void xmpp_send_client_header_tls(struct xmpp_stream *stream)
374 {
375 	if (stream->context->key_pem_file && stream->context->cert_pem_file) {
376 		struct xmpp_stream_context *context = stream->context;
377 		char *header = switch_mprintf(
378 			"<stream:stream xmlns='"IKS_NS_CLIENT"' xmlns:db='"IKS_NS_XMPP_DIALBACK"'"
379 			" from='%s' id='%s' xml:lang='en' version='1.0'"
380 			" xmlns:stream='"IKS_NS_XMPP_STREAMS"'><stream:features>"
381 			"<starttls xmlns='"IKS_NS_XMPP_TLS"'><required/></starttls>"
382 			"<mechanisms xmlns='"IKS_NS_XMPP_SASL"'>"
383 			"<mechanism>PLAIN</mechanism>"
384 			"</mechanisms></stream:features>", context->domain, stream->id);
385 		iks_send_raw(stream->parser, header);
386 		free(header);
387 	} else {
388 		/* not set up for TLS, skip it */
389 		stream->state = XSS_SECURE;
390 		xmpp_send_client_header_auth(stream);
391 	}
392 }
393 
394 /**
395  * Send sasl reply to xmpp <stream>
396  * @param stream the xmpp stream
397  */
xmpp_send_server_header_auth(struct xmpp_stream * stream)398 static void xmpp_send_server_header_auth(struct xmpp_stream *stream)
399 {
400 	struct xmpp_stream_context *context = stream->context;
401 	char *header = switch_mprintf(
402 		"<stream:stream xmlns='"IKS_NS_SERVER"' xmlns:db='"IKS_NS_XMPP_DIALBACK"'"
403 		" from='%s' id='%s' xml:lang='en' version='1.0'"
404 		" xmlns:stream='"IKS_NS_XMPP_STREAMS"'>"
405 		"<stream:features>"
406 		"</stream:features>",
407 		context->domain, stream->id);
408 	iks_send_raw(stream->parser, header);
409 	free(header);
410 }
411 
412 /**
413  * Send dialback to receiving server
414  */
xmpp_send_dialback_key(struct xmpp_stream * stream)415 static void xmpp_send_dialback_key(struct xmpp_stream *stream)
416 {
417 	struct xmpp_stream_context *context = stream->context;
418 	char *dialback_key = iks_server_dialback_key(context->dialback_secret, stream->jid, context->domain, stream->id);
419 	if (dialback_key) {
420 		char *dialback = switch_mprintf(
421 			"<db:result from='%s' to='%s'>%s</db:result>",
422 			context->domain, stream->jid,
423 			dialback_key);
424 		iks_send_raw(stream->parser, dialback);
425 		free(dialback);
426 		free(dialback_key);
427 	} else {
428 		/* TODO missing shared secret */
429 	}
430 }
431 
432 /**
433  * Send initial <stream> header to peer server
434  * @param stream the xmpp stream
435  */
xmpp_send_outbound_server_header(struct xmpp_stream * stream)436 static void xmpp_send_outbound_server_header(struct xmpp_stream *stream)
437 {
438 	struct xmpp_stream_context *context = stream->context;
439 	char *header = switch_mprintf(
440 		"<stream:stream xmlns='"IKS_NS_SERVER"' xmlns:db='"IKS_NS_XMPP_DIALBACK"'"
441 		" from='%s' to='%s' xml:lang='en' version='1.0'"
442 		" xmlns:stream='"IKS_NS_XMPP_STREAMS"'>", context->domain, stream->jid);
443 	iks_send_raw(stream->parser, header);
444 	free(header);
445 }
446 
447 /**
448  * Handle <starttls> message.
449  * @param the xmpp stream
450  * @param node the <starttls> packet
451  */
on_stream_starttls(struct xmpp_stream * stream,iks * node)452 static void on_stream_starttls(struct xmpp_stream *stream, iks *node)
453 {
454 	/* wait for handshake to start */
455 	if (iks_proceed_tls(stream->parser, stream->context->cert_pem_file, stream->context->key_pem_file) == IKS_OK) {
456 		stream->state = XSS_SECURE;
457 	} else {
458 		stream->state = XSS_ERROR;
459 	}
460 }
461 
462 /**
463  * Handle <auth> message.  Only PLAIN supported.
464  * @param stream the xmpp stream
465  * @param node the <auth> packet
466  */
on_stream_auth(struct xmpp_stream * stream,iks * node)467 static void on_stream_auth(struct xmpp_stream *stream, iks *node)
468 {
469 	struct xmpp_stream_context *context = stream->context;
470 	const char *xmlns, *mechanism;
471 	iks *auth_body;
472 
473 	switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, auth, state = %s\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state));
474 
475 	/* wrong state for authentication */
476 	if (stream->state != XSS_SECURE) {
477 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_WARNING, "%s, %s:%i, auth UNEXPECTED, state = %s\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state));
478 		/* on_auth unexpected error */
479 		stream->state = XSS_ERROR;
480 		return;
481 	}
482 
483 	/* unsupported authentication type */
484 	xmlns = iks_find_attrib_soft(node, "xmlns");
485 	if (strcmp(IKS_NS_XMPP_SASL, xmlns)) {
486 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_WARNING, "%s, %s:%i, auth, state = %s, unsupported namespace: %s!\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state), xmlns);
487 		/* on_auth namespace error */
488 		stream->state = XSS_ERROR;
489 		return;
490 	}
491 
492 	/* unsupported SASL authentication mechanism */
493 	mechanism = iks_find_attrib_soft(node, "mechanism");
494 	if (strcmp("PLAIN", mechanism)) {
495 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_WARNING, "%s, %s:%i, auth, state = %s, unsupported SASL mechanism: %s!\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state), mechanism);
496 		xmpp_send_auth_failure(stream, "invalid-mechanism");
497 		stream->state = XSS_ERROR;
498 		return;
499 	}
500 
501 	if ((auth_body = iks_child(node)) && iks_type(auth_body) == IKS_CDATA) {
502 		/* get user and password from auth */
503 		char *message = iks_cdata(auth_body);
504 		char *authzid = NULL, *authcid, *password;
505 		/* TODO use library for SASL! */
506 		parse_plain_auth_message(message, &authzid, &authcid, &password);
507 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, auth, state = %s, SASL/PLAIN decoded authzid = \"%s\" authcid = \"%s\"\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state), authzid, authcid);
508 		if (verify_plain_auth(context, authzid, authcid, password)) {
509 			stream->jid = switch_core_strdup(stream->pool, authzid);
510 			if (!stream->s2s && !strchr(stream->jid, '@')) {
511 				/* add missing domain on client stream */
512 				stream->jid = switch_core_sprintf(stream->pool, "%s@%s", stream->jid, context->domain);
513 			}
514 
515 			xmpp_send_auth_success(stream);
516 			stream->state = XSS_AUTHENTICATED;
517 		} else {
518 			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_WARNING, "%s, %s:%i, auth, state = %s, invalid user or password!\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state));
519 			xmpp_send_auth_failure(stream, "not-authorized");
520 			stream->state = XSS_ERROR;
521 		}
522 		switch_safe_free(authzid);
523 		switch_safe_free(authcid);
524 		switch_safe_free(password);
525 	} else {
526 		/* missing message */
527 		stream->state = XSS_ERROR;
528 	}
529 }
530 
531 /**
532  * Handle <iq><session> request
533  * @param stream the xmpp stream
534  * @param node the <iq> node
535  * @return NULL
536  */
on_iq_set_xmpp_session(struct xmpp_stream * stream,iks * node)537 static iks *on_iq_set_xmpp_session(struct xmpp_stream *stream, iks *node)
538 {
539 	struct xmpp_stream_context *context = stream->context;
540 	iks *reply;
541 
542 	switch(stream->state) {
543 		case XSS_RESOURCE_BOUND: {
544 			if (context->ready_callback && !context->ready_callback(stream)) {
545 				reply = iks_new_error(node, STANZA_ERROR_INTERNAL_SERVER_ERROR);
546 				stream->state = XSS_ERROR;
547 			} else {
548 				reply = iks_new_iq_result(node);
549 				stream->state = XSS_READY;
550 
551 				/* add to available streams */
552 				switch_mutex_lock(context->streams_mutex);
553 				switch_core_hash_insert(context->routes, stream->jid, stream);
554 				switch_mutex_unlock(context->streams_mutex);
555 			}
556 
557 			break;
558 		}
559 		case XSS_AUTHENTICATED:
560 		case XSS_READY:
561 		default:
562 			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_WARNING, "%s, %s:%i, iq UNEXPECTED <session>, state = %s\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state));
563 			reply = iks_new_error(node, STANZA_ERROR_SERVICE_UNAVAILABLE);
564 			break;
565 	}
566 
567 	return reply;
568 }
569 
570 /**
571  * Handle <iq><bind> request
572  * @param stream the xmpp stream
573  * @param node the <iq> node
574  */
on_iq_set_xmpp_bind(struct xmpp_stream * stream,iks * node)575 static iks *on_iq_set_xmpp_bind(struct xmpp_stream *stream, iks *node)
576 {
577 	iks *reply = NULL;
578 
579 	switch(stream->state) {
580 		case XSS_AUTHENTICATED: {
581 			struct xmpp_stream_context *context = stream->context;
582 			iks *bind = iks_find(node, "bind");
583 			iks *x;
584 			/* get optional client resource ID */
585 			char *resource_id = iks_find_cdata(bind, "resource");
586 
587 			/* generate resource ID for client if not already set */
588 			if (zstr(resource_id)) {
589 				char resource_id_buf[SWITCH_UUID_FORMATTED_LENGTH + 1];
590 				switch_uuid_str(resource_id_buf, sizeof(resource_id_buf));
591 				resource_id = switch_core_strdup(stream->pool, resource_id_buf);
592 			}
593 
594 			stream->jid = switch_core_sprintf(stream->pool, "%s/%s", stream->jid, resource_id);
595 			if (context->bind_callback && !context->bind_callback(stream)) {
596 				stream->jid = NULL;
597 				reply = iks_new_error(node, STANZA_ERROR_CONFLICT);
598 			} else {
599 				stream->state = XSS_RESOURCE_BOUND;
600 
601 				reply = iks_new_iq_result(node);
602 				x = iks_insert(reply, "bind");
603 				iks_insert_attrib(x, "xmlns", IKS_NS_XMPP_BIND);
604 				iks_insert_cdata(iks_insert(x, "jid"), stream->jid, strlen(stream->jid));
605 			}
606 			break;
607 		}
608 		default:
609 			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_WARNING, "%s, %s:%i, iq UNEXPECTED <bind>\n", stream->jid, stream->address, stream->port);
610 			reply = iks_new_error(node, STANZA_ERROR_NOT_ALLOWED);
611 			break;
612 	}
613 
614 	return reply;
615 }
616 
617 /**
618  * Handle <iq> message callback
619  * @param stream the stream
620  * @param iq the packet
621  */
on_stream_iq(struct xmpp_stream * stream,iks * iq)622 static void on_stream_iq(struct xmpp_stream *stream, iks *iq)
623 {
624 	struct xmpp_stream_context *context = stream->context;
625 	switch(stream->state) {
626 		case XSS_CONNECT:
627 		case XSS_SECURE: {
628 			iks *error = iks_new_error(iq, STANZA_ERROR_NOT_AUTHORIZED);
629 			xmpp_stream_stanza_send(stream, error);
630 			break;
631 		}
632 		case XSS_AUTHENTICATED: {
633 			iks *cmd = iks_first_tag(iq);
634 			if (cmd && !strcmp("bind", iks_name(cmd)) && !strcmp(IKS_NS_XMPP_BIND, iks_find_attrib_soft(cmd, "xmlns"))) {
635 				iks *reply = on_iq_set_xmpp_bind(stream, iq);
636 				xmpp_stream_stanza_send(stream, reply);
637 			} else {
638 				iks *error = iks_new_error(iq, STANZA_ERROR_SERVICE_UNAVAILABLE);
639 				xmpp_stream_stanza_send(stream, error);
640 			}
641 			break;
642 		}
643 		case XSS_RESOURCE_BOUND: {
644 			iks *cmd = iks_first_tag(iq);
645 			if (cmd && !strcmp("session", iks_name(cmd)) && !strcmp(IKS_NS_XMPP_SESSION, iks_find_attrib_soft(cmd, "xmlns"))) {
646 				iks *reply = on_iq_set_xmpp_session(stream, iq);
647 				xmpp_stream_stanza_send(stream, reply);
648 			} else {
649 				iks *error = iks_new_error(iq, STANZA_ERROR_SERVICE_UNAVAILABLE);
650 				xmpp_stream_stanza_send(stream, error);
651 			}
652 			break;
653 		}
654 		case XSS_READY: {
655 			/* client requests */
656 			if (context->recv_callback) {
657 				context->recv_callback(stream, iq);
658 			}
659 			break;
660 		}
661 		case XSS_SHUTDOWN:
662 		case XSS_DESTROY:
663 		case XSS_ERROR: {
664 			iks *error = iks_new_error(iq, STANZA_ERROR_UNEXPECTED_REQUEST);
665 			xmpp_stream_stanza_send(stream, error);
666 			break;
667 		}
668 	};
669 }
670 
671 /**
672  * Handle </stream>
673  * @param stream the stream
674  */
on_stream_stop(struct xmpp_stream * stream)675 static void on_stream_stop(struct xmpp_stream *stream)
676 {
677 	if (stream->state != XSS_SHUTDOWN) {
678 		iks_send_raw(stream->parser, "</stream:stream>");
679 	}
680 	stream->state = XSS_DESTROY;
681 }
682 
683 /**
684  * Handle <stream> from a client
685  * @param stream the stream
686  * @param node the stream message
687  */
on_client_stream_start(struct xmpp_stream * stream,iks * node)688 static void on_client_stream_start(struct xmpp_stream *stream, iks *node)
689 {
690 	struct xmpp_stream_context *context = stream->context;
691 	const char *to = iks_find_attrib_soft(node, "to");
692 	const char *xmlns = iks_find_attrib_soft(node, "xmlns");
693 
694 	/* to is optional, must be server domain if set */
695 	if (!zstr(to) && strcmp(context->domain, to)) {
696 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, wrong server domain!\n", stream->jid, stream->address, stream->port);
697 		stream->state = XSS_ERROR;
698 		return;
699 	}
700 
701 	/* xmlns = client */
702 	if (zstr(xmlns) || strcmp(xmlns, IKS_NS_CLIENT)) {
703 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, wrong stream namespace!\n", stream->jid, stream->address, stream->port);
704 		stream->state = XSS_ERROR;
705 		return;
706 	}
707 
708 	switch (stream->state) {
709 		case XSS_CONNECT:
710 			xmpp_send_client_header_tls(stream);
711 			break;
712 		case XSS_SECURE:
713 			xmpp_send_client_header_auth(stream);
714 			break;
715 		case XSS_AUTHENTICATED:
716 			/* client bind required */
717 			xmpp_stream_new_id(stream);
718 			xmpp_send_client_header_bind(stream);
719 			break;
720 		case XSS_SHUTDOWN:
721 			/* strange... I expect IKS_NODE_STOP, this is a workaround. */
722 			stream->state = XSS_DESTROY;
723 			break;
724 		case XSS_RESOURCE_BOUND:
725 		case XSS_READY:
726 		case XSS_ERROR:
727 		case XSS_DESTROY:
728 			/* bad state */
729 			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, bad state!\n", stream->jid, stream->address, stream->port);
730 			stream->state = XSS_ERROR;
731 			break;
732 	}
733 }
734 
735 /**
736  * Handle <db:result type='valid'>
737  */
on_stream_dialback_result_valid(struct xmpp_stream * stream,iks * node)738 static void on_stream_dialback_result_valid(struct xmpp_stream *stream, iks *node)
739 {
740 	struct xmpp_stream_context *context = stream->context;
741 
742 	/* TODO check domain pair and allow access if pending request exists */
743 	switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, valid dialback result\n", stream->jid, stream->address, stream->port);
744 
745 	if (context->ready_callback && !context->ready_callback(stream)) {
746 		stream->state = XSS_ERROR;
747 	} else {
748 		/* this stream is routable */
749 		stream->state = XSS_READY;
750 
751 		/* add to available streams */
752 		switch_mutex_lock(context->streams_mutex);
753 		switch_core_hash_insert(context->routes, stream->jid, stream);
754 		switch_mutex_unlock(context->streams_mutex);
755 	}
756 }
757 
758 /**
759  * Handle <db:result type='valid'>
760  */
on_stream_dialback_result_invalid(struct xmpp_stream * stream,iks * node)761 static void on_stream_dialback_result_invalid(struct xmpp_stream *stream, iks *node)
762 {
763 	/* close stream */
764 	stream->state = XSS_ERROR;
765 	switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, invalid dialback result!\n", stream->jid, stream->address, stream->port);
766 }
767 
768 /**
769  * Handle <db:result type='error'>
770  */
on_stream_dialback_result_error(struct xmpp_stream * stream,iks * node)771 static void on_stream_dialback_result_error(struct xmpp_stream *stream, iks *node)
772 {
773 	/* close stream */
774 	stream->state = XSS_ERROR;
775 	switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, error dialback result!\n", stream->jid, stream->address, stream->port);
776 }
777 
778 /**
779  * Handle <db:result>
780  */
on_stream_dialback_result_key(struct xmpp_stream * stream,iks * node)781 static void on_stream_dialback_result_key(struct xmpp_stream *stream, iks *node)
782 {
783 	struct xmpp_stream_context *context = stream->context;
784 	const char *from = iks_find_attrib_soft(node, "from");
785 	const char *to = iks_find_attrib_soft(node, "to");
786 	iks *cdata = iks_child(node);
787 	iks *reply;
788 	const char *dialback_key = NULL;
789 
790 	if (cdata && iks_type(cdata) == IKS_CDATA) {
791 		dialback_key = iks_cdata(cdata);
792 	}
793 	if (zstr(dialback_key)) {
794 		iks *error = iks_new_error_detailed(node, STANZA_ERROR_BAD_REQUEST, "Missing dialback key");
795 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, dialback result missing key!\n", stream->jid, stream->address, stream->port);
796 		iks_send(stream->parser, error);
797 		iks_delete(error);
798 		stream->state = XSS_ERROR;
799 		return;
800 	}
801 
802 	if (zstr(from)) {
803 		iks *error = iks_new_error_detailed(node, STANZA_ERROR_BAD_REQUEST, "Missing from");
804 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, dialback result missing from!\n", stream->jid, stream->address, stream->port);
805 		iks_send(stream->parser, error);
806 		iks_delete(error);
807 		stream->state = XSS_ERROR;
808 		return;
809 	}
810 
811 	if (zstr(to)) {
812 		iks *error = iks_new_error_detailed(node, STANZA_ERROR_BAD_REQUEST, "Missing to");
813 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, dialback result missing to!\n", stream->jid, stream->address, stream->port);
814 		iks_send(stream->parser, error);
815 		iks_delete(error);
816 		stream->state = XSS_ERROR;
817 		return;
818 	}
819 
820 	if (strcmp(context->domain, to)) {
821 		iks *error = iks_new_error(node, STANZA_ERROR_ITEM_NOT_FOUND);
822 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, invalid domain!\n", stream->jid, stream->address, stream->port);
823 		iks_send(stream->parser, error);
824 		iks_delete(error);
825 		stream->state = XSS_ERROR;
826 		return;
827 	}
828 
829 	/* this stream is not routable */
830 	stream->state = XSS_READY;
831 	stream->jid = switch_core_strdup(stream->pool, from);
832 
833 	if (context->ready_callback && !context->ready_callback(stream)) {
834 		iks *error = iks_new_error(node, STANZA_ERROR_INTERNAL_SERVER_ERROR);
835 		iks_send(stream->parser, error);
836 		iks_delete(error);
837 		stream->state = XSS_ERROR;
838 		return;
839 	}
840 
841 	/* TODO validate key */
842 	reply = iks_new("db:result");
843 	iks_insert_attrib(reply, "from", to);
844 	iks_insert_attrib(reply, "to", from);
845 	iks_insert_attrib(reply, "type", "valid");
846 	iks_send(stream->parser, reply);
847 	iks_delete(reply);
848 }
849 
850 /**
851  * Handle <db:result>
852  */
on_stream_dialback_result(struct xmpp_stream * stream,iks * node)853 static void on_stream_dialback_result(struct xmpp_stream *stream, iks *node)
854 {
855 	const char *type = iks_find_attrib_soft(node, "type");
856 
857 	if (stream->state == XSS_ERROR || stream->state == XSS_DESTROY) {
858 		stream->state = XSS_ERROR;
859 		return;
860 	}
861 
862 	if (zstr(type)) {
863 		on_stream_dialback_result_key(stream, node);
864 	} else if (!strcmp("valid", type)) {
865 		on_stream_dialback_result_valid(stream, node);
866 	} else if (!strcmp("invalid", type)) {
867 		on_stream_dialback_result_invalid(stream, node);
868 	} else if (!strcmp("error", type)) {
869 		on_stream_dialback_result_error(stream, node);
870 	}
871 }
872 
873 /**
874  * Handle <db:verify>
875  */
on_stream_dialback_verify(struct xmpp_stream * stream,iks * node)876 static void on_stream_dialback_verify(struct xmpp_stream *stream, iks *node)
877 {
878 	struct xmpp_stream_context *context = stream->context;
879 	const char *from = iks_find_attrib_soft(node, "from");
880 	const char *id = iks_find_attrib_soft(node, "id");
881 	const char *to = iks_find_attrib_soft(node, "to");
882 	iks *cdata = iks_child(node);
883 	iks *reply;
884 	const char *dialback_key = NULL;
885 	char *expected_key = NULL;
886 	int valid;
887 
888 	if (stream->state == XSS_ERROR || stream->state == XSS_DESTROY) {
889 		stream->state = XSS_ERROR;
890 		return;
891 	}
892 
893 	if (cdata && iks_type(cdata) == IKS_CDATA) {
894 		dialback_key = iks_cdata(cdata);
895 	}
896 	if (zstr(dialback_key)) {
897 		iks *error = iks_new_error_detailed(node, STANZA_ERROR_BAD_REQUEST, "Missing dialback key");
898 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, dialback verify missing key!\n", stream->jid, stream->address, stream->port);
899 		iks_send(stream->parser, error);
900 		iks_delete(error);
901 		return;
902 	}
903 
904 	if (zstr(id)) {
905 		iks *error = iks_new_error_detailed(node, STANZA_ERROR_BAD_REQUEST, "Missing id");
906 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, dialback verify missing stream ID!\n", stream->jid, stream->address, stream->port);
907 		iks_send(stream->parser, error);
908 		iks_delete(error);
909 		return;
910 	}
911 
912 	if (zstr(from)) {
913 		iks *error = iks_new_error_detailed(node, STANZA_ERROR_BAD_REQUEST, "Missing from");
914 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, dialback verify missing from!\n", stream->jid, stream->address, stream->port);
915 		iks_send(stream->parser, error);
916 		iks_delete(error);
917 		return;
918 	}
919 
920 	if (zstr(to)) {
921 		iks *error = iks_new_error_detailed(node, STANZA_ERROR_BAD_REQUEST, "Missing to");
922 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, dialback verify missing to!\n", stream->jid, stream->address, stream->port);
923 		iks_send(stream->parser, error);
924 		iks_delete(error);
925 		return;
926 	}
927 
928 	if (strcmp(context->domain, to)) {
929 		iks *error = iks_new_error(node, STANZA_ERROR_ITEM_NOT_FOUND);
930 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, invalid domain!\n", stream->jid, stream->address, stream->port);
931 		iks_send(stream->parser, error);
932 		iks_delete(error);
933 		return;
934 	}
935 
936 	expected_key = iks_server_dialback_key(context->dialback_secret, from, to, id);
937 	valid = expected_key && !strcmp(expected_key, dialback_key);
938 
939 	reply = iks_new("db:verify");
940 	iks_insert_attrib(reply, "from", to);
941 	iks_insert_attrib(reply, "to", from);
942 	iks_insert_attrib(reply, "id", id);
943 	iks_insert_attrib(reply, "type", valid ? "valid" : "invalid");
944 	iks_send(stream->parser, reply);
945 	iks_delete(reply);
946 	free(expected_key);
947 
948 	if (!valid) {
949 		/* close the stream */
950 		stream->state = XSS_ERROR;
951 	}
952 }
953 
954 /**
955  * Handle <stream> from an outbound peer server
956  */
on_outbound_server_stream_start(struct xmpp_stream * stream,iks * node)957 static void on_outbound_server_stream_start(struct xmpp_stream *stream, iks *node)
958 {
959 	const char *xmlns = iks_find_attrib_soft(node, "xmlns");
960 
961 	/* xmlns = server */
962 	if (zstr(xmlns) || strcmp(xmlns, IKS_NS_SERVER)) {
963 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, wrong stream namespace!\n", stream->jid, stream->address, stream->port);
964 		stream->state = XSS_ERROR;
965 		return;
966 	}
967 
968 	switch (stream->state) {
969 		case XSS_CONNECT: {
970 			/* get stream ID and send dialback */
971 			const char *id = iks_find_attrib_soft(node, "id");
972 			if (zstr(id)) {
973 				switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, missing stream ID!\n", stream->jid, stream->address, stream->port);
974 				stream->state = XSS_ERROR;
975 				return;
976 			}
977 			xmpp_stream_set_id(stream, id);
978 
979 			/* send dialback */
980 			xmpp_send_dialback_key(stream);
981 			break;
982 		}
983 		case XSS_SHUTDOWN:
984 			/* strange... I expect IKS_NODE_STOP, this is a workaround. */
985 			stream->state = XSS_DESTROY;
986 			break;
987 		case XSS_SECURE:
988 		case XSS_AUTHENTICATED:
989 		case XSS_RESOURCE_BOUND:
990 		case XSS_READY:
991 		case XSS_ERROR:
992 		case XSS_DESTROY:
993 			/* bad state */
994 			stream->state = XSS_ERROR;
995 			break;
996 	}
997 }
998 
999 /**
1000  * Handle <stream> from an inbound peer server
1001  * @param stream the stream
1002  * @param node the stream message
1003  */
on_inbound_server_stream_start(struct xmpp_stream * stream,iks * node)1004 static void on_inbound_server_stream_start(struct xmpp_stream *stream, iks *node)
1005 {
1006 	struct xmpp_stream_context *context = stream->context;
1007 	const char *to = iks_find_attrib_soft(node, "to");
1008 	const char *xmlns = iks_find_attrib_soft(node, "xmlns");
1009 
1010 	/* to is required, must be server domain */
1011 	if (zstr(to) || strcmp(context->domain, to)) {
1012 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, wrong server domain!\n", stream->jid, stream->address, stream->port);
1013 		stream->state = XSS_ERROR;
1014 		return;
1015 	}
1016 
1017 	/* xmlns = server */
1018 	if (zstr(xmlns) || strcmp(xmlns, IKS_NS_SERVER)) {
1019 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, wrong stream namespace!\n", stream->jid, stream->address, stream->port);
1020 		stream->state = XSS_ERROR;
1021 		return;
1022 	}
1023 
1024 	switch (stream->state) {
1025 		case XSS_CONNECT:
1026 			xmpp_send_server_header_auth(stream);
1027 			break;
1028 		case XSS_SECURE:
1029 			break;
1030 		case XSS_AUTHENTICATED: {
1031 			if (context->ready_callback && !context->ready_callback(stream)) {
1032 				stream->state = XSS_ERROR;
1033 				break;
1034 			}
1035 
1036 			/* all set */
1037 			xmpp_send_server_header_features(stream);
1038 			stream->state = XSS_READY;
1039 
1040 			/* add to available streams */
1041 			switch_mutex_lock(context->streams_mutex);
1042 			switch_core_hash_insert(context->routes, stream->jid, stream);
1043 			switch_mutex_unlock(context->streams_mutex);
1044 			break;
1045 		}
1046 		case XSS_SHUTDOWN:
1047 			/* strange... I expect IKS_NODE_STOP, this is a workaround. */
1048 			stream->state = XSS_DESTROY;
1049 			break;
1050 		case XSS_RESOURCE_BOUND:
1051 		case XSS_READY:
1052 		case XSS_ERROR:
1053 		case XSS_DESTROY:
1054 			/* bad state */
1055 			stream->state = XSS_ERROR;
1056 			break;
1057 	}
1058 }
1059 
1060 /**
1061  * Handle XML stream callback
1062  * @param user_data the xmpp stream
1063  * @param type stream type (start/normal/stop/etc)
1064  * @param node optional XML node
1065  * @return IKS_OK
1066  */
on_stream(void * user_data,int type,iks * node)1067 static int on_stream(void *user_data, int type, iks *node)
1068 {
1069 	struct xmpp_stream *stream = (struct xmpp_stream *)user_data;
1070 
1071 	stream->idle = 0;
1072 
1073 	switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, state = %s, node type = %s\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state), iks_node_type_to_string(type));
1074 
1075 	switch(type) {
1076 		case IKS_NODE_START:
1077 			/* <stream> */
1078 			if (node) {
1079 				if (stream->s2s) {
1080 					if (stream->incoming) {
1081 						on_inbound_server_stream_start(stream, node);
1082 					} else {
1083 						on_outbound_server_stream_start(stream, node);
1084 					}
1085 				} else {
1086 					on_client_stream_start(stream, node);
1087 				}
1088 			} else {
1089 				stream->state = XSS_ERROR;
1090 				switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, missing node!\n", stream->jid, stream->address, stream->port);
1091 			}
1092 			break;
1093 		case IKS_NODE_NORMAL:
1094 			/* stanza */
1095 			if (node) {
1096 				const char *name = iks_name(node);
1097 				if (!strcmp("iq", name) || !strcmp("message", name)) {
1098 					on_stream_iq(stream, node);
1099 				} else if (!strcmp("presence", name)) {
1100 					on_stream_presence(stream, node);
1101 				} else if (!strcmp("auth", name)) {
1102 					on_stream_auth(stream, node);
1103 				} else if (!strcmp("starttls", name)) {
1104 					on_stream_starttls(stream, node);
1105 				} else if (!strcmp("db:result", name)) {
1106 					on_stream_dialback_result(stream, node);
1107 				} else if (!strcmp("db:verify", name)) {
1108 					on_stream_dialback_verify(stream, node);
1109 				} else {
1110 					/* unknown first-level element */
1111 					switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, unknown first-level element: %s\n", stream->jid, stream->address, stream->port, name);
1112 				}
1113 			}
1114 			break;
1115 		case IKS_NODE_ERROR:
1116 			/* <error> */
1117 			break;
1118 		case IKS_NODE_STOP:
1119 			on_stream_stop(stream);
1120 			break;
1121 	}
1122 
1123 	if (node) {
1124 		iks_delete(node);
1125 	}
1126 
1127 	return IKS_OK;
1128 }
1129 
1130 /**
1131  * Cleanup xmpp stream
1132  */
xmpp_stream_destroy(struct xmpp_stream * stream)1133 static void xmpp_stream_destroy(struct xmpp_stream *stream)
1134 {
1135 	struct xmpp_stream_context *context = stream->context;
1136 	switch_memory_pool_t *pool = stream->pool;
1137 	stream->state = XSS_DESTROY;
1138 
1139 	/* remove from available streams */
1140 	switch_mutex_lock(context->streams_mutex);
1141 	if (stream->jid) {
1142 		switch_core_hash_delete(context->routes, stream->jid);
1143 	}
1144 	if (stream->id) {
1145 		switch_core_hash_delete(context->streams, stream->id);
1146 	}
1147 	switch_mutex_unlock(context->streams_mutex);
1148 
1149 	/* close connection */
1150 	if (stream->parser) {
1151 		iks_disconnect(stream->parser);
1152 		iks_parser_delete(stream->parser);
1153 	}
1154 
1155 	if (stream->socket) {
1156 		switch_socket_shutdown(stream->socket, SWITCH_SHUTDOWN_READWRITE);
1157 		switch_socket_close(stream->socket);
1158 	}
1159 
1160 	/* flush pending messages */
1161 	if (stream->msg_queue) {
1162 		char *msg;
1163 		while (switch_queue_trypop(stream->msg_queue, (void *)&msg) == SWITCH_STATUS_SUCCESS) {
1164 			iks_free(msg);
1165 		}
1166 	}
1167 
1168 	if (context->destroy_callback) {
1169 		context->destroy_callback(stream);
1170 	}
1171 
1172 	switch_core_destroy_memory_pool(&pool);
1173 }
1174 
1175 /**
1176  * @param stream the xmpp stream to check
1177  * @return 0 if stream is dead
1178  */
xmpp_stream_ready(struct xmpp_stream * stream)1179 static int xmpp_stream_ready(struct xmpp_stream *stream)
1180 {
1181 	return stream->state != XSS_ERROR && stream->state != XSS_DESTROY;
1182 }
1183 
1184 #define KEEP_ALIVE_INTERVAL_NS (60 * 1000 * 1000)
1185 
1186 /**
1187  * Thread that handles xmpp XML stream
1188  * @param thread this thread
1189  * @param obj the xmpp stream
1190  * @return NULL
1191  */
xmpp_stream_thread(switch_thread_t * thread,void * obj)1192 static void *SWITCH_THREAD_FUNC xmpp_stream_thread(switch_thread_t *thread, void *obj)
1193 {
1194 	struct xmpp_stream *stream = (struct xmpp_stream *)obj;
1195 	struct xmpp_stream_context *context = stream->context;
1196 	int err_count = 0;
1197 	switch_time_t last_activity = 0;
1198 	int ping_id = 1;
1199 
1200 	if (stream->incoming) {
1201 		switch_thread_rwlock_rdlock(context->shutdown_rwlock);
1202 	}
1203 
1204 	switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s:%i, New %s_%s stream\n", stream->address, stream->port, stream->s2s ? "s2s" : "c2s", stream->incoming ? "in" : "out");
1205 
1206 	if (stream->s2s && !stream->incoming) {
1207 		xmpp_send_outbound_server_header(stream);
1208 	}
1209 
1210 	while (xmpp_stream_ready(stream)) {
1211 		char *msg;
1212 		int result;
1213 		switch_time_t now = switch_micro_time_now();
1214 
1215 		/* read any messages from client */
1216 		stream->idle = 1;
1217 		result = iks_recv(stream->parser, 0);
1218 		switch (result) {
1219 		case IKS_OK:
1220 			err_count = 0;
1221 			break;
1222 		case IKS_NET_TLSFAIL:
1223 		case IKS_NET_RWERR:
1224 		case IKS_NET_NOCONN:
1225 		case IKS_NET_NOSOCK:
1226 			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, iks_recv() error = %s, ending session\n", stream->jid, stream->address, stream->port, iks_net_error_to_string(result));
1227 			stream->state = XSS_ERROR;
1228 			goto done;
1229 		default:
1230 			if (err_count++ == 0) {
1231 				switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, iks_recv() error = %s\n", stream->jid, stream->address, stream->port, iks_net_error_to_string(result));
1232 			}
1233 			if (err_count >= 50) {
1234 				switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, too many iks_recv() error = %s, ending session\n", stream->jid, stream->address, stream->port, iks_net_error_to_string(result));
1235 				stream->state = XSS_ERROR;
1236 				goto done;
1237 			}
1238 		}
1239 
1240 		/* send queued stanzas once stream is authorized for outbound stanzas */
1241 		if (!stream->s2s || stream->state == XSS_READY) {
1242 			while (switch_queue_trypop(stream->msg_queue, (void *)&msg) == SWITCH_STATUS_SUCCESS) {
1243 				if (!stream->s2s || !stream->incoming) {
1244 					iks_send_raw(stream->parser, msg);
1245 				} else {
1246 					/* TODO sent out wrong stream! */
1247 				}
1248 				iks_free(msg);
1249 				stream->idle = 0;
1250 			}
1251 		}
1252 
1253 		/* check for shutdown */
1254 		if (stream->state != XSS_DESTROY && context->shutdown && stream->state != XSS_SHUTDOWN) {
1255 			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, detected shutdown\n", stream->jid, stream->address, stream->port);
1256 			iks_send_raw(stream->parser, "</stream:stream>");
1257 			stream->state = XSS_SHUTDOWN;
1258 			stream->idle = 0;
1259 		}
1260 
1261 		if (stream->idle) {
1262 			int fdr = 0;
1263 
1264 			/* send keep-alive ping if idle for a long time */
1265 			if (stream->s2s && !stream->incoming && stream->state == XSS_READY && now - last_activity > KEEP_ALIVE_INTERVAL_NS) {
1266 				char *ping = switch_mprintf("<iq to=\"%s\" from=\"%s\" type=\"get\" id=\"internal-%d\"><ping xmlns=\""IKS_NS_XMPP_PING"\"/></iq>",
1267 					stream->jid, stream->context->domain, ping_id++);
1268 				switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, keep alive\n", stream->jid, stream->address, stream->port);
1269 				last_activity = now;
1270 				iks_send_raw(stream->parser, ping);
1271 				free(ping);
1272 			}
1273 
1274 			switch_poll(stream->pollfd, 1, &fdr, 20000);
1275 		} else {
1276 			last_activity = now;
1277 			switch_os_yield();
1278 		}
1279 	}
1280 
1281   done:
1282 
1283 	if (stream->incoming) {
1284 		xmpp_stream_destroy(stream);
1285 		switch_thread_rwlock_unlock(context->shutdown_rwlock);
1286 	}
1287 
1288 	return NULL;
1289 }
1290 
1291 /**
1292  * Initialize the xmpp stream
1293  * @param context the stream context
1294  * @param stream the stream to initialize
1295  * @param pool for this stream
1296  * @param address remote address
1297  * @param port remote port
1298  * @param s2s true if a server-to-server stream
1299  * @param incoming true if incoming stream
1300  * @return the stream
1301  */
xmpp_stream_init(struct xmpp_stream_context * context,struct xmpp_stream * stream,switch_memory_pool_t * pool,const char * address,int port,int s2s,int incoming)1302 static struct xmpp_stream *xmpp_stream_init(struct xmpp_stream_context *context, struct xmpp_stream *stream, switch_memory_pool_t *pool, const char *address, int port, int s2s, int incoming)
1303 {
1304 	stream->context = context;
1305 	stream->pool = pool;
1306 	if (incoming) {
1307 		xmpp_stream_new_id(stream);
1308 	}
1309 	switch_mutex_init(&stream->mutex, SWITCH_MUTEX_NESTED, pool);
1310 	if (!zstr(address)) {
1311 		stream->address = switch_core_strdup(pool, address);
1312 	}
1313 	if (port > 0) {
1314 		stream->port = port;
1315 	}
1316 	stream->s2s = s2s;
1317 	stream->incoming = incoming;
1318 	switch_queue_create(&stream->msg_queue, MAX_QUEUE_LEN, pool);
1319 
1320 	/* set up XMPP stream parser */
1321 	stream->parser = iks_stream_new(stream->s2s ? IKS_NS_SERVER : IKS_NS_CLIENT, stream, on_stream);
1322 
1323 	/* enable logging of XMPP stream */
1324 	iks_set_log_hook(stream->parser, on_stream_log);
1325 
1326 	return stream;
1327 }
1328 
1329 /**
1330  * Create a new xmpp stream
1331  * @param context the stream context
1332  * @param pool the memory pool for this stream
1333  * @param address remote address
1334  * @param port remote port
1335  * @param s2s true if server-to-server stream
1336  * @param incoming true if incoming stream
1337  * @return the new stream or NULL
1338  */
xmpp_stream_create(struct xmpp_stream_context * context,switch_memory_pool_t * pool,const char * address,int port,int s2s,int incoming)1339 static struct xmpp_stream *xmpp_stream_create(struct xmpp_stream_context *context, switch_memory_pool_t *pool, const char *address, int port, int s2s, int incoming)
1340 {
1341 	struct xmpp_stream *stream = NULL;
1342 	if (!(stream = switch_core_alloc(pool, sizeof(*stream)))) {
1343 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
1344 		return NULL;
1345 	}
1346 	return xmpp_stream_init(context, stream, pool, address, port, s2s, incoming);
1347 }
1348 
1349 /**
1350  * Thread that handles XMPP XML stream
1351  * @param thread this thread
1352  * @param obj the XMPP stream
1353  * @return NULL
1354  */
xmpp_outbound_stream_thread(switch_thread_t * thread,void * obj)1355 static void *SWITCH_THREAD_FUNC xmpp_outbound_stream_thread(switch_thread_t *thread, void *obj)
1356 {
1357 	struct xmpp_stream *stream = (struct xmpp_stream *)obj;
1358 	struct xmpp_stream_context *context = stream->context;
1359 	switch_socket_t *socket;
1360 	int warned = 0;
1361 
1362 	switch_thread_rwlock_rdlock(context->shutdown_rwlock);
1363 
1364 	/* connect to server */
1365 	while (!context->shutdown) {
1366 		struct xmpp_stream *new_stream = NULL;
1367 		switch_memory_pool_t *pool;
1368 		switch_sockaddr_t *sa;
1369 
1370 		if (switch_sockaddr_info_get(&sa, stream->address, SWITCH_UNSPEC, stream->port, 0, stream->pool) != SWITCH_STATUS_SUCCESS) {
1371 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%s:%i, failed to get sockaddr info!\n", stream->address, stream->port);
1372 			goto fail;
1373 		}
1374 
1375 		if (switch_socket_create(&socket, switch_sockaddr_get_family(sa), SOCK_STREAM, SWITCH_PROTO_TCP, stream->pool) != SWITCH_STATUS_SUCCESS) {
1376 			if (!warned) {
1377 				switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_ERROR, "%s:%i, failed to create socket!\n", stream->address, stream->port);
1378 			}
1379 			goto sock_fail;
1380 		}
1381 
1382 		switch_socket_opt_set(socket, SWITCH_SO_KEEPALIVE, 1);
1383 		switch_socket_opt_set(socket, SWITCH_SO_TCP_NODELAY, 1);
1384 
1385 		if (switch_socket_connect(socket, sa) != SWITCH_STATUS_SUCCESS) {
1386 			if (!warned) {
1387 				switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_ERROR, "%s:%i, Socket Error!\n", stream->address, stream->port);
1388 			}
1389 			goto sock_fail;
1390 		}
1391 
1392 		if (warned) {
1393 			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_ERROR, "%s:%i, connected!\n", stream->address, stream->port);
1394 			warned = 0;
1395 		}
1396 
1397 		/* run the stream thread */
1398 		xmpp_stream_set_socket(stream, socket);
1399 		xmpp_stream_thread(thread, stream);
1400 
1401 		/* re-establish connection if not shutdown */
1402 		if (!context->shutdown) {
1403 			/* create new stream for reconnection */
1404 			switch_core_new_memory_pool(&pool);
1405 			new_stream = xmpp_stream_create(stream->context, pool, stream->address, stream->port, 1, 0);
1406 			new_stream->jid = switch_core_strdup(pool, stream->jid);
1407 			xmpp_stream_destroy(stream);
1408 			stream = new_stream;
1409 
1410 			switch_yield(1000 * 1000); /* 1000 ms */
1411 			continue;
1412 		}
1413 		break;
1414 
1415    sock_fail:
1416 		if (socket) {
1417 			switch_socket_close(socket);
1418 			socket = NULL;
1419 		}
1420 		if (!warned) {
1421 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error! Could not connect to %s:%i\n", stream->address, stream->port);
1422 			warned = 1;
1423 		}
1424 		switch_yield(1000 * 1000); /* 1000 ms */
1425 	}
1426 
1427   fail:
1428 
1429 	xmpp_stream_destroy(stream);
1430 
1431 	switch_thread_rwlock_unlock(context->shutdown_rwlock);
1432 	return NULL;
1433 }
1434 
1435 /**
1436  * Set the id for this stream
1437  * @param stream
1438  * @param id
1439  */
xmpp_stream_set_id(struct xmpp_stream * stream,const char * id)1440 static void xmpp_stream_set_id(struct xmpp_stream *stream, const char *id)
1441 {
1442 	struct xmpp_stream_context *context = stream->context;
1443 	if (!zstr(stream->id)) {
1444 		switch_mutex_lock(context->streams_mutex);
1445 		switch_core_hash_delete(context->streams, stream->id);
1446 		switch_mutex_unlock(context->streams_mutex);
1447 	}
1448 	if (!zstr(id)) {
1449 		stream->id = switch_core_strdup(stream->pool, id);
1450 		switch_mutex_lock(context->streams_mutex);
1451 		switch_core_hash_insert(context->streams, stream->id, stream);
1452 		switch_mutex_unlock(context->streams_mutex);
1453 	} else {
1454 		stream->id = NULL;
1455 	}
1456 }
1457 
1458 /**
1459  * Destroy the listener
1460  * @param server the server
1461  */
xmpp_listener_destroy(struct xmpp_listener * listener)1462 static void xmpp_listener_destroy(struct xmpp_listener *listener)
1463 {
1464 	switch_memory_pool_t *pool = listener->pool;
1465 
1466 	/* shutdown socket */
1467 	if (listener->socket) {
1468 		switch_socket_shutdown(listener->socket, SWITCH_SHUTDOWN_READWRITE);
1469 		switch_socket_close(listener->socket);
1470 	}
1471 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "xmpp listener %s:%u closed\n", listener->addr, listener->port);
1472 	switch_core_destroy_memory_pool(&pool);
1473 }
1474 
1475 /**
1476  * Open a new XMPP stream with a peer server
1477  * @param peer_domain of server - if not set, address is used
1478  * @param peer_address of server - if not set, domain is used
1479  * @param peer_port of server - if not set default port is used
1480  */
xmpp_stream_context_connect(struct xmpp_stream_context * context,const char * peer_domain,const char * peer_address,int peer_port)1481 switch_status_t xmpp_stream_context_connect(struct xmpp_stream_context *context, const char *peer_domain, const char *peer_address, int peer_port)
1482 {
1483 	struct xmpp_stream *stream;
1484 	switch_memory_pool_t *pool;
1485 	switch_thread_t *thread;
1486 	switch_threadattr_t *thd_attr = NULL;
1487 
1488 	if (peer_port <= 0) {
1489 		peer_port = IKS_JABBER_SERVER_PORT;
1490 	}
1491 
1492 	if (zstr(peer_address)) {
1493 		peer_address = peer_domain;
1494 	} else if (zstr(peer_domain)) {
1495 		peer_domain = peer_address;
1496 	}
1497 
1498 	/* start outbound stream thread */
1499 	switch_core_new_memory_pool(&pool);
1500 	stream = xmpp_stream_create(context, pool, peer_address, peer_port, 1, 0);
1501 	stream->jid = switch_core_strdup(pool, peer_domain);
1502 	switch_threadattr_create(&thd_attr, pool);
1503 			switch_threadattr_detach_set(thd_attr, 1);
1504 			switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
1505 			switch_thread_create(&thread, thd_attr, xmpp_outbound_stream_thread, stream, pool);
1506 
1507 	return SWITCH_STATUS_SUCCESS;
1508 }
1509 
1510 /**
1511  * Thread that listens for new XMPP connections
1512  * @param thread this thread
1513  * @param obj the listener
1514  * @return NULL
1515  */
xmpp_listener_thread(switch_thread_t * thread,void * obj)1516 static void *SWITCH_THREAD_FUNC xmpp_listener_thread(switch_thread_t *thread, void *obj)
1517 {
1518 	struct xmpp_listener *listener = (struct xmpp_listener *)obj;
1519 	struct xmpp_stream_context *context = listener->context;
1520 	switch_memory_pool_t *pool = NULL;
1521 	uint32_t errs = 0;
1522 	int warned = 0;
1523 
1524 	switch_thread_rwlock_rdlock(context->shutdown_rwlock);
1525 
1526 	/* bind to XMPP port */
1527 	while (!context->shutdown) {
1528 		switch_status_t rv;
1529 		switch_sockaddr_t *sa;
1530 		rv = switch_sockaddr_info_get(&sa, listener->addr, SWITCH_UNSPEC, listener->port, 0, listener->pool);
1531 		if (rv)
1532 			goto fail;
1533 		rv = switch_socket_create(&listener->socket, switch_sockaddr_get_family(sa), SOCK_STREAM, SWITCH_PROTO_TCP, listener->pool);
1534 		if (rv)
1535 			goto sock_fail;
1536 		rv = switch_socket_opt_set(listener->socket, SWITCH_SO_REUSEADDR, 1);
1537 		if (rv)
1538 			goto sock_fail;
1539 #ifdef WIN32
1540 		/* Enable dual-stack listening on Windows (if the listening address is IPv6), it's default on Linux */
1541 		if (switch_sockaddr_get_family(sa) == AF_INET6) {
1542 			rv = switch_socket_opt_set(listener->socket, 16384, 0);
1543 			if (rv) goto sock_fail;
1544 		}
1545 #endif
1546 		rv = switch_socket_bind(listener->socket, sa);
1547 		if (rv)
1548 			goto sock_fail;
1549 		rv = switch_socket_listen(listener->socket, 5);
1550 		if (rv)
1551 			goto sock_fail;
1552 
1553 		rv = switch_socket_create_pollset(&listener->read_pollfd, listener->socket, SWITCH_POLLIN | SWITCH_POLLERR, listener->pool);
1554 		if (rv) {
1555 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Create pollset for %s listener socket %s:%u error!\n", listener->s2s ? "s2s" : "c2s", listener->addr, listener->port);
1556 			goto sock_fail;
1557 		}
1558 
1559 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "xmpp %s listener bound to %s:%u\n", listener->s2s ? "s2s" : "c2s", listener->addr, listener->port);
1560 
1561 		break;
1562    sock_fail:
1563 		if (listener->socket) {
1564 			switch_socket_close(listener->socket);
1565 			listener->socket = NULL;
1566 		}
1567 		if (!warned) {
1568 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error! xmpp %s listener could not bind to %s:%u\n", listener->s2s ? "s2s" : "c2s", listener->addr, listener->port);
1569 			warned = 1;
1570 		}
1571 		switch_yield(1000 * 100); /* 100 ms */
1572 	}
1573 
1574 	/* Listen for XMPP client connections */
1575 	while (!context->shutdown) {
1576 		switch_socket_t *socket = NULL;
1577 		switch_status_t rv;
1578 		int32_t fdr;
1579 
1580 		if (pool == NULL && switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
1581 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create memory pool for new client connection!\n");
1582 			goto fail;
1583 		}
1584 
1585 		/* is there a new connection? */
1586 		rv = switch_poll(listener->read_pollfd, 1, &fdr, 1000 * 1000 /* 1000 ms */);
1587 		if (rv != SWITCH_STATUS_SUCCESS) {
1588 			continue;
1589 		}
1590 
1591 		/* accept the connection */
1592 		if ((rv = switch_socket_accept(&socket, listener->socket, pool))) {
1593 			if (context->shutdown) {
1594 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Shutting down xmpp listener\n");
1595 				goto end;
1596 			} else {
1597 				/* I wish we could use strerror_r here but its not defined everywhere =/ */
1598 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Accept connection error [%s]\n", strerror(errno));
1599 				if (++errs > 100) {
1600 					goto end;
1601 				}
1602 			}
1603 		} else { /* got a new connection */
1604 			switch_thread_t *thread;
1605 			switch_threadattr_t *thd_attr = NULL;
1606 			struct xmpp_stream *stream;
1607 			switch_sockaddr_t *sa = NULL;
1608 			char remote_ip[50] = { 0 };
1609 			int remote_port = 0;
1610 
1611 			errs = 0;
1612 
1613 			/* get remote address and port */
1614 			if (switch_socket_addr_get(&sa, SWITCH_TRUE, socket) == SWITCH_STATUS_SUCCESS && sa) {
1615 				switch_get_addr(remote_ip, sizeof(remote_ip), sa);
1616 				remote_port = switch_sockaddr_get_port(sa);
1617 			}
1618 
1619 			if (zstr_buf(remote_ip)) {
1620 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to get IP of incoming connection.\n");
1621 				switch_socket_shutdown(socket, SWITCH_SHUTDOWN_READWRITE);
1622 				switch_socket_close(socket);
1623 				continue;
1624 			}
1625 
1626 			/* check if connection is allowed */
1627 			if (listener->acl) {
1628 				if (!switch_check_network_list_ip(remote_ip, listener->acl)) {
1629 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "ACL %s denies access to %s.\n", listener->acl, remote_ip);
1630 					switch_socket_shutdown(socket, SWITCH_SHUTDOWN_READWRITE);
1631 					switch_socket_close(socket);
1632 					continue;
1633 				}
1634 			}
1635 
1636 			/* start connection thread */
1637 			if (!(stream = xmpp_stream_create(context, pool, remote_ip, remote_port, listener->s2s, 1))) {
1638 				switch_socket_shutdown(socket, SWITCH_SHUTDOWN_READWRITE);
1639 				switch_socket_close(socket);
1640 				break;
1641 			}
1642 			xmpp_stream_set_socket(stream, socket);
1643 			pool = NULL; /* connection now owns the pool */
1644 			switch_threadattr_create(&thd_attr, stream->pool);
1645 			switch_threadattr_detach_set(thd_attr, 1);
1646 			switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
1647 			switch_thread_create(&thread, thd_attr, xmpp_stream_thread, stream, stream->pool);
1648 		}
1649 	}
1650 
1651   end:
1652 
1653 	if (pool) {
1654 		switch_core_destroy_memory_pool(&pool);
1655 	}
1656 
1657   fail:
1658 
1659 	xmpp_listener_destroy(listener);
1660 
1661 	switch_thread_rwlock_unlock(context->shutdown_rwlock);
1662 	return NULL;
1663 }
1664 
1665 /**
1666  * Add a new socket to listen for XMPP client/server connections.
1667  * @param context the XMPP context
1668  * @param addr the IP address
1669  * @param port the port
1670  * @param is_s2s true if s2s
1671  * @param acl name of optional access control list
1672  * @return SWITCH_STATUS_SUCCESS if successful
1673  */
xmpp_stream_context_listen(struct xmpp_stream_context * context,const char * addr,int port,int is_s2s,const char * acl)1674 switch_status_t xmpp_stream_context_listen(struct xmpp_stream_context *context, const char *addr, int port, int is_s2s, const char *acl)
1675 {
1676 	switch_memory_pool_t *pool;
1677 	struct xmpp_listener *new_listener = NULL;
1678 	switch_thread_t *thread;
1679 	switch_threadattr_t *thd_attr = NULL;
1680 
1681 	if (zstr(addr)) {
1682 		return SWITCH_STATUS_FALSE;
1683 	}
1684 
1685 	switch_core_new_memory_pool(&pool);
1686 	new_listener = switch_core_alloc(pool, sizeof(*new_listener));
1687 	new_listener->pool = pool;
1688 	new_listener->addr = switch_core_strdup(pool, addr);
1689 	if (!zstr(acl)) {
1690 		new_listener->acl = switch_core_strdup(pool, acl);
1691 	}
1692 
1693 	new_listener->s2s = is_s2s;
1694 	if (port <= 0) {
1695 		new_listener->port = is_s2s ? IKS_JABBER_SERVER_PORT : IKS_JABBER_PORT;
1696 	} else {
1697 		new_listener->port = port;
1698 	}
1699 	new_listener->context = context;
1700 
1701 	/* start the server thread */
1702 	switch_threadattr_create(&thd_attr, pool);
1703 	switch_threadattr_detach_set(thd_attr, 1);
1704 	switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
1705 	switch_thread_create(&thread, thd_attr, xmpp_listener_thread, new_listener, pool);
1706 
1707 	return SWITCH_STATUS_SUCCESS;
1708 }
1709 
1710 /**
1711  * Queue a message for delivery
1712  */
xmpp_stream_context_send(struct xmpp_stream_context * context,const char * jid,iks * msg)1713 void xmpp_stream_context_send(struct xmpp_stream_context *context, const char *jid, iks *msg)
1714 {
1715 	if (!zstr(jid)) {
1716 		if (msg) {
1717 			struct xmpp_stream *stream;
1718 			switch_mutex_lock(context->streams_mutex);
1719 			stream = switch_core_hash_find(context->routes, jid);
1720 			if (stream) {
1721 				char *raw = iks_string(NULL, msg);
1722 				if (switch_queue_trypush(stream->msg_queue, raw) != SWITCH_STATUS_SUCCESS) {
1723 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s, %s:%i, failed to deliver outbound message via %s!\n", stream->jid, stream->address, stream->port, jid);
1724 					iks_free(raw);
1725 				}
1726 			} else {
1727 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s stream is gone\n", jid);
1728 				/* TODO automatically open connection if valid domain JID? */
1729 			}
1730 			switch_mutex_unlock(context->streams_mutex);
1731 		} else {
1732 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing message\n");
1733 		}
1734 	} else {
1735 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing stream JID\n");
1736 	}
1737 }
1738 
1739 /**
1740  * Dump xmpp stream stats
1741  */
xmpp_stream_context_dump(struct xmpp_stream_context * context,switch_stream_handle_t * stream)1742 void xmpp_stream_context_dump(struct xmpp_stream_context *context, switch_stream_handle_t *stream)
1743 {
1744 	switch_hash_index_t *hi;
1745 	switch_mutex_lock(context->streams_mutex);
1746 	stream->write_function(stream, "\nACTIVE STREAMS\n");
1747 	for (hi = switch_core_hash_first(context->streams); hi; hi = switch_core_hash_next(&hi)) {
1748 		struct xmpp_stream *s = NULL;
1749 		const void *key;
1750 		void *val;
1751 		switch_core_hash_this(hi, &key, NULL, &val);
1752 		s = (struct xmpp_stream *)val;
1753 		switch_assert(s);
1754 		stream->write_function(stream, "        TYPE='%s_%s',ID='%s',JID='%s',REMOTE_ADDRESS='%s',REMOTE_PORT=%i,STATE='%s'\n", s->s2s ? "s2s" : "c2s", s->incoming ? "in" : "out", s->id, s->jid, s->address, s->port, xmpp_stream_state_to_string(s->state));
1755 	}
1756 	switch_mutex_unlock(context->streams_mutex);
1757 }
1758 
1759 /**
1760  * Create a new XMPP stream context
1761  * @param domain for new streams
1762  * @param domain_secret domain shared secret for server dialback
1763  * @param bind_cb callback function when a resource is bound to a new stream
1764  * @param ready callback function when new stream is ready
1765  * @param recv callback function when a new stanza is received
1766  * @param destroy callback function when a stream is destroyed
1767  * @return the context
1768  */
xmpp_stream_context_create(const char * domain,const char * domain_secret,xmpp_stream_bind_callback bind_cb,xmpp_stream_ready_callback ready,xmpp_stream_recv_callback recv,xmpp_stream_destroy_callback destroy)1769 struct xmpp_stream_context *xmpp_stream_context_create(const char *domain, const char *domain_secret, xmpp_stream_bind_callback bind_cb, xmpp_stream_ready_callback ready, xmpp_stream_recv_callback recv, xmpp_stream_destroy_callback destroy)
1770 {
1771 	switch_memory_pool_t *pool;
1772 	struct xmpp_stream_context *context;
1773 
1774 	switch_core_new_memory_pool(&pool);
1775 	context = switch_core_alloc(pool, sizeof(*context));
1776 	context->pool = pool;
1777 	switch_mutex_init(&context->streams_mutex, SWITCH_MUTEX_NESTED, context->pool);
1778 	switch_core_hash_init(&context->routes);
1779 	switch_core_hash_init(&context->streams);
1780 	context->dialback_secret = switch_core_strdup(context->pool, domain_secret);
1781 	context->bind_callback = bind_cb;
1782 	context->ready_callback = ready;
1783 	context->destroy_callback = destroy;
1784 	context->recv_callback = recv;
1785 	context->shutdown = 0;
1786 	context->domain = switch_core_strdup(context->pool, domain);
1787 	switch_thread_rwlock_create(&context->shutdown_rwlock, context->pool);
1788 	switch_core_hash_init(&context->users);
1789 
1790 	return context;
1791 }
1792 
1793 /**
1794  * Add an authorized user
1795  * @param context the context to add user to
1796  * @param user the username
1797  * @param password the password
1798  */
xmpp_stream_context_add_user(struct xmpp_stream_context * context,const char * user,const char * password)1799 void xmpp_stream_context_add_user(struct xmpp_stream_context *context, const char *user, const char *password)
1800 {
1801 	switch_core_hash_insert(context->users, user, switch_core_strdup(context->pool, password));
1802 }
1803 
1804 /**
1805  * Destroy an XMPP stream context.  All open streams are closed.
1806  * @param context to destroy
1807  */
xmpp_stream_context_destroy(struct xmpp_stream_context * context)1808 void xmpp_stream_context_destroy(struct xmpp_stream_context *context)
1809 {
1810 	switch_memory_pool_t *pool;
1811 	context->shutdown = 1;
1812 	/* wait for threads to finish */
1813 	switch_thread_rwlock_wrlock(context->shutdown_rwlock);
1814 	switch_core_hash_destroy(&context->routes);
1815 	switch_core_hash_destroy(&context->streams);
1816 	switch_core_hash_destroy(&context->users);
1817 	pool = context->pool;
1818 	switch_core_destroy_memory_pool(&pool);
1819 }
1820 
1821 /**
1822  * @param stream
1823  * @return true if server-to-server stream
1824  */
xmpp_stream_is_s2s(struct xmpp_stream * stream)1825 int xmpp_stream_is_s2s(struct xmpp_stream *stream)
1826 {
1827 	return stream->s2s;
1828 }
1829 
1830 /**
1831  * @param stream
1832  * @return true if incoming stream
1833  */
xmpp_stream_is_incoming(struct xmpp_stream * stream)1834 int xmpp_stream_is_incoming(struct xmpp_stream *stream)
1835 {
1836 	return stream->incoming;
1837 }
1838 
1839 /**
1840  * @param stream
1841  * @return the stream JID
1842  */
xmpp_stream_get_jid(struct xmpp_stream * stream)1843 const char *xmpp_stream_get_jid(struct xmpp_stream *stream)
1844 {
1845 	return stream->jid;
1846 }
1847 
1848 /**
1849  * Set private data for this stream
1850  */
xmpp_stream_set_private(struct xmpp_stream * stream,void * user_private)1851 void xmpp_stream_set_private(struct xmpp_stream *stream, void *user_private)
1852 {
1853 	stream->user_private = user_private;
1854 }
1855 
1856 /**
1857  * Get private data for this stream
1858  */
xmpp_stream_get_private(struct xmpp_stream * stream)1859 void *xmpp_stream_get_private(struct xmpp_stream *stream)
1860 {
1861 	return stream->user_private;
1862 }
1863 
1864 /**
1865  * Add PEM cert file to stream for new SSL connections
1866  */
xmpp_stream_context_add_cert(struct xmpp_stream_context * context,const char * cert_pem_file)1867 void xmpp_stream_context_add_cert(struct xmpp_stream_context *context, const char *cert_pem_file)
1868 {
1869 	context->cert_pem_file = switch_core_strdup(context->pool, cert_pem_file);
1870 }
1871 
1872 /**
1873  * Add PEM key file to stream for new SSL connections
1874  */
xmpp_stream_context_add_key(struct xmpp_stream_context * context,const char * key_pem_file)1875 void xmpp_stream_context_add_key(struct xmpp_stream_context *context, const char *key_pem_file)
1876 {
1877 	context->key_pem_file = switch_core_strdup(context->pool, key_pem_file);
1878 }
1879 
1880 
1881 /* For Emacs:
1882  * Local Variables:
1883  * mode:c
1884  * indent-tabs-mode:t
1885  * tab-width:4
1886  * c-basic-offset:4
1887  * End:
1888  * For VIM:
1889  * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet
1890  */
1891