1 /* Copyright (c) 2010-2018 Dovecot authors, see the included COPYING file */
2
3 #include "lib.h"
4 #include "ioloop.h"
5 #include "array.h"
6 #include "str.h"
7 #include "strescape.h"
8 #include "log-throttle.h"
9 #include "ipc-client.h"
10 #include "program-client.h"
11 #include "var-expand.h"
12 #include "istream.h"
13 #include "ostream.h"
14 #include "iostream-temp.h"
15 #include "mail-user-hash.h"
16 #include "user-directory.h"
17 #include "mail-host.h"
18 #include "director-host.h"
19 #include "director-connection.h"
20 #include "director.h"
21
/* NOTE(review): the use of DIRECTOR_IPC_PROXY_PATH isn't visible in this
   part of the file - presumably it's the socket used by the ipc proxy
   client; confirm at the use site. */
#define DIRECTOR_IPC_PROXY_PATH "ipc"
/* DNS client socket path given to the flush script's program client */
#define DIRECTOR_DNS_SOCKET_PATH "dns-client"
/* director_connect() skips a host if its last network failure happened
   less than this many seconds ago */
#define DIRECTOR_RECONNECT_RETRY_SECS 60
/* When the ring becomes synced while our right side isn't the preferred
   host, wait this long before trying to connect to the preferred host */
#define DIRECTOR_RECONNECT_TIMEOUT_MSECS (30*1000)
/* NOTE(review): not referenced in this part of the file - presumably the
   maximum time a user move may take; confirm at the use site */
#define DIRECTOR_USER_MOVE_TIMEOUT_MSECS (30*1000)
/* If the ring stays unsynced for this long, director_sync_timeout() is
   called and it attempts to resend the SYNC */
#define DIRECTOR_SYNC_TIMEOUT_MSECS (5*1000)
/* Keep retrying connections to the other directors for at least this long
   before assuming that we're alone in the ring */
#define DIRECTOR_RING_MIN_WAIT_SECS 20
/* Delay between connection retries while waiting for the other directors */
#define DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS 1000
/* Delay before directors marked as removed are purged from dir_hosts
   (they're purged earlier if the ring becomes synced before that) */
#define DIRECTOR_DELAYED_DIR_REMOVE_MSECS (1000*30)
31
/* Human-readable names of the user kill states, indexed by the kill_state
   value. Used in debug log messages. The array has USER_KILL_STATE_DELAY+1
   elements, so USER_KILL_STATE_DELAY is the last valid index.
   NOTE(review): the order has to match the state enum, which is declared
   outside this file - verify when adding new states. */
const char *user_kill_state_names[USER_KILL_STATE_DELAY+1] = {
	"none",
	"killing",
	"notify-received",
	"waiting-for-notify",
	"waiting-for-everyone",
	"flushing",
	"delay",
};
41
/* Event category named "director".
   NOTE(review): where it gets attached to an event isn't visible in this
   part of the file. */
static struct event_category event_category_director = {
	.name = "director",
};
45
/* Log throttles.
   NOTE(review): their initialization and use aren't visible in this part of
   the file - judging by the names they rate-limit the logging of user moves
   and of user kill failures; confirm at the use sites. */
static struct log_throttle *user_move_throttle;
static struct log_throttle *user_kill_fail_throttle;

/* Defined further below; director_set_ring_synced() needs it earlier */
static void director_hosts_purge_removed(struct director *dir);

/* Throttling thresholds.
   NOTE(review): presumably used for both of the throttles above; the exact
   meaning of the fields is defined by the log-throttle API. */
static const struct log_throttle_settings director_log_throttle_settings = {
	.throttle_at_max_per_interval = 100,
	.unthrottle_at_max_per_interval = 2,
};
55
/* Defined further below; the user flushing functions need it earlier */
static void
director_user_kill_finish_delayed(struct director *dir, struct user *user,
				  bool skip_delay);
59
director_is_self_ip_set(struct director * dir)60 static bool director_is_self_ip_set(struct director *dir)
61 {
62 if (net_ip_compare(&dir->self_ip, &net_ip4_any))
63 return FALSE;
64
65 if (net_ip_compare(&dir->self_ip, &net_ip6_any))
66 return FALSE;
67
68 return TRUE;
69 }
70
director_find_self_ip(struct director * dir)71 static void director_find_self_ip(struct director *dir)
72 {
73 struct director_host *const *hosts;
74 unsigned int i, count;
75
76 hosts = array_get(&dir->dir_hosts, &count);
77 for (i = 0; i < count; i++) {
78 if (net_try_bind(&hosts[i]->ip) == 0) {
79 dir->self_ip = hosts[i]->ip;
80 return;
81 }
82 }
83 i_fatal("director_servers doesn't list ourself");
84 }
85
director_find_self(struct director * dir)86 void director_find_self(struct director *dir)
87 {
88 if (dir->self_host != NULL)
89 return;
90
91 if (!director_is_self_ip_set(dir))
92 director_find_self_ip(dir);
93
94 dir->self_host = director_host_lookup(dir, &dir->self_ip,
95 dir->self_port);
96 if (dir->self_host == NULL) {
97 i_fatal("director_servers doesn't list ourself (%s:%u)",
98 net_ip2addr(&dir->self_ip), dir->self_port);
99 }
100 dir->self_host->self = TRUE;
101 }
102
director_find_self_idx(struct director * dir)103 static unsigned int director_find_self_idx(struct director *dir)
104 {
105 struct director_host *const *hosts;
106 unsigned int i, count;
107
108 i_assert(dir->self_host != NULL);
109
110 hosts = array_get(&dir->dir_hosts, &count);
111 for (i = 0; i < count; i++) {
112 if (hosts[i] == dir->self_host)
113 return i;
114 }
115 i_unreached();
116 }
117
118 static bool
director_has_outgoing_connection(struct director * dir,struct director_host * host)119 director_has_outgoing_connection(struct director *dir,
120 struct director_host *host)
121 {
122 struct director_connection *conn;
123
124 array_foreach_elem(&dir->connections, conn) {
125 if (director_connection_get_host(conn) == host &&
126 !director_connection_is_incoming(conn))
127 return TRUE;
128 }
129 return FALSE;
130 }
131
132 static void
director_log_connect(struct director * dir,struct director_host * host,const char * reason)133 director_log_connect(struct director *dir, struct director_host *host,
134 const char *reason)
135 {
136 string_t *str = t_str_new(128);
137
138 if (host->last_network_failure > 0) {
139 str_printfa(str, ", last network failure %ds ago",
140 (int)(ioloop_time - host->last_network_failure));
141 }
142 if (host->last_protocol_failure > 0) {
143 str_printfa(str, ", last protocol failure %ds ago",
144 (int)(ioloop_time - host->last_protocol_failure));
145 }
146 e_info(dir->event, "Connecting to %s:%u (as %s%s): %s",
147 host->ip_str, host->port,
148 net_ip2addr(&dir->self_ip), str_c(str), reason);
149 }
150
director_connect_host(struct director * dir,struct director_host * host,const char * reason)151 int director_connect_host(struct director *dir, struct director_host *host,
152 const char *reason)
153 {
154 in_port_t port;
155 int fd;
156
157 if (director_has_outgoing_connection(dir, host))
158 return 0;
159
160 director_log_connect(dir, host, reason);
161 port = dir->test_port != 0 ? dir->test_port : host->port;
162 fd = net_connect_ip(&host->ip, port, &dir->self_ip);
163 if (fd == -1) {
164 host->last_network_failure = ioloop_time;
165 e_error(dir->event, "connect(%s) failed: %m", host->name);
166 return -1;
167 }
168 /* Reset timestamp so that director_connect() won't skip this host
169 while we're still trying to connect to it */
170 host->last_network_failure = 0;
171
172 (void)director_connection_init_out(dir, fd, host);
173 return 0;
174 }
175
176 static struct director_host *
director_get_preferred_right_host(struct director * dir)177 director_get_preferred_right_host(struct director *dir)
178 {
179 struct director_host *const *hosts, *host;
180 unsigned int i, count, self_idx;
181
182 hosts = array_get(&dir->dir_hosts, &count);
183 if (count == 1) {
184 /* self */
185 return NULL;
186 }
187
188 self_idx = director_find_self_idx(dir);
189 for (i = 0; i < count; i++) {
190 host = hosts[(self_idx + i + 1) % count];
191 if (!host->removed)
192 return host;
193 }
194 /* self, with some removed hosts */
195 return NULL;
196 }
197
/* Timeout callback set by director_wait_for_others(): retry connecting to
   the other directors. */
