1 /*
2  * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3  * Copyright (C) 2005-2014, Anthony Minessale II <anthm@freeswitch.org>
4  *
5  * Version: MPL 1.1
6  *
7  * The contents of this file are subject to the Mozilla Public License Version
8  * 1.1 (the "License"); you may not use this file except in compliance with
9  * the License. You may obtain a copy of the License at
10  * http://www.mozilla.org/MPL/
11  *
12  * Software distributed under the License is distributed on an "AS IS" basis,
13  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14  * for the specific language governing rights and limitations under the
15  * License.
16  *
17  * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18  *
19  * The Initial Developer of the Original Code is
20  * Anthony Minessale II <anthm@freeswitch.org>
21  * Portions created by the Initial Developer are Copyright (C)
22  * the Initial Developer. All Rights Reserved.
23  *
24  * Contributor(s):
25  *
26  * Anthony Minessale II <anthm@freeswitch.org>
27  * Dragos Oancea <dragos@freeswitch.org>
28  *
29  * switch_jitterbuffer.c -- Audio/Video Jitter Buffer
30  *
31  */
32 #include <switch.h>
33 #include <switch_jitterbuffer.h>
34 #include "private/switch_hashtable_private.h"
35 
36 #define NACK_TIME 80000
37 #define RENACK_TIME 100000
38 #define MAX_FRAME_PADDING 2
39 #define MAX_MISSING_SEQ 20
/*
 * Emit one jitter-buffer debug line when the buffer's debug_level is at least _level.
 *
 * Each line is prefixed with the buffer address, its media type, the node counters,
 * the frame length settings and the hit/miss statistics, followed by the caller's
 * printf-style _format and arguments (at least one argument must follow _format).
 *
 * All state is read through the _jb argument, and the body is wrapped in
 * do { } while (0) so the macro expands to exactly one statement and cannot
 * capture a following "else".
 */
#define jb_debug(_jb, _level, _format, ...) \
	do { \
		if ((_jb)->debug_level >= (_level)) { \
			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG_CLEAN((_jb)->session), SWITCH_LOG_ALERT, \
							  "JB:%p:%s:%d/%d lv:%d ln:%.4d sz:%.3u/%.3u/%.3u/%.3u c:%.3u %.3u/%.3u/%.3u/%.3u %.2f%% ->" _format, \
							  (void *) (_jb), \
							  ((_jb)->type == SJB_TEXT ? "txt" : ((_jb)->type == SJB_AUDIO ? "aud" : "vid")), \
							  (_jb)->allocated_nodes, (_jb)->visible_nodes, (_level), __LINE__, \
							  (_jb)->min_frame_len, (_jb)->max_frame_len, (_jb)->frame_len, (_jb)->complete_frames, \
							  (_jb)->period_count, (_jb)->consec_good_count, (_jb)->period_good_count, \
							  (_jb)->consec_miss_count, (_jb)->period_miss_count, (_jb)->period_miss_pct, \
							  __VA_ARGS__); \
		} \
	} while (0)
41 
42 //const char *TOKEN_1 = "ONE";
43 //const char *TOKEN_2 = "TWO";
44 
45 struct switch_jb_s;
46 
/* One slot of the jitter buffer: a stored RTP packet plus list linkage. */
typedef struct switch_jb_node_s {
	/* jitter buffer this node belongs to (assigned in new_node) */
	struct switch_jb_s *parent;
	/* copy of the packet handed to add_node */
	switch_rtp_packet_t packet;
	/* length passed to add_node; switch_jb_peek_frame treats len - 12 as the payload size */
	uint32_t len;
	/* 1 while the node holds a live packet, 0 when it is free for reuse */
	uint8_t visible;
	/* cleared whenever the node is handed out or hidden */
	uint8_t bad_hits;
	/* doubly linked list linkage within the parent's node_list */
	struct switch_jb_node_s *prev;
	struct switch_jb_node_s *next;
	/* used for counting the number of partial or complete frames currently in the JB */
	switch_bool_t complete_frame_mark;
} switch_jb_node_t;
58 
/*
 * Jitter buffer state.
 *
 * Sequence numbers and timestamps copied from RTP headers are kept in network
 * byte order unless a field comment says otherwise; the code converts with
 * ntohs/ntohl only when it needs to compare or print them.
 */
struct switch_jb_s {
	/* head of the doubly linked list of nodes (free and in-use) */
	struct switch_jb_node_s *node_list;
	/* previous value of target_seq */
	uint32_t last_target_seq;
	uint32_t highest_read_ts;
	/* host-order timestamp of the most recently dropped frame */
	uint32_t highest_dropped_ts;
	uint32_t highest_read_seq;
	/* header timestamp / sequence of the newest packet written so far */
	uint32_t highest_wrote_ts;
	uint16_t highest_wrote_seq;
	/* sequence number the reader wants next; 0 means "no target yet" */
	uint16_t target_seq;
	/* timestamp the reader wants next (timestamp mode); 0 means "no target yet" */
	uint32_t target_ts;
	uint32_t last_target_ts;
	/* host-order counter advanced on every timestamp-mode read; stamped into returned packets */
	uint16_t psuedo_seq;
	uint16_t last_psuedo_seq;
	/* number of nodes currently holding a packet / number of nodes ever allocated */
	uint32_t visible_nodes;
	uint32_t allocated_nodes;
	/* frames currently buffered; compared against frame_len by switch_jb_poll */
	uint32_t complete_frames;
	/* current target depth and its configured bounds, plus the largest depth reached */
	uint32_t frame_len;
	uint32_t min_frame_len;
	uint32_t max_frame_len;
	uint32_t highest_frame_len;
	/* hit/miss statistics maintained by jb_hit and jb_miss */
	uint32_t period_miss_count;
	uint32_t consec_miss_count;
	uint32_t period_miss_inc;
	double period_miss_pct;
	uint32_t period_good_count;
	uint32_t consec_good_count;
	uint32_t period_count;
	/* non-zero after a video frame was dropped; triggers a resync on the next read */
	uint32_t dropped;
	/* non-zero samples_per_frame selects timestamp-based reads (see jb_next_packet) */
	uint32_t samples_per_frame;
	uint32_t samples_per_second;
	uint32_t bitrate_control;
	/* taken from the jb_video_low_bitrate channel variable when within 128..10240 */
	uint32_t video_low_bitrate;
	/* set once the first packet has been stored */
	uint8_t write_init;
	uint8_t read_init;
	/* verbosity threshold used by jb_debug */
	uint8_t debug_level;
	uint16_t next_seq;
	switch_size_t last_len;
	/* created for video buffers only */
	switch_inthash_t *missing_seq_hash;
	/* nodes indexed by header sequence number */
	switch_inthash_t *node_hash;
	/* nodes indexed by header timestamp; created by switch_jb_ts_mode */
	switch_inthash_t *node_hash_ts;
	/* both mutexes are created as nested mutexes */
	switch_mutex_t *mutex;
	switch_mutex_t *list_mutex;
	switch_memory_pool_t *pool;
	/* 1 when switch_jb_create had to create the pool itself */
	int free_pool;
	int drop_flag;
	switch_jb_flag_t flags;
	switch_jb_type_t type;
	switch_core_session_t *session;
	switch_channel_t *channel;
	/* countdown and flush request used by switch_jb_poll for text buffers */
	uint32_t buffer_lag;
	uint32_t flush;
	/* packets seen for the video frame being written, and the largest such count */
	uint32_t packet_count;
	uint32_t max_packet_len;
	/* 2500 for video buffers, 250 otherwise */
	uint32_t period_len;
	/* NACK counters; their sum is reported by switch_jb_get_nack_success */
	uint32_t nack_saved_the_day;
	uint32_t nack_didnt_save_the_day;
};
116 
117 
node_cmp(const void * l,const void * r)118 static int node_cmp(const void *l, const void *r)
119 {
120 	switch_jb_node_t *a = (switch_jb_node_t *) l;
121 	switch_jb_node_t *b = (switch_jb_node_t *) r;
122 
123 	if (!a->visible) return 0;
124 	if (!b->visible) return 1;
125 
126 	return ntohs(a->packet.header.seq) - ntohs(b->packet.header.seq);
127 }
128 
129 //http://www.chiark.greenend.org.uk/~sgtatham/algorithms/listsort.c
/*
 * Bottom-up merge sort of a doubly linked node list.
 *
 * list: head of the list to sort (may be NULL).
 * cmp:  ordering callback; a result <= 0 keeps the left element first, so
 *       elements that compare equal keep their relative order.
 *
 * Returns the new head. Both next and prev links are rebuilt on every pass.
 */
