1 /*
2 * Copyright (c) 2000-2006 Alberto Reggiori <areggiori@webweaving.org>
3 * Dirk-Willem van Gulik <dirkx@webweaving.org>
4 *
5 * NOTICE
6 *
7 * This product is distributed under a BSD/ASF like license as described in the 'LICENSE'
8 * file you should have received together with this source code. If you did not get a
9 * a copy of such a license agreement you can pick up one at:
10 *
11 * http://rdfstore.sourceforge.net/LICENSE
12 *
13 *
14 * $Id: deamon.c,v 1.26 2006/06/19 10:10:22 areggiori Exp $
15 */
16
17 #include "dbms.h"
18 #include "dbms_compat.h"
19 #include "dbms_comms.h"
20 #include "dbmsd.h"
21
22 #include "deamon.h"
23 #include "handler.h"
24 #include "mymalloc.h"
25
/* Set to 1 by cleandown() once shutdown has started; free_connection()
 * consults it to suppress the "Mamma has died" log line during shutdown.
 */
static int going_down = 0;
/* Count of live connections: incremented at the end of
 * handle_new_connection(), decremented by close_connection().
 */
int client_counter = 0; /* XX static perhaps */

#ifdef STATIC_BUFF
/* Stack (linked through ->next) of closed connection structs that
 * close_connection() parked for reuse by handle_new_connection().
 */
static connection * free_connection_list = NULL;
/* close_connection() only parks a struct while free_connection_len is
 * below this value; handle_new_connection() raises it when it finds the
 * free list empty.
 */
static int free_connection_keep = 2;
/* Upper limit that free_connection_keep is grown towards. */
static int free_connection_keep_max = 4;
/* Number of structs currently on free_connection_list. */
static int free_connection_len = 0;
#endif
35
/* PTOK: debugging aid that logs, at L_DEBUG, the name of the command token
 * currently held in r->cmd together with its two length fields.
 * It expands to a braced block that expects a variable `r` and the table
 * `cmd_table` to be visible at the point of use; the table is searched
 * linearly for an entry whose .cmd equals r->cmd.token, and a placeholder
 * name is logged when no entry matches.
 */
#define PTOK { int i; for(i=0; i<sizeof(cmd_table) / sizeof(struct command_req); i++) if ( cmd_table[i].cmd == r->cmd.token ) { dbms_log(L_DEBUG,"Token at %s:%d %s s=%d,%d",__FILE__,__LINE__, cmd_table[i].info,r->cmd.len1,r->cmd.len2); break; }; if(i>=sizeof(cmd_table) / sizeof(struct command_req)) dbms_log(L_DEBUG,"Token at %s:%d %s %d,%d",__FILE__,__LINE__, "**UNKOWN**",r->cmd.len1,r->cmd.len2); }
37
38 void
close_connection(connection * r)39 close_connection ( connection * r )
40 {
41 /* assert(r->clientfd); */
42 FD_CLR(r->clientfd,&allwset);
43 FD_CLR(r->clientfd,&allrset);
44 FD_CLR(r->clientfd,&alleset);
45 close(r->clientfd);
46 r->clientfd = 0;
47
48 /* shutdown(r->clientfd,2); */
49
50 if (r->sendbuff != NULL) {
51 myfree(r->sendbuff);
52 r->sendbuff = NULL;
53 };
54
55 if (r->recbuff != NULL) {
56 myfree(r->recbuff);
57 r->recbuff = NULL;
58 };
59
60 r->send = r->tosend = 0;
61 r->gotten = r->toget = 0;
62
63 r->close = 2; MX;
64
65 #ifdef STATIC_BUFF
66 if (free_connection_len < free_connection_keep) {
67 r->next = free_connection_list;
68 /* assert( free_connection_list != r ); */
69 free_connection_list = r;
70 free_connection_len++;
71 } else
72 #endif
73 myfree(r);
74
75 client_counter --;
76
77 return;
78 }
79
/*
 * free_connection() - type-specific bookkeeping, then close a connection.
 *
 * Depending on r->type:
 *   C_MUM        - the link to the parent is gone; cleandown(0) is called.
 *   C_CLIENT     - the client count of the attached database is dropped
 *                  and the database is flagged for closing when it
 *                  reaches zero.
 *   C_NEW_CLIENT - only logged.
 *   C_LEGACY     - only logged.
 *   C_CHILD      - (FORKING builds) every database handled by that child
 *                  is flagged for closing, the child record is flagged
 *                  too and the child is sent SIGTERM if it still exists.
 * Any other type is logged as unknown.
 *
 * Finally the type is reset to C_UNK and close_connection() is called.
 * The caller is expected to have unlinked `r` from client_list already
 * (zap() does this).
 */
