1 /*
2 * Copyright (C) 2016 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <time.h>
26 #include <stddef.h>
27 #include <sys/types.h>
28 #include <sys/uio.h>
29 #include <fcntl.h>
30 #include <unistd.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <syslog.h>
35 #include <errno.h>
36 #include <inttypes.h>
37 #include <netinet/in.h>
38 #ifdef HAVE_WRITEV
39 #include <sys/uio.h>
40 #endif
41
42 #include "MFSCommunication.h"
43 #include "datapack.h"
44 #include "masterconn.h"
45 #include "cfg.h"
46 #include "main.h"
47 #include "sockets.h"
48 #include "hddspacemgr.h"
49 #include "slogger.h"
50 #include "massert.h"
51 #include "random.h"
52 #include "bgjobs.h"
53 #include "csserv.h"
54 #include "clocks.h"
55
// Packet size limit taken from the protocol definition.
// NOTE(review): presumably used to validate incoming packet lengths (usage is outside this chunk).
#define MaxPacketSize MATOCS_MAXPACKETSIZE

// Maximum number of lost chunk ids passed to hdd_get_lost_chunk_count() per
// CSTOMA_CHUNK_LOST packet (8 bytes per entry).
// has to be less than MaxPacketSize on master side divided by 8
#define LOSTCHUNKLIMIT 25000
// Maximum number of entries passed to hdd_get_new_chunk_count() per
// CSTOMA_CHUNK_NEW packet (12 bytes per entry).
// has to be less than MaxPacketSize on master side divided by 12
#define NEWCHUNKLIMIT 25000

// NOTE(review): presumably periods (seconds) for load / space reports - used outside this chunk, confirm.
#define REPORT_LOAD_FREQ 60
#define REPORT_SPACE_FREQ 1

// force disconnection X seconds after term signal
#define FORCE_DISCONNECTION_TO 5.0

// mode (masterconn.mode): connection state; KILL and CLOSE are set when the
// connection has to be dropped
enum {FREE,CONNECTING,DATA,KILL,CLOSE};

// idle job operation codes (idlejob.op)
enum {IJ_GET_CHUNK_BLOCKS,IJ_GET_CHUNK_CHECKSUM,IJ_GET_CHUNK_CHECKSUM_TAB};

// masterconn.registerstate
// UNREGISTERED - nothing sent/accepted yet, WAITING - master asked us to wait,
// INPROGRESS - incremental chunk list upload running, REGISTERED - done
enum {UNREGISTERED,WAITING,INPROGRESS,REGISTERED};
76
/* A pending "idle" query job (chunk blocks / checksum / checksum table).
 * Entries are kept on the doubly linked 'idlejobs' list: 'prev' points at
 * the 'next' field of the previous entry (or at the list head), so an entry
 * can unlink itself without walking the list.  'buff' receives the job
 * result; its size depends on 'op' and the entry is allocated with
 * offsetof(idlejob,buff)+N.  It is a C99 flexible array member instead of
 * the old 'buff[1]' trick, which made every access past buff[0] formally
 * out of bounds. */
typedef struct idlejob {
	uint32_t jobid;			// value returned by the job_* scheduling call
	uint8_t op;			// IJ_GET_CHUNK_BLOCKS / IJ_GET_CHUNK_CHECKSUM / IJ_GET_CHUNK_CHECKSUM_TAB
	uint8_t valid;			// when 0 the completion handler neither replies nor unlinks
	uint64_t chunkid;
	uint32_t version;
	struct idlejob *next,**prev;	// list linkage
	uint8_t buff[];			// result buffer (2, 4 or 4096 bytes)
} idlejob;
86
/* An outgoing packet.  'data' holds the 8-byte header (type:32, size:32)
 * followed by the payload and is allocated with
 * offsetof(out_packetstruct,data)+8+size.  It is a C99 flexible array member
 * rather than 'data[1]', whose use past index 0 is formally out of bounds. */
typedef struct out_packetstruct {
	struct out_packetstruct *next;	// next packet in the output queue
	uint8_t *startptr;		// first byte not yet sent (initially 'data')
	uint32_t bytesleft;		// bytes still to be sent
	uint32_t conncnt;		// masterconn.conncnt at creation; stale replies are dropped on mismatch
	uint8_t data[];			// header + payload
} out_packetstruct;
94
/* A packet received from the master, queued on masterconn.inputhead.
 * 'data' holds the payload and is a C99 flexible array member (instead of
 * 'data[1]').  NOTE(review): allocation presumably uses
 * offsetof(in_packetstruct,data)+leng - the allocation site is outside this chunk. */
typedef struct in_packetstruct {
	struct in_packetstruct *next;	// next packet in the input queue
	uint32_t type,leng;		// packet type and payload length
	uint8_t data[];			// payload
} in_packetstruct;
100
/* State of the connection to the master server (a single instance, see
 * masterconnsingleton). */
typedef struct masterconn {
	uint8_t mode;			// FREE/CONNECTING/DATA/KILL/CLOSE
	int sock;			// socket descriptor
	int32_t pdescpos;		// NOTE(review): presumably index into the poll descriptor table - confirm
	double lastread,lastwrite,conntime;	// monotonic_seconds() timestamps (conntime set outside this chunk)
	uint8_t input_hdr[8];		// buffer for the 8-byte header of the packet being received
	uint8_t *input_startptr;	// where the next received bytes go
	uint32_t input_bytesleft;	// bytes missing for the current header/packet
	uint8_t input_end;		// NOTE(review): input-side end flag, used outside this chunk
	in_packetstruct *input_packet;	// packet currently being received (NULL while reading a header)
	in_packetstruct *inputhead,**inputtail;		// queue of received packets
	out_packetstruct *outputhead,**outputtail;	// queue of packets to send; tail points at the last 'next'

	uint32_t masterversion;		// version reported by the master in MATOCS_MASTER_ACK (0 = unknown)
	uint32_t conncnt;		// incremented on every connect; tags packets so stale replies are dropped
	uint32_t bindip;		// NOTE(review): presumably the resolved BindHost - confirm
	uint32_t masterip;		// master address (changed on redirect to the leader)
	uint16_t masterport;
	uint16_t timeout;		// taken from the master when Timeout==0
	uint8_t masteraddrvalid;	// cleared to force a fresh master address lookup
	uint8_t registerstate;		// UNREGISTERED/WAITING/INPROGRESS/REGISTERED
	uint8_t new_register_mode;	// nonzero: register "ver. 6" protocol, zero: "ver. 5"
//	uint8_t accepted;
} masterconn;
125
// the (only) master connection
static masterconn *masterconnsingleton=NULL;
// head of the list of pending idle jobs (see struct idlejob)
static idlejob *idlejobs=NULL;
// set once chunkserverid.mfs has been read by masterconn_initcsid()
static uint8_t csidvalid = 0;
// NOTE(review): handles of registered callbacks - used outside this chunk
static void *reconnect_hook;
static void *manager_time_hook;
// NOTE(review): presumably the time termination was requested (see FORCE_DISCONNECTION_TO) - confirm
static double wantexittime = 0.0;

// byte counters, returned and reset by masterconn_stats()
static uint64_t stats_bytesout=0;
static uint64_t stats_bytesin=0;

// from config
// static uint32_t BackLogsNumber;
static char *MasterHost;
static char *MasterPort;
static char *BindHost;
static uint32_t Timeout;		// 0 = accept the timeout proposed by the master
static uint16_t ChunkServerId = 0;	// persisted in chunkserverid.mfs
static uint64_t MetaFileId = 0;		// persisted in chunkserverid.mfs
144
145 // static FILE *logfd;
146
masterconn_stats(uint64_t * bin,uint64_t * bout)147 void masterconn_stats(uint64_t *bin,uint64_t *bout) {
148 *bin = stats_bytesin;
149 *bout = stats_bytesout;
150 stats_bytesin = 0;
151 stats_bytesout = 0;
152 }
153
masterconn_initcsid(void)154 static inline void masterconn_initcsid(void) {
155 int fd;
156 uint8_t buff[10];
157 const uint8_t *rptr;
158 ssize_t ret;
159 if (csidvalid) {
160 return;
161 }
162 ChunkServerId = 0;
163 MetaFileId = 0;
164 csidvalid = 1;
165 fd = open("chunkserverid.mfs",O_RDWR);
166 if (fd>=0) {
167 ret = read(fd,buff,10);
168 rptr = buff;
169 if (ret>=2) {
170 ChunkServerId = get16bit(&rptr);
171 }
172 if (ret>=10) {
173 MetaFileId = get64bit(&rptr);
174 }
175 close(fd);
176 }
177 }
178
masterconn_getcsid(void)179 uint16_t masterconn_getcsid(void) {
180 masterconn_initcsid();
181 return ChunkServerId;
182 }
183
masterconn_getmetaid(void)184 uint64_t masterconn_getmetaid(void) {
185 masterconn_initcsid();
186 return MetaFileId;
187 }
188
masterconn_setcsid(uint16_t csid,uint64_t metafileid)189 static inline void masterconn_setcsid(uint16_t csid,uint64_t metafileid) {
190 int fd;
191 uint8_t buff[10],*wptr;
192 if (ChunkServerId!=csid || MetaFileId!=metafileid) {
193 if (csid>0) {
194 ChunkServerId = csid;
195 }
196 if (metafileid>0) {
197 MetaFileId = metafileid;
198 }
199 wptr = buff;
200 put16bit(&wptr,ChunkServerId);
201 put64bit(&wptr,MetaFileId);
202 fd = open("chunkserverid.mfs",O_CREAT | O_TRUNC | O_RDWR,0666);
203 if (fd>=0) {
204 if (write(fd,buff,10)!=10) {
205 syslog(LOG_WARNING,"can't store chunkserver id (write error)");
206 }
207 close(fd);
208 } else {
209 syslog(LOG_WARNING,"can't store chunkserver id (open error)");
210 }
211 }
212 }
213
masterconn_getmasterip(void)214 uint32_t masterconn_getmasterip(void) {
215 masterconn *eptr = masterconnsingleton;
216 if (eptr->registerstate==REGISTERED && eptr->mode==DATA) {
217 return eptr->masterip;
218 }
219 return 0;
220 }
221
masterconn_getmasterport(void)222 uint16_t masterconn_getmasterport(void) {
223 masterconn *eptr = masterconnsingleton;
224 if (eptr->registerstate==REGISTERED && eptr->mode==DATA) {
225 return eptr->masterport;
226 }
227 return 0;
228 }
229
/* Allocate an outgoing packet with an 8-byte header (type, size) and room
 * for 'size' payload bytes, WITHOUT queueing it.  The payload is filled by
 * the caller via masterconn_get_packet_data().  The packet records
 * eptr->conncnt so a late job completion can detect that the connection it
 * belonged to is gone.  The allocation is checked with passert(). */
