1 /* connection_int.c - code used by the connection object
2  *
3  * Copyright (C) 2003-2019 Federico Di Gregorio <fog@debian.org>
4  * Copyright (C) 2020-2021 The Psycopg Team
5  *
6  * This file is part of psycopg.
7  *
8  * psycopg2 is free software: you can redistribute it and/or modify it
9  * under the terms of the GNU Lesser General Public License as published
10  * by the Free Software Foundation, either version 3 of the License, or
11  * (at your option) any later version.
12  *
13  * In addition, as a special exception, the copyright holders give
14  * permission to link this program with the OpenSSL library (or with
15  * modified versions of OpenSSL that use the same license as OpenSSL),
16  * and distribute linked combinations including the two.
17  *
18  * You must obey the GNU Lesser General Public License in all respects for
19  * all of the code used other than OpenSSL.
20  *
21  * psycopg2 is distributed in the hope that it will be useful, but WITHOUT
22  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
24  * License for more details.
25  */
26 
27 #define PSYCOPG_MODULE
28 #include "psycopg/psycopg.h"
29 
30 #include "psycopg/connection.h"
31 #include "psycopg/cursor.h"
32 #include "psycopg/pqpath.h"
33 #include "psycopg/green.h"
34 #include "psycopg/notify.h"
35 
36 #include <stdlib.h>
37 #include <string.h>
38 
/* Isolation level names, as spelled in SQL.
 *
 * String indexes match the ISOLATION_LEVEL_* consts. Index 0 (autocommit)
 * has no associated name, hence the NULL entry.
 */
const char *srv_isolevels[] = {
    NULL, /* autocommit */
    "READ COMMITTED",
    "REPEATABLE READ",
    "SERIALIZABLE",
    "READ UNCOMMITTED",
    "default"       /* only to set GUC, not for BEGIN */
};
48 
/* Read only clauses, indexed by state: false (0), true (1), default (2).
 *
 * The default entry is the empty string. The other entries start with a
 * space -- presumably so they can be appended to a statement as they are;
 * verify against the users of this table.
 */
const char *srv_readonly[] = {
    " READ WRITE",
    " READ ONLY",
    ""      /* default */
};
55 
/* Deferrable clauses, indexed by state: false (0), true (1), default (2).
 *
 * The default entry is the empty string. The other entries start with a
 * space -- presumably so they can be appended to a statement as they are;
 * verify against the users of this table.
 */
const char *srv_deferrable[] = {
    " NOT DEFERRABLE",
    " DEFERRABLE",
    ""      /* default */
};
62 
/* GUC values for a tri-state setting, indexed by state:
 * off (0), on (1), default (2).
 */
const char *srv_state_guc[] = {
    "off",
    "on",
    "default"
};
70 
71 
/* Marker value (-1), distinct from the 0..2 indexes used by the state tables.
 * NOTE(review): presumably means "leave the current setting alone" --
 * confirm against the callers. */
