1 /*
2  *  This program is free software; you can redistribute it and/or modify
3  *  it under the terms of the GNU General Public License as published by
4  *  the Free Software Foundation; either version 2 of the License, or
5  *  (at your option) any later version.
6  *
7  *  This program is distributed in the hope that it will be useful,
8  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  *  GNU General Public License for more details.
11  *
12  *  You should have received a copy of the GNU General Public License
13  *  along with this program; if not, write to the Free Software
14  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307, USA
15  *
16  *  Author : Deon van der Westhuysen - June 2012 - Vodacom PTY LTD
17  */
18 #include <stdlib.h>
19 #include <string.h>
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 
23 #include "sipp.hpp"
24 #include <unistd.h>
25 #include <stdint.h>
26 #include <fcntl.h>
27 #include <sys/socket.h>
28 #include <pthread.h>
29 #include "rtpstream.hpp"
30 
31 /* stub to add extra debugging/logging... */
debugprint(const char * Format,...)32 static void debugprint(const char *Format, ...)
33 {
34 }
35 
36 #define RTPSTREAM_FILESPERBLOCK       16
37 #define BIND_MAX_TRIES                100
38 #define RTPSTREAM_THREADBLOCKSIZE     16
39 #define MAX_UDP_RECV_BUFFER           8192
40 
41 #define TI_NULL_AUDIOIP               0x01
42 #define TI_NULL_VIDEOIP               0x02
43 #define TI_NULLIP                     (TI_NULL_AUDIOIP|TI_NULL_VIDEOIP)
44 #define TI_PAUSERTP                   0x04
45 #define TI_ECHORTP                    0x08  /* Not currently implemented */
46 #define TI_KILLTASK                   0x10
47 #define TI_RECONNECTSOCKET            0x20
48 #define TI_PLAYFILE                   0x40
49 #define TI_CONFIGFLAGS                (TI_KILLTASK|TI_RECONNECTSOCKET|TI_PLAYFILE)
50 
51 struct rtp_header_t
52 {
53  uint16_t         flags;
54  uint16_t         seq;
55  uint32_t         timestamp;
56  uint32_t         ssrc_id;
57 };
58 
59 struct taskentry_t
60 {
61   threaddata_t         *parent_thread;
62   unsigned long        nextwake_ms;
63   volatile int         flags;
64 
65   /* rtp stream information */
66   unsigned long long   last_timestamp;
67   unsigned short       seq;
68   char                 payload_type;
69   unsigned int         ssrc_id;
70 
71   /* current playback information */
72   int                  loop_count;
73   char                 *file_bytes_start;
74   char                 *current_file_bytes;
75   int                  file_num_bytes;
76   int                  file_bytes_left;
77   /* playback timing information */
78   int                  ms_per_packet;
79   int                  bytes_per_packet;
80   int                  timeticks_per_packet;
81   int                  timeticks_per_ms;
82 
83   /* new file playback information */
84   char                 new_payload_type;
85   int                  new_loop_count;
86   int                  new_file_size;
87   char                 *new_file_bytes;
88   int                  new_ms_per_packet;
89   int                  new_bytes_per_packet;
90   int                  new_timeticks_per_packet;
91   /* sockets for audio/video rtp_rtcp */
92   int                  audio_rtp_socket;
93   int                  audio_rtcp_socket;
94   int                  video_rtp_socket;
95   int                  video_rtcp_socket;
96 
97   /* rtp peer address structures */
98   struct sockaddr_storage    remote_audio_rtp_addr;
99   struct sockaddr_storage    remote_audio_rtcp_addr;
100   struct sockaddr_storage    remote_video_rtp_addr;
101   struct sockaddr_storage    remote_video_rtcp_addr;
102 
103   /* we will have a mutex per call. should we consider refactoring to */
104   /* share mutexes across calls? makes the per-call code more complex */
105 
106   /* thread mananagment structures */
107   pthread_mutex_t      mutex;
108 };
109 
110 struct threaddata_t
111 {
112   pthread_mutex_t tasklist_mutex;
113   int             busy_list_index;
114   int             max_tasks;
115   volatile int    num_tasks;
116   volatile int    del_pending;
117   volatile int    exit_flag;
118   taskentry_t     *tasklist;
119 };
120 
121 struct cached_file_t
122 {
123   char   filename[RTPSTREAM_MAX_FILENAMELEN];
124   char   *bytes;
125   int    filesize;
126 };
127 
128 cached_file_t  *cached_files= NULL;
129 int            num_cached_files= 0;
130 int            next_rtp_port= 0;
131 
132 threaddata_t  **ready_threads= NULL;
133 threaddata_t  **busy_threads= NULL;
134 int           num_busy_threads= 0;
135 int           num_ready_threads= 0;
136 int           busy_threads_max= 0;
137 int           ready_threads_max= 0;
138 
139 unsigned int  global_ssrc_id= 0xCA110000;
140 
141 //===================================================================================================
142 
143 /* code checked */
rtpstream_free_taskinfo(taskentry_t * taskinfo)144 static void rtpstream_free_taskinfo(taskentry_t* taskinfo)
145 {
146   if (taskinfo) {
147     /* close sockets associated with this call */
148     if (taskinfo->audio_rtp_socket!=-1) {
149       close (taskinfo->audio_rtp_socket);
150     }
151     if (taskinfo->audio_rtcp_socket!=-1) {
152       close (taskinfo->audio_rtcp_socket);
153     }
154     if (taskinfo->video_rtp_socket!=-1) {
155       close (taskinfo->video_rtp_socket);
156     }
157     if (taskinfo->video_rtcp_socket!=-1) {
158       close (taskinfo->video_rtcp_socket);
159     }
160 
161     /* cleanup pthread library structure */
162     pthread_mutex_destroy(&(taskinfo->mutex));
163 
164     free (taskinfo);
165   }
166 }
167 
168 /* code checked */
rtpstream_process_task_flags(taskentry_t * taskinfo)169 static void rtpstream_process_task_flags(taskentry_t* taskinfo)
170 {
171   if (taskinfo->flags&TI_RECONNECTSOCKET) {
172     int remote_addr_len;
173     int rc;
174 
175     remote_addr_len = media_ip_is_ipv6 ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in);
176 
177     /* enter critical section to lock address updates */
178     /* may want to leave this out -- low chance of race condition */
179     pthread_mutex_lock (&(taskinfo->mutex));
180 
181     /* If we have valid ip and port numbers for audio rtp stream */
182     if (!(taskinfo->flags & TI_NULL_AUDIOIP))
183     {
184       if (taskinfo->audio_rtcp_socket != -1) {
185         rc = connect(taskinfo->audio_rtcp_socket, (struct sockaddr *)&taskinfo->remote_audio_rtcp_addr,
186                      remote_addr_len);
187         if (rc < 0) {
188           debugprint("closing audio rtcp socket %d due to error %d in rtpstream_process_task_flags taskinfo=%p\n",
189                      taskinfo->audio_rtcp_socket, errno, taskinfo);
190           close(taskinfo->audio_rtcp_socket);
191           taskinfo->audio_rtcp_socket = -1;
192         }
193       }
194 
195       if (taskinfo->audio_rtp_socket != -1) {
196         rc = connect(taskinfo->audio_rtp_socket, (struct sockaddr *)&taskinfo->remote_audio_rtp_addr, remote_addr_len);
197         if (rc < 0) {
198           debugprint("closing audio rtp socket %d due to error %d in rtpstream_process_task_flags taskinfo=%p\n",
199                      taskinfo->audio_rtp_socket, errno, taskinfo);
200           close(taskinfo->audio_rtp_socket);
201           taskinfo->audio_rtp_socket = -1;
202         }
203       }
204     }
205 
206     /* If we have valid ip and port numbers for video rtp stream */
207     if (!(taskinfo->flags & TI_NULL_VIDEOIP))
208     {
209       if (taskinfo->video_rtcp_socket != -1) {
210         rc = connect(taskinfo->video_rtcp_socket, (struct sockaddr *)&taskinfo->remote_video_rtcp_addr,
211                      remote_addr_len);
212         if (rc < 0) {
213           debugprint("closing video rtcp socket %d due to error %d in rtpstream_process_task_flags taskinfo=%p\n",
214                      taskinfo->video_rtcp_socket, errno, taskinfo);
215           close(taskinfo->video_rtcp_socket);
216           taskinfo->video_rtcp_socket = -1;
217         }
218       }
219       if (taskinfo->video_rtp_socket != -1) {
220         rc = connect(taskinfo->video_rtp_socket, (struct sockaddr *)&taskinfo->remote_video_rtp_addr,
221                      remote_addr_len);
222         if (rc < 0) {
223           debugprint("closing video rtp socket %d due to error %d in rtpstream_process_task_flags taskinfo=%p\n",
224                      taskinfo->video_rtp_socket, errno, taskinfo);
225           close(taskinfo->video_rtp_socket);
226           taskinfo->video_rtp_socket = -1;
227         }
228       }
229     }
230 
231     taskinfo->flags&= ~TI_RECONNECTSOCKET;
232     pthread_mutex_unlock (&(taskinfo->mutex));
233   }
234   if (taskinfo->flags&TI_PLAYFILE) {
235     /* copy playback information */
236     taskinfo->loop_count= taskinfo->new_loop_count;
237     taskinfo->file_bytes_start= taskinfo->new_file_bytes;
238     taskinfo->current_file_bytes= taskinfo->new_file_bytes;
239     taskinfo->file_num_bytes= taskinfo->new_file_size;
240     taskinfo->file_bytes_left= taskinfo->new_file_size;
241     taskinfo->payload_type= taskinfo->new_payload_type;
242 
243     taskinfo->ms_per_packet= taskinfo->new_ms_per_packet;
244     taskinfo->bytes_per_packet= taskinfo->new_bytes_per_packet;
245     taskinfo->timeticks_per_packet= taskinfo->new_timeticks_per_packet;
246     taskinfo->timeticks_per_ms= taskinfo->timeticks_per_packet/taskinfo->ms_per_packet;
247 
248     taskinfo->last_timestamp= getmilliseconds()*taskinfo->timeticks_per_ms;
249     taskinfo->flags&= ~TI_PLAYFILE;
250   }
251 }
252 
253 /**** todo - check code ****/
rtpstream_playrtptask(taskentry_t * taskinfo,unsigned long timenow_ms)254 static unsigned long rtpstream_playrtptask(taskentry_t *taskinfo, unsigned long  timenow_ms)
255 {
256     int rc;
257     unsigned long next_wake;
258     unsigned long long target_timestamp;
259 
260     union {
261         rtp_header_t hdr;
262         char buffer[MAX_UDP_RECV_BUFFER];
263     } udp;
264 
265     /* OK, now to play - sockets are supposed to be non-blocking */
266     /* no support for video stream at this stage. will need some work */
267 
268     next_wake = timenow_ms + 100; /* default next wakeup time */
269 
270     if (taskinfo->audio_rtcp_socket != -1) {
271         /* just keep listening on rtcp socket (is this really required?) - ignore any errors */
272         while ((rc = recv(taskinfo->audio_rtcp_socket, udp.buffer, sizeof(udp), 0)) >= 0) {
273             /*
274              * rtpstream_bytes_in+= rc;
275              */
276         }
277     }
278 
279     if (taskinfo->video_rtp_socket != -1) {
280         /* just keep listening on rtp socket (is this really required?) - ignore any errors */
281         while ((rc = recv(taskinfo->video_rtp_socket, udp.buffer, sizeof(udp), 0)) >= 0) {
282             /*
283              * rtpstream_bytes_in += rc;
284              */
285         }
286     }
287 
288     if (taskinfo->video_rtcp_socket != -1) {
289         /* just keep listening on rtcp socket (is this really required?) - ignore any errors */
290         while ((rc = recv(taskinfo->video_rtcp_socket, udp.buffer, sizeof(udp), 0)) >= 0) {
291             /*
292              * rtpstream_bytes_in+= rc;
293              */
294         }
295     }
296 
297     if (taskinfo->audio_rtp_socket != -1) {
298         /* this is temp code - will have to reorganize if/when we include echo functionality */
299         /* just keep listening on rtcp socket (is this really required?) - ignore any errors */
300         while ((rc = recv(taskinfo->audio_rtp_socket, udp.buffer, sizeof(udp), 0)) >= 0) {
301             /* for now we will just ignore any received data or receive errors */
302             /* separate code path for RTP echo */
303             rtpstream_bytes_in += rc;
304         }
305         /* are we playing back an audio file? */
306         if (taskinfo->loop_count) {
307             target_timestamp = timenow_ms * taskinfo->timeticks_per_ms;
308             next_wake = timenow_ms + taskinfo->ms_per_packet - timenow_ms % taskinfo->ms_per_packet;
309             if (taskinfo->flags & (TI_NULL_AUDIOIP|TI_PAUSERTP)) {
310                 /* when paused, set timestamp so stream appears to be up to date */
311                 taskinfo->last_timestamp = target_timestamp;
312             }
313             if (taskinfo->last_timestamp < target_timestamp) {
314                 /* need to send rtp payload - build rtp packet header... */
315                 udp.hdr.flags = htons(0x8000 | taskinfo->payload_type);
316                 udp.hdr.seq = htons(taskinfo->seq);
317                 udp.hdr.timestamp = htonl((uint32_t)(taskinfo->last_timestamp & 0xFFFFFFFF));
318                 udp.hdr.ssrc_id = htonl(taskinfo->ssrc_id);
319                 /* add payload data to the packet - handle buffer wraparound */
320                 if (taskinfo->file_bytes_left >= taskinfo->bytes_per_packet) {
321                     /* no need for fancy acrobatics */
322                     memcpy(udp.buffer + sizeof(rtp_header_t), taskinfo->current_file_bytes, taskinfo->bytes_per_packet);
323                 } else {
324                     /* copy from end and then begining of file. does not handle the */
325                     /* case where file is shorter than the packet length!! */
326                     memcpy(udp.buffer + sizeof(rtp_header_t), taskinfo->current_file_bytes, taskinfo->file_bytes_left);
327                     memcpy(udp.buffer + sizeof(rtp_header_t) + taskinfo->file_bytes_left,
328                            taskinfo->file_bytes_start, taskinfo->bytes_per_packet-taskinfo->file_bytes_left);
329                 }
330                 /* now send the actual packet */
331                 rc = send(taskinfo->audio_rtp_socket, udp.buffer, taskinfo->bytes_per_packet + sizeof(rtp_header_t), 0);
332                 if (rc < 0) {
333                     /* handle sending errors */
334                     if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
335                         next_wake= timenow_ms + 2; /* retry after short sleep */
336                     } else {
337                         /* this looks like a permanent error  - should we ignore ENETUNREACH? */
338                         debugprint("closing rtp socket %d due to error %d in rtpstream_new_call taskinfo=%p\n",
339                                    taskinfo->audio_rtp_socket, errno, taskinfo);
340                         close(taskinfo->audio_rtp_socket);
341                         taskinfo->audio_rtp_socket= -1;
342                     }
343                 } else {
344                     /* statistics - only count successful sends */
345                     rtpstream_bytes_out += taskinfo->bytes_per_packet + sizeof(rtp_header_t);
346                     rtpstream_pckts++;
347                     /* advance playback pointer to next packet */
348                     taskinfo->seq++;
349                     /* must change if timer ticks per packet can be fractional */
350                     taskinfo->last_timestamp += taskinfo->timeticks_per_packet;
351                     taskinfo->file_bytes_left -= taskinfo->bytes_per_packet;
352                     if (taskinfo->file_bytes_left > 0) {
353                         taskinfo->current_file_bytes += taskinfo->bytes_per_packet;
354                     } else {
355                         taskinfo->current_file_bytes = taskinfo->file_bytes_start-taskinfo->file_bytes_left;
356                         taskinfo->file_bytes_left += taskinfo->file_num_bytes;
357                         if (taskinfo->loop_count > 0) {
358                             /* one less loop to play. -1 (infinite loops) will stay as is */
359                             taskinfo->loop_count--;
360                         }
361                     }
362                     if (taskinfo->last_timestamp < target_timestamp) {
363                         /* no sleep if we are behind */
364                         next_wake= timenow_ms;
365                     }
366                 }
367             }
368         } else {
369             /* not busy playing back a file -  put possible rtp echo code here. */
370         }
371     }
372 
373     return next_wake;
374 }
375 
376 
377 
378 /*********************************************************************************/
379 /*********************************************************************************/
380 /*********************************************************************************/
381 
382 
383 /* code checked */
rtpstream_playback_thread(void * params)384 static void* rtpstream_playback_thread(void* params)
385 {
386   threaddata_t   *threaddata= (threaddata_t *) params;
387   taskentry_t    *taskinfo;
388   int            taskindex;
389 
390   unsigned long  timenow_ms;
391   unsigned long  waketime_ms;
392   int            sleeptime_us;
393 
394   rtpstream_numthreads++; /* perhaps wrap this in a mutex? */
395 
396   while (!threaddata->exit_flag) {
397     timenow_ms= getmilliseconds();
398     waketime_ms= timenow_ms+ 100; /* default sleep 100ms */
399 
400     /* iterate through tasks and handle playback and other actions */
401     for (taskindex=0;taskindex<threaddata->num_tasks;taskindex++) {
402       taskinfo= (&threaddata->tasklist)[taskindex];
403       if (taskinfo->flags&TI_CONFIGFLAGS) {
404         if (taskinfo->flags&TI_KILLTASK) {
405           /* remove this task entry and release its resources */
406           pthread_mutex_lock (&(threaddata->tasklist_mutex));
407           (&threaddata->tasklist)[taskindex--]= (&threaddata->tasklist)[--threaddata->num_tasks];
408           threaddata->del_pending--;   /* must decrease del_pending after num_tasks */
409           pthread_mutex_unlock (&(threaddata->tasklist_mutex));
410           rtpstream_free_taskinfo (taskinfo);
411           continue;
412         }
413         /* handle any other config related flags */
414         rtpstream_process_task_flags (taskinfo);
415       }
416 
417       /* should we update current time inbetween tasks? */
418       if (taskinfo->nextwake_ms<=timenow_ms) {
419         /* task needs to execute now */
420         taskinfo->nextwake_ms= rtpstream_playrtptask (taskinfo,timenow_ms);
421       }
422       if (waketime_ms>taskinfo->nextwake_ms) {
423         waketime_ms= taskinfo->nextwake_ms;
424       }
425     }
426     /* sleep until next iteration of playback loop */
427     sleeptime_us= (waketime_ms-getmilliseconds())*1000;
428     if (sleeptime_us>0) {
429       usleep (sleeptime_us);
430     }
431   }
432 
433   /* Free all task and thread resources and exit the thread */
434   for (taskindex=0;taskindex<threaddata->num_tasks;taskindex++) {
435     /* check if we should delete this thread, else let owner call clear it */
436     /* small chance of race condition in this code */
437     taskinfo= (&threaddata->tasklist)[taskindex];
438     if (taskinfo->flags&TI_KILLTASK) {
439       rtpstream_free_taskinfo (taskinfo);
440     } else {
441       taskinfo->parent_thread= NULL; /* no longer associated with a thread */
442     }
443   }
444   pthread_mutex_destroy(&(threaddata->tasklist_mutex));
445   free (threaddata);
446   rtpstream_numthreads--; /* perhaps wrap this in a mutex? */
447 
448   return NULL;
449 }
450 
451 /* code checked */
rtpstream_start_task(rtpstream_callinfo_t * callinfo)452 static int rtpstream_start_task (rtpstream_callinfo_t *callinfo)
453 {
454   int           ready_index;
455   int           allocsize;
456   threaddata_t  **threadlist;
457   threaddata_t  *threaddata;
458   pthread_t     newthread;
459 
460   /* safety check... */
461   if (!callinfo->taskinfo) {
462     return 0;
463   }
464 
465   /* we count on the fact that only one thread can add/remove playback tasks */
466   /* thus we don't have mutexes to protect the thread list objects.          */
467   for (ready_index=0;ready_index<num_ready_threads;ready_index++) {
468     /* ready threads have a spare task slot or should have one very shortly */
469     /* if we find a task with no spare slots, just skip to the next one.    */
470     if (ready_threads[ready_index]->num_tasks<ready_threads[ready_index]->max_tasks) {
471       /* we found a thread with an open task slot. */
472       break;
473     }
474   }
475 
476   if (ready_index==num_ready_threads) {
477     /* did not find a thread with spare task slots, thus we create one here */
478     if (num_ready_threads>=ready_threads_max) {
479       /* need to allocate more memory for thread list */
480       ready_threads_max+= RTPSTREAM_THREADBLOCKSIZE;
481       threadlist= (threaddata_t **) realloc (ready_threads,sizeof(*ready_threads)*ready_threads_max);
482       if (!threadlist) {
483         /* could not allocate bigger block... worry [about it later] */
484         ready_threads_max-= RTPSTREAM_THREADBLOCKSIZE;
485         return 0;
486       }
487       ready_threads= threadlist;
488     }
489     /* create and initialise data structure for new thread */
490     allocsize= sizeof(*threaddata)+sizeof(threaddata->tasklist)*(rtp_tasks_per_thread-1);
491     threaddata= (threaddata_t *) malloc (allocsize);
492     if (!threaddata) {
493       return 0;
494     }
495     memset (threaddata,0,allocsize);
496     threaddata->max_tasks= rtp_tasks_per_thread;
497     threaddata->busy_list_index= -1;
498     pthread_mutex_init(&(threaddata->tasklist_mutex),NULL);
499     /* create the thread itself */
500     if (pthread_create(&newthread,NULL,rtpstream_playback_thread,threaddata)) {
501       /* error creating the thread */
502       free (threaddata);
503       return 0;
504     }
505     /* Add thread to list of ready (spare capacity) threads */
506     ready_threads[num_ready_threads++]= threaddata;
507   }
508 
509   /* now add new task to a spare slot in our thread tasklist */
510   threaddata= ready_threads[ready_index];
511   callinfo->taskinfo->parent_thread= threaddata;
512   pthread_mutex_lock (&(threaddata->tasklist_mutex));
513   (&threaddata->tasklist)[threaddata->num_tasks++]= callinfo->taskinfo;
514   pthread_mutex_unlock (&(threaddata->tasklist_mutex));
515 
516   /* this check relies on playback thread to decrement num_tasks before */
517   /* decrementing del_pending -- else we need to lock before this test  */
518   if ((threaddata->del_pending==0)&&(threaddata->num_tasks>=threaddata->max_tasks)) {
519     /* move this thread to the busy list - no free task slots */
520     /* first check if the busy list is big enough to hold new thread */
521     if (num_busy_threads>=busy_threads_max) {
522       /* need to allocate more memory for thread list */
523       busy_threads_max+= RTPSTREAM_THREADBLOCKSIZE;
524       threadlist= (threaddata_t **) realloc (busy_threads,sizeof(*busy_threads)*busy_threads_max);
525       if (!threadlist) {
526         /* could not allocate bigger block... leave thread in ready list */
527         busy_threads_max-= RTPSTREAM_THREADBLOCKSIZE;
528         return 1; /* success, sort of */
529       }
530       busy_threads= threadlist;
531     }
532     /* add to busy list */
533     threaddata->busy_list_index= num_busy_threads;
534     busy_threads[num_busy_threads++]= threaddata;
535     /* remove from ready list */
536     ready_threads[ready_index]= ready_threads[--num_ready_threads];
537   }
538 
539   return 1; /* done! */
540 }
541 
542 /* code checked */
rtpstream_stop_task(rtpstream_callinfo_t * callinfo)543 static void rtpstream_stop_task(rtpstream_callinfo_t* callinfo)
544 {
545   threaddata_t  **threadlist;
546   taskentry_t   *taskinfo= callinfo->taskinfo;
547   int           busy_index;
548 
549   if (taskinfo) {
550     if (taskinfo->parent_thread) {
551       /* this call's task is registered with an executing thread */
552       /* first move owning thread to the ready list - will be ready soon */
553       busy_index= taskinfo->parent_thread->busy_list_index;
554       if (busy_index>=0) {
555         /* make sure we have enough entries in ready list */
556         if (num_ready_threads>=ready_threads_max) {
557           /* need to allocate more memory for thread list */
558           ready_threads_max+= RTPSTREAM_THREADBLOCKSIZE;
559           threadlist= (threaddata_t **) realloc (ready_threads,sizeof(*ready_threads)*ready_threads_max);
560           if (!threadlist) {
561             /* could not allocate bigger block... reset max threads */
562             /* this is a problem - ready thread gets "lost" on busy list */
563             ready_threads_max-= RTPSTREAM_THREADBLOCKSIZE;
564           } else {
565             ready_threads= threadlist;
566           }
567         }
568 
569         if (num_ready_threads<ready_threads_max) {
570           /* OK, got space on ready list, move to ready list */
571           busy_threads[busy_index]->busy_list_index= -1;
572           ready_threads[num_ready_threads++]= busy_threads[busy_index];
573           num_busy_threads--;
574           /* fill up gap in the busy thread list */
575           if (busy_index!=num_busy_threads) {
576             busy_threads[busy_index]= busy_threads[num_busy_threads];
577             busy_threads[busy_index]->busy_list_index= busy_index;
578           }
579         }
580       }
581       /* then ask the thread to destory this task (and its memory) */
582       pthread_mutex_lock (&(taskinfo->parent_thread->tasklist_mutex));
583       taskinfo->parent_thread->del_pending++;
584       taskinfo->flags|= TI_KILLTASK;
585       pthread_mutex_unlock (&(taskinfo->parent_thread->tasklist_mutex));
586     } else {
587       /* no playback thread owner, just free it */
588       rtpstream_free_taskinfo (taskinfo);
589     }
590     callinfo->taskinfo= NULL;
591   }
592 }
593 
594 /* code checked */
rtpstream_new_call(rtpstream_callinfo_t * callinfo)595 int rtpstream_new_call (rtpstream_callinfo_t *callinfo)
596 {
597   debugprint ("rtpstream_new_call callinfo=%p\n",callinfo);
598 
599   taskentry_t   *taskinfo;
600 
601   /* general init */
602   memset (callinfo,0,sizeof(*callinfo));
603 
604   taskinfo= (taskentry_t *) malloc (sizeof(*taskinfo));
605   if (!taskinfo) {
606     /* cannot allocate taskinfo memory - bubble error up */
607     return 0;
608   }
609   callinfo->taskinfo= taskinfo;
610 
611   memset (taskinfo,0,sizeof(*taskinfo));
612   taskinfo->flags= TI_NULLIP;
613   /* socket descriptors */
614   taskinfo->audio_rtp_socket= -1;
615   taskinfo->audio_rtcp_socket= -1;
616   taskinfo->video_rtp_socket= -1;
617   taskinfo->video_rtcp_socket= -1;
618   /* rtp stream members */
619   taskinfo->ssrc_id= global_ssrc_id++;
620   /* pthread mutexes */
621   pthread_mutex_init(&(callinfo->taskinfo->mutex),NULL);
622 
623   return 1;
624 }
625 
626 /* code checked */
rtpstream_end_call(rtpstream_callinfo_t * callinfo)627 void rtpstream_end_call (rtpstream_callinfo_t *callinfo)
628 {
629   debugprint ("rtpstream_end_call callinfo=%p\n",callinfo);
630 
631   /* stop playback thread(s) for this call */
632   rtpstream_stop_task (callinfo);
633 }
634 
635 /* code checked */
rtpstream_cache_file(char * filename)636 int rtpstream_cache_file (char *filename)
637 {
638   int           count= 0;
639   cached_file_t *newcachelist;
640   char          *filecontents;
641   struct stat   statbuffer;
642   FILE          *f;
643 
644   debugprint ("rtpstream_cache_file filename=%s\n",filename);
645 
646   /* cached file entries are stored in a dynamically grown array. */
647   /* could use a binary (or avl) tree but number of files should  */
648   /* be small and doesn't really justify the effort.              */
649   while (count<num_cached_files) {
650     if (!strcmp(cached_files[count].filename,filename)) {
651       /* found the file already loaded. just return index */
652       return count;
653     }
654     count++;
655   }
656 
657   /* Allocate memory and load file */
658   if (stat(filename,&statbuffer)) {
659     /* could not get file information */
660     return -1;
661   }
662   f= fopen(filename,"rb");
663   if (!f) {
664     /* could not open file */
665     return -1;
666   }
667 
668   filecontents= (char *)malloc (statbuffer.st_size);
669   if (!filecontents) {
670     /* could not alloc mem */
671     return -1;
672   }
673   if (!fread (filecontents,statbuffer.st_size,1,f)) {
674     /* could not read file */
675     free (filecontents);
676     return -1;
677   }
678   fclose (f);
679 
680   if (!(num_cached_files%RTPSTREAM_FILESPERBLOCK)) {
681     /* Time to allocate more memory for the next block of files */
682     newcachelist= (cached_file_t*) realloc(cached_files,sizeof(*cached_files)*(num_cached_files+RTPSTREAM_FILESPERBLOCK));
683     if (!newcachelist) {
684       /* out of memory */
685       free (filecontents);
686       return -1;
687     }
688     cached_files= newcachelist;
689   }
690   cached_files[num_cached_files].bytes= filecontents;
691   strncpy(cached_files[num_cached_files].filename,filename,
692           sizeof(cached_files[num_cached_files].filename) - 1);
693   cached_files[num_cached_files].filesize=statbuffer.st_size;
694   return num_cached_files++;
695 }
696 
rtpstream_setsocketoptions(int sock)697 static int rtpstream_setsocketoptions (int sock)
698 {
699   /* set socket non-blocking */
700   int flags= fcntl(sock,F_GETFL,0);
701   if (fcntl(sock,F_SETFL,flags|O_NONBLOCK)==-1) {
702     return 0;
703   }
704 
705   /* set buffer size */
706   unsigned int buffsize= rtp_buffsize;
707 
708   /* Increase buffer sizes for this sockets */
709   if(setsockopt(sock,SOL_SOCKET,SO_SNDBUF,(char*)&buffsize,sizeof(buffsize))) {
710     return 0;
711   }
712   if(setsockopt(sock,SOL_SOCKET,SO_RCVBUF,(char*)&buffsize,sizeof(buffsize))) {
713     return 0;
714   }
715 
716   return 1; /* success */
717 }
718 
719 /* code checked */
rtpstream_get_localport(int * rtpsocket,int * rtcpsocket)720 static int rtpstream_get_localport (int *rtpsocket, int *rtcpsocket)
721 {
722   int                       port_number;
723   int                       tries;
724   struct sockaddr_storage   address;
725 
726   debugprint ("rtpstream_get_localport\n");
727 
728   if (next_rtp_port<min_rtp_port) {
729     /* initialise RTP port number counter */
730     next_rtp_port= min_rtp_port;
731   }
732 
733   /* initialise address family and IP address for media socket */
734   memset(&address,0,sizeof(address));
735   address.ss_family= media_ip_is_ipv6?AF_INET6:AF_INET;
736   if ((media_ip_is_ipv6?
737        inet_pton(AF_INET6,media_ip,&((_RCAST(struct sockaddr_in6 *,&address))->sin6_addr)):
738        inet_pton(AF_INET,media_ip,&((_RCAST(struct sockaddr_in *,&address))->sin_addr)))!=1) {
739     WARNING("Could not set up media IP for RTP streaming");
740     return 0;
741   }
742 
743   /* create new UDP listen socket */
744   *rtpsocket= socket(media_ip_is_ipv6?PF_INET6:PF_INET,SOCK_DGRAM,0);
745   if (*rtpsocket==-1) {
746     WARNING("Could not open socket for RTP streaming: %s", strerror(errno));
747     return 0;
748   }
749 
750   for (tries=0;tries<BIND_MAX_TRIES;tries++) {
751     /* try a sequence of port numbers until we find one where we can bind    */
752     /* should normally be the first port we try, unless we have long-running */
753     /* calls or somebody else is nicking ports.                              */
754 
755     port_number= next_rtp_port;
756     /* skip rtp ports in multples of 2 (allow for rtp plus rtcp) */
757     next_rtp_port+= 2;
758     if (next_rtp_port>(max_rtp_port-1)) {
759       next_rtp_port= min_rtp_port;
760     }
761 
762     if (media_ip_is_ipv6) {
763       (_RCAST(struct sockaddr_in6 *,&address))->sin6_port =
764         htons((short)port_number);
765     } else {
766       (_RCAST(struct sockaddr_in *,&address))->sin_port=
767         htons((short)port_number);
768     }
769     if (::bind(*rtpsocket,(sockaddr *)(void *)&address,
770          SOCK_ADDR_SIZE(&address)) == 0) {
771       break;
772     }
773   }
774   /* Exit here if we didn't get a suitable port for rtp stream */
775   if (tries==BIND_MAX_TRIES) {
776     close (*rtpsocket);
777     *rtpsocket= -1;
778     WARNING("Could not bind port for RTP streaming after %d tries", tries);
779     return 0;
780   }
781 
782   if (!rtpstream_setsocketoptions (*rtpsocket)) {
783     close (*rtpsocket);
784     *rtpsocket= -1;
785     WARNING("Could not set socket options for RTP streaming");
786     return 0;
787   }
788 
789   /* create socket for rtcp - ignore any errors */
790   *rtcpsocket= socket(media_ip_is_ipv6?PF_INET6:PF_INET,SOCK_DGRAM,0);
791   if (*rtcpsocket!=-1) {
792     /* try to bind it to our preferred address */
793     if (media_ip_is_ipv6) {
794       (_RCAST(struct sockaddr_in6 *,&address))->sin6_port =
795         htons((short)port_number+1);
796     } else {
797       (_RCAST(struct sockaddr_in *,&address))->sin_port=
798         htons((short)port_number+1);
799     }
800     if (::bind(*rtcpsocket,(sockaddr *)(void *)&address,
801          SOCK_ADDR_SIZE(&address))) {
802       /* could not bind the rtcp socket to required port. so we delete it */
803       close (*rtcpsocket);
804       *rtcpsocket= -1;
805     }
806     if (!rtpstream_setsocketoptions (*rtcpsocket)) {
807       close (*rtcpsocket);
808       *rtcpsocket= -1;
809     }
810   }
811 
812   return port_number;
813 }
814 
815 /* code checked */
rtpstream_get_audioport(rtpstream_callinfo_t * callinfo)816 int rtpstream_get_audioport (rtpstream_callinfo_t *callinfo)
817 {
818   debugprint ("rtpstream_get_audioport callinfo=%p",callinfo);
819 
820   int   rtp_socket;
821   int   rtcp_socket;
822 
823   if (!callinfo->taskinfo) {
824     return 0;
825   }
826 
827   if (callinfo->audioport) {
828     /* already a port assigned to this call */
829     debugprint (" ==> %d\n",callinfo->audioport);
830     return callinfo->audioport;
831   }
832 
833   callinfo->audioport= rtpstream_get_localport (&rtp_socket,&rtcp_socket);
834   debugprint (" ==> %d\n",callinfo->audioport);
835 
836   /* assign rtp and rtcp sockets to callinfo. must assign rtcp socket first */
837   callinfo->taskinfo->audio_rtcp_socket= rtcp_socket;
838   callinfo->taskinfo->audio_rtp_socket= rtp_socket;
839 
840   /* start playback task if not already started */
841   if (!callinfo->taskinfo->parent_thread) {
842     if (!rtpstream_start_task (callinfo)) {
843       /* error starting playback task */
844       return 0;
845     }
846   }
847 
848   /* make sure the new socket gets bound to destination address (if any) */
849   callinfo->taskinfo->flags|= TI_RECONNECTSOCKET;
850 
851   return callinfo->audioport;
852 }
853 
854 /* code checked */
rtpstream_get_videoport(rtpstream_callinfo_t * callinfo)855 int rtpstream_get_videoport (rtpstream_callinfo_t *callinfo)
856 {
857   debugprint ("rtpstream_get_videoport callinfo=%p",callinfo);
858 
859   int   rtp_socket;
860   int   rtcp_socket;
861 
862   if (!callinfo->taskinfo) {
863     return 0;
864   }
865 
866   if (callinfo->videoport) {
867     /* already a port assigned to this call */
868     debugprint (" ==> %d\n",callinfo->videoport);
869     return callinfo->videoport;
870   }
871 
872   callinfo->videoport= rtpstream_get_localport (&rtp_socket,&rtcp_socket);
873   debugprint (" ==> %d\n",callinfo->videoport);
874 
875   /* assign rtp and rtcp sockets to callinfo. must assign rtcp socket first */
876   callinfo->taskinfo->video_rtcp_socket= rtcp_socket;
877   callinfo->taskinfo->video_rtp_socket= rtp_socket;
878 
879   /* start playback task if not already started */
880   if (!callinfo->taskinfo->parent_thread) {
881     if (!rtpstream_start_task (callinfo)) {
882       /* error starting playback task */
883       return 0;
884     }
885   }
886 
887   /* make sure the new socket gets bound to destination address (if any) */
888   callinfo->taskinfo->flags|= TI_RECONNECTSOCKET;
889 
890   return callinfo->videoport;
891 }
892 
893 /* code checked */
rtpstream_set_remote(rtpstream_callinfo_t * callinfo,int ip_ver,char * ip_addr,int audio_port,int video_port)894 void rtpstream_set_remote (rtpstream_callinfo_t *callinfo, int ip_ver, char *ip_addr,
895                            int audio_port, int video_port)
896 {
897   struct sockaddr_storage   address;
898   struct in_addr            *ip4_addr;
899   struct in6_addr           *ip6_addr;
900   taskentry_t               *taskinfo;
901   unsigned                  count;
902   int                       nonzero_ip;
903 
904   debugprint("rtpstream_set_remote callinfo=%p, ip_ver %d ip_addr %s audio %d video %d\n",
905              callinfo, ip_ver, ip_addr, audio_port, video_port);
906 
907   taskinfo= callinfo->taskinfo;
908   if (!taskinfo) {
909     /* no task info found - cannot set remote data. just return */
910     return;
911   }
912 
913   nonzero_ip= 0;
914   taskinfo->flags|= TI_NULLIP;   /// TODO: this (may) cause a gap in playback, if playback thread gets to exec while this is set and before new IP is checked.
915 
916   /* test that media ip address version match remote ip address version? */
917 
918   /* initialise address family and IP address for remote socket */
919   memset(&address,0,sizeof(address));
920   if (media_ip_is_ipv6) {
921     /* process ipv6 address */
922     address.ss_family= AF_INET6;
923     ip6_addr= &((_RCAST(struct sockaddr_in6 *,&address))->sin6_addr);
924     if (inet_pton(AF_INET6,ip_addr,ip6_addr)==1) {
925       for (count=0;count<sizeof(*ip6_addr);count++) {
926         if (((char*)ip6_addr)[count]) {
927           nonzero_ip= 1;
928           break;
929         }
930       }
931     }
932   } else {
933     /* process ipv4 address */
934     address.ss_family= AF_INET;
935     ip4_addr= &((_RCAST(struct sockaddr_in *,&address))->sin_addr);
936     if (inet_pton(AF_INET,ip_addr,ip4_addr)==1) {
937       for (count=0;count<sizeof(*ip4_addr);count++) {
938         if (((char*)ip4_addr)[count]) {
939           nonzero_ip= 1;
940           break;
941         }
942       }
943     }
944   }
945 
946   if (!nonzero_ip) {
947     return;
948   }
949 
950   /* enter critical section to lock address updates */
951   /* may want to leave this out -- low chance of race condition */
952   pthread_mutex_lock (&(taskinfo->mutex));
953 
954   /* clear out existing addresses  */
955   memset (&(taskinfo->remote_audio_rtp_addr),0,sizeof(taskinfo->remote_audio_rtp_addr));
956   memset (&(taskinfo->remote_audio_rtcp_addr),0,sizeof(taskinfo->remote_audio_rtcp_addr));
957   memset (&(taskinfo->remote_video_rtp_addr),0,sizeof(taskinfo->remote_video_rtp_addr));
958   memset (&(taskinfo->remote_video_rtcp_addr),0,sizeof(taskinfo->remote_video_rtcp_addr));
959 
960   /* Audio */
961   if (audio_port) {
962     if (media_ip_is_ipv6) {
963       (_RCAST(struct sockaddr_in6 *,&address))->sin6_port= htons((short)audio_port);
964     } else {
965       (_RCAST(struct sockaddr_in *,&address))->sin_port= htons((short)audio_port);
966     }
967     memcpy (&(taskinfo->remote_audio_rtp_addr),&address,sizeof(address));
968 
969     if (media_ip_is_ipv6) {
970       (_RCAST(struct sockaddr_in6 *,&address))->sin6_port= htons((short)audio_port+1);
971     } else {
972       (_RCAST(struct sockaddr_in *,&address))->sin_port= htons((short)audio_port+1);
973     }
974     memcpy (&(taskinfo->remote_audio_rtcp_addr),&address,sizeof(address));
975 
976     taskinfo->flags&= ~TI_NULL_AUDIOIP;
977   }
978 
979   /* Video */
980   if (video_port) {
981     if (media_ip_is_ipv6) {
982       (_RCAST(struct sockaddr_in6 *,&address))->sin6_port= htons((short)video_port);
983     } else {
984       (_RCAST(struct sockaddr_in *,&address))->sin_port= htons((short)video_port);
985     }
986     memcpy (&(taskinfo->remote_video_rtp_addr),&address,sizeof(address));
987 
988     if (media_ip_is_ipv6) {
989       (_RCAST(struct sockaddr_in6 *,&address))->sin6_port= htons((short)video_port+1);
990     } else {
991       (_RCAST(struct sockaddr_in *,&address))->sin_port= htons((short)video_port+1);
992     }
993     memcpy (&(taskinfo->remote_video_rtcp_addr),&address,sizeof(address));
994 
995     taskinfo->flags&= ~TI_NULL_VIDEOIP;
996   }
997 
998   /* ok, we are done with the shared memory objects. let go mutex */
999   pthread_mutex_unlock (&(taskinfo->mutex));
1000 
1001   taskinfo->flags|= TI_RECONNECTSOCKET;
1002 
1003   /* may want to start a playback (listen) task here if no task running? */
1004   /* only makes sense if we decide to send 0-filled packets on idle */
1005 }
1006 
1007 /* code checked */
rtpstream_play(rtpstream_callinfo_t * callinfo,rtpstream_actinfo_t * actioninfo)1008 void rtpstream_play (rtpstream_callinfo_t *callinfo, rtpstream_actinfo_t *actioninfo)
1009 {
1010   debugprint ("rtpstream_play callinfo=%p filename %s loop %d bytes %d payload %d ptime %d tick %d\n",callinfo,actioninfo->filename,actioninfo->loop_count,actioninfo->bytes_per_packet,actioninfo->payload_type,actioninfo->ms_per_packet,actioninfo->ticks_per_packet);
1011 
1012   int           file_index= rtpstream_cache_file (actioninfo->filename);
1013   taskentry_t   *taskinfo= callinfo->taskinfo;
1014 
1015   if (file_index<0) {
1016     return; /* cannot find file to play */
1017   }
1018 
1019   if (!taskinfo) {
1020     return; /* no task data structure */
1021   }
1022 
1023   /* make sure we have an open socket from which to play the audio file */
1024   rtpstream_get_audioport (callinfo);
1025 
1026   /* save file parameter in taskinfo structure */
1027   taskinfo->new_loop_count= actioninfo->loop_count;
1028   taskinfo->new_bytes_per_packet= actioninfo->bytes_per_packet;
1029   taskinfo->new_file_size= cached_files[file_index].filesize;
1030   taskinfo->new_file_bytes= cached_files[file_index].bytes;
1031   taskinfo->new_ms_per_packet= actioninfo->ms_per_packet;
1032   taskinfo->new_timeticks_per_packet= actioninfo->ticks_per_packet;
1033   taskinfo->new_payload_type= actioninfo->payload_type;
1034 
1035   /* set flag that we have a new file to play */
1036   taskinfo->flags|= TI_PLAYFILE;
1037 }
1038 
1039 /* code checked */
rtpstream_pause(rtpstream_callinfo_t * callinfo)1040 void rtpstream_pause (rtpstream_callinfo_t *callinfo)
1041 {
1042   debugprint ("rtpstream_pause callinfo=%p\n",callinfo);
1043 
1044   if (callinfo->taskinfo) {
1045     callinfo->taskinfo->flags|= TI_PAUSERTP;
1046   }
1047 }
1048 
1049 /* code checked */
rtpstream_resume(rtpstream_callinfo_t * callinfo)1050 void rtpstream_resume (rtpstream_callinfo_t *callinfo)
1051 {
1052   debugprint ("rtpstream_resume callinfo=%p\n",callinfo);
1053 
1054   if (callinfo->taskinfo) {
1055     callinfo->taskinfo->flags&= ~TI_PAUSERTP;
1056   }
1057 }
1058 
1059 /* code checked */
rtpstream_shutdown(void)1060 void rtpstream_shutdown (void)
1061 {
1062   int            count= 0;
1063 
1064   debugprint ("rtpstream_shutdown\n");
1065 
1066   /* signal all playback threads that they should exit */
1067   if (ready_threads) {
1068     for (count=0;count<num_ready_threads;count++) {
1069       ready_threads[count]->exit_flag= 1;
1070     }
1071     free (ready_threads);
1072     ready_threads= NULL;
1073   }
1074 
1075   if (busy_threads) {
1076     for (count=0;count<num_busy_threads;count++) {
1077       busy_threads[count]->exit_flag= 1;
1078     }
1079     free(busy_threads);
1080     busy_threads = NULL;
1081   }
1082 
1083   /* first make sure no playback threads are accessing the file buffers */
1084   /* else small chance the playback thread tries to access freed memory */
1085   while (rtpstream_numthreads) {
1086     usleep (50000);
1087   }
1088 
1089   /* now free cached file bytes and structure */
1090   for (count=0;count<num_cached_files;count++) {
1091     free (cached_files[count].bytes);
1092   }
1093   if (cached_files) {
1094     free (cached_files);
1095     cached_files= NULL;
1096   }
1097 }
1098