1 /*
2 * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <sys/types.h>
26 #ifdef HAVE_WRITEV
27 #include <sys/uio.h>
28 #endif
29 #include <sys/time.h>
30 #include <unistd.h>
31 #ifndef WIN32
32 #include <poll.h>
33 #include <syslog.h>
34 #endif
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <errno.h>
39 #include <limits.h>
40 #include <signal.h>
41 #include <pthread.h>
42 #include <inttypes.h>
43
44 #include "massert.h"
45 #include "datapack.h"
46 #include "crc.h"
47 #include "strerr.h"
48 #include "mfsstrerr.h"
49 #include "pcqueue.h"
50 #include "sockets.h"
51 #include "conncache.h"
52 #include "csorder.h"
53 #include "csdb.h"
54 #include "delayrun.h"
55 #include "mastercomm.h"
56 #include "clocks.h"
57 #include "portable.h"
58 #include "readdata.h"
59 #include "chunkrwlock.h"
60 #include "chunksdatacache.h"
61 #include "MFSCommunication.h"
62 #ifdef MFSMOUNT
63 #include "fdcache.h"
64 #endif
65
// Debug switches (uncomment to enable):
//  WORKER_DEBUG - extra counters and per-loop stderr traces in write_worker
//  BUFFER_DEBUG - cache block accounting and the write_info_worker checker thread
//  WDEBUG       - stderr traces for worker spawn/close and connections (instead of the syslog notices)
// #define WORKER_DEBUG 1
// #define BUFFER_DEBUG 1
// #define WDEBUG 1

// Platforms without EDQUOT report quota errors as ENOSPC.
#ifndef EDQUOT
#define EDQUOT ENOSPC
#endif

// for Nagle's-like algorithm: a partially filled block is held back until this many
// seconds have passed since the previous block was sent (unless the block is full,
// another block follows it, or a flush is waiting)
#define NEXT_BLOCK_DELAY 0.05

// seconds without anything received from the chunkserver after which write_worker
// treats the connection as timed out
#define CHUNKSERVER_ACTIVITY_TIMEOUT 2.0

// not referenced in this section of the file - presumably an idle wait limit; TODO confirm
#define WORKER_IDLE_TIMEOUT 0.1

// write_worker picks new blocks for an open connection only while its working time is below
// WORKER_BUSY_LAST_SEND_TIMEOUT (+ WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT while workers_total
// <= HEAVYLOAD_WORKERS); WORKER_BUSY_WAIT_FOR_STATUS is not referenced in this section
#define WORKER_BUSY_LAST_SEND_TIMEOUT 5.0
#define WORKER_BUSY_WAIT_FOR_STATUS 5.0
#define WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT 20.0

// an ANTOAN_NOP is sent when nothing was sent for this many seconds
// (only when every chunkserver in the chain is at least 1.7.32)
#define WORKER_NOP_INTERVAL 1.0

// maximum number of chunk jobs of one inode handed to the job queue at the same time
#define MAX_SIM_CHUNKS 16

// worker pool sizing: a worker that becomes idle exits when more than SUSTAIN_WORKERS are idle,
// above HEAVYLOAD_WORKERS the busy timeout is not extended, and new workers are spawned
// only while workers_total < MAX_WORKERS
#define SUSTAIN_WORKERS 50
#define HEAVYLOAD_WORKERS 150
#define MAX_WORKERS 250

// hash of an (inode,chunk index) pair - not referenced in this section of the file
#define WCHASHSIZE 256
#define WCHASH(inode,indx) (((inode)*0xB239FB71+(indx)*193)%WCHASHSIZE)

// bucket index of an inode in idhash[]
#define IDHASHSIZE 256
#define IDHASH(inode) (((inode)*0xB239FB71)%IDHASHSIZE)
98
// One cache block of write data (MFSBLOCKSIZE bytes).
// A block is either on the global free list (freecblockshead) or on one chunk's data chain.
typedef struct cblock_s {
	uint8_t data[MFSBLOCKSIZE]; // modified only when writeid==0
	uint16_t pos; // block in chunk (0...1023) - never modified
	uint32_t writeid; // 0 = not sent, >0 = block was sent (modified and accessed only when wchunk is locked)
	uint32_t from; // first filled byte in data (modified only when writeid==0)
	uint32_t to; // first not used byte in data (modified only when writeid==0)
	struct cblock_s *next,*prev; // data chain links; 'next' also links the free list
} cblock;
107
struct inodedata_s;

// Write job for one chunk index of an inode: the cache blocks queued for that chunk.
// Pointers to this structure are what travel through jqueue to the write workers.
typedef struct chunkdata_s {
	uint32_t chindx; // chunk index within the file
	uint16_t trycnt; // retry counter (reset by write_job_end after a successful round with no delay)
	uint8_t waitingworker; // 0 = none, 1 = a worker listens on wakeup_fd, 2 = a worker waits for the last block to grow
	uint8_t chunkready; // set to 1 once a worker has connected to a chunkserver for this chunk
	uint8_t unbreakable; // source value for continueop (see below)
	uint8_t continueop; // set from unbreakable after a failed attempt (cleared when retries are exhausted); sent as CHUNKOPFLAG_CONTINUEOP
	uint8_t superuser; // sent as CHUNKOPFLAG_CANUSERESERVESPACE
	int wakeup_fd; // write end of the waiting worker's pipe, -1 when no worker is waiting
	cblock *datachainhead,*datachaintail; // cache blocks queued for this chunk
	struct inodedata_s *parent; // owning inode
	struct chunkdata_s *next,**prev; // links in parent->chunks ('prev' points at the previous element's 'next' field)
} chunkdata;
123
// Per-inode write state, kept in idhash[] and shared by writers and write workers.
// 'next' and 'lcnt' are changed under hashlock, 'cacheblockcount' under fcblock;
// most other fields are accessed with 'lock' held.
typedef struct inodedata_s {
	uint32_t inode;
	uint64_t maxfleng; // initialized from the file length passed to write_get_inodedata
	uint32_t cacheblockcount; // cache blocks currently charged to this inode
	int status; // errno value recorded by write_job_end on failure; 0 = no error
	uint16_t flushwaiting; // nonzero while someone waits on flushcond (write_job_end broadcasts then)
	uint16_t writewaiting; // presumably the number of waiters on writecond - updated outside this section
	uint16_t chunkwaiting; // presumably the number of waiters on chunkcond - updated outside this section
	uint16_t lcnt; // reference count: write_get/find_inodedata increment, write_free_inodedata decrements
// uint16_t trycnt;
	uint16_t chunkscnt; // chunk jobs handed to the job queue and not yet freed (kept below MAX_SIM_CHUNKS)
	chunkdata *chunks,**chunkstail; // all chunk jobs of this inode; chunkstail points at the last 'next' field
	chunkdata *chunksnext; // first chunk job not yet handed to the job queue
	pthread_cond_t flushcond; // wait for chunks==NULL (flush)
	pthread_cond_t writecond; // wait for flushwaiting==0 (write)
	pthread_cond_t chunkcond; // wait for status!=0 or all chunks 'chunkready==1'
	pthread_mutex_t lock;
	struct inodedata_s *next; // next entry in the same idhash bucket
} inodedata;
143
// Handle of one write worker thread; allocated in write_data_spawn_worker and
// freed by write_data_close_worker when the thread exits.
typedef struct worker_s {
	pthread_t thread_id;
} worker;
147
// Free cache block pool: fcblock guards the free list and the counters below.
static pthread_mutex_t fcblock;
static pthread_cond_t fcbcond; // signalled by write_cb_release when fcbwaiting>0
static uint16_t fcbwaiting; // threads currently inside write_cb_acquire
static cblock *cacheblocks,*freecblockshead; // cacheblocks: not referenced in this section (presumably the block array); freecblockshead: free list head
static uint32_t freecacheblocks; // blocks currently on the free list
static uint32_t cacheblockcount; // total number of cache blocks (see write_cache_almost_full)

// Write parameters (assigned outside this section of the file)
static double optimeout; // when >0, write_worker abandons an attempt with EIO after this many seconds
static uint32_t maxretries; // retry count at which a chunk job fails
static uint32_t minlogretry; // retry count from which transient errors are logged
static uint8_t erroronlostchunk; // fail immediately with ENXIO on MFS_ERROR_CHUNKLOST instead of retrying
static uint8_t erroronnospace; // fail immediately with ENOSPC on MFS_ERROR_NOSPACE instead of retrying

static inodedata **idhash; // IDHASHSIZE buckets of inodedata, guarded by hashlock

static pthread_mutex_t hashlock;

#ifdef BUFFER_DEBUG
static pthread_t info_worker_th; // presumably the thread running write_info_worker
static uint32_t usedblocks; // blocks handed out by write_cb_acquire and not yet released
#endif

// static pthread_t dqueue_worker_th;

// Worker pool state, guarded by workerslock.
static pthread_mutex_t workerslock;
static uint32_t workers_avail; // workers not currently processing a job
static uint32_t workers_total; // live worker threads
static uint32_t worker_term_waiting; // pending waiters on worker_term_cond (decremented when signalled)
static pthread_cond_t worker_term_cond; // signalled by write_data_close_worker when workers_total reaches 0
static pthread_attr_t worker_thattr; // attributes used when creating worker threads

// jqueue carries chunkdata pointers to the workers; a NULL entry makes a worker exit
static void *jqueue; //,*dqueue;
180
#ifdef BUFFER_DEBUG
/*
 * Debug-only thread body. Every 0.5 s it recounts the cache blocks on every inode's
 * chunk chains and on the free list, and logs any mismatch with the recorded counters.
 * Never returns.
 */
