1 /*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA
15 *
16 * Author : Deon van der Westhuysen - June 2012 - Vodacom PTY LTD
17 */
18 #include <stdlib.h>
19 #include <string.h>
20 #include <sys/types.h>
21 #include <sys/stat.h>
22
23 #include "sipp.hpp"
24 #include <unistd.h>
25 #include <stdint.h>
26 #include <fcntl.h>
27 #include <sys/socket.h>
28 #include <pthread.h>
29 #include "rtpstream.hpp"
30
31 /* stub to add extra debugging/logging... */
static void debugprint(const char *Format, ...)
{
    /*
     * Deliberately empty: this is the hook where extra debugging/logging can
     * be plugged in. As written, every debugprint() call is a no-op and the
     * format string and its arguments are ignored.
     */
    (void)Format;
}
35
/* the cached file array grows by this many entries at a time */
#define RTPSTREAM_FILESPERBLOCK 16
/* number of local ports tried before giving up on binding an rtp socket */
#define BIND_MAX_TRIES 100
/* the ready/busy thread lists grow by this many entries at a time */
#define RTPSTREAM_THREADBLOCKSIZE 16
/* size in bytes of the buffer used to receive and to build udp packets */
#define MAX_UDP_RECV_BUFFER 8192

/* bit values stored in taskentry_t.flags */
#define TI_NULL_AUDIOIP 0x01 /* no valid remote address for the audio stream */
#define TI_NULL_VIDEOIP 0x02 /* no valid remote address for the video stream */
#define TI_NULLIP (TI_NULL_AUDIOIP|TI_NULL_VIDEOIP)
#define TI_PAUSERTP 0x04 /* playback paused: timestamps advance, nothing is sent */
#define TI_ECHORTP 0x08 /* Not currently implemented */
#define TI_KILLTASK 0x10 /* playback thread must remove and free this task */
#define TI_RECONNECTSOCKET 0x20 /* sockets must be (re)connected to the remote addresses */
#define TI_PLAYFILE 0x40 /* new_* playback settings are waiting to be activated */
/* flags that require configuration work by the playback thread */
#define TI_CONFIGFLAGS (TI_KILLTASK|TI_RECONNECTSOCKET|TI_PLAYFILE)
50
/*
 * Fixed header placed at the start of every outgoing rtp packet. It is
 * overlaid on the start of the udp send buffer, and all fields are written
 * in network byte order (htons/htonl) by the sender.
 */
struct rtp_header_t
{
    uint16_t flags;       /* built as 0x8000 | payload type */
    uint16_t seq;         /* packet sequence number */
    uint32_t timestamp;   /* low 32 bits of the stream timestamp */
    uint32_t ssrc_id;     /* stream source identifier */
};
58
/*
 * Per-call playback task. One entry is allocated per call and, once started,
 * is serviced by a single playback thread (parent_thread).
 */
struct taskentry_t
{
    threaddata_t *parent_thread;   /* owning playback thread, NULL if none */
    unsigned long nextwake_ms;     /* time at which the task must run again */
    volatile int flags;            /* combination of the TI_* bits */

    /* rtp stream information */
    unsigned long long last_timestamp;
    unsigned short seq;
    char payload_type;
    unsigned int ssrc_id;

    /* current playback information */
    int loop_count;                /* 0 = not playing, -1 = loop forever */
    char *file_bytes_start;
    char *current_file_bytes;
    int file_num_bytes;
    int file_bytes_left;
    /* playback timing information */
    int ms_per_packet;
    int bytes_per_packet;
    int timeticks_per_packet;
    int timeticks_per_ms;

    /* new file playback information */
    /* (copied into the fields above when TI_PLAYFILE is processed) */
    char new_payload_type;
    int new_loop_count;
    int new_file_size;
    char *new_file_bytes;
    int new_ms_per_packet;
    int new_bytes_per_packet;
    int new_timeticks_per_packet;
    /* sockets for audio/video rtp_rtcp */
    /* (-1 means the socket is not open) */
    int audio_rtp_socket;
    int audio_rtcp_socket;
    int video_rtp_socket;
    int video_rtcp_socket;

    /* rtp peer address structures */
    struct sockaddr_storage remote_audio_rtp_addr;
    struct sockaddr_storage remote_audio_rtcp_addr;
    struct sockaddr_storage remote_video_rtp_addr;
    struct sockaddr_storage remote_video_rtcp_addr;

    /* we will have a mutex per call. should we consider refactoring to */
    /* share mutexes across calls? makes the per-call code more complex */

    /* thread management structures */
    pthread_mutex_t mutex;         /* held while remote addresses are updated or used */
};
109
/*
 * State of one playback thread. The structure is allocated with extra room
 * directly behind it so that "tasklist" acts as the first slot of an array
 * of max_tasks task pointers; slots are accessed as (&tasklist)[index].
 */
struct threaddata_t
{
    pthread_mutex_t tasklist_mutex;   /* held while the task array is modified */
    int busy_list_index;              /* position in busy_threads, -1 if not on that list */
    int max_tasks;                    /* capacity of the task array */
    volatile int num_tasks;           /* number of slots currently in use */
    volatile int del_pending;         /* tasks flagged TI_KILLTASK but not yet removed */
    volatile int exit_flag;           /* non-zero makes the playback loop terminate */
    taskentry_t *tasklist;            /* first slot of the task pointer array */
};
120
/* One audio file that has been loaded completely into memory. */
struct cached_file_t
{
    char filename[RTPSTREAM_MAX_FILENAMELEN];   /* name used to look the file up again */
    char *bytes;                                /* malloc'ed copy of the file contents */
    int filesize;                               /* number of bytes stored in "bytes" */
};
127
/* dynamically grown array of files loaded by rtpstream_cache_file */
cached_file_t *cached_files= NULL;
int num_cached_files= 0;
/* next local port to try for an rtp socket; reset to min_rtp_port when below it */
int next_rtp_port= 0;

