1 /*
2 * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/types.h>
28 #include <sys/uio.h>
29 #include <unistd.h>
30 #include <poll.h>
31 #include <sys/time.h>
32 #include <syslog.h>
33 #include <errno.h>
34 #include <inttypes.h>
35 #include <pthread.h>
36
37 #include "MFSCommunication.h"
38 #include "hddspacemgr.h"
39 #include "sockets.h"
40 #include "crc.h"
41 #include "slogger.h"
42 #include "datapack.h"
43 #include "massert.h"
44 #include "mfsstrerr.h"
45 #include "clocks.h"
46
47 #include "replicator.h"
48
// Per-phase time limits in milliseconds; each one is passed as 'msecto' to the
// matching poll loop (rep_wait_for_connection / rep_send_all_packets /
// rep_receive_all_packets), which compares it with elapsed monotonic time.
#define CONNMSECTO 5000
#define SENDMSECTO 5000
#define RECVMSECTO 5000

// Largest packet body rep_read() will accept: one full data block plus the
// 20 bytes that precede it in a CSTOCL_READ_DATA packet
// (chunkid:64 blocknum:16 offset:16 size:32 crc:32).
#define MAX_RECV_PACKET_SIZE (20+MFSBLOCKSIZE)

// State of a single replication source:
//   IDLE       - connected, nothing is being received from it right now
//   CONNECTING - non-blocking connect still in progress
//   HEADER     - receiving the 8-byte packet header into hdrbuff
//   DATA       - header parsed, receiving the packet body into 'packet'
typedef enum {IDLE,CONNECTING,HEADER,DATA} modetype;
56
// One source chunkserver taking part in a replication, together with the
// state of the packet currently being sent to or received from it.
typedef struct _repsrc {
	int sock;		// socket descriptor; replicate() sets -1 until a socket is created
	modetype mode;		// connection / receive state (see modetype)
	uint8_t hdrbuff[8];	// incoming packet header: type:32 size:32
	uint8_t *packet;	// malloc'ed buffer: outgoing packet (header+body) or incoming packet body; NULL if none
	uint8_t *startptr;	// next byte to read into / write from
	uint32_t bytesleft;	// bytes still to transfer for the current packet; 0 means done

	uint64_t chunkid;	// id of the chunk held by this source
	uint32_t version;	// version of that chunk
	uint16_t blocks;	// number of blocks reported by the source (CSTOAN_CHUNK_BLOCKS answer)

	uint32_t ip;		// source address (logged as a dotted quad, most significant byte first)
	uint16_t port;		// source port
} repsrc;
72
// Whole state of one replicate() call; everything in here is released by rep_cleanup().
typedef struct _replication {
	uint64_t chunkid;	// id of the chunk being created locally
	uint32_t version;	// version requested for the new chunk

	uint8_t *xorbuff;	// MFSBLOCKSIZE+4 bytes (crc:32 followed by block data); allocated only when there is more than one source, otherwise NULL

	uint8_t created,opened;	// set once hdd_create / hdd_open succeeded; tell rep_cleanup() to delete / close the chunk
	uint8_t srccnt;		// number of initialised entries in fds[] and repsources[]
	struct pollfd *fds;	// poll descriptors, fds[i] belongs to repsources[i]
	repsrc *repsources;	// per-source state
} replication;
84
// Replication counters accumulated since the last replicator_stats() call;
// every access is guarded by statslock.
static uint32_t stats_repl = 0;		// number of replicate() calls
static uint64_t stats_bytesin = 0;	// bytes read from sources
static uint64_t stats_bytesout = 0;	// bytes written to sources
static pthread_mutex_t statslock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Hand the current counters to the caller and start a new accounting period.
 *   bin  - receives the number of bytes read from sources
 *   bout - receives the number of bytes written to sources
 *   repl - receives the number of replications started
 * All three counters are reset to zero in the same critical section in which
 * they are read.
 */