void free_connection(connection *r)
{
	switch (r->type) {
	case C_MUM:
		/* connection to the mother lost.. we _must_ exit now.. */
		if (!going_down)
			dbms_log(L_FATAL,"Mamma has died.. suicide time for child, fd=%d",r->clientfd);
		cleandown(0);
		break;

	case C_CLIENT:
		if (!r->dbp) {
			dbms_log(L_WARN,"C_Client marked with no database ?");
			break;
		}
		/* Was this the last client of the database?  If so flag
		 * the database so that it gets closed.
		 */
		r->dbp->num_cls--;
		if (r->dbp->num_cls <= 0) {
			dbms_log(L_INFORM,"Child was the last one to use %s, closing",
				r->dbp->pfile);
			r->dbp->close = 1; MX;
		}
		dbms_log(L_DEBUG,"Connection to client closed");
		break;

	case C_NEW_CLIENT:
		/* Nothing more we can do for a client that never got
		 * as far as selecting a database.
		 */
		dbms_log(L_ERROR,"New child closing.. but then what ?");
		break;

	case C_LEGACY:
		dbms_log(L_DEBUG,"Legacy close");
		break;

	case C_CHILD: {
#ifdef FORKING
		dbase *db;
		child_rec *owner = NULL;

		/* Find every database served by the child behind this
		 * connection and flag it for closing.
		 */
		for (db = first_dbp; db; db = db->nxt) {
			if (db->handled_by->r != r)
				continue;
			if (owner && (db->handled_by != owner))
				dbms_log(L_ERROR,"More than one child pointer ?");
			db->close = 1; MX;
			owner = db->handled_by;
		}

		if (owner == NULL) {
			dbms_log(L_ERROR,"Child died, but no record..");
		} else {
			/* We do not wait for the child to tidy up by
			 * itself; if it is still there it gets a SIGTERM.
			 */
			owner->close = 1; MX;
			if (kill(owner->pid,0) == 0) {
				dbms_log(L_DEBUG,"Sending kill signal (%d) to %d",
					SIGTERM,owner->pid);
				kill(owner->pid,SIGTERM);
			}
		}
#else
		dbms_log(L_ERROR,"We are non forking, but still see a C_CHILD");
#endif
		break;
	}

	default:
		dbms_log(L_ERROR,"Zapping a rather unkown connection type?");
		break;
	}

	r->type = C_UNK;
	close_connection(r);
}
158
/*
 * zap() - unlink a connection from client_list and dispose of it.
 *
 * Walks client_list looking for `r`.  When it is found the list is
 * spliced around it and free_connection(r) is called; when it is not
 * found an error is logged and nothing else happens.
 */