void* write_info_worker(void *arg) {
	uint32_t bucket;
	uint32_t inodeblocks,totalblocks,freechainblocks;
	inodedata *idp;
	chunkdata *cdp;
	cblock *blk;

	(void)arg;
	while (1) {
		zassert(pthread_mutex_lock(&hashlock));
		totalblocks = 0;
		for (bucket = 0 ; bucket<IDHASHSIZE ; bucket++) {
			idp = idhash[bucket];
			while (idp!=NULL) {
				zassert(pthread_mutex_lock(&(idp->lock)));
				inodeblocks = 0;
				cdp = idp->chunks;
				while (cdp!=NULL) {
					blk = cdp->datachainhead;
					while (blk!=NULL) {
						inodeblocks++;
						blk = blk->next;
					}
					cdp = cdp->next;
				}
				if (inodeblocks != idp->cacheblockcount) {
					syslog(LOG_NOTICE,"inode: %"PRIu32" ; wrong cache block count (%"PRIu32"/%"PRIu32")",idp->inode,inodeblocks,idp->cacheblockcount);
				}
				totalblocks += inodeblocks;
				zassert(pthread_mutex_unlock(&(idp->lock)));
				idp = idp->next;
			}
		}
		zassert(pthread_mutex_unlock(&hashlock));

		// count the free list under its own lock
		zassert(pthread_mutex_lock(&fcblock));
		freechainblocks = 0;
		blk = freecblockshead;
		while (blk!=NULL) {
			freechainblocks++;
			blk = blk->next;
		}
		syslog(LOG_NOTICE,"used cache blocks: %"PRIu32" ; sum of inode used blocks: %"PRIu32" ; free cache blocks: %"PRIu32" ; free cache chain blocks: %"PRIu32,usedblocks,totalblocks,freecacheblocks,freechainblocks);
		zassert(pthread_mutex_unlock(&fcblock));
		portable_usleep(500000);
	}
}
#endif
221
/*
 * Give cache block 'cb' back to the global free list and uncharge it from 'ind'.
 * Wakes one thread blocked in write_cb_acquire, if there is any.
 * Takes fcblock internally.
 */
void write_cb_release (inodedata *ind,cblock *cb) {
	zassert(pthread_mutex_lock(&fcblock));
	// push onto the head of the free list
	cb->next = freecblockshead;
	freecblockshead = cb;
	freecacheblocks++;
	ind->cacheblockcount--;
#ifdef BUFFER_DEBUG
	usedblocks--;
#endif
	if (fcbwaiting>0) {
		zassert(pthread_cond_signal(&fcbcond));
	}
	zassert(pthread_mutex_unlock(&fcblock));
}
236
write_cb_acquire(inodedata * ind)237 cblock* write_cb_acquire(inodedata *ind/*,uint8_t *waited*/) {
238 cblock *ret;
239 zassert(pthread_mutex_lock(&fcblock));
240 fcbwaiting++;
241 // *waited=0;
242 while (freecblockshead==NULL/* || ind->cacheblockcount>(freecacheblocks/3)*/) {
243 zassert(pthread_cond_wait(&fcbcond,&fcblock));
244 // *waited=1;
245 }
246 fcbwaiting--;
247 ret = freecblockshead;
248 freecblockshead = ret->next;
249 ret->pos = 0;
250 ret->writeid = 0;
251 ret->from = 0;
252 ret->to = 0;
253 ret->next = NULL;
254 ret->prev = NULL;
255 freecacheblocks--;
256 ind->cacheblockcount++;
257 #ifdef BUFFER_DEBUG
258 usedblocks++;
259 #endif
260 zassert(pthread_mutex_unlock(&fcblock));
261 return ret;
262 }
263
write_cache_almost_full(void)264 uint8_t write_cache_almost_full(void) {
265 uint8_t r;
266 zassert(pthread_mutex_lock(&fcblock));
267 r = (freecacheblocks < (cacheblockcount / 3))?1:0;
268 zassert(pthread_mutex_unlock(&fcblock));
269 return r;
270 }
271
272 /* inode */
273
write_find_inodedata(uint32_t inode)274 inodedata* write_find_inodedata(uint32_t inode) {
275 uint32_t indh = IDHASH(inode);
276 inodedata *ind;
277 zassert(pthread_mutex_lock(&hashlock));
278 for (ind=idhash[indh] ; ind ; ind=ind->next) {
279 if (ind->inode == inode) {
280 ind->lcnt++;
281 zassert(pthread_mutex_unlock(&hashlock));
282 return ind;
283 }
284 }
285 zassert(pthread_mutex_unlock(&hashlock));
286 return NULL;
287 }
288
write_get_inodedata(uint32_t inode,uint64_t fleng)289 inodedata* write_get_inodedata(uint32_t inode,uint64_t fleng) {
290 uint32_t indh = IDHASH(inode);
291 inodedata *ind;
292 // int pfd[2];
293
294 zassert(pthread_mutex_lock(&hashlock));
295 for (ind=idhash[indh] ; ind ; ind=ind->next) {
296 if (ind->inode == inode) {
297 ind->lcnt++;
298 zassert(pthread_mutex_unlock(&hashlock));
299 return ind;
300 }
301 }
302
303 ind = malloc(sizeof(inodedata));
304 passert(ind);
305 ind->inode = inode;
306 ind->cacheblockcount = 0;
307 ind->maxfleng = fleng;
308 ind->status = 0;
309 // ind->trycnt = 0;
310 ind->chunkscnt = 0;
311 ind->chunks = NULL;
312 ind->chunksnext = NULL;
313 ind->chunkstail = &(ind->chunks);
314 ind->flushwaiting = 0;
315 ind->chunkwaiting = 0;
316 ind->writewaiting = 0;
317 ind->lcnt = 1;
318 zassert(pthread_cond_init(&(ind->flushcond),NULL));
319 zassert(pthread_cond_init(&(ind->writecond),NULL));
320 zassert(pthread_cond_init(&(ind->chunkcond),NULL));
321 zassert(pthread_mutex_init(&(ind->lock),NULL));
322 ind->next = idhash[indh];
323 idhash[indh] = ind;
324 zassert(pthread_mutex_unlock(&hashlock));
325 return ind;
326 }
327
/*
 * Drop one reference to 'fid'. When the last reference goes away, the entry is
 * unlinked from idhash and destroyed. At that point it must have no active chunks
 * and no flush or write waiters, which massert checks.
 * An entry that is not found in its bucket is ignored.
 */
void write_free_inodedata(inodedata *fid) {
	uint32_t bucket = IDHASH(fid->inode);
	inodedata **link;
	inodedata *cur;

	zassert(pthread_mutex_lock(&hashlock));
	for (link = &(idhash[bucket]) ; (cur = *link)!=NULL ; link = &(cur->next)) {
		if (cur!=fid) {
			continue;
		}
		cur->lcnt--;
		if (cur->lcnt==0) {
			*link = cur->next;
			zassert(pthread_mutex_lock(&(cur->lock)));
			massert(cur->chunkscnt==0 && cur->flushwaiting==0 && cur->writewaiting==0,"inode structure not clean");
			zassert(pthread_mutex_unlock(&(cur->lock)));
			zassert(pthread_cond_destroy(&(cur->flushcond)));
			zassert(pthread_cond_destroy(&(cur->writecond)));
			zassert(pthread_cond_destroy(&(cur->chunkcond)));
			zassert(pthread_mutex_destroy(&(cur->lock)));
			free(cur);
		}
		break;
	}
	zassert(pthread_mutex_unlock(&hashlock));
}
354
355 void write_enqueue(chunkdata *chd);
356
write_test_chunkdata(inodedata * ind)357 void write_test_chunkdata(inodedata *ind) {
358 chunkdata *chd;
359
360 if (ind->chunkscnt<MAX_SIM_CHUNKS) {
361 if (ind->chunksnext!=NULL) {
362 chd = ind->chunksnext;
363 ind->chunksnext = chd->next;
364 ind->chunkscnt++;
365 write_enqueue(chd);
366 }
367 } else {
368 for (chd=ind->chunks ; chd!=NULL ; chd=chd->next) {
369 if (chd->waitingworker) {
370 if (universal_write(chd->wakeup_fd," ",1)!=1) {
371 syslog(LOG_ERR,"can't write to pipe !!!");
372 }
373 chd->waitingworker = 0;
374 chd->wakeup_fd = -1;
375 }
376 }
377 }
378 }
379
write_new_chunkdata(inodedata * ind,uint32_t chindx)380 chunkdata* write_new_chunkdata(inodedata *ind,uint32_t chindx) {
381 chunkdata *chd;
382
383 chd = malloc(sizeof(chunkdata));
384 passert(chd);
385 chd->chindx = chindx;
386 chd->wakeup_fd = -1;
387 chd->datachainhead = NULL;
388 chd->datachaintail = NULL;
389 chd->waitingworker = 0;
390 chd->chunkready = 0;
391 chd->unbreakable = 0;
392 chd->continueop = 0;
393 chd->superuser = 0;
394 chd->trycnt = 0;
395 chd->parent = ind;
396 chd->next = NULL;
397 chd->prev = ind->chunkstail;
398 *(ind->chunkstail) = chd;
399 ind->chunkstail = &(chd->next);
400 if (ind->chunksnext==NULL) {
401 ind->chunksnext = chd;
402 }
403 return chd;
404 }
405
/*
 * Unlink the chunk job 'chd' from its inode's chunk list and free it.
 * The inode's active-job count is decremented, and write_test_chunkdata is then
 * called so that the next pending chunk can be queued.
 */
void write_free_chunkdata(chunkdata *chd) {
	inodedata *ind = chd->parent;

	*(chd->prev) = chd->next;
	if (chd->next==NULL) {
		ind->chunkstail = chd->prev; // chd was the last element
	} else {
		chd->next->prev = chd->prev;
	}
	ind->chunkscnt--;
	write_test_chunkdata(ind);
	free(chd);
}
417
418 /* queues */
419
/* Hand the chunk job 'chd' to the write workers through jqueue right away. */
void write_enqueue(chunkdata *chd) {
	uint8_t *job = (uint8_t*)chd;

	queue_put(jqueue,0,0,job,0);
}
423
write_delayrun_enqueue(void * udata)424 void write_delayrun_enqueue(void *udata) {
425 queue_put(jqueue,0,0,(uint8_t*)udata,0);
426 }
427
/*
 * Queue the chunk job 'chd' for the workers.
 * usecs==0 queues it immediately. Otherwise queuing is deferred through delay_run
 * (the parameter name implies microseconds).
 */
void write_delayed_enqueue(chunkdata *chd,uint32_t usecs) {
	if (usecs==0) {
		write_enqueue(chd);
		return;
	}
	delay_run(write_delayrun_enqueue,chd,usecs);
}
435
436 /*
437 void* write_dqueue_worker(void *arg) {
438 uint64_t t,usec;
439 uint32_t husec,lusec,cnt;
440 uint8_t *ind;
441 (void)arg;
442 for (;;) {
443 queue_get(dqueue,&husec,&lusec,&ind,&cnt);
444 if (ind==NULL) {
445 return NULL;
446 }
447 t = monotonic_useconds();
448 usec = husec;
449 usec <<= 32;
450 usec |= lusec;
451 if (t>usec) {
452 t -= usec;
453 while (t>=1000000 && cnt>0) {
454 t-=1000000;
455 cnt--;
456 }
457 if (cnt>0) {
458 if (t<1000000) {
459 portable_usleep(1000000-t);
460 }
461 cnt--;
462 }
463 }
464 if (cnt>0) {
465 t = monotonic_useconds();
466 queue_put(dqueue,t>>32,t&0xFFFFFFFFU,(uint8_t*)ind,cnt);
467 } else {
468 queue_put(jqueue,0,0,ind,0);
469 }
470 }
471 return NULL;
472 }
473 */
474
/*
 * Finish one processing round of the chunk job 'chd'.
 *  status - 0 on success, otherwise an errno value. It is logged, stored in the
 *           inode's status, and chunkcond is broadcast.
 *  delay  - passed to write_delayed_enqueue when the job is re-queued. With
 *           status==0 and delay==0, the retry counter is reset.
 * If blocks remain and the inode has no recorded error, every block is marked as
 * not sent (writeid=0) and the job is re-queued. Otherwise all of its blocks are
 * released, flush waiters are woken, and the job is freed.
 * Takes the inode lock internally.
 */
