1 /*
2  * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3  * Copyright (C) 2005-2014, Anthony Minessale II <anthm@freeswitch.org>
4  *
5  * Version: MPL 1.1
6  *
7  * The contents of this file are subject to the Mozilla Public License Version
8  * 1.1 (the "License"); you may not use this file except in compliance with
9  * the License. You may obtain a copy of the License at
10  * http://www.mozilla.org/MPL/
11  *
12  * Software distributed under the License is distributed on an "AS IS" basis,
13  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14  * for the specific language governing rights and limitations under the
15  * License.
16  *
17  * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18  *
19  * The Initial Developer of the Original Code is
20  * Anthony Minessale II <anthm@freeswitch.org>
21  * Portions created by the Initial Developer are Copyright (C)
22  * the Initial Developer. All Rights Reserved.
23  *
24  * Contributor(s):
25  *
26  * Anthony Minessale II <anthm@freeswitch.org>
27  * Andrew Thompson <andrew@hijacked.us>
28  * Rob Charlton <rob.charlton@savageminds.com>
29  * Darren Schreiber <d@d-man.org>
30  * Mike Jerris <mike@jerris.com>
31  * Tamas Cseke <tamas.cseke@virtual-call-center.eu>
32  *
33  *
34  * handle_msg.c -- handle messages received from erlang nodes
35  *
36  */
37 #include <switch.h>
38 #include <ei.h>
39 #include "mod_erlang_event.h"
40 
/* Placeholder value stored against every key inserted into the custom-event subscription hashes in this file. */
static char *MARKER = "1";

