1 /*
2 * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/types.h>
28 #include <sys/uio.h>
29 #include <unistd.h>
30 #include <poll.h>
31 #include <sys/time.h>
32 #include <syslog.h>
33 #include <errno.h>
34 #include <inttypes.h>
35 #include <pthread.h>
36
37 #include "MFSCommunication.h"
38 #include "hddspacemgr.h"
39 #include "sockets.h"
40 #include "crc.h"
41 #include "slogger.h"
42 #include "datapack.h"
43 #include "massert.h"
44 #include "mfsstrerr.h"
45 #include "clocks.h"
46
47 #include "replicator.h"
48
/* Time limits (milliseconds) handed to the connect / send / receive wait loops below. */
#define CONNMSECTO 5000
#define SENDMSECTO 5000
#define RECVMSECTO 5000

/* Largest packet payload rep_read() accepts: one READ_DATA packet (20 bytes of fields + one block). */
#define MAX_RECV_PACKET_SIZE (20+MFSBLOCKSIZE)

/* State of a single source connection. */
typedef enum {
	IDLE,
	CONNECTING,
	HEADER,
	DATA
} modetype;

/* One replication source: its socket plus the state of the packet currently being sent or received. */
typedef struct _repsrc {
	int sock;            /* socket descriptor, -1 when not opened */
	modetype mode;       /* current connection state */
	uint8_t hdrbuff[8];  /* raw header of the packet being received (type:32 size:32) */
	uint8_t *packet;     /* heap buffer with the current packet, or NULL */
	uint8_t *startptr;   /* next byte to read into / write from */
	uint32_t bytesleft;  /* bytes still to be transferred */

	uint64_t chunkid;    /* chunk id on the source */
	uint32_t version;    /* chunk version on the source */
	uint16_t blocks;     /* number of blocks reported by the source */

	uint32_t ip;         /* source address */
	uint16_t port;       /* source port */

	uint32_t crcsums[4]; /* CRC of each quarter of the most recently received block */
} repsrc;

/* Whole replication job; released by rep_cleanup(). */
typedef struct _replication {
	uint64_t chunkid;    /* id of the local chunk being written */
	uint32_t version;    /* version given to the local chunk on success */

	uint8_t *xorbuff;    /* 4 bytes of CRC followed by one block; NULL when there is a single source */

	uint8_t created;     /* non-zero: rep_cleanup() deletes the local chunk */
	uint8_t opened;      /* non-zero: rep_cleanup() closes the local chunk */
	uint8_t srccnt;      /* number of valid entries in fds / repsources */
	struct pollfd *fds;  /* poll descriptors, parallel to repsources */
	repsrc *repsources;  /* per-source state */
} replication;

/* Counters accumulated since the last replicator_stats() call; guarded by statslock. */
static uint32_t stats_repl = 0;
static uint64_t stats_bytesin = 0;
static uint64_t stats_bytesout = 0;
static pthread_mutex_t statslock = PTHREAD_MUTEX_INITIALIZER;
91
replicator_stats(uint64_t * bin,uint64_t * bout,uint32_t * repl)92 void replicator_stats(uint64_t *bin,uint64_t *bout,uint32_t *repl) {
93 pthread_mutex_lock(&statslock);
94 *bin = stats_bytesin;
95 *bout = stats_bytesout;
96 *repl = stats_repl;
97 stats_repl = 0;
98 stats_bytesin = 0;
99 stats_bytesout = 0;
100 pthread_mutex_unlock(&statslock);
101 }
102
replicator_bytesin(uint64_t bytes)103 static inline void replicator_bytesin(uint64_t bytes) {
104 zassert(pthread_mutex_lock(&statslock));
105 stats_bytesin += bytes;
106 zassert(pthread_mutex_unlock(&statslock));
107 }
108
replicator_bytesout(uint64_t bytes)109 static inline void replicator_bytesout(uint64_t bytes) {
110 zassert(pthread_mutex_lock(&statslock));
111 stats_bytesout += bytes;
112 zassert(pthread_mutex_unlock(&statslock));
113 }
114
/*
 * XOR 'leng' bytes of 'src' into 'dst' in place: dst[k] ^= src[k] for k in [0,leng).
 *
 * The bulk of the data is processed eight bytes at a time. Words are moved with
 * memcpy() instead of being read through casted pointers, so the routine makes
 * no assumption about the alignment of either buffer and does not access byte
 * buffers through an incompatible pointer type. XOR is bytewise, so the result
 * does not depend on host byte order.
 *
 * 'dst' and 'src' must each be valid for 'leng' bytes; leng == 0 is a no-op.
 */
