1 /*
2 * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <time.h>
26 #include <stddef.h>
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <sys/uio.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <syslog.h>
36 #include <errno.h>
37 #include <inttypes.h>
38 #include <netinet/in.h>
39 #ifdef HAVE_WRITEV
40 #include <sys/uio.h>
41 #endif
42
43 #include "MFSCommunication.h"
44 #include "datapack.h"
45 #include "masterconn.h"
46 #include "crc.h"
47 #include "cfg.h"
48 #include "main.h"
49 #include "slogger.h"
50 #include "massert.h"
51 #include "sockets.h"
52 #include "clocks.h"
53
/* Largest packet length accepted from the master (checked in masterconn_read). */
#define MaxPacketSize ANTOMA_MAXPACKETSIZE

/* Size of one metadata/changelog download request: MATOAN_MAXPACKETSIZE-1000,
   but never more than 1000000 bytes. */
#define META_DL_BLOCK ((((MATOAN_MAXPACKETSIZE) - 1000) < 1000000) ? ((MATOAN_MAXPACKETSIZE) - 1000) : 1000000)

// connection mode (masterconn.mode)
enum {FREE,CONNECTING,DATA,KILL};

/* Outgoing packet: 'data' holds the 8-byte header (type, length) followed by
   the payload. Allocated with offsetof(out_packetstruct,data)+packet size. */
typedef struct out_packetstruct {
	struct out_packetstruct *next;
	uint8_t *startptr;   // first byte not yet written to the socket
	uint32_t bytesleft;  // number of bytes still to be written from startptr
	uint8_t data[];      // flexible array member (C99) instead of the data[1] struct hack
} out_packetstruct;

/* Incoming packet: header fields already decoded, 'data' holds 'leng' payload
   bytes. Allocated with offsetof(in_packetstruct,data)+leng. */
typedef struct in_packetstruct {
	struct in_packetstruct *next;
	uint32_t type,leng;
	uint8_t data[];      // flexible array member (C99)
} in_packetstruct;

/* State of the single connection to the master server. */
typedef struct masterconn {
	uint8_t mode;                 // FREE / CONNECTING / DATA / KILL
	int sock;
	int32_t pdescpos;             // index of this socket in the poll array, or -1
	double lastread,lastwrite,conntime; // monotonic_seconds() timestamps
	uint8_t input_hdr[8];         // buffer for the header of the packet being received
	uint8_t *input_startptr;      // where the next received byte is stored
	uint32_t input_bytesleft;     // bytes missing from the current header/body
	uint8_t input_end;            // set when no more input will be read
	in_packetstruct *input_packet; // packet whose body is being received
	in_packetstruct *inputhead,**inputtail;   // queue of complete input packets
	out_packetstruct *outputhead,**outputtail; // queue of packets to send

	uint32_t bindip;
	uint32_t masterip;
	uint16_t masterport;
	uint8_t masteraddrvalid;      // masterip/masterport resolved

	uint8_t downloadretrycnt;     // consecutive failed chunk attempts
	uint8_t downloading;          // file being downloaded: 0 none, 1 metadata, 11/12 changelogs
	uint8_t oldmode;              // set when a pre-1.6.18 master was detected
	FILE *logfd; // using stdio because this is text file
	int metafd; // using standard unix I/O because this is binary file
	uint64_t filesize;            // size of the file being downloaded
	uint64_t dloffset;            // bytes of it received so far
	uint64_t dlstartuts;          // monotonic_useconds() at download start
} masterconn;

static masterconn *masterconnsingleton=NULL;

// from config
static uint32_t BackLogsNumber;
static uint32_t BackMetaCopies;
static char *MasterHost;
static char *MasterPort;
static char *BindHost;
static uint32_t Timeout;
static void *reconnect_hook;  // timer handle from main_time_register (reconnect)
static void *download_hook;   // timer handle from main_time_register (metadata download)
static uint64_t lastlogversion=0; // version of the last changelog entry stored locally (0 = unknown)

// byte counters reported and reset by masterconn_stats
static uint32_t stats_bytesout=0;
static uint32_t stats_bytesin=0;
117
masterconn_stats(uint32_t * bin,uint32_t * bout)118 void masterconn_stats(uint32_t *bin,uint32_t *bout) {
119 *bin = stats_bytesin;
120 *bout = stats_bytesout;
121 stats_bytesin = 0;
122 stats_bytesout = 0;
123 }
124
masterconn_findlastlogversion(void)125 void masterconn_findlastlogversion(void) {
126 struct stat st;
127 uint8_t buff[32800]; // 32800 = 32768 + 32
128 uint64_t size;
129 uint32_t buffpos;
130 uint64_t lastnewline;
131 int fd;
132
133 lastlogversion = 0;
134
135 if (stat("metadata_ml.mfs.back",&st)<0 || st.st_size==0 || (st.st_mode & S_IFMT)!=S_IFREG) {
136 return;
137 }
138
139 fd = open("changelog_ml.0.back",O_RDWR);
140 if (fd<0) {
141 return;
142 }
143 fstat(fd,&st);
144 size = st.st_size;
145 memset(buff,0,32);
146 lastnewline = 0;
147 while (size>0 && size+200000>(uint64_t)(st.st_size)) {
148 if (size>32768) {
149 memcpy(buff+32768,buff,32);
150 size-=32768;
151 lseek(fd,size,SEEK_SET);
152 if (read(fd,buff,32768)!=32768) {
153 lastlogversion = 0;
154 close(fd);
155 return;
156 }
157 buffpos = 32768;
158 } else {
159 memmove(buff+size,buff,32);
160 lseek(fd,0,SEEK_SET);
161 if (read(fd,buff,size)!=(ssize_t)size) {
162 lastlogversion = 0;
163 close(fd);
164 return;
165 }
166 buffpos = size;
167 size = 0;
168 }
169 // size = position in file of first byte in buff
170 // buffpos = position of last byte in buff to search
171 while (buffpos>0) {
172 buffpos--;
173 if (buff[buffpos]=='\n') {
174 if (lastnewline==0) {
175 lastnewline = size + buffpos;
176 } else {
177 if (lastnewline+1 != (uint64_t)(st.st_size)) { // garbage at the end of file - truncate
178 if (ftruncate(fd,lastnewline+1)<0) {
179 lastlogversion = 0;
180 close(fd);
181 return;
182 }
183 }
184 buffpos++;
185 while (buffpos<32800 && buff[buffpos]>='0' && buff[buffpos]<='9') {
186 lastlogversion *= 10;
187 lastlogversion += buff[buffpos]-'0';
188 buffpos++;
189 }
190 if (buffpos==32800 || buff[buffpos]!=':') {
191 lastlogversion = 0;
192 }
193 close(fd);
194 return;
195 }
196 }
197 }
198 }
199 close(fd);
200 return;
201 }
202
/* Allocate an outgoing packet carrying an 8-byte header (type, size),
   append it to the connection's output queue and return a pointer to the
   payload area, where the caller must store exactly 'size' bytes.
   Allocation failure aborts via passert. */
