1 /*
2  * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <sys/types.h>
26 #ifdef HAVE_WRITEV
27 #include <sys/uio.h>
28 #endif
29 #include <sys/time.h>
30 #include <unistd.h>
31 #include <poll.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <syslog.h>
36 #include <errno.h>
37 #include <limits.h>
38 #include <signal.h>
39 #include <pthread.h>
40 #include <inttypes.h>
41 
42 #include "massert.h"
43 #include "datapack.h"
44 #include "crc.h"
45 #include "strerr.h"
46 #include "mfsstrerr.h"
47 #include "pcqueue.h"
48 #include "sockets.h"
49 #include "conncache.h"
50 #include "csdb.h"
51 #include "mastercomm.h"
52 #include "clocks.h"
53 #include "portable.h"
54 #include "readdata.h"
55 #include "MFSCommunication.h"
56 
57 //#define WORKER_DEBUG 1
58 //#define BUFFER_DEBUG 1
59 //#define WDEBUG 1
60 
/* fallback used when <errno.h> does not define EDQUOT */
#ifndef EDQUOT
#define EDQUOT ENOSPC
#endif

/*
 * Timing parameters. All of them are compared in write_worker against
 * differences of monotonic_seconds() values.
 */

/* for Nagle's-like algorithm: how long a not yet full block may wait before it is sent */
#define NEXT_BLOCK_DELAY                     0.05

/* nothing received from the chunkserver for that long -> connection is treated as timed out */
#define CHUNKSERVER_ACTIVITY_TIMEOUT         2.0

/* worker with no unfinished writes gives up the chunk after that long without a new block */
#define WORKER_IDLE_TIMEOUT                  1.0

/* limits of the total working time of one job */
#define WORKER_BUSY_LAST_SEND_TIMEOUT        5.0
#define WORKER_BUSY_WAIT_FOR_STATUS          5.0
/* extra working time granted while workers_total<=HEAVYLOAD_WORKERS */
#define WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT  20.0

/* idle connection: interval between ANTOAN_NOP packets */
#define WORKER_NOP_INTERVAL                  1.0

/* worker pool sizing */
#define SUSTAIN_WORKERS    50	/* a worker finishing its job exits when more than that many are available */
#define HEAVYLOAD_WORKERS  150	/* above that total count workers use shorter timeouts */
#define MAX_WORKERS        250	/* no new workers are spawned at this total count */

/* hash of (inode,indx) pair */
#define WCHASHSIZE 256
#define WCHASH(inode,indx) (((inode)*0xB239FB71+(indx)*193)%WCHASHSIZE)

/* hash of inode number - bucket index in idhash */
#define IDHASHSIZE 256
#define IDHASH(inode) (((inode)*0xB239FB71)%IDHASHSIZE)
87 
88 typedef struct cblock_s {
89 	uint8_t data[MFSBLOCKSIZE];	// modified only when writeid==0
90 	uint32_t chindx;	// chunk number
91 	uint16_t pos;		// block in chunk (0...1023) - never modified
92 	uint32_t writeid;	// 0 = not sent, >0 = block was sent (modified and accessed only when wchunk is locked)
93 	uint32_t from;		// first filled byte in data (modified only when writeid==0)
94 	uint32_t to;		// first not used byte in data (modified only when writeid==0)
95 	struct cblock_s *next,*prev;
96 } cblock;
97 
98 typedef struct inodedata_s {
99 	uint32_t inode;
100 	uint64_t maxfleng;
101 	uint32_t cacheblockcount;
102 	int status;
103 	uint16_t flushwaiting;
104 	uint16_t writewaiting;
105 	uint16_t lcnt;
106 	uint32_t trycnt;
107 	uint8_t waitingworker;
108 	uint8_t inqueue;
109 	int pipe[2];
110 	cblock *datachainhead,*datachaintail;
111 	pthread_cond_t flushcond;	// wait for inqueue==0 (flush)
112 	pthread_cond_t writecond;	// wait for flushwaiting==0 (write)
113 	struct inodedata_s *next;
114 } inodedata;
115 
/*
 * Descriptor of one write worker thread: allocated in write_data_spawn_worker,
 * passed to write_worker as its argument and freed in write_data_close_worker.
 */
