1 /*-
2  * Copyright 2019 Vsevolod Stakhov
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "http_context.h"
18 #include "http_private.h"
19 #include "keypair.h"
20 #include "keypairs_cache.h"
21 #include "cfg_file.h"
22 #include "contrib/libottery/ottery.h"
23 #include "contrib/http-parser/http_parser.h"
24 #include "ssl_util.h"
25 #include "rspamd.h"
26 #include "libev_helper.h"
27 
/*
 * Log module registration for this file.
 * NOTE(review): presumably this is what provides the
 * rspamd_http_context_log_id identifier used by the macro below - confirm
 * against the INIT_LOG_MODULE definition.
 */
INIT_LOG_MODULE(http_context)

/*
 * Debug logging helper for this file: forwards a printf-style format and its
 * arguments to rspamd_conditional_debug_fast, tagged with the "http_context"
 * module name and the calling function's name (G_STRFUNC).
 */
#define msg_debug_http_context(...)  rspamd_conditional_debug_fast (NULL, NULL, \
        rspamd_http_context_log_id, "http_context", NULL, \
        G_STRFUNC, \
        __VA_ARGS__)
34 
/*
 * Context returned by rspamd_http_context_default(). It is set at the end of
 * rspamd_http_context_init() (so the most recently initialised context wins)
 * and reset to NULL by rspamd_http_context_free() when that context is freed.
 */
static struct rspamd_http_context *default_ctx = NULL;
36 
/*
 * Bookkeeping for one idle connection parked in a keepalive queue.
 * Instances are allocated in rspamd_http_context_push_keepalive() and freed
 * when the connection is reused, expires/becomes readable, or the context is
 * destroyed.
 */
