1 /*
2  * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3  *
4  * This file is part of MooseFS.
5  *
6  * MooseFS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, version 2 (only).
9  *
10  * MooseFS is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with MooseFS; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18  * or visit http://www.gnu.org/licenses/gpl-2.0.html
19  */
20 
21 #include "config.h"
22 
23 #include <stddef.h>
24 #include <time.h>
25 #include <sys/types.h>
26 #include <sys/uio.h>
27 #include <unistd.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <syslog.h>
31 #include <errno.h>
32 #include <inttypes.h>
33 #include <netinet/in.h>
34 
35 #include "MFSCommunication.h"
36 
37 #include "datapack.h"
38 #include "csserv.h"
39 #include "cfg.h"
40 #include "main.h"
41 #include "clocks.h"
42 #include "sockets.h"
43 #include "hddspacemgr.h"
44 #include "masterconn.h"
45 #include "charts.h"
46 #include "slogger.h"
47 #include "bgjobs.h"
48 #include "massert.h"
49 
// connection timeout in seconds
// (csserv_serve closes an IDLE connection when nothing has been read for this long,
//  and queues an ANTOAN_NOP packet after one third of it without a write)
#define CSSERV_TIMEOUT 5

// upper bound for the payload size accepted in csserv_read
#define MaxPacketSize CSTOCS_MAXPACKETSIZE

//csserventry.mode
// HEADER - the 8-byte packet header is being read into hdrbuff
// DATA   - the packet payload is being read into inputpacket.packet
enum {HEADER,DATA};

//csserventry.state
// IDLE       - connection is polled and served by this module
// READ/WRITE - set while a job started by csserv_read_init / csserv_write_init is pending
// CLOSE      - connection is torn down by the sweep at the end of csserv_serve
enum {IDLE,READ,WRITE,CLOSE};

struct csserventry;

// idlejob.op - selects the reply format used in csserv_idlejob_finished
enum {IJ_GET_CHUNK_BLOCKS,IJ_GET_CHUNK_CHECKSUM,IJ_GET_CHUNK_CHECKSUM_TAB};
64 
// Bookkeeping for one background query (blocks / checksum / checksum table)
// started on behalf of a connection while that connection stays in IDLE state.
typedef struct idlejob {
	uint32_t jobid;              // value returned by job_get_chunk_*; passed to job_pool_disable_job in csserv_close
	uint8_t op;                  // one of IJ_GET_CHUNK_*
	uint64_t chunkid;            // chunk id taken from the request, echoed in the reply
	uint32_t version;            // chunk version taken from the request, echoed in the reply
	struct csserventry *eptr;    // owning connection; set to NULL by csserv_close when the connection goes away
	struct idlejob *next,**prev; // links in eptr->idlejobs; prev points at the pointer that refers to this node
	uint8_t buff[1];             // result buffer; the structure is allocated with offsetof(idlejob,buff)+N bytes
} idlejob;
74 
// One packet buffer: used both for the packet currently being received
// (csserventry.inputpacket) and for the elements of the output queue.
typedef struct packetstruct {
	struct packetstruct *next; // next packet in the output queue (NULL for the last one)
	uint8_t *startptr;         // position where the next read() stores data / the next write() takes data from
	uint32_t bytesleft;        // number of bytes still to be read or written
	uint8_t *packet;           // start of the malloc'ed buffer (NULL when no buffer is attached)
} packetstruct;
81 
// State of a single accepted connection; all connections form the list rooted in csservhead.
typedef struct csserventry {
	uint8_t state; // IDLE / READ / WRITE / CLOSE
	uint8_t mode;  // HEADER / DATA - which part of the incoming packet is being read

	int sock;                 // connected socket descriptor
	int32_t pdescpos;         // index of this socket in the poll array, or -1 when it was not added by csserv_desc
	double lastread,lastwrite; // monotonic_seconds() timestamps used for timeout and NOP generation in csserv_serve
	uint32_t activity;        // NOTE(review): not referenced anywhere in the visible code
	uint8_t hdrbuff[8];       // raw packet header: 32-bit type followed by 32-bit payload size
	packetstruct inputpacket; // packet currently being received
	packetstruct *outputhead,**outputtail; // queue of packets waiting to be sent; outputtail points at the last 'next' pointer

	uint32_t jobid; // id of the pending read/write job, 0 when there is none

	struct idlejob *idlejobs; // list of pending idle jobs started for this connection

	struct csserventry *next; // next connection in the csservhead list
} csserventry;
100 
// list of all accepted connections
static csserventry *csservhead=NULL;
// listening socket (-1 after csserv_wantexit) and its index in the poll array
static int lsock;
static int32_t lsockpdescpos;

// address and port filled in by tcpresolve for the listening socket
static uint32_t mylistenip;
static uint16_t mylistenport;

// byte counters, incremented in csserv_read / csserv_write and reset by csserv_stats
static uint64_t stats_bytesin=0;
static uint64_t stats_bytesout=0;

// from config
// (strings obtained from cfg_getstr; released with free() in csserv_term / csserv_reload)
static char *ListenHost;
static char *ListenPort;
114 
csserv_stats(uint64_t * bin,uint64_t * bout)115 void csserv_stats(uint64_t *bin,uint64_t *bout) {
116 	*bin = stats_bytesin;
117 	*bout = stats_bytesout;
118 	stats_bytesin = 0;
119 	stats_bytesout = 0;
120 }
121 
/*
 * Builds a new outgoing packet and appends it to the output queue of 'eptr'.
 * The buffer holds an 8-byte header (type, payload size) followed by 'size'
 * payload bytes. Returns a pointer to the first payload byte; the caller is
 * expected to fill in exactly 'size' bytes there.
 */
