1 #include "worker.h"
2 #include "util.h"
3 #include <sys/socket.h>
4 extern unsigned long gchaos;
5 extern unsigned long gchaoscounter;
6
chaos_fail()7 int chaos_fail() {
8 // give up sometimes
9 if (( gchaoscounter-- == 1) && gchaos ) {
10 gchaoscounter = gchaos;
11 whisper (1, "chaos inserted");
12 return ( 1 );
13 }
14 return ( 0 );
15 }
16
bufferfill(int fd,u_char * __restrict dest,size_t size)17 ssize_t bufferfill ( int fd, u_char * __restrict dest, size_t size ) {
18 // forcefully read utill a bufffer completes or EOF
19 // XXX replace with kqueue - perhaps
20 int remainder=size;
21 u_char * dest_cursor = dest;
22 ssize_t accumulator=0;
23 ssize_t readsize;
24 int fuse=1055; // don't spin forever
25 int sleep_thief =0;
26 assert ( dest != NULL );
27 do {
28 checkperror ( "bufferfill nuiscance err");
29 if ( errno !=0 ) {
30 whisper (3, "ignoring sig %i\n", errno);
31 errno = 0 ;
32 }
33 //reset nuiscances
34 readsize = read( fd, dest_cursor, MIN( MAXBSIZE, remainder) );
35 if ( errno != 0 ) {
36 whisper ( 3, "erno: %i readsize %ld requestedsiz: %d fd:%i dest:%p \n", errno , readsize, MIN( MAXBSIZE, remainder), fd, dest_cursor );
37 }
38 checkperror( "bufferfill read err");
39 /*
40 whisper( 20, "txingest: read stdin size %ld offset:%i remaining %i \n",
41 readsize,
42 (int) ((u_char*)dest_cursor - (u_char*)dest),
43 remainder );
44 */
45 if ( readsize < 0 ) {
46 whisper (2, "negative read");
47 perror ( "negread");
48 break;
49 }
50 else {
51 remainder -= readsize ;
52 assert ( remainder >= 0);
53 accumulator += readsize;
54 dest_cursor += readsize;
55 if ( readsize < MAXBSIZE) {
56 // discourage tinygrams - they just beat us up and chew the cpu
57 // XXX histgram the readsize and use ema to track optimal effort
58 sleep_thief ++;
59 } else {
60 sleep_thief= 0;
61 }
62 usleep ( sleep_thief );
63 if ( readsize < 1 ) { // short reads are the end
64 break;
65 }
66 }
67 } while ( (remainder > 0) && ( fuse-- > 0 ) );
68 assert ( (fuse > 1 ) && "fuse blown" );
69 return ( (fuse < 1 )? -1 : accumulator );
70 }
stopwatch_start(struct timespec * t)71 void stopwatch_start (struct timespec * t ) {
72 assert ( clock_gettime( CLOCK_UPTIME, t ) == 0);
73 }
stopwatch_stop(struct timespec * t,int whisper_channel)74 int stopwatch_stop ( struct timespec * t , int whisper_channel) {
75 // stop the timer started at t
76 // returns usec resolution diff of time
77 struct timespec stoptime;
78 assert ( clock_gettime( CLOCK_UPTIME, &stoptime ) == 0 );
79 time_t secondsdiff = stoptime.tv_sec - t->tv_sec ;
80 long nanoes = stoptime.tv_nsec - t->tv_nsec;
81 if ( nanoes < 0 ) {
82 // borrow billions place nanoseconds to come up true
83 nanoes += 1000000000;
84 secondsdiff --;
85 }
86 u_long ret = MIN( ULONG_MAX, (secondsdiff * 1000000 ) + (nanoes/1000)); //in usec
87 if ( whisper_channel > 0 ) { whisper ( whisper_channel, "%li.%03li", secondsdiff, nanoes/1000000); }
88 return ret;
89 }
90 //return a connected socket fd
tcp_connect(char * host,int port)91 int tcp_connect ( char * host, int port ) {
92 int ret_sockfd= -1;
93 int retcode;
94 struct hostent *lhostent;
95 struct addrinfo laddrinfo;
96 struct sockaddr_in lsockaddr;
97 //struct sockaddr lsockaddr;
98 lhostent = gethostbyname ( host );
99 if ( h_errno != 0 ) herror ( "gethostenterror" );
100 assert ( h_errno == 0 && "hostname fishy");
101 lsockaddr.sin_family = AF_INET;
102 lsockaddr.sin_port = htons( port );
103 memcpy(&(lsockaddr.sin_addr), lhostent->h_addr_list [0], sizeof(struct in_addr)); //y u hate c?
104 ret_sockfd = socket ( AF_INET,SOCK_STREAM, 0 );
105 assert ( ret_sockfd > 0 && "socket fishy");
106 retcode = connect ( ret_sockfd, (struct sockaddr*) &(lsockaddr), sizeof (struct sockaddr) );
107 if ( retcode != 0 ) {
108 whisper ( 1, "tx: connect failed to %s:%d fd: %i \n", host, port, ret_sockfd);
109 //our only output is the socketfd, so trash it
110 ret_sockfd = -1;
111 whisper ( 19, "trashing fd to fail %s:%d fd: %i \n", host, port, ret_sockfd);
112 } else {
113 int sockerr;
114 u_int sockerrsize = sizeof(sockerr); //uhg
115 getsockopt ( ret_sockfd , SOL_SOCKET, SO_ERROR, &sockerr, &sockerrsize);
116 checkperror ( "connect");
117 assert ( sockerr == 0);
118 whisper (8, " connected to %s:%i\n", host, port);
119 }
120 return ( ret_sockfd );
121 }
tcp_recieve_prep(struct sockaddr_in * sa,int * socknum,int inport)122 int tcp_recieve_prep (struct sockaddr_in * sa, int * socknum, int inport) {
123 int one=1;
124 int retcode;
125 int lsock = -1;
126 *socknum = socket (AF_INET, SOCK_STREAM, 0);
127 sa->sin_family= AF_INET;
128 sa->sin_addr.s_addr= htons (INADDR_ANY);
129 sa->sin_port = htons (inport);
130 whisper (7, "bind sockid: %i\n",*socknum);
131 setsockopt(*socknum,SOL_SOCKET,SO_REUSEPORT,(char *)&one,sizeof(one));
132 retcode = bind (*socknum, (struct sockaddr *) sa,sizeof (struct sockaddr_in) );
133 if ( retcode != 0 ) { perror ("bind failed"); assert (0); }
134 checkperror("bindfail");
135 whisper (9, " listen sockid: %i\n",*socknum);
136 retcode = listen ( *socknum, 6) ;
137 if ( retcode != 0) { perror ("listen fail:\n"); assert ( 0 );};
138 return retcode ;
139 }
140
tcp_accept(struct sockaddr_in * sa,int socknum)141 int tcp_accept(struct sockaddr_in * sa , int socknum ){
142 int out_sockfd;
143 socklen_t socklen = sizeof (struct sockaddr_in ) ;
144 whisper (17, " accept sockid: %i\n",socknum);
145 out_sockfd = accept (socknum,(struct sockaddr *)sa,&socklen);
146 whisper (13, " socket %i accepted to fd:%i \n" , socknum,out_sockfd);
147 checkperror ("acceptor");
148 return (out_sockfd);
149 }
150
151
mix(unsigned int seed,void * data,unsigned long size)152 unsigned long mix ( unsigned int seed, void * data, unsigned long size ) {
153 unsigned acc = seed;
154 for ( unsigned long cursor = 0 ; cursor <= size; cursor += sizeof ( unsigned long) ) { // XX we miss the tail for unaligned sizes
155 acc += *( unsigned long * ) data + cursor;
156 }
157 return ( acc );
158 }
159