1 /* DBMS Proxy
2 *
3 * Copyright (c) 2005 Asemantics S.R.L., All Rights Reserved.
4 */
5
6 #include <sys/types.h>
7 #include <sys/socket.h>
8 #include <sys/un.h>
9 #include <stdio.h>
10 #include <sys/uio.h>
11 #include <unistd.h>
12 #include <string.h>
13 #include <strings.h>
14 #include <stdlib.h>
15 #include <errno.h>
16 #ifdef RDFSTORE_PLATFORM_DARWIN
17 #include <stdint.h>
18 #endif
19 #include <unistd.h>
20
21 #include <dbms.h>
22 #include <dbms_comms.h>
23
24 #include "dbmsproxy.h"
25
/* Size in bytes of each per-connection request (R) and reply (W) buffer. */
#define RBUFF (1024)
#define WBUFF (1024)

/*
 * Report x through perror() and terminate with exit status 1.
 * Wrapped in do/while(0) so that `pdie(...);` is a single statement and
 * stays valid inside an unbraced if/else.
 */
#ifndef pdie
#define pdie(x) do { perror(x); exit(1); } while (0)
#endif

/* Larger of x and y; each argument may be evaluated twice. */
#ifndef max
#define max(x,y) (((x) > (y)) ? (x) : (y))
#endif
36
/* Stream for error output; main() points it at stderr or at the -e log file. */
FILE *errorout;
/*
 * Run-time switches, set while main() parses the command line:
 *   verbose, debug  raised by -x (incremented) and -X (set to 100)
 *   trace_on        -t; enables the request/reply trace printed by process()
 *   sysloglog       on by default, cleared by -X; main() calls openlog() when set
 *   stderrlog       set by -e, -E and -X
 */
int verbose = 0, debug = 0, trace_on = 0, sysloglog = 1, stderrlog = 0;
39
/*
 * SIGCHLD handler installed by main().  It deliberately does nothing; the
 * signal number is ignored.
 */
static void childied(int i)
{
	(void) i;
}
44
/*
 * Logging hook.  Callers pass a syslog priority (LOG_ERR, LOG_NOTICE, ...)
 * followed by a printf-style format and its arguments.  This implementation
 * is a stub: the message is discarded.
 */
static void _log(int level, char *msg, ...)
{
	(void) level;
	(void) msg;
}
48
/* Forward declarations; both functions are defined after main(). */
static void select_loop(void);
static int process(unsigned char *cmd_buff, int r, unsigned char *out_buff, int *w, int maxw);

/* Listening unix domain socket, created in main(). */
int sockfd;
/* Handle returned by dbms_connect() in main(); used by process(). */
dbms *dc;
/*
 * allrset/allwset/alleset are the master descriptor sets; rset/wset/eset
 * are the copies select_loop() refreshes from them before each select().
 */