uint8_t* csserv_create_packet(csserventry *eptr,uint32_t type,uint32_t size) {
	const uint32_t total = size+8;
	packetstruct *pkt;
	uint8_t *wptr;

	pkt = malloc(sizeof(packetstruct));
#ifndef __clang_analyzer__
	passert(pkt);
	// clang analyzer has problem with testing for (void*)(-1) which is needed for memory allocated by mmap
#endif
	pkt->packet = malloc(total);
#ifndef __clang_analyzer__
	passert(pkt->packet);
	// clang analyzer has problem with testing for (void*)(-1) which is needed for memory allocated by mmap
#endif
	pkt->next = NULL;
	pkt->startptr = pkt->packet;
	pkt->bytesleft = total;

	// header: packet type and payload length
	wptr = pkt->packet;
	put32bit(&wptr,type);
	put32bit(&wptr,size);

	// link at the tail of the output queue
	*(eptr->outputtail) = pkt;
	eptr->outputtail = &(pkt->next);

	return wptr;
}
148 
/*
 * ANTOAN_GET_VERSION handler.
 * Accepts an empty request or a request carrying a 32-bit message id.
 * Replies with ANTOAN_VERSION: optional message id (echoed only when it was
 * present in the request), version numbers and the version string (without
 * terminating zero). Any other request length closes the connection.
 */
void csserv_get_version(csserventry *eptr,const uint8_t *data,uint32_t length) {
	static const char vstring[] = VERSSTR;
	const size_t vlen = strlen(vstring);
	uint8_t *wptr;

	if (!(length==0 || length==4)) {
		syslog(LOG_NOTICE,"ANTOAN_GET_VERSION - wrong size (%"PRIu32"/4|0)",length);
		eptr->state = CLOSE;
		return;
	}
	if (length==0) {
		wptr = csserv_create_packet(eptr,ANTOAN_VERSION,4+vlen);
	} else {
		const uint32_t msgid = get32bit(&data);
		wptr = csserv_create_packet(eptr,ANTOAN_VERSION,4+4+vlen);
		put32bit(&wptr,msgid);
	}
	put16bit(&wptr,VERSMAJ);
	put8bit(&wptr,VERSMID);
	put8bit(&wptr,VERSMIN);
	memcpy(wptr,vstring,vlen);
}
170 
/*
 * ANTOAN_GET_CONFIG handler.
 * Request layout: msgid:32 nleng:8 name:nleng bytes (so length must be 5+nleng).
 * Replies with ANTOAN_CONFIG_VALUE: msgid:32 vleng:8 value:vleng bytes, where the
 * value is the configuration string for 'name' (empty string as default),
 * truncated to at most 255 bytes so that its length fits in one byte.
 * A malformed request closes the connection.
 */
void csserv_get_config(csserventry *eptr,const uint8_t *data,uint32_t length) {
	uint32_t msgid;
	char name[256];
	uint8_t nleng;
	uint32_t vleng;
	char *val;
	uint8_t *ptr;

	if (length<5) {
		syslog(LOG_NOTICE,"ANTOAN_GET_CONFIG - wrong size (%"PRIu32")",length);
		eptr->state = CLOSE;
		return;
	}
	msgid = get32bit(&data);
	nleng = get8bit(&data);
	if (length!=5U+(uint32_t)nleng) {
		syslog(LOG_NOTICE,"ANTOAN_GET_CONFIG - wrong size (%"PRIu32":nleng=%"PRIu8")",length,nleng);
		eptr->state = CLOSE;
		return;
	}
	// nleng is at most 255, so the name plus terminating zero always fits in name[256]
	memcpy(name,data,nleng);
	name[nleng] = 0;
	val = cfg_getstr(name,"");
	vleng = strlen(val);
	if (vleng>255) {
		vleng=255;
	}
	ptr = csserv_create_packet(eptr,ANTOAN_CONFIG_VALUE,5+vleng);
	put32bit(&ptr,msgid);
	put8bit(&ptr,vleng);
	memcpy(ptr,val,vleng);
	// strings returned by cfg_getstr are owned by the caller (this file releases them
	// with free() in csserv_reload and csserv_term) - release it to avoid leaking
	// one string per request
	free(val);
}
203 
csserv_iothread_finished(uint8_t status,void * e)204 void csserv_iothread_finished(uint8_t status,void *e) {
205 	csserventry *eptr = (csserventry*)e;
206 	if (status==0) {
207 		eptr->state = CLOSE;
208 	} else {
209 		eptr->state = IDLE;
210 	}
211 	eptr->jobid = 0;
212 	if (eptr->inputpacket.packet) {
213 		free(eptr->inputpacket.packet);
214 	}
215 	eptr->inputpacket.packet=NULL;
216 }
217 
/*
 * CLTOCS_READ handler.
 * Switches the connection to READ state and hands the request over to
 * job_serv_read. When no job id is returned the request is answered right
 * here with CSTOCL_READ_STATUS (chunk id + MFS_ERROR_NOTDONE) and the
 * connection goes back to IDLE; a request of unexpected size closes it.
 */