static void director_quick_reconnect_retry(struct director *dir)
{
	const char *reason =
		"Alone in director ring - trying to connect to others";

	director_connect(dir, reason);
}
202
director_wait_for_others(struct director * dir)203 static bool director_wait_for_others(struct director *dir)
204 {
205 struct director_host *host;
206
207 /* don't assume we're alone until we've attempted to connect
208 to others for a while */
209 if (dir->ring_first_alone != 0 &&
210 ioloop_time - dir->ring_first_alone > DIRECTOR_RING_MIN_WAIT_SECS)
211 return FALSE;
212
213 if (dir->ring_first_alone == 0)
214 dir->ring_first_alone = ioloop_time;
215 /* reset all failures and try again */
216 array_foreach_elem(&dir->dir_hosts, host) {
217 host->last_network_failure = 0;
218 host->last_protocol_failure = 0;
219 }
220 timeout_remove(&dir->to_reconnect);
221 dir->to_reconnect = timeout_add(DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS,
222 director_quick_reconnect_retry, dir);
223 return TRUE;
224 }
225
director_connect(struct director * dir,const char * reason)226 void director_connect(struct director *dir, const char *reason)
227 {
228 struct director_host *const *hosts;
229 unsigned int i, count, self_idx;
230
231 self_idx = director_find_self_idx(dir);
232
233 /* try to connect to first working server on our right side.
234 the left side is supposed to connect to us. */
235 hosts = array_get(&dir->dir_hosts, &count);
236 for (i = 1; i < count; i++) {
237 unsigned int idx = (self_idx + i) % count;
238
239 if (hosts[idx]->removed)
240 continue;
241
242 if (hosts[idx]->last_network_failure +
243 DIRECTOR_RECONNECT_RETRY_SECS > ioloop_time) {
244 /* connection failed recently, don't try retrying here */
245 continue;
246 }
247 if (hosts[idx]->last_protocol_failure +
248 DIRECTOR_PROTOCOL_FAILURE_RETRY_SECS > ioloop_time) {
249 /* the director recently sent invalid protocol data,
250 don't try retrying yet */
251 continue;
252 }
253
254 if (director_connect_host(dir, hosts[idx], reason) == 0) {
255 /* success */
256 return;
257 }
258 }
259
260 if (count > 1 && director_wait_for_others(dir))
261 return;
262
263 /* we're the only one */
264 if (count > 1) {
265 e_warning(dir->event,
266 "director: Couldn't connect to right side, "
267 "we must be the only director left");
268 }
269 if (dir->left != NULL) {
270 /* since we couldn't connect to it,
271 it must have failed recently */
272 e_warning(dir->event,
273 "director: Assuming %s is dead, disconnecting",
274 director_connection_get_name(dir->left));
275 director_connection_deinit(&dir->left,
276 "This connection is dead?");
277 }
278 dir->ring_min_version = DIRECTOR_VERSION_MINOR;
279 if (!dir->ring_handshaked)
280 director_set_ring_handshaked(dir);
281 else if (!dir->ring_synced)
282 director_set_ring_synced(dir);
283 }
284
director_set_ring_handshaked(struct director * dir)285 void director_set_ring_handshaked(struct director *dir)
286 {
287 i_assert(!dir->ring_handshaked);
288
289 timeout_remove(&dir->to_handshake_warning);
290 if (dir->ring_handshake_warning_sent) {
291 e_warning(dir->event,
292 "Directors have been connected, "
293 "continuing delayed requests");
294 dir->ring_handshake_warning_sent = FALSE;
295 }
296 e_debug(dir->event, "Director ring handshaked");
297
298 dir->ring_handshaked = TRUE;
299 director_set_ring_synced(dir);
300 }
301
director_reconnect_timeout(struct director * dir)302 static void director_reconnect_timeout(struct director *dir)
303 {
304 struct director_host *cur_host, *preferred_host =
305 director_get_preferred_right_host(dir);
306
307 cur_host = dir->right == NULL ? NULL :
308 director_connection_get_host(dir->right);
309
310 if (preferred_host == NULL) {
311 /* all directors have been removed, try again later */
312 } else if (cur_host != preferred_host) {
313 (void)director_connect_host(dir, preferred_host,
314 "Reconnect attempt to preferred director");
315 } else {
316 /* the connection hasn't finished sync yet.
317 keep this timeout for now. */
318 }
319 }
320
director_set_ring_synced(struct director * dir)321 void director_set_ring_synced(struct director *dir)
322 {
323 struct director_host *host;
324
325 i_assert(!dir->ring_synced);
326 i_assert((dir->left != NULL && dir->right != NULL) ||
327 (dir->left == NULL && dir->right == NULL));
328
329 timeout_remove(&dir->to_handshake_warning);
330 if (dir->ring_handshake_warning_sent) {
331 e_warning(dir->event,
332 "Ring is synced, continuing delayed requests "
333 "(syncing took %d secs, hosts_hash=%u)",
334 (int)(ioloop_time - dir->ring_last_sync_time),
335 mail_hosts_hash(dir->mail_hosts));
336 dir->ring_handshake_warning_sent = FALSE;
337 }
338
339 host = dir->right == NULL ? NULL :
340 director_connection_get_host(dir->right);
341
342 timeout_remove(&dir->to_reconnect);
343 if (host != director_get_preferred_right_host(dir)) {
344 /* try to reconnect to preferred host later */
345 dir->to_reconnect =
346 timeout_add(DIRECTOR_RECONNECT_TIMEOUT_MSECS,
347 director_reconnect_timeout, dir);
348 }
349
350 if (dir->left != NULL)
351 director_connection_set_synced(dir->left, TRUE);
352 if (dir->right != NULL)
353 director_connection_set_synced(dir->right, TRUE);
354 timeout_remove(&dir->to_sync);
355 dir->ring_synced = TRUE;
356 dir->ring_last_sync_time = ioloop_time;
357 /* If there are any director hosts still marked as "removed", we can
358 safely remove those now. The entire director cluster knows about the
359 removal now. */
360 director_hosts_purge_removed(dir);
361 mail_hosts_set_synced(dir->mail_hosts);
362 director_set_state_changed(dir);
363 }
364
/* Send a SYNC command for the given originating host to our right side
   connection and ping both of our connections. When sending our own SYNC,
   the ring change counter and the sync start time are remembered.
   dir->right is used without a NULL check, so the caller is expected to
   make sure that it exists. */
