1 /*
2  * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/types.h>
28 #include <sys/uio.h>
29 #include <unistd.h>
30 #include <poll.h>
31 #include <sys/time.h>
32 #include <syslog.h>
33 #include <errno.h>
34 #include <inttypes.h>
35 #include <pthread.h>
36 
37 #include "MFSCommunication.h"
38 #include "hddspacemgr.h"
39 #include "sockets.h"
40 #include "crc.h"
41 #include "slogger.h"
42 #include "datapack.h"
43 #include "massert.h"
44 #include "mfsstrerr.h"
45 #include "clocks.h"
46 
47 #include "replicator.h"
48 
/* Time budgets, in milliseconds, handed to rep_wait_for_connection(),
   rep_send_all_packets() and rep_receive_all_packets() respectively. */
#define CONNMSECTO 5000
#define SENDMSECTO 5000
#define RECVMSECTO 5000

/* Largest packet body rep_read() accepts: a CSTOCL_READ_DATA answer, i.e.
   20 bytes of fields (chunkid:64 blocknum:16 offset:16 size:32 crc:32)
   followed by one MFSBLOCKSIZE data block. */
#define MAX_RECV_PACKET_SIZE (20+MFSBLOCKSIZE)

/* Per-source I/O state:
   IDLE       - nothing pending (connected, or not taking part in this phase)
   CONNECTING - non-blocking connect still in progress
   HEADER     - reading the 8-byte packet header into hdrbuff
   DATA       - reading the packet body into packet */
typedef enum {IDLE,CONNECTING,HEADER,DATA} modetype;
56 
/* State kept for one replication source (a remote chunkserver we read from). */
typedef struct _repsrc {
	int sock;		/* connection socket, -1 until created */
	modetype mode;
	uint8_t hdrbuff[8];	/* last received header: type:32 size:32 */
	uint8_t *packet;	/* heap buffer: packet being sent, or body being/last received (NULL if none) */
	uint8_t *startptr;	/* next byte to read into / write from */
	uint32_t bytesleft;	/* bytes still to transfer for the current header/packet */

	uint64_t chunkid;	/* chunk id and version requested from this source */
	uint32_t version;
	uint16_t blocks;	/* number of blocks this source reported for its chunk */

	uint32_t ip;		/* IPv4 address as a number (first octet in the high byte, per log formatting) */
	uint16_t port;
} repsrc;
72 
/* State of a single replicate() call. */
typedef struct _replication {
	uint64_t chunkid;	/* local chunk being created */
	uint32_t version;	/* version assigned to the local chunk on success */

	uint8_t *xorbuff;	/* crc:32 + one block of XOR-ed data; allocated only when srccnt>1 */

	uint8_t created,opened;	/* tell rep_cleanup() whether to hdd_close()/hdd_delete() the chunk */
	uint8_t srccnt;		/* number of repsources entries rep_cleanup() must release */
	struct pollfd *fds;	/* one pollfd per source, indexed like repsources */
	repsrc *repsources;
} replication;
84 
/* Statistics counters, returned and reset by replicator_stats().
   Every access in this file is done while holding statslock. */
static uint32_t stats_repl = 0;		/* replicate() invocations */
static uint64_t stats_bytesin = 0;	/* bytes read from source sockets */
static uint64_t stats_bytesout = 0;	/* bytes written to source sockets */
static pthread_mutex_t statslock = PTHREAD_MUTEX_INITIALIZER;
89 
replicator_stats(uint64_t * bin,uint64_t * bout,uint32_t * repl)90 void replicator_stats(uint64_t *bin,uint64_t *bout,uint32_t *repl) {
91 	pthread_mutex_lock(&statslock);
92 	*bin = stats_bytesin;
93 	*bout = stats_bytesout;
94 	*repl = stats_repl;
95 	stats_repl = 0;
96 	stats_bytesin = 0;
97 	stats_bytesout = 0;
98 	pthread_mutex_unlock(&statslock);
99 }
100 
replicator_bytesin(uint64_t bytes)101 static inline void replicator_bytesin(uint64_t bytes) {
102 	zassert(pthread_mutex_lock(&statslock));
103 	stats_bytesin += bytes;
104 	zassert(pthread_mutex_unlock(&statslock));
105 }
106 
replicator_bytesout(uint64_t bytes)107 static inline void replicator_bytesout(uint64_t bytes) {
108 	zassert(pthread_mutex_lock(&statslock));
109 	stats_bytesout += bytes;
110 	zassert(pthread_mutex_unlock(&statslock));
111 }
112 
/*
 * In-place XOR: dst[k] ^= src[k] for every k in [0,leng).
 *
 * When dst and src share the same alignment modulo 4, a few leading bytes are
 * processed individually until src sits on a 4-byte boundary, the bulk is then
 * done one 32-bit word at a time (eight words per iteration), and the
 * remaining tail bytes are done individually. Otherwise a byte loop (unrolled
 * by eight) is used for the whole range.
 */
