1 /*
2  * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3  * Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
4  *
5  * Version: MPL 1.1
6  *
7  * The contents of this file are subject to the Mozilla Public License Version
8  * 1.1 (the "License"); you may not use this file except in compliance with
9  * the License. You may obtain a copy of the License at
10  * http://www.mozilla.org/MPL/
11  *
12  * Software distributed under the License is distributed on an "AS IS" basis,
13  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14  * for the specific language governing rights and limitations under the
15  * License.
16  *
17  * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18  *
19  * The Initial Developer of the Original Code is
20  * Anthony Minessale II <anthm@freeswitch.org>
21  * Portions created by the Initial Developer are Copyright (C)
22  * the Initial Developer. All Rights Reserved.
23  *
24  * Contributor(s):
25  *
26  * Karl Anderson <karl@2600hz.com>
27  * Darren Schreiber <darren@2600hz.com>
28  *
29  *
30  * kazoo_event_streams.c -- Event Publisher
31  *
32  */
33 #include "mod_kazoo.h"
34 
35 #define MAX_FRAMING 4
36 
/* Blatantly repurposed from switch_event.c */
/*
 * Return a copy of the NUL-terminated string s in memory obtained from
 * malloc(). A failed allocation trips switch_assert().
 */
static char *my_dup(const char *s) {
	const size_t nbytes = strlen(s) + 1;	/* include the terminating NUL */
	char *copy = malloc(nbytes);

	switch_assert(copy);
	memcpy(copy, s, nbytes);

	return copy;
}

/* DUP() resolves to my_dup() unless something already defined it */
#ifndef DUP
#define DUP(str) my_dup(str)
#endif
49 
/* Header-name prefixes that mark a header as "private" */
static const char* private_headers[] = {"variable_sip_h_", "sip_h_", "P-", "X-"};

/*
 * Report whether a header name starts with one of the private_headers
 * prefixes (case-sensitive comparison).
 *
 * name: header name to test; NULL is treated as "not private".
 * Returns 1 when a prefix matches, 0 otherwise.
 */
static int is_private_header(const char *name) {
	/* derive the bound from the table so adding a prefix cannot be missed */
	const size_t prefix_count = sizeof(private_headers) / sizeof(private_headers[0]);
	size_t i;

	if (name == NULL) {
		return 0;
	}

	for (i = 0; i < prefix_count; i++) {
		if (!strncmp(name, private_headers[i], strlen(private_headers[i]))) {
			return 1;
		}
	}

	return 0;
}
61 
is_kazoo_var(char * header)62 static int is_kazoo_var(char* header)
63 {
64 	int idx = 0;
65 	while(kazoo_globals.kazoo_var_prefixes[idx] != NULL) {
66 		char *prefix = kazoo_globals.kazoo_var_prefixes[idx];
67 		if(!strncasecmp(header, prefix, strlen(prefix))) {
68 			return 1;
69 		}
70 		idx++;
71 	}
72 
73 	return 0;
74 }
75 
kazoo_event_dup(switch_event_t ** clone,switch_event_t * event,switch_hash_t * filter)76 static switch_status_t kazoo_event_dup(switch_event_t **clone, switch_event_t *event, switch_hash_t *filter) {
77 	switch_event_header_t *header;
78 
79 	if (switch_event_create_subclass(clone, SWITCH_EVENT_CLONE, event->subclass_name) != SWITCH_STATUS_SUCCESS) {
80 		return SWITCH_STATUS_GENERR;
81 	}
82 
83 	(*clone)->event_id = event->event_id;
84 	(*clone)->event_user_data = event->event_user_data;
85 	(*clone)->bind_user_data = event->bind_user_data;
86 	(*clone)->flags = event->flags;
87 
88 	for (header = event->headers; header; header = header->next) {
89 		if (event->subclass_name && !strcmp(header->name, "Event-Subclass")) {
90 			continue;
91 		}
92 
93 		if (!is_kazoo_var(header->name)
94 			&& filter
95 			&& !switch_core_hash_find(filter, header->name)
96 			&& (!kazoo_globals.send_all_headers)
97 			&& (!(kazoo_globals.send_all_private_headers && is_private_header(header->name)))
98 			)
99 			{
100 				continue;
101 			}
102 
103         if (header->idx) {
104             int i;
105             for (i = 0; i < header->idx; i++) {
106                 switch_event_add_header_string(*clone, SWITCH_STACK_PUSH, header->name, header->array[i]);
107             }
108         } else {
109             switch_event_add_header_string(*clone, SWITCH_STACK_BOTTOM, header->name, header->value);
110         }
111     }
112 
113     if (event->body) {
114         (*clone)->body = DUP(event->body);
115     }
116 
117     (*clone)->key = event->key;
118 
119     return SWITCH_STATUS_SUCCESS;
120 }
121 
/*
 * Encoding used for legacy nodes: make a copy of the event filtered through
 * kazoo_globals.event_filter, encode that copy into ebuf, then destroy it.
 *
 * Returns 1 when the event was encoded, 0 when the copy could not be made.
 */
