1 /*
2  * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/types.h>
28 #include <sys/uio.h>
29 #include <unistd.h>
30 #include <poll.h>
31 #include <sys/time.h>
32 #include <syslog.h>
33 #include <errno.h>
34 #include <inttypes.h>
35 #include <pthread.h>
36 
37 #include "MFSCommunication.h"
38 #include "hddspacemgr.h"
39 #include "sockets.h"
40 #include "crc.h"
41 #include "slogger.h"
42 #include "datapack.h"
43 #include "massert.h"
44 #include "mfsstrerr.h"
45 #include "clocks.h"
46 
47 #include "replicator.h"
48 
/* Timeouts in milliseconds for connecting to, sending to and receiving from source chunkservers */
#define CONNMSECTO 5000
#define SENDMSECTO 5000
#define RECVMSECTO 5000

/* Largest packet body accepted from a source: a CSTOCL_READ_DATA body, i.e.
   20 bytes of fields (chunkid:64 blocknum:16 offset:16 size:32 crc:32)
   followed by one full block of data */
#define MAX_RECV_PACKET_SIZE (20+MFSBLOCKSIZE)

/* Per-source I/O state:
   IDLE       - no connect/receive pending (also marks sources that have no block in the current round)
   CONNECTING - non-blocking connect still in progress
   HEADER     - receiving the 8-byte packet header (type:32 size:32)
   DATA       - header decoded; receiving (or received) the packet body */
typedef enum {IDLE,CONNECTING,HEADER,DATA} modetype;

/* State of a single source chunkserver connection */
typedef struct _repsrc {
	int sock;		/* TCP socket, -1 until created */
	modetype mode;
	uint8_t hdrbuff[8];	/* received packet header: type:32 size:32 */
	uint8_t *packet;	/* heap buffer: outgoing request (header+body) or incoming packet body */
	uint8_t *startptr;	/* current position in hdrbuff/packet of the pending transfer */
	uint32_t bytesleft;	/* bytes still to send/receive; 0 means nothing pending */

	uint64_t chunkid;	/* chunk id and version to read from this source */
	uint32_t version;
	uint16_t blocks;	/* number of blocks reported by this source */

	uint32_t ip;		/* source address (logged as a.b.c.d, most significant byte first) */
	uint16_t port;

	uint32_t crcsums[4];	/* CRC32 of each quarter of the current block (multi-source path only) */
} repsrc;

/* State of one replication job: one local chunk built from srccnt sources */
typedef struct _replication {
	uint64_t chunkid;	/* local (destination) chunk id */
	uint32_t version;	/* version applied to the local chunk on success */

	uint8_t *xorbuff;	/* crc:32 followed by one block; allocated only when srccnt>1 */

	uint8_t created,opened;	/* tell rep_cleanup whether to delete / close the local chunk */
	uint8_t srccnt;		/* number of initialized entries in fds/repsources */
	struct pollfd *fds;	/* one pollfd per source, indexed like repsources */
	repsrc *repsources;
} replication;

