1 #include "worker.h"
2 
3 extern char * gcheckphrase;
4 unsigned long grx_saved_checksum = 0xff;
5 
rxworker(struct rxworker_s * rxworker)6 void rxworker ( struct rxworker_s * rxworker ) {
7 int readsize;
8 int restartme=0;
9 int readlen;  // XXX
10 char * buffer;
11 char  okphrase[] ="ok";
12 char  checkphrase[] ="yoes";
13 buffer = calloc ( 1 , (size_t)kfootsize );
14 
15 while ( !rxworker->rxconf_parent->done_mbox ) {
16 	restartme =0;
17 	assert ( rxworker->id < kthreadmax );
18 	pthread_mutex_lock  ( &rxworker->rxconf_parent->sa_mutex);
19 	whisper ( 7, "rxw:%02i accepting and locked\n", rxworker->id);
20 	rxworker->sockfd = tcp_accept ( &(rxworker->rxconf_parent->sa), rxworker->rxconf_parent->socknum);
21 	whisper ( 5, "rxw:%02i accepted fd:%i \n", rxworker->id, rxworker->sockfd);
22 	pthread_mutex_unlock( &rxworker->rxconf_parent->sa_mutex );
23 	whisper ( 16, "rxw:%02i fd:%i expect %s\n", rxworker->id, rxworker->sockfd, gcheckphrase);
24 	read ( rxworker->sockfd , buffer, (size_t)  4 ); //XXX this is vulnerable to slow starts
25 	if ( bcmp ( buffer, gcheckphrase, 4 ) == 0 )
26 		{ whisper ( 13, "rxw:%02d checkphrase ok\n", rxworker->id); }
27 	else { whisper  (1,  "rxw:%02d checkphrase failure  got: %x  %x %x %x", rxworker->id, buffer[0], buffer[1], buffer[2], buffer[4]);
28 		assert ( -1 && "checkphrase failure ");
29 		exit ( -1);
30 	}
31 	checkperror ("checkphrase  nuiscance ");
32 	whisper ( 18, "rxw:%02i send ok\n", rxworker->id );
33 	if ( write (rxworker->sockfd, okphrase, (size_t) 2 ) != 2 )  {
34 		whisper( 1, "write fail " );
35 		assert ( -1  && "okwritefail");
36 		exit (-36);
37 	}
38 	checkperror ( "signaturewrite ");
39 	// /usr/src/contrib/netcat has a nice pipe input routine XXX perhaps lift it
40 	// XXX stop using bespoke heirloom bufferfill routines.
41 	while ( !rxworker->rxconf_parent->done_mbox && (!restartme)) {
42 		struct millipacket_s pkt;
43 		int cursor = 0;
44 		int preamble_cursor=0;
45 		int preamble_fuse=0;
46 		unsigned long rx_checksum =0 ;
47 		while ( preamble_cursor < sizeof(struct millipacket_s) && (!restartme))   {
48 			//fill the preamble + millipacket structure whole
49 			checkperror ("nuicsance before preamble read");
50 			whisper ( ((errno == 0)?19:3) ,"rxw:%02i prepreamble and millipacket: errno:%i cursor:%i\n", rxworker->id, errno, preamble_cursor);
51 			//XX regrettable pointer cast + arithmatic otherwise we get sizeof(pkt) * cursor
52 			readlen = read ( rxworker->sockfd , ((u_char*)&pkt)+preamble_cursor, ( sizeof(struct millipacket_s) - preamble_cursor ));
53 			checkperror ("preamble read");
54 			whisper ( ((errno == 0)?19:3) ,"rxw:%02i preamble and millipacket: len %i errno:%i cursor:%i\n", rxworker->id, readlen,errno, preamble_cursor);
55 			assert (( readlen >= 0 ) && " bad packet header read");
56 			if ( readlen == 0 ) {
57 				whisper ( 6, "rxw:%02i exits after empty preamble\n", rxworker->id);
58 				pthread_exit( rxworker );  // no really we are done, and who wants our exit status?
59 				continue;
60 			};
61 			if ( readlen < (sizeof(struct millipacket_s) - preamble_cursor) ) {
62 				whisper ( 7, "rx: short header %i(read)< %lu (headersize), preamble cursor: %i \n",
63 					 readlen, sizeof(struct millipacket_s),preamble_cursor);
64 				checkperror ("short read");
65 			}
66 			preamble_cursor += readlen;
67 			assert ( preamble_cursor <= sizeof(struct millipacket_s));
68 			preamble_fuse ++ ;
69 			assert ( preamble_fuse < 100000 && " preamble fuse popped, check network ");
70 		}
71 		assert ( preamble_cursor == sizeof(struct millipacket_s) );
72 		assert ( pkt.preamble == preamble_cannon_ul   && "preamble check");
73 		assert ( pkt.size >= 0 );
74 		assert ( pkt.size <= kfootsize);
75 		whisper ( 9, "rxw:%02i leg:%lu siz:%lu op:%x caught new leg\n", rxworker->id,  pkt.leg_id , pkt.size,pkt.opcode);
76 		int remainder = pkt.size;
77 		int remainder_counter =0;
78 		assert ( remainder <= kfootsize);
79 		while ( remainder  && (!restartme) ) {
80 			readsize = read( rxworker->sockfd, buffer+cursor, MIN(remainder, MAXBSIZE ));
81 			checkperror ( "rx: read failure\n");
82 			if ( errno != 0   || readsize == 0  ) {
83 				whisper (4 , "rxw:%02i retired due to read len:%i errno:%i\n",rxworker->id,readsize, errno);
84 				restartme=1;
85 			}
86 			cursor += readsize;
87 			assert( cursor <= kfootsize );
88 			remainder -= readsize ;
89 			assert (remainder >=0);
90 			if (readsize == 0 && (!restartme) ) {
91 				whisper ( 2, "rx: 0 byte read ;giving up. are we done?\n" );  // XXX this should not be the end
92 				break;
93 			}
94 			checkperror ("read buffer");
95 
96 			whisper( (errno != 0)?3:19 , "rx%02i %lu[%i]-[%i]\t%c",
97 				rxworker->id, pkt.leg_id ,
98 				readsize >> 10 ,remainder >>10,
99 				((remainder_counter++)%16 == 0 ) ? (int)10:(int)' ' ) ;
100 		}
101 		whisper( 8, "\nrxw:%02i leg:%lu buffer filled to :%i\n", rxworker->id, pkt.leg_id,  cursor) ;
102 		checkperror ("read leg");
103 		/*block until the sequencer is ready to push this
104 		 XXXX  suboptimal  sequencer ?? prove it!
105 		 perhaps a minheap??
106 
107 		Heisenberg compensator theory of operation:
108 		next_leg will monotonically increment  asserting that the output stream is
109 		ordered by tracking it's assingment from the ingest code.
110 
111 		If the sequencer blocks for an extended time; it's unlikely to ever get better
112 		so decare and error and exit
113 		*/
114 		int sequencer_stalls =0 ;
115 		while( pkt.leg_id != rxworker->rxconf_parent->next_leg  && ( !restartme ) ) {
116 			pthread_mutex_unlock( &rxworker->rxconf_parent->rxmutex );
117 #define ktimeout ( 1000 * 13000 )
118 #define ktimeout_chunks ( 250000  )
119 			usleep ( ktimeout / ktimeout_chunks );
120 			sequencer_stalls++;
121 			assert ( sequencer_stalls  < ktimeout_chunks && "transporter accident! rx seqencer stalled");
122 			pthread_mutex_lock( &rxworker->rxconf_parent->rxmutex ); // do nothing but compare seqeuncer under lock
123 		}
124 		pthread_mutex_unlock( &rxworker->rxconf_parent->rxmutex );
125 		if ( !restartme ) {
126 			whisper ( 5, "rxw:%02i sequenced leg:%08lu[%08lu]after %05i stalls\n", rxworker->id,  pkt.leg_id, pkt.size, sequencer_stalls);
127 			remainder = pkt.size;
128 			int writesize=0;
129 			cursor = 0 ;
130 			while (  remainder ) {
131 				writesize = write ( STDOUT_FILENO, buffer+cursor, (size_t) MIN( remainder, MAXBSIZE ));
132 				cursor += writesize;
133 				remainder -= writesize ;
134 			}
135 			checkperror ("write buffer");
136 			readlen = readsize = -111;
137 			if ( pkt.opcode == end_of_millipede ) {
138 				whisper ( 5, "rxw:%02i caught 0x%x done with last frame\n", rxworker->id,  pkt.opcode);
139 				rxworker->rxconf_parent->done_mbox = 1; //XXX xxx
140 			}
141 			else {
142 				// the last frame will be empty and have a borken cksum
143 				if ( pkt.checksum )	{
144 
145 					rx_checksum = mix ( grx_saved_checksum + pkt.leg_id, buffer, pkt.size );
146 					if ( rx_checksum != pkt.checksum )
147 						{
148 						whisper( 2 , "rx checksum mismatch %lu != %lu", rx_checksum, pkt.checksum);
149 						whisper ( 2, "rxw:%02i offending leg:%lu.%lu after %i stalls\n", rxworker->id,  pkt.leg_id, pkt.size, sequencer_stalls);
150 						}
151 					assert ( rx_checksum == pkt.checksum ) ;
152 					grx_saved_checksum = rx_checksum;
153 				};
154 
155 			}
156 				pthread_mutex_lock( &rxworker->rxconf_parent->rxmutex );
157 			rxworker->rxconf_parent->next_leg ++ ; // do this last or race out the cksum code
158 			pthread_mutex_unlock( &rxworker->rxconf_parent->rxmutex );
159 		} // if not restartme
160 	}// while !done_mbox
161 	whisper( 5, "rxw:%02i exiting work restartme block\n", rxworker->id);
162 } //restartme
163 whisper( 4, "rxw:%02i done\n", rxworker->id);
164 }
165 
rxlaunchworkers(struct rxconf_s * rxconf)166 void rxlaunchworkers ( struct rxconf_s * rxconf ) {
167 	int worker_cursor=0;
168 	int retcode ;
169 	rxconf->done_mbox = 0;
170 	rxconf->next_leg = 0;  //initalize sequencer
171 	if (tcp_recieve_prep(&(rxconf->sa), &(rxconf->socknum),  rxconf->port ) != 0) {
172 		whisper ( 1, "rx: tcp prep failed. this is unfortunate. " );
173 		assert( -1);
174 	}
175 	do  {
176 		rxconf->workers[worker_cursor].id = worker_cursor;
177 		rxconf->workers[worker_cursor].rxconf_parent = rxconf;
178 		// all the bunnies made to hassenfeffer
179 		retcode = pthread_create (
180 			&rxconf->workers[worker_cursor].thread,
181 			NULL, //attrs - none
182 			(void *)&rxworker,
183                         &(rxconf->workers[worker_cursor])
184 		);
185 		whisper ( 18, "rxw:%02i threadlaunched\n", worker_cursor);
186 		checkperror ( "rxpthreadlaunch");
187 		assert ( retcode == 0 && "pthreadlaunch");
188 		worker_cursor++;
189 	} while ( worker_cursor < (kthreadmax ) );
190 	whisper( 7, "rx: worker group launched\n");
191 }
rx(struct rxconf_s * rxconf)192 void rx (struct rxconf_s* rxconf) {
193 	pthread_mutex_init ( &(rxconf->sa_mutex), NULL );
194 	pthread_mutex_init ( &(rxconf->rxmutex ), NULL );
195 	rxconf->done_mbox = 0;
196 	rxlaunchworkers( rxconf);
197 	while ( ! rxconf->done_mbox ) {
198 		pthread_mutex_unlock ( &(rxconf->rxmutex) );
199 		usleep ( 10000);
200 		//XXX  join or  hoist out the lame pthread_exits();
201 		pthread_mutex_lock ( &(rxconf->rxmutex) );
202 	}
203 	whisper ( 4,"rx: done\n");
204 }
205 
206