void director_sync_send(struct director *dir, struct director_host *host,
			uint32_t seq, unsigned int minor_version,
			unsigned int timestamp, unsigned int hosts_hash)
{
	string_t *cmd;

	if (host == dir->self_host) {
		dir->last_sync_sent_ring_change_counter = dir->ring_change_counter;
		dir->last_sync_start_time = ioloop_timeval;
	}

	cmd = t_str_new(128);
	str_printfa(cmd, "SYNC\t%s\t%u\t%u",
		    host->ip_str, host->port, seq);
	if (minor_version > 0 &&
	    director_connection_get_minor_version(dir->right) > 0) {
		/* the extra parameters are sent only when both the SYNC and
		   the receiver have minor_version>0 */
		str_printfa(cmd, "\t%u\t%u\t%u", minor_version,
			    timestamp, hosts_hash);
	}
	str_append_c(cmd, '\n');
	director_connection_send(dir->right, str_c(cmd));

	/* ping the connections, so a hanging connection is noticed
	   as quickly as possible */
	if (dir->left != NULL)
		director_connection_ping(dir->left);
	director_connection_ping(dir->right);
}
394
395 static bool
director_has_any_outgoing_connections(struct director * dir)396 director_has_any_outgoing_connections(struct director *dir)
397 {
398 struct director_connection *conn;
399
400 array_foreach_elem(&dir->connections, conn) {
401 if (!director_connection_is_incoming(conn))
402 return TRUE;
403 }
404 return FALSE;
405 }
406
director_resend_sync(struct director * dir)407 bool director_resend_sync(struct director *dir)
408 {
409 if (dir->ring_synced) {
410 /* everything ok, no need to do anything */
411 return FALSE;
412 }
413
414 if (dir->right == NULL) {
415 /* right side connection is missing. make sure we're not
416 hanging due to some bug. */
417 if (dir->to_reconnect == NULL &&
418 !director_has_any_outgoing_connections(dir)) {
419 e_warning(dir->event,
420 "Right side connection is unexpectedly lost, reconnecting");
421 director_connect(dir, "Right side connection lost");
422 }
423 } else if (dir->left != NULL) {
424 /* send a new SYNC in case the previous one got dropped */
425 dir->self_host->last_sync_timestamp = ioloop_time;
426 director_sync_send(dir, dir->self_host, dir->sync_seq,
427 DIRECTOR_VERSION_MINOR, ioloop_time,
428 mail_hosts_hash(dir->mail_hosts));
429 if (dir->to_sync != NULL)
430 timeout_reset(dir->to_sync);
431 return TRUE;
432 }
433 return FALSE;
434 }
435
director_sync_timeout(struct director * dir)436 static void director_sync_timeout(struct director *dir)
437 {
438 i_assert(!dir->ring_synced);
439
440 if (director_resend_sync(dir))
441 e_error(dir->event, "Ring SYNC seq=%u appears to have got lost, resending", dir->sync_seq);
442 }
443
director_set_ring_unsynced(struct director * dir)444 void director_set_ring_unsynced(struct director *dir)
445 {
446 if (dir->ring_synced) {
447 dir->ring_synced = FALSE;
448 dir->ring_last_sync_time = ioloop_time;
449 }
450
451 if (dir->to_sync == NULL) {
452 dir->to_sync = timeout_add(DIRECTOR_SYNC_TIMEOUT_MSECS,
453 director_sync_timeout, dir);
454 } else {
455 timeout_reset(dir->to_sync);
456 }
457 }
458
director_sync(struct director * dir)459 static void director_sync(struct director *dir)
460 {
461 /* we're synced again when we receive this SYNC back */
462 dir->sync_seq++;
463 if (dir->right == NULL && dir->left == NULL) {
464 /* we're alone. if we're already synced,
465 don't become unsynced. */
466 return;
467 }
468 director_set_ring_unsynced(dir);
469
470 if (dir->sync_frozen) {
471 dir->sync_pending = TRUE;
472 return;
473 }
474 if (dir->right == NULL) {
475 i_assert(!dir->ring_synced ||
476 (dir->left == NULL && dir->right == NULL));
477 e_debug(dir->event, "Ring is desynced (seq=%u, no right connection)",
478 dir->sync_seq);
479 return;
480 }
481
482 e_debug(dir->event, "Ring is desynced (seq=%u, sending SYNC to %s)",
483 dir->sync_seq, dir->right == NULL ? "(nowhere)" :
484 director_connection_get_name(dir->right));
485
486 /* send PINGs to our connections more rapidly until we've synced again.
487 if the connection has actually died, we don't need to wait (and
488 delay requests) for as long to detect it */
489 if (dir->left != NULL)
490 director_connection_set_synced(dir->left, FALSE);
491 director_connection_set_synced(dir->right, FALSE);
492 director_sync_send(dir, dir->self_host, dir->sync_seq,
493 DIRECTOR_VERSION_MINOR, ioloop_time,
494 mail_hosts_hash(dir->mail_hosts));
495 }
496
director_sync_freeze(struct director * dir)497 void director_sync_freeze(struct director *dir)
498 {
499 struct director_connection *conn;
500
501 i_assert(!dir->sync_frozen);
502 i_assert(!dir->sync_pending);
503
504 array_foreach_elem(&dir->connections, conn)
505 director_connection_cork(conn);
506 dir->sync_frozen = TRUE;
507 }
508
director_sync_thaw(struct director * dir)509 void director_sync_thaw(struct director *dir)
510 {
511 struct director_connection *conn;
512
513 i_assert(dir->sync_frozen);
514
515 dir->sync_frozen = FALSE;
516 if (dir->sync_pending) {
517 dir->sync_pending = FALSE;
518 director_sync(dir);
519 }
520 array_foreach_elem(&dir->connections, conn)
521 director_connection_uncork(conn);
522 }
523
/* Tell the ring that a director was added: increase the ring change counter
   and send a DIRECTOR command via director_update_send() with src as the
   source. The addition is logged only if log=TRUE. */
void director_notify_ring_added(struct director_host *added_host,
				struct director_host *src, bool log)
{
	struct director *dir = added_host->dir;
	const char *cmd;

	if (log) {
		e_info(dir->event,
		       "Adding director %s to ring (requested by %s)",
		       added_host->name, src->name);
	}

