1 /*
2  * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <stddef.h>
26 #include <time.h>
27 #include <sys/types.h>
28 #include <sys/time.h>
29 #include <sys/uio.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <syslog.h>
36 #include <errno.h>
37 #include <inttypes.h>
38 #include <netinet/in.h>
39 #ifdef HAVE_WRITEV
40 #include <sys/uio.h>
41 #endif
42 
43 #include "MFSCommunication.h"
44 
45 #include "datapack.h"
46 #include "matomlserv.h"
47 #include "changelog.h"
48 #include "metadata.h"
49 #include "crc.h"
50 #include "cfg.h"
51 #include "main.h"
52 #include "sizestr.h"
53 #include "sockets.h"
54 #include "slogger.h"
55 #include "massert.h"
56 #include "clocks.h"
57 #include "mfsalloc.h"
58 
// upper bound for the declared payload length of an incoming packet (enforced in matomlserv_read)
#define MaxPacketSize ANTOMA_MAXPACKETSIZE


// passed as the last argument of changelog_get_old_changes; a result smaller than this value
// is treated by the callers as "no more old changes to send"
#define OLD_CHANGES_GROUP_COUNT 10000
63 
// matomlserventry.mode
// DATA - connection is served normally; KILL and CLOSE - entry is removed by
// matomlserv_disconnection_loop (KILL uses tcpclose on the socket, CLOSE uses plain close)
enum{KILL,DATA,CLOSE};

// matomlserventry.clienttype
// UNKNOWN is set on accept; METALOGGER is set when a register packet (version 1 or 2) arrives
enum{UNKNOWN,METALOGGER};

// matomlserventry.logstate
// NONE is set on accept; DELAYED - old changes are still being sent in groups from matomlserv_serve;
// SYNC - entry receives changes from matomlserv_broadcast_logstring
enum{NONE,DELAYED,SYNC};
72 
// Outgoing packet queued on a connection.
// Allocated in matomlserv_createpacket with offsetof(out_packetstruct,data)+8+payload bytes,
// so data[] really holds the whole packet (8 byte header followed by the payload).
typedef struct out_packetstruct {
	struct out_packetstruct *next;	// next packet in the output queue
	uint8_t *startptr;	// first byte that has not been written to the socket yet
	uint32_t bytesleft;	// number of bytes still to be written
	uint8_t data[1];	// packet bytes (declared with one element, real size chosen at allocation time)
} out_packetstruct;
79 
// Incoming packet.
// Allocated in matomlserv_read with offsetof(in_packetstruct,data)+leng bytes once the header is known.
typedef struct in_packetstruct {
	struct in_packetstruct *next;	// next packet in the input queue
	uint32_t type,leng;	// command type and payload length taken from the packet header
	uint8_t data[1];	// payload (declared with one element, real size chosen at allocation time)
} in_packetstruct;
85 
// State of one accepted connection; entries form a singly linked list starting at matomlservhead.
typedef struct matomlserventry {
	uint8_t mode;	// KILL, DATA or CLOSE
	int sock;	// connection socket
	int32_t pdescpos;	// index in the pollfd table filled by matomlserv_desc, -1 when the socket is not polled
	double lastread,lastwrite;	// time (monotonic_seconds) of the last read / write that moved any data
	uint8_t input_hdr[8];	// buffer for the header (type,length) of the packet being received
	uint8_t *input_startptr;	// where the next received bytes are stored
	uint32_t input_bytesleft;	// bytes still missing to complete the current header or payload
	uint8_t input_end;	// set when nothing more will be read (peer closed, read error, poll error/hup, oversized packet)
	in_packetstruct *input_packet;	// packet whose payload is being received; NULL while the header is being read
	in_packetstruct *inputhead,**inputtail;	// queue of complete packets waiting for matomlserv_parse
	out_packetstruct *outputhead,**outputtail;	// queue of packets waiting for matomlserv_write

	uint16_t timeout;	// seconds without received data after which the entry is killed; 10 until register sets it
	uint64_t next_log_version;	// next changelog version to request while logstate is DELAYED

	char *servstrip;		// human readable version of servip
	uint32_t version;	// version sent in the register packet; 0 means "not registered yet"
	uint32_t servip;	// peer address obtained from tcpgetpeer
	uint8_t clienttype;	// UNKNOWN or METALOGGER
	uint8_t logstate;	// NONE, DELAYED or SYNC


	int upload_meta_fd;	// file currently served by download requests, -1 when none
	int upload_chain1_fd;	// "changelog.0.mfs" opened together with the metadata file, -1 when none
	int upload_chain2_fd;	// "changelog.1.mfs" opened together with the metadata file, -1 when none

	struct matomlserventry *next;	// next connection on the list
} matomlserventry;
115 
// head of the list of all accepted connections (new entries are pushed at the front)
static matomlserventry *matomlservhead=NULL;
// listening socket (connections are accepted from it in matomlserv_serve)
static int lsock;
// position of lsock in the pollfd table, stored by matomlserv_desc
static int32_t lsockpdescpos;
119 
120 /*
121 typedef struct old_changes_entry {
122 	uint64_t version;
123 	uint32_t length;
124 	uint8_t *data;
125 } old_changes_entry;
126 
127 typedef struct old_changes_block {
128 	old_changes_entry old_changes_block [OLD_CHANGES_BLOCK_SIZE];
129 	uint32_t entries;
130 	uint32_t mintimestamp;
131 	uint64_t minversion;
132 	struct old_changes_block *next;
133 } old_changes_block;
134 
135 static old_changes_block *old_changes_head=NULL;
136 static old_changes_block *old_changes_current=NULL;
137 */
138 
// from config
// address strings printed in the closing log message and freed in matomlserv_term
static char *ListenHost;
static char *ListenPort;
// NOTE(review): presumably the resolved numeric form of ListenHost - assigned outside the visible part of the file
static uint32_t listenip;
// value returned by matomlserv_getport
static uint16_t listenport;


