1 /*
2  * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <time.h>
26 #include <stddef.h>
27 #include <sys/types.h>
28 #include <sys/uio.h>
29 #include <fcntl.h>
30 #include <unistd.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <syslog.h>
35 #include <errno.h>
36 #include <inttypes.h>
37 #include <netinet/in.h>
38 #ifdef HAVE_WRITEV
39 #include <sys/uio.h>
40 #endif
41 
42 #include "MFSCommunication.h"
43 #include "datapack.h"
44 #include "masterconn.h"
45 #include "cfg.h"
46 #include "main.h"
47 #include "sockets.h"
48 #include "hddspacemgr.h"
49 #include "slogger.h"
50 #include "massert.h"
51 #include "random.h"
52 #include "bgjobs.h"
53 #include "csserv.h"
54 #include "clocks.h"
55 
// Alias for the protocol's maximum master->chunkserver packet size.
// NOTE(review): presumably used by the packet reader further down this file - confirm.
#define MaxPacketSize MATOCS_MAXPACKETSIZE

// Maximum number of chunk ids sent in one CSTOMA_CHUNK_LOST packet (8 bytes each).
// has to be less than MaxPacketSize on master side divided by 8
#define LOSTCHUNKLIMIT 25000
// Maximum number of entries sent in one CSTOMA_CHUNK_NEW packet (12 bytes each).
// has to be less than MaxPacketSize on master side divided by 12
#define NEWCHUNKLIMIT 25000

// NOTE(review): presumably periodic intervals (seconds) for load/space reports;
// their users are not visible in this chunk - confirm.
#define REPORT_LOAD_FREQ 60
#define REPORT_SPACE_FREQ 1

// force disconnection X seconds after term signal
#define FORCE_DISCONNECTION_TO 5.0

// mode - connection state kept in masterconn.mode.
// Packet handlers set KILL on protocol errors and CLOSE on a redirect.
enum {FREE,CONNECTING,DATA,KILL,CLOSE};

// idlejob.op - kind of background query, dispatched in masterconn_idlejob_finished
enum {IJ_GET_CHUNK_BLOCKS,IJ_GET_CHUNK_CHECKSUM,IJ_GET_CHUNK_CHECKSUM_TAB};

// masterconn.registerstate
//  UNREGISTERED - nothing sent yet
//  WAITING      - master asked us to wait, or we were redirected to another master
//  INPROGRESS   - chunk list is being streamed in batches (one batch per MASTER_ACK)
//  REGISTERED   - registration finished; regular reports may be sent
enum {UNREGISTERED,WAITING,INPROGRESS,REGISTERED};
76 
// A background query (chunk blocks / checksum / checksum table) started on
// behalf of the master. Live jobs are kept in the global 'idlejobs' list.
// 'prev' points at the pointer that references this node (either 'idlejobs'
// or the previous node's 'next'); masterconn_idlejob_finished unlinks it that way.
typedef struct idlejob {
	uint32_t jobid;		// id returned by the job_* call that started the query
	uint8_t op;			// IJ_* operation
	uint8_t valid;		// when 0 the completion callback neither replies nor unlinks
	uint64_t chunkid;
	uint32_t version;
	struct idlejob *next,**prev;
	uint8_t buff[1];	// result area; allocated with extra bytes past the struct
} idlejob;

// Outgoing packet: 8-byte header (type, length) followed by the payload in 'data'.
typedef struct out_packetstruct {
	struct out_packetstruct *next;
	uint8_t *startptr;	// NOTE(review): presumably the current send position - confirm in the write loop
	uint32_t bytesleft;	// initialised to header+payload size
	uint32_t conncnt;	// connection generation; replies for an older generation are dropped
	uint8_t data[1];
} out_packetstruct;

// Incoming packet queued for processing.
// NOTE(review): presumably 'data' holds 'leng' payload bytes - confirm against the reader.
typedef struct in_packetstruct {
	struct in_packetstruct *next;
	uint32_t type,leng;
	uint8_t data[1];
} in_packetstruct;

// State of the single connection to the master.
typedef struct masterconn {
	uint8_t mode;		// FREE/CONNECTING/DATA/KILL/CLOSE
	int sock;
	int32_t pdescpos;	// NOTE(review): presumably index into the poll descriptor table
	double lastread,lastwrite,conntime;
	uint8_t input_hdr[8];	// header of the packet currently being read
	uint8_t *input_startptr;
	uint32_t input_bytesleft;
	uint8_t input_end;
	in_packetstruct *input_packet;
	in_packetstruct *inputhead,**inputtail;
	out_packetstruct *outputhead,**outputtail;	// send FIFO; outputtail points at the last 'next' field

	uint32_t masterversion;	// version reported by the master in MATOCS_MASTER_ACK
	uint32_t conncnt;		// connection generation counter copied into detached packets
	uint32_t bindip;
	uint32_t masterip;
	uint16_t masterport;
	uint16_t timeout;		// taken from the master when the Timeout option is 0
	uint8_t masteraddrvalid;	// cleared to force choosing/resolving the master again
	uint8_t registerstate;	// UNREGISTERED/WAITING/INPROGRESS/REGISTERED
	uint8_t new_register_mode;	// nonzero selects the "ver. 6" registration format
//	uint8_t accepted;
} masterconn;
125 
// The one and only master connection (NULL until set up elsewhere in this file).
static masterconn *masterconnsingleton=NULL;
// Head of the list of pending background queries (see struct idlejob).
static idlejob *idlejobs=NULL;
// Set once chunkserverid.mfs has been read (masterconn_initcsid).
static uint8_t csidvalid = 0;
static void *reconnect_hook;
static void *manager_time_hook;
static double wantexittime = 0.0;

// Traffic counters, read and reset by masterconn_stats.
static uint64_t stats_bytesout=0;
static uint64_t stats_bytesin=0;

// from config
// static uint32_t BackLogsNumber;
static char *MasterHost;
static char *MasterPort;
static char *BindHost;
static uint32_t Timeout;		// 0 means "use the timeout suggested by the master"
static uint16_t ChunkServerId = 0;	// persisted in chunkserverid.mfs
static uint64_t MetaFileId = 0;	// persisted in chunkserverid.mfs; identifies the MFS instance
144 
145 // static FILE *logfd;
146 
masterconn_stats(uint64_t * bin,uint64_t * bout)147 void masterconn_stats(uint64_t *bin,uint64_t *bout) {
148 	*bin = stats_bytesin;
149 	*bout = stats_bytesout;
150 	stats_bytesin = 0;
151 	stats_bytesout = 0;
152 }
153 
masterconn_initcsid(void)154 static inline void masterconn_initcsid(void) {
155 	int fd;
156 	uint8_t buff[10];
157 	const uint8_t *rptr;
158 	ssize_t ret;
159 	if (csidvalid) {
160 		return;
161 	}
162 	ChunkServerId = 0;
163 	MetaFileId = 0;
164 	csidvalid = 1;
165 	fd = open("chunkserverid.mfs",O_RDWR);
166 	if (fd>=0) {
167 		ret = read(fd,buff,10);
168 		rptr = buff;
169 		if (ret>=2) {
170 			ChunkServerId = get16bit(&rptr);
171 		}
172 		if (ret>=10) {
173 			MetaFileId = get64bit(&rptr);
174 		}
175 		close(fd);
176 	}
177 }
178 
masterconn_getcsid(void)179 uint16_t masterconn_getcsid(void) {
180 	masterconn_initcsid();
181 	return ChunkServerId;
182 }
183 
masterconn_getmetaid(void)184 uint64_t masterconn_getmetaid(void) {
185 	masterconn_initcsid();
186 	return MetaFileId;
187 }
188 
masterconn_setcsid(uint16_t csid,uint64_t metafileid)189 static inline void masterconn_setcsid(uint16_t csid,uint64_t metafileid) {
190 	int fd;
191 	uint8_t buff[10],*wptr;
192 	if (ChunkServerId!=csid || MetaFileId!=metafileid) {
193 		if (csid>0) {
194 			ChunkServerId = csid;
195 		}
196 		if (metafileid>0) {
197 			MetaFileId = metafileid;
198 		}
199 		wptr = buff;
200 		put16bit(&wptr,ChunkServerId);
201 		put64bit(&wptr,MetaFileId);
202 		fd = open("chunkserverid.mfs",O_CREAT | O_TRUNC | O_RDWR,0666);
203 		if (fd>=0) {
204 			if (write(fd,buff,10)!=10) {
205 				syslog(LOG_WARNING,"can't store chunkserver id (write error)");
206 			}
207 			close(fd);
208 		} else {
209 			syslog(LOG_WARNING,"can't store chunkserver id (open error)");
210 		}
211 	}
212 }
213 
masterconn_getmasterip(void)214 uint32_t masterconn_getmasterip(void) {
215 	masterconn *eptr = masterconnsingleton;
216 	if (eptr->registerstate==REGISTERED && eptr->mode==DATA) {
217 		return eptr->masterip;
218 	}
219 	return 0;
220 }
221 
masterconn_getmasterport(void)222 uint16_t masterconn_getmasterport(void) {
223 	masterconn *eptr = masterconnsingleton;
224 	if (eptr->registerstate==REGISTERED && eptr->mode==DATA) {
225 		return eptr->masterport;
226 	}
227 	return 0;
228 }
229 
masterconn_create_detached_packet(masterconn * eptr,uint32_t type,uint32_t size)230 void* masterconn_create_detached_packet(masterconn *eptr,uint32_t type,uint32_t size) {
231 	out_packetstruct *outpacket;
232 	uint8_t *ptr;
233 	uint32_t psize;
234 
235 	psize = size+8;
236 	outpacket=malloc(offsetof(out_packetstruct,data)+psize);
237 	passert(outpacket);
238 	outpacket->bytesleft = psize;
239 	ptr = outpacket->data;
240 	put32bit(&ptr,type);
241 	put32bit(&ptr,size);
242 	outpacket->startptr = outpacket->data;
243 	outpacket->next = NULL;
244 	outpacket->conncnt = eptr->conncnt;
245 	return outpacket;
246 }
247 
masterconn_get_packet_data(void * packet)248 uint8_t* masterconn_get_packet_data(void *packet) {
249 	out_packetstruct *outpacket = (out_packetstruct*)packet;
250 	return (outpacket->data+8);
251 }
252 
masterconn_delete_packet(void * packet)253 void masterconn_delete_packet(void *packet) {
254 	free(packet);
255 }
256 
masterconn_attach_packet(masterconn * eptr,void * packet)257 void masterconn_attach_packet(masterconn *eptr,void *packet) {
258 	out_packetstruct *outpacket = (out_packetstruct*)packet;
259 	*(eptr->outputtail) = outpacket;
260 	eptr->outputtail = &(outpacket->next);
261 }
262 
masterconn_create_attached_packet(masterconn * eptr,uint32_t type,uint32_t size)263 uint8_t* masterconn_create_attached_packet(masterconn *eptr,uint32_t type,uint32_t size) {
264 	out_packetstruct *outpacket;
265 	uint8_t *ptr;
266 	uint32_t psize;
267 
268 	psize = size+8;
269 	outpacket=malloc(offsetof(out_packetstruct,data)+psize);
270 	passert(outpacket);
271 	outpacket->bytesleft = psize;
272 	ptr = outpacket->data;
273 	put32bit(&ptr,type);
274 	put32bit(&ptr,size);
275 	outpacket->startptr = outpacket->data;
276 	outpacket->next = NULL;
277 	*(eptr->outputtail) = outpacket;
278 	eptr->outputtail = &(outpacket->next);
279 	return ptr;
280 }
281 
masterconn_sendregister(masterconn * eptr)282 void masterconn_sendregister(masterconn *eptr) {
283 	uint8_t *buff;
284 	uint32_t myip;
285 	uint16_t myport;
286 	uint64_t usedspace,totalspace;
287 	uint64_t tdusedspace,tdtotalspace;
288 	uint32_t chunkcount,tdchunkcount;
289 
290 
291 	myip = csserv_getlistenip();
292 	myport = csserv_getlistenport();
293 	if (eptr->new_register_mode) {
294 #ifdef MFSDEBUG
295 		syslog(LOG_NOTICE,"register ver. 6 - init + space info");
296 #endif
297 		hdd_get_space(&usedspace,&totalspace,&chunkcount,&tdusedspace,&tdtotalspace,&tdchunkcount);
298 		buff = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1+4+4+2+2+2+8+8+4+8+8+4);
299 		put8bit(&buff,60);
300 		put32bit(&buff,VERSHEX);
301 		put32bit(&buff,myip);
302 		put16bit(&buff,myport);
303 		put16bit(&buff,Timeout);
304 		put16bit(&buff,masterconn_getcsid());
305 		put64bit(&buff,usedspace);
306 		put64bit(&buff,totalspace);
307 		put32bit(&buff,chunkcount);
308 		put64bit(&buff,tdusedspace);
309 		put64bit(&buff,tdtotalspace);
310 		put32bit(&buff,tdchunkcount);
311 	} else {
312 #ifdef MFSDEBUG
313 		syslog(LOG_NOTICE,"register ver. 5 - init");
314 #endif
315 		buff = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1+4+4+2+2);
316 		put8bit(&buff,50);
317 		put32bit(&buff,VERSHEX);
318 		put32bit(&buff,myip);
319 		put16bit(&buff,myport);
320 		if (Timeout>0) {
321 			put16bit(&buff,Timeout);
322 		} else {
323 			put16bit(&buff,10);
324 		}
325 	}
326 }
327 
masterconn_sendchunksinfo(masterconn * eptr)328 void masterconn_sendchunksinfo(masterconn *eptr) {
329 	uint8_t *buff;
330 	uint32_t chunks;
331 	uint64_t usedspace,totalspace;
332 	uint64_t tdusedspace,tdtotalspace;
333 	uint32_t chunkcount,tdchunkcount;
334 
335 #ifdef MFSDEBUG
336 	syslog(LOG_NOTICE,"register ver. %u - chunks info",(eptr->new_register_mode)?6:5);
337 #endif
338 	hdd_get_chunks_begin(0);
339 	while ((chunks = hdd_get_chunks_next_list_count())) {
340 		buff = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1+chunks*(8+4));
341 		if (eptr->new_register_mode) {
342 			put8bit(&buff,61);
343 		} else {
344 			put8bit(&buff,51);
345 		}
346 		hdd_get_chunks_next_list_data(buff);
347 	}
348 	hdd_get_chunks_end();
349 	if (eptr->new_register_mode) {
350 #ifdef MFSDEBUG
351 		syslog(LOG_NOTICE,"register ver. 6 - end");
352 #endif
353 		buff = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1);
354 		put8bit(&buff,62);
355 	} else {
356 #ifdef MFSDEBUG
357 		syslog(LOG_NOTICE,"register ver. 5 - end + space info");
358 #endif
359 		hdd_get_space(&usedspace,&totalspace,&chunkcount,&tdusedspace,&tdtotalspace,&tdchunkcount);
360 		buff = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1+8+8+4+8+8+4);
361 		put8bit(&buff,52);
362 		put64bit(&buff,usedspace);
363 		put64bit(&buff,totalspace);
364 		put32bit(&buff,chunkcount);
365 		put64bit(&buff,tdusedspace);
366 		put64bit(&buff,tdtotalspace);
367 		put32bit(&buff,tdchunkcount);
368 	}
369 }
370 
masterconn_sendnextchunks(masterconn * eptr)371 void masterconn_sendnextchunks(masterconn *eptr) {
372 	uint8_t *buff;
373 	uint32_t chunks;
374 	chunks = hdd_get_chunks_next_list_count();
375 	if (chunks==0) {
376 		hdd_get_chunks_end();
377 		buff = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1);
378 		put8bit(&buff,62);
379 		eptr->registerstate = REGISTERED;
380 	} else {
381 		buff = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1+chunks*(8+4));
382 		put8bit(&buff,61);
383 		hdd_get_chunks_next_list_data(buff);
384 	}
385 }
386 
masterconn_master_ack(masterconn * eptr,const uint8_t * data,uint32_t length)387 void masterconn_master_ack(masterconn *eptr,const uint8_t *data,uint32_t length) {
388 	uint8_t atype;
389 	uint64_t metafileid;
390 	uint16_t csid;
391 	if (length!=17 && length!=15 && length!=9 && length!=7 && length!=5 && length!=1) {
392 		syslog(LOG_NOTICE,"MATOCS_MASTER_ACK - wrong size (%"PRIu32"/1|5|7|9|15|17)",length);
393 		eptr->mode = KILL;
394 		return;
395 	}
396 	atype = get8bit(&data);
397 	if (atype==0) {
398 		csid = 0;
399 		metafileid = 0;
400 		if (length>=5) {
401 			eptr->masterversion = get32bit(&data);
402 		}
403 		if (length>=9) {
404 			if (Timeout==0) {
405 				eptr->timeout = get16bit(&data);
406 			} else {
407 				data+=2;
408 			}
409 			csid = get16bit(&data);
410 		}
411 		if (length>=17) {
412 			metafileid = get64bit(&data);
413 			if (metafileid>0 && MetaFileId>0 && metafileid!=MetaFileId) { // wrong MFS instance - abort
414 				syslog(LOG_WARNING,"MATOCS_MASTER_ACK - wrong meta data id. Can't connect to master");
415 				eptr->registerstate = REGISTERED; // do not switch to register ver. 5
416 				eptr->mode = KILL;
417 				return;
418 			}
419 		}
420 		if (csid>0 || metafileid>0) {
421 			masterconn_setcsid(csid,metafileid);
422 		}
423 		if (eptr->masterversion<VERSION2INT(2,0,0)) {
424 			if (eptr->registerstate != REGISTERED) {
425 				if (eptr->registerstate == INPROGRESS) {
426 					hdd_get_chunks_end();
427 				}
428 				eptr->registerstate = REGISTERED;
429 				masterconn_sendchunksinfo(eptr);
430 			}
431 		} else {
432 			if (eptr->registerstate == UNREGISTERED || eptr->registerstate == WAITING) {
433 				hdd_get_chunks_begin(1);
434 				eptr->registerstate = INPROGRESS;
435 			}
436 			if (eptr->registerstate == INPROGRESS) {
437 				masterconn_sendnextchunks(eptr);
438 			}
439 		}
440 	} else if (atype==1 && length==5) {
441 		uint32_t mip;
442 		mip = get32bit(&data);
443 		if (mip) {
444 			// redirect to leader
445 			eptr->masterip = mip;
446 			eptr->new_register_mode = 3;
447 			if (eptr->registerstate == INPROGRESS) {
448 				hdd_get_chunks_end();
449 			}
450 			eptr->registerstate = WAITING;
451 #ifdef MFSDEBUG
452 			syslog(LOG_NOTICE,"masterconn: redirected to other master");
453 #endif
454 		} else {
455 			// leader not known - just reconnect
456 			eptr->masteraddrvalid = 0;
457 			syslog(LOG_NOTICE,"masterconn: follower doesn't know who is the leader, reconnect to another master");
458 		}
459 		eptr->mode = CLOSE;
460 	} else if (atype==2 && (length==7 || length==15)) {
461 #ifdef MFSDEBUG
462 		syslog(LOG_NOTICE,"masterconn: wait for acceptance");
463 #endif
464 		if (eptr->registerstate == INPROGRESS) {
465 			hdd_get_chunks_end();
466 		}
467 		eptr->registerstate = WAITING;
468 		eptr->masterversion = get32bit(&data);
469 		if (Timeout==0) {
470 			eptr->timeout = get16bit(&data);
471 		} else {
472 			data+=2;
473 		}
474 		if (length>=15) {
475 			metafileid = get64bit(&data);
476 			if (metafileid>0 && MetaFileId>0 && metafileid!=MetaFileId) { // wrong MFS instance - abort
477 				syslog(LOG_WARNING,"MATOCS_MASTER_ACK - wrong meta data id. Can't connect to master");
478 				eptr->registerstate = REGISTERED; // do not switch to register ver. 5
479 				eptr->mode = KILL;
480 				return;
481 			}
482 		}
483 	} else {
484 		syslog(LOG_NOTICE,"MATOCS_MASTER_ACK - bad type/length: %u/%u",atype,length);
485 		eptr->mode = KILL;
486 	}
487 }
488 
489 /*
490 void masterconn_sendregister_v4(masterconn *eptr) {
491 	uint8_t *buff;
492 	uint32_t chunks,myip;
493 	uint16_t myport;
494 	uint64_t usedspace,totalspace;
495 	uint64_t tdusedspace,tdtotalspace;
496 	uint32_t chunkcount,tdchunkcount;
497 
498 	myip = csserv_getlistenip();
499 	myport = csserv_getlistenport();
500 	hdd_get_space(&usedspace,&totalspace,&chunkcount,&tdusedspace,&tdtotalspace,&tdchunkcount);
501 	hdd_get_chunks_begin();
502 	chunks = hdd_get_chunks_count();
503 	buff = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1+4+4+2+2+8+8+4+8+8+4+chunks*(8+4));
504 	put8bit(&buff,4);
505 	put16bit(&buff,VERSMAJ);
506 	put8bit(&buff,VERSMID);
507 	put8bit(&buff,VERSMIN);
508 	put32bit(&buff,myip);
509 	put16bit(&buff,myport);
510 	put16bit(&buff,Timeout);
511 	put64bit(&buff,usedspace);
512 	put64bit(&buff,totalspace);
513 	put32bit(&buff,chunkcount);
514 	put64bit(&buff,tdusedspace);
515 	put64bit(&buff,tdtotalspace);
516 	put32bit(&buff,tdchunkcount);
517 	if (chunks>0) {
518 		hdd_get_chunks_data(buff);
519 	}
520 	hdd_get_chunks_end();
521 }
522 */
523 /*
524 void masterconn_send_space(uint64_t usedspace,uint64_t totalspace,uint32_t chunkcount,uint64_t tdusedspace,uint64_t tdtotalspace,uint32_t tdchunkcount) {
525 	uint8_t *buff;
526 	masterconn *eptr = masterconnsingleton;
527 
528 //	syslog(LOG_NOTICE,"%"PRIu64",%"PRIu64,usedspace,totalspace);
529 	if (eptr->mode==DATA || eptr->mode==HEADER) {
530 		buff = masterconn_create_attached_packet(eptr,CSTOMA_SPACE,8+8+4+8+8+4);
531 		if (buff) {
532 			put64bit(&buff,usedspace);
533 			put64bit(&buff,totalspace);
534 			put32bit(&buff,chunkcount);
535 			put64bit(&buff,tdusedspace);
536 			put64bit(&buff,tdtotalspace);
537 			put32bit(&buff,tdchunkcount);
538 		}
539 	}
540 }
541 */
542 /*
543 void masterconn_send_chunk_damaged(uint64_t chunkid) {
544 	uint8_t *buff;
545 	masterconn *eptr = masterconnsingleton;
546 	if (eptr->mode==DATA || eptr->mode==HEADER) {
547 		buff = masterconn_create_attached_packet(eptr,CSTOMA_CHUNK_DAMAGED,8);
548 		if (buff) {
549 			put64bit(&buff,chunkid);
550 		}
551 	}
552 }
553 
554 void masterconn_send_chunk_lost(uint64_t chunkid) {
555 	uint8_t *buff;
556 	masterconn *eptr = masterconnsingleton;
557 	if (eptr->mode==DATA || eptr->mode==HEADER) {
558 		buff = masterconn_create_attached_packet(eptr,CSTOMA_CHUNK_LOST,8);
559 		if (buff) {
560 			put64bit(&buff,chunkid);
561 		}
562 	}
563 }
564 
565 void masterconn_send_error_occurred() {
566 	masterconn *eptr = masterconnsingleton;
567 	if (eptr->mode==DATA || eptr->mode==HEADER) {
568 		masterconn_create_attached_packet(eptr,CSTOMA_ERROR_OCCURRED,0);
569 	}
570 }
571 */
572 
masterconn_check_hdd_space()573 void masterconn_check_hdd_space() {
574 	masterconn *eptr = masterconnsingleton;
575 	uint8_t *buff;
576 	if ((eptr->registerstate==REGISTERED || eptr->registerstate==INPROGRESS) && eptr->mode==DATA) {
577 		if (hdd_spacechanged()) {
578 			uint64_t usedspace,totalspace,tdusedspace,tdtotalspace;
579 			uint32_t chunkcount,tdchunkcount;
580 			buff = masterconn_create_attached_packet(eptr,CSTOMA_SPACE,8+8+4+8+8+4);
581 			hdd_get_space(&usedspace,&totalspace,&chunkcount,&tdusedspace,&tdtotalspace,&tdchunkcount);
582 			put64bit(&buff,usedspace);
583 			put64bit(&buff,totalspace);
584 			put32bit(&buff,chunkcount);
585 			put64bit(&buff,tdusedspace);
586 			put64bit(&buff,tdtotalspace);
587 			put32bit(&buff,tdchunkcount);
588 		}
589 	}
590 }
591 
masterconn_check_hdd_reports()592 void masterconn_check_hdd_reports() {
593 	masterconn *eptr = masterconnsingleton;
594 	uint32_t errorcounter;
595 	uint32_t chunkcounter;
596 	uint8_t *buff;
597 	if (eptr->registerstate==REGISTERED && eptr->mode==DATA) {
598 		errorcounter = hdd_errorcounter();
599 		while (errorcounter) {
600 			masterconn_create_attached_packet(eptr,CSTOMA_ERROR_OCCURRED,0);
601 			errorcounter--;
602 		}
603 		chunkcounter = hdd_get_damaged_chunk_count();	// lock
604 		if (chunkcounter) {
605 			buff = masterconn_create_attached_packet(eptr,CSTOMA_CHUNK_DAMAGED,8*chunkcounter);
606 			hdd_get_damaged_chunk_data(buff);	// unlock
607 		} else {
608 			hdd_get_damaged_chunk_data(NULL);
609 		}
610 		chunkcounter = hdd_get_lost_chunk_count(LOSTCHUNKLIMIT);	// lock
611 		if (chunkcounter) {
612 			buff = masterconn_create_attached_packet(eptr,CSTOMA_CHUNK_LOST,8*chunkcounter);
613 			hdd_get_lost_chunk_data(buff,LOSTCHUNKLIMIT);	// unlock
614 		} else {
615 			hdd_get_lost_chunk_data(NULL,0);
616 		}
617 		chunkcounter = hdd_get_new_chunk_count(NEWCHUNKLIMIT);	// lock
618 		if (chunkcounter) {
619 			buff = masterconn_create_attached_packet(eptr,CSTOMA_CHUNK_NEW,12*chunkcounter);
620 			hdd_get_new_chunk_data(buff,NEWCHUNKLIMIT);	// unlock
621 		} else {
622 			hdd_get_new_chunk_data(NULL,0);
623 		}
624 	}
625 }
626 
masterconn_reportload(void)627 void masterconn_reportload(void) {
628 	masterconn *eptr = masterconnsingleton;
629 	uint32_t load;
630 	uint8_t *buff;
631 	if (eptr->mode==DATA && eptr->masterversion>=VERSION2INT(1,6,28) && eptr->registerstate==REGISTERED) {
632 		load = job_getload();
633 		buff = masterconn_create_attached_packet(eptr,CSTOMA_CURRENT_LOAD,4);
634 		put32bit(&buff,load);
635 	}
636 }
637 
masterconn_jobfinished(uint8_t status,void * packet)638 void masterconn_jobfinished(uint8_t status,void *packet) {
639 	uint8_t *ptr;
640 	masterconn *eptr = masterconnsingleton;
641 	if (eptr && eptr->conncnt==((out_packetstruct*)packet)->conncnt && eptr->mode==DATA) {
642 		ptr = masterconn_get_packet_data(packet);
643 		ptr[8]=status;
644 		masterconn_attach_packet(eptr,packet);
645 	} else {
646 		masterconn_delete_packet(packet);
647 	}
648 }
649 
masterconn_chunkopfinished(uint8_t status,void * packet)650 void masterconn_chunkopfinished(uint8_t status,void *packet) {
651 	uint8_t *ptr;
652 	masterconn *eptr = masterconnsingleton;
653 	if (eptr && eptr->conncnt==((out_packetstruct*)packet)->conncnt && eptr->mode==DATA) {
654 		ptr = masterconn_get_packet_data(packet);
655 		ptr[32]=status;
656 		masterconn_attach_packet(eptr,packet);
657 	} else {
658 		masterconn_delete_packet(packet);
659 	}
660 }
661 
masterconn_replicationfinished(uint8_t status,void * packet)662 void masterconn_replicationfinished(uint8_t status,void *packet) {
663 	uint8_t *ptr;
664 	masterconn *eptr = masterconnsingleton;
665 //	syslog(LOG_NOTICE,"job replication status: %"PRIu8,status);
666 	if (eptr && eptr->conncnt==((out_packetstruct*)packet)->conncnt && eptr->mode==DATA) {
667 		ptr = masterconn_get_packet_data(packet);
668 		ptr[12]=status;
669 		masterconn_attach_packet(eptr,packet);
670 	} else {
671 		masterconn_delete_packet(packet);
672 	}
673 }
674 
masterconn_create(masterconn * eptr,const uint8_t * data,uint32_t length)675 void masterconn_create(masterconn *eptr,const uint8_t *data,uint32_t length) {
676 	uint64_t chunkid;
677 	uint32_t version;
678 	uint8_t *ptr;
679 	void *packet;
680 
681 	if (length!=8+4) {
682 		syslog(LOG_NOTICE,"MATOCS_CREATE - wrong size (%"PRIu32"/12)",length);
683 		eptr->mode = KILL;
684 		return;
685 	}
686 	chunkid = get64bit(&data);
687 	version = get32bit(&data);
688 	packet = masterconn_create_detached_packet(eptr,CSTOMA_CREATE,8+1);
689 	ptr = masterconn_get_packet_data(packet);
690 	put64bit(&ptr,chunkid);
691 	job_create(masterconn_jobfinished,packet,chunkid,version);
692 }
693 
masterconn_delete(masterconn * eptr,const uint8_t * data,uint32_t length)694 void masterconn_delete(masterconn *eptr,const uint8_t *data,uint32_t length) {
695 	uint64_t chunkid;
696 	uint32_t version;
697 	uint8_t *ptr;
698 	void *packet;
699 
700 	if (length!=8+4) {
701 		syslog(LOG_NOTICE,"MATOCS_DELETE - wrong size (%"PRIu32"/12)",length);
702 		eptr->mode = KILL;
703 		return;
704 	}
705 	chunkid = get64bit(&data);
706 	version = get32bit(&data);
707 	packet = masterconn_create_detached_packet(eptr,CSTOMA_DELETE,8+1);
708 	ptr = masterconn_get_packet_data(packet);
709 	put64bit(&ptr,chunkid);
710 	job_delete(masterconn_jobfinished,packet,chunkid,version);
711 }
712 
masterconn_setversion(masterconn * eptr,const uint8_t * data,uint32_t length)713 void masterconn_setversion(masterconn *eptr,const uint8_t *data,uint32_t length) {
714 	uint64_t chunkid;
715 	uint32_t version;
716 	uint32_t newversion;
717 	uint8_t *ptr;
718 	void *packet;
719 
720 	if (length!=8+4+4) {
721 		syslog(LOG_NOTICE,"MATOCS_SET_VERSION - wrong size (%"PRIu32"/16)",length);
722 		eptr->mode = KILL;
723 		return;
724 	}
725 	chunkid = get64bit(&data);
726 	newversion = get32bit(&data);
727 	version = get32bit(&data);
728 	packet = masterconn_create_detached_packet(eptr,CSTOMA_SET_VERSION,8+1);
729 	ptr = masterconn_get_packet_data(packet);
730 	put64bit(&ptr,chunkid);
731 	job_version(masterconn_jobfinished,packet,chunkid,version,newversion);
732 }
733 
masterconn_duplicate(masterconn * eptr,const uint8_t * data,uint32_t length)734 void masterconn_duplicate(masterconn *eptr,const uint8_t *data,uint32_t length) {
735 	uint64_t chunkid;
736 	uint32_t version;
737 	uint64_t copychunkid;
738 	uint32_t copyversion;
739 	uint8_t *ptr;
740 	void *packet;
741 
742 	if (length!=8+4+8+4) {
743 		syslog(LOG_NOTICE,"MATOCS_DUPLICATE - wrong size (%"PRIu32"/24)",length);
744 		eptr->mode = KILL;
745 		return;
746 	}
747 	copychunkid = get64bit(&data);
748 	copyversion = get32bit(&data);
749 	chunkid = get64bit(&data);
750 	version = get32bit(&data);
751 	packet = masterconn_create_detached_packet(eptr,CSTOMA_DUPLICATE,8+1);
752 	ptr = masterconn_get_packet_data(packet);
753 	put64bit(&ptr,copychunkid);
754 	job_duplicate(masterconn_jobfinished,packet,chunkid,version,version,copychunkid,copyversion);
755 }
756 
masterconn_truncate(masterconn * eptr,const uint8_t * data,uint32_t length)757 void masterconn_truncate(masterconn *eptr,const uint8_t *data,uint32_t length) {
758 	uint64_t chunkid;
759 	uint32_t version;
760 	uint32_t leng;
761 	uint32_t newversion;
762 	uint8_t *ptr;
763 	void *packet;
764 
765 	if (length!=8+4+4+4) {
766 		syslog(LOG_NOTICE,"MATOCS_TRUNCATE - wrong size (%"PRIu32"/20)",length);
767 		eptr->mode = KILL;
768 		return;
769 	}
770 	chunkid = get64bit(&data);
771 	leng = get32bit(&data);
772 	newversion = get32bit(&data);
773 	version = get32bit(&data);
774 	packet = masterconn_create_detached_packet(eptr,CSTOMA_TRUNCATE,8+1);
775 	ptr = masterconn_get_packet_data(packet);
776 	put64bit(&ptr,chunkid);
777 	job_truncate(masterconn_jobfinished,packet,chunkid,version,newversion,leng);
778 }
779 
masterconn_duptrunc(masterconn * eptr,const uint8_t * data,uint32_t length)780 void masterconn_duptrunc(masterconn *eptr,const uint8_t *data,uint32_t length) {
781 	uint64_t chunkid;
782 	uint32_t version;
783 	uint64_t copychunkid;
784 	uint32_t copyversion;
785 	uint32_t leng;
786 	uint8_t *ptr;
787 	void *packet;
788 
789 	if (length!=8+4+8+4+4) {
790 		syslog(LOG_NOTICE,"MATOCS_DUPTRUNC - wrong size (%"PRIu32"/28)",length);
791 		eptr->mode = KILL;
792 		return;
793 	}
794 	copychunkid = get64bit(&data);
795 	copyversion = get32bit(&data);
796 	chunkid = get64bit(&data);
797 	version = get32bit(&data);
798 	leng = get32bit(&data);
799 	packet = masterconn_create_detached_packet(eptr,CSTOMA_DUPTRUNC,8+1);
800 	ptr = masterconn_get_packet_data(packet);
801 	put64bit(&ptr,copychunkid);
802 	job_duptrunc(masterconn_jobfinished,packet,chunkid,version,version,copychunkid,copyversion,leng);
803 }
804 
masterconn_chunkop(masterconn * eptr,const uint8_t * data,uint32_t length)805 void masterconn_chunkop(masterconn *eptr,const uint8_t *data,uint32_t length) {
806 	uint64_t chunkid;
807 	uint32_t version,newversion;
808 	uint64_t copychunkid;
809 	uint32_t copyversion;
810 	uint32_t leng;
811 	uint8_t *ptr;
812 	void *packet;
813 
814 	if (length!=8+4+8+4+4+4) {
815 		syslog(LOG_NOTICE,"MATOCS_CHUNKOP - wrong size (%"PRIu32"/32)",length);
816 		eptr->mode = KILL;
817 		return;
818 	}
819 	chunkid = get64bit(&data);
820 	version = get32bit(&data);
821 	newversion = get32bit(&data);
822 	copychunkid = get64bit(&data);
823 	copyversion = get32bit(&data);
824 	leng = get32bit(&data);
825 	packet = masterconn_create_detached_packet(eptr,CSTOMA_CHUNKOP,8+4+4+8+4+4+1);
826 	ptr = masterconn_get_packet_data(packet);
827 	put64bit(&ptr,chunkid);
828 	put32bit(&ptr,version);
829 	put32bit(&ptr,newversion);
830 	put64bit(&ptr,copychunkid);
831 	put32bit(&ptr,copyversion);
832 	put32bit(&ptr,leng);
833 	job_chunkop(masterconn_chunkopfinished,packet,chunkid,version,newversion,copychunkid,copyversion,leng);
834 }
835 
masterconn_replicate(masterconn * eptr,const uint8_t * data,uint32_t length)836 void masterconn_replicate(masterconn *eptr,const uint8_t *data,uint32_t length) {
837 	uint64_t chunkid;
838 	uint32_t version;
839 	uint32_t ip;
840 	uint16_t port;
841 	uint8_t *ptr;
842 	void *packet;
843 
844 	if (length!=8+4+4+2 && (length<12+18 || length>12+18*100 || (length-12)%18!=0)) {
845 		syslog(LOG_NOTICE,"MATOCS_REPLICATE - wrong size (%"PRIu32"/18|12+n*18[n:1..100])",length);
846 		eptr->mode = KILL;
847 		return;
848 	}
849 	chunkid = get64bit(&data);
850 	version = get32bit(&data);
851 	packet = masterconn_create_detached_packet(eptr,CSTOMA_REPLICATE,8+4+1);
852 	ptr = masterconn_get_packet_data(packet);
853 	put64bit(&ptr,chunkid);
854 	put32bit(&ptr,version);
855 	if (length==8+4+4+2) {
856 		ip = get32bit(&data);
857 		port = get16bit(&data);
858 //		syslog(LOG_NOTICE,"start job replication (%08"PRIX64":%04"PRIX32":%04"PRIX32":%02"PRIX16")",chunkid,version,ip,port);
859 		job_replicate_simple(masterconn_replicationfinished,packet,chunkid,version,ip,port);
860 	} else {
861 		job_replicate(masterconn_replicationfinished,packet,chunkid,version,(length-12)/18,data);
862 	}
863 }
864 
masterconn_idlejob_finished(uint8_t status,void * ijp)865 void masterconn_idlejob_finished(uint8_t status,void *ijp) {
866 	idlejob *ij = (idlejob*)ijp;
867 	masterconn *eptr = masterconnsingleton;
868 	uint8_t *ptr;
869 
870 	if (eptr && eptr->mode == DATA && ij->valid) {
871 		switch (ij->op) {
872 			case IJ_GET_CHUNK_BLOCKS:
873 				ptr = masterconn_create_attached_packet(eptr,CSTOAN_CHUNK_BLOCKS,8+4+2+1);
874 				put64bit(&ptr,ij->chunkid);
875 				put32bit(&ptr,ij->version);
876 				memcpy(ptr,ij->buff,2);
877 				ptr+=2;
878 				put8bit(&ptr,status);
879 				break;
880 			case IJ_GET_CHUNK_CHECKSUM:
881 				if (status!=STATUS_OK) {
882 					ptr = masterconn_create_attached_packet(eptr,CSTOAN_CHUNK_CHECKSUM,8+4+1);
883 				} else {
884 					ptr = masterconn_create_attached_packet(eptr,CSTOAN_CHUNK_CHECKSUM,8+4+4);
885 				}
886 				put64bit(&ptr,ij->chunkid);
887 				put32bit(&ptr,ij->version);
888 				if (status!=STATUS_OK) {
889 					put8bit(&ptr,status);
890 				} else {
891 					memcpy(ptr,ij->buff,4);
892 				}
893 				break;
894 			case IJ_GET_CHUNK_CHECKSUM_TAB:
895 				if (status!=STATUS_OK) {
896 					ptr = masterconn_create_attached_packet(eptr,CSTOAN_CHUNK_CHECKSUM_TAB,8+4+1);
897 				} else {
898 					ptr = masterconn_create_attached_packet(eptr,CSTOAN_CHUNK_CHECKSUM_TAB,8+4+4096);
899 				}
900 				put64bit(&ptr,ij->chunkid);
901 				put32bit(&ptr,ij->version);
902 				if (status!=STATUS_OK) {
903 					put8bit(&ptr,status);
904 				} else {
905 					memcpy(ptr,ij->buff,4096);
906 				}
907 				break;
908 		}
909 	}
910 	if (ij->valid) {
911 		*(ij->prev) = ij->next;
912 		if (ij->next) {
913 			ij->next->prev = ij->prev;
914 		}
915 	}
916 	free(ij);
917 }
918 
masterconn_get_chunk_blocks(masterconn * eptr,const uint8_t * data,uint32_t length)919 void masterconn_get_chunk_blocks(masterconn *eptr,const uint8_t *data,uint32_t length) {
920 	idlejob *ij;
921 
922 	if (length!=8+4) {
923 		syslog(LOG_NOTICE,"ANTOCS_GET_CHUNK_BLOCKS - wrong size (%"PRIu32"/12)",length);
924 		eptr->mode = KILL;
925 		return;
926 	}
927 	ij = malloc(offsetof(idlejob,buff)+2);
928 	ij->op = IJ_GET_CHUNK_BLOCKS;
929 	ij->chunkid = get64bit(&data);
930 	ij->version = get32bit(&data);
931 	ij->valid = 1;
932 	ij->next = idlejobs;
933 	ij->prev = &(idlejobs);
934 	idlejobs = ij;
935 	ij->jobid = job_get_chunk_blocks(masterconn_idlejob_finished,ij,ij->chunkid,ij->version,ij->buff);
936 }
937 
/*
 * Handle ANTOCS_GET_CHUNK_CHECKSUM.
 *
 * Request payload: chunkid:64 version:32 (exactly 12 bytes); any other
 * length is logged and the connection is marked KILL.
 *
 * Allocates an idle-job descriptor with a 4-byte result buffer, links it at
 * the head of the `idlejobs` list (so masterconn_disconnection_check can
 * disable it if the master connection drops) and starts
 * job_get_chunk_checksum with masterconn_idlejob_finished as the completion
 * callback.
 */
