1 /* Copyright (c) 2010-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "lib.h"
4 #include "ioloop.h"
5 #include "array.h"
6 #include "str.h"
7 #include "strescape.h"
8 #include "log-throttle.h"
9 #include "ipc-client.h"
10 #include "program-client.h"
11 #include "var-expand.h"
12 #include "istream.h"
13 #include "ostream.h"
14 #include "iostream-temp.h"
15 #include "mail-user-hash.h"
16 #include "user-directory.h"
17 #include "mail-host.h"
18 #include "director-host.h"
19 #include "director-connection.h"
20 #include "director.h"
21 
22 #define DIRECTOR_IPC_PROXY_PATH "ipc"
23 #define DIRECTOR_DNS_SOCKET_PATH "dns-client"
24 #define DIRECTOR_RECONNECT_RETRY_SECS 60
25 #define DIRECTOR_RECONNECT_TIMEOUT_MSECS (30*1000)
26 #define DIRECTOR_USER_MOVE_TIMEOUT_MSECS (30*1000)
27 #define DIRECTOR_SYNC_TIMEOUT_MSECS (5*1000)
28 #define DIRECTOR_RING_MIN_WAIT_SECS 20
29 #define DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS 1000
30 #define DIRECTOR_DELAYED_DIR_REMOVE_MSECS (1000*30)
31 
/* Human-readable names for the user kill states. The table is indexed by the
   numeric state value and has room for every state up to and including
   USER_KILL_STATE_DELAY. It is used in this file for debug logging of
   kill_ctx->kill_state. */
const char *user_kill_state_names[USER_KILL_STATE_DELAY+1] = {
	"none",
	"killing",
	"notify-received",
	"waiting-for-notify",
	"waiting-for-everyone",
	"flushing",
	"delay",
};
41 
/* Event category named "director". */
static struct event_category event_category_director = {
	.name = "director",
};

/* Log throttle handles for user move and user kill failure logging.
   NOTE(review): they are not initialized in the visible part of the file -
   presumably set up in the director init code; confirm. */
static struct log_throttle *user_move_throttle;
static struct log_throttle *user_kill_fail_throttle;

/* Forward declaration: defined later in this file, but already needed by
   director_set_ring_synced(). */
static void director_hosts_purge_removed(struct director *dir);

/* Log throttle thresholds (presumably used when creating the throttle
   handles above - confirm against the init code). */
static const struct log_throttle_settings director_log_throttle_settings = {
	.throttle_at_max_per_interval = 100,
	.unthrottle_at_max_per_interval = 2,
};

/* Forward declaration: defined later in this file, but already needed by the
   user flushing code. */
static void
director_user_kill_finish_delayed(struct director *dir, struct user *user,
				  bool skip_delay);
59 
director_is_self_ip_set(struct director * dir)60 static bool director_is_self_ip_set(struct director *dir)
61 {
62 	if (net_ip_compare(&dir->self_ip, &net_ip4_any))
63 		return FALSE;
64 
65 	if (net_ip_compare(&dir->self_ip, &net_ip6_any))
66 		return FALSE;
67 
68 	return TRUE;
69 }
70 
director_find_self_ip(struct director * dir)71 static void director_find_self_ip(struct director *dir)
72 {
73 	struct director_host *const *hosts;
74 	unsigned int i, count;
75 
76 	hosts = array_get(&dir->dir_hosts, &count);
77 	for (i = 0; i < count; i++) {
78 		if (net_try_bind(&hosts[i]->ip) == 0) {
79 			dir->self_ip = hosts[i]->ip;
80 			return;
81 		}
82 	}
83 	i_fatal("director_servers doesn't list ourself");
84 }
85 
director_find_self(struct director * dir)86 void director_find_self(struct director *dir)
87 {
88 	if (dir->self_host != NULL)
89 		return;
90 
91 	if (!director_is_self_ip_set(dir))
92 		director_find_self_ip(dir);
93 
94 	dir->self_host = director_host_lookup(dir, &dir->self_ip,
95 					      dir->self_port);
96 	if (dir->self_host == NULL) {
97 		i_fatal("director_servers doesn't list ourself (%s:%u)",
98 			net_ip2addr(&dir->self_ip), dir->self_port);
99 	}
100 	dir->self_host->self = TRUE;
101 }
102 
director_find_self_idx(struct director * dir)103 static unsigned int director_find_self_idx(struct director *dir)
104 {
105 	struct director_host *const *hosts;
106 	unsigned int i, count;
107 
108 	i_assert(dir->self_host != NULL);
109 
110 	hosts = array_get(&dir->dir_hosts, &count);
111 	for (i = 0; i < count; i++) {
112 		if (hosts[i] == dir->self_host)
113 			return i;
114 	}
115 	i_unreached();
116 }
117 
118 static bool
director_has_outgoing_connection(struct director * dir,struct director_host * host)119 director_has_outgoing_connection(struct director *dir,
120 				 struct director_host *host)
121 {
122 	struct director_connection *conn;
123 
124 	array_foreach_elem(&dir->connections, conn) {
125 		if (director_connection_get_host(conn) == host &&
126 		    !director_connection_is_incoming(conn))
127 			return TRUE;
128 	}
129 	return FALSE;
130 }
131 
132 static void
director_log_connect(struct director * dir,struct director_host * host,const char * reason)133 director_log_connect(struct director *dir, struct director_host *host,
134 		     const char *reason)
135 {
136 	string_t *str = t_str_new(128);
137 
138 	if (host->last_network_failure > 0) {
139 		str_printfa(str, ", last network failure %ds ago",
140 			    (int)(ioloop_time - host->last_network_failure));
141 	}
142 	if (host->last_protocol_failure > 0) {
143 		str_printfa(str, ", last protocol failure %ds ago",
144 			    (int)(ioloop_time - host->last_protocol_failure));
145 	}
146 	e_info(dir->event, "Connecting to %s:%u (as %s%s): %s",
147 	       host->ip_str, host->port,
148 	       net_ip2addr(&dir->self_ip), str_c(str), reason);
149 }
150 
director_connect_host(struct director * dir,struct director_host * host,const char * reason)151 int director_connect_host(struct director *dir, struct director_host *host,
152 			  const char *reason)
153 {
154 	in_port_t port;
155 	int fd;
156 
157 	if (director_has_outgoing_connection(dir, host))
158 		return 0;
159 
160 	director_log_connect(dir, host, reason);
161 	port = dir->test_port != 0 ? dir->test_port : host->port;
162 	fd = net_connect_ip(&host->ip, port, &dir->self_ip);
163 	if (fd == -1) {
164 		host->last_network_failure = ioloop_time;
165 		e_error(dir->event, "connect(%s) failed: %m", host->name);
166 		return -1;
167 	}
168 	/* Reset timestamp so that director_connect() won't skip this host
169 	   while we're still trying to connect to it */
170 	host->last_network_failure = 0;
171 
172 	(void)director_connection_init_out(dir, fd, host);
173 	return 0;
174 }
175 
176 static struct director_host *
director_get_preferred_right_host(struct director * dir)177 director_get_preferred_right_host(struct director *dir)
178 {
179 	struct director_host *const *hosts, *host;
180 	unsigned int i, count, self_idx;
181 
182 	hosts = array_get(&dir->dir_hosts, &count);
183 	if (count == 1) {
184 		/* self */
185 		return NULL;
186 	}
187 
188 	self_idx = director_find_self_idx(dir);
189 	for (i = 0; i < count; i++) {
190 		host = hosts[(self_idx + i + 1) % count];
191 		if (!host->removed)
192 			return host;
193 	}
194 	/* self, with some removed hosts */
195 	return NULL;
196 }
197 
/* Timeout callback installed by director_wait_for_others(): retry connecting
   to the other directors. */
