1 /*
2
3 Copyright 2008--2013, Centre for Advanced Internet Architectures,
4 Swinburne University of Technology, http://caia.swin.edu.au
5
6 Author: Amiel Heyde, amiel@swin.edu.au
7
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License version 2 as
10 published by the Free Software Foundation.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20
21 $Id: master.c 171 2015-05-20 05:58:54Z szander $
22
23 */
24
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <string.h>
30 #include <netdb.h>
31 #include <sys/types.h>
32 #include <netinet/in.h>
33 #include <sys/socket.h>
34 #include <sys/queue.h>
35 #include <pcap.h>
36 #include <netdb.h>
37 #include <pthread.h>
38
39 #include "instance.h"
40 #include "record.h"
41 #include "pair.h"
42 #include "spptool.h"
43 #include "master.h"
44 #include "rtp.h"
45
46
47 extern uint16_t scale; // multiplier applied in runMaster to every received usec offset
48 extern size_t ts_len; // defined elsewhere; the functions in this file do not use it (runMaster works with its own per-packet timestamp length)
49 extern int finished; // non-zero makes the runMaster receive loop stop
50 extern int verbosity; // bitmask; this file tests bits 16, 128, 256 and 1024 to enable debug output
51
52 PRIVATE int sockfd; // UDP socket created and bound in loadMaster, read from and closed in runMaster
53 PRIVATE struct sockaddr_in slave_addr; // slave address filled in by loadMaster; recvfrom() in runMaster overwrites it with each sender's address
54
55
56
/*
 * Prepare the master's UDP socket for receiving from the slave `name`.
 *
 * Resolves `name`, creates the file-level datagram socket `sockfd`, gives it
 * a one second receive timeout and binds it to PORT on all local addresses.
 * The resolved slave address is stored in the file-level `slave_addr`.
 * Any failure prints a diagnostic to stderr and terminates the process.
 *
 * mpoint: not used by this function.
 * name:   host name (or address string) of the slave.
 */