void csserv_read_init(csserventry *eptr,const uint8_t *data,uint32_t length) {
	uint8_t *wptr;

	eptr->state = READ;
	eptr->jobid = job_serv_read(csserv_iothread_finished,eptr,eptr->sock,data,length);
	if (eptr->jobid!=0) {
		return;
	}

	// not done - queue full
	if (!(length==20 || length==21)) {
		syslog(LOG_NOTICE,"CLTOCS_READ - wrong size (%"PRIu32"/20|21)",length);
		eptr->state = CLOSE;
		return;
	}
	if (length==21) {
		data += 1; // skip proto version
	}
	wptr = csserv_create_packet(eptr,CSTOCL_READ_STATUS,8+1);
	memcpy(wptr,data,8); // copy chunkid directly from source packet
	wptr += 8;
	put8bit(&wptr,MFS_ERROR_NOTDONE);
	eptr->state = IDLE;
}
239 
/*
 * CLTOCS_WRITE handler.
 * Switches the connection to WRITE state and hands the request over to
 * job_serv_write. When no job id is returned the request is answered right
 * here with CSTOCL_WRITE_STATUS (chunk id, zero write id, MFS_ERROR_NOTDONE)
 * and the connection goes back to IDLE. Odd-sized requests carry one extra
 * leading byte (protocol version); a request of unexpected size closes the
 * connection.
 */
void csserv_write_init(csserventry *eptr,const uint8_t *data,uint32_t length) {
	const int withproto = ((length&1)!=0);
	const uint32_t fixedpart = withproto ? 13 : 12;
	uint8_t *wptr;

	eptr->state = WRITE;
	eptr->jobid = job_serv_write(csserv_iothread_finished,eptr,eptr->sock,data,length);
	if (eptr->jobid!=0) {
		return;
	}

	// not done - queue full
	if (length<fixedpart || ((length-fixedpart)%6)!=0) {
		if (withproto) {
			syslog(LOG_NOTICE,"CLTOCS_WRITE - wrong size (%"PRIu32"/13+N*6)",length);
		} else {
			syslog(LOG_NOTICE,"CLTOCS_WRITE - wrong size (%"PRIu32"/12+N*6)",length);
		}
		eptr->state = CLOSE;
		return;
	}
	if (withproto) {
		data += 1; // skip proto version
	}
	wptr = csserv_create_packet(eptr,CSTOCL_WRITE_STATUS,8+4+1);
	memcpy(wptr,data,8); // copy chunkid directly from source packet
	wptr += 8;
	put32bit(&wptr,0);
	put8bit(&wptr,MFS_ERROR_NOTDONE);
	eptr->state = IDLE;
}
268 
269 /* IDLE operations */
270 
csserv_idlejob_finished(uint8_t status,void * ijp)271 void csserv_idlejob_finished(uint8_t status,void *ijp) {
272 	idlejob *ij = (idlejob*)ijp;
273 	csserventry *eptr = ij->eptr;
274 	uint8_t *ptr;
275 
276 	if (eptr) {
277 		switch (ij->op) {
278 			case IJ_GET_CHUNK_BLOCKS:
279 				ptr = csserv_create_packet(eptr,CSTOAN_CHUNK_BLOCKS,8+4+2+1);
280 				put64bit(&ptr,ij->chunkid);
281 				put32bit(&ptr,ij->version);
282 				if (status==MFS_STATUS_OK) {
283 					memcpy(ptr,ij->buff,2);
284 					ptr+=2;
285 				} else {
286 					put16bit(&ptr,0);
287 				}
288 				put8bit(&ptr,status);
289 				break;
290 			case IJ_GET_CHUNK_CHECKSUM:
291 				if (status!=MFS_STATUS_OK) {
292 					ptr = csserv_create_packet(eptr,CSTOAN_CHUNK_CHECKSUM,8+4+1);
293 				} else {
294 					ptr = csserv_create_packet(eptr,CSTOAN_CHUNK_CHECKSUM,8+4+4);
295 				}
296 				put64bit(&ptr,ij->chunkid);
297 				put32bit(&ptr,ij->version);
298 				if (status!=MFS_STATUS_OK) {
299 					put8bit(&ptr,status);
300 				} else {
301 					memcpy(ptr,ij->buff,4);
302 				}
303 				break;
304 			case IJ_GET_CHUNK_CHECKSUM_TAB:
305 				if (status!=MFS_STATUS_OK) {
306 					ptr = csserv_create_packet(eptr,CSTOAN_CHUNK_CHECKSUM_TAB,8+4+1);
307 				} else {
308 					ptr = csserv_create_packet(eptr,CSTOAN_CHUNK_CHECKSUM_TAB,8+4+4096);
309 				}
310 				put64bit(&ptr,ij->chunkid);
311 				put32bit(&ptr,ij->version);
312 				if (status!=MFS_STATUS_OK) {
313 					put8bit(&ptr,status);
314 				} else {
315 					memcpy(ptr,ij->buff,4096);
316 				}
317 				break;
318 		}
319 		*(ij->prev) = ij->next;
320 		if (ij->next) {
321 			ij->next->prev = ij->prev;
322 		}
323 	}
324 	free(ij);
325 }
326 
/*
 * ANTOCS_GET_CHUNK_BLOCKS handler.
 * Request layout: chunkid:64 version:32 (exactly 12 bytes, otherwise the
 * connection is closed). Allocates an idle job record with a 2-byte result
 * buffer, links it at the head of eptr->idlejobs and starts the background
 * job; the reply is sent from csserv_idlejob_finished.
 */