// loaded from BACK_META_KEEP_PREVIOUS (default 1) in matomlserv_reload_common
static uint32_t BackMetaCopies;
147 
148 
149 // static uint16_t ChangelogSecondsToRemember;
150 
151 /*
152 void matomlserv_old_changes_free_block(old_changes_block *oc) {
153 	uint32_t i;
154 	for (i=0 ; i<oc->entries ; i++) {
155 		free(oc->old_changes_block[i].data);
156 	}
157 	free(oc);
158 }
159 
160 void matomlserv_store_logstring(uint64_t version,uint8_t *logstr,uint32_t logstrsize) {\
161 	old_changes_block *oc;
162 	old_changes_entry *oce;
163 	uint32_t ts;
164 	if (ChangelogSecondsToRemember==0) {
165 		while (old_changes_head) {
166 			oc = old_changes_head->next;
167 			matomlserv_old_changes_free_block(old_changes_head);
168 			old_changes_head = oc;
169 		}
170 		return;
171 	}
172 	if (old_changes_current==NULL || old_changes_head==NULL || old_changes_current->entries>=OLD_CHANGES_BLOCK_SIZE) {
173 		oc = malloc(sizeof(old_changes_block));
174 		passert(oc);
175 		ts = main_time();
176 		oc->entries = 0;
177 		oc->minversion = version;
178 		oc->mintimestamp = ts;
179 		oc->next = NULL;
180 		if (old_changes_current==NULL || old_changes_head==NULL) {
181 			old_changes_head = old_changes_current = oc;
182 		} else {
183 			old_changes_current->next = oc;
184 			old_changes_current = oc;
185 		}
186 		while (old_changes_head && old_changes_head->next && old_changes_head->next->mintimestamp+ChangelogSecondsToRemember<ts) {
187 			oc = old_changes_head->next;
188 			matomlserv_old_changes_free_block(old_changes_head);
189 			old_changes_head = oc;
190 		}
191 	}
192 	oc = old_changes_current;
193 	oce = oc->old_changes_block + oc->entries;
194 	oce->version = version;
195 	oce->length = logstrsize;
196 	oce->data = malloc(logstrsize);
197 	passert(oce->data);
198 	memcpy(oce->data,logstr,logstrsize);
199 	oc->entries++;
200 }
201 */
202 
203 
/* Returns a constant label describing the peer (used in log messages).
 * Metaloggers are additionally labelled with their changelog state;
 * any other client type is reported as "UNKNOWN". */
static inline const char* matomlserv_clientname(matomlserventry *eptr) {
	if (eptr->clienttype!=METALOGGER) {
		return "UNKNOWN";
	}
	if (eptr->logstate==DELAYED) {
		return "METALOGGER-DELAYED";
	}
	if (eptr->logstate==SYNC) {
		return "METALOGGER-SYNC";
	}
	return "METALOGGER";
}
218 
matomlserv_mloglist_size(void)219 uint32_t matomlserv_mloglist_size(void) {
220 	matomlserventry *eptr;
221 	uint32_t i;
222 	i=0;
223 	for (eptr = matomlservhead ; eptr ; eptr=eptr->next) {
224 		if (eptr->mode!=KILL && eptr->mode!=CLOSE && eptr->clienttype==METALOGGER) {
225 			i++;
226 		}
227 	}
228 	return i*(4+4);
229 }
230 
matomlserv_mloglist_data(uint8_t * ptr)231 void matomlserv_mloglist_data(uint8_t *ptr) {
232 	matomlserventry *eptr;
233 	for (eptr = matomlservhead ; eptr ; eptr=eptr->next) {
234 		if (eptr->mode!=KILL && eptr->mode!=CLOSE && eptr->clienttype==METALOGGER) {
235 			put32bit(&ptr,eptr->version);
236 			put32bit(&ptr,eptr->servip);
237 		}
238 	}
239 }
240 
241 
matomlserv_status(void)242 void matomlserv_status(void) {
243 	matomlserventry *eptr;
244 	for (eptr = matomlservhead ; eptr ; eptr=eptr->next) {
245 		if (eptr->mode==DATA) {
246 			return;
247 		}
248 	}
249 	syslog(LOG_WARNING,"no metaloggers connected !!!");
250 }
251 
/* Builds a dotted-decimal string from a 32-bit ip number.
 * The four bytes are printed in the order in which put32bit stores them.
 * Returns a newly allocated string - the caller is responsible for freeing it. */
char* matomlserv_makestrip(uint32_t ip) {
	uint8_t octets[4];
	uint8_t *wptr = octets;
	uint32_t size,k;
	char *str;

	put32bit(&wptr,ip);
	size = 4;	// three dots and the terminating zero
	for (k=0 ; k<4 ; k++) {
		// number of decimal digits needed for this byte
		size += (octets[k]>=100)?3:((octets[k]>=10)?2:1);
	}
	str = malloc(size);
	passert(str);
	snprintf(str,size,"%"PRIu8".%"PRIu8".%"PRIu8".%"PRIu8,octets[0],octets[1],octets[2],octets[3]);
	str[size-1]=0;
	return str;
}
275 
/* Allocates an outgoing packet with 'size' bytes of payload, writes its header
 * (type and size, 32 bits each) and appends it to the output queue of eptr.
 * Returns a pointer to the payload area - the caller has to fill exactly 'size' bytes. */
uint8_t* matomlserv_createpacket(matomlserventry *eptr,uint32_t type,uint32_t size) {
	out_packetstruct *opack;
	uint8_t *wptr;
	uint32_t total;

	total = size+8;	// payload plus 8 bytes of header
	opack = malloc(offsetof(out_packetstruct,data)+total);
#ifndef __clang_analyzer__
	passert(opack);
	// clang analyzer has problem with testing for (void*)(-1) which is needed for memory allocated by mmap
#endif
	opack->next = NULL;
	opack->startptr = opack->data;
	opack->bytesleft = total;
	wptr = opack->data;
	put32bit(&wptr,type);
	put32bit(&wptr,size);
	// link at the end of the queue
	*(eptr->outputtail) = opack;
	eptr->outputtail = &(opack->next);
	return wptr;
}
297 
matomlserv_send_old_change(void * veptr,uint64_t version,uint8_t * data,uint32_t length)298 void matomlserv_send_old_change(void *veptr,uint64_t version,uint8_t *data,uint32_t length) {
299 	matomlserventry *eptr = (matomlserventry *)veptr;
300 	uint8_t *pdata;
301 
302 	pdata = matomlserv_createpacket(eptr,MATOAN_METACHANGES_LOG,9+length);
303 	put8bit(&pdata,0xFF);
304 	put64bit(&pdata,version);
305 	memcpy(pdata,data,length);
306 }
307 
308 /*
309 void matomlserv_send_old_changes(matomlserventry *eptr,uint64_t version) {
310 	uint64_t minver = changelog_get_minversion();
311 	if (minver==0) {
312 		// syslog(LOG_WARNING,"meta logger wants old changes, but storage is disabled");
313 		return;
314 	}
315 	if (version<minver) {
316 		syslog(LOG_WARNING,"meta logger wants changes since version: %"PRIu64", but minimal version in storage is: %"PRIu64,version,minver);
317 		return;
318 	}
319 	changelog_get_old_changes(version,matomlserv_send_old_change,eptr);
320 }
321 */
322 /*
323 void matomlserv_send_old_changes(matomlserventry *eptr,uint64_t version) {
324 	old_changes_block *oc;
325 	old_changes_entry *oce;
326 	uint8_t *data;
327 	uint8_t start=0;
328 	uint32_t i;
329 	if (old_changes_head==NULL) {
330 		// syslog(LOG_WARNING,"meta logger wants old changes, but storage is disabled");
331 		return;
332 	}
333 	if (old_changes_head->minversion>version) {
334 		syslog(LOG_WARNING,"meta logger wants changes since version: %"PRIu64", but minimal version in storage is: %"PRIu64,version,old_changes_head->minversion);
335 		return;
336 	}
337 	for (oc=old_changes_head ; oc ; oc=oc->next) {
338 		if (oc->minversion<=version && (oc->next==NULL || oc->next->minversion>version)) {
339 			start=1;
340 		}
341 		if (start) {
342 			for (i=0 ; i<oc->entries ; i++) {
343 				oce = oc->old_changes_block + i;
344 				if (version>=oce->version) {
345 					data = matomlserv_createpacket(eptr,MATOAN_METACHANGES_LOG,9+oce->length);
346 					put8bit(&data,0xFF);
347 					put64bit(&data,oce->version);
348 					memcpy(data,oce->data,oce->length);
349 				}
350 			}
351 		}
352 	}
353 }
354 */
355 
356 
/* Handles ANTOAN_GET_VERSION.
 * Request payload is either empty or a 32-bit message id; any other length kills the connection.
 * Answer (ANTOAN_VERSION): optional message id (only when it was present in the request),
 * VERSMAJ (16 bits), VERSMID (8 bits), VERSMIN (8 bits) and the version string without terminating zero. */