void loadMaster(monitor_point_t * mpoint, const char * name) {

	struct hostent *slave;
	struct timeval timeout;
	struct sockaddr_in my_addr;

	(void)mpoint;

	// gethostbyname() reports failures through h_errno, not errno,
	// so herror() is used instead of perror()
	if ((slave = gethostbyname(name)) == NULL) {
		herror("Error looking up host");
		exit(EXIT_FAILURE);
	}

	// only an IPv4 address fits into slave_addr.sin_addr
	if (slave->h_addrtype != AF_INET ||
	    slave->h_length != (int)sizeof slave_addr.sin_addr ||
	    slave->h_addr_list[0] == NULL) {
		fprintf(stderr, "Error looking up host: no IPv4 address for %s\n", name);
		exit(EXIT_FAILURE);
	}

	if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
		perror("Error getting socket");
		exit(EXIT_FAILURE);
	}

	// a receive timeout lets the receive loop wake up regularly
	timeout.tv_sec = 1;
	timeout.tv_usec = 0;
	if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) == -1) {
		perror("Error setting socket options");
		exit(EXIT_FAILURE);
	}

	memset(&slave_addr, 0, sizeof slave_addr);
	slave_addr.sin_family = AF_INET;      // host byte order
	slave_addr.sin_port = htons(PORT);    // short, network byte order
	// memcpy avoids dereferencing the char buffer through a cast pointer
	memcpy(&slave_addr.sin_addr, slave->h_addr_list[0], sizeof slave_addr.sin_addr);

	// zero the whole structure so no field is passed to bind() uninitialised
	memset(&my_addr, 0, sizeof my_addr);
	my_addr.sin_family = AF_INET;                 // host byte order
	my_addr.sin_port = htons(PORT);               // short, network byte order
	my_addr.sin_addr.s_addr = htonl(INADDR_ANY);  // any local address
	if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof my_addr) == -1) {
		perror("Error binding to port");
		exit(EXIT_FAILURE);
	}
}
94
95
runMaster(void * args)96 void * runMaster(void * args) {
97
98 instance_t * ins;
99 char buf[MAX_PKT_LEN];
100 void * buf_ptr;
101 unsigned int no_to_recv, recv_count;
102 int numbytes;
103 direction_t direction;
104 socklen_t addr_len;
105 char ts_code;
106 size_t ts_len;
107 char hash_len;
108
109 rtp_hdr_t recv_hdr;
110
111 //ts_len = ts_code;
112 hash_len = 4;
113
114 //Offset timestamp vars
115 struct timeval prev_ts;
116
117 monitor_point_t * mpoint = (monitor_point_t *)args;
118
119
120 while(!finished) {
121
122 addr_len = sizeof slave_addr;
123
124 do {
125 numbytes = recvfrom(sockfd, buf, MAX_PKT_LEN, 0, (struct sockaddr *)&slave_addr, &addr_len);
126 if (finished) break;
127 } while(numbytes == -1);
128
129 if(finished) {
130 continue; // get out of here so we can exit this function
131 } else if(numbytes < 12) {
132 printf("ERROR: Received undersized packet!\n");
133 }
134 else {
135 buf_ptr = buf;
136
137 // extract header
138 memcpy(&recv_hdr, buf_ptr, sizeof(recv_hdr));
139 buf_ptr += sizeof(recv_hdr);
140
141 ts_code = recv_hdr.pt;
142 if(ts_code == 0) { // If this is an empty packet
143 continue; // Ignore it
144 }
145
146 prev_ts.tv_sec = recv_hdr.ts; // put the reference 'seconds' in the previous to ensure the first timestamp calculation works
147 prev_ts.tv_usec = recv_hdr.ssrc; // grab the usec from ssrc field of rtp
148
149 ts_len = ts_code; // may need to change as payload type is defined differently
150
151 if(verbosity & 128) {
152 printf("RTP HEADER: ts: %u, pt: %u\n", recv_hdr.ts, recv_hdr.pt);
153 printf("Initial Timestamp: %llu, %llu\n", (unsigned long long) prev_ts.tv_sec, (unsigned long long) prev_ts.tv_usec);
154 }
155
156 no_to_recv = ((numbytes - sizeof(recv_hdr)) / (ts_len + hash_len));
157
158 if(verbosity & 256)
159 printf("Received %u Bytes, %u Instances\n", numbytes, no_to_recv);
160
161
162 recv_count = 0;
163 while(recv_count < no_to_recv) {
164 ins = malloc(sizeof(instance_t)); // allocate space for new instance
165 // extract hash
166 memcpy(&ins->pkt_id, buf_ptr, sizeof(uint64_t));
167 buf_ptr += sizeof(uint64_t);
168
169
170 if(ts_code == ABSOLUTE) {
171 memcpy(&ins->ts, buf_ptr, sizeof(struct timeval)); //read the timestamp directly
172 buf_ptr += sizeof(struct timeval);
173 direction = (direction_t)((ins->ts.tv_sec & (1 << 31)) != 0); // clear bit
174 ins->ts.tv_sec &= ~(1 << 31);
175 break;
176 }
177 else {
178 direction = (direction_t)((*((u_char *)buf_ptr) & (1 << 7)) != 0); // get direction from the first bit of the usec offset field
179 *(u_char *)buf_ptr &= ~(1 << 7); // clear the direction bit
180
181 memcpy(&ins->ts.tv_usec, buf_ptr, ts_len); // start with the offset received usec
182 buf_ptr += ts_len;
183 ins->ts.tv_usec = ntohl(ins->ts.tv_usec); // put usec offset back to host order
184 ins->ts.tv_usec = ins->ts.tv_usec >> (8 * (sizeof(uint32_t) - ts_len)); // realign to adjust for the larger size on the host
185
186 ins->ts.tv_usec *= scale; // multiply by the offset to correct for division at slave end
187
188 if(verbosity & 1024) printf("Incoming offset: %llu\n", (unsigned long long) ins->ts.tv_usec);
189
190 ins->ts.tv_usec += prev_ts.tv_usec; // add in the previos usec
191 ins->ts.tv_sec = ins->ts.tv_usec / 1000000; // carry seconds over
192 ins->ts.tv_usec %= 1000000; // since we have carried.. set usec as remainder
193 ins->ts.tv_sec += prev_ts.tv_sec; // finally add previous seconds
194
195 prev_ts = ins->ts; // prepare the prev_ts for next time
196 }
197
198 pthread_mutex_lock(&mpoint->q_mutex[direction]);
199 TAILQ_INSERT_TAIL(&mpoint->instance_q[direction], ins, entries); // Insert instance into appropriate queue
200 pthread_mutex_unlock(&mpoint->q_mutex[direction]);
201 pthread_mutex_lock(&mpoint->q_size_mutex[direction]);
202 mpoint->q_size[direction]++;
203 pthread_mutex_unlock(&mpoint->q_size_mutex[direction]);
204 if(verbosity & 16) printf("NETWORK: Added %lu to mpoint %u instance_q[%u] - timestamp: %llu.%06llu\n",
205 ins->pkt_id, mpoint->id, direction, (unsigned long long) ins->ts.tv_sec, (unsigned long long) ins->ts.tv_usec);
206 recv_count++;
207 }
208 }
209 }
210 close(sockfd);
211 pthread_exit(NULL);
212 }
213
214
215