switch_jb_node_t *sort_nodes(switch_jb_node_t *list, int (*cmp)(const void *, const void *)) {
	switch_jb_node_t *left, *right, *pick, *tail;
	int run_len, merges, left_n, right_n;

	if (!list) {
		return NULL;
	}

	/* each pass merges adjacent runs of run_len nodes; runs double every pass */
	for (run_len = 1; ; run_len *= 2) {
		left = list;
		list = NULL;
		tail = NULL;
		merges = 0;

		while (left) {
			merges++;

			/* walk up to run_len nodes ahead to find where the right run starts */
			right = left;
			left_n = 0;
			while (left_n < run_len && right) {
				left_n++;
				right = right->next;
			}

			right_n = run_len;

			/* merge the two runs onto the tail of the output list */
			while (left_n > 0 || (right_n > 0 && right)) {
				if (left_n > 0 && (right_n == 0 || !right || cmp(left, right) <= 0)) {
					/* right run exhausted, or left element sorts first (or ties) */
					pick = left;
					left = left->next;
					left_n--;
				} else {
					/* left run exhausted, or right element sorts first */
					pick = right;
					right = right->next;
					right_n--;
				}

				if (tail) {
					tail->next = pick;
				} else {
					list = pick;
				}

				/* keep the back links consistent */
				pick->prev = tail;
				tail = pick;
			}

			/* both cursors have moved past the merged runs */
			left = right;
		}

		tail->next = NULL;

		/* a pass with at most one merge means the list is fully sorted */
		if (merges <= 1) {
			return list;
		}
	}
}
207 
208 // static inline void thin_frames(switch_jb_t *jb, int freq, int max);
209 
210 
new_node(switch_jb_t * jb)211 static inline switch_jb_node_t *new_node(switch_jb_t *jb)
212 {
213 	switch_jb_node_t *np;
214 
215 	switch_mutex_lock(jb->list_mutex);
216 
217 	for (np = jb->node_list; np; np = np->next) {
218 		if (!np->visible) {
219 			break;
220 		}
221 	}
222 
223 	if (!np) {
224 		int mult = 2;
225 
226 		if (jb->type != SJB_VIDEO) {
227 			mult = 2;
228 		} else {
229 			if (jb->max_packet_len > mult) {
230 				mult = jb->max_packet_len;
231 			}
232 		}
233 
234 		if (jb->allocated_nodes > jb->max_frame_len * mult) {
235 			jb_debug(jb, 2, "ALLOCATED FRAMES TOO HIGH! %d\n", jb->allocated_nodes);
236 			switch_jb_reset(jb);
237 			switch_mutex_unlock(jb->list_mutex);
238 			return NULL;
239 		}
240 
241 		np = switch_core_alloc(jb->pool, sizeof(*np));
242 		jb->allocated_nodes++;
243 		np->next = jb->node_list;
244 		if (np->next) {
245 			np->next->prev = np;
246 		}
247 		jb->node_list = np;
248 
249 	}
250 
251 	switch_assert(np);
252 	np->bad_hits = 0;
253 	np->visible = 1;
254 	jb->visible_nodes++;
255 	np->parent = jb;
256 
257 	switch_mutex_unlock(jb->list_mutex);
258 
259 	return np;
260 }
261 
/*
 * Move node to the head of jb->node_list.
 *
 * The node is first unlinked from its current position and then re-linked in
 * front of whatever the head is at that point. No locking is done here.
 */
static inline void push_to_top(switch_jb_t *jb, switch_jb_node_t *node)
{
	switch_jb_node_t *before = node->prev;
	switch_jb_node_t *after = node->next;

	/* unlink */
	if (jb->node_list == node) {
		jb->node_list = after;
	} else if (before) {
		before->next = after;
	}

	if (after) {
		after->prev = before;
	}

	/* relink at the head */
	node->prev = NULL;
	node->next = jb->node_list;

	if (node->next) {
		node->next->prev = node;
	}

	jb->node_list = node;

	switch_assert(node->next != node);
	switch_assert(node->prev != node);
}
286 
/*
 * Release a node back to the free state and remove it from the lookup tables.
 *
 * node: node to hide.
 * pop:  when true (and the node was visible) the node is also moved to the
 *       head of the list.
 *
 * The hash entries are removed even if the node was already hidden. For video
 * buffers, a node that was removed from node_hash and carried the
 * complete-frame mark also decrements complete_frames.
 */
static inline void hide_node(switch_jb_node_t *node, switch_bool_t pop)
{
	switch_jb_t *owner = node->parent;

	switch_mutex_lock(owner->list_mutex);

	if (node->visible) {
		node->visible = 0;
		node->bad_hits = 0;
		owner->visible_nodes--;

		if (pop) {
			push_to_top(owner, node);
		}
	}

	if (owner->node_hash_ts) {
		switch_core_inthash_delete(owner->node_hash_ts, node->packet.header.ts);
	}

	if (switch_core_inthash_delete(owner->node_hash, node->packet.header.seq) &&
		node->complete_frame_mark && owner->type == SJB_VIDEO) {
		owner->complete_frames--;
		node->complete_frame_mark = FALSE;
	}

	switch_mutex_unlock(owner->list_mutex);
}
316 
/* Re-sort the whole node list with node_cmp while holding the list mutex. */
static inline void sort_free_nodes(switch_jb_t *jb)
{
	switch_jb_node_t *sorted;

	switch_mutex_lock(jb->list_mutex);

	sorted = sort_nodes(jb->node_list, node_cmp);
	jb->node_list = sorted;

	switch_mutex_unlock(jb->list_mutex);
}
323 
/* Hide every node on the list, leaving each one where it is. */
static inline void hide_nodes(switch_jb_t *jb)
{
	switch_jb_node_t *cur;

	switch_mutex_lock(jb->list_mutex);

	cur = jb->node_list;
	while (cur) {
		hide_node(cur, SWITCH_FALSE);
		cur = cur->next;
	}

	switch_mutex_unlock(jb->list_mutex);
}
334 
/*
 * Hide every visible node whose header timestamp equals ts (compared as
 * stored, without byte-order conversion) and re-sort the list if anything
 * was hidden.
 */
static inline void drop_ts(switch_jb_t *jb, uint32_t ts)
{
	switch_jb_node_t *cur;
	int hidden = 0;

	switch_mutex_lock(jb->list_mutex);

	for (cur = jb->node_list; cur; cur = cur->next) {
		if (cur->visible && cur->packet.header.ts == ts) {
			hide_node(cur, SWITCH_FALSE);
			hidden++;
		}
	}

	if (hidden) {
		sort_free_nodes(jb);
	}

	switch_mutex_unlock(jb->list_mutex);
}
356 
jb_find_lowest_seq(switch_jb_t * jb,uint32_t ts)357 static inline switch_jb_node_t *jb_find_lowest_seq(switch_jb_t *jb, uint32_t ts)
358 {
359 	switch_jb_node_t *np, *lowest = NULL;
360 
361 	switch_mutex_lock(jb->list_mutex);
362 	for (np = jb->node_list; np; np = np->next) {
363 		if (!np->visible) continue;
364 
365 		if (ts && ts != np->packet.header.ts) continue;
366 
367 		if (!lowest || ntohs(lowest->packet.header.seq) > ntohs(np->packet.header.seq)) {
368 			lowest = np;
369 		}
370 	}
371 	switch_mutex_unlock(jb->list_mutex);
372 
373 	return lowest;
374 }
375 
jb_find_lowest_node(switch_jb_t * jb)376 static inline switch_jb_node_t *jb_find_lowest_node(switch_jb_t *jb)
377 {
378 	switch_jb_node_t *np, *lowest = NULL;
379 
380 	switch_mutex_lock(jb->list_mutex);
381 	for (np = jb->node_list; np; np = np->next) {
382 		if (!np->visible) continue;
383 
384 		if (!lowest || ntohl(lowest->packet.header.ts) > ntohl(np->packet.header.ts)) {
385 			lowest = np;
386 		}
387 	}
388 	switch_mutex_unlock(jb->list_mutex);
389 
390 	return lowest ? lowest : NULL;
391 }
392 
/*
 * Return the header timestamp (as stored, not byte-swapped) of the node found
 * by jb_find_lowest_node(), or 0 when there is no visible node.
 */
static inline uint32_t jb_find_lowest_ts(switch_jb_t *jb)
{
	switch_jb_node_t *oldest = jb_find_lowest_node(jb);

	if (!oldest) {
		return 0;
	}

	return oldest->packet.header.ts;
}
399 
#if 0
/* Everything up to the matching #endif is compiled out. */

/*
 * Drop every freq-th visible frame (starting with the first one), stopping
 * once more than max frames were dropped, then re-sort the list.
 */
