1 /*
2 * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <sys/types.h>
26 #ifdef HAVE_WRITEV
27 #include <sys/uio.h>
28 #endif
29 #include <sys/time.h>
30 #include <unistd.h>
31 #include <poll.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <syslog.h>
36 #include <errno.h>
37 #include <limits.h>
38 #include <signal.h>
39 #include <pthread.h>
40 #include <inttypes.h>
41
42 #include "massert.h"
43 #include "datapack.h"
44 #include "crc.h"
45 #include "strerr.h"
46 #include "mfsstrerr.h"
47 #include "pcqueue.h"
48 #include "sockets.h"
49 #include "conncache.h"
50 #include "csdb.h"
51 #include "mastercomm.h"
52 #include "clocks.h"
53 #include "portable.h"
54 #include "readdata.h"
55 #include "MFSCommunication.h"
56
/* Debug switches - uncomment to enable:
 *   WORKER_DEBUG - per-job statistics in write_worker (blocks sent, bandwidth, chain)
 *   BUFFER_DEBUG - write_info_worker thread checking cache block accounting
 *   WDEBUG       - worker spawn/close and connection traces on stderr */
//#define WORKER_DEBUG 1
//#define BUFFER_DEBUG 1
//#define WDEBUG 1

/* Platforms without EDQUOT report quota errors as ENOSPC. */
#ifndef EDQUOT
#define EDQUOT ENOSPC
#endif

// for Nagle's-like algorithm
// seconds a partially filled block may wait for more data before write_worker sends it anyway
#define NEXT_BLOCK_DELAY 0.05

// seconds without receiving anything from the chunkserver before write_worker treats the connection as timed out
#define CHUNKSERVER_ACTIVITY_TIMEOUT 2.0

// seconds since the last sent block after which a worker with no outstanding statuses gives up the inode
#define WORKER_IDLE_TIMEOUT 1.0

// write_worker time budgets (seconds, measured from the start of a chunk job):
//   LAST_SEND          - new blocks are only sent during this time
//   WAIT_FOR_STATUS    - extra time allowed for outstanding write statuses
//   NOJOBS_INCREASE    - extra time added to both budgets while workers_total<=HEAVYLOAD_WORKERS
#define WORKER_BUSY_LAST_SEND_TIMEOUT 5.0
#define WORKER_BUSY_WAIT_FOR_STATUS 5.0
#define WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT 20.0

// seconds of send inactivity after which write_worker sends ANTOAN_NOP (chunkservers >= 1.7.32 only)
#define WORKER_NOP_INTERVAL 1.0

// worker pool sizing: idle workers above SUSTAIN_WORKERS exit; above HEAVYLOAD_WORKERS
// workers do not linger on an inode; no more than MAX_WORKERS are spawned
#define SUSTAIN_WORKERS 50
#define HEAVYLOAD_WORKERS 150
#define MAX_WORKERS 250

// NOTE(review): WCHASH/WCHASHSIZE are not referenced in the visible part of this file
#define WCHASHSIZE 256
#define WCHASH(inode,indx) (((inode)*0xB239FB71+(indx)*193)%WCHASHSIZE)

// bucket index into idhash (IDHASHSIZE buckets) for an inode number
#define IDHASHSIZE 256
#define IDHASH(inode) (((inode)*0xB239FB71)%IDHASHSIZE)
87
/* One cache block of file data waiting to be written to a chunkserver.
 * While owned by an inode it sits in that inode's doubly-linked
 * datachainhead/datachaintail list; while free it is on the singly-linked
 * freecblockshead list (through 'next'). */
typedef struct cblock_s {
	uint8_t data[MFSBLOCKSIZE];	// modified only when writeid==0
	uint32_t chindx;	// chunk index within the file
	uint16_t pos;	// block in chunk (0...1023) - never modified
	uint32_t writeid;	// 0 = not sent, >0 = block was sent (modified and accessed only while glock is held)
	uint32_t from;	// first filled byte in data (modified only when writeid==0)
	uint32_t to;	// first not used byte in data (modified only when writeid==0)
	struct cblock_s *next,*prev;	// list links (prev is unused on the free list)
} cblock;
97
/* Write-side state of one inode. Records are created by write_get_inodedata(),
 * chained in idhash buckets through 'next', and destroyed by
 * write_free_inodedata(). In the visible code the fields are accessed with
 * glock held. */
typedef struct inodedata_s {
	uint32_t inode;			// inode number - lookup key in idhash
	uint64_t maxfleng;		// NOTE(review): not referenced in the visible code; presumably the largest file length written so far - confirm
	uint32_t cacheblockcount;	// cache blocks currently owned (write_cb_acquire ++, write_cb_release --)
	int status;			// sticky errno-style error; once non-zero, pending data is released instead of written
	uint16_t flushwaiting;		// > 0 while someone waits on flushcond (write_worker/write_job_end test it)
	uint16_t writewaiting;		// presumably number of threads waiting on writecond - confirm against callers
	uint16_t lcnt;			// NOTE(review): not referenced in the visible code - meaning unconfirmed
	uint32_t trycnt;		// failed-attempt counter compared with maxretries; reset by write_job_end on a clean write
	uint8_t waitingworker;		// set by write_worker: 1 while servicing this inode, 2 while waiting for the next block to fill, 0 otherwise
	uint8_t inqueue;		// cleared by write_job_end when no work remains; presumably set when the inode is queued - confirm
	int pipe[2];			// wake-up pipe; write_worker polls pipe[0] and drains it
	cblock *datachainhead,*datachaintail;	// blocks pending write, oldest first
	pthread_cond_t flushcond; // wait for inqueue==0 (flush)
	pthread_cond_t writecond; // wait for flushwaiting==0 (write)
	struct inodedata_s *next;	// next record in the same idhash bucket
} inodedata;
115
/* Per-thread record of a write worker: allocated by write_data_spawn_worker(),
 * passed to write_worker() as its argument and freed by
 * write_data_close_worker(). */
typedef struct worker_s {
	pthread_t thread_id;
} worker;
119
120 // static pthread_mutex_t fcblock;
121
122 static pthread_cond_t fcbcond;
123 static uint8_t fcbwaiting;
124 static cblock *cacheblocks,*freecblockshead;
125 static uint32_t freecacheblocks;
126
127 static uint32_t maxretries;
128
129 static inodedata **idhash;
130
131 static pthread_mutex_t glock;
132
133 #ifdef BUFFER_DEBUG
134 static pthread_t info_worker_th;
135 static uint32_t usedblocks;
136 #endif
137
138 static pthread_t dqueue_worker_th;
139
140 static uint32_t workers_avail;
141 static uint32_t workers_total;
142 static uint32_t worker_term_waiting;
143 static pthread_cond_t worker_term_cond;
144 static pthread_attr_t worker_thattr;
145
146 static void *jqueue,*dqueue;
147
#ifdef BUFFER_DEBUG
/*
 * Debug-only thread (BUFFER_DEBUG). Every 0.5 s, with glock held, it walks each
 * inode's data chain and the free-block list. It logs every inode whose chain
 * length disagrees with its cacheblockcount, then logs the global counters.
 * Never returns.
 */
