1 /* connection_int.c - code used by the connection object
2 *
3 * Copyright (C) 2003-2019 Federico Di Gregorio <fog@debian.org>
4 * Copyright (C) 2020-2021 The Psycopg Team
5 *
6 * This file is part of psycopg.
7 *
8 * psycopg2 is free software: you can redistribute it and/or modify it
9 * under the terms of the GNU Lesser General Public License as published
10 * by the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
12 *
13 * In addition, as a special exception, the copyright holders give
14 * permission to link this program with the OpenSSL library (or with
15 * modified versions of OpenSSL that use the same license as OpenSSL),
16 * and distribute linked combinations including the two.
17 *
18 * You must obey the GNU Lesser General Public License in all respects for
19 * all of the code used other than OpenSSL.
20 *
21 * psycopg2 is distributed in the hope that it will be useful, but WITHOUT
22 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 * License for more details.
25 */
26
27 #define PSYCOPG_MODULE
28 #include "psycopg/psycopg.h"
29
30 #include "psycopg/connection.h"
31 #include "psycopg/cursor.h"
32 #include "psycopg/pqpath.h"
33 #include "psycopg/green.h"
34 #include "psycopg/notify.h"
35
36 #include <stdlib.h>
37 #include <string.h>
38
39 /* String indexes match the ISOLATION_LEVEL_* consts */
40 const char *srv_isolevels[] = {
41 NULL, /* autocommit */
42 "READ COMMITTED",
43 "REPEATABLE READ",
44 "SERIALIZABLE",
45 "READ UNCOMMITTED",
46 "default" /* only to set GUC, not for BEGIN */
47 };
48
49 /* Read only false, true */
50 const char *srv_readonly[] = {
51 " READ WRITE",
52 " READ ONLY",
53 "" /* default */
54 };
55
56 /* Deferrable false, true */
57 const char *srv_deferrable[] = {
58 " NOT DEFERRABLE",
59 " DEFERRABLE",
60 "" /* default */
61 };
62
63 /* On/Off/Default GUC states
64 */
65 const char *srv_state_guc[] = {
66 "off",
67 "on",
68 "default"
69 };
70
71
72 const int SRV_STATE_UNCHANGED = -1;
73
74
75 /* Return a new "string" from a char* from the database.
76 *
77 * On Py2 just get a string, on Py3 decode it in the connection codec.
78 *
79 * Use a fallback if the connection is NULL.
80 */
81 PyObject *
conn_text_from_chars(connectionObject * self,const char * str)82 conn_text_from_chars(connectionObject *self, const char *str)
83 {
84 return psyco_text_from_chars_safe(str, -1, self ? self->pydecoder : NULL);
85 }
86
87
88 /* Encode an unicode object into a bytes object in the connection encoding.
89 *
90 * If no connection or encoding is available, default to utf8
91 */
92 PyObject *
conn_encode(connectionObject * self,PyObject * u)93 conn_encode(connectionObject *self, PyObject *u)
94 {
95 PyObject *t = NULL;
96 PyObject *rv = NULL;
97
98 if (!(self && self->pyencoder)) {
99 rv = PyUnicode_AsUTF8String(u);
100 goto exit;
101 }
102
103 if (!(t = PyObject_CallFunctionObjArgs(self->pyencoder, u, NULL))) {
104 goto exit;
105 }
106
107 if (!(rv = PyTuple_GetItem(t, 0))) { goto exit; }
108 Py_INCREF(rv);
109
110 exit:
111 Py_XDECREF(t);
112
113 return rv;
114 }
115
116
117 /* decode a c string into a Python unicode in the connection encoding
118 *
119 * len can be < 0: in this case it will be calculated
120 *
121 * If no connection or encoding is available, default to utf8
122 */
123 PyObject *
conn_decode(connectionObject * self,const char * str,Py_ssize_t len)124 conn_decode(connectionObject *self, const char *str, Py_ssize_t len)
125 {
126 if (len < 0) { len = strlen(str); }
127
128 if (self) {
129 if (self->cdecoder) {
130 return self->cdecoder(str, len, NULL);
131 }
132 else if (self->pydecoder) {
133 PyObject *b = NULL;
134 PyObject *t = NULL;
135 PyObject *rv = NULL;
136
137 if (!(b = Bytes_FromStringAndSize(str, len))) { goto error; }
138 if (!(t = PyObject_CallFunctionObjArgs(self->pydecoder, b, NULL))) {
139 goto error;
140 }
141 if (!(rv = PyTuple_GetItem(t, 0))) { goto error; }
142 Py_INCREF(rv); /* PyTuple_GetItem gives a borrowes one */
143 error:
144 Py_XDECREF(t);
145 Py_XDECREF(b);
146 return rv;
147 }
148 else {
149 return PyUnicode_FromStringAndSize(str, len);
150 }
151 }
152 else {
153 return PyUnicode_FromStringAndSize(str, len);
154 }
155 }
156
157 /* conn_notice_callback - process notices */
158
159 static void
conn_notice_callback(void * args,const char * message)160 conn_notice_callback(void *args, const char *message)
161 {
162 struct connectionObject_notice *notice;
163 connectionObject *self = (connectionObject *)args;
164
165 Dprintf("conn_notice_callback: %s", message);
166
167 /* NOTE: if we get here and the connection is unlocked then there is a
168 problem but this should happen because the notice callback is only
169 called from libpq and when we're inside libpq the connection is usually
170 locked.
171 */
172 notice = (struct connectionObject_notice *)
173 malloc(sizeof(struct connectionObject_notice));
174 if (NULL == notice) {
175 /* Discard the notice in case of failed allocation. */
176 return;
177 }
178 notice->next = NULL;
179 notice->message = strdup(message);
180 if (NULL == notice->message) {
181 free(notice);
182 return;
183 }
184
185 if (NULL == self->last_notice) {
186 self->notice_pending = self->last_notice = notice;
187 }
188 else {
189 self->last_notice->next = notice;
190 self->last_notice = notice;
191 }
192 }
193
194 /* Expose the notices received as Python objects.
195 *
196 * The function should be called with the connection lock and the GIL.
197 */
198 void
conn_notice_process(connectionObject * self)199 conn_notice_process(connectionObject *self)
200 {
201 struct connectionObject_notice *notice;
202 PyObject *msg = NULL;
203 PyObject *tmp = NULL;
204 static PyObject *append;
205
206 if (NULL == self->notice_pending) {
207 return;
208 }
209
210 if (!append) {
211 if (!(append = Text_FromUTF8("append"))) {
212 goto error;
213 }
214 }
215
216 notice = self->notice_pending;
217 while (notice != NULL) {
218 Dprintf("conn_notice_process: %s", notice->message);
219
220 if (!(msg = conn_text_from_chars(self, notice->message))) { goto error; }
221
222 if (!(tmp = PyObject_CallMethodObjArgs(
223 self->notice_list, append, msg, NULL))) {
224
225 goto error;
226 }
227
228 Py_DECREF(tmp); tmp = NULL;
229 Py_DECREF(msg); msg = NULL;
230
231 notice = notice->next;
232 }
233
234 /* Remove the oldest item if the queue is getting too long. */
235 if (PyList_Check(self->notice_list)) {
236 Py_ssize_t nnotices;
237 nnotices = PyList_GET_SIZE(self->notice_list);
238 if (nnotices > CONN_NOTICES_LIMIT) {
239 if (-1 == PySequence_DelSlice(self->notice_list,
240 0, nnotices - CONN_NOTICES_LIMIT)) {
241 PyErr_Clear();
242 }
243 }
244 }
245
246 conn_notice_clean(self);
247 return;
248
249 error:
250 Py_XDECREF(tmp);
251 Py_XDECREF(msg);
252 conn_notice_clean(self);
253
254 /* TODO: the caller doesn't expects errors from us */
255 PyErr_Clear();
256 }
257
258 void
conn_notice_clean(connectionObject * self)259 conn_notice_clean(connectionObject *self)
260 {
261 struct connectionObject_notice *tmp, *notice;
262
263 notice = self->notice_pending;
264
265 while (notice != NULL) {
266 tmp = notice;
267 notice = notice->next;
268 free(tmp->message);
269 free(tmp);
270 }
271
272 self->last_notice = self->notice_pending = NULL;
273 }
274
275
276 /* conn_notifies_process - make received notification available
277 *
278 * The function should be called with the connection lock and holding the GIL.
279 */
280
281 void
conn_notifies_process(connectionObject * self)282 conn_notifies_process(connectionObject *self)
283 {
284 PGnotify *pgn = NULL;
285 PyObject *notify = NULL;
286 PyObject *pid = NULL, *channel = NULL, *payload = NULL;
287 PyObject *tmp = NULL;
288
289 static PyObject *append;
290
291 if (!append) {
292 if (!(append = Text_FromUTF8("append"))) {
293 goto error;
294 }
295 }
296
297 while ((pgn = PQnotifies(self->pgconn)) != NULL) {
298
299 Dprintf("conn_notifies_process: got NOTIFY from pid %d, msg = %s",
300 (int) pgn->be_pid, pgn->relname);
301
302 if (!(pid = PyInt_FromLong((long)pgn->be_pid))) { goto error; }
303 if (!(channel = conn_text_from_chars(self, pgn->relname))) { goto error; }
304 if (!(payload = conn_text_from_chars(self, pgn->extra))) { goto error; }
305
306 if (!(notify = PyObject_CallFunctionObjArgs((PyObject *)¬ifyType,
307 pid, channel, payload, NULL))) {
308 goto error;
309 }
310
311 Py_DECREF(pid); pid = NULL;
312 Py_DECREF(channel); channel = NULL;
313 Py_DECREF(payload); payload = NULL;
314
315 if (!(tmp = PyObject_CallMethodObjArgs(
316 self->notifies, append, notify, NULL))) {
317 goto error;
318 }
319 Py_DECREF(tmp); tmp = NULL;
320
321 Py_DECREF(notify); notify = NULL;
322 PQfreemem(pgn); pgn = NULL;
323 }
324 return; /* no error */
325
326 error:
327 if (pgn) { PQfreemem(pgn); }
328 Py_XDECREF(tmp);
329 Py_XDECREF(notify);
330 Py_XDECREF(pid);
331 Py_XDECREF(channel);
332 Py_XDECREF(payload);
333
334 /* TODO: callers currently don't expect an error from us */
335 PyErr_Clear();
336
337 }
338
339
340 /*
341 * the conn_get_* family of functions makes it easier to obtain the connection
342 * parameters from query results or by interrogating the connection itself
343 */
344
345 int
conn_get_standard_conforming_strings(PGconn * pgconn)346 conn_get_standard_conforming_strings(PGconn *pgconn)
347 {
348 int equote;
349 const char *scs;
350 /*
351 * The presence of the 'standard_conforming_strings' parameter
352 * means that the server _accepts_ the E'' quote.
353 *
354 * If the parameter is off, the PQescapeByteaConn returns
355 * backslash escaped strings (e.g. '\001' -> "\\001"),
356 * so the E'' quotes are required to avoid warnings
357 * if 'escape_string_warning' is set.
358 *
359 * If the parameter is on, the PQescapeByteaConn returns
360 * not escaped strings (e.g. '\001' -> "\001"), relying on the
361 * fact that the '\' will pass untouched the string parser.
362 * In this case the E'' quotes are NOT to be used.
363 */
364 scs = PQparameterStatus(pgconn, "standard_conforming_strings");
365 Dprintf("conn_connect: server standard_conforming_strings parameter: %s",
366 scs ? scs : "unavailable");
367
368 equote = (scs && (0 == strcmp("off", scs)));
369 Dprintf("conn_connect: server requires E'' quotes: %s",
370 equote ? "YES" : "NO");
371
372 return equote;
373 }
374
375
376 /* Remove irrelevant chars from encoding name and turn it uppercase.
377 *
378 * Return a buffer allocated on Python heap into 'clean' and return 0 on
379 * success, otherwise return -1 and set an exception.
380 */
381 RAISES_NEG static int
clear_encoding_name(const char * enc,char ** clean)382 clear_encoding_name(const char *enc, char **clean)
383 {
384 const char *i = enc;
385 char *j, *buf;
386 int rv = -1;
387
388 /* convert to upper case and remove '-' and '_' from string */
389 if (!(j = buf = PyMem_Malloc(strlen(enc) + 1))) {
390 PyErr_NoMemory();
391 goto exit;
392 }
393
394 while (*i) {
395 if (!isalnum(*i)) {
396 ++i;
397 }
398 else {
399 *j++ = toupper(*i++);
400 }
401 }
402 *j = '\0';
403
404 Dprintf("clear_encoding_name: %s -> %s", enc, buf);
405 *clean = buf;
406 rv = 0;
407
408 exit:
409 return rv;
410 }
411
412 /* set fast access functions according to the currently selected encoding
413 */
414 static void
conn_set_fast_codec(connectionObject * self)415 conn_set_fast_codec(connectionObject *self)
416 {
417 Dprintf("conn_set_fast_codec: encoding=%s", self->encoding);
418
419 if (0 == strcmp(self->encoding, "UTF8")) {
420 Dprintf("conn_set_fast_codec: PyUnicode_DecodeUTF8");
421 self->cdecoder = PyUnicode_DecodeUTF8;
422 return;
423 }
424
425 if (0 == strcmp(self->encoding, "LATIN1")) {
426 Dprintf("conn_set_fast_codec: PyUnicode_DecodeLatin1");
427 self->cdecoder = PyUnicode_DecodeLatin1;
428 return;
429 }
430
431 Dprintf("conn_set_fast_codec: no fast codec");
432 self->cdecoder = NULL;
433 }
434
435
436 /* Return the Python encoding from a PostgreSQL encoding.
437 *
438 * Optionally return the clean version of the postgres encoding too
439 */
440 PyObject *
conn_pgenc_to_pyenc(const char * encoding,char ** clean_encoding)441 conn_pgenc_to_pyenc(const char *encoding, char **clean_encoding)
442 {
443 char *pgenc = NULL;
444 PyObject *rv = NULL;
445
446 if (0 > clear_encoding_name(encoding, &pgenc)) { goto exit; }
447 if (!(rv = PyDict_GetItemString(psycoEncodings, pgenc))) {
448 PyErr_Format(OperationalError,
449 "no Python encoding for PostgreSQL encoding '%s'", pgenc);
450 goto exit;
451 }
452 Py_INCREF(rv);
453
454 if (clean_encoding) {
455 *clean_encoding = pgenc;
456 }
457 else {
458 PyMem_Free(pgenc);
459 }
460
461 exit:
462 return rv;
463 }
464
465 /* Convert a Postgres encoding into Python encoding and decoding functions.
466 *
467 * Set clean_encoding to a clean version of the Postgres encoding name
468 * and pyenc and pydec to python codec functions.
469 *
470 * Return 0 on success, else -1 and set an exception.
471 */
472 RAISES_NEG static int
conn_get_python_codec(const char * encoding,char ** clean_encoding,PyObject ** pyenc,PyObject ** pydec)473 conn_get_python_codec(const char *encoding,
474 char **clean_encoding, PyObject **pyenc, PyObject **pydec)
475 {
476 int rv = -1;
477 char *pgenc = NULL;
478 PyObject *encname = NULL;
479 PyObject *enc_tmp = NULL, *dec_tmp = NULL;
480
481 /* get the Python name of the encoding as a C string */
482 if (!(encname = conn_pgenc_to_pyenc(encoding, &pgenc))) { goto exit; }
483 if (!(encname = psyco_ensure_bytes(encname))) { goto exit; }
484
485 /* Look up the codec functions */
486 if (!(enc_tmp = PyCodec_Encoder(Bytes_AS_STRING(encname)))) { goto exit; }
487 if (!(dec_tmp = PyCodec_Decoder(Bytes_AS_STRING(encname)))) { goto exit; }
488
489 /* success */
490 *pyenc = enc_tmp; enc_tmp = NULL;
491 *pydec = dec_tmp; dec_tmp = NULL;
492 *clean_encoding = pgenc; pgenc = NULL;
493 rv = 0;
494
495 exit:
496 Py_XDECREF(enc_tmp);
497 Py_XDECREF(dec_tmp);
498 Py_XDECREF(encname);
499 PyMem_Free(pgenc);
500
501 return rv;
502 }
503
504
505 /* Store the encoding in the pgconn->encoding field and set the other related
506 * encoding fields in the connection structure.
507 *
508 * Return 0 on success, else -1 and set an exception.
509 */
510 RAISES_NEG static int
conn_store_encoding(connectionObject * self,const char * encoding)511 conn_store_encoding(connectionObject *self, const char *encoding)
512 {
513 int rv = -1;
514 char *pgenc = NULL;
515 PyObject *enc_tmp = NULL, *dec_tmp = NULL;
516
517 if (0 > conn_get_python_codec(encoding, &pgenc, &enc_tmp, &dec_tmp)) {
518 goto exit;
519 }
520
521 /* Good, success: store the encoding/codec in the connection. */
522 {
523 char *tmp = self->encoding;
524 self->encoding = pgenc;
525 PyMem_Free(tmp);
526 pgenc = NULL;
527 }
528
529 Py_CLEAR(self->pyencoder);
530 self->pyencoder = enc_tmp;
531 enc_tmp = NULL;
532
533 Py_CLEAR(self->pydecoder);
534 self->pydecoder = dec_tmp;
535 dec_tmp = NULL;
536
537 conn_set_fast_codec(self);
538
539 rv = 0;
540
541 exit:
542 Py_XDECREF(enc_tmp);
543 Py_XDECREF(dec_tmp);
544 PyMem_Free(pgenc);
545 return rv;
546 }
547
548
549 /* Read the client encoding from the backend and store it in the connection.
550 *
551 * Return 0 on success, else -1.
552 */
553 RAISES_NEG static int
conn_read_encoding(connectionObject * self,PGconn * pgconn)554 conn_read_encoding(connectionObject *self, PGconn *pgconn)
555 {
556 const char *encoding;
557 int rv = -1;
558
559 encoding = PQparameterStatus(pgconn, "client_encoding");
560 Dprintf("conn_connect: client encoding: %s", encoding ? encoding : "(none)");
561 if (!encoding) {
562 PyErr_SetString(OperationalError,
563 "server didn't return client encoding");
564 goto exit;
565 }
566
567 if (0 > conn_store_encoding(self, encoding)) {
568 goto exit;
569 }
570
571 rv = 0;
572
573 exit:
574 return rv;
575 }
576
577
578 int
conn_get_protocol_version(PGconn * pgconn)579 conn_get_protocol_version(PGconn *pgconn)
580 {
581 int ret;
582 ret = PQprotocolVersion(pgconn);
583 Dprintf("conn_connect: using protocol %d", ret);
584 return ret;
585 }
586
587 int
conn_get_server_version(PGconn * pgconn)588 conn_get_server_version(PGconn *pgconn)
589 {
590 return (int)PQserverVersion(pgconn);
591 }
592
593 /* set up the cancel key of the connection.
594 * On success return 0, else set an exception and return -1
595 */
596 RAISES_NEG static int
conn_setup_cancel(connectionObject * self,PGconn * pgconn)597 conn_setup_cancel(connectionObject *self, PGconn *pgconn)
598 {
599 if (self->cancel) {
600 PQfreeCancel(self->cancel);
601 }
602
603 if (!(self->cancel = PQgetCancel(self->pgconn))) {
604 PyErr_SetString(OperationalError, "can't get cancellation key");
605 return -1;
606 }
607
608 return 0;
609 }
610
611 /* Return 1 if the "replication" keyword is set in the DSN, 0 otherwise */
612 static int
dsn_has_replication(char * pgdsn)613 dsn_has_replication(char *pgdsn)
614 {
615 int ret = 0;
616 PQconninfoOption *connopts, *ptr;
617
618 connopts = PQconninfoParse(pgdsn, NULL);
619
620 for(ptr = connopts; ptr->keyword != NULL; ptr++) {
621 if(strcmp(ptr->keyword, "replication") == 0 && ptr->val != NULL)
622 ret = 1;
623 }
624
625 PQconninfoFree(connopts);
626
627 return ret;
628 }
629
630
631 /* Return 1 if the server datestyle allows us to work without problems,
632 0 if it needs to be set to something better, e.g. ISO. */
633 static int
conn_is_datestyle_ok(PGconn * pgconn)634 conn_is_datestyle_ok(PGconn *pgconn)
635 {
636 const char *ds;
637
638 ds = PQparameterStatus(pgconn, "DateStyle");
639 Dprintf("conn_connect: DateStyle %s", ds);
640
641 /* pgbouncer does not pass on DateStyle */
642 if (ds == NULL)
643 return 0;
644
645 /* Return true if ds starts with "ISO"
646 * e.g. "ISO, DMY" is fine, "German" not. */
647 return (ds[0] == 'I' && ds[1] == 'S' && ds[2] == 'O');
648 }
649
650
651 /* conn_setup - setup and read basic information about the connection */
652
653 RAISES_NEG int
conn_setup(connectionObject * self)654 conn_setup(connectionObject *self)
655 {
656 int rv = -1;
657
658 self->equote = conn_get_standard_conforming_strings(self->pgconn);
659 self->server_version = conn_get_server_version(self->pgconn);
660 self->protocol = conn_get_protocol_version(self->pgconn);
661 if (3 != self->protocol) {
662 PyErr_SetString(InterfaceError, "only protocol 3 supported");
663 goto exit;
664 }
665
666 if (0 > conn_read_encoding(self, self->pgconn)) {
667 goto exit;
668 }
669
670 if (0 > conn_setup_cancel(self, self->pgconn)) {
671 goto exit;
672 }
673
674 Py_BEGIN_ALLOW_THREADS;
675 pthread_mutex_lock(&self->lock);
676 Py_BLOCK_THREADS;
677
678 if (!dsn_has_replication(self->dsn) && !conn_is_datestyle_ok(self->pgconn)) {
679 int res;
680 Py_UNBLOCK_THREADS;
681 res = pq_set_guc_locked(self, "datestyle", "ISO", &_save);
682 Py_BLOCK_THREADS;
683 if (res < 0) {
684 pq_complete_error(self);
685 goto unlock;
686 }
687 }
688
689 /* for reset */
690 self->autocommit = 0;
691 self->isolevel = ISOLATION_LEVEL_DEFAULT;
692 self->readonly = STATE_DEFAULT;
693 self->deferrable = STATE_DEFAULT;
694
695 /* success */
696 rv = 0;
697
698 unlock:
699 Py_UNBLOCK_THREADS;
700 pthread_mutex_unlock(&self->lock);
701 Py_END_ALLOW_THREADS;
702
703 exit:
704 return rv;
705 }
706
707 /* conn_connect - execute a connection to the database */
708
709 static int
_conn_sync_connect(connectionObject * self,const char * dsn)710 _conn_sync_connect(connectionObject *self, const char *dsn)
711 {
712 int green;
713
714 /* store this value to prevent inconsistencies due to a change
715 * in the middle of the function. */
716 green = psyco_green();
717 if (!green) {
718 Py_BEGIN_ALLOW_THREADS;
719 self->pgconn = PQconnectdb(dsn);
720 Py_END_ALLOW_THREADS;
721 Dprintf("conn_connect: new PG connection at %p", self->pgconn);
722 }
723 else {
724 Py_BEGIN_ALLOW_THREADS;
725 self->pgconn = PQconnectStart(dsn);
726 Py_END_ALLOW_THREADS;
727 Dprintf("conn_connect: new green PG connection at %p", self->pgconn);
728 }
729
730 if (!self->pgconn)
731 {
732 Dprintf("conn_connect: PQconnectdb(%s) FAILED", dsn);
733 PyErr_SetString(OperationalError, "PQconnectdb() failed");
734 return -1;
735 }
736 else if (PQstatus(self->pgconn) == CONNECTION_BAD)
737 {
738 Dprintf("conn_connect: PQconnectdb(%s) returned BAD", dsn);
739 PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
740 return -1;
741 }
742
743 PQsetNoticeProcessor(self->pgconn, conn_notice_callback, (void*)self);
744
745 /* if the connection is green, wait to finish connection */
746 if (green) {
747 if (0 > pq_set_non_blocking(self, 1)) {
748 return -1;
749 }
750 if (0 != psyco_wait(self)) {
751 return -1;
752 }
753 }
754
755 /* From here the connection is considered ready: with the new status,
756 * poll() will use PQisBusy instead of PQconnectPoll.
757 */
758 self->status = CONN_STATUS_READY;
759
760 if (conn_setup(self) == -1) {
761 return -1;
762 }
763
764 return 0;
765 }
766
767 static int
_conn_async_connect(connectionObject * self,const char * dsn)768 _conn_async_connect(connectionObject *self, const char *dsn)
769 {
770 PGconn *pgconn;
771
772 self->pgconn = pgconn = PQconnectStart(dsn);
773
774 Dprintf("conn_connect: new postgresql connection at %p", pgconn);
775
776 if (pgconn == NULL)
777 {
778 Dprintf("conn_connect: PQconnectStart(%s) FAILED", dsn);
779 PyErr_SetString(OperationalError, "PQconnectStart() failed");
780 return -1;
781 }
782 else if (PQstatus(pgconn) == CONNECTION_BAD)
783 {
784 Dprintf("conn_connect: PQconnectdb(%s) returned BAD", dsn);
785 PyErr_SetString(OperationalError, PQerrorMessage(pgconn));
786 return -1;
787 }
788
789 PQsetNoticeProcessor(pgconn, conn_notice_callback, (void*)self);
790
791 /* Set the connection to nonblocking now. */
792 if (pq_set_non_blocking(self, 1) != 0) {
793 return -1;
794 }
795
796 /* The connection will be completed banging on poll():
797 * First with _conn_poll_connecting() that will finish connection,
798 * then with _conn_poll_setup_async() that will do the same job
799 * of setup_async(). */
800
801 return 0;
802 }
803
804 int
conn_connect(connectionObject * self,const char * dsn,long int async)805 conn_connect(connectionObject *self, const char *dsn, long int async)
806 {
807 int rv;
808
809 if (async == 1) {
810 Dprintf("con_connect: connecting in ASYNC mode");
811 rv = _conn_async_connect(self, dsn);
812 }
813 else {
814 Dprintf("con_connect: connecting in SYNC mode");
815 rv = _conn_sync_connect(self, dsn);
816 }
817
818 if (rv != 0) {
819 /* connection failed, so let's close ourselves */
820 self->closed = 2;
821 }
822
823 return rv;
824 }
825
826
827 /* poll during a connection attempt until the connection has established. */
828
829 static int
_conn_poll_connecting(connectionObject * self)830 _conn_poll_connecting(connectionObject *self)
831 {
832 int res = PSYCO_POLL_ERROR;
833 const char *msg;
834
835 Dprintf("conn_poll: poll connecting");
836 switch (PQconnectPoll(self->pgconn)) {
837 case PGRES_POLLING_OK:
838 res = PSYCO_POLL_OK;
839 break;
840 case PGRES_POLLING_READING:
841 res = PSYCO_POLL_READ;
842 break;
843 case PGRES_POLLING_WRITING:
844 res = PSYCO_POLL_WRITE;
845 break;
846 case PGRES_POLLING_FAILED:
847 case PGRES_POLLING_ACTIVE:
848 msg = PQerrorMessage(self->pgconn);
849 if (!(msg && *msg)) {
850 msg = "asynchronous connection failed";
851 }
852 PyErr_SetString(OperationalError, msg);
853 res = PSYCO_POLL_ERROR;
854 break;
855 }
856
857 return res;
858 }
859
860
861 /* Advance to the next state after an attempt of flushing output */
862
863 static int
_conn_poll_advance_write(connectionObject * self)864 _conn_poll_advance_write(connectionObject *self)
865 {
866 int res;
867 int flush;
868
869 Dprintf("conn_poll: poll writing");
870
871 flush = PQflush(self->pgconn);
872 Dprintf("conn_poll: PQflush() = %i", flush);
873
874 switch (flush) {
875 case 0: /* success */
876 /* we've finished pushing the query to the server. Let's start
877 reading the results. */
878 Dprintf("conn_poll: async_status -> ASYNC_READ");
879 self->async_status = ASYNC_READ;
880 res = PSYCO_POLL_READ;
881 break;
882 case 1: /* would block */
883 res = PSYCO_POLL_WRITE;
884 break;
885 case -1: /* error */
886 PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
887 res = PSYCO_POLL_ERROR;
888 break;
889 default:
890 Dprintf("conn_poll: unexpected result from flush: %d", flush);
891 res = PSYCO_POLL_ERROR;
892 break;
893 }
894 return res;
895 }
896
897
898 /* Advance to the next state after reading results */
899
900 static int
_conn_poll_advance_read(connectionObject * self)901 _conn_poll_advance_read(connectionObject *self)
902 {
903 int res;
904 int busy;
905
906 Dprintf("conn_poll: poll reading");
907
908 busy = pq_get_result_async(self);
909
910 switch (busy) {
911 case 0: /* result is ready */
912 Dprintf("conn_poll: async_status -> ASYNC_DONE");
913 self->async_status = ASYNC_DONE;
914 res = PSYCO_POLL_OK;
915 break;
916 case 1: /* result not ready: fd would block */
917 res = PSYCO_POLL_READ;
918 break;
919 case -1: /* ouch, error */
920 res = PSYCO_POLL_ERROR;
921 break;
922 default:
923 Dprintf("conn_poll: unexpected result from pq_get_result_async: %d",
924 busy);
925 res = PSYCO_POLL_ERROR;
926 break;
927 }
928 return res;
929 }
930
931
932 /* Poll the connection for the send query/retrieve result phase
933
934 Advance the async_status (usually going WRITE -> READ -> DONE) but don't
935 mess with the connection status. */
936
937 static int
_conn_poll_query(connectionObject * self)938 _conn_poll_query(connectionObject *self)
939 {
940 int res = PSYCO_POLL_ERROR;
941
942 switch (self->async_status) {
943 case ASYNC_WRITE:
944 Dprintf("conn_poll: async_status = ASYNC_WRITE");
945 res = _conn_poll_advance_write(self);
946 break;
947
948 case ASYNC_READ:
949 Dprintf("conn_poll: async_status = ASYNC_READ");
950 res = _conn_poll_advance_read(self);
951 break;
952
953 case ASYNC_DONE:
954 Dprintf("conn_poll: async_status = ASYNC_DONE");
955 /* We haven't asked anything: just check for notifications. */
956 res = _conn_poll_advance_read(self);
957 break;
958
959 default:
960 Dprintf("conn_poll: in unexpected async status: %d",
961 self->async_status);
962 res = PSYCO_POLL_ERROR;
963 break;
964 }
965
966 return res;
967 }
968
969 /* Advance to the next state during an async connection setup
970 *
971 * If the connection is green, this is performed by the regular
972 * sync code so the queries are sent by conn_setup() while in
973 * CONN_STATUS_READY state.
974 */
975 static int
_conn_poll_setup_async(connectionObject * self)976 _conn_poll_setup_async(connectionObject *self)
977 {
978 int res = PSYCO_POLL_ERROR;
979
980 switch (self->status) {
981 case CONN_STATUS_CONNECTING:
982 self->equote = conn_get_standard_conforming_strings(self->pgconn);
983 self->protocol = conn_get_protocol_version(self->pgconn);
984 self->server_version = conn_get_server_version(self->pgconn);
985 if (3 != self->protocol) {
986 PyErr_SetString(InterfaceError, "only protocol 3 supported");
987 break;
988 }
989 if (0 > conn_read_encoding(self, self->pgconn)) {
990 break;
991 }
992 if (0 > conn_setup_cancel(self, self->pgconn)) {
993 return -1;
994 }
995
996 /* asynchronous connections always use isolation level 0, the user is
997 * expected to manage the transactions himself, by sending
998 * (asynchronously) BEGIN and COMMIT statements.
999 */
1000 self->autocommit = 1;
1001
1002 /* If the datestyle is ISO or anything else good,
1003 * we can skip the CONN_STATUS_DATESTYLE step.
1004 * Note that we cannot change the datestyle on a replication
1005 * connection.
1006 */
1007 if (!dsn_has_replication(self->dsn) && !conn_is_datestyle_ok(self->pgconn)) {
1008 Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE");
1009 self->status = CONN_STATUS_DATESTYLE;
1010 if (0 == pq_send_query(self, psyco_datestyle)) {
1011 PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
1012 break;
1013 }
1014 Dprintf("conn_poll: async_status -> ASYNC_WRITE");
1015 self->async_status = ASYNC_WRITE;
1016 res = PSYCO_POLL_WRITE;
1017 }
1018 else {
1019 Dprintf("conn_poll: status -> CONN_STATUS_READY");
1020 self->status = CONN_STATUS_READY;
1021 res = PSYCO_POLL_OK;
1022 }
1023 break;
1024
1025 case CONN_STATUS_DATESTYLE:
1026 res = _conn_poll_query(self);
1027 if (res == PSYCO_POLL_OK) {
1028 res = PSYCO_POLL_ERROR;
1029 if (self->pgres == NULL
1030 || PQresultStatus(self->pgres) != PGRES_COMMAND_OK ) {
1031 PyErr_SetString(OperationalError, "can't set datestyle to ISO");
1032 break;
1033 }
1034 CLEARPGRES(self->pgres);
1035
1036 Dprintf("conn_poll: status -> CONN_STATUS_READY");
1037 self->status = CONN_STATUS_READY;
1038 res = PSYCO_POLL_OK;
1039 }
1040 break;
1041 }
1042 return res;
1043 }
1044
1045
1046 static cursorObject *
_conn_get_async_cursor(connectionObject * self)1047 _conn_get_async_cursor(connectionObject *self) {
1048 PyObject *py_curs;
1049
1050 if (!(py_curs = PyWeakref_GetObject(self->async_cursor))) {
1051 PyErr_SetString(PyExc_SystemError,
1052 "got null dereferencing cursor weakref");
1053 goto error;
1054 }
1055 if (Py_None == py_curs) {
1056 PyErr_SetString(InterfaceError,
1057 "the asynchronous cursor has disappeared");
1058 goto error;
1059 }
1060
1061 Py_INCREF(py_curs);
1062 return (cursorObject *)py_curs;
1063
1064 error:
1065 pq_clear_async(self);
1066 return NULL;
1067 }
1068
1069 /* conn_poll - Main polling switch
1070 *
1071 * The function is called in all the states and connection types and invokes
1072 * the right "next step".
1073 */
1074
1075 int
conn_poll(connectionObject * self)1076 conn_poll(connectionObject *self)
1077 {
1078 int res = PSYCO_POLL_ERROR;
1079 Dprintf("conn_poll: status = %d", self->status);
1080
1081 switch (self->status) {
1082 case CONN_STATUS_SETUP:
1083 Dprintf("conn_poll: status -> CONN_STATUS_SETUP");
1084 self->status = CONN_STATUS_CONNECTING;
1085 res = PSYCO_POLL_WRITE;
1086 break;
1087
1088 case CONN_STATUS_CONNECTING:
1089 Dprintf("conn_poll: status -> CONN_STATUS_CONNECTING");
1090 res = _conn_poll_connecting(self);
1091 if (res == PSYCO_POLL_OK && self->async) {
1092 res = _conn_poll_setup_async(self);
1093 }
1094 break;
1095
1096 case CONN_STATUS_DATESTYLE:
1097 Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE");
1098 res = _conn_poll_setup_async(self);
1099 break;
1100
1101 case CONN_STATUS_READY:
1102 case CONN_STATUS_BEGIN:
1103 case CONN_STATUS_PREPARED:
1104 Dprintf("conn_poll: status -> CONN_STATUS_*");
1105 res = _conn_poll_query(self);
1106
1107 if (res == PSYCO_POLL_OK && self->async && self->async_cursor) {
1108 cursorObject *curs;
1109
1110 /* An async query has just finished: parse the tuple in the
1111 * target cursor. */
1112 if (!(curs = _conn_get_async_cursor(self))) {
1113 res = PSYCO_POLL_ERROR;
1114 break;
1115 }
1116
1117 curs_set_result(curs, self->pgres);
1118 self->pgres = NULL;
1119
1120 /* fetch the tuples (if there are any) and build the result. We
1121 * don't care if pq_fetch return 0 or 1, but if there was an error,
1122 * we want to signal it to the caller. */
1123 if (pq_fetch(curs, 0) == -1) {
1124 res = PSYCO_POLL_ERROR;
1125 }
1126
1127 /* We have finished with our async_cursor */
1128 Py_DECREF(curs);
1129 Py_CLEAR(self->async_cursor);
1130 }
1131 break;
1132
1133 default:
1134 Dprintf("conn_poll: in unexpected state");
1135 res = PSYCO_POLL_ERROR;
1136 }
1137
1138 Dprintf("conn_poll: returning %d", res);
1139 return res;
1140 }
1141
1142 /* conn_close - do anything needed to shut down the connection */
1143
1144 void
conn_close(connectionObject * self)1145 conn_close(connectionObject *self)
1146 {
1147 /* a connection with closed == 2 still requires cleanup */
1148 if (self->closed == 1) {
1149 return;
1150 }
1151
1152 /* sets this connection as closed even for other threads; */
1153 Py_BEGIN_ALLOW_THREADS;
1154 pthread_mutex_lock(&self->lock);
1155
1156 conn_close_locked(self);
1157
1158 pthread_mutex_unlock(&self->lock);
1159 Py_END_ALLOW_THREADS;
1160 }
1161
1162
1163 /* Return a copy of the 'dsn' string with the password scrubbed.
1164 *
1165 * The string returned is allocated on the Python heap.
1166 *
1167 * In case of error return NULL and raise an exception.
1168 */
1169 char *
conn_obscure_password(const char * dsn)1170 conn_obscure_password(const char *dsn)
1171 {
1172 PQconninfoOption *options = NULL;
1173 PyObject *d = NULL, *v = NULL, *pydsn = NULL;
1174 char *rv = NULL;
1175
1176 if (!dsn) {
1177 PyErr_SetString(InternalError, "unexpected null string");
1178 goto exit;
1179 }
1180
1181 if (!(options = PQconninfoParse(dsn, NULL))) {
1182 /* unlikely: the dsn was already tested valid */
1183 PyErr_SetString(InternalError, "the connection string is not valid");
1184 goto exit;
1185 }
1186
1187 if (!(d = psyco_dict_from_conninfo_options(
1188 options, /* include_password = */ 1))) {
1189 goto exit;
1190 }
1191 if (NULL == PyDict_GetItemString(d, "password")) {
1192 /* the dsn doesn't have a password */
1193 psyco_strdup(&rv, dsn, -1);
1194 goto exit;
1195 }
1196
1197 /* scrub the password and put back the connection string together */
1198 if (!(v = Text_FromUTF8("xxx"))) { goto exit; }
1199 if (0 > PyDict_SetItemString(d, "password", v)) { goto exit; }
1200 if (!(pydsn = psyco_make_dsn(Py_None, d))) { goto exit; }
1201 if (!(pydsn = psyco_ensure_bytes(pydsn))) { goto exit; }
1202
1203 /* Return the connection string with the password replaced */
1204 psyco_strdup(&rv, Bytes_AS_STRING(pydsn), -1);
1205
1206 exit:
1207 PQconninfoFree(options);
1208 Py_XDECREF(v);
1209 Py_XDECREF(d);
1210 Py_XDECREF(pydsn);
1211
1212 return rv;
1213 }
1214
1215
1216 /* conn_close_locked - shut down the connection with the lock already taken */
1217
conn_close_locked(connectionObject * self)1218 void conn_close_locked(connectionObject *self)
1219 {
1220 if (self->closed == 1) {
1221 return;
1222 }
1223
1224 /* We used to call pq_abort_locked here, but the idea of issuing a
1225 * rollback on close/GC has been considered inappropriate.
1226 *
1227 * Dropping the connection on the server has the same effect as the
1228 * transaction is automatically rolled back. Some middleware, such as
1229 * PgBouncer, have problem with connections closed in the middle of the
1230 * transaction though: to avoid these problems the transaction should be
1231 * closed only in status CONN_STATUS_READY.
1232 */
1233 self->closed = 1;
1234
1235 /* we need to check the value of pgconn, because we get called even when
1236 * the connection fails! */
1237 if (self->pgconn) {
1238 PQfinish(self->pgconn);
1239 self->pgconn = NULL;
1240 Dprintf("conn_close: PQfinish called");
1241 }
1242 }
1243
1244 /* conn_commit - commit on a connection */
1245
1246 RAISES_NEG int
conn_commit(connectionObject * self)1247 conn_commit(connectionObject *self)
1248 {
1249 int res;
1250
1251 res = pq_commit(self);
1252 return res;
1253 }
1254
1255 /* conn_rollback - rollback a connection */
1256
1257 RAISES_NEG int
conn_rollback(connectionObject * self)1258 conn_rollback(connectionObject *self)
1259 {
1260 int res;
1261
1262 res = pq_abort(self);
1263 return res;
1264 }
1265
1266
1267 /* Change the state of the session */
1268 RAISES_NEG int
conn_set_session(connectionObject * self,int autocommit,int isolevel,int readonly,int deferrable)1269 conn_set_session(connectionObject *self, int autocommit,
1270 int isolevel, int readonly, int deferrable)
1271 {
1272 int rv = -1;
1273 int want_autocommit = autocommit == SRV_STATE_UNCHANGED ?
1274 self->autocommit : autocommit;
1275
1276 if (deferrable != SRV_STATE_UNCHANGED && self->server_version < 90100) {
1277 PyErr_SetString(ProgrammingError,
1278 "the 'deferrable' setting is only available"
1279 " from PostgreSQL 9.1");
1280 goto exit;
1281 }
1282
1283 /* Promote an isolation level to one of the levels supported by the server */
1284 if (self->server_version < 80000) {
1285 if (isolevel == ISOLATION_LEVEL_READ_UNCOMMITTED) {
1286 isolevel = ISOLATION_LEVEL_READ_COMMITTED;
1287 }
1288 else if (isolevel == ISOLATION_LEVEL_REPEATABLE_READ) {
1289 isolevel = ISOLATION_LEVEL_SERIALIZABLE;
1290 }
1291 }
1292
1293 Py_BEGIN_ALLOW_THREADS;
1294 pthread_mutex_lock(&self->lock);
1295
1296 if (want_autocommit) {
1297 /* we are or are going in autocommit state, so no BEGIN will be issued:
1298 * configure the session with the characteristics requested */
1299 if (isolevel != SRV_STATE_UNCHANGED) {
1300 if (0 > pq_set_guc_locked(self,
1301 "default_transaction_isolation", srv_isolevels[isolevel],
1302 &_save)) {
1303 goto endlock;
1304 }
1305 }
1306 if (readonly != SRV_STATE_UNCHANGED) {
1307 if (0 > pq_set_guc_locked(self,
1308 "default_transaction_read_only", srv_state_guc[readonly],
1309 &_save)) {
1310 goto endlock;
1311 }
1312 }
1313 if (deferrable != SRV_STATE_UNCHANGED) {
1314 if (0 > pq_set_guc_locked(self,
1315 "default_transaction_deferrable", srv_state_guc[deferrable],
1316 &_save)) {
1317 goto endlock;
1318 }
1319 }
1320 }
1321 else if (self->autocommit) {
1322 /* we are moving from autocommit to not autocommit, so revert the
1323 * characteristics to defaults to let BEGIN do its work */
1324 if (self->isolevel != ISOLATION_LEVEL_DEFAULT) {
1325 if (0 > pq_set_guc_locked(self,
1326 "default_transaction_isolation", "default",
1327 &_save)) {
1328 goto endlock;
1329 }
1330 }
1331 if (self->readonly != STATE_DEFAULT) {
1332 if (0 > pq_set_guc_locked(self,
1333 "default_transaction_read_only", "default",
1334 &_save)) {
1335 goto endlock;
1336 }
1337 }
1338 if (self->server_version >= 90100 && self->deferrable != STATE_DEFAULT) {
1339 if (0 > pq_set_guc_locked(self,
1340 "default_transaction_deferrable", "default",
1341 &_save)) {
1342 goto endlock;
1343 }
1344 }
1345 }
1346
1347 if (autocommit != SRV_STATE_UNCHANGED) {
1348 self->autocommit = autocommit;
1349 }
1350 if (isolevel != SRV_STATE_UNCHANGED) {
1351 self->isolevel = isolevel;
1352 }
1353 if (readonly != SRV_STATE_UNCHANGED) {
1354 self->readonly = readonly;
1355 }
1356 if (deferrable != SRV_STATE_UNCHANGED) {
1357 self->deferrable = deferrable;
1358 }
1359 rv = 0;
1360
1361 endlock:
1362 pthread_mutex_unlock(&self->lock);
1363 Py_END_ALLOW_THREADS;
1364
1365 if (rv < 0) {
1366 pq_complete_error(self);
1367 goto exit;
1368 }
1369
1370 Dprintf(
1371 "conn_set_session: autocommit %d, isolevel %d, readonly %d, deferrable %d",
1372 autocommit, isolevel, readonly, deferrable);
1373
1374
1375 exit:
1376 return rv;
1377 }
1378
1379
1380 /* conn_set_client_encoding - switch client encoding on connection */
1381
1382 RAISES_NEG int
conn_set_client_encoding(connectionObject * self,const char * pgenc)1383 conn_set_client_encoding(connectionObject *self, const char *pgenc)
1384 {
1385 int res = -1;
1386 char *clean_enc = NULL;
1387
1388 /* We must know what python encoding this encoding is. */
1389 if (0 > clear_encoding_name(pgenc, &clean_enc)) { goto exit; }
1390
1391 /* If the current encoding is equal to the requested one we don't
1392 issue any query to the backend */
1393 if (strcmp(self->encoding, clean_enc) == 0) {
1394 res = 0;
1395 goto exit;
1396 }
1397
1398 Py_BEGIN_ALLOW_THREADS;
1399 pthread_mutex_lock(&self->lock);
1400
1401 /* abort the current transaction, to set the encoding ouside of
1402 transactions */
1403 if ((res = pq_abort_locked(self, &_save))) {
1404 goto endlock;
1405 }
1406
1407 if ((res = pq_set_guc_locked(self, "client_encoding", clean_enc, &_save))) {
1408 goto endlock;
1409 }
1410
1411 endlock:
1412 pthread_mutex_unlock(&self->lock);
1413 Py_END_ALLOW_THREADS;
1414
1415 if (res < 0) {
1416 pq_complete_error(self);
1417 goto exit;
1418 }
1419
1420 res = conn_store_encoding(self, pgenc);
1421
1422 Dprintf("conn_set_client_encoding: encoding set to %s", self->encoding);
1423
1424 exit:
1425 PyMem_Free(clean_enc);
1426
1427 return res;
1428 }
1429
1430
1431 /* conn_tpc_begin -- begin a two-phase commit.
1432 *
1433 * The state of a connection in the middle of a TPC is exactly the same
1434 * of a normal transaction, in CONN_STATUS_BEGIN, but with the tpc_xid
1435 * member set to the xid used. This allows to reuse all the code paths used
1436 * in regular transactions, as PostgreSQL won't even know we are in a TPC
1437 * until PREPARE. */
1438
1439 RAISES_NEG int
conn_tpc_begin(connectionObject * self,xidObject * xid)1440 conn_tpc_begin(connectionObject *self, xidObject *xid)
1441 {
1442 Dprintf("conn_tpc_begin: starting transaction");
1443
1444 Py_BEGIN_ALLOW_THREADS;
1445 pthread_mutex_lock(&self->lock);
1446
1447 if (pq_begin_locked(self, &_save) < 0) {
1448 pthread_mutex_unlock(&(self->lock));
1449 Py_BLOCK_THREADS;
1450 pq_complete_error(self);
1451 return -1;
1452 }
1453
1454 pthread_mutex_unlock(&self->lock);
1455 Py_END_ALLOW_THREADS;
1456
1457 /* The transaction started ok, let's store this xid. */
1458 Py_INCREF(xid);
1459 self->tpc_xid = xid;
1460
1461 return 0;
1462 }
1463
1464
1465 /* conn_tpc_command -- run one of the TPC-related PostgreSQL commands.
1466 *
1467 * The function doesn't change the connection state as it can be used
1468 * for many commands and for recovered transactions. */
1469
1470 RAISES_NEG int
conn_tpc_command(connectionObject * self,const char * cmd,xidObject * xid)1471 conn_tpc_command(connectionObject *self, const char *cmd, xidObject *xid)
1472 {
1473 PyObject *tid = NULL;
1474 const char *ctid;
1475 int rv = -1;
1476
1477 Dprintf("conn_tpc_command: %s", cmd);
1478
1479 /* convert the xid into PostgreSQL transaction id while keeping the GIL */
1480 if (!(tid = psyco_ensure_bytes(xid_get_tid(xid)))) { goto exit; }
1481 if (!(ctid = Bytes_AsString(tid))) { goto exit; }
1482
1483 Py_BEGIN_ALLOW_THREADS;
1484 pthread_mutex_lock(&self->lock);
1485
1486 if (0 > (rv = pq_tpc_command_locked(self, cmd, ctid, &_save))) {
1487 pthread_mutex_unlock(&self->lock);
1488 Py_BLOCK_THREADS;
1489 pq_complete_error(self);
1490 goto exit;
1491 }
1492
1493 pthread_mutex_unlock(&self->lock);
1494 Py_END_ALLOW_THREADS;
1495
1496 exit:
1497 Py_XDECREF(tid);
1498 return rv;
1499 }
1500
1501 /* conn_tpc_recover -- return a list of pending TPC Xid */
1502
1503 PyObject *
conn_tpc_recover(connectionObject * self)1504 conn_tpc_recover(connectionObject *self)
1505 {
1506 int status;
1507 PyObject *xids = NULL;
1508 PyObject *rv = NULL;
1509 PyObject *tmp;
1510
1511 /* store the status to restore it. */
1512 status = self->status;
1513
1514 if (!(xids = xid_recover((PyObject *)self))) { goto exit; }
1515
1516 if (status == CONN_STATUS_READY && self->status == CONN_STATUS_BEGIN) {
1517 /* recover began a transaction: let's abort it. */
1518 if (!(tmp = PyObject_CallMethod((PyObject *)self, "rollback", NULL))) {
1519 goto exit;
1520 }
1521 Py_DECREF(tmp);
1522 }
1523
1524 /* all fine */
1525 rv = xids;
1526 xids = NULL;
1527
1528 exit:
1529 Py_XDECREF(xids);
1530
1531 return rv;
1532
1533 }
1534
1535
1536 void
conn_set_result(connectionObject * self,PGresult * pgres)1537 conn_set_result(connectionObject *self, PGresult *pgres)
1538 {
1539 PQclear(self->pgres);
1540 self->pgres = pgres;
1541 }
1542
1543
1544 void
conn_set_error(connectionObject * self,const char * msg)1545 conn_set_error(connectionObject *self, const char *msg)
1546 {
1547 if (self->error) {
1548 free(self->error);
1549 self->error = NULL;
1550 }
1551 if (msg && *msg) {
1552 self->error = strdup(msg);
1553 }
1554 }
1555