void write_job_end(chunkdata *chd,int status,uint32_t delay) {
	inodedata *ind = chd->parent;
	cblock *blk,*nextblk;

	zassert(pthread_mutex_lock(&(ind->lock)));
	if (status==0) {
		if (delay==0) {
			chd->trycnt = 0; // good write - start counting retries from scratch
		}
	} else {
		errno = status;
		syslog(LOG_WARNING,"error writing file number %"PRIu32": %s",ind->inode,strerr(errno));
		ind->status = status;
		zassert(pthread_cond_broadcast(&(ind->chunkcond)));
	}
	// from here on the inode-wide status decides (it may hold an earlier error)
	status = ind->status;

	if (status==0 && chd->datachainhead!=NULL) {
		// more to write - resend everything that is still on the chain
		for (blk = chd->datachainhead ; blk!=NULL ; blk = blk->next) {
			blk->writeid = 0;
		}
		write_delayed_enqueue(chd,delay);
	} else {
		// finished or failed - drop all remaining blocks and the job itself
		for (blk = chd->datachainhead ; blk!=NULL ; blk = nextblk) {
			nextblk = blk->next;
			write_cb_release(ind,blk);
		}
		if (ind->flushwaiting>0) {
			zassert(pthread_cond_broadcast(&(ind->flushcond)));
		}
		write_free_chunkdata(chd);
	}
	zassert(pthread_mutex_unlock(&(ind->lock)));
}
514
// worker thread body (defined below); referenced by write_data_spawn_worker
void* write_worker(void *arg);

#ifndef WDEBUG
// last workers_total value reported to syslog, so the same count is not logged twice in a row
static uint32_t lastnotify = 0;
#endif
520
write_data_spawn_worker(void)521 static inline void write_data_spawn_worker(void) {
522 #ifndef WIN32
523 sigset_t oldset;
524 sigset_t newset;
525 #endif
526 worker *w;
527 int res;
528
529 w = malloc(sizeof(worker));
530 if (w==NULL) {
531 return;
532 }
533 #ifndef WIN32
534 sigemptyset(&newset);
535 sigaddset(&newset, SIGTERM);
536 sigaddset(&newset, SIGINT);
537 sigaddset(&newset, SIGHUP);
538 sigaddset(&newset, SIGQUIT);
539 zassert(pthread_sigmask(SIG_BLOCK, &newset, &oldset));
540 #endif
541 res = pthread_create(&(w->thread_id),&worker_thattr,write_worker,w);
542 #ifndef WIN32
543 zassert(pthread_sigmask(SIG_SETMASK, &oldset, NULL));
544 #endif
545 if (res<0) {
546 return;
547 }
548 workers_avail++;
549 workers_total++;
550 #ifdef WDEBUG
551 fprintf(stderr,"spawn write worker (total: %"PRIu32")\n",workers_total);
552 #else
553 if (workers_total%10==0 && workers_total!=lastnotify) {
554 syslog(LOG_INFO,"write workers: %"PRIu32"+\n",workers_total);
555 lastnotify = workers_total;
556 }
557 #endif
558 }
559
/*
 * Bookkeeping for a worker thread that is about to exit: uncount it, and signal
 * worker_term_cond when it was the last one and someone is waiting. The thread
 * is then detached and its handle freed.
 * Callers in write_worker hold workerslock.
 */
static inline void write_data_close_worker(worker *w) {
	workers_avail--;
	workers_total--;
	if (worker_term_waiting && workers_total==0) {
		zassert(pthread_cond_signal(&worker_term_cond));
		worker_term_waiting--;
	}
	pthread_detach(w->thread_id);
	free(w);
#ifdef WDEBUG
	fprintf(stderr,"close write worker (total: %"PRIu32")\n",workers_total);
#else
	// report only every 10th count, and never the same count twice
	if (workers_total!=lastnotify && (workers_total%10)==0) {
		syslog(LOG_INFO,"write workers: %"PRIu32"-\n",workers_total);
		lastnotify = workers_total;
	}
#endif
}
578
/*
 * Lazily format 'ip' (host byte order, most significant byte first) as dotted quad
 * text into 'ipstr'. Nothing is done when ipstr already holds text (ipstr[0]!=0),
 * so the conversion happens at most once per buffer.
 */