uint8_t* masterconn_createpacket(masterconn *eptr,uint32_t type,uint32_t size) {
	uint32_t total = size+8;
	out_packetstruct *pkt = malloc(offsetof(out_packetstruct,data)+total);
	uint8_t *wptr;

	passert(pkt);
	pkt->next = NULL;
	pkt->startptr = pkt->data;
	pkt->bytesleft = total;

	wptr = pkt->data;
	put32bit(&wptr,type);
	put32bit(&wptr,size);

	*(eptr->outputtail) = pkt;
	eptr->outputtail = &(pkt->next);
	return wptr;
}
221
/* Reset per-connection download/changelog state and queue ANTOMA_REGISTER.
   Register format 2 appends lastlogversion (8 bytes) and is used when a
   local changelog version is known. Otherwise format 1 is sent. */
void masterconn_sendregister(masterconn *eptr) {
	uint8_t *wptr;
	uint8_t withversion = (lastlogversion>0) ? 1 : 0;

	eptr->downloading = 0;
	eptr->metafd = -1;
	eptr->logfd = NULL;

	wptr = masterconn_createpacket(eptr,ANTOMA_REGISTER,withversion ? (1+4+2+8) : (1+4+2));
	put8bit(&wptr,withversion ? 2 : 1);
	put16bit(&wptr,VERSMAJ);
	put8bit(&wptr,VERSMID);
	put8bit(&wptr,VERSMIN);
	put16bit(&wptr,Timeout);
	if (withversion) {
		put64bit(&wptr,lastlogversion);
	}
}
246
247
/* Handle MATOAN_METACHANGES_LOG.
 * - A single 0x55 byte means "rotate": close the current changelog and shift
 *   changelog_ml.N.mfs to changelog_ml.N+1.mfs for N = BackLogsNumber-1..0
 *   (only changelog_ml.0.mfs is deleted when BackLogsNumber is 0).
 * - Otherwise the payload is 0xFF, a 64-bit version and a NUL-terminated
 *   text, appended to changelog_ml.0.mfs as "<version>: <text>".
 * Malformed packets set mode to KILL. A version gap also deletes all local
 * changelogs and resets lastlogversion before killing the connection. */
void masterconn_metachanges_log(masterconn *eptr,const uint8_t *data,uint32_t length) {
	char newname[100],oldname[100];
	uint32_t idx;
	uint64_t version;

	if (length==1 && data[0]==0x55) {
		if (eptr->logfd!=NULL) {
			fclose(eptr->logfd);
			eptr->logfd = NULL;
		}
		if (BackLogsNumber==0) {
			unlink("changelog_ml.0.mfs");
			return;
		}
		for (idx=BackLogsNumber ; idx>0 ; idx--) {
			snprintf(newname,100,"changelog_ml.%"PRIu32".mfs",idx);
			snprintf(oldname,100,"changelog_ml.%"PRIu32".mfs",idx-1);
			rename(oldname,newname);
		}
		return;
	}

	// 0xFF marker + 8-byte version + at least the terminating NUL
	if (length<10) {
		syslog(LOG_NOTICE,"MATOAN_METACHANGES_LOG - wrong size (%"PRIu32"/9+data)",length);
		eptr->mode = KILL;
		return;
	}
	if (data[0]!=0xFF) {
		syslog(LOG_NOTICE,"MATOAN_METACHANGES_LOG - wrong packet");
		eptr->mode = KILL;
		return;
	}
	if (data[length-1]!='\0') {
		syslog(LOG_NOTICE,"MATOAN_METACHANGES_LOG - invalid string");
		eptr->mode = KILL;
		return;
	}

	data++;
	version = get64bit(&data);

	if (lastlogversion>0 && version!=lastlogversion+1) {
		syslog(LOG_WARNING, "some changes lost: [%"PRIu64"-%"PRIu64"], download metadata again",lastlogversion,version-1);
		if (eptr->logfd!=NULL) {
			fclose(eptr->logfd);
			eptr->logfd = NULL;
		}
		for (idx=0 ; idx<=BackLogsNumber ; idx++) {
			snprintf(newname,100,"changelog_ml.%"PRIu32".mfs",idx);
			unlink(newname);
		}
		lastlogversion = 0;
		eptr->mode = KILL;
		return;
	}

	if (eptr->logfd==NULL) {
		eptr->logfd = fopen("changelog_ml.0.mfs","a");
	}
	if (eptr->logfd==NULL) {
		syslog(LOG_NOTICE,"lost MFS change %"PRIu64": %s",version,data);
		return;
	}
	fprintf(eptr->logfd,"%"PRIu64": %s\n",version,data);
	lastlogversion = version;
}
313
masterconn_metachanges_flush(void)314 void masterconn_metachanges_flush(void) {
315 masterconn *eptr = masterconnsingleton;
316 if (eptr->logfd) {
317 fflush(eptr->logfd);
318 }
319 }
320
/* Finish the current download: clear 'downloading', queue
   ANTOMA_DOWNLOAD_END and close the metafile if one is open.
   Returns 0 on success, -1 when closing the metafile failed. */