void matomlserv_get_version(matomlserventry *eptr,const uint8_t *data,uint32_t length) {
	static const char vstring[] = VERSSTR;
	uint32_t vleng;
	uint32_t msgid;
	uint8_t *wptr;

	if (length!=0 && length!=4) {
		syslog(LOG_NOTICE,"ANTOAN_GET_VERSION - wrong size (%"PRIu32"/4|0)",length);
		eptr->mode = KILL;
		return;
	}
	vleng = strlen(vstring);
	if (length==0) {
		wptr = matomlserv_createpacket(eptr,ANTOAN_VERSION,4+vleng);
	} else {
		msgid = get32bit(&data);
		wptr = matomlserv_createpacket(eptr,ANTOAN_VERSION,4+4+vleng);
		put32bit(&wptr,msgid);
	}
	put16bit(&wptr,VERSMAJ);
	put8bit(&wptr,VERSMID);
	put8bit(&wptr,VERSMIN);
	memcpy(wptr,vstring,vleng);
}
378 
/* Handles ANTOAN_GET_CONFIG.
 * Request payload: 32-bit message id, 8-bit name length, name (not zero terminated).
 * Answer (ANTOAN_CONFIG_VALUE): message id, 8-bit value length, value truncated to 255 bytes.
 * An option that is not defined is answered with an empty value.
 * Malformed requests kill the connection. */
void matomlserv_get_config(matomlserventry *eptr,const uint8_t *data,uint32_t length) {
	uint32_t msgid;
	char name[256];
	uint8_t nleng;
	uint32_t vleng;
	char *val;
	uint8_t *ptr;

	if (length<5) {
		syslog(LOG_NOTICE,"ANTOAN_GET_CONFIG - wrong size (%"PRIu32")",length);
		eptr->mode = KILL;
		return;
	}
	msgid = get32bit(&data);
	nleng = get8bit(&data);
	if (length!=5U+(uint32_t)nleng) {
		syslog(LOG_NOTICE,"ANTOAN_GET_CONFIG - wrong size (%"PRIu32":nleng=%"PRIu8")",length,nleng);
		eptr->mode = KILL;
		return;
	}
	// nleng is at most 255, so name together with the terminating zero always fits
	memcpy(name,data,nleng);
	name[nleng] = 0;
	val = cfg_getstr(name,"");
	vleng = strlen(val);
	if (vleng>255) {
		// value length is sent as a single byte
		vleng=255;
	}
	ptr = matomlserv_createpacket(eptr,ANTOAN_CONFIG_VALUE,5+vleng);
	put32bit(&ptr,msgid);
	put8bit(&ptr,vleng);
	memcpy(ptr,val,vleng);
	// cfg_getstr returns a newly allocated string that the caller has to release;
	// without this every request leaked the value
	free(val);
}
411 
/* Handles ANTOMA_REGISTER.
 * Payload starts with a register version byte:
 *   1 - followed by client version (32 bits) and timeout (16 bits); client goes straight to SYNC
 *   2 - as above plus the first changelog version wanted by the client (64 bits); if stored
 *       changes reach back that far they are queued (at most OLD_CHANGES_GROUP_COUNT at once)
 *       and the client stays DELAYED until matomlserv_serve sends the rest
 *   3,4 - connection is silently killed
 * A second register packet, wrong sizes and unknown versions kill the connection.
 * Timeouts below 10 seconds are logged; values below 3 are raised to 3. */
void matomlserv_register(matomlserventry *eptr,const uint8_t *data,uint32_t length) {
	uint8_t rversion;
	uint64_t wanted,oldest;
	uint32_t sent;

	if (eptr->version>0) {
		syslog(LOG_WARNING,"got register message from registered metalogger !!!");
		eptr->mode = KILL;
		return;
	}
	if (length<1) {
		syslog(LOG_NOTICE,"ANTOMA_REGISTER - wrong size (%"PRIu32")",length);
		eptr->mode = KILL;
		return;
	}
	rversion = get8bit(&data);
	switch (rversion) {
		case 1:
			eptr->clienttype = METALOGGER;
			if (length!=7) {
				syslog(LOG_NOTICE,"ANTOMA_REGISTER (logger 1) - wrong size (%"PRIu32"/7)",length);
				eptr->mode = KILL;
				return;
			}
			eptr->version = get32bit(&data);
			eptr->timeout = get16bit(&data);
			eptr->logstate = SYNC;
			break;
		case 2:
			eptr->clienttype = METALOGGER;
			if (length!=7+8) {
				syslog(LOG_NOTICE,"ANTOMA_REGISTER (logger 2) - wrong size (%"PRIu32"/15)",length);
				eptr->mode = KILL;
				return;
			}
			eptr->version = get32bit(&data);
			eptr->timeout = get16bit(&data);
			wanted = get64bit(&data);
			oldest = changelog_get_minversion();
			if (oldest==0 || oldest>wanted) {
				eptr->logstate = SYNC; // desync
			} else {
				sent = changelog_get_old_changes(wanted,matomlserv_send_old_change,eptr,OLD_CHANGES_GROUP_COUNT);
				if (sent>=OLD_CHANGES_GROUP_COUNT) {
					// full group sent - there may be more, continue from matomlserv_serve
					eptr->next_log_version = wanted+sent;
					eptr->logstate = DELAYED;
				} else {
					eptr->logstate = SYNC;
				}
			}
			break;
		case 3:
		case 4: // just ignore PRO components
			eptr->mode = KILL;
			return;
		default:
			syslog(LOG_NOTICE,"ANTOMA_REGISTER - wrong version (%"PRIu8"/1)",rversion);
			eptr->mode = KILL;
			return;
	}
	if (eptr->timeout>=10) {
		return;
	}
	// too small timeout is only reported (the connection is not killed)
	syslog(LOG_NOTICE,"ANTOMA_REGISTER communication timeout too small (%"PRIu16" seconds - should be at least 10 seconds)",eptr->timeout);
	if (eptr->timeout<3) {
		eptr->timeout=3;
	}
}
478 
479 
/* Handles ANTOMA_DOWNLOAD_START.
 * Payload: one byte with the file number:
 *   1  - "metadata.mfs.back" (both changelog files are opened at the same time for later use)
 *   2  - "sessions.mfs"
 *   11 - changelog file opened earlier as "changelog.0.mfs"
 *   12 - changelog file opened earlier as "changelog.1.mfs"
 * Answer (MATOAN_DOWNLOAD_INFO): 64-bit file size on success; when the file is not available
 * a single byte 0xff for files 1 and 2, and a 64-bit zero for files 11 and 12.
 * Wrong size or unknown file number kills the connection. */