static void director_quick_reconnect_retry(struct director *dir)
{
	const char *reason =
		"Alone in director ring - trying to connect to others";

	director_connect(dir, reason);
}
202 
director_wait_for_others(struct director * dir)203 static bool director_wait_for_others(struct director *dir)
204 {
205 	struct director_host *host;
206 
207 	/* don't assume we're alone until we've attempted to connect
208 	   to others for a while */
209 	if (dir->ring_first_alone != 0 &&
210 	    ioloop_time - dir->ring_first_alone > DIRECTOR_RING_MIN_WAIT_SECS)
211 		return FALSE;
212 
213 	if (dir->ring_first_alone == 0)
214 		dir->ring_first_alone = ioloop_time;
215 	/* reset all failures and try again */
216 	array_foreach_elem(&dir->dir_hosts, host) {
217 		host->last_network_failure = 0;
218 		host->last_protocol_failure = 0;
219 	}
220 	timeout_remove(&dir->to_reconnect);
221 	dir->to_reconnect = timeout_add(DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS,
222 					director_quick_reconnect_retry, dir);
223 	return TRUE;
224 }
225 
director_connect(struct director * dir,const char * reason)226 void director_connect(struct director *dir, const char *reason)
227 {
228 	struct director_host *const *hosts;
229 	unsigned int i, count, self_idx;
230 
231 	self_idx = director_find_self_idx(dir);
232 
233 	/* try to connect to first working server on our right side.
234 	   the left side is supposed to connect to us. */
235 	hosts = array_get(&dir->dir_hosts, &count);
236 	for (i = 1; i < count; i++) {
237 		unsigned int idx = (self_idx + i) % count;
238 
239 		if (hosts[idx]->removed)
240 			continue;
241 
242 		if (hosts[idx]->last_network_failure +
243 		    DIRECTOR_RECONNECT_RETRY_SECS > ioloop_time) {
244 			/* connection failed recently, don't try retrying here */
245 			continue;
246 		}
247 		if (hosts[idx]->last_protocol_failure +
248 		    DIRECTOR_PROTOCOL_FAILURE_RETRY_SECS > ioloop_time) {
249 			/* the director recently sent invalid protocol data,
250 			   don't try retrying yet */
251 			continue;
252 		}
253 
254 		if (director_connect_host(dir, hosts[idx], reason) == 0) {
255 			/* success */
256 			return;
257 		}
258 	}
259 
260 	if (count > 1 && director_wait_for_others(dir))
261 		return;
262 
263 	/* we're the only one */
264 	if (count > 1) {
265 		e_warning(dir->event,
266 			  "director: Couldn't connect to right side, "
267 			  "we must be the only director left");
268 	}
269 	if (dir->left != NULL) {
270 		/* since we couldn't connect to it,
271 		   it must have failed recently */
272 		e_warning(dir->event,
273 			  "director: Assuming %s is dead, disconnecting",
274 			  director_connection_get_name(dir->left));
275 		director_connection_deinit(&dir->left,
276 					   "This connection is dead?");
277 	}
278 	dir->ring_min_version = DIRECTOR_VERSION_MINOR;
279 	if (!dir->ring_handshaked)
280 		director_set_ring_handshaked(dir);
281 	else if (!dir->ring_synced)
282 		director_set_ring_synced(dir);
283 }
284 
director_set_ring_handshaked(struct director * dir)285 void director_set_ring_handshaked(struct director *dir)
286 {
287 	i_assert(!dir->ring_handshaked);
288 
289 	timeout_remove(&dir->to_handshake_warning);
290 	if (dir->ring_handshake_warning_sent) {
291 		e_warning(dir->event,
292 			  "Directors have been connected, "
293 			  "continuing delayed requests");
294 		dir->ring_handshake_warning_sent = FALSE;
295 	}
296 	e_debug(dir->event, "Director ring handshaked");
297 
298 	dir->ring_handshaked = TRUE;
299 	director_set_ring_synced(dir);
300 }
301 
director_reconnect_timeout(struct director * dir)302 static void director_reconnect_timeout(struct director *dir)
303 {
304 	struct director_host *cur_host, *preferred_host =
305 		director_get_preferred_right_host(dir);
306 
307 	cur_host = dir->right == NULL ? NULL :
308 		director_connection_get_host(dir->right);
309 
310 	if (preferred_host == NULL) {
311 		/* all directors have been removed, try again later */
312 	} else if (cur_host != preferred_host) {
313 		(void)director_connect_host(dir, preferred_host,
314 			"Reconnect attempt to preferred director");
315 	} else {
316 		/* the connection hasn't finished sync yet.
317 		   keep this timeout for now. */
318 	}
319 }
320 
director_set_ring_synced(struct director * dir)321 void director_set_ring_synced(struct director *dir)
322 {
323 	struct director_host *host;
324 
325 	i_assert(!dir->ring_synced);
326 	i_assert((dir->left != NULL && dir->right != NULL) ||
327 		 (dir->left == NULL && dir->right == NULL));
328 
329 	timeout_remove(&dir->to_handshake_warning);
330 	if (dir->ring_handshake_warning_sent) {
331 		e_warning(dir->event,
332 			  "Ring is synced, continuing delayed requests "
333 			  "(syncing took %d secs, hosts_hash=%u)",
334 			  (int)(ioloop_time - dir->ring_last_sync_time),
335 			  mail_hosts_hash(dir->mail_hosts));
336 		dir->ring_handshake_warning_sent = FALSE;
337 	}
338 
339 	host = dir->right == NULL ? NULL :
340 		director_connection_get_host(dir->right);
341 
342 	timeout_remove(&dir->to_reconnect);
343 	if (host != director_get_preferred_right_host(dir)) {
344 		/* try to reconnect to preferred host later */
345 		dir->to_reconnect =
346 			timeout_add(DIRECTOR_RECONNECT_TIMEOUT_MSECS,
347 				    director_reconnect_timeout, dir);
348 	}
349 
350 	if (dir->left != NULL)
351 		director_connection_set_synced(dir->left, TRUE);
352 	if (dir->right != NULL)
353 		director_connection_set_synced(dir->right, TRUE);
354 	timeout_remove(&dir->to_sync);
355 	dir->ring_synced = TRUE;
356 	dir->ring_last_sync_time = ioloop_time;
357 	/* If there are any director hosts still marked as "removed", we can
358 	   safely remove those now. The entire director cluster knows about the
359 	   removal now. */
360 	director_hosts_purge_removed(dir);
361 	mail_hosts_set_synced(dir->mail_hosts);
362 	director_set_state_changed(dir);
363 }
364 
/* Send a SYNC command for the given originating host to our right side
   connection, and ping the left (if any) and right connections.

   The extra minor_version/timestamp/hosts_hash parameters are appended only
   when both the given minor_version and the right connection's minor version
   are above 0. When host is ourself, the ring change counter and start time
   of this sync are remembered. dir->right is used unconditionally, so the
   caller must make sure it exists. */