void csserv_get_chunk_blocks(csserventry *eptr,const uint8_t *data,uint32_t length) {
	idlejob *ij;

	if (length!=8+4) {
		syslog(LOG_NOTICE,"ANTOCS_GET_CHUNK_BLOCKS - wrong size (%"PRIu32"/12)",length);
		eptr->state = CLOSE;
		return;
	}
	ij = malloc(offsetof(idlejob,buff)+2);
	// every other allocation in this file is verified with passert - do the same here
	// instead of dereferencing a possibly failed allocation
	passert(ij);
	ij->op = IJ_GET_CHUNK_BLOCKS;
	ij->chunkid = get64bit(&data);
	ij->version = get32bit(&data);
	ij->eptr = eptr;
	// NOTE(review): the previous list head keeps its old 'prev' pointer (&eptr->idlejobs) - confirm this is intended
	ij->next = eptr->idlejobs;
	ij->prev = &(eptr->idlejobs);
	eptr->idlejobs = ij;
	ij->jobid = job_get_chunk_blocks(csserv_idlejob_finished,ij,ij->chunkid,ij->version,ij->buff);
}
345 
/*
 * ANTOCS_GET_CHUNK_CHECKSUM handler.
 * Request layout: chunkid:64 version:32 (exactly 12 bytes, otherwise the
 * connection is closed). Allocates an idle job record with a 4-byte result
 * buffer, links it at the head of eptr->idlejobs and starts the background
 * job; the reply is sent from csserv_idlejob_finished.
 */
void csserv_get_chunk_checksum(csserventry *eptr,const uint8_t *data,uint32_t length) {
	idlejob *ij;

	if (length!=8+4) {
		syslog(LOG_NOTICE,"ANTOCS_GET_CHUNK_CHECKSUM - wrong size (%"PRIu32"/12)",length);
		eptr->state = CLOSE;
		return;
	}
	ij = malloc(offsetof(idlejob,buff)+4);
	// every other allocation in this file is verified with passert - do the same here
	// instead of dereferencing a possibly failed allocation
	passert(ij);
	ij->op = IJ_GET_CHUNK_CHECKSUM;
	ij->chunkid = get64bit(&data);
	ij->version = get32bit(&data);
	ij->eptr = eptr;
	// NOTE(review): the previous list head keeps its old 'prev' pointer (&eptr->idlejobs) - confirm this is intended
	ij->next = eptr->idlejobs;
	ij->prev = &(eptr->idlejobs);
	eptr->idlejobs = ij;
	ij->jobid = job_get_chunk_checksum(csserv_idlejob_finished,ij,ij->chunkid,ij->version,ij->buff);
}
364 
/*
 * ANTOCS_GET_CHUNK_CHECKSUM_TAB handler.
 * Request layout: chunkid:64 version:32 (exactly 12 bytes, otherwise the
 * connection is closed). Allocates an idle job record with a 4096-byte result
 * buffer, links it at the head of eptr->idlejobs and starts the background
 * job; the reply is sent from csserv_idlejob_finished.
 */
void csserv_get_chunk_checksum_tab(csserventry *eptr,const uint8_t *data,uint32_t length) {
	idlejob *ij;

	if (length!=8+4) {
		syslog(LOG_NOTICE,"ANTOCS_GET_CHUNK_CHECKSUM_TAB - wrong size (%"PRIu32"/12)",length);
		eptr->state = CLOSE;
		return;
	}
	ij = malloc(offsetof(idlejob,buff)+4096);
	// every other allocation in this file is verified with passert - do the same here
	// instead of dereferencing a possibly failed allocation
	passert(ij);
	ij->op = IJ_GET_CHUNK_CHECKSUM_TAB;
	ij->chunkid = get64bit(&data);
	ij->version = get32bit(&data);
	ij->eptr = eptr;
	// NOTE(review): the previous list head keeps its old 'prev' pointer (&eptr->idlejobs) - confirm this is intended
	ij->next = eptr->idlejobs;
	ij->prev = &(eptr->idlejobs);
	eptr->idlejobs = ij;
	ij->jobid = job_get_chunk_checksum_tab(csserv_idlejob_finished,ij,ij->chunkid,ij->version,ij->buff);
}
383 
/*
 * CLTOCS_HDD_LIST handler.
 * The request must be empty (otherwise the connection is closed). Replies
 * with CSTOCL_HDD_LIST whose payload is produced by hdd_diskinfo_data; the
 * payload size is obtained from hdd_diskinfo_size. The size/data calls form
 * a lock/unlock pair, so they must always be called together in this order.
 */
void csserv_hdd_list(csserventry *eptr,const uint8_t *data,uint32_t length) {
	uint32_t infosize;
	uint8_t *wptr;

	(void)data;
	if (length==0) {
		infosize = hdd_diskinfo_size();	// lock
		wptr = csserv_create_packet(eptr,CSTOCL_HDD_LIST,infosize);
		hdd_diskinfo_data(wptr);	// unlock
		return;
	}
	syslog(LOG_NOTICE,"CLTOCS_HDD_LIST - wrong size (%"PRIu32"/0)",length);
	eptr->state = CLOSE;
}
398 
/*
 * CLTOAN_CHART handler.
 * Request layout: chartid:32 optionally followed by width:16 height:16
 * (4 or 8 bytes, otherwise the connection is closed; width and height
 * default to 0). Replies with ANTOCL_CHART carrying the PNG data prepared
 * by charts_make_png (possibly an empty payload).
 */
