1 /*
2  * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <sys/types.h>
26 #ifdef HAVE_WRITEV
27 #include <sys/uio.h>
28 #endif
29 #include <sys/time.h>
30 #include <unistd.h>
31 #ifndef WIN32
32 #include <poll.h>
33 #include <syslog.h>
34 #endif
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <errno.h>
39 #include <limits.h>
40 #include <signal.h>
41 #include <pthread.h>
42 #include <inttypes.h>
43 
44 #include "massert.h"
45 #include "datapack.h"
46 #include "crc.h"
47 #include "strerr.h"
48 #include "mfsstrerr.h"
49 #include "pcqueue.h"
50 #include "sockets.h"
51 #include "conncache.h"
52 #include "csorder.h"
53 #include "csdb.h"
54 #include "delayrun.h"
55 #include "mastercomm.h"
56 #include "clocks.h"
57 #include "portable.h"
58 #include "readdata.h"
59 #include "chunkrwlock.h"
60 #include "chunksdatacache.h"
61 #include "MFSCommunication.h"
62 #ifdef MFSMOUNT
63 #include "fdcache.h"
64 #endif
65 
66 // #define WORKER_DEBUG 1
67 // #define BUFFER_DEBUG 1
68 // #define WDEBUG 1
69 
70 #ifndef EDQUOT
71 #define EDQUOT ENOSPC
72 #endif
73 
74 // for Nagle's-like algorithm
75 #define NEXT_BLOCK_DELAY 0.05
76 
77 #define CHUNKSERVER_ACTIVITY_TIMEOUT 2.0
78 
79 #define WORKER_IDLE_TIMEOUT 0.1
80 
81 #define WORKER_BUSY_LAST_SEND_TIMEOUT 5.0
82 #define WORKER_BUSY_WAIT_FOR_STATUS 5.0
83 #define WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT 20.0
84 
85 #define WORKER_NOP_INTERVAL 1.0
86 
87 #define MAX_SIM_CHUNKS 16
88 
89 #define SUSTAIN_WORKERS 50
90 #define HEAVYLOAD_WORKERS 150
91 #define MAX_WORKERS 250
92 
93 #define WCHASHSIZE 256
94 #define WCHASH(inode,indx) (((inode)*0xB239FB71+(indx)*193)%WCHASHSIZE)
95 
96 #define IDHASHSIZE 256
97 #define IDHASH(inode) (((inode)*0xB239FB71)%IDHASHSIZE)
98 
/*
 * One write-cache block: up to MFSBLOCKSIZE bytes of data destined for a
 * single block of a chunk. Blocks live either on a chunk's data chain
 * (chunkdata.datachainhead) or on the global free list (freecblockshead).
 */
typedef struct cblock_s {
	uint8_t data[MFSBLOCKSIZE];	// modified only when writeid==0
	uint16_t pos;		// block in chunk (0...1023) - never modified
	uint32_t writeid;	// 0 = not sent, >0 = block was sent (modified and accessed only when wchunk is locked)
	uint32_t from;		// first filled byte in data (modified only when writeid==0)
	uint32_t to;		// first not used byte in data (modified only when writeid==0)
	struct cblock_s *next,*prev;	// chain links; 'next' alone is also used to link blocks on the free list
} cblock;
107 
108 struct inodedata_s;
109 
/*
 * Per-chunk write job: the pending data blocks for one chunk of one inode.
 * A pointer to this structure is what travels through the job queue to the
 * write workers.
 */
typedef struct chunkdata_s {
	uint32_t chindx;	// chunk index within the file (passed to fs_writechunk)
	uint16_t trycnt;	// retry counter; reset to 0 after a successful, undelayed write; compared against maxretries
	uint8_t waitingworker;	// set to 1 by a worker before it waits; cleared after the wake-up byte is written to wakeup_fd
	uint8_t chunkready;	// set to 1 by the worker once it has a chunkserver connection (chunkcond is broadcast)
	uint8_t unbreakable;	// when non-zero the worker skips fs_writeend on a failed attempt (set outside this view - TODO confirm exact meaning)
	uint8_t continueop;	// non-zero adds CHUNKOPFLAG_CONTINUEOP to the fs_writechunk flags
	uint8_t superuser;	// non-zero adds CHUNKOPFLAG_CANUSERESERVESPACE to the fs_writechunk flags
	int wakeup_fd;		// write end of the waiting worker's pipe, -1 when no worker is waiting
	cblock *datachainhead,*datachaintail;	// list of cache blocks still to be written (NULL when empty)
	struct inodedata_s *parent;	// owning inode structure
	struct chunkdata_s *next,**prev;	// list links inside parent->chunks; 'prev' points at the pointer that refers to this node
} chunkdata;
123 
/*
 * Per-inode write state, kept in the idhash table and reference counted
 * through 'lcnt' (see write_find_inodedata / write_get_inodedata /
 * write_free_inodedata).
 */
typedef struct inodedata_s {
	uint32_t inode;		// inode number - hash key
	uint64_t maxfleng;	// initialised from the 'fleng' argument of write_get_inodedata
	uint32_t cacheblockcount;	// cache blocks currently held by this inode (updated in write_cb_acquire/write_cb_release)
	int status;		// 0 = ok, otherwise errno-style code stored by write_job_end
	uint16_t flushwaiting;	// presumably number of threads waiting on flushcond - TODO confirm
	uint16_t writewaiting;	// presumably number of threads waiting on writecond - TODO confirm
	uint16_t chunkwaiting;	// presumably number of threads waiting on chunkcond - TODO confirm
	uint16_t lcnt;		// reference counter; structure is freed when it drops to 0
//	uint16_t trycnt;
	uint16_t chunkscnt;	// chunks handed to workers and not yet freed (limited by MAX_SIM_CHUNKS)
	chunkdata *chunks,**chunkstail;	// list of chunk jobs and pointer to the tail 'next' slot
	chunkdata *chunksnext;	// first chunk in 'chunks' that has not been enqueued yet (NULL if none)
	pthread_cond_t flushcond;	// wait for chunks==NULL (flush)
	pthread_cond_t writecond;	// wait for flushwaiting==0 (write)
	pthread_cond_t chunkcond;	// wait for status!=0 or all chunks 'chunkready==1'
	pthread_mutex_t lock;	// protects the fields of this structure and of its chunks
	struct inodedata_s *next;	// next entry in the same idhash bucket
} inodedata;
143 
/*
 * Descriptor of one write worker thread; allocated in
 * write_data_spawn_worker and freed in write_data_close_worker.
 */
typedef struct worker_s {
	pthread_t thread_id;	// filled in by pthread_create, used for pthread_detach on exit
} worker;
147 
/* free cache block pool - everything below is protected by 'fcblock' */
static pthread_mutex_t fcblock;
static pthread_cond_t fcbcond;	// signalled by write_cb_release when somebody waits for a free block
static uint16_t fcbwaiting;	// number of threads currently inside write_cb_acquire
static cblock *cacheblocks,*freecblockshead;	// 'freecblockshead' = head of the free list; 'cacheblocks' presumably the whole allocated array - TODO confirm
static uint32_t freecacheblocks;	// number of blocks on the free list
static uint32_t cacheblockcount;	// presumably total number of cache blocks (used as the reference in write_cache_almost_full)

/* tunables - set outside this part of the file */
static double optimeout;	// when >0.0: maximum time (seconds) for one chunk operation, exceeded => EIO
static uint32_t maxretries;	// a chunk job fails when its trycnt reaches this value
static uint32_t minlogretry;	// retry problems are logged only when trycnt >= this value
static uint8_t erroronlostchunk;	// non-zero: MFS_ERROR_CHUNKLOST ends the job immediately with ENXIO
static uint8_t erroronnospace;	// non-zero: MFS_ERROR_NOSPACE ends the job immediately with ENOSPC

static inodedata **idhash;	// hash table (IDHASHSIZE buckets) of inodedata chains, indexed by IDHASH(inode)

static pthread_mutex_t hashlock;	// protects idhash chains and the inodedata 'lcnt' counters
164 
165 #ifdef BUFFER_DEBUG
166 static pthread_t info_worker_th;
167 static uint32_t usedblocks;
168 #endif
169 
170 // static pthread_t dqueue_worker_th;
171 
/* worker pool bookkeeping - counters are protected by 'workerslock' */
static pthread_mutex_t workerslock;
static uint32_t workers_avail;	// workers currently idle (waiting for a job)
static uint32_t workers_total;	// all existing workers
static uint32_t worker_term_waiting;	// non-zero when somebody waits on worker_term_cond for workers_total==0
static pthread_cond_t worker_term_cond;	// signalled by the last closing worker
static pthread_attr_t worker_thattr;	// attributes used for every pthread_create of a worker

static void *jqueue; //,*dqueue;	// job queue carrying chunkdata pointers to the workers
180 
181 #ifdef BUFFER_DEBUG
write_info_worker(void * arg)182 void* write_info_worker(void *arg) {
183 	(void)arg;
184 	uint32_t cbcnt,fcbcnt,ucbcnt;
185 	uint32_t i;
186 	inodedata *ind;
187 	chunkdata *chd;
188 	cblock *cb;
189 	for (;;) {
190 		zassert(pthread_mutex_lock(&hashlock));
191 		cbcnt = 0;
192 		for (i = 0 ; i<IDHASHSIZE ; i++) {
193 			for (ind = idhash[i] ; ind ; ind = ind->next) {
194 				zassert(pthread_mutex_lock(&(ind->lock)));
195 				ucbcnt = 0;
196 				for (chd = ind->chunks ; chd!=NULL ; chd = chd->next) {
197 					for (cb = chd->datachainhead ; cb ; cb = cb->next) {
198 						ucbcnt++;
199 					}
200 				}
201 				if (ucbcnt != ind->cacheblockcount) {
202 					syslog(LOG_NOTICE,"inode: %"PRIu32" ; wrong cache block count (%"PRIu32"/%"PRIu32")",ind->inode,ucbcnt,ind->cacheblockcount);
203 				}
204 				cbcnt += ucbcnt;
205 				zassert(pthread_mutex_unlock(&(ind->lock)));
206 			}
207 		}
208 		zassert(pthread_mutex_unlock(&hashlock));
209 		zassert(pthread_mutex_lock(&fcblock));
210 		fcbcnt = 0;
211 		for (cb = freecblockshead ; cb ; cb = cb->next) {
212 			fcbcnt++;
213 		}
214 		syslog(LOG_NOTICE,"used cache blocks: %"PRIu32" ; sum of inode used blocks: %"PRIu32" ; free cache blocks: %"PRIu32" ; free cache chain blocks: %"PRIu32,usedblocks,cbcnt,freecacheblocks,fcbcnt);
215 		zassert(pthread_mutex_unlock(&fcblock));
216 		portable_usleep(500000);
217 	}
218 
219 }
220 #endif
221 
/*
 * Give a cache block back to the global free pool.
 *
 * ind - inode structure the block was accounted to (its cacheblockcount is decreased)
 * cb  - block to release; it is pushed on the front of the free chain
 *       (only cb->next is rewritten, other fields are left as they are)
 *
 * One waiter in write_cb_acquire is woken up when there is any.
 */