static inline void write_prepare_ip (char ipstr[16],uint32_t ip) {
	uint8_t b1,b2,b3,b4;

	if (ipstr[0]!=0) {
		return;
	}
	b1 = (uint8_t)(ip>>24);
	b2 = (uint8_t)(ip>>16);
	b3 = (uint8_t)(ip>>8);
	b4 = (uint8_t)ip;
	snprintf(ipstr,16,"%"PRIu8".%"PRIu8".%"PRIu8".%"PRIu8,b1,b2,b3,b4);
	ipstr[15]=0;
}
585
586 /* main working thread */
write_worker(void * arg)587 void* write_worker(void *arg) {
588 uint32_t z1,z2,z3;
589 uint8_t *data;
590 int fd;
591 int i;
592 struct pollfd pfd[2];
593 uint32_t sent,rcvd;
594 uint32_t hdrtosend;
595 uint8_t sending_mode;
596 uint8_t recvbuff[21];
597 uint8_t sendbuff[32];
598 #ifdef HAVE_WRITEV
599 struct iovec siov[2];
600 #endif
601 uint8_t pipebuff[1024];
602 int pipefd[2];
603 uint8_t *wptr;
604 const uint8_t *rptr;
605
606 uint32_t reccmd;
607 uint32_t recleng;
608 uint64_t recchunkid;
609 uint32_t recwriteid;
610 uint8_t recstatus;
611
612 #ifdef WORKER_DEBUG
613 uint32_t partialblocks;
614 uint32_t bytessent;
615 char debugchain[200];
616 uint32_t cl;
617 #endif
618
619 uint8_t *cpw;
620 cspri chain[100];
621 uint32_t chainminver;
622 uint16_t chainelements;
623 uint8_t cschain[6*99];
624 uint32_t cschainsize;
625
626 uint32_t inode;
627 uint32_t chindx;
628 uint32_t ip;
629 uint16_t port;
630 uint32_t srcip;
631 uint64_t mfleng;
632 uint64_t maxwroffset;
633 uint64_t chunkid;
634 uint32_t version;
635 uint32_t nextwriteid;
636 const uint8_t *csdata;
637 uint32_t csdatasize;
638 uint8_t csdataver;
639 uint8_t westatus;
640 uint8_t wrstatus;
641 uint8_t chunkready;
642 uint8_t unbreakable;
643 uint8_t chunkopflags;
644 int status;
645 char csstrip[16];
646 uint8_t waitforstatus;
647 uint8_t donotstayidle;
648 double opbegin;
649 double start,now,lastrcvd,lastblock,lastsent;
650 double workingtime,lrdiff,lbdiff;
651 uint32_t wtotal;
652 uint8_t cnt;
653 uint8_t firsttime = 1;
654 worker *w = (worker*)arg;
655
656 uint8_t valid_offsets;
657 uint64_t min_offset;
658 uint64_t max_offset;
659
660 inodedata *ind;
661 chunkdata *chd;
662 cblock *cb,*ncb,*rcb;
663 // inodedata *ind;
664
665 chainelements = 0;
666 chindx = 0;
667
668 if (pipe(pipefd)<0) {
669 syslog(LOG_WARNING,"pipe error: %s",strerr(errno));
670 return NULL;
671 }
672
673 for (;;) {
674 for (i=0 ; i<chainelements ; i++) {
675 csdb_writedec(chain[i].ip,chain[i].port);
676 }
677 chainelements=0;
678
679 if (firsttime==0) {
680 zassert(pthread_mutex_lock(&workerslock));
681 workers_avail++;
682 if (workers_avail > SUSTAIN_WORKERS) {
683 // fprintf(stderr,"close worker (avail:%"PRIu32" ; total:%"PRIu32")\n",workers_avail,workers_total);
684 write_data_close_worker(w);
685 zassert(pthread_mutex_unlock(&workerslock));
686 close_pipe(pipefd);
687 return NULL;
688 }
689 zassert(pthread_mutex_unlock(&workerslock));
690 }
691 firsttime = 0;
692
693 // get next job
694 queue_get(jqueue,&z1,&z2,&data,&z3);
695
696 zassert(pthread_mutex_lock(&workerslock));
697
698 if (data==NULL) {
699 write_data_close_worker(w);
700 zassert(pthread_mutex_unlock(&workerslock));
701 close_pipe(pipefd);
702 return NULL;
703 }
704
705 workers_avail--;
706 if (workers_avail==0 && workers_total<MAX_WORKERS) {
707 write_data_spawn_worker();
708 }
709 zassert(pthread_mutex_unlock(&workerslock));
710
711 chd = (chunkdata*)data;
712 ind = chd->parent;
713
714 zassert(pthread_mutex_lock(&(ind->lock)));
715
716 if (chd->datachainhead) {
717 chindx = chd->chindx;
718 status = ind->status;
719 } else {
720 syslog(LOG_WARNING,"writeworker got inode with no data to write !!!");
721 status = EINVAL; // this should never happen, so status is not important - just anything
722 }
723 chunkready = chd->chunkready;
724 chunkopflags = (chd->continueop?CHUNKOPFLAG_CONTINUEOP:0) | (chd->superuser?CHUNKOPFLAG_CANUSERESERVESPACE:0);
725
726 zassert(pthread_mutex_unlock(&(ind->lock)));
727
728 if (status) {
729 write_job_end(chd,status,0);
730 continue;
731 }
732
733 inode = ind->inode;
734 chunkrwlock_wlock(inode,chindx);
735
736 opbegin = 0; // make static code analysers happy
737 if (optimeout>0.0) {
738 opbegin = monotonic_seconds();
739 }
740
741 valid_offsets = 0;
742 min_offset = 0;
743 max_offset = 0;
744
745 // syslog(LOG_NOTICE,"file: %"PRIu32", index: %"PRIu16" - debug1",inode,chindx);
746 // get chunk data from master
747 // start = monotonic_seconds();
748 wrstatus = fs_writechunk(inode,chindx,chunkopflags,&csdataver,&mfleng,&chunkid,&version,&csdata,&csdatasize);
749
750 // rdstatus potential results:
751 // MFS_ERROR_NOCHUNK - internal error (can't be repaired)
752 // MFS_ERROR_ENOENT - internal error (wrong inode - can't be repaired)
753 // MFS_ERROR_EPERM - internal error (wrong inode - can't be repaired)
754 // MFS_ERROR_INDEXTOOBIG - requested file position is too big
755 // MFS_ERROR_CHUNKLOST - according to master chunk is definitelly lost (all chunkservers are connected and chunk is not there)
756 // MFS_ERROR_QUOTA - disk quota exceeded (quit immediately - no retry)
757 // MFS_ERROR_NOSPACE - no space on disk
758 // MFS_ERROR_IO (for future use)
759 // MFS_ERROR_NOCHUNKSERVERS - can't create new chunk - no space available, but maybe only temporarily
760 // MFS_ERROR_CSNOTPRESENT - can't modify chunk - chunk is lost, but maybe only temporarily
761
762 if (wrstatus!=MFS_STATUS_OK) {
763 if (wrstatus!=MFS_ERROR_LOCKED && wrstatus!=MFS_ERROR_EAGAIN) {
764 if (wrstatus==MFS_ERROR_ENOENT || wrstatus==MFS_ERROR_EPERM || wrstatus==MFS_ERROR_NOCHUNK) {
765 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
766 write_job_end(chd,EBADF,0);
767 } else if (wrstatus==MFS_ERROR_INDEXTOOBIG) {
768 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
769 write_job_end(chd,EINVAL,0);
770 } else if (wrstatus==MFS_ERROR_QUOTA) {
771 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
772 #ifdef EDQUOT
773 write_job_end(chd,EDQUOT,0);
774 #else
775 write_job_end(chd,ENOSPC,0);
776 #endif
777 } else if (wrstatus==MFS_ERROR_NOSPACE && erroronnospace) {
778 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
779 write_job_end(chd,ENOSPC,0);
780 } else if (wrstatus==MFS_ERROR_CHUNKLOST && erroronlostchunk) {
781 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
782 write_job_end(chd,ENXIO,0);
783 } else if (wrstatus==MFS_ERROR_IO) {
784 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
785 write_job_end(chd,EIO,0);
786 } else if (wrstatus==MFS_ERROR_EROFS) {
787 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
788 write_job_end(chd,EROFS,0);
789 } else {
790 if (chd->trycnt >= minlogretry) {
791 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
792 }
793 chd->trycnt++;
794 if (chd->trycnt>=maxretries) {
795 if (wrstatus==MFS_ERROR_NOCHUNKSERVERS || wrstatus==MFS_ERROR_NOSPACE) {
796 write_job_end(chd,ENOSPC,0);
797 } else if (wrstatus==MFS_ERROR_CSNOTPRESENT || wrstatus==MFS_ERROR_CHUNKLOST) {
798 write_job_end(chd,ENXIO,0);
799 } else {
800 write_job_end(chd,EIO,0);
801 }
802 } else {
803 write_delayed_enqueue(chd,1000+((chd->trycnt<30)?((chd->trycnt-1)*300000):10000000));
804 }
805 }
806 } else {
807 if (chd->trycnt<=2) {
808 chd->trycnt++;
809 write_delayed_enqueue(chd,1000);
810 } else if (chd->trycnt<=6) {
811 chd->trycnt++;
812 write_delayed_enqueue(chd,100000);
813 } else {
814 write_delayed_enqueue(chd,500000);
815 }
816 }
817 chunkrwlock_wunlock(inode,chindx);
818 continue; // get next job
819 }
820
821 chunksdatacache_insert(inode,chindx,chunkid,version,csdataver,csdata,csdatasize);
822 #ifdef MFSMOUNT
823 fdcache_invalidate(inode);
824 #endif
825
826 // now = monotonic_seconds();
827 // fprintf(stderr,"fs_writechunk time: %.3lf\n",(now-start));
828
829 if (csdata!=NULL && csdatasize>0) {
830 chainelements = csorder_sort(chain,csdataver,csdata,csdatasize,1);
831 } else {
832 chainelements = 0;
833 }
834
835 if (csdata==NULL || csdatasize==0 || chainelements==0) {
836 chainelements = 0;
837 zassert(pthread_mutex_lock(&(ind->lock)));
838 chd->trycnt+=6;
839 unbreakable = chd->unbreakable;
840 if (chd->trycnt>=maxretries) {
841 unbreakable = 0; // unlock chunk on error
842 }
843 chd->continueop = unbreakable;
844 zassert(pthread_mutex_unlock(&(ind->lock)));
845 if (unbreakable==0) {
846 fs_writeend(chunkid,inode,chindx,0,0);
847 }
848 if (chd->trycnt >= minlogretry) {
849 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - there are no valid copies",inode,chindx,chunkid,version);
850 }
851 if (chd->trycnt>=maxretries) {
852 write_job_end(chd,ENXIO,0);
853 } else {
854 write_delayed_enqueue(chd,60000000);
855 }
856 chunkrwlock_wunlock(inode,chindx);
857 continue;
858 }
859 ip = chain[0].ip;
860 port = chain[0].port;
861 chainminver = chain[0].version;
862 csdb_writeinc(ip,port);
863 csstrip[0] = 0;
864 cpw = cschain;
865 cschainsize = 0;
866 for (i=1 ; i<chainelements ; i++) {
867 csdb_writeinc(chain[i].ip,chain[i].port);
868 if (chain[i].version < chainminver) {
869 chainminver = chain[i].version;
870 }
871 put32bit(&cpw,chain[i].ip);
872 put16bit(&cpw,chain[i].port);
873 cschainsize += 6;
874 }
875 #if 0
876 cp = csdata;
877 cpe = csdata+csdatasize;
878 chainminver = 0xFFFFFFFF;
879 cpw = cschain;
880 cschainsize = 0;
881 while (cp<cpe && chainelements<100) {
882 tmpip = get32bit(&cp);
883 tmpport = get16bit(&cp);
884 if (csdataver>0) {
885 tmpver = get32bit(&cp);
886 } else {
887 tmpver = 0;
888 }
889 if (csdataver>1) {
890 tmplabelmask = get32bit(&cp);
891 } else {
892 tmplabelmask = 0;
893 }
894 chainip[chainelements] = tmpip;
895 chainport[chainelements] = tmpport;
896 csdb_writeinc(tmpip,tmpport);
897 if (tmpver<chainminver) {
898 chainminver = tmpver;
899 }
900 if (chainelements==0) {
901 ip = tmpip;
902 port = tmpport;
903 } else {
904 put32bit(&cpw,tmpip);
905 put16bit(&cpw,tmpport);
906 cschainsize += 6;
907 }
908 chainelements++;
909 }
910
911 if (cp<cpe) {
912 fs_writeend(chunkid,inode,0,0);
913 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - there are too many copies",inode,chindx,chunkid,version);
914 chd->trycnt+=6;
915 if (chd->trycnt>=maxretries) {
916 write_job_end(chd,ENXIO,0);
917 } else {
918 write_delayed_enqueue(chd,60000000);
919 }
920 continue;
921 }
922 #endif
923 // chain = csdata;
924 // ip = get32bit(&chain);
925 // port = get16bit(&chain);
926 // chainsize = csdatasize-6;
927
928 start = monotonic_seconds();
929
930 /*
931 if (csdatasize>CSDATARESERVE) {
932 csdatasize = CSDATARESERVE;
933 }
934 memcpy(wrec->csdata,csdata,csdatasize);
935 wrec->csdatasize=csdatasize;
936 while (csdatasize>=6) {
937 tmpip = get32bit(&csdata);
938 tmpport = get16bit(&csdata);
939 csdatasize-=6;
940 csdb_writeinc(tmpip,tmpport);
941 }
942 */
943
944 // make connection to cs
945 srcip = fs_getsrcip();
946 fd = conncache_get(ip,port);
947 if (fd<0) {
948 uint32_t connmaxtry;
949 zassert(pthread_mutex_lock(&(ind->lock)));
950 connmaxtry = (chd->trycnt*2)+2;
951 if (connmaxtry>10) {
952 connmaxtry = 10;
953 }
954 zassert(pthread_mutex_unlock(&(ind->lock)));
955 cnt=0;
956 while (cnt<connmaxtry) {
957 fd = tcpsocket();
958 if (fd<0) {
959 syslog(LOG_WARNING,"writeworker: can't create tcp socket: %s",strerr(errno));
960 break;
961 }
962 if (srcip) {
963 if (tcpnumbind(fd,srcip,0)<0) {
964 syslog(LOG_WARNING,"writeworker: can't bind socket to given ip: %s",strerr(errno));
965 tcpclose(fd);
966 fd=-1;
967 break;
968 }
969 }
970 if (tcpnumtoconnect(fd,ip,port,(cnt%2)?(300*(1<<(cnt>>1))):(200*(1<<(cnt>>1))))<0) {
971 cnt++;
972 if (cnt>=connmaxtry) {
973 int err = errno;
974 zassert(pthread_mutex_lock(&(ind->lock)));
975 if (chd->trycnt >= minlogretry) {
976 write_prepare_ip(csstrip,ip);
977 syslog(LOG_WARNING,"writeworker: can't connect to (%s:%"PRIu16"): %s",csstrip,port,strerr(err));
978 }
979 zassert(pthread_mutex_unlock(&(ind->lock)));
980 }
981 close(fd);
982 fd=-1;
983 } else {
984 uint32_t mip,pip;
985 uint16_t mport,pport;
986 tcpgetpeer(fd,&pip,&pport);
987 tcpgetmyaddr(fd,&mip,&mport);
988 #ifdef WDEBUG
989 fprintf(stderr,"connection ok (%"PRIX32":%"PRIu16"->%"PRIX32":%"PRIu16")\n",mip,mport,pip,pport);
990 #endif
991 cnt=connmaxtry;
992 }
993 }
994 }
995 if (fd<0) {
996 zassert(pthread_mutex_lock(&(ind->lock)));
997 chd->trycnt++;
998 unbreakable = chd->unbreakable;
999 if (chd->trycnt>=maxretries) {
1000 unbreakable = 0; // unlock chunk on error
1001 }
1002 chd->continueop = unbreakable;
1003 zassert(pthread_mutex_unlock(&(ind->lock)));
1004 if (unbreakable==0) {
1005 fs_writeend(chunkid,inode,chindx,0,0);
1006 }
1007 if (chd->trycnt>=maxretries) {
1008 write_job_end(chd,EIO,0);
1009 } else {
1010 write_delayed_enqueue(chd,1000+((chd->trycnt<30)?((chd->trycnt-1)*300000):10000000));
1011 }
1012 chunkrwlock_wunlock(inode,chindx);
1013 continue;
1014 }
1015 if (tcpnodelay(fd)<0) {
1016 syslog(LOG_WARNING,"writeworker: can't set TCP_NODELAY: %s",strerr(errno));
1017 }
1018
1019 if (chunkready==0) {
1020 zassert(pthread_mutex_lock(&(ind->lock)));
1021 if (chd->chunkready==0) {
1022 chd->chunkready = 1;
1023 // if (ind->chunkwaiting>0) {
1024 zassert(pthread_cond_broadcast(&(ind->chunkcond)));
1025 // }
1026 }
1027 zassert(pthread_mutex_unlock(&(ind->lock)));
1028 }
1029
1030 #ifdef WORKER_DEBUG
1031 partialblocks=0;
1032 bytessent=0;
1033 #endif
1034 nextwriteid=1;
1035
1036 pfd[0].fd = fd;
1037 pfd[1].fd = pipefd[0];
1038 rcvd = 0;
1039 sent = 0;
1040 waitforstatus=1;
1041 wptr = sendbuff;
1042
1043 put32bit(&wptr,CLTOCS_WRITE);
1044 if (chainminver>=VERSION2INT(1,7,32)) {
1045 put32bit(&wptr,13+cschainsize);
1046 put8bit(&wptr,1);
1047 hdrtosend = 21;
1048 } else {
1049 put32bit(&wptr,12+cschainsize);
1050 hdrtosend = 20;
1051 }
1052
1053 put64bit(&wptr,chunkid);
1054 put32bit(&wptr,version);
1055 sending_mode = 1;
1056 // debug: syslog(LOG_NOTICE,"writeworker: init packet prepared");
1057 cb = NULL;
1058
1059 status = 0;
1060 wrstatus = MFS_STATUS_OK;
1061
1062 lastrcvd = 0.0;
1063 lastsent = 0.0;
1064 lastblock = 0.0;
1065
1066 donotstayidle = 0;
1067 // firstloop = 1;
1068
1069 do {
1070 now = monotonic_seconds();
1071 zassert(pthread_mutex_lock(&workerslock));
1072 wtotal = workers_total;
1073 zassert(pthread_mutex_unlock(&workerslock));
1074
1075 if (optimeout>0.0 && now - opbegin > optimeout) {
1076 status = EIO;
1077 break;
1078 }
1079 zassert(pthread_mutex_lock(&(ind->lock)));
1080
1081 // if (ind->status!=0) {
1082 // zassert(pthread_mutex_unlock(&glock));
1083 // break;
1084 // }
1085
1086 if (lastrcvd==0.0) {
1087 lastrcvd = now;
1088 } else {
1089 lrdiff = now - lastrcvd;
1090 if (lrdiff>=CHUNKSERVER_ACTIVITY_TIMEOUT) {
1091 if (chd->trycnt >= minlogretry) {
1092 write_prepare_ip(csstrip,ip);
1093 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") was timed out (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,csstrip,port,waitforstatus,chd->trycnt+1);
1094 }
1095 zassert(pthread_mutex_unlock(&(ind->lock)));
1096 break;
1097 }
1098 }
1099 if (lastblock==0.0) {
1100 lbdiff = NEXT_BLOCK_DELAY; // first block should be send immediately
1101 } else {
1102 lbdiff = now - lastblock;
1103 }
1104 workingtime = now - start;
1105
1106 chd->waitingworker=1;
1107 chd->wakeup_fd = pipefd[1];
1108
1109 if (sending_mode==0 && workingtime<WORKER_BUSY_LAST_SEND_TIMEOUT+((wtotal>HEAVYLOAD_WORKERS)?0:WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT) && waitforstatus<64) {
1110 if (cb==NULL) {
1111 ncb = chd->datachainhead;
1112 } else {
1113 ncb = cb->next;
1114 }
1115 if (ncb) {
1116 if (ncb->to-ncb->from==MFSBLOCKSIZE || lbdiff>=NEXT_BLOCK_DELAY || ncb->next!=NULL || ind->flushwaiting) {
1117 cb = ncb;
1118 sending_mode = 2;
1119 } else {
1120 chd->waitingworker = 2; // wait for block expand
1121 chd->wakeup_fd = pipefd[1];
1122 }
1123 }
1124 if (sending_mode==2) {
1125 uint64_t offset_from;
1126 uint64_t offset_to;
1127
1128 cb->writeid = nextwriteid++;
1129 // debug: syslog(LOG_NOTICE,"writeworker: data packet prepared (writeid:%"PRIu32",pos:%"PRIu16")",cb->writeid,cb->pos);
1130 waitforstatus++;
1131 wptr = sendbuff;
1132 put32bit(&wptr,CLTOCS_WRITE_DATA);
1133 put32bit(&wptr,24+(cb->to-cb->from));
1134 put64bit(&wptr,chunkid);
1135 put32bit(&wptr,cb->writeid);
1136 put16bit(&wptr,cb->pos);
1137 put16bit(&wptr,cb->from);
1138 put32bit(&wptr,cb->to-cb->from);
1139 put32bit(&wptr,mycrc32(0,cb->data+cb->from,cb->to-cb->from));
1140 #ifdef WORKER_DEBUG
1141 if (cb->to-cb->from<MFSBLOCKSIZE) {
1142 partialblocks++;
1143 }
1144 bytessent+=(cb->to-cb->from);
1145 #endif
1146 sent = 0;
1147 lastblock = now;
1148 lastsent = now;
1149 offset_from = chindx;
1150 offset_from <<= MFSCHUNKBITS;
1151 offset_from += cb->pos*MFSBLOCKSIZE;
1152 offset_to = offset_from;
1153 offset_from += cb->from;
1154 offset_to += cb->to;
1155 if (valid_offsets) {
1156 if (offset_from < min_offset) {
1157 min_offset = offset_from;
1158 }
1159 if (offset_to > max_offset) {
1160 max_offset = offset_to;
1161 }
1162 } else {
1163 min_offset = offset_from;
1164 max_offset = offset_to;
1165 valid_offsets = 1;
1166 }
1167 } else if (lastsent+WORKER_NOP_INTERVAL<now && chainminver>=VERSION2INT(1,7,32)) {
1168 wptr = sendbuff;
1169 put32bit(&wptr,ANTOAN_NOP);
1170 put32bit(&wptr,0);
1171 sent = 0;
1172 sending_mode = 3;
1173 lastsent = now;
1174 }
1175 }
1176
1177 #ifdef WORKER_DEBUG
1178 fprintf(stderr,"workerloop: waitforstatus:%u workingtime:%.6lf workers_total:%u lbdiff:%.6lf donotstayidle:%u\n",waitforstatus,workingtime,wtotal,lbdiff,donotstayidle);
1179 #endif
1180 if (waitforstatus>0) {
1181 if (workingtime>WORKER_BUSY_LAST_SEND_TIMEOUT+WORKER_BUSY_WAIT_FOR_STATUS+((wtotal>HEAVYLOAD_WORKERS)?0:WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT)) { // timeout
1182 chd->waitingworker = 0;
1183 chd->wakeup_fd = -1;
1184 zassert(pthread_mutex_unlock(&(ind->lock)));
1185 break;
1186 }
1187 } else {
1188 if (lbdiff>=WORKER_IDLE_TIMEOUT || donotstayidle || wtotal>HEAVYLOAD_WORKERS) {
1189 chd->waitingworker = 0;
1190 chd->wakeup_fd = -1;
1191 zassert(pthread_mutex_unlock(&(ind->lock)));
1192 break;
1193 }
1194 }
1195
1196
1197 zassert(pthread_mutex_unlock(&(ind->lock)));
1198
1199 switch (sending_mode) {
1200 case 1:
1201 if (sent<hdrtosend) {
1202 #ifdef HAVE_WRITEV
1203 if (cschainsize>0) {
1204 siov[0].iov_base = (void*)(sendbuff+sent);
1205 siov[0].iov_len = hdrtosend-sent;
1206 siov[1].iov_base = (void*)cschain; // discard const (safe - because it's used in writev)
1207 siov[1].iov_len = cschainsize;
1208 i = writev(fd,siov,2);
1209 } else {
1210 #endif
1211 i = universal_write(fd,sendbuff+sent,hdrtosend-sent);
1212 #ifdef HAVE_WRITEV
1213 }
1214 #endif
1215 } else {
1216 i = universal_write(fd,cschain+(sent-hdrtosend),cschainsize-(sent-hdrtosend));
1217 }
1218 if (i>=0) {
1219 sent+=i;
1220 if (sent==hdrtosend+cschainsize) {
1221 sending_mode = 0;
1222 }
1223 }
1224 break;
1225 case 2:
1226 if (sent<32) {
1227 #ifdef HAVE_WRITEV
1228 siov[0].iov_base = (void*)(sendbuff+sent);
1229 siov[0].iov_len = 32-sent;
1230 siov[1].iov_base = (void*)(cb->data+cb->from);
1231 siov[1].iov_len = cb->to-cb->from;
1232 i = writev(fd,siov,2);
1233 #else
1234 i = universal_write(fd,sendbuff+sent,32-sent);
1235 #endif
1236 } else {
1237 i = universal_write(fd,cb->data+cb->from+(sent-32),cb->to-cb->from-(sent-32));
1238 }
1239 if (i>=0) {
1240 sent+=i;
1241 if (sent==32+cb->to-cb->from) {
1242 sending_mode = 0;
1243 }
1244 }
1245 break;
1246 case 3:
1247 i = universal_write(fd,sendbuff+sent,8-sent);
1248 if (i>=0) {
1249 sent+=i;
1250 if (sent==8) {
1251 sending_mode = 0;
1252 }
1253 }
1254 break;
1255 default:
1256 i=0;
1257 }
1258
1259 if (i<0) {
1260 if (ERRNO_ERROR && errno!=EINTR) {
1261 if (chd->trycnt >= minlogretry) {
1262 write_prepare_ip(csstrip,ip);
1263 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: write to (%s:%"PRIu16") error: %s / NEGWRITE (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,csstrip,port,strerr(errno),waitforstatus,chd->trycnt+1);
1264 }
1265 status=EIO;
1266 break;
1267 }
1268 }
1269
1270 pfd[0].events = POLLIN | (sending_mode?POLLOUT:0);
1271 pfd[0].revents = 0;
1272 pfd[1].events = POLLIN;
1273 pfd[1].revents = 0;
1274 if (poll(pfd,2,100)<0) { /* correct timeout - in msec */
1275 if (errno!=EINTR) {
1276 if (chd->trycnt >= minlogretry) {
1277 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: poll error: %s (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,strerr(errno),waitforstatus,chd->trycnt+1);
1278 }
1279 status=EIO;
1280 break;
1281 }
1282 }
1283 zassert(pthread_mutex_lock(&(ind->lock))); // make helgrind happy
1284 chd->waitingworker = 0;
1285 chd->wakeup_fd = -1;
1286 donotstayidle = (ind->flushwaiting>0 || ind->status!=0 || ind->chunkscnt>=MAX_SIM_CHUNKS)?1:0;
1287 zassert(pthread_mutex_unlock(&(ind->lock))); // make helgrind happy
1288 if (pfd[1].revents&POLLIN) { // used just to break poll - so just read all data from pipe to empty it
1289 i = universal_read(pipefd[0],pipebuff,1024);
1290 if (i<0) { // mainly to make happy static code analyzers
1291 if (chd->trycnt >= minlogretry) {
1292 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: read pipe error: %s (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,strerr(errno),waitforstatus,chd->trycnt+1);
1293 }
1294 }
1295 }
1296 if (pfd[0].revents&POLLHUP) {
1297 if (chd->trycnt >= minlogretry) {
1298 write_prepare_ip(csstrip,ip);
1299 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") was reset by peer / POLLHUP (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,csstrip,port,waitforstatus,chd->trycnt+1);
1300 }
1301 status=EIO;
1302 break;
1303 }
1304 if (pfd[0].revents&POLLERR) {
1305 if (chd->trycnt >= minlogretry) {
1306 write_prepare_ip(csstrip,ip);
1307 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") got error status / POLLERR (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,csstrip,port,waitforstatus,chd->trycnt+1);
1308 }
1309 status=EIO;
1310 break;
1311 }
1312 if (pfd[0].revents&POLLIN) {
1313 i = universal_read(fd,recvbuff+rcvd,21-rcvd);
1314 if (i==0) { // connection reset by peer or read error
1315 if (chd->trycnt >= minlogretry) {
1316 write_prepare_ip(csstrip,ip);
1317 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") was reset by peer / ZEROREAD (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,csstrip,port,waitforstatus,chd->trycnt+1);
1318 }
1319 status=EIO;
1320 break;
1321 }
1322 if (i<0) {
1323 if (errno!=EINTR) {
1324 if (chd->trycnt >= minlogretry) {
1325 write_prepare_ip(csstrip,ip);
1326 syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: read from (%s:%"PRIu16") error: %s (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,csstrip,port,strerr(errno),waitforstatus,chd->trycnt+1);
1327 }
1328 status=EIO;
1329 break;
1330 } else {
1331 i=0;
1332 }
1333 }
1334 lastrcvd = monotonic_seconds();
1335 rcvd+=i;
1336 // do not accept ANTOAN_UNKNOWN_COMMAND and ANTOAN_BAD_COMMAND_SIZE here - only ANTOAN_NOP
1337 if (rcvd>=8 && recvbuff[7]==0 && recvbuff[6]==0 && recvbuff[5]==0 && recvbuff[4]==0 && recvbuff[3]==0 && recvbuff[2]==0 && recvbuff[1]==0 && recvbuff[0]==0) { // ANTOAN_NOP packet received - skip it
1338 if (rcvd>8) {
1339 memmove(recvbuff,recvbuff+8,rcvd-8);
1340 rcvd-=8;
1341 }
1342 }
1343 if (rcvd==21) {
1344 rptr = recvbuff;
1345 reccmd = get32bit(&rptr);
1346 recleng = get32bit(&rptr);
1347 recchunkid = get64bit(&rptr);
1348 recwriteid = get32bit(&rptr);
1349 recstatus = get8bit(&rptr);
1350 if (reccmd!=CSTOCL_WRITE_STATUS || recleng!=13) {
1351 syslog(LOG_WARNING,"writeworker: got unrecognized packet from chunkserver (cmd:%"PRIu32",leng:%"PRIu32")",reccmd,recleng);
1352 status=EIO;
1353 break;
1354 }
1355 if (recchunkid!=chunkid) {
1356 syslog(LOG_WARNING,"writeworker: got unexpected packet (expected chunkdid:%016"PRIX64",packet chunkid:%016"PRIX64")",chunkid,recchunkid);
1357 status=EIO;
1358 break;
1359 }
1360 if (recstatus!=MFS_STATUS_OK) {
1361 if (chd->trycnt >= minlogretry) {
1362 syslog(LOG_WARNING,"writeworker: write error: %s",mfsstrerr(recstatus));
1363 }
1364 wrstatus=recstatus;
1365 break;
1366 }
1367 // debug: syslog(LOG_NOTICE,"writeworker: received status ok for writeid:%"PRIu32,recwriteid);
1368 if (recwriteid>0) {
1369 zassert(pthread_mutex_lock(&(ind->lock)));
1370 for (rcb = chd->datachainhead ; rcb && rcb->writeid!=recwriteid ; rcb=rcb->next) {}
1371 if (rcb==NULL) {
1372 syslog(LOG_WARNING,"writeworker: got unexpected status (writeid:%"PRIu32")",recwriteid);
1373 zassert(pthread_mutex_unlock(&(ind->lock)));
1374 status=EIO;
1375 break;
1376 }
1377 if (rcb==cb) { // current block
1378 // debug: syslog(LOG_NOTICE,"writeworker: received status for current block");
1379 if (sending_mode==2) { // got status ok before all data had been sent - error
1380 syslog(LOG_WARNING,"writeworker: got status OK before all data have been sent");
1381 zassert(pthread_mutex_unlock(&(ind->lock)));
1382 status=EIO;
1383 break;
1384 } else {
1385 cb = NULL;
1386 }
1387 }
1388 if (rcb->prev) {
1389 rcb->prev->next = rcb->next;
1390 } else {
1391 chd->datachainhead = rcb->next;
1392 }
1393 if (rcb->next) {
1394 rcb->next->prev = rcb->prev;
1395 } else {
1396 chd->datachaintail = rcb->prev;
1397 }
1398 maxwroffset = (((uint64_t)(chindx))<<MFSCHUNKBITS)+(((uint32_t)(rcb->pos))<<MFSBLOCKBITS)+rcb->to;
1399 if (maxwroffset>mfleng) {
1400 mfleng=maxwroffset;
1401 }
1402 write_cb_release(ind,rcb);
1403 zassert(pthread_mutex_unlock(&(ind->lock)));
1404 }
1405 waitforstatus--;
1406 rcvd=0;
1407 }
1408 }
1409 } while (1);
1410
1411 if (waitforstatus==0 && chainminver>=VERSION2INT(1,7,32)) {
1412 wptr = sendbuff;
1413 put32bit(&wptr,CLTOCS_WRITE_FINISH);
1414 put32bit(&wptr,12);
1415 put64bit(&wptr,chunkid);
1416 put32bit(&wptr,version);
1417 if (universal_write(fd,sendbuff,20)==20) {
1418 conncache_insert(ip,port,fd);
1419 } else {
1420 tcpclose(fd);
1421 }
1422 } else {
1423 tcpclose(fd);
1424 }
1425
1426 #ifdef WORKER_DEBUG
1427 now = monotonic_seconds();
1428 workingtime = now - start;
1429
1430 cl=0;
1431 for (cnt=0 ; cnt<chainelements ; cnt++) {
1432 cl+=snprintf(debugchain+cl,200-cl,"%u.%u.%u.%u:%u->",(chain[cnt].ip>>24)&255,(chain[cnt].ip>>16)&255,(chain[cnt].ip>>8)&255,chain[cnt].ip&255,chain[cnt].port);
1433 }
1434 if (cl>=2) {
1435 debugchain[cl-2]='\0';
1436 }
1437 syslog(LOG_NOTICE,"worker %lu sent %"PRIu32" blocks (%"PRIu32" partial) of chunk %016"PRIX64"_%08"PRIX32", received status for %"PRIu32" blocks (%"PRIu32" lost), bw: %.6lfMB/s ( %"PRIu32" B / %.6lf s ), chain: %s",(unsigned long)arg,nextwriteid-1,partialblocks,chunkid,version,nextwriteid-1-waitforstatus,waitforstatus,(double)bytessent/workingtime,bytessent,workingtime,debugchain);
1438 #endif
1439
1440 if (status!=0 || wrstatus!=MFS_STATUS_OK) {
1441 if (wrstatus!=MFS_STATUS_OK) { // convert MFS status to OS errno
1442 if (wrstatus==MFS_ERROR_NOSPACE) {
1443 status=ENOSPC;
1444 } else {
1445 status=EIO;
1446 }
1447 }
1448 } else if (nextwriteid-1 == waitforstatus) { // nothing has been written - treat it as EIO
1449 status=EIO;
1450 }
1451
1452 zassert(pthread_mutex_lock(&(ind->lock))); // make helgrind happy
1453 unbreakable = chd->unbreakable;
1454
1455 if (optimeout>0.0 && monotonic_seconds() - opbegin > optimeout) {
1456 unbreakable = 0;
1457 } else if (status!=0) {
1458 if (wrstatus!=MFS_ERROR_NOTDONE) {
1459 chd->trycnt++;
1460 }
1461 if (chd->trycnt>=maxretries) {
1462 unbreakable = 0;
1463 }
1464 } else {
1465 unbreakable = 0; // operation finished
1466 }
1467 chd->continueop = unbreakable;
1468 zassert(pthread_mutex_unlock(&(ind->lock)));
1469 if (unbreakable==0) {
1470 // for (cnt=0 ; cnt<10 ; cnt++) {
1471 westatus = fs_writeend(chunkid,inode,chindx,mfleng,0);
1472 // if (westatus==MFS_ERROR_ENOENT || westatus==MFS_ERROR_QUOTA) { // can't change -> do not repeat
1473 // break;
1474 // } else if (westatus!=MFS_STATUS_OK) {
1475 // if (optimeout>0.0 && monotonic_seconds() - opbegin > optimeout) {
1476 // westatus = MFS_ERROR_EIO;
1477 // break;
1478 // }
1479 // portable_usleep(100000+(10000<<cnt));
1480 // zassert(pthread_mutex_lock(&(ind->lock))); // make helgrind happy
1481 // zassert(pthread_mutex_unlock(&(ind->lock)));
1482 // } else {
1483 // break;
1484 // }
1485 // }
1486 } else {
1487 westatus = MFS_STATUS_OK;
1488 }
1489
1490 if (optimeout>0.0 && monotonic_seconds() - opbegin > optimeout) {
1491 write_job_end(chd,EIO,0);
1492 } else if (westatus==MFS_ERROR_ENOENT) {
1493 write_job_end(chd,EBADF,0);
1494 } else if (westatus==MFS_ERROR_QUOTA) {
1495 write_job_end(chd,EDQUOT,0);
1496 } else if (westatus==MFS_ERROR_IO) {
1497 write_job_end(chd,EIO,0);
1498 } else if (westatus!=MFS_STATUS_OK) {
1499 write_job_end(chd,ENXIO,0);
1500 } else {
1501 if (status!=0) {
1502 if (chd->trycnt>=maxretries) {
1503 write_job_end(chd,status,0);
1504 } else {
1505 if (wrstatus==MFS_ERROR_NOTDONE) {
1506 write_job_end(chd,0,300000);
1507 } else {
1508 write_job_end(chd,0,1000+((chd->trycnt<30)?((chd->trycnt-1)*300000):10000000));
1509 }
1510 }
1511 } else {
1512 if (valid_offsets) {
1513 read_inode_clear_cache(inode,min_offset,max_offset-min_offset);
1514 }
1515 // read_inode_set_length_async(inode,mfleng,0);
1516 write_job_end(chd,0,0);
1517 }
1518 }
1519 chunkrwlock_wunlock(inode,chindx);
1520 }
1521 return NULL;
1522 }
1523
write_data_init(uint32_t cachesize,uint32_t retries,uint32_t timeout,uint32_t logretry,uint8_t erronlostchunk,uint8_t erronnospace)1524 void write_data_init (uint32_t cachesize,uint32_t retries,uint32_t timeout,uint32_t logretry,uint8_t erronlostchunk,uint8_t erronnospace) {
1525 uint32_t i;
1526 size_t mystacksize;
1527 // sigset_t oldset;
1528 // sigset_t newset;
1529
1530 erroronlostchunk = erronlostchunk;
1531 erroronnospace = erronnospace;
1532 cacheblockcount = (cachesize/MFSBLOCKSIZE);
1533 maxretries = retries;
1534 if (optimeout>0) {
1535 optimeout = timeout;
1536 } else {
1537 optimeout = 0.0;
1538 }
1539 minlogretry = logretry;
1540 if (cacheblockcount<10) {
1541 cacheblockcount=10;
1542 }
1543 zassert(pthread_mutex_init(&hashlock,NULL));
1544 zassert(pthread_mutex_init(&workerslock,NULL));
1545 zassert(pthread_cond_init(&worker_term_cond,NULL));
1546 worker_term_waiting = 0;
1547
1548 zassert(pthread_mutex_init(&fcblock,NULL));
1549 zassert(pthread_cond_init(&fcbcond,NULL));
1550 fcbwaiting=0;
1551 cacheblocks = malloc(sizeof(cblock)*cacheblockcount);
1552 passert(cacheblocks);
1553 for (i=0 ; i<cacheblockcount-1 ; i++) {
1554 cacheblocks[i].next = cacheblocks+(i+1);
1555 }
1556 cacheblocks[cacheblockcount-1].next = NULL;
1557 freecblockshead = cacheblocks;
1558 freecacheblocks = cacheblockcount;
1559
1560 idhash = malloc(sizeof(inodedata*)*IDHASHSIZE);
1561 passert(idhash);
1562 for (i=0 ; i<IDHASHSIZE ; i++) {
1563 idhash[i]=NULL;
1564 }
1565
1566 // dqueue = queue_new(0);
1567 jqueue = queue_new(0);
1568
1569 zassert(pthread_attr_init(&worker_thattr));
1570 #ifdef PTHREAD_STACK_MIN
1571 mystacksize = PTHREAD_STACK_MIN;
1572 if (mystacksize < 0x20000) {
1573 mystacksize = 0x20000;
1574 }
1575 #else
1576 mystacksize = 0x20000;
1577 #endif
1578 zassert(pthread_attr_setstacksize(&worker_thattr,mystacksize));
1579
1580 // sigemptyset(&newset);
1581 // sigaddset(&newset, SIGTERM);
1582 // sigaddset(&newset, SIGINT);
1583 // sigaddset(&newset, SIGHUP);
1584 // sigaddset(&newset, SIGQUIT);
1585 // zassert(pthread_sigmask(SIG_BLOCK, &newset, &oldset));
1586 // zassert(pthread_create(&dqueue_worker_th,&worker_thattr,write_dqueue_worker,NULL));
1587 // zassert(pthread_sigmask(SIG_SETMASK, &oldset, NULL));
1588
1589 zassert(pthread_mutex_lock(&workerslock));
1590 workers_avail = 0;
1591 workers_total = 0;
1592 write_data_spawn_worker();
1593 zassert(pthread_mutex_unlock(&workerslock));
1594 #ifdef BUFFER_DEBUG
1595 zassert(pthread_create(&info_worker_th,&worker_thattr,write_info_worker,NULL));
1596 #endif
1597 }
1598
write_data_term(void)1599 void write_data_term(void) {
1600 uint32_t i;
1601 inodedata *ind,*indn;
1602 chunkdata *chd,*chdn;
1603
1604 // queue_close(dqueue);
1605 queue_close(jqueue);
1606 zassert(pthread_mutex_lock(&workerslock));
1607 while (workers_total>0) {
1608 worker_term_waiting++;
1609 zassert(pthread_cond_wait(&worker_term_cond,&workerslock));
1610 }
1611 zassert(pthread_mutex_unlock(&workerslock));
1612 // zassert(pthread_join(dqueue_worker_th,NULL));
1613 // queue_delete(dqueue);
1614 queue_delete(jqueue);
1615 zassert(pthread_mutex_lock(&hashlock));
1616 for (i=0 ; i<IDHASHSIZE ; i++) {
1617 for (ind = idhash[i] ; ind ; ind = indn) {
1618 indn = ind->next;
1619 zassert(pthread_mutex_lock(&(ind->lock)));
1620 chd = ind->chunks;
1621 while (chd) {
1622 chdn = chd->next;
1623 write_free_chunkdata(chd);
1624 chd = chdn;
1625 }
1626 zassert(pthread_mutex_unlock(&(ind->lock)));
1627 zassert(pthread_cond_destroy(&(ind->flushcond)));
1628 zassert(pthread_cond_destroy(&(ind->writecond)));
1629 zassert(pthread_mutex_destroy(&(ind->lock)));
1630 free(ind);
1631 }
1632 }
1633 free(idhash);
1634 zassert(pthread_mutex_unlock(&hashlock));
1635 free(cacheblocks);
1636 zassert(pthread_attr_destroy(&worker_thattr));
1637 zassert(pthread_cond_destroy(&worker_term_cond));
1638 zassert(pthread_cond_destroy(&fcbcond));
1639 zassert(pthread_mutex_destroy(&fcblock));
1640 zassert(pthread_mutex_destroy(&workerslock));
1641 zassert(pthread_mutex_destroy(&hashlock));
1642 }
1643
/*
 * Tries to merge data for the byte range [from,to) into an existing cache
 * block.
 *
 * Returns -1 without touching anything in two cases:
 * - the block already has a writeid (a worker has assigned it to a write
 *   packet);
 * - the new range neither overlaps nor touches the block's current [from,to).
 *
 * Otherwise it copies the data, widens the block's range and returns 0. If the
 * block becomes a full MFSBLOCKSIZE block, is the last one in the chain and the
 * worker is waiting for it to grow (waitingworker==2), the worker is woken
 * through its pipe.
 *
 * NOTE(review): expected to be called with the inode lock held, as
 * write_block does.
 */