void zap(connection *r)
{
	connection **link = &client_list;

	/* `link` always points at the pointer that refers to the current
	 * element, so unlinking needs no special case for the list head.
	 */
	while (*link != NULL && *link != r)
		link = &((*link)->next);

	if (*link == NULL) {
		dbms_log(L_ERROR,"Connection to zap not found");
		return;
	}

	*link = r->next;
	free_connection(r);
}
173
174
cleandown(int signo)175 void cleandown( int signo )
176 {
177 if (going_down)
178 dbms_log(L_ERROR,"Re-entry of cleandown().");
179
180 going_down = 1;
181
182 shutdown(sockfd,2);
183 close(sockfd);
184
185 /* send a kill to my children
186 */
187 if (!mum_pid)
188 kill(SIGTERM,0);
189
190 close_all_dbps();
191 #ifdef FORKING
192 clean_children();
193 #endif
194 {
195 connection * r;
196 for(r=client_list; r;) {
197 connection * q;
198 q = r; r=r->next;
199 assert(q != r);
200 close_connection(q);
201 }
202 }
203
204 /* close connection to mother */
205 if (mum)
206 close_connection(mum);
207
208
209 #ifdef RDFSTORE_DBMS_DEBUG_MALLOC
210 debug_malloc_dump(stderr);
211 #endif
212 #ifdef RDFSTORE_DBMS_DEBUG_TIME
213 fprintf(stderr,"Timedebug: Time waiting=%f, handling=%f network=%f\n",.1,.2,.3);
214 #endif
215 if (!mum_pid)
216 unlink(pid_file);
217
218 dbms_log(L_WARN,"Shutdown completed");
219 exit(0);
220 }
221
/*
 * continue_send() - push out more of a reply that dispatch() had to queue.
 *
 * Writes the unsent tail of r->sendbuff to the client.  On EINTR or
 * EAGAIN it returns and leaves the state untouched so that the caller
 * can retry later.  On any other error, or when the write reports zero
 * bytes, the connection is flagged for closing (r->close = 1).  Once
 * everything has gone out the counters are reset, the buffer is released
 * (unless STATIC_SC_BUFF is defined) and the descriptor is removed from
 * the write set.
 */
void continue_send(connection *r)
{
	int s;

	if ((r->tosend == 0) || (r->send >= r->tosend)) {
		dbms_log(L_ERROR,"How did we get here ?");
		r->close=1; MX;
		return;
	}

	/* Clear errno first, as continue_read() does, so that the s == 0
	 * case below is never judged against a value left over from an
	 * earlier call.
	 */
	errno = 0;
	s = write(r->clientfd, r->sendbuff + r->send, r->tosend - r->send);

	if ((s <= 0) && (errno == EINTR)) {
		dbms_log(L_INFORM,"Continued send interrupted. Retry.");
		return;
	}
	if ((s < 0) && (errno == EAGAIN)) {
		dbms_log(L_WARN,"Continued send would still block");
		return;
	}
	if (s < 0) {
		dbms_log(L_ERROR,"Failed to continue write %s",strerror(errno));
		r->close=1;
		return;
	}
	if (s == 0) {
		dbms_log(L_ERROR,"Client closed the connection on us");
		r->close=1;
		return;
	}

	r->send += s;
	if (r->send < r->tosend)
		return;		/* more to do on the next writable event */

	r->send = r->tosend = 0;

#ifndef STATIC_SC_BUFF
	if (r->sendbuff)
		myfree(r->sendbuff);
	r->sendbuff = NULL;
#endif

	FD_CLR(r->clientfd,&allwset);
}
270
/*
 * dispatch() - send a command header plus up to two payloads to a peer.
 *
 * r      connection to write to
 * token  value stored in the header's token field
 * v1,v2  payloads; either may be NULL, meaning "no data"
 *
 * The header and payloads are first offered to the socket with a single
 * writev().  If everything is accepted we are done.  Otherwise the part
 * that was not written is copied into r->sendbuff, r->send / r->tosend
 * are set up to describe that buffer and the descriptor is added to the
 * write set so that continue_send() can finish the job.  The copy is
 * required because v1/v2 may point at memory that does not outlive this
 * call.
 *
 * On failure the connection is flagged for closing (r->close = 1).
 *
 * Changes relative to the earlier version:
 *  - The "older data still queued" guard fires whenever r->tosend is
 *    non-zero.  Queued data always has r->send reset to 0, so the former
 *    test (tosend != 0 && send != 0) let a second dispatch overwrite a
 *    reply that had not gone out yet.
 *  - The NULL check on the secondary buffer is reachable; an assert in
 *    front of it used to abort first.
 *  - STATIC_SC_BUFF is tested with #ifdef, as everywhere else.
 *  - Buffer pointers are byte pointers (no arithmetic on void *) and
 *    empty segments are skipped instead of being handed to memcpy().
 */