void masterconn_get_chunk_checksum(masterconn *eptr,const uint8_t *data,uint32_t length) {
	idlejob *ij;

	if (length!=8+4) {
		syslog(LOG_NOTICE,"ANTOCS_GET_CHUNK_CHECKSUM - wrong size (%"PRIu32"/12)",length);
		eptr->mode = KILL;
		return;
	}
	ij = malloc(offsetof(idlejob,buff)+4);
	passert(ij);
	ij->op = IJ_GET_CHUNK_CHECKSUM;
	ij->chunkid = get64bit(&data);
	ij->version = get32bit(&data);
	ij->valid = 1;
	// Push onto the head of the doubly linked list. The previous head's
	// back-pointer must now point at our `next` field; otherwise unlinking
	// the previous head would overwrite `idlejobs` and drop this job.
	ij->next = idlejobs;
	ij->prev = &(idlejobs);
	if (idlejobs!=NULL) {
		idlejobs->prev = &(ij->next);
	}
	idlejobs = ij;
	// NOTE(review): if job_get_chunk_checksum can run the callback synchronously
	// (which frees ij), the jobid store below would be a use-after-free - confirm in bgjobs.
	ij->jobid = job_get_chunk_checksum(masterconn_idlejob_finished,ij,ij->chunkid,ij->version,ij->buff);
}
956 
/*
 * Handle ANTOCS_GET_CHUNK_CHECKSUM_TAB.
 *
 * Request payload: chunkid:64 version:32 (exactly 12 bytes); any other
 * length is logged and the connection is marked KILL.
 *
 * Allocates an idle-job descriptor with a 4096-byte result buffer (copied
 * verbatim into the CSTOAN_CHUNK_CHECKSUM_TAB reply on success), links it at
 * the head of the `idlejobs` list (so masterconn_disconnection_check can
 * disable it if the master connection drops) and starts
 * job_get_chunk_checksum_tab with masterconn_idlejob_finished as the
 * completion callback.
 */