void matomlserv_download_start(matomlserventry *eptr,const uint8_t *data,uint32_t length) {
	uint8_t filenum;
	uint64_t size;
	uint8_t *wptr;

	if (length!=1) {
		syslog(LOG_NOTICE,"ANTOMA_DOWNLOAD_START - wrong size (%"PRIu32"/1)",length);
		eptr->mode = KILL;
		return;
	}
	filenum = get8bit(&data);
	switch (filenum) {
		case 1:
		case 2:
			// fresh download - drop every descriptor left from the previous one
			if (eptr->upload_meta_fd>=0) {
				close(eptr->upload_meta_fd);
				eptr->upload_meta_fd=-1;
			}
			if (eptr->upload_chain1_fd>=0) {
				close(eptr->upload_chain1_fd);
				eptr->upload_chain1_fd=-1;
			}
			if (eptr->upload_chain2_fd>=0) {
				close(eptr->upload_chain2_fd);
				eptr->upload_chain2_fd=-1;
			}
			if (filenum==1) {
				eptr->upload_meta_fd = open("metadata.mfs.back",O_RDONLY);
				eptr->upload_chain1_fd = open("changelog.0.mfs",O_RDONLY);
				eptr->upload_chain2_fd = open("changelog.1.mfs",O_RDONLY);
			} else {
				eptr->upload_meta_fd = open("sessions.mfs",O_RDONLY);
			}
			break;
		case 11:
			// switch to the first changelog descriptor
			if (eptr->upload_meta_fd>=0) {
				close(eptr->upload_meta_fd);
			}
			eptr->upload_meta_fd = eptr->upload_chain1_fd;
			eptr->upload_chain1_fd = -1;
			break;
		case 12:
			// switch to the second changelog descriptor
			if (eptr->upload_meta_fd>=0) {
				close(eptr->upload_meta_fd);
			}
			eptr->upload_meta_fd = eptr->upload_chain2_fd;
			eptr->upload_chain2_fd = -1;
			break;
		default:
			eptr->mode = KILL;
			return;
	}
	if (eptr->upload_meta_fd>=0) {
		size = lseek(eptr->upload_meta_fd,0,SEEK_END);
		wptr = matomlserv_createpacket(eptr,MATOAN_DOWNLOAD_INFO,8);
		put64bit(&wptr,size);	// ok
		return;
	}
	if (filenum==11 || filenum==12) {
		// missing changelog is reported as an empty file
		wptr = matomlserv_createpacket(eptr,MATOAN_DOWNLOAD_INFO,8);
		put64bit(&wptr,0);
	} else {
		wptr = matomlserv_createpacket(eptr,MATOAN_DOWNLOAD_INFO,1);
		put8bit(&wptr,0xff);	// error
	}
}
541 
/* Handles ANTOMA_DOWNLOAD_REQUEST.
 * Payload: 64-bit offset and 32-bit length of the block to read from the file selected
 * by the last download start.
 * Answer (MATOAN_DOWNLOAD_DATA): offset (64 bits), length (32 bits), crc of the data (32 bits), data.
 * Wrong size, no open file, a length that does not fit in a packet or a failed / short read
 * kill the connection. */
void matomlserv_download_request(matomlserventry *eptr,const uint8_t *data,uint32_t length) {
	uint8_t *ptr;
	uint64_t offset;
	uint32_t leng;
	uint32_t crc;
	ssize_t ret;

	if (length!=12) {
		syslog(LOG_NOTICE,"ANTOMA_DOWNLOAD_REQUEST - wrong size (%"PRIu32"/12)",length);
		eptr->mode = KILL;
		return;
	}
	if (eptr->upload_meta_fd<0) {
		syslog(LOG_NOTICE,"ANTOMA_DOWNLOAD_REQUEST - file not opened");
		eptr->mode = KILL;
		return;
	}
	offset = get64bit(&data);
	leng = get32bit(&data);
	// leng comes from the peer: 16 bytes are added here and 8 more in matomlserv_createpacket,
	// so a value close to UINT32_MAX would wrap around, produce a tiny allocation and let the
	// read below write far beyond it
	if (leng>UINT32_MAX-16U-8U) {
		syslog(LOG_NOTICE,"ANTOMA_DOWNLOAD_REQUEST - block too long (%"PRIu32")",leng);
		eptr->mode = KILL;
		return;
	}
	ptr = matomlserv_createpacket(eptr,MATOAN_DOWNLOAD_DATA,16+leng);
	put64bit(&ptr,offset);
	put32bit(&ptr,leng);
	// data is read 4 bytes further to leave room for the crc, which is stored afterwards
#ifdef HAVE_PREAD
	ret = pread(eptr->upload_meta_fd,ptr+4,leng,offset);
#else /* HAVE_PREAD */
	lseek(eptr->upload_meta_fd,offset,SEEK_SET);
	ret = read(eptr->upload_meta_fd,ptr+4,leng);
#endif /* HAVE_PREAD */
	if (ret!=(ssize_t)leng) {
		// the packet is already queued, but the connection is killed so it is dropped with the entry
		mfs_errlog_silent(LOG_NOTICE,"error reading metafile");
		eptr->mode = KILL;
		return;
	}
	crc = mycrc32(0,ptr+4,leng);
	put32bit(&ptr,crc);
}
578 
/* Handles ANTOMA_DOWNLOAD_END (empty payload): closes the file currently being downloaded.
 * Non-empty payload kills the connection. */
void matomlserv_download_end(matomlserventry *eptr,const uint8_t *data,uint32_t length) {
	(void)data;
	if (length==0) {
		if (eptr->upload_meta_fd>=0) {
			close(eptr->upload_meta_fd);
			eptr->upload_meta_fd=-1;
		}
		return;
	}
	syslog(LOG_NOTICE,"ANTOMA_DOWNLOAD_END - wrong size (%"PRIu32"/0)",length);
	eptr->mode = KILL;
}
591 
592 
matomlserv_broadcast_logstring(uint64_t version,uint8_t * logstr,uint32_t logstrsize)593 void matomlserv_broadcast_logstring(uint64_t version,uint8_t *logstr,uint32_t logstrsize) {
594 	matomlserventry *eptr;
595 	uint8_t *data;
596 
597 	for (eptr = matomlservhead ; eptr ; eptr=eptr->next) {
598 		if (eptr->version>0 && eptr->clienttype==METALOGGER && eptr->logstate==SYNC) {
599 			data = matomlserv_createpacket(eptr,MATOAN_METACHANGES_LOG,9+logstrsize);
600 			put8bit(&data,0xFF);
601 			put64bit(&data,version);
602 			memcpy(data,logstr,logstrsize);
603 		}
604 	}
605 }
606 
matomlserv_broadcast_logrotate(void)607 void matomlserv_broadcast_logrotate(void) {
608 	matomlserventry *eptr;
609 	uint8_t *data;
610 
611 	for (eptr = matomlservhead ; eptr ; eptr=eptr->next) {
612 		if (eptr->version>0 && eptr->clienttype==METALOGGER) {
613 			data = matomlserv_createpacket(eptr,MATOAN_METACHANGES_LOG,1);
614 			put8bit(&data,0x55);
615 		}
616 	}
617 }
618 
/* Closes every download-related file descriptor still open on the connection
 * and marks it as closed (-1). */