static void xordata(uint8_t *dst,const uint8_t *src,uint32_t leng) {
	uint32_t *dw;
	const uint32_t *sw;

	if ((((unsigned long)dst) ^ ((unsigned long)src)) & 3) {
		/* different alignment: word access is impossible, go byte by byte */
		for ( ; leng>=8 ; leng-=8, dst+=8, src+=8) {
			dst[0] ^= src[0];
			dst[1] ^= src[1];
			dst[2] ^= src[2];
			dst[3] ^= src[3];
			dst[4] ^= src[4];
			dst[5] ^= src[5];
			dst[6] ^= src[6];
			dst[7] ^= src[7];
		}
		for ( ; leng>0 ; leng--) {
			*dst++ ^= *src++;
		}
		return;
	}

	/* advance until both pointers are 4-byte aligned */
	while (leng>0 && ((unsigned long)src & 3)!=0) {
		*dst++ ^= *src++;
		leng--;
	}

	dw = (uint32_t*)dst;
	sw = (const uint32_t*)src;
	for ( ; leng>=32 ; leng-=32, dw+=8, sw+=8) {
		dw[0] ^= sw[0];
		dw[1] ^= sw[1];
		dw[2] ^= sw[2];
		dw[3] ^= sw[3];
		dw[4] ^= sw[4];
		dw[5] ^= sw[5];
		dw[6] ^= sw[6];
		dw[7] ^= sw[7];
	}
	for ( ; leng>=4 ; leng-=4) {
		*dw++ ^= *sw++;
	}

	/* trailing 0..3 bytes */
	dst = (uint8_t*)dw;
	src = (const uint8_t*)sw;
	for ( ; leng>0 ; leng--) {
		*dst++ ^= *src++;
	}
}
162 
/*
 * Reads as much as is currently available for the pending header/packet of rs.
 *
 * In HEADER mode, once the 8-byte header (type:32 size:32) is complete:
 *   - an ANTOAN_NOP with size 0 is discarded and another header is awaited;
 *   - otherwise the previous packet buffer is released, a new one of `size`
 *     bytes is allocated (NULL when size is 0) and the mode switches to DATA.
 *
 * Returns 0 when no fatal error occurred (including "would block" conditions,
 * as decided by ERRNO_ERROR), -1 on EOF, a read error or an oversized packet.
 * On every return rs->packet is either NULL or a live allocation, so
 * rep_cleanup() may free it safely.
 */
static int rep_read(repsrc *rs) {
	int32_t i;
	uint32_t type;
	uint32_t size;
	const uint8_t *ptr;
	while (rs->bytesleft>0) {
		i=read(rs->sock,rs->startptr,rs->bytesleft);
		if (i==0) {
			syslog(LOG_NOTICE,"replicator: connection lost");
			return -1;
		}
		if (i<0) {
			if (ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"replicator: read error");
				return -1;
			}
			return 0;
		}
		replicator_bytesin(i);
		rs->startptr+=i;
		rs->bytesleft-=i;

		if (rs->bytesleft>0) {
			return 0;
		}

		if (rs->mode==HEADER) {
			ptr = rs->hdrbuff;
			type = get32bit(&ptr);
			size = get32bit(&ptr);
			if (type==ANTOAN_NOP && size==0) { // NOP
				rs->startptr = rs->hdrbuff;
				rs->bytesleft = 8;
				return 0;
			}

			// Release the previous packet and clear the pointer BEFORE
			// validating the size: returning -1 with a freed-but-still-set
			// pointer would make rep_cleanup() free it a second time.
			if (rs->packet) {
				free(rs->packet);
			}
			rs->packet = NULL;
			if (size>MAX_RECV_PACKET_SIZE) {
				syslog(LOG_WARNING,"replicator: packet too long (%"PRIu32"/%u)",size,MAX_RECV_PACKET_SIZE);
				return -1;
			}
			if (size>0) {
				rs->packet = malloc(size);
				passert(rs->packet);
				rs->startptr = rs->packet;
			}
			rs->bytesleft = size;
			rs->mode = DATA;
		}
	}
	return 0;
}
220 
/*
 * Polls all sources and calls rep_read() on readable ones until every source
 * has bytesleft==0 (sources that already have nothing pending are not polled
 * for input).
 *
 * msecto: total time budget in milliseconds, measured from entry.
 * Returns 0 when all sources are complete; -1 on timeout, poll() failure,
 * hang-up, socket error, or a rep_read() failure (each case is logged).
 */