void director_sync_send(struct director *dir, struct director_host *host,
			uint32_t seq, unsigned int minor_version,
			unsigned int timestamp, unsigned int hosts_hash)
{
	string_t *cmd;
	bool send_extra;

	if (host == dir->self_host) {
		dir->last_sync_sent_ring_change_counter = dir->ring_change_counter;
		dir->last_sync_start_time = ioloop_timeval;
	}

	cmd = t_str_new(128);
	str_printfa(cmd, "SYNC\t%s\t%u\t%u",
		    host->ip_str, host->port, seq);

	/* only minor_version>0 supports extra parameters */
	send_extra = minor_version > 0 &&
		director_connection_get_minor_version(dir->right) > 0;
	if (send_extra) {
		str_printfa(cmd, "\t%u\t%u\t%u", minor_version,
			    timestamp, hosts_hash);
	}
	str_append_c(cmd, '\n');
	director_connection_send(dir->right, str_c(cmd));

	/* if either connection is hanging we want to find out quickly,
	   so ping both of them */
	if (dir->left != NULL)
		director_connection_ping(dir->left);
	director_connection_ping(dir->right);
}
394 
395 static bool
director_has_any_outgoing_connections(struct director * dir)396 director_has_any_outgoing_connections(struct director *dir)
397 {
398 	struct director_connection *conn;
399 
400 	array_foreach_elem(&dir->connections, conn) {
401 		if (!director_connection_is_incoming(conn))
402 			return TRUE;
403 	}
404 	return FALSE;
405 }
406 
director_resend_sync(struct director * dir)407 bool director_resend_sync(struct director *dir)
408 {
409 	if (dir->ring_synced) {
410 		/* everything ok, no need to do anything */
411 		return FALSE;
412 	}
413 
414 	if (dir->right == NULL) {
415 		/* right side connection is missing. make sure we're not
416 		   hanging due to some bug. */
417 		if (dir->to_reconnect == NULL &&
418 		    !director_has_any_outgoing_connections(dir)) {
419 			e_warning(dir->event,
420 				  "Right side connection is unexpectedly lost, reconnecting");
421 			director_connect(dir, "Right side connection lost");
422 		}
423 	} else if (dir->left != NULL) {
424 		/* send a new SYNC in case the previous one got dropped */
425 		dir->self_host->last_sync_timestamp = ioloop_time;
426 		director_sync_send(dir, dir->self_host, dir->sync_seq,
427 				   DIRECTOR_VERSION_MINOR, ioloop_time,
428 				   mail_hosts_hash(dir->mail_hosts));
429 		if (dir->to_sync != NULL)
430 			timeout_reset(dir->to_sync);
431 		return TRUE;
432 	}
433 	return FALSE;
434 }
435 
director_sync_timeout(struct director * dir)436 static void director_sync_timeout(struct director *dir)
437 {
438 	i_assert(!dir->ring_synced);
439 
440 	if (director_resend_sync(dir))
441 		e_error(dir->event, "Ring SYNC seq=%u appears to have got lost, resending", dir->sync_seq);
442 }
443 
director_set_ring_unsynced(struct director * dir)444 void director_set_ring_unsynced(struct director *dir)
445 {
446 	if (dir->ring_synced) {
447 		dir->ring_synced = FALSE;
448 		dir->ring_last_sync_time = ioloop_time;
449 	}
450 
451 	if (dir->to_sync == NULL) {
452 		dir->to_sync = timeout_add(DIRECTOR_SYNC_TIMEOUT_MSECS,
453 					   director_sync_timeout, dir);
454 	} else {
455 		timeout_reset(dir->to_sync);
456 	}
457 }
458 
director_sync(struct director * dir)459 static void director_sync(struct director *dir)
460 {
461 	/* we're synced again when we receive this SYNC back */
462 	dir->sync_seq++;
463 	if (dir->right == NULL && dir->left == NULL) {
464 		/* we're alone. if we're already synced,
465 		   don't become unsynced. */
466 		return;
467 	}
468 	director_set_ring_unsynced(dir);
469 
470 	if (dir->sync_frozen) {
471 		dir->sync_pending = TRUE;
472 		return;
473 	}
474 	if (dir->right == NULL) {
475 		i_assert(!dir->ring_synced ||
476 			 (dir->left == NULL && dir->right == NULL));
477 		e_debug(dir->event, "Ring is desynced (seq=%u, no right connection)",
478 			dir->sync_seq);
479 		return;
480 	}
481 
482 	e_debug(dir->event, "Ring is desynced (seq=%u, sending SYNC to %s)",
483 		dir->sync_seq, dir->right == NULL ? "(nowhere)" :
484 		director_connection_get_name(dir->right));
485 
486 	/* send PINGs to our connections more rapidly until we've synced again.
487 	   if the connection has actually died, we don't need to wait (and
488 	   delay requests) for as long to detect it */
489 	if (dir->left != NULL)
490 		director_connection_set_synced(dir->left, FALSE);
491 	director_connection_set_synced(dir->right, FALSE);
492 	director_sync_send(dir, dir->self_host, dir->sync_seq,
493 			   DIRECTOR_VERSION_MINOR, ioloop_time,
494 			   mail_hosts_hash(dir->mail_hosts));
495 }
496 
director_sync_freeze(struct director * dir)497 void director_sync_freeze(struct director *dir)
498 {
499 	struct director_connection *conn;
500 
501 	i_assert(!dir->sync_frozen);
502 	i_assert(!dir->sync_pending);
503 
504 	array_foreach_elem(&dir->connections, conn)
505 		director_connection_cork(conn);
506 	dir->sync_frozen = TRUE;
507 }
508 
director_sync_thaw(struct director * dir)509 void director_sync_thaw(struct director *dir)
510 {
511 	struct director_connection *conn;
512 
513 	i_assert(dir->sync_frozen);
514 
515 	dir->sync_frozen = FALSE;
516 	if (dir->sync_pending) {
517 		dir->sync_pending = FALSE;
518 		director_sync(dir);
519 	}
520 	array_foreach_elem(&dir->connections, conn)
521 		director_connection_uncork(conn);
522 }
523 
/* Announce a director that was added to the ring: bump the ring change
   counter and pass a DIRECTOR command for added_host on via
   director_update_send(), with src as the source. The addition is logged
   when log is TRUE. */
void director_notify_ring_added(struct director_host *added_host,
				struct director_host *src, bool log)
{
	struct director *dir = added_host->dir;
	const char *cmd;

	if (log) {
		e_info(dir->event,
		       "Adding director %s to ring (requested by %s)",
		       added_host->name, src->name);
	}

	dir->ring_change_counter++;