void replicator_stats(uint64_t *bin,uint64_t *bout,uint32_t *repl) {
	uint64_t in_snapshot,out_snapshot;
	uint32_t repl_snapshot;

	pthread_mutex_lock(&statslock);
	in_snapshot = stats_bytesin;
	stats_bytesin = 0;
	out_snapshot = stats_bytesout;
	stats_bytesout = 0;
	repl_snapshot = stats_repl;
	stats_repl = 0;
	pthread_mutex_unlock(&statslock);

	*bin = in_snapshot;
	*bout = out_snapshot;
	*repl = repl_snapshot;
}
100
replicator_bytesin(uint64_t bytes)101 static inline void replicator_bytesin(uint64_t bytes) {
102 zassert(pthread_mutex_lock(&statslock));
103 stats_bytesin += bytes;
104 zassert(pthread_mutex_unlock(&statslock));
105 }
106
replicator_bytesout(uint64_t bytes)107 static inline void replicator_bytesout(uint64_t bytes) {
108 zassert(pthread_mutex_lock(&statslock));
109 stats_bytesout += bytes;
110 zassert(pthread_mutex_unlock(&statslock));
111 }
112
/*
 * XOR 'leng' bytes of 'src' into 'dst' in place (dst[k] ^= src[k]).
 *
 * Most of the work is done one 32-bit word at a time. Words are moved with
 * memcpy() instead of dereferencing the byte pointers through uint32_t*, so
 * the routine makes no strict-aliasing or alignment assumptions; a fixed-size
 * 4-byte memcpy is compiled to a plain load/store by optimizing compilers.
 * No helper macros are left defined after the function.
 */
static void xordata(uint8_t *dst,const uint8_t *src,uint32_t leng) {
	uint32_t dword,sword;

	// when both pointers are misaligned by the same amount, advance byte by
	// byte to a 4-byte boundary so that the word loop runs on aligned addresses
	if ((((uintptr_t)dst)&3)==(((uintptr_t)src)&3)) {
		while (leng>0 && (((uintptr_t)src)&3)!=0) {
			*dst++ ^= *src++;
			leng--;
		}
	}
	// word-wise body
	while (leng>=4) {
		memcpy(&dword,dst,4);
		memcpy(&sword,src,4);
		dword ^= sword;
		memcpy(dst,&dword,4);
		dst += 4;
		src += 4;
		leng -= 4;
	}
	// remaining 0-3 bytes
	while (leng>0) {
		*dst++ ^= *src++;
		leng--;
	}
}
162
/*
 * Read whatever is currently available for the transfer in progress on 'rs'.
 *
 * In HEADER mode the 8-byte header (type:32 size:32) is collected in
 * rs->hdrbuff. A NOP header (ANTOAN_NOP with size 0) is dropped and the header
 * read is re-armed. Any other header releases the previous packet buffer,
 * allocates 'size' bytes for the body and switches the source to DATA mode;
 * the body is then read in the same call as far as the socket allows.
 *
 * Returns:
 *    0 - no error; the packet is complete when rs->bytesleft has reached 0,
 *        otherwise the caller should call again when the socket is readable
 *   -1 - end of stream, read error, or announced body larger than
 *        MAX_RECV_PACKET_SIZE
 */
static int rep_read(repsrc *rs) {
	ssize_t got;	// read() returns ssize_t
	uint32_t type;
	uint32_t size;
	const uint8_t *ptr;
	while (rs->bytesleft>0) {
		got = read(rs->sock,rs->startptr,rs->bytesleft);
		if (got==0) {
			syslog(LOG_NOTICE,"replicator: connection lost");
			return -1;
		}
		if (got<0) {
			if (ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"replicator: read error");
				return -1;
			}
			// errno not classified as an error by ERRNO_ERROR: try again later
			return 0;
		}
		replicator_bytesin(got);
		rs->startptr += got;
		rs->bytesleft -= got;

		if (rs->bytesleft>0) {
			// short read - wait for the next poll round
			return 0;
		}

		if (rs->mode==HEADER) {
			ptr = rs->hdrbuff;
			type = get32bit(&ptr);
			size = get32bit(&ptr);
			if (type==ANTOAN_NOP && size==0) { // NOP
				rs->startptr = rs->hdrbuff;
				rs->bytesleft = 8;
				return 0;
			}

			// Drop the previous packet and forget the pointer immediately:
			// the "packet too long" path below returns without allocating a
			// new buffer, and rep_cleanup() frees rs->packet once more, so a
			// stale pointer here would be freed twice.
			free(rs->packet);
			rs->packet = NULL;
			if (size>0) {
				if (size>MAX_RECV_PACKET_SIZE) {
					syslog(LOG_WARNING,"replicator: packet too long (%"PRIu32"/%u)",size,MAX_RECV_PACKET_SIZE);
					return -1;
				}
				rs->packet = malloc(size);
				passert(rs->packet);
				rs->startptr = rs->packet;
			}
			rs->bytesleft = size;
			rs->mode = DATA;
		}
	}
	return 0;
}
220
/*
 * Keep polling and reading until no source has bytes left to receive
 * (bytesleft==0 everywhere) or 'msecto' milliseconds have elapsed.
 * Returns 0 when every pending packet is complete, -1 on timeout, poll error,
 * hang-up or read failure.
 */