int masterconn_download_end(masterconn *eptr) {
	int fd;

	eptr->downloading = 0;
	masterconn_createpacket(eptr,ANTOMA_DOWNLOAD_END,0);

	fd = eptr->metafd;
	if (fd<0) {
		return 0;
	}
	eptr->metafd = -1;
	if (close(fd)<0) {
		mfs_errlog_silent(LOG_NOTICE,"error closing metafile");
		return -1;
	}
	return 0;
}
334
/* Ask the master for file 'filenum' (1 = metadata, 11/12 = changelogs).
   Does nothing unless the connection is in DATA mode and no download is
   already in progress. */
void masterconn_download_init(masterconn *eptr,uint8_t filenum) {
	uint8_t *wptr;

	if (eptr->mode!=DATA || eptr->downloading!=0) {
		return;
	}
	wptr = masterconn_createpacket(eptr,ANTOMA_DOWNLOAD_START,1);
	put8bit(&wptr,filenum);
	eptr->downloading = filenum;
}
345
masterconn_metadownloadinit(void)346 void masterconn_metadownloadinit(void) {
347 masterconn_download_init(masterconnsingleton,1);
348 }
349
masterconn_metadata_check(char * name)350 int masterconn_metadata_check(char *name) {
351 int fd;
352 char chkbuff[16];
353 char eofmark[16];
354 const uint8_t *rptr;
355 uint64_t metaversion,metaid;
356 fd = open(name,O_RDONLY);
357 if (fd<0) {
358 syslog(LOG_WARNING,"can't open downloaded metadata");
359 return -1;
360 }
361 if (read(fd,chkbuff,8)!=8) {
362 syslog(LOG_WARNING,"can't read downloaded metadata");
363 close(fd);
364 return -1;
365 }
366 if (memcmp(chkbuff,"MFSM NEW",8)==0) { // silently ignore "new file"
367 close(fd);
368 return -1;
369 }
370 if (memcmp(chkbuff,MFSSIGNATURE "M ",5)==0 && chkbuff[5]>='1' && chkbuff[5]<='9' && chkbuff[6]=='.' && chkbuff[7]>='0' && chkbuff[7]<='9') {
371 uint8_t fver = ((chkbuff[5]-'0')<<4)+(chkbuff[7]-'0');
372 if (fver<0x17) {
373 memset(eofmark,0,16);
374 } else {
375 memcpy(eofmark,"[MFS EOF MARKER]",16);
376 if (fver>=0x20) {
377 if (read(fd,chkbuff,16)!=16) {
378 syslog(LOG_WARNING,"can't read downloaded metadata");
379 close(fd);
380 return -1;
381 }
382 rptr = (uint8_t*)chkbuff;
383 metaversion = get64bit(&rptr);
384 metaid = get64bit(&rptr);
385 syslog(LOG_NOTICE,"meta data version: %"PRIu64", meta data id: 0x%016"PRIX64,metaversion,metaid);
386 }
387 }
388 } else {
389 syslog(LOG_WARNING,"bad metadata file format");
390 close(fd);
391 return -1;
392 }
393 lseek(fd,-16,SEEK_END);
394 if (read(fd,chkbuff,16)!=16) {
395 syslog(LOG_WARNING,"can't read downloaded metadata");
396 close(fd);
397 return -1;
398 }
399 close(fd);
400 if (memcmp(chkbuff,eofmark,16)!=0) {
401 syslog(LOG_WARNING,"truncated metadata file !!!");
402 return -1;
403 }
404 return 0;
405 }
406
/* Advance the current download.
 * While bytes remain, request the next chunk (at most META_DL_BLOCK bytes)
 * starting at dloffset. Once the whole file has arrived, end the download,
 * log the transfer rate and install the file:
 *  - 1  (metadata): if metadata_ml.tmp passes masterconn_metadata_check(),
 *        rotate previous copies (metadata_ml.mfs.back.N, at most 99 kept) and
 *        rename it to metadata_ml.mfs.back. Then, unless oldmode is set,
 *        request changelog file 11.
 *  - 11: rename changelog_ml.tmp to changelog_ml_back.0.mfs, request file 12.
 *  - 12: rename changelog_ml.tmp to changelog_ml_back.1.mfs. */