static inline void thin_frames(switch_jb_t *jb, int freq, int max)
{
	switch_jb_node_t *cur, *following;
	int visible_idx = 0;
	int dropped = 0;

	switch_mutex_lock(jb->list_mutex);

	for (cur = jb->node_list; cur && dropped <= max; cur = following) {
		/* remember the successor first: drop_ts() re-sorts the list */
		following = cur->next;

		if (!cur->visible) {
			continue;
		}

		if ((visible_idx % freq) == 0) {
			drop_ts(jb, cur->packet.header.ts);
			dropped++;
		}

		visible_idx++;
	}

	sort_free_nodes(jb);
	switch_mutex_unlock(jb->list_mutex);
}

/* Find the visible node with the largest host-order timestamp, or NULL. */
static inline switch_jb_node_t *jb_find_highest_node(switch_jb_t *jb)
{
	switch_jb_node_t *cur, *newest = NULL;

	switch_mutex_lock(jb->list_mutex);

	cur = jb->node_list;
	while (cur) {
		if (cur->visible &&
			(!newest || ntohl(newest->packet.header.ts) < ntohl(cur->packet.header.ts))) {
			newest = cur;
		}
		cur = cur->next;
	}

	switch_mutex_unlock(jb->list_mutex);

	return newest;
}

/* Stored header timestamp of the newest visible node, or 0 when there is none. */
static inline uint32_t jb_find_highest_ts(switch_jb_t *jb)
{
	switch_jb_node_t *newest = jb_find_highest_node(jb);

	if (!newest) {
		return 0;
	}

	return newest->packet.header.ts;
}

/* Drop all packets that share the newest timestamp in the buffer. */
static inline void drop_newest_frame(switch_jb_t *jb)
{
	uint32_t newest_ts = jb_find_highest_ts(jb);

	drop_ts(jb, newest_ts);
	jb_debug(jb, 1, "Dropping highest frame ts:%u\n", ntohl(newest_ts));
}

/*
 * Return the node that held the highest timestamp just before the final
 * highest one was met during the list walk; falls back to the highest node
 * when no earlier candidate exists.
 */
static inline switch_jb_node_t *jb_find_penultimate_node(switch_jb_t *jb)
{
	switch_jb_node_t *cur, *newest = NULL, *runner_up = NULL;

	switch_mutex_lock(jb->list_mutex);

	for (cur = jb->node_list; cur; cur = cur->next) {
		if (!cur->visible) {
			continue;
		}

		if (!newest || ntohl(newest->packet.header.ts) < ntohl(cur->packet.header.ts)) {
			if (newest) {
				runner_up = newest;
			}
			newest = cur;
		}
	}

	switch_mutex_unlock(jb->list_mutex);

	if (runner_up) {
		return runner_up;
	}

	return newest;
}
#endif
484 
/* Record a successful read: bump the good counters and end any miss streak. */
static inline void jb_hit(switch_jb_t *jb)
{
	jb->consec_miss_count = 0;
	jb->consec_good_count++;
	jb->period_good_count++;
}
491 
/*
 * Adjust the target buffer depth (frame_len).
 *
 * jb:   jitter buffer to adjust.
 * i:    0 resets frame_len to min_frame_len; a positive value grows it, capped
 *       at max_frame_len; a negative value shrinks it, floored at min_frame_len.
 * line: caller's source line, used only in the debug output.
 *
 * The new value is computed in 64-bit signed arithmetic. frame_len is unsigned,
 * so adding a negative step larger than frame_len (or a positive step that
 * overflows 32 bits) would otherwise wrap around and store a bogus depth
 * instead of clamping.
 *
 * highest_frame_len is raised whenever frame_len exceeds it.
 */
static void jb_frame_inc_line(switch_jb_t *jb, int i, int line)
{
	uint32_t old_frame_len = jb->frame_len;

	if (i == 0) {
		jb->frame_len = jb->min_frame_len;
	} else {
		int64_t wanted = (int64_t) jb->frame_len + (int64_t) i;

		if (i > 0) {
			if (wanted < (int64_t) jb->max_frame_len) {
				jb->frame_len = (uint32_t) wanted;
			} else {
				jb->frame_len = jb->max_frame_len;
			}
		} else {
			if (wanted > (int64_t) jb->min_frame_len) {
				jb->frame_len = (uint32_t) wanted;
			} else {
				jb->frame_len = jb->min_frame_len;
			}
		}
	}

	if (jb->frame_len > jb->highest_frame_len) {
		jb->highest_frame_len = jb->frame_len;
	}

	if (old_frame_len != jb->frame_len) {
		jb_debug(jb, 1, "%d Change framelen from %u to %u\n", line, old_frame_len, jb->frame_len);

		//if (jb->session) {
		//	switch_core_session_request_video_refresh(jb->session);
		//}
	}

}

/* Convenience wrapper that records the calling line for the debug output. */
#define jb_frame_inc(_jb, _i) jb_frame_inc_line(_jb, _i, __LINE__)
535 
536 
/* Record a failed read: bump the miss counters and end any good streak. */
static inline void jb_miss(switch_jb_t *jb)
{
	jb->consec_good_count = 0;
	jb->consec_miss_count++;
	jb->period_miss_count++;
}
543 
#if 0
/*
 * Compiled out.
 *
 * Starting from the lowest-sequence packet of the oldest timestamp, walk the
 * sorted list and check that sequence numbers are contiguous. The first gap
 * found is recorded in missing_seq_hash and ends the walk. Returns 1 once a
 * packet of a different timestamp, or the last node of the list, is reached
 * without a gap; returns 0 when the buffer is empty.
 */
static inline int verify_oldest_frame(switch_jb_t *jb)
{
	switch_jb_node_t *first = NULL, *cur = NULL;
	int complete = 0;

	first = jb_find_lowest_node(jb);

	if (!first) {
		return complete;
	}

	first = jb_find_lowest_seq(jb, first->packet.header.ts);

	if (!first) {
		return complete;
	}

	switch_mutex_lock(jb->mutex);

	jb->node_list = sort_nodes(jb->node_list, node_cmp);

	for (cur = first->next; cur; cur = cur->next) {
		int expected_seq;

		if (!cur->visible) {
			continue;
		}

		expected_seq = ntohs(cur->prev->packet.header.seq) + 1;

		if (ntohs(cur->packet.header.seq) != expected_seq) {
			/* gap: remember the missing sequence number (network order key) */
			uint32_t key = (uint32_t)htons(expected_seq);

			if (!switch_core_inthash_find(jb->missing_seq_hash, key)) {
				switch_core_inthash_insert(jb->missing_seq_hash, key, (void *)(intptr_t)1);
			}
			break;
		}

		if (cur->packet.header.ts != first->packet.header.ts || !cur->next) {
			complete = 1;
		}
	}

	switch_mutex_unlock(jb->mutex);

	return complete;
}
#endif
585 
/* Drop all packets that share the oldest timestamp currently in the buffer. */
static inline void drop_oldest_frame(switch_jb_t *jb)
{
	uint32_t oldest_ts;

	oldest_ts = jb_find_lowest_ts(jb);
	drop_ts(jb, oldest_ts);

	jb_debug(jb, 1, "Dropping oldest frame ts:%u\n", ntohl(oldest_ts));
}
593 
594 
595 
#if 0
/* Compiled out. Drops the frame found by jb_find_penultimate_node(), if any. */
static inline void drop_second_newest_frame(switch_jb_t *jb)
{
	switch_jb_node_t *victim = jb_find_penultimate_node(jb);

	if (!victim) {
		return;
	}

	drop_ts(jb, victim->packet.header.ts);
	jb_debug(jb, 1, "Dropping second highest frame ts:%u\n", ntohl(victim->packet.header.ts));
}
#endif
607 
/*
 * Decide whether sequence number a is at or after sequence number b.
 *
 * Both arguments are in network byte order. Returns 1 when a >= b, or when b
 * sits in the upper half of the 16-bit range while a sits in the lower half
 * (treated as a wrap-around); returns 0 otherwise.
 */
static inline int check_seq(uint16_t a, uint16_t b)
{
	const uint16_t half = USHRT_MAX / 2;
	uint16_t cur = ntohs(a);
	uint16_t ref = ntohs(b);

	if (cur >= ref) {
		return 1;
	}

	/* cur < ref from here on: only a wrap-around still counts */
	return (ref > half && cur < half) ? 1 : 0;
}
619 
/*
 * Decide whether timestamp a is strictly after timestamp b.
 *
 * Both arguments are in network byte order. Returns 1 when a > b, or when b
 * sits in the upper half of the 32-bit range while a sits in the lower half
 * (treated as a wrap-around); returns 0 otherwise, including when a == b.
 */
