1 /*
2 * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <sys/types.h>
26 #include <stdio.h>
27 #ifdef HAVE_READV
28 #include <sys/uio.h>
29 #endif
30 #include <sys/time.h>
31 #include <unistd.h>
32 #include <poll.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <syslog.h>
36 #include <errno.h>
37 #include <limits.h>
38 #include <signal.h>
39 #include <pthread.h>
40 #include <inttypes.h>
41
42 #include "massert.h"
43 #include "datapack.h"
44 #include "crc.h"
45 #include "strerr.h"
46 #include "mfsstrerr.h"
47 #include "pcqueue.h"
48 #include "sockets.h"
49 #include "conncache.h"
50 #include "csdb.h"
51 #include "mastercomm.h"
52 #include "clocks.h"
53 #include "portable.h"
54 #include "readdata.h"
55 #include "MFSCommunication.h"
56
/* All *_TIMEOUT / *_WAIT_* values below are in seconds (compared against monotonic_seconds() differences). */

/* read is aborted when nothing was received from the chunkserver for this long;
   half of it is the interval after which a keep-alive ANTOAN_NOP is sent */
#define CHUNKSERVER_ACTIVITY_TIMEOUT 2.0

/* how long a worker with no pending request keeps waiting (on the inode pipe) for more work */
#define WORKER_IDLE_TIMEOUT 1.0

/* after this working time a worker does not start reading the next request */
#define WORKER_BUSY_LAST_REQUEST_TIMEOUT 5.0
/* additional time allowed to finish the request currently being read */
#define WORKER_BUSY_WAIT_FOR_FINISH 5.0
/* added to both limits above while workers_total <= HEAVYLOAD_WORKERS */
#define WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT 20.0

/* NOTE(review): not referenced in this part of the file */
#define BUFFER_VALIDITY_TIMEOUT 60.0

/* an idle worker terminates when more than this many workers are idle */
#define SUSTAIN_WORKERS 50
/* above this many workers, timeouts are shortened and unrequested blocks are skipped */
#define HEAVYLOAD_WORKERS 150
/* a new worker is spawned only while workers_total is below this limit */
#define MAX_WORKERS 250

/* presumably used to index 'idhash' by inode number - TODO confirm */
#define IDHASHSIZE 256
#define IDHASH(inode) (((inode)*0xB239FB71)%IDHASHSIZE)
73
74 /*
75 typedef struct cblock_s {
76 uint8_t data[MFSBLOCKSIZE];
77 uint32_t chindx; // chunk number
78 uint16_t pos; // block in chunk (0..1023)
79 uint8_t filled; // data present
80 uint8_t wakeup; // somebody wants to be woken up when data are present
81 struct readbuffer *next;
82 } cblock;
83 */
84
85 // #define RDEBUG 1
86
87 #ifdef RDEBUG
/* Debug helper: dump 'leng' bytes of 'buff' to stderr, 32 bytes per line,
   each line prefixed with its address and offset. */
void read_data_hexdump(uint8_t *buff,uint32_t leng) {
	uint32_t pos;
	uint32_t col;

	for (pos=0 ; pos<leng ; pos++) {
		col = pos % 32;
		if (col==0) { // start of a new output line
			fprintf(stderr,"%p %08X:",(void*)(buff+pos),pos);
		}
		fprintf(stderr," %02X",buff[pos]);
		if (col==31) { // line complete
			fprintf(stderr,"\n");
		}
	}
	if (leng%32) { // terminate the last, partial line
		fprintf(stderr,"\n");
	}
}
103 #endif
104
/* One read request (data buffer) of an open inode. Requests of an inode form
   a list starting at inodedata.reqhead. */
typedef struct rrequest_s {
	uint8_t *data;		// buffer for the requested bytes (freed together with the request)
	uint64_t offset;	// file offset of the first byte ('offset & MFSCHUNKMASK' is sent to the chunkserver)
	uint32_t leng;		// requested length (accounted in reqbufftotalsize)
	uint32_t rleng;		// length actually read: 'leng' clipped to the file length, 0 on error
	uint32_t chindx;	// chunk index passed to fs_readchunk for this request
	double modified;	// monotonic_seconds() of the moment the request was filled
	uint8_t filled;		// 1 = request finished (data present, or rleng==0 after error)
	uint8_t refresh;	// 1 = data must be re-read; the worker resets 'filled' and abandons the read
	uint8_t busy;		// 1 = a worker is currently reading this request
	uint8_t free;		// 1 = marked for release while the inode is being closed
	uint16_t lcnt;		// number of users holding this request (0 = nobody wants this block)
	uint16_t waiting;	// number of threads waiting on 'cond'
	pthread_cond_t cond;	// broadcast whenever 'filled' is set
	struct rrequest_s *next,**prev;	// list links; 'prev' presumably addresses the pointer that points here - TODO confirm
} rrequest;
121
/* Per-inode read state shared by the reader API and the read workers
   (fields are accessed under glock unless noted otherwise). */
typedef struct inodedata_s {
	uint32_t inode;		// inode number (passed to fs_readchunk, shown in log messages)
	uint32_t seqdata;	// NOTE(review): not referenced in this part of the file
	uint64_t fleng;		// file length as last reported by the master
//	uint32_t cacheblockcount;
	int status;		// sticky errno-style error; once non-zero all pending requests fail
	uint16_t closewaiting;	// non-zero while a close is waiting for the workers (presumably a waiter count - TODO confirm)
	uint32_t trycnt;	// retry counter, reset after a successful read
	uint8_t flengisvalid;	// 1 = 'fleng' holds a value received from the master
	uint8_t waitingworker;	// 1 while a worker sleeps in poll() on 'pipe[0]'
	uint8_t inqueue;	// cleared by read_job_end when no job for this inode remains queued
	int pipe[2];		// pipe[0] is polled by the worker - writing to pipe[1] presumably wakes it up
	uint8_t readahead;	// NOTE(review): not referenced in this part of the file
	uint64_t lastoffset;	// NOTE(review): not referenced in this part of the file
	uint64_t lastchunkid;	// chunk read most recently ...
	uint32_t lastip;	// ... from this chunkserver ip
	uint16_t lastport;	// ... and port
	uint8_t laststatus;	// 1 = that read succeeded, so the same chunkserver is preferred next time
	rrequest *reqhead,**reqtail;	// list of read requests
//	cblock *datachainhead,*datachaintail;
	pthread_cond_t closecond;	// broadcast when the request list becomes empty during close
	struct inodedata_s *next;	// presumably chain link in 'idhash' - TODO confirm
} inodedata;
145
/* Descriptor of a read worker thread; allocated in read_data_spawn_worker()
   and freed by read_data_close_worker(). */
typedef struct worker_s {
	pthread_t thread_id;	// id filled by pthread_create
} worker;
149
150 //static pthread_cond_t fcbcond;
151 //static uint8_t fcbwaiting;
152 //static cblock *cacheblocks,*freecblockshead;
153 //static uint32_t freecacheblocks;
154
// NOTE(review): readahead / readahead_trigger are not referenced in this part of the file
static uint32_t readahead;
static uint32_t readahead_trigger;

// number of failed attempts after which a read error is reported to the caller
static uint32_t maxretries;
// NOTE(review): not referenced in this part of the file
static uint64_t maxreadaheadsize;
// sum of 'leng' of all allocated request buffers
static uint64_t reqbufftotalsize;

// inode hash table (presumably indexed with IDHASH - TODO confirm)
static inodedata **idhash;

// global lock protecting inode/request state and worker counters
static pthread_mutex_t glock;

//#ifdef BUFFER_DEBUG
//static pthread_t info_worker_th;
//static uint32_t usedblocks;
//#endif

// thread running read_dqueue_worker
static pthread_t dqueue_worker_th;

// worker accounting (under glock): idle workers / all workers
static uint32_t workers_avail;
static uint32_t workers_total;
// number of threads waiting on worker_term_cond for all workers to exit
static uint32_t worker_term_waiting;
static pthread_cond_t worker_term_cond;
// attributes used when creating worker threads
static pthread_attr_t worker_thattr;

// static pthread_t read_worker_th[WORKERS];
//static inodedata *read_worker_id[WORKERS];

// jqueue: inodes ready to be processed by workers; dqueue: inodes waiting for a retry delay
static void *jqueue,*dqueue;
183
184 /* queues */
185
186 /* glock: UNUSED */
/* Queue inode 'id' for processing. With cnt==0 it goes straight to the job
   queue; otherwise it is put into the delayed queue together with the current
   monotonic time (split into high/low 32 bits) and 'cnt' seconds to wait. */
void read_delayed_enqueue(inodedata *id,uint32_t cnt) {
	uint64_t nowus;

	if (cnt==0) {
		queue_put(jqueue,0,0,(uint8_t*)id,0);
		return;
	}
	nowus = monotonic_useconds();
	queue_put(dqueue,(uint32_t)(nowus>>32),(uint32_t)(nowus&0xFFFFFFFFU),(uint8_t*)id,cnt);
}
196
197 /* glock: UNUSED */
/* Put inode 'id' directly into the worker job queue (no delay). */
void read_enqueue(inodedata *id) {
	uint8_t *job = (uint8_t*)id;
	queue_put(jqueue,0,0,job,0);
}
201
202 /* worker thread | glock: UNUSED */
read_dqueue_worker(void * arg)203 void* read_dqueue_worker(void *arg) {
204 uint64_t t,usec;
205 uint32_t husec,lusec,cnt;
206 uint8_t *id;
207 (void)arg;
208 for (;;) {
209 queue_get(dqueue,&husec,&lusec,&id,&cnt);
210 if (id==NULL) {
211 return NULL;
212 }
213 t = monotonic_useconds();
214 usec = husec;
215 usec <<= 32;
216 usec |= lusec;
217 if (t>usec) {
218 t -= usec;
219 while (t>=1000000 && cnt>0) {
220 t-=1000000;
221 cnt--;
222 }
223 if (cnt>0) {
224 if (t<1000000) {
225 portable_usleep(1000000-t);
226 }
227 cnt--;
228 }
229 }
230 if (cnt>0) {
231 t = monotonic_useconds();
232 queue_put(dqueue,t>>32,t&0xFFFFFFFFU,(uint8_t*)id,cnt);
233 } else {
234 queue_put(jqueue,0,0,id,0);
235 }
236 }
237 return NULL;
238 }
239
240 /* glock: UNLOCKED */
/*
 * Finish one job of inode 'id' (glock must NOT be held by the caller).
 *
 * status - errno-style error to store as the inode's sticky status (0 = none)
 * delay  - used only when the inode has no error and still has unfilled
 *          requests: 0 re-enqueues immediately, >0 re-enqueues through the
 *          delayed queue after 'delay' seconds. When there is no error and
 *          delay==0, the retry counter is reset.
 *
 * Outcomes:
 *  - inode is being closed: requests nobody uses are unlinked and freed; the
 *    remaining ones are marked finished/free and their waiters are woken up;
 *    closecond is broadcast once the list is empty,
 *  - no error and work remains: the inode is queued again,
 *  - otherwise: every unfilled request is finished with rleng=0 and its
 *    waiters are woken up.
 */
