1 /*
2 * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3 * Copyright (C) 2005-2014, Anthony Minessale II <anthm@freeswitch.org>
4 *
5 * Version: MPL 1.1
6 *
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
11 *
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
15 * License.
16 *
17 * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18 *
19 * The Initial Developer of the Original Code is
20 * Anthony Minessale II <anthm@freeswitch.org>
21 * Portions created by the Initial Developer are Copyright (C)
22 * the Initial Developer. All Rights Reserved.
23 *
24 * Contributor(s):
25 *
26 * Anthony Minessale II <anthm@freeswitch.org>
27 * Andrew Thompson <andrew@hijacked.us>
28 * Rob Charlton <rob.charlton@savageminds.com>
29 * Darren Schreiber <d@d-man.org>
30 * Mike Jerris <mike@jerris.com>
31 * Tamas Cseke <tamas.cseke@virtual-call-center.eu>
32 *
33 *
34 * handle_msg.c -- handle messages received from erlang nodes
35 *
36 */
37 #include <switch.h>
38 #include <ei.h>
39 #include "mod_erlang_event.h"
40
/* Dummy non-NULL value stored against each key inserted into the event hashes in this file. */
41 static char *MARKER = "1";
42
/* Prototype for handle_ref_tuple so it can be referenced ahead of its definition. */
43 static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buff * rbuf);
44
api_exec(switch_thread_t * thread,void * obj)45 static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
46 {
47 switch_bool_t r = SWITCH_TRUE;
48 struct api_command_struct *acs = (struct api_command_struct *) obj;
49 switch_stream_handle_t stream = { 0 };
50 char *reply, *freply = NULL;
51 switch_status_t status;
52
53 if (!acs) {
54 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Internal error.\n");
55 return NULL;
56 }
57
58 if (!acs->listener || !acs->listener->rwlock || switch_thread_rwlock_tryrdlock(acs->listener->rwlock) != SWITCH_STATUS_SUCCESS) {
59 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error! cannot get read lock.\n");
60 goto done;
61 }
62
63 SWITCH_STANDARD_STREAM(stream);
64
65 if ((status = switch_api_execute(acs->api_cmd, acs->arg, NULL, &stream)) == SWITCH_STATUS_SUCCESS) {
66 reply = stream.data;
67 } else {
68 freply = switch_mprintf("%s: Command not found!\n", acs->api_cmd);
69 reply = freply;
70 r = SWITCH_FALSE;
71 }
72
73 if (!reply) {
74 reply = "Command returned no output!";
75 r = SWITCH_FALSE;
76 }
77
78 if (*reply == '-')
79 r = SWITCH_FALSE;
80
81 if (acs->bg) {
82 switch_event_t *event;
83
84 if (switch_event_create(&event, SWITCH_EVENT_BACKGROUND_JOB) == SWITCH_STATUS_SUCCESS) {
85 ei_x_buff ebuf;
86
87 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-UUID", acs->uuid_str);
88 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command", acs->api_cmd);
89
90 ei_x_new_with_version(&ebuf);
91
92 if (acs->arg) {
93 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command-Arg", acs->arg);
94 }
95
96 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Successful", r ? "true" : "false");
97 switch_event_add_body(event, "%s", reply);
98
99 switch_event_fire(&event);
100
101 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending bgapi reply to %s\n", acs->pid.node);
102
103 ei_x_encode_tuple_header(&ebuf, 3);
104
105 if (r)
106 ei_x_encode_atom(&ebuf, "bgok");
107 else
108 ei_x_encode_atom(&ebuf, "bgerror");
109
110 _ei_x_encode_string(&ebuf, acs->uuid_str);
111 _ei_x_encode_string(&ebuf, reply);
112
113 switch_mutex_lock(acs->listener->sock_mutex);
114 ei_send(acs->listener->sockdes, &acs->pid, ebuf.buff, ebuf.index);
115 switch_mutex_unlock(acs->listener->sock_mutex);
116 #ifdef EI_DEBUG
117 ei_x_print_msg(&ebuf, &acs->pid, 1);
118 #endif
119
120 ei_x_free(&ebuf);
121 }
122 } else {
123 ei_x_buff rbuf;
124 ei_x_new_with_version(&rbuf);
125 ei_x_encode_tuple_header(&rbuf, 2);
126
127 if (!strlen(reply)) {
128 reply = "Command returned no output!";
129 r = SWITCH_FALSE;
130 }
131
132 if (r) {
133 ei_x_encode_atom(&rbuf, "ok");
134 } else {
135 ei_x_encode_atom(&rbuf, "error");
136 }
137
138 _ei_x_encode_string(&rbuf, reply);
139
140
141 switch_mutex_lock(acs->listener->sock_mutex);
142 ei_send(acs->listener->sockdes, &acs->pid, rbuf.buff, rbuf.index);
143 switch_mutex_unlock(acs->listener->sock_mutex);
144 #ifdef EI_DEBUG
145 ei_x_print_msg(&rbuf, &acs->pid, 1);
146 #endif
147
148 ei_x_free(&rbuf);
149 }
150
151 switch_safe_free(stream.data);
152 switch_safe_free(freply);
153
154 if (acs->listener->rwlock) {
155 switch_thread_rwlock_unlock(acs->listener->rwlock);
156 }
157
158 done:
159 if (acs->bg) {
160 switch_memory_pool_t *pool = acs->pool;
161 acs = NULL;
162 switch_core_destroy_memory_pool(&pool);
163 pool = NULL;
164 }
165 return NULL;
166
167 }
168
handle_msg_fetch_reply(listener_t * listener,ei_x_buff * buf,ei_x_buff * rbuf)169 static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff * buf, ei_x_buff * rbuf)
170 {
171 char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
172 fetch_reply_t *p;
173
174 if (ei_decode_string_or_binary(buf->buff, &buf->index, SWITCH_UUID_FORMATTED_LENGTH, uuid_str)) {
175 ei_x_encode_tuple_header(rbuf, 2);
176 ei_x_encode_atom(rbuf, "error");
177 ei_x_encode_atom(rbuf, "badarg");
178 } else {
179
180 /* reply mutex is locked */
181 if ((p = find_fetch_reply(uuid_str))) {
182 switch (p->state) {
183 case reply_waiting:
184 {
185 /* clone the reply so it doesn't get destroyed on us */
186 ei_x_buff *nbuf = malloc(sizeof(*nbuf));
187 nbuf->buff = malloc(buf->buffsz);
188 memcpy(nbuf->buff, buf->buff, buf->buffsz);
189 nbuf->index = buf->index;
190 nbuf->buffsz = buf->buffsz;
191
192 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got reply for %s\n", uuid_str);
193
194 /* copy info into the reply struct */
195 p->state = reply_found;
196 p->reply = nbuf;
197 strncpy(p->winner, listener->peer_nodename, MAXNODELEN);
198
199 /* signal waiting thread that its time to wake up */
200 switch_thread_cond_signal(p->ready_or_found);
201 /* reply OK */
202 ei_x_encode_tuple_header(rbuf, 2);
203 ei_x_encode_atom(rbuf, "ok");
204 _ei_x_encode_string(rbuf, uuid_str);
205 break;
206 };
207 case reply_found:
208 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for already complete request %s\n", uuid_str);
209 ei_x_encode_tuple_header(rbuf, 3);
210 ei_x_encode_atom(rbuf, "error");
211 _ei_x_encode_string(rbuf, uuid_str);
212 ei_x_encode_atom(rbuf, "duplicate_response");
213 break;
214 case reply_timeout:
215 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reply for timed out request %s\n", uuid_str);
216 ei_x_encode_tuple_header(rbuf, 3);
217 ei_x_encode_atom(rbuf, "error");
218 _ei_x_encode_string(rbuf, uuid_str);
219 ei_x_encode_atom(rbuf, "timeout");
220 break;
221 case reply_not_ready:
222 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Request %s is not ready?!\n", uuid_str);
223 ei_x_encode_tuple_header(rbuf, 3);
224 ei_x_encode_atom(rbuf, "error");
225 _ei_x_encode_string(rbuf, uuid_str);
226 ei_x_encode_atom(rbuf, "not_ready");
227 break;
228 }
229
230 switch_mutex_unlock(p->mutex);
231 } else {
232 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Could not find request for reply %s\n", uuid_str);
233 ei_x_encode_tuple_header(rbuf, 2);
234 ei_x_encode_atom(rbuf, "error");
235 ei_x_encode_atom(rbuf, "invalid_uuid");
236 }
237 }
238
239 return SWITCH_STATUS_SUCCESS;
240 }
241
handle_msg_set_log_level(listener_t * listener,int arity,ei_x_buff * buf,ei_x_buff * rbuf)242 static switch_status_t handle_msg_set_log_level(listener_t *listener, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
243 {
244 switch_log_level_t ltype = SWITCH_LOG_DEBUG;
245 char loglevelstr[MAXATOMLEN];
246 if (arity != 2 || ei_decode_atom(buf->buff, &buf->index, loglevelstr)) {
247 ei_x_encode_tuple_header(rbuf, 2);
248 ei_x_encode_atom(rbuf, "error");
249 ei_x_encode_atom(rbuf, "badarg");
250 } else {
251 ltype = switch_log_str2level(loglevelstr);
252
253 if (ltype && ltype != SWITCH_LOG_INVALID) {
254 listener->level = ltype;
255 ei_x_encode_atom(rbuf, "ok");
256 } else {
257 ei_x_encode_tuple_header(rbuf, 2);
258 ei_x_encode_atom(rbuf, "error");
259 ei_x_encode_atom(rbuf, "badarg");
260 }
261 }
262 return SWITCH_STATUS_SUCCESS;
263 }
264
handle_msg_event(listener_t * listener,int arity,ei_x_buff * buf,ei_x_buff * rbuf)265 static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
266 {
267 char atom[MAXATOMLEN];
268
269 if (arity == 1) {
270 ei_x_encode_tuple_header(rbuf, 2);
271 ei_x_encode_atom(rbuf, "error");
272 ei_x_encode_atom(rbuf, "badarg");
273 } else {
274 int custom = 0;
275 switch_event_types_t type;
276 int i = 0;
277
278 if (!switch_test_flag(listener, LFLAG_EVENTS)) {
279 switch_set_flag_locked(listener, LFLAG_EVENTS);
280 }
281
282 switch_thread_rwlock_wrlock(listener->event_rwlock);
283
284 for (i = 1; i < arity; i++) {
285 if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
286
287 if (custom) {
288 switch_core_hash_insert(listener->event_hash, atom, MARKER);
289 } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
290 if (type == SWITCH_EVENT_ALL) {
291 uint32_t x = 0;
292
293 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "ALL events enabled\n");
294 for (x = 0; x < SWITCH_EVENT_ALL; x++) {
295 listener->event_list[x] = 1;
296 }
297 }
298 if (type <= SWITCH_EVENT_ALL) {
299 listener->event_list[type] = 1;
300 }
301 if (type == SWITCH_EVENT_CUSTOM) {
302 custom++;
303 }
304
305 }
306 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom);
307 }
308 }
309 switch_thread_rwlock_unlock(listener->event_rwlock);
310
311 ei_x_encode_atom(rbuf, "ok");
312 }
313 return SWITCH_STATUS_SUCCESS;
314 }
315
handle_msg_filter(listener_t * listener,int arity,ei_x_buff * buf,ei_x_buff * rbuf)316 static switch_status_t handle_msg_filter(listener_t *listener, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
317 {
318 char atom[MAXATOMLEN];
319 char reply[MAXATOMLEN]= "";
320 char *header_name = NULL;
321 char *header_val = NULL;
322
323 if (arity == 1) {
324 ei_x_encode_tuple_header(rbuf, 2);
325 ei_x_encode_atom(rbuf, "error");
326 ei_x_encode_atom(rbuf, "badarg");
327 } else {
328 int i = 0;
329
330 ei_x_encode_tuple_header(rbuf, 2);
331 ei_x_encode_atom(rbuf, "filter_command_processing_log");
332 ei_x_encode_list_header(rbuf, arity - 1);
333
334 switch_thread_rwlock_wrlock(listener->event_rwlock);
335
336 switch_mutex_lock(listener->filter_mutex);
337 if (!listener->filters) {
338 switch_event_create_plain(&listener->filters, SWITCH_EVENT_CLONE);
339 switch_clear_flag(listener->filters, EF_UNIQ_HEADERS);
340 }
341
342 for (i = 1; i < arity; i++) {
343 if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
344 header_name=atom;
345
346 while (header_name && *header_name && *header_name == ' ')
347 header_name++;
348
349 if ((header_val = strchr(atom, ' '))) {
350 *header_val++ = '\0';
351 }
352
353 if (!strcasecmp(header_name, "delete") && header_val) {
354 header_name = header_val;
355 if ((header_val = strchr(header_name, ' '))) {
356 *header_val++ = '\0';
357 }
358 if (!strcasecmp(header_name, "all")) {
359 switch_event_destroy(&listener->filters);
360 switch_event_create_plain(&listener->filters, SWITCH_EVENT_CLONE);
361 } else {
362 switch_event_del_header_val(listener->filters, header_name, header_val);
363 }
364 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "+OK filter deleted. [%s]=[%s]", header_name, switch_str_nil(header_val));
365 ei_x_encode_tuple_header(rbuf, 3);
366 _ei_x_encode_string(rbuf, "deleted");
367 _ei_x_encode_string(rbuf, header_name);
368 _ei_x_encode_string(rbuf, switch_str_nil(header_val));
369 } else if (header_val) {
370 if (!strcasecmp(header_name, "add")) {
371 header_name = header_val;
372 if ((header_val = strchr(header_name, ' '))) {
373 *header_val++ = '\0';
374 }
375 }
376 switch_event_add_header_string(listener->filters, SWITCH_STACK_BOTTOM, header_name, header_val);
377 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "+OK filter added. [%s]=[%s]", header_name, header_val);
378 ei_x_encode_tuple_header(rbuf, 3);
379 _ei_x_encode_string(rbuf, "added");
380 _ei_x_encode_string(rbuf, header_name);
381 _ei_x_encode_string(rbuf, header_val);
382 } else {
383 switch_snprintf(reply, MAXATOMLEN, "-ERR invalid syntax");
384 ei_x_encode_atom(rbuf, "-ERR invalid syntax");
385 }
386 }
387 }
388
389 switch_mutex_unlock(listener->filter_mutex);
390 switch_thread_rwlock_unlock(listener->event_rwlock);
391
392 ei_x_encode_empty_list(rbuf);
393 }
394 return SWITCH_STATUS_SUCCESS;
395 }
396
handle_msg_session_event(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)397 static switch_status_t handle_msg_session_event(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
398 {
399 char atom[MAXATOMLEN];
400
401 if (arity == 1) {
402 ei_x_encode_tuple_header(rbuf, 2);
403 ei_x_encode_atom(rbuf, "error");
404 ei_x_encode_atom(rbuf, "badarg");
405 } else {
406 session_elem_t *session;
407 if ((session = find_session_elem_by_pid(listener, &msg->from))) {
408
409 int custom = 0;
410 switch_event_types_t type;
411 int i = 0;
412
413 switch_thread_rwlock_wrlock(session->event_rwlock);
414
415 for (i = 1; i < arity; i++) {
416 if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
417
418 if (custom) {
419 switch_core_hash_insert(session->event_hash, atom, MARKER);
420 } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
421 if (type == SWITCH_EVENT_ALL) {
422 uint32_t x = 0;
423
424 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "ALL events enabled for %s\n", session->uuid_str);
425 for (x = 0; x < SWITCH_EVENT_ALL; x++) {
426 session->event_list[x] = 1;
427 }
428 }
429 if (type <= SWITCH_EVENT_ALL) {
430 session->event_list[type] = 1;
431 }
432 if (type == SWITCH_EVENT_CUSTOM) {
433 custom++;
434 }
435
436 }
437 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s for session %s\n", atom, session->uuid_str);
438 }
439 }
440
441 switch_thread_rwlock_unlock(session->event_rwlock);
442 switch_thread_rwlock_unlock(session->rwlock);
443
444 ei_x_encode_atom(rbuf, "ok");
445 } else {
446 ei_x_encode_tuple_header(rbuf, 2);
447 ei_x_encode_atom(rbuf, "error");
448 ei_x_encode_atom(rbuf, "notlistening");
449 }
450 }
451 return SWITCH_STATUS_SUCCESS;
452 }
453
handle_msg_nixevent(listener_t * listener,int arity,ei_x_buff * buf,ei_x_buff * rbuf)454 static switch_status_t handle_msg_nixevent(listener_t *listener, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
455 {
456 char atom[MAXATOMLEN];
457
458 if (arity == 1) {
459 ei_x_encode_tuple_header(rbuf, 2);
460 ei_x_encode_atom(rbuf, "error");
461 ei_x_encode_atom(rbuf, "badarg");
462 } else {
463 int custom = 0;
464 int i = 0;
465 switch_event_types_t type;
466
467 switch_thread_rwlock_wrlock(listener->event_rwlock);
468
469 for (i = 1; i < arity; i++) {
470 if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
471
472 if (custom) {
473
474 switch_core_hash_delete(listener->event_hash, atom);
475 } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
476 uint32_t x = 0;
477
478 if (type == SWITCH_EVENT_CUSTOM) {
479 custom++;
480 } else if (type == SWITCH_EVENT_ALL) {
481 for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
482 listener->event_list[x] = 0;
483 }
484 } else {
485 if (listener->event_list[SWITCH_EVENT_ALL]) {
486 listener->event_list[SWITCH_EVENT_ALL] = 0;
487 for (x = 0; x < SWITCH_EVENT_ALL; x++) {
488 listener->event_list[x] = 1;
489 }
490 }
491 listener->event_list[type] = 0;
492 }
493 }
494 }
495 }
496
497 switch_thread_rwlock_unlock(listener->event_rwlock);
498 ei_x_encode_atom(rbuf, "ok");
499 }
500 return SWITCH_STATUS_SUCCESS;
501 }
502
handle_msg_session_nixevent(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)503 static switch_status_t handle_msg_session_nixevent(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
504 {
505 char atom[MAXATOMLEN];
506
507 if (arity == 1) {
508 ei_x_encode_tuple_header(rbuf, 2);
509 ei_x_encode_atom(rbuf, "error");
510 ei_x_encode_atom(rbuf, "badarg");
511 } else {
512 session_elem_t *session;
513 if ((session = find_session_elem_by_pid(listener, &msg->from))) {
514 int custom = 0;
515 int i = 0;
516 switch_event_types_t type;
517
518 switch_thread_rwlock_wrlock(session->event_rwlock);
519
520 for (i = 1; i < arity; i++) {
521 if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
522
523 if (custom) {
524 switch_core_hash_delete(session->event_hash, atom);
525 } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
526 uint32_t x = 0;
527
528 if (type == SWITCH_EVENT_CUSTOM) {
529 custom++;
530 } else if (type == SWITCH_EVENT_ALL) {
531 for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
532 session->event_list[x] = 0;
533 }
534 } else {
535 if (session->event_list[SWITCH_EVENT_ALL]) {
536 session->event_list[SWITCH_EVENT_ALL] = 0;
537 for (x = 0; x < SWITCH_EVENT_ALL; x++) {
538 session->event_list[x] = 1;
539 }
540 }
541 session->event_list[type] = 0;
542 }
543 }
544 }
545 }
546 switch_thread_rwlock_unlock(session->event_rwlock);
547 switch_thread_rwlock_unlock(session->rwlock);
548
549 ei_x_encode_atom(rbuf, "ok");
550 } else { /* no session for this pid */
551 ei_x_encode_tuple_header(rbuf, 2);
552 ei_x_encode_atom(rbuf, "error");
553 ei_x_encode_atom(rbuf, "notlistening");
554 }
555 }
556 return SWITCH_STATUS_SUCCESS;
557 }
558
559 // Nix's all events, then sets up a listener for the given ones.
560 // meant to ensure that no events are missed during this common operation.
handle_msg_setevent(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)561 static switch_status_t handle_msg_setevent(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
562 {
563 char atom[MAXATOMLEN];
564
565 if(arity == 1) {
566 ei_x_encode_tuple_header(rbuf, 2);
567 ei_x_encode_atom(rbuf, "error");
568 ei_x_encode_atom(rbuf, "badarg");
569 } else {
570 uint8_t event_list[SWITCH_EVENT_ALL + 1];
571 switch_hash_t *event_hash;
572 uint32_t x = 0;
573 int custom = 0;
574 switch_event_types_t type;
575 int i = 0;
576
577 /* clear any previous event registrations */
578 for(x = 0; x <= SWITCH_EVENT_ALL; x++) {
579 event_list[x] = 0;
580 }
581
582 /* create new hash */
583 switch_core_hash_init(&event_hash);
584
585 if(!switch_test_flag(listener, LFLAG_EVENTS)) {
586 switch_set_flag_locked(listener, LFLAG_EVENTS);
587 }
588
589 for(i = 1; i < arity; i++){
590 if(!ei_decode_atom(buf->buff, &buf->index, atom)){
591
592 if(custom){
593 switch_core_hash_insert(event_hash, atom, MARKER);
594 } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
595 if (type == SWITCH_EVENT_ALL) {
596 ei_x_encode_tuple_header(rbuf, 2);
597 ei_x_encode_atom(rbuf, "error");
598 ei_x_encode_atom(rbuf, "badarg");
599 break;
600 }
601 if (type <= SWITCH_EVENT_ALL) {
602 event_list[type] = 1;
603 }
604 if (type == SWITCH_EVENT_CUSTOM) {
605 custom++;
606 }
607 }
608 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom);
609 }
610 }
611 /* update the event subscriptions with the new ones */
612 switch_thread_rwlock_wrlock(listener->event_rwlock);
613 memcpy(listener->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1));
614 switch_core_hash_destroy(&listener->event_hash);
615 listener->event_hash = event_hash;
616 switch_thread_rwlock_unlock(listener->event_rwlock);
617
618 /* TODO - we should flush any non-matching events from the queue */
619 ei_x_encode_atom(rbuf, "ok");
620 }
621 return SWITCH_STATUS_SUCCESS;
622 }
623
handle_msg_session_setevent(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)624 static switch_status_t handle_msg_session_setevent(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
625 {
626 char atom[MAXATOMLEN];
627
628 if (arity == 1){
629 ei_x_encode_tuple_header(rbuf, 2);
630 ei_x_encode_atom(rbuf, "error");
631 ei_x_encode_atom(rbuf, "badarg");
632 } else {
633 session_elem_t *session;
634 if ((session = find_session_elem_by_pid(listener, &msg->from))) {
635 uint8_t event_list[SWITCH_EVENT_ALL + 1];
636 switch_hash_t *event_hash;
637 int custom = 0;
638 int i = 0;
639 switch_event_types_t type;
640 uint32_t x = 0;
641
642 /* clear any previous event registrations */
643 for (x = 0; x <= SWITCH_EVENT_ALL; x++){
644 event_list[x] = 0;
645 }
646
647 /* create new hash */
648 switch_core_hash_init(&event_hash);
649
650 for (i = 1; i < arity; i++){
651 if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
652 if (custom) {
653 switch_core_hash_insert(event_hash, atom, MARKER);
654 } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
655 if (type == SWITCH_EVENT_ALL) {
656 ei_x_encode_tuple_header(rbuf, 1);
657 ei_x_encode_atom(rbuf, "error");
658 ei_x_encode_atom(rbuf, "badarg");
659 break;
660 }
661 if (type <= SWITCH_EVENT_ALL) {
662 event_list[type] = 1;
663 }
664 if (type == SWITCH_EVENT_CUSTOM) {
665 custom++;
666 }
667 }
668 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s for session %s\n", atom, session->uuid_str);
669 }
670 }
671
672 /* update the event subscriptions with the new ones */
673 switch_thread_rwlock_wrlock(session->event_rwlock);
674 memcpy(session->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1));
675 /* wipe the old hash, and point the pointer at the new one */
676 switch_core_hash_destroy(&session->event_hash);
677 session->event_hash = event_hash;
678 switch_thread_rwlock_unlock(session->event_rwlock);
679
680 switch_thread_rwlock_unlock(session->rwlock);
681
682 /* TODO - we should flush any non-matching events from the queue */
683 ei_x_encode_atom(rbuf, "ok");
684 } else { /* no session for this pid */
685 ei_x_encode_tuple_header(rbuf, 2);
686 ei_x_encode_atom(rbuf, "error");
687 ei_x_encode_atom(rbuf, "notlistening");
688 }
689 }
690 return SWITCH_STATUS_SUCCESS;
691 }
692
handle_msg_api(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)693 static switch_status_t handle_msg_api(listener_t *listener, erlang_msg * msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
694 {
695 char api_cmd[MAXATOMLEN];
696 int type;
697 int size;
698 char *arg;
699 switch_bool_t fail = SWITCH_FALSE;
700
701 if (arity < 3) {
702 fail = SWITCH_TRUE;
703 }
704
705 ei_get_type(buf->buff, &buf->index, &type, &size);
706
707 if ((size > (sizeof(api_cmd) - 1)) || ei_decode_atom(buf->buff, &buf->index, api_cmd)) {
708 fail = SWITCH_TRUE;
709 }
710
711 ei_get_type(buf->buff, &buf->index, &type, &size);
712 arg = malloc(size + 1);
713
714 if (ei_decode_string_or_binary(buf->buff, &buf->index, size, arg)) {
715 fail = SWITCH_TRUE;
716 }
717
718 if (!fail) {
719 struct api_command_struct acs = { 0 };
720 acs.listener = listener;
721 acs.api_cmd = api_cmd;
722 acs.arg = arg;
723 acs.bg = 0;
724 acs.pid = msg->from;
725 api_exec(NULL, (void *) &acs);
726
727 switch_safe_free(arg);
728
729 /* don't reply */
730 return SWITCH_STATUS_FALSE;
731 } else {
732 ei_x_encode_tuple_header(rbuf, 2);
733 ei_x_encode_atom(rbuf, "error");
734 ei_x_encode_atom(rbuf, "badarg");
735 return SWITCH_STATUS_SUCCESS;
736
737 }
738 }
739
handle_msg_bgapi(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)740 static switch_status_t handle_msg_bgapi(listener_t *listener, erlang_msg * msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
741 {
742 char api_cmd[MAXATOMLEN];
743 char *arg = NULL;
744 int size, type;
745
746 if (arity < 3 ||
747 ei_decode_atom(buf->buff, &buf->index, api_cmd) ||
748 ei_get_type(buf->buff, &buf->index, &type, &size) ||
749 !(arg = malloc(size + 1)) ||
750 ei_decode_string_or_binary(buf->buff, &buf->index, size, arg)) {
751
752 ei_x_encode_tuple_header(rbuf, 2);
753 ei_x_encode_atom(rbuf, "error");
754 ei_x_encode_atom(rbuf, "badarg");
755 } else {
756 struct api_command_struct *acs = NULL;
757 switch_memory_pool_t *pool;
758 switch_thread_t *thread;
759 switch_threadattr_t *thd_attr = NULL;
760 switch_uuid_t uuid;
761
762 switch_core_new_memory_pool(&pool);
763 acs = switch_core_alloc(pool, sizeof(*acs));
764 switch_assert(acs);
765 acs->pool = pool;
766 acs->listener = listener;
767 acs->api_cmd = switch_core_strdup(acs->pool, api_cmd);
768 acs->arg = switch_core_strdup(acs->pool, arg);
769 acs->bg = 1;
770 acs->pid = msg->from;
771
772 switch_threadattr_create(&thd_attr, acs->pool);
773 switch_threadattr_detach_set(thd_attr, 1);
774 switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
775
776 switch_uuid_get(&uuid);
777 switch_uuid_format(acs->uuid_str, &uuid);
778 switch_thread_create(&thread, thd_attr, api_exec, acs, acs->pool);
779
780 ei_x_encode_tuple_header(rbuf, 2);
781 ei_x_encode_atom(rbuf, "ok");
782 _ei_x_encode_string(rbuf, acs->uuid_str);
783 }
784
785 switch_safe_free(arg);
786
787 return SWITCH_STATUS_SUCCESS;
788 }
789
handle_msg_sendevent(listener_t * listener,int arity,ei_x_buff * buf,ei_x_buff * rbuf)790 static switch_status_t handle_msg_sendevent(listener_t *listener, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
791 {
792 char ename[MAXATOMLEN + 1];
793 char esname[MAXATOMLEN + 1];
794 int headerlength;
795
796 memset(esname, 0, MAXATOMLEN);
797
798 if (ei_decode_atom(buf->buff, &buf->index, ename) ||
799 (!strncasecmp(ename, "CUSTOM", MAXATOMLEN) &&
800 ei_decode_atom(buf->buff, &buf->index, esname)) || ei_decode_list_header(buf->buff, &buf->index, &headerlength)) {
801 ei_x_encode_tuple_header(rbuf, 2);
802 ei_x_encode_atom(rbuf, "error");
803 ei_x_encode_atom(rbuf, "badarg");
804 } else {
805 switch_event_types_t etype;
806 if (switch_name_event(ename, &etype) == SWITCH_STATUS_SUCCESS) {
807 switch_event_t *event;
808 if ((strlen(esname) && switch_event_create_subclass(&event, etype, esname) == SWITCH_STATUS_SUCCESS) ||
809 switch_event_create(&event, etype) == SWITCH_STATUS_SUCCESS) {
810 char key[1024];
811 char *value;
812 int type;
813 int size;
814 int i = 0;
815 switch_bool_t fail = SWITCH_FALSE;
816
817 while (!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) {
818 i++;
819
820 ei_get_type(buf->buff, &buf->index, &type, &size);
821
822 if ((size > (sizeof(key) - 1)) || ei_decode_string(buf->buff, &buf->index, key)) {
823 fail = SWITCH_TRUE;
824 break;
825 }
826
827 ei_get_type(buf->buff, &buf->index, &type, &size);
828 value = malloc(size + 1);
829
830 if (ei_decode_string(buf->buff, &buf->index, value)) {
831 fail = SWITCH_TRUE;
832 break;
833 }
834
835 if (!fail && !strcmp(key, "body")) {
836 switch_safe_free(event->body);
837 event->body = value;
838 } else if (!fail) {
839 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM | SWITCH_STACK_NODUP, key, value);
840 }
841
842 /* Do not free malloc here! The above commands utilize the raw allocated memory and skip any copying/duplication. Faster. */
843 }
844
845 if (headerlength != i || fail) {
846 ei_x_encode_tuple_header(rbuf, 2);
847 ei_x_encode_atom(rbuf, "error");
848 ei_x_encode_atom(rbuf, "badarg");
849 } else {
850 switch_event_fire(&event);
851 ei_x_encode_atom(rbuf, "ok");
852 }
853 }
854 /* If the event wasn't successfully fired, or failed for any other reason, then make sure not to leak it. */
855 if ( event ) {
856 switch_event_destroy(&event);
857 }
858 }
859 }
860 return SWITCH_STATUS_SUCCESS;
861 }
862
handle_msg_sendmsg(listener_t * listener,int arity,ei_x_buff * buf,ei_x_buff * rbuf)863 static switch_status_t handle_msg_sendmsg(listener_t *listener, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
864 {
865 char uuid[SWITCH_UUID_FORMATTED_LENGTH + 1];
866 int headerlength;
867
868 if (ei_decode_string_or_binary(buf->buff, &buf->index, SWITCH_UUID_FORMATTED_LENGTH, uuid) ||
869 ei_decode_list_header(buf->buff, &buf->index, &headerlength)) {
870 ei_x_encode_tuple_header(rbuf, 2);
871 ei_x_encode_atom(rbuf, "error");
872 ei_x_encode_atom(rbuf, "badarg");
873 } else {
874 switch_core_session_t *session;
875 if (!zstr_buf(uuid) && (session = switch_core_session_locate(uuid))) {
876 switch_event_t *event;
877 if (switch_event_create(&event, SWITCH_EVENT_SEND_MESSAGE) == SWITCH_STATUS_SUCCESS) {
878
879 char key[1024];
880 char *value;
881 int type;
882 int size;
883 int i = 0;
884 switch_bool_t fail = SWITCH_FALSE;
885
886 while (!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) {
887 i++;
888 ei_get_type(buf->buff, &buf->index, &type, &size);
889
890 if ((size > (sizeof(key) - 1)) || ei_decode_string(buf->buff, &buf->index, key)) {
891 fail = SWITCH_TRUE;
892 break;
893 }
894
895 ei_get_type(buf->buff, &buf->index, &type, &size);
896 value = malloc(size + 1);
897
898 if (ei_decode_string(buf->buff, &buf->index, value)) {
899 fail = SWITCH_TRUE;
900 break;
901 }
902
903 if (!fail) {
904 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM | SWITCH_STACK_NODUP, key, value);
905 }
906 }
907
908 if (headerlength != i || fail) {
909 ei_x_encode_tuple_header(rbuf, 2);
910 ei_x_encode_atom(rbuf, "error");
911 ei_x_encode_atom(rbuf, "badarg");
912 switch_event_destroy(&event);
913 } else {
914 if (switch_core_session_queue_private_event(session, &event, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS) {
915 ei_x_encode_atom(rbuf, "ok");
916 } else {
917 ei_x_encode_tuple_header(rbuf, 2);
918 ei_x_encode_atom(rbuf, "error");
919 ei_x_encode_atom(rbuf, "badmem");
920 switch_event_destroy(&event);
921 }
922
923 }
924 }
925 /* release the lock returned by session locate */
926 switch_core_session_rwunlock(session);
927
928 } else {
929 ei_x_encode_tuple_header(rbuf, 2);
930 ei_x_encode_atom(rbuf, "error");
931 ei_x_encode_atom(rbuf, "nosession");
932 }
933 }
934 return SWITCH_STATUS_SUCCESS;
935 }
936
handle_msg_bind(listener_t * listener,erlang_msg * msg,ei_x_buff * buf,ei_x_buff * rbuf)937 static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buff * rbuf)
938 {
939 /* format is (result|config|directory|dialplan|phrases) */
940 char sectionstr[MAXATOMLEN];
941 switch_xml_section_t section;
942
943 if (ei_decode_atom(buf->buff, &buf->index, sectionstr) || !(section = switch_xml_parse_section_string(sectionstr))) {
944 ei_x_encode_tuple_header(rbuf, 2);
945 ei_x_encode_atom(rbuf, "error");
946 ei_x_encode_atom(rbuf, "badarg");
947 } else {
948 struct erlang_binding *binding, *ptr;
949
950 if (!(binding = switch_core_alloc(listener->pool, sizeof(*binding)))) {
951 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
952 ei_x_encode_tuple_header(rbuf, 2);
953 ei_x_encode_atom(rbuf, "error");
954 ei_x_encode_atom(rbuf, "badmem");
955 } else {
956 binding->section = section;
957 binding->process.type = ERLANG_PID;
958 binding->process.pid = msg->from;
959 binding->listener = listener;
960
961 switch_thread_rwlock_wrlock(mod_erlang_event_globals.bindings_rwlock);
962
963 for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next);
964
965 if (ptr) {
966 ptr->next = binding;
967 } else {
968 bindings.head = binding;
969 }
970
971 switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section);
972 switch_thread_rwlock_unlock(mod_erlang_event_globals.bindings_rwlock);
973
974 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding));
975
976 ei_link(listener, ei_self(listener->ec), &msg->from);
977 ei_x_encode_atom(rbuf, "ok");
978 }
979 }
980 return SWITCH_STATUS_SUCCESS;
981 }
982
983 /* {handlecall,<uuid>,<handler process registered name>}
984 or
985 {handlecall,<uuid>} to send messages back to the sender
986 */
handle_msg_handlecall(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)987 static switch_status_t handle_msg_handlecall(listener_t *listener, erlang_msg * msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
988 {
989 char reg_name[MAXATOMLEN];
990 char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
991
992 if (arity < 2 || arity > 3 ||
993 (arity == 3 && ei_decode_atom(buf->buff, &buf->index, reg_name)) ||
994 ei_decode_string_or_binary(buf->buff, &buf->index, SWITCH_UUID_FORMATTED_LENGTH, uuid_str)) {
995 ei_x_encode_tuple_header(rbuf, 2);
996 ei_x_encode_atom(rbuf, "error");
997 ei_x_encode_atom(rbuf, "badarg");
998 } else {
999 switch_core_session_t *session;
1000 if (!zstr_buf(uuid_str)) {
1001 if ((session = switch_core_session_locate(uuid_str))) {
1002 /* create a new session list element and attach it to this listener */
1003 if ((arity == 2 && attach_call_to_pid(listener, &msg->from, session)) ||
1004 (arity == 3 && attach_call_to_registered_process(listener, reg_name, session))) {
1005 ei_x_encode_atom(rbuf, "ok");
1006 } else {
1007 ei_x_encode_tuple_header(rbuf, 2);
1008 ei_x_encode_atom(rbuf, "error");
1009 ei_x_encode_atom(rbuf, "session_attach_failed");
1010 }
1011 /* release the lock returned by session locate */
1012 switch_core_session_rwunlock(session);
1013 } else {
1014 ei_x_encode_tuple_header(rbuf, 2);
1015 ei_x_encode_atom(rbuf, "error");
1016 ei_x_encode_atom(rbuf, "badsession");
1017 }
1018 } else {
1019 ei_x_encode_tuple_header(rbuf, 2);
1020 ei_x_encode_atom(rbuf, "error");
1021 ei_x_encode_atom(rbuf, "baduuid");
1022 }
1023 }
1024 return SWITCH_STATUS_SUCCESS;
1025 }
1026
/* Catch the response to ei_rpc_to, which comes back as {rex, {Ref, Pid}}.
   The {Ref, Pid} part can be handled by handle_ref_tuple.
 */
handle_msg_rpcresponse(listener_t * listener,erlang_msg * msg,int arity,ei_x_buff * buf,ei_x_buff * rbuf)1030 static switch_status_t handle_msg_rpcresponse(listener_t *listener, erlang_msg * msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
1031 {
1032 int type, size, arity2, tmpindex;
1033
1034 ei_get_type(buf->buff, &buf->index, &type, &size);
1035 switch (type) {
1036 case ERL_SMALL_TUPLE_EXT:
1037 case ERL_LARGE_TUPLE_EXT:
1038 tmpindex = buf->index;
1039 ei_decode_tuple_header(buf->buff, &tmpindex, &arity2);
1040 return handle_ref_tuple(listener, msg, buf, rbuf);
1041 default:
1042 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unknown rpc response\n");
1043 break;
1044 }
1045 /* no reply */
1046 return SWITCH_STATUS_FALSE;
1047 }
1048
handle_msg_tuple(listener_t * listener,erlang_msg * msg,ei_x_buff * buf,ei_x_buff * rbuf)1049 static switch_status_t handle_msg_tuple(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buff * rbuf)
1050 {
1051 char tupletag[MAXATOMLEN];
1052 int arity;
1053 switch_status_t ret = SWITCH_STATUS_SUCCESS;
1054
1055 ei_decode_tuple_header(buf->buff, &buf->index, &arity);
1056 if (ei_decode_atom(buf->buff, &buf->index, tupletag)) {
1057 ei_x_encode_tuple_header(rbuf, 2);
1058 ei_x_encode_atom(rbuf, "error");
1059 ei_x_encode_atom(rbuf, "badarg");
1060 } else {
1061 if (!strncmp(tupletag, "fetch_reply", MAXATOMLEN)) {
1062 ret = handle_msg_fetch_reply(listener, buf, rbuf);
1063 } else if (!strncmp(tupletag, "set_log_level", MAXATOMLEN)) {
1064 ret = handle_msg_set_log_level(listener, arity, buf, rbuf);
1065 } else if (!strncmp(tupletag, "event", MAXATOMLEN)) {
1066 ret = handle_msg_event(listener, arity, buf, rbuf);
1067 } else if (!strncmp(tupletag, "filter", MAXATOMLEN)) {
1068 ret = handle_msg_filter(listener, arity, buf, rbuf);
1069 } else if (!strncmp(tupletag, "session_event", MAXATOMLEN)) {
1070 ret = handle_msg_session_event(listener, msg, arity, buf, rbuf);
1071 } else if (!strncmp(tupletag, "nixevent", MAXATOMLEN)) {
1072 ret = handle_msg_nixevent(listener, arity, buf, rbuf);
1073 } else if (!strncmp(tupletag, "session_nixevent", MAXATOMLEN)) {
1074 ret = handle_msg_session_nixevent(listener, msg, arity, buf, rbuf);
1075 } else if (!strncmp(tupletag, "api", MAXATOMLEN)) {
1076 ret = handle_msg_api(listener, msg, arity, buf, rbuf);
1077 } else if (!strncmp(tupletag, "bgapi", MAXATOMLEN)) {
1078 ret = handle_msg_bgapi(listener, msg, arity, buf, rbuf);
1079 } else if (!strncmp(tupletag, "sendevent", MAXATOMLEN)) {
1080 ret = handle_msg_sendevent(listener, arity, buf, rbuf);
1081 } else if (!strncmp(tupletag, "sendmsg", MAXATOMLEN)) {
1082 ret = handle_msg_sendmsg(listener, arity, buf, rbuf);
1083 } else if (!strncmp(tupletag, "bind", MAXATOMLEN)) {
1084 ret = handle_msg_bind(listener, msg, buf, rbuf);
1085 } else if (!strncmp(tupletag, "handlecall", MAXATOMLEN)) {
1086 ret = handle_msg_handlecall(listener, msg, arity, buf, rbuf);
1087 } else if (!strncmp(tupletag, "rex", MAXATOMLEN)) {
1088 ret = handle_msg_rpcresponse(listener, msg, arity, buf, rbuf);
1089 } else if (!strncmp(tupletag, "setevent", MAXATOMLEN)) {
1090 ret = handle_msg_setevent(listener, msg, arity, buf, rbuf);
1091 } else if (!strncmp(tupletag, "session_setevent", MAXATOMLEN)) {
1092 ret = handle_msg_session_setevent(listener, msg, arity, buf, rbuf);
1093 } else {
1094 ei_x_encode_tuple_header(rbuf, 2);
1095 ei_x_encode_atom(rbuf, "error");
1096 ei_x_encode_atom(rbuf, "undef");
1097 }
1098 }
1099 return ret;
1100 }
1101
handle_msg_atom(listener_t * listener,erlang_msg * msg,ei_x_buff * buf,ei_x_buff * rbuf)1102 static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buff * rbuf)
1103 {
1104 char atom[MAXATOMLEN];
1105 switch_status_t ret = SWITCH_STATUS_SUCCESS;
1106
1107 if (ei_decode_atom(buf->buff, &buf->index, atom)) {
1108 ei_x_encode_tuple_header(rbuf, 2);
1109 ei_x_encode_atom(rbuf, "error");
1110 ei_x_encode_atom(rbuf, "badarg");
1111 } else if (!strncmp(atom, "nolog", MAXATOMLEN)) {
1112 if (switch_test_flag(listener, LFLAG_LOG)) {
1113 void *pop;
1114 /*purge the log queue */
1115 while (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS);
1116 switch_clear_flag_locked(listener, LFLAG_LOG);
1117 }
1118 ei_x_encode_atom(rbuf, "ok");
1119 } else if (!strncmp(atom, "register_log_handler", MAXATOMLEN)) {
1120 ei_link(listener, ei_self(listener->ec), &msg->from);
1121 listener->log_process.type = ERLANG_PID;
1122 memcpy(&listener->log_process.pid, &msg->from, sizeof(erlang_pid));
1123 listener->level = SWITCH_LOG_DEBUG;
1124 switch_set_flag(listener, LFLAG_LOG);
1125 ei_x_encode_atom(rbuf, "ok");
1126 } else if (!strncmp(atom, "register_event_handler", MAXATOMLEN)) {
1127 ei_link(listener, ei_self(listener->ec), &msg->from);
1128 listener->event_process.type = ERLANG_PID;
1129 memcpy(&listener->event_process.pid, &msg->from, sizeof(erlang_pid));
1130 if (!switch_test_flag(listener, LFLAG_EVENTS)) {
1131 switch_set_flag_locked(listener, LFLAG_EVENTS);
1132 }
1133 ei_x_encode_atom(rbuf, "ok");
1134 } else if (!strncmp(atom, "noevents", MAXATOMLEN)) {
1135 void *pop;
1136 /*purge the event queue */
1137 while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS);
1138
1139 if (switch_test_flag(listener, LFLAG_EVENTS)) {
1140 uint8_t x = 0;
1141 switch_clear_flag_locked(listener, LFLAG_EVENTS);
1142
1143 switch_thread_rwlock_wrlock(listener->event_rwlock);
1144 for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
1145 listener->event_list[x] = 0;
1146 }
1147
1148 switch_core_hash_delete_multi(listener->event_hash, NULL, NULL);
1149
1150 switch_thread_rwlock_unlock(listener->event_rwlock);
1151 ei_x_encode_atom(rbuf, "ok");
1152 } else {
1153 ei_x_encode_tuple_header(rbuf, 2);
1154 ei_x_encode_atom(rbuf, "error");
1155 ei_x_encode_atom(rbuf, "notlistening");
1156 }
1157 } else if (!strncmp(atom, "session_noevents", MAXATOMLEN)) {
1158 session_elem_t *session;
1159 if ((session = find_session_elem_by_pid(listener, &msg->from))) {
1160 void *pop;
1161 uint8_t x = 0;
1162
1163 /*purge the event queue */
1164 while (switch_queue_trypop(session->event_queue, &pop) == SWITCH_STATUS_SUCCESS);
1165
1166 switch_thread_rwlock_wrlock(session->event_rwlock);
1167 for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
1168 session->event_list[x] = 0;
1169 }
1170 /* wipe the hash */
1171 switch_core_hash_delete_multi(session->event_hash, NULL, NULL);
1172 switch_thread_rwlock_unlock(session->event_rwlock);
1173
1174 switch_thread_rwlock_unlock(session->rwlock);
1175
1176 ei_x_encode_atom(rbuf, "ok");
1177 } else {
1178 ei_x_encode_tuple_header(rbuf, 2);
1179 ei_x_encode_atom(rbuf, "error");
1180 ei_x_encode_atom(rbuf, "notlistening");
1181 }
1182 } else if (!strncmp(atom, "exit", MAXATOMLEN)) {
1183 ei_x_encode_atom(rbuf, "ok");
1184 ret = SWITCH_STATUS_TERM;
1185 } else if (!strncmp(atom, "getpid", MAXATOMLEN)) {
1186 ei_x_encode_tuple_header(rbuf, 2);
1187 ei_x_encode_atom(rbuf, "ok");
1188 ei_x_encode_pid(rbuf, ei_self(listener->ec));
1189 } else if (!strncmp(atom, "link", MAXATOMLEN)) {
1190 /* debugging */
1191 ei_link(listener, ei_self(listener->ec), &msg->from);
1192 ret = SWITCH_STATUS_FALSE;
1193 } else {
1194 ei_x_encode_tuple_header(rbuf, 2);
1195 ei_x_encode_atom(rbuf, "error");
1196 ei_x_encode_atom(rbuf, "undef");
1197 }
1198
1199 return ret;
1200 }
1201
1202
handle_ref_tuple(listener_t * listener,erlang_msg * msg,ei_x_buff * buf,ei_x_buff * rbuf)1203 static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buff * rbuf)
1204 {
1205 erlang_ref ref;
1206 erlang_pid pid;
1207 char hash[100];
1208 int arity;
1209 const void *key;
1210 void *val;
1211 session_elem_t *se;
1212 switch_hash_index_t *iter;
1213 int found = 0;
1214
1215 ei_decode_tuple_header(buf->buff, &buf->index, &arity);
1216
1217 if (ei_decode_ref(buf->buff, &buf->index, &ref)) {
1218 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid reference\n");
1219 return SWITCH_STATUS_FALSE;
1220 }
1221
1222 if (ei_decode_pid(buf->buff, &buf->index, &pid)) {
1223 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid pid in a reference/pid tuple\n");
1224 return SWITCH_STATUS_FALSE;
1225 }
1226
1227 ei_hash_ref(&ref, hash);
1228
1229 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hashed ref to %s\n", hash);
1230
1231 switch_thread_rwlock_rdlock(listener->session_rwlock);
1232 for (iter = switch_core_hash_first(listener->sessions); iter; iter = switch_core_hash_next(&iter)) {
1233 switch_core_hash_this(iter, &key, NULL, &val);
1234 se = (session_elem_t*)val;
1235 if (switch_test_flag(se, LFLAG_WAITING_FOR_PID) && se->spawn_reply && !strncmp(se->spawn_reply->hash, hash, 100)) {
1236
1237 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "found matching session for %s : %s\n", hash, se->uuid_str);
1238
1239 switch_mutex_lock(se->spawn_reply->mutex);
1240
1241 se->spawn_reply->pid = switch_core_alloc(se->pool, sizeof(erlang_pid));
1242 switch_assert(se->spawn_reply->pid != NULL);
1243 memcpy(se->spawn_reply->pid, &pid, sizeof(erlang_pid));
1244
1245 switch_thread_cond_signal(se->spawn_reply->ready_or_found);
1246
1247 switch_mutex_unlock(se->spawn_reply->mutex);
1248
1249 found++;
1250
1251 break;
1252 }
1253 }
1254 switch_safe_free(iter);
1255 switch_thread_rwlock_unlock(listener->session_rwlock);
1256
1257 if (found) {
1258 return SWITCH_STATUS_FALSE;
1259 }
1260
1261 ei_x_encode_tuple_header(rbuf, 2);
1262 ei_x_encode_atom(rbuf, "error");
1263 ei_x_encode_atom(rbuf, "notfound");
1264
1265 return SWITCH_STATUS_SUCCESS;
1266 }
1267
1268
1269 /* fake enough of the net_kernel module to be able to respond to net_adm:ping */
/* {'$gen_call', {<cpx@freecpx.128.0>, #Ref<254770.4.0>}, {is_auth, cpx@freecpx}} */
handle_net_kernel_msg(listener_t * listener,erlang_msg * msg,ei_x_buff * buf,ei_x_buff * rbuf)1271 static switch_status_t handle_net_kernel_msg(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buff * rbuf)
1272 {
1273 int version, size, type, arity;
1274 char atom[MAXATOMLEN];
1275 erlang_ref ref;
1276 erlang_pid pid;
1277
1278 buf->index = 0;
1279 ei_decode_version(buf->buff, &buf->index, &version);
1280 ei_get_type(buf->buff, &buf->index, &type, &size);
1281
1282 if (type != ERL_SMALL_TUPLE_EXT && type != ERL_SMALL_TUPLE_EXT) {
1283 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "not a tuple\n");
1284 return SWITCH_STATUS_FALSE;
1285 }
1286
1287 ei_decode_tuple_header(buf->buff, &buf->index, &arity);
1288
1289 if (arity != 3) {
1290 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "wrong arity\n");
1291 return SWITCH_STATUS_FALSE;
1292 }
1293
1294 if (ei_decode_atom(buf->buff, &buf->index, atom) || strncmp(atom, "$gen_call", MAXATOMLEN)) {
1295 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "not gen_call\n");
1296 return SWITCH_STATUS_FALSE;
1297 }
1298
1299 ei_get_type(buf->buff, &buf->index, &type, &size);
1300
1301 if (type != ERL_SMALL_TUPLE_EXT && type != ERL_SMALL_TUPLE_EXT) {
1302 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "not a tuple\n");
1303 return SWITCH_STATUS_FALSE;
1304 }
1305
1306 ei_decode_tuple_header(buf->buff, &buf->index, &arity);
1307
1308 if (arity != 2) {
1309 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "wrong arity\n");
1310 return SWITCH_STATUS_FALSE;
1311 }
1312
1313 if (ei_decode_pid(buf->buff, &buf->index, &pid) || ei_decode_ref(buf->buff, &buf->index, &ref)) {
1314 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "decoding pid and ref error\n");
1315 return SWITCH_STATUS_FALSE;
1316 }
1317
1318 ei_get_type(buf->buff, &buf->index, &type, &size);
1319
1320 if (type != ERL_SMALL_TUPLE_EXT && type != ERL_SMALL_TUPLE_EXT) {
1321 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "not a tuple\n");
1322 return SWITCH_STATUS_FALSE;
1323 }
1324
1325 ei_decode_tuple_header(buf->buff, &buf->index, &arity);
1326
1327 if (arity != 2) {
1328 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "bad arity\n");
1329 return SWITCH_STATUS_FALSE;
1330 }
1331
1332 if (ei_decode_atom(buf->buff, &buf->index, atom) || strncmp(atom, "is_auth", MAXATOMLEN)) {
1333 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "not is_auth\n");
1334 return SWITCH_STATUS_FALSE;
1335 }
1336
1337 /* To ! {Tag, Reply} */
1338 ei_x_encode_tuple_header(rbuf, 2);
1339 ei_x_encode_ref(rbuf, &ref);
1340 ei_x_encode_atom(rbuf, "yes");
1341
1342 switch_mutex_lock(listener->sock_mutex);
1343 ei_send(listener->sockdes, &pid, rbuf->buff, rbuf->index);
1344 switch_mutex_unlock(listener->sock_mutex);
1345 #ifdef EI_DEBUG
1346 ei_x_print_msg(rbuf, &pid, 1);
1347 #endif
1348
1349 return SWITCH_STATUS_FALSE;
1350 }
1351
1352
/*
 * Entry point for a message received from an erlang node.
 *
 * Routing:
 *   - ERL_REG_SEND addressed to "net_kernel"  -> handle_net_kernel_msg()
 *   - tuple whose first element is an atom     -> handle_msg_tuple()
 *   - tuple whose first element is a reference -> handle_ref_tuple()
 *   - bare atom                                -> handle_msg_atom()
 *   - anything else                            -> {error, undef} is encoded
 *
 * After routing, if the handler returned SWITCH_STATUS_FALSE nothing is
 * sent.  Otherwise, when rbuf holds more than its first byte (index > 1),
 * rbuf is sent to msg->from under listener->sock_mutex; an empty rbuf only
 * logs a warning.
 *
 * Returns 0 when nothing was sent or when the handler status was
 * SWITCH_STATUS_SUCCESS, and 1 when a reply was sent for any other status.
 */
int handle_msg(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buff * rbuf)
{
	int outer_type, first_elem_type, size, version, arity, peek_index;
	switch_status_t status = SWITCH_STATUS_SUCCESS;

	if (msg->msgtype == ERL_REG_SEND && !strncmp(msg->toname, "net_kernel", MAXATOMLEN)) {
		/* try to respond to ping stuff */
		status = handle_net_kernel_msg(listener, msg, buf, rbuf);
	} else {
		buf->index = 0;
		ei_decode_version(buf->buff, &buf->index, &version);
		ei_get_type(buf->buff, &buf->index, &outer_type, &size);

		if (outer_type == ERL_SMALL_TUPLE_EXT || outer_type == ERL_LARGE_TUPLE_EXT) {
			/* look at the first element on a scratch index so the handlers
			   still see the tuple header in buf */
			peek_index = buf->index;
			ei_decode_tuple_header(buf->buff, &peek_index, &arity);
			ei_get_type(buf->buff, &peek_index, &first_elem_type, &size);

			if (first_elem_type == ERL_ATOM_EXT) {
				status = handle_msg_tuple(listener, msg, buf, rbuf);
			} else if (first_elem_type == ERL_REFERENCE_EXT || first_elem_type == ERL_NEW_REFERENCE_EXT) {
				status = handle_ref_tuple(listener, msg, buf, rbuf);
			} else {
				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "WEEEEEEEE %d %d\n", outer_type, first_elem_type);
				/* some other kind of erlang term */
				ei_x_encode_tuple_header(rbuf, 2);
				ei_x_encode_atom(rbuf, "error");
				ei_x_encode_atom(rbuf, "undef");
			}
		} else if (outer_type == ERL_ATOM_EXT) {
			status = handle_msg_atom(listener, msg, buf, rbuf);
		} else {
			/* some other kind of erlang term */
			ei_x_encode_tuple_header(rbuf, 2);
			ei_x_encode_atom(rbuf, "error");
			ei_x_encode_atom(rbuf, "undef");
		}
	}

	/* the handler asked for no reply */
	if (SWITCH_STATUS_FALSE == status) {
		return 0;
	}

	if (rbuf->index <= 1) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Empty reply, supressing\n");
		return 0;
	}

	switch_mutex_lock(listener->sock_mutex);
	ei_send(listener->sockdes, &msg->from, rbuf->buff, rbuf->index);
	switch_mutex_unlock(listener->sock_mutex);
#ifdef EI_DEBUG
	ei_x_print_msg(rbuf, &msg->from, 1);
#endif

	return SWITCH_STATUS_SUCCESS != status;
}
1421
1422 /* For Emacs:
1423 * Local Variables:
1424 * mode:c
1425 * indent-tabs-mode:t
1426 * tab-width:4
1427 * c-basic-offset:4
1428 * End:
1429 * For VIM:
1430 * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
1431 */
1432