void* masterconn_create_detached_packet(masterconn *eptr,uint32_t type,uint32_t size) {
	const uint32_t total = size+8;
	out_packetstruct *pkt = malloc(offsetof(out_packetstruct,data)+total);
	uint8_t *wp;

	passert(pkt);
	wp = pkt->data;
	put32bit(&wp,type);
	put32bit(&wp,size);
	pkt->next = NULL;
	pkt->startptr = pkt->data;
	pkt->bytesleft = total;
	pkt->conncnt = eptr->conncnt;
	return pkt;
}
247
masterconn_get_packet_data(void * packet)248 uint8_t* masterconn_get_packet_data(void *packet) {
249 out_packetstruct *outpacket = (out_packetstruct*)packet;
250 return (outpacket->data+8);
251 }
252
/* Discard a detached packet that will never be sent. */
void masterconn_delete_packet(void *packet) {
	void *unused_packet = packet;
	free(unused_packet);
}
256
/* Queue a previously detached packet at the end of eptr's output list. */
void masterconn_attach_packet(masterconn *eptr,void *packet) {
	out_packetstruct *pkt = (out_packetstruct*)packet;
	out_packetstruct **slot = eptr->outputtail;

	*slot = pkt;
	eptr->outputtail = &pkt->next;
}
262
/* Allocate an outgoing packet with an 8-byte header (type, size), append it
 * to eptr's output queue immediately, and return a pointer to its 'size'
 * payload bytes for the caller to fill.  The allocation is checked with
 * passert(). */
uint8_t* masterconn_create_attached_packet(masterconn *eptr,uint32_t type,uint32_t size) {
	out_packetstruct *outpacket;
	uint8_t *ptr;
	uint32_t psize;

	psize = size+8;
	outpacket=malloc(offsetof(out_packetstruct,data)+psize);
	passert(outpacket);
	outpacket->bytesleft = psize;
	ptr = outpacket->data;
	put32bit(&ptr,type);
	put32bit(&ptr,size);
	outpacket->startptr = outpacket->data;
	outpacket->next = NULL;
	// Previously left uninitialised; detached packets set it too, so every
	// queued packet now carries a defined connection generation.
	outpacket->conncnt = eptr->conncnt;
	*(eptr->outputtail) = outpacket;
	eptr->outputtail = &(outpacket->next);
	return ptr;
}
281
/* Queue the first CSTOMA_REGISTER packet of a registration.
 * - new_register_mode == 0 ("ver. 5"): sub-type 50 with version, listen
 *   ip/port and timeout (10 when no Timeout is configured).
 * - otherwise ("ver. 6"): sub-type 60 with version, listen ip/port, Timeout,
 *   chunkserver id and the disk space summary from hdd_get_space(). */
void masterconn_sendregister(masterconn *eptr) {
	uint8_t *wp;
	uint64_t used,total,tdused,tdtotal;
	uint32_t cnt,tdcnt;
	const uint32_t listenip = csserv_getlistenip();
	const uint16_t listenport = csserv_getlistenport();

	if (eptr->new_register_mode==0) {
#ifdef MFSDEBUG
		syslog(LOG_NOTICE,"register ver. 5 - init");
#endif
		wp = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1+4+4+2+2);
		put8bit(&wp,50);
		put32bit(&wp,VERSHEX);
		put32bit(&wp,listenip);
		put16bit(&wp,listenport);
		put16bit(&wp,(Timeout>0)?Timeout:10);
		return;
	}
#ifdef MFSDEBUG
	syslog(LOG_NOTICE,"register ver. 6 - init + space info");
#endif
	hdd_get_space(&used,&total,&cnt,&tdused,&tdtotal,&tdcnt);
	wp = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1+4+4+2+2+2+8+8+4+8+8+4);
	put8bit(&wp,60);
	put32bit(&wp,VERSHEX);
	put32bit(&wp,listenip);
	put16bit(&wp,listenport);
	put16bit(&wp,Timeout);
	put16bit(&wp,masterconn_getcsid());
	put64bit(&wp,used);
	put64bit(&wp,total);
	put32bit(&wp,cnt);
	put64bit(&wp,tdused);
	put64bit(&wp,tdtotal);
	put32bit(&wp,tdcnt);
}
327
/* One-shot upload of the whole chunk list.  This is used when the master is
 * older than 2.0.0, or when a NOP arrives while still UNREGISTERED.
 * Every batch goes out as CSTOMA_REGISTER sub-type 61 ("ver. 6") or 51
 * ("ver. 5").  It is followed by a closing packet: a bare end marker (62)
 * in ver. 6 mode, or end marker plus disk space summary (52) in ver. 5 mode. */
void masterconn_sendchunksinfo(masterconn *eptr) {
	uint8_t *wp;
	uint32_t batch;
	uint64_t used,total,tdused,tdtotal;
	uint32_t cnt,tdcnt;
	const uint8_t batchtag = (eptr->new_register_mode) ? 61 : 51;

#ifdef MFSDEBUG
	syslog(LOG_NOTICE,"register ver. %u - chunks info",(eptr->new_register_mode)?6:5);
#endif
	hdd_get_chunks_begin(0);
	for (batch = hdd_get_chunks_next_list_count() ; batch!=0 ; batch = hdd_get_chunks_next_list_count()) {
		wp = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1+batch*(8+4));
		put8bit(&wp,batchtag);
		hdd_get_chunks_next_list_data(wp);
	}
	hdd_get_chunks_end();
	if (eptr->new_register_mode==0) {
#ifdef MFSDEBUG
		syslog(LOG_NOTICE,"register ver. 5 - end + space info");
#endif
		hdd_get_space(&used,&total,&cnt,&tdused,&tdtotal,&tdcnt);
		wp = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1+8+8+4+8+8+4);
		put8bit(&wp,52);
		put64bit(&wp,used);
		put64bit(&wp,total);
		put32bit(&wp,cnt);
		put64bit(&wp,tdused);
		put64bit(&wp,tdtotal);
		put32bit(&wp,tdcnt);
	} else {
#ifdef MFSDEBUG
		syslog(LOG_NOTICE,"register ver. 6 - end");
#endif
		wp = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1);
		put8bit(&wp,62);
	}
}
370
/* Incremental registration step (masters >= 2.0.0).  If the chunk
 * enumeration still has a batch, send it as CSTOMA_REGISTER sub-type 61.
 * Otherwise finish the enumeration, send the end marker (62) and mark the
 * connection REGISTERED. */
void masterconn_sendnextchunks(masterconn *eptr) {
	uint8_t *wp;
	const uint32_t batch = hdd_get_chunks_next_list_count();

	if (batch>0) {
		wp = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1+batch*(8+4));
		put8bit(&wp,61);
		hdd_get_chunks_next_list_data(wp);
		return;
	}
	hdd_get_chunks_end();
	wp = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1);
	put8bit(&wp,62);
	eptr->registerstate = REGISTERED;
}
386
/* Nonzero when the master reports a known (nonzero) meta data id that
 * differs from the known locally stored one, i.e. the master belongs to
 * another MooseFS instance.  Uses masterconn_getmetaid() so the stored id
 * is loaded from disk before comparing; the raw MetaFileId may still be 0
 * because it has not been loaded yet. */
static int masterconn_metaid_mismatch(uint64_t metafileid) {
	uint64_t localid;
	if (metafileid==0) {
		return 0;
	}
	localid = masterconn_getmetaid();
	return (localid>0 && metafileid!=localid);
}

/* Handle MATOCS_MASTER_ACK.  Payload: atype:8 followed by
 *  atype 0 (accepted): [version:32 [timeout:16 csid:16 [metafileid:64]]]
 *      - store the master version / timeout / ids, then either upload the
 *        whole chunk list (master < 2.0.0) or start / continue the
 *        incremental upload (master >= 2.0.0);
 *  atype 1 (redirect): leaderip:32 - reconnect to the leader (or to any
 *      master if the leader is unknown, ip 0);
 *  atype 2 (wait): version:32 timeout:16 [metafileid:64] - wait for acceptance.
 * A meta data id mismatch or an invalid size/type kills the connection. */
