1 /*
2  * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3  * Copyright (C) 2005-2014, Anthony Minessale II <anthm@freeswitch.org>
4  *
5  * Version: MPL 1.1
6  *
7  * The contents of this file are subject to the Mozilla Public License Version
8  * 1.1 (the "License"); you may not use this file except in compliance with
9  * the License. You may obtain a copy of the License at
10  * http://www.mozilla.org/MPL/
11  *
12  * Software distributed under the License is distributed on an "AS IS" basis,
13  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14  * for the specific language governing rights and limitations under the
15  * License.
16  *
17  * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18  *
19  * The Initial Developer of the Original Code is
20  * Anthony Minessale II <anthm@freeswitch.org>
21  * Portions created by the Initial Developer are Copyright (C)
22  * the Initial Developer. All Rights Reserved.
23  *
24  * Contributor(s):
25  *
26  * Anthony Minessale II <anthm@freeswitch.org>
27  * Travis Cross <tc@traviscross.com>
28  *
29  * mod_fifo.c -- FIFO
30  *
31  */
32 #include <switch.h>
33 #define FIFO_APP_KEY "mod_fifo"
34 
35 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown);
36 SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load);
37 SWITCH_MODULE_DEFINITION(mod_fifo, mod_fifo_load, mod_fifo_shutdown, NULL);
38 
39 /*!\file
40  * # Theory of operation
41  *
42  * ## Kinds of things
43  *
44  * The fifo systems deals in various kinds of things: /fifos nodes/,
45  * /queues/, /(inbound) callers/, /outbound callers/, /consumers/, and
46  * /(outbound) members/.
47  *
48  * /fifo nodes/ accept callers and work to deliver those callers to
49  * consumers and members.  The nodes contain an array of /queues/
50  * indexed by a priority value.
51  *
52  * /queues/ contain an array of callers treated as a list.
53  *
54  * /callers/ are the channels placed into a fifo node's queue for
55  * eventual delivery to consumers and members.
56  *
57  * /outbound callers/ are persons waiting to be called back via a
58  * dial string.
59  *
60  * /consumers/ are channels for agents who have dialed in to one or
61  * more fifos and will have callers connected to them.
62  *
63  * /members/ are agents who we'll place calls to via a dial string to
64  * attempt to deliver callers.
65  *
66  * An /agent/ may refer to either a /consumer/ or an /member/.
67  *
68  * ## Outbound Strategies
69  *
70  * An outbound strategy defines the way in which we attempt to deliver
71  * callers to members.
72  *
73  * The default strategy, /ringall/, preserves the caller ID of the
74  * caller being delivered.  Because this means we must choose a caller
75  * before we place the call to the member, this impacts the order in
76  * which calls are delivered and the rate at which we can deliver
77  * those calls.
78  *
79  * The /enterprise/ outbound strategy does not preserve the caller ID
80  * of the caller thereby allowing deliver of callers to agents at the
81  * fastest possible rate.
82  *
83  * outbound_per_cycle is used to define the maximum number of agents
84  * who will be available to answer a single caller. In ringall this
85  * maximum is the number who will be called, in enterprise the need defines
86  * how many agents will be called. outbound_per_cycle_min will define
87  * the minimum agents who will be called to answer a caller regardless of
88  * need, giving the enterprise strategy the ability to ring through more
89  * than one agent for one caller.
90 
91  *
92  * ## Manual calls
93  *
94  * The fifo system provides a way to prevent members on non-fifo calls
95  * from receiving a call from the fifo system.  We do this by tracking
96  * non-fifo calls in a special fifo named `manual_calls`.  When
97  * creating a channel for an agent we set the channel variable
98  * `fifo_outbound_uuid` to an arbitrary unique value <= 32 characters
99  * for that agent, then call `fifo_track_call`.  For the corresponding
100  * member we must also set `{fifo_outbound_uuid=}` to the same value.
101  * We expect the value of `fifo_outbound_uuid` to be the MD5 hash of
102  * the unique ID.  Values longer than 32 characters will cause the
103  * mechanism to fail to work as expected.
104  *
105  * ## Importance
106  *
107  * Importance is a value 0-9 that can be associated with a fifo.  The
108  * default value is 0.  If the fifo is being dynamically created the
109  * importance of the fifo can be set when calling the `fifo`
110  * application.  If the fifo already exists the importance value
111  * passed to the fifo application will be ignored.
112  */
113 
114 #define MANUAL_QUEUE_NAME "manual_calls"
115 #define FIFO_EVENT "fifo::info"
116 
117 static switch_status_t load_config(int reload, int del_all);
118 #define MAX_PRI 10
119 
120 typedef enum {
121 	NODE_STRATEGY_INVALID = -1,
122 	NODE_STRATEGY_RINGALL = 0,
123 	NODE_STRATEGY_ENTERPRISE
124 } outbound_strategy_t;
125 
126 /*!\struct fifo_queue_t
127  * \brief Queue of callers
128  *
129  * Callers are placed into a queue as events in `data` which is an
130  * array of such events.  The array size is hard-coded as 1000
131  * elements.
132  *
133  * Fifo nodes are composed of an array of these queues representing
134  * each priority level of the fifo.
135  */
136 typedef struct {
137 	int nelm;
138 	int idx;
139 	switch_event_t **data;
140 	switch_memory_pool_t *pool;
141 	switch_mutex_t *mutex;
142 } fifo_queue_t;
143 
144 typedef enum {
145 	FIFO_APP_BRIDGE_TAG = (1 << 0),
146 	FIFO_APP_TRACKING = (1 << 1),
147 	FIFO_APP_DID_HOOK = (1 << 2)
148 } fifo_app_flag_t;
149 
150 static int check_caller_outbound_call(const char *key);
151 static void add_caller_outbound_call(const char *key, switch_call_cause_t *cancel_cause);
152 static void del_caller_outbound_call(const char *key);
153 static void cancel_caller_outbound_call(const char *key, switch_call_cause_t cause);
154 static int check_consumer_outbound_call(const char *key);
155 static void add_consumer_outbound_call(const char *key, switch_call_cause_t *cancel_cause);
156 static void del_consumer_outbound_call(const char *key);
157 static void cancel_consumer_outbound_call(const char *key, switch_call_cause_t cause);
158 
159 static int check_bridge_call(const char *key);
160 static void add_bridge_call(const char *key);
161 static void del_bridge_call(const char *key);
162 
fifo_queue_create(fifo_queue_t ** queue,int size,switch_memory_pool_t * pool)163 switch_status_t fifo_queue_create(fifo_queue_t **queue, int size, switch_memory_pool_t *pool)
164 {
165 	fifo_queue_t *q;
166 
167 	q = switch_core_alloc(pool, sizeof(*q));
168 	q->pool = pool;
169 	q->nelm = size - 1;
170 	q->data = switch_core_alloc(pool, size * sizeof(switch_event_t *));
171 	switch_mutex_init(&q->mutex, SWITCH_MUTEX_NESTED, pool);
172 
173 	*queue = q;
174 
175 	return SWITCH_STATUS_SUCCESS;
176 }
177 
change_pos(switch_event_t * event,int pos)178 static void change_pos(switch_event_t *event, int pos)
179 {
180 	const char *uuid = switch_event_get_header(event, "unique-id");
181 	switch_core_session_t *session;
182 	switch_channel_t *channel;
183 	char tmp[30] = "";
184 
185 	if (zstr(uuid)) return;
186 	if (!(session = switch_core_session_locate(uuid))) {
187 		return;
188 	}
189 	channel = switch_core_session_get_channel(session);
190 	switch_snprintf(tmp, sizeof(tmp), "%d", pos);
191 	switch_channel_set_variable(channel, "fifo_position", tmp);
192 	switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "fifo_position", tmp);
193 	switch_core_session_rwunlock(session);
194 }
195 
fifo_queue_push(fifo_queue_t * queue,switch_event_t * ptr)196 static switch_status_t fifo_queue_push(fifo_queue_t *queue, switch_event_t *ptr)
197 {
198 	switch_mutex_lock(queue->mutex);
199 	if (queue->idx == queue->nelm) {
200 		switch_mutex_unlock(queue->mutex);
201 		return SWITCH_STATUS_FALSE;
202 	}
203 	queue->data[queue->idx++] = ptr;
204 	switch_mutex_unlock(queue->mutex);
205 	return SWITCH_STATUS_SUCCESS;
206 }
207 
fifo_queue_size(fifo_queue_t * queue)208 static int fifo_queue_size(fifo_queue_t *queue)
209 {
210 	int s;
211 	switch_mutex_lock(queue->mutex);
212 	s = queue->idx;
213 	switch_mutex_unlock(queue->mutex);
214 	return s;
215 }
216 
217 /*!
218  * \param remove Whether to remove the popped event from the queue
219  *   If remove is 0, do not remove the popped event.  If it is 1,
220  *   remove it if it is not an event for an outbound caller.  If it is
221  *   2, always remove it.
222  */
fifo_queue_pop(fifo_queue_t * queue,switch_event_t ** pop,int remove)223 static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, int remove)
224 {
225 	int i, j;
226 
227 	switch_mutex_lock(queue->mutex);
228 
229 	if (queue->idx == 0) {
230 		switch_mutex_unlock(queue->mutex);
231 		return SWITCH_STATUS_FALSE;
232 	}
233 
234 	for (j = 0; j < queue->idx; j++) {
235 		const char *uuid = switch_event_get_header(queue->data[j], "unique-id");
236 		if (uuid && (remove == 2 || !check_caller_outbound_call(uuid))) {
237 			if (remove) {
238 				*pop = queue->data[j];
239 			} else {
240 				switch_event_dup(pop, queue->data[j]);
241 			}
242 			break;
243 		}
244 	}
245 
246 	if (j == queue->idx) {
247 		switch_mutex_unlock(queue->mutex);
248 		return SWITCH_STATUS_FALSE;
249 	}
250 
251 	if (remove) {
252 		for (i = j+1; i < queue->idx; i++) {
253 			queue->data[i-1] = queue->data[i];
254 			queue->data[i] = NULL;
255 			change_pos(queue->data[i-1], i);
256 		}
257 
258 		queue->idx--;
259 	}
260 
261 	switch_mutex_unlock(queue->mutex);
262 	return SWITCH_STATUS_SUCCESS;
263 }
264 
265 /*!\brief Remove matching event from queue
266  *
267  * Each event in the queue will be checked to see whether it has a
268  * header equal to name whose value is equal to val.  If it does, that
269  * event will be returned unless the event is for an outbound caller.
270  * If name starts with '+' or remove == 2 then forcing is enabled and
271  * the event will be returned in any case.  If remove > 0 then the
272  * returned event will be removed from the queue and the remaining
273  * elements shifted to make them contiguous.
274  */
fifo_queue_pop_nameval(fifo_queue_t * queue,const char * name,const char * val,switch_event_t ** pop,int remove)275 static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, int remove)
276 {
277 	int i, j, force = 0;
278 
279 	switch_mutex_lock(queue->mutex);
280 
281 	if (name && *name == '+') {
282 		name++;
283 		force = 1;
284 	}
285 
286 	if (remove == 2) {
287 		force = 1;
288 	}
289 
290 	if (queue->idx == 0 || zstr(name) || zstr(val)) {
291 		switch_mutex_unlock(queue->mutex);
292 		return SWITCH_STATUS_FALSE;
293 	}
294 
295 	for (j = 0; j < queue->idx; j++) {
296 		const char *j_val = switch_event_get_header(queue->data[j], name);
297 		const char *uuid = switch_event_get_header(queue->data[j], "unique-id");
298 		if (j_val && val && !strcmp(j_val, val) && (force || !check_caller_outbound_call(uuid))) {
299 			if (remove) {
300 				*pop = queue->data[j];
301 			} else {
302 				switch_event_dup(pop, queue->data[j]);
303 			}
304 			break;
305 		}
306 	}
307 
308 	if (j == queue->idx) {
309 		switch_mutex_unlock(queue->mutex);
310 		return SWITCH_STATUS_FALSE;
311 	}
312 
313 	if (remove) {
314 		for (i = j+1; i < queue->idx; i++) {
315 			queue->data[i-1] = queue->data[i];
316 			queue->data[i] = NULL;
317 			change_pos(queue->data[i-1], i);
318 		}
319 
320 		queue->idx--;
321 	}
322 
323 	switch_mutex_unlock(queue->mutex);
324 
325 	return SWITCH_STATUS_SUCCESS;
326 }
327 
328 /*!\brief Destroy event with given uuid and remove it from queue
329  *
330  * Elements of the queue are searched until a matching uuid is found.
331  * That uuid is then destroyed and removed from the queue.  The
332  * remaining elements are shifted to make them contiguous.
333  */
fifo_queue_popfly(fifo_queue_t * queue,const char * uuid)334 static switch_status_t fifo_queue_popfly(fifo_queue_t *queue, const char *uuid)
335 {
336 	int i, j;
337 
338 	switch_mutex_lock(queue->mutex);
339 
340 	if (queue->idx == 0 || zstr(uuid)) {
341 		switch_mutex_unlock(queue->mutex);
342 		return SWITCH_STATUS_FALSE;
343 	}
344 
345 	for (j = 0; j < queue->idx; j++) {
346 		const char *j_uuid = switch_event_get_header(queue->data[j], "unique-id");
347 		if (j_uuid && !strcmp(j_uuid, uuid)) {
348 			switch_event_destroy(&queue->data[j]);
349 			break;
350 		}
351 	}
352 
353 	if (j == queue->idx) {
354 		switch_mutex_unlock(queue->mutex);
355 		return SWITCH_STATUS_FALSE;
356 	}
357 
358 	for (i = j+1; i < queue->idx; i++) {
359 		queue->data[i-1] = queue->data[i];
360 		queue->data[i] = NULL;
361 		change_pos(queue->data[i-1], i);
362 	}
363 
364 	queue->idx--;
365 
366 	switch_mutex_unlock(queue->mutex);
367 
368 	return SWITCH_STATUS_SUCCESS;
369 }
370 
371 /*!\struct fifo_node
372  *
373  * \var fifo_node::outbound_name
374  * \brief Name of fifo in caller ID
375  *
376  * For the ringall strategy, this value is a prefix for the
377  * caller ID shown to agents.  If the value starts with '=' then the
378  * actual caller ID is replaced completely.
379  *
380  * For the enterprise strategy, this value is used instead of the
381  * queue name in the caller ID.
382  */
383 struct fifo_node {
384 	char *name;
385 	switch_mutex_t *mutex;
386 	switch_mutex_t *update_mutex;
387 	fifo_queue_t *fifo_list[MAX_PRI];
388 	switch_hash_t *consumer_hash;
389 	int outbound_priority;
390 	int caller_count;
391 	int consumer_count;
392 	int ring_consumer_count;
393 	int member_count;
394 	switch_time_t start_waiting;
395 	uint32_t importance;
396 	switch_thread_rwlock_t *rwlock;
397 	switch_memory_pool_t *pool;
398 	int has_outbound;
399 	int ready;
400 	long busy;
401 	int is_static;
402 	int outbound_per_cycle;
403 	int outbound_per_cycle_min;
404 	char *outbound_name;
405 	outbound_strategy_t outbound_strategy;
406 	int ring_timeout;
407 	int default_lag;
408 	char *domain_name;
409 	int retry_delay;
410 	struct fifo_node *next;
411 };
412 
413 typedef struct fifo_node fifo_node_t;
414 
415 static void fifo_caller_add(fifo_node_t *node, switch_core_session_t *session);
416 static void fifo_caller_del(const char *uuid);
417 
418 struct callback {
419 	char *buf;
420 	size_t len;
421 	int matches;
422 };
423 typedef struct callback callback_t;
424 
print_strategy(outbound_strategy_t s)425 static const char *print_strategy(outbound_strategy_t s)
426 {
427 	switch (s) {
428 	case NODE_STRATEGY_RINGALL:
429 		return "ringall";
430 	case NODE_STRATEGY_ENTERPRISE:
431 		return "enterprise";
432 	default:
433 		break;
434 	}
435 
436 	return "invalid";
437 }
438 
parse_strategy(const char * name)439 static outbound_strategy_t parse_strategy(const char *name)
440 {
441 	if (!strcasecmp(name, "ringall")) {
442 		return NODE_STRATEGY_RINGALL;
443 	}
444 
445 	if (!strcasecmp(name, "enterprise")) {
446 		return NODE_STRATEGY_ENTERPRISE;
447 	}
448 
449 	return NODE_STRATEGY_INVALID;
450 }
451 
sql2str_callback(void * pArg,int argc,char ** argv,char ** columnNames)452 static int sql2str_callback(void *pArg, int argc, char **argv, char **columnNames)
453 {
454 	callback_t *cbt = (callback_t *) pArg;
455 
456 	switch_copy_string(cbt->buf, argv[0], cbt->len);
457 	cbt->matches++;
458 	return 0;
459 }
460 
461 /*!\brief Handler for consumer DTMF
462  *
463  * When `fifo_consumer_exit_key` is pressed by the consumer we hangup
464  * on the caller (unless we've put the caller on hold).  The default
465  * exit key is '*'.
466  *
467  * When the consumer presses '0' we put both legs on hold and play
468  * hold music as follows.  To the caller we play `fifo_music` or the
469  * default hold music for the channel.  To the consumer we play
470  * `fifo_hold_music`, or `fifo_music`, or the default hold music for
471  * the channel.  The consumer can press '0' again to pick up the
472  * caller from hold.
473  */
on_dtmf(switch_core_session_t * session,void * input,switch_input_type_t itype,void * buf,unsigned int buflen)474 static switch_status_t on_dtmf(switch_core_session_t *session, void *input, switch_input_type_t itype, void *buf, unsigned int buflen)
475 {
476 	switch_core_session_t *bleg = (switch_core_session_t *) buf;
477 
478 	switch (itype) {
479 	case SWITCH_INPUT_TYPE_DTMF:
480 		{
481 			switch_dtmf_t *dtmf = (switch_dtmf_t *) input;
482 			switch_channel_t *bchan = switch_core_session_get_channel(bleg);
483 			switch_channel_t *channel = switch_core_session_get_channel(session);
484 
485 			if (switch_channel_test_flag(switch_core_session_get_channel(session), CF_BRIDGE_ORIGINATOR)) {
486 				const char *consumer_exit_key = switch_channel_get_variable(channel, "fifo_consumer_exit_key");
487 				if (!consumer_exit_key) consumer_exit_key = "*";
488 				if (dtmf->digit == *consumer_exit_key) {
489 					switch_channel_hangup(bchan, SWITCH_CAUSE_NORMAL_CLEARING);
490 					return SWITCH_STATUS_BREAK;
491 				} else if (dtmf->digit == '0') {
492 					const char *moh_a = NULL, *moh_b = NULL;
493 
494 					if (!(moh_b = switch_channel_get_variable(bchan, "fifo_music"))) {
495 						moh_b = switch_channel_get_hold_music(bchan);
496 					}
497 
498 					if (!(moh_a = switch_channel_get_variable(channel, "fifo_hold_music"))) {
499 						if (!(moh_a = switch_channel_get_variable(channel, "fifo_music"))) {
500 							moh_a = switch_channel_get_hold_music(channel);
501 						}
502 					}
503 
504 					switch_ivr_soft_hold(session, "0", moh_a, moh_b);
505 					return SWITCH_STATUS_IGNORE;
506 				}
507 			}
508 		}
509 		break;
510 	default:
511 		break;
512 	}
513 
514 	return SWITCH_STATUS_SUCCESS;
515 }
516 
517 /*!\brief Handler for caller DTMF
518  *
519  * The channel variable `fifo_caller_exit_key` can be set to one or
520  * more digits that when pressed will cause the caller to exit from
521  * the fifo.  We'll return via a single character in `buf` the digit
522  * that was pressed (not null-terminated).
523  */
moh_on_dtmf(switch_core_session_t * session,void * input,switch_input_type_t itype,void * buf,unsigned int buflen)524 static switch_status_t moh_on_dtmf(switch_core_session_t *session, void *input, switch_input_type_t itype, void *buf, unsigned int buflen)
525 {
526 	switch (itype) {
527 	case SWITCH_INPUT_TYPE_DTMF:
528 		{
529 			switch_dtmf_t *dtmf = (switch_dtmf_t *) input;
530 			switch_channel_t *channel = switch_core_session_get_channel(session);
531 			const char *caller_exit_key = switch_channel_get_variable(channel, "fifo_caller_exit_key");
532 
533 			if (caller_exit_key && dtmf->digit && strchr(caller_exit_key, dtmf->digit)) {
534 				char *bp = buf;
535 				*bp = dtmf->digit;
536 				return SWITCH_STATUS_BREAK;
537 			}
538 		}
539 		break;
540 	default:
541 		break;
542 	}
543 
544 	return SWITCH_STATUS_SUCCESS;
545 }
546 
cleanup_fifo_arg(const char ** s)547 static inline void cleanup_fifo_arg(const char **s) {
548 	if (!zstr(*s) && !strcasecmp(*s, "undef")) *s = NULL;
549 }
550 
node_caller_count(fifo_node_t * node)551 static int node_caller_count(fifo_node_t *node)
552 {
553 	int i, len = 0;
554 
555 	for (i = 0; i < MAX_PRI; i++) {
556 		len += fifo_queue_size(node->fifo_list[i]);
557 	}
558 
559 	return len;
560 }
561 
node_remove_uuid(fifo_node_t * node,const char * uuid)562 static void node_remove_uuid(fifo_node_t *node, const char *uuid)
563 {
564 	int i = 0;
565 
566 	for (i = 0; i < MAX_PRI; i++) {
567 		fifo_queue_popfly(node->fifo_list[i], uuid);
568 	}
569 
570 	if (!node_caller_count(node)) {
571 		node->start_waiting = 0;
572 	}
573 
574 	fifo_caller_del(uuid);
575 
576 	return;
577 }
578 
579 /*!\struct fifo_chime_data
580  *
581  * \var fifo_chime_data::list
582  * A list of strings representing things to play back to the caller
583  * while they are waiting to be connected with an agent.
584  */
585 #define MAX_CHIME 25
586 struct fifo_chime_data {
587 	char *list[MAX_CHIME];
588 	int total;
589 	int index;
590 	time_t next;
591 	int freq;
592 	int abort;
593 	time_t orbit_timeout;
594 	int do_orbit;
595 	char *orbit_exten;
596 	char *orbit_dialplan;
597 	char *orbit_context;
598 	char *exit_key;
599 };
600 
601 typedef struct fifo_chime_data fifo_chime_data_t;
602 
603 /*!\brief Enforce the `fifo_orbit_timeout`
604  *
605  * If the caller has been waiting longer than the `fifo_orbit_timeout`
606  * we break out so the orbit can do something else with the call.
607  */
chime_read_frame_callback(switch_core_session_t * session,switch_frame_t * frame,void * user_data)608 static switch_status_t chime_read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data)
609 {
610 	fifo_chime_data_t *cd = (fifo_chime_data_t *) user_data;
611 
612 	if (cd && cd->orbit_timeout && switch_epoch_time_now(NULL) >= cd->orbit_timeout) {
613 		cd->do_orbit = 1;
614 		return SWITCH_STATUS_BREAK;
615 	}
616 
617 	return SWITCH_STATUS_SUCCESS;
618 }
619 
620 /*!\brief Handle chimes and timeouts for callers
621  *
622  * Play back the chimes in order spaced out by the given `freq` while
623  * ensuring that we don't exceed the `orbit_timeout`.
624  */
caller_read_frame_callback(switch_core_session_t * session,switch_frame_t * frame,void * user_data)625 static switch_status_t caller_read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data)
626 {
627 	fifo_chime_data_t *cd = (fifo_chime_data_t *) user_data;
628 
629 	if (!cd) {
630 		return SWITCH_STATUS_SUCCESS;
631 	}
632 
633 	if (cd->total && switch_epoch_time_now(NULL) >= cd->next) {
634 		if (cd->index == MAX_CHIME || cd->index == cd->total || !cd->list[cd->index]) {
635 			cd->index = 0;
636 		}
637 
638 		if (cd->list[cd->index]) {
639 			switch_input_args_t args = { 0 };
640 			char buf[25] = "";
641 			switch_status_t status;
642 
643 			args.input_callback = moh_on_dtmf;
644 			args.buf = buf;
645 			args.buflen = sizeof(buf);
646 			args.read_frame_callback = chime_read_frame_callback;
647 			args.user_data = user_data;
648 
649 			status = switch_ivr_play_file(session, NULL, cd->list[cd->index], &args);
650 
651 			if (cd->exit_key && *buf && strchr(cd->exit_key, *buf)) {
652 				cd->abort = 1;
653 				return SWITCH_STATUS_BREAK;
654 			}
655 
656 			if (status != SWITCH_STATUS_SUCCESS) {
657 				return SWITCH_STATUS_BREAK;
658 			}
659 
660 			cd->next = switch_epoch_time_now(NULL) + cd->freq;
661 			cd->index++;
662 		}
663 	}
664 
665 	return chime_read_frame_callback(session, frame, user_data);
666 }
667 
668 /*!\brief Handler for waiting consumers
669  *
670  * In `user_data` we'll be passed an array of fifo_nodes representing
671  * the fifos for which this consumer will accept calls.  If any of
672  * those fifos have a caller in them, we break out so we can accept
673  * the call.
674  */
consumer_read_frame_callback(switch_core_session_t * session,switch_frame_t * frame,void * user_data)675 static switch_status_t consumer_read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data)
676 {
677 	fifo_node_t *node, **node_list = (fifo_node_t **) user_data;
678 	int total = 0, i = 0;
679 
680 	for (i = 0;; i++) {
681 		if (!(node = node_list[i])) {
682 			break;
683 		}
684 		total += node_caller_count(node);
685 	}
686 
687 	if (total) {
688 		return SWITCH_STATUS_BREAK;
689 	}
690 
691 	return SWITCH_STATUS_SUCCESS;
692 }
693 
694 static struct {
695 	switch_hash_t *caller_orig_hash;
696 	switch_hash_t *consumer_orig_hash;
697 	switch_hash_t *bridge_hash;
698 	switch_hash_t *use_hash;
699 	switch_mutex_t *use_mutex;
700 	switch_mutex_t *caller_orig_mutex;
701 	switch_mutex_t *consumer_orig_mutex;
702 	switch_mutex_t *bridge_mutex;
703 	switch_hash_t *fifo_hash;
704 	switch_mutex_t *mutex;
705 	switch_mutex_t *sql_mutex;
706 	switch_memory_pool_t *pool;
707 	int running;
708 	switch_event_node_t *node;
709 	char hostname[256];
710 	char *dbname;
711 	char odbc_dsn[1024];
712 	int node_thread_running;
713 	switch_odbc_handle_t *master_odbc;
714 	int threads;
715 	switch_thread_t *node_thread;
716 	int debug;
717 	struct fifo_node *nodes;
718 	char *pre_trans_execute;
719 	char *post_trans_execute;
720 	char *inner_pre_trans_execute;
721 	char *inner_post_trans_execute;
722 	switch_sql_queue_manager_t *qm;
723 	int allow_transcoding;
724 	switch_bool_t delete_all_members_on_startup;
725 	outbound_strategy_t default_strategy;
726 } globals;
727 
fifo_dec_use_count(const char * outbound_id)728 static int fifo_dec_use_count(const char *outbound_id)
729 {
730 	int r = 0, *count;
731 
732 	switch_mutex_lock(globals.use_mutex);
733 	if ((count = (int *) switch_core_hash_find(globals.use_hash, outbound_id))) {
734 		if (*count > 0) {
735 			r = --(*count);
736 		}
737 	}
738 	switch_mutex_unlock(globals.use_mutex);
739 
740 	return r;
741 }
742 
fifo_get_use_count(const char * outbound_id)743 static int fifo_get_use_count(const char *outbound_id)
744 {
745 	int r = 0, *count;
746 
747 	switch_mutex_lock(globals.use_mutex);
748 	if ((count = (int *) switch_core_hash_find(globals.use_hash, outbound_id))) {
749 		r = *count;
750 	}
751 	switch_mutex_unlock(globals.use_mutex);
752 
753 	return r;
754 }
755 
fifo_inc_use_count(const char * outbound_id)756 static int fifo_inc_use_count(const char *outbound_id)
757 {
758 	int r = 0, *count;
759 
760 	switch_mutex_lock(globals.use_mutex);
761 	if (!(count = (int *) switch_core_hash_find(globals.use_hash, outbound_id))) {
762 		count = switch_core_alloc(globals.pool, sizeof(int));
763 		switch_core_hash_insert(globals.use_hash, outbound_id, count);
764 	}
765 
766 	r = ++(*count);
767 
768 	switch_mutex_unlock(globals.use_mutex);
769 
770 	return r;
771 }
772 
fifo_init_use_count(void)773 static void fifo_init_use_count(void)
774 {
775 	switch_mutex_lock(globals.use_mutex);
776 	if (globals.use_hash) {
777 		switch_core_hash_destroy(&globals.use_hash);
778 	}
779 	switch_core_hash_init(&globals.use_hash);
780 	switch_mutex_unlock(globals.use_mutex);
781 }
782 
check_caller_outbound_call(const char * key)783 static int check_caller_outbound_call(const char *key)
784 {
785 	int x = 0;
786 
787 	if (!key) return x;
788 
789 	switch_mutex_lock(globals.caller_orig_mutex);
790 	x = !!switch_core_hash_find(globals.caller_orig_hash, key);
791 	switch_mutex_unlock(globals.caller_orig_mutex);
792 	return x;
793 }
794 
add_caller_outbound_call(const char * key,switch_call_cause_t * cancel_cause)795 static void add_caller_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
796 {
797 	if (!key) return;
798 
799 	switch_mutex_lock(globals.caller_orig_mutex);
800 	switch_core_hash_insert(globals.caller_orig_hash, key, cancel_cause);
801 	switch_mutex_unlock(globals.caller_orig_mutex);
802 }
803 
del_caller_outbound_call(const char * key)804 static void del_caller_outbound_call(const char *key)
805 {
806 	if (!key) return;
807 
808 	switch_mutex_lock(globals.caller_orig_mutex);
809 	switch_core_hash_delete(globals.caller_orig_hash, key);
810 	switch_mutex_unlock(globals.caller_orig_mutex);
811 }
812 
cancel_caller_outbound_call(const char * key,switch_call_cause_t cause)813 static void cancel_caller_outbound_call(const char *key, switch_call_cause_t cause)
814 {
815 	switch_call_cause_t *cancel_cause = NULL;
816 
817 	if (!key) return;
818 
819 	switch_mutex_lock(globals.caller_orig_mutex);
820 	if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.caller_orig_hash, key))) {
821 		*cancel_cause = cause;
822 	}
823 	switch_mutex_unlock(globals.caller_orig_mutex);
824 }
825 
check_bridge_call(const char * key)826 static int check_bridge_call(const char *key)
827 {
828 	int x = 0;
829 
830 	if (!key) return x;
831 
832 	switch_mutex_lock(globals.bridge_mutex);
833 	x = !!switch_core_hash_find(globals.bridge_hash, key);
834 	switch_mutex_unlock(globals.bridge_mutex);
835 	return x;
836 }
837 
add_bridge_call(const char * key)838 static void add_bridge_call(const char *key)
839 {
840 	static int marker = 1;
841 	if (!key) return;
842 
843 	switch_mutex_lock(globals.bridge_mutex);
844 	switch_core_hash_insert(globals.bridge_hash, key, (void *)&marker);
845 	switch_mutex_unlock(globals.bridge_mutex);
846 }
847 
del_bridge_call(const char * key)848 static void del_bridge_call(const char *key)
849 {
850 	switch_mutex_lock(globals.bridge_mutex);
851 	switch_core_hash_delete(globals.bridge_hash, key);
852 	switch_mutex_unlock(globals.bridge_mutex);
853 }
854 
check_consumer_outbound_call(const char * key)855 static int check_consumer_outbound_call(const char *key)
856 {
857 	int x = 0;
858 
859 	if (!key) return x;
860 
861 	switch_mutex_lock(globals.consumer_orig_mutex);
862 	x = !!switch_core_hash_find(globals.consumer_orig_hash, key);
863 	switch_mutex_unlock(globals.consumer_orig_mutex);
864 	return x;
865 }
866 
add_consumer_outbound_call(const char * key,switch_call_cause_t * cancel_cause)867 static void add_consumer_outbound_call(const char *key, switch_call_cause_t *cancel_cause)
868 {
869 	if (!key) return;
870 
871 	switch_mutex_lock(globals.consumer_orig_mutex);
872 	switch_core_hash_insert(globals.consumer_orig_hash, key, cancel_cause);
873 	switch_mutex_unlock(globals.consumer_orig_mutex);
874 }
875 
del_consumer_outbound_call(const char * key)876 static void del_consumer_outbound_call(const char *key)
877 {
878 	if (!key) return;
879 
880 	switch_mutex_lock(globals.consumer_orig_mutex);
881 	switch_core_hash_delete(globals.consumer_orig_hash, key);
882 	switch_mutex_unlock(globals.consumer_orig_mutex);
883 }
884 
cancel_consumer_outbound_call(const char * key,switch_call_cause_t cause)885 static void cancel_consumer_outbound_call(const char *key, switch_call_cause_t cause)
886 {
887 	switch_call_cause_t *cancel_cause = NULL;
888 
889 	if (!key) return;
890 
891 	switch_mutex_lock(globals.consumer_orig_mutex);
892 	if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.consumer_orig_hash, key))) {
893 		*cancel_cause = cause;
894 	}
895 	switch_mutex_unlock(globals.consumer_orig_mutex);
896 }
897 
fifo_get_db_handle(void)898 switch_cache_db_handle_t *fifo_get_db_handle(void)
899 {
900 	switch_cache_db_handle_t *dbh = NULL;
901 	char *dsn;
902 
903 	if (!zstr(globals.odbc_dsn)) {
904 		dsn = globals.odbc_dsn;
905 	} else {
906 		dsn = globals.dbname;
907 	}
908 
909 	if (switch_cache_db_get_db_handle_dsn(&dbh, dsn) != SWITCH_STATUS_SUCCESS) {
910 		dbh = NULL;
911 	}
912 
913 	return dbh;
914 }
915 
fifo_execute_sql_queued(char ** sqlp,switch_bool_t sql_already_dynamic,switch_bool_t block)916 static switch_status_t fifo_execute_sql_queued(char **sqlp, switch_bool_t sql_already_dynamic, switch_bool_t block)
917 {
918 	int index = 1;
919 	char *sql;
920 
921 	switch_assert(sqlp && *sqlp);
922 	sql = *sqlp;
923 
924 	if (switch_stristr("insert", sql)) {
925 		index = 0;
926 	}
927 
928 	if (block) {
929 		switch_sql_queue_manager_push_confirm(globals.qm, sql, index, !sql_already_dynamic);
930 	} else {
931 		switch_sql_queue_manager_push(globals.qm, sql, index, !sql_already_dynamic);
932 	}
933 
934 	if (sql_already_dynamic) {
935 		*sqlp = NULL;
936 	}
937 
938 	return SWITCH_STATUS_SUCCESS;
939 }
940 #if 0
941 static switch_status_t fifo_execute_sql(char *sql, switch_mutex_t *mutex)
942 {
943 	switch_cache_db_handle_t *dbh = NULL;
944 	switch_status_t status = SWITCH_STATUS_FALSE;
945 
946 	if (mutex) {
947 		switch_mutex_lock(mutex);
948 	}
949 
950 	if (!(dbh = fifo_get_db_handle())) {
951 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB\n");
952 		goto end;
953 	}
954 
955 	if (globals.debug > 1) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "sql: %s\n", sql);
956 
957 	status = switch_cache_db_execute_sql(dbh, sql, NULL);
958 
959   end:
960 
961 	switch_cache_db_release_db_handle(&dbh);
962 
963 	if (mutex) {
964 		switch_mutex_unlock(mutex);
965 	}
966 
967 	return status;
968 }
969 #endif
970 
fifo_execute_sql_callback(switch_mutex_t * mutex,char * sql,switch_core_db_callback_func_t callback,void * pdata)971 static switch_bool_t fifo_execute_sql_callback(switch_mutex_t *mutex, char *sql, switch_core_db_callback_func_t callback, void *pdata)
972 {
973 	switch_bool_t ret = SWITCH_FALSE;
974 	char *errmsg = NULL;
975 	switch_cache_db_handle_t *dbh = NULL;
976 
977 	if (mutex) {
978 		switch_mutex_lock(mutex);
979 	}
980 
981 	if (!(dbh = fifo_get_db_handle())) {
982 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB\n");
983 		goto end;
984 	}
985 
986 	if (globals.debug > 1) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "sql: %s\n", sql);
987 
988 	switch_cache_db_execute_sql_callback(dbh, sql, callback, pdata, &errmsg);
989 
990 	if (errmsg) {
991 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
992 		free(errmsg);
993 	}
994 
995   end:
996 
997 	switch_cache_db_release_db_handle(&dbh);
998 
999 	if (mutex) {
1000 		switch_mutex_unlock(mutex);
1001 	}
1002 
1003 	return ret;
1004 }
1005 
create_node(const char * name,uint32_t importance,switch_mutex_t * mutex)1006 static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mutex_t *mutex)
1007 {
1008 	fifo_node_t *node;
1009 	int x = 0;
1010 	switch_memory_pool_t *pool;
1011 	char outbound_count[80] = "";
1012 	callback_t cbt = { 0 };
1013 	char *sql = NULL;
1014 	if (!globals.running) {
1015 		return NULL;
1016 	}
1017 
1018 	switch_core_new_memory_pool(&pool);
1019 
1020 	node = switch_core_alloc(pool, sizeof(*node));
1021 	node->pool = pool;
1022 	node->outbound_strategy = globals.default_strategy;
1023 	node->name = switch_core_strdup(node->pool, name);
1024 
1025 	if (!strchr(name, '@')) {
1026 		node->domain_name = switch_core_strdup(node->pool, switch_core_get_domain(SWITCH_FALSE));
1027 	}
1028 
1029 	for (x = 0; x < MAX_PRI; x++) {
1030 		fifo_queue_create(&node->fifo_list[x], 1000, node->pool);
1031 		switch_assert(node->fifo_list[x]);
1032 	}
1033 
1034 	switch_core_hash_init(&node->consumer_hash);
1035 	switch_thread_rwlock_create(&node->rwlock, node->pool);
1036 	switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool);
1037 	switch_mutex_init(&node->update_mutex, SWITCH_MUTEX_NESTED, node->pool);
1038 	cbt.buf = outbound_count;
1039 	cbt.len = sizeof(outbound_count);
1040 	sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", name);
1041 	fifo_execute_sql_callback(mutex, sql, sql2str_callback, &cbt);
1042 	node->member_count = atoi(outbound_count);
1043 	node->has_outbound = (node->member_count > 0) ? 1 : 0;
1044 	switch_safe_free(sql);
1045 
1046 	node->importance = importance;
1047 
1048 	switch_mutex_lock(globals.mutex);
1049 
1050 	switch_core_hash_insert(globals.fifo_hash, name, node);
1051 	node->next = globals.nodes;
1052 	globals.nodes = node;
1053 	switch_mutex_unlock(globals.mutex);
1054 
1055 	return node;
1056 }
1057 
node_idle_consumers(fifo_node_t * node)1058 static int node_idle_consumers(fifo_node_t *node)
1059 {
1060 	switch_hash_index_t *hi;
1061 	void *val;
1062 	const void *var;
1063 	switch_core_session_t *session;
1064 	switch_channel_t *channel;
1065 	int total = 0;
1066 
1067 	switch_mutex_lock(node->mutex);
1068 	for (hi = switch_core_hash_first(node->consumer_hash); hi; hi = switch_core_hash_next(&hi)) {
1069 		switch_core_hash_this(hi, &var, NULL, &val);
1070 		session = (switch_core_session_t *) val;
1071 		channel = switch_core_session_get_channel(session);
1072 		if (!switch_channel_test_flag(channel, CF_BRIDGED)) {
1073 			total++;
1074 		}
1075 	}
1076 	switch_mutex_unlock(node->mutex);
1077 
1078 	return total;
1079 }
1080 
1081 struct call_helper {
1082 	char *uuid;
1083 	char *node_name;
1084 	char *originate_string;
1085 	int timeout;
1086 	switch_memory_pool_t *pool;
1087 };
1088 
1089 #define MAX_ROWS 250
1090 struct callback_helper {
1091 	int need;
1092 	switch_memory_pool_t *pool;
1093 	struct call_helper *rows[MAX_ROWS];
1094 	int rowcount;
1095 	int ready;
1096 };
1097 
1098 /*!\brief Handle unbridging of manually tracked calls
1099  */
do_unbridge(switch_core_session_t * consumer_session,switch_core_session_t * caller_session)1100 static void do_unbridge(switch_core_session_t *consumer_session, switch_core_session_t *caller_session)
1101 {
1102 	switch_channel_t *consumer_channel = switch_core_session_get_channel(consumer_session);
1103 	switch_channel_t *caller_channel = NULL;
1104 
1105 	if (caller_session) {
1106 		caller_channel = switch_core_session_get_channel(caller_session);
1107 	}
1108 
1109 	if (switch_channel_test_app_flag_key(FIFO_APP_KEY, consumer_channel, FIFO_APP_BRIDGE_TAG)) {
1110 		char date[80] = "";
1111 		switch_time_exp_t tm;
1112 		switch_time_t ts = switch_micro_time_now();
1113 		switch_size_t retsize;
1114 		long epoch_start = 0, epoch_end = 0;
1115 		const char *epoch_start_a = NULL;
1116 		char *sql;
1117 		switch_event_t *event;
1118 		const char *outbound_id = NULL;
1119 		int use_count = 0;
1120 
1121 		switch_channel_clear_app_flag_key(FIFO_APP_KEY, consumer_channel, FIFO_APP_BRIDGE_TAG);
1122 		switch_channel_set_variable(consumer_channel, "fifo_bridged", NULL);
1123 
1124 		if ((outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid"))) {
1125 			use_count = fifo_get_use_count(outbound_id);
1126 		}
1127 
1128 		switch_time_exp_lt(&tm, ts);
1129 		switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
1130 
1131 		sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(consumer_session));
1132 		fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
1133 
1134 		switch_channel_set_variable(consumer_channel, "fifo_status", "WAITING");
1135 		switch_channel_set_variable(consumer_channel, "fifo_timestamp", date);
1136 
1137 		if (caller_channel) {
1138 			switch_channel_set_variable(caller_channel, "fifo_status", "DONE");
1139 			switch_channel_set_variable(caller_channel, "fifo_timestamp", date);
1140 		}
1141 
1142 		if ((epoch_start_a = switch_channel_get_variable(consumer_channel, "fifo_epoch_start_bridge"))) {
1143 			epoch_start = atol(epoch_start_a);
1144 		}
1145 
1146 		epoch_end = (long)switch_epoch_time_now(NULL);
1147 
1148 		switch_channel_set_variable_printf(consumer_channel, "fifo_epoch_stop_bridge", "%ld", epoch_end);
1149 		switch_channel_set_variable_printf(consumer_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start);
1150 
1151 		if (caller_channel) {
1152 			switch_channel_set_variable_printf(caller_channel, "fifo_epoch_stop_bridge", "%ld", epoch_end);
1153 			switch_channel_set_variable_printf(caller_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start);
1154 		}
1155 
1156 		if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1157 			switch_channel_event_set_data(consumer_channel, event);
1158 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
1159 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-consumer-stop");
1160 			if (outbound_id) {
1161 				switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
1162 				switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", use_count);
1163 			}
1164 			switch_event_fire(&event);
1165 		}
1166 
1167 		if (caller_channel) {
1168 			if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1169 				switch_channel_event_set_data(caller_channel, event);
1170 				switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
1171 				switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-caller-stop");
1172 				switch_event_fire(&event);
1173 			}
1174 		}
1175 
1176 		if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1177 			switch_channel_event_set_data(consumer_channel, event);
1178 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
1179 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_stop");
1180 			switch_event_fire(&event);
1181 		}
1182 	}
1183 }
1184 
1185 /*!\brief Handle session messages for manually tracked calls
1186  */
messagehook(switch_core_session_t * session,switch_core_session_message_t * msg)1187 static switch_status_t messagehook (switch_core_session_t *session, switch_core_session_message_t *msg)
1188 {
1189 	switch_event_t *event;
1190 	switch_core_session_t *caller_session = NULL, *consumer_session = NULL;
1191 	switch_channel_t *caller_channel = NULL, *consumer_channel = NULL;
1192 	const char *outbound_id;
1193 	char *sql;
1194 
1195 	consumer_session = session;
1196 	consumer_channel = switch_core_session_get_channel(consumer_session);
1197 	outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid");
1198 
1199 	if (!outbound_id) return SWITCH_STATUS_SUCCESS;
1200 
1201 	switch (msg->message_id) {
1202 	case SWITCH_MESSAGE_INDICATE_BRIDGE:
1203 	case SWITCH_MESSAGE_INDICATE_UNBRIDGE:
1204 		if (msg->numeric_arg == 42) {
1205 			/* See audio_bridge_thread() for source of 42 constant. */
1206 			/* When a session is interrupted to execute an application
1207 			   (e.g. by uuid_execute) we need to tell everything in FS
1208 			   to unbridge the channel (e.g. to turn on the
1209 			   jitterbuffer) but we need mod_fifo not to see the
1210 			   unbridge because we don't want fifo to stop tracking
1211 			   the call.  So this magic number is a complete hack to
1212 			   make this happen.  So we ignore it here and simply fall
1213 			   through. */
1214 			goto end;
1215 		}
1216 		if ((caller_session = switch_core_session_locate(msg->string_arg))) {
1217 			caller_channel = switch_core_session_get_channel(caller_session);
1218 			if (msg->message_id == SWITCH_MESSAGE_INDICATE_BRIDGE) {
1219 				cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL);
1220 				switch_core_session_soft_lock(caller_session, 5);
1221 			} else {
1222 				switch_core_session_soft_unlock(caller_session);
1223 			}
1224 		}
1225 		break;
1226 	case SWITCH_MESSAGE_INDICATE_DISPLAY:
1227 		sql = switch_mprintf("update fifo_bridge set caller_caller_id_name='%q', caller_caller_id_number='%q' where consumer_uuid='%q'",
1228 							 switch_str_nil(msg->string_array_arg[0]),
1229 							 switch_str_nil(msg->string_array_arg[1]),
1230 							 switch_core_session_get_uuid(session));
1231 		fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
1232 		goto end;
1233 	default:
1234 		goto end;
1235 	}
1236 
1237 	switch (msg->message_id) {
1238 	case SWITCH_MESSAGE_INDICATE_BRIDGE:
1239 		{
1240 			long epoch_start = 0;
1241 			char date[80] = "";
1242 			switch_time_t ts;
1243 			switch_time_exp_t tm;
1244 			switch_size_t retsize;
1245 			const char *ced_name, *ced_number, *cid_name, *cid_number, *outbound_id;
1246 
1247 			if (switch_channel_test_app_flag_key(FIFO_APP_KEY, consumer_channel, FIFO_APP_BRIDGE_TAG)) {
1248 				goto end;
1249 			}
1250 
1251 			switch_channel_set_app_flag_key(FIFO_APP_KEY, consumer_channel, FIFO_APP_BRIDGE_TAG);
1252 
1253 			switch_channel_set_variable(consumer_channel, "fifo_bridged", "true");
1254 			switch_channel_set_variable(consumer_channel, "fifo_manual_bridge", "true");
1255 			switch_channel_set_variable(consumer_channel, "fifo_role", "consumer");
1256 			outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid");
1257 
1258 			if (caller_channel) {
1259 				switch_channel_set_variable(caller_channel, "fifo_role", "caller");
1260 				switch_process_import(consumer_session, caller_channel, "fifo_caller_consumer_import",
1261 									  switch_channel_get_variable(consumer_channel, "fifo_import_prefix"));
1262 				switch_process_import(caller_session, consumer_channel, "fifo_consumer_caller_import",
1263 									  switch_channel_get_variable(caller_channel, "fifo_import_prefix"));
1264 			}
1265 
1266 			ced_name = switch_channel_get_variable(consumer_channel, "callee_id_name");
1267 			ced_number = switch_channel_get_variable(consumer_channel, "callee_id_number");
1268 
1269 			cid_name = switch_channel_get_variable(consumer_channel, "caller_id_name");
1270 			cid_number = switch_channel_get_variable(consumer_channel, "caller_id_number");
1271 
1272 			if (switch_channel_direction(consumer_channel) == SWITCH_CALL_DIRECTION_INBOUND) {
1273 				if (zstr(ced_name) || !strcmp(ced_name, cid_name)) {
1274 					ced_name = ced_number;
1275 				}
1276 
1277 				if (zstr(ced_number) || !strcmp(ced_number, cid_number)) {
1278 					ced_name = switch_channel_get_variable(consumer_channel, "destination_number");
1279 					ced_number = ced_name;
1280 				}
1281 			} else {
1282 				ced_name = cid_name;
1283 				ced_number = cid_number;
1284 			}
1285 
1286 			if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1287 				switch_channel_event_set_data(consumer_channel, event);
1288 				switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
1289 				switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-consumer-start");
1290 				switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-CID-Name", ced_name);
1291 				switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-CID-Number", ced_number);
1292 				if (outbound_id) {
1293 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
1294 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(outbound_id));
1295 				}
1296 				switch_event_fire(&event);
1297 			}
1298 
1299 			if (caller_channel) {
1300 				if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1301 					switch_channel_event_set_data(caller_channel, event);
1302 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
1303 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-caller-start");
1304 					switch_event_fire(&event);
1305 				}
1306 
1307 				sql = switch_mprintf("insert into fifo_bridge "
1308 									 "(fifo_name,caller_uuid,caller_caller_id_name,caller_caller_id_number,consumer_uuid,consumer_outgoing_uuid,bridge_start) "
1309 									 "values ('%q','%q','%q','%q','%q','%q',%ld)",
1310 									 MANUAL_QUEUE_NAME,
1311 									 switch_core_session_get_uuid(caller_session),
1312 									 ced_name,
1313 									 ced_number,
1314 									 switch_core_session_get_uuid(session),
1315 									 switch_str_nil(outbound_id),
1316 									 (long) switch_epoch_time_now(NULL)
1317 									 );
1318 			} else {
1319 				sql = switch_mprintf("insert into fifo_bridge "
1320 									 "(fifo_name,caller_uuid,caller_caller_id_name,caller_caller_id_number,consumer_uuid,consumer_outgoing_uuid,bridge_start) "
1321 									 "values ('%q','%q','%q','%q','%q','%q',%ld)",
1322 									 MANUAL_QUEUE_NAME,
1323 									 (msg->string_arg && strchr(msg->string_arg, '-')) ? msg->string_arg : "00000000-0000-0000-0000-000000000000",
1324 									 ced_name,
1325 									 ced_number,
1326 									 switch_core_session_get_uuid(session),
1327 									 switch_str_nil(outbound_id),
1328 									 (long) switch_epoch_time_now(NULL)
1329 									 );
1330 			}
1331 
1332 			fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
1333 
1334 			ts = switch_micro_time_now();
1335 			switch_time_exp_lt(&tm, ts);
1336 			epoch_start = (long)switch_epoch_time_now(NULL);
1337 			switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
1338 			switch_channel_set_variable(consumer_channel, "fifo_status", "TALKING");
1339 			if (caller_session) {
1340 				switch_channel_set_variable(consumer_channel, "fifo_target", switch_core_session_get_uuid(caller_session));
1341 			}
1342 			switch_channel_set_variable(consumer_channel, "fifo_timestamp", date);
1343 			switch_channel_set_variable_printf(consumer_channel, "fifo_epoch_start_bridge", "%ld", epoch_start);
1344 			switch_channel_set_variable(consumer_channel, "fifo_role", "consumer");
1345 
1346 			if (caller_channel) {
1347 				switch_channel_set_variable(caller_channel, "fifo_status", "TALKING");
1348 				switch_channel_set_variable(caller_channel, "fifo_timestamp", date);
1349 				switch_channel_set_variable_printf(caller_channel, "fifo_epoch_start_bridge", "%ld", epoch_start);
1350 				switch_channel_set_variable(caller_channel, "fifo_target", switch_core_session_get_uuid(session));
1351 				switch_channel_set_variable(caller_channel, "fifo_role", "caller");
1352 			}
1353 		}
1354 		break;
1355 	case SWITCH_MESSAGE_INDICATE_UNBRIDGE:
1356 		{
1357 			do_unbridge(consumer_session, caller_session);
1358 		}
1359 		break;
1360 	default:
1361 		break;
1362 	}
1363 
1364  end:
1365 
1366 	if (caller_session) {
1367 		switch_core_session_rwunlock(caller_session);
1368 	}
1369 
1370 	return SWITCH_STATUS_SUCCESS;
1371 }
1372 
1373 /*!\brief Create calls to outbound members with ringall strategy
1374  *
1375  * A fifo node has been selected for us and we've been given a list of
1376  * outbound members to ring.  We're going to pick a single caller by
1377  * searching through the fifo node queues in order of priority
1378  * (`fifo_priority`) from lowest to highest.  We'll look first for
1379  * callers with fifo_vip=true.  Finding none, we'll consider the
1380  * plebs.
1381  *
1382  * Once we have a caller to service, we'll set fifo_bridge_uuid for
1383  * that caller to let the fifo application in on our decision.  Our
1384  * job being done, we'll let the fifo application deal with the
1385  * remaining details.
1386  */
outbound_ringall_thread_run(switch_thread_t * thread,void * obj)1387 static void *SWITCH_THREAD_FUNC outbound_ringall_thread_run(switch_thread_t *thread, void *obj)
1388 {
1389 	struct callback_helper *cbh = (struct callback_helper *) obj;
1390 	char *node_name;
1391 	int i = 0;
1392 	int timeout = 0;
1393 	switch_stream_handle_t stream = { 0 };
1394 	switch_stream_handle_t stream2 = { 0 };
1395 	fifo_node_t *node = NULL;
1396 	char *originate_string = NULL;
1397 	switch_event_t *ovars = NULL;
1398 	switch_status_t status;
1399 	switch_core_session_t *session = NULL;
1400 	switch_call_cause_t cause = SWITCH_CAUSE_NONE;
1401 	char *app_name = NULL, *arg = NULL;
1402 	switch_caller_extension_t *extension = NULL;
1403 	switch_channel_t *channel;
1404 	char *caller_id_name = NULL, *cid_num = NULL, *id = NULL;
1405 	switch_event_t *pop = NULL, *pop_dup = NULL;
1406 	fifo_queue_t *q = NULL;
1407 	int x = 0;
1408 	switch_event_t *event;
1409 	switch_uuid_t uuid;
1410 	char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
1411 	switch_call_cause_t cancel_cause = 0;
1412 	char *uuid_list = NULL;
1413 	int total = 0;
1414 	const char *codec;
1415 	struct call_helper *rows[MAX_ROWS] = { 0 };
1416 	int rowcount = 0;
1417 	switch_memory_pool_t *pool;
1418 	char *export = NULL;
1419 
1420 	switch_mutex_lock(globals.mutex);
1421 	globals.threads++;
1422 	switch_mutex_unlock(globals.mutex);
1423 
1424 	if (!globals.running) goto dpool;
1425 
1426 	switch_uuid_get(&uuid);
1427 	switch_uuid_format(uuid_str, &uuid);
1428 
1429 	if (!cbh->rowcount) {
1430 		goto end;
1431 	}
1432 
1433 	node_name = cbh->rows[0]->node_name;
1434 
1435 	switch_mutex_lock(globals.mutex);
1436 	if ((node = switch_core_hash_find(globals.fifo_hash, node_name))) {
1437 		switch_thread_rwlock_rdlock(node->rwlock);
1438 	}
1439 	switch_mutex_unlock(globals.mutex);
1440 
1441 	if (!node) {
1442 		goto end;
1443 	}
1444 
1445 	for (i = 0; i < cbh->rowcount; i++) {
1446 		struct call_helper *h = cbh->rows[i];
1447 
1448 		if (check_consumer_outbound_call(h->uuid) || check_bridge_call(h->uuid)) {
1449 			continue;
1450 		}
1451 
1452 		rows[rowcount++] = h;
1453 		add_consumer_outbound_call(h->uuid, &cancel_cause);
1454 		total++;
1455 	}
1456 
1457 	for (i = 0; i < rowcount; i++) {
1458 		struct call_helper *h = rows[i];
1459 		cbh->rows[i] = h;
1460 	}
1461 
1462 	cbh->rowcount = rowcount;
1463 
1464 	cbh->ready = 1;
1465 
1466 	if (!total) {
1467 		goto end;
1468 	}
1469 
1470 	switch_mutex_lock(node->update_mutex);
1471 	node->busy = 0;
1472 	node->ring_consumer_count++;
1473 	switch_mutex_unlock(node->update_mutex);
1474 
1475 	SWITCH_STANDARD_STREAM(stream);
1476 	SWITCH_STANDARD_STREAM(stream2);
1477 
1478 	switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
1479 	switch_assert(ovars);
1480 
1481 	for (i = 0; i < cbh->rowcount; i++) {
1482 		struct call_helper *h = cbh->rows[i];
1483 		char *parsed = NULL;
1484 		int use_ent = 0;
1485 		char *expanded_originate_string = switch_event_expand_headers(ovars, h->originate_string);
1486 
1487 		switch_assert(expanded_originate_string);
1488 		if (strstr(expanded_originate_string, "user/")) {
1489 			switch_event_create_brackets(expanded_originate_string, '<', '>', ',', &ovars, &parsed, SWITCH_TRUE);
1490 			use_ent = 1;
1491 		} else {
1492 			switch_event_create_brackets(expanded_originate_string, '{', '}', ',', &ovars, &parsed, SWITCH_TRUE);
1493 		}
1494 
1495 		switch_event_del_header(ovars, "fifo_outbound_uuid");
1496 
1497 		if (!h->timeout) h->timeout = node->ring_timeout;
1498 		if (timeout < h->timeout) timeout = h->timeout;
1499 
1500 		if (use_ent) {
1501 			stream.write_function(&stream, "{ignore_early_media=true,outbound_redirect_fatal=true,leg_timeout=%d,fifo_outbound_uuid=%s,fifo_name=%s}%s%s",
1502 								  h->timeout, h->uuid, node->name,
1503 								  parsed ? parsed : expanded_originate_string, (i == cbh->rowcount - 1) ? "" : SWITCH_ENT_ORIGINATE_DELIM);
1504 		} else {
1505 			stream.write_function(&stream, "[leg_timeout=%d,fifo_outbound_uuid=%s,fifo_name=%s]%s,",
1506 								  h->timeout, h->uuid, node->name, parsed ? parsed : expanded_originate_string);
1507 		}
1508 
1509 		stream2.write_function(&stream2, "%s,", h->uuid);
1510 		switch_safe_free(parsed);
1511 
1512 		if (expanded_originate_string != h->originate_string) {
1513 			switch_safe_free(expanded_originate_string);
1514 		}
1515 	}
1516 
1517 	originate_string = (char *) stream.data;
1518 
1519 	uuid_list = (char *) stream2.data;
1520 
1521 	if (uuid_list) {
1522 		end_of(uuid_list) = '\0';
1523 	}
1524 
1525 	if (!timeout) timeout = 60;
1526 
1527 	pop = pop_dup = NULL;
1528 
1529 	for (x = 0; x < MAX_PRI; x++) {
1530 		q = node->fifo_list[x];
1531 		if (fifo_queue_pop_nameval(q, "variable_fifo_vip", "true", &pop_dup, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS && pop_dup) {
1532 			pop = pop_dup;
1533 			break;
1534 		}
1535 	}
1536 
1537 	if (!pop) {
1538 		for (x = 0; x < MAX_PRI; x++) {
1539 			if (fifo_queue_pop(node->fifo_list[x], &pop_dup, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS && pop_dup) {
1540 				pop = pop_dup;
1541 				break;
1542 			}
1543 		}
1544 	}
1545 
1546 	if (!pop) {
1547 		goto end;
1548 	}
1549 
1550 	if (!switch_event_get_header(ovars, "origination_caller_id_name")) {
1551 		if ((caller_id_name = switch_event_get_header(pop, "caller-caller-id-name"))) {
1552 			if (!zstr(node->outbound_name)) {
1553 				if ( node->outbound_name[0] == '=' ) {
1554 					switch_event_add_header(ovars, SWITCH_STACK_BOTTOM, "origination_caller_id_name", "%s", node->outbound_name + 1);
1555 				} else {
1556 					switch_event_add_header(ovars, SWITCH_STACK_BOTTOM, "origination_caller_id_name", "(%s) %s", node->outbound_name, caller_id_name);
1557 				}
1558 			} else {
1559 				switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "origination_caller_id_name", caller_id_name);
1560 			}
1561 		}
1562 	}
1563 
1564 	if (!switch_event_get_header(ovars, "origination_caller_id_number")) {
1565 		if ((cid_num = switch_event_get_header(pop, "caller-caller-id-number"))) {
1566 			switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "origination_caller_id_number", cid_num);
1567 		}
1568 	}
1569 
1570 	if ((id = switch_event_get_header(pop, "unique-id"))) {
1571 		switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "fifo_bridge_uuid", id);
1572 	}
1573 
1574 	switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "fifo_originate_uuid", uuid_str);
1575 
1576 	if ((export = switch_event_get_header(pop, "variable_fifo_export"))) {
1577 		int argc;
1578 		char *argv[100] = { 0 };
1579 		char *mydata = strdup(export);
1580 		char *tmp;
1581 
1582 		argc = switch_split(mydata, ',', argv);
1583 
1584 		for (x = 0; x < argc; x++) {
1585 			char *name = switch_mprintf("variable_%s", argv[x]);
1586 
1587 			if ((tmp = switch_event_get_header(pop, name))) {
1588 				switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, argv[x], tmp);
1589 			}
1590 
1591 			free(name);
1592 		}
1593 
1594 		switch_safe_free(mydata);
1595 	}
1596 
1597 	if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1598 		switch_core_session_t *session;
1599 		if (id && (session = switch_core_session_locate(id))) {
1600 			switch_channel_t *channel = switch_core_session_get_channel(session);
1601 
1602 			switch_channel_set_variable(channel, "fifo_originate_uuid", uuid_str);
1603 			switch_channel_event_set_data(channel, event);
1604 			switch_core_session_rwunlock(session);
1605 		}
1606 
1607 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node->name);
1608 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "pre-dial");
1609 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "outbound-strategy", "ringall");
1610 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "caller-uuid", id);
1611 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "originate_string", originate_string);
1612 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Outbound-UUID-List", uuid_list);
1613 
1614 		switch_event_fire(&event);
1615 	}
1616 
1617 	for (i = 0; i < cbh->rowcount; i++) {
1618 		struct call_helper *h = cbh->rows[i];
1619 		char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%q'", h->uuid);
1620 
1621 		fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
1622 	}
1623 
1624 	if (!globals.allow_transcoding && !switch_true(switch_event_get_header(pop, "variable_fifo_allow_transcoding")) &&
1625 		(codec = switch_event_get_header(pop, "variable_rtp_use_codec_name"))) {
1626 		const char *rate = switch_event_get_header(pop, "variable_rtp_use_codec_rate");
1627 		const char *ptime = switch_event_get_header(pop, "variable_rtp_use_codec_ptime");
1628 		char nstr[256] = "";
1629 
1630 		if (strcasecmp(codec, "PCMU") && strcasecmp(codec, "PCMA")) {
1631 			switch_snprintf(nstr, sizeof(nstr), "%s@%si@%sh,PCMU@%si,PCMA@%si", codec, ptime, rate, ptime, ptime);
1632 		} else {
1633 			switch_snprintf(nstr, sizeof(nstr), "%s@%si@%sh", codec, ptime, rate);
1634 		}
1635 
1636 		switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "absolute_codec_string", nstr);
1637 	}
1638 
1639 	add_caller_outbound_call(id, &cancel_cause);
1640 
1641 	if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s dialing: %s\n", node->name, originate_string);
1642 
1643 	status = switch_ivr_originate(NULL, &session, &cause, originate_string, timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, &cancel_cause, NULL);
1644 
1645 	del_caller_outbound_call(id);
1646 
1647 	if (status != SWITCH_STATUS_SUCCESS || cause != SWITCH_CAUSE_SUCCESS) {
1648 		const char *acceptable = "false";
1649 
1650 		switch (cause) {
1651 		case SWITCH_CAUSE_ORIGINATOR_CANCEL:
1652 		case SWITCH_CAUSE_PICKED_OFF:
1653 			{
1654 				acceptable = "true";
1655 
1656 				for (i = 0; i < cbh->rowcount; i++) {
1657 					struct call_helper *h = cbh->rows[i];
1658 					char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 "
1659 											   "where uuid='%q' and ring_count > 0", h->uuid);
1660 					fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
1661 				}
1662 			}
1663 			break;
1664 		default:
1665 			{
1666 				for (i = 0; i < cbh->rowcount; i++) {
1667 					struct call_helper *h = cbh->rows[i];
1668 					char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, "
1669 											   "outbound_fail_count=outbound_fail_count+1, "
1670 											   "outbound_fail_total_count = outbound_fail_total_count+1, "
1671 											   "next_avail=%ld + lag + 1 where uuid='%q' and ring_count > 0",
1672 											   (long) switch_epoch_time_now(NULL) + node->retry_delay, h->uuid);
1673 					fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
1674 				}
1675 			}
1676 		}
1677 
1678 		if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1679 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node->name);
1680 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "post-dial");
1681 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "outbound-strategy", "ringall");
1682 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "caller-uuid", id);
1683 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "result", "failure");
1684 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "acceptable", acceptable);
1685 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "cause", switch_channel_cause2str(cause));
1686 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "originate_string", originate_string);
1687 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Outbound-UUID-List", uuid_list);
1688 			switch_event_fire(&event);
1689 		}
1690 
1691 		goto end;
1692 	}
1693 
1694 	channel = switch_core_session_get_channel(session);
1695 
1696 	if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1697 		switch_channel_event_set_data(channel, event);
1698 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node->name);
1699 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "post-dial");
1700 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "outbound-strategy", "ringall");
1701 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "caller-uuid", id);
1702 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Outbound-UUID", switch_channel_get_variable(channel, "fifo_outbound_uuid"));
1703 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Outbound-UUID-List", uuid_list);
1704 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "result", "success");
1705 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "originate_string", originate_string);
1706 		switch_event_fire(&event);
1707 	}
1708 
1709 	switch_channel_set_variable(channel, "fifo_pop_order", NULL);
1710 
1711 	app_name = "fifo";
1712 	arg = switch_core_session_sprintf(session, "%s out nowait", node_name);
1713 	extension = switch_caller_extension_new(session, app_name, arg);
1714 	switch_caller_extension_add_application(session, extension, app_name, arg);
1715 	switch_channel_set_caller_extension(channel, extension);
1716 	switch_channel_set_state(channel, CS_EXECUTE);
1717 	switch_channel_wait_for_state(channel, NULL, CS_EXECUTE);
1718 	switch_channel_wait_for_flag(channel, CF_BRIDGED, SWITCH_TRUE, 5000, NULL);
1719 
1720 	switch_core_session_rwunlock(session);
1721 
1722 	for (i = 0; i < cbh->rowcount; i++) {
1723 		struct call_helper *h = cbh->rows[i];
1724 		char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 where uuid='%q' and ring_count > 0",  h->uuid);
1725 		fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
1726 	}
1727 
1728   end:
1729 
1730 	cbh->ready = 1;
1731 
1732 	if (node) {
1733 		switch_mutex_lock(node->update_mutex);
1734 		if (--node->ring_consumer_count < 0) {
1735 			node->ring_consumer_count = 0;
1736 		}
1737 		node->busy = 0;
1738 		switch_mutex_unlock(node->update_mutex);
1739 		switch_thread_rwlock_unlock(node->rwlock);
1740 	}
1741 
1742 	for (i = 0; i < cbh->rowcount; i++) {
1743 		struct call_helper *h = cbh->rows[i];
1744 		del_consumer_outbound_call(h->uuid);
1745 	}
1746 
1747 	switch_safe_free(originate_string);
1748 	switch_safe_free(uuid_list);
1749 
1750 	if (ovars) {
1751 		switch_event_destroy(&ovars);
1752 	}
1753 
1754 	if (pop_dup) {
1755 		switch_event_destroy(&pop_dup);
1756 	}
1757 
1758  dpool:
1759 
1760 	pool = cbh->pool;
1761 	switch_core_destroy_memory_pool(&pool);
1762 
1763 	switch_mutex_lock(globals.mutex);
1764 	globals.threads--;
1765 	switch_mutex_unlock(globals.mutex);
1766 
1767 	return NULL;
1768 }
1769 
1770 /*!\brief Send a call to an outbound member with the enterprise strategy
1771  *
1772  * A fifo and an outbound member have been picked out for us and our
1773  * job is to create a channel to the member and deliver that channel
1774  * into the `fifo <fifo> out` application.
1775  *
1776  * We haven't picked a caller yet, and we won't do so here.  We'll let
1777  * the fifo application take care of that work.
1778  */
outbound_enterprise_thread_run(switch_thread_t * thread,void * obj)1779 static void *SWITCH_THREAD_FUNC outbound_enterprise_thread_run(switch_thread_t *thread, void *obj)
1780 {
1781 	struct call_helper *h = (struct call_helper *) obj;
1782 
1783 	switch_core_session_t *session = NULL;
1784 	switch_channel_t *channel;
1785 	switch_call_cause_t cause = SWITCH_CAUSE_NONE;
1786 	switch_caller_extension_t *extension = NULL;
1787 	char *app_name, *arg = NULL, *originate_string = NULL;
1788 	const char *member_wait = NULL;
1789 	fifo_node_t *node = NULL;
1790 	switch_event_t *ovars = NULL;
1791 	switch_status_t status = SWITCH_STATUS_FALSE;
1792 	switch_event_t *event = NULL;
1793 	char *sql = NULL;
1794 	char *expanded_originate_string = NULL;
1795 
1796 	if (!globals.running) return NULL;
1797 
1798 	switch_mutex_lock(globals.mutex);
1799 	globals.threads++;
1800 	switch_mutex_unlock(globals.mutex);
1801 
1802 	switch_mutex_lock(globals.mutex);
1803 	node = switch_core_hash_find(globals.fifo_hash, h->node_name);
1804 	if (node) switch_thread_rwlock_rdlock(node->rwlock);
1805 	switch_mutex_unlock(globals.mutex);
1806 
1807 	if (node) {
1808 		switch_mutex_lock(node->update_mutex);
1809 		node->ring_consumer_count++;
1810 		node->busy = 0;
1811 		switch_mutex_unlock(node->update_mutex);
1812 	}
1813 
1814 	switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
1815 	switch_assert(ovars);
1816 	switch_event_add_header(ovars, SWITCH_STACK_BOTTOM, "originate_timeout", "%d", h->timeout);
1817 
1818 	expanded_originate_string = switch_event_expand_headers(ovars, h->originate_string);
1819 
1820 	if (node && switch_stristr("origination_caller", expanded_originate_string)) {
1821 		originate_string = switch_mprintf("{execute_on_answer='unset fifo_hangup_check',fifo_name='%q',fifo_hangup_check='%q'}%s",
1822 										  node->name, node->name, expanded_originate_string);
1823 	} else {
1824 		if (node && !zstr(node->outbound_name)) {
1825 			originate_string = switch_mprintf("{execute_on_answer='unset fifo_hangup_check',fifo_name='%q',fifo_hangup_check='%q',"
1826 											  "origination_caller_id_name=Queue,origination_caller_id_number='Queue: %q'}%s",
1827 											  node->name, node->name,  node->outbound_name, expanded_originate_string);
1828 		} else if (node) {
1829 			originate_string = switch_mprintf("{execute_on_answer='unset fifo_hangup_check',fifo_name='%q',fifo_hangup_check='%q',"
1830 											  "origination_caller_id_name=Queue,origination_caller_id_number='Queue: %q'}%s",
1831 											  node->name, node->name,  node->name, expanded_originate_string);
1832 		}
1833 	}
1834 
1835 	if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1836 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node ? node->name : "");
1837 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "pre-dial");
1838 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Outbound-UUID", h->uuid);
1839 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "outbound-strategy", "enterprise");
1840 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "originate_string", originate_string);
1841 		switch_event_fire(&event);
1842 	}
1843 
1844 	sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%q'", h->uuid);
1845 	fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
1846 
1847 	status = switch_ivr_originate(NULL, &session, &cause, originate_string, h->timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, NULL, NULL);
1848 
1849 	if (status != SWITCH_STATUS_SUCCESS) {
1850 		sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, "
1851 							 "outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag + 1 where uuid='%q'",
1852 							 (long) switch_epoch_time_now(NULL) + (node ? node->retry_delay : 0), h->uuid);
1853 		fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
1854 
1855 		if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1856 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node ? node->name : "");
1857 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "post-dial");
1858 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Outbound-UUID", h->uuid);
1859 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "outbound-strategy", "enterprise");
1860 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "result", "failure");
1861 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "cause", switch_channel_cause2str(cause));
1862 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "originate_string", originate_string);
1863 			switch_event_fire(&event);
1864 		}
1865 
1866 		goto end;
1867 	}
1868 
1869 	channel = switch_core_session_get_channel(session);
1870 
1871 	if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
1872 		switch_channel_event_set_data(channel, event);
1873 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node ? node->name : "");
1874 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "post-dial");
1875 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Outbound-UUID", h->uuid);
1876 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "outbound-strategy", "enterprise");
1877 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "result", "success");
1878 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "originate_string", originate_string);
1879 		switch_event_fire(&event);
1880 	}
1881 
1882 	if ((member_wait = switch_channel_get_variable(channel, "fifo_member_wait")) || (member_wait = switch_channel_get_variable(channel, "member_wait"))) {
1883 		if (strcasecmp(member_wait, "wait") && strcasecmp(member_wait, "nowait")) {
1884 			member_wait = NULL;
1885 		}
1886 	}
1887 
1888 	switch_channel_set_variable(channel, "fifo_outbound_uuid", h->uuid);
1889 	app_name = "fifo";
1890 	arg = switch_core_session_sprintf(session, "%s out %s", h->node_name, member_wait ? member_wait : "wait");
1891 	extension = switch_caller_extension_new(session, app_name, arg);
1892 	switch_caller_extension_add_application(session, extension, app_name, arg);
1893 	switch_channel_set_caller_extension(channel, extension);
1894 	switch_channel_set_state(channel, CS_EXECUTE);
1895 	switch_core_session_rwunlock(session);
1896 
1897 	sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 where uuid='%q' and ring_count > 0", h->uuid);
1898 	fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
1899 
1900   end:
1901 
1902 	if ( originate_string ){
1903 		switch_safe_free(originate_string);
1904 	}
1905 
1906 	if (expanded_originate_string && expanded_originate_string != h->originate_string) {
1907 		switch_safe_free(expanded_originate_string);
1908 	}
1909 
1910 	switch_event_destroy(&ovars);
1911 	if (node) {
1912 		switch_mutex_lock(node->update_mutex);
1913 		if (--node->ring_consumer_count < 0) {
1914 			node->ring_consumer_count = 0;
1915 		}
1916 		node->busy = 0;
1917 		switch_mutex_unlock(node->update_mutex);
1918 		switch_thread_rwlock_unlock(node->rwlock);
1919 	}
1920 	switch_core_destroy_memory_pool(&h->pool);
1921 
1922 	switch_mutex_lock(globals.mutex);
1923 	globals.threads--;
1924 	switch_mutex_unlock(globals.mutex);
1925 
1926 	return NULL;
1927 }
1928 
1929 /*!\brief Extract the outbound member results and accumulate them for
1930  * the ringall strategy handler
1931  */
place_call_ringall_callback(void * pArg,int argc,char ** argv,char ** columnNames)1932 static int place_call_ringall_callback(void *pArg, int argc, char **argv, char **columnNames)
1933 {
1934 	struct callback_helper *cbh = (struct callback_helper *) pArg;
1935 	struct call_helper *h;
1936 
1937 	h = switch_core_alloc(cbh->pool, sizeof(*h));
1938 	h->pool = cbh->pool;
1939 	h->uuid = switch_core_strdup(h->pool, argv[0]);
1940 	h->node_name = switch_core_strdup(h->pool, argv[1]);
1941 	h->originate_string = switch_core_strdup(h->pool, argv[2]);
1942 	h->timeout = atoi(argv[5]);
1943 
1944 	cbh->rows[cbh->rowcount++] = h;
1945 
1946 	if (cbh->rowcount == MAX_ROWS) return -1;
1947 
1948 	if (cbh->need) {
1949 		cbh->need--;
1950 		return cbh->need ? 0 : -1;
1951 	}
1952 
1953 	return 0;
1954 }
1955 
1956 /*!\brief Extract the outbound member results and invoke the
1957  * enterprise strategy handler
1958  */
place_call_enterprise_callback(void * pArg,int argc,char ** argv,char ** columnNames)1959 static int place_call_enterprise_callback(void *pArg, int argc, char **argv, char **columnNames)
1960 {
1961 	int *need = (int *) pArg;
1962 
1963 	switch_thread_t *thread;
1964 	switch_threadattr_t *thd_attr = NULL;
1965 	switch_memory_pool_t *pool;
1966 	struct call_helper *h;
1967 
1968 	switch_core_new_memory_pool(&pool);
1969 	h = switch_core_alloc(pool, sizeof(*h));
1970 	h->pool = pool;
1971 	h->uuid = switch_core_strdup(h->pool, argv[0]);
1972 	h->node_name = switch_core_strdup(h->pool, argv[1]);
1973 	h->originate_string = switch_core_strdup(h->pool, argv[2]);
1974 	h->timeout = atoi(argv[5]);
1975 
1976 	switch_threadattr_create(&thd_attr, h->pool);
1977 	switch_threadattr_detach_set(thd_attr, 1);
1978 	switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
1979 	switch_thread_create(&thread, thd_attr, outbound_enterprise_thread_run, h, h->pool);
1980 
1981 	(*need)--;
1982 
1983 	return *need ? 0 : -1;
1984 }
1985 
1986 /*!\brief Find outbound members to call for a given fifo node
1987  *
1988  * We're given a fifo node that has callers to be delivered to agents.
1989  * Our job is to find available outbound members and pass them to the
1990  * appropriate outbound strategy handler.
1991  *
1992  * The ringall strategy handler needs the full list of members to do
1993  * its job, so we first let `place_call_ringall_callback` accumulate
1994  * the results.  The enterprise strategy handler can simply take each
1995  * member one at a time, so the `place_call_enterprise_callback` takes
1996  * care of invoking the handler.
1997  *
1998  * Within the ringall call strategy outbound_per_cycle is used to define
1999  * how many agents exactly are assigned to the caller. With ringall if
2000  * multiple callers are calling in and one is answered, because the call
2001  * is assigned to all agents the call to the agents that is not answered
2002  * will be lose raced and the other agents will drop the call before the
2003  * next one will begin to ring. When oubound_per_cycle is used in the
2004  * enterprise strategy it acts as a maximum value for how many agents
2005  * are rung at once on any call, the caller is not assigned to any agent
2006  * until the call is answered. Enterprise only rings the number of phones
2007  * that are needed, so outbound_per_cycle as a max does not give you the
2008  * effect of ringall. outbound_per_cycle_min defines how many agents minimum
2009  * will be rung by an incoming caller through fifo, which can give a ringall
2010  * effect. outbound_per_cycle and outbound_per_cycle_min both default to 1.
2011  *
2012  */
find_consumers(fifo_node_t * node)2013 static int find_consumers(fifo_node_t *node)
2014 {
2015 	char *sql;
2016 	int ret = 0;
2017 
2018 	sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, "
2019 						 "next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname "
2020 						 "from fifo_outbound "
2021 						 "where taking_calls = 1 and (fifo_name = '%q') and ((use_count+ring_count) < simo_count) and (next_avail = 0 or next_avail <= %ld) "
2022 						 "order by next_avail, outbound_fail_count, outbound_call_count",
2023 						 node->name, (long) switch_epoch_time_now(NULL)
2024 						 );
2025 
2026 	switch(node->outbound_strategy) {
2027 	case NODE_STRATEGY_ENTERPRISE:
2028 		{
2029 			int need = node_caller_count(node);
2030 			int count;
2031 
2032 			if (node->outbound_per_cycle && node->outbound_per_cycle < need) {
2033 				need = node->outbound_per_cycle;
2034 			} else if (node->outbound_per_cycle_min && node->outbound_per_cycle_min > need) {
2035 				need = node->outbound_per_cycle_min;
2036 			}
2037 
2038 			count = need;
2039 			fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_enterprise_callback, &need);
2040 			ret = count - need;
2041 		}
2042 		break;
2043 	case NODE_STRATEGY_RINGALL:
2044 		{
2045 			switch_thread_t *thread;
2046 			switch_threadattr_t *thd_attr = NULL;
2047 			struct callback_helper *cbh = NULL;
2048 			switch_memory_pool_t *pool = NULL;
2049 
2050 			switch_core_new_memory_pool(&pool);
2051 			cbh = switch_core_alloc(pool, sizeof(*cbh));
2052 			cbh->pool = pool;
2053 			cbh->need = 1;
2054 
2055 			if (node->outbound_per_cycle != cbh->need) {
2056 				cbh->need = node->outbound_per_cycle;
2057 			}
2058 
2059 			fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh);
2060 
2061 			if (cbh->rowcount) {
2062 				ret = cbh->rowcount;
2063 				switch_threadattr_create(&thd_attr, cbh->pool);
2064 				switch_threadattr_detach_set(thd_attr, 1);
2065 				switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
2066 				switch_thread_create(&thread, thd_attr, outbound_ringall_thread_run, cbh, cbh->pool);
2067 			} else {
2068 				switch_core_destroy_memory_pool(&pool);
2069 			}
2070 		}
2071 		break;
2072 	default:
2073 		break;
2074 	}
2075 
2076 	switch_safe_free(sql);
2077 	return ret;
2078 }
2079 
2080 /*\brief Continuously attempt to deliver calls to outbound members
2081  *
2082  * For each outbound priority level 1-10, find fifo nodes with a
2083  * matching priority.  For each of those nodes with outbound members,
2084  * run `find_consumers()` if the fifo node has calls needing to be
2085  * delivered and not enough ready and waiting inbound consumers.
2086  *
2087  * In the event of nothing needing to be done, each cycle starts at
2088  * priority 1 and ends at priority 10, yielding for one second
2089  * afterward.  We also yield after initiating outbound calls, starting
2090  * again where we left off on the next node.
2091  *
2092  * We also take care of cleaning up after nodes queued for deletion.
2093  */
node_thread_run(switch_thread_t * thread,void * obj)2094 static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *obj)
2095 {
2096 	fifo_node_t *node, *last, *this_node;
2097 	int cur_priority = 1;
2098 
2099 	globals.node_thread_running = 1;
2100 
2101 	while (globals.node_thread_running == 1) {
2102 		int ppl_waiting, consumer_total, idle_consumers, need_sleep = 0;
2103 
2104 		switch_mutex_lock(globals.mutex);
2105 
2106 		if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Trying priority: %d\n", cur_priority);
2107 
2108 		last = NULL;
2109 		node = globals.nodes;
2110 
2111 		while(node) {
2112 			int x = 0;
2113 			switch_event_t *pop = NULL;
2114 
2115 			this_node = node;
2116 			node = node->next;
2117 
2118 			if (this_node->ready == 0) {
2119 				for (x = 0; x < MAX_PRI; x++) {
2120 					while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
2121 						const char *caller_uuid = switch_event_get_header(pop, "unique-id");
2122 						switch_ivr_kill_uuid(caller_uuid, SWITCH_CAUSE_MANAGER_REQUEST);
2123 						switch_event_destroy(&pop);
2124 					}
2125 				}
2126 			}
2127 
2128 			if (this_node->ready == 0 && switch_thread_rwlock_trywrlock(this_node->rwlock) == SWITCH_STATUS_SUCCESS) {
2129 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", this_node->name);
2130 
2131 				for (x = 0; x < MAX_PRI; x++) {
2132 					while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
2133 						switch_event_destroy(&pop);
2134 					}
2135 				}
2136 
2137 				if (last) {
2138 					last->next = this_node->next;
2139 				} else {
2140 					globals.nodes = this_node->next;
2141 				}
2142 
2143 				switch_core_hash_destroy(&this_node->consumer_hash);
2144 				switch_mutex_unlock(this_node->mutex);
2145 				switch_mutex_unlock(this_node->update_mutex);
2146 				switch_thread_rwlock_unlock(this_node->rwlock);
2147 				switch_core_destroy_memory_pool(&this_node->pool);
2148 				continue;
2149 			}
2150 
2151 			last = this_node;
2152 
2153 			if (this_node->outbound_priority == 0) this_node->outbound_priority = 5;
2154 
2155 			if (this_node->has_outbound && !this_node->busy && this_node->outbound_priority == cur_priority) {
2156 				ppl_waiting = node_caller_count(this_node);
2157 				consumer_total = this_node->consumer_count;
2158 				idle_consumers = node_idle_consumers(this_node);
2159 
2160 				if (globals.debug) {
2161 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
2162 									  "%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d pri %d\n",
2163 									  this_node->name, ppl_waiting, consumer_total, idle_consumers, this_node->ring_consumer_count, this_node->outbound_priority);
2164 				}
2165 
2166 				if ((ppl_waiting - this_node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) {
2167 					if (find_consumers(this_node)) {
2168 						need_sleep++;
2169 					}
2170 				}
2171 			}
2172 		}
2173 
2174 		if (++cur_priority > 10) {
2175 			cur_priority = 1;
2176 		}
2177 
2178 		switch_mutex_unlock(globals.mutex);
2179 
2180 		if (cur_priority == 1 || need_sleep) {
2181 			switch_yield(1000000);
2182 		}
2183 	}
2184 
2185 	globals.node_thread_running = 0;
2186 
2187 	return NULL;
2188 }
2189 
start_node_thread(switch_memory_pool_t * pool)2190 static void start_node_thread(switch_memory_pool_t *pool)
2191 {
2192 	switch_threadattr_t *thd_attr = NULL;
2193 
2194 	switch_threadattr_create(&thd_attr, pool);
2195 	//switch_threadattr_detach_set(thd_attr, 1);
2196 	switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
2197 	switch_thread_create(&globals.node_thread, thd_attr, node_thread_run, pool, pool);
2198 }
2199 
stop_node_thread(void)2200 static int stop_node_thread(void)
2201 {
2202 	switch_status_t st = SWITCH_STATUS_SUCCESS;
2203 
2204 	globals.node_thread_running = -1;
2205 	switch_thread_join(&st, globals.node_thread);
2206 
2207 	return 0;
2208 }
2209 
check_cancel(fifo_node_t * node)2210 static void check_cancel(fifo_node_t *node)
2211 {
2212 	int ppl_waiting;
2213 
2214 	if (node->outbound_strategy != NODE_STRATEGY_ENTERPRISE) {
2215 		return;
2216 	}
2217 
2218 	ppl_waiting = node_caller_count(node);
2219 
2220 	if (node->ring_consumer_count > 0 && ppl_waiting < 1) {
2221 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Outbound call count (%d) exceeds required value for queue %s (%d), "
2222 						  "Ending extraneous calls\n", node->ring_consumer_count, node->name, ppl_waiting);
2223 
2224 		switch_core_session_hupall_matching_var("fifo_hangup_check", node->name, SWITCH_CAUSE_ORIGINATOR_CANCEL);
2225 	}
2226 }
2227 
send_presence(fifo_node_t * node)2228 static void send_presence(fifo_node_t *node)
2229 {
2230 	switch_event_t *event;
2231 	int wait_count = 0;
2232 
2233 	if (!globals.running) {
2234 		return;
2235 	}
2236 
2237 	if (switch_event_create(&event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) {
2238 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "proto", "queue");
2239 
2240 		if (node->domain_name) {
2241 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "login", "%s@%s", node->name, node->domain_name);
2242 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "from", "%s@%s", node->name, node->domain_name);
2243 		} else {
2244 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "login", node->name);
2245 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "from", node->name);
2246 		}
2247 
2248 		if ((wait_count = node_caller_count(node)) > 0) {
2249 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "force-status", "Active (%d waiting)", wait_count);
2250 		} else {
2251 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "force-status", "Idle");
2252 		}
2253 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "rpid", "unknown");
2254 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "event_type", "presence");
2255 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "alt_event_type", "dialog");
2256 		switch_event_add_header(event, SWITCH_STACK_BOTTOM, "event_count", "%d", 0);
2257 
2258 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "channel-state", wait_count > 0 ? "CS_ROUTING" : "CS_HANGUP");
2259 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "unique-id", node->name);
2260 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "answer-state", wait_count > 0 ? "confirmed" : "terminated");
2261 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "presence-call-direction", "inbound");
2262 		switch_event_fire(&event);
2263 	}
2264 }
2265 
pres_event_handler(switch_event_t * event)2266 static void pres_event_handler(switch_event_t *event)
2267 {
2268 	char *to = switch_event_get_header(event, "to");
2269 	char *domain_name = NULL;
2270 	char *dup_to = NULL, *node_name , *dup_node_name;
2271 	fifo_node_t *node;
2272 
2273 	if (!globals.running) {
2274 		return;
2275 	}
2276 
2277 	if (!to || strncasecmp(to, "queue+", 6) || !strchr(to, '@')) {
2278 		return;
2279 	}
2280 
2281 	dup_to = strdup(to);
2282 	switch_assert(dup_to);
2283 
2284 	node_name = dup_to + 6;
2285 
2286 	if ((domain_name = strchr(node_name, '@'))) {
2287 		*domain_name++ = '\0';
2288 	}
2289 
2290 	dup_node_name = switch_mprintf("%q@%q", node_name, domain_name);
2291 
2292 	switch_mutex_lock(globals.mutex);
2293 	if (!(node = switch_core_hash_find(globals.fifo_hash, node_name)) && !(node = switch_core_hash_find(globals.fifo_hash, dup_node_name))) {
2294 		node = create_node(node_name, 0, globals.sql_mutex);
2295 		node->domain_name = switch_core_strdup(node->pool, domain_name);
2296 		node->ready = 1;
2297 	}
2298 
2299 	switch_thread_rwlock_rdlock(node->rwlock);
2300 	send_presence(node);
2301 	switch_thread_rwlock_unlock(node->rwlock);
2302 
2303 	switch_mutex_unlock(globals.mutex);
2304 
2305 	switch_safe_free(dup_to);
2306 	switch_safe_free(dup_node_name);
2307 }
2308 
fifo_add_outbound(const char * node_name,const char * url,uint32_t priority)2309 static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32_t priority)
2310 {
2311 	fifo_node_t *node;
2312 	switch_event_t *call_event;
2313 	uint32_t i = 0;
2314 
2315 	if (priority >= MAX_PRI) {
2316 		priority = MAX_PRI - 1;
2317 	}
2318 
2319 	if (!node_name) return 0;
2320 
2321 	switch_mutex_lock(globals.mutex);
2322 
2323 	if (!(node = switch_core_hash_find(globals.fifo_hash, node_name))) {
2324 		node = create_node(node_name, 0, globals.sql_mutex);
2325 	}
2326 
2327 	switch_thread_rwlock_rdlock(node->rwlock);
2328 
2329 	switch_mutex_unlock(globals.mutex);
2330 
2331 	switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA);
2332 	switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "dial-url", url);
2333 
2334 	fifo_queue_push(node->fifo_list[priority], call_event);
2335 	call_event = NULL;
2336 
2337 	i = fifo_queue_size(node->fifo_list[priority]);
2338 
2339 	switch_thread_rwlock_unlock(node->rwlock);
2340 
2341 	return i;
2342 }
2343 
SWITCH_STANDARD_API(fifo_check_bridge_function)2344 SWITCH_STANDARD_API(fifo_check_bridge_function)
2345 {
2346 	stream->write_function(stream, "%s", (cmd && check_bridge_call(cmd)) ? "true" : "false");
2347 
2348 	return SWITCH_STATUS_SUCCESS;
2349 }
2350 
SWITCH_STANDARD_API(fifo_add_outbound_function)2351 SWITCH_STANDARD_API(fifo_add_outbound_function)
2352 {
2353 	char *data = NULL, *argv[4] = { 0 };
2354 	int argc;
2355 	uint32_t priority = 0;
2356 
2357 	if (zstr(cmd)) {
2358 		goto fail;
2359 	}
2360 
2361 	data = strdup(cmd);
2362 
2363 	if ((argc = switch_separate_string(data, ' ', argv, (sizeof(argv) / sizeof(argv[0])))) < 2 || !argv[0]) {
2364 		goto fail;
2365 	}
2366 
2367 	if (argv[2]) {
2368 		int tmp = atoi(argv[2]);
2369 		if (tmp > 0) {
2370 			priority = tmp;
2371 		}
2372 	}
2373 
2374 	stream->write_function(stream, "%d", fifo_add_outbound(argv[0], argv[1], priority));
2375 
2376 	free(data);
2377 	return SWITCH_STATUS_SUCCESS;
2378 
2379   fail:
2380 
2381 	free(data);
2382 	stream->write_function(stream, "0");
2383 	return SWITCH_STATUS_SUCCESS;
2384 }
2385 
dec_use_count(switch_core_session_t * session,const char * type)2386 static void dec_use_count(switch_core_session_t *session, const char *type)
2387 {
2388 	char *sql;
2389 	const char *outbound_id = NULL;
2390 	switch_event_t *event;
2391 	long now = (long) switch_epoch_time_now(NULL);
2392 	switch_channel_t *channel = switch_core_session_get_channel(session);
2393 
2394 	if ((outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid"))) {
2395 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s untracking call on uuid %s!\n", switch_channel_get_name(channel), outbound_id);
2396 
2397 		sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session));
2398 		fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
2399 
2400 		del_bridge_call(outbound_id);
2401 		sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag + 1 where use_count > 0 and uuid='%q'",
2402 							 now, now, outbound_id);
2403 		fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
2404 		fifo_dec_use_count(outbound_id);
2405 	}
2406 
2407 	do_unbridge(session, NULL);
2408 
2409 	if (type) {
2410 		if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
2411 			uint64_t hold_usec = 0, tt_usec = 0;
2412 			switch_caller_profile_t *originator_cp = NULL;
2413 
2414 			originator_cp = switch_channel_get_caller_profile(channel);
2415 			switch_channel_event_set_data(channel, event);
2416 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
2417 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "channel-consumer-stop");
2418 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Type", type);
2419 			if (outbound_id) {
2420 				switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
2421 				switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(outbound_id));
2422 			}
2423 			hold_usec = originator_cp->times->hold_accum;
2424 			tt_usec = (switch_micro_time_now() - originator_cp->times->bridged) - hold_usec;
2425 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Bridge-Time-us", "%"SWITCH_TIME_T_FMT, originator_cp->times->bridged);
2426 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Bridge-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(originator_cp->times->bridged / 1000));
2427 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Bridge-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(originator_cp->times->bridged / 1000000));
2428 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Talk-Time-us", "%"SWITCH_TIME_T_FMT, tt_usec);
2429 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Talk-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(tt_usec / 1000));
2430 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Talk-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(tt_usec / 1000000));
2431 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-us", "%"SWITCH_TIME_T_FMT, hold_usec);
2432 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(hold_usec / 1000));
2433 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(hold_usec / 1000000));
2434 
2435 			switch_event_fire(&event);
2436 		}
2437 	}
2438 }
2439 
hanguphook(switch_core_session_t * session)2440 static switch_status_t hanguphook(switch_core_session_t *session)
2441 {
2442 	switch_channel_t *channel = switch_core_session_get_channel(session);
2443 	switch_channel_state_t state = switch_channel_get_state(channel);
2444 
2445 	if (state >= CS_HANGUP && !switch_channel_test_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_DID_HOOK)) {
2446 		dec_use_count(session, "manual");
2447 		switch_core_event_hook_remove_state_change(session, hanguphook);
2448 		switch_channel_set_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_DID_HOOK);
2449 	}
2450 
2451 	return SWITCH_STATUS_SUCCESS;
2452 }
2453 
SWITCH_STANDARD_APP(fifo_track_call_function)2454 SWITCH_STANDARD_APP(fifo_track_call_function)
2455 {
2456 	switch_channel_t *channel = switch_core_session_get_channel(session);
2457 	char *sql;
2458 	const char *col1 = NULL, *col2 = NULL, *cid_name, *cid_number;
2459 	switch_event_t *event;
2460 
2461 	if (zstr(data)) {
2462 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Invalid!\n");
2463 		return;
2464 	}
2465 
2466 	if (switch_channel_test_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_TRACKING)) {
2467 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s trying to double-track call!\n", switch_channel_get_name(channel));
2468 		return;
2469 	}
2470 
2471 	switch_channel_set_variable(channel, "fifo_outbound_uuid", data);
2472 	switch_channel_set_variable(channel, "fifo_track_call", "true");
2473 
2474 	add_bridge_call(data);
2475 
2476 	switch_channel_set_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_TRACKING);
2477 
2478 	switch_core_event_hook_add_receive_message(session, messagehook);
2479 	switch_core_event_hook_add_state_run(session, hanguphook);
2480 
2481 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s tracking call on uuid %s!\n", switch_channel_get_name(channel), data);
2482 
2483 	if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_OUTBOUND) {
2484 		col1 = "manual_calls_in_count";
2485 		col2 = "manual_calls_in_total_count";
2486 	} else {
2487 		col1 = "manual_calls_out_count";
2488 		col2 = "manual_calls_out_total_count";
2489 	}
2490 
2491 	sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,outbound_fail_count=0,use_count=use_count+1,%s=%s+1,%s=%s+1 where uuid='%q'",
2492 						 (long) switch_epoch_time_now(NULL), col1, col1, col2, col2, data);
2493 	fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
2494 	fifo_inc_use_count(data);
2495 
2496 	if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_INBOUND) {
2497 		cid_name = switch_channel_get_variable(channel, "destination_number");
2498 		cid_number = cid_name;
2499 	} else {
2500 		cid_name = switch_channel_get_variable(channel, "caller_id_name");
2501 		cid_number = switch_channel_get_variable(channel, "caller_id_number");
2502 	}
2503 
2504 	if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
2505 		switch_channel_event_set_data(channel, event);
2506 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
2507 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "channel-consumer-start");
2508 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", data);
2509 		switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(data));
2510 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Type", "manual");
2511 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-CID-Name", cid_name);
2512 		switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-CID-Number", cid_number);
2513 		switch_event_fire(&event);
2514 	}
2515 }
2516 
fifo_caller_add(fifo_node_t * node,switch_core_session_t * session)2517 static void fifo_caller_add(fifo_node_t *node, switch_core_session_t *session)
2518 {
2519 	char *sql;
2520 	switch_channel_t *channel = switch_core_session_get_channel(session);
2521 
2522 	sql = switch_mprintf("insert into fifo_callers (fifo_name,uuid,caller_caller_id_name,caller_caller_id_number,timestamp) "
2523 						 "values ('%q','%q','%q','%q',%ld)",
2524 						 node->name,
2525 						 switch_core_session_get_uuid(session),
2526 						 switch_str_nil(switch_channel_get_variable(channel, "caller_id_name")),
2527 						 switch_str_nil(switch_channel_get_variable(channel, "caller_id_number")),
2528 						 switch_epoch_time_now(NULL));
2529 
2530 	fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
2531 }
2532 
fifo_caller_del(const char * uuid)2533 static void fifo_caller_del(const char *uuid)
2534 {
2535 	char *sql;
2536 
2537 	if (uuid) {
2538 		sql = switch_mprintf("delete from fifo_callers where uuid='%q'", uuid);
2539 	} else {
2540 		sql = switch_mprintf("delete from fifo_callers");
2541 	}
2542 
2543 	fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
2544 }
2545 
2546 typedef enum {
2547 	STRAT_MORE_PPL,
2548 	STRAT_WAITING_LONGER,
2549 } fifo_strategy_t;
2550 
2551 #define MAX_NODES_PER_CONSUMER 25
2552 #define FIFO_DESC "Fifo for stacking parked calls."
2553 #define FIFO_USAGE "<fifo name>[!<importance_number>] [in [<announce file>|undef] [<music file>|undef] | out [wait|nowait] [<announce file>|undef] [<music file>|undef]]"
SWITCH_STANDARD_APP(fifo_function)2554 SWITCH_STANDARD_APP(fifo_function)
2555 {
2556 	int argc;
2557 	char *mydata = NULL, *argv[5] = { 0 };
2558 	fifo_node_t *node = NULL, *node_list[MAX_NODES_PER_CONSUMER + 1] = { 0 };
2559 	switch_channel_t *channel = switch_core_session_get_channel(session);
2560 	int do_destroy = 0, do_wait = 1, node_count = 0, i = 0;
2561 	const char *moh = NULL;
2562 	const char *announce = NULL;
2563 	switch_event_t *event = NULL;
2564 	char date[80] = "";
2565 	switch_time_exp_t tm;
2566 	switch_time_t ts;
2567 	switch_size_t retsize;
2568 	char *list_string;
2569 	int nlist_count;
2570 	char *nlist[MAX_NODES_PER_CONSUMER];
2571 	int consumer = 0, in_table = 0;
2572 	const char *arg_fifo_name = NULL;
2573 	const char *arg_inout = NULL;
2574 	const char *serviced_uuid = NULL;
2575 
2576 	if (!globals.running) {
2577 		return;
2578 	}
2579 
2580 	if (zstr(data)) {
2581 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "No Args\n");
2582 		return;
2583 	}
2584 
2585 	switch_channel_set_variable(channel, "fifo_hangup_check", NULL);
2586 
2587 	mydata = switch_core_session_strdup(session, data);
2588 	switch_assert(mydata);
2589 
2590 	argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0])));
2591 	arg_fifo_name = argv[0];
2592 	arg_inout = argv[1];
2593 
2594 	if (!(arg_fifo_name && arg_inout)) {
2595 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
2596 		return;
2597 	}
2598 
2599 	if (!strcasecmp(arg_inout, "out")) {
2600 		consumer = 1;
2601 	} else if (strcasecmp(arg_inout, "in")) {
2602 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
2603 		return;
2604 	}
2605 
2606 	list_string = switch_core_session_strdup(session, arg_fifo_name);
2607 
2608 	if (!(nlist_count = switch_separate_string(list_string, ',', nlist, (sizeof(nlist) / sizeof(nlist[0]))))) {
2609 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
2610 		return;
2611 	}
2612 
2613 	if (!consumer && nlist_count > 1) {
2614 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
2615 		return;
2616 	}
2617 
2618 	switch_mutex_lock(globals.mutex);
2619 	for (i = 0; i < nlist_count; i++) {
2620 		int importance = 0;
2621 		char *p;
2622 
2623 		if ((p = strrchr(nlist[i], '!'))) {
2624 			*p++ = '\0';
2625 			importance = atoi(p);
2626 			if (importance < 0) {
2627 				importance = 0;
2628 			}
2629 		}
2630 
2631 		if (!(node = switch_core_hash_find(globals.fifo_hash, nlist[i]))) {
2632 			node = create_node(nlist[i], importance, globals.sql_mutex);
2633 			node->ready = 1;
2634 		}
2635 
2636 		switch_thread_rwlock_rdlock(node->rwlock);
2637 		node_list[node_count++] = node;
2638 	}
2639 
2640 	switch_mutex_unlock(globals.mutex);
2641 
2642 	moh = switch_channel_get_variable(channel, "fifo_music");
2643 	announce = switch_channel_get_variable(channel, "fifo_announce");
2644 
2645 	if (consumer) {
2646 		if (argc > 3) {
2647 			announce = argv[3];
2648 		}
2649 
2650 		if (argc > 4) {
2651 			moh = argv[4];
2652 		}
2653 	} else {
2654 		if (argc > 2) {
2655 			announce = argv[2];
2656 		}
2657 
2658 		if (argc > 3) {
2659 			moh = argv[3];
2660 		}
2661 	}
2662 
2663 	if (moh && !strcasecmp(moh, "silence")) {
2664 		moh = NULL;
2665 	}
2666 
2667 	cleanup_fifo_arg(&announce);
2668 	cleanup_fifo_arg(&moh);
2669 	switch_assert(node);
2670 
2671 	switch_core_media_bug_pause(session);
2672 
2673 	if (!consumer) {
2674 		switch_core_session_t *other_session;
2675 		switch_channel_t *other_channel;
2676 		const char *uuid = switch_core_session_get_uuid(session);
2677 		const char *pri;
2678 		char tmp[25] = "";
2679 		int p = 0;
2680 		int aborted = 0;
2681 		fifo_chime_data_t cd = { {0} };
2682 		const char *chime_list = switch_channel_get_variable(channel, "fifo_chime_list");
2683 		const char *chime_freq = switch_channel_get_variable(channel, "fifo_chime_freq");
2684 		const char *orbit_exten = switch_channel_get_variable(channel, "fifo_orbit_exten");
2685 		const char *orbit_dialplan = switch_channel_get_variable(channel, "fifo_orbit_dialplan");
2686 		const char *orbit_context = switch_channel_get_variable(channel, "fifo_orbit_context");
2687 
2688 		const char *orbit_ann = switch_channel_get_variable(channel, "fifo_orbit_announce");
2689 		const char *caller_exit_key = switch_channel_get_variable(channel, "fifo_caller_exit_key");
2690 		int freq = 30;
2691 		int ftmp = 0;
2692 		int to = 60;
2693 		switch_event_t *call_event;
2694 
2695 		if (orbit_exten) {
2696 			char *ot;
2697 			if ((cd.orbit_exten = switch_core_session_strdup(session, orbit_exten))) {
2698 				if ((ot = strchr(cd.orbit_exten, ':'))) {
2699 					*ot++ = '\0';
2700 					if ((to = atoi(ot)) < 0) {
2701 						to = 60;
2702 					}
2703 				}
2704 				cd.orbit_timeout = switch_epoch_time_now(NULL) + to;
2705 			}
2706 			cd.orbit_dialplan = switch_core_session_strdup(session, orbit_dialplan);
2707 			cd.orbit_context = switch_core_session_strdup(session, orbit_context);
2708 		}
2709 
2710 		if (chime_freq) {
2711 			ftmp = atoi(chime_freq);
2712 			if (ftmp > 0) {
2713 				freq = ftmp;
2714 			}
2715 		}
2716 
2717 		switch_channel_answer(channel);
2718 
2719 		switch_mutex_lock(node->update_mutex);
2720 
2721 		if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) {
2722 			p = atoi(pri);
2723 		}
2724 
2725 		if (p >= MAX_PRI) {
2726 			p = MAX_PRI - 1;
2727 		}
2728 
2729 		if (!node_caller_count(node)) {
2730 			node->start_waiting = switch_micro_time_now();
2731 		}
2732 
2733 		if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
2734 			switch_channel_event_set_data(channel, event);
2735 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
2736 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "push");
2737 			switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Slot", "%d", p);
2738 			switch_event_fire(&event);
2739 		}
2740 
2741 		switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA);
2742 		switch_channel_event_set_data(channel, call_event);
2743 
2744 		fifo_queue_push(node->fifo_list[p], call_event);
2745 		fifo_caller_add(node, session);
2746 		in_table = 1;
2747 
2748 		call_event = NULL;
2749 		switch_snprintf(tmp, sizeof(tmp), "%d", fifo_queue_size(node->fifo_list[p]));
2750 		switch_channel_set_variable(channel, "fifo_position", tmp);
2751 
2752 		if (!pri) {
2753 			switch_snprintf(tmp, sizeof(tmp), "%d", p);
2754 			switch_channel_set_variable(channel, "fifo_priority", tmp);
2755 		}
2756 
2757 		switch_mutex_unlock(node->update_mutex);
2758 
2759 		ts = switch_micro_time_now();
2760 		switch_time_exp_lt(&tm, ts);
2761 		switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
2762 		switch_channel_set_variable(channel, "fifo_status", "WAITING");
2763 		switch_channel_set_variable(channel, "fifo_timestamp", date);
2764 		switch_channel_set_variable(channel, "fifo_push_timestamp", date);
2765 		switch_channel_set_variable(channel, "fifo_serviced_uuid", NULL);
2766 
2767 		switch_channel_set_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_BRIDGE_TAG);
2768 
2769 		if (chime_list) {
2770 			char *list_dup = switch_core_session_strdup(session, chime_list);
2771 			cd.total = switch_separate_string(list_dup, ',', cd.list, (sizeof(cd.list) / sizeof(cd.list[0])));
2772 			cd.freq = freq;
2773 			cd.next = switch_epoch_time_now(NULL) + cd.freq;
2774 			cd.exit_key = (char *) switch_channel_get_variable(channel, "fifo_caller_exit_key");
2775 		}
2776 
2777 		send_presence(node);
2778 
2779 		while (switch_channel_ready(channel)) {
2780 			switch_input_args_t args = { 0 };
2781 			char buf[25] = "";
2782 			switch_status_t rstatus;
2783 
2784 			args.input_callback = moh_on_dtmf;
2785 			args.buf = buf;
2786 			args.buflen = sizeof(buf);
2787 
2788 			if (cd.total || cd.orbit_timeout) {
2789 				args.read_frame_callback = caller_read_frame_callback;
2790 				args.user_data = &cd;
2791 			}
2792 
2793 			if (cd.abort || cd.do_orbit) {
2794 				aborted = 1;
2795 				goto abort;
2796 			}
2797 
2798 			if ((serviced_uuid = switch_channel_get_variable(channel, "fifo_serviced_uuid"))) {
2799 				break;
2800 			}
2801 
2802 			switch_core_session_flush_private_events(session);
2803 
2804 			if (moh) {
2805 				rstatus = switch_ivr_play_file(session, NULL, moh, &args);
2806 			} else {
2807 				rstatus = switch_ivr_collect_digits_callback(session, &args, 0, 0);
2808 			}
2809 
2810 			if (!SWITCH_READ_ACCEPTABLE(rstatus)) {
2811 				aborted = 1;
2812 				goto abort;
2813 			}
2814 
2815 			if (caller_exit_key && *buf && strchr(caller_exit_key, *buf)) {
2816 				switch_channel_set_variable(channel, "fifo_caller_exit_key", (char *)buf);
2817 				aborted = 1;
2818 				goto abort;
2819 			}
2820 		}
2821 
2822 		if (!serviced_uuid && switch_channel_ready(channel)) {
2823 			switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_CLEARING);
2824 		} else if ((other_session = switch_core_session_locate(serviced_uuid))) {
2825 			int ready;
2826 			other_channel = switch_core_session_get_channel(other_session);
2827 			ready = switch_channel_ready(other_channel);
2828 			switch_core_session_rwunlock(other_session);
2829 			if (!ready) {
2830 				switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_CLEARING);
2831 			}
2832 		}
2833 
2834 		switch_core_session_flush_private_events(session);
2835 
2836 		if (switch_channel_ready(channel)) {
2837 			if (announce) {
2838 				switch_ivr_play_file(session, NULL, announce, NULL);
2839 			}
2840 		}
2841 
2842 	abort:
2843 
2844 		switch_channel_clear_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_BRIDGE_TAG);
2845 
2846 		if (!aborted && switch_channel_ready(channel)) {
2847 			switch_channel_set_state(channel, CS_HIBERNATE);
2848 			goto done;
2849 		} else {
2850 			ts = switch_micro_time_now();
2851 			switch_time_exp_lt(&tm, ts);
2852 			switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
2853 			switch_channel_set_variable(channel, "fifo_status", cd.do_orbit ? "TIMEOUT" : "ABORTED");
2854 			switch_channel_set_variable(channel, "fifo_timestamp", date);
2855 
2856 			if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
2857 				switch_channel_event_set_data(channel, event);
2858 				switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
2859 				switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", cd.do_orbit ? "timeout" : "abort");
2860 				switch_event_fire(&event);
2861 			}
2862 
2863 			switch_mutex_lock(globals.mutex);
2864 			switch_mutex_lock(node->update_mutex);
2865 			node_remove_uuid(node, uuid);
2866 			switch_mutex_unlock(node->update_mutex);
2867 			send_presence(node);
2868 			check_cancel(node);
2869 			switch_mutex_unlock(globals.mutex);
2870 		}
2871 
2872 		if ((switch_true(switch_channel_get_variable(channel, "fifo_caller_exit_to_orbit")) || cd.do_orbit) && cd.orbit_exten) {
2873 			if (orbit_ann) {
2874 				switch_ivr_play_file(session, NULL, orbit_ann, NULL);
2875 			}
2876 
2877 			if (strcmp(cd.orbit_exten, "_continue_")) {
2878 				switch_ivr_session_transfer(session, cd.orbit_exten, cd.orbit_dialplan, cd.orbit_context);
2879 			}
2880 		}
2881 
2882 		cancel_caller_outbound_call(switch_core_session_get_uuid(session), SWITCH_CAUSE_ORIGINATOR_CANCEL);
2883 
2884 		goto done;
2885 	} else {					/* consumer */
2886 		switch_event_t *pop = NULL;
2887 		switch_frame_t *read_frame;
2888 		switch_status_t status;
2889 		switch_core_session_t *other_session;
2890 		switch_input_args_t args = { 0 };
2891 		const char *pop_order = NULL;
2892 		int custom_pop = 0;
2893 		int pop_array[MAX_PRI] = { 0 };
2894 		char *pop_list[MAX_PRI] = { 0 };
2895 		const char *fifo_consumer_wrapup_sound = NULL;
2896 		const char *fifo_consumer_wrapup_key = NULL;
2897 		const char *sfifo_consumer_wrapup_time = NULL;
2898 		uint32_t fifo_consumer_wrapup_time = 0;
2899 		switch_time_t wrapup_time_elapsed = 0, wrapup_time_started = 0, wrapup_time_remaining = 0;
2900 		const char *my_id;
2901 		char buf[5] = "";
2902 		const char *strat_str = switch_channel_get_variable(channel, "fifo_strategy");
2903 		fifo_strategy_t strat = STRAT_WAITING_LONGER;
2904 		const char *url = NULL;
2905 		const char *caller_uuid = NULL;
2906 		const char *outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid");
2907 		switch_event_t *event;
2908 		const char *cid_name = NULL, *cid_number = NULL;
2909 
2910 		//const char *track_use_count = switch_channel_get_variable(channel, "fifo_track_use_count");
2911 		//int do_track = switch_true(track_use_count);
2912 
2913 		if (switch_core_event_hook_remove_receive_message(session, messagehook) == SWITCH_STATUS_SUCCESS) {
2914 			dec_use_count(session, NULL);
2915 			switch_core_event_hook_remove_state_change(session, hanguphook);
2916 			switch_channel_clear_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_TRACKING);
2917 		}
2918 
2919 		if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_INBOUND) {
2920 			cid_name = switch_channel_get_variable(channel, "callee_id_name");
2921 			cid_number = switch_channel_get_variable(channel, "callee_id_number");
2922 
2923 			if (!cid_name) {
2924 				cid_name = switch_channel_get_variable(channel, "destination_number");
2925 			}
2926 			if (!cid_number) {
2927 				cid_number = cid_name;
2928 			}
2929 		} else {
2930 			cid_name = switch_channel_get_variable(channel, "caller_id_name");
2931 			cid_number = switch_channel_get_variable(channel, "caller_id_number");
2932 		}
2933 
2934 		if (!zstr(strat_str)) {
2935 			if (!strcasecmp(strat_str, "more_ppl")) {
2936 				strat = STRAT_MORE_PPL;
2937 			} else if (!strcasecmp(strat_str, "waiting_longer")) {
2938 				strat = STRAT_WAITING_LONGER;
2939 			} else {
2940 				switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Invalid strategy\n");
2941 				goto done;
2942 			}
2943 		}
2944 
2945 		if (argc > 2) {
2946 			if (!strcasecmp(argv[2], "nowait")) {
2947 				do_wait = 0;
2948 			} else if (strcasecmp(argv[2], "wait")) {
2949 				switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE);
2950 				goto done;
2951 			}
2952 		}
2953 
2954 		if (!(my_id = switch_channel_get_variable(channel, "fifo_consumer_id"))) {
2955 			my_id = switch_core_session_get_uuid(session);
2956 		}
2957 
2958 		if (do_wait) {
2959 			for (i = 0; i < node_count; i++) {
2960 				if (!(node = node_list[i])) {
2961 					continue;
2962 				}
2963 				switch_mutex_lock(node->mutex);
2964 				node->consumer_count++;
2965 				switch_core_hash_insert(node->consumer_hash, switch_core_session_get_uuid(session), session);
2966 				switch_mutex_unlock(node->mutex);
2967 			}
2968 			switch_channel_answer(channel);
2969 		}
2970 
2971 		if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
2972 			switch_channel_event_set_data(channel, event);
2973 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
2974 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_start");
2975 			switch_event_fire(&event);
2976 		}
2977 
2978 		ts = switch_micro_time_now();
2979 		switch_time_exp_lt(&tm, ts);
2980 		switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
2981 		switch_channel_set_variable(channel, "fifo_status", "WAITING");
2982 		switch_channel_set_variable(channel, "fifo_timestamp", date);
2983 
2984 		if ((pop_order = switch_channel_get_variable(channel, "fifo_pop_order"))) {
2985 			char *tmp = switch_core_session_strdup(session, pop_order);
2986 			int x;
2987 			custom_pop = switch_separate_string(tmp, ',', pop_list, (sizeof(pop_list) / sizeof(pop_list[0])));
2988 			if (custom_pop >= MAX_PRI) {
2989 				custom_pop = MAX_PRI - 1;
2990 			}
2991 
2992 			for (x = 0; x < custom_pop; x++) {
2993 				int temp;
2994 				switch_assert(pop_list[x]);
2995 				temp = atoi(pop_list[x]);
2996 				if (temp > -1 && temp < MAX_PRI) {
2997 					pop_array[x] = temp;
2998 				}
2999 			}
3000 		} else {
3001 			int x = 0;
3002 			for (x = 0; x < MAX_PRI; x++) {
3003 				pop_array[x] = x;
3004 			}
3005 		}
3006 
3007 		while (switch_channel_ready(channel)) {
3008 			int x = 0, winner = -1;
3009 			switch_time_t longest = (0xFFFFFFFFFFFFFFFFULL / 2);
3010 			uint32_t importance = 0, waiting = 0, most_waiting = 0;
3011 
3012 			pop = NULL;
3013 
3014 			if (moh && do_wait) {
3015 				switch_status_t moh_status;
3016 				memset(&args, 0, sizeof(args));
3017 				args.read_frame_callback = consumer_read_frame_callback;
3018 				args.user_data = node_list;
3019 				moh_status = switch_ivr_play_file(session, NULL, moh, &args);
3020 
3021 				if (!SWITCH_READ_ACCEPTABLE(moh_status)) {
3022 					break;
3023 				}
3024 			}
3025 
3026 			/* Before we can pick a caller we have to decide on a fifo
3027 			   node to service if the consumer can service more than
3028 			   one.
3029 
3030 			   If all fifos have an importance of zero, we'll find the
3031 			   first node that wins based on the chosen strategy.
3032 
3033 			   The `waiting_longer` strategy will choose the node that
3034 			   hasn't been empty for the longest time.
3035 
3036 			   The `more_ppl` strategy will choose the node that has
3037 			   the most people waiting.
3038 
3039 			   If a node has an importance value set, it will cause us
3040 			   to ignore later nodes with equivalent or lower
3041 			   importance values.  This means that a node with the
3042 			   same importance that would otherwise win based on the
3043 			   strategy will not be considered at all if it comes
3044 			   later in the list.  Note also that the high importance
3045 			   node may still lose if a considered fifo earlier in the
3046 			   list beats it per the strategy.
3047 
3048 			   Note that when the consumer has been delivered by an
3049 			   outbound strategy there will only be one fifo node
3050 			   passed to us, so neither the importance nor the
3051 			   strategy here will have any effect.
3052 			*/
3053 			for (i = 0; i < node_count; i++) {
3054 				if (!(node = node_list[i])) {
3055 					continue;
3056 				}
3057 
3058 				if ((waiting = node_caller_count(node))) {
3059 					if (!importance || node->importance > importance) {
3060 						if (strat == STRAT_WAITING_LONGER) {
3061 							if (node->start_waiting < longest) {
3062 								longest = node->start_waiting;
3063 								winner = i;
3064 							}
3065 						} else {
3066 							if (waiting > most_waiting) {
3067 								most_waiting = waiting;
3068 								winner = i;
3069 							}
3070 						}
3071 					}
3072 
3073 					if (node->importance > importance) {
3074 						importance = node->importance;
3075 					}
3076 				}
3077 			}
3078 
3079 			if (winner > -1) {
3080 				node = node_list[winner];
3081 			} else {
3082 				node = NULL;
3083 			}
3084 
3085 			if (node) {
3086 				const char *varval, *check = NULL;
3087 
3088 				check = switch_channel_get_variable(channel, "fifo_bridge_uuid_required");
3089 
3090 				/* Handle predestined calls, including calls from the ringall strategy */
3091 				if ((varval = switch_channel_get_variable(channel, "fifo_bridge_uuid"))) {
3092 					if (check_bridge_call(varval) && switch_true(check)) {
3093 						switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "%s Call has already been answered\n",
3094 										  switch_channel_get_name(channel));
3095 						goto done;
3096 					}
3097 
3098 					cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL);
3099 
3100 					for (x = 0; x < MAX_PRI; x++) {
3101 						if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "+unique-id", varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
3102 							cancel_caller_outbound_call(varval, SWITCH_CAUSE_PICKED_OFF);
3103 							break;
3104 						}
3105 					}
3106 					if (!pop && switch_true(check)) {
3107 						switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "%s Call has already been answered\n",
3108 										  switch_channel_get_name(channel));
3109 
3110 						goto done;
3111 					}
3112 				}
3113 
3114 				if (!pop && (varval = switch_channel_get_variable(channel, "fifo_target_skill"))) {
3115 					for (x = 0; x < MAX_PRI; x++) {
3116 						if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "variable_fifo_skill",
3117 												   varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
3118 							break;
3119 						}
3120 					}
3121 				}
3122 
3123 				if (!pop) {
3124 					for (x = 0; x < MAX_PRI; x++) {
3125 						if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "variable_fifo_vip", "true",
3126 												   &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
3127 							break;
3128 						}
3129 					}
3130 				}
3131 
3132 				if (!pop) {
3133 					if (custom_pop) {
3134 						for (x = 0; x < MAX_PRI; x++) {
3135 							if (fifo_queue_pop(node->fifo_list[pop_array[x]], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
3136 								break;
3137 							}
3138 						}
3139 					} else {
3140 						for (x = 0; x < MAX_PRI; x++) {
3141 							if (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
3142 								break;
3143 							}
3144 						}
3145 					}
3146 				}
3147 
3148 				if (pop && !node_caller_count(node)) {
3149 					switch_mutex_lock(node->update_mutex);
3150 					node->start_waiting = 0;
3151 					switch_mutex_unlock(node->update_mutex);
3152 				}
3153 			}
3154 
3155 			if (!pop) {
3156 				if (!do_wait) {
3157 					break;
3158 				}
3159 
3160 				status = switch_core_session_read_frame(session, &read_frame, SWITCH_IO_FLAG_NONE, 0);
3161 
3162 				if (!SWITCH_READ_ACCEPTABLE(status)) {
3163 					break;
3164 				}
3165 
3166 				continue;
3167 			}
3168 
3169 			url = switch_event_get_header(pop, "dial-url");
3170 			caller_uuid = switch_core_session_strdup(session, switch_event_get_header(pop, "unique-id"));
3171 			switch_event_destroy(&pop);
3172 
3173 			if (url) {
3174 				switch_call_cause_t cause = SWITCH_CAUSE_NONE;
3175 				const char *o_announce = NULL;
3176 
3177 				if ((o_announce = switch_channel_get_variable(channel, "fifo_outbound_announce"))) {
3178 					status = switch_ivr_play_file(session, NULL, o_announce, NULL);
3179 					if (!SWITCH_READ_ACCEPTABLE(status)) {
3180 						break;
3181 					}
3182 				}
3183 
3184 				if (switch_ivr_originate(session, &other_session, &cause, url, 120, NULL, NULL, NULL, NULL, NULL, SOF_NONE, NULL, NULL) != SWITCH_STATUS_SUCCESS) {
3185 					other_session = NULL;
3186 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Originate to [%s] failed, cause: %s\n", url,
3187 									  switch_channel_cause2str(cause));
3188 
3189 					if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3190 						switch_channel_event_set_data(channel, event);
3191 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3192 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "caller_outbound");
3193 						switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Result", "failure:%s", switch_channel_cause2str(cause));
3194 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Outbound-URL", url);
3195 						switch_event_fire(&event);
3196 					}
3197 				} else {
3198 					if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3199 						switch_channel_event_set_data(channel, event);
3200 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3201 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "caller_outbound");
3202 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Result", "success");
3203 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Outbound-URL", url);
3204 						switch_event_fire(&event);
3205 					}
3206 					url = NULL;
3207 					caller_uuid = switch_core_session_strdup(session, switch_core_session_get_uuid(other_session));
3208 				}
3209 			} else {
3210 				if ((other_session = switch_core_session_locate(caller_uuid))) {
3211 					switch_channel_t *other_channel = switch_core_session_get_channel(other_session);
3212 					if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3213 						switch_channel_event_set_data(other_channel, event);
3214 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3215 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "caller_pop");
3216 						switch_event_fire(&event);
3217 					}
3218 				}
3219 			}
3220 
3221 			if (node && other_session) {
3222 				switch_channel_t *other_channel = switch_core_session_get_channel(other_session);
3223 				switch_caller_profile_t *originator_cp, *originatee_cp;
3224 				const char *o_announce = NULL;
3225 				const char *record_template = switch_channel_get_variable(channel, "fifo_record_template");
3226 				char *expanded = NULL;
3227 				char *sql = NULL;
3228 				long epoch_start, epoch_end;
3229 
3230 				if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3231 					switch_channel_event_set_data(channel, event);
3232 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3233 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_pop");
3234 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-UUID", switch_core_session_get_uuid(other_session));
3235 					switch_event_fire(&event);
3236 				}
3237 
3238 				if ((o_announce = switch_channel_get_variable(other_channel, "fifo_override_announce"))) {
3239 					announce = o_announce;
3240 				}
3241 
3242 				if (announce) {
3243 					status = switch_ivr_play_file(session, NULL, announce, NULL);
3244 					if (!SWITCH_READ_ACCEPTABLE(status)) {
3245 						break;
3246 					}
3247 				}
3248 
3249 				switch_channel_set_variable(other_channel, "fifo_serviced_by", my_id);
3250 				switch_channel_set_variable(other_channel, "fifo_serviced_uuid", switch_core_session_get_uuid(session));
3251 				switch_channel_set_flag(other_channel, CF_BREAK);
3252 
3253 				while (switch_channel_ready(channel) && switch_channel_ready(other_channel) &&
3254 					   switch_channel_test_app_flag_key(FIFO_APP_KEY, other_channel, FIFO_APP_BRIDGE_TAG)) {
3255 					status = switch_core_session_read_frame(session, &read_frame, SWITCH_IO_FLAG_NONE, 0);
3256 					if (!SWITCH_READ_ACCEPTABLE(status)) {
3257 						break;
3258 					}
3259 				}
3260 
3261 				if (!(switch_channel_ready(channel))) {
3262 					const char *app = switch_channel_get_variable(other_channel, "current_application");
3263 					const char *arg = switch_channel_get_variable(other_channel, "current_application_data");
3264 					switch_caller_extension_t *extension = NULL;
3265 
3266 					switch_channel_set_variable_printf(channel, "last_sent_callee_id_name", "%s (AGENT FAIL)",
3267 													   switch_channel_get_variable(other_channel, "caller_id_name"));
3268 					switch_channel_set_variable(channel, "last_sent_callee_id_number", switch_channel_get_variable(other_channel, "caller_id_number"));
3269 
3270 					switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING,
3271 									  "Customer %s %s [%s] appears to be abandoned by agent %s [%s] "
3272 									  "but is still on the line, redirecting them back to the queue with VIP status.\n",
3273 									  switch_channel_get_name(other_channel),
3274 									  switch_channel_get_variable(other_channel, "caller_id_name"),
3275 									  switch_channel_get_variable(other_channel, "caller_id_number"),
3276 									  switch_channel_get_variable(channel, "caller_id_name"),
3277 									  switch_channel_get_variable(channel, "caller_id_number"));
3278 
3279 					switch_channel_wait_for_state_timeout(other_channel, CS_HIBERNATE, 5000);
3280 
3281 					send_presence(node);
3282 					check_cancel(node);
3283 
3284 					if (app) {
3285 						extension = switch_caller_extension_new(other_session, app, arg);
3286 						switch_caller_extension_add_application(other_session, extension, app, arg);
3287 						switch_channel_set_caller_extension(other_channel, extension);
3288 						switch_channel_set_state(other_channel, CS_EXECUTE);
3289 					} else {
3290 						switch_channel_hangup(other_channel, SWITCH_CAUSE_NORMAL_CLEARING);
3291 					}
3292 					switch_channel_set_variable(other_channel, "fifo_vip", "true");
3293 
3294 					switch_core_session_rwunlock(other_session);
3295 					break;
3296 				}
3297 
3298 				switch_channel_answer(channel);
3299 
3300 				if (switch_channel_inbound_display(other_channel)) {
3301 					if (switch_channel_direction(other_channel) == SWITCH_CALL_DIRECTION_INBOUND) {
3302 						switch_channel_set_flag(other_channel, CF_BLEG);
3303 					}
3304 				}
3305 
3306 				switch_channel_step_caller_profile(channel);
3307 				switch_channel_step_caller_profile(other_channel);
3308 
3309 				originator_cp = switch_channel_get_caller_profile(channel);
3310 				originatee_cp = switch_channel_get_caller_profile(other_channel);
3311 
3312 				switch_channel_set_originator_caller_profile(other_channel, switch_caller_profile_clone(other_session, originator_cp));
3313 				switch_channel_set_originatee_caller_profile(channel, switch_caller_profile_clone(session, originatee_cp));
3314 
3315 				originator_cp->callee_id_name = switch_core_strdup(originator_cp->pool, originatee_cp->callee_id_name);
3316 				originator_cp->callee_id_number = switch_core_strdup(originator_cp->pool, originatee_cp->callee_id_number);
3317 
3318 				originatee_cp->callee_id_name = switch_core_strdup(originatee_cp->pool, originatee_cp->caller_id_name);
3319 				originatee_cp->callee_id_number = switch_core_strdup(originatee_cp->pool, originatee_cp->caller_id_number);
3320 
3321 				originatee_cp->caller_id_name = switch_core_strdup(originatee_cp->pool, originator_cp->caller_id_name);
3322 				originatee_cp->caller_id_number = switch_core_strdup(originatee_cp->pool, originator_cp->caller_id_number);
3323 
3324 				ts = switch_micro_time_now();
3325 				switch_time_exp_lt(&tm, ts);
3326 				epoch_start = (long)switch_epoch_time_now(NULL);
3327 				switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
3328 				switch_channel_set_variable(channel, "fifo_status", "TALKING");
3329 				switch_channel_set_variable(channel, "fifo_target", caller_uuid);
3330 				switch_channel_set_variable(channel, "fifo_timestamp", date);
3331 				switch_channel_set_variable_printf(channel, "fifo_epoch_start_bridge", "%ld", epoch_start);
3332 				switch_channel_set_variable(channel, "fifo_role", "consumer");
3333 
3334 				switch_channel_set_variable(other_channel, "fifo_status", "TALKING");
3335 				switch_channel_set_variable(other_channel, "fifo_timestamp", date);
3336 				switch_channel_set_variable_printf(other_channel, "fifo_epoch_start_bridge", "%ld", epoch_start);
3337 				switch_channel_set_variable(other_channel, "fifo_target", switch_core_session_get_uuid(session));
3338 				switch_channel_set_variable(other_channel, "fifo_role", "caller");
3339 
3340 				send_presence(node);
3341 
3342 				if (record_template) {
3343 					expanded = switch_channel_expand_variables(other_channel, record_template);
3344 					switch_ivr_record_session(session, expanded, 0, NULL);
3345 				}
3346 
3347 				switch_core_media_bug_resume(session);
3348 				switch_core_media_bug_resume(other_session);
3349 
3350 				switch_process_import(session, other_channel, "fifo_caller_consumer_import", switch_channel_get_variable(channel, "fifo_import_prefix"));
3351 				switch_process_import(other_session, channel, "fifo_consumer_caller_import", switch_channel_get_variable(other_channel, "fifo_import_prefix"));
3352 
3353 				if (outbound_id) {
3354 					cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL);
3355 					add_bridge_call(outbound_id);
3356 
3357 					sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,use_count=use_count+1,outbound_fail_count=0 where uuid='%q'",
3358 										 switch_epoch_time_now(NULL), outbound_id);
3359 
3360 					fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
3361 					fifo_inc_use_count(outbound_id);
3362 				}
3363 
3364 				if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3365 					switch_channel_event_set_data(channel, event);
3366 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3367 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "channel-consumer-start");
3368 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Type", "onhook");
3369 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-CID-Name", cid_name);
3370 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-CID-Number", cid_number);
3371 					if (outbound_id) {
3372 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
3373 						switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(outbound_id));
3374 					}
3375 					switch_event_fire(&event);
3376 				}
3377 
3378 				if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3379 					switch_channel_event_set_data(channel, event);
3380 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3381 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-consumer-start");
3382 					if (outbound_id) {
3383 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
3384 						switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(outbound_id));
3385 					}
3386 
3387 					switch_event_fire(&event);
3388 				}
3389 				if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3390 					switch_channel_event_set_data(other_channel, event);
3391 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3392 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-caller-start");
3393 					switch_event_fire(&event);
3394 				}
3395 
3396 				add_bridge_call(switch_core_session_get_uuid(other_session));
3397 				add_bridge_call(switch_core_session_get_uuid(session));
3398 
3399 				sql = switch_mprintf("insert into fifo_bridge "
3400 									 "(fifo_name,caller_uuid,caller_caller_id_name,caller_caller_id_number,consumer_uuid,consumer_outgoing_uuid,bridge_start) "
3401 									 "values ('%q','%q','%q','%q','%q','%q',%ld)",
3402 									 node->name,
3403 									 switch_core_session_get_uuid(other_session),
3404 									 switch_str_nil(switch_channel_get_variable(other_channel, "caller_id_name")),
3405 									 switch_str_nil(switch_channel_get_variable(other_channel, "caller_id_number")),
3406 									 switch_core_session_get_uuid(session),
3407 									 switch_str_nil(outbound_id),
3408 									 (long) switch_epoch_time_now(NULL)
3409 									 );
3410 
3411 				fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
3412 
3413 				switch_channel_set_variable(channel, SWITCH_SIGNAL_BOND_VARIABLE, switch_core_session_get_uuid(other_session));
3414 				switch_channel_set_variable(other_channel, SWITCH_SIGNAL_BOND_VARIABLE, switch_core_session_get_uuid(session));
3415 
3416 				switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_initiated_bridge", "true");
3417 				switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_bridge_role", "caller");
3418 				switch_channel_set_variable(switch_core_session_get_channel(session), "fifo_initiated_bridge", "true");
3419 				switch_channel_set_variable(switch_core_session_get_channel(session), "fifo_bridge_role", "consumer");
3420 
3421 				switch_ivr_multi_threaded_bridge(session, other_session, on_dtmf, other_session, session);
3422 
3423 				if (switch_channel_test_flag(other_channel, CF_TRANSFER) && switch_channel_up(other_channel)) {
3424 					switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_initiated_bridge", NULL);
3425 					switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_bridge_role", NULL);
3426 				}
3427 
3428 				if (switch_channel_test_flag(channel, CF_TRANSFER) && switch_channel_up(channel)) {
3429 					switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_initiated_bridge", NULL);
3430 					switch_channel_set_variable(switch_core_session_get_channel(other_session), "fifo_bridge_role", NULL);
3431 				}
3432 
3433 				if (outbound_id) {
3434 					long now = (long) switch_epoch_time_now(NULL);
3435 
3436 					sql = switch_mprintf("update fifo_outbound set stop_time=%ld, use_count=use_count-1, "
3437 										 "outbound_call_total_count=outbound_call_total_count+1, "
3438 										 "outbound_call_count=outbound_call_count+1, next_avail=%ld + lag + 1 where uuid='%q' and use_count > 0",
3439 										 now, now, outbound_id);
3440 
3441 					fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
3442 
3443 					del_bridge_call(outbound_id);
3444 					fifo_dec_use_count(outbound_id);
3445 				}
3446 
3447 				if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3448 					uint64_t hold_usec = 0, tt_usec = 0;
3449 					switch_channel_event_set_data(channel, event);
3450 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", arg_fifo_name);
3451 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "channel-consumer-stop");
3452 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Type", "onhook");
3453 					if (outbound_id) {
3454 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
3455 						switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(outbound_id));
3456 					}
3457 					hold_usec = originator_cp->times->hold_accum;
3458 					tt_usec = (switch_micro_time_now() - originator_cp->times->bridged) - hold_usec;
3459 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Bridge-Time-us", "%"SWITCH_TIME_T_FMT, originator_cp->times->bridged);
3460 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Bridge-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(originator_cp->times->bridged / 1000));
3461 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Bridge-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(originator_cp->times->bridged / 1000000));
3462 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Talk-Time-us", "%"SWITCH_TIME_T_FMT, tt_usec);
3463 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Talk-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(tt_usec / 1000));
3464 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Talk-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(tt_usec / 1000000));
3465 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-us", "%"SWITCH_TIME_T_FMT, hold_usec);
3466 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(hold_usec / 1000));
3467 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Hold-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(hold_usec / 1000000));
3468 
3469 					switch_event_fire(&event);
3470 				}
3471 
3472 				del_bridge_call(switch_core_session_get_uuid(session));
3473 				del_bridge_call(switch_core_session_get_uuid(other_session));
3474 
3475 				if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3476 					switch_channel_event_set_data(channel, event);
3477 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3478 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-consumer-stop");
3479 					if (outbound_id) {
3480 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Outbound-ID", outbound_id);
3481 						switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Consumer-Use-Count", "%d", fifo_get_use_count(outbound_id));
3482 					}
3483 					switch_event_fire(&event);
3484 				}
3485 				if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3486 					uint64_t hold_usec = 0, tt_usec = 0;
3487 					switch_channel_event_set_data(other_channel, event);
3488 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3489 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-caller-stop");
3490 					hold_usec = originatee_cp->times->hold_accum;
3491 					tt_usec = (switch_micro_time_now() - originatee_cp->times->bridged) - hold_usec;
3492 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-Talk-Time-us", "%"SWITCH_TIME_T_FMT, tt_usec);
3493 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-Talk-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(tt_usec / 1000));
3494 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-Talk-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(tt_usec / 1000000));
3495 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-Hold-Time-us", "%"SWITCH_TIME_T_FMT, hold_usec);
3496 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-Hold-Time-ms", "%"SWITCH_TIME_T_FMT, (uint64_t)(hold_usec / 1000));
3497 					switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-Hold-Time-s", "%"SWITCH_TIME_T_FMT, (uint64_t)(hold_usec / 1000000));
3498 					switch_event_fire(&event);
3499 				}
3500 
3501 				epoch_end = (long)switch_epoch_time_now(NULL);
3502 
3503 				switch_channel_set_variable_printf(channel, "fifo_epoch_stop_bridge", "%ld", epoch_end);
3504 				switch_channel_set_variable_printf(channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start);
3505 
3506 				switch_channel_set_variable_printf(other_channel, "fifo_epoch_stop_bridge", "%ld", epoch_end);
3507 				switch_channel_set_variable_printf(other_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start);
3508 
3509 				sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session));
3510 				fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
3511 
3512 				if (switch_channel_ready(channel)) {
3513 					switch_core_media_bug_pause(session);
3514 				}
3515 
3516 				if (record_template) {
3517 					switch_ivr_stop_record_session(session, expanded);
3518 					if (expanded != record_template) {
3519 						switch_safe_free(expanded);
3520 					}
3521 				}
3522 
3523 				ts = switch_micro_time_now();
3524 				switch_time_exp_lt(&tm, ts);
3525 				switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
3526 				switch_channel_set_variable(channel, "fifo_status", "WAITING");
3527 				switch_channel_set_variable(channel, "fifo_timestamp", date);
3528 
3529 				switch_channel_set_variable(other_channel, "fifo_status", "DONE");
3530 				switch_channel_set_variable(other_channel, "fifo_timestamp", date);
3531 
3532 				send_presence(node);
3533 				check_cancel(node);
3534 
3535 				switch_core_session_rwunlock(other_session);
3536 
3537 				if (!do_wait || !switch_channel_ready(channel)) {
3538 					break;
3539 				}
3540 
3541 				fifo_consumer_wrapup_sound = switch_channel_get_variable(channel, "fifo_consumer_wrapup_sound");
3542 				fifo_consumer_wrapup_key = switch_channel_get_variable(channel, "fifo_consumer_wrapup_key");
3543 				sfifo_consumer_wrapup_time = switch_channel_get_variable(channel, "fifo_consumer_wrapup_time");
3544 				if (!zstr(sfifo_consumer_wrapup_time)) {
3545 					fifo_consumer_wrapup_time = atoi(sfifo_consumer_wrapup_time);
3546 				} else {
3547 					fifo_consumer_wrapup_time = 5000;
3548 				}
3549 
3550 				memset(buf, 0, sizeof(buf));
3551 
3552 				if (fifo_consumer_wrapup_time || !zstr(fifo_consumer_wrapup_key)) {
3553 					switch_channel_set_variable(channel, "fifo_status", "WRAPUP");
3554 					if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3555 						switch_channel_event_set_data(channel, event);
3556 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3557 						switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_wrapup");
3558 						switch_event_fire(&event);
3559 					}
3560 				}
3561 
3562 				if (!zstr(fifo_consumer_wrapup_sound)) {
3563 					memset(&args, 0, sizeof(args));
3564 					args.buf = buf;
3565 					args.buflen = sizeof(buf);
3566 					status = switch_ivr_play_file(session, NULL, fifo_consumer_wrapup_sound, &args);
3567 					if (!SWITCH_READ_ACCEPTABLE(status)) {
3568 						break;
3569 					}
3570 				}
3571 
3572 				if (fifo_consumer_wrapup_time) {
3573 					wrapup_time_started = switch_micro_time_now();
3574 
3575 					if (!zstr(fifo_consumer_wrapup_key) && strcmp(buf, fifo_consumer_wrapup_key)) {
3576 						while (switch_channel_ready(channel)) {
3577 							char terminator = 0;
3578 
3579 							if (fifo_consumer_wrapup_time) {
3580 								wrapup_time_elapsed = (switch_micro_time_now() - wrapup_time_started) / 1000;
3581 								if (wrapup_time_elapsed > fifo_consumer_wrapup_time) {
3582 									break;
3583 								} else {
3584 									wrapup_time_remaining = fifo_consumer_wrapup_time - wrapup_time_elapsed + 100;
3585 								}
3586 							}
3587 
3588 							switch_ivr_collect_digits_count(session, buf, sizeof(buf) - 1, 1, fifo_consumer_wrapup_key, &terminator, 0, 0,
3589 															(uint32_t) wrapup_time_remaining);
3590 							if ((terminator == *fifo_consumer_wrapup_key) || !(switch_channel_ready(channel))) {
3591 								break;
3592 							}
3593 						}
3594 					} else if ((zstr(fifo_consumer_wrapup_key) || !strcmp(buf, fifo_consumer_wrapup_key))) {
3595 						while (switch_channel_ready(channel)) {
3596 							wrapup_time_elapsed = (switch_micro_time_now() - wrapup_time_started) / 1000;
3597 							if (wrapup_time_elapsed > fifo_consumer_wrapup_time) {
3598 								break;
3599 							}
3600 							switch_yield(500);
3601 						}
3602 					}
3603 				}
3604 
3605 				switch_channel_set_variable(channel, "fifo_status", "WAITING");
3606 			}
3607 
3608 			if (do_wait && switch_channel_ready(channel)) {
3609 				if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3610 					switch_channel_event_set_data(channel, event);
3611 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3612 					switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_reentrance");
3613 					switch_event_fire(&event);
3614 				}
3615 			}
3616 		}
3617 
3618 		if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
3619 			switch_channel_event_set_data(channel, event);
3620 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
3621 			switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_stop");
3622 			switch_event_fire(&event);
3623 		}
3624 
3625 		if (do_wait) {
3626 			for (i = 0; i < node_count; i++) {
3627 				if (!(node = node_list[i])) {
3628 					continue;
3629 				}
3630 				switch_mutex_lock(node->mutex);
3631 				switch_core_hash_delete(node->consumer_hash, switch_core_session_get_uuid(session));
3632 				node->consumer_count--;
3633 				switch_mutex_unlock(node->mutex);
3634 			}
3635 		}
3636 
3637 		if (outbound_id && switch_channel_up(channel)) {
3638 			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s is still alive, tracking call.\n", switch_channel_get_name(channel));
3639 			fifo_track_call_function(session, outbound_id);
3640 		}
3641 	}
3642 
3643   done:
3644 
3645 	if (!consumer && in_table) {
3646 		fifo_caller_del(switch_core_session_get_uuid(session));
3647 	}
3648 
3649 	if (switch_true(switch_channel_get_variable(channel, "fifo_destroy_after_use"))) {
3650 		do_destroy = 1;
3651 	}
3652 
3653 	switch_mutex_lock(globals.mutex);
3654 	for (i = 0; i < node_count; i++) {
3655 		if (!(node = node_list[i])) {
3656 			continue;
3657 		}
3658 		switch_thread_rwlock_unlock(node->rwlock);
3659 
3660 		if (node->ready == 1 && do_destroy && node_caller_count(node) == 0 && node->consumer_count == 0) {
3661 			switch_core_hash_delete(globals.fifo_hash, node->name);
3662 			node->ready = 0;
3663 		}
3664 	}
3665 	switch_mutex_unlock(globals.mutex);
3666 
3667 	switch_channel_clear_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_BRIDGE_TAG);
3668 
3669 	switch_core_media_bug_resume(session);
3670 }
3671 
3672 struct xml_helper {
3673 	switch_xml_t xml;
3674 	fifo_node_t *node;
3675 	char *container;
3676 	char *tag;
3677 	int cc_off;
3678 	int row_off;
3679 	int verbose;
3680 };
3681 
xml_callback(void * pArg,int argc,char ** argv,char ** columnNames)3682 static int xml_callback(void *pArg, int argc, char **argv, char **columnNames)
3683 {
3684 	struct xml_helper *h = (struct xml_helper *) pArg;
3685 	switch_xml_t x_out;
3686 	int c_off = 0;
3687 	char exp_buf[128] = { 0 };
3688 	switch_time_exp_t tm;
3689 	switch_time_t etime = 0;
3690 	char atime[128] = "";
3691 	char *expires = exp_buf, *tb = atime;
3692 	int arg = 0;
3693 
3694 	for(arg = 0; arg < argc; arg++) {
3695 		if (!argv[arg]) {
3696 			argv[arg] = "";
3697 		}
3698 	}
3699 
3700 	if (argv[7]) {
3701 		if ((etime = atol(argv[7]))) {
3702 			switch_size_t retsize;
3703 
3704 			switch_time_exp_lt(&tm, switch_time_from_sec(etime));
3705 			switch_strftime_nocheck(exp_buf, &retsize, sizeof(exp_buf), "%Y-%m-%d %T", &tm);
3706 		} else {
3707 			switch_set_string(exp_buf, "now");
3708 		}
3709 	}
3710 
3711 	if (atoi(argv[13])) {
3712 		arg = 17;
3713 	} else {
3714 		arg = 18;
3715 	}
3716 
3717 	if ((etime = atol(argv[arg]))) {
3718 		switch_size_t retsize;
3719 		switch_time_exp_lt(&tm, switch_time_from_sec(etime));
3720 		switch_strftime_nocheck(atime, &retsize, sizeof(atime), "%Y-%m-%d %T", &tm);
3721 	} else {
3722 		switch_set_string(atime, "now");
3723 	}
3724 
3725 	x_out = switch_xml_add_child_d(h->xml, h->tag, c_off++);
3726 	switch_xml_set_attr_d(x_out, "simo", argv[3]);
3727 	switch_xml_set_attr_d(x_out, "use_count", argv[4]);
3728 	switch_xml_set_attr_d(x_out, "timeout", argv[5]);
3729 	switch_xml_set_attr_d(x_out, "lag", argv[6]);
3730 	switch_xml_set_attr_d(x_out, "outbound-call-count", argv[10]);
3731 	switch_xml_set_attr_d(x_out, "outbound-fail-count", argv[11]);
3732 	switch_xml_set_attr_d(x_out, "taking-calls", argv[13]);
3733 	switch_xml_set_attr_d(x_out, "status", argv[14]);
3734 
3735 	switch_xml_set_attr_d(x_out, "outbound-call-total-count", argv[15]);
3736 	switch_xml_set_attr_d(x_out, "outbound-fail-total-count", argv[16]);
3737 
3738 	if (arg == 17) {
3739 		switch_xml_set_attr_d(x_out, "logged-on-since", tb);
3740 	} else {
3741 		switch_xml_set_attr_d(x_out, "logged-off-since", tb);
3742 	}
3743 
3744 	switch_xml_set_attr_d(x_out, "manual-calls-out-count", argv[19]);
3745 	switch_xml_set_attr_d(x_out, "manual-calls-in-count", argv[20]);
3746 	switch_xml_set_attr_d(x_out, "manual-calls-out-total-count", argv[21]);
3747 	switch_xml_set_attr_d(x_out, "manual-calls-in-total-count", argv[22]);
3748 
3749 	if (argc > 23) {
3750 		switch_xml_set_attr_d(x_out, "ring-count", argv[23]);
3751 
3752 		if ((etime = atol(argv[24]))) {
3753 			switch_size_t retsize;
3754 			switch_time_exp_lt(&tm, switch_time_from_sec(etime));
3755 			switch_strftime_nocheck(atime, &retsize, sizeof(atime), "%Y-%m-%d %T", &tm);
3756 		} else {
3757 			switch_set_string(atime, "never");
3758 		}
3759 
3760 		switch_xml_set_attr_d(x_out, "start-time", tb);
3761 
3762 		if ((etime = atol(argv[25]))) {
3763 			switch_size_t retsize;
3764 			switch_time_exp_lt(&tm, switch_time_from_sec(etime));
3765 			switch_strftime_nocheck(atime, &retsize, sizeof(atime), "%Y-%m-%d %T", &tm);
3766 		} else {
3767 			switch_set_string(atime, "never");
3768 		}
3769 
3770 		switch_xml_set_attr_d(x_out, "stop-time", tb);
3771 	}
3772 
3773 	switch_xml_set_attr_d(x_out, "next-available", expires);
3774 
3775 	switch_xml_set_txt_d(x_out, argv[2]);
3776 
3777 	return 0;
3778 }
3779 
xml_outbound(switch_xml_t xml,fifo_node_t * node,char * container,char * tag,int cc_off,int verbose)3780 static int xml_outbound(switch_xml_t xml, fifo_node_t *node, char *container, char *tag, int cc_off, int verbose)
3781 {
3782 	struct xml_helper h = { 0 };
3783 	char *sql;
3784 
3785 	if (!strcmp(node->name, MANUAL_QUEUE_NAME)) {
3786 		sql = switch_mprintf("select uuid, '%q', originate_string, simo_count, use_count, timeout,"
3787 							 "lag, next_avail, expires, static, outbound_call_count, outbound_fail_count,"
3788 							 "hostname, taking_calls, status, outbound_call_total_count, outbound_fail_total_count, active_time, inactive_time,"
3789 							 "manual_calls_out_count, manual_calls_in_count, manual_calls_out_total_count, manual_calls_in_total_count from fifo_outbound "
3790 							 "group by "
3791 							 "uuid, originate_string, simo_count, use_count, timeout,"
3792 							 "lag, next_avail, expires, static, outbound_call_count, outbound_fail_count,"
3793 							 "hostname, taking_calls, status, outbound_call_total_count, outbound_fail_total_count, active_time, inactive_time,"
3794 							 "manual_calls_out_count, manual_calls_in_count, manual_calls_out_total_count, manual_calls_in_total_count",
3795 							 MANUAL_QUEUE_NAME);
3796 	} else {
3797 		sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, "
3798 							 "lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, "
3799 							 "hostname, taking_calls, status, outbound_call_total_count, outbound_fail_total_count, active_time, inactive_time, "
3800 							 "manual_calls_out_count, manual_calls_in_count, manual_calls_out_total_count, manual_calls_in_total_count,"
3801 							 "ring_count,start_time,stop_time "
3802 							 "from fifo_outbound where fifo_name = '%q'", node->name);
3803 	}
3804 
3805 	h.xml = xml;
3806 	h.node = node;
3807 	h.container = container;
3808 	h.tag = tag;
3809 	h.cc_off = cc_off;
3810 	h.row_off = 0;
3811 	h.verbose = verbose;
3812 
3813 	h.xml = switch_xml_add_child_d(h.xml, h.container, h.cc_off++);
3814 
3815 	fifo_execute_sql_callback(globals.sql_mutex, sql, xml_callback, &h);
3816 
3817 	switch_safe_free(sql);
3818 
3819 	return h.cc_off;
3820 }
3821 
xml_bridge_callback(void * pArg,int argc,char ** argv,char ** columnNames)3822 static int xml_bridge_callback(void *pArg, int argc, char **argv, char **columnNames)
3823 {
3824 	struct xml_helper *h = (struct xml_helper *) pArg;
3825 	switch_xml_t x_bridge, x_var, x_caller, x_consumer, x_cdr;
3826 	char exp_buf[128] = "";
3827 	switch_time_exp_t tm;
3828 	switch_time_t etime = 0;
3829 	int off = 0, tag_off = 0;
3830 	switch_core_session_t *session;
3831 	char url_buf[512] = "";
3832 	char *encoded;
3833 
3834 	if ((etime = atol(argv[6]))) {
3835 		switch_size_t retsize;
3836 
3837 		switch_time_exp_lt(&tm, switch_time_from_sec(etime));
3838 		switch_strftime_nocheck(exp_buf, &retsize, sizeof(exp_buf), "%Y-%m-%d %T", &tm);
3839 	} else {
3840 		switch_set_string(exp_buf, "now");
3841 	}
3842 
3843 	x_bridge = switch_xml_add_child_d(h->xml, h->tag, h->row_off++);
3844 
3845 	switch_xml_set_attr_d(x_bridge, "fifo_name", argv[0]);
3846 	switch_xml_set_attr_d_buf(x_bridge, "bridge_start", exp_buf);
3847 	switch_xml_set_attr_d(x_bridge, "bridge_start_epoch", argv[6]);
3848 
3849 	x_caller = switch_xml_add_child_d(x_bridge, "caller", tag_off++);
3850 
3851 	switch_xml_set_attr_d(x_caller, "uuid", argv[1]);
3852 
3853 	encoded = switch_url_encode(argv[2], url_buf, sizeof(url_buf));
3854 	switch_xml_set_attr_d(x_caller, "caller_id_name", encoded);
3855 
3856 	encoded = switch_url_encode(argv[3], url_buf, sizeof(url_buf));
3857 	switch_xml_set_attr_d(x_caller, "caller_id_number", encoded);
3858 
3859 	if (h->verbose) {
3860 		if ((session = switch_core_session_locate(argv[1]))) {
3861 			x_cdr = switch_xml_add_child_d(x_caller, "cdr", 0);
3862 			switch_ivr_generate_xml_cdr(session, &x_cdr);
3863 			switch_core_session_rwunlock(session);
3864 		}
3865 	}
3866 
3867 	off = 0;
3868 
3869 	x_consumer = switch_xml_add_child_d(x_bridge, "consumer", tag_off++);
3870 
3871 	x_var = switch_xml_add_child_d(x_consumer, "uuid", off++);
3872 	switch_xml_set_txt_d(x_var, argv[4]);
3873 	x_var = switch_xml_add_child_d(x_consumer, "outgoing_uuid", off++);
3874 	switch_xml_set_txt_d(x_var, argv[5]);
3875 
3876 	if (h->verbose) {
3877 		if ((session = switch_core_session_locate(argv[1]))) {
3878 			x_cdr = switch_xml_add_child_d(x_consumer, "cdr", 0);
3879 			switch_ivr_generate_xml_cdr(session, &x_cdr);
3880 			switch_core_session_rwunlock(session);
3881 		}
3882 	}
3883 
3884 	return 0;
3885 }
3886 
xml_bridges(switch_xml_t xml,fifo_node_t * node,char * container,char * tag,int cc_off,int verbose)3887 static int xml_bridges(switch_xml_t xml, fifo_node_t *node, char *container, char *tag, int cc_off, int verbose)
3888 {
3889 	struct xml_helper h = { 0 };
3890 	char *sql = switch_mprintf("select "
3891 							   "fifo_name,caller_uuid,caller_caller_id_name,caller_caller_id_number,consumer_uuid,consumer_outgoing_uuid,bridge_start "
3892 							   "from fifo_bridge where fifo_name = '%q'", node->name);
3893 
3894 	h.xml = xml;
3895 	h.node = node;
3896 	h.container = container;
3897 	h.tag = tag;
3898 	h.cc_off = cc_off;
3899 	h.row_off = 0;
3900 	h.verbose = verbose;
3901 
3902 	h.xml = switch_xml_add_child_d(h.xml, h.container, h.cc_off++);
3903 
3904 	fifo_execute_sql_callback(globals.sql_mutex, sql, xml_bridge_callback, &h);
3905 
3906 	switch_safe_free(sql);
3907 
3908 	return h.cc_off;
3909 }
3910 
xml_hash(switch_xml_t xml,switch_hash_t * hash,char * container,char * tag,int cc_off,int verbose)3911 static int xml_hash(switch_xml_t xml, switch_hash_t *hash, char *container, char *tag, int cc_off, int verbose)
3912 {
3913 	switch_xml_t x_tmp, x_caller, x_cp;
3914 	switch_hash_index_t *hi;
3915 	switch_core_session_t *session;
3916 	switch_channel_t *channel;
3917 	void *val;
3918 	const void *var;
3919 
3920 	x_tmp = switch_xml_add_child_d(xml, container, cc_off++);
3921 	switch_assert(x_tmp);
3922 
3923 	for (hi = switch_core_hash_first(hash); hi; hi = switch_core_hash_next(&hi)) {
3924 		int c_off = 0, d_off = 0;
3925 		const char *status;
3926 		const char *ts;
3927 		char url_buf[512] = "";
3928 		char *encoded;
3929 
3930 		switch_core_hash_this(hi, &var, NULL, &val);
3931 		session = (switch_core_session_t *) val;
3932 		channel = switch_core_session_get_channel(session);
3933 		x_caller = switch_xml_add_child_d(x_tmp, tag, c_off++);
3934 		switch_assert(x_caller);
3935 
3936 		switch_xml_set_attr_d(x_caller, "uuid", switch_core_session_get_uuid(session));
3937 
3938 		if ((status = switch_channel_get_variable(channel, "fifo_status"))) {
3939 			switch_xml_set_attr_d(x_caller, "status", status);
3940 		}
3941 
3942 		if ((status = switch_channel_get_variable(channel, "caller_id_name"))) {
3943 			encoded = switch_url_encode(status, url_buf, sizeof(url_buf));
3944 			switch_xml_set_attr_d(x_caller, "caller_id_name", encoded);
3945 		}
3946 
3947 		if ((status = switch_channel_get_variable(channel, "caller_id_number"))) {
3948 			encoded = switch_url_encode(status, url_buf, sizeof(url_buf));
3949 			switch_xml_set_attr_d(x_caller, "caller_id_number", encoded);
3950 		}
3951 
3952 		if ((ts = switch_channel_get_variable(channel, "fifo_timestamp"))) {
3953 			switch_xml_set_attr_d(x_caller, "timestamp", ts);
3954 		}
3955 
3956 		if ((ts = switch_channel_get_variable(channel, "fifo_target"))) {
3957 			switch_xml_set_attr_d(x_caller, "target", ts);
3958 		}
3959 
3960 		if (verbose) {
3961 			if (!(x_cp = switch_xml_add_child_d(x_caller, "cdr", d_off++))) {
3962 				abort();
3963 			}
3964 
3965 			switch_ivr_generate_xml_cdr(session, &x_cp);
3966 		}
3967 	}
3968 
3969 	return cc_off;
3970 }
3971 
xml_caller(switch_xml_t xml,fifo_node_t * node,char * container,char * tag,int cc_off,int verbose)3972 static int xml_caller(switch_xml_t xml, fifo_node_t *node, char *container, char *tag, int cc_off, int verbose)
3973 {
3974 	switch_xml_t x_tmp, x_caller, x_cp;
3975 	int i, x;
3976 	switch_core_session_t *session;
3977 	switch_channel_t *channel;
3978 
3979 	x_tmp = switch_xml_add_child_d(xml, container, cc_off++);
3980 	switch_assert(x_tmp);
3981 
3982 	for (x = 0; x < MAX_PRI; x++) {
3983 		fifo_queue_t *q = node->fifo_list[x];
3984 
3985 		switch_mutex_lock(q->mutex);
3986 
3987 		for (i = 0; i < q->idx; i++) {
3988 			int c_off = 0, d_off = 0;
3989 			const char *status;
3990 			const char *ts;
3991 			const char *uuid = switch_event_get_header(q->data[i], "unique-id");
3992 			char sl[30] = "";
3993 			char url_buf[512] = "";
3994 			char *encoded;
3995 
3996 			if (!uuid) {
3997 				continue;
3998 			}
3999 
4000 			if (!(session = switch_core_session_locate(uuid))) {
4001 				continue;
4002 			}
4003 
4004 			channel = switch_core_session_get_channel(session);
4005 			x_caller = switch_xml_add_child_d(x_tmp, tag, c_off++);
4006 			switch_assert(x_caller);
4007 
4008 			switch_xml_set_attr_d(x_caller, "uuid", switch_core_session_get_uuid(session));
4009 
4010 			if ((status = switch_channel_get_variable(channel, "fifo_status"))) {
4011 				switch_xml_set_attr_d(x_caller, "status", status);
4012 			}
4013 
4014 			if ((status = switch_channel_get_variable(channel, "caller_id_name"))) {
4015 				encoded = switch_url_encode(status, url_buf, sizeof(url_buf));
4016 				switch_xml_set_attr_d(x_caller, "caller_id_name", encoded);
4017 			}
4018 
4019 			if ((status = switch_channel_get_variable(channel, "caller_id_number"))) {
4020 				encoded = switch_url_encode(status, url_buf, sizeof(url_buf));
4021 				switch_xml_set_attr_d(x_caller, "caller_id_number", encoded);
4022 			}
4023 
4024 			if ((ts = switch_channel_get_variable(channel, "fifo_timestamp"))) {
4025 				switch_xml_set_attr_d(x_caller, "timestamp", ts);
4026 			}
4027 
4028 			if ((ts = switch_channel_get_variable(channel, "fifo_target"))) {
4029 				switch_xml_set_attr_d(x_caller, "target", ts);
4030 			}
4031 
4032 			if ((ts = switch_channel_get_variable(channel, "fifo_position"))) {
4033 				switch_xml_set_attr_d(x_caller, "position", ts);
4034 			}
4035 
4036 			switch_snprintf(sl, sizeof(sl), "%d", x);
4037 			switch_xml_set_attr_d_buf(x_caller, "slot", sl);
4038 
4039 			if (verbose) {
4040 				if (!(x_cp = switch_xml_add_child_d(x_caller, "cdr", d_off++))) {
4041 					abort();
4042 				}
4043 
4044 				switch_ivr_generate_xml_cdr(session, &x_cp);
4045 			}
4046 
4047 			switch_core_session_rwunlock(session);
4048 			session = NULL;
4049 		}
4050 
4051 		switch_mutex_unlock(q->mutex);
4052 	}
4053 
4054 	return cc_off;
4055 }
4056 
list_node(fifo_node_t * node,switch_xml_t x_report,int * off,int verbose)4057 static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int verbose)
4058 {
4059 	switch_xml_t x_fifo;
4060 	int cc_off = 0;
4061 	char buffer[35];
4062 	char *tmp = buffer;
4063 
4064 	x_fifo = switch_xml_add_child_d(x_report, "fifo", (*off)++);;
4065 	switch_assert(x_fifo);
4066 
4067 	switch_xml_set_attr_d(x_fifo, "name", node->name);
4068 	switch_snprintf(tmp, sizeof(buffer), "%d", node->consumer_count);
4069 	switch_xml_set_attr_d(x_fifo, "consumer_count", tmp);
4070 	switch_snprintf(tmp, sizeof(buffer), "%d", node_caller_count(node));
4071 	switch_xml_set_attr_d(x_fifo, "caller_count", tmp);
4072 	switch_snprintf(tmp, sizeof(buffer), "%d", node_caller_count(node));
4073 	switch_xml_set_attr_d(x_fifo, "waiting_count", tmp);
4074 	switch_snprintf(tmp, sizeof(buffer), "%u", node->importance);
4075 	switch_xml_set_attr_d(x_fifo, "importance", tmp);
4076 
4077 	switch_snprintf(tmp, sizeof(buffer), "%u", node->outbound_per_cycle);
4078 	switch_xml_set_attr_d(x_fifo, "outbound_per_cycle", tmp);
4079 
4080 	switch_snprintf(tmp, sizeof(buffer), "%u", node->outbound_per_cycle_min);
4081 	switch_xml_set_attr_d(x_fifo, "outbound_per_cycle_min", tmp);
4082 
4083 	switch_snprintf(tmp, sizeof(buffer), "%u", node->ring_timeout);
4084 	switch_xml_set_attr_d(x_fifo, "ring_timeout", tmp);
4085 
4086 	switch_snprintf(tmp, sizeof(buffer), "%u", node->default_lag);
4087 	switch_xml_set_attr_d(x_fifo, "default_lag", tmp);
4088 
4089 	switch_snprintf(tmp, sizeof(buffer), "%u", node->outbound_priority);
4090 	switch_xml_set_attr_d(x_fifo, "outbound_priority", tmp);
4091 
4092 	switch_xml_set_attr_d(x_fifo, "outbound_strategy", print_strategy(node->outbound_strategy));
4093 
4094 	cc_off = xml_outbound(x_fifo, node, "outbound", "member", cc_off, verbose);
4095 	cc_off = xml_caller(x_fifo, node, "callers", "caller", cc_off, verbose);
4096 	cc_off = xml_hash(x_fifo, node->consumer_hash, "consumers", "consumer", cc_off, verbose);
4097 	xml_bridges(x_fifo, node, "bridges", "bridge", cc_off, verbose);
4098 }
4099 
dump_hash(switch_hash_t * hash,switch_stream_handle_t * stream)4100 void dump_hash(switch_hash_t *hash, switch_stream_handle_t *stream)
4101 {
4102 	switch_hash_index_t *hi;
4103 	void *val;
4104 	const void *var;
4105 
4106 	switch_mutex_lock(globals.mutex);
4107 	for (hi = switch_core_hash_first(hash); hi; hi = switch_core_hash_next(&hi)) {
4108 		switch_core_hash_this(hi, &var, NULL, &val);
4109 		stream->write_function(stream, "  %s\n", (char *)var);
4110 	}
4111 	switch_mutex_unlock(globals.mutex);
4112 }
4113 
node_dump(switch_stream_handle_t * stream)4114 void node_dump(switch_stream_handle_t *stream)
4115 {
4116 	switch_hash_index_t *hi;
4117 	fifo_node_t *node;
4118 	void *val;
4119 	switch_mutex_lock(globals.mutex);
4120 	for (hi = switch_core_hash_first(globals.fifo_hash); hi; hi = switch_core_hash_next(&hi)) {
4121 		switch_core_hash_this(hi, NULL, NULL, &val);
4122 		if ((node = (fifo_node_t *) val)) {
4123 			stream->write_function(stream, "node: %s\n"
4124 								   " outbound_name: %s\n"
4125 								   " outbound_per_cycle: %d"
4126 								   " outbound_per_cycle_min: %d"
4127 								   " outbound_priority: %d"
4128 								   " outbound_strategy: %s\n"
4129 								   " has_outbound: %d\n"
4130 								   " outbound_priority: %d\n"
4131 								   " busy: %d\n"
4132 								   " ready: %d\n"
4133 								   " waiting: %d\n"
4134 								   ,
4135 								   node->name, node->outbound_name, node->outbound_per_cycle, node->outbound_per_cycle_min,
4136 								   node->outbound_priority, print_strategy(node->outbound_strategy),
4137 								   node->has_outbound,
4138 								   node->outbound_priority,
4139 								   node->busy,
4140 								   node->ready,
4141 								   node_caller_count(node)
4142 
4143 								   );
4144 		}
4145 	}
4146 
4147 	stream->write_function(stream, " caller_orig:\n");
4148 	dump_hash(globals.caller_orig_hash, stream);
4149 	stream->write_function(stream, " consumer_orig:\n");
4150 	dump_hash(globals.consumer_orig_hash, stream);
4151 	stream->write_function(stream, " bridge:\n");
4152 	dump_hash(globals.bridge_hash, stream);
4153 
4154 	switch_mutex_unlock(globals.mutex);
4155 }
4156 
4157 #define FIFO_API_SYNTAX "list|list_verbose|count|debug|status|has_outbound|importance [<fifo name>]|reparse [del_all]"
SWITCH_STANDARD_API(fifo_api_function)4158 SWITCH_STANDARD_API(fifo_api_function)
4159 {
4160 	fifo_node_t *node;
4161 	char *data = NULL;
4162 	int argc = 0;
4163 	char *argv[5] = { 0 };
4164 	switch_hash_index_t *hi;
4165 	void *val;
4166 	const void *var;
4167 	int x = 0, verbose = 0;
4168 
4169 	if (!globals.running) {
4170 		return SWITCH_STATUS_FALSE;
4171 	}
4172 
4173 	if (!zstr(cmd)) {
4174 		data = strdup(cmd);
4175 		switch_assert(data);
4176 	}
4177 
4178 	switch_mutex_lock(globals.mutex);
4179 
4180 	if (zstr(cmd) || (argc = switch_separate_string(data, ' ', argv, (sizeof(argv) / sizeof(argv[0])))) < 1 || !argv[0]) {
4181 		stream->write_function(stream, "%s\n", FIFO_API_SYNTAX);
4182 		goto done;
4183 	}
4184 
4185 	if (!strcasecmp(argv[0], "status")) {
4186 		node_dump(stream);
4187 		goto done;
4188 	}
4189 
4190 	if (!strcasecmp(argv[0], "debug")) {
4191 		if (argv[1]) {
4192 			if ((globals.debug = atoi(argv[1])) < 0) {
4193 				globals.debug = 0;
4194 			}
4195 		}
4196 		stream->write_function(stream, "debug %d\n", globals.debug);
4197 		goto done;
4198 	}
4199 
4200 	verbose = !strcasecmp(argv[0], "list_verbose");
4201 
4202 	if (!strcasecmp(argv[0], "reparse")) {
4203 		load_config(1, argv[1] && !strcasecmp(argv[1], "del_all"));
4204 		stream->write_function(stream, "+OK\n");
4205 		goto done;
4206 	}
4207 
4208 	if (!strcasecmp(argv[0], "list") || verbose) {
4209 		char *xml_text = NULL;
4210 		switch_xml_t x_report = switch_xml_new("fifo_report");
4211 		switch_assert(x_report);
4212 
4213 		if (argc < 2) {
4214 			for (hi = switch_core_hash_first(globals.fifo_hash); hi; hi = switch_core_hash_next(&hi)) {
4215 				switch_core_hash_this(hi, &var, NULL, &val);
4216 				node = (fifo_node_t *) val;
4217 
4218 				switch_mutex_lock(node->mutex);
4219 				list_node(node, x_report, &x, verbose);
4220 				switch_mutex_unlock(node->mutex);
4221 			}
4222 		} else {
4223 			if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
4224 				switch_mutex_lock(node->mutex);
4225 				list_node(node, x_report, &x, verbose);
4226 				switch_mutex_unlock(node->mutex);
4227 			}
4228 		}
4229 		xml_text = switch_xml_toxml(x_report, SWITCH_FALSE);
4230 		switch_assert(xml_text);
4231 		stream->write_function(stream, "%s\n", xml_text);
4232 		switch_xml_free(x_report);
4233 		switch_safe_free(xml_text);
4234 	} else if (!strcasecmp(argv[0], "importance")) {
4235 		if (argv[1] && (node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
4236 			int importance = 0;
4237 			if (argc > 2) {
4238 				importance = atoi(argv[2]);
4239 				if (importance < 0) {
4240 					importance = 0;
4241 				}
4242 				node->importance = importance;
4243 			}
4244 			stream->write_function(stream, "importance: %u\n", node->importance);
4245 		} else {
4246 			stream->write_function(stream, "no fifo by that name\n");
4247 		}
4248 	} else if (!strcasecmp(argv[0], "count")) {
4249 		if (argc < 2) {
4250 			for (hi = switch_core_hash_first(globals.fifo_hash); hi; hi = switch_core_hash_next(&hi)) {
4251 				switch_core_hash_this(hi, &var, NULL, &val);
4252 				node = (fifo_node_t *) val;
4253 				switch_mutex_lock(node->update_mutex);
4254 				stream->write_function(stream, "%s:%d:%d:%d:%d:%d\n", (char *) var, node->consumer_count, node_caller_count(node), node->member_count, node->ring_consumer_count, node_idle_consumers(node));
4255 				switch_mutex_unlock(node->update_mutex);
4256 				x++;
4257 			}
4258 
4259 			if (!x) {
4260 				stream->write_function(stream, "none\n");
4261 			}
4262 		} else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
4263 			switch_mutex_lock(node->update_mutex);
4264 			stream->write_function(stream, "%s:%d:%d:%d:%d:%d\n", argv[1], node->consumer_count, node_caller_count(node), node->member_count, node->ring_consumer_count, node_idle_consumers(node));
4265 			switch_mutex_unlock(node->update_mutex);
4266 		} else {
4267 			stream->write_function(stream, "none\n");
4268 		}
4269 	} else if (!strcasecmp(argv[0], "has_outbound")) {
4270 		if (argc < 2) {
4271 			for (hi = switch_core_hash_first(globals.fifo_hash); hi; hi = switch_core_hash_next(&hi)) {
4272 				switch_core_hash_this(hi, &var, NULL, &val);
4273 				node = (fifo_node_t *) val;
4274 				switch_mutex_lock(node->update_mutex);
4275 				stream->write_function(stream, "%s:%d\n", (char *) var, node->has_outbound);
4276 				switch_mutex_unlock(node->update_mutex);
4277 				x++;
4278 			}
4279 
4280 			if (!x) {
4281 				stream->write_function(stream, "none\n");
4282 			}
4283 		} else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
4284 			switch_mutex_lock(node->update_mutex);
4285 			stream->write_function(stream, "%s:%d\n", argv[1], node->has_outbound);
4286 			switch_mutex_unlock(node->update_mutex);
4287 		} else {
4288 			stream->write_function(stream, "none\n");
4289 		}
4290 	} else {
4291 		stream->write_function(stream, "-ERR Usage: %s\n", FIFO_API_SYNTAX);
4292 	}
4293 
4294   done:
4295 
4296 	switch_safe_free(data);
4297 
4298 	switch_mutex_unlock(globals.mutex);
4299 	return SWITCH_STATUS_SUCCESS;
4300 }
4301 
4302 const char outbound_sql[] =
4303 	"create table fifo_outbound (\n"
4304 	" uuid varchar(255),\n"
4305 	" fifo_name varchar(255),\n"
4306 	" originate_string varchar(255),\n"
4307 	" simo_count integer,\n"
4308 	" use_count integer,\n"
4309 	" timeout integer,\n"
4310 	" lag integer,\n"
4311 	" next_avail integer not null default 0,\n"
4312 	" expires integer not null default 0,\n"
4313 	" static integer not null default 0,\n"
4314 	" outbound_call_count integer not null default 0,\n"
4315 	" outbound_fail_count integer not null default 0,\n"
4316 	" hostname varchar(255),\n"
4317 	" taking_calls integer not null default 1,\n"
4318 	" status varchar(255),\n"
4319 	" outbound_call_total_count integer not null default 0,\n"
4320 	" outbound_fail_total_count integer not null default 0,\n"
4321 	" active_time integer not null default 0,\n"
4322 	" inactive_time integer not null default 0,\n"
4323 	" manual_calls_out_count integer not null default 0,\n"
4324 	" manual_calls_in_count integer not null default 0,\n"
4325 	" manual_calls_out_total_count integer not null default 0,\n"
4326 	" manual_calls_in_total_count integer not null default 0,\n"
4327 	" ring_count integer not null default 0,\n"
4328 	" start_time integer not null default 0,\n"
4329 	" stop_time integer not null default 0\n"
4330 	");\n";
4331 
4332 const char bridge_sql[] =
4333 	"create table fifo_bridge (\n"
4334 	" fifo_name varchar(1024) not null,\n"
4335 	" caller_uuid varchar(255) not null,\n"
4336 	" caller_caller_id_name varchar(255),\n"
4337 	" caller_caller_id_number varchar(255),\n"
4338 
4339 	" consumer_uuid varchar(255) not null,\n"
4340 	" consumer_outgoing_uuid varchar(255),\n"
4341 	" bridge_start integer\n"
4342 	");\n"
4343 ;
4344 
4345 const char callers_sql[] =
4346 	"create table fifo_callers (\n"
4347 	" fifo_name varchar(255) not null,\n"
4348 	" uuid varchar(255) not null,\n"
4349 	" caller_caller_id_name varchar(255),\n"
4350 	" caller_caller_id_number varchar(255),\n"
4351 	" timestamp integer\n"
4352 	");\n"
4353 ;
4354 
extract_fifo_outbound_uuid(char * string,char * uuid,switch_size_t len)4355 static void extract_fifo_outbound_uuid(char *string, char *uuid, switch_size_t len)
4356 {
4357 	switch_event_t *ovars;
4358 	char *parsed = NULL;
4359 	const char *fifo_outbound_uuid;
4360 
4361 	switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
4362 	switch_event_create_brackets(string, '{', '}', ',', &ovars, &parsed, SWITCH_TRUE);
4363 
4364 	if ((fifo_outbound_uuid = switch_event_get_header(ovars, "fifo_outbound_uuid"))) {
4365 		switch_snprintf(uuid, len, "%s", fifo_outbound_uuid);
4366 	}
4367 
4368 	switch_safe_free(parsed);
4369 	switch_event_destroy(&ovars);
4370 }
4371 
read_config_file(switch_xml_t * xml,switch_xml_t * cfg)4372 static switch_status_t read_config_file(switch_xml_t *xml, switch_xml_t *cfg) {
4373 	const char *cf = "fifo.conf";
4374 	switch_xml_t settings;
4375 
4376 	if (!(*xml = switch_xml_open_cfg(cf, cfg, NULL))) {
4377 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf);
4378 		return SWITCH_STATUS_TERM;
4379 	}
4380 	if ((settings = switch_xml_child(*cfg, "settings"))) {
4381 		switch_xml_t param;
4382 		for (param = switch_xml_child(settings, "param"); param; param = param->next) {
4383 			char *var = (char*)switch_xml_attr_soft(param, "name");
4384 			char *val = (char*)switch_xml_attr_soft(param, "value");
4385 
4386 			if (!strcasecmp(var, "outbound-strategy") && !zstr(val)) {
4387 				globals.default_strategy = parse_strategy(val);
4388 			} else if (!strcasecmp(var, "odbc-dsn") && !zstr(val)) {
4389 				if (switch_database_available(val) == SWITCH_STATUS_SUCCESS) {
4390 					switch_set_string(globals.odbc_dsn, val);
4391 				} else {
4392 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "DATABASE IS NOT AVAILABLE!\n");
4393 				}
4394 			} else if (!strcasecmp(var, "dbname") && !zstr(val)) {
4395 				globals.dbname = switch_core_strdup(globals.pool, val);
4396 			} else if (!strcasecmp(var, "allow-transcoding") && !zstr(val)) {
4397 				globals.allow_transcoding = switch_true(val);
4398 			} else if (!strcasecmp(var, "db-pre-trans-execute") && !zstr(val)) {
4399 				globals.pre_trans_execute = switch_core_strdup(globals.pool, val);
4400 			} else if (!strcasecmp(var, "db-post-trans-execute") && !zstr(val)) {
4401 				globals.post_trans_execute = switch_core_strdup(globals.pool, val);
4402 			} else if (!strcasecmp(var, "db-inner-pre-trans-execute") && !zstr(val)) {
4403 				globals.inner_pre_trans_execute = switch_core_strdup(globals.pool, val);
4404 			} else if (!strcasecmp(var, "db-inner-post-trans-execute") && !zstr(val)) {
4405 				globals.inner_post_trans_execute = switch_core_strdup(globals.pool, val);
4406 			} else if (!strcasecmp(var, "delete-all-outbound-member-on-startup")) {
4407 				globals.delete_all_members_on_startup = switch_true(val);
4408 			}
4409 		}
4410 	}
4411 	return SWITCH_STATUS_SUCCESS;
4412 }
4413 
4414 /*!\brief Load or reload the configuration
4415  *
4416  * On the initial load, non-static members are preserved unless the
4417  * parameter `delete-all-outbound-members-on-startup` is set.  The
4418  * parameter `del_all` is ignored in this case.
4419  *
4420  * On reload, non-static members are preserved unless `del_all` is
4421  * set.
4422  *
4423  * \param reload true if we're reloading the config
4424  * \param del_all delete all outbound members when reloading;
4425  *   not used unless reload is true
4426  */
load_config(int reload,int del_all)4427 static switch_status_t load_config(int reload, int del_all)
4428 {
4429 	switch_xml_t xml, cfg, fifo, fifos, member;
4430 	switch_status_t status = SWITCH_STATUS_SUCCESS;
4431 	char *sql;
4432 	switch_cache_db_handle_t *dbh = NULL;
4433 	fifo_node_t *node;
4434 
4435 	strncpy(globals.hostname, switch_core_get_switchname(), sizeof(globals.hostname) - 1);
4436 	globals.dbname = "fifo";
4437 	globals.default_strategy = NODE_STRATEGY_RINGALL;
4438 	globals.delete_all_members_on_startup = SWITCH_FALSE;
4439 
4440 	if ((status = read_config_file(&xml, &cfg)) != SWITCH_STATUS_SUCCESS) return status;
4441 
4442 	if (!(dbh = fifo_get_db_handle())) {
4443 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot open DB!\n");
4444 		goto done;
4445 	}
4446 
4447 	if (!reload) {
4448 		switch_sql_queue_manager_init_name("fifo",
4449 										   &globals.qm,
4450 										   2,
4451 										   !zstr(globals.odbc_dsn) ? globals.odbc_dsn : globals.dbname,
4452 										   SWITCH_MAX_TRANS,
4453 										   globals.pre_trans_execute,
4454 										   globals.post_trans_execute,
4455 										   globals.inner_pre_trans_execute,
4456 										   globals.inner_post_trans_execute);
4457 		switch_sql_queue_manager_start(globals.qm);
4458 
4459 		switch_cache_db_test_reactive(dbh, "delete from fifo_outbound where static = 1 or taking_calls < 0 or stop_time < 0",
4460 									  "drop table fifo_outbound", outbound_sql);
4461 		switch_cache_db_test_reactive(dbh, "delete from fifo_bridge", "drop table fifo_bridge", bridge_sql);
4462 		switch_cache_db_test_reactive(dbh, "delete from fifo_callers", "drop table fifo_callers", callers_sql);
4463 	}
4464 
4465 	switch_cache_db_release_db_handle(&dbh);
4466 
4467 	if (!reload) {
4468 		char *sql= "update fifo_outbound set start_time=0,stop_time=0,ring_count=0,use_count=0,outbound_call_count=0,outbound_fail_count=0 where static=0";
4469 		fifo_execute_sql_queued(&sql, SWITCH_FALSE, SWITCH_TRUE);
4470 		fifo_init_use_count();
4471 	}
4472 
4473 	if (reload) {
4474 		switch_hash_index_t *hi;
4475 		fifo_node_t *node;
4476 		void *val;
4477 		switch_mutex_lock(globals.mutex);
4478 		for (hi = switch_core_hash_first(globals.fifo_hash); hi; hi = switch_core_hash_next(&hi)) {
4479 			switch_core_hash_this(hi, NULL, NULL, &val);
4480 			if ((node = (fifo_node_t *) val) && node->is_static && node->ready == 1) {
4481 				node->ready = -1;
4482 			}
4483 		}
4484 		switch_mutex_unlock(globals.mutex);
4485 	}
4486 
4487 	if ((reload && del_all) || (!reload && globals.delete_all_members_on_startup)) {
4488 		sql = switch_mprintf("delete from fifo_outbound where hostname='%q'", globals.hostname);
4489 	} else {
4490 		sql = switch_mprintf("delete from fifo_outbound where static=1 and hostname='%q'", globals.hostname);
4491 	}
4492 
4493 	fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
4494 
4495 	if (!switch_core_hash_find(globals.fifo_hash, MANUAL_QUEUE_NAME)) {
4496 		node = create_node(MANUAL_QUEUE_NAME, 0, globals.sql_mutex);
4497 		node->ready = 2;
4498 		node->is_static = 0;
4499 	}
4500 
4501 	if ((fifos = switch_xml_child(cfg, "fifos"))) {
4502 		for (fifo = switch_xml_child(fifos, "fifo"); fifo; fifo = fifo->next) {
4503 			const char *name, *sp;
4504 			const char *val;
4505 			int i, importance = 0;
4506 
4507 			if (!(name = switch_xml_attr(fifo, "name"))) {
4508 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "fifo has no name!\n");
4509 				continue;
4510 			}
4511 
4512 			if (!strcasecmp(name, MANUAL_QUEUE_NAME)) {
4513 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%s is a reserved name, use another name please.\n", MANUAL_QUEUE_NAME);
4514 				continue;
4515 			}
4516 
4517 			if ((val = switch_xml_attr(fifo, "importance")) && !((i = atoi(val)) < 0)) {
4518 				importance = i;
4519 			}
4520 
4521 			switch_mutex_lock(globals.mutex);
4522 			if (!(node = switch_core_hash_find(globals.fifo_hash, name))) {
4523 				node = create_node(name, importance, globals.sql_mutex);
4524 			}
4525 
4526 			if ((val = switch_xml_attr(fifo, "outbound_name"))) {
4527 				node->outbound_name = switch_core_strdup(node->pool, val);
4528 			}
4529 
4530 			switch_mutex_unlock(globals.mutex);
4531 			switch_assert(node);
4532 			switch_mutex_lock(node->mutex);
4533 
4534 			if ((sp = switch_xml_attr(fifo, "outbound_strategy"))) {
4535 				node->outbound_strategy = parse_strategy(sp);
4536 				node->has_outbound = 1;
4537 			}
4538 
4539 			node->outbound_per_cycle = 1;
4540 			if ((val = switch_xml_attr(fifo, "outbound_per_cycle"))) {
4541 				if (!((i = atoi(val)) < 0)) {
4542 					node->outbound_per_cycle = i;
4543 				}
4544 				node->has_outbound = 1;
4545 			}
4546 
4547 			node->outbound_per_cycle_min = 1;
4548 			if ((val = switch_xml_attr(fifo, "outbound_per_cycle_min"))) {
4549 				if (!((i = atoi(val)) < 0)) {
4550 					node->outbound_per_cycle_min = i;
4551 				}
4552 			}
4553 
4554 			if ((val = switch_xml_attr(fifo, "retry_delay"))) {
4555 				if ((i = atoi(val)) < 0) i = 0;
4556 				node->retry_delay = i;
4557 			}
4558 
4559 			node->outbound_priority = 5;
4560 			if ((val = switch_xml_attr(fifo, "outbound_priority"))) {
4561 				i = atoi(val);
4562 				if (!(i < 1 || i > 10)) {
4563 					node->outbound_priority = i;
4564 				}
4565 				node->has_outbound = 1;
4566 			}
4567 
4568 			node->ring_timeout = 60;
4569 			if ((val = switch_xml_attr(fifo, "outbound_ring_timeout"))) {
4570 				if ((i = atoi(val)) > 10) {
4571 					node->ring_timeout = i;
4572 				} else {
4573 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Invalid ring_timeout: must be > 10 for queue %s\n", node->name);
4574 				}
4575 			}
4576 
4577 			node->default_lag = 30;
4578 			if ((val = switch_xml_attr(fifo, "outbound_default_lag"))) {
4579 				if ((i = atoi(val)) > 10) {
4580 					node->default_lag = i;
4581 				} else {
4582 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Invalid default_lag: must be > 10 for queue %s\n", node->name);
4583 				}
4584 			}
4585 
4586 			for (member = switch_xml_child(fifo, "member"); member; member = member->next) {
4587 				const char *simo, *taking_calls, *timeout, *lag;
4588 				int simo_i = 1, taking_calls_i = 1, timeout_i = 60, lag_i = 10;
4589 				char digest[SWITCH_MD5_DIGEST_STRING_SIZE] = { 0 };
4590 
4591 				if (switch_stristr("fifo_outbound_uuid=", member->txt)) {
4592 					extract_fifo_outbound_uuid(member->txt, digest, sizeof(digest));
4593 				} else {
4594 					switch_md5_string(digest, (void *) member->txt, strlen(member->txt));
4595 				}
4596 
4597 				if ((simo = switch_xml_attr_soft(member, "simo"))) {
4598 					simo_i = atoi(simo);
4599 				}
4600 
4601 				if ((taking_calls = switch_xml_attr_soft(member, "taking_calls"))
4602 					&& (taking_calls_i = atoi(taking_calls)) < 1) {
4603 					taking_calls_i = 1;
4604 				}
4605 
4606 				if ((timeout = switch_xml_attr_soft(member, "timeout"))
4607 					&& (timeout_i = atoi(timeout)) < 10) {
4608 					timeout_i = node->ring_timeout;
4609 				}
4610 
4611 				if ((lag = switch_xml_attr_soft(member, "lag"))
4612 					&& (lag_i = atoi(lag)) < 0) {
4613 					lag_i = node->default_lag;
4614 				}
4615 
4616 				sql = switch_mprintf("insert into fifo_outbound "
4617 									 "(uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, "
4618 									 "next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname, taking_calls, "
4619 									 "active_time, inactive_time) "
4620 									 "values ('%q','%q','%q',%d,%d,%d,%d,0,0,1,0,0,'%q',%d,%ld,0)",
4621 									 digest, node->name, member->txt, simo_i, 0, timeout_i, lag_i, globals.hostname, taking_calls_i,
4622 									 (long) switch_epoch_time_now(NULL));
4623 				switch_assert(sql);
4624 				fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE);
4625 				node->has_outbound = 1;
4626 				node->member_count++;
4627 			}
4628 			node->ready = 1;
4629 			node->is_static = 1;
4630 			switch_mutex_unlock(node->mutex);
4631 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s configured\n", node->name);
4632 		}
4633 	}
4634 
4635   done:
4636 
4637 	switch_xml_free(xml);
4638 	if (reload) {
4639 		fifo_node_t *node;
4640 		switch_mutex_lock(globals.mutex);
4641 		for (node = globals.nodes; node; node = node->next) {
4642 			if (node->ready == -1) {
4643 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s queued for removal\n", node->name);
4644 				switch_core_hash_delete(globals.fifo_hash, node->name);
4645 				node->ready = 0;
4646 			}
4647 		}
4648 		switch_mutex_unlock(globals.mutex);
4649 	}
4650 
4651 	return status;
4652 }
4653 
fifo_member_add(char * fifo_name,char * originate_string,int simo_count,int timeout,int lag,time_t expires,int taking_calls)4654 static void fifo_member_add(char *fifo_name, char *originate_string, int simo_count, int timeout, int lag, time_t expires, int taking_calls)
4655 {
4656 	char digest[SWITCH_MD5_DIGEST_STRING_SIZE] = { 0 };
4657 	char *sql, *name_dup, *p;
4658 	char outbound_count[80] = "";
4659 	callback_t cbt = { 0 };
4660 	fifo_node_t *node = NULL;
4661 
4662 	if (!fifo_name) return;
4663 
4664 	if (switch_stristr("fifo_outbound_uuid=", originate_string)) {
4665 		extract_fifo_outbound_uuid(originate_string, digest, sizeof(digest));
4666 	} else {
4667 		switch_md5_string(digest, (void *) originate_string, strlen(originate_string));
4668 	}
4669 
4670 	sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q'", fifo_name, digest);
4671 	switch_assert(sql);
4672 	fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
4673 
4674 	switch_mutex_lock(globals.mutex);
4675 	if (!(node = switch_core_hash_find(globals.fifo_hash, fifo_name))) {
4676 		node = create_node(fifo_name, 0, globals.sql_mutex);
4677 		node->ready = 1;
4678 	}
4679 	switch_mutex_unlock(globals.mutex);
4680 
4681 	name_dup = strdup(fifo_name);
4682 	if ((p = strchr(name_dup, '@'))) {
4683 		*p = '\0';
4684 	}
4685 
4686 	sql = switch_mprintf("insert into fifo_outbound "
4687 						 "(uuid, fifo_name, originate_string, simo_count, use_count, timeout, "
4688 						 "lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname, taking_calls, active_time, inactive_time) "
4689 						 "values ('%q','%q','%q',%d,%d,%d,%d,%d,%ld,0,0,0,'%q',%d,%ld,0)",
4690 						 digest, fifo_name, originate_string, simo_count, 0, timeout, lag, 0, (long) expires, globals.hostname, taking_calls,
4691 						 (long)switch_epoch_time_now(NULL));
4692 	switch_assert(sql);
4693 	fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
4694 	free(name_dup);
4695 
4696 	cbt.buf = outbound_count;
4697 	cbt.len = sizeof(outbound_count);
4698 	sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", fifo_name);
4699 	fifo_execute_sql_callback(globals.sql_mutex, sql, sql2str_callback, &cbt);
4700 	node->member_count = atoi(outbound_count);
4701 	if (node->member_count > 0) {
4702 		node->has_outbound = 1;
4703 	} else {
4704 		node->has_outbound = 0;
4705 	}
4706 	switch_safe_free(sql);
4707 }
4708 
fifo_member_del(char * fifo_name,char * originate_string)4709 static void fifo_member_del(char *fifo_name, char *originate_string)
4710 {
4711 	char digest[SWITCH_MD5_DIGEST_STRING_SIZE] = { 0 };
4712 	char *sql;
4713 	char outbound_count[80] = "";
4714 	callback_t cbt = { 0 };
4715 	fifo_node_t *node = NULL;
4716 
4717 	if (!fifo_name) return;
4718 
4719 	if (switch_stristr("fifo_outbound_uuid=", originate_string)) {
4720 		extract_fifo_outbound_uuid(originate_string, digest, sizeof(digest));
4721 	} else {
4722 		switch_md5_string(digest, (void *) originate_string, strlen(originate_string));
4723 	}
4724 
4725 	sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q' and hostname='%q'", fifo_name, digest, globals.hostname);
4726 	switch_assert(sql);
4727 	fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE);
4728 
4729 	switch_mutex_lock(globals.mutex);
4730 	if (!(node = switch_core_hash_find(globals.fifo_hash, fifo_name))) {
4731 		node = create_node(fifo_name, 0, globals.sql_mutex);
4732 		node->ready = 1;
4733 	}
4734 	switch_mutex_unlock(globals.mutex);
4735 
4736 	cbt.buf = outbound_count;
4737 	cbt.len = sizeof(outbound_count);
4738 	sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", node->name);
4739 	fifo_execute_sql_callback(globals.sql_mutex, sql, sql2str_callback, &cbt);
4740 	node->member_count = atoi(outbound_count);
4741 	if (node->member_count > 0) {
4742 		node->has_outbound = 1;
4743 	} else {
4744 		node->has_outbound = 0;
4745 	}
4746 	switch_safe_free(sql);
4747 }
4748 
4749 #define FIFO_MEMBER_API_SYNTAX "[add <fifo_name> <originate_string> [<simo_count>] [<timeout>] [<lag>] [<expires>] [<taking_calls>] | del <fifo_name> <originate_string>]"
SWITCH_STANDARD_API(fifo_member_api_function)4750 SWITCH_STANDARD_API(fifo_member_api_function)
4751 {
4752 	char *fifo_name;
4753 	char *originate_string;
4754 	int simo_count = 1;
4755 	int timeout = 60;
4756 	int lag = 5;
4757 	int taking_calls = 1;
4758 	char *action;
4759 	char *mydata = NULL, *argv[8] = { 0 };
4760 	int argc;
4761 	time_t expires = 0;
4762 
4763 	if (!globals.running) {
4764 		return SWITCH_STATUS_FALSE;
4765 	}
4766 
4767 	if (zstr(cmd)) {
4768 		stream->write_function(stream, "-USAGE: %s\n", FIFO_MEMBER_API_SYNTAX);
4769 		return SWITCH_STATUS_SUCCESS;
4770 	}
4771 
4772 	mydata = strdup(cmd);
4773 	switch_assert(mydata);
4774 
4775 	argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0])));
4776 
4777 	if (argc < 3) {
4778 		stream->write_function(stream, "%s", "-ERR Invalid!\n");
4779 		goto done;
4780 	}
4781 
4782 	action = argv[0];
4783 	fifo_name = argv[1];
4784 	originate_string = argv[2];
4785 
4786 	if (action && !strcasecmp(action, "add")) {
4787 		if (argc > 3) {
4788 			simo_count = atoi(argv[3]);
4789 		}
4790 		if (argc > 4) {
4791 			timeout = atoi(argv[4]);
4792 		}
4793 		if (argc > 5) {
4794 			lag = atoi(argv[5]);
4795 		}
4796 		if (argc > 6) {
4797 			expires = switch_epoch_time_now(NULL) + atoi(argv[6]);
4798 		}
4799 		if (argc > 7) {
4800 			taking_calls = atoi(argv[7]);
4801 		}
4802 		if (simo_count < 0) {
4803 			simo_count = 1;
4804 		}
4805 		if (timeout < 0) {
4806 			timeout = 60;
4807 		}
4808 		if (lag < 0) {
4809 			lag = 5;
4810 		}
4811 		if (taking_calls < 1) {
4812 			taking_calls = 1;
4813 		}
4814 
4815 		fifo_member_add(fifo_name, originate_string, simo_count, timeout, lag, expires, taking_calls);
4816 		stream->write_function(stream, "%s", "+OK\n");
4817 	} else if (action && !strcasecmp(action, "del")) {
4818 		fifo_member_del(fifo_name, originate_string);
4819 		stream->write_function(stream, "%s", "+OK\n");
4820 	} else {
4821 		stream->write_function(stream, "%s", "-ERR Invalid!\n");
4822 		goto done;
4823 	}
4824 
4825   done:
4826 
4827 	free(mydata);
4828 
4829 	return SWITCH_STATUS_SUCCESS;
4830 }
4831 
SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load)4832 SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load)
4833 {
4834 	switch_application_interface_t *app_interface;
4835 	switch_api_interface_t *commands_api_interface;
4836 	switch_status_t status;
4837 
4838 	/* create/register custom event message type */
4839 	if (switch_event_reserve_subclass(FIFO_EVENT) != SWITCH_STATUS_SUCCESS) {
4840 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't register subclass %s!", FIFO_EVENT);
4841 		return SWITCH_STATUS_TERM;
4842 	}
4843 
4844 	/* Subscribe to presence request events */
4845 	if (switch_event_bind_removable(modname, SWITCH_EVENT_PRESENCE_PROBE, SWITCH_EVENT_SUBCLASS_ANY,
4846 									pres_event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
4847 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't subscribe to presence request events!\n");
4848 		return SWITCH_STATUS_GENERR;
4849 	}
4850 
4851 	globals.pool = pool;
4852 	switch_core_hash_init(&globals.fifo_hash);
4853 
4854 	switch_core_hash_init(&globals.caller_orig_hash);
4855 	switch_core_hash_init(&globals.consumer_orig_hash);
4856 	switch_core_hash_init(&globals.bridge_hash);
4857 	switch_core_hash_init(&globals.use_hash);
4858 	switch_mutex_init(&globals.caller_orig_mutex, SWITCH_MUTEX_NESTED, globals.pool);
4859 	switch_mutex_init(&globals.consumer_orig_mutex, SWITCH_MUTEX_NESTED, globals.pool);
4860 	switch_mutex_init(&globals.bridge_mutex, SWITCH_MUTEX_NESTED, globals.pool);
4861 
4862 	switch_mutex_init(&globals.mutex, SWITCH_MUTEX_NESTED, globals.pool);
4863 	switch_mutex_init(&globals.use_mutex, SWITCH_MUTEX_NESTED, globals.pool);
4864 	switch_mutex_init(&globals.sql_mutex, SWITCH_MUTEX_NESTED, globals.pool);
4865 
4866 	globals.running = 1;
4867 
4868 	if ((status = load_config(0, 1)) != SWITCH_STATUS_SUCCESS) {
4869 		switch_event_unbind(&globals.node);
4870 		switch_event_free_subclass(FIFO_EVENT);
4871 		switch_core_hash_destroy(&globals.fifo_hash);
4872 		return status;
4873 	}
4874 
4875 	/* connect my internal structure to the blank pointer passed to me */
4876 	*module_interface = switch_loadable_module_create_module_interface(pool, modname);
4877 	SWITCH_ADD_APP(app_interface, "fifo", "Park with FIFO", FIFO_DESC, fifo_function, FIFO_USAGE, SAF_NONE);
4878 	SWITCH_ADD_APP(app_interface, "fifo_track_call", "Count a call as a fifo call in the manual_calls queue",
4879 				   "", fifo_track_call_function, "<fifo_outbound_uuid>", SAF_SUPPORT_NOMEDIA);
4880 	SWITCH_ADD_API(commands_api_interface, "fifo", "Return data about a fifo", fifo_api_function, FIFO_API_SYNTAX);
4881 	SWITCH_ADD_API(commands_api_interface, "fifo_member", "Add members to a fifo", fifo_member_api_function, FIFO_MEMBER_API_SYNTAX);
4882 	SWITCH_ADD_API(commands_api_interface, "fifo_add_outbound", "Add outbound members to a fifo", fifo_add_outbound_function, "<node> <url> [<priority>]");
4883 	SWITCH_ADD_API(commands_api_interface, "fifo_check_bridge", "check if uuid is in a bridge", fifo_check_bridge_function, "<uuid>|<outbound_id>");
4884 	switch_console_set_complete("add fifo list");
4885 	switch_console_set_complete("add fifo list_verbose");
4886 	switch_console_set_complete("add fifo count");
4887 	switch_console_set_complete("add fifo debug");
4888 	switch_console_set_complete("add fifo status");
4889 	switch_console_set_complete("add fifo has_outbound");
4890 	switch_console_set_complete("add fifo importance");
4891 	switch_console_set_complete("add fifo reparse");
4892 	switch_console_set_complete("add fifo_check_bridge ::console::list_uuid");
4893 
4894 	start_node_thread(globals.pool);
4895 
4896 	return SWITCH_STATUS_SUCCESS;
4897 }
4898 
4899 /*
4900   Called when the system shuts down
4901 */
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)4902 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
4903 {
4904 	switch_event_t *pop = NULL;
4905 	fifo_node_t *node, *this_node;
4906 	switch_mutex_t *mutex = globals.mutex;
4907 
4908 	switch_sql_queue_manager_destroy(&globals.qm);
4909 
4910 	switch_event_unbind(&globals.node);
4911 	switch_event_free_subclass(FIFO_EVENT);
4912 
4913 	switch_mutex_lock(mutex);
4914 
4915 	globals.running = 0;
4916 	/* Cleanup */
4917 
4918 	stop_node_thread();
4919 
4920 	while(globals.threads) {
4921 		switch_cond_next();
4922 	}
4923 
4924 	node = globals.nodes;
4925 
4926 	while(node) {
4927 		int x = 0;
4928 
4929 		this_node = node;
4930 		node = node->next;
4931 
4932 		switch_mutex_lock(this_node->update_mutex);
4933 		switch_mutex_lock(this_node->mutex);
4934 		for (x = 0; x < MAX_PRI; x++) {
4935 			while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
4936 				switch_event_destroy(&pop);
4937 			}
4938 		}
4939 		switch_mutex_unlock(this_node->mutex);
4940 		switch_core_hash_delete(globals.fifo_hash, this_node->name);
4941 		switch_core_hash_destroy(&this_node->consumer_hash);
4942 		switch_mutex_unlock(this_node->update_mutex);
4943 		switch_core_destroy_memory_pool(&this_node->pool);
4944 	}
4945 
4946 	switch_core_hash_destroy(&globals.fifo_hash);
4947 	switch_core_hash_destroy(&globals.caller_orig_hash);
4948 	switch_core_hash_destroy(&globals.consumer_orig_hash);
4949 	switch_core_hash_destroy(&globals.bridge_hash);
4950 	switch_core_hash_destroy(&globals.use_hash);
4951 	memset(&globals, 0, sizeof(globals));
4952 	switch_mutex_unlock(mutex);
4953 
4954 	return SWITCH_STATUS_SUCCESS;
4955 }
4956 
4957 /* For Emacs:
4958  * Local Variables:
4959  * mode:c
4960  * indent-tabs-mode:t
4961  * tab-width:4
4962  * c-basic-offset:4
4963  * End:
4964  * For VIM:
4965  * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
4966  */
4967