1 /*
2 * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <stddef.h>
26 #include <time.h>
27 #include <sys/types.h>
28 #include <sys/time.h>
29 #include <sys/uio.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <syslog.h>
36 #include <errno.h>
37 #include <inttypes.h>
38 #include <netinet/in.h>
39 #ifdef HAVE_WRITEV
40 #include <sys/uio.h>
41 #endif
42
43 #include "MFSCommunication.h"
44
45 #include "datapack.h"
46 #include "matomlserv.h"
47 #include "changelog.h"
48 #include "metadata.h"
49 #include "crc.h"
50 #include "cfg.h"
51 #include "main.h"
52 #include "sizestr.h"
53 #include "sockets.h"
54 #include "slogger.h"
55 #include "massert.h"
56 #include "clocks.h"
57 #include "mfsalloc.h"
58
59 #define MaxPacketSize ANTOMA_MAXPACKETSIZE
60
61
62 #define OLD_CHANGES_GROUP_COUNT 10000
63
64 // matomlserventry.mode
65 enum{KILL,DATA,CLOSE};
66
67 // matomlserventry.clienttype
68 enum{UNKNOWN,METALOGGER};
69
70 // matomlserventry.logstate
71 enum{NONE,DELAYED,SYNC};
72
73 typedef struct out_packetstruct {
74 struct out_packetstruct *next;
75 uint8_t *startptr;
76 uint32_t bytesleft;
77 uint8_t data[1];
78 } out_packetstruct;
79
80 typedef struct in_packetstruct {
81 struct in_packetstruct *next;
82 uint32_t type,leng;
83 uint8_t data[1];
84 } in_packetstruct;
85
86 typedef struct matomlserventry {
87 uint8_t mode;
88 int sock;
89 int32_t pdescpos;
90 double lastread,lastwrite;
91 uint8_t input_hdr[8];
92 uint8_t *input_startptr;
93 uint32_t input_bytesleft;
94 uint8_t input_end;
95 in_packetstruct *input_packet;
96 in_packetstruct *inputhead,**inputtail;
97 out_packetstruct *outputhead,**outputtail;
98
99 uint16_t timeout;
100 uint64_t next_log_version;
101
102 char *servstrip; // human readable version of servip
103 uint32_t version;
104 uint32_t servip;
105 uint8_t clienttype;
106 uint8_t logstate;
107
108
109 int upload_meta_fd;
110 int upload_chain1_fd;
111 int upload_chain2_fd;
112
113 struct matomlserventry *next;
114 } matomlserventry;
115
116 static matomlserventry *matomlservhead=NULL;
117 static int lsock;
118 static int32_t lsockpdescpos;
119
120 /*
121 typedef struct old_changes_entry {
122 uint64_t version;
123 uint32_t length;
124 uint8_t *data;
125 } old_changes_entry;
126
127 typedef struct old_changes_block {
128 old_changes_entry old_changes_block [OLD_CHANGES_BLOCK_SIZE];
129 uint32_t entries;
130 uint32_t mintimestamp;
131 uint64_t minversion;
132 struct old_changes_block *next;
133 } old_changes_block;
134
135 static old_changes_block *old_changes_head=NULL;
136 static old_changes_block *old_changes_current=NULL;
137 */
138
139 // from config
140 static char *ListenHost;
141 static char *ListenPort;
142 static uint32_t listenip;
143 static uint16_t listenport;
144
145
146 static uint32_t BackMetaCopies;
147
148
149 // static uint16_t ChangelogSecondsToRemember;
150
151 /*
152 void matomlserv_old_changes_free_block(old_changes_block *oc) {
153 uint32_t i;
154 for (i=0 ; i<oc->entries ; i++) {
155 free(oc->old_changes_block[i].data);
156 }
157 free(oc);
158 }
159
160 void matomlserv_store_logstring(uint64_t version,uint8_t *logstr,uint32_t logstrsize) {\
161 old_changes_block *oc;
162 old_changes_entry *oce;
163 uint32_t ts;
164 if (ChangelogSecondsToRemember==0) {
165 while (old_changes_head) {
166 oc = old_changes_head->next;
167 matomlserv_old_changes_free_block(old_changes_head);
168 old_changes_head = oc;
169 }
170 return;
171 }
172 if (old_changes_current==NULL || old_changes_head==NULL || old_changes_current->entries>=OLD_CHANGES_BLOCK_SIZE) {
173 oc = malloc(sizeof(old_changes_block));
174 passert(oc);
175 ts = main_time();
176 oc->entries = 0;
177 oc->minversion = version;
178 oc->mintimestamp = ts;
179 oc->next = NULL;
180 if (old_changes_current==NULL || old_changes_head==NULL) {
181 old_changes_head = old_changes_current = oc;
182 } else {
183 old_changes_current->next = oc;
184 old_changes_current = oc;
185 }
186 while (old_changes_head && old_changes_head->next && old_changes_head->next->mintimestamp+ChangelogSecondsToRemember<ts) {
187 oc = old_changes_head->next;
188 matomlserv_old_changes_free_block(old_changes_head);
189 old_changes_head = oc;
190 }
191 }
192 oc = old_changes_current;
193 oce = oc->old_changes_block + oc->entries;
194 oce->version = version;
195 oce->length = logstrsize;
196 oce->data = malloc(logstrsize);
197 passert(oce->data);
198 memcpy(oce->data,logstr,logstrsize);
199 oc->entries++;
200 }
201 */
202
203
matomlserv_clientname(matomlserventry * eptr)204 static inline const char* matomlserv_clientname(matomlserventry *eptr) {
205 switch (eptr->clienttype) {
206 case METALOGGER:
207 switch (eptr->logstate) {
208 case DELAYED:
209 return "METALOGGER-DELAYED";
210 case SYNC:
211 return "METALOGGER-SYNC";
212 default:
213 return "METALOGGER";
214 }
215 }
216 return "UNKNOWN";
217 }
218
matomlserv_mloglist_size(void)219 uint32_t matomlserv_mloglist_size(void) {
220 matomlserventry *eptr;
221 uint32_t i;
222 i=0;
223 for (eptr = matomlservhead ; eptr ; eptr=eptr->next) {
224 if (eptr->mode!=KILL && eptr->mode!=CLOSE && eptr->clienttype==METALOGGER) {
225 i++;
226 }
227 }
228 return i*(4+4);
229 }
230
matomlserv_mloglist_data(uint8_t * ptr)231 void matomlserv_mloglist_data(uint8_t *ptr) {
232 matomlserventry *eptr;
233 for (eptr = matomlservhead ; eptr ; eptr=eptr->next) {
234 if (eptr->mode!=KILL && eptr->mode!=CLOSE && eptr->clienttype==METALOGGER) {
235 put32bit(&ptr,eptr->version);
236 put32bit(&ptr,eptr->servip);
237 }
238 }
239 }
240
241
matomlserv_status(void)242 void matomlserv_status(void) {
243 matomlserventry *eptr;
244 for (eptr = matomlservhead ; eptr ; eptr=eptr->next) {
245 if (eptr->mode==DATA) {
246 return;
247 }
248 }
249 syslog(LOG_WARNING,"no metaloggers connected !!!");
250 }
251
matomlserv_makestrip(uint32_t ip)252 char* matomlserv_makestrip(uint32_t ip) {
253 uint8_t *ptr,pt[4];
254 uint32_t l,i;
255 char *optr;
256 ptr = pt;
257 put32bit(&ptr,ip);
258 l=0;
259 for (i=0 ; i<4 ; i++) {
260 if (pt[i]>=100) {
261 l+=3;
262 } else if (pt[i]>=10) {
263 l+=2;
264 } else {
265 l+=1;
266 }
267 }
268 l+=4;
269 optr = malloc(l);
270 passert(optr);
271 snprintf(optr,l,"%"PRIu8".%"PRIu8".%"PRIu8".%"PRIu8,pt[0],pt[1],pt[2],pt[3]);
272 optr[l-1]=0;
273 return optr;
274 }
275
matomlserv_createpacket(matomlserventry * eptr,uint32_t type,uint32_t size)276 uint8_t* matomlserv_createpacket(matomlserventry *eptr,uint32_t type,uint32_t size) {
277 out_packetstruct *outpacket;
278 uint8_t *ptr;
279 uint32_t psize;
280
281 psize = size+8;
282 outpacket = malloc(offsetof(out_packetstruct,data)+psize);
283 #ifndef __clang_analyzer__
284 passert(outpacket);
285 // clang analyzer has problem with testing for (void*)(-1) which is needed for memory allocated by mmap
286 #endif
287 outpacket->bytesleft = psize;
288 ptr = outpacket->data;
289 put32bit(&ptr,type);
290 put32bit(&ptr,size);
291 outpacket->startptr = outpacket->data;
292 outpacket->next = NULL;
293 *(eptr->outputtail) = outpacket;
294 eptr->outputtail = &(outpacket->next);
295 return ptr;
296 }
297
matomlserv_send_old_change(void * veptr,uint64_t version,uint8_t * data,uint32_t length)298 void matomlserv_send_old_change(void *veptr,uint64_t version,uint8_t *data,uint32_t length) {
299 matomlserventry *eptr = (matomlserventry *)veptr;
300 uint8_t *pdata;
301
302 pdata = matomlserv_createpacket(eptr,MATOAN_METACHANGES_LOG,9+length);
303 put8bit(&pdata,0xFF);
304 put64bit(&pdata,version);
305 memcpy(pdata,data,length);
306 }
307
308 /*
309 void matomlserv_send_old_changes(matomlserventry *eptr,uint64_t version) {
310 uint64_t minver = changelog_get_minversion();
311 if (minver==0) {
312 // syslog(LOG_WARNING,"meta logger wants old changes, but storage is disabled");
313 return;
314 }
315 if (version<minver) {
316 syslog(LOG_WARNING,"meta logger wants changes since version: %"PRIu64", but minimal version in storage is: %"PRIu64,version,minver);
317 return;
318 }
319 changelog_get_old_changes(version,matomlserv_send_old_change,eptr);
320 }
321 */
322 /*
323 void matomlserv_send_old_changes(matomlserventry *eptr,uint64_t version) {
324 old_changes_block *oc;
325 old_changes_entry *oce;
326 uint8_t *data;
327 uint8_t start=0;
328 uint32_t i;
329 if (old_changes_head==NULL) {
330 // syslog(LOG_WARNING,"meta logger wants old changes, but storage is disabled");
331 return;
332 }
333 if (old_changes_head->minversion>version) {
334 syslog(LOG_WARNING,"meta logger wants changes since version: %"PRIu64", but minimal version in storage is: %"PRIu64,version,old_changes_head->minversion);
335 return;
336 }
337 for (oc=old_changes_head ; oc ; oc=oc->next) {
338 if (oc->minversion<=version && (oc->next==NULL || oc->next->minversion>version)) {
339 start=1;
340 }
341 if (start) {
342 for (i=0 ; i<oc->entries ; i++) {
343 oce = oc->old_changes_block + i;
344 if (version>=oce->version) {
345 data = matomlserv_createpacket(eptr,MATOAN_METACHANGES_LOG,9+oce->length);
346 put8bit(&data,0xFF);
347 put64bit(&data,oce->version);
348 memcpy(data,oce->data,oce->length);
349 }
350 }
351 }
352 }
353 }
354 */
355
356
matomlserv_get_version(matomlserventry * eptr,const uint8_t * data,uint32_t length)357 void matomlserv_get_version(matomlserventry *eptr,const uint8_t *data,uint32_t length) {
358 uint32_t msgid = 0;
359 uint8_t *ptr;
360 static const char vstring[] = VERSSTR;
361 if (length!=0 && length!=4) {
362 syslog(LOG_NOTICE,"ANTOAN_GET_VERSION - wrong size (%"PRIu32"/4|0)",length);
363 eptr->mode = KILL;
364 return;
365 }
366 if (length==4) {
367 msgid = get32bit(&data);
368 ptr = matomlserv_createpacket(eptr,ANTOAN_VERSION,4+4+strlen(vstring));
369 put32bit(&ptr,msgid);
370 } else {
371 ptr = matomlserv_createpacket(eptr,ANTOAN_VERSION,4+strlen(vstring));
372 }
373 put16bit(&ptr,VERSMAJ);
374 put8bit(&ptr,VERSMID);
375 put8bit(&ptr,VERSMIN);
376 memcpy(ptr,vstring,strlen(vstring));
377 }
378
matomlserv_get_config(matomlserventry * eptr,const uint8_t * data,uint32_t length)379 void matomlserv_get_config(matomlserventry *eptr,const uint8_t *data,uint32_t length) {
380 uint32_t msgid;
381 char name[256];
382 uint8_t nleng;
383 uint32_t vleng;
384 char *val;
385 uint8_t *ptr;
386
387 if (length<5) {
388 syslog(LOG_NOTICE,"ANTOAN_GET_CONFIG - wrong size (%"PRIu32")",length);
389 eptr->mode = KILL;
390 return;
391 }
392 msgid = get32bit(&data);
393 nleng = get8bit(&data);
394 if (length!=5U+(uint32_t)nleng) {
395 syslog(LOG_NOTICE,"ANTOAN_GET_CONFIG - wrong size (%"PRIu32":nleng=%"PRIu8")",length,nleng);
396 eptr->mode = KILL;
397 return;
398 }
399 memcpy(name,data,nleng);
400 name[nleng] = 0;
401 val = cfg_getstr(name,"");
402 vleng = strlen(val);
403 if (vleng>255) {
404 vleng=255;
405 }
406 ptr = matomlserv_createpacket(eptr,ANTOAN_CONFIG_VALUE,5+vleng);
407 put32bit(&ptr,msgid);
408 put8bit(&ptr,vleng);
409 memcpy(ptr,val,vleng);
410 }
411
matomlserv_register(matomlserventry * eptr,const uint8_t * data,uint32_t length)412 void matomlserv_register(matomlserventry *eptr,const uint8_t *data,uint32_t length) {
413 uint8_t rversion;
414 uint64_t req_minversion,chlog_minversion;
415 uint32_t n;
416
417 if (eptr->version>0) {
418 syslog(LOG_WARNING,"got register message from registered metalogger !!!");
419 eptr->mode = KILL;
420 return;
421 }
422 if (length<1) {
423 syslog(LOG_NOTICE,"ANTOMA_REGISTER - wrong size (%"PRIu32")",length);
424 eptr->mode = KILL;
425 return;
426 } else {
427 rversion = get8bit(&data);
428 if (rversion==1) {
429 eptr->clienttype = METALOGGER;
430 if (length!=7) {
431 syslog(LOG_NOTICE,"ANTOMA_REGISTER (logger 1) - wrong size (%"PRIu32"/7)",length);
432 eptr->mode = KILL;
433 return;
434 }
435 eptr->version = get32bit(&data);
436 eptr->timeout = get16bit(&data);
437 eptr->logstate = SYNC;
438 } else if (rversion==2) {
439 eptr->clienttype = METALOGGER;
440 if (length!=7+8) {
441 syslog(LOG_NOTICE,"ANTOMA_REGISTER (logger 2) - wrong size (%"PRIu32"/15)",length);
442 eptr->mode = KILL;
443 return;
444 }
445 eptr->version = get32bit(&data);
446 eptr->timeout = get16bit(&data);
447 req_minversion = get64bit(&data);
448 chlog_minversion = changelog_get_minversion();
449 if (chlog_minversion>0 && chlog_minversion<=req_minversion) {
450 n = changelog_get_old_changes(req_minversion,matomlserv_send_old_change,eptr,OLD_CHANGES_GROUP_COUNT);
451 if (n<OLD_CHANGES_GROUP_COUNT) {
452 eptr->logstate = SYNC;
453 } else {
454 eptr->next_log_version = req_minversion+n;
455 eptr->logstate = DELAYED;
456 }
457 } else {
458 eptr->logstate = SYNC; // desync
459 }
460 } else if (rversion==3 || rversion==4) { // just ignore PRO components
461 eptr->mode = KILL;
462 return;
463 } else {
464 syslog(LOG_NOTICE,"ANTOMA_REGISTER - wrong version (%"PRIu8"/1)",rversion);
465 eptr->mode = KILL;
466 return;
467 }
468 if (eptr->timeout<10) {
469 syslog(LOG_NOTICE,"ANTOMA_REGISTER communication timeout too small (%"PRIu16" seconds - should be at least 10 seconds)",eptr->timeout);
470 if (eptr->timeout<3) {
471 eptr->timeout=3;
472 }
473 // eptr->mode = KILL;
474 return;
475 }
476 }
477 }
478
479
matomlserv_download_start(matomlserventry * eptr,const uint8_t * data,uint32_t length)480 void matomlserv_download_start(matomlserventry *eptr,const uint8_t *data,uint32_t length) {
481 uint8_t filenum;
482 uint64_t size;
483 uint8_t *ptr;
484 if (length!=1) {
485 syslog(LOG_NOTICE,"ANTOMA_DOWNLOAD_START - wrong size (%"PRIu32"/1)",length);
486 eptr->mode = KILL;
487 return;
488 }
489 filenum = get8bit(&data);
490 if (filenum==1 || filenum==2) {
491 if (eptr->upload_meta_fd>=0) {
492 close(eptr->upload_meta_fd);
493 eptr->upload_meta_fd=-1;
494 }
495 if (eptr->upload_chain1_fd>=0) {
496 close(eptr->upload_chain1_fd);
497 eptr->upload_chain1_fd=-1;
498 }
499 if (eptr->upload_chain2_fd>=0) {
500 close(eptr->upload_chain2_fd);
501 eptr->upload_chain2_fd=-1;
502 }
503 }
504 if (filenum==1) {
505 eptr->upload_meta_fd = open("metadata.mfs.back",O_RDONLY);
506 eptr->upload_chain1_fd = open("changelog.0.mfs",O_RDONLY);
507 eptr->upload_chain2_fd = open("changelog.1.mfs",O_RDONLY);
508 } else if (filenum==2) {
509 eptr->upload_meta_fd = open("sessions.mfs",O_RDONLY);
510 } else if (filenum==11) {
511 if (eptr->upload_meta_fd>=0) {
512 close(eptr->upload_meta_fd);
513 }
514 eptr->upload_meta_fd = eptr->upload_chain1_fd;
515 eptr->upload_chain1_fd = -1;
516 } else if (filenum==12) {
517 if (eptr->upload_meta_fd>=0) {
518 close(eptr->upload_meta_fd);
519 }
520 eptr->upload_meta_fd = eptr->upload_chain2_fd;
521 eptr->upload_chain2_fd = -1;
522 } else {
523 eptr->mode = KILL;
524 return;
525 }
526 if (eptr->upload_meta_fd<0) {
527 if (filenum==11 || filenum==12) {
528 ptr = matomlserv_createpacket(eptr,MATOAN_DOWNLOAD_INFO,8);
529 put64bit(&ptr,0);
530 return;
531 } else {
532 ptr = matomlserv_createpacket(eptr,MATOAN_DOWNLOAD_INFO,1);
533 put8bit(&ptr,0xff); // error
534 return;
535 }
536 }
537 size = lseek(eptr->upload_meta_fd,0,SEEK_END);
538 ptr = matomlserv_createpacket(eptr,MATOAN_DOWNLOAD_INFO,8);
539 put64bit(&ptr,size); // ok
540 }
541
matomlserv_download_request(matomlserventry * eptr,const uint8_t * data,uint32_t length)542 void matomlserv_download_request(matomlserventry *eptr,const uint8_t *data,uint32_t length) {
543 uint8_t *ptr;
544 uint64_t offset;
545 uint32_t leng;
546 uint32_t crc;
547 ssize_t ret;
548
549 if (length!=12) {
550 syslog(LOG_NOTICE,"ANTOMA_DOWNLOAD_REQUEST - wrong size (%"PRIu32"/12)",length);
551 eptr->mode = KILL;
552 return;
553 }
554 if (eptr->upload_meta_fd<0) {
555 syslog(LOG_NOTICE,"ANTOMA_DOWNLOAD_REQUEST - file not opened");
556 eptr->mode = KILL;
557 return;
558 }
559 offset = get64bit(&data);
560 leng = get32bit(&data);
561 ptr = matomlserv_createpacket(eptr,MATOAN_DOWNLOAD_DATA,16+leng);
562 put64bit(&ptr,offset);
563 put32bit(&ptr,leng);
564 #ifdef HAVE_PREAD
565 ret = pread(eptr->upload_meta_fd,ptr+4,leng,offset);
566 #else /* HAVE_PWRITE */
567 lseek(eptr->upload_meta_fd,offset,SEEK_SET);
568 ret = read(eptr->upload_meta_fd,ptr+4,leng);
569 #endif /* HAVE_PWRITE */
570 if (ret!=(ssize_t)leng) {
571 mfs_errlog_silent(LOG_NOTICE,"error reading metafile");
572 eptr->mode = KILL;
573 return;
574 }
575 crc = mycrc32(0,ptr+4,leng);
576 put32bit(&ptr,crc);
577 }
578
matomlserv_download_end(matomlserventry * eptr,const uint8_t * data,uint32_t length)579 void matomlserv_download_end(matomlserventry *eptr,const uint8_t *data,uint32_t length) {
580 (void)data;
581 if (length!=0) {
582 syslog(LOG_NOTICE,"ANTOMA_DOWNLOAD_END - wrong size (%"PRIu32"/0)",length);
583 eptr->mode = KILL;
584 return;
585 }
586 if (eptr->upload_meta_fd>=0) {
587 close(eptr->upload_meta_fd);
588 eptr->upload_meta_fd=-1;
589 }
590 }
591
592
matomlserv_broadcast_logstring(uint64_t version,uint8_t * logstr,uint32_t logstrsize)593 void matomlserv_broadcast_logstring(uint64_t version,uint8_t *logstr,uint32_t logstrsize) {
594 matomlserventry *eptr;
595 uint8_t *data;
596
597 for (eptr = matomlservhead ; eptr ; eptr=eptr->next) {
598 if (eptr->version>0 && eptr->clienttype==METALOGGER && eptr->logstate==SYNC) {
599 data = matomlserv_createpacket(eptr,MATOAN_METACHANGES_LOG,9+logstrsize);
600 put8bit(&data,0xFF);
601 put64bit(&data,version);
602 memcpy(data,logstr,logstrsize);
603 }
604 }
605 }
606
matomlserv_broadcast_logrotate(void)607 void matomlserv_broadcast_logrotate(void) {
608 matomlserventry *eptr;
609 uint8_t *data;
610
611 for (eptr = matomlservhead ; eptr ; eptr=eptr->next) {
612 if (eptr->version>0 && eptr->clienttype==METALOGGER) {
613 data = matomlserv_createpacket(eptr,MATOAN_METACHANGES_LOG,1);
614 put8bit(&data,0x55);
615 }
616 }
617 }
618
matomlserv_beforeclose(matomlserventry * eptr)619 void matomlserv_beforeclose(matomlserventry *eptr) {
620 if (eptr->upload_meta_fd>=0) {
621 close(eptr->upload_meta_fd);
622 eptr->upload_meta_fd=-1;
623 }
624 if (eptr->upload_chain1_fd>=0) {
625 close(eptr->upload_chain1_fd);
626 eptr->upload_chain1_fd=-1;
627 }
628 if (eptr->upload_chain2_fd>=0) {
629 close(eptr->upload_chain2_fd);
630 eptr->upload_chain2_fd=-1;
631 }
632 }
633
matomlserv_gotpacket(matomlserventry * eptr,uint32_t type,const uint8_t * data,uint32_t length)634 void matomlserv_gotpacket(matomlserventry *eptr,uint32_t type,const uint8_t *data,uint32_t length) {
635 switch (type) {
636 case ANTOAN_NOP:
637 break;
638 case ANTOAN_UNKNOWN_COMMAND: // for future use
639 break;
640 case ANTOAN_BAD_COMMAND_SIZE: // for future use
641 break;
642 case ANTOAN_GET_VERSION:
643 matomlserv_get_version(eptr,data,length);
644 break;
645 case ANTOAN_GET_CONFIG:
646 matomlserv_get_config(eptr,data,length);
647 break;
648 case ANTOMA_REGISTER:
649 matomlserv_register(eptr,data,length);
650 break;
651 case ANTOMA_DOWNLOAD_START:
652 matomlserv_download_start(eptr,data,length);
653 break;
654 case ANTOMA_DOWNLOAD_REQUEST:
655 matomlserv_download_request(eptr,data,length);
656 break;
657 case ANTOMA_DOWNLOAD_END:
658 matomlserv_download_end(eptr,data,length);
659 break;
660 default:
661 syslog(LOG_NOTICE,"master control module: got unknown message (type:%"PRIu32")",type);
662 eptr->mode = KILL;
663 }
664 }
665
matomlserv_read(matomlserventry * eptr,double now)666 void matomlserv_read(matomlserventry *eptr,double now) {
667 int32_t i;
668 uint32_t type,leng;
669 const uint8_t *ptr;
670 uint32_t rbleng,rbpos;
671 uint8_t err,hup;
672 static uint8_t *readbuff = NULL;
673 static uint32_t readbuffsize = 0;
674
675 if (eptr == NULL) {
676 if (readbuff != NULL) {
677 free(readbuff);
678 }
679 readbuff = NULL;
680 readbuffsize = 0;
681 return;
682 }
683
684 if (readbuffsize==0) {
685 readbuffsize = 65536;
686 readbuff = malloc(readbuffsize);
687 passert(readbuff);
688 }
689
690 rbleng = 0;
691 err = 0;
692 hup = 0;
693 for (;;) {
694 i = read(eptr->sock,readbuff+rbleng,readbuffsize-rbleng);
695 if (i==0) {
696 hup = 1;
697 break;
698 } else if (i<0) {
699 if (ERRNO_ERROR) {
700 err = 1;
701 }
702 break;
703 } else {
704 rbleng += i;
705 if (rbleng==readbuffsize) {
706 readbuffsize*=2;
707 readbuff = mfsrealloc(readbuff,readbuffsize);
708 passert(readbuff);
709 } else {
710 break;
711 }
712 }
713 }
714
715 if (rbleng>0) {
716 eptr->lastread = now;
717 }
718
719 rbpos = 0;
720 while (rbpos<rbleng) {
721 if ((rbleng-rbpos)>=eptr->input_bytesleft) {
722 memcpy(eptr->input_startptr,readbuff+rbpos,eptr->input_bytesleft);
723 i = eptr->input_bytesleft;
724 } else {
725 memcpy(eptr->input_startptr,readbuff+rbpos,rbleng-rbpos);
726 i = rbleng-rbpos;
727 }
728 rbpos += i;
729 eptr->input_startptr+=i;
730 eptr->input_bytesleft-=i;
731
732 if (eptr->input_bytesleft>0) {
733 break;
734 }
735
736 if (eptr->input_packet == NULL) {
737 ptr = eptr->input_hdr;
738 type = get32bit(&ptr);
739 leng = get32bit(&ptr);
740
741 if (leng>MaxPacketSize) {
742 syslog(LOG_WARNING,"ML(%s) packet too long (%"PRIu32"/%u) ; command:%"PRIu32,eptr->servstrip,leng,MaxPacketSize,type);
743 eptr->input_end = 1;
744 return;
745 }
746
747 eptr->input_packet = malloc(offsetof(in_packetstruct,data)+leng);
748 passert(eptr->input_packet);
749 eptr->input_packet->next = NULL;
750 eptr->input_packet->type = type;
751 eptr->input_packet->leng = leng;
752
753 eptr->input_startptr = eptr->input_packet->data;
754 eptr->input_bytesleft = leng;
755 }
756
757 if (eptr->input_bytesleft>0) {
758 continue;
759 }
760
761 if (eptr->input_packet != NULL) {
762 *(eptr->inputtail) = eptr->input_packet;
763 eptr->inputtail = &(eptr->input_packet->next);
764 eptr->input_packet = NULL;
765 eptr->input_bytesleft = 8;
766 eptr->input_startptr = eptr->input_hdr;
767 }
768 }
769
770 if (hup) {
771 syslog(LOG_NOTICE,"connection with %s(%s) has been closed by peer",matomlserv_clientname(eptr),eptr->servstrip);
772 eptr->input_end = 1;
773 } else if (err) {
774 mfs_arg_errlog_silent(LOG_NOTICE,"read from ML(%s) error",eptr->servstrip);
775 eptr->input_end = 1;
776 }
777 }
778
matomlserv_parse(matomlserventry * eptr)779 void matomlserv_parse(matomlserventry *eptr) {
780 in_packetstruct *ipack;
781 uint64_t starttime;
782 uint64_t currtime;
783
784 starttime = monotonic_useconds();
785 currtime = starttime;
786 while (eptr->mode==DATA && (ipack = eptr->inputhead)!=NULL && starttime+10000>currtime) {
787 matomlserv_gotpacket(eptr,ipack->type,ipack->data,ipack->leng);
788 eptr->inputhead = ipack->next;
789 free(ipack);
790 if (eptr->inputhead==NULL) {
791 eptr->inputtail = &(eptr->inputhead);
792 } else {
793 currtime = monotonic_useconds();
794 }
795 }
796 if (eptr->mode==DATA && eptr->inputhead==NULL && eptr->input_end) {
797 eptr->mode = KILL;
798 }
799 }
800
matomlserv_write(matomlserventry * eptr,double now)801 void matomlserv_write(matomlserventry *eptr,double now) {
802 out_packetstruct *opack;
803 int32_t i;
804 #ifdef HAVE_WRITEV
805 struct iovec iovtab[100];
806 uint32_t iovdata;
807 uint32_t leng;
808 uint32_t left;
809
810 for (;;) {
811 leng = 0;
812 for (iovdata=0,opack=eptr->outputhead ; iovdata<100 && opack!=NULL ; iovdata++,opack=opack->next) {
813 iovtab[iovdata].iov_base = opack->startptr;
814 iovtab[iovdata].iov_len = opack->bytesleft;
815 leng += opack->bytesleft;
816 }
817 if (iovdata==0) {
818 return;
819 }
820 i = writev(eptr->sock,iovtab,iovdata);
821 if (i<0) {
822 if (ERRNO_ERROR) {
823 mfs_arg_errlog_silent(LOG_NOTICE,"write to ML(%s) error",eptr->servstrip);
824 eptr->mode = KILL;
825 }
826 return;
827 }
828 if (i>0) {
829 eptr->lastwrite = now;
830 }
831 left = i;
832 while (left>0 && eptr->outputhead!=NULL) {
833 opack = eptr->outputhead;
834 if (opack->bytesleft>left) {
835 opack->startptr+=left;
836 opack->bytesleft-=left;
837 left = 0;
838 } else {
839 left -= opack->bytesleft;
840 eptr->outputhead = opack->next;
841 if (eptr->outputhead==NULL) {
842 eptr->outputtail = &(eptr->outputhead);
843 }
844 free(opack);
845 }
846 }
847 if ((uint32_t)i < leng) {
848 return;
849 }
850 }
851 #else
852 for (;;) {
853 opack = eptr->outputhead;
854 if (opack==NULL) {
855 return;
856 }
857 i=write(eptr->sock,opack->startptr,opack->bytesleft);
858 if (i<0) {
859 if (ERRNO_ERROR) {
860 mfs_arg_errlog_silent(LOG_NOTICE,"write to ML(%s) error",eptr->servstrip);
861 eptr->mode = KILL;
862 }
863 return;
864 }
865 if (i>0) {
866 eptr->lastwrite = now;
867 }
868 opack->startptr+=i;
869 opack->bytesleft-=i;
870 if (opack->bytesleft>0) {
871 return;
872 }
873 eptr->outputhead = opack->next;
874 if (eptr->outputhead==NULL) {
875 eptr->outputtail = &(eptr->outputhead);
876 }
877 free(opack);
878 }
879 #endif
880 }
881
matomlserv_desc(struct pollfd * pdesc,uint32_t * ndesc)882 void matomlserv_desc(struct pollfd *pdesc,uint32_t *ndesc) {
883 uint32_t pos = *ndesc;
884 int events;
885 matomlserventry *eptr;
886 pdesc[pos].fd = lsock;
887 pdesc[pos].events = POLLIN;
888 lsockpdescpos = pos;
889 pos++;
890 for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
891 events = 0;
892 if (eptr->input_end==0) {
893 events |= POLLIN;
894 }
895 if (eptr->outputhead!=NULL) {
896 events |= POLLOUT;
897 }
898 if (events) {
899 pdesc[pos].fd = eptr->sock;
900 pdesc[pos].events = POLLIN;
901 eptr->pdescpos = pos;
902 pos++;
903 } else {
904 eptr->pdescpos = -1;
905 }
906 }
907 *ndesc = pos;
908 }
909
matomlserv_disconnection_loop(void)910 void matomlserv_disconnection_loop(void) {
911 matomlserventry *eptr,**kptr;
912 in_packetstruct *ipptr,*ipaptr;
913 out_packetstruct *opptr,*opaptr;
914
915 kptr = &matomlservhead;
916 while ((eptr=*kptr)) {
917 if (eptr->mode==KILL || eptr->mode==CLOSE) {
918 matomlserv_beforeclose(eptr);
919 if (eptr->mode==KILL) {
920 tcpclose(eptr->sock);
921 } else {
922 close(eptr->sock);
923 }
924 if (eptr->input_packet) {
925 free(eptr->input_packet);
926 }
927 ipptr = eptr->inputhead;
928 while (ipptr) {
929 ipaptr = ipptr;
930 ipptr = ipptr->next;
931 free(ipaptr);
932 }
933 opptr = eptr->outputhead;
934 while (opptr) {
935 opaptr = opptr;
936 opptr = opptr->next;
937 free(opaptr);
938 }
939 if (eptr->servstrip) {
940 free(eptr->servstrip);
941 }
942 *kptr = eptr->next;
943 free(eptr);
944 } else {
945 kptr = &(eptr->next);
946 }
947 }
948 }
949
matomlserv_serve(struct pollfd * pdesc)950 void matomlserv_serve(struct pollfd *pdesc) {
951 double now;
952 matomlserventry *eptr;
953 int ns;
954 static double lastaction = 0.0;
955 double timeoutadd;
956 uint32_t n;
957
958 now = monotonic_seconds();
959 // timeout fix
960 if (lastaction>0.0) {
961 timeoutadd = now-lastaction;
962 if (timeoutadd>1.0) {
963 for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
964 eptr->lastread += timeoutadd;
965 }
966 }
967 }
968 lastaction = now;
969
970 if (lsockpdescpos>=0 && (pdesc[lsockpdescpos].revents & POLLIN)) {
971 ns=tcpaccept(lsock);
972 if (ns<0) {
973 mfs_errlog_silent(LOG_NOTICE,"Master<->ML socket: accept error");
974 } else {
975 tcpnonblock(ns);
976 tcpnodelay(ns);
977 eptr = malloc(sizeof(matomlserventry));
978 passert(eptr);
979 eptr->next = matomlservhead;
980 matomlservhead = eptr;
981 eptr->sock = ns;
982 eptr->pdescpos = -1;
983 eptr->mode = DATA;
984 eptr->lastread = now;
985 eptr->lastwrite = now;
986 eptr->input_bytesleft = 8;
987 eptr->input_startptr = eptr->input_hdr;
988 eptr->input_end = 0;
989 eptr->input_packet = NULL;
990 eptr->inputhead = NULL;
991 eptr->inputtail = &(eptr->inputhead);
992 eptr->outputhead = NULL;
993 eptr->outputtail = &(eptr->outputhead);
994 eptr->timeout = 10;
995
996 tcpgetpeer(eptr->sock,&(eptr->servip),NULL);
997 eptr->servstrip = matomlserv_makestrip(eptr->servip);
998 eptr->version = 0;
999 eptr->clienttype = UNKNOWN;
1000 eptr->logstate = NONE;
1001 eptr->upload_meta_fd = -1;
1002 eptr->upload_chain1_fd = -1;
1003 eptr->upload_chain2_fd = -1;
1004 }
1005 }
1006
1007 // read
1008 for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
1009 if (eptr->pdescpos>=0) {
1010 if ((pdesc[eptr->pdescpos].revents & (POLLERR|POLLIN))==POLLIN && eptr->mode!=KILL) {
1011 matomlserv_read(eptr,now);
1012 }
1013 if (pdesc[eptr->pdescpos].revents & (POLLERR|POLLHUP)) {
1014 eptr->input_end = 1;
1015 }
1016 }
1017 matomlserv_parse(eptr);
1018 }
1019
1020 // write
1021 for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
1022 if ((eptr->lastwrite+(eptr->timeout/3.0))<now && eptr->outputhead==NULL && eptr->clienttype!=UNKNOWN) {
1023 matomlserv_createpacket(eptr,ANTOAN_NOP,0);
1024 }
1025 if (eptr->pdescpos>=0) {
1026 if ((((pdesc[eptr->pdescpos].events & POLLOUT)==0 && (eptr->outputhead)) || (pdesc[eptr->pdescpos].revents & POLLOUT)) && eptr->mode!=KILL) {
1027 matomlserv_write(eptr,now);
1028 }
1029 }
1030 if ((eptr->lastread+eptr->timeout)<now) {
1031 eptr->mode = KILL;
1032 }
1033 if (eptr->logstate==DELAYED && eptr->outputhead==NULL) {
1034 n = changelog_get_old_changes(eptr->next_log_version,matomlserv_send_old_change,eptr,OLD_CHANGES_GROUP_COUNT);
1035 if (n<OLD_CHANGES_GROUP_COUNT) {
1036 eptr->logstate=SYNC;
1037 } else {
1038 eptr->next_log_version += n;
1039 }
1040 }
1041 }
1042 matomlserv_disconnection_loop();
1043 }
1044
matomlserv_keep_alive(void)1045 void matomlserv_keep_alive(void) {
1046 double now;
1047 matomlserventry *eptr;
1048
1049 now = monotonic_seconds();
1050 // read
1051 for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
1052 if (eptr->mode == DATA && eptr->input_end==0) {
1053 matomlserv_read(eptr,now);
1054 }
1055 }
1056 // write
1057 for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
1058 if ((eptr->lastwrite+(eptr->timeout/3.0))<now && eptr->outputhead==NULL && eptr->clienttype!=UNKNOWN) {
1059 matomlserv_createpacket(eptr,ANTOAN_NOP,0);
1060 }
1061 if (eptr->mode == DATA && eptr->outputhead) {
1062 matomlserv_write(eptr,now);
1063 }
1064 }
1065 }
1066
matomlserv_close_lsock(void)1067 void matomlserv_close_lsock(void) { // after fork
1068 if (lsock>=0) {
1069 close(lsock);
1070 }
1071 }
1072
matomlserv_term(void)1073 void matomlserv_term(void) {
1074 matomlserventry *eptr,*eaptr;
1075 in_packetstruct *ipptr,*ipaptr;
1076 out_packetstruct *opptr,*opaptr;
1077 syslog(LOG_INFO,"master control module: closing %s:%s",ListenHost,ListenPort);
1078 tcpclose(lsock);
1079
1080 eptr = matomlservhead;
1081 while (eptr) {
1082 if (eptr->input_packet) {
1083 free(eptr->input_packet);
1084 }
1085 ipptr = eptr->inputhead;
1086 while (ipptr) {
1087 ipaptr = ipptr;
1088 ipptr = ipptr->next;
1089 free(ipaptr);
1090 }
1091 opptr = eptr->outputhead;
1092 while (opptr) {
1093 opaptr = opptr;
1094 opptr = opptr->next;
1095 free(opaptr);
1096 }
1097 eaptr = eptr;
1098 eptr = eptr->next;
1099 free(eaptr);
1100 }
1101 matomlservhead=NULL;
1102
1103 matomlserv_read(NULL,0.0); // free internal read buffer
1104
1105 free(ListenHost);
1106 free(ListenPort);
1107 }
1108
matomlserv_no_more_pending_jobs(void)1109 int matomlserv_no_more_pending_jobs(void) {
1110 matomlserventry *eptr;
1111 for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
1112 if (eptr->outputhead!=NULL) {
1113 return 0;
1114 }
1115 }
1116 return 1;
1117 }
1118
matomlserv_disconnect_all(void)1119 void matomlserv_disconnect_all(void) {
1120 matomlserventry *eptr;
1121 for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
1122 eptr->mode = KILL;
1123 }
1124 matomlserv_disconnection_loop();
1125 }
1126
matomlserv_getport(void)1127 uint16_t matomlserv_getport(void) {
1128 return listenport;
1129 }
1130
matomlserv_reload_common(void)1131 void matomlserv_reload_common(void) {
1132 BackMetaCopies = cfg_getuint32("BACK_META_KEEP_PREVIOUS",1);
1133 if (BackMetaCopies>99) {
1134 BackMetaCopies=99;
1135 }
1136 }
1137
matomlserv_reload(void)1138 void matomlserv_reload(void) {
1139 char *oldListenHost,*oldListenPort;
1140 uint32_t oldlistenip;
1141 uint16_t oldlistenport;
1142 int newlsock;
1143
1144 matomlserv_reload_common();
1145
1146 oldListenHost = ListenHost;
1147 oldListenPort = ListenPort;
1148 oldlistenip = listenip;
1149 oldlistenport = listenport;
1150
1151 ListenHost = cfg_getstr("MATOML_LISTEN_HOST","*");
1152 ListenPort = cfg_getstr("MATOML_LISTEN_PORT",DEFAULT_MASTER_CONTROL_PORT);
1153 if (strcmp(oldListenHost,ListenHost)==0 && strcmp(oldListenPort,ListenPort)==0) {
1154 free(oldListenHost);
1155 free(oldListenPort);
1156 mfs_arg_syslog(LOG_NOTICE,"master <-> metaloggers module: socket address hasn't changed (%s:%s)",ListenHost,ListenPort);
1157 return;
1158 }
1159
1160 newlsock = tcpsocket();
1161 if (newlsock<0) {
1162 mfs_errlog(LOG_WARNING,"master <-> metaloggers module: socket address has changed, but can't create new socket");
1163 free(ListenHost);
1164 free(ListenPort);
1165 ListenHost = oldListenHost;
1166 ListenPort = oldListenPort;
1167 return;
1168 }
1169 tcpnonblock(newlsock);
1170 tcpnodelay(newlsock);
1171 tcpreuseaddr(newlsock);
1172 if (tcpresolve(ListenHost,ListenPort,&listenip,&listenport,1)<0) {
1173 mfs_arg_errlog(LOG_ERR,"master <-> metaloggers module: socket address has changed, but can't be resolved (%s:%s)",ListenHost,ListenPort);
1174 free(ListenHost);
1175 free(ListenPort);
1176 ListenHost = oldListenHost;
1177 ListenPort = oldListenPort;
1178 listenip = oldlistenip;
1179 listenport = oldlistenport;
1180 tcpclose(newlsock);
1181 return;
1182 }
1183 if (tcpnumlisten(newlsock,listenip,listenport,100)<0) {
1184 mfs_arg_errlog(LOG_ERR,"master <-> metaloggers module: socket address has changed, but can't listen on socket (%s:%s)",ListenHost,ListenPort);
1185 free(ListenHost);
1186 free(ListenPort);
1187 ListenHost = oldListenHost;
1188 ListenPort = oldListenPort;
1189 listenip = oldlistenip;
1190 listenport = oldlistenport;
1191 tcpclose(newlsock);
1192 return;
1193 }
1194 if (tcpsetacceptfilter(newlsock)<0 && errno!=ENOTSUP) {
1195 mfs_errlog_silent(LOG_NOTICE,"master <-> metaloggers module: can't set accept filter");
1196 }
1197 mfs_arg_syslog(LOG_NOTICE,"master <-> metaloggers module: socket address has changed, now listen on %s:%s",ListenHost,ListenPort);
1198 free(oldListenHost);
1199 free(oldListenPort);
1200 tcpclose(lsock);
1201 lsock = newlsock;
1202
1203 // ChangelogSecondsToRemember = cfg_getuint16("MATOAN_LOG_PRESERVE_SECONDS",600);
1204 // if (ChangelogSecondsToRemember>3600) {
1205 // syslog(LOG_WARNING,"Number of seconds of change logs to be preserved in master is too big (%"PRIu16") - decreasing to 3600 seconds",ChangelogSecondsToRemember);
1206 // ChangelogSecondsToRemember=3600;
1207 // }
1208 }
1209
matomlserv_init(void)1210 int matomlserv_init(void) {
1211 matomlserv_reload_common();
1212
1213 ListenHost = cfg_getstr("MATOML_LISTEN_HOST","*");
1214 ListenPort = cfg_getstr("MATOML_LISTEN_PORT",DEFAULT_MASTER_CONTROL_PORT);
1215
1216 lsock = tcpsocket();
1217 if (lsock<0) {
1218 mfs_errlog(LOG_ERR,"master <-> metaloggers module: can't create socket");
1219 return -1;
1220 }
1221 tcpnonblock(lsock);
1222 tcpnodelay(lsock);
1223 tcpreuseaddr(lsock);
1224 if (tcpresolve(ListenHost,ListenPort,&listenip,&listenport,1)<0) {
1225 mfs_arg_errlog(LOG_ERR,"master <-> metaloggers module: can't resolve %s:%s",ListenHost,ListenPort);
1226 return -1;
1227 }
1228 if (tcpnumlisten(lsock,listenip,listenport,100)<0) {
1229 mfs_arg_errlog(LOG_ERR,"master <-> metaloggers module: can't listen on %s:%s",ListenHost,ListenPort);
1230 return -1;
1231 }
1232 if (tcpsetacceptfilter(lsock)<0 && errno!=ENOTSUP) {
1233 mfs_errlog_silent(LOG_NOTICE,"master <-> metaloggers module: can't set accept filter");
1234 }
1235 mfs_arg_syslog(LOG_NOTICE,"master <-> metaloggers module: listen on %s:%s",ListenHost,ListenPort);
1236
1237 matomlservhead = NULL;
1238 // ChangelogSecondsToRemember = cfg_getuint16("MATOAN_LOG_PRESERVE_SECONDS",600);
1239 // if (ChangelogSecondsToRemember>3600) {
1240 // syslog(LOG_WARNING,"Number of seconds of change logs to be preserved in master is too big (%"PRIu16") - decreasing to 3600 seconds",ChangelogSecondsToRemember);
1241 // ChangelogSecondsToRemember=3600;
1242 // }
1243 main_reload_register(matomlserv_reload);
1244 main_destruct_register(matomlserv_term);
1245 main_poll_register(matomlserv_desc,matomlserv_serve);
1246 main_keepalive_register(matomlserv_keep_alive);
1247 main_time_register(3600,0,matomlserv_status);
1248 return 0;
1249 }
1250