int write_cb_expand(chunkdata *chd,cblock *cb,uint32_t from,uint32_t to,const uint8_t *data) {
	if (cb->writeid>0) {
		return -1; // already taken by a worker
	}
	if (from>cb->to || to<cb->from) {
		return -1; // disjoint ranges - can't merge
	}
	memcpy(cb->data+from,data,to-from);
	cb->from = (from<cb->from) ? from : cb->from;
	cb->to = (to>cb->to) ? to : cb->to;

	if (chd->waitingworker==2 && cb->next==NULL && cb->to-cb->from==MFSBLOCKSIZE) {
		if (universal_write(chd->wakeup_fd," ",1)!=1) {
			syslog(LOG_ERR,"can't write to pipe !!!");
		}
		chd->waitingworker = 0;
		chd->wakeup_fd = -1;
	}
	return 0;
}
1664
/*
 * Queues data for block `pos` (byte range [from,to) within that block) of
 * chunk `chindx` of the given inode.
 *
 * It first tries to extend the newest cache block in the chunk's chain that
 * has the same pos. If that is not possible, a fresh cache block is appended
 * to the chain, and the chunkdata is created first when the chunk has none
 * yet. For a new chunk, write_test_chunkdata() is called. For an existing
 * chunk, a waiting worker is woken through its pipe.
 *
 * A cache block is acquired before taking ind->lock and released again if it
 * turns out to be unneeded. Always returns 0.
 */