void write_cb_release (inodedata *ind,cblock *cb) {
	zassert(pthread_mutex_lock(&fcblock));
#ifdef BUFFER_DEBUG
	usedblocks -= 1;
#endif
	ind->cacheblockcount -= 1;
	freecacheblocks += 1;
	// link in front of the free chain
	cb->next = freecblockshead;
	freecblockshead = cb;
	if (fcbwaiting != 0) {
		zassert(pthread_cond_signal(&fcbcond));
	}
	zassert(pthread_mutex_unlock(&fcblock));
}
236 
write_cb_acquire(inodedata * ind)237 cblock* write_cb_acquire(inodedata *ind/*,uint8_t *waited*/) {
238 	cblock *ret;
239 	zassert(pthread_mutex_lock(&fcblock));
240 	fcbwaiting++;
241 //	*waited=0;
242 	while (freecblockshead==NULL/* || ind->cacheblockcount>(freecacheblocks/3)*/) {
243 		zassert(pthread_cond_wait(&fcbcond,&fcblock));
244 //		*waited=1;
245 	}
246 	fcbwaiting--;
247 	ret = freecblockshead;
248 	freecblockshead = ret->next;
249 	ret->pos = 0;
250 	ret->writeid = 0;
251 	ret->from = 0;
252 	ret->to = 0;
253 	ret->next = NULL;
254 	ret->prev = NULL;
255 	freecacheblocks--;
256 	ind->cacheblockcount++;
257 #ifdef BUFFER_DEBUG
258 	usedblocks++;
259 #endif
260 	zassert(pthread_mutex_unlock(&fcblock));
261 	return ret;
262 }
263 
write_cache_almost_full(void)264 uint8_t write_cache_almost_full(void) {
265 	uint8_t r;
266 	zassert(pthread_mutex_lock(&fcblock));
267 	r = (freecacheblocks < (cacheblockcount / 3))?1:0;
268 	zassert(pthread_mutex_unlock(&fcblock));
269 	return r;
270 }
271 
272 /* inode */
273 
write_find_inodedata(uint32_t inode)274 inodedata* write_find_inodedata(uint32_t inode) {
275 	uint32_t indh = IDHASH(inode);
276 	inodedata *ind;
277 	zassert(pthread_mutex_lock(&hashlock));
278 	for (ind=idhash[indh] ; ind ; ind=ind->next) {
279 		if (ind->inode == inode) {
280 			ind->lcnt++;
281 			zassert(pthread_mutex_unlock(&hashlock));
282 			return ind;
283 		}
284 	}
285 	zassert(pthread_mutex_unlock(&hashlock));
286 	return NULL;
287 }
288 
write_get_inodedata(uint32_t inode,uint64_t fleng)289 inodedata* write_get_inodedata(uint32_t inode,uint64_t fleng) {
290 	uint32_t indh = IDHASH(inode);
291 	inodedata *ind;
292 //	int pfd[2];
293 
294 	zassert(pthread_mutex_lock(&hashlock));
295 	for (ind=idhash[indh] ; ind ; ind=ind->next) {
296 		if (ind->inode == inode) {
297 			ind->lcnt++;
298 			zassert(pthread_mutex_unlock(&hashlock));
299 			return ind;
300 		}
301 	}
302 
303 	ind = malloc(sizeof(inodedata));
304 	passert(ind);
305 	ind->inode = inode;
306 	ind->cacheblockcount = 0;
307 	ind->maxfleng = fleng;
308 	ind->status = 0;
309 //	ind->trycnt = 0;
310 	ind->chunkscnt = 0;
311 	ind->chunks = NULL;
312 	ind->chunksnext = NULL;
313 	ind->chunkstail = &(ind->chunks);
314 	ind->flushwaiting = 0;
315 	ind->chunkwaiting = 0;
316 	ind->writewaiting = 0;
317 	ind->lcnt = 1;
318 	zassert(pthread_cond_init(&(ind->flushcond),NULL));
319 	zassert(pthread_cond_init(&(ind->writecond),NULL));
320 	zassert(pthread_cond_init(&(ind->chunkcond),NULL));
321 	zassert(pthread_mutex_init(&(ind->lock),NULL));
322 	ind->next = idhash[indh];
323 	idhash[indh] = ind;
324 	zassert(pthread_mutex_unlock(&hashlock));
325 	return ind;
326 }
327 
/*
 * Drop one reference to an inode structure obtained from
 * write_find_inodedata / write_get_inodedata.
 *
 * When the reference counter reaches zero the structure is unlinked from
 * the hash table, checked to be clean (no chunks in progress, nobody
 * waiting for flush or write - otherwise massert fires), its condition
 * variables and mutex are destroyed and the memory is freed.
 * Nothing happens when 'fid' is not present in its hash bucket.
 */
void write_free_inodedata(inodedata *fid) {
	uint32_t slot = IDHASH(fid->inode);
	inodedata **link;

	zassert(pthread_mutex_lock(&hashlock));
	// find the pointer that refers to 'fid'
	link = &(idhash[slot]);
	while (*link!=NULL && *link!=fid) {
		link = &((*link)->next);
	}
	if (*link!=NULL) {
		fid->lcnt--;
		if (fid->lcnt==0) {
			*link = fid->next;
			zassert(pthread_mutex_lock(&(fid->lock)));
			massert(fid->chunkscnt==0 && fid->flushwaiting==0 && fid->writewaiting==0,"inode structure not clean");
			zassert(pthread_mutex_unlock(&(fid->lock)));
			zassert(pthread_cond_destroy(&(fid->flushcond)));
			zassert(pthread_cond_destroy(&(fid->writecond)));
			zassert(pthread_cond_destroy(&(fid->chunkcond)));
			zassert(pthread_mutex_destroy(&(fid->lock)));
			free(fid);
		}
	}
	zassert(pthread_mutex_unlock(&hashlock));
}
354 
355 void write_enqueue(chunkdata *chd);
356 
/*
 * Decide what to do after the chunk list of an inode has changed.
 *
 * - Fewer than MAX_SIM_CHUNKS chunks in progress: the next not yet enqueued
 *   chunk (ind->chunksnext), if any, is counted and put on the job queue.
 * - Otherwise: every chunk that has a worker waiting gets one byte written
 *   to its wakeup descriptor, and its waitingworker/wakeup_fd are reset.
 *
 * NOTE(review): the visible callers hold ind->lock - confirm for the rest.
 */
void write_test_chunkdata(inodedata *ind) {
	chunkdata *chd;

	if (ind->chunkscnt>=MAX_SIM_CHUNKS) {
		// limit reached - poke workers that are waiting
		for (chd=ind->chunks ; chd!=NULL ; chd=chd->next) {
			if (chd->waitingworker==0) {
				continue;
			}
			if (universal_write(chd->wakeup_fd," ",1)!=1) {
				syslog(LOG_ERR,"can't write to pipe !!!");
			}
			chd->waitingworker = 0;
			chd->wakeup_fd = -1;
		}
		return;
	}

	chd = ind->chunksnext;
	if (chd!=NULL) {
		ind->chunksnext = chd->next;
		ind->chunkscnt++;
		write_enqueue(chd);
	}
}
379 
write_new_chunkdata(inodedata * ind,uint32_t chindx)380 chunkdata* write_new_chunkdata(inodedata *ind,uint32_t chindx) {
381 	chunkdata *chd;
382 
383 	chd = malloc(sizeof(chunkdata));
384 	passert(chd);
385 	chd->chindx = chindx;
386 	chd->wakeup_fd = -1;
387 	chd->datachainhead = NULL;
388 	chd->datachaintail = NULL;
389 	chd->waitingworker = 0;
390 	chd->chunkready = 0;
391 	chd->unbreakable = 0;
392 	chd->continueop = 0;
393 	chd->superuser = 0;
394 	chd->trycnt = 0;
395 	chd->parent = ind;
396 	chd->next = NULL;
397 	chd->prev = ind->chunkstail;
398 	*(ind->chunkstail) = chd;
399 	ind->chunkstail = &(chd->next);
400 	if (ind->chunksnext==NULL) {
401 		ind->chunksnext = chd;
402 	}
403 	return chd;
404 }
405 
/*
 * Remove a chunk job from its inode's chunk list and free it.
 *
 * The inode's in-progress counter (chunkscnt) is decreased and
 * write_test_chunkdata is run on the inode before the memory is released,
 * so another chunk may get enqueued as a result. Data blocks are not
 * touched here.
 */
void write_free_chunkdata(chunkdata *chd) {
	inodedata *owner = chd->parent;
	chunkdata *succ = chd->next;

	// unlink
	*(chd->prev) = succ;
	if (succ!=NULL) {
		succ->prev = chd->prev;
	} else {
		owner->chunkstail = chd->prev;
	}
	owner->chunkscnt--;
	write_test_chunkdata(owner);
	free(chd);
}
417 
418 /* queues */
419 
/*
 * Put a chunk job on the worker job queue (jqueue). Only the pointer is
 * queued; the remaining queue_put arguments are zero.
 */
void write_enqueue(chunkdata *chd) {
	uint8_t *job = (uint8_t*)chd;

	queue_put(jqueue,0,0,job,0);
}
423 
write_delayrun_enqueue(void * udata)424 void write_delayrun_enqueue(void *udata) {
425 	queue_put(jqueue,0,0,(uint8_t*)udata,0);
426 }
427 
/*
 * Enqueue a chunk job, optionally after a delay.
 *
 * usecs - 0 puts the job on the job queue right away; any other value
 *         schedules write_delayrun_enqueue through delay_run (presumably
 *         after 'usecs' microseconds - see delayrun.h).
 */
void write_delayed_enqueue(chunkdata *chd,uint32_t usecs) {
	if (usecs==0) {
		queue_put(jqueue,0,0,(uint8_t*)chd,0);
		return;
	}
	delay_run(write_delayrun_enqueue,chd,usecs);
}
435 
436 /*
437 void* write_dqueue_worker(void *arg) {
438 	uint64_t t,usec;
439 	uint32_t husec,lusec,cnt;
440 	uint8_t *ind;
441 	(void)arg;
442 	for (;;) {
443 		queue_get(dqueue,&husec,&lusec,&ind,&cnt);
444 		if (ind==NULL) {
445 			return NULL;
446 		}
447 		t = monotonic_useconds();
448 		usec = husec;
449 		usec <<= 32;
450 		usec |= lusec;
451 		if (t>usec) {
452 			t -= usec;
453 			while (t>=1000000 && cnt>0) {
454 				t-=1000000;
455 				cnt--;
456 			}
457 			if (cnt>0) {
458 				if (t<1000000) {
459 					portable_usleep(1000000-t);
460 				}
461 				cnt--;
462 			}
463 		}
464 		if (cnt>0) {
465 			t = monotonic_useconds();
466 			queue_put(dqueue,t>>32,t&0xFFFFFFFFU,(uint8_t*)ind,cnt);
467 		} else {
468 			queue_put(jqueue,0,0,ind,0);
469 		}
470 	}
471 	return NULL;
472 }
473 */
474 
/*
 * Finish one attempt of a worker on a chunk job.
 *
 * chd    - chunk job the worker has been processing
 * status - 0 on success, otherwise an errno-style code; a non-zero value is
 *          logged, stored in the inode (ind->status) and announced on chunkcond
 * delay  - delay (passed to write_delayed_enqueue) before the job is tried again
 *
 * With the inode lock held:
 *  - success with delay==0 resets the job's retry counter,
 *  - if the job still has data blocks and the inode has no error, all blocks
 *    are marked as not sent (writeid=0) and the job is enqueued again,
 *  - otherwise all remaining blocks are released, flush waiters are woken up
 *    and the job is freed.
 */