static int rep_receive_all_packets(replication *r,uint32_t msecto) {
	uint64_t begin;
	uint32_t elapsed;
	uint8_t n,pending;
	struct pollfd *pfd;

	begin = monotonic_useconds();
	for (;;) {
		// ask for readability only on sources that still expect data
		pending = 0;
		for (n=0 ; n<r->srccnt ; n++) {
			pfd = r->fds+n;
			if (r->repsources[n].bytesleft>0) {
				pfd->events = POLLIN;
				pending = 1;
			} else {
				pfd->events = 0;
			}
		}
		if (pending==0) { // finished
			return 0;
		}
		elapsed = (monotonic_useconds()-begin)/1000;
		if (elapsed>=msecto) {
			syslog(LOG_NOTICE,"replicator: receive timed out");
			return -1; // timed out
		}
		if (poll(r->fds,r->srccnt,msecto-elapsed)<0) {
			if (errno!=EINTR && ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"replicator: poll error");
				return -1;
			}
			continue;
		}
		for (n=0 ; n<r->srccnt ; n++) {
			pfd = r->fds+n;
			if (pfd->revents & POLLHUP) {
				syslog(LOG_NOTICE,"replicator: connection lost");
				return -1;
			}
			if ((pfd->revents & POLLIN) && rep_read(r->repsources+n)<0) {
				return -1;
			}
		}
	}
}
264
/*
 * Replace the packet buffer of 'rs' with a fresh outgoing packet: an 8-byte
 * header (type:32 size:32) followed by room for 'size' body bytes, and queue
 * all 8+size bytes for sending.
 * Returns a pointer just past the header, where the caller writes the body.
 */
static uint8_t* rep_create_packet(repsrc *rs,uint32_t type,uint32_t size) {
	uint8_t *cursor;

	free(rs->packet);	// free(NULL) is a no-op
	rs->packet = malloc(size+8);
	passert(rs->packet);

	rs->startptr = rs->packet;
	rs->bytesleft = 8+size;

	cursor = rs->packet;
	put32bit(&cursor,type);
	put32bit(&cursor,size);
	return cursor;
}
279
// Release the packet buffer of 'rs' and mark the source as having nothing to transfer.
static void rep_no_packet(repsrc *rs) {
	free(rs->packet);	// free(NULL) is a no-op
	rs->bytesleft = 0;
	rs->startptr = NULL;
	rs->packet = NULL;
}
288
/*
 * Make one write() attempt for the bytes still queued on 'rs' and advance the
 * send position by however many bytes were accepted.
 * Returns 0 on progress or when errno is not classified as an error by
 * ERRNO_ERROR (the caller retries), -1 when write() returned 0 or failed.
 */
static int rep_write(repsrc *rs) {
	int sent = write(rs->sock,rs->startptr,rs->bytesleft);

	if (sent>0) {
		replicator_bytesout(sent);
		rs->bytesleft -= sent;
		rs->startptr += sent;
		return 0;
	}
	if (sent==0) {
		syslog(LOG_NOTICE,"replicator: connection lost");
		return -1;
	}
	if (ERRNO_ERROR) {
		mfs_errlog_silent(LOG_NOTICE,"replicator: write error");
		return -1;
	}
	return 0;
}
309
/*
 * Keep polling and writing until every source has sent its queued packet
 * (bytesleft==0 everywhere) or 'msecto' milliseconds have elapsed.
 * Returns 0 when everything was sent, -1 on timeout, poll error, hang-up or
 * write failure.
 */
static int rep_send_all_packets(replication *r,uint32_t msecto) {
	uint64_t begin;
	uint32_t elapsed;
	uint8_t n,pending;
	struct pollfd *pfd;

	begin = monotonic_useconds();
	for (;;) {
		// ask for writability only on sources that still have bytes queued
		pending = 0;
		for (n=0 ; n<r->srccnt ; n++) {
			pfd = r->fds+n;
			if (r->repsources[n].bytesleft>0) {
				pfd->events = POLLOUT;
				pending = 1;
			} else {
				pfd->events = 0;
			}
		}
		if (pending==0) { // finished
			return 0;
		}
		elapsed = (monotonic_useconds()-begin)/1000;
		if (elapsed>=msecto) {
			syslog(LOG_NOTICE,"replicator: send timed out");
			return -1; // timed out
		}
		if (poll(r->fds,r->srccnt,msecto-elapsed)<0) {
			if (errno!=EINTR && ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"replicator: poll error");
				return -1;
			}
			continue;
		}
		for (n=0 ; n<r->srccnt ; n++) {
			pfd = r->fds+n;
			if (pfd->revents & POLLHUP) {
				syslog(LOG_NOTICE,"replicator: connection lost");
				return -1;
			}
			if ((pfd->revents & POLLOUT) && rep_write(r->repsources+n)<0) {
				return -1;
			}
		}
	}
}
353
/*
 * Wait until no source is left in CONNECTING mode or 'msecto' milliseconds
 * have elapsed. A source whose socket becomes writable is checked with
 * tcpgetstatus() and, on success, moved to IDLE.
 * Returns 0 when all connects completed, -1 on timeout, poll error, hang-up
 * or a failed connect.
 */
