1 /*
2  * mod_rayo for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3  * Copyright (C) 2014, Grasshopper
4  *
5  * Version: MPL 1.1
6  *
7  * The contents of this file are subject to the Mozilla Public License Version
8  * 1.1 (the "License"); you may not use this file except in compliance with
9  * the License. You may obtain a copy of the License at
10  * http://www.mozilla.org/MPL/
11  *
12  * Software distributed under the License is distributed on an "AS IS" basis,
13  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14  * for the specific language governing rights and limitations under the
15  * License.
16  *
17  * The Original Code is mod_rayo for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18  *
19  * The Initial Developer of the Original Code is Grasshopper
20  * Portions created by the Initial Developer are Copyright (C)
21  * the Initial Developer. All Rights Reserved.
22  *
23  * Contributor(s):
24  * Chris Rienzo <chris.rienzo@grasshopper.com>
25  *
26  * rayo_cpa_component.c -- input component "cpa" mode implementation
27  */
28 #include <switch.h>
29 
30 #include "rayo_cpa_component.h"
31 #include "mod_rayo.h"
32 #include "rayo_components.h"
33 #include "rayo_cpa_detector.h"
34 
/**
 * Module globals
 *
 * Shared state for the CPA component: the subscriber registry, the mutex
 * that guards it, and the pool passed to rayo_cpa_component_load().
 */
static struct {
	/** signal subscribers: maps "<uuid>:<signal type>" to a hash keyed by subscriber JID */
	switch_hash_t *subscribers;
	/** synchronizes access to subscribers */
	switch_mutex_t *subscribers_mutex;
	/** module pool (the pool handed to rayo_cpa_component_load()) */
	switch_memory_pool_t *pool;
} globals;
46 
/**
 * A CPA signal monitored by this component
 *
 * Allocated from the component pool by rayo_cpa_component_start() and stored
 * in cpa_component.signals under its name.
 */
struct cpa_signal {
	/** name of this signal (the grammar URL with any "?..." parameters removed) */
	const char *name;
	/** true if signal causes component termination (URL parameters contain "terminate=true") */
	int terminate;
};
56 
/**
 * CPA component state
 */
struct cpa_component {
	/** component base class - kept as the first member because the code casts between the two types */
	struct rayo_component base;
	/** true if ready to forward detector events (set at the end of rayo_cpa_component_start()) */
	int ready;
	/** signals this component wants: struct cpa_signal keyed by signal name; NULL once detectors are stopped */
	switch_hash_t *signals;
};
68 
/** Cast a pointer to the CPA component type. The argument is parenthesized so expressions are cast as a whole. */
#define CPA_COMPONENT(x) ((struct cpa_component *)(x))
70 
/**
 * Callback invoked by subscriber_execute() once per subscriber
 * @param jid the subscriber JID
 * @param user_data opaque pointer passed through unchanged from subscriber_execute()
 */