void matomlserv_beforeclose(matomlserventry *eptr) {
	int *fds[3];
	uint32_t k;

	fds[0] = &(eptr->upload_meta_fd);
	fds[1] = &(eptr->upload_chain1_fd);
	fds[2] = &(eptr->upload_chain2_fd);
	for (k=0 ; k<3 ; k++) {
		if (*(fds[k])>=0) {
			close(*(fds[k]));
			*(fds[k]) = -1;
		}
	}
}
633 
/* Dispatches one complete incoming packet to its handler.
 * ANTOAN_NOP, ANTOAN_UNKNOWN_COMMAND and ANTOAN_BAD_COMMAND_SIZE are accepted and ignored;
 * any type without a handler is logged and kills the connection. */
void matomlserv_gotpacket(matomlserventry *eptr,uint32_t type,const uint8_t *data,uint32_t length) {
	switch (type) {
		case ANTOAN_NOP:
		case ANTOAN_UNKNOWN_COMMAND: // for future use
		case ANTOAN_BAD_COMMAND_SIZE: // for future use
			return;
		case ANTOAN_GET_VERSION:
			matomlserv_get_version(eptr,data,length);
			return;
		case ANTOAN_GET_CONFIG:
			matomlserv_get_config(eptr,data,length);
			return;
		case ANTOMA_REGISTER:
			matomlserv_register(eptr,data,length);
			return;
		case ANTOMA_DOWNLOAD_START:
			matomlserv_download_start(eptr,data,length);
			return;
		case ANTOMA_DOWNLOAD_REQUEST:
			matomlserv_download_request(eptr,data,length);
			return;
		case ANTOMA_DOWNLOAD_END:
			matomlserv_download_end(eptr,data,length);
			return;
		default:
			break;
	}
	syslog(LOG_NOTICE,"master control module: got unknown message (type:%"PRIu32")",type);
	eptr->mode = KILL;
}
665 
/* Reads everything currently available on the socket and splits it into packets.
 * Data is first collected in a static buffer shared by all connections (it only grows),
 * then copied into the per-connection header buffer / packet payloads; every complete
 * packet is appended to the input queue of eptr (it is processed later by matomlserv_parse).
 * eptr - connection to read from; NULL means "free the static buffer and return"
 * now  - current time, stored in lastread when any data has been received
 * On end of stream, read error or a too long packet input_end is set. */
void matomlserv_read(matomlserventry *eptr,double now) {
	int32_t i;
	uint32_t type,leng;
	const uint8_t *ptr;
	uint32_t rbleng,rbpos;
	uint8_t err,hup;
	static uint8_t *readbuff = NULL;
	static uint32_t readbuffsize = 0;

	// special call used to release the internal buffer
	if (eptr == NULL) {
		if (readbuff != NULL) {
			free(readbuff);
		}
		readbuff = NULL;
		readbuffsize = 0;
		return;
	}

	// first use (or first use after the buffer has been released)
	if (readbuffsize==0) {
		readbuffsize = 65536;
		readbuff = malloc(readbuffsize);
		passert(readbuff);
	}

	rbleng = 0;
	err = 0;
	hup = 0;
	// read until a read does not fill the buffer completely; the buffer is doubled whenever it becomes full
	for (;;) {
		i = read(eptr->sock,readbuff+rbleng,readbuffsize-rbleng);
		if (i==0) {
			// end of stream
			hup = 1;
			break;
		} else if (i<0) {
			// NOTE(review): ERRNO_ERROR is defined outside this file - presumably it is false for
			// "try again" kind of errno values, so only real errors are reported
			if (ERRNO_ERROR) {
				err = 1;
			}
			break;
		} else {
			rbleng += i;
			if (rbleng==readbuffsize) {
				readbuffsize*=2;
				readbuff = mfsrealloc(readbuff,readbuffsize);
				passert(readbuff);
			} else {
				break;
			}
		}
	}

	if (rbleng>0) {
		eptr->lastread = now;
	}

	// distribute received bytes: header (8 bytes) -> payload (leng bytes) -> header -> ...
	rbpos = 0;
	while (rbpos<rbleng) {
		// copy as much as is needed to finish the current header/payload, but not more than we have
		if ((rbleng-rbpos)>=eptr->input_bytesleft) {
			memcpy(eptr->input_startptr,readbuff+rbpos,eptr->input_bytesleft);
			i = eptr->input_bytesleft;
		} else {
			memcpy(eptr->input_startptr,readbuff+rbpos,rbleng-rbpos);
			i = rbleng-rbpos;
		}
		rbpos += i;
		eptr->input_startptr+=i;
		eptr->input_bytesleft-=i;

		// current header/payload still incomplete - wait for more data
		if (eptr->input_bytesleft>0) {
			break;
		}

		// header has just been completed - decode it and prepare a packet for the payload
		if (eptr->input_packet == NULL) {
			ptr = eptr->input_hdr;
			type = get32bit(&ptr);
			leng = get32bit(&ptr);

			if (leng>MaxPacketSize) {
				// the rest of received data is dropped; packets queued earlier are still parsed
				syslog(LOG_WARNING,"ML(%s) packet too long (%"PRIu32"/%u) ; command:%"PRIu32,eptr->servstrip,leng,MaxPacketSize,type);
				eptr->input_end = 1;
				return;
			}

			eptr->input_packet = malloc(offsetof(in_packetstruct,data)+leng);
			passert(eptr->input_packet);
			eptr->input_packet->next = NULL;
			eptr->input_packet->type = type;
			eptr->input_packet->leng = leng;

			eptr->input_startptr = eptr->input_packet->data;
			eptr->input_bytesleft = leng;
		}

		// non-empty payload expected - go back and copy it
		if (eptr->input_bytesleft>0) {
			continue;
		}

		// packet complete (also the case of an empty payload) - queue it and expect a new header
		if (eptr->input_packet != NULL) {
			*(eptr->inputtail) = eptr->input_packet;
			eptr->inputtail = &(eptr->input_packet->next);
			eptr->input_packet = NULL;
			eptr->input_bytesleft = 8;
			eptr->input_startptr = eptr->input_hdr;
		}
	}

	// reported after the data has been processed, so packets received just before are not lost
	if (hup) {
		syslog(LOG_NOTICE,"connection with %s(%s) has been closed by peer",matomlserv_clientname(eptr),eptr->servstrip);
		eptr->input_end = 1;
	} else if (err) {
		mfs_arg_errlog_silent(LOG_NOTICE,"read from ML(%s) error",eptr->servstrip);
		eptr->input_end = 1;
	}
}
778 
/* Processes packets waiting in the input queue of eptr.
 * Stops when the queue is empty, the connection leaves DATA mode or about 10000
 * microseconds have passed since the start (the rest is left for the next call).
 * When the whole queue has been processed and no more input is expected the connection is killed. */