static int encode_event_old(switch_event_t *event, ei_x_buff *ebuf) {
	switch_event_t *filtered = NULL;
	switch_status_t status = kazoo_event_dup(&filtered, event, kazoo_globals.event_filter);

	if (status == SWITCH_STATUS_SUCCESS) {
		ei_encode_switch_event(ebuf, filtered);
		switch_event_destroy(&filtered);
		return 1;
	}

	return 0;
}
135 
/*
 * Encoding used for non-legacy nodes: build a kazoo message for the event
 * from the profile attached to its binding and encode it into ebuf as the
 * 3-tuple {event, json | map, Payload}. The second element is "json" when
 * kazoo_globals.json_encoding is ERLANG_TUPLE and "map" otherwise.
 *
 * Returns 1 when the event was encoded, 0 when no message could be created.
 */
static int encode_event_new(switch_event_t *event, ei_x_buff *ebuf) {
	ei_event_binding_t *binding = (ei_event_binding_t *) event->bind_user_data;
	kazoo_message_ptr message = kazoo_message_create_event(event, binding->event, kazoo_globals.events);
	const char *format_tag;

	if (!message) {
		return 0;
	}

	format_tag = (kazoo_globals.json_encoding == ERLANG_TUPLE) ? "json" : "map";

	ei_x_encode_tuple_header(ebuf, 3);
	ei_x_encode_atom(ebuf, "event");
	ei_x_encode_atom(ebuf, format_tag);
	ei_encode_json(ebuf, message->JObj);

	kazoo_message_destroy(&message);

	return 1;
}
159 
/*
 * NOTE: event_handler runs once per binding, so when two or more nodes
 * are connected with the same bindings the same event is handled (and
 * encoded) once for each of them.
 * TODO: maintain a list of event_streams in the event_binding struct
 * and add a reference count to the message instead.
 */
/*
 * Event callback registered for every binding of an event stream.
 *
 * Encodes the event into a freshly allocated ei_x_buff and pushes it onto the
 * stream's queue. The first MAX_FRAMING bytes of the buffer are left unused
 * here so the sending thread can write the frame length in front of the
 * payload. While encoding, the event temporarily carries "Target-Node" and
 * "Switch-Nodename" headers; both are removed again before returning.
 *
 * The event is dropped when the stream or the module is not running, when a
 * buffer cannot be allocated, when encoding yields nothing, or when the
 * queue does not accept the buffer.
 */
