1 /*
2  * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <sys/types.h>
26 #include <stdio.h>
27 #ifdef HAVE_READV
28 #include <sys/uio.h>
29 #endif
30 #include <sys/time.h>
31 #include <unistd.h>
32 #include <poll.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <syslog.h>
36 #include <errno.h>
37 #include <limits.h>
38 #include <signal.h>
39 #include <pthread.h>
40 #include <inttypes.h>
41 
42 #include "massert.h"
43 #include "datapack.h"
44 #include "crc.h"
45 #include "strerr.h"
46 #include "mfsstrerr.h"
47 #include "pcqueue.h"
48 #include "sockets.h"
49 #include "conncache.h"
50 #include "csdb.h"
51 #include "mastercomm.h"
52 #include "clocks.h"
53 #include "portable.h"
54 #include "readdata.h"
55 #include "MFSCommunication.h"
56 
/* Seconds without any data received from a chunkserver after which the read
 * worker reports the connection as timed out. A NOP packet is sent when
 * nothing has been sent for more than half of this value. */
#define CHUNKSERVER_ACTIVITY_TIMEOUT 2.0

/* Seconds of working time (measured from just before connecting) after which
 * a worker that has no further request to serve stops waiting for one. */
#define WORKER_IDLE_TIMEOUT 1.0

/* Working time (seconds) after which a worker no longer picks up another request. */
#define WORKER_BUSY_LAST_REQUEST_TIMEOUT 5.0
/* Additional seconds granted to the request in progress before it is interrupted (EINTR). */
#define WORKER_BUSY_WAIT_FOR_FINISH 5.0
/* Extra seconds added to both limits above while workers_total does not exceed HEAVYLOAD_WORKERS. */
#define WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT 20.0

/* NOTE(review): not used in the part of the file visible here - presumably the
 * lifetime (seconds) of a filled request buffer; confirm at the point of use. */
#define BUFFER_VALIDITY_TIMEOUT 60.0

/* A worker that finished its job exits when more than this many workers are available. */
#define SUSTAIN_WORKERS 50
/* Above this total number of workers, workers release their connection sooner. */
#define HEAVYLOAD_WORKERS 150
/* No new worker is spawned once workers_total reaches this value. */
#define MAX_WORKERS 250

/* Size of the inode hash and the multiplicative hash that maps an inode number
 * to a bucket (presumably an index into idhash - usage not visible here). */
#define IDHASHSIZE 256
#define IDHASH(inode) (((inode)*0xB239FB71)%IDHASHSIZE)
73 
74 /*
75 typedef struct cblock_s {
76 	uint8_t data[MFSBLOCKSIZE];
77 	uint32_t chindx;	// chunk number
78 	uint16_t pos;		// block in chunk (0..1023)
79 	uint8_t filled;		// data present
80 	uint8_t wakeup;		// somebody wants to be woken up when data are present
81 	struct readbuffer *next;
82 } cblock;
83 */
84 
85 // #define RDEBUG 1
86 
87 #ifdef RDEBUG
/*
 * Debug helper (RDEBUG builds only): dump `leng` bytes starting at `buff`
 * to stderr, 32 bytes per row. Every row starts with the address of its
 * first byte and its offset within the buffer.
 */
void read_data_hexdump(uint8_t *buff,uint32_t leng) {
	uint32_t pos;
	uint32_t col;

	pos = 0;
	while (pos<leng) {
		col = pos%32;
		if (col==0) {	// first byte of a row - print the row header
			fprintf(stderr,"%p %08X:",(void*)(buff+pos),pos);
		}
		fprintf(stderr," %02X",buff[pos]);
		if (col==31) {	// row complete
			fprintf(stderr,"\n");
		}
		pos++;
	}
	// terminate the last row when it is only partially filled
	if ((leng%32)!=0) {
		fprintf(stderr,"\n");
	}
}
103 #endif
104 
/*
 * Single read request: one contiguous range of a file (within one chunk)
 * together with the buffer that receives the data. Requests of an inode are
 * kept on a linked list (inodedata.reqhead).
 */
typedef struct rrequest_s {
	uint8_t *data;		// heap buffer for the data (released with free())
	uint64_t offset;	// offset in the file
	uint32_t leng;		// requested length (this amount is accounted in reqbufftotalsize)
	uint32_t rleng;		// length actually available: leng clipped to the file length, 0 on error/close
	uint32_t chindx;	// index of the chunk that is passed to fs_readchunk
	double modified;	// monotonic_seconds() value stored when the request was filled
	uint8_t filled;		// 1 - result (data or final state) is present
	uint8_t refresh;	// 1 - worker has to drop the current result and read the data again
	uint8_t busy;		// 1 - a worker is currently processing this request
	uint8_t free;		// set on close for requests that could not be released yet
	uint16_t lcnt;		// request is released on close only when zero - looks like a user count (TODO confirm)
	uint16_t waiting;	// nonzero when somebody waits on 'cond'
	pthread_cond_t cond;	// broadcast when the request becomes filled
	struct rrequest_s *next;	// next request on the list
	struct rrequest_s **prev;	// presumably address of the pointer that points to this node (TODO confirm)
} rrequest;
121 
122 typedef struct inodedata_s {
123 	uint32_t inode;
124 	uint32_t seqdata;
125 	uint64_t fleng;
126 //	uint32_t cacheblockcount;
127 	int status;
128 	uint16_t closewaiting;
129 	uint32_t trycnt;
130 	uint8_t flengisvalid;
131 	uint8_t waitingworker;
132 	uint8_t inqueue;
133 	int pipe[2];
134 	uint8_t readahead;
135 	uint64_t lastoffset;
136 	uint64_t lastchunkid;
137 	uint32_t lastip;
138 	uint16_t lastport;
139 	uint8_t laststatus;
140 	rrequest *reqhead,**reqtail;
141 //	cblock *datachainhead,*datachaintail;
142 	pthread_cond_t closecond;
143 	struct inodedata_s *next;
144 } inodedata;
145 
/* Descriptor of one read worker thread: allocated by read_data_spawn_worker,
 * passed to read_worker as its argument and freed by read_data_close_worker. */
typedef struct worker_s {
	pthread_t thread_id;	// filled in by pthread_create, used to detach the thread when it exits
} worker;
149 
150 //static pthread_cond_t fcbcond;
151 //static uint8_t fcbwaiting;
152 //static cblock *cacheblocks,*freecblockshead;
153 //static uint32_t freecacheblocks;
154 
/* NOTE(review): read-ahead settings - set and used outside the part of the file visible here */
static uint32_t readahead;
static uint32_t readahead_trigger;

/* a read job fails once inodedata.trycnt reaches this value */
static uint32_t maxretries;
/* NOTE(review): presumably upper limit for read-ahead buffers - usage not visible here */
static uint64_t maxreadaheadsize;
/* running total of request buffer sizes (decreased by rrequest.leng when a buffer is released) */
static uint64_t reqbufftotalsize;

/* NOTE(review): presumably table of IDHASHSIZE inodedata chains indexed by IDHASH(inode) - confirm */
static inodedata **idhash;

/* global lock - each function states in its "glock:" comment whether it expects it locked */
static pthread_mutex_t glock;

//#ifdef BUFFER_DEBUG
//static pthread_t info_worker_th;
//static uint32_t usedblocks;
//#endif

/* thread handle, presumably of the thread running read_dqueue_worker (creation not visible here) */
static pthread_t dqueue_worker_th;

/* number of workers currently counted as available for a new job */
static uint32_t workers_avail;
/* number of existing worker threads */
static uint32_t workers_total;
/* nonzero when somebody waits on worker_term_cond for the last worker to exit */
static uint32_t worker_term_waiting;
/* signalled when workers_total drops to zero and worker_term_waiting is set */
static pthread_cond_t worker_term_cond;
/* thread attributes used for every spawned read worker */
static pthread_attr_t worker_thattr;

// static pthread_t read_worker_th[WORKERS];
//static inodedata *read_worker_id[WORKERS];

/* jqueue - jobs for read workers ; dqueue - delayed jobs handled by read_dqueue_worker */
static void *jqueue,*dqueue;
183 
184 /* queues */
185 
186 /* glock: UNUSED */
/*
 * Schedule inode 'id' for processing by a read worker.
 *   cnt==0 - the job goes directly to the job queue
 *   cnt>0  - the job goes to the delay queue, stamped with the current
 *            monotonic time (microseconds, split into two 32-bit halves);
 *            cnt is the delay counter handled by read_dqueue_worker
 */
void read_delayed_enqueue(inodedata *id,uint32_t cnt) {
	uint8_t *item = (uint8_t*)id;
	uint64_t stamp;

	if (cnt==0) {	// no delay requested
		queue_put(jqueue,0,0,item,0);
		return;
	}
	stamp = monotonic_useconds();
	queue_put(dqueue,stamp>>32,stamp&0xFFFFFFFFU,item,cnt);
}
196 
197 /* glock: UNUSED */
/* Put inode 'id' on the job queue without any delay. */
void read_enqueue(inodedata *id) {
	uint8_t *item = (uint8_t*)id;

	queue_put(jqueue,0,0,item,0);
}
201 
202 /* worker thread | glock: UNUSED */
read_dqueue_worker(void * arg)203 void* read_dqueue_worker(void *arg) {
204 	uint64_t t,usec;
205 	uint32_t husec,lusec,cnt;
206 	uint8_t *id;
207 	(void)arg;
208 	for (;;) {
209 		queue_get(dqueue,&husec,&lusec,&id,&cnt);
210 		if (id==NULL) {
211 			return NULL;
212 		}
213 		t = monotonic_useconds();
214 		usec = husec;
215 		usec <<= 32;
216 		usec |= lusec;
217 		if (t>usec) {
218 			t -= usec;
219 			while (t>=1000000 && cnt>0) {
220 				t-=1000000;
221 				cnt--;
222 			}
223 			if (cnt>0) {
224 				if (t<1000000) {
225 					portable_usleep(1000000-t);
226 				}
227 				cnt--;
228 			}
229 		}
230 		if (cnt>0) {
231 			t = monotonic_useconds();
232 			queue_put(dqueue,t>>32,t&0xFFFFFFFFU,(uint8_t*)id,cnt);
233 		} else {
234 			queue_put(jqueue,0,0,id,0);
235 		}
236 	}
237 	return NULL;
238 }
239 
240 /* glock: UNLOCKED */
/*
 * Finish a single pass of a read job. Has to be called with glock unlocked
 * (the lock is taken here).
 *   id     - inode the job belongs to
 *   status - errno-style result of the pass; nonzero value is stored in
 *            id->status (and logged unless the inode is being closed)
 *   delay  - delay counter used when the job has to be queued again;
 *            zero together with a clean status also resets id->trycnt
 * Then one of three things happens:
 *   - inode is being closed: unused requests are released, the others are
 *     marked as filled (with zero length when they had no data) and flagged
 *     with 'free'; closecond is broadcast when the list became empty
 *   - no error and some request still needs data: job is queued again
 *   - otherwise: every unfilled request is completed with zero length
 */