int write_block(inodedata *ind,uint32_t chindx,uint16_t pos,uint32_t from,uint32_t to,const uint8_t *data,uint8_t superuser) {
	cblock *blk,*freshblk;
	chunkdata *chunk;
	uint8_t created;

	freshblk = write_cb_acquire(ind);
	zassert(pthread_mutex_lock(&(ind->lock)));

	// locate chunkdata for this chunk index
	chunk = ind->chunks;
	while (chunk!=NULL && chunk->chindx!=chindx) {
		chunk = chunk->next;
	}

	if (chunk!=NULL) {
		if (superuser) {
			chunk->superuser = 1;
		}
		// look for the newest block with the same position and try to grow it
		blk = chunk->datachaintail;
		while (blk!=NULL && blk->pos!=pos) {
			blk = blk->prev;
		}
		if (blk!=NULL && write_cb_expand(chunk,blk,from,to,data)==0) {
			write_cb_release(ind,freshblk);
			zassert(pthread_mutex_unlock(&(ind->lock)));
			return 0;
		}
	}

	freshblk->pos = pos;
	freshblk->from = from;
	freshblk->to = to;
	memcpy(freshblk->data+from,data,to-from);

	created = 0;
	if (chunk==NULL) {
		chunk = write_new_chunkdata(ind,chindx);
		if (superuser) {
			chunk->superuser = 1;
		}
		created = 1;
	}

	// append to the tail of the chunk's data chain
	freshblk->prev = chunk->datachaintail;
	freshblk->next = NULL;
	if (chunk->datachaintail==NULL) {
		chunk->datachainhead = freshblk;
	} else {
		chunk->datachaintail->next = freshblk;
	}
	chunk->datachaintail = freshblk;

	if (created) {
		write_test_chunkdata(ind);
	} else if (chunk->waitingworker) {
		if (universal_write(chunk->wakeup_fd," ",1)!=1) {
			syslog(LOG_ERR,"can't write to pipe !!!");
		}
		chunk->waitingworker = 0;
		chunk->wakeup_fd = -1;
	}
	zassert(pthread_mutex_unlock(&(ind->lock)));
	return 0;
}
1725
write_data(void * vid,uint64_t offset,uint32_t size,const uint8_t * data,uint8_t superuser)1726 int write_data(void *vid,uint64_t offset,uint32_t size,const uint8_t *data,uint8_t superuser) {
1727 uint32_t chindx;
1728 uint16_t pos;
1729 uint32_t from;
1730 int status;
1731 inodedata *ind = (inodedata*)vid;
1732 if (ind==NULL) {
1733 return EIO;
1734 }
1735 // int64_t s,e;
1736
1737 // s = monotonic_useconds();
1738 zassert(pthread_mutex_lock(&(ind->lock)));
1739
1740 // syslog(LOG_NOTICE,"write_data: inode:%"PRIu32" offset:%"PRIu64" size:%"PRIu32,ind->inode,offset,size);
1741 status = ind->status;
1742 if (status==0) {
1743 if (offset+size>ind->maxfleng) { // move fleng
1744 ind->maxfleng = offset+size;
1745 }
1746 ind->writewaiting++;
1747 while (ind->flushwaiting>0) {
1748 zassert(pthread_cond_wait(&(ind->writecond),&(ind->lock)));
1749 }
1750 ind->writewaiting--;
1751 }
1752 zassert(pthread_mutex_unlock(&(ind->lock)));
1753 if (status!=0) {
1754 return status;
1755 }
1756
1757 chindx = offset>>MFSCHUNKBITS;
1758 pos = (offset&MFSCHUNKMASK)>>MFSBLOCKBITS;
1759 from = offset&MFSBLOCKMASK;
1760 while (size>0) {
1761 if (size>MFSBLOCKSIZE-from) {
1762 if (write_block(ind,chindx,pos,from,MFSBLOCKSIZE,data,superuser)<0) {
1763 return EIO;
1764 }
1765 size -= (MFSBLOCKSIZE-from);
1766 data += (MFSBLOCKSIZE-from);
1767 from = 0;
1768 pos++;
1769 if (pos==1024) {
1770 pos = 0;
1771 chindx++;
1772 }
1773 } else {
1774 if (write_block(ind,chindx,pos,from,from+size,data,superuser)<0) {
1775 return EIO;
1776 }
1777 size = 0;
1778 }
1779 }
1780 // e = monotonic_useconds();
1781 // syslog(LOG_NOTICE,"write_data time: %"PRId64,e-s);
1782 return 0;
1783 }
1784
/*
 * Returns an opaque write handle for `inode` (with initial length `fleng`),
 * as produced by write_get_inodedata(). NULL is passed through unchanged.
 */