void dispatch(connection *r, int token, DBT *v1, DBT *v2)
{
	int s;

	if (r->tosend != 0) {
		dbms_log(L_WARN,"dispatch, but still older data left to send");
		goto fail_dispatch;
	}

	r->iov[0].iov_base = (void *) &(r->cmd);
	r->iov[0].iov_len = sizeof(r->cmd);

	r->iov[1].iov_base = r->v1.data =
		(v1 == NULL) ? NULL : v1->data;
	r->iov[1].iov_len = r->v1.size = r->cmd.len1 =
		(v1 == NULL) ? 0 : v1->size;

	r->iov[2].iov_base = r->v2.data =
		(v2 == NULL) ? NULL : v2->data;
	r->iov[2].iov_len = r->v2.size = r->cmd.len2 =
		(v2 == NULL) ? 0 : v2->size;

	/* Total is computed while the lengths are still in host order. */
	r->tosend = sizeof(r->cmd) + r->cmd.len1 + r->cmd.len2;
	r->send = 0;

	r->cmd.token = token;
	r->cmd.len1 = htonl(r->cmd.len1);
	r->cmd.len2 = htonl(r->cmd.len2);

#ifdef RDFSTORE_DBMS_DEBUG_TIME
	gettimeofday(&(r->cmd.stamp),NULL);
#endif
	/* BUG: we also use this with certain errors, in an attempt to
	 * inform the other side of the error. So it might well be
	 * that we block here... one day...
	 */
	errno = 0;
	s = writev(r->clientfd, r->iov, 3);

	if (s < 0) {
		if (errno == EINTR) {
			dbms_log(L_INFORM,"Initial write interrupted. Ignored");
			s = 0;
		} else if (errno == EAGAIN) {
			dbms_log(L_INFORM,"Initial write would block");
			s = 0;
		} else {
			dbms_log(L_ERROR,"Initial write error: %s",strerror(errno));
			goto fail_dispatch;
		}
	} else if ((s == 0) && (errno != EINTR)) {
		dbms_log(L_ERROR,"Intial write; client closed connection");
		goto fail_dispatch;
	}

	r->send += s;
	if (r->send == r->tosend) {
		/* Everything went out in one go. */
		r->send = 0;
		r->tosend = 0;
#ifndef STATIC_SC_BUFF
		if (r->sendbuff)
			myfree(r->sendbuff);
		r->sendbuff = NULL;
#endif
	} else {
		int at, i;
		unsigned char *p;

		/* create a buffer for the remaining data */
#ifdef STATIC_SC_BUFF
		if (r->tosend - r->send > MAX_SC_PAYLOAD) {
			dbms_log(L_ERROR,
				"Secondary write buffer of %d>%d bytes to big",
				r->tosend - r->send,
				MAX_SC_PAYLOAD
			);
			goto fail_dispatch;
		}
#else
		assert(r->tosend > r->send);
		r->sendbuff = mymalloc(r->tosend - r->send);
#endif
		if (r->sendbuff == NULL) {
			dbms_log(L_ERROR,
				"Out of memory whilst creating a secondary write buffer of %d bytes",
				r->tosend - r->send
			);
			goto fail_dispatch;
		}

		/* `at` is the offset of segment i within the whole message;
		 * r->send is how much of the message has been written.
		 */
		for (p = (unsigned char *) r->sendbuff, i = 0, at = 0; i < 3; i++) {
			if (r->iov[i].iov_len == 0)
				continue;	/* nothing to copy, base may be NULL */

			if (at > r->send) {
				/* segment not touched by the write: copy all */
				memcpy(p, r->iov[i].iov_base, r->iov[i].iov_len);
				p += r->iov[i].iov_len;
			} else if (at + r->iov[i].iov_len > r->send) {
				/* write stopped inside (or right at the start
				 * of) this segment: copy the unsent tail
				 */
				int offset = r->send - at;
				int len = r->iov[i].iov_len - offset;
				memcpy(p, (unsigned char *) r->iov[i].iov_base + offset, len);
				p += len;
			}
			/* else: segment was written completely, skip */

			at += r->iov[i].iov_len;
		}

		/* redo our bookkeeping, as we have moved it all in
		 * just one contiguous buffer. We had to copy, as the
		 * v1 and v2's probably just contained pointers to either
		 * a static error string or a memmap file from the DB
		 * interface; neither of which is going to live long.
		 */
		r->tosend -= s;
		r->send = 0;

		FD_SET(r->clientfd,&allwset);
	}

	return;

fail_dispatch:
	dbms_log(L_WARN,"dispatch failed");
	r->close=1;MX;
	return;
}
403
/*
 * do_msg() - send a NUL-terminated text message to a peer.
 *
 * The string, including its terminating NUL, is passed to dispatch() as
 * the first payload; the token is OR-ed with F_SERVER_SIDE.  `msg` must
 * not be NULL.
 */