static void xordata(uint8_t *dst,const uint8_t *src,uint32_t leng) {
	uint64_t dword,sword;

	while (leng>=sizeof(dword)) {
		memcpy(&dword,dst,sizeof(dword));
		memcpy(&sword,src,sizeof(sword));
		dword ^= sword;
		memcpy(dst,&dword,sizeof(dword));
		dst += sizeof(dword);
		src += sizeof(sword);
		leng -= sizeof(dword);
	}
	// tail shorter than one word
	while (leng>0) {
		*dst++ ^= *src++;
		leng--;
	}
}
164
/*
 * Read as much of the current packet from rs->sock as is available.
 *
 * The caller arms the source with mode=HEADER, startptr=hdrbuff, bytesleft=8.
 * Once the 8-byte header (type:32 size:32) is complete:
 *   - an ANTOAN_NOP header with size 0 is skipped and the header is re-armed,
 *   - otherwise the previous packet buffer is released, a buffer of 'size'
 *     bytes is allocated (none when size is 0) and the source switches to DATA.
 * The packet is complete when rs->bytesleft reaches 0 in DATA mode.
 *
 * Returns 0 on success or when no more data can be read right now (the error
 * reported by read() is classified by ERRNO_ERROR, defined outside this file),
 * -1 on connection loss, read error or oversized packet.
 */
static int rep_read(repsrc *rs) {
	int32_t i;
	uint32_t type;
	uint32_t size;
	const uint8_t *ptr;

	while (rs->bytesleft>0) {
		i = read(rs->sock,rs->startptr,rs->bytesleft);
		if (i==0) {
			syslog(LOG_NOTICE,"replicator: connection lost");
			return -1;
		}
		if (i<0) {
			if (ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"replicator: read error");
				return -1;
			}
			return 0;
		}
		replicator_bytesin(i);
		rs->startptr += i;
		rs->bytesleft -= i;

		if (rs->bytesleft>0) {
			return 0;
		}

		if (rs->mode==HEADER) {
			ptr = rs->hdrbuff;
			type = get32bit(&ptr);
			size = get32bit(&ptr);
			if (type==ANTOAN_NOP && size==0) { // NOP - wait for the next header
				rs->startptr = rs->hdrbuff;
				rs->bytesleft = 8;
				return 0;
			}

			// Drop the previous packet. The pointer is cleared right away so that
			// the error return below cannot leave a dangling pointer behind for
			// rep_cleanup() to free a second time.
			free(rs->packet);
			rs->packet = NULL;
			if (size>0) {
				if (size>MAX_RECV_PACKET_SIZE) {
					syslog(LOG_WARNING,"replicator: packet too long (%"PRIu32"/%u) ; command:%"PRIu32,size,MAX_RECV_PACKET_SIZE,type);
					return -1;
				}
				rs->packet = malloc(size);
				passert(rs->packet);
				rs->startptr = rs->packet;
			}
			rs->bytesleft = size;
			rs->mode = DATA;
		}
	}
	return 0;
}
222
/*
 * Drive rep_read() on every source that still has bytes to receive until all
 * of them are done.
 *   r      - replication job (fds[] and repsources[] are parallel arrays)
 *   msecto - overall time limit in milliseconds, measured from entry
 * Returns 0 when no source has bytes left, -1 on timeout, poll error,
 * hang-up reported by poll, or rep_read() failure.
 */
static int rep_receive_all_packets(replication *r,uint32_t msecto) {
	uint64_t started;
	uint32_t elapsed;
	uint8_t idx,pending;

	started = monotonic_useconds();
	for (;;) {
		// watch only the sources that still expect data
		pending = 0;
		for (idx=0 ; idx<r->srccnt ; idx++) {
			if (r->repsources[idx].bytesleft>0) {
				pending = 1;
				r->fds[idx].events = POLLIN;
			} else {
				r->fds[idx].events = 0;
			}
		}
		if (!pending) { // finished
			return 0;
		}

		elapsed = (monotonic_useconds()-started)/1000;
		if (elapsed>=msecto) {
			syslog(LOG_NOTICE,"replicator: receive timed out");
			return -1; // timed out
		}

		if (poll(r->fds,r->srccnt,msecto-elapsed)<0) {
			if (errno!=EINTR && ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"replicator: poll error");
				return -1;
			}
			continue;
		}

		for (idx=0 ; idx<r->srccnt ; idx++) {
			if (r->fds[idx].revents & POLLHUP) {
				syslog(LOG_NOTICE,"replicator: connection lost");
				return -1;
			}
			if ((r->fds[idx].revents & POLLIN) && rep_read(r->repsources+idx)<0) {
				return -1;
			}
		}
	}
}
266
/*
 * Replace the source's current packet with a fresh outgoing one.
 * Allocates 8+size bytes, stores 'type' and 'size' in the 8-byte header and
 * arms startptr/bytesleft so that the whole packet gets sent.
 * Returns the position just behind the header, where the caller stores the
 * 'size' bytes of payload.
 */