static void event_handler(switch_event_t *event) {
	ei_event_binding_t *event_binding = (ei_event_binding_t *) event->bind_user_data;
	ei_event_stream_t *event_stream = event_binding->stream;
	ei_x_buff *ebuf = NULL;
	int res = 0;

	/* if mod_kazoo or the event stream isn't running dont push a new event */
	if (!switch_test_flag(event_stream, LFLAG_RUNNING) || !switch_test_flag(&kazoo_globals, LFLAG_RUNNING)) {
		return;
	}

	kz_event_decode(event);

	switch_malloc(ebuf, sizeof(*ebuf));
	if (ebuf == NULL) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not allocate erlang buffer for mod_kazoo message\n");
		return;
	}
	memset(ebuf, 0, sizeof(*ebuf));

	if (kazoo_globals.event_stream_preallocate > 0) {
		/* start with the configured size instead of the ei default */
		ebuf->buff = malloc(kazoo_globals.event_stream_preallocate);
		ebuf->buffsz = kazoo_globals.event_stream_preallocate;
		ebuf->index = 0;
		if (ebuf->buff == NULL) {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not pre-allocate memory for mod_kazoo message\n");
			switch_safe_free(ebuf);
			return;
		}
	} else if (ei_x_new(ebuf) != 0) {
		/* ei_x_new() could not allocate its buffer; only the wrapper needs freeing */
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not allocate erlang buffer for mod_kazoo message\n");
		switch_safe_free(ebuf);
		return;
	}

	/* reserve room for the frame header in front of the payload */
	ebuf->index = MAX_FRAMING;

	ei_x_encode_version(ebuf);

	switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Target-Node", event_stream->node->peer_nodename);
	switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Switch-Nodename", kazoo_globals.ei_cnode.thisnodename);

	if (event_stream->node->legacy) {
		res = encode_event_old(event, ebuf);
	} else {
		res = encode_event_new(event, ebuf);
	}

	/* undo the temporary headers, the event itself is not ours */
	switch_event_del_header(event, "Switch-Nodename");
	switch_event_del_header(event, "Target-Node");

	if (!res) {
		ei_x_free(ebuf);
		switch_safe_free(ebuf);
		return;
	}

	if (kazoo_globals.event_stream_preallocate > 0 && ebuf->buffsz > kazoo_globals.event_stream_preallocate) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "increased event stream buffer size to %d\n", ebuf->buffsz);
	}

	if (switch_queue_trypush(event_stream->queue, ebuf) != SWITCH_STATUS_SUCCESS) {
		/* the queue did not take ownership of the buffer, so release it here */
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "error placing the event in the listeners queue\n");
		ei_x_free(ebuf);
		switch_safe_free(ebuf);
	}
}
235 
event_stream_loop(switch_thread_t * thread,void * obj)236 static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void *obj) {
237     ei_event_stream_t *event_stream = (ei_event_stream_t *) obj;
238 	ei_event_binding_t *event_binding;
239 	switch_sockaddr_t *sa;
240 	uint16_t port;
241     char ipbuf[48];
242     const char *ip_addr;
243 	void *pop;
244 	short event_stream_framing;
245 	short event_stream_keepalive;
246 	short ok = 1;
247 
248 	switch_atomic_inc(&kazoo_globals.threads);
249 
250 	switch_assert(event_stream != NULL);
251 
252 	event_stream_framing = event_stream->event_stream_framing;
253 	event_stream_keepalive = event_stream->event_stream_keepalive;
254 
255 	/* figure out what socket we just opened */
256 	switch_socket_addr_get(&sa, SWITCH_FALSE, event_stream->acceptor);
257 	port = switch_sockaddr_get_port(sa);
258     ip_addr = switch_get_addr(ipbuf, sizeof(ipbuf), sa);
259 
260 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Starting erlang event stream %p on %s:%u for %s <%d.%d.%d>\n"
261 					  ,(void *)event_stream, ip_addr, port, event_stream->pid.node, event_stream->pid.creation
262 					  ,event_stream->pid.num, event_stream->pid.serial);
263 
264 	while (switch_test_flag(event_stream, LFLAG_RUNNING) && switch_test_flag(&kazoo_globals, LFLAG_RUNNING) && ok) {
265 		const switch_pollfd_t *fds;
266 		int32_t numfds;
267 
268 		/* check if a new connection is pending */
269 		if (switch_pollset_poll(event_stream->pollset, 0, &numfds, &fds) == SWITCH_STATUS_SUCCESS) {
270 			int32_t i;
271 			for (i = 0; i < numfds; i++) {
272 				switch_socket_t *newsocket;
273 
274 				/* accept the new client connection */
275 				if (switch_socket_accept(&newsocket, event_stream->acceptor, event_stream->pool) == SWITCH_STATUS_SUCCESS) {
276 					switch_sockaddr_t *sa;
277 
278                     if (switch_socket_opt_set(newsocket, SWITCH_SO_NONBLOCK, TRUE)) {
279                         switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket as non-blocking\n");
280                     }
281 
282                     if (event_stream_keepalive) {
283                     	if (switch_socket_opt_set(newsocket, SWITCH_SO_KEEPALIVE, TRUE)) {
284                     		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket keep-alive\n");
285                     	}
286                     }
287 
288                     if (switch_socket_opt_set(newsocket, SWITCH_SO_TCP_NODELAY, 1)) {
289                         switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't disable Nagle.\n");
290                     }
291 
292 					/* close the current client, if there is one */
293 					close_socket(&event_stream->socket);
294 
295 					switch_mutex_lock(event_stream->socket_mutex);
296 					/* start sending to the new client */
297 					event_stream->socket = newsocket;
298 
299 					switch_socket_addr_get(&sa, SWITCH_TRUE, newsocket);
300 					event_stream->remote_port = switch_sockaddr_get_port(sa);
301 					switch_get_addr(event_stream->remote_ip, sizeof (event_stream->remote_ip), sa);
302 
303 					switch_socket_addr_get(&sa, SWITCH_FALSE, newsocket);
304 					event_stream->local_port = switch_sockaddr_get_port(sa);
305 					switch_get_addr(event_stream->local_ip, sizeof (event_stream->local_ip), sa);
306 
307 					event_stream->connected = SWITCH_TRUE;
308 					event_stream->connected_time = switch_micro_time_now();
309 
310 					switch_mutex_unlock(event_stream->socket_mutex);
311 
312 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Erlang event stream %p client %s:%u\n", (void *)event_stream, event_stream->remote_ip, event_stream->remote_port);
313 				}
314 			}
315 		}
316 
317 		/* if there was an event waiting in our queue send it to the client */
318 		if (ei_queue_pop(event_stream->queue, &pop, event_stream->queue_timeout) == SWITCH_STATUS_SUCCESS) {
319 			ei_x_buff *ebuf = (ei_x_buff *) pop;
320 
321 			if (event_stream->socket) {
322 				switch_size_t size = 1, expected = 0;
323 				switch_status_t status = SWITCH_STATUS_SUCCESS;
324 
325 				if(ebuf->index >= pow(2, 8 * event_stream_framing)) {
326 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "sending frame size %d with insufficient frame capacity, change event_stream_framing here and tcp_packet_type in ecallmgr\n", ebuf->index);
327 				} else {
328 					if(event_stream_framing) {
329 						int index = ebuf->index - MAX_FRAMING;
330 						char byte;
331 						short i = event_stream_framing;
332 						while (i) {
333 							byte = index >> (8 * --i);
334 							ebuf->buff[MAX_FRAMING - i - 1] = byte;
335 						}
336 					}
337 					expected = size = (switch_size_t)ebuf->index - MAX_FRAMING + event_stream_framing;
338 					if((status = switch_socket_send(event_stream->socket, ebuf->buff + (MAX_FRAMING - event_stream_framing), &size)) != SWITCH_STATUS_SUCCESS) {
339 						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "error %d sending event stream\n", status);
340 						ok = 0;
341 					} else if(expected != size) {
342 						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "error sending event stream, sent bytes is different of expected\n");
343 						ok = 0;
344 					}
345 				}
346 			}
347 
348 			ei_x_free(ebuf);
349 			switch_safe_free(ebuf);
350 		}
351 	}
352 
353 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Shutting down erlang event stream %p\n", (void *)event_stream);
354 
355 	/* unbind from the system events */
356 	event_binding = event_stream->bindings;
357 	while(event_binding != NULL) {
358 		switch_event_unbind(&event_binding->node);
359 		event_binding = event_binding->next;
360 	}
361 	event_stream->bindings = NULL;
362 
363 	/* clear and destroy any remaining queued events */
364 	while (switch_queue_trypop(event_stream->queue, &pop) == SWITCH_STATUS_SUCCESS) {
365 		ei_x_buff *ebuf = (ei_x_buff *) pop;
366 		ei_x_free(ebuf);
367 		switch_safe_free(ebuf);
368 	}
369 
370 	/* remove the acceptor pollset */
371 	switch_pollset_remove(event_stream->pollset, event_stream->pollfd);
372 
373 	/* close any open sockets */
374 	close_socket(&event_stream->acceptor);
375 
376 	switch_mutex_lock(event_stream->socket_mutex);
377 	event_stream->connected = SWITCH_FALSE;
378 	close_socket(&event_stream->socket);
379 	switch_mutex_unlock(event_stream->socket_mutex);
380 
381 	switch_mutex_destroy(event_stream->socket_mutex);
382 
383 	/* clean up the memory */
384 	switch_core_destroy_memory_pool(&event_stream->pool);
385 
386 	switch_atomic_dec(&kazoo_globals.threads);
387 
388 	return NULL;
389 }
390 
new_event_stream(ei_node_t * ei_node,const erlang_pid * from)391 ei_event_stream_t *new_event_stream(ei_node_t *ei_node, const erlang_pid *from) {
392 	switch_thread_t *thread;
393 	switch_threadattr_t *thd_attr = NULL;
394 	switch_memory_pool_t *pool = NULL;
395 	ei_event_stream_t *event_stream;
396 	ei_event_stream_t **event_streams = &ei_node->event_streams;
397 
398 	/* create memory pool for this event stream */
399 	if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
400 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Out of memory: How many Alzheimer's patients does it take to screw in a light bulb? To get to the other side.\n");
401 		return NULL;
402 	}
403 
404 	/* from the memory pool, allocate the event stream structure */
405 	if (!(event_stream = switch_core_alloc(pool, sizeof (*event_stream)))) {
406 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Out of memory: I may have Alzheimers but at least I dont have Alzheimers.\n");
407 		goto cleanup;
408 	}
409 
410 	/* prepare the event stream */
411 	memset(event_stream, 0, sizeof(*event_stream));
412 	event_stream->bindings = NULL;
413 	event_stream->pool = pool;
414 	event_stream->connected = SWITCH_FALSE;
415 	event_stream->node = ei_node;
416 	event_stream->event_stream_framing = ei_node->event_stream_framing;
417 	event_stream->event_stream_keepalive = ei_node->event_stream_keepalive;
418 	event_stream->queue_timeout = ei_node->event_stream_queue_timeout;
419 	memcpy(&event_stream->pid, from, sizeof(erlang_pid));
420 	switch_queue_create(&event_stream->queue, MAX_QUEUE_LEN, pool);
421 
422 	/* create a socket for accepting the event stream client */
423     if (!(event_stream->acceptor = create_socket(pool))) {
424 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Like car accidents, most hardware problems are due to driver error.\n");
425 		goto cleanup;
426     }
427 
428 	if (switch_socket_opt_set(event_stream->acceptor, SWITCH_SO_NONBLOCK, TRUE)) {
429 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Hey, it compiles!\n");
430 		goto cleanup;
431 	}
432 
433 	/* create a pollset so we can efficiently check for new client connections */
434 	if (switch_pollset_create(&event_stream->pollset, 1000, pool, 0) != SWITCH_STATUS_SUCCESS) {
435 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "My software never has bugs. It just develops random features.\n");
436 		goto cleanup;
437 	}
438 
439 	switch_socket_create_pollfd(&event_stream->pollfd, event_stream->acceptor, SWITCH_POLLIN | SWITCH_POLLERR, NULL, pool);
440 	if (switch_pollset_add(event_stream->pollset, event_stream->pollfd) != SWITCH_STATUS_SUCCESS) {
441 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "If you saw a heat wave, would you wave back?\n");
442 		goto cleanup;
443 	}
444 
445 	switch_mutex_init(&event_stream->socket_mutex, SWITCH_MUTEX_DEFAULT, pool);
446 
447 	/* add the new event stream to the link list
448 	 * since the event streams link list is only
449 	 * accessed from the same thread no locks
450 	 * are required */
451 	if (!*event_streams) {
452 		*event_streams = event_stream;
453 	} else {
454 		event_stream->next = *event_streams;
455 		*event_streams = event_stream;
456 	}
457 
458 	/* when we start we are running */
459 	switch_set_flag(event_stream, LFLAG_RUNNING);
460 
461 	switch_threadattr_create(&thd_attr, event_stream->pool);
462 	switch_threadattr_detach_set(thd_attr, 1);
463 	switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
464 	switch_thread_create(&thread, thd_attr, event_stream_loop, event_stream, event_stream->pool);
465 
466 	return event_stream;
467 
468 cleanup:
469 
470 	if (event_stream) {
471 		/* remove the acceptor pollset */
472 		if (event_stream->pollset) {
473 			switch_pollset_remove(event_stream->pollset, event_stream->pollfd);
474 		}
475 
476 		/* close any open sockets */
477 		if (event_stream->acceptor) {
478 			close_socket(&event_stream->acceptor);
479 		}
480 	}
481 
482 	/* clean up the memory */
483 	switch_core_destroy_memory_pool(&pool);
484 
485     return NULL;
486 
487 }
488 
get_stream_port(const ei_event_stream_t * event_stream)489 unsigned long get_stream_port(const ei_event_stream_t *event_stream) {
490 	switch_sockaddr_t *sa;
491 	switch_socket_addr_get(&sa, SWITCH_FALSE, event_stream->acceptor);
492 	return (unsigned long) switch_sockaddr_get_port(sa);
493 }
494 
/*
 * Walk the list starting at event_stream and return the first stream whose
 * pid compares equal to `from`, or NULL when there is none.
 */
