1 /*
2  * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <time.h>
26 #include <stddef.h>
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <sys/uio.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <syslog.h>
36 #include <errno.h>
37 #include <inttypes.h>
38 #include <netinet/in.h>
39 #ifdef HAVE_WRITEV
40 #include <sys/uio.h>
41 #endif
42 
43 #include "MFSCommunication.h"
44 #include "datapack.h"
45 #include "masterconn.h"
46 #include "crc.h"
47 #include "cfg.h"
48 #include "main.h"
49 #include "slogger.h"
50 #include "massert.h"
51 #include "sockets.h"
52 #include "clocks.h"
53 
/* Upper bound on the length of a single packet accepted from the master;
 * masterconn_read checks the announced length against it before allocating.
 * NOTE(review): the ANTOMA_ limit is applied to packets coming FROM the
 * master - confirm this is intended rather than MATOAN_MAXPACKETSIZE. */
#define MaxPacketSize ANTOMA_MAXPACKETSIZE

/* Largest chunk requested by one ANTOMA_DOWNLOAD_REQUEST (see
 * masterconn_download_next): the master's packet limit minus 1000 bytes
 * (presumably room for the DOWNLOAD_DATA header), capped at 1000000 bytes. */
#define META_DL_BLOCK ((((MATOAN_MAXPACKETSIZE) - 1000) < 1000000) ? ((MATOAN_MAXPACKETSIZE) - 1000) : 1000000)

/* Connection states stored in masterconn.mode:
 *   FREE       - no socket; masterconn_reconnect may start a new attempt
 *   CONNECTING - non-blocking connect() in progress
 *   DATA       - connected; packets are read, parsed and written
 *   KILL       - to be torn down by masterconn_disconnection_check */
enum {FREE,CONNECTING,DATA,KILL};
60 
/* One queued outgoing packet: data[] holds the 8-byte header (type, length)
 * followed by the payload. Allocated as offsetof(out_packetstruct,data)+size,
 * so data[] is an over-allocated trailing array. */
typedef struct out_packetstruct {
	struct out_packetstruct *next;	// next packet in the output queue
	uint8_t *startptr;	// first byte not yet written to the socket
	uint32_t bytesleft;	// bytes from startptr still to be written
	uint8_t data[1];
} out_packetstruct;

/* One fully received incoming packet waiting in the input queue. The header
 * is decoded into type/leng; data[] holds only the leng payload bytes. */
typedef struct in_packetstruct {
	struct in_packetstruct *next;	// next packet in the input queue
	uint32_t type,leng;
	uint8_t data[1];
} in_packetstruct;
73 
/* State of the single connection to the master, together with the changelog
 * file being appended to and any file download in progress. */
typedef struct masterconn {
	uint8_t mode;	// FREE / CONNECTING / DATA / KILL
	int sock;	// socket descriptor, -1 when closed
	int32_t pdescpos;	// index of sock in the poll array, -1 when not polled
	double lastread,lastwrite,conntime;	// monotonic_seconds() of last read, last write, connect start
	uint8_t input_hdr[8];	// incoming 8-byte packet header (type, length)
	uint8_t *input_startptr;	// where the next received bytes are copied
	uint32_t input_bytesleft;	// bytes still missing from the current header or body
	uint8_t input_end;	// no more input will be accepted (EOF, error, oversized packet)
	in_packetstruct *input_packet;	// packet whose body is being received; NULL while reading a header
	in_packetstruct *inputhead,**inputtail;	// complete packets waiting for masterconn_parse
	out_packetstruct *outputhead,**outputtail;	// packets waiting for masterconn_write

	uint32_t bindip;	// local address to bind to, 0 = do not bind
	uint32_t masterip;
	uint16_t masterport;
	uint8_t masteraddrvalid;	// bindip/masterip/masterport resolved; 0 forces a new lookup

	uint8_t downloadretrycnt;	// consecutive failed chunk attempts in the current download
	uint8_t downloading;	// file being downloaded: 0 none, 1 metadata, 11/12 changelogs
	uint8_t oldmode;	// old master detected: do not request changelogs after metadata
	FILE *logfd;	// using stdio because this is text file
	int metafd;	// using standard unix I/O because this is binary file
	uint64_t filesize;	// size announced by MATOAN_DOWNLOAD_INFO
	uint64_t dloffset;	// bytes successfully received and stored so far
	uint64_t dlstartuts;	// monotonic_useconds() when the download started
} masterconn;
101 
/* The only connection object of this module (allocated in masterconn_init). */
static masterconn *masterconnsingleton=NULL;

// from config
static uint32_t BackLogsNumber;	// BACK_LOGS: rotated changelog_ml.N.mfs files kept (5..10000)
static uint32_t BackMetaCopies;	// BACK_META_KEEP_PREVIOUS: metadata_ml.mfs.back.N copies kept
static char *MasterHost;	// MASTER_HOST
static char *MasterPort;	// MASTER_PORT
static char *BindHost;	// BIND_HOST
static uint32_t Timeout;	// MASTER_TIMEOUT in seconds (10..65535)
static void *reconnect_hook;	// timer handle running masterconn_reconnect
static void *download_hook;	// timer handle running masterconn_metadownloadinit
static uint64_t lastlogversion=0;	// version of the last change stored locally, 0 = unknown