static uint8_t* rep_create_packet(repsrc *rs,uint32_t type,uint32_t size) {
	uint8_t *cursor;

	free(rs->packet); // no-op when there is no previous packet
	rs->packet = malloc(size+8);
	passert(rs->packet);

	rs->startptr = rs->packet;
	rs->bytesleft = 8+size;

	cursor = rs->packet;
	put32bit(&cursor,type);
	put32bit(&cursor,size);
	return cursor;
}
281
/* Release the source's current packet and mark that it has nothing to transfer. */
static void rep_no_packet(repsrc *rs) {
	free(rs->packet); // no-op when there is no packet
	rs->bytesleft = 0;
	rs->startptr = NULL;
	rs->packet = NULL;
}
290
/*
 * Perform one write() of the remaining part of the source's outgoing packet
 * and advance startptr/bytesleft by the amount accepted.
 * Returns 0 on progress or when the write should simply be retried later
 * (as decided by ERRNO_ERROR, defined outside this file), -1 when write()
 * returned 0 or failed with a real error.
 */
static int rep_write(repsrc *rs) {
	int sent;

	sent = write(rs->sock,rs->startptr,rs->bytesleft);
	if (sent>0) {
		replicator_bytesout(sent);
		rs->startptr += sent;
		rs->bytesleft -= sent;
		return 0;
	}
	if (sent==0) {
		syslog(LOG_NOTICE,"replicator: connection lost");
		return -1;
	}
	// sent<0
	if (ERRNO_ERROR) {
		mfs_errlog_silent(LOG_NOTICE,"replicator: write error");
		return -1;
	}
	return 0;
}
311
/*
 * Drive rep_write() on every source that still has bytes to send until all
 * outgoing packets are fully written.
 *   r      - replication job (fds[] and repsources[] are parallel arrays)
 *   msecto - overall time limit in milliseconds, measured from entry
 * Returns 0 when no source has bytes left, -1 on timeout, poll error,
 * hang-up reported by poll, or rep_write() failure.
 */
static int rep_send_all_packets(replication *r,uint32_t msecto) {
	uint64_t started;
	uint32_t elapsed;
	uint8_t idx,pending;

	started = monotonic_useconds();
	for (;;) {
		// watch only the sources that still have something to send
		pending = 0;
		for (idx=0 ; idx<r->srccnt ; idx++) {
			if (r->repsources[idx].bytesleft>0) {
				pending = 1;
				r->fds[idx].events = POLLOUT;
			} else {
				r->fds[idx].events = 0;
			}
		}
		if (!pending) { // finished
			return 0;
		}

		elapsed = (monotonic_useconds()-started)/1000;
		if (elapsed>=msecto) {
			syslog(LOG_NOTICE,"replicator: send timed out");
			return -1; // timed out
		}

		if (poll(r->fds,r->srccnt,msecto-elapsed)<0) {
			if (errno!=EINTR && ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"replicator: poll error");
				return -1;
			}
			continue;
		}

		for (idx=0 ; idx<r->srccnt ; idx++) {
			if (r->fds[idx].revents & POLLHUP) {
				syslog(LOG_NOTICE,"replicator: connection lost");
				return -1;
			}
			if ((r->fds[idx].revents & POLLOUT) && rep_write(r->repsources+idx)<0) {
				return -1;
			}
		}
	}
}
355
/*
 * Wait until every source that is still in CONNECTING mode has finished its
 * non-blocking connect; each source that becomes writable and passes
 * tcpgetstatus() is switched to IDLE.
 *   r      - replication job (fds[] and repsources[] are parallel arrays)
 *   msecto - overall time limit in milliseconds, measured from entry
 * Returns 0 when no source is CONNECTING any more, -1 on timeout, poll error,
 * hang-up reported by poll, or a failed connect.
 */