void matomlserv_parse(matomlserventry *eptr) {
	in_packetstruct *pkt;
	uint64_t begin,current;

	begin = monotonic_useconds();
	current = begin;
	for (;;) {
		if (eptr->mode!=DATA) {
			break;
		}
		pkt = eptr->inputhead;
		if (pkt==NULL) {
			break;
		}
		if (begin+10000<=current) {
			break;
		}
		matomlserv_gotpacket(eptr,pkt->type,pkt->data,pkt->leng);
		eptr->inputhead = pkt->next;
		free(pkt);
		if (eptr->inputhead!=NULL) {
			// clock is checked only when there is something more to do
			current = monotonic_useconds();
		} else {
			eptr->inputtail = &(eptr->inputhead);
		}
	}
	if (eptr->mode==DATA && eptr->inputhead==NULL && eptr->input_end) {
		eptr->mode = KILL;
	}
}
800 
/* Writes as much of the output queue as the socket accepts.
 * Packets written completely are removed from the queue and freed; a partially written
 * packet stays at the head with startptr/bytesleft advanced.
 * eptr - connection to write to
 * now  - current time, stored in lastwrite when any data has been written
 * A write error (as classified by ERRNO_ERROR) kills the connection. */
void matomlserv_write(matomlserventry *eptr,double now) {
	out_packetstruct *opack;
	int32_t i;
#ifdef HAVE_WRITEV
	// variant gathering up to 100 queued packets into one writev call
	struct iovec iovtab[100];
	uint32_t iovdata;
	uint32_t leng;
	uint32_t left;

	for (;;) {
		leng = 0;
		for (iovdata=0,opack=eptr->outputhead ; iovdata<100 && opack!=NULL ; iovdata++,opack=opack->next) {
			iovtab[iovdata].iov_base = opack->startptr;
			iovtab[iovdata].iov_len = opack->bytesleft;
			leng += opack->bytesleft;
		}
		// nothing left to send
		if (iovdata==0) {
			return;
		}
		i = writev(eptr->sock,iovtab,iovdata);
		if (i<0) {
			if (ERRNO_ERROR) {
				mfs_arg_errlog_silent(LOG_NOTICE,"write to ML(%s) error",eptr->servstrip);
				eptr->mode = KILL;
			}
			return;
		}
		if (i>0) {
			eptr->lastwrite = now;
		}
		// account written bytes against packets from the head of the queue
		left = i;
		while (left>0 && eptr->outputhead!=NULL) {
			opack = eptr->outputhead;
			if (opack->bytesleft>left) {
				// packet written only partially
				opack->startptr+=left;
				opack->bytesleft-=left;
				left = 0;
			} else {
				// packet written completely
				left -= opack->bytesleft;
				eptr->outputhead = opack->next;
				if (eptr->outputhead==NULL) {
					eptr->outputtail = &(eptr->outputhead);
				}
				free(opack);
			}
		}
		// short write - socket buffer is full, try again later
		if ((uint32_t)i < leng) {
			return;
		}
	}
#else
	// variant writing packets one by one
	for (;;) {
		opack = eptr->outputhead;
		if (opack==NULL) {
			return;
		}
		i=write(eptr->sock,opack->startptr,opack->bytesleft);
		if (i<0) {
			if (ERRNO_ERROR) {
				mfs_arg_errlog_silent(LOG_NOTICE,"write to ML(%s) error",eptr->servstrip);
				eptr->mode = KILL;
			}
			return;
		}
		if (i>0) {
			eptr->lastwrite = now;
		}
		opack->startptr+=i;
		opack->bytesleft-=i;
		// short write - try again later
		if (opack->bytesleft>0) {
			return;
		}
		eptr->outputhead = opack->next;
		if (eptr->outputhead==NULL) {
			eptr->outputtail = &(eptr->outputhead);
		}
		free(opack);
	}
#endif
}
881 
matomlserv_desc(struct pollfd * pdesc,uint32_t * ndesc)882 void matomlserv_desc(struct pollfd *pdesc,uint32_t *ndesc) {
883 	uint32_t pos = *ndesc;
884 	int events;
885 	matomlserventry *eptr;
886 	pdesc[pos].fd = lsock;
887 	pdesc[pos].events = POLLIN;
888 	lsockpdescpos = pos;
889 	pos++;
890 	for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
891 		events = 0;
892 		if (eptr->input_end==0) {
893 			events |= POLLIN;
894 		}
895 		if (eptr->outputhead!=NULL) {
896 			events |= POLLOUT;
897 		}
898 		if (events) {
899 			pdesc[pos].fd = eptr->sock;
900 			pdesc[pos].events = POLLIN;
901 			eptr->pdescpos = pos;
902 			pos++;
903 		} else {
904 			eptr->pdescpos = -1;
905 		}
906 	}
907 	*ndesc = pos;
908 }
909 
matomlserv_disconnection_loop(void)910 void matomlserv_disconnection_loop(void) {
911 	matomlserventry *eptr,**kptr;
912 	in_packetstruct *ipptr,*ipaptr;
913 	out_packetstruct *opptr,*opaptr;
914 
915 	kptr = &matomlservhead;
916 	while ((eptr=*kptr)) {
917 		if (eptr->mode==KILL || eptr->mode==CLOSE) {
918 			matomlserv_beforeclose(eptr);
919 			if (eptr->mode==KILL) {
920 				tcpclose(eptr->sock);
921 			} else {
922 				close(eptr->sock);
923 			}
924 			if (eptr->input_packet) {
925 				free(eptr->input_packet);
926 			}
927 			ipptr = eptr->inputhead;
928 			while (ipptr) {
929 				ipaptr = ipptr;
930 				ipptr = ipptr->next;
931 				free(ipaptr);
932 			}
933 			opptr = eptr->outputhead;
934 			while (opptr) {
935 				opaptr = opptr;
936 				opptr = opptr->next;
937 				free(opaptr);
938 			}
939 			if (eptr->servstrip) {
940 				free(eptr->servstrip);
941 			}
942 			*kptr = eptr->next;
943 			free(eptr);
944 		} else {
945 			kptr = &(eptr->next);
946 		}
947 	}
948 }
949 
/* Main poll handler of the module, called with the pollfd table filled earlier by matomlserv_desc.
 * In order: corrects read timestamps after a long pause, accepts a new connection,
 * reads and parses input of every connection, sends keep-alive packets and queued output,
 * kills timed out connections, continues sending old changes to DELAYED metaloggers
 * and finally removes connections marked as KILL/CLOSE. */