void csserv_chart(csserventry *eptr,const uint8_t *data,uint32_t length) {
	uint32_t chartid;
	uint32_t pngsize;
	uint16_t width = 0;
	uint16_t height = 0;
	uint8_t *wptr;

	if (!(length==4 || length==8)) {
		syslog(LOG_NOTICE,"CLTOAN_CHART - wrong size (%"PRIu32"/4|8)",length);
		eptr->state = CLOSE;
		return;
	}
	chartid = get32bit(&data);
	if (length==8) {
		width = get16bit(&data);
		height = get16bit(&data);
	}
	pngsize = charts_make_png(chartid,width,height);
	wptr = csserv_create_packet(eptr,ANTOCL_CHART,pngsize);
	if (pngsize>0) {
		charts_get_png(wptr);
	}
}
424 
/*
 * CLTOAN_CHART_DATA handler.
 * Request layout: chartid:32 optionally followed by maxentries:32
 * (4 or 8 bytes, otherwise the connection is closed; maxentries defaults to
 * 0xFFFFFFFF). charts_makedata is called twice: first with a NULL buffer to
 * learn the payload size, then to fill the ANTOCL_CHART_DATA reply.
 */
void csserv_chart_data(csserventry *eptr,const uint8_t *data,uint32_t length) {
	uint32_t chartid;
	uint32_t maxentries;
	uint32_t datasize;
	uint8_t *wptr;

	if (!(length==4 || length==8)) {
		syslog(LOG_NOTICE,"CLTOAN_CHART_DATA - wrong size (%"PRIu32"/4|8)",length);
		eptr->state = CLOSE;
		return;
	}
	chartid = get32bit(&data);
	maxentries = (length==8) ? get32bit(&data) : UINT32_C(0xFFFFFFFF);

	datasize = charts_makedata(NULL,chartid,maxentries);
	wptr = csserv_create_packet(eptr,ANTOCL_CHART_DATA,datasize);
	if (datasize>0) {
		charts_makedata(wptr,chartid,maxentries);
	}
}
448 
/*
 * CLTOAN_MONOTONIC_DATA handler.
 * The request must be empty (otherwise the connection is closed). The
 * ANTOCL_MONOTONIC_DATA reply consists of the charts part (size obtained by
 * calling charts_monotonic_data with NULL) immediately followed by the disk
 * info part written by hdd_diskinfo_monotonic_data.
 */
void csserv_monotonic_data(csserventry *eptr,const uint8_t *data,uint32_t length) {
	uint32_t chartsize;
	uint32_t disksize;
	uint8_t *wptr;

	(void)data;
	if (length!=0) {
		syslog(LOG_NOTICE,"CLTOAN_MONOTONIC_DATA - wrong size (%"PRIu32"/0)",length);
		eptr->state = CLOSE;
		return;
	}
	chartsize = charts_monotonic_data(NULL);
	disksize = hdd_diskinfo_monotonic_size();
	wptr = csserv_create_packet(eptr,ANTOCL_MONOTONIC_DATA,chartsize+disksize);
	if (chartsize>0) {
		charts_monotonic_data(wptr);
		wptr += chartsize;
	}
	hdd_diskinfo_monotonic_data(wptr);
}
469 
/*
 * CLTOAN_MODULE_INFO handler.
 * The request must be empty (otherwise the connection is closed). Replies
 * with a 21-byte ANTOCL_MODULE_INFO packet:
 *   module type:8 | version major:16 mid:8 min:8 | csid:16 | metaid:64 |
 *   master ip:32 | master port:16
 */
void csserv_module_info(csserventry *eptr,const uint8_t *data,uint32_t length) {
	uint8_t *wptr;

	(void)data;
	if (length!=0) {
		syslog(LOG_NOTICE,"CLTOAN_MODULE_INFO - wrong size (%"PRIu32"/0)",length);
		eptr->state = CLOSE;
		return;
	}
	wptr = csserv_create_packet(eptr,ANTOCL_MODULE_INFO,1+2+1+1+2+8+4+2);
	put8bit(&wptr,MODULE_TYPE_CHUNKSERVER);
	put16bit(&wptr,VERSMAJ);
	put8bit(&wptr,VERSMID);
	put8bit(&wptr,VERSMIN);
	put16bit(&wptr,masterconn_getcsid());
	put64bit(&wptr,masterconn_getmetaid());
	put32bit(&wptr,masterconn_getmasterip());
	put16bit(&wptr,masterconn_getmasterport());
}
489 
/*
 * Detaches all background jobs from a connection that is about to be freed.
 * A pending read/write job is disabled and its callback is cleared. Every
 * pending idle job is disabled and disconnected from the connection
 * (eptr set to NULL); the idle job records are not freed here - that is
 * done by csserv_idlejob_finished.
 */
void csserv_close(csserventry *eptr) {
	idlejob *job,*following;

	if (eptr->jobid>0) {
		if (eptr->state==READ || eptr->state==WRITE) {
			job_pool_disable_job(eptr->jobid);
			job_pool_change_callback(eptr->jobid,NULL,NULL);
		}
	}

	job = eptr->idlejobs;
	while (job!=NULL) {
		following = job->next;
		job_pool_disable_job(job->jobid);
		job->eptr = NULL;
		job->prev = NULL;
		job->next = NULL;
		job = following;
	}
}
506 
/*
 * Dispatches one complete incoming packet to its handler.
 * ANTOAN_NOP, ANTOAN_UNKNOWN_COMMAND and ANTOAN_BAD_COMMAND_SIZE are ignored.
 * Any other packet received while the connection is not IDLE, as well as any
 * packet of unknown type, is logged and closes the connection.
 * CLTOCS_WRITE_DATA / CLTOCS_WRITE_FINISH close the connection without logging.
 */