void masterconn_master_ack(masterconn *eptr,const uint8_t *data,uint32_t length) {
	uint8_t atype;
	uint64_t metafileid;
	uint16_t csid;
	if (length!=17 && length!=15 && length!=9 && length!=7 && length!=5 && length!=1) {
		syslog(LOG_NOTICE,"MATOCS_MASTER_ACK - wrong size (%"PRIu32"/1|5|7|9|15|17)",length);
		eptr->mode = KILL;
		return;
	}
	atype = get8bit(&data);
	if (atype==0) {
		csid = 0;
		metafileid = 0;
		if (length>=5) {
			eptr->masterversion = get32bit(&data);
		}
		if (length>=9) {
			if (Timeout==0) {
				eptr->timeout = get16bit(&data);
			} else {
				data+=2;
			}
			csid = get16bit(&data);
		}
		if (length>=17) {
			metafileid = get64bit(&data);
			if (masterconn_metaid_mismatch(metafileid)) { // wrong MFS instance - abort
				syslog(LOG_WARNING,"MATOCS_MASTER_ACK - wrong meta data id. Can't connect to master");
				eptr->registerstate = REGISTERED; // do not switch to register ver. 5
				eptr->mode = KILL;
				return;
			}
		}
		if (csid>0 || metafileid>0) {
			masterconn_setcsid(csid,metafileid);
		}
		if (eptr->masterversion<VERSION2INT(2,0,0)) {
			// old master: abandon any incremental upload and send everything at once
			if (eptr->registerstate != REGISTERED) {
				if (eptr->registerstate == INPROGRESS) {
					hdd_get_chunks_end();
				}
				eptr->registerstate = REGISTERED;
				masterconn_sendchunksinfo(eptr);
			}
		} else {
			// new master: one chunk batch per acknowledgement
			if (eptr->registerstate == UNREGISTERED || eptr->registerstate == WAITING) {
				hdd_get_chunks_begin(1);
				eptr->registerstate = INPROGRESS;
			}
			if (eptr->registerstate == INPROGRESS) {
				masterconn_sendnextchunks(eptr);
			}
		}
	} else if (atype==1 && length==5) {
		uint32_t mip;
		mip = get32bit(&data);
		if (mip) {
			// redirect to leader
			eptr->masterip = mip;
			eptr->new_register_mode = 3;
			if (eptr->registerstate == INPROGRESS) {
				hdd_get_chunks_end();
			}
			eptr->registerstate = WAITING;
#ifdef MFSDEBUG
			syslog(LOG_NOTICE,"masterconn: redirected to other master");
#endif
		} else {
			// leader not known - just reconnect
			eptr->masteraddrvalid = 0;
			syslog(LOG_NOTICE,"masterconn: follower doesn't know who is the leader, reconnect to another master");
		}
		eptr->mode = CLOSE;
	} else if (atype==2 && (length==7 || length==15)) {
#ifdef MFSDEBUG
		syslog(LOG_NOTICE,"masterconn: wait for acceptance");
#endif
		if (eptr->registerstate == INPROGRESS) {
			hdd_get_chunks_end();
		}
		eptr->registerstate = WAITING;
		eptr->masterversion = get32bit(&data);
		if (Timeout==0) {
			eptr->timeout = get16bit(&data);
		} else {
			data+=2;
		}
		if (length>=15) {
			metafileid = get64bit(&data);
			if (masterconn_metaid_mismatch(metafileid)) { // wrong MFS instance - abort
				syslog(LOG_WARNING,"MATOCS_MASTER_ACK - wrong meta data id. Can't connect to master");
				eptr->registerstate = REGISTERED; // do not switch to register ver. 5
				eptr->mode = KILL;
				return;
			}
		}
	} else {
		// use the exact-width format macros for uint8_t / uint32_t
		syslog(LOG_NOTICE,"MATOCS_MASTER_ACK - bad type/length: %"PRIu8"/%"PRIu32,atype,length);
		eptr->mode = KILL;
	}
}
488
489 /*
490 void masterconn_sendregister_v4(masterconn *eptr) {
491 uint8_t *buff;
492 uint32_t chunks,myip;
493 uint16_t myport;
494 uint64_t usedspace,totalspace;
495 uint64_t tdusedspace,tdtotalspace;
496 uint32_t chunkcount,tdchunkcount;
497
498 myip = csserv_getlistenip();
499 myport = csserv_getlistenport();
500 hdd_get_space(&usedspace,&totalspace,&chunkcount,&tdusedspace,&tdtotalspace,&tdchunkcount);
501 hdd_get_chunks_begin();
502 chunks = hdd_get_chunks_count();
503 buff = masterconn_create_attached_packet(eptr,CSTOMA_REGISTER,1+4+4+2+2+8+8+4+8+8+4+chunks*(8+4));
504 put8bit(&buff,4);
505 put16bit(&buff,VERSMAJ);
506 put8bit(&buff,VERSMID);
507 put8bit(&buff,VERSMIN);
508 put32bit(&buff,myip);
509 put16bit(&buff,myport);
510 put16bit(&buff,Timeout);
511 put64bit(&buff,usedspace);
512 put64bit(&buff,totalspace);
513 put32bit(&buff,chunkcount);
514 put64bit(&buff,tdusedspace);
515 put64bit(&buff,tdtotalspace);
516 put32bit(&buff,tdchunkcount);
517 if (chunks>0) {
518 hdd_get_chunks_data(buff);
519 }
520 hdd_get_chunks_end();
521 }
522 */
523 /*
524 void masterconn_send_space(uint64_t usedspace,uint64_t totalspace,uint32_t chunkcount,uint64_t tdusedspace,uint64_t tdtotalspace,uint32_t tdchunkcount) {
525 uint8_t *buff;
526 masterconn *eptr = masterconnsingleton;
527
528 // syslog(LOG_NOTICE,"%"PRIu64",%"PRIu64,usedspace,totalspace);
529 if (eptr->mode==DATA || eptr->mode==HEADER) {
530 buff = masterconn_create_attached_packet(eptr,CSTOMA_SPACE,8+8+4+8+8+4);
531 if (buff) {
532 put64bit(&buff,usedspace);
533 put64bit(&buff,totalspace);
534 put32bit(&buff,chunkcount);
535 put64bit(&buff,tdusedspace);
536 put64bit(&buff,tdtotalspace);
537 put32bit(&buff,tdchunkcount);
538 }
539 }
540 }
541 */
542 /*
543 void masterconn_send_chunk_damaged(uint64_t chunkid) {
544 uint8_t *buff;
545 masterconn *eptr = masterconnsingleton;
546 if (eptr->mode==DATA || eptr->mode==HEADER) {
547 buff = masterconn_create_attached_packet(eptr,CSTOMA_CHUNK_DAMAGED,8);
548 if (buff) {
549 put64bit(&buff,chunkid);
550 }
551 }
552 }
553
554 void masterconn_send_chunk_lost(uint64_t chunkid) {
555 uint8_t *buff;
556 masterconn *eptr = masterconnsingleton;
557 if (eptr->mode==DATA || eptr->mode==HEADER) {
558 buff = masterconn_create_attached_packet(eptr,CSTOMA_CHUNK_LOST,8);
559 if (buff) {
560 put64bit(&buff,chunkid);
561 }
562 }
563 }
564
565 void masterconn_send_error_occurred() {
566 masterconn *eptr = masterconnsingleton;
567 if (eptr->mode==DATA || eptr->mode==HEADER) {
568 masterconn_create_attached_packet(eptr,CSTOMA_ERROR_OCCURRED,0);
569 }
570 }
571 */
572
masterconn_check_hdd_space()573 void masterconn_check_hdd_space() {
574 masterconn *eptr = masterconnsingleton;
575 uint8_t *buff;
576 if ((eptr->registerstate==REGISTERED || eptr->registerstate==INPROGRESS) && eptr->mode==DATA) {
577 if (hdd_spacechanged()) {
578 uint64_t usedspace,totalspace,tdusedspace,tdtotalspace;
579 uint32_t chunkcount,tdchunkcount;
580 buff = masterconn_create_attached_packet(eptr,CSTOMA_SPACE,8+8+4+8+8+4);
581 hdd_get_space(&usedspace,&totalspace,&chunkcount,&tdusedspace,&tdtotalspace,&tdchunkcount);
582 put64bit(&buff,usedspace);
583 put64bit(&buff,totalspace);
584 put32bit(&buff,chunkcount);
585 put64bit(&buff,tdusedspace);
586 put64bit(&buff,tdtotalspace);
587 put32bit(&buff,tdchunkcount);
588 }
589 }
590 }
591
/* Forward pending disk health reports to the master (registered DATA-mode
 * connection only):
 *  - one empty CSTOMA_ERROR_OCCURRED packet per error counted by hdd_errorcounter(),
 *  - CSTOMA_CHUNK_DAMAGED with 8 bytes per damaged chunk,
 *  - CSTOMA_CHUNK_LOST with 8 bytes per lost chunk (count limited via LOSTCHUNKLIMIT),
 *  - CSTOMA_CHUNK_NEW with 12 bytes per new chunk (count limited via NEWCHUNKLIMIT).
 * Each hdd_get_*_count() call is paired with its hdd_get_*_data() call (the
 * original "lock"/"unlock" remarks), so the data call is made even when the
 * count is 0 - keep this order. */
