1 /*
2 * mod_rayo for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3 * Copyright (C) 2013-2015, Grasshopper
4 *
5 * Version: MPL 1.1
6 *
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
11 *
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
15 * License.
16 *
17 * The Original Code is mod_rayo for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18 *
19 * The Initial Developer of the Original Code is Grasshopper
20 * Portions created by the Initial Developer are Copyright (C)
21 * the Initial Developer. All Rights Reserved.
22 *
23 * Contributor(s):
24 * Chris Rienzo <chris.rienzo@grasshopper.com>
25 *
26 * xmpp_streams.c -- XMPP s2s and c2s streams
27 *
28 */
29 #include <switch.h>
30 #include <iksemel.h>
31
32 #include <openssl/ssl.h>
33
34 #include "xmpp_streams.h"
35 #include "iks_helpers.h"
36 #include "sasl.h"
37
38 #define MAX_QUEUE_LEN 25000
39
40 /**
41 * Context for all streams
42 */
43 struct xmpp_stream_context {
44 /** memory pool to use */
45 switch_memory_pool_t *pool;
46 /** domain for this context */
47 const char *domain;
48 /** synchronizes access to streams and routes hashes */
49 switch_mutex_t *streams_mutex;
50 /** map of stream JID to routable stream */
51 switch_hash_t *routes;
52 /** map of stream ID to stream */
53 switch_hash_t *streams;
54 /** map of user ID to password */
55 switch_hash_t *users;
56 /** shared secret for server dialback */
57 const char *dialback_secret;
58 /** callback when a new resource is bound */
59 xmpp_stream_bind_callback bind_callback;
60 /** callback when a new stream is ready */
61 xmpp_stream_ready_callback ready_callback;
62 /** callback when a stream is destroyed */
63 xmpp_stream_destroy_callback destroy_callback;
64 /** callback when a stanza is received */
65 xmpp_stream_recv_callback recv_callback;
66 /** context shutdown flag */
67 int shutdown;
68 /** prevents context shutdown until all threads are finished */
69 switch_thread_rwlock_t *shutdown_rwlock;
70 /** path to cert PEM file */
71 const char *cert_pem_file;
72 /** path to key PEM file */
73 const char *key_pem_file;
74 };
75
76 /**
77 * State of a stream
78 */
79 enum xmpp_stream_state {
80 /** new connection */
81 XSS_CONNECT,
82 /** encrypted comms established */
83 XSS_SECURE,
84 /** remote party authenticated */
85 XSS_AUTHENTICATED,
86 /** client resource bound */
87 XSS_RESOURCE_BOUND,
88 /** ready to accept requests */
89 XSS_READY,
90 /** terminating stream */
91 XSS_SHUTDOWN,
92 /** unrecoverable error */
93 XSS_ERROR,
94 /** destroyed */
95 XSS_DESTROY
96 };
97
98 /**
99 * A client/server stream connection
100 */
101 struct xmpp_stream {
102 /** stream state */
103 enum xmpp_stream_state state;
104 /** true if server-to-server connection */
105 int s2s;
106 /** true if incoming connection */
107 int incoming;
108 /** Jabber ID of remote party */
109 char *jid;
110 /** stream ID */
111 char *id;
112 /** stream pool */
113 switch_memory_pool_t *pool;
114 /** address of this stream */
115 const char *address;
116 /** port of this stream */
117 int port;
118 /** synchronizes access to this stream */
119 switch_mutex_t *mutex;
120 /** socket to remote party */
121 switch_socket_t *socket;
122 /** socket poll descriptor */
123 switch_pollfd_t *pollfd;
124 /** XML stream parser */
125 iksparser *parser;
126 /** outbound message queue */
127 switch_queue_t *msg_queue;
128 /** true if no activity last poll */
129 int idle;
130 /** context for this stream */
131 struct xmpp_stream_context *context;
132 /** user private data */
133 void *user_private;
134 };
135
136 /**
137 * A socket listening for new connections
138 */
139 struct xmpp_listener {
140 /** listener pool */
141 switch_memory_pool_t *pool;
142 /** listen address */
143 char *addr;
144 /** listen port */
145 switch_port_t port;
146 /** access control list */
147 const char *acl;
148 /** listen socket */
149 switch_socket_t *socket;
150 /** pollset for listen socket */
151 switch_pollfd_t *read_pollfd;
152 /** true if server to server connections only */
153 int s2s;
154 /** context for new streams */
155 struct xmpp_stream_context *context;
156 };
157
158 static void xmpp_stream_new_id(struct xmpp_stream *stream);
159 static void xmpp_stream_set_id(struct xmpp_stream *stream, const char *id);
160
161 /**
162 * Convert xmpp stream state to string
163 * @param state the xmpp stream state
164 * @return the string value of state or "UNKNOWN"
165 */
xmpp_stream_state_to_string(enum xmpp_stream_state state)166 static const char *xmpp_stream_state_to_string(enum xmpp_stream_state state)
167 {
168 switch(state) {
169 case XSS_CONNECT: return "CONNECT";
170 case XSS_SECURE: return "SECURE";
171 case XSS_AUTHENTICATED: return "AUTHENTICATED";
172 case XSS_RESOURCE_BOUND: return "RESOURCE_BOUND";
173 case XSS_READY: return "READY";
174 case XSS_SHUTDOWN: return "SHUTDOWN";
175 case XSS_ERROR: return "ERROR";
176 case XSS_DESTROY: return "DESTROY";
177 }
178 return "UNKNOWN";
179 }
180
181 /**
182 * Handle XMPP stream logging callback
183 * @param user_data the xmpp stream
184 * @param data the log message
185 * @param size of the log message
186 * @param is_incoming true if this is a log for a received message
187 */
on_stream_log(void * user_data,const char * data,size_t size,int is_incoming)188 static void on_stream_log(void *user_data, const char *data, size_t size, int is_incoming)
189 {
190 if (size > 0) {
191 struct xmpp_stream *stream = (struct xmpp_stream *)user_data;
192 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, %s_%s %s %s\n", stream->jid, stream->address, stream->port, stream->s2s ? "s2s" : "c2s",
193 stream->incoming ? "in" : "out", is_incoming ? "RECV" : "SEND", data);
194 }
195 }
196
197 /**
198 * Send stanza to stream.
199 */
xmpp_stream_stanza_send(struct xmpp_stream * stream,iks * msg)200 static void xmpp_stream_stanza_send(struct xmpp_stream *stream, iks *msg)
201 {
202 /* send directly if client or outbound s2s stream */
203 if (!stream->s2s || !stream->incoming) {
204 iks_send(stream->parser, msg);
205 iks_delete(msg);
206 } else {
207 /* route message to outbound server stream */
208 xmpp_stream_context_send(stream->context, stream->jid, msg);
209 iks_delete(msg);
210 }
211 }
212
213 /**
214 * Attach stream to connected socket
215 * @param stream the stream
216 * @param socket the connected socket
217 */
xmpp_stream_set_socket(struct xmpp_stream * stream,switch_socket_t * socket)218 static void xmpp_stream_set_socket(struct xmpp_stream *stream, switch_socket_t *socket)
219 {
220 stream->socket = socket;
221 switch_socket_create_pollset(&stream->pollfd, stream->socket, SWITCH_POLLIN | SWITCH_POLLERR, stream->pool);
222
223 /* connect XMPP stream parser to socket */
224 {
225 switch_os_socket_t os_socket;
226 switch_os_sock_get(&os_socket, stream->socket);
227 iks_connect_fd(stream->parser, os_socket);
228 /* TODO connect error checking */
229 }
230 }
231
232 /**
233 * Assign a new ID to the stream
234 * @param stream the stream
235 */
xmpp_stream_new_id(struct xmpp_stream * stream)236 static void xmpp_stream_new_id(struct xmpp_stream *stream)
237 {
238 char id[SWITCH_UUID_FORMATTED_LENGTH + 1] = { 0 };
239 switch_uuid_str(id, sizeof(id));
240 xmpp_stream_set_id(stream, id);
241 }
242
243 /**
244 * Send session reply to server <stream> after auth is done
245 * @param stream the xmpp stream
246 */
xmpp_send_server_header_features(struct xmpp_stream * stream)247 static void xmpp_send_server_header_features(struct xmpp_stream *stream)
248 {
249 struct xmpp_stream_context *context = stream->context;
250 char *header = switch_mprintf(
251 "<stream:stream xmlns='"IKS_NS_SERVER"' xmlns:db='"IKS_NS_XMPP_DIALBACK"'"
252 " from='%s' id='%s' xml:lang='en' version='1.0'"
253 " xmlns:stream='"IKS_NS_XMPP_STREAMS"'><stream:features>"
254 "</stream:features>", context->domain, stream->id);
255
256 iks_send_raw(stream->parser, header);
257 free(header);
258 }
259
260 /**
261 * Send bind + session reply to client <stream>
262 * @param stream the xmpp stream
263 */
xmpp_send_client_header_bind(struct xmpp_stream * stream)264 static void xmpp_send_client_header_bind(struct xmpp_stream *stream)
265 {
266 struct xmpp_stream_context *context = stream->context;
267 char *header = switch_mprintf(
268 "<stream:stream xmlns='"IKS_NS_CLIENT"' xmlns:db='"IKS_NS_XMPP_DIALBACK"'"
269 " from='%s' id='%s' xml:lang='en' version='1.0'"
270 " xmlns:stream='"IKS_NS_XMPP_STREAMS"'><stream:features>"
271 "<bind xmlns='"IKS_NS_XMPP_BIND"'/>"
272 "<session xmlns='"IKS_NS_XMPP_SESSION"'/>"
273 "</stream:features>", context->domain, stream->id);
274
275 iks_send_raw(stream->parser, header);
276 free(header);
277 }
278
279 /**
280 * Handle <presence> message callback
281 * @param stream the stream
282 * @param node the presence message
283 */
on_stream_presence(struct xmpp_stream * stream,iks * node)284 static void on_stream_presence(struct xmpp_stream *stream, iks *node)
285 {
286 struct xmpp_stream_context *context = stream->context;
287 const char *from = iks_find_attrib(node, "from");
288
289 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, presence, state = %s\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state));
290
291 if (!from) {
292 if (stream->s2s) {
293 /* from is required in s2s connections */
294 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, no presence from JID\n", stream->jid, stream->address, stream->port);
295 return;
296 }
297
298 /* use stream JID if a c2s connection */
299 from = stream->jid;
300 if (zstr(from)) {
301 /* error */
302 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, no presence from JID\n", stream->jid, stream->address, stream->port);
303 return;
304 }
305 iks_insert_attrib(node, "from", from);
306 }
307 if (context->recv_callback) {
308 context->recv_callback(stream, node);
309 }
310 }
311
312 /**
313 * Send <success> reply to xmpp stream <auth>
314 * @param stream the xmpp stream.
315 */
xmpp_send_auth_success(struct xmpp_stream * stream)316 static void xmpp_send_auth_success(struct xmpp_stream *stream)
317 {
318 iks_send_raw(stream->parser, "<success xmlns='"IKS_NS_XMPP_SASL"'/>");
319 }
320
321 /**
322 * Send <failure> reply to xmpp client <auth>
323 * @param stream the xmpp stream to use.
324 * @param reason the reason for failure
325 */
xmpp_send_auth_failure(struct xmpp_stream * stream,const char * reason)326 static void xmpp_send_auth_failure(struct xmpp_stream *stream, const char *reason)
327 {
328 char *reply = switch_mprintf("<failure xmlns='"IKS_NS_XMPP_SASL"'>"
329 "<%s/></failure>", reason);
330 iks_send_raw(stream->parser, reply);
331 free(reply);
332 }
333
334 /**
335 * Validate username and password
336 * @param authzid authorization id
337 * @param authcid authentication id
338 * @param password
339 * @return 1 if authenticated
340 */
verify_plain_auth(struct xmpp_stream_context * context,const char * authzid,const char * authcid,const char * password)341 static int verify_plain_auth(struct xmpp_stream_context *context, const char *authzid, const char *authcid, const char *password)
342 {
343 char *correct_password;
344 if (zstr(authzid) || zstr(authcid) || zstr(password)) {
345 return 0;
346 }
347 correct_password = switch_core_hash_find(context->users, authcid);
348 return !zstr(correct_password) && !strcmp(correct_password, password);
349 }
350
351 /**
352 * Send sasl reply to xmpp <stream>
353 * @param stream the xmpp stream
354 */
xmpp_send_client_header_auth(struct xmpp_stream * stream)355 static void xmpp_send_client_header_auth(struct xmpp_stream *stream)
356 {
357 struct xmpp_stream_context *context = stream->context;
358 char *header = switch_mprintf(
359 "<stream:stream xmlns='"IKS_NS_CLIENT"' xmlns:db='"IKS_NS_XMPP_DIALBACK"'"
360 " from='%s' id='%s' xml:lang='en' version='1.0'"
361 " xmlns:stream='"IKS_NS_XMPP_STREAMS"'><stream:features>"
362 "<mechanisms xmlns='"IKS_NS_XMPP_SASL"'>"
363 "<mechanism>PLAIN</mechanism>"
364 "</mechanisms></stream:features>", context->domain, stream->id);
365 iks_send_raw(stream->parser, header);
366 free(header);
367 }
368
369 /**
370 * Send sasl + starttls reply to xmpp <stream>
371 * @param stream the xmpp stream
372 */
xmpp_send_client_header_tls(struct xmpp_stream * stream)373 static void xmpp_send_client_header_tls(struct xmpp_stream *stream)
374 {
375 if (stream->context->key_pem_file && stream->context->cert_pem_file) {
376 struct xmpp_stream_context *context = stream->context;
377 char *header = switch_mprintf(
378 "<stream:stream xmlns='"IKS_NS_CLIENT"' xmlns:db='"IKS_NS_XMPP_DIALBACK"'"
379 " from='%s' id='%s' xml:lang='en' version='1.0'"
380 " xmlns:stream='"IKS_NS_XMPP_STREAMS"'><stream:features>"
381 "<starttls xmlns='"IKS_NS_XMPP_TLS"'><required/></starttls>"
382 "<mechanisms xmlns='"IKS_NS_XMPP_SASL"'>"
383 "<mechanism>PLAIN</mechanism>"
384 "</mechanisms></stream:features>", context->domain, stream->id);
385 iks_send_raw(stream->parser, header);
386 free(header);
387 } else {
388 /* not set up for TLS, skip it */
389 stream->state = XSS_SECURE;
390 xmpp_send_client_header_auth(stream);
391 }
392 }
393
394 /**
395 * Send sasl reply to xmpp <stream>
396 * @param stream the xmpp stream
397 */
xmpp_send_server_header_auth(struct xmpp_stream * stream)398 static void xmpp_send_server_header_auth(struct xmpp_stream *stream)
399 {
400 struct xmpp_stream_context *context = stream->context;
401 char *header = switch_mprintf(
402 "<stream:stream xmlns='"IKS_NS_SERVER"' xmlns:db='"IKS_NS_XMPP_DIALBACK"'"
403 " from='%s' id='%s' xml:lang='en' version='1.0'"
404 " xmlns:stream='"IKS_NS_XMPP_STREAMS"'>"
405 "<stream:features>"
406 "</stream:features>",
407 context->domain, stream->id);
408 iks_send_raw(stream->parser, header);
409 free(header);
410 }
411
412 /**
413 * Send dialback to receiving server
414 */
xmpp_send_dialback_key(struct xmpp_stream * stream)415 static void xmpp_send_dialback_key(struct xmpp_stream *stream)
416 {
417 struct xmpp_stream_context *context = stream->context;
418 char *dialback_key = iks_server_dialback_key(context->dialback_secret, stream->jid, context->domain, stream->id);
419 if (dialback_key) {
420 char *dialback = switch_mprintf(
421 "<db:result from='%s' to='%s'>%s</db:result>",
422 context->domain, stream->jid,
423 dialback_key);
424 iks_send_raw(stream->parser, dialback);
425 free(dialback);
426 free(dialback_key);
427 } else {
428 /* TODO missing shared secret */
429 }
430 }
431
432 /**
433 * Send initial <stream> header to peer server
434 * @param stream the xmpp stream
435 */
xmpp_send_outbound_server_header(struct xmpp_stream * stream)436 static void xmpp_send_outbound_server_header(struct xmpp_stream *stream)
437 {
438 struct xmpp_stream_context *context = stream->context;
439 char *header = switch_mprintf(
440 "<stream:stream xmlns='"IKS_NS_SERVER"' xmlns:db='"IKS_NS_XMPP_DIALBACK"'"
441 " from='%s' to='%s' xml:lang='en' version='1.0'"
442 " xmlns:stream='"IKS_NS_XMPP_STREAMS"'>", context->domain, stream->jid);
443 iks_send_raw(stream->parser, header);
444 free(header);
445 }
446
447 /**
448 * Handle <starttls> message.
449 * @param the xmpp stream
450 * @param node the <starttls> packet
451 */
on_stream_starttls(struct xmpp_stream * stream,iks * node)452 static void on_stream_starttls(struct xmpp_stream *stream, iks *node)
453 {
454 /* wait for handshake to start */
455 if (iks_proceed_tls(stream->parser, stream->context->cert_pem_file, stream->context->key_pem_file) == IKS_OK) {
456 stream->state = XSS_SECURE;
457 } else {
458 stream->state = XSS_ERROR;
459 }
460 }
461
462 /**
463 * Handle <auth> message. Only PLAIN supported.
464 * @param stream the xmpp stream
465 * @param node the <auth> packet
466 */
on_stream_auth(struct xmpp_stream * stream,iks * node)467 static void on_stream_auth(struct xmpp_stream *stream, iks *node)
468 {
469 struct xmpp_stream_context *context = stream->context;
470 const char *xmlns, *mechanism;
471 iks *auth_body;
472
473 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, auth, state = %s\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state));
474
475 /* wrong state for authentication */
476 if (stream->state != XSS_SECURE) {
477 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_WARNING, "%s, %s:%i, auth UNEXPECTED, state = %s\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state));
478 /* on_auth unexpected error */
479 stream->state = XSS_ERROR;
480 return;
481 }
482
483 /* unsupported authentication type */
484 xmlns = iks_find_attrib_soft(node, "xmlns");
485 if (strcmp(IKS_NS_XMPP_SASL, xmlns)) {
486 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_WARNING, "%s, %s:%i, auth, state = %s, unsupported namespace: %s!\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state), xmlns);
487 /* on_auth namespace error */
488 stream->state = XSS_ERROR;
489 return;
490 }
491
492 /* unsupported SASL authentication mechanism */
493 mechanism = iks_find_attrib_soft(node, "mechanism");
494 if (strcmp("PLAIN", mechanism)) {
495 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_WARNING, "%s, %s:%i, auth, state = %s, unsupported SASL mechanism: %s!\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state), mechanism);
496 xmpp_send_auth_failure(stream, "invalid-mechanism");
497 stream->state = XSS_ERROR;
498 return;
499 }
500
501 if ((auth_body = iks_child(node)) && iks_type(auth_body) == IKS_CDATA) {
502 /* get user and password from auth */
503 char *message = iks_cdata(auth_body);
504 char *authzid = NULL, *authcid, *password;
505 /* TODO use library for SASL! */
506 parse_plain_auth_message(message, &authzid, &authcid, &password);
507 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, auth, state = %s, SASL/PLAIN decoded authzid = \"%s\" authcid = \"%s\"\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state), authzid, authcid);
508 if (verify_plain_auth(context, authzid, authcid, password)) {
509 stream->jid = switch_core_strdup(stream->pool, authzid);
510 if (!stream->s2s && !strchr(stream->jid, '@')) {
511 /* add missing domain on client stream */
512 stream->jid = switch_core_sprintf(stream->pool, "%s@%s", stream->jid, context->domain);
513 }
514
515 xmpp_send_auth_success(stream);
516 stream->state = XSS_AUTHENTICATED;
517 } else {
518 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_WARNING, "%s, %s:%i, auth, state = %s, invalid user or password!\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state));
519 xmpp_send_auth_failure(stream, "not-authorized");
520 stream->state = XSS_ERROR;
521 }
522 switch_safe_free(authzid);
523 switch_safe_free(authcid);
524 switch_safe_free(password);
525 } else {
526 /* missing message */
527 stream->state = XSS_ERROR;
528 }
529 }
530
531 /**
532 * Handle <iq><session> request
533 * @param stream the xmpp stream
534 * @param node the <iq> node
535 * @return NULL
536 */
on_iq_set_xmpp_session(struct xmpp_stream * stream,iks * node)537 static iks *on_iq_set_xmpp_session(struct xmpp_stream *stream, iks *node)
538 {
539 struct xmpp_stream_context *context = stream->context;
540 iks *reply;
541
542 switch(stream->state) {
543 case XSS_RESOURCE_BOUND: {
544 if (context->ready_callback && !context->ready_callback(stream)) {
545 reply = iks_new_error(node, STANZA_ERROR_INTERNAL_SERVER_ERROR);
546 stream->state = XSS_ERROR;
547 } else {
548 reply = iks_new_iq_result(node);
549 stream->state = XSS_READY;
550
551 /* add to available streams */
552 switch_mutex_lock(context->streams_mutex);
553 switch_core_hash_insert(context->routes, stream->jid, stream);
554 switch_mutex_unlock(context->streams_mutex);
555 }
556
557 break;
558 }
559 case XSS_AUTHENTICATED:
560 case XSS_READY:
561 default:
562 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_WARNING, "%s, %s:%i, iq UNEXPECTED <session>, state = %s\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state));
563 reply = iks_new_error(node, STANZA_ERROR_SERVICE_UNAVAILABLE);
564 break;
565 }
566
567 return reply;
568 }
569
570 /**
571 * Handle <iq><bind> request
572 * @param stream the xmpp stream
573 * @param node the <iq> node
574 */
on_iq_set_xmpp_bind(struct xmpp_stream * stream,iks * node)575 static iks *on_iq_set_xmpp_bind(struct xmpp_stream *stream, iks *node)
576 {
577 iks *reply = NULL;
578
579 switch(stream->state) {
580 case XSS_AUTHENTICATED: {
581 struct xmpp_stream_context *context = stream->context;
582 iks *bind = iks_find(node, "bind");
583 iks *x;
584 /* get optional client resource ID */
585 char *resource_id = iks_find_cdata(bind, "resource");
586
587 /* generate resource ID for client if not already set */
588 if (zstr(resource_id)) {
589 char resource_id_buf[SWITCH_UUID_FORMATTED_LENGTH + 1];
590 switch_uuid_str(resource_id_buf, sizeof(resource_id_buf));
591 resource_id = switch_core_strdup(stream->pool, resource_id_buf);
592 }
593
594 stream->jid = switch_core_sprintf(stream->pool, "%s/%s", stream->jid, resource_id);
595 if (context->bind_callback && !context->bind_callback(stream)) {
596 stream->jid = NULL;
597 reply = iks_new_error(node, STANZA_ERROR_CONFLICT);
598 } else {
599 stream->state = XSS_RESOURCE_BOUND;
600
601 reply = iks_new_iq_result(node);
602 x = iks_insert(reply, "bind");
603 iks_insert_attrib(x, "xmlns", IKS_NS_XMPP_BIND);
604 iks_insert_cdata(iks_insert(x, "jid"), stream->jid, strlen(stream->jid));
605 }
606 break;
607 }
608 default:
609 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_WARNING, "%s, %s:%i, iq UNEXPECTED <bind>\n", stream->jid, stream->address, stream->port);
610 reply = iks_new_error(node, STANZA_ERROR_NOT_ALLOWED);
611 break;
612 }
613
614 return reply;
615 }
616
617 /**
618 * Handle <iq> message callback
619 * @param stream the stream
620 * @param iq the packet
621 */
on_stream_iq(struct xmpp_stream * stream,iks * iq)622 static void on_stream_iq(struct xmpp_stream *stream, iks *iq)
623 {
624 struct xmpp_stream_context *context = stream->context;
625 switch(stream->state) {
626 case XSS_CONNECT:
627 case XSS_SECURE: {
628 iks *error = iks_new_error(iq, STANZA_ERROR_NOT_AUTHORIZED);
629 xmpp_stream_stanza_send(stream, error);
630 break;
631 }
632 case XSS_AUTHENTICATED: {
633 iks *cmd = iks_first_tag(iq);
634 if (cmd && !strcmp("bind", iks_name(cmd)) && !strcmp(IKS_NS_XMPP_BIND, iks_find_attrib_soft(cmd, "xmlns"))) {
635 iks *reply = on_iq_set_xmpp_bind(stream, iq);
636 xmpp_stream_stanza_send(stream, reply);
637 } else {
638 iks *error = iks_new_error(iq, STANZA_ERROR_SERVICE_UNAVAILABLE);
639 xmpp_stream_stanza_send(stream, error);
640 }
641 break;
642 }
643 case XSS_RESOURCE_BOUND: {
644 iks *cmd = iks_first_tag(iq);
645 if (cmd && !strcmp("session", iks_name(cmd)) && !strcmp(IKS_NS_XMPP_SESSION, iks_find_attrib_soft(cmd, "xmlns"))) {
646 iks *reply = on_iq_set_xmpp_session(stream, iq);
647 xmpp_stream_stanza_send(stream, reply);
648 } else {
649 iks *error = iks_new_error(iq, STANZA_ERROR_SERVICE_UNAVAILABLE);
650 xmpp_stream_stanza_send(stream, error);
651 }
652 break;
653 }
654 case XSS_READY: {
655 /* client requests */
656 if (context->recv_callback) {
657 context->recv_callback(stream, iq);
658 }
659 break;
660 }
661 case XSS_SHUTDOWN:
662 case XSS_DESTROY:
663 case XSS_ERROR: {
664 iks *error = iks_new_error(iq, STANZA_ERROR_UNEXPECTED_REQUEST);
665 xmpp_stream_stanza_send(stream, error);
666 break;
667 }
668 };
669 }
670
671 /**
672 * Handle </stream>
673 * @param stream the stream
674 */
on_stream_stop(struct xmpp_stream * stream)675 static void on_stream_stop(struct xmpp_stream *stream)
676 {
677 if (stream->state != XSS_SHUTDOWN) {
678 iks_send_raw(stream->parser, "</stream:stream>");
679 }
680 stream->state = XSS_DESTROY;
681 }
682
683 /**
684 * Handle <stream> from a client
685 * @param stream the stream
686 * @param node the stream message
687 */
on_client_stream_start(struct xmpp_stream * stream,iks * node)688 static void on_client_stream_start(struct xmpp_stream *stream, iks *node)
689 {
690 struct xmpp_stream_context *context = stream->context;
691 const char *to = iks_find_attrib_soft(node, "to");
692 const char *xmlns = iks_find_attrib_soft(node, "xmlns");
693
694 /* to is optional, must be server domain if set */
695 if (!zstr(to) && strcmp(context->domain, to)) {
696 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, wrong server domain!\n", stream->jid, stream->address, stream->port);
697 stream->state = XSS_ERROR;
698 return;
699 }
700
701 /* xmlns = client */
702 if (zstr(xmlns) || strcmp(xmlns, IKS_NS_CLIENT)) {
703 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, wrong stream namespace!\n", stream->jid, stream->address, stream->port);
704 stream->state = XSS_ERROR;
705 return;
706 }
707
708 switch (stream->state) {
709 case XSS_CONNECT:
710 xmpp_send_client_header_tls(stream);
711 break;
712 case XSS_SECURE:
713 xmpp_send_client_header_auth(stream);
714 break;
715 case XSS_AUTHENTICATED:
716 /* client bind required */
717 xmpp_stream_new_id(stream);
718 xmpp_send_client_header_bind(stream);
719 break;
720 case XSS_SHUTDOWN:
721 /* strange... I expect IKS_NODE_STOP, this is a workaround. */
722 stream->state = XSS_DESTROY;
723 break;
724 case XSS_RESOURCE_BOUND:
725 case XSS_READY:
726 case XSS_ERROR:
727 case XSS_DESTROY:
728 /* bad state */
729 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, bad state!\n", stream->jid, stream->address, stream->port);
730 stream->state = XSS_ERROR;
731 break;
732 }
733 }
734
735 /**
736 * Handle <db:result type='valid'>
737 */
on_stream_dialback_result_valid(struct xmpp_stream * stream,iks * node)738 static void on_stream_dialback_result_valid(struct xmpp_stream *stream, iks *node)
739 {
740 struct xmpp_stream_context *context = stream->context;
741
742 /* TODO check domain pair and allow access if pending request exists */
743 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, valid dialback result\n", stream->jid, stream->address, stream->port);
744
745 if (context->ready_callback && !context->ready_callback(stream)) {
746 stream->state = XSS_ERROR;
747 } else {
748 /* this stream is routable */
749 stream->state = XSS_READY;
750
751 /* add to available streams */
752 switch_mutex_lock(context->streams_mutex);
753 switch_core_hash_insert(context->routes, stream->jid, stream);
754 switch_mutex_unlock(context->streams_mutex);
755 }
756 }
757
758 /**
759 * Handle <db:result type='valid'>
760 */
on_stream_dialback_result_invalid(struct xmpp_stream * stream,iks * node)761 static void on_stream_dialback_result_invalid(struct xmpp_stream *stream, iks *node)
762 {
763 /* close stream */
764 stream->state = XSS_ERROR;
765 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, invalid dialback result!\n", stream->jid, stream->address, stream->port);
766 }
767
768 /**
769 * Handle <db:result type='error'>
770 */
on_stream_dialback_result_error(struct xmpp_stream * stream,iks * node)771 static void on_stream_dialback_result_error(struct xmpp_stream *stream, iks *node)
772 {
773 /* close stream */
774 stream->state = XSS_ERROR;
775 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, error dialback result!\n", stream->jid, stream->address, stream->port);
776 }
777
778 /**
779 * Handle <db:result>
780 */
on_stream_dialback_result_key(struct xmpp_stream * stream,iks * node)781 static void on_stream_dialback_result_key(struct xmpp_stream *stream, iks *node)
782 {
783 struct xmpp_stream_context *context = stream->context;
784 const char *from = iks_find_attrib_soft(node, "from");
785 const char *to = iks_find_attrib_soft(node, "to");
786 iks *cdata = iks_child(node);
787 iks *reply;
788 const char *dialback_key = NULL;
789
790 if (cdata && iks_type(cdata) == IKS_CDATA) {
791 dialback_key = iks_cdata(cdata);
792 }
793 if (zstr(dialback_key)) {
794 iks *error = iks_new_error_detailed(node, STANZA_ERROR_BAD_REQUEST, "Missing dialback key");
795 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, dialback result missing key!\n", stream->jid, stream->address, stream->port);
796 iks_send(stream->parser, error);
797 iks_delete(error);
798 stream->state = XSS_ERROR;
799 return;
800 }
801
802 if (zstr(from)) {
803 iks *error = iks_new_error_detailed(node, STANZA_ERROR_BAD_REQUEST, "Missing from");
804 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, dialback result missing from!\n", stream->jid, stream->address, stream->port);
805 iks_send(stream->parser, error);
806 iks_delete(error);
807 stream->state = XSS_ERROR;
808 return;
809 }
810
811 if (zstr(to)) {
812 iks *error = iks_new_error_detailed(node, STANZA_ERROR_BAD_REQUEST, "Missing to");
813 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, dialback result missing to!\n", stream->jid, stream->address, stream->port);
814 iks_send(stream->parser, error);
815 iks_delete(error);
816 stream->state = XSS_ERROR;
817 return;
818 }
819
820 if (strcmp(context->domain, to)) {
821 iks *error = iks_new_error(node, STANZA_ERROR_ITEM_NOT_FOUND);
822 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, invalid domain!\n", stream->jid, stream->address, stream->port);
823 iks_send(stream->parser, error);
824 iks_delete(error);
825 stream->state = XSS_ERROR;
826 return;
827 }
828
829 /* this stream is not routable */
830 stream->state = XSS_READY;
831 stream->jid = switch_core_strdup(stream->pool, from);
832
833 if (context->ready_callback && !context->ready_callback(stream)) {
834 iks *error = iks_new_error(node, STANZA_ERROR_INTERNAL_SERVER_ERROR);
835 iks_send(stream->parser, error);
836 iks_delete(error);
837 stream->state = XSS_ERROR;
838 return;
839 }
840
841 /* TODO validate key */
842 reply = iks_new("db:result");
843 iks_insert_attrib(reply, "from", to);
844 iks_insert_attrib(reply, "to", from);
845 iks_insert_attrib(reply, "type", "valid");
846 iks_send(stream->parser, reply);
847 iks_delete(reply);
848 }
849
850 /**
851 * Handle <db:result>
852 */
on_stream_dialback_result(struct xmpp_stream * stream,iks * node)853 static void on_stream_dialback_result(struct xmpp_stream *stream, iks *node)
854 {
855 const char *type = iks_find_attrib_soft(node, "type");
856
857 if (stream->state == XSS_ERROR || stream->state == XSS_DESTROY) {
858 stream->state = XSS_ERROR;
859 return;
860 }
861
862 if (zstr(type)) {
863 on_stream_dialback_result_key(stream, node);
864 } else if (!strcmp("valid", type)) {
865 on_stream_dialback_result_valid(stream, node);
866 } else if (!strcmp("invalid", type)) {
867 on_stream_dialback_result_invalid(stream, node);
868 } else if (!strcmp("error", type)) {
869 on_stream_dialback_result_error(stream, node);
870 }
871 }
872
873 /**
874 * Handle <db:verify>
875 */
on_stream_dialback_verify(struct xmpp_stream * stream,iks * node)876 static void on_stream_dialback_verify(struct xmpp_stream *stream, iks *node)
877 {
878 struct xmpp_stream_context *context = stream->context;
879 const char *from = iks_find_attrib_soft(node, "from");
880 const char *id = iks_find_attrib_soft(node, "id");
881 const char *to = iks_find_attrib_soft(node, "to");
882 iks *cdata = iks_child(node);
883 iks *reply;
884 const char *dialback_key = NULL;
885 char *expected_key = NULL;
886 int valid;
887
888 if (stream->state == XSS_ERROR || stream->state == XSS_DESTROY) {
889 stream->state = XSS_ERROR;
890 return;
891 }
892
893 if (cdata && iks_type(cdata) == IKS_CDATA) {
894 dialback_key = iks_cdata(cdata);
895 }
896 if (zstr(dialback_key)) {
897 iks *error = iks_new_error_detailed(node, STANZA_ERROR_BAD_REQUEST, "Missing dialback key");
898 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, dialback verify missing key!\n", stream->jid, stream->address, stream->port);
899 iks_send(stream->parser, error);
900 iks_delete(error);
901 return;
902 }
903
904 if (zstr(id)) {
905 iks *error = iks_new_error_detailed(node, STANZA_ERROR_BAD_REQUEST, "Missing id");
906 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, dialback verify missing stream ID!\n", stream->jid, stream->address, stream->port);
907 iks_send(stream->parser, error);
908 iks_delete(error);
909 return;
910 }
911
912 if (zstr(from)) {
913 iks *error = iks_new_error_detailed(node, STANZA_ERROR_BAD_REQUEST, "Missing from");
914 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, dialback verify missing from!\n", stream->jid, stream->address, stream->port);
915 iks_send(stream->parser, error);
916 iks_delete(error);
917 return;
918 }
919
920 if (zstr(to)) {
921 iks *error = iks_new_error_detailed(node, STANZA_ERROR_BAD_REQUEST, "Missing to");
922 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, dialback verify missing to!\n", stream->jid, stream->address, stream->port);
923 iks_send(stream->parser, error);
924 iks_delete(error);
925 return;
926 }
927
928 if (strcmp(context->domain, to)) {
929 iks *error = iks_new_error(node, STANZA_ERROR_ITEM_NOT_FOUND);
930 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, invalid domain!\n", stream->jid, stream->address, stream->port);
931 iks_send(stream->parser, error);
932 iks_delete(error);
933 return;
934 }
935
936 expected_key = iks_server_dialback_key(context->dialback_secret, from, to, id);
937 valid = expected_key && !strcmp(expected_key, dialback_key);
938
939 reply = iks_new("db:verify");
940 iks_insert_attrib(reply, "from", to);
941 iks_insert_attrib(reply, "to", from);
942 iks_insert_attrib(reply, "id", id);
943 iks_insert_attrib(reply, "type", valid ? "valid" : "invalid");
944 iks_send(stream->parser, reply);
945 iks_delete(reply);
946 free(expected_key);
947
948 if (!valid) {
949 /* close the stream */
950 stream->state = XSS_ERROR;
951 }
952 }
953
954 /**
955 * Handle <stream> from an outbound peer server
956 */
on_outbound_server_stream_start(struct xmpp_stream * stream,iks * node)957 static void on_outbound_server_stream_start(struct xmpp_stream *stream, iks *node)
958 {
959 const char *xmlns = iks_find_attrib_soft(node, "xmlns");
960
961 /* xmlns = server */
962 if (zstr(xmlns) || strcmp(xmlns, IKS_NS_SERVER)) {
963 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, wrong stream namespace!\n", stream->jid, stream->address, stream->port);
964 stream->state = XSS_ERROR;
965 return;
966 }
967
968 switch (stream->state) {
969 case XSS_CONNECT: {
970 /* get stream ID and send dialback */
971 const char *id = iks_find_attrib_soft(node, "id");
972 if (zstr(id)) {
973 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, missing stream ID!\n", stream->jid, stream->address, stream->port);
974 stream->state = XSS_ERROR;
975 return;
976 }
977 xmpp_stream_set_id(stream, id);
978
979 /* send dialback */
980 xmpp_send_dialback_key(stream);
981 break;
982 }
983 case XSS_SHUTDOWN:
984 /* strange... I expect IKS_NODE_STOP, this is a workaround. */
985 stream->state = XSS_DESTROY;
986 break;
987 case XSS_SECURE:
988 case XSS_AUTHENTICATED:
989 case XSS_RESOURCE_BOUND:
990 case XSS_READY:
991 case XSS_ERROR:
992 case XSS_DESTROY:
993 /* bad state */
994 stream->state = XSS_ERROR;
995 break;
996 }
997 }
998
999 /**
1000 * Handle <stream> from an inbound peer server
1001 * @param stream the stream
1002 * @param node the stream message
1003 */
on_inbound_server_stream_start(struct xmpp_stream * stream,iks * node)1004 static void on_inbound_server_stream_start(struct xmpp_stream *stream, iks *node)
1005 {
1006 struct xmpp_stream_context *context = stream->context;
1007 const char *to = iks_find_attrib_soft(node, "to");
1008 const char *xmlns = iks_find_attrib_soft(node, "xmlns");
1009
1010 /* to is required, must be server domain */
1011 if (zstr(to) || strcmp(context->domain, to)) {
1012 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, wrong server domain!\n", stream->jid, stream->address, stream->port);
1013 stream->state = XSS_ERROR;
1014 return;
1015 }
1016
1017 /* xmlns = server */
1018 if (zstr(xmlns) || strcmp(xmlns, IKS_NS_SERVER)) {
1019 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, wrong stream namespace!\n", stream->jid, stream->address, stream->port);
1020 stream->state = XSS_ERROR;
1021 return;
1022 }
1023
1024 switch (stream->state) {
1025 case XSS_CONNECT:
1026 xmpp_send_server_header_auth(stream);
1027 break;
1028 case XSS_SECURE:
1029 break;
1030 case XSS_AUTHENTICATED: {
1031 if (context->ready_callback && !context->ready_callback(stream)) {
1032 stream->state = XSS_ERROR;
1033 break;
1034 }
1035
1036 /* all set */
1037 xmpp_send_server_header_features(stream);
1038 stream->state = XSS_READY;
1039
1040 /* add to available streams */
1041 switch_mutex_lock(context->streams_mutex);
1042 switch_core_hash_insert(context->routes, stream->jid, stream);
1043 switch_mutex_unlock(context->streams_mutex);
1044 break;
1045 }
1046 case XSS_SHUTDOWN:
1047 /* strange... I expect IKS_NODE_STOP, this is a workaround. */
1048 stream->state = XSS_DESTROY;
1049 break;
1050 case XSS_RESOURCE_BOUND:
1051 case XSS_READY:
1052 case XSS_ERROR:
1053 case XSS_DESTROY:
1054 /* bad state */
1055 stream->state = XSS_ERROR;
1056 break;
1057 }
1058 }
1059
1060 /**
1061 * Handle XML stream callback
1062 * @param user_data the xmpp stream
1063 * @param type stream type (start/normal/stop/etc)
1064 * @param node optional XML node
1065 * @return IKS_OK
1066 */
on_stream(void * user_data,int type,iks * node)1067 static int on_stream(void *user_data, int type, iks *node)
1068 {
1069 struct xmpp_stream *stream = (struct xmpp_stream *)user_data;
1070
1071 stream->idle = 0;
1072
1073 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, state = %s, node type = %s\n", stream->jid, stream->address, stream->port, xmpp_stream_state_to_string(stream->state), iks_node_type_to_string(type));
1074
1075 switch(type) {
1076 case IKS_NODE_START:
1077 /* <stream> */
1078 if (node) {
1079 if (stream->s2s) {
1080 if (stream->incoming) {
1081 on_inbound_server_stream_start(stream, node);
1082 } else {
1083 on_outbound_server_stream_start(stream, node);
1084 }
1085 } else {
1086 on_client_stream_start(stream, node);
1087 }
1088 } else {
1089 stream->state = XSS_ERROR;
1090 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, missing node!\n", stream->jid, stream->address, stream->port);
1091 }
1092 break;
1093 case IKS_NODE_NORMAL:
1094 /* stanza */
1095 if (node) {
1096 const char *name = iks_name(node);
1097 if (!strcmp("iq", name) || !strcmp("message", name)) {
1098 on_stream_iq(stream, node);
1099 } else if (!strcmp("presence", name)) {
1100 on_stream_presence(stream, node);
1101 } else if (!strcmp("auth", name)) {
1102 on_stream_auth(stream, node);
1103 } else if (!strcmp("starttls", name)) {
1104 on_stream_starttls(stream, node);
1105 } else if (!strcmp("db:result", name)) {
1106 on_stream_dialback_result(stream, node);
1107 } else if (!strcmp("db:verify", name)) {
1108 on_stream_dialback_verify(stream, node);
1109 } else {
1110 /* unknown first-level element */
1111 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, unknown first-level element: %s\n", stream->jid, stream->address, stream->port, name);
1112 }
1113 }
1114 break;
1115 case IKS_NODE_ERROR:
1116 /* <error> */
1117 break;
1118 case IKS_NODE_STOP:
1119 on_stream_stop(stream);
1120 break;
1121 }
1122
1123 if (node) {
1124 iks_delete(node);
1125 }
1126
1127 return IKS_OK;
1128 }
1129
1130 /**
1131 * Cleanup xmpp stream
1132 */
xmpp_stream_destroy(struct xmpp_stream * stream)1133 static void xmpp_stream_destroy(struct xmpp_stream *stream)
1134 {
1135 struct xmpp_stream_context *context = stream->context;
1136 switch_memory_pool_t *pool = stream->pool;
1137 stream->state = XSS_DESTROY;
1138
1139 /* remove from available streams */
1140 switch_mutex_lock(context->streams_mutex);
1141 if (stream->jid) {
1142 switch_core_hash_delete(context->routes, stream->jid);
1143 }
1144 if (stream->id) {
1145 switch_core_hash_delete(context->streams, stream->id);
1146 }
1147 switch_mutex_unlock(context->streams_mutex);
1148
1149 /* close connection */
1150 if (stream->parser) {
1151 iks_disconnect(stream->parser);
1152 iks_parser_delete(stream->parser);
1153 }
1154
1155 if (stream->socket) {
1156 switch_socket_shutdown(stream->socket, SWITCH_SHUTDOWN_READWRITE);
1157 switch_socket_close(stream->socket);
1158 }
1159
1160 /* flush pending messages */
1161 if (stream->msg_queue) {
1162 char *msg;
1163 while (switch_queue_trypop(stream->msg_queue, (void *)&msg) == SWITCH_STATUS_SUCCESS) {
1164 iks_free(msg);
1165 }
1166 }
1167
1168 if (context->destroy_callback) {
1169 context->destroy_callback(stream);
1170 }
1171
1172 switch_core_destroy_memory_pool(&pool);
1173 }
1174
1175 /**
1176 * @param stream the xmpp stream to check
1177 * @return 0 if stream is dead
1178 */
xmpp_stream_ready(struct xmpp_stream * stream)1179 static int xmpp_stream_ready(struct xmpp_stream *stream)
1180 {
1181 return stream->state != XSS_ERROR && stream->state != XSS_DESTROY;
1182 }
1183
1184 #define KEEP_ALIVE_INTERVAL_NS (60 * 1000 * 1000)
1185
1186 /**
1187 * Thread that handles xmpp XML stream
1188 * @param thread this thread
1189 * @param obj the xmpp stream
1190 * @return NULL
1191 */
xmpp_stream_thread(switch_thread_t * thread,void * obj)1192 static void *SWITCH_THREAD_FUNC xmpp_stream_thread(switch_thread_t *thread, void *obj)
1193 {
1194 struct xmpp_stream *stream = (struct xmpp_stream *)obj;
1195 struct xmpp_stream_context *context = stream->context;
1196 int err_count = 0;
1197 switch_time_t last_activity = 0;
1198 int ping_id = 1;
1199
1200 if (stream->incoming) {
1201 switch_thread_rwlock_rdlock(context->shutdown_rwlock);
1202 }
1203
1204 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s:%i, New %s_%s stream\n", stream->address, stream->port, stream->s2s ? "s2s" : "c2s", stream->incoming ? "in" : "out");
1205
1206 if (stream->s2s && !stream->incoming) {
1207 xmpp_send_outbound_server_header(stream);
1208 }
1209
1210 while (xmpp_stream_ready(stream)) {
1211 char *msg;
1212 int result;
1213 switch_time_t now = switch_micro_time_now();
1214
1215 /* read any messages from client */
1216 stream->idle = 1;
1217 result = iks_recv(stream->parser, 0);
1218 switch (result) {
1219 case IKS_OK:
1220 err_count = 0;
1221 break;
1222 case IKS_NET_TLSFAIL:
1223 case IKS_NET_RWERR:
1224 case IKS_NET_NOCONN:
1225 case IKS_NET_NOSOCK:
1226 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, iks_recv() error = %s, ending session\n", stream->jid, stream->address, stream->port, iks_net_error_to_string(result));
1227 stream->state = XSS_ERROR;
1228 goto done;
1229 default:
1230 if (err_count++ == 0) {
1231 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, iks_recv() error = %s\n", stream->jid, stream->address, stream->port, iks_net_error_to_string(result));
1232 }
1233 if (err_count >= 50) {
1234 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, too many iks_recv() error = %s, ending session\n", stream->jid, stream->address, stream->port, iks_net_error_to_string(result));
1235 stream->state = XSS_ERROR;
1236 goto done;
1237 }
1238 }
1239
1240 /* send queued stanzas once stream is authorized for outbound stanzas */
1241 if (!stream->s2s || stream->state == XSS_READY) {
1242 while (switch_queue_trypop(stream->msg_queue, (void *)&msg) == SWITCH_STATUS_SUCCESS) {
1243 if (!stream->s2s || !stream->incoming) {
1244 iks_send_raw(stream->parser, msg);
1245 } else {
1246 /* TODO sent out wrong stream! */
1247 }
1248 iks_free(msg);
1249 stream->idle = 0;
1250 }
1251 }
1252
1253 /* check for shutdown */
1254 if (stream->state != XSS_DESTROY && context->shutdown && stream->state != XSS_SHUTDOWN) {
1255 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_INFO, "%s, %s:%i, detected shutdown\n", stream->jid, stream->address, stream->port);
1256 iks_send_raw(stream->parser, "</stream:stream>");
1257 stream->state = XSS_SHUTDOWN;
1258 stream->idle = 0;
1259 }
1260
1261 if (stream->idle) {
1262 int fdr = 0;
1263
1264 /* send keep-alive ping if idle for a long time */
1265 if (stream->s2s && !stream->incoming && stream->state == XSS_READY && now - last_activity > KEEP_ALIVE_INTERVAL_NS) {
1266 char *ping = switch_mprintf("<iq to=\"%s\" from=\"%s\" type=\"get\" id=\"internal-%d\"><ping xmlns=\""IKS_NS_XMPP_PING"\"/></iq>",
1267 stream->jid, stream->context->domain, ping_id++);
1268 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_DEBUG, "%s, %s:%i, keep alive\n", stream->jid, stream->address, stream->port);
1269 last_activity = now;
1270 iks_send_raw(stream->parser, ping);
1271 free(ping);
1272 }
1273
1274 switch_poll(stream->pollfd, 1, &fdr, 20000);
1275 } else {
1276 last_activity = now;
1277 switch_os_yield();
1278 }
1279 }
1280
1281 done:
1282
1283 if (stream->incoming) {
1284 xmpp_stream_destroy(stream);
1285 switch_thread_rwlock_unlock(context->shutdown_rwlock);
1286 }
1287
1288 return NULL;
1289 }
1290
1291 /**
1292 * Initialize the xmpp stream
1293 * @param context the stream context
1294 * @param stream the stream to initialize
1295 * @param pool for this stream
1296 * @param address remote address
1297 * @param port remote port
1298 * @param s2s true if a server-to-server stream
1299 * @param incoming true if incoming stream
1300 * @return the stream
1301 */
xmpp_stream_init(struct xmpp_stream_context * context,struct xmpp_stream * stream,switch_memory_pool_t * pool,const char * address,int port,int s2s,int incoming)1302 static struct xmpp_stream *xmpp_stream_init(struct xmpp_stream_context *context, struct xmpp_stream *stream, switch_memory_pool_t *pool, const char *address, int port, int s2s, int incoming)
1303 {
1304 stream->context = context;
1305 stream->pool = pool;
1306 if (incoming) {
1307 xmpp_stream_new_id(stream);
1308 }
1309 switch_mutex_init(&stream->mutex, SWITCH_MUTEX_NESTED, pool);
1310 if (!zstr(address)) {
1311 stream->address = switch_core_strdup(pool, address);
1312 }
1313 if (port > 0) {
1314 stream->port = port;
1315 }
1316 stream->s2s = s2s;
1317 stream->incoming = incoming;
1318 switch_queue_create(&stream->msg_queue, MAX_QUEUE_LEN, pool);
1319
1320 /* set up XMPP stream parser */
1321 stream->parser = iks_stream_new(stream->s2s ? IKS_NS_SERVER : IKS_NS_CLIENT, stream, on_stream);
1322
1323 /* enable logging of XMPP stream */
1324 iks_set_log_hook(stream->parser, on_stream_log);
1325
1326 return stream;
1327 }
1328
1329 /**
1330 * Create a new xmpp stream
1331 * @param context the stream context
1332 * @param pool the memory pool for this stream
1333 * @param address remote address
1334 * @param port remote port
1335 * @param s2s true if server-to-server stream
1336 * @param incoming true if incoming stream
1337 * @return the new stream or NULL
1338 */
xmpp_stream_create(struct xmpp_stream_context * context,switch_memory_pool_t * pool,const char * address,int port,int s2s,int incoming)1339 static struct xmpp_stream *xmpp_stream_create(struct xmpp_stream_context *context, switch_memory_pool_t *pool, const char *address, int port, int s2s, int incoming)
1340 {
1341 struct xmpp_stream *stream = NULL;
1342 if (!(stream = switch_core_alloc(pool, sizeof(*stream)))) {
1343 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
1344 return NULL;
1345 }
1346 return xmpp_stream_init(context, stream, pool, address, port, s2s, incoming);
1347 }
1348
1349 /**
1350 * Thread that handles XMPP XML stream
1351 * @param thread this thread
1352 * @param obj the XMPP stream
1353 * @return NULL
1354 */
xmpp_outbound_stream_thread(switch_thread_t * thread,void * obj)1355 static void *SWITCH_THREAD_FUNC xmpp_outbound_stream_thread(switch_thread_t *thread, void *obj)
1356 {
1357 struct xmpp_stream *stream = (struct xmpp_stream *)obj;
1358 struct xmpp_stream_context *context = stream->context;
1359 switch_socket_t *socket;
1360 int warned = 0;
1361
1362 switch_thread_rwlock_rdlock(context->shutdown_rwlock);
1363
1364 /* connect to server */
1365 while (!context->shutdown) {
1366 struct xmpp_stream *new_stream = NULL;
1367 switch_memory_pool_t *pool;
1368 switch_sockaddr_t *sa;
1369
1370 if (switch_sockaddr_info_get(&sa, stream->address, SWITCH_UNSPEC, stream->port, 0, stream->pool) != SWITCH_STATUS_SUCCESS) {
1371 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%s:%i, failed to get sockaddr info!\n", stream->address, stream->port);
1372 goto fail;
1373 }
1374
1375 if (switch_socket_create(&socket, switch_sockaddr_get_family(sa), SOCK_STREAM, SWITCH_PROTO_TCP, stream->pool) != SWITCH_STATUS_SUCCESS) {
1376 if (!warned) {
1377 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_ERROR, "%s:%i, failed to create socket!\n", stream->address, stream->port);
1378 }
1379 goto sock_fail;
1380 }
1381
1382 switch_socket_opt_set(socket, SWITCH_SO_KEEPALIVE, 1);
1383 switch_socket_opt_set(socket, SWITCH_SO_TCP_NODELAY, 1);
1384
1385 if (switch_socket_connect(socket, sa) != SWITCH_STATUS_SUCCESS) {
1386 if (!warned) {
1387 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_ERROR, "%s:%i, Socket Error!\n", stream->address, stream->port);
1388 }
1389 goto sock_fail;
1390 }
1391
1392 if (warned) {
1393 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(stream->id), SWITCH_LOG_ERROR, "%s:%i, connected!\n", stream->address, stream->port);
1394 warned = 0;
1395 }
1396
1397 /* run the stream thread */
1398 xmpp_stream_set_socket(stream, socket);
1399 xmpp_stream_thread(thread, stream);
1400
1401 /* re-establish connection if not shutdown */
1402 if (!context->shutdown) {
1403 /* create new stream for reconnection */
1404 switch_core_new_memory_pool(&pool);
1405 new_stream = xmpp_stream_create(stream->context, pool, stream->address, stream->port, 1, 0);
1406 new_stream->jid = switch_core_strdup(pool, stream->jid);
1407 xmpp_stream_destroy(stream);
1408 stream = new_stream;
1409
1410 switch_yield(1000 * 1000); /* 1000 ms */
1411 continue;
1412 }
1413 break;
1414
1415 sock_fail:
1416 if (socket) {
1417 switch_socket_close(socket);
1418 socket = NULL;
1419 }
1420 if (!warned) {
1421 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error! Could not connect to %s:%i\n", stream->address, stream->port);
1422 warned = 1;
1423 }
1424 switch_yield(1000 * 1000); /* 1000 ms */
1425 }
1426
1427 fail:
1428
1429 xmpp_stream_destroy(stream);
1430
1431 switch_thread_rwlock_unlock(context->shutdown_rwlock);
1432 return NULL;
1433 }
1434
1435 /**
1436 * Set the id for this stream
1437 * @param stream
1438 * @param id
1439 */
xmpp_stream_set_id(struct xmpp_stream * stream,const char * id)1440 static void xmpp_stream_set_id(struct xmpp_stream *stream, const char *id)
1441 {
1442 struct xmpp_stream_context *context = stream->context;
1443 if (!zstr(stream->id)) {
1444 switch_mutex_lock(context->streams_mutex);
1445 switch_core_hash_delete(context->streams, stream->id);
1446 switch_mutex_unlock(context->streams_mutex);
1447 }
1448 if (!zstr(id)) {
1449 stream->id = switch_core_strdup(stream->pool, id);
1450 switch_mutex_lock(context->streams_mutex);
1451 switch_core_hash_insert(context->streams, stream->id, stream);
1452 switch_mutex_unlock(context->streams_mutex);
1453 } else {
1454 stream->id = NULL;
1455 }
1456 }
1457
1458 /**
1459 * Destroy the listener
1460 * @param server the server
1461 */
xmpp_listener_destroy(struct xmpp_listener * listener)1462 static void xmpp_listener_destroy(struct xmpp_listener *listener)
1463 {
1464 switch_memory_pool_t *pool = listener->pool;
1465
1466 /* shutdown socket */
1467 if (listener->socket) {
1468 switch_socket_shutdown(listener->socket, SWITCH_SHUTDOWN_READWRITE);
1469 switch_socket_close(listener->socket);
1470 }
1471 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "xmpp listener %s:%u closed\n", listener->addr, listener->port);
1472 switch_core_destroy_memory_pool(&pool);
1473 }
1474
1475 /**
1476 * Open a new XMPP stream with a peer server
1477 * @param peer_domain of server - if not set, address is used
1478 * @param peer_address of server - if not set, domain is used
1479 * @param peer_port of server - if not set default port is used
1480 */
xmpp_stream_context_connect(struct xmpp_stream_context * context,const char * peer_domain,const char * peer_address,int peer_port)1481 switch_status_t xmpp_stream_context_connect(struct xmpp_stream_context *context, const char *peer_domain, const char *peer_address, int peer_port)
1482 {
1483 struct xmpp_stream *stream;
1484 switch_memory_pool_t *pool;
1485 switch_thread_t *thread;
1486 switch_threadattr_t *thd_attr = NULL;
1487
1488 if (peer_port <= 0) {
1489 peer_port = IKS_JABBER_SERVER_PORT;
1490 }
1491
1492 if (zstr(peer_address)) {
1493 peer_address = peer_domain;
1494 } else if (zstr(peer_domain)) {
1495 peer_domain = peer_address;
1496 }
1497
1498 /* start outbound stream thread */
1499 switch_core_new_memory_pool(&pool);
1500 stream = xmpp_stream_create(context, pool, peer_address, peer_port, 1, 0);
1501 stream->jid = switch_core_strdup(pool, peer_domain);
1502 switch_threadattr_create(&thd_attr, pool);
1503 switch_threadattr_detach_set(thd_attr, 1);
1504 switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
1505 switch_thread_create(&thread, thd_attr, xmpp_outbound_stream_thread, stream, pool);
1506
1507 return SWITCH_STATUS_SUCCESS;
1508 }
1509
1510 /**
1511 * Thread that listens for new XMPP connections
1512 * @param thread this thread
1513 * @param obj the listener
1514 * @return NULL
1515 */
xmpp_listener_thread(switch_thread_t * thread,void * obj)1516 static void *SWITCH_THREAD_FUNC xmpp_listener_thread(switch_thread_t *thread, void *obj)
1517 {
1518 struct xmpp_listener *listener = (struct xmpp_listener *)obj;
1519 struct xmpp_stream_context *context = listener->context;
1520 switch_memory_pool_t *pool = NULL;
1521 uint32_t errs = 0;
1522 int warned = 0;
1523
1524 switch_thread_rwlock_rdlock(context->shutdown_rwlock);
1525
1526 /* bind to XMPP port */
1527 while (!context->shutdown) {
1528 switch_status_t rv;
1529 switch_sockaddr_t *sa;
1530 rv = switch_sockaddr_info_get(&sa, listener->addr, SWITCH_UNSPEC, listener->port, 0, listener->pool);
1531 if (rv)
1532 goto fail;
1533 rv = switch_socket_create(&listener->socket, switch_sockaddr_get_family(sa), SOCK_STREAM, SWITCH_PROTO_TCP, listener->pool);
1534 if (rv)
1535 goto sock_fail;
1536 rv = switch_socket_opt_set(listener->socket, SWITCH_SO_REUSEADDR, 1);
1537 if (rv)
1538 goto sock_fail;
1539 #ifdef WIN32
1540 /* Enable dual-stack listening on Windows (if the listening address is IPv6), it's default on Linux */
1541 if (switch_sockaddr_get_family(sa) == AF_INET6) {
1542 rv = switch_socket_opt_set(listener->socket, 16384, 0);
1543 if (rv) goto sock_fail;
1544 }
1545 #endif
1546 rv = switch_socket_bind(listener->socket, sa);
1547 if (rv)
1548 goto sock_fail;
1549 rv = switch_socket_listen(listener->socket, 5);
1550 if (rv)
1551 goto sock_fail;
1552
1553 rv = switch_socket_create_pollset(&listener->read_pollfd, listener->socket, SWITCH_POLLIN | SWITCH_POLLERR, listener->pool);
1554 if (rv) {
1555 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Create pollset for %s listener socket %s:%u error!\n", listener->s2s ? "s2s" : "c2s", listener->addr, listener->port);
1556 goto sock_fail;
1557 }
1558
1559 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "xmpp %s listener bound to %s:%u\n", listener->s2s ? "s2s" : "c2s", listener->addr, listener->port);
1560
1561 break;
1562 sock_fail:
1563 if (listener->socket) {
1564 switch_socket_close(listener->socket);
1565 listener->socket = NULL;
1566 }
1567 if (!warned) {
1568 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error! xmpp %s listener could not bind to %s:%u\n", listener->s2s ? "s2s" : "c2s", listener->addr, listener->port);
1569 warned = 1;
1570 }
1571 switch_yield(1000 * 100); /* 100 ms */
1572 }
1573
1574 /* Listen for XMPP client connections */
1575 while (!context->shutdown) {
1576 switch_socket_t *socket = NULL;
1577 switch_status_t rv;
1578 int32_t fdr;
1579
1580 if (pool == NULL && switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
1581 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create memory pool for new client connection!\n");
1582 goto fail;
1583 }
1584
1585 /* is there a new connection? */
1586 rv = switch_poll(listener->read_pollfd, 1, &fdr, 1000 * 1000 /* 1000 ms */);
1587 if (rv != SWITCH_STATUS_SUCCESS) {
1588 continue;
1589 }
1590
1591 /* accept the connection */
1592 if ((rv = switch_socket_accept(&socket, listener->socket, pool))) {
1593 if (context->shutdown) {
1594 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Shutting down xmpp listener\n");
1595 goto end;
1596 } else {
1597 /* I wish we could use strerror_r here but its not defined everywhere =/ */
1598 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Accept connection error [%s]\n", strerror(errno));
1599 if (++errs > 100) {
1600 goto end;
1601 }
1602 }
1603 } else { /* got a new connection */
1604 switch_thread_t *thread;
1605 switch_threadattr_t *thd_attr = NULL;
1606 struct xmpp_stream *stream;
1607 switch_sockaddr_t *sa = NULL;
1608 char remote_ip[50] = { 0 };
1609 int remote_port = 0;
1610
1611 errs = 0;
1612
1613 /* get remote address and port */
1614 if (switch_socket_addr_get(&sa, SWITCH_TRUE, socket) == SWITCH_STATUS_SUCCESS && sa) {
1615 switch_get_addr(remote_ip, sizeof(remote_ip), sa);
1616 remote_port = switch_sockaddr_get_port(sa);
1617 }
1618
1619 if (zstr_buf(remote_ip)) {
1620 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to get IP of incoming connection.\n");
1621 switch_socket_shutdown(socket, SWITCH_SHUTDOWN_READWRITE);
1622 switch_socket_close(socket);
1623 continue;
1624 }
1625
1626 /* check if connection is allowed */
1627 if (listener->acl) {
1628 if (!switch_check_network_list_ip(remote_ip, listener->acl)) {
1629 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "ACL %s denies access to %s.\n", listener->acl, remote_ip);
1630 switch_socket_shutdown(socket, SWITCH_SHUTDOWN_READWRITE);
1631 switch_socket_close(socket);
1632 continue;
1633 }
1634 }
1635
1636 /* start connection thread */
1637 if (!(stream = xmpp_stream_create(context, pool, remote_ip, remote_port, listener->s2s, 1))) {
1638 switch_socket_shutdown(socket, SWITCH_SHUTDOWN_READWRITE);
1639 switch_socket_close(socket);
1640 break;
1641 }
1642 xmpp_stream_set_socket(stream, socket);
1643 pool = NULL; /* connection now owns the pool */
1644 switch_threadattr_create(&thd_attr, stream->pool);
1645 switch_threadattr_detach_set(thd_attr, 1);
1646 switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
1647 switch_thread_create(&thread, thd_attr, xmpp_stream_thread, stream, stream->pool);
1648 }
1649 }
1650
1651 end:
1652
1653 if (pool) {
1654 switch_core_destroy_memory_pool(&pool);
1655 }
1656
1657 fail:
1658
1659 xmpp_listener_destroy(listener);
1660
1661 switch_thread_rwlock_unlock(context->shutdown_rwlock);
1662 return NULL;
1663 }
1664
1665 /**
1666 * Add a new socket to listen for XMPP client/server connections.
1667 * @param context the XMPP context
1668 * @param addr the IP address
1669 * @param port the port
1670 * @param is_s2s true if s2s
1671 * @param acl name of optional access control list
1672 * @return SWITCH_STATUS_SUCCESS if successful
1673 */
xmpp_stream_context_listen(struct xmpp_stream_context * context,const char * addr,int port,int is_s2s,const char * acl)1674 switch_status_t xmpp_stream_context_listen(struct xmpp_stream_context *context, const char *addr, int port, int is_s2s, const char *acl)
1675 {
1676 switch_memory_pool_t *pool;
1677 struct xmpp_listener *new_listener = NULL;
1678 switch_thread_t *thread;
1679 switch_threadattr_t *thd_attr = NULL;
1680
1681 if (zstr(addr)) {
1682 return SWITCH_STATUS_FALSE;
1683 }
1684
1685 switch_core_new_memory_pool(&pool);
1686 new_listener = switch_core_alloc(pool, sizeof(*new_listener));
1687 new_listener->pool = pool;
1688 new_listener->addr = switch_core_strdup(pool, addr);
1689 if (!zstr(acl)) {
1690 new_listener->acl = switch_core_strdup(pool, acl);
1691 }
1692
1693 new_listener->s2s = is_s2s;
1694 if (port <= 0) {
1695 new_listener->port = is_s2s ? IKS_JABBER_SERVER_PORT : IKS_JABBER_PORT;
1696 } else {
1697 new_listener->port = port;
1698 }
1699 new_listener->context = context;
1700
1701 /* start the server thread */
1702 switch_threadattr_create(&thd_attr, pool);
1703 switch_threadattr_detach_set(thd_attr, 1);
1704 switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
1705 switch_thread_create(&thread, thd_attr, xmpp_listener_thread, new_listener, pool);
1706
1707 return SWITCH_STATUS_SUCCESS;
1708 }
1709
1710 /**
1711 * Queue a message for delivery
1712 */
xmpp_stream_context_send(struct xmpp_stream_context * context,const char * jid,iks * msg)1713 void xmpp_stream_context_send(struct xmpp_stream_context *context, const char *jid, iks *msg)
1714 {
1715 if (!zstr(jid)) {
1716 if (msg) {
1717 struct xmpp_stream *stream;
1718 switch_mutex_lock(context->streams_mutex);
1719 stream = switch_core_hash_find(context->routes, jid);
1720 if (stream) {
1721 char *raw = iks_string(NULL, msg);
1722 if (switch_queue_trypush(stream->msg_queue, raw) != SWITCH_STATUS_SUCCESS) {
1723 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s, %s:%i, failed to deliver outbound message via %s!\n", stream->jid, stream->address, stream->port, jid);
1724 iks_free(raw);
1725 }
1726 } else {
1727 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s stream is gone\n", jid);
1728 /* TODO automatically open connection if valid domain JID? */
1729 }
1730 switch_mutex_unlock(context->streams_mutex);
1731 } else {
1732 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing message\n");
1733 }
1734 } else {
1735 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing stream JID\n");
1736 }
1737 }
1738
1739 /**
1740 * Dump xmpp stream stats
1741 */
xmpp_stream_context_dump(struct xmpp_stream_context * context,switch_stream_handle_t * stream)1742 void xmpp_stream_context_dump(struct xmpp_stream_context *context, switch_stream_handle_t *stream)
1743 {
1744 switch_hash_index_t *hi;
1745 switch_mutex_lock(context->streams_mutex);
1746 stream->write_function(stream, "\nACTIVE STREAMS\n");
1747 for (hi = switch_core_hash_first(context->streams); hi; hi = switch_core_hash_next(&hi)) {
1748 struct xmpp_stream *s = NULL;
1749 const void *key;
1750 void *val;
1751 switch_core_hash_this(hi, &key, NULL, &val);
1752 s = (struct xmpp_stream *)val;
1753 switch_assert(s);
1754 stream->write_function(stream, " TYPE='%s_%s',ID='%s',JID='%s',REMOTE_ADDRESS='%s',REMOTE_PORT=%i,STATE='%s'\n", s->s2s ? "s2s" : "c2s", s->incoming ? "in" : "out", s->id, s->jid, s->address, s->port, xmpp_stream_state_to_string(s->state));
1755 }
1756 switch_mutex_unlock(context->streams_mutex);
1757 }
1758
1759 /**
1760 * Create a new XMPP stream context
1761 * @param domain for new streams
1762 * @param domain_secret domain shared secret for server dialback
1763 * @param bind_cb callback function when a resource is bound to a new stream
1764 * @param ready callback function when new stream is ready
1765 * @param recv callback function when a new stanza is received
1766 * @param destroy callback function when a stream is destroyed
1767 * @return the context
1768 */
xmpp_stream_context_create(const char * domain,const char * domain_secret,xmpp_stream_bind_callback bind_cb,xmpp_stream_ready_callback ready,xmpp_stream_recv_callback recv,xmpp_stream_destroy_callback destroy)1769 struct xmpp_stream_context *xmpp_stream_context_create(const char *domain, const char *domain_secret, xmpp_stream_bind_callback bind_cb, xmpp_stream_ready_callback ready, xmpp_stream_recv_callback recv, xmpp_stream_destroy_callback destroy)
1770 {
1771 switch_memory_pool_t *pool;
1772 struct xmpp_stream_context *context;
1773
1774 switch_core_new_memory_pool(&pool);
1775 context = switch_core_alloc(pool, sizeof(*context));
1776 context->pool = pool;
1777 switch_mutex_init(&context->streams_mutex, SWITCH_MUTEX_NESTED, context->pool);
1778 switch_core_hash_init(&context->routes);
1779 switch_core_hash_init(&context->streams);
1780 context->dialback_secret = switch_core_strdup(context->pool, domain_secret);
1781 context->bind_callback = bind_cb;
1782 context->ready_callback = ready;
1783 context->destroy_callback = destroy;
1784 context->recv_callback = recv;
1785 context->shutdown = 0;
1786 context->domain = switch_core_strdup(context->pool, domain);
1787 switch_thread_rwlock_create(&context->shutdown_rwlock, context->pool);
1788 switch_core_hash_init(&context->users);
1789
1790 return context;
1791 }
1792
1793 /**
1794 * Add an authorized user
1795 * @param context the context to add user to
1796 * @param user the username
1797 * @param password the password
1798 */
xmpp_stream_context_add_user(struct xmpp_stream_context * context,const char * user,const char * password)1799 void xmpp_stream_context_add_user(struct xmpp_stream_context *context, const char *user, const char *password)
1800 {
1801 switch_core_hash_insert(context->users, user, switch_core_strdup(context->pool, password));
1802 }
1803
1804 /**
1805 * Destroy an XMPP stream context. All open streams are closed.
1806 * @param context to destroy
1807 */
xmpp_stream_context_destroy(struct xmpp_stream_context * context)1808 void xmpp_stream_context_destroy(struct xmpp_stream_context *context)
1809 {
1810 switch_memory_pool_t *pool;
1811 context->shutdown = 1;
1812 /* wait for threads to finish */
1813 switch_thread_rwlock_wrlock(context->shutdown_rwlock);
1814 switch_core_hash_destroy(&context->routes);
1815 switch_core_hash_destroy(&context->streams);
1816 switch_core_hash_destroy(&context->users);
1817 pool = context->pool;
1818 switch_core_destroy_memory_pool(&pool);
1819 }
1820
1821 /**
1822 * @param stream
1823 * @return true if server-to-server stream
1824 */
xmpp_stream_is_s2s(struct xmpp_stream * stream)1825 int xmpp_stream_is_s2s(struct xmpp_stream *stream)
1826 {
1827 return stream->s2s;
1828 }
1829
1830 /**
1831 * @param stream
1832 * @return true if incoming stream
1833 */
xmpp_stream_is_incoming(struct xmpp_stream * stream)1834 int xmpp_stream_is_incoming(struct xmpp_stream *stream)
1835 {
1836 return stream->incoming;
1837 }
1838
1839 /**
1840 * @param stream
1841 * @return the stream JID
1842 */
xmpp_stream_get_jid(struct xmpp_stream * stream)1843 const char *xmpp_stream_get_jid(struct xmpp_stream *stream)
1844 {
1845 return stream->jid;
1846 }
1847
1848 /**
1849 * Set private data for this stream
1850 */
xmpp_stream_set_private(struct xmpp_stream * stream,void * user_private)1851 void xmpp_stream_set_private(struct xmpp_stream *stream, void *user_private)
1852 {
1853 stream->user_private = user_private;
1854 }
1855
1856 /**
1857 * Get private data for this stream
1858 */
xmpp_stream_get_private(struct xmpp_stream * stream)1859 void *xmpp_stream_get_private(struct xmpp_stream *stream)
1860 {
1861 return stream->user_private;
1862 }
1863
1864 /**
1865 * Add PEM cert file to stream for new SSL connections
1866 */
xmpp_stream_context_add_cert(struct xmpp_stream_context * context,const char * cert_pem_file)1867 void xmpp_stream_context_add_cert(struct xmpp_stream_context *context, const char *cert_pem_file)
1868 {
1869 context->cert_pem_file = switch_core_strdup(context->pool, cert_pem_file);
1870 }
1871
1872 /**
1873 * Add PEM key file to stream for new SSL connections
1874 */
xmpp_stream_context_add_key(struct xmpp_stream_context * context,const char * key_pem_file)1875 void xmpp_stream_context_add_key(struct xmpp_stream_context *context, const char *key_pem_file)
1876 {
1877 context->key_pem_file = switch_core_strdup(context->pool, key_pem_file);
1878 }
1879
1880
1881 /* For Emacs:
1882 * Local Variables:
1883 * mode:c
1884 * indent-tabs-mode:t
1885 * tab-width:4
1886 * c-basic-offset:4
1887 * End:
1888 * For VIM:
1889 * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet
1890 */
1891