/* ready_threads: playback threads that have (or will soon have) a free task slot */
/* busy_threads: playback threads whose task slots are all in use */
threaddata_t **ready_threads= NULL;
threaddata_t **busy_threads= NULL;
int num_busy_threads= 0;
int num_ready_threads= 0;
/* allocated capacity of the busy_threads and ready_threads arrays */
int busy_threads_max= 0;
int ready_threads_max= 0;

/* ssrc id handed to the next new call; incremented for every call */
unsigned int global_ssrc_id= 0xCA110000;
140
141 //===================================================================================================
142
143 /* code checked */
rtpstream_free_taskinfo(taskentry_t * taskinfo)144 static void rtpstream_free_taskinfo(taskentry_t* taskinfo)
145 {
146 if (taskinfo) {
147 /* close sockets associated with this call */
148 if (taskinfo->audio_rtp_socket!=-1) {
149 close (taskinfo->audio_rtp_socket);
150 }
151 if (taskinfo->audio_rtcp_socket!=-1) {
152 close (taskinfo->audio_rtcp_socket);
153 }
154 if (taskinfo->video_rtp_socket!=-1) {
155 close (taskinfo->video_rtp_socket);
156 }
157 if (taskinfo->video_rtcp_socket!=-1) {
158 close (taskinfo->video_rtcp_socket);
159 }
160
161 /* cleanup pthread library structure */
162 pthread_mutex_destroy(&(taskinfo->mutex));
163
164 free (taskinfo);
165 }
166 }
167
168 /* code checked */
rtpstream_process_task_flags(taskentry_t * taskinfo)169 static void rtpstream_process_task_flags(taskentry_t* taskinfo)
170 {
171 if (taskinfo->flags&TI_RECONNECTSOCKET) {
172 int remote_addr_len;
173 int rc;
174
175 remote_addr_len = media_ip_is_ipv6 ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in);
176
177 /* enter critical section to lock address updates */
178 /* may want to leave this out -- low chance of race condition */
179 pthread_mutex_lock (&(taskinfo->mutex));
180
181 /* If we have valid ip and port numbers for audio rtp stream */
182 if (!(taskinfo->flags & TI_NULL_AUDIOIP))
183 {
184 if (taskinfo->audio_rtcp_socket != -1) {
185 rc = connect(taskinfo->audio_rtcp_socket, (struct sockaddr *)&taskinfo->remote_audio_rtcp_addr,
186 remote_addr_len);
187 if (rc < 0) {
188 debugprint("closing audio rtcp socket %d due to error %d in rtpstream_process_task_flags taskinfo=%p\n",
189 taskinfo->audio_rtcp_socket, errno, taskinfo);
190 close(taskinfo->audio_rtcp_socket);
191 taskinfo->audio_rtcp_socket = -1;
192 }
193 }
194
195 if (taskinfo->audio_rtp_socket != -1) {
196 rc = connect(taskinfo->audio_rtp_socket, (struct sockaddr *)&taskinfo->remote_audio_rtp_addr, remote_addr_len);
197 if (rc < 0) {
198 debugprint("closing audio rtp socket %d due to error %d in rtpstream_process_task_flags taskinfo=%p\n",
199 taskinfo->audio_rtp_socket, errno, taskinfo);
200 close(taskinfo->audio_rtp_socket);
201 taskinfo->audio_rtp_socket = -1;
202 }
203 }
204 }
205
206 /* If we have valid ip and port numbers for video rtp stream */
207 if (!(taskinfo->flags & TI_NULL_VIDEOIP))
208 {
209 if (taskinfo->video_rtcp_socket != -1) {
210 rc = connect(taskinfo->video_rtcp_socket, (struct sockaddr *)&taskinfo->remote_video_rtcp_addr,
211 remote_addr_len);
212 if (rc < 0) {
213 debugprint("closing video rtcp socket %d due to error %d in rtpstream_process_task_flags taskinfo=%p\n",
214 taskinfo->video_rtcp_socket, errno, taskinfo);
215 close(taskinfo->video_rtcp_socket);
216 taskinfo->video_rtcp_socket = -1;
217 }
218 }
219 if (taskinfo->video_rtp_socket != -1) {
220 rc = connect(taskinfo->video_rtp_socket, (struct sockaddr *)&taskinfo->remote_video_rtp_addr,
221 remote_addr_len);
222 if (rc < 0) {
223 debugprint("closing video rtp socket %d due to error %d in rtpstream_process_task_flags taskinfo=%p\n",
224 taskinfo->video_rtp_socket, errno, taskinfo);
225 close(taskinfo->video_rtp_socket);
226 taskinfo->video_rtp_socket = -1;
227 }
228 }
229 }
230
231 taskinfo->flags&= ~TI_RECONNECTSOCKET;
232 pthread_mutex_unlock (&(taskinfo->mutex));
233 }
234 if (taskinfo->flags&TI_PLAYFILE) {
235 /* copy playback information */
236 taskinfo->loop_count= taskinfo->new_loop_count;
237 taskinfo->file_bytes_start= taskinfo->new_file_bytes;
238 taskinfo->current_file_bytes= taskinfo->new_file_bytes;
239 taskinfo->file_num_bytes= taskinfo->new_file_size;
240 taskinfo->file_bytes_left= taskinfo->new_file_size;
241 taskinfo->payload_type= taskinfo->new_payload_type;
242
243 taskinfo->ms_per_packet= taskinfo->new_ms_per_packet;
244 taskinfo->bytes_per_packet= taskinfo->new_bytes_per_packet;
245 taskinfo->timeticks_per_packet= taskinfo->new_timeticks_per_packet;
246 taskinfo->timeticks_per_ms= taskinfo->timeticks_per_packet/taskinfo->ms_per_packet;
247
248 taskinfo->last_timestamp= getmilliseconds()*taskinfo->timeticks_per_ms;
249 taskinfo->flags&= ~TI_PLAYFILE;
250 }
251 }
252
253 /**** todo - check code ****/
rtpstream_playrtptask(taskentry_t * taskinfo,unsigned long timenow_ms)254 static unsigned long rtpstream_playrtptask(taskentry_t *taskinfo, unsigned long timenow_ms)
255 {
256 int rc;
257 unsigned long next_wake;
258 unsigned long long target_timestamp;
259
260 union {
261 rtp_header_t hdr;
262 char buffer[MAX_UDP_RECV_BUFFER];
263 } udp;
264
265 /* OK, now to play - sockets are supposed to be non-blocking */
266 /* no support for video stream at this stage. will need some work */
267
268 next_wake = timenow_ms + 100; /* default next wakeup time */
269
270 if (taskinfo->audio_rtcp_socket != -1) {
271 /* just keep listening on rtcp socket (is this really required?) - ignore any errors */
272 while ((rc = recv(taskinfo->audio_rtcp_socket, udp.buffer, sizeof(udp), 0)) >= 0) {
273 /*
274 * rtpstream_bytes_in+= rc;
275 */
276 }
277 }
278
279 if (taskinfo->video_rtp_socket != -1) {
280 /* just keep listening on rtp socket (is this really required?) - ignore any errors */
281 while ((rc = recv(taskinfo->video_rtp_socket, udp.buffer, sizeof(udp), 0)) >= 0) {
282 /*
283 * rtpstream_bytes_in += rc;
284 */
285 }
286 }
287
288 if (taskinfo->video_rtcp_socket != -1) {
289 /* just keep listening on rtcp socket (is this really required?) - ignore any errors */
290 while ((rc = recv(taskinfo->video_rtcp_socket, udp.buffer, sizeof(udp), 0)) >= 0) {
291 /*
292 * rtpstream_bytes_in+= rc;
293 */
294 }
295 }
296
297 if (taskinfo->audio_rtp_socket != -1) {
298 /* this is temp code - will have to reorganize if/when we include echo functionality */
299 /* just keep listening on rtcp socket (is this really required?) - ignore any errors */
300 while ((rc = recv(taskinfo->audio_rtp_socket, udp.buffer, sizeof(udp), 0)) >= 0) {
301 /* for now we will just ignore any received data or receive errors */
302 /* separate code path for RTP echo */
303 rtpstream_bytes_in += rc;
304 }
305 /* are we playing back an audio file? */
306 if (taskinfo->loop_count) {
307 target_timestamp = timenow_ms * taskinfo->timeticks_per_ms;
308 next_wake = timenow_ms + taskinfo->ms_per_packet - timenow_ms % taskinfo->ms_per_packet;
309 if (taskinfo->flags & (TI_NULL_AUDIOIP|TI_PAUSERTP)) {
310 /* when paused, set timestamp so stream appears to be up to date */
311 taskinfo->last_timestamp = target_timestamp;
312 }
313 if (taskinfo->last_timestamp < target_timestamp) {
314 /* need to send rtp payload - build rtp packet header... */
315 udp.hdr.flags = htons(0x8000 | taskinfo->payload_type);
316 udp.hdr.seq = htons(taskinfo->seq);
317 udp.hdr.timestamp = htonl((uint32_t)(taskinfo->last_timestamp & 0xFFFFFFFF));
318 udp.hdr.ssrc_id = htonl(taskinfo->ssrc_id);
319 /* add payload data to the packet - handle buffer wraparound */
320 if (taskinfo->file_bytes_left >= taskinfo->bytes_per_packet) {
321 /* no need for fancy acrobatics */
322 memcpy(udp.buffer + sizeof(rtp_header_t), taskinfo->current_file_bytes, taskinfo->bytes_per_packet);
323 } else {
324 /* copy from end and then begining of file. does not handle the */
325 /* case where file is shorter than the packet length!! */
326 memcpy(udp.buffer + sizeof(rtp_header_t), taskinfo->current_file_bytes, taskinfo->file_bytes_left);
327 memcpy(udp.buffer + sizeof(rtp_header_t) + taskinfo->file_bytes_left,
328 taskinfo->file_bytes_start, taskinfo->bytes_per_packet-taskinfo->file_bytes_left);
329 }
330 /* now send the actual packet */
331 rc = send(taskinfo->audio_rtp_socket, udp.buffer, taskinfo->bytes_per_packet + sizeof(rtp_header_t), 0);
332 if (rc < 0) {
333 /* handle sending errors */
334 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
335 next_wake= timenow_ms + 2; /* retry after short sleep */
336 } else {
337 /* this looks like a permanent error - should we ignore ENETUNREACH? */
338 debugprint("closing rtp socket %d due to error %d in rtpstream_new_call taskinfo=%p\n",
339 taskinfo->audio_rtp_socket, errno, taskinfo);
340 close(taskinfo->audio_rtp_socket);
341 taskinfo->audio_rtp_socket= -1;
342 }
343 } else {
344 /* statistics - only count successful sends */
345 rtpstream_bytes_out += taskinfo->bytes_per_packet + sizeof(rtp_header_t);
346 rtpstream_pckts++;
347 /* advance playback pointer to next packet */
348 taskinfo->seq++;
349 /* must change if timer ticks per packet can be fractional */
350 taskinfo->last_timestamp += taskinfo->timeticks_per_packet;
351 taskinfo->file_bytes_left -= taskinfo->bytes_per_packet;
352 if (taskinfo->file_bytes_left > 0) {
353 taskinfo->current_file_bytes += taskinfo->bytes_per_packet;
354 } else {
355 taskinfo->current_file_bytes = taskinfo->file_bytes_start-taskinfo->file_bytes_left;
356 taskinfo->file_bytes_left += taskinfo->file_num_bytes;
357 if (taskinfo->loop_count > 0) {
358 /* one less loop to play. -1 (infinite loops) will stay as is */
359 taskinfo->loop_count--;
360 }
361 }
362 if (taskinfo->last_timestamp < target_timestamp) {
363 /* no sleep if we are behind */
364 next_wake= timenow_ms;
365 }
366 }
367 }
368 } else {
369 /* not busy playing back a file - put possible rtp echo code here. */
370 }
371 }
372
373 return next_wake;
374 }
375
376
377
378 /*********************************************************************************/
379 /*********************************************************************************/
380 /*********************************************************************************/
381
382
383 /* code checked */
rtpstream_playback_thread(void * params)384 static void* rtpstream_playback_thread(void* params)
385 {
386 threaddata_t *threaddata= (threaddata_t *) params;
387 taskentry_t *taskinfo;
388 int taskindex;
389
390 unsigned long timenow_ms;
391 unsigned long waketime_ms;
392 int sleeptime_us;
393
394 rtpstream_numthreads++; /* perhaps wrap this in a mutex? */
395
396 while (!threaddata->exit_flag) {
397 timenow_ms= getmilliseconds();
398 waketime_ms= timenow_ms+ 100; /* default sleep 100ms */
399
400 /* iterate through tasks and handle playback and other actions */
401 for (taskindex=0;taskindex<threaddata->num_tasks;taskindex++) {
402 taskinfo= (&threaddata->tasklist)[taskindex];
403 if (taskinfo->flags&TI_CONFIGFLAGS) {
404 if (taskinfo->flags&TI_KILLTASK) {
405 /* remove this task entry and release its resources */
406 pthread_mutex_lock (&(threaddata->tasklist_mutex));
407 (&threaddata->tasklist)[taskindex--]= (&threaddata->tasklist)[--threaddata->num_tasks];
408 threaddata->del_pending--; /* must decrease del_pending after num_tasks */
409 pthread_mutex_unlock (&(threaddata->tasklist_mutex));
410 rtpstream_free_taskinfo (taskinfo);
411 continue;
412 }
413 /* handle any other config related flags */
414 rtpstream_process_task_flags (taskinfo);
415 }
416
417 /* should we update current time inbetween tasks? */
418 if (taskinfo->nextwake_ms<=timenow_ms) {
419 /* task needs to execute now */
420 taskinfo->nextwake_ms= rtpstream_playrtptask (taskinfo,timenow_ms);
421 }
422 if (waketime_ms>taskinfo->nextwake_ms) {
423 waketime_ms= taskinfo->nextwake_ms;
424 }
425 }
426 /* sleep until next iteration of playback loop */
427 sleeptime_us= (waketime_ms-getmilliseconds())*1000;
428 if (sleeptime_us>0) {
429 usleep (sleeptime_us);
430 }
431 }
432
433 /* Free all task and thread resources and exit the thread */
434 for (taskindex=0;taskindex<threaddata->num_tasks;taskindex++) {
435 /* check if we should delete this thread, else let owner call clear it */
436 /* small chance of race condition in this code */
437 taskinfo= (&threaddata->tasklist)[taskindex];
438 if (taskinfo->flags&TI_KILLTASK) {
439 rtpstream_free_taskinfo (taskinfo);
440 } else {
441 taskinfo->parent_thread= NULL; /* no longer associated with a thread */
442 }
443 }
444 pthread_mutex_destroy(&(threaddata->tasklist_mutex));
445 free (threaddata);
446 rtpstream_numthreads--; /* perhaps wrap this in a mutex? */
447
448 return NULL;
449 }
450
451 /* code checked */
rtpstream_start_task(rtpstream_callinfo_t * callinfo)452 static int rtpstream_start_task (rtpstream_callinfo_t *callinfo)
453 {
454 int ready_index;
455 int allocsize;
456 threaddata_t **threadlist;
457 threaddata_t *threaddata;
458 pthread_t newthread;
459
460 /* safety check... */
461 if (!callinfo->taskinfo) {
462 return 0;
463 }
464
465 /* we count on the fact that only one thread can add/remove playback tasks */
466 /* thus we don't have mutexes to protect the thread list objects. */
467 for (ready_index=0;ready_index<num_ready_threads;ready_index++) {
468 /* ready threads have a spare task slot or should have one very shortly */
469 /* if we find a task with no spare slots, just skip to the next one. */
470 if (ready_threads[ready_index]->num_tasks<ready_threads[ready_index]->max_tasks) {
471 /* we found a thread with an open task slot. */
472 break;
473 }
474 }
475
476 if (ready_index==num_ready_threads) {
477 /* did not find a thread with spare task slots, thus we create one here */
478 if (num_ready_threads>=ready_threads_max) {
479 /* need to allocate more memory for thread list */
480 ready_threads_max+= RTPSTREAM_THREADBLOCKSIZE;
481 threadlist= (threaddata_t **) realloc (ready_threads,sizeof(*ready_threads)*ready_threads_max);
482 if (!threadlist) {
483 /* could not allocate bigger block... worry [about it later] */
484 ready_threads_max-= RTPSTREAM_THREADBLOCKSIZE;
485 return 0;
486 }
487 ready_threads= threadlist;
488 }
489 /* create and initialise data structure for new thread */
490 allocsize= sizeof(*threaddata)+sizeof(threaddata->tasklist)*(rtp_tasks_per_thread-1);
491 threaddata= (threaddata_t *) malloc (allocsize);
492 if (!threaddata) {
493 return 0;
494 }
495 memset (threaddata,0,allocsize);
496 threaddata->max_tasks= rtp_tasks_per_thread;
497 threaddata->busy_list_index= -1;
498 pthread_mutex_init(&(threaddata->tasklist_mutex),NULL);
499 /* create the thread itself */
500 if (pthread_create(&newthread,NULL,rtpstream_playback_thread,threaddata)) {
501 /* error creating the thread */
502 free (threaddata);
503 return 0;
504 }
505 /* Add thread to list of ready (spare capacity) threads */
506 ready_threads[num_ready_threads++]= threaddata;
507 }
508
509 /* now add new task to a spare slot in our thread tasklist */
510 threaddata= ready_threads[ready_index];
511 callinfo->taskinfo->parent_thread= threaddata;
512 pthread_mutex_lock (&(threaddata->tasklist_mutex));
513 (&threaddata->tasklist)[threaddata->num_tasks++]= callinfo->taskinfo;
514 pthread_mutex_unlock (&(threaddata->tasklist_mutex));
515
516 /* this check relies on playback thread to decrement num_tasks before */
517 /* decrementing del_pending -- else we need to lock before this test */
518 if ((threaddata->del_pending==0)&&(threaddata->num_tasks>=threaddata->max_tasks)) {
519 /* move this thread to the busy list - no free task slots */
520 /* first check if the busy list is big enough to hold new thread */
521 if (num_busy_threads>=busy_threads_max) {
522 /* need to allocate more memory for thread list */
523 busy_threads_max+= RTPSTREAM_THREADBLOCKSIZE;
524 threadlist= (threaddata_t **) realloc (busy_threads,sizeof(*busy_threads)*busy_threads_max);
525 if (!threadlist) {
526 /* could not allocate bigger block... leave thread in ready list */
527 busy_threads_max-= RTPSTREAM_THREADBLOCKSIZE;
528 return 1; /* success, sort of */
529 }
530 busy_threads= threadlist;
531 }
532 /* add to busy list */
533 threaddata->busy_list_index= num_busy_threads;
534 busy_threads[num_busy_threads++]= threaddata;
535 /* remove from ready list */
536 ready_threads[ready_index]= ready_threads[--num_ready_threads];
537 }
538
539 return 1; /* done! */
540 }
541
542 /* code checked */
rtpstream_stop_task(rtpstream_callinfo_t * callinfo)543 static void rtpstream_stop_task(rtpstream_callinfo_t* callinfo)
544 {
545 threaddata_t **threadlist;
546 taskentry_t *taskinfo= callinfo->taskinfo;
547 int busy_index;
548
549 if (taskinfo) {
550 if (taskinfo->parent_thread) {
551 /* this call's task is registered with an executing thread */
552 /* first move owning thread to the ready list - will be ready soon */
553 busy_index= taskinfo->parent_thread->busy_list_index;
554 if (busy_index>=0) {
555 /* make sure we have enough entries in ready list */
556 if (num_ready_threads>=ready_threads_max) {
557 /* need to allocate more memory for thread list */
558 ready_threads_max+= RTPSTREAM_THREADBLOCKSIZE;
559 threadlist= (threaddata_t **) realloc (ready_threads,sizeof(*ready_threads)*ready_threads_max);
560 if (!threadlist) {
561 /* could not allocate bigger block... reset max threads */
562 /* this is a problem - ready thread gets "lost" on busy list */
563 ready_threads_max-= RTPSTREAM_THREADBLOCKSIZE;
564 } else {
565 ready_threads= threadlist;
566 }
567 }
568
569 if (num_ready_threads<ready_threads_max) {
570 /* OK, got space on ready list, move to ready list */
571 busy_threads[busy_index]->busy_list_index= -1;
572 ready_threads[num_ready_threads++]= busy_threads[busy_index];
573 num_busy_threads--;
574 /* fill up gap in the busy thread list */
575 if (busy_index!=num_busy_threads) {
576 busy_threads[busy_index]= busy_threads[num_busy_threads];
577 busy_threads[busy_index]->busy_list_index= busy_index;
578 }
579 }
580 }
581 /* then ask the thread to destory this task (and its memory) */
582 pthread_mutex_lock (&(taskinfo->parent_thread->tasklist_mutex));
583 taskinfo->parent_thread->del_pending++;
584 taskinfo->flags|= TI_KILLTASK;
585 pthread_mutex_unlock (&(taskinfo->parent_thread->tasklist_mutex));
586 } else {
587 /* no playback thread owner, just free it */
588 rtpstream_free_taskinfo (taskinfo);
589 }
590 callinfo->taskinfo= NULL;
591 }
592 }
593
594 /* code checked */
rtpstream_new_call(rtpstream_callinfo_t * callinfo)595 int rtpstream_new_call (rtpstream_callinfo_t *callinfo)
596 {
597 debugprint ("rtpstream_new_call callinfo=%p\n",callinfo);
598
599 taskentry_t *taskinfo;
600
601 /* general init */
602 memset (callinfo,0,sizeof(*callinfo));
603
604 taskinfo= (taskentry_t *) malloc (sizeof(*taskinfo));
605 if (!taskinfo) {
606 /* cannot allocate taskinfo memory - bubble error up */
607 return 0;
608 }
609 callinfo->taskinfo= taskinfo;
610
611 memset (taskinfo,0,sizeof(*taskinfo));
612 taskinfo->flags= TI_NULLIP;
613 /* socket descriptors */
614 taskinfo->audio_rtp_socket= -1;
615 taskinfo->audio_rtcp_socket= -1;
616 taskinfo->video_rtp_socket= -1;
617 taskinfo->video_rtcp_socket= -1;
618 /* rtp stream members */
619 taskinfo->ssrc_id= global_ssrc_id++;
620 /* pthread mutexes */
621 pthread_mutex_init(&(callinfo->taskinfo->mutex),NULL);
622
623 return 1;
624 }
625
626 /* code checked */
rtpstream_end_call(rtpstream_callinfo_t * callinfo)627 void rtpstream_end_call (rtpstream_callinfo_t *callinfo)
628 {
629 debugprint ("rtpstream_end_call callinfo=%p\n",callinfo);
630
631 /* stop playback thread(s) for this call */
632 rtpstream_stop_task (callinfo);
633 }
634
635 /* code checked */
rtpstream_cache_file(char * filename)636 int rtpstream_cache_file (char *filename)
637 {
638 int count= 0;
639 cached_file_t *newcachelist;
640 char *filecontents;
641 struct stat statbuffer;
642 FILE *f;
643
644 debugprint ("rtpstream_cache_file filename=%s\n",filename);
645
646 /* cached file entries are stored in a dynamically grown array. */
647 /* could use a binary (or avl) tree but number of files should */
648 /* be small and doesn't really justify the effort. */
649 while (count<num_cached_files) {
650 if (!strcmp(cached_files[count].filename,filename)) {
651 /* found the file already loaded. just return index */
652 return count;
653 }
654 count++;
655 }
656
657 /* Allocate memory and load file */
658 if (stat(filename,&statbuffer)) {
659 /* could not get file information */
660 return -1;
661 }
662 f= fopen(filename,"rb");
663 if (!f) {
664 /* could not open file */
665 return -1;
666 }
667
668 filecontents= (char *)malloc (statbuffer.st_size);
669 if (!filecontents) {
670 /* could not alloc mem */
671 return -1;
672 }
673 if (!fread (filecontents,statbuffer.st_size,1,f)) {
674 /* could not read file */
675 free (filecontents);
676 return -1;
677 }
678 fclose (f);
679
680 if (!(num_cached_files%RTPSTREAM_FILESPERBLOCK)) {
681 /* Time to allocate more memory for the next block of files */
682 newcachelist= (cached_file_t*) realloc(cached_files,sizeof(*cached_files)*(num_cached_files+RTPSTREAM_FILESPERBLOCK));
683 if (!newcachelist) {
684 /* out of memory */
685 free (filecontents);
686 return -1;
687 }
688 cached_files= newcachelist;
689 }
690 cached_files[num_cached_files].bytes= filecontents;
691 strncpy(cached_files[num_cached_files].filename,filename,
692 sizeof(cached_files[num_cached_files].filename) - 1);
693 cached_files[num_cached_files].filesize=statbuffer.st_size;
694 return num_cached_files++;
695 }
696
rtpstream_setsocketoptions(int sock)697 static int rtpstream_setsocketoptions (int sock)
698 {
699 /* set socket non-blocking */
700 int flags= fcntl(sock,F_GETFL,0);
701 if (fcntl(sock,F_SETFL,flags|O_NONBLOCK)==-1) {
702 return 0;
703 }
704
705 /* set buffer size */
706 unsigned int buffsize= rtp_buffsize;
707
708 /* Increase buffer sizes for this sockets */
709 if(setsockopt(sock,SOL_SOCKET,SO_SNDBUF,(char*)&buffsize,sizeof(buffsize))) {
710 return 0;
711 }
712 if(setsockopt(sock,SOL_SOCKET,SO_RCVBUF,(char*)&buffsize,sizeof(buffsize))) {
713 return 0;
714 }
715
716 return 1; /* success */
717 }
718
719 /* code checked */
rtpstream_get_localport(int * rtpsocket,int * rtcpsocket)720 static int rtpstream_get_localport (int *rtpsocket, int *rtcpsocket)
721 {
722 int port_number;
723 int tries;
724 struct sockaddr_storage address;
725
726 debugprint ("rtpstream_get_localport\n");
727
728 if (next_rtp_port<min_rtp_port) {
729 /* initialise RTP port number counter */
730 next_rtp_port= min_rtp_port;
731 }
732
733 /* initialise address family and IP address for media socket */
734 memset(&address,0,sizeof(address));
735 address.ss_family= media_ip_is_ipv6?AF_INET6:AF_INET;
736 if ((media_ip_is_ipv6?
737 inet_pton(AF_INET6,media_ip,&((_RCAST(struct sockaddr_in6 *,&address))->sin6_addr)):
738 inet_pton(AF_INET,media_ip,&((_RCAST(struct sockaddr_in *,&address))->sin_addr)))!=1) {
739 WARNING("Could not set up media IP for RTP streaming");
740 return 0;
741 }
742
743 /* create new UDP listen socket */
744 *rtpsocket= socket(media_ip_is_ipv6?PF_INET6:PF_INET,SOCK_DGRAM,0);
745 if (*rtpsocket==-1) {
746 WARNING("Could not open socket for RTP streaming: %s", strerror(errno));
747 return 0;
748 }
749
750 for (tries=0;tries<BIND_MAX_TRIES;tries++) {
751 /* try a sequence of port numbers until we find one where we can bind */
752 /* should normally be the first port we try, unless we have long-running */
753 /* calls or somebody else is nicking ports. */
754
755 port_number= next_rtp_port;
756 /* skip rtp ports in multples of 2 (allow for rtp plus rtcp) */
757 next_rtp_port+= 2;
758 if (next_rtp_port>(max_rtp_port-1)) {
759 next_rtp_port= min_rtp_port;
760 }
761
762 if (media_ip_is_ipv6) {
763 (_RCAST(struct sockaddr_in6 *,&address))->sin6_port =
764 htons((short)port_number);
765 } else {
766 (_RCAST(struct sockaddr_in *,&address))->sin_port=
767 htons((short)port_number);
768 }
769 if (::bind(*rtpsocket,(sockaddr *)(void *)&address,
770 SOCK_ADDR_SIZE(&address)) == 0) {
771 break;
772 }
773 }
774 /* Exit here if we didn't get a suitable port for rtp stream */
775 if (tries==BIND_MAX_TRIES) {
776 close (*rtpsocket);
777 *rtpsocket= -1;
778 WARNING("Could not bind port for RTP streaming after %d tries", tries);
779 return 0;
780 }
781
782 if (!rtpstream_setsocketoptions (*rtpsocket)) {
783 close (*rtpsocket);
784 *rtpsocket= -1;
785 WARNING("Could not set socket options for RTP streaming");
786 return 0;
787 }
788
789 /* create socket for rtcp - ignore any errors */
790 *rtcpsocket= socket(media_ip_is_ipv6?PF_INET6:PF_INET,SOCK_DGRAM,0);
791 if (*rtcpsocket!=-1) {
792 /* try to bind it to our preferred address */
793 if (media_ip_is_ipv6) {
794 (_RCAST(struct sockaddr_in6 *,&address))->sin6_port =
795 htons((short)port_number+1);
796 } else {
797 (_RCAST(struct sockaddr_in *,&address))->sin_port=
798 htons((short)port_number+1);
799 }
800 if (::bind(*rtcpsocket,(sockaddr *)(void *)&address,
801 SOCK_ADDR_SIZE(&address))) {
802 /* could not bind the rtcp socket to required port. so we delete it */
803 close (*rtcpsocket);
804 *rtcpsocket= -1;
805 }
806 if (!rtpstream_setsocketoptions (*rtcpsocket)) {
807 close (*rtcpsocket);
808 *rtcpsocket= -1;
809 }
810 }
811
812 return port_number;
813 }
814
815 /* code checked */
rtpstream_get_audioport(rtpstream_callinfo_t * callinfo)816 int rtpstream_get_audioport (rtpstream_callinfo_t *callinfo)
817 {
818 debugprint ("rtpstream_get_audioport callinfo=%p",callinfo);
819
820 int rtp_socket;
821 int rtcp_socket;
822
823 if (!callinfo->taskinfo) {
824 return 0;
825 }
826
827 if (callinfo->audioport) {
828 /* already a port assigned to this call */
829 debugprint (" ==> %d\n",callinfo->audioport);
830 return callinfo->audioport;
831 }
832
833 callinfo->audioport= rtpstream_get_localport (&rtp_socket,&rtcp_socket);
834 debugprint (" ==> %d\n",callinfo->audioport);
835
836 /* assign rtp and rtcp sockets to callinfo. must assign rtcp socket first */
837 callinfo->taskinfo->audio_rtcp_socket= rtcp_socket;
838 callinfo->taskinfo->audio_rtp_socket= rtp_socket;
839
840 /* start playback task if not already started */
841 if (!callinfo->taskinfo->parent_thread) {
842 if (!rtpstream_start_task (callinfo)) {
843 /* error starting playback task */
844 return 0;
845 }
846 }
847
848 /* make sure the new socket gets bound to destination address (if any) */
849 callinfo->taskinfo->flags|= TI_RECONNECTSOCKET;
850
851 return callinfo->audioport;
852 }
853
854 /* code checked */
rtpstream_get_videoport(rtpstream_callinfo_t * callinfo)855 int rtpstream_get_videoport (rtpstream_callinfo_t *callinfo)
856 {
857 debugprint ("rtpstream_get_videoport callinfo=%p",callinfo);
858
859 int rtp_socket;
860 int rtcp_socket;
861
862 if (!callinfo->taskinfo) {
863 return 0;
864 }
865
866 if (callinfo->videoport) {
867 /* already a port assigned to this call */
868 debugprint (" ==> %d\n",callinfo->videoport);
869 return callinfo->videoport;
870 }
871
872 callinfo->videoport= rtpstream_get_localport (&rtp_socket,&rtcp_socket);
873 debugprint (" ==> %d\n",callinfo->videoport);
874
875 /* assign rtp and rtcp sockets to callinfo. must assign rtcp socket first */
876 callinfo->taskinfo->video_rtcp_socket= rtcp_socket;
877 callinfo->taskinfo->video_rtp_socket= rtp_socket;
878
879 /* start playback task if not already started */
880 if (!callinfo->taskinfo->parent_thread) {
881 if (!rtpstream_start_task (callinfo)) {
882 /* error starting playback task */
883 return 0;
884 }
885 }
886
887 /* make sure the new socket gets bound to destination address (if any) */
888 callinfo->taskinfo->flags|= TI_RECONNECTSOCKET;
889
890 return callinfo->videoport;
891 }
892
893 /* code checked */
rtpstream_set_remote(rtpstream_callinfo_t * callinfo,int ip_ver,char * ip_addr,int audio_port,int video_port)894 void rtpstream_set_remote (rtpstream_callinfo_t *callinfo, int ip_ver, char *ip_addr,
895 int audio_port, int video_port)
896 {
897 struct sockaddr_storage address;
898 struct in_addr *ip4_addr;
899 struct in6_addr *ip6_addr;
900 taskentry_t *taskinfo;
901 unsigned count;
902 int nonzero_ip;
903
904 debugprint("rtpstream_set_remote callinfo=%p, ip_ver %d ip_addr %s audio %d video %d\n",
905 callinfo, ip_ver, ip_addr, audio_port, video_port);
906
907 taskinfo= callinfo->taskinfo;
908 if (!taskinfo) {
909 /* no task info found - cannot set remote data. just return */
910 return;
911 }
912
913 nonzero_ip= 0;
914 taskinfo->flags|= TI_NULLIP; /// TODO: this (may) cause a gap in playback, if playback thread gets to exec while this is set and before new IP is checked.
915
916 /* test that media ip address version match remote ip address version? */
917
918 /* initialise address family and IP address for remote socket */
919 memset(&address,0,sizeof(address));
920 if (media_ip_is_ipv6) {
921 /* process ipv6 address */
922 address.ss_family= AF_INET6;
923 ip6_addr= &((_RCAST(struct sockaddr_in6 *,&address))->sin6_addr);
924 if (inet_pton(AF_INET6,ip_addr,ip6_addr)==1) {
925 for (count=0;count<sizeof(*ip6_addr);count++) {
926 if (((char*)ip6_addr)[count]) {
927 nonzero_ip= 1;
928 break;
929 }
930 }
931 }
932 } else {
933 /* process ipv4 address */
934 address.ss_family= AF_INET;
935 ip4_addr= &((_RCAST(struct sockaddr_in *,&address))->sin_addr);
936 if (inet_pton(AF_INET,ip_addr,ip4_addr)==1) {
937 for (count=0;count<sizeof(*ip4_addr);count++) {
938 if (((char*)ip4_addr)[count]) {
939 nonzero_ip= 1;
940 break;
941 }
942 }
943 }
944 }
945
946 if (!nonzero_ip) {
947 return;
948 }
949
950 /* enter critical section to lock address updates */
951 /* may want to leave this out -- low chance of race condition */
952 pthread_mutex_lock (&(taskinfo->mutex));
953
954 /* clear out existing addresses */
955 memset (&(taskinfo->remote_audio_rtp_addr),0,sizeof(taskinfo->remote_audio_rtp_addr));
956 memset (&(taskinfo->remote_audio_rtcp_addr),0,sizeof(taskinfo->remote_audio_rtcp_addr));
957 memset (&(taskinfo->remote_video_rtp_addr),0,sizeof(taskinfo->remote_video_rtp_addr));
958 memset (&(taskinfo->remote_video_rtcp_addr),0,sizeof(taskinfo->remote_video_rtcp_addr));
959
960 /* Audio */
961 if (audio_port) {
962 if (media_ip_is_ipv6) {
963 (_RCAST(struct sockaddr_in6 *,&address))->sin6_port= htons((short)audio_port);
964 } else {
965 (_RCAST(struct sockaddr_in *,&address))->sin_port= htons((short)audio_port);
966 }
967 memcpy (&(taskinfo->remote_audio_rtp_addr),&address,sizeof(address));
968
969 if (media_ip_is_ipv6) {
970 (_RCAST(struct sockaddr_in6 *,&address))->sin6_port= htons((short)audio_port+1);
971 } else {
972 (_RCAST(struct sockaddr_in *,&address))->sin_port= htons((short)audio_port+1);
973 }
974 memcpy (&(taskinfo->remote_audio_rtcp_addr),&address,sizeof(address));
975
976 taskinfo->flags&= ~TI_NULL_AUDIOIP;
977 }
978
979 /* Video */
980 if (video_port) {
981 if (media_ip_is_ipv6) {
982 (_RCAST(struct sockaddr_in6 *,&address))->sin6_port= htons((short)video_port);
983 } else {
984 (_RCAST(struct sockaddr_in *,&address))->sin_port= htons((short)video_port);
985 }
986 memcpy (&(taskinfo->remote_video_rtp_addr),&address,sizeof(address));
987
988 if (media_ip_is_ipv6) {
989 (_RCAST(struct sockaddr_in6 *,&address))->sin6_port= htons((short)video_port+1);
990 } else {
991 (_RCAST(struct sockaddr_in *,&address))->sin_port= htons((short)video_port+1);
992 }
993 memcpy (&(taskinfo->remote_video_rtcp_addr),&address,sizeof(address));
994
995 taskinfo->flags&= ~TI_NULL_VIDEOIP;
996 }
997
998 /* ok, we are done with the shared memory objects. let go mutex */
999 pthread_mutex_unlock (&(taskinfo->mutex));
1000
1001 taskinfo->flags|= TI_RECONNECTSOCKET;
1002
1003 /* may want to start a playback (listen) task here if no task running? */
1004 /* only makes sense if we decide to send 0-filled packets on idle */
1005 }
1006
1007 /* code checked */
rtpstream_play(rtpstream_callinfo_t * callinfo,rtpstream_actinfo_t * actioninfo)1008 void rtpstream_play (rtpstream_callinfo_t *callinfo, rtpstream_actinfo_t *actioninfo)
1009 {
1010 debugprint ("rtpstream_play callinfo=%p filename %s loop %d bytes %d payload %d ptime %d tick %d\n",callinfo,actioninfo->filename,actioninfo->loop_count,actioninfo->bytes_per_packet,actioninfo->payload_type,actioninfo->ms_per_packet,actioninfo->ticks_per_packet);
1011
1012 int file_index= rtpstream_cache_file (actioninfo->filename);
1013 taskentry_t *taskinfo= callinfo->taskinfo;
1014
1015 if (file_index<0) {
1016 return; /* cannot find file to play */
1017 }
1018
1019 if (!taskinfo) {
1020 return; /* no task data structure */
1021 }
1022
1023 /* make sure we have an open socket from which to play the audio file */
1024 rtpstream_get_audioport (callinfo);
1025
1026 /* save file parameter in taskinfo structure */
1027 taskinfo->new_loop_count= actioninfo->loop_count;
1028 taskinfo->new_bytes_per_packet= actioninfo->bytes_per_packet;
1029 taskinfo->new_file_size= cached_files[file_index].filesize;
1030 taskinfo->new_file_bytes= cached_files[file_index].bytes;
1031 taskinfo->new_ms_per_packet= actioninfo->ms_per_packet;
1032 taskinfo->new_timeticks_per_packet= actioninfo->ticks_per_packet;
1033 taskinfo->new_payload_type= actioninfo->payload_type;
1034
1035 /* set flag that we have a new file to play */
1036 taskinfo->flags|= TI_PLAYFILE;
1037 }
1038
1039 /* code checked */
rtpstream_pause(rtpstream_callinfo_t * callinfo)1040 void rtpstream_pause (rtpstream_callinfo_t *callinfo)
1041 {
1042 debugprint ("rtpstream_pause callinfo=%p\n",callinfo);
1043
1044 if (callinfo->taskinfo) {
1045 callinfo->taskinfo->flags|= TI_PAUSERTP;
1046 }
1047 }
1048
1049 /* code checked */
rtpstream_resume(rtpstream_callinfo_t * callinfo)1050 void rtpstream_resume (rtpstream_callinfo_t *callinfo)
1051 {
1052 debugprint ("rtpstream_resume callinfo=%p\n",callinfo);
1053
1054 if (callinfo->taskinfo) {
1055 callinfo->taskinfo->flags&= ~TI_PAUSERTP;
1056 }
1057 }
1058
1059 /* code checked */
rtpstream_shutdown(void)1060 void rtpstream_shutdown (void)
1061 {
1062 int count= 0;
1063
1064 debugprint ("rtpstream_shutdown\n");
1065
1066 /* signal all playback threads that they should exit */
1067 if (ready_threads) {
1068 for (count=0;count<num_ready_threads;count++) {
1069 ready_threads[count]->exit_flag= 1;
1070 }
1071 free (ready_threads);
1072 ready_threads= NULL;
1073 }
1074
1075 if (busy_threads) {
1076 for (count=0;count<num_busy_threads;count++) {
1077 busy_threads[count]->exit_flag= 1;
1078 }
1079 free(busy_threads);
1080 busy_threads = NULL;
1081 }
1082
1083 /* first make sure no playback threads are accessing the file buffers */
1084 /* else small chance the playback thread tries to access freed memory */
1085 while (rtpstream_numthreads) {
1086 usleep (50000);
1087 }
1088
1089 /* now free cached file bytes and structure */
1090 for (count=0;count<num_cached_files;count++) {
1091 free (cached_files[count].bytes);
1092 }
1093 if (cached_files) {
1094 free (cached_files);
1095 cached_files= NULL;
1096 }
1097 }
1098