static int rep_wait_for_connection(replication *r,uint32_t msecto) {
	uint64_t begin;
	uint32_t elapsed;
	uint8_t n,waiting;
	repsrc *rs;

	begin = monotonic_useconds();
	for (;;) {
		// watch only the sockets whose connect is still in progress
		waiting = 0;
		for (n=0 ; n<r->srccnt ; n++) {
			if (r->repsources[n].mode!=CONNECTING) {
				r->fds[n].events = 0;
				continue;
			}
			r->fds[n].events = POLLOUT;
			waiting = 1;
		}
		if (waiting==0) { // finished
			return 0;
		}
		elapsed = (monotonic_useconds()-begin)/1000;
		if (elapsed>=msecto) {
			syslog(LOG_NOTICE,"replicator: connect timed out");
			return -1; // timed out
		}
		if (poll(r->fds,r->srccnt,msecto-elapsed)<0) {
			if (errno!=EINTR && ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"replicator: poll error");
				return -1;
			}
			continue;
		}
		for (n=0 ; n<r->srccnt ; n++) {
			if (r->fds[n].revents & POLLHUP) {
				syslog(LOG_NOTICE,"replicator: connection lost");
				return -1;
			}
			if (r->fds[n].revents & POLLOUT) {
				rs = r->repsources+n;
				if (tcpgetstatus(rs->sock)<0) {
					mfs_errlog_silent(LOG_NOTICE,"replicator: connect error");
					return -1;
				}
				rs->mode = IDLE;
			}
		}
	}
}
399
/*
 * Release everything held by a replication: close the chunk if it is still
 * marked open, delete it if it is still marked created, close all source
 * sockets and free every buffer. Only the first r->srccnt sources are visited.
 */
static void rep_cleanup(replication *r) {
	int n;
	repsrc *rs;

	if (r->opened) {
		hdd_close(r->chunkid);
	}
	if (r->created) {
		hdd_delete(r->chunkid,0);
	}
	for (n=0 ; n<r->srccnt ; n++) {
		rs = &(r->repsources[n]);
		if (rs->sock>=0) {
			tcpclose(rs->sock);
		}
		free(rs->packet);	// free(NULL) is a no-op
	}
	free(r->fds);
	free(r->repsources);
	free(r->xorbuff);
}
426
/*
 * Create local chunk 'chunkid' from the data held by 'srccnt' sources.
 *
 * Parameters:
 *   chunkid - id of the chunk to create locally
 *   version - version stamped on the new chunk once all data has been written
 *   srccnt  - number of sources described by 'srcs'
 *   srcs    - packed source list: srccnt * (chunkid:64 version:32 ip:32 port:16)
 *
 * Steps: create the chunk (version 0), connect to every source without
 * blocking, open the chunk, ask each source for its block count, request all
 * of its blocks, then receive the data block by block. A block delivered by a
 * single source is written as received; a block delivered by several sources
 * is XOR-ed together (data and checksum) before being written. Finally the
 * closing read status of every source is checked, the chunk is closed and its
 * version is set.
 *
 * Returns STATUS_OK on success. On failure returns ERROR_EINVAL (srccnt==0),
 * a status from hdd_*(), a status sent by a source, or one of
 * ERROR_CANTCONNECT / ERROR_DISCONNECTED / ERROR_WRONGCHUNKID /
 * ERROR_WRONGVERSION / ERROR_WRONGOFFSET / ERROR_WRONGSIZE / ERROR_CRC /
 * ERROR_OUTOFMEMORY. Every failure path after the initial argument check goes
 * through rep_cleanup(), which deletes the chunk while r.created is set.
 */