const int SRV_STATE_UNCHANGED = -1;
73 
74 
75 /* Return a new "string" from a char* from the database.
76  *
77  * On Py2 just get a string, on Py3 decode it in the connection codec.
78  *
79  * Use a fallback if the connection is NULL.
80  */
81 PyObject *
conn_text_from_chars(connectionObject * self,const char * str)82 conn_text_from_chars(connectionObject *self, const char *str)
83 {
84     return psyco_text_from_chars_safe(str, -1, self ? self->pydecoder : NULL);
85 }
86 
87 
88 /* Encode an unicode object into a bytes object in the connection encoding.
89  *
90  * If no connection or encoding is available, default to utf8
91  */
92 PyObject *
conn_encode(connectionObject * self,PyObject * u)93 conn_encode(connectionObject *self, PyObject *u)
94 {
95     PyObject *t = NULL;
96     PyObject *rv = NULL;
97 
98     if (!(self && self->pyencoder)) {
99         rv = PyUnicode_AsUTF8String(u);
100         goto exit;
101     }
102 
103     if (!(t = PyObject_CallFunctionObjArgs(self->pyencoder, u, NULL))) {
104         goto exit;
105     }
106 
107     if (!(rv = PyTuple_GetItem(t, 0))) { goto exit; }
108     Py_INCREF(rv);
109 
110 exit:
111     Py_XDECREF(t);
112 
113     return rv;
114 }
115 
116 
117 /* decode a c string into a Python unicode in the connection encoding
118  *
119  * len can be < 0: in this case it will be calculated
120  *
121  * If no connection or encoding is available, default to utf8
122  */
123 PyObject *
conn_decode(connectionObject * self,const char * str,Py_ssize_t len)124 conn_decode(connectionObject *self, const char *str, Py_ssize_t len)
125 {
126     if (len < 0) { len = strlen(str); }
127 
128     if (self) {
129         if (self->cdecoder) {
130             return self->cdecoder(str, len, NULL);
131         }
132         else if (self->pydecoder) {
133             PyObject *b = NULL;
134             PyObject *t = NULL;
135             PyObject *rv = NULL;
136 
137             if (!(b = Bytes_FromStringAndSize(str, len))) { goto error; }
138             if (!(t = PyObject_CallFunctionObjArgs(self->pydecoder, b, NULL))) {
139                 goto error;
140             }
141             if (!(rv = PyTuple_GetItem(t, 0))) { goto error; }
142             Py_INCREF(rv);      /* PyTuple_GetItem gives a borrowes one */
143 error:
144             Py_XDECREF(t);
145             Py_XDECREF(b);
146             return rv;
147         }
148         else {
149             return PyUnicode_FromStringAndSize(str, len);
150         }
151     }
152     else {
153         return PyUnicode_FromStringAndSize(str, len);
154     }
155 }
156 
157 /* conn_notice_callback - process notices */
158 
159 static void
conn_notice_callback(void * args,const char * message)160 conn_notice_callback(void *args, const char *message)
161 {
162     struct connectionObject_notice *notice;
163     connectionObject *self = (connectionObject *)args;
164 
165     Dprintf("conn_notice_callback: %s", message);
166 
167     /* NOTE: if we get here and the connection is unlocked then there is a
168        problem but this should happen because the notice callback is only
169        called from libpq and when we're inside libpq the connection is usually
170        locked.
171     */
172     notice = (struct connectionObject_notice *)
173         malloc(sizeof(struct connectionObject_notice));
174     if (NULL == notice) {
175         /* Discard the notice in case of failed allocation. */
176         return;
177     }
178     notice->next = NULL;
179     notice->message = strdup(message);
180     if (NULL == notice->message) {
181         free(notice);
182         return;
183     }
184 
185     if (NULL == self->last_notice) {
186         self->notice_pending = self->last_notice = notice;
187     }
188     else {
189         self->last_notice->next = notice;
190         self->last_notice = notice;
191     }
192 }
193 
194 /* Expose the notices received as Python objects.
195  *
196  * The function should be called with the connection lock and the GIL.
197  */
198 void
conn_notice_process(connectionObject * self)199 conn_notice_process(connectionObject *self)
200 {
201     struct connectionObject_notice *notice;
202     PyObject *msg = NULL;
203     PyObject *tmp = NULL;
204     static PyObject *append;
205 
206     if (NULL == self->notice_pending) {
207         return;
208     }
209 
210     if (!append) {
211         if (!(append = Text_FromUTF8("append"))) {
212             goto error;
213         }
214     }
215 
216     notice = self->notice_pending;
217     while (notice != NULL) {
218         Dprintf("conn_notice_process: %s", notice->message);
219 
220         if (!(msg = conn_text_from_chars(self, notice->message))) { goto error; }
221 
222         if (!(tmp = PyObject_CallMethodObjArgs(
223                 self->notice_list, append, msg, NULL))) {
224 
225             goto error;
226         }
227 
228         Py_DECREF(tmp); tmp = NULL;
229         Py_DECREF(msg); msg = NULL;
230 
231         notice = notice->next;
232     }
233 
234     /* Remove the oldest item if the queue is getting too long. */
235     if (PyList_Check(self->notice_list)) {
236         Py_ssize_t nnotices;
237         nnotices = PyList_GET_SIZE(self->notice_list);
238         if (nnotices > CONN_NOTICES_LIMIT) {
239             if (-1 == PySequence_DelSlice(self->notice_list,
240                     0, nnotices - CONN_NOTICES_LIMIT)) {
241                 PyErr_Clear();
242             }
243         }
244     }
245 
246     conn_notice_clean(self);
247     return;
248 
249 error:
250     Py_XDECREF(tmp);
251     Py_XDECREF(msg);
252     conn_notice_clean(self);
253 
254     /* TODO: the caller doesn't expects errors from us */
255     PyErr_Clear();
256 }
257 
258 void
conn_notice_clean(connectionObject * self)259 conn_notice_clean(connectionObject *self)
260 {
261     struct connectionObject_notice *tmp, *notice;
262 
263     notice = self->notice_pending;
264 
265     while (notice != NULL) {
266         tmp = notice;
267         notice = notice->next;
268         free(tmp->message);
269         free(tmp);
270     }
271 
272     self->last_notice = self->notice_pending = NULL;
273 }
274 
275 
276 /* conn_notifies_process - make received notification available
277  *
278  * The function should be called with the connection lock and holding the GIL.
279  */
280 
281 void
conn_notifies_process(connectionObject * self)282 conn_notifies_process(connectionObject *self)
283 {
284     PGnotify *pgn = NULL;
285     PyObject *notify = NULL;
286     PyObject *pid = NULL, *channel = NULL, *payload = NULL;
287     PyObject *tmp = NULL;
288 
289     static PyObject *append;
290 
291     if (!append) {
292         if (!(append = Text_FromUTF8("append"))) {
293             goto error;
294         }
295     }
296 
297     while ((pgn = PQnotifies(self->pgconn)) != NULL) {
298 
299         Dprintf("conn_notifies_process: got NOTIFY from pid %d, msg = %s",
300                 (int) pgn->be_pid, pgn->relname);
301 
302         if (!(pid = PyInt_FromLong((long)pgn->be_pid))) { goto error; }
303         if (!(channel = conn_text_from_chars(self, pgn->relname))) { goto error; }
304         if (!(payload = conn_text_from_chars(self, pgn->extra))) { goto error; }
305 
306         if (!(notify = PyObject_CallFunctionObjArgs((PyObject *)&notifyType,
307                 pid, channel, payload, NULL))) {
308             goto error;
309         }
310 
311         Py_DECREF(pid); pid = NULL;
312         Py_DECREF(channel); channel = NULL;
313         Py_DECREF(payload); payload = NULL;
314 
315         if (!(tmp = PyObject_CallMethodObjArgs(
316                 self->notifies, append, notify, NULL))) {
317             goto error;
318         }
319         Py_DECREF(tmp); tmp = NULL;
320 
321         Py_DECREF(notify); notify = NULL;
322         PQfreemem(pgn); pgn = NULL;
323     }
324     return;  /* no error */
325 
326 error:
327     if (pgn) { PQfreemem(pgn); }
328     Py_XDECREF(tmp);
329     Py_XDECREF(notify);
330     Py_XDECREF(pid);
331     Py_XDECREF(channel);
332     Py_XDECREF(payload);
333 
334     /* TODO: callers currently don't expect an error from us */
335     PyErr_Clear();
336 
337 }
338 
339 
340 /*
341  * the conn_get_* family of functions makes it easier to obtain the connection
342  * parameters from query results or by interrogating the connection itself
343 */
344 
345 int
conn_get_standard_conforming_strings(PGconn * pgconn)346 conn_get_standard_conforming_strings(PGconn *pgconn)
347 {
348     int equote;
349     const char *scs;
350     /*
351      * The presence of the 'standard_conforming_strings' parameter
352      * means that the server _accepts_ the E'' quote.
353      *
354      * If the parameter is off, the PQescapeByteaConn returns
355      * backslash escaped strings (e.g. '\001' -> "\\001"),
356      * so the E'' quotes are required to avoid warnings
357      * if 'escape_string_warning' is set.
358      *
359      * If the parameter is on, the PQescapeByteaConn returns
360      * not escaped strings (e.g. '\001' -> "\001"), relying on the
361      * fact that the '\' will pass untouched the string parser.
362      * In this case the E'' quotes are NOT to be used.
363      */
364     scs = PQparameterStatus(pgconn, "standard_conforming_strings");
365     Dprintf("conn_connect: server standard_conforming_strings parameter: %s",
366         scs ? scs : "unavailable");
367 
368     equote = (scs && (0 == strcmp("off", scs)));
369     Dprintf("conn_connect: server requires E'' quotes: %s",
370             equote ? "YES" : "NO");
371 
372     return equote;
373 }
374 
375 
376 /* Remove irrelevant chars from encoding name and turn it uppercase.
377  *
378  * Return a buffer allocated on Python heap into 'clean' and return 0 on
379  * success, otherwise return -1 and set an exception.
380  */
381 RAISES_NEG static int
clear_encoding_name(const char * enc,char ** clean)382 clear_encoding_name(const char *enc, char **clean)
383 {
384     const char *i = enc;
385     char *j, *buf;
386     int rv = -1;
387 
388     /* convert to upper case and remove '-' and '_' from string */
389     if (!(j = buf = PyMem_Malloc(strlen(enc) + 1))) {
390         PyErr_NoMemory();
391         goto exit;
392     }
393 
394     while (*i) {
395         if (!isalnum(*i)) {
396             ++i;
397         }
398         else {
399             *j++ = toupper(*i++);
400         }
401     }
402     *j = '\0';
403 
404     Dprintf("clear_encoding_name: %s -> %s", enc, buf);
405     *clean = buf;
406     rv = 0;
407 
408 exit:
409     return rv;
410 }
411 
412 /* set fast access functions according to the currently selected encoding
413  */
414 static void
conn_set_fast_codec(connectionObject * self)415 conn_set_fast_codec(connectionObject *self)
416 {
417     Dprintf("conn_set_fast_codec: encoding=%s", self->encoding);
418 
419     if (0 == strcmp(self->encoding, "UTF8")) {
420         Dprintf("conn_set_fast_codec: PyUnicode_DecodeUTF8");
421         self->cdecoder = PyUnicode_DecodeUTF8;
422         return;
423     }
424 
425     if (0 == strcmp(self->encoding, "LATIN1")) {
426         Dprintf("conn_set_fast_codec: PyUnicode_DecodeLatin1");
427         self->cdecoder = PyUnicode_DecodeLatin1;
428         return;
429     }
430 
431     Dprintf("conn_set_fast_codec: no fast codec");
432     self->cdecoder = NULL;
433 }
434 
435 
436 /* Return the Python encoding from a PostgreSQL encoding.
437  *
438  * Optionally return the clean version of the postgres encoding too
439  */
440 PyObject *
conn_pgenc_to_pyenc(const char * encoding,char ** clean_encoding)441 conn_pgenc_to_pyenc(const char *encoding, char **clean_encoding)
442 {
443     char *pgenc = NULL;
444     PyObject *rv = NULL;
445 
446     if (0 > clear_encoding_name(encoding, &pgenc)) { goto exit; }
447     if (!(rv = PyDict_GetItemString(psycoEncodings, pgenc))) {
448         PyErr_Format(OperationalError,
449             "no Python encoding for PostgreSQL encoding '%s'", pgenc);
450         goto exit;
451     }
452     Py_INCREF(rv);
453 
454     if (clean_encoding) {
455         *clean_encoding = pgenc;
456     }
457     else {
458         PyMem_Free(pgenc);
459     }
460 
461 exit:
462     return rv;
463 }
464 
465 /* Convert a Postgres encoding into Python encoding and decoding functions.
466  *
467  * Set clean_encoding to a clean version of the Postgres encoding name
468  * and pyenc and pydec to python codec functions.
469  *
470  * Return 0 on success, else -1 and set an exception.
471  */
472 RAISES_NEG static int
conn_get_python_codec(const char * encoding,char ** clean_encoding,PyObject ** pyenc,PyObject ** pydec)473 conn_get_python_codec(const char *encoding,
474     char **clean_encoding, PyObject **pyenc, PyObject **pydec)
475 {
476     int rv = -1;
477     char *pgenc = NULL;
478     PyObject *encname = NULL;
479     PyObject *enc_tmp = NULL, *dec_tmp = NULL;
480 
481     /* get the Python name of the encoding as a C string */
482     if (!(encname = conn_pgenc_to_pyenc(encoding, &pgenc))) { goto exit; }
483     if (!(encname = psyco_ensure_bytes(encname))) { goto exit; }
484 
485     /* Look up the codec functions */
486     if (!(enc_tmp = PyCodec_Encoder(Bytes_AS_STRING(encname)))) { goto exit; }
487     if (!(dec_tmp = PyCodec_Decoder(Bytes_AS_STRING(encname)))) { goto exit; }
488 
489     /* success */
490     *pyenc = enc_tmp; enc_tmp = NULL;
491     *pydec = dec_tmp; dec_tmp = NULL;
492     *clean_encoding = pgenc; pgenc = NULL;
493     rv = 0;
494 
495 exit:
496     Py_XDECREF(enc_tmp);
497     Py_XDECREF(dec_tmp);
498     Py_XDECREF(encname);
499     PyMem_Free(pgenc);
500 
501     return rv;
502 }
503 
504 
505 /* Store the encoding in the pgconn->encoding field and set the other related
506  * encoding fields in the connection structure.
507  *
508  * Return 0 on success, else -1 and set an exception.
509  */
510 RAISES_NEG static int
conn_store_encoding(connectionObject * self,const char * encoding)511 conn_store_encoding(connectionObject *self, const char *encoding)
512 {
513     int rv = -1;
514     char *pgenc = NULL;
515     PyObject *enc_tmp = NULL, *dec_tmp = NULL;
516 
517     if (0 > conn_get_python_codec(encoding, &pgenc, &enc_tmp, &dec_tmp)) {
518         goto exit;
519     }
520 
521     /* Good, success: store the encoding/codec in the connection. */
522     {
523         char *tmp = self->encoding;
524         self->encoding = pgenc;
525         PyMem_Free(tmp);
526         pgenc = NULL;
527     }
528 
529     Py_CLEAR(self->pyencoder);
530     self->pyencoder = enc_tmp;
531     enc_tmp = NULL;
532 
533     Py_CLEAR(self->pydecoder);
534     self->pydecoder = dec_tmp;
535     dec_tmp = NULL;
536 
537     conn_set_fast_codec(self);
538 
539     rv = 0;
540 
541 exit:
542     Py_XDECREF(enc_tmp);
543     Py_XDECREF(dec_tmp);
544     PyMem_Free(pgenc);
545     return rv;
546 }
547 
548 
549 /* Read the client encoding from the backend and store it in the connection.
550  *
551  * Return 0 on success, else -1.
552  */
553 RAISES_NEG static int
conn_read_encoding(connectionObject * self,PGconn * pgconn)554 conn_read_encoding(connectionObject *self, PGconn *pgconn)
555 {
556     const char *encoding;
557     int rv = -1;
558 
559     encoding = PQparameterStatus(pgconn, "client_encoding");
560     Dprintf("conn_connect: client encoding: %s", encoding ? encoding : "(none)");
561     if (!encoding) {
562         PyErr_SetString(OperationalError,
563             "server didn't return client encoding");
564         goto exit;
565     }
566 
567     if (0 > conn_store_encoding(self, encoding)) {
568         goto exit;
569     }
570 
571     rv = 0;
572 
573 exit:
574     return rv;
575 }
576 
577 
578 int
conn_get_protocol_version(PGconn * pgconn)579 conn_get_protocol_version(PGconn *pgconn)
580 {
581     int ret;
582     ret = PQprotocolVersion(pgconn);
583     Dprintf("conn_connect: using protocol %d", ret);
584     return ret;
585 }
586 
587 int
conn_get_server_version(PGconn * pgconn)588 conn_get_server_version(PGconn *pgconn)
589 {
590     return (int)PQserverVersion(pgconn);
591 }
592 
593 /* set up the cancel key of the connection.
594  * On success return 0, else set an exception and return -1
595  */
596 RAISES_NEG static int
conn_setup_cancel(connectionObject * self,PGconn * pgconn)597 conn_setup_cancel(connectionObject *self, PGconn *pgconn)
598 {
599     if (self->cancel) {
600         PQfreeCancel(self->cancel);
601     }
602 
603     if (!(self->cancel = PQgetCancel(self->pgconn))) {
604         PyErr_SetString(OperationalError, "can't get cancellation key");
605         return -1;
606     }
607 
608     return 0;
609 }
610 
611 /* Return 1 if the "replication" keyword is set in the DSN, 0 otherwise */
612 static int
dsn_has_replication(char * pgdsn)613 dsn_has_replication(char *pgdsn)
614 {
615     int ret = 0;
616     PQconninfoOption *connopts, *ptr;
617 
618     connopts = PQconninfoParse(pgdsn, NULL);
619 
620     for(ptr = connopts; ptr->keyword != NULL; ptr++) {
621       if(strcmp(ptr->keyword, "replication") == 0 && ptr->val != NULL)
622         ret = 1;
623     }
624 
625     PQconninfoFree(connopts);
626 
627     return ret;
628 }
629 
630 
631 /* Return 1 if the server datestyle allows us to work without problems,
632    0 if it needs to be set to something better, e.g. ISO. */
633 static int
conn_is_datestyle_ok(PGconn * pgconn)634 conn_is_datestyle_ok(PGconn *pgconn)
635 {
636     const char *ds;
637 
638     ds = PQparameterStatus(pgconn, "DateStyle");
639     Dprintf("conn_connect: DateStyle %s", ds);
640 
641     /* pgbouncer does not pass on DateStyle */
642     if (ds == NULL)
643       return 0;
644 
645     /* Return true if ds starts with "ISO"
646      * e.g. "ISO, DMY" is fine, "German" not. */
647     return (ds[0] == 'I' && ds[1] == 'S' && ds[2] == 'O');
648 }
649 
650 
/* conn_setup - setup and read basic information about the connection
 *
 * Read from the server the quoting mode, the server version and the protocol
 * version, store the client encoding and the cancellation key, make sure the
 * datestyle is usable and reset the session state stored in the connection
 * (autocommit, isolevel, readonly, deferrable) to the defaults.
 *
 * Return 0 on success, -1 on failure. The failure paths visible here set a
 * Python exception, either directly or through the helpers called
 * (NOTE(review): pq_complete_error() is presumed to do so too -- confirm).
 */