void read_job_end(inodedata *id,int status,uint32_t delay) {
	rrequest *req,**link;
	uint8_t pending;

	zassert(pthread_mutex_lock(&glock));

	if (status!=0) {
		if (id->closewaiting==0) {
			errno = status;
			syslog(LOG_WARNING,"error reading file number %"PRIu32": %s",id->inode,strerr(errno));
		}
		id->status = status;
	} else {
		status = id->status;	// an error recorded earlier stays in force
	}

	pending = 0;
	if (status==0) {
		// is there any request that still waits for data ?
		req = id->reqhead;
		while (req!=NULL) {
			if (req->filled==0 && req->free==0) {
				pending = 1;
				break;
			}
			req = req->next;
		}
		if (delay==0) {
			id->trycnt = 0;	// on good read reset try counter
		}
	}

	if (id->closewaiting) {
#ifdef RDEBUG
		fprintf(stderr,"%.6lf: inode: %"PRIu32" - closewaiting\n",monotonic_seconds(),id->inode);
#endif
		link = &(id->reqhead);
		while ((req = *link)!=NULL) {
			if (req->lcnt!=0 || req->busy!=0) {
				// still in use - complete it and leave it on the list, marked with 'free'
				if (req->filled==0) {
					req->rleng = 0;
					req->filled = 1;
				}
				if (req->waiting) {
					zassert(pthread_cond_broadcast(&(req->cond)));
				}
				req->free = 1;
				link = &(req->next);
				continue;
			}
			// nobody uses it - unlink and release
			reqbufftotalsize -= req->leng;
			*link = req->next;
			free(req->data);
			free(req);
		}

		id->inqueue = 0;

#ifdef RDEBUG
		fprintf(stderr,"%.6lf: inode: %"PRIu32" - reqhead: %s (reqbufftotalsize: %"PRIu64")\n",monotonic_seconds(),id->inode,id->reqhead?"NOT NULL":"NULL",reqbufftotalsize);
#endif
		if (id->reqhead==NULL) {
			zassert(pthread_cond_broadcast(&(id->closecond)));
		}
	} else if (status==0 && pending) {	// still have some work to do
		read_delayed_enqueue(id,delay);
	} else {	// no more work, descriptor wait for being closed or error occurred
		req = id->reqhead;
		while (req!=NULL) {
			if (req->filled==0) {	// error occurred
				req->rleng = 0;
				req->filled = 1;
				if (req->waiting) {
					zassert(pthread_cond_broadcast(&(req->cond)));
				}
			}
			req = req->next;
		}

		id->inqueue = 0;
	}

	zassert(pthread_mutex_unlock(&glock));
}
315 
/* forward declaration - read_data_spawn_worker starts threads running
 * read_worker, which is defined further below */
void* read_worker(void *arg);

#ifndef RDEBUG
/* value of workers_total reported by the last "read workers" syslog message -
 * used to avoid logging the same number twice in a row */
static uint32_t lastnotify = 0;
#endif
321 
322 /* glock:LOCKED */
read_data_spawn_worker(void)323 static inline void read_data_spawn_worker(void) {
324 	sigset_t oldset;
325 	sigset_t newset;
326 	worker *w;
327 	int res;
328 
329 	w = malloc(sizeof(worker));
330 	if (w==NULL) {
331 		return;
332 	}
333 	sigemptyset(&newset);
334 	sigaddset(&newset, SIGTERM);
335 	sigaddset(&newset, SIGINT);
336 	sigaddset(&newset, SIGHUP);
337 	sigaddset(&newset, SIGQUIT);
338 	pthread_sigmask(SIG_BLOCK, &newset, &oldset);
339 	res = pthread_create(&(w->thread_id),&worker_thattr,read_worker,w);
340 	pthread_sigmask(SIG_SETMASK, &oldset, NULL);
341 	if (res<0) {
342 		return;
343 	}
344 	workers_avail++;
345 	workers_total++;
346 #ifdef RDEBUG
347 	fprintf(stderr,"%.6lf: spawn read worker (total: %"PRIu32")\n",monotonic_seconds(),workers_total);
348 #else
349 	if (workers_total%10==0 && workers_total!=lastnotify) {
350 		syslog(LOG_INFO,"read workers: %"PRIu32"+",workers_total);
351 		lastnotify = workers_total;
352 	}
353 #endif
354 }
355 
356 /* glock:LOCKED */
/*
 * Unregister an exiting read worker. Has to be called with glock locked, by
 * a worker that is currently counted in workers_avail.
 * Decreases both worker counters, wakes up one thread waiting on
 * worker_term_cond when the last worker is gone, detaches the thread and
 * frees its descriptor 'w' (which must not be used afterwards).
 */
static inline void read_data_close_worker(worker *w) {
	pthread_t tid;

	workers_avail -= 1;
	workers_total -= 1;
	if (worker_term_waiting && workers_total==0) {	// last worker - notify the waiting thread
		zassert(pthread_cond_signal(&worker_term_cond));
		worker_term_waiting--;
	}
	tid = w->thread_id;
	pthread_detach(tid);
	free(w);
#ifdef RDEBUG
	fprintf(stderr,"%.6lf: close read worker (total: %"PRIu32")\n",monotonic_seconds(),workers_total);
#else
	// report every tenth worker, but never the same number twice in a row
	if (workers_total!=lastnotify && workers_total%10==0) {
		syslog(LOG_INFO,"read workers: %"PRIu32"-",workers_total);
		lastnotify = workers_total;
	}
#endif
}
375 
/*
 * Format 'ip' (most significant byte first) as a dotted-quad string in
 * 'ipstr', but only when 'ipstr' is still empty - a buffer that already
 * holds text is left untouched, so the conversion is done at most once
 * until the caller clears ipstr[0] again.
 */