void masterconn_check_hdd_reports() {
	masterconn *eptr = masterconnsingleton;
	uint32_t errorcounter;
	uint32_t chunkcounter;
	uint8_t *buff;
	if (eptr->registerstate==REGISTERED && eptr->mode==DATA) {
		errorcounter = hdd_errorcounter();
		while (errorcounter) {
			masterconn_create_attached_packet(eptr,CSTOMA_ERROR_OCCURRED,0);
			errorcounter--;
		}
		chunkcounter = hdd_get_damaged_chunk_count(); // lock
		if (chunkcounter) {
			buff = masterconn_create_attached_packet(eptr,CSTOMA_CHUNK_DAMAGED,8*chunkcounter);
			hdd_get_damaged_chunk_data(buff); // unlock
		} else {
			hdd_get_damaged_chunk_data(NULL); // unlock without data
		}
		chunkcounter = hdd_get_lost_chunk_count(LOSTCHUNKLIMIT); // lock
		if (chunkcounter) {
			buff = masterconn_create_attached_packet(eptr,CSTOMA_CHUNK_LOST,8*chunkcounter);
			hdd_get_lost_chunk_data(buff,LOSTCHUNKLIMIT); // unlock
		} else {
			hdd_get_lost_chunk_data(NULL,0); // unlock without data
		}
		chunkcounter = hdd_get_new_chunk_count(NEWCHUNKLIMIT); // lock
		if (chunkcounter) {
			buff = masterconn_create_attached_packet(eptr,CSTOMA_CHUNK_NEW,12*chunkcounter);
			hdd_get_new_chunk_data(buff,NEWCHUNKLIMIT); // unlock
		} else {
			hdd_get_new_chunk_data(NULL,0); // unlock without data
		}
	}
}
626
masterconn_reportload(void)627 void masterconn_reportload(void) {
628 masterconn *eptr = masterconnsingleton;
629 uint32_t load;
630 uint8_t *buff;
631 if (eptr->mode==DATA && eptr->masterversion>=VERSION2INT(1,6,28) && eptr->registerstate==REGISTERED) {
632 load = job_getload();
633 buff = masterconn_create_attached_packet(eptr,CSTOMA_CURRENT_LOAD,4);
634 put32bit(&buff,load);
635 }
636 }
637
masterconn_jobfinished(uint8_t status,void * packet)638 void masterconn_jobfinished(uint8_t status,void *packet) {
639 uint8_t *ptr;
640 masterconn *eptr = masterconnsingleton;
641 if (eptr && eptr->conncnt==((out_packetstruct*)packet)->conncnt && eptr->mode==DATA) {
642 ptr = masterconn_get_packet_data(packet);
643 ptr[8]=status;
644 masterconn_attach_packet(eptr,packet);
645 } else {
646 masterconn_delete_packet(packet);
647 }
648 }
649
masterconn_chunkopfinished(uint8_t status,void * packet)650 void masterconn_chunkopfinished(uint8_t status,void *packet) {
651 uint8_t *ptr;
652 masterconn *eptr = masterconnsingleton;
653 if (eptr && eptr->conncnt==((out_packetstruct*)packet)->conncnt && eptr->mode==DATA) {
654 ptr = masterconn_get_packet_data(packet);
655 ptr[32]=status;
656 masterconn_attach_packet(eptr,packet);
657 } else {
658 masterconn_delete_packet(packet);
659 }
660 }
661
masterconn_replicationfinished(uint8_t status,void * packet)662 void masterconn_replicationfinished(uint8_t status,void *packet) {
663 uint8_t *ptr;
664 masterconn *eptr = masterconnsingleton;
665 // syslog(LOG_NOTICE,"job replication status: %"PRIu8,status);
666 if (eptr && eptr->conncnt==((out_packetstruct*)packet)->conncnt && eptr->mode==DATA) {
667 ptr = masterconn_get_packet_data(packet);
668 ptr[12]=status;
669 masterconn_attach_packet(eptr,packet);
670 } else {
671 masterconn_delete_packet(packet);
672 }
673 }
674
/* MATOCS_CREATE (chunkid:64 version:32): schedule job_create().  The reply
 * CSTOMA_CREATE (chunkid:64 status:8) is finished by masterconn_jobfinished.
 * A wrong payload size kills the connection. */
void masterconn_create(masterconn *eptr,const uint8_t *data,uint32_t length) {
	uint64_t cid;
	uint32_t ver;
	uint8_t *wp;
	void *reply;

	if (length==8+4) {
		cid = get64bit(&data);
		ver = get32bit(&data);
		reply = masterconn_create_detached_packet(eptr,CSTOMA_CREATE,8+1);
		wp = masterconn_get_packet_data(reply);
		put64bit(&wp,cid);
		job_create(masterconn_jobfinished,reply,cid,ver);
		return;
	}
	syslog(LOG_NOTICE,"MATOCS_CREATE - wrong size (%"PRIu32"/12)",length);
	eptr->mode = KILL;
}
693
/* MATOCS_DELETE (chunkid:64 version:32): schedule job_delete().  The reply
 * CSTOMA_DELETE (chunkid:64 status:8) is finished by masterconn_jobfinished.
 * A wrong payload size kills the connection. */
void masterconn_delete(masterconn *eptr,const uint8_t *data,uint32_t length) {
	uint64_t cid;
	uint32_t ver;
	uint8_t *wp;
	void *reply;

	if (length==8+4) {
		cid = get64bit(&data);
		ver = get32bit(&data);
		reply = masterconn_create_detached_packet(eptr,CSTOMA_DELETE,8+1);
		wp = masterconn_get_packet_data(reply);
		put64bit(&wp,cid);
		job_delete(masterconn_jobfinished,reply,cid,ver);
		return;
	}
	syslog(LOG_NOTICE,"MATOCS_DELETE - wrong size (%"PRIu32"/12)",length);
	eptr->mode = KILL;
}
712
/* MATOCS_SET_VERSION (chunkid:64 newversion:32 version:32): schedule
 * job_version().  The reply CSTOMA_SET_VERSION (chunkid:64 status:8) is
 * finished by masterconn_jobfinished.  A wrong payload size kills the
 * connection. */
void masterconn_setversion(masterconn *eptr,const uint8_t *data,uint32_t length) {
	uint64_t cid;
	uint32_t curver;
	uint32_t nextver;
	uint8_t *wp;
	void *reply;

	if (length==8+4+4) {
		cid = get64bit(&data);
		nextver = get32bit(&data);
		curver = get32bit(&data);
		reply = masterconn_create_detached_packet(eptr,CSTOMA_SET_VERSION,8+1);
		wp = masterconn_get_packet_data(reply);
		put64bit(&wp,cid);
		job_version(masterconn_jobfinished,reply,cid,curver,nextver);
		return;
	}
	syslog(LOG_NOTICE,"MATOCS_SET_VERSION - wrong size (%"PRIu32"/16)",length);
	eptr->mode = KILL;
}
733
/* MATOCS_DUPLICATE (copychunkid:64 copyversion:32 chunkid:64 version:32):
 * schedule job_duplicate() with newversion == version.  The reply
 * CSTOMA_DUPLICATE (copychunkid:64 status:8) is finished by
 * masterconn_jobfinished.  A wrong payload size kills the connection. */
void masterconn_duplicate(masterconn *eptr,const uint8_t *data,uint32_t length) {
	uint64_t srccid,dstcid;
	uint32_t srcver,dstver;
	uint8_t *wp;
	void *reply;

	if (length==8+4+8+4) {
		dstcid = get64bit(&data);
		dstver = get32bit(&data);
		srccid = get64bit(&data);
		srcver = get32bit(&data);
		reply = masterconn_create_detached_packet(eptr,CSTOMA_DUPLICATE,8+1);
		wp = masterconn_get_packet_data(reply);
		put64bit(&wp,dstcid);
		job_duplicate(masterconn_jobfinished,reply,srccid,srcver,srcver,dstcid,dstver);
		return;
	}
	syslog(LOG_NOTICE,"MATOCS_DUPLICATE - wrong size (%"PRIu32"/24)",length);
	eptr->mode = KILL;
}
756
/* MATOCS_TRUNCATE (chunkid:64 length:32 newversion:32 version:32): schedule
 * job_truncate().  The reply CSTOMA_TRUNCATE (chunkid:64 status:8) is
 * finished by masterconn_jobfinished.  A wrong payload size kills the
 * connection. */
void masterconn_truncate(masterconn *eptr,const uint8_t *data,uint32_t length) {
	uint64_t cid;
	uint32_t newleng;
	uint32_t curver;
	uint32_t nextver;
	uint8_t *wp;
	void *reply;

	if (length==8+4+4+4) {
		cid = get64bit(&data);
		newleng = get32bit(&data);
		nextver = get32bit(&data);
		curver = get32bit(&data);
		reply = masterconn_create_detached_packet(eptr,CSTOMA_TRUNCATE,8+1);
		wp = masterconn_get_packet_data(reply);
		put64bit(&wp,cid);
		job_truncate(masterconn_jobfinished,reply,cid,curver,nextver,newleng);
		return;
	}
	syslog(LOG_NOTICE,"MATOCS_TRUNCATE - wrong size (%"PRIu32"/20)",length);
	eptr->mode = KILL;
}
779
/* MATOCS_DUPTRUNC (copychunkid:64 copyversion:32 chunkid:64 version:32
 * length:32): schedule job_duptrunc() with newversion == version.  The reply
 * CSTOMA_DUPTRUNC (copychunkid:64 status:8) is finished by
 * masterconn_jobfinished.  A wrong payload size kills the connection. */