typedef void (* subscriber_execute_fn)(const char *jid, void *user_data);
72 
73 /**
74  * Request signals
75  */
subscribe(const char * uuid,const char * signal_type,const char * jid)76 static void subscribe(const char *uuid, const char *signal_type, const char *jid)
77 {
78 	char *key = switch_mprintf("%s:%s", uuid, signal_type);
79 	switch_mutex_lock(globals.subscribers_mutex);
80 	{
81 		switch_hash_t *signal_subscribers = switch_core_hash_find(globals.subscribers, key);
82 		switch_log_printf(SWITCH_CHANNEL_UUID_LOG(uuid), SWITCH_LOG_DEBUG, "Subscribe %s => %s\n", signal_type, jid);
83 		if (!signal_subscribers) {
84 			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(uuid), SWITCH_LOG_DEBUG, "Create %s subscriber hash\n", signal_type);
85 			switch_core_hash_init(&signal_subscribers);
86 			switch_core_hash_insert(globals.subscribers, key, signal_subscribers);
87 		}
88 		switch_core_hash_insert(signal_subscribers, jid, "1");
89 	}
90 	switch_mutex_unlock(globals.subscribers_mutex);
91 	switch_safe_free(key);
92 }
93 
94 /**
95  * Stop receiving signals
96  */
unsubscribe(const char * uuid,const char * signal_type,const char * jid)97 static void unsubscribe(const char *uuid, const char *signal_type, const char *jid)
98 {
99 	char *key = switch_mprintf("%s:%s", uuid, signal_type);
100 	switch_mutex_lock(globals.subscribers_mutex);
101 	{
102 		switch_hash_t *signal_subscribers = switch_core_hash_find(globals.subscribers, key);
103 		if (signal_subscribers) {
104 			switch_core_hash_delete(signal_subscribers, jid);
105 			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(uuid), SWITCH_LOG_DEBUG, "Unsubscribe %s => %s\n", signal_type, jid);
106 
107 			/* clean up hash if empty */
108 			if (switch_core_hash_empty(signal_subscribers)) {
109 				switch_log_printf(SWITCH_CHANNEL_UUID_LOG(uuid), SWITCH_LOG_DEBUG, "Destroy %s subscriber hash\n", signal_type);
110 				switch_core_hash_destroy(&signal_subscribers);
111 				switch_core_hash_delete(globals.subscribers, key);
112 			}
113 		}
114 	}
115 	switch_mutex_unlock(globals.subscribers_mutex);
116 	switch_safe_free(key);
117 }
118 
119 /**
120  * Execute function for each subscriber
121  */
subscriber_execute(const char * uuid,const char * signal_type,subscriber_execute_fn callback,void * user_data)122 static void subscriber_execute(const char *uuid, const char *signal_type, subscriber_execute_fn callback, void *user_data)
123 {
124 	switch_event_t *subscriber_list = NULL;
125 	switch_event_header_t *subscriber = NULL;
126 
127 	/* fetch list of subscribers */
128 	char *key = switch_mprintf("%s:%s", uuid, signal_type);
129 	switch_event_create_subclass(&subscriber_list, SWITCH_EVENT_CLONE, NULL);
130 	switch_log_printf(SWITCH_CHANNEL_UUID_LOG(uuid), SWITCH_LOG_DEBUG, "Subscriber execute %s\n", signal_type);
131 	switch_mutex_lock(globals.subscribers_mutex);
132 	{
133 		switch_hash_index_t *hi = NULL;
134 		switch_hash_t *signal_subscribers = switch_core_hash_find(globals.subscribers, key);
135 		if (signal_subscribers) {
136 			for (hi = switch_core_hash_first(signal_subscribers); hi; hi = switch_core_hash_next(&hi)) {
137 				const void *jid;
138 				void *dont_care;
139 				switch_core_hash_this(hi, &jid, NULL, &dont_care);
140 				switch_event_add_header_string(subscriber_list, SWITCH_STACK_BOTTOM, "execute", (const char *)jid);
141 			}
142 		} else {
143 			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(uuid), SWITCH_LOG_DEBUG, "No subscribers for %s\n", signal_type);
144 		}
145 	}
146 	switch_mutex_unlock(globals.subscribers_mutex);
147 	switch_safe_free(key);
148 
149 	/* execute function for each subscriber */
150 	for (subscriber = subscriber_list->headers; subscriber; subscriber = subscriber->next) {
151 		callback(subscriber->value, user_data);
152 	}
153 
154 	switch_event_destroy(&subscriber_list);
155 }
156 
157 /**
158  * Stop all CPA detectors
159  */
stop_cpa_detectors(struct cpa_component * cpa)160 static void stop_cpa_detectors(struct cpa_component *cpa)
161 {
162 	if (cpa->signals) {
163 		switch_hash_index_t *hi = NULL;
164 		for (hi = switch_core_hash_first(cpa->signals); hi; hi = switch_core_hash_next(&hi)) {
165 			const void *signal_type;
166 			void *cpa_signal = NULL;
167 			switch_core_hash_this(hi, &signal_type, NULL, &cpa_signal);
168 			if (cpa_signal) {
169 				rayo_cpa_detector_stop(RAYO_ACTOR(cpa)->parent->id, ((struct cpa_signal *)cpa_signal)->name);
170 				unsubscribe(RAYO_ACTOR(cpa)->parent->id, ((struct cpa_signal *)cpa_signal)->name, RAYO_JID(cpa));
171 			}
172 		}
173 		switch_core_hash_destroy(&cpa->signals);
174 		cpa->signals = NULL;
175 	}
176 	unsubscribe(RAYO_ACTOR(cpa)->parent->id, "hangup", RAYO_JID(cpa));
177 }
178 
179 /**
180  * Stop execution of CPA component
181  */
stop_cpa_component(struct rayo_actor * component,struct rayo_message * msg,void * data)182 static iks *stop_cpa_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
183 {
184 	stop_cpa_detectors(CPA_COMPONENT(component));
185 	rayo_component_send_complete(RAYO_COMPONENT(component), COMPONENT_COMPLETE_STOP);
186 	return iks_new_iq_result(msg->payload);
187 }
188 
189 /**
190  * Forward CPA signal to client
191  */
rayo_cpa_detector_event(const char * jid,void * user_data)192 static void rayo_cpa_detector_event(const char *jid, void *user_data)
193 {
194 	struct rayo_actor *component = RAYO_LOCATE(jid);
195 	if (component) {
196 		if (CPA_COMPONENT(component)->ready) {
197 			switch_event_t *event = (switch_event_t *)user_data;
198 			const char *signal_type = switch_event_get_header(event, "signal-type");
199 			struct cpa_signal *cpa_signal = switch_core_hash_find(CPA_COMPONENT(component)->signals, signal_type);
200 			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(component->parent->id), SWITCH_LOG_DEBUG, "Handling CPA event\n");
201 			if (cpa_signal) {
202 				const char *value = switch_event_get_header(event, "value");
203 				const char *duration = switch_event_get_header(event, "duration");
204 				if (cpa_signal->terminate) {
205 					iks *complete_event;
206 					iks *signal_xml;
207 
208 					stop_cpa_detectors(CPA_COMPONENT(component));
209 
210 					/* send complete event to client */
211 					complete_event = rayo_component_create_complete_event(RAYO_COMPONENT(component), "signal", RAYO_CPA_NS);
212 					signal_xml = iks_find(complete_event, "complete");
213 					signal_xml = iks_find(signal_xml, "signal");
214 					iks_insert_attrib(signal_xml, "type", signal_type);
215 					if (!zstr(value)) {
216 						iks_insert_attrib(signal_xml, "value", value);
217 					}
218 					if (!zstr(duration)) {
219 						iks_insert_attrib(signal_xml, "duration", duration);
220 					}
221 					rayo_component_send_complete_event(RAYO_COMPONENT(component), complete_event);
222 				} else {
223 					/* send event to client */
224 					iks *signal_event = iks_new_presence("signal", RAYO_CPA_NS, RAYO_JID(component), RAYO_COMPONENT(component)->client_jid);
225 					iks *signal_xml = iks_find(signal_event, "signal");
226 					iks_insert_attrib(signal_xml, "type", signal_type);
227 					if (!zstr(value)) {
228 						iks_insert_attrib(signal_xml, "value", value);
229 					}
230 					if (!zstr(duration)) {
231 						iks_insert_attrib(signal_xml, "duration", duration);
232 					}
233 					RAYO_SEND_REPLY(component, RAYO_COMPONENT(component)->client_jid, signal_event);
234 				}
235 			}
236 		} else {
237 			switch_log_printf(SWITCH_CHANNEL_UUID_LOG(component->parent->id), SWITCH_LOG_DEBUG, "Skipping CPA event\n");
238 		}
239 		RAYO_RELEASE(component);
240 	}
241 }
242 
243 /**
244  * Handle CPA signal-type event
245  */
on_rayo_cpa_detector_event(switch_event_t * event)246 static void on_rayo_cpa_detector_event(switch_event_t *event)
247 {
248 	subscriber_execute(switch_event_get_header(event, "Unique-ID"), switch_event_get_header(event, "signal-type"), rayo_cpa_detector_event, event);
249 }
250 
251 /**
252  * Handle CPA completion because of hangup
253  */
rayo_cpa_component_hangup(const char * jid,void * user_data)254 static void rayo_cpa_component_hangup(const char *jid, void *user_data)
255 {
256 	struct rayo_actor *component = RAYO_LOCATE(jid);
257 	if (component) {
258 		stop_cpa_detectors(CPA_COMPONENT(component));
259 		rayo_component_send_complete(RAYO_COMPONENT(component), COMPONENT_COMPLETE_HANGUP);
260 		RAYO_RELEASE(component);
261 	}
262 }
263 
264 /**
265  * Handle hungup call event
266  */
on_channel_hangup_complete_event(switch_event_t * event)267 static void on_channel_hangup_complete_event(switch_event_t *event)
268 {
269 	subscriber_execute(switch_event_get_header(event, "Unique-ID"), "hangup", rayo_cpa_component_hangup, event);
270 }
271 
272 /**
273  * Start CPA
274  */
rayo_cpa_component_start(struct rayo_actor * call,struct rayo_message * msg,void * session_data)275 iks *rayo_cpa_component_start(struct rayo_actor *call, struct rayo_message *msg, void *session_data)
276 {
277 	iks *iq = msg->payload;
278 	switch_core_session_t *session = (switch_core_session_t *)session_data;
279 	iks *input = iks_find(iq, "input");
280 	switch_memory_pool_t *pool = NULL;
281 	struct cpa_component *component = NULL;
282 	int have_grammar = 0;
283 	iks *grammar = NULL;
284 
285 	/* create CPA component */
286 	switch_core_new_memory_pool(&pool);
287 	component = switch_core_alloc(pool, sizeof(*component));
288 	component = CPA_COMPONENT(rayo_component_init((struct rayo_component *)component, pool, RAT_CALL_COMPONENT, "cpa", NULL, call, iks_find_attrib(iq, "from")));
289 	if (!component) {
290 		switch_core_destroy_memory_pool(&pool);
291 		return iks_new_error_detailed(iq, STANZA_ERROR_INTERNAL_SERVER_ERROR, "Failed to create CPA entity");
292 	}
293 
294 	switch_core_hash_init(&component->signals);
295 
296 	/* start CPA detectors */
297 	for (grammar = iks_find(input, "grammar"); grammar; grammar = iks_next_tag(grammar)) {
298 		if (!strcmp("grammar", iks_name(grammar))) {
299 			const char *error_str = "";
300 			const char *url = iks_find_attrib_soft(grammar, "url");
301 			char *url_dup;
302 			char *url_params;
303 
304 			if (zstr(url)) {
305 				stop_cpa_detectors(component);
306 				RAYO_RELEASE(component);
307 				RAYO_DESTROY(component);
308 				return iks_new_error_detailed(iq, STANZA_ERROR_BAD_REQUEST, "Missing grammar URL");
309 			}
310 			have_grammar = 1;
311 
312 			url_dup = strdup(url);
313 			switch_assert(url_dup);
314 			if ((url_params = strchr(url_dup, '?'))) {
315 				*url_params = '\0';
316 				url_params++;
317 			}
318 
319 			if (switch_core_hash_find(component->signals, url)) {
320 				free(url_dup);
321 				stop_cpa_detectors(component);
322 				RAYO_RELEASE(component);
323 				RAYO_DESTROY(component);
324 				return iks_new_error_detailed(iq, STANZA_ERROR_BAD_REQUEST, "Duplicate URL");
325 			}
326 
327 			/* start detector */
328 			/* TODO return better reasons... */
329 			if (rayo_cpa_detector_start(switch_core_session_get_uuid(session), url_dup, &error_str)) {
330 				struct cpa_signal *cpa_signal = switch_core_alloc(pool, sizeof(*cpa_signal));
331 				cpa_signal->terminate = !zstr(url_params) && strstr(url_params, "terminate=true");
332 				cpa_signal->name = switch_core_strdup(pool, url_dup);
333 				switch_core_hash_insert(component->signals, cpa_signal->name, cpa_signal);
334 				subscribe(switch_core_session_get_uuid(session), cpa_signal->name, RAYO_JID(component));
335 			} else {
336 				free(url_dup);
337 				stop_cpa_detectors(component);
338 				RAYO_RELEASE(component);
339 				RAYO_DESTROY(component);
340 				return iks_new_error_detailed(iq, STANZA_ERROR_INTERNAL_SERVER_ERROR, error_str);
341 			}
342 
343 			free(url_dup);
344 		}
345 	}
346 
347 	if (!have_grammar) {
348 		stop_cpa_detectors(component);
349 		RAYO_RELEASE(component);
350 		RAYO_DESTROY(component);
351 		return iks_new_error_detailed(iq, STANZA_ERROR_BAD_REQUEST, "No grammar defined");
352 	}
353 
354 	/* acknowledge command */
355 	rayo_component_send_start(RAYO_COMPONENT(component), iq);
356 
357 	/* TODO hangup race condition */
358 	subscribe(switch_core_session_get_uuid(session), "hangup", RAYO_JID(component));
359 
360 	/* ready to forward detector events */
361 	component->ready = 1;
362 
363 	return NULL;
364 }
365 
366 /**
367  * Load input CPA
368  * @param module_interface
369  * @param pool memory pool
370  * @param config_file
371  * @return SWITCH_STATUS_SUCCESS if successfully loaded
372  */
rayo_cpa_component_load(switch_loadable_module_interface_t ** module_interface,switch_memory_pool_t * pool,const char * config_file)373 switch_status_t rayo_cpa_component_load(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool, const char *config_file)
374 {
375 	rayo_actor_command_handler_add(RAT_CALL_COMPONENT, "cpa", "set:"RAYO_EXT_NS":stop", stop_cpa_component);
376 	switch_event_bind("rayo_cpa_component", SWITCH_EVENT_CUSTOM, "rayo::cpa", on_rayo_cpa_detector_event, NULL);
377 	switch_event_bind("rayo_cpa_component", SWITCH_EVENT_CHANNEL_HANGUP_COMPLETE, NULL, on_channel_hangup_complete_event, NULL);
378 
379 	globals.pool = pool;
380 	switch_core_hash_init(&globals.subscribers);
381 	switch_mutex_init(&globals.subscribers_mutex, SWITCH_MUTEX_NESTED, pool);
382 
383 	return rayo_cpa_detector_load(module_interface, pool, config_file);
384 }
385 
386 /**
387  * Stop input CPA
388  */
rayo_cpa_component_shutdown(void)389 void rayo_cpa_component_shutdown(void)
390 {
391 	switch_event_unbind_callback(on_rayo_cpa_detector_event);
392 	switch_event_unbind_callback(on_channel_hangup_complete_event);
393 	rayo_cpa_detector_shutdown();
394 	if (globals.subscribers) {
395 		switch_core_hash_destroy(&globals.subscribers);
396 	}
397 }
398 
399 /* For Emacs:
400  * Local Variables:
401  * mode:c
402  * indent-tabs-mode:t
403  * tab-width:4
404  * c-basic-offset:4
405  * End:
406  * For VIM:
407  * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet
408  */
409