void csserv_gotpacket(csserventry *eptr,uint32_t type,const uint8_t *data,uint32_t length) {
	if (type==ANTOAN_NOP || type==ANTOAN_UNKNOWN_COMMAND || type==ANTOAN_BAD_COMMAND_SIZE) {
		// NOP needs no action; the other two are reserved for future use
		return;
	}
	if (eptr->state!=IDLE) {
		syslog(LOG_NOTICE,"got unknown message (type:%"PRIu32")",type);
		eptr->state = CLOSE;
		return;
	}
	switch (type) {
	case ANTOAN_GET_VERSION:
		csserv_get_version(eptr,data,length);
		break;
	case ANTOAN_GET_CONFIG:
		csserv_get_config(eptr,data,length);
		break;
	case CLTOCS_READ:
		csserv_read_init(eptr,data,length);
		break;
	case CLTOCS_WRITE:
		csserv_write_init(eptr,data,length);
		break;
	case ANTOCS_GET_CHUNK_BLOCKS:
		csserv_get_chunk_blocks(eptr,data,length);
		break;
	case ANTOCS_GET_CHUNK_CHECKSUM:
		csserv_get_chunk_checksum(eptr,data,length);
		break;
	case ANTOCS_GET_CHUNK_CHECKSUM_TAB:
		csserv_get_chunk_checksum_tab(eptr,data,length);
		break;
	case CLTOCS_HDD_LIST:
		csserv_hdd_list(eptr,data,length);
		break;
	case CLTOAN_CHART:
		csserv_chart(eptr,data,length);
		break;
	case CLTOAN_CHART_DATA:
		csserv_chart_data(eptr,data,length);
		break;
	case CLTOAN_MONOTONIC_DATA:
		csserv_monotonic_data(eptr,data,length);
		break;
	case CLTOAN_MODULE_INFO:
		csserv_module_info(eptr,data,length);
		break;
	case CLTOCS_WRITE_DATA:
	case CLTOCS_WRITE_FINISH:
		eptr->state = CLOSE; // silently ignore those packets
		break;
	default:
		syslog(LOG_NOTICE,"got unknown message (type:%"PRIu32")",type);
		eptr->state = CLOSE;
		break;
	}
}
569 
csserv_wantexit(void)570 void csserv_wantexit(void) {
571 	syslog(LOG_NOTICE,"closing %s:%s",ListenHost,ListenPort);
572 	tcpclose(lsock);
573 	lsock = -1;
574 }
575 
csserv_term(void)576 void csserv_term(void) {
577 	csserventry *eptr,*eaptr;
578 	packetstruct *pptr,*paptr;
579 
580 	eptr = csservhead;
581 	while (eptr) {
582 		tcpclose(eptr->sock);
583 		if (eptr->inputpacket.packet) {
584 			free(eptr->inputpacket.packet);
585 		}
586 		pptr = eptr->outputhead;
587 		while (pptr) {
588 			if (pptr->packet) {
589 				free(pptr->packet);
590 			}
591 			paptr = pptr;
592 			pptr = pptr->next;
593 			free(paptr);
594 		}
595 		eaptr = eptr;
596 		eptr = eptr->next;
597 		free(eaptr);
598 	}
599 	csservhead=NULL;
600 	free(ListenHost);
601 	free(ListenPort);
602 }
603 
/*
 * Performs one read() into the current input buffer of 'eptr'.
 * Returns 1 when the buffer has been filled completely, 0 otherwise.
 * End of stream closes the connection; a read error closes it only when
 * ERRNO_ERROR is true.
 */
static int csserv_read_pending(csserventry *eptr) {
	int32_t got;

	got = read(eptr->sock,eptr->inputpacket.startptr,eptr->inputpacket.bytesleft);
	if (got==0) {
		// connection closed by peer
		eptr->state = CLOSE;
		return 0;
	}
	if (got<0) {
		if (ERRNO_ERROR) {
			mfs_errlog_silent(LOG_NOTICE,"(read) read error");
			eptr->state = CLOSE;
		}
		return 0;
	}
	stats_bytesin += got;
	eptr->inputpacket.startptr += got;
	eptr->inputpacket.bytesleft -= got;
	return (eptr->inputpacket.bytesleft==0);
}

/*
 * Reads incoming data of a connection.
 * In HEADER mode the 8-byte header (type, size) is collected in hdrbuff;
 * once complete, a payload buffer is allocated (payloads larger than
 * MaxPacketSize close the connection) and the mode changes to DATA.
 * In DATA mode the payload is collected; once complete, the reader is reset
 * for the next header and the packet is passed to csserv_gotpacket.
 * The payload buffer is released afterwards unless the handler moved the
 * connection to READ or WRITE state - in that case it is released by
 * csserv_iothread_finished.
 */