void do_msg(connection *r, int token, char *msg)
{
	DBT payload;

	payload.data = msg;
	payload.size = strlen(msg) + 1;

	dispatch(r, token | F_SERVER_SIDE, &payload, NULL);
}
413
414 connection *
handle_new_local_connection(int clientfd,int type)415 handle_new_local_connection(
416 int clientfd, int type
417 )
418 {
419 struct sockaddr_in none;
420 none.sin_addr.s_addr = INADDR_NONE;
421 return handle_new_connection(clientfd, type, none);
422 }
423
424 connection *
handle_new_connection(int clientfd,int type,struct sockaddr_in addr)425 handle_new_connection(
426 int clientfd, int type, struct sockaddr_in addr
427 )
428 {
429 connection * new;
430 int v;
431
432 if (client_counter > HARD_MAX_CLIENTS) {
433 dbms_log(L_ERROR,"Max number of clients reached (hard max), completely ignoring");
434 close(clientfd);
435 return NULL;
436 };
437
438 if (client_counter >= max_clients) {
439 connection tmp;
440 tmp.clientfd = clientfd;
441 tmp.close = tmp.send = tmp.tosend = tmp.gotten = tmp.toget = 0;
442 reply_log(&tmp,L_ERROR,"Too many connections fd=%d",clientfd);
443 close(clientfd);
444 return NULL;
445 };
446
447 if ( (v=fcntl( clientfd, F_GETFL, 0)<0) || (fcntl(clientfd, F_SETFL,v | O_NONBLOCK)<0) ) {
448 dbms_log(L_ERROR,"Could not make socket non blocking: %s",strerror(errno));
449 close(clientfd);
450 return NULL;
451 };
452
453 FD_SET(clientfd,&allrset);
454 FD_SET(clientfd,&alleset);
455
456 /* XXX we could try to fill holes in the bit array at this point;
457 * and get max fd as low as possible. But it seems that the OS
458 * already keeps the FDs as low as it can (except for OpenBSD ??)
459 */
460 if ( clientfd > maxfd )
461 maxfd=clientfd;
462
463 /* if still space, use, otherwise tack another
464 * one to the end..
465 */
466 #if STATIC_BUFF
467 if (free_connection_list != NULL) {
468 new = free_connection_list;
469 free_connection_list = new->next;
470 free_connection_len --;
471 } else {
472 assert(free_connection_len == 0);
473 if (free_connection_keep < free_connection_keep_max/2)
474 free_connection_keep *= 2;
475 else
476 if (free_connection_keep < free_connection_keep_max)
477 free_connection_keep += 2;
478 #endif
479 if ((new = (connection *) mymalloc(sizeof(connection))) == NULL )
480 {
481 dbms_log(L_ERROR,"Could not claim enough memory");
482 close(clientfd);
483 return NULL;
484 };
485 #if STATIC_BUFF
486 }
487 #endif
488 bzero(new,sizeof(connection));
489 new->next = client_list;
490 client_list = new;
491
492 bzero(new,sizeof(new));
493
494 /* Copy the needed information. */
495 new->clientfd = clientfd;
496
497 new->sendbuff = NULL;
498 #ifdef STATIC_SC_BUFF
499 if ((type != C_CHILD))
500 new->sendbuff = (unsigned char *) mymalloc(MAX_SC_PAYLOAD);
501 #endif
502 new->recbuff = NULL;
503 #ifdef STATIC_CS_BUFF
504 if ((type != C_CHILD))
505 new->recbuff = (unsigned char *) mymalloc(MAX_CS_PAYLOAD);
506 #endif
507
508 new->dbp = NULL;
509 new->type = type;
510
511 #ifdef TIMEOUT
512 new->start = time(NULL);
513 new->last = time(NULL);
514 #endif
515 new->address = addr;
516 new->close = 0;
517 new->send = new->tosend = new->gotten = new->toget = 0;
518
519 client_counter ++;
520 return new;
521 }
522
/*
 * final_read() - a complete request has arrived: hand it over.
 *
 * Resets the receive counters, passes the connection to parse_request()
 * and afterwards releases the receive buffer, unless the build uses a
 * static one (STATIC_CS_BUFF).
 */