void matomlserv_serve(struct pollfd *pdesc) {
	double now;
	matomlserventry *eptr;
	int ns;
	static double lastaction = 0.0;
	double timeoutadd;
	uint32_t n;

	now = monotonic_seconds();
// timeout fix
	// when more than a second passed since the previous call, shift lastread of every connection
	// by that time (presumably so that a stall of the main loop is not counted against the peers)
	if (lastaction>0.0) {
		timeoutadd = now-lastaction;
		if (timeoutadd>1.0) {
			for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
				eptr->lastread += timeoutadd;
			}
		}
	}
	lastaction = now;

	// new connection waiting on the listening socket
	if (lsockpdescpos>=0 && (pdesc[lsockpdescpos].revents & POLLIN)) {
		ns=tcpaccept(lsock);
		if (ns<0) {
			mfs_errlog_silent(LOG_NOTICE,"Master<->ML socket: accept error");
		} else {
			tcpnonblock(ns);
			tcpnodelay(ns);
			eptr = malloc(sizeof(matomlserventry));
			passert(eptr);
			eptr->next = matomlservhead;
			matomlservhead = eptr;
			eptr->sock = ns;
			// not present in the current pollfd table - skipped by poll related checks below
			eptr->pdescpos = -1;
			eptr->mode = DATA;
			eptr->lastread = now;
			eptr->lastwrite = now;
			// start with reading a packet header
			eptr->input_bytesleft = 8;
			eptr->input_startptr = eptr->input_hdr;
			eptr->input_end = 0;
			eptr->input_packet = NULL;
			eptr->inputhead = NULL;
			eptr->inputtail = &(eptr->inputhead);
			eptr->outputhead = NULL;
			eptr->outputtail = &(eptr->outputhead);
			eptr->timeout = 10;

			tcpgetpeer(eptr->sock,&(eptr->servip),NULL);
			eptr->servstrip = matomlserv_makestrip(eptr->servip);
			// unregistered until a register packet arrives
			eptr->version = 0;
			eptr->clienttype = UNKNOWN;
			eptr->logstate = NONE;
			eptr->upload_meta_fd = -1;
			eptr->upload_chain1_fd = -1;
			eptr->upload_chain2_fd = -1;
		}
	}

// read
	for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
		if (eptr->pdescpos>=0) {
			// read only when there is input and no error has been signalled
			if ((pdesc[eptr->pdescpos].revents & (POLLERR|POLLIN))==POLLIN && eptr->mode!=KILL) {
				matomlserv_read(eptr,now);
			}
			if (pdesc[eptr->pdescpos].revents & (POLLERR|POLLHUP)) {
				eptr->input_end = 1;
			}
		}
		// parse also connections that were not polled - they may have packets left from the previous call
		matomlserv_parse(eptr);
	}

