1 /*
2 * Copyright (c) 2010, FRiCKLE Piotr Sikora <info@frickle.com>
3 * Copyright (c) 2009-2010, Xiaozhe Wang <chaoslawful@gmail.com>
4 * Copyright (c) 2009-2010, Yichun Zhang <agentzh@gmail.com>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20 * HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #ifndef DDEBUG
30 #define DDEBUG 0
31 #endif
32
33 #include <nginx.h>
34 #include "ngx_postgres_ddebug.h"
35 #include "ngx_postgres_module.h"
36 #include "ngx_postgres_keepalive.h"
37 #include "ngx_postgres_processor.h"
38
39
40 ngx_int_t
ngx_postgres_upstream_init(ngx_conf_t * cf,ngx_http_upstream_srv_conf_t * uscf)41 ngx_postgres_upstream_init(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *uscf)
42 {
43 ngx_postgres_upstream_srv_conf_t *pgscf;
44 ngx_postgres_upstream_server_t *server;
45 ngx_postgres_upstream_peers_t *peers;
46 ngx_uint_t i, j, n;
47
48 dd("entering");
49
50 uscf->peer.init = ngx_postgres_upstream_init_peer;
51
52 pgscf = ngx_http_conf_upstream_srv_conf(uscf, ngx_postgres_module);
53
54 if (pgscf->servers == NULL || pgscf->servers->nelts == 0) {
55 ngx_log_error(NGX_LOG_ERR, cf->log, 0,
56 "postgres: no \"postgres_server\" defined"
57 " in upstream \"%V\" in %s:%ui",
58 &uscf->host, uscf->file_name, uscf->line);
59
60 dd("returning NGX_ERROR");
61 return NGX_ERROR;
62 }
63
64 /* pgscf->servers != NULL */
65
66 server = uscf->servers->elts;
67
68 n = 0;
69
70 for (i = 0; i < uscf->servers->nelts; i++) {
71 n += server[i].naddrs;
72 }
73
74 peers = ngx_pcalloc(cf->pool, sizeof(ngx_postgres_upstream_peers_t)
75 + sizeof(ngx_postgres_upstream_peer_t) * (n - 1));
76
77 if (peers == NULL) {
78 dd("returning NGX_ERROR");
79 return NGX_ERROR;
80 }
81
82 peers->single = (n == 1);
83 peers->number = n;
84 peers->name = &uscf->host;
85
86 n = 0;
87
88 for (i = 0; i < uscf->servers->nelts; i++) {
89 for (j = 0; j < server[i].naddrs; j++) {
90 peers->peer[n].sockaddr = server[i].addrs[j].sockaddr;
91 peers->peer[n].socklen = server[i].addrs[j].socklen;
92 peers->peer[n].name = server[i].addrs[j].name;
93 peers->peer[n].port = server[i].port;
94 peers->peer[n].family = server[i].family;
95 peers->peer[n].dbname = server[i].dbname;
96 peers->peer[n].user = server[i].user;
97 peers->peer[n].password = server[i].password;
98
99 peers->peer[n].host.data = ngx_pnalloc(cf->pool,
100 NGX_SOCKADDR_STRLEN);
101 if (peers->peer[n].host.data == NULL) {
102 dd("returning NGX_ERROR");
103 return NGX_ERROR;
104 }
105
106 peers->peer[n].host.len = ngx_sock_ntop(peers->peer[n].sockaddr,
107 #if defined(nginx_version) && (nginx_version >= 1005003)
108 peers->peer[n].socklen,
109 #endif
110 peers->peer[n].host.data,
111 NGX_SOCKADDR_STRLEN, 0);
112 if (peers->peer[n].host.len == 0) {
113 dd("returning NGX_ERROR");
114 return NGX_ERROR;
115 }
116
117 n++;
118 }
119 }
120
121 pgscf->peers = peers;
122 pgscf->active_conns = 0;
123
124 if (pgscf->max_cached) {
125 dd("returning");
126 return ngx_postgres_keepalive_init(cf->pool, pgscf);
127 }
128
129 dd("returning NGX_OK");
130 return NGX_OK;
131 }
132
133 ngx_int_t
ngx_postgres_upstream_init_peer(ngx_http_request_t * r,ngx_http_upstream_srv_conf_t * uscf)134 ngx_postgres_upstream_init_peer(ngx_http_request_t *r,
135 ngx_http_upstream_srv_conf_t *uscf)
136 {
137 ngx_postgres_upstream_peer_data_t *pgdt;
138 ngx_postgres_upstream_srv_conf_t *pgscf;
139 ngx_postgres_loc_conf_t *pglcf;
140 ngx_postgres_ctx_t *pgctx;
141 ngx_http_core_loc_conf_t *clcf;
142 ngx_http_upstream_t *u;
143 ngx_postgres_mixed_t *query;
144 ngx_str_t sql;
145 ngx_uint_t i;
146
147 dd("entering");
148
149 pgdt = ngx_pcalloc(r->pool, sizeof(ngx_postgres_upstream_peer_data_t));
150 if (pgdt == NULL) {
151 goto failed;
152 }
153
154 u = r->upstream;
155
156 pgdt->upstream = u;
157 pgdt->request = r;
158
159 pgscf = ngx_http_conf_upstream_srv_conf(uscf, ngx_postgres_module);
160 pglcf = ngx_http_get_module_loc_conf(r, ngx_postgres_module);
161 pgctx = ngx_http_get_module_ctx(r, ngx_postgres_module);
162
163 pgdt->srv_conf = pgscf;
164 pgdt->loc_conf = pglcf;
165
166 u->peer.data = pgdt;
167 u->peer.get = ngx_postgres_upstream_get_peer;
168 u->peer.free = ngx_postgres_upstream_free_peer;
169
170 if (pglcf->query.methods_set & r->method) {
171 /* method-specific query */
172 dd("using method-specific query");
173
174 query = pglcf->query.methods->elts;
175 for (i = 0; i < pglcf->query.methods->nelts; i++) {
176 if (query[i].key & r->method) {
177 query = &query[i];
178 break;
179 }
180 }
181
182 if (i == pglcf->query.methods->nelts) {
183 goto failed;
184 }
185 } else {
186 /* default query */
187 dd("using default query");
188
189 query = pglcf->query.def;
190 }
191
192 if (query->cv) {
193 /* complex value */
194 dd("using complex value");
195
196 if (ngx_http_complex_value(r, query->cv, &sql) != NGX_OK) {
197 goto failed;
198 }
199
200 if (sql.len == 0) {
201 clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
202
203 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
204 "postgres: empty \"postgres_query\" (was: \"%V\")"
205 " in location \"%V\"", &query->cv->value,
206 &clcf->name);
207
208 goto failed;
209 }
210
211 pgdt->query = sql;
212 } else {
213 /* simple value */
214 dd("using simple value");
215
216 pgdt->query = query->sv;
217 }
218
219 /* set $postgres_query */
220 pgctx->var_query = pgdt->query;
221
222 dd("returning NGX_OK");
223 return NGX_OK;
224
225 failed:
226
227 #if defined(nginx_version) && (nginx_version >= 8017)
228 dd("returning NGX_ERROR");
229 return NGX_ERROR;
230 #else
231 r->upstream->peer.data = NULL;
232
233 dd("returning NGX_OK (NGX_ERROR)");
234 return NGX_OK;
235 #endif
236 }
237
238 ngx_int_t
ngx_postgres_upstream_get_peer(ngx_peer_connection_t * pc,void * data)239 ngx_postgres_upstream_get_peer(ngx_peer_connection_t *pc, void *data)
240 {
241 ngx_postgres_upstream_peer_data_t *pgdt = data;
242 ngx_postgres_upstream_srv_conf_t *pgscf;
243 #if defined(nginx_version) && (nginx_version < 8017)
244 ngx_postgres_ctx_t *pgctx;
245 #endif
246 ngx_postgres_upstream_peers_t *peers;
247 ngx_postgres_upstream_peer_t *peer;
248 ngx_connection_t *pgxc = NULL;
249 int fd;
250 ngx_event_t *rev, *wev;
251 ngx_int_t rc;
252 u_char *connstring, *last;
253 size_t len;
254
255 dd("entering");
256
257 #if defined(nginx_version) && (nginx_version < 8017)
258 if (data == NULL) {
259 goto failed;
260 }
261
262 pgctx = ngx_http_get_module_ctx(pgdt->request, ngx_postgres_module);
263 #endif
264
265 pgscf = pgdt->srv_conf;
266
267 pgdt->failed = 0;
268
269 if (pgscf->max_cached && pgscf->single) {
270 rc = ngx_postgres_keepalive_get_peer_single(pc, pgdt, pgscf);
271 if (rc != NGX_DECLINED) {
272 /* re-use keepalive peer */
273 dd("re-using keepalive peer (single)");
274
275 pgdt->state = state_db_send_query;
276
277 ngx_postgres_process_events(pgdt->request);
278
279 dd("returning NGX_AGAIN");
280 return NGX_AGAIN;
281 }
282 }
283
284 peers = pgscf->peers;
285
286 if (pgscf->current > peers->number - 1) {
287 pgscf->current = 0;
288 }
289
290 peer = &peers->peer[pgscf->current++];
291
292 pgdt->name.len = peer->name.len;
293 pgdt->name.data = peer->name.data;
294
295 pgdt->sockaddr = *peer->sockaddr;
296
297 pc->name = &pgdt->name;
298 pc->sockaddr = &pgdt->sockaddr;
299 pc->socklen = peer->socklen;
300 pc->cached = 0;
301
302 if ((pgscf->max_cached) && (!pgscf->single)) {
303 rc = ngx_postgres_keepalive_get_peer_multi(pc, pgdt, pgscf);
304 if (rc != NGX_DECLINED) {
305 /* re-use keepalive peer */
306 dd("re-using keepalive peer (multi)");
307
308 pgdt->state = state_db_send_query;
309
310 ngx_postgres_process_events(pgdt->request);
311
312 dd("returning NGX_AGAIN");
313 return NGX_AGAIN;
314 }
315 }
316
317 if ((pgscf->reject) && (pgscf->active_conns >= pgscf->max_cached)) {
318 ngx_log_error(NGX_LOG_INFO, pc->log, 0,
319 "postgres: keepalive connection pool is full,"
320 " rejecting request to upstream \"%V\"", &peer->name);
321
322 /* a bit hack-ish way to return error response (setup part) */
323 pc->connection = ngx_get_connection(0, pc->log);
324
325 #if defined(nginx_version) && (nginx_version < 8017)
326 pgctx->status = NGX_HTTP_SERVICE_UNAVAILABLE;
327 #endif
328
329 dd("returning NGX_AGAIN (NGX_HTTP_SERVICE_UNAVAILABLE)");
330 return NGX_AGAIN;
331 }
332
333 /* sizeof("...") - 1 + 1 (for spaces and '\0' omitted */
334 /* we hope that unix sockets connection string will be always shorter than tcp/ip one (because 'host' is shorter than 'hostaddr') */
335 len = sizeof("hostaddr=") + peer->host.len
336 + sizeof("port=") + sizeof("65535") - 1
337 + sizeof("dbname=") + peer->dbname.len
338 + sizeof("port=") + sizeof("65535") - 1
339 + sizeof("dbname=") + peer->dbname.len
340 + sizeof("user=") + peer->user.len
341 + sizeof("password=") + peer->password.len
342 + sizeof("sslmode=disable");
343
344 connstring = ngx_pnalloc(pgdt->request->pool, len);
345 if (connstring == NULL) {
346 #if defined(nginx_version) && (nginx_version >= 8017)
347 dd("returning NGX_ERROR");
348 return NGX_ERROR;
349 #else
350 goto failed;
351 #endif
352 }
353
354 if(peer->family != AF_UNIX)
355 last = ngx_snprintf(connstring, len - 1,
356 "hostaddr=%V port=%d dbname=%V user=%V password=%V"
357 " sslmode=disable",
358 &peer->host, peer->port, &peer->dbname, &peer->user,
359 &peer->password);
360 else
361 last = ngx_snprintf(connstring, len - 1,
362 "host=%s port=%d dbname=%V user=%V password=%V"
363 " sslmode=disable",
364 &peer->host.data[5], peer->port, &peer->dbname, &peer->user,
365 &peer->password);
366 *last = '\0';
367
368 dd("PostgreSQL connection string: %s", connstring);
369
370 /*
371 * internal checks in PQsetnonblocking are taking care of any
372 * PQconnectStart failures, so we don't need to check them here.
373 */
374
375 ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
376 "postgres: connecting");
377
378 pgdt->pgconn = PQconnectStart((const char *)connstring);
379 if (PQsetnonblocking(pgdt->pgconn, 1) == -1) {
380 ngx_log_error(NGX_LOG_ERR, pc->log, 0,
381 "postgres: connection failed: %s in upstream \"%V\"",
382 PQerrorMessage(pgdt->pgconn), &peer->name);
383
384 PQfinish(pgdt->pgconn);
385 pgdt->pgconn = NULL;
386
387 #if defined(nginx_version) && (nginx_version >= 8017)
388 dd("returning NGX_DECLINED");
389 return NGX_DECLINED;
390 #else
391 pgctx->status = NGX_HTTP_BAD_GATEWAY;
392 goto failed;
393 #endif
394 }
395
396 #if defined(DDEBUG) && (DDEBUG > 1)
397 PQtrace(pgdt->pgconn, stderr);
398 #endif
399
400 dd("connection status:%d", (int) PQstatus(pgdt->pgconn));
401
402 /* take spot in keepalive connection pool */
403 pgscf->active_conns++;
404
405 /* add the file descriptor (fd) into an nginx connection structure */
406
407 fd = PQsocket(pgdt->pgconn);
408 if (fd == -1) {
409 ngx_log_error(NGX_LOG_ERR, pc->log, 0,
410 "postgres: failed to get connection fd");
411
412 goto invalid;
413 }
414
415 ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
416 "postgres: connection fd:%d", fd);
417
418 pgxc = pc->connection = ngx_get_connection(fd, pc->log);
419
420 if (pgxc == NULL) {
421 ngx_log_error(NGX_LOG_ERR, pc->log, 0,
422 "postgres: failed to get a free nginx connection");
423
424 goto invalid;
425 }
426
427 pgxc->log = pc->log;
428 pgxc->log_error = pc->log_error;
429 pgxc->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
430
431 rev = pgxc->read;
432 wev = pgxc->write;
433
434 rev->log = pc->log;
435 wev->log = pc->log;
436
437 /* register the connection with postgres connection fd into the
438 * nginx event model */
439
440 if (ngx_event_flags & NGX_USE_RTSIG_EVENT) {
441 dd("NGX_USE_RTSIG_EVENT");
442 if (ngx_add_conn(pgxc) != NGX_OK) {
443 goto bad_add;
444 }
445
446 } else if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
447 dd("NGX_USE_CLEAR_EVENT");
448 if (ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT) != NGX_OK) {
449 goto bad_add;
450 }
451
452 if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_CLEAR_EVENT) != NGX_OK) {
453 goto bad_add;
454 }
455
456 } else {
457 dd("NGX_USE_LEVEL_EVENT");
458 if (ngx_add_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT) != NGX_OK) {
459 goto bad_add;
460 }
461
462 if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_LEVEL_EVENT) != NGX_OK) {
463 goto bad_add;
464 }
465 }
466
467 pgxc->log->action = "connecting to PostgreSQL database";
468 pgdt->state = state_db_connect;
469
470 dd("returning NGX_AGAIN");
471 return NGX_AGAIN;
472
473 bad_add:
474
475 ngx_log_error(NGX_LOG_ERR, pc->log, 0,
476 "postgres: failed to add nginx connection");
477
478 invalid:
479
480 ngx_postgres_upstream_free_connection(pc->log, pc->connection,
481 pgdt->pgconn, pgscf);
482
483 #if defined(nginx_version) && (nginx_version >= 8017)
484 dd("returning NGX_ERROR");
485 return NGX_ERROR;
486 #else
487
488 failed:
489
490 /* a bit hack-ish way to return error response (setup part) */
491 pc->connection = ngx_get_connection(0, pc->log);
492
493 dd("returning NGX_AGAIN (NGX_ERROR)");
494 return NGX_AGAIN;
495 #endif
496 }
497
498 void
ngx_postgres_upstream_free_peer(ngx_peer_connection_t * pc,void * data,ngx_uint_t state)499 ngx_postgres_upstream_free_peer(ngx_peer_connection_t *pc,
500 void *data, ngx_uint_t state)
501 {
502 ngx_postgres_upstream_peer_data_t *pgdt = data;
503 ngx_postgres_upstream_srv_conf_t *pgscf;
504
505 dd("entering");
506
507 #if defined(nginx_version) && (nginx_version < 8017)
508 if (data == NULL) {
509 dd("returning");
510 return;
511 }
512 #endif
513
514 pgscf = pgdt->srv_conf;
515
516 if (pgscf->max_cached) {
517 ngx_postgres_keepalive_free_peer(pc, pgdt, pgscf, state);
518 }
519
520 if (pc->connection) {
521 dd("free connection to PostgreSQL database");
522
523 ngx_postgres_upstream_free_connection(pc->log, pc->connection,
524 pgdt->pgconn, pgscf);
525
526
527 pgdt->pgconn = NULL;
528 pc->connection = NULL;
529 }
530
531 dd("returning");
532 }
533
534 ngx_flag_t
ngx_postgres_upstream_is_my_peer(const ngx_peer_connection_t * peer)535 ngx_postgres_upstream_is_my_peer(const ngx_peer_connection_t *peer)
536 {
537 dd("entering & returning");
538 return (peer->get == ngx_postgres_upstream_get_peer);
539 }
540
541 void
ngx_postgres_upstream_free_connection(ngx_log_t * log,ngx_connection_t * c,PGconn * pgconn,ngx_postgres_upstream_srv_conf_t * pgscf)542 ngx_postgres_upstream_free_connection(ngx_log_t *log, ngx_connection_t *c,
543 PGconn *pgconn, ngx_postgres_upstream_srv_conf_t *pgscf)
544 {
545 ngx_event_t *rev, *wev;
546
547 dd("entering");
548
549 PQfinish(pgconn);
550
551 if (c) {
552 rev = c->read;
553 wev = c->write;
554
555 if (rev->timer_set) {
556 ngx_del_timer(rev);
557 }
558
559 if (wev->timer_set) {
560 ngx_del_timer(wev);
561 }
562
563 if (ngx_del_conn) {
564 ngx_del_conn(c, NGX_CLOSE_EVENT);
565 } else {
566 if (rev->active || rev->disabled) {
567 ngx_del_event(rev, NGX_READ_EVENT, NGX_CLOSE_EVENT);
568 }
569
570 if (wev->active || wev->disabled) {
571 ngx_del_event(wev, NGX_WRITE_EVENT, NGX_CLOSE_EVENT);
572 }
573 }
574
575 #if defined(nginx_version) && nginx_version >= 1007005
576 if (rev->posted) {
577 #else
578 if (rev->prev) {
579 #endif
580 ngx_delete_posted_event(rev);
581 }
582
583 #if defined(nginx_version) && nginx_version >= 1007005
584 if (wev->posted) {
585 #else
586 if (wev->prev) {
587 #endif
588 ngx_delete_posted_event(wev);
589 }
590
591 rev->closed = 1;
592 wev->closed = 1;
593
594 #if defined(nginx_version) && (nginx_version >= 1001004)
595 if (c->pool) {
596 ngx_destroy_pool(c->pool);
597 }
598 #endif
599
600 ngx_free_connection(c);
601
602 c->fd = (ngx_socket_t) -1;
603 }
604
605 /* free spot in keepalive connection pool */
606 pgscf->active_conns--;
607
608 dd("returning");
609 }
610