void masterconn_download_next(masterconn *eptr) {
	uint8_t *ptr;
	uint8_t filenum;
	int64_t dltime;

	if (eptr->dloffset<eptr->filesize) { // send request for next data packet
		ptr = masterconn_createpacket(eptr,ANTOMA_DOWNLOAD_REQUEST,12);
		put64bit(&ptr,eptr->dloffset);
		if (eptr->filesize-eptr->dloffset>META_DL_BLOCK) {
			put32bit(&ptr,META_DL_BLOCK);
		} else {
			put32bit(&ptr,eptr->filesize-eptr->dloffset);
		}
		return;
	}

	// end of file
	filenum = eptr->downloading;
	if (masterconn_download_end(eptr)<0) {
		return;
	}
	dltime = monotonic_useconds()-eptr->dlstartuts;
	if (dltime<=0) {
		dltime=1;
	}
	// casts make the arguments match PRIu64/PRIu32; bytes per microsecond == MB/s
	syslog(LOG_NOTICE,"%s downloaded %"PRIu64"B/%"PRIu64".%06"PRIu32"s (%.3lf MB/s)",(filenum==1)?"metadata":(filenum==11)?"changelog_0":(filenum==12)?"changelog_1":"???",eptr->filesize,(uint64_t)(dltime/1000000),(uint32_t)(dltime%1000000),(double)(eptr->filesize)/(double)(dltime));

	if (filenum==1) {
		if (masterconn_metadata_check("metadata_ml.tmp")==0) {
			if (BackMetaCopies>0) {
				char metaname1[100],metaname2[100];
				// same upper bound as masterconn_reload applies to BACK_META_KEEP_PREVIOUS
				uint32_t copies = (BackMetaCopies>99) ? 99 : BackMetaCopies;
				uint32_t i;
				for (i=copies-1 ; i>0 ; i--) {
					snprintf(metaname1,100,"metadata_ml.mfs.back.%"PRIu32,i+1);
					snprintf(metaname2,100,"metadata_ml.mfs.back.%"PRIu32,i);
					rename(metaname2,metaname1);
				}
				rename("metadata_ml.mfs.back","metadata_ml.mfs.back.1");
			}
			if (rename("metadata_ml.tmp","metadata_ml.mfs.back")<0) {
				syslog(LOG_NOTICE,"can't rename downloaded metadata - do it manually before next download");
			}
		}
		if (eptr->oldmode==0) {
			masterconn_download_init(eptr,11);
		}
	} else if (filenum==11) {
		if (rename("changelog_ml.tmp","changelog_ml_back.0.mfs")<0) {
			syslog(LOG_NOTICE,"can't rename downloaded changelog - do it manually before next download");
		}
		masterconn_download_init(eptr,12);
	} else if (filenum==12) {
		if (rename("changelog_ml.tmp","changelog_ml_back.1.mfs")<0) {
			syslog(LOG_NOTICE,"can't rename downloaded changelog - do it manually before next download");
		}
	}
}
460
/* Handle MATOAN_DOWNLOAD_INFO: a 1-byte payload reports that the requested
   download could not be started. An 8-byte payload carries the file size.
   In that case the temporary file matching 'downloading' is created and the
   first chunk is requested. */
void masterconn_download_info(masterconn *eptr,const uint8_t *data,uint32_t length) {
	const char *tmpname;

	if (length!=8) {
		if (length!=1) {
			syslog(LOG_NOTICE,"MATOAN_DOWNLOAD_INFO - wrong size (%"PRIu32"/1|8)",length);
			eptr->mode = KILL;
			return;
		}
		passert(data);
		eptr->downloading = 0;
		syslog(LOG_NOTICE,"download start error");
		return;
	}
	passert(data);

	eptr->filesize = get64bit(&data);
	eptr->dloffset = 0;
	eptr->downloadretrycnt = 0;
	eptr->dlstartuts = monotonic_useconds();

	switch (eptr->downloading) {
		case 1:
			tmpname = "metadata_ml.tmp";
			break;
		case 11:
		case 12:
			tmpname = "changelog_ml.tmp";
			break;
		default:
			syslog(LOG_NOTICE,"unexpected MATOAN_DOWNLOAD_INFO packet");
			eptr->mode = KILL;
			return;
	}

	eptr->metafd = open(tmpname,O_WRONLY | O_TRUNC | O_CREAT,0666);
	if (eptr->metafd<0) {
		mfs_errlog_silent(LOG_NOTICE,"error opening metafile");
		masterconn_download_end(eptr);
		return;
	}
	masterconn_download_next(eptr);
}
493
/* Common failure path for a chunk that could not be stored: after 5
   consecutive failures give up the download, otherwise request the same
   chunk again. */
static void masterconn_download_data_retry(masterconn *eptr) {
	if (eptr->downloadretrycnt>=5) {
		masterconn_download_end(eptr);
		return;
	}
	eptr->downloadretrycnt++;
	masterconn_download_next(eptr);
}

/* Handle MATOAN_DOWNLOAD_DATA: payload is offset(64) length(32) crc(32)
   followed by 'length' bytes. Protocol violations kill the connection. The
   data is written at its offset, CRC-checked and fsync'ed, then the next
   chunk is requested. Write, CRC and fsync failures are retried. */
void masterconn_download_data(masterconn *eptr,const uint8_t *data,uint32_t length) {
	uint64_t offset;
	uint32_t leng;
	uint32_t crc;
	ssize_t written;

	if (eptr->metafd<0) {
		syslog(LOG_NOTICE,"MATOAN_DOWNLOAD_DATA - file not opened");
		eptr->mode = KILL;
		return;
	}
	if (length<16) {
		syslog(LOG_NOTICE,"MATOAN_DOWNLOAD_DATA - wrong size (%"PRIu32"/16+data)",length);
		eptr->mode = KILL;
		return;
	}
	passert(data);
	offset = get64bit(&data);
	leng = get32bit(&data);
	crc = get32bit(&data);
	if (leng+16!=length) {
		syslog(LOG_NOTICE,"MATOAN_DOWNLOAD_DATA - wrong size (%"PRIu32"/16+%"PRIu32")",length,leng);
		eptr->mode = KILL;
		return;
	}
	if (offset!=eptr->dloffset) {
		syslog(LOG_NOTICE,"MATOAN_DOWNLOAD_DATA - unexpected file offset (%"PRIu64"/%"PRIu64")",offset,eptr->dloffset);
		eptr->mode = KILL;
		return;
	}
	if (offset+leng>eptr->filesize) {
		syslog(LOG_NOTICE,"MATOAN_DOWNLOAD_DATA - unexpected file size (%"PRIu64"/%"PRIu64")",offset+leng,eptr->filesize);
		eptr->mode = KILL;
		return;
	}

#ifdef HAVE_PWRITE
	written = pwrite(eptr->metafd,data,leng,offset);
#else /* HAVE_PWRITE */
	lseek(eptr->metafd,offset,SEEK_SET);
	written = write(eptr->metafd,data,leng);
#endif /* HAVE_PWRITE */
	if (written!=(ssize_t)leng) {
		mfs_errlog_silent(LOG_NOTICE,"error writing metafile");
		masterconn_download_data_retry(eptr);
		return;
	}
	if (crc!=mycrc32(0,data,leng)) {
		syslog(LOG_NOTICE,"metafile data crc error");
		masterconn_download_data_retry(eptr);
		return;
	}
	if (fsync(eptr->metafd)<0) {
		mfs_errlog_silent(LOG_NOTICE,"error syncing metafile");
		masterconn_download_data_retry(eptr);
		return;
	}

	eptr->dloffset += leng;
	eptr->downloadretrycnt = 0;
	masterconn_download_next(eptr);
}
568
/* Release per-connection file resources before the socket is closed.
   Losing the connection while changelog file 11 or 12 is being downloaded
   is taken as a sign of an old (pre-1.6.18) master. oldmode is then set, which
   makes masterconn_download_next skip the changelog downloads afterwards.
   A partially downloaded temporary file is discarded. */