static inline int check_ts(uint32_t a, uint32_t b)
{
	const uint32_t half = UINT_MAX / 2;
	uint32_t cur = ntohl(a);
	uint32_t ref = ntohl(b);

	if (cur > ref) {
		return 1;
	}

	if (cur == ref) {
		return 0;
	}

	/* cur < ref from here on: only a wrap-around still counts */
	return (ref > half && cur < half) ? 1 : 0;
}
631 
/*
 * Store a packet in the jitter buffer.
 *
 * jb:     jitter buffer to store into.
 * packet: packet to copy; its header fields are used as they arrive (network
 *         byte order) as hash keys and for the highest_wrote_* bookkeeping.
 * len:    packet length as supplied by the caller.
 *         NOTE(review): switch_jb_peek_frame() treats len - 12 as the payload
 *         size, so len presumably includes the 12-byte RTP header -- confirm.
 *
 * Silently returns when new_node() cannot provide a node. For video buffers a
 * large jump in sequence number or timestamp resets the whole buffer, and a
 * packet that advances both the sequence number and the timestamp is counted
 * as the start of a new frame. Audio and text buffers count every packet as
 * one frame.
 */
static inline void add_node(switch_jb_t *jb, switch_rtp_packet_t *packet, switch_size_t len)
{
	switch_jb_node_t *node = new_node(jb);

	if (!node) {
		return;
	}

	node->packet = *packet;
	node->len = len;
	/* NOTE(review): the struct assignment above already copied the body; this copy looks redundant -- confirm before removing */
	memcpy(node->packet.body, packet->body, len);

	switch_core_inthash_insert(jb->node_hash, node->packet.header.seq, node);

	if (jb->node_hash_ts) {
		switch_core_inthash_insert(jb->node_hash_ts, node->packet.header.ts, node);
	}

	jb_debug(jb, (packet->header.m ? 2 : 3), "PUT packet last_ts:%u ts:%u seq:%u%s\n",
			 ntohl(jb->highest_wrote_ts), ntohl(node->packet.header.ts), ntohs(node->packet.header.seq), packet->header.m ? " <MARK>" : "");

	if (jb->write_init && jb->type == SJB_VIDEO) {
		int seq_diff = 0, ts_diff = 0;

		/* distance from the newest packet written so far, allowing for a 16-bit wrap */
		if (ntohs(jb->highest_wrote_seq) > (USHRT_MAX - 100) && ntohs(packet->header.seq) < 100) {
			seq_diff = (USHRT_MAX - ntohs(jb->highest_wrote_seq)) + ntohs(packet->header.seq);
		} else {
			seq_diff = abs(((int)ntohs(packet->header.seq) - ntohs(jb->highest_wrote_seq)));
		}

		/*
		 * Same for the timestamp across a 32-bit wrap: measure from the highest
		 * timestamp written so far up to the wrap point, plus the new timestamp.
		 */
		if (ntohl(jb->highest_wrote_ts) > (UINT_MAX - 1000) && ntohl(node->packet.header.ts) < 1000) {
			ts_diff = (UINT_MAX - ntohl(jb->highest_wrote_ts)) + ntohl(node->packet.header.ts);
		} else {
			ts_diff = abs((int)((int64_t)ntohl(node->packet.header.ts) - (int64_t)ntohl(jb->highest_wrote_ts)));
		}

		/* a jump this large is treated as a stream change: start over */
		if (((seq_diff >= 100) || (ts_diff > (900000 * 5)))) {
			jb_debug(jb, 2, "CHANGE DETECTED, PUNT %u\n", abs(((int)ntohs(packet->header.seq) - ntohs(jb->highest_wrote_seq))));
			switch_jb_reset(jb);
		}
	}

	if (!jb->write_init || ntohs(packet->header.seq) > ntohs(jb->highest_wrote_seq) ||
		(ntohs(jb->highest_wrote_seq) > USHRT_MAX - 100 && ntohs(packet->header.seq) < 100) ) {
		jb->highest_wrote_seq = packet->header.seq;
	}

	if (jb->type == SJB_VIDEO) {
		jb->packet_count++;

		if (jb->write_init && check_seq(packet->header.seq, jb->highest_wrote_seq) && check_ts(node->packet.header.ts, jb->highest_wrote_ts)) {
			jb_debug(jb, 2, "WRITE frame ts: %u complete=%u/%u n:%u\n", ntohl(node->packet.header.ts), jb->complete_frames , jb->frame_len, jb->visible_nodes);
			jb->highest_wrote_ts = packet->header.ts;
			jb->complete_frames++;

			/* this packet belongs to the new frame; the rest made up the previous one */
			jb->packet_count--;
			if (jb->packet_count > jb->max_packet_len) {
				jb->max_packet_len = jb->packet_count;
			}
			jb->packet_count = 1;
			node->complete_frame_mark = TRUE;
		} else if (!jb->write_init) {
			jb->highest_wrote_ts = packet->header.ts;
		}
	} else {
		if (jb->write_init || jb->type == SJB_TEXT || jb->type == SJB_AUDIO) {
			jb_debug(jb, 2, "WRITE frame ts: %u complete=%u/%u n:%u\n", ntohl(node->packet.header.ts), jb->complete_frames , jb->frame_len, jb->visible_nodes);
			jb->complete_frames++;
		} else {
			jb->highest_wrote_ts = packet->header.ts;
		}
	}

	if (!jb->write_init) jb->write_init = 1;
}
708 
/*
 * Advance the read target by one frame in timestamp mode: target_ts moves
 * forward by samples_per_frame and the pseudo sequence counter by one. The
 * previous values are saved first. Does nothing while no target timestamp is
 * set.
 */
static inline void increment_ts(switch_jb_t *jb)
{
	uint32_t next_ts;

	if (!jb->target_ts) {
		return;
	}

	next_ts = ntohl(jb->target_ts) + jb->samples_per_frame;

	jb->last_target_ts = jb->target_ts;
	jb->last_psuedo_seq = jb->psuedo_seq;

	jb->target_ts = htonl(next_ts);
	jb->psuedo_seq++;
}
718 
/*
 * Record ts (network byte order) as the timestamp just read and aim the next
 * read at ts plus samples_per_frame. The pseudo sequence counter advances by
 * one. A ts of 0 is ignored.
 */
static inline void set_read_ts(switch_jb_t *jb, uint32_t ts)
{
	if (ts) {
		jb->last_psuedo_seq = jb->psuedo_seq;
		jb->psuedo_seq++;

		jb->last_target_ts = ts;
		jb->target_ts = htonl((ntohl(ts) + jb->samples_per_frame));
	}
}
728 
729 
/* Save the current target sequence number and move the target forward by one. */
static inline void increment_seq(switch_jb_t *jb)
{
	uint16_t current = jb->target_seq;

	jb->last_target_seq = current;
	jb->target_seq = htons((ntohs(current) + 1));
}
735 
/*
 * Record seq (network byte order) as the sequence number just read and aim
 * the next read at the following sequence number.
 */