void masterconn_duptrunc(masterconn *eptr,const uint8_t *data,uint32_t length) {
	uint64_t srccid,dstcid;
	uint32_t srcver,dstver;
	uint32_t newleng;
	uint8_t *wp;
	void *reply;

	if (length==8+4+8+4+4) {
		dstcid = get64bit(&data);
		dstver = get32bit(&data);
		srccid = get64bit(&data);
		srcver = get32bit(&data);
		newleng = get32bit(&data);
		reply = masterconn_create_detached_packet(eptr,CSTOMA_DUPTRUNC,8+1);
		wp = masterconn_get_packet_data(reply);
		put64bit(&wp,dstcid);
		job_duptrunc(masterconn_jobfinished,reply,srccid,srcver,srcver,dstcid,dstver,newleng);
		return;
	}
	syslog(LOG_NOTICE,"MATOCS_DUPTRUNC - wrong size (%"PRIu32"/28)",length);
	eptr->mode = KILL;
}
804
/* MATOCS_CHUNKOP (chunkid:64 version:32 newversion:32 copychunkid:64
 * copyversion:32 length:32): schedule job_chunkop().  The reply
 * CSTOMA_CHUNKOP echoes all request fields followed by status:8, which
 * masterconn_chunkopfinished fills in.  A wrong payload size kills the
 * connection. */
void masterconn_chunkop(masterconn *eptr,const uint8_t *data,uint32_t length) {
	uint64_t cid,copycid;
	uint32_t curver,nextver,copyver;
	uint32_t newleng;
	uint8_t *wp;
	void *reply;

	if (length!=8+4+8+4+4+4) {
		syslog(LOG_NOTICE,"MATOCS_CHUNKOP - wrong size (%"PRIu32"/32)",length);
		eptr->mode = KILL;
		return;
	}
	cid = get64bit(&data);
	curver = get32bit(&data);
	nextver = get32bit(&data);
	copycid = get64bit(&data);
	copyver = get32bit(&data);
	newleng = get32bit(&data);

	reply = masterconn_create_detached_packet(eptr,CSTOMA_CHUNKOP,8+4+4+8+4+4+1);
	wp = masterconn_get_packet_data(reply);
	put64bit(&wp,cid);
	put32bit(&wp,curver);
	put32bit(&wp,nextver);
	put64bit(&wp,copycid);
	put32bit(&wp,copyver);
	put32bit(&wp,newleng);
	job_chunkop(masterconn_chunkopfinished,reply,cid,curver,nextver,copycid,copyver,newleng);
}
835
/* MATOCS_REPLICATE.  Two payload forms:
 *  - simple (18 bytes): chunkid:64 version:32 ip:32 port:16;
 *  - multi-source (12 + n*18 bytes, n in 1..100): chunkid:64 version:32
 *    followed by n 18-byte source records handed to job_replicate().
 * The reply CSTOMA_REPLICATE (chunkid:64 version:32 status:8) is finished by
 * masterconn_replicationfinished.  Any other size kills the connection. */
void masterconn_replicate(masterconn *eptr,const uint8_t *data,uint32_t length) {
	uint64_t cid;
	uint32_t ver;
	uint32_t srcip;
	uint16_t srcport;
	uint8_t *wp;
	void *reply;
	const int simple = (length==8+4+4+2);

	if (!simple && !(length>=12+18 && length<=12+18*100 && (length-12)%18==0)) {
		syslog(LOG_NOTICE,"MATOCS_REPLICATE - wrong size (%"PRIu32"/18|12+n*18[n:1..100])",length);
		eptr->mode = KILL;
		return;
	}
	cid = get64bit(&data);
	ver = get32bit(&data);
	reply = masterconn_create_detached_packet(eptr,CSTOMA_REPLICATE,8+4+1);
	wp = masterconn_get_packet_data(reply);
	put64bit(&wp,cid);
	put32bit(&wp,ver);
	if (simple) {
		srcip = get32bit(&data);
		srcport = get16bit(&data);
		job_replicate_simple(masterconn_replicationfinished,reply,cid,ver,srcip,srcport);
	} else {
		job_replicate(masterconn_replicationfinished,reply,cid,ver,(length-12)/18,data);
	}
}
864
/* Completion callback for idle (query) jobs.
 * If the connection is up (DATA mode) and the job is still valid, queue the
 * reply matching ij->op:
 *  - CSTOAN_CHUNK_BLOCKS:        chunkid:64 version:32 blocks:16 status:8
 *  - CSTOAN_CHUNK_CHECKSUM:      chunkid:64 version:32 then checksum:32 on
 *                                success or status:8 on error
 *  - CSTOAN_CHUNK_CHECKSUM_TAB:  chunkid:64 version:32 then 4096-byte table
 *                                on success or status:8 on error
 * A valid entry is then unlinked from the idlejobs list.  The entry is
 * always freed, whether it was valid or not. */
void masterconn_idlejob_finished(uint8_t status,void *ijp) {
	idlejob *ij = (idlejob*)ijp;
	masterconn *eptr = masterconnsingleton;
	uint8_t *ptr;

	if (eptr && eptr->mode == DATA && ij->valid) {
		switch (ij->op) {
			case IJ_GET_CHUNK_BLOCKS:
				ptr = masterconn_create_attached_packet(eptr,CSTOAN_CHUNK_BLOCKS,8+4+2+1);
				put64bit(&ptr,ij->chunkid);
				put32bit(&ptr,ij->version);
				memcpy(ptr,ij->buff,2); // 2-byte result produced by the job
				ptr+=2;
				put8bit(&ptr,status);
				break;
			case IJ_GET_CHUNK_CHECKSUM:
				if (status!=STATUS_OK) {
					ptr = masterconn_create_attached_packet(eptr,CSTOAN_CHUNK_CHECKSUM,8+4+1);
				} else {
					ptr = masterconn_create_attached_packet(eptr,CSTOAN_CHUNK_CHECKSUM,8+4+4);
				}
				put64bit(&ptr,ij->chunkid);
				put32bit(&ptr,ij->version);
				if (status!=STATUS_OK) {
					put8bit(&ptr,status);
				} else {
					memcpy(ptr,ij->buff,4); // 4-byte checksum produced by the job
				}
				break;
			case IJ_GET_CHUNK_CHECKSUM_TAB:
				if (status!=STATUS_OK) {
					ptr = masterconn_create_attached_packet(eptr,CSTOAN_CHUNK_CHECKSUM_TAB,8+4+1);
				} else {
					ptr = masterconn_create_attached_packet(eptr,CSTOAN_CHUNK_CHECKSUM_TAB,8+4+4096);
				}
				put64bit(&ptr,ij->chunkid);
				put32bit(&ptr,ij->version);
				if (status!=STATUS_OK) {
					put8bit(&ptr,status);
				} else {
					memcpy(ptr,ij->buff,4096); // 4096-byte checksum table produced by the job
				}
				break;
		}
	}
	// Only valid entries are still linked.  NOTE(review): invalid ones are
	// presumably unlinked by whoever cleared 'valid' (outside this chunk).
	if (ij->valid) {
		*(ij->prev) = ij->next;
		if (ij->next) {
			ij->next->prev = ij->prev;
		}
	}
	free(ij);
}
918
/* ANTOCS_GET_CHUNK_BLOCKS (chunkid:64 version:32): register an idle job
 * that writes a 2-byte result into ij->buff.  The reply is built by
 * masterconn_idlejob_finished.  A wrong payload size kills the connection. */
void masterconn_get_chunk_blocks(masterconn *eptr,const uint8_t *data,uint32_t length) {
	idlejob *ij;

	if (length!=8+4) {
		syslog(LOG_NOTICE,"ANTOCS_GET_CHUNK_BLOCKS - wrong size (%"PRIu32"/12)",length);
		eptr->mode = KILL;
		return;
	}
	ij = malloc(offsetof(idlejob,buff)+2);
	passert(ij);
	ij->op = IJ_GET_CHUNK_BLOCKS;
	ij->chunkid = get64bit(&data);
	ij->version = get32bit(&data);
	ij->valid = 1;
	// Push on the front of the idlejobs list.  The old head's 'prev' must
	// now point at our 'next'.  Without this, unlinking the old head later
	// would rewrite the list head, dropping this entry and leaving dangling
	// pointers behind.
	ij->next = idlejobs;
	ij->prev = &(idlejobs);
	if (idlejobs) {
		idlejobs->prev = &(ij->next);
	}
	idlejobs = ij;
	// NOTE(review): if the job layer can run masterconn_idlejob_finished
	// synchronously (it frees ij), this store would hit freed memory - confirm in bgjobs.
	ij->jobid = job_get_chunk_blocks(masterconn_idlejob_finished,ij,ij->chunkid,ij->version,ij->buff);
}
937
/* ANTOCS_GET_CHUNK_CHECKSUM (chunkid:64 version:32): register an idle job
 * that writes a 4-byte checksum into ij->buff.  The reply is built by
 * masterconn_idlejob_finished.  A wrong payload size kills the connection. */