	cmd = t_strdup_printf("DIRECTOR\t%s\t%u\n",
			      added_host->ip_str, added_host->port);
	director_update_send(dir, src, cmd);
}
540 
director_hosts_purge_removed(struct director * dir)541 static void director_hosts_purge_removed(struct director *dir)
542 {
543 	struct director_host *const *hosts, *host;
544 	unsigned int i, count;
545 
546 	timeout_remove(&dir->to_remove_dirs);
547 
548 	hosts = array_get(&dir->dir_hosts, &count);
549 	for (i = 0; i < count; ) {
550 		if (hosts[i]->removed) {
551 			host = hosts[i];
552 			director_host_free(&host);
553 			hosts = array_get(&dir->dir_hosts, &count);
554 		} else {
555 			i++;
556 		}
557 	}
558 }
559 
director_ring_remove(struct director_host * removed_host,struct director_host * src)560 void director_ring_remove(struct director_host *removed_host,
561 			  struct director_host *src)
562 {
563 	struct director *dir = removed_host->dir;
564 	struct director_connection *const *conns, *conn;
565 	unsigned int i, count;
566 	const char *cmd;
567 
568 	e_info(dir->event, "Removing director %s from ring (requested by %s)",
569 	       removed_host->name, src->name);
570 
571 	if (removed_host->self && !src->self) {
572 		/* others will just disconnect us */
573 		return;
574 	}
575 
576 	if (!removed_host->self) {
577 		/* mark the host as removed and fully remove it later. this
578 		   delay is needed, because the removal may trigger director
579 		   reconnections, which may send the director back and we don't
580 		   want to re-add it */
581 		removed_host->removed = TRUE;
582 		if (dir->to_remove_dirs == NULL) {
583 			dir->to_remove_dirs =
584 				timeout_add(DIRECTOR_DELAYED_DIR_REMOVE_MSECS,
585 					    director_hosts_purge_removed, dir);
586 		}
587 	}
588 
589 	/* if our left or ride side gets removed, notify them first
590 	   before disconnecting. */
591 	cmd = t_strdup_printf("DIRECTOR-REMOVE\t%s\t%u\n",
592 			      removed_host->ip_str, removed_host->port);
593 	director_update_send_version(dir, src,
594 				     DIRECTOR_VERSION_RING_REMOVE, cmd);
595 
596 	/* disconnect any connections to the host */
597 	conns = array_get(&dir->connections, &count);
598 	for (i = 0; i < count; ) {
599 		conn = conns[i];
600 		if (director_connection_get_host(conn) != removed_host ||
601 		    removed_host->self)
602 			i++;
603 		else {
604 			director_connection_deinit(&conn, "Removing from ring");
605 			conns = array_get(&dir->connections, &count);
606 		}
607 	}
608 	if (dir->right == NULL)
609 		director_connect(dir, "Reconnecting after director was removed");
610 	director_sync(dir);
611 }
612 
613 static void
director_send_host(struct director * dir,struct director_host * src,struct director_host * orig_src,struct mail_host * host)614 director_send_host(struct director *dir, struct director_host *src,
615 		   struct director_host *orig_src,
616 		   struct mail_host *host)
617 {
618 	const char *host_tag = mail_host_get_tag(host);
619 	string_t *str;
620 
621 	if (orig_src == NULL) {
622 		orig_src = dir->self_host;
623 		orig_src->last_seq++;
624 	}
625 
626 	str = t_str_new(128);
627 	str_printfa(str, "HOST\t%s\t%u\t%u\t%s\t%u",
628 		    orig_src->ip_str, orig_src->port, orig_src->last_seq,
629 		    host->ip_str, host->vhost_count);
630 	if (dir->ring_min_version >= DIRECTOR_VERSION_TAGS_V2) {
631 		str_append_c(str, '\t');
632 		str_append_tabescaped(str, host_tag);
633 	} else if (host_tag[0] != '\0' &&
634 		   dir->ring_min_version < DIRECTOR_VERSION_TAGS_V2) {
635 		if (dir->ring_min_version < DIRECTOR_VERSION_TAGS) {
636 			e_error(dir->event, "Ring has directors that don't support tags - removing host %s with tag '%s'",
637 				host->ip_str, host_tag);
638 		} else {
639 			e_error(dir->event, "Ring has directors that support mixed versions of tags - removing host %s with tag '%s'",
640 				host->ip_str, host_tag);
641 		}
642 		director_remove_host(dir, NULL, NULL, host);
643 		return;
644 	}
645 	if (dir->ring_min_version >= DIRECTOR_VERSION_UPDOWN) {
646 		str_printfa(str, "\t%c%ld\t", host->down ? 'D' : 'U',
647 			    (long)host->last_updown_change);
648 		/* add any further version checks here - these directors ignore
649 		   any extra unknown arguments */
650 		if (host->hostname != NULL)
651 			str_append_tabescaped(str, host->hostname);
652 	}
653 	str_append_c(str, '\n');
654 	director_update_send(dir, src, str_c(str));
655 }
656 
director_resend_hosts(struct director * dir)657 void director_resend_hosts(struct director *dir)
658 {
659 	struct mail_host *host;
660 
661 	array_foreach_elem(mail_hosts_get(dir->mail_hosts), host)
662 		director_send_host(dir, dir->self_host, NULL, host);
663 }
664 
director_update_host(struct director * dir,struct director_host * src,struct director_host * orig_src,struct mail_host * host)665 void director_update_host(struct director *dir, struct director_host *src,
666 			  struct director_host *orig_src,
667 			  struct mail_host *host)
668 {
669 	/* update state in case this is the first mail host being added */
670 	director_set_state_changed(dir);
671 
672 	e_debug(dir->event, "Updating host %s vhost_count=%u "
673 		"down=%d last_updown_change=%ld (hosts_hash=%u)",
674 		host->ip_str, host->vhost_count, host->down ? 1 : 0,
675 		(long)host->last_updown_change,
676 		mail_hosts_hash(dir->mail_hosts));
677 
678 	director_send_host(dir, src, orig_src, host);
679 
680 	/* mark the host desynced until ring is synced again. except if we're
681 	   alone in the ring that never happens. */
682 	if (dir->right != NULL || dir->left != NULL)
683 		host->desynced = TRUE;
684 	director_sync(dir);
685 }
686 
director_remove_host(struct director * dir,struct director_host * src,struct director_host * orig_src,struct mail_host * host)687 void director_remove_host(struct director *dir, struct director_host *src,
688 			  struct director_host *orig_src,
689 			  struct mail_host *host)
690 {
691 	struct user_directory *users = host->tag->users;
692 
693 	if (src != NULL) {
694 		if (orig_src == NULL) {
695 			orig_src = dir->self_host;
696 			orig_src->last_seq++;
697 		}
698 
699 		director_update_send(dir, src, t_strdup_printf(
700 			"HOST-REMOVE\t%s\t%u\t%u\t%s\n",
701 			orig_src->ip_str, orig_src->port,
702 			orig_src->last_seq, host->ip_str));
703 	}
704 
705 	user_directory_remove_host(users, host);
706 	mail_host_remove(host);
707 	director_sync(dir);
708 }
709 
director_flush_host(struct director * dir,struct director_host * src,struct director_host * orig_src,struct mail_host * host)710 void director_flush_host(struct director *dir, struct director_host *src,
711 			 struct director_host *orig_src,
712 			 struct mail_host *host)
713 {
714 	struct user_directory *users = host->tag->users;
715 
716 	if (orig_src == NULL) {
717 		orig_src = dir->self_host;
718 		orig_src->last_seq++;
719 	}
720 
721 	director_update_send(dir, src, t_strdup_printf(
722 		"HOST-FLUSH\t%s\t%u\t%u\t%s\n",
723 		orig_src->ip_str, orig_src->port, orig_src->last_seq,
724 		host->ip_str));
725 	user_directory_remove_host(users, host);
726 	director_sync(dir);
727 }
728 
director_update_user(struct director * dir,struct director_host * src,struct user * user)729 void director_update_user(struct director *dir, struct director_host *src,
730 			  struct user *user)
731 {
732 	struct director_connection *conn;
733 
734 	i_assert(src != NULL);
735 	i_assert(!user->weak);
736 
737 	array_foreach_elem(&dir->connections, conn) {
738 		if (director_connection_get_host(conn) == src)
739 			continue;
740 
741 		if (director_connection_get_minor_version(conn) >= DIRECTOR_VERSION_USER_TIMESTAMP) {
742 			director_connection_send(conn, t_strdup_printf(
743 				"USER\t%u\t%s\t%u\n", user->username_hash, user->host->ip_str,
744 				user->timestamp));
745 		} else {
746 			director_connection_send(conn, t_strdup_printf(
747 				"USER\t%u\t%s\n", user->username_hash, user->host->ip_str));
748 		}
749 	}
750 }
751 
director_update_user_weak(struct director * dir,struct director_host * src,struct director_connection * src_conn,struct director_host * orig_src,struct user * user)752 void director_update_user_weak(struct director *dir, struct director_host *src,
753 			       struct director_connection *src_conn,
754 			       struct director_host *orig_src,
755 			       struct user *user)
756 {
757 	const char *cmd;
758 
759 	i_assert(src != NULL);
760 	i_assert(user->weak);
761 
762 	if (orig_src == NULL) {
763 		orig_src = dir->self_host;
764 		orig_src->last_seq++;
765 	}
766 
767 	cmd = t_strdup_printf("USER-WEAK\t%s\t%u\t%u\t%u\t%s\n",
768 		orig_src->ip_str, orig_src->port, orig_src->last_seq,
769 		user->username_hash, user->host->ip_str);
770 
771 	if (src != dir->self_host && dir->left != NULL && dir->right != NULL &&
772 	    director_connection_get_host(dir->left) ==
773 	    director_connection_get_host(dir->right)) {
774 		/* only two directors in this ring and we're forwarding
775 		   USER-WEAK from one director back to itself via another
776 		   so it sees we've received it. we can't use
777 		   director_update_send() for this, because it doesn't send
778 		   data back to the source. */
779 		if (dir->right == src_conn)
780 			director_connection_send(dir->left, cmd);
781 		else if (dir->left == src_conn)
782 			director_connection_send(dir->right, cmd);
783 		else
784 			i_unreached();
785 	} else {
786 		director_update_send(dir, src, cmd);
787 	}
788 }
789 
790 static void
director_flush_user_continue(enum program_client_exit_status result,struct director_kill_context * ctx)791 director_flush_user_continue(enum program_client_exit_status result,
792 			     struct director_kill_context *ctx)
793 {
794 	struct director *dir = ctx->dir;
795 	ctx->callback_pending = FALSE;
796 
797 	struct user *user = user_directory_lookup(ctx->tag->users,
798 						  ctx->username_hash);
799 
800 	if (result == PROGRAM_CLIENT_EXIT_STATUS_FAILURE) {
801 		struct istream *is = iostream_temp_finish(&ctx->reply, SIZE_MAX);
802 		char *data;
803 		i_stream_set_return_partial_line(is, TRUE);
804 		data = i_stream_read_next_line(is);
805 		e_error(dir->event, "%s: Failed to flush user hash %u in host %s: %s",
806 			ctx->socket_path,
807 			ctx->username_hash,
808 			net_ip2addr(&ctx->host_ip),
809 			data == NULL ? "(no output to stdout)" : data);
810 		while((data = i_stream_read_next_line(is)) != NULL) {
811 			e_error(dir->event, "%s: Failed to flush user hash %u in host %s: %s",
812 				ctx->socket_path,
813 				ctx->username_hash,
814 				net_ip2addr(&ctx->host_ip), data);
815 		}
816 		i_stream_unref(&is);
817 	} else {
818 		o_stream_unref(&ctx->reply);
819 	}
820 	program_client_destroy(&ctx->pclient);
821 
822 	if (!DIRECTOR_KILL_CONTEXT_IS_VALID(user, ctx)) {
823 		/* user was already freed - ignore */
824 		e_debug(dir->event, "User %u freed while flushing, result=%d",
825 			ctx->username_hash, result);
826 		i_assert(ctx->to_move == NULL);
827 		i_free(ctx);
828 	} else {
829 		/* ctx is freed later via user->kill_ctx */
830 		e_debug(dir->event, "Flushing user %u finished, result=%d",
831 			ctx->username_hash, result);
832 		director_user_kill_finish_delayed(dir, user,
833 			result == PROGRAM_CLIENT_EXIT_STATUS_SUCCESS);
834 	}
835 }
836 
837 static void
director_flush_user(struct director * dir,struct user * user)838 director_flush_user(struct director *dir, struct user *user)
839 {
840 	struct director_kill_context *ctx = user->kill_ctx;
841 	struct var_expand_table tab[] = {
842 		{ 'i', user->host->ip_str, "ip" },
843 		{ 'h', user->host->hostname, "host" },
844 		{ '\0', NULL, NULL }
845 	};
846 	const char *error;
847 
848 	/* Execute flush script, if set. Only the director that started the
849 	   user moving will call the flush script. Having each director do it
850 	   would be redundant since they're all supposed to be performing the
851 	   same flush task to the same backend.
852 
853 	   Flushing is also not triggered if we're moving a user that we just
854 	   created due to the user move. This means that the user doesn't have
855 	   an old host, so we couldn't really even perform any flushing on the
856 	   backend. */
857 	if (*dir->set->director_flush_socket == '\0' ||
858 	    ctx->old_host_ip.family == 0 ||
859 	    !ctx->kill_is_self_initiated) {
860 		director_user_kill_finish_delayed(dir, user, FALSE);
861 		return;
862 	}
863 
864 	ctx->host_ip = user->host->ip;
865 
866 	string_t *s_sock = str_new(default_pool, 32);
867 	if (var_expand(s_sock, dir->set->director_flush_socket, tab, &error) <= 0) {
868 		e_error(dir->event, "Failed to expand director_flush_socket=%s: %s",
869 			dir->set->director_flush_socket, error);
870 		director_user_kill_finish_delayed(dir, user, FALSE);
871 		return;
872 	}
873 	ctx->socket_path = str_free_without_data(&s_sock);
874 
875 	struct program_client_settings set = {
876 		.client_connect_timeout_msecs = 10000,
877 		.dns_client_socket_path = DIRECTOR_DNS_SOCKET_PATH,
878 	};
879 
880 	restrict_access_init(&set.restrict_set);
881 
882 	const char *const args[] = {
883 		"FLUSH",
884 		t_strdup_printf("%u", user->username_hash),
885 		net_ip2addr(&ctx->old_host_ip),
886 		user->host->ip_str,
887 		ctx->old_host_down ? "down" : "up",
888 		dec2str(ctx->old_host_vhost_count),
889 		NULL
890 	};
891 
892 	ctx->kill_state = USER_KILL_STATE_FLUSHING;
893 	e_debug(dir->event, "Flushing user %u via %s", user->username_hash,
894 		ctx->socket_path);
895 
896 	if ((program_client_create(ctx->socket_path, args, &set, FALSE,
897 				   &ctx->pclient, &error)) != 0) {
898 		e_error(dir->event, "%s: Failed to flush user hash %u in host %s: %s",
899 			ctx->socket_path,
900 			user->username_hash,
901 			user->host->ip_str,
902 			error);
903 		director_flush_user_continue(PROGRAM_CLIENT_EXIT_STATUS_FAILURE,
904 					     ctx);
905 		return;
906 	}
907 
908 	ctx->reply =
909 		iostream_temp_create_named("/tmp", 0,
910 					   t_strdup_printf("flush response from %s",
911 							   user->host->ip_str));
912 	o_stream_set_no_error_handling(ctx->reply, TRUE);
913 	program_client_set_output(ctx->pclient, ctx->reply);
914 	ctx->callback_pending = TRUE;
915 	program_client_run_async(ctx->pclient, director_flush_user_continue, ctx);
916 }
917 
/* Account for one user move that is no longer in progress: decrease the
   director's count of moving users and request the state change callback
   to be called. */