	dir->ring_change_counter++;
	cmd = t_strdup_printf("DIRECTOR\t%s\t%u\n",
			      added_host->ip_str, added_host->port);
	director_update_send(dir, src, cmd);
}
540
director_hosts_purge_removed(struct director * dir)541 static void director_hosts_purge_removed(struct director *dir)
542 {
543 struct director_host *const *hosts, *host;
544 unsigned int i, count;
545
546 timeout_remove(&dir->to_remove_dirs);
547
548 hosts = array_get(&dir->dir_hosts, &count);
549 for (i = 0; i < count; ) {
550 if (hosts[i]->removed) {
551 host = hosts[i];
552 director_host_free(&host);
553 hosts = array_get(&dir->dir_hosts, &count);
554 } else {
555 i++;
556 }
557 }
558 }
559
director_ring_remove(struct director_host * removed_host,struct director_host * src)560 void director_ring_remove(struct director_host *removed_host,
561 struct director_host *src)
562 {
563 struct director *dir = removed_host->dir;
564 struct director_connection *const *conns, *conn;
565 unsigned int i, count;
566 const char *cmd;
567
568 e_info(dir->event, "Removing director %s from ring (requested by %s)",
569 removed_host->name, src->name);
570
571 if (removed_host->self && !src->self) {
572 /* others will just disconnect us */
573 return;
574 }
575
576 if (!removed_host->self) {
577 /* mark the host as removed and fully remove it later. this
578 delay is needed, because the removal may trigger director
579 reconnections, which may send the director back and we don't
580 want to re-add it */
581 removed_host->removed = TRUE;
582 if (dir->to_remove_dirs == NULL) {
583 dir->to_remove_dirs =
584 timeout_add(DIRECTOR_DELAYED_DIR_REMOVE_MSECS,
585 director_hosts_purge_removed, dir);
586 }
587 }
588
589 /* if our left or ride side gets removed, notify them first
590 before disconnecting. */
591 cmd = t_strdup_printf("DIRECTOR-REMOVE\t%s\t%u\n",
592 removed_host->ip_str, removed_host->port);
593 director_update_send_version(dir, src,
594 DIRECTOR_VERSION_RING_REMOVE, cmd);
595
596 /* disconnect any connections to the host */
597 conns = array_get(&dir->connections, &count);
598 for (i = 0; i < count; ) {
599 conn = conns[i];
600 if (director_connection_get_host(conn) != removed_host ||
601 removed_host->self)
602 i++;
603 else {
604 director_connection_deinit(&conn, "Removing from ring");
605 conns = array_get(&dir->connections, &count);
606 }
607 }
608 if (dir->right == NULL)
609 director_connect(dir, "Reconnecting after director was removed");
610 director_sync(dir);
611 }
612
613 static void
director_send_host(struct director * dir,struct director_host * src,struct director_host * orig_src,struct mail_host * host)614 director_send_host(struct director *dir, struct director_host *src,
615 struct director_host *orig_src,
616 struct mail_host *host)
617 {
618 const char *host_tag = mail_host_get_tag(host);
619 string_t *str;
620
621 if (orig_src == NULL) {
622 orig_src = dir->self_host;
623 orig_src->last_seq++;
624 }
625
626 str = t_str_new(128);
627 str_printfa(str, "HOST\t%s\t%u\t%u\t%s\t%u",
628 orig_src->ip_str, orig_src->port, orig_src->last_seq,
629 host->ip_str, host->vhost_count);
630 if (dir->ring_min_version >= DIRECTOR_VERSION_TAGS_V2) {
631 str_append_c(str, '\t');
632 str_append_tabescaped(str, host_tag);
633 } else if (host_tag[0] != '\0' &&
634 dir->ring_min_version < DIRECTOR_VERSION_TAGS_V2) {
635 if (dir->ring_min_version < DIRECTOR_VERSION_TAGS) {
636 e_error(dir->event, "Ring has directors that don't support tags - removing host %s with tag '%s'",
637 host->ip_str, host_tag);
638 } else {
639 e_error(dir->event, "Ring has directors that support mixed versions of tags - removing host %s with tag '%s'",
640 host->ip_str, host_tag);
641 }
642 director_remove_host(dir, NULL, NULL, host);
643 return;
644 }
645 if (dir->ring_min_version >= DIRECTOR_VERSION_UPDOWN) {
646 str_printfa(str, "\t%c%ld\t", host->down ? 'D' : 'U',
647 (long)host->last_updown_change);
648 /* add any further version checks here - these directors ignore
649 any extra unknown arguments */
650 if (host->hostname != NULL)
651 str_append_tabescaped(str, host->hostname);
652 }
653 str_append_c(str, '\n');
654 director_update_send(dir, src, str_c(str));
655 }
656
director_resend_hosts(struct director * dir)657 void director_resend_hosts(struct director *dir)
658 {
659 struct mail_host *host;
660
661 array_foreach_elem(mail_hosts_get(dir->mail_hosts), host)
662 director_send_host(dir, dir->self_host, NULL, host);
663 }
664
director_update_host(struct director * dir,struct director_host * src,struct director_host * orig_src,struct mail_host * host)665 void director_update_host(struct director *dir, struct director_host *src,
666 struct director_host *orig_src,
667 struct mail_host *host)
668 {
669 /* update state in case this is the first mail host being added */
670 director_set_state_changed(dir);
671
672 e_debug(dir->event, "Updating host %s vhost_count=%u "
673 "down=%d last_updown_change=%ld (hosts_hash=%u)",
674 host->ip_str, host->vhost_count, host->down ? 1 : 0,
675 (long)host->last_updown_change,
676 mail_hosts_hash(dir->mail_hosts));
677
678 director_send_host(dir, src, orig_src, host);
679
680 /* mark the host desynced until ring is synced again. except if we're
681 alone in the ring that never happens. */
682 if (dir->right != NULL || dir->left != NULL)
683 host->desynced = TRUE;
684 director_sync(dir);
685 }
686
director_remove_host(struct director * dir,struct director_host * src,struct director_host * orig_src,struct mail_host * host)687 void director_remove_host(struct director *dir, struct director_host *src,
688 struct director_host *orig_src,
689 struct mail_host *host)
690 {
691 struct user_directory *users = host->tag->users;
692
693 if (src != NULL) {
694 if (orig_src == NULL) {
695 orig_src = dir->self_host;
696 orig_src->last_seq++;
697 }
698
699 director_update_send(dir, src, t_strdup_printf(
700 "HOST-REMOVE\t%s\t%u\t%u\t%s\n",
701 orig_src->ip_str, orig_src->port,
702 orig_src->last_seq, host->ip_str));
703 }
704
705 user_directory_remove_host(users, host);
706 mail_host_remove(host);
707 director_sync(dir);
708 }
709
director_flush_host(struct director * dir,struct director_host * src,struct director_host * orig_src,struct mail_host * host)710 void director_flush_host(struct director *dir, struct director_host *src,
711 struct director_host *orig_src,
712 struct mail_host *host)
713 {
714 struct user_directory *users = host->tag->users;
715
716 if (orig_src == NULL) {
717 orig_src = dir->self_host;
718 orig_src->last_seq++;
719 }
720
721 director_update_send(dir, src, t_strdup_printf(
722 "HOST-FLUSH\t%s\t%u\t%u\t%s\n",
723 orig_src->ip_str, orig_src->port, orig_src->last_seq,
724 host->ip_str));
725 user_directory_remove_host(users, host);
726 director_sync(dir);
727 }
728
director_update_user(struct director * dir,struct director_host * src,struct user * user)729 void director_update_user(struct director *dir, struct director_host *src,
730 struct user *user)
731 {
732 struct director_connection *conn;
733
734 i_assert(src != NULL);
735 i_assert(!user->weak);
736
737 array_foreach_elem(&dir->connections, conn) {
738 if (director_connection_get_host(conn) == src)
739 continue;
740
741 if (director_connection_get_minor_version(conn) >= DIRECTOR_VERSION_USER_TIMESTAMP) {
742 director_connection_send(conn, t_strdup_printf(
743 "USER\t%u\t%s\t%u\n", user->username_hash, user->host->ip_str,
744 user->timestamp));
745 } else {
746 director_connection_send(conn, t_strdup_printf(
747 "USER\t%u\t%s\n", user->username_hash, user->host->ip_str));
748 }
749 }
750 }
751
director_update_user_weak(struct director * dir,struct director_host * src,struct director_connection * src_conn,struct director_host * orig_src,struct user * user)752 void director_update_user_weak(struct director *dir, struct director_host *src,
753 struct director_connection *src_conn,
754 struct director_host *orig_src,
755 struct user *user)
756 {
757 const char *cmd;
758
759 i_assert(src != NULL);
760 i_assert(user->weak);
761
762 if (orig_src == NULL) {
763 orig_src = dir->self_host;
764 orig_src->last_seq++;
765 }
766
767 cmd = t_strdup_printf("USER-WEAK\t%s\t%u\t%u\t%u\t%s\n",
768 orig_src->ip_str, orig_src->port, orig_src->last_seq,
769 user->username_hash, user->host->ip_str);
770
771 if (src != dir->self_host && dir->left != NULL && dir->right != NULL &&
772 director_connection_get_host(dir->left) ==
773 director_connection_get_host(dir->right)) {
774 /* only two directors in this ring and we're forwarding
775 USER-WEAK from one director back to itself via another
776 so it sees we've received it. we can't use
777 director_update_send() for this, because it doesn't send
778 data back to the source. */
779 if (dir->right == src_conn)
780 director_connection_send(dir->left, cmd);
781 else if (dir->left == src_conn)
782 director_connection_send(dir->right, cmd);
783 else
784 i_unreached();
785 } else {
786 director_update_send(dir, src, cmd);
787 }
788 }
789
790 static void
director_flush_user_continue(enum program_client_exit_status result,struct director_kill_context * ctx)791 director_flush_user_continue(enum program_client_exit_status result,
792 struct director_kill_context *ctx)
793 {
794 struct director *dir = ctx->dir;
795 ctx->callback_pending = FALSE;
796
797 struct user *user = user_directory_lookup(ctx->tag->users,
798 ctx->username_hash);
799
800 if (result == PROGRAM_CLIENT_EXIT_STATUS_FAILURE) {
801 struct istream *is = iostream_temp_finish(&ctx->reply, SIZE_MAX);
802 char *data;
803 i_stream_set_return_partial_line(is, TRUE);
804 data = i_stream_read_next_line(is);
805 e_error(dir->event, "%s: Failed to flush user hash %u in host %s: %s",
806 ctx->socket_path,
807 ctx->username_hash,
808 net_ip2addr(&ctx->host_ip),
809 data == NULL ? "(no output to stdout)" : data);
810 while((data = i_stream_read_next_line(is)) != NULL) {
811 e_error(dir->event, "%s: Failed to flush user hash %u in host %s: %s",
812 ctx->socket_path,
813 ctx->username_hash,
814 net_ip2addr(&ctx->host_ip), data);
815 }
816 i_stream_unref(&is);
817 } else {
818 o_stream_unref(&ctx->reply);
819 }
820 program_client_destroy(&ctx->pclient);
821
822 if (!DIRECTOR_KILL_CONTEXT_IS_VALID(user, ctx)) {
823 /* user was already freed - ignore */
824 e_debug(dir->event, "User %u freed while flushing, result=%d",
825 ctx->username_hash, result);
826 i_assert(ctx->to_move == NULL);
827 i_free(ctx);
828 } else {
829 /* ctx is freed later via user->kill_ctx */
830 e_debug(dir->event, "Flushing user %u finished, result=%d",
831 ctx->username_hash, result);
832 director_user_kill_finish_delayed(dir, user,
833 result == PROGRAM_CLIENT_EXIT_STATUS_SUCCESS);
834 }
835 }
836
837 static void
director_flush_user(struct director * dir,struct user * user)838 director_flush_user(struct director *dir, struct user *user)
839 {
840 struct director_kill_context *ctx = user->kill_ctx;
841 struct var_expand_table tab[] = {
842 { 'i', user->host->ip_str, "ip" },
843 { 'h', user->host->hostname, "host" },
844 { '\0', NULL, NULL }
845 };
846 const char *error;
847
848 /* Execute flush script, if set. Only the director that started the
849 user moving will call the flush script. Having each director do it
850 would be redundant since they're all supposed to be performing the
851 same flush task to the same backend.
852
853 Flushing is also not triggered if we're moving a user that we just
854 created due to the user move. This means that the user doesn't have
855 an old host, so we couldn't really even perform any flushing on the
856 backend. */
857 if (*dir->set->director_flush_socket == '\0' ||
858 ctx->old_host_ip.family == 0 ||
859 !ctx->kill_is_self_initiated) {
860 director_user_kill_finish_delayed(dir, user, FALSE);
861 return;
862 }
863
864 ctx->host_ip = user->host->ip;
865
866 string_t *s_sock = str_new(default_pool, 32);
867 if (var_expand(s_sock, dir->set->director_flush_socket, tab, &error) <= 0) {
868 e_error(dir->event, "Failed to expand director_flush_socket=%s: %s",
869 dir->set->director_flush_socket, error);
870 director_user_kill_finish_delayed(dir, user, FALSE);
871 return;
872 }
873 ctx->socket_path = str_free_without_data(&s_sock);
874
875 struct program_client_settings set = {
876 .client_connect_timeout_msecs = 10000,
877 .dns_client_socket_path = DIRECTOR_DNS_SOCKET_PATH,
878 };
879
880 restrict_access_init(&set.restrict_set);
881
882 const char *const args[] = {
883 "FLUSH",
884 t_strdup_printf("%u", user->username_hash),
885 net_ip2addr(&ctx->old_host_ip),
886 user->host->ip_str,
887 ctx->old_host_down ? "down" : "up",
888 dec2str(ctx->old_host_vhost_count),
889 NULL
890 };
891
892 ctx->kill_state = USER_KILL_STATE_FLUSHING;
893 e_debug(dir->event, "Flushing user %u via %s", user->username_hash,
894 ctx->socket_path);
895
896 if ((program_client_create(ctx->socket_path, args, &set, FALSE,
897 &ctx->pclient, &error)) != 0) {
898 e_error(dir->event, "%s: Failed to flush user hash %u in host %s: %s",
899 ctx->socket_path,
900 user->username_hash,
901 user->host->ip_str,
902 error);
903 director_flush_user_continue(PROGRAM_CLIENT_EXIT_STATUS_FAILURE,
904 ctx);
905 return;
906 }
907
908 ctx->reply =
909 iostream_temp_create_named("/tmp", 0,
910 t_strdup_printf("flush response from %s",
911 user->host->ip_str));
912 o_stream_set_no_error_handling(ctx->reply, TRUE);
913 program_client_set_output(ctx->pclient, ctx->reply);
914 ctx->callback_pending = TRUE;
915 program_client_run_async(ctx->pclient, director_flush_user_continue, ctx);
916 }
917
director_user_move_finished(struct director * dir)918 static void director_user_move_finished(struct director *dir)
919 {
920 i_assert(dir->users_moving_count > 0);
921 dir->users_moving_count--;
922
923 director_set_state_changed(dir);
924 }
925
director_user_move_free(struct user * user)926 static void director_user_move_free(struct user *user)
927 {
928 struct director *dir = user->kill_ctx->dir;
929 struct director_kill_context *kill_ctx = user->kill_ctx;
930
931 i_assert(kill_ctx != NULL);
932
933 e_debug(dir->event, "User %u move finished at state=%s", user->username_hash,
934 user_kill_state_names[kill_ctx->kill_state]);
935
936 if (kill_ctx->ipc_cmd != NULL)
937 ipc_client_cmd_abort(dir->ipc_proxy, &kill_ctx->ipc_cmd);
938 timeout_remove(&kill_ctx->to_move);
939 i_free(kill_ctx->socket_path);
940 i_free(kill_ctx);
941 user->kill_ctx = NULL;
942
943 director_user_move_finished(dir);
944 }
945
946 static void
director_user_kill_finish_delayed_to(struct user * user)947 director_user_kill_finish_delayed_to(struct user *user)
948 {
949 i_assert(user->kill_ctx != NULL);
950 i_assert(user->kill_ctx->kill_state == USER_KILL_STATE_DELAY);
951
952 director_user_move_free(user);
953 }
954
955 static void
director_user_kill_finish_delayed(struct director * dir,struct user * user,bool skip_delay)956 director_user_kill_finish_delayed(struct director *dir, struct user *user,
957 bool skip_delay)
958 {
959 if (skip_delay) {
960 user->kill_ctx->kill_state = USER_KILL_STATE_NONE;
961 director_user_move_free(user);
962 return;
963 }
964
965 user->kill_ctx->kill_state = USER_KILL_STATE_DELAY;
966
967 /* wait for a while for the kills to finish in the backend server,
968 so there are no longer any processes running for the user before we
969 start letting new in connections to the new server. */
970 timeout_remove(&user->kill_ctx->to_move);
971 user->kill_ctx->to_move =
972 timeout_add(dir->set->director_user_kick_delay * 1000,
973 director_user_kill_finish_delayed_to, user);
974 }
975
976 static void
director_finish_user_kill(struct director * dir,struct user * user,bool self)977 director_finish_user_kill(struct director *dir, struct user *user, bool self)
978 {
979 struct director_kill_context *kill_ctx = user->kill_ctx;
980
981 i_assert(kill_ctx != NULL);
982 i_assert(kill_ctx->kill_state != USER_KILL_STATE_FLUSHING);
983 i_assert(kill_ctx->kill_state != USER_KILL_STATE_DELAY);
984
985 e_debug(dir->event, "User %u kill finished - %sstate=%s", user->username_hash,
986 self ? "we started it " : "",
987 user_kill_state_names[kill_ctx->kill_state]);
988
989 if (dir->right == NULL) {
990 /* we're alone */
991 director_flush_user(dir, user);
992 } else if (self ||
993 kill_ctx->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED) {
994 director_connection_send(dir->right, t_strdup_printf(
995 "USER-KILLED\t%u\n", user->username_hash));
996 kill_ctx->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE;
997 } else {
998 i_assert(kill_ctx->kill_state == USER_KILL_STATE_KILLING);
999 kill_ctx->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY;
1000 }
1001 }
1002
/* Callback given to log_throttle_init() for user_kill_fail_throttle. Logs
   a single summary line with the event count it is given (presumably the
   number of kill failures seen while throttled - confirm against
   log-throttle.h). */