void masterconn_get_chunk_checksum(masterconn *eptr,const uint8_t *data,uint32_t length) {
	idlejob *ij;

	if (length!=8+4) {
		syslog(LOG_NOTICE,"ANTOCS_GET_CHUNK_CHECKSUM - wrong size (%"PRIu32"/12)",length);
		eptr->mode = KILL;
		return;
	}
	ij = malloc(offsetof(idlejob,buff)+4);
	passert(ij);
	ij->op = IJ_GET_CHUNK_CHECKSUM;
	ij->chunkid = get64bit(&data);
	ij->version = get32bit(&data);
	ij->valid = 1;
	// Push on the front of the idlejobs list, fixing the old head's back
	// link so that it can later be unlinked without corrupting the list.
	ij->next = idlejobs;
	ij->prev = &(idlejobs);
	if (idlejobs) {
		idlejobs->prev = &(ij->next);
	}
	idlejobs = ij;
	// NOTE(review): if the job layer can run masterconn_idlejob_finished
	// synchronously (it frees ij), this store would hit freed memory - confirm in bgjobs.
	ij->jobid = job_get_chunk_checksum(masterconn_idlejob_finished,ij,ij->chunkid,ij->version,ij->buff);
}
956
/* ANTOCS_GET_CHUNK_CHECKSUM_TAB (chunkid:64 version:32): register an idle
 * job that writes a 4096-byte checksum table into ij->buff.  The reply is
 * built by masterconn_idlejob_finished.  A wrong payload size kills the
 * connection. */
void masterconn_get_chunk_checksum_tab(masterconn *eptr,const uint8_t *data,uint32_t length) {
	idlejob *ij;

	if (length!=8+4) {
		syslog(LOG_NOTICE,"ANTOCS_GET_CHUNK_CHECKSUM_TAB - wrong size (%"PRIu32"/12)",length);
		eptr->mode = KILL;
		return;
	}
	ij = malloc(offsetof(idlejob,buff)+4096);
	passert(ij);
	ij->op = IJ_GET_CHUNK_CHECKSUM_TAB;
	ij->chunkid = get64bit(&data);
	ij->version = get32bit(&data);
	ij->valid = 1;
	// Push on the front of the idlejobs list, fixing the old head's back
	// link so that it can later be unlinked without corrupting the list.
	ij->next = idlejobs;
	ij->prev = &(idlejobs);
	if (idlejobs) {
		idlejobs->prev = &(ij->next);
	}
	idlejobs = ij;
	// NOTE(review): if the job layer can run masterconn_idlejob_finished
	// synchronously (it frees ij), this store would hit freed memory - confirm in bgjobs.
	ij->jobid = job_get_chunk_checksum_tab(masterconn_idlejob_finished,ij,ij->chunkid,ij->version,ij->buff);
}
975
976
/* Dispatch one received packet to its handler.  ANTOAN_UNKNOWN_COMMAND and
 * ANTOAN_BAD_COMMAND_SIZE are accepted and ignored.  Any other unknown type
 * is logged and the connection is killed. */
void masterconn_gotpacket(masterconn *eptr,uint32_t type,const uint8_t *data,uint32_t length) {
	switch (type) {
		// registration / keep-alive
		case MATOCS_MASTER_ACK:
			eptr->masteraddrvalid = 1;
			eptr->new_register_mode = 3;
			masterconn_master_ack(eptr,data,length);
			break;
		case ANTOAN_NOP:
			eptr->masteraddrvalid = 1;
			// a NOP while still UNREGISTERED falls back to the one-shot chunk list upload
			if (eptr->registerstate==UNREGISTERED) {
				eptr->registerstate=REGISTERED;
				masterconn_sendchunksinfo(eptr);
			}
			break;
		case ANTOAN_UNKNOWN_COMMAND: // for future use
		case ANTOAN_BAD_COMMAND_SIZE: // for future use
			break;

		// chunk operations requested by the master
		case MATOCS_CREATE:
			masterconn_create(eptr,data,length);
			break;
		case MATOCS_DELETE:
			masterconn_delete(eptr,data,length);
			break;
		case MATOCS_SET_VERSION:
			masterconn_setversion(eptr,data,length);
			break;
		case MATOCS_DUPLICATE:
			masterconn_duplicate(eptr,data,length);
			break;
		case MATOCS_TRUNCATE:
			masterconn_truncate(eptr,data,length);
			break;
		case MATOCS_DUPTRUNC:
			masterconn_duptrunc(eptr,data,length);
			break;
		case MATOCS_CHUNKOP:
			masterconn_chunkop(eptr,data,length);
			break;
		case MATOCS_REPLICATE:
			masterconn_replicate(eptr,data,length);
			break;

		// idle-job queries
		case ANTOCS_GET_CHUNK_BLOCKS:
			masterconn_get_chunk_blocks(eptr,data,length);
			break;
		case ANTOCS_GET_CHUNK_CHECKSUM:
			masterconn_get_chunk_checksum(eptr,data,length);
			break;
		case ANTOCS_GET_CHUNK_CHECKSUM_TAB:
			masterconn_get_chunk_checksum_tab(eptr,data,length);
			break;

		default:
			syslog(LOG_NOTICE,"got unknown message (type:%"PRIu32")",type);
			eptr->mode = KILL;
			break;
	}
}
1033
1034
/*
 * Finish setting up a freshly connected master socket: call tcpnodelay() on
 * it, switch to DATA mode, reset both timers and the input/output packet
 * queues, bump the connection counter and start registration by sending the
 * register packet.
 */
void masterconn_connected(masterconn *eptr) {
	double t = monotonic_seconds();

	tcpnodelay(eptr->sock);
	eptr->mode = DATA;
	eptr->lastread = eptr->lastwrite = t;

	// input side: next thing to read is an 8-byte packet header
	eptr->input_packet = NULL;
	eptr->input_startptr = eptr->input_hdr;
	eptr->input_bytesleft = 8;
	eptr->input_end = 0;
	eptr->inputhead = NULL;
	eptr->inputtail = &(eptr->inputhead);

	// output side: empty queue
	eptr->outputhead = NULL;
	eptr->outputtail = &(eptr->outputhead);

	eptr->conncnt++;
	eptr->masterversion = 0;
	eptr->registerstate = UNREGISTERED;

	masterconn_sendregister(eptr);
}
1058
/*
 * Start a non-blocking connection attempt to the master.
 *
 * When eptr->masteraddrvalid is 0, BIND_HOST and MASTER_HOST/MASTER_PORT are
 * resolved again. A master address in 127.0.0.0/8 is rejected. Otherwise the
 * previously stored bind/master address is reused. masteraddrvalid is
 * cleared in any case once the address is known.
 *
 * Returns 0 when the attempt has been started (mode CONNECTING) or completed
 * immediately (masterconn_connected() called). Returns -1 on any error; a
 * socket that was already created is then closed and eptr->sock set to -1.
 */
int masterconn_initconnect(masterconn *eptr) {
	int status;

	if (eptr->masteraddrvalid==0) {
		uint32_t bindip,masterip;
		uint16_t masterport;

		if (tcpresolve(BindHost,NULL,&bindip,NULL,1)<0) {
			bindip = 0;
		}
		eptr->bindip = bindip;
		if (tcpresolve(MasterHost,MasterPort,&masterip,&masterport,0)<0) {
			mfs_arg_syslog(LOG_WARNING,"master connection module: can't resolve master host/port (%s:%s)",MasterHost,MasterPort);
			return -1;
		}
		if ((masterip&0xFF000000)==0x7F000000) {
			mfs_arg_syslog(LOG_WARNING,"master connection module: localhost (%u.%u.%u.%u) can't be used for connecting with master (use ip address of network controller)",(masterip>>24)&0xFF,(masterip>>16)&0xFF,(masterip>>8)&0xFF,masterip&0xFF);
			return -1;
		}
		eptr->masterip = masterip;
		eptr->masterport = masterport;
	}
	eptr->masteraddrvalid = 0;

	eptr->sock = tcpsocket();
	if (eptr->sock<0) {
		mfs_errlog(LOG_WARNING,"master connection module: create socket error");
		return -1;
	}
	if (tcpnonblock(eptr->sock)<0) {
		mfs_errlog(LOG_WARNING,"master connection module: set nonblock error");
		goto close_and_fail;
	}
	if (eptr->bindip>0 && tcpnumbind(eptr->sock,eptr->bindip,0)<0) {
		mfs_errlog(LOG_WARNING,"master connection module: can't bind socket to given ip");
		goto close_and_fail;
	}
	status = tcpnumconnect(eptr->sock,eptr->masterip,eptr->masterport);
	if (status<0) {
		mfs_errlog(LOG_WARNING,"master connection module: connect failed");
		goto close_and_fail;
	}
	if (status==0) {
		syslog(LOG_NOTICE,"connected to Master immediately");
		masterconn_connected(eptr);
	} else {
		eptr->mode = CONNECTING;
		eptr->conntime = monotonic_seconds();
		syslog(LOG_NOTICE,"connecting ...");
	}
	return 0;

close_and_fail:
	// error already logged by the failing step (errno still intact)
	tcpclose(eptr->sock);
	eptr->sock = -1;
	return -1;
}
1119
/*
 * Abort a connection attempt that did not complete in time.
 *
 * Closes the socket and returns to FREE mode. It also clears masteraddrvalid,
 * so the next masterconn_initconnect() resolves the master address again.
 */