void read_job_end(inodedata *id,int status,uint32_t delay) {
	rrequest *rreq,**rreqp;
	uint8_t todo;

	zassert(pthread_mutex_lock(&glock));
	if (status) {
		if (id->closewaiting==0) {
			errno = status;
			syslog(LOG_WARNING,"error reading file number %"PRIu32": %s",id->inode,strerr(errno));
		}
		id->status = status;
	}
	status = id->status;
	todo = 0;
	if (status==0) {
		for (rreq = id->reqhead ; rreq && todo==0 ; rreq=rreq->next) {
			if (rreq->filled==0 && rreq->free==0) {
				todo=1;
			}
		}
		if (delay==0) {
			id->trycnt=0; // on good read reset try counter
		}
	}

	if (id->closewaiting) {
#ifdef RDEBUG
		fprintf(stderr,"%.6lf: inode: %"PRIu32" - closewaiting\n",monotonic_seconds(),id->inode);
#endif
		rreqp = &(id->reqhead);
		while ((rreq = *rreqp)) {
			if (rreq->lcnt==0 && rreq->busy==0) {
				// unlink the request and keep the list back-links consistent:
				// the successor's 'prev' and (for the last element) id->reqtail
				// must not keep pointing into the freed request
				*rreqp = rreq->next;
				if (rreq->next) {
					rreq->next->prev = rreqp;
				} else {
					id->reqtail = rreqp;
				}
				reqbufftotalsize -= rreq->leng;
				free(rreq->data);
				free(rreq);
			} else {
				if (rreq->filled==0) {
					rreq->rleng = 0;
					rreq->filled = 1;
				}
				if (rreq->waiting) {
					zassert(pthread_cond_broadcast(&(rreq->cond)));
				}
				rreq->free = 1;
				rreqp = &(rreq->next);
			}
		}

		id->inqueue=0;

#ifdef RDEBUG
		fprintf(stderr,"%.6lf: inode: %"PRIu32" - reqhead: %s (reqbufftotalsize: %"PRIu64")\n",monotonic_seconds(),id->inode,id->reqhead?"NOT NULL":"NULL",reqbufftotalsize);
#endif
		if (id->reqhead==NULL) {
			zassert(pthread_cond_broadcast(&(id->closecond)));
		}
	} else if (todo && status==0) { // still have some work to do
		read_delayed_enqueue(id,delay);
	} else { // no more work, descriptor wait for being closed or error occurred
		for (rreq = id->reqhead ; rreq ; rreq=rreq->next) {
			if (rreq->filled==0) { // error occurred
				rreq->rleng = 0;
				rreq->filled = 1;
				if (rreq->waiting) {
					zassert(pthread_cond_broadcast(&(rreq->cond)));
				}
			}
		}

		id->inqueue=0;
	}
	zassert(pthread_mutex_unlock(&glock));
}
315
/* forward declaration - read_data_spawn_worker() starts threads running read_worker() */
void* read_worker(void *arg);

#ifndef RDEBUG
/* workers_total value last reported to syslog (avoids repeating the same "read workers" message) */
static uint32_t lastnotify = 0;
#endif
321
322 /* glock:LOCKED */
read_data_spawn_worker(void)323 static inline void read_data_spawn_worker(void) {
324 sigset_t oldset;
325 sigset_t newset;
326 worker *w;
327 int res;
328
329 w = malloc(sizeof(worker));
330 if (w==NULL) {
331 return;
332 }
333 sigemptyset(&newset);
334 sigaddset(&newset, SIGTERM);
335 sigaddset(&newset, SIGINT);
336 sigaddset(&newset, SIGHUP);
337 sigaddset(&newset, SIGQUIT);
338 pthread_sigmask(SIG_BLOCK, &newset, &oldset);
339 res = pthread_create(&(w->thread_id),&worker_thattr,read_worker,w);
340 pthread_sigmask(SIG_SETMASK, &oldset, NULL);
341 if (res<0) {
342 return;
343 }
344 workers_avail++;
345 workers_total++;
346 #ifdef RDEBUG
347 fprintf(stderr,"%.6lf: spawn read worker (total: %"PRIu32")\n",monotonic_seconds(),workers_total);
348 #else
349 if (workers_total%10==0 && workers_total!=lastnotify) {
350 syslog(LOG_INFO,"read workers: %"PRIu32"+",workers_total);
351 lastnotify = workers_total;
352 }
353 #endif
354 }
355
356 /* glock:LOCKED */
/* Retire worker 'w' (caller holds glock; 'w' is the idle worker being closed).
   Updates the worker counters, wakes one thread waiting for all workers to
   finish when this was the last one, detaches the thread and frees 'w'. */
static inline void read_data_close_worker(worker *w) {
	pthread_t tid = w->thread_id;

	workers_total--;
	workers_avail--;
	if (worker_term_waiting && workers_total==0) {
		zassert(pthread_cond_signal(&worker_term_cond));
		worker_term_waiting--;
	}
	pthread_detach(tid);
	free(w);
#ifdef RDEBUG
	fprintf(stderr,"%.6lf: close read worker (total: %"PRIu32")\n",monotonic_seconds(),workers_total);
#else
	if (workers_total!=lastnotify && workers_total%10==0) {
		syslog(LOG_INFO,"read workers: %"PRIu32"-",workers_total);
		lastnotify = workers_total;
	}
#endif
}
375
/* Lazily format 'ip' (host byte order) as dotted-quad text into 'ipstr'.
   Nothing is done when 'ipstr' already holds text (first byte non-zero),
   so the string is produced at most once per connection. */