// byte counters reported and reset by masterconn_stats
static uint32_t stats_bytesout=0;
static uint32_t stats_bytesin=0;
117 
masterconn_stats(uint32_t * bin,uint32_t * bout)118 void masterconn_stats(uint32_t *bin,uint32_t *bout) {
119 	*bin = stats_bytesin;
120 	*bout = stats_bytesout;
121 	stats_bytesin = 0;
122 	stats_bytesout = 0;
123 }
124 
masterconn_findlastlogversion(void)125 void masterconn_findlastlogversion(void) {
126 	struct stat st;
127 	uint8_t buff[32800];	// 32800 = 32768 + 32
128 	uint64_t size;
129 	uint32_t buffpos;
130 	uint64_t lastnewline;
131 	int fd;
132 
133 	lastlogversion = 0;
134 
135 	if (stat("metadata_ml.mfs.back",&st)<0 || st.st_size==0 || (st.st_mode & S_IFMT)!=S_IFREG) {
136 		return;
137 	}
138 
139 	fd = open("changelog_ml.0.back",O_RDWR);
140 	if (fd<0) {
141 		return;
142 	}
143 	fstat(fd,&st);
144 	size = st.st_size;
145 	memset(buff,0,32);
146 	lastnewline = 0;
147 	while (size>0 && size+200000>(uint64_t)(st.st_size)) {
148 		if (size>32768) {
149 			memcpy(buff+32768,buff,32);
150 			size-=32768;
151 			lseek(fd,size,SEEK_SET);
152 			if (read(fd,buff,32768)!=32768) {
153 				lastlogversion = 0;
154 				close(fd);
155 				return;
156 			}
157 			buffpos = 32768;
158 		} else {
159 			memmove(buff+size,buff,32);
160 			lseek(fd,0,SEEK_SET);
161 			if (read(fd,buff,size)!=(ssize_t)size) {
162 				lastlogversion = 0;
163 				close(fd);
164 				return;
165 			}
166 			buffpos = size;
167 			size = 0;
168 		}
169 		// size = position in file of first byte in buff
170 		// buffpos = position of last byte in buff to search
171 		while (buffpos>0) {
172 			buffpos--;
173 			if (buff[buffpos]=='\n') {
174 				if (lastnewline==0) {
175 					lastnewline = size + buffpos;
176 				} else {
177 					if (lastnewline+1 != (uint64_t)(st.st_size)) {	// garbage at the end of file - truncate
178 						if (ftruncate(fd,lastnewline+1)<0) {
179 							lastlogversion = 0;
180 							close(fd);
181 							return;
182 						}
183 					}
184 					buffpos++;
185 					while (buffpos<32800 && buff[buffpos]>='0' && buff[buffpos]<='9') {
186 						lastlogversion *= 10;
187 						lastlogversion += buff[buffpos]-'0';
188 						buffpos++;
189 					}
190 					if (buffpos==32800 || buff[buffpos]!=':') {
191 						lastlogversion = 0;
192 					}
193 					close(fd);
194 					return;
195 				}
196 			}
197 		}
198 	}
199 	close(fd);
200 	return;
201 }
202 
masterconn_createpacket(masterconn * eptr,uint32_t type,uint32_t size)203 uint8_t* masterconn_createpacket(masterconn *eptr,uint32_t type,uint32_t size) {
204 	out_packetstruct *outpacket;
205 	uint8_t *ptr;
206 	uint32_t psize;
207 
208 	psize = size+8;
209 	outpacket=malloc(offsetof(out_packetstruct,data)+psize);
210 	passert(outpacket);
211 	outpacket->bytesleft = psize;
212 	ptr = outpacket->data;
213 	put32bit(&ptr,type);
214 	put32bit(&ptr,size);
215 	outpacket->startptr = outpacket->data;
216 	outpacket->next = NULL;
217 	*(eptr->outputtail) = outpacket;
218 	eptr->outputtail = &(outpacket->next);
219 	return ptr;
220 }
221 
masterconn_sendregister(masterconn * eptr)222 void masterconn_sendregister(masterconn *eptr) {
223 	uint8_t *buff;
224 
225 	eptr->downloading=0;
226 	eptr->metafd=-1;
227 	eptr->logfd=NULL;
228 
229 	if (lastlogversion>0) {
230 		buff = masterconn_createpacket(eptr,ANTOMA_REGISTER,1+4+2+8);
231 		put8bit(&buff,2);
232 		put16bit(&buff,VERSMAJ);
233 		put8bit(&buff,VERSMID);
234 		put8bit(&buff,VERSMIN);
235 		put16bit(&buff,Timeout);
236 		put64bit(&buff,lastlogversion);
237 	} else {
238 		buff = masterconn_createpacket(eptr,ANTOMA_REGISTER,1+4+2);
239 		put8bit(&buff,1);
240 		put16bit(&buff,VERSMAJ);
241 		put8bit(&buff,VERSMID);
242 		put8bit(&buff,VERSMIN);
243 		put16bit(&buff,Timeout);
244 	}
245 }
246 
247 
masterconn_metachanges_log(masterconn * eptr,const uint8_t * data,uint32_t length)248 void masterconn_metachanges_log(masterconn *eptr,const uint8_t *data,uint32_t length) {
249 	char logname1[100],logname2[100];
250 	uint32_t i;
251 	uint64_t version;
252 	if (length==1 && data[0]==0x55) {
253 		if (eptr->logfd!=NULL) {
254 			fclose(eptr->logfd);
255 			eptr->logfd=NULL;
256 		}
257 		if (BackLogsNumber>0) {
258 			for (i=BackLogsNumber ; i>0 ; i--) {
259 				snprintf(logname1,100,"changelog_ml.%"PRIu32".mfs",i);
260 				snprintf(logname2,100,"changelog_ml.%"PRIu32".mfs",i-1);
261 				rename(logname2,logname1);
262 			}
263 		} else {
264 			unlink("changelog_ml.0.mfs");
265 		}
266 		return;
267 	}
268 	if (length<10) {
269 		syslog(LOG_NOTICE,"MATOAN_METACHANGES_LOG - wrong size (%"PRIu32"/9+data)",length);
270 		eptr->mode = KILL;
271 		return;
272 	}
273 	if (data[0]!=0xFF) {
274 		syslog(LOG_NOTICE,"MATOAN_METACHANGES_LOG - wrong packet");
275 		eptr->mode = KILL;
276 		return;
277 	}
278 	if (data[length-1]!='\0') {
279 		syslog(LOG_NOTICE,"MATOAN_METACHANGES_LOG - invalid string");
280 		eptr->mode = KILL;
281 		return;
282 	}
283 
284 	data++;
285 	version = get64bit(&data);
286 
287 	if (lastlogversion>0 && version!=lastlogversion+1) {
288 		syslog(LOG_WARNING, "some changes lost: [%"PRIu64"-%"PRIu64"], download metadata again",lastlogversion,version-1);
289 		if (eptr->logfd!=NULL) {
290 			fclose(eptr->logfd);
291 			eptr->logfd=NULL;
292 		}
293 		for (i=0 ; i<=BackLogsNumber ; i++) {
294 			snprintf(logname1,100,"changelog_ml.%"PRIu32".mfs",i);
295 			unlink(logname1);
296 		}
297 		lastlogversion = 0;
298 		eptr->mode = KILL;
299 		return;
300 	}
301 
302 	if (eptr->logfd==NULL) {
303 		eptr->logfd = fopen("changelog_ml.0.mfs","a");
304 	}
305 
306 	if (eptr->logfd) {
307 		fprintf(eptr->logfd,"%"PRIu64": %s\n",version,data);
308 		lastlogversion = version;
309 	} else {
310 		syslog(LOG_NOTICE,"lost MFS change %"PRIu64": %s",version,data);
311 	}
312 }
313 
masterconn_metachanges_flush(void)314 void masterconn_metachanges_flush(void) {
315 	masterconn *eptr = masterconnsingleton;
316 	if (eptr->logfd) {
317 		fflush(eptr->logfd);
318 	}
319 }
320 
masterconn_download_end(masterconn * eptr)321 int masterconn_download_end(masterconn *eptr) {
322 	eptr->downloading=0;
323 	masterconn_createpacket(eptr,ANTOMA_DOWNLOAD_END,0);
324 	if (eptr->metafd>=0) {
325 		if (close(eptr->metafd)<0) {
326 			mfs_errlog_silent(LOG_NOTICE,"error closing metafile");
327 			eptr->metafd=-1;
328 			return -1;
329 		}
330 		eptr->metafd=-1;
331 	}
332 	return 0;
333 }
334 
masterconn_download_init(masterconn * eptr,uint8_t filenum)335 void masterconn_download_init(masterconn *eptr,uint8_t filenum) {
336 	uint8_t *ptr;
337 //	syslog(LOG_NOTICE,"download_init %d",filenum);
338 	if (eptr->mode==DATA && eptr->downloading==0) {
339 //		syslog(LOG_NOTICE,"sending packet");
340 		ptr = masterconn_createpacket(eptr,ANTOMA_DOWNLOAD_START,1);
341 		put8bit(&ptr,filenum);
342 		eptr->downloading=filenum;
343 	}
344 }
345 
masterconn_metadownloadinit(void)346 void masterconn_metadownloadinit(void) {
347 	masterconn_download_init(masterconnsingleton,1);
348 }
349 
masterconn_metadata_check(char * name)350 int masterconn_metadata_check(char *name) {
351 	int fd;
352 	char chkbuff[16];
353 	char eofmark[16];
354 	const uint8_t *rptr;
355 	uint64_t metaversion,metaid;
356 	fd = open(name,O_RDONLY);
357 	if (fd<0) {
358 		syslog(LOG_WARNING,"can't open downloaded metadata");
359 		return -1;
360 	}
361 	if (read(fd,chkbuff,8)!=8) {
362 		syslog(LOG_WARNING,"can't read downloaded metadata");
363 		close(fd);
364 		return -1;
365 	}
366 	if (memcmp(chkbuff,"MFSM NEW",8)==0) { // silently ignore "new file"
367 		close(fd);
368 		return -1;
369 	}
370 	if (memcmp(chkbuff,MFSSIGNATURE "M ",5)==0 && chkbuff[5]>='1' && chkbuff[5]<='9' && chkbuff[6]=='.' && chkbuff[7]>='0' && chkbuff[7]<='9') {
371 		uint8_t fver = ((chkbuff[5]-'0')<<4)+(chkbuff[7]-'0');
372 		if (fver<0x17) {
373 			memset(eofmark,0,16);
374 		} else {
375 			memcpy(eofmark,"[MFS EOF MARKER]",16);
376 			if (fver>=0x20) {
377 				if (read(fd,chkbuff,16)!=16) {
378 					syslog(LOG_WARNING,"can't read downloaded metadata");
379 					close(fd);
380 					return -1;
381 				}
382 				rptr = (uint8_t*)chkbuff;
383 				metaversion = get64bit(&rptr);
384 				metaid = get64bit(&rptr);
385 				syslog(LOG_NOTICE,"meta data version: %"PRIu64", meta data id: 0x%016"PRIX64,metaversion,metaid);
386 			}
387 		}
388 	} else {
389 		syslog(LOG_WARNING,"bad metadata file format");
390 		close(fd);
391 		return -1;
392 	}
393 	lseek(fd,-16,SEEK_END);
394 	if (read(fd,chkbuff,16)!=16) {
395 		syslog(LOG_WARNING,"can't read downloaded metadata");
396 		close(fd);
397 		return -1;
398 	}
399 	close(fd);
400 	if (memcmp(chkbuff,eofmark,16)!=0) {
401 		syslog(LOG_WARNING,"truncated metadata file !!!");
402 		return -1;
403 	}
404 	return 0;
405 }
406 
masterconn_download_next(masterconn * eptr)407 void masterconn_download_next(masterconn *eptr) {
408 	uint8_t *ptr;
409 	uint8_t filenum;
410 	int64_t dltime;
411 	if (eptr->dloffset>=eptr->filesize) {	// end of file
412 		filenum = eptr->downloading;
413 		if (masterconn_download_end(eptr)<0) {
414 			return;
415 		}
416 		dltime = monotonic_useconds()-eptr->dlstartuts;
417 		if (dltime<=0) {
418 			dltime=1;
419 		}
420 		syslog(LOG_NOTICE,"%s downloaded %"PRIu64"B/%"PRIu64".%06"PRIu32"s (%.3lf MB/s)",(filenum==1)?"metadata":(filenum==11)?"changelog_0":(filenum==12)?"changelog_1":"???",eptr->filesize,dltime/1000000,(uint32_t)(dltime%1000000),(double)(eptr->filesize)/(double)(dltime));
421 		if (filenum==1) {
422 			if (masterconn_metadata_check("metadata_ml.tmp")==0) {
423 				if (BackMetaCopies>0) {
424 					char metaname1[100],metaname2[100];
425 					int i;
426 					for (i=BackMetaCopies-1 ; i>0 ; i--) {
427 						snprintf(metaname1,100,"metadata_ml.mfs.back.%"PRIu32,i+1);
428 						snprintf(metaname2,100,"metadata_ml.mfs.back.%"PRIu32,i);
429 						rename(metaname2,metaname1);
430 					}
431 					rename("metadata_ml.mfs.back","metadata_ml.mfs.back.1");
432 				}
433 				if (rename("metadata_ml.tmp","metadata_ml.mfs.back")<0) {
434 					syslog(LOG_NOTICE,"can't rename downloaded metadata - do it manually before next download");
435 				}
436 			}
437 			if (eptr->oldmode==0) {
438 				masterconn_download_init(eptr,11);
439 			}
440 		} else if (filenum==11) {
441 			if (rename("changelog_ml.tmp","changelog_ml_back.0.mfs")<0) {
442 				syslog(LOG_NOTICE,"can't rename downloaded changelog - do it manually before next download");
443 			}
444 			masterconn_download_init(eptr,12);
445 		} else if (filenum==12) {
446 			if (rename("changelog_ml.tmp","changelog_ml_back.1.mfs")<0) {
447 				syslog(LOG_NOTICE,"can't rename downloaded changelog - do it manually before next download");
448 			}
449 		}
450 	} else {	// send request for next data packet
451 		ptr = masterconn_createpacket(eptr,ANTOMA_DOWNLOAD_REQUEST,12);
452 		put64bit(&ptr,eptr->dloffset);
453 		if (eptr->filesize-eptr->dloffset>META_DL_BLOCK) {
454 			put32bit(&ptr,META_DL_BLOCK);
455 		} else {
456 			put32bit(&ptr,eptr->filesize-eptr->dloffset);
457 		}
458 	}
459 }
460 
masterconn_download_info(masterconn * eptr,const uint8_t * data,uint32_t length)461 void masterconn_download_info(masterconn *eptr,const uint8_t *data,uint32_t length) {
462 	if (length!=1 && length!=8) {
463 		syslog(LOG_NOTICE,"MATOAN_DOWNLOAD_INFO - wrong size (%"PRIu32"/1|8)",length);
464 		eptr->mode = KILL;
465 		return;
466 	}
467 	passert(data);
468 	if (length==1) {
469 		eptr->downloading = 0;
470 		syslog(LOG_NOTICE,"download start error");
471 		return;
472 	}
473 	eptr->filesize = get64bit(&data);
474 	eptr->dloffset = 0;
475 	eptr->downloadretrycnt = 0;
476 	eptr->dlstartuts = monotonic_useconds();
477 	if (eptr->downloading==1) {
478 		eptr->metafd = open("metadata_ml.tmp",O_WRONLY | O_TRUNC | O_CREAT,0666);
479 	} else if (eptr->downloading==11 || eptr->downloading==12) {
480 		eptr->metafd = open("changelog_ml.tmp",O_WRONLY | O_TRUNC | O_CREAT,0666);
481 	} else {
482 		syslog(LOG_NOTICE,"unexpected MATOAN_DOWNLOAD_INFO packet");
483 		eptr->mode = KILL;
484 		return;
485 	}
486 	if (eptr->metafd<0) {
487 		mfs_errlog_silent(LOG_NOTICE,"error opening metafile");
488 		masterconn_download_end(eptr);
489 		return;
490 	}
491 	masterconn_download_next(eptr);
492 }
493 
masterconn_download_data(masterconn * eptr,const uint8_t * data,uint32_t length)494 void masterconn_download_data(masterconn *eptr,const uint8_t *data,uint32_t length) {
495 	uint64_t offset;
496 	uint32_t leng;
497 	uint32_t crc;
498 	ssize_t ret;
499 	if (eptr->metafd<0) {
500 		syslog(LOG_NOTICE,"MATOAN_DOWNLOAD_DATA - file not opened");
501 		eptr->mode = KILL;
502 		return;
503 	}
504 	if (length<16) {
505 		syslog(LOG_NOTICE,"MATOAN_DOWNLOAD_DATA - wrong size (%"PRIu32"/16+data)",length);
506 		eptr->mode = KILL;
507 		return;
508 	}
509 	passert(data);
510 	offset = get64bit(&data);
511 	leng = get32bit(&data);
512 	crc = get32bit(&data);
513 	if (leng+16!=length) {
514 		syslog(LOG_NOTICE,"MATOAN_DOWNLOAD_DATA - wrong size (%"PRIu32"/16+%"PRIu32")",length,leng);
515 		eptr->mode = KILL;
516 		return;
517 	}
518 	if (offset!=eptr->dloffset) {
519 		syslog(LOG_NOTICE,"MATOAN_DOWNLOAD_DATA - unexpected file offset (%"PRIu64"/%"PRIu64")",offset,eptr->dloffset);
520 		eptr->mode = KILL;
521 		return;
522 	}
523 	if (offset+leng>eptr->filesize) {
524 		syslog(LOG_NOTICE,"MATOAN_DOWNLOAD_DATA - unexpected file size (%"PRIu64"/%"PRIu64")",offset+leng,eptr->filesize);
525 		eptr->mode = KILL;
526 		return;
527 	}
528 #ifdef HAVE_PWRITE
529 	ret = pwrite(eptr->metafd,data,leng,offset);
530 #else /* HAVE_PWRITE */
531 	lseek(eptr->metafd,offset,SEEK_SET);
532 	ret = write(eptr->metafd,data,leng);
533 #endif /* HAVE_PWRITE */
534 	if (ret!=(ssize_t)leng) {
535 		mfs_errlog_silent(LOG_NOTICE,"error writing metafile");
536 		if (eptr->downloadretrycnt>=5) {
537 			masterconn_download_end(eptr);
538 		} else {
539 			eptr->downloadretrycnt++;
540 			masterconn_download_next(eptr);
541 		}
542 		return;
543 	}
544 	if (crc!=mycrc32(0,data,leng)) {
545 		syslog(LOG_NOTICE,"metafile data crc error");
546 		if (eptr->downloadretrycnt>=5) {
547 			masterconn_download_end(eptr);
548 		} else {
549 			eptr->downloadretrycnt++;
550 			masterconn_download_next(eptr);
551 		}
552 		return;
553 	}
554 	if (fsync(eptr->metafd)<0) {
555 		mfs_errlog_silent(LOG_NOTICE,"error syncing metafile");
556 		if (eptr->downloadretrycnt>=5) {
557 			masterconn_download_end(eptr);
558 		} else {
559 			eptr->downloadretrycnt++;
560 			masterconn_download_next(eptr);
561 		}
562 		return;
563 	}
564 	eptr->dloffset+=leng;
565 	eptr->downloadretrycnt=0;
566 	masterconn_download_next(eptr);
567 }
568 
masterconn_beforeclose(masterconn * eptr)569 void masterconn_beforeclose(masterconn *eptr) {
570 	if (eptr->downloading==11 || eptr->downloading==12) {	// old (version less than 1.6.18) master patch
571 		syslog(LOG_WARNING,"old master detected - please upgrade your master server and then restart metalogger");
572 		eptr->oldmode=1;
573 	}
574 	if (eptr->metafd>=0) {
575 		close(eptr->metafd);
576 		eptr->metafd=-1;
577 		unlink("metadata_ml.tmp");
578 		unlink("changelog_ml.tmp");
579 	}
580 	if (eptr->logfd) {
581 		fclose(eptr->logfd);
582 		eptr->logfd = NULL;
583 	}
584 }
585 
masterconn_gotpacket(masterconn * eptr,uint32_t type,const uint8_t * data,uint32_t length)586 void masterconn_gotpacket(masterconn *eptr,uint32_t type,const uint8_t *data,uint32_t length) {
587 	switch (type) {
588 		case ANTOAN_NOP:
589 			break;
590 		case ANTOAN_UNKNOWN_COMMAND: // for future use
591 			break;
592 		case ANTOAN_BAD_COMMAND_SIZE: // for future use
593 			break;
594 		case MATOAN_METACHANGES_LOG:
595 			masterconn_metachanges_log(eptr,data,length);
596 			break;
597 		case MATOAN_DOWNLOAD_INFO:
598 			masterconn_download_info(eptr,data,length);
599 			break;
600 		case MATOAN_DOWNLOAD_DATA:
601 			masterconn_download_data(eptr,data,length);
602 			break;
603 		default:
604 			syslog(LOG_NOTICE,"got unknown message (type:%"PRIu32")",type);
605 			eptr->mode = KILL;
606 	}
607 }
608 
masterconn_connected(masterconn * eptr)609 void masterconn_connected(masterconn *eptr) {
610 	double now;
611 
612 	now = monotonic_seconds();
613 	tcpnodelay(eptr->sock);
614 	eptr->mode = DATA;
615 	eptr->lastread = now;
616 	eptr->lastwrite = now;
617 	eptr->input_bytesleft = 8;
618 	eptr->input_startptr = eptr->input_hdr;
619 	eptr->input_end = 0;
620 	eptr->input_packet = NULL;
621 	eptr->inputhead = NULL;
622 	eptr->inputtail = &(eptr->inputhead);
623 	eptr->outputhead = NULL;
624 	eptr->outputtail = &(eptr->outputhead);
625 
626 	masterconn_sendregister(eptr);
627 	if (lastlogversion==0) {
628 		masterconn_metadownloadinit();
629 	}
630 }
631 
masterconn_initconnect(masterconn * eptr)632 int masterconn_initconnect(masterconn *eptr) {
633 	int status;
634 	if (eptr->masteraddrvalid==0) {
635 		uint32_t mip,bip;
636 		uint16_t mport;
637 		if (tcpresolve(BindHost,NULL,&bip,NULL,1)<0) {
638 			bip = 0;
639 		}
640 		eptr->bindip = bip;
641 		if (tcpresolve(MasterHost,MasterPort,&mip,&mport,0)>=0) {
642 			eptr->masterip = mip;
643 			eptr->masterport = mport;
644 			eptr->masteraddrvalid = 1;
645 		} else {
646 			mfs_arg_syslog(LOG_WARNING,"can't resolve master host/port (%s:%s)",MasterHost,MasterPort);
647 			return -1;
648 		}
649 	}
650 	eptr->sock=tcpsocket();
651 	if (eptr->sock<0) {
652 		mfs_errlog(LOG_WARNING,"create socket, error");
653 		return -1;
654 	}
655 	if (tcpnonblock(eptr->sock)<0) {
656 		mfs_errlog(LOG_WARNING,"set nonblock, error");
657 		tcpclose(eptr->sock);
658 		eptr->sock = -1;
659 		return -1;
660 	}
661 	if (eptr->bindip>0) {
662 		if (tcpnumbind(eptr->sock,eptr->bindip,0)<0) {
663 			mfs_errlog(LOG_WARNING,"can't bind socket to given ip");
664 			tcpclose(eptr->sock);
665 			eptr->sock = -1;
666 			return -1;
667 		}
668 	}
669 	status = tcpnumconnect(eptr->sock,eptr->masterip,eptr->masterport);
670 	if (status<0) {
671 		mfs_errlog(LOG_WARNING,"connect failed, error");
672 		tcpclose(eptr->sock);
673 		eptr->sock = -1;
674 		eptr->masteraddrvalid = 0;
675 		return -1;
676 	}
677 	if (status==0) {
678 		syslog(LOG_NOTICE,"connected to Master immediately");
679 		masterconn_connected(eptr);
680 	} else {
681 		eptr->mode = CONNECTING;
682 		eptr->conntime = monotonic_seconds();
683 		syslog(LOG_NOTICE,"connecting ...");
684 	}
685 	return 0;
686 }
687 
masterconn_connecttimeout(masterconn * eptr)688 void masterconn_connecttimeout(masterconn *eptr) {
689 	syslog(LOG_WARNING,"connection timed out");
690 	tcpclose(eptr->sock);
691 	eptr->sock = -1;
692 	eptr->mode = FREE;
693 	eptr->masteraddrvalid = 0;
694 }
695 
masterconn_connecttest(masterconn * eptr)696 void masterconn_connecttest(masterconn *eptr) {
697 	int status;
698 
699 	status = tcpgetstatus(eptr->sock);
700 	if (status) {
701 		mfs_errlog_silent(LOG_WARNING,"connection failed, error");
702 		tcpclose(eptr->sock);
703 		eptr->sock = -1;
704 		eptr->mode = FREE;
705 		eptr->masteraddrvalid = 0;
706 	} else {
707 		syslog(LOG_NOTICE,"connected to Master");
708 		masterconn_connected(eptr);
709 	}
710 }
711 
masterconn_read(masterconn * eptr,double now)712 void masterconn_read(masterconn *eptr,double now) {
713 	int32_t i;
714 	uint32_t type,leng;
715 	const uint8_t *ptr;
716 	uint32_t rbleng,rbpos;
717 	uint8_t err,hup;
718 	static uint8_t *readbuff = NULL;
719 	static uint32_t readbuffsize = 0;
720 
721 	if (eptr == NULL) {
722 		if (readbuff != NULL) {
723 			free(readbuff);
724 		}
725 		readbuff = NULL;
726 		readbuffsize = 0;
727 		return;
728 	}
729 
730 	if (readbuffsize==0) {
731 		readbuffsize = 65536;
732 		readbuff = malloc(readbuffsize);
733 		passert(readbuff);
734 	}
735 
736 	rbleng = 0;
737 	err = 0;
738 	hup = 0;
739 	for (;;) {
740 		i = read(eptr->sock,readbuff+rbleng,readbuffsize-rbleng);
741 		if (i==0) {
742 			hup = 1;
743 			break;
744 		} else if (i<0) {
745 			if (ERRNO_ERROR) {
746 				err = 1;
747 			}
748 			break;
749 		} else {
750 			stats_bytesin+=i;
751 			rbleng += i;
752 			if (rbleng==readbuffsize) {
753 				readbuffsize*=2;
754 				readbuff = realloc(readbuff,readbuffsize);
755 				passert(readbuff);
756 			} else {
757 				break;
758 			}
759 		}
760 	}
761 
762 	if (rbleng>0) {
763 		eptr->lastread = now;
764 	}
765 
766 	rbpos = 0;
767 	while (rbpos<rbleng) {
768 		if ((rbleng-rbpos)>=eptr->input_bytesleft) {
769 			memcpy(eptr->input_startptr,readbuff+rbpos,eptr->input_bytesleft);
770 			i = eptr->input_bytesleft;
771 		} else {
772 			memcpy(eptr->input_startptr,readbuff+rbpos,rbleng-rbpos);
773 			i = rbleng-rbpos;
774 		}
775 		rbpos += i;
776 		eptr->input_startptr+=i;
777 		eptr->input_bytesleft-=i;
778 
779 		if (eptr->input_bytesleft>0) {
780 			break;
781 		}
782 
783 		if (eptr->input_packet == NULL) {
784 			ptr = eptr->input_hdr;
785 			type = get32bit(&ptr);
786 			leng = get32bit(&ptr);
787 
788 			if (leng>MaxPacketSize) {
789 				syslog(LOG_WARNING,"Master packet too long (%"PRIu32"/%u)",leng,MaxPacketSize);
790 				eptr->input_end = 1;
791 				return;
792 			}
793 
794 			eptr->input_packet = malloc(offsetof(in_packetstruct,data)+leng);
795 			passert(eptr->input_packet);
796 			eptr->input_packet->next = NULL;
797 			eptr->input_packet->type = type;
798 			eptr->input_packet->leng = leng;
799 
800 			eptr->input_startptr = eptr->input_packet->data;
801 			eptr->input_bytesleft = leng;
802 		}
803 
804 		if (eptr->input_bytesleft>0) {
805 			continue;
806 		}
807 
808 		if (eptr->input_packet != NULL) {
809 			*(eptr->inputtail) = eptr->input_packet;
810 			eptr->inputtail = &(eptr->input_packet->next);
811 			eptr->input_packet = NULL;
812 			eptr->input_bytesleft = 8;
813 			eptr->input_startptr = eptr->input_hdr;
814 		}
815 	}
816 
817 	if (hup) {
818 		syslog(LOG_NOTICE,"connection was reset by Master");
819 		eptr->input_end = 1;
820 	} else if (err) {
821 		mfs_errlog_silent(LOG_NOTICE,"read from Master error");
822 		eptr->input_end = 1;
823 	}
824 }
825 
masterconn_parse(masterconn * eptr)826 void masterconn_parse(masterconn *eptr) {
827 	in_packetstruct *ipack;
828 	uint64_t starttime;
829 	uint64_t currtime;
830 
831 	starttime = monotonic_useconds();
832 	currtime = starttime;
833 	while (eptr->mode==DATA && (ipack = eptr->inputhead)!=NULL && starttime+10000>currtime) {
834 		masterconn_gotpacket(eptr,ipack->type,ipack->data,ipack->leng);
835 		eptr->inputhead = ipack->next;
836 		free(ipack);
837 		if (eptr->inputhead==NULL) {
838 			eptr->inputtail = &(eptr->inputhead);
839 		} else {
840 			currtime = monotonic_useconds();
841 		}
842 	}
843 	if (eptr->mode==DATA && eptr->inputhead==NULL && eptr->input_end) {
844 		eptr->mode = KILL;
845 	}
846 }
847 
masterconn_write(masterconn * eptr,double now)848 void masterconn_write(masterconn *eptr,double now) {
849 	out_packetstruct *opack;
850 	int32_t i;
851 #ifdef HAVE_WRITEV
852 	struct iovec iovtab[100];
853 	uint32_t iovdata;
854 	uint32_t leng;
855 	uint32_t left;
856 
857 	for (;;) {
858 		leng = 0;
859 		for (iovdata=0,opack=eptr->outputhead ; iovdata<100 && opack!=NULL ; iovdata++,opack=opack->next) {
860 			iovtab[iovdata].iov_base = opack->startptr;
861 			iovtab[iovdata].iov_len = opack->bytesleft;
862 			leng += opack->bytesleft;
863 		}
864 		if (iovdata==0) {
865 			return;
866 		}
867 		i = writev(eptr->sock,iovtab,iovdata);
868 		if (i<0) {
869 			if (ERRNO_ERROR) {
870 				mfs_errlog_silent(LOG_NOTICE,"write to Master error");
871 				eptr->mode = KILL;
872 			}
873 			return;
874 		}
875 		if (i>0) {
876 			eptr->lastwrite = now;
877 		}
878 		stats_bytesout+=i;
879 		left = i;
880 		while (left>0 && eptr->outputhead!=NULL) {
881 			opack = eptr->outputhead;
882 			if (opack->bytesleft>left) {
883 				opack->startptr+=left;
884 				opack->bytesleft-=left;
885 				left = 0;
886 			} else {
887 				left -= opack->bytesleft;
888 				eptr->outputhead = opack->next;
889 				if (eptr->outputhead==NULL) {
890 					eptr->outputtail = &(eptr->outputhead);
891 				}
892 				free(opack);
893 			}
894 		}
895 		if ((uint32_t)i < leng) {
896 			return;
897 		}
898 	}
899 #else
900 	for (;;) {
901 		opack = eptr->outputhead;
902 		if (opack==NULL) {
903 			return;
904 		}
905 		i=write(eptr->sock,opack->startptr,opack->bytesleft);
906 		if (i<0) {
907 			if (ERRNO_ERROR) {
908 				mfs_errlog_silent(LOG_NOTICE,"write to Master error");
909 				eptr->mode = KILL;
910 			}
911 			return;
912 		}
913 		if (i>0) {
914 			eptr->lastwrite = now;
915 		}
916 		stats_bytesout+=i;
917 		opack->startptr+=i;
918 		opack->bytesleft-=i;
919 		if (opack->bytesleft>0) {
920 			return;
921 		}
922 		eptr->outputhead = opack->next;
923 		if (eptr->outputhead==NULL) {
924 			eptr->outputtail = &(eptr->outputhead);
925 		}
926 		free(opack);
927 	}
928 #endif
929 }
930 
931 
masterconn_desc(struct pollfd * pdesc,uint32_t * ndesc)932 void masterconn_desc(struct pollfd *pdesc,uint32_t *ndesc) {
933 	uint32_t pos = *ndesc;
934 	masterconn *eptr = masterconnsingleton;
935 
936 	eptr->pdescpos = -1;
937 	if (eptr->mode==FREE || eptr->sock<0) {
938 		return;
939 	}
940 	pdesc[pos].events = 0;
941 	if (eptr->mode==DATA && eptr->input_end==0) {
942 		pdesc[pos].events |= POLLIN;
943 	}
944 	if ((eptr->mode==DATA && eptr->outputhead!=NULL) || eptr->mode==CONNECTING) {
945 		pdesc[pos].events |= POLLOUT;
946 	}
947 	if (pdesc[pos].events!=0) {
948 		pdesc[pos].fd = eptr->sock;
949 		eptr->pdescpos = pos;
950 		pos++;
951 	}
952 	*ndesc = pos;
953 }
954 
masterconn_disconnection_check(void)955 void masterconn_disconnection_check(void) {
956 	masterconn *eptr = masterconnsingleton;
957 	in_packetstruct *ipptr,*ipaptr;
958 	out_packetstruct *opptr,*opaptr;
959 
960 	if (eptr->mode == KILL) {
961 		masterconn_beforeclose(eptr);
962 		tcpclose(eptr->sock);
963 		if (eptr->input_packet) {
964 			free(eptr->input_packet);
965 		}
966 		ipptr = eptr->inputhead;
967 		while (ipptr) {
968 			ipaptr = ipptr;
969 			ipptr = ipptr->next;
970 			free(ipaptr);
971 		}
972 		opptr = eptr->outputhead;
973 		while (opptr) {
974 			opaptr = opptr;
975 			opptr = opptr->next;
976 			free(opaptr);
977 		}
978 		eptr->mode = FREE;
979 	}
980 }
981 
masterconn_serve(struct pollfd * pdesc)982 void masterconn_serve(struct pollfd *pdesc) {
983 	double now;
984 	masterconn *eptr = masterconnsingleton;
985 
986 	now = monotonic_seconds();
987 
988 	if (eptr->mode==CONNECTING) {
989 		if (eptr->sock>=0 && eptr->pdescpos>=0 && (pdesc[eptr->pdescpos].revents & (POLLOUT | POLLHUP | POLLERR))) { // FD_ISSET(eptr->sock,wset)) {
990 			masterconn_connecttest(eptr);
991 		} else if (eptr->conntime+1.0 < now) {
992 			masterconn_connecttimeout(eptr);
993 		}
994 	} else {
995 		if (eptr->pdescpos>=0) {
996 			if ((pdesc[eptr->pdescpos].revents & (POLLERR|POLLIN))==POLLIN && eptr->mode==DATA) {
997 				masterconn_read(eptr,now);
998 			}
999 			if (pdesc[eptr->pdescpos].revents & (POLLERR|POLLHUP)) {
1000 				eptr->input_end = 1;
1001 			}
1002 			masterconn_parse(eptr);
1003 		}
1004 		if (eptr->mode==DATA && eptr->lastwrite+(Timeout/3.0)<now && eptr->outputhead==NULL) {
1005 			masterconn_createpacket(eptr,ANTOAN_NOP,0);
1006 		}
1007 		if (eptr->pdescpos>=0) {
1008 			if ((((pdesc[eptr->pdescpos].events & POLLOUT)==0 && (eptr->outputhead)) || (pdesc[eptr->pdescpos].revents & POLLOUT)) && eptr->mode==DATA) {
1009 				masterconn_write(eptr,now);
1010 			}
1011 		}
1012 		if (eptr->mode==DATA && eptr->lastread+Timeout<now) {
1013 			eptr->mode = KILL;
1014 		}
1015 	}
1016 	masterconn_disconnection_check();
1017 }
1018 
masterconn_reconnect(void)1019 void masterconn_reconnect(void) {
1020 	masterconn *eptr = masterconnsingleton;
1021 	if (eptr->mode==FREE) {
1022 		masterconn_initconnect(eptr);
1023 	}
1024 }
1025 
masterconn_term(void)1026 void masterconn_term(void) {
1027 	masterconn *eptr = masterconnsingleton;
1028 	in_packetstruct *ipptr,*ipaptr;
1029 	out_packetstruct *opptr,*opaptr;
1030 
1031 	if (eptr->mode!=FREE) {
1032 		tcpclose(eptr->sock);
1033 		if (eptr->mode!=CONNECTING) {
1034 			if (eptr->input_packet) {
1035 				free(eptr->input_packet);
1036 			}
1037 			ipptr = eptr->inputhead;
1038 			while (ipptr) {
1039 				ipaptr = ipptr;
1040 				ipptr = ipptr->next;
1041 				free(ipaptr);
1042 			}
1043 			opptr = eptr->outputhead;
1044 			while (opptr) {
1045 				opaptr = opptr;
1046 				opptr = opptr->next;
1047 				free(opaptr);
1048 			}
1049 		}
1050 	}
1051 
1052 	masterconn_read(NULL,0.0); // free internal read buffer
1053 
1054 	free(eptr);
1055 
1056 	free(MasterHost);
1057 	free(MasterPort);
1058 	free(BindHost);
1059 	masterconnsingleton = NULL;
1060 }
1061 
masterconn_reload(void)1062 void masterconn_reload(void) {
1063 	masterconn *eptr = masterconnsingleton;
1064 	uint32_t ReconnectionDelay;
1065 	uint32_t MetaDLFreq;
1066 
1067 	free(MasterHost);
1068 	free(MasterPort);
1069 	free(BindHost);
1070 
1071 	MasterHost = cfg_getstr("MASTER_HOST",DEFAULT_MASTERNAME);
1072 	MasterPort = cfg_getstr("MASTER_PORT",DEFAULT_MASTER_CONTROL_PORT);
1073 	BindHost = cfg_getstr("BIND_HOST","*");
1074 
1075 	eptr->masteraddrvalid = 0;
1076 	if (eptr->mode!=FREE) {
1077 		eptr->mode = KILL;
1078 	}
1079 
1080 	Timeout = cfg_getuint32("MASTER_TIMEOUT",10);
1081 	BackLogsNumber = cfg_getuint32("BACK_LOGS",50);
1082 	BackMetaCopies = cfg_getuint32("BACK_META_KEEP_PREVIOUS",3);
1083 
1084 	ReconnectionDelay = cfg_getuint32("MASTER_RECONNECTION_DELAY",5);
1085 	MetaDLFreq = cfg_getuint32("META_DOWNLOAD_FREQ",24);
1086 
1087 	if (Timeout>65535) {
1088 		Timeout=65535;
1089 	}
1090 	if (Timeout<10) {
1091 		Timeout=10;
1092 	}
1093 	if (BackLogsNumber<5) {
1094 		BackLogsNumber=5;
1095 	}
1096 	if (BackLogsNumber>10000) {
1097 		BackLogsNumber=10000;
1098 	}
1099 	if (MetaDLFreq>(BackLogsNumber/2)) {
1100 		MetaDLFreq=BackLogsNumber/2;
1101 	}
1102 	if (BackMetaCopies>99) {
1103 		BackMetaCopies=99;
1104 	}
1105 
1106 	main_time_change(reconnect_hook,ReconnectionDelay,0);
1107 	main_time_change(download_hook,MetaDLFreq*3600,630);
1108 }
1109 
masterconn_init(void)1110 int masterconn_init(void) {
1111 	uint32_t ReconnectionDelay;
1112 	uint32_t MetaDLFreq;
1113 	masterconn *eptr;
1114 
1115 
1116 	ReconnectionDelay = cfg_getuint32("MASTER_RECONNECTION_DELAY",5);
1117 	MasterHost = cfg_getstr("MASTER_HOST",DEFAULT_MASTERNAME);
1118 	MasterPort = cfg_getstr("MASTER_PORT",DEFAULT_MASTER_CONTROL_PORT);
1119 	BindHost = cfg_getstr("BIND_HOST","*");
1120 	Timeout = cfg_getuint32("MASTER_TIMEOUT",10);
1121 	BackLogsNumber = cfg_getuint32("BACK_LOGS",50);
1122 	BackMetaCopies = cfg_getuint32("BACK_META_KEEP_PREVIOUS",3);
1123 	MetaDLFreq = cfg_getuint32("META_DOWNLOAD_FREQ",24);
1124 
1125 	if (Timeout>65535) {
1126 		Timeout=65535;
1127 	}
1128 	if (Timeout<10) {
1129 		Timeout=10;
1130 	}
1131 	if (BackLogsNumber<5) {
1132 		BackLogsNumber=5;
1133 	}
1134 	if (BackLogsNumber>10000) {
1135 		BackLogsNumber=10000;
1136 	}
1137 	if (MetaDLFreq>(BackLogsNumber/2)) {
1138 		MetaDLFreq=BackLogsNumber/2;
1139 	}
1140 	eptr = masterconnsingleton = malloc(sizeof(masterconn));
1141 	passert(eptr);
1142 
1143 	eptr->masteraddrvalid = 0;
1144 	eptr->mode = FREE;
1145 	eptr->pdescpos = -1;
1146 	eptr->logfd = NULL;
1147 	eptr->metafd = -1;
1148 	eptr->oldmode = 0;
1149 
1150 	masterconn_findlastlogversion();
1151 	if (masterconn_initconnect(eptr)<0) {
1152 		return -1;
1153 	}
1154 	reconnect_hook = main_time_register(ReconnectionDelay,0,masterconn_reconnect);
1155 	download_hook = main_time_register(MetaDLFreq*3600,630,masterconn_metadownloadinit);
1156 	main_destruct_register(masterconn_term);
1157 	main_poll_register(masterconn_desc,masterconn_serve);
1158 	main_reload_register(masterconn_reload);
1159 	main_time_register(1,0,masterconn_metachanges_flush);
1160 	return 0;
1161 }
1162