ei_event_stream_t *find_event_stream(ei_event_stream_t *event_stream, const erlang_pid *from) {
	ei_event_stream_t *candidate;

	for (candidate = event_stream; candidate != NULL; candidate = candidate->next) {
		if (ei_compare_pids(&candidate->pid, from) == SWITCH_STATUS_SUCCESS) {
			return candidate;
		}
	}

	return NULL;
}
505 
remove_event_stream(ei_event_stream_t ** event_streams,const erlang_pid * from)506 switch_status_t remove_event_stream(ei_event_stream_t **event_streams, const erlang_pid *from) {
507 	ei_event_stream_t *event_stream, *prev = NULL;
508 	int found = 0;
509 
510 	/* if there are no event bindings there is nothing to do */
511 	if (!*event_streams) {
512 		return SWITCH_STATUS_SUCCESS;
513 	}
514 
515 	/* try to find the event stream for the client process */
516 	event_stream = *event_streams;
517 	while(event_stream != NULL) {
518 		if (ei_compare_pids(&event_stream->pid, from) == SWITCH_STATUS_SUCCESS) {
519 			found = 1;
520 			break;
521 		}
522 
523 		prev = event_stream;
524 		event_stream = event_stream->next;
525 	}
526 
527 	if (found) {
528 		/* if we found an event stream remove it from
529 		 * from the link list */
530 		if (!prev) {
531 			*event_streams = event_stream->next;
532 		} else {
533 			prev->next = event_stream->next;
534 		}
535 
536 		/* stop the event stream thread */
537 		switch_clear_flag(event_stream, LFLAG_RUNNING);
538 	}
539 
540 	return SWITCH_STATUS_SUCCESS;
541 }
542 
remove_event_streams(ei_event_stream_t ** event_streams)543 switch_status_t remove_event_streams(ei_event_stream_t **event_streams) {
544 	ei_event_stream_t *event_stream = *event_streams;
545 
546 	while(event_stream != NULL) {
547 		/* stop the event bindings publisher thread */
548 		switch_clear_flag(event_stream, LFLAG_RUNNING);
549 
550 		event_stream = event_stream->next;
551 	}
552 
553 	*event_streams = NULL;
554 
555 	return SWITCH_STATUS_SUCCESS;
556 }
557 
/*
 * Attach to a binding the first profile from the `event` list that matches it.
 *
 * A profile name that is not a known event name is treated as a CUSTOM
 * event. A non-CUSTOM binding matches a profile of the same event type; a
 * CUSTOM binding matches a CUSTOM profile whose name equals the binding's
 * subclass name (case-insensitive). event_binding->event is set to the match,
 * or to NULL when nothing matches, in which case a warning is logged for
 * non-legacy nodes.
 */