void masterconn_beforeclose(masterconn *eptr) {
	uint8_t dl = eptr->downloading;

	if (dl==11 || dl==12) { // old (version less than 1.6.18) master patch
		syslog(LOG_WARNING,"old master detected - please upgrade your master server and then restart metalogger");
		eptr->oldmode = 1;
	}
	if (eptr->metafd>=0) {
		close(eptr->metafd);
		eptr->metafd = -1;
		unlink("metadata_ml.tmp");
		unlink("changelog_ml.tmp");
	}
	if (eptr->logfd!=NULL) {
		fclose(eptr->logfd);
		eptr->logfd = NULL;
	}
}
585
/* Dispatch one complete packet received from the master. NOP and the two
   "for future use" answers are ignored; an unknown type kills the
   connection. */
void masterconn_gotpacket(masterconn *eptr,uint32_t type,const uint8_t *data,uint32_t length) {
	switch (type) {
		case ANTOAN_NOP:
		case ANTOAN_UNKNOWN_COMMAND: // for future use
		case ANTOAN_BAD_COMMAND_SIZE: // for future use
			return;
		case MATOAN_METACHANGES_LOG:
			masterconn_metachanges_log(eptr,data,length);
			return;
		case MATOAN_DOWNLOAD_INFO:
			masterconn_download_info(eptr,data,length);
			return;
		case MATOAN_DOWNLOAD_DATA:
			masterconn_download_data(eptr,data,length);
			return;
		default:
			syslog(LOG_NOTICE,"got unknown message (type:%"PRIu32")",type);
			eptr->mode = KILL;
			return;
	}
}
608
/* Put a freshly connected socket into DATA mode. Timers and the input/output
   packet queues are reset, the registration packet is queued and, when no
   local changelog version is known, a metadata download is started. */
void masterconn_connected(masterconn *eptr) {
	double now = monotonic_seconds();

	tcpnodelay(eptr->sock);
	eptr->mode = DATA;
	eptr->lastread = eptr->lastwrite = now;

	// expect an 8-byte packet header first
	eptr->input_bytesleft = 8;
	eptr->input_startptr = eptr->input_hdr;
	eptr->input_end = 0;
	eptr->input_packet = NULL;

	eptr->inputhead = NULL;
	eptr->inputtail = &(eptr->inputhead);
	eptr->outputhead = NULL;
	eptr->outputtail = &(eptr->outputhead);

	masterconn_sendregister(eptr);
	if (lastlogversion==0) {
		masterconn_metadownloadinit();
	}
}
631
/* Start a non-blocking connection to the master.
 * The bind and master addresses are resolved if not already valid. Returns 0
 * when the connection was established immediately (DATA mode) or is in
 * progress (CONNECTING). Returns -1 on resolve/socket/bind/connect failure;
 * the socket is closed in that case. */
int masterconn_initconnect(masterconn *eptr) {
	int status;

	if (eptr->masteraddrvalid==0) {
		uint32_t mip,bip;
		uint16_t mport;
		if (tcpresolve(BindHost,NULL,&bip,NULL,1)<0) {
			bip = 0;
		}
		eptr->bindip = bip;
		if (tcpresolve(MasterHost,MasterPort,&mip,&mport,0)<0) {
			mfs_arg_syslog(LOG_WARNING,"can't resolve master host/port (%s:%s)",MasterHost,MasterPort);
			return -1;
		}
		eptr->masterip = mip;
		eptr->masterport = mport;
		eptr->masteraddrvalid = 1;
	}

	eptr->sock = tcpsocket();
	if (eptr->sock<0) {
		mfs_errlog(LOG_WARNING,"create socket, error");
		return -1;
	}
	if (tcpnonblock(eptr->sock)<0) {
		mfs_errlog(LOG_WARNING,"set nonblock, error");
		goto fail_close;
	}
	if (eptr->bindip>0 && tcpnumbind(eptr->sock,eptr->bindip,0)<0) {
		mfs_errlog(LOG_WARNING,"can't bind socket to given ip");
		goto fail_close;
	}
	status = tcpnumconnect(eptr->sock,eptr->masterip,eptr->masterport);
	if (status<0) {
		mfs_errlog(LOG_WARNING,"connect failed, error");
		eptr->masteraddrvalid = 0; // resolve again on the next attempt
		goto fail_close;
	}

	if (status==0) {
		syslog(LOG_NOTICE,"connected to Master immediately");
		masterconn_connected(eptr);
	} else {
		eptr->mode = CONNECTING;
		eptr->conntime = monotonic_seconds();
		syslog(LOG_NOTICE,"connecting ...");
	}
	return 0;

fail_close:
	tcpclose(eptr->sock);
	eptr->sock = -1;
	return -1;
}
687
/* Abandon a connection attempt that did not complete in time. The master
   address is marked for re-resolution on the next attempt. */
