1 /*
2 * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3 * Copyright (C) 2005-2014, Anthony Minessale II <anthm@freeswitch.org>
4 *
5 * Version: MPL 1.1
6 *
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
11 *
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
15 * License.
16 *
17 * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18 *
19 * The Initial Developer of the Original Code is
20 * Anthony Minessale II <anthm@freeswitch.org>
21 * Portions created by the Initial Developer are Copyright (C)
22 * the Initial Developer. All Rights Reserved.
23 *
24 * Contributor(s):
25 *
26 * Anthony Minessale II <anthm@freeswitch.org>
27 * Travis Cross <tc@traviscross.com>
28 *
29 * mod_fifo.c -- FIFO
30 *
31 */
32 #include <switch.h>
33 #define FIFO_APP_KEY "mod_fifo"
34
35 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown);
36 SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load);
37 SWITCH_MODULE_DEFINITION(mod_fifo, mod_fifo_load, mod_fifo_shutdown, NULL);
38
39 /*!\file
40 * # Theory of operation
41 *
42 * ## Kinds of things
43 *
44 * The fifo systems deals in various kinds of things: /fifos nodes/,
45 * /queues/, /(inbound) callers/, /outbound callers/, /consumers/, and
46 * /(outbound) members/.
47 *
48 * /fifo nodes/ accept callers and work to deliver those callers to
49 * consumers and members. The nodes contain an array of /queues/
50 * indexed by a priority value.
51 *
52 * /queues/ contain an array of callers treated as a list.
53 *
54 * /callers/ are the channels placed into a fifo node's queue for
55 * eventual delivery to consumers and members.
56 *
57 * /outbound callers/ are persons waiting to be called back via a
58 * dial string.
59 *
60 * /consumers/ are channels for agents who have dialed in to one or
61 * more fifos and will have callers connected to them.
62 *
63 * /members/ are agents who we'll place calls to via a dial string to
64 * attempt to deliver callers.
65 *
66 * An /agent/ may refer to either a /consumer/ or an /member/.
67 *
68 * ## Outbound Strategies
69 *
70 * An outbound strategy defines the way in which we attempt to deliver
71 * callers to members.
72 *
73 * The default strategy, /ringall/, preserves the caller ID of the
74 * caller being delivered. Because this means we must choose a caller
75 * before we place the call to the member, this impacts the order in
76 * which calls are delivered and the rate at which we can deliver
77 * those calls.
78 *
79 * The /enterprise/ outbound strategy does not preserve the caller ID
80 * of the caller thereby allowing deliver of callers to agents at the
81 * fastest possible rate.
82 *
83 * outbound_per_cycle is used to define the maximum number of agents
84 * who will be available to answer a single caller. In ringall this
85 * maximum is the number who will be called, in enterprise the need defines
86 * how many agents will be called. outbound_per_cycle_min will define
87 * the minimum agents who will be called to answer a caller regardless of
88 * need, giving the enterprise strategy the ability to ring through more
89 * than one agent for one caller.
90
91 *
92 * ## Manual calls
93 *
94 * The fifo system provides a way to prevent members on non-fifo calls
95 * from receiving a call from the fifo system. We do this by tracking
96 * non-fifo calls in a special fifo named `manual_calls`. When
97 * creating a channel for an agent we set the channel variable
98 * `fifo_outbound_uuid` to an arbitrary unique value <= 32 characters
99 * for that agent, then call `fifo_track_call`. For the corresponding
100 * member we must also set `{fifo_outbound_uuid=}` to the same value.
101 * We expect the value of `fifo_outbound_uuid` to be the MD5 hash of
102 * the unique ID. Values longer than 32 characters will cause the
103 * mechanism to fail to work as expected.
104 *
105 * ## Importance
106 *
107 * Importance is a value 0-9 that can be associated with a fifo. The
108 * default value is 0. If the fifo is being dynamically created the
109 * importance of the fifo can be set when calling the `fifo`
110 * application. If the fifo already exists the importance value
111 * passed to the fifo application will be ignored.
112 */
113
114 #define MANUAL_QUEUE_NAME "manual_calls"
115 #define FIFO_EVENT "fifo::info"
116
117 static switch_status_t load_config(int reload, int del_all);
118 #define MAX_PRI 10
119
120 typedef enum {
121 NODE_STRATEGY_INVALID = -1,
122 NODE_STRATEGY_RINGALL = 0,
123 NODE_STRATEGY_ENTERPRISE
124 } outbound_strategy_t;
125
126 /*!\struct fifo_queue_t
127 * \brief Queue of callers
128 *
129 * Callers are placed into a queue as events in `data` which is an
130 * array of such events. The array size is hard-coded as 1000
131 * elements.
132 *
133 * Fifo nodes are composed of an array of these queues representing
134 * each priority level of the fifo.
135 */
136 typedef struct {
137 int nelm;
138 int idx;
139 switch_event_t **data;
140 switch_memory_pool_t *pool;
141 switch_mutex_t *mutex;
142 } fifo_queue_t;
143
144 typedef enum {
145 FIFO_APP_BRIDGE_TAG = (1 << 0),
146 FIFO_APP_TRACKING = (1 << 1),
147 FIFO_APP_DID_HOOK = (1 << 2)
148 } fifo_app_flag_t;
149
150 static int check_caller_outbound_call(const char *key);
151 static void add_caller_outbound_call(const char *key, switch_call_cause_t *cancel_cause);
152 static void del_caller_outbound_call(const char *key);
153 static void cancel_caller_outbound_call(const char *key, switch_call_cause_t cause);
154 static int check_consumer_outbound_call(const char *key);
155 static void add_consumer_outbound_call(const char *key, switch_call_cause_t *cancel_cause);
156 static void del_consumer_outbound_call(const char *key);
157 static void cancel_consumer_outbound_call(const char *key, switch_call_cause_t cause);
158
159 static int check_bridge_call(const char *key);
160 static void add_bridge_call(const char *key);
161 static void del_bridge_call(const char *key);
162
fifo_queue_create(fifo_queue_t ** queue,int size,switch_memory_pool_t * pool)163 switch_status_t fifo_queue_create(fifo_queue_t **queue, int size, switch_memory_pool_t *pool)
164 {
165 fifo_queue_t *q;
166
167 q = switch_core_alloc(pool, sizeof(*q));
168 q->pool = pool;
169 q->nelm = size - 1;
170 q->data = switch_core_alloc(pool, size * sizeof(switch_event_t *));
171 switch_mutex_init(&q->mutex, SWITCH_MUTEX_NESTED, pool);
172
173 *queue = q;
174
175 return SWITCH_STATUS_SUCCESS;
176 }
177
change_pos(switch_event_t * event,int pos)178 static void change_pos(switch_event_t *event, int pos)
179 {
180 const char *uuid = switch_event_get_header(event, "unique-id");
181 switch_core_session_t *session;
182 switch_channel_t *channel;
183 char tmp[30] = "";
184
185 if (zstr(uuid)) return;
186 if (!(session = switch_core_session_locate(uuid))) {
187 return;
188 }
189 channel = switch_core_session_get_channel(session);
190 switch_snprintf(tmp, sizeof(tmp), "%d", pos);
191 switch_channel_set_variable(channel, "fifo_position", tmp);
192 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "fifo_position", tmp);
193 switch_core_session_rwunlock(session);
194 }
195
fifo_queue_push(fifo_queue_t * queue,switch_event_t * ptr)196 static switch_status_t fifo_queue_push(fifo_queue_t *queue, switch_event_t *ptr)
197 {
198 switch_mutex_lock(queue->mutex);
199 if (queue->idx == queue->nelm) {
200 switch_mutex_unlock(queue->mutex);
201 return SWITCH_STATUS_FALSE;
202 }
203 queue->data[queue->idx++] = ptr;
204 switch_mutex_unlock(queue->mutex);
205 return SWITCH_STATUS_SUCCESS;
206 }
207
fifo_queue_size(fifo_queue_t * queue)208 static int fifo_queue_size(fifo_queue_t *queue)
209 {
210 int s;
211 switch_mutex_lock(queue->mutex);
212 s = queue->idx;
213 switch_mutex_unlock(queue->mutex);
214 return s;
215 }
216
217 /*!
218 * \param remove Whether to remove the popped event from the queue
219 * If remove is 0, do not remove the popped event. If it is 1,
220 * remove it if it is not an event for an outbound caller. If it is
221 * 2, always remove it.
222 */
fifo_queue_pop(fifo_queue_t * queue,switch_event_t ** pop,int remove)223 static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, int remove)
224 {
225 int i, j;
226
227 switch_mutex_lock(queue->mutex);
228
229 if (queue->idx == 0) {
230 switch_mutex_unlock(queue->mutex);
231 return SWITCH_STATUS_FALSE;
232 }
233
234 for (j = 0; j < queue->idx; j++) {
235 const char *uuid = switch_event_get_header(queue->data[j], "unique-id");
236 if (uuid && (remove == 2 || !check_caller_outbound_call(uuid))) {
237 if (remove) {
238 *pop = queue->data[j];
239 } else {
240 switch_event_dup(pop, queue->data[j]);
241 }
242 break;
243 }
244 }
245
246 if (j == queue->idx) {
247 switch_mutex_unlock(queue->mutex);
248 return SWITCH_STATUS_FALSE;
249 }
250
251 if (remove) {
252 for (i = j+1; i < queue->idx; i++) {
253 queue->data[i-1] = queue->data[i];
254 queue->data[i] = NULL;
255 change_pos(queue->data[i-1], i);
256 }
257
258 queue->idx--;
259 }
260
261 switch_mutex_unlock(queue->mutex);
262 return SWITCH_STATUS_SUCCESS;
263 }
264
265 /*!\brief Remove matching event from queue
266 *
267 * Each event in the queue will be checked to see whether it has a
268 * header equal to name whose value is equal to val. If it does, that
269 * event will be returned unless the event is for an outbound caller.
270 * If name starts with '+' or remove == 2 then forcing is enabled and
271 * the event will be returned in any case. If remove > 0 then the
272 * returned event will be removed from the queue and the remaining
273 * elements shifted to make them contiguous.
274 */
fifo_queue_pop_nameval(fifo_queue_t * queue,const char * name,const char * val,switch_event_t ** pop,int remove)275 static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, int remove)
276 {
277 int i, j, force = 0;
278
279 switch_mutex_lock(queue->mutex);
280
281 if (name && *name == '+') {
282 name++;
283 force = 1;
284 }
285
286 if (remove == 2) {
287 force = 1;
288 }
289
290 if (queue->idx == 0 || zstr(name) || zstr(val)) {
291 switch_mutex_unlock(queue->mutex);
292 return SWITCH_STATUS_FALSE;
293 }
294
295 for (j = 0; j < queue->idx; j++) {
296 const char *j_val = switch_event_get_header(queue->data[j], name);
297 const char *uuid = switch_event_get_header(queue->data[j], "unique-id");
298 if (j_val && val && !strcmp(j_val, val) && (force || !check_caller_outbound_call(uuid))) {
299 if (remove) {
300 *pop = queue->data[j];
301 } else {
302 switch_event_dup(pop, queue->data[j]);
303 }
304 break;
305 }
306 }
307
308 if (j == queue->idx) {
309 switch_mutex_unlock(queue->mutex);
310 return SWITCH_STATUS_FALSE;
311 }
312
313 if (remove) {
314 for (i = j+1; i < queue->idx; i++) {
315 queue->data[i-1] = queue->data[i];
316 queue->data[i] = NULL;
317 change_pos(queue->data[i-1], i);
318 }
319
320 queue->idx--;
321 }
322
323 switch_mutex_unlock(queue->mutex);
324
325 return SWITCH_STATUS_SUCCESS;
326 }
327
328 /*!\brief Destroy event with given uuid and remove it from queue
329 *
330 * Elements of the queue are searched until a matching uuid is found.
331 * That uuid is then destroyed and removed from the queue. The
332 * remaining elements are shifted to make them contiguous.
333 */
fifo_queue_popfly(fifo_queue_t * queue,const char * uuid)334 static switch_status_t fifo_queue_popfly(fifo_queue_t *queue, const char *uuid)
335 {
336 int i, j;
337
338 switch_mutex_lock(queue->mutex);
339
340 if (queue->idx == 0 || zstr(uuid)) {
341 switch_mutex_unlock(queue->mutex);
342 return SWITCH_STATUS_FALSE;
343 }
344
345 for (j = 0; j < queue->idx; j++) {
346 const char *j_uuid = switch_event_get_header(queue->data[j], "unique-id");
347 if (j_uuid && !strcmp(j_uuid, uuid)) {
348 switch_event_destroy(&queue->data[j]);
349 break;
350 }
351 }
352
353 if (j == queue->idx) {
354 switch_mutex_unlock(queue->mutex);
355 return SWITCH_STATUS_FALSE;
356 }
357
358 for (i = j+1; i < queue->idx; i++) {
359 queue->data[i-1] = queue->data[i];
360 queue->data[i] = NULL;
361 change_pos(queue->data[i-1], i);
362 }
363
364 queue->idx--;
365
366 switch_mutex_unlock(queue->mutex);
367
368 return SWITCH_STATUS_SUCCESS;
369 }
370
371 /*!\struct fifo_node
372 *
373 * \var fifo_node::outbound_name
374 * \brief Name of fifo in caller ID
375 *
376 * For the ringall strategy, this value is a prefix for the
377 * caller ID shown to agents. If the value starts with '=' then the
378 * actual caller ID is replaced completely.
379 *
380 * For the enterprise strategy, this value is used instead of the
381 * queue name in the caller ID.
382 */
383 struct fifo_node {
384 char *name;
385 switch_mutex_t *mutex;
386 switch_mutex_t *update_mutex;
387 fifo_queue_t *fifo_list[MAX_PRI];
388 switch_hash_t *consumer_hash;
389 int outbound_priority;
390 int caller_count;
391 int consumer_count;
392 int ring_consumer_count;
393 int member_count;
394 switch_time_t start_waiting;
395 uint32_t importance;
396 switch_thread_rwlock_t *rwlock;
397 switch_memory_pool_t *pool;
398 int has_outbound;
399 int ready;
400 long busy;
401 int is_static;
402 int outbound_per_cycle;
403 int outbound_per_cycle_min;
404 char *outbound_name;
405 outbound_strategy_t outbound_strategy;
406 int ring_timeout;
407 int default_lag;
408 char *domain_name;
409 int retry_delay;
410 struct fifo_node *next;
411 };
412
413 typedef struct fifo_node fifo_node_t;
414
415 static void fifo_caller_add(fifo_node_t *node, switch_core_session_t *session);
416 static void fifo_caller_del(const char *uuid);
417
418 struct callback {
419 char *buf;
420 size_t len;
421 int matches;
422 };
423 typedef struct callback callback_t;
424
print_strategy(outbound_strategy_t s)425 static const char *print_strategy(outbound_strategy_t s)
426 {
427 switch (s) {
428 case NODE_STRATEGY_RINGALL:
429 return "ringall";
430 case NODE_STRATEGY_ENTERPRISE:
431 return "enterprise";
432 default:
433 break;
434 }
435
436 return "invalid";
437 }
438
parse_strategy(const char * name)439 static outbound_strategy_t parse_strategy(const char *name)
440 {
441 if (!strcasecmp(name, "ringall")) {
442 return NODE_STRATEGY_RINGALL;
443 }
444
445 if (!strcasecmp(name, "enterprise")) {
446 return NODE_STRATEGY_ENTERPRISE;
447 }
448
449 return NODE_STRATEGY_INVALID;
450 }
451
sql2str_callback(void * pArg,int argc,char ** argv,char ** columnNames)452 static int sql2str_callback(void *pArg, int argc, char **argv, char **columnNames)
453 {
454 callback_t *cbt = (callback_t *) pArg;
455
456 switch_copy_string(cbt->buf, argv[0], cbt->len);
457 cbt->matches++;
458 return 0;
459 }
460
461 /*!\brief Handler for consumer DTMF
462 *
463 * When `fifo_consumer_exit_key` is pressed by the consumer we hangup
464 * on the caller (unless we've put the caller on hold). The default
465 * exit key is '*'.
466 *
467 * When the consumer presses '0' we put both legs on hold and play
468 * hold music as follows. To the caller we play `fifo_music` or the
469 * default hold music for the channel. To the consumer we play
470 * `fifo_hold_music`, or `fifo_music`, or the default hold music for
471 * the channel. The consumer can press '0' again to pick up the
472 * caller from hold.
473 */
on_dtmf(switch_core_session_t * session,void * input,switch_input_type_t itype,void * buf,unsigned int buflen)474 static switch_status_t on_dtmf(switch_core_session_t *session, void *input, switch_input_type_t itype, void *buf, unsigned int buflen)
475 {
476 switch_core_session_t *bleg = (switch_core_session_t *) buf;
477
478 switch (itype) {
479 case SWITCH_INPUT_TYPE_DTMF:
480 {
481 switch_dtmf_t *dtmf = (switch_dtmf_t *) input;
482 switch_channel_t *bchan = switch_core_session_get_channel(bleg);
483 switch_channel_t *channel = switch_core_session_get_channel(session);
484
485 if (switch_channel_test_flag(switch_core_session_get_channel(session), CF_BRIDGE_ORIGINATOR)) {
486 const char *consumer_exit_key = switch_channel_get_variable(channel, "fifo_consumer_exit_key");
487 if (!consumer_exit_key) consumer_exit_key = "*";
488 if (dtmf->digit == *consumer_exit_key) {
489 switch_channel_hangup(bchan, SWITCH_CAUSE_NORMAL_CLEARING);
490 return SWITCH_STATUS_BREAK;
491 } else if (dtmf->digit == '0') {
492 const char *moh_a = NULL, *moh_b = NULL;
493
494 if (!(moh_b = switch_channel_get_variable(bchan, "fifo_music"))) {
495 moh_b = switch_channel_get_hold_music(bchan);
496 }
497
498 if (!(moh_a = switch_channel_get_variable(channel, "fifo_hold_music"))) {
499 if (!(moh_a = switch_channel_get_variable(channel, "fifo_music"))) {
500 moh_a = switch_channel_get_hold_music(channel);
501 }
502 }
503
504 switch_ivr_soft_hold(session, "0", moh_a, moh_b);
505 return SWITCH_STATUS_IGNORE;
506 }
507 }
508 }
509 break;
510 default:
511 break;
512 }
513
514 return SWITCH_STATUS_SUCCESS;
515 }
516
517 /*!\brief Handler for caller DTMF
518 *
519 * The channel variable `fifo_caller_exit_key` can be set to one or
520 * more digits that when pressed will cause the caller to exit from
521 * the fifo. We'll return via a single character in `buf` the digit
522 * that was pressed (not null-terminated).
523 */
moh_on_dtmf(switch_core_session_t * session,void * input,switch_input_type_t itype,void * buf,unsigned int buflen)524 static switch_status_t moh_on_dtmf(switch_core_session_t *session, void *input, switch_input_type_t itype, void *buf, unsigned int buflen)
525 {
526 switch (itype) {
527 case SWITCH_INPUT_TYPE_DTMF:
528 {
529 switch_dtmf_t *dtmf = (switch_dtmf_t *) input;
530 switch_channel_t *channel = switch_core_session_get_channel(session);
531 const char *caller_exit_key = switch_channel_get_variable(channel, "fifo_caller_exit_key");
532
533 if (caller_exit_key && dtmf->digit && strchr(caller_exit_key, dtmf->digit)) {
534 char *bp = buf;
535 *bp = dtmf->digit;
536 return SWITCH_STATUS_BREAK;
537 }
538 }
539 break;
540 default:
541 break;
542 }
543
544 return SWITCH_STATUS_SUCCESS;
545 }
546
cleanup_fifo_arg(const char ** s)547 static inline void cleanup_fifo_arg(const char **s) {
548 if (!zstr(*s) && !strcasecmp(*s, "undef")) *s = NULL;
549 }
550
node_caller_count(fifo_node_t * node)551 static int node_caller_count(fifo_node_t *node)
552 {
553 int i, len = 0;
554
555 for (i = 0; i < MAX_PRI; i++) {
556 len += fifo_queue_size(node->fifo_list[i]);
557 }
558
559 return len;
560 }
561
node_remove_uuid(fifo_node_t * node,const char * uuid)562 static void node_remove_uuid(fifo_node_t *node, const char *uuid)
563 {
564 int i = 0;
565
566 for (i = 0; i < MAX_PRI; i++) {
567 fifo_queue_popfly(node->fifo_list[i], uuid);
568 }
569
570 if (!node_caller_count(node)) {
571 node->start_waiting = 0;
572 }
573
574 fifo_caller_del(uuid);
575
576 return;
577 }
578
579 /*!\struct fifo_chime_data
580 *
581 * \var fifo_chime_data::list
582 * A list of strings representing things to play back to the caller
583 * while they are waiting to be connected with an agent.
584 */
585 #define MAX_CHIME 25
586 struct fifo_chime_data {
587 char *list[MAX_CHIME];
588 int total;
589 int index;
590 time_t next;
591 int freq;
592 int abort;
593 time_t orbit_timeout;
594 int do_orbit;
595 char *orbit_exten;
596 char *orbit_dialplan;
597 char *orbit_context;
598 char *exit_key;
599 };
600
601 typedef struct fifo_chime_data fifo_chime_data_t;
602
603 /*!\brief Enforce the `fifo_orbit_timeout`
604 *
605 * If the caller has been waiting longer than the `fifo_orbit_timeout`
606 * we break out so the orbit can do something else with the call.
607 */
chime_read_frame_callback(switch_core_session_t * session,switch_frame_t * frame,void * user_data)608 static switch_status_t chime_read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data)
609 {
610 fifo_chime_data_t *cd = (fifo_chime_data_t *) user_data;
611
612 if (cd && cd->orbit_timeout && switch_epoch_time_now(NULL) >= cd->orbit_timeout) {
613 cd->do_orbit = 1;
614 return SWITCH_STATUS_BREAK;
615 }
616
617 return SWITCH_STATUS_SUCCESS;
618 }
619
620 /*!\brief Handle chimes and timeouts for callers
621 *
622 * Play back the chimes in order spaced out by the given `freq` while
623 * ensuring that we don't exceed the `orbit_timeout`.
624 */
caller_read_frame_callback(switch_core_session_t * session,switch_frame_t * frame,void * user_data)625 static switch_status_t caller_read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data)
626 {
627 fifo_chime_data_t *cd = (fifo_chime_data_t *) user_data;
628
629 if (!cd) {
630 return SWITCH_STATUS_SUCCESS;
631 }
632
633 if (cd->total && switch_epoch_time_now(NULL) >= cd->next) {
634 if (cd->index == MAX_CHIME || cd->index == cd->total || !cd->list[cd->index]) {
635 cd->index = 0;
636 }
637
638 if (cd->list[cd->index]) {
639 switch_input_args_t args = { 0 };
640 char buf[25] = "";
641 switch_status_t status;
642
643 args.input_callback = moh_on_dtmf;
644 args.buf = buf;
645 args.buflen = sizeof(buf);
646 args.read_frame_callback = chime_read_frame_callback;
647 args.user_data = user_data;
648
649 status = switch_ivr_play_file(session, NULL, cd->list[cd->index], &args);
650
651 if (cd->exit_key && *buf && strchr(cd->exit_key, *buf)) {
652 cd->abort = 1;
653 return SWITCH_STATUS_BREAK;
654 }
655
656 if (status != SWITCH_STATUS_SUCCESS) {
657 return SWITCH_STATUS_BREAK;
658 }
659
660 cd->next = switch_epoch_time_now(NULL) + cd->freq;
661 cd->index++;
662 }
663 }
664
665 return chime_read_frame_callback(session, frame, user_data);
666 }
667
668 /*!\brief Handler for waiting consumers
669 *
670 * In `user_data` we'll be passed an array of fifo_nodes representing
671 * the fifos for which this consumer will accept calls. If any of
672 * those fifos have a caller in them, we break out so we can accept
673 * the call.
674 */
consumer_read_frame_callback(switch_core_session_t * session,switch_frame_t * frame,void * user_data)675 static switch_status_t consumer_read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data)
676 {
677 fifo_node_t *node, **node_list = (fifo_node_t **) user_data;
678 int total = 0, i = 0;
679
680 for (i = 0;; i++) {
681 if (!(node = node_list[i])) {
682 break;
683 }
684 total += node_caller_count(node);
685 }
686
687 if (total) {
688 return SWITCH_STATUS_BREAK;
689 }
690
691 return SWITCH_STATUS_SUCCESS;
692 }
693
694 static struct {
695 switch_hash_t *caller_orig_hash;
696 switch_hash_t *consumer_orig_hash;
697 switch_hash_t *bridge_hash;
698 switch_hash_t *use_hash;
699 switch_mutex_t *use_mutex;
700 switch_mutex_t *caller_orig_mutex;
701 switch_mutex_t *consumer_orig_mutex;
702 switch_mutex_t *bridge_mutex;
703 switch_hash_t *fifo_hash;
704 switch_mutex_t *mutex;
705 switch_mutex_t *sql_mutex;
706 switch_memory_pool_t *pool;
707 int running;
708 switch_event_node_t *node;
709 char hostname[256];
710 char *dbname;
711 char odbc_dsn[1024];
712 int node_thread_running;
713 switch_odbc_handle_t *master_odbc;
714 int threads;
715 switch_thread_t *node_thread;
716 int debug;
717 struct fifo_node *nodes;
718 char *pre_trans_execute;
719 char *post_trans_execute;
720 char *inner_pre_trans_execute;
721 char *inner_post_trans_execute;
722 switch_sql_queue_manager_t *qm;
723 int allow_transcoding;
724 switch_bool_t delete_all_members_on_startup;
725 outbound_strategy_t default_strategy;
726 } globals;
727
fifo_dec_use_count(const char * outbound_id)728 static int fifo_dec_use_count(const char *outbound_id)
729 {
730 int r = 0, *count;
731
732 switch_mutex_lock(globals.use_mutex);
733 if ((count = (int *) switch_core_hash_find(globals.use_hash, outbound_id))) {
734 if (*count > 0) {
735 r = --(*count);
736 }
737 }
738 switch_mutex_unlock(globals.use_mutex);
739
740 return r;
741 }
742
fifo_get_use_count(const char * outbound_id)743 static int fifo_get_use_count(const char *outbound_id)
744 {
745 int r = 0, *count;
746
747 switch_mutex_lock(globals.use_mutex);
748 if ((count = (int *) switch_core_hash_find(globals.use_hash, outbound_id))) {
749 r = *count;
750 }
751 switch_mutex_unlock(globals.use_mutex);
752
753 return r;
754 }
755
fifo_inc_use_count(const char * outbound_id)756 static int fifo_inc_use_count(const char *outbound_id)
757 {
758 int r = 0, *count;
759
760 switch_mutex_lock(globals.use_mutex);
761 if (!(count = (int *) switch_core_hash_find(globals.use_hash, outbound_id))) {
762 count = switch_core_alloc(globals.pool, sizeof(int));
763 switch_core_hash_insert(globals.use_hash, outbound_id, count);
764 }
765
766 r = ++(*count);
767
768 switch_mutex_unlock(globals.use_mutex);
769
770 return r;
771 }
772
fifo_init_use_count(void)773 static void fifo_init_use_count(void)
774 {
775 switch_mutex_lock(globals.use_mutex);
776 if (globals.use_hash) {
777 switch_core_hash_destroy(&globals.use_hash);
778 }
779 switch_core_hash_init(&globals.use_hash);
780 switch_mutex_unlock(globals.use_mutex);
781 }
782
check_caller_outbound_call(const char * key)783 static int check_caller_outbound_call(const char *key)
784 {
785 int x = 0;
786
787 if (!key) return x;
788
789 switch_mutex_lock(globals.caller_orig_mutex);
790 x = !!switch_core_hash_find(globals.caller_orig_hash, key);
791 switch_mutex_unlock(globals.caller_orig_mutex);
792 return x;
793 }
794
add_caller_outbound_call(const char * key,switch_call_cause_t * cancel_cause)795 static void add_caller_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
796 {
797 if (!key) return;
798
799 switch_mutex_lock(globals.caller_orig_mutex);
800 switch_core_hash_insert(globals.caller_orig_hash, key, cancel_cause);
801 switch_mutex_unlock(globals.caller_orig_mutex);
802 }
803
del_caller_outbound_call(const char * key)804 static void del_caller_outbound_call(const char *key)
805 {
806 if (!key) return;
807
808 switch_mutex_lock(globals.caller_orig_mutex);
809 switch_core_hash_delete(globals.caller_orig_hash, key);
810 switch_mutex_unlock(globals.caller_orig_mutex);
811 }
812
cancel_caller_outbound_call(const char * key,switch_call_cause_t cause)813 static void cancel_caller_outbound_call(const char *key, switch_call_cause_t cause)
814 {
815 switch_call_cause_t *cancel_cause = NULL;
816
817 if (!key) return;
818
819 switch_mutex_lock(globals.caller_orig_mutex);
820 if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.caller_orig_hash, key))) {
821 *cancel_cause = cause;
822 }
823 switch_mutex_unlock(globals.caller_orig_mutex);
824 }
825
check_bridge_call(const char * key)826 static int check_bridge_call(const char *key)
827 {
828 int x = 0;
829
830 if (!key) return x;
831
832 switch_mutex_lock(globals.bridge_mutex);
833 x = !!switch_core_hash_find(globals.bridge_hash, key);
834 switch_mutex_unlock(globals.bridge_mutex);
835 return x;
836 }
837
add_bridge_call(const char * key)838 static void add_bridge_call(const char *key)
839 {
840 static int marker = 1;
841 if (!key) return;
842
843 switch_mutex_lock(globals.bridge_mutex);
844 switch_core_hash_insert(globals.bridge_hash, key, (void *)&marker);
845 switch_mutex_unlock(globals.bridge_mutex);
846 }
847
del_bridge_call(const char * key)848 static void del_bridge_call(const char *key)
849 {
850 switch_mutex_lock(globals.bridge_mutex);
851 switch_core_hash_delete(globals.bridge_hash, key);
852 switch_mutex_unlock(globals.bridge_mutex);
853 }
854
check_consumer_outbound_call(const char * key)855 static int check_consumer_outbound_call(const char *key)
856 {
857 int x = 0;
858
859 if (!key) return x;
860
861 switch_mutex_lock(globals.consumer_orig_mutex);
862 x = !!switch_core_hash_find(globals.consumer_orig_hash, key);
863 switch_mutex_unlock(globals.consumer_orig_mutex);
864 return x;
865 }
866
add_consumer_outbound_call(const char * key,switch_call_cause_t * cancel_cause)867 static void add_consumer_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
868 {
869 if (!key) return;
870
871 switch_mutex_lock(globals.consumer_orig_mutex);
872 switch_core_hash_insert(globals.consumer_orig_hash, key, cancel_cause);
873 switch_mutex_unlock(globals.consumer_orig_mutex);
874 }
875
del_consumer_outbound_call(const char * key)876 static void del_consumer_outbound_call(const char *key)
877 {
878 if (!key) return;
879
880 switch_mutex_lock(globals.consumer_orig_mutex);
881 switch_core_hash_delete(globals.consumer_orig_hash, key);
882 switch_mutex_unlock(globals.consumer_orig_mutex);
883 }
884
cancel_consumer_outbound_call(const char * key,switch_call_cause_t cause)885 static void cancel_consumer_outbound_call(const char *key, switch_call_cause_t cause)
886 {
887 switch_call_cause_t *cancel_cause = NULL;
888
889 if (!key) return;
890
891 switch_mutex_lock(globals.consumer_orig_mutex);
892 if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.consumer_orig_hash, key))) {
893 *cancel_cause = cause;
894 }
895 switch_mutex_unlock(globals.consumer_orig_mutex);
896 }
897
fifo_get_db_handle(void)898 switch_cache_db_handle_t *fifo_get_db_handle(void)
899 {
900 switch_cache_db_handle_t *dbh = NULL;
901 char *dsn;
902
903 if (!zstr(globals.odbc_dsn)) {
904 dsn = globals.odbc_dsn;
905 } else {
906 dsn = globals.dbname;
907 }
908
909 if (switch_cache_db_get_db_handle_dsn(&dbh, dsn) != SWITCH_STATUS_SUCCESS) {
910 dbh = NULL;
911 }
912
913 return dbh;
914 }
915
fifo_execute_sql_queued(char ** sqlp,switch_bool_t sql_already_dynamic,switch_bool_t block)916 static switch_status_t fifo_execute_sql_queued(char **sqlp, switch_bool_t sql_already_dynamic, switch_bool_t block)
917 {
918 int index = 1;
919 char *sql;
920
921 switch_assert(sqlp && *sqlp);
922 sql = *sqlp;
923
924 if (switch_stristr("insert", sql)) {
925 index = 0;
926 }
927
928 if (block) {
929 switch_sql_queue_manager_push_confirm(globals.qm, sql, index, !sql_already_dynamic);
930 } else {
931 switch_sql_queue_manager_push(globals.qm, sql, index, !sql_already_dynamic);
932 }
933
934 if (sql_already_dynamic) {
935 *sqlp = NULL;
936 }
937
938 return SWITCH_STATUS_SUCCESS;
939 }
940 #if 0
941 static switch_status_t fifo_execute_sql(char *sql, switch_mutex_t *mutex)
942 {
943 switch_cache_db_handle_t *dbh = NULL;
944 switch_status_t status = SWITCH_STATUS_FALSE;
945
946 if (mutex) {
947 switch_mutex_lock(mutex);
948 }
949
950 if (!(dbh = fifo_get_db_handle())) {
951 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB\n");
952 goto end;
953 }
954
955 if (globals.debug > 1) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "sql: %s\n", sql);
956
957 status = switch_cache_db_execute_sql(dbh, sql, NULL);
958
959 end:
960
961 switch_cache_db_release_db_handle(&dbh);
962
963 if (mutex) {
964 switch_mutex_unlock(mutex);
965 }
966
967 return status;
968 }
969 #endif
970
fifo_execute_sql_callback(switch_mutex_t * mutex,char * sql,switch_core_db_callback_func_t callback,void * pdata)971 static switch_bool_t fifo_execute_sql_callback(switch_mutex_t *mutex, char *sql, switch_core_db_callback_func_t callback, void *pdata)
972 {
973 switch_bool_t ret = SWITCH_FALSE;
974 char *errmsg = NULL;
975 switch_cache_db_handle_t *dbh = NULL;
976
977 if (mutex) {
978 switch_mutex_lock(mutex);
979 }
980
981 if (!(dbh = fifo_get_db_handle())) {
982 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB\n");
983 goto end;
984 }
985
986 if (globals.debug > 1) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "sql: %s\n", sql);
987
988 switch_cache_db_execute_sql_callback(dbh, sql, callback, pdata, &errmsg);
989
990 if (errmsg) {
991 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
992 free(errmsg);
993 }
994
995 end:
996
997 switch_cache_db_release_db_handle(&dbh);
998
999 if (mutex) {
1000 switch_mutex_unlock(mutex);
1001 }
1002
1003 return ret;
1004 }
1005
create_node(const char * name,uint32_t importance,switch_mutex_t * mutex)1006 static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mutex_t *mutex)
1007 {
1008 fifo_node_t *node;
1009 int x = 0;
1010 switch_memory_pool_t *pool;
1011 char outbound_count[80] = "";
1012 callback_t cbt = { 0 };
1013 char *sql = NULL;
1014 if (!globals.running) {
1015 return NULL;
1016 }
1017
1018 switch_core_new_memory_pool(&pool);
1019
1020 node = switch_core_alloc(pool, sizeof(*node));
1021 node->pool = pool;
1022 node->outbound_strategy = globals.default_strategy;
1023 node->name = switch_core_strdup(node->pool, name);
1024
1025 if (!strchr(name, '@')) {
1026 node->domain_name = switch_core_strdup(node->pool, switch_core_get_domain(SWITCH_FALSE));
1027 }
1028
1029 for (x = 0; x < MAX_PRI; x++) {
1030 fifo_queue_create(&node->fifo_list[x], 1000, node->pool);
1031 switch_assert(node->fifo_list[x]);
1032 }
1033
1034 switch_core_hash_init(&node->consumer_hash);
1035 switch_thread_rwlock_create(&node->rwlock, node->pool);
1036 switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool);
1037 switch_mutex_init(&node->update_mutex, SWITCH_MUTEX_NESTED, node->pool);
1038 cbt.buf = outbound_count;
1039 cbt.len = sizeof(outbound_count);
1040 sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", name);
1041 fifo_execute_sql_callback(mutex, sql, sql2str_callback, &cbt);
1042 node->member_count = atoi(outbound_count);
1043 node->has_outbound = (node->member_count > 0) ? 1 : 0;
1044 switch_safe_free(sql);
1045
1046 node->importance = importance;
1047
1048 switch_mutex_lock(globals.mutex);
1049
1050 switch_core_hash_insert(globals.fifo_hash, name, node);
1051 node->next = globals.nodes;
1052 globals.nodes = node;
1053 switch_mutex_unlock(globals.mutex);
1054
1055 return node;
1056 }
1057
node_idle_consumers(fifo_node_t * node)1058 static int node_idle_consumers(fifo_node_t *node)
1059 {
1060 switch_hash_index_t *hi;
1061 void *val;
1062 const void *var;
1063 switch_core_session_t *session;
1064 switch_channel_t *channel;
1065 int total = 0;
1066
1067 switch_mutex_lock(node->mutex);
1068 for (hi = switch_core_hash_first(node->consumer_hash); hi; hi = switch_core_hash_next(&hi)) {
1069 switch_core_hash_this(hi, &var, NULL, &val);
1070 session = (switch_core_session_t *) val;
1071 channel = switch_core_session_get_channel(session);
1072 if (!switch_channel_test_flag(channel, CF_BRIDGED)) {
1073 total++;
1074 }
1075 }
1076 switch_mutex_unlock(node->mutex);
1077
1078 return total;
1079 }
1080
1081 struct call_helper {
1082 char *uuid;
1083 char *node_name;
1084 char *originate_string;
1085 int timeout;
1086 switch_memory_pool_t *pool;
1087 };
1088
1089 #define MAX_ROWS 250
1090 struct callback_helper {
1091 int need;
1092 switch_memory_pool_t *pool;
1093 struct call_helper *rows[MAX_ROWS];
1094 int rowcount;
1095 int ready;
1096 };
1097
1098 /*!\brief Handle unbridging of manually tracked calls
1099 */
do_unbridge(switch_core_session_t * consumer_session,switch_core_session_t * caller_session)1100 static void do_unbridge(switch_core_session_t *consumer_session, switch_core_session_t *caller_session)
1101 {
1102 switch_channel_t *consumer_channel = switch_core_session_get_channel(consumer_session);
1103 switch_channel_t *caller_channel = NULL;
1104
1105 if (caller_session) {
1106 caller_channel = switch_core_session_get_channel(caller_session);
1107 }
1108
1109 if (switch_channel_test_app_flag_key(FIFO_APP_KEY, consumer_channel, FIFO_APP_BRIDGE_TAG)) {
1110 char date[80] = "";
1111 switch_time_exp_t tm;
1112 switch_time_t ts = switch_micro_time_now();
1113 switch_size_t retsize;
1114 long epoch_start = 0, epoch_end = 0;
1115 const char *epoch_start_a = NULL;
1116 char *sql;
1117 switch_event_t *event;
1118 const char *outbound_id = NULL;
1119 int use_count = 0;
1120
1121 switch_channel_clear_app_flag_key(FIFO_APP_KEY, consumer_channel, FIFO_APP_BRIDGE_TAG);
1122 switch_channel_set_variable(consumer_channel, "fifo_bridged", NULL);
1123
1124 if ((outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid"))) {
1125 use_count = fifo_get_use_count(outbound_id);
1126 }
1127
1128 switch_time_exp_lt(&tm, ts);
1129 switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
1130
1131 sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(consumer_session));
1132 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
1133
1134 switch_channel_set_variable(consumer_channel, "fifo_status", "WAITING");
1135 switch_channel_set_variable(consumer_channel, "fifo_timestamp", date);
1136
1137 if (caller_channel) {
1138 switch_channel_set_variable(caller_channel, "fifo_status", "DONE");
1139 switch_channel_set_variable(caller_channel, "fifo_timestamp", date);
1140 }
1141
1142 if ((epoch_start_a = switch_channel_get_variable(consumer_channel, "fifo_epoch_start_bridge"))) {
1143 epoch_start = atol(epoch_start_a);
1144 }
1145
1146 epoch_end = (long)switch_epoch_time_now(NULL);
1147
1148 switch_channel_set_variable_printf(consumer_channel, "fifo_epoch_stop_bridge", "%ld", epoch_end);
1149 switch_channel_set_variable_printf(consumer_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start);
1150
1151 if (caller_channel) {
1152 switch_channel_set_variable_printf(caller_channel, "fifo_epoch_stop_bridge", "%ld", epoch_end);
1153 switch_channel_set_variable_printf(caller_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start);
1154 }
1155
1156 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1157 switch_channel_event_set_data(consumer_channel, event);
1158 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
1159 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-consumer-stop");
1160 if (outbound_id) {
1161 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
1162 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", use_count);
1163 }
1164 switch_event_fire(&event);
1165 }
1166
1167 if (caller_channel) {
1168 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1169 switch_channel_event_set_data(caller_channel, event);
1170 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
1171 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-caller-stop");
1172 switch_event_fire(&event);
1173 }
1174 }
1175
1176 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1177 switch_channel_event_set_data(consumer_channel, event);
1178 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
1179 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_stop");
1180 switch_event_fire(&event);
1181 }
1182 }
1183 }
1184
1185 /*!\brief Handle session messages for manually tracked calls
1186 */
messagehook(switch_core_session_t * session,switch_core_session_message_t * msg)1187 static switch_status_t messagehook (switch_core_session_t *session, switch_core_session_message_t *msg)
1188 {
1189 switch_event_t *event;
1190 switch_core_session_t *caller_session = NULL, *consumer_session = NULL;
1191 switch_channel_t *caller_channel = NULL, *consumer_channel = NULL;
1192 const char *outbound_id;
1193 char *sql;
1194
1195 consumer_session = session;
1196 consumer_channel = switch_core_session_get_channel(consumer_session);
1197 outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid");
1198
1199 if (!outbound_id) return SWITCH_STATUS_SUCCESS;
1200
1201 switch (msg->message_id) {
1202 case SWITCH_MESSAGE_INDICATE_BRIDGE:
1203 case SWITCH_MESSAGE_INDICATE_UNBRIDGE:
1204 if (msg->numeric_arg == 42) {
1205 /* See audio_bridge_thread() for source of 42 constant. */
1206 /* When a session is interrupted to execute an application
1207 (e.g. by uuid_execute) we need to tell everything in FS
1208 to unbridge the channel (e.g. to turn on the
1209 jitterbuffer) but we need mod_fifo not to see the
1210 unbridge because we don't want fifo to stop tracking
1211 the call. So this magic number is a complete hack to
1212 make this happen. So we ignore it here and simply fall
1213 through. */
1214 goto end;
1215 }
1216 if ((caller_session = switch_core_session_locate(msg->string_arg))) {
1217 caller_channel = switch_core_session_get_channel(caller_session);
1218 if (msg->message_id == SWITCH_MESSAGE_INDICATE_BRIDGE) {
1219 cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL);
1220 switch_core_session_soft_lock(caller_session, 5);
1221 } else {
1222 switch_core_session_soft_unlock(caller_session);
1223 }
1224 }
1225 break;
1226 case SWITCH_MESSAGE_INDICATE_DISPLAY:
1227 sql = switch_mprintf("update fifo_bridge set caller_caller_id_name='%q', caller_caller_id_number='%q' where consumer_uuid='%q'",
1228 switch_str_nil(msg->string_array_arg[0]),
1229 switch_str_nil(msg->string_array_arg[1]),
1230 switch_core_session_get_uuid(session));
1231 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
1232 goto end;
1233 default:
1234 goto end;
1235 }
1236
1237 switch (msg->message_id) {
1238 case SWITCH_MESSAGE_INDICATE_BRIDGE:
1239 {
1240 long epoch_start = 0;
1241 char date[80] = "";
1242 switch_time_t ts;
1243 switch_time_exp_t tm;
1244 switch_size_t retsize;
1245 const char *ced_name, *ced_number, *cid_name, *cid_number, *outbound_id;
1246
1247 if (switch_channel_test_app_flag_key(FIFO_APP_KEY, consumer_channel, FIFO_APP_BRIDGE_TAG)) {
1248 goto end;
1249 }
1250
1251 switch_channel_set_app_flag_key(FIFO_APP_KEY, consumer_channel, FIFO_APP_BRIDGE_TAG);
1252
1253 switch_channel_set_variable(consumer_channel, "fifo_bridged", "true");
1254 switch_channel_set_variable(consumer_channel, "fifo_manual_bridge", "true");
1255 switch_channel_set_variable(consumer_channel, "fifo_role", "consumer");
1256 outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid");
1257
1258 if (caller_channel) {
1259 switch_channel_set_variable(caller_channel, "fifo_role", "caller");
1260 switch_process_import(consumer_session, caller_channel, "fifo_caller_consumer_import",
1261 switch_channel_get_variable(consumer_channel, "fifo_import_prefix"));
1262 switch_process_import(caller_session, consumer_channel, "fifo_consumer_caller_import",
1263 switch_channel_get_variable(caller_channel, "fifo_import_prefix"));
1264 }
1265
1266 ced_name = switch_channel_get_variable(consumer_channel, "callee_id_name");
1267 ced_number = switch_channel_get_variable(consumer_channel, "callee_id_number");
1268
1269 cid_name = switch_channel_get_variable(consumer_channel, "caller_id_name");
1270 cid_number = switch_channel_get_variable(consumer_channel, "caller_id_number");
1271
1272 if (switch_channel_direction(consumer_channel) == SWITCH_CALL_DIRECTION_INBOUND) {
1273 if (zstr(ced_name) || !strcmp(ced_name, cid_name)) {
1274 ced_name = ced_number;
1275 }
1276
1277 if (zstr(ced_number) || !strcmp(ced_number, cid_number)) {
1278 ced_name = switch_channel_get_variable(consumer_channel, "destination_number");
1279 ced_number = ced_name;
1280 }
1281 } else {
1282 ced_name = cid_name;
1283 ced_number = cid_number;
1284 }
1285
1286 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1287 switch_channel_event_set_data(consumer_channel, event);
1288 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
1289 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-consumer-start");
1290 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-CID-Name", ced_name);
1291 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-CID-Number", ced_number);
1292 if (outbound_id) {
1293 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
1294 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(outbound_id));
1295 }
1296 switch_event_fire(&event);
1297 }
1298
1299 if (caller_channel) {
1300 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1301 switch_channel_event_set_data(caller_channel, event);
1302 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
1303 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-caller-start");
1304 switch_event_fire(&event);
1305 }
1306
1307 sql = switch_mprintf("insert into fifo_bridge "
1308 "(fifo_name,caller_uuid,caller_caller_id_name,caller_caller_id_number,consumer_uuid,consumer_outgoing_uuid,bridge_start) "
1309 "values ('%q','%q','%q','%q','%q','%q',%ld)",
1310 MANUAL_QUEUE_NAME,
1311 switch_core_session_get_uuid(caller_session),
1312 ced_name,
1313 ced_number,
1314 switch_core_session_get_uuid(session),
1315 switch_str_nil(outbound_id),
1316 (long) switch_epoch_time_now(NULL)
1317 );
1318 } else {
1319 sql = switch_mprintf("insert into fifo_bridge "
1320 "(fifo_name,caller_uuid,caller_caller_id_name,caller_caller_id_number,consumer_uuid,consumer_outgoing_uuid,bridge_start) "
1321 "values ('%q','%q','%q','%q','%q','%q',%ld)",
1322 MANUAL_QUEUE_NAME,
1323 (msg->string_arg && strchr(msg->string_arg, '-')) ? msg->string_arg : "00000000-0000-0000-0000-000000000000",
1324 ced_name,
1325 ced_number,
1326 switch_core_session_get_uuid(session),
1327 switch_str_nil(outbound_id),
1328 (long) switch_epoch_time_now(NULL)
1329 );
1330 }
1331
1332 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
1333
1334 ts = switch_micro_time_now();
1335 switch_time_exp_lt(&tm, ts);
1336 epoch_start = (long)switch_epoch_time_now(NULL);
1337 switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
1338 switch_channel_set_variable(consumer_channel, "fifo_status", "TALKING");
1339 if (caller_session) {
1340 switch_channel_set_variable(consumer_channel, "fifo_target", switch_core_session_get_uuid(caller_session));
1341 }
1342 switch_channel_set_variable(consumer_channel, "fifo_timestamp", date);
1343 switch_channel_set_variable_printf(consumer_channel, "fifo_epoch_start_bridge", "%ld", epoch_start);
1344 switch_channel_set_variable(consumer_channel, "fifo_role", "consumer");
1345
1346 if (caller_channel) {
1347 switch_channel_set_variable(caller_channel, "fifo_status", "TALKING");
1348 switch_channel_set_variable(caller_channel, "fifo_timestamp", date);
1349 switch_channel_set_variable_printf(caller_channel, "fifo_epoch_start_bridge", "%ld", epoch_start);
1350 switch_channel_set_variable(caller_channel, "fifo_target", switch_core_session_get_uuid(session));
1351 switch_channel_set_variable(caller_channel, "fifo_role", "caller");
1352 }
1353 }
1354 break;
1355 case SWITCH_MESSAGE_INDICATE_UNBRIDGE:
1356 {
1357 do_unbridge(consumer_session, caller_session);
1358 }
1359 break;
1360 default:
1361 break;
1362 }
1363
1364 end:
1365
1366 if (caller_session) {
1367 switch_core_session_rwunlock(caller_session);
1368 }
1369
1370 return SWITCH_STATUS_SUCCESS;
1371 }
1372
1373 /*!\brief Create calls to outbound members with ringall strategy
1374 *
1375 * A fifo node has been selected for us and we've been given a list of
1376 * outbound members to ring. We're going to pick a single caller by
1377 * searching through the fifo node queues in order of priority
1378 * (`fifo_priority`) from lowest to highest. We'll look first for
1379 * callers with fifo_vip=true. Finding none, we'll consider the
1380 * plebs.
1381 *
1382 * Once we have a caller to service, we'll set fifo_bridge_uuid for
1383 * that caller to let the fifo application in on our decision. Our
1384 * job being done, we'll let the fifo application deal with the
1385 * remaining details.
1386 */
outbound_ringall_thread_run(switch_thread_t * thread,void * obj)1387 static void *SWITCH_THREAD_FUNC outbound_ringall_thread_run(switch_thread_t *thread, void *obj)
1388 {
1389 struct callback_helper *cbh = (struct callback_helper *) obj;
1390 char *node_name;
1391 int i = 0;
1392 int timeout = 0;
1393 switch_stream_handle_t stream = { 0 };
1394 switch_stream_handle_t stream2 = { 0 };
1395 fifo_node_t *node = NULL;
1396 char *originate_string = NULL;
1397 switch_event_t *ovars = NULL;
1398 switch_status_t status;
1399 switch_core_session_t *session = NULL;
1400 switch_call_cause_t cause = SWITCH_CAUSE_NONE;
1401 char *app_name = NULL, *arg = NULL;
1402 switch_caller_extension_t *extension = NULL;
1403 switch_channel_t *channel;
1404 char *caller_id_name = NULL, *cid_num = NULL, *id = NULL;
1405 switch_event_t *pop = NULL, *pop_dup = NULL;
1406 fifo_queue_t *q = NULL;
1407 int x = 0;
1408 switch_event_t *event;
1409 switch_uuid_t uuid;
1410 char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
1411 switch_call_cause_t cancel_cause = 0;
1412 char *uuid_list = NULL;
1413 int total = 0;
1414 const char *codec;
1415 struct call_helper *rows[MAX_ROWS] = { 0 };
1416 int rowcount = 0;
1417 switch_memory_pool_t *pool;
1418 char *export = NULL;
1419
1420 switch_mutex_lock(globals.mutex);
1421 globals.threads++;
1422 switch_mutex_unlock(globals.mutex);
1423
1424 if (!globals.running) goto dpool;
1425
1426 switch_uuid_get(&uuid);
1427 switch_uuid_format(uuid_str, &uuid);
1428
1429 if (!cbh->rowcount) {
1430 goto end;
1431 }
1432
1433 node_name = cbh->rows[0]->node_name;
1434
1435 switch_mutex_lock(globals.mutex);
1436 if ((node = switch_core_hash_find(globals.fifo_hash, node_name))) {
1437 switch_thread_rwlock_rdlock(node->rwlock);
1438 }
1439 switch_mutex_unlock(globals.mutex);
1440
1441 if (!node) {
1442 goto end;
1443 }
1444
1445 for (i = 0; i < cbh->rowcount; i++) {
1446 struct call_helper *h = cbh->rows[i];
1447
1448 if (check_consumer_outbound_call(h->uuid) || check_bridge_call(h->uuid)) {
1449 continue;
1450 }
1451
1452 rows[rowcount++] = h;
1453 add_consumer_outbound_call(h->uuid, &cancel_cause);
1454 total++;
1455 }
1456
1457 for (i = 0; i < rowcount; i++) {
1458 struct call_helper *h = rows[i];
1459 cbh->rows[i] = h;
1460 }
1461
1462 cbh->rowcount = rowcount;
1463
1464 cbh->ready = 1;
1465
1466 if (!total) {
1467 goto end;
1468 }
1469
1470 switch_mutex_lock(node->update_mutex);
1471 node->busy = 0;
1472 node->ring_consumer_count++;
1473 switch_mutex_unlock(node->update_mutex);
1474
1475 SWITCH_STANDARD_STREAM(stream);
1476 SWITCH_STANDARD_STREAM(stream2);
1477
1478 switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
1479 switch_assert(ovars);
1480
1481 for (i = 0; i < cbh->rowcount; i++) {
1482 struct call_helper *h = cbh->rows[i];
1483 char *parsed = NULL;
1484 int use_ent = 0;
1485 char *expanded_originate_string = switch_event_expand_headers(ovars, h->originate_string);
1486
1487 switch_assert(expanded_originate_string);
1488 if (strstr(expanded_originate_string, "user/")) {
1489 switch_event_create_brackets(expanded_originate_string, '<', '>', ',', &ovars, &parsed, SWITCH_TRUE);
1490 use_ent = 1;
1491 } else {
1492 switch_event_create_brackets(expanded_originate_string, '{', '}', ',', &ovars, &parsed, SWITCH_TRUE);
1493 }
1494
1495 switch_event_del_header(ovars, "fifo_outbound_uuid");
1496
1497 if (!h->timeout) h->timeout = node->ring_timeout;
1498 if (timeout < h->timeout) timeout = h->timeout;
1499
1500 if (use_ent) {
1501 stream.write_function(&stream, "{ignore_early_media=true,outbound_redirect_fatal=true,leg_timeout=%d,fifo_outbound_uuid=%s,fifo_name=%s}%s%s",
1502 h->timeout, h->uuid, node->name,
1503 parsed ? parsed : expanded_originate_string, (i == cbh->rowcount - 1) ? "" : SWITCH_ENT_ORIGINATE_DELIM);
1504 } else {
1505 stream.write_function(&stream, "[leg_timeout=%d,fifo_outbound_uuid=%s,fifo_name=%s]%s,",
1506 h->timeout, h->uuid, node->name, parsed ? parsed : expanded_originate_string);
1507 }
1508
1509 stream2.write_function(&stream2, "%s,", h->uuid);
1510 switch_safe_free(parsed);
1511
1512 if (expanded_originate_string != h->originate_string) {
1513 switch_safe_free(expanded_originate_string);
1514 }
1515 }
1516
1517 originate_string = (char *) stream.data;
1518
1519 uuid_list = (char *) stream2.data;
1520
1521 if (uuid_list) {
1522 end_of(uuid_list) = '\0';
1523 }
1524
1525 if (!timeout) timeout = 60;
1526
1527 pop = pop_dup = NULL;
1528
1529 for (x = 0; x < MAX_PRI; x++) {
1530 q = node->fifo_list[x];
1531 if (fifo_queue_pop_nameval(q, "variable_fifo_vip", "true", &pop_dup, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS && pop_dup) {
1532 pop = pop_dup;
1533 break;
1534 }
1535 }
1536
1537 if (!pop) {
1538 for (x = 0; x < MAX_PRI; x++) {
1539 if (fifo_queue_pop(node->fifo_list[x], &pop_dup, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS && pop_dup) {
1540 pop = pop_dup;
1541 break;
1542 }
1543 }
1544 }
1545
1546 if (!pop) {
1547 goto end;
1548 }
1549
1550 if (!switch_event_get_header(ovars, "origination_caller_id_name")) {
1551 if ((caller_id_name = switch_event_get_header(pop, "caller-caller-id-name"))) {
1552 if (!zstr(node->outbound_name)) {
1553 if ( node->outbound_name[0] == '=' ) {
1554 switch_event_add_header(ovars, SWITCH_STACK_BOTTOM, "origination_caller_id_name", "%s", node->outbound_name + 1);
1555 } else {
1556 switch_event_add_header(ovars, SWITCH_STACK_BOTTOM, "origination_caller_id_name", "(%s) %s", node->outbound_name, caller_id_name);
1557 }
1558 } else {
1559 switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "origination_caller_id_name", caller_id_name);
1560 }
1561 }
1562 }
1563
1564 if (!switch_event_get_header(ovars, "origination_caller_id_number")) {
1565 if ((cid_num = switch_event_get_header(pop, "caller-caller-id-number"))) {
1566 switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "origination_caller_id_number", cid_num);
1567 }
1568 }
1569
1570 if ((id = switch_event_get_header(pop, "unique-id"))) {
1571 switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "fifo_bridge_uuid", id);
1572 }
1573
1574 switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "fifo_originate_uuid", uuid_str);
1575
1576 if ((export = switch_event_get_header(pop, "variable_fifo_export"))) {
1577 int argc;
1578 char *argv[100] = { 0 };
1579 char *mydata = strdup(export);
1580 char *tmp;
1581
1582 argc = switch_split(mydata, ',', argv);
1583
1584 for (x = 0; x < argc; x++) {
1585 char *name = switch_mprintf("variable_%s", argv[x]);
1586
1587 if ((tmp = switch_event_get_header(pop, name))) {
1588 switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, argv[x], tmp);
1589 }
1590
1591 free(name);
1592 }
1593
1594 switch_safe_free(mydata);
1595 }
1596
1597 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1598 switch_core_session_t *session;
1599 if (id && (session = switch_core_session_locate(id))) {
1600 switch_channel_t *channel = switch_core_session_get_channel(session);
1601
1602 switch_channel_set_variable(channel, "fifo_originate_uuid", uuid_str);
1603 switch_channel_event_set_data(channel, event);
1604 switch_core_session_rwunlock(session);
1605 }
1606
1607 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node->name);
1608 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "pre-dial");
1609 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "outbound-strategy", "ringall");
1610 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "caller-uuid", id);
1611 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "originate_string", originate_string);
1612 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Outbound-UUID-List", uuid_list);
1613
1614 switch_event_fire(&event);
1615 }
1616
1617 for (i = 0; i < cbh->rowcount; i++) {
1618 struct call_helper *h = cbh->rows[i];
1619 char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%q'", h->uuid);
1620
1621 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
1622 }
1623
1624 if (!globals.allow_transcoding && !switch_true(switch_event_get_header(pop, "variable_fifo_allow_transcoding")) &&
1625 (codec = switch_event_get_header(pop, "variable_rtp_use_codec_name"))) {
1626 const char *rate = switch_event_get_header(pop, "variable_rtp_use_codec_rate");
1627 const char *ptime = switch_event_get_header(pop, "variable_rtp_use_codec_ptime");
1628 char nstr[256] = "";
1629
1630 if (strcasecmp(codec, "PCMU") && strcasecmp(codec, "PCMA")) {
1631 switch_snprintf(nstr, sizeof(nstr), "%s@%si@%sh,PCMU@%si,PCMA@%si", codec, ptime, rate, ptime, ptime);
1632 } else {
1633 switch_snprintf(nstr, sizeof(nstr), "%s@%si@%sh", codec, ptime, rate);
1634 }
1635
1636 switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "absolute_codec_string", nstr);
1637 }
1638
1639 add_caller_outbound_call(id, &cancel_cause);
1640
1641 if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s dialing: %s\n", node->name, originate_string);
1642
1643 status = switch_ivr_originate(NULL, &session, &cause, originate_string, timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, &cancel_cause, NULL);
1644
1645 del_caller_outbound_call(id);
1646
1647 if (status != SWITCH_STATUS_SUCCESS || cause != SWITCH_CAUSE_SUCCESS) {
1648 const char *acceptable = "false";
1649
1650 switch (cause) {
1651 case SWITCH_CAUSE_ORIGINATOR_CANCEL:
1652 case SWITCH_CAUSE_PICKED_OFF:
1653 {
1654 acceptable = "true";
1655
1656 for (i = 0; i < cbh->rowcount; i++) {
1657 struct call_helper *h = cbh->rows[i];
1658 char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 "
1659 "where uuid='%q' and ring_count > 0", h->uuid);
1660 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
1661 }
1662 }
1663 break;
1664 default:
1665 {
1666 for (i = 0; i < cbh->rowcount; i++) {
1667 struct call_helper *h = cbh->rows[i];
1668 char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, "
1669 "outbound_fail_count=outbound_fail_count+1, "
1670 "outbound_fail_total_count = outbound_fail_total_count+1, "
1671 "next_avail=%ld + lag + 1 where uuid='%q' and ring_count > 0",
1672 (long) switch_epoch_time_now(NULL) + node->retry_delay, h->uuid);
1673 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
1674 }
1675 }
1676 }
1677
1678 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1679 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node->name);
1680 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "post-dial");
1681 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "outbound-strategy", "ringall");
1682 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "caller-uuid", id);
1683 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "result", "failure");
1684 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "acceptable", acceptable);
1685 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "cause", switch_channel_cause2str(cause));
1686 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "originate_string", originate_string);
1687 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Outbound-UUID-List", uuid_list);
1688 switch_event_fire(&event);
1689 }
1690
1691 goto end;
1692 }
1693
1694 channel = switch_core_session_get_channel(session);
1695
1696 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1697 switch_channel_event_set_data(channel, event);
1698 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node->name);
1699 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "post-dial");
1700 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "outbound-strategy", "ringall");
1701 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "caller-uuid", id);
1702 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Outbound-UUID", switch_channel_get_variable(channel, "fifo_outbound_uuid"));
1703 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Outbound-UUID-List", uuid_list);
1704 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "result", "success");
1705 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "originate_string", originate_string);
1706 switch_event_fire(&event);
1707 }
1708
1709 switch_channel_set_variable(channel, "fifo_pop_order", NULL);
1710
1711 app_name = "fifo";
1712 arg = switch_core_session_sprintf(session, "%s out nowait", node_name);
1713 extension = switch_caller_extension_new(session, app_name, arg);
1714 switch_caller_extension_add_application(session, extension, app_name, arg);
1715 switch_channel_set_caller_extension(channel, extension);
1716 switch_channel_set_state(channel, CS_EXECUTE);
1717 switch_channel_wait_for_state(channel, NULL, CS_EXECUTE);
1718 switch_channel_wait_for_flag(channel, CF_BRIDGED, SWITCH_TRUE, 5000, NULL);
1719
1720 switch_core_session_rwunlock(session);
1721
1722 for (i = 0; i < cbh->rowcount; i++) {
1723 struct call_helper *h = cbh->rows[i];
1724 char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 where uuid='%q' and ring_count > 0", h->uuid);
1725 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
1726 }
1727
1728 end:
1729
1730 cbh->ready = 1;
1731
1732 if (node) {
1733 switch_mutex_lock(node->update_mutex);
1734 if (--node->ring_consumer_count < 0) {
1735 node->ring_consumer_count = 0;
1736 }
1737 node->busy = 0;
1738 switch_mutex_unlock(node->update_mutex);
1739 switch_thread_rwlock_unlock(node->rwlock);
1740 }
1741
1742 for (i = 0; i < cbh->rowcount; i++) {
1743 struct call_helper *h = cbh->rows[i];
1744 del_consumer_outbound_call(h->uuid);
1745 }
1746
1747 switch_safe_free(originate_string);
1748 switch_safe_free(uuid_list);
1749
1750 if (ovars) {
1751 switch_event_destroy(&ovars);
1752 }
1753
1754 if (pop_dup) {
1755 switch_event_destroy(&pop_dup);
1756 }
1757
1758 dpool:
1759
1760 pool = cbh->pool;
1761 switch_core_destroy_memory_pool(&pool);
1762
1763 switch_mutex_lock(globals.mutex);
1764 globals.threads--;
1765 switch_mutex_unlock(globals.mutex);
1766
1767 return NULL;
1768 }
1769
1770 /*!\brief Send a call to an outbound member with the enterprise strategy
1771 *
1772 * A fifo and an outbound member have been picked out for us and our
1773 * job is to create a channel to the member and deliver that channel
1774 * into the `fifo <fifo> out` application.
1775 *
1776 * We haven't picked a caller yet, and we won't do so here. We'll let
1777 * the fifo application take care of that work.
1778 */
outbound_enterprise_thread_run(switch_thread_t * thread,void * obj)1779 static void *SWITCH_THREAD_FUNC outbound_enterprise_thread_run(switch_thread_t *thread, void *obj)
1780 {
1781 struct call_helper *h = (struct call_helper *) obj;
1782
1783 switch_core_session_t *session = NULL;
1784 switch_channel_t *channel;
1785 switch_call_cause_t cause = SWITCH_CAUSE_NONE;
1786 switch_caller_extension_t *extension = NULL;
1787 char *app_name, *arg = NULL, *originate_string = NULL;
1788 const char *member_wait = NULL;
1789 fifo_node_t *node = NULL;
1790 switch_event_t *ovars = NULL;
1791 switch_status_t status = SWITCH_STATUS_FALSE;
1792 switch_event_t *event = NULL;
1793 char *sql = NULL;
1794 char *expanded_originate_string = NULL;
1795
1796 if (!globals.running) return NULL;
1797
1798 switch_mutex_lock(globals.mutex);
1799 globals.threads++;
1800 switch_mutex_unlock(globals.mutex);
1801
1802 switch_mutex_lock(globals.mutex);
1803 node = switch_core_hash_find(globals.fifo_hash, h->node_name);
1804 if (node) switch_thread_rwlock_rdlock(node->rwlock);
1805 switch_mutex_unlock(globals.mutex);
1806
1807 if (node) {
1808 switch_mutex_lock(node->update_mutex);
1809 node->ring_consumer_count++;
1810 node->busy = 0;
1811 switch_mutex_unlock(node->update_mutex);
1812 }
1813
1814 switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
1815 switch_assert(ovars);
1816 switch_event_add_header(ovars, SWITCH_STACK_BOTTOM, "originate_timeout", "%d", h->timeout);
1817
1818 expanded_originate_string = switch_event_expand_headers(ovars, h->originate_string);
1819
1820 if (node && switch_stristr("origination_caller", expanded_originate_string)) {
1821 originate_string = switch_mprintf("{execute_on_answer='unset fifo_hangup_check',fifo_name='%q',fifo_hangup_check='%q'}%s",
1822 node->name, node->name, expanded_originate_string);
1823 } else {
1824 if (node && !zstr(node->outbound_name)) {
1825 originate_string = switch_mprintf("{execute_on_answer='unset fifo_hangup_check',fifo_name='%q',fifo_hangup_check='%q',"
1826 "origination_caller_id_name=Queue,origination_caller_id_number='Queue: %q'}%s",
1827 node->name, node->name, node->outbound_name, expanded_originate_string);
1828 } else if (node) {
1829 originate_string = switch_mprintf("{execute_on_answer='unset fifo_hangup_check',fifo_name='%q',fifo_hangup_check='%q',"
1830 "origination_caller_id_name=Queue,origination_caller_id_number='Queue: %q'}%s",
1831 node->name, node->name, node->name, expanded_originate_string);
1832 }
1833 }
1834
1835 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1836 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node ? node->name : "");
1837 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "pre-dial");
1838 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Outbound-UUID", h->uuid);
1839 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "outbound-strategy", "enterprise");
1840 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "originate_string", originate_string);
1841 switch_event_fire(&event);
1842 }
1843
1844 sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%q'", h->uuid);
1845 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
1846
1847 status = switch_ivr_originate(NULL, &session, &cause, originate_string, h->timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, NULL, NULL);
1848
1849 if (status != SWITCH_STATUS_SUCCESS) {
1850 sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, "
1851 "outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag + 1 where uuid='%q'",
1852 (long) switch_epoch_time_now(NULL) + (node ? node->retry_delay : 0), h->uuid);
1853 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
1854
1855 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1856 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node ? node->name : "");
1857 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "post-dial");
1858 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Outbound-UUID", h->uuid);
1859 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "outbound-strategy", "enterprise");
1860 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "result", "failure");
1861 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "cause", switch_channel_cause2str(cause));
1862 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "originate_string", originate_string);
1863 switch_event_fire(&event);
1864 }
1865
1866 goto end;
1867 }
1868
1869 channel = switch_core_session_get_channel(session);
1870
1871 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1872 switch_channel_event_set_data(channel, event);
1873 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node ? node->name : "");
1874 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "post-dial");
1875 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Outbound-UUID", h->uuid);
1876 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "outbound-strategy", "enterprise");
1877 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "result", "success");
1878 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "originate_string", originate_string);
1879 switch_event_fire(&event);
1880 }
1881
1882 if ((member_wait = switch_channel_get_variable(channel, "fifo_member_wait")) || (member_wait = switch_channel_get_variable(channel, "member_wait"))) {
1883 if (strcasecmp(member_wait, "wait") && strcasecmp(member_wait, "nowait")) {
1884 member_wait = NULL;
1885 }
1886 }
1887
1888 switch_channel_set_variable(channel, "fifo_outbound_uuid", h->uuid);
1889 app_name = "fifo";
1890 arg = switch_core_session_sprintf(session, "%s out %s", h->node_name, member_wait ? member_wait : "wait");
1891 extension = switch_caller_extension_new(session, app_name, arg);
1892 switch_caller_extension_add_application(session, extension, app_name, arg);
1893 switch_channel_set_caller_extension(channel, extension);
1894 switch_channel_set_state(channel, CS_EXECUTE);
1895 switch_core_session_rwunlock(session);
1896
1897 sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 where uuid='%q' and ring_count > 0", h->uuid);
1898 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
1899
1900 end:
1901
1902 if ( originate_string ){
1903 switch_safe_free(originate_string);
1904 }
1905
1906 if (expanded_originate_string && expanded_originate_string != h->originate_string) {
1907 switch_safe_free(expanded_originate_string);
1908 }
1909
1910 switch_event_destroy(&ovars);
1911 if (node) {
1912 switch_mutex_lock(node->update_mutex);
1913 if (--node->ring_consumer_count < 0) {
1914 node->ring_consumer_count = 0;
1915 }
1916 node->busy = 0;
1917 switch_mutex_unlock(node->update_mutex);
1918 switch_thread_rwlock_unlock(node->rwlock);
1919 }
1920 switch_core_destroy_memory_pool(&h->pool);
1921
1922 switch_mutex_lock(globals.mutex);
1923 globals.threads--;
1924 switch_mutex_unlock(globals.mutex);
1925
1926 return NULL;
1927 }
1928
1929 /*!\brief Extract the outbound member results and accumulate them for
1930 * the ringall strategy handler
1931 */
place_call_ringall_callback(void * pArg,int argc,char ** argv,char ** columnNames)1932 static int place_call_ringall_callback(void *pArg, int argc, char **argv, char **columnNames)
1933 {
1934 struct callback_helper *cbh = (struct callback_helper *) pArg;
1935 struct call_helper *h;
1936
1937 h = switch_core_alloc(cbh->pool, sizeof(*h));
1938 h->pool = cbh->pool;
1939 h->uuid = switch_core_strdup(h->pool, argv[0]);
1940 h->node_name = switch_core_strdup(h->pool, argv[1]);
1941 h->originate_string = switch_core_strdup(h->pool, argv[2]);
1942 h->timeout = atoi(argv[5]);
1943
1944 cbh->rows[cbh->rowcount++] = h;
1945
1946 if (cbh->rowcount == MAX_ROWS) return -1;
1947
1948 if (cbh->need) {
1949 cbh->need--;
1950 return cbh->need ? 0 : -1;
1951 }
1952
1953 return 0;
1954 }
1955
1956 /*!\brief Extract the outbound member results and invoke the
1957 * enterprise strategy handler
1958 */
place_call_enterprise_callback(void * pArg,int argc,char ** argv,char ** columnNames)1959 static int place_call_enterprise_callback(void *pArg, int argc, char **argv, char **columnNames)
1960 {
1961 int *need = (int *) pArg;
1962
1963 switch_thread_t *thread;
1964 switch_threadattr_t *thd_attr = NULL;
1965 switch_memory_pool_t *pool;
1966 struct call_helper *h;
1967
1968 switch_core_new_memory_pool(&pool);
1969 h = switch_core_alloc(pool, sizeof(*h));
1970 h->pool = pool;
1971 h->uuid = switch_core_strdup(h->pool, argv[0]);
1972 h->node_name = switch_core_strdup(h->pool, argv[1]);
1973 h->originate_string = switch_core_strdup(h->pool, argv[2]);
1974 h->timeout = atoi(argv[5]);
1975
1976 switch_threadattr_create(&thd_attr, h->pool);
1977 switch_threadattr_detach_set(thd_attr, 1);
1978 switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
1979 switch_thread_create(&thread, thd_attr, outbound_enterprise_thread_run, h, h->pool);
1980
1981 (*need)--;
1982
1983 return *need ? 0 : -1;
1984 }
1985
1986 /*!\brief Find outbound members to call for a given fifo node
1987 *
1988 * We're given a fifo node that has callers to be delivered to agents.
1989 * Our job is to find available outbound members and pass them to the
1990 * appropriate outbound strategy handler.
1991 *
1992 * The ringall strategy handler needs the full list of members to do
1993 * its job, so we first let `place_call_ringall_callback` accumulate
1994 * the results. The enterprise strategy handler can simply take each
1995 * member one at a time, so the `place_call_enterprise_callback` takes
1996 * care of invoking the handler.
1997 *
1998 * Within the ringall call strategy outbound_per_cycle is used to define
1999 * how many agents exactly are assigned to the caller. With ringall if
2000 * multiple callers are calling in and one is answered, because the call
2001 * is assigned to all agents the call to the agents that is not answered
2002 * will be lose raced and the other agents will drop the call before the
2003 * next one will begin to ring. When oubound_per_cycle is used in the
2004 * enterprise strategy it acts as a maximum value for how many agents
2005 * are rung at once on any call, the caller is not assigned to any agent
2006 * until the call is answered. Enterprise only rings the number of phones
2007 * that are needed, so outbound_per_cycle as a max does not give you the
2008 * effect of ringall. outbound_per_cycle_min defines how many agents minimum
2009 * will be rung by an incoming caller through fifo, which can give a ringall
2010 * effect. outbound_per_cycle and outbound_per_cycle_min both default to 1.
2011 *
2012 */
find_consumers(fifo_node_t * node)2013 static int find_consumers(fifo_node_t *node)
2014 {
2015 char *sql;
2016 int ret = 0;
2017
2018 sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, "
2019 "next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname "
2020 "from fifo_outbound "
2021 "where taking_calls = 1 and (fifo_name = '%q') and ((use_count+ring_count) < simo_count) and (next_avail = 0 or next_avail <= %ld) "
2022 "order by next_avail, outbound_fail_count, outbound_call_count",
2023 node->name, (long) switch_epoch_time_now(NULL)
2024 );
2025
2026 switch(node->outbound_strategy) {
2027 case NODE_STRATEGY_ENTERPRISE:
2028 {
2029 int need = node_caller_count(node);
2030 int count;
2031
2032 if (node->outbound_per_cycle && node->outbound_per_cycle < need) {
2033 need = node->outbound_per_cycle;
2034 } else if (node->outbound_per_cycle_min && node->outbound_per_cycle_min > need) {
2035 need = node->outbound_per_cycle_min;
2036 }
2037
2038 count = need;
2039 fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_enterprise_callback, &need);
2040 ret = count - need;
2041 }
2042 break;
2043 case NODE_STRATEGY_RINGALL:
2044 {
2045 switch_thread_t *thread;
2046 switch_threadattr_t *thd_attr = NULL;
2047 struct callback_helper *cbh = NULL;
2048 switch_memory_pool_t *pool = NULL;
2049
2050 switch_core_new_memory_pool(&pool);
2051 cbh = switch_core_alloc(pool, sizeof(*cbh));
2052 cbh->pool = pool;
2053 cbh->need = 1;
2054
2055 if (node->outbound_per_cycle != cbh->need) {
2056 cbh->need = node->outbound_per_cycle;
2057 }
2058
2059 fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh);
2060
2061 if (cbh->rowcount) {
2062 ret = cbh->rowcount;
2063 switch_threadattr_create(&thd_attr, cbh->pool);
2064 switch_threadattr_detach_set(thd_attr, 1);
2065 switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
2066 switch_thread_create(&thread, thd_attr, outbound_ringall_thread_run, cbh, cbh->pool);
2067 } else {
2068 switch_core_destroy_memory_pool(&pool);
2069 }
2070 }
2071 break;
2072 default:
2073 break;
2074 }
2075
2076 switch_safe_free(sql);
2077 return ret;
2078 }
2079
2080 /*\brief Continuously attempt to deliver calls to outbound members
2081 *
2082 * For each outbound priority level 1-10, find fifo nodes with a
2083 * matching priority. For each of those nodes with outbound members,
2084 * run `find_consumers()` if the fifo node has calls needing to be
2085 * delivered and not enough ready and waiting inbound consumers.
2086 *
2087 * In the event of nothing needing to be done, each cycle starts at
2088 * priority 1 and ends at priority 10, yielding for one second
2089 * afterward. We also yield after initiating outbound calls, starting
2090 * again where we left off on the next node.
2091 *
2092 * We also take care of cleaning up after nodes queued for deletion.
2093 */
node_thread_run(switch_thread_t * thread,void * obj)2094 static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *obj)
2095 {
2096 fifo_node_t *node, *last, *this_node;
2097 int cur_priority = 1;
2098
2099 globals.node_thread_running = 1;
2100
2101 while (globals.node_thread_running == 1) {
2102 int ppl_waiting, consumer_total, idle_consumers, need_sleep = 0;
2103
2104 switch_mutex_lock(globals.mutex);
2105
2106 if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Trying priority: %d\n", cur_priority);
2107
2108 last = NULL;
2109 node = globals.nodes;
2110
2111 while(node) {
2112 int x = 0;
2113 switch_event_t *pop = NULL;
2114
2115 this_node = node;
2116 node = node->next;
2117
2118 if (this_node->ready == 0) {
2119 for (x = 0; x < MAX_PRI; x++) {
2120 while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
2121 const char *caller_uuid = switch_event_get_header(pop, "unique-id");
2122 switch_ivr_kill_uuid(caller_uuid, SWITCH_CAUSE_MANAGER_REQUEST);
2123 switch_event_destroy(&pop);
2124 }
2125 }
2126 }
2127
2128 if (this_node->ready == 0 && switch_thread_rwlock_trywrlock(this_node->rwlock) == SWITCH_STATUS_SUCCESS) {
2129 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", this_node->name);
2130
2131 for (x = 0; x < MAX_PRI; x++) {
2132 while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
2133 switch_event_destroy(&pop);
2134 }
2135 }
2136
2137 if (last) {
2138 last->next = this_node->next;
2139 } else {
2140 globals.nodes = this_node->next;
2141 }
2142
2143 switch_core_hash_destroy(&this_node->consumer_hash);
2144 switch_mutex_unlock(this_node->mutex);
2145 switch_mutex_unlock(this_node->update_mutex);
2146 switch_thread_rwlock_unlock(this_node->rwlock);
2147 switch_core_destroy_memory_pool(&this_node->pool);
2148 continue;
2149 }
2150
2151 last = this_node;
2152
2153 if (this_node->outbound_priority == 0) this_node->outbound_priority = 5;
2154
2155 if (this_node->has_outbound && !this_node->busy && this_node->outbound_priority == cur_priority) {
2156 ppl_waiting = node_caller_count(this_node);
2157 consumer_total = this_node->consumer_count;
2158 idle_consumers = node_idle_consumers(this_node);
2159
2160 if (globals.debug) {
2161 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
2162 "%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d pri %d\n",
2163 this_node->name, ppl_waiting, consumer_total, idle_consumers, this_node->ring_consumer_count, this_node->outbound_priority);
2164 }
2165
2166 if ((ppl_waiting - this_node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) {
2167 if (find_consumers(this_node)) {
2168 need_sleep++;
2169 }
2170 }
2171 }
2172 }
2173
2174 if (++cur_priority > 10) {
2175 cur_priority = 1;
2176 }
2177
2178 switch_mutex_unlock(globals.mutex);
2179
2180 if (cur_priority == 1 || need_sleep) {
2181 switch_yield(1000000);
2182 }
2183 }
2184
2185 globals.node_thread_running = 0;
2186
2187 return NULL;
2188 }
2189
start_node_thread(switch_memory_pool_t * pool)2190 static void start_node_thread(switch_memory_pool_t *pool)
2191 {
2192 switch_threadattr_t *thd_attr = NULL;
2193
2194 switch_threadattr_create(&thd_attr, pool);
2195 //switch_threadattr_detach_set(thd_attr, 1);
2196 switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
2197 switch_thread_create(&globals.node_thread, thd_attr, node_thread_run, pool, pool);
2198 }
2199
stop_node_thread(void)2200 static int stop_node_thread(void)
2201 {
2202 switch_status_t st = SWITCH_STATUS_SUCCESS;
2203
2204 globals.node_thread_running = -1;
2205 switch_thread_join(&st, globals.node_thread);
2206
2207 return 0;
2208 }
2209
check_cancel(fifo_node_t * node)2210 static void check_cancel(fifo_node_t *node)
2211 {
2212 int ppl_waiting;
2213
2214 if (node->outbound_strategy != NODE_STRATEGY_ENTERPRISE) {
2215 return;
2216 }
2217
2218 ppl_waiting = node_caller_count(node);
2219
2220 if (node->ring_consumer_count > 0 && ppl_waiting < 1) {
2221 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Outbound call count (%d) exceeds required value for queue %s (%d), "
2222 "Ending extraneous calls\n", node->ring_consumer_count, node->name, ppl_waiting);
2223
2224 switch_core_session_hupall_matching_var("fifo_hangup_check", node->name, SWITCH_CAUSE_ORIGINATOR_CANCEL);
2225 }
2226 }
2227
send_presence(fifo_node_t * node)2228 static void send_presence(fifo_node_t *node)
2229 {
2230 switch_event_t *event;
2231 int wait_count = 0;
2232
2233 if (!globals.running) {
2234 return;
2235 }
2236
2237 if (switch_event_create(&event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) {
2238 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "proto", "queue");
2239
2240 if (node->domain_name) {
2241 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "login", "%s@%s", node->name, node->domain_name);
2242 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "from", "%s@%s", node->name, node->domain_name);
2243 } else {
2244 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "login", node->name);
2245 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "from", node->name);
2246 }
2247
2248 if ((wait_count = node_caller_count(node)) > 0) {
2249 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "force-status", "Active (%d waiting)", wait_count);
2250 } else {
2251 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "force-status", "Idle");
2252 }
2253 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "rpid", "unknown");
2254 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "event_type", "presence");
2255 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "alt_event_type", "dialog");
2256 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "event_count", "%d", 0);
2257
2258 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "channel-state", wait_count > 0 ? "CS_ROUTING" : "CS_HANGUP");
2259 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "unique-id", node->name);
2260 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "answer-state", wait_count > 0 ? "confirmed" : "terminated");
2261 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "presence-call-direction", "inbound");
2262 switch_event_fire(&event);
2263 }
2264 }
2265
pres_event_handler(switch_event_t * event)2266 static void pres_event_handler(switch_event_t *event)
2267 {
2268 char *to = switch_event_get_header(event, "to");
2269 char *domain_name = NULL;
2270 char *dup_to = NULL, *node_name , *dup_node_name;
2271 fifo_node_t *node;
2272
2273 if (!globals.running) {
2274 return;
2275 }
2276
2277 if (!to || strncasecmp(to, "queue+", 6) || !strchr(to, '@')) {
2278 return;
2279 }
2280
2281 dup_to = strdup(to);
2282 switch_assert(dup_to);
2283
2284 node_name = dup_to + 6;
2285
2286 if ((domain_name = strchr(node_name, '@'))) {
2287 *domain_name++ = '\0';
2288 }
2289
2290 dup_node_name = switch_mprintf("%q@%q", node_name, domain_name);
2291
2292 switch_mutex_lock(globals.mutex);
2293 if (!(node = switch_core_hash_find(globals.fifo_hash, node_name)) && !(node = switch_core_hash_find(globals.fifo_hash, dup_node_name))) {
2294 node = create_node(node_name, 0, globals.sql_mutex);
2295 node->domain_name = switch_core_strdup(node->pool, domain_name);
2296 node->ready = 1;
2297 }
2298
2299 switch_thread_rwlock_rdlock(node->rwlock);
2300 send_presence(node);
2301 switch_thread_rwlock_unlock(node->rwlock);
2302
2303 switch_mutex_unlock(globals.mutex);
2304
2305 switch_safe_free(dup_to);
2306 switch_safe_free(dup_node_name);
2307 }
2308
fifo_add_outbound(const char * node_name,const char * url,uint32_t priority)2309 static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32_t priority)
2310 {
2311 fifo_node_t *node;
2312 switch_event_t *call_event;
2313 uint32_t i = 0;
2314
2315 if (priority >= MAX_PRI) {
2316 priority = MAX_PRI - 1;
2317 }
2318
2319 if (!node_name) return 0;
2320
2321 switch_mutex_lock(globals.mutex);
2322
2323 if (!(node = switch_core_hash_find(globals.fifo_hash, node_name))) {
2324 node = create_node(node_name, 0, globals.sql_mutex);
2325 }
2326
2327 switch_thread_rwlock_rdlock(node->rwlock);
2328
2329 switch_mutex_unlock(globals.mutex);
2330
2331 switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA);
2332 switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "dial-url", url);
2333
2334 fifo_queue_push(node->fifo_list[priority], call_event);
2335 call_event = NULL;
2336
2337 i = fifo_queue_size(node->fifo_list[priority]);
2338
2339 switch_thread_rwlock_unlock(node->rwlock);
2340
2341 return i;
2342 }
2343
SWITCH_STANDARD_API(fifo_check_bridge_function)2344 SWITCH_STANDARD_API(fifo_check_bridge_function)
2345 {
2346 stream->write_function(stream, "%s", (cmd && check_bridge_call(cmd)) ? "true" : "false");
2347
2348 return SWITCH_STATUS_SUCCESS;
2349 }
2350
SWITCH_STANDARD_API(fifo_add_outbound_function)2351 SWITCH_STANDARD_API(fifo_add_outbound_function)
2352 {
2353 char *data = NULL, *argv[4] = { 0 };
2354 int argc;
2355 uint32_t priority = 0;
2356
2357 if (zstr(cmd)) {
2358 goto fail;
2359 }
2360
2361 data = strdup(cmd);
2362
2363 if ((argc = switch_separate_string(data, ' ', argv, (sizeof(argv) / sizeof(argv[0])))) < 2 || !argv[0]) {
2364 goto fail;
2365 }
2366
2367 if (argv[2]) {
2368 int tmp = atoi(argv[2]);
2369 if (tmp > 0) {
2370 priority = tmp;
2371 }
2372 }
2373
2374 stream->write_function(stream, "%d", fifo_add_outbound(argv[0], argv[1], priority));
2375
2376 free(data);
2377 return SWITCH_STATUS_SUCCESS;
2378
2379 fail:
2380
2381 free(data);
2382 stream->write_function(stream, "0");
2383 return SWITCH_STATUS_SUCCESS;
2384 }
2385
dec_use_count(switch_core_session_t * session,const char * type)2386 static void dec_use_count(switch_core_session_t *session, const char *type)
2387 {
2388 char *sql;
2389 const char *outbound_id = NULL;
2390 switch_event_t *event;
2391 long now = (long) switch_epoch_time_now(NULL);
2392 switch_channel_t *channel = switch_core_session_get_channel(session);
2393
2394 if ((outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid"))) {
2395 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s untracking call on uuid %s!\n", switch_channel_get_name(channel), outbound_id);
2396
2397 sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session));
2398 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
2399
2400 del_bridge_call(outbound_id);
2401 sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag + 1 where use_count > 0 and uuid='%q'",
2402 now, now, outbound_id);
2403 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
2404 fifo_dec_use_count(outbound_id);
2405 }
2406
2407 do_unbridge(session, NULL);
2408
2409 if (type) {
2410 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
2411 uint64_t hold_usec = 0, tt_usec = 0;
2412 switch_caller_profile_t *originator_cp = NULL;
2413
2414 originator_cp = switch_channel_get_caller_profile(channel);
2415 switch_channel_event_set_data(channel, event);
2416 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
2417 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "channel-consumer-stop");
2418 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Type", type);
2419 if (outbound_id) {
2420 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
2421 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(outbound_id));
2422 }
2423 hold_usec = originator_cp->times->hold_accum;
2424 tt_usec = (switch_micro_time_now() - originator_cp->times->bridged) - hold_usec;
2425 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Bridge-Time-us", "%"SWITCH_TIME_T_FMT, originator_cp->times->bridged);
2426 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Bridge-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(originator_cp->times->bridged / 1000));
2427 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Bridge-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(originator_cp->times->bridged / 1000000));
2428 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Talk-Time-us", "%"SWITCH_TIME_T_FMT, tt_usec);
2429 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Talk-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(tt_usec / 1000));
2430 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Talk-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(tt_usec / 1000000));
2431 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-us", "%"SWITCH_TIME_T_FMT, hold_usec);
2432 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(hold_usec / 1000));
2433 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(hold_usec / 1000000));
2434
2435 switch_event_fire(&event);
2436 }
2437 }
2438 }
2439
hanguphook(switch_core_session_t * session)2440 static switch_status_t hanguphook(switch_core_session_t *session)
2441 {
2442 switch_channel_t *channel = switch_core_session_get_channel(session);
2443 switch_channel_state_t state = switch_channel_get_state(channel);
2444
2445 if (state >= CS_HANGUP && !switch_channel_test_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_DID_HOOK)) {
2446 dec_use_count(session, "manual");
2447 switch_core_event_hook_remove_state_change(session, hanguphook);
2448 switch_channel_set_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_DID_HOOK);
2449 }
2450
2451 return SWITCH_STATUS_SUCCESS;
2452 }
2453
SWITCH_STANDARD_APP(fifo_track_call_function)2454 SWITCH_STANDARD_APP(fifo_track_call_function)
2455 {
2456 switch_channel_t *channel = switch_core_session_get_channel(session);
2457 char *sql;
2458 const char *col1 = NULL, *col2 = NULL, *cid_name, *cid_number;
2459 switch_event_t *event;
2460
2461 if (zstr(data)) {
2462 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Invalid!\n");
2463 return;
2464 }
2465
2466 if (switch_channel_test_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_TRACKING)) {
2467 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s trying to double-track call!\n", switch_channel_get_name(channel));
2468 return;
2469 }
2470
2471 switch_channel_set_variable(channel, "fifo_outbound_uuid", data);
2472 switch_channel_set_variable(channel, "fifo_track_call", "true");
2473
2474 add_bridge_call(data);
2475
2476 switch_channel_set_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_TRACKING);
2477
2478 switch_core_event_hook_add_receive_message(session, messagehook);
2479 switch_core_event_hook_add_state_run(session, hanguphook);
2480
2481 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s tracking call on uuid %s!\n", switch_channel_get_name(channel), data);
2482
2483 if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_OUTBOUND) {
2484 col1 = "manual_calls_in_count";
2485 col2 = "manual_calls_in_total_count";
2486 } else {
2487 col1 = "manual_calls_out_count";
2488 col2 = "manual_calls_out_total_count";
2489 }
2490
2491 sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,outbound_fail_count=0,use_count=use_count+1,%s=%s+1,%s=%s+1 where uuid='%q'",
2492 (long) switch_epoch_time_now(NULL), col1, col1, col2, col2, data);
2493 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
2494 fifo_inc_use_count(data);
2495
2496 if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_INBOUND) {
2497 cid_name = switch_channel_get_variable(channel, "destination_number");
2498 cid_number = cid_name;
2499 } else {
2500 cid_name = switch_channel_get_variable(channel, "caller_id_name");
2501 cid_number = switch_channel_get_variable(channel, "caller_id_number");
2502 }
2503
2504 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
2505 switch_channel_event_set_data(channel, event);
2506 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
2507 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "channel-consumer-start");
2508 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", data);
2509 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(data));
2510 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Type", "manual");
2511 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-CID-Name", cid_name);
2512 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-CID-Number", cid_number);
2513 switch_event_fire(&event);
2514 }
2515 }
2516
fifo_caller_add(fifo_node_t * node,switch_core_session_t * session)2517 static void fifo_caller_add(fifo_node_t *node, switch_core_session_t *session)
2518 {
2519 char *sql;
2520 switch_channel_t *channel = switch_core_session_get_channel(session);
2521
2522 sql = switch_mprintf("insert into fifo_callers (fifo_name,uuid,caller_caller_id_name,caller_caller_id_number,timestamp) "
2523 "values ('%q','%q','%q','%q',%ld)",
2524 node->name,
2525 switch_core_session_get_uuid(session),
2526 switch_str_nil(switch_channel_get_variable(channel, "caller_id_name")),
2527 switch_str_nil(switch_channel_get_variable(channel, "caller_id_number")),
2528 switch_epoch_time_now(NULL));
2529
2530 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
2531 }
2532
fifo_caller_del(const char * uuid)2533 static void fifo_caller_del(const char *uuid)
2534 {
2535 char *sql;
2536
2537 if (uuid) {
2538 sql = switch_mprintf("delete from fifo_callers where uuid='%q'", uuid);
2539 } else {
2540 sql = switch_mprintf("delete from fifo_callers");
2541 }
2542
2543 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
2544 }
2545
2546 typedef enum {
2547 STRAT_MORE_PPL,
2548 STRAT_WAITING_LONGER,
2549 } fifo_strategy_t;
2550
2551 #define MAX_NODES_PER_CONSUMER 25
2552 #define FIFO_DESC "Fifo for stacking parked calls."
2553 #define FIFO_USAGE "<fifo name>[!<importance_number>] [in [<announce file>|undef] [<music file>|undef] | out [wait|nowait] [<announce file>|undef] [<music file>|undef]]"
SWITCH_STANDARD_APP(fifo_function)2554 SWITCH_STANDARD_APP(fifo_function)
2555 {
2556 int argc;
2557 char *mydata = NULL, *argv[5] = { 0 };
2558 fifo_node_t *node = NULL, *node_list[MAX_NODES_PER_CONSUMER + 1] = { 0 };
2559 switch_channel_t *channel = switch_core_session_get_channel(session);
2560 int do_destroy = 0, do_wait = 1, node_count = 0, i = 0;
2561 const char *moh = NULL;
2562 const char *announce = NULL;
2563 switch_event_t *event = NULL;
2564 char date[80] = "";
2565 switch_time_exp_t tm;
2566 switch_time_t ts;
2567 switch_size_t retsize;
2568 char *list_string;
2569 int nlist_count;
2570 char *nlist[MAX_NODES_PER_CONSUMER];
2571 int consumer = 0, in_table = 0;
2572 const char *arg_fifo_name = NULL;
2573 const char *arg_inout = NULL;
2574 const char *serviced_uuid = NULL;
2575
2576 if (!globals.running) {
2577 return;
2578 }
2579
2580 if (zstr(data)) {
2581 switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "No Args\n");
2582 return;
2583 }
2584
2585 switch_channel_set_variable(channel, "fifo_hangup_check", NULL);
2586
2587 mydata = switch_core_session_strdup(session, data);
2588 switch_assert(mydata);
2589
2590 argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0])));
2591 arg_fifo_name = argv[0];
2592 arg_inout = argv[1];
2593
2594 if (!(arg_fifo_name && arg_inout)) {
2595 switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
2596 return;
2597 }
2598
2599 if (!strcasecmp(arg_inout, "out")) {
2600 consumer = 1;
2601 } else if (strcasecmp(arg_inout, "in")) {
2602 switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
2603 return;
2604 }
2605
2606 list_string = switch_core_session_strdup(session, arg_fifo_name);
2607
2608 if (!(nlist_count = switch_separate_string(list_string, ',', nlist, (sizeof(nlist) / sizeof(nlist[0]))))) {
2609 switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
2610 return;
2611 }
2612
2613 if (!consumer && nlist_count > 1) {
2614 switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
2615 return;
2616 }
2617
2618 switch_mutex_lock(globals.mutex);
2619 for (i = 0; i < nlist_count; i++) {
2620 int importance = 0;
2621 char *p;
2622
2623 if ((p = strrchr(nlist[i], '!'))) {
2624 *p++ = '\0';
2625 importance = atoi(p);
2626 if (importance < 0) {
2627 importance = 0;
2628 }
2629 }
2630
2631 if (!(node = switch_core_hash_find(globals.fifo_hash, nlist[i]))) {
2632 node = create_node(nlist[i], importance, globals.sql_mutex);
2633 node->ready = 1;
2634 }
2635
2636 switch_thread_rwlock_rdlock(node->rwlock);
2637 node_list[node_count++] = node;
2638 }
2639
2640 switch_mutex_unlock(globals.mutex);
2641
2642 moh = switch_channel_get_variable(channel, "fifo_music");
2643 announce = switch_channel_get_variable(channel, "fifo_announce");
2644
2645 if (consumer) {
2646 if (argc > 3) {
2647 announce = argv[3];
2648 }
2649
2650 if (argc > 4) {
2651 moh = argv[4];
2652 }
2653 } else {
2654 if (argc > 2) {
2655 announce = argv[2];
2656 }
2657
2658 if (argc > 3) {
2659 moh = argv[3];
2660 }
2661 }
2662
2663 if (moh && !strcasecmp(moh, "silence")) {
2664 moh = NULL;
2665 }
2666
2667 cleanup_fifo_arg(&announce);
2668 cleanup_fifo_arg(&moh);
2669 switch_assert(node);
2670
2671 switch_core_media_bug_pause(session);
2672
2673 if (!consumer) {
2674 switch_core_session_t *other_session;
2675 switch_channel_t *other_channel;
2676 const char *uuid = switch_core_session_get_uuid(session);
2677 const char *pri;
2678 char tmp[25] = "";
2679 int p = 0;
2680 int aborted = 0;
2681 fifo_chime_data_t cd = { {0} };
2682 const char *chime_list = switch_channel_get_variable(channel, "fifo_chime_list");
2683 const char *chime_freq = switch_channel_get_variable(channel, "fifo_chime_freq");
2684 const char *orbit_exten = switch_channel_get_variable(channel, "fifo_orbit_exten");
2685 const char *orbit_dialplan = switch_channel_get_variable(channel, "fifo_orbit_dialplan");
2686 const char *orbit_context = switch_channel_get_variable(channel, "fifo_orbit_context");
2687
2688 const char *orbit_ann = switch_channel_get_variable(channel, "fifo_orbit_announce");
2689 const char *caller_exit_key = switch_channel_get_variable(channel, "fifo_caller_exit_key");
2690 int freq = 30;
2691 int ftmp = 0;
2692 int to = 60;
2693 switch_event_t *call_event;
2694
2695 if (orbit_exten) {
2696 char *ot;
2697 if ((cd.orbit_exten = switch_core_session_strdup(session, orbit_exten))) {
2698 if ((ot = strchr(cd.orbit_exten, ':'))) {
2699 *ot++ = '\0';
2700 if ((to = atoi(ot)) < 0) {
2701 to = 60;
2702 }
2703 }
2704 cd.orbit_timeout = switch_epoch_time_now(NULL) + to;
2705 }
2706 cd.orbit_dialplan = switch_core_session_strdup(session, orbit_dialplan);
2707 cd.orbit_context = switch_core_session_strdup(session, orbit_context);
2708 }
2709
2710 if (chime_freq) {
2711 ftmp = atoi(chime_freq);
2712 if (ftmp > 0) {
2713 freq = ftmp;
2714 }
2715 }
2716
2717 switch_channel_answer(channel);
2718
2719 switch_mutex_lock(node->update_mutex);
2720
2721 if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) {
2722 p = atoi(pri);
2723 }
2724
2725 if (p >= MAX_PRI) {
2726 p = MAX_PRI - 1;
2727 }
2728
2729 if (!node_caller_count(node)) {
2730 node->start_waiting = switch_micro_time_now();
2731 }
2732
2733 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
2734 switch_channel_event_set_data(channel, event);
2735 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
2736 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "push");
2737 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Slot", "%d", p);
2738 switch_event_fire(&event);
2739 }
2740
2741 switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA);
2742 switch_channel_event_set_data(channel, call_event);
2743
2744 fifo_queue_push(node->fifo_list[p], call_event);
2745 fifo_caller_add(node, session);
2746 in_table = 1;
2747
2748 call_event = NULL;
2749 switch_snprintf(tmp, sizeof(tmp), "%d", fifo_queue_size(node->fifo_list[p]));
2750 switch_channel_set_variable(channel, "fifo_position", tmp);
2751
2752 if (!pri) {
2753 switch_snprintf(tmp, sizeof(tmp), "%d", p);
2754 switch_channel_set_variable(channel, "fifo_priority", tmp);
2755 }
2756
2757 switch_mutex_unlock(node->update_mutex);
2758
2759 ts = switch_micro_time_now();
2760 switch_time_exp_lt(&tm, ts);
2761 switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
2762 switch_channel_set_variable(channel, "fifo_status", "WAITING");
2763 switch_channel_set_variable(channel, "fifo_timestamp", date);
2764 switch_channel_set_variable(channel, "fifo_push_timestamp", date);
2765 switch_channel_set_variable(channel, "fifo_serviced_uuid", NULL);
2766
2767 switch_channel_set_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_BRIDGE_TAG);
2768
2769 if (chime_list) {
2770 char *list_dup = switch_core_session_strdup(session, chime_list);
2771 cd.total = switch_separate_string(list_dup, ',', cd.list, (sizeof(cd.list) / sizeof(cd.list[0])));
2772 cd.freq = freq;
2773 cd.next = switch_epoch_time_now(NULL) + cd.freq;
2774 cd.exit_key = (char *) switch_channel_get_variable(channel, "fifo_caller_exit_key");
2775 }
2776
2777 send_presence(node);
2778
2779 while (switch_channel_ready(channel)) {
2780 switch_input_args_t args = { 0 };
2781 char buf[25] = "";
2782 switch_status_t rstatus;
2783
2784 args.input_callback = moh_on_dtmf;
2785 args.buf = buf;
2786 args.buflen = sizeof(buf);
2787
2788 if (cd.total || cd.orbit_timeout) {
2789 args.read_frame_callback = caller_read_frame_callback;
2790 args.user_data = &cd;
2791 }
2792
2793 if (cd.abort || cd.do_orbit) {
2794 aborted = 1;
2795 goto abort;
2796 }
2797
2798 if ((serviced_uuid = switch_channel_get_variable(channel, "fifo_serviced_uuid"))) {
2799 break;
2800 }
2801
2802 switch_core_session_flush_private_events(session);
2803
2804 if (moh) {
2805 rstatus = switch_ivr_play_file(session, NULL, moh, &args);
2806 } else {
2807 rstatus = switch_ivr_collect_digits_callback(session, &args, 0, 0);
2808 }
2809
2810 if (!SWITCH_READ_ACCEPTABLE(rstatus)) {
2811 aborted = 1;
2812 goto abort;
2813 }
2814
2815 if (caller_exit_key && *buf && strchr(caller_exit_key, *buf)) {
2816 switch_channel_set_variable(channel, "fifo_caller_exit_key", (char *)buf);
2817 aborted = 1;
2818 goto abort;
2819 }
2820 }
2821
2822 if (!serviced_uuid && switch_channel_ready(channel)) {
2823 switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_CLEARING);
2824 } else if ((other_session = switch_core_session_locate(serviced_uuid))) {
2825 int ready;
2826 other_channel = switch_core_session_get_channel(other_session);
2827 ready = switch_channel_ready(other_channel);
2828 switch_core_session_rwunlock(other_session);
2829 if (!ready) {
2830 switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_CLEARING);
2831 }
2832 }
2833
2834 switch_core_session_flush_private_events(session);
2835
2836 if (switch_channel_ready(channel)) {
2837 if (announce) {
2838 switch_ivr_play_file(session, NULL, announce, NULL);
2839 }
2840 }
2841
2842 abort:
2843
2844 switch_channel_clear_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_BRIDGE_TAG);
2845
2846 if (!aborted && switch_channel_ready(channel)) {
2847 switch_channel_set_state(channel, CS_HIBERNATE);
2848 goto done;
2849 } else {
2850 ts = switch_micro_time_now();
2851 switch_time_exp_lt(&tm, ts);
2852 switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
2853 switch_channel_set_variable(channel, "fifo_status", cd.do_orbit ? "TIMEOUT" : "ABORTED");
2854 switch_channel_set_variable(channel, "fifo_timestamp", date);
2855
2856 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
2857 switch_channel_event_set_data(channel, event);
2858 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
2859 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", cd.do_orbit ? "timeout" : "abort");
2860 switch_event_fire(&event);
2861 }
2862
2863 switch_mutex_lock(globals.mutex);
2864 switch_mutex_lock(node->update_mutex);
2865 node_remove_uuid(node, uuid);
2866 switch_mutex_unlock(node->update_mutex);
2867 send_presence(node);
2868 check_cancel(node);
2869 switch_mutex_unlock(globals.mutex);
2870 }
2871
2872 if ((switch_true(switch_channel_get_variable(channel, "fifo_caller_exit_to_orbit")) || cd.do_orbit) && cd.orbit_exten) {
2873 if (orbit_ann) {
2874 switch_ivr_play_file(session, NULL, orbit_ann, NULL);
2875 }
2876
2877 if (strcmp(cd.orbit_exten, "_continue_")) {
2878 switch_ivr_session_transfer(session, cd.orbit_exten, cd.orbit_dialplan, cd.orbit_context);
2879 }
2880 }
2881
2882 cancel_caller_outbound_call(switch_core_session_get_uuid(session), SWITCH_CAUSE_ORIGINATOR_CANCEL);
2883
2884 goto done;
2885 } else { /* consumer */
2886 switch_event_t *pop = NULL;
2887 switch_frame_t *read_frame;
2888 switch_status_t status;
2889 switch_core_session_t *other_session;
2890 switch_input_args_t args = { 0 };
2891 const char *pop_order = NULL;
2892 int custom_pop = 0;
2893 int pop_array[MAX_PRI] = { 0 };
2894 char *pop_list[MAX_PRI] = { 0 };
2895 const char *fifo_consumer_wrapup_sound = NULL;
2896 const char *fifo_consumer_wrapup_key = NULL;
2897 const char *sfifo_consumer_wrapup_time = NULL;
2898 uint32_t fifo_consumer_wrapup_time = 0;
2899 switch_time_t wrapup_time_elapsed = 0, wrapup_time_started = 0, wrapup_time_remaining = 0;
2900 const char *my_id;
2901 char buf[5] = "";
2902 const char *strat_str = switch_channel_get_variable(channel, "fifo_strategy");
2903 fifo_strategy_t strat = STRAT_WAITING_LONGER;
2904 const char *url = NULL;
2905 const char *caller_uuid = NULL;
2906 const char *outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid");
2907 switch_event_t *event;
2908 const char *cid_name = NULL, *cid_number = NULL;
2909
2910 //const char *track_use_count = switch_channel_get_variable(channel, "fifo_track_use_count");
2911 //int do_track = switch_true(track_use_count);
2912
2913 if (switch_core_event_hook_remove_receive_message(session, messagehook) == SWITCH_STATUS_SUCCESS) {
2914 dec_use_count(session, NULL);
2915 switch_core_event_hook_remove_state_change(session, hanguphook);
2916 switch_channel_clear_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_TRACKING);
2917 }
2918
2919 if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_INBOUND) {
2920 cid_name = switch_channel_get_variable(channel, "callee_id_name");
2921 cid_number = switch_channel_get_variable(channel, "callee_id_number");
2922
2923 if (!cid_name) {
2924 cid_name = switch_channel_get_variable(channel, "destination_number");
2925 }
2926 if (!cid_number) {
2927 cid_number = cid_name;
2928 }
2929 } else {
2930 cid_name = switch_channel_get_variable(channel, "caller_id_name");
2931 cid_number = switch_channel_get_variable(channel, "caller_id_number");
2932 }
2933
2934 if (!zstr(strat_str)) {
2935 if (!strcasecmp(strat_str, "more_ppl")) {
2936 strat = STRAT_MORE_PPL;
2937 } else if (!strcasecmp(strat_str, "waiting_longer")) {
2938 strat = STRAT_WAITING_LONGER;
2939 } else {
2940 switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Invalid strategy\n");
2941 goto done;
2942 }
2943 }
2944
2945 if (argc > 2) {
2946 if (!strcasecmp(argv[2], "nowait")) {
2947 do_wait = 0;
2948 } else if (strcasecmp(argv[2], "wait")) {
2949 switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
2950 goto done;
2951 }
2952 }
2953
2954 if (!(my_id = switch_channel_get_variable(channel, "fifo_consumer_id"))) {
2955 my_id = switch_core_session_get_uuid(session);
2956 }
2957
2958 if (do_wait) {
2959 for (i = 0; i < node_count; i++) {
2960 if (!(node = node_list[i])) {
2961 continue;
2962 }
2963 switch_mutex_lock(node->mutex);
2964 node->consumer_count++;
2965 switch_core_hash_insert(node->consumer_hash, switch_core_session_get_uuid(session), session);
2966 switch_mutex_unlock(node->mutex);
2967 }
2968 switch_channel_answer(channel);
2969 }
2970
2971 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
2972 switch_channel_event_set_data(channel, event);
2973 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
2974 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_start");
2975 switch_event_fire(&event);
2976 }
2977
2978 ts = switch_micro_time_now();
2979 switch_time_exp_lt(&tm, ts);
2980 switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
2981 switch_channel_set_variable(channel, "fifo_status", "WAITING");
2982 switch_channel_set_variable(channel, "fifo_timestamp", date);
2983
2984 if ((pop_order = switch_channel_get_variable(channel, "fifo_pop_order"))) {
2985 char *tmp = switch_core_session_strdup(session, pop_order);
2986 int x;
2987 custom_pop = switch_separate_string(tmp, ',', pop_list, (sizeof(pop_list) / sizeof(pop_list[0])));
2988 if (custom_pop >= MAX_PRI) {
2989 custom_pop = MAX_PRI - 1;
2990 }
2991
2992 for (x = 0; x < custom_pop; x++) {
2993 int temp;
2994 switch_assert(pop_list[x]);
2995 temp = atoi(pop_list[x]);
2996 if (temp > -1 && temp < MAX_PRI) {
2997 pop_array[x] = temp;
2998 }
2999 }
3000 } else {
3001 int x = 0;
3002 for (x = 0; x < MAX_PRI; x++) {
3003 pop_array[x] = x;
3004 }
3005 }
3006
3007 while (switch_channel_ready(channel)) {
3008 int x = 0, winner = -1;
3009 switch_time_t longest = (0xFFFFFFFFFFFFFFFFULL / 2);
3010 uint32_t importance = 0, waiting = 0, most_waiting = 0;
3011
3012 pop = NULL;
3013
3014 if (moh && do_wait) {
3015 switch_status_t moh_status;
3016 memset(&args, 0, sizeof(args));
3017 args.read_frame_callback = consumer_read_frame_callback;
3018 args.user_data = node_list;
3019 moh_status = switch_ivr_play_file(session, NULL, moh, &args);
3020
3021 if (!SWITCH_READ_ACCEPTABLE(moh_status)) {
3022 break;
3023 }
3024 }
3025
3026 /* Before we can pick a caller we have to decide on a fifo
3027 node to service if the consumer can service more than
3028 one.
3029
3030 If all fifos have an importance of zero, we'll find the
3031 first node that wins based on the chosen strategy.
3032
3033 The `waiting_longer` strategy will choose the node that
3034 hasn't been empty for the longest time.
3035
3036 The `more_ppl` strategy will choose the node that has
3037 the most people waiting.
3038
3039 If a node has an importance value set, it will cause us
3040 to ignore later nodes with equivalent or lower
3041 importance values. This means that a node with the
3042 same importance that would otherwise win based on the
3043 strategy will not be considered at all if it comes
3044 later in the list. Note also that the high importance
3045 node may still lose if a considered fifo earlier in the
3046 list beats it per the strategy.
3047
3048 Note that when the consumer has been delivered by an
3049 outbound strategy there will only be one fifo node
3050 passed to us, so neither the importance nor the
3051 strategy here will have any effect.
3052 */
3053 for (i = 0; i < node_count; i++) {
3054 if (!(node = node_list[i])) {
3055 continue;
3056 }
3057
3058 if ((waiting = node_caller_count(node))) {
3059 if (!importance || node->importance > importance) {
3060 if (strat == STRAT_WAITING_LONGER) {
3061 if (node->start_waiting < longest) {
3062 longest = node->start_waiting;
3063 winner = i;
3064 }
3065 } else {
3066 if (waiting > most_waiting) {
3067 most_waiting = waiting;
3068 winner = i;
3069 }
3070 }
3071 }
3072
3073 if (node->importance > importance) {
3074 importance = node->importance;
3075 }
3076 }
3077 }
3078
3079 if (winner > -1) {
3080 node = node_list[winner];
3081 } else {
3082 node = NULL;
3083 }
3084
3085 if (node) {
3086 const char *varval, *check = NULL;
3087
3088 check = switch_channel_get_variable(channel, "fifo_bridge_uuid_required");
3089
3090 /* Handle predestined calls, including calls from the ringall strategy */
3091 if ((varval = switch_channel_get_variable(channel, "fifo_bridge_uuid"))) {
3092 if (check_bridge_call(varval) && switch_true(check)) {
3093 switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "%s Call has already been answered\n",
3094 switch_channel_get_name(channel));
3095 goto done;
3096 }
3097
3098 cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL);
3099
3100 for (x = 0; x < MAX_PRI; x++) {
3101 if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "+unique-id", varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
3102 cancel_caller_outbound_call(varval, SWITCH_CAUSE_PICKED_OFF);
3103 break;
3104 }
3105 }
3106 if (!pop && switch_true(check)) {
3107 switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "%s Call has already been answered\n",
3108 switch_channel_get_name(channel));
3109
3110 goto done;
3111 }
3112 }
3113
3114 if (!pop && (varval = switch_channel_get_variable(channel, "fifo_target_skill"))) {
3115 for (x = 0; x < MAX_PRI; x++) {
3116 if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "variable_fifo_skill",
3117 varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
3118 break;
3119 }
3120 }
3121 }
3122
3123 if (!pop) {
3124 for (x = 0; x < MAX_PRI; x++) {
3125 if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "variable_fifo_vip", "true",
3126 &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
3127 break;
3128 }
3129 }
3130 }
3131
3132 if (!pop) {
3133 if (custom_pop) {
3134 for (x = 0; x < MAX_PRI; x++) {
3135 if (fifo_queue_pop(node->fifo_list[pop_array[x]], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
3136 break;
3137 }
3138 }
3139 } else {
3140 for (x = 0; x < MAX_PRI; x++) {
3141 if (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
3142 break;
3143 }
3144 }
3145 }
3146 }
3147
3148 if (pop && !node_caller_count(node)) {
3149 switch_mutex_lock(node->update_mutex);
3150 node->start_waiting = 0;
3151 switch_mutex_unlock(node->update_mutex);
3152 }
3153 }
3154
3155 if (!pop) {
3156 if (!do_wait) {
3157 break;
3158 }
3159
3160 status = switch_core_session_read_frame(session, &read_frame, SWITCH_IO_FLAG_NONE, 0);
3161
3162 if (!SWITCH_READ_ACCEPTABLE(status)) {
3163 break;
3164 }
3165
3166 continue;
3167 }
3168
3169 url = switch_event_get_header(pop, "dial-url");
3170 caller_uuid = switch_core_session_strdup(session, switch_event_get_header(pop, "unique-id"));
3171 switch_event_destroy(&pop);
3172
3173 if (url) {
3174 switch_call_cause_t cause = SWITCH_CAUSE_NONE;
3175 const char *o_announce = NULL;
3176
3177 if ((o_announce = switch_channel_get_variable(channel, "fifo_outbound_announce"))) {
3178 status = switch_ivr_play_file(session, NULL, o_announce, NULL);
3179 if (!SWITCH_READ_ACCEPTABLE(status)) {
3180 break;
3181 }
3182 }
3183
3184 if (switch_ivr_originate(session, &other_session, &cause, url, 120, NULL, NULL, NULL, NULL, NULL, SOF_NONE, NULL, NULL) != SWITCH_STATUS_SUCCESS) {
3185 other_session = NULL;
3186 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Originate to [%s] failed, cause: %s\n", url,
3187 switch_channel_cause2str(cause));
3188
3189 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3190 switch_channel_event_set_data(channel, event);
3191 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3192 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "caller_outbound");
3193 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Result", "failure:%s", switch_channel_cause2str(cause));
3194 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Outbound-URL", url);
3195 switch_event_fire(&event);
3196 }
3197 } else {
3198 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3199 switch_channel_event_set_data(channel, event);
3200 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3201 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "caller_outbound");
3202 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Result", "success");
3203 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Outbound-URL", url);
3204 switch_event_fire(&event);
3205 }
3206 url = NULL;
3207 caller_uuid = switch_core_session_strdup(session, switch_core_session_get_uuid(other_session));
3208 }
3209 } else {
3210 if ((other_session = switch_core_session_locate(caller_uuid))) {
3211 switch_channel_t *other_channel = switch_core_session_get_channel(other_session);
3212 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3213 switch_channel_event_set_data(other_channel, event);
3214 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3215 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "caller_pop");
3216 switch_event_fire(&event);
3217 }
3218 }
3219 }
3220
3221 if (node && other_session) {
3222 switch_channel_t *other_channel = switch_core_session_get_channel(other_session);
3223 switch_caller_profile_t *originator_cp, *originatee_cp;
3224 const char *o_announce = NULL;
3225 const char *record_template = switch_channel_get_variable(channel, "fifo_record_template");
3226 char *expanded = NULL;
3227 char *sql = NULL;
3228 long epoch_start, epoch_end;
3229
3230 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3231 switch_channel_event_set_data(channel, event);
3232 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3233 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_pop");
3234 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-UUID", switch_core_session_get_uuid(other_session));
3235 switch_event_fire(&event);
3236 }
3237
3238 if ((o_announce = switch_channel_get_variable(other_channel, "fifo_override_announce"))) {
3239 announce = o_announce;
3240 }
3241
3242 if (announce) {
3243 status = switch_ivr_play_file(session, NULL, announce, NULL);
3244 if (!SWITCH_READ_ACCEPTABLE(status)) {
3245 break;
3246 }
3247 }
3248
3249 switch_channel_set_variable(other_channel, "fifo_serviced_by", my_id);
3250 switch_channel_set_variable(other_channel, "fifo_serviced_uuid", switch_core_session_get_uuid(session));
3251 switch_channel_set_flag(other_channel, CF_BREAK);
3252
3253 while (switch_channel_ready(channel) && switch_channel_ready(other_channel) &&
3254 switch_channel_test_app_flag_key(FIFO_APP_KEY, other_channel, FIFO_APP_BRIDGE_TAG)) {
3255 status = switch_core_session_read_frame(session, &read_frame, SWITCH_IO_FLAG_NONE, 0);
3256 if (!SWITCH_READ_ACCEPTABLE(status)) {
3257 break;
3258 }
3259 }
3260
3261 if (!(switch_channel_ready(channel))) {
3262 const char *app = switch_channel_get_variable(other_channel, "current_application");
3263 const char *arg = switch_channel_get_variable(other_channel, "current_application_data");
3264 switch_caller_extension_t *extension = NULL;
3265
3266 switch_channel_set_variable_printf(channel, "last_sent_callee_id_name", "%s (AGENT FAIL)",
3267 switch_channel_get_variable(other_channel, "caller_id_name"));
3268 switch_channel_set_variable(channel, "last_sent_callee_id_number", switch_channel_get_variable(other_channel, "caller_id_number"));
3269
3270 switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING,
3271 "Customer %s %s [%s] appears to be abandoned by agent %s [%s] "
3272 "but is still on the line, redirecting them back to the queue with VIP status.\n",
3273 switch_channel_get_name(other_channel),
3274 switch_channel_get_variable(other_channel, "caller_id_name"),
3275 switch_channel_get_variable(other_channel, "caller_id_number"),
3276 switch_channel_get_variable(channel, "caller_id_name"),
3277 switch_channel_get_variable(channel, "caller_id_number"));
3278
3279 switch_channel_wait_for_state_timeout(other_channel, CS_HIBERNATE, 5000);
3280
3281 send_presence(node);
3282 check_cancel(node);
3283
3284 if (app) {
3285 extension = switch_caller_extension_new(other_session, app, arg);
3286 switch_caller_extension_add_application(other_session, extension, app, arg);
3287 switch_channel_set_caller_extension(other_channel, extension);
3288 switch_channel_set_state(other_channel, CS_EXECUTE);
3289 } else {
3290 switch_channel_hangup(other_channel, SWITCH_CAUSE_NORMAL_CLEARING);
3291 }
3292 switch_channel_set_variable(other_channel, "fifo_vip", "true");
3293
3294 switch_core_session_rwunlock(other_session);
3295 break;
3296 }
3297
3298 switch_channel_answer(channel);
3299
3300 if (switch_channel_inbound_display(other_channel)) {
3301 if (switch_channel_direction(other_channel) == SWITCH_CALL_DIRECTION_INBOUND) {
3302 switch_channel_set_flag(other_channel, CF_BLEG);
3303 }
3304 }
3305
3306 switch_channel_step_caller_profile(channel);
3307 switch_channel_step_caller_profile(other_channel);
3308
3309 originator_cp = switch_channel_get_caller_profile(channel);
3310 originatee_cp = switch_channel_get_caller_profile(other_channel);
3311
3312 switch_channel_set_originator_caller_profile(other_channel, switch_caller_profile_clone(other_session, originator_cp));
3313 switch_channel_set_originatee_caller_profile(channel, switch_caller_profile_clone(session, originatee_cp));
3314
3315 originator_cp->callee_id_name = switch_core_strdup(originator_cp->pool, originatee_cp->callee_id_name);
3316 originator_cp->callee_id_number = switch_core_strdup(originator_cp->pool, originatee_cp->callee_id_number);
3317
3318 originatee_cp->callee_id_name = switch_core_strdup(originatee_cp->pool, originatee_cp->caller_id_name);
3319 originatee_cp->callee_id_number = switch_core_strdup(originatee_cp->pool, originatee_cp->caller_id_number);
3320
3321 originatee_cp->caller_id_name = switch_core_strdup(originatee_cp->pool, originator_cp->caller_id_name);
3322 originatee_cp->caller_id_number = switch_core_strdup(originatee_cp->pool, originator_cp->caller_id_number);
3323
3324 ts = switch_micro_time_now();
3325 switch_time_exp_lt(&tm, ts);
3326 epoch_start = (long)switch_epoch_time_now(NULL);
3327 switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
3328 switch_channel_set_variable(channel, "fifo_status", "TALKING");
3329 switch_channel_set_variable(channel, "fifo_target", caller_uuid);
3330 switch_channel_set_variable(channel, "fifo_timestamp", date);
3331 switch_channel_set_variable_printf(channel, "fifo_epoch_start_bridge", "%ld", epoch_start);
3332 switch_channel_set_variable(channel, "fifo_role", "consumer");
3333
3334 switch_channel_set_variable(other_channel, "fifo_status", "TALKING");
3335 switch_channel_set_variable(other_channel, "fifo_timestamp", date);
3336 switch_channel_set_variable_printf(other_channel, "fifo_epoch_start_bridge", "%ld", epoch_start);
3337 switch_channel_set_variable(other_channel, "fifo_target", switch_core_session_get_uuid(session));
3338 switch_channel_set_variable(other_channel, "fifo_role", "caller");
3339
3340 send_presence(node);
3341
3342 if (record_template) {
3343 expanded = switch_channel_expand_variables(other_channel, record_template);
3344 switch_ivr_record_session(session, expanded, 0, NULL);
3345 }
3346
3347 switch_core_media_bug_resume(session);
3348 switch_core_media_bug_resume(other_session);
3349
3350 switch_process_import(session, other_channel, "fifo_caller_consumer_import", switch_channel_get_variable(channel, "fifo_import_prefix"));
3351 switch_process_import(other_session, channel, "fifo_consumer_caller_import", switch_channel_get_variable(other_channel, "fifo_import_prefix"));
3352
3353 if (outbound_id) {
3354 cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL);
3355 add_bridge_call(outbound_id);
3356
3357 sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,use_count=use_count+1,outbound_fail_count=0 where uuid='%q'",
3358 switch_epoch_time_now(NULL), outbound_id);
3359
3360 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
3361 fifo_inc_use_count(outbound_id);
3362 }
3363
3364 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3365 switch_channel_event_set_data(channel, event);
3366 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3367 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "channel-consumer-start");
3368 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Type", "onhook");
3369 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-CID-Name", cid_name);
3370 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-CID-Number", cid_number);
3371 if (outbound_id) {
3372 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
3373 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(outbound_id));
3374 }
3375 switch_event_fire(&event);
3376 }
3377
3378 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3379 switch_channel_event_set_data(channel, event);
3380 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3381 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-consumer-start");
3382 if (outbound_id) {
3383 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
3384 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(outbound_id));
3385 }
3386
3387 switch_event_fire(&event);
3388 }
3389 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3390 switch_channel_event_set_data(other_channel, event);
3391 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3392 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-caller-start");
3393 switch_event_fire(&event);
3394 }
3395
3396 add_bridge_call(switch_core_session_get_uuid(other_session));
3397 add_bridge_call(switch_core_session_get_uuid(session));
3398
3399 sql = switch_mprintf("insert into fifo_bridge "
3400 "(fifo_name,caller_uuid,caller_caller_id_name,caller_caller_id_number,consumer_uuid,consumer_outgoing_uuid,bridge_start) "
3401 "values ('%q','%q','%q','%q','%q','%q',%ld)",
3402 node->name,
3403 switch_core_session_get_uuid(other_session),
3404 switch_str_nil(switch_channel_get_variable(other_channel, "caller_id_name")),
3405 switch_str_nil(switch_channel_get_variable(other_channel, "caller_id_number")),
3406 switch_core_session_get_uuid(session),
3407 switch_str_nil(outbound_id),
3408 (long) switch_epoch_time_now(NULL)
3409 );
3410
3411 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
3412
3413 switch_channel_set_variable(channel, SWITCH_SIGNAL_BOND_VARIABLE, switch_core_session_get_uuid(other_session));
3414 switch_channel_set_variable(other_channel, SWITCH_SIGNAL_BOND_VARIABLE, switch_core_session_get_uuid(session));
3415
3416 switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_initiated_bridge", "true");
3417 switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_bridge_role", "caller");
3418 switch_channel_set_variable(switch_core_session_get_channel(session), "fifo_initiated_bridge", "true");
3419 switch_channel_set_variable(switch_core_session_get_channel(session), "fifo_bridge_role", "consumer");
3420
3421 switch_ivr_multi_threaded_bridge(session, other_session, on_dtmf, other_session, session);
3422
3423 if (switch_channel_test_flag(other_channel, CF_TRANSFER) && switch_channel_up(other_channel)) {
3424 switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_initiated_bridge", NULL);
3425 switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_bridge_role", NULL);
3426 }
3427
3428 if (switch_channel_test_flag(channel, CF_TRANSFER) && switch_channel_up(channel)) {
3429 switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_initiated_bridge", NULL);
3430 switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_bridge_role", NULL);
3431 }
3432
3433 if (outbound_id) {
3434 long now = (long) switch_epoch_time_now(NULL);
3435
3436 sql = switch_mprintf("update fifo_outbound set stop_time=%ld, use_count=use_count-1, "
3437 "outbound_call_total_count=outbound_call_total_count+1, "
3438 "outbound_call_count=outbound_call_count+1, next_avail=%ld + lag + 1 where uuid='%q' and use_count > 0",
3439 now, now, outbound_id);
3440
3441 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
3442
3443 del_bridge_call(outbound_id);
3444 fifo_dec_use_count(outbound_id);
3445 }
3446
3447 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3448 uint64_t hold_usec = 0, tt_usec = 0;
3449 switch_channel_event_set_data(channel, event);
3450 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", arg_fifo_name);
3451 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "channel-consumer-stop");
3452 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Type", "onhook");
3453 if (outbound_id) {
3454 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
3455 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(outbound_id));
3456 }
3457 hold_usec = originator_cp->times->hold_accum;
3458 tt_usec = (switch_micro_time_now() - originator_cp->times->bridged) - hold_usec;
3459 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Bridge-Time-us", "%"SWITCH_TIME_T_FMT, originator_cp->times->bridged);
3460 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Bridge-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(originator_cp->times->bridged / 1000));
3461 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Bridge-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(originator_cp->times->bridged / 1000000));
3462 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Talk-Time-us", "%"SWITCH_TIME_T_FMT, tt_usec);
3463 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Talk-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(tt_usec / 1000));
3464 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Talk-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(tt_usec / 1000000));
3465 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-us", "%"SWITCH_TIME_T_FMT, hold_usec);
3466 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(hold_usec / 1000));
3467 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(hold_usec / 1000000));
3468
3469 switch_event_fire(&event);
3470 }
3471
3472 del_bridge_call(switch_core_session_get_uuid(session));
3473 del_bridge_call(switch_core_session_get_uuid(other_session));
3474
3475 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3476 switch_channel_event_set_data(channel, event);
3477 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3478 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-consumer-stop");
3479 if (outbound_id) {
3480 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
3481 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(outbound_id));
3482 }
3483 switch_event_fire(&event);
3484 }
3485 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3486 uint64_t hold_usec = 0, tt_usec = 0;
3487 switch_channel_event_set_data(other_channel, event);
3488 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3489 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-caller-stop");
3490 hold_usec = originatee_cp->times->hold_accum;
3491 tt_usec = (switch_micro_time_now() - originatee_cp->times->bridged) - hold_usec;
3492 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-Talk-Time-us", "%"SWITCH_TIME_T_FMT, tt_usec);
3493 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-Talk-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(tt_usec / 1000));
3494 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-Talk-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(tt_usec / 1000000));
3495 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-Hold-Time-us", "%"SWITCH_TIME_T_FMT, hold_usec);
3496 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-Hold-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(hold_usec / 1000));
3497 switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-Hold-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(hold_usec / 1000000));
3498 switch_event_fire(&event);
3499 }
3500
3501 epoch_end = (long)switch_epoch_time_now(NULL);
3502
3503 switch_channel_set_variable_printf(channel, "fifo_epoch_stop_bridge", "%ld", epoch_end);
3504 switch_channel_set_variable_printf(channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start);
3505
3506 switch_channel_set_variable_printf(other_channel, "fifo_epoch_stop_bridge", "%ld", epoch_end);
3507 switch_channel_set_variable_printf(other_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start);
3508
3509 sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session));
3510 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
3511
3512 if (switch_channel_ready(channel)) {
3513 switch_core_media_bug_pause(session);
3514 }
3515
3516 if (record_template) {
3517 switch_ivr_stop_record_session(session, expanded);
3518 if (expanded != record_template) {
3519 switch_safe_free(expanded);
3520 }
3521 }
3522
3523 ts = switch_micro_time_now();
3524 switch_time_exp_lt(&tm, ts);
3525 switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
3526 switch_channel_set_variable(channel, "fifo_status", "WAITING");
3527 switch_channel_set_variable(channel, "fifo_timestamp", date);
3528
3529 switch_channel_set_variable(other_channel, "fifo_status", "DONE");
3530 switch_channel_set_variable(other_channel, "fifo_timestamp", date);
3531
3532 send_presence(node);
3533 check_cancel(node);
3534
3535 switch_core_session_rwunlock(other_session);
3536
3537 if (!do_wait || !switch_channel_ready(channel)) {
3538 break;
3539 }
3540
3541 fifo_consumer_wrapup_sound = switch_channel_get_variable(channel, "fifo_consumer_wrapup_sound");
3542 fifo_consumer_wrapup_key = switch_channel_get_variable(channel, "fifo_consumer_wrapup_key");
3543 sfifo_consumer_wrapup_time = switch_channel_get_variable(channel, "fifo_consumer_wrapup_time");
3544 if (!zstr(sfifo_consumer_wrapup_time)) {
3545 fifo_consumer_wrapup_time = atoi(sfifo_consumer_wrapup_time);
3546 } else {
3547 fifo_consumer_wrapup_time = 5000;
3548 }
3549
3550 memset(buf, 0, sizeof(buf));
3551
3552 if (fifo_consumer_wrapup_time || !zstr(fifo_consumer_wrapup_key)) {
3553 switch_channel_set_variable(channel, "fifo_status", "WRAPUP");
3554 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3555 switch_channel_event_set_data(channel, event);
3556 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3557 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_wrapup");
3558 switch_event_fire(&event);
3559 }
3560 }
3561
3562 if (!zstr(fifo_consumer_wrapup_sound)) {
3563 memset(&args, 0, sizeof(args));
3564 args.buf = buf;
3565 args.buflen = sizeof(buf);
3566 status = switch_ivr_play_file(session, NULL, fifo_consumer_wrapup_sound, &args);
3567 if (!SWITCH_READ_ACCEPTABLE(status)) {
3568 break;
3569 }
3570 }
3571
3572 if (fifo_consumer_wrapup_time) {
3573 wrapup_time_started = switch_micro_time_now();
3574
3575 if (!zstr(fifo_consumer_wrapup_key) && strcmp(buf, fifo_consumer_wrapup_key)) {
3576 while (switch_channel_ready(channel)) {
3577 char terminator = 0;
3578
3579 if (fifo_consumer_wrapup_time) {
3580 wrapup_time_elapsed = (switch_micro_time_now() - wrapup_time_started) / 1000;
3581 if (wrapup_time_elapsed > fifo_consumer_wrapup_time) {
3582 break;
3583 } else {
3584 wrapup_time_remaining = fifo_consumer_wrapup_time - wrapup_time_elapsed + 100;
3585 }
3586 }
3587
3588 switch_ivr_collect_digits_count(session, buf, sizeof(buf) - 1, 1, fifo_consumer_wrapup_key, &terminator, 0, 0,
3589 (uint32_t) wrapup_time_remaining);
3590 if ((terminator == *fifo_consumer_wrapup_key) || !(switch_channel_ready(channel))) {
3591 break;
3592 }
3593 }
3594 } else if ((zstr(fifo_consumer_wrapup_key) || !strcmp(buf, fifo_consumer_wrapup_key))) {
3595 while (switch_channel_ready(channel)) {
3596 wrapup_time_elapsed = (switch_micro_time_now() - wrapup_time_started) / 1000;
3597 if (wrapup_time_elapsed > fifo_consumer_wrapup_time) {
3598 break;
3599 }
3600 switch_yield(500);
3601 }
3602 }
3603 }
3604
3605 switch_channel_set_variable(channel, "fifo_status", "WAITING");
3606 }
3607
3608 if (do_wait && switch_channel_ready(channel)) {
3609 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3610 switch_channel_event_set_data(channel, event);
3611 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3612 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_reentrance");
3613 switch_event_fire(&event);
3614 }
3615 }
3616 }
3617
3618 if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3619 switch_channel_event_set_data(channel, event);
3620 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3621 switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_stop");
3622 switch_event_fire(&event);
3623 }
3624
3625 if (do_wait) {
3626 for (i = 0; i < node_count; i++) {
3627 if (!(node = node_list[i])) {
3628 continue;
3629 }
3630 switch_mutex_lock(node->mutex);
3631 switch_core_hash_delete(node->consumer_hash, switch_core_session_get_uuid(session));
3632 node->consumer_count--;
3633 switch_mutex_unlock(node->mutex);
3634 }
3635 }
3636
3637 if (outbound_id && switch_channel_up(channel)) {
3638 switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s is still alive, tracking call.\n", switch_channel_get_name(channel));
3639 fifo_track_call_function(session, outbound_id);
3640 }
3641 }
3642
3643 done:
3644
3645 if (!consumer && in_table) {
3646 fifo_caller_del(switch_core_session_get_uuid(session));
3647 }
3648
3649 if (switch_true(switch_channel_get_variable(channel, "fifo_destroy_after_use"))) {
3650 do_destroy = 1;
3651 }
3652
3653 switch_mutex_lock(globals.mutex);
3654 for (i = 0; i < node_count; i++) {
3655 if (!(node = node_list[i])) {
3656 continue;
3657 }
3658 switch_thread_rwlock_unlock(node->rwlock);
3659
3660 if (node->ready == 1 && do_destroy && node_caller_count(node) == 0 && node->consumer_count == 0) {
3661 switch_core_hash_delete(globals.fifo_hash, node->name);
3662 node->ready = 0;
3663 }
3664 }
3665 switch_mutex_unlock(globals.mutex);
3666
3667 switch_channel_clear_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_BRIDGE_TAG);
3668
3669 switch_core_media_bug_resume(session);
3670 }
3671
3672 struct xml_helper {
3673 switch_xml_t xml;
3674 fifo_node_t *node;
3675 char *container;
3676 char *tag;
3677 int cc_off;
3678 int row_off;
3679 int verbose;
3680 };
3681
xml_callback(void * pArg,int argc,char ** argv,char ** columnNames)3682 static int xml_callback(void *pArg, int argc, char **argv, char **columnNames)
3683 {
3684 struct xml_helper *h = (struct xml_helper *) pArg;
3685 switch_xml_t x_out;
3686 int c_off = 0;
3687 char exp_buf[128] = { 0 };
3688 switch_time_exp_t tm;
3689 switch_time_t etime = 0;
3690 char atime[128] = "";
3691 char *expires = exp_buf, *tb = atime;
3692 int arg = 0;
3693
3694 for(arg = 0; arg < argc; arg++) {
3695 if (!argv[arg]) {
3696 argv[arg] = "";
3697 }
3698 }
3699
3700 if (argv[7]) {
3701 if ((etime = atol(argv[7]))) {
3702 switch_size_t retsize;
3703
3704 switch_time_exp_lt(&tm, switch_time_from_sec(etime));
3705 switch_strftime_nocheck(exp_buf, &retsize, sizeof(exp_buf), "%Y-%m-%d %T", &tm);
3706 } else {
3707 switch_set_string(exp_buf, "now");
3708 }
3709 }
3710
3711 if (atoi(argv[13])) {
3712 arg = 17;
3713 } else {
3714 arg = 18;
3715 }
3716
3717 if ((etime = atol(argv[arg]))) {
3718 switch_size_t retsize;
3719 switch_time_exp_lt(&tm, switch_time_from_sec(etime));
3720 switch_strftime_nocheck(atime, &retsize, sizeof(atime), "%Y-%m-%d %T", &tm);
3721 } else {
3722 switch_set_string(atime, "now");
3723 }
3724
3725 x_out = switch_xml_add_child_d(h->xml, h->tag, c_off++);
3726 switch_xml_set_attr_d(x_out, "simo", argv[3]);
3727 switch_xml_set_attr_d(x_out, "use_count", argv[4]);
3728 switch_xml_set_attr_d(x_out, "timeout", argv[5]);
3729 switch_xml_set_attr_d(x_out, "lag", argv[6]);
3730 switch_xml_set_attr_d(x_out, "outbound-call-count", argv[10]);
3731 switch_xml_set_attr_d(x_out, "outbound-fail-count", argv[11]);
3732 switch_xml_set_attr_d(x_out, "taking-calls", argv[13]);
3733 switch_xml_set_attr_d(x_out, "status", argv[14]);
3734
3735 switch_xml_set_attr_d(x_out, "outbound-call-total-count", argv[15]);
3736 switch_xml_set_attr_d(x_out, "outbound-fail-total-count", argv[16]);
3737
3738 if (arg == 17) {
3739 switch_xml_set_attr_d(x_out, "logged-on-since", tb);
3740 } else {
3741 switch_xml_set_attr_d(x_out, "logged-off-since", tb);
3742 }
3743
3744 switch_xml_set_attr_d(x_out, "manual-calls-out-count", argv[19]);
3745 switch_xml_set_attr_d(x_out, "manual-calls-in-count", argv[20]);
3746 switch_xml_set_attr_d(x_out, "manual-calls-out-total-count", argv[21]);
3747 switch_xml_set_attr_d(x_out, "manual-calls-in-total-count", argv[22]);
3748
3749 if (argc > 23) {
3750 switch_xml_set_attr_d(x_out, "ring-count", argv[23]);
3751
3752 if ((etime = atol(argv[24]))) {
3753 switch_size_t retsize;
3754 switch_time_exp_lt(&tm, switch_time_from_sec(etime));
3755 switch_strftime_nocheck(atime, &retsize, sizeof(atime), "%Y-%m-%d %T", &tm);
3756 } else {
3757 switch_set_string(atime, "never");
3758 }
3759
3760 switch_xml_set_attr_d(x_out, "start-time", tb);
3761
3762 if ((etime = atol(argv[25]))) {
3763 switch_size_t retsize;
3764 switch_time_exp_lt(&tm, switch_time_from_sec(etime));
3765 switch_strftime_nocheck(atime, &retsize, sizeof(atime), "%Y-%m-%d %T", &tm);
3766 } else {
3767 switch_set_string(atime, "never");
3768 }
3769
3770 switch_xml_set_attr_d(x_out, "stop-time", tb);
3771 }
3772
3773 switch_xml_set_attr_d(x_out, "next-available", expires);
3774
3775 switch_xml_set_txt_d(x_out, argv[2]);
3776
3777 return 0;
3778 }
3779
xml_outbound(switch_xml_t xml,fifo_node_t * node,char * container,char * tag,int cc_off,int verbose)3780 static int xml_outbound(switch_xml_t xml, fifo_node_t *node, char *container, char *tag, int cc_off, int verbose)
3781 {
3782 struct xml_helper h = { 0 };
3783 char *sql;
3784
3785 if (!strcmp(node->name, MANUAL_QUEUE_NAME)) {
3786 sql = switch_mprintf("select uuid, '%q', originate_string, simo_count, use_count, timeout,"
3787 "lag, next_avail, expires, static, outbound_call_count, outbound_fail_count,"
3788 "hostname, taking_calls, status, outbound_call_total_count, outbound_fail_total_count, active_time, inactive_time,"
3789 "manual_calls_out_count, manual_calls_in_count, manual_calls_out_total_count, manual_calls_in_total_count from fifo_outbound "
3790 "group by "
3791 "uuid, originate_string, simo_count, use_count, timeout,"
3792 "lag, next_avail, expires, static, outbound_call_count, outbound_fail_count,"
3793 "hostname, taking_calls, status, outbound_call_total_count, outbound_fail_total_count, active_time, inactive_time,"
3794 "manual_calls_out_count, manual_calls_in_count, manual_calls_out_total_count, manual_calls_in_total_count",
3795 MANUAL_QUEUE_NAME);
3796 } else {
3797 sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, "
3798 "lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, "
3799 "hostname, taking_calls, status, outbound_call_total_count, outbound_fail_total_count, active_time, inactive_time, "
3800 "manual_calls_out_count, manual_calls_in_count, manual_calls_out_total_count, manual_calls_in_total_count,"
3801 "ring_count,start_time,stop_time "
3802 "from fifo_outbound where fifo_name = '%q'", node->name);
3803 }
3804
3805 h.xml = xml;
3806 h.node = node;
3807 h.container = container;
3808 h.tag = tag;
3809 h.cc_off = cc_off;
3810 h.row_off = 0;
3811 h.verbose = verbose;
3812
3813 h.xml = switch_xml_add_child_d(h.xml, h.container, h.cc_off++);
3814
3815 fifo_execute_sql_callback(globals.sql_mutex, sql, xml_callback, &h);
3816
3817 switch_safe_free(sql);
3818
3819 return h.cc_off;
3820 }
3821
xml_bridge_callback(void * pArg,int argc,char ** argv,char ** columnNames)3822 static int xml_bridge_callback(void *pArg, int argc, char **argv, char **columnNames)
3823 {
3824 struct xml_helper *h = (struct xml_helper *) pArg;
3825 switch_xml_t x_bridge, x_var, x_caller, x_consumer, x_cdr;
3826 char exp_buf[128] = "";
3827 switch_time_exp_t tm;
3828 switch_time_t etime = 0;
3829 int off = 0, tag_off = 0;
3830 switch_core_session_t *session;
3831 char url_buf[512] = "";
3832 char *encoded;
3833
3834 if ((etime = atol(argv[6]))) {
3835 switch_size_t retsize;
3836
3837 switch_time_exp_lt(&tm, switch_time_from_sec(etime));
3838 switch_strftime_nocheck(exp_buf, &retsize, sizeof(exp_buf), "%Y-%m-%d %T", &tm);
3839 } else {
3840 switch_set_string(exp_buf, "now");
3841 }
3842
3843 x_bridge = switch_xml_add_child_d(h->xml, h->tag, h->row_off++);
3844
3845 switch_xml_set_attr_d(x_bridge, "fifo_name", argv[0]);
3846 switch_xml_set_attr_d_buf(x_bridge, "bridge_start", exp_buf);
3847 switch_xml_set_attr_d(x_bridge, "bridge_start_epoch", argv[6]);
3848
3849 x_caller = switch_xml_add_child_d(x_bridge, "caller", tag_off++);
3850
3851 switch_xml_set_attr_d(x_caller, "uuid", argv[1]);
3852
3853 encoded = switch_url_encode(argv[2], url_buf, sizeof(url_buf));
3854 switch_xml_set_attr_d(x_caller, "caller_id_name", encoded);
3855
3856 encoded = switch_url_encode(argv[3], url_buf, sizeof(url_buf));
3857 switch_xml_set_attr_d(x_caller, "caller_id_number", encoded);
3858
3859 if (h->verbose) {
3860 if ((session = switch_core_session_locate(argv[1]))) {
3861 x_cdr = switch_xml_add_child_d(x_caller, "cdr", 0);
3862 switch_ivr_generate_xml_cdr(session, &x_cdr);
3863 switch_core_session_rwunlock(session);
3864 }
3865 }
3866
3867 off = 0;
3868
3869 x_consumer = switch_xml_add_child_d(x_bridge, "consumer", tag_off++);
3870
3871 x_var = switch_xml_add_child_d(x_consumer, "uuid", off++);
3872 switch_xml_set_txt_d(x_var, argv[4]);
3873 x_var = switch_xml_add_child_d(x_consumer, "outgoing_uuid", off++);
3874 switch_xml_set_txt_d(x_var, argv[5]);
3875
3876 if (h->verbose) {
3877 if ((session = switch_core_session_locate(argv[1]))) {
3878 x_cdr = switch_xml_add_child_d(x_consumer, "cdr", 0);
3879 switch_ivr_generate_xml_cdr(session, &x_cdr);
3880 switch_core_session_rwunlock(session);
3881 }
3882 }
3883
3884 return 0;
3885 }
3886
xml_bridges(switch_xml_t xml,fifo_node_t * node,char * container,char * tag,int cc_off,int verbose)3887 static int xml_bridges(switch_xml_t xml, fifo_node_t *node, char *container, char *tag, int cc_off, int verbose)
3888 {
3889 struct xml_helper h = { 0 };
3890 char *sql = switch_mprintf("select "
3891 "fifo_name,caller_uuid,caller_caller_id_name,caller_caller_id_number,consumer_uuid,consumer_outgoing_uuid,bridge_start "
3892 "from fifo_bridge where fifo_name = '%q'", node->name);
3893
3894 h.xml = xml;
3895 h.node = node;
3896 h.container = container;
3897 h.tag = tag;
3898 h.cc_off = cc_off;
3899 h.row_off = 0;
3900 h.verbose = verbose;
3901
3902 h.xml = switch_xml_add_child_d(h.xml, h.container, h.cc_off++);
3903
3904 fifo_execute_sql_callback(globals.sql_mutex, sql, xml_bridge_callback, &h);
3905
3906 switch_safe_free(sql);
3907
3908 return h.cc_off;
3909 }
3910
xml_hash(switch_xml_t xml,switch_hash_t * hash,char * container,char * tag,int cc_off,int verbose)3911 static int xml_hash(switch_xml_t xml, switch_hash_t *hash, char *container, char *tag, int cc_off, int verbose)
3912 {
3913 switch_xml_t x_tmp, x_caller, x_cp;
3914 switch_hash_index_t *hi;
3915 switch_core_session_t *session;
3916 switch_channel_t *channel;
3917 void *val;
3918 const void *var;
3919
3920 x_tmp = switch_xml_add_child_d(xml, container, cc_off++);
3921 switch_assert(x_tmp);
3922
3923 for (hi = switch_core_hash_first(hash); hi; hi = switch_core_hash_next(&hi)) {
3924 int c_off = 0, d_off = 0;
3925 const char *status;
3926 const char *ts;
3927 char url_buf[512] = "";
3928 char *encoded;
3929
3930 switch_core_hash_this(hi, &var, NULL, &val);
3931 session = (switch_core_session_t *) val;
3932 channel = switch_core_session_get_channel(session);
3933 x_caller = switch_xml_add_child_d(x_tmp, tag, c_off++);
3934 switch_assert(x_caller);
3935
3936 switch_xml_set_attr_d(x_caller, "uuid", switch_core_session_get_uuid(session));
3937
3938 if ((status = switch_channel_get_variable(channel, "fifo_status"))) {
3939 switch_xml_set_attr_d(x_caller, "status", status);
3940 }
3941
3942 if ((status = switch_channel_get_variable(channel, "caller_id_name"))) {
3943 encoded = switch_url_encode(status, url_buf, sizeof(url_buf));
3944 switch_xml_set_attr_d(x_caller, "caller_id_name", encoded);
3945 }
3946
3947 if ((status = switch_channel_get_variable(channel, "caller_id_number"))) {
3948 encoded = switch_url_encode(status, url_buf, sizeof(url_buf));
3949 switch_xml_set_attr_d(x_caller, "caller_id_number", encoded);
3950 }
3951
3952 if ((ts = switch_channel_get_variable(channel, "fifo_timestamp"))) {
3953 switch_xml_set_attr_d(x_caller, "timestamp", ts);
3954 }
3955
3956 if ((ts = switch_channel_get_variable(channel, "fifo_target"))) {
3957 switch_xml_set_attr_d(x_caller, "target", ts);
3958 }
3959
3960 if (verbose) {
3961 if (!(x_cp = switch_xml_add_child_d(x_caller, "cdr", d_off++))) {
3962 abort();
3963 }
3964
3965 switch_ivr_generate_xml_cdr(session, &x_cp);
3966 }
3967 }
3968
3969 return cc_off;
3970 }
3971
xml_caller(switch_xml_t xml,fifo_node_t * node,char * container,char * tag,int cc_off,int verbose)3972 static int xml_caller(switch_xml_t xml, fifo_node_t *node, char *container, char *tag, int cc_off, int verbose)
3973 {
3974 switch_xml_t x_tmp, x_caller, x_cp;
3975 int i, x;
3976 switch_core_session_t *session;
3977 switch_channel_t *channel;
3978
3979 x_tmp = switch_xml_add_child_d(xml, container, cc_off++);
3980 switch_assert(x_tmp);
3981
3982 for (x = 0; x < MAX_PRI; x++) {
3983 fifo_queue_t *q = node->fifo_list[x];
3984
3985 switch_mutex_lock(q->mutex);
3986
3987 for (i = 0; i < q->idx; i++) {
3988 int c_off = 0, d_off = 0;
3989 const char *status;
3990 const char *ts;
3991 const char *uuid = switch_event_get_header(q->data[i], "unique-id");
3992 char sl[30] = "";
3993 char url_buf[512] = "";
3994 char *encoded;
3995
3996 if (!uuid) {
3997 continue;
3998 }
3999
4000 if (!(session = switch_core_session_locate(uuid))) {
4001 continue;
4002 }
4003
4004 channel = switch_core_session_get_channel(session);
4005 x_caller = switch_xml_add_child_d(x_tmp, tag, c_off++);
4006 switch_assert(x_caller);
4007
4008 switch_xml_set_attr_d(x_caller, "uuid", switch_core_session_get_uuid(session));
4009
4010 if ((status = switch_channel_get_variable(channel, "fifo_status"))) {
4011 switch_xml_set_attr_d(x_caller, "status", status);
4012 }
4013
4014 if ((status = switch_channel_get_variable(channel, "caller_id_name"))) {
4015 encoded = switch_url_encode(status, url_buf, sizeof(url_buf));
4016 switch_xml_set_attr_d(x_caller, "caller_id_name", encoded);
4017 }
4018
4019 if ((status = switch_channel_get_variable(channel, "caller_id_number"))) {
4020 encoded = switch_url_encode(status, url_buf, sizeof(url_buf));
4021 switch_xml_set_attr_d(x_caller, "caller_id_number", encoded);
4022 }
4023
4024 if ((ts = switch_channel_get_variable(channel, "fifo_timestamp"))) {
4025 switch_xml_set_attr_d(x_caller, "timestamp", ts);
4026 }
4027
4028 if ((ts = switch_channel_get_variable(channel, "fifo_target"))) {
4029 switch_xml_set_attr_d(x_caller, "target", ts);
4030 }
4031
4032 if ((ts = switch_channel_get_variable(channel, "fifo_position"))) {
4033 switch_xml_set_attr_d(x_caller, "position", ts);
4034 }
4035
4036 switch_snprintf(sl, sizeof(sl), "%d", x);
4037 switch_xml_set_attr_d_buf(x_caller, "slot", sl);
4038
4039 if (verbose) {
4040 if (!(x_cp = switch_xml_add_child_d(x_caller, "cdr", d_off++))) {
4041 abort();
4042 }
4043
4044 switch_ivr_generate_xml_cdr(session, &x_cp);
4045 }
4046
4047 switch_core_session_rwunlock(session);
4048 session = NULL;
4049 }
4050
4051 switch_mutex_unlock(q->mutex);
4052 }
4053
4054 return cc_off;
4055 }
4056
list_node(fifo_node_t * node,switch_xml_t x_report,int * off,int verbose)4057 static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int verbose)
4058 {
4059 switch_xml_t x_fifo;
4060 int cc_off = 0;
4061 char buffer[35];
4062 char *tmp = buffer;
4063
4064 x_fifo = switch_xml_add_child_d(x_report, "fifo", (*off)++);;
4065 switch_assert(x_fifo);
4066
4067 switch_xml_set_attr_d(x_fifo, "name", node->name);
4068 switch_snprintf(tmp, sizeof(buffer), "%d", node->consumer_count);
4069 switch_xml_set_attr_d(x_fifo, "consumer_count", tmp);
4070 switch_snprintf(tmp, sizeof(buffer), "%d", node_caller_count(node));
4071 switch_xml_set_attr_d(x_fifo, "caller_count", tmp);
4072 switch_snprintf(tmp, sizeof(buffer), "%d", node_caller_count(node));
4073 switch_xml_set_attr_d(x_fifo, "waiting_count", tmp);
4074 switch_snprintf(tmp, sizeof(buffer), "%u", node->importance);
4075 switch_xml_set_attr_d(x_fifo, "importance", tmp);
4076
4077 switch_snprintf(tmp, sizeof(buffer), "%u", node->outbound_per_cycle);
4078 switch_xml_set_attr_d(x_fifo, "outbound_per_cycle", tmp);
4079
4080 switch_snprintf(tmp, sizeof(buffer), "%u", node->outbound_per_cycle_min);
4081 switch_xml_set_attr_d(x_fifo, "outbound_per_cycle_min", tmp);
4082
4083 switch_snprintf(tmp, sizeof(buffer), "%u", node->ring_timeout);
4084 switch_xml_set_attr_d(x_fifo, "ring_timeout", tmp);
4085
4086 switch_snprintf(tmp, sizeof(buffer), "%u", node->default_lag);
4087 switch_xml_set_attr_d(x_fifo, "default_lag", tmp);
4088
4089 switch_snprintf(tmp, sizeof(buffer), "%u", node->outbound_priority);
4090 switch_xml_set_attr_d(x_fifo, "outbound_priority", tmp);
4091
4092 switch_xml_set_attr_d(x_fifo, "outbound_strategy", print_strategy(node->outbound_strategy));
4093
4094 cc_off = xml_outbound(x_fifo, node, "outbound", "member", cc_off, verbose);
4095 cc_off = xml_caller(x_fifo, node, "callers", "caller", cc_off, verbose);
4096 cc_off = xml_hash(x_fifo, node->consumer_hash, "consumers", "consumer", cc_off, verbose);
4097 xml_bridges(x_fifo, node, "bridges", "bridge", cc_off, verbose);
4098 }
4099
dump_hash(switch_hash_t * hash,switch_stream_handle_t * stream)4100 void dump_hash(switch_hash_t *hash, switch_stream_handle_t *stream)
4101 {
4102 switch_hash_index_t *hi;
4103 void *val;
4104 const void *var;
4105
4106 switch_mutex_lock(globals.mutex);
4107 for (hi = switch_core_hash_first(hash); hi; hi = switch_core_hash_next(&hi)) {
4108 switch_core_hash_this(hi, &var, NULL, &val);
4109 stream->write_function(stream, " %s\n", (char *)var);
4110 }
4111 switch_mutex_unlock(globals.mutex);
4112 }
4113
node_dump(switch_stream_handle_t * stream)4114 void node_dump(switch_stream_handle_t *stream)
4115 {
4116 switch_hash_index_t *hi;
4117 fifo_node_t *node;
4118 void *val;
4119 switch_mutex_lock(globals.mutex);
4120 for (hi = switch_core_hash_first(globals.fifo_hash); hi; hi = switch_core_hash_next(&hi)) {
4121 switch_core_hash_this(hi, NULL, NULL, &val);
4122 if ((node = (fifo_node_t *) val)) {
4123 stream->write_function(stream, "node: %s\n"
4124 " outbound_name: %s\n"
4125 " outbound_per_cycle: %d"
4126 " outbound_per_cycle_min: %d"
4127 " outbound_priority: %d"
4128 " outbound_strategy: %s\n"
4129 " has_outbound: %d\n"
4130 " outbound_priority: %d\n"
4131 " busy: %d\n"
4132 " ready: %d\n"
4133 " waiting: %d\n"
4134 ,
4135 node->name, node->outbound_name, node->outbound_per_cycle, node->outbound_per_cycle_min,
4136 node->outbound_priority, print_strategy(node->outbound_strategy),
4137 node->has_outbound,
4138 node->outbound_priority,
4139 node->busy,
4140 node->ready,
4141 node_caller_count(node)
4142
4143 );
4144 }
4145 }
4146
4147 stream->write_function(stream, " caller_orig:\n");
4148 dump_hash(globals.caller_orig_hash, stream);
4149 stream->write_function(stream, " consumer_orig:\n");
4150 dump_hash(globals.consumer_orig_hash, stream);
4151 stream->write_function(stream, " bridge:\n");
4152 dump_hash(globals.bridge_hash, stream);
4153
4154 switch_mutex_unlock(globals.mutex);
4155 }
4156
4157 #define FIFO_API_SYNTAX "list|list_verbose|count|debug|status|has_outbound|importance [<fifo name>]|reparse [del_all]"
SWITCH_STANDARD_API(fifo_api_function)4158 SWITCH_STANDARD_API(fifo_api_function)
4159 {
4160 fifo_node_t *node;
4161 char *data = NULL;
4162 int argc = 0;
4163 char *argv[5] = { 0 };
4164 switch_hash_index_t *hi;
4165 void *val;
4166 const void *var;
4167 int x = 0, verbose = 0;
4168
4169 if (!globals.running) {
4170 return SWITCH_STATUS_FALSE;
4171 }
4172
4173 if (!zstr(cmd)) {
4174 data = strdup(cmd);
4175 switch_assert(data);
4176 }
4177
4178 switch_mutex_lock(globals.mutex);
4179
4180 if (zstr(cmd) || (argc = switch_separate_string(data, ' ', argv, (sizeof(argv) / sizeof(argv[0])))) < 1 || !argv[0]) {
4181 stream->write_function(stream, "%s\n", FIFO_API_SYNTAX);
4182 goto done;
4183 }
4184
4185 if (!strcasecmp(argv[0], "status")) {
4186 node_dump(stream);
4187 goto done;
4188 }
4189
4190 if (!strcasecmp(argv[0], "debug")) {
4191 if (argv[1]) {
4192 if ((globals.debug = atoi(argv[1])) < 0) {
4193 globals.debug = 0;
4194 }
4195 }
4196 stream->write_function(stream, "debug %d\n", globals.debug);
4197 goto done;
4198 }
4199
4200 verbose = !strcasecmp(argv[0], "list_verbose");
4201
4202 if (!strcasecmp(argv[0], "reparse")) {
4203 load_config(1, argv[1] && !strcasecmp(argv[1], "del_all"));
4204 stream->write_function(stream, "+OK\n");
4205 goto done;
4206 }
4207
4208 if (!strcasecmp(argv[0], "list") || verbose) {
4209 char *xml_text = NULL;
4210 switch_xml_t x_report = switch_xml_new("fifo_report");
4211 switch_assert(x_report);
4212
4213 if (argc < 2) {
4214 for (hi = switch_core_hash_first(globals.fifo_hash); hi; hi = switch_core_hash_next(&hi)) {
4215 switch_core_hash_this(hi, &var, NULL, &val);
4216 node = (fifo_node_t *) val;
4217
4218 switch_mutex_lock(node->mutex);
4219 list_node(node, x_report, &x, verbose);
4220 switch_mutex_unlock(node->mutex);
4221 }
4222 } else {
4223 if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
4224 switch_mutex_lock(node->mutex);
4225 list_node(node, x_report, &x, verbose);
4226 switch_mutex_unlock(node->mutex);
4227 }
4228 }
4229 xml_text = switch_xml_toxml(x_report, SWITCH_FALSE);
4230 switch_assert(xml_text);
4231 stream->write_function(stream, "%s\n", xml_text);
4232 switch_xml_free(x_report);
4233 switch_safe_free(xml_text);
4234 } else if (!strcasecmp(argv[0], "importance")) {
4235 if (argv[1] && (node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
4236 int importance = 0;
4237 if (argc > 2) {
4238 importance = atoi(argv[2]);
4239 if (importance < 0) {
4240 importance = 0;
4241 }
4242 node->importance = importance;
4243 }
4244 stream->write_function(stream, "importance: %u\n", node->importance);
4245 } else {
4246 stream->write_function(stream, "no fifo by that name\n");
4247 }
4248 } else if (!strcasecmp(argv[0], "count")) {
4249 if (argc < 2) {
4250 for (hi = switch_core_hash_first(globals.fifo_hash); hi; hi = switch_core_hash_next(&hi)) {
4251 switch_core_hash_this(hi, &var, NULL, &val);
4252 node = (fifo_node_t *) val;
4253 switch_mutex_lock(node->update_mutex);
4254 stream->write_function(stream, "%s:%d:%d:%d:%d:%d\n", (char *) var, node->consumer_count, node_caller_count(node), node->member_count, node->ring_consumer_count, node_idle_consumers(node));
4255 switch_mutex_unlock(node->update_mutex);
4256 x++;
4257 }
4258
4259 if (!x) {
4260 stream->write_function(stream, "none\n");
4261 }
4262 } else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
4263 switch_mutex_lock(node->update_mutex);
4264 stream->write_function(stream, "%s:%d:%d:%d:%d:%d\n", argv[1], node->consumer_count, node_caller_count(node), node->member_count, node->ring_consumer_count, node_idle_consumers(node));
4265 switch_mutex_unlock(node->update_mutex);
4266 } else {
4267 stream->write_function(stream, "none\n");
4268 }
4269 } else if (!strcasecmp(argv[0], "has_outbound")) {
4270 if (argc < 2) {
4271 for (hi = switch_core_hash_first(globals.fifo_hash); hi; hi = switch_core_hash_next(&hi)) {
4272 switch_core_hash_this(hi, &var, NULL, &val);
4273 node = (fifo_node_t *) val;
4274 switch_mutex_lock(node->update_mutex);
4275 stream->write_function(stream, "%s:%d\n", (char *) var, node->has_outbound);
4276 switch_mutex_unlock(node->update_mutex);
4277 x++;
4278 }
4279
4280 if (!x) {
4281 stream->write_function(stream, "none\n");
4282 }
4283 } else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
4284 switch_mutex_lock(node->update_mutex);
4285 stream->write_function(stream, "%s:%d\n", argv[1], node->has_outbound);
4286 switch_mutex_unlock(node->update_mutex);
4287 } else {
4288 stream->write_function(stream, "none\n");
4289 }
4290 } else {
4291 stream->write_function(stream, "-ERR Usage: %s\n", FIFO_API_SYNTAX);
4292 }
4293
4294 done:
4295
4296 switch_safe_free(data);
4297
4298 switch_mutex_unlock(globals.mutex);
4299 return SWITCH_STATUS_SUCCESS;
4300 }
4301
4302 const char outbound_sql[] =
4303 "create table fifo_outbound (\n"
4304 " uuid varchar(255),\n"
4305 " fifo_name varchar(255),\n"
4306 " originate_string varchar(255),\n"
4307 " simo_count integer,\n"
4308 " use_count integer,\n"
4309 " timeout integer,\n"
4310 " lag integer,\n"
4311 " next_avail integer not null default 0,\n"
4312 " expires integer not null default 0,\n"
4313 " static integer not null default 0,\n"
4314 " outbound_call_count integer not null default 0,\n"
4315 " outbound_fail_count integer not null default 0,\n"
4316 " hostname varchar(255),\n"
4317 " taking_calls integer not null default 1,\n"
4318 " status varchar(255),\n"
4319 " outbound_call_total_count integer not null default 0,\n"
4320 " outbound_fail_total_count integer not null default 0,\n"
4321 " active_time integer not null default 0,\n"
4322 " inactive_time integer not null default 0,\n"
4323 " manual_calls_out_count integer not null default 0,\n"
4324 " manual_calls_in_count integer not null default 0,\n"
4325 " manual_calls_out_total_count integer not null default 0,\n"
4326 " manual_calls_in_total_count integer not null default 0,\n"
4327 " ring_count integer not null default 0,\n"
4328 " start_time integer not null default 0,\n"
4329 " stop_time integer not null default 0\n"
4330 ");\n";
4331
4332 const char bridge_sql[] =
4333 "create table fifo_bridge (\n"
4334 " fifo_name varchar(1024) not null,\n"
4335 " caller_uuid varchar(255) not null,\n"
4336 " caller_caller_id_name varchar(255),\n"
4337 " caller_caller_id_number varchar(255),\n"
4338
4339 " consumer_uuid varchar(255) not null,\n"
4340 " consumer_outgoing_uuid varchar(255),\n"
4341 " bridge_start integer\n"
4342 ");\n"
4343 ;
4344
4345 const char callers_sql[] =
4346 "create table fifo_callers (\n"
4347 " fifo_name varchar(255) not null,\n"
4348 " uuid varchar(255) not null,\n"
4349 " caller_caller_id_name varchar(255),\n"
4350 " caller_caller_id_number varchar(255),\n"
4351 " timestamp integer\n"
4352 ");\n"
4353 ;
4354
extract_fifo_outbound_uuid(char * string,char * uuid,switch_size_t len)4355 static void extract_fifo_outbound_uuid(char *string, char *uuid, switch_size_t len)
4356 {
4357 switch_event_t *ovars;
4358 char *parsed = NULL;
4359 const char *fifo_outbound_uuid;
4360
4361 switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
4362 switch_event_create_brackets(string, '{', '}', ',', &ovars, &parsed, SWITCH_TRUE);
4363
4364 if ((fifo_outbound_uuid = switch_event_get_header(ovars, "fifo_outbound_uuid"))) {
4365 switch_snprintf(uuid, len, "%s", fifo_outbound_uuid);
4366 }
4367
4368 switch_safe_free(parsed);
4369 switch_event_destroy(&ovars);
4370 }
4371
read_config_file(switch_xml_t * xml,switch_xml_t * cfg)4372 static switch_status_t read_config_file(switch_xml_t *xml, switch_xml_t *cfg) {
4373 const char *cf = "fifo.conf";
4374 switch_xml_t settings;
4375
4376 if (!(*xml = switch_xml_open_cfg(cf, cfg, NULL))) {
4377 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf);
4378 return SWITCH_STATUS_TERM;
4379 }
4380 if ((settings = switch_xml_child(*cfg, "settings"))) {
4381 switch_xml_t param;
4382 for (param = switch_xml_child(settings, "param"); param; param = param->next) {
4383 char *var = (char*)switch_xml_attr_soft(param, "name");
4384 char *val = (char*)switch_xml_attr_soft(param, "value");
4385
4386 if (!strcasecmp(var, "outbound-strategy") && !zstr(val)) {
4387 globals.default_strategy = parse_strategy(val);
4388 } else if (!strcasecmp(var, "odbc-dsn") && !zstr(val)) {
4389 if (switch_database_available(val) == SWITCH_STATUS_SUCCESS) {
4390 switch_set_string(globals.odbc_dsn, val);
4391 } else {
4392 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "DATABASE IS NOT AVAILABLE!\n");
4393 }
4394 } else if (!strcasecmp(var, "dbname") && !zstr(val)) {
4395 globals.dbname = switch_core_strdup(globals.pool, val);
4396 } else if (!strcasecmp(var, "allow-transcoding") && !zstr(val)) {
4397 globals.allow_transcoding = switch_true(val);
4398 } else if (!strcasecmp(var, "db-pre-trans-execute") && !zstr(val)) {
4399 globals.pre_trans_execute = switch_core_strdup(globals.pool, val);
4400 } else if (!strcasecmp(var, "db-post-trans-execute") && !zstr(val)) {
4401 globals.post_trans_execute = switch_core_strdup(globals.pool, val);
4402 } else if (!strcasecmp(var, "db-inner-pre-trans-execute") && !zstr(val)) {
4403 globals.inner_pre_trans_execute = switch_core_strdup(globals.pool, val);
4404 } else if (!strcasecmp(var, "db-inner-post-trans-execute") && !zstr(val)) {
4405 globals.inner_post_trans_execute = switch_core_strdup(globals.pool, val);
4406 } else if (!strcasecmp(var, "delete-all-outbound-member-on-startup")) {
4407 globals.delete_all_members_on_startup = switch_true(val);
4408 }
4409 }
4410 }
4411 return SWITCH_STATUS_SUCCESS;
4412 }
4413
4414 /*!\brief Load or reload the configuration
4415 *
4416 * On the initial load, non-static members are preserved unless the
4417 * parameter `delete-all-outbound-members-on-startup` is set. The
4418 * parameter `del_all` is ignored in this case.
4419 *
4420 * On reload, non-static members are preserved unless `del_all` is
4421 * set.
4422 *
4423 * \param reload true if we're reloading the config
4424 * \param del_all delete all outbound members when reloading;
4425 * not used unless reload is true
4426 */
load_config(int reload,int del_all)4427 static switch_status_t load_config(int reload, int del_all)
4428 {
4429 switch_xml_t xml, cfg, fifo, fifos, member;
4430 switch_status_t status = SWITCH_STATUS_SUCCESS;
4431 char *sql;
4432 switch_cache_db_handle_t *dbh = NULL;
4433 fifo_node_t *node;
4434
4435 strncpy(globals.hostname, switch_core_get_switchname(), sizeof(globals.hostname) - 1);
4436 globals.dbname = "fifo";
4437 globals.default_strategy = NODE_STRATEGY_RINGALL;
4438 globals.delete_all_members_on_startup = SWITCH_FALSE;
4439
4440 if ((status = read_config_file(&xml, &cfg)) != SWITCH_STATUS_SUCCESS) return status;
4441
4442 if (!(dbh = fifo_get_db_handle())) {
4443 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot open DB!\n");
4444 goto done;
4445 }
4446
4447 if (!reload) {
4448 switch_sql_queue_manager_init_name("fifo",
4449 &globals.qm,
4450 2,
4451 !zstr(globals.odbc_dsn) ? globals.odbc_dsn : globals.dbname,
4452 SWITCH_MAX_TRANS,
4453 globals.pre_trans_execute,
4454 globals.post_trans_execute,
4455 globals.inner_pre_trans_execute,
4456 globals.inner_post_trans_execute);
4457 switch_sql_queue_manager_start(globals.qm);
4458
4459 switch_cache_db_test_reactive(dbh, "delete from fifo_outbound where static = 1 or taking_calls < 0 or stop_time < 0",
4460 "drop table fifo_outbound", outbound_sql);
4461 switch_cache_db_test_reactive(dbh, "delete from fifo_bridge", "drop table fifo_bridge", bridge_sql);
4462 switch_cache_db_test_reactive(dbh, "delete from fifo_callers", "drop table fifo_callers", callers_sql);
4463 }
4464
4465 switch_cache_db_release_db_handle(&dbh);
4466
4467 if (!reload) {
4468 char *sql= "update fifo_outbound set start_time=0,stop_time=0,ring_count=0,use_count=0,outbound_call_count=0,outbound_fail_count=0 where static=0";
4469 fifo_execute_sql_queued(&sql, SWITCH_FALSE, SWITCH_TRUE);
4470 fifo_init_use_count();
4471 }
4472
4473 if (reload) {
4474 switch_hash_index_t *hi;
4475 fifo_node_t *node;
4476 void *val;
4477 switch_mutex_lock(globals.mutex);
4478 for (hi = switch_core_hash_first(globals.fifo_hash); hi; hi = switch_core_hash_next(&hi)) {
4479 switch_core_hash_this(hi, NULL, NULL, &val);
4480 if ((node = (fifo_node_t *) val) && node->is_static && node->ready == 1) {
4481 node->ready = -1;
4482 }
4483 }
4484 switch_mutex_unlock(globals.mutex);
4485 }
4486
4487 if ((reload && del_all) || (!reload && globals.delete_all_members_on_startup)) {
4488 sql = switch_mprintf("delete from fifo_outbound where hostname='%q'", globals.hostname);
4489 } else {
4490 sql = switch_mprintf("delete from fifo_outbound where static=1 and hostname='%q'", globals.hostname);
4491 }
4492
4493 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
4494
4495 if (!switch_core_hash_find(globals.fifo_hash, MANUAL_QUEUE_NAME)) {
4496 node = create_node(MANUAL_QUEUE_NAME, 0, globals.sql_mutex);
4497 node->ready = 2;
4498 node->is_static = 0;
4499 }
4500
4501 if ((fifos = switch_xml_child(cfg, "fifos"))) {
4502 for (fifo = switch_xml_child(fifos, "fifo"); fifo; fifo = fifo->next) {
4503 const char *name, *sp;
4504 const char *val;
4505 int i, importance = 0;
4506
4507 if (!(name = switch_xml_attr(fifo, "name"))) {
4508 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "fifo has no name!\n");
4509 continue;
4510 }
4511
4512 if (!strcasecmp(name, MANUAL_QUEUE_NAME)) {
4513 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%s is a reserved name, use another name please.\n", MANUAL_QUEUE_NAME);
4514 continue;
4515 }
4516
4517 if ((val = switch_xml_attr(fifo, "importance")) && !((i = atoi(val)) < 0)) {
4518 importance = i;
4519 }
4520
4521 switch_mutex_lock(globals.mutex);
4522 if (!(node = switch_core_hash_find(globals.fifo_hash, name))) {
4523 node = create_node(name, importance, globals.sql_mutex);
4524 }
4525
4526 if ((val = switch_xml_attr(fifo, "outbound_name"))) {
4527 node->outbound_name = switch_core_strdup(node->pool, val);
4528 }
4529
4530 switch_mutex_unlock(globals.mutex);
4531 switch_assert(node);
4532 switch_mutex_lock(node->mutex);
4533
4534 if ((sp = switch_xml_attr(fifo, "outbound_strategy"))) {
4535 node->outbound_strategy = parse_strategy(sp);
4536 node->has_outbound = 1;
4537 }
4538
4539 node->outbound_per_cycle = 1;
4540 if ((val = switch_xml_attr(fifo, "outbound_per_cycle"))) {
4541 if (!((i = atoi(val)) < 0)) {
4542 node->outbound_per_cycle = i;
4543 }
4544 node->has_outbound = 1;
4545 }
4546
4547 node->outbound_per_cycle_min = 1;
4548 if ((val = switch_xml_attr(fifo, "outbound_per_cycle_min"))) {
4549 if (!((i = atoi(val)) < 0)) {
4550 node->outbound_per_cycle_min = i;
4551 }
4552 }
4553
4554 if ((val = switch_xml_attr(fifo, "retry_delay"))) {
4555 if ((i = atoi(val)) < 0) i = 0;
4556 node->retry_delay = i;
4557 }
4558
4559 node->outbound_priority = 5;
4560 if ((val = switch_xml_attr(fifo, "outbound_priority"))) {
4561 i = atoi(val);
4562 if (!(i < 1 || i > 10)) {
4563 node->outbound_priority = i;
4564 }
4565 node->has_outbound = 1;
4566 }
4567
4568 node->ring_timeout = 60;
4569 if ((val = switch_xml_attr(fifo, "outbound_ring_timeout"))) {
4570 if ((i = atoi(val)) > 10) {
4571 node->ring_timeout = i;
4572 } else {
4573 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Invalid ring_timeout: must be > 10 for queue %s\n", node->name);
4574 }
4575 }
4576
4577 node->default_lag = 30;
4578 if ((val = switch_xml_attr(fifo, "outbound_default_lag"))) {
4579 if ((i = atoi(val)) > 10) {
4580 node->default_lag = i;
4581 } else {
4582 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Invalid default_lag: must be > 10 for queue %s\n", node->name);
4583 }
4584 }
4585
4586 for (member = switch_xml_child(fifo, "member"); member; member = member->next) {
4587 const char *simo, *taking_calls, *timeout, *lag;
4588 int simo_i = 1, taking_calls_i = 1, timeout_i = 60, lag_i = 10;
4589 char digest[SWITCH_MD5_DIGEST_STRING_SIZE] = { 0 };
4590
4591 if (switch_stristr("fifo_outbound_uuid=", member->txt)) {
4592 extract_fifo_outbound_uuid(member->txt, digest, sizeof(digest));
4593 } else {
4594 switch_md5_string(digest, (void *) member->txt, strlen(member->txt));
4595 }
4596
4597 if ((simo = switch_xml_attr_soft(member, "simo"))) {
4598 simo_i = atoi(simo);
4599 }
4600
4601 if ((taking_calls = switch_xml_attr_soft(member, "taking_calls"))
4602 && (taking_calls_i = atoi(taking_calls)) < 1) {
4603 taking_calls_i = 1;
4604 }
4605
4606 if ((timeout = switch_xml_attr_soft(member, "timeout"))
4607 && (timeout_i = atoi(timeout)) < 10) {
4608 timeout_i = node->ring_timeout;
4609 }
4610
4611 if ((lag = switch_xml_attr_soft(member, "lag"))
4612 && (lag_i = atoi(lag)) < 0) {
4613 lag_i = node->default_lag;
4614 }
4615
4616 sql = switch_mprintf("insert into fifo_outbound "
4617 "(uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, "
4618 "next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname, taking_calls, "
4619 "active_time, inactive_time) "
4620 "values ('%q','%q','%q',%d,%d,%d,%d,0,0,1,0,0,'%q',%d,%ld,0)",
4621 digest, node->name, member->txt, simo_i, 0, timeout_i, lag_i, globals.hostname, taking_calls_i,
4622 (long) switch_epoch_time_now(NULL));
4623 switch_assert(sql);
4624 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
4625 node->has_outbound = 1;
4626 node->member_count++;
4627 }
4628 node->ready = 1;
4629 node->is_static = 1;
4630 switch_mutex_unlock(node->mutex);
4631 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s configured\n", node->name);
4632 }
4633 }
4634
4635 done:
4636
4637 switch_xml_free(xml);
4638 if (reload) {
4639 fifo_node_t *node;
4640 switch_mutex_lock(globals.mutex);
4641 for (node = globals.nodes; node; node = node->next) {
4642 if (node->ready == -1) {
4643 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s queued for removal\n", node->name);
4644 switch_core_hash_delete(globals.fifo_hash, node->name);
4645 node->ready = 0;
4646 }
4647 }
4648 switch_mutex_unlock(globals.mutex);
4649 }
4650
4651 return status;
4652 }
4653
fifo_member_add(char * fifo_name,char * originate_string,int simo_count,int timeout,int lag,time_t expires,int taking_calls)4654 static void fifo_member_add(char *fifo_name, char *originate_string, int simo_count, int timeout, int lag, time_t expires, int taking_calls)
4655 {
4656 char digest[SWITCH_MD5_DIGEST_STRING_SIZE] = { 0 };
4657 char *sql, *name_dup, *p;
4658 char outbound_count[80] = "";
4659 callback_t cbt = { 0 };
4660 fifo_node_t *node = NULL;
4661
4662 if (!fifo_name) return;
4663
4664 if (switch_stristr("fifo_outbound_uuid=", originate_string)) {
4665 extract_fifo_outbound_uuid(originate_string, digest, sizeof(digest));
4666 } else {
4667 switch_md5_string(digest, (void *) originate_string, strlen(originate_string));
4668 }
4669
4670 sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q'", fifo_name, digest);
4671 switch_assert(sql);
4672 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
4673
4674 switch_mutex_lock(globals.mutex);
4675 if (!(node = switch_core_hash_find(globals.fifo_hash, fifo_name))) {
4676 node = create_node(fifo_name, 0, globals.sql_mutex);
4677 node->ready = 1;
4678 }
4679 switch_mutex_unlock(globals.mutex);
4680
4681 name_dup = strdup(fifo_name);
4682 if ((p = strchr(name_dup, '@'))) {
4683 *p = '\0';
4684 }
4685
4686 sql = switch_mprintf("insert into fifo_outbound "
4687 "(uuid, fifo_name, originate_string, simo_count, use_count, timeout, "
4688 "lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname, taking_calls, active_time, inactive_time) "
4689 "values ('%q','%q','%q',%d,%d,%d,%d,%d,%ld,0,0,0,'%q',%d,%ld,0)",
4690 digest, fifo_name, originate_string, simo_count, 0, timeout, lag, 0, (long) expires, globals.hostname, taking_calls,
4691 (long)switch_epoch_time_now(NULL));
4692 switch_assert(sql);
4693 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
4694 free(name_dup);
4695
4696 cbt.buf = outbound_count;
4697 cbt.len = sizeof(outbound_count);
4698 sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", fifo_name);
4699 fifo_execute_sql_callback(globals.sql_mutex, sql, sql2str_callback, &cbt);
4700 node->member_count = atoi(outbound_count);
4701 if (node->member_count > 0) {
4702 node->has_outbound = 1;
4703 } else {
4704 node->has_outbound = 0;
4705 }
4706 switch_safe_free(sql);
4707 }
4708
fifo_member_del(char * fifo_name,char * originate_string)4709 static void fifo_member_del(char *fifo_name, char *originate_string)
4710 {
4711 char digest[SWITCH_MD5_DIGEST_STRING_SIZE] = { 0 };
4712 char *sql;
4713 char outbound_count[80] = "";
4714 callback_t cbt = { 0 };
4715 fifo_node_t *node = NULL;
4716
4717 if (!fifo_name) return;
4718
4719 if (switch_stristr("fifo_outbound_uuid=", originate_string)) {
4720 extract_fifo_outbound_uuid(originate_string, digest, sizeof(digest));
4721 } else {
4722 switch_md5_string(digest, (void *) originate_string, strlen(originate_string));
4723 }
4724
4725 sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q' and hostname='%q'", fifo_name, digest, globals.hostname);
4726 switch_assert(sql);
4727 fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
4728
4729 switch_mutex_lock(globals.mutex);
4730 if (!(node = switch_core_hash_find(globals.fifo_hash, fifo_name))) {
4731 node = create_node(fifo_name, 0, globals.sql_mutex);
4732 node->ready = 1;
4733 }
4734 switch_mutex_unlock(globals.mutex);
4735
4736 cbt.buf = outbound_count;
4737 cbt.len = sizeof(outbound_count);
4738 sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", node->name);
4739 fifo_execute_sql_callback(globals.sql_mutex, sql, sql2str_callback, &cbt);
4740 node->member_count = atoi(outbound_count);
4741 if (node->member_count > 0) {
4742 node->has_outbound = 1;
4743 } else {
4744 node->has_outbound = 0;
4745 }
4746 switch_safe_free(sql);
4747 }
4748
4749 #define FIFO_MEMBER_API_SYNTAX "[add <fifo_name> <originate_string> [<simo_count>] [<timeout>] [<lag>] [<expires>] [<taking_calls>] | del <fifo_name> <originate_string>]"
SWITCH_STANDARD_API(fifo_member_api_function)4750 SWITCH_STANDARD_API(fifo_member_api_function)
4751 {
4752 char *fifo_name;
4753 char *originate_string;
4754 int simo_count = 1;
4755 int timeout = 60;
4756 int lag = 5;
4757 int taking_calls = 1;
4758 char *action;
4759 char *mydata = NULL, *argv[8] = { 0 };
4760 int argc;
4761 time_t expires = 0;
4762
4763 if (!globals.running) {
4764 return SWITCH_STATUS_FALSE;
4765 }
4766
4767 if (zstr(cmd)) {
4768 stream->write_function(stream, "-USAGE: %s\n", FIFO_MEMBER_API_SYNTAX);
4769 return SWITCH_STATUS_SUCCESS;
4770 }
4771
4772 mydata = strdup(cmd);
4773 switch_assert(mydata);
4774
4775 argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0])));
4776
4777 if (argc < 3) {
4778 stream->write_function(stream, "%s", "-ERR Invalid!\n");
4779 goto done;
4780 }
4781
4782 action = argv[0];
4783 fifo_name = argv[1];
4784 originate_string = argv[2];
4785
4786 if (action && !strcasecmp(action, "add")) {
4787 if (argc > 3) {
4788 simo_count = atoi(argv[3]);
4789 }
4790 if (argc > 4) {
4791 timeout = atoi(argv[4]);
4792 }
4793 if (argc > 5) {
4794 lag = atoi(argv[5]);
4795 }
4796 if (argc > 6) {
4797 expires = switch_epoch_time_now(NULL) + atoi(argv[6]);
4798 }
4799 if (argc > 7) {
4800 taking_calls = atoi(argv[7]);
4801 }
4802 if (simo_count < 0) {
4803 simo_count = 1;
4804 }
4805 if (timeout < 0) {
4806 timeout = 60;
4807 }
4808 if (lag < 0) {
4809 lag = 5;
4810 }
4811 if (taking_calls < 1) {
4812 taking_calls = 1;
4813 }
4814
4815 fifo_member_add(fifo_name, originate_string, simo_count, timeout, lag, expires, taking_calls);
4816 stream->write_function(stream, "%s", "+OK\n");
4817 } else if (action && !strcasecmp(action, "del")) {
4818 fifo_member_del(fifo_name, originate_string);
4819 stream->write_function(stream, "%s", "+OK\n");
4820 } else {
4821 stream->write_function(stream, "%s", "-ERR Invalid!\n");
4822 goto done;
4823 }
4824
4825 done:
4826
4827 free(mydata);
4828
4829 return SWITCH_STATUS_SUCCESS;
4830 }
4831
SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load)4832 SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load)
4833 {
4834 switch_application_interface_t *app_interface;
4835 switch_api_interface_t *commands_api_interface;
4836 switch_status_t status;
4837
4838 /* create/register custom event message type */
4839 if (switch_event_reserve_subclass(FIFO_EVENT) != SWITCH_STATUS_SUCCESS) {
4840 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't register subclass %s!", FIFO_EVENT);
4841 return SWITCH_STATUS_TERM;
4842 }
4843
4844 /* Subscribe to presence request events */
4845 if (switch_event_bind_removable(modname, SWITCH_EVENT_PRESENCE_PROBE, SWITCH_EVENT_SUBCLASS_ANY,
4846 pres_event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
4847 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't subscribe to presence request events!\n");
4848 return SWITCH_STATUS_GENERR;
4849 }
4850
4851 globals.pool = pool;
4852 switch_core_hash_init(&globals.fifo_hash);
4853
4854 switch_core_hash_init(&globals.caller_orig_hash);
4855 switch_core_hash_init(&globals.consumer_orig_hash);
4856 switch_core_hash_init(&globals.bridge_hash);
4857 switch_core_hash_init(&globals.use_hash);
4858 switch_mutex_init(&globals.caller_orig_mutex, SWITCH_MUTEX_NESTED, globals.pool);
4859 switch_mutex_init(&globals.consumer_orig_mutex, SWITCH_MUTEX_NESTED, globals.pool);
4860 switch_mutex_init(&globals.bridge_mutex, SWITCH_MUTEX_NESTED, globals.pool);
4861
4862 switch_mutex_init(&globals.mutex, SWITCH_MUTEX_NESTED, globals.pool);
4863 switch_mutex_init(&globals.use_mutex, SWITCH_MUTEX_NESTED, globals.pool);
4864 switch_mutex_init(&globals.sql_mutex, SWITCH_MUTEX_NESTED, globals.pool);
4865
4866 globals.running = 1;
4867
4868 if ((status = load_config(0, 1)) != SWITCH_STATUS_SUCCESS) {
4869 switch_event_unbind(&globals.node);
4870 switch_event_free_subclass(FIFO_EVENT);
4871 switch_core_hash_destroy(&globals.fifo_hash);
4872 return status;
4873 }
4874
4875 /* connect my internal structure to the blank pointer passed to me */
4876 *module_interface = switch_loadable_module_create_module_interface(pool, modname);
4877 SWITCH_ADD_APP(app_interface, "fifo", "Park with FIFO", FIFO_DESC, fifo_function, FIFO_USAGE, SAF_NONE);
4878 SWITCH_ADD_APP(app_interface, "fifo_track_call", "Count a call as a fifo call in the manual_calls queue",
4879 "", fifo_track_call_function, "<fifo_outbound_uuid>", SAF_SUPPORT_NOMEDIA);
4880 SWITCH_ADD_API(commands_api_interface, "fifo", "Return data about a fifo", fifo_api_function, FIFO_API_SYNTAX);
4881 SWITCH_ADD_API(commands_api_interface, "fifo_member", "Add members to a fifo", fifo_member_api_function, FIFO_MEMBER_API_SYNTAX);
4882 SWITCH_ADD_API(commands_api_interface, "fifo_add_outbound", "Add outbound members to a fifo", fifo_add_outbound_function, "<node> <url> [<priority>]");
4883 SWITCH_ADD_API(commands_api_interface, "fifo_check_bridge", "check if uuid is in a bridge", fifo_check_bridge_function, "<uuid>|<outbound_id>");
4884 switch_console_set_complete("add fifo list");
4885 switch_console_set_complete("add fifo list_verbose");
4886 switch_console_set_complete("add fifo count");
4887 switch_console_set_complete("add fifo debug");
4888 switch_console_set_complete("add fifo status");
4889 switch_console_set_complete("add fifo has_outbound");
4890 switch_console_set_complete("add fifo importance");
4891 switch_console_set_complete("add fifo reparse");
4892 switch_console_set_complete("add fifo_check_bridge ::console::list_uuid");
4893
4894 start_node_thread(globals.pool);
4895
4896 return SWITCH_STATUS_SUCCESS;
4897 }
4898
4899 /*
4900 Called when the system shuts down
4901 */
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)4902 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
4903 {
4904 switch_event_t *pop = NULL;
4905 fifo_node_t *node, *this_node;
4906 switch_mutex_t *mutex = globals.mutex;
4907
4908 switch_sql_queue_manager_destroy(&globals.qm);
4909
4910 switch_event_unbind(&globals.node);
4911 switch_event_free_subclass(FIFO_EVENT);
4912
4913 switch_mutex_lock(mutex);
4914
4915 globals.running = 0;
4916 /* Cleanup */
4917
4918 stop_node_thread();
4919
4920 while(globals.threads) {
4921 switch_cond_next();
4922 }
4923
4924 node = globals.nodes;
4925
4926 while(node) {
4927 int x = 0;
4928
4929 this_node = node;
4930 node = node->next;
4931
4932 switch_mutex_lock(this_node->update_mutex);
4933 switch_mutex_lock(this_node->mutex);
4934 for (x = 0; x < MAX_PRI; x++) {
4935 while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
4936 switch_event_destroy(&pop);
4937 }
4938 }
4939 switch_mutex_unlock(this_node->mutex);
4940 switch_core_hash_delete(globals.fifo_hash, this_node->name);
4941 switch_core_hash_destroy(&this_node->consumer_hash);
4942 switch_mutex_unlock(this_node->update_mutex);
4943 switch_core_destroy_memory_pool(&this_node->pool);
4944 }
4945
4946 switch_core_hash_destroy(&globals.fifo_hash);
4947 switch_core_hash_destroy(&globals.caller_orig_hash);
4948 switch_core_hash_destroy(&globals.consumer_orig_hash);
4949 switch_core_hash_destroy(&globals.bridge_hash);
4950 switch_core_hash_destroy(&globals.use_hash);
4951 memset(&globals, 0, sizeof(globals));
4952 switch_mutex_unlock(mutex);
4953
4954 return SWITCH_STATUS_SUCCESS;
4955 }
4956
4957 /* For Emacs:
4958 * Local Variables:
4959 * mode:c
4960 * indent-tabs-mode:t
4961 * tab-width:4
4962 * c-basic-offset:4
4963 * End:
4964 * For VIM:
4965 * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
4966 */
4967