static void
director_user_kill_fail_throttled(unsigned int new_events_count,
				  void *context ATTR_UNUSED)
{
	i_error("Failed to kill %u users' connections", new_events_count);
}
1008
director_kill_user_callback(enum ipc_client_cmd_state state,const char * data,void * context)1009 static void director_kill_user_callback(enum ipc_client_cmd_state state,
1010 const char *data, void *context)
1011 {
1012 struct director_kill_context *ctx = context;
1013 struct user *user;
1014
1015 /* don't try to abort the IPC command anymore */
1016 ctx->ipc_cmd = NULL;
1017
1018 /* this is an asynchronous notification about user being killed.
1019 there are no guarantees about what might have happened to the user
1020 in the mean time. */
1021 switch (state) {
1022 case IPC_CLIENT_CMD_STATE_REPLY:
1023 /* shouldn't get here. the command reply isn't finished yet. */
1024 e_error(ctx->dir->event,
1025 "login process sent unexpected reply to kick: %s", data);
1026 return;
1027 case IPC_CLIENT_CMD_STATE_OK:
1028 break;
1029 case IPC_CLIENT_CMD_STATE_ERROR:
1030 if (log_throttle_accept(user_kill_fail_throttle)) {
1031 e_error(ctx->dir->event,
1032 "Failed to kill user %u connections: %s",
1033 ctx->username_hash, data);
1034 }
1035 /* we can't really do anything but continue anyway */
1036 break;
1037 }
1038
1039 i_assert(ctx->dir->users_kicking_count > 0);
1040 ctx->dir->users_kicking_count--;
1041 if (ctx->dir->kick_callback != NULL)
1042 ctx->dir->kick_callback(ctx->dir);
1043
1044 user = user_directory_lookup(ctx->tag->users, ctx->username_hash);
1045 if (!DIRECTOR_KILL_CONTEXT_IS_VALID(user, ctx)) {
1046 /* user was already freed - ignore */
1047 i_assert(ctx->to_move == NULL);
1048 director_user_move_finished(ctx->dir);
1049 i_free(ctx);
1050 } else {
1051 i_assert(ctx->kill_state == USER_KILL_STATE_KILLING ||
1052 ctx->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED);
1053 /* we were still waiting for the kill notification */
1054 director_finish_user_kill(ctx->dir, user, ctx->kill_is_self_initiated);
1055 }
1056 }
1057
/* Callback given to log_throttle_init() for user_move_throttle. Logs a
   single summary line with the event count it is given (presumably the
   number of move timeouts seen while throttled - confirm against
   log-throttle.h). */
