1 #include "worker.h"
2
3 extern char * gcheckphrase;
4 unsigned long grx_saved_checksum = 0xff;
5
rxworker(struct rxworker_s * rxworker)6 void rxworker ( struct rxworker_s * rxworker ) {
7 int readsize;
8 int restartme=0;
9 int readlen; // XXX
10 char * buffer;
11 char okphrase[] ="ok";
12 char checkphrase[] ="yoes";
13 buffer = calloc ( 1 , (size_t)kfootsize );
14
15 while ( !rxworker->rxconf_parent->done_mbox ) {
16 restartme =0;
17 assert ( rxworker->id < kthreadmax );
18 pthread_mutex_lock ( &rxworker->rxconf_parent->sa_mutex);
19 whisper ( 7, "rxw:%02i accepting and locked\n", rxworker->id);
20 rxworker->sockfd = tcp_accept ( &(rxworker->rxconf_parent->sa), rxworker->rxconf_parent->socknum);
21 whisper ( 5, "rxw:%02i accepted fd:%i \n", rxworker->id, rxworker->sockfd);
22 pthread_mutex_unlock( &rxworker->rxconf_parent->sa_mutex );
23 whisper ( 16, "rxw:%02i fd:%i expect %s\n", rxworker->id, rxworker->sockfd, gcheckphrase);
24 read ( rxworker->sockfd , buffer, (size_t) 4 ); //XXX this is vulnerable to slow starts
25 if ( bcmp ( buffer, gcheckphrase, 4 ) == 0 )
26 { whisper ( 13, "rxw:%02d checkphrase ok\n", rxworker->id); }
27 else { whisper (1, "rxw:%02d checkphrase failure got: %x %x %x %x", rxworker->id, buffer[0], buffer[1], buffer[2], buffer[4]);
28 assert ( -1 && "checkphrase failure ");
29 exit ( -1);
30 }
31 checkperror ("checkphrase nuiscance ");
32 whisper ( 18, "rxw:%02i send ok\n", rxworker->id );
33 if ( write (rxworker->sockfd, okphrase, (size_t) 2 ) != 2 ) {
34 whisper( 1, "write fail " );
35 assert ( -1 && "okwritefail");
36 exit (-36);
37 }
38 checkperror ( "signaturewrite ");
39 // /usr/src/contrib/netcat has a nice pipe input routine XXX perhaps lift it
40 // XXX stop using bespoke heirloom bufferfill routines.
41 while ( !rxworker->rxconf_parent->done_mbox && (!restartme)) {
42 struct millipacket_s pkt;
43 int cursor = 0;
44 int preamble_cursor=0;
45 int preamble_fuse=0;
46 unsigned long rx_checksum =0 ;
47 while ( preamble_cursor < sizeof(struct millipacket_s) && (!restartme)) {
48 //fill the preamble + millipacket structure whole
49 checkperror ("nuicsance before preamble read");
50 whisper ( ((errno == 0)?19:3) ,"rxw:%02i prepreamble and millipacket: errno:%i cursor:%i\n", rxworker->id, errno, preamble_cursor);
51 //XX regrettable pointer cast + arithmatic otherwise we get sizeof(pkt) * cursor
52 readlen = read ( rxworker->sockfd , ((u_char*)&pkt)+preamble_cursor, ( sizeof(struct millipacket_s) - preamble_cursor ));
53 checkperror ("preamble read");
54 whisper ( ((errno == 0)?19:3) ,"rxw:%02i preamble and millipacket: len %i errno:%i cursor:%i\n", rxworker->id, readlen,errno, preamble_cursor);
55 assert (( readlen >= 0 ) && " bad packet header read");
56 if ( readlen == 0 ) {
57 whisper ( 6, "rxw:%02i exits after empty preamble\n", rxworker->id);
58 pthread_exit( rxworker ); // no really we are done, and who wants our exit status?
59 continue;
60 };
61 if ( readlen < (sizeof(struct millipacket_s) - preamble_cursor) ) {
62 whisper ( 7, "rx: short header %i(read)< %lu (headersize), preamble cursor: %i \n",
63 readlen, sizeof(struct millipacket_s),preamble_cursor);
64 checkperror ("short read");
65 }
66 preamble_cursor += readlen;
67 assert ( preamble_cursor <= sizeof(struct millipacket_s));
68 preamble_fuse ++ ;
69 assert ( preamble_fuse < 100000 && " preamble fuse popped, check network ");
70 }
71 assert ( preamble_cursor == sizeof(struct millipacket_s) );
72 assert ( pkt.preamble == preamble_cannon_ul && "preamble check");
73 assert ( pkt.size >= 0 );
74 assert ( pkt.size <= kfootsize);
75 whisper ( 9, "rxw:%02i leg:%lu siz:%lu op:%x caught new leg\n", rxworker->id, pkt.leg_id , pkt.size,pkt.opcode);
76 int remainder = pkt.size;
77 int remainder_counter =0;
78 assert ( remainder <= kfootsize);
79 while ( remainder && (!restartme) ) {
80 readsize = read( rxworker->sockfd, buffer+cursor, MIN(remainder, MAXBSIZE ));
81 checkperror ( "rx: read failure\n");
82 if ( errno != 0 || readsize == 0 ) {
83 whisper (4 , "rxw:%02i retired due to read len:%i errno:%i\n",rxworker->id,readsize, errno);
84 restartme=1;
85 }
86 cursor += readsize;
87 assert( cursor <= kfootsize );
88 remainder -= readsize ;
89 assert (remainder >=0);
90 if (readsize == 0 && (!restartme) ) {
91 whisper ( 2, "rx: 0 byte read ;giving up. are we done?\n" ); // XXX this should not be the end
92 break;
93 }
94 checkperror ("read buffer");
95
96 whisper( (errno != 0)?3:19 , "rx%02i %lu[%i]-[%i]\t%c",
97 rxworker->id, pkt.leg_id ,
98 readsize >> 10 ,remainder >>10,
99 ((remainder_counter++)%16 == 0 ) ? (int)10:(int)' ' ) ;
100 }
101 whisper( 8, "\nrxw:%02i leg:%lu buffer filled to :%i\n", rxworker->id, pkt.leg_id, cursor) ;
102 checkperror ("read leg");
103 /*block until the sequencer is ready to push this
104 XXXX suboptimal sequencer ?? prove it!
105 perhaps a minheap??
106
107 Heisenberg compensator theory of operation:
108 next_leg will monotonically increment asserting that the output stream is
109 ordered by tracking it's assingment from the ingest code.
110
111 If the sequencer blocks for an extended time; it's unlikely to ever get better
112 so decare and error and exit
113 */
114 int sequencer_stalls =0 ;
115 while( pkt.leg_id != rxworker->rxconf_parent->next_leg && ( !restartme ) ) {
116 pthread_mutex_unlock( &rxworker->rxconf_parent->rxmutex );
117 #define ktimeout ( 1000 * 13000 )
118 #define ktimeout_chunks ( 250000 )
119 usleep ( ktimeout / ktimeout_chunks );
120 sequencer_stalls++;
121 assert ( sequencer_stalls < ktimeout_chunks && "transporter accident! rx seqencer stalled");
122 pthread_mutex_lock( &rxworker->rxconf_parent->rxmutex ); // do nothing but compare seqeuncer under lock
123 }
124 pthread_mutex_unlock( &rxworker->rxconf_parent->rxmutex );
125 if ( !restartme ) {
126 whisper ( 5, "rxw:%02i sequenced leg:%08lu[%08lu]after %05i stalls\n", rxworker->id, pkt.leg_id, pkt.size, sequencer_stalls);
127 remainder = pkt.size;
128 int writesize=0;
129 cursor = 0 ;
130 while ( remainder ) {
131 writesize = write ( STDOUT_FILENO, buffer+cursor, (size_t) MIN( remainder, MAXBSIZE ));
132 cursor += writesize;
133 remainder -= writesize ;
134 }
135 checkperror ("write buffer");
136 readlen = readsize = -111;
137 if ( pkt.opcode == end_of_millipede ) {
138 whisper ( 5, "rxw:%02i caught 0x%x done with last frame\n", rxworker->id, pkt.opcode);
139 rxworker->rxconf_parent->done_mbox = 1; //XXX xxx
140 }
141 else {
142 // the last frame will be empty and have a borken cksum
143 if ( pkt.checksum ) {
144
145 rx_checksum = mix ( grx_saved_checksum + pkt.leg_id, buffer, pkt.size );
146 if ( rx_checksum != pkt.checksum )
147 {
148 whisper( 2 , "rx checksum mismatch %lu != %lu", rx_checksum, pkt.checksum);
149 whisper ( 2, "rxw:%02i offending leg:%lu.%lu after %i stalls\n", rxworker->id, pkt.leg_id, pkt.size, sequencer_stalls);
150 }
151 assert ( rx_checksum == pkt.checksum ) ;
152 grx_saved_checksum = rx_checksum;
153 };
154
155 }
156 pthread_mutex_lock( &rxworker->rxconf_parent->rxmutex );
157 rxworker->rxconf_parent->next_leg ++ ; // do this last or race out the cksum code
158 pthread_mutex_unlock( &rxworker->rxconf_parent->rxmutex );
159 } // if not restartme
160 }// while !done_mbox
161 whisper( 5, "rxw:%02i exiting work restartme block\n", rxworker->id);
162 } //restartme
163 whisper( 4, "rxw:%02i done\n", rxworker->id);
164 }
165
rxlaunchworkers(struct rxconf_s * rxconf)166 void rxlaunchworkers ( struct rxconf_s * rxconf ) {
167 int worker_cursor=0;
168 int retcode ;
169 rxconf->done_mbox = 0;
170 rxconf->next_leg = 0; //initalize sequencer
171 if (tcp_recieve_prep(&(rxconf->sa), &(rxconf->socknum), rxconf->port ) != 0) {
172 whisper ( 1, "rx: tcp prep failed. this is unfortunate. " );
173 assert( -1);
174 }
175 do {
176 rxconf->workers[worker_cursor].id = worker_cursor;
177 rxconf->workers[worker_cursor].rxconf_parent = rxconf;
178 // all the bunnies made to hassenfeffer
179 retcode = pthread_create (
180 &rxconf->workers[worker_cursor].thread,
181 NULL, //attrs - none
182 (void *)&rxworker,
183 &(rxconf->workers[worker_cursor])
184 );
185 whisper ( 18, "rxw:%02i threadlaunched\n", worker_cursor);
186 checkperror ( "rxpthreadlaunch");
187 assert ( retcode == 0 && "pthreadlaunch");
188 worker_cursor++;
189 } while ( worker_cursor < (kthreadmax ) );
190 whisper( 7, "rx: worker group launched\n");
191 }
rx(struct rxconf_s * rxconf)192 void rx (struct rxconf_s* rxconf) {
193 pthread_mutex_init ( &(rxconf->sa_mutex), NULL );
194 pthread_mutex_init ( &(rxconf->rxmutex ), NULL );
195 rxconf->done_mbox = 0;
196 rxlaunchworkers( rxconf);
197 while ( ! rxconf->done_mbox ) {
198 pthread_mutex_unlock ( &(rxconf->rxmutex) );
199 usleep ( 10000);
200 //XXX join or hoist out the lame pthread_exits();
201 pthread_mutex_lock ( &(rxconf->rxmutex) );
202 }
203 whisper ( 4,"rx: done\n");
204 }
205
206