void write_job_end(chunkdata *chd,int status,uint32_t delay) {
	inodedata *ind = chd->parent;
	cblock *blk,*nextblk;

	zassert(pthread_mutex_lock(&(ind->lock)));

	if (status==0) {
		if (delay==0) {
			chd->trycnt=0;	// on good write reset try counter
		}
	} else {
		errno = status;
		syslog(LOG_WARNING,"error writing file number %"PRIu32": %s",ind->inode,strerr(errno));
		ind->status = status;
		zassert(pthread_cond_broadcast(&(ind->chunkcond)));
	}

	// from here on only the inode-wide status matters (it may hold an earlier error)
	if (chd->datachainhead!=NULL && ind->status==0) {
		// still have some work to do - forget what was already sent
		blk = chd->datachainhead;
		while (blk!=NULL) {
			blk->writeid = 0;
			blk = blk->next;
		}
		write_delayed_enqueue(chd,delay);
	} else {
		// no more work or error occurred - on error the remaining data blocks are dropped
		blk = chd->datachainhead;
		while (blk!=NULL) {
			nextblk = blk->next;
			write_cb_release(ind,blk);
			blk = nextblk;
		}
		if (ind->flushwaiting>0) {
			zassert(pthread_cond_broadcast(&(ind->flushcond)));
		}
		write_free_chunkdata(chd);
	}

	zassert(pthread_mutex_unlock(&(ind->lock)));
}
514 
515 void* write_worker(void *arg);
516 
517 #ifndef WDEBUG
518 static uint32_t lastnotify = 0;
519 #endif
520 
write_data_spawn_worker(void)521 static inline void write_data_spawn_worker(void) {
522 #ifndef WIN32
523 	sigset_t oldset;
524 	sigset_t newset;
525 #endif
526 	worker *w;
527 	int res;
528 
529 	w = malloc(sizeof(worker));
530 	if (w==NULL) {
531 		return;
532 	}
533 #ifndef WIN32
534 	sigemptyset(&newset);
535 	sigaddset(&newset, SIGTERM);
536 	sigaddset(&newset, SIGINT);
537 	sigaddset(&newset, SIGHUP);
538 	sigaddset(&newset, SIGQUIT);
539 	zassert(pthread_sigmask(SIG_BLOCK, &newset, &oldset));
540 #endif
541 	res = pthread_create(&(w->thread_id),&worker_thattr,write_worker,w);
542 #ifndef WIN32
543 	zassert(pthread_sigmask(SIG_SETMASK, &oldset, NULL));
544 #endif
545 	if (res<0) {
546 		return;
547 	}
548 	workers_avail++;
549 	workers_total++;
550 #ifdef WDEBUG
551 	fprintf(stderr,"spawn write worker (total: %"PRIu32")\n",workers_total);
552 #else
553 	if (workers_total%10==0 && workers_total!=lastnotify) {
554 		syslog(LOG_INFO,"write workers: %"PRIu32"+\n",workers_total);
555 		lastnotify = workers_total;
556 	}
557 #endif
558 }
559 
/*
 * Bookkeeping done by a worker thread that is about to exit.
 *
 * Decreases workers_avail and workers_total, signals worker_term_cond when
 * this was the last worker and somebody is waiting for that, detaches the
 * thread and frees its descriptor. The counters are modified without
 * locking; the visible callers hold 'workerslock'.
 *
 * w - descriptor of the calling worker (invalid after the call)
 */
static inline void write_data_close_worker(worker *w) {
	workers_total--;
	workers_avail--;
	if (worker_term_waiting && workers_total==0) {
		zassert(pthread_cond_signal(&worker_term_cond));
		worker_term_waiting--;
	}
	pthread_detach(w->thread_id);
	free(w);
#ifndef WDEBUG
	// log only every tenth worker, and only once per value
	if (workers_total%10==0 && workers_total!=lastnotify) {
		syslog(LOG_INFO,"write workers: %"PRIu32"-\n",workers_total);
		lastnotify = workers_total;
	}
#else
	fprintf(stderr,"close write worker (total: %"PRIu32")\n",workers_total);
#endif
}
578 
/*
 * Lazily format an IPv4 address as dotted decimal text.
 *
 * ipstr - 16-byte buffer; it is filled only when it currently holds an
 *         empty string (ipstr[0]==0), otherwise it is left untouched
 * ip    - address with the first octet in the most significant byte
 */