void masterconn_get_chunk_checksum_tab(masterconn *eptr,const uint8_t *data,uint32_t length) {
	idlejob *ij;

	if (length!=8+4) {
		syslog(LOG_NOTICE,"ANTOCS_GET_CHUNK_CHECKSUM_TAB - wrong size (%"PRIu32"/12)",length);
		eptr->mode = KILL;
		return;
	}
	ij = malloc(offsetof(idlejob,buff)+4096);
	passert(ij);
	ij->op = IJ_GET_CHUNK_CHECKSUM_TAB;
	ij->chunkid = get64bit(&data);
	ij->version = get32bit(&data);
	ij->valid = 1;
	// Push onto the head of the doubly linked list. The previous head's
	// back-pointer must now point at our `next` field; otherwise unlinking
	// the previous head would overwrite `idlejobs` and drop this job.
	ij->next = idlejobs;
	ij->prev = &(idlejobs);
	if (idlejobs!=NULL) {
		idlejobs->prev = &(ij->next);
	}
	idlejobs = ij;
	// NOTE(review): if job_get_chunk_checksum_tab can run the callback synchronously
	// (which frees ij), the jobid store below would be a use-after-free - confirm in bgjobs.
	ij->jobid = job_get_chunk_checksum_tab(masterconn_idlejob_finished,ij,ij->chunkid,ij->version,ij->buff);
}
975 
976 
/*
 * Dispatch one complete packet received from the master.
 *
 * `type` is the packet type from the header; `data`/`length` describe the
 * payload. Each known type is forwarded to its handler. The first
 * ANTOAN_NOP seen while UNREGISTERED completes registration and triggers
 * masterconn_sendchunksinfo(). Any unknown type is logged and the
 * connection is marked KILL.
 */
