1 /*---------------------------------------------------------------------------
2  * PostgreSQL Database package.
3  *
4  * Based on code written and donated 2001 by Florian Heinz.
5  *---------------------------------------------------------------------------
6  * The interface to the PostgreSQL database is implemented through the
7  * concept of a controlling object: when opening a database connection,
8  * the LPC code has to provide a callback function. The object this function
9  * is bound to is the controlling object: all queries to the database
10  * will be issued by this object, and the responses will be sent to the
11  * callback function.
12  *
13  * The interface is also asynchronous: the pg_query() efun just queues
14  * the query with the database connection, and returns immediately. When
15  * the database has finished working the query, the callback function is
16  * called with the results.
17  *
18  * The callback function can be defined by name or by closure, and can
19  * be defined with extra parameters:
20  *
21  *   void <callback>(int type, mixed ret, int id [, mixed extra...])
22  *
23  *     <type> is the type of the call, <id> identifies the query for which
24  *     this call is executed:
25  *
26  *       PGRES_TUPLES_OK: <ret> is the result from a query.
27  *                        It is either a mapping (field name as key, indexing
28  *                        <n> values for n returned tuples), or an array
29  *                        of arrays (one per row).
30  *
31  *       PGRES_COMMAND_OK: <ret> is a string which contains the
32  *                         server response (e.g. on INSERT or DELETE)
33  *
34  *       PGRES_BAD_RESPONSE,
35  *       PGRES_NONFATAL_ERROR,
36  *       PGRES_FATAL_ERROR: ret is the error-string
37  *
38  *
39  *   void <callback>(int type, mixed ret [, mixed extra...])
40  *
41  *     <type> is the type of the call, which is not related a specific query:
42  *
43  *       PGCONN_SUCCESS: The database-connection was established, <ret> is
44  *                       a dummy string.
45  *       PGCONN_FAILED:  The database-connection failed, <ret> is the error
46  *                       message.
47  *         The first message to the callback after a call to pg_connect()
48  *         is always one of these two.
49  *
50  *       PGRES_NOTICE: <ret> is a informational text.
51  *
52  *       PGCONN_ABORTED: If the connection to the backend fails
53  *                       we try to re-establish (reset) it. If the
54  *                       reset fails, the connection is closed and
55  *                       this value is returned. Consider the
56  *                       connection gone and don't try to close or
57  *                       otherwise operate further on it.
58  *                       <ret> is a dummy string.
59  *---------------------------------------------------------------------------
60  */
61 
62 #include "driver.h"
63 
64 #ifdef USE_PGSQL
65 
66 #include "typedefs.h"
67 
68 #include "my-alloca.h"
69 #include <errno.h>
70 #include <stddef.h>
71 #include <time.h>
72 #include <unistd.h>
73 
74 #ifdef HAVE_POLL
75 
76 #include <sys/poll.h>
77 
78 #else
79 
80 #    if defined(_AIX)
81 #        include <sys/select.h>
82 #    endif
83 
84 #endif /* HAVE_POLL */
85 
86 #include <libpq-fe.h>
87 
88 #include "pkg-pgsql.h"
89 
90 #include "actions.h"
91 #include "array.h"
92 #include "gcollect.h"
93 #include "instrs.h"
94 #include "interpret.h"
95 #include "main.h"
96 #include "mapping.h"
97 #include "mstrings.h"
98 #include "simulate.h"
99 #include "stdstrings.h"
100 #include "xalloc.h"
101 
102 #include "../mudlib/sys/pgsql.h"
103 
104 /*-------------------------------------------------------------------------*/
105 
106 #define MAX_RESETS 5   /* Number of reset tries, at max. one per second */
107 
108 /*-------------------------------------------------------------------------*/
109 /* Types */
110 
111 /* --- struct query_queue_s: one entry in the query queue
112  * The queue is organized as single-linked list, one for each database
113  * connection.
114  */
115 struct query_queue_s
116 {
117    long                   id;
118    char                 * str;
119    int                    flags;
120    struct query_queue_s * next;
121 };
122 
123 /* ---  struct dbconn_s: one active database connection.
124  * The connections are held in a singly linked list.
125  * There can be only one connection per object.
126  */
127 
128 struct dbconn_s
129 {
130    callback_t                  callback;  /* The callback */
131 
132    PGconn                    * conn;
133    PostgresPollingStatusType   pgstate;
134 
135    int                         state;
136    int                         fd;
137    int                         resets;
138 
139    time_t                      lastreset;
140    time_t                      lastreply;
141 
142    struct query_queue_s      * queue;
143 
144    struct dbconn_s           * next;
145 };
146 
147 
148 /* Possible struct dbconn_s.states */
149 
150 #define PG_UNCONNECTED 0
151 #define PG_CONNECTING  1
152 #define PG_RESETTING   2
153 #define PG_RESET_NEXT  3
154 #define PG_IDLE        4
155 #define PG_SENDQUERY   5
156 #define PG_WAITREPLY   6
157 #define PG_REPLYREADY  7
158 
159 
160 typedef struct dbconn_s      dbconn_t;
161 typedef struct query_queue_s query_queue_t;
162 
163 /*-------------------------------------------------------------------------*/
164 
165 static dbconn_t *head = NULL;
166   /* The list of database connections.
167    */
168 
169 static long query_id = 1;
170   /* The query ID counter, used to generate unique IDs.
171    */
172 
173 /*-------------------------------------------------------------------------*/
174 /* Forward Declarations */
175 
176 static void pgnotice (dbconn_t *pgconn, const char *msg);
177 
178 /*-------------------------------------------------------------------------*/
179 void
pg_setfds(fd_set * readfds,fd_set * writefds,int * nfds)180 pg_setfds (fd_set *readfds, fd_set *writefds, int *nfds)
181 
182 /* Called from the get_message() loop in comm.c, this function has to add
183  * the fds of the database connections to the fd sets.
184  */
185 
186 {
187     dbconn_t *ptr;
188 
189     for (ptr = head; ptr != NULL; ptr = ptr->next)
190     {
191         if (ptr->fd < 0)
192             continue;
193         if ((ptr->pgstate == PGRES_POLLING_WRITING)
194          || (ptr->state == PG_SENDQUERY)
195            )
196             FD_SET(ptr->fd, writefds);
197         FD_SET(ptr->fd, readfds);
198         if (*nfds <= ptr->fd)
199             *nfds = ptr->fd + 1;
200     }
201 } /* pg_setfds() */
202 
203 /*-------------------------------------------------------------------------*/
204 static dbconn_t *
find_current_connection(object_t * obj)205 find_current_connection (object_t * obj)
206 
207 /* Find the <dbconn> which has a callback in the object <obj>.
208  */
209 
210 {
211    dbconn_t *ptr = head;
212 
213    while (ptr && callback_object(&ptr->callback) != obj)
214         ptr = ptr->next;
215 
216    return ptr;
217 } /* find_current_connection() */
218 
219 /*=========================================================================*/
220 
221 /*                             Queue management                            */
222 
223 /*-------------------------------------------------------------------------*/
224 static query_queue_t *
queue(dbconn_t * db,const char * query)225 queue (dbconn_t *db, const char *query)
226 
227 /* Create a new query_queue entry for <query>, link it into the list
228  * for database connection <db>, and return it.
229  * The query string is duplicated.
230  *
231  * Throw an error when out of memory.
232  */
233 
234 {
235     query_queue_t *tmp;
236 
237     tmp = db->queue;
238     if (!tmp)
239         tmp = db->queue = xalloc(sizeof(*tmp));
240     else
241     {
242         while (tmp->next)
243             tmp = tmp->next;
244         tmp->next = xalloc(sizeof(*tmp));
245         tmp = tmp->next;
246     }
247 
248     if (!tmp)
249     {
250         outofmemory("new Postgres query");
251         /* NOTREACHED */
252         return NULL;
253     }
254 
255     memset(tmp, 0, sizeof(*tmp));
256     tmp->id  = query_id++;
257     tmp->str = string_copy(query);
258     tmp->flags = 0;
259 
260     return tmp;
261 } /* queue() */
262 
263 /*-------------------------------------------------------------------------*/
264 static void
dequeue(dbconn_t * db)265 dequeue (dbconn_t *db)
266 
267 /* Unqueue the first query from connection <db> and deallocate it.
268  */
269 
270 {
271     query_queue_t *tmp;
272 
273     tmp = db->queue;
274     db->queue = tmp->next;
275     if (tmp->str)
276         xfree(tmp->str);
277     xfree(tmp);
278 } /* dequeue() */
279 
280 /*-------------------------------------------------------------------------*/
281 static dbconn_t *
alloc_dbconn(void)282 alloc_dbconn (void)
283 
284 /* Allocate a new database connection structure, link it into the global
285  * list and return it.
286  *
287  * Throw an error when out of memory.
288  */
289 
290 {
291     dbconn_t *ret;
292 
293     memsafe(ret = xalloc(sizeof(*ret)), sizeof(*ret), "new DB connection");
294 
295     memset(ret, 0, sizeof(*ret));
296     ret->next = head;
297     ret->fd = -1;
298     ret->state = PG_UNCONNECTED;
299     head = ret;
300 
301     return ret;
302 } /* alloc_dbconn() */
303 
304 /*-------------------------------------------------------------------------*/
305 static void
dealloc_dbconn(dbconn_t * del)306 dealloc_dbconn (dbconn_t *del)
307 
308 /* Unlink the database connection <del> from the list and deallocate it
309  * and all resources held by it.
310  */
311 
312 {
313     dbconn_t *ptr;
314 
315     if (!del)
316         return;
317 
318     free_callback(&del->callback);
319 
320     if (del == head)
321         head = head->next;
322     else
323     {
324         ptr = head;
325         while (ptr->next && (ptr->next != del))
326             ptr = ptr->next;
327         if (ptr->next)
328             ptr->next = ptr->next->next;
329     }
330     while (del->queue)
331       dequeue(del);
332     xfree(del);
333 } /* dealloc_dbconn() */
334 
335 /*=========================================================================*/
336 
337 /*                          Connection management                          */
338 
339 /*-------------------------------------------------------------------------*/
340 static int
pgconnect(dbconn_t * db,char * connstr)341 pgconnect (dbconn_t *db, char *connstr)
342 
343 /* Connect <db> to a database, using <connstr> for the connection parameters.
344  * Return 0 on success, and -1 on failure.
345  */
346 
347 {
348     db->conn = PQconnectStart(connstr);
349     if (!db->conn)
350         return -1;
351 
352     if (PQstatus(db->conn) == CONNECTION_BAD)
353         return -1;
354 
355     PQsetNoticeProcessor(db->conn, (void*) pgnotice, db);
356     db->fd = PQsocket(db->conn);
357     db->state = PG_CONNECTING;
358     db->pgstate = PGRES_POLLING_WRITING;
359     return 0;
360 } /* pgconnect() */
361 
362 /*-------------------------------------------------------------------------*/
363 static void
pgclose(dbconn_t * pgconn)364 pgclose (dbconn_t *pgconn)
365 
366 /* Close the database connection of <pgconn>.
367  */
368 
369 {
370     pgconn->state = PG_UNCONNECTED;
371     if (pgconn->conn)
372         PQfinish(pgconn->conn);
373     pgconn->conn = NULL;
374     pgconn->fd = -1;
375 } /* pgclose() */
376 
377 /*-------------------------------------------------------------------------*/
378 static void
pgreset(dbconn_t * pgconn)379 pgreset (dbconn_t *pgconn)
380 
381 /* Reset the connection to <pgconn>.
382  */
383 
384 {
385     if (!PQresetStart(pgconn->conn))
386     {
387         pgclose(pgconn);
388 
389         if (callback_object(&pgconn->callback))
390         {
391             push_number(inter_sp, PGCONN_ABORTED);
392             push_ref_string(inter_sp, STR_PG_RESET_FAILED);
393             (void)apply_callback(&pgconn->callback, 2);
394         }
395         return;
396     }
397 
398     pgconn->state = PG_RESETTING;
399     pgconn->pgstate = PGRES_POLLING_WRITING;
400 } /* pgreset() */
401 
402 
403 /*=========================================================================*/
404 
405 /*                     Database Result Handling                            */
406 
407 /*-------------------------------------------------------------------------*/
408 static void
pgnotice(dbconn_t * pgconn,const char * msg)409 pgnotice (dbconn_t *pgconn, const char *msg)
410 
411 /* Database connection <pgconn> wishes to send <msg> to the controlling
412  * object.
413  */
414 
415 {
416     current_object = callback_object(&pgconn->callback);
417     command_giver = 0;
418     current_interactive = 0;
419 
420     if (current_object != NULL)
421     {
422         push_number(inter_sp, PGRES_NOTICE);
423         push_c_string(inter_sp, msg);
424         (void)apply_callback(&pgconn->callback, 2);
425     }
426     else
427     {
428         debug_message("%s PG connection object destructed.\n", time_stamp());
429         pgreset(pgconn);
430     }
431 } /* pgnotice() */
432 
433 /*-------------------------------------------------------------------------*/
434 static void
pgresult(dbconn_t * pgconn,PGresult * res)435 pgresult (dbconn_t *pgconn, PGresult *res)
436 
437 /* The most recent query on <pgconn> returned the result <res>. Encode it into
438  * a nice LPC data package and send it to the controlling object.
439  * The query is removed from <pgconn>.
440  */
441 
442 {
443     int type;
444 
445     current_object = callback_object(&pgconn->callback);
446     command_giver = 0;
447     current_interactive = 0;
448 
449     type = PQresultStatus(res);
450 
451     push_number(inter_sp, type);
452 
453     switch (type)
454     {
455     case PGRES_TUPLES_OK:
456       {
457         int nfields, ntuples, i, j;
458 
459         nfields = PQnfields(res);
460         ntuples = PQntuples(res);
461 
462         if (pgconn->queue->flags & PG_RESULT_MAP)
463         {
464             /* Return the result as mapping */
465 
466             mapping_t *map;
467 
468             if (max_mapping_size
469              && (nfields * (ntuples + 1)) > (p_int)max_mapping_size)
470             {
471                 PQclear(res);
472                 dequeue(pgconn);
473                 errorf("Query result exceeded mappingsize limit.\n");
474             }
475 
476             if (max_mapping_keys && nfields > (p_int)max_mapping_keys)
477             {
478                 PQclear(res);
479                 dequeue(pgconn);
480                 errorf("Query result exceeded mappingsize limit.\n");
481             }
482 
483             map = allocate_mapping(nfields, ntuples);
484             if (!map)
485             {
486                 push_number(inter_sp, 0);
487                 break;
488             }
489 
490             for (i = 0; i < nfields; i++)
491             {
492                 svalue_t * entry, fname;
493 
494                 put_c_string(&fname, PQfname(res, i));
495                 entry = get_map_lvalue(map, &fname);
496                 free_svalue(&fname);
497                 if (!entry)
498                     break;
499                 for (j = 0; j < ntuples; j++)
500                     put_c_string(&entry[j], PQgetvalue(res, j, i));
501             }
502 
503             push_mapping(inter_sp, map);
504         }
505         else
506         {
507             /* Return the result as array of arrays */
508 
509             vector_t * array;
510             svalue_t * entry;
511 
512             if (max_array_size
513              && (   (ntuples >= (p_int)max_array_size)
514                  || (nfields >= (p_int)max_array_size))
515                )
516             {
517                 PQclear(res);
518                 dequeue(pgconn);
519                 errorf("Query result exceeded array limit.\n");
520             }
521 
522             array = allocate_array(ntuples+1);
523             if (!array)
524             {
525                 push_number(inter_sp, 0);
526                 break;
527             }
528 
529             entry = &array->item[0];
530             put_array(entry, allocate_array(nfields));
531             for (j = 0; j < nfields; j++)
532                 put_c_string(&entry->u.vec->item[j], PQfname(res, j));
533 
534             for (i = 0; i < ntuples; i++)
535             {
536                 entry = &array->item[i+1];
537                 put_array(entry, allocate_array(nfields));
538                 for (j = 0; j < nfields; j++)
539                     put_c_string(&entry->u.vec->item[j], PQgetvalue(res, i, j));
540             }
541             push_array(inter_sp, array);
542         }
543         break;
544       }
545 
546     case PGRES_COMMAND_OK:
547         push_c_string(inter_sp, PQcmdStatus(res));
548         break;
549 
550      case PGRES_BAD_RESPONSE:
551         push_c_string(inter_sp, PQerrorMessage(pgconn->conn));
552         break;
553 
554      case PGRES_FATAL_ERROR:
555      case PGRES_NONFATAL_ERROR:
556         push_c_string(inter_sp, PQerrorMessage(pgconn->conn));
557         break;
558 
559      default:
560         PQclear(res);
561         return;
562     }
563 
564     if (callback_object(&pgconn->callback))
565     {
566         push_number(inter_sp, pgconn->queue->id);
567         (void)apply_callback(&pgconn->callback, 3);
568     }
569     else
570     {
571         debug_message("%s PG connection object destructed.\n", time_stamp());
572         pgreset(pgconn);
573     }
574 
575     dequeue(pgconn);
576     PQclear(res);
577 } /* pgresult() */
578 
579 
580 /*=========================================================================*/
581 
582 /*                   Database Connection Handling                          */
583 
584 /*-------------------------------------------------------------------------*/
585 static void
pg_process_connect_reset(dbconn_t * pgconn)586 pg_process_connect_reset (dbconn_t *pgconn)
587 
588 /* Reset the connection <pgconn>.
589  */
590 
591 {
592     int reset;
593     PostgresPollingStatusType (*pqpollhandler)(PGconn *);
594 
595     reset = (pgconn->state == PG_RESETTING);
596 
597     if (reset)
598         debug_message("%s PGSQL Connection resetting.\n", time_stamp());
599 
600     pqpollhandler = reset ? PQresetPoll : PQconnectPoll;
601 
602     if (pgconn->pgstate != PGRES_POLLING_ACTIVE)
603     {
604         int rc;
605 
606 #ifdef HAVE_POLL
607         struct pollfd ufd;
608 
609         ufd.fd = pgconn->fd;
610 #else
611         fd_set readfds, writefds;
612 
613         FD_ZERO(&readfds);
614         FD_ZERO(&writefds);
615 #endif /* HAVE_POLL */
616 
617         switch (pgconn->pgstate)
618         {
619         case PGRES_POLLING_READING:
620 #ifdef HAVE_POLL
621            ufd.events = POLLIN;
622 #else
623            FD_SET(pgconn->fd, &readfds);
624 #endif /* HAVE_POLL */
625            break;
626 
627         case PGRES_POLLING_WRITING:
628 #ifdef HAVE_POLL
629            ufd.events = POLLOUT;
630 #else
631            FD_SET(pgconn->fd, &writefds);
632 #endif /* HAVE_POLL */
633            break;
634 
635         default:
636            /* Shouldn't happen */
637            break;
638         }
639 
640 #ifdef HAVE_POLL
641         do {
642             rc = poll(&ufd, 1, 0);
643         } while (rc < 0 && errno == EINTR);
644 
645         if (rc > 0)
646         {
647            pgconn->pgstate = PGRES_POLLING_ACTIVE;
648            if (ufd.revents & POLLIN)
649                pgconn->lastreply = time(NULL);
650         }
651 #else
652         do {
653             struct timeval timeout;
654 
655             timeout.tv_sec = 0;
656             timeout.tv_usec = 0;
657             rc = select(pgconn->fd+1, &readfds, &writefds, NULL, &timeout);
658             if (rc >= 0 || errno != EINTR)
659                 break;
660         } while (rc < 0 && errno == EINTR);
661 
662         if (rc > 0)
663         {
664            pgconn->pgstate = PGRES_POLLING_ACTIVE;
665            if (FD_ISSET(pgconn->fd, &readfds))
666                pgconn->lastreply = time(NULL);
667         }
668 #endif /* HAVE_POLL */
669     }
670 
671     if (pgconn->pgstate == PGRES_POLLING_ACTIVE)
672         pgconn->pgstate = pqpollhandler(pgconn->conn);
673 
674     if (pgconn->pgstate == PGRES_POLLING_FAILED)
675     {
676         if (reset && (pgconn->resets < MAX_RESETS))
677         {
678             pgconn->resets++;
679             pgconn->state = PG_RESET_NEXT;
680             pgconn->lastreset = time(NULL);
681         }
682         else
683         {
684             if (callback_object(&pgconn->callback))
685             {
686                 push_number(inter_sp, reset ? PGCONN_ABORTED : PGCONN_FAILED);
687                 push_c_string(inter_sp, PQerrorMessage(pgconn->conn));
688                 (void)apply_callback(&pgconn->callback, 2);
689             }
690             else
691             {
692                 debug_message("%s PG connection object destructed.\n", time_stamp());
693             }
694             pgclose(pgconn);
695         }
696     }
697     else if (pgconn->pgstate == PGRES_POLLING_OK)
698     {
699         if (!reset)
700         {
701            /* The program should not notice a successful reset */
702             if (callback_object(&pgconn->callback))
703             {
704                 push_number(inter_sp, PGCONN_SUCCESS);
705                 push_ref_string(inter_sp, STR_SUCCESS);
706                 (void)apply_callback(&pgconn->callback, 2);
707             }
708             else
709             {
710                 debug_message("%s PG connection object destructed.\n", time_stamp());
711                 pgreset(pgconn);
712             }
713         }
714         pgconn->resets = 0;
715         if (pgconn->queue)
716             pgconn->state = PG_SENDQUERY;
717         else
718             pgconn->state = PG_IDLE;
719     }
720 } /* pg_process_connect_reset() */
721 
722 /*-------------------------------------------------------------------------*/
723 static void
pg_process_query(dbconn_t * pgconn)724 pg_process_query (dbconn_t *pgconn)
725 
726 /* Query the connection <pgconn> for data and act on it.
727  */
728 
729 {
730     int rc;
731 
732 #ifdef HAVE_POLL
733     struct pollfd ufd;
734 
735     ufd.fd = pgconn->fd;
736     ufd.events = POLLIN;
737 #else
738     struct fd_set readfds;
739 
740     FD_ZERO(&readfds);
741     FD_SET(pgconn->fd, &readfds);
742 
743 #endif /* HAVE_POLL */
744 
745     PQflush(pgconn->conn);
746 
747 #ifdef HAVE_POLL
748 
749     do {
750         rc = poll(&ufd, 1, 0);
751     } while (rc < 0 && errno == EINTR);
752 
753 #else
754 
755     do {
756         struct timeval timeout;
757 
758         timeout.tv_sec = 0;
759         timeout.tv_usec = 0;
760         rc = select(pgconn->fd+1, &readfds, NULL, NULL, &timeout);
761     } while (rc < 0 && errno == EINTR);
762 
763 #endif /* HAVE_POLL */
764 
765     if (rc > 0)
766     {
767         pgconn->lastreply = time(NULL);
768         PQconsumeInput(pgconn->conn);
769         if (!PQisBusy(pgconn->conn))
770             pgconn->state = PG_REPLYREADY;
771     }
772 } /* pg_process_query() */
773 
774 /*-------------------------------------------------------------------------*/
775 static void
pg_process_one(dbconn_t * pgconn)776 pg_process_one (dbconn_t *pgconn)
777 
778 /* Check the state of <pgconn> and take appropriate action.
779  */
780 
781 {
782     PGresult *res;
783 
784     switch (pgconn->state)
785     {
786     case PG_CONNECTING:
787     case PG_RESETTING:
788         pg_process_connect_reset(pgconn);
789         break;
790 
791     case PG_SENDQUERY:
792     case PG_IDLE:
793         if (pgconn->queue)
794         {
795             pgconn->lastreply = time(NULL);
796             PQsendQuery(pgconn->conn, pgconn->queue->str);
797             pgconn->state = PG_WAITREPLY;
798         }
799         break;
800 
801     case PG_WAITREPLY:
802         pg_process_query(pgconn);
803         break;
804 
805     case PG_UNCONNECTED:
806         dealloc_dbconn(pgconn);
807         break;
808 
809     case PG_RESET_NEXT:
810         if (pgconn->lastreset != time(NULL))
811             pgreset(pgconn);
812         break;
813     } /* switch() */
814 
815     /* Validate the connection */
816     if ((PQstatus(pgconn->conn) != CONNECTION_OK)
817      && (pgconn->state >= PG_IDLE)
818        )
819         pgreset(pgconn);
820 
821     /* If there is a result waiting, get it and forward
822      * it to the controlling object.
823      */
824     if (pgconn->state == PG_REPLYREADY)
825     {
826         do
827         {
828             res = PQgetResult(pgconn->conn);
829             if (!res)
830             {
831                 pgconn->state = PG_IDLE;
832                 pg_process_one(pgconn);
833                 break;
834             } else
835                 pgresult(pgconn, res);
836         }
837         while (!PQisBusy(pgconn->conn));
838     }
839 } /* pg_process_one() */
840 
841 /*-------------------------------------------------------------------------*/
842 void
pg_process_all(void)843 pg_process_all (void)
844 
845 /* Called from the get_message() loop in comm.c, this function checks
846  * all known database connections for their status and takes appropriate
847  * actions.
848  */
849 
850 {
851     dbconn_t *ptr = head;
852     Bool got_dead = MY_FALSE;
853 
854     while (ptr)
855     {
856         if (!callback_object(&ptr->callback)
857          && ptr->state != PG_UNCONNECTED
858            )
859         {
860             debug_message("%s PG connection object destructed.\n", time_stamp());
861             pgclose(ptr);
862             got_dead = MY_TRUE;
863         }
864         else
865            pg_process_one(ptr);
866         ptr = ptr->next;
867     }
868 
869     if (got_dead)
870         pg_purge_connections();
871 } /* pg_process_all() */
872 
873 
874 /*-------------------------------------------------------------------------*/
875 void
pg_purge_connections(void)876 pg_purge_connections (void)
877 
878 /* Check the list of database connections and purge all UNCONNECTED
879  * connections and those with destructed callback objects.
880  */
881 
882 {
883     dbconn_t *prev;
884 
885     while (head)
886     {
887         if (head->state != PG_UNCONNECTED
888          && !callback_object(&head->callback)
889            )
890         {
891             debug_message("%s PG connection object destructed.\n", time_stamp());
892             pgclose(head);
893         }
894         if (head->state == PG_UNCONNECTED)
895             dealloc_dbconn(head);
896         else
897             break;
898     }
899 
900     if (head)
901     {
902         prev = head;
903         while (prev->next)
904         {
905             if (prev->next->state != PG_UNCONNECTED
906              && !callback_object(&prev->next->callback)
907                )
908             {
909                 debug_message("%s PG connection object destructed.\n", time_stamp());
910                 pgclose(prev->next);
911             }
912             if (prev->next->state == PG_UNCONNECTED)
913                 dealloc_dbconn(prev->next);
914             else
915                 prev = prev->next;
916         }
917     }
918 } /* pg_purge_connections() */
919 
920 /*-------------------------------------------------------------------------*/
921 static
check_privilege(const char * efun_name,Bool raise_error,svalue_t * sp)922 Bool check_privilege (const char * efun_name, Bool raise_error, svalue_t * sp)
923 
924 /* Check if the user has the privileges to execute efun <efun_name>.
925  * The function executes a call to master->privilege_violation("mysql",
926  * efun_name) and evaluates the result.
927  * If the master result is TRUE, the function returns TRUE.
928  * If the master result is FALSE, the function returns FALSE if <raise_error>
929  * is FALSE, and raises an error if <raise_error> is true.
930  */
931 
932 {
933     Bool rc;
934 
935     inter_sp = sp+1;
936     put_c_string(inter_sp, efun_name);
937     rc = privilege_violation(STR_PGSQL, inter_sp, inter_sp);
938     free_svalue(inter_sp);
939     inter_sp--;
940 
941     if (rc)
942         return MY_TRUE;
943 
944     if (raise_error)
945     {
946         errorf("%s(): Privilege violation.\n", efun_name);
947         /* NOTREACHED */
948     }
949 
950     return MY_FALSE;
951 } /* check_privilege() */
952 
953 
954 
955 /*=========================================================================*/
956 
957 /*                           EFUNS                                         */
958 
959 /*-------------------------------------------------------------------------*/
960 svalue_t *
v_pg_connect(svalue_t * sp,int num_arg)961 v_pg_connect (svalue_t *sp, int num_arg)
962 
963 /* EFUN pg_connect()
964  *
965  *   int pg_connect (string conn, string fun)
966  *   int pg_connect (string conn, string fun, string|object obj, mixed extra, ...)
967  *   int pg_connect (string conn, closure cl, mixed extra, ...)
968  *
969  * Open a database connection as directed by <conn>, and assign the
970  * callback function <fun>/<cl> with the optional <extra> parameters to
971  * it.
972  *
973  * The object holding the callback function becomes the controlling object;
974  * obiously it is an error to assign more than one connection to the same
975  * controlling object.
976  *
977  * The <conn> string is in the format accepted by Postgres' PQconnectStart()
978  * API functions. Pass an empty string to use the default options, or
979  * a string holding the '<key>=<value>' options separated by whitespace.
980  * The most useful options are:
981  *   dbname:   The database name
982  *   user:     The user name to connect as.
983  *   password: Password to be used.
984  *
985  * Return 0 on success, and -1 on failure.
986  */
987 
988 {
989     dbconn_t   *db;
990     int         st;
991     int         error_index;
992     callback_t  cb;
993     object_t   *cb_object;
994     svalue_t   *arg = sp - num_arg + 1;
995 
996     check_privilege(instrs[F_PG_CONNECT].name, MY_TRUE, sp);
997 
998     /* Get the callback information */
999 
1000     error_index = setup_efun_callback(&cb, arg+1, num_arg-1);
1001 
1002     if (error_index >= 0)
1003     {
1004         vefun_bad_arg(error_index+2, arg);
1005         /* NOTREACHED */
1006         return arg;
1007     }
1008     inter_sp = sp = arg+1;
1009     put_callback(sp, &cb);
1010 
1011     cb_object = callback_object(&cb);
1012     if (!cb_object)
1013     {
1014         free_callback(&cb);
1015         errorf("pgconnect(): Callback object is destructed.\n");
1016         /* NOTREACHED */
1017         return arg;
1018     }
1019 
1020     /* Check the callback object if it has a connection already */
1021 
1022     db = find_current_connection(cb_object);
1023     if (db)
1024     {
1025         if (db->state == PG_UNCONNECTED)
1026             dealloc_dbconn(db);
1027         else
1028         {
1029             free_callback(&cb);
1030             errorf("pgconnect(): Already connected\n");
1031             /* NOTREACHED */
1032             return arg;
1033         }
1034     }
1035 
1036     /* Connect to the database */
1037 
1038     db = alloc_dbconn();
1039     db->callback = cb;
1040 
1041     st = pgconnect(db, get_txt(arg[0].u.str));
1042     if (st < 0)
1043         pgclose(db);
1044 
1045     free_svalue(arg); /* the callback entries are gone already */
1046     put_number(arg, st);
1047     return arg;
1048 } /* f_pg_connect() */
1049 
1050 /*-------------------------------------------------------------------------*/
1051 svalue_t *
f_pg_pending(svalue_t * sp)1052 f_pg_pending (svalue_t *sp)
1053 
1054 /* EFUN pg_pending()
1055  *
1056  *   int pg_pending ()
1057  *   int pg_pending (object obj)
1058  *
1059  * Return the number of pending queries for the connection on the given
1060  * object <obj> (default is the current object). The object has no
1061  * database connection, return -1.
1062  */
1063 
1064 {
1065     dbconn_t *db;
1066     int       count = -1;
1067 
1068     check_privilege(instrs[F_PG_PENDING].name, MY_TRUE, sp);
1069 
1070     db = find_current_connection(sp->u.ob);
1071     if (db)
1072     {
1073         query_queue_t * qu;
1074 
1075         for (count = 0, qu = db->queue
1076             ; qu != NULL
1077             ; count++, qu = qu->next
1078             ) NOOP;
1079     }
1080 
1081     free_svalue(sp);
1082     put_number(sp, count);
1083     return sp;
1084 } /* f_pg_pending() */
1085 
1086 /*-------------------------------------------------------------------------*/
1087 svalue_t *
v_pg_query(svalue_t * sp,int numarg)1088 v_pg_query (svalue_t *sp, int numarg)
1089 
1090 /* EFUN pg_query()
1091  *
1092  *  int pg_query (string query)
1093  *  int pg_query (string query, int flags)
1094  *
1095  * Queue a new query <query> to the database connection on the current
1096  * object. Return the unique id of the query. The query result itself
1097  * will be passed as argument to the callback function.
1098  *
1099  * <flags> can be one of these values:
1100  *   PG_RESULT_ARRAY: Pass the query result as array.
1101  *   PG_RESULT_MAP:   Pass the query result as mapping.
1102  */
1103 
1104 {
1105     dbconn_t *db;
1106     query_queue_t *q;
1107     int flags = PG_RESULT_ARRAY;
1108 
1109     check_privilege(instrs[F_PG_QUERY].name, MY_TRUE, sp);
1110 
1111     if (numarg == 2)
1112     {
1113         flags = sp->u.number;
1114         sp--;
1115     }
1116 
1117     db = find_current_connection(current_object);
1118     if (!db)
1119         errorf("pgquery(): not connected\n");
1120 
1121     q = queue(db, get_txt(sp->u.str));
1122     q->flags = flags;
1123     if (db->state == PG_IDLE)
1124         db->state = PG_SENDQUERY;
1125 
1126     free_svalue(sp);
1127     put_number(sp, q->id);
1128     return sp;
1129 } /* f_pg_query() */
1130 
1131 /*-------------------------------------------------------------------------*/
1132 svalue_t *
f_pg_close(svalue_t * sp)1133 f_pg_close (svalue_t *sp)
1134 
1135 /* EFUN pg_close()
1136  *
1137  *   void pg_close()
1138  *
1139  * Close the database connection for the current object, if there is one.
1140  */
1141 
1142 {
1143     dbconn_t *db;
1144 
1145     check_privilege(instrs[F_PG_CLOSE].name, MY_TRUE, sp);
1146 
1147     db = find_current_connection(current_object);
1148     if (db)
1149         pgclose(db);
1150 
1151     return sp;
1152 } /* f_pg_close() */
1153 
1154 /*-------------------------------------------------------------------------*/
1155 svalue_t *
f_pg_conv_string(svalue_t * sp)1156 f_pg_conv_string (svalue_t *sp)
1157 
1158 /* EFUN pg_escapeString
1159  *
1160  * string pg_conv_string(string input)
1161  *
1162  * Escape a string for use within an SQL command.
1163  */
1164 {
1165     string_t *escaped;
1166     int size = mstrsize(sp->u.str);
1167     memsafe(escaped = alloc_mstring(2 * size), 2 * size
1168                                              , "escaped sql string");
1169 
1170     // PQescapeString(char *to, char *from, size_t length);
1171     PQescapeString( (unsigned char *)get_txt(escaped)
1172                   , (unsigned char *)get_txt(sp->u.str), size);
1173     free_string_svalue(sp);
1174     put_string(sp, escaped);
1175     return sp;
1176 } /* pg_conv_string() */
1177 
1178 /*=========================================================================*/
1179 
1180 /*                          GC SUPPORT                                     */
1181 
1182 #ifdef GC_SUPPORT
1183 
1184 /*-------------------------------------------------------------------------*/
1185 void
pg_clear_refs(void)1186 pg_clear_refs (void)
1187 
1188 /* GC Support: Clear all references from the database connections
1189  */
1190 
1191 {
1192     dbconn_t *dbconn;
1193 
1194     for (dbconn = head; dbconn != NULL; dbconn = dbconn->next)
1195     {
1196         clear_ref_in_callback(&(dbconn->callback));
1197     }
1198 } /* pg_clear_refs() */
1199 
1200 /*-------------------------------------------------------------------------*/
1201 void
pg_count_refs(void)1202 pg_count_refs (void)
1203 
1204 /* GC Support: Count all references from the database connections
1205  */
1206 
1207 {
1208     dbconn_t *dbconn;
1209 
1210     for (dbconn = head; dbconn != NULL; dbconn = dbconn->next)
1211     {
1212         query_queue_t *qu;
1213 
1214         note_malloced_block_ref(dbconn);
1215         count_ref_in_callback(&(dbconn->callback));
1216 
1217         for (qu = dbconn->queue; qu != NULL; qu = qu->next)
1218         {
1219             note_malloced_block_ref(qu);
1220             note_malloced_block_ref(qu->str);
1221         }
1222     }
1223 } /* pg_count_refs() */
1224 
1225 #endif /* GC_SUPPORT */
1226 
1227 /*-------------------------------------------------------------------------*/
1228 
1229 #endif /* USE_PGSQL */
1230 
1231 /*************************************************************************/
1232