static int rep_receive_all_packets(replication *r,uint32_t msecto) {
	uint8_t i,l;
	uint64_t st;
	uint32_t msec;
	st = monotonic_useconds();
	for (;;) {
		l=1;
		for (i=0 ; i<r->srccnt ; i++) {
			if (r->repsources[i].bytesleft>0) {
				r->fds[i].events = POLLIN;
				l=0;
			} else {
				r->fds[i].events = 0;
			}
		}
		if (l) {	// finished
			return 0;
		}
		msec = (monotonic_useconds()-st)/1000;
		if (msec>=msecto) {
			syslog(LOG_NOTICE,"replicator: receive timed out");
			return -1; // timed out
		}
		if (poll(r->fds,r->srccnt,msecto-msec)<0) {
			if (errno!=EINTR && ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"replicator: poll error");
				return -1;
			}
			continue;
		}
		for (i=0 ; i<r->srccnt ; i++) {
			if (r->fds[i].revents & POLLHUP) {
				syslog(LOG_NOTICE,"replicator: connection lost");
				return -1;
			}
			if (r->fds[i].revents & POLLIN) {
				if (rep_read(r->repsources+i)<0) {
					return -1;
				}
			} else if (r->fds[i].revents & (POLLERR|POLLNVAL)) {
				// poll() always reports these conditions; left unhandled,
				// poll() would keep returning immediately and this loop
				// would spin until the timeout expires.
				syslog(LOG_NOTICE,"replicator: socket error");
				return -1;
			}
		}
	}
}
264 
/*
 * Replaces rs's packet buffer with a fresh one holding an 8-byte header
 * (type:32, size:32) followed by `size` bytes of payload, and arms
 * startptr/bytesleft so the whole packet gets sent.
 * Returns a pointer to the payload area, which the caller fills in.
 */
static uint8_t* rep_create_packet(repsrc *rs,uint32_t type,uint32_t size) {
	uint8_t *payload;

	free(rs->packet);
	rs->packet = malloc(size+8);
	passert(rs->packet);
	rs->startptr = rs->packet;
	rs->bytesleft = size+8;

	payload = rs->packet;
	put32bit(&payload,type);
	put32bit(&payload,size);
	return payload;
}
279 
/* Drops rs's packet buffer and marks it as having nothing to transfer. */
static void rep_no_packet(repsrc *rs) {
	free(rs->packet);
	rs->bytesleft = 0;
	rs->startptr = NULL;
	rs->packet = NULL;
}
288 
/*
 * Makes a single write() attempt for the remaining bytes of rs's packet and
 * advances startptr/bytesleft by the amount actually written.
 * Returns 0 after progress or a non-fatal error (as judged by ERRNO_ERROR),
 * -1 when write() returns 0 or fails with a fatal error.
 */
static int rep_write(repsrc *rs) {
	int written = write(rs->sock,rs->startptr,rs->bytesleft);

	if (written>0) {
		replicator_bytesout(written);
		rs->startptr += written;
		rs->bytesleft -= written;
		return 0;
	}
	if (written==0) {
		syslog(LOG_NOTICE,"replicator: connection lost");
		return -1;
	}
	if (ERRNO_ERROR) {
		mfs_errlog_silent(LOG_NOTICE,"replicator: write error");
		return -1;
	}
	return 0;
}
309 
/*
 * Polls all sources and calls rep_write() on writable ones until every source
 * has bytesleft==0 (sources with nothing to send are not polled for output).
 *
 * msecto: total time budget in milliseconds, measured from entry.
 * Returns 0 when all packets are sent; -1 on timeout, poll() failure,
 * hang-up, socket error, or a rep_write() failure (each case is logged).
 */
