1 /*
2 * Copyright (c) 2000-2006 Alberto Reggiori <areggiori@webweaving.org>
3 * Dirk-Willem van Gulik <dirkx@webweaving.org>
4 *
5 * NOTICE
6 *
7 * This product is distributed under a BSD/ASF like license as described in the 'LICENSE'
8 * file you should have received together with this source code. If you did not get a
9 * a copy of such a license agreement you can pick up one at:
10 *
11 * http://rdfstore.sourceforge.net/LICENSE
12 *
13 * Perl 'tie' interface to a socket connection. Possibly to
14 * the a server which runs a thin feneer to the Berkely DB.
15 *
16 */
17
18 #include "dbms.h"
19 #include "dbms_comms.h"
20
21 #include <stdio.h>
22
23 typedef dbms *DBMS;
24
25 static char _erm[256] = "\0";
26
27 static char *dbms_error[] = {
28 /* E_UNDEF 1000 */
29 "Not defined",
30 /* E_NONNUL 1001 */
31 "Undefined Error",
32 /* E_FULLREAD 1002 */
33 "Could not receive all bytes from DBMS server",
34 /* E_FULLWRITE 1003 */
35 "Could not send all bytes to DBMS server",
36 /* E_CLOSE 1004 */
37 "DBMS server closed the connection",
38 /* E_HOSTNAME 1005 */
39 "Could not find/resolve DBMS servers hostname",
40 /* E_VERSION 1006 */
41 "DBMS Version not supported",
42 /* E_PROTO 1007 */
43 "DBMS Reply not understood",
44 /* E_ERROR 1008 */
45 "DBMS Server side error"
46 /* E_NOMEM 1009 */
47 "Out of memory",
48 /* E_RETRY 1010 */
49 "Failed after several retries",
50 /* E_NOPE 1011 */
51 "No such database",
52 /* E_XXX 1012 */
53 "No such database",
54 /* E_TOOBIG 1013 */
55 "Packed bigger than static",
56 /* E_BUG 1014 */
57 "Conceptual error"
58 };
59
60 static dbms_error_t reconnect(dbms * me);
61 static dbms_error_t reselect(dbms * me);
62 static dbms_error_t getpack(dbms * me, unsigned long len, DBT * r);
63 static dbms_error_t i_comms(dbms * me, int token, int *retval, DBT * v1, DBT * v2, DBT * r1, DBT * r2);
64
65 static void
mark_dbms_error(dbms * me,char * msg,dbms_error_t erx)66 mark_dbms_error(dbms * me, char *msg, dbms_error_t erx)
67 {
68 bzero(me->err, sizeof(me->err));
69 if (erx == E_ERROR) {
70 snprintf(me->err, sizeof(me->err),
71 "DBMS Error %s: %s", msg,
72 errno == 0 ? "" : (strlen(strerror(errno)) <= sizeof(me->err)) ? strerror(errno) : "");
73 } else if ((erx > E_UNDEF) && (erx <= E_BUG)) {
74 strncat(me->err, msg, sizeof(me->err) - 1);
75 strncat(me->err, ": ", sizeof(me->err) - 1);
76 strncat(me->err, dbms_error[erx - E_UNDEF], sizeof(me->err) - 1);
77 } else {
78 strncat(me->err, msg, sizeof(me->err) - 1);
79 strncat(me->err, ": ", sizeof(me->err) - 1);
80 if (strlen(strerror(erx)) <= sizeof(me->err) - strlen(me->err) - 1)
81 strncat(me->err, strerror(erx), sizeof(me->err) - 1);
82 };
83
84 if (strlen(me->err) <= sizeof(_erm))
85 strcpy(_erm, me->err);
86 };
87
88 static void
set_dbms_error(dbms * me,char * msg,dbms_error_t erx)89 set_dbms_error(dbms * me, char *msg, dbms_error_t erx)
90 {
91 mark_dbms_error(me, msg, erx);
92 if (me->error)
93 (*(me->error)) (me->err, erx);
94 };
95
96 extern char *
dbms_get_error(dbms * me)97 dbms_get_error(dbms * me)
98 {
99 if (me == NULL)
100 return _erm;
101 else
102 return me->err;
103 }
104
105 static void
_warning(dbms_cause_t event,int count)106 _warning(dbms_cause_t event, int count)
107 {
108 /*
109 * Note: _erm use NOT thread safe. Should change function signature
110 * to pass dbms * pointer.
111 */
112 if (event == DBMS_EVENT_RECONNECT)
113 fprintf(stderr, "DBMS Reconnecting %i (%s)...\n", count, _erm);
114 else if (event == DBMS_EVENT_WAITING)
115 fprintf(stderr, "DBMS Waiting %i...\n", count);
116 else
117 fprintf(stderr, "DBMS Unknown event (%s)\n", _erm);
118 }
119
120 static int cnt = 0;
121 static FILE *logfile = NULL;
122
123 static void
_tlog(char * fmt,...)124 _tlog(char *fmt,...)
125 {
126 if (!logfile)
127 return;
128 {
129 char tmp[1024];
130 char buf[ 128 * 1024 ];
131 va_list ap;
132 time_t tt;
133 time(&tt);
134 snprintf(tmp, sizeof(tmp), "%04d:%20s %s", cnt, asctime(gmtime(&tt)), fmt);
135
136 va_start(ap, fmt);
137 vsnprintf(buf,sizeof(buf)-1, tmp, ap);
138 va_end(ap);
139
140 fprintf(logfile,"%s\n",buf);
141 fflush(logfile);
142 }
143 }
144
_token2name(int x)145 static char * _token2name(int x) {
146 x = x & MASK_TOKEN;
147 #define CC(x) case x: return #x; break;
148 switch (x) {
149 CC(TOKEN_ERROR);
150 CC(TOKEN_FETCH );
151 CC(TOKEN_STORE );
152 CC(TOKEN_DELETE );
153 CC(TOKEN_NEXTKEY );
154 CC(TOKEN_FIRSTKEY);
155 CC(TOKEN_FROM );
156 CC(TOKEN_EXISTS );
157 CC(TOKEN_SYNC );
158 CC(TOKEN_INIT );
159 CC(TOKEN_CLOSE );
160 CC(TOKEN_CLEAR );
161 CC(TOKEN_FDPASS );
162 CC(TOKEN_PING );
163 CC(TOKEN_INC );
164 CC(TOKEN_LIST );
165 CC(TOKEN_DEC );
166 CC(TOKEN_PACKINC );
167 CC(TOKEN_PACKDEC );
168 CC(TOKEN_DROP );
169 default: return "TOKEN_UNKNOWN";
170 };
171 return "XXX";
172 }
173
174 static char *
_hex(dbms * me,int len,void * str)175 _hex(dbms * me, int len, void *str)
176 {
177 size_t i;
178 char *r = NULL;
179
180 if (len == 0) {
181 r = (char *) (*me->malloc)( strlen("[0]\"\"")+1 );
182 strcpy( r, "[0]\"\"" );
183 return r;
184 };
185
186 if (str == NULL) {
187 r = (char *) (*me->malloc)( strlen("<null>")+1 );
188 strcpy( r, "<null>" );
189 return r;
190 };
191
192 if (len > 50000) {
193 r = (char *) (*me->malloc)( strlen("<toolong>")+1 );
194 strcpy( r, "<toolong>" );
195 return r;
196 };
197
198 r = (char *) (*me->malloc)(3*len + 100);
199 if (r == NULL) {
200 r = (char *) (*me->malloc)( strlen("<outofmem>")+1 );
201 strcpy( r, "<outofmem>" );
202 return r;
203 };
204
205 sprintf(r, "[%06d]\"", len);
206
207 for (i = 0; i < len; i++) {
208 char p[3];
209 unsigned int c = ((unsigned char *) str)[i];
210
211 if (c && isprint(c) && (c != '%')) {
212 p[0] = c; p[1] = '\0';
213 } else {
214 sprintf(p,"%%%02x", c);
215 };
216 strcat(r,p);
217 };
218 strcat(r,"\"");
219 return r;
220 }
221
222 extern dbms *
dbms_connect(char * name,char * host,int port,dbms_xsmode_t mode,void * (* _my_malloc)(size_t s),void (* _my_free)(void * adr),void (* _my_report)(dbms_cause_t event,int count),void (* _my_error)(char * err,int erx),int bt_compare_fcn_type)223 dbms_connect(
224 char *name, char *host, int port,
225 dbms_xsmode_t mode,
226 void *(*_my_malloc) (size_t s),
227 void (*_my_free) (void *adr),
228 void (*_my_report) (dbms_cause_t event, int count),
229 void (*_my_error) (char *err, int erx),
230 int bt_compare_fcn_type
231 )
232 {
233 dbms *me;
234 int err = 0;
235
236 if ((name == NULL) || (*name == '\0'))
237 return NULL;
238
239 if ((host == NULL) || (*host == '\0'))
240 host = DBMS_HOST;
241
242 if (port == 0)
243 port = DBMS_PORT;
244
245
246 if (_my_malloc == NULL)
247 _my_malloc = &malloc;
248
249 if (_my_free == NULL)
250 _my_free = &free;
251
252 if (_my_report == NULL)
253 _my_report = &_warning;
254
255 me = (dbms *) (*_my_malloc) (sizeof(dbms));
256 if (me == NULL)
257 return NULL; /* rely on errno */
258
259 me->bt_compare_fcn_type = bt_compare_fcn_type;
260
261 me->malloc = _my_malloc;
262 me->free = _my_free;
263 me->callback = _my_report;
264 me->error = _my_error;
265 bzero(me->err, sizeof(me->err));
266
267 switch (mode) {
268 case DBMS_XSMODE_DEFAULT:
269 mode = DBMS_MODE; /* default */
270 break;
271 ;;
272 case DBMS_XSMODE_RDONLY:
273 break;
274 ;;
275 case DBMS_XSMODE_RDWR:
276 break;
277 ;;
278 case DBMS_XSMODE_CREAT:
279 break;
280 ;;
281 case DBMS_XSMODE_DROP:
282 break;
283 ;;
284 default:
285 {
286 char _buff[1024];
287 snprintf(_buff, sizeof(_buff), "Unknown DBMS Access type (%d)", (int) mode);
288 set_dbms_error(me, _buff, 0);
289 }
290 (*(me->free)) (me);
291 return NULL;
292 break;
293 }
294
295 me->sockfd = -1;
296 me->mode = (u_long) mode;
297 me->port = port;
298 me->name = (char *) (*me->malloc)( strlen(name)+1 );
299 if( me->name == NULL ) {
300 (*(me->free)) (me);
301 return NULL;
302 };
303 strcpy( me->name, name );
304 me->host = (char *) (*me->malloc)( strlen(host)+1 );
305 if( me->host == NULL ) {
306 (*(me->free)) (me->name);
307 (*(me->free)) (me);
308 return NULL;
309 };
310 strcpy( me->host, host );
311
312 /*
313 * quick and dirty hack to check for IP vs FQHN and fall through when
314 * in doubt to resolving.
315 */
316 me->addr = INADDR_NONE;
317 {
318 int i = 0;
319 for (; me->host[i] != '\0'; i++)
320 if (!isdigit((int) (me->host[i])) && me->host[i] != '.')
321 break;
322
323 if (me->host[i] == '\0')
324 me->addr = inet_addr(host);
325 }
326
327 if (me->addr == INADDR_NONE) {
328 struct hostent *hp;
329 if ((hp = gethostbyname(me->host)) == NULL) {
330 set_dbms_error(me, "Hostname lookup failed", errno);
331 (*(me->free)) (me->name);
332 (*(me->free)) (me->host);
333 (*(me->free)) (me);
334 return NULL;
335 };
336 /*
337 * copy the address, rather than the pointer as we need it
338 * later. It is an unsigned long.
339 */
340 me->addr = *(u_long *) hp->h_addr;
341 };
342
343 if ((err = reconnect(me))) {
344 set_dbms_error(me, "Connection failed", err);
345 (*(me->free)) (me->name);
346 (*(me->free)) (me->host);
347 (*(me->free)) (me);
348 return NULL;
349 };
350
351 if ((err = reselect(me))) {
352 set_dbms_error(me, "Selection failed", err);
353 (*(me->free)) (me->name);
354 (*(me->free)) (me->host);
355 (*(me->free)) (me);
356 return NULL;
357 };
358
359 {
360 char * file = getenv("DBMS_LOG");
361 cnt++;
362 if (file && (logfile == NULL)) {
363 if ((logfile = fopen(file, "a"))) {
364 fprintf(stderr, "Logging to %s\n", file);
365 } else {
366 fprintf(stderr,"Failure to log to %s: %s\n",file,strerror(errno));
367 };
368 }
369 if (logfile)
370 _tlog("start %d %s",cnt,name);
371 }
372 return me;
373 }
374
375
376 static dbms_error_t
reconnect(dbms * me)377 reconnect(
378 dbms * me
379 )
380 {
381 struct sockaddr_in server;
382 int one = 1;
383 int csnd_len, csnd, sndbuf = 16 * 1024;
384 int e = 0;
385
386 /*
387 * we could moan if me->sockfd is still set.. or do a silent close,
388 * just in case ?
389 */
390 if (me->sockfd >= 0) {
391 shutdown(me->sockfd, 2);
392 close(me->sockfd);
393 }
394 if ((me->sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
395 set_dbms_error(me, "socket", errno);
396 return E_ERROR;
397 }
398 if (0) {
399 /*
400 * allow for re-use; to avoid that we have to wait for a fair
401 * amounth of time after disasterous crashes,
402 */
403 if ((setsockopt(me->sockfd, SOL_SOCKET, SO_REUSEADDR,
404 (const char *) &one, sizeof(one))) < 0) {
405 set_dbms_error(me, "setsockopt(reuse)", errno);
406 me->sockfd = -1;
407 close(me->sockfd);
408 return E_ERROR;
409 };
410 }
411 csnd_len = sizeof(csnd);
412 if (getsockopt(me->sockfd, SOL_SOCKET, SO_SNDBUF,
413 (void *) &csnd, (void *) &csnd_len) < 0) {
414 set_dbms_error(me, "getsockopt(sndbuff)", errno);
415 me->sockfd = -1;
416 close(me->sockfd);
417 return E_ERROR;
418 };
419 assert(csnd_len == sizeof(csnd));
420
421 /*
422 * only set when smaller
423 */
424 if ((csnd < sndbuf) &&
425 (setsockopt(me->sockfd, SOL_SOCKET, SO_SNDBUF,
426 (const void *) &sndbuf, sizeof(sndbuf)) < 0)) {
427 set_dbms_error(me, "setsockopt(sndbuff)", errno);
428 me->sockfd = -1;
429 close(me->sockfd);
430 return E_ERROR;
431 };
432
433 /*
434 * Discard any data still in our send buffer whe closing and do not
435 * linger around for any thing from the server.
436 */
437 {
438 struct linger l = {1, 0}; /* Linger On, Lingertime 0 */
439 if ((setsockopt(me->sockfd, SOL_SOCKET, SO_LINGER,
440 (const char *) &l, sizeof(l))) < 0) {
441 set_dbms_error(me, "setsockopt(disble-nagle)", errno);
442 me->sockfd = -1;
443 close(me->sockfd);
444 return E_ERROR;
445 };
446 }
447 /*
448 * disable Nagle, for speed
449 */
450 if ((setsockopt(me->sockfd, IPPROTO_TCP, TCP_NODELAY,
451 (const char *) &one, sizeof(one))) < 0) {
452 set_dbms_error(me, "setsockopt(disble-nagle)", errno);
453 me->sockfd = -1;
454 close(me->sockfd);
455 return E_ERROR;
456 };
457
458 /*
459 * larger buffer; as we know that we can initially slide open the
460 * window bigger.
461 */
462
463 while (e++ < 4) {
464 bzero((char *) &server, sizeof(server));
465
466 server.sin_family = AF_INET;
467 server.sin_addr.s_addr = me->addr;
468 server.sin_port = htons(me->port);
469
470 if (connect(me->sockfd, (struct sockaddr *) & server, sizeof(server)) == 0)
471 return 0;
472 if (errno != EADDRINUSE)
473 break;
474
475 usleep(e * e * 100 * 1000); /* wait 0.1, 0.4, 0.9, 2.5
476 * second */
477 }
478 mark_dbms_error(me, "connect()", errno);
479 me->sockfd = -1;
480 return E_ERROR;
481 }
482
483 dbms_error_t
dbms_disconnect(dbms * me)484 dbms_disconnect(
485 dbms * me
486 )
487 {
488 int retval;
489
490 assert(me);
491 assert(me->sockfd >= 0);
492
493 dbms_comms(me, TOKEN_CLOSE, &retval, NULL, NULL, NULL, NULL);
494 #ifdef TPS
495 if (getenv("GATEWAY_INTERFACE") == NULL)
496 fprintf(stderr, "Performance: %ld # %.2f mSec/trans = %.1f\n",
497 ttps, ttime / ttps / 1000.0,
498 1000000.0 * ttps / ttime
499 );
500 #endif
501 shutdown(me->sockfd, 2);
502 close(me->sockfd);
503 (*(me->free)) (me->name);
504 (*(me->free)) (me->host);
505 (*(me->free)) (me);
506 if (logfile) fclose(logfile);
507
508 return 0;
509 }
510
511 static dbms_error_t
getpack(dbms * me,unsigned long len,DBT * r)512 getpack(
513 dbms * me,
514 unsigned long len,
515 DBT * r
516 )
517 {
518 unsigned int gotten;
519 char *at;
520
521 r->size = 0;
522 r->data = NULL;
523
524 if (len == 0)
525 return 0;
526
527 if (r == NULL)
528 return E_BUG;
529
530 #ifdef STATIC_SC_BUFF
531 if (len > MAX_SC_PAYLOAD)
532 return E_TOOBIG;
533 #endif
534 r->size = 0;
535 r->data = (char *) (*me->malloc) (len);
536
537 if (r->data == 0)
538 return E_NOMEM;
539
540 /* should block ? */
541
542 for (at = r->data, gotten = 0; gotten < len;) {
543 ssize_t l;
544 l = recv(me->sockfd, at, len - gotten, 0);
545 if (l < 0) {
546 set_dbms_error(me, "packet-recv()", errno);
547 (*me->free) (r->data);
548 r->data = NULL;
549 return E_ERROR;
550 } else if (l == 0) {
551 (*me->free) (r->data);
552 r->data = NULL;
553 return E_CLOSE;
554 };
555 gotten += l, at += l;
556 };
557
558 r->size = len;
559 return 0;
560 };
561
562 static
563 dbms_error_t
i_comms(dbms * me,int token,int * retval,DBT * v1,DBT * v2,DBT * r1,DBT * r2)564 i_comms(
565 dbms * me,
566 int token,
567 int *retval,
568 DBT * v1,
569 DBT * v2,
570 DBT * r1,
571 DBT * r2
572 )
573 {
574 int err = 0;
575 DBT rr1, rr2;
576 struct header cmd;
577 struct iovec iov[3];
578 struct msghdr msg;
579 size_t s;
580
581 if (retval)
582 *retval = -1;
583
584 rr1.data = rr2.data = NULL;
585
586 cmd.token = token | F_CLIENT_SIDE;
587
588 cmd.len1 = htonl((v1 == NULL) ? 0 : v1->size);
589 cmd.len2 = htonl((v2 == NULL) ? 0 : v2->size);
590
591 iov[0].iov_base = (char *) &cmd;
592 iov[0].iov_len = sizeof(cmd);
593
594 iov[1].iov_base = (v1 == NULL) ? NULL : v1->data;
595 iov[1].iov_len = (v1 == NULL) ? 0 : v1->size;
596
597 iov[2].iov_base = (v2 == NULL) ? NULL : v2->data;
598 iov[2].iov_len = (v2 == NULL) ? 0 : v2->size;
599
600 #ifdef STATIC_CS_BUFF
601 if (iov[0].iov_len + iov[1].iov_len + iov[2].iov_len > MAX_CS_PAYLOAD)
602 return E_TOOBIG;
603 #endif
604 msg.msg_name = NULL;
605 msg.msg_namelen = 0;
606 msg.msg_iov = iov;
607 msg.msg_iovlen = 3;
608 /* temporal fix to make CYGWN compile the basic thing - need better solution */
609 #ifndef RDFSTORE_PLATFORM_CYGWIN
610 msg.msg_control = NULL;
611 msg.msg_controllen = 0;
612 msg.msg_flags = 0;
613 #endif
614 s = sendmsg(me->sockfd, &msg, 0);
615
616 if (s == 0) {
617 err = E_CLOSE;
618 goto retry_com;
619 } else if (s < 0) {
620 mark_dbms_error(me, "sendmsg()", errno);
621 err = E_ERROR;
622 goto retry_com;
623 } else if (s != iov[0].iov_len + iov[1].iov_len + iov[2].iov_len) {
624 err = E_FULLWRITE;
625 goto exit_com;
626 };
627
628 s = recv(me->sockfd, &cmd, sizeof(cmd), 0);
629
630 if (s == 0) {
631 err = E_CLOSE;
632 goto retry_com;
633 } else if (s < 0) {
634 mark_dbms_error(me, "header-recv()", errno);
635 err = E_ERROR;
636 goto retry_com;
637 } else if (s != sizeof(cmd)) {
638 err = E_FULLREAD;
639 goto exit_com;
640 };
641
642 cmd.len1 = ntohl(cmd.len1);
643 cmd.len2 = ntohl(cmd.len2);
644
645 rr2.data = rr1.data = NULL;
646 if ((err = getpack(me, cmd.len1, r1 ? r1 : &rr1)) != 0)
647 goto retry_com;
648
649 if ((err = getpack(me, cmd.len2, r2 ? r2 : &rr2)) != 0)
650 goto retry_com;
651
652 if ((cmd.token & MASK_TOKEN) == TOKEN_ERROR) {
653 char *d = NULL;
654 int l = 0;
655 if (r1) {
656 l = r1->size;
657 d = r1->data;
658 } else {
659 l = rr1.size;
660 d = rr1.data;
661 };
662 errno = 0;
663 if ((d) && (l > 0)) {
664 d[l] = '\0';
665 } else {
666 d = "DBMS side errror, no cause reported";
667 };
668 err = E_ERROR;
669 errno = 0;
670 set_dbms_error(me, d, err);
671 goto exit_com;
672 } else if (((cmd.token & MASK_TOKEN) != token) ||
673 ((cmd.token | F_SERVER_SIDE) == 0)) {
674 err = E_PROTO;
675 goto exit_com;
676 };
677
678 if ((rr1.data != NULL) && (rr1.size)) {
679 (*me->free) (rr1.data);
680 rr1.size = 0;
681 };
682
683 if ((rr2.data != NULL) && (rr2.size)) {
684 (*me->free) (rr2.data);
685 rr1.size = 0;
686 };
687
688 if ((cmd.token & MASK_STATUS) == F_FOUND) {
689 if (retval)
690 *retval = 0;
691 } else {
692 if (retval)
693 *retval = 1;
694 if (r1 != NULL) {
695 if ((r1->size) && (r1->size))
696 (*me->free) (r1->data);
697 r1->data = NULL;
698 r1->size = 0;
699 };
700 if (r2 != NULL) {
701 if ((r2->size) && (r2->size))
702 (*me->free) (r2->data);
703 r2->data = NULL;
704 r2->size = 0;
705 };
706 };
707
708 err = 0;
709 goto done_com;
710
711 retry_com:
712 exit_com:
713 if ((r1 != NULL) && (r1->data != NULL) && (r1->size != 0)) {
714 (*me->free) (r1->data);
715 r1->size = 0;
716 };
717
718 if ((r2 != NULL) && (r2->data != NULL) && (r2->size != 0)) {
719 (*me->free) (r2->data);
720 r2->size = 0;
721 };
722
723 if ((rr1.data != NULL) && (rr1.size)) {
724 (*me->free) (rr1.data);
725 rr1.size = 0;
726 };
727
728 if ((rr2.data != NULL) && (rr1.size)) {
729 (*me->free) (rr2.data);
730 rr2.size = 0;
731 };
732
733 done_com:
734
735 return err;
736 }
737
738
739 static dbms_error_t
reselect(dbms * me)740 reselect(dbms * me)
741 {
742 DBT r1, r2, v1;
743 int retval;
744 u_long buff[3];
745 int err = 0;
746 u_long proto = DBMS_PROTO;
747 u_long mode = me->mode;
748 char *name = me->name;
749 u_long bt_compare_fcn_type = me->bt_compare_fcn_type;
750
751 assert(sizeof(buff) == 12); /* really 4 bytes on the network ? */
752
753 buff[0] = htonl(proto);
754 buff[1] = htonl(mode);
755 buff[2] = htonl(bt_compare_fcn_type);
756
757 r1.size = sizeof(buff);
758 r1.data = &buff;
759
760 r2.size = strlen(name);
761 r2.data = name;
762
763 v1.data = NULL; /* set up buffer for return protocol and
764 * confirmation */
765 v1.size = 0;
766
767 if ((err = i_comms(me, TOKEN_INIT, &retval, &r1, &r2, &v1, NULL))) {
768 /*
769 * keep the exit code fprintf(stderr,"Fail2\n");
770 */
771 } else if (retval == 1) {
772 err = E_NOPE;
773 } else if (retval < 0) {
774 err = E_PROTO;
775 } else if (ntohl(*((u_long *) v1.data)) > DBMS_PROTO) {
776 err = E_VERSION;
777 };
778
779 if (v1.size)
780 (*me->free) (v1.data);
781 return err;
782 }
783
784 extern dbms_error_t
dbms_comms(dbms * me,int token,int * retval,DBT * v1,DBT * v2,DBT * r1,DBT * r2)785 dbms_comms(
786 dbms * me,
787 int token,
788 int *retval,
789 DBT * v1,
790 DBT * v2,
791 DBT * r1,
792 DBT * r2
793 )
794 {
795 int errs = 5;
796 int err = 0;
797
798 struct sigaction act, oact;
799 #ifdef TPS
800 gettimeofday(&tstart, NULL);
801 #endif
802
803 if (logfile) {
804 char *p1 = NULL;
805 char *p2 = NULL;
806 if (v1)
807 p1 = _hex(me, v1->size, v1->data);
808 if (v2)
809 p2 = _hex(me, v2->size, v2->data);
810
811 _tlog("%s@%s:%d %s(%02d) >>> %s %s",
812 me->name, me->host,me->port, _token2name(token), token,
813 p1 ? p1 : "<null>",
814 p2 ? p2 : "<null>");
815 if (p1) (*me->free)(p1);
816 if (p2) (*me->free)(p2);
817 }
818 /*
819 * for now, SA_RESTART any interupted function calls
820 */
821 act.sa_handler = SIG_IGN;
822 sigemptyset(&act.sa_mask);
823 act.sa_flags = SA_RESTART;
824
825 sigaction(SIGPIPE, &act, &oact);
826 if (retval)
827 *retval = -1;
828
829 /*
830 * now obviously this is wrong; we do _not_ want to continue during
831 * certain errors.. ah well..
832 */
833 for (errs = 0; errs < 10; errs++) {
834 if ((me->sockfd >= 0) &&
835 ((err = i_comms(me, token, retval, v1, v2, r1, r2)) == 0))
836 break;
837
838 /*
839 * we could of course exit on certain errors, but which ? we
840 * call recv, et.al.
841 */
842 if (err == EAGAIN || err == EINTR)
843 continue;
844
845 /*
846 * If the DB on the other end reported an error - then it
847 * obviously is not a comms problem - so retrying makes no
848 * sense
849 */
850 if (err == E_ERROR)
851 break;
852
853 sleep(errs * 2);
854 shutdown(me->sockfd, 2);
855 close(me->sockfd);
856
857 me->sockfd = -1;/* mark that we have an issue */
858 if (((err = reconnect(me)) == 0) &&
859 ((err = reselect(me)) == 0)) {
860 if (errs)
861 (*(me->callback)) (DBMS_EVENT_RECONNECT, errs);
862 } else if (errs)
863 (*(me->callback)) (DBMS_EVENT_WAITING, errs);
864 };
865
866 #ifdef TPS
867 gettimeofday(&tnow, NULL);
868 ttps++;
869 ttime +=
870 (tnow.tv_sec - tstart.tv_sec) * 1000000 +
871 (tnow.tv_usec - tstart.tv_usec) * 1;
872 #endif
873 /*
874 * restore whatever it was before
875 */
876 sigaction(SIGPIPE, &oact, &act);
877 if (logfile) {
878 char *q1 = NULL;
879 char *q2 = NULL;
880 if (r1)
881 q1 = _hex(me, r1->size, r1->data);
882 if (r2)
883 q2 = _hex(me, r2->size, r2->data);
884
885 _tlog("%s@%s:%d %s(%02d) <<< %s %s",
886 me->name, me->host,me->port, _token2name(token), token,
887 q1 ? q1 : "<null>",
888 q2 ? q2 : "<null>");
889 if (q1) (*me->free)(q1);
890 if (q2) (*me->free)(q2);
891 }
892 return err;
893 };
894