void masterconn_gotpacket(masterconn *eptr,uint32_t type,const uint8_t *data,uint32_t length) {
	switch (type) {
		/* --- connection-level / control packets --- */
		case ANTOAN_NOP:
			eptr->masteraddrvalid = 1;
			if (eptr->registerstate==UNREGISTERED) {
				eptr->registerstate = REGISTERED;
				masterconn_sendchunksinfo(eptr);
			}
			break;
		case ANTOAN_UNKNOWN_COMMAND: // for future use
		case ANTOAN_BAD_COMMAND_SIZE: // for future use
			break;
		case MATOCS_MASTER_ACK:
			eptr->masteraddrvalid = 1;
			eptr->new_register_mode = 3;
			masterconn_master_ack(eptr,data,length);
			break;

		/* --- chunk operations ordered by the master --- */
		case MATOCS_CREATE:
			masterconn_create(eptr,data,length);
			break;
		case MATOCS_DELETE:
			masterconn_delete(eptr,data,length);
			break;
		case MATOCS_SET_VERSION:
			masterconn_setversion(eptr,data,length);
			break;
		case MATOCS_DUPLICATE:
			masterconn_duplicate(eptr,data,length);
			break;
		case MATOCS_REPLICATE:
			masterconn_replicate(eptr,data,length);
			break;
		case MATOCS_CHUNKOP:
			masterconn_chunkop(eptr,data,length);
			break;
		case MATOCS_TRUNCATE:
			masterconn_truncate(eptr,data,length);
			break;
		case MATOCS_DUPTRUNC:
			masterconn_duptrunc(eptr,data,length);
			break;

		/* --- queries answered through background idle jobs --- */
		case ANTOCS_GET_CHUNK_BLOCKS:
			masterconn_get_chunk_blocks(eptr,data,length);
			break;
		case ANTOCS_GET_CHUNK_CHECKSUM:
			masterconn_get_chunk_checksum(eptr,data,length);
			break;
		case ANTOCS_GET_CHUNK_CHECKSUM_TAB:
			masterconn_get_chunk_checksum_tab(eptr,data,length);
			break;

		default:
			syslog(LOG_NOTICE,"got unknown message (type:%"PRIu32")",type);
			eptr->mode = KILL;
			break;
	}
}
1033 
1034 
/*
 * Finish setting up a freshly established master connection.
 *
 * Enables TCP_NODELAY-style behaviour via tcpnodelay(), switches to DATA
 * mode and stamps lastread/lastwrite with the current monotonic time. It
 * resets the packet reassembly state (expect an 8-byte header next) and
 * empties the input and output queues. It then clears the negotiated master
 * version, marks the session UNREGISTERED and increments conncnt. Finally
 * it sends the registration packet.
 */