struct rspamd_http_keepalive_cbdata {
	/* Parked connection; push_keepalive stores a reference obtained via
	 * rspamd_http_connection_ref() here */
	struct rspamd_http_connection *conn;
	/* Owning HTTP context; its event loop is used to stop the watcher */
	struct rspamd_http_context *ctx;
	/* Queue (inside the keepalive hash key) that holds this element */
	GQueue *queue;
	/* This element's node in `queue`, kept so the handler can unlink it */
	GList *link;
	/* Watcher on the connection's fd (EV_READ plus a timeout) */
	struct rspamd_io_ev ev;
};
44 
45 static void
rspamd_http_keepalive_queue_cleanup(GQueue * conns)46 rspamd_http_keepalive_queue_cleanup (GQueue *conns)
47 {
48 	GList *cur;
49 
50 	cur = conns->head;
51 
52 	while (cur) {
53 		struct rspamd_http_keepalive_cbdata *cbd;
54 
55 		cbd = (struct rspamd_http_keepalive_cbdata *)cur->data;
56 		/* unref call closes fd, so we need to remove ev watcher first! */
57 		rspamd_ev_watcher_stop (cbd->ctx->event_loop, &cbd->ev);
58 		rspamd_http_connection_unref (cbd->conn);
59 		g_free (cbd);
60 
61 		cur = cur->next;
62 	}
63 
64 	g_queue_clear (conns);
65 }
66 
67 static void
rspamd_http_context_client_rotate_ev(struct ev_loop * loop,ev_timer * w,int revents)68 rspamd_http_context_client_rotate_ev (struct ev_loop *loop, ev_timer *w, int revents)
69 {
70 	struct rspamd_http_context *ctx = (struct rspamd_http_context *)w->data;
71 	gpointer kp;
72 
73 	w->repeat = rspamd_time_jitter (ctx->config.client_key_rotate_time, 0);
74 	msg_debug_http_context ("rotate local keypair, next rotate in %.0f seconds",
75 			w->repeat);
76 
77 	ev_timer_again (loop, w);
78 
79 	kp = ctx->client_kp;
80 	ctx->client_kp = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX,
81 			RSPAMD_CRYPTOBOX_MODE_25519);
82 	rspamd_keypair_unref (kp);
83 }
84 
85 static struct rspamd_http_context*
rspamd_http_context_new_default(struct rspamd_config * cfg,struct ev_loop * ev_base,struct upstream_ctx * ups_ctx)86 rspamd_http_context_new_default (struct rspamd_config *cfg,
87 								 struct ev_loop *ev_base,
88 								 struct upstream_ctx *ups_ctx)
89 {
90 	struct rspamd_http_context *ctx;
91 
92 	static const int default_kp_size = 1024;
93 	static const gdouble default_rotate_time = 120;
94 	static const gdouble default_keepalive_interval = 65;
95 	static const gchar *default_user_agent = "rspamd-" RSPAMD_VERSION_FULL;
96 	static const gchar *default_server_hdr = "rspamd/" RSPAMD_VERSION_FULL;
97 
98 	ctx = g_malloc0 (sizeof (*ctx));
99 	ctx->config.kp_cache_size_client = default_kp_size;
100 	ctx->config.kp_cache_size_server = default_kp_size;
101 	ctx->config.client_key_rotate_time = default_rotate_time;
102 	ctx->config.user_agent = default_user_agent;
103 	ctx->config.keepalive_interval = default_keepalive_interval;
104 	ctx->config.server_hdr = default_server_hdr;
105 	ctx->ups_ctx = ups_ctx;
106 
107 	if (cfg) {
108 		ctx->ssl_ctx = cfg->libs_ctx->ssl_ctx;
109 		ctx->ssl_ctx_noverify = cfg->libs_ctx->ssl_ctx_noverify;
110 	}
111 	else {
112 		ctx->ssl_ctx = rspamd_init_ssl_ctx ();
113 		ctx->ssl_ctx_noverify = rspamd_init_ssl_ctx_noverify ();
114 	}
115 
116 	ctx->event_loop = ev_base;
117 
118 	ctx->keep_alive_hash = kh_init (rspamd_keep_alive_hash);
119 
120 	return ctx;
121 }
122 
123 static void
rspamd_http_context_parse_proxy(struct rspamd_http_context * ctx,const gchar * name,struct upstream_list ** pls)124 rspamd_http_context_parse_proxy (struct rspamd_http_context *ctx,
125 								 const gchar *name,
126 								 struct upstream_list **pls)
127 {
128 	struct http_parser_url u;
129 	struct upstream_list *uls;
130 
131 	if (!ctx->ups_ctx) {
132 		msg_err ("cannot parse http_proxy %s - upstreams context is udefined", name);
133 		return;
134 	}
135 
136 	memset (&u, 0, sizeof (u));
137 
138 	if (http_parser_parse_url (name, strlen (name), 1, &u) == 0) {
139 		if (!(u.field_set & (1u << UF_HOST)) || u.port == 0) {
140 			msg_err ("cannot parse http(s) proxy %s - invalid host or port", name);
141 
142 			return;
143 		}
144 
145 		uls = rspamd_upstreams_create (ctx->ups_ctx);
146 
147 		if (!rspamd_upstreams_parse_line_len (uls,
148 				name + u.field_data[UF_HOST].off,
149 				u.field_data[UF_HOST].len, u.port, NULL)) {
150 			msg_err ("cannot parse http(s) proxy %s - invalid data", name);
151 
152 			rspamd_upstreams_destroy (uls);
153 		}
154 		else {
155 			*pls = uls;
156 			msg_info ("set http(s) proxy to %s", name);
157 		}
158 	}
159 	else {
160 		uls = rspamd_upstreams_create (ctx->ups_ctx);
161 
162 		if (!rspamd_upstreams_parse_line (uls,
163 				name, 3128, NULL)) {
164 			msg_err ("cannot parse http(s) proxy %s - invalid data", name);
165 
166 			rspamd_upstreams_destroy (uls);
167 		}
168 		else {
169 			*pls = uls;
170 			msg_info ("set http(s) proxy to %s", name);
171 		}
172 	}
173 }
174 
175 static void
rspamd_http_context_init(struct rspamd_http_context * ctx)176 rspamd_http_context_init (struct rspamd_http_context *ctx)
177 {
178 	if (ctx->config.kp_cache_size_client > 0) {
179 		ctx->client_kp_cache = rspamd_keypair_cache_new (ctx->config.kp_cache_size_client);
180 	}
181 
182 	if (ctx->config.kp_cache_size_server > 0) {
183 		ctx->server_kp_cache = rspamd_keypair_cache_new (ctx->config.kp_cache_size_server);
184 	}
185 
186 	if (ctx->config.client_key_rotate_time > 0 && ctx->event_loop) {
187 		double jittered = rspamd_time_jitter (ctx->config.client_key_rotate_time,
188 				0);
189 
190 		ev_timer_init (&ctx->client_rotate_ev,
191 				rspamd_http_context_client_rotate_ev, jittered, 0);
192 		ev_timer_start (ctx->event_loop, &ctx->client_rotate_ev);
193 		ctx->client_rotate_ev.data = ctx;
194 	}
195 
196 	if (ctx->config.http_proxy) {
197 		rspamd_http_context_parse_proxy (ctx, ctx->config.http_proxy,
198 				&ctx->http_proxies);
199 	}
200 
201 	default_ctx = ctx;
202 }
203 
204 struct rspamd_http_context*
rspamd_http_context_create(struct rspamd_config * cfg,struct ev_loop * ev_base,struct upstream_ctx * ups_ctx)205 rspamd_http_context_create (struct rspamd_config *cfg,
206 							struct ev_loop *ev_base,
207 							struct upstream_ctx *ups_ctx)
208 {
209 	struct rspamd_http_context *ctx;
210 	const ucl_object_t *http_obj;
211 
212 	ctx = rspamd_http_context_new_default (cfg, ev_base, ups_ctx);
213 	http_obj = ucl_object_lookup (cfg->rcl_obj, "http");
214 
215 	if (http_obj) {
216 		const ucl_object_t *server_obj, *client_obj;
217 
218 		client_obj = ucl_object_lookup (http_obj, "client");
219 
220 		if (client_obj) {
221 			const ucl_object_t *kp_size;
222 
223 			kp_size = ucl_object_lookup (client_obj, "cache_size");
224 
225 			if (kp_size) {
226 				ctx->config.kp_cache_size_client = ucl_object_toint (kp_size);
227 			}
228 
229 			const ucl_object_t *rotate_time;
230 
231 			rotate_time = ucl_object_lookup (client_obj, "rotate_time");
232 
233 			if (rotate_time) {
234 				ctx->config.client_key_rotate_time = ucl_object_todouble (rotate_time);
235 			}
236 
237 			const ucl_object_t *user_agent;
238 
239 			user_agent = ucl_object_lookup (client_obj, "user_agent");
240 
241 			if (user_agent) {
242 				ctx->config.user_agent = ucl_object_tostring (user_agent);
243 
244 				if (ctx->config.user_agent && strlen (ctx->config.user_agent) == 0) {
245 					ctx->config.user_agent = NULL;
246 				}
247 			}
248 
249 			const ucl_object_t *server_hdr;
250 			server_hdr = ucl_object_lookup (client_obj, "server_hdr");
251 
252 			if (server_hdr) {
253 				ctx->config.server_hdr = ucl_object_tostring (server_hdr);
254 
255 				if (ctx->config.server_hdr && strlen (ctx->config.server_hdr) == 0) {
256 					ctx->config.server_hdr = "";
257 				}
258 			}
259 
260 			const ucl_object_t *keepalive_interval;
261 
262 			keepalive_interval = ucl_object_lookup (client_obj, "keepalive_interval");
263 
264 			if (keepalive_interval) {
265 				ctx->config.keepalive_interval = ucl_object_todouble (keepalive_interval);
266 			}
267 
268 			const ucl_object_t *http_proxy;
269 			http_proxy = ucl_object_lookup (client_obj, "http_proxy");
270 
271 			if (http_proxy) {
272 				ctx->config.http_proxy = ucl_object_tostring (http_proxy);
273 			}
274 		}
275 
276 		server_obj = ucl_object_lookup (http_obj, "server");
277 
278 		if (server_obj) {
279 			const ucl_object_t *kp_size;
280 
281 			kp_size = ucl_object_lookup (server_obj, "cache_size");
282 
283 			if (kp_size) {
284 				ctx->config.kp_cache_size_server = ucl_object_toint (kp_size);
285 			}
286 		}
287 	}
288 
289 	rspamd_http_context_init (ctx);
290 
291 	return ctx;
292 }
293 
294 
295 void
rspamd_http_context_free(struct rspamd_http_context * ctx)296 rspamd_http_context_free (struct rspamd_http_context *ctx)
297 {
298 	if (ctx == default_ctx) {
299 		default_ctx = NULL;
300 	}
301 
302 	if (ctx->client_kp_cache) {
303 		rspamd_keypair_cache_destroy (ctx->client_kp_cache);
304 	}
305 
306 	if (ctx->server_kp_cache) {
307 		rspamd_keypair_cache_destroy (ctx->server_kp_cache);
308 	}
309 
310 	if (ctx->config.client_key_rotate_time > 0) {
311 		ev_timer_stop (ctx->event_loop, &ctx->client_rotate_ev);
312 
313 		if (ctx->client_kp) {
314 			rspamd_keypair_unref (ctx->client_kp);
315 		}
316 	}
317 
318 	struct rspamd_keepalive_hash_key *hk;
319 
320 	kh_foreach_key (ctx->keep_alive_hash, hk, {
321 		msg_debug_http_context ("cleanup keepalive elt %s (%s)",
322 				rspamd_inet_address_to_string_pretty (hk->addr),
323 				hk->host);
324 
325 		if (hk->host) {
326 			g_free (hk->host);
327 		}
328 
329 		rspamd_inet_address_free (hk->addr);
330 		rspamd_http_keepalive_queue_cleanup (&hk->conns);
331 		g_free (hk);
332 	});
333 
334 	kh_destroy (rspamd_keep_alive_hash, ctx->keep_alive_hash);
335 
336 	if (ctx->http_proxies) {
337 		rspamd_upstreams_destroy (ctx->http_proxies);
338 	}
339 
340 	g_free (ctx);
341 }
342 
343 struct rspamd_http_context*
rspamd_http_context_create_config(struct rspamd_http_context_cfg * cfg,struct ev_loop * ev_base,struct upstream_ctx * ups_ctx)344 rspamd_http_context_create_config (struct rspamd_http_context_cfg *cfg,
345 								   struct ev_loop *ev_base,
346 								   struct upstream_ctx *ups_ctx)
347 {
348 	struct rspamd_http_context *ctx;
349 
350 	ctx = rspamd_http_context_new_default (NULL, ev_base, ups_ctx);
351 	memcpy (&ctx->config, cfg, sizeof (*cfg));
352 	rspamd_http_context_init (ctx);
353 
354 	return ctx;
355 }
356 
357 struct rspamd_http_context*
rspamd_http_context_default(void)358 rspamd_http_context_default (void)
359 {
360 	g_assert (default_ctx != NULL);
361 
362 	return default_ctx;
363 }
364 
365 gint32
rspamd_keep_alive_key_hash(struct rspamd_keepalive_hash_key * k)366 rspamd_keep_alive_key_hash (struct rspamd_keepalive_hash_key *k)
367 {
368 	gint32 h;
369 
370 	h = rspamd_inet_address_port_hash (k->addr);
371 
372 	if (k->host) {
373 		h = rspamd_cryptobox_fast_hash (k->host, strlen (k->host), h);
374 	}
375 
376 	return h;
377 }
378 
379 bool
rspamd_keep_alive_key_equal(struct rspamd_keepalive_hash_key * k1,struct rspamd_keepalive_hash_key * k2)380 rspamd_keep_alive_key_equal (struct rspamd_keepalive_hash_key *k1,
381 								  struct rspamd_keepalive_hash_key *k2)
382 {
383 	if (k1->host && k2->host) {
384 		if (rspamd_inet_address_port_equal (k1->addr, k2->addr)) {
385 			return strcmp (k1->host, k2->host) == 0;
386 		}
387 	}
388 	else if (!k1->host && !k2->host) {
389 		return rspamd_inet_address_port_equal (k1->addr, k2->addr);
390 	}
391 
392 	/* One has host and another has no host */
393 	return false;
394 }
395 
396 struct rspamd_http_connection*
rspamd_http_context_check_keepalive(struct rspamd_http_context * ctx,const rspamd_inet_addr_t * addr,const gchar * host)397 rspamd_http_context_check_keepalive (struct rspamd_http_context *ctx,
398 		const rspamd_inet_addr_t *addr,
399 		const gchar *host)
400 {
401 	struct rspamd_keepalive_hash_key hk, *phk;
402 	khiter_t k;
403 
404 	hk.addr = (rspamd_inet_addr_t *)addr;
405 	hk.host = (gchar *)host;
406 
407 	k = kh_get (rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk);
408 
409 	if (k != kh_end (ctx->keep_alive_hash)) {
410 		phk = kh_key (ctx->keep_alive_hash, k);
411 		GQueue *conns = &phk->conns;
412 
413 		/* Use stack based approach */
414 
415 		if (g_queue_get_length (conns) > 0) {
416 			struct rspamd_http_keepalive_cbdata *cbd;
417 			struct rspamd_http_connection *conn;
418 			gint err;
419 			socklen_t len = sizeof (gint);
420 
421 			cbd = g_queue_pop_head (conns);
422 			rspamd_ev_watcher_stop (ctx->event_loop, &cbd->ev);
423 			conn = cbd->conn;
424 			g_free (cbd);
425 
426 			if (getsockopt (conn->fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len) == -1) {
427 				err = errno;
428 			}
429 
430 			if (err != 0) {
431 				rspamd_http_connection_unref (conn);
432 
433 				msg_debug_http_context ("invalid reused keepalive element %s (%s); "
434 							"%s error; "
435 							"%d connections queued",
436 						rspamd_inet_address_to_string_pretty (phk->addr),
437 						phk->host,
438 						g_strerror (err),
439 						conns->length);
440 
441 				return NULL;
442 			}
443 
444 			msg_debug_http_context ("reused keepalive element %s (%s), %d connections queued",
445 					rspamd_inet_address_to_string_pretty (phk->addr),
446 					phk->host, conns->length);
447 
448 			/* We transfer refcount here! */
449 			return conn;
450 		}
451 		else {
452 			msg_debug_http_context ("found empty keepalive element %s (%s), cannot reuse",
453 					rspamd_inet_address_to_string_pretty (phk->addr),
454 					phk->host);
455 		}
456 	}
457 
458 	return NULL;
459 }
460 
461 void
rspamd_http_context_prepare_keepalive(struct rspamd_http_context * ctx,struct rspamd_http_connection * conn,const rspamd_inet_addr_t * addr,const gchar * host)462 rspamd_http_context_prepare_keepalive (struct rspamd_http_context *ctx,
463 											struct rspamd_http_connection *conn,
464 											const rspamd_inet_addr_t *addr,
465 											const gchar *host)
466 {
467 	struct rspamd_keepalive_hash_key hk, *phk;
468 	khiter_t k;
469 
470 	hk.addr = (rspamd_inet_addr_t *)addr;
471 	hk.host = (gchar *)host;
472 
473 	k = kh_get (rspamd_keep_alive_hash, ctx->keep_alive_hash, &hk);
474 
475 	if (k != kh_end (ctx->keep_alive_hash)) {
476 		/* Reuse existing */
477 		conn->keepalive_hash_key = kh_key (ctx->keep_alive_hash, k);
478 		msg_debug_http_context ("use existing keepalive element %s (%s)",
479 				rspamd_inet_address_to_string_pretty (conn->keepalive_hash_key->addr),
480 				conn->keepalive_hash_key->host);
481 	}
482 	else {
483 		/* Create new one */
484 		GQueue empty_init = G_QUEUE_INIT;
485 		gint r;
486 
487 		phk = g_malloc (sizeof (*phk));
488 		phk->conns = empty_init;
489 		phk->host = g_strdup (host);
490 		phk->addr = rspamd_inet_address_copy (addr);
491 
492 		kh_put (rspamd_keep_alive_hash, ctx->keep_alive_hash, phk, &r);
493 		conn->keepalive_hash_key = phk;
494 
495 		msg_debug_http_context ("create new keepalive element %s (%s)",
496 				rspamd_inet_address_to_string_pretty (conn->keepalive_hash_key->addr),
497 				conn->keepalive_hash_key->host);
498 	}
499 }
500 
501 static void
rspamd_http_keepalive_handler(gint fd,short what,gpointer ud)502 rspamd_http_keepalive_handler (gint fd, short what, gpointer ud)
503 {
504 	struct rspamd_http_keepalive_cbdata *cbdata =
505 			(struct rspamd_http_keepalive_cbdata *)ud;/*
506 	 * We can get here if a remote side reported something or it has
507 	 * timed out. In both cases we just terminate keepalive connection.
508 	 */
509 
510 	g_queue_delete_link (cbdata->queue, cbdata->link);
511 	msg_debug_http_context ("remove keepalive element %s (%s), %d connections left",
512 			rspamd_inet_address_to_string_pretty (cbdata->conn->keepalive_hash_key->addr),
513 			cbdata->conn->keepalive_hash_key->host,
514 			cbdata->queue->length);
515 	/* unref call closes fd, so we need to remove ev watcher first! */
516 	rspamd_ev_watcher_stop (cbdata->ctx->event_loop, &cbdata->ev);
517 	rspamd_http_connection_unref (cbdata->conn);
518 	g_free (cbdata);
519 }
520 
521 void
rspamd_http_context_push_keepalive(struct rspamd_http_context * ctx,struct rspamd_http_connection * conn,struct rspamd_http_message * msg,struct ev_loop * event_loop)522 rspamd_http_context_push_keepalive (struct rspamd_http_context *ctx,
523 									struct rspamd_http_connection *conn,
524 									struct rspamd_http_message *msg,
525 									struct ev_loop *event_loop)
526 {
527 	struct rspamd_http_keepalive_cbdata *cbdata;
528 	gdouble timeout = ctx->config.keepalive_interval;
529 
530 	g_assert (conn->keepalive_hash_key != NULL);
531 
532 	if (msg) {
533 		const rspamd_ftok_t *tok;
534 		rspamd_ftok_t cmp;
535 
536 		tok = rspamd_http_message_find_header (msg, "Connection");
537 
538 		if (!tok) {
539 			/* Server has not stated that it can do keep alive */
540 			conn->finished = TRUE;
541 			msg_debug_http_context ("no Connection header");
542 			return;
543 		}
544 
545 		RSPAMD_FTOK_ASSIGN (&cmp, "keep-alive");
546 
547 		if (rspamd_ftok_casecmp (&cmp, tok) != 0) {
548 			conn->finished = TRUE;
549 			msg_debug_http_context ("connection header is not `keep-alive`");
550 			return;
551 		}
552 
553 		/* We can proceed, check timeout */
554 
555 		tok = rspamd_http_message_find_header (msg, "Keep-Alive");
556 
557 		if (tok) {
558 			goffset pos = rspamd_substring_search_caseless (tok->begin,
559 					tok->len, "timeout=", sizeof ("timeout=") - 1);
560 
561 			if (pos != -1) {
562 				pos += sizeof ("timeout=");
563 
564 				gchar *end_pos = memchr (tok->begin + pos, ',', tok->len - pos);
565 				glong real_timeout;
566 
567 				if (end_pos) {
568 					if (rspamd_strtol (tok->begin + pos + 1,
569 							(end_pos - tok->begin) - pos - 1, &real_timeout) &&
570 						real_timeout > 0) {
571 						timeout = real_timeout;
572 						msg_debug_http_context ("got timeout attr %.2f", timeout);
573 					}
574 				}
575 				else {
576 					if (rspamd_strtol (tok->begin + pos + 1,
577 							tok->len - pos - 1, &real_timeout) &&
578 						real_timeout > 0) {
579 						timeout = real_timeout;
580 						msg_debug_http_context ("got timeout attr %.2f", timeout);
581 					}
582 				}
583 			}
584 		}
585 	}
586 
587 	/* Move connection to the keepalive pool */
588 	cbdata = g_malloc0 (sizeof (*cbdata));
589 
590 	cbdata->conn = rspamd_http_connection_ref (conn);
591 	/* Use stack like approach to that would easy reading */
592 	g_queue_push_head (&conn->keepalive_hash_key->conns, cbdata);
593 	cbdata->link = conn->keepalive_hash_key->conns.head;
594 
595 	cbdata->queue = &conn->keepalive_hash_key->conns;
596 	cbdata->ctx = ctx;
597 	conn->finished = FALSE;
598 
599 	rspamd_ev_watcher_init (&cbdata->ev, conn->fd, EV_READ,
600 			rspamd_http_keepalive_handler,
601 			cbdata);
602 	rspamd_ev_watcher_start (event_loop, &cbdata->ev, timeout);
603 
604 	msg_debug_http_context ("push keepalive element %s (%s), %d connections queued, %.1f timeout",
605 			rspamd_inet_address_to_string_pretty (cbdata->conn->keepalive_hash_key->addr),
606 			cbdata->conn->keepalive_hash_key->host,
607 			cbdata->queue->length,
608 			timeout);
609 }