1 /*---------------------------------------------------------------------------
2 * PostgreSQL Database package.
3 *
4 * Based on code written and donated 2001 by Florian Heinz.
5 *---------------------------------------------------------------------------
6 * The interface to the PostgreSQL database is implemented through the
7 * concept of a controlling object: when opening a database connection,
8 * the LPC code has to provide a callback function. The object this function
9 * is bound to is the controlling object: all queries to the database
10 * will be issued by this object, and the responses will be sent to the
11 * callback function.
12 *
13 * The interface is also asynchronous: the pg_query() efun just queues
14 * the query with the database connection, and returns immediately. When
15 * the database has finished working the query, the callback function is
16 * called with the results.
17 *
18 * The callback function can be defined by name or by closure, and can
19 * be defined with extra parameters:
20 *
21 * void <callback>(int type, mixed ret, int id [, mixed extra...])
22 *
23 * <type> is the type of the call, <id> identifies the query for which
24 * this call is executed:
25 *
26 * PGRES_TUPLES_OK: <ret> is the result from a query.
27 * It is either a mapping (field name as key, indexing
28 * <n> values for n returned tuples), or an array
29 * of arrays (one per row).
30 *
31 * PGRES_COMMAND_OK: <ret> is a string which contains the
32 * server response (e.g. on INSERT or DELETE)
33 *
34 * PGRES_BAD_RESPONSE,
35 * PGRES_NONFATAL_ERROR,
36 * PGRES_FATAL_ERROR: ret is the error-string
37 *
38 *
39 * void <callback>(int type, mixed ret [, mixed extra...])
40 *
41 * <type> is the type of the call, which is not related a specific query:
42 *
43 * PGCONN_SUCCESS: The database-connection was established, <ret> is
44 * a dummy string.
45 * PGCONN_FAILED: The database-connection failed, <ret> is the error
46 * message.
47 * The first message to the callback after a call to pg_connect()
48 * is always one of these two.
49 *
50 * PGRES_NOTICE: <ret> is a informational text.
51 *
52 * PGCONN_ABORTED: If the connection to the backend fails
53 * we try to re-establish (reset) it. If the
54 * reset fails, the connection is closed and
55 * this value is returned. Consider the
56 * connection gone and don't try to close or
57 * otherwise operate further on it.
58 * <ret> is a dummy string.
59 *---------------------------------------------------------------------------
60 */
61
62 #include "driver.h"
63
64 #ifdef USE_PGSQL
65
66 #include "typedefs.h"
67
68 #include "my-alloca.h"
69 #include <errno.h>
70 #include <stddef.h>
71 #include <time.h>
72 #include <unistd.h>
73
74 #ifdef HAVE_POLL
75
76 #include <sys/poll.h>
77
78 #else
79
80 # if defined(_AIX)
81 # include <sys/select.h>
82 # endif
83
84 #endif /* HAVE_POLL */
85
86 #include <libpq-fe.h>
87
88 #include "pkg-pgsql.h"
89
90 #include "actions.h"
91 #include "array.h"
92 #include "gcollect.h"
93 #include "instrs.h"
94 #include "interpret.h"
95 #include "main.h"
96 #include "mapping.h"
97 #include "mstrings.h"
98 #include "simulate.h"
99 #include "stdstrings.h"
100 #include "xalloc.h"
101
102 #include "../mudlib/sys/pgsql.h"
103
104 /*-------------------------------------------------------------------------*/
105
106 #define MAX_RESETS 5 /* Number of reset tries, at max. one per second */
107
108 /*-------------------------------------------------------------------------*/
109 /* Types */
110
111 /* --- struct query_queue_s: one entry in the query queue
112 * The queue is organized as single-linked list, one for each database
113 * connection.
114 */
115 struct query_queue_s
116 {
117 long id;
118 char * str;
119 int flags;
120 struct query_queue_s * next;
121 };
122
123 /* --- struct dbconn_s: one active database connection.
124 * The connections are held in a singly linked list.
125 * There can be only one connection per object.
126 */
127
128 struct dbconn_s
129 {
130 callback_t callback; /* The callback */
131
132 PGconn * conn;
133 PostgresPollingStatusType pgstate;
134
135 int state;
136 int fd;
137 int resets;
138
139 time_t lastreset;
140 time_t lastreply;
141
142 struct query_queue_s * queue;
143
144 struct dbconn_s * next;
145 };
146
147
148 /* Possible struct dbconn_s.states */
149
150 #define PG_UNCONNECTED 0
151 #define PG_CONNECTING 1
152 #define PG_RESETTING 2
153 #define PG_RESET_NEXT 3
154 #define PG_IDLE 4
155 #define PG_SENDQUERY 5
156 #define PG_WAITREPLY 6
157 #define PG_REPLYREADY 7
158
159
160 typedef struct dbconn_s dbconn_t;
161 typedef struct query_queue_s query_queue_t;
162
163 /*-------------------------------------------------------------------------*/
164
165 static dbconn_t *head = NULL;
166 /* The list of database connections.
167 */
168
169 static long query_id = 1;
170 /* The query ID counter, used to generate unique IDs.
171 */
172
173 /*-------------------------------------------------------------------------*/
174 /* Forward Declarations */
175
176 static void pgnotice (dbconn_t *pgconn, const char *msg);
177
178 /*-------------------------------------------------------------------------*/
179 void
pg_setfds(fd_set * readfds,fd_set * writefds,int * nfds)180 pg_setfds (fd_set *readfds, fd_set *writefds, int *nfds)
181
182 /* Called from the get_message() loop in comm.c, this function has to add
183 * the fds of the database connections to the fd sets.
184 */
185
186 {
187 dbconn_t *ptr;
188
189 for (ptr = head; ptr != NULL; ptr = ptr->next)
190 {
191 if (ptr->fd < 0)
192 continue;
193 if ((ptr->pgstate == PGRES_POLLING_WRITING)
194 || (ptr->state == PG_SENDQUERY)
195 )
196 FD_SET(ptr->fd, writefds);
197 FD_SET(ptr->fd, readfds);
198 if (*nfds <= ptr->fd)
199 *nfds = ptr->fd + 1;
200 }
201 } /* pg_setfds() */
202
203 /*-------------------------------------------------------------------------*/
204 static dbconn_t *
find_current_connection(object_t * obj)205 find_current_connection (object_t * obj)
206
207 /* Find the <dbconn> which has a callback in the object <obj>.
208 */
209
210 {
211 dbconn_t *ptr = head;
212
213 while (ptr && callback_object(&ptr->callback) != obj)
214 ptr = ptr->next;
215
216 return ptr;
217 } /* find_current_connection() */
218
219 /*=========================================================================*/
220
221 /* Queue management */
222
223 /*-------------------------------------------------------------------------*/
224 static query_queue_t *
queue(dbconn_t * db,const char * query)225 queue (dbconn_t *db, const char *query)
226
227 /* Create a new query_queue entry for <query>, link it into the list
228 * for database connection <db>, and return it.
229 * The query string is duplicated.
230 *
231 * Throw an error when out of memory.
232 */
233
234 {
235 query_queue_t *tmp;
236
237 tmp = db->queue;
238 if (!tmp)
239 tmp = db->queue = xalloc(sizeof(*tmp));
240 else
241 {
242 while (tmp->next)
243 tmp = tmp->next;
244 tmp->next = xalloc(sizeof(*tmp));
245 tmp = tmp->next;
246 }
247
248 if (!tmp)
249 {
250 outofmemory("new Postgres query");
251 /* NOTREACHED */
252 return NULL;
253 }
254
255 memset(tmp, 0, sizeof(*tmp));
256 tmp->id = query_id++;
257 tmp->str = string_copy(query);
258 tmp->flags = 0;
259
260 return tmp;
261 } /* queue() */
262
263 /*-------------------------------------------------------------------------*/
264 static void
dequeue(dbconn_t * db)265 dequeue (dbconn_t *db)
266
267 /* Unqueue the first query from connection <db> and deallocate it.
268 */
269
270 {
271 query_queue_t *tmp;
272
273 tmp = db->queue;
274 db->queue = tmp->next;
275 if (tmp->str)
276 xfree(tmp->str);
277 xfree(tmp);
278 } /* dequeue() */
279
280 /*-------------------------------------------------------------------------*/
281 static dbconn_t *
alloc_dbconn(void)282 alloc_dbconn (void)
283
284 /* Allocate a new database connection structure, link it into the global
285 * list and return it.
286 *
287 * Throw an error when out of memory.
288 */
289
290 {
291 dbconn_t *ret;
292
293 memsafe(ret = xalloc(sizeof(*ret)), sizeof(*ret), "new DB connection");
294
295 memset(ret, 0, sizeof(*ret));
296 ret->next = head;
297 ret->fd = -1;
298 ret->state = PG_UNCONNECTED;
299 head = ret;
300
301 return ret;
302 } /* alloc_dbconn() */
303
304 /*-------------------------------------------------------------------------*/
305 static void
dealloc_dbconn(dbconn_t * del)306 dealloc_dbconn (dbconn_t *del)
307
308 /* Unlink the database connection <del> from the list and deallocate it
309 * and all resources held by it.
310 */
311
312 {
313 dbconn_t *ptr;
314
315 if (!del)
316 return;
317
318 free_callback(&del->callback);
319
320 if (del == head)
321 head = head->next;
322 else
323 {
324 ptr = head;
325 while (ptr->next && (ptr->next != del))
326 ptr = ptr->next;
327 if (ptr->next)
328 ptr->next = ptr->next->next;
329 }
330 while (del->queue)
331 dequeue(del);
332 xfree(del);
333 } /* dealloc_dbconn() */
334
335 /*=========================================================================*/
336
337 /* Connection management */
338
339 /*-------------------------------------------------------------------------*/
340 static int
pgconnect(dbconn_t * db,char * connstr)341 pgconnect (dbconn_t *db, char *connstr)
342
343 /* Connect <db> to a database, using <connstr> for the connection parameters.
344 * Return 0 on success, and -1 on failure.
345 */
346
347 {
348 db->conn = PQconnectStart(connstr);
349 if (!db->conn)
350 return -1;
351
352 if (PQstatus(db->conn) == CONNECTION_BAD)
353 return -1;
354
355 PQsetNoticeProcessor(db->conn, (void*) pgnotice, db);
356 db->fd = PQsocket(db->conn);
357 db->state = PG_CONNECTING;
358 db->pgstate = PGRES_POLLING_WRITING;
359 return 0;
360 } /* pgconnect() */
361
362 /*-------------------------------------------------------------------------*/
363 static void
pgclose(dbconn_t * pgconn)364 pgclose (dbconn_t *pgconn)
365
366 /* Close the database connection of <pgconn>.
367 */
368
369 {
370 pgconn->state = PG_UNCONNECTED;
371 if (pgconn->conn)
372 PQfinish(pgconn->conn);
373 pgconn->conn = NULL;
374 pgconn->fd = -1;
375 } /* pgclose() */
376
377 /*-------------------------------------------------------------------------*/
378 static void
pgreset(dbconn_t * pgconn)379 pgreset (dbconn_t *pgconn)
380
381 /* Reset the connection to <pgconn>.
382 */
383
384 {
385 if (!PQresetStart(pgconn->conn))
386 {
387 pgclose(pgconn);
388
389 if (callback_object(&pgconn->callback))
390 {
391 push_number(inter_sp, PGCONN_ABORTED);
392 push_ref_string(inter_sp, STR_PG_RESET_FAILED);
393 (void)apply_callback(&pgconn->callback, 2);
394 }
395 return;
396 }
397
398 pgconn->state = PG_RESETTING;
399 pgconn->pgstate = PGRES_POLLING_WRITING;
400 } /* pgreset() */
401
402
403 /*=========================================================================*/
404
405 /* Database Result Handling */
406
407 /*-------------------------------------------------------------------------*/
408 static void
pgnotice(dbconn_t * pgconn,const char * msg)409 pgnotice (dbconn_t *pgconn, const char *msg)
410
411 /* Database connection <pgconn> wishes to send <msg> to the controlling
412 * object.
413 */
414
415 {
416 current_object = callback_object(&pgconn->callback);
417 command_giver = 0;
418 current_interactive = 0;
419
420 if (current_object != NULL)
421 {
422 push_number(inter_sp, PGRES_NOTICE);
423 push_c_string(inter_sp, msg);
424 (void)apply_callback(&pgconn->callback, 2);
425 }
426 else
427 {
428 debug_message("%s PG connection object destructed.\n", time_stamp());
429 pgreset(pgconn);
430 }
431 } /* pgnotice() */
432
433 /*-------------------------------------------------------------------------*/
434 static void
pgresult(dbconn_t * pgconn,PGresult * res)435 pgresult (dbconn_t *pgconn, PGresult *res)
436
437 /* The most recent query on <pgconn> returned the result <res>. Encode it into
438 * a nice LPC data package and send it to the controlling object.
439 * The query is removed from <pgconn>.
440 */
441
442 {
443 int type;
444
445 current_object = callback_object(&pgconn->callback);
446 command_giver = 0;
447 current_interactive = 0;
448
449 type = PQresultStatus(res);
450
451 push_number(inter_sp, type);
452
453 switch (type)
454 {
455 case PGRES_TUPLES_OK:
456 {
457 int nfields, ntuples, i, j;
458
459 nfields = PQnfields(res);
460 ntuples = PQntuples(res);
461
462 if (pgconn->queue->flags & PG_RESULT_MAP)
463 {
464 /* Return the result as mapping */
465
466 mapping_t *map;
467
468 if (max_mapping_size
469 && (nfields * (ntuples + 1)) > (p_int)max_mapping_size)
470 {
471 PQclear(res);
472 dequeue(pgconn);
473 errorf("Query result exceeded mappingsize limit.\n");
474 }
475
476 if (max_mapping_keys && nfields > (p_int)max_mapping_keys)
477 {
478 PQclear(res);
479 dequeue(pgconn);
480 errorf("Query result exceeded mappingsize limit.\n");
481 }
482
483 map = allocate_mapping(nfields, ntuples);
484 if (!map)
485 {
486 push_number(inter_sp, 0);
487 break;
488 }
489
490 for (i = 0; i < nfields; i++)
491 {
492 svalue_t * entry, fname;
493
494 put_c_string(&fname, PQfname(res, i));
495 entry = get_map_lvalue(map, &fname);
496 free_svalue(&fname);
497 if (!entry)
498 break;
499 for (j = 0; j < ntuples; j++)
500 put_c_string(&entry[j], PQgetvalue(res, j, i));
501 }
502
503 push_mapping(inter_sp, map);
504 }
505 else
506 {
507 /* Return the result as array of arrays */
508
509 vector_t * array;
510 svalue_t * entry;
511
512 if (max_array_size
513 && ( (ntuples >= (p_int)max_array_size)
514 || (nfields >= (p_int)max_array_size))
515 )
516 {
517 PQclear(res);
518 dequeue(pgconn);
519 errorf("Query result exceeded array limit.\n");
520 }
521
522 array = allocate_array(ntuples+1);
523 if (!array)
524 {
525 push_number(inter_sp, 0);
526 break;
527 }
528
529 entry = &array->item[0];
530 put_array(entry, allocate_array(nfields));
531 for (j = 0; j < nfields; j++)
532 put_c_string(&entry->u.vec->item[j], PQfname(res, j));
533
534 for (i = 0; i < ntuples; i++)
535 {
536 entry = &array->item[i+1];
537 put_array(entry, allocate_array(nfields));
538 for (j = 0; j < nfields; j++)
539 put_c_string(&entry->u.vec->item[j], PQgetvalue(res, i, j));
540 }
541 push_array(inter_sp, array);
542 }
543 break;
544 }
545
546 case PGRES_COMMAND_OK:
547 push_c_string(inter_sp, PQcmdStatus(res));
548 break;
549
550 case PGRES_BAD_RESPONSE:
551 push_c_string(inter_sp, PQerrorMessage(pgconn->conn));
552 break;
553
554 case PGRES_FATAL_ERROR:
555 case PGRES_NONFATAL_ERROR:
556 push_c_string(inter_sp, PQerrorMessage(pgconn->conn));
557 break;
558
559 default:
560 PQclear(res);
561 return;
562 }
563
564 if (callback_object(&pgconn->callback))
565 {
566 push_number(inter_sp, pgconn->queue->id);
567 (void)apply_callback(&pgconn->callback, 3);
568 }
569 else
570 {
571 debug_message("%s PG connection object destructed.\n", time_stamp());
572 pgreset(pgconn);
573 }
574
575 dequeue(pgconn);
576 PQclear(res);
577 } /* pgresult() */
578
579
580 /*=========================================================================*/
581
582 /* Database Connection Handling */
583
584 /*-------------------------------------------------------------------------*/
585 static void
pg_process_connect_reset(dbconn_t * pgconn)586 pg_process_connect_reset (dbconn_t *pgconn)
587
588 /* Reset the connection <pgconn>.
589 */
590
591 {
592 int reset;
593 PostgresPollingStatusType (*pqpollhandler)(PGconn *);
594
595 reset = (pgconn->state == PG_RESETTING);
596
597 if (reset)
598 debug_message("%s PGSQL Connection resetting.\n", time_stamp());
599
600 pqpollhandler = reset ? PQresetPoll : PQconnectPoll;
601
602 if (pgconn->pgstate != PGRES_POLLING_ACTIVE)
603 {
604 int rc;
605
606 #ifdef HAVE_POLL
607 struct pollfd ufd;
608
609 ufd.fd = pgconn->fd;
610 #else
611 fd_set readfds, writefds;
612
613 FD_ZERO(&readfds);
614 FD_ZERO(&writefds);
615 #endif /* HAVE_POLL */
616
617 switch (pgconn->pgstate)
618 {
619 case PGRES_POLLING_READING:
620 #ifdef HAVE_POLL
621 ufd.events = POLLIN;
622 #else
623 FD_SET(pgconn->fd, &readfds);
624 #endif /* HAVE_POLL */
625 break;
626
627 case PGRES_POLLING_WRITING:
628 #ifdef HAVE_POLL
629 ufd.events = POLLOUT;
630 #else
631 FD_SET(pgconn->fd, &writefds);
632 #endif /* HAVE_POLL */
633 break;
634
635 default:
636 /* Shouldn't happen */
637 break;
638 }
639
640 #ifdef HAVE_POLL
641 do {
642 rc = poll(&ufd, 1, 0);
643 } while (rc < 0 && errno == EINTR);
644
645 if (rc > 0)
646 {
647 pgconn->pgstate = PGRES_POLLING_ACTIVE;
648 if (ufd.revents & POLLIN)
649 pgconn->lastreply = time(NULL);
650 }
651 #else
652 do {
653 struct timeval timeout;
654
655 timeout.tv_sec = 0;
656 timeout.tv_usec = 0;
657 rc = select(pgconn->fd+1, &readfds, &writefds, NULL, &timeout);
658 if (rc >= 0 || errno != EINTR)
659 break;
660 } while (rc < 0 && errno == EINTR);
661
662 if (rc > 0)
663 {
664 pgconn->pgstate = PGRES_POLLING_ACTIVE;
665 if (FD_ISSET(pgconn->fd, &readfds))
666 pgconn->lastreply = time(NULL);
667 }
668 #endif /* HAVE_POLL */
669 }
670
671 if (pgconn->pgstate == PGRES_POLLING_ACTIVE)
672 pgconn->pgstate = pqpollhandler(pgconn->conn);
673
674 if (pgconn->pgstate == PGRES_POLLING_FAILED)
675 {
676 if (reset && (pgconn->resets < MAX_RESETS))
677 {
678 pgconn->resets++;
679 pgconn->state = PG_RESET_NEXT;
680 pgconn->lastreset = time(NULL);
681 }
682 else
683 {
684 if (callback_object(&pgconn->callback))
685 {
686 push_number(inter_sp, reset ? PGCONN_ABORTED : PGCONN_FAILED);
687 push_c_string(inter_sp, PQerrorMessage(pgconn->conn));
688 (void)apply_callback(&pgconn->callback, 2);
689 }
690 else
691 {
692 debug_message("%s PG connection object destructed.\n", time_stamp());
693 }
694 pgclose(pgconn);
695 }
696 }
697 else if (pgconn->pgstate == PGRES_POLLING_OK)
698 {
699 if (!reset)
700 {
701 /* The program should not notice a successful reset */
702 if (callback_object(&pgconn->callback))
703 {
704 push_number(inter_sp, PGCONN_SUCCESS);
705 push_ref_string(inter_sp, STR_SUCCESS);
706 (void)apply_callback(&pgconn->callback, 2);
707 }
708 else
709 {
710 debug_message("%s PG connection object destructed.\n", time_stamp());
711 pgreset(pgconn);
712 }
713 }
714 pgconn->resets = 0;
715 if (pgconn->queue)
716 pgconn->state = PG_SENDQUERY;
717 else
718 pgconn->state = PG_IDLE;
719 }
720 } /* pg_process_connect_reset() */
721
722 /*-------------------------------------------------------------------------*/
723 static void
pg_process_query(dbconn_t * pgconn)724 pg_process_query (dbconn_t *pgconn)
725
726 /* Query the connection <pgconn> for data and act on it.
727 */
728
729 {
730 int rc;
731
732 #ifdef HAVE_POLL
733 struct pollfd ufd;
734
735 ufd.fd = pgconn->fd;
736 ufd.events = POLLIN;
737 #else
738 struct fd_set readfds;
739
740 FD_ZERO(&readfds);
741 FD_SET(pgconn->fd, &readfds);
742
743 #endif /* HAVE_POLL */
744
745 PQflush(pgconn->conn);
746
747 #ifdef HAVE_POLL
748
749 do {
750 rc = poll(&ufd, 1, 0);
751 } while (rc < 0 && errno == EINTR);
752
753 #else
754
755 do {
756 struct timeval timeout;
757
758 timeout.tv_sec = 0;
759 timeout.tv_usec = 0;
760 rc = select(pgconn->fd+1, &readfds, NULL, NULL, &timeout);
761 } while (rc < 0 && errno == EINTR);
762
763 #endif /* HAVE_POLL */
764
765 if (rc > 0)
766 {
767 pgconn->lastreply = time(NULL);
768 PQconsumeInput(pgconn->conn);
769 if (!PQisBusy(pgconn->conn))
770 pgconn->state = PG_REPLYREADY;
771 }
772 } /* pg_process_query() */
773
774 /*-------------------------------------------------------------------------*/
775 static void
pg_process_one(dbconn_t * pgconn)776 pg_process_one (dbconn_t *pgconn)
777
778 /* Check the state of <pgconn> and take appropriate action.
779 */
780
781 {
782 PGresult *res;
783
784 switch (pgconn->state)
785 {
786 case PG_CONNECTING:
787 case PG_RESETTING:
788 pg_process_connect_reset(pgconn);
789 break;
790
791 case PG_SENDQUERY:
792 case PG_IDLE:
793 if (pgconn->queue)
794 {
795 pgconn->lastreply = time(NULL);
796 PQsendQuery(pgconn->conn, pgconn->queue->str);
797 pgconn->state = PG_WAITREPLY;
798 }
799 break;
800
801 case PG_WAITREPLY:
802 pg_process_query(pgconn);
803 break;
804
805 case PG_UNCONNECTED:
806 dealloc_dbconn(pgconn);
807 break;
808
809 case PG_RESET_NEXT:
810 if (pgconn->lastreset != time(NULL))
811 pgreset(pgconn);
812 break;
813 } /* switch() */
814
815 /* Validate the connection */
816 if ((PQstatus(pgconn->conn) != CONNECTION_OK)
817 && (pgconn->state >= PG_IDLE)
818 )
819 pgreset(pgconn);
820
821 /* If there is a result waiting, get it and forward
822 * it to the controlling object.
823 */
824 if (pgconn->state == PG_REPLYREADY)
825 {
826 do
827 {
828 res = PQgetResult(pgconn->conn);
829 if (!res)
830 {
831 pgconn->state = PG_IDLE;
832 pg_process_one(pgconn);
833 break;
834 } else
835 pgresult(pgconn, res);
836 }
837 while (!PQisBusy(pgconn->conn));
838 }
839 } /* pg_process_one() */
840
841 /*-------------------------------------------------------------------------*/
842 void
pg_process_all(void)843 pg_process_all (void)
844
845 /* Called from the get_message() loop in comm.c, this function checks
846 * all known database connections for their status and takes appropriate
847 * actions.
848 */
849
850 {
851 dbconn_t *ptr = head;
852 Bool got_dead = MY_FALSE;
853
854 while (ptr)
855 {
856 if (!callback_object(&ptr->callback)
857 && ptr->state != PG_UNCONNECTED
858 )
859 {
860 debug_message("%s PG connection object destructed.\n", time_stamp());
861 pgclose(ptr);
862 got_dead = MY_TRUE;
863 }
864 else
865 pg_process_one(ptr);
866 ptr = ptr->next;
867 }
868
869 if (got_dead)
870 pg_purge_connections();
871 } /* pg_process_all() */
872
873
874 /*-------------------------------------------------------------------------*/
875 void
pg_purge_connections(void)876 pg_purge_connections (void)
877
878 /* Check the list of database connections and purge all UNCONNECTED
879 * connections and those with destructed callback objects.
880 */
881
882 {
883 dbconn_t *prev;
884
885 while (head)
886 {
887 if (head->state != PG_UNCONNECTED
888 && !callback_object(&head->callback)
889 )
890 {
891 debug_message("%s PG connection object destructed.\n", time_stamp());
892 pgclose(head);
893 }
894 if (head->state == PG_UNCONNECTED)
895 dealloc_dbconn(head);
896 else
897 break;
898 }
899
900 if (head)
901 {
902 prev = head;
903 while (prev->next)
904 {
905 if (prev->next->state != PG_UNCONNECTED
906 && !callback_object(&prev->next->callback)
907 )
908 {
909 debug_message("%s PG connection object destructed.\n", time_stamp());
910 pgclose(prev->next);
911 }
912 if (prev->next->state == PG_UNCONNECTED)
913 dealloc_dbconn(prev->next);
914 else
915 prev = prev->next;
916 }
917 }
918 } /* pg_purge_connections() */
919
920 /*-------------------------------------------------------------------------*/
921 static
check_privilege(const char * efun_name,Bool raise_error,svalue_t * sp)922 Bool check_privilege (const char * efun_name, Bool raise_error, svalue_t * sp)
923
924 /* Check if the user has the privileges to execute efun <efun_name>.
925 * The function executes a call to master->privilege_violation("mysql",
926 * efun_name) and evaluates the result.
927 * If the master result is TRUE, the function returns TRUE.
928 * If the master result is FALSE, the function returns FALSE if <raise_error>
929 * is FALSE, and raises an error if <raise_error> is true.
930 */
931
932 {
933 Bool rc;
934
935 inter_sp = sp+1;
936 put_c_string(inter_sp, efun_name);
937 rc = privilege_violation(STR_PGSQL, inter_sp, inter_sp);
938 free_svalue(inter_sp);
939 inter_sp--;
940
941 if (rc)
942 return MY_TRUE;
943
944 if (raise_error)
945 {
946 errorf("%s(): Privilege violation.\n", efun_name);
947 /* NOTREACHED */
948 }
949
950 return MY_FALSE;
951 } /* check_privilege() */
952
953
954
955 /*=========================================================================*/
956
957 /* EFUNS */
958
959 /*-------------------------------------------------------------------------*/
960 svalue_t *
v_pg_connect(svalue_t * sp,int num_arg)961 v_pg_connect (svalue_t *sp, int num_arg)
962
963 /* EFUN pg_connect()
964 *
965 * int pg_connect (string conn, string fun)
966 * int pg_connect (string conn, string fun, string|object obj, mixed extra, ...)
967 * int pg_connect (string conn, closure cl, mixed extra, ...)
968 *
969 * Open a database connection as directed by <conn>, and assign the
970 * callback function <fun>/<cl> with the optional <extra> parameters to
971 * it.
972 *
973 * The object holding the callback function becomes the controlling object;
974 * obiously it is an error to assign more than one connection to the same
975 * controlling object.
976 *
977 * The <conn> string is in the format accepted by Postgres' PQconnectStart()
978 * API functions. Pass an empty string to use the default options, or
979 * a string holding the '<key>=<value>' options separated by whitespace.
980 * The most useful options are:
981 * dbname: The database name
982 * user: The user name to connect as.
983 * password: Password to be used.
984 *
985 * Return 0 on success, and -1 on failure.
986 */
987
988 {
989 dbconn_t *db;
990 int st;
991 int error_index;
992 callback_t cb;
993 object_t *cb_object;
994 svalue_t *arg = sp - num_arg + 1;
995
996 check_privilege(instrs[F_PG_CONNECT].name, MY_TRUE, sp);
997
998 /* Get the callback information */
999
1000 error_index = setup_efun_callback(&cb, arg+1, num_arg-1);
1001
1002 if (error_index >= 0)
1003 {
1004 vefun_bad_arg(error_index+2, arg);
1005 /* NOTREACHED */
1006 return arg;
1007 }
1008 inter_sp = sp = arg+1;
1009 put_callback(sp, &cb);
1010
1011 cb_object = callback_object(&cb);
1012 if (!cb_object)
1013 {
1014 free_callback(&cb);
1015 errorf("pgconnect(): Callback object is destructed.\n");
1016 /* NOTREACHED */
1017 return arg;
1018 }
1019
1020 /* Check the callback object if it has a connection already */
1021
1022 db = find_current_connection(cb_object);
1023 if (db)
1024 {
1025 if (db->state == PG_UNCONNECTED)
1026 dealloc_dbconn(db);
1027 else
1028 {
1029 free_callback(&cb);
1030 errorf("pgconnect(): Already connected\n");
1031 /* NOTREACHED */
1032 return arg;
1033 }
1034 }
1035
1036 /* Connect to the database */
1037
1038 db = alloc_dbconn();
1039 db->callback = cb;
1040
1041 st = pgconnect(db, get_txt(arg[0].u.str));
1042 if (st < 0)
1043 pgclose(db);
1044
1045 free_svalue(arg); /* the callback entries are gone already */
1046 put_number(arg, st);
1047 return arg;
1048 } /* f_pg_connect() */
1049
1050 /*-------------------------------------------------------------------------*/
1051 svalue_t *
f_pg_pending(svalue_t * sp)1052 f_pg_pending (svalue_t *sp)
1053
1054 /* EFUN pg_pending()
1055 *
1056 * int pg_pending ()
1057 * int pg_pending (object obj)
1058 *
1059 * Return the number of pending queries for the connection on the given
1060 * object <obj> (default is the current object). The object has no
1061 * database connection, return -1.
1062 */
1063
1064 {
1065 dbconn_t *db;
1066 int count = -1;
1067
1068 check_privilege(instrs[F_PG_PENDING].name, MY_TRUE, sp);
1069
1070 db = find_current_connection(sp->u.ob);
1071 if (db)
1072 {
1073 query_queue_t * qu;
1074
1075 for (count = 0, qu = db->queue
1076 ; qu != NULL
1077 ; count++, qu = qu->next
1078 ) NOOP;
1079 }
1080
1081 free_svalue(sp);
1082 put_number(sp, count);
1083 return sp;
1084 } /* f_pg_pending() */
1085
1086 /*-------------------------------------------------------------------------*/
1087 svalue_t *
v_pg_query(svalue_t * sp,int numarg)1088 v_pg_query (svalue_t *sp, int numarg)
1089
1090 /* EFUN pg_query()
1091 *
1092 * int pg_query (string query)
1093 * int pg_query (string query, int flags)
1094 *
1095 * Queue a new query <query> to the database connection on the current
1096 * object. Return the unique id of the query. The query result itself
1097 * will be passed as argument to the callback function.
1098 *
1099 * <flags> can be one of these values:
1100 * PG_RESULT_ARRAY: Pass the query result as array.
1101 * PG_RESULT_MAP: Pass the query result as mapping.
1102 */
1103
1104 {
1105 dbconn_t *db;
1106 query_queue_t *q;
1107 int flags = PG_RESULT_ARRAY;
1108
1109 check_privilege(instrs[F_PG_QUERY].name, MY_TRUE, sp);
1110
1111 if (numarg == 2)
1112 {
1113 flags = sp->u.number;
1114 sp--;
1115 }
1116
1117 db = find_current_connection(current_object);
1118 if (!db)
1119 errorf("pgquery(): not connected\n");
1120
1121 q = queue(db, get_txt(sp->u.str));
1122 q->flags = flags;
1123 if (db->state == PG_IDLE)
1124 db->state = PG_SENDQUERY;
1125
1126 free_svalue(sp);
1127 put_number(sp, q->id);
1128 return sp;
1129 } /* f_pg_query() */
1130
1131 /*-------------------------------------------------------------------------*/
1132 svalue_t *
f_pg_close(svalue_t * sp)1133 f_pg_close (svalue_t *sp)
1134
1135 /* EFUN pg_close()
1136 *
1137 * void pg_close()
1138 *
1139 * Close the database connection for the current object, if there is one.
1140 */
1141
1142 {
1143 dbconn_t *db;
1144
1145 check_privilege(instrs[F_PG_CLOSE].name, MY_TRUE, sp);
1146
1147 db = find_current_connection(current_object);
1148 if (db)
1149 pgclose(db);
1150
1151 return sp;
1152 } /* f_pg_close() */
1153
1154 /*-------------------------------------------------------------------------*/
1155 svalue_t *
f_pg_conv_string(svalue_t * sp)1156 f_pg_conv_string (svalue_t *sp)
1157
1158 /* EFUN pg_escapeString
1159 *
1160 * string pg_conv_string(string input)
1161 *
1162 * Escape a string for use within an SQL command.
1163 */
1164 {
1165 string_t *escaped;
1166 int size = mstrsize(sp->u.str);
1167 memsafe(escaped = alloc_mstring(2 * size), 2 * size
1168 , "escaped sql string");
1169
1170 // PQescapeString(char *to, char *from, size_t length);
1171 PQescapeString( (unsigned char *)get_txt(escaped)
1172 , (unsigned char *)get_txt(sp->u.str), size);
1173 free_string_svalue(sp);
1174 put_string(sp, escaped);
1175 return sp;
1176 } /* pg_conv_string() */
1177
1178 /*=========================================================================*/
1179
1180 /* GC SUPPORT */
1181
1182 #ifdef GC_SUPPORT
1183
1184 /*-------------------------------------------------------------------------*/
1185 void
pg_clear_refs(void)1186 pg_clear_refs (void)
1187
1188 /* GC Support: Clear all references from the database connections
1189 */
1190
1191 {
1192 dbconn_t *dbconn;
1193
1194 for (dbconn = head; dbconn != NULL; dbconn = dbconn->next)
1195 {
1196 clear_ref_in_callback(&(dbconn->callback));
1197 }
1198 } /* pg_clear_refs() */
1199
1200 /*-------------------------------------------------------------------------*/
1201 void
pg_count_refs(void)1202 pg_count_refs (void)
1203
1204 /* GC Support: Count all references from the database connections
1205 */
1206
1207 {
1208 dbconn_t *dbconn;
1209
1210 for (dbconn = head; dbconn != NULL; dbconn = dbconn->next)
1211 {
1212 query_queue_t *qu;
1213
1214 note_malloced_block_ref(dbconn);
1215 count_ref_in_callback(&(dbconn->callback));
1216
1217 for (qu = dbconn->queue; qu != NULL; qu = qu->next)
1218 {
1219 note_malloced_block_ref(qu);
1220 note_malloced_block_ref(qu->str);
1221 }
1222 }
1223 } /* pg_count_refs() */
1224
1225 #endif /* GC_SUPPORT */
1226
1227 /*-------------------------------------------------------------------------*/
1228
1229 #endif /* USE_PGSQL */
1230
1231 /*************************************************************************/
1232