typedef struct worker_s {
	pthread_t thread_id;	/* filled in by pthread_create */
} worker;
119 
120 // static pthread_mutex_t fcblock;
121 
122 static pthread_cond_t fcbcond;
123 static uint8_t fcbwaiting;
124 static cblock *cacheblocks,*freecblockshead;
125 static uint32_t freecacheblocks;
126 
127 static uint32_t maxretries;
128 
129 static inodedata **idhash;
130 
131 static pthread_mutex_t glock;
132 
133 #ifdef BUFFER_DEBUG
134 static pthread_t info_worker_th;
135 static uint32_t usedblocks;
136 #endif
137 
138 static pthread_t dqueue_worker_th;
139 
140 static uint32_t workers_avail;
141 static uint32_t workers_total;
142 static uint32_t worker_term_waiting;
143 static pthread_cond_t worker_term_cond;
144 static pthread_attr_t worker_thattr;
145 
146 static void *jqueue,*dqueue;
147 
148 #ifdef BUFFER_DEBUG
write_info_worker(void * arg)149 void* write_info_worker(void *arg) {
150 	(void)arg;
151 	uint32_t cbcnt,fcbcnt,ucbcnt;
152 	uint32_t i;
153 	inodedata *id;
154 	cblock *cb;
155 	for (;;) {
156 		zassert(pthread_mutex_lock(&glock));
157 		cbcnt = 0;
158 		for (i = 0 ; i<IDHASHSIZE ; i++) {
159 			for (id = idhash[i] ; id ; id = id->next) {
160 				ucbcnt = 0;
161 				for (cb = id->datachainhead ; cb ; cb = cb->next) {
162 					ucbcnt++;
163 				}
164 				if (ucbcnt != id->cacheblockcount) {
165 					syslog(LOG_NOTICE,"inode: %"PRIu32" ; wrong cache block count (%"PRIu32"/%"PRIu32")",id->inode,ucbcnt,id->cacheblockcount);
166 				}
167 				cbcnt += ucbcnt;
168 			}
169 		}
170 		fcbcnt = 0;
171 		for (cb = freecblockshead ; cb ; cb = cb->next) {
172 			fcbcnt++;
173 		}
174 		syslog(LOG_NOTICE,"used cache blocks: %"PRIu32" ; sum of inode used blocks: %"PRIu32" ; free cache blocks: %"PRIu32" ; free cache chain blocks: %"PRIu32,usedblocks,cbcnt,freecacheblocks,fcbcnt);
175 		zassert(pthread_mutex_unlock(&glock));
176 		portable_usleep(500000);
177 	}
178 
179 }
180 #endif
181 
182 /* glock: LOCKED */
write_cb_release(inodedata * id,cblock * cb)183 void write_cb_release (inodedata *id,cblock *cb) {
184 //	zassert(pthread_mutex_lock(&fcblock));
185 	cb->next = freecblockshead;
186 	freecblockshead = cb;
187 	freecacheblocks++;
188 	id->cacheblockcount--;
189 	if (fcbwaiting) {
190 		zassert(pthread_cond_signal(&fcbcond));
191 	}
192 #ifdef BUFFER_DEBUG
193 	usedblocks--;
194 #endif
195 //	zassert(pthread_mutex_unlock(&fcblock));
196 }
197 
198 /* glock: LOCKED */
write_cb_acquire(inodedata * id)199 cblock* write_cb_acquire(inodedata *id) {
200 	cblock *ret;
201 //	zassert(pthread_mutex_lock(&fcblock));
202 	fcbwaiting++;
203 	while (freecblockshead==NULL || id->cacheblockcount>(freecacheblocks/3)) {
204 		zassert(pthread_cond_wait(&fcbcond,&glock));
205 	}
206 	fcbwaiting--;
207 	ret = freecblockshead;
208 	freecblockshead = ret->next;
209 	ret->chindx = 0;
210 	ret->pos = 0;
211 	ret->writeid = 0;
212 	ret->from = 0;
213 	ret->to = 0;
214 	ret->next = NULL;
215 	ret->prev = NULL;
216 	freecacheblocks--;
217 	id->cacheblockcount++;
218 #ifdef BUFFER_DEBUG
219 	usedblocks++;
220 #endif
221 //	zassert(pthread_mutex_unlock(&fcblock));
222 	return ret;
223 }
224 
225 
226 /* inode */
227 
228 /* glock: LOCKED */
write_find_inodedata(uint32_t inode)229 inodedata* write_find_inodedata(uint32_t inode) {
230 	uint32_t idh = IDHASH(inode);
231 	inodedata *id;
232 	for (id=idhash[idh] ; id ; id=id->next) {
233 		if (id->inode == inode) {
234 			return id;
235 		}
236 	}
237 	return NULL;
238 }
239 
240 /* glock: LOCKED */
write_get_inodedata(uint32_t inode)241 inodedata* write_get_inodedata(uint32_t inode) {
242 	uint32_t idh = IDHASH(inode);
243 	inodedata *id;
244 	int pfd[2];
245 
246 	for (id=idhash[idh] ; id ; id=id->next) {
247 		if (id->inode == inode) {
248 			return id;
249 		}
250 	}
251 
252 	if (pipe(pfd)<0) {
253 		syslog(LOG_WARNING,"pipe error: %s",strerr(errno));
254 		return NULL;
255 	}
256 	id = malloc(sizeof(inodedata));
257 	id->inode = inode;
258 	id->cacheblockcount = 0;
259 	id->maxfleng = 0;
260 	id->status = 0;
261 	id->trycnt = 0;
262 	id->pipe[0] = pfd[0];
263 	id->pipe[1] = pfd[1];
264 	id->datachainhead = NULL;
265 	id->datachaintail = NULL;
266 	id->waitingworker = 0;
267 	id->inqueue = 0;
268 	id->flushwaiting = 0;
269 	id->writewaiting = 0;
270 	id->lcnt = 0;
271 	zassert(pthread_cond_init(&(id->flushcond),NULL));
272 	zassert(pthread_cond_init(&(id->writecond),NULL));
273 	id->next = idhash[idh];
274 	idhash[idh] = id;
275 	return id;
276 }
277 
278 /* glock: LOCKED */
write_free_inodedata(inodedata * fid)279 void write_free_inodedata(inodedata *fid) {
280 	uint32_t idh = IDHASH(fid->inode);
281 	inodedata *id,**idp;
282 	idp = &(idhash[idh]);
283 	while ((id=*idp)) {
284 		if (id==fid) {
285 			*idp = id->next;
286 			zassert(pthread_cond_destroy(&(id->flushcond)));
287 			zassert(pthread_cond_destroy(&(id->writecond)));
288 			close(id->pipe[0]);
289 			close(id->pipe[1]);
290 			free(id);
291 			return;
292 		}
293 		idp = &(id->next);
294 	}
295 }
296 
297 
298 /* queues */
299 
300 /* glock: UNUSED */
write_delayed_enqueue(inodedata * id,uint32_t cnt)301 void write_delayed_enqueue(inodedata *id,uint32_t cnt) {
302 	uint64_t t;
303 	if (cnt>0) {
304 		t = monotonic_useconds();
305 		queue_put(dqueue,t>>32,t&0xFFFFFFFFU,(uint8_t*)id,cnt);
306 	} else {
307 		queue_put(jqueue,0,0,(uint8_t*)id,0);
308 	}
309 }
310 
311 /* glock: UNUSED */
write_enqueue(inodedata * id)312 void write_enqueue(inodedata *id) {
313 	queue_put(jqueue,0,0,(uint8_t*)id,0);
314 }
315 
316 /* worker thread | glock: UNUSED */
write_dqueue_worker(void * arg)317 void* write_dqueue_worker(void *arg) {
318 	uint64_t t,usec;
319 	uint32_t husec,lusec,cnt;
320 	uint8_t *id;
321 	(void)arg;
322 	for (;;) {
323 		queue_get(dqueue,&husec,&lusec,&id,&cnt);
324 		if (id==NULL) {
325 			return NULL;
326 		}
327 		t = monotonic_useconds();
328 		usec = husec;
329 		usec <<= 32;
330 		usec |= lusec;
331 		if (t>usec) {
332 			t -= usec;
333 			while (t>=1000000 && cnt>0) {
334 				t-=1000000;
335 				cnt--;
336 			}
337 			if (cnt>0) {
338 				if (t<1000000) {
339 					portable_usleep(1000000-t);
340 				}
341 				cnt--;
342 			}
343 		}
344 		if (cnt>0) {
345 			t = monotonic_useconds();
346 			queue_put(dqueue,t>>32,t&0xFFFFFFFFU,(uint8_t*)id,cnt);
347 		} else {
348 			queue_put(jqueue,0,0,id,0);
349 		}
350 	}
351 	return NULL;
352 }
353 
354 /* glock: UNLOCKED */
write_job_end(inodedata * id,int status,uint32_t delay)355 void write_job_end(inodedata *id,int status,uint32_t delay) {
356 	cblock *cb,*fcb;
357 
358 	zassert(pthread_mutex_lock(&glock));
359 	if (status) {
360 		errno = status;
361 		syslog(LOG_WARNING,"error writing file number %"PRIu32": %s",id->inode,strerr(errno));
362 		id->status = status;
363 	}
364 	if (status==0 && delay==0) {
365 		id->trycnt=0;	// on good write reset try counter
366 	}
367 	status = id->status;
368 
369 	if (id->datachainhead && status==0) {	// still have some work to do
370 		// reset write id
371 		for (cb=id->datachainhead ; cb ; cb=cb->next) {
372 			cb->writeid = 0;
373 		}
374 		write_delayed_enqueue(id,delay);
375 	} else {	// no more work or error occurred
376 		// if this is an error then release all data blocks
377 		cb = id->datachainhead;
378 		while (cb) {
379 			fcb = cb;
380 			cb = cb->next;
381 			write_cb_release(id,fcb);
382 		}
383 		id->datachainhead=NULL;
384 		id->datachaintail=NULL;
385 		id->inqueue=0;
386 
387 		if (id->flushwaiting>0) {
388 			zassert(pthread_cond_broadcast(&(id->flushcond)));
389 		}
390 	}
391 	zassert(pthread_mutex_unlock(&glock));
392 }
393 
394 void* write_worker(void *arg);
395 
396 static uint32_t lastnotify = 0;
397 
398 /* glock:LOCKED */
write_data_spawn_worker(void)399 static inline void write_data_spawn_worker(void) {
400 	sigset_t oldset;
401 	sigset_t newset;
402 	worker *w;
403 	int res;
404 
405 	w = malloc(sizeof(worker));
406 	if (w==NULL) {
407 		return;
408 	}
409 	sigemptyset(&newset);
410 	sigaddset(&newset, SIGTERM);
411 	sigaddset(&newset, SIGINT);
412 	sigaddset(&newset, SIGHUP);
413 	sigaddset(&newset, SIGQUIT);
414 	zassert(pthread_sigmask(SIG_BLOCK, &newset, &oldset));
415 	res = pthread_create(&(w->thread_id),&worker_thattr,write_worker,w);
416 	zassert(pthread_sigmask(SIG_SETMASK, &oldset, NULL));
417 	if (res<0) {
418 		return;
419 	}
420 	workers_avail++;
421 	workers_total++;
422 #ifdef WDEBUG
423 	fprintf(stderr,"spawn write worker (total: %"PRIu32")\n",workers_total);
424 #else
425 	if (workers_total%10==0 && workers_total!=lastnotify) {
426 		syslog(LOG_INFO,"write workers: %"PRIu32"+\n",workers_total);
427 		lastnotify = workers_total;
428 	}
429 #endif
430 }
431 
432 /* glock:LOCKED */
write_data_close_worker(worker * w)433 static inline void write_data_close_worker(worker *w) {
434 	workers_avail--;
435 	workers_total--;
436 	if (workers_total==0 && worker_term_waiting) {
437 		zassert(pthread_cond_signal(&worker_term_cond));
438 		worker_term_waiting--;
439 	}
440 	pthread_detach(w->thread_id);
441 	free(w);
442 #ifdef WDEBUG
443 	fprintf(stderr,"close write worker (total: %"PRIu32")\n",workers_total);
444 #else
445 	if (workers_total%10==0 && workers_total!=lastnotify) {
446 		syslog(LOG_INFO,"write workers: %"PRIu32"-\n",workers_total);
447 		lastnotify = workers_total;
448 	}
449 #endif
450 }
451 
/*
 * Fills 'ipstr' with dotted-decimal form of 'ip' (most significant byte
 * first), but only when the buffer holds an empty string - an already
 * formatted buffer is left untouched, so the conversion is done once.
 *
 * ipstr - buffer of 16 bytes; ipstr[0] must be initialized by the caller
 * ip    - IPv4 address as 32-bit number
 */