void masterconn_connecttimeout(masterconn *eptr) {
	syslog(LOG_WARNING,"connection timed out");
	tcpclose(eptr->sock);
	eptr->masteraddrvalid = 0;
	eptr->mode = FREE;
	eptr->sock = -1;
}
695
/* Called when a pending connect() reports progress: switch to DATA mode on
   success, otherwise log the error, close the socket and go back to FREE. */
void masterconn_connecttest(masterconn *eptr) {
	if (tcpgetstatus(eptr->sock)==0) {
		syslog(LOG_NOTICE,"connected to Master");
		masterconn_connected(eptr);
		return;
	}
	mfs_errlog_silent(LOG_WARNING,"connection failed, error");
	tcpclose(eptr->sock);
	eptr->sock = -1;
	eptr->mode = FREE;
	eptr->masteraddrvalid = 0;
}
711
/* Read everything currently available on the socket and split it into
 * packets, which are appended to the input queue.
 * A shared, growing static buffer is used for the raw bytes. Calling with
 * eptr==NULL releases that buffer. An oversized packet, EOF or a read error
 * set input_end. */
void masterconn_read(masterconn *eptr,double now) {
	static uint8_t *readbuff = NULL;
	static uint32_t readbuffsize = 0;
	int32_t got;
	uint32_t step;
	uint32_t type,leng;
	const uint8_t *hptr;
	uint32_t filled,consumed;
	uint8_t err,hup;

	if (eptr==NULL) {
		free(readbuff);
		readbuff = NULL;
		readbuffsize = 0;
		return;
	}

	if (readbuffsize==0) {
		readbuffsize = 65536;
		readbuff = malloc(readbuffsize);
		passert(readbuff);
	}

	filled = 0;
	err = 0;
	hup = 0;
	// drain the socket, doubling the buffer whenever a read fills it completely
	for (;;) {
		got = read(eptr->sock,readbuff+filled,readbuffsize-filled);
		if (got==0) {
			hup = 1;
			break;
		}
		if (got<0) {
			if (ERRNO_ERROR) {
				err = 1;
			}
			break;
		}
		stats_bytesin += got;
		filled += got;
		if (filled!=readbuffsize) {
			break;
		}
		readbuffsize *= 2;
		readbuff = realloc(readbuff,readbuffsize);
		passert(readbuff);
	}

	if (filled>0) {
		eptr->lastread = now;
	}

	consumed = 0;
	while (consumed<filled) {
		// copy as much as the current header/body still needs
		step = filled-consumed;
		if (step>eptr->input_bytesleft) {
			step = eptr->input_bytesleft;
		}
		memcpy(eptr->input_startptr,readbuff+consumed,step);
		consumed += step;
		eptr->input_startptr += step;
		eptr->input_bytesleft -= step;

		if (eptr->input_bytesleft>0) {
			break; // incomplete - wait for more data
		}

		if (eptr->input_packet==NULL) {
			// header complete: decode it and allocate the packet body
			hptr = eptr->input_hdr;
			type = get32bit(&hptr);
			leng = get32bit(&hptr);

			if (leng>MaxPacketSize) {
				syslog(LOG_WARNING,"Master packet too long (%"PRIu32"/%u)",leng,MaxPacketSize);
				eptr->input_end = 1;
				return;
			}

			eptr->input_packet = malloc(offsetof(in_packetstruct,data)+leng);
			passert(eptr->input_packet);
			eptr->input_packet->next = NULL;
			eptr->input_packet->type = type;
			eptr->input_packet->leng = leng;

			eptr->input_startptr = eptr->input_packet->data;
			eptr->input_bytesleft = leng;
			if (leng>0) {
				continue; // body still to be received
			}
		}

		// packet complete: queue it and expect the next header
		*(eptr->inputtail) = eptr->input_packet;
		eptr->inputtail = &(eptr->input_packet->next);
		eptr->input_packet = NULL;
		eptr->input_bytesleft = 8;
		eptr->input_startptr = eptr->input_hdr;
	}

	if (hup) {
		syslog(LOG_NOTICE,"connection was reset by Master");
		eptr->input_end = 1;
	} else if (err) {
		mfs_errlog_silent(LOG_NOTICE,"read from Master error");
		eptr->input_end = 1;
	}
}
825
/* Process queued input packets while in DATA mode, for at most about 10 ms
   per call. Once input has ended and the queue is empty, the connection is
   marked KILL. */
void masterconn_parse(masterconn *eptr) {
	in_packetstruct *pkt;
	uint64_t starttime = monotonic_useconds();
	uint64_t currtime = starttime;

	while (eptr->mode==DATA && eptr->inputhead!=NULL && currtime<starttime+10000) {
		pkt = eptr->inputhead;
		masterconn_gotpacket(eptr,pkt->type,pkt->data,pkt->leng);
		eptr->inputhead = pkt->next;
		free(pkt);
		if (eptr->inputhead!=NULL) {
			currtime = monotonic_useconds();
		} else {
			eptr->inputtail = &(eptr->inputhead);
		}
	}
	if (eptr->mode==DATA && eptr->input_end && eptr->inputhead==NULL) {
		eptr->mode = KILL;
	}
}
847
/* Send as much of the output queue as the socket accepts. Fully written
 * packets are freed and a partially written one keeps its remaining part.
 * A hard write error marks the connection KILL. With writev up to 100
 * packets are sent per system call. */