static int rep_wait_for_connection(replication *r,uint32_t msecto) {
	uint64_t started;
	uint32_t elapsed;
	uint8_t idx,pending;

	started = monotonic_useconds();
	for (;;) {
		// watch only the sources whose connect is still in progress
		pending = 0;
		for (idx=0 ; idx<r->srccnt ; idx++) {
			if (r->repsources[idx].mode==CONNECTING) {
				pending = 1;
				r->fds[idx].events = POLLOUT;
			} else {
				r->fds[idx].events = 0;
			}
		}
		if (!pending) { // finished
			return 0;
		}

		elapsed = (monotonic_useconds()-started)/1000;
		if (elapsed>=msecto) {
			syslog(LOG_NOTICE,"replicator: connect timed out");
			return -1; // timed out
		}

		if (poll(r->fds,r->srccnt,msecto-elapsed)<0) {
			if (errno!=EINTR && ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"replicator: poll error");
				return -1;
			}
			continue;
		}

		for (idx=0 ; idx<r->srccnt ; idx++) {
			if (r->fds[idx].revents & POLLHUP) {
				syslog(LOG_NOTICE,"replicator: connection lost");
				return -1;
			}
			if (!(r->fds[idx].revents & POLLOUT)) {
				continue;
			}
			if (tcpgetstatus(r->repsources[idx].sock)<0) {
				mfs_errlog_silent(LOG_NOTICE,"replicator: connect error");
				return -1;
			}
			r->repsources[idx].mode = IDLE;
		}
	}
}
401
/*
 * Release everything owned by a replication job:
 *   - close the local chunk if it is marked as opened,
 *   - delete the local chunk if it is still marked as created,
 *   - close every opened source socket and free every source packet,
 *   - free the fds, repsources and xorbuff arrays.
 * Only the first r->srccnt sources are visited.
 */
