1 /*
2 * Copyright (C) 2021 Jakub Kruszona-Zawadzki, Core Technology Sp. z o.o.
3 *
4 * This file is part of MooseFS.
5 *
6 * MooseFS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, version 2 (only).
9 *
10 * MooseFS is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with MooseFS; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301, USA
18 * or visit http://www.gnu.org/licenses/gpl-2.0.html
19 */
20
21 #include "config.h"
22
23 #include <stddef.h>
24 #include <time.h>
25 #include <sys/types.h>
26 #include <sys/uio.h>
27 #include <unistd.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <syslog.h>
31 #include <errno.h>
32 #include <inttypes.h>
33 #include <netinet/in.h>
34
35 #include "MFSCommunication.h"
36
37 #include "datapack.h"
38 #include "csserv.h"
39 #include "cfg.h"
40 #include "main.h"
41 #include "clocks.h"
42 #include "sockets.h"
43 #include "hddspacemgr.h"
44 #include "masterconn.h"
45 #include "charts.h"
46 #include "slogger.h"
47 #include "bgjobs.h"
48 #include "massert.h"
49
50 // connection timeout in seconds
51 #define CSSERV_TIMEOUT 5
52
53 #define MaxPacketSize CSTOCS_MAXPACKETSIZE
54
55 //csserventry.mode
56 enum {HEADER,DATA};
57
58 //csserventry.state
59 enum {IDLE,READ,WRITE,CLOSE};
60
61 struct csserventry;
62
63 enum {IJ_GET_CHUNK_BLOCKS,IJ_GET_CHUNK_CHECKSUM,IJ_GET_CHUNK_CHECKSUM_TAB};
64
65 typedef struct idlejob {
66 uint32_t jobid;
67 uint8_t op;
68 uint64_t chunkid;
69 uint32_t version;
70 struct csserventry *eptr;
71 struct idlejob *next,**prev;
72 uint8_t buff[1];
73 } idlejob;
74
75 typedef struct packetstruct {
76 struct packetstruct *next;
77 uint8_t *startptr;
78 uint32_t bytesleft;
79 uint8_t *packet;
80 } packetstruct;
81
82 typedef struct csserventry {
83 uint8_t state;
84 uint8_t mode;
85
86 int sock;
87 int32_t pdescpos;
88 double lastread,lastwrite;
89 uint32_t activity;
90 uint8_t hdrbuff[8];
91 packetstruct inputpacket;
92 packetstruct *outputhead,**outputtail;
93
94 uint32_t jobid;
95
96 struct idlejob *idlejobs;
97
98 struct csserventry *next;
99 } csserventry;
100
101 static csserventry *csservhead=NULL;
102 static int lsock;
103 static int32_t lsockpdescpos;
104
105 static uint32_t mylistenip;
106 static uint16_t mylistenport;
107
108 static uint64_t stats_bytesin=0;
109 static uint64_t stats_bytesout=0;
110
111 // from config
112 static char *ListenHost;
113 static char *ListenPort;
114
csserv_stats(uint64_t * bin,uint64_t * bout)115 void csserv_stats(uint64_t *bin,uint64_t *bout) {
116 *bin = stats_bytesin;
117 *bout = stats_bytesout;
118 stats_bytesin = 0;
119 stats_bytesout = 0;
120 }
121
csserv_create_packet(csserventry * eptr,uint32_t type,uint32_t size)122 uint8_t* csserv_create_packet(csserventry *eptr,uint32_t type,uint32_t size) {
123 packetstruct *outpacket;
124 uint8_t *ptr;
125 uint32_t psize;
126
127 outpacket = malloc(sizeof(packetstruct));
128 #ifndef __clang_analyzer__
129 passert(outpacket);
130 // clang analyzer has problem with testing for (void*)(-1) which is needed for memory allocated by mmap
131 #endif
132 psize = size+8;
133 outpacket->packet=malloc(psize);
134 #ifndef __clang_analyzer__
135 passert(outpacket->packet);
136 // clang analyzer has problem with testing for (void*)(-1) which is needed for memory allocated by mmap
137 #endif
138 outpacket->bytesleft = psize;
139 ptr = outpacket->packet;
140 put32bit(&ptr,type);
141 put32bit(&ptr,size);
142 outpacket->startptr = (uint8_t*)(outpacket->packet);
143 outpacket->next = NULL;
144 *(eptr->outputtail) = outpacket;
145 eptr->outputtail = &(outpacket->next);
146 return ptr;
147 }
148
csserv_get_version(csserventry * eptr,const uint8_t * data,uint32_t length)149 void csserv_get_version(csserventry *eptr,const uint8_t *data,uint32_t length) {
150 uint32_t msgid = 0;
151 uint8_t *ptr;
152 static const char vstring[] = VERSSTR;
153 if (length!=0 && length!=4) {
154 syslog(LOG_NOTICE,"ANTOAN_GET_VERSION - wrong size (%"PRIu32"/4|0)",length);
155 eptr->state = CLOSE;
156 return;
157 }
158 if (length==4) {
159 msgid = get32bit(&data);
160 ptr = csserv_create_packet(eptr,ANTOAN_VERSION,4+4+strlen(vstring));
161 put32bit(&ptr,msgid);
162 } else {
163 ptr = csserv_create_packet(eptr,ANTOAN_VERSION,4+strlen(vstring));
164 }
165 put16bit(&ptr,VERSMAJ);
166 put8bit(&ptr,VERSMID);
167 put8bit(&ptr,VERSMIN);
168 memcpy(ptr,vstring,strlen(vstring));
169 }
170
csserv_get_config(csserventry * eptr,const uint8_t * data,uint32_t length)171 void csserv_get_config(csserventry *eptr,const uint8_t *data,uint32_t length) {
172 uint32_t msgid;
173 char name[256];
174 uint8_t nleng;
175 uint32_t vleng;
176 char *val;
177 uint8_t *ptr;
178
179 if (length<5) {
180 syslog(LOG_NOTICE,"ANTOAN_GET_CONFIG - wrong size (%"PRIu32")",length);
181 eptr->state = CLOSE;
182 return;
183 }
184 msgid = get32bit(&data);
185 nleng = get8bit(&data);
186 if (length!=5U+(uint32_t)nleng) {
187 syslog(LOG_NOTICE,"ANTOAN_GET_CONFIG - wrong size (%"PRIu32":nleng=%"PRIu8")",length,nleng);
188 eptr->state = CLOSE;
189 return;
190 }
191 memcpy(name,data,nleng);
192 name[nleng] = 0;
193 val = cfg_getstr(name,"");
194 vleng = strlen(val);
195 if (vleng>255) {
196 vleng=255;
197 }
198 ptr = csserv_create_packet(eptr,ANTOAN_CONFIG_VALUE,5+vleng);
199 put32bit(&ptr,msgid);
200 put8bit(&ptr,vleng);
201 memcpy(ptr,val,vleng);
202 }
203
csserv_iothread_finished(uint8_t status,void * e)204 void csserv_iothread_finished(uint8_t status,void *e) {
205 csserventry *eptr = (csserventry*)e;
206 if (status==0) {
207 eptr->state = CLOSE;
208 } else {
209 eptr->state = IDLE;
210 }
211 eptr->jobid = 0;
212 if (eptr->inputpacket.packet) {
213 free(eptr->inputpacket.packet);
214 }
215 eptr->inputpacket.packet=NULL;
216 }
217
csserv_read_init(csserventry * eptr,const uint8_t * data,uint32_t length)218 void csserv_read_init(csserventry *eptr,const uint8_t *data,uint32_t length) {
219 uint8_t *ptr;
220
221 eptr->state = READ;
222 eptr->jobid = job_serv_read(csserv_iothread_finished,eptr,eptr->sock,data,length);
223 if (eptr->jobid==0) { // not done - queue full
224 if (length!=20 && length!=21) {
225 syslog(LOG_NOTICE,"CLTOCS_READ - wrong size (%"PRIu32"/20|21)",length);
226 eptr->state = CLOSE;
227 return;
228 }
229 if (length==21) {
230 data++; // skip proto version
231 }
232 ptr = csserv_create_packet(eptr,CSTOCL_READ_STATUS,8+1);
233 memcpy(ptr,data,8); // copy chunkid directly from source packet
234 ptr+=8;
235 put8bit(&ptr,MFS_ERROR_NOTDONE);
236 eptr->state = IDLE;
237 }
238 }
239
csserv_write_init(csserventry * eptr,const uint8_t * data,uint32_t length)240 void csserv_write_init(csserventry *eptr,const uint8_t *data,uint32_t length) {
241 uint8_t *ptr;
242
243 eptr->state = WRITE;
244 eptr->jobid = job_serv_write(csserv_iothread_finished,eptr,eptr->sock,data,length);
245 if (eptr->jobid==0) { // not done - queue full
246 if (length&1) {
247 if (length<13 || ((length-13)%6)!=0) {
248 syslog(LOG_NOTICE,"CLTOCS_WRITE - wrong size (%"PRIu32"/13+N*6)",length);
249 eptr->state = CLOSE;
250 return;
251 }
252 data++; // skip proto version
253 } else {
254 if (length<12 || ((length-12)%6)!=0) {
255 syslog(LOG_NOTICE,"CLTOCS_WRITE - wrong size (%"PRIu32"/12+N*6)",length);
256 eptr->state = CLOSE;
257 return;
258 }
259 }
260 ptr = csserv_create_packet(eptr,CSTOCL_WRITE_STATUS,8+4+1);
261 memcpy(ptr,data,8); // copy chunkid directly from source packet
262 ptr+=8;
263 put32bit(&ptr,0);
264 put8bit(&ptr,MFS_ERROR_NOTDONE);
265 eptr->state = IDLE;
266 }
267 }
268
269 /* IDLE operations */
270
csserv_idlejob_finished(uint8_t status,void * ijp)271 void csserv_idlejob_finished(uint8_t status,void *ijp) {
272 idlejob *ij = (idlejob*)ijp;
273 csserventry *eptr = ij->eptr;
274 uint8_t *ptr;
275
276 if (eptr) {
277 switch (ij->op) {
278 case IJ_GET_CHUNK_BLOCKS:
279 ptr = csserv_create_packet(eptr,CSTOAN_CHUNK_BLOCKS,8+4+2+1);
280 put64bit(&ptr,ij->chunkid);
281 put32bit(&ptr,ij->version);
282 if (status==MFS_STATUS_OK) {
283 memcpy(ptr,ij->buff,2);
284 ptr+=2;
285 } else {
286 put16bit(&ptr,0);
287 }
288 put8bit(&ptr,status);
289 break;
290 case IJ_GET_CHUNK_CHECKSUM:
291 if (status!=MFS_STATUS_OK) {
292 ptr = csserv_create_packet(eptr,CSTOAN_CHUNK_CHECKSUM,8+4+1);
293 } else {
294 ptr = csserv_create_packet(eptr,CSTOAN_CHUNK_CHECKSUM,8+4+4);
295 }
296 put64bit(&ptr,ij->chunkid);
297 put32bit(&ptr,ij->version);
298 if (status!=MFS_STATUS_OK) {
299 put8bit(&ptr,status);
300 } else {
301 memcpy(ptr,ij->buff,4);
302 }
303 break;
304 case IJ_GET_CHUNK_CHECKSUM_TAB:
305 if (status!=MFS_STATUS_OK) {
306 ptr = csserv_create_packet(eptr,CSTOAN_CHUNK_CHECKSUM_TAB,8+4+1);
307 } else {
308 ptr = csserv_create_packet(eptr,CSTOAN_CHUNK_CHECKSUM_TAB,8+4+4096);
309 }
310 put64bit(&ptr,ij->chunkid);
311 put32bit(&ptr,ij->version);
312 if (status!=MFS_STATUS_OK) {
313 put8bit(&ptr,status);
314 } else {
315 memcpy(ptr,ij->buff,4096);
316 }
317 break;
318 }
319 *(ij->prev) = ij->next;
320 if (ij->next) {
321 ij->next->prev = ij->prev;
322 }
323 }
324 free(ij);
325 }
326
csserv_get_chunk_blocks(csserventry * eptr,const uint8_t * data,uint32_t length)327 void csserv_get_chunk_blocks(csserventry *eptr,const uint8_t *data,uint32_t length) {
328 idlejob *ij;
329
330 if (length!=8+4) {
331 syslog(LOG_NOTICE,"ANTOCS_GET_CHUNK_BLOCKS - wrong size (%"PRIu32"/12)",length);
332 eptr->state = CLOSE;
333 return;
334 }
335 ij = malloc(offsetof(idlejob,buff)+2);
336 ij->op = IJ_GET_CHUNK_BLOCKS;
337 ij->chunkid = get64bit(&data);
338 ij->version = get32bit(&data);
339 ij->eptr = eptr;
340 ij->next = eptr->idlejobs;
341 ij->prev = &(eptr->idlejobs);
342 eptr->idlejobs = ij;
343 ij->jobid = job_get_chunk_blocks(csserv_idlejob_finished,ij,ij->chunkid,ij->version,ij->buff);
344 }
345
csserv_get_chunk_checksum(csserventry * eptr,const uint8_t * data,uint32_t length)346 void csserv_get_chunk_checksum(csserventry *eptr,const uint8_t *data,uint32_t length) {
347 idlejob *ij;
348
349 if (length!=8+4) {
350 syslog(LOG_NOTICE,"ANTOCS_GET_CHUNK_CHECKSUM - wrong size (%"PRIu32"/12)",length);
351 eptr->state = CLOSE;
352 return;
353 }
354 ij = malloc(offsetof(idlejob,buff)+4);
355 ij->op = IJ_GET_CHUNK_CHECKSUM;
356 ij->chunkid = get64bit(&data);
357 ij->version = get32bit(&data);
358 ij->eptr = eptr;
359 ij->next = eptr->idlejobs;
360 ij->prev = &(eptr->idlejobs);
361 eptr->idlejobs = ij;
362 ij->jobid = job_get_chunk_checksum(csserv_idlejob_finished,ij,ij->chunkid,ij->version,ij->buff);
363 }
364
csserv_get_chunk_checksum_tab(csserventry * eptr,const uint8_t * data,uint32_t length)365 void csserv_get_chunk_checksum_tab(csserventry *eptr,const uint8_t *data,uint32_t length) {
366 idlejob *ij;
367
368 if (length!=8+4) {
369 syslog(LOG_NOTICE,"ANTOCS_GET_CHUNK_CHECKSUM_TAB - wrong size (%"PRIu32"/12)",length);
370 eptr->state = CLOSE;
371 return;
372 }
373 ij = malloc(offsetof(idlejob,buff)+4096);
374 ij->op = IJ_GET_CHUNK_CHECKSUM_TAB;
375 ij->chunkid = get64bit(&data);
376 ij->version = get32bit(&data);
377 ij->eptr = eptr;
378 ij->next = eptr->idlejobs;
379 ij->prev = &(eptr->idlejobs);
380 eptr->idlejobs = ij;
381 ij->jobid = job_get_chunk_checksum_tab(csserv_idlejob_finished,ij,ij->chunkid,ij->version,ij->buff);
382 }
383
csserv_hdd_list(csserventry * eptr,const uint8_t * data,uint32_t length)384 void csserv_hdd_list(csserventry *eptr,const uint8_t *data,uint32_t length) {
385 uint32_t l;
386 uint8_t *ptr;
387
388 (void)data;
389 if (length!=0) {
390 syslog(LOG_NOTICE,"CLTOCS_HDD_LIST - wrong size (%"PRIu32"/0)",length);
391 eptr->state = CLOSE;
392 return;
393 }
394 l = hdd_diskinfo_size(); // lock
395 ptr = csserv_create_packet(eptr,CSTOCL_HDD_LIST,l);
396 hdd_diskinfo_data(ptr); // unlock
397 }
398
csserv_chart(csserventry * eptr,const uint8_t * data,uint32_t length)399 void csserv_chart(csserventry *eptr,const uint8_t *data,uint32_t length) {
400 uint32_t chartid;
401 uint8_t *ptr;
402 uint32_t l;
403 uint16_t w,h;
404
405 if (length!=4 && length!=8) {
406 syslog(LOG_NOTICE,"CLTOAN_CHART - wrong size (%"PRIu32"/4|8)",length);
407 eptr->state = CLOSE;
408 return;
409 }
410 chartid = get32bit(&data);
411 if (length==8) {
412 w = get16bit(&data);
413 h = get16bit(&data);
414 } else {
415 w = 0;
416 h = 0;
417 }
418 l = charts_make_png(chartid,w,h);
419 ptr = csserv_create_packet(eptr,ANTOCL_CHART,l);
420 if (l>0) {
421 charts_get_png(ptr);
422 }
423 }
424
csserv_chart_data(csserventry * eptr,const uint8_t * data,uint32_t length)425 void csserv_chart_data(csserventry *eptr,const uint8_t *data,uint32_t length) {
426 uint32_t chartid;
427 uint8_t *ptr;
428 uint32_t l;
429 uint32_t maxentries;
430
431 if (length!=4 && length!=8) {
432 syslog(LOG_NOTICE,"CLTOAN_CHART_DATA - wrong size (%"PRIu32"/4|8)",length);
433 eptr->state = CLOSE;
434 return;
435 }
436 chartid = get32bit(&data);
437 if (length==8) {
438 maxentries = get32bit(&data);
439 } else {
440 maxentries = UINT32_C(0xFFFFFFFF);
441 }
442 l = charts_makedata(NULL,chartid,maxentries);
443 ptr = csserv_create_packet(eptr,ANTOCL_CHART_DATA,l);
444 if (l>0) {
445 charts_makedata(ptr,chartid,maxentries);
446 }
447 }
448
csserv_monotonic_data(csserventry * eptr,const uint8_t * data,uint32_t length)449 void csserv_monotonic_data(csserventry *eptr,const uint8_t *data,uint32_t length) {
450 uint8_t *ptr;
451 uint32_t l;
452 uint32_t dil;
453
454 (void)data;
455 if (length!=0) {
456 syslog(LOG_NOTICE,"CLTOAN_MONOTONIC_DATA - wrong size (%"PRIu32"/0)",length);
457 eptr->state = CLOSE;
458 return;
459 }
460 l = charts_monotonic_data(NULL);
461 dil = hdd_diskinfo_monotonic_size();
462 ptr = csserv_create_packet(eptr,ANTOCL_MONOTONIC_DATA,l+dil);
463 if (l>0) {
464 charts_monotonic_data(ptr);
465 ptr += l;
466 }
467 hdd_diskinfo_monotonic_data(ptr);
468 }
469
csserv_module_info(csserventry * eptr,const uint8_t * data,uint32_t length)470 void csserv_module_info(csserventry *eptr,const uint8_t *data,uint32_t length) {
471 uint8_t *ptr;
472
473 if (length!=0) {
474 syslog(LOG_NOTICE,"CLTOAN_MODULE_INFO - wrong size (%"PRIu32"/0)",length);
475 eptr->state = CLOSE;
476 return;
477 }
478 (void)data;
479 ptr = csserv_create_packet(eptr,ANTOCL_MODULE_INFO,21);
480 put8bit(&ptr,MODULE_TYPE_CHUNKSERVER);
481 put16bit(&ptr,VERSMAJ);
482 put8bit(&ptr,VERSMID);
483 put8bit(&ptr,VERSMIN);
484 put16bit(&ptr,masterconn_getcsid());
485 put64bit(&ptr,masterconn_getmetaid());
486 put32bit(&ptr,masterconn_getmasterip());
487 put16bit(&ptr,masterconn_getmasterport());
488 }
489
csserv_close(csserventry * eptr)490 void csserv_close(csserventry *eptr) {
491 idlejob *ij,*nij;
492
493 if (eptr->jobid>0 && (eptr->state==READ || eptr->state==WRITE)) {
494 job_pool_disable_job(eptr->jobid);
495 job_pool_change_callback(eptr->jobid,NULL,NULL);
496 }
497
498 for (ij=eptr->idlejobs ; ij ; ij=nij) {
499 nij = ij->next;
500 job_pool_disable_job(ij->jobid);
501 ij->next = NULL;
502 ij->prev = NULL;
503 ij->eptr = NULL;
504 }
505 }
506
csserv_gotpacket(csserventry * eptr,uint32_t type,const uint8_t * data,uint32_t length)507 void csserv_gotpacket(csserventry *eptr,uint32_t type,const uint8_t *data,uint32_t length) {
508 // syslog(LOG_NOTICE,"packet %u:%u",type,length);
509 if (type==ANTOAN_NOP) {
510 return;
511 }
512 if (type==ANTOAN_UNKNOWN_COMMAND) { // for future use
513 return;
514 }
515 if (type==ANTOAN_BAD_COMMAND_SIZE) { // for future use
516 return;
517 }
518 if (eptr->state==IDLE) {
519 switch (type) {
520 case ANTOAN_GET_VERSION:
521 csserv_get_version(eptr,data,length);
522 break;
523 case ANTOAN_GET_CONFIG:
524 csserv_get_config(eptr,data,length);
525 break;
526 case CLTOCS_READ:
527 csserv_read_init(eptr,data,length);
528 break;
529 case CLTOCS_WRITE:
530 csserv_write_init(eptr,data,length);
531 break;
532 case ANTOCS_GET_CHUNK_BLOCKS:
533 csserv_get_chunk_blocks(eptr,data,length);
534 break;
535 case ANTOCS_GET_CHUNK_CHECKSUM:
536 csserv_get_chunk_checksum(eptr,data,length);
537 break;
538 case ANTOCS_GET_CHUNK_CHECKSUM_TAB:
539 csserv_get_chunk_checksum_tab(eptr,data,length);
540 break;
541 case CLTOCS_HDD_LIST:
542 csserv_hdd_list(eptr,data,length);
543 break;
544 case CLTOAN_CHART:
545 csserv_chart(eptr,data,length);
546 break;
547 case CLTOAN_CHART_DATA:
548 csserv_chart_data(eptr,data,length);
549 break;
550 case CLTOAN_MONOTONIC_DATA:
551 csserv_monotonic_data(eptr,data,length);
552 break;
553 case CLTOAN_MODULE_INFO:
554 csserv_module_info(eptr,data,length);
555 break;
556 case CLTOCS_WRITE_DATA:
557 case CLTOCS_WRITE_FINISH:
558 eptr->state = CLOSE; // silently ignore those packets
559 break;
560 default:
561 syslog(LOG_NOTICE,"got unknown message (type:%"PRIu32")",type);
562 eptr->state = CLOSE;
563 }
564 } else {
565 syslog(LOG_NOTICE,"got unknown message (type:%"PRIu32")",type);
566 eptr->state = CLOSE;
567 }
568 }
569
csserv_wantexit(void)570 void csserv_wantexit(void) {
571 syslog(LOG_NOTICE,"closing %s:%s",ListenHost,ListenPort);
572 tcpclose(lsock);
573 lsock = -1;
574 }
575
csserv_term(void)576 void csserv_term(void) {
577 csserventry *eptr,*eaptr;
578 packetstruct *pptr,*paptr;
579
580 eptr = csservhead;
581 while (eptr) {
582 tcpclose(eptr->sock);
583 if (eptr->inputpacket.packet) {
584 free(eptr->inputpacket.packet);
585 }
586 pptr = eptr->outputhead;
587 while (pptr) {
588 if (pptr->packet) {
589 free(pptr->packet);
590 }
591 paptr = pptr;
592 pptr = pptr->next;
593 free(paptr);
594 }
595 eaptr = eptr;
596 eptr = eptr->next;
597 free(eaptr);
598 }
599 csservhead=NULL;
600 free(ListenHost);
601 free(ListenPort);
602 }
603
csserv_read(csserventry * eptr)604 void csserv_read(csserventry *eptr) {
605 int32_t i;
606 uint32_t type,size;
607 const uint8_t *ptr;
608
609 if (eptr->mode == HEADER) {
610 i=read(eptr->sock,eptr->inputpacket.startptr,eptr->inputpacket.bytesleft);
611 if (i==0) {
612 // syslog(LOG_NOTICE,"(read) connection closed");
613 eptr->state = CLOSE;
614 return;
615 }
616 if (i<0) {
617 if (ERRNO_ERROR) {
618 mfs_errlog_silent(LOG_NOTICE,"(read) read error");
619 eptr->state = CLOSE;
620 }
621 return;
622 }
623 stats_bytesin+=i;
624 eptr->inputpacket.startptr+=i;
625 eptr->inputpacket.bytesleft-=i;
626
627 if (eptr->inputpacket.bytesleft>0) {
628 return;
629 }
630
631 ptr = eptr->hdrbuff;
632 type = get32bit(&ptr);
633 size = get32bit(&ptr);
634
635 if (size>0) {
636 if (size>MaxPacketSize) {
637 syslog(LOG_WARNING,"(read) packet too long (%"PRIu32"/%u) ; command:%"PRIu32,size,MaxPacketSize,type);
638 eptr->state = CLOSE;
639 return;
640 }
641 eptr->inputpacket.packet = malloc(size);
642 passert(eptr->inputpacket.packet);
643 eptr->inputpacket.startptr = eptr->inputpacket.packet;
644 }
645 eptr->inputpacket.bytesleft = size;
646 eptr->mode = DATA;
647 }
648 if (eptr->mode == DATA) {
649 if (eptr->inputpacket.bytesleft>0) {
650 i=read(eptr->sock,eptr->inputpacket.startptr,eptr->inputpacket.bytesleft);
651 if (i==0) {
652 // syslog(LOG_NOTICE,"(read) connection closed");
653 eptr->state = CLOSE;
654 return;
655 }
656 if (i<0) {
657 if (ERRNO_ERROR) {
658 mfs_errlog_silent(LOG_NOTICE,"(read) read error");
659 eptr->state = CLOSE;
660 }
661 return;
662 }
663 stats_bytesin+=i;
664 eptr->inputpacket.startptr+=i;
665 eptr->inputpacket.bytesleft-=i;
666
667 if (eptr->inputpacket.bytesleft>0) {
668 return;
669 }
670 }
671 ptr = eptr->hdrbuff;
672 type = get32bit(&ptr);
673 size = get32bit(&ptr);
674
675 eptr->mode = HEADER;
676 eptr->inputpacket.bytesleft = 8;
677 eptr->inputpacket.startptr = eptr->hdrbuff;
678
679 csserv_gotpacket(eptr,type,eptr->inputpacket.packet,size);
680
681 if (eptr->state != READ && eptr->state != WRITE) {
682 if (eptr->inputpacket.packet) {
683 free(eptr->inputpacket.packet);
684 }
685 eptr->inputpacket.packet=NULL;
686 }
687 }
688 }
689
csserv_write(csserventry * eptr)690 void csserv_write(csserventry *eptr) {
691 packetstruct *pack;
692 int32_t i;
693 for (;;) {
694 pack = eptr->outputhead;
695 if (pack==NULL) {
696 return;
697 }
698 i=write(eptr->sock,pack->startptr,pack->bytesleft);
699 if (i==0) {
700 // syslog(LOG_NOTICE,"(write) connection closed");
701 eptr->state = CLOSE;
702 return;
703 }
704 if (i<0) {
705 if (ERRNO_ERROR) {
706 mfs_errlog_silent(LOG_NOTICE,"(write) write error");
707 eptr->state = CLOSE;
708 }
709 return;
710 }
711 stats_bytesout+=i;
712 pack->startptr+=i;
713 pack->bytesleft-=i;
714 if (pack->bytesleft>0) {
715 return;
716 }
717 free(pack->packet);
718 eptr->outputhead = pack->next;
719 if (eptr->outputhead==NULL) {
720 eptr->outputtail = &(eptr->outputhead);
721 }
722 free(pack);
723 }
724 }
725
csserv_desc(struct pollfd * pdesc,uint32_t * ndesc)726 void csserv_desc(struct pollfd *pdesc,uint32_t *ndesc) {
727 uint32_t pos = *ndesc;
728 csserventry *eptr;
729
730 if (lsock>=0) {
731 pdesc[pos].fd = lsock;
732 pdesc[pos].events = POLLIN;
733 lsockpdescpos = pos;
734 pos++;
735 } else {
736 lsockpdescpos = 0;
737 }
738 for (eptr=csservhead ; eptr ; eptr=eptr->next) {
739 eptr->pdescpos = -1;
740 if (eptr->state==IDLE) {
741 pdesc[pos].events = POLLIN;
742 if (eptr->outputhead!=NULL) {
743 pdesc[pos].events |= POLLOUT;
744 }
745 eptr->pdescpos = pos;
746 pdesc[pos].fd = eptr->sock;
747 pos++;
748 }
749 }
750 *ndesc = pos;
751 }
752
csserv_serve(struct pollfd * pdesc)753 void csserv_serve(struct pollfd *pdesc) {
754 double now;
755 csserventry *eptr,**kptr;
756 packetstruct *pptr,*paptr;
757 int ns;
758
759 now = monotonic_seconds();
760
761 if (lsockpdescpos>=0 && (pdesc[lsockpdescpos].revents & POLLIN) && lsock>=0) {
762 ns=tcpaccept(lsock);
763 if (ns<0) {
764 mfs_errlog_silent(LOG_NOTICE,"accept error");
765 } else {
766 tcpnonblock(ns);
767 tcpnodelay(ns);
768 eptr = malloc(sizeof(csserventry));
769 passert(eptr);
770 eptr->next = csservhead;
771 csservhead = eptr;
772 eptr->state = IDLE;
773 eptr->mode = HEADER;
774 eptr->sock = ns;
775 eptr->pdescpos = -1;
776 eptr->lastread = now;
777 eptr->lastwrite = now;
778 eptr->inputpacket.bytesleft = 8;
779 eptr->inputpacket.startptr = eptr->hdrbuff;
780 eptr->inputpacket.packet = NULL;
781 eptr->outputhead = NULL;
782 eptr->outputtail = &(eptr->outputhead);
783 eptr->jobid = 0;
784
785 eptr->idlejobs = NULL;
786 }
787 }
788
789 for (eptr=csservhead ; eptr ; eptr=eptr->next) {
790 if (eptr->pdescpos>=0 && (pdesc[eptr->pdescpos].revents & (POLLERR|POLLHUP))) {
791 eptr->state = CLOSE;
792 }
793 if (eptr->pdescpos>=0 && (pdesc[eptr->pdescpos].revents & POLLIN) && eptr->state==IDLE) {
794 eptr->lastread = now;
795 csserv_read(eptr);
796 }
797 if (eptr->state==IDLE && eptr->lastwrite+(CSSERV_TIMEOUT/3.0)<now && eptr->outputhead==NULL) {
798 csserv_create_packet(eptr,ANTOAN_NOP,0);
799 }
800 if (eptr->pdescpos>=0 && (pdesc[eptr->pdescpos].revents & POLLOUT) && eptr->state==IDLE) {
801 eptr->lastwrite = now;
802 csserv_write(eptr);
803 }
804 if (eptr->state==IDLE && eptr->lastread+CSSERV_TIMEOUT<now) {
805 // syslog(LOG_NOTICE,"csserv: connection timed out");
806 eptr->state = CLOSE;
807 }
808 }
809
810 kptr = &csservhead;
811 while ((eptr=*kptr)) {
812 if (eptr->state == CLOSE) {
813 tcpclose(eptr->sock);
814 csserv_close(eptr);
815 if (eptr->inputpacket.packet) {
816 free(eptr->inputpacket.packet);
817 }
818 // wptr = eptr->todolist;
819 // while (wptr) {
820 // waptr = wptr;
821 // wptr = wptr->next;
822 // free(waptr);
823 // }
824 pptr = eptr->outputhead;
825 while (pptr) {
826 if (pptr->packet) {
827 free(pptr->packet);
828 }
829 paptr = pptr;
830 pptr = pptr->next;
831 free(paptr);
832 }
833 *kptr = eptr->next;
834 free(eptr);
835 } else {
836 kptr = &(eptr->next);
837 }
838 }
839 }
840
csserv_getlistenip()841 uint32_t csserv_getlistenip() {
842 return mylistenip;
843 }
844
csserv_getlistenport()845 uint16_t csserv_getlistenport() {
846 return mylistenport;
847 }
848
csserv_reload(void)849 void csserv_reload(void) {
850 char *newListenHost,*newListenPort;
851 uint32_t newmylistenip;
852 uint16_t newmylistenport;
853 int newlsock;
854
855 if (lsock<0) { // this is exiting stage - ignore reload
856 return ;
857 }
858 // ThreadedServer = 1-ThreadedServer;
859
860 newListenHost = cfg_getstr("CSSERV_LISTEN_HOST","*");
861 newListenPort = cfg_getstr("CSSERV_LISTEN_PORT",DEFAULT_CS_DATA_PORT);
862 if (strcmp(newListenHost,ListenHost)==0 && strcmp(newListenPort,ListenPort)==0) {
863 free(newListenHost);
864 free(newListenPort);
865 mfs_arg_syslog(LOG_NOTICE,"main server module: socket address hasn't changed (%s:%s)",ListenHost,ListenPort);
866 return;
867 }
868
869 newlsock = tcpsocket();
870 if (newlsock<0) {
871 mfs_errlog(LOG_WARNING,"main server module: socket address has changed, but can't create new socket");
872 free(newListenHost);
873 free(newListenPort);
874 return;
875 }
876 tcpnonblock(newlsock);
877 tcpnodelay(newlsock);
878 tcpreuseaddr(newlsock);
879 tcpresolve(newListenHost,newListenPort,&newmylistenip,&newmylistenport,1);
880 if (tcpnumlisten(newlsock,newmylistenip,newmylistenport,100)<0) {
881 mfs_arg_errlog(LOG_ERR,"main server module: socket address has changed, but can't listen on socket (%s:%s)",ListenHost,ListenPort);
882 free(newListenHost);
883 free(newListenPort);
884 tcpclose(newlsock);
885 return;
886 }
887 if (tcpsetacceptfilter(newlsock)<0 && errno!=ENOTSUP) {
888 mfs_errlog_silent(LOG_NOTICE,"main server module: can't set accept filter");
889 }
890 mfs_arg_syslog(LOG_NOTICE,"main server module: socket address has changed, now listen on %s:%s",ListenHost,ListenPort);
891 free(ListenHost);
892 free(ListenPort);
893 ListenHost = newListenHost;
894 ListenPort = newListenPort;
895 tcpclose(lsock);
896 lsock = newlsock;
897 mylistenip = newmylistenip;
898 mylistenport = newmylistenport;
899 masterconn_forcereconnect();
900 }
901
csserv_init(void)902 int csserv_init(void) {
903 ListenHost = cfg_getstr("CSSERV_LISTEN_HOST","*");
904 ListenPort = cfg_getstr("CSSERV_LISTEN_PORT",DEFAULT_CS_DATA_PORT);
905
906 lsock = tcpsocket();
907 if (lsock<0) {
908 mfs_errlog(LOG_ERR,"main server module: can't create socket");
909 return -1;
910 }
911 tcpnonblock(lsock);
912 tcpnodelay(lsock);
913 tcpreuseaddr(lsock);
914 tcpresolve(ListenHost,ListenPort,&mylistenip,&mylistenport,1);
915 if (tcpnumlisten(lsock,mylistenip,mylistenport,100)<0) {
916 mfs_errlog(LOG_ERR,"main server module: can't listen on socket");
917 return -1;
918 }
919 if (tcpsetacceptfilter(lsock)<0 && errno!=ENOTSUP) {
920 mfs_errlog_silent(LOG_NOTICE,"main server module: can't set accept filter");
921 }
922 mfs_arg_syslog(LOG_NOTICE,"main server module: listen on %s:%s",ListenHost,ListenPort);
923
924 csservhead = NULL;
925 main_wantexit_register(csserv_wantexit);
926 main_reload_register(csserv_reload);
927 main_destruct_register(csserv_term);
928 main_poll_register(csserv_desc,csserv_serve);
929
930 return 0;
931 }
932