void* write_info_worker(void *arg) {
	uint32_t chainsum,freechain,perinode;
	uint32_t bucket;
	inodedata *ind;
	cblock *blk;

	(void)arg;
	while (1) {
		zassert(pthread_mutex_lock(&glock));

		// count blocks hanging off every inode and cross-check per-inode counters
		chainsum = 0;
		for (bucket = 0 ; bucket<IDHASHSIZE ; bucket++) {
			for (ind = idhash[bucket] ; ind!=NULL ; ind = ind->next) {
				perinode = 0;
				for (blk = ind->datachainhead ; blk!=NULL ; blk = blk->next) {
					perinode++;
				}
				if (perinode != ind->cacheblockcount) {
					syslog(LOG_NOTICE,"inode: %"PRIu32" ; wrong cache block count (%"PRIu32"/%"PRIu32")",ind->inode,perinode,ind->cacheblockcount);
				}
				chainsum += perinode;
			}
		}

		// count the free list
		freechain = 0;
		blk = freecblockshead;
		while (blk!=NULL) {
			freechain++;
			blk = blk->next;
		}

		syslog(LOG_NOTICE,"used cache blocks: %"PRIu32" ; sum of inode used blocks: %"PRIu32" ; free cache blocks: %"PRIu32" ; free cache chain blocks: %"PRIu32,usedblocks,chainsum,freecacheblocks,freechain);
		zassert(pthread_mutex_unlock(&glock));
		portable_usleep(500000);
	}
}
#endif
181
182 /* glock: LOCKED */
write_cb_release(inodedata * id,cblock * cb)183 void write_cb_release (inodedata *id,cblock *cb) {
184 // zassert(pthread_mutex_lock(&fcblock));
185 cb->next = freecblockshead;
186 freecblockshead = cb;
187 freecacheblocks++;
188 id->cacheblockcount--;
189 if (fcbwaiting) {
190 zassert(pthread_cond_signal(&fcbcond));
191 }
192 #ifdef BUFFER_DEBUG
193 usedblocks--;
194 #endif
195 // zassert(pthread_mutex_unlock(&fcblock));
196 }
197
198 /* glock: LOCKED */
write_cb_acquire(inodedata * id)199 cblock* write_cb_acquire(inodedata *id) {
200 cblock *ret;
201 // zassert(pthread_mutex_lock(&fcblock));
202 fcbwaiting++;
203 while (freecblockshead==NULL || id->cacheblockcount>(freecacheblocks/3)) {
204 zassert(pthread_cond_wait(&fcbcond,&glock));
205 }
206 fcbwaiting--;
207 ret = freecblockshead;
208 freecblockshead = ret->next;
209 ret->chindx = 0;
210 ret->pos = 0;
211 ret->writeid = 0;
212 ret->from = 0;
213 ret->to = 0;
214 ret->next = NULL;
215 ret->prev = NULL;
216 freecacheblocks--;
217 id->cacheblockcount++;
218 #ifdef BUFFER_DEBUG
219 usedblocks++;
220 #endif
221 // zassert(pthread_mutex_unlock(&fcblock));
222 return ret;
223 }
224
225
226 /* inode */
227
228 /* glock: LOCKED */
write_find_inodedata(uint32_t inode)229 inodedata* write_find_inodedata(uint32_t inode) {
230 uint32_t idh = IDHASH(inode);
231 inodedata *id;
232 for (id=idhash[idh] ; id ; id=id->next) {
233 if (id->inode == inode) {
234 return id;
235 }
236 }
237 return NULL;
238 }
239
240 /* glock: LOCKED */
write_get_inodedata(uint32_t inode)241 inodedata* write_get_inodedata(uint32_t inode) {
242 uint32_t idh = IDHASH(inode);
243 inodedata *id;
244 int pfd[2];
245
246 for (id=idhash[idh] ; id ; id=id->next) {
247 if (id->inode == inode) {
248 return id;
249 }
250 }
251
252 if (pipe(pfd)<0) {
253 syslog(LOG_WARNING,"pipe error: %s",strerr(errno));
254 return NULL;
255 }
256 id = malloc(sizeof(inodedata));
257 id->inode = inode;
258 id->cacheblockcount = 0;
259 id->maxfleng = 0;
260 id->status = 0;
261 id->trycnt = 0;
262 id->pipe[0] = pfd[0];
263 id->pipe[1] = pfd[1];
264 id->datachainhead = NULL;
265 id->datachaintail = NULL;
266 id->waitingworker = 0;
267 id->inqueue = 0;
268 id->flushwaiting = 0;
269 id->writewaiting = 0;
270 id->lcnt = 0;
271 zassert(pthread_cond_init(&(id->flushcond),NULL));
272 zassert(pthread_cond_init(&(id->writecond),NULL));
273 id->next = idhash[idh];
274 idhash[idh] = id;
275 return id;
276 }
277
278 /* glock: LOCKED */
write_free_inodedata(inodedata * fid)279 void write_free_inodedata(inodedata *fid) {
280 uint32_t idh = IDHASH(fid->inode);
281 inodedata *id,**idp;
282 idp = &(idhash[idh]);
283 while ((id=*idp)) {
284 if (id==fid) {
285 *idp = id->next;
286 zassert(pthread_cond_destroy(&(id->flushcond)));
287 zassert(pthread_cond_destroy(&(id->writecond)));
288 close(id->pipe[0]);
289 close(id->pipe[1]);
290 free(id);
291 return;
292 }
293 idp = &(id->next);
294 }
295 }
296
297
298 /* queues */
299
300 /* glock: UNUSED */
write_delayed_enqueue(inodedata * id,uint32_t cnt)301 void write_delayed_enqueue(inodedata *id,uint32_t cnt) {
302 uint64_t t;
303 if (cnt>0) {
304 t = monotonic_useconds();
305 queue_put(dqueue,t>>32,t&0xFFFFFFFFU,(uint8_t*)id,cnt);
306 } else {
307 queue_put(jqueue,0,0,(uint8_t*)id,0);
308 }
309 }
310
311 /* glock: UNUSED */
write_enqueue(inodedata * id)312 void write_enqueue(inodedata *id) {
313 queue_put(jqueue,0,0,(uint8_t*)id,0);
314 }
315
316 /* worker thread | glock: UNUSED */
write_dqueue_worker(void * arg)317 void* write_dqueue_worker(void *arg) {
318 uint64_t t,usec;
319 uint32_t husec,lusec,cnt;
320 uint8_t *id;
321 (void)arg;
322 for (;;) {
323 queue_get(dqueue,&husec,&lusec,&id,&cnt);
324 if (id==NULL) {
325 return NULL;
326 }
327 t = monotonic_useconds();
328 usec = husec;
329 usec <<= 32;
330 usec |= lusec;
331 if (t>usec) {
332 t -= usec;
333 while (t>=1000000 && cnt>0) {
334 t-=1000000;
335 cnt--;
336 }
337 if (cnt>0) {
338 if (t<1000000) {
339 portable_usleep(1000000-t);
340 }
341 cnt--;
342 }
343 }
344 if (cnt>0) {
345 t = monotonic_useconds();
346 queue_put(dqueue,t>>32,t&0xFFFFFFFFU,(uint8_t*)id,cnt);
347 } else {
348 queue_put(jqueue,0,0,id,0);
349 }
350 }
351 return NULL;
352 }
353
354 /* glock: UNLOCKED */
write_job_end(inodedata * id,int status,uint32_t delay)355 void write_job_end(inodedata *id,int status,uint32_t delay) {
356 cblock *cb,*fcb;
357
358 zassert(pthread_mutex_lock(&glock));
359 if (status) {
360 errno = status;
361 syslog(LOG_WARNING,"error writing file number %"PRIu32": %s",id->inode,strerr(errno));
362 id->status = status;
363 }
364 if (status==0 && delay==0) {
365 id->trycnt=0; // on good write reset try counter
366 }
367 status = id->status;
368
369 if (id->datachainhead && status==0) { // still have some work to do
370 // reset write id
371 for (cb=id->datachainhead ; cb ; cb=cb->next) {
372 cb->writeid = 0;
373 }
374 write_delayed_enqueue(id,delay);
375 } else { // no more work or error occurred
376 // if this is an error then release all data blocks
377 cb = id->datachainhead;
378 while (cb) {
379 fcb = cb;
380 cb = cb->next;
381 write_cb_release(id,fcb);
382 }
383 id->datachainhead=NULL;
384 id->datachaintail=NULL;
385 id->inqueue=0;
386
387 if (id->flushwaiting>0) {
388 zassert(pthread_cond_broadcast(&(id->flushcond)));
389 }
390 }
391 zassert(pthread_mutex_unlock(&glock));
392 }
393
/* Write worker thread entry point (defined below); needed by write_data_spawn_worker. */
void* write_worker(void *arg);