static void
director_user_move_throttled(unsigned int new_events_count,
			     void *context ATTR_UNUSED)
{
	i_error("%u users' move timed out, their state may now be inconsistent",
		new_events_count);
}
1064
director_user_move_timeout(struct user * user)1065 static void director_user_move_timeout(struct user *user)
1066 {
1067 i_assert(user->kill_ctx != NULL);
1068 i_assert(user->kill_ctx->kill_state != USER_KILL_STATE_DELAY);
1069
1070 if (log_throttle_accept(user_move_throttle)) {
1071 e_error(user->kill_ctx->dir->event,
1072 "Finishing user %u move timed out, "
1073 "its state may now be inconsistent (state=%s)",
1074 user->username_hash,
1075 user_kill_state_names[user->kill_ctx->kill_state]);
1076 }
1077 if (user->kill_ctx->kill_state == USER_KILL_STATE_FLUSHING) {
1078 o_stream_unref(&user->kill_ctx->reply);
1079 program_client_destroy(&user->kill_ctx->pclient);
1080 }
1081 director_user_move_free(user);
1082 }
1083
/* Start killing the user's connections as part of a move. Creates the
   user's kill context, arms the move timeout and either sends a
   KICK-DIRECTOR-HASH command via IPC or finishes the local kill
   immediately when there's nothing to kick.

   src is the director the request came from (src->self tells whether we
   initiated it), old_host is the user's previous host or NULL, and
   forced_kick forces the IPC kick even if the host didn't change. Does
   nothing if the user is already being killed. */
void director_kill_user(struct director *dir, struct director_host *src,
			struct user *user, struct mail_tag *tag,
			struct mail_host *old_host, bool forced_kick)
{
	struct director_kill_context *kill_ctx;
	const char *kick_cmd;
	bool need_kick;

	if (USER_IS_BEING_KILLED(user)) {
		/* A new move was requested before the earlier one finished.
		   The earlier move simply continues from where it was. */
		e_debug(dir->event, "User %u move restarted - previous kill_state=%s",
			user->username_hash,
			user_kill_state_names[user->kill_ctx->kill_state]);
		return;
	}

	kill_ctx = i_new(struct director_kill_context, 1);
	user->kill_ctx = kill_ctx;
	kill_ctx->dir = dir;
	kill_ctx->tag = tag;
	kill_ctx->username_hash = user->username_hash;
	kill_ctx->kill_is_self_initiated = src->self;
	if (old_host != NULL) {
		/* remember the old host's details as they were at this point */
		kill_ctx->old_host_ip = old_host->ip;
		kill_ctx->old_host_down = old_host->down;
		kill_ctx->old_host_vhost_count = old_host->vhost_count;
	}

	dir->users_moving_count++;
	kill_ctx->to_move = timeout_add(DIRECTOR_USER_MOVE_TIMEOUT_MSECS,
					director_user_move_timeout, user);
	kill_ctx->kill_state = USER_KILL_STATE_KILLING;

	need_kick = forced_kick ||
		(old_host != NULL && old_host != user->host);
	if (!need_kick) {
		/* Either the user wasn't known to us before, so a local kick
		   wouldn't kick anything, or the user's host was correct
		   already. Tell the others that the user is killed without
		   actually kicking. */
		director_finish_user_kill(dir, user,
					  kill_ctx->kill_is_self_initiated);
		return;
	}

