1 /*-
2 * Copyright 2019 Vsevolod Stakhov
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "http_context.h"
18 #include "http_private.h"
19 #include "keypair.h"
20 #include "keypairs_cache.h"
21 #include "cfg_file.h"
22 #include "contrib/libottery/ottery.h"
23 #include "contrib/http-parser/http_parser.h"
24 #include "ssl_util.h"
25 #include "rspamd.h"
26 #include "libev_helper.h"
27
28 INIT_LOG_MODULE(http_context)
29
30 #define msg_debug_http_context(...) rspamd_conditional_debug_fast (NULL, NULL, \
31 rspamd_http_context_log_id, "http_context", NULL, \
32 G_STRFUNC, \
33 __VA_ARGS__)
34
35 static struct rspamd_http_context *default_ctx = NULL;
36
37 struct rspamd_http_keepalive_cbdata {
38 struct rspamd_http_connection *conn;
39 struct rspamd_http_context *ctx;
40 GQueue *queue;
41 GList *link;
42 struct rspamd_io_ev ev;
43 };
44
45 static void
rspamd_http_keepalive_queue_cleanup(GQueue * conns)46 rspamd_http_keepalive_queue_cleanup (GQueue *conns)
47 {
48 GList *cur;
49
50 cur = conns->head;
51
52 while (cur) {
53 struct rspamd_http_keepalive_cbdata *cbd;
54
55 cbd = (struct rspamd_http_keepalive_cbdata *)cur->data;
56 /* unref call closes fd, so we need to remove ev watcher first! */
57 rspamd_ev_watcher_stop (cbd->ctx->event_loop, &cbd->ev);
58 rspamd_http_connection_unref (cbd->conn);
59 g_free (cbd);
60
61 cur = cur->next;
62 }
63
64 g_queue_clear (conns);
65 }
66
67 static void
rspamd_http_context_client_rotate_ev(struct ev_loop * loop,ev_timer * w,int revents)68 rspamd_http_context_client_rotate_ev (struct ev_loop *loop, ev_timer *w, int revents)
69 {
70 struct rspamd_http_context *ctx = (struct rspamd_http_context *)w->data;
71 gpointer kp;
72
73 w->repeat = rspamd_time_jitter (ctx->config.client_key_rotate_time, 0);
74 msg_debug_http_context ("rotate local keypair, next rotate in %.0f seconds",
75 w->repeat);
76
77 ev_timer_again (loop, w);
78
79 kp = ctx->client_kp;
80 ctx->client_kp = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX,
81 RSPAMD_CRYPTOBOX_MODE_25519);
82 rspamd_keypair_unref (kp);
83 }
84
85 static struct rspamd_http_context*
rspamd_http_context_new_default(struct rspamd_config * cfg,struct ev_loop * ev_base,struct upstream_ctx * ups_ctx)86 rspamd_http_context_new_default (struct rspamd_config *cfg,
87 struct ev_loop *ev_base,
88 struct upstream_ctx *ups_ctx)
89 {
90 struct rspamd_http_context *ctx;
91
92 static const int default_kp_size = 1024;
93 static const gdouble default_rotate_time = 120;
94 static const gdouble default_keepalive_interval = 65;
95 static const gchar *default_user_agent = "rspamd-" RSPAMD_VERSION_FULL;
96 static const gchar *default_server_hdr = "rspamd/" RSPAMD_VERSION_FULL;
97
98 ctx = g_malloc0 (sizeof (*ctx));
99 ctx->config.kp_cache_size_client = default_kp_size;
100 ctx->config.kp_cache_size_server = default_kp_size;
101 ctx->config.client_key_rotate_time = default_rotate_time;
102 ctx->config.user_agent = default_user_agent;
103 ctx->config.keepalive_interval = default_keepalive_interval;
104 ctx->config.server_hdr = default_server_hdr;
105 ctx->ups_ctx = ups_ctx;
106
107 if (cfg) {
108 ctx->ssl_ctx = cfg->libs_ctx->ssl_ctx;
109 ctx->ssl_ctx_noverify = cfg->libs_ctx->ssl_ctx_noverify;
110 }
111 else {
112 ctx->ssl_ctx = rspamd_init_ssl_ctx ();
113 ctx->ssl_ctx_noverify = rspamd_init_ssl_ctx_noverify ();
114 }
115
116 ctx->event_loop = ev_base;
117
118 ctx->keep_alive_hash = kh_init (rspamd_keep_alive_hash);
119
120 return ctx;
121 }
122
123 static void
rspamd_http_context_parse_proxy(struct rspamd_http_context * ctx,const gchar * name,struct upstream_list ** pls)124 rspamd_http_context_parse_proxy (struct rspamd_http_context *ctx,
125 const gchar *name,
126 struct upstream_list **pls)
127 {
128 struct http_parser_url u;
129 struct upstream_list *uls;
130
131 if (!ctx->ups_ctx) {
132 msg_err ("cannot parse http_proxy %s - upstreams context is udefined", name);
133 return;
134 }
135
136 memset (&u, 0, sizeof (u));
137
138 if (http_parser_parse_url (name, strlen (name), 1, &u) == 0) {
139 if (!(u.field_set & (1u << UF_HOST)) || u.port == 0) {
140 msg_err ("cannot parse http(s) proxy %s - invalid host or port", name);
141
142 return;
143 }
144
145 uls = rspamd_upstreams_create (ctx->ups_ctx);
146
147 if (!rspamd_upstreams_parse_line_len (uls,
148 name + u.field_data[UF_HOST].off,
149 u.field_data[UF_HOST].len, u.port, NULL)) {
150 msg_err ("cannot parse http(s) proxy %s - invalid data", name);
151
152 rspamd_upstreams_destroy (uls);
153 }
154 else {
155 *pls = uls;
156 msg_info ("set http(s) proxy to %s", name);
157 }
158 }
159 else {
160 uls = rspamd_upstreams_create (ctx->ups_ctx);
161
162 if (!rspamd_upstreams_parse_line (uls,
163 name, 3128, NULL)) {
164 msg_err ("cannot parse http(s) proxy %s - invalid data", name);
165
166 rspamd_upstreams_destroy (uls);
167 }
168 else {
169 *pls = uls;
170 msg_info ("set http(s) proxy to %s", name);
171 }
172 }
173 }
174
175 static void
rspamd_http_context_init(struct rspamd_http_context * ctx)176 rspamd_http_context_init (struct rspamd_http_context *ctx)
177 {
178 if (ctx->config.kp_cache_size_client > 0) {
179 ctx->client_kp_cache = rspamd_keypair_cache_new (ctx->config.kp_cache_size_client);
180 }
181
182 if (ctx->config.kp_cache_size_server > 0) {
183 ctx->server_kp_cache = rspamd_keypair_cache_new (ctx->config.kp_cache_size_server);
184 }
185
186 if (ctx->config.client_key_rotate_time > 0 && ctx->event_loop) {
187 double jittered = rspamd_time_jitter (ctx->config.client_key_rotate_time,
188 0);
189
190 ev_timer_init (&ctx->client_rotate_ev,
191 rspamd_http_context_client_rotate_ev, jittered, 0);
192 ev_timer_start (ctx->event_loop, &ctx->client_rotate_ev);
193 ctx->client_rotate_ev.data = ctx;
194 }
195
196 if (ctx->config.http_proxy) {
197 rspamd_http_context_parse_proxy (ctx, ctx->config.http_proxy,
198 &ctx->http_proxies);
199 }
200
201 default_ctx = ctx;
202 }
203
204 struct rspamd_http_context*
rspamd_http_context_create(struct rspamd_config * cfg,struct ev_loop * ev_base,struct upstream_ctx * ups_ctx)205 rspamd_http_context_create (struct rspamd_config *cfg,
206 struct ev_loop *ev_base,
207 struct upstream_ctx *ups_ctx)
208 {
209 struct rspamd_http_context *ctx;
210 const ucl_object_t *http_obj;
211
212 ctx = rspamd_http_context_new_default (cfg, ev_base, ups_ctx);
213 http_obj = ucl_object_lookup (cfg->rcl_obj, "http");
214
215 if (http_obj) {
216 const ucl_object_t *server_obj, *client_obj;
217
218 client_obj = ucl_object_lookup (http_obj, "client");
219
220 if (client_obj) {
221 const ucl_object_t *kp_size;
222
223 kp_size = ucl_object_lookup (client_obj, "cache_size");
224
225 if (kp_size) {
226 ctx->config.kp_cache_size_client = ucl_object_toint (kp_size);
227 }
228
229 const ucl_object_t *rotate_time;
230
231 rotate_time = ucl_object_lookup (client_obj, "rotate_time");
232
233 if (rotate_time) {
234 ctx->config.client_key_rotate_time = ucl_object_todouble (rotate_time);
235 }
236
237 const ucl_object_t *user_agent;
238
239 user_agent = ucl_object_lookup (client_obj, "user_agent");
240
241 if (user_agent) {
242 ctx->config.user_agent = ucl_object_tostring (user_agent);
243
244 if (ctx->config.user_agent && strlen (ctx->config.user_agent) == 0) {
245 ctx->config.user_agent = NULL;
246 }
247 }
248
249 const ucl_object_t *server_hdr;
250 server_hdr = ucl_object_lookup (client_obj, "server_hdr");
251
252 if (server_hdr) {
253 ctx->config.server_hdr = ucl_object_tostring (server_hdr);
254
255 if (ctx->config.server_hdr && strlen (ctx->config.server_hdr) == 0) {
256 ctx->config.server_hdr = "";
257 }
258 }
259
260 const ucl_object_t *keepalive_interval;
261
262 keepalive_interval = ucl_object_lookup (client_obj, "keepalive_interval");
263
264 if (keepalive_interval) {
265 ctx->config.keepalive_interval = ucl_object_todouble (keepalive_interval);
266 }
267
268 const ucl_object_t *http_proxy;
269 http_proxy = ucl_object_lookup (client_obj, "http_proxy");
270
271 if (http_proxy) {
272 ctx->config.http_proxy = ucl_object_tostring (http_proxy);
273 }
274 }
275
276 server_obj = ucl_object_lookup (http_obj, "server");
277
278 if (server_obj) {
279 const ucl_object_t *kp_size;
280
281 kp_size = ucl_object_lookup (server_obj, "cache_size");
282
283 if (kp_size) {
284 ctx->config.kp_cache_size_server = ucl_object_toint (kp_size);
285 }
286 }
287 }
288
289 rspamd_http_context_init (ctx);
290
291 return ctx;
292 }
293
294
295 void
rspamd_http_context_free(struct rspamd_http_context * ctx)296 rspamd_http_context_free (struct rspamd_http_context *ctx)
297 {
298 if (ctx == default_ctx) {
299 default_ctx = NULL;
300 }
301
302 if (ctx->client_kp_cache) {
303 rspamd_keypair_cache_destroy (ctx->client_kp_cache);
304 }
305
306 if (ctx->server_kp_cache) {
307 rspamd_keypair_cache_destroy (ctx->server_kp_cache);
308 }
309
310 if (ctx->config.client_key_rotate_time > 0) {
311 ev_timer_stop (ctx->event_loop, &ctx->client_rotate_ev);
312
313 if (ctx->client_kp) {
314 rspamd_keypair_unref (ctx->client_kp);
315 }
316 }
317
318 struct rspamd_keepalive_hash_key *hk;
319
320 kh_foreach_key (ctx->keep_alive_hash, hk, {
321 msg_debug_http_context ("cleanup keepalive elt %s (%s)",
322 rspamd_inet_address_to_string_pretty (hk->addr),
323 hk->host);
324
325 if (hk->host) {
326 g_free (hk->host);
327 }
328
329 rspamd_inet_address_free (hk->addr);
330 rspamd_http_keepalive_queue_cleanup (&hk->conns);
331 g_free (hk);
332 });
333
334 kh_destroy (rspamd_keep_alive_hash, ctx->keep_alive_hash);
335
336 if (ctx->http_proxies) {
337 rspamd_upstreams_destroy (ctx->http_proxies);
338 }
339
340 g_free (ctx);
341 }
342
343 struct rspamd_http_context*
rspamd_http_context_create_config(struct rspamd_http_context_cfg * cfg,struct ev_loop * ev_base,struct upstream_ctx * ups_ctx)344 rspamd_http_context_create_config (struct rspamd_http_context_cfg *cfg,
345 struct ev_loop *ev_base,
346 struct upstream_ctx *ups_ctx)
347 {
348 struct rspamd_http_context *ctx;
349
350 ctx = rspamd_http_context_new_default (NULL, ev_base, ups_ctx);
351 memcpy (&ctx->config, cfg, sizeof (*cfg));
352 rspamd_http_context_init (ctx);
353
354 return ctx;
355 }
356
357 struct rspamd_http_context*
rspamd_http_context_default(void)358 rspamd_http_context_default (void)
359 {
360 g_assert (default_ctx != NULL);
361
362 return default_ctx;
363 }
364
365 gint32
rspamd_keep_alive_key_hash(struct rspamd_keepalive_hash_key * k)366 rspamd_keep_alive_key_hash (struct rspamd_keepalive_hash_key *k)
367 {
368 gint32 h;
369
370 h = rspamd_inet_address_port_hash (k->addr);
371
372 if (k->host) {
373 h = rspamd_cryptobox_fast_hash (k->host, strlen (k->host), h);
374 }
375
376 return h;
377 }
378
379 bool
rspamd_keep_alive_key_equal(struct rspamd_keepalive_hash_key * k1,struct rspamd_keepalive_hash_key * k2)380 rspamd_keep_alive_key_equal (struct rspamd_keepalive_hash_key *k1,
381 struct rspamd_keepalive_hash_key *k2)
382 {
383 if (k1->host && k2->host) {
384 if (rspamd_inet_address_port_equal (k1->addr, k2->addr)) {
385 return strcmp (k1->host, k2->host) == 0;
386 }
387 }
388 else if (!k1->host && !k2->host) {
389 return rspamd_inet_address_port_equal (k1->addr, k2->addr);
390 }
391
392 /* One has host and another has no host */
393 return false;
394 }
395
396 struct rspamd_http_connection*
rspamd_http_context_check_keepalive(struct rspamd_http_context * ctx,const rspamd_inet_addr_t * addr,const gchar * host)397 rspamd_http_context_check_keepalive (struct rspamd_http_context *ctx,
398 const rspamd_inet_addr_t *addr,
399 const gchar *host)
400 {
401 struct rspamd_keepalive_hash_key hk, *phk;
402 khiter_t k;
403
404 hk.addr = (rspamd_inet_addr_t *)addr;
405 hk.host = (gchar *)host;
406
407 k = kh_get (rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk);
408
409 if (k != kh_end (ctx->keep_alive_hash)) {
410 phk = kh_key (ctx->keep_alive_hash, k);
411 GQueue *conns = &phk->conns;
412
413 /* Use stack based approach */
414
415 if (g_queue_get_length (conns) > 0) {
416 struct rspamd_http_keepalive_cbdata *cbd;
417 struct rspamd_http_connection *conn;
418 gint err;
419 socklen_t len = sizeof (gint);
420
421 cbd = g_queue_pop_head (conns);
422 rspamd_ev_watcher_stop (ctx->event_loop, &cbd->ev);
423 conn = cbd->conn;
424 g_free (cbd);
425
426 if (getsockopt (conn->fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len) == -1) {
427 err = errno;
428 }
429
430 if (err != 0) {
431 rspamd_http_connection_unref (conn);
432
433 msg_debug_http_context ("invalid reused keepalive element %s (%s); "
434 "%s error; "
435 "%d connections queued",
436 rspamd_inet_address_to_string_pretty (phk->addr),
437 phk->host,
438 g_strerror (err),
439 conns->length);
440
441 return NULL;
442 }
443
444 msg_debug_http_context ("reused keepalive element %s (%s), %d connections queued",
445 rspamd_inet_address_to_string_pretty (phk->addr),
446 phk->host, conns->length);
447
448 /* We transfer refcount here! */
449 return conn;
450 }
451 else {
452 msg_debug_http_context ("found empty keepalive element %s (%s), cannot reuse",
453 rspamd_inet_address_to_string_pretty (phk->addr),
454 phk->host);
455 }
456 }
457
458 return NULL;
459 }
460
461 void
rspamd_http_context_prepare_keepalive(struct rspamd_http_context * ctx,struct rspamd_http_connection * conn,const rspamd_inet_addr_t * addr,const gchar * host)462 rspamd_http_context_prepare_keepalive (struct rspamd_http_context *ctx,
463 struct rspamd_http_connection *conn,
464 const rspamd_inet_addr_t *addr,
465 const gchar *host)
466 {
467 struct rspamd_keepalive_hash_key hk, *phk;
468 khiter_t k;
469
470 hk.addr = (rspamd_inet_addr_t *)addr;
471 hk.host = (gchar *)host;
472
473 k = kh_get (rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk);
474
475 if (k != kh_end (ctx->keep_alive_hash)) {
476 /* Reuse existing */
477 conn->keepalive_hash_key = kh_key (ctx->keep_alive_hash, k);
478 msg_debug_http_context ("use existing keepalive element %s (%s)",
479 rspamd_inet_address_to_string_pretty (conn->keepalive_hash_key->addr),
480 conn->keepalive_hash_key->host);
481 }
482 else {
483 /* Create new one */
484 GQueue empty_init = G_QUEUE_INIT;
485 gint r;
486
487 phk = g_malloc (sizeof (*phk));
488 phk->conns = empty_init;
489 phk->host = g_strdup (host);
490 phk->addr = rspamd_inet_address_copy (addr);
491
492 kh_put (rspamd_keep_alive_hash, ctx->keep_alive_hash, phk, &r);
493 conn->keepalive_hash_key = phk;
494
495 msg_debug_http_context ("create new keepalive element %s (%s)",
496 rspamd_inet_address_to_string_pretty (conn->keepalive_hash_key->addr),
497 conn->keepalive_hash_key->host);
498 }
499 }
500
501 static void
rspamd_http_keepalive_handler(gint fd,short what,gpointer ud)502 rspamd_http_keepalive_handler (gint fd, short what, gpointer ud)
503 {
504 struct rspamd_http_keepalive_cbdata *cbdata =
505 (struct rspamd_http_keepalive_cbdata *)ud;/*
506 * We can get here if a remote side reported something or it has
507 * timed out. In both cases we just terminate keepalive connection.
508 */
509
510 g_queue_delete_link (cbdata->queue, cbdata->link);
511 msg_debug_http_context ("remove keepalive element %s (%s), %d connections left",
512 rspamd_inet_address_to_string_pretty (cbdata->conn->keepalive_hash_key->addr),
513 cbdata->conn->keepalive_hash_key->host,
514 cbdata->queue->length);
515 /* unref call closes fd, so we need to remove ev watcher first! */
516 rspamd_ev_watcher_stop (cbdata->ctx->event_loop, &cbdata->ev);
517 rspamd_http_connection_unref (cbdata->conn);
518 g_free (cbdata);
519 }
520
521 void
rspamd_http_context_push_keepalive(struct rspamd_http_context * ctx,struct rspamd_http_connection * conn,struct rspamd_http_message * msg,struct ev_loop * event_loop)522 rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx,
523 struct rspamd_http_connection *conn,
524 struct rspamd_http_message *msg,
525 struct ev_loop *event_loop)
526 {
527 struct rspamd_http_keepalive_cbdata *cbdata;
528 gdouble timeout = ctx->config.keepalive_interval;
529
530 g_assert (conn->keepalive_hash_key != NULL);
531
532 if (msg) {
533 const rspamd_ftok_t *tok;
534 rspamd_ftok_t cmp;
535
536 tok = rspamd_http_message_find_header (msg, "Connection");
537
538 if (!tok) {
539 /* Server has not stated that it can do keep alive */
540 conn->finished = TRUE;
541 msg_debug_http_context ("no Connection header");
542 return;
543 }
544
545 RSPAMD_FTOK_ASSIGN (&cmp, "keep-alive");
546
547 if (rspamd_ftok_casecmp (&cmp, tok) != 0) {
548 conn->finished = TRUE;
549 msg_debug_http_context ("connection header is not `keep-alive`");
550 return;
551 }
552
553 /* We can proceed, check timeout */
554
555 tok = rspamd_http_message_find_header (msg, "Keep-Alive");
556
557 if (tok) {
558 goffset pos = rspamd_substring_search_caseless (tok->begin,
559 tok->len, "timeout=", sizeof ("timeout=") - 1);
560
561 if (pos != -1) {
562 pos += sizeof ("timeout=");
563
564 gchar *end_pos = memchr (tok->begin + pos, ',', tok->len - pos);
565 glong real_timeout;
566
567 if (end_pos) {
568 if (rspamd_strtol (tok->begin + pos + 1,
569 (end_pos - tok->begin) - pos - 1, &real_timeout) &&
570 real_timeout > 0) {
571 timeout = real_timeout;
572 msg_debug_http_context ("got timeout attr %.2f", timeout);
573 }
574 }
575 else {
576 if (rspamd_strtol (tok->begin + pos + 1,
577 tok->len - pos - 1, &real_timeout) &&
578 real_timeout > 0) {
579 timeout = real_timeout;
580 msg_debug_http_context ("got timeout attr %.2f", timeout);
581 }
582 }
583 }
584 }
585 }
586
587 /* Move connection to the keepalive pool */
588 cbdata = g_malloc0 (sizeof (*cbdata));
589
590 cbdata->conn = rspamd_http_connection_ref (conn);
591 /* Use stack like approach to that would easy reading */
592 g_queue_push_head (&conn->keepalive_hash_key->conns, cbdata);
593 cbdata->link = conn->keepalive_hash_key->conns.head;
594
595 cbdata->queue = &conn->keepalive_hash_key->conns;
596 cbdata->ctx = ctx;
597 conn->finished = FALSE;
598
599 rspamd_ev_watcher_init (&cbdata->ev, conn->fd, EV_READ,
600 rspamd_http_keepalive_handler,
601 cbdata);
602 rspamd_ev_watcher_start (event_loop, &cbdata->ev, timeout);
603
604 msg_debug_http_context ("push keepalive element %s (%s), %d connections queued, %.1f timeout",
605 rspamd_inet_address_to_string_pretty (cbdata->conn->keepalive_hash_key->addr),
606 cbdata->conn->keepalive_hash_key->host,
607 cbdata->queue->length,
608 timeout);
609 }