1 /*
2 * mod_rayo for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3 * Copyright (C) 2014, Grasshopper
4 *
5 * Version: MPL 1.1
6 *
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
11 *
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
15 * License.
16 *
17 * The Original Code is mod_rayo for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18 *
19 * The Initial Developer of the Original Code is Grasshopper
20 * Portions created by the Initial Developer are Copyright (C)
21 * the Initial Developer. All Rights Reserved.
22 *
23 * Contributor(s):
24 * Chris Rienzo <chris.rienzo@grasshopper.com>
25 *
26 * rayo_cpa_component.c -- input component "cpa" mode implementation
27 */
28 #include <switch.h>
29
30 #include "rayo_cpa_component.h"
31 #include "mod_rayo.h"
32 #include "rayo_components.h"
33 #include "rayo_cpa_detector.h"
34
35 /**
36 * Module globals
37 */
38 static struct {
39 /** signal subscribers */
40 switch_hash_t *subscribers;
41 /** synchronizes access to subscribers */
42 switch_mutex_t *subscribers_mutex;
43 /** module pool */
44 switch_memory_pool_t *pool;
45 } globals;
46
47 /**
48 * A CPA signal monitored by this component
49 */
50 struct cpa_signal {
51 /** name of this signal */
52 const char *name;
53 /** true if signal causes component termination */
54 int terminate;
55 };
56
57 /**
58 * CPA component state
59 */
60 struct cpa_component {
61 /** component base class */
62 struct rayo_component base;
63 /** true if ready to forward detector events */
64 int ready;
65 /** signals this component wants */
66 switch_hash_t *signals;
67 };
68
69 #define CPA_COMPONENT(x) ((struct cpa_component *)x)
70
71 typedef void (* subscriber_execute_fn)(const char *jid, void *user_data);
72
73 /**
74 * Request signals
75 */
subscribe(const char * uuid,const char * signal_type,const char * jid)76 static void subscribe(const char *uuid, const char *signal_type, const char *jid)
77 {
78 char *key = switch_mprintf("%s:%s", uuid, signal_type);
79 switch_mutex_lock(globals.subscribers_mutex);
80 {
81 switch_hash_t *signal_subscribers = switch_core_hash_find(globals.subscribers, key);
82 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(uuid), SWITCH_LOG_DEBUG, "Subscribe %s => %s\n", signal_type, jid);
83 if (!signal_subscribers) {
84 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(uuid), SWITCH_LOG_DEBUG, "Create %s subscriber hash\n", signal_type);
85 switch_core_hash_init(&signal_subscribers);
86 switch_core_hash_insert(globals.subscribers, key, signal_subscribers);
87 }
88 switch_core_hash_insert(signal_subscribers, jid, "1");
89 }
90 switch_mutex_unlock(globals.subscribers_mutex);
91 switch_safe_free(key);
92 }
93
94 /**
95 * Stop receiving signals
96 */
unsubscribe(const char * uuid,const char * signal_type,const char * jid)97 static void unsubscribe(const char *uuid, const char *signal_type, const char *jid)
98 {
99 char *key = switch_mprintf("%s:%s", uuid, signal_type);
100 switch_mutex_lock(globals.subscribers_mutex);
101 {
102 switch_hash_t *signal_subscribers = switch_core_hash_find(globals.subscribers, key);
103 if (signal_subscribers) {
104 switch_core_hash_delete(signal_subscribers, jid);
105 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(uuid), SWITCH_LOG_DEBUG, "Unsubscribe %s => %s\n", signal_type, jid);
106
107 /* clean up hash if empty */
108 if (switch_core_hash_empty(signal_subscribers)) {
109 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(uuid), SWITCH_LOG_DEBUG, "Destroy %s subscriber hash\n", signal_type);
110 switch_core_hash_destroy(&signal_subscribers);
111 switch_core_hash_delete(globals.subscribers, key);
112 }
113 }
114 }
115 switch_mutex_unlock(globals.subscribers_mutex);
116 switch_safe_free(key);
117 }
118
119 /**
120 * Execute function for each subscriber
121 */
subscriber_execute(const char * uuid,const char * signal_type,subscriber_execute_fn callback,void * user_data)122 static void subscriber_execute(const char *uuid, const char *signal_type, subscriber_execute_fn callback, void *user_data)
123 {
124 switch_event_t *subscriber_list = NULL;
125 switch_event_header_t *subscriber = NULL;
126
127 /* fetch list of subscribers */
128 char *key = switch_mprintf("%s:%s", uuid, signal_type);
129 switch_event_create_subclass(&subscriber_list, SWITCH_EVENT_CLONE, NULL);
130 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(uuid), SWITCH_LOG_DEBUG, "Subscriber execute %s\n", signal_type);
131 switch_mutex_lock(globals.subscribers_mutex);
132 {
133 switch_hash_index_t *hi = NULL;
134 switch_hash_t *signal_subscribers = switch_core_hash_find(globals.subscribers, key);
135 if (signal_subscribers) {
136 for (hi = switch_core_hash_first(signal_subscribers); hi; hi = switch_core_hash_next(&hi)) {
137 const void *jid;
138 void *dont_care;
139 switch_core_hash_this(hi, &jid, NULL, &dont_care);
140 switch_event_add_header_string(subscriber_list, SWITCH_STACK_BOTTOM, "execute", (const char *)jid);
141 }
142 } else {
143 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(uuid), SWITCH_LOG_DEBUG, "No subscribers for %s\n", signal_type);
144 }
145 }
146 switch_mutex_unlock(globals.subscribers_mutex);
147 switch_safe_free(key);
148
149 /* execute function for each subscriber */
150 for (subscriber = subscriber_list->headers; subscriber; subscriber = subscriber->next) {
151 callback(subscriber->value, user_data);
152 }
153
154 switch_event_destroy(&subscriber_list);
155 }
156
157 /**
158 * Stop all CPA detectors
159 */
stop_cpa_detectors(struct cpa_component * cpa)160 static void stop_cpa_detectors(struct cpa_component *cpa)
161 {
162 if (cpa->signals) {
163 switch_hash_index_t *hi = NULL;
164 for (hi = switch_core_hash_first(cpa->signals); hi; hi = switch_core_hash_next(&hi)) {
165 const void *signal_type;
166 void *cpa_signal = NULL;
167 switch_core_hash_this(hi, &signal_type, NULL, &cpa_signal);
168 if (cpa_signal) {
169 rayo_cpa_detector_stop(RAYO_ACTOR(cpa)->parent->id, ((struct cpa_signal *)cpa_signal)->name);
170 unsubscribe(RAYO_ACTOR(cpa)->parent->id, ((struct cpa_signal *)cpa_signal)->name, RAYO_JID(cpa));
171 }
172 }
173 switch_core_hash_destroy(&cpa->signals);
174 cpa->signals = NULL;
175 }
176 unsubscribe(RAYO_ACTOR(cpa)->parent->id, "hangup", RAYO_JID(cpa));
177 }
178
179 /**
180 * Stop execution of CPA component
181 */
stop_cpa_component(struct rayo_actor * component,struct rayo_message * msg,void * data)182 static iks *stop_cpa_component(struct rayo_actor *component, struct rayo_message *msg, void *data)
183 {
184 stop_cpa_detectors(CPA_COMPONENT(component));
185 rayo_component_send_complete(RAYO_COMPONENT(component), COMPONENT_COMPLETE_STOP);
186 return iks_new_iq_result(msg->payload);
187 }
188
189 /**
190 * Forward CPA signal to client
191 */
rayo_cpa_detector_event(const char * jid,void * user_data)192 static void rayo_cpa_detector_event(const char *jid, void *user_data)
193 {
194 struct rayo_actor *component = RAYO_LOCATE(jid);
195 if (component) {
196 if (CPA_COMPONENT(component)->ready) {
197 switch_event_t *event = (switch_event_t *)user_data;
198 const char *signal_type = switch_event_get_header(event, "signal-type");
199 struct cpa_signal *cpa_signal = switch_core_hash_find(CPA_COMPONENT(component)->signals, signal_type);
200 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(component->parent->id), SWITCH_LOG_DEBUG, "Handling CPA event\n");
201 if (cpa_signal) {
202 const char *value = switch_event_get_header(event, "value");
203 const char *duration = switch_event_get_header(event, "duration");
204 if (cpa_signal->terminate) {
205 iks *complete_event;
206 iks *signal_xml;
207
208 stop_cpa_detectors(CPA_COMPONENT(component));
209
210 /* send complete event to client */
211 complete_event = rayo_component_create_complete_event(RAYO_COMPONENT(component), "signal", RAYO_CPA_NS);
212 signal_xml = iks_find(complete_event, "complete");
213 signal_xml = iks_find(signal_xml, "signal");
214 iks_insert_attrib(signal_xml, "type", signal_type);
215 if (!zstr(value)) {
216 iks_insert_attrib(signal_xml, "value", value);
217 }
218 if (!zstr(duration)) {
219 iks_insert_attrib(signal_xml, "duration", duration);
220 }
221 rayo_component_send_complete_event(RAYO_COMPONENT(component), complete_event);
222 } else {
223 /* send event to client */
224 iks *signal_event = iks_new_presence("signal", RAYO_CPA_NS, RAYO_JID(component), RAYO_COMPONENT(component)->client_jid);
225 iks *signal_xml = iks_find(signal_event, "signal");
226 iks_insert_attrib(signal_xml, "type", signal_type);
227 if (!zstr(value)) {
228 iks_insert_attrib(signal_xml, "value", value);
229 }
230 if (!zstr(duration)) {
231 iks_insert_attrib(signal_xml, "duration", duration);
232 }
233 RAYO_SEND_REPLY(component, RAYO_COMPONENT(component)->client_jid, signal_event);
234 }
235 }
236 } else {
237 switch_log_printf(SWITCH_CHANNEL_UUID_LOG(component->parent->id), SWITCH_LOG_DEBUG, "Skipping CPA event\n");
238 }
239 RAYO_RELEASE(component);
240 }
241 }
242
243 /**
244 * Handle CPA signal-type event
245 */
on_rayo_cpa_detector_event(switch_event_t * event)246 static void on_rayo_cpa_detector_event(switch_event_t *event)
247 {
248 subscriber_execute(switch_event_get_header(event, "Unique-ID"), switch_event_get_header(event, "signal-type"), rayo_cpa_detector_event, event);
249 }
250
251 /**
252 * Handle CPA completion because of hangup
253 */
rayo_cpa_component_hangup(const char * jid,void * user_data)254 static void rayo_cpa_component_hangup(const char *jid, void *user_data)
255 {
256 struct rayo_actor *component = RAYO_LOCATE(jid);
257 if (component) {
258 stop_cpa_detectors(CPA_COMPONENT(component));
259 rayo_component_send_complete(RAYO_COMPONENT(component), COMPONENT_COMPLETE_HANGUP);
260 RAYO_RELEASE(component);
261 }
262 }
263
264 /**
265 * Handle hungup call event
266 */
on_channel_hangup_complete_event(switch_event_t * event)267 static void on_channel_hangup_complete_event(switch_event_t *event)
268 {
269 subscriber_execute(switch_event_get_header(event, "Unique-ID"), "hangup", rayo_cpa_component_hangup, event);
270 }
271
272 /**
273 * Start CPA
274 */
rayo_cpa_component_start(struct rayo_actor * call,struct rayo_message * msg,void * session_data)275 iks *rayo_cpa_component_start(struct rayo_actor *call, struct rayo_message *msg, void *session_data)
276 {
277 iks *iq = msg->payload;
278 switch_core_session_t *session = (switch_core_session_t *)session_data;
279 iks *input = iks_find(iq, "input");
280 switch_memory_pool_t *pool = NULL;
281 struct cpa_component *component = NULL;
282 int have_grammar = 0;
283 iks *grammar = NULL;
284
285 /* create CPA component */
286 switch_core_new_memory_pool(&pool);
287 component = switch_core_alloc(pool, sizeof(*component));
288 component = CPA_COMPONENT(rayo_component_init((struct rayo_component *)component, pool, RAT_CALL_COMPONENT, "cpa", NULL, call, iks_find_attrib(iq, "from")));
289 if (!component) {
290 switch_core_destroy_memory_pool(&pool);
291 return iks_new_error_detailed(iq, STANZA_ERROR_INTERNAL_SERVER_ERROR, "Failed to create CPA entity");
292 }
293
294 switch_core_hash_init(&component->signals);
295
296 /* start CPA detectors */
297 for (grammar = iks_find(input, "grammar"); grammar; grammar = iks_next_tag(grammar)) {
298 if (!strcmp("grammar", iks_name(grammar))) {
299 const char *error_str = "";
300 const char *url = iks_find_attrib_soft(grammar, "url");
301 char *url_dup;
302 char *url_params;
303
304 if (zstr(url)) {
305 stop_cpa_detectors(component);
306 RAYO_RELEASE(component);
307 RAYO_DESTROY(component);
308 return iks_new_error_detailed(iq, STANZA_ERROR_BAD_REQUEST, "Missing grammar URL");
309 }
310 have_grammar = 1;
311
312 url_dup = strdup(url);
313 switch_assert(url_dup);
314 if ((url_params = strchr(url_dup, '?'))) {
315 *url_params = '\0';
316 url_params++;
317 }
318
319 if (switch_core_hash_find(component->signals, url)) {
320 free(url_dup);
321 stop_cpa_detectors(component);
322 RAYO_RELEASE(component);
323 RAYO_DESTROY(component);
324 return iks_new_error_detailed(iq, STANZA_ERROR_BAD_REQUEST, "Duplicate URL");
325 }
326
327 /* start detector */
328 /* TODO return better reasons... */
329 if (rayo_cpa_detector_start(switch_core_session_get_uuid(session), url_dup, &error_str)) {
330 struct cpa_signal *cpa_signal = switch_core_alloc(pool, sizeof(*cpa_signal));
331 cpa_signal->terminate = !zstr(url_params) && strstr(url_params, "terminate=true");
332 cpa_signal->name = switch_core_strdup(pool, url_dup);
333 switch_core_hash_insert(component->signals, cpa_signal->name, cpa_signal);
334 subscribe(switch_core_session_get_uuid(session), cpa_signal->name, RAYO_JID(component));
335 } else {
336 free(url_dup);
337 stop_cpa_detectors(component);
338 RAYO_RELEASE(component);
339 RAYO_DESTROY(component);
340 return iks_new_error_detailed(iq, STANZA_ERROR_INTERNAL_SERVER_ERROR, error_str);
341 }
342
343 free(url_dup);
344 }
345 }
346
347 if (!have_grammar) {
348 stop_cpa_detectors(component);
349 RAYO_RELEASE(component);
350 RAYO_DESTROY(component);
351 return iks_new_error_detailed(iq, STANZA_ERROR_BAD_REQUEST, "No grammar defined");
352 }
353
354 /* acknowledge command */
355 rayo_component_send_start(RAYO_COMPONENT(component), iq);
356
357 /* TODO hangup race condition */
358 subscribe(switch_core_session_get_uuid(session), "hangup", RAYO_JID(component));
359
360 /* ready to forward detector events */
361 component->ready = 1;
362
363 return NULL;
364 }
365
366 /**
367 * Load input CPA
368 * @param module_interface
369 * @param pool memory pool
370 * @param config_file
371 * @return SWITCH_STATUS_SUCCESS if successfully loaded
372 */
rayo_cpa_component_load(switch_loadable_module_interface_t ** module_interface,switch_memory_pool_t * pool,const char * config_file)373 switch_status_t rayo_cpa_component_load(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool, const char *config_file)
374 {
375 rayo_actor_command_handler_add(RAT_CALL_COMPONENT, "cpa", "set:"RAYO_EXT_NS":stop", stop_cpa_component);
376 switch_event_bind("rayo_cpa_component", SWITCH_EVENT_CUSTOM, "rayo::cpa", on_rayo_cpa_detector_event, NULL);
377 switch_event_bind("rayo_cpa_component", SWITCH_EVENT_CHANNEL_HANGUP_COMPLETE, NULL, on_channel_hangup_complete_event, NULL);
378
379 globals.pool = pool;
380 switch_core_hash_init(&globals.subscribers);
381 switch_mutex_init(&globals.subscribers_mutex, SWITCH_MUTEX_NESTED, pool);
382
383 return rayo_cpa_detector_load(module_interface, pool, config_file);
384 }
385
386 /**
387 * Stop input CPA
388 */
rayo_cpa_component_shutdown(void)389 void rayo_cpa_component_shutdown(void)
390 {
391 switch_event_unbind_callback(on_rayo_cpa_detector_event);
392 switch_event_unbind_callback(on_channel_hangup_complete_event);
393 rayo_cpa_detector_shutdown();
394 if (globals.subscribers) {
395 switch_core_hash_destroy(&globals.subscribers);
396 }
397 }
398
399 /* For Emacs:
400 * Local Variables:
401 * mode:c
402 * indent-tabs-mode:t
403 * tab-width:4
404 * c-basic-offset:4
405 * End:
406 * For VIM:
407 * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet
408 */
409