static int rep_send_all_packets(replication *r,uint32_t msecto) {
	uint8_t i,l;
	uint64_t st;
	uint32_t msec;
	st = monotonic_useconds();
	for (;;) {
		l=1;
		for (i=0 ; i<r->srccnt ; i++) {
			if (r->repsources[i].bytesleft>0) {
				r->fds[i].events = POLLOUT;
				l=0;
			} else {
				r->fds[i].events = 0;
			}
		}
		if (l) {	// finished
			return 0;
		}
		msec = (monotonic_useconds()-st)/1000;
		if (msec>=msecto) {
			syslog(LOG_NOTICE,"replicator: send timed out");
			return -1; // timed out
		}
		if (poll(r->fds,r->srccnt,msecto-msec)<0) {
			if (errno!=EINTR && ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"replicator: poll error");
				return -1;
			}
			continue;
		}
		for (i=0 ; i<r->srccnt ; i++) {
			if (r->fds[i].revents & POLLHUP) {
				syslog(LOG_NOTICE,"replicator: connection lost");
				return -1;
			}
			if (r->fds[i].revents & POLLOUT) {
				if (rep_write(r->repsources+i)<0) {
					return -1;
				}
			} else if (r->fds[i].revents & (POLLERR|POLLNVAL)) {
				// poll() always reports these conditions; left unhandled,
				// poll() would keep returning immediately and this loop
				// would spin until the timeout expires.
				syslog(LOG_NOTICE,"replicator: socket error");
				return -1;
			}
		}
	}
}
353 
/*
 * Waits until every source in CONNECTING mode has finished its non-blocking
 * connect, switching it to IDLE when tcpgetstatus() reports success.
 *
 * msecto: total time budget in milliseconds, measured from entry.
 * Returns 0 when no source is CONNECTING any more; -1 on timeout, poll()
 * failure, hang-up, invalid descriptor or a failed connect (each is logged).
 */
static int rep_wait_for_connection(replication *r,uint32_t msecto) {
	uint8_t i,l;
	uint64_t st;
	uint32_t msec;
	st = monotonic_useconds();
	for (;;) {
		l=1;
		for (i=0 ; i<r->srccnt ; i++) {
			if (r->repsources[i].mode==CONNECTING) {
				r->fds[i].events = POLLOUT;
				l=0;
			} else {
				r->fds[i].events = 0;
			}
		}
		if (l) {	// finished
			return 0;
		}
		msec = (monotonic_useconds()-st)/1000;
		if (msec>=msecto) {
			syslog(LOG_NOTICE,"replicator: connect timed out");
			return -1; // timed out
		}
		if (poll(r->fds,r->srccnt,msecto-msec)<0) {
			if (errno!=EINTR && ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"replicator: poll error");
				return -1;
			}
			continue;
		}
		for (i=0 ; i<r->srccnt ; i++) {
			if (r->fds[i].revents & POLLHUP) {
				syslog(LOG_NOTICE,"replicator: connection lost");
				return -1;
			}
			// A failed connect may be signalled by POLLERR without POLLOUT;
			// let tcpgetstatus() decide instead of spinning until timeout.
			if (r->fds[i].revents & (POLLOUT|POLLERR)) {
				if (tcpgetstatus(r->repsources[i].sock)<0) {
					mfs_errlog_silent(LOG_NOTICE,"replicator: connect error");
					return -1;
				}
				r->repsources[i].mode=IDLE;
			} else if (r->fds[i].revents & POLLNVAL) {
				syslog(LOG_NOTICE,"replicator: socket error");
				return -1;
			}
		}
	}
}
399 
/*
 * Releases everything a (possibly half-initialised) replication holds.
 * The local chunk is closed if opened and deleted if still marked as created.
 * Then every one of the first srccnt sources has its socket closed and its
 * packet buffer freed. Finally the fds, repsources and xorbuff arrays are freed.
 */
