1 /*
2  * Copyright (c) 2010, FRiCKLE Piotr Sikora <info@frickle.com>
3  * Copyright (c) 2009-2010, Xiaozhe Wang <chaoslawful@gmail.com>
4  * Copyright (c) 2009-2010, Yichun Zhang <agentzh@gmail.com>
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20  * HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #ifndef DDEBUG
30 #define DDEBUG 0
31 #endif
32 
33 #include <nginx.h>
34 #include "ngx_postgres_ddebug.h"
35 #include "ngx_postgres_module.h"
36 #include "ngx_postgres_keepalive.h"
37 #include "ngx_postgres_processor.h"
38 
39 
40 ngx_int_t
ngx_postgres_upstream_init(ngx_conf_t * cf,ngx_http_upstream_srv_conf_t * uscf)41 ngx_postgres_upstream_init(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *uscf)
42 {
43     ngx_postgres_upstream_srv_conf_t  *pgscf;
44     ngx_postgres_upstream_server_t    *server;
45     ngx_postgres_upstream_peers_t     *peers;
46     ngx_uint_t                         i, j, n;
47 
48     dd("entering");
49 
50     uscf->peer.init = ngx_postgres_upstream_init_peer;
51 
52     pgscf = ngx_http_conf_upstream_srv_conf(uscf, ngx_postgres_module);
53 
54     if (pgscf->servers == NULL || pgscf->servers->nelts == 0) {
55         ngx_log_error(NGX_LOG_ERR, cf->log, 0,
56                       "postgres: no \"postgres_server\" defined"
57                       " in upstream \"%V\" in %s:%ui",
58                       &uscf->host, uscf->file_name, uscf->line);
59 
60         dd("returning NGX_ERROR");
61         return NGX_ERROR;
62     }
63 
64     /* pgscf->servers != NULL */
65 
66     server = uscf->servers->elts;
67 
68     n = 0;
69 
70     for (i = 0; i < uscf->servers->nelts; i++) {
71         n += server[i].naddrs;
72     }
73 
74     peers = ngx_pcalloc(cf->pool, sizeof(ngx_postgres_upstream_peers_t)
75             + sizeof(ngx_postgres_upstream_peer_t) * (n - 1));
76 
77     if (peers == NULL) {
78         dd("returning NGX_ERROR");
79         return NGX_ERROR;
80     }
81 
82     peers->single = (n == 1);
83     peers->number = n;
84     peers->name = &uscf->host;
85 
86     n = 0;
87 
88     for (i = 0; i < uscf->servers->nelts; i++) {
89         for (j = 0; j < server[i].naddrs; j++) {
90             peers->peer[n].sockaddr = server[i].addrs[j].sockaddr;
91             peers->peer[n].socklen = server[i].addrs[j].socklen;
92             peers->peer[n].name = server[i].addrs[j].name;
93             peers->peer[n].port = server[i].port;
94             peers->peer[n].family = server[i].family;
95             peers->peer[n].dbname = server[i].dbname;
96             peers->peer[n].user = server[i].user;
97             peers->peer[n].password = server[i].password;
98 
99             peers->peer[n].host.data = ngx_pnalloc(cf->pool,
100                                                    NGX_SOCKADDR_STRLEN);
101             if (peers->peer[n].host.data == NULL) {
102                 dd("returning NGX_ERROR");
103                 return NGX_ERROR;
104             }
105 
106             peers->peer[n].host.len = ngx_sock_ntop(peers->peer[n].sockaddr,
107 #if defined(nginx_version) && (nginx_version >= 1005003)
108                                           peers->peer[n].socklen,
109 #endif
110                                           peers->peer[n].host.data,
111                                           NGX_SOCKADDR_STRLEN, 0);
112             if (peers->peer[n].host.len == 0) {
113                 dd("returning NGX_ERROR");
114                 return NGX_ERROR;
115             }
116 
117             n++;
118         }
119     }
120 
121     pgscf->peers = peers;
122     pgscf->active_conns = 0;
123 
124     if (pgscf->max_cached) {
125         dd("returning");
126         return ngx_postgres_keepalive_init(cf->pool, pgscf);
127     }
128 
129     dd("returning NGX_OK");
130     return NGX_OK;
131 }
132 
133 ngx_int_t
ngx_postgres_upstream_init_peer(ngx_http_request_t * r,ngx_http_upstream_srv_conf_t * uscf)134 ngx_postgres_upstream_init_peer(ngx_http_request_t *r,
135     ngx_http_upstream_srv_conf_t *uscf)
136 {
137     ngx_postgres_upstream_peer_data_t  *pgdt;
138     ngx_postgres_upstream_srv_conf_t   *pgscf;
139     ngx_postgres_loc_conf_t            *pglcf;
140     ngx_postgres_ctx_t                 *pgctx;
141     ngx_http_core_loc_conf_t           *clcf;
142     ngx_http_upstream_t                *u;
143     ngx_postgres_mixed_t               *query;
144     ngx_str_t                           sql;
145     ngx_uint_t                          i;
146 
147     dd("entering");
148 
149     pgdt = ngx_pcalloc(r->pool, sizeof(ngx_postgres_upstream_peer_data_t));
150     if (pgdt == NULL) {
151         goto failed;
152     }
153 
154     u = r->upstream;
155 
156     pgdt->upstream = u;
157     pgdt->request = r;
158 
159     pgscf = ngx_http_conf_upstream_srv_conf(uscf, ngx_postgres_module);
160     pglcf = ngx_http_get_module_loc_conf(r, ngx_postgres_module);
161     pgctx = ngx_http_get_module_ctx(r, ngx_postgres_module);
162 
163     pgdt->srv_conf = pgscf;
164     pgdt->loc_conf = pglcf;
165 
166     u->peer.data = pgdt;
167     u->peer.get = ngx_postgres_upstream_get_peer;
168     u->peer.free = ngx_postgres_upstream_free_peer;
169 
170     if (pglcf->query.methods_set & r->method) {
171         /* method-specific query */
172         dd("using method-specific query");
173 
174         query = pglcf->query.methods->elts;
175         for (i = 0; i < pglcf->query.methods->nelts; i++) {
176             if (query[i].key & r->method) {
177                 query = &query[i];
178                 break;
179             }
180         }
181 
182         if (i == pglcf->query.methods->nelts) {
183             goto failed;
184         }
185     } else {
186         /* default query */
187         dd("using default query");
188 
189         query = pglcf->query.def;
190     }
191 
192     if (query->cv) {
193         /* complex value */
194         dd("using complex value");
195 
196         if (ngx_http_complex_value(r, query->cv, &sql) != NGX_OK) {
197             goto failed;
198         }
199 
200         if (sql.len == 0) {
201             clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
202 
203             ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
204                           "postgres: empty \"postgres_query\" (was: \"%V\")"
205                           " in location \"%V\"", &query->cv->value,
206                           &clcf->name);
207 
208             goto failed;
209         }
210 
211         pgdt->query = sql;
212     } else {
213         /* simple value */
214         dd("using simple value");
215 
216         pgdt->query = query->sv;
217     }
218 
219     /* set $postgres_query */
220     pgctx->var_query = pgdt->query;
221 
222     dd("returning NGX_OK");
223     return NGX_OK;
224 
225 failed:
226 
227 #if defined(nginx_version) && (nginx_version >= 8017)
228     dd("returning NGX_ERROR");
229     return NGX_ERROR;
230 #else
231     r->upstream->peer.data = NULL;
232 
233     dd("returning NGX_OK (NGX_ERROR)");
234     return NGX_OK;
235 #endif
236 }
237 
238 ngx_int_t
ngx_postgres_upstream_get_peer(ngx_peer_connection_t * pc,void * data)239 ngx_postgres_upstream_get_peer(ngx_peer_connection_t *pc, void *data)
240 {
241     ngx_postgres_upstream_peer_data_t  *pgdt = data;
242     ngx_postgres_upstream_srv_conf_t   *pgscf;
243 #if defined(nginx_version) && (nginx_version < 8017)
244     ngx_postgres_ctx_t                 *pgctx;
245 #endif
246     ngx_postgres_upstream_peers_t      *peers;
247     ngx_postgres_upstream_peer_t       *peer;
248     ngx_connection_t                   *pgxc = NULL;
249     int                                 fd;
250     ngx_event_t                        *rev, *wev;
251     ngx_int_t                           rc;
252     u_char                             *connstring, *last;
253     size_t                              len;
254 
255     dd("entering");
256 
257 #if defined(nginx_version) && (nginx_version < 8017)
258     if (data == NULL) {
259         goto failed;
260     }
261 
262     pgctx = ngx_http_get_module_ctx(pgdt->request, ngx_postgres_module);
263 #endif
264 
265     pgscf = pgdt->srv_conf;
266 
267     pgdt->failed = 0;
268 
269     if (pgscf->max_cached && pgscf->single) {
270         rc = ngx_postgres_keepalive_get_peer_single(pc, pgdt, pgscf);
271         if (rc != NGX_DECLINED) {
272             /* re-use keepalive peer */
273             dd("re-using keepalive peer (single)");
274 
275             pgdt->state = state_db_send_query;
276 
277             ngx_postgres_process_events(pgdt->request);
278 
279             dd("returning NGX_AGAIN");
280             return NGX_AGAIN;
281         }
282     }
283 
284     peers = pgscf->peers;
285 
286     if (pgscf->current > peers->number - 1) {
287         pgscf->current = 0;
288     }
289 
290     peer = &peers->peer[pgscf->current++];
291 
292     pgdt->name.len = peer->name.len;
293     pgdt->name.data = peer->name.data;
294 
295     pgdt->sockaddr = *peer->sockaddr;
296 
297     pc->name = &pgdt->name;
298     pc->sockaddr = &pgdt->sockaddr;
299     pc->socklen = peer->socklen;
300     pc->cached = 0;
301 
302     if ((pgscf->max_cached) && (!pgscf->single)) {
303         rc = ngx_postgres_keepalive_get_peer_multi(pc, pgdt, pgscf);
304         if (rc != NGX_DECLINED) {
305             /* re-use keepalive peer */
306             dd("re-using keepalive peer (multi)");
307 
308             pgdt->state = state_db_send_query;
309 
310             ngx_postgres_process_events(pgdt->request);
311 
312             dd("returning NGX_AGAIN");
313             return NGX_AGAIN;
314         }
315     }
316 
317     if ((pgscf->reject) && (pgscf->active_conns >= pgscf->max_cached)) {
318         ngx_log_error(NGX_LOG_INFO, pc->log, 0,
319                       "postgres: keepalive connection pool is full,"
320                       " rejecting request to upstream \"%V\"", &peer->name);
321 
322         /* a bit hack-ish way to return error response (setup part) */
323         pc->connection = ngx_get_connection(0, pc->log);
324 
325 #if defined(nginx_version) && (nginx_version < 8017)
326         pgctx->status = NGX_HTTP_SERVICE_UNAVAILABLE;
327 #endif
328 
329         dd("returning NGX_AGAIN (NGX_HTTP_SERVICE_UNAVAILABLE)");
330         return NGX_AGAIN;
331     }
332 
333     /* sizeof("...") - 1 + 1 (for spaces and '\0' omitted */
334     /* we hope that unix sockets connection string will be always shorter than tcp/ip one (because 'host' is shorter than 'hostaddr') */
335     len = sizeof("hostaddr=") + peer->host.len
336         + sizeof("port=") + sizeof("65535") - 1
337         + sizeof("dbname=") + peer->dbname.len
338         + sizeof("port=") + sizeof("65535") - 1
339         + sizeof("dbname=") + peer->dbname.len
340         + sizeof("user=") + peer->user.len
341         + sizeof("password=") + peer->password.len
342         + sizeof("sslmode=disable");
343 
344     connstring = ngx_pnalloc(pgdt->request->pool, len);
345     if (connstring == NULL) {
346 #if defined(nginx_version) && (nginx_version >= 8017)
347         dd("returning NGX_ERROR");
348         return NGX_ERROR;
349 #else
350         goto failed;
351 #endif
352     }
353 
354     if(peer->family != AF_UNIX)
355         last = ngx_snprintf(connstring, len - 1,
356                             "hostaddr=%V port=%d dbname=%V user=%V password=%V"
357                             " sslmode=disable",
358                             &peer->host, peer->port, &peer->dbname, &peer->user,
359                             &peer->password);
360     else
361         last = ngx_snprintf(connstring, len - 1,
362                             "host=%s port=%d dbname=%V user=%V password=%V"
363                             " sslmode=disable",
364                             &peer->host.data[5], peer->port, &peer->dbname, &peer->user,
365                             &peer->password);
366     *last = '\0';
367 
368     dd("PostgreSQL connection string: %s", connstring);
369 
370     /*
371      * internal checks in PQsetnonblocking are taking care of any
372      * PQconnectStart failures, so we don't need to check them here.
373      */
374 
375     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
376                    "postgres: connecting");
377 
378     pgdt->pgconn = PQconnectStart((const char *)connstring);
379     if (PQsetnonblocking(pgdt->pgconn, 1) == -1) {
380         ngx_log_error(NGX_LOG_ERR, pc->log, 0,
381                       "postgres: connection failed: %s in upstream \"%V\"",
382                       PQerrorMessage(pgdt->pgconn), &peer->name);
383 
384         PQfinish(pgdt->pgconn);
385         pgdt->pgconn = NULL;
386 
387 #if defined(nginx_version) && (nginx_version >= 8017)
388         dd("returning NGX_DECLINED");
389         return NGX_DECLINED;
390 #else
391         pgctx->status = NGX_HTTP_BAD_GATEWAY;
392         goto failed;
393 #endif
394     }
395 
396 #if defined(DDEBUG) && (DDEBUG > 1)
397     PQtrace(pgdt->pgconn, stderr);
398 #endif
399 
400     dd("connection status:%d", (int) PQstatus(pgdt->pgconn));
401 
402     /* take spot in keepalive connection pool */
403     pgscf->active_conns++;
404 
405     /* add the file descriptor (fd) into an nginx connection structure */
406 
407     fd = PQsocket(pgdt->pgconn);
408     if (fd == -1) {
409         ngx_log_error(NGX_LOG_ERR, pc->log, 0,
410                       "postgres: failed to get connection fd");
411 
412         goto invalid;
413     }
414 
415     ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
416                    "postgres: connection fd:%d", fd);
417 
418     pgxc = pc->connection = ngx_get_connection(fd, pc->log);
419 
420     if (pgxc == NULL) {
421         ngx_log_error(NGX_LOG_ERR, pc->log, 0,
422                       "postgres: failed to get a free nginx connection");
423 
424         goto invalid;
425     }
426 
427     pgxc->log = pc->log;
428     pgxc->log_error = pc->log_error;
429     pgxc->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
430 
431     rev = pgxc->read;
432     wev = pgxc->write;
433 
434     rev->log = pc->log;
435     wev->log = pc->log;
436 
437     /* register the connection with postgres connection fd into the
438      * nginx event model */
439 
440     if (ngx_event_flags & NGX_USE_RTSIG_EVENT) {
441         dd("NGX_USE_RTSIG_EVENT");
442         if (ngx_add_conn(pgxc) != NGX_OK) {
443             goto bad_add;
444         }
445 
446     } else if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
447         dd("NGX_USE_CLEAR_EVENT");
448         if (ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT) != NGX_OK) {
449             goto bad_add;
450         }
451 
452         if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_CLEAR_EVENT) != NGX_OK) {
453             goto bad_add;
454         }
455 
456     } else {
457         dd("NGX_USE_LEVEL_EVENT");
458         if (ngx_add_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT) != NGX_OK) {
459             goto bad_add;
460         }
461 
462         if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_LEVEL_EVENT) != NGX_OK) {
463             goto bad_add;
464         }
465     }
466 
467     pgxc->log->action = "connecting to PostgreSQL database";
468     pgdt->state = state_db_connect;
469 
470     dd("returning NGX_AGAIN");
471     return NGX_AGAIN;
472 
473 bad_add:
474 
475     ngx_log_error(NGX_LOG_ERR, pc->log, 0,
476                   "postgres: failed to add nginx connection");
477 
478 invalid:
479 
480     ngx_postgres_upstream_free_connection(pc->log, pc->connection,
481                                           pgdt->pgconn, pgscf);
482 
483 #if defined(nginx_version) && (nginx_version >= 8017)
484     dd("returning NGX_ERROR");
485     return NGX_ERROR;
486 #else
487 
488 failed:
489 
490     /* a bit hack-ish way to return error response (setup part) */
491     pc->connection = ngx_get_connection(0, pc->log);
492 
493     dd("returning NGX_AGAIN (NGX_ERROR)");
494     return NGX_AGAIN;
495 #endif
496 }
497 
498 void
ngx_postgres_upstream_free_peer(ngx_peer_connection_t * pc,void * data,ngx_uint_t state)499 ngx_postgres_upstream_free_peer(ngx_peer_connection_t *pc,
500     void *data, ngx_uint_t state)
501 {
502     ngx_postgres_upstream_peer_data_t  *pgdt = data;
503     ngx_postgres_upstream_srv_conf_t   *pgscf;
504 
505     dd("entering");
506 
507 #if defined(nginx_version) && (nginx_version < 8017)
508     if (data == NULL) {
509         dd("returning");
510         return;
511     }
512 #endif
513 
514     pgscf = pgdt->srv_conf;
515 
516     if (pgscf->max_cached) {
517         ngx_postgres_keepalive_free_peer(pc, pgdt, pgscf, state);
518     }
519 
520     if (pc->connection) {
521         dd("free connection to PostgreSQL database");
522 
523         ngx_postgres_upstream_free_connection(pc->log, pc->connection,
524                 pgdt->pgconn, pgscf);
525 
526 
527         pgdt->pgconn = NULL;
528         pc->connection = NULL;
529     }
530 
531     dd("returning");
532 }
533 
534 ngx_flag_t
ngx_postgres_upstream_is_my_peer(const ngx_peer_connection_t * peer)535 ngx_postgres_upstream_is_my_peer(const ngx_peer_connection_t *peer)
536 {
537     dd("entering & returning");
538     return (peer->get == ngx_postgres_upstream_get_peer);
539 }
540 
541 void
ngx_postgres_upstream_free_connection(ngx_log_t * log,ngx_connection_t * c,PGconn * pgconn,ngx_postgres_upstream_srv_conf_t * pgscf)542 ngx_postgres_upstream_free_connection(ngx_log_t *log, ngx_connection_t *c,
543     PGconn *pgconn, ngx_postgres_upstream_srv_conf_t *pgscf)
544 {
545     ngx_event_t  *rev, *wev;
546 
547     dd("entering");
548 
549     PQfinish(pgconn);
550 
551     if (c) {
552         rev = c->read;
553         wev = c->write;
554 
555         if (rev->timer_set) {
556             ngx_del_timer(rev);
557         }
558 
559         if (wev->timer_set) {
560             ngx_del_timer(wev);
561         }
562 
563         if (ngx_del_conn) {
564            ngx_del_conn(c, NGX_CLOSE_EVENT);
565         } else {
566             if (rev->active || rev->disabled) {
567                 ngx_del_event(rev, NGX_READ_EVENT, NGX_CLOSE_EVENT);
568             }
569 
570             if (wev->active || wev->disabled) {
571                 ngx_del_event(wev, NGX_WRITE_EVENT, NGX_CLOSE_EVENT);
572             }
573         }
574 
575 #if defined(nginx_version) && nginx_version >= 1007005
576         if (rev->posted) {
577 #else
578         if (rev->prev) {
579 #endif
580             ngx_delete_posted_event(rev);
581         }
582 
583 #if defined(nginx_version) && nginx_version >= 1007005
584         if (wev->posted) {
585 #else
586         if (wev->prev) {
587 #endif
588             ngx_delete_posted_event(wev);
589         }
590 
591         rev->closed = 1;
592         wev->closed = 1;
593 
594 #if defined(nginx_version) && (nginx_version >= 1001004)
595         if (c->pool) {
596             ngx_destroy_pool(c->pool);
597         }
598 #endif
599 
600         ngx_free_connection(c);
601 
602         c->fd = (ngx_socket_t) -1;
603     }
604 
605     /* free spot in keepalive connection pool */
606     pgscf->active_conns--;
607 
608     dd("returning");
609 }
610