1 /*
2  *     Copyright (c) 2000-2006 Alberto Reggiori <areggiori@webweaving.org>
3  *                        Dirk-Willem van Gulik <dirkx@webweaving.org>
4  *
5  * NOTICE
6  *
7  * This product is distributed under a BSD/ASF like license as described in the 'LICENSE'
8  * file you should have received together with this source code. If you did not get a
9  * a copy of such a license agreement you can pick up one at:
10  *
11  *     http://rdfstore.sourceforge.net/LICENSE
12  *
13  * Perl 'tie' interface to a socket connection. Possibly to
14  * the a server which runs a thin feneer to the Berkely DB.
15  *
16  */
17 
18 #include "dbms.h"
19 #include "dbms_comms.h"
20 
21 #include <stdio.h>
22 
23 typedef dbms   *DBMS;
24 
25 static char     _erm[256] = "\0";
26 
27 static char    *dbms_error[] = {
28 	/* E_UNDEF         1000 */
29 	"Not defined",
30 	/* E_NONNUL        1001 */
31 	"Undefined Error",
32 	/* E_FULLREAD      1002 */
33 	"Could not receive all bytes from DBMS server",
34 	/* E_FULLWRITE     1003 */
35 	"Could not send all bytes to DBMS server",
36 	/* E_CLOSE         1004 */
37 	"DBMS server closed the connection",
38 	/* E_HOSTNAME      1005 */
39 	"Could not find/resolve DBMS servers hostname",
40 	/* E_VERSION       1006 */
41 	"DBMS Version not supported",
42 	/* E_PROTO         1007 */
43 	"DBMS Reply not understood",
44 	/* E_ERROR         1008 */
45 	"DBMS Server side error"
46 	/* E_NOMEM         1009 */
47 	"Out of memory",
48 	/* E_RETRY         1010 */
49 	"Failed after several retries",
50 	/* E_NOPE          1011 */
51 	"No such database",
52 	/* E_XXX           1012 */
53 	"No such database",
54 	/* E_TOOBIG        1013 */
55 	"Packed bigger than static",
56 	/* E_BUG           1014 */
57 	"Conceptual error"
58 };
59 
60 static dbms_error_t reconnect(dbms * me);
61 static dbms_error_t reselect(dbms * me);
62 static dbms_error_t getpack(dbms * me, unsigned long len, DBT * r);
63 static dbms_error_t i_comms(dbms * me, int token, int *retval, DBT * v1, DBT * v2, DBT * r1, DBT * r2);
64 
65 static void
mark_dbms_error(dbms * me,char * msg,dbms_error_t erx)66 mark_dbms_error(dbms * me, char *msg, dbms_error_t erx)
67 {
68 	bzero(me->err, sizeof(me->err));
69 	if (erx == E_ERROR) {
70 		snprintf(me->err, sizeof(me->err),
71 			 "DBMS Error %s: %s", msg,
72 			 errno == 0 ? "" : (strlen(strerror(errno)) <= sizeof(me->err)) ? strerror(errno) : "");
73 	} else if ((erx > E_UNDEF) && (erx <= E_BUG)) {
74 		strncat(me->err, msg, sizeof(me->err) - 1);
75 		strncat(me->err, ": ", sizeof(me->err) - 1);
76 		strncat(me->err, dbms_error[erx - E_UNDEF], sizeof(me->err) - 1);
77 	} else {
78 		strncat(me->err, msg, sizeof(me->err) - 1);
79 		strncat(me->err, ": ", sizeof(me->err) - 1);
80 		if (strlen(strerror(erx)) <= sizeof(me->err) - strlen(me->err) - 1)
81 			strncat(me->err, strerror(erx), sizeof(me->err) - 1);
82 	};
83 
84 	if (strlen(me->err) <= sizeof(_erm))
85 		strcpy(_erm, me->err);
86 };
87 
88 static void
set_dbms_error(dbms * me,char * msg,dbms_error_t erx)89 set_dbms_error(dbms * me, char *msg, dbms_error_t erx)
90 {
91 	mark_dbms_error(me, msg, erx);
92 	if (me->error)
93 		(*(me->error)) (me->err, erx);
94 };
95 
96 extern char    *
dbms_get_error(dbms * me)97 dbms_get_error(dbms * me)
98 {
99 	if (me == NULL)
100 		return _erm;
101 	else
102 		return me->err;
103 }
104 
105 static void
_warning(dbms_cause_t event,int count)106 _warning(dbms_cause_t event, int count)
107 {
108 	/*
109 	 * Note: _erm use NOT thread safe. Should change function signature
110 	 * to pass dbms * pointer.
111 	 */
112 	if (event == DBMS_EVENT_RECONNECT)
113 		fprintf(stderr, "DBMS Reconnecting %i (%s)...\n", count, _erm);
114 	else if (event == DBMS_EVENT_WAITING)
115 		fprintf(stderr, "DBMS Waiting %i...\n", count);
116 	else
117 		fprintf(stderr, "DBMS Unknown event (%s)\n", _erm);
118 }
119 
120 static int      cnt = 0;
121 static FILE    *logfile = NULL;
122 
123 static void
_tlog(char * fmt,...)124 _tlog(char *fmt,...)
125 {
126 	if (!logfile)
127 		return;
128 	{
129 		char            tmp[1024];
130 		char            buf[ 128 * 1024 ];
131 		va_list         ap;
132 		time_t          tt;
133 		time(&tt);
134 		snprintf(tmp, sizeof(tmp), "%04d:%20s %s", cnt, asctime(gmtime(&tt)), fmt);
135 
136 		va_start(ap, fmt);
137 		vsnprintf(buf,sizeof(buf)-1, tmp, ap);
138 		va_end(ap);
139 
140 		fprintf(logfile,"%s\n",buf);
141 		fflush(logfile);
142 	}
143 }
144 
_token2name(int x)145 static char * _token2name(int x) {
146 	x = x & MASK_TOKEN;
147 #define CC(x) case x: return #x; break;
148 	switch (x) {
149 CC(TOKEN_ERROR);
150 CC(TOKEN_FETCH     );
151 CC(TOKEN_STORE    );
152 CC(TOKEN_DELETE  );
153 CC(TOKEN_NEXTKEY );
154 CC(TOKEN_FIRSTKEY);
155 CC(TOKEN_FROM );
156 CC(TOKEN_EXISTS );
157 CC(TOKEN_SYNC  );
158 CC(TOKEN_INIT  );
159 CC(TOKEN_CLOSE );
160 CC(TOKEN_CLEAR );
161 CC(TOKEN_FDPASS   );
162 CC(TOKEN_PING    );
163 CC(TOKEN_INC    );
164 CC(TOKEN_LIST   );
165 CC(TOKEN_DEC   );
166 CC(TOKEN_PACKINC  );
167 CC(TOKEN_PACKDEC );
168 CC(TOKEN_DROP    );
169 	default: 	return "TOKEN_UNKNOWN";
170 	};
171 	return "XXX";
172 }
173 
174 static char    *
_hex(dbms * me,int len,void * str)175 _hex(dbms * me, int len, void *str)
176 {
177 	size_t		i;
178 	char           *r = NULL;
179 
180 	if (len == 0) {
181 		r = (char *) (*me->malloc)( strlen("[0]\"\"")+1 );
182 		strcpy( r, "[0]\"\"" );
183 		return r;
184 		};
185 
186 	if (str == NULL) {
187 		r = (char *) (*me->malloc)( strlen("<null>")+1 );
188 		strcpy( r, "<null>" );
189 		return r;
190 		};
191 
192 	if (len > 50000) {
193 		r = (char *) (*me->malloc)( strlen("<toolong>")+1 );
194 		strcpy( r, "<toolong>" );
195 		return r;
196 		};
197 
198 	r = (char *) (*me->malloc)(3*len + 100);
199 	if (r == NULL) {
200 		r = (char *) (*me->malloc)( strlen("<outofmem>")+1 );
201 		strcpy( r, "<outofmem>" );
202 		return r;
203 		};
204 
205 	sprintf(r, "[%06d]\"", len);
206 
207 	for (i = 0; i < len; i++) {
208 		char p[3];
209 		unsigned int c = ((unsigned char *) str)[i];
210 
211 		if (c && isprint(c) && (c != '%')) {
212 			p[0] =  c; p[1] = '\0';
213 		} else {
214 			sprintf(p,"%%%02x", c);
215 		};
216 		strcat(r,p);
217 	};
218 	strcat(r,"\"");
219 	return r;
220 }
221 
222 extern dbms    *
dbms_connect(char * name,char * host,int port,dbms_xsmode_t mode,void * (* _my_malloc)(size_t s),void (* _my_free)(void * adr),void (* _my_report)(dbms_cause_t event,int count),void (* _my_error)(char * err,int erx),int bt_compare_fcn_type)223 dbms_connect(
224 	     char *name, char *host, int port,
225 	     dbms_xsmode_t mode,
226 	     void *(*_my_malloc) (size_t s),
227 	     void (*_my_free) (void *adr),
228 	     void (*_my_report) (dbms_cause_t event, int count),
229 	     void (*_my_error) (char *err, int erx),
230 	     int bt_compare_fcn_type
231 )
232 {
233 	dbms           *me;
234 	int             err = 0;
235 
236 	if ((name == NULL) || (*name == '\0'))
237 		return NULL;
238 
239 	if ((host == NULL) || (*host == '\0'))
240 		host = DBMS_HOST;
241 
242 	if (port == 0)
243 		port = DBMS_PORT;
244 
245 
246 	if (_my_malloc == NULL)
247 		_my_malloc = &malloc;
248 
249 	if (_my_free == NULL)
250 		_my_free = &free;
251 
252 	if (_my_report == NULL)
253 		_my_report = &_warning;
254 
255 	me = (dbms *) (*_my_malloc) (sizeof(dbms));
256 	if (me == NULL)
257 		return NULL;	/* rely on errno */
258 
259 	me->bt_compare_fcn_type = bt_compare_fcn_type;
260 
261 	me->malloc = _my_malloc;
262 	me->free = _my_free;
263 	me->callback = _my_report;
264 	me->error = _my_error;
265 	bzero(me->err, sizeof(me->err));
266 
267 	switch (mode) {
268 	case DBMS_XSMODE_DEFAULT:
269 		mode = DBMS_MODE;	/* default */
270 		break;
271 		;;
272 	case DBMS_XSMODE_RDONLY:
273 		break;
274 		;;
275 	case DBMS_XSMODE_RDWR:
276 		break;
277 		;;
278 	case DBMS_XSMODE_CREAT:
279 		break;
280 		;;
281 	case DBMS_XSMODE_DROP:
282 		break;
283 		;;
284 	default:
285 		{
286 			char            _buff[1024];
287 			snprintf(_buff, sizeof(_buff), "Unknown DBMS Access type (%d)", (int) mode);
288 			set_dbms_error(me, _buff, 0);
289 		}
290 		(*(me->free)) (me);
291 		return NULL;
292 		break;
293 	}
294 
295 	me->sockfd = -1;
296 	me->mode = (u_long) mode;
297 	me->port = port;
298 	me->name = (char *) (*me->malloc)( strlen(name)+1 );
299 	if( me->name == NULL ) {
300 		(*(me->free)) (me);
301 		return NULL;
302 		};
303 	strcpy( me->name, name );
304 	me->host = (char *) (*me->malloc)( strlen(host)+1 );
305 	if( me->host == NULL ) {
306 		(*(me->free)) (me->name);
307 		(*(me->free)) (me);
308 		return NULL;
309 		};
310 	strcpy( me->host, host );
311 
312 	/*
313 	 * quick and dirty hack to check for IP vs FQHN and fall through when
314 	 * in doubt to resolving.
315 	 */
316 	me->addr = INADDR_NONE;
317 	{
318 		int             i = 0;
319 		for (; me->host[i] != '\0'; i++)
320 			if (!isdigit((int) (me->host[i])) && me->host[i] != '.')
321 				break;
322 
323 		if (me->host[i] == '\0')
324 			me->addr = inet_addr(host);
325 	}
326 
327 	if (me->addr == INADDR_NONE) {
328 		struct hostent *hp;
329 		if ((hp = gethostbyname(me->host)) == NULL) {
330 			set_dbms_error(me, "Hostname lookup failed", errno);
331 			(*(me->free)) (me->name);
332 			(*(me->free)) (me->host);
333 			(*(me->free)) (me);
334 			return NULL;
335 		};
336 		/*
337 		 * copy the address, rather than the pointer as we need it
338 		 * later. It is an unsigned long.
339 		 */
340 		me->addr = *(u_long *) hp->h_addr;
341 	};
342 
343 	if ((err = reconnect(me))) {
344 		set_dbms_error(me, "Connection failed", err);
345 		(*(me->free)) (me->name);
346 		(*(me->free)) (me->host);
347 		(*(me->free)) (me);
348 		return NULL;
349 	};
350 
351 	if ((err = reselect(me))) {
352 		set_dbms_error(me, "Selection failed", err);
353 		(*(me->free)) (me->name);
354 		(*(me->free)) (me->host);
355 		(*(me->free)) (me);
356 		return NULL;
357 	};
358 
359 	{
360 		char * file = getenv("DBMS_LOG");
361 		cnt++;
362 		if (file && (logfile == NULL)) {
363 			if ((logfile = fopen(file, "a"))) {
364 				fprintf(stderr, "Logging to %s\n", file);
365 			} else {
366 				fprintf(stderr,"Failure to log to %s: %s\n",file,strerror(errno));
367 			};
368 		}
369 		if (logfile)
370 			_tlog("start %d %s",cnt,name);
371 	}
372 	return me;
373 }
374 
375 
376 static          dbms_error_t
reconnect(dbms * me)377 reconnect(
378 	  dbms * me
379 )
380 {
381 	struct sockaddr_in server;
382 	int             one = 1;
383 	int             csnd_len, csnd, sndbuf = 16 * 1024;
384 	int             e = 0;
385 
386 	/*
387 	 * we could moan if me->sockfd is still	set.. or do a silent close,
388 	 * just in case ?
389 	 */
390 	if (me->sockfd >= 0) {
391 		shutdown(me->sockfd, 2);
392 		close(me->sockfd);
393 	}
394 	if ((me->sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
395 		set_dbms_error(me, "socket", errno);
396 		return E_ERROR;
397 	}
398 	if (0) {
399 		/*
400 		 * allow for re-use; to avoid that we have to wait for a fair
401 		 * amounth of time after disasterous crashes,
402 		 */
403 		if ((setsockopt(me->sockfd, SOL_SOCKET, SO_REUSEADDR,
404 				(const char *) &one, sizeof(one))) < 0) {
405 			set_dbms_error(me, "setsockopt(reuse)", errno);
406 			me->sockfd = -1;
407 			close(me->sockfd);
408 			return E_ERROR;
409 		};
410 	}
411 	csnd_len = sizeof(csnd);
412 	if (getsockopt(me->sockfd, SOL_SOCKET, SO_SNDBUF,
413 		       (void *) &csnd, (void *) &csnd_len) < 0) {
414 		set_dbms_error(me, "getsockopt(sndbuff)", errno);
415 		me->sockfd = -1;
416 		close(me->sockfd);
417 		return E_ERROR;
418 	};
419 	assert(csnd_len == sizeof(csnd));
420 
421 	/*
422 	 * only set when smaller
423 	 */
424 	if ((csnd < sndbuf) &&
425 	    (setsockopt(me->sockfd, SOL_SOCKET, SO_SNDBUF,
426 			(const void *) &sndbuf, sizeof(sndbuf)) < 0)) {
427 		set_dbms_error(me, "setsockopt(sndbuff)", errno);
428 		me->sockfd = -1;
429 		close(me->sockfd);
430 		return E_ERROR;
431 	};
432 
433 	/*
434 	 * Discard any data still in our send buffer whe closing and do not
435 	 * linger around for any thing from the server.
436 	 */
437 	{
438 		struct linger   l = {1, 0};	/* Linger On, Lingertime 0 */
439 		if ((setsockopt(me->sockfd, SOL_SOCKET, SO_LINGER,
440 				(const char *) &l, sizeof(l))) < 0) {
441 			set_dbms_error(me, "setsockopt(disble-nagle)", errno);
442 			me->sockfd = -1;
443 			close(me->sockfd);
444 			return E_ERROR;
445 		};
446 	}
447 	/*
448 	 * disable Nagle, for speed
449 	 */
450 	if ((setsockopt(me->sockfd, IPPROTO_TCP, TCP_NODELAY,
451 			(const char *) &one, sizeof(one))) < 0) {
452 		set_dbms_error(me, "setsockopt(disble-nagle)", errno);
453 		me->sockfd = -1;
454 		close(me->sockfd);
455 		return E_ERROR;
456 	};
457 
458 	/*
459 	 * larger buffer; as we know that we can initially slide open the
460 	 * window bigger.
461 	 */
462 
463 	while (e++ < 4) {
464 		bzero((char *) &server, sizeof(server));
465 
466 		server.sin_family = AF_INET;
467 		server.sin_addr.s_addr = me->addr;
468 		server.sin_port = htons(me->port);
469 
470 		if (connect(me->sockfd, (struct sockaddr *) & server, sizeof(server)) == 0)
471 			return 0;
472 		if (errno != EADDRINUSE)
473 			break;
474 
475 		usleep(e * e * 100 * 1000);	/* wait 0.1, 0.4, 0.9, 2.5
476 						 * second */
477 	}
478 	mark_dbms_error(me, "connect()", errno);
479 	me->sockfd = -1;
480 	return E_ERROR;
481 }
482 
483 dbms_error_t
dbms_disconnect(dbms * me)484 dbms_disconnect(
485 		dbms * me
486 )
487 {
488 	int             retval;
489 
490 	assert(me);
491 	assert(me->sockfd >= 0);
492 
493 	dbms_comms(me, TOKEN_CLOSE, &retval, NULL, NULL, NULL, NULL);
494 #ifdef TPS
495 	if (getenv("GATEWAY_INTERFACE") == NULL)
496 		fprintf(stderr, "Performance: %ld # %.2f mSec/trans = %.1f\n",
497 			ttps, ttime / ttps / 1000.0,
498 			1000000.0 * ttps / ttime
499 			);
500 #endif
501 	shutdown(me->sockfd, 2);
502 	close(me->sockfd);
503 	(*(me->free)) (me->name);
504 	(*(me->free)) (me->host);
505 	(*(me->free)) (me);
506 	if (logfile) fclose(logfile);
507 
508 	return 0;
509 }
510 
511 static          dbms_error_t
getpack(dbms * me,unsigned long len,DBT * r)512 getpack(
513 	dbms * me,
514 	unsigned long len,
515 	DBT * r
516 )
517 {
518 	unsigned int    gotten;
519 	char           *at;
520 
521 	r->size = 0;
522 	r->data = NULL;
523 
524 	if (len == 0)
525 		return 0;
526 
527 	if (r == NULL)
528 		return E_BUG;
529 
530 #ifdef STATIC_SC_BUFF
531 	if (len > MAX_SC_PAYLOAD)
532 		return E_TOOBIG;
533 #endif
534 	r->size = 0;
535 	r->data = (char *) (*me->malloc) (len);
536 
537 	if (r->data == 0)
538 		return E_NOMEM;
539 
540 	/* should block ? */
541 
542 	for (at = r->data, gotten = 0; gotten < len;) {
543 		ssize_t         l;
544 		l = recv(me->sockfd, at, len - gotten, 0);
545 		if (l < 0) {
546 			set_dbms_error(me, "packet-recv()", errno);
547 			(*me->free) (r->data);
548 			r->data = NULL;
549 			return E_ERROR;
550 		} else if (l == 0) {
551 			(*me->free) (r->data);
552 			r->data = NULL;
553 			return E_CLOSE;
554 		};
555 		gotten += l, at += l;
556 	};
557 
558 	r->size = len;
559 	return 0;
560 };
561 
562 static
563 dbms_error_t
i_comms(dbms * me,int token,int * retval,DBT * v1,DBT * v2,DBT * r1,DBT * r2)564 i_comms(
565 	dbms * me,
566 	int token,
567 	int *retval,
568 	DBT * v1,
569 	DBT * v2,
570 	DBT * r1,
571 	DBT * r2
572 )
573 {
574 	int             err = 0;
575 	DBT             rr1, rr2;
576 	struct header   cmd;
577 	struct iovec    iov[3];
578 	struct msghdr   msg;
579 	size_t          s;
580 
581 	if (retval)
582 		*retval = -1;
583 
584 	rr1.data = rr2.data = NULL;
585 
586 	cmd.token = token | F_CLIENT_SIDE;
587 
588 	cmd.len1 = htonl((v1 == NULL) ? 0 : v1->size);
589 	cmd.len2 = htonl((v2 == NULL) ? 0 : v2->size);
590 
591 	iov[0].iov_base = (char *) &cmd;
592 	iov[0].iov_len = sizeof(cmd);
593 
594 	iov[1].iov_base = (v1 == NULL) ? NULL : v1->data;
595 	iov[1].iov_len = (v1 == NULL) ? 0 : v1->size;
596 
597 	iov[2].iov_base = (v2 == NULL) ? NULL : v2->data;
598 	iov[2].iov_len = (v2 == NULL) ? 0 : v2->size;
599 
600 #ifdef STATIC_CS_BUFF
601 	if (iov[0].iov_len + iov[1].iov_len + iov[2].iov_len > MAX_CS_PAYLOAD)
602 		return E_TOOBIG;
603 #endif
604 	msg.msg_name = NULL;
605 	msg.msg_namelen = 0;
606 	msg.msg_iov = iov;
607 	msg.msg_iovlen = 3;
608 /* temporal fix to make CYGWN compile the basic thing - need better solution */
609 #ifndef RDFSTORE_PLATFORM_CYGWIN
610 	msg.msg_control = NULL;
611 	msg.msg_controllen = 0;
612 	msg.msg_flags = 0;
613 #endif
614 	s = sendmsg(me->sockfd, &msg, 0);
615 
616 	if (s == 0) {
617 		err = E_CLOSE;
618 		goto retry_com;
619 	} else if (s < 0) {
620 		mark_dbms_error(me, "sendmsg()", errno);
621 		err = E_ERROR;
622 		goto retry_com;
623 	} else if (s != iov[0].iov_len + iov[1].iov_len + iov[2].iov_len) {
624 		err = E_FULLWRITE;
625 		goto exit_com;
626 	};
627 
628 	s = recv(me->sockfd, &cmd, sizeof(cmd), 0);
629 
630 	if (s == 0) {
631 		err = E_CLOSE;
632 		goto retry_com;
633 	} else if (s < 0) {
634 		mark_dbms_error(me, "header-recv()", errno);
635 		err = E_ERROR;
636 		goto retry_com;
637 	} else if (s != sizeof(cmd)) {
638 		err = E_FULLREAD;
639 		goto exit_com;
640 	};
641 
642 	cmd.len1 = ntohl(cmd.len1);
643 	cmd.len2 = ntohl(cmd.len2);
644 
645 	rr2.data = rr1.data = NULL;
646 	if ((err = getpack(me, cmd.len1, r1 ? r1 : &rr1)) != 0)
647 		goto retry_com;
648 
649 	if ((err = getpack(me, cmd.len2, r2 ? r2 : &rr2)) != 0)
650 		goto retry_com;
651 
652 	if ((cmd.token & MASK_TOKEN) == TOKEN_ERROR) {
653 		char           *d = NULL;
654 		int             l = 0;
655 		if (r1) {
656 			l = r1->size;
657 			d = r1->data;
658 		} else {
659 			l = rr1.size;
660 			d = rr1.data;
661 		};
662 		errno = 0;
663 		if ((d) && (l > 0)) {
664 			d[l] = '\0';
665 		} else {
666 			d = "DBMS side errror, no cause reported";
667 		};
668 		err = E_ERROR;
669 		errno = 0;
670 		set_dbms_error(me, d, err);
671 		goto exit_com;
672 	} else if (((cmd.token & MASK_TOKEN) != token) ||
673 		   ((cmd.token | F_SERVER_SIDE) == 0)) {
674 		err = E_PROTO;
675 		goto exit_com;
676 	};
677 
678 	if ((rr1.data != NULL) && (rr1.size)) {
679 		(*me->free) (rr1.data);
680 		rr1.size = 0;
681 	};
682 
683 	if ((rr2.data != NULL) && (rr2.size)) {
684 		(*me->free) (rr2.data);
685 		rr1.size = 0;
686 	};
687 
688 	if ((cmd.token & MASK_STATUS) == F_FOUND) {
689 		if (retval)
690 			*retval = 0;
691 	} else {
692 		if (retval)
693 			*retval = 1;
694 		if (r1 != NULL) {
695 			if ((r1->size) && (r1->size))
696 				(*me->free) (r1->data);
697 			r1->data = NULL;
698 			r1->size = 0;
699 		};
700 		if (r2 != NULL) {
701 			if ((r2->size) && (r2->size))
702 				(*me->free) (r2->data);
703 			r2->data = NULL;
704 			r2->size = 0;
705 		};
706 	};
707 
708 	err = 0;
709 	goto done_com;
710 
711 retry_com:
712 exit_com:
713 	if ((r1 != NULL) && (r1->data != NULL) && (r1->size != 0)) {
714 		(*me->free) (r1->data);
715 		r1->size = 0;
716 	};
717 
718 	if ((r2 != NULL) && (r2->data != NULL) && (r2->size != 0)) {
719 		(*me->free) (r2->data);
720 		r2->size = 0;
721 	};
722 
723 	if ((rr1.data != NULL) && (rr1.size)) {
724 		(*me->free) (rr1.data);
725 		rr1.size = 0;
726 	};
727 
728 	if ((rr2.data != NULL) && (rr1.size)) {
729 		(*me->free) (rr2.data);
730 		rr2.size = 0;
731 	};
732 
733 done_com:
734 
735 	return err;
736 }
737 
738 
739 static          dbms_error_t
reselect(dbms * me)740 reselect(dbms * me)
741 {
742 	DBT             r1, r2, v1;
743 	int             retval;
744 	u_long          buff[3];
745 	int             err = 0;
746 	u_long          proto = DBMS_PROTO;
747 	u_long          mode = me->mode;
748 	char           *name = me->name;
749 	u_long          bt_compare_fcn_type = me->bt_compare_fcn_type;
750 
751 	assert(sizeof(buff) == 12);	/* really 4 bytes on the network ? */
752 
753 	buff[0] = htonl(proto);
754 	buff[1] = htonl(mode);
755 	buff[2] = htonl(bt_compare_fcn_type);
756 
757 	r1.size = sizeof(buff);
758 	r1.data = &buff;
759 
760 	r2.size = strlen(name);
761 	r2.data = name;
762 
763 	v1.data = NULL;		/* set up buffer for return protocol and
764 				 * confirmation */
765 	v1.size = 0;
766 
767 	if ((err = i_comms(me, TOKEN_INIT, &retval, &r1, &r2, &v1, NULL))) {
768 		/*
769 		 * keep the exit code fprintf(stderr,"Fail2\n");
770 		 */
771 	} else if (retval == 1) {
772 		err = E_NOPE;
773 	} else if (retval < 0) {
774 		err = E_PROTO;
775 	} else if (ntohl(*((u_long *) v1.data)) > DBMS_PROTO) {
776 		err = E_VERSION;
777 	};
778 
779 	if (v1.size)
780 		(*me->free) (v1.data);
781 	return err;
782 }
783 
784 extern          dbms_error_t
dbms_comms(dbms * me,int token,int * retval,DBT * v1,DBT * v2,DBT * r1,DBT * r2)785 dbms_comms(
786 	   dbms * me,
787 	   int token,
788 	   int *retval,
789 	   DBT * v1,
790 	   DBT * v2,
791 	   DBT * r1,
792 	   DBT * r2
793 )
794 {
795 	int             errs = 5;
796 	int             err = 0;
797 
798 	struct sigaction act, oact;
799 #ifdef TPS
800 	gettimeofday(&tstart, NULL);
801 #endif
802 
803 	if (logfile) {
804 		char           *p1 = NULL;
805 		char           *p2 = NULL;
806 		if (v1)
807 			p1 = _hex(me, v1->size, v1->data);
808 		if (v2)
809 			p2 = _hex(me, v2->size, v2->data);
810 
811 		_tlog("%s@%s:%d %s(%02d) >>> %s %s",
812 		      me->name, me->host,me->port, _token2name(token), token,
813 		      p1 ? p1 : "<null>",
814 		      p2 ? p2 : "<null>");
815 		if (p1) (*me->free)(p1);
816 		if (p2) (*me->free)(p2);
817 	}
818 	/*
819 	 * for now, SA_RESTART any interupted function calls
820 	 */
821 	act.sa_handler = SIG_IGN;
822 	sigemptyset(&act.sa_mask);
823 	act.sa_flags = SA_RESTART;
824 
825 	sigaction(SIGPIPE, &act, &oact);
826 	if (retval)
827 		*retval = -1;
828 
829 	/*
830 	 * now obviously this is wrong; we do _not_ want to continue during
831 	 * certain errors.. ah well..
832 	 */
833 	for (errs = 0; errs < 10; errs++) {
834 		if ((me->sockfd >= 0) &&
835 		  ((err = i_comms(me, token, retval, v1, v2, r1, r2)) == 0))
836 			break;
837 
838 		/*
839 		 * we could of course exit on certain errors, but which ? we
840 		 * call recv, et.al.
841 		 */
842 		if (err == EAGAIN || err == EINTR)
843 			continue;
844 
845 		/*
846 		 * If the DB on the other end reported an error - then it
847 		 * obviously is not a comms problem - so retrying makes no
848 		 * sense
849 		 */
850 		if (err == E_ERROR)
851 			break;
852 
853 		sleep(errs * 2);
854 		shutdown(me->sockfd, 2);
855 		close(me->sockfd);
856 
857 		me->sockfd = -1;/* mark that we have an issue */
858 		if (((err = reconnect(me)) == 0) &&
859 		    ((err = reselect(me)) == 0)) {
860 			if (errs)
861 				(*(me->callback)) (DBMS_EVENT_RECONNECT, errs);
862 		} else if (errs)
863 			(*(me->callback)) (DBMS_EVENT_WAITING, errs);
864 	};
865 
866 #ifdef TPS
867 	gettimeofday(&tnow, NULL);
868 	ttps++;
869 	ttime +=
870 		(tnow.tv_sec - tstart.tv_sec) * 1000000 +
871 		(tnow.tv_usec - tstart.tv_usec) * 1;
872 #endif
873 	/*
874 	 * restore whatever it was before
875 	 */
876 	sigaction(SIGPIPE, &oact, &act);
877 	if (logfile) {
878 		char           *q1 = NULL;
879 		char           *q2 = NULL;
880 		if (r1)
881 			q1 = _hex(me, r1->size, r1->data);
882 		if (r2)
883 			q2 = _hex(me, r2->size, r2->data);
884 
885 		_tlog("%s@%s:%d %s(%02d) <<< %s %s",
886 		      me->name, me->host,me->port, _token2name(token), token,
887 		      q1 ? q1 : "<null>",
888 		      q2 ? q2 : "<null>");
889 		if (q1) (*me->free)(q1);
890 		if (q2) (*me->free)(q2);
891 	}
892 	return err;
893 };
894