1 /*
2 
3     Copyright 2008--2013, Centre for Advanced Internet Architectures,
4     Swinburne University of Technology, http://caia.swin.edu.au
5 
6     Author: Amiel Heyde, amiel@swin.edu.au
7 
8     This program is free software; you can redistribute it and/or modify
9     it under the terms of the GNU General Public License version 2 as
10     published by the Free Software Foundation.
11 
12     This program is distributed in the hope that it will be useful,
13     but WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15     GNU General Public License for more details.
16 
17     You should have received a copy of the GNU General Public License
18     along with this program; if not, write to the Free Software
19     Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 
21     $Id: master.c 171 2015-05-20 05:58:54Z szander $
22 
23  */
24 
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <string.h>
30 #include <netdb.h>
31 #include <sys/types.h>
32 #include <netinet/in.h>
33 #include <sys/socket.h>
34 #include <sys/queue.h>
35 #include <pcap.h>
36 #include <netdb.h>
37 #include <pthread.h>
38 
39 #include "instance.h"
40 #include "record.h"
41 #include "pair.h"
42 #include "spptool.h"
43 #include "master.h"
44 #include "rtp.h"
45 
46 
47 extern uint16_t scale;
48 extern size_t ts_len;
49 extern int finished;
50 extern int verbosity;
51 
52 PRIVATE int sockfd;
53 PRIVATE struct sockaddr_in slave_addr;             // connector's address information
54 
55 
56 
loadMaster(monitor_point_t * mpoint,const char * name)57 void loadMaster(monitor_point_t * mpoint, const char * name) {
58 
59   struct timeval timeout;
60   struct hostent *slave;
61   struct sockaddr_in my_addr;
62 
63   timeout.tv_sec = 1;
64   timeout.tv_usec = 0;
65   if ((slave = gethostbyname(name)) == NULL) {        // get the host info
66       perror("Error looking up host");
67       exit(EXIT_FAILURE);
68   }
69 
70   if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
71       perror("Error getting socket");
72       exit(EXIT_FAILURE);
73   }
74 
75 
76   if(setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) == -1) {
77       perror("Error setting socket options");
78       exit(EXIT_FAILURE);
79   }
80   slave_addr.sin_family = AF_INET;                    // host byte order
81   slave_addr.sin_port = htons(PORT);                  // short, network byte order
82   slave_addr.sin_addr = *((struct in_addr *)slave->h_addr);
83   memset(slave_addr.sin_zero, '\0', sizeof slave_addr.sin_zero);
84 
85   my_addr.sin_family = AF_INET;		             // host byte order
86   my_addr.sin_port = htons(PORT);	             // short, network byte order
87   my_addr.sin_addr.s_addr = INADDR_ANY;              // automatically fill with my IP
88   memset(my_addr.sin_zero, '\0', sizeof my_addr.sin_zero);
89   if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof my_addr) == -1) {
90     perror("Error binding to port");
91       exit(EXIT_FAILURE);
92   }
93 }
94 
95 
runMaster(void * args)96 void * runMaster(void * args) {
97 
98   instance_t * ins;
99   char buf[MAX_PKT_LEN];
100   void * buf_ptr;
101   unsigned int no_to_recv, recv_count;
102   int numbytes;
103   direction_t direction;
104   socklen_t addr_len;
105   char ts_code;
106   size_t ts_len;
107   char hash_len;
108 
109   rtp_hdr_t recv_hdr;
110 
111   //ts_len = ts_code;
112   hash_len = 4;
113 
114   //Offset timestamp vars
115   struct timeval prev_ts;
116 
117   monitor_point_t * mpoint = (monitor_point_t *)args;
118 
119 
120   while(!finished) {
121 
122     addr_len = sizeof slave_addr;
123 
124     do {
125       numbytes = recvfrom(sockfd, buf, MAX_PKT_LEN, 0, (struct sockaddr *)&slave_addr, &addr_len);
126       if (finished) break;
127     } while(numbytes == -1);
128 
129     if(finished) {
130       continue; // get out of here so we can exit this function
131     } else if(numbytes < 12) {
132       printf("ERROR: Received undersized packet!\n");
133     }
134     else {
135       buf_ptr = buf;
136 
137       // extract header
138       memcpy(&recv_hdr, buf_ptr, sizeof(recv_hdr));
139       buf_ptr += sizeof(recv_hdr);
140 
141       ts_code = recv_hdr.pt;
142       if(ts_code == 0) {                              // If this is an empty packet
143         continue;                                     // Ignore it
144       }
145 
146       prev_ts.tv_sec = recv_hdr.ts;                   // put the reference 'seconds' in the previous to ensure the first timestamp calculation works
147       prev_ts.tv_usec = recv_hdr.ssrc;                // grab the usec from ssrc field of rtp
148 
149       ts_len = ts_code;                               // may need to change as payload type is defined differently
150 
151       if(verbosity & 128) {
152         printf("RTP HEADER: ts: %u, pt: %u\n", recv_hdr.ts, recv_hdr.pt);
153         printf("Initial Timestamp: %llu, %llu\n", (unsigned long long) prev_ts.tv_sec, (unsigned long long) prev_ts.tv_usec);
154       }
155 
156       no_to_recv = ((numbytes - sizeof(recv_hdr)) / (ts_len + hash_len));
157 
158       if(verbosity & 256)
159         printf("Received %u Bytes, %u Instances\n", numbytes, no_to_recv);
160 
161 
162       recv_count = 0;
163       while(recv_count < no_to_recv) {
164         ins = malloc(sizeof(instance_t));             // allocate space for new instance
165         // extract hash
166         memcpy(&ins->pkt_id, buf_ptr, sizeof(uint64_t));
167         buf_ptr += sizeof(uint64_t);
168 
169 
170         if(ts_code == ABSOLUTE) {
171             memcpy(&ins->ts, buf_ptr, sizeof(struct timeval));                      //read the timestamp directly
172             buf_ptr += sizeof(struct timeval);
173             direction = (direction_t)((ins->ts.tv_sec & (1 << 31)) != 0);           // clear bit
174             ins->ts.tv_sec &= ~(1 << 31);
175             break;
176         }
177         else {
178           direction = (direction_t)((*((u_char *)buf_ptr) & (1 << 7)) != 0);        // get direction from the first bit of the usec offset field
179           *(u_char *)buf_ptr &= ~(1 << 7);                                          // clear the direction bit
180 
181           memcpy(&ins->ts.tv_usec, buf_ptr, ts_len);                                // start with the offset received usec
182           buf_ptr += ts_len;
183           ins->ts.tv_usec = ntohl(ins->ts.tv_usec);                                 // put usec offset back to host order
184           ins->ts.tv_usec = ins->ts.tv_usec >> (8 * (sizeof(uint32_t) - ts_len));   // realign to adjust for the larger size on the host
185 
186           ins->ts.tv_usec *= scale;                                                 // multiply by the offset to correct for division at slave end
187 
188               if(verbosity & 1024) printf("Incoming offset: %llu\n", (unsigned long long) ins->ts.tv_usec);
189 
190           ins->ts.tv_usec += prev_ts.tv_usec;                                       // add in the previos usec
191           ins->ts.tv_sec = ins->ts.tv_usec / 1000000;                               // carry seconds over
192           ins->ts.tv_usec %= 1000000;                                               // since we have carried.. set usec as remainder
193           ins->ts.tv_sec += prev_ts.tv_sec;                                         // finally add previous seconds
194 
195           prev_ts = ins->ts;                                                      // prepare the prev_ts for next time
196         }
197 
198         pthread_mutex_lock(&mpoint->q_mutex[direction]);
199         TAILQ_INSERT_TAIL(&mpoint->instance_q[direction], ins, entries);        // Insert instance into appropriate queue
200         pthread_mutex_unlock(&mpoint->q_mutex[direction]);
201         pthread_mutex_lock(&mpoint->q_size_mutex[direction]);
202         mpoint->q_size[direction]++;
203         pthread_mutex_unlock(&mpoint->q_size_mutex[direction]);
204             if(verbosity & 16) printf("NETWORK: Added %lu to mpoint %u instance_q[%u] - timestamp: %llu.%06llu\n",
205                                       ins->pkt_id, mpoint->id, direction, (unsigned long long) ins->ts.tv_sec, (unsigned long long) ins->ts.tv_usec);
206         recv_count++;
207       }
208     }
209   }
210   close(sockfd);
211   pthread_exit(NULL);
212 }
213 
214 
215