static inline void read_prepare_ip (char ipstr[16],uint32_t ip) {
	uint8_t a,b,c,d;

	if (ipstr[0]!=0) {	// already prepared
		return;
	}
	a = (uint8_t)(ip>>24);
	b = (uint8_t)(ip>>16);
	c = (uint8_t)(ip>>8);
	d = (uint8_t)ip;
	snprintf(ipstr,16,"%"PRIu8".%"PRIu8".%"PRIu8".%"PRIu8,a,b,c,d);
	ipstr[15]=0;
}
382 
383 /* main working thread | glock:UNLOCKED */
read_worker(void * arg)384 void* read_worker(void *arg) {
385 	uint32_t z1,z2,z3;
386 	uint8_t *data;
387 	int fd;
388 	int i;
389 	struct pollfd pfd[3];
390 	uint32_t sent,tosend,received,currentpos;
391 	uint8_t recvbuff[20];
392 	uint8_t sendbuff[29];
393 #ifdef HAVE_READV
394 	struct iovec siov[2];
395 #endif
396 	uint8_t pipebuff[1024];
397 	uint8_t *wptr;
398 	const uint8_t *rptr;
399 
400 	uint32_t reccmd;
401 	uint32_t recleng;
402 	uint64_t recchunkid;
403 	uint16_t recblocknum;
404 	uint16_t recoffset;
405 	uint32_t recsize;
406 	uint32_t reccrc;
407 	uint8_t recstatus;
408 	uint8_t gotstatus;
409 
410 	uint32_t chindx;
411 	uint32_t ip;
412 	uint16_t port;
413 	uint32_t srcip;
414 	uint64_t mfleng;
415 	uint64_t chunkid;
416 	uint32_t version;
417 	const uint8_t *csdata;
418 	uint32_t tmpip;
419 	uint16_t tmpport;
420 	uint32_t tmpver;
421 	uint32_t csver;
422 	uint32_t cnt,bestcnt;
423 	uint32_t csdatasize;
424 	uint8_t csdataver;
425 	uint8_t csrecsize;
426 	uint8_t rdstatus;
427 	int status;
428 	char csstrip[16];
429 	uint8_t reqsend;
430 	uint8_t closewaiting;
431 	double start,now,lastrcvd,lastsend;
432 	double workingtime,lrdiff;
433 	uint8_t firsttime = 1;
434 	worker *w = (worker*)arg;
435 
436 	inodedata *id;
437 	rrequest *rreq,*nrreq;
438 
439 	ip = 0;
440 	port = 0;
441 	csstrip[0] = 0;
442 
443 	for (;;) {
444 		if (ip || port) {
445 			csdb_readdec(ip,port);
446 		}
447 		ip = 0;
448 		port = 0;
449 		csstrip[0] = 0;
450 
451 		if (firsttime==0) {
452 			zassert(pthread_mutex_lock(&glock));
453 			workers_avail++;
454 			if (workers_avail > SUSTAIN_WORKERS) {
455 //				fprintf(stderr,"close worker (avail:%"PRIu32" ; total:%"PRIu32")\n",workers_avail,workers_total);
456 				read_data_close_worker(w);
457 				zassert(pthread_mutex_unlock(&glock));
458 				return NULL;
459 			}
460 			zassert(pthread_mutex_unlock(&glock));
461 		}
462 		firsttime = 0;
463 
464 		// get next job
465 		queue_get(jqueue,&z1,&z2,&data,&z3);
466 
467 		zassert(pthread_mutex_lock(&glock));
468 
469 		if (data==NULL) {
470 //			fprintf(stderr,"close worker (avail:%"PRIu32" ; total:%"PRIu32")\n",workers_avail,workers_total);
471 			read_data_close_worker(w);
472 			zassert(pthread_mutex_unlock(&glock));
473 			return NULL;
474 		}
475 
476 		workers_avail--;
477 		if (workers_avail==0 && workers_total<MAX_WORKERS) {
478 			read_data_spawn_worker();
479 //			fprintf(stderr,"spawn worker (avail:%"PRIu32" ; total:%"PRIu32")\n",workers_avail,workers_total);
480 		}
481 
482 		id = (inodedata*)data;
483 
484 		for (rreq = id->reqhead ; rreq && rreq->filled==1 && rreq->busy==0 ; rreq=rreq->next) {}
485 		if (rreq) {
486 			chindx = rreq->chindx;
487 			status = id->status;
488 			if (status==STATUS_OK) {
489 				rreq->busy = 1;
490 			}
491 		} else {
492 			// no data to read - just ignore it
493 			zassert(pthread_mutex_unlock(&glock));
494 			read_job_end(id,0,0);
495 			continue;
496 		}
497 
498 		zassert(pthread_mutex_unlock(&glock));
499 
500 		if (status!=STATUS_OK) {
501 			read_job_end(id,status,0);
502 			continue;
503 		}
504 
505 		// get chunk data from master
506 //		start = monotonic_seconds();
507 		rdstatus = fs_readchunk(id->inode,chindx,&csdataver,&mfleng,&chunkid,&version,&csdata,&csdatasize);
508 
509 		if (rdstatus!=STATUS_OK) {
510 			zassert(pthread_mutex_lock(&glock));
511 			rreq->busy = 0;
512 			zassert(pthread_mutex_unlock(&glock));
513 			syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_readchunk returned status: %s",id->inode,chindx,mfsstrerr(rdstatus));
514 			if (rdstatus==ERROR_ENOENT) {
515 				read_job_end(id,EBADF,0);
516 			} else if (rdstatus==ERROR_QUOTA) {
517 				read_job_end(id,EDQUOT,0);
518 			} else if (rdstatus==ERROR_NOSPACE) {
519 				read_job_end(id,ENOSPC,0);
520 			} else if (rdstatus==ERROR_CHUNKLOST) {
521 				read_job_end(id,ENXIO,0);
522 			} else {
523 				id->trycnt++;
524 				if (id->trycnt>=maxretries) {
525 					if (rdstatus==ERROR_NOCHUNKSERVERS) {
526 						read_job_end(id,ENOSPC,0);
527 					} else if (rdstatus==ERROR_CSNOTPRESENT) {
528 						read_job_end(id,ENXIO,0);
529 					} else {
530 						read_job_end(id,EIO,0);
531 					}
532 				} else {
533 					read_delayed_enqueue(id,1+((id->trycnt<30)?(id->trycnt/3):10));
534 				}
535 			}
536 			continue;	// get next job
537 		}
538 //		now = monotonic_seconds();
539 //		fprintf(stderr,"fs_readchunk time: %.3lf\n",now-start);
540 		if (chunkid==0 && version==0) { // empty chunk
541 			zassert(pthread_mutex_lock(&glock));
542 			id->fleng = mfleng;
543 			id->flengisvalid = 1;
544 			rreq->busy = 0;
545 #ifdef RDEBUG
546 			fprintf(stderr,"%.6lf: inode: %"PRIu32" ; mfleng: %"PRIu64" (empty chunk)\n",monotonic_seconds(),id->inode,id->fleng);
547 #endif
548 			while (rreq) {
549 				if (rreq->offset > mfleng) {
550 					rreq->rleng = 0;
551 				} else if ((rreq->offset + rreq->leng) > mfleng) {
552 					rreq->rleng = mfleng - rreq->offset;
553 				} else {
554 					rreq->rleng = rreq->leng;
555 				}
556 
557 				if (rreq->rleng>0) {
558 					memset(rreq->data,0,rreq->rleng);
559 				}
560 				rreq->filled=1;
561 				rreq->modified = monotonic_seconds();
562 				if (rreq->waiting>0) {
563 					zassert(pthread_cond_broadcast(&(rreq->cond)));
564 				}
565 				rreq = NULL;
566 
567 				for (nrreq = id->reqhead ; nrreq && nrreq->filled==1 && nrreq->busy==0 ; nrreq=nrreq->next) {}
568 				if (nrreq && nrreq->chindx==chindx) {
569 					rreq = nrreq;
570 #ifdef RDEBUG
571 					fprintf(stderr,"%.6lf: readworker: get next request (empty chunk)\n",monotonic_seconds());
572 #endif
573 				}
574 			}
575 
576 			zassert(pthread_mutex_unlock(&glock));
577 			read_job_end(id,0,0);
578 
579 			continue;
580 		}
581 
582 		if (csdata==NULL || csdatasize==0) {
583 			zassert(pthread_mutex_lock(&glock));
584 			rreq->busy = 0;
585 			zassert(pthread_mutex_unlock(&glock));
586 			syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - there are no valid copies",id->inode,chindx,chunkid,version);
587 			id->trycnt+=6;
588 			if (id->trycnt>=maxretries) {
589 				read_job_end(id,ENXIO,0);
590 			} else {
591 				read_delayed_enqueue(id,60);
592 			}
593 			continue;
594 		}
595 
596 		ip = 0; // make old compilers happy
597 		port = 0; // make old compilers happy
598 		csver = 0; // make old compilers happy
599 		if (csdataver==0) {
600 			csrecsize = 6;
601 		} else {
602 			csrecsize = 10;
603 		}
604 		// choose cs
605 		bestcnt = 0xFFFFFFFF;
606 		while (csdatasize>=csrecsize) {
607 			tmpip = get32bit(&csdata);
608 			tmpport = get16bit(&csdata);
609 			if (csdataver>0) {
610 				tmpver = get32bit(&csdata);
611 			} else {
612 				tmpver = 0;
613 			}
614 			csdatasize-=csrecsize;
615 			if (id->lastchunkid==chunkid && tmpip==id->lastip && tmpport==id->lastport) {
616 				if (id->laststatus==1) {
617 					ip = tmpip;
618 					port = tmpport;
619 					csver = tmpver;
620 					break;
621 				} else {
622 					cnt = 0xFFFFFFFE;
623 				}
624 			} else {
625 				cnt = csdb_getopcnt(tmpip,tmpport);
626 			}
627 			if (cnt<bestcnt) {
628 				ip = tmpip;
629 				port = tmpport;
630 				csver = tmpver;
631 				bestcnt = cnt;
632 			}
633 		}
634 
635 		if (ip || port) {
636 			csdb_readinc(ip,port);
637 			id->lastchunkid = chunkid;
638 			id->lastip = ip;
639 			id->lastport = port;
640 			id->laststatus = 0;
641 		} else {
642 			zassert(pthread_mutex_lock(&glock));
643 			rreq->busy = 0;
644 			zassert(pthread_mutex_unlock(&glock));
645 			syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - there are no valid copies (bad ip and/or port)",id->inode,chindx,chunkid,version);
646 			id->trycnt+=6;
647 			if (id->trycnt>=maxretries) {
648 				read_job_end(id,ENXIO,0);
649 			} else {
650 				read_delayed_enqueue(id,60);
651 			}
652 			continue;
653 		}
654 
655 		start = monotonic_seconds();
656 
657 
658 		// make connection to cs
659 		srcip = fs_getsrcip();
660 		fd = conncache_get(ip,port);
661 		if (fd<0) {
662 			cnt=0;
663 			while (cnt<10) {
664 				fd = tcpsocket();
665 				if (fd<0) {
666 					syslog(LOG_WARNING,"readworker: can't create tcp socket: %s",strerr(errno));
667 					break;
668 				}
669 				if (srcip) {
670 					if (tcpnumbind(fd,srcip,0)<0) {
671 						syslog(LOG_WARNING,"readworker: can't bind socket to given ip: %s",strerr(errno));
672 						tcpclose(fd);
673 						fd=-1;
674 						break;
675 					}
676 				}
677 				if (tcpnumtoconnect(fd,ip,port,(cnt%2)?(300*(1<<(cnt>>1))):(200*(1<<(cnt>>1))))<0) {
678 					cnt++;
679 					if (cnt>=10) {
680 						read_prepare_ip(csstrip,ip);
681 						syslog(LOG_WARNING,"readworker: can't connect to (%s:%"PRIu16"): %s",csstrip,port,strerr(errno));
682 					}
683 					close(fd);
684 					fd=-1;
685 				} else {
686 					cnt=10;
687 				}
688 			}
689 		}
690 		if (fd<0) {
691 			zassert(pthread_mutex_lock(&glock));
692 			rreq->busy = 0;
693 			zassert(pthread_mutex_unlock(&glock));
694 			id->trycnt++;
695 			if (id->trycnt>=maxretries) {
696 				read_job_end(id,EIO,0);
697 			} else {
698 				read_delayed_enqueue(id,1+((id->trycnt<30)?(id->trycnt/3):10));
699 			}
700 			continue;
701 		}
702 		if (tcpnodelay(fd)<0) {
703 			syslog(LOG_WARNING,"readworker: can't set TCP_NODELAY: %s",strerr(errno));
704 		}
705 
706 		pfd[0].fd = fd;
707 		pfd[1].fd = id->pipe[0];
708 		currentpos = 0;
709 		gotstatus = 0;
710 		received = 0;
711 		reqsend = 0;
712 		sent = 0;
713 		tosend = 0;
714 		lastrcvd = 0.0;
715 		lastsend = 0.0;
716 
717 		zassert(pthread_mutex_lock(&glock));
718 		id->fleng = mfleng;
719 		id->flengisvalid = 1;
720 #ifdef RDEBUG
721 		fprintf(stderr,"%.6lf: inode: %"PRIu32" ; mfleng: %"PRIu64"\n",monotonic_seconds(),id->inode,id->fleng);
722 #endif
723 		zassert(pthread_mutex_unlock(&glock));
724 
725 		reccmd = 0; // makes gcc happy
726 		recleng = 0; // makes gcc happy
727 
728 		do {
729 			now = monotonic_seconds();
730 #ifdef RDEBUG
731 			if (rreq) {
732 				fprintf(stderr,"%.6lf: readworker inode: %"PRIu32" ; rreq: %"PRIu64":%"PRIu32"\n",monotonic_seconds(),id->inode,rreq->offset,rreq->leng);
733 			} else {
734 				fprintf(stderr,"%.6lf: readworker inode: %"PRIu32" ; rreq: NULL\n",monotonic_seconds(),id->inode);
735 			}
736 #endif
737 
738 			zassert(pthread_mutex_lock(&glock));
739 
740 			if (id->flengisvalid) {
741 				mfleng = id->fleng;
742 			}
743 
744 			if (rreq!=NULL && ((reqsend && gotstatus) || rreq->refresh==1)) { // rreq has been read or needs to be reread
745 				rreq->busy = 0;
746 				if (rreq->refresh==1) {
747 					rreq->refresh = 0;
748 					rreq->filled = 0;
749 					zassert(pthread_mutex_unlock(&glock));
750 					status = EINTR;
751 					break;
752 				} else {
753 					rreq->filled = 1;
754 					rreq->modified = monotonic_seconds();
755 				}
756 				if (rreq->waiting>0) {
757 					zassert(pthread_cond_broadcast(&(rreq->cond)));
758 				}
759 				rreq = NULL;
760 			}
761 
762 			if (lastrcvd==0.0) {
763 				lastrcvd = now;
764 			} else {
765 				lrdiff = now - lastrcvd;
766 				if (lrdiff>=CHUNKSERVER_ACTIVITY_TIMEOUT) {
767 					read_prepare_ip(csstrip,ip);
768 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: connection with (%s:%"PRIu16") was timed out (lastrcvd:%.6lf,now:%.6lf,lrdiff:%.6lf received: %"PRIu32"/%"PRIu32", try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,lastrcvd,now,lrdiff,currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
769 					if (rreq) {
770 						status = EIO;
771 					}
772 					zassert(pthread_mutex_unlock(&glock));
773 					break;
774 				}
775 			}
776 
777 			workingtime = now - start;
778 
779 			if (rreq==NULL) { // finished current block
780 				for (nrreq = id->reqhead ; nrreq && nrreq->filled==1 && nrreq->busy==0 ; nrreq=nrreq->next) {}
781 				if (nrreq) { // have next block
782 					if (nrreq->chindx!=chindx || nrreq->filled || nrreq->busy || workingtime>WORKER_BUSY_LAST_REQUEST_TIMEOUT+((workers_total>HEAVYLOAD_WORKERS)?0.0:WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT)) {
783 #ifdef RDEBUG
784 						fprintf(stderr,"%.6lf: readworker: ignore next request\n",monotonic_seconds());
785 #endif
786 						zassert(pthread_mutex_unlock(&glock));
787 						break;
788 					}
789 					if (nrreq->lcnt==0 && workers_total>HEAVYLOAD_WORKERS) { // currently nobody wants this block and there are a lot of busy workers, so skip this one
790 #ifdef RDEBUG
791 						fprintf(stderr,"%.6lf: readworker: lcnt is zero and there are a lot of workers\n",monotonic_seconds());
792 #endif
793 						zassert(pthread_mutex_unlock(&glock));
794 						break;
795 					}
796 #ifdef RDEBUG
797 					fprintf(stderr,"%.6lf: readworker: get next request\n",monotonic_seconds());
798 #endif
799 					rreq = nrreq;
800 					rreq->busy = 1;
801 					currentpos = 0;
802 					received = 0;
803 					reqsend = 0;
804 					gotstatus = 0;
805 				} else { // do not have next block
806 					if (workingtime>WORKER_IDLE_TIMEOUT || workers_total>HEAVYLOAD_WORKERS) {
807 #ifdef RDEBUG
808 						fprintf(stderr,"%.6lf: readworker: next request doesn't exist and there are a lot of workers or idle timeout passed\n",monotonic_seconds());
809 #endif
810 						zassert(pthread_mutex_unlock(&glock));
811 						break;
812 					}
813 #ifdef RDEBUG
814 					fprintf(stderr,"%.6lf: readworker: next request doesn't exist, so wait on pipe\n",monotonic_seconds());
815 #endif
816 				}
817 			} else { // have current block
818 				if (workingtime>(WORKER_BUSY_LAST_REQUEST_TIMEOUT+WORKER_BUSY_WAIT_FOR_FINISH+((workers_total>HEAVYLOAD_WORKERS)?0.0:WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT))) {
819 #ifdef RDEBUG
820 					fprintf(stderr,"%.6lf: readworker: current request not finished but busy timeout passed\n",monotonic_seconds());
821 #endif
822 					zassert(pthread_mutex_unlock(&glock));
823 					status = EINTR;
824 					break;
825 				}
826 			}
827 
828 			if (reqsend==0) {
829 				if (rreq->offset > mfleng) {
830 					rreq->rleng = 0;
831 				} else if ((rreq->offset + rreq->leng) > mfleng) {
832 					rreq->rleng = mfleng - rreq->offset;
833 				} else {
834 					rreq->rleng = rreq->leng;
835 				}
836 				if (rreq->rleng>0) {
837 					wptr = sendbuff;
838 					put32bit(&wptr,CLTOCS_READ);
839 					if (csver>=VERSION2INT(1,7,32)) {
840 						put32bit(&wptr,21);
841 						put8bit(&wptr,1);
842 						tosend = 29;
843 					} else {
844 						put32bit(&wptr,20);
845 						tosend = 28;
846 					}
847 					put64bit(&wptr,chunkid);
848 					put32bit(&wptr,version);
849 					put32bit(&wptr,(rreq->offset & MFSCHUNKMASK));
850 					put32bit(&wptr,rreq->rleng);
851 					sent = 0;
852 					reqsend = 1;
853 				} else {
854 					tosend = 0;
855 					sent = 0;
856 					reqsend = 1;
857 					gotstatus = 1;
858 					zassert(pthread_mutex_unlock(&glock));
859 					continue;
860 				}
861 			}
862 
863 			id->waitingworker=1;
864 			zassert(pthread_mutex_unlock(&glock));
865 
866 			if (tosend==0 && (now - lastsend > (CHUNKSERVER_ACTIVITY_TIMEOUT/2.0))) {
867 				wptr = sendbuff;
868 				put32bit(&wptr,ANTOAN_NOP);
869 				put32bit(&wptr,0);
870 				tosend = 8;
871 				sent = 0;
872 			}
873 
874 			if (tosend>0) {
875 				i = write(fd,sendbuff+sent,tosend-sent);
876 				if (i<0) { // error
877 					if (ERRNO_ERROR && errno!=EINTR) {
878 						read_prepare_ip(csstrip,ip);
879 						syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: write to (%s:%"PRIu16") error: %s (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,strerr(errno),currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
880 						status = EIO;
881 						zassert(pthread_mutex_lock(&glock));
882 						id->waitingworker=0;
883 						zassert(pthread_mutex_unlock(&glock));
884 						break;
885 					} else {
886 						i=0;
887 					}
888 				}
889 				if (i>0) {
890 					sent += i;
891 					if (tosend<=sent) {
892 						sent = 0;
893 						tosend = 0;
894 					}
895 					lastsend = now;
896 				}
897 			}
898 
899 			pfd[0].events = POLLIN | ((tosend>0)?POLLOUT:0);
900 			pfd[0].revents = 0;
901 			pfd[1].events = POLLIN;
902 			pfd[1].revents = 0;
903 			if (poll(pfd,2,100)<0) {
904 				if (errno!=EINTR) {
905 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: poll error: %s (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,strerr(errno),currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
906 					status = EIO;
907 					break;
908 				}
909 			}
910 			zassert(pthread_mutex_lock(&glock));
911 			id->waitingworker=0;
912 			closewaiting = (id->closewaiting>0)?1:0;
913 			zassert(pthread_mutex_unlock(&glock));
914 			if (pfd[1].revents&POLLIN) {    // used just to break poll - so just read all data from pipe to empty it
915 #ifdef RDEBUG
916 				fprintf(stderr,"%.6lf: readworker: %"PRIu32" woken up by pipe\n",monotonic_seconds(),id->inode);
917 #endif
918 				i = read(id->pipe[0],pipebuff,1024);
919 				if (i<0) { // mainly to make happy static code analyzers
920 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: read pipe error: %s (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,strerr(errno),currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
921 				}
922 			}
923 			if (closewaiting) {
924 #ifdef RDEBUG
925 				fprintf(stderr,"%.6lf: readworker: closewaiting\n",monotonic_seconds());
926 #endif
927 				if (rreq!=NULL) {
928 					status = EINTR;
929 				}
930 				break;
931 			}
932 			if (pfd[0].revents&POLLHUP) {
933 				read_prepare_ip(csstrip,ip);
934 				syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: connection with (%s:%"PRIu16") was reset by peer / POLLHUP (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
935 				status = EIO;
936 				break;
937 			}
938 			if (pfd[0].revents&POLLERR) {
939 				read_prepare_ip(csstrip,ip);
940 				syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: connection with (%s:%"PRIu16") got error status / POLLERR (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
941 				status = EIO;
942 				break;
943 			}
944 			if (pfd[0].revents&POLLIN) {
945 				lastrcvd = monotonic_seconds();
946 				if (received < 8) {
947 					i = read(fd,recvbuff+received,8-received);
948 					if (i==0) {
949 						read_prepare_ip(csstrip,ip);
950 						syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: connection with (%s:%"PRIu16") was reset by peer / ZEROREAD (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
951 						status = EIO;
952 						break;
953 					}
954 					if (i<0) {
955 						read_prepare_ip(csstrip,ip);
956 						syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: read from (%s:%"PRIu16") error: %s (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,strerr(errno),currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
957 						status = EIO;
958 						break;
959 					}
960 					received += i;
961 					if (received == 8) { // full header
962 						rptr = recvbuff;
963 
964 					        reccmd = get32bit(&rptr);
965 						recleng = get32bit(&rptr);
966 						if (reccmd==CSTOCL_READ_STATUS) {
967 							if (recleng!=9) {
968 								syslog(LOG_WARNING,"readworker: got wrong sized status packet from chunkserver (leng:%"PRIu32")",recleng);
969 								status = EIO;
970 								break;
971 							}
972 						} else if (reccmd==CSTOCL_READ_DATA) {
973 							if (rreq==NULL) {
974 								syslog(LOG_WARNING,"readworker: got unexpected data from chunkserver (leng:%"PRIu32")",recleng);
975 								status = EIO;
976 								break;
977 							} else if (recleng<20) {
978 								syslog(LOG_WARNING,"readworker: got too short data packet from chunkserver (leng:%"PRIu32")",recleng);
979 								status = EIO;
980 								break;
981 							} else if ((recleng-20) + currentpos > rreq->rleng) {
982 								syslog(LOG_WARNING,"readworker: got too long data packet from chunkserver (leng:%"PRIu32")",recleng);
983 								status = EIO;
984 								break;
985 							}
986 						} else if (reccmd==ANTOAN_NOP) {
987 							if (recleng!=0) {
988 								syslog(LOG_WARNING,"readworker: got wrong sized nop packet from chunkserver (leng:%"PRIu32")",recleng);
989 								status = EIO;
990 								break;
991 							}
992 							received = 0;
993 						} else {
994 							uint32_t myip,peerip;
995 							uint16_t myport,peerport;
996 							tcpgetpeer(fd,&peerip,&peerport);
997 							tcpgetmyaddr(fd,&myip,&myport);
998 							syslog(LOG_WARNING,"readworker: got unrecognized packet from chunkserver (cmd:%"PRIu32",leng:%"PRIu32",%u.%u.%u.%u:%u<->%u.%u.%u.%u:%u)",reccmd,recleng,(myip>>24)&0xFF,(myip>>16)&0xFF,(myip>>8)&0xFF,myip&0xFF,myport,(peerip>>24)&0xFF,(peerip>>16)&0xFF,(peerip>>8)&0xFF,peerip&0xFF,peerport);
999 							status = EIO;
1000 							break;
1001 						}
1002 					}
1003 				}
1004 				if (received >= 8) {
1005 					if (recleng<=20) {
1006 						i = read(fd,recvbuff + (received-8),recleng - (received-8));
1007 					} else {
1008 						if (received < 8 + 20) {
1009 #ifdef HAVE_READV
1010 							siov[0].iov_base = recvbuff + (received-8);
1011 							siov[0].iov_len = 20 - (received-8);
1012 							siov[1].iov_base = rreq->data + currentpos;
1013 							siov[1].iov_len = recleng - 20;
1014 							i = readv(fd,siov,2);
1015 #else
1016 							i = read(fd,recvbuff + (received-8),20 - (received-8));
1017 #endif
1018 						} else {
1019 							i = read(fd,rreq->data + currentpos,recleng - (received-8));
1020 						}
1021 					}
1022 					if (i==0) {
1023 						read_prepare_ip(csstrip,ip);
1024 						syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: connection with (%s:%"PRIu16") was reset by peer (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
1025 						status = EIO;
1026 						break;
1027 					}
1028 					if (i<0) {
1029 						read_prepare_ip(csstrip,ip);
1030 						syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: connection with (%s:%"PRIu16") got error status (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
1031 						status = EIO;
1032 						break;
1033 					}
1034 					if (received < 8+20) {
1035 						if (received+i >= 8+20) {
1036 							currentpos += i - ((8+20) - received);
1037 						}
1038 					} else {
1039 						currentpos += i;
1040 					}
1041 					received += i;
1042 					if (received > 8+recleng) {
1043 						syslog(LOG_WARNING,"readworker: internal error - received more bytes than expected");
1044 						status = EIO;
1045 						break;
1046 					} else if (received == 8+recleng) {
1047 
1048 						if (reccmd==CSTOCL_READ_STATUS) {
1049 							rptr = recvbuff;
1050 							recchunkid = get64bit(&rptr);
1051 							recstatus = get8bit(&rptr);
1052 							if (recchunkid != chunkid) {
1053 								syslog(LOG_WARNING,"readworker: got unexpected status packet (expected chunkdid:%"PRIu64",packet chunkid:%"PRIu64")",chunkid,recchunkid);
1054 								status = EIO;
1055 								break;
1056 							}
1057 							if (recstatus!=STATUS_OK) {
1058 								syslog(LOG_WARNING,"readworker: read error: %s",mfsstrerr(recstatus));
1059 								status = EIO;
1060 								break;
1061 							}
1062 							if (currentpos != rreq->rleng) {
1063 								syslog(LOG_WARNING,"readworker: unexpected data block size (requested: %"PRIu32" / received: %"PRIu32")",rreq->rleng,currentpos);
1064 								status = EIO;
1065 								break;
1066 							}
1067 							gotstatus = 1;
1068 						} else if (reccmd==CSTOCL_READ_DATA) {
1069 							rptr = recvbuff;
1070 							recchunkid = get64bit(&rptr);
1071 							recblocknum = get16bit(&rptr);
1072 							recoffset = get16bit(&rptr);
1073 							recsize = get32bit(&rptr);
1074 							reccrc = get32bit(&rptr);
1075 							(void)recoffset;
1076 							(void)recblocknum;
1077 							if (recchunkid != chunkid) {
1078 								syslog(LOG_WARNING,"readworker: got unexpected data packet (expected chunkdid:%"PRIu64",packet chunkid:%"PRIu64")",chunkid,recchunkid);
1079 								status = EIO;
1080 								break;
1081 							}
1082 							if (recsize+20 != recleng) {
1083 								syslog(LOG_WARNING,"readworker: got malformed data packet (datasize: %"PRIu32",packetsize: %"PRIu32")",recsize,recleng);
1084 								status = EIO;
1085 								break;
1086 							}
1087 							if (reccrc != mycrc32(0,rreq->data + (currentpos - recsize),recsize)) {
1088 								syslog(LOG_WARNING,"readworker: data checksum error");
1089 								status = EIO;
1090 								break;
1091 							}
1092 						}
1093 						received = 0;
1094 					}
1095 				}
1096 			}
1097 		} while (1);
1098 
1099 		if (status==0 && csver>=VERSION2INT(1,7,32)) {
1100 			conncache_insert(ip,port,fd);
1101 		} else {
1102 			tcpclose(fd);
1103 		}
1104 
1105 		if (status==EINTR) {
1106 			status=0;
1107 		}
1108 
1109 #ifdef WORKER_DEBUG
1110 		now = monotonic_seconds();
1111 		workingtime = now - start;
1112 
1113 		syslog(LOG_NOTICE,"worker %lu received data from chunk %016"PRIX64"_%08"PRIX32", bw: %.6lfMB/s ( %"PRIu32" B / %.6lf s )",(unsigned long)arg,chunkid,version,(double)bytesreceived/workingtime,bytesreceived,workingtime);
1114 #endif
1115 
1116 		zassert(pthread_mutex_lock(&glock));
1117 		if (rreq) { // block hasn't been read
1118 			rreq->busy = 0;
1119 		}
1120 		if (status!=0) {
1121 			id->trycnt++;
1122 			if (id->trycnt>=maxretries) {
1123 				zassert(pthread_mutex_unlock(&glock));
1124 				read_job_end(id,status,0);
1125 			} else {
1126 				zassert(pthread_mutex_unlock(&glock));
1127 				read_job_end(id,0,1+((id->trycnt<30)?(id->trycnt/3):10));
1128 			}
1129 		} else {
1130 			id->laststatus = 1;
1131 			zassert(pthread_mutex_unlock(&glock));
1132 			read_job_end(id,0,0);
1133 		}
1134 	}
1135 	return NULL;
1136 }
1137 
1138 /* API | glock: INITIALIZED,UNLOCKED */
read_data_init(uint64_t readaheadsize,uint32_t readaheadleng,uint32_t readaheadtrigger,uint32_t retries)1139 void read_data_init (uint64_t readaheadsize,uint32_t readaheadleng,uint32_t readaheadtrigger,uint32_t retries) {
1140         uint32_t i;
1141 	sigset_t oldset;
1142 	sigset_t newset;
1143 
1144 	maxretries = retries;
1145 	readahead = readaheadleng;
1146 	readahead_trigger = readaheadtrigger;
1147 	maxreadaheadsize = readaheadsize;
1148 	reqbufftotalsize = 0;
1149 
1150 	zassert(pthread_mutex_init(&glock,NULL));
1151 	zassert(pthread_cond_init(&worker_term_cond,NULL));
1152 	worker_term_waiting = 0;
1153 
1154 	idhash = malloc(sizeof(inodedata*)*IDHASHSIZE);
1155 	passert(idhash);
1156 	for (i=0 ; i<IDHASHSIZE ; i++) {
1157 		idhash[i]=NULL;
1158 	}
1159 
1160 	dqueue = queue_new(0);
1161 	jqueue = queue_new(0);
1162 
1163         zassert(pthread_attr_init(&worker_thattr));
1164         zassert(pthread_attr_setstacksize(&worker_thattr,0x100000));
1165 	sigemptyset(&newset);
1166 	sigaddset(&newset, SIGTERM);
1167 	sigaddset(&newset, SIGINT);
1168 	sigaddset(&newset, SIGHUP);
1169 	sigaddset(&newset, SIGQUIT);
1170 	pthread_sigmask(SIG_BLOCK, &newset, &oldset);
1171         zassert(pthread_create(&dqueue_worker_th,&worker_thattr,read_dqueue_worker,NULL));
1172 	pthread_sigmask(SIG_SETMASK, &oldset, NULL);
1173 
1174 	zassert(pthread_mutex_lock(&glock));
1175 	workers_avail = 0;
1176 	workers_total = 0;
1177 	read_data_spawn_worker();
1178 	zassert(pthread_mutex_unlock(&glock));
1179 //	fprintf(stderr,"spawn worker (avail:%"PRIu32" ; total:%"PRIu32")\n",workers_avail,workers_total);
1180 
1181 //#ifdef BUFFER_DEBUG
1182 //        pthread_create(&info_worker_th,&thattr,read_info_worker,NULL);
1183 //#endif
1184 //	for (i=0 ; i<WORKERS ; i++) {
1185 //		zassert(pthread_create(read_worker_th+i,&thattr,read_worker,(void*)(unsigned long)(i)));
1186 //	}
1187 }
1188 
read_data_term(void)1189 void read_data_term(void) {
1190 	uint32_t i;
1191 	inodedata *id,*idn;
1192 
1193 	queue_close(dqueue);
1194 	queue_close(jqueue);
1195 	zassert(pthread_mutex_lock(&glock));
1196 	while (workers_total>0) {
1197 		worker_term_waiting++;
1198 		zassert(pthread_cond_wait(&worker_term_cond,&glock));
1199 	}
1200 	zassert(pthread_mutex_unlock(&glock));
1201 	zassert(pthread_join(dqueue_worker_th,NULL));
1202 	queue_delete(dqueue);
1203 	queue_delete(jqueue);
1204 	for (i=0 ; i<IDHASHSIZE ; i++) {
1205 		for (id = idhash[i] ; id ; id = idn) {
1206 			idn = id->next;
1207 			zassert(pthread_cond_destroy(&(id->closecond)));
1208 			close(id->pipe[0]);
1209 			close(id->pipe[1]);
1210 			free(id);
1211 		}
1212 	}
1213 	free(idhash);
1214 	//        free(cacheblocks);
1215 	//        pthread_cond_destroy(&fcbcond);
1216 	zassert(pthread_attr_destroy(&worker_thattr));
1217 	zassert(pthread_cond_destroy(&worker_term_cond));
1218         zassert(pthread_mutex_destroy(&glock));
1219 }
1220 
1221 
1222 
read_new_request(inodedata * id,uint64_t * offset,uint64_t blockend)1223 rrequest* read_new_request(inodedata *id,uint64_t *offset,uint64_t blockend) {
1224 	uint64_t chunkoffset;
1225 	uint64_t chunkend;
1226 	uint32_t chunkleng;
1227 	uint32_t chindx;
1228 
1229 	chunkoffset = *offset;
1230 	chindx = chunkoffset>>MFSCHUNKBITS;
1231 	chunkend = chindx;
1232 	chunkend <<= MFSCHUNKBITS;
1233 	chunkend += MFSCHUNKSIZE;
1234 	if (blockend > chunkend) {
1235 		chunkleng = chunkend - chunkoffset;
1236 		*offset = chunkend;
1237 	} else {
1238 		chunkleng = blockend - (*offset);
1239 		*offset = blockend;
1240 	}
1241 
1242 	rrequest *rreq;
1243 	rreq = malloc(sizeof(rrequest));
1244 	passert(rreq);
1245 #ifdef RDEBUG
1246 	fprintf(stderr,"%.6lf: inode: %"PRIu32" - new request: chindx: %"PRIu32" chunkoffset: %"PRIu64" chunkleng: %"PRIu32"\n",monotonic_seconds(),id->inode,chindx,chunkoffset,chunkleng);
1247 #endif
1248 	rreq->modified = monotonic_seconds();
1249 	rreq->offset = chunkoffset;
1250 	rreq->leng = chunkleng;
1251 	rreq->chindx = chindx;
1252 	rreq->rleng = 0;
1253 	rreq->filled = 0;
1254 	rreq->refresh = 0;
1255 	rreq->busy = 0;
1256 	rreq->free = 0;
1257 	rreq->lcnt = 0;
1258 	rreq->data = malloc(chunkleng);
1259 	passert(rreq->data);
1260 	rreq->waiting = 0;
1261 	zassert(pthread_cond_init(&(rreq->cond),NULL));
1262 	if (id->inqueue==0) {
1263 		read_enqueue(id);
1264 		id->inqueue=1;
1265 	}
1266 	rreq->next = NULL;
1267 	rreq->prev = id->reqtail;
1268 	*(id->reqtail) = rreq;
1269 	id->reqtail = &(rreq->next);
1270 	reqbufftotalsize+=chunkleng;
1271 	return rreq;
1272 }
1273 
1274 typedef struct rlist_s {
1275 	rrequest *rreq;
1276 	uint64_t offsetadd;
1277 	uint32_t reqleng;
1278 	struct rlist_s *next;
1279 } rlist;
1280 
1281 // return list of rreq
/*
 * Builds the list of request buffers covering the range
 * [offset, offset + *size) of the file described by 'vid', creating new
 * requests where no usable buffer exists, waits until those buffers are
 * filled, and describes the resulting data as an iovec array.
 *
 *  vid    - pointer to the inodedata structure of the file
 *  offset - first byte to read
 *  size   - in: number of bytes wanted; out: number of bytes described by
 *           the iovec array (only rewritten when the request-building part
 *           below is entered, i.e. id->status==0 and id->closewaiting==0)
 *  vrhead - out: head of the rlist built here (NULL when nothing was built);
 *           read_data_free_buff() takes this value back and releases it
 *  iov    - out: malloc'ed iovec array pointing into the request buffers, or
 *           NULL when no data is returned; read_data_free_buff() frees it
 *  iovcnt - out: number of entries in *iov
 *
 * Returns id->status as seen at the end of the call.
 * The whole function runs under glock; the lock is released only inside
 * pthread_cond_wait() while waiting for a buffer to be filled.
 */
int read_data(void *vid, uint64_t offset, uint32_t *size, void **vrhead,struct iovec **iov,uint32_t *iovcnt) {
	inodedata *id = (inodedata*)vid;
	rrequest *rreq,*rreqn;
	rlist *rl,*rhead,**rtail;
	uint64_t firstbyte;
	uint64_t lastbyte;
	uint32_t cnt;
	uint8_t newrequests;
	int status;
	double now;
	zassert(pthread_mutex_lock(&glock));

	*vrhead = NULL;
	*iov = NULL;
	*iovcnt = 0;
	cnt = 0;

#ifdef RDEBUG
	fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" id->status: %d id->closewaiting: %"PRIu16"\n",monotonic_seconds(),id->inode,id->status,id->closewaiting);
#endif

	if (id->status==0 && id->closewaiting==0) {
		// read-ahead switch: a read that starts exactly where the previous one
		// ended turns read-ahead on once enough sequential data (seqdata) has been
		// seen; a read starting more than readahead/2 away from that point (in
		// either direction) turns it off and resets the counter
		if (offset==id->lastoffset) {
			if (id->readahead==0) {
				if (id->seqdata>=readahead_trigger) {
					id->readahead = 1;
				}
			}
		} else {
			if (offset+(readahead/2) < id->lastoffset || id->lastoffset+(readahead/2) < offset) {
				id->readahead = 0;
				id->seqdata = 0;
			}
		}
#ifdef RDEBUG
		fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" seqdata: %"PRIu32" offset: %"PRIu64" id->lastoffset: %"PRIu64" id->readahead: %u reqbufftotalsize:%"PRIu64"\n",monotonic_seconds(),id->inode,id->seqdata,offset,id->lastoffset,id->readahead,reqbufftotalsize);
#endif
		newrequests = 0;

		// prepare requests

		// [firstbyte,lastbyte) is the part of the wanted range not yet covered by
		// any list node; the loop below walks existing requests and shrinks it
		firstbyte = offset;
		lastbyte = offset + (*size);
		rhead = NULL;
		rtail = &rhead;
		rreq = id->reqhead;
		now = monotonic_seconds();
		while (rreq && lastbyte>firstbyte) {
			rreqn = rreq->next; // saved first, because rreq may be freed below
#ifdef RDEBUG
			fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" , rreq->modified:%.6lf , rreq->offset: %"PRIu64" , rreq->leng: %"PRIu32" , firstbyte: %"PRIu64" , lastbyte: %"PRIu64"\n",monotonic_seconds(),id->inode,rreq->modified,rreq->offset,rreq->leng,firstbyte,lastbyte);
#endif
			if (rreq->modified+BUFFER_VALIDITY_TIMEOUT<now) { // buffer too old
#ifdef RDEBUG
				fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" data too old: free rreq (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; free:%u)\n",monotonic_seconds(),id->inode,rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->free);
#endif
				if (rreq->lcnt==0 && rreq->busy==0) { // nobody wants it anymore, so delete it
					*(rreq->prev) = rreq->next;
					if (rreq->next) {
						rreq->next->prev = rreq->prev;
					} else {
						id->reqtail = rreq->prev;
					}
					reqbufftotalsize -= rreq->leng;
					free(rreq->data);
					free(rreq);
				} else {
					rreq->free = 1; // somebody is still using it, so mark it for removal
				}
			} else if (firstbyte < rreq->offset || firstbyte >= rreq->offset+rreq->leng) { // all not sequential read cases
#ifdef RDEBUG
				fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" case 0: free rreq (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; free:%u)\n",monotonic_seconds(),id->inode,rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->free);
#endif
				// rreq:      |---------|
				// read: |--|
				// read: |-------|
				// read: |-------------------|
				// read:                  |--|
				if (rreq->lcnt==0 && rreq->busy==0) { // nobody wants it anymore, so delete it
					*(rreq->prev) = rreq->next;
					if (rreq->next) {
						rreq->next->prev = rreq->prev;
					} else {
						id->reqtail = rreq->prev;
					}
					reqbufftotalsize -= rreq->leng;
					free(rreq->data);
					free(rreq);
				} else {
					rreq->free = 1; // somebody is still using it, so mark it for removal
				}
			} else if (lastbyte <= rreq->offset+rreq->leng) {
#ifdef RDEBUG
				fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" case 1: use rreq (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; free:%u)\n",monotonic_seconds(),id->inode,rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->free);
#endif
				// rreq: |---------|
				// read:    |---|
				rl = malloc(sizeof(rlist));
				passert(rl);
				rl->rreq = rreq;
				rl->offsetadd = firstbyte - rreq->offset;
				// reqleng is an end position inside the buffer, not a byte count
				rl->reqleng = (lastbyte - firstbyte) + rl->offsetadd;
				rl->next = NULL;
				*rtail = rl;
				rtail = &(rl->next);
				rreq->lcnt++; // the list node holds a reference to the request
				if (id->readahead && id->flengisvalid && reqbufftotalsize<maxreadaheadsize) {
					// read-ahead: once the read reaches past the first fifth of this
					// buffer and there is no following request, ask for the next block
					if (lastbyte > rreq->offset + (rreq->leng/5)) {
						// request next block of data
						if (rreq->next==NULL) {
							uint64_t blockstart,blockend;
							blockstart = rreq->offset+rreq->leng;
							blockend = blockstart+readahead;
							if (blockend<=id->fleng) {
								rreq->next = read_new_request(id,&blockstart,blockend);
								newrequests = 1;
							} else if (blockstart<id->fleng) {
								rreq->next = read_new_request(id,&blockstart,id->fleng);
								newrequests = 1;
							}
						}
					}
				}
				// whole range is covered now - make both loops terminate
				lastbyte = 0;
				firstbyte = 0;
			} else {
#ifdef RDEBUG
				fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" case 2: use tail of rreq (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; free:%u)\n",monotonic_seconds(),id->inode,rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->free);
#endif
				// rreq: |---------|
				// read:         |---|
				rl = malloc(sizeof(rlist));
				passert(rl);
				rl->rreq = rreq;
				rl->offsetadd = firstbyte - rreq->offset;
				rl->reqleng = rreq->leng;
				rl->next = NULL;
				*rtail = rl;
				rtail = &(rl->next);
				rreq->lcnt++;
				// the rest of the range starts right after this buffer
				firstbyte = rreq->offset+rreq->leng;
			}
			rreq = rreqn;
		}
		// whatever is still uncovered gets brand new requests
		// (read_new_request advances firstbyte past each request it creates)
		while (lastbyte>firstbyte) {
			rreq = read_new_request(id,&firstbyte,lastbyte);
			rl = malloc(sizeof(rlist));
			passert(rl);
			rl->rreq = rreq;
			rl->offsetadd = 0;
			rl->reqleng = rreq->leng;
			rl->next = NULL;
			*rtail = rl;
			rtail = &(rl->next);
			rreq->lcnt++;
			// after the last needed request also queue one read-ahead request;
			// it is not added to the list returned to the caller
			if (lastbyte==firstbyte && id->readahead && id->flengisvalid && reqbufftotalsize<maxreadaheadsize) {
				if (lastbyte+readahead<=id->fleng) {
					(void)read_new_request(id,&firstbyte,lastbyte+readahead);
				} else if (lastbyte<id->fleng) {
					(void)read_new_request(id,&firstbyte,id->fleng);
				}
			}
			newrequests = 1;
		}

		*vrhead = rhead;

#ifdef RDEBUG
		if (newrequests) {
			fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" newrequest\n",monotonic_seconds(),id->inode);
		}
#endif
		// a byte written to the inode's pipe wakes up the worker marked as waiting
		if (newrequests && id->waitingworker) {
#ifdef RDEBUG
			fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" wakeup readworker\n",monotonic_seconds(),id->inode);
#endif
			if (write(id->pipe[1]," ",1)!=1) {
				syslog(LOG_ERR,"can't write to pipe !!!");
			}
			id->waitingworker=0;
		}

		// first pass over the list: wait for every buffer and count the
		// iovec entries (cnt) and bytes (*size) that can be returned
		cnt = 0;
		*size = 0;
		for (rl = rhead ; rl ; rl=rl->next) {
			while (rl->rreq->filled==0 && id->status==0 && id->closewaiting==0) {
				rl->rreq->waiting++;
#ifdef RDEBUG
				fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" wait for data: %"PRIu64":%"PRIu32"\n",monotonic_seconds(),id->inode,rl->rreq->offset,rl->rreq->leng);
#endif
				zassert(pthread_cond_wait(&(rl->rreq->cond),&glock));
				rl->rreq->waiting--;
			}
			if (id->status==0) {
#ifdef RDEBUG
				fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" block %"PRIu64":%"PRIu32"(%"PRIu32") has been read\n",monotonic_seconds(),id->inode,rl->rreq->offset,rl->rreq->rleng,rl->rreq->leng);
#endif
				if (rl->rreq->rleng < rl->rreq->leng) {
					// buffer holds fewer bytes than requested: use what lies past
					// offsetadd (if anything), clamp the end position and stop
					if (rl->rreq->rleng > rl->offsetadd) {
						cnt++;
						if (rl->reqleng > rl->rreq->rleng) {
							rl->reqleng = rl->rreq->rleng;
						}
						*size += rl->reqleng - rl->offsetadd;
					}
					break; // end of file
				} else {
					cnt++;
					*size += rl->reqleng - rl->offsetadd;
				}
			} else {
#ifdef RDEBUG
				fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" error reading block: %"PRIu64":%"PRIu32"\n",monotonic_seconds(),id->inode,rl->rreq->offset,rl->rreq->leng);
#endif
				break;
			}
#ifdef RDEBUG
			fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" size: %"PRIu32" ; cnt: %u\n",monotonic_seconds(),id->inode,*size,cnt);