void masterconn_connected(masterconn *eptr) {
	const double t = monotonic_seconds();

	tcpnodelay(eptr->sock);
	eptr->mode = DATA;
	eptr->lastread = t;
	eptr->lastwrite = t;

	// reassembly state: nothing partially received, next comes a header
	eptr->input_packet = NULL;
	eptr->input_startptr = eptr->input_hdr;
	eptr->input_bytesleft = 8;
	eptr->input_end = 0;

	// empty inbound and outbound packet queues
	eptr->inputhead = NULL;
	eptr->inputtail = &(eptr->inputhead);
	eptr->outputhead = NULL;
	eptr->outputtail = &(eptr->outputhead);

	eptr->conncnt += 1;
	eptr->masterversion = 0;
	eptr->registerstate = UNREGISTERED;

	masterconn_sendregister(eptr);
}
1058 
/*
 * Start a non-blocking connection attempt to the master.
 *
 * If no master address is cached (masteraddrvalid == 0), BIND_HOST and
 * MASTER_HOST/MASTER_PORT are resolved first. An unresolvable bind host
 * falls back to 0 (no explicit bind), and a 127.x.x.x master address is
 * rejected. The cached-address flag is then cleared for the next attempt.
 *
 * Returns 0 when the connection completed immediately (masterconn_connected
 * has run) or is in progress (mode = CONNECTING, conntime stamped).
 * Returns -1 on any failure; in that case any socket created here has been
 * closed and eptr->sock is -1.
 */