void* write_data_new(uint32_t inode,uint64_t fleng) {
	return write_get_inodedata(inode,fleng);
}
1793
/*
 * Waits (on chunkcond) until every chunkdata of the inode has chunkready set,
 * or the inode gets a nonzero status. Then marks all current chunkdata as
 * unbreakable and returns the inode status.
 */
static int write_data_do_chunk_wait(inodedata *ind) {
	chunkdata *pending;
	int result;
#ifdef WDEBUG
	int64_t tstart,tend;

	tstart = monotonic_useconds();
#endif
	zassert(pthread_mutex_lock(&(ind->lock)));
	while (ind->status==0) {
		// find the first chunk that is not ready yet
		pending = ind->chunks;
		while (pending!=NULL && pending->chunkready) {
			pending = pending->next;
		}
		if (pending==NULL) {
			break;
		}
#ifdef WDEBUG
		syslog(LOG_NOTICE,"(inode:%"PRIu32") chunk_ready: wait ...",ind->inode);
#endif
		zassert(pthread_cond_wait(&(ind->chunkcond),&(ind->lock)));
#ifdef WDEBUG
		syslog(LOG_NOTICE,"(inode:%"PRIu32") chunk_ready: woken up",ind->inode);
#endif
	}
	for (pending = ind->chunks ; pending!=NULL ; pending = pending->next) {
		pending->unbreakable = 1;
	}
	result = ind->status;
	zassert(pthread_mutex_unlock(&(ind->lock)));
#ifdef WDEBUG
	tend = monotonic_useconds();
	syslog(LOG_NOTICE,"flush time: %"PRId64,tend-tstart);
#endif
	return result;
}
1831
/*
 * Returns the inode's chunkscnt, read under ind->lock. The callers treat a
 * nonzero value as a hint that a flush would have to wait.
 */