/* Counters reported (and reset) by replicator_stats(); all guarded by statslock */
static uint32_t stats_repl = 0;
static uint64_t stats_bytesin = 0;
static uint64_t stats_bytesout = 0;
static pthread_mutex_t statslock = PTHREAD_MUTEX_INITIALIZER;
91 
replicator_stats(uint64_t * bin,uint64_t * bout,uint32_t * repl)92 void replicator_stats(uint64_t *bin,uint64_t *bout,uint32_t *repl) {
93 	pthread_mutex_lock(&statslock);
94 	*bin = stats_bytesin;
95 	*bout = stats_bytesout;
96 	*repl = stats_repl;
97 	stats_repl = 0;
98 	stats_bytesin = 0;
99 	stats_bytesout = 0;
100 	pthread_mutex_unlock(&statslock);
101 }
102 
replicator_bytesin(uint64_t bytes)103 static inline void replicator_bytesin(uint64_t bytes) {
104 	zassert(pthread_mutex_lock(&statslock));
105 	stats_bytesin += bytes;
106 	zassert(pthread_mutex_unlock(&statslock));
107 }
108 
replicator_bytesout(uint64_t bytes)109 static inline void replicator_bytesout(uint64_t bytes) {
110 	zassert(pthread_mutex_lock(&statslock));
111 	stats_bytesout += bytes;
112 	zassert(pthread_mutex_unlock(&statslock));
113 }
114 
xordata(uint8_t * dst,const uint8_t * src,uint32_t leng)115 static void xordata(uint8_t *dst,const uint8_t *src,uint32_t leng) {
116 	uint32_t *dst4;
117 	const uint32_t *src4;
118 #define XOR_ONE_BYTE (*dst++)^=(*src++)
119 #define XOR_FOUR_BYTES (*dst4++)^=(*src4++)
120 	if (((unsigned long)dst&3)==((unsigned long)src&3)) {
121 		while (leng && ((unsigned long)src & 3)) {
122 			XOR_ONE_BYTE;
123 			leng--;
124 		}
125 		dst4 = (uint32_t*)dst;
126 		src4 = (const uint32_t*)src;
127 		while (leng>=32) {
128 			XOR_FOUR_BYTES;
129 			XOR_FOUR_BYTES;
130 			XOR_FOUR_BYTES;
131 			XOR_FOUR_BYTES;
132 			XOR_FOUR_BYTES;
133 			XOR_FOUR_BYTES;
134 			XOR_FOUR_BYTES;
135 			XOR_FOUR_BYTES;
136 			leng-=32;
137 		}
138 		while (leng>=4) {
139 			XOR_FOUR_BYTES;
140 			leng-=4;
141 		}
142 		src = (const uint8_t*)src4;
143 		dst = (uint8_t*)dst4;
144 		if (leng) do {
145 			XOR_ONE_BYTE;
146 		} while (--leng);
147 	} else {
148 		while (leng>=8) {
149 			XOR_ONE_BYTE;
150 			XOR_ONE_BYTE;
151 			XOR_ONE_BYTE;
152 			XOR_ONE_BYTE;
153 			XOR_ONE_BYTE;
154 			XOR_ONE_BYTE;
155 			XOR_ONE_BYTE;
156 			XOR_ONE_BYTE;
157 			leng-=8;
158 		}
159 		if (leng>0) do {
160 			XOR_ONE_BYTE;
161 		} while (--leng);
162 	}
163 }
164 
rep_read(repsrc * rs)165 static int rep_read(repsrc *rs) {
166 	int32_t i;
167 	uint32_t type;
168 	uint32_t size;
169 	const uint8_t *ptr;
170 	while (rs->bytesleft>0) {
171 		i=read(rs->sock,rs->startptr,rs->bytesleft);
172 		if (i==0) {
173 			syslog(LOG_NOTICE,"replicator: connection lost");
174 			return -1;
175 		}
176 		if (i<0) {
177 			if (ERRNO_ERROR) {
178 				mfs_errlog_silent(LOG_NOTICE,"replicator: read error");
179 				return -1;
180 			}
181 			return 0;
182 		}
183 		replicator_bytesin(i);
184 //		stats_bytesin+=i;
185 		rs->startptr+=i;
186 		rs->bytesleft-=i;
187 
188 		if (rs->bytesleft>0) {
189 			return 0;
190 		}
191 
192 		if (rs->mode==HEADER) {
193 			ptr = rs->hdrbuff;
194 			type = get32bit(&ptr);
195 			size = get32bit(&ptr);
196 			if (type==ANTOAN_NOP && size==0) { // NOP
197 				rs->startptr = rs->hdrbuff;
198 				rs->bytesleft = 8;
199 				return 0;
200 			}
201 
202 			if (rs->packet) {
203 				free(rs->packet);
204 			}
205 			if (size>0) {
206 				if (size>MAX_RECV_PACKET_SIZE) {
207 					syslog(LOG_WARNING,"replicator: packet too long (%"PRIu32"/%u) ; command:%"PRIu32,size,MAX_RECV_PACKET_SIZE,type);
208 					return -1;
209 				}
210 				rs->packet = malloc(size);
211 				passert(rs->packet);
212 				rs->startptr = rs->packet;
213 			} else {
214 				rs->packet = NULL;
215 			}
216 			rs->bytesleft = size;
217 			rs->mode = DATA;
218 		}
219 	}
220 	return 0;
221 }
222 
rep_receive_all_packets(replication * r,uint32_t msecto)223 static int rep_receive_all_packets(replication *r,uint32_t msecto) {
224 	uint8_t i,l;
225 	uint64_t st;
226 	uint32_t msec;
227 	st = monotonic_useconds();
228 	for (;;) {
229 		l=1;
230 		for (i=0 ; i<r->srccnt ; i++) {
231 			if (r->repsources[i].bytesleft>0) {
232 				r->fds[i].events = POLLIN;
233 				l=0;
234 			} else {
235 				r->fds[i].events = 0;
236 			}
237 		}
238 		if (l) {	// finished
239 			return 0;
240 		}
241 		msec = (monotonic_useconds()-st)/1000;
242 		if (msec>=msecto) {
243 			syslog(LOG_NOTICE,"replicator: receive timed out");
244 			return -1; // timed out
245 		}
246 		if (poll(r->fds,r->srccnt,msecto-msec)<0) {
247 			if (errno!=EINTR && ERRNO_ERROR) {
248 				mfs_errlog_silent(LOG_NOTICE,"replicator: poll error");
249 				return -1;
250 			}
251 			continue;
252 		}
253 		for (i=0 ; i<r->srccnt ; i++) {
254 			if (r->fds[i].revents & POLLHUP) {
255 				syslog(LOG_NOTICE,"replicator: connection lost");
256 				return -1;
257 			}
258 			if (r->fds[i].revents & POLLIN) {
259 				if (rep_read(r->repsources+i)<0) {
260 					return -1;
261 				}
262 			}
263 		}
264 	}
265 }
266 
rep_create_packet(repsrc * rs,uint32_t type,uint32_t size)267 static uint8_t* rep_create_packet(repsrc *rs,uint32_t type,uint32_t size) {
268 	uint8_t *ptr;
269 	if (rs->packet) {
270 		free(rs->packet);
271 	}
272 	rs->packet = malloc(size+8);
273 	passert(rs->packet);
274 	ptr = rs->packet;
275 	put32bit(&ptr,type);
276 	put32bit(&ptr,size);
277 	rs->startptr = rs->packet;
278 	rs->bytesleft = 8+size;
279 	return ptr;
280 }
281 
rep_no_packet(repsrc * rs)282 static void rep_no_packet(repsrc *rs) {
283 	if (rs->packet) {
284 		free(rs->packet);
285 	}
286 	rs->packet=NULL;
287 	rs->startptr=NULL;
288 	rs->bytesleft=0;
289 }
290 
rep_write(repsrc * rs)291 static int rep_write(repsrc *rs) {
292 	int i;
293 	i = write(rs->sock,rs->startptr,rs->bytesleft);
294 	if (i==0) {
295 		syslog(LOG_NOTICE,"replicator: connection lost");
296 		return -1;
297 	}
298 	if (i<0) {
299 		if (ERRNO_ERROR) {
300 			mfs_errlog_silent(LOG_NOTICE,"replicator: write error");
301 			return -1;
302 		}
303 		return 0;
304 	}
305 	replicator_bytesout(i);
306 //	stats_bytesin+=i;
307 	rs->startptr+=i;
308 	rs->bytesleft-=i;
309 	return 0;
310 }
311 
rep_send_all_packets(replication * r,uint32_t msecto)312 static int rep_send_all_packets(replication *r,uint32_t msecto) {
313 	uint8_t i,l;
314 	uint64_t st;
315 	uint32_t msec;
316 	st = monotonic_useconds();
317 	for (;;) {
318 		l=1;
319 		for (i=0 ; i<r->srccnt ; i++) {
320 			if (r->repsources[i].bytesleft>0) {
321 				r->fds[i].events = POLLOUT;
322 				l=0;
323 			} else {
324 				r->fds[i].events = 0;
325 			}
326 		}
327 		if (l) {	// finished
328 			return 0;
329 		}
330 		msec = (monotonic_useconds()-st)/1000;
331 		if (msec>=msecto) {
332 			syslog(LOG_NOTICE,"replicator: send timed out");
333 			return -1; // timed out
334 		}
335 		if (poll(r->fds,r->srccnt,msecto-msec)<0) {
336 			if (errno!=EINTR && ERRNO_ERROR) {
337 				mfs_errlog_silent(LOG_NOTICE,"replicator: poll error");
338 				return -1;
339 			}
340 			continue;
341 		}
342 		for (i=0 ; i<r->srccnt ; i++) {
343 			if (r->fds[i].revents & POLLHUP) {
344 				syslog(LOG_NOTICE,"replicator: connection lost");
345 				return -1;
346 			}
347 			if (r->fds[i].revents & POLLOUT) {
348 				if (rep_write(r->repsources+i)<0) {
349 					return -1;
350 				}
351 			}
352 		}
353 	}
354 }
355 
rep_wait_for_connection(replication * r,uint32_t msecto)356 static int rep_wait_for_connection(replication *r,uint32_t msecto) {
357 	uint8_t i,l;
358 	uint64_t st;
359 	uint32_t msec;
360 	st = monotonic_useconds();
361 	for (;;) {
362 		l=1;
363 		for (i=0 ; i<r->srccnt ; i++) {
364 			if (r->repsources[i].mode==CONNECTING) {
365 				r->fds[i].events = POLLOUT;
366 				l=0;
367 			} else {
368 				r->fds[i].events = 0;
369 			}
370 		}
371 		if (l) {	// finished
372 			return 0;
373 		}
374 		msec = (monotonic_useconds()-st)/1000;
375 		if (msec>=msecto) {
376 			syslog(LOG_NOTICE,"replicator: connect timed out");
377 			return -1; // timed out
378 		}
379 		if (poll(r->fds,r->srccnt,msecto-msec)<0) {
380 			if (errno!=EINTR && ERRNO_ERROR) {
381 				mfs_errlog_silent(LOG_NOTICE,"replicator: poll error");
382 				return -1;
383 			}
384 			continue;
385 		}
386 		for (i=0 ; i<r->srccnt ; i++) {
387 			if (r->fds[i].revents & POLLHUP) {
388 				syslog(LOG_NOTICE,"replicator: connection lost");
389 				return -1;
390 			}
391 			if (r->fds[i].revents & POLLOUT) {
392 				if (tcpgetstatus(r->repsources[i].sock)<0) {
393 					mfs_errlog_silent(LOG_NOTICE,"replicator: connect error");
394 					return -1;
395 				}
396 				r->repsources[i].mode=IDLE;
397 			}
398 		}
399 	}
400 }
401 
rep_cleanup(replication * r)402 static void rep_cleanup(replication *r) {
403 	int i;
404 	if (r->opened) {
405 		hdd_close(r->chunkid);
406 	}
407 	if (r->created) {
408 		hdd_delete(r->chunkid,0);
409 	}
410 	for (i=0 ; i<r->srccnt ; i++) {
411 		if (r->repsources[i].sock>=0) {
412 			tcpclose(r->repsources[i].sock);
413 		}
414 		if (r->repsources[i].packet) {
415 			free(r->repsources[i].packet);
416 		}
417 	}
418 	if (r->fds) {
419 		free(r->fds);
420 	}
421 	if (r->repsources) {
422 		free(r->repsources);
423 	}
424 	if (r->xorbuff) {
425 		free(r->xorbuff);
426 	}
427 }
428 
429 /* srcs: srccnt * (chunkid:64 version:32 ip:32 port:16) */
replicate(uint64_t chunkid,uint32_t version,const uint32_t xormasks[4],uint8_t srccnt,const uint8_t * srcs)430 uint8_t replicate(uint64_t chunkid,uint32_t version,const uint32_t xormasks[4],uint8_t srccnt,const uint8_t *srcs) {
431 	replication r;
432 	uint8_t status,i,j,vbuffs,first;
433 	uint16_t b,blocks;
434 	uint32_t xcrc[4],crc;
435 	uint32_t codeindex,codeword;
436 	uint8_t *wptr;
437 	const uint8_t *rptr;
438 	int s;
439 
440 	if (srccnt==0) {
441 		return MFS_ERROR_EINVAL;
442 	}
443 
444 //	syslog(LOG_NOTICE,"replication begin (chunkid:%08"PRIX64",version:%04"PRIX32",srccnt:%"PRIu8")",chunkid,version,srccnt);
445 
446 	pthread_mutex_lock(&statslock);
447 	stats_repl++;
448 	pthread_mutex_unlock(&statslock);
449 
450 // init replication structure
451 	r.chunkid = chunkid;
452 	r.version = version;
453 	r.srccnt = 0;
454 	r.created = 0;
455 	r.opened = 0;
456 	r.fds = malloc(sizeof(struct pollfd)*srccnt);
457 	passert(r.fds);
458 	r.repsources = malloc(sizeof(repsrc)*srccnt);
459 	passert(r.repsources);
460 	if (srccnt>1) {
461 		r.xorbuff = malloc(MFSBLOCKSIZE+4);
462 		passert(r.xorbuff);
463 	} else {
464 		r.xorbuff = NULL;
465 	}
466 // create chunk
467 	status = hdd_create(chunkid,0);
468 	if (status!=MFS_STATUS_OK) {
469 		syslog(LOG_NOTICE,"replicator: hdd_create status: %s",mfsstrerr(status));
470 		rep_cleanup(&r);
471 		return status;
472 	}
473 	r.created = 1;
474 // init sources
475 	r.srccnt = srccnt;
476 	for (i=0 ; i<srccnt ; i++) {
477 		r.repsources[i].chunkid = get64bit(&srcs);
478 		r.repsources[i].version = get32bit(&srcs);
479 		r.repsources[i].ip = get32bit(&srcs);
480 		r.repsources[i].port = get16bit(&srcs);
481 		r.repsources[i].sock = -1;
482 		r.repsources[i].packet = NULL;
483 	}
484 // connect
485 	for (i=0 ; i<srccnt ; i++) {
486 		s = tcpsocket();
487 		if (s<0) {
488 			mfs_errlog_silent(LOG_NOTICE,"replicator: socket error");
489 			rep_cleanup(&r);
490 			return MFS_ERROR_CANTCONNECT;
491 		}
492 		r.repsources[i].sock = s;
493 		r.fds[i].fd = s;
494 		if (tcpnonblock(s)<0) {
495 			mfs_errlog_silent(LOG_NOTICE,"replicator: nonblock error");
496 			rep_cleanup(&r);
497 			return MFS_ERROR_CANTCONNECT;
498 		}
499 		s = tcpnumconnect(s,r.repsources[i].ip,r.repsources[i].port);
500 		if (s<0) {
501 			mfs_errlog_silent(LOG_NOTICE,"replicator: connect error");
502 			rep_cleanup(&r);
503 			return MFS_ERROR_CANTCONNECT;
504 		}
505 		if (s==0) {
506 			r.repsources[i].mode = IDLE;
507 		} else {
508 			r.repsources[i].mode = CONNECTING;
509 		}
510 	}
511 	if (rep_wait_for_connection(&r,CONNMSECTO)<0) {
512 		rep_cleanup(&r);
513 		return MFS_ERROR_CANTCONNECT;
514 	}
515 // disable Nagle
516 	for (i=0 ; i<srccnt ; i++) {
517 		tcpnodelay(r.repsources[i].sock);
518 	}
519 // open chunk
520 	status = hdd_open(chunkid,0);
521 	if (status!=MFS_STATUS_OK) {
522 		syslog(LOG_NOTICE,"replicator: hdd_open status: %s",mfsstrerr(status));
523 		rep_cleanup(&r);
524 		return status;
525 	}
526 	r.opened = 1;
527 // get block numbers
528 	for (i=0 ; i<srccnt ; i++) {
529 		wptr = rep_create_packet(r.repsources+i,ANTOCS_GET_CHUNK_BLOCKS,8+4);
530 		if (wptr==NULL) {
531 			syslog(LOG_NOTICE,"replicator: out of memory");
532 			rep_cleanup(&r);
533 			return MFS_ERROR_OUTOFMEMORY;
534 		}
535 		put64bit(&wptr,r.repsources[i].chunkid);
536 		put32bit(&wptr,r.repsources[i].version);
537 	}
538 // send packet
539 	if (rep_send_all_packets(&r,SENDMSECTO)<0) {
540 		rep_cleanup(&r);
541 		return MFS_ERROR_DISCONNECTED;
542 	}
543 // receive answers
544 	for (i=0 ; i<srccnt ; i++) {
545 		r.repsources[i].mode = HEADER;
546 		r.repsources[i].startptr = r.repsources[i].hdrbuff;
547 		r.repsources[i].bytesleft = 8;
548 	}
549 	if (rep_receive_all_packets(&r,RECVMSECTO)<0) {
550 		rep_cleanup(&r);
551 		return MFS_ERROR_DISCONNECTED;
552 	}
553 // get # of blocks
554 	blocks = 0;
555 	for (i=0 ; i<srccnt ; i++) {
556 		uint32_t type,size;
557 		uint64_t pchid;
558 		uint32_t pver;
559 		uint16_t pblocks;
560 		uint8_t pstatus;
561 		uint32_t ip;
562 		rptr = r.repsources[i].hdrbuff;
563 		type = get32bit(&rptr);
564 		size = get32bit(&rptr);
565 		rptr = r.repsources[i].packet;
566 		ip = r.repsources[i].ip;
567 		if (rptr==NULL || type!=CSTOAN_CHUNK_BLOCKS || size!=15) {
568 			syslog(LOG_WARNING,"replicator,get # of blocks: got wrong answer (type:0x%08"PRIX32"/size:0x%08"PRIX32") from (%u.%u.%u.%u:%u)",type,size,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
569 			rep_cleanup(&r);
570 			return MFS_ERROR_DISCONNECTED;
571 		}
572 		pchid = get64bit(&rptr);
573 		pver = get32bit(&rptr);
574 		pblocks = get16bit(&rptr);
575 		pstatus = get8bit(&rptr);
576 		if (pchid!=r.repsources[i].chunkid) {
577 			syslog(LOG_WARNING,"replicator,get # of blocks: got wrong answer (chunk_status:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%u)",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
578 			rep_cleanup(&r);
579 			return MFS_ERROR_WRONGCHUNKID;
580 		}
581 		if (pver!=r.repsources[i].version) {
582 			syslog(LOG_WARNING,"replicator,get # of blocks: got wrong answer (chunk_status:version:%"PRIX32"/%"PRIX32") from (%u.%u.%u.%u:%u)",pver,r.repsources[i].version,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
583 			rep_cleanup(&r);
584 			return MFS_ERROR_WRONGVERSION;
585 		}
586 		if (pstatus!=MFS_STATUS_OK) {
587 			syslog(LOG_NOTICE,"replicator,get # of blocks: got status: %s from (%u.%u.%u.%u:%u)",mfsstrerr(pstatus),(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
588 			rep_cleanup(&r);
589 			return pstatus;
590 		}
591 		r.repsources[i].blocks = pblocks;
592 		if (pblocks>blocks) {
593 			blocks=pblocks;
594 		}
595 	}
596 // create read request
597 	for (i=0 ; i<srccnt ; i++) {
598 		if (r.repsources[i].blocks>0) {
599 			uint32_t leng;
600 			wptr = rep_create_packet(r.repsources+i,CLTOCS_READ,8+4+4+4);
601 			if (wptr==NULL) {
602 				syslog(LOG_NOTICE,"replicator: out of memory");
603 				rep_cleanup(&r);
604 				return MFS_ERROR_OUTOFMEMORY;
605 			}
606 			leng = r.repsources[i].blocks*MFSBLOCKSIZE;
607 			put64bit(&wptr,r.repsources[i].chunkid);
608 			put32bit(&wptr,r.repsources[i].version);
609 			put32bit(&wptr,0);
610 			put32bit(&wptr,leng);
611 		} else {
612 			rep_no_packet(r.repsources+i);
613 		}
614 	}
615 // send read request
616 	if (rep_send_all_packets(&r,SENDMSECTO)<0) {
617 		rep_cleanup(&r);
618 		return MFS_ERROR_DISCONNECTED;
619 	}
620 // receive data and write to hdd
621 	for (b=0 ; b<blocks ; b++) {
622 // prepare receive
623 		for (i=0 ; i<srccnt ; i++) {
624 			if (b<r.repsources[i].blocks) {
625 				r.repsources[i].mode = HEADER;
626 				r.repsources[i].startptr = r.repsources[i].hdrbuff;
627 				r.repsources[i].bytesleft = 8;
628 			} else {
629 				r.repsources[i].mode = IDLE;
630 				r.repsources[i].bytesleft = 0;
631 			}
632 		}
633 // receive data
634 		if (rep_receive_all_packets(&r,RECVMSECTO)<0) {
635 			rep_cleanup(&r);
636 			return MFS_ERROR_DISCONNECTED;
637 		}
638 // check packets
639 		vbuffs = 0;
640 		for (i=0 ; i<srccnt ; i++) {
641 			if (r.repsources[i].mode!=IDLE) {
642 				uint32_t type,size;
643 				uint64_t pchid;
644 				uint16_t pblocknum;
645 				uint16_t poffset;
646 				uint32_t psize;
647 				uint8_t pstatus;
648 				uint32_t ip;
649 				rptr = r.repsources[i].hdrbuff;
650 				type = get32bit(&rptr);
651 				size = get32bit(&rptr);
652 				rptr = r.repsources[i].packet;
653 				ip = r.repsources[i].ip;
654 				if (rptr==NULL) {
655 					rep_cleanup(&r);
656 					return MFS_ERROR_DISCONNECTED;
657 				}
658 				if (type==CSTOCL_READ_STATUS && size==9) {
659 					pchid = get64bit(&rptr);
660 					pstatus = get8bit(&rptr);
661 					if (pchid!=r.repsources[i].chunkid) {
662 						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_status:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%u)",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
663 						rep_cleanup(&r);
664 						return MFS_ERROR_WRONGCHUNKID;
665 					}
666 					if (pstatus==MFS_STATUS_OK) {	// got status too early or got incorrect packet
667 						syslog(LOG_WARNING,"replicator,read chunks: got unexpected ok status from (%u.%u.%u.%u:%u)",(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
668 						rep_cleanup(&r);
669 						return MFS_ERROR_DISCONNECTED;
670 					}
671 					syslog(LOG_NOTICE,"replicator,read chunks: got status: %s from (%u.%u.%u.%u:%u)",mfsstrerr(pstatus),(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
672 					rep_cleanup(&r);
673 					return pstatus;
674 				} else if (type==CSTOCL_READ_DATA && size==20+MFSBLOCKSIZE) {
675 					pchid = get64bit(&rptr);
676 					pblocknum = get16bit(&rptr);
677 					poffset = get16bit(&rptr);
678 					psize = get32bit(&rptr);
679 					if (pchid!=r.repsources[i].chunkid) {
680 						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%u)",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
681 						rep_cleanup(&r);
682 						return MFS_ERROR_WRONGCHUNKID;
683 					}
684 					if (pblocknum!=b) {
685 						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:blocknum:%"PRIu16"/%"PRIu16") from (%u.%u.%u.%u:%u)",pblocknum,b,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
686 						rep_cleanup(&r);
687 						return MFS_ERROR_DISCONNECTED;
688 					}
689 					if (poffset!=0) {
690 						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:offset:%"PRIu16") from (%u.%u.%u.%u:%u)",poffset,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
691 						rep_cleanup(&r);
692 						return MFS_ERROR_WRONGOFFSET;
693 					}
694 					if (psize!=MFSBLOCKSIZE) {
695 						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:size:%"PRIu32") from (%u.%u.%u.%u:%u)",psize,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
696 						rep_cleanup(&r);
697 						return MFS_ERROR_WRONGSIZE;
698 					}
699 				} else {
700 					syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (type:0x%08"PRIX32"/size:0x%08"PRIX32") from (%u.%u.%u.%u:%u)",type,size,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
701 					rep_cleanup(&r);
702 					return MFS_ERROR_DISCONNECTED;
703 				}
704 				vbuffs++;
705 			}
706 		}
707 // write data
708 		if (vbuffs==0) {	// no buffers ? - it should never happen
709 			syslog(LOG_WARNING,"replicator: no data received for block: %"PRIu16,b);
710 			rep_cleanup(&r);
711 			return MFS_ERROR_DISCONNECTED;
712 		} else if (vbuffs==1) { // xor not needed, so just find block and write it
713 			for (i=0 ; i<srccnt ; i++) {
714 				if (r.repsources[i].mode!=IDLE) {
715 					rptr = r.repsources[i].packet;
716 					status = hdd_write(chunkid,0,b,rptr+20,0,MFSBLOCKSIZE,rptr+16);
717 					if (status!=MFS_STATUS_OK) {
718 						syslog(LOG_WARNING,"replicator: write status: %s",mfsstrerr(status));
719 						rep_cleanup(&r);
720 						return status;
721 					}
722 				}
723 			}
724 		} else {
725 			for (i=0 ; i<srccnt ; i++) {
726 				if (r.repsources[i].mode!=IDLE) {
727 					rptr = r.repsources[i].packet;
728 					rptr += 16;
729 					crc = get32bit(&rptr);
730 					for (j=0 ; j<4 ; j++) {
731 						r.repsources[i].crcsums[j] = mycrc32(0,rptr+j*MFSBLOCKSIZE/4,MFSBLOCKSIZE/4);
732 					}
733 					if (crc != mycrc32_combine(mycrc32_combine(r.repsources[i].crcsums[0],r.repsources[i].crcsums[1],MFSBLOCKSIZE/4),mycrc32_combine(r.repsources[i].crcsums[2],r.repsources[i].crcsums[3],MFSBLOCKSIZE/4),MFSBLOCKSIZE/2)) {
734 						uint32_t ip;
735 						ip = r.repsources[i].ip;
736 						syslog(LOG_WARNING,"replicator: received data with wrong checksum from (%u.%u.%u.%u:%u)",(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
737 						rep_cleanup(&r);
738 						return MFS_ERROR_CRC;
739 					}
740 				}
741 			}
742 			crc = mycrc32_zeroblock(0,MFSBLOCKSIZE/4);
743 			for (codeindex=0 ; codeindex<4 ; codeindex++) {
744 				codeword = xormasks[codeindex];
745 				first = 1;
746 				for (i=0 ; i<srccnt ; i++) {
747 					for (j=0 ; j<4 ; j++) {
748 						if (r.repsources[i].mode!=IDLE && (codeword&UINT32_C(0x80000000))) {
749 							rptr = r.repsources[i].packet;
750 							rptr += 16;
751 							if (first) {
752 								memcpy(r.xorbuff+4+codeindex*MFSBLOCKSIZE/4,rptr+4+j*MFSBLOCKSIZE/4,MFSBLOCKSIZE/4);
753 								first = 0;
754 								xcrc[codeindex] = r.repsources[i].crcsums[j];
755 							} else {
756 								xordata(r.xorbuff+4+codeindex*MFSBLOCKSIZE/4,rptr+4+j*MFSBLOCKSIZE/4,MFSBLOCKSIZE/4);
757 								xcrc[codeindex] ^= r.repsources[i].crcsums[j] ^ crc;
758 							}
759 						}
760 						codeword>>=1;
761 					}
762 				}
763 			}
764 			crc = mycrc32_combine(mycrc32_combine(xcrc[0],xcrc[1],MFSBLOCKSIZE/4),mycrc32_combine(xcrc[2],xcrc[3],MFSBLOCKSIZE/4),MFSBLOCKSIZE/2);
765 			wptr = r.xorbuff;
766 			put32bit(&wptr,crc);
767 /*
768 			first=1;
769 			if (vbuffs&1) {
770 				xcrc = 0;
771 			} else {
772 				xcrc = MFSCRCEMPTY; // = mycrc32_zeroblock(0,0x10000);
773 			}
774 			for (i=0 ; i<srccnt ; i++) {
775 				if (r.repsources[i].mode!=IDLE) {
776 					uint32_t ip;
777 					ip = r.repsources[i].ip;
778 					rptr = r.repsources[i].packet;
779 					rptr+=16;	// skip chunkid,blockno,offset and size
780 					if (first) {
781 						memcpy(r.xorbuff+4,rptr+4,MFSBLOCKSIZE);
782 						first=0;
783 					} else {
784 						xordata(r.xorbuff+4,rptr+4,MFSBLOCKSIZE);
785 					}
786 					crc = get32bit(&rptr);
787 					if (crc!=mycrc32(0,rptr,MFSBLOCKSIZE)) {
788 						syslog(LOG_WARNING,"replicator: received data with wrong checksum from (%u.%u.%u.%u:%u)",(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
789 						rep_cleanup(&r);
790 						return MFS_ERROR_CRC;
791 					}
792 					xcrc^=crc;
793 				}
794 			}
795 			wptr = r.xorbuff;
796 			put32bit(&wptr,xcrc);
797 */
798 			status = hdd_write(chunkid,0,b,r.xorbuff+4,0,MFSBLOCKSIZE,r.xorbuff);
799 			if (status!=MFS_STATUS_OK) {
800 				syslog(LOG_WARNING,"replicator: xor write status: %s",mfsstrerr(status));
801 				rep_cleanup(&r);
802 				return status;
803 			}
804 		}
805 	}
806 // receive status
807 	for (i=0 ; i<srccnt ; i++) {
808 		if (r.repsources[i].blocks>0) {
809 //			if (r.repsources[i].packet) {
810 //				free(r.repsources[i].packet);
811 //				r.repsources[i].packet=NULL;
812 //			}
813 			r.repsources[i].mode = HEADER;
814 			r.repsources[i].startptr = r.repsources[i].hdrbuff;
815 			r.repsources[i].bytesleft = 8;
816 		} else {
817 			r.repsources[i].mode = IDLE;
818 			r.repsources[i].bytesleft = 0;
819 		}
820 	}
821 	if (rep_receive_all_packets(&r,RECVMSECTO)<0) {
822 		rep_cleanup(&r);
823 		return MFS_ERROR_DISCONNECTED;
824 	}
825 	for (i=0 ; i<srccnt ; i++) {
826 		if (r.repsources[i].blocks>0) {
827 			uint32_t type,size;
828 			uint64_t pchid;
829 			uint8_t pstatus;
830 			uint32_t ip;
831 			rptr = r.repsources[i].hdrbuff;
832 			type = get32bit(&rptr);
833 			size = get32bit(&rptr);
834 			rptr = r.repsources[i].packet;
835 			ip = r.repsources[i].ip;
836 			if (rptr==NULL || type!=CSTOCL_READ_STATUS || size!=9) {
837 				syslog(LOG_WARNING,"replicator,check status: got wrong answer (type:0x%08"PRIX32"/size:0x%08"PRIX32") from (%u.%u.%u.%u:%u)",type,size,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
838 				rep_cleanup(&r);
839 				return MFS_ERROR_DISCONNECTED;
840 			}
841 			pchid = get64bit(&rptr);
842 			pstatus = get8bit(&rptr);
843 			if (pchid!=r.repsources[i].chunkid) {
844 				syslog(LOG_WARNING,"replicator,check status: got wrong answer (read_status:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%u)",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
845 				rep_cleanup(&r);
846 				return MFS_ERROR_WRONGCHUNKID;
847 			}
848 			if (pstatus!=MFS_STATUS_OK) {
849 				syslog(LOG_NOTICE,"replicator,check status: got status: %s from (%u.%u.%u.%u:%u)",mfsstrerr(pstatus),(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
850 				rep_cleanup(&r);
851 				return pstatus;
852 			}
853 		}
854 	}
855 // close chunk and change version
856 	status = hdd_close(chunkid);
857 	if (status!=MFS_STATUS_OK) {
858 		syslog(LOG_NOTICE,"replicator: hdd_close status: %s",mfsstrerr(status));
859 		rep_cleanup(&r);
860 		return status;
861 	}
862 	r.opened = 0;
863 	status = hdd_version(chunkid,0,version);
864 	if (status!=MFS_STATUS_OK) {
865 		syslog(LOG_NOTICE,"replicator: hdd_version status: %s",mfsstrerr(status));
866 		rep_cleanup(&r);
867 		return status;
868 	}
869 	r.created = 0;
870 	rep_cleanup(&r);
871 	return MFS_STATUS_OK;
872 }
873