static inline void write_prepare_ip (char ipstr[16],uint32_t ip) {
	uint8_t o1,o2,o3,o4;

	if (ipstr[0]!=0) {
		return;	// already prepared
	}
	o1 = (uint8_t)(ip>>24);
	o2 = (uint8_t)(ip>>16);
	o3 = (uint8_t)(ip>>8);
	o4 = (uint8_t)ip;
	snprintf(ipstr,16,"%"PRIu8".%"PRIu8".%"PRIu8".%"PRIu8,o1,o2,o3,o4);
	ipstr[15]=0;
}
585 
586 /* main working thread */
write_worker(void * arg)587 void* write_worker(void *arg) {
588 	uint32_t z1,z2,z3;
589 	uint8_t *data;
590 	int fd;
591 	int i;
592 	struct pollfd pfd[2];
593 	uint32_t sent,rcvd;
594 	uint32_t hdrtosend;
595 	uint8_t sending_mode;
596 	uint8_t recvbuff[21];
597 	uint8_t sendbuff[32];
598 #ifdef HAVE_WRITEV
599 	struct iovec siov[2];
600 #endif
601 	uint8_t pipebuff[1024];
602 	int pipefd[2];
603 	uint8_t *wptr;
604 	const uint8_t *rptr;
605 
606 	uint32_t reccmd;
607 	uint32_t recleng;
608 	uint64_t recchunkid;
609 	uint32_t recwriteid;
610 	uint8_t recstatus;
611 
612 #ifdef WORKER_DEBUG
613 	uint32_t partialblocks;
614 	uint32_t bytessent;
615 	char debugchain[200];
616 	uint32_t cl;
617 #endif
618 
619 	uint8_t *cpw;
620 	cspri chain[100];
621 	uint32_t chainminver;
622 	uint16_t chainelements;
623 	uint8_t cschain[6*99];
624 	uint32_t cschainsize;
625 
626 	uint32_t inode;
627 	uint32_t chindx;
628 	uint32_t ip;
629 	uint16_t port;
630 	uint32_t srcip;
631 	uint64_t mfleng;
632 	uint64_t maxwroffset;
633 	uint64_t chunkid;
634 	uint32_t version;
635 	uint32_t nextwriteid;
636 	const uint8_t *csdata;
637 	uint32_t csdatasize;
638 	uint8_t csdataver;
639 	uint8_t westatus;
640 	uint8_t wrstatus;
641 	uint8_t chunkready;
642 	uint8_t unbreakable;
643 	uint8_t chunkopflags;
644 	int status;
645 	char csstrip[16];
646 	uint8_t waitforstatus;
647 	uint8_t donotstayidle;
648 	double opbegin;
649 	double start,now,lastrcvd,lastblock,lastsent;
650 	double workingtime,lrdiff,lbdiff;
651 	uint32_t wtotal;
652 	uint8_t cnt;
653 	uint8_t firsttime = 1;
654 	worker *w = (worker*)arg;
655 
656 	uint8_t valid_offsets;
657 	uint64_t min_offset;
658 	uint64_t max_offset;
659 
660 	inodedata *ind;
661 	chunkdata *chd;
662 	cblock *cb,*ncb,*rcb;
663 //	inodedata *ind;
664 
665 	chainelements = 0;
666 	chindx = 0;
667 
668 	if (pipe(pipefd)<0) {
669 		syslog(LOG_WARNING,"pipe error: %s",strerr(errno));
670 		return NULL;
671 	}
672 
673 	for (;;) {
674 		for (i=0 ; i<chainelements ; i++) {
675 			csdb_writedec(chain[i].ip,chain[i].port);
676 		}
677 		chainelements=0;
678 
679 		if (firsttime==0) {
680 			zassert(pthread_mutex_lock(&workerslock));
681 			workers_avail++;
682 			if (workers_avail > SUSTAIN_WORKERS) {
683 //				fprintf(stderr,"close worker (avail:%"PRIu32" ; total:%"PRIu32")\n",workers_avail,workers_total);
684 				write_data_close_worker(w);
685 				zassert(pthread_mutex_unlock(&workerslock));
686 				close_pipe(pipefd);
687 				return NULL;
688 			}
689 			zassert(pthread_mutex_unlock(&workerslock));
690 		}
691 		firsttime = 0;
692 
693 		// get next job
694 		queue_get(jqueue,&z1,&z2,&data,&z3);
695 
696 		zassert(pthread_mutex_lock(&workerslock));
697 
698 		if (data==NULL) {
699 			write_data_close_worker(w);
700 			zassert(pthread_mutex_unlock(&workerslock));
701 			close_pipe(pipefd);
702 			return NULL;
703 		}
704 
705 		workers_avail--;
706 		if (workers_avail==0 && workers_total<MAX_WORKERS) {
707 			write_data_spawn_worker();
708 		}
709 		zassert(pthread_mutex_unlock(&workerslock));
710 
711 		chd = (chunkdata*)data;
712 		ind = chd->parent;
713 
714 		zassert(pthread_mutex_lock(&(ind->lock)));
715 
716 		if (chd->datachainhead) {
717 			chindx = chd->chindx;
718 			status = ind->status;
719 		} else {
720 			syslog(LOG_WARNING,"writeworker got inode with no data to write !!!");
721 			status = EINVAL;	// this should never happen, so status is not important - just anything
722 		}
723 		chunkready = chd->chunkready;
724 		chunkopflags = (chd->continueop?CHUNKOPFLAG_CONTINUEOP:0) | (chd->superuser?CHUNKOPFLAG_CANUSERESERVESPACE:0);
725 
726 		zassert(pthread_mutex_unlock(&(ind->lock)));
727 
728 		if (status) {
729 			write_job_end(chd,status,0);
730 			continue;
731 		}
732 
733 		inode = ind->inode;
734 		chunkrwlock_wlock(inode,chindx);
735 
736 		opbegin = 0; // make static code analysers happy
737 		if (optimeout>0.0) {
738 			opbegin = monotonic_seconds();
739 		}
740 
741 		valid_offsets = 0;
742 		min_offset = 0;
743 		max_offset = 0;
744 
745 		// syslog(LOG_NOTICE,"file: %"PRIu32", index: %"PRIu16" - debug1",inode,chindx);
746 		// get chunk data from master
747 //		start = monotonic_seconds();
748 		wrstatus = fs_writechunk(inode,chindx,chunkopflags,&csdataver,&mfleng,&chunkid,&version,&csdata,&csdatasize);
749 
// wrstatus potential results:
// MFS_ERROR_NOCHUNK - internal error (can't be repaired)
// MFS_ERROR_ENOENT - internal error (wrong inode - can't be repaired)
// MFS_ERROR_EPERM - internal error (wrong inode - can't be repaired)
// MFS_ERROR_INDEXTOOBIG - requested file position is too big
// MFS_ERROR_CHUNKLOST - according to master chunk is definitely lost (all chunkservers are connected and chunk is not there)
// MFS_ERROR_QUOTA - disk quota exceeded (quit immediately - no retry)
// MFS_ERROR_NOSPACE - no space on disk
// MFS_ERROR_IO (for future use)
// MFS_ERROR_NOCHUNKSERVERS - can't create new chunk - no space available, but maybe only temporarily
// MFS_ERROR_CSNOTPRESENT - can't modify chunk - chunk is lost, but maybe only temporarily
761 
762 		if (wrstatus!=MFS_STATUS_OK) {
763 			if (wrstatus!=MFS_ERROR_LOCKED && wrstatus!=MFS_ERROR_EAGAIN) {
764 				if (wrstatus==MFS_ERROR_ENOENT || wrstatus==MFS_ERROR_EPERM || wrstatus==MFS_ERROR_NOCHUNK) {
765 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
766 					write_job_end(chd,EBADF,0);
767 				} else if (wrstatus==MFS_ERROR_INDEXTOOBIG) {
768 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
769 					write_job_end(chd,EINVAL,0);
770 				} else if (wrstatus==MFS_ERROR_QUOTA) {
771 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
772 #ifdef EDQUOT
773 					write_job_end(chd,EDQUOT,0);
774 #else
775 					write_job_end(chd,ENOSPC,0);
776 #endif
777 				} else if (wrstatus==MFS_ERROR_NOSPACE && erroronnospace) {
778 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
779 					write_job_end(chd,ENOSPC,0);
780 				} else if (wrstatus==MFS_ERROR_CHUNKLOST && erroronlostchunk) {
781 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
782 					write_job_end(chd,ENXIO,0);
783 				} else if (wrstatus==MFS_ERROR_IO) {
784 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
785 					write_job_end(chd,EIO,0);
786 				} else if (wrstatus==MFS_ERROR_EROFS) {
787 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
788 					write_job_end(chd,EROFS,0);
789 				} else {
790 					if (chd->trycnt >= minlogretry) {
791 						syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",inode,chindx,mfsstrerr(wrstatus));
792 					}
793 					chd->trycnt++;
794 					if (chd->trycnt>=maxretries) {
795 						if (wrstatus==MFS_ERROR_NOCHUNKSERVERS || wrstatus==MFS_ERROR_NOSPACE) {
796 							write_job_end(chd,ENOSPC,0);
797 						} else if (wrstatus==MFS_ERROR_CSNOTPRESENT || wrstatus==MFS_ERROR_CHUNKLOST) {
798 							write_job_end(chd,ENXIO,0);
799 						} else {
800 							write_job_end(chd,EIO,0);
801 						}
802 					} else {
803 						write_delayed_enqueue(chd,1000+((chd->trycnt<30)?((chd->trycnt-1)*300000):10000000));
804 					}
805 				}
806 			} else {
807 				if (chd->trycnt<=2) {
808 					chd->trycnt++;
809 					write_delayed_enqueue(chd,1000);
810 				} else if (chd->trycnt<=6) {
811 					chd->trycnt++;
812 					write_delayed_enqueue(chd,100000);
813 				} else {
814 					write_delayed_enqueue(chd,500000);
815 				}
816 			}
817 			chunkrwlock_wunlock(inode,chindx);
818 			continue;	// get next job
819 		}
820 
821 		chunksdatacache_insert(inode,chindx,chunkid,version,csdataver,csdata,csdatasize);
822 #ifdef MFSMOUNT
823 		fdcache_invalidate(inode);
824 #endif
825 
826 //		now = monotonic_seconds();
827 //		fprintf(stderr,"fs_writechunk time: %.3lf\n",(now-start));
828 
829 		if (csdata!=NULL && csdatasize>0) {
830 			chainelements = csorder_sort(chain,csdataver,csdata,csdatasize,1);
831 		} else {
832 			chainelements = 0;
833 		}
834 
835 		if (csdata==NULL || csdatasize==0 || chainelements==0) {
836 			chainelements = 0;
837 			zassert(pthread_mutex_lock(&(ind->lock)));
838 			chd->trycnt+=6;
839 			unbreakable = chd->unbreakable;
840 			if (chd->trycnt>=maxretries) {
841 				unbreakable = 0; // unlock chunk on error
842 			}
843 			chd->continueop = unbreakable;
844 			zassert(pthread_mutex_unlock(&(ind->lock)));
845 			if (unbreakable==0) {
846 				fs_writeend(chunkid,inode,chindx,0,0);
847 			}
848 			if (chd->trycnt >= minlogretry) {
849 				syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - there are no valid copies",inode,chindx,chunkid,version);
850 			}
851 			if (chd->trycnt>=maxretries) {
852 				write_job_end(chd,ENXIO,0);
853 			} else {
854 				write_delayed_enqueue(chd,60000000);
855 			}
856 			chunkrwlock_wunlock(inode,chindx);
857 			continue;
858 		}
859 		ip = chain[0].ip;
860 		port = chain[0].port;
861 		chainminver = chain[0].version;
862 		csdb_writeinc(ip,port);
863 		csstrip[0] = 0;
864 		cpw = cschain;
865 		cschainsize = 0;
866 		for (i=1 ; i<chainelements ; i++) {
867 			csdb_writeinc(chain[i].ip,chain[i].port);
868 			if (chain[i].version < chainminver) {
869 				chainminver = chain[i].version;
870 			}
871 			put32bit(&cpw,chain[i].ip);
872 			put16bit(&cpw,chain[i].port);
873 			cschainsize += 6;
874 		}
875 #if 0
876 		cp = csdata;
877 		cpe = csdata+csdatasize;
878 		chainminver = 0xFFFFFFFF;
879 		cpw = cschain;
880 		cschainsize = 0;
881 		while (cp<cpe && chainelements<100) {
882 			tmpip = get32bit(&cp);
883 			tmpport = get16bit(&cp);
884 			if (csdataver>0) {
885 				tmpver = get32bit(&cp);
886 			} else {
887 				tmpver = 0;
888 			}
889 			if (csdataver>1) {
890 				tmplabelmask = get32bit(&cp);
891 			} else {
892 				tmplabelmask = 0;
893 			}
894 			chainip[chainelements] = tmpip;
895 			chainport[chainelements] = tmpport;
896 			csdb_writeinc(tmpip,tmpport);
897 			if (tmpver<chainminver) {
898 				chainminver = tmpver;
899 			}
900 			if (chainelements==0) {
901 				ip = tmpip;
902 				port = tmpport;
903 			} else {
904 				put32bit(&cpw,tmpip);
905 				put16bit(&cpw,tmpport);
906 				cschainsize += 6;
907 			}
908 			chainelements++;
909 		}
910 
911 		if (cp<cpe) {
912 			fs_writeend(chunkid,inode,0,0);
913 			syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - there are too many copies",inode,chindx,chunkid,version);
914 			chd->trycnt+=6;
915 			if (chd->trycnt>=maxretries) {
916 				write_job_end(chd,ENXIO,0);
917 			} else {
918 				write_delayed_enqueue(chd,60000000);
919 			}
920 			continue;
921 		}
922 #endif
923 //		chain = csdata;
924 //		ip = get32bit(&chain);
925 //		port = get16bit(&chain);
926 //		chainsize = csdatasize-6;
927 
928 		start = monotonic_seconds();
929 
930 /*
931 		if (csdatasize>CSDATARESERVE) {
932 			csdatasize = CSDATARESERVE;
933 		}
934 		memcpy(wrec->csdata,csdata,csdatasize);
935 		wrec->csdatasize=csdatasize;
936 		while (csdatasize>=6) {
937 			tmpip = get32bit(&csdata);
938 			tmpport = get16bit(&csdata);
939 			csdatasize-=6;
940 			csdb_writeinc(tmpip,tmpport);
941 		}
942 */
943 
944 		// make connection to cs
945 		srcip = fs_getsrcip();
946 		fd = conncache_get(ip,port);
947 		if (fd<0) {
948 			uint32_t connmaxtry;
949 			zassert(pthread_mutex_lock(&(ind->lock)));
950 			connmaxtry = (chd->trycnt*2)+2;
951 			if (connmaxtry>10) {
952 				connmaxtry = 10;
953 			}
954 			zassert(pthread_mutex_unlock(&(ind->lock)));
955 			cnt=0;
956 			while (cnt<connmaxtry) {
957 				fd = tcpsocket();
958 				if (fd<0) {
959 					syslog(LOG_WARNING,"writeworker: can't create tcp socket: %s",strerr(errno));
960 					break;
961 				}
962 				if (srcip) {
963 					if (tcpnumbind(fd,srcip,0)<0) {
964 						syslog(LOG_WARNING,"writeworker: can't bind socket to given ip: %s",strerr(errno));
965 						tcpclose(fd);
966 						fd=-1;
967 						break;
968 					}
969 				}
970 				if (tcpnumtoconnect(fd,ip,port,(cnt%2)?(300*(1<<(cnt>>1))):(200*(1<<(cnt>>1))))<0) {
971 					cnt++;
972 					if (cnt>=connmaxtry) {
973 						int err = errno;
974 						zassert(pthread_mutex_lock(&(ind->lock)));
975 						if (chd->trycnt >= minlogretry) {
976 							write_prepare_ip(csstrip,ip);
977 							syslog(LOG_WARNING,"writeworker: can't connect to (%s:%"PRIu16"): %s",csstrip,port,strerr(err));
978 						}
979 						zassert(pthread_mutex_unlock(&(ind->lock)));
980 					}
981 					close(fd);
982 					fd=-1;
983 				} else {
984 					uint32_t mip,pip;
985 					uint16_t mport,pport;
986 					tcpgetpeer(fd,&pip,&pport);
987 					tcpgetmyaddr(fd,&mip,&mport);
988 #ifdef WDEBUG
989 					fprintf(stderr,"connection ok (%"PRIX32":%"PRIu16"->%"PRIX32":%"PRIu16")\n",mip,mport,pip,pport);
990 #endif
991 					cnt=connmaxtry;
992 				}
993 			}
994 		}
995 		if (fd<0) {
996 			zassert(pthread_mutex_lock(&(ind->lock)));
997 			chd->trycnt++;
998 			unbreakable = chd->unbreakable;
999 			if (chd->trycnt>=maxretries) {
1000 				unbreakable = 0; // unlock chunk on error
1001 			}
1002 			chd->continueop = unbreakable;
1003 			zassert(pthread_mutex_unlock(&(ind->lock)));
1004 			if (unbreakable==0) {
1005 				fs_writeend(chunkid,inode,chindx,0,0);
1006 			}
1007 			if (chd->trycnt>=maxretries) {
1008 				write_job_end(chd,EIO,0);
1009 			} else {
1010 				write_delayed_enqueue(chd,1000+((chd->trycnt<30)?((chd->trycnt-1)*300000):10000000));
1011 			}
1012 			chunkrwlock_wunlock(inode,chindx);
1013 			continue;
1014 		}
1015 		if (tcpnodelay(fd)<0) {
1016 			syslog(LOG_WARNING,"writeworker: can't set TCP_NODELAY: %s",strerr(errno));
1017 		}
1018 
1019 		if (chunkready==0) {
1020 			zassert(pthread_mutex_lock(&(ind->lock)));
1021 			if (chd->chunkready==0) {
1022 				chd->chunkready = 1;
1023 //				if (ind->chunkwaiting>0) {
1024 					zassert(pthread_cond_broadcast(&(ind->chunkcond)));
1025 //				}
1026 			}
1027 			zassert(pthread_mutex_unlock(&(ind->lock)));
1028 		}
1029 
1030 #ifdef WORKER_DEBUG
1031 		partialblocks=0;
1032 		bytessent=0;
1033 #endif
1034 		nextwriteid=1;
1035 
1036 		pfd[0].fd = fd;
1037 		pfd[1].fd = pipefd[0];
1038 		rcvd = 0;
1039 		sent = 0;
1040 		waitforstatus=1;
1041 		wptr = sendbuff;
1042 
1043 		put32bit(&wptr,CLTOCS_WRITE);
1044 		if (chainminver>=VERSION2INT(1,7,32)) {
1045 			put32bit(&wptr,13+cschainsize);
1046 			put8bit(&wptr,1);
1047 			hdrtosend = 21;
1048 		} else {
1049 			put32bit(&wptr,12+cschainsize);
1050 			hdrtosend = 20;
1051 		}
1052 
1053 		put64bit(&wptr,chunkid);
1054 		put32bit(&wptr,version);
1055 		sending_mode = 1;
1056 // debug:	syslog(LOG_NOTICE,"writeworker: init packet prepared");
1057 		cb = NULL;
1058 
1059 		status = 0;
1060 		wrstatus = MFS_STATUS_OK;
1061 
1062 		lastrcvd = 0.0;
1063 		lastsent = 0.0;
1064 		lastblock = 0.0;
1065 
1066 		donotstayidle = 0;
1067 //		firstloop = 1;
1068 
1069 		do {
1070 			now = monotonic_seconds();
1071 			zassert(pthread_mutex_lock(&workerslock));
1072 			wtotal = workers_total;
1073 			zassert(pthread_mutex_unlock(&workerslock));
1074 
1075 			if (optimeout>0.0 && now - opbegin > optimeout) {
1076 				status = EIO;
1077 				break;
1078 			}
1079 			zassert(pthread_mutex_lock(&(ind->lock)));
1080 
1081 //			if (ind->status!=0) {
1082 //				zassert(pthread_mutex_unlock(&glock));
1083 //				break;
1084 //			}
1085 
1086 			if (lastrcvd==0.0) {
1087 				lastrcvd = now;
1088 			} else {
1089 				lrdiff = now - lastrcvd;
1090 				if (lrdiff>=CHUNKSERVER_ACTIVITY_TIMEOUT) {
1091 					if (chd->trycnt >= minlogretry) {
1092 						write_prepare_ip(csstrip,ip);
1093 						syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") was timed out (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,csstrip,port,waitforstatus,chd->trycnt+1);
1094 					}
1095 					zassert(pthread_mutex_unlock(&(ind->lock)));
1096 					break;
1097 				}
1098 			}
1099 			if (lastblock==0.0) {
1100 				lbdiff = NEXT_BLOCK_DELAY; // first block should be send immediately
1101 			} else {
1102 				lbdiff = now - lastblock;
1103 			}
1104 			workingtime = now - start;
1105 
1106 			chd->waitingworker=1;
1107 			chd->wakeup_fd = pipefd[1];
1108 
1109 			if (sending_mode==0 && workingtime<WORKER_BUSY_LAST_SEND_TIMEOUT+((wtotal>HEAVYLOAD_WORKERS)?0:WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT) && waitforstatus<64) {
1110 				if (cb==NULL) {
1111 					ncb = chd->datachainhead;
1112 				} else {
1113 					ncb = cb->next;
1114 				}
1115 				if (ncb) {
1116 					if (ncb->to-ncb->from==MFSBLOCKSIZE || lbdiff>=NEXT_BLOCK_DELAY || ncb->next!=NULL || ind->flushwaiting) {
1117 						cb = ncb;
1118 						sending_mode = 2;
1119 					} else {
1120 						chd->waitingworker = 2; // wait for block expand
1121 						chd->wakeup_fd = pipefd[1];
1122 					}
1123 				}
1124 				if (sending_mode==2) {
1125 					uint64_t offset_from;
1126 					uint64_t offset_to;
1127 
1128 					cb->writeid = nextwriteid++;
1129 // debug:				syslog(LOG_NOTICE,"writeworker: data packet prepared (writeid:%"PRIu32",pos:%"PRIu16")",cb->writeid,cb->pos);
1130 					waitforstatus++;
1131 					wptr = sendbuff;
1132 					put32bit(&wptr,CLTOCS_WRITE_DATA);
1133 					put32bit(&wptr,24+(cb->to-cb->from));
1134 					put64bit(&wptr,chunkid);
1135 					put32bit(&wptr,cb->writeid);
1136 					put16bit(&wptr,cb->pos);
1137 					put16bit(&wptr,cb->from);
1138 					put32bit(&wptr,cb->to-cb->from);
1139 					put32bit(&wptr,mycrc32(0,cb->data+cb->from,cb->to-cb->from));
1140 #ifdef WORKER_DEBUG
1141 					if (cb->to-cb->from<MFSBLOCKSIZE) {
1142 						partialblocks++;
1143 					}
1144 					bytessent+=(cb->to-cb->from);
1145 #endif
1146 					sent = 0;
1147 					lastblock = now;
1148 					lastsent = now;
1149 					offset_from = chindx;
1150 					offset_from <<= MFSCHUNKBITS;
1151 					offset_from += cb->pos*MFSBLOCKSIZE;
1152 					offset_to = offset_from;
1153 					offset_from += cb->from;
1154 					offset_to += cb->to;
1155 					if (valid_offsets) {
1156 						if (offset_from < min_offset) {
1157 							min_offset = offset_from;
1158 						}
1159 						if (offset_to > max_offset) {
1160 							max_offset = offset_to;
1161 						}
1162 					} else {
1163 						min_offset = offset_from;
1164 						max_offset = offset_to;
1165 						valid_offsets = 1;
1166 					}
1167 				} else if (lastsent+WORKER_NOP_INTERVAL<now && chainminver>=VERSION2INT(1,7,32)) {
1168 					wptr = sendbuff;
1169 					put32bit(&wptr,ANTOAN_NOP);
1170 					put32bit(&wptr,0);
1171 					sent = 0;
1172 					sending_mode = 3;
1173 					lastsent = now;
1174 				}
1175 			}
1176 
1177 #ifdef WORKER_DEBUG
1178 			fprintf(stderr,"workerloop: waitforstatus:%u workingtime:%.6lf workers_total:%u lbdiff:%.6lf donotstayidle:%u\n",waitforstatus,workingtime,wtotal,lbdiff,donotstayidle);
1179 #endif
1180 			if (waitforstatus>0) {
1181 				if (workingtime>WORKER_BUSY_LAST_SEND_TIMEOUT+WORKER_BUSY_WAIT_FOR_STATUS+((wtotal>HEAVYLOAD_WORKERS)?0:WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT)) { // timeout
1182 					chd->waitingworker = 0;
1183 					chd->wakeup_fd = -1;
1184 					zassert(pthread_mutex_unlock(&(ind->lock)));
1185 					break;
1186 				}
1187 			} else {
1188 				if (lbdiff>=WORKER_IDLE_TIMEOUT || donotstayidle || wtotal>HEAVYLOAD_WORKERS) {
1189 					chd->waitingworker = 0;
1190 					chd->wakeup_fd = -1;
1191 					zassert(pthread_mutex_unlock(&(ind->lock)));
1192 					break;
1193 				}
1194 			}
1195 
1196 
1197 			zassert(pthread_mutex_unlock(&(ind->lock)));
1198 
1199 			switch (sending_mode) {
1200 				case 1:
1201 					if (sent<hdrtosend) {
1202 #ifdef HAVE_WRITEV
1203 						if (cschainsize>0) {
1204 							siov[0].iov_base = (void*)(sendbuff+sent);
1205 							siov[0].iov_len = hdrtosend-sent;
1206 							siov[1].iov_base = (void*)cschain;	// discard const (safe - because it's used in writev)
1207 							siov[1].iov_len = cschainsize;
1208 							i = writev(fd,siov,2);
1209 						} else {
1210 #endif
1211 							i = universal_write(fd,sendbuff+sent,hdrtosend-sent);
1212 #ifdef HAVE_WRITEV
1213 						}
1214 #endif
1215 					} else {
1216 						i = universal_write(fd,cschain+(sent-hdrtosend),cschainsize-(sent-hdrtosend));
1217 					}
1218 					if (i>=0) {
1219 						sent+=i;
1220 						if (sent==hdrtosend+cschainsize) {
1221 							sending_mode = 0;
1222 						}
1223 					}
1224 					break;
1225 				case 2:
1226 					if (sent<32) {
1227 #ifdef HAVE_WRITEV
1228 						siov[0].iov_base = (void*)(sendbuff+sent);
1229 						siov[0].iov_len = 32-sent;
1230 						siov[1].iov_base = (void*)(cb->data+cb->from);
1231 						siov[1].iov_len = cb->to-cb->from;
1232 						i = writev(fd,siov,2);
1233 #else
1234 						i = universal_write(fd,sendbuff+sent,32-sent);
1235 #endif
1236 					} else {
1237 						i = universal_write(fd,cb->data+cb->from+(sent-32),cb->to-cb->from-(sent-32));
1238 					}
1239 					if (i>=0) {
1240 						sent+=i;
1241 						if (sent==32+cb->to-cb->from) {
1242 							sending_mode = 0;
1243 						}
1244 					}
1245 					break;
1246 				case 3:
1247 					i = universal_write(fd,sendbuff+sent,8-sent);
1248 					if (i>=0) {
1249 						sent+=i;
1250 						if (sent==8) {
1251 							sending_mode = 0;
1252 						}
1253 					}
1254 					break;
1255 				default:
1256 					i=0;
1257 			}
1258 
1259 			if (i<0) {
1260 				if (ERRNO_ERROR && errno!=EINTR) {
1261 					if (chd->trycnt >= minlogretry) {
1262 						write_prepare_ip(csstrip,ip);
1263 						syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: write to (%s:%"PRIu16") error: %s / NEGWRITE (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,csstrip,port,strerr(errno),waitforstatus,chd->trycnt+1);
1264 					}
1265 					status=EIO;
1266 					break;
1267 				}
1268 			}
1269 
1270 			pfd[0].events = POLLIN | (sending_mode?POLLOUT:0);
1271 			pfd[0].revents = 0;
1272 			pfd[1].events = POLLIN;
1273 			pfd[1].revents = 0;
1274 			if (poll(pfd,2,100)<0) { /* correct timeout - in msec */
1275 				if (errno!=EINTR) {
1276 					if (chd->trycnt >= minlogretry) {
1277 						syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: poll error: %s (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,strerr(errno),waitforstatus,chd->trycnt+1);
1278 					}
1279 					status=EIO;
1280 					break;
1281 				}
1282 			}
1283 			zassert(pthread_mutex_lock(&(ind->lock)));	// make helgrind happy
1284 			chd->waitingworker = 0;
1285 			chd->wakeup_fd = -1;
1286 			donotstayidle = (ind->flushwaiting>0 || ind->status!=0 || ind->chunkscnt>=MAX_SIM_CHUNKS)?1:0;
1287 			zassert(pthread_mutex_unlock(&(ind->lock)));	// make helgrind happy
1288 			if (pfd[1].revents&POLLIN) {	// used just to break poll - so just read all data from pipe to empty it
1289 				i = universal_read(pipefd[0],pipebuff,1024);
1290 				if (i<0) { // mainly to make happy static code analyzers
1291 					if (chd->trycnt >= minlogretry) {
1292 						syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: read pipe error: %s (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,strerr(errno),waitforstatus,chd->trycnt+1);
1293 					}
1294 				}
1295 			}
1296 			if (pfd[0].revents&POLLHUP) {
1297 				if (chd->trycnt >= minlogretry) {
1298 					write_prepare_ip(csstrip,ip);
1299 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") was reset by peer / POLLHUP (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,csstrip,port,waitforstatus,chd->trycnt+1);
1300 				}
1301 				status=EIO;
1302 				break;
1303 			}
1304 			if (pfd[0].revents&POLLERR) {
1305 				if (chd->trycnt >= minlogretry) {
1306 					write_prepare_ip(csstrip,ip);
1307 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") got error status / POLLERR (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,csstrip,port,waitforstatus,chd->trycnt+1);
1308 				}
1309 				status=EIO;
1310 				break;
1311 			}
1312 			if (pfd[0].revents&POLLIN) {
1313 				i = universal_read(fd,recvbuff+rcvd,21-rcvd);
1314 				if (i==0) { 	// connection reset by peer or read error
1315 					if (chd->trycnt >= minlogretry) {
1316 						write_prepare_ip(csstrip,ip);
1317 						syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") was reset by peer / ZEROREAD (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,csstrip,port,waitforstatus,chd->trycnt+1);
1318 					}
1319 					status=EIO;
1320 					break;
1321 				}
1322 				if (i<0) {
1323 					if (errno!=EINTR) {
1324 						if (chd->trycnt >= minlogretry) {
1325 							write_prepare_ip(csstrip,ip);
1326 							syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %016"PRIX64", version: %"PRIu32" - writeworker: read from (%s:%"PRIu16") error: %s (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",inode,chindx,chunkid,version,csstrip,port,strerr(errno),waitforstatus,chd->trycnt+1);
1327 						}
1328 						status=EIO;
1329 						break;
1330 					} else {
1331 						i=0;
1332 					}
1333 				}
1334 				lastrcvd = monotonic_seconds();
1335 				rcvd+=i;
1336 				// do not accept ANTOAN_UNKNOWN_COMMAND and ANTOAN_BAD_COMMAND_SIZE here - only ANTOAN_NOP
1337 				if (rcvd>=8 && recvbuff[7]==0 && recvbuff[6]==0 && recvbuff[5]==0 && recvbuff[4]==0 && recvbuff[3]==0 && recvbuff[2]==0 && recvbuff[1]==0 && recvbuff[0]==0) {	// ANTOAN_NOP packet received - skip it
1338 					if (rcvd>8) {
1339 						memmove(recvbuff,recvbuff+8,rcvd-8);
1340 						rcvd-=8;
1341 					}
1342 				}
1343 				if (rcvd==21) {
1344 					rptr = recvbuff;
1345 					reccmd = get32bit(&rptr);
1346 					recleng = get32bit(&rptr);
1347 					recchunkid = get64bit(&rptr);
1348 					recwriteid = get32bit(&rptr);
1349 					recstatus = get8bit(&rptr);
1350 					if (reccmd!=CSTOCL_WRITE_STATUS || recleng!=13) {
1351 						syslog(LOG_WARNING,"writeworker: got unrecognized packet from chunkserver (cmd:%"PRIu32",leng:%"PRIu32")",reccmd,recleng);
1352 						status=EIO;
1353 						break;
1354 					}
1355 					if (recchunkid!=chunkid) {
1356 						syslog(LOG_WARNING,"writeworker: got unexpected packet (expected chunkdid:%016"PRIX64",packet chunkid:%016"PRIX64")",chunkid,recchunkid);
1357 						status=EIO;
1358 						break;
1359 					}
1360 					if (recstatus!=MFS_STATUS_OK) {
1361 						if (chd->trycnt >= minlogretry) {
1362 							syslog(LOG_WARNING,"writeworker: write error: %s",mfsstrerr(recstatus));
1363 						}
1364 						wrstatus=recstatus;
1365 						break;
1366 					}
1367 // debug:				syslog(LOG_NOTICE,"writeworker: received status ok for writeid:%"PRIu32,recwriteid);
1368 					if (recwriteid>0) {
1369 						zassert(pthread_mutex_lock(&(ind->lock)));
1370 						for (rcb = chd->datachainhead ; rcb && rcb->writeid!=recwriteid ; rcb=rcb->next) {}
1371 						if (rcb==NULL) {
1372 							syslog(LOG_WARNING,"writeworker: got unexpected status (writeid:%"PRIu32")",recwriteid);
1373 							zassert(pthread_mutex_unlock(&(ind->lock)));
1374 							status=EIO;
1375 							break;
1376 						}
1377 						if (rcb==cb) {	// current block
1378 // debug:						syslog(LOG_NOTICE,"writeworker: received status for current block");
1379 							if (sending_mode==2) {	// got status ok before all data had been sent - error
1380 								syslog(LOG_WARNING,"writeworker: got status OK before all data have been sent");
1381 								zassert(pthread_mutex_unlock(&(ind->lock)));
1382 								status=EIO;
1383 								break;
1384 							} else {
1385 								cb = NULL;
1386 							}
1387 						}
1388 						if (rcb->prev) {
1389 							rcb->prev->next = rcb->next;
1390 						} else {
1391 							chd->datachainhead = rcb->next;
1392 						}
1393 						if (rcb->next) {
1394 							rcb->next->prev = rcb->prev;
1395 						} else {
1396 							chd->datachaintail = rcb->prev;
1397 						}
1398 						maxwroffset = (((uint64_t)(chindx))<<MFSCHUNKBITS)+(((uint32_t)(rcb->pos))<<MFSBLOCKBITS)+rcb->to;
1399 						if (maxwroffset>mfleng) {
1400 							mfleng=maxwroffset;
1401 						}
1402 						write_cb_release(ind,rcb);
1403 						zassert(pthread_mutex_unlock(&(ind->lock)));
1404 					}
1405 					waitforstatus--;
1406 					rcvd=0;
1407 				}
1408 			}
1409 		} while (1);
1410 
1411 		if (waitforstatus==0 && chainminver>=VERSION2INT(1,7,32)) {
1412 			wptr = sendbuff;
1413 			put32bit(&wptr,CLTOCS_WRITE_FINISH);
1414 			put32bit(&wptr,12);
1415 			put64bit(&wptr,chunkid);
1416 			put32bit(&wptr,version);
1417 			if (universal_write(fd,sendbuff,20)==20) {
1418 				conncache_insert(ip,port,fd);
1419 			} else {
1420 				tcpclose(fd);
1421 			}
1422 		} else {
1423 			tcpclose(fd);
1424 		}
1425 
1426 #ifdef WORKER_DEBUG
1427 		now = monotonic_seconds();
1428 		workingtime = now - start;
1429 
1430 		cl=0;
1431 		for (cnt=0 ; cnt<chainelements ; cnt++) {
1432 			cl+=snprintf(debugchain+cl,200-cl,"%u.%u.%u.%u:%u->",(chain[cnt].ip>>24)&255,(chain[cnt].ip>>16)&255,(chain[cnt].ip>>8)&255,chain[cnt].ip&255,chain[cnt].port);
1433 		}
1434 		if (cl>=2) {
1435 			debugchain[cl-2]='\0';
1436 		}
1437 		syslog(LOG_NOTICE,"worker %lu sent %"PRIu32" blocks (%"PRIu32" partial) of chunk %016"PRIX64"_%08"PRIX32", received status for %"PRIu32" blocks (%"PRIu32" lost), bw: %.6lfMB/s ( %"PRIu32" B / %.6lf s ), chain: %s",(unsigned long)arg,nextwriteid-1,partialblocks,chunkid,version,nextwriteid-1-waitforstatus,waitforstatus,(double)bytessent/workingtime,bytessent,workingtime,debugchain);
1438 #endif
1439 
1440 		if (status!=0 || wrstatus!=MFS_STATUS_OK) {
1441 			if (wrstatus!=MFS_STATUS_OK) {	// convert MFS status to OS errno
1442 				if (wrstatus==MFS_ERROR_NOSPACE) {
1443 					status=ENOSPC;
1444 				} else {
1445 					status=EIO;
1446 				}
1447 			}
1448 		} else if (nextwriteid-1 == waitforstatus) { // nothing has been written - treat it as EIO
1449 			status=EIO;
1450 		}
1451 
1452 		zassert(pthread_mutex_lock(&(ind->lock)));	// make helgrind happy
1453 		unbreakable = chd->unbreakable;
1454 
1455 		if (optimeout>0.0 && monotonic_seconds() - opbegin > optimeout) {
1456 			unbreakable = 0;
1457 		} else if (status!=0) {
1458 			if (wrstatus!=MFS_ERROR_NOTDONE) {
1459 				chd->trycnt++;
1460 			}
1461 			if (chd->trycnt>=maxretries) {
1462 				unbreakable = 0;
1463 			}
1464 		} else {
1465 			unbreakable = 0; // operation finished
1466 		}
1467 		chd->continueop = unbreakable;
1468 		zassert(pthread_mutex_unlock(&(ind->lock)));
1469 		if (unbreakable==0) {
1470 //			for (cnt=0 ; cnt<10 ; cnt++) {
1471 			westatus = fs_writeend(chunkid,inode,chindx,mfleng,0);
1472 //				if (westatus==MFS_ERROR_ENOENT || westatus==MFS_ERROR_QUOTA) { // can't change -> do not repeat
1473 //					break;
1474 //				} else if (westatus!=MFS_STATUS_OK) {
1475 //					if (optimeout>0.0 && monotonic_seconds() - opbegin > optimeout) {
1476 //						westatus = MFS_ERROR_EIO;
1477 //						break;
1478 //					}
1479 //					portable_usleep(100000+(10000<<cnt));
1480 //					zassert(pthread_mutex_lock(&(ind->lock)));	// make helgrind happy
1481 //					zassert(pthread_mutex_unlock(&(ind->lock)));
1482 //				} else {
1483 //					break;
1484 //				}
1485 //			}
1486 		} else {
1487 			westatus = MFS_STATUS_OK;
1488 		}
1489 
1490 		if (optimeout>0.0 && monotonic_seconds() - opbegin > optimeout) {
1491 			write_job_end(chd,EIO,0);
1492 		} else if (westatus==MFS_ERROR_ENOENT) {
1493 			write_job_end(chd,EBADF,0);
1494 		} else if (westatus==MFS_ERROR_QUOTA) {
1495 			write_job_end(chd,EDQUOT,0);
1496 		} else if (westatus==MFS_ERROR_IO) {
1497 			write_job_end(chd,EIO,0);
1498 		} else if (westatus!=MFS_STATUS_OK) {
1499 			write_job_end(chd,ENXIO,0);
1500 		} else {
1501 			if (status!=0) {
1502 				if (chd->trycnt>=maxretries) {
1503 					write_job_end(chd,status,0);
1504 				} else {
1505 					if (wrstatus==MFS_ERROR_NOTDONE) {
1506 						write_job_end(chd,0,300000);
1507 					} else {
1508 						write_job_end(chd,0,1000+((chd->trycnt<30)?((chd->trycnt-1)*300000):10000000));
1509 					}
1510 				}
1511 			} else {
1512 				if (valid_offsets) {
1513 					read_inode_clear_cache(inode,min_offset,max_offset-min_offset);
1514 				}
1515 				// read_inode_set_length_async(inode,mfleng,0);
1516 				write_job_end(chd,0,0);
1517 			}
1518 		}
1519 		chunkrwlock_wunlock(inode,chindx);
1520 	}
1521 	return NULL;
1522 }
1523 
write_data_init(uint32_t cachesize,uint32_t retries,uint32_t timeout,uint32_t logretry,uint8_t erronlostchunk,uint8_t erronnospace)1524 void write_data_init (uint32_t cachesize,uint32_t retries,uint32_t timeout,uint32_t logretry,uint8_t erronlostchunk,uint8_t erronnospace) {
1525 	uint32_t i;
1526 	size_t mystacksize;
1527 //	sigset_t oldset;
1528 //	sigset_t newset;
1529 
1530 	erroronlostchunk = erronlostchunk;
1531 	erroronnospace = erronnospace;
1532 	cacheblockcount = (cachesize/MFSBLOCKSIZE);
1533 	maxretries = retries;
1534 	if (optimeout>0) {
1535 		optimeout = timeout;
1536 	} else {
1537 		optimeout = 0.0;
1538 	}
1539 	minlogretry = logretry;
1540 	if (cacheblockcount<10) {
1541 		cacheblockcount=10;
1542 	}
1543 	zassert(pthread_mutex_init(&hashlock,NULL));
1544 	zassert(pthread_mutex_init(&workerslock,NULL));
1545 	zassert(pthread_cond_init(&worker_term_cond,NULL));
1546 	worker_term_waiting = 0;
1547 
1548 	zassert(pthread_mutex_init(&fcblock,NULL));
1549 	zassert(pthread_cond_init(&fcbcond,NULL));
1550 	fcbwaiting=0;
1551 	cacheblocks = malloc(sizeof(cblock)*cacheblockcount);
1552 	passert(cacheblocks);
1553 	for (i=0 ; i<cacheblockcount-1 ; i++) {
1554 		cacheblocks[i].next = cacheblocks+(i+1);
1555 	}
1556 	cacheblocks[cacheblockcount-1].next = NULL;
1557 	freecblockshead = cacheblocks;
1558 	freecacheblocks = cacheblockcount;
1559 
1560 	idhash = malloc(sizeof(inodedata*)*IDHASHSIZE);
1561 	passert(idhash);
1562 	for (i=0 ; i<IDHASHSIZE ; i++) {
1563 		idhash[i]=NULL;
1564 	}
1565 
1566 //	dqueue = queue_new(0);
1567 	jqueue = queue_new(0);
1568 
1569 	zassert(pthread_attr_init(&worker_thattr));
1570 #ifdef PTHREAD_STACK_MIN
1571 	mystacksize = PTHREAD_STACK_MIN;
1572 	if (mystacksize < 0x20000) {
1573 		mystacksize = 0x20000;
1574 	}
1575 #else
1576 	mystacksize = 0x20000;
1577 #endif
1578 	zassert(pthread_attr_setstacksize(&worker_thattr,mystacksize));
1579 
1580 //	sigemptyset(&newset);
1581 //	sigaddset(&newset, SIGTERM);
1582 //	sigaddset(&newset, SIGINT);
1583 //	sigaddset(&newset, SIGHUP);
1584 //	sigaddset(&newset, SIGQUIT);
1585 //	zassert(pthread_sigmask(SIG_BLOCK, &newset, &oldset));
1586 //	zassert(pthread_create(&dqueue_worker_th,&worker_thattr,write_dqueue_worker,NULL));
1587 //	zassert(pthread_sigmask(SIG_SETMASK, &oldset, NULL));
1588 
1589 	zassert(pthread_mutex_lock(&workerslock));
1590 	workers_avail = 0;
1591 	workers_total = 0;
1592 	write_data_spawn_worker();
1593 	zassert(pthread_mutex_unlock(&workerslock));
1594 #ifdef BUFFER_DEBUG
1595 	zassert(pthread_create(&info_worker_th,&worker_thattr,write_info_worker,NULL));
1596 #endif
1597 }
1598 
write_data_term(void)1599 void write_data_term(void) {
1600 	uint32_t i;
1601 	inodedata *ind,*indn;
1602 	chunkdata *chd,*chdn;
1603 
1604 //	queue_close(dqueue);
1605 	queue_close(jqueue);
1606 	zassert(pthread_mutex_lock(&workerslock));
1607 	while (workers_total>0) {
1608 		worker_term_waiting++;
1609 		zassert(pthread_cond_wait(&worker_term_cond,&workerslock));
1610 	}
1611 	zassert(pthread_mutex_unlock(&workerslock));
1612 //	zassert(pthread_join(dqueue_worker_th,NULL));
1613 //	queue_delete(dqueue);
1614 	queue_delete(jqueue);
1615 	zassert(pthread_mutex_lock(&hashlock));
1616 	for (i=0 ; i<IDHASHSIZE ; i++) {
1617 		for (ind = idhash[i] ; ind ; ind = indn) {
1618 			indn = ind->next;
1619 			zassert(pthread_mutex_lock(&(ind->lock)));
1620 			chd = ind->chunks;
1621 			while (chd) {
1622 				chdn = chd->next;
1623 				write_free_chunkdata(chd);
1624 				chd = chdn;
1625 			}
1626 			zassert(pthread_mutex_unlock(&(ind->lock)));
1627 			zassert(pthread_cond_destroy(&(ind->flushcond)));
1628 			zassert(pthread_cond_destroy(&(ind->writecond)));
1629 			zassert(pthread_mutex_destroy(&(ind->lock)));
1630 			free(ind);
1631 		}
1632 	}
1633 	free(idhash);
1634 	zassert(pthread_mutex_unlock(&hashlock));
1635 	free(cacheblocks);
1636 	zassert(pthread_attr_destroy(&worker_thattr));
1637 	zassert(pthread_cond_destroy(&worker_term_cond));
1638 	zassert(pthread_cond_destroy(&fcbcond));
1639 	zassert(pthread_mutex_destroy(&fcblock));
1640 	zassert(pthread_mutex_destroy(&workerslock));
1641 	zassert(pthread_mutex_destroy(&hashlock));
1642 }
1643 
/*
 * Try to merge the byte range [from,to) into an existing cache block.
 *
 * The merge is refused (-1) when the block already has a write id assigned
 * or when the new range neither overlaps nor touches the range stored in
 * the block. On success the data is copied into the block, the block range
 * is widened as needed and 0 is returned. If the block became a full
 * MFSBLOCKSIZE block, is the last one in the chain and the worker is in
 * waiting state 2, the worker is woken up through its pipe.
 */
int write_cb_expand(chunkdata *chd,cblock *cb,uint32_t from,uint32_t to,const uint8_t *data) {
	if (cb->writeid>0) {	// write id already assigned - block can't be changed
		return -1;
	}
	if (from>cb->to || to<cb->from) {	// ranges would leave a gap
		return -1;
	}

	memcpy(cb->data+from,data,to-from);
	if (cb->from>from) {
		cb->from = from;
	}
	if (cb->to<to) {
		cb->to = to;
	}

	// nothing more to do unless a complete tail block appeared while the worker waits for exactly that
	if (cb->to-cb->from!=MFSBLOCKSIZE || cb->next!=NULL || chd->waitingworker!=2) {
		return 0;
	}
	if (universal_write(chd->wakeup_fd," ",1)!=1) {
		syslog(LOG_ERR,"can't write to pipe !!!");
	}
	chd->waitingworker = 0;
	chd->wakeup_fd = -1;
	return 0;
}
1664 
/*
 * Queue the byte range [from,to) of block 'pos' in chunk 'chindx' for writing.
 *
 * When the chunk already has chunkdata and its newest cache block for the
 * same position accepts the range (write_cb_expand), the data is merged
 * there. Otherwise a new cache block is appended to the chunk's data chain;
 * chunkdata is created first if the chunk had none. Always returns 0.
 */
int write_block(inodedata *ind,uint32_t chindx,uint16_t pos,uint32_t from,uint32_t to,const uint8_t *data,uint8_t superuser) {
	cblock *fresh,*candidate;
	chunkdata *chd;
	uint8_t created;

	// the cache block is acquired before taking ind->lock
	fresh = write_cb_acquire(ind);
	zassert(pthread_mutex_lock(&(ind->lock)));

	// look up chunkdata of the requested chunk
	chd = ind->chunks;
	while (chd!=NULL && chd->chindx!=chindx) {
		chd = chd->next;
	}
	if (chd!=NULL) {
		if (superuser) {
			chd->superuser = 1;
		}
		// scanning from the tail finds the newest block for this position
		candidate = chd->datachaintail;
		while (candidate!=NULL && candidate->pos!=pos) {
			candidate = candidate->prev;
		}
		if (candidate!=NULL && write_cb_expand(chd,candidate,from,to,data)==0) {
			write_cb_release(ind,fresh);	// merged - the spare block is not needed
			zassert(pthread_mutex_unlock(&(ind->lock)));
			return 0;
		}
	}

	fresh->pos = pos;
	fresh->from = from;
	fresh->to = to;
	memcpy(fresh->data+from,data,to-from);

	created = (chd==NULL)?1:0;
	if (created) {
		chd = write_new_chunkdata(ind,chindx);
		if (superuser) {
			chd->superuser = 1;
		}
	}

	// append the block at the end of the chunk's data chain
	fresh->next = NULL;
	fresh->prev = chd->datachaintail;
	if (chd->datachaintail==NULL) {
		chd->datachainhead = fresh;
	} else {
		chd->datachaintail->next = fresh;
	}
	chd->datachaintail = fresh;

	if (created) {
		write_test_chunkdata(ind);
	} else if (chd->waitingworker) {
		// a worker is waiting on this chunk - wake it through its pipe
		if (universal_write(chd->wakeup_fd," ",1)!=1) {
			syslog(LOG_ERR,"can't write to pipe !!!");
		}
		chd->waitingworker = 0;
		chd->wakeup_fd = -1;
	}
	zassert(pthread_mutex_unlock(&(ind->lock)));
	return 0;
}
1725 
write_data(void * vid,uint64_t offset,uint32_t size,const uint8_t * data,uint8_t superuser)1726 int write_data(void *vid,uint64_t offset,uint32_t size,const uint8_t *data,uint8_t superuser) {
1727 	uint32_t chindx;
1728 	uint16_t pos;
1729 	uint32_t from;
1730 	int status;
1731 	inodedata *ind = (inodedata*)vid;
1732 	if (ind==NULL) {
1733 		return EIO;
1734 	}
1735 //	int64_t s,e;
1736 
1737 //	s = monotonic_useconds();
1738 	zassert(pthread_mutex_lock(&(ind->lock)));
1739 
1740 //	syslog(LOG_NOTICE,"write_data: inode:%"PRIu32" offset:%"PRIu64" size:%"PRIu32,ind->inode,offset,size);
1741 	status = ind->status;
1742 	if (status==0) {
1743 		if (offset+size>ind->maxfleng) {	// move fleng
1744 			ind->maxfleng = offset+size;
1745 		}
1746 		ind->writewaiting++;
1747 		while (ind->flushwaiting>0) {
1748 			zassert(pthread_cond_wait(&(ind->writecond),&(ind->lock)));
1749 		}
1750 		ind->writewaiting--;
1751 	}
1752 	zassert(pthread_mutex_unlock(&(ind->lock)));
1753 	if (status!=0) {
1754 		return status;
1755 	}
1756 
1757 	chindx = offset>>MFSCHUNKBITS;
1758 	pos = (offset&MFSCHUNKMASK)>>MFSBLOCKBITS;
1759 	from = offset&MFSBLOCKMASK;
1760 	while (size>0) {
1761 		if (size>MFSBLOCKSIZE-from) {
1762 			if (write_block(ind,chindx,pos,from,MFSBLOCKSIZE,data,superuser)<0) {
1763 				return EIO;
1764 			}
1765 			size -= (MFSBLOCKSIZE-from);
1766 			data += (MFSBLOCKSIZE-from);
1767 			from = 0;
1768 			pos++;
1769 			if (pos==1024) {
1770 				pos = 0;
1771 				chindx++;
1772 			}
1773 		} else {
1774 			if (write_block(ind,chindx,pos,from,from+size,data,superuser)<0) {
1775 				return EIO;
1776 			}
1777 			size = 0;
1778 		}
1779 	}
1780 //	e = monotonic_useconds();
1781 //	syslog(LOG_NOTICE,"write_data time: %"PRId64,e-s);
1782 	return 0;
1783 }
1784 
/*
 * Obtain a write handle for 'inode'.
 *
 * Returns whatever write_get_inodedata(inode,fleng) returns (NULL included),
 * as an opaque pointer for the other write_data_* calls.
 */
void* write_data_new(uint32_t inode,uint64_t fleng) {
	return write_get_inodedata(inode,fleng);
}
1793 
/*
 * Wait until every chunk of the inode has its chunkready flag set or the
 * inode status becomes non-zero, then mark all chunks of the inode as
 * unbreakable. Returns the inode status read under ind->lock.
 */
static int write_data_do_chunk_wait(inodedata *ind) {
	chunkdata *chd;
	int ret;
#ifdef WDEBUG
	int64_t t_begin,t_end;

	t_begin = monotonic_useconds();
#endif
	zassert(pthread_mutex_lock(&(ind->lock)));
	while (ind->status==0) {
		// find the first chunk that is not ready yet
		chd = ind->chunks;
		while (chd!=NULL && chd->chunkready) {
			chd = chd->next;
		}
		if (chd==NULL) {	// all chunks are ready
			break;
		}
#ifdef WDEBUG
		syslog(LOG_NOTICE,"(inode:%"PRIu32") chunk_ready: wait ...",ind->inode);
#endif
		zassert(pthread_cond_wait(&(ind->chunkcond),&(ind->lock)));
#ifdef WDEBUG
		syslog(LOG_NOTICE,"(inode:%"PRIu32") chunk_ready: woken up",ind->inode);
#endif
	}
	chd = ind->chunks;
	while (chd!=NULL) {
		chd->unbreakable = 1;
		chd = chd->next;
	}
	ret = ind->status;
	zassert(pthread_mutex_unlock(&(ind->lock)));
#ifdef WDEBUG
	t_end = monotonic_useconds();
	syslog(LOG_NOTICE,"flush time: %"PRId64,t_end-t_begin);
#endif
	return ret;
}
1831 
/*
 * Return the current value of ind->chunkscnt, read under ind->lock.
 * A non-zero result means write_data_do_flush would have to wait.
 */
static int write_data_will_flush_wait(inodedata *ind) {
	int pending;

	zassert(pthread_mutex_lock(&(ind->lock)));
	pending = ind->chunkscnt;
	zassert(pthread_mutex_unlock(&(ind->lock)));

	return pending;
}
1839 
/*
 * Wait until the inode has no chunks left to write (ind->chunkscnt==0).
 *
 * While waiting, every worker registered as waiting on one of the inode's
 * chunks is woken through its pipe. After the wait, writers blocked in
 * write_data are released once no other flush is pending. When releaseflag
 * is non-zero write_free_inodedata(ind) is called at the end.
 * Returns the inode status read under ind->lock.
 */
static int write_data_do_flush(inodedata *ind,uint8_t releaseflag) {
	chunkdata *chd;
	int ret;
#ifdef WDEBUG
	int64_t t_begin,t_end;

	t_begin = monotonic_useconds();
#endif
	zassert(pthread_mutex_lock(&(ind->lock)));
	ind->flushwaiting++;
	while (ind->chunkscnt>0) {
		// kick every worker that sits idle on one of our chunks
		chd = ind->chunks;
		while (chd!=NULL) {
			if (chd->waitingworker) {
				if (universal_write(chd->wakeup_fd," ",1)!=1) {
					syslog(LOG_ERR,"can't write to pipe !!!");
				}
				chd->waitingworker = 0;
				chd->wakeup_fd = -1;
			}
			chd = chd->next;
		}
#ifdef WDEBUG
		syslog(LOG_NOTICE,"(inode:%"PRIu32") flush: wait ...",ind->inode);
#endif
		zassert(pthread_cond_wait(&(ind->flushcond),&(ind->lock)));
#ifdef WDEBUG
		syslog(LOG_NOTICE,"(inode:%"PRIu32") flush: woken up",ind->inode);
#endif
	}
	ind->flushwaiting--;
	// last flusher lets the blocked writers continue
	if (ind->writewaiting>0 && ind->flushwaiting==0) {
		zassert(pthread_cond_broadcast(&(ind->writecond)));
	}
	ret = ind->status;
	zassert(pthread_mutex_unlock(&(ind->lock)));
	if (releaseflag) {
		write_free_inodedata(ind);
	}
#ifdef WDEBUG
	t_end = monotonic_useconds();
	syslog(LOG_NOTICE,"flush time: %"PRId64,t_end-t_begin);
#endif
	return ret;
}
1883 
write_data_flush(void * vid)1884 int write_data_flush(void *vid) {
1885 	if (vid==NULL) {
1886 		return EIO;
1887 	}
1888 	return write_data_do_flush((inodedata*)vid,0);
1889 }
1890 
write_data_chunk_wait(void * vid)1891 int write_data_chunk_wait(void *vid) {
1892 	if (vid==NULL) {
1893 		return EIO;
1894 	}
1895 	return write_data_do_chunk_wait((inodedata*)vid);
1896 }
1897 
write_data_inode_setmaxfleng(uint32_t inode,uint64_t maxfleng)1898 void write_data_inode_setmaxfleng(uint32_t inode,uint64_t maxfleng) {
1899 	inodedata* ind;
1900 	ind = write_find_inodedata(inode);
1901 	if (ind) {
1902 		zassert(pthread_mutex_lock(&(ind->lock)));
1903 		ind->maxfleng = maxfleng;
1904 		zassert(pthread_mutex_unlock(&(ind->lock)));
1905 		write_free_inodedata(ind);
1906 	}
1907 }
1908 
write_data_inode_getmaxfleng(uint32_t inode)1909 uint64_t write_data_inode_getmaxfleng(uint32_t inode) {
1910 	uint64_t maxfleng;
1911 	inodedata* ind;
1912 	ind = write_find_inodedata(inode);
1913 	if (ind) {
1914 		zassert(pthread_mutex_lock(&(ind->lock)));
1915 		maxfleng = ind->maxfleng;
1916 		zassert(pthread_mutex_unlock(&(ind->lock)));
1917 		write_free_inodedata(ind);
1918 	} else {
1919 		maxfleng = 0;
1920 	}
1921 	return maxfleng;
1922 }
1923 
write_data_getmaxfleng(void * vid)1924 uint64_t write_data_getmaxfleng(void *vid) {
1925 	uint64_t maxfleng;
1926 	inodedata* ind;
1927 	if (vid==NULL) {
1928 		return 0;
1929 	}
1930 	ind = (inodedata*)vid;
1931 	zassert(pthread_mutex_lock(&(ind->lock)));
1932 	maxfleng = ind->maxfleng;
1933 	zassert(pthread_mutex_unlock(&(ind->lock)));
1934 	return maxfleng;
1935 }
1936 
write_data_flush_inode(uint32_t inode)1937 int write_data_flush_inode(uint32_t inode) {
1938 	inodedata* ind;
1939 	int ret;
1940 	ind = write_find_inodedata(inode);
1941 	if (ind==NULL) {
1942 		return 0;
1943 	}
1944 	ret = write_data_do_flush(ind,1);
1945 	return ret;
1946 }
1947 
write_data_will_end_wait(void * vid)1948 int write_data_will_end_wait(void *vid) {
1949 	if (vid!=NULL) {
1950 		return write_data_will_flush_wait((inodedata*)vid);
1951 	} else {
1952 		return 0;
1953 	}
1954 }
1955 
write_data_end(void * vid)1956 int write_data_end(void *vid) {
1957 	int ret;
1958 	if (vid==NULL) {
1959 		return EIO;
1960 	}
1961 	ret = write_data_do_flush((inodedata*)vid,1);
1962 	return ret;
1963 }
1964