/* Last workers_total value reported to syslog by write_data_spawn_worker /
 * write_data_close_worker (non-WDEBUG builds). It keeps the same
 * "write workers: N" message from being logged repeatedly. Both users are
 * annotated "glock:LOCKED". */
static uint32_t lastnotify = 0;
397
398 /* glock:LOCKED */
write_data_spawn_worker(void)399 static inline void write_data_spawn_worker(void) {
400 sigset_t oldset;
401 sigset_t newset;
402 worker *w;
403 int res;
404
405 w = malloc(sizeof(worker));
406 if (w==NULL) {
407 return;
408 }
409 sigemptyset(&newset);
410 sigaddset(&newset, SIGTERM);
411 sigaddset(&newset, SIGINT);
412 sigaddset(&newset, SIGHUP);
413 sigaddset(&newset, SIGQUIT);
414 zassert(pthread_sigmask(SIG_BLOCK, &newset, &oldset));
415 res = pthread_create(&(w->thread_id),&worker_thattr,write_worker,w);
416 zassert(pthread_sigmask(SIG_SETMASK, &oldset, NULL));
417 if (res<0) {
418 return;
419 }
420 workers_avail++;
421 workers_total++;
422 #ifdef WDEBUG
423 fprintf(stderr,"spawn write worker (total: %"PRIu32")\n",workers_total);
424 #else
425 if (workers_total%10==0 && workers_total!=lastnotify) {
426 syslog(LOG_INFO,"write workers: %"PRIu32"+\n",workers_total);
427 lastnotify = workers_total;
428 }
429 #endif
430 }
431
432 /* glock:LOCKED */
write_data_close_worker(worker * w)433 static inline void write_data_close_worker(worker *w) {
434 workers_avail--;
435 workers_total--;
436 if (workers_total==0 && worker_term_waiting) {
437 zassert(pthread_cond_signal(&worker_term_cond));
438 worker_term_waiting--;
439 }
440 pthread_detach(w->thread_id);
441 free(w);
442 #ifdef WDEBUG
443 fprintf(stderr,"close write worker (total: %"PRIu32")\n",workers_total);
444 #else
445 if (workers_total%10==0 && workers_total!=lastnotify) {
446 syslog(LOG_INFO,"write workers: %"PRIu32"-\n",workers_total);
447 lastnotify = workers_total;
448 }
449 #endif
450 }
451
/*
 * Write 'ip' as a dotted quad (most significant octet first) into 'ipstr',
 * but only if 'ipstr' is still empty. Callers use ipstr[0]==0 to mean
 * "not formatted yet", so repeated calls are cheap.
 * 'ipstr' must hold at least 16 bytes; ipstr[15] is always NUL afterwards.
 */