static inline void write_prepare_ip (char ipstr[16],uint32_t ip) {
	uint8_t oct[4];

	if (ipstr[0]!=0) {
		return;	// already formatted
	}
	oct[0] = (uint8_t)(ip>>24);
	oct[1] = (uint8_t)(ip>>16);
	oct[2] = (uint8_t)(ip>>8);
	oct[3] = (uint8_t)ip;
	snprintf(ipstr,16,"%"PRIu8".%"PRIu8".%"PRIu8".%"PRIu8,oct[0],oct[1],oct[2],oct[3]);
	ipstr[15]=0;
}
458 
459 /* main working thread | glock:UNLOCKED */
write_worker(void * arg)460 void* write_worker(void *arg) {
461 	uint32_t z1,z2,z3;
462 	uint8_t *data;
463 	int fd;
464 	int i;
465 	struct pollfd pfd[2];
466 	uint32_t sent,rcvd;
467 	uint32_t hdrtosend;
468 	uint8_t sending_mode;
469 	uint8_t recvbuff[21];
470 	uint8_t sendbuff[32];
471 #ifdef HAVE_WRITEV
472 	struct iovec siov[2];
473 #endif
474 	uint8_t pipebuff[1024];
475 	uint8_t *wptr;
476 	const uint8_t *rptr;
477 
478 	uint32_t reccmd;
479 	uint32_t recleng;
480 	uint64_t recchunkid;
481 	uint32_t recwriteid;
482 	uint8_t recstatus;
483 
484 #ifdef WORKER_DEBUG
485 	uint32_t partialblocks;
486 	uint32_t bytessent;
487 	char debugchain[200];
488 	uint32_t cl;
489 #endif
490 
491 	const uint8_t *cp,*cpe;
492 	uint8_t *cpw;
493 	uint32_t chainip[100];
494 	uint16_t chainport[100];
495 //	uint32_t chainver[100];
496 	uint32_t chainminver;
497 	uint16_t chainelements;
498 	uint32_t tmpip;
499 	uint16_t tmpport;
500 	uint32_t tmpver;
501 	uint8_t cschain[6*99];
502 	uint32_t cschainsize;
503 
504 	uint32_t chindx;
505 	uint32_t ip;
506 	uint16_t port;
507 	uint32_t srcip;
508 	uint64_t mfleng;
509 	uint64_t maxwroffset;
510 	uint64_t chunkid;
511 	uint32_t version;
512 	uint32_t nextwriteid;
513 	const uint8_t *csdata;
514 	uint32_t csdatasize;
515 	uint8_t csdataver;
516 	uint8_t westatus;
517 	uint8_t wrstatus;
518 	int status;
519 	char csstrip[16];
520 	uint8_t waitforstatus;
521 	uint8_t flushwaiting;
522 	uint8_t endofchunk;
523 	double start,now,lastrcvd,lastblock,lastsent;
524 	double workingtime,lrdiff,lbdiff;
525 	uint8_t cnt;
526 	uint8_t firsttime = 1;
527 	worker *w = (worker*)arg;
528 
529 	inodedata *id;
530 	cblock *cb,*ncb,*rcb;
531 //	inodedata *id;
532 
533 	chainelements = 0;
534 	chindx = 0;
535 
536 	for (;;) {
537 		for (cnt=0 ; cnt<chainelements ; cnt++) {
538 			csdb_writedec(chainip[cnt],chainport[cnt]);
539 		}
540 		chainelements=0;
541 
542 		if (firsttime==0) {
543 			zassert(pthread_mutex_lock(&glock));
544 			workers_avail++;
545 			if (workers_avail > SUSTAIN_WORKERS) {
546 //				fprintf(stderr,"close worker (avail:%"PRIu32" ; total:%"PRIu32")\n",workers_avail,workers_total);
547 				write_data_close_worker(w);
548 				zassert(pthread_mutex_unlock(&glock));
549 				return NULL;
550 			}
551 			zassert(pthread_mutex_unlock(&glock));
552 		}
553 		firsttime = 0;
554 
555 		// get next job
556 		queue_get(jqueue,&z1,&z2,&data,&z3);
557 
558 		zassert(pthread_mutex_lock(&glock));
559 
560 		if (data==NULL) {
561 			write_data_close_worker(w);
562 			zassert(pthread_mutex_unlock(&glock));
563 			return NULL;
564 		}
565 
566 		workers_avail--;
567 		if (workers_avail==0 && workers_total<MAX_WORKERS) {
568 			write_data_spawn_worker();
569 //			fprintf(stderr,"spawn worker (avail:%"PRIu32" ; total:%"PRIu32")\n",workers_avail,workers_total);
570 		}
571 
572 		id = (inodedata*)data;
573 
574 		if (id->status==0) {
575 			if (id->datachainhead) {
576 				chindx = id->datachainhead->chindx;
577 				status = id->status;
578 			} else {
579 				syslog(LOG_WARNING,"writeworker got inode with no data to write !!!");
580 				status = EINVAL;	// this should never happen, so status is not important - just anything
581 			}
582 		} else {
583 			status = id->status;
584 		}
585 
586 		zassert(pthread_mutex_unlock(&glock));
587 
588 		if (status) {
589 			write_job_end(id,status,0);
590 			continue;
591 		}
592 
593 		// syslog(LOG_NOTICE,"file: %"PRIu32", index: %"PRIu16" - debug1",id->inode,chindx);
594 		// get chunk data from master
595 //		start = monotonic_seconds();
596 		wrstatus = fs_writechunk(id->inode,chindx,&csdataver,&mfleng,&chunkid,&version,&csdata,&csdatasize);
597 		if (wrstatus!=STATUS_OK) {
598 			syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32" - fs_writechunk returned status: %s",id->inode,chindx,mfsstrerr(wrstatus));
599 			if (wrstatus!=ERROR_LOCKED) {
600 				if (wrstatus==ERROR_ENOENT) {
601 					write_job_end(id,EBADF,0);
602 				} else if (wrstatus==ERROR_QUOTA) {
603 					write_job_end(id,EDQUOT,0);
604 				} else if (wrstatus==ERROR_NOSPACE) {
605 					write_job_end(id,ENOSPC,0);
606 				} else if (wrstatus==ERROR_CHUNKLOST) {
607 					write_job_end(id,ENXIO,0);
608 				} else {
609 					id->trycnt++;
610 					if (id->trycnt>=maxretries) {
611 						if (wrstatus==ERROR_NOCHUNKSERVERS) {
612 							write_job_end(id,ENOSPC,0);
613 						} else if (wrstatus==ERROR_CSNOTPRESENT) {
614 							write_job_end(id,ENXIO,0);
615 						} else {
616 							write_job_end(id,EIO,0);
617 						}
618 					} else {
619 						write_delayed_enqueue(id,1+((id->trycnt<30)?(id->trycnt/3):10));
620 					}
621 				}
622 			} else {
623 				write_delayed_enqueue(id,1+((id->trycnt<30)?(id->trycnt/3):10));
624 			}
625 			continue;	// get next job
626 		}
627 //		now = monotonic_seconds();
628 //		fprintf(stderr,"fs_writechunk time: %.3lf\n",(now-start));
629 
630 		if (csdata==NULL || csdatasize==0) {
631 			fs_writeend(chunkid,id->inode,0);
632 			syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - there are no valid copies",id->inode,chindx,chunkid,version);
633 			id->trycnt+=6;
634 			if (id->trycnt>=maxretries) {
635 				write_job_end(id,ENXIO,0);
636 			} else {
637 				write_delayed_enqueue(id,60);
638 			}
639 			continue;
640 		}
641 		ip = 0; // make old compilers happy
642 		port = 0; // make old compilers happy
643 		csstrip[0] = 0;
644 		cp = csdata;
645 		cpe = csdata+csdatasize;
646 		chainminver = 0xFFFFFFFF;
647 		cpw = cschain;
648 		cschainsize = 0;
649 		while (cp<cpe && chainelements<100) {
650 			tmpip = get32bit(&cp);
651 			tmpport = get16bit(&cp);
652 			if (csdataver==0) {
653 				tmpver = 0;
654 			} else {
655 				tmpver = get32bit(&cp);
656 			}
657 			chainip[chainelements] = tmpip;
658 			chainport[chainelements] = tmpport;
659 			csdb_writeinc(tmpip,tmpport);
660 			if (tmpver<chainminver) {
661 				chainminver = tmpver;
662 			}
663 			if (chainelements==0) {
664 				ip = tmpip;
665 				port = tmpport;
666 			} else {
667 				put32bit(&cpw,tmpip);
668 				put16bit(&cpw,tmpport);
669 				cschainsize += 6;
670 			}
671 			chainelements++;
672 		}
673 
674 		if (cp<cpe) {
675 			fs_writeend(chunkid,id->inode,0);
676 			syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - there are too many copies",id->inode,chindx,chunkid,version);
677 			id->trycnt+=6;
678 			if (id->trycnt>=maxretries) {
679 				write_job_end(id,ENXIO,0);
680 			} else {
681 				write_delayed_enqueue(id,60);
682 			}
683 			continue;
684 		}
685 
686 //		chain = csdata;
687 //		ip = get32bit(&chain);
688 //		port = get16bit(&chain);
689 //		chainsize = csdatasize-6;
690 
691 		start = monotonic_seconds();
692 
693 /*
694 		if (csdatasize>CSDATARESERVE) {
695 			csdatasize = CSDATARESERVE;
696 		}
697 		memcpy(wrec->csdata,csdata,csdatasize);
698 		wrec->csdatasize=csdatasize;
699 		while (csdatasize>=6) {
700 			tmpip = get32bit(&csdata);
701 			tmpport = get16bit(&csdata);
702 			csdatasize-=6;
703 			csdb_writeinc(tmpip,tmpport);
704 		}
705 */
706 
707 		// make connection to cs
708 		srcip = fs_getsrcip();
709 		fd = conncache_get(ip,port);
710 		if (fd<0) {
711 			cnt=0;
712 			while (cnt<10) {
713 				fd = tcpsocket();
714 				if (fd<0) {
715 					syslog(LOG_WARNING,"writeworker: can't create tcp socket: %s",strerr(errno));
716 					break;
717 				}
718 				if (srcip) {
719 					if (tcpnumbind(fd,srcip,0)<0) {
720 						syslog(LOG_WARNING,"writeworker: can't bind socket to given ip: %s",strerr(errno));
721 						tcpclose(fd);
722 						fd=-1;
723 						break;
724 					}
725 				}
726 				if (tcpnumtoconnect(fd,ip,port,(cnt%2)?(300*(1<<(cnt>>1))):(200*(1<<(cnt>>1))))<0) {
727 					cnt++;
728 					if (cnt>=10) {
729 						write_prepare_ip(csstrip,ip);
730 						syslog(LOG_WARNING,"writeworker: can't connect to (%s:%"PRIu16"): %s",csstrip,port,strerr(errno));
731 					}
732 					close(fd);
733 					fd=-1;
734 				} else {
735 					uint32_t mip,pip;
736 					uint16_t mport,pport;
737 					tcpgetpeer(fd,&pip,&pport);
738 					tcpgetmyaddr(fd,&mip,&mport);
739 #ifdef WDEBUG
740 					fprintf(stderr,"connection ok (%"PRIX32":%"PRIu16"->%"PRIX32":%"PRIu16")\n",mip,mport,pip,pport);
741 #endif
742 					cnt=10;
743 				}
744 			}
745 		}
746 		if (fd<0) {
747 			fs_writeend(chunkid,id->inode,0);
748 			id->trycnt++;
749 			if (id->trycnt>=maxretries) {
750 				write_job_end(id,EIO,0);
751 			} else {
752 				write_delayed_enqueue(id,1+((id->trycnt<30)?(id->trycnt/3):10));
753 			}
754 			continue;
755 		}
756 		if (tcpnodelay(fd)<0) {
757 			syslog(LOG_WARNING,"writeworker: can't set TCP_NODELAY: %s",strerr(errno));
758 		}
759 
760 #ifdef WORKER_DEBUG
761 		partialblocks=0;
762 		bytessent=0;
763 #endif
764 		nextwriteid=1;
765 
766 		pfd[0].fd = fd;
767 		pfd[1].fd = id->pipe[0];
768 		rcvd = 0;
769 		sent = 0;
770 		waitforstatus=1;
771 		wptr = sendbuff;
772 
773 		put32bit(&wptr,CLTOCS_WRITE);
774 		if (chainminver>=VERSION2INT(1,7,32)) {
775 			put32bit(&wptr,13+cschainsize);
776 			put8bit(&wptr,1);
777 			hdrtosend = 21;
778 		} else {
779 			put32bit(&wptr,12+cschainsize);
780 			hdrtosend = 20;
781 		}
782 
783 		put64bit(&wptr,chunkid);
784 		put32bit(&wptr,version);
785 		sending_mode = 1;
786 // debug:	syslog(LOG_NOTICE,"writeworker: init packet prepared");
787 		cb = NULL;
788 		endofchunk = 0;
789 
790 		status = 0;
791 		wrstatus = STATUS_OK;
792 
793 		lastrcvd = 0.0;
794 		lastsent = 0.0;
795 		lastblock = 0.0;
796 
797 		flushwaiting = 0;
798 //		firstloop = 1;
799 
800 		do {
801 			now = monotonic_seconds();
802 			zassert(pthread_mutex_lock(&glock));
803 
804 			if (lastrcvd==0.0) {
805 				lastrcvd = now;
806 			} else {
807 				lrdiff = now - lastrcvd;
808 				if (lrdiff>=CHUNKSERVER_ACTIVITY_TIMEOUT) {
809 					write_prepare_ip(csstrip,ip);
810 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") was timed out (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,waitforstatus,id->trycnt+1);
811 					zassert(pthread_mutex_unlock(&glock));
812 					break;
813 				}
814 			}
815 			if (lastblock==0.0) {
816 				lbdiff = NEXT_BLOCK_DELAY; // first block should be send immediately
817 			} else {
818 				lbdiff = now - lastblock;
819 			}
820 			workingtime = now - start;
821 
822 //			if (!((waitforstatus>0 && workingtime<WORKER_BUSY_LAST_SEND_TIMEOUT+WORKER_BUSY_WAIT_FOR_STATUS+((workers_total>HEAVYLOAD_WORKERS)?0:WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT)) || (waitforstatus==0 && lbdiff<WORKER_IDLE_TIMEOUT && flushwaiting==0 && (workers_total<=HEAVYLOAD_WORKERS) && endofchunk==0))) {
823 //					zassert(pthread_mutex_unlock(&glock));
824 //					break;
825 //				}
826 //			}
827 
828 			id->waitingworker=1;
829 
830 			if (sending_mode==0 && workingtime<WORKER_BUSY_LAST_SEND_TIMEOUT+((workers_total>HEAVYLOAD_WORKERS)?0:WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT) && waitforstatus<64) {
831 				if (cb==NULL) {
832 					ncb = id->datachainhead;
833 				} else {
834 					ncb = cb->next;
835 				}
836 				if (ncb) {
837 					if (ncb->chindx==chindx) {
838 						if (ncb->to-ncb->from==MFSBLOCKSIZE || lbdiff>=NEXT_BLOCK_DELAY || ncb->next!=NULL || id->flushwaiting) {
839 							cb = ncb;
840 							sending_mode = 2;
841 						} else {
842 							id->waitingworker=2; // wait for block expand
843 						}
844 					} else {
845 						endofchunk=1;
846 					}
847 				}
848 /*
849 				if (cb==NULL) {
850 					if (id->datachainhead) {
851 						if (id->datachainhead->chindx==chindx) {
852 							if (id->datachainhead->to-id->datachainhead->from==MFSBLOCKSIZE || id->datachainhead->next!=NULL || id->flushwaiting) {
853 								cb = id->datachainhead;
854 								havedata=1;
855 							}
856 						} else {
857 							endofchunk=1;
858 						}
859 					} else {
860 						id->waitingworker=1;
861 					}
862 				} else {
863 					if (cb->next) {
864 						if (cb->next->chindx==chindx) {
865 							if (cb->next->to-cb->next->from==MFSBLOCKSIZE || lbdiff>NEXT_BLOCK_DELAY || cb->next->next!=NULL || id->flushwaiting) {
866 								cb = cb->next;
867 								havedata=1;
868 							} else {
869 								id->waitingworker=2;
870 							}
871 						} else {
872 							endofchunk=1;
873 						}
874 					} else {
875 						id->waitingworker=1;
876 					}
877 				}
878 */
879 				if (sending_mode==2) {
880 					cb->writeid = nextwriteid++;
881 // debug:				syslog(LOG_NOTICE,"writeworker: data packet prepared (writeid:%"PRIu32",pos:%"PRIu16")",cb->writeid,cb->pos);
882 					waitforstatus++;
883 					wptr = sendbuff;
884 					put32bit(&wptr,CLTOCS_WRITE_DATA);
885 					put32bit(&wptr,24+(cb->to-cb->from));
886 					put64bit(&wptr,chunkid);
887 					put32bit(&wptr,cb->writeid);
888 					put16bit(&wptr,cb->pos);
889 					put16bit(&wptr,cb->from);
890 					put32bit(&wptr,cb->to-cb->from);
891 					put32bit(&wptr,mycrc32(0,cb->data+cb->from,cb->to-cb->from));
892 #ifdef WORKER_DEBUG
893 					if (cb->to-cb->from<MFSBLOCKSIZE) {
894 						partialblocks++;
895 					}
896 					bytessent+=(cb->to-cb->from);
897 #endif
898 					sent = 0;
899 					lastblock = now;
900 					lastsent = now;
901 				} else if (lastsent+WORKER_NOP_INTERVAL<now && chainminver>=VERSION2INT(1,7,32)) {
902 					wptr = sendbuff;
903 					put32bit(&wptr,ANTOAN_NOP);
904 					put32bit(&wptr,0);
905 					sent = 0;
906 					sending_mode = 3;
907 				}
908 			}
909 
910 #ifdef WORKER_DEBUG
911 			fprintf(stderr,"workerloop: waitforstatus:%u workingtime:%.6lf workers_total:%u lbdiff:%.6lf flushwaiting:%u endofchunk:%u\n",waitforstatus,workingtime,workers_total,lbdiff,flushwaiting,endofchunk);
912 #endif
913 			if (waitforstatus>0) {
914 				if (workingtime>WORKER_BUSY_LAST_SEND_TIMEOUT+WORKER_BUSY_WAIT_FOR_STATUS+((workers_total>HEAVYLOAD_WORKERS)?0:WORKER_BUSY_NOJOBS_INCREASE_TIMEOUT)) { // timeout
915 					id->waitingworker=0;
916 					zassert(pthread_mutex_unlock(&glock));
917 					break;
918 				}
919 			} else {
920 				if (lbdiff>=WORKER_IDLE_TIMEOUT || flushwaiting || workers_total>HEAVYLOAD_WORKERS || endofchunk) {
921 					id->waitingworker=0;
922 					zassert(pthread_mutex_unlock(&glock));
923 					break;
924 				}
925 			}
926 
927 
928 			zassert(pthread_mutex_unlock(&glock));
929 
930 			switch (sending_mode) {
931 				case 1:
932 					if (sent<hdrtosend) {
933 #ifdef HAVE_WRITEV
934 						if (cschainsize>0) {
935 							siov[0].iov_base = (void*)(sendbuff+sent);
936 							siov[0].iov_len = hdrtosend-sent;
937 							siov[1].iov_base = (void*)cschain;	// discard const (safe - because it's used in writev)
938 							siov[1].iov_len = cschainsize;
939 							i = writev(fd,siov,2);
940 						} else {
941 #endif
942 							i = write(fd,sendbuff+sent,hdrtosend-sent);
943 #ifdef HAVE_WRITEV
944 						}
945 #endif
946 					} else {
947 						i = write(fd,cschain+(sent-hdrtosend),cschainsize-(sent-hdrtosend));
948 					}
949 					if (i>=0) {
950 						sent+=i;
951 						if (sent==hdrtosend+cschainsize) {
952 							sending_mode = 0;
953 						}
954 					}
955 					break;
956 				case 2:
957 					if (sent<32) {
958 #ifdef HAVE_WRITEV
959 						siov[0].iov_base = (void*)(sendbuff+sent);
960 						siov[0].iov_len = 32-sent;
961 						siov[1].iov_base = (void*)(cb->data+cb->from);
962 						siov[1].iov_len = cb->to-cb->from;
963 						i = writev(fd,siov,2);
964 #else
965 						i = write(fd,sendbuff+sent,32-sent);
966 #endif
967 					} else {
968 						i = write(fd,cb->data+cb->from+(sent-32),cb->to-cb->from-(sent-32));
969 					}
970 					if (i>=0) {
971 						sent+=i;
972 						if (sent==32+cb->to-cb->from) {
973 							sending_mode = 0;
974 						}
975 					}
976 					break;
977 				case 3:
978 					i = write(fd,sendbuff+sent,8-sent);
979 					if (i>=0) {
980 						sent+=i;
981 						if (sent==8) {
982 							sending_mode = 0;
983 						}
984 					}
985 					break;
986 				default:
987 					i=0;
988 			}
989 
990 			if (i<0) {
991 				if (ERRNO_ERROR && errno!=EINTR) {
992 					write_prepare_ip(csstrip,ip);
993 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: write to (%s:%"PRIu16") error: %s / NEGWRITE (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,strerr(errno),waitforstatus,id->trycnt+1);
994 					status=EIO;
995 					break;
996 				}
997 			}
998 
999 			pfd[0].events = POLLIN | (sending_mode?POLLOUT:0);
1000 			pfd[0].revents = 0;
1001 			pfd[1].events = POLLIN;
1002 			pfd[1].revents = 0;
1003 			if (poll(pfd,2,100)<0) { /* correct timeout - in msec */
1004 				if (errno!=EINTR) {
1005 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: poll error: %s (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,strerr(errno),waitforstatus,id->trycnt+1);
1006 					status=EIO;
1007 					break;
1008 				}
1009 			}
1010 			zassert(pthread_mutex_lock(&glock));	// make helgrind happy
1011 			id->waitingworker=0;
1012 			flushwaiting = (id->flushwaiting>0)?1:0;
1013 			zassert(pthread_mutex_unlock(&glock));	// make helgrind happy
1014 			if (pfd[1].revents&POLLIN) {	// used just to break poll - so just read all data from pipe to empty it
1015 				i = read(id->pipe[0],pipebuff,1024);
1016 				if (i<0) { // mainly to make happy static code analyzers
1017 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: read pipe error: %s (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,strerr(errno),waitforstatus,id->trycnt+1);
1018 				}
1019 			}
1020 			if (pfd[0].revents&POLLHUP) {
1021 				write_prepare_ip(csstrip,ip);
1022 				syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") was reset by peer / POLLHUP (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,waitforstatus,id->trycnt+1);
1023 				status=EIO;
1024 				break;
1025 			}
1026 			if (pfd[0].revents&POLLERR) {
1027 				write_prepare_ip(csstrip,ip);
1028 				syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") got error status / POLLERR (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,waitforstatus,id->trycnt+1);
1029 				status=EIO;
1030 				break;
1031 			}
1032 			if (pfd[0].revents&POLLIN) {
1033 				i = read(fd,recvbuff+rcvd,21-rcvd);
1034 				if (i==0) { 	// connection reset by peer or read error
1035 					write_prepare_ip(csstrip,ip);
1036 					syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: connection with (%s:%"PRIu16") was reset by peer / ZEROREAD (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,waitforstatus,id->trycnt+1);
1037 					status=EIO;
1038 					break;
1039 				}
1040 				if (i<0) {
1041 					if (errno!=EINTR) {
1042 						write_prepare_ip(csstrip,ip);
1043 						syslog(LOG_WARNING,"file: %"PRIu32", index: %"PRIu32", chunk: %"PRIu64", version: %"PRIu32" - writeworker: read from (%s:%"PRIu16") error: %s (unfinished writes: %"PRIu8"; try counter: %"PRIu32")",id->inode,chindx,chunkid,version,csstrip,port,strerr(errno),waitforstatus,id->trycnt+1);
1044 						status=EIO;
1045 						break;
1046 					} else {
1047 						i=0;
1048 					}
1049 				}
1050 				lastrcvd = monotonic_seconds();
1051 				rcvd+=i;
1052 				// do not accept ANTOAN_UNKNOWN_COMMAND and ANTOAN_BAD_COMMAND_SIZE here - only ANTOAN_NOP
1053 				if (rcvd>=8 && recvbuff[7]==0 && recvbuff[6]==0 && recvbuff[5]==0 && recvbuff[4]==0 && recvbuff[3]==0 && recvbuff[2]==0 && recvbuff[1]==0 && recvbuff[0]==0) {	// ANTOAN_NOP packet received - skip it
1054 					if (rcvd>8) {
1055 						memmove(recvbuff,recvbuff+8,rcvd-8);
1056 						rcvd-=8;
1057 					}
1058 				}
1059 				if (rcvd==21) {
1060 					rptr = recvbuff;
1061 					reccmd = get32bit(&rptr);
1062 					recleng = get32bit(&rptr);
1063 					recchunkid = get64bit(&rptr);
1064 					recwriteid = get32bit(&rptr);
1065 					recstatus = get8bit(&rptr);
1066 					if (reccmd!=CSTOCL_WRITE_STATUS ||  recleng!=13) {
1067 						syslog(LOG_WARNING,"writeworker: got unrecognized packet from chunkserver (cmd:%"PRIu32",leng:%"PRIu32")",reccmd,recleng);
1068 						status=EIO;
1069 						break;
1070 					}
1071 					if (recchunkid!=chunkid) {
1072 						syslog(LOG_WARNING,"writeworker: got unexpected packet (expected chunkdid:%"PRIu64",packet chunkid:%"PRIu64")",chunkid,recchunkid);
1073 						status=EIO;
1074 						break;
1075 					}
1076 					if (recstatus!=STATUS_OK) {
1077 						syslog(LOG_WARNING,"writeworker: write error: %s",mfsstrerr(recstatus));
1078 						wrstatus=recstatus;
1079 						break;
1080 					}
1081 // debug:				syslog(LOG_NOTICE,"writeworker: received status ok for writeid:%"PRIu32,recwriteid);
1082 					if (recwriteid>0) {
1083 						zassert(pthread_mutex_lock(&glock));
1084 						for (rcb = id->datachainhead ; rcb && rcb->writeid!=recwriteid ; rcb=rcb->next) {}
1085 						if (rcb==NULL) {
1086 							syslog(LOG_WARNING,"writeworker: got unexpected status (writeid:%"PRIu32")",recwriteid);
1087 							zassert(pthread_mutex_unlock(&glock));
1088 							status=EIO;
1089 							break;
1090 						}
1091 						if (rcb==cb) {	// current block
1092 // debug:						syslog(LOG_NOTICE,"writeworker: received status for current block");
1093 							if (sending_mode==2) {	// got status ok before all data had been sent - error
1094 								syslog(LOG_WARNING,"writeworker: got status OK before all data have been sent");
1095 								zassert(pthread_mutex_unlock(&glock));
1096 								status=EIO;
1097 								break;
1098 							} else {
1099 								cb = NULL;
1100 							}
1101 						}
1102 						if (rcb->prev) {
1103 							rcb->prev->next = rcb->next;
1104 						} else {
1105 							id->datachainhead = rcb->next;
1106 						}
1107 						if (rcb->next) {
1108 							rcb->next->prev = rcb->prev;
1109 						} else {
1110 							id->datachaintail = rcb->prev;
1111 						}
1112 						maxwroffset = (((uint64_t)(chindx))<<MFSCHUNKBITS)+(((uint32_t)(rcb->pos))<<MFSBLOCKBITS)+rcb->to;
1113 						if (maxwroffset>mfleng) {
1114 							mfleng=maxwroffset;
1115 						}
1116 						write_cb_release(id,rcb);
1117 						zassert(pthread_mutex_unlock(&glock));
1118 					}
1119 					waitforstatus--;
1120 					rcvd=0;
1121 				}
1122 			}
1123 		} while (1);
1124 
1125 		if (waitforstatus==0 && chainminver>=VERSION2INT(1,7,32)) {
1126 			wptr = sendbuff;
1127 			put32bit(&wptr,CLTOCS_WRITE_FINISH);
1128 			put32bit(&wptr,12);
1129 			put64bit(&wptr,chunkid);
1130 			put32bit(&wptr,version);
1131 			if (write(fd,sendbuff,20)==20) {
1132 				conncache_insert(ip,port,fd);
1133 			} else {
1134 				tcpclose(fd);
1135 			}
1136 		} else {
1137 			tcpclose(fd);
1138 		}
1139 
1140 #ifdef WORKER_DEBUG
1141 		now = monotonic_seconds();
1142 		workingtime = now - start;
1143 
1144 		cl=0;
1145 		for (cnt=0 ; cnt<chainelements ; cnt++) {
1146 			cl+=snprintf(debugchain+cl,200-cl,"%u.%u.%u.%u:%u->",(chainip[cnt]>>24)&255,(chainip[cnt]>>16)&255,(chainip[cnt]>>8)&255,chainip[cnt]&255,chainport[cnt]);
1147 		}
1148 		if (cl>=2) {
1149 			debugchain[cl-2]='\0';
1150 		}
1151 		syslog(LOG_NOTICE,"worker %lu sent %"PRIu32" blocks (%"PRIu32" partial) of chunk %016"PRIX64"_%08"PRIX32", received status for %"PRIu32" blocks (%"PRIu32" lost), bw: %.6lfMB/s ( %"PRIu32" B / %.6lf s ), chain: %s",(unsigned long)arg,nextwriteid-1,partialblocks,chunkid,version,nextwriteid-1-waitforstatus,waitforstatus,(double)bytessent/workingtime,bytessent,workingtime,debugchain);
1152 #endif
1153 
1154 		for (cnt=0 ; cnt<10 ; cnt++) {
1155 			westatus = fs_writeend(chunkid,id->inode,mfleng);
1156 			if (westatus==ERROR_ENOENT || westatus==ERROR_QUOTA) {
1157 				break;
1158 			} else if (westatus!=STATUS_OK) {
1159 				portable_usleep(100000+(10000<<cnt));
1160 			} else {
1161 				break;
1162 			}
1163 		}
1164 
1165 		if (westatus==ERROR_ENOENT) {
1166 			write_job_end(id,EBADF,0);
1167 		} else if (westatus==ERROR_QUOTA) {
1168 			write_job_end(id,EDQUOT,0);
1169 		} else if (westatus!=STATUS_OK) {
1170 			write_job_end(id,ENXIO,0);
1171 		} else if (status!=0 || wrstatus!=STATUS_OK) {
1172 			if (wrstatus!=STATUS_OK) {	// convert MFS status to OS errno
1173 				if (wrstatus==ERROR_NOSPACE) {
1174 					status=ENOSPC;
1175 				} else {
1176 					status=EIO;
1177 				}
1178 			}
1179 			id->trycnt++;
1180 			if (id->trycnt>=maxretries) {
1181 				write_job_end(id,status,0);
1182 			} else {
1183 				write_job_end(id,0,1+((id->trycnt<30)?(id->trycnt/3):10));
1184 			}
1185 		} else {
1186 //			read_inode_ops(id->inode);
1187 			read_inode_set_length(id->inode,mfleng,0);
1188 			write_job_end(id,0,0);
1189 		}
1190 	}
1191 }
1192 
1193 /* API | glock: INITIALIZED,UNLOCKED */
write_data_init(uint32_t cachesize,uint32_t retries)1194 void write_data_init (uint32_t cachesize,uint32_t retries) {
1195 	uint32_t cacheblockcount = (cachesize/MFSBLOCKSIZE);
1196 	uint32_t i;
1197 	sigset_t oldset;
1198 	sigset_t newset;
1199 
1200 	maxretries = retries;
1201 	if (cacheblockcount<10) {
1202 		cacheblockcount=10;
1203 	}
1204 	zassert(pthread_mutex_init(&glock,NULL));
1205 	zassert(pthread_cond_init(&worker_term_cond,NULL));
1206 	worker_term_waiting = 0;
1207 
1208 	zassert(pthread_cond_init(&fcbcond,NULL));
1209 	fcbwaiting=0;
1210 	cacheblocks = malloc(sizeof(cblock)*cacheblockcount);
1211 	for (i=0 ; i<cacheblockcount-1 ; i++) {
1212 		cacheblocks[i].next = cacheblocks+(i+1);
1213 	}
1214 	cacheblocks[cacheblockcount-1].next = NULL;
1215 	freecblockshead = cacheblocks;
1216 	freecacheblocks = cacheblockcount;
1217 
1218 	idhash = malloc(sizeof(inodedata*)*IDHASHSIZE);
1219 	for (i=0 ; i<IDHASHSIZE ; i++) {
1220 		idhash[i]=NULL;
1221 	}
1222 
1223 	dqueue = queue_new(0);
1224 	jqueue = queue_new(0);
1225 
1226         zassert(pthread_attr_init(&worker_thattr));
1227         zassert(pthread_attr_setstacksize(&worker_thattr,0x100000));
1228 	sigemptyset(&newset);
1229 	sigaddset(&newset, SIGTERM);
1230 	sigaddset(&newset, SIGINT);
1231 	sigaddset(&newset, SIGHUP);
1232 	sigaddset(&newset, SIGQUIT);
1233 	zassert(pthread_sigmask(SIG_BLOCK, &newset, &oldset));
1234         zassert(pthread_create(&dqueue_worker_th,&worker_thattr,write_dqueue_worker,NULL));
1235 	zassert(pthread_sigmask(SIG_SETMASK, &oldset, NULL));
1236 
1237 	zassert(pthread_mutex_lock(&glock));
1238 	workers_avail = 0;
1239 	workers_total = 0;
1240 	write_data_spawn_worker();
1241 	zassert(pthread_mutex_unlock(&glock));
1242 #ifdef BUFFER_DEBUG
1243 	zassert(pthread_create(&info_worker_th,&worker_thattr,write_info_worker,NULL));
1244 #endif
1245 }
1246 
write_data_term(void)1247 void write_data_term(void) {
1248 	uint32_t i;
1249 	inodedata *id,*idn;
1250 
1251 	queue_close(dqueue);
1252 	queue_close(jqueue);
1253 	zassert(pthread_mutex_lock(&glock));
1254 	while (workers_total>0) {
1255 		worker_term_waiting++;
1256 		zassert(pthread_cond_wait(&worker_term_cond,&glock));
1257 	}
1258 	zassert(pthread_mutex_unlock(&glock));
1259 	zassert(pthread_join(dqueue_worker_th,NULL));
1260 	queue_delete(dqueue);
1261 	queue_delete(jqueue);
1262 	for (i=0 ; i<IDHASHSIZE ; i++) {
1263 		for (id = idhash[i] ; id ; id = idn) {
1264 			idn = id->next;
1265 			zassert(pthread_cond_destroy(&(id->flushcond)));
1266 			zassert(pthread_cond_destroy(&(id->writecond)));
1267 			close(id->pipe[0]);
1268 			close(id->pipe[1]);
1269 			free(id);
1270 		}
1271 	}
1272 	free(idhash);
1273 	free(cacheblocks);
1274 	zassert(pthread_attr_destroy(&worker_thattr));
1275 	zassert(pthread_cond_destroy(&worker_term_cond));
1276 	zassert(pthread_cond_destroy(&fcbcond));
1277 	zassert(pthread_mutex_destroy(&glock));
1278 }
1279 
1280 /* glock: LOCKED */
write_cb_expand(inodedata * id,cblock * cb,uint32_t from,uint32_t to,const uint8_t * data)1281 int write_cb_expand(inodedata *id,cblock *cb,uint32_t from,uint32_t to,const uint8_t *data) {
1282 	if (cb->writeid>0 || from>cb->to || to<cb->from) {	// can't expand
1283 		return -1;
1284 	}
1285 	memcpy(cb->data+from,data,to-from);
1286 	if (from<cb->from) {
1287 		cb->from = from;
1288 	}
1289 	if (to>cb->to) {
1290 		cb->to = to;
1291 	}
1292 	if (cb->to-cb->from==MFSBLOCKSIZE && cb->next==NULL && id->waitingworker==2) {
1293 		if (write(id->pipe[1]," ",1)!=1) {
1294 			syslog(LOG_ERR,"can't write to pipe !!!");
1295 		}
1296 		id->waitingworker=0;
1297 	}
1298 	return 0;
1299 }
1300 
1301 /* glock: UNLOCKED */
write_block(inodedata * id,uint32_t chindx,uint16_t pos,uint32_t from,uint32_t to,const uint8_t * data)1302 int write_block(inodedata *id,uint32_t chindx,uint16_t pos,uint32_t from,uint32_t to,const uint8_t *data) {
1303 	cblock *cb;
1304 
1305 	zassert(pthread_mutex_lock(&glock));
1306 	for (cb=id->datachaintail ; cb ; cb=cb->prev) {
1307 		if (cb->pos==pos && cb->chindx==chindx) {
1308 			if (write_cb_expand(id,cb,from,to,data)==0) {
1309 				zassert(pthread_mutex_unlock(&glock));
1310 				return 0;
1311 			} else {
1312 				break;
1313 			}
1314 		}
1315 	}
1316 
1317 	cb = write_cb_acquire(id);
1318 //	syslog(LOG_NOTICE,"write_block: acquired new cache block");
1319 	cb->chindx = chindx;
1320 	cb->pos = pos;
1321 	cb->from = from;
1322 	cb->to = to;
1323 	memcpy(cb->data+from,data,to-from);
1324 	cb->prev = id->datachaintail;
1325 	cb->next = NULL;
1326 	if (id->datachaintail!=NULL) {
1327 		id->datachaintail->next = cb;
1328 	} else {
1329 		id->datachainhead = cb;
1330 	}
1331 	id->datachaintail = cb;
1332 	if (id->inqueue) {
1333 		if (id->waitingworker) {
1334 			if (write(id->pipe[1]," ",1)!=1) {
1335 				syslog(LOG_ERR,"can't write to pipe !!!");
1336 			}
1337 			id->waitingworker=0;
1338 		}
1339 	} else {
1340 		id->inqueue=1;
1341 		write_enqueue(id);
1342 	}
1343 	zassert(pthread_mutex_unlock(&glock));
1344 //	zassert(pthread_mutex_unlock(&(wc->lock)));
1345 	return 0;
1346 }
1347 
1348 /* API | glock: UNLOCKED */
write_data(void * vid,uint64_t offset,uint32_t size,const uint8_t * data)1349 int write_data(void *vid,uint64_t offset,uint32_t size,const uint8_t *data) {
1350 	uint32_t chindx;
1351 	uint16_t pos;
1352 	uint32_t from;
1353 	int status;
1354 	inodedata *id = (inodedata*)vid;
1355 	if (id==NULL) {
1356 		return EIO;
1357 	}
1358 //	int64_t s,e;
1359 
1360 //	s = monotonic_useconds();
1361 	zassert(pthread_mutex_lock(&glock));
1362 //	syslog(LOG_NOTICE,"write_data: inode:%"PRIu32" offset:%"PRIu64" size:%"PRIu32,id->inode,offset,size);
1363 //	id = write_get_inodedata(inode);
1364 	status = id->status;
1365 	if (status==0) {
1366 		if (offset+size>id->maxfleng) {	// move fleng
1367 			id->maxfleng = offset+size;
1368 		}
1369 		id->writewaiting++;
1370 		while (id->flushwaiting>0) {
1371 			zassert(pthread_cond_wait(&(id->writecond),&glock));
1372 		}
1373 		id->writewaiting--;
1374 	}
1375 	zassert(pthread_mutex_unlock(&glock));
1376 	if (status!=0) {
1377 		return status;
1378 	}
1379 
1380 	chindx = offset>>MFSCHUNKBITS;
1381 	pos = (offset&MFSCHUNKMASK)>>MFSBLOCKBITS;
1382 	from = offset&MFSBLOCKMASK;
1383 	while (size>0) {
1384 		if (size>MFSBLOCKSIZE-from) {
1385 			if (write_block(id,chindx,pos,from,MFSBLOCKSIZE,data)<0) {
1386 				return EIO;
1387 			}
1388 			size -= (MFSBLOCKSIZE-from);
1389 			data += (MFSBLOCKSIZE-from);
1390 			from = 0;
1391 			pos++;
1392 			if (pos==1024) {
1393 				pos = 0;
1394 				chindx++;
1395 			}
1396 		} else {
1397 			if (write_block(id,chindx,pos,from,from+size,data)<0) {
1398 				return EIO;
1399 			}
1400 			size = 0;
1401 		}
1402 	}
1403 //	e = monotonic_useconds();
1404 //	syslog(LOG_NOTICE,"write_data time: %"PRId64,e-s);
1405 	return 0;
1406 }
1407 
1408 /* API | glock: UNLOCKED */
write_data_new(uint32_t inode)1409 void* write_data_new(uint32_t inode) {
1410 	inodedata* id;
1411 	zassert(pthread_mutex_lock(&glock));
1412 	id = write_get_inodedata(inode);
1413 	if (id==NULL) {
1414 		zassert(pthread_mutex_unlock(&glock));
1415 		return NULL;
1416 	}
1417 	id->lcnt++;
1418 //	zassert(pthread_mutex_unlock(&(id->lock)));
1419 	zassert(pthread_mutex_unlock(&glock));
1420 	return id;
1421 }
1422 
1423 /* common flush routine | glock: LOCKED */
write_data_do_flush(inodedata * id,uint8_t releaseflag)1424 static int write_data_do_flush(inodedata *id,uint8_t releaseflag) {
1425 	int ret;
1426 //	int64_t s,e;
1427 
1428 //	s = monotonic_useconds();
1429 	id->flushwaiting++;
1430 	while (id->inqueue) {
1431 		if (id->waitingworker) {
1432 			if (write(id->pipe[1]," ",1)!=1) {
1433 				syslog(LOG_ERR,"can't write to pipe !!!");
1434 			}
1435 			id->waitingworker=0;
1436 		}
1437 //		syslog(LOG_NOTICE,"flush: wait ...");
1438 		zassert(pthread_cond_wait(&(id->flushcond),&glock));
1439 //		syslog(LOG_NOTICE,"flush: woken up");
1440 	}
1441 	id->flushwaiting--;
1442 	if (id->flushwaiting==0 && id->writewaiting>0) {
1443 		zassert(pthread_cond_broadcast(&(id->writecond)));
1444 	}
1445 	ret = id->status;
1446 	if (releaseflag) {
1447 		id->lcnt--;
1448 	}
1449 	if (id->lcnt==0 && id->inqueue==0 && id->flushwaiting==0 && id->writewaiting==0) {
1450 		write_free_inodedata(id);
1451 	}
1452 //	e = monotonic_useconds();
1453 //	syslog(LOG_NOTICE,"flush time: %"PRId64,e-s);
1454 	return ret;
1455 }
1456 
1457 /* API | glock: UNLOCKED */
write_data_flush(void * vid)1458 int write_data_flush(void *vid) {
1459 	int ret;
1460 	if (vid==NULL) {
1461 		return EIO;
1462 	}
1463 	zassert(pthread_mutex_lock(&glock));
1464 	ret = write_data_do_flush((inodedata*)vid,0);
1465 	zassert(pthread_mutex_unlock(&glock));
1466 	return ret;
1467 }
1468 
1469 /* API | glock: UNLOCKED */
write_data_setmaxfleng(uint32_t inode,uint64_t maxfleng)1470 void write_data_setmaxfleng(uint32_t inode,uint64_t maxfleng) {
1471 	inodedata* id;
1472 	zassert(pthread_mutex_lock(&glock));
1473 	id = write_find_inodedata(inode);
1474 	if (id) {
1475 		id->maxfleng = maxfleng;
1476 	}
1477 	zassert(pthread_mutex_unlock(&glock));
1478 }
1479 
1480 /* API | glock: UNLOCKED */
write_data_getmaxfleng(uint32_t inode)1481 uint64_t write_data_getmaxfleng(uint32_t inode) {
1482 	uint64_t maxfleng;
1483 	inodedata* id;
1484 	zassert(pthread_mutex_lock(&glock));
1485 	id = write_find_inodedata(inode);
1486 	if (id) {
1487 		maxfleng = id->maxfleng;
1488 	} else {
1489 		maxfleng = 0;
1490 	}
1491 	zassert(pthread_mutex_unlock(&glock));
1492 	return maxfleng;
1493 }
1494 
1495 /* API | glock: UNLOCKED */
write_data_flush_inode(uint32_t inode)1496 int write_data_flush_inode(uint32_t inode) {
1497 	inodedata* id;
1498 	int ret;
1499 	zassert(pthread_mutex_lock(&glock));
1500 	id = write_find_inodedata(inode);
1501 	if (id==NULL) {
1502 		zassert(pthread_mutex_unlock(&glock));
1503 		return 0;
1504 	}
1505 	ret = write_data_do_flush(id,0);
1506 	zassert(pthread_mutex_unlock(&glock));
1507 	return ret;
1508 }
1509 
1510 /* API | glock: UNLOCKED */
write_data_end(void * vid)1511 int write_data_end(void *vid) {
1512 	int ret;
1513 	if (vid==NULL) {
1514 		return EIO;
1515 	}
1516 	zassert(pthread_mutex_lock(&glock));
1517 	ret = write_data_do_flush(vid,1);
1518 	zassert(pthread_mutex_unlock(&glock));
1519 	return ret;
1520 }
1521