void masterconn_connecttimeout(masterconn *eptr) {
	syslog(LOG_WARNING,"connection timed out");
	tcpclose(eptr->sock);
	eptr->sock = -1;
	eptr->masteraddrvalid = 0;
	eptr->mode = FREE;
}
1127
/*
 * Check the outcome of a pending non-blocking connect. It is called from
 * masterconn_serve() once poll reports the socket writable, hung up or in
 * error.
 *
 * On success the link is set up via masterconn_connected(). On failure the
 * socket is closed, the connection returns to FREE mode and the master
 * address is invalidated.
 */
void masterconn_connecttest(masterconn *eptr) {
	if (tcpgetstatus(eptr->sock)==0) {
		syslog(LOG_NOTICE,"connected to Master");
		masterconn_connected(eptr);
		return;
	}
	mfs_errlog_silent(LOG_WARNING,"connection failed, error");
	tcpclose(eptr->sock);
	eptr->sock = -1;
	eptr->mode = FREE;
	eptr->masteraddrvalid = 0;
}
1143
/*
 * Read everything currently available from the master socket and split it
 * into packets. Each packet is an 8-byte header (type:32, length:32) followed
 * by `length` bytes. Complete packets are appended to
 * eptr->inputhead/inputtail for masterconn_parse().
 *
 * Raw socket data goes through a static scratch buffer. It starts at 64 KiB
 * and doubles whenever a single read fills it completely. Calling with
 * eptr==NULL releases that buffer.
 *
 * eptr->input_end is set on EOF, on a read error (ERRNO_ERROR), or when a
 * header announces more than MaxPacketSize bytes.
 */
void masterconn_read(masterconn *eptr,double now) {
	static uint8_t *readbuff = NULL;
	static uint32_t readbuffsize = 0;
	uint32_t filled,consumed,chunk;
	uint32_t type,leng;
	const uint8_t *hdrptr;
	int32_t rd;
	uint8_t eof,error;

	if (eptr==NULL) {
		free(readbuff);
		readbuff = NULL;
		readbuffsize = 0;
		return;
	}

	if (readbuffsize==0) {
		readbuffsize = 65536;
		readbuff = malloc(readbuffsize);
		passert(readbuff);
	}

	// drain the socket: keep reading only while each read fills the buffer
	filled = 0;
	eof = 0;
	error = 0;
	for (;;) {
		rd = read(eptr->sock,readbuff+filled,readbuffsize-filled);
		if (rd==0) {
			eof = 1;
			break;
		}
		if (rd<0) {
			if (ERRNO_ERROR) {
				error = 1;
			}
			break;
		}
		stats_bytesin += rd;
		filled += rd;
		if (filled<readbuffsize) {
			break;
		}
		readbuffsize *= 2;
		readbuff = realloc(readbuff,readbuffsize);
		passert(readbuff);
	}

	if (filled>0) {
		eptr->lastread = now;
	}

	// feed the received bytes into the header/body state machine
	consumed = 0;
	while (consumed<filled) {
		chunk = filled-consumed;
		if (chunk>eptr->input_bytesleft) {
			chunk = eptr->input_bytesleft;
		}
		memcpy(eptr->input_startptr,readbuff+consumed,chunk);
		consumed += chunk;
		eptr->input_startptr += chunk;
		eptr->input_bytesleft -= chunk;

		if (eptr->input_bytesleft>0) {
			break; // current header/body incomplete - wait for more data
		}

		if (eptr->input_packet==NULL) {
			// header complete: decode it and allocate room for the body
			hdrptr = eptr->input_hdr;
			type = get32bit(&hdrptr);
			leng = get32bit(&hdrptr);

			if (leng>MaxPacketSize) {
				syslog(LOG_WARNING,"Master packet too long (%"PRIu32"/%u)",leng,MaxPacketSize);
				eptr->input_end = 1;
				return;
			}

			eptr->input_packet = malloc(offsetof(in_packetstruct,data)+leng);
			passert(eptr->input_packet);
			eptr->input_packet->next = NULL;
			eptr->input_packet->type = type;
			eptr->input_packet->leng = leng;

			eptr->input_startptr = eptr->input_packet->data;
			eptr->input_bytesleft = leng;
			if (leng>0) {
				continue; // body still to come
			}
		}

		// packet complete (or has an empty body): queue it, expect a header next
		if (eptr->input_packet!=NULL) {
			*(eptr->inputtail) = eptr->input_packet;
			eptr->inputtail = &(eptr->input_packet->next);
			eptr->input_packet = NULL;
			eptr->input_bytesleft = 8;
			eptr->input_startptr = eptr->input_hdr;
		}
	}

	if (eof) {
		syslog(LOG_NOTICE,"connection was reset by Master");
		eptr->input_end = 1;
	} else if (error) {
		mfs_errlog_silent(LOG_NOTICE,"read from Master error");
		eptr->input_end = 1;
	}
}
1257
/*
 * Handle queued input packets in arrival order while the connection stays in
 * DATA mode. Time spent per call is bounded to 10000 us of
 * monotonic_useconds() time. The clock is re-read only while more packets
 * remain queued.
 *
 * Once the queue is empty and the reader has signalled end of input
 * (input_end), the connection is marked for KILL.
 */
void masterconn_parse(masterconn *eptr) {
	in_packetstruct *pkt;
	uint64_t deadline;
	uint64_t nowus;

	nowus = monotonic_useconds();
	deadline = nowus+10000;
	while (eptr->mode==DATA && (pkt = eptr->inputhead)!=NULL && deadline>nowus) {
		masterconn_gotpacket(eptr,pkt->type,pkt->data,pkt->leng);
		eptr->inputhead = pkt->next;
		free(pkt);
		if (eptr->inputhead!=NULL) {
			nowus = monotonic_useconds();
		} else {
			eptr->inputtail = &(eptr->inputhead);
		}
	}
	if (eptr->mode==DATA && eptr->input_end && eptr->inputhead==NULL) {
		eptr->mode = KILL;
	}
}
1279
/*
 * Flush as much of the output queue (eptr->outputhead) to the master as the
 * socket accepts.
 *
 * Fully written packets are freed and unlinked. A partially written packet
 * keeps its remaining bytes in startptr/bytesleft. With HAVE_WRITEV up to 100
 * queued packets are handed to one writev() call.
 *
 * A failed write for which ERRNO_ERROR holds is logged and marks the
 * connection for KILL. Any other failed write just stops this flush.
 */
void masterconn_write(masterconn *eptr,double now) {
	out_packetstruct *pkt;
	int32_t written;
#ifdef HAVE_WRITEV
	struct iovec iov[100];
	uint32_t niov;
	uint32_t queued;
	uint32_t remaining;

	for (;;) {
		// gather up to 100 pending packets into one vector
		queued = 0;
		niov = 0;
		for (pkt=eptr->outputhead ; pkt!=NULL && niov<100 ; pkt=pkt->next) {
			iov[niov].iov_base = pkt->startptr;
			iov[niov].iov_len = pkt->bytesleft;
			queued += pkt->bytesleft;
			niov++;
		}
		if (niov==0) {
			return;
		}
		written = writev(eptr->sock,iov,niov);
		if (written<0) {
			if (ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"write to Master error");
				eptr->mode = KILL;
			}
			return;
		}
		if (written>0) {
			eptr->lastwrite = now;
		}
		stats_bytesout += written;
		// release fully sent packets, advance the first partially sent one
		remaining = written;
		while (remaining>0 && eptr->outputhead!=NULL) {
			pkt = eptr->outputhead;
			if (pkt->bytesleft>remaining) {
				pkt->startptr += remaining;
				pkt->bytesleft -= remaining;
				remaining = 0;
			} else {
				remaining -= pkt->bytesleft;
				eptr->outputhead = pkt->next;
				if (eptr->outputhead==NULL) {
					eptr->outputtail = &(eptr->outputhead);
				}
				free(pkt);
			}
		}
		if ((uint32_t)written<queued) {
			return; // socket did not take everything - try again later
		}
	}
#else
	for (pkt=eptr->outputhead ; pkt!=NULL ; pkt=eptr->outputhead) {
		written = write(eptr->sock,pkt->startptr,pkt->bytesleft);
		if (written<0) {
			if (ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"write to Master error");
				eptr->mode = KILL;
			}
			return;
		}
		if (written>0) {
			eptr->lastwrite = now;
		}
		stats_bytesout += written;
		pkt->startptr += written;
		pkt->bytesleft -= written;
		if (pkt->bytesleft>0) {
			return; // partial write - the rest goes out next time
		}
		eptr->outputhead = pkt->next;
		if (eptr->outputhead==NULL) {
			eptr->outputtail = &(eptr->outputhead);
		}
		free(pkt);
	}