	kick_cmd = t_strdup_printf("proxy\t*\tKICK-DIRECTOR-HASH\t%u",
				   user->username_hash);
	dir->users_kicking_count++;
	kill_ctx->ipc_cmd = ipc_client_cmd(dir->ipc_proxy, kick_cmd,
					   director_kill_user_callback,
					   kill_ctx);
}
1133
director_move_user(struct director * dir,struct director_host * src,struct director_host * orig_src,unsigned int username_hash,struct mail_host * host)1134 void director_move_user(struct director *dir, struct director_host *src,
1135 struct director_host *orig_src,
1136 unsigned int username_hash, struct mail_host *host)
1137 {
1138 struct user_directory *users = host->tag->users;
1139 struct mail_host *old_host = NULL;
1140 struct user *user;
1141
1142 /* 1. move this user's host, and set its "killing" flag to delay all of
1143 its future connections until all directors have killed the
1144 connections and notified us about it.
1145
1146 2. tell the other directors about the move
1147
1148 3. once user kill callback is called, tell the other directors
1149 with USER-KILLED that we're done killing the user.
1150
1151 4. when some director gets a duplicate USER-KILLED, it's
1152 responsible for notifying all directors that user is completely
1153 killed.
1154
1155 5. after receiving USER-KILLED-EVERYWHERE notification,
1156 new connections are again allowed for the user.
1157 */
1158 user = user_directory_lookup(users, username_hash);
1159 if (user == NULL) {
1160 e_debug(dir->event, "User %u move started: User was nonexistent",
1161 username_hash);
1162 user = user_directory_add(users, username_hash,
1163 host, ioloop_time);
1164 } else if (user->host == host) {
1165 /* User is already in the wanted host, but another director
1166 didn't think so. We'll need to finish the move without
1167 killing any of our connections. */
1168 old_host = user->host;
1169 user->timestamp = ioloop_time;
1170 e_debug(dir->event, "User %u move forwarded: host is already %s",
1171 username_hash, host->ip_str);
1172 } else {
1173 /* user is looked up via the new host's tag, so if it's found
1174 the old tag has to be the same. */
1175 i_assert(user->host->tag == host->tag);
1176
1177 old_host = user->host;
1178 user->host->user_count--;
1179 user->host = host;
1180 user->host->user_count++;
1181 user->timestamp = ioloop_time;
1182 e_debug(dir->event, "User %u move started: host %s -> %s",
1183 username_hash, old_host->ip_str,
1184 host->ip_str);
1185 }
1186
1187 if (orig_src == NULL) {
1188 orig_src = dir->self_host;
1189 orig_src->last_seq++;
1190 }
1191 director_update_send(dir, src, t_strdup_printf(
1192 "USER-MOVE\t%s\t%u\t%u\t%u\t%s\n",
1193 orig_src->ip_str, orig_src->port, orig_src->last_seq,
1194 user->username_hash, user->host->ip_str));
1195 /* kill the user only after sending the USER-MOVE, because the kill
1196 may finish instantly. */
1197 director_kill_user(dir, src, user, host->tag, old_host, FALSE);
1198 }
1199
1200 static void
director_kick_user_callback(enum ipc_client_cmd_state state,const char * data,void * context)1201 director_kick_user_callback(enum ipc_client_cmd_state state,
1202 const char *data, void *context)
1203 {
1204 struct director *dir = context;
1205
1206 if (state == IPC_CLIENT_CMD_STATE_REPLY) {
1207 /* shouldn't get here. the command reply isn't finished yet. */
1208 e_error(dir->event, "login process sent unexpected reply to kick: %s", data);
1209 return;
1210 }
1211
1212 i_assert(dir->users_kicking_count > 0);
1213 dir->users_kicking_count--;
1214 if (dir->kick_callback != NULL)
1215 dir->kick_callback(dir);
1216 }
1217
director_kick_user(struct director * dir,struct director_host * src,struct director_host * orig_src,const char * username)1218 void director_kick_user(struct director *dir, struct director_host *src,
1219 struct director_host *orig_src, const char *username)
1220 {
1221 string_t *cmd = t_str_new(64);
1222
1223 str_append(cmd, "proxy\t*\tKICK\t");
1224 str_append_tabescaped(cmd, username);
1225 dir->users_kicking_count++;
1226 ipc_client_cmd(dir->ipc_proxy, str_c(cmd),
1227 director_kick_user_callback, dir);
1228
1229 if (orig_src == NULL) {
1230 orig_src = dir->self_host;
1231 orig_src->last_seq++;
1232 }
1233 str_truncate(cmd, 0);
1234 str_printfa(cmd, "USER-KICK\t%s\t%u\t%u\t",
1235 orig_src->ip_str, orig_src->port, orig_src->last_seq);
1236 str_append_tabescaped(cmd, username);
1237 str_append_c(cmd, '\n');
1238 director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK, str_c(cmd));
1239 }
1240
director_kick_user_alt(struct director * dir,struct director_host * src,struct director_host * orig_src,const char * field,const char * value)1241 void director_kick_user_alt(struct director *dir, struct director_host *src,
1242 struct director_host *orig_src,
1243 const char *field, const char *value)
1244 {
1245 string_t *cmd = t_str_new(64);
1246
1247 str_append(cmd, "proxy\t*\tKICK-ALT\t");
1248 str_append_tabescaped(cmd, field);
1249 str_append_c(cmd, '\t');
1250 str_append_tabescaped(cmd, value);
1251 dir->users_kicking_count++;
1252 ipc_client_cmd(dir->ipc_proxy, str_c(cmd),
1253 director_kick_user_callback, dir);
1254
1255 if (orig_src == NULL) {
1256 orig_src = dir->self_host;
1257 orig_src->last_seq++;
1258 }
1259 str_truncate(cmd, 0);
1260 str_printfa(cmd, "USER-KICK-ALT\t%s\t%u\t%u\t",
1261 orig_src->ip_str, orig_src->port, orig_src->last_seq);
1262 str_append_tabescaped(cmd, field);
1263 str_append_c(cmd, '\t');
1264 str_append_tabescaped(cmd, value);
1265 str_append_c(cmd, '\n');
1266 director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK_ALT, str_c(cmd));
1267 }
1268
director_kick_user_hash(struct director * dir,struct director_host * src,struct director_host * orig_src,unsigned int username_hash,const struct ip_addr * except_ip)1269 void director_kick_user_hash(struct director *dir, struct director_host *src,
1270 struct director_host *orig_src,
1271 unsigned int username_hash,
1272 const struct ip_addr *except_ip)
1273 {
1274 const char *cmd;
1275
1276 cmd = t_strdup_printf("proxy\t*\tKICK-DIRECTOR-HASH\t%u\t%s",
1277 username_hash, net_ip2addr(except_ip));
1278 dir->users_kicking_count++;
1279 ipc_client_cmd(dir->ipc_proxy, cmd,
1280 director_kick_user_callback, dir);
1281
1282 if (orig_src == NULL) {
1283 orig_src = dir->self_host;
1284 orig_src->last_seq++;
1285 }
1286 cmd = t_strdup_printf("USER-KICK-HASH\t%s\t%u\t%u\t%u\t%s\n",
1287 orig_src->ip_str, orig_src->port, orig_src->last_seq,
1288 username_hash, net_ip2addr(except_ip));
1289 director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK, cmd);
1290 }
1291
1292 static void
director_send_user_killed_everywhere(struct director * dir,struct director_host * src,struct director_host * orig_src,unsigned int username_hash)1293 director_send_user_killed_everywhere(struct director *dir,
1294 struct director_host *src,
1295 struct director_host *orig_src,
1296 unsigned int username_hash)
1297 {
1298 if (orig_src == NULL) {
1299 orig_src = dir->self_host;
1300 orig_src->last_seq++;
1301 }
1302 director_update_send(dir, src, t_strdup_printf(
1303 "USER-KILLED-EVERYWHERE\t%s\t%u\t%u\t%u\n",
1304 orig_src->ip_str, orig_src->port, orig_src->last_seq,
1305 username_hash));
1306 }
1307
1308 static void
director_user_tag_killed(struct director * dir,struct mail_tag * tag,unsigned int username_hash)1309 director_user_tag_killed(struct director *dir, struct mail_tag *tag,
1310 unsigned int username_hash)
1311 {
1312 struct user *user;
1313
1314 user = user_directory_lookup(tag->users, username_hash);
1315 if (user == NULL || !USER_IS_BEING_KILLED(user))
1316 return;
1317
1318 switch (user->kill_ctx->kill_state) {
1319 case USER_KILL_STATE_KILLING:
1320 user->kill_ctx->kill_state = USER_KILL_STATE_KILLING_NOTIFY_RECEIVED;
1321 break;
1322 case USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY:
1323 director_finish_user_kill(dir, user, TRUE);
1324 break;
1325 case USER_KILL_STATE_KILLING_NOTIFY_RECEIVED:
1326 e_debug(dir->event, "User %u kill_state=%s - ignoring USER-KILLED",
1327 username_hash, user_kill_state_names[user->kill_ctx->kill_state]);
1328 break;
1329 case USER_KILL_STATE_NONE:
1330 case USER_KILL_STATE_FLUSHING:
1331 case USER_KILL_STATE_DELAY:
1332 /* move restarted. state=none can also happen if USER-MOVE was
1333 sent while we were still moving. send back
1334 USER-KILLED-EVERYWHERE to avoid hangs. */
1335 director_send_user_killed_everywhere(dir, dir->self_host, NULL,
1336 username_hash);
1337 break;
1338 case USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE:
1339 director_user_killed_everywhere(dir, dir->self_host,
1340 NULL, username_hash);
1341 break;
1342 }
1343 }
1344
director_user_killed(struct director * dir,unsigned int username_hash)1345 void director_user_killed(struct director *dir, unsigned int username_hash)
1346 {
1347 struct mail_tag *tag;
1348
1349 array_foreach_elem(mail_hosts_get_tags(dir->mail_hosts), tag)
1350 director_user_tag_killed(dir, tag, username_hash);
1351 }
1352
1353 static void
director_user_tag_killed_everywhere(struct director * dir,struct mail_tag * tag,struct director_host * src,struct director_host * orig_src,unsigned int username_hash)1354 director_user_tag_killed_everywhere(struct director *dir,
1355 struct mail_tag *tag,
1356 struct director_host *src,
1357 struct director_host *orig_src,
1358 unsigned int username_hash)
1359 {
1360 struct user *user;
1361
1362 user = user_directory_lookup(tag->users, username_hash);
1363 if (user == NULL) {
1364 e_debug(dir->event, "User %u no longer exists - ignoring USER-KILLED-EVERYWHERE",
1365 username_hash);
1366 return;
1367 }
1368 if (!USER_IS_BEING_KILLED(user)) {
1369 e_debug(dir->event, "User %u is no longer being killed - ignoring USER-KILLED-EVERYWHERE",
1370 username_hash);
1371 return;
1372 }
1373 if (user->kill_ctx->kill_state != USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE) {
1374 e_debug(dir->event, "User %u kill_state=%s - ignoring USER-KILLED-EVERYWHERE",
1375 username_hash, user_kill_state_names[user->kill_ctx->kill_state]);
1376 return;
1377 }
1378
1379 director_flush_user(dir, user);
1380 director_send_user_killed_everywhere(dir, src, orig_src, username_hash);
1381 }
1382
director_user_killed_everywhere(struct director * dir,struct director_host * src,struct director_host * orig_src,unsigned int username_hash)1383 void director_user_killed_everywhere(struct director *dir,
1384 struct director_host *src,
1385 struct director_host *orig_src,
1386 unsigned int username_hash)
1387 {
1388 struct mail_tag *tag;
1389
1390 array_foreach_elem(mail_hosts_get_tags(dir->mail_hosts), tag) {
1391 director_user_tag_killed_everywhere(dir, tag, src, orig_src,
1392 username_hash);
1393 }
1394 }
1395
director_state_callback_timeout(struct director * dir)1396 static void director_state_callback_timeout(struct director *dir)
1397 {
1398 timeout_remove(&dir->to_callback);
1399 dir->state_change_callback(dir);
1400 }
1401
director_set_state_changed(struct director * dir)1402 void director_set_state_changed(struct director *dir)
1403 {
1404 /* we may get called to here from various places. use a timeout to
1405 make sure the state callback is called with a clean state. */
1406 if (dir->to_callback == NULL) {
1407 dir->to_callback =
1408 timeout_add(0, director_state_callback_timeout, dir);
1409 }
1410 }
1411
/* Send cmd to the connected directors other than src. Same as
   director_update_send_version() with a minimum minor version of 0. */