// write
	for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
		// keep-alive: nothing written for one third of the timeout and nothing queued
		if ((eptr->lastwrite+(eptr->timeout/3.0))<now && eptr->outputhead==NULL && eptr->clienttype!=UNKNOWN) {
			matomlserv_createpacket(eptr,ANTOAN_NOP,0);
		}
		if (eptr->pdescpos>=0) {
			// write when poll reported POLLOUT, or when POLLOUT was not requested but output has been queued since then
			if ((((pdesc[eptr->pdescpos].events & POLLOUT)==0 && (eptr->outputhead)) || (pdesc[eptr->pdescpos].revents & POLLOUT)) && eptr->mode!=KILL) {
				matomlserv_write(eptr,now);
			}
		}
		// nothing received for too long
		if ((eptr->lastread+eptr->timeout)<now) {
			eptr->mode = KILL;
		}
		// previous group of old changes has been sent completely - queue the next one
		if (eptr->logstate==DELAYED && eptr->outputhead==NULL) {
			n = changelog_get_old_changes(eptr->next_log_version,matomlserv_send_old_change,eptr,OLD_CHANGES_GROUP_COUNT);
			if (n<OLD_CHANGES_GROUP_COUNT) {
				eptr->logstate=SYNC;
			} else {
				eptr->next_log_version += n;
			}
		}
	}
	matomlserv_disconnection_loop();
}
1044 
matomlserv_keep_alive(void)1045 void matomlserv_keep_alive(void) {
1046 	double now;
1047 	matomlserventry *eptr;
1048 
1049 	now = monotonic_seconds();
1050 // read
1051 	for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
1052 		if (eptr->mode == DATA && eptr->input_end==0) {
1053 			matomlserv_read(eptr,now);
1054 		}
1055 	}
1056 // write
1057 	for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
1058 		if ((eptr->lastwrite+(eptr->timeout/3.0))<now && eptr->outputhead==NULL && eptr->clienttype!=UNKNOWN) {
1059 			matomlserv_createpacket(eptr,ANTOAN_NOP,0);
1060 		}
1061 		if (eptr->mode == DATA && eptr->outputhead) {
1062 			matomlserv_write(eptr,now);
1063 		}
1064 	}
1065 }
1066 
matomlserv_close_lsock(void)1067 void matomlserv_close_lsock(void) { // after fork
1068 	if (lsock>=0) {
1069 		close(lsock);
1070 	}
1071 }
1072 
matomlserv_term(void)1073 void matomlserv_term(void) {
1074 	matomlserventry *eptr,*eaptr;
1075 	in_packetstruct *ipptr,*ipaptr;
1076 	out_packetstruct *opptr,*opaptr;
1077 	syslog(LOG_INFO,"master control module: closing %s:%s",ListenHost,ListenPort);
1078 	tcpclose(lsock);
1079 
1080 	eptr = matomlservhead;
1081 	while (eptr) {
1082 		if (eptr->input_packet) {
1083 			free(eptr->input_packet);
1084 		}
1085 		ipptr = eptr->inputhead;
1086 		while (ipptr) {
1087 			ipaptr = ipptr;
1088 			ipptr = ipptr->next;
1089 			free(ipaptr);
1090 		}
1091 		opptr = eptr->outputhead;
1092 		while (opptr) {
1093 			opaptr = opptr;
1094 			opptr = opptr->next;
1095 			free(opaptr);
1096 		}
1097 		eaptr = eptr;
1098 		eptr = eptr->next;
1099 		free(eaptr);
1100 	}
1101 	matomlservhead=NULL;
1102 
1103 	matomlserv_read(NULL,0.0); // free internal read buffer
1104 
1105 	free(ListenHost);
1106 	free(ListenPort);
1107 }
1108 
matomlserv_no_more_pending_jobs(void)1109 int matomlserv_no_more_pending_jobs(void) {
1110 	matomlserventry *eptr;
1111 	for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
1112 		if (eptr->outputhead!=NULL) {
1113 			return 0;
1114 		}
1115 	}
1116 	return 1;
1117 }
1118 
matomlserv_disconnect_all(void)1119 void matomlserv_disconnect_all(void) {
1120 	matomlserventry *eptr;
1121 	for (eptr=matomlservhead ; eptr ; eptr=eptr->next) {
1122 		eptr->mode = KILL;
1123 	}
1124 	matomlserv_disconnection_loop();
1125 }
1126 
matomlserv_getport(void)1127 uint16_t matomlserv_getport(void) {
1128 	return listenport;
1129 }
1130 
matomlserv_reload_common(void)1131 void matomlserv_reload_common(void) {
1132 	BackMetaCopies = cfg_getuint32("BACK_META_KEEP_PREVIOUS",1);
1133 	if (BackMetaCopies>99) {
1134 		BackMetaCopies=99;
1135 	}
1136 }
1137 
matomlserv_reload(void)1138 void matomlserv_reload(void) {
1139 	char *oldListenHost,*oldListenPort;
1140 	uint32_t oldlistenip;
1141 	uint16_t oldlistenport;
1142 	int newlsock;
1143 
1144 	matomlserv_reload_common();
1145 
1146 	oldListenHost = ListenHost;
1147 	oldListenPort = ListenPort;
1148 	oldlistenip = listenip;
1149 	oldlistenport = listenport;
1150 
1151 	ListenHost = cfg_getstr("MATOML_LISTEN_HOST","*");
1152 	ListenPort = cfg_getstr("MATOML_LISTEN_PORT",DEFAULT_MASTER_CONTROL_PORT);
1153 	if (strcmp(oldListenHost,ListenHost)==0 && strcmp(oldListenPort,ListenPort)==0) {
1154 		free(oldListenHost);
1155 		free(oldListenPort);
1156 		mfs_arg_syslog(LOG_NOTICE,"master <-> metaloggers module: socket address hasn't changed (%s:%s)",ListenHost,ListenPort);
1157 		return;
1158 	}
1159 
1160 	newlsock = tcpsocket();
1161 	if (newlsock<0) {
1162 		mfs_errlog(LOG_WARNING,"master <-> metaloggers module: socket address has changed, but can't create new socket");
1163 		free(ListenHost);
1164 		free(ListenPort);
1165 		ListenHost = oldListenHost;
1166 		ListenPort = oldListenPort;
1167 		return;
1168 	}
1169 	tcpnonblock(newlsock);
1170 	tcpnodelay(newlsock);
1171 	tcpreuseaddr(newlsock);
1172 	if (tcpresolve(ListenHost,ListenPort,&listenip,&listenport,1)<0) {
1173 		mfs_arg_errlog(LOG_ERR,"master <-> metaloggers module: socket address has changed, but can't be resolved (%s:%s)",ListenHost,ListenPort);
1174 		free(ListenHost);
1175 		free(ListenPort);
1176 		ListenHost = oldListenHost;
1177 		ListenPort = oldListenPort;
1178 		listenip = oldlistenip;
1179 		listenport = oldlistenport;
1180 		tcpclose(newlsock);
1181 		return;
1182 	}
1183 	if (tcpnumlisten(newlsock,listenip,listenport,100)<0) {
1184 		mfs_arg_errlog(LOG_ERR,"master <-> metaloggers module: socket address has changed, but can't listen on socket (%s:%s)",ListenHost,ListenPort);
1185 		free(ListenHost);
1186 		free(ListenPort);
1187 		ListenHost = oldListenHost;
1188 		ListenPort = oldListenPort;
1189 		listenip = oldlistenip;
1190 		listenport = oldlistenport;
1191 		tcpclose(newlsock);
1192 		return;
1193 	}
1194 	if (tcpsetacceptfilter(newlsock)<0 && errno!=ENOTSUP) {
1195 		mfs_errlog_silent(LOG_NOTICE,"master <-> metaloggers module: can't set accept filter");
1196 	}
1197 	mfs_arg_syslog(LOG_NOTICE,"master <-> metaloggers module: socket address has changed, now listen on %s:%s",ListenHost,ListenPort);
1198 	free(oldListenHost);
1199 	free(oldListenPort);
1200 	tcpclose(lsock);
1201 	lsock = newlsock;
1202 
1203 //	ChangelogSecondsToRemember = cfg_getuint16("MATOAN_LOG_PRESERVE_SECONDS",600);
1204 //	if (ChangelogSecondsToRemember>3600) {
1205 //		syslog(LOG_WARNING,"Number of seconds of change logs to be preserved in master is too big (%"PRIu16") - decreasing to 3600 seconds",ChangelogSecondsToRemember);
1206 //		ChangelogSecondsToRemember=3600;
1207 //	}
1208 }
1209 
matomlserv_init(void)1210 int matomlserv_init(void) {
1211 	matomlserv_reload_common();
1212 
1213 	ListenHost = cfg_getstr("MATOML_LISTEN_HOST","*");
1214 	ListenPort = cfg_getstr("MATOML_LISTEN_PORT",DEFAULT_MASTER_CONTROL_PORT);
1215 
1216 	lsock = tcpsocket();
1217 	if (lsock<0) {
1218 		mfs_errlog(LOG_ERR,"master <-> metaloggers module: can't create socket");
1219 		return -1;
1220 	}
1221 	tcpnonblock(lsock);
1222 	tcpnodelay(lsock);
1223 	tcpreuseaddr(lsock);
1224 	if (tcpresolve(ListenHost,ListenPort,&listenip,&listenport,1)<0) {
1225 		mfs_arg_errlog(LOG_ERR,"master <-> metaloggers module: can't resolve %s:%s",ListenHost,ListenPort);
1226 		return -1;
1227 	}
1228 	if (tcpnumlisten(lsock,listenip,listenport,100)<0) {
1229 		mfs_arg_errlog(LOG_ERR,"master <-> metaloggers module: can't listen on %s:%s",ListenHost,ListenPort);
1230 		return -1;
1231 	}
1232 	if (tcpsetacceptfilter(lsock)<0 && errno!=ENOTSUP) {
1233 		mfs_errlog_silent(LOG_NOTICE,"master <-> metaloggers module: can't set accept filter");
1234 	}
1235 	mfs_arg_syslog(LOG_NOTICE,"master <-> metaloggers module: listen on %s:%s",ListenHost,ListenPort);
1236 
1237 	matomlservhead = NULL;
1238 //	ChangelogSecondsToRemember = cfg_getuint16("MATOAN_LOG_PRESERVE_SECONDS",600);
1239 //	if (ChangelogSecondsToRemember>3600) {
1240 //		syslog(LOG_WARNING,"Number of seconds of change logs to be preserved in master is too big (%"PRIu16") - decreasing to 3600 seconds",ChangelogSecondsToRemember);
1241 //		ChangelogSecondsToRemember=3600;
1242 //	}
1243 	main_reload_register(matomlserv_reload);
1244 	main_destruct_register(matomlserv_term);
1245 	main_poll_register(matomlserv_desc,matomlserv_serve);
1246 	main_keepalive_register(matomlserv_keep_alive);
1247 	main_time_register(3600,0,matomlserv_status);
1248 	return 0;
1249 }
1250