#endif
		}
	}

	// cnt>0 is possible only when the block above was entered, so rhead is set here
	if (id->status==0 && id->closewaiting==0 && cnt>0) {
		id->lastoffset = offset + (*size);
		if (id->readahead==0) {
			id->seqdata += (*size);
		}
		*iov = malloc(sizeof(struct iovec)*cnt);
		passert(*iov);
		// second pass: same walk as above, this time filling the iovec entries
		cnt = 0;
		for (rl = rhead ; rl ; rl=rl->next) {
			if (rl->rreq->rleng < rl->rreq->leng) {
				if (rl->rreq->rleng > rl->offsetadd) {
					(*iov)[cnt].iov_base = rl->rreq->data + rl->offsetadd;
					(*iov)[cnt].iov_len = rl->reqleng - rl->offsetadd;
					cnt++;
				}
				break;
			} else {
				(*iov)[cnt].iov_base = rl->rreq->data + rl->offsetadd;
				(*iov)[cnt].iov_len = rl->reqleng - rl->offsetadd;
				cnt++;
			}
		}
		*iovcnt = cnt;
	} else {
		*iovcnt = 0;
		*iov = NULL;
	}

	status = id->status;

#ifdef RDEBUG
	fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" id->status: %d iovcnt: %"PRIu32" iovec: %p\n",monotonic_seconds(),id->inode,id->status,*iovcnt,(void*)(*iov));