static inline void read_prepare_ip (char ipstr[16],uint32_t ip) {
	uint8_t a,b,c,d;

	if (ipstr[0]!=0) {
		return; // already formatted
	}
	a = (uint8_t)(ip>>24);
	b = (uint8_t)(ip>>16);
	c = (uint8_t)(ip>>8);
	d = (uint8_t)ip;
	snprintf(ipstr,16,"%"PRIu8".%"PRIu8".%"PRIu8".%"PRIu8,a,b,c,d);
	ipstr[15]=0;
}
382
383 /* main working thread | glock:UNLOCKED */
read_worker(void * arg)384 void* read_worker(void *arg) {
385 uint32_t z1,z2,z3;
386 uint8_t *data;
387 int fd;
388 int i;
389 struct pollfd pfd[3];
390 uint32_t sent,tosend,received,currentpos;
391 uint8_t recvbuff[20];
392 uint8_t sendbuff[29];
393 #ifdef HAVE_READV
394 struct iovec siov[2];
395 #endif
396 uint8_t pipebuff[1024];
397 uint8_t *wptr;
398 const uint8_t *rptr;
399
400 uint32_t reccmd;
401 uint32_t recleng;
402 uint64_t recchunkid;
403 uint16_t recblocknum;
404 uint16_t recoffset;
405 uint32_t recsize;
406 uint32_t reccrc;
407 uint8_t recstatus;
408 uint8_t gotstatus;
409
410 uint32_t chindx;
411 uint32_t ip;
412 uint16_t port;
413 uint32_t srcip;
414 uint64_t mfleng;
415 uint64_t chunkid;
416 uint32_t version;
417 const uint8_t *csdata;
418 uint32_t tmpip;
419 uint16_t tmpport;
420 uint32_t tmpver;
421 uint32_t csver;
422 uint32_t cnt,bestcnt;
423 uint32_t csdatasize;
424 uint8_t csdataver;
425 uint8_t csrecsize;
426 uint8_t rdstatus;
427 int status;
428 char csstrip[16];
429 uint8_t reqsend;
430 uint8_t closewaiting;
431 double start,now,lastrcvd,lastsend;
432 double workingtime,lrdiff;
433 uint8_t firsttime = 1;
434 worker *w = (worker*)arg;
435
436 inodedata *id;
437 rrequest *rreq,*nrreq;
438
439 ip = 0;
440 port = 0;
441 csstrip[0] = 0;
442
443 for (;;) {
444 if (ip || port) {
445 csdb_readdec(ip,port);
446 }
447 ip = 0;
448 port = 0;
449 csstrip[0] = 0;
450
451 if (firsttime==0) {
452 zassert(pthread_mutex_lock(&glock));
453 workers_avail++;
454 if (workers_avail > SUSTAIN_WORKERS) {
455 // fprintf(stderr,"close worker (avail:%"PRIu32" ; total:%"PRIu32")\n",workers_avail,workers_total);
456 read_data_close_worker(w);
457 zassert(pthread_mutex_unlock(&glock));
458 return NULL;
459 }
460 zassert(pthread_mutex_unlock(&glock));
461 }
462 firsttime = 0;
463
464 // get next job
465 queue_get(jqueue,&z1,&z2,&data,&z3);
466
467 zassert(pthread_mutex_lock(&glock));
468
469 if (data==NULL) {
470 // fprintf(stderr,"close worker (avail:%"PRIu32" ; total:%"PRIu32")\n",workers_avail,workers_total);
471 read_data_close_worker(w);
472 zassert(pthread_mutex_unlock(&glock));
473 return NULL;
474 }
475
476 workers_avail--;
477 if (workers_avail==0 && workers_total<MAX_WORKERS) {
478 read_data_spawn_worker();
479 // fprintf(stderr,"spawn worker (avail:%"PRIu32" ; total:%"PRIu32")\n",workers_avail,workers_total);
480 }
481
482 id = (inodedata*)data;
483
484 for (rreq = id->reqhead ; rreq && rreq->filled==1 && rreq->busy==0 ; rreq=rreq->next) {}
485 if (rreq) {
486 chindx = rreq->chindx;
487 status = id->status;
488 if (status==STATUS_OK) {
489 rreq->busy = 1;
490 }
491 } else {
492 // no data to read - just ignore it
493 zassert(pthread_mutex_unlock(&glock));
494 read_job_end(id,0,0);
495 continue;
496 }
497
498 zassert(pthread_mutex_unlock(&glock));
499
500 if (status!=STATUS_OK) {
501 read_job_end(id,status,0);
502 continue;
503 }
504
505 // get chunk data from master
506 // start = monotonic_seconds();
507 rdstatus = fs_readchunk(id->inode,chindx,&csdataver,&mfleng,&chunkid,&version,&csdata,&csdatasize);
508
509 if (rdstatus!=STATUS_OK) {
510 zassert(pthread_mutex_lock(&glock));
511 rreq->busy = 0;
512 zassert(pthread_mutex_unlock(&glock));
513 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_readchunk returned status: %s",id->inode,chindx,mfsstrerr(rdstatus));
514 if (rdstatus==ERROR_ENOENT) {
515 read_job_end(id,EBADF,0);
516 } else if (rdstatus==ERROR_QUOTA) {
517 read_job_end(id,EDQUOT,0);
518 } else if (rdstatus==ERROR_NOSPACE) {
519 read_job_end(id,ENOSPC,0);
520 } else if (rdstatus==ERROR_CHUNKLOST) {
521 read_job_end(id,ENXIO,0);
522 } else {
523 id->trycnt++;
524 if (id->trycnt>=maxretries) {
525 if (rdstatus==ERROR_NOCHUNKSERVERS) {
526 read_job_end(id,ENOSPC,0);
527 } else if (rdstatus==ERROR_CSNOTPRESENT) {
528 read_job_end(id,ENXIO,0);
529 } else {
530 read_job_end(id,EIO,0);
531 }
532 } else {
533 read_delayed_enqueue(id,1+((id->trycnt<30)?(id->trycnt/3):10));
534 }
535 }
536 continue; // get next job
537 }
538 // now = monotonic_seconds();
539 // fprintf(stderr,"fs_readchunk time: %.3lf\n",now-start);
540 if (chunkid==0 && version==0) { // empty chunk
541 zassert(pthread_mutex_lock(&glock));
542 id->fleng = mfleng;
543 id->flengisvalid = 1;
544 rreq->busy = 0;
545 #ifdef RDEBUG
546 fprintf(stderr,"%.6lf: inode: %"PRIu32" ; mfleng: %"PRIu64" (empty chunk)\n",monotonic_seconds(),id->inode,id->fleng);
547 #endif
548 while (rreq) {
549 if (rreq->offset > mfleng) {
550 rreq->rleng = 0;
551 } else if ((rreq->offset + rreq->leng) > mfleng) {
552 rreq->rleng = mfleng - rreq->offset;
553 } else {
554 rreq->rleng = rreq->leng;
555 }
556
557 if (rreq->rleng>0) {
558 memset(rreq->data,0,rreq->rleng);
559 }
560 rreq->filled=1;
561 rreq->modified = monotonic_seconds();
562 if (rreq->waiting>0) {
563 zassert(pthread_cond_broadcast(&(rreq->cond)));
564 }
565 rreq = NULL;
566
567 for (nrreq = id->reqhead ; nrreq && nrreq->filled==1 && nrreq->busy==0 ; nrreq=nrreq->next) {}
568 if (nrreq && nrreq->chindx==chindx) {
569 rreq = nrreq;
570 #ifdef RDEBUG
571 fprintf(stderr,"%.6lf: readworker: get next request (empty chunk)\n",monotonic_seconds());
572 #endif
573 }
574 }
575
576 zassert(pthread_mutex_unlock(&glock));
577 read_job_end(id,0,0);
578
579 continue;
580 }
581
582 if (csdata==NULL || csdatasize==0) {
583 zassert(pthread_mutex_lock(&glock));
584 rreq->busy = 0;
585 zassert(pthread_mutex_unlock(&glock));
586 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - there are no valid copies",id->inode,chindx,chunkid,version);
587 id->trycnt+=6;
588 if (id->trycnt>=maxretries) {
589 read_job_end(id,ENXIO,0);
590 } else {
591 read_delayed_enqueue(id,60);
592 }
593 continue;
594 }
595
596 ip = 0; // make old compilers happy
597 port = 0; // make old compilers happy
598 csver = 0; // make old compilers happy
599 if (csdataver==0) {
600 csrecsize = 6;
601 } else {
602 csrecsize = 10;
603 }
604 // choose cs
605 bestcnt = 0xFFFFFFFF;
606 while (csdatasize>=csrecsize) {
607 tmpip = get32bit(&csdata);
608 tmpport = get16bit(&csdata);
609 if (csdataver>0) {
610 tmpver = get32bit(&csdata);
611 } else {
612 tmpver = 0;
613 }
614 csdatasize-=csrecsize;
615 if (id->lastchunkid==chunkid && tmpip==id->lastip && tmpport==id->lastport) {
616 if (id->laststatus==1) {
617 ip = tmpip;
618 port = tmpport;
619 csver = tmpver;
620 break;
621 } else {
622 cnt = 0xFFFFFFFE;
623 }
624 } else {
625 cnt = csdb_getopcnt(tmpip,tmpport);
626 }
627 if (cnt<bestcnt) {
628 ip = tmpip;
629 port = tmpport;
630 csver = tmpver;
631 bestcnt = cnt;
632 }
633 }
634
635 if (ip || port) {
636 csdb_readinc(ip,port);
637 id->lastchunkid = chunkid;
638 id->lastip = ip;
639 id->lastport = port;
640 id->laststatus = 0;
641 } else {
642 zassert(pthread_mutex_lock(&glock));
643 rreq->busy = 0;
644 zassert(pthread_mutex_unlock(&glock));
645 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - there are no valid copies (bad ip and/or port)",id->inode,chindx,chunkid,version);
646 id->trycnt+=6;
647 if (id->trycnt>=maxretries) {
648 read_job_end(id,ENXIO,0);
649 } else {
650 read_delayed_enqueue(id,60);
651 }
652 continue;
653 }
654
655 start = monotonic_seconds();
656
657
658 // make connection to cs
659 srcip = fs_getsrcip();
660 fd = conncache_get(ip,port);
661 if (fd<0) {
662 cnt=0;
663 while (cnt<10) {
664 fd = tcpsocket();
665 if (fd<0) {
666 syslog(LOG_WARNING,"readworker: can't create tcp socket: %s",strerr(errno));
667 break;
668 }
669 if (srcip) {
670 if (tcpnumbind(fd,srcip,0)<0) {
671 syslog(LOG_WARNING,"readworker: can't bind socket to given ip: %s",strerr(errno));
672 tcpclose(fd);
673 fd=-1;
674 break;
675 }
676 }
677 if (tcpnumtoconnect(fd,ip,port,(cnt%2)?(300*(1<<(cnt>>1))):(200*(1<<(cnt>>1))))<0) {
678 cnt++;
679 if (cnt>=10) {
680 read_prepare_ip(csstrip,ip);
681 syslog(LOG_WARNING,"readworker: can't connect to (%s:%"PRIu16"): %s",csstrip,port,strerr(errno));
682 }
683 close(fd);
684 fd=-1;
685 } else {
686 cnt=10;
687 }
688 }
689 }
690 if (fd<0) {
691 zassert(pthread_mutex_lock(&glock));
692 rreq->busy = 0;
693 zassert(pthread_mutex_unlock(&glock));
694 id->trycnt++;
695 if (id->trycnt>=maxretries) {
696 read_job_end(id,EIO,0);
697 } else {
698 read_delayed_enqueue(id,1+((id->trycnt<30)?(id->trycnt/3):10));
699 }
700 continue;
701 }
702 if (tcpnodelay(fd)<0) {
703 syslog(LOG_WARNING,"readworker: can't set TCP_NODELAY: %s",strerr(errno));
704 }
705
706 pfd[0].fd = fd;
707 pfd[1].fd = id->pipe[0];
708 currentpos = 0;
709 gotstatus = 0;
710 received = 0;
711 reqsend = 0;
712 sent = 0;
713 tosend = 0;
714 lastrcvd = 0.0;
715 lastsend = 0.0;
716
717 zassert(pthread_mutex_lock(&glock));
718 id->fleng = mfleng;
719 id->flengisvalid = 1;
720 #ifdef RDEBUG
721 fprintf(stderr,"%.6lf: inode: %"PRIu32" ; mfleng: %"PRIu64"\n",monotonic_seconds(),id->inode,id->fleng);
722 #endif
723 zassert(pthread_mutex_unlock(&glock));
724
725 reccmd = 0; // makes gcc happy
726 recleng = 0; // makes gcc happy
727
728 do {
729 now = monotonic_seconds();
730 #ifdef RDEBUG
731 if (rreq) {
732 fprintf(stderr,"%.6lf: readworker inode: %"PRIu32" ; rreq: %"PRIu64":%"PRIu32"\n",monotonic_seconds(),id->inode,rreq->offset,rreq->leng);
733 } else {
734 fprintf(stderr,"%.6lf: readworker inode: %"PRIu32" ; rreq: NULL\n",monotonic_seconds(),id->inode);
735 }
736 #endif
737
738 zassert(pthread_mutex_lock(&glock));
739
740 if (id->flengisvalid) {
741 mfleng = id->fleng;
742 }
743
744 if (rreq!=NULL && ((reqsend && gotstatus) || rreq->refresh==1)) { // rreq has been read or needs to be reread
745 rreq->busy = 0;
746 if (rreq->refresh==1) {
747 rreq->refresh = 0;
748 rreq->filled = 0;
749 zassert(pthread_mutex_unlock(&glock));
750 status = EINTR;
751 break;
752 } else {
753 rreq->filled = 1;
754 rreq->modified = monotonic_seconds();
755 }
756 if (rreq->waiting>0) {
757 zassert(pthread_cond_broadcast(&(rreq->cond)));
758 }
759 rreq = NULL;
760 }
761
762 if (lastrcvd==0.0) {
763 lastrcvd = now;
764 } else {
765 lrdiff = now - lastrcvd;
766 if (lrdiff>=CHUNKSERVER_ACTIVITY_TIMEOUT) {
767 read_prepare_ip(csstrip,ip);
768 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: connection with (%s:%"PRIu16") was timed out (lastrcvd:%.6lf,now:%.6lf,lrdiff:%.6lf received: %"PRIu32"/%"PRIu32", try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,lastrcvd,now,lrdiff,currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
769 if (rreq) {
770 status = EIO;
771 }
772 zassert(pthread_mutex_unlock(&glock));
773 break;
774 }
775 }
776
777 workingtime = now - start;
778
779 if (rreq==NULL) { // finished current block
780 for (nrreq = id->reqhead ; nrreq && nrreq->filled==1 && nrreq->busy==0 ; nrreq=nrreq->next) {}
781 if (nrreq) { // have next block
782 if (nrreq->chindx!=chindx || nrreq->filled || nrreq->busy || workingtime>WORKER_BUSY_LAST_REQUEST_TIMEOUT+((workers_total>HEAVYLOAD_WORKERS)?0.0:WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT)) {
783 #ifdef RDEBUG
784 fprintf(stderr,"%.6lf: readworker: ignore next request\n",monotonic_seconds());
785 #endif
786 zassert(pthread_mutex_unlock(&glock));
787 break;
788 }
789 if (nrreq->lcnt==0 && workers_total>HEAVYLOAD_WORKERS) { // currently nobody wants this block and there are a lot of busy workers, so skip this one
790 #ifdef RDEBUG
791 fprintf(stderr,"%.6lf: readworker: lcnt is zero and there are a lot of workers\n",monotonic_seconds());
792 #endif
793 zassert(pthread_mutex_unlock(&glock));
794 break;
795 }
796 #ifdef RDEBUG
797 fprintf(stderr,"%.6lf: readworker: get next request\n",monotonic_seconds());
798 #endif
799 rreq = nrreq;
800 rreq->busy = 1;
801 currentpos = 0;
802 received = 0;
803 reqsend = 0;
804 gotstatus = 0;
805 } else { // do not have next block
806 if (workingtime>WORKER_IDLE_TIMEOUT || workers_total>HEAVYLOAD_WORKERS) {
807 #ifdef RDEBUG
808 fprintf(stderr,"%.6lf: readworker: next request doesn't exist and there are a lot of workers or idle timeout passed\n",monotonic_seconds());
809 #endif
810 zassert(pthread_mutex_unlock(&glock));
811 break;
812 }
813 #ifdef RDEBUG
814 fprintf(stderr,"%.6lf: readworker: next request doesn't exist, so wait on pipe\n",monotonic_seconds());
815 #endif
816 }
817 } else { // have current block
818 if (workingtime>(WORKER_BUSY_LAST_REQUEST_TIMEOUT+WORKER_BUSY_WAIT_FOR_FINISH+((workers_total>HEAVYLOAD_WORKERS)?0.0:WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT))) {
819 #ifdef RDEBUG
820 fprintf(stderr,"%.6lf: readworker: current request not finished but busy timeout passed\n",monotonic_seconds());
821 #endif
822 zassert(pthread_mutex_unlock(&glock));
823 status = EINTR;
824 break;
825 }
826 }
827
828 if (reqsend==0) {
829 if (rreq->offset > mfleng) {
830 rreq->rleng = 0;
831 } else if ((rreq->offset + rreq->leng) > mfleng) {
832 rreq->rleng = mfleng - rreq->offset;
833 } else {
834 rreq->rleng = rreq->leng;
835 }
836 if (rreq->rleng>0) {
837 wptr = sendbuff;
838 put32bit(&wptr,CLTOCS_READ);
839 if (csver>=VERSION2INT(1,7,32)) {
840 put32bit(&wptr,21);
841 put8bit(&wptr,1);
842 tosend = 29;
843 } else {
844 put32bit(&wptr,20);
845 tosend = 28;
846 }
847 put64bit(&wptr,chunkid);
848 put32bit(&wptr,version);
849 put32bit(&wptr,(rreq->offset & MFSCHUNKMASK));
850 put32bit(&wptr,rreq->rleng);
851 sent = 0;
852 reqsend = 1;
853 } else {
854 tosend = 0;
855 sent = 0;
856 reqsend = 1;
857 gotstatus = 1;
858 zassert(pthread_mutex_unlock(&glock));
859 continue;
860 }
861 }
862
863 id->waitingworker=1;
864 zassert(pthread_mutex_unlock(&glock));
865
866 if (tosend==0 && (now - lastsend > (CHUNKSERVER_ACTIVITY_TIMEOUT/2.0))) {
867 wptr = sendbuff;
868 put32bit(&wptr,ANTOAN_NOP);
869 put32bit(&wptr,0);
870 tosend = 8;
871 sent = 0;
872 }
873
874 if (tosend>0) {
875 i = write(fd,sendbuff+sent,tosend-sent);
876 if (i<0) { // error
877 if (ERRNO_ERROR && errno!=EINTR) {
878 read_prepare_ip(csstrip,ip);
879 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: write to (%s:%"PRIu16") error: %s (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,strerr(errno),currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
880 status = EIO;
881 zassert(pthread_mutex_lock(&glock));
882 id->waitingworker=0;
883 zassert(pthread_mutex_unlock(&glock));
884 break;
885 } else {
886 i=0;
887 }
888 }
889 if (i>0) {
890 sent += i;
891 if (tosend<=sent) {
892 sent = 0;
893 tosend = 0;
894 }
895 lastsend = now;
896 }
897 }
898
899 pfd[0].events = POLLIN | ((tosend>0)?POLLOUT:0);
900 pfd[0].revents = 0;
901 pfd[1].events = POLLIN;
902 pfd[1].revents = 0;
903 if (poll(pfd,2,100)<0) {
904 if (errno!=EINTR) {
905 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: poll error: %s (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,strerr(errno),currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
906 status = EIO;
907 break;
908 }
909 }
910 zassert(pthread_mutex_lock(&glock));
911 id->waitingworker=0;
912 closewaiting = (id->closewaiting>0)?1:0;
913 zassert(pthread_mutex_unlock(&glock));
914 if (pfd[1].revents&POLLIN) { // used just to break poll - so just read all data from pipe to empty it
915 #ifdef RDEBUG
916 fprintf(stderr,"%.6lf: readworker: %"PRIu32" woken up by pipe\n",monotonic_seconds(),id->inode);
917 #endif
918 i = read(id->pipe[0],pipebuff,1024);
919 if (i<0) { // mainly to make happy static code analyzers
920 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: read pipe error: %s (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,strerr(errno),currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
921 }
922 }
923 if (closewaiting) {
924 #ifdef RDEBUG
925 fprintf(stderr,"%.6lf: readworker: closewaiting\n",monotonic_seconds());
926 #endif
927 if (rreq!=NULL) {
928 status = EINTR;
929 }
930 break;
931 }
932 if (pfd[0].revents&POLLHUP) {
933 read_prepare_ip(csstrip,ip);
934 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: connection with (%s:%"PRIu16") was reset by peer / POLLHUP (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
935 status = EIO;
936 break;
937 }
938 if (pfd[0].revents&POLLERR) {
939 read_prepare_ip(csstrip,ip);
940 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: connection with (%s:%"PRIu16") got error status / POLLERR (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
941 status = EIO;
942 break;
943 }
944 if (pfd[0].revents&POLLIN) {
945 lastrcvd = monotonic_seconds();
946 if (received < 8) {
947 i = read(fd,recvbuff+received,8-received);
948 if (i==0) {
949 read_prepare_ip(csstrip,ip);
950 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: connection with (%s:%"PRIu16") was reset by peer / ZEROREAD (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
951 status = EIO;
952 break;
953 }
954 if (i<0) {
955 read_prepare_ip(csstrip,ip);
956 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: read from (%s:%"PRIu16") error: %s (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,strerr(errno),currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
957 status = EIO;
958 break;
959 }
960 received += i;
961 if (received == 8) { // full header
962 rptr = recvbuff;
963
964 reccmd = get32bit(&rptr);
965 recleng = get32bit(&rptr);
966 if (reccmd==CSTOCL_READ_STATUS) {
967 if (recleng!=9) {
968 syslog(LOG_WARNING,"readworker: got wrong sized status packet from chunkserver (leng:%"PRIu32")",recleng);
969 status = EIO;
970 break;
971 }
972 } else if (reccmd==CSTOCL_READ_DATA) {
973 if (rreq==NULL) {
974 syslog(LOG_WARNING,"readworker: got unexpected data from chunkserver (leng:%"PRIu32")",recleng);
975 status = EIO;
976 break;
977 } else if (recleng<20) {
978 syslog(LOG_WARNING,"readworker: got too short data packet from chunkserver (leng:%"PRIu32")",recleng);
979 status = EIO;
980 break;
981 } else if ((recleng-20) + currentpos > rreq->rleng) {
982 syslog(LOG_WARNING,"readworker: got too long data packet from chunkserver (leng:%"PRIu32")",recleng);
983 status = EIO;
984 break;
985 }
986 } else if (reccmd==ANTOAN_NOP) {
987 if (recleng!=0) {
988 syslog(LOG_WARNING,"readworker: got wrong sized nop packet from chunkserver (leng:%"PRIu32")",recleng);
989 status = EIO;
990 break;
991 }
992 received = 0;
993 } else {
994 uint32_t myip,peerip;
995 uint16_t myport,peerport;
996 tcpgetpeer(fd,&peerip,&peerport);
997 tcpgetmyaddr(fd,&myip,&myport);
998 syslog(LOG_WARNING,"readworker: got unrecognized packet from chunkserver (cmd:%"PRIu32",leng:%"PRIu32",%u.%u.%u.%u:%u<->%u.%u.%u.%u:%u)",reccmd,recleng,(myip>>24)&0xFF,(myip>>16)&0xFF,(myip>>8)&0xFF,myip&0xFF,myport,(peerip>>24)&0xFF,(peerip>>16)&0xFF,(peerip>>8)&0xFF,peerip&0xFF,peerport);
999 status = EIO;
1000 break;
1001 }
1002 }
1003 }
1004 if (received >= 8) {
1005 if (recleng<=20) {
1006 i = read(fd,recvbuff + (received-8),recleng - (received-8));
1007 } else {
1008 if (received < 8 + 20) {
1009 #ifdef HAVE_READV
1010 siov[0].iov_base = recvbuff + (received-8);
1011 siov[0].iov_len = 20 - (received-8);
1012 siov[1].iov_base = rreq->data + currentpos;
1013 siov[1].iov_len = recleng - 20;
1014 i = readv(fd,siov,2);
1015 #else
1016 i = read(fd,recvbuff + (received-8),20 - (received-8));
1017 #endif
1018 } else {
1019 i = read(fd,rreq->data + currentpos,recleng - (received-8));
1020 }
1021 }
1022 if (i==0) {
1023 read_prepare_ip(csstrip,ip);
1024 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: connection with (%s:%"PRIu16") was reset by peer (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
1025 status = EIO;
1026 break;
1027 }
1028 if (i<0) {
1029 read_prepare_ip(csstrip,ip);
1030 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - readworker: connection with (%s:%"PRIu16") got error status (received: %"PRIu32"/%"PRIu32"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,currentpos,(rreq?rreq->rleng:0),id->trycnt+1);
1031 status = EIO;
1032 break;
1033 }
1034 if (received < 8+20) {
1035 if (received+i >= 8+20) {
1036 currentpos += i - ((8+20) - received);
1037 }
1038 } else {
1039 currentpos += i;
1040 }
1041 received += i;
1042 if (received > 8+recleng) {
1043 syslog(LOG_WARNING,"readworker: internal error - received more bytes than expected");
1044 status = EIO;
1045 break;
1046 } else if (received == 8+recleng) {
1047
1048 if (reccmd==CSTOCL_READ_STATUS) {
1049 rptr = recvbuff;
1050 recchunkid = get64bit(&rptr);
1051 recstatus = get8bit(&rptr);
1052 if (recchunkid != chunkid) {
1053 syslog(LOG_WARNING,"readworker: got unexpected status packet (expected chunkdid:%"PRIu64",packet chunkid:%"PRIu64")",chunkid,recchunkid);
1054 status = EIO;
1055 break;
1056 }
1057 if (recstatus!=STATUS_OK) {
1058 syslog(LOG_WARNING,"readworker: read error: %s",mfsstrerr(recstatus));
1059 status = EIO;
1060 break;
1061 }
1062 if (currentpos != rreq->rleng) {
1063 syslog(LOG_WARNING,"readworker: unexpected data block size (requested: %"PRIu32" / received: %"PRIu32")",rreq->rleng,currentpos);
1064 status = EIO;
1065 break;
1066 }
1067 gotstatus = 1;
1068 } else if (reccmd==CSTOCL_READ_DATA) {
1069 rptr = recvbuff;
1070 recchunkid = get64bit(&rptr);
1071 recblocknum = get16bit(&rptr);
1072 recoffset = get16bit(&rptr);
1073 recsize = get32bit(&rptr);
1074 reccrc = get32bit(&rptr);
1075 (void)recoffset;
1076 (void)recblocknum;
1077 if (recchunkid != chunkid) {
1078 syslog(LOG_WARNING,"readworker: got unexpected data packet (expected chunkdid:%"PRIu64",packet chunkid:%"PRIu64")",chunkid,recchunkid);
1079 status = EIO;
1080 break;
1081 }
1082 if (recsize+20 != recleng) {
1083 syslog(LOG_WARNING,"readworker: got malformed data packet (datasize: %"PRIu32",packetsize: %"PRIu32")",recsize,recleng);
1084 status = EIO;
1085 break;
1086 }
1087 if (reccrc != mycrc32(0,rreq->data + (currentpos - recsize),recsize)) {
1088 syslog(LOG_WARNING,"readworker: data checksum error");
1089 status = EIO;
1090 break;
1091 }
1092 }
1093 received = 0;
1094 }
1095 }
1096 }
1097 } while (1);
1098
1099 if (status==0 && csver>=VERSION2INT(1,7,32)) {
1100 conncache_insert(ip,port,fd);
1101 } else {
1102 tcpclose(fd);
1103 }
1104
1105 if (status==EINTR) {
1106 status=0;
1107 }
1108
1109 #ifdef WORKER_DEBUG
1110 now = monotonic_seconds();
1111 workingtime = now - start;
1112
1113 syslog(LOG_NOTICE,"worker %lu received data from chunk %016"PRIX64"_%08"PRIX32", bw: %.6lfMB/s ( %"PRIu32" B / %.6lf s )",(unsigned long)arg,chunkid,version,(double)bytesreceived/workingtime,bytesreceived,workingtime);
1114 #endif
1115
1116 zassert(pthread_mutex_lock(&glock));
1117 if (rreq) { // block hasn't been read
1118 rreq->busy = 0;
1119 }
1120 if (status!=0) {
1121 id->trycnt++;
1122 if (id->trycnt>=maxretries) {
1123 zassert(pthread_mutex_unlock(&glock));
1124 read_job_end(id,status,0);
1125 } else {
1126 zassert(pthread_mutex_unlock(&glock));
1127 read_job_end(id,0,1+((id->trycnt<30)?(id->trycnt/3):10));
1128 }
1129 } else {
1130 id->laststatus = 1;
1131 zassert(pthread_mutex_unlock(&glock));
1132 read_job_end(id,0,0);
1133 }
1134 }
1135 return NULL;
1136 }
1137
1138 /* API | glock: INITIALIZED,UNLOCKED */
read_data_init(uint64_t readaheadsize,uint32_t readaheadleng,uint32_t readaheadtrigger,uint32_t retries)1139 void read_data_init (uint64_t readaheadsize,uint32_t readaheadleng,uint32_t readaheadtrigger,uint32_t retries) {
1140 uint32_t i;
1141 sigset_t oldset;
1142 sigset_t newset;
1143
1144 maxretries = retries;
1145 readahead = readaheadleng;
1146 readahead_trigger = readaheadtrigger;
1147 maxreadaheadsize = readaheadsize;
1148 reqbufftotalsize = 0;
1149
1150 zassert(pthread_mutex_init(&glock,NULL));
1151 zassert(pthread_cond_init(&worker_term_cond,NULL));
1152 worker_term_waiting = 0;
1153
1154 idhash = malloc(sizeof(inodedata*)*IDHASHSIZE);
1155 passert(idhash);
1156 for (i=0 ; i<IDHASHSIZE ; i++) {
1157 idhash[i]=NULL;
1158 }
1159
1160 dqueue = queue_new(0);
1161 jqueue = queue_new(0);
1162
1163 zassert(pthread_attr_init(&worker_thattr));
1164 zassert(pthread_attr_setstacksize(&worker_thattr,0x100000));
1165 sigemptyset(&newset);
1166 sigaddset(&newset, SIGTERM);
1167 sigaddset(&newset, SIGINT);
1168 sigaddset(&newset, SIGHUP);
1169 sigaddset(&newset, SIGQUIT);
1170 pthread_sigmask(SIG_BLOCK, &newset, &oldset);
1171 zassert(pthread_create(&dqueue_worker_th,&worker_thattr,read_dqueue_worker,NULL));
1172 pthread_sigmask(SIG_SETMASK, &oldset, NULL);
1173
1174 zassert(pthread_mutex_lock(&glock));
1175 workers_avail = 0;
1176 workers_total = 0;
1177 read_data_spawn_worker();
1178 zassert(pthread_mutex_unlock(&glock));
1179 // fprintf(stderr,"spawn worker (avail:%"PRIu32" ; total:%"PRIu32")\n",workers_avail,workers_total);
1180
1181 //#ifdef BUFFER_DEBUG
1182 // pthread_create(&info_worker_th,&thattr,read_info_worker,NULL);
1183 //#endif
1184 // for (i=0 ; i<WORKERS ; i++) {
1185 // zassert(pthread_create(read_worker_th+i,&thattr,read_worker,(void*)(unsigned long)(i)));
1186 // }
1187 }
1188
read_data_term(void)1189 void read_data_term(void) {
1190 uint32_t i;
1191 inodedata *id,*idn;
1192
1193 queue_close(dqueue);
1194 queue_close(jqueue);
1195 zassert(pthread_mutex_lock(&glock));
1196 while (workers_total>0) {
1197 worker_term_waiting++;
1198 zassert(pthread_cond_wait(&worker_term_cond,&glock));
1199 }
1200 zassert(pthread_mutex_unlock(&glock));
1201 zassert(pthread_join(dqueue_worker_th,NULL));
1202 queue_delete(dqueue);
1203 queue_delete(jqueue);
1204 for (i=0 ; i<IDHASHSIZE ; i++) {
1205 for (id = idhash[i] ; id ; id = idn) {
1206 idn = id->next;
1207 zassert(pthread_cond_destroy(&(id->closecond)));
1208 close(id->pipe[0]);
1209 close(id->pipe[1]);
1210 free(id);
1211 }
1212 }
1213 free(idhash);
1214 // free(cacheblocks);
1215 // pthread_cond_destroy(&fcbcond);
1216 zassert(pthread_attr_destroy(&worker_thattr));
1217 zassert(pthread_cond_destroy(&worker_term_cond));
1218 zassert(pthread_mutex_destroy(&glock));
1219 }
1220
1221
1222
read_new_request(inodedata * id,uint64_t * offset,uint64_t blockend)1223 rrequest* read_new_request(inodedata *id,uint64_t *offset,uint64_t blockend) {
1224 uint64_t chunkoffset;
1225 uint64_t chunkend;
1226 uint32_t chunkleng;
1227 uint32_t chindx;
1228
1229 chunkoffset = *offset;
1230 chindx = chunkoffset>>MFSCHUNKBITS;
1231 chunkend = chindx;
1232 chunkend <<= MFSCHUNKBITS;
1233 chunkend += MFSCHUNKSIZE;
1234 if (blockend > chunkend) {
1235 chunkleng = chunkend - chunkoffset;
1236 *offset = chunkend;
1237 } else {
1238 chunkleng = blockend - (*offset);
1239 *offset = blockend;
1240 }
1241
1242 rrequest *rreq;
1243 rreq = malloc(sizeof(rrequest));
1244 passert(rreq);
1245 #ifdef RDEBUG
1246 fprintf(stderr,"%.6lf: inode: %"PRIu32" - new request: chindx: %"PRIu32" chunkoffset: %"PRIu64" chunkleng: %"PRIu32"\n",monotonic_seconds(),id->inode,chindx,chunkoffset,chunkleng);
1247 #endif
1248 rreq->modified = monotonic_seconds();
1249 rreq->offset = chunkoffset;
1250 rreq->leng = chunkleng;
1251 rreq->chindx = chindx;
1252 rreq->rleng = 0;
1253 rreq->filled = 0;
1254 rreq->refresh = 0;
1255 rreq->busy = 0;
1256 rreq->free = 0;
1257 rreq->lcnt = 0;
1258 rreq->data = malloc(chunkleng);
1259 passert(rreq->data);
1260 rreq->waiting = 0;
1261 zassert(pthread_cond_init(&(rreq->cond),NULL));
1262 if (id->inqueue==0) {
1263 read_enqueue(id);
1264 id->inqueue=1;
1265 }
1266 rreq->next = NULL;
1267 rreq->prev = id->reqtail;
1268 *(id->reqtail) = rreq;
1269 id->reqtail = &(rreq->next);
1270 reqbufftotalsize+=chunkleng;
1271 return rreq;
1272 }
1273
1274 typedef struct rlist_s {
1275 rrequest *rreq;
1276 uint64_t offsetadd;
1277 uint32_t reqleng;
1278 struct rlist_s *next;
1279 } rlist;
1280
1281 // return list of rreq
/*
 * read_data - serve a read of *size bytes at `offset` from the read
 * descriptor `vid` (an inodedata* created by read_data_new()).
 *
 * All work is done under glock. The function:
 *   1. updates the sequential-read detection (seqdata / readahead);
 *   2. walks the descriptor's request list and, for each request, either
 *      drops it (too old or not matching the read) or references it for the
 *      read;
 *   3. creates new requests for any uncovered part and optionally queues one
 *      read-ahead request;
 *   4. wakes a worker waiting on the descriptor pipe if new requests were
 *      created;
 *   5. waits on each referenced request's condition variable until it is
 *      filled, or until id->status / id->closewaiting become non-zero.
 *
 * Outputs:
 *   *vrhead         - rlist of the referenced requests. It is meant to be
 *                     passed to read_data_free_buff(), which drops the
 *                     references and frees the list.
 *   *iov / *iovcnt  - malloc'ed iovec array pointing directly into the
 *                     request buffers (NULL / 0 when no data is returned).
 *   *size           - when the read was attempted, the number of bytes
 *                     actually available (may be shorter than requested at
 *                     end of file).
 *
 * Returns id->status. NOTE(review): non-zero values are set outside this
 * chunk, presumably by the read worker on failure.
 */
int read_data(void *vid, uint64_t offset, uint32_t *size, void **vrhead,struct iovec **iov,uint32_t *iovcnt) {
	inodedata *id = (inodedata*)vid;
	rrequest *rreq,*rreqn;
	rlist *rl,*rhead,**rtail;
	uint64_t firstbyte;
	uint64_t lastbyte;
	uint32_t cnt;
	uint8_t newrequests;
	int status;
	double now;
	zassert(pthread_mutex_lock(&glock));

	*vrhead = NULL;
	*iov = NULL;
	*iovcnt = 0;
	cnt = 0;

#ifdef RDEBUG
	fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" id->status: %d id->closewaiting: %"PRIu16"\n",monotonic_seconds(),id->inode,id->status,id->closewaiting);
#endif

	if (id->status==0 && id->closewaiting==0) {
		// Sequential-read detection. A read that continues exactly where the
		// previous one ended enables read-ahead once seqdata reaches
		// readahead_trigger. A jump of more than readahead/2 in either
		// direction resets the detection.
		if (offset==id->lastoffset) {
			if (id->readahead==0) {
				if (id->seqdata>=readahead_trigger) {
					id->readahead = 1;
				}
			}
		} else {
			if (offset+(readahead/2) < id->lastoffset || id->lastoffset+(readahead/2) < offset) {
				id->readahead = 0;
				id->seqdata = 0;
			}
		}
#ifdef RDEBUG
		fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" seqdata: %"PRIu32" offset: %"PRIu64" id->lastoffset: %"PRIu64" id->readahead: %u reqbufftotalsize:%"PRIu64"\n",monotonic_seconds(),id->inode,id->seqdata,offset,id->lastoffset,id->readahead,reqbufftotalsize);
#endif
		newrequests = 0;

		// prepare requests: [firstbyte,lastbyte) is the part of the read not
		// yet covered by a referenced request

		firstbyte = offset;
		lastbyte = offset + (*size);
		rhead = NULL;
		rtail = &rhead;
		rreq = id->reqhead;
		now = monotonic_seconds();
		while (rreq && lastbyte>firstbyte) {
			rreqn = rreq->next;
#ifdef RDEBUG
			fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" , rreq->modified:%.6lf , rreq->offset: %"PRIu64" , rreq->leng: %"PRIu32" , firstbyte: %"PRIu64" , lastbyte: %"PRIu64"\n",monotonic_seconds(),id->inode,rreq->modified,rreq->offset,rreq->leng,firstbyte,lastbyte);
#endif
			if (rreq->modified+BUFFER_VALIDITY_TIMEOUT<now) { // buffer too old
#ifdef RDEBUG
				fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" data too old: free rreq (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; free:%u)\n",monotonic_seconds(),id->inode,rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->free);
#endif
				if (rreq->lcnt==0 && rreq->busy==0) { // nobody wants it anymore, so delete it
					*(rreq->prev) = rreq->next;
					if (rreq->next) {
						rreq->next->prev = rreq->prev;
					} else {
						id->reqtail = rreq->prev;
					}
					reqbufftotalsize -= rreq->leng;
					free(rreq->data);
					free(rreq);
				} else {
					rreq->free = 1; // somebody is still using it, so only mark it for removal
				}
			} else if (firstbyte < rreq->offset || firstbyte >= rreq->offset+rreq->leng) { // all non-sequential read cases
#ifdef RDEBUG
				fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" case 0: free rreq (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; free:%u)\n",monotonic_seconds(),id->inode,rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->free);
#endif
				// the start of the uncovered range lies outside this request:
				// rreq:          |---------|
				// read:   |--|
				// read:   |-------|
				// read:   |-------------------|
				// read:                       |--|
				if (rreq->lcnt==0 && rreq->busy==0) { // nobody wants it anymore, so delete it
					*(rreq->prev) = rreq->next;
					if (rreq->next) {
						rreq->next->prev = rreq->prev;
					} else {
						id->reqtail = rreq->prev;
					}
					reqbufftotalsize -= rreq->leng;
					free(rreq->data);
					free(rreq);
				} else {
					rreq->free = 1; // somebody is still using it, so only mark it for removal
				}
			} else if (lastbyte <= rreq->offset+rreq->leng) {
#ifdef RDEBUG
				fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" case 1: use rreq (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; free:%u)\n",monotonic_seconds(),id->inode,rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->free);
#endif
				// the rest of the read lies entirely inside this request:
				// rreq: |---------|
				// read:    |---|
				rl = malloc(sizeof(rlist));
				passert(rl);
				rl->rreq = rreq;
				rl->offsetadd = firstbyte - rreq->offset;
				rl->reqleng = (lastbyte - firstbyte) + rl->offsetadd;
				rl->next = NULL;
				*rtail = rl;
				rtail = &(rl->next);
				rreq->lcnt++;
				// read-ahead: once the read passes 1/5 of this request and it
				// is the last one in the list, queue the next block (clipped
				// to the known file length)
				if (id->readahead && id->flengisvalid && reqbufftotalsize<maxreadaheadsize) {
					if (lastbyte > rreq->offset + (rreq->leng/5)) {
						// request next block of data
						if (rreq->next==NULL) {
							uint64_t blockstart,blockend;
							blockstart = rreq->offset+rreq->leng;
							blockend = blockstart+readahead;
							// read_new_request() already appends the new request at
							// id->reqtail (== &rreq->next here), so the assignment to
							// rreq->next below is redundant but harmless
							if (blockend<=id->fleng) {
								rreq->next = read_new_request(id,&blockstart,blockend);
								newrequests = 1;
							} else if (blockstart<id->fleng) {
								rreq->next = read_new_request(id,&blockstart,id->fleng);
								newrequests = 1;
							}
						}
					}
				}
				// whole read covered - terminate both loops
				lastbyte = 0;
				firstbyte = 0;
			} else {
#ifdef RDEBUG
				fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" case 2: use tail of rreq (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; free:%u)\n",monotonic_seconds(),id->inode,rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->free);
#endif
				// the read starts inside this request and continues past its end:
				// rreq: |---------|
				// read:        |---|
				rl = malloc(sizeof(rlist));
				passert(rl);
				rl->rreq = rreq;
				rl->offsetadd = firstbyte - rreq->offset;
				rl->reqleng = rreq->leng;
				rl->next = NULL;
				*rtail = rl;
				rtail = &(rl->next);
				rreq->lcnt++;
				firstbyte = rreq->offset+rreq->leng;
			}
			rreq = rreqn;
		}
		// Create requests for whatever is still uncovered. read_new_request()
		// stops at chunk boundaries and advances firstbyte.
		while (lastbyte>firstbyte) {
			rreq = read_new_request(id,&firstbyte,lastbyte);
			rl = malloc(sizeof(rlist));
			passert(rl);
			rl->rreq = rreq;
			rl->offsetadd = 0;
			rl->reqleng = rreq->leng;
			rl->next = NULL;
			*rtail = rl;
			rtail = &(rl->next);
			rreq->lcnt++;
			// after the last piece, optionally queue one read-ahead request
			// (clipped to the file length and, by read_new_request, to the chunk end)
			if (lastbyte==firstbyte && id->readahead && id->flengisvalid && reqbufftotalsize<maxreadaheadsize) {
				if (lastbyte+readahead<=id->fleng) {
					(void)read_new_request(id,&firstbyte,lastbyte+readahead);
				} else if (lastbyte<id->fleng) {
					(void)read_new_request(id,&firstbyte,id->fleng);
				}
			}
			newrequests = 1;
		}

		*vrhead = rhead;

#ifdef RDEBUG
		if (newrequests) {
			fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" newrequest\n",monotonic_seconds(),id->inode);
		}
#endif
		// wake up a worker blocked on this descriptor's pipe
		if (newrequests && id->waitingworker) {
#ifdef RDEBUG
			fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" wakeup readworker\n",monotonic_seconds(),id->inode);
#endif
			if (write(id->pipe[1]," ",1)!=1) {
				syslog(LOG_ERR,"can't write to pipe !!!");
			}
			id->waitingworker=0;
		}

		// Wait for every referenced request to be filled (filled is set
		// outside this function) and count the usable pieces. A request
		// shorter than its nominal length (rleng < leng) marks end of file.
		cnt = 0;
		*size = 0;
		for (rl = rhead ; rl ; rl=rl->next) {
			while (rl->rreq->filled==0 && id->status==0 && id->closewaiting==0) {
				rl->rreq->waiting++;
#ifdef RDEBUG
				fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" wait for data: %"PRIu64":%"PRIu32"\n",monotonic_seconds(),id->inode,rl->rreq->offset,rl->rreq->leng);
#endif
				zassert(pthread_cond_wait(&(rl->rreq->cond),&glock));
				rl->rreq->waiting--;
			}
			if (id->status==0) {
#ifdef RDEBUG
				fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" block %"PRIu64":%"PRIu32"(%"PRIu32") has been read\n",monotonic_seconds(),id->inode,rl->rreq->offset,rl->rreq->rleng,rl->rreq->leng);
#endif
				if (rl->rreq->rleng < rl->rreq->leng) {
					if (rl->rreq->rleng > rl->offsetadd) {
						cnt++;
						if (rl->reqleng > rl->rreq->rleng) {
							rl->reqleng = rl->rreq->rleng;
						}
						*size += rl->reqleng - rl->offsetadd;
					}
					break; // end of file
				} else {
					cnt++;
					*size += rl->reqleng - rl->offsetadd;
				}
			} else {
#ifdef RDEBUG
				fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" error reading block: %"PRIu64":%"PRIu32"\n",monotonic_seconds(),id->inode,rl->rreq->offset,rl->rreq->leng);
#endif
				break;
			}