static void director_user_move_finished(struct director *dir)
{
	/* the counter is increased when a kill context is created, so it
	   can't be zero here */
	i_assert(dir->users_moving_count > 0);
	dir->users_moving_count--;

	director_set_state_changed(dir);
}
925 
director_user_move_free(struct user * user)926 static void director_user_move_free(struct user *user)
927 {
928 	struct director *dir = user->kill_ctx->dir;
929 	struct director_kill_context *kill_ctx = user->kill_ctx;
930 
931 	i_assert(kill_ctx != NULL);
932 
933 	e_debug(dir->event, "User %u move finished at state=%s", user->username_hash,
934 		user_kill_state_names[kill_ctx->kill_state]);
935 
936 	if (kill_ctx->ipc_cmd != NULL)
937 		ipc_client_cmd_abort(dir->ipc_proxy, &kill_ctx->ipc_cmd);
938 	timeout_remove(&kill_ctx->to_move);
939 	i_free(kill_ctx->socket_path);
940 	i_free(kill_ctx);
941 	user->kill_ctx = NULL;
942 
943 	director_user_move_finished(dir);
944 }
945 
/* Timeout callback added by director_user_kill_finish_delayed(): the delay
   after killing the user's connections has passed, so the move is finished
   and the kill context is freed. */
static void
director_user_kill_finish_delayed_to(struct user *user)
{
	/* director_user_kill_finish_delayed() switches to the delay state
	   before adding this timeout */
	i_assert(user->kill_ctx != NULL);
	i_assert(user->kill_ctx->kill_state == USER_KILL_STATE_DELAY);

	director_user_move_free(user);
}
954 
955 static void
director_user_kill_finish_delayed(struct director * dir,struct user * user,bool skip_delay)956 director_user_kill_finish_delayed(struct director *dir, struct user *user,
957 				  bool skip_delay)
958 {
959 	if (skip_delay) {
960 		user->kill_ctx->kill_state = USER_KILL_STATE_NONE;
961 		director_user_move_free(user);
962 		return;
963 	}
964 
965 	user->kill_ctx->kill_state = USER_KILL_STATE_DELAY;
966 
967 	/* wait for a while for the kills to finish in the backend server,
968 	   so there are no longer any processes running for the user before we
969 	   start letting new in connections to the new server. */
970 	timeout_remove(&user->kill_ctx->to_move);
971 	user->kill_ctx->to_move =
972 		timeout_add(dir->set->director_user_kick_delay * 1000,
973 			    director_user_kill_finish_delayed_to, user);
974 }
975 
976 static void
director_finish_user_kill(struct director * dir,struct user * user,bool self)977 director_finish_user_kill(struct director *dir, struct user *user, bool self)
978 {
979 	struct director_kill_context *kill_ctx = user->kill_ctx;
980 
981 	i_assert(kill_ctx != NULL);
982 	i_assert(kill_ctx->kill_state != USER_KILL_STATE_FLUSHING);
983 	i_assert(kill_ctx->kill_state != USER_KILL_STATE_DELAY);
984 
985 	e_debug(dir->event, "User %u kill finished - %sstate=%s", user->username_hash,
986 		self ? "we started it " : "",
987 		user_kill_state_names[kill_ctx->kill_state]);
988 
989 	if (dir->right == NULL) {
990 		/* we're alone */
991 		director_flush_user(dir, user);
992 	} else if (self ||
993 		   kill_ctx->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED) {
994 		director_connection_send(dir->right, t_strdup_printf(
995 			"USER-KILLED\t%u\n", user->username_hash));
996 		kill_ctx->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE;
997 	} else {
998 		i_assert(kill_ctx->kill_state == USER_KILL_STATE_KILLING);
999 		kill_ctx->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY;
1000 	}
1001 }
1002 
/* Callback given to log_throttle_init() for user_kill_fail_throttle. Logs a
   single summary error containing the event count received from the log
   throttle. The context is unused. */
static void director_user_kill_fail_throttled(unsigned int new_events_count,
					      void *context ATTR_UNUSED)
{
	i_error("Failed to kill %u users' connections", new_events_count);
}
1008 
director_kill_user_callback(enum ipc_client_cmd_state state,const char * data,void * context)1009 static void director_kill_user_callback(enum ipc_client_cmd_state state,
1010 					const char *data, void *context)
1011 {
1012 	struct director_kill_context *ctx = context;
1013 	struct user *user;
1014 
1015 	/* don't try to abort the IPC command anymore */
1016 	ctx->ipc_cmd = NULL;
1017 
1018 	/* this is an asynchronous notification about user being killed.
1019 	   there are no guarantees about what might have happened to the user
1020 	   in the mean time. */
1021 	switch (state) {
1022 	case IPC_CLIENT_CMD_STATE_REPLY:
1023 		/* shouldn't get here. the command reply isn't finished yet. */
1024 		e_error(ctx->dir->event,
1025 			"login process sent unexpected reply to kick: %s", data);
1026 		return;
1027 	case IPC_CLIENT_CMD_STATE_OK:
1028 		break;
1029 	case IPC_CLIENT_CMD_STATE_ERROR:
1030 		if (log_throttle_accept(user_kill_fail_throttle)) {
1031 			e_error(ctx->dir->event,
1032 				"Failed to kill user %u connections: %s",
1033 				ctx->username_hash, data);
1034 		}
1035 		/* we can't really do anything but continue anyway */
1036 		break;
1037 	}
1038 
1039 	i_assert(ctx->dir->users_kicking_count > 0);
1040 	ctx->dir->users_kicking_count--;
1041 	if (ctx->dir->kick_callback != NULL)
1042 		ctx->dir->kick_callback(ctx->dir);
1043 
1044 	user = user_directory_lookup(ctx->tag->users, ctx->username_hash);
1045 	if (!DIRECTOR_KILL_CONTEXT_IS_VALID(user, ctx)) {
1046 		/* user was already freed - ignore */
1047 		i_assert(ctx->to_move == NULL);
1048 		director_user_move_finished(ctx->dir);
1049 		i_free(ctx);
1050 	} else {
1051 		i_assert(ctx->kill_state == USER_KILL_STATE_KILLING ||
1052 			 ctx->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED);
1053 		/* we were still waiting for the kill notification */
1054 		director_finish_user_kill(ctx->dir, user, ctx->kill_is_self_initiated);
1055 	}
1056 }
1057 
/* Callback given to log_throttle_init() for user_move_throttle. Logs a
   single summary error containing the event count received from the log
   throttle. The context is unused. */