int masterconn_initconnect(masterconn *eptr) {
	int status;

	if (eptr->masteraddrvalid==0) {
		uint32_t mip,bip;
		uint16_t mport;

		if (tcpresolve(BindHost,NULL,&bip,NULL,1)<0) {
			bip = 0;
		}
		eptr->bindip = bip;
		if (tcpresolve(MasterHost,MasterPort,&mip,&mport,0)<0) {
			mfs_arg_syslog(LOG_WARNING,"master connection module: can't resolve master host/port (%s:%s)",MasterHost,MasterPort);
			return -1;
		}
		if ((mip&0xFF000000)==0x7F000000) {
			mfs_arg_syslog(LOG_WARNING,"master connection module: localhost (%u.%u.%u.%u) can't be used for connecting with master (use ip address of network controller)",(mip>>24)&0xFF,(mip>>16)&0xFF,(mip>>8)&0xFF,mip&0xFF);
			return -1;
		}
		eptr->masterip = mip;
		eptr->masterport = mport;
	}
	eptr->masteraddrvalid = 0;

	eptr->sock = tcpsocket();
	if (eptr->sock<0) {
		mfs_errlog(LOG_WARNING,"master connection module: create socket error");
		return -1;
	}
	// every failure below logs first (while errno is still intact) and then closes the socket
	if (tcpnonblock(eptr->sock)<0) {
		mfs_errlog(LOG_WARNING,"master connection module: set nonblock error");
		goto fail_close;
	}
	if (eptr->bindip>0 && tcpnumbind(eptr->sock,eptr->bindip,0)<0) {
		mfs_errlog(LOG_WARNING,"master connection module: can't bind socket to given ip");
		goto fail_close;
	}
	status = tcpnumconnect(eptr->sock,eptr->masterip,eptr->masterport);
	if (status<0) {
		mfs_errlog(LOG_WARNING,"master connection module: connect failed");
		goto fail_close;
	}

	if (status==0) {
		syslog(LOG_NOTICE,"connected to Master immediately");
		masterconn_connected(eptr);
	} else {
		eptr->mode = CONNECTING;
		eptr->conntime = monotonic_seconds();
		syslog(LOG_NOTICE,"connecting ...");
	}
	return 0;

fail_close:
	tcpclose(eptr->sock);
	eptr->sock = -1;
	return -1;
}
1119 
/*
 * Abandon a connect() that has not completed in time.
 *
 * Closes the socket and forgets the cached master address. It leaves the
 * module in FREE mode so masterconn_reconnect() can start a fresh attempt.
 */
void masterconn_connecttimeout(masterconn *eptr) {
	syslog(LOG_WARNING,"connection timed out");
	tcpclose(eptr->sock);
	eptr->sock = -1;
	eptr->masteraddrvalid = 0;
	eptr->mode = FREE;
}
1127 
/*
 * Check the outcome of an asynchronous connect() once poll() reported the
 * socket.
 *
 * On success the connection is initialised via masterconn_connected().
 * On failure the error is logged, the socket is closed, the cached master
 * address is forgotten and the module goes back to FREE.
 */
void masterconn_connecttest(masterconn *eptr) {
	if (tcpgetstatus(eptr->sock)==0) {
		syslog(LOG_NOTICE,"connected to Master");
		masterconn_connected(eptr);
		return;
	}
	mfs_errlog_silent(LOG_WARNING,"connection failed, error");
	tcpclose(eptr->sock);
	eptr->sock = -1;
	eptr->mode = FREE;
	eptr->masteraddrvalid = 0;
}
1143 
/*
 * Drain what is currently readable from the master socket and split it into
 * packets.
 *
 * Phase 1 reads into a function-static buffer, 65536 bytes initially.
 * Whenever one read() fills the remaining space exactly, the buffer is
 * doubled and reading continues. The loop stops on the first short read,
 * on EOF, or on an error.
 *
 * Phase 2 feeds the bytes into the per-connection reassembly state: an
 * 8-byte header (type:32, length:32) followed by `length` payload bytes.
 * Each complete packet is appended to the eptr->inputhead/inputtail queue,
 * which masterconn_parse() consumes later.
 *
 * eptr->input_end is set on EOF (read() == 0), on a read error for which
 * ERRNO_ERROR holds, or on a packet longer than MaxPacketSize.
 * lastread is refreshed when any byte was received.
 *
 * Calling with eptr == NULL only releases the static buffer; masterconn_term
 * does this.
 */