#ifdef RDEBUG
			fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" size: %"PRIu32" ; cnt: %u\n",monotonic_seconds(),id->inode,*size,cnt);
#endif
		}
	}

	// Build the iovec array (same walk as above, including the EOF cut) and
	// remember where this read ended for the sequential-read detection.
	if (id->status==0 && id->closewaiting==0 && cnt>0) {
		id->lastoffset = offset + (*size);
		if (id->readahead==0) {
			id->seqdata += (*size);
		}
		*iov = malloc(sizeof(struct iovec)*cnt);
		passert(*iov);
		cnt = 0;
		for (rl = rhead ; rl ; rl=rl->next) {
			if (rl->rreq->rleng < rl->rreq->leng) {
				if (rl->rreq->rleng > rl->offsetadd) {
					(*iov)[cnt].iov_base = rl->rreq->data + rl->offsetadd;
					(*iov)[cnt].iov_len = rl->reqleng - rl->offsetadd;
					cnt++;
				}
				break;
			} else {
				(*iov)[cnt].iov_base = rl->rreq->data + rl->offsetadd;
				(*iov)[cnt].iov_len = rl->reqleng - rl->offsetadd;
				cnt++;
			}
		}
		*iovcnt = cnt;
	} else {
		*iovcnt = 0;
		*iov = NULL;
	}

	status = id->status;