static inline void set_read_seq(switch_jb_t *jb, uint16_t seq)
{
	jb->target_seq = htons((ntohs(seq) + 1));
	jb->last_target_seq = seq;
}
741 
jb_next_packet_by_seq(switch_jb_t * jb,switch_jb_node_t ** nodep)742 static inline switch_status_t jb_next_packet_by_seq(switch_jb_t *jb, switch_jb_node_t **nodep)
743 {
744 	switch_jb_node_t *node = NULL;
745 
746  top:
747 
748 	if (jb->type == SJB_VIDEO) {
749 		if (jb->dropped) {
750 			jb->dropped = 0;
751 			jb_debug(jb, 2, "%s", "DROPPED FRAME DETECTED RESYNCING\n");
752 			jb->target_seq = 0;
753 
754 			if (jb->session) {
755 				switch_core_session_request_video_refresh(jb->session);
756 			}
757 		}
758 	}
759 
760 	if (!jb->target_seq) {
761 		if ((node = switch_core_inthash_find(jb->node_hash, jb->target_seq))) {
762 			jb_debug(jb, 2, "FOUND rollover seq: %u\n", ntohs(jb->target_seq));
763 		} else if ((node = jb_find_lowest_seq(jb, 0))) {
764 			jb_debug(jb, 2, "No target seq using seq: %u as a starting point\n", ntohs(node->packet.header.seq));
765 		} else {
766 			jb_debug(jb, 1, "%s", "No nodes available....\n");
767 		}
768 		jb_hit(jb);
769 	} else if ((node = switch_core_inthash_find(jb->node_hash, jb->target_seq))) {
770 		jb_debug(jb, 2, "FOUND desired seq: %u\n", ntohs(jb->target_seq));
771 		jb_hit(jb);
772 	} else {
773 		jb_debug(jb, 2, "MISSING desired seq: %u\n", ntohs(jb->target_seq));
774 		jb_miss(jb);
775 
776 		if (jb->type == SJB_VIDEO) {
777 			int x;
778 
779 			if (jb->session) {
780 				switch_core_session_request_video_refresh(jb->session);
781 			}
782 
783 			for (x = 0; x < 10; x++) {
784 				increment_seq(jb);
785 				if ((node = switch_core_inthash_find(jb->node_hash, jb->target_seq))) {
786 					jb_debug(jb, 2, "FOUND incremental seq: %u\n", ntohs(jb->target_seq));
787 
788 					if (node->packet.header.m ||  node->packet.header.ts == jb->highest_read_ts) {
789 						jb_debug(jb, 2, "%s", "SAME FRAME DROPPING\n");
790 						jb->dropped++;
791 						drop_ts(jb, node->packet.header.ts);
792 						jb->highest_dropped_ts = ntohl(node->packet.header.ts);
793 
794 
795 						if (jb->period_miss_count > 2 && jb->period_miss_inc < 1) {
796 							jb->period_miss_inc++;
797 							jb_frame_inc(jb, 1);
798 						}
799 
800 						node = NULL;
801 						goto top;
802 					}
803 					break;
804 				} else {
805 					jb_debug(jb, 2, "MISSING incremental seq: %u\n", ntohs(jb->target_seq));
806 				}
807 			}
808 
809 		} else {
810 			increment_seq(jb);
811 		}
812 	}
813 
814 	*nodep = node;
815 
816 	if (node) {
817 		set_read_seq(jb, node->packet.header.seq);
818 		return SWITCH_STATUS_SUCCESS;
819 	}
820 
821 	return SWITCH_STATUS_NOTFOUND;
822 
823 }
824 
825 
jb_next_packet_by_ts(switch_jb_t * jb,switch_jb_node_t ** nodep)826 static inline switch_status_t jb_next_packet_by_ts(switch_jb_t *jb, switch_jb_node_t **nodep)
827 {
828 	switch_jb_node_t *node = NULL;
829 
830 	if (!jb->target_ts) {
831 		if ((node = jb_find_lowest_node(jb))) {
832 			jb_debug(jb, 2, "No target ts using ts: %u as a starting point\n", ntohl(node->packet.header.ts));
833 		} else {
834 			jb_debug(jb, 1, "%s", "No nodes available....\n");
835 		}
836 		jb_hit(jb);
837 	} else if ((node = switch_core_inthash_find(jb->node_hash_ts, jb->target_ts))) {
838 		jb_debug(jb, 2, "FOUND desired ts: %u\n", ntohl(jb->target_ts));
839 		jb_hit(jb);
840 	} else {
841 		jb_debug(jb, 2, "MISSING desired ts: %u\n", ntohl(jb->target_ts));
842 		jb_miss(jb);
843 		increment_ts(jb);
844 	}
845 
846 	*nodep = node;
847 
848 	if (node) {
849 		set_read_ts(jb, node->packet.header.ts);
850 		node->packet.header.seq = htons(jb->psuedo_seq);
851 		return SWITCH_STATUS_SUCCESS;
852 	}
853 
854 	return SWITCH_STATUS_NOTFOUND;
855 
856 }
857 
jb_next_packet(switch_jb_t * jb,switch_jb_node_t ** nodep)858 static inline switch_status_t jb_next_packet(switch_jb_t *jb, switch_jb_node_t **nodep)
859 {
860 	if (jb->samples_per_frame) {
861 		return jb_next_packet_by_ts(jb, nodep);
862 	} else {
863 		return jb_next_packet_by_seq(jb, nodep);
864 	}
865 }
866 
/*
 * Detach the node list from the buffer. Only the head pointer is cleared;
 * nothing is freed here (nodes are allocated from the buffer's pool).
 */