static void director_user_move_throttled(unsigned int new_events_count,
					 void *context ATTR_UNUSED)
{
	i_error("%u users' move timed out, their state may now be inconsistent",
		new_events_count);
}
1064 
director_user_move_timeout(struct user * user)1065 static void director_user_move_timeout(struct user *user)
1066 {
1067 	i_assert(user->kill_ctx != NULL);
1068 	i_assert(user->kill_ctx->kill_state != USER_KILL_STATE_DELAY);
1069 
1070 	if (log_throttle_accept(user_move_throttle)) {
1071 		e_error(user->kill_ctx->dir->event,
1072 			"Finishing user %u move timed out, "
1073 			"its state may now be inconsistent (state=%s)",
1074 			user->username_hash,
1075 			user_kill_state_names[user->kill_ctx->kill_state]);
1076 	}
1077 	if (user->kill_ctx->kill_state == USER_KILL_STATE_FLUSHING) {
1078 		o_stream_unref(&user->kill_ctx->reply);
1079 		program_client_destroy(&user->kill_ctx->pclient);
1080 	}
1081 	director_user_move_free(user);
1082 }
1083 
/* Start killing the user's connections as part of moving the user.

   src: src->self tells whether the kill is initiated by this director.
   tag: stored in the kill context for looking up the user later on.
   old_host: user's previous host, or NULL if there wasn't any. Its IP, down
   state and vhost count are saved to the kill context.
   forced_kick: send the kick command even if the host didn't change.

   Does nothing if the user is already being killed. Otherwise a new kill
   context with a move timeout is created. The kick command is sent via IPC
   only if the host changed or forced_kick is set; otherwise the kill is
   finished immediately. */
void director_kill_user(struct director *dir, struct director_host *src,
			struct user *user, struct mail_tag *tag,
			struct mail_host *old_host, bool forced_kick)
{
	struct director_kill_context *kill_ctx;
	const char *cmd;

	if (USER_IS_BEING_KILLED(user)) {
		/* The previous move hasn't finished yet and the user is
		   already being moved again. Just continue from wherever
		   the earlier move was left. */
		e_debug(dir->event, "User %u move restarted - previous kill_state=%s",
			user->username_hash,
			user_kill_state_names[user->kill_ctx->kill_state]);
		return;
	}

	kill_ctx = i_new(struct director_kill_context, 1);
	user->kill_ctx = kill_ctx;
	kill_ctx->dir = dir;
	kill_ctx->tag = tag;
	kill_ctx->username_hash = user->username_hash;
	kill_ctx->kill_is_self_initiated = src->self;
	if (old_host != NULL) {
		kill_ctx->old_host_ip = old_host->ip;
		kill_ctx->old_host_down = old_host->down;
		kill_ctx->old_host_vhost_count = old_host->vhost_count;
	}

	dir->users_moving_count++;
	kill_ctx->to_move = timeout_add(DIRECTOR_USER_MOVE_TIMEOUT_MSECS,
					director_user_move_timeout, user);
	kill_ctx->kill_state = USER_KILL_STATE_KILLING;

	if (!forced_kick && (old_host == NULL || old_host == user->host)) {
		/* a) the user wasn't even known before now, so a local kick
		   wouldn't kick anything and isn't worth performing.
		   b) the host was already correct. tell the others that the
		   user was killed without really doing it. */
		director_finish_user_kill(dir, user,
					  kill_ctx->kill_is_self_initiated);
		return;
	}