static int write_data_will_flush_wait(inodedata *ind) {
	int pending;

	zassert(pthread_mutex_lock(&(ind->lock)));
	pending = ind->chunkscnt;
	zassert(pthread_mutex_unlock(&(ind->lock)));
	return pending;
}
1839
/*
 * Flushes all queued writes of the inode.
 *
 * Registers as a flush waiter, then loops until chunkscnt drops to zero: in
 * each pass it wakes every worker that is waiting on the inode's chunks
 * (through their pipes) and sleeps on flushcond. Afterwards, if this was the
 * last flush waiter and writers are waiting, they are released via writecond.
 *
 * If `releaseflag` is set, the inodedata is released with
 * write_free_inodedata(). Returns the inode status read before that release.
 */
static int write_data_do_flush(inodedata *ind,uint8_t releaseflag) {
	chunkdata *c;
	int result;
#ifdef WDEBUG
	int64_t tstart,tend;

	tstart = monotonic_useconds();
#endif
	zassert(pthread_mutex_lock(&(ind->lock)));
	ind->flushwaiting++;
	while (ind->chunkscnt>0) {
		// kick idle workers so they send remaining data immediately
		c = ind->chunks;
		while (c!=NULL) {
			if (c->waitingworker) {
				if (universal_write(c->wakeup_fd," ",1)!=1) {
					syslog(LOG_ERR,"can't write to pipe !!!");
				}
				c->waitingworker = 0;
				c->wakeup_fd = -1;
			}
			c = c->next;
		}
#ifdef WDEBUG
		syslog(LOG_NOTICE,"(inode:%"PRIu32") flush: wait ...",ind->inode);
#endif
		zassert(pthread_cond_wait(&(ind->flushcond),&(ind->lock)));
#ifdef WDEBUG
		syslog(LOG_NOTICE,"(inode:%"PRIu32") flush: woken up",ind->inode);
#endif
	}
	ind->flushwaiting--;
	if (ind->flushwaiting==0 && ind->writewaiting>0) {
		zassert(pthread_cond_broadcast(&(ind->writecond)));
	}
	result = ind->status;
	zassert(pthread_mutex_unlock(&(ind->lock)));
	if (releaseflag) {
		write_free_inodedata(ind);
	}
#ifdef WDEBUG
	tend = monotonic_useconds();
	syslog(LOG_NOTICE,"flush time: %"PRId64,tend-tstart);
#endif
	return result;
}
1883
write_data_flush(void * vid)1884 int write_data_flush(void *vid) {
1885 if (vid==NULL) {
1886 return EIO;
1887 }
1888 return write_data_do_flush((inodedata*)vid,0);
1889 }
1890
write_data_chunk_wait(void * vid)1891 int write_data_chunk_wait(void *vid) {
1892 if (vid==NULL) {
1893 return EIO;
1894 }
1895 return write_data_do_chunk_wait((inodedata*)vid);
1896 }
1897
write_data_inode_setmaxfleng(uint32_t inode,uint64_t maxfleng)1898 void write_data_inode_setmaxfleng(uint32_t inode,uint64_t maxfleng) {
1899 inodedata* ind;
1900 ind = write_find_inodedata(inode);
1901 if (ind) {
1902 zassert(pthread_mutex_lock(&(ind->lock)));
1903 ind->maxfleng = maxfleng;
1904 zassert(pthread_mutex_unlock(&(ind->lock)));
1905 write_free_inodedata(ind);
1906 }
1907 }
1908
write_data_inode_getmaxfleng(uint32_t inode)1909 uint64_t write_data_inode_getmaxfleng(uint32_t inode) {
1910 uint64_t maxfleng;
1911 inodedata* ind;
1912 ind = write_find_inodedata(inode);
1913 if (ind) {
1914 zassert(pthread_mutex_lock(&(ind->lock)));
1915 maxfleng = ind->maxfleng;
1916 zassert(pthread_mutex_unlock(&(ind->lock)));
1917 write_free_inodedata(ind);
1918 } else {
1919 maxfleng = 0;
1920 }
1921 return maxfleng;
1922 }
1923
write_data_getmaxfleng(void * vid)1924 uint64_t write_data_getmaxfleng(void *vid) {
1925 uint64_t maxfleng;
1926 inodedata* ind;
1927 if (vid==NULL) {
1928 return 0;
1929 }
1930 ind = (inodedata*)vid;
1931 zassert(pthread_mutex_lock(&(ind->lock)));
1932 maxfleng = ind->maxfleng;
1933 zassert(pthread_mutex_unlock(&(ind->lock)));
1934 return maxfleng;
1935 }
1936
write_data_flush_inode(uint32_t inode)1937 int write_data_flush_inode(uint32_t inode) {
1938 inodedata* ind;
1939 int ret;
1940 ind = write_find_inodedata(inode);
1941 if (ind==NULL) {
1942 return 0;
1943 }
1944 ret = write_data_do_flush(ind,1);
1945 return ret;
1946 }
1947
write_data_will_end_wait(void * vid)1948 int write_data_will_end_wait(void *vid) {
1949 if (vid!=NULL) {
1950 return write_data_will_flush_wait((inodedata*)vid);
1951 } else {
1952 return 0;
1953 }
1954 }
1955
write_data_end(void * vid)1956 int write_data_end(void *vid) {
1957 int ret;
1958 if (vid==NULL) {
1959 return EIO;
1960 }
1961 ret = write_data_do_flush((inodedata*)vid,1);
1962 return ret;
1963 }
1964