fd_set rset, wset, eset, alleset, allrset, allwset;
/* Highest descriptor number placed in the master sets so far. */
int maxfd;
56
usage(const char * s)57 static void usage(const char *s)
58 {
59 fprintf(stderr, "Syntax: %s "
60 "[-U | -u <userid>] " /* run as user */
61 "[-e <file>]" /* error log */
62 "[-P <pid-file>] " /* write pid file to */
63 "[-d <directory_prefix>] " /* chroot */
64 "[-p <socket-device (default is " UNIX_SOCK ")>] " /* unix datagram socket
65 * path */
66 "[-X] " /* max error debuggin */
67 "[-t] " /* tracing */
68 "[-D] " /* do not detach */
69 "[-v] " /* version */
70 "<dbms url>\n", s);
71 exit(1);
72 }
73
main(int argc,char * argv[])74 int main(int argc, char *argv[])
75 {
76 struct sockaddr_un server;
77 char *sockpath = UNIX_SOCK;
78 int dtch = 1;
79 char *as_user = USER;
80 struct sigaction act, oact;
81 char *my_dir = DIR_PREFIX;
82 char *pid_file = PID_FILE;
83 int i, len;
84 char *pname = argv[0];
85 char ch;
86 errorout = stderr;
87
88 while ((ch = getopt(argc, argv, "p:d:u:UP:xDtvXe:Eh")) != -1)
89 switch (ch) {
90 case 'p':
91 sockpath = optarg;
92 break;
93 case 'd':
94 my_dir = optarg;
95 break;
96 case 'u':
97 as_user = optarg;
98 break;
99 case 'U':
100 as_user = NULL;
101 break;
102 case 'P':
103 pid_file = optarg;
104 break;
105 case 'x':
106 verbose++;
107 debug++;
108 if (debug > 2)
109 dtch = 0;
110 break;
111 case 'D':
112 dtch = 0;
113 break;
114 case 't':
115 trace_on = 1;
116 break;
117 case 'v':
118 printf("Version: %s\n", VERSION);
119 printf("Default dir: %s\n", DIR_PREFIX);
120 printf("Default device: %s\n", UNIX_SOCK);
121 printf("Default pidfle: %s\n", PID_FILE);
122 exit(0);
123 break;
124 case 'X':
125 verbose = debug = 100;
126 dtch = 0;
127 sysloglog = 0;
128 stderrlog = 1;
129 break;
130 case 'e':
131 stderrlog = 1;
132 if ((errorout = fopen(argv[++i], "a")) == NULL) {
133 fprintf(stderr, "Aborted. Cannot open logfile %s for writing: %s\n",
134 argv[i], strerror(errno));
135 exit(1);
136 };
137 break;
138 case 'E':
139 stderrlog = 1;
140 break;
141 case 'h':
142 default:
143 usage(pname);
144 break;
145 };
146 argc -= optind;
147 argv += optind;
148 if (argc != 1)
149 usage(pname);
150
151 {
152 char *uri = *argv;
153 char *host = NULL;
154 char *db = NULL;
155 int port = 0;
156 if (!strncasecmp(uri, "dbms://", 7))
157 uri += 7;
158 if ((db = index(uri, '/'))) {
159 char *p;
160 host = uri;
161 *db = '\0';
162 db++;
163 if ((p = index(host, ':'))) {
164 *p = '\0';
165 port = atoi(p++);
166 };
167 }
168 else {
169 db = uri;
170 };
171
172 if (!(dc = dbms_connect(db, host, port, DBMS_XSMODE_RDONLY, NULL, NULL, NULL, NULL, 0)))
173 pdie(dbms_get_error(NULL));
174 };
175
176 if (sysloglog)
177 openlog(pname, LOG_LOCAL4, LOG_PID | LOG_CONS);
178
179 unlink(sockpath); /* XX warn, etc.. */
180
181 if ((sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
182 pdie("Could not open socket");
183
184 memset(&server, 0, sizeof(server));
185 server.sun_family = SOCK_STREAM;
186 strncpy(server.sun_path, sockpath, sizeof(server.sun_path) - 1);
187
188 #if defined(SCM_RIGHTS) && !defined(RDFSTORE_PLATFORM_LINUX)
189 len = sizeof(server.sun_family) + strlen(server.sun_path) + sizeof(server.sun_len) + 1;
190 server.sun_len = len;
191 #else
192 len = strlen(server.sun_path) + sizeof(server.sun_family);
193 #endif
194
195 if ((bind(sockfd, (struct sockaddr *) & server, len)) < 0)
196 pdie("Cannot bind server to unix domain socket.");
197
198 if (listen(sockfd, 0) < 0)
199 pdie("Could not start to listen to my port");
200
201 /*
202 * fork and detach if ness.
203 */
204 if (dtch) {
205 pid_t pid;
206 fclose(stdin);
207 if (!trace_on)
208 fclose(stdout);
209 if ((pid = fork()) < 0) {
210 perror("Could not fork");
211 exit(1);
212 }
213 else if (pid != 0) {
214 FILE *fd;
215 if (!(fd = fopen(pid_file, "w"))) {
216 fprintf(stderr, "Warning: Could not write pid file %s:%s",
217 pid_file, strerror(errno));
218 exit(1);
219 };
220 fprintf(fd, "%d\n", (int) pid);
221 fclose(fd);
222 exit(0);
223 };
224
225 /*
226 * become session leader
227 */
228 if ((setsid()) < 0)
229 pdie("Could not become session leader");
230 };
231
232 /*
233 * XXX security hole.. fix ..
234 */
235 if (as_user != NULL) {
236 struct passwd *p = getpwnam(as_user);
237 uid_t uid;
238
239 uid = (p == NULL) ? atoi(as_user) : p->pw_uid;
240
241 if (!uid || setuid(uid)) {
242 perror("Cannot setuid");
243 exit(0);
244 };
245 };
246
247 chdir(my_dir); /* change working directory */
248 chroot(my_dir); /* for sanities sake -- must be after things
249 * like pid file where written */
250 umask(0); /* clear our file mode creation mask */
251
252 FD_ZERO(&allrset);
253 FD_ZERO(&allwset);
254 FD_ZERO(&alleset);
255
256 FD_SET(sockfd, &allrset);
257 FD_SET(sockfd, &alleset);
258
259 maxfd = sockfd;
260 _log(LOG_NOTICE, "Waiting for connections on %s", sockpath);
261
262 #if 0
263 signal(SIGHUP, dumpie);
264 signal(SIGUSR1, loglevel);
265 signal(SIGUSR2, loglevel);
266 signal(SIGINT, cleandown);
267 signal(SIGQUIT, cleandown);
268 signal(SIGKILL, cleandown);
269 signal(SIGTERM, cleandown);
270 #endif
271 signal(SIGCHLD, childied);
272
273 /*
274 * for now, SA_RESTART any interupted PIPE calls
275 */
276 act.sa_handler = SIG_IGN;
277 sigemptyset(&act.sa_mask);
278 act.sa_flags = SA_RESTART;
279 sigaction(SIGPIPE, &act, &oact);
280
281 select_loop();
282 return 0; /* keep the compiler happy.. */
283 }
284
285
select_loop(void)286 void select_loop(void)
287 {
288 unsigned char *rbuff[FD_SETSIZE], *wbuff[FD_SETSIZE];
289 int fd, w[FD_SETSIZE], r[FD_SETSIZE];
290
291 for (;;) {
292 struct timeval np = {600, 0}; /* seconds and micro seconds. */
293 int n;
294
295 rset = allrset;
296 wset = allwset;
297 eset = alleset;
298
299 if ((n = select(maxfd + 1, &rset, &wset, &eset, &np)) < 0) {
300 if (errno != EINTR)
301 _log(LOG_ERR, "RWE Select Probem %s", strerror(errno));
302 continue;
303 };
304
305 #if 0
306 /* We've been idle for 600 seconds (np above) */
307 if (n == 0)
308 exit(0);
309 #endif
310
311 /*
312 * Is someone knocking on our front door ?
313 */
314 if (FD_ISSET(sockfd, &rset)) {
315 struct sockaddr_in client;
316 int len = sizeof(client);
317
318 if ((fd = accept(sockfd,
319 (struct sockaddr *) & client, &len)) < 0) {
320 _log(LOG_ERR, "Could not accept");
321 }
322 else {
323 /* activate select on my connection */
324 FD_SET(fd, &allrset);
325 FD_SET(fd, &alleset);
326 maxfd = max(maxfd, fd);
327 rbuff[fd] = malloc(RBUFF);
328 wbuff[fd] = malloc(WBUFF);
329 r[fd] = 0;
330 w[fd] = 0;
331 };
332 } /* knock on the door */
333
334 for (fd = 0; fd <= maxfd; fd++) {
335 int cc = 0;
336
337 if (fd == sockfd)
338 continue;
339
340 if (FD_ISSET(fd, &rset)) {
341 int n;
342 if (r[fd] > RBUFF) {
343 _log(LOG_ERR, "Closed due to rbuffer overfilling");
344 goto _c;
345 };
346 n = read(fd, rbuff[fd], RBUFF);
347 if (n == 0) {
348 _log(LOG_DEBUG, "Closed after zero read (normal close)");
349 goto _c;
350 }
351 else if ((n < 0) && (errno != EINTR) && (errno != EAGAIN)) {
352 _log(LOG_ERR, "Closed after read error");
353 exit(0);
354 }
355 else if (n > 0) {
356 int p;
357 r[fd] += n;
358 p = process(rbuff[fd], r[fd], wbuff[fd], &w[fd], WBUFF);
359 if (p) {
360 if (p < 0) {
361 _log(LOG_ERR, "Output buffer overflow");
362 goto _c;
363 };
364 r[fd] -= p;
365 if (r[fd] > 0) {
366 memcpy(rbuff, rbuff[fd] + p, r[fd]);
367 }
368 else {
369 r[fd] = 0;
370 }
371 FD_SET(fd, &allwset);
372 cc = 1;
373 }
374 }
375 }
376 if (cc | FD_ISSET(fd, &wset)) {
377 int n = write(fd, wbuff[fd], w[fd]);
378 if ((n == 0) && w[fd]) {
379 _log(LOG_DEBUG, "Closed on zero write");
380 goto _c;
381 }
382 else if ((n < 0) && (errno != EINTR) && (errno != EAGAIN)) {
383 _log(LOG_ERR, "Closed after write error");
384 goto _c;
385 }
386 else if (n > 0) {
387 w[fd] -= n;
388 if (w[fd] > 0) {
389 memcpy(wbuff[fd], rbuff[fd] + n, w[fd]);
390 }
391 else {
392 w[fd] = 0;
393 FD_CLR(fd, &allwset);
394 }
395 };
396 }
397 if (FD_ISSET(fd, &eset)) {
398 _log(LOG_ERR, "Somethinig nasty");
399 exit(0);
400 }
401 continue;
402
403 _c:
404 FD_CLR(fd, &allrset);
405 FD_CLR(fd, &allwset);
406 FD_CLR(fd, &alleset);
407 free(rbuff[fd]);
408 free(wbuff[fd]);
409 r[fd] = -1;
410 w[fd] = -1;
411 } /* loop over all FD's */
412 } /* endless for */
413 }
414
process(unsigned char * cmd_buff,int r,unsigned char * out_buff,int * wp,int maxw)415 int process(
416 unsigned char *cmd_buff, int r,
417 unsigned char *out_buff, int *wp,
418 int maxw
419 )
420 {
421 int at = 0;
422 out_buff += *wp;
423
424 /* Request - <1 byte len of key> [ key ] */
425 /* Reply <1 byte len>, 1 byte ok/nok, string */
426
427 for (; at < r;) {
428 unsigned char *cmd = cmd_buff + at;
429 int len = cmd[0];
430
431 char key[256];
432 DBT k, val;
433 int e;
434
435 /* Wait for more data if there is not a complete packet to process. */
436 if (r < len + 1)
437 return at;
438
439 strncpy(key, cmd + 1, len);
440 key[len] = '\0';
441
442 at = at + len + 1;
443
444 k.data = key;
445 k.size = len;
446
447 if (trace_on)
448 printf("Requesting %s\n", key);
449
450 r = 0;
451 #ifdef LOCALTEST
452 val.data = "1234567890";
453 val.size = 10;
454 e = 0;
455 #else
456 e = dbms_comms(dc, TOKEN_FETCH, &r, &k, NULL, NULL, &val);
457 #endif
458 if (e || r != 0) {
459 if (*wp + 10 > maxw)
460 return -1;
461 strcpy(out_buff + 2, dbms_get_error(dc));
462 out_buff[0] = strlen(out_buff + 2);
463 out_buff[1] = 1;
464 }
465 else {
466 if (*wp + 1 + val.size > maxw)
467 return -1;
468
469 strncpy(out_buff + 2, val.data, val.size);
470 out_buff[0] = val.size;
471 out_buff[1] = 0;
472 };
473 if (trace_on)
474 printf(" reply '%s' len(%d)\n", out_buff + 2, out_buff[0]);
475
476 /* Queue up the output */
477 *wp += 2 + out_buff[0];
478
479 };
480
481 /* Return pointer to any partial/non completed commands */
482 return at;
483 }
484