static void rep_cleanup(replication *r) {
	repsrc *src;
	int n;

	if (r->opened) {
		hdd_close(r->chunkid);
	}
	if (r->created) {
		hdd_delete(r->chunkid,0);
	}

	for (n=0 ; n<r->srccnt ; n++) {
		src = r->repsources+n;
		if (src->sock>=0) {
			tcpclose(src->sock);
		}
		free(src->packet); // no-op when there is no packet
	}

	// free() ignores NULL, so no guards are needed here
	free(r->fds);
	free(r->repsources);
	free(r->xorbuff);
}
428
429 /* srcs: srccnt * (chunkid:64 version:32 ip:32 port:16) */
replicate(uint64_t chunkid,uint32_t version,const uint32_t xormasks[4],uint8_t srccnt,const uint8_t * srcs)430 uint8_t replicate(uint64_t chunkid,uint32_t version,const uint32_t xormasks[4],uint8_t srccnt,const uint8_t *srcs) {
431 replication r;
432 uint8_t status,i,j,vbuffs,first;
433 uint16_t b,blocks;
434 uint32_t xcrc[4],crc;
435 uint32_t codeindex,codeword;
436 uint8_t *wptr;
437 const uint8_t *rptr;
438 int s;
439
440 if (srccnt==0) {
441 return MFS_ERROR_EINVAL;
442 }
443
444 // syslog(LOG_NOTICE,"replication begin (chunkid:%08"PRIX64",version:%04"PRIX32",srccnt:%"PRIu8")",chunkid,version,srccnt);
445
446 pthread_mutex_lock(&statslock);
447 stats_repl++;
448 pthread_mutex_unlock(&statslock);
449
450 // init replication structure
451 r.chunkid = chunkid;
452 r.version = version;
453 r.srccnt = 0;
454 r.created = 0;
455 r.opened = 0;
456 r.fds = malloc(sizeof(struct pollfd)*srccnt);
457 passert(r.fds);
458 r.repsources = malloc(sizeof(repsrc)*srccnt);
459 passert(r.repsources);
460 if (srccnt>1) {
461 r.xorbuff = malloc(MFSBLOCKSIZE+4);
462 passert(r.xorbuff);
463 } else {
464 r.xorbuff = NULL;
465 }
466 // create chunk
467 status = hdd_create(chunkid,0);
468 if (status!=MFS_STATUS_OK) {
469 syslog(LOG_NOTICE,"replicator: hdd_create status: %s",mfsstrerr(status));
470 rep_cleanup(&r);
471 return status;
472 }
473 r.created = 1;
474 // init sources
475 r.srccnt = srccnt;
476 for (i=0 ; i<srccnt ; i++) {
477 r.repsources[i].chunkid = get64bit(&srcs);
478 r.repsources[i].version = get32bit(&srcs);
479 r.repsources[i].ip = get32bit(&srcs);
480 r.repsources[i].port = get16bit(&srcs);
481 r.repsources[i].sock = -1;
482 r.repsources[i].packet = NULL;
483 }
484 // connect
485 for (i=0 ; i<srccnt ; i++) {
486 s = tcpsocket();
487 if (s<0) {
488 mfs_errlog_silent(LOG_NOTICE,"replicator: socket error");
489 rep_cleanup(&r);
490 return MFS_ERROR_CANTCONNECT;
491 }
492 r.repsources[i].sock = s;
493 r.fds[i].fd = s;
494 if (tcpnonblock(s)<0) {
495 mfs_errlog_silent(LOG_NOTICE,"replicator: nonblock error");
496 rep_cleanup(&r);
497 return MFS_ERROR_CANTCONNECT;
498 }
499 s = tcpnumconnect(s,r.repsources[i].ip,r.repsources[i].port);
500 if (s<0) {
501 mfs_errlog_silent(LOG_NOTICE,"replicator: connect error");
502 rep_cleanup(&r);
503 return MFS_ERROR_CANTCONNECT;
504 }
505 if (s==0) {
506 r.repsources[i].mode = IDLE;
507 } else {
508 r.repsources[i].mode = CONNECTING;
509 }
510 }
511 if (rep_wait_for_connection(&r,CONNMSECTO)<0) {
512 rep_cleanup(&r);
513 return MFS_ERROR_CANTCONNECT;
514 }
515 // disable Nagle
516 for (i=0 ; i<srccnt ; i++) {
517 tcpnodelay(r.repsources[i].sock);
518 }
519 // open chunk
520 status = hdd_open(chunkid,0);
521 if (status!=MFS_STATUS_OK) {
522 syslog(LOG_NOTICE,"replicator: hdd_open status: %s",mfsstrerr(status));
523 rep_cleanup(&r);
524 return status;
525 }
526 r.opened = 1;
527 // get block numbers
528 for (i=0 ; i<srccnt ; i++) {
529 wptr = rep_create_packet(r.repsources+i,ANTOCS_GET_CHUNK_BLOCKS,8+4);
530 if (wptr==NULL) {
531 syslog(LOG_NOTICE,"replicator: out of memory");
532 rep_cleanup(&r);
533 return MFS_ERROR_OUTOFMEMORY;
534 }
535 put64bit(&wptr,r.repsources[i].chunkid);
536 put32bit(&wptr,r.repsources[i].version);
537 }
538 // send packet
539 if (rep_send_all_packets(&r,SENDMSECTO)<0) {
540 rep_cleanup(&r);
541 return MFS_ERROR_DISCONNECTED;
542 }
543 // receive answers
544 for (i=0 ; i<srccnt ; i++) {
545 r.repsources[i].mode = HEADER;
546 r.repsources[i].startptr = r.repsources[i].hdrbuff;
547 r.repsources[i].bytesleft = 8;
548 }
549 if (rep_receive_all_packets(&r,RECVMSECTO)<0) {
550 rep_cleanup(&r);
551 return MFS_ERROR_DISCONNECTED;
552 }
553 // get # of blocks
554 blocks = 0;
555 for (i=0 ; i<srccnt ; i++) {
556 uint32_t type,size;
557 uint64_t pchid;
558 uint32_t pver;
559 uint16_t pblocks;
560 uint8_t pstatus;
561 uint32_t ip;
562 rptr = r.repsources[i].hdrbuff;
563 type = get32bit(&rptr);
564 size = get32bit(&rptr);
565 rptr = r.repsources[i].packet;
566 ip = r.repsources[i].ip;
567 if (rptr==NULL || type!=CSTOAN_CHUNK_BLOCKS || size!=15) {
568 syslog(LOG_WARNING,"replicator,get # of blocks: got wrong answer (type:0x%08"PRIX32"/size:0x%08"PRIX32") from (%u.%u.%u.%u:%u)",type,size,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
569 rep_cleanup(&r);
570 return MFS_ERROR_DISCONNECTED;
571 }
572 pchid = get64bit(&rptr);
573 pver = get32bit(&rptr);
574 pblocks = get16bit(&rptr);
575 pstatus = get8bit(&rptr);
576 if (pchid!=r.repsources[i].chunkid) {
577 syslog(LOG_WARNING,"replicator,get # of blocks: got wrong answer (chunk_status:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%u)",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
578 rep_cleanup(&r);
579 return MFS_ERROR_WRONGCHUNKID;
580 }
581 if (pver!=r.repsources[i].version) {
582 syslog(LOG_WARNING,"replicator,get # of blocks: got wrong answer (chunk_status:version:%"PRIX32"/%"PRIX32") from (%u.%u.%u.%u:%u)",pver,r.repsources[i].version,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
583 rep_cleanup(&r);
584 return MFS_ERROR_WRONGVERSION;
585 }
586 if (pstatus!=MFS_STATUS_OK) {
587 syslog(LOG_NOTICE,"replicator,get # of blocks: got status: %s from (%u.%u.%u.%u:%u)",mfsstrerr(pstatus),(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
588 rep_cleanup(&r);
589 return pstatus;
590 }
591 r.repsources[i].blocks = pblocks;
592 if (pblocks>blocks) {
593 blocks=pblocks;
594 }
595 }
596 // create read request
597 for (i=0 ; i<srccnt ; i++) {
598 if (r.repsources[i].blocks>0) {
599 uint32_t leng;
600 wptr = rep_create_packet(r.repsources+i,CLTOCS_READ,8+4+4+4);
601 if (wptr==NULL) {
602 syslog(LOG_NOTICE,"replicator: out of memory");
603 rep_cleanup(&r);
604 return MFS_ERROR_OUTOFMEMORY;
605 }
606 leng = r.repsources[i].blocks*MFSBLOCKSIZE;
607 put64bit(&wptr,r.repsources[i].chunkid);
608 put32bit(&wptr,r.repsources[i].version);
609 put32bit(&wptr,0);
610 put32bit(&wptr,leng);
611 } else {
612 rep_no_packet(r.repsources+i);
613 }
614 }
615 // send read request
616 if (rep_send_all_packets(&r,SENDMSECTO)<0) {
617 rep_cleanup(&r);
618 return MFS_ERROR_DISCONNECTED;
619 }
620 // receive data and write to hdd
621 for (b=0 ; b<blocks ; b++) {
622 // prepare receive
623 for (i=0 ; i<srccnt ; i++) {
624 if (b<r.repsources[i].blocks) {
625 r.repsources[i].mode = HEADER;
626 r.repsources[i].startptr = r.repsources[i].hdrbuff;
627 r.repsources[i].bytesleft = 8;
628 } else {
629 r.repsources[i].mode = IDLE;
630 r.repsources[i].bytesleft = 0;
631 }
632 }
633 // receive data
634 if (rep_receive_all_packets(&r,RECVMSECTO)<0) {
635 rep_cleanup(&r);
636 return MFS_ERROR_DISCONNECTED;
637 }
638 // check packets
639 vbuffs = 0;
640 for (i=0 ; i<srccnt ; i++) {
641 if (r.repsources[i].mode!=IDLE) {
642 uint32_t type,size;
643 uint64_t pchid;
644 uint16_t pblocknum;
645 uint16_t poffset;
646 uint32_t psize;
647 uint8_t pstatus;
648 uint32_t ip;
649 rptr = r.repsources[i].hdrbuff;
650 type = get32bit(&rptr);
651 size = get32bit(&rptr);
652 rptr = r.repsources[i].packet;
653 ip = r.repsources[i].ip;
654 if (rptr==NULL) {
655 rep_cleanup(&r);
656 return MFS_ERROR_DISCONNECTED;
657 }
658 if (type==CSTOCL_READ_STATUS && size==9) {
659 pchid = get64bit(&rptr);
660 pstatus = get8bit(&rptr);
661 if (pchid!=r.repsources[i].chunkid) {
662 syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_status:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%u)",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
663 rep_cleanup(&r);
664 return MFS_ERROR_WRONGCHUNKID;
665 }
666 if (pstatus==MFS_STATUS_OK) { // got status too early or got incorrect packet
667 syslog(LOG_WARNING,"replicator,read chunks: got unexpected ok status from (%u.%u.%u.%u:%u)",(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
668 rep_cleanup(&r);
669 return MFS_ERROR_DISCONNECTED;
670 }
671 syslog(LOG_NOTICE,"replicator,read chunks: got status: %s from (%u.%u.%u.%u:%u)",mfsstrerr(pstatus),(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
672 rep_cleanup(&r);
673 return pstatus;
674 } else if (type==CSTOCL_READ_DATA && size==20+MFSBLOCKSIZE) {
675 pchid = get64bit(&rptr);
676 pblocknum = get16bit(&rptr);
677 poffset = get16bit(&rptr);
678 psize = get32bit(&rptr);
679 if (pchid!=r.repsources[i].chunkid) {
680 syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%u)",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
681 rep_cleanup(&r);
682 return MFS_ERROR_WRONGCHUNKID;
683 }
684 if (pblocknum!=b) {
685 syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:blocknum:%"PRIu16"/%"PRIu16") from (%u.%u.%u.%u:%u)",pblocknum,b,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
686 rep_cleanup(&r);
687 return MFS_ERROR_DISCONNECTED;
688 }
689 if (poffset!=0) {
690 syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:offset:%"PRIu16") from (%u.%u.%u.%u:%u)",poffset,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
691 rep_cleanup(&r);
692 return MFS_ERROR_WRONGOFFSET;
693 }
694 if (psize!=MFSBLOCKSIZE) {
695 syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (read_data:size:%"PRIu32") from (%u.%u.%u.%u:%u)",psize,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
696 rep_cleanup(&r);
697 return MFS_ERROR_WRONGSIZE;
698 }
699 } else {
700 syslog(LOG_WARNING,"replicator,read chunks: got wrong answer (type:0x%08"PRIX32"/size:0x%08"PRIX32") from (%u.%u.%u.%u:%u)",type,size,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
701 rep_cleanup(&r);
702 return MFS_ERROR_DISCONNECTED;
703 }
704 vbuffs++;
705 }
706 }
707 // write data
708 if (vbuffs==0) { // no buffers ? - it should never happen
709 syslog(LOG_WARNING,"replicator: no data received for block: %"PRIu16,b);
710 rep_cleanup(&r);
711 return MFS_ERROR_DISCONNECTED;
712 } else if (vbuffs==1) { // xor not needed, so just find block and write it
713 for (i=0 ; i<srccnt ; i++) {
714 if (r.repsources[i].mode!=IDLE) {
715 rptr = r.repsources[i].packet;
716 status = hdd_write(chunkid,0,b,rptr+20,0,MFSBLOCKSIZE,rptr+16);
717 if (status!=MFS_STATUS_OK) {
718 syslog(LOG_WARNING,"replicator: write status: %s",mfsstrerr(status));
719 rep_cleanup(&r);
720 return status;
721 }
722 }
723 }
724 } else {
725 for (i=0 ; i<srccnt ; i++) {
726 if (r.repsources[i].mode!=IDLE) {
727 rptr = r.repsources[i].packet;
728 rptr += 16;
729 crc = get32bit(&rptr);
730 for (j=0 ; j<4 ; j++) {
731 r.repsources[i].crcsums[j] = mycrc32(0,rptr+j*MFSBLOCKSIZE/4,MFSBLOCKSIZE/4);
732 }
733 if (crc != mycrc32_combine(mycrc32_combine(r.repsources[i].crcsums[0],r.repsources[i].crcsums[1],MFSBLOCKSIZE/4),mycrc32_combine(r.repsources[i].crcsums[2],r.repsources[i].crcsums[3],MFSBLOCKSIZE/4),MFSBLOCKSIZE/2)) {
734 uint32_t ip;
735 ip = r.repsources[i].ip;
736 syslog(LOG_WARNING,"replicator: received data with wrong checksum from (%u.%u.%u.%u:%u)",(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
737 rep_cleanup(&r);
738 return MFS_ERROR_CRC;
739 }
740 }
741 }
742 crc = mycrc32_zeroblock(0,MFSBLOCKSIZE/4);
743 for (codeindex=0 ; codeindex<4 ; codeindex++) {
744 codeword = xormasks[codeindex];
745 first = 1;
746 for (i=0 ; i<srccnt ; i++) {
747 for (j=0 ; j<4 ; j++) {
748 if (r.repsources[i].mode!=IDLE && (codeword&UINT32_C(0x80000000))) {
749 rptr = r.repsources[i].packet;
750 rptr += 16;
751 if (first) {
752 memcpy(r.xorbuff+4+codeindex*MFSBLOCKSIZE/4,rptr+4+j*MFSBLOCKSIZE/4,MFSBLOCKSIZE/4);
753 first = 0;
754 xcrc[codeindex] = r.repsources[i].crcsums[j];
755 } else {
756 xordata(r.xorbuff+4+codeindex*MFSBLOCKSIZE/4,rptr+4+j*MFSBLOCKSIZE/4,MFSBLOCKSIZE/4);
757 xcrc[codeindex] ^= r.repsources[i].crcsums[j] ^ crc;
758 }
759 }
760 codeword>>=1;
761 }
762 }
763 }
764 crc = mycrc32_combine(mycrc32_combine(xcrc[0],xcrc[1],MFSBLOCKSIZE/4),mycrc32_combine(xcrc[2],xcrc[3],MFSBLOCKSIZE/4),MFSBLOCKSIZE/2);
765 wptr = r.xorbuff;
766 put32bit(&wptr,crc);
767 /*
768 first=1;
769 if (vbuffs&1) {
770 xcrc = 0;
771 } else {
772 xcrc = MFSCRCEMPTY; // = mycrc32_zeroblock(0,0x10000);
773 }
774 for (i=0 ; i<srccnt ; i++) {
775 if (r.repsources[i].mode!=IDLE) {
776 uint32_t ip;
777 ip = r.repsources[i].ip;
778 rptr = r.repsources[i].packet;
779 rptr+=16; // skip chunkid,blockno,offset and size
780 if (first) {
781 memcpy(r.xorbuff+4,rptr+4,MFSBLOCKSIZE);
782 first=0;
783 } else {
784 xordata(r.xorbuff+4,rptr+4,MFSBLOCKSIZE);
785 }
786 crc = get32bit(&rptr);
787 if (crc!=mycrc32(0,rptr,MFSBLOCKSIZE)) {
788 syslog(LOG_WARNING,"replicator: received data with wrong checksum from (%u.%u.%u.%u:%u)",(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
789 rep_cleanup(&r);
790 return MFS_ERROR_CRC;
791 }
792 xcrc^=crc;
793 }
794 }
795 wptr = r.xorbuff;
796 put32bit(&wptr,xcrc);
797 */
798 status = hdd_write(chunkid,0,b,r.xorbuff+4,0,MFSBLOCKSIZE,r.xorbuff);
799 if (status!=MFS_STATUS_OK) {
800 syslog(LOG_WARNING,"replicator: xor write status: %s",mfsstrerr(status));
801 rep_cleanup(&r);
802 return status;
803 }
804 }
805 }
806 // receive status
807 for (i=0 ; i<srccnt ; i++) {
808 if (r.repsources[i].blocks>0) {
809 // if (r.repsources[i].packet) {
810 // free(r.repsources[i].packet);
811 // r.repsources[i].packet=NULL;
812 // }
813 r.repsources[i].mode = HEADER;
814 r.repsources[i].startptr = r.repsources[i].hdrbuff;
815 r.repsources[i].bytesleft = 8;
816 } else {
817 r.repsources[i].mode = IDLE;
818 r.repsources[i].bytesleft = 0;
819 }
820 }
821 if (rep_receive_all_packets(&r,RECVMSECTO)<0) {
822 rep_cleanup(&r);
823 return MFS_ERROR_DISCONNECTED;
824 }
825 for (i=0 ; i<srccnt ; i++) {
826 if (r.repsources[i].blocks>0) {
827 uint32_t type,size;
828 uint64_t pchid;
829 uint8_t pstatus;
830 uint32_t ip;
831 rptr = r.repsources[i].hdrbuff;
832 type = get32bit(&rptr);
833 size = get32bit(&rptr);
834 rptr = r.repsources[i].packet;
835 ip = r.repsources[i].ip;
836 if (rptr==NULL || type!=CSTOCL_READ_STATUS || size!=9) {
837 syslog(LOG_WARNING,"replicator,check status: got wrong answer (type:0x%08"PRIX32"/size:0x%08"PRIX32") from (%u.%u.%u.%u:%u)",type,size,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
838 rep_cleanup(&r);
839 return MFS_ERROR_DISCONNECTED;
840 }
841 pchid = get64bit(&rptr);
842 pstatus = get8bit(&rptr);
843 if (pchid!=r.repsources[i].chunkid) {
844 syslog(LOG_WARNING,"replicator,check status: got wrong answer (read_status:chunkid:%"PRIX64"/%"PRIX64") from (%u.%u.%u.%u:%u)",pchid,r.repsources[i].chunkid,(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
845 rep_cleanup(&r);
846 return MFS_ERROR_WRONGCHUNKID;
847 }
848 if (pstatus!=MFS_STATUS_OK) {
849 syslog(LOG_NOTICE,"replicator,check status: got status: %s from (%u.%u.%u.%u:%u)",mfsstrerr(pstatus),(ip>>24)&0xFF,(ip>>16)&0xFF,(ip>>8)&0xFF,ip&0xFF,r.repsources[i].port);
850 rep_cleanup(&r);
851 return pstatus;
852 }
853 }
854 }
855 // close chunk and change version
856 status = hdd_close(chunkid);
857 if (status!=MFS_STATUS_OK) {
858 syslog(LOG_NOTICE,"replicator: hdd_close status: %s",mfsstrerr(status));
859 rep_cleanup(&r);
860 return status;
861 }
862 r.opened = 0;
863 status = hdd_version(chunkid,0,version);
864 if (status!=MFS_STATUS_OK) {
865 syslog(LOG_NOTICE,"replicator: hdd_version status: %s",mfsstrerr(status));
866 rep_cleanup(&r);
867 return status;
868 }
869 r.created = 0;
870 rep_cleanup(&r);
871 return MFS_STATUS_OK;
872 }
873