/* Forward declaration; being static, the definition must appear elsewhere in this file. */
static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buff * rbuf);
44 
api_exec(switch_thread_t * thread,void * obj)45 static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
46 {
47 	switch_bool_t r = SWITCH_TRUE;
48 	struct api_command_struct *acs = (struct api_command_struct *) obj;
49 	switch_stream_handle_t stream = { 0 };
50 	char *reply, *freply = NULL;
51 	switch_status_t status;
52 
53 	if (!acs) {
54 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Internal error.\n");
55 		return NULL;
56 	}
57 
58 	if (!acs->listener || !acs->listener->rwlock || switch_thread_rwlock_tryrdlock(acs->listener->rwlock) != SWITCH_STATUS_SUCCESS) {
59 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error! cannot get read lock.\n");
60 		goto done;
61 	}
62 
63 	SWITCH_STANDARD_STREAM(stream);
64 
65 	if ((status = switch_api_execute(acs->api_cmd, acs->arg, NULL, &stream)) == SWITCH_STATUS_SUCCESS) {
66 		reply = stream.data;
67 	} else {
68 		freply = switch_mprintf("%s: Command not found!\n", acs->api_cmd);
69 		reply = freply;
70 		r = SWITCH_FALSE;
71 	}
72 
73 	if (!reply) {
74 		reply = "Command returned no output!";
75 		r = SWITCH_FALSE;
76 	}
77 
78 	if (*reply == '-')
79 		r = SWITCH_FALSE;
80 
81 	if (acs->bg) {
82 		switch_event_t *event;
83 
84 		if (switch_event_create(&event, SWITCH_EVENT_BACKGROUND_JOB) == SWITCH_STATUS_SUCCESS) {
85 			ei_x_buff ebuf;
86 
87 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-UUID", acs->uuid_str);
88 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command", acs->api_cmd);
89 
90 			ei_x_new_with_version(&ebuf);
91 
92 			if (acs->arg) {
93 				switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command-Arg", acs->arg);
94 			}
95 
96 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Successful", r ? "true" : "false");
97 			switch_event_add_body(event, "%s", reply);
98 
99 			switch_event_fire(&event);
100 
101 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending bgapi reply to %s\n", acs->pid.node);
102 
103 			ei_x_encode_tuple_header(&ebuf, 3);
104 
105 			if (r)
106 				ei_x_encode_atom(&ebuf, "bgok");
107 			else
108 				ei_x_encode_atom(&ebuf, "bgerror");
109 
110 			_ei_x_encode_string(&ebuf, acs->uuid_str);
111 			_ei_x_encode_string(&ebuf, reply);
112 
113 			switch_mutex_lock(acs->listener->sock_mutex);
114 			ei_send(acs->listener->sockdes, &acs->pid, ebuf.buff, ebuf.index);
115 			switch_mutex_unlock(acs->listener->sock_mutex);
116 #ifdef EI_DEBUG
117 			ei_x_print_msg(&ebuf, &acs->pid, 1);
118 #endif
119 
120 			ei_x_free(&ebuf);
121 		}
122 	} else {
123 		ei_x_buff rbuf;
124 		ei_x_new_with_version(&rbuf);
125 		ei_x_encode_tuple_header(&rbuf, 2);
126 
127 		if (!strlen(reply)) {
128 			reply = "Command returned no output!";
129 			r = SWITCH_FALSE;
130 		}
131 
132 		if (r) {
133 			ei_x_encode_atom(&rbuf, "ok");
134 		} else {
135 			ei_x_encode_atom(&rbuf, "error");
136 		}
137 
138 		_ei_x_encode_string(&rbuf, reply);
139 
140 
141 		switch_mutex_lock(acs->listener->sock_mutex);
142 		ei_send(acs->listener->sockdes, &acs->pid, rbuf.buff, rbuf.index);
143 		switch_mutex_unlock(acs->listener->sock_mutex);
144 #ifdef EI_DEBUG
145 		ei_x_print_msg(&rbuf, &acs->pid, 1);
146 #endif
147 
148 		ei_x_free(&rbuf);
149 	}
150 
151 	switch_safe_free(stream.data);
152 	switch_safe_free(freply);
153 
154 	if (acs->listener->rwlock) {
155 		switch_thread_rwlock_unlock(acs->listener->rwlock);
156 	}
157 
158   done:
159 	if (acs->bg) {
160 		switch_memory_pool_t *pool = acs->pool;
161 		acs = NULL;
162 		switch_core_destroy_memory_pool(&pool);
163 		pool = NULL;
164 	}
165 	return NULL;
166 
167 }
168 
handle_msg_fetch_reply(listener_t * listener,ei_x_buff * buf,ei_x_buff * rbuf)169 static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff * buf, ei_x_buff * rbuf)
170 {
171 	char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
172 	fetch_reply_t *p;
173 
174 	if (ei_decode_string_or_binary(buf->buff, &buf->index, SWITCH_UUID_FORMATTED_LENGTH, uuid_str)) {
175 		ei_x_encode_tuple_header(rbuf, 2);
176 		ei_x_encode_atom(rbuf, "error");
177 		ei_x_encode_atom(rbuf, "badarg");
178 	} else {
179 
180 		/* reply mutex is locked */
181 		if ((p = find_fetch_reply(uuid_str))) {
182 			switch (p->state) {
183 			case reply_waiting:
184 				{
185 					/* clone the reply so it doesn't get destroyed on us */
186 					ei_x_buff *nbuf = malloc(sizeof(*nbuf));
187 					nbuf->buff = malloc(buf->buffsz);
188 					memcpy(nbuf->buff, buf->buff, buf->buffsz);
189 					nbuf->index = buf->index;
190 					nbuf->buffsz = buf->buffsz;
191 
192 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got reply for %s\n", uuid_str);
193 
194 					/* copy info into the reply struct */
195 					p->state = reply_found;
196 					p->reply = nbuf;
197 					strncpy(p->winner, listener->peer_nodename, MAXNODELEN);
198 
199 					/* signal waiting thread that its time to wake up */
200 					switch_thread_cond_signal(p->ready_or_found);
201 					/* reply OK */
202 					ei_x_encode_tuple_header(rbuf, 2);
203 					ei_x_encode_atom(rbuf, "ok");
204 					_ei_x_encode_string(rbuf, uuid_str);
205 					break;
206 				};
207 			case reply_found:
208 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for already complete request %s\n", uuid_str);
209 				ei_x_encode_tuple_header(rbuf, 3);
210 				ei_x_encode_atom(rbuf, "error");
211 				_ei_x_encode_string(rbuf, uuid_str);
212 				ei_x_encode_atom(rbuf, "duplicate_response");
213 				break;
214 			case reply_timeout:
215 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for timed out request %s\n", uuid_str);
216 				ei_x_encode_tuple_header(rbuf, 3);
217 				ei_x_encode_atom(rbuf, "error");
218 				_ei_x_encode_string(rbuf, uuid_str);
219 				ei_x_encode_atom(rbuf, "timeout");
220 				break;
221 			case reply_not_ready:
222 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Request %s is not ready?!\n", uuid_str);
223 				ei_x_encode_tuple_header(rbuf, 3);
224 				ei_x_encode_atom(rbuf, "error");
225 				_ei_x_encode_string(rbuf, uuid_str);
226 				ei_x_encode_atom(rbuf, "not_ready");
227 				break;
228 			}
229 
230 			switch_mutex_unlock(p->mutex);
231 		} else {
232 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Could not find request for reply %s\n", uuid_str);
233 			ei_x_encode_tuple_header(rbuf, 2);
234 			ei_x_encode_atom(rbuf, "error");
235 			ei_x_encode_atom(rbuf, "invalid_uuid");
236 		}
237 	}
238 
239 	return SWITCH_STATUS_SUCCESS;
240 }
241 
handle_msg_set_log_level(listener_t * listener,int arity,ei_x_buff * buf,ei_x_buff * rbuf)242 static switch_status_t handle_msg_set_log_level(listener_t *listener, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
243 {
244 	switch_log_level_t ltype = SWITCH_LOG_DEBUG;
245 	char loglevelstr[MAXATOMLEN];
246 	if (arity != 2 || ei_decode_atom(buf->buff, &buf->index, loglevelstr)) {
247 		ei_x_encode_tuple_header(rbuf, 2);
248 		ei_x_encode_atom(rbuf, "error");
249 		ei_x_encode_atom(rbuf, "badarg");
250 	} else {
251 		ltype = switch_log_str2level(loglevelstr);
252 
253 		if (ltype && ltype != SWITCH_LOG_INVALID) {
254 			listener->level = ltype;
255 			ei_x_encode_atom(rbuf, "ok");
256 		} else {
257 			ei_x_encode_tuple_header(rbuf, 2);
258 			ei_x_encode_atom(rbuf, "error");
259 			ei_x_encode_atom(rbuf, "badarg");
260 		}
261 	}
262 	return SWITCH_STATUS_SUCCESS;
263 }
264 
handle_msg_event(listener_t * listener,int arity,ei_x_buff * buf,ei_x_buff * rbuf)265 static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
266 {
267 	char atom[MAXATOMLEN];
268 
269 	if (arity == 1) {
270 		ei_x_encode_tuple_header(rbuf, 2);
271 		ei_x_encode_atom(rbuf, "error");
272 		ei_x_encode_atom(rbuf, "badarg");
273 	} else {
274 		int custom = 0;
275 		switch_event_types_t type;
276 		int i = 0;
277 
278 		if (!switch_test_flag(listener, LFLAG_EVENTS)) {
279 			switch_set_flag_locked(listener, LFLAG_EVENTS);
280 		}
281 
282 		switch_thread_rwlock_wrlock(listener->event_rwlock);
283 
284 		for (i = 1; i < arity; i++) {
285 			if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
286 
287 				if (custom) {
288 					switch_core_hash_insert(listener->event_hash, atom, MARKER);
289 				} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
290 					if (type == SWITCH_EVENT_ALL) {
291 						uint32_t x = 0;
292 
293 						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "ALL events enabled\n");
294 						for (x = 0; x < SWITCH_EVENT_ALL; x++) {
295 							listener->event_list[x] = 1;
296 						}
297 					}
298 					if (type <= SWITCH_EVENT_ALL) {
299 						listener->event_list[type] = 1;
300 					}
301 					if (type == SWITCH_EVENT_CUSTOM) {
302 						custom++;
303 					}
304 
305 				}
306 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom);
307 			}
308 		}
309 		switch_thread_rwlock_unlock(listener->event_rwlock);
310 
311 		ei_x_encode_atom(rbuf, "ok");
312 	}
313 	return SWITCH_STATUS_SUCCESS;
314 }
315 
handle_msg_filter(listener_t * listener,int arity,ei_x_buff * buf,ei_x_buff * rbuf)316 static switch_status_t handle_msg_filter(listener_t *listener, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
317 {
318 	char atom[MAXATOMLEN];
319 	char reply[MAXATOMLEN]= "";
320 	char *header_name = NULL;
321 	char *header_val = NULL;
322 
323 	if (arity == 1) {
324 		ei_x_encode_tuple_header(rbuf, 2);
325 		ei_x_encode_atom(rbuf, "error");
326 		ei_x_encode_atom(rbuf, "badarg");
327 	} else {
328 		int i = 0;
329 
330 		ei_x_encode_tuple_header(rbuf, 2);
331 		ei_x_encode_atom(rbuf, "filter_command_processing_log");
332 		ei_x_encode_list_header(rbuf, arity - 1);
333 
334 		switch_thread_rwlock_wrlock(listener->event_rwlock);
335 
336 		switch_mutex_lock(listener->filter_mutex);
337 		if (!listener->filters) {
338 			switch_event_create_plain(&listener->filters, SWITCH_EVENT_CLONE);
339 			switch_clear_flag(listener->filters, EF_UNIQ_HEADERS);
340 		}
341 
342 		for (i = 1; i < arity; i++) {
343 			if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
344 				header_name=atom;
345 
346 				while (header_name && *header_name && *header_name == ' ')
347 				header_name++;
348 
349 				if ((header_val = strchr(atom, ' '))) {
350 					*header_val++ = '\0';
351 				}
352 
353 				if (!strcasecmp(header_name, "delete") && header_val) {
354 					header_name = header_val;
355 					if ((header_val = strchr(header_name, ' '))) {
356 						*header_val++ = '\0';
357 					}
358 					if (!strcasecmp(header_name, "all")) {
359 						switch_event_destroy(&listener->filters);
360 						switch_event_create_plain(&listener->filters, SWITCH_EVENT_CLONE);
361 					} else {
362 						switch_event_del_header_val(listener->filters, header_name, header_val);
363 					}
364 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "+OK filter deleted. [%s]=[%s]", header_name, switch_str_nil(header_val));
365 					ei_x_encode_tuple_header(rbuf, 3);
366 					_ei_x_encode_string(rbuf, "deleted");
367 					_ei_x_encode_string(rbuf, header_name);
368 					_ei_x_encode_string(rbuf, switch_str_nil(header_val));
369 				} else if (header_val) {
370 					if (!strcasecmp(header_name, "add")) {
371 						header_name = header_val;
372 						if ((header_val = strchr(header_name, ' '))) {
373 							*header_val++ = '\0';
374 						}
375 					}
376 					switch_event_add_header_string(listener->filters, SWITCH_STACK_BOTTOM, header_name, header_val);
377 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "+OK filter added. [%s]=[%s]", header_name, header_val);
378 					ei_x_encode_tuple_header(rbuf, 3);
379 					_ei_x_encode_string(rbuf, "added");
380 					_ei_x_encode_string(rbuf, header_name);
381 					_ei_x_encode_string(rbuf, header_val);
382 				} else {
383 					switch_snprintf(reply, MAXATOMLEN, "-ERR invalid syntax");
384 					ei_x_encode_atom(rbuf, "-ERR invalid syntax");
385 				}
386 			}
387 		}
388 
389 		switch_mutex_unlock(listener->filter_mutex);
390 		switch_thread_rwlock_unlock(listener->event_rwlock);
391 
392 		ei_x_encode_empty_list(rbuf);
393 	}
394 	return SWITCH_STATUS_SUCCESS;
395 }
396 
handle_msg_session_event(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)397 static switch_status_t handle_msg_session_event(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
398 {
399 	char atom[MAXATOMLEN];
400 
401 	if (arity == 1) {
402 		ei_x_encode_tuple_header(rbuf, 2);
403 		ei_x_encode_atom(rbuf, "error");
404 		ei_x_encode_atom(rbuf, "badarg");
405 	} else {
406 		session_elem_t *session;
407 		if ((session = find_session_elem_by_pid(listener, &msg->from))) {
408 
409 			int custom = 0;
410 			switch_event_types_t type;
411 			int i = 0;
412 
413 			switch_thread_rwlock_wrlock(session->event_rwlock);
414 
415 			for (i = 1; i < arity; i++) {
416 				if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
417 
418 					if (custom) {
419 						switch_core_hash_insert(session->event_hash, atom, MARKER);
420 					} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
421 						if (type == SWITCH_EVENT_ALL) {
422 							uint32_t x = 0;
423 
424 							switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "ALL events enabled for %s\n", session->uuid_str);
425 							for (x = 0; x < SWITCH_EVENT_ALL; x++) {
426 								session->event_list[x] = 1;
427 							}
428 						}
429 						if (type <= SWITCH_EVENT_ALL) {
430 							session->event_list[type] = 1;
431 						}
432 						if (type == SWITCH_EVENT_CUSTOM) {
433 							custom++;
434 						}
435 
436 					}
437 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s for session %s\n", atom, session->uuid_str);
438 				}
439 			}
440 
441 			switch_thread_rwlock_unlock(session->event_rwlock);
442 			switch_thread_rwlock_unlock(session->rwlock);
443 
444 			ei_x_encode_atom(rbuf, "ok");
445 		} else {
446 			ei_x_encode_tuple_header(rbuf, 2);
447 			ei_x_encode_atom(rbuf, "error");
448 			ei_x_encode_atom(rbuf, "notlistening");
449 		}
450 	}
451 	return SWITCH_STATUS_SUCCESS;
452 }
453 
handle_msg_nixevent(listener_t * listener,int arity,ei_x_buff * buf,ei_x_buff * rbuf)454 static switch_status_t handle_msg_nixevent(listener_t *listener, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
455 {
456 	char atom[MAXATOMLEN];
457 
458 	if (arity == 1) {
459 		ei_x_encode_tuple_header(rbuf, 2);
460 		ei_x_encode_atom(rbuf, "error");
461 		ei_x_encode_atom(rbuf, "badarg");
462 	} else {
463 		int custom = 0;
464 		int i = 0;
465 		switch_event_types_t type;
466 
467 		switch_thread_rwlock_wrlock(listener->event_rwlock);
468 
469 		for (i = 1; i < arity; i++) {
470 			if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
471 
472 				if (custom) {
473 
474 					switch_core_hash_delete(listener->event_hash, atom);
475 				} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
476 					uint32_t x = 0;
477 
478 					if (type == SWITCH_EVENT_CUSTOM) {
479 						custom++;
480 					} else if (type == SWITCH_EVENT_ALL) {
481 						for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
482 							listener->event_list[x] = 0;
483 						}
484 					} else {
485 						if (listener->event_list[SWITCH_EVENT_ALL]) {
486 							listener->event_list[SWITCH_EVENT_ALL] = 0;
487 							for (x = 0; x < SWITCH_EVENT_ALL; x++) {
488 								listener->event_list[x] = 1;
489 							}
490 						}
491 						listener->event_list[type] = 0;
492 					}
493 				}
494 			}
495 		}
496 
497 		switch_thread_rwlock_unlock(listener->event_rwlock);
498 		ei_x_encode_atom(rbuf, "ok");
499 	}
500 	return SWITCH_STATUS_SUCCESS;
501 }
502 
handle_msg_session_nixevent(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)503 static switch_status_t handle_msg_session_nixevent(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
504 {
505 	char atom[MAXATOMLEN];
506 
507 	if (arity == 1) {
508 		ei_x_encode_tuple_header(rbuf, 2);
509 		ei_x_encode_atom(rbuf, "error");
510 		ei_x_encode_atom(rbuf, "badarg");
511 	} else {
512 		session_elem_t *session;
513 		if ((session = find_session_elem_by_pid(listener, &msg->from))) {
514 			int custom = 0;
515 			int i = 0;
516 			switch_event_types_t type;
517 
518 			switch_thread_rwlock_wrlock(session->event_rwlock);
519 
520 			for (i = 1; i < arity; i++) {
521 				if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
522 
523 					if (custom) {
524 						switch_core_hash_delete(session->event_hash, atom);
525 					} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
526 						uint32_t x = 0;
527 
528 						if (type == SWITCH_EVENT_CUSTOM) {
529 							custom++;
530 						} else if (type == SWITCH_EVENT_ALL) {
531 							for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
532 								session->event_list[x] = 0;
533 							}
534 						} else {
535 							if (session->event_list[SWITCH_EVENT_ALL]) {
536 								session->event_list[SWITCH_EVENT_ALL] = 0;
537 								for (x = 0; x < SWITCH_EVENT_ALL; x++) {
538 									session->event_list[x] = 1;
539 								}
540 							}
541 							session->event_list[type] = 0;
542 						}
543 					}
544 				}
545 			}
546 			switch_thread_rwlock_unlock(session->event_rwlock);
547 			switch_thread_rwlock_unlock(session->rwlock);
548 
549 			ei_x_encode_atom(rbuf, "ok");
550 		} else { /* no session for this pid */
551 			ei_x_encode_tuple_header(rbuf, 2);
552 			ei_x_encode_atom(rbuf, "error");
553 			ei_x_encode_atom(rbuf, "notlistening");
554 		}
555 	}
556 	return SWITCH_STATUS_SUCCESS;
557 }
558 
559 // Nix's all events, then sets up a listener for the given ones.
560 // meant to ensure that no events are missed during this common operation.
handle_msg_setevent(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)561 static switch_status_t handle_msg_setevent(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
562 {
563 	char atom[MAXATOMLEN];
564 
565 	if(arity == 1) {
566 		ei_x_encode_tuple_header(rbuf, 2);
567 		ei_x_encode_atom(rbuf, "error");
568 		ei_x_encode_atom(rbuf, "badarg");
569 	} else {
570 		uint8_t event_list[SWITCH_EVENT_ALL + 1];
571 		switch_hash_t *event_hash;
572 		uint32_t x = 0;
573 		int custom = 0;
574 		switch_event_types_t type;
575 		int i = 0;
576 
577 		/* clear any previous event registrations */
578 		for(x = 0; x <= SWITCH_EVENT_ALL; x++) {
579 			event_list[x] = 0;
580 		}
581 
582 		/* create new hash */
583 		switch_core_hash_init(&event_hash);
584 
585 		if(!switch_test_flag(listener, LFLAG_EVENTS)) {
586 			switch_set_flag_locked(listener, LFLAG_EVENTS);
587 		}
588 
589 		for(i = 1; i < arity; i++){
590 			if(!ei_decode_atom(buf->buff, &buf->index, atom)){
591 
592 				if(custom){
593 					switch_core_hash_insert(event_hash, atom, MARKER);
594 				} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
595 					if (type == SWITCH_EVENT_ALL) {
596 						ei_x_encode_tuple_header(rbuf, 2);
597 						ei_x_encode_atom(rbuf, "error");
598 						ei_x_encode_atom(rbuf, "badarg");
599 						break;
600 					}
601 					if (type <= SWITCH_EVENT_ALL) {
602 						event_list[type] = 1;
603 					}
604 					if (type == SWITCH_EVENT_CUSTOM) {
605 						custom++;
606 					}
607 				}
608 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom);
609 			}
610 		}
611 		/* update the event subscriptions with the new ones */
612 		switch_thread_rwlock_wrlock(listener->event_rwlock);
613 		memcpy(listener->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1));
614 		switch_core_hash_destroy(&listener->event_hash);
615 		listener->event_hash = event_hash;
616 		switch_thread_rwlock_unlock(listener->event_rwlock);
617 
618 		/* TODO - we should flush any non-matching events from the queue */
619 		ei_x_encode_atom(rbuf, "ok");
620 	}
621 	return SWITCH_STATUS_SUCCESS;
622 }
623 
handle_msg_session_setevent(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)624 static switch_status_t handle_msg_session_setevent(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
625 {
626 	char atom[MAXATOMLEN];
627 
628 	if (arity == 1){
629 		ei_x_encode_tuple_header(rbuf, 2);
630 		ei_x_encode_atom(rbuf, "error");
631 		ei_x_encode_atom(rbuf, "badarg");
632 	} else {
633 		session_elem_t *session;
634 		if ((session = find_session_elem_by_pid(listener, &msg->from))) {
635 			uint8_t event_list[SWITCH_EVENT_ALL + 1];
636 			switch_hash_t *event_hash;
637 			int custom = 0;
638 			int i = 0;
639 			switch_event_types_t type;
640 			uint32_t x = 0;
641 
642 			/* clear any previous event registrations */
643 			for (x = 0; x <= SWITCH_EVENT_ALL; x++){
644 				event_list[x] = 0;
645 			}
646 
647 			/* create new hash */
648 			switch_core_hash_init(&event_hash);
649 
650 			for (i = 1; i < arity; i++){
651 				if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
652 					if (custom) {
653 						switch_core_hash_insert(event_hash, atom, MARKER);
654 					} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
655 						if (type == SWITCH_EVENT_ALL) {
656 							ei_x_encode_tuple_header(rbuf, 1);
657 							ei_x_encode_atom(rbuf, "error");
658 							ei_x_encode_atom(rbuf, "badarg");
659 							break;
660 						}
661 						if (type <= SWITCH_EVENT_ALL) {
662 							event_list[type] = 1;
663 						}
664 						if (type == SWITCH_EVENT_CUSTOM) {
665 							custom++;
666 						}
667 					}
668 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s for session %s\n", atom, session->uuid_str);
669 				}
670 			}
671 
672 			/* update the event subscriptions with the new ones */
673 			switch_thread_rwlock_wrlock(session->event_rwlock);
674 			memcpy(session->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1));
675 			/* wipe the old hash, and point the pointer at the new one */
676 			switch_core_hash_destroy(&session->event_hash);
677 			session->event_hash = event_hash;
678 			switch_thread_rwlock_unlock(session->event_rwlock);
679 
680 			switch_thread_rwlock_unlock(session->rwlock);
681 
682 			/* TODO - we should flush any non-matching events from the queue */
683 			ei_x_encode_atom(rbuf, "ok");
684 		} else { /* no session for this pid */
685 			ei_x_encode_tuple_header(rbuf, 2);
686 			ei_x_encode_atom(rbuf, "error");
687 			ei_x_encode_atom(rbuf, "notlistening");
688 		}
689 	}
690 	return SWITCH_STATUS_SUCCESS;
691 }
692 
handle_msg_api(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)693 static switch_status_t handle_msg_api(listener_t *listener, erlang_msg * msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
694 {
695 	char api_cmd[MAXATOMLEN];
696 	int type;
697 	int size;
698 	char *arg;
699 	switch_bool_t fail = SWITCH_FALSE;
700 
701 	if (arity < 3) {
702 		fail = SWITCH_TRUE;
703 	}
704 
705 	ei_get_type(buf->buff, &buf->index, &type, &size);
706 
707 	if ((size > (sizeof(api_cmd) - 1)) || ei_decode_atom(buf->buff, &buf->index, api_cmd)) {
708 		fail = SWITCH_TRUE;
709 	}
710 
711 	ei_get_type(buf->buff, &buf->index, &type, &size);
712 	arg = malloc(size + 1);
713 
714 	if (ei_decode_string_or_binary(buf->buff, &buf->index, size, arg)) {
715 		fail = SWITCH_TRUE;
716 	}
717 
718 	if (!fail) {
719 		struct api_command_struct acs = { 0 };
720 		acs.listener = listener;
721 		acs.api_cmd = api_cmd;
722 		acs.arg = arg;
723 		acs.bg = 0;
724 		acs.pid = msg->from;
725 		api_exec(NULL, (void *) &acs);
726 
727 		switch_safe_free(arg);
728 
729 		/* don't reply */
730 		return SWITCH_STATUS_FALSE;
731 	} else {
732 		ei_x_encode_tuple_header(rbuf, 2);
733 		ei_x_encode_atom(rbuf, "error");
734 		ei_x_encode_atom(rbuf, "badarg");
735 		return SWITCH_STATUS_SUCCESS;
736 
737 	}
738 }
739 
handle_msg_bgapi(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)740 static switch_status_t handle_msg_bgapi(listener_t *listener, erlang_msg * msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
741 {
742 	char api_cmd[MAXATOMLEN];
743 	char *arg = NULL;
744 	int size, type;
745 
746 	if (arity < 3 ||
747 		ei_decode_atom(buf->buff, &buf->index, api_cmd) ||
748 		ei_get_type(buf->buff, &buf->index, &type, &size) ||
749 		!(arg = malloc(size + 1)) ||
750 		ei_decode_string_or_binary(buf->buff, &buf->index, size, arg)) {
751 
752 		ei_x_encode_tuple_header(rbuf, 2);
753 		ei_x_encode_atom(rbuf, "error");
754 		ei_x_encode_atom(rbuf, "badarg");
755 	} else {
756 		struct api_command_struct *acs = NULL;
757 		switch_memory_pool_t *pool;
758 		switch_thread_t *thread;
759 		switch_threadattr_t *thd_attr = NULL;
760 		switch_uuid_t uuid;
761 
762 		switch_core_new_memory_pool(&pool);
763 		acs = switch_core_alloc(pool, sizeof(*acs));
764 		switch_assert(acs);
765 		acs->pool = pool;
766 		acs->listener = listener;
767 		acs->api_cmd = switch_core_strdup(acs->pool, api_cmd);
768 		acs->arg = switch_core_strdup(acs->pool, arg);
769 		acs->bg = 1;
770 		acs->pid = msg->from;
771 
772 		switch_threadattr_create(&thd_attr, acs->pool);
773 		switch_threadattr_detach_set(thd_attr, 1);
774 		switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
775 
776 		switch_uuid_get(&uuid);
777 		switch_uuid_format(acs->uuid_str, &uuid);
778 		switch_thread_create(&thread, thd_attr, api_exec, acs, acs->pool);
779 
780 		ei_x_encode_tuple_header(rbuf, 2);
781 		ei_x_encode_atom(rbuf, "ok");
782 		_ei_x_encode_string(rbuf, acs->uuid_str);
783 	}
784 
785 	switch_safe_free(arg);
786 
787 	return SWITCH_STATUS_SUCCESS;
788 }
789 
handle_msg_sendevent(listener_t * listener,int arity,ei_x_buff * buf,ei_x_buff * rbuf)790 static switch_status_t handle_msg_sendevent(listener_t *listener, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
791 {
792 	char ename[MAXATOMLEN + 1];
793 	char esname[MAXATOMLEN + 1];
794 	int headerlength;
795 
796 	memset(esname, 0, MAXATOMLEN);
797 
798 	if (ei_decode_atom(buf->buff, &buf->index, ename) ||
799 		(!strncasecmp(ename, "CUSTOM", MAXATOMLEN) &&
800 		 ei_decode_atom(buf->buff, &buf->index, esname)) || ei_decode_list_header(buf->buff, &buf->index, &headerlength)) {
801 		ei_x_encode_tuple_header(rbuf, 2);
802 		ei_x_encode_atom(rbuf, "error");
803 		ei_x_encode_atom(rbuf, "badarg");
804 	} else {
805 		switch_event_types_t etype;
806 		if (switch_name_event(ename, &etype) == SWITCH_STATUS_SUCCESS) {
807 			switch_event_t *event;
808 			if ((strlen(esname) && switch_event_create_subclass(&event, etype, esname) == SWITCH_STATUS_SUCCESS) ||
809 				switch_event_create(&event, etype) == SWITCH_STATUS_SUCCESS) {
810 				char key[1024];
811 				char *value;
812                                 int type;
813                                 int size;
814 				int i = 0;
815 				switch_bool_t fail = SWITCH_FALSE;
816 
817 				while (!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) {
818 					i++;
819 
820 					ei_get_type(buf->buff, &buf->index, &type, &size);
821 
822 					if ((size > (sizeof(key) - 1)) || ei_decode_string(buf->buff, &buf->index, key)) {
823 						fail = SWITCH_TRUE;
824 						break;
825 					}
826 
827 					ei_get_type(buf->buff, &buf->index, &type, &size);
828 					value = malloc(size + 1);
829 
830 					if (ei_decode_string(buf->buff, &buf->index, value)) {
831        						fail = SWITCH_TRUE;
832 						break;
833 					}
834 
835 					if (!fail && !strcmp(key, "body")) {
836 						switch_safe_free(event->body);
837 						event->body = value;
838 					} else if (!fail)  {
839 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM | SWITCH_STACK_NODUP, key, value);
840 					}
841 
842 					/* Do not free malloc here! The above commands utilize the raw allocated memory and skip any copying/duplication. Faster. */
843 				}
844 
845 				if (headerlength != i || fail) {
846 					ei_x_encode_tuple_header(rbuf, 2);
847 					ei_x_encode_atom(rbuf, "error");
848 					ei_x_encode_atom(rbuf, "badarg");
849 				} else {
850 					switch_event_fire(&event);
851 					ei_x_encode_atom(rbuf, "ok");
852 				}
853 			}
854 			/* If the event wasn't successfully fired, or failed for any other reason, then make sure not to leak it. */
855 			if ( event ) {
856 				switch_event_destroy(&event);
857 			}
858 		}
859 	}
860 	return SWITCH_STATUS_SUCCESS;
861 }
862 
handle_msg_sendmsg(listener_t * listener,int arity,ei_x_buff * buf,ei_x_buff * rbuf)863 static switch_status_t handle_msg_sendmsg(listener_t *listener, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
864 {
865 	char uuid[SWITCH_UUID_FORMATTED_LENGTH + 1];
866 	int headerlength;
867 
868 	if (ei_decode_string_or_binary(buf->buff, &buf->index, SWITCH_UUID_FORMATTED_LENGTH, uuid) ||
869 		ei_decode_list_header(buf->buff, &buf->index, &headerlength)) {
870 		ei_x_encode_tuple_header(rbuf, 2);
871 		ei_x_encode_atom(rbuf, "error");
872 		ei_x_encode_atom(rbuf, "badarg");
873 	} else {
874 		switch_core_session_t *session;
875 		if (!zstr_buf(uuid) && (session = switch_core_session_locate(uuid))) {
876 			switch_event_t *event;
877 			if (switch_event_create(&event, SWITCH_EVENT_SEND_MESSAGE) == SWITCH_STATUS_SUCCESS) {
878 
879 				char key[1024];
880 				char *value;
881 				int type;
882 				int size;
883 				int i = 0;
884 				switch_bool_t fail = SWITCH_FALSE;
885 
886 				while (!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) {
887 					i++;
888 					ei_get_type(buf->buff, &buf->index, &type, &size);
889 
890 					if ((size > (sizeof(key) - 1)) || ei_decode_string(buf->buff, &buf->index, key)) {
891 						fail = SWITCH_TRUE;
892 						break;
893 					}
894 
895 					ei_get_type(buf->buff, &buf->index, &type, &size);
896 					value = malloc(size + 1);
897 
898 					if (ei_decode_string(buf->buff, &buf->index, value)) {
899 						fail = SWITCH_TRUE;
900 						break;
901 					}
902 
903 					if (!fail) {
904 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM | SWITCH_STACK_NODUP, key, value);
905 					}
906 				}
907 
908 				if (headerlength != i || fail) {
909 					ei_x_encode_tuple_header(rbuf, 2);
910 					ei_x_encode_atom(rbuf, "error");
911 					ei_x_encode_atom(rbuf, "badarg");
912 					switch_event_destroy(&event);
913 				} else {
914 					if (switch_core_session_queue_private_event(session, &event, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS) {
915 						ei_x_encode_atom(rbuf, "ok");
916 					} else {
917 						ei_x_encode_tuple_header(rbuf, 2);
918 						ei_x_encode_atom(rbuf, "error");
919 						ei_x_encode_atom(rbuf, "badmem");
920 						switch_event_destroy(&event);
921 					}
922 
923 				}
924 			}
925 			/* release the lock returned by session locate */
926 			switch_core_session_rwunlock(session);
927 
928 		} else {
929 			ei_x_encode_tuple_header(rbuf, 2);
930 			ei_x_encode_atom(rbuf, "error");
931 			ei_x_encode_atom(rbuf, "nosession");
932 		}
933 	}
934 	return SWITCH_STATUS_SUCCESS;
935 }
936 
handle_msg_bind(listener_t * listener,erlang_msg * msg,ei_x_buff * buf,ei_x_buff * rbuf)937 static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buff * rbuf)
938 {
939 	/* format is (result|config|directory|dialplan|phrases)  */
940 	char sectionstr[MAXATOMLEN];
941 	switch_xml_section_t section;
942 
943 	if (ei_decode_atom(buf->buff, &buf->index, sectionstr) || !(section = switch_xml_parse_section_string(sectionstr))) {
944 		ei_x_encode_tuple_header(rbuf, 2);
945 		ei_x_encode_atom(rbuf, "error");
946 		ei_x_encode_atom(rbuf, "badarg");
947 	} else {
948 		struct erlang_binding *binding, *ptr;
949 
950 		if (!(binding = switch_core_alloc(listener->pool, sizeof(*binding)))) {
951 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
952 			ei_x_encode_tuple_header(rbuf, 2);
953 			ei_x_encode_atom(rbuf, "error");
954 			ei_x_encode_atom(rbuf, "badmem");
955 		} else {
956 			binding->section = section;
957 			binding->process.type = ERLANG_PID;
958 			binding->process.pid = msg->from;
959 			binding->listener = listener;
960 
961 			switch_thread_rwlock_wrlock(mod_erlang_event_globals.bindings_rwlock);
962 
963 			for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next);
964 
965 			if (ptr) {
966 				ptr->next = binding;
967 			} else {
968 				bindings.head = binding;
969 			}
970 
971 			switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section);
972 			switch_thread_rwlock_unlock(mod_erlang_event_globals.bindings_rwlock);
973 
974 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding));
975 
976 			ei_link(listener, ei_self(listener->ec), &msg->from);
977 			ei_x_encode_atom(rbuf, "ok");
978 		}
979 	}
980 	return SWITCH_STATUS_SUCCESS;
981 }
982 
983 /* {handlecall,<uuid>,<handler process registered name>}
984    or
985    {handlecall,<uuid>} to send messages back to the sender
986  */
handle_msg_handlecall(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)987 static switch_status_t handle_msg_handlecall(listener_t *listener, erlang_msg * msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
988 {
989 	char reg_name[MAXATOMLEN];
990 	char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
991 
992 	if (arity < 2 || arity > 3 ||
993 		(arity == 3 && ei_decode_atom(buf->buff, &buf->index, reg_name)) ||
994 		ei_decode_string_or_binary(buf->buff, &buf->index, SWITCH_UUID_FORMATTED_LENGTH, uuid_str)) {
995 		ei_x_encode_tuple_header(rbuf, 2);
996 		ei_x_encode_atom(rbuf, "error");
997 		ei_x_encode_atom(rbuf, "badarg");
998 	} else {
999 		switch_core_session_t *session;
1000 		if (!zstr_buf(uuid_str)) {
1001 			if ((session = switch_core_session_locate(uuid_str))) {
1002 				/* create a new session list element and attach it to this listener */
1003 				if ((arity == 2 && attach_call_to_pid(listener, &msg->from, session)) ||
1004 					(arity == 3 && attach_call_to_registered_process(listener, reg_name, session))) {
1005 					ei_x_encode_atom(rbuf, "ok");
1006 				} else {
1007 					ei_x_encode_tuple_header(rbuf, 2);
1008 					ei_x_encode_atom(rbuf, "error");
1009 					ei_x_encode_atom(rbuf, "session_attach_failed");
1010 				}
1011 				/* release the lock returned by session locate */
1012 				switch_core_session_rwunlock(session);
1013 			} else {
1014 				ei_x_encode_tuple_header(rbuf, 2);
1015 				ei_x_encode_atom(rbuf, "error");
1016 				ei_x_encode_atom(rbuf, "badsession");
1017 			}
1018 		} else {
1019 			ei_x_encode_tuple_header(rbuf, 2);
1020 			ei_x_encode_atom(rbuf, "error");
1021 			ei_x_encode_atom(rbuf, "baduuid");
1022 		}
1023 	}
1024 	return SWITCH_STATUS_SUCCESS;
1025 }
1026 
/* catch the response to ei_rpc_to, which comes back as {rex, {Ref, Pid}}.
   The {Ref,Pid} bit can be handled by handle_ref_tuple
 */
handle_msg_rpcresponse(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)1030 static switch_status_t handle_msg_rpcresponse(listener_t *listener, erlang_msg * msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
1031 {
1032 	int type, size, arity2, tmpindex;
1033 
1034 	ei_get_type(buf->buff, &buf->index, &type, &size);
1035 	switch (type) {
1036 	case ERL_SMALL_TUPLE_EXT:
1037 	case ERL_LARGE_TUPLE_EXT:
1038 		tmpindex = buf->index;
1039 		ei_decode_tuple_header(buf->buff, &tmpindex, &arity2);
1040 		return handle_ref_tuple(listener, msg, buf, rbuf);
1041 	default:
1042 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unknown rpc response\n");
1043 		break;
1044 	}
1045 	/* no reply */
1046 	return SWITCH_STATUS_FALSE;
1047 }
1048 
handle_msg_tuple(listener_t * listener,erlang_msg * msg,ei_x_buff * buf,ei_x_buff * rbuf)1049 static switch_status_t handle_msg_tuple(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buff * rbuf)
1050 {
1051 	char tupletag[MAXATOMLEN];
1052 	int arity;
1053 	switch_status_t ret = SWITCH_STATUS_SUCCESS;
1054 
1055 	ei_decode_tuple_header(buf->buff, &buf->index, &arity);
1056 	if (ei_decode_atom(buf->buff, &buf->index, tupletag)) {
1057 		ei_x_encode_tuple_header(rbuf, 2);
1058 		ei_x_encode_atom(rbuf, "error");
1059 		ei_x_encode_atom(rbuf, "badarg");
1060 	} else {
1061 		if (!strncmp(tupletag, "fetch_reply", MAXATOMLEN)) {
1062 			ret = handle_msg_fetch_reply(listener, buf, rbuf);
1063 		} else if (!strncmp(tupletag, "set_log_level", MAXATOMLEN)) {
1064 			ret = handle_msg_set_log_level(listener, arity, buf, rbuf);
1065 		} else if (!strncmp(tupletag, "event", MAXATOMLEN)) {
1066 			ret = handle_msg_event(listener, arity, buf, rbuf);
1067 		} else if (!strncmp(tupletag, "filter", MAXATOMLEN)) {
1068 			ret = handle_msg_filter(listener, arity, buf, rbuf);
1069 		} else if (!strncmp(tupletag, "session_event", MAXATOMLEN)) {
1070 			ret = handle_msg_session_event(listener, msg, arity, buf, rbuf);
1071 		} else if (!strncmp(tupletag, "nixevent", MAXATOMLEN)) {
1072 			ret = handle_msg_nixevent(listener, arity, buf, rbuf);
1073 		} else if (!strncmp(tupletag, "session_nixevent", MAXATOMLEN)) {
1074 			ret = handle_msg_session_nixevent(listener, msg, arity, buf, rbuf);
1075 		} else if (!strncmp(tupletag, "api", MAXATOMLEN)) {
1076 			ret = handle_msg_api(listener, msg, arity, buf, rbuf);
1077 		} else if (!strncmp(tupletag, "bgapi", MAXATOMLEN)) {
1078 			ret = handle_msg_bgapi(listener, msg, arity, buf, rbuf);
1079 		} else if (!strncmp(tupletag, "sendevent", MAXATOMLEN)) {
1080 			ret = handle_msg_sendevent(listener, arity, buf, rbuf);
1081 		} else if (!strncmp(tupletag, "sendmsg", MAXATOMLEN)) {
1082 			ret = handle_msg_sendmsg(listener, arity, buf, rbuf);
1083 		} else if (!strncmp(tupletag, "bind", MAXATOMLEN)) {
1084 			ret = handle_msg_bind(listener, msg, buf, rbuf);
1085 		} else if (!strncmp(tupletag, "handlecall", MAXATOMLEN)) {
1086 			ret = handle_msg_handlecall(listener, msg, arity, buf, rbuf);
1087 		} else if (!strncmp(tupletag, "rex", MAXATOMLEN)) {
1088 			ret = handle_msg_rpcresponse(listener, msg, arity, buf, rbuf);
1089 		} else if (!strncmp(tupletag, "setevent", MAXATOMLEN)) {
1090 			ret = handle_msg_setevent(listener, msg, arity, buf, rbuf);
1091 		} else if (!strncmp(tupletag, "session_setevent", MAXATOMLEN)) {
1092 			ret = handle_msg_session_setevent(listener, msg, arity, buf, rbuf);
1093 		} else {
1094 			ei_x_encode_tuple_header(rbuf, 2);
1095 			ei_x_encode_atom(rbuf, "error");
1096 			ei_x_encode_atom(rbuf, "undef");
1097 		}
1098 	}
1099 	return ret;
1100 }
1101 
handle_msg_atom(listener_t * listener,erlang_msg * msg,ei_x_buff * buf,ei_x_buff * rbuf)1102 static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buff * rbuf)
1103 {
1104 	char atom[MAXATOMLEN];
1105 	switch_status_t ret = SWITCH_STATUS_SUCCESS;
1106 
1107 	if (ei_decode_atom(buf->buff, &buf->index, atom)) {
1108 		ei_x_encode_tuple_header(rbuf, 2);
1109 		ei_x_encode_atom(rbuf, "error");
1110 		ei_x_encode_atom(rbuf, "badarg");
1111 	} else if (!strncmp(atom, "nolog", MAXATOMLEN)) {
1112 		if (switch_test_flag(listener, LFLAG_LOG)) {
1113 			void *pop;
1114 			/*purge the log queue */
1115 			while (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS);
1116 			switch_clear_flag_locked(listener, LFLAG_LOG);
1117 		}
1118 		ei_x_encode_atom(rbuf, "ok");
1119 	} else if (!strncmp(atom, "register_log_handler", MAXATOMLEN)) {
1120 		ei_link(listener, ei_self(listener->ec), &msg->from);
1121 		listener->log_process.type = ERLANG_PID;
1122 		memcpy(&listener->log_process.pid, &msg->from, sizeof(erlang_pid));
1123 		listener->level = SWITCH_LOG_DEBUG;
1124 		switch_set_flag(listener, LFLAG_LOG);
1125 		ei_x_encode_atom(rbuf, "ok");
1126 	} else if (!strncmp(atom, "register_event_handler", MAXATOMLEN)) {
1127 		ei_link(listener, ei_self(listener->ec), &msg->from);
1128 		listener->event_process.type = ERLANG_PID;
1129 		memcpy(&listener->event_process.pid, &msg->from, sizeof(erlang_pid));
1130 		if (!switch_test_flag(listener, LFLAG_EVENTS)) {
1131 			switch_set_flag_locked(listener, LFLAG_EVENTS);
1132 		}
1133 		ei_x_encode_atom(rbuf, "ok");
1134 	} else if (!strncmp(atom, "noevents", MAXATOMLEN)) {
1135 		void *pop;
1136 		/*purge the event queue */
1137 		while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS);
1138 
1139 		if (switch_test_flag(listener, LFLAG_EVENTS)) {
1140 			uint8_t x = 0;
1141 			switch_clear_flag_locked(listener, LFLAG_EVENTS);
1142 
1143 			switch_thread_rwlock_wrlock(listener->event_rwlock);
1144 			for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
1145 				listener->event_list[x] = 0;
1146 			}
1147 
1148 			switch_core_hash_delete_multi(listener->event_hash, NULL, NULL);
1149 
1150 			switch_thread_rwlock_unlock(listener->event_rwlock);
1151 			ei_x_encode_atom(rbuf, "ok");
1152 		} else {
1153 			ei_x_encode_tuple_header(rbuf, 2);
1154 			ei_x_encode_atom(rbuf, "error");
1155 			ei_x_encode_atom(rbuf, "notlistening");
1156 		}
1157 	} else if (!strncmp(atom, "session_noevents", MAXATOMLEN)) {
1158 		session_elem_t *session;
1159 		if ((session = find_session_elem_by_pid(listener, &msg->from))) {
1160 			void *pop;
1161 			uint8_t x = 0;
1162 
1163 			/*purge the event queue */
1164 			while (switch_queue_trypop(session->event_queue, &pop) == SWITCH_STATUS_SUCCESS);
1165 
1166 			switch_thread_rwlock_wrlock(session->event_rwlock);
1167 			for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
1168 				session->event_list[x] = 0;
1169 			}
1170 			/* wipe the hash */
1171 			switch_core_hash_delete_multi(session->event_hash, NULL, NULL);
1172 			switch_thread_rwlock_unlock(session->event_rwlock);
1173 
1174 			switch_thread_rwlock_unlock(session->rwlock);
1175 
1176 			ei_x_encode_atom(rbuf, "ok");
1177 		} else {
1178 			ei_x_encode_tuple_header(rbuf, 2);
1179 			ei_x_encode_atom(rbuf, "error");
1180 			ei_x_encode_atom(rbuf, "notlistening");
1181 		}
1182 	} else if (!strncmp(atom, "exit", MAXATOMLEN)) {
1183 		ei_x_encode_atom(rbuf, "ok");
1184 		ret = SWITCH_STATUS_TERM;
1185 	} else if (!strncmp(atom, "getpid", MAXATOMLEN)) {
1186 		ei_x_encode_tuple_header(rbuf, 2);
1187 		ei_x_encode_atom(rbuf, "ok");
1188 		ei_x_encode_pid(rbuf, ei_self(listener->ec));
1189 	} else if (!strncmp(atom, "link", MAXATOMLEN)) {
1190 		/* debugging */
1191 		ei_link(listener, ei_self(listener->ec), &msg->from);
1192 		ret = SWITCH_STATUS_FALSE;
1193 	} else {
1194 		ei_x_encode_tuple_header(rbuf, 2);
1195 		ei_x_encode_atom(rbuf, "error");
1196 		ei_x_encode_atom(rbuf, "undef");
1197 	}
1198 
1199 	return ret;
1200 }
1201 
1202 
handle_ref_tuple(listener_t * listener,erlang_msg * msg,ei_x_buff * buf,ei_x_buff * rbuf)1203 static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buff * rbuf)
1204 {
1205 	erlang_ref ref;
1206 	erlang_pid pid;
1207 	char hash[100];
1208 	int arity;
1209 	const void *key;
1210 	void *val;
1211 	session_elem_t *se;
1212 	switch_hash_index_t *iter;
1213 	int found = 0;
1214 
1215 	ei_decode_tuple_header(buf->buff, &buf->index, &arity);
1216 
1217 	if (ei_decode_ref(buf->buff, &buf->index, &ref)) {
1218 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid reference\n");
1219 		return SWITCH_STATUS_FALSE;
1220 	}
1221 
1222 	if (ei_decode_pid(buf->buff, &buf->index, &pid)) {
1223 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid pid in a reference/pid tuple\n");
1224 		return SWITCH_STATUS_FALSE;
1225 	}
1226 
1227 	ei_hash_ref(&ref, hash);
1228 
1229 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hashed ref to %s\n", hash);
1230 
1231 	switch_thread_rwlock_rdlock(listener->session_rwlock);
1232 	for (iter = switch_core_hash_first(listener->sessions); iter; iter = switch_core_hash_next(&iter)) {
1233 		switch_core_hash_this(iter, &key, NULL, &val);
1234 		se = (session_elem_t*)val;
1235 		if (switch_test_flag(se, LFLAG_WAITING_FOR_PID) && se->spawn_reply && !strncmp(se->spawn_reply->hash, hash, 100)) {
1236 
1237 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "found matching session for %s : %s\n", hash, se->uuid_str);
1238 
1239 			switch_mutex_lock(se->spawn_reply->mutex);
1240 
1241 			se->spawn_reply->pid = switch_core_alloc(se->pool, sizeof(erlang_pid));
1242 			switch_assert(se->spawn_reply->pid != NULL);
1243 			memcpy(se->spawn_reply->pid, &pid, sizeof(erlang_pid));
1244 
1245 			switch_thread_cond_signal(se->spawn_reply->ready_or_found);
1246 
1247 			switch_mutex_unlock(se->spawn_reply->mutex);
1248 
1249 			found++;
1250 
1251 			break;
1252 		}
1253 	}
1254 	switch_safe_free(iter);
1255 	switch_thread_rwlock_unlock(listener->session_rwlock);
1256 
1257 	if (found) {
1258 		return SWITCH_STATUS_FALSE;
1259 	}
1260 
1261 	ei_x_encode_tuple_header(rbuf, 2);
1262 	ei_x_encode_atom(rbuf, "error");
1263 	ei_x_encode_atom(rbuf, "notfound");
1264 
1265 	return SWITCH_STATUS_SUCCESS;
1266 }
1267 
1268 
1269 /* fake enough of the net_kernel module to be able to respond to net_adm:ping */
/* {'$gen_call', {<cpx@freecpx.128.0>, #Ref<254770.4.0>}, {is_auth, cpx@freecpx}} */
handle_net_kernel_msg(listener_t * listener,erlang_msg * msg,ei_x_buff * buf,ei_x_buff * rbuf)1271 static switch_status_t handle_net_kernel_msg(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buff * rbuf)
1272 {
1273 	int version, size, type, arity;
1274 	char atom[MAXATOMLEN];
1275 	erlang_ref ref;
1276 	erlang_pid pid;
1277 
1278 	buf->index = 0;
1279 	ei_decode_version(buf->buff, &buf->index, &version);
1280 	ei_get_type(buf->buff, &buf->index, &type, &size);
1281 
1282 	if (type != ERL_SMALL_TUPLE_EXT && type != ERL_SMALL_TUPLE_EXT) {
1283 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "not a tuple\n");
1284 		return SWITCH_STATUS_FALSE;
1285 	}
1286 
1287 	ei_decode_tuple_header(buf->buff, &buf->index, &arity);
1288 
1289 	if (arity != 3) {
1290 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "wrong arity\n");
1291 		return SWITCH_STATUS_FALSE;
1292 	}
1293 
1294 	if (ei_decode_atom(buf->buff, &buf->index, atom) || strncmp(atom, "$gen_call", MAXATOMLEN)) {
1295 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "not gen_call\n");
1296 		return SWITCH_STATUS_FALSE;
1297 	}
1298 
1299 	ei_get_type(buf->buff, &buf->index, &type, &size);
1300 
1301 	if (type != ERL_SMALL_TUPLE_EXT && type != ERL_SMALL_TUPLE_EXT) {
1302 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "not a tuple\n");
1303 		return SWITCH_STATUS_FALSE;
1304 	}
1305 
1306 	ei_decode_tuple_header(buf->buff, &buf->index, &arity);
1307 
1308 	if (arity != 2) {
1309 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "wrong arity\n");
1310 		return SWITCH_STATUS_FALSE;
1311 	}
1312 
1313 	if (ei_decode_pid(buf->buff, &buf->index, &pid) || ei_decode_ref(buf->buff, &buf->index, &ref)) {
1314 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "decoding pid and ref error\n");
1315 		return SWITCH_STATUS_FALSE;
1316 	}
1317 
1318 	ei_get_type(buf->buff, &buf->index, &type, &size);
1319 
1320 	if (type != ERL_SMALL_TUPLE_EXT && type != ERL_SMALL_TUPLE_EXT) {
1321 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "not a tuple\n");
1322 		return SWITCH_STATUS_FALSE;
1323 	}
1324 
1325 	ei_decode_tuple_header(buf->buff, &buf->index, &arity);
1326 
1327 	if (arity != 2) {
1328 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "bad arity\n");
1329 		return SWITCH_STATUS_FALSE;
1330 	}
1331 
1332 	if (ei_decode_atom(buf->buff, &buf->index, atom) || strncmp(atom, "is_auth", MAXATOMLEN)) {
1333 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "not is_auth\n");
1334 		return SWITCH_STATUS_FALSE;
1335 	}
1336 
1337 	/* To ! {Tag, Reply} */
1338 	ei_x_encode_tuple_header(rbuf, 2);
1339 	ei_x_encode_ref(rbuf, &ref);
1340 	ei_x_encode_atom(rbuf, "yes");
1341 
1342 	switch_mutex_lock(listener->sock_mutex);
1343 	ei_send(listener->sockdes, &pid, rbuf->buff, rbuf->index);
1344 	switch_mutex_unlock(listener->sock_mutex);
1345 #ifdef EI_DEBUG
1346 	ei_x_print_msg(rbuf, &pid, 1);
1347 #endif
1348 
1349 	return SWITCH_STATUS_FALSE;
1350 }
1351 
1352 
/*
 * Entry point for a message received from an Erlang node.
 *
 * Registered sends addressed to "net_kernel" go to handle_net_kernel_msg.
 * Everything else is decoded from the start of buf and dispatched on its
 * outer type: tuples go to handle_msg_tuple (first element is an atom) or
 * handle_ref_tuple (first element is a reference), bare atoms go to
 * handle_msg_atom, and any other term is answered with {error, undef}.
 *
 * If the handler did not return SWITCH_STATUS_FALSE and rbuf holds a reply,
 * the reply is sent to msg->from.
 *
 * Returns 0 when nothing was sent or the handler returned
 * SWITCH_STATUS_SUCCESS, and 1 when a reply was sent and the handler
 * returned any other status.
 */