void masterconn_read(masterconn *eptr,double now) {
	int32_t i;
	uint32_t type,leng;
	const uint8_t *ptr;
	uint32_t rbleng,rbpos;
	uint8_t err,hup;
	// receive buffer kept across calls; grown on demand
	static uint8_t *readbuff = NULL;
	static uint32_t readbuffsize = 0;

	// cleanup mode: release the static buffer
	if (eptr == NULL) {
		if (readbuff != NULL) {
			free(readbuff);
		}
		readbuff = NULL;
		readbuffsize = 0;
		return;
	}

	// lazy first allocation
	if (readbuffsize==0) {
		readbuffsize = 65536;
		readbuff = malloc(readbuffsize);
		passert(readbuff);
	}

	rbleng = 0;
	err = 0;
	hup = 0;
	// Phase 1: read until a short read (socket drained), EOF or error
	for (;;) {
		i = read(eptr->sock,readbuff+rbleng,readbuffsize-rbleng);
		if (i==0) {
			hup = 1;
			break;
		} else if (i<0) {
			// presumably ERRNO_ERROR excludes "would block"-type errno values - TODO confirm
			if (ERRNO_ERROR) {
				err = 1;
			}
			break;
		} else {
			stats_bytesin+=i;
			rbleng += i;
			if (rbleng==readbuffsize) {
				// buffer completely filled: more data may be pending, grow and read again
				// NOTE(review): readbuffsize is uint32_t and growth is uncapped; doubling past
				// 2^31 would wrap to 0. Also read() returns ssize_t but i is int32_t.
				// Practically unreachable, but worth a cap.
				readbuffsize*=2;
				readbuff = realloc(readbuff,readbuffsize);
				passert(readbuff);
			} else {
				break;
			}
		}
	}

	if (rbleng>0) {
		eptr->lastread = now;
	}

	// Phase 2: feed the bytes into the reassembly state machine.
	// input_startptr/input_bytesleft say where the next bytes go: into
	// input_hdr while a header is collected (input_packet == NULL), or into
	// input_packet->data while a payload is collected.
	rbpos = 0;
	while (rbpos<rbleng) {
		// copy what the current header/payload still needs, or everything that is left
		if ((rbleng-rbpos)>=eptr->input_bytesleft) {
			memcpy(eptr->input_startptr,readbuff+rbpos,eptr->input_bytesleft);
			i = eptr->input_bytesleft;
		} else {
			memcpy(eptr->input_startptr,readbuff+rbpos,rbleng-rbpos);
			i = rbleng-rbpos;
		}
		rbpos += i;
		eptr->input_startptr+=i;
		eptr->input_bytesleft-=i;

		// current piece still incomplete: wait for the next call
		if (eptr->input_bytesleft>0) {
			break;
		}

		// header just completed: decode it and allocate the packet for its payload
		if (eptr->input_packet == NULL) {
			ptr = eptr->input_hdr;
			type = get32bit(&ptr);
			leng = get32bit(&ptr);

			if (leng>MaxPacketSize) {
				// oversized packet: stop accepting input; masterconn_parse() KILLs the
				// connection once the already-queued packets are handled
				syslog(LOG_WARNING,"Master packet too long (%"PRIu32"/%u)",leng,MaxPacketSize);
				eptr->input_end = 1;
				return;
			}

			eptr->input_packet = malloc(offsetof(in_packetstruct,data)+leng);
			passert(eptr->input_packet);
			eptr->input_packet->next = NULL;
			eptr->input_packet->type = type;
			eptr->input_packet->leng = leng;

			eptr->input_startptr = eptr->input_packet->data;
			eptr->input_bytesleft = leng;
		}

		// payload still expected; a zero-length payload falls through and is queued at once
		if (eptr->input_bytesleft>0) {
			continue;
		}

		// payload complete: append to the input queue and expect a new header
		if (eptr->input_packet != NULL) {
			*(eptr->inputtail) = eptr->input_packet;
			eptr->inputtail = &(eptr->input_packet->next);
			eptr->input_packet = NULL;
			eptr->input_bytesleft = 8;
			eptr->input_startptr = eptr->input_hdr;
		}
	}

	// EOF / error are reported only after the received bytes have been queued
	if (hup) {
		syslog(LOG_NOTICE,"connection was reset by Master");
		eptr->input_end = 1;
	} else if (err) {
		mfs_errlog_silent(LOG_NOTICE,"read from Master error");
		eptr->input_end = 1;
	}
}
1257 
/*
 * Dispatch queued incoming packets while the connection is in DATA mode.
 *
 * Processing is time-boxed to about 10 ms (10000 us) per call. Leftover
 * packets stay queued for the next call. Each packet is freed after its
 * handler returns. When the queue is empty and input has ended
 * (input_end), the connection is marked KILL.
 */
void masterconn_parse(masterconn *eptr) {
	in_packetstruct *pkt;
	const uint64_t t0 = monotonic_useconds();
	uint64_t tnow = t0;

	while (eptr->mode==DATA && eptr->inputhead!=NULL && tnow < t0+10000) {
		pkt = eptr->inputhead;
		masterconn_gotpacket(eptr,pkt->type,pkt->data,pkt->leng);
		eptr->inputhead = pkt->next;
		free(pkt);
		if (eptr->inputhead!=NULL) {
			// only worth re-reading the clock if there is more to do
			tnow = monotonic_useconds();
		} else {
			eptr->inputtail = &(eptr->inputhead);
		}
	}

	if (eptr->mode==DATA && eptr->inputhead==NULL && eptr->input_end) {
		eptr->mode = KILL;
	}
}
1279 
/*
 * Flush as much of the output queue (eptr->outputhead) as the socket
 * accepts.
 *
 * With writev(), up to 100 queued packets are gathered per call. Fully
 * written packets are freed and a partially written one is advanced. The
 * function returns once the kernel accepts fewer bytes than offered or the
 * queue is empty. Without writev(), packets are written one at a time.
 *
 * A write error for which ERRNO_ERROR holds is logged and marks the
 * connection KILL. A write of at least one byte refreshes lastwrite, and
 * stats_bytesout accumulates the bytes sent.
 */
void masterconn_write(masterconn *eptr,double now) {
	out_packetstruct *pkt;
	int32_t written;
#ifdef HAVE_WRITEV
	struct iovec iov[100];
	uint32_t niov;
	uint32_t queued;
	uint32_t consumed;

	for (;;) {
		// gather the head of the queue into one scatter/gather request
		queued = 0;
		niov = 0;
		pkt = eptr->outputhead;
		while (pkt!=NULL && niov<100) {
			iov[niov].iov_base = pkt->startptr;
			iov[niov].iov_len = pkt->bytesleft;
			queued += pkt->bytesleft;
			niov++;
			pkt = pkt->next;
		}
		if (niov==0) {
			return;
		}

		written = writev(eptr->sock,iov,niov);
		if (written<0) {
			if (ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"write to Master error");
				eptr->mode = KILL;
			}
			return;
		}
		if (written>0) {
			eptr->lastwrite = now;
		}
		stats_bytesout+=written;

		// retire fully sent packets, advance into a partially sent one
		consumed = written;
		while (consumed>0 && (pkt = eptr->outputhead)!=NULL) {
			if (pkt->bytesleft<=consumed) {
				consumed -= pkt->bytesleft;
				eptr->outputhead = pkt->next;
				if (eptr->outputhead==NULL) {
					eptr->outputtail = &(eptr->outputhead);
				}
				free(pkt);
			} else {
				pkt->startptr += consumed;
				pkt->bytesleft -= consumed;
				consumed = 0;
			}
		}

		// kernel buffer is full - try again on the next POLLOUT
		if ((uint32_t)written < queued) {
			return;
		}
	}
#else
	while ((pkt = eptr->outputhead)!=NULL) {
		written = write(eptr->sock,pkt->startptr,pkt->bytesleft);
		if (written<0) {
			if (ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"write to Master error");
				eptr->mode = KILL;
			}
			return;
		}
		if (written>0) {
			eptr->lastwrite = now;
		}
		stats_bytesout+=written;
		pkt->startptr+=written;
		pkt->bytesleft-=written;
		if (pkt->bytesleft>0) {
			// partial write - wait for the socket to become writable again
			return;
		}
		eptr->outputhead = pkt->next;
		if (eptr->outputhead==NULL) {
			eptr->outputtail = &(eptr->outputhead);
		}
		free(pkt);
	}