void bind_event_profile(ei_event_binding_t *event_binding, kazoo_event_ptr event)
{
	switch_event_types_t event_type;

	for (; event != NULL; event = event->next) {
		if (switch_name_event(event->name, &event_type) != SWITCH_STATUS_SUCCESS) {
			event_type = SWITCH_EVENT_CUSTOM;
		}

		if (event_binding->type != event_type) {
			continue;
		}

		if (event_binding->type != SWITCH_EVENT_CUSTOM) {
			break;
		}

		/* CUSTOM bindings are told apart by subclass name */
		if (event_binding->subclass_name
			&& !strcasecmp(event_binding->subclass_name, event->name)) {
			break;
		}
	}

	event_binding->event = event;

	if (event == NULL && (!event_binding->stream->node->legacy)) {
		/* subclass_name may be NULL; guard it like the other log sites in this file do */
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "event binding to an event without profile in non legacy mode => %s - %s\n"
						  ,switch_event_name(event_binding->type), event_binding->subclass_name ? event_binding->subclass_name : "");
	}
}
581 
/*
 * Run bind_event_profile() with the given profile list on every binding of
 * every event stream of every node in kazoo_globals.ei_nodes.
 */
void bind_event_profiles(kazoo_event_ptr event)
{
	ei_node_t *node;
	ei_event_stream_t *stream;
	ei_event_binding_t *binding;

	for (node = kazoo_globals.ei_nodes; node; node = node->next) {
		for (stream = node->event_streams; stream; stream = stream->next) {
			for (binding = stream->bindings; binding; binding = binding->next) {
				bind_event_profile(binding, event);
			}
		}
	}
}
598 
add_event_binding(ei_event_stream_t * event_stream,const char * event_name)599 switch_status_t add_event_binding(ei_event_stream_t *event_stream, const char *event_name) {
600 	ei_event_binding_t *event_binding = event_stream->bindings;
601 	switch_event_types_t event_type;
602 
603 	if(!strcasecmp(event_name, "CUSTOM")) {
604 		return SWITCH_STATUS_SUCCESS;
605 	}
606 
607    	if (switch_name_event(event_name, &event_type) != SWITCH_STATUS_SUCCESS) {
608    		event_type = SWITCH_EVENT_CUSTOM;
609    	}
610 
611 	/* check if the event binding already exists, ignore if so */
612 	while(event_binding != NULL) {
613 		if (event_binding->type == SWITCH_EVENT_CUSTOM) {
614 			if(event_type == SWITCH_EVENT_CUSTOM
615 				&& event_name
616 				&& event_binding->subclass_name
617 				&& !strcasecmp(event_name, event_binding->subclass_name)) {
618 					return SWITCH_STATUS_SUCCESS;
619 				}
620 		} else if (event_binding->type == event_type) {
621 			return SWITCH_STATUS_SUCCESS;
622 		}
623 		event_binding = event_binding->next;
624 	}
625 
626 	/* from the event stream memory pool, allocate the event binding structure */
627 	if (!(event_binding = switch_core_alloc(event_stream->pool, sizeof (*event_binding)))) {
628 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Out of random-access memory, attempting write-only memory\n");
629 		return SWITCH_STATUS_FALSE;
630 	}
631 
632 	/* prepare the event binding struct */
633 	event_binding->stream = event_stream;
634 	event_binding->type = event_type;
635 	if(event_binding->type == SWITCH_EVENT_CUSTOM) {
636 		event_binding->subclass_name = switch_core_strdup(event_stream->pool, event_name);
637 	} else {
638 		event_binding->subclass_name = SWITCH_EVENT_SUBCLASS_ANY;
639 	}
640 	event_binding->next = NULL;
641 
642 	bind_event_profile(event_binding, kazoo_globals.events->events);
643 
644 
645 	/* bind to the event with a unique ID and capture the event_node pointer */
646 	switch_uuid_str(event_binding->id, sizeof(event_binding->id));
647 	if (switch_event_bind_removable(event_binding->id, event_type, event_binding->subclass_name, event_handler, event_binding, &event_binding->node) != SWITCH_STATUS_SUCCESS) {
648 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to bind to event %s %s!\n"
649 						  ,switch_event_name(event_binding->type), event_binding->subclass_name ? event_binding->subclass_name : "");
650 		return SWITCH_STATUS_GENERR;
651 	}
652 
653 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Adding event binding %s to stream %p for %s <%d.%d.%d>: %s %s\n"
654 					  ,event_binding->id, (void *)event_stream, event_stream->pid.node, event_stream->pid.creation
655 					  ,event_stream->pid.num, event_stream->pid.serial, switch_event_name(event_binding->type)
656 					  ,event_binding->subclass_name ? event_binding->subclass_name : "");
657 
658 	/* add the new binding to the list */
659 	if (!event_stream->bindings) {
660 		event_stream->bindings = event_binding;
661 	} else {
662 		event_binding->next = event_stream->bindings;
663 		event_stream->bindings = event_binding;
664 	}
665 
666 	return SWITCH_STATUS_SUCCESS;
667 }
668 
remove_event_binding(ei_event_stream_t * event_stream,const switch_event_types_t event_type,const char * subclass_name)669 switch_status_t remove_event_binding(ei_event_stream_t *event_stream, const switch_event_types_t event_type, const char *subclass_name) {
670 	ei_event_binding_t *event_binding = event_stream->bindings, *prev = NULL;
671 	int found = 0;
672 
673 	/* if there are no bindings then there is nothing to do */
674 	if (!event_binding) {
675 		return SWITCH_STATUS_SUCCESS;
676 	}
677 
678 	/* try to find the event binding specified */
679 	while(event_binding != NULL) {
680 		if (event_binding->type == SWITCH_EVENT_CUSTOM
681 			&& subclass_name
682 			&& event_binding->subclass_name
683 			&& !strcmp(subclass_name, event_binding->subclass_name)) {
684 			found = 1;
685 			break;
686 		} else if (event_binding->type == event_type) {
687 			found = 1;
688 			break;
689 		}
690 
691 		prev = event_binding;
692 		event_binding = event_binding->next;
693 	}
694 
695 	if (found) {
696 		/* if the event binding exists, unbind from the system */
697 		switch_event_unbind(&event_binding->node);
698 
699 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Removing event binding %s from %p for %s <%d.%d.%d>: %s %s\n"
700 						  ,event_binding->id, (void *)event_stream, event_stream->pid.node, event_stream->pid.creation
701 						  ,event_stream->pid.num, event_stream->pid.serial, switch_event_name(event_binding->type)
702 						  ,event_binding->subclass_name ? event_binding->subclass_name : "");
703 
704 		/* remove the event binding from the list */
705 		if (!prev) {
706 			event_stream->bindings = event_binding->next;
707 		} else {
708 			prev->next = event_binding->next;
709 		}
710 	}
711 
712 	return SWITCH_STATUS_SUCCESS;
713 }
714 
remove_event_bindings(ei_event_stream_t * event_stream)715 switch_status_t remove_event_bindings(ei_event_stream_t *event_stream) {
716 	ei_event_binding_t *event_binding = event_stream->bindings;
717 
718 	/* if there are no bindings then there is nothing to do */
719 	if (!event_binding) {
720 		return SWITCH_STATUS_SUCCESS;
721 	}
722 
723 	/* try to find the event binding specified */
724 	while(event_binding != NULL) {
725 		/* if the event binding exists, unbind from the system */
726 		switch_event_unbind(&event_binding->node);
727 
728 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Removing event binding %s from %p for %s <%d.%d.%d>: %s %s\n"
729 						  ,event_binding->id, (void *)event_stream, event_stream->pid.node, event_stream->pid.creation
730 						  ,event_stream->pid.num, event_stream->pid.serial, switch_event_name(event_binding->type)
731 						  ,event_binding->subclass_name ? event_binding->subclass_name : "");
732 
733 		event_binding = event_binding->next;
734 	}
735 
736 	event_stream->bindings = NULL;
737 
738 	return SWITCH_STATUS_SUCCESS;
739 }
740 
741 /* For Emacs:
742  * Local Variables:
743  * mode:c
744  * indent-tabs-mode:t
745  * tab-width:4
746  * c-basic-offset:4
747  * End:
748  * For VIM:
749  * vim:set softtabstop=4 shiftwidth=4 tabstop=4:
750  */
751