uint8_t replicate(uint64_t chunkid,uint32_t version,uint8_t srccnt,const uint8_t *srcs) {
	replication r;
	uint8_t status,i,vbuffs,first;
	uint16_t b,blocks;
	uint32_t xcrc,crc;
	uint8_t *wptr;
	const uint8_t *rptr;
	int s;

	if (srccnt==0) {
		return ERROR_EINVAL;
	}

//	syslog(LOG_NOTICE,"replication begin (chunkid:%08"PRIX64",version:%04"PRIX32",srccnt:%"PRIu8")",chunkid,version,srccnt);

	// count this replication attempt (reported by replicator_stats)
	pthread_mutex_lock(&statslock);
	stats_repl++;
	pthread_mutex_unlock(&statslock);

// init replication structure
	r.chunkid = chunkid;
	r.version = version;
	r.srccnt = 0;	// stays 0 until repsources[] is initialised, so an early rep_cleanup() skips the array
	r.created = 0;
	r.opened = 0;
	r.fds = malloc(sizeof(struct pollfd)*srccnt);
	passert(r.fds);
	r.repsources = malloc(sizeof(repsrc)*srccnt);
	passert(r.repsources);
	if (srccnt>1) {
		// scratch buffer for combining blocks: crc:32 followed by one block of data
		r.xorbuff = malloc(MFSBLOCKSIZE+4);
		passert(r.xorbuff);
	} else {
		r.xorbuff = NULL;
	}
// create chunk
	status = hdd_create(chunkid,0);
	if (status!=STATUS_OK) {
		syslog(LOG_NOTICE,"replicator: hdd_create status: %s",mfsstrerr(status));
		rep_cleanup(&r);
		return status;
	}
	r.created = 1;	// from now on rep_cleanup() deletes the chunk
// init sources
	r.srccnt = srccnt;
	for (i=0 ; i<srccnt ; i++) {
		r.repsources[i].chunkid = get64bit(&srcs);
		r.repsources[i].version = get32bit(&srcs);
		r.repsources[i].ip = get32bit(&srcs);
		r.repsources[i].port = get16bit(&srcs);
		r.repsources[i].sock = -1;
		r.repsources[i].packet = NULL;
	}
// connect
	for (i=0 ; i<srccnt ; i++) {
		s = tcpsocket();
		if (s<0) {
			mfs_errlog_silent(LOG_NOTICE,"replicator: socket error");
			rep_cleanup(&r);
			return ERROR_CANTCONNECT;
		}
		r.repsources[i].sock = s;
		r.fds[i].fd = s;
		if (tcpnonblock(s)<0) {
			mfs_errlog_silent(LOG_NOTICE,"replicator: nonblock error");
			rep_cleanup(&r);
			return ERROR_CANTCONNECT;
		}
		// 's' is reused for the connect result from here on
		s = tcpnumconnect(s,r.repsources[i].ip,r.repsources[i].port);
		if (s<0) {
			mfs_errlog_silent(LOG_NOTICE,"replicator: connect error");
			rep_cleanup(&r);
			return ERROR_CANTCONNECT;
		}
		if (s==0) {
			// connected immediately
			r.repsources[i].mode = IDLE;
		} else {
			// connect in progress; finished in rep_wait_for_connection()
			r.repsources[i].mode = CONNECTING;
		}
	}
	if (rep_wait_for_connection(&r,CONNMSECTO)<0) {
		rep_cleanup(&r);
		return ERROR_CANTCONNECT;
	}
// open chunk
	status = hdd_open(chunkid,0);
	if (status!=STATUS_OK) {
		syslog(LOG_NOTICE,"replicator: hdd_open status: %s",mfsstrerr(status));
		rep_cleanup(&r);
		return status;
	}
	r.opened = 1;	// from now on rep_cleanup() closes the chunk
// get block numbers
	for (i=0 ; i<srccnt ; i++) {
		wptr = rep_create_packet(r.repsources+i,ANTOCS_GET_CHUNK_BLOCKS,8+4);
		// NOTE(review): rep_create_packet() already passert()s its allocation,
		// so this branch is presumably unreachable - confirm passert semantics
		if (wptr==NULL) {
			syslog(LOG_NOTICE,"replicator: out of memory");
			rep_cleanup(&r);
			return ERROR_OUTOFMEMORY;
		}
		put64bit(&wptr,r.repsources[i].chunkid);
		put32bit(&wptr,r.repsources[i].version);
	}
// send packet
	if (rep_send_all_packets(&r,SENDMSECTO)<0) {
		rep_cleanup(&r);
		return ERROR_DISCONNECTED;
	}
// receive answers
	for (i=0 ; i<srccnt ; i++) {
		r.repsources[i].mode = HEADER;
		r.repsources[i].startptr = r.repsources[i].hdrbuff;
		r.repsources[i].bytesleft = 8;
	}
	if (rep_receive_all_packets(&r,RECVMSECTO)<0) {
		rep_cleanup(&r);
		return ERROR_DISCONNECTED;
	}
// get # of blocks
	blocks = 0;	// becomes the largest block count reported by any source
	for (i=0 ; i<srccnt ; i++) {
		uint32_t type,size;
		uint64_t pchid;
		uint32_t pver;
		uint16_t pblocks;
		uint8_t pstatus;
		uint32_t ip;
		rptr = r.repsources[i].hdrbuff;
		type = get32bit(&rptr);
		size = get32bit(&rptr);
		rptr = r.repsources[i].packet;
		ip = r.repsources[i].ip;
		// expected body: chunkid:64 version:32 blocks:16 status:8 = 15 bytes
		if (rptr==NULL || type!=CSTOAN_CHUNK_BLOCKS || size!=15) {
			syslog(LOG_WARNING,"replicator,get # of blocks: got wrong answer (type:0x%08"PRIX32"/size:0x%08"PRIX32") from (%u.%u.%u.%u:%04"PRIX16")",type,size,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
			rep_cleanup(&r);
			return ERROR_DISCONNECTED;
		}
		pchid = get64bit(&rptr);
		pver = get32bit(&rptr);
		pblocks = get16bit(&rptr);
		pstatus = get8bit(&rptr);
		if (pchid!=r.repsources[i].chunkid) {
			syslog(LOG_WARNING,"replicator,get # of blocks: got wrong answer (chunk_status:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%04"PRIX16")",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
			rep_cleanup(&r);
			return ERROR_WRONGCHUNKID;
		}
		if (pver!=r.repsources[i].version) {
			syslog(LOG_WARNING,"replicator,get # of blocks: got wrong answer (chunk_status:version:%"PRIX32"/%"PRIX32") from (%u.%u.%u.%u:%04"PRIX16")",pver,r.repsources[i].version,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
			rep_cleanup(&r);
			return ERROR_WRONGVERSION;
		}
		if (pstatus!=STATUS_OK) {
			syslog(LOG_NOTICE,"replicator,get # of blocks: got status: %s from (%u.%u.%u.%u:%04"PRIX16")",mfsstrerr(pstatus),(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
			rep_cleanup(&r);
			return pstatus;
		}
		r.repsources[i].blocks = pblocks;
		if (pblocks>blocks) {
			blocks=pblocks;
		}
	}
// create read request
	for (i=0 ; i<srccnt ; i++) {
		if (r.repsources[i].blocks>0) {
			uint32_t leng;
			wptr = rep_create_packet(r.repsources+i,CLTOCS_READ,8+4+4+4);
			// NOTE(review): presumably unreachable, see the same check above
			if (wptr==NULL) {
				syslog(LOG_NOTICE,"replicator: out of memory");
				rep_cleanup(&r);
				return ERROR_OUTOFMEMORY;
			}
			// ask for everything the source has: offset 0, blocks*MFSBLOCKSIZE bytes
			leng = r.repsources[i].blocks*MFSBLOCKSIZE;
			put64bit(&wptr,r.repsources[i].chunkid);
			put32bit(&wptr,r.repsources[i].version);
			put32bit(&wptr,0);
			put32bit(&wptr,leng);
		} else {
			// source reported no blocks - nothing to request from it
			rep_no_packet(r.repsources+i);
		}
	}
// send read request
	if (rep_send_all_packets(&r,SENDMSECTO)<0) {
		rep_cleanup(&r);
		return ERROR_DISCONNECTED;
	}
// receive data and write to hdd
	for (b=0 ; b<blocks ; b++) {
// prepare receive
		for (i=0 ; i<srccnt ; i++) {
			if (b<r.repsources[i].blocks) {
				r.repsources[i].mode = HEADER;
				r.repsources[i].startptr = r.repsources[i].hdrbuff;
				r.repsources[i].bytesleft = 8;
			} else {
				// this source has fewer blocks; IDLE marks it as not contributing to block b
				r.repsources[i].mode = IDLE;
				r.repsources[i].bytesleft = 0;
			}
		}
// receive data
		if (rep_receive_all_packets(&r,RECVMSECTO)<0) {
			rep_cleanup(&r);
			return ERROR_DISCONNECTED;
		}
// check packets
		vbuffs = 0;	// number of sources that delivered a valid data packet for block b
		for (i=0 ; i<srccnt ; i++) {
			if (r.repsources[i].mode!=IDLE) {
				uint32_t type,size;
				uint64_t pchid;
				uint16_t pblocknum;
				uint16_t poffset;
				uint32_t psize;
				uint8_t pstatus;
				uint32_t ip;
				rptr = r.repsources[i].hdrbuff;
				type = get32bit(&rptr);
				size = get32bit(&rptr);
				rptr = r.repsources[i].packet;
				ip = r.repsources[i].ip;
				if (rptr==NULL) {
					rep_cleanup(&r);
					return ERROR_DISCONNECTED;
				}
				if (type==CSTOCL_READ_STATUS && size==9) {
					// a status packet in place of data: any status here aborts the replication
					pchid = get64bit(&rptr);
					pstatus = get8bit(&rptr);
					if (pchid!=r.repsources[i].chunkid) {
						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_status:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%04"PRIX16")",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
						rep_cleanup(&r);
						return ERROR_WRONGCHUNKID;
					}
					if (pstatus==STATUS_OK) {	// got status too early or got incorrect packet
						syslog(LOG_WARNING,"replicator,read chunks: got unexpected ok status from (%u.%u.%u.%u:%04"PRIX16")",(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
						rep_cleanup(&r);
						return ERROR_DISCONNECTED;
					}
					syslog(LOG_NOTICE,"replicator,read chunks: got status: %s from (%u.%u.%u.%u:%04"PRIX16")",mfsstrerr(pstatus),(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
					rep_cleanup(&r);
					return pstatus;
				} else if (type==CSTOCL_READ_DATA && size==20+MFSBLOCKSIZE) {
					// body: chunkid:64 blocknum:16 offset:16 size:32 crc:32 data[MFSBLOCKSIZE]
					pchid = get64bit(&rptr);
					pblocknum = get16bit(&rptr);
					poffset = get16bit(&rptr);
					psize = get32bit(&rptr);
					if (pchid!=r.repsources[i].chunkid) {
						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%04"PRIX16")",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
						rep_cleanup(&r);
						return ERROR_WRONGCHUNKID;
					}
					if (pblocknum!=b) {
						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:blocknum:%"PRIu16"/%"PRIu16") from (%u.%u.%u.%u:%04"PRIX16")",pblocknum,b,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
						rep_cleanup(&r);
						return ERROR_DISCONNECTED;
					}
					if (poffset!=0) {
						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:offset:%"PRIu16") from (%u.%u.%u.%u:%04"PRIX16")",poffset,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
						rep_cleanup(&r);
						return ERROR_WRONGOFFSET;
					}
					if (psize!=MFSBLOCKSIZE) {
						syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:size:%"PRIu32") from (%u.%u.%u.%u:%04"PRIX16")",psize,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
						rep_cleanup(&r);
						return ERROR_WRONGSIZE;
					}
				} else {
					syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (type:0x%08"PRIX32"/size:0x%08"PRIX32") from (%u.%u.%u.%u:%04"PRIX16")",type,size,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
					rep_cleanup(&r);
					return ERROR_DISCONNECTED;
				}
				vbuffs++;
			}
		}
// write data
		if (vbuffs==0) {	// no buffers ? - it should never happen
			syslog(LOG_WARNING,"replicator: no data received for block: %"PRIu16,b);
			rep_cleanup(&r);
			return ERROR_DISCONNECTED;
		} else if (vbuffs==1) { // xor not needed, so just find block and write it
			for (i=0 ; i<srccnt ; i++) {
				if (r.repsources[i].mode!=IDLE) {
					rptr = r.repsources[i].packet;
					// data starts at offset 20 of the body, its crc at offset 16
					status = hdd_write(chunkid,0,b,rptr+20,0,MFSBLOCKSIZE,rptr+16);
					if (status!=STATUS_OK) {
						syslog(LOG_WARNING,"replicator: write status: %s",mfsstrerr(status));
						rep_cleanup(&r);
						return status;
					}
				}
			}
		} else {
			first=1;
			// Seed for the combined checksum: 0 for an odd number of buffers,
			// MFSCRCEMPTY for an even number.
			// NOTE(review): presumably relies on crc(a^b) = crc(a)^crc(b)^crc(zero block),
			// so the zero-block term survives only for an even count - confirm.
			if (vbuffs&1) {
				xcrc = 0;
			} else {
				xcrc = MFSCRCEMPTY; // = mycrc32_zeroblock(0,0x10000);
			}
			for (i=0 ; i<srccnt ; i++) {
				if (r.repsources[i].mode!=IDLE) {
					uint32_t ip;
					ip = r.repsources[i].ip;
					rptr = r.repsources[i].packet;
					rptr+=16; // skip chunkid,blockno,offset and size
					// rptr now points at crc:32; the block data follows at rptr+4
					if (first) {
						memcpy(r.xorbuff+4,rptr+4,MFSBLOCKSIZE);
						first=0;
					} else {
						xordata(r.xorbuff+4,rptr+4,MFSBLOCKSIZE);
					}
					crc = get32bit(&rptr);	// advances rptr to the block data
					// each contribution is verified against its own checksum
					if (crc!=mycrc32(0,rptr,MFSBLOCKSIZE)) {
						syslog(LOG_WARNING,"replicator: received data with wrong checksum from (%u.%u.%u.%u:%04"PRIX16")",(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
						rep_cleanup(&r);
						return ERROR_CRC;
					}
					xcrc^=crc;
				}
			}
			// store the combined crc in front of the combined data and write both
			wptr = r.xorbuff;
			put32bit(&wptr,xcrc);
			status = hdd_write(chunkid,0,b,r.xorbuff+4,0,MFSBLOCKSIZE,r.xorbuff);
			if (status!=STATUS_OK) {
				syslog(LOG_WARNING,"replicator: xor write status: %s",mfsstrerr(status));
				rep_cleanup(&r);
				return status;
			}
		}
	}
// receive status
	// only sources that were sent a read request (blocks>0) owe a final status
	for (i=0 ; i<srccnt ; i++) {
		if (r.repsources[i].blocks>0) {
//			if (r.repsources[i].packet) {
//				free(r.repsources[i].packet);
//				r.repsources[i].packet=NULL;
//			}
			r.repsources[i].mode = HEADER;
			r.repsources[i].startptr = r.repsources[i].hdrbuff;
			r.repsources[i].bytesleft = 8;
		} else {
			r.repsources[i].mode = IDLE;
			r.repsources[i].bytesleft = 0;
		}
	}
	if (rep_receive_all_packets(&r,RECVMSECTO)<0) {
		rep_cleanup(&r);
		return ERROR_DISCONNECTED;
	}
	for (i=0 ; i<srccnt ; i++) {
		if (r.repsources[i].blocks>0) {
			uint32_t type,size;
			uint64_t pchid;
			uint8_t pstatus;
			uint32_t ip;
			rptr = r.repsources[i].hdrbuff;
			type = get32bit(&rptr);
			size = get32bit(&rptr);
			rptr = r.repsources[i].packet;
			ip = r.repsources[i].ip;
			// expected body: chunkid:64 status:8 = 9 bytes
			if (rptr==NULL || type!=CSTOCL_READ_STATUS || size!=9) {
				syslog(LOG_WARNING,"replicator,check status: got wrong answer (type:0x%08"PRIX32"/size:0x%08"PRIX32") from (%u.%u.%u.%u:%04"PRIX16")",type,size,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
				rep_cleanup(&r);
				return ERROR_DISCONNECTED;
			}
			pchid = get64bit(&rptr);
			pstatus = get8bit(&rptr);
			if (pchid!=r.repsources[i].chunkid) {
				syslog(LOG_WARNING,"replicator,check status: got wrong answer (read_status:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%04"PRIX16")",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
				rep_cleanup(&r);
				return ERROR_WRONGCHUNKID;
			}
			if (pstatus!=STATUS_OK) {
				syslog(LOG_NOTICE,"replicator,check status: got status: %s from (%u.%u.%u.%u:%04"PRIX16")",mfsstrerr(pstatus),(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
				rep_cleanup(&r);
				return pstatus;
			}
		}
	}
// close chunk and change version
	status = hdd_close(chunkid);
	if (status!=STATUS_OK) {
		syslog(LOG_NOTICE,"replicator: hdd_close status: %s",mfsstrerr(status));
		// NOTE(review): r.opened is still 1 here, so rep_cleanup() calls hdd_close() a second time - confirm this is intended
		rep_cleanup(&r);
		return status;
	}
	r.opened = 0;
	// the chunk was created as version 0; it gets the requested version only now
	status = hdd_version(chunkid,0,version);
	if (status!=STATUS_OK) {
		syslog(LOG_NOTICE,"replicator: hdd_version status: %s",mfsstrerr(status));
		rep_cleanup(&r);
		return status;
	}
	// success: clear the flag so that rep_cleanup() keeps the chunk instead of deleting it
	r.created = 0;
	rep_cleanup(&r);
	return STATUS_OK;
}
822