void masterconn_write(masterconn *eptr,double now) {
	out_packetstruct *opack;
	int32_t i;
#ifdef HAVE_WRITEV
	struct iovec iov[100];
	uint32_t cnt;
	uint32_t queued;
	uint32_t done;

	for (;;) {
		// gather up to 100 pending packets
		queued = 0;
		cnt = 0;
		for (opack=eptr->outputhead ; opack!=NULL && cnt<100 ; opack=opack->next) {
			iov[cnt].iov_base = opack->startptr;
			iov[cnt].iov_len = opack->bytesleft;
			queued += opack->bytesleft;
			cnt++;
		}
		if (cnt==0) {
			return;
		}
		i = writev(eptr->sock,iov,cnt);
		if (i<0) {
			if (ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"write to Master error");
				eptr->mode = KILL;
			}
			return;
		}
		if (i>0) {
			eptr->lastwrite = now;
		}
		stats_bytesout += i;
		// drop packets written completely, advance into a partially written one
		done = i;
		while (done>0 && eptr->outputhead!=NULL) {
			opack = eptr->outputhead;
			if (opack->bytesleft>done) {
				opack->startptr += done;
				opack->bytesleft -= done;
				done = 0;
			} else {
				done -= opack->bytesleft;
				eptr->outputhead = opack->next;
				if (eptr->outputhead==NULL) {
					eptr->outputtail = &(eptr->outputhead);
				}
				free(opack);
			}
		}
		if ((uint32_t)i<queued) {
			return; // short write - continue when the socket is writable again
		}
	}
#else
	for (opack=eptr->outputhead ; opack!=NULL ; opack=eptr->outputhead) {
		i = write(eptr->sock,opack->startptr,opack->bytesleft);
		if (i<0) {
			if (ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"write to Master error");
				eptr->mode = KILL;
			}
			return;
		}
		if (i>0) {
			eptr->lastwrite = now;
		}
		stats_bytesout += i;
		opack->startptr += i;
		opack->bytesleft -= i;
		if (opack->bytesleft>0) {
			return; // short write
		}
		eptr->outputhead = opack->next;
		if (eptr->outputhead==NULL) {
			eptr->outputtail = &(eptr->outputhead);
		}
		free(opack);
	}