	cmd = t_strdup_printf("proxy\t*\tKICK-DIRECTOR-HASH\t%u",
			      user->username_hash);
	dir->users_kicking_count++;
	kill_ctx->ipc_cmd = ipc_client_cmd(dir->ipc_proxy, cmd,
					   director_kill_user_callback,
					   kill_ctx);
}
1133 
director_move_user(struct director * dir,struct director_host * src,struct director_host * orig_src,unsigned int username_hash,struct mail_host * host)1134 void director_move_user(struct director *dir, struct director_host *src,
1135 			struct director_host *orig_src,
1136 			unsigned int username_hash, struct mail_host *host)
1137 {
1138 	struct user_directory *users = host->tag->users;
1139 	struct mail_host *old_host = NULL;
1140 	struct user *user;
1141 
1142 	/* 1. move this user's host, and set its "killing" flag to delay all of
1143 	   its future connections until all directors have killed the
1144 	   connections and notified us about it.
1145 
1146 	   2. tell the other directors about the move
1147 
1148 	   3. once user kill callback is called, tell the other directors
1149 	   with USER-KILLED that we're done killing the user.
1150 
1151 	   4. when some director gets a duplicate USER-KILLED, it's
1152 	   responsible for notifying all directors that user is completely
1153 	   killed.
1154 
1155 	   5. after receiving USER-KILLED-EVERYWHERE notification,
1156 	   new connections are again allowed for the user.
1157 	*/
1158 	user = user_directory_lookup(users, username_hash);
1159 	if (user == NULL) {
1160 		e_debug(dir->event, "User %u move started: User was nonexistent",
1161 			username_hash);
1162 		user = user_directory_add(users, username_hash,
1163 					  host, ioloop_time);
1164 	} else if (user->host == host) {
1165 		/* User is already in the wanted host, but another director
1166 		   didn't think so. We'll need to finish the move without
1167 		   killing any of our connections. */
1168 		old_host = user->host;
1169 		user->timestamp = ioloop_time;
1170 		e_debug(dir->event, "User %u move forwarded: host is already %s",
1171 			username_hash, host->ip_str);
1172 	} else {
1173 		/* user is looked up via the new host's tag, so if it's found
1174 		   the old tag has to be the same. */
1175 		i_assert(user->host->tag == host->tag);
1176 
1177 		old_host = user->host;
1178 		user->host->user_count--;
1179 		user->host = host;
1180 		user->host->user_count++;
1181 		user->timestamp = ioloop_time;
1182 		e_debug(dir->event, "User %u move started: host %s -> %s",
1183 			username_hash, old_host->ip_str,
1184 			host->ip_str);
1185 	}
1186 
1187 	if (orig_src == NULL) {
1188 		orig_src = dir->self_host;
1189 		orig_src->last_seq++;
1190 	}
1191 	director_update_send(dir, src, t_strdup_printf(
1192 		"USER-MOVE\t%s\t%u\t%u\t%u\t%s\n",
1193 		orig_src->ip_str, orig_src->port, orig_src->last_seq,
1194 		user->username_hash, user->host->ip_str));
1195 	/* kill the user only after sending the USER-MOVE, because the kill
1196 	   may finish instantly. */
1197 	director_kill_user(dir, src, user, host->tag, old_host, FALSE);
1198 }
1199 
1200 static void
director_kick_user_callback(enum ipc_client_cmd_state state,const char * data,void * context)1201 director_kick_user_callback(enum ipc_client_cmd_state state,
1202 			    const char *data, void *context)
1203 {
1204 	struct director *dir = context;
1205 
1206 	if (state == IPC_CLIENT_CMD_STATE_REPLY) {
1207 		/* shouldn't get here. the command reply isn't finished yet. */
1208 		e_error(dir->event, "login process sent unexpected reply to kick: %s", data);
1209 		return;
1210 	}
1211 
1212 	i_assert(dir->users_kicking_count > 0);
1213 	dir->users_kicking_count--;
1214 	if (dir->kick_callback != NULL)
1215 		dir->kick_callback(dir);
1216 }
1217 
director_kick_user(struct director * dir,struct director_host * src,struct director_host * orig_src,const char * username)1218 void director_kick_user(struct director *dir, struct director_host *src,
1219 			struct director_host *orig_src, const char *username)
1220 {
1221 	string_t *cmd = t_str_new(64);
1222 
1223 	str_append(cmd, "proxy\t*\tKICK\t");
1224 	str_append_tabescaped(cmd, username);
1225 	dir->users_kicking_count++;
1226 	ipc_client_cmd(dir->ipc_proxy, str_c(cmd),
1227 		       director_kick_user_callback, dir);
1228 
1229 	if (orig_src == NULL) {
1230 		orig_src = dir->self_host;
1231 		orig_src->last_seq++;
1232 	}
1233 	str_truncate(cmd, 0);
1234 	str_printfa(cmd, "USER-KICK\t%s\t%u\t%u\t",
1235 		orig_src->ip_str, orig_src->port, orig_src->last_seq);
1236 	str_append_tabescaped(cmd, username);
1237 	str_append_c(cmd, '\n');
1238 	director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK, str_c(cmd));
1239 }
1240 
director_kick_user_alt(struct director * dir,struct director_host * src,struct director_host * orig_src,const char * field,const char * value)1241 void director_kick_user_alt(struct director *dir, struct director_host *src,
1242 			    struct director_host *orig_src,
1243 			    const char *field, const char *value)
1244 {
1245 	string_t *cmd = t_str_new(64);
1246 
1247 	str_append(cmd, "proxy\t*\tKICK-ALT\t");
1248 	str_append_tabescaped(cmd, field);
1249 	str_append_c(cmd, '\t');
1250 	str_append_tabescaped(cmd, value);
1251 	dir->users_kicking_count++;
1252 	ipc_client_cmd(dir->ipc_proxy, str_c(cmd),
1253 		       director_kick_user_callback, dir);
1254 
1255 	if (orig_src == NULL) {
1256 		orig_src = dir->self_host;
1257 		orig_src->last_seq++;
1258 	}
1259 	str_truncate(cmd, 0);
1260 	str_printfa(cmd, "USER-KICK-ALT\t%s\t%u\t%u\t",
1261 		orig_src->ip_str, orig_src->port, orig_src->last_seq);
1262 	str_append_tabescaped(cmd, field);
1263 	str_append_c(cmd, '\t');
1264 	str_append_tabescaped(cmd, value);
1265 	str_append_c(cmd, '\n');
1266 	director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK_ALT, str_c(cmd));
1267 }
1268 
director_kick_user_hash(struct director * dir,struct director_host * src,struct director_host * orig_src,unsigned int username_hash,const struct ip_addr * except_ip)1269 void director_kick_user_hash(struct director *dir, struct director_host *src,
1270 			     struct director_host *orig_src,
1271 			     unsigned int username_hash,
1272 			     const struct ip_addr *except_ip)
1273 {
1274 	const char *cmd;
1275 
1276 	cmd = t_strdup_printf("proxy\t*\tKICK-DIRECTOR-HASH\t%u\t%s",
1277 			      username_hash, net_ip2addr(except_ip));
1278 	dir->users_kicking_count++;
1279 	ipc_client_cmd(dir->ipc_proxy, cmd,
1280 		       director_kick_user_callback, dir);
1281 
1282 	if (orig_src == NULL) {
1283 		orig_src = dir->self_host;
1284 		orig_src->last_seq++;
1285 	}
1286 	cmd = t_strdup_printf("USER-KICK-HASH\t%s\t%u\t%u\t%u\t%s\n",
1287 		orig_src->ip_str, orig_src->port, orig_src->last_seq,
1288 		username_hash, net_ip2addr(except_ip));
1289 	director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK, cmd);
1290 }
1291 
1292 static void
director_send_user_killed_everywhere(struct director * dir,struct director_host * src,struct director_host * orig_src,unsigned int username_hash)1293 director_send_user_killed_everywhere(struct director *dir,
1294 				     struct director_host *src,
1295 				     struct director_host *orig_src,
1296 				     unsigned int username_hash)
1297 {
1298 	if (orig_src == NULL) {
1299 		orig_src = dir->self_host;
1300 		orig_src->last_seq++;
1301 	}
1302 	director_update_send(dir, src, t_strdup_printf(
1303 		"USER-KILLED-EVERYWHERE\t%s\t%u\t%u\t%u\n",
1304 		orig_src->ip_str, orig_src->port, orig_src->last_seq,
1305 		username_hash));
1306 }
1307 
1308 static void
director_user_tag_killed(struct director * dir,struct mail_tag * tag,unsigned int username_hash)1309 director_user_tag_killed(struct director *dir, struct mail_tag *tag,
1310 			 unsigned int username_hash)
1311 {
1312 	struct user *user;
1313 
1314 	user = user_directory_lookup(tag->users, username_hash);
1315 	if (user == NULL || !USER_IS_BEING_KILLED(user))
1316 		return;
1317 
1318 	switch (user->kill_ctx->kill_state) {
1319 	case USER_KILL_STATE_KILLING:
1320 		user->kill_ctx->kill_state = USER_KILL_STATE_KILLING_NOTIFY_RECEIVED;
1321 		break;
1322 	case USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY:
1323 		director_finish_user_kill(dir, user, TRUE);
1324 		break;
1325 	case USER_KILL_STATE_KILLING_NOTIFY_RECEIVED:
1326 		e_debug(dir->event, "User %u kill_state=%s - ignoring USER-KILLED",
1327 			username_hash, user_kill_state_names[user->kill_ctx->kill_state]);
1328 		break;
1329 	case USER_KILL_STATE_NONE:
1330 	case USER_KILL_STATE_FLUSHING:
1331 	case USER_KILL_STATE_DELAY:
1332 		/* move restarted. state=none can also happen if USER-MOVE was
1333 		   sent while we were still moving. send back
1334 		   USER-KILLED-EVERYWHERE to avoid hangs. */
1335 		director_send_user_killed_everywhere(dir, dir->self_host, NULL,
1336 						     username_hash);
1337 		break;
1338 	case USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE:
1339 		director_user_killed_everywhere(dir, dir->self_host,
1340 						NULL, username_hash);
1341 		break;
1342 	}
1343 }
1344 
director_user_killed(struct director * dir,unsigned int username_hash)1345 void director_user_killed(struct director *dir, unsigned int username_hash)
1346 {
1347 	struct mail_tag *tag;
1348 
1349 	array_foreach_elem(mail_hosts_get_tags(dir->mail_hosts), tag)
1350 		director_user_tag_killed(dir, tag, username_hash);
1351 }
1352 
1353 static void
director_user_tag_killed_everywhere(struct director * dir,struct mail_tag * tag,struct director_host * src,struct director_host * orig_src,unsigned int username_hash)1354 director_user_tag_killed_everywhere(struct director *dir,
1355 				    struct mail_tag *tag,
1356 				    struct director_host *src,
1357 				    struct director_host *orig_src,
1358 				    unsigned int username_hash)
1359 {
1360 	struct user *user;
1361 
1362 	user = user_directory_lookup(tag->users, username_hash);
1363 	if (user == NULL) {
1364 		e_debug(dir->event, "User %u no longer exists - ignoring USER-KILLED-EVERYWHERE",
1365 			username_hash);
1366 		return;
1367 	}
1368 	if (!USER_IS_BEING_KILLED(user)) {
1369 		e_debug(dir->event, "User %u is no longer being killed - ignoring USER-KILLED-EVERYWHERE",
1370 			username_hash);
1371 		return;
1372 	}
1373 	if (user->kill_ctx->kill_state != USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE) {
1374 		e_debug(dir->event, "User %u kill_state=%s - ignoring USER-KILLED-EVERYWHERE",
1375 			username_hash, user_kill_state_names[user->kill_ctx->kill_state]);
1376 		return;
1377 	}
1378 
1379 	director_flush_user(dir, user);
1380 	director_send_user_killed_everywhere(dir, src, orig_src, username_hash);
1381 }
1382 
director_user_killed_everywhere(struct director * dir,struct director_host * src,struct director_host * orig_src,unsigned int username_hash)1383 void director_user_killed_everywhere(struct director *dir,
1384 				     struct director_host *src,
1385 				     struct director_host *orig_src,
1386 				     unsigned int username_hash)
1387 {
1388 	struct mail_tag *tag;
1389 
1390 	array_foreach_elem(mail_hosts_get_tags(dir->mail_hosts), tag) {
1391 		director_user_tag_killed_everywhere(dir, tag, src, orig_src,
1392 						    username_hash);
1393 	}
1394 }
1395 
/* Timeout callback added by director_set_state_changed(): calls the
   director's state change callback. */