static void rep_cleanup(replication *r) {
	repsrc *src;
	uint8_t n;

	if (r->opened) {
		hdd_close(r->chunkid);
	}
	if (r->created) {
		hdd_delete(r->chunkid,0);
	}
	for (n=0, src=r->repsources ; n<r->srccnt ; n++, src++) {
		if (src->sock>=0) {
			tcpclose(src->sock);
		}
		free(src->packet);
	}
	free(r->fds);
	free(r->repsources);
	free(r->xorbuff);
}
426 
427 /* srcs: srccnt * (chunkid:64 version:32 ip:32 port:16) */
replicate(uint64_t chunkid,uint32_t version,uint8_t srccnt,const uint8_t * srcs)428 uint8_t replicate(uint64_t chunkid,uint32_t version,uint8_t srccnt,const uint8_t *srcs) {
429 	replication r;
430 	uint8_t status,i,vbuffs,first;
431 	uint16_t b,blocks;
432 	uint32_t xcrc,crc;
433 	uint8_t *wptr;
434 	const uint8_t *rptr;
435 	int s;
436 
437 	if (srccnt==0) {
438 		return ERROR_EINVAL;
439 	}
440 
441 //	syslog(LOG_NOTICE,"replication begin (chunkid:%08"PRIX64",version:%04"PRIX32",srccnt:%"PRIu8")",chunkid,version,srccnt);
442 
443 	pthread_mutex_lock(&statslock);
444 	stats_repl++;
445 	pthread_mutex_unlock(&statslock);
446 
447 // init replication structure
448 	r.chunkid = chunkid;
449 	r.version = version;
450 	r.srccnt = 0;
451 	r.created = 0;
452 	r.opened = 0;
453 	r.fds = malloc(sizeof(struct pollfd)*srccnt);
454 	passert(r.fds);
455 	r.repsources = malloc(sizeof(repsrc)*srccnt);
456 	passert(r.repsources);
457 	if (srccnt>1) {
458 		r.xorbuff = malloc(MFSBLOCKSIZE+4);
459 		passert(r.xorbuff);
460 	} else {
461 		r.xorbuff = NULL;
462 	}
463 // create chunk
464 	status = hdd_create(chunkid,0);
465 	if (status!=STATUS_OK) {
466 		syslog(LOG_NOTICE,"replicator: hdd_create status: %s",mfsstrerr(status));
467 		rep_cleanup(&r);
468 		return status;
469 	}
470 	r.created = 1;
471 // init sources
472 	r.srccnt = srccnt;
473 	for (i=0 ; i<srccnt ; i++) {
474 		r.repsources[i].chunkid = get64bit(&srcs);
475 		r.repsources[i].version = get32bit(&srcs);
476 		r.repsources[i].ip = get32bit(&srcs);
477 		r.repsources[i].port = get16bit(&srcs);
478 		r.repsources[i].sock = -1;
479 		r.repsources[i].packet = NULL;
480 	}
481 // connect
482 	for (i=0 ; i<srccnt ; i++) {
483 		s = tcpsocket();
484 		if (s<0) {
485 			mfs_errlog_silent(LOG_NOTICE,"replicator: socket error");
486 			rep_cleanup(&r);
487 			return ERROR_CANTCONNECT;
488 		}
489 		r.repsources[i].sock = s;
490 		r.fds[i].fd = s;
491 		if (tcpnonblock(s)<0) {
492 			mfs_errlog_silent(LOG_NOTICE,"replicator: nonblock error");
493 			rep_cleanup(&r);
494 			return ERROR_CANTCONNECT;
495 		}
496 		s = tcpnumconnect(s,r.repsources[i].ip,r.repsources[i].port);
497 		if (s<0) {
498 			mfs_errlog_silent(LOG_NOTICE,"replicator: connect error");
499 			rep_cleanup(&r);
500 			return ERROR_CANTCONNECT;
501 		}
502 		if (s==0) {
503 			r.repsources[i].mode = IDLE;
504 		} else {
505 			r.repsources[i].mode = CONNECTING;
506 		}
507 	}
508 	if (rep_wait_for_connection(&r,CONNMSECTO)<0) {
509 		rep_cleanup(&r);
510 		return ERROR_CANTCONNECT;
511 	}
512 // open chunk
513 	status = hdd_open(chunkid,0);
514 	if (status!=STATUS_OK) {
515 		syslog(LOG_NOTICE,"replicator: hdd_open status: %s",mfsstrerr(status));
516 		rep_cleanup(&r);
517 		return status;
518 	}
519 	r.opened = 1;
520 // get block numbers
521 	for (i=0 ; i<srccnt ; i++) {
522 		wptr = rep_create_packet(r.repsources+i,ANTOCS_GET_CHUNK_BLOCKS,8+4);
523 		if (wptr==NULL) {
524 			syslog(LOG_NOTICE,"replicator: out of memory");
525 			rep_cleanup(&r);
526 			return ERROR_OUTOFMEMORY;
527 		}
528 		put64bit(&wptr,r.repsources[i].chunkid);
529 		put32bit(&wptr,r.repsources[i].version);
530 	}
531 // send packet
532 	if (rep_send_all_packets(&r,SENDMSECTO)<0) {
533 		rep_cleanup(&r);
534 		return ERROR_DISCONNECTED;
535 	}
536 // receive answers
537 	for (i=0 ; i<srccnt ; i++) {
538 		r.repsources[i].mode = HEADER;
539 		r.repsources[i].startptr = r.repsources[i].hdrbuff;
540 		r.repsources[i].bytesleft = 8;
541 	}
542 	if (rep_receive_all_packets(&r,RECVMSECTO)<0) {
543 		rep_cleanup(&r);
544 		return ERROR_DISCONNECTED;
545 	}
546 // get # of blocks
547 	blocks = 0;
548 	for (i=0 ; i<srccnt ; i++) {
549 		uint32_t type,size;
550 		uint64_t pchid;
551 		uint32_t pver;
552 		uint16_t pblocks;
553 		uint8_t pstatus;
554 		uint32_t ip;
555 		rptr = r.repsources[i].hdrbuff;
556 		type = get32bit(&rptr);
557 		size = get32bit(&rptr);
558 		rptr = r.repsources[i].packet;
559 		ip = r.repsources[i].ip;
560 		if (rptr==NULL || type!=CSTOAN_CHUNK_BLOCKS || size!=15) {
561 			syslog(LOG_WARNING,"replicator,get # of blocks: got wrong answer (type:0x%08"PRIX32"/size:0x%08"PRIX32") from (%u.%u.%u.%u:%04"PRIX16")",type,size,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
562 			rep_cleanup(&r);
563 			return ERROR_DISCONNECTED;
564 		}
565 		pchid = get64bit(&rptr);
566 		pver = get32bit(&rptr);
567 		pblocks = get16bit(&rptr);
568 		pstatus = get8bit(&rptr);
569 		if (pchid!=r.repsources[i].chunkid) {
570 			syslog(LOG_WARNING,"replicator,get # of blocks: got wrong answer (chunk_status:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%04"PRIX16")",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
571 			rep_cleanup(&r);
572 			return ERROR_WRONGCHUNKID;
573 		}
574 		if (pver!=r.repsources[i].version) {
575 			syslog(LOG_WARNING,"replicator,get # of blocks: got wrong answer (chunk_status:version:%"PRIX32"/%"PRIX32") from (%u.%u.%u.%u:%04"PRIX16")",pver,r.repsources[i].version,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
576 			rep_cleanup(&r);
577 			return ERROR_WRONGVERSION;
578 		}
579 		if (pstatus!=STATUS_OK) {
580 			syslog(LOG_NOTICE,"replicator,get # of blocks: got status: %s from (%u.%u.%u.%u:%04"PRIX16")",mfsstrerr(pstatus),(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
581 			rep_cleanup(&r);
582 			return pstatus;
583 		}
584 		r.repsources[i].blocks = pblocks;
585 		if (pblocks>blocks) {
586 			blocks=pblocks;
587 		}
588 	}
589 // create read request
590 	for (i=0 ; i<srccnt ; i++) {
591 		if (r.repsources[i].blocks>0) {
592 			uint32_t leng;
593 			wptr = rep_create_packet(r.repsources+i,CLTOCS_READ,8+4+4+4);
594 			if (wptr==NULL) {
595 				syslog(LOG_NOTICE,"replicator: out of memory");
596 				rep_cleanup(&r);
597 				return ERROR_OUTOFMEMORY;
598 			}
599 			leng = r.repsources[i].blocks*MFSBLOCKSIZE;
600 			put64bit(&wptr,r.repsources[i].chunkid);
601 			put32bit(&wptr,r.repsources[i].version);
602 			put32bit(&wptr,0);
603 			put32bit(&wptr,leng);
604 		} else {
605 			rep_no_packet(r.repsources+i);
606 		}
607 	}
608 // send read request
609 	if (rep_send_all_packets(&r,SENDMSECTO)<0) {
610 		rep_cleanup(&r);
611 		return ERROR_DISCONNECTED;
612 	}
613 // receive data and write to hdd
614 	for (b=0 ; b<blocks ; b++) {
615 // prepare receive
616 		for (i=0 ; i<srccnt ; i++) {
617 			if (b<r.repsources[i].blocks) {
618 				r.repsources[i].mode = HEADER;
619 				r.repsources[i].startptr = r.repsources[i].hdrbuff;
620 				r.repsources[i].bytesleft = 8;
621 			} else {
622 				r.repsources[i].mode = IDLE;
623 				r.repsources[i].bytesleft = 0;
624 			}
625 		}
626 // receive data
627 		if (rep_receive_all_packets(&r,RECVMSECTO)<0) {
628 			rep_cleanup(&r);
629 			return ERROR_DISCONNECTED;
630 		}
631 // check packets
632 		vbuffs = 0;
633 		for (i=0 ; i<srccnt ; i++) {
634 			if (r.repsources[i].mode!=IDLE) {
635 				uint32_t type,size;
636 				uint64_t pchid;
637 				uint16_t pblocknum;
638 				uint16_t poffset;
639 				uint32_t psize;
640 				uint8_t pstatus;
641 				uint32_t ip;
642 				rptr = r.repsources[i].hdrbuff;
643 				type = get32bit(&rptr);
644 				size = get32bit(&rptr);
645 				rptr = r.repsources[i].packet;
646 				ip = r.repsources[i].ip;
647 				if (rptr==NULL) {
648 					rep_cleanup(&r);
649 					return ERROR_DISCONNECTED;
650 				}
651 				if (type==CSTOCL_READ_STATUS && size==9) {
652 					pchid = get64bit(&rptr);
653 					pstatus = get8bit(&rptr);
654 					if (pchid!=r.repsources[i].chunkid) {
655 						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_status:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%04"PRIX16")",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
656 						rep_cleanup(&r);
657 						return ERROR_WRONGCHUNKID;
658 					}
659 					if (pstatus==STATUS_OK) {	// got status too early or got incorrect packet
660 						syslog(LOG_WARNING,"replicator,read chunks: got unexpected ok status from (%u.%u.%u.%u:%04"PRIX16")",(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
661 						rep_cleanup(&r);
662 						return ERROR_DISCONNECTED;
663 					}
664 					syslog(LOG_NOTICE,"replicator,read chunks: got status: %s from (%u.%u.%u.%u:%04"PRIX16")",mfsstrerr(pstatus),(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
665 					rep_cleanup(&r);
666 					return pstatus;
667 				} else if (type==CSTOCL_READ_DATA && size==20+MFSBLOCKSIZE) {
668 					pchid = get64bit(&rptr);
669 					pblocknum = get16bit(&rptr);
670 					poffset = get16bit(&rptr);
671 					psize = get32bit(&rptr);
672 					if (pchid!=r.repsources[i].chunkid) {
673 						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%04"PRIX16")",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
674 						rep_cleanup(&r);
675 						return ERROR_WRONGCHUNKID;
676 					}
677 					if (pblocknum!=b) {
678 						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:blocknum:%"PRIu16"/%"PRIu16") from (%u.%u.%u.%u:%04"PRIX16")",pblocknum,b,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
679 						rep_cleanup(&r);
680 						return ERROR_DISCONNECTED;
681 					}
682 					if (poffset!=0) {
683 						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:offset:%"PRIu16") from (%u.%u.%u.%u:%04"PRIX16")",poffset,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
684 						rep_cleanup(&r);
685 						return ERROR_WRONGOFFSET;
686 					}
687 					if (psize!=MFSBLOCKSIZE) {
688 						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:size:%"PRIu32") from (%u.%u.%u.%u:%04"PRIX16")",psize,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
689 						rep_cleanup(&r);
690 						return ERROR_WRONGSIZE;
691 					}
692 				} else {
693 					syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (type:0x%08"PRIX32"/size:0x%08"PRIX32") from (%u.%u.%u.%u:%04"PRIX16")",type,size,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
694 					rep_cleanup(&r);
695 					return ERROR_DISCONNECTED;
696 				}
697 				vbuffs++;
698 			}
699 		}
700 // write data
701 		if (vbuffs==0) {	// no buffers ? - it should never happen
702 			syslog(LOG_WARNING,"replicator: no data received for block: %"PRIu16,b);
703 			rep_cleanup(&r);
704 			return ERROR_DISCONNECTED;
705 		} else if (vbuffs==1) { // xor not needed, so just find block and write it
706 			for (i=0 ; i<srccnt ; i++) {
707 				if (r.repsources[i].mode!=IDLE) {
708 					rptr = r.repsources[i].packet;
709 					status = hdd_write(chunkid,0,b,rptr+20,0,MFSBLOCKSIZE,rptr+16);
710 					if (status!=STATUS_OK) {
711 						syslog(LOG_WARNING,"replicator: write status: %s",mfsstrerr(status));
712 						rep_cleanup(&r);
713 						return status;
714 					}
715 				}
716 			}
717 		} else {
718 			first=1;
719 			if (vbuffs&1) {
720 				xcrc = 0;
721 			} else {
722 				xcrc = MFSCRCEMPTY; // = mycrc32_zeroblock(0,0x10000);
723 			}
724 			for (i=0 ; i<srccnt ; i++) {
725 				if (r.repsources[i].mode!=IDLE) {
726 					uint32_t ip;
727 					ip = r.repsources[i].ip;
728 					rptr = r.repsources[i].packet;
729 					rptr+=16;	// skip chunkid,blockno,offset and size
730 					if (first) {
731 						memcpy(r.xorbuff+4,rptr+4,MFSBLOCKSIZE);
732 						first=0;
733 					} else {
734 						xordata(r.xorbuff+4,rptr+4,MFSBLOCKSIZE);
735 					}
736 					crc = get32bit(&rptr);
737 					if (crc!=mycrc32(0,rptr,MFSBLOCKSIZE)) {
738 						syslog(LOG_WARNING,"replicator: received data with wrong checksum from (%u.%u.%u.%u:%04"PRIX16")",(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
739 						rep_cleanup(&r);
740 						return ERROR_CRC;
741 					}
742 					xcrc^=crc;
743 				}
744 			}
745 			wptr = r.xorbuff;
746 			put32bit(&wptr,xcrc);
747 			status = hdd_write(chunkid,0,b,r.xorbuff+4,0,MFSBLOCKSIZE,r.xorbuff);
748 			if (status!=STATUS_OK) {
749 				syslog(LOG_WARNING,"replicator: xor write status: %s",mfsstrerr(status));
750 				rep_cleanup(&r);
751 				return status;
752 			}
753 		}
754 	}
755 // receive status
756 	for (i=0 ; i<srccnt ; i++) {
757 		if (r.repsources[i].blocks>0) {
758 //			if (r.repsources[i].packet) {
759 //				free(r.repsources[i].packet);
760 //				r.repsources[i].packet=NULL;
761 //			}
762 			r.repsources[i].mode = HEADER;
763 			r.repsources[i].startptr = r.repsources[i].hdrbuff;
764 			r.repsources[i].bytesleft = 8;
765 		} else {
766 			r.repsources[i].mode = IDLE;
767 			r.repsources[i].bytesleft = 0;
768 		}
769 	}
770 	if (rep_receive_all_packets(&r,RECVMSECTO)<0) {
771 		rep_cleanup(&r);
772 		return ERROR_DISCONNECTED;
773 	}
774 	for (i=0 ; i<srccnt ; i++) {
775 		if (r.repsources[i].blocks>0) {
776 			uint32_t type,size;
777 			uint64_t pchid;
778 			uint8_t pstatus;
779 			uint32_t ip;
780 			rptr = r.repsources[i].hdrbuff;
781 			type = get32bit(&rptr);
782 			size = get32bit(&rptr);
783 			rptr = r.repsources[i].packet;
784 			ip = r.repsources[i].ip;
785 			if (rptr==NULL || type!=CSTOCL_READ_STATUS || size!=9) {
786 				syslog(LOG_WARNING,"replicator,check status: got wrong answer (type:0x%08"PRIX32"/size:0x%08"PRIX32") from (%u.%u.%u.%u:%04"PRIX16")",type,size,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
787 				rep_cleanup(&r);
788 				return ERROR_DISCONNECTED;
789 			}
790 			pchid = get64bit(&rptr);
791 			pstatus = get8bit(&rptr);
792 			if (pchid!=r.repsources[i].chunkid) {
793 				syslog(LOG_WARNING,"replicator,check status: got wrong answer (read_status:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%04"PRIX16")",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
794 				rep_cleanup(&r);
795 				return ERROR_WRONGCHUNKID;
796 			}
797 			if (pstatus!=STATUS_OK) {
798 				syslog(LOG_NOTICE,"replicator,check status: got status: %s from (%u.%u.%u.%u:%04"PRIX16")",mfsstrerr(pstatus),(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
799 				rep_cleanup(&r);
800 				return pstatus;
801 			}
802 		}
803 	}
804 // close chunk and change version
805 	status = hdd_close(chunkid);
806 	if (status!=STATUS_OK) {
807 		syslog(LOG_NOTICE,"replicator: hdd_close status: %s",mfsstrerr(status));
808 		rep_cleanup(&r);
809 		return status;
810 	}
811 	r.opened = 0;
812 	status = hdd_version(chunkid,0,version);
813 	if (status!=STATUS_OK) {
814 		syslog(LOG_NOTICE,"replicator: hdd_version status: %s",mfsstrerr(status));
815 		rep_cleanup(&r);
816 		return status;
817 	}
818 	r.created = 0;
819 	rep_cleanup(&r);
820 	return STATUS_OK;
821 }
822