#endif
}
930
931
masterconn_desc(struct pollfd * pdesc,uint32_t * ndesc)932 void masterconn_desc(struct pollfd *pdesc,uint32_t *ndesc) {
933 uint32_t pos = *ndesc;
934 masterconn *eptr = masterconnsingleton;
935
936 eptr->pdescpos = -1;
937 if (eptr->mode==FREE || eptr->sock<0) {
938 return;
939 }
940 pdesc[pos].events = 0;
941 if (eptr->mode==DATA && eptr->input_end==0) {
942 pdesc[pos].events |= POLLIN;
943 }
944 if ((eptr->mode==DATA && eptr->outputhead!=NULL) || eptr->mode==CONNECTING) {
945 pdesc[pos].events |= POLLOUT;
946 }
947 if (pdesc[pos].events!=0) {
948 pdesc[pos].fd = eptr->sock;
949 eptr->pdescpos = pos;
950 pos++;
951 }
952 *ndesc = pos;
953 }
954
masterconn_disconnection_check(void)955 void masterconn_disconnection_check(void) {
956 masterconn *eptr = masterconnsingleton;
957 in_packetstruct *ipptr,*ipaptr;
958 out_packetstruct *opptr,*opaptr;
959
960 if (eptr->mode == KILL) {
961 masterconn_beforeclose(eptr);
962 tcpclose(eptr->sock);
963 if (eptr->input_packet) {
964 free(eptr->input_packet);
965 }
966 ipptr = eptr->inputhead;
967 while (ipptr) {
968 ipaptr = ipptr;
969 ipptr = ipptr->next;
970 free(ipaptr);
971 }
972 opptr = eptr->outputhead;
973 while (opptr) {
974 opaptr = opptr;
975 opptr = opptr->next;
976 free(opaptr);
977 }
978 eptr->mode = FREE;
979 }
980 }
981
masterconn_serve(struct pollfd * pdesc)982 void masterconn_serve(struct pollfd *pdesc) {
983 double now;
984 masterconn *eptr = masterconnsingleton;
985
986 now = monotonic_seconds();
987
988 if (eptr->mode==CONNECTING) {
989 if (eptr->sock>=0 && eptr->pdescpos>=0 && (pdesc[eptr->pdescpos].revents & (POLLOUT | POLLHUP | POLLERR))) { // FD_ISSET(eptr->sock,wset)) {
990 masterconn_connecttest(eptr);
991 } else if (eptr->conntime+1.0 < now) {
992 masterconn_connecttimeout(eptr);
993 }
994 } else {
995 if (eptr->pdescpos>=0) {
996 if ((pdesc[eptr->pdescpos].revents & (POLLERR|POLLIN))==POLLIN && eptr->mode==DATA) {
997 masterconn_read(eptr,now);
998 }
999 if (pdesc[eptr->pdescpos].revents & (POLLERR|POLLHUP)) {
1000 eptr->input_end = 1;
1001 }
1002 masterconn_parse(eptr);
1003 }
1004 if (eptr->mode==DATA && eptr->lastwrite+(Timeout/3.0)<now && eptr->outputhead==NULL) {
1005 masterconn_createpacket(eptr,ANTOAN_NOP,0);
1006 }
1007 if (eptr->pdescpos>=0) {
1008 if ((((pdesc[eptr->pdescpos].events & POLLOUT)==0 && (eptr->outputhead)) || (pdesc[eptr->pdescpos].revents & POLLOUT)) && eptr->mode==DATA) {
1009 masterconn_write(eptr,now);
1010 }
1011 }
1012 if (eptr->mode==DATA && eptr->lastread+Timeout<now) {
1013 eptr->mode = KILL;
1014 }
1015 }
1016 masterconn_disconnection_check();
1017 }
1018
masterconn_reconnect(void)1019 void masterconn_reconnect(void) {
1020 masterconn *eptr = masterconnsingleton;
1021 if (eptr->mode==FREE) {
1022 masterconn_initconnect(eptr);
1023 }
1024 }
1025
masterconn_term(void)1026 void masterconn_term(void) {
1027 masterconn *eptr = masterconnsingleton;
1028 in_packetstruct *ipptr,*ipaptr;
1029 out_packetstruct *opptr,*opaptr;
1030
1031 if (eptr->mode!=FREE) {
1032 tcpclose(eptr->sock);
1033 if (eptr->mode!=CONNECTING) {
1034 if (eptr->input_packet) {
1035 free(eptr->input_packet);
1036 }
1037 ipptr = eptr->inputhead;
1038 while (ipptr) {
1039 ipaptr = ipptr;
1040 ipptr = ipptr->next;
1041 free(ipaptr);
1042 }
1043 opptr = eptr->outputhead;
1044 while (opptr) {
1045 opaptr = opptr;
1046 opptr = opptr->next;
1047 free(opaptr);
1048 }
1049 }
1050 }
1051
1052 masterconn_read(NULL,0.0); // free internal read buffer
1053
1054 free(eptr);
1055
1056 free(MasterHost);
1057 free(MasterPort);
1058 free(BindHost);
1059 masterconnsingleton = NULL;
1060 }
1061
masterconn_reload(void)1062 void masterconn_reload(void) {
1063 masterconn *eptr = masterconnsingleton;
1064 uint32_t ReconnectionDelay;
1065 uint32_t MetaDLFreq;
1066
1067 free(MasterHost);
1068 free(MasterPort);
1069 free(BindHost);
1070
1071 MasterHost = cfg_getstr("MASTER_HOST",DEFAULT_MASTERNAME);
1072 MasterPort = cfg_getstr("MASTER_PORT",DEFAULT_MASTER_CONTROL_PORT);
1073 BindHost = cfg_getstr("BIND_HOST","*");
1074
1075 eptr->masteraddrvalid = 0;
1076 if (eptr->mode!=FREE) {
1077 eptr->mode = KILL;
1078 }
1079
1080 Timeout = cfg_getuint32("MASTER_TIMEOUT",10);
1081 BackLogsNumber = cfg_getuint32("BACK_LOGS",50);
1082 BackMetaCopies = cfg_getuint32("BACK_META_KEEP_PREVIOUS",3);
1083
1084 ReconnectionDelay = cfg_getuint32("MASTER_RECONNECTION_DELAY",5);
1085 MetaDLFreq = cfg_getuint32("META_DOWNLOAD_FREQ",24);
1086
1087 if (Timeout>65535) {
1088 Timeout=65535;
1089 }
1090 if (Timeout<10) {
1091 Timeout=10;
1092 }
1093 if (BackLogsNumber<5) {
1094 BackLogsNumber=5;
1095 }
1096 if (BackLogsNumber>10000) {
1097 BackLogsNumber=10000;
1098 }
1099 if (MetaDLFreq>(BackLogsNumber/2)) {
1100 MetaDLFreq=BackLogsNumber/2;
1101 }
1102 if (BackMetaCopies>99) {
1103 BackMetaCopies=99;
1104 }
1105
1106 main_time_change(reconnect_hook,ReconnectionDelay,0);
1107 main_time_change(download_hook,MetaDLFreq*3600,630);
1108 }
1109
masterconn_init(void)1110 int masterconn_init(void) {
1111 uint32_t ReconnectionDelay;
1112 uint32_t MetaDLFreq;
1113 masterconn *eptr;
1114
1115
1116 ReconnectionDelay = cfg_getuint32("MASTER_RECONNECTION_DELAY",5);
1117 MasterHost = cfg_getstr("MASTER_HOST",DEFAULT_MASTERNAME);
1118 MasterPort = cfg_getstr("MASTER_PORT",DEFAULT_MASTER_CONTROL_PORT);
1119 BindHost = cfg_getstr("BIND_HOST","*");
1120 Timeout = cfg_getuint32("MASTER_TIMEOUT",10);
1121 BackLogsNumber = cfg_getuint32("BACK_LOGS",50);
1122 BackMetaCopies = cfg_getuint32("BACK_META_KEEP_PREVIOUS",3);
1123 MetaDLFreq = cfg_getuint32("META_DOWNLOAD_FREQ",24);
1124
1125 if (Timeout>65535) {
1126 Timeout=65535;
1127 }
1128 if (Timeout<10) {
1129 Timeout=10;
1130 }
1131 if (BackLogsNumber<5) {
1132 BackLogsNumber=5;
1133 }
1134 if (BackLogsNumber>10000) {
1135 BackLogsNumber=10000;
1136 }
1137 if (MetaDLFreq>(BackLogsNumber/2)) {
1138 MetaDLFreq=BackLogsNumber/2;
1139 }
1140 eptr = masterconnsingleton = malloc(sizeof(masterconn));
1141 passert(eptr);
1142
1143 eptr->masteraddrvalid = 0;
1144 eptr->mode = FREE;
1145 eptr->pdescpos = -1;
1146 eptr->logfd = NULL;
1147 eptr->metafd = -1;
1148 eptr->oldmode = 0;
1149
1150 masterconn_findlastlogversion();
1151 if (masterconn_initconnect(eptr)<0) {
1152 return -1;
1153 }
1154 reconnect_hook = main_time_register(ReconnectionDelay,0,masterconn_reconnect);
1155 download_hook = main_time_register(MetaDLFreq*3600,630,masterconn_metadownloadinit);
1156 main_destruct_register(masterconn_term);
1157 main_poll_register(masterconn_desc,masterconn_serve);
1158 main_reload_register(masterconn_reload);
1159 main_time_register(1,0,masterconn_metachanges_flush);
1160 return 0;
1161 }
1162