int handle_msg(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buff * rbuf)
{
	switch_status_t status = SWITCH_STATUS_SUCCESS;
	int outer_type, inner_type, size, version, arity, peek;

	if (msg->msgtype == ERL_REG_SEND && !strncmp(msg->toname, "net_kernel", MAXATOMLEN)) {
		/* try to respond to ping stuff */
		status = handle_net_kernel_msg(listener, msg, buf, rbuf);
	} else {
		buf->index = 0;
		ei_decode_version(buf->buff, &buf->index, &version);
		ei_get_type(buf->buff, &buf->index, &outer_type, &size);

		if (outer_type == ERL_SMALL_TUPLE_EXT || outer_type == ERL_LARGE_TUPLE_EXT) {
			/* look at the first element on a scratch index so the handlers
			   still see the tuple header */
			peek = buf->index;
			ei_decode_tuple_header(buf->buff, &peek, &arity);
			ei_get_type(buf->buff, &peek, &inner_type, &size);

			if (inner_type == ERL_ATOM_EXT) {
				status = handle_msg_tuple(listener, msg, buf, rbuf);
			} else if (inner_type == ERL_REFERENCE_EXT || inner_type == ERL_NEW_REFERENCE_EXT) {
				status = handle_ref_tuple(listener, msg, buf, rbuf);
			} else {
				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "WEEEEEEEE %d %d\n", outer_type, inner_type);
				/* some other kind of erlang term */
				ei_x_encode_tuple_header(rbuf, 2);
				ei_x_encode_atom(rbuf, "error");
				ei_x_encode_atom(rbuf, "undef");
			}
		} else if (outer_type == ERL_ATOM_EXT) {
			status = handle_msg_atom(listener, msg, buf, rbuf);
		} else {
			/* some other kind of erlang term */
			ei_x_encode_tuple_header(rbuf, 2);
			ei_x_encode_atom(rbuf, "error");
			ei_x_encode_atom(rbuf, "undef");
		}
	}

	/* the handler asked for no reply */
	if (status == SWITCH_STATUS_FALSE) {
		return 0;
	}

	/* nothing was encoded into the reply buffer */
	if (rbuf->index <= 1) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Empty reply, supressing\n");
		return 0;
	}

	switch_mutex_lock(listener->sock_mutex);
	ei_send(listener->sockdes, &msg->from, rbuf->buff, rbuf->index);
	switch_mutex_unlock(listener->sock_mutex);
#ifdef EI_DEBUG
	ei_x_print_msg(rbuf, &msg->from, 1);
#endif

	return status != SWITCH_STATUS_SUCCESS;
}
1421 
1422 /* For Emacs:
1423  * Local Variables:
1424  * mode:c
1425  * indent-tabs-mode:t
1426  * tab-width:4
1427  * c-basic-offset:4
1428  * End:
1429  * For VIM:
1430  * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
1431  */
1432