RAISES_NEG int
conn_setup(connectionObject *self)
{
    int rv = -1;

    self->equote = conn_get_standard_conforming_strings(self->pgconn);
    self->server_version = conn_get_server_version(self->pgconn);
    self->protocol = conn_get_protocol_version(self->pgconn);
    if (3 != self->protocol) {
        PyErr_SetString(InterfaceError, "only protocol 3 supported");
        goto exit;
    }

    if (0 > conn_read_encoding(self, self->pgconn)) {
        goto exit;
    }

    if (0 > conn_setup_cancel(self, self->pgconn)) {
        goto exit;
    }

    /* Take the connection lock with the GIL released, then get the GIL
     * back. The macro opens a block that must be closed by the matching
     * Py_END_ALLOW_THREADS below, and provides the '_save' thread state
     * passed to pq_set_guc_locked(). */
    Py_BEGIN_ALLOW_THREADS;
    pthread_mutex_lock(&self->lock);
    Py_BLOCK_THREADS;

    /* The datestyle is only changed if the server one is not good for us,
     * and never when the dsn contains the "replication" keyword. */
    if (!dsn_has_replication(self->dsn) && !conn_is_datestyle_ok(self->pgconn)) {
        int res;
        /* the GUC is set with the lock held and the GIL released */
        Py_UNBLOCK_THREADS;
        res = pq_set_guc_locked(self, "datestyle", "ISO", &_save);
        Py_BLOCK_THREADS;
        if (res < 0) {
            pq_complete_error(self);
            goto unlock;
        }
    }

    /* for reset */
    self->autocommit = 0;
    self->isolevel = ISOLATION_LEVEL_DEFAULT;
    self->readonly = STATE_DEFAULT;
    self->deferrable = STATE_DEFAULT;

    /* success */
    rv = 0;

unlock:
    /* Both the success and the failure path get here holding the GIL and
     * the lock: release the lock with the GIL released, then close the
     * block opened by Py_BEGIN_ALLOW_THREADS. */
    Py_UNBLOCK_THREADS;
    pthread_mutex_unlock(&self->lock);
    Py_END_ALLOW_THREADS;

exit:
    return rv;
}
706 
707 /* conn_connect - execute a connection to the database */
708 
709 static int
_conn_sync_connect(connectionObject * self,const char * dsn)710 _conn_sync_connect(connectionObject *self, const char *dsn)
711 {
712     int green;
713 
714     /* store this value to prevent inconsistencies due to a change
715      * in the middle of the function. */
716     green = psyco_green();
717     if (!green) {
718         Py_BEGIN_ALLOW_THREADS;
719         self->pgconn = PQconnectdb(dsn);
720         Py_END_ALLOW_THREADS;
721         Dprintf("conn_connect: new PG connection at %p", self->pgconn);
722     }
723     else {
724         Py_BEGIN_ALLOW_THREADS;
725         self->pgconn = PQconnectStart(dsn);
726         Py_END_ALLOW_THREADS;
727         Dprintf("conn_connect: new green PG connection at %p", self->pgconn);
728     }
729 
730     if (!self->pgconn)
731     {
732         Dprintf("conn_connect: PQconnectdb(%s) FAILED", dsn);
733         PyErr_SetString(OperationalError, "PQconnectdb() failed");
734         return -1;
735     }
736     else if (PQstatus(self->pgconn) == CONNECTION_BAD)
737     {
738         Dprintf("conn_connect: PQconnectdb(%s) returned BAD", dsn);
739         PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
740         return -1;
741     }
742 
743     PQsetNoticeProcessor(self->pgconn, conn_notice_callback, (void*)self);
744 
745     /* if the connection is green, wait to finish connection */
746     if (green) {
747         if (0 > pq_set_non_blocking(self, 1)) {
748             return -1;
749         }
750         if (0 != psyco_wait(self)) {
751             return -1;
752         }
753     }
754 
755     /* From here the connection is considered ready: with the new status,
756      * poll() will use PQisBusy instead of PQconnectPoll.
757      */
758     self->status = CONN_STATUS_READY;
759 
760     if (conn_setup(self) == -1) {
761         return -1;
762     }
763 
764     return 0;
765 }
766 
767 static int
_conn_async_connect(connectionObject * self,const char * dsn)768 _conn_async_connect(connectionObject *self, const char *dsn)
769 {
770     PGconn *pgconn;
771 
772     self->pgconn = pgconn = PQconnectStart(dsn);
773 
774     Dprintf("conn_connect: new postgresql connection at %p", pgconn);
775 
776     if (pgconn == NULL)
777     {
778         Dprintf("conn_connect: PQconnectStart(%s) FAILED", dsn);
779         PyErr_SetString(OperationalError, "PQconnectStart() failed");
780         return -1;
781     }
782     else if (PQstatus(pgconn) == CONNECTION_BAD)
783     {
784         Dprintf("conn_connect: PQconnectdb(%s) returned BAD", dsn);
785         PyErr_SetString(OperationalError, PQerrorMessage(pgconn));
786         return -1;
787     }
788 
789     PQsetNoticeProcessor(pgconn, conn_notice_callback, (void*)self);
790 
791     /* Set the connection to nonblocking now. */
792     if (pq_set_non_blocking(self, 1) != 0) {
793         return -1;
794     }
795 
796     /* The connection will be completed banging on poll():
797      * First with _conn_poll_connecting() that will finish connection,
798      * then with _conn_poll_setup_async() that will do the same job
799      * of setup_async(). */
800 
801     return 0;
802 }
803 
804 int
conn_connect(connectionObject * self,const char * dsn,long int async)805 conn_connect(connectionObject *self, const char *dsn, long int async)
806 {
807     int rv;
808 
809     if (async == 1) {
810       Dprintf("con_connect: connecting in ASYNC mode");
811       rv = _conn_async_connect(self, dsn);
812     }
813     else {
814       Dprintf("con_connect: connecting in SYNC mode");
815       rv = _conn_sync_connect(self, dsn);
816     }
817 
818     if (rv != 0) {
819         /* connection failed, so let's close ourselves */
820         self->closed = 2;
821     }
822 
823     return rv;
824 }
825 
826 
827 /* poll during a connection attempt until the connection has established. */
828 
829 static int
_conn_poll_connecting(connectionObject * self)830 _conn_poll_connecting(connectionObject *self)
831 {
832     int res = PSYCO_POLL_ERROR;
833     const char *msg;
834 
835     Dprintf("conn_poll: poll connecting");
836     switch (PQconnectPoll(self->pgconn)) {
837     case PGRES_POLLING_OK:
838         res = PSYCO_POLL_OK;
839         break;
840     case PGRES_POLLING_READING:
841         res = PSYCO_POLL_READ;
842         break;
843     case PGRES_POLLING_WRITING:
844         res = PSYCO_POLL_WRITE;
845         break;
846     case PGRES_POLLING_FAILED:
847     case PGRES_POLLING_ACTIVE:
848         msg = PQerrorMessage(self->pgconn);
849         if (!(msg && *msg)) {
850             msg = "asynchronous connection failed";
851         }
852         PyErr_SetString(OperationalError, msg);
853         res = PSYCO_POLL_ERROR;
854         break;
855     }
856 
857     return res;
858 }
859 
860 
861 /* Advance to the next state after an attempt of flushing output */
862 
863 static int
_conn_poll_advance_write(connectionObject * self)864 _conn_poll_advance_write(connectionObject *self)
865 {
866     int res;
867     int flush;
868 
869     Dprintf("conn_poll: poll writing");
870 
871     flush = PQflush(self->pgconn);
872     Dprintf("conn_poll: PQflush() = %i", flush);
873 
874     switch (flush) {
875     case  0:  /* success */
876         /* we've finished pushing the query to the server. Let's start
877           reading the results. */
878         Dprintf("conn_poll: async_status -> ASYNC_READ");
879         self->async_status = ASYNC_READ;
880         res = PSYCO_POLL_READ;
881         break;
882     case  1:  /* would block */
883         res = PSYCO_POLL_WRITE;
884         break;
885     case -1:  /* error */
886         PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
887         res = PSYCO_POLL_ERROR;
888         break;
889     default:
890         Dprintf("conn_poll: unexpected result from flush: %d", flush);
891         res = PSYCO_POLL_ERROR;
892         break;
893     }
894     return res;
895 }
896 
897 
898 /* Advance to the next state after reading results */
899 
900 static int
_conn_poll_advance_read(connectionObject * self)901 _conn_poll_advance_read(connectionObject *self)
902 {
903     int res;
904     int busy;
905 
906     Dprintf("conn_poll: poll reading");
907 
908     busy = pq_get_result_async(self);
909 
910     switch (busy) {
911     case 0: /* result is ready */
912         Dprintf("conn_poll: async_status -> ASYNC_DONE");
913         self->async_status = ASYNC_DONE;
914         res = PSYCO_POLL_OK;
915         break;
916     case 1: /* result not ready: fd would block */
917         res = PSYCO_POLL_READ;
918         break;
919     case -1: /* ouch, error */
920         res = PSYCO_POLL_ERROR;
921         break;
922     default:
923         Dprintf("conn_poll: unexpected result from pq_get_result_async: %d",
924             busy);
925         res = PSYCO_POLL_ERROR;
926         break;
927     }
928     return res;
929 }
930 
931 
932 /* Poll the connection for the send query/retrieve result phase
933 
934   Advance the async_status (usually going WRITE -> READ -> DONE) but don't
935   mess with the connection status. */
936 
937 static int
_conn_poll_query(connectionObject * self)938 _conn_poll_query(connectionObject *self)
939 {
940     int res = PSYCO_POLL_ERROR;
941 
942     switch (self->async_status) {
943     case ASYNC_WRITE:
944         Dprintf("conn_poll: async_status = ASYNC_WRITE");
945         res = _conn_poll_advance_write(self);
946         break;
947 
948     case ASYNC_READ:
949         Dprintf("conn_poll: async_status = ASYNC_READ");
950         res = _conn_poll_advance_read(self);
951         break;
952 
953     case ASYNC_DONE:
954         Dprintf("conn_poll: async_status = ASYNC_DONE");
955         /* We haven't asked anything: just check for notifications. */
956         res = _conn_poll_advance_read(self);
957         break;
958 
959     default:
960         Dprintf("conn_poll: in unexpected async status: %d",
961                 self->async_status);
962         res = PSYCO_POLL_ERROR;
963         break;
964     }
965 
966     return res;
967 }
968 
969 /* Advance to the next state during an async connection setup
970  *
971  * If the connection is green, this is performed by the regular
972  * sync code so the queries are sent by conn_setup() while in
973  * CONN_STATUS_READY state.
974  */
975 static int
_conn_poll_setup_async(connectionObject * self)976 _conn_poll_setup_async(connectionObject *self)
977 {
978     int res = PSYCO_POLL_ERROR;
979 
980     switch (self->status) {
981     case CONN_STATUS_CONNECTING:
982         self->equote = conn_get_standard_conforming_strings(self->pgconn);
983         self->protocol = conn_get_protocol_version(self->pgconn);
984         self->server_version = conn_get_server_version(self->pgconn);
985         if (3 != self->protocol) {
986             PyErr_SetString(InterfaceError, "only protocol 3 supported");
987             break;
988         }
989         if (0 > conn_read_encoding(self, self->pgconn)) {
990             break;
991         }
992         if (0 > conn_setup_cancel(self, self->pgconn)) {
993             return -1;
994         }
995 
996         /* asynchronous connections always use isolation level 0, the user is
997          * expected to manage the transactions himself, by sending
998          * (asynchronously) BEGIN and COMMIT statements.
999          */
1000         self->autocommit = 1;
1001 
1002         /* If the datestyle is ISO or anything else good,
1003          * we can skip the CONN_STATUS_DATESTYLE step.
1004          * Note that we cannot change the datestyle on a replication
1005          * connection.
1006          */
1007         if (!dsn_has_replication(self->dsn) && !conn_is_datestyle_ok(self->pgconn)) {
1008             Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE");
1009             self->status = CONN_STATUS_DATESTYLE;
1010             if (0 == pq_send_query(self, psyco_datestyle)) {
1011                 PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
1012                 break;
1013             }
1014             Dprintf("conn_poll: async_status -> ASYNC_WRITE");
1015             self->async_status = ASYNC_WRITE;
1016             res = PSYCO_POLL_WRITE;
1017         }
1018         else {
1019             Dprintf("conn_poll: status -> CONN_STATUS_READY");
1020             self->status = CONN_STATUS_READY;
1021             res = PSYCO_POLL_OK;
1022         }
1023         break;
1024 
1025     case CONN_STATUS_DATESTYLE:
1026         res = _conn_poll_query(self);
1027         if (res == PSYCO_POLL_OK) {
1028             res = PSYCO_POLL_ERROR;
1029             if (self->pgres == NULL
1030                     || PQresultStatus(self->pgres) != PGRES_COMMAND_OK ) {
1031                 PyErr_SetString(OperationalError, "can't set datestyle to ISO");
1032                 break;
1033             }
1034             CLEARPGRES(self->pgres);
1035 
1036             Dprintf("conn_poll: status -> CONN_STATUS_READY");
1037             self->status = CONN_STATUS_READY;
1038             res = PSYCO_POLL_OK;
1039         }
1040         break;
1041     }
1042     return res;
1043 }
1044 
1045 
1046 static cursorObject *
_conn_get_async_cursor(connectionObject * self)1047 _conn_get_async_cursor(connectionObject *self) {
1048     PyObject *py_curs;
1049 
1050     if (!(py_curs = PyWeakref_GetObject(self->async_cursor))) {
1051         PyErr_SetString(PyExc_SystemError,
1052             "got null dereferencing cursor weakref");
1053         goto error;
1054     }
1055     if (Py_None == py_curs) {
1056         PyErr_SetString(InterfaceError,
1057             "the asynchronous cursor has disappeared");
1058         goto error;
1059     }
1060 
1061     Py_INCREF(py_curs);
1062     return (cursorObject *)py_curs;
1063 
1064 error:
1065     pq_clear_async(self);
1066     return NULL;
1067 }
1068 
1069 /* conn_poll - Main polling switch
1070  *
1071  * The function is called in all the states and connection types and invokes
1072  * the right "next step".
1073  */
1074 
1075 int
conn_poll(connectionObject * self)1076 conn_poll(connectionObject *self)
1077 {
1078     int res = PSYCO_POLL_ERROR;
1079     Dprintf("conn_poll: status = %d", self->status);
1080 
1081     switch (self->status) {
1082     case CONN_STATUS_SETUP:
1083         Dprintf("conn_poll: status -> CONN_STATUS_SETUP");
1084         self->status = CONN_STATUS_CONNECTING;
1085         res = PSYCO_POLL_WRITE;
1086         break;
1087 
1088     case CONN_STATUS_CONNECTING:
1089         Dprintf("conn_poll: status -> CONN_STATUS_CONNECTING");
1090         res = _conn_poll_connecting(self);
1091         if (res == PSYCO_POLL_OK && self->async) {
1092             res = _conn_poll_setup_async(self);
1093         }
1094         break;
1095 
1096     case CONN_STATUS_DATESTYLE:
1097         Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE");
1098         res = _conn_poll_setup_async(self);
1099         break;
1100 
1101     case CONN_STATUS_READY:
1102     case CONN_STATUS_BEGIN:
1103     case CONN_STATUS_PREPARED:
1104         Dprintf("conn_poll: status -> CONN_STATUS_*");
1105         res = _conn_poll_query(self);
1106 
1107         if (res == PSYCO_POLL_OK && self->async && self->async_cursor) {
1108             cursorObject *curs;
1109 
1110             /* An async query has just finished: parse the tuple in the
1111              * target cursor. */
1112             if (!(curs = _conn_get_async_cursor(self))) {
1113                 res = PSYCO_POLL_ERROR;
1114                 break;
1115             }
1116 
1117             curs_set_result(curs, self->pgres);
1118             self->pgres = NULL;
1119 
1120             /* fetch the tuples (if there are any) and build the result. We
1121              * don't care if pq_fetch return 0 or 1, but if there was an error,
1122              * we want to signal it to the caller. */
1123             if (pq_fetch(curs, 0) == -1) {
1124                res = PSYCO_POLL_ERROR;
1125             }
1126 
1127             /* We have finished with our async_cursor */
1128             Py_DECREF(curs);
1129             Py_CLEAR(self->async_cursor);
1130         }
1131         break;
1132 
1133     default:
1134         Dprintf("conn_poll: in unexpected state");
1135         res = PSYCO_POLL_ERROR;
1136     }
1137 
1138     Dprintf("conn_poll: returning %d", res);
1139     return res;
1140 }
1141 
1142 /* conn_close - do anything needed to shut down the connection */
1143 
1144 void
conn_close(connectionObject * self)1145 conn_close(connectionObject *self)
1146 {
1147     /* a connection with closed == 2 still requires cleanup */
1148     if (self->closed == 1) {
1149         return;
1150     }
1151 
1152     /* sets this connection as closed even for other threads; */
1153     Py_BEGIN_ALLOW_THREADS;
1154     pthread_mutex_lock(&self->lock);
1155 
1156     conn_close_locked(self);
1157 
1158     pthread_mutex_unlock(&self->lock);
1159     Py_END_ALLOW_THREADS;
1160 }
1161 
1162 
1163 /* Return a copy of the 'dsn' string with the password scrubbed.
1164  *
1165  * The string returned is allocated on the Python heap.
1166  *
1167  * In case of error return NULL and raise an exception.
1168  */
1169 char *
conn_obscure_password(const char * dsn)1170 conn_obscure_password(const char *dsn)
1171 {
1172     PQconninfoOption *options = NULL;
1173     PyObject *d = NULL, *v = NULL, *pydsn = NULL;
1174     char *rv = NULL;
1175 
1176     if (!dsn) {
1177         PyErr_SetString(InternalError, "unexpected null string");
1178         goto exit;
1179     }
1180 
1181     if (!(options = PQconninfoParse(dsn, NULL))) {
1182         /* unlikely: the dsn was already tested valid */
1183         PyErr_SetString(InternalError, "the connection string is not valid");
1184         goto exit;
1185     }
1186 
1187     if (!(d = psyco_dict_from_conninfo_options(
1188             options, /* include_password = */ 1))) {
1189         goto exit;
1190     }
1191     if (NULL == PyDict_GetItemString(d, "password")) {
1192         /* the dsn doesn't have a password */
1193         psyco_strdup(&rv, dsn, -1);
1194         goto exit;
1195     }
1196 
1197     /* scrub the password and put back the connection string together */
1198     if (!(v = Text_FromUTF8("xxx"))) { goto exit; }
1199     if (0 > PyDict_SetItemString(d, "password", v)) { goto exit; }
1200     if (!(pydsn = psyco_make_dsn(Py_None, d))) { goto exit; }
1201     if (!(pydsn = psyco_ensure_bytes(pydsn))) { goto exit; }
1202 
1203     /* Return the connection string with the password replaced */
1204     psyco_strdup(&rv, Bytes_AS_STRING(pydsn), -1);
1205 
1206 exit:
1207     PQconninfoFree(options);
1208     Py_XDECREF(v);
1209     Py_XDECREF(d);
1210     Py_XDECREF(pydsn);
1211 
1212     return rv;
1213 }
1214 
1215 
1216 /* conn_close_locked - shut down the connection with the lock already taken */
1217 
conn_close_locked(connectionObject * self)1218 void conn_close_locked(connectionObject *self)
1219 {
1220     if (self->closed == 1) {
1221         return;
1222     }
1223 
1224     /* We used to call pq_abort_locked here, but the idea of issuing a
1225      * rollback on close/GC has been considered inappropriate.
1226      *
1227      * Dropping the connection on the server has the same effect as the
1228      * transaction is automatically rolled back. Some middleware, such as
1229      * PgBouncer, have problem with connections closed in the middle of the
1230      * transaction though: to avoid these problems the transaction should be
1231      * closed only in status CONN_STATUS_READY.
1232      */
1233     self->closed = 1;
1234 
1235     /* we need to check the value of pgconn, because we get called even when
1236      * the connection fails! */
1237     if (self->pgconn) {
1238         PQfinish(self->pgconn);
1239         self->pgconn = NULL;
1240         Dprintf("conn_close: PQfinish called");
1241     }
1242 }
1243 
1244 /* conn_commit - commit on a connection */
1245 
1246 RAISES_NEG int
conn_commit(connectionObject * self)1247 conn_commit(connectionObject *self)
1248 {
1249     int res;
1250 
1251     res = pq_commit(self);
1252     return res;
1253 }
1254 
1255 /* conn_rollback - rollback a connection */
1256 
1257 RAISES_NEG int
conn_rollback(connectionObject * self)1258 conn_rollback(connectionObject *self)
1259 {
1260     int res;
1261 
1262     res = pq_abort(self);
1263     return res;
1264 }
1265 
1266 
1267 /* Change the state of the session */
1268 RAISES_NEG int
conn_set_session(connectionObject * self,int autocommit,int isolevel,int readonly,int deferrable)1269 conn_set_session(connectionObject *self, int autocommit,
1270         int isolevel, int readonly, int deferrable)
1271 {
1272     int rv = -1;
1273     int want_autocommit = autocommit == SRV_STATE_UNCHANGED ?
1274         self->autocommit : autocommit;
1275 
1276     if (deferrable != SRV_STATE_UNCHANGED && self->server_version < 90100) {
1277         PyErr_SetString(ProgrammingError,
1278             "the 'deferrable' setting is only available"
1279             " from PostgreSQL 9.1");
1280         goto exit;
1281     }
1282 
1283     /* Promote an isolation level to one of the levels supported by the server */
1284     if (self->server_version < 80000) {
1285         if (isolevel == ISOLATION_LEVEL_READ_UNCOMMITTED) {
1286             isolevel = ISOLATION_LEVEL_READ_COMMITTED;
1287         }
1288         else if (isolevel == ISOLATION_LEVEL_REPEATABLE_READ) {
1289             isolevel = ISOLATION_LEVEL_SERIALIZABLE;
1290         }
1291     }
1292 
1293     Py_BEGIN_ALLOW_THREADS;
1294     pthread_mutex_lock(&self->lock);
1295 
1296     if (want_autocommit) {
1297         /* we are or are going in autocommit state, so no BEGIN will be issued:
1298          * configure the session with the characteristics requested */
1299         if (isolevel != SRV_STATE_UNCHANGED) {
1300             if (0 > pq_set_guc_locked(self,
1301                     "default_transaction_isolation", srv_isolevels[isolevel],
1302                     &_save)) {
1303                 goto endlock;
1304             }
1305         }
1306         if (readonly != SRV_STATE_UNCHANGED) {
1307             if (0 > pq_set_guc_locked(self,
1308                     "default_transaction_read_only", srv_state_guc[readonly],
1309                     &_save)) {
1310                 goto endlock;
1311             }
1312         }
1313         if (deferrable != SRV_STATE_UNCHANGED) {
1314             if (0 > pq_set_guc_locked(self,
1315                     "default_transaction_deferrable", srv_state_guc[deferrable],
1316                     &_save)) {
1317                 goto endlock;
1318             }
1319         }
1320     }
1321     else if (self->autocommit) {
1322         /* we are moving from autocommit to not autocommit, so revert the
1323          * characteristics to defaults to let BEGIN do its work */
1324         if (self->isolevel != ISOLATION_LEVEL_DEFAULT) {
1325             if (0 > pq_set_guc_locked(self,
1326                     "default_transaction_isolation", "default",
1327                     &_save)) {
1328                 goto endlock;
1329             }
1330         }
1331         if (self->readonly != STATE_DEFAULT) {
1332             if (0 > pq_set_guc_locked(self,
1333                     "default_transaction_read_only", "default",
1334                     &_save)) {
1335                 goto endlock;
1336             }
1337         }
1338         if (self->server_version >= 90100 && self->deferrable != STATE_DEFAULT) {
1339             if (0 > pq_set_guc_locked(self,
1340                     "default_transaction_deferrable", "default",
1341                     &_save)) {
1342                 goto endlock;
1343             }
1344         }
1345     }
1346 
1347     if (autocommit != SRV_STATE_UNCHANGED) {
1348         self->autocommit = autocommit;
1349     }
1350     if (isolevel != SRV_STATE_UNCHANGED) {
1351         self->isolevel = isolevel;
1352     }
1353     if (readonly != SRV_STATE_UNCHANGED) {
1354         self->readonly = readonly;
1355     }
1356     if (deferrable != SRV_STATE_UNCHANGED) {
1357         self->deferrable = deferrable;
1358     }
1359     rv = 0;
1360 
1361 endlock:
1362     pthread_mutex_unlock(&self->lock);
1363     Py_END_ALLOW_THREADS;
1364 
1365     if (rv < 0) {
1366         pq_complete_error(self);
1367         goto exit;
1368     }
1369 
1370     Dprintf(
1371         "conn_set_session: autocommit %d, isolevel %d, readonly %d, deferrable %d",
1372         autocommit, isolevel, readonly, deferrable);
1373 
1374 
1375 exit:
1376     return rv;
1377 }
1378 
1379 
1380 /* conn_set_client_encoding - switch client encoding on connection */
1381 
1382 RAISES_NEG int
conn_set_client_encoding(connectionObject * self,const char * pgenc)1383 conn_set_client_encoding(connectionObject *self, const char *pgenc)
1384 {
1385     int res = -1;
1386     char *clean_enc = NULL;
1387 
1388     /* We must know what python encoding this encoding is. */
1389     if (0 > clear_encoding_name(pgenc, &clean_enc)) { goto exit; }
1390 
1391     /* If the current encoding is equal to the requested one we don't
1392        issue any query to the backend */
1393     if (strcmp(self->encoding, clean_enc) == 0) {
1394         res = 0;
1395         goto exit;
1396     }
1397 
1398     Py_BEGIN_ALLOW_THREADS;
1399     pthread_mutex_lock(&self->lock);
1400 
1401     /* abort the current transaction, to set the encoding ouside of
1402        transactions */
1403     if ((res = pq_abort_locked(self, &_save))) {
1404         goto endlock;
1405     }
1406 
1407     if ((res = pq_set_guc_locked(self, "client_encoding", clean_enc, &_save))) {
1408         goto endlock;
1409     }
1410 
1411 endlock:
1412     pthread_mutex_unlock(&self->lock);
1413     Py_END_ALLOW_THREADS;
1414 
1415     if (res < 0) {
1416         pq_complete_error(self);
1417         goto exit;
1418     }
1419 
1420     res = conn_store_encoding(self, pgenc);
1421 
1422     Dprintf("conn_set_client_encoding: encoding set to %s", self->encoding);
1423 
1424 exit:
1425     PyMem_Free(clean_enc);
1426 
1427     return res;
1428 }
1429 
1430 
1431 /* conn_tpc_begin -- begin a two-phase commit.
1432  *
1433  * The state of a connection in the middle of a TPC is exactly the same
1434  * of a normal transaction, in CONN_STATUS_BEGIN, but with the tpc_xid
1435  * member set to the xid used. This allows to reuse all the code paths used
1436  * in regular transactions, as PostgreSQL won't even know we are in a TPC
1437  * until PREPARE. */
1438 
1439 RAISES_NEG int
conn_tpc_begin(connectionObject * self,xidObject * xid)1440 conn_tpc_begin(connectionObject *self, xidObject *xid)
1441 {
1442     Dprintf("conn_tpc_begin: starting transaction");
1443 
1444     Py_BEGIN_ALLOW_THREADS;
1445     pthread_mutex_lock(&self->lock);
1446 
1447     if (pq_begin_locked(self, &_save) < 0) {
1448         pthread_mutex_unlock(&(self->lock));
1449         Py_BLOCK_THREADS;
1450         pq_complete_error(self);
1451         return -1;
1452     }
1453 
1454     pthread_mutex_unlock(&self->lock);
1455     Py_END_ALLOW_THREADS;
1456 
1457     /* The transaction started ok, let's store this xid. */
1458     Py_INCREF(xid);
1459     self->tpc_xid = xid;
1460 
1461     return 0;
1462 }
1463 
1464 
1465 /* conn_tpc_command -- run one of the TPC-related PostgreSQL commands.
1466  *
1467  * The function doesn't change the connection state as it can be used
1468  * for many commands and for recovered transactions. */
1469 
1470 RAISES_NEG int
conn_tpc_command(connectionObject * self,const char * cmd,xidObject * xid)1471 conn_tpc_command(connectionObject *self, const char *cmd, xidObject *xid)
1472 {
1473     PyObject *tid = NULL;
1474     const char *ctid;
1475     int rv = -1;
1476 
1477     Dprintf("conn_tpc_command: %s", cmd);
1478 
1479     /* convert the xid into PostgreSQL transaction id while keeping the GIL */
1480     if (!(tid = psyco_ensure_bytes(xid_get_tid(xid)))) { goto exit; }
1481     if (!(ctid = Bytes_AsString(tid))) { goto exit; }
1482 
1483     Py_BEGIN_ALLOW_THREADS;
1484     pthread_mutex_lock(&self->lock);
1485 
1486     if (0 > (rv = pq_tpc_command_locked(self, cmd, ctid, &_save))) {
1487         pthread_mutex_unlock(&self->lock);
1488         Py_BLOCK_THREADS;
1489         pq_complete_error(self);
1490         goto exit;
1491     }
1492 
1493     pthread_mutex_unlock(&self->lock);
1494     Py_END_ALLOW_THREADS;
1495 
1496 exit:
1497     Py_XDECREF(tid);
1498     return rv;
1499 }
1500 
1501 /* conn_tpc_recover -- return a list of pending TPC Xid */
1502 
1503 PyObject *
conn_tpc_recover(connectionObject * self)1504 conn_tpc_recover(connectionObject *self)
1505 {
1506     int status;
1507     PyObject *xids = NULL;
1508     PyObject *rv = NULL;
1509     PyObject *tmp;
1510 
1511     /* store the status to restore it. */
1512     status = self->status;
1513 
1514     if (!(xids = xid_recover((PyObject *)self))) { goto exit; }
1515 
1516     if (status == CONN_STATUS_READY && self->status == CONN_STATUS_BEGIN) {
1517         /* recover began a transaction: let's abort it. */
1518         if (!(tmp = PyObject_CallMethod((PyObject *)self, "rollback", NULL))) {
1519             goto exit;
1520         }
1521         Py_DECREF(tmp);
1522     }
1523 
1524     /* all fine */
1525     rv = xids;
1526     xids = NULL;
1527 
1528 exit:
1529     Py_XDECREF(xids);
1530 
1531     return rv;
1532 
1533 }
1534 
1535 
1536 void
conn_set_result(connectionObject * self,PGresult * pgres)1537 conn_set_result(connectionObject *self, PGresult *pgres)
1538 {
1539     PQclear(self->pgres);
1540     self->pgres = pgres;
1541 }
1542 
1543 
1544 void
conn_set_error(connectionObject * self,const char * msg)1545 conn_set_error(connectionObject *self, const char *msg)
1546 {
1547     if (self->error) {
1548         free(self->error);
1549         self->error = NULL;
1550     }
1551     if (msg && *msg) {
1552         self->error = strdup(msg);
1553     }
1554 }
1555