static void director_state_callback_timeout(struct director *dir)
{
	/* remove the timeout first, so that a new one can be added by
	   director_set_state_changed() */
	timeout_remove(&dir->to_callback);
	dir->state_change_callback(dir);
}
1401 
director_set_state_changed(struct director * dir)1402 void director_set_state_changed(struct director *dir)
1403 {
1404 	/* we may get called to here from various places. use a timeout to
1405 	   make sure the state callback is called with a clean state. */
1406 	if (dir->to_callback == NULL) {
1407 		dir->to_callback =
1408 			timeout_add(0, director_state_callback_timeout, dir);
1409 	}
1410 }
1411 
/* Send the command to all director connections except the one whose host is
   src. This is director_update_send_version() with minimum version 0. */
void director_update_send(struct director *dir, struct director_host *src,
			  const char *cmd)
{
	director_update_send_version(dir, src, 0, cmd);
}
1417 
director_update_send_version(struct director * dir,struct director_host * src,unsigned int min_version,const char * cmd)1418 void director_update_send_version(struct director *dir,
1419 				  struct director_host *src,
1420 				  unsigned int min_version, const char *cmd)
1421 {
1422 	struct director_connection *conn;
1423 
1424 	i_assert(src != NULL);
1425 
1426 	array_foreach_elem(&dir->connections, conn) {
1427 		if (director_connection_get_host(conn) != src &&
1428 		    director_connection_get_minor_version(conn) >= min_version)
1429 			director_connection_send(conn, cmd);
1430 	}
1431 }
1432 
director_user_freed(struct user * user)1433 static void director_user_freed(struct user *user)
1434 {
1435 	if (user->kill_ctx != NULL) {
1436 		/* director_user_expire is very short. user expired before
1437 		   moving the user finished or timed out. */
1438 		if (user->kill_ctx->callback_pending) {
1439 			/* kill_ctx is used as a callback parameter.
1440 			   only remove the timeout and finish the free later. */
1441 			timeout_remove(&user->kill_ctx->to_move);
1442 		} else {
1443 			director_user_move_free(user);
1444 		}
1445 	}
1446 }
1447 
/* Allocate and initialize a new director.

   set: director settings. Only the pointer is stored.
   listen_ip, listen_port: stored as the director's own IP and port.
   callback: state change callback, called after
   director_set_state_changed().
   kick_callback: stored as dir->kick_callback. The callers in this file
   check it against NULL before calling it.

   Returns the new director. It's freed with director_deinit(). */
struct director *
director_init(const struct director_settings *set,
	      const struct ip_addr *listen_ip, in_port_t listen_port,
	      director_state_change_callback_t *callback,
	      director_kick_callback_t *kick_callback)
{
	struct director *dir;

	dir = i_new(struct director, 1);
	dir->set = set;
	dir->self_port = listen_port;
	dir->self_ip = *listen_ip;
	dir->state_change_callback = callback;
	dir->kick_callback = kick_callback;
	/* the director's own event, tagged with the "director" category */
	dir->event = event_create(NULL);
	event_add_category(dir->event, &event_category_director);
	i_array_init(&dir->dir_hosts, 16);
	i_array_init(&dir->pending_requests, 16);
	i_array_init(&dir->connections, 8);
	/* director_user_freed() takes care of a freed user's kill context */
	dir->mail_hosts = mail_hosts_init(dir, set->director_user_expire,
					  director_user_freed);

	dir->ipc_proxy = ipc_client_init(DIRECTOR_IPC_PROXY_PATH);
	dir->ring_min_version = DIRECTOR_VERSION_MINOR;
	return dir;
}
1474 
/* Free the director and everything it owns: connections, mail hosts, the IPC
   client, timeouts, director hosts, arrays and the event. *_dir is set to
   NULL. */
void director_deinit(struct director **_dir)
{
	struct director *dir = *_dir;
	struct director_host *const *hostp, *host;
	struct director_connection *conn, *const *connp;

	*_dir = NULL;

	/* NOTE(review): the loop ends only if director_connection_deinit()
	   removes the connection from dir->connections - confirm */
	while (array_count(&dir->connections) > 0) {
		connp = array_front(&dir->connections);
		conn = *connp;
		director_connection_deinit(&conn, "Shutting down");
	}

	mail_hosts_deinit(&dir->mail_hosts);
	mail_hosts_deinit(&dir->orig_config_hosts);

	ipc_client_deinit(&dir->ipc_proxy);
	timeout_remove(&dir->to_reconnect);
	timeout_remove(&dir->to_handshake_warning);
	timeout_remove(&dir->to_request);
	timeout_remove(&dir->to_sync);
	timeout_remove(&dir->to_remove_dirs);
	timeout_remove(&dir->to_callback);
	/* NOTE(review): same as above - director_host_free() is presumably
	   removing the host from dir->dir_hosts */
	while (array_count(&dir->dir_hosts) > 0) {
		hostp = array_front(&dir->dir_hosts);
		host = *hostp;
		director_host_free(&host);
	}
	array_free(&dir->pending_requests);
	array_free(&dir->dir_hosts);
	array_free(&dir->connections);
	event_unref(&dir->event);
	i_free(dir);
}
1510 
/* Iterator going through the users of all mail tags, one tag at a time. */
struct director_user_iter {
	struct director *dir;
	/* index of the tag whose users are currently being iterated */
	unsigned int tag_idx;
	/* iterator for the current tag's users, NULL if not created */
	struct user_directory_iter *user_iter;
	/* passed as such to user_directory_iter_init() */
	bool iter_until_current_tail;
};
1517 
1518 struct director_user_iter *
director_iterate_users_init(struct director * dir,bool iter_until_current_tail)1519 director_iterate_users_init(struct director *dir, bool iter_until_current_tail)
1520 {
1521 	struct director_user_iter *iter = i_new(struct director_user_iter, 1);
1522 	iter->dir = dir;
1523 	iter->iter_until_current_tail = iter_until_current_tail;
1524 	return iter;
1525 }
1526 
director_iterate_users_next(struct director_user_iter * iter)1527 struct user *director_iterate_users_next(struct director_user_iter *iter)
1528 {
1529 	const ARRAY_TYPE(mail_tag) *tags;
1530 	struct user *user;
1531 
1532 	i_assert(iter != NULL);
1533 
1534 	if (iter->user_iter == NULL) {
1535 		tags = mail_hosts_get_tags(iter->dir->mail_hosts);
1536 		if (iter->tag_idx >= array_count(tags))
1537 			return NULL;
1538 		struct mail_tag *tag = array_idx_elem(tags, iter->tag_idx);
1539 		iter->user_iter = user_directory_iter_init(tag->users,
1540 			iter->iter_until_current_tail);
1541 	}
1542 	user = user_directory_iter_next(iter->user_iter);
1543 	if (user == NULL) {
1544 		user_directory_iter_deinit(&iter->user_iter);
1545 		iter->tag_idx++;
1546 		return director_iterate_users_next(iter);
1547 	} else
1548 		return user;
1549 }
1550 
director_iterate_users_deinit(struct director_user_iter ** _iter)1551 void director_iterate_users_deinit(struct director_user_iter **_iter)
1552 {
1553 	i_assert(_iter != NULL && *_iter != NULL);
1554 	struct director_user_iter *iter = *_iter;
1555 	*_iter = NULL;
1556 	if (iter->user_iter != NULL)
1557 		user_directory_iter_deinit(&iter->user_iter);
1558 	i_free(iter);
1559 }
1560 
1561 bool
director_get_username_hash(struct director * dir,const char * username,unsigned int * hash_r)1562 director_get_username_hash(struct director *dir, const char *username,
1563 			   unsigned int *hash_r)
1564 {
1565 	const char *error;
1566 
1567 	if (mail_user_hash(username, dir->set->director_username_hash, hash_r,
1568 			   &error))
1569 		return TRUE;
1570 	e_error(dir->event, "Failed to expand director_username_hash=%s: %s",
1571 		dir->set->director_username_hash, error);
1572 	return FALSE;
1573 }
1574 
/* Initialize the file-global log throttles. Both use
   director_log_throttle_settings and differ only in their callbacks. */
void directors_init(void)
{
	/* throttles the "move timed out" errors */
	user_move_throttle =
		log_throttle_init(&director_log_throttle_settings,
				  director_user_move_throttled, NULL);
	/* throttles the "failed to kill user connections" errors */
	user_kill_fail_throttle =
		log_throttle_init(&director_log_throttle_settings,
				  director_user_kill_fail_throttled, NULL);
}
1584 
/* Free the log throttles created by directors_init(). */
void directors_deinit(void)
{
	log_throttle_deinit(&user_move_throttle);
	log_throttle_deinit(&user_kill_fail_throttle);
}
1590