#endif
}
1362 
1363 
masterconn_desc(struct pollfd * pdesc,uint32_t * ndesc)1364 void masterconn_desc(struct pollfd *pdesc,uint32_t *ndesc) {
1365 	uint32_t pos = *ndesc;
1366 	masterconn *eptr = masterconnsingleton;
1367 
1368 	eptr->pdescpos = -1;
1369 	if (eptr->mode==FREE || eptr->sock<0) {
1370 		return;
1371 	}
1372 	pdesc[pos].events = 0;
1373 	if (eptr->mode==DATA && eptr->input_end==0) {
1374 		pdesc[pos].events |= POLLIN;
1375 	}
1376 	if ((eptr->mode==DATA && eptr->outputhead!=NULL) || eptr->mode==CONNECTING) {
1377 		pdesc[pos].events |= POLLOUT;
1378 	}
1379 	if (pdesc[pos].events!=0) {
1380 		pdesc[pos].fd = eptr->sock;
1381 		eptr->pdescpos = pos;
1382 		pos++;
1383 	}
1384 	*ndesc = pos;
1385 }
1386 
masterconn_disconnection_check(void)1387 void masterconn_disconnection_check(void) {
1388 	masterconn *eptr = masterconnsingleton;
1389 	in_packetstruct *ipptr,*ipaptr;
1390 	out_packetstruct *opptr,*opaptr;
1391 	idlejob *ij,*nij;
1392 
1393 	if (eptr->mode == KILL || eptr->mode == CLOSE) {
1394 		// masterconn_beforeclose(eptr);
1395 		tcpclose(eptr->sock);
1396 		if (eptr->input_packet) {
1397 			free(eptr->input_packet);
1398 		}
1399 		ipptr = eptr->inputhead;
1400 		while (ipptr) {
1401 			ipaptr = ipptr;
1402 			ipptr = ipptr->next;
1403 			free(ipaptr);
1404 		}
1405 		opptr = eptr->outputhead;
1406 		while (opptr) {
1407 			opaptr = opptr;
1408 			opptr = opptr->next;
1409 			free(opaptr);
1410 		}
1411 		for (ij=idlejobs ; ij ; ij=nij) {
1412 			nij = ij->next;
1413 			job_pool_disable_job(ij->jobid);
1414 			ij->next = NULL;
1415 			ij->prev = NULL;
1416 			ij->valid = 0;
1417 		}
1418 		idlejobs = NULL;
1419 		if (eptr->registerstate == INPROGRESS) {
1420 			hdd_get_chunks_end();
1421 		}
1422 		if (eptr->registerstate == UNREGISTERED && eptr->mode==KILL) {
1423 			if (eptr->new_register_mode>0) {
1424 				eptr->new_register_mode--;
1425 			} else {
1426 				eptr->new_register_mode=3;
1427 			}
1428 			if (eptr->new_register_mode==0) {
1429 				eptr->masteraddrvalid = 1; // switch to old register mode and try again using same address
1430 			} else {
1431 				eptr->masteraddrvalid = 0; // in new register mode always resolve master address
1432 			}
1433 		}
1434 		eptr->mode = FREE;
1435 	}
1436 }
1437 
masterconn_serve(struct pollfd * pdesc)1438 void masterconn_serve(struct pollfd *pdesc) {
1439 	double now;
1440 	masterconn *eptr = masterconnsingleton;
1441 
1442 	now = monotonic_seconds();
1443 
1444 	if (eptr->mode==CONNECTING) {
1445 		if (eptr->sock>=0 && eptr->pdescpos>=0 && (pdesc[eptr->pdescpos].revents & (POLLOUT | POLLHUP | POLLERR))) { // FD_ISSET(eptr->sock,wset)) {
1446 			masterconn_connecttest(eptr);
1447 		} else if (eptr->conntime+1.0 < now) {
1448 			masterconn_connecttimeout(eptr);
1449 		}
1450 	} else {
1451 		if (eptr->pdescpos>=0) {
1452 			if ((pdesc[eptr->pdescpos].revents & (POLLERR|POLLIN))==POLLIN && eptr->mode==DATA) {
1453 				masterconn_read(eptr,now);
1454 			}
1455 			if (pdesc[eptr->pdescpos].revents & (POLLERR|POLLHUP)) {
1456 				syslog(LOG_NOTICE,"masterconn: connection closed by master");
1457 				eptr->input_end = 1;
1458 			}
1459 			masterconn_parse(eptr);
1460 		}
1461 		if (eptr->mode==DATA && eptr->lastwrite+(eptr->timeout/3.0)<now && eptr->outputhead==NULL) {
1462 			masterconn_create_attached_packet(eptr,ANTOAN_NOP,0);
1463 		}
1464 		if (eptr->pdescpos>=0) {
1465 			if ((((pdesc[eptr->pdescpos].events & POLLOUT)==0 && (eptr->outputhead)) || (pdesc[eptr->pdescpos].revents & POLLOUT)) && eptr->mode==DATA) {
1466 				masterconn_write(eptr,now);
1467 			}
1468 		}
1469 		if (eptr->mode==DATA && eptr->lastread+eptr->timeout<now) {
1470 			syslog(LOG_NOTICE,"masterconn: connection timed out");
1471 			eptr->mode = KILL;
1472 		}
1473 	}
1474 	if (wantexittime>0.0 && wantexittime+FORCE_DISCONNECTION_TO < now) {
1475 		eptr->mode = KILL;
1476 	}
1477 	masterconn_disconnection_check();
1478 }
1479 
masterconn_reconnect(void)1480 void masterconn_reconnect(void) {
1481 	masterconn *eptr = masterconnsingleton;
1482 	if (eptr->mode==FREE && wantexittime==0.0) {
1483 		masterconn_initconnect(eptr);
1484 	}
1485 }
1486 
masterconn_term(void)1487 void masterconn_term(void) {
1488 	masterconn *eptr = masterconnsingleton;
1489 	in_packetstruct *ipptr,*ipaptr;
1490 	out_packetstruct *opptr,*opaptr;
1491 
1492 	if (eptr->mode!=FREE) {
1493 		tcpclose(eptr->sock);
1494 		if (eptr->mode!=CONNECTING) {
1495 			if (eptr->input_packet) {
1496 				free(eptr->input_packet);
1497 			}
1498 			ipptr = eptr->inputhead;
1499 			while (ipptr) {
1500 				ipaptr = ipptr;
1501 				ipptr = ipptr->next;
1502 				free(ipaptr);
1503 			}
1504 			opptr = eptr->outputhead;
1505 			while (opptr) {
1506 				opaptr = opptr;
1507 				opptr = opptr->next;
1508 				free(opaptr);
1509 			}
1510 		}
1511 	}
1512 
1513 	masterconn_read(NULL,0.0); // free internal read buffer
1514 
1515 	free(eptr);
1516 
1517 	free(MasterHost);
1518 	free(MasterPort);
1519 	free(BindHost);
1520 	masterconnsingleton = NULL;
1521 }
1522 
masterconn_reload(void)1523 void masterconn_reload(void) {
1524 	masterconn *eptr = masterconnsingleton;
1525 	uint32_t ReconnectionDelay;
1526 
1527 	free(MasterHost);
1528 	free(MasterPort);
1529 	free(BindHost);
1530 
1531 	MasterHost = cfg_getstr("MASTER_HOST",DEFAULT_MASTERNAME);
1532 	MasterPort = cfg_getstr("MASTER_PORT",DEFAULT_MASTER_CS_PORT);
1533 	BindHost = cfg_getstr("BIND_HOST","*");
1534 
1535 	eptr->masteraddrvalid = 0;
1536 	if (eptr->mode!=FREE) {
1537 		eptr->mode = KILL;
1538 	}
1539 	Timeout = cfg_getuint32("MASTER_TIMEOUT",0);
1540 
1541 	ReconnectionDelay = cfg_getuint32("MASTER_RECONNECTION_DELAY",5);
1542 
1543 	if (Timeout>65535) {
1544 		Timeout=65535;
1545 	}
1546 	if (Timeout<10 && Timeout>0) {
1547 		Timeout=10;
1548 	}
1549 
1550 	main_time_change(reconnect_hook,ReconnectionDelay,0);
1551 }
1552 
masterconn_wantexit(void)1553 void masterconn_wantexit(void) {
1554 	wantexittime = monotonic_seconds();
1555 }
1556 
masterconn_init(void)1557 int masterconn_init(void) {
1558 	uint32_t ReconnectionDelay;
1559 	masterconn *eptr;
1560 
1561 	masterconn_initcsid();
1562 
1563 	manager_time_hook = NULL;
1564 
1565 	ReconnectionDelay = cfg_getuint32("MASTER_RECONNECTION_DELAY",5);
1566 	MasterHost = cfg_getstr("MASTER_HOST",DEFAULT_MASTERNAME);
1567 	MasterPort = cfg_getstr("MASTER_PORT",DEFAULT_MASTER_CS_PORT);
1568 	BindHost = cfg_getstr("BIND_HOST","*");
1569 	Timeout = cfg_getuint32("MASTER_TIMEOUT",0);
1570 //	BackLogsNumber = cfg_getuint32("BACK_LOGS",50);
1571 
1572 	if (Timeout>65535) {
1573 		Timeout=65535;
1574 	}
1575 	if (Timeout<10 && Timeout>0) {
1576 		Timeout=10;
1577 	}
1578 	eptr = masterconnsingleton = malloc(sizeof(masterconn));
1579 	passert(eptr);
1580 
1581 	eptr->masteraddrvalid = 0;
1582 	eptr->new_register_mode = 3;
1583 	eptr->masterversion = 0;
1584 	eptr->mode = FREE;
1585 	eptr->pdescpos = -1;
1586 	eptr->conncnt = 0;
1587 	if (Timeout>0) {
1588 		eptr->timeout = Timeout;
1589 	} else {
1590 		eptr->timeout = 10;
1591 	}
1592 //	logfd = NULL;
1593 
1594 	wantexittime = 0.0;
1595 
1596 	if (masterconn_initconnect(eptr)<0) {
1597 		return -1;
1598 	}
1599 
1600 	main_time_register(REPORT_LOAD_FREQ,0,masterconn_reportload);
1601 	main_time_register(REPORT_SPACE_FREQ,0,masterconn_check_hdd_space);
1602 	main_eachloop_register(masterconn_check_hdd_reports);
1603 	reconnect_hook = main_time_register(ReconnectionDelay,rndu32_ranged(ReconnectionDelay),masterconn_reconnect);
1604 	main_destruct_register(masterconn_term);
1605 	main_poll_register(masterconn_desc,masterconn_serve);
1606 	main_wantexit_register(masterconn_wantexit);
1607 //	main_canexit_register(masterconn_canexit);
1608 	main_reload_register(masterconn_reload);
1609 	return 0;
1610 }
1611