void csserv_read(csserventry *eptr) {
	uint32_t type,size;
	const uint8_t *hptr;

	if (eptr->mode==HEADER) {
		if (!csserv_read_pending(eptr)) {
			return;
		}

		hptr = eptr->hdrbuff;
		type = get32bit(&hptr);
		size = get32bit(&hptr);

		if (size>0) {
			if (size>MaxPacketSize) {
				syslog(LOG_WARNING,"(read) packet too long (%"PRIu32"/%u) ; command:%"PRIu32,size,MaxPacketSize,type);
				eptr->state = CLOSE;
				return;
			}
			eptr->inputpacket.packet = malloc(size);
			passert(eptr->inputpacket.packet);
			eptr->inputpacket.startptr = eptr->inputpacket.packet;
		}
		eptr->inputpacket.bytesleft = size;
		eptr->mode = DATA;
	}
	if (eptr->mode==DATA) {
		// an empty payload needs no read at all
		if (eptr->inputpacket.bytesleft>0 && !csserv_read_pending(eptr)) {
			return;
		}

		// header is still intact in hdrbuff - decode it again
		hptr = eptr->hdrbuff;
		type = get32bit(&hptr);
		size = get32bit(&hptr);

		// get ready for the next header before the handler runs
		eptr->mode = HEADER;
		eptr->inputpacket.bytesleft = 8;
		eptr->inputpacket.startptr = eptr->hdrbuff;

		csserv_gotpacket(eptr,type,eptr->inputpacket.packet,size);

		if (!(eptr->state==READ || eptr->state==WRITE)) {
			free(eptr->inputpacket.packet);
			eptr->inputpacket.packet = NULL;
		}
	}
}
689 
/*
 * Sends as much of the output queue as the socket accepts.
 * Packets are written from the head of the queue; a completely sent packet
 * is removed and freed. The function returns when the queue is empty, when
 * a packet could be sent only partially, or when write() fails (the
 * connection is closed on a zero result, and on an error only when
 * ERRNO_ERROR is true).
 */