static inline void write_prepare_ip (char ipstr[16],uint32_t ip) {
	uint8_t o1,o2,o3,o4;

	if (ipstr[0]!=0) {
		return;	// already formatted
	}
	o1 = (uint8_t)(ip>>24);
	o2 = (uint8_t)(ip>>16);
	o3 = (uint8_t)(ip>>8);
	o4 = (uint8_t)ip;
	snprintf(ipstr,16,"%"PRIu8".%"PRIu8".%"PRIu8".%"PRIu8,o1,o2,o3,o4);
	ipstr[15]=0;
}
458
459 /* main working thread | glock:UNLOCKED */
write_worker(void * arg)460 void* write_worker(void *arg) {
461 uint32_t z1,z2,z3;
462 uint8_t *data;
463 int fd;
464 int i;
465 struct pollfd pfd[2];
466 uint32_t sent,rcvd;
467 uint32_t hdrtosend;
468 uint8_t sending_mode;
469 uint8_t recvbuff[21];
470 uint8_t sendbuff[32];
471 #ifdef HAVE_WRITEV
472 struct iovec siov[2];
473 #endif
474 uint8_t pipebuff[1024];
475 uint8_t *wptr;
476 const uint8_t *rptr;
477
478 uint32_t reccmd;
479 uint32_t recleng;
480 uint64_t recchunkid;
481 uint32_t recwriteid;
482 uint8_t recstatus;
483
484 #ifdef WORKER_DEBUG
485 uint32_t partialblocks;
486 uint32_t bytessent;
487 char debugchain[200];
488 uint32_t cl;
489 #endif
490
491 const uint8_t *cp,*cpe;
492 uint8_t *cpw;
493 uint32_t chainip[100];
494 uint16_t chainport[100];
495 // uint32_t chainver[100];
496 uint32_t chainminver;
497 uint16_t chainelements;
498 uint32_t tmpip;
499 uint16_t tmpport;
500 uint32_t tmpver;
501 uint8_t cschain[6*99];
502 uint32_t cschainsize;
503
504 uint32_t chindx;
505 uint32_t ip;
506 uint16_t port;
507 uint32_t srcip;
508 uint64_t mfleng;
509 uint64_t maxwroffset;
510 uint64_t chunkid;
511 uint32_t version;
512 uint32_t nextwriteid;
513 const uint8_t *csdata;
514 uint32_t csdatasize;
515 uint8_t csdataver;
516 uint8_t westatus;
517 uint8_t wrstatus;
518 int status;
519 char csstrip[16];
520 uint8_t waitforstatus;
521 uint8_t flushwaiting;
522 uint8_t endofchunk;
523 double start,now,lastrcvd,lastblock,lastsent;
524 double workingtime,lrdiff,lbdiff;
525 uint8_t cnt;
526 uint8_t firsttime = 1;
527 worker *w = (worker*)arg;
528
529 inodedata *id;
530 cblock *cb,*ncb,*rcb;
531 // inodedata *id;
532
533 chainelements = 0;
534 chindx = 0;
535
536 for (;;) {
537 for (cnt=0 ; cnt<chainelements ; cnt++) {
538 csdb_writedec(chainip[cnt],chainport[cnt]);
539 }
540 chainelements=0;
541
542 if (firsttime==0) {
543 zassert(pthread_mutex_lock(&glock));
544 workers_avail++;
545 if (workers_avail > SUSTAIN_WORKERS) {
546 // fprintf(stderr,"close worker (avail:%"PRIu32" ; total:%"PRIu32")\n",workers_avail,workers_total);
547 write_data_close_worker(w);
548 zassert(pthread_mutex_unlock(&glock));
549 return NULL;
550 }
551 zassert(pthread_mutex_unlock(&glock));
552 }
553 firsttime = 0;
554
555 // get next job
556 queue_get(jqueue,&z1,&z2,&data,&z3);
557
558 zassert(pthread_mutex_lock(&glock));
559
560 if (data==NULL) {
561 write_data_close_worker(w);
562 zassert(pthread_mutex_unlock(&glock));
563 return NULL;
564 }
565
566 workers_avail--;
567 if (workers_avail==0 && workers_total<MAX_WORKERS) {
568 write_data_spawn_worker();
569 // fprintf(stderr,"spawn worker (avail:%"PRIu32" ; total:%"PRIu32")\n",workers_avail,workers_total);
570 }
571
572 id = (inodedata*)data;
573
574 if (id->status==0) {
575 if (id->datachainhead) {
576 chindx = id->datachainhead->chindx;
577 status = id->status;
578 } else {
579 syslog(LOG_WARNING,"writeworker got inode with no data to write !!!");
580 status = EINVAL; // this should never happen, so status is not important - just anything
581 }
582 } else {
583 status = id->status;
584 }
585
586 zassert(pthread_mutex_unlock(&glock));
587
588 if (status) {
589 write_job_end(id,status,0);
590 continue;
591 }
592
593 // syslog(LOG_NOTICE,"file: %"PRIu32", index: %"PRIu16" - debug1",id->inode,chindx);
594 // get chunk data from master
595 // start = monotonic_seconds();
596 wrstatus = fs_writechunk(id->inode,chindx,&csdataver,&mfleng,&chunkid,&version,&csdata,&csdatasize);
597 if (wrstatus!=STATUS_OK) {
598 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",id->inode,chindx,mfsstrerr(wrstatus));
599 if (wrstatus!=ERROR_LOCKED) {
600 if (wrstatus==ERROR_ENOENT) {
601 write_job_end(id,EBADF,0);
602 } else if (wrstatus==ERROR_QUOTA) {
603 write_job_end(id,EDQUOT,0);
604 } else if (wrstatus==ERROR_NOSPACE) {
605 write_job_end(id,ENOSPC,0);
606 } else if (wrstatus==ERROR_CHUNKLOST) {
607 write_job_end(id,ENXIO,0);
608 } else {
609 id->trycnt++;
610 if (id->trycnt>=maxretries) {
611 if (wrstatus==ERROR_NOCHUNKSERVERS) {
612 write_job_end(id,ENOSPC,0);
613 } else if (wrstatus==ERROR_CSNOTPRESENT) {
614 write_job_end(id,ENXIO,0);
615 } else {
616 write_job_end(id,EIO,0);
617 }
618 } else {
619 write_delayed_enqueue(id,1+((id->trycnt<30)?(id->trycnt/3):10));
620 }
621 }
622 } else {
623 write_delayed_enqueue(id,1+((id->trycnt<30)?(id->trycnt/3):10));
624 }
625 continue; // get next job
626 }
627 // now = monotonic_seconds();
628 // fprintf(stderr,"fs_writechunk time: %.3lf\n",(now-start));
629
630 if (csdata==NULL || csdatasize==0) {
631 fs_writeend(chunkid,id->inode,0);
632 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - there are no valid copies",id->inode,chindx,chunkid,version);
633 id->trycnt+=6;
634 if (id->trycnt>=maxretries) {
635 write_job_end(id,ENXIO,0);
636 } else {
637 write_delayed_enqueue(id,60);
638 }
639 continue;
640 }
641 ip = 0; // make old compilers happy
642 port = 0; // make old compilers happy
643 csstrip[0] = 0;
644 cp = csdata;
645 cpe = csdata+csdatasize;
646 chainminver = 0xFFFFFFFF;
647 cpw = cschain;
648 cschainsize = 0;
649 while (cp<cpe && chainelements<100) {
650 tmpip = get32bit(&cp);
651 tmpport = get16bit(&cp);
652 if (csdataver==0) {
653 tmpver = 0;
654 } else {
655 tmpver = get32bit(&cp);
656 }
657 chainip[chainelements] = tmpip;
658 chainport[chainelements] = tmpport;
659 csdb_writeinc(tmpip,tmpport);
660 if (tmpver<chainminver) {
661 chainminver = tmpver;
662 }
663 if (chainelements==0) {
664 ip = tmpip;
665 port = tmpport;
666 } else {
667 put32bit(&cpw,tmpip);
668 put16bit(&cpw,tmpport);
669 cschainsize += 6;
670 }
671 chainelements++;
672 }
673
674 if (cp<cpe) {
675 fs_writeend(chunkid,id->inode,0);
676 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - there are too many copies",id->inode,chindx,chunkid,version);
677 id->trycnt+=6;
678 if (id->trycnt>=maxretries) {
679 write_job_end(id,ENXIO,0);
680 } else {
681 write_delayed_enqueue(id,60);
682 }
683 continue;
684 }
685
686 // chain = csdata;
687 // ip = get32bit(&chain);
688 // port = get16bit(&chain);
689 // chainsize = csdatasize-6;
690
691 start = monotonic_seconds();
692
693 /*
694 if (csdatasize>CSDATARESERVE) {
695 csdatasize = CSDATARESERVE;
696 }
697 memcpy(wrec->csdata,csdata,csdatasize);
698 wrec->csdatasize=csdatasize;
699 while (csdatasize>=6) {
700 tmpip = get32bit(&csdata);
701 tmpport = get16bit(&csdata);
702 csdatasize-=6;
703 csdb_writeinc(tmpip,tmpport);
704 }
705 */
706
707 // make connection to cs
708 srcip = fs_getsrcip();
709 fd = conncache_get(ip,port);
710 if (fd<0) {
711 cnt=0;
712 while (cnt<10) {
713 fd = tcpsocket();
714 if (fd<0) {
715 syslog(LOG_WARNING,"writeworker: can't create tcp socket: %s",strerr(errno));
716 break;
717 }
718 if (srcip) {
719 if (tcpnumbind(fd,srcip,0)<0) {
720 syslog(LOG_WARNING,"writeworker: can't bind socket to given ip: %s",strerr(errno));
721 tcpclose(fd);
722 fd=-1;
723 break;
724 }
725 }
726 if (tcpnumtoconnect(fd,ip,port,(cnt%2)?(300*(1<<(cnt>>1))):(200*(1<<(cnt>>1))))<0) {
727 cnt++;
728 if (cnt>=10) {
729 write_prepare_ip(csstrip,ip);
730 syslog(LOG_WARNING,"writeworker: can't connect to (%s:%"PRIu16"): %s",csstrip,port,strerr(errno));
731 }
732 close(fd);
733 fd=-1;
734 } else {
735 uint32_t mip,pip;
736 uint16_t mport,pport;
737 tcpgetpeer(fd,&pip,&pport);
738 tcpgetmyaddr(fd,&mip,&mport);
739 #ifdef WDEBUG
740 fprintf(stderr,"connection ok (%"PRIX32":%"PRIu16"->%"PRIX32":%"PRIu16")\n",mip,mport,pip,pport);
741 #endif
742 cnt=10;
743 }
744 }
745 }
746 if (fd<0) {
747 fs_writeend(chunkid,id->inode,0);
748 id->trycnt++;
749 if (id->trycnt>=maxretries) {
750 write_job_end(id,EIO,0);
751 } else {
752 write_delayed_enqueue(id,1+((id->trycnt<30)?(id->trycnt/3):10));
753 }
754 continue;
755 }
756 if (tcpnodelay(fd)<0) {
757 syslog(LOG_WARNING,"writeworker: can't set TCP_NODELAY: %s",strerr(errno));
758 }
759
760 #ifdef WORKER_DEBUG
761 partialblocks=0;
762 bytessent=0;
763 #endif
764 nextwriteid=1;
765
766 pfd[0].fd = fd;
767 pfd[1].fd = id->pipe[0];
768 rcvd = 0;
769 sent = 0;
770 waitforstatus=1;
771 wptr = sendbuff;
772
773 put32bit(&wptr,CLTOCS_WRITE);
774 if (chainminver>=VERSION2INT(1,7,32)) {
775 put32bit(&wptr,13+cschainsize);
776 put8bit(&wptr,1);
777 hdrtosend = 21;
778 } else {
779 put32bit(&wptr,12+cschainsize);
780 hdrtosend = 20;
781 }
782
783 put64bit(&wptr,chunkid);
784 put32bit(&wptr,version);
785 sending_mode = 1;
786 // debug: syslog(LOG_NOTICE,"writeworker: init packet prepared");
787 cb = NULL;
788 endofchunk = 0;
789
790 status = 0;
791 wrstatus = STATUS_OK;
792
793 lastrcvd = 0.0;
794 lastsent = 0.0;
795 lastblock = 0.0;
796
797 flushwaiting = 0;
798 // firstloop = 1;
799
800 do {
801 now = monotonic_seconds();
802 zassert(pthread_mutex_lock(&glock));
803
804 if (lastrcvd==0.0) {
805 lastrcvd = now;
806 } else {
807 lrdiff = now - lastrcvd;
808 if (lrdiff>=CHUNKSERVER_ACTIVITY_TIMEOUT) {
809 write_prepare_ip(csstrip,ip);
810 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") was timed out (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,waitforstatus,id->trycnt+1);
811 zassert(pthread_mutex_unlock(&glock));
812 break;
813 }
814 }
815 if (lastblock==0.0) {
816 lbdiff = NEXT_BLOCK_DELAY; // first block should be send immediately
817 } else {
818 lbdiff = now - lastblock;
819 }
820 workingtime = now - start;
821
822 // if (!((waitforstatus>0 && workingtime<WORKER_BUSY_LAST_SEND_TIMEOUT+WORKER_BUSY_WAIT_FOR_STATUS+((workers_total>HEAVYLOAD_WORKERS)?0:WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT)) || (waitforstatus==0 && lbdiff<WORKER_IDLE_TIMEOUT && flushwaiting==0 && (workers_total<=HEAVYLOAD_WORKERS) && endofchunk==0))) {
823 // zassert(pthread_mutex_unlock(&glock));
824 // break;
825 // }
826 // }
827
828 id->waitingworker=1;
829
830 if (sending_mode==0 && workingtime<WORKER_BUSY_LAST_SEND_TIMEOUT+((workers_total>HEAVYLOAD_WORKERS)?0:WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT) && waitforstatus<64) {
831 if (cb==NULL) {
832 ncb = id->datachainhead;
833 } else {
834 ncb = cb->next;
835 }
836 if (ncb) {
837 if (ncb->chindx==chindx) {
838 if (ncb->to-ncb->from==MFSBLOCKSIZE || lbdiff>=NEXT_BLOCK_DELAY || ncb->next!=NULL || id->flushwaiting) {
839 cb = ncb;
840 sending_mode = 2;
841 } else {
842 id->waitingworker=2; // wait for block expand
843 }
844 } else {
845 endofchunk=1;
846 }
847 }
848 /*
849 if (cb==NULL) {
850 if (id->datachainhead) {
851 if (id->datachainhead->chindx==chindx) {
852 if (id->datachainhead->to-id->datachainhead->from==MFSBLOCKSIZE || id->datachainhead->next!=NULL || id->flushwaiting) {
853 cb = id->datachainhead;
854 havedata=1;
855 }
856 } else {
857 endofchunk=1;
858 }
859 } else {
860 id->waitingworker=1;
861 }
862 } else {
863 if (cb->next) {
864 if (cb->next->chindx==chindx) {
865 if (cb->next->to-cb->next->from==MFSBLOCKSIZE || lbdiff>NEXT_BLOCK_DELAY || cb->next->next!=NULL || id->flushwaiting) {
866 cb = cb->next;
867 havedata=1;
868 } else {
869 id->waitingworker=2;
870 }
871 } else {
872 endofchunk=1;
873 }
874 } else {
875 id->waitingworker=1;
876 }
877 }
878 */
879 if (sending_mode==2) {
880 cb->writeid = nextwriteid++;
881 // debug: syslog(LOG_NOTICE,"writeworker: data packet prepared (writeid:%"PRIu32",pos:%"PRIu16")",cb->writeid,cb->pos);
882 waitforstatus++;
883 wptr = sendbuff;
884 put32bit(&wptr,CLTOCS_WRITE_DATA);
885 put32bit(&wptr,24+(cb->to-cb->from));
886 put64bit(&wptr,chunkid);
887 put32bit(&wptr,cb->writeid);
888 put16bit(&wptr,cb->pos);
889 put16bit(&wptr,cb->from);
890 put32bit(&wptr,cb->to-cb->from);
891 put32bit(&wptr,mycrc32(0,cb->data+cb->from,cb->to-cb->from));
892 #ifdef WORKER_DEBUG
893 if (cb->to-cb->from<MFSBLOCKSIZE) {
894 partialblocks++;
895 }
896 bytessent+=(cb->to-cb->from);
897 #endif
898 sent = 0;
899 lastblock = now;
900 lastsent = now;
901 } else if (lastsent+WORKER_NOP_INTERVAL<now && chainminver>=VERSION2INT(1,7,32)) {
902 wptr = sendbuff;
903 put32bit(&wptr,ANTOAN_NOP);
904 put32bit(&wptr,0);
905 sent = 0;
906 sending_mode = 3;
907 }
908 }
909
910 #ifdef WORKER_DEBUG
911 fprintf(stderr,"workerloop: waitforstatus:%u workingtime:%.6lf workers_total:%u lbdiff:%.6lf flushwaiting:%u endofchunk:%u\n",waitforstatus,workingtime,workers_total,lbdiff,flushwaiting,endofchunk);
912 #endif
913 if (waitforstatus>0) {
914 if (workingtime>WORKER_BUSY_LAST_SEND_TIMEOUT+WORKER_BUSY_WAIT_FOR_STATUS+((workers_total>HEAVYLOAD_WORKERS)?0:WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT)) { // timeout
915 id->waitingworker=0;
916 zassert(pthread_mutex_unlock(&glock));
917 break;
918 }
919 } else {
920 if (lbdiff>=WORKER_IDLE_TIMEOUT || flushwaiting || workers_total>HEAVYLOAD_WORKERS || endofchunk) {
921 id->waitingworker=0;
922 zassert(pthread_mutex_unlock(&glock));
923 break;
924 }
925 }
926
927
928 zassert(pthread_mutex_unlock(&glock));
929
930 switch (sending_mode) {
931 case 1:
932 if (sent<hdrtosend) {
933 #ifdef HAVE_WRITEV
934 if (cschainsize>0) {
935 siov[0].iov_base = (void*)(sendbuff+sent);
936 siov[0].iov_len = hdrtosend-sent;
937 siov[1].iov_base = (void*)cschain; // discard const (safe - because it's used in writev)
938 siov[1].iov_len = cschainsize;
939 i = writev(fd,siov,2);
940 } else {
941 #endif
942 i = write(fd,sendbuff+sent,hdrtosend-sent);
943 #ifdef HAVE_WRITEV
944 }
945 #endif
946 } else {
947 i = write(fd,cschain+(sent-hdrtosend),cschainsize-(sent-hdrtosend));
948 }
949 if (i>=0) {
950 sent+=i;
951 if (sent==hdrtosend+cschainsize) {
952 sending_mode = 0;
953 }
954 }
955 break;
956 case 2:
957 if (sent<32) {
958 #ifdef HAVE_WRITEV
959 siov[0].iov_base = (void*)(sendbuff+sent);
960 siov[0].iov_len = 32-sent;
961 siov[1].iov_base = (void*)(cb->data+cb->from);
962 siov[1].iov_len = cb->to-cb->from;
963 i = writev(fd,siov,2);
964 #else
965 i = write(fd,sendbuff+sent,32-sent);
966 #endif
967 } else {
968 i = write(fd,cb->data+cb->from+(sent-32),cb->to-cb->from-(sent-32));
969 }
970 if (i>=0) {
971 sent+=i;
972 if (sent==32+cb->to-cb->from) {
973 sending_mode = 0;
974 }
975 }
976 break;
977 case 3:
978 i = write(fd,sendbuff+sent,8-sent);
979 if (i>=0) {
980 sent+=i;
981 if (sent==8) {
982 sending_mode = 0;
983 }
984 }
985 break;
986 default:
987 i=0;
988 }
989
990 if (i<0) {
991 if (ERRNO_ERROR && errno!=EINTR) {
992 write_prepare_ip(csstrip,ip);
993 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: write to (%s:%"PRIu16") error: %s / NEGWRITE (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,strerr(errno),waitforstatus,id->trycnt+1);
994 status=EIO;
995 break;
996 }
997 }
998
999 pfd[0].events = POLLIN | (sending_mode?POLLOUT:0);
1000 pfd[0].revents = 0;
1001 pfd[1].events = POLLIN;
1002 pfd[1].revents = 0;
1003 if (poll(pfd,2,100)<0) { /* correct timeout - in msec */
1004 if (errno!=EINTR) {
1005 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: poll error: %s (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,strerr(errno),waitforstatus,id->trycnt+1);
1006 status=EIO;
1007 break;
1008 }
1009 }
1010 zassert(pthread_mutex_lock(&glock)); // make helgrind happy
1011 id->waitingworker=0;
1012 flushwaiting = (id->flushwaiting>0)?1:0;
1013 zassert(pthread_mutex_unlock(&glock)); // make helgrind happy
1014 if (pfd[1].revents&POLLIN) { // used just to break poll - so just read all data from pipe to empty it
1015 i = read(id->pipe[0],pipebuff,1024);
1016 if (i<0) { // mainly to make happy static code analyzers
1017 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: read pipe error: %s (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,strerr(errno),waitforstatus,id->trycnt+1);
1018 }
1019 }
1020 if (pfd[0].revents&POLLHUP) {
1021 write_prepare_ip(csstrip,ip);
1022 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") was reset by peer / POLLHUP (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,waitforstatus,id->trycnt+1);
1023 status=EIO;
1024 break;
1025 }
1026 if (pfd[0].revents&POLLERR) {
1027 write_prepare_ip(csstrip,ip);
1028 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") got error status / POLLERR (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,waitforstatus,id->trycnt+1);
1029 status=EIO;
1030 break;
1031 }
1032 if (pfd[0].revents&POLLIN) {
1033 i = read(fd,recvbuff+rcvd,21-rcvd);
1034 if (i==0) { // connection reset by peer or read error
1035 write_prepare_ip(csstrip,ip);
1036 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") was reset by peer / ZEROREAD (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,waitforstatus,id->trycnt+1);
1037 status=EIO;
1038 break;
1039 }
1040 if (i<0) {
1041 if (errno!=EINTR) {
1042 write_prepare_ip(csstrip,ip);
1043 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: read from (%s:%"PRIu16") error: %s (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,strerr(errno),waitforstatus,id->trycnt+1);
1044 status=EIO;
1045 break;
1046 } else {
1047 i=0;
1048 }
1049 }
1050 lastrcvd = monotonic_seconds();
1051 rcvd+=i;
1052 // do not accept ANTOAN_UNKNOWN_COMMAND and ANTOAN_BAD_COMMAND_SIZE here - only ANTOAN_NOP
1053 if (rcvd>=8 && recvbuff[7]==0 && recvbuff[6]==0 && recvbuff[5]==0 && recvbuff[4]==0 && recvbuff[3]==0 && recvbuff[2]==0 && recvbuff[1]==0 && recvbuff[0]==0) { // ANTOAN_NOP packet received - skip it
1054 if (rcvd>8) {
1055 memmove(recvbuff,recvbuff+8,rcvd-8);
1056 rcvd-=8;
1057 }
1058 }
1059 if (rcvd==21) {
1060 rptr = recvbuff;
1061 reccmd = get32bit(&rptr);
1062 recleng = get32bit(&rptr);
1063 recchunkid = get64bit(&rptr);
1064 recwriteid = get32bit(&rptr);
1065 recstatus = get8bit(&rptr);
1066 if (reccmd!=CSTOCL_WRITE_STATUS || recleng!=13) {
1067 syslog(LOG_WARNING,"writeworker: got unrecognized packet from chunkserver (cmd:%"PRIu32",leng:%"PRIu32")",reccmd,recleng);
1068 status=EIO;
1069 break;
1070 }
1071 if (recchunkid!=chunkid) {
1072 syslog(LOG_WARNING,"writeworker: got unexpected packet (expected chunkdid:%"PRIu64",packet chunkid:%"PRIu64")",chunkid,recchunkid);
1073 status=EIO;
1074 break;
1075 }
1076 if (recstatus!=STATUS_OK) {
1077 syslog(LOG_WARNING,"writeworker: write error: %s",mfsstrerr(recstatus));
1078 wrstatus=recstatus;
1079 break;
1080 }
1081 // debug: syslog(LOG_NOTICE,"writeworker: received status ok for writeid:%"PRIu32,recwriteid);
1082 if (recwriteid>0) {
1083 zassert(pthread_mutex_lock(&glock));
1084 for (rcb = id->datachainhead ; rcb && rcb->writeid!=recwriteid ; rcb=rcb->next) {}
1085 if (rcb==NULL) {
1086 syslog(LOG_WARNING,"writeworker: got unexpected status (writeid:%"PRIu32")",recwriteid);
1087 zassert(pthread_mutex_unlock(&glock));
1088 status=EIO;
1089 break;
1090 }
1091 if (rcb==cb) { // current block
1092 // debug: syslog(LOG_NOTICE,"writeworker: received status for current block");
1093 if (sending_mode==2) { // got status ok before all data had been sent - error
1094 syslog(LOG_WARNING,"writeworker: got status OK before all data have been sent");
1095 zassert(pthread_mutex_unlock(&glock));
1096 status=EIO;
1097 break;
1098 } else {
1099 cb = NULL;
1100 }
1101 }
1102 if (rcb->prev) {
1103 rcb->prev->next = rcb->next;
1104 } else {
1105 id->datachainhead = rcb->next;
1106 }
1107 if (rcb->next) {
1108 rcb->next->prev = rcb->prev;
1109 } else {
1110 id->datachaintail = rcb->prev;
1111 }
1112 maxwroffset = (((uint64_t)(chindx))<<MFSCHUNKBITS)+(((uint32_t)(rcb->pos))<<MFSBLOCKBITS)+rcb->to;
1113 if (maxwroffset>mfleng) {
1114 mfleng=maxwroffset;
1115 }
1116 write_cb_release(id,rcb);
1117 zassert(pthread_mutex_unlock(&glock));
1118 }
1119 waitforstatus--;
1120 rcvd=0;
1121 }
1122 }
1123 } while (1);
1124
1125 if (waitforstatus==0 && chainminver>=VERSION2INT(1,7,32)) {
1126 wptr = sendbuff;
1127 put32bit(&wptr,CLTOCS_WRITE_FINISH);
1128 put32bit(&wptr,12);
1129 put64bit(&wptr,chunkid);
1130 put32bit(&wptr,version);
1131 if (write(fd,sendbuff,20)==20) {
1132 conncache_insert(ip,port,fd);
1133 } else {
1134 tcpclose(fd);
1135 }
1136 } else {
1137 tcpclose(fd);
1138 }
1139
1140 #ifdef WORKER_DEBUG
1141 now = monotonic_seconds();
1142 workingtime = now - start;
1143
1144 cl=0;
1145 for (cnt=0 ; cnt<chainelements ; cnt++) {
1146 cl+=snprintf(debugchain+cl,200-cl,"%u.%u.%u.%u:%u->",(chainip[cnt]>>24)&255,(chainip[cnt]>>16)&255,(chainip[cnt]>>8)&255,chainip[cnt]&255,chainport[cnt]);
1147 }
1148 if (cl>=2) {
1149 debugchain[cl-2]='\0';
1150 }
1151 syslog(LOG_NOTICE,"worker %lu sent %"PRIu32" blocks (%"PRIu32" partial) of chunk %016"PRIX64"_%08"PRIX32", received status for %"PRIu32" blocks (%"PRIu32" lost), bw: %.6lfMB/s ( %"PRIu32" B / %.6lf s ), chain: %s",(unsigned long)arg,nextwriteid-1,partialblocks,chunkid,version,nextwriteid-1-waitforstatus,waitforstatus,(double)bytessent/workingtime,bytessent,workingtime,debugchain);
1152 #endif
1153
1154 for (cnt=0 ; cnt<10 ; cnt++) {
1155 westatus = fs_writeend(chunkid,id->inode,mfleng);
1156 if (westatus==ERROR_ENOENT || westatus==ERROR_QUOTA) {
1157 break;
1158 } else if (westatus!=STATUS_OK) {
1159 portable_usleep(100000+(10000<<cnt));
1160 } else {
1161 break;
1162 }
1163 }
1164
1165 if (westatus==ERROR_ENOENT) {
1166 write_job_end(id,EBADF,0);
1167 } else if (westatus==ERROR_QUOTA) {
1168 write_job_end(id,EDQUOT,0);
1169 } else if (westatus!=STATUS_OK) {
1170 write_job_end(id,ENXIO,0);
1171 } else if (status!=0 || wrstatus!=STATUS_OK) {
1172 if (wrstatus!=STATUS_OK) { // convert MFS status to OS errno
1173 if (wrstatus==ERROR_NOSPACE) {
1174 status=ENOSPC;
1175 } else {
1176 status=EIO;
1177 }
1178 }
1179 id->trycnt++;
1180 if (id->trycnt>=maxretries) {
1181 write_job_end(id,status,0);
1182 } else {
1183 write_job_end(id,0,1+((id->trycnt<30)?(id->trycnt/3):10));
1184 }
1185 } else {
1186 // read_inode_ops(id->inode);
1187 read_inode_set_length(id->inode,mfleng,0);
1188 write_job_end(id,0,0);
1189 }
1190 }
1191 }
1192
1193 /* API | glock: INITIALIZED,UNLOCKED */
write_data_init(uint32_t cachesize,uint32_t retries)1194 void write_data_init (uint32_t cachesize,uint32_t retries) {
1195 uint32_t cacheblockcount = (cachesize/MFSBLOCKSIZE);
1196 uint32_t i;
1197 sigset_t oldset;
1198 sigset_t newset;
1199
1200 maxretries = retries;
1201 if (cacheblockcount<10) {
1202 cacheblockcount=10;
1203 }
1204 zassert(pthread_mutex_init(&glock,NULL));
1205 zassert(pthread_cond_init(&worker_term_cond,NULL));
1206 worker_term_waiting = 0;
1207
1208 zassert(pthread_cond_init(&fcbcond,NULL));
1209 fcbwaiting=0;
1210 cacheblocks = malloc(sizeof(cblock)*cacheblockcount);
1211 for (i=0 ; i<cacheblockcount-1 ; i++) {
1212 cacheblocks[i].next = cacheblocks+(i+1);
1213 }
1214 cacheblocks[cacheblockcount-1].next = NULL;
1215 freecblockshead = cacheblocks;
1216 freecacheblocks = cacheblockcount;
1217
1218 idhash = malloc(sizeof(inodedata*)*IDHASHSIZE);
1219 for (i=0 ; i<IDHASHSIZE ; i++) {
1220 idhash[i]=NULL;
1221 }
1222
1223 dqueue = queue_new(0);
1224 jqueue = queue_new(0);
1225
1226 zassert(pthread_attr_init(&worker_thattr));
1227 zassert(pthread_attr_setstacksize(&worker_thattr,0x100000));
1228 sigemptyset(&newset);
1229 sigaddset(&newset, SIGTERM);
1230 sigaddset(&newset, SIGINT);
1231 sigaddset(&newset, SIGHUP);
1232 sigaddset(&newset, SIGQUIT);
1233 zassert(pthread_sigmask(SIG_BLOCK, &newset, &oldset));
1234 zassert(pthread_create(&dqueue_worker_th,&worker_thattr,write_dqueue_worker,NULL));
1235 zassert(pthread_sigmask(SIG_SETMASK, &oldset, NULL));
1236
1237 zassert(pthread_mutex_lock(&glock));
1238 workers_avail = 0;
1239 workers_total = 0;
1240 write_data_spawn_worker();
1241 zassert(pthread_mutex_unlock(&glock));
1242 #ifdef BUFFER_DEBUG
1243 zassert(pthread_create(&info_worker_th,&worker_thattr,write_info_worker,NULL));
1244 #endif
1245 }
1246
write_data_term(void)1247 void write_data_term(void) {
1248 uint32_t i;
1249 inodedata *id,*idn;
1250
1251 queue_close(dqueue);
1252 queue_close(jqueue);
1253 zassert(pthread_mutex_lock(&glock));
1254 while (workers_total>0) {
1255 worker_term_waiting++;
1256 zassert(pthread_cond_wait(&worker_term_cond,&glock));
1257 }
1258 zassert(pthread_mutex_unlock(&glock));
1259 zassert(pthread_join(dqueue_worker_th,NULL));
1260 queue_delete(dqueue);
1261 queue_delete(jqueue);
1262 for (i=0 ; i<IDHASHSIZE ; i++) {
1263 for (id = idhash[i] ; id ; id = idn) {
1264 idn = id->next;
1265 zassert(pthread_cond_destroy(&(id->flushcond)));
1266 zassert(pthread_cond_destroy(&(id->writecond)));
1267 close(id->pipe[0]);
1268 close(id->pipe[1]);
1269 free(id);
1270 }
1271 }
1272 free(idhash);
1273 free(cacheblocks);
1274 zassert(pthread_attr_destroy(&worker_thattr));
1275 zassert(pthread_cond_destroy(&worker_term_cond));
1276 zassert(pthread_cond_destroy(&fcbcond));
1277 zassert(pthread_mutex_destroy(&glock));
1278 }
1279
1280 /* glock: LOCKED */
write_cb_expand(inodedata * id,cblock * cb,uint32_t from,uint32_t to,const uint8_t * data)1281 int write_cb_expand(inodedata *id,cblock *cb,uint32_t from,uint32_t to,const uint8_t *data) {
1282 if (cb->writeid>0 || from>cb->to || to<cb->from) { // can't expand
1283 return -1;
1284 }
1285 memcpy(cb->data+from,data,to-from);
1286 if (from<cb->from) {
1287 cb->from = from;
1288 }
1289 if (to>cb->to) {
1290 cb->to = to;
1291 }
1292 if (cb->to-cb->from==MFSBLOCKSIZE && cb->next==NULL && id->waitingworker==2) {
1293 if (write(id->pipe[1]," ",1)!=1) {
1294 syslog(LOG_ERR,"can't write to pipe !!!");
1295 }
1296 id->waitingworker=0;
1297 }
1298 return 0;
1299 }
1300
1301 /* glock: UNLOCKED */
write_block(inodedata * id,uint32_t chindx,uint16_t pos,uint32_t from,uint32_t to,const uint8_t * data)1302 int write_block(inodedata *id,uint32_t chindx,uint16_t pos,uint32_t from,uint32_t to,const uint8_t *data) {
1303 cblock *cb;
1304
1305 zassert(pthread_mutex_lock(&glock));
1306 for (cb=id->datachaintail ; cb ; cb=cb->prev) {
1307 if (cb->pos==pos && cb->chindx==chindx) {
1308 if (write_cb_expand(id,cb,from,to,data)==0) {
1309 zassert(pthread_mutex_unlock(&glock));
1310 return 0;
1311 } else {
1312 break;
1313 }
1314 }
1315 }
1316
1317 cb = write_cb_acquire(id);
1318 // syslog(LOG_NOTICE,"write_block: acquired new cache block");
1319 cb->chindx = chindx;
1320 cb->pos = pos;
1321 cb->from = from;
1322 cb->to = to;
1323 memcpy(cb->data+from,data,to-from);
1324 cb->prev = id->datachaintail;
1325 cb->next = NULL;
1326 if (id->datachaintail!=NULL) {
1327 id->datachaintail->next = cb;
1328 } else {
1329 id->datachainhead = cb;
1330 }
1331 id->datachaintail = cb;
1332 if (id->inqueue) {
1333 if (id->waitingworker) {
1334 if (write(id->pipe[1]," ",1)!=1) {
1335 syslog(LOG_ERR,"can't write to pipe !!!");
1336 }
1337 id->waitingworker=0;
1338 }
1339 } else {
1340 id->inqueue=1;
1341 write_enqueue(id);
1342 }
1343 zassert(pthread_mutex_unlock(&glock));
1344 // zassert(pthread_mutex_unlock(&(wc->lock)));
1345 return 0;
1346 }
1347
1348 /* API | glock: UNLOCKED */
write_data(void * vid,uint64_t offset,uint32_t size,const uint8_t * data)1349 int write_data(void *vid,uint64_t offset,uint32_t size,const uint8_t *data) {
1350 uint32_t chindx;
1351 uint16_t pos;
1352 uint32_t from;
1353 int status;
1354 inodedata *id = (inodedata*)vid;
1355 if (id==NULL) {
1356 return EIO;
1357 }
1358 // int64_t s,e;
1359
1360 // s = monotonic_useconds();
1361 zassert(pthread_mutex_lock(&glock));
1362 // syslog(LOG_NOTICE,"write_data: inode:%"PRIu32" offset:%"PRIu64" size:%"PRIu32,id->inode,offset,size);
1363 // id = write_get_inodedata(inode);
1364 status = id->status;
1365 if (status==0) {
1366 if (offset+size>id->maxfleng) { // move fleng
1367 id->maxfleng = offset+size;
1368 }
1369 id->writewaiting++;
1370 while (id->flushwaiting>0) {
1371 zassert(pthread_cond_wait(&(id->writecond),&glock));
1372 }
1373 id->writewaiting--;
1374 }
1375 zassert(pthread_mutex_unlock(&glock));
1376 if (status!=0) {
1377 return status;
1378 }
1379
1380 chindx = offset>>MFSCHUNKBITS;
1381 pos = (offset&MFSCHUNKMASK)>>MFSBLOCKBITS;
1382 from = offset&MFSBLOCKMASK;
1383 while (size>0) {
1384 if (size>MFSBLOCKSIZE-from) {
1385 if (write_block(id,chindx,pos,from,MFSBLOCKSIZE,data)<0) {
1386 return EIO;
1387 }
1388 size -= (MFSBLOCKSIZE-from);
1389 data += (MFSBLOCKSIZE-from);
1390 from = 0;
1391 pos++;
1392 if (pos==1024) {
1393 pos = 0;
1394 chindx++;
1395 }
1396 } else {
1397 if (write_block(id,chindx,pos,from,from+size,data)<0) {
1398 return EIO;
1399 }
1400 size = 0;
1401 }
1402 }
1403 // e = monotonic_useconds();
1404 // syslog(LOG_NOTICE,"write_data time: %"PRId64,e-s);
1405 return 0;
1406 }
1407
1408 /* API | glock: UNLOCKED */
write_data_new(uint32_t inode)1409 void* write_data_new(uint32_t inode) {
1410 inodedata* id;
1411 zassert(pthread_mutex_lock(&glock));
1412 id = write_get_inodedata(inode);
1413 if (id==NULL) {
1414 zassert(pthread_mutex_unlock(&glock));
1415 return NULL;
1416 }
1417 id->lcnt++;
1418 // zassert(pthread_mutex_unlock(&(id->lock)));
1419 zassert(pthread_mutex_unlock(&glock));
1420 return id;
1421 }
1422
1423 /* common flush routine | glock: LOCKED */
write_data_do_flush(inodedata * id,uint8_t releaseflag)1424 static int write_data_do_flush(inodedata *id,uint8_t releaseflag) {
1425 int ret;
1426 // int64_t s,e;
1427
1428 // s = monotonic_useconds();
1429 id->flushwaiting++;
1430 while (id->inqueue) {
1431 if (id->waitingworker) {
1432 if (write(id->pipe[1]," ",1)!=1) {
1433 syslog(LOG_ERR,"can't write to pipe !!!");
1434 }
1435 id->waitingworker=0;
1436 }
1437 // syslog(LOG_NOTICE,"flush: wait ...");
1438 zassert(pthread_cond_wait(&(id->flushcond),&glock));
1439 // syslog(LOG_NOTICE,"flush: woken up");
1440 }
1441 id->flushwaiting--;
1442 if (id->flushwaiting==0 && id->writewaiting>0) {
1443 zassert(pthread_cond_broadcast(&(id->writecond)));
1444 }
1445 ret = id->status;
1446 if (releaseflag) {
1447 id->lcnt--;
1448 }
1449 if (id->lcnt==0 && id->inqueue==0 && id->flushwaiting==0 && id->writewaiting==0) {
1450 write_free_inodedata(id);
1451 }
1452 // e = monotonic_useconds();
1453 // syslog(LOG_NOTICE,"flush time: %"PRId64,e-s);
1454 return ret;
1455 }
1456
1457 /* API | glock: UNLOCKED */
write_data_flush(void * vid)1458 int write_data_flush(void *vid) {
1459 int ret;
1460 if (vid==NULL) {
1461 return EIO;
1462 }
1463 zassert(pthread_mutex_lock(&glock));
1464 ret = write_data_do_flush((inodedata*)vid,0);
1465 zassert(pthread_mutex_unlock(&glock));
1466 return ret;
1467 }
1468
1469 /* API | glock: UNLOCKED */
write_data_setmaxfleng(uint32_t inode,uint64_t maxfleng)1470 void write_data_setmaxfleng(uint32_t inode,uint64_t maxfleng) {
1471 inodedata* id;
1472 zassert(pthread_mutex_lock(&glock));
1473 id = write_find_inodedata(inode);
1474 if (id) {
1475 id->maxfleng = maxfleng;
1476 }
1477 zassert(pthread_mutex_unlock(&glock));
1478 }
1479
1480 /* API | glock: UNLOCKED */
write_data_getmaxfleng(uint32_t inode)1481 uint64_t write_data_getmaxfleng(uint32_t inode) {
1482 uint64_t maxfleng;
1483 inodedata* id;
1484 zassert(pthread_mutex_lock(&glock));
1485 id = write_find_inodedata(inode);
1486 if (id) {
1487 maxfleng = id->maxfleng;
1488 } else {
1489 maxfleng = 0;
1490 }
1491 zassert(pthread_mutex_unlock(&glock));
1492 return maxfleng;
1493 }
1494
1495 /* API | glock: UNLOCKED */
write_data_flush_inode(uint32_t inode)1496 int write_data_flush_inode(uint32_t inode) {
1497 inodedata* id;
1498 int ret;
1499 zassert(pthread_mutex_lock(&glock));
1500 id = write_find_inodedata(inode);
1501 if (id==NULL) {
1502 zassert(pthread_mutex_unlock(&glock));
1503 return 0;
1504 }
1505 ret = write_data_do_flush(id,0);
1506 zassert(pthread_mutex_unlock(&glock));
1507 return ret;
1508 }
1509
1510 /* API | glock: UNLOCKED */
write_data_end(void * vid)1511 int write_data_end(void *vid) {
1512 int ret;
1513 if (vid==NULL) {
1514 return EIO;
1515 }
1516 zassert(pthread_mutex_lock(&glock));
1517 ret = write_data_do_flush(vid,1);
1518 zassert(pthread_mutex_unlock(&glock));
1519 return ret;
1520 }
1521