void final_read(connection *r)
{
	r->gotten = 0;
	r->toget = 0;

	parse_request(r);

#ifndef STATIC_CS_BUFF
	if (r->recbuff != NULL) {
		myfree(r->recbuff);
		r->recbuff = NULL;
	}
#endif
}
536
537
/*
 * initial_read() - start reading a new request from a client.
 *
 * The command header is first peeked at (MSG_PEEK) until it is available
 * in full.  Its two length fields are then converted to host order and
 * checked, a receive buffer is set up for the payloads, and header plus
 * as much payload as is available are consumed with one readv().  When
 * the payload is complete final_read() is called; otherwise r->gotten and
 * r->toget describe what is still missing, for continue_read() to pick
 * up.
 *
 * On read errors, on a closed peer, and on oversized or unallocatable
 * requests the connection is flagged for closing (r->close = 1).
 *
 * Changes relative to the earlier version:
 *  - The second size check tests v2.size.  It used to test v1.size a
 *    second time, which left the second length field unchecked.
 *  - errno is saved right after recv(), so that the debug log call in
 *    between cannot disturb the classification of the result.
 *  - STATIC_CS_BUFF is tested with #ifdef, as everywhere else.
 */
void initial_read(connection *r)
{
	struct header skip_cmd;
	int n = 0;
	int err;

	/* we peek, until we have the full command buffer, and
	 * only then do we give it any attention. This saves a
	 * few syscalls.
	 */
	errno = 0;
	n = recv(r->clientfd, &(r->cmd), sizeof(r->cmd), MSG_PEEK);
	err = errno;

	if (n < 0)
		dbms_log(L_DEBUG,"Read fd=%d n=%d errno=%d/%s",r->clientfd,n,err,strerror(err));

	if ((n < 0) && (err == EAGAIN)) {
		dbms_log(L_ERROR,"Again read %s on %d",strerror(err),r->clientfd);
		return;
	}
	if ((n <= 0) && (err == EINTR)) {
		dbms_log(L_ERROR,"Interruped read %s",strerror(err));
		return;
	}
	if (n < 0) {
		if (err != ECONNRESET)
			dbms_log(L_ERROR,"Read error %s",strerror(err));
		r->close=1;MX;
		return;
	}
	if (n == 0) {
		dbms_log(L_INFORM,"Client side close on read %d/%s (fd=%d)",
			err,strerror(err),r->clientfd);
		r->close=1;MX;
		return;
	}
	if (n != sizeof(r->cmd)) {
		/* lets log this, as we want to get an idea if this actually happens .
		 * seems not, BSD, on high load, SCO.
		 */
		dbms_log(L_WARN,"Still waitingn for those 5 bytes, gotten LESS");
		return;
	}

#ifdef RDFSTORE_DBMS_DEBUG_TIME
	{
		float s,m;
		struct timeval t;
		gettimeofday(&t,NULL);
		s=t.tv_sec - r->cmd.stamp.tv_sec;
		m=t.tv_usec - r->cmd.stamp.tv_usec;
		MDEBUG((stderr,"Time taken %f seconds\n", s + m / 1000000.0 ));
		total_time += s + m / 1000000.0;
	}
#endif

	/* check if this is ok ?, if not, do not
	 * touch it with a stick.
	 */
#if 0
	if (( (r->cmd.token) & ~MASK_TOKEN ) != F_CLIENT_SIDE ) {
		reply_log(r,L_ERROR,"Not a client side token..");
		r->close=1; MX;
		return;
	};
#endif
	r->cmd.token &= MASK_TOKEN;

	/* set up a single buffer to get the remainder of this
	 * message
	 */
	r->v1.size = r->cmd.len1 = ntohl(r->cmd.len1);
	r->v2.size = r->cmd.len2 = ntohl(r->cmd.len2);

	/* silly endian check; it also bounds the sum used below */
	if (r->v1.size > 2*1024*1024) {
		reply_log(r,L_ERROR,"Size one to big");
		r->close=1; MX;
		return;
	}
	if (r->v2.size > 2*1024*1024) {
		reply_log(r,L_ERROR,"Size two to big");
		r->close=1; MX;
		return;
	}

#ifndef STATIC_CS_BUFF
	if (r->recbuff)
		myfree(r->recbuff);
	r->recbuff = NULL;
#endif
	r->v2.data = r->v1.data = NULL;
	r->toget = r->gotten = 0;

	if (r->cmd.len1 + r->cmd.len2 > 0) {
#ifdef STATIC_CS_BUFF
		if (r->cmd.len1 + r->cmd.len2 > MAX_CS_PAYLOAD) {
			reply_log(r,L_ERROR,
				"RQ string(s) to big %d>%d bytes",
				r->cmd.len1 + r->cmd.len2,
				MAX_CS_PAYLOAD
			);
			r->close=1; MX;
			return;
		}
#else
		r->recbuff = mymalloc(r->cmd.len1 + r->cmd.len2);
#endif
		if (r->recbuff == NULL) {
			reply_log(r,L_ERROR,
				"No Memrory for RQ string(s) %d bytes",
				r->cmd.len1 + r->cmd.len2);
			r->close=1; MX;
			return;
		}
		/* both payloads share one buffer, v2 right behind v1 */
		r->v1.data = r->recbuff;
		r->v2.data = r->recbuff + r->cmd.len1;
		r->toget = r->cmd.len1 + r->cmd.len2;
	}

	/* The header was already stored in r->cmd by the peek (and has
	 * been converted since), so it is read into a scratch copy here
	 * merely to take it off the socket.
	 */
	r->iov[0].iov_base = (void *) &skip_cmd;
	r->iov[0].iov_len = sizeof(r->cmd);

	r->iov[1].iov_base = r->recbuff;
	r->iov[1].iov_len = r->toget;

	for (;;) {
		errno = 0;
		n = readv(r->clientfd, r->iov, 2);

		if ((n <= 0) && (errno == EINTR)) {
			dbms_log(L_INFORM,"Interrupted readv. Ignored");
			continue;
		}
		if ((n < 0) && (errno == EAGAIN)) {
			dbms_log(L_ERROR,"Would block. Even though we peeked at the cmd string. Retry");
			continue;
		}
		break;
	}

	if (n < 0) {
		dbms_log(L_ERROR,"Error while reading remainder: (1 %s",strerror(errno));
		r->close=1; MX;
		return;
	}
	if (n == 0) {
		dbms_log(L_INFORM,"Read, but client closed");
		r->close=1; MX;
		return;
	}

	assert(n >= sizeof(r->cmd));

	/* only the payload bytes count towards r->gotten */
	n -= sizeof(r->cmd);
	r->gotten += n;

	if (r->gotten >= r->toget)
		final_read(r);

	return;
}
705
706 void
continue_read(connection * r)707 continue_read( connection * r ) {
708 /* fill up the two buffers.. */
709 int s;
710
711 dbms_log(L_VERBOSE,"continued read for %d..",r->toget);
712 errno = 0;
713 s = read( r->clientfd, r->gotten + r->v1.data, r->toget - r->gotten);
714
715 if ((s<=0) && (errno == EINTR)) {
716 dbms_log(L_INFORM,"Interrupted continued read. Ignored");
717 return;
718 }
719 else
720 if (((s<0) && (errno == EAGAIN)) ) {
721 dbms_log(L_ERROR,"continued read, but nothing there");
722 return;
723 }
724 else
725 if (s<0) {
726 dbms_log(L_ERROR,"Error while reading remainder: (2 %s",strerror(errno));
727 r->close=1; MX;
728 return;
729 }
730 else
731 if (s==0) {
732 dbms_log(L_ERROR,"continued read, but client closed connection (%d/%s)",
733 errno,strerror(errno));
734 r->close=1; MX;
735 return;
736 };
737
738 r->gotten +=s;
739
740 if (r->gotten >= r->toget)
741 final_read(r);
742
743 return;
744 }
745
746
747