#ifdef RDEBUG
	fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" id->status: %d iovcnt: %"PRIu32" iovec: %p\n",monotonic_seconds(),id->inode,id->status,*iovcnt,(void*)(*iov));
#endif

	zassert(pthread_mutex_unlock(&glock));
	return status;
}
1541
read_data_free_buff(void * vid,void * vrhead,struct iovec * iov)1542 void read_data_free_buff(void *vid,void *vrhead,struct iovec *iov) {
1543 inodedata *id = (inodedata*)vid;
1544 rlist *rl,*rln;
1545 rrequest *rreq;
1546 rl = (rlist*)vrhead;
1547 zassert(pthread_mutex_lock(&glock));
1548 #ifdef RDEBUG
1549 fprintf(stderr,"%.6lf: read_data: inode: %"PRIu32" inode_structure: %p vrhead: %p iovec: %p\n",monotonic_seconds(),id->inode,(void*)id,(void*)vrhead,(void*)iov);
1550 #endif
1551 while (rl) {
1552 rln = rl->next;
1553 rreq = rl->rreq;
1554 rreq->lcnt--;
1555 if (rreq->lcnt==0 && rreq->busy==0 && rreq->free) {
1556 *(rreq->prev) = rreq->next;
1557 if (rreq->next) {
1558 rreq->next->prev = rreq->prev;
1559 } else {
1560 id->reqtail = rreq->prev;
1561 }
1562 reqbufftotalsize -= rreq->leng;
1563 free(rreq->data);
1564 free(rreq);
1565 }
1566 free(rl);
1567 rl = rln;
1568 }
1569 if (id->reqhead==NULL && id->inqueue==0 && id->closewaiting>0) {
1570 zassert(pthread_cond_broadcast(&(id->closecond)));
1571 }
1572 if (iov) {
1573 free(iov);
1574 }
1575 zassert(pthread_mutex_unlock(&glock));
1576 }
1577
read_inode_dirty_region(uint32_t inode,uint64_t offset,uint32_t size,const char * buff)1578 void read_inode_dirty_region(uint32_t inode,uint64_t offset,uint32_t size,const char *buff) {
1579 uint32_t idh = IDHASH(inode);
1580 inodedata *id;
1581 rrequest *rreq,*rreqn;
1582 // int clearedbuff = 0;
1583
1584 #ifdef RDEBUG
1585 fprintf(stderr,"%.6lf: read_inode_dirty_region: inode: %"PRIu32" set dirty region: %"PRIu64":%"PRIu32"\n",monotonic_seconds(),inode,offset,size);
1586 #endif
1587 zassert(pthread_mutex_lock(&glock));
1588 for (id = idhash[idh] ; id ; id=id->next) {
1589 if (id->inode == inode) {
1590 for (rreq = id->reqhead ; rreq ; rreq=rreqn) {
1591 rreqn = rreq->next;
1592 #ifdef RDEBUG
1593 fprintf(stderr,"%.6lf: read_inode_dirty_region: rreq (before): (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; filled:%u ; free:%u)\n",monotonic_seconds(),rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->filled,rreq->free);
1594 #endif
1595 if (rreq->free==0 && ((rreq->offset < offset + size) && (rreq->offset + rreq->leng > offset))) {
1596 if (rreq->filled) { // already filled, exchange data
1597 if (rreq->offset > offset) {
1598 if (rreq->offset + rreq->leng > offset + size) {
1599 // rreq: |-------|
1600 // buff: |----|
1601 #ifdef RDEBUG
1602 fprintf(stderr,"%.6lf: read_inode_dirty_region: case 1: rreq (%"PRIu64":%"PRIu32") / buff (%"PRIu64":%"PRIu32")\n",monotonic_seconds(),rreq->offset,rreq->leng,offset,size);
1603 #endif
1604 memcpy(rreq->data,buff + (rreq->offset - offset),size - (rreq->offset - offset));
1605 if (size - (rreq->offset - offset) > rreq->rleng) {
1606 rreq->rleng = size - (rreq->offset - offset);
1607 }
1608 } else {
1609 // rreq: |-------|
1610 // buff: |-----------|
1611 #ifdef RDEBUG
1612 fprintf(stderr,"%.6lf: read_inode_dirty_region: case 2: rreq (%"PRIu64":%"PRIu32") / buff (%"PRIu64":%"PRIu32")\n",monotonic_seconds(),rreq->offset,rreq->leng,offset,size);
1613 #endif
1614 memcpy(rreq->data,buff + (rreq->offset - offset),rreq->leng);
1615 if (rreq->leng > rreq->rleng) {
1616 rreq->rleng = rreq->leng;
1617 }
1618 }
1619 } else {
1620 if (rreq->offset + rreq->leng > offset + size) {
1621 // rreq: |-------|
1622 // buff: |----|
1623 #ifdef RDEBUG
1624 fprintf(stderr,"%.6lf: read_inode_dirty_region: case 3: rreq (%"PRIu64":%"PRIu32") / buff (%"PRIu64":%"PRIu32")\n",monotonic_seconds(),rreq->offset,rreq->leng,offset,size);
1625 #endif
1626 memcpy(rreq->data + (offset - rreq->offset),buff,size);
1627 if ((offset - rreq->offset) > rreq->rleng) {
1628 memset(rreq->data + rreq->rleng,0,(offset - rreq->offset) - rreq->rleng);
1629 }
1630 if (size + (offset - rreq->offset) > rreq->rleng) {
1631 rreq->rleng = size + (offset - rreq->offset);
1632 }
1633 } else {
1634 // rreq: |-------|
1635 // buff: |--------|
1636 #ifdef RDEBUG
1637 fprintf(stderr,"%.6lf: read_inode_dirty_region: case 4: rreq (%"PRIu64":%"PRIu32") / buff (%"PRIu64":%"PRIu32")\n",monotonic_seconds(),rreq->offset,rreq->leng,offset,size);
1638 #endif
1639 memcpy(rreq->data+(offset-rreq->offset),buff,rreq->leng-(offset-rreq->offset));
1640 if ((offset - rreq->offset) > rreq->rleng) {
1641 memset(rreq->data + rreq->rleng,0,(offset - rreq->offset) - rreq->rleng);
1642 }
1643 if (rreq->leng > rreq->rleng) {
1644 rreq->rleng = rreq->leng;
1645 }
1646 }
1647 }
1648 } else if (rreq->busy) { // in progress, so refresh it
1649 #ifdef RDEBUG
1650 fprintf(stderr,"%.6lf: read_inode_dirty_region: rreq (%"PRIu64":%"PRIu32") : refresh\n",monotonic_seconds(),rreq->offset,rreq->leng);
1651 #endif
1652 rreq->refresh = 1;
1653 }
1654 }
1655 #ifdef RDEBUG
1656 if (rreq) {
1657 fprintf(stderr,"%.6lf: read_inode_dirty_region: rreq (after): (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; filled:%u ; free:%u)\n",monotonic_seconds(),rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->filled,rreq->free);
1658 } else {
1659 fprintf(stderr,"%.6lf: read_inode_dirty_region: rreq (after): NULL\n",monotonic_seconds());
1660 }
1661 #endif
1662 }
1663 if (id->flengisvalid && offset+size>id->fleng) {
1664 id->fleng = offset+size;
1665 }
1666 if (id->waitingworker) {
1667 if (write(id->pipe[1]," ",1)!=1) {
1668 syslog(LOG_ERR,"can't write to pipe !!!");
1669 }
1670 id->waitingworker=0;
1671 }
1672 }
1673 }
1674 zassert(pthread_mutex_unlock(&glock));
1675 }
1676
1677 // void read_inode_ops(uint32_t inode) {
read_inode_set_length(uint32_t inode,uint64_t newlength,uint8_t active)1678 void read_inode_set_length(uint32_t inode,uint64_t newlength,uint8_t active) {
1679 uint32_t idh = IDHASH(inode);
1680 inodedata *id;
1681 rrequest *rreq,*rreqn;
1682 int inqueue = 0;
1683
1684 #ifdef RDEBUG
1685 fprintf(stderr,"%.6lf: read_inode_set_length: inode: %"PRIu32" set length: %"PRIu64"\n",monotonic_seconds(),inode,newlength);
1686 #endif
1687 zassert(pthread_mutex_lock(&glock));
1688 for (id = idhash[idh] ; id ; id=id->next) {
1689 if (id->inode == inode) {
1690 for (rreq = id->reqhead ; rreq ; rreq=rreqn) {
1691 rreqn = rreq->next;
1692 #ifdef RDEBUG
1693 fprintf(stderr,"%.6lf: read_inode_set_length: rreq (before): (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; filled:%u ; free:%u)\n",monotonic_seconds(),rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->filled,rreq->free);
1694 #endif
1695 if (rreq->free==0) {
1696 if (rreq->filled) {
1697 if (active) {
1698 if (newlength < rreq->offset + rreq->rleng) {
1699 if (newlength < rreq->offset) {
1700 #ifdef RDEBUG
1701 fprintf(stderr,"%.6lf: read_inode_set_length: block is filled (%"PRIu64":%"PRIu32") / newlength: %"PRIu64", case 1: - set rleng to 0\n",monotonic_seconds(),rreq->offset,rreq->rleng,newlength);
1702 #endif
1703 rreq->rleng = 0;
1704 } else {
1705 #ifdef RDEBUG
1706 fprintf(stderr,"%.6lf: read_inode_set_length: block is filled (%"PRIu64":%"PRIu32") / newlength: %"PRIu64", case 2: - set rleng to %"PRIu32"\n",monotonic_seconds(),rreq->offset,rreq->rleng,newlength,(uint32_t)(newlength - rreq->offset));
1707 #endif
1708 rreq->rleng = newlength - rreq->offset;
1709 }
1710 } else if (newlength > rreq->offset + rreq->rleng) {
1711 if (newlength > rreq->offset + rreq->leng) {
1712 #ifdef RDEBUG
1713 fprintf(stderr,"%.6lf: read_inode_set_length: block is filled (%"PRIu64":%"PRIu32") / newlength: %"PRIu64", case 3: - clear data from rleng, set rleng to %"PRIu32"\n",monotonic_seconds(),rreq->offset,rreq->rleng,newlength,rreq->leng);
1714 #endif
1715 memset(rreq->data + rreq->rleng,0,rreq->leng - rreq->rleng);
1716 rreq->rleng = rreq->leng;
1717 } else {
1718 #ifdef RDEBUG
1719 fprintf(stderr,"%.6lf: read_inode_set_length: block is filled (%"PRIu64":%"PRIu32") / newlength: %"PRIu64", case 4: - clear data from rleng, set rleng to %"PRIu32"\n",monotonic_seconds(),rreq->offset,rreq->rleng,newlength,(uint32_t)(newlength - rreq->offset));
1720 #endif
1721 memset(rreq->data + rreq->rleng,0,newlength - (rreq->offset + rreq->rleng));
1722 rreq->rleng = newlength - rreq->offset;
1723 }
1724 }
1725 } else {
1726 if (rreq->lcnt==0 && rreq->busy==0) { // nobody wants it anymore, so delete it
1727 *(rreq->prev) = rreq->next;
1728 if (rreq->next) {
1729 rreq->next->prev = rreq->prev;
1730 } else {
1731 id->reqtail = rreq->prev;
1732 }
1733 reqbufftotalsize -= rreq->leng;
1734 free(rreq->data);
1735 free(rreq);
1736 rreq = NULL;
1737 } else { // somebody wants it, so clear it
1738 rreq->filled = 0;
1739 if (rreq->busy==0) { // not busy ?
1740 inqueue = 1; // add inode to queue
1741 }
1742 }
1743 }
1744 } else if (rreq->busy) {
1745 #ifdef RDEBUG
1746 fprintf(stderr,"%.6lf: read_inode_set_length: block is busy - refresh\n",monotonic_seconds());
1747 #endif
1748 rreq->refresh = 1;
1749 }
1750 }
1751 #if 0
1752 if (rreq->free==0) {
1753 if (id->flengisvalid==0) { // refresh everything
1754 if (rreq->filled) {
1755 #ifdef RDEBUG
1756 fprintf(stderr,"%.6lf: read_inode_set_length: old length unknown, block filled - clear it\n",monotonic_seconds());
1757 #endif
1758 if (rreq->lcnt==0 && rreq->busy==0) { // nobody wants it anymore, so delete it
1759 *(rreq->prev) = rreq->next;
1760 if (rreq->next) {
1761 rreq->next->prev = rreq->prev;
1762 } else {
1763 id->reqtail = rreq->prev;
1764 }
1765 reqbufftotalsize -= rreq->leng;
1766 free(rreq->data);
1767 free(rreq);
1768 rreq = NULL;
1769 } else { // somebody wants it, so clear it
1770 rreq->filled = 0;
1771 if (rreq->busy==0) { // not busy ?
1772 clearedbuff = 1; // add inode to queue
1773 }
1774 }
1775 } else if (rreq->busy) {
1776 #ifdef RDEBUG
1777 fprintf(stderr,"%.6lf: read_inode_set_length: old length unknown, block is busy - refresh block\n",monotonic_seconds());
1778 #endif
1779 rreq->refresh = 1;
1780 }
1781 } else if (id->fleng > newlength) { // file is shorter
1782 if (rreq->filled) {
1783 #ifdef RDEBUG
1784 fprintf(stderr,"%.6lf: read_inode_set_length: new length is smaller than previous, block filled - change rleng\n",monotonic_seconds());
1785 #endif
1786 if (newlength<=rreq->offset) {
1787 rreq->rleng = 0;
1788 } else if (newlength<rreq->offset+rreq->leng) {
1789 rreq->rleng = newlength - rreq->offset;
1790 }
1791 } else if (rreq->busy) {
1792 #ifdef RDEBUG
1793 fprintf(stderr,"%.6lf: read_inode_set_length: new length is smaller than previous, block is busy - refresh block\n",monotonic_seconds());
1794 #endif
1795 rreq->refresh = 1;
1796 }
1797 } else if (id->fleng < newlength) { // file is longer
1798 if (rreq->filled) {
1799 #ifdef RDEBUG
1800 fprintf(stderr,"%.6lf: read_inode_set_length: new length is larger than previous, block filled - clear buffer and change rleng\n",monotonic_seconds());
1801 #endif
1802 if (newlength >= rreq->offset + rreq->leng) {
1803 memset(rreq->data + rreq->rleng,0,rreq->leng - rreq->rleng);
1804 rreq->rleng = rreq->leng;
1805 } else if (newlength > rreq->offset + rreq->rleng) {
1806 memset(rreq->data + rreq->rleng,0,newlength - (rreq->offset + rreq->rleng));
1807 rreq->rleng = (newlength - rreq->offset);
1808 }
1809 } else if (rreq->busy) {
1810 #ifdef RDEBUG
1811 fprintf(stderr,"%.6lf: read_inode_set_length: new length is larger than previous, block is busy - refresh block\n",monotonic_seconds());
1812 #endif
1813 rreq->refresh = 1;
1814 }
1815 }
1816 }
1817 #endif
1818 /*
1819 if (rreq->free==0 && ((newlength < rreq->offset + rreq->leng) || (id->fleng < rreq->offset + rreq->leng))) {
1820 if (rreq->filled) {
1821 if (rreq->lcnt==0 && rreq->busy==0) { // nobody wants it anymore, so delete it
1822 *(rreq->prev) = rreq->next;
1823 if (rreq->next) {
1824 rreq->next->prev = rreq->prev;
1825 } else {
1826 id->reqtail = rreq->prev;
1827 }
1828 reqbufftotalsize -= rreq->leng;
1829 free(rreq->data);
1830 free(rreq);
1831 rreq = NULL;
1832 } else { // somebody wants it, so clear it
1833 rreq->filled = 0;
1834 if (rreq->busy==0) { // not busy ?
1835 clearedbuff = 1; // add inode to queue
1836 }
1837 }
1838 } else if (rreq->busy) {
1839 rreq->refresh = 1;
1840 }
1841 }
1842 */
1843 #ifdef RDEBUG
1844 if (rreq) {
1845 fprintf(stderr,"%.6lf: read_inode_set_length: rreq (after): (%"PRIu64":%"PRIu32" ; lcnt:%u ; busy:%u ; filled:%u ; free:%u)\n",monotonic_seconds(),rreq->offset,rreq->leng,rreq->lcnt,rreq->busy,rreq->filled,rreq->free);
1846 } else {
1847 fprintf(stderr,"%.6lf: read_inode_set_length: rreq (after): NULL\n",monotonic_seconds());
1848 }
1849 #endif
1850 }
1851 if (inqueue && id->inqueue==0) {
1852 read_enqueue(id);
1853 id->inqueue=1;
1854 }
1855 id->fleng = newlength;
1856 id->flengisvalid = 1;
1857 if (id->waitingworker) {
1858 if (write(id->pipe[1]," ",1)!=1) {
1859 syslog(LOG_ERR,"can't write to pipe !!!");
1860 }
1861 id->waitingworker=0;
1862 }
1863 }
1864 }
1865 zassert(pthread_mutex_unlock(&glock));
1866 }
1867
read_data_new(uint32_t inode)1868 void* read_data_new(uint32_t inode) {
1869 uint32_t idh = IDHASH(inode);
1870 inodedata *id;
1871 int pfd[2];
1872
1873 zassert(pthread_mutex_lock(&glock));
1874
1875 if (pipe(pfd)<0) {
1876 syslog(LOG_WARNING,"pipe error: %s",strerr(errno));
1877 zassert(pthread_mutex_unlock(&glock));
1878 return NULL;
1879 }
1880
1881 id = malloc(sizeof(inodedata));
1882 passert(id);
1883 id->inode = inode;
1884 id->flengisvalid = 0;
1885 id->seqdata = 0;
1886 id->fleng = 0;
1887 id->status = 0;
1888 id->trycnt = 0;
1889 id->pipe[0] = pfd[0];
1890 id->pipe[1] = pfd[1];
1891 id->inqueue = 0;
1892 id->readahead = 0;
1893 id->lastoffset = 0;
1894 id->closewaiting = 0;
1895 id->waitingworker = 0;
1896 id->lastchunkid = 0;
1897 id->lastip = 0;
1898 id->lastport = 0;
1899 id->laststatus = 0;
1900 zassert(pthread_cond_init(&(id->closecond),NULL));
1901 id->reqhead = NULL;
1902 id->reqtail = &(id->reqhead);
1903 id->next = idhash[idh];
1904 idhash[idh] = id;
1905 #ifdef RDEBUG
1906 fprintf(stderr,"%.6lf: opening: %"PRIu32" ; inode_structure: %p\n",monotonic_seconds(),inode,(void*)id);
1907 // read_data_hexdump((uint8_t*)id,sizeof(inodedata));
1908 #endif
1909 zassert(pthread_mutex_unlock(&glock));
1910 return id;
1911 }
1912
read_data_end(void * vid)1913 void read_data_end(void *vid) {
1914 inodedata *id,**idp;
1915 rrequest *rreq,*rreqn;
1916 inodedata *rid = (inodedata*)vid;
1917 uint32_t idh = IDHASH(rid->inode);
1918
1919 #ifdef RDEBUG
1920 fprintf(stderr,"%.6lf: closing: %"PRIu32" ; inode_structure: %p\n",monotonic_seconds(),rid->inode,(void*)rid);
1921 // read_data_hexdump((uint8_t*)rid,sizeof(inodedata));
1922 #endif
1923 zassert(pthread_mutex_lock(&glock));
1924 #ifdef RDEBUG
1925 fprintf(stderr,"%.6lf: closing: %"PRIu32" ; cleaning req list\n",monotonic_seconds(),rid->inode);
1926 #endif
1927 for (rreq = rid->reqhead ; rreq ; rreq=rreqn) {
1928 rreqn = rreq->next;
1929 #ifdef RDEBUG
1930 fprintf(stderr,"%.6lf: closing: %"PRIu32" ; rreq: lcnt: %u ; busy: %u ; free: %u ; filled: %u\n",monotonic_seconds(),rid->inode,rreq->lcnt,rreq->busy,rreq->free,rreq->filled);
1931 #endif
1932 if (rreq->lcnt==0 && rreq->busy==0) {
1933 *(rreq->prev) = rreq->next;
1934 if (rreq->next) {
1935 rreq->next->prev = rreq->prev;
1936 } else {
1937 rid->reqtail = rreq->prev;
1938 }
1939 reqbufftotalsize -= rreq->leng;
1940 free(rreq->data);
1941 free(rreq);
1942 } else {
1943 rreq->free = 1;
1944 }
1945 }
1946 while (rid->reqhead!=NULL || rid->inqueue==1) {
1947 #ifdef RDEBUG
1948 fprintf(stderr,"%.6lf: closing: %"PRIu32" ; reqhead: %s ; inqueue: %u\n",monotonic_seconds(),rid->inode,rid->reqhead?"NOT NULL":"NULL",rid->inqueue);
1949 #endif
1950 rid->closewaiting++;
1951 if (rid->waitingworker) {
1952 if (write(rid->pipe[1]," ",1)!=1) {
1953 syslog(LOG_ERR,"can't write to pipe !!!");
1954 }
1955 rid->waitingworker=0;
1956 }
1957 #ifdef RDEBUG
1958 fprintf(stderr,"%.6lf: inode: %"PRIu32" ; waiting for close\n",monotonic_seconds(),rid->inode);
1959 #endif
1960 zassert(pthread_cond_wait(&(rid->closecond),&glock));
1961 rid->closewaiting--;
1962 }
1963 #ifdef RDEBUG
1964 fprintf(stderr,"%.6lf: closing: %"PRIu32" ; reqhead: %s ; inqueue: %u - delete structure\n",monotonic_seconds(),rid->inode,rid->reqhead?"NOT NULL":"NULL",rid->inqueue);
1965 #endif
1966 idp = &(idhash[idh]);
1967 while ((id=*idp)) {
1968 if (id==rid) {
1969 *idp = id->next;
1970 zassert(pthread_cond_destroy(&(id->closecond)));
1971 close(id->pipe[0]);
1972 close(id->pipe[1]);
1973 free(id);
1974 } else {
1975 idp = &(id->next);
1976 }
1977 }
1978 zassert(pthread_mutex_unlock(&glock));
1979 }
1980