#endif
}
1362
1363
masterconn_desc(struct pollfd * pdesc,uint32_t * ndesc)1364 void masterconn_desc(struct pollfd *pdesc,uint32_t *ndesc) {
1365 uint32_t pos = *ndesc;
1366 masterconn *eptr = masterconnsingleton;
1367
1368 eptr->pdescpos = -1;
1369 if (eptr->mode==FREE || eptr->sock<0) {
1370 return;
1371 }
1372 pdesc[pos].events = 0;
1373 if (eptr->mode==DATA && eptr->input_end==0) {
1374 pdesc[pos].events |= POLLIN;
1375 }
1376 if ((eptr->mode==DATA && eptr->outputhead!=NULL) || eptr->mode==CONNECTING) {
1377 pdesc[pos].events |= POLLOUT;
1378 }
1379 if (pdesc[pos].events!=0) {
1380 pdesc[pos].fd = eptr->sock;
1381 eptr->pdescpos = pos;
1382 pos++;
1383 }
1384 *ndesc = pos;
1385 }
1386
masterconn_disconnection_check(void)1387 void masterconn_disconnection_check(void) {
1388 masterconn *eptr = masterconnsingleton;
1389 in_packetstruct *ipptr,*ipaptr;
1390 out_packetstruct *opptr,*opaptr;
1391 idlejob *ij,*nij;
1392
1393 if (eptr->mode == KILL || eptr->mode == CLOSE) {
1394 // masterconn_beforeclose(eptr);
1395 tcpclose(eptr->sock);
1396 if (eptr->input_packet) {
1397 free(eptr->input_packet);
1398 }
1399 ipptr = eptr->inputhead;
1400 while (ipptr) {
1401 ipaptr = ipptr;
1402 ipptr = ipptr->next;
1403 free(ipaptr);
1404 }
1405 opptr = eptr->outputhead;
1406 while (opptr) {
1407 opaptr = opptr;
1408 opptr = opptr->next;
1409 free(opaptr);
1410 }
1411 for (ij=idlejobs ; ij ; ij=nij) {
1412 nij = ij->next;
1413 job_pool_disable_job(ij->jobid);
1414 ij->next = NULL;
1415 ij->prev = NULL;
1416 ij->valid = 0;
1417 }
1418 idlejobs = NULL;
1419 if (eptr->registerstate == INPROGRESS) {
1420 hdd_get_chunks_end();
1421 }
1422 if (eptr->registerstate == UNREGISTERED && eptr->mode==KILL) {
1423 if (eptr->new_register_mode>0) {
1424 eptr->new_register_mode--;
1425 } else {
1426 eptr->new_register_mode=3;
1427 }
1428 if (eptr->new_register_mode==0) {
1429 eptr->masteraddrvalid = 1; // switch to old register mode and try again using same address
1430 } else {
1431 eptr->masteraddrvalid = 0; // in new register mode always resolve master address
1432 }
1433 }
1434 eptr->mode = FREE;
1435 }
1436 }
1437
masterconn_serve(struct pollfd * pdesc)1438 void masterconn_serve(struct pollfd *pdesc) {
1439 double now;
1440 masterconn *eptr = masterconnsingleton;
1441
1442 now = monotonic_seconds();
1443
1444 if (eptr->mode==CONNECTING) {
1445 if (eptr->sock>=0 && eptr->pdescpos>=0 && (pdesc[eptr->pdescpos].revents & (POLLOUT | POLLHUP | POLLERR))) { // FD_ISSET(eptr->sock,wset)) {
1446 masterconn_connecttest(eptr);
1447 } else if (eptr->conntime+1.0 < now) {
1448 masterconn_connecttimeout(eptr);
1449 }
1450 } else {
1451 if (eptr->pdescpos>=0) {
1452 if ((pdesc[eptr->pdescpos].revents & (POLLERR|POLLIN))==POLLIN && eptr->mode==DATA) {
1453 masterconn_read(eptr,now);
1454 }
1455 if (pdesc[eptr->pdescpos].revents & (POLLERR|POLLHUP)) {
1456 syslog(LOG_NOTICE,"masterconn: connection closed by master");
1457 eptr->input_end = 1;
1458 }
1459 masterconn_parse(eptr);
1460 }
1461 if (eptr->mode==DATA && eptr->lastwrite+(eptr->timeout/3.0)<now && eptr->outputhead==NULL) {
1462 masterconn_create_attached_packet(eptr,ANTOAN_NOP,0);
1463 }
1464 if (eptr->pdescpos>=0) {
1465 if ((((pdesc[eptr->pdescpos].events & POLLOUT)==0 && (eptr->outputhead)) || (pdesc[eptr->pdescpos].revents & POLLOUT)) && eptr->mode==DATA) {
1466 masterconn_write(eptr,now);
1467 }
1468 }
1469 if (eptr->mode==DATA && eptr->lastread+eptr->timeout<now) {
1470 syslog(LOG_NOTICE,"masterconn: connection timed out");
1471 eptr->mode = KILL;
1472 }
1473 }
1474 if (wantexittime>0.0 && wantexittime+FORCE_DISCONNECTION_TO < now) {
1475 eptr->mode = KILL;
1476 }
1477 masterconn_disconnection_check();
1478 }
1479
masterconn_reconnect(void)1480 void masterconn_reconnect(void) {
1481 masterconn *eptr = masterconnsingleton;
1482 if (eptr->mode==FREE && wantexittime==0.0) {
1483 masterconn_initconnect(eptr);
1484 }
1485 }
1486
masterconn_term(void)1487 void masterconn_term(void) {
1488 masterconn *eptr = masterconnsingleton;
1489 in_packetstruct *ipptr,*ipaptr;
1490 out_packetstruct *opptr,*opaptr;
1491
1492 if (eptr->mode!=FREE) {
1493 tcpclose(eptr->sock);
1494 if (eptr->mode!=CONNECTING) {
1495 if (eptr->input_packet) {
1496 free(eptr->input_packet);
1497 }
1498 ipptr = eptr->inputhead;
1499 while (ipptr) {
1500 ipaptr = ipptr;
1501 ipptr = ipptr->next;
1502 free(ipaptr);
1503 }
1504 opptr = eptr->outputhead;
1505 while (opptr) {
1506 opaptr = opptr;
1507 opptr = opptr->next;
1508 free(opaptr);
1509 }
1510 }
1511 }
1512
1513 masterconn_read(NULL,0.0); // free internal read buffer
1514
1515 free(eptr);
1516
1517 free(MasterHost);
1518 free(MasterPort);
1519 free(BindHost);
1520 masterconnsingleton = NULL;
1521 }
1522
masterconn_reload(void)1523 void masterconn_reload(void) {
1524 masterconn *eptr = masterconnsingleton;
1525 uint32_t ReconnectionDelay;
1526
1527 free(MasterHost);
1528 free(MasterPort);
1529 free(BindHost);
1530
1531 MasterHost = cfg_getstr("MASTER_HOST",DEFAULT_MASTERNAME);
1532 MasterPort = cfg_getstr("MASTER_PORT",DEFAULT_MASTER_CS_PORT);
1533 BindHost = cfg_getstr("BIND_HOST","*");
1534
1535 eptr->masteraddrvalid = 0;
1536 if (eptr->mode!=FREE) {
1537 eptr->mode = KILL;
1538 }
1539 Timeout = cfg_getuint32("MASTER_TIMEOUT",0);
1540
1541 ReconnectionDelay = cfg_getuint32("MASTER_RECONNECTION_DELAY",5);
1542
1543 if (Timeout>65535) {
1544 Timeout=65535;
1545 }
1546 if (Timeout<10 && Timeout>0) {
1547 Timeout=10;
1548 }
1549
1550 main_time_change(reconnect_hook,ReconnectionDelay,0);
1551 }
1552
masterconn_wantexit(void)1553 void masterconn_wantexit(void) {
1554 wantexittime = monotonic_seconds();
1555 }
1556
masterconn_init(void)1557 int masterconn_init(void) {
1558 uint32_t ReconnectionDelay;
1559 masterconn *eptr;
1560
1561 masterconn_initcsid();
1562
1563 manager_time_hook = NULL;
1564
1565 ReconnectionDelay = cfg_getuint32("MASTER_RECONNECTION_DELAY",5);
1566 MasterHost = cfg_getstr("MASTER_HOST",DEFAULT_MASTERNAME);
1567 MasterPort = cfg_getstr("MASTER_PORT",DEFAULT_MASTER_CS_PORT);
1568 BindHost = cfg_getstr("BIND_HOST","*");
1569 Timeout = cfg_getuint32("MASTER_TIMEOUT",0);
1570 // BackLogsNumber = cfg_getuint32("BACK_LOGS",50);
1571
1572 if (Timeout>65535) {
1573 Timeout=65535;
1574 }
1575 if (Timeout<10 && Timeout>0) {
1576 Timeout=10;
1577 }
1578 eptr = masterconnsingleton = malloc(sizeof(masterconn));
1579 passert(eptr);
1580
1581 eptr->masteraddrvalid = 0;
1582 eptr->new_register_mode = 3;
1583 eptr->masterversion = 0;
1584 eptr->mode = FREE;
1585 eptr->pdescpos = -1;
1586 eptr->conncnt = 0;
1587 if (Timeout>0) {
1588 eptr->timeout = Timeout;
1589 } else {
1590 eptr->timeout = 10;
1591 }
1592 // logfd = NULL;
1593
1594 wantexittime = 0.0;
1595
1596 if (masterconn_initconnect(eptr)<0) {
1597 return -1;
1598 }
1599
1600 main_time_register(REPORT_LOAD_FREQ,0,masterconn_reportload);
1601 main_time_register(REPORT_SPACE_FREQ,0,masterconn_check_hdd_space);
1602 main_eachloop_register(masterconn_check_hdd_reports);
1603 reconnect_hook = main_time_register(ReconnectionDelay,rndu32_ranged(ReconnectionDelay),masterconn_reconnect);
1604 main_destruct_register(masterconn_term);
1605 main_poll_register(masterconn_desc,masterconn_serve);
1606 main_wantexit_register(masterconn_wantexit);
1607 // main_canexit_register(masterconn_canexit);
1608 main_reload_register(masterconn_reload);
1609 return 0;
1610 }
1611