void csserv_write(csserventry *eptr) {
	packetstruct *head;
	int32_t sent;

	while ((head = eptr->outputhead)!=NULL) {
		sent = write(eptr->sock,head->startptr,head->bytesleft);
		if (sent==0) {
			// connection closed
			eptr->state = CLOSE;
			return;
		}
		if (sent<0) {
			if (ERRNO_ERROR) {
				mfs_errlog_silent(LOG_NOTICE,"(write) write error");
				eptr->state = CLOSE;
			}
			return;
		}
		stats_bytesout += sent;
		head->startptr += sent;
		head->bytesleft -= sent;
		if (head->bytesleft>0) {
			// socket buffer full - continue on next POLLOUT
			return;
		}
		// packet sent completely - drop it from the queue
		eptr->outputhead = head->next;
		if (eptr->outputhead==NULL) {
			eptr->outputtail = &(eptr->outputhead);
		}
		free(head->packet);
		free(head);
	}
}
725 
csserv_desc(struct pollfd * pdesc,uint32_t * ndesc)726 void csserv_desc(struct pollfd *pdesc,uint32_t *ndesc) {
727 	uint32_t pos = *ndesc;
728 	csserventry *eptr;
729 
730 	if (lsock>=0) {
731 		pdesc[pos].fd = lsock;
732 		pdesc[pos].events = POLLIN;
733 		lsockpdescpos = pos;
734 		pos++;
735 	} else {
736 		lsockpdescpos = 0;
737 	}
738 	for (eptr=csservhead ; eptr ; eptr=eptr->next) {
739 		eptr->pdescpos = -1;
740 		if (eptr->state==IDLE) {
741 			pdesc[pos].events = POLLIN;
742 			if (eptr->outputhead!=NULL) {
743 				pdesc[pos].events |= POLLOUT;
744 			}
745 			eptr->pdescpos = pos;
746 			pdesc[pos].fd = eptr->sock;
747 			pos++;
748 		}
749 	}
750 	*ndesc = pos;
751 }
752 
csserv_serve(struct pollfd * pdesc)753 void csserv_serve(struct pollfd *pdesc) {
754 	double now;
755 	csserventry *eptr,**kptr;
756 	packetstruct *pptr,*paptr;
757 	int ns;
758 
759 	now = monotonic_seconds();
760 
761 	if (lsockpdescpos>=0 && (pdesc[lsockpdescpos].revents & POLLIN) && lsock>=0) {
762 		ns=tcpaccept(lsock);
763 		if (ns<0) {
764 			mfs_errlog_silent(LOG_NOTICE,"accept error");
765 		} else {
766 			tcpnonblock(ns);
767 			tcpnodelay(ns);
768 			eptr = malloc(sizeof(csserventry));
769 			passert(eptr);
770 			eptr->next = csservhead;
771 			csservhead = eptr;
772 			eptr->state = IDLE;
773 			eptr->mode = HEADER;
774 			eptr->sock = ns;
775 			eptr->pdescpos = -1;
776 			eptr->lastread = now;
777 			eptr->lastwrite = now;
778 			eptr->inputpacket.bytesleft = 8;
779 			eptr->inputpacket.startptr = eptr->hdrbuff;
780 			eptr->inputpacket.packet = NULL;
781 			eptr->outputhead = NULL;
782 			eptr->outputtail = &(eptr->outputhead);
783 			eptr->jobid = 0;
784 
785 			eptr->idlejobs = NULL;
786 		}
787 	}
788 
789 	for (eptr=csservhead ; eptr ; eptr=eptr->next) {
790 		if (eptr->pdescpos>=0 && (pdesc[eptr->pdescpos].revents & (POLLERR|POLLHUP))) {
791 			eptr->state = CLOSE;
792 		}
793 		if (eptr->pdescpos>=0 && (pdesc[eptr->pdescpos].revents & POLLIN) && eptr->state==IDLE) {
794 			eptr->lastread = now;
795 			csserv_read(eptr);
796 		}
797 		if (eptr->state==IDLE && eptr->lastwrite+(CSSERV_TIMEOUT/3.0)<now && eptr->outputhead==NULL) {
798 			csserv_create_packet(eptr,ANTOAN_NOP,0);
799 		}
800 		if (eptr->pdescpos>=0 && (pdesc[eptr->pdescpos].revents & POLLOUT) && eptr->state==IDLE) {
801 			eptr->lastwrite = now;
802 			csserv_write(eptr);
803 		}
804 		if (eptr->state==IDLE && eptr->lastread+CSSERV_TIMEOUT<now) {
805 //			syslog(LOG_NOTICE,"csserv: connection timed out");
806 			eptr->state = CLOSE;
807 		}
808 	}
809 
810 	kptr = &csservhead;
811 	while ((eptr=*kptr)) {
812 		if (eptr->state == CLOSE) {
813 			tcpclose(eptr->sock);
814 			csserv_close(eptr);
815 			if (eptr->inputpacket.packet) {
816 				free(eptr->inputpacket.packet);
817 			}
818 //			wptr = eptr->todolist;
819 //			while (wptr) {
820 //				waptr = wptr;
821 //				wptr = wptr->next;
822 //				free(waptr);
823 //			}
824 			pptr = eptr->outputhead;
825 			while (pptr) {
826 				if (pptr->packet) {
827 					free(pptr->packet);
828 				}
829 				paptr = pptr;
830 				pptr = pptr->next;
831 				free(paptr);
832 			}
833 			*kptr = eptr->next;
834 			free(eptr);
835 		} else {
836 			kptr = &(eptr->next);
837 		}
838 	}
839 }
840 
csserv_getlistenip()841 uint32_t csserv_getlistenip() {
842 	return mylistenip;
843 }
844 
csserv_getlistenport()845 uint16_t csserv_getlistenport() {
846 	return mylistenport;
847 }
848 
csserv_reload(void)849 void csserv_reload(void) {
850 	char *newListenHost,*newListenPort;
851 	uint32_t newmylistenip;
852 	uint16_t newmylistenport;
853 	int newlsock;
854 
855 	if (lsock<0) { // this is exiting stage - ignore reload
856 		return ;
857 	}
858 //	ThreadedServer = 1-ThreadedServer;
859 
860 	newListenHost = cfg_getstr("CSSERV_LISTEN_HOST","*");
861 	newListenPort = cfg_getstr("CSSERV_LISTEN_PORT",DEFAULT_CS_DATA_PORT);
862 	if (strcmp(newListenHost,ListenHost)==0 && strcmp(newListenPort,ListenPort)==0) {
863 		free(newListenHost);
864 		free(newListenPort);
865 		mfs_arg_syslog(LOG_NOTICE,"main server module: socket address hasn't changed (%s:%s)",ListenHost,ListenPort);
866 		return;
867 	}
868 
869 	newlsock = tcpsocket();
870 	if (newlsock<0) {
871 		mfs_errlog(LOG_WARNING,"main server module: socket address has changed, but can't create new socket");
872 		free(newListenHost);
873 		free(newListenPort);
874 		return;
875 	}
876 	tcpnonblock(newlsock);
877 	tcpnodelay(newlsock);
878 	tcpreuseaddr(newlsock);
879 	tcpresolve(newListenHost,newListenPort,&newmylistenip,&newmylistenport,1);
880 	if (tcpnumlisten(newlsock,newmylistenip,newmylistenport,100)<0) {
881 		mfs_arg_errlog(LOG_ERR,"main server module: socket address has changed, but can't listen on socket (%s:%s)",ListenHost,ListenPort);
882 		free(newListenHost);
883 		free(newListenPort);
884 		tcpclose(newlsock);
885 		return;
886 	}
887 	if (tcpsetacceptfilter(newlsock)<0 && errno!=ENOTSUP) {
888 		mfs_errlog_silent(LOG_NOTICE,"main server module: can't set accept filter");
889 	}
890 	mfs_arg_syslog(LOG_NOTICE,"main server module: socket address has changed, now listen on %s:%s",ListenHost,ListenPort);
891 	free(ListenHost);
892 	free(ListenPort);
893 	ListenHost = newListenHost;
894 	ListenPort = newListenPort;
895 	tcpclose(lsock);
896 	lsock = newlsock;
897 	mylistenip = newmylistenip;
898 	mylistenport = newmylistenport;
899 	masterconn_forcereconnect();
900 }
901 
csserv_init(void)902 int csserv_init(void) {
903 	ListenHost = cfg_getstr("CSSERV_LISTEN_HOST","*");
904 	ListenPort = cfg_getstr("CSSERV_LISTEN_PORT",DEFAULT_CS_DATA_PORT);
905 
906 	lsock = tcpsocket();
907 	if (lsock<0) {
908 		mfs_errlog(LOG_ERR,"main server module: can't create socket");
909 		return -1;
910 	}
911 	tcpnonblock(lsock);
912 	tcpnodelay(lsock);
913 	tcpreuseaddr(lsock);
914 	tcpresolve(ListenHost,ListenPort,&mylistenip,&mylistenport,1);
915 	if (tcpnumlisten(lsock,mylistenip,mylistenport,100)<0) {
916 		mfs_errlog(LOG_ERR,"main server module: can't listen on socket");
917 		return -1;
918 	}
919 	if (tcpsetacceptfilter(lsock)<0 && errno!=ENOTSUP) {
920 		mfs_errlog_silent(LOG_NOTICE,"main server module: can't set accept filter");
921 	}
922 	mfs_arg_syslog(LOG_NOTICE,"main server module: listen on %s:%s",ListenHost,ListenPort);
923 
924 	csservhead = NULL;
925 	main_wantexit_register(csserv_wantexit);
926 	main_reload_register(csserv_reload);
927 	main_destruct_register(csserv_term);
928 	main_poll_register(csserv_desc,csserv_serve);
929 
930 	return 0;
931 }
932