void director_update_send(struct director *dir, struct director_host *src,
			  const char *cmd)
{
	const unsigned int min_version = 0;

	director_update_send_version(dir, src, min_version, cmd);
}
1417
director_update_send_version(struct director * dir,struct director_host * src,unsigned int min_version,const char * cmd)1418 void director_update_send_version(struct director *dir,
1419 struct director_host *src,
1420 unsigned int min_version, const char *cmd)
1421 {
1422 struct director_connection *conn;
1423
1424 i_assert(src != NULL);
1425
1426 array_foreach_elem(&dir->connections, conn) {
1427 if (director_connection_get_host(conn) != src &&
1428 director_connection_get_minor_version(conn) >= min_version)
1429 director_connection_send(conn, cmd);
1430 }
1431 }
1432
director_user_freed(struct user * user)1433 static void director_user_freed(struct user *user)
1434 {
1435 if (user->kill_ctx != NULL) {
1436 /* director_user_expire is very short. user expired before
1437 moving the user finished or timed out. */
1438 if (user->kill_ctx->callback_pending) {
1439 /* kill_ctx is used as a callback parameter.
1440 only remove the timeout and finish the free later. */
1441 timeout_remove(&user->kill_ctx->to_move);
1442 } else {
1443 director_user_move_free(user);
1444 }
1445 }
1446 }
1447
1448 struct director *
director_init(const struct director_settings * set,const struct ip_addr * listen_ip,in_port_t listen_port,director_state_change_callback_t * callback,director_kick_callback_t * kick_callback)1449 director_init(const struct director_settings *set,
1450 const struct ip_addr *listen_ip, in_port_t listen_port,
1451 director_state_change_callback_t *callback,
1452 director_kick_callback_t *kick_callback)
1453 {
1454 struct director *dir;
1455
1456 dir = i_new(struct director, 1);
1457 dir->set = set;
1458 dir->self_port = listen_port;
1459 dir->self_ip = *listen_ip;
1460 dir->state_change_callback = callback;
1461 dir->kick_callback = kick_callback;
1462 dir->event = event_create(NULL);
1463 event_add_category(dir->event, &event_category_director);
1464 i_array_init(&dir->dir_hosts, 16);
1465 i_array_init(&dir->pending_requests, 16);
1466 i_array_init(&dir->connections, 8);
1467 dir->mail_hosts = mail_hosts_init(dir, set->director_user_expire,
1468 director_user_freed);
1469
1470 dir->ipc_proxy = ipc_client_init(DIRECTOR_IPC_PROXY_PATH);
1471 dir->ring_min_version = DIRECTOR_VERSION_MINOR;
1472 return dir;
1473 }
1474
director_deinit(struct director ** _dir)1475 void director_deinit(struct director **_dir)
1476 {
1477 struct director *dir = *_dir;
1478 struct director_host *const *hostp, *host;
1479 struct director_connection *conn, *const *connp;
1480
1481 *_dir = NULL;
1482
1483 while (array_count(&dir->connections) > 0) {
1484 connp = array_front(&dir->connections);
1485 conn = *connp;
1486 director_connection_deinit(&conn, "Shutting down");
1487 }
1488
1489 mail_hosts_deinit(&dir->mail_hosts);
1490 mail_hosts_deinit(&dir->orig_config_hosts);
1491
1492 ipc_client_deinit(&dir->ipc_proxy);
1493 timeout_remove(&dir->to_reconnect);
1494 timeout_remove(&dir->to_handshake_warning);
1495 timeout_remove(&dir->to_request);
1496 timeout_remove(&dir->to_sync);
1497 timeout_remove(&dir->to_remove_dirs);
1498 timeout_remove(&dir->to_callback);
1499 while (array_count(&dir->dir_hosts) > 0) {
1500 hostp = array_front(&dir->dir_hosts);
1501 host = *hostp;
1502 director_host_free(&host);
1503 }
1504 array_free(&dir->pending_requests);
1505 array_free(&dir->dir_hosts);
1506 array_free(&dir->connections);
1507 event_unref(&dir->event);
1508 i_free(dir);
1509 }
1510
/* State of an iteration over the users of all mail tags of a director.
   Created by director_iterate_users_init(). */
struct director_user_iter {
	/* director whose mail_hosts tags are iterated */
	struct director *dir;
	/* index of the currently iterated tag in the array returned by
	   mail_hosts_get_tags() */
	unsigned int tag_idx;
	/* iterator for the current tag's users. NULL while no tag's
	   iterator is open. */
	struct user_directory_iter *user_iter;
	/* given as-is to user_directory_iter_init() for each tag */
	bool iter_until_current_tail;
};
1517
1518 struct director_user_iter *
director_iterate_users_init(struct director * dir,bool iter_until_current_tail)1519 director_iterate_users_init(struct director *dir, bool iter_until_current_tail)
1520 {
1521 struct director_user_iter *iter = i_new(struct director_user_iter, 1);
1522 iter->dir = dir;
1523 iter->iter_until_current_tail = iter_until_current_tail;
1524 return iter;
1525 }
1526
director_iterate_users_next(struct director_user_iter * iter)1527 struct user *director_iterate_users_next(struct director_user_iter *iter)
1528 {
1529 const ARRAY_TYPE(mail_tag) *tags;
1530 struct user *user;
1531
1532 i_assert(iter != NULL);
1533
1534 if (iter->user_iter == NULL) {
1535 tags = mail_hosts_get_tags(iter->dir->mail_hosts);
1536 if (iter->tag_idx >= array_count(tags))
1537 return NULL;
1538 struct mail_tag *tag = array_idx_elem(tags, iter->tag_idx);
1539 iter->user_iter = user_directory_iter_init(tag->users,
1540 iter->iter_until_current_tail);
1541 }
1542 user = user_directory_iter_next(iter->user_iter);
1543 if (user == NULL) {
1544 user_directory_iter_deinit(&iter->user_iter);
1545 iter->tag_idx++;
1546 return director_iterate_users_next(iter);
1547 } else
1548 return user;
1549 }
1550
director_iterate_users_deinit(struct director_user_iter ** _iter)1551 void director_iterate_users_deinit(struct director_user_iter **_iter)
1552 {
1553 i_assert(_iter != NULL && *_iter != NULL);
1554 struct director_user_iter *iter = *_iter;
1555 *_iter = NULL;
1556 if (iter->user_iter != NULL)
1557 user_directory_iter_deinit(&iter->user_iter);
1558 i_free(iter);
1559 }
1560
1561 bool
director_get_username_hash(struct director * dir,const char * username,unsigned int * hash_r)1562 director_get_username_hash(struct director *dir, const char *username,
1563 unsigned int *hash_r)
1564 {
1565 const char *error;
1566
1567 if (mail_user_hash(username, dir->set->director_username_hash, hash_r,
1568 &error))
1569 return TRUE;
1570 e_error(dir->event, "Failed to expand director_username_hash=%s: %s",
1571 dir->set->director_username_hash, error);
1572 return FALSE;
1573 }
1574
directors_init(void)1575 void directors_init(void)
1576 {
1577 user_move_throttle =
1578 log_throttle_init(&director_log_throttle_settings,
1579 director_user_move_throttled, NULL);
1580 user_kill_fail_throttle =
1581 log_throttle_init(&director_log_throttle_settings,
1582 director_user_kill_fail_throttled, NULL);
1583 }
1584
directors_deinit(void)1585 void directors_deinit(void)
1586 {
1587 log_throttle_deinit(&user_move_throttle);
1588 log_throttle_deinit(&user_kill_fail_throttle);
1589 }
1590