static inline void free_nodes(switch_jb_t *jb)
{
	switch_mutex_t *guard = jb->list_mutex;

	switch_mutex_lock(guard);
	jb->node_list = NULL;
	switch_mutex_unlock(guard);
}
873 
switch_jb_ts_mode(switch_jb_t * jb,uint32_t samples_per_frame,uint32_t samples_per_second)874 SWITCH_DECLARE(void) switch_jb_ts_mode(switch_jb_t *jb, uint32_t samples_per_frame, uint32_t samples_per_second)
875 {
876 	jb->samples_per_frame = samples_per_frame;
877 	jb->samples_per_second = samples_per_second;
878 	switch_core_inthash_init(&jb->node_hash_ts);
879 }
880 
switch_jb_set_session(switch_jb_t * jb,switch_core_session_t * session)881 SWITCH_DECLARE(void) switch_jb_set_session(switch_jb_t *jb, switch_core_session_t *session)
882 {
883 	const char *var;
884 
885 	if (session) {
886 		jb->session = session;
887 		jb->channel = switch_core_session_get_channel(session);
888 		if (jb->type == SJB_VIDEO && !switch_test_flag(jb, SJB_QUEUE_ONLY) &&
889 			(var = switch_channel_get_variable_dup(jb->channel, "jb_video_low_bitrate", SWITCH_FALSE, -1))) {
890 			int tmp = atoi(var);
891 			if (tmp >= 128 && tmp <= 10240) {
892 				jb->video_low_bitrate = (uint32_t)tmp;
893 			}
894 		}
895 	}
896 }
897 
switch_jb_set_flag(switch_jb_t * jb,switch_jb_flag_t flag)898 SWITCH_DECLARE(void) switch_jb_set_flag(switch_jb_t *jb, switch_jb_flag_t flag)
899 {
900 	switch_set_flag(jb, flag);
901 }
902 
switch_jb_clear_flag(switch_jb_t * jb,switch_jb_flag_t flag)903 SWITCH_DECLARE(void) switch_jb_clear_flag(switch_jb_t *jb, switch_jb_flag_t flag)
904 {
905 	switch_clear_flag(jb, flag);
906 }
907 
switch_jb_poll(switch_jb_t * jb)908 SWITCH_DECLARE(int) switch_jb_poll(switch_jb_t *jb)
909 {
910 	if (jb->type == SJB_TEXT) {
911 		if (jb->complete_frames < jb->frame_len) {
912 			if (jb->complete_frames && !jb->buffer_lag) {
913 				jb->buffer_lag = 10;
914 			}
915 			if (jb->buffer_lag && --jb->buffer_lag == 0) {
916 				jb->flush = 1;
917 			}
918 		}
919 	}
920 
921 	return (jb->complete_frames >= jb->frame_len) || jb->flush;
922 }
923 
switch_jb_frame_count(switch_jb_t * jb)924 SWITCH_DECLARE(int) switch_jb_frame_count(switch_jb_t *jb)
925 {
926 	return jb->complete_frames;
927 }
928 
switch_jb_debug_level(switch_jb_t * jb,uint8_t level)929 SWITCH_DECLARE(void) switch_jb_debug_level(switch_jb_t *jb, uint8_t level)
930 {
931 	jb->debug_level = level;
932 }
933 
switch_jb_reset(switch_jb_t * jb)934 SWITCH_DECLARE(void) switch_jb_reset(switch_jb_t *jb)
935 {
936 
937 	if (jb->type == SJB_VIDEO) {
938 		switch_mutex_lock(jb->mutex);
939 		switch_core_inthash_destroy(&jb->missing_seq_hash);
940 		switch_core_inthash_init(&jb->missing_seq_hash);
941 		switch_mutex_unlock(jb->mutex);
942 
943 		if (jb->session) {
944 			switch_core_session_request_video_refresh(jb->session);
945 		}
946 	}
947 
948 	jb_debug(jb, 2, "%s", "RESET BUFFER\n");
949 
950 	switch_mutex_lock(jb->mutex);
951 	hide_nodes(jb);
952 	switch_mutex_unlock(jb->mutex);
953 
954 	jb->drop_flag = 0;
955 	jb->last_target_seq = 0;
956 	jb->target_seq = 0;
957 	jb->write_init = 0;
958 	jb->highest_wrote_seq = 0;
959 	jb->highest_wrote_ts = 0;
960 	jb->next_seq = 0;
961 	jb->highest_read_ts = 0;
962 	jb->highest_read_seq = 0;
963 	jb->read_init = 0;
964 	jb->complete_frames = 0;
965 	jb->period_miss_count = 0;
966 	jb->consec_miss_count = 0;
967 	jb->period_miss_pct = 0;
968 	jb->period_good_count = 0;
969 	jb->consec_good_count = 0;
970 	jb->period_count = 0;
971 	jb->period_miss_inc = 0;
972 	jb->target_ts = 0;
973 	jb->last_target_ts = 0;
974 }
975 
switch_jb_get_nack_success(switch_jb_t * jb)976 SWITCH_DECLARE(uint32_t) switch_jb_get_nack_success(switch_jb_t *jb)
977 {
978 	uint32_t nack_recovered; /*count*/
979 	switch_mutex_lock(jb->mutex);
980 	nack_recovered = jb->nack_saved_the_day + jb->nack_didnt_save_the_day;
981 	switch_mutex_unlock(jb->mutex);
982 	return nack_recovered;
983 }
984 
switch_jb_get_packets_per_frame(switch_jb_t * jb)985 SWITCH_DECLARE(uint32_t) switch_jb_get_packets_per_frame(switch_jb_t *jb)
986 {
987 	uint32_t ppf;
988 	switch_mutex_lock(jb->mutex);
989 	ppf = jb->packet_count; /* get current packets per frame */
990 	switch_mutex_unlock(jb->mutex);
991 	return ppf;
992 }
993 
switch_jb_peek_frame(switch_jb_t * jb,uint32_t ts,uint16_t seq,int peek,switch_frame_t * frame)994 SWITCH_DECLARE(switch_status_t) switch_jb_peek_frame(switch_jb_t *jb, uint32_t ts, uint16_t seq, int peek, switch_frame_t *frame)
995 {
996 	switch_jb_node_t *node = NULL;
997 	if (seq) {
998 		uint16_t want_seq = seq + peek;
999 		node = switch_core_inthash_find(jb->node_hash, htons(want_seq));
1000 	} else if (ts && jb->samples_per_frame) {
1001 		uint32_t want_ts = ts + (peek * jb->samples_per_frame);
1002 		node = switch_core_inthash_find(jb->node_hash_ts, htonl(want_ts));
1003 	}
1004 
1005 	if (node) {
1006 		frame->seq = ntohs(node->packet.header.seq);
1007 		frame->timestamp = ntohl(node->packet.header.ts);
1008 		frame->m = node->packet.header.m;
1009 		frame->datalen = node->len - 12;
1010 
1011 		if (frame->data && frame->buflen > node->len - 12) {
1012 			memcpy(frame->data, node->packet.body, node->len - 12);
1013 		}
1014 		return SWITCH_STATUS_SUCCESS;
1015 	}
1016 
1017 	return SWITCH_STATUS_FALSE;
1018 }
1019 
switch_jb_get_frames(switch_jb_t * jb,uint32_t * min_frame_len,uint32_t * max_frame_len,uint32_t * cur_frame_len,uint32_t * highest_frame_len)1020 SWITCH_DECLARE(switch_status_t) switch_jb_get_frames(switch_jb_t *jb, uint32_t *min_frame_len, uint32_t *max_frame_len, uint32_t *cur_frame_len, uint32_t *highest_frame_len)
1021 {
1022 
1023 	switch_mutex_lock(jb->mutex);
1024 
1025 	if (min_frame_len) {
1026 		*min_frame_len = jb->min_frame_len;
1027 	}
1028 
1029 	if (max_frame_len) {
1030 		*max_frame_len = jb->max_frame_len;
1031 	}
1032 
1033 	if (cur_frame_len) {
1034 		*cur_frame_len = jb->frame_len;
1035 	}
1036 
1037 	switch_mutex_unlock(jb->mutex);
1038 
1039 	return SWITCH_STATUS_SUCCESS;
1040 }
1041 
switch_jb_set_frames(switch_jb_t * jb,uint32_t min_frame_len,uint32_t max_frame_len)1042 SWITCH_DECLARE(switch_status_t) switch_jb_set_frames(switch_jb_t *jb, uint32_t min_frame_len, uint32_t max_frame_len)
1043 {
1044 	int lowest = 0;
1045 
1046 	switch_mutex_lock(jb->mutex);
1047 
1048 	if (jb->frame_len == jb->min_frame_len) lowest = 1;
1049 
1050 	jb->min_frame_len = min_frame_len;
1051 	jb->max_frame_len = max_frame_len;
1052 
1053 	if (jb->frame_len > jb->max_frame_len) {
1054 		jb->frame_len = jb->max_frame_len;
1055 	}
1056 
1057 	if (jb->frame_len < jb->min_frame_len) {
1058 		jb->frame_len = jb->min_frame_len;
1059 	}
1060 
1061 	if (jb->frame_len > jb->highest_frame_len) {
1062 		jb->highest_frame_len = jb->frame_len;
1063 	}
1064 
1065 	if (lowest) {
1066 		jb->frame_len = jb->min_frame_len;
1067 	}
1068 
1069 	switch_mutex_unlock(jb->mutex);
1070 
1071 	return SWITCH_STATUS_SUCCESS;
1072 }
1073 
switch_jb_create(switch_jb_t ** jbp,switch_jb_type_t type,uint32_t min_frame_len,uint32_t max_frame_len,switch_memory_pool_t * pool)1074 SWITCH_DECLARE(switch_status_t) switch_jb_create(switch_jb_t **jbp, switch_jb_type_t type,
1075 												 uint32_t min_frame_len, uint32_t max_frame_len, switch_memory_pool_t *pool)
1076 {
1077 	switch_jb_t *jb;
1078 	int free_pool = 0;
1079 
1080 	if (!pool) {
1081 		switch_core_new_memory_pool(&pool);
1082 		free_pool = 1;
1083 	}
1084 
1085 	jb = switch_core_alloc(pool, sizeof(*jb));
1086 	jb->free_pool = free_pool;
1087 	jb->min_frame_len = jb->frame_len = min_frame_len;
1088 	jb->max_frame_len = max_frame_len;
1089 	jb->pool = pool;
1090 	jb->type = type;
1091 	jb->highest_frame_len = jb->frame_len;
1092 
1093 	if (jb->type == SJB_VIDEO) {
1094 		switch_core_inthash_init(&jb->missing_seq_hash);
1095 		jb->period_len = 2500;
1096 	} else {
1097 		jb->period_len = 250;
1098 	}
1099 
1100 	switch_core_inthash_init(&jb->node_hash);
1101 	switch_mutex_init(&jb->mutex, SWITCH_MUTEX_NESTED, pool);
1102 	switch_mutex_init(&jb->list_mutex, SWITCH_MUTEX_NESTED, pool);
1103 
1104 	*jbp = jb;
1105 
1106 	return SWITCH_STATUS_SUCCESS;
1107 }
1108 
switch_jb_destroy(switch_jb_t ** jbp)1109 SWITCH_DECLARE(switch_status_t) switch_jb_destroy(switch_jb_t **jbp)
1110 {
1111 	switch_jb_t *jb = *jbp;
1112 	*jbp = NULL;
1113 
1114 	if (jb->type == SJB_VIDEO && !switch_test_flag(jb, SJB_QUEUE_ONLY)) {
1115 		jb_debug(jb, 3, "Stats: NACK saved the day: %u\n", jb->nack_saved_the_day);
1116 		jb_debug(jb, 3, "Stats: NACK was late: %u\n", jb->nack_didnt_save_the_day);
1117 		jb_debug(jb, 3, "Stats: Hash entrycount: missing_seq_hash %u\n", switch_hashtable_count(jb->missing_seq_hash));
1118 	}
1119 	if (jb->type == SJB_VIDEO) {
1120 		switch_core_inthash_destroy(&jb->missing_seq_hash);
1121 	}
1122 	switch_core_inthash_destroy(&jb->node_hash);
1123 
1124 	if (jb->node_hash_ts) {
1125 		switch_core_inthash_destroy(&jb->node_hash_ts);
1126 	}
1127 
1128 	free_nodes(jb);
1129 
1130 	if (jb->free_pool) {
1131 		switch_core_destroy_memory_pool(&jb->pool);
1132 	}
1133 
1134 	return SWITCH_STATUS_SUCCESS;
1135 }
1136 
switch_jb_pop_nack(switch_jb_t * jb)1137 SWITCH_DECLARE(uint32_t) switch_jb_pop_nack(switch_jb_t *jb)
1138 {
1139 	switch_hash_index_t *hi = NULL;
1140 	uint32_t nack = 0;
1141 	uint16_t blp = 0;
1142 	uint16_t least = 0;
1143 	int i = 0;
1144 	void *val;
1145 	const void *var;
1146 
1147 	if (jb->type != SJB_VIDEO) {
1148 		return 0;
1149 	}
1150 
1151 	switch_mutex_lock(jb->mutex);
1152 
1153  top:
1154 
1155 	for (hi = switch_core_hash_first_iter(jb->missing_seq_hash, hi); hi; hi = switch_core_hash_next(&hi)) {
1156 		uint16_t seq;
1157 		//const char *token;
1158 		switch_time_t then = 0;
1159 
1160 		switch_core_hash_this(hi, &var, NULL, &val);
1161 		//token = (const char *) val;
1162 
1163 		//if (token == TOKEN_2) {
1164 			//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SKIP %u %s\n", ntohs(*((uint16_t *) var)), token);
1165 			//printf("WTf\n");
1166 		//	continue;
1167 		//}
1168 
1169 		seq = ntohs(*((uint16_t *) var));
1170 		then = (intptr_t) val;
1171 
1172 		if (then != 1 && ((uint32_t)(switch_time_now() - then)) < RENACK_TIME) {
1173 			jb_debug(jb, 3, "NACKABLE seq %u too soon to repeat\n", seq);
1174 			continue;
1175 		}
1176 
1177 		//if (then != 1) {
1178 		//	jb_debug(jb, 3, "NACKABLE seq %u not too soon to repeat %lu\n", seq, switch_time_now() - then);
1179 		//}
1180 
1181 		if (seq < ntohs(jb->target_seq) - jb->frame_len) {
1182 			jb_debug(jb, 3, "NACKABLE seq %u expired\n", seq);
1183 			switch_core_inthash_delete(jb->missing_seq_hash, (uint32_t)htons(seq));
1184 			goto top;
1185 		}
1186 
1187 		if (!least || seq < least) {
1188 			least = seq;
1189 		}
1190 	}
1191 
1192 	switch_safe_free(hi);
1193 
1194 	if (least && switch_core_inthash_delete(jb->missing_seq_hash, (uint32_t)htons(least))) {
1195 		jb_debug(jb, 3, "Found NACKABLE seq %u\n", least);
1196 		nack = (uint32_t) htons(least);
1197 		switch_core_inthash_insert(jb->missing_seq_hash, nack, (void *) (intptr_t)switch_time_now());
1198 
1199 		for(i = 0; i < 16; i++) {
1200 			if (switch_core_inthash_delete(jb->missing_seq_hash, (uint32_t)htons(least + i + 1))) {
1201 				switch_core_inthash_insert(jb->missing_seq_hash, (uint32_t)htons(least + i + 1), (void *)(intptr_t)switch_time_now());
1202 				jb_debug(jb, 3, "Found addtl NACKABLE seq %u\n", least + i + 1);
1203 				blp |= (1 << i);
1204 			}
1205 		}
1206 
1207 		blp = htons(blp);
1208 		nack |= (uint32_t) blp << 16;
1209 
1210 		//jb_frame_inc(jb, 1);
1211 	}
1212 
1213 	switch_mutex_unlock(jb->mutex);
1214 
1215 
1216 	return nack;
1217 }
1218 
switch_jb_put_packet(switch_jb_t * jb,switch_rtp_packet_t * packet,switch_size_t len)1219 SWITCH_DECLARE(switch_status_t) switch_jb_put_packet(switch_jb_t *jb, switch_rtp_packet_t *packet, switch_size_t len)
1220 {
1221 	uint32_t i;
1222 	uint16_t want = ntohs(jb->next_seq), got = ntohs(packet->header.seq);
1223 
1224 	if (len >= sizeof(switch_rtp_packet_t)) {
1225 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "trying to put %" SWITCH_SIZE_T_FMT " bytes exceeding buffer, truncate to %" SWITCH_SIZE_T_FMT "\n", len, sizeof(switch_rtp_packet_t));
1226 		len = sizeof(switch_rtp_packet_t);
1227 	}
1228 
1229 	switch_mutex_lock(jb->mutex);
1230 
1231 	if (jb->highest_dropped_ts) {
1232 		if (ntohl(packet->header.ts) < jb->highest_dropped_ts) {
1233 			jb_debug(jb, 2, "%s", "TS ALREADY DROPPED, DROPPING PACKET\n");
1234 			switch_mutex_unlock(jb->mutex);
1235 			return SWITCH_STATUS_SUCCESS;
1236 		}
1237 		jb->highest_dropped_ts = 0;
1238 	}
1239 
1240 
1241 	if (!want) want = got;
1242 
1243 	if (switch_test_flag(jb, SJB_QUEUE_ONLY) || jb->type == SJB_AUDIO || jb->type == SJB_TEXT) {
1244 		jb->next_seq = htons(got + 1);
1245 	} else {
1246 
1247 		if (switch_core_inthash_delete(jb->missing_seq_hash, (uint32_t)htons(got))) {
1248 			if (got < ntohs(jb->target_seq)) {
1249 				jb_debug(jb, 2, "got nacked seq %u too late\n", got);
1250 				jb_frame_inc(jb, 1);
1251 				jb->nack_didnt_save_the_day++;
1252 			} else {
1253 				jb_debug(jb, 2, "got nacked %u saved the day!\n", got);
1254 				jb->nack_saved_the_day++;
1255 			}
1256 		}
1257 
1258 		if (got > want) {
1259 			if (got - want > jb->max_frame_len && got - want > 17) {
1260 				jb_debug(jb, 2, "Missing %u frames, Resetting\n", got - want);
1261 				switch_jb_reset(jb);
1262 			} else {
1263 
1264 				if (jb->type != SJB_VIDEO && jb->frame_len < got - want) {
1265 					jb_frame_inc(jb, 1);
1266 				}
1267 
1268 				jb_debug(jb, 2, "GOT %u WANTED %u; MARK SEQS MISSING %u - %u\n", got, want, want, got - 1);
1269 
1270 				for (i = want; i < got; i++) {
1271 					jb_debug(jb, 2, "MARK MISSING %u ts:%u\n", i, ntohl(packet->header.ts));
1272 					switch_core_inthash_insert(jb->missing_seq_hash, (uint32_t)htons(i), (void *)(intptr_t)1);
1273 				}
1274 			}
1275 		}
1276 
1277 		if (got >= want || (want - got) > 1000) {
1278 			jb->next_seq = htons(got + 1);
1279 		}
1280 	}
1281 
1282 
1283 	add_node(jb, packet, len);
1284 
1285 	if (switch_test_flag(jb, SJB_QUEUE_ONLY) && jb->max_packet_len && jb->max_frame_len * 2 > jb->max_packet_len &&
1286 			jb->allocated_nodes > jb->max_frame_len * 2 - 1) {
1287 		while ((jb->max_frame_len * 2 - jb->visible_nodes) < jb->max_packet_len) {
1288 			drop_oldest_frame(jb);
1289 		}
1290 	} else if (switch_test_flag(jb, SJB_QUEUE_ONLY) && jb->max_packet_len && jb->max_frame_len * 2 < jb->max_packet_len) {
1291 		/* rtp_nack_buffer_size less than initial max_packet_len */
1292 		drop_oldest_frame(jb);
1293 	}
1294 
1295 	switch_mutex_unlock(jb->mutex);
1296 
1297 	return SWITCH_STATUS_SUCCESS;
1298 }
1299 
switch_jb_get_packet_by_seq(switch_jb_t * jb,uint16_t seq,switch_rtp_packet_t * packet,switch_size_t * len)1300 SWITCH_DECLARE(switch_status_t) switch_jb_get_packet_by_seq(switch_jb_t *jb, uint16_t seq, switch_rtp_packet_t *packet, switch_size_t *len)
1301 {
1302 	switch_jb_node_t *node;
1303 	switch_status_t status = SWITCH_STATUS_NOTFOUND;
1304 
1305 	switch_mutex_lock(jb->mutex);
1306 	if ((node = switch_core_inthash_find(jb->node_hash, seq))) {
1307 		jb_debug(jb, 2, "Found buffered seq: %u\n", ntohs(seq));
1308 		*packet = node->packet;
1309 		*len = node->len;
1310 		memcpy(packet->body, node->packet.body, node->len);
1311 		packet->header.version = 2;
1312 		status = SWITCH_STATUS_SUCCESS;
1313 	} else {
1314 		jb_debug(jb, 2, "Missing buffered seq: %u\n", ntohs(seq));
1315 	}
1316 	switch_mutex_unlock(jb->mutex);
1317 
1318 	return status;
1319 }
1320 
switch_jb_get_last_read_len(switch_jb_t * jb)1321 SWITCH_DECLARE(switch_size_t) switch_jb_get_last_read_len(switch_jb_t *jb)
1322 {
1323 	return jb->last_len;
1324 }
1325 
switch_jb_get_packet(switch_jb_t * jb,switch_rtp_packet_t * packet,switch_size_t * len)1326 SWITCH_DECLARE(switch_status_t) switch_jb_get_packet(switch_jb_t *jb, switch_rtp_packet_t *packet, switch_size_t *len)
1327 {
1328 	switch_jb_node_t *node = NULL;
1329 	switch_status_t status;
1330 	int plc = 0;
1331 
1332 	switch_mutex_lock(jb->mutex);
1333 
1334 	if (jb->complete_frames == 0) {
1335 		jb->flush = 0;
1336 		switch_goto_status(SWITCH_STATUS_BREAK, end);
1337 	}
1338 
1339 	if (jb->complete_frames < jb->frame_len) {
1340 
1341 		switch_jb_poll(jb);
1342 
1343 		if (!jb->flush) {
1344 			jb_debug(jb, 2, "BUFFERING %u/%u\n", jb->complete_frames , jb->frame_len);
1345 			switch_goto_status(SWITCH_STATUS_MORE_DATA, end);
1346 		}
1347 	}
1348 
1349 	jb_debug(jb, 2, "GET PACKET %u/%u n:%d\n", jb->complete_frames , jb->frame_len, jb->visible_nodes);
1350 
1351 	if (++jb->period_count >= jb->period_len) {
1352 
1353 		if (jb->consec_good_count >= (jb->period_len - 5)) {
1354 			jb_frame_inc(jb, -1);
1355 		}
1356 
1357 		jb->period_count = 1;
1358 		jb->period_miss_inc = 0;
1359 		jb->period_miss_count = 0;
1360 		jb->period_good_count = 0;
1361 		jb->consec_miss_count = 0;
1362 		jb->consec_good_count = 0;
1363 
1364 		if (jb->type == SJB_VIDEO && jb->channel && jb->video_low_bitrate) {
1365 			//switch_time_t now = switch_time_now();
1366 			//int ok = (now - jb->last_bitrate_change) > 10000;
1367 
1368 			if (switch_channel_test_flag(jb->channel, CF_VIDEO_BITRATE_UNMANAGABLE) && jb->frame_len == jb->min_frame_len) {
1369 				jb_debug(jb, 2, "%s", "Allow BITRATE changes\n");
1370 				switch_channel_clear_flag_recursive(jb->channel, CF_VIDEO_BITRATE_UNMANAGABLE);
1371 				jb->bitrate_control = 0;
1372 				if (jb->session) {
1373 					switch_core_session_request_video_refresh(jb->session);
1374 				}
1375 			} else if (!switch_channel_test_flag(jb->channel, CF_VIDEO_BITRATE_UNMANAGABLE) && jb->frame_len > jb->max_frame_len / 2) {
1376 				switch_core_session_message_t msg = { 0 };
1377 
1378 				jb->bitrate_control = jb->video_low_bitrate;
1379 
1380 				msg.message_id = SWITCH_MESSAGE_INDICATE_BITRATE_REQ;
1381 				msg.numeric_arg = jb->bitrate_control * 1024;
1382 				msg.from = __FILE__;
1383 
1384 				jb_debug(jb, 2, "Force BITRATE to %d\n", jb->bitrate_control);
1385 
1386 				switch_core_session_receive_message(jb->session, &msg);
1387 				switch_channel_set_flag_recursive(jb->channel, CF_VIDEO_BITRATE_UNMANAGABLE);
1388 				if (jb->session) {
1389 					switch_core_session_request_video_refresh(jb->session);
1390 				}
1391 			}
1392 		}
1393 	}
1394 
1395 
1396 	jb->period_miss_pct = ((double)jb->period_miss_count / jb->period_count) * 100;
1397 
1398 	//if (jb->period_miss_pct > 60.0f) {
1399 	//	jb_debug(jb, 2, "Miss percent %02f too high, resetting buffer.\n", jb->period_miss_pct);
1400 	//	switch_jb_reset(jb);
1401 	//}
1402 
1403 	if ((status = jb_next_packet(jb, &node)) == SWITCH_STATUS_SUCCESS) {
1404 		jb_debug(jb, 2, "Found next frame cur ts: %u seq: %u\n", htonl(node->packet.header.ts), htons(node->packet.header.seq));
1405 
1406 		if (!jb->read_init || check_seq(node->packet.header.seq, jb->highest_read_seq)) {
1407 			jb->highest_read_seq = node->packet.header.seq;
1408 		}
1409 
1410 		if (jb->type != SJB_VIDEO ||
1411 			(jb->read_init && check_seq(node->packet.header.seq, jb->highest_read_seq) && check_ts(node->packet.header.ts, jb->highest_read_ts))) {
1412 
1413 			if (jb->type != SJB_VIDEO) {
1414 				jb->complete_frames--;
1415 			}
1416 			jb_debug(jb, 2, "READ frame ts: %u complete=%u/%u n:%u\n", ntohl(node->packet.header.ts), jb->complete_frames , jb->frame_len, jb->visible_nodes);
1417 			jb->highest_read_ts = node->packet.header.ts;
1418 		} else if (!jb->read_init) {
1419 			jb->highest_read_ts = node->packet.header.ts;
1420 		}
1421 
1422 		if (!jb->read_init) jb->read_init = 1;
1423 	} else {
1424 		if (jb->type == SJB_VIDEO) {
1425 			//switch_jb_reset(jb);
1426 
1427 			switch(status) {
1428 			case SWITCH_STATUS_RESTART:
1429 				jb_debug(jb, 2, "%s", "Error encountered ask for new keyframe\n");
1430 				switch_goto_status(SWITCH_STATUS_RESTART, end);
1431 			case SWITCH_STATUS_NOTFOUND:
1432 			default:
1433 				jb_debug(jb, 2, "%s", "No frames found wait for more\n");
1434 				switch_goto_status(SWITCH_STATUS_MORE_DATA, end);
1435 			}
1436 		} else {
1437 			switch(status) {
1438 			case SWITCH_STATUS_RESTART:
1439 				jb_debug(jb, 2, "%s", "Error encountered\n");
1440 				switch_jb_reset(jb);
1441 				switch_goto_status(SWITCH_STATUS_RESTART, end);
1442 			case SWITCH_STATUS_NOTFOUND:
1443 			default:
1444 				if (jb->consec_miss_count > jb->frame_len) {
1445 					//switch_jb_reset(jb);
1446 					jb_frame_inc(jb, 1);
1447 					jb_debug(jb, 2, "%s", "Too many frames not found, RESIZE\n");
1448 					switch_goto_status(SWITCH_STATUS_RESTART, end);
1449 				} else {
1450 					jb_debug(jb, 2, "%s", "Frame not found suggest PLC\n");
1451 					plc = 1;
1452 					switch_goto_status(SWITCH_STATUS_NOTFOUND, end);
1453 				}
1454 			}
1455 		}
1456 	}
1457 
1458 	if (node) {
1459 		status = SWITCH_STATUS_SUCCESS;
1460 
1461 		*packet = node->packet;
1462 		*len = node->len;
1463 		jb->last_len = *len;
1464 		memcpy(packet->body, node->packet.body, node->len);
1465 		packet->header.version = 2;
1466 		hide_node(node, SWITCH_TRUE);
1467 
1468 		jb_debug(jb, 2, "GET packet ts:%u seq:%u %s\n", ntohl(packet->header.ts), ntohs(packet->header.seq), packet->header.m ? " <MARK>" : "");
1469 
1470 	} else {
1471 		status = SWITCH_STATUS_MORE_DATA;
1472 	}
1473 
1474  end:
1475 
1476 	if (plc) {
1477 		uint16_t seq;
1478 		uint32_t ts = 0;
1479 
1480 		if (jb->samples_per_frame) {
1481 			seq = htons(jb->last_psuedo_seq);
1482 			ts = jb->last_target_ts;
1483 		} else {
1484 			seq = jb->last_target_seq;
1485 		}
1486 
1487 		packet->header.seq = seq;
1488 		packet->header.ts = ts;
1489 	}
1490 
1491 	switch_mutex_unlock(jb->mutex);
1492 
1493 	if (jb->type == SJB_VIDEO) {
1494 		if (jb->complete_frames > jb->max_frame_len * 2) {
1495 			jb_debug(jb, 2, "JB TOO BIG (%d), RESET\n", jb->complete_frames);
1496 			switch_jb_reset(jb);
1497 		}
1498 	} else {
1499 		int too_big = (int)(jb->max_frame_len * 1.5);
1500 		if (jb->visible_nodes > too_big && status == SWITCH_STATUS_SUCCESS) {
1501 			status = SWITCH_STATUS_TIMEOUT;
1502 		}
1503 	}
1504 
1505 	return status;
1506 }
1507 
1508 
1509 /* For Emacs:
1510  * Local Variables:
1511  * mode:c
1512  * indent-tabs-mode:t
1513  * tab-width:4
1514  * c-basic-offset:4
1515  * End:
1516  * For VIM:
1517  * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
1518  */
1519