#endif

	zassert(pthread_mutex_unlock(&glock));
	return status;
}
1541 
read_data_free_buff(void * vid,void * vrhead,struct iovec * iov)1542 void read_data_free_buff(void *vid,void *vrhead,struct iovec *iov) {
1543 	inodedata *id = (inodedata*)vid;
1544 	rlist *rl,*rln;
1545 	rrequest *rreq;
1546 	rl = (rlist*)vrhead;
1547 	zassert(pthread_mutex_lock(&glock));
1548 #ifdef RDEBUG
1549 	fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" inode_structure: %p vrhead: %p iovec: %p\n",monotonic_seconds(),id->inode,(void*)id,(void*)vrhead,(void*)iov);
1550 #endif
1551 	while (rl) {
1552 		rln = rl->next;
1553 		rreq = rl->rreq;
1554 		rreq->lcnt--;
1555 		if (rreq->lcnt==0 && rreq->busy==0 && rreq->free) {
1556 			*(rreq->prev) = rreq->next;
1557 			if (rreq->next) {
1558 				rreq->next->prev = rreq->prev;
1559 			} else {
1560 				id->reqtail = rreq->prev;
1561 			}
1562 			reqbufftotalsize -= rreq->leng;
1563 			free(rreq->data);
1564 			free(rreq);
1565 		}
1566 		free(rl);
1567 		rl = rln;
1568 	}
1569 	if (id->reqhead==NULL && id->inqueue==0 && id->closewaiting>0) {
1570 		zassert(pthread_cond_broadcast(&(id->closecond)));
1571 	}
1572 	if (iov) {
1573 		free(iov);
1574 	}
1575 	zassert(pthread_mutex_unlock(&glock));
1576 }
1577 
read_inode_dirty_region(uint32_t inode,uint64_t offset,uint32_t size,const char * buff)1578 void read_inode_dirty_region(uint32_t inode,uint64_t offset,uint32_t size,const char *buff) {
1579 	uint32_t idh = IDHASH(inode);
1580 	inodedata *id;
1581 	rrequest *rreq,*rreqn;
1582 //	int clearedbuff = 0;
1583 
1584 #ifdef RDEBUG
1585 	fprintf(stderr,"%.6lf: read_inode_dirty_region: inode: %"PRIu32" set dirty region: %"PRIu64":%"PRIu32"\n",monotonic_seconds(),inode,offset,size);
1586 #endif
1587 	zassert(pthread_mutex_lock(&glock));
1588 	for (id = idhash[idh] ; id ; id=id->next) {
1589 		if (id->inode == inode) {
1590 			for (rreq = id->reqhead ; rreq ; rreq=rreqn) {
1591 				rreqn = rreq->next;
1592 #ifdef RDEBUG
1593 				fprintf(stderr,"%.6lf: read_inode_dirty_region: rreq (before): (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; filled:%u ; free:%u)\n",monotonic_seconds(),rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->filled,rreq->free);
1594 #endif
1595 				if (rreq->free==0 && ((rreq->offset < offset + size) && (rreq->offset + rreq->leng > offset))) {
1596 					if (rreq->filled) { // already filled, exchange data
1597 						if (rreq->offset > offset) {
1598 							if (rreq->offset + rreq->leng > offset + size) {
1599 								// rreq:   |-------|
1600 								// buff: |----|
1601 #ifdef RDEBUG
1602 								fprintf(stderr,"%.6lf: read_inode_dirty_region: case 1: rreq (%"PRIu64":%"PRIu32") / buff (%"PRIu64":%"PRIu32")\n",monotonic_seconds(),rreq->offset,rreq->leng,offset,size);
1603 #endif
1604 								memcpy(rreq->data,buff + (rreq->offset - offset),size - (rreq->offset - offset));
1605 								if (size - (rreq->offset - offset) > rreq->rleng) {
1606 									rreq->rleng = size - (rreq->offset - offset);
1607 								}
1608 							} else {
1609 								// rreq:   |-------|
1610 								// buff: |-----------|
1611 #ifdef RDEBUG
1612 								fprintf(stderr,"%.6lf: read_inode_dirty_region: case 2: rreq (%"PRIu64":%"PRIu32") / buff (%"PRIu64":%"PRIu32")\n",monotonic_seconds(),rreq->offset,rreq->leng,offset,size);
1613 #endif
1614 								memcpy(rreq->data,buff + (rreq->offset - offset),rreq->leng);
1615 								if (rreq->leng > rreq->rleng) {
1616 									rreq->rleng = rreq->leng;
1617 								}
1618 							}
1619 						} else {
1620 							if (rreq->offset + rreq->leng > offset + size) {
1621 								// rreq: |-------|
1622 								// buff:   |----|
1623 #ifdef RDEBUG
1624 								fprintf(stderr,"%.6lf: read_inode_dirty_region: case 3: rreq (%"PRIu64":%"PRIu32") / buff (%"PRIu64":%"PRIu32")\n",monotonic_seconds(),rreq->offset,rreq->leng,offset,size);
1625 #endif
1626 								memcpy(rreq->data + (offset - rreq->offset),buff,size);
1627 								if ((offset - rreq->offset) > rreq->rleng) {
1628 									memset(rreq->data + rreq->rleng,0,(offset - rreq->offset) - rreq->rleng);
1629 								}
1630 								if (size + (offset - rreq->offset) > rreq->rleng) {
1631 									rreq->rleng = size + (offset - rreq->offset);
1632 								}
1633 							} else {
1634 								// rreq: |-------|
1635 								// buff:   |--------|
1636 #ifdef RDEBUG
1637 								fprintf(stderr,"%.6lf: read_inode_dirty_region: case 4: rreq (%"PRIu64":%"PRIu32") / buff (%"PRIu64":%"PRIu32")\n",monotonic_seconds(),rreq->offset,rreq->leng,offset,size);
1638 #endif
1639 								memcpy(rreq->data+(offset-rreq->offset),buff,rreq->leng-(offset-rreq->offset));
1640 								if ((offset - rreq->offset) > rreq->rleng) {
1641 									memset(rreq->data + rreq->rleng,0,(offset - rreq->offset) - rreq->rleng);
1642 								}
1643 								if (rreq->leng > rreq->rleng) {
1644 									rreq->rleng = rreq->leng;
1645 								}
1646 							}
1647 						}
1648 					} else if (rreq->busy) { // in progress, so refresh it
1649 #ifdef RDEBUG
1650 						fprintf(stderr,"%.6lf: read_inode_dirty_region: rreq (%"PRIu64":%"PRIu32") : refresh\n",monotonic_seconds(),rreq->offset,rreq->leng);
1651 #endif
1652 						rreq->refresh = 1;
1653 					}
1654 				}
1655 #ifdef RDEBUG
1656 				if (rreq) {
1657 					fprintf(stderr,"%.6lf: read_inode_dirty_region: rreq (after): (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; filled:%u ; free:%u)\n",monotonic_seconds(),rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->filled,rreq->free);
1658 				} else {
1659 					fprintf(stderr,"%.6lf: read_inode_dirty_region: rreq (after): NULL\n",monotonic_seconds());
1660 				}
1661 #endif
1662 			}
1663 			if (id->flengisvalid && offset+size>id->fleng) {
1664 				id->fleng = offset+size;
1665 			}
1666 			if (id->waitingworker) {
1667 				if (write(id->pipe[1]," ",1)!=1) {
1668 					syslog(LOG_ERR,"can't write to pipe !!!");
1669 				}
1670 				id->waitingworker=0;
1671 			}
1672 		}
1673 	}
1674 	zassert(pthread_mutex_unlock(&glock));
1675 }
1676 
1677 // void read_inode_ops(uint32_t inode) {
read_inode_set_length(uint32_t inode,uint64_t newlength,uint8_t active)1678 void read_inode_set_length(uint32_t inode,uint64_t newlength,uint8_t active) {
1679 	uint32_t idh = IDHASH(inode);
1680 	inodedata *id;
1681 	rrequest *rreq,*rreqn;
1682 	int inqueue = 0;
1683 
1684 #ifdef RDEBUG
1685 	fprintf(stderr,"%.6lf: read_inode_set_length: inode: %"PRIu32" set length: %"PRIu64"\n",monotonic_seconds(),inode,newlength);
1686 #endif
1687 	zassert(pthread_mutex_lock(&glock));
1688 	for (id = idhash[idh] ; id ; id=id->next) {
1689 		if (id->inode == inode) {
1690 			for (rreq = id->reqhead ; rreq ; rreq=rreqn) {
1691 				rreqn = rreq->next;
1692 #ifdef RDEBUG
1693 				fprintf(stderr,"%.6lf: read_inode_set_length: rreq (before): (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; filled:%u ; free:%u)\n",monotonic_seconds(),rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->filled,rreq->free);
1694 #endif
1695 				if (rreq->free==0) {
1696 					if (rreq->filled) {
1697 						if (active) {
1698 							if (newlength < rreq->offset + rreq->rleng) {
1699 								if (newlength < rreq->offset) {
1700 #ifdef RDEBUG
1701 									fprintf(stderr,"%.6lf: read_inode_set_length: block is filled (%"PRIu64":%"PRIu32") / newlength: %"PRIu64", case 1: - set rleng to 0\n",monotonic_seconds(),rreq->offset,rreq->rleng,newlength);
1702 #endif
1703 									rreq->rleng = 0;
1704 								} else {
1705 #ifdef RDEBUG
1706 									fprintf(stderr,"%.6lf: read_inode_set_length: block is filled (%"PRIu64":%"PRIu32") / newlength: %"PRIu64", case 2: - set rleng to %"PRIu32"\n",monotonic_seconds(),rreq->offset,rreq->rleng,newlength,(uint32_t)(newlength - rreq->offset));
1707 #endif
1708 									rreq->rleng = newlength - rreq->offset;
1709 								}
1710 							} else if (newlength > rreq->offset + rreq->rleng) {
1711 								if (newlength > rreq->offset + rreq->leng) {
1712 #ifdef RDEBUG
1713 									fprintf(stderr,"%.6lf: read_inode_set_length: block is filled (%"PRIu64":%"PRIu32") / newlength: %"PRIu64", case 3: - clear data from rleng, set rleng to %"PRIu32"\n",monotonic_seconds(),rreq->offset,rreq->rleng,newlength,rreq->leng);
1714 #endif
1715 									memset(rreq->data + rreq->rleng,0,rreq->leng - rreq->rleng);
1716 									rreq->rleng = rreq->leng;
1717 								} else {
1718 #ifdef RDEBUG
1719 									fprintf(stderr,"%.6lf: read_inode_set_length: block is filled (%"PRIu64":%"PRIu32") / newlength: %"PRIu64", case 4: - clear data from rleng, set rleng to %"PRIu32"\n",monotonic_seconds(),rreq->offset,rreq->rleng,newlength,(uint32_t)(newlength - rreq->offset));
1720 #endif
1721 									memset(rreq->data + rreq->rleng,0,newlength - (rreq->offset + rreq->rleng));
1722 									rreq->rleng = newlength - rreq->offset;
1723 								}
1724 							}
1725 						} else {
1726 							if (rreq->lcnt==0 && rreq->busy==0) { // nobody wants it anymore, so delete it
1727 								*(rreq->prev) = rreq->next;
1728 								if (rreq->next) {
1729 									rreq->next->prev = rreq->prev;
1730 								} else {
1731 									id->reqtail = rreq->prev;
1732 								}
1733 								reqbufftotalsize -= rreq->leng;
1734 								free(rreq->data);
1735 								free(rreq);
1736 								rreq = NULL;
1737 							} else { // somebody wants it, so clear it
1738 								rreq->filled = 0;
1739 								if (rreq->busy==0) { // not busy ?
1740 									inqueue = 1; // add inode to queue
1741 								}
1742 							}
1743 						}
1744 					} else if (rreq->busy) {
1745 #ifdef RDEBUG
1746 						fprintf(stderr,"%.6lf: read_inode_set_length: block is busy - refresh\n",monotonic_seconds());
1747 #endif
1748 						rreq->refresh = 1;
1749 					}
1750 				}
1751 #if 0
1752 				if (rreq->free==0) {
1753 					if (id->flengisvalid==0) { // refresh everything
1754 						if (rreq->filled) {
1755 #ifdef RDEBUG
1756 							fprintf(stderr,"%.6lf: read_inode_set_length: old length unknown, block filled - clear it\n",monotonic_seconds());
1757 #endif
1758 							if (rreq->lcnt==0 && rreq->busy==0) { // nobody wants it anymore, so delete it
1759 								*(rreq->prev) = rreq->next;
1760 								if (rreq->next) {
1761 									rreq->next->prev = rreq->prev;
1762 								} else {
1763 									id->reqtail = rreq->prev;
1764 								}
1765 								reqbufftotalsize -= rreq->leng;
1766 								free(rreq->data);
1767 								free(rreq);
1768 								rreq = NULL;
1769 							} else { // somebody wants it, so clear it
1770 								rreq->filled = 0;
1771 								if (rreq->busy==0) { // not busy ?
1772 									clearedbuff = 1; // add inode to queue
1773 								}
1774 							}
1775 						} else if (rreq->busy) {
1776 #ifdef RDEBUG
1777 							fprintf(stderr,"%.6lf: read_inode_set_length: old length unknown, block is busy - refresh block\n",monotonic_seconds());
1778 #endif
1779 							rreq->refresh = 1;
1780 						}
1781 					} else if (id->fleng > newlength) { // file is shorter
1782 						if (rreq->filled) {
1783 #ifdef RDEBUG
1784 							fprintf(stderr,"%.6lf: read_inode_set_length: new length is smaller than previous, block filled - change rleng\n",monotonic_seconds());
1785 #endif
1786 							if (newlength<=rreq->offset) {
1787 								rreq->rleng = 0;
1788 							} else if (newlength<rreq->offset+rreq->leng) {
1789 								rreq->rleng = newlength - rreq->offset;
1790 							}
1791 						} else if (rreq->busy) {
1792 #ifdef RDEBUG
1793 							fprintf(stderr,"%.6lf: read_inode_set_length: new length is smaller than previous, block is busy - refresh block\n",monotonic_seconds());
1794 #endif
1795 							rreq->refresh = 1;
1796 						}
1797 					} else if (id->fleng < newlength) { // file is longer
1798 						if (rreq->filled) {
1799 #ifdef RDEBUG
1800 							fprintf(stderr,"%.6lf: read_inode_set_length: new length is larger than previous, block filled - clear buffer and change rleng\n",monotonic_seconds());
1801 #endif
1802 							if (newlength >= rreq->offset + rreq->leng) {
1803 								memset(rreq->data + rreq->rleng,0,rreq->leng - rreq->rleng);
1804 								rreq->rleng = rreq->leng;
1805 							} else if (newlength > rreq->offset + rreq->rleng) {
1806 								memset(rreq->data + rreq->rleng,0,newlength - (rreq->offset + rreq->rleng));
1807 								rreq->rleng = (newlength - rreq->offset);
1808 							}
1809 						} else if (rreq->busy) {
1810 #ifdef RDEBUG
1811 							fprintf(stderr,"%.6lf: read_inode_set_length: new length is larger than previous, block is busy - refresh block\n",monotonic_seconds());
1812 #endif
1813 							rreq->refresh = 1;
1814 						}
1815 					}
1816 				}
1817 #endif
1818 /*
1819 				if (rreq->free==0 && ((newlength < rreq->offset + rreq->leng) || (id->fleng < rreq->offset + rreq->leng))) {
1820 					if (rreq->filled) {
1821 						if (rreq->lcnt==0 && rreq->busy==0) { // nobody wants it anymore, so delete it
1822 							*(rreq->prev) = rreq->next;
1823 							if (rreq->next) {
1824 								rreq->next->prev = rreq->prev;
1825 							} else {
1826 								id->reqtail = rreq->prev;
1827 							}
1828 							reqbufftotalsize -= rreq->leng;
1829 							free(rreq->data);
1830 							free(rreq);
1831 							rreq = NULL;
1832 						} else { // somebody wants it, so clear it
1833 							rreq->filled = 0;
1834 							if (rreq->busy==0) { // not busy ?
1835 								clearedbuff = 1; // add inode to queue
1836 							}
1837 						}
1838 					} else if (rreq->busy) {
1839 						rreq->refresh = 1;
1840 					}
1841 				}
1842 */
1843 #ifdef RDEBUG
1844 				if (rreq) {
1845 					fprintf(stderr,"%.6lf: read_inode_set_length: rreq (after): (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; filled:%u ; free:%u)\n",monotonic_seconds(),rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->filled,rreq->free);
1846 				} else {
1847 					fprintf(stderr,"%.6lf: read_inode_set_length: rreq (after): NULL\n",monotonic_seconds());
1848 				}
1849 #endif
1850 			}
1851 			if (inqueue && id->inqueue==0) {
1852 				read_enqueue(id);
1853 				id->inqueue=1;
1854 			}
1855 			id->fleng = newlength;
1856 			id->flengisvalid = 1;
1857 			if (id->waitingworker) {
1858 				if (write(id->pipe[1]," ",1)!=1) {
1859 					syslog(LOG_ERR,"can't write to pipe !!!");
1860 				}
1861 				id->waitingworker=0;
1862 			}
1863 		}
1864 	}
1865 	zassert(pthread_mutex_unlock(&glock));
1866 }
1867 
read_data_new(uint32_t inode)1868 void* read_data_new(uint32_t inode) {
1869 	uint32_t idh = IDHASH(inode);
1870 	inodedata *id;
1871 	int pfd[2];
1872 
1873 	zassert(pthread_mutex_lock(&glock));
1874 
1875 	if (pipe(pfd)<0) {
1876 		syslog(LOG_WARNING,"pipe error: %s",strerr(errno));
1877 		zassert(pthread_mutex_unlock(&glock));
1878 		return NULL;
1879 	}
1880 
1881 	id = malloc(sizeof(inodedata));
1882 	passert(id);
1883 	id->inode = inode;
1884 	id->flengisvalid = 0;
1885 	id->seqdata = 0;
1886 	id->fleng = 0;
1887 	id->status = 0;
1888 	id->trycnt = 0;
1889 	id->pipe[0] = pfd[0];
1890 	id->pipe[1] = pfd[1];
1891 	id->inqueue = 0;
1892 	id->readahead = 0;
1893 	id->lastoffset = 0;
1894 	id->closewaiting = 0;
1895 	id->waitingworker = 0;
1896 	id->lastchunkid = 0;
1897 	id->lastip = 0;
1898 	id->lastport = 0;
1899 	id->laststatus = 0;
1900 	zassert(pthread_cond_init(&(id->closecond),NULL));
1901 	id->reqhead = NULL;
1902 	id->reqtail = &(id->reqhead);
1903 	id->next = idhash[idh];
1904 	idhash[idh] = id;
1905 #ifdef RDEBUG
1906 	fprintf(stderr,"%.6lf: opening: %"PRIu32" ; inode_structure: %p\n",monotonic_seconds(),inode,(void*)id);
1907 //	read_data_hexdump((uint8_t*)id,sizeof(inodedata));
1908 #endif
1909 	zassert(pthread_mutex_unlock(&glock));
1910 	return id;
1911 }
1912 
read_data_end(void * vid)1913 void read_data_end(void *vid) {
1914 	inodedata *id,**idp;
1915 	rrequest *rreq,*rreqn;
1916 	inodedata *rid = (inodedata*)vid;
1917 	uint32_t idh = IDHASH(rid->inode);
1918 
1919 #ifdef RDEBUG
1920 	fprintf(stderr,"%.6lf: closing: %"PRIu32" ; inode_structure: %p\n",monotonic_seconds(),rid->inode,(void*)rid);
1921 //	read_data_hexdump((uint8_t*)rid,sizeof(inodedata));
1922 #endif
1923 	zassert(pthread_mutex_lock(&glock));
1924 #ifdef RDEBUG
1925 	fprintf(stderr,"%.6lf: closing: %"PRIu32" ; cleaning req list\n",monotonic_seconds(),rid->inode);
1926 #endif
1927 	for (rreq = rid->reqhead ; rreq ; rreq=rreqn) {
1928 		rreqn = rreq->next;
1929 #ifdef RDEBUG
1930 		fprintf(stderr,"%.6lf: closing: %"PRIu32" ; rreq: lcnt: %u ; busy: %u ; free: %u ; filled: %u\n",monotonic_seconds(),rid->inode,rreq->lcnt,rreq->busy,rreq->free,rreq->filled);
1931 #endif
1932 		if (rreq->lcnt==0 && rreq->busy==0) {
1933 			*(rreq->prev) = rreq->next;
1934 			if (rreq->next) {
1935 				rreq->next->prev = rreq->prev;
1936 			} else {
1937 				rid->reqtail = rreq->prev;
1938 			}
1939 			reqbufftotalsize -= rreq->leng;
1940 			free(rreq->data);
1941 			free(rreq);
1942 		} else {
1943 			rreq->free = 1;
1944 		}
1945 	}
1946 	while (rid->reqhead!=NULL || rid->inqueue==1) {
1947 #ifdef RDEBUG
1948 		fprintf(stderr,"%.6lf: closing: %"PRIu32" ; reqhead: %s ; inqueue: %u\n",monotonic_seconds(),rid->inode,rid->reqhead?"NOT NULL":"NULL",rid->inqueue);
1949 #endif
1950 		rid->closewaiting++;
1951 		if (rid->waitingworker) {
1952 			if (write(rid->pipe[1]," ",1)!=1) {
1953 				syslog(LOG_ERR,"can't write to pipe !!!");
1954 			}
1955 			rid->waitingworker=0;
1956 		}
1957 #ifdef RDEBUG
1958 		fprintf(stderr,"%.6lf: inode: %"PRIu32" ; waiting for close\n",monotonic_seconds(),rid->inode);
1959 #endif
1960 		zassert(pthread_cond_wait(&(rid->closecond),&glock));
1961 		rid->closewaiting--;
1962 	}
1963 #ifdef RDEBUG
1964 	fprintf(stderr,"%.6lf: closing: %"PRIu32" ; reqhead: %s ; inqueue: %u - delete structure\n",monotonic_seconds(),rid->inode,rid->reqhead?"NOT NULL":"NULL",rid->inqueue);
1965 #endif
1966 	idp = &(idhash[idh]);
1967 	while ((id=*idp)) {
1968 		if (id==rid) {
1969 			*idp = id->next;
1970 			zassert(pthread_cond_destroy(&(id->closecond)));
1971 			close(id->pipe[0]);
1972 			close(id->pipe[1]);
1973 			free(id);
1974 		} else {
1975 			idp = &(id->next);
1976 		}
1977 	}
1978 	zassert(pthread_mutex_unlock(&glock));
1979 }
1980