1 /*
2 * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3 * Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
4 *
5 * Version: MPL 1.1
6 *
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
11 *
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
15 * License.
16 *
17 * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18 *
19 * The Initial Developer of the Original Code is
20 * Anthony Minessale II <anthm@freeswitch.org>
21 * Portions created by the Initial Developer are Copyright (C)
22 * the Initial Developer. All Rights Reserved.
23 *
24 * Contributor(s):
25 *
26 * Karl Anderson <karl@2600hz.com>
27 * Darren Schreiber <darren@2600hz.com>
28 *
29 *
30 * kazoo_event_streams.c -- Event Publisher
31 *
32 */
33 #include "mod_kazoo.h"
34
35 #define MAX_FRAMING 4
36
37 /* Blatantly repurposed from switch_eventc */
my_dup(const char * s)38 static char *my_dup(const char *s) {
39 size_t len = strlen(s) + 1;
40 void *new = malloc(len);
41 switch_assert(new);
42
43 return (char *) memcpy(new, s, len);
44 }
45
46 #ifndef DUP
47 #define DUP(str) my_dup(str)
48 #endif
49
50 static const char* private_headers[] = {"variable_sip_h_", "sip_h_", "P-", "X-"};
51
is_private_header(const char * name)52 static int is_private_header(const char *name) {
53 int i;
54 for(i=0; i < 4; i++) {
55 if(!strncmp(name, private_headers[i], strlen(private_headers[i]))) {
56 return 1;
57 }
58 }
59 return 0;
60 }
61
is_kazoo_var(char * header)62 static int is_kazoo_var(char* header)
63 {
64 int idx = 0;
65 while(kazoo_globals.kazoo_var_prefixes[idx] != NULL) {
66 char *prefix = kazoo_globals.kazoo_var_prefixes[idx];
67 if(!strncasecmp(header, prefix, strlen(prefix))) {
68 return 1;
69 }
70 idx++;
71 }
72
73 return 0;
74 }
75
kazoo_event_dup(switch_event_t ** clone,switch_event_t * event,switch_hash_t * filter)76 static switch_status_t kazoo_event_dup(switch_event_t **clone, switch_event_t *event, switch_hash_t *filter) {
77 switch_event_header_t *header;
78
79 if (switch_event_create_subclass(clone, SWITCH_EVENT_CLONE, event->subclass_name) != SWITCH_STATUS_SUCCESS) {
80 return SWITCH_STATUS_GENERR;
81 }
82
83 (*clone)->event_id = event->event_id;
84 (*clone)->event_user_data = event->event_user_data;
85 (*clone)->bind_user_data = event->bind_user_data;
86 (*clone)->flags = event->flags;
87
88 for (header = event->headers; header; header = header->next) {
89 if (event->subclass_name && !strcmp(header->name, "Event-Subclass")) {
90 continue;
91 }
92
93 if (!is_kazoo_var(header->name)
94 && filter
95 && !switch_core_hash_find(filter, header->name)
96 && (!kazoo_globals.send_all_headers)
97 && (!(kazoo_globals.send_all_private_headers && is_private_header(header->name)))
98 )
99 {
100 continue;
101 }
102
103 if (header->idx) {
104 int i;
105 for (i = 0; i < header->idx; i++) {
106 switch_event_add_header_string(*clone, SWITCH_STACK_PUSH, header->name, header->array[i]);
107 }
108 } else {
109 switch_event_add_header_string(*clone, SWITCH_STACK_BOTTOM, header->name, header->value);
110 }
111 }
112
113 if (event->body) {
114 (*clone)->body = DUP(event->body);
115 }
116
117 (*clone)->key = event->key;
118
119 return SWITCH_STATUS_SUCCESS;
120 }
121
encode_event_old(switch_event_t * event,ei_x_buff * ebuf)122 static int encode_event_old(switch_event_t *event, ei_x_buff *ebuf) {
123 switch_event_t *clone = NULL;
124
125 if (kazoo_event_dup(&clone, event, kazoo_globals.event_filter) != SWITCH_STATUS_SUCCESS) {
126 return 0;
127 }
128
129 ei_encode_switch_event(ebuf, clone);
130
131 switch_event_destroy(&clone);
132
133 return 1;
134 }
135
encode_event_new(switch_event_t * event,ei_x_buff * ebuf)136 static int encode_event_new(switch_event_t *event, ei_x_buff *ebuf) {
137 kazoo_message_ptr msg = NULL;
138 ei_event_binding_t *event_binding = (ei_event_binding_t *) event->bind_user_data;
139
140 msg = kazoo_message_create_event(event, event_binding->event, kazoo_globals.events);
141
142 if(msg == NULL) {
143 return 0;
144 }
145
146 ei_x_encode_tuple_header(ebuf, 3);
147 ei_x_encode_atom(ebuf, "event");
148 if(kazoo_globals.json_encoding == ERLANG_TUPLE) {
149 ei_x_encode_atom(ebuf, "json");
150 } else {
151 ei_x_encode_atom(ebuf, "map");
152 }
153 ei_encode_json(ebuf, msg->JObj);
154
155 kazoo_message_destroy(&msg);
156
157 return 1;
158 }
159
160 /*
161 * event_handler is duplicated when there are 2+ nodes connected
162 * with the same bindings
163 * we should maintain a list of event_streams in event_binding struct
164 * and build a ref count in the message
165 *
166 */
event_handler(switch_event_t * event)167 static void event_handler(switch_event_t *event) {
168 ei_event_binding_t *event_binding = (ei_event_binding_t *) event->bind_user_data;
169 ei_event_stream_t *event_stream = event_binding->stream;
170 ei_x_buff *ebuf = NULL;
171 int res = 0;
172
173 /* if mod_kazoo or the event stream isn't running dont push a new event */
174 if (!switch_test_flag(event_stream, LFLAG_RUNNING) || !switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) {
175 return;
176 }
177
178 kz_event_decode(event);
179
180 switch_malloc(ebuf, sizeof(*ebuf));
181 if(ebuf == NULL) {
182 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not allocate erlang buffer for mod_kazoo message\n");
183 return;
184 }
185 memset(ebuf, 0, sizeof(*ebuf));
186
187 if(kazoo_globals.event_stream_preallocate > 0) {
188 ebuf->buff = malloc(kazoo_globals.event_stream_preallocate);
189 ebuf->buffsz = kazoo_globals.event_stream_preallocate;
190 ebuf->index = 0;
191 if(ebuf->buff == NULL) {
192 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not pre-allocate memory for mod_kazoo message\n");
193 switch_safe_free(ebuf);
194 return;
195 }
196 } else {
197 ei_x_new(ebuf);
198 }
199
200 ebuf->index = MAX_FRAMING;
201
202 ei_x_encode_version(ebuf);
203
204 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Target-Node", event_binding->stream->node->peer_nodename);
205 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Switch-Nodename", kazoo_globals.ei_cnode.thisnodename);
206
207 if(event_stream->node->legacy) {
208 res = encode_event_old(event, ebuf);
209 } else {
210 res = encode_event_new(event, ebuf);
211 }
212
213 switch_event_del_header(event, "Switch-Nodename");
214 switch_event_del_header(event, "Target-Node");
215
216 if(!res) {
217 ei_x_free(ebuf);
218 switch_safe_free(ebuf);
219 return;
220 }
221
222 if (kazoo_globals.event_stream_preallocate > 0 && ebuf->buffsz > kazoo_globals.event_stream_preallocate) {
223 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "increased event stream buffer size to %d\n", ebuf->buffsz);
224 }
225
226 if (switch_queue_trypush(event_stream->queue, ebuf) != SWITCH_STATUS_SUCCESS) {
227 /* if we couldn't place the cloned event into the listeners */
228 /* event queue make sure we destroy it, real good like */
229 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "error placing the event in the listeners queue\n");
230 ei_x_free(ebuf);
231 switch_safe_free(ebuf);
232 }
233
234 }
235
event_stream_loop(switch_thread_t * thread,void * obj)236 static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void *obj) {
237 ei_event_stream_t *event_stream = (ei_event_stream_t *) obj;
238 ei_event_binding_t *event_binding;
239 switch_sockaddr_t *sa;
240 uint16_t port;
241 char ipbuf[48];
242 const char *ip_addr;
243 void *pop;
244 short event_stream_framing;
245 short event_stream_keepalive;
246 short ok = 1;
247
248 switch_atomic_inc(&kazoo_globals.threads);
249
250 switch_assert(event_stream != NULL);
251
252 event_stream_framing = event_stream->event_stream_framing;
253 event_stream_keepalive = event_stream->event_stream_keepalive;
254
255 /* figure out what socket we just opened */
256 switch_socket_addr_get(&sa, SWITCH_FALSE, event_stream->acceptor);
257 port = switch_sockaddr_get_port(sa);
258 ip_addr = switch_get_addr(ipbuf, sizeof(ipbuf), sa);
259
260 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Starting erlang event stream %p on %s:%u for %s <%d.%d.%d>\n"
261 ,(void *)event_stream, ip_addr, port, event_stream->pid.node, event_stream->pid.creation
262 ,event_stream->pid.num, event_stream->pid.serial);
263
264 while (switch_test_flag(event_stream, LFLAG_RUNNING) && switch_test_flag(&kazoo_globals, LFLAG_RUNNING) && ok) {
265 const switch_pollfd_t *fds;
266 int32_t numfds;
267
268 /* check if a new connection is pending */
269 if (switch_pollset_poll(event_stream->pollset, 0, &numfds, &fds) == SWITCH_STATUS_SUCCESS) {
270 int32_t i;
271 for (i = 0; i < numfds; i++) {
272 switch_socket_t *newsocket;
273
274 /* accept the new client connection */
275 if (switch_socket_accept(&newsocket, event_stream->acceptor, event_stream->pool) == SWITCH_STATUS_SUCCESS) {
276 switch_sockaddr_t *sa;
277
278 if (switch_socket_opt_set(newsocket, SWITCH_SO_NONBLOCK, TRUE)) {
279 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket as non-blocking\n");
280 }
281
282 if (event_stream_keepalive) {
283 if (switch_socket_opt_set(newsocket, SWITCH_SO_KEEPALIVE, TRUE)) {
284 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket keep-alive\n");
285 }
286 }
287
288 if (switch_socket_opt_set(newsocket, SWITCH_SO_TCP_NODELAY, 1)) {
289 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't disable Nagle.\n");
290 }
291
292 /* close the current client, if there is one */
293 close_socket(&event_stream->socket);
294
295 switch_mutex_lock(event_stream->socket_mutex);
296 /* start sending to the new client */
297 event_stream->socket = newsocket;
298
299 switch_socket_addr_get(&sa, SWITCH_TRUE, newsocket);
300 event_stream->remote_port = switch_sockaddr_get_port(sa);
301 switch_get_addr(event_stream->remote_ip, sizeof (event_stream->remote_ip), sa);
302
303 switch_socket_addr_get(&sa, SWITCH_FALSE, newsocket);
304 event_stream->local_port = switch_sockaddr_get_port(sa);
305 switch_get_addr(event_stream->local_ip, sizeof (event_stream->local_ip), sa);
306
307 event_stream->connected = SWITCH_TRUE;
308 event_stream->connected_time = switch_micro_time_now();
309
310 switch_mutex_unlock(event_stream->socket_mutex);
311
312 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Erlang event stream %p client %s:%u\n", (void *)event_stream, event_stream->remote_ip, event_stream->remote_port);
313 }
314 }
315 }
316
317 /* if there was an event waiting in our queue send it to the client */
318 if (ei_queue_pop(event_stream->queue, &pop, event_stream->queue_timeout) == SWITCH_STATUS_SUCCESS) {
319 ei_x_buff *ebuf = (ei_x_buff *) pop;
320
321 if (event_stream->socket) {
322 switch_size_t size = 1, expected = 0;
323 switch_status_t status = SWITCH_STATUS_SUCCESS;
324
325 if(ebuf->index >= pow(2, 8 * event_stream_framing)) {
326 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "sending frame size %d with insufficient frame capacity, change event_stream_framing here and tcp_packet_type in ecallmgr\n", ebuf->index);
327 } else {
328 if(event_stream_framing) {
329 int index = ebuf->index - MAX_FRAMING;
330 char byte;
331 short i = event_stream_framing;
332 while (i) {
333 byte = index >> (8 * --i);
334 ebuf->buff[MAX_FRAMING - i - 1] = byte;
335 }
336 }
337 expected = size = (switch_size_t)ebuf->index - MAX_FRAMING + event_stream_framing;
338 if((status = switch_socket_send(event_stream->socket, ebuf->buff + (MAX_FRAMING - event_stream_framing), &size)) != SWITCH_STATUS_SUCCESS) {
339 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "error %d sending event stream\n", status);
340 ok = 0;
341 } else if(expected != size) {
342 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "error sending event stream, sent bytes is different of expected\n");
343 ok = 0;
344 }
345 }
346 }
347
348 ei_x_free(ebuf);
349 switch_safe_free(ebuf);
350 }
351 }
352
353 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Shutting down erlang event stream %p\n", (void *)event_stream);
354
355 /* unbind from the system events */
356 event_binding = event_stream->bindings;
357 while(event_binding != NULL) {
358 switch_event_unbind(&event_binding->node);
359 event_binding = event_binding->next;
360 }
361 event_stream->bindings = NULL;
362
363 /* clear and destroy any remaining queued events */
364 while (switch_queue_trypop(event_stream->queue, &pop) == SWITCH_STATUS_SUCCESS) {
365 ei_x_buff *ebuf = (ei_x_buff *) pop;
366 ei_x_free(ebuf);
367 switch_safe_free(ebuf);
368 }
369
370 /* remove the acceptor pollset */
371 switch_pollset_remove(event_stream->pollset, event_stream->pollfd);
372
373 /* close any open sockets */
374 close_socket(&event_stream->acceptor);
375
376 switch_mutex_lock(event_stream->socket_mutex);
377 event_stream->connected = SWITCH_FALSE;
378 close_socket(&event_stream->socket);
379 switch_mutex_unlock(event_stream->socket_mutex);
380
381 switch_mutex_destroy(event_stream->socket_mutex);
382
383 /* clean up the memory */
384 switch_core_destroy_memory_pool(&event_stream->pool);
385
386 switch_atomic_dec(&kazoo_globals.threads);
387
388 return NULL;
389 }
390
new_event_stream(ei_node_t * ei_node,const erlang_pid * from)391 ei_event_stream_t *new_event_stream(ei_node_t *ei_node, const erlang_pid *from) {
392 switch_thread_t *thread;
393 switch_threadattr_t *thd_attr = NULL;
394 switch_memory_pool_t *pool = NULL;
395 ei_event_stream_t *event_stream;
396 ei_event_stream_t **event_streams = &ei_node->event_streams;
397
398 /* create memory pool for this event stream */
399 if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
400 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Out of memory: How many Alzheimer's patients does it take to screw in a light bulb? To get to the other side.\n");
401 return NULL;
402 }
403
404 /* from the memory pool, allocate the event stream structure */
405 if (!(event_stream = switch_core_alloc(pool, sizeof (*event_stream)))) {
406 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Out of memory: I may have Alzheimers but at least I dont have Alzheimers.\n");
407 goto cleanup;
408 }
409
410 /* prepare the event stream */
411 memset(event_stream, 0, sizeof(*event_stream));
412 event_stream->bindings = NULL;
413 event_stream->pool = pool;
414 event_stream->connected = SWITCH_FALSE;
415 event_stream->node = ei_node;
416 event_stream->event_stream_framing = ei_node->event_stream_framing;
417 event_stream->event_stream_keepalive = ei_node->event_stream_keepalive;
418 event_stream->queue_timeout = ei_node->event_stream_queue_timeout;
419 memcpy(&event_stream->pid, from, sizeof(erlang_pid));
420 switch_queue_create(&event_stream->queue, MAX_QUEUE_LEN, pool);
421
422 /* create a socket for accepting the event stream client */
423 if (!(event_stream->acceptor = create_socket(pool))) {
424 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Like car accidents, most hardware problems are due to driver error.\n");
425 goto cleanup;
426 }
427
428 if (switch_socket_opt_set(event_stream->acceptor, SWITCH_SO_NONBLOCK, TRUE)) {
429 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Hey, it compiles!\n");
430 goto cleanup;
431 }
432
433 /* create a pollset so we can efficiently check for new client connections */
434 if (switch_pollset_create(&event_stream->pollset, 1000, pool, 0) != SWITCH_STATUS_SUCCESS) {
435 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "My software never has bugs. It just develops random features.\n");
436 goto cleanup;
437 }
438
439 switch_socket_create_pollfd(&event_stream->pollfd, event_stream->acceptor, SWITCH_POLLIN | SWITCH_POLLERR, NULL, pool);
440 if (switch_pollset_add(event_stream->pollset, event_stream->pollfd) != SWITCH_STATUS_SUCCESS) {
441 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "If you saw a heat wave, would you wave back?\n");
442 goto cleanup;
443 }
444
445 switch_mutex_init(&event_stream->socket_mutex, SWITCH_MUTEX_DEFAULT, pool);
446
447 /* add the new event stream to the link list
448 * since the event streams link list is only
449 * accessed from the same thread no locks
450 * are required */
451 if (!*event_streams) {
452 *event_streams = event_stream;
453 } else {
454 event_stream->next = *event_streams;
455 *event_streams = event_stream;
456 }
457
458 /* when we start we are running */
459 switch_set_flag(event_stream, LFLAG_RUNNING);
460
461 switch_threadattr_create(&thd_attr, event_stream->pool);
462 switch_threadattr_detach_set(thd_attr, 1);
463 switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
464 switch_thread_create(&thread, thd_attr, event_stream_loop, event_stream, event_stream->pool);
465
466 return event_stream;
467
468 cleanup:
469
470 if (event_stream) {
471 /* remove the acceptor pollset */
472 if (event_stream->pollset) {
473 switch_pollset_remove(event_stream->pollset, event_stream->pollfd);
474 }
475
476 /* close any open sockets */
477 if (event_stream->acceptor) {
478 close_socket(&event_stream->acceptor);
479 }
480 }
481
482 /* clean up the memory */
483 switch_core_destroy_memory_pool(&pool);
484
485 return NULL;
486
487 }
488
get_stream_port(const ei_event_stream_t * event_stream)489 unsigned long get_stream_port(const ei_event_stream_t *event_stream) {
490 switch_sockaddr_t *sa;
491 switch_socket_addr_get(&sa, SWITCH_FALSE, event_stream->acceptor);
492 return (unsigned long) switch_sockaddr_get_port(sa);
493 }
494
find_event_stream(ei_event_stream_t * event_stream,const erlang_pid * from)495 ei_event_stream_t *find_event_stream(ei_event_stream_t *event_stream, const erlang_pid *from) {
496 while (event_stream != NULL) {
497 if (ei_compare_pids(&event_stream->pid, from) == SWITCH_STATUS_SUCCESS) {
498 return event_stream;
499 }
500 event_stream = event_stream->next;
501 }
502
503 return NULL;
504 }
505
remove_event_stream(ei_event_stream_t ** event_streams,const erlang_pid * from)506 switch_status_t remove_event_stream(ei_event_stream_t **event_streams, const erlang_pid *from) {
507 ei_event_stream_t *event_stream, *prev = NULL;
508 int found = 0;
509
510 /* if there are no event bindings there is nothing to do */
511 if (!*event_streams) {
512 return SWITCH_STATUS_SUCCESS;
513 }
514
515 /* try to find the event stream for the client process */
516 event_stream = *event_streams;
517 while(event_stream != NULL) {
518 if (ei_compare_pids(&event_stream->pid, from) == SWITCH_STATUS_SUCCESS) {
519 found = 1;
520 break;
521 }
522
523 prev = event_stream;
524 event_stream = event_stream->next;
525 }
526
527 if (found) {
528 /* if we found an event stream remove it from
529 * from the link list */
530 if (!prev) {
531 *event_streams = event_stream->next;
532 } else {
533 prev->next = event_stream->next;
534 }
535
536 /* stop the event stream thread */
537 switch_clear_flag(event_stream, LFLAG_RUNNING);
538 }
539
540 return SWITCH_STATUS_SUCCESS;
541 }
542
remove_event_streams(ei_event_stream_t ** event_streams)543 switch_status_t remove_event_streams(ei_event_stream_t **event_streams) {
544 ei_event_stream_t *event_stream = *event_streams;
545
546 while(event_stream != NULL) {
547 /* stop the event bindings publisher thread */
548 switch_clear_flag(event_stream, LFLAG_RUNNING);
549
550 event_stream = event_stream->next;
551 }
552
553 *event_streams = NULL;
554
555 return SWITCH_STATUS_SUCCESS;
556 }
557
bind_event_profile(ei_event_binding_t * event_binding,kazoo_event_ptr event)558 void bind_event_profile(ei_event_binding_t *event_binding, kazoo_event_ptr event)
559 {
560 switch_event_types_t event_type;
561 while(event != NULL) {
562 if (switch_name_event(event->name, &event_type) != SWITCH_STATUS_SUCCESS) {
563 event_type = SWITCH_EVENT_CUSTOM;
564 }
565 if(event_binding->type != SWITCH_EVENT_CUSTOM
566 && event_binding->type == event_type) {
567 break;
568 }
569 if (event_binding->type == SWITCH_EVENT_CUSTOM
570 && event_binding->type == event_type
571 && !strcasecmp(event_binding->subclass_name, event->name)) {
572 break;
573 }
574 event = event->next;
575 }
576 event_binding->event = event;
577 if(event == NULL && (!event_binding->stream->node->legacy)) {
578 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "event binding to an event without profile in non legacy mode => %s - %s\n",switch_event_name(event_binding->type), event_binding->subclass_name);
579 }
580 }
581
bind_event_profiles(kazoo_event_ptr event)582 void bind_event_profiles(kazoo_event_ptr event)
583 {
584 ei_node_t *ei_node = kazoo_globals.ei_nodes;
585 while(ei_node) {
586 ei_event_stream_t *event_streams = ei_node->event_streams;
587 while(event_streams) {
588 ei_event_binding_t *bindings = event_streams->bindings;
589 while(bindings) {
590 bind_event_profile(bindings, event);
591 bindings = bindings->next;
592 }
593 event_streams = event_streams->next;
594 }
595 ei_node = ei_node->next;
596 }
597 }
598
add_event_binding(ei_event_stream_t * event_stream,const char * event_name)599 switch_status_t add_event_binding(ei_event_stream_t *event_stream, const char *event_name) {
600 ei_event_binding_t *event_binding = event_stream->bindings;
601 switch_event_types_t event_type;
602
603 if(!strcasecmp(event_name, "CUSTOM")) {
604 return SWITCH_STATUS_SUCCESS;
605 }
606
607 if (switch_name_event(event_name, &event_type) != SWITCH_STATUS_SUCCESS) {
608 event_type = SWITCH_EVENT_CUSTOM;
609 }
610
611 /* check if the event binding already exists, ignore if so */
612 while(event_binding != NULL) {
613 if (event_binding->type == SWITCH_EVENT_CUSTOM) {
614 if(event_type == SWITCH_EVENT_CUSTOM
615 && event_name
616 && event_binding->subclass_name
617 && !strcasecmp(event_name, event_binding->subclass_name)) {
618 return SWITCH_STATUS_SUCCESS;
619 }
620 } else if (event_binding->type == event_type) {
621 return SWITCH_STATUS_SUCCESS;
622 }
623 event_binding = event_binding->next;
624 }
625
626 /* from the event stream memory pool, allocate the event binding structure */
627 if (!(event_binding = switch_core_alloc(event_stream->pool, sizeof (*event_binding)))) {
628 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Out of random-access memory, attempting write-only memory\n");
629 return SWITCH_STATUS_FALSE;
630 }
631
632 /* prepare the event binding struct */
633 event_binding->stream = event_stream;
634 event_binding->type = event_type;
635 if(event_binding->type == SWITCH_EVENT_CUSTOM) {
636 event_binding->subclass_name = switch_core_strdup(event_stream->pool, event_name);
637 } else {
638 event_binding->subclass_name = SWITCH_EVENT_SUBCLASS_ANY;
639 }
640 event_binding->next = NULL;
641
642 bind_event_profile(event_binding, kazoo_globals.events->events);
643
644
645 /* bind to the event with a unique ID and capture the event_node pointer */
646 switch_uuid_str(event_binding->id, sizeof(event_binding->id));
647 if (switch_event_bind_removable(event_binding->id, event_type, event_binding->subclass_name, event_handler, event_binding, &event_binding->node) != SWITCH_STATUS_SUCCESS) {
648 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to bind to event %s %s!\n"
649 ,switch_event_name(event_binding->type), event_binding->subclass_name ? event_binding->subclass_name : "");
650 return SWITCH_STATUS_GENERR;
651 }
652
653 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Adding event binding %s to stream %p for %s <%d.%d.%d>: %s %s\n"
654 ,event_binding->id, (void *)event_stream, event_stream->pid.node, event_stream->pid.creation
655 ,event_stream->pid.num, event_stream->pid.serial, switch_event_name(event_binding->type)
656 ,event_binding->subclass_name ? event_binding->subclass_name : "");
657
658 /* add the new binding to the list */
659 if (!event_stream->bindings) {
660 event_stream->bindings = event_binding;
661 } else {
662 event_binding->next = event_stream->bindings;
663 event_stream->bindings = event_binding;
664 }
665
666 return SWITCH_STATUS_SUCCESS;
667 }
668
remove_event_binding(ei_event_stream_t * event_stream,const switch_event_types_t event_type,const char * subclass_name)669 switch_status_t remove_event_binding(ei_event_stream_t *event_stream, const switch_event_types_t event_type, const char *subclass_name) {
670 ei_event_binding_t *event_binding = event_stream->bindings, *prev = NULL;
671 int found = 0;
672
673 /* if there are no bindings then there is nothing to do */
674 if (!event_binding) {
675 return SWITCH_STATUS_SUCCESS;
676 }
677
678 /* try to find the event binding specified */
679 while(event_binding != NULL) {
680 if (event_binding->type == SWITCH_EVENT_CUSTOM
681 && subclass_name
682 && event_binding->subclass_name
683 && !strcmp(subclass_name, event_binding->subclass_name)) {
684 found = 1;
685 break;
686 } else if (event_binding->type == event_type) {
687 found = 1;
688 break;
689 }
690
691 prev = event_binding;
692 event_binding = event_binding->next;
693 }
694
695 if (found) {
696 /* if the event binding exists, unbind from the system */
697 switch_event_unbind(&event_binding->node);
698
699 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Removing event binding %s from %p for %s <%d.%d.%d>: %s %s\n"
700 ,event_binding->id, (void *)event_stream, event_stream->pid.node, event_stream->pid.creation
701 ,event_stream->pid.num, event_stream->pid.serial, switch_event_name(event_binding->type)
702 ,event_binding->subclass_name ? event_binding->subclass_name : "");
703
704 /* remove the event binding from the list */
705 if (!prev) {
706 event_stream->bindings = event_binding->next;
707 } else {
708 prev->next = event_binding->next;
709 }
710 }
711
712 return SWITCH_STATUS_SUCCESS;
713 }
714
remove_event_bindings(ei_event_stream_t * event_stream)715 switch_status_t remove_event_bindings(ei_event_stream_t *event_stream) {
716 ei_event_binding_t *event_binding = event_stream->bindings;
717
718 /* if there are no bindings then there is nothing to do */
719 if (!event_binding) {
720 return SWITCH_STATUS_SUCCESS;
721 }
722
723 /* try to find the event binding specified */
724 while(event_binding != NULL) {
725 /* if the event binding exists, unbind from the system */
726 switch_event_unbind(&event_binding->node);
727
728 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Removing event binding %s from %p for %s <%d.%d.%d>: %s %s\n"
729 ,event_binding->id, (void *)event_stream, event_stream->pid.node, event_stream->pid.creation
730 ,event_stream->pid.num, event_stream->pid.serial, switch_event_name(event_binding->type)
731 ,event_binding->subclass_name ? event_binding->subclass_name : "");
732
733 event_binding = event_binding->next;
734 }
735
736 event_stream->bindings = NULL;
737
738 return SWITCH_STATUS_SUCCESS;
739 }
740
741 /* For Emacs:
742 * Local Variables:
743 * mode:c
744 * indent-tabs-mode:t
745 * tab-width:4
746 * c-basic-offset:4
747 * End:
748 * For VIM:
749 * vim:set softtabstop=4 shiftwidth=4 tabstop=4:
750 */
751