1 /* Copyright (c) 2010-2018 Dovecot authors, see the included COPYING file */
2 
3 /*
4    Handshaking:
5 
6    Incoming director connections send:
7 
8    VERSION
9    ME
10    <wait for DONE from remote handshake>
11    DONE
12    <make this connection our "left" connection, potentially disconnecting
13    another one>
14 
15    Outgoing director connections send:
16 
17    VERSION
18    ME
19    [0..n] DIRECTOR
20    HOST-HAND-START
21    [0..n] HOST
22    HOST-HAND-END
23    [0..n] USER
24    <possibly other non-handshake commands between USERs>
25    DONE
26    <wait for DONE from remote>
27    <make this connection our "right" connection, potentially disconnecting
28    another one>
29 */
30 
31 #include "lib.h"
32 #include "ioloop.h"
33 #include "array.h"
34 #include "net.h"
35 #include "istream.h"
36 #include "ostream.h"
37 #include "str.h"
38 #include "strescape.h"
39 #include "time-util.h"
40 #include "master-service.h"
41 #include "mail-host.h"
42 #include "director.h"
43 #include "director-host.h"
44 #include "director-request.h"
45 #include "director-connection.h"
46 
47 #include <unistd.h>
48 #include <sys/time.h>
49 #include <sys/resource.h>
50 
/* Input buffer limit for a director connection.
   NOTE(review): presumably the istream max buffer size, i.e. the longest
   accepted protocol line - confirm at the istream creation site. */
#define MAX_INBUF_SIZE 1024
/* NOTE(review): presumably the amount of buffered output above which the
   connection flushes / stops queueing more data - confirm at its use site. */
#define OUTBUF_FLUSH_THRESHOLD (1024*128)
/* Abort the connection if connect() hasn't completed within this time. */
#define DIRECTOR_CONNECTION_CONNECT_TIMEOUT_MSECS (10*1000)
/* Disconnect if the "ME" command hasn't arrived within this idle time. */
#define DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS (10*1000)
/* Time limit for sending the handshake USERs. With many users the kernel can
   swallow our writes quickly while the receiver is still busy parsing them,
   so this is fairly generous. */
#define DIRECTOR_CONNECTION_SEND_USERS_TIMEOUT_MSECS (30*1000)
/* Disconnect if "DONE" hasn't arrived within this idle time. Deliberately a
   bit longer than the _SEND_USERS_ timeout, so that if the sending side gives
   up first, its (more descriptive) error is the one that gets logged. */
#define DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS (40*1000)
/* Interval for sending PING on an otherwise idle connection. */
#define DIRECTOR_CONNECTION_PING_INTERVAL_MSECS (15*1000)
/* Interval for sending PING while a SYNC reply is still pending. */
#define DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS 1000
/* Warn when a PING reply or a PONG response is slower than this. */
#define DIRECTOR_CONNECTION_PINGPONG_WARN_MSECS (5*1000)
/* An outgoing connection that lived shorter than this many seconds causes
   the host to be marked as failed, to avoid reconnecting to it right away. */
#define DIRECTOR_SUCCESS_MIN_CONNECT_SECS 40
/* A USER request without timestamp doesn't refresh a user that was already
   refreshed less than director_user_expire/4 seconds ago. This caps that
   window. */
#define DIRECTOR_SKIP_RECENT_REFRESH_MAX_SECS 15
/* NOTE(review): presumably the reconnect delay after a connection was found
   to go to the wrong host - confirm at its use site. */
#define DIRECTOR_RECONNECT_AFTER_WRONG_CONNECT_MSECS 1000
/* After sending CONNECT, how long to wait for the remote to disconnect us
   before closing the connection ourselves. */
#define DIRECTOR_WAIT_DISCONNECT_SECS 10
/* NOTE(review): presumably the handshake duration after which a warning is
   logged - confirm at its use site. */
#define DIRECTOR_HANDSHAKE_WARN_SECS 29
/* NOTE(review): presumably the minimum interval between handshake progress
   (bytes) log lines - confirm at its use site. */
#define DIRECTOR_HANDSHAKE_BYTES_LOG_MIN_SECS (60*30)
/* NOTE(review): presumably how many duplicate SYNCs with the same sequence
   are tolerated - confirm at its use site. */
#define DIRECTOR_MAX_SYNC_SEQ_DUPLICATES 4
/* A SYNC whose timestamp is this many seconds newer than the last valid SYNC
   timestamp is taken as a sign that the director's restart notification was
   lost, so last_sync_seq gets reset. */
#define DIRECTOR_SYNC_STALE_TIMESTAMP_RESET_SECS (60*2)
/* Warn when the remote director's clock (sent in ME) differs from ours by
   more than this many seconds. */
#define DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS 1
/* Upper bound of handshake USER lines written per flush before returning to
   the ioloop, so that other pending work isn't starved. */
#define DIRECTOR_HANDSHAKE_MAX_USERS_SENT_PER_FLUSH 10000

/* TRUE when a USER command from a peer with the given protocol minor version
   must be treated as a handshake USER: older peers didn't have a separate
   handshake command and sent handshake USERs with more than two arguments. */
#define CMD_IS_USER_HANDSHAKE(minor_version, args) \
	((minor_version) < DIRECTOR_VERSION_HANDSHAKE_U_CMD && \
	 str_array_length(args) > 2)

/* Option name for consistent hashing. */
#define DIRECTOR_OPT_CONSISTENT_HASHING "consistent-hashing"
98 
99 struct director_connection {
100 	int refcount;
101 	struct director *dir;
102 	struct event *event;
103 	char *name;
104 	struct timeval created, connected_time, me_received_time;
105 	struct timeval connected_user_cpu;
106 	unsigned int minor_version;
107 
108 	struct timeval last_input, last_output;
109 	size_t peak_bytes_buffered;
110 
111 	struct timeval ping_sent_time;
112 	size_t ping_sent_buffer_size;
113 	struct timeval ping_sent_user_cpu;
114 	uoff_t ping_sent_input_offset, ping_sent_output_offset;
115 	unsigned int last_ping_msecs;
116 
117 	/* for incoming connections the director host isn't known until
118 	   ME-line is received */
119 	struct director_host *host;
120 	/* this is set only for wrong connections: */
121 	struct director_host *connect_request_to;
122 
123 	int fd;
124 	struct io *io;
125 	struct istream *input;
126 	struct ostream *output;
127 	struct timeout *to_disconnect, *to_ping, *to_pong;
128 
129 	struct director_user_iter *user_iter;
130 	unsigned int users_received, handshake_users_received;
131 	unsigned int handshake_users_sent;
132 
133 	/* set during command execution */
134 	const char *cur_cmd, *const *cur_args;
135 
136 	bool in:1;
137 	bool connected:1;
138 	bool version_received:1;
139 	bool me_received:1;
140 	bool handshake_received:1;
141 	bool ignore_host_events:1;
142 	bool handshake_sending_hosts:1;
143 	bool ping_waiting:1;
144 	bool synced:1;
145 	bool wrong_host:1;
146 	bool verifying_left:1;
147 	bool users_unsorted:1;
148 	bool connected_user_cpu_set:1;
149 };
150 
151 static bool director_connection_unref(struct director_connection *conn);
152 static void director_finish_sending_handshake(struct director_connection *conn);
153 static void director_connection_disconnected(struct director_connection **conn,
154 					     const char *reason);
155 static void director_connection_reconnect(struct director_connection **conn,
156 					  const char *reason);
157 static void
158 director_connection_log_disconnect(struct director_connection *conn, int err,
159 				   const char *errstr);
160 static int director_connection_send_done(struct director_connection *conn);
161 
162 static void
director_connection_set_name(struct director_connection * conn,const char * name)163 director_connection_set_name(struct director_connection *conn, const char *name)
164 {
165 	char *old_name = conn->name;
166 	conn->name = i_strdup(name);
167 	i_free(old_name);
168 
169 	event_set_append_log_prefix(conn->event,
170 		t_strdup_printf("director(%s): ", conn->name));
171 }
172 
173 static void ATTR_FORMAT(2, 3)
director_cmd_error(struct director_connection * conn,const char * fmt,...)174 director_cmd_error(struct director_connection *conn, const char *fmt, ...)
175 {
176 	va_list args;
177 
178 	va_start(args, fmt);
179 	e_error(conn->event, "Command %s: %s (input: %s)",
180 		conn->cur_cmd, t_strdup_vprintf(fmt, args),
181 		t_strarray_join(conn->cur_args, "\t"));
182 	va_end(args);
183 
184 	if (conn->host != NULL)
185 		conn->host->last_protocol_failure = ioloop_time;
186 }
187 
188 static void
director_connection_append_stats(struct director_connection * conn,string_t * str)189 director_connection_append_stats(struct director_connection *conn, string_t *str)
190 {
191 	struct rusage usage;
192 
193 	str_printfa(str, "bytes in=%"PRIuUOFF_T", bytes out=%"PRIuUOFF_T,
194 		    conn->input->v_offset, conn->output->offset);
195 	str_printfa(str, ", %u+%u USERs received",
196 		    conn->handshake_users_received, conn->users_received);
197 	if (conn->handshake_users_sent > 0) {
198 		str_printfa(str, ", %u USERs sent in handshake",
199 			    conn->handshake_users_sent);
200 	}
201 	if (conn->last_input.tv_sec > 0) {
202 		int input_msecs = timeval_diff_msecs(&ioloop_timeval,
203 						     &conn->last_input);
204 		str_printfa(str, ", last input %u.%03u s ago",
205 			    input_msecs/1000, input_msecs%1000);
206 	}
207 	if (conn->last_output.tv_sec > 0) {
208 		int output_msecs = timeval_diff_msecs(&ioloop_timeval,
209 						      &conn->last_output);
210 		str_printfa(str, ", last output %u.%03u s ago",
211 			    output_msecs/1000, output_msecs%1000);
212 	}
213 	if (conn->connected) {
214 		int connected_msecs = timeval_diff_msecs(&ioloop_timeval,
215 							 &conn->connected_time);
216 		str_printfa(str, ", connected %u.%03u s ago",
217 			    connected_msecs/1000, connected_msecs%1000);
218 	}
219 	if (o_stream_get_buffer_used_size(conn->output) > 0) {
220 		str_printfa(str, ", %zu bytes in output buffer",
221 			    o_stream_get_buffer_used_size(conn->output));
222 	}
223 	str_printfa(str, ", %zu peak output buffer size",
224 		    conn->peak_bytes_buffered);
225 	if (conn->connected_user_cpu_set &&
226 	    getrusage(RUSAGE_SELF, &usage) == 0) {
227 		/* this isn't measuring the CPU usage used by the connection
228 		   itself, but it can still be a useful measurement */
229 		int diff = timeval_diff_msecs(&usage.ru_utime,
230 					      &conn->connected_user_cpu);
231 		str_printfa(str, ", %d.%03d CPU secs since connected",
232 			    diff / 1000, diff % 1000);
233 	}
234 }
235 
236 static void
director_connection_init_timeout(struct director_connection * conn)237 director_connection_init_timeout(struct director_connection *conn)
238 {
239 	struct timeval start_time;
240 	string_t *reason = t_str_new(128);
241 
242 	if (!conn->connected) {
243 		start_time = conn->created;
244 		str_append(reason, "Connect timed out");
245 	} else if (!conn->me_received) {
246 		start_time = conn->connected_time;
247 		str_append(reason, "Handshaking ME timed out");
248 	} else if (!conn->in) {
249 		start_time = conn->me_received_time;
250 		str_append(reason, "Sending handshake timed out");
251 	} else {
252 		start_time = conn->me_received_time;
253 		str_append(reason, "Handshaking DONE timed out");
254 	}
255 	int msecs = timeval_diff_msecs(&ioloop_timeval, &start_time);
256 	str_printfa(reason, " (%u.%03u secs, ", msecs/1000, msecs%1000);
257 	director_connection_append_stats(conn, reason);
258 	str_append_c(reason, ')');
259 
260 	e_error(conn->event, "%s", str_c(reason));
261 	director_connection_disconnected(&conn, "Handshake timeout");
262 }
263 
264 static void
director_connection_set_ping_timeout(struct director_connection * conn)265 director_connection_set_ping_timeout(struct director_connection *conn)
266 {
267 	unsigned int msecs;
268 
269 	msecs = conn->synced || !conn->handshake_received ?
270 		DIRECTOR_CONNECTION_PING_INTERVAL_MSECS :
271 		DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS;
272 
273 	timeout_remove(&conn->to_ping);
274 	conn->to_ping = timeout_add(msecs, director_connection_ping, conn);
275 }
276 
/* Timeout callback armed after sending CONNECT: the remote didn't disconnect
   within DIRECTOR_WAIT_DISCONNECT_SECS, so log it as a timeout and close the
   connection ourselves. */
static void director_connection_wait_timeout(struct director_connection *conn)
{
	const char *reason = "Timeout waiting for disconnect after CONNECT";

	director_connection_log_disconnect(conn, ETIMEDOUT, "");
	director_connection_deinit(&conn, reason);
}
283 
director_connection_send_connect(struct director_connection * conn,struct director_host * host)284 static void director_connection_send_connect(struct director_connection *conn,
285 					     struct director_host *host)
286 {
287 	const char *connect_str;
288 
289 	if (conn->to_disconnect != NULL)
290 		return;
291 
292 	connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
293 				      host->ip_str, host->port);
294 	director_connection_send(conn, connect_str);
295 	o_stream_uncork(conn->output);
296 
297 	/* wait for a while for the remote to disconnect, so it will hopefully
298 	   see our CONNECT command. we'll also log the warning later to avoid
299 	   multiple log lines about it. */
300 	conn->connect_request_to = host;
301 	director_host_ref(conn->connect_request_to);
302 
303 	conn->to_disconnect =
304 		timeout_add(DIRECTOR_WAIT_DISCONNECT_SECS*1000,
305 			    director_connection_wait_timeout, conn);
306 }
307 
director_connection_assigned(struct director_connection * conn)308 static void director_connection_assigned(struct director_connection *conn)
309 {
310 	struct director *dir = conn->dir;
311 
312 	if (dir->left != NULL && dir->right != NULL) {
313 		/* we're connected to both directors. see if the ring is
314 		   finished by sending a SYNC. if we get it back, it's done. */
315 		dir->sync_seq++;
316 		director_set_ring_unsynced(dir);
317 		director_sync_send(dir, dir->self_host, dir->sync_seq,
318 				   DIRECTOR_VERSION_MINOR, ioloop_time,
319 				   mail_hosts_hash(dir->mail_hosts));
320 	}
321 	director_connection_set_ping_timeout(conn);
322 }
323 
director_connection_assign_left(struct director_connection * conn)324 static bool director_connection_assign_left(struct director_connection *conn)
325 {
326 	struct director *dir = conn->dir;
327 
328 	i_assert(conn->in);
329 	i_assert(dir->left != conn);
330 
331 	/* make sure this is the correct incoming connection */
332 	if (conn->host->self) {
333 		e_error(conn->event, "Connection from self, dropping");
334 		return FALSE;
335 	} else if (dir->left == NULL) {
336 		/* no conflicts yet */
337 	} else if (dir->left->host == conn->host) {
338 		e_warning(conn->event,
339 			  "Replacing left director connection %s with %s",
340 			  dir->left->host->name, conn->host->name);
341 		director_connection_deinit(&dir->left, t_strdup_printf(
342 			"Replacing with %s", conn->host->name));
343 	} else if (dir->left->verifying_left) {
344 		/* we're waiting to verify if our current left is still
345 		   working. if we don't receive a PONG, the current left
346 		   gets disconnected and a new left gets assigned. if we do
347 		   receive a PONG, we'll wait until the current left
348 		   disconnects us and then reassign the new left. */
349 		return TRUE;
350 	} else if (director_host_cmp_to_self(dir->left->host, conn->host,
351 					     dir->self_host) < 0) {
352 		/* the old connection is the correct one.
353 		   refer the client there (FIXME: do we ever get here?) */
354 		director_connection_send_connect(conn, dir->left->host);
355 		return TRUE;
356 	} else {
357 		/* this new connection is the correct one, but wait until the
358 		   old connection gets disconnected before using this one.
359 		   that guarantees that the director inserting itself into
360 		   the ring has finished handshaking its left side, so the
361 		   switch will be fast. */
362 		return TRUE;
363 	}
364 	dir->left = conn;
365 	director_connection_set_name(conn,
366 		t_strdup_printf("%s/left", conn->host->name));
367 	director_connection_assigned(conn);
368 	return TRUE;
369 }
370 
director_assign_left(struct director * dir)371 static void director_assign_left(struct director *dir)
372 {
373 	struct director_connection *conn;
374 
375 	array_foreach_elem(&dir->connections, conn) {
376 		if (conn->in && conn->handshake_received &&
377 		    conn->to_disconnect == NULL && conn != dir->left) {
378 			/* either use this or disconnect it */
379 			if (!director_connection_assign_left(conn)) {
380 				/* we don't want this */
381 				director_connection_deinit(&conn,
382 					"Unwanted incoming connection");
383 				director_assign_left(dir);
384 				break;
385 			}
386 		}
387 	}
388 }
389 
director_has_outgoing_connections(struct director * dir)390 static bool director_has_outgoing_connections(struct director *dir)
391 {
392 	struct director_connection *conn;
393 
394 	array_foreach_elem(&dir->connections, conn) {
395 		if (!conn->in && conn->to_disconnect == NULL)
396 			return TRUE;
397 	}
398 	return FALSE;
399 }
400 
director_send_delayed_syncs(struct director * dir)401 static void director_send_delayed_syncs(struct director *dir)
402 {
403 	struct director_host *host;
404 
405 	i_assert(dir->right != NULL);
406 
407 	e_debug(dir->right->event, "Sending delayed SYNCs");
408 	array_foreach_elem(&dir->dir_hosts, host) {
409 		if (host->delayed_sync_seq == 0)
410 			continue;
411 
412 		director_sync_send(dir, host, host->delayed_sync_seq,
413 				   host->delayed_sync_minor_version,
414 				   host->delayed_sync_timestamp,
415 				   host->delayed_sync_hosts_hash);
416 		host->delayed_sync_seq = 0;
417 	}
418 }
419 
director_connection_assign_right(struct director_connection * conn)420 static bool director_connection_assign_right(struct director_connection *conn)
421 {
422 	struct director *dir = conn->dir;
423 
424 	i_assert(!conn->in);
425 
426 	if (dir->right != NULL) {
427 		/* see if we should disconnect or keep the existing
428 		   connection. */
429 		if (director_host_cmp_to_self(conn->host, dir->right->host,
430 					      dir->self_host) <= 0) {
431 			/* the old connection is the correct one */
432 			e_warning(conn->event,
433 				  "Aborting incorrect outgoing connection to %s "
434 				  "(already connected to correct one: %s)",
435 				  conn->host->name, dir->right->host->name);
436 			conn->wrong_host = TRUE;
437 			return FALSE;
438 		}
439 		e_warning(conn->event,
440 			  "Replacing right director connection %s with %s",
441 			  dir->right->host->name, conn->host->name);
442 		director_connection_deinit(&dir->right, t_strdup_printf(
443 			"Replacing with %s", conn->host->name));
444 	}
445 	dir->right = conn;
446 	director_connection_set_name(conn,
447 		t_strdup_printf("%s/right", conn->host->name));
448 	director_connection_assigned(conn);
449 	director_send_delayed_syncs(dir);
450 	return TRUE;
451 }
452 
453 static bool
director_args_parse_ip_port(struct director_connection * conn,const char * const * args,struct ip_addr * ip_r,in_port_t * port_r)454 director_args_parse_ip_port(struct director_connection *conn,
455 			    const char *const *args,
456 			    struct ip_addr *ip_r, in_port_t *port_r)
457 {
458 	if (args[0] == NULL || args[1] == NULL) {
459 		director_cmd_error(conn, "Missing IP+port parameters");
460 		return FALSE;
461 	}
462 	if (net_addr2ip(args[0], ip_r) < 0) {
463 		director_cmd_error(conn, "Invalid IP address: %s", args[0]);
464 		return FALSE;
465 	}
466 	if (net_str2port(args[1], port_r) < 0) {
467 		director_cmd_error(conn, "Invalid port: %s", args[1]);
468 		return FALSE;
469 	}
470 	return TRUE;
471 }
472 
director_cmd_me(struct director_connection * conn,const char * const * args)473 static bool director_cmd_me(struct director_connection *conn,
474 			    const char *const *args)
475 {
476 	struct director *dir = conn->dir;
477 	const char *connect_str;
478 	struct ip_addr ip;
479 	in_port_t port;
480 	time_t next_comm_attempt;
481 
482 	if (!director_args_parse_ip_port(conn, args, &ip, &port))
483 		return FALSE;
484 	if (conn->me_received) {
485 		director_cmd_error(conn, "Duplicate ME");
486 		return FALSE;
487 	}
488 
489 	if (!conn->in && (!net_ip_compare(&conn->host->ip, &ip) ||
490 			  conn->host->port != port)) {
491 		e_error(conn->event,
492 			"Remote director thinks it's someone else "
493 			"(connected to %s:%u, remote says it's %s:%u)",
494 			conn->host->ip_str, conn->host->port,
495 			net_ip2addr(&ip), port);
496 		return FALSE;
497 	}
498 	conn->me_received = TRUE;
499 	conn->me_received_time = ioloop_timeval;
500 
501 	if (args[2] != NULL) {
502 		time_t remote_time;
503 		int diff;
504 
505 		if (str_to_time(args[2], &remote_time) < 0) {
506 			director_cmd_error(conn, "Invalid ME timestamp");
507 			return FALSE;
508 		}
509 		diff = ioloop_time - remote_time;
510 		if (diff > DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS ||
511 		    (diff < 0 && -diff > DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS)) {
512 			e_warning(conn->event,
513 				  "Director %s clock differs from ours by %d secs",
514 				  conn->name, diff);
515 		}
516 	}
517 
518 	timeout_remove(&conn->to_ping);
519 	if (conn->in) {
520 		conn->to_ping = timeout_add(DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS,
521 					    director_connection_init_timeout, conn);
522 	} else {
523 		conn->to_ping = timeout_add(DIRECTOR_CONNECTION_SEND_USERS_TIMEOUT_MSECS,
524 					    director_connection_init_timeout, conn);
525 	}
526 
527 	if (!conn->in)
528 		return TRUE;
529 
530 	/* Incoming connection:
531 
532 	   a) we don't have an established ring yet. make sure we're connecting
533 	   to our right side (which might become our left side).
534 
535 	   b) it's our current "left" connection. the previous connection
536 	   is most likely dead.
537 
538 	   c) we have an existing ring. tell our current "left" to connect to
539 	   it with CONNECT command.
540 
541 	   d) the incoming connection doesn't belong to us at all, refer it
542 	   elsewhere with CONNECT. however, before disconnecting it verify
543 	   first that our left side is actually still functional.
544 	*/
545 	i_assert(conn->host == NULL);
546 	conn->host = director_host_get(dir, &ip, port);
547 	/* the host shouldn't be removed at this point, but if for some
548 	   reason it is we don't want to crash */
549 	conn->host->removed = FALSE;
550 	director_host_ref(conn->host);
551 	/* make sure we don't keep old sequence values across restarts */
552 	director_host_restarted(conn->host);
553 
554 	next_comm_attempt = conn->host->last_protocol_failure +
555 		DIRECTOR_PROTOCOL_FAILURE_RETRY_SECS;
556 	if (next_comm_attempt > ioloop_time) {
557 		/* the director recently sent invalid protocol data,
558 		   don't try retrying yet */
559 		e_error(conn->event,
560 			"Remote sent invalid protocol data recently, "
561 			"waiting %u secs before allowing further communication",
562 			(unsigned int)(next_comm_attempt-ioloop_time));
563 		return FALSE;
564 	} else if (dir->left == NULL) {
565 		/* a) - just in case the left is also our right side reset
566 		   its failed state, so we can connect to it */
567 		conn->host->last_network_failure = 0;
568 		if (!director_has_outgoing_connections(dir))
569 			director_connect(dir, "Connecting to left");
570 	} else if (dir->left->host == conn->host) {
571 		/* b) */
572 		i_assert(dir->left != conn);
573 		director_connection_deinit(&dir->left,
574 			"Replacing with new incoming connection");
575 	} else if (director_host_cmp_to_self(conn->host, dir->left->host,
576 					     dir->self_host) < 0) {
577 		/* c) */
578 		connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
579 					      conn->host->ip_str,
580 					      conn->host->port);
581 		director_connection_send(dir->left, connect_str);
582 	} else {
583 		/* d) */
584 		dir->left->verifying_left = TRUE;
585 		director_connection_ping(dir->left);
586 	}
587 	return TRUE;
588 }
589 
590 static inline bool
user_need_refresh(struct director * dir,struct user * user,time_t timestamp,bool unknown_timestamp)591 user_need_refresh(struct director *dir, struct user *user,
592 		  time_t timestamp, bool unknown_timestamp)
593 {
594 	if (timestamp <= (time_t)user->timestamp) {
595 		/* we already have this timestamp */
596 		return FALSE;
597 	}
598 	if (unknown_timestamp) {
599 		/* Old director sent USER command without timestamp. We don't
600 		   know what it is exactly, but we can assume that it's very
601 		   close to the current time (which timestamp parameter is
602 		   already set to). However, try to break USER loops here when
603 		   director ring latency is >1sec, but below skip_recent_secs
604 		   by just not refreshing the user. */
605 		time_t skip_recent_secs =
606 			I_MIN(dir->set->director_user_expire/4,
607 			      DIRECTOR_SKIP_RECENT_REFRESH_MAX_SECS);
608 		if ((time_t)user->timestamp + skip_recent_secs >= timestamp)
609 			return FALSE;
610 	}
611 	return TRUE;
612 }
613 
614 static int
director_user_refresh(struct director_connection * conn,unsigned int username_hash,struct mail_host * host,time_t timestamp,bool weak,bool * forced_r,struct user ** user_r)615 director_user_refresh(struct director_connection *conn,
616 		      unsigned int username_hash, struct mail_host *host,
617 		      time_t timestamp, bool weak, bool *forced_r,
618 		      struct user **user_r)
619 {
620 	struct director *dir = conn->dir;
621 	struct user *user;
622 	bool ret = FALSE, unset_weak_user = FALSE;
623 	struct user_directory *users = host->tag->users;
624 	bool unknown_timestamp = (timestamp == (time_t)-1);
625 
626 	*forced_r = FALSE;
627 
628 	if (unknown_timestamp) {
629 		/* Old director version sent USER without timestamp. */
630 		timestamp = ioloop_time;
631 	}
632 
633 	if (timestamp + (time_t)dir->set->director_user_expire <= ioloop_time) {
634 		/* Ignore this refresh entirely, regardless of whether the
635 		   user already exists or not. */
636 		e_debug(conn->event,
637 			"user refresh: %u has expired timestamp %"PRIdTIME_T,
638 			  username_hash, timestamp);
639 		return -1;
640 	}
641 
642 	user = user_directory_lookup(users, username_hash);
643 	if (user == NULL) {
644 		*user_r = user_directory_add(users, username_hash,
645 					     host, timestamp);
646 		(*user_r)->weak = weak;
647 		e_debug(conn->event, "user refresh: %u added", username_hash);
648 		return 1;
649 	}
650 
651 	if (user->weak) {
652 		if (!weak) {
653 			/* removing user's weakness */
654 			e_debug(conn->event, "user refresh: %u weakness removed",
655 				username_hash);
656 			unset_weak_user = TRUE;
657 			user->weak = FALSE;
658 			ret = TRUE;
659 		} else {
660 			/* weak user marked again as weak */
661 		}
662 	} else if (weak &&
663 		   !user_directory_user_is_recently_updated(users, user)) {
664 		/* mark the user as weak */
665 		e_debug(conn->event, "user refresh: %u set weak", username_hash);
666 		user->weak = TRUE;
667 		ret = TRUE;
668 	} else if (weak) {
669 		e_debug(conn->event,
670 			"user refresh: %u weak update to %s ignored, "
671 			"we recently changed it to %s",
672 			username_hash, host->ip_str,
673 			user->host->ip_str);
674 		host = user->host;
675 		ret = TRUE;
676 	} else if (user->host == host) {
677 		/* update to the same host */
678 	} else if (user_directory_user_is_near_expiring(users, user)) {
679 		/* host conflict for a user that is already near expiring. we can
680 		   assume that the other director had already dropped this user
681 		   and we should have as well. use the new host. */
682 		e_debug(conn->event, "user refresh: %u is nearly expired, "
683 			"replacing host %s with %s", username_hash,
684 			user->host->ip_str, host->ip_str);
685 		ret = TRUE;
686 	} else if (USER_IS_BEING_KILLED(user)) {
687 		/* user is still being moved - ignore conflicting host updates
688 		   from other directors who don't yet know about the move. */
689 		e_debug(conn->event, "user refresh: %u is being moved, "
690 			"preserve its host %s instead of replacing with %s",
691 			username_hash, user->host->ip_str, host->ip_str);
692 		host = user->host;
693 	} else {
694 		/* non-weak user received a non-weak update with
695 		   conflicting host. this shouldn't happen. */
696 		string_t *str = t_str_new(128);
697 
698 		str_printfa(str, "User hash %u "
699 			    "is being redirected to two hosts: %s and %s",
700 			    username_hash, user->host->ip_str, host->ip_str);
701 		str_printfa(str, " (old_ts=%ld", (long)user->timestamp);
702 
703 		if (!conn->handshake_received) {
704 			str_printfa(str, ",handshaking,recv_ts=%ld",
705 				    (long)timestamp);
706 		}
707 		if (USER_IS_BEING_KILLED(user)) {
708 			if (user->kill_ctx->to_move != NULL)
709 				str_append(str, ",moving");
710 			str_printfa(str, ",kill_state=%s",
711 				    user_kill_state_names[user->kill_ctx->kill_state]);
712 		}
713 		str_append_c(str, ')');
714 		e_error(conn->event, "%s", str_c(str));
715 
716 		/* we want all the directors to redirect the user to same
717 		   server, but we don't want two directors fighting over which
718 		   server it belongs to, so always use the lower IP address */
719 		if (net_ip_cmp(&user->host->ip, &host->ip) > 0) {
720 			/* change the host. we'll also need to remove the user
721 			   from the old host's user_count, because we can't
722 			   keep track of the user for more than one host.
723 
724 			   send the updated USER back to the sender as well. */
725 			*forced_r = TRUE;
726 		} else {
727 			/* keep the host */
728 			host = user->host;
729 		}
730 		/* especially IMAP connections can take a long time to die.
731 		   make sure we kill off the connections in the wrong
732 		   backends. */
733 		director_kick_user_hash(dir, dir->self_host, NULL,
734 					username_hash, &host->ip);
735 		ret = TRUE;
736 	}
737 	if (user->host != host) {
738 		user->host->user_count--;
739 		user->host = host;
740 		user->host->user_count++;
741 		ret = TRUE;
742 	}
743 	/* Update user's timestamp if it's higher than the current one. Note
744 	   that we'll preserve the original timestamp. This is important when
745 	   the director ring is slow and a single USER can traverse through
746 	   the ring more than a second. We don't want to get into a loop where
747 	   the same USER goes through the ring forever. */
748 	if (user_need_refresh(dir, user, timestamp, unknown_timestamp)) {
749 		/* NOTE: This makes the users list somewhat out-of-order.
750 		   It's not a big problem - most likely it's only a few seconds
751 		   difference. The worst that can happen is that some users
752 		   take up memory that should have been freed already. */
753 		e_debug(conn->event, "user refresh: %u refreshed timestamp from %u to %"PRIdTIME_T,
754 			username_hash, user->timestamp, timestamp);
755 		user_directory_refresh(users, user);
756 		user->timestamp = timestamp;
757 		ret = TRUE;
758 	} else {
759 		e_debug(conn->event, "user refresh: %u ignored timestamp %"PRIdTIME_T" (we have %u)",
760 			username_hash, timestamp, user->timestamp);
761 	}
762 
763 	if (unset_weak_user) {
764 		/* user is no longer weak. handle pending requests for
765 		   this user if there are any */
766 		director_set_state_changed(conn->dir);
767 	}
768 
769 	*user_r = user;
770 	return ret ? 1 : 0;
771 }
772 
773 static bool
director_handshake_cmd_user(struct director_connection * conn,const char * const * args)774 director_handshake_cmd_user(struct director_connection *conn,
775 			    const char *const *args)
776 {
777 	unsigned int username_hash, timestamp;
778 	struct ip_addr ip;
779 	struct mail_host *host;
780 	struct user *user;
781 	bool weak, forced;
782 
783 	if (str_array_length(args) < 3 ||
784 	    str_to_uint(args[0], &username_hash) < 0 ||
785 	    net_addr2ip(args[1], &ip) < 0 ||
786 	    str_to_uint(args[2], &timestamp) < 0) {
787 		director_cmd_error(conn, "Invalid parameters");
788 		return FALSE;
789 	}
790 	weak = args[3] != NULL && args[3][0] == 'w';
791 	conn->handshake_users_received++;
792 
793 	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
794 	if (host == NULL) {
795 		e_error(conn->event, "USER used unknown host %s in handshake",
796 			args[1]);
797 		return FALSE;
798 	}
799 
800 	if ((time_t)timestamp > ioloop_time) {
801 		/* The other director's clock seems to be into the future
802 		   compared to us. Don't set any of our users' timestamps into
803 		   future though. It's most likely only 1 second difference. */
804 		timestamp = ioloop_time;
805 	}
806 	conn->dir->num_incoming_requests++;
807 	if (director_user_refresh(conn, username_hash, host,
808 				  timestamp, weak, &forced, &user) < 0) {
809 		/* user expired - ignore */
810 		return TRUE;
811 	}
812 	/* Possibilities:
813 
814 	   a) The user didn't exist yet, and it was added with the given
815 	   timestamp.
816 
817 	   b) The user existed, but with an older timestamp. The timestamp
818 	   wasn't yet updated, so do it here below.
819 
820 	   c) The user existed with a newer timestamp. This is either because
821 	   we already received a non-handshake USER update for this user, or
822 	   our director saw a login for this user. Ignore this update.
823 
824 	   (We never want to change the user's timestamp to be older, because
825 	   that could result in directors going to a loop fighting each others
826 	   over a flipping timestamp.) */
827 	if (user->timestamp < timestamp)
828 		user->timestamp = timestamp;
829 	/* always sort users after handshaking to make sure the order
830 	   is correct */
831 	conn->users_unsorted = TRUE;
832 	return TRUE;
833 }
834 
835 static bool
director_cmd_user(struct director_connection * conn,const char * const * args)836 director_cmd_user(struct director_connection *conn,
837 		  const char *const *args)
838 {
839 	unsigned int username_hash;
840 	struct ip_addr ip;
841 	struct mail_host *host;
842 	struct user *user;
843 	bool forced;
844 	time_t timestamp = (time_t)-1;
845 
846 	if (str_array_length(args) < 2 ||
847 	    str_to_uint(args[0], &username_hash) < 0 ||
848 	    net_addr2ip(args[1], &ip) < 0 ||
849 	    (args[2] != NULL && str_to_time(args[2], &timestamp) < 0)) {
850 		director_cmd_error(conn, "Invalid parameters");
851 		return FALSE;
852 	}
853 
854 	/* could this before it's potentially ignored */
855 	conn->dir->num_incoming_requests++;
856 
857 	conn->users_received++;
858 	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
859 	if (host == NULL) {
860 		/* we probably just removed this host. */
861 		return TRUE;
862 	}
863 
864 	if (director_user_refresh(conn, username_hash,
865 				  host, timestamp, FALSE, &forced, &user) > 0) {
866 		/* user changed - forward the USER in the ring */
867 		struct director_host *src_host =
868 			forced ? conn->dir->self_host : conn->host;
869 		i_assert(!user->weak);
870 		director_update_user(conn->dir, src_host, user);
871 	}
872 	return TRUE;
873 }
874 
director_cmd_director(struct director_connection * conn,const char * const * args)875 static bool director_cmd_director(struct director_connection *conn,
876 				  const char *const *args)
877 {
878 	struct director_host *host;
879 	struct ip_addr ip;
880 	in_port_t port;
881 	bool log_add = FALSE;
882 
883 	if (!director_args_parse_ip_port(conn, args, &ip, &port))
884 		return FALSE;
885 
886 	host = director_host_lookup(conn->dir, &ip, port);
887 	if (host != NULL) {
888 		if (host == conn->dir->self_host) {
889 			/* ignore updates to ourself */
890 			return TRUE;
891 		}
892 		if (host->removed) {
893 			/* ignore re-adds of removed directors */
894 			return TRUE;
895 		}
896 
897 		/* already have this. just reset its last_network_failure
898 		   timestamp, since it might be up now, but only if this
899 		   isn't part of the handshake. (if it was, reseting the
900 		   timestamp could cause us to rapidly keep trying to connect
901 		   to it) */
902 		if (conn->handshake_received)
903 			host->last_network_failure = 0;
904 		/* it also may have been restarted, reset its state */
905 		director_host_restarted(host);
906 	} else {
907 		/* save the director and forward it */
908 		host = director_host_add(conn->dir, &ip, port);
909 		log_add = TRUE;
910 	}
911 	/* just forward this to the entire ring until it reaches back to
912 	   itself. some hosts may see this twice, but that's the only way to
913 	   guarantee that it gets seen by everyone. resetting the host multiple
914 	   times may cause us to handle its commands multiple times, but the
915 	   commands can handle that. however, we need to also handle a
916 	   situation where the added director never comes back - we don't want
917 	   to send the director information in a loop forever. */
918 	if (conn->dir->right != NULL &&
919 	    director_host_cmp_to_self(host, conn->dir->right->host,
920 				      conn->dir->self_host) > 0) {
921 		e_debug(conn->event,
922 			"Received DIRECTOR update for a host where we should be connected to. "
923 			"Not forwarding it since it's probably crashed.");
924 	} else {
925 		director_notify_ring_added(host,
926 			director_connection_get_host(conn), log_add);
927 	}
928 	return TRUE;
929 }
930 
director_cmd_director_remove(struct director_connection * conn,const char * const * args)931 static bool director_cmd_director_remove(struct director_connection *conn,
932 					 const char *const *args)
933 {
934 	struct director_host *host;
935 	struct ip_addr ip;
936 	in_port_t port;
937 
938 	if (!director_args_parse_ip_port(conn, args, &ip, &port))
939 		return FALSE;
940 
941 	host = director_host_lookup(conn->dir, &ip, port);
942 	if (host != NULL && !host->removed)
943 		director_ring_remove(host, director_connection_get_host(conn));
944 	return TRUE;
945 }
946 
947 static bool
director_cmd_host_hand_start(struct director_connection * conn,const char * const * args)948 director_cmd_host_hand_start(struct director_connection *conn,
949 			     const char *const *args)
950 {
951 	const ARRAY_TYPE(mail_host) *hosts;
952 	struct mail_host *const *hostp;
953 	unsigned int remote_ring_completed;
954 
955 	if (args[0] == NULL ||
956 	    str_to_uint(args[0], &remote_ring_completed) < 0) {
957 		director_cmd_error(conn, "Invalid parameters");
958 		return FALSE;
959 	}
960 
961 	if (remote_ring_completed != 0 && !conn->dir->ring_handshaked) {
962 		/* clear everything we have and use only what remote sends us */
963 		e_debug(conn->event, "We're joining a ring - replace all hosts");
964 		hosts = mail_hosts_get(conn->dir->mail_hosts);
965 		while (array_count(hosts) > 0) {
966 			hostp = array_front(hosts);
967 			director_remove_host(conn->dir, NULL, NULL, *hostp);
968 		}
969 	} else if (remote_ring_completed == 0 && conn->dir->ring_handshaked) {
970 		/* ignore whatever remote sends */
971 		e_debug(conn->event, "Remote is joining our ring - "
972 			"ignore all remote HOSTs");
973 		conn->ignore_host_events = TRUE;
974 	} else {
975 		e_debug(conn->event, "Merge rings' hosts");
976 	}
977 	conn->handshake_sending_hosts = TRUE;
978 	return TRUE;
979 }
980 
981 static int
director_cmd_is_seen_full(struct director_connection * conn,const char * const ** _args,unsigned int * seq_r,struct director_host ** host_r)982 director_cmd_is_seen_full(struct director_connection *conn,
983 			  const char *const **_args, unsigned int *seq_r,
984 			  struct director_host **host_r)
985 {
986 	const char *const *args = *_args;
987 	struct ip_addr ip;
988 	in_port_t port;
989 	unsigned int seq;
990 	struct director_host *host;
991 
992 	if (str_array_length(args) < 3 ||
993 	    net_addr2ip(args[0], &ip) < 0 ||
994 	    net_str2port(args[1], &port) < 0 ||
995 	    str_to_uint(args[2], &seq) < 0) {
996 		director_cmd_error(conn, "Invalid parameters");
997 		return -1;
998 	}
999 	*_args = args + 3;
1000 	*seq_r = seq;
1001 
1002 	host = director_host_lookup(conn->dir, &ip, port);
1003 	if (host == NULL || host->removed) {
1004 		/* director is already gone, but we can't be sure if this
1005 		   command was sent everywhere. re-send it as if it was from
1006 		   ourself. */
1007 		*host_r = NULL;
1008 	} else {
1009 		*host_r = host;
1010 		if (seq <= host->last_seq) {
1011 			/* already seen this */
1012 			return 1;
1013 		}
1014 		host->last_seq = seq;
1015 	}
1016 	return 0;
1017 }
1018 
/* Same as director_cmd_is_seen_full(), for callers that don't need the
   parsed sequence number. Returns -1 / 1 / 0 exactly like it. */
static int
director_cmd_is_seen(struct director_connection *conn,
		     const char *const **_args,
		     struct director_host **host_r)
{
	unsigned int unused_seq;

	return director_cmd_is_seen_full(conn, _args, &unused_seq, host_r);
}
1028 
/* Handle a USER-WEAK ring command. It has an "<ip> <port> <seq>" prefix
   identifying the originating director (parsed by director_cmd_is_seen()),
   followed by <username_hash> <mail host ip>.

   The user is refreshed with ioloop_time and a "weak" flag. Unlike most other
   ring commands, an already-seen USER-WEAK is not simply dropped:
   - when our own USER-WEAK comes back after travelling the whole ring, the
     user is made non-weak;
   - a remote director's already-seen USER-WEAK is still forwarded, so it
     reaches its origin.

   Returns FALSE on invalid parameters (error already sent), TRUE otherwise. */
static bool
director_cmd_user_weak(struct director_connection *conn,
		       const char *const *args)
{
	struct director_host *dir_host;
	struct ip_addr ip;
	unsigned int username_hash;
	struct mail_host *host;
	struct user *user;
	struct director_host *src_host = conn->host;
	bool weak = TRUE, weak_forward = FALSE, forced;
	int ret;

	/* note that unlike other commands we don't want to just ignore
	   duplicate commands */
	if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) < 0)
		return FALSE;

	/* count this before it's potentially ignored */
	conn->dir->num_incoming_requests++;

	if (str_array_length(args) != 2 ||
	    str_to_uint(args[0], &username_hash) < 0 ||
	    net_addr2ip(args[1], &ip) < 0) {
		director_cmd_error(conn, "Invalid parameters");
		return FALSE;
	}

	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
	if (host == NULL) {
		/* we probably just removed this host. */
		return TRUE;
	}

	if (ret == 0) {
		/* First time we're seeing this - forward it to others also.
		   We'll want to do it even if the user was already marked as
		   weak, because otherwise if two directors mark the user weak
		   at the same time both the USER-WEAK notifications reach
		   only half the directors until they collide and neither one
		   finishes going through the whole ring marking the user
		   non-weak. */
		weak_forward = TRUE;
	} else if (dir_host == conn->dir->self_host) {
		/* We originated this USER-WEAK request. The entire ring has seen
		   it and there weren't any conflicts. Make the user non-weak. */
		e_debug(conn->event,
			"user refresh: %u Our USER-WEAK seen by the entire ring",
			username_hash);
		src_host = conn->dir->self_host;
		weak = FALSE;
	} else {
		/* The original USER-WEAK sender will send a new non-weak USER
		   update saying what really happened. We'll still need to forward
		   this around the ring to the origin so it also knows it has
		   travelled through the ring. (dir_host is non-NULL here:
		   director_cmd_is_seen() only reports "seen" for a known,
		   non-removed director.) */
		e_debug(conn->event,
			"user refresh: %u Remote USER-WEAK from %s seen by the entire ring, ignoring",
			username_hash, dir_host->ip_str);
		weak_forward = TRUE;
	}

	ret = director_user_refresh(conn, username_hash,
				    host, ioloop_time, weak, &forced, &user);
	/* user is refreshed with ioloop_time, it can't be expired already */
	i_assert(ret >= 0);
	if (ret > 0 || weak_forward) {
		/* user changed, or we've decided that we need to forward
		   the weakness notification to the rest of the ring even
		   though we already knew it. */
		if (forced)
			src_host = conn->dir->self_host;
		/* a user that is no longer weak is announced with a normal
		   USER update, otherwise the weakness is passed on */
		if (!user->weak)
			director_update_user(conn->dir, src_host, user);
		else {
			director_update_user_weak(conn->dir, src_host, conn,
						  dir_host, user);
		}
	}
	return TRUE;
}
1110 
/* Common handler for HOST updates, both in the handshake and in the ring.

   Arguments: <ip> <vhost_count> [<tag> [<D|U><last_updown_change>
   [<hostname>]]]. 'D' means down, 'U' means up.

   dir_host is the director the update originated from, or NULL (the
   handshake variant always passes NULL).

   Behaviour:
   - An unknown host is added.
   - A known host is updated when its vhost count, up/down state or tag
     differs.
   - If the host is still desynced from a previous update, a conflicting
     up/down state is resolved in favour of the newer last_updown_change,
     and the smaller vhost count is used. The result is then re-announced
     as coming from us, so it reaches the full ring.
   - last_updown_change of an updated host never decreases.

   Returns FALSE on invalid parameters (error already sent), TRUE otherwise. */
static bool ATTR_NULL(3)
director_cmd_host_int(struct director_connection *conn, const char *const *args,
		      struct director_host *dir_host)
{
	struct director_host *src_host = conn->host;
	struct mail_host *host;
	struct ip_addr ip;
	const char *tag = "", *host_tag, *hostname = NULL;
	unsigned int arg_count, vhost_count;
	bool update, down = FALSE;
	time_t last_updown_change = 0;

	arg_count = str_array_length(args);
	if (arg_count < 2 ||
	    net_addr2ip(args[0], &ip) < 0 ||
	    str_to_uint(args[1], &vhost_count) < 0) {
		director_cmd_error(conn, "Invalid parameters");
		return FALSE;
	}
	if (arg_count >= 3)
		tag = args[2];
	if (arg_count >= 4) {
		/* "D<timestamp>" or "U<timestamp>" */
		if ((args[3][0] != 'D' && args[3][0] != 'U') ||
		    str_to_time(args[3]+1, &last_updown_change) < 0) {
			director_cmd_error(conn, "Invalid updown parameters");
			return FALSE;
		}
		down = args[3][0] == 'D';
	}
	if (arg_count >= 5)
		hostname = args[4];
	if (conn->ignore_host_events) {
		/* remote is sending hosts in a handshake, but it doesn't have
		   a completed ring and we do. */
		i_assert(conn->handshake_sending_hosts);
		return TRUE;
	}
	if (tag[0] != '\0' && conn->minor_version < DIRECTOR_VERSION_TAGS_V2) {
		director_cmd_error(conn, "Received a host tag from older director version with incompatible tagging support");
		return FALSE;
	}

	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
	if (host == NULL) {
		host = mail_host_add_hostname(conn->dir->mail_hosts,
					      hostname, &ip, tag);
		update = TRUE;
	} else {
		update = host->vhost_count != vhost_count ||
			host->down != down;

		host_tag = mail_host_get_tag(host);
		if (strcmp(tag, host_tag) != 0) {
			e_error(conn->event,
				"Host %s changed tag from '%s' to '%s'",
				host->ip_str,
				host_tag, tag);
			mail_host_set_tag(host, tag);
			update = TRUE;
		}
		if (update && host->desynced) {
			/* Conflicting concurrent updates: build a warning
			   describing the conflict while resolving it. */
			string_t *str = t_str_new(128);

			str_printfa(str, "Host %s is being updated before previous update had finished (",
				  host->ip_str);
			if (host->down != down &&
			    host->last_updown_change > last_updown_change) {
				/* our host has a newer change. preserve it. */
				down = host->down;
			}
			if (host->down != down) {
				if (host->down)
					str_append(str, "down -> up");
				else
					str_append(str, "up -> down");
			}
			if (host->vhost_count != vhost_count) {
				if (host->down != down)
					str_append(str, ", ");
				str_printfa(str, "vhosts %u -> %u",
					    host->vhost_count, vhost_count);
			}
			str_append(str, ") - ");

			/* on conflict prefer the smaller vhost count */
			vhost_count = I_MIN(vhost_count, host->vhost_count);
			str_printfa(str, "setting to state=%s vhosts=%u",
				    down ? "down" : "up", vhost_count);
			e_warning(conn->event, "%s", str_c(str));
			/* make the change appear to come from us, so it
			   reaches the full ring */
			dir_host = NULL;
			src_host = conn->dir->self_host;
		}
		if (update) {
			/* Make sure the host's timestamp never shrinks.
			   Otherwise we might get into a loop where the up/down
			   state keeps switching. */
			last_updown_change = I_MAX(last_updown_change,
						   host->last_updown_change);
		}
	}

	if (update) {
		/* apply the (possibly adjusted) state and pass it on */
		const char *log_prefix = t_strdup_printf("director(%s): ",
							 conn->name);
		mail_host_set_down(host, down, last_updown_change, log_prefix);
		mail_host_set_vhost_count(host, vhost_count, log_prefix);
		director_update_host(conn->dir, src_host, dir_host, host);
	} else {
		e_debug(conn->event,
			"Ignoring host %s update vhost_count=%u "
			"down=%d last_updown_change=%ld (hosts_hash=%u)",
			net_ip2addr(&ip), vhost_count, down ? 1 : 0,
			(long)last_updown_change,
			mail_hosts_hash(conn->dir->mail_hosts));
	}
	return TRUE;
}
1229 
1230 static bool
director_cmd_host_handshake(struct director_connection * conn,const char * const * args)1231 director_cmd_host_handshake(struct director_connection *conn,
1232 			    const char *const *args)
1233 {
1234 	return director_cmd_host_int(conn, args, NULL);
1235 }
1236 
1237 static bool
director_cmd_host(struct director_connection * conn,const char * const * args)1238 director_cmd_host(struct director_connection *conn, const char *const *args)
1239 {
1240 	struct director_host *dir_host;
1241 	int ret;
1242 
1243 	if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
1244 		return ret > 0;
1245 	return director_cmd_host_int(conn, args, dir_host);
1246 }
1247 
1248 static bool
director_cmd_host_remove(struct director_connection * conn,const char * const * args)1249 director_cmd_host_remove(struct director_connection *conn,
1250 			 const char *const *args)
1251 {
1252 	struct director_host *dir_host;
1253 	struct mail_host *host;
1254 	struct ip_addr ip;
1255 	int ret;
1256 
1257 	if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
1258 		return ret > 0;
1259 
1260 	if (str_array_length(args) != 1 ||
1261 	    net_addr2ip(args[0], &ip) < 0) {
1262 		director_cmd_error(conn, "Invalid parameters");
1263 		return FALSE;
1264 	}
1265 
1266 	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
1267 	if (host != NULL)
1268 		director_remove_host(conn->dir, conn->host, dir_host, host);
1269 	return TRUE;
1270 }
1271 
1272 static bool
director_cmd_host_flush(struct director_connection * conn,const char * const * args)1273 director_cmd_host_flush(struct director_connection *conn,
1274 			 const char *const *args)
1275 {
1276 	struct director_host *dir_host;
1277 	struct mail_host *host;
1278 	struct ip_addr ip;
1279 	int ret;
1280 
1281 	if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
1282 		return ret > 0;
1283 
1284 	if (str_array_length(args) != 1 ||
1285 	    net_addr2ip(args[0], &ip) < 0) {
1286 		director_cmd_error(conn, "Invalid parameters");
1287 		return FALSE;
1288 	}
1289 
1290 	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
1291 	if (host != NULL)
1292 		director_flush_host(conn->dir, conn->host, dir_host, host);
1293 	return TRUE;
1294 }
1295 
1296 static bool
director_cmd_user_move(struct director_connection * conn,const char * const * args)1297 director_cmd_user_move(struct director_connection *conn,
1298 		       const char *const *args)
1299 {
1300 	struct director_host *dir_host;
1301 	struct mail_host *host;
1302 	struct ip_addr ip;
1303 	unsigned int username_hash;
1304 	int ret;
1305 
1306 	if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
1307 		return ret > 0;
1308 
1309 	if (str_array_length(args) != 2 ||
1310 	    str_to_uint(args[0], &username_hash) < 0 ||
1311 	    net_addr2ip(args[1], &ip) < 0) {
1312 		director_cmd_error(conn, "Invalid parameters");
1313 		return FALSE;
1314 	}
1315 
1316 	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
1317 	if (host != NULL) {
1318 		director_move_user(conn->dir, conn->host, dir_host,
1319 				   username_hash, host);
1320 	}
1321 	return TRUE;
1322 }
1323 
1324 static bool
director_cmd_user_kick(struct director_connection * conn,const char * const * args)1325 director_cmd_user_kick(struct director_connection *conn,
1326 		       const char *const *args)
1327 {
1328 	struct director_host *dir_host;
1329 	int ret;
1330 
1331 	if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
1332 		return ret > 0;
1333 
1334 	if (str_array_length(args) != 1) {
1335 		director_cmd_error(conn, "Invalid parameters");
1336 		return FALSE;
1337 	}
1338 
1339 	director_kick_user(conn->dir, conn->host, dir_host, args[0]);
1340 	return TRUE;
1341 }
1342 
1343 static bool
director_cmd_user_kick_alt(struct director_connection * conn,const char * const * args)1344 director_cmd_user_kick_alt(struct director_connection *conn,
1345 			   const char *const *args)
1346 {
1347 	struct director_host *dir_host;
1348 	int ret;
1349 
1350 	if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
1351 		return ret > 0;
1352 
1353 	if (str_array_length(args) != 2) {
1354 		director_cmd_error(conn, "Invalid parameters");
1355 		return FALSE;
1356 	}
1357 
1358 	director_kick_user_alt(conn->dir, conn->host, dir_host, args[0], args[1]);
1359 	return TRUE;
1360 }
1361 
1362 static bool
director_cmd_user_kick_hash(struct director_connection * conn,const char * const * args)1363 director_cmd_user_kick_hash(struct director_connection *conn,
1364 			    const char *const *args)
1365 {
1366 	struct director_host *dir_host;
1367 	unsigned int username_hash;
1368 	struct ip_addr except_ip;
1369 	int ret;
1370 
1371 	if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
1372 		return ret > 0;
1373 
1374 	if (str_array_length(args) != 2 ||
1375 	    str_to_uint(args[0], &username_hash) < 0 ||
1376 	    net_addr2ip(args[1], &except_ip) < 0) {
1377 		director_cmd_error(conn, "Invalid parameters");
1378 		return FALSE;
1379 	}
1380 
1381 	director_kick_user_hash(conn->dir, conn->host, dir_host,
1382 				username_hash, &except_ip);
1383 	return TRUE;
1384 }
1385 
1386 static bool
director_cmd_user_killed(struct director_connection * conn,const char * const * args)1387 director_cmd_user_killed(struct director_connection *conn,
1388 			 const char *const *args)
1389 {
1390 	unsigned int username_hash;
1391 
1392 	if (str_array_length(args) != 1 ||
1393 	    str_to_uint(args[0], &username_hash) < 0) {
1394 		director_cmd_error(conn, "Invalid parameters");
1395 		return FALSE;
1396 	}
1397 
1398 	director_user_killed(conn->dir, username_hash);
1399 	return TRUE;
1400 }
1401 
1402 static bool
director_cmd_user_killed_everywhere(struct director_connection * conn,const char * const * args)1403 director_cmd_user_killed_everywhere(struct director_connection *conn,
1404 				    const char *const *args)
1405 {
1406 	struct director_host *dir_host;
1407 	unsigned int seq, username_hash;
1408 	int ret;
1409 
1410 	if ((ret = director_cmd_is_seen_full(conn, &args, &seq, &dir_host)) < 0)
1411 		return FALSE;
1412 
1413 	if (str_array_length(args) != 1 ||
1414 	    str_to_uint(args[0], &username_hash) < 0) {
1415 		director_cmd_error(conn, "Invalid parameters");
1416 		return FALSE;
1417 	}
1418 
1419 	if (ret > 0) {
1420 		i_assert(dir_host != NULL);
1421 		e_debug(conn->event,
1422 			"User %u - ignoring already seen USER-KILLED-EVERYWHERE "
1423 			"with seq=%u <= %s.last_seq=%u", username_hash,
1424 			seq, dir_host->name, dir_host->last_seq);
1425 		return TRUE;
1426 	}
1427 
1428 	director_user_killed_everywhere(conn->dir, conn->host,
1429 					dir_host, username_hash);
1430 	return TRUE;
1431 }
1432 
director_handshake_cmd_done(struct director_connection * conn)1433 static bool director_handshake_cmd_done(struct director_connection *conn)
1434 {
1435 	struct director *dir = conn->dir;
1436 	int handshake_msecs = timeval_diff_msecs(&ioloop_timeval, &conn->connected_time);
1437 	string_t *str;
1438 
1439 	if (conn->users_unsorted && conn->user_iter == NULL) {
1440 		/* we sent our user list before receiving remote's */
1441 		conn->users_unsorted = FALSE;
1442 		mail_hosts_sort_users(conn->dir->mail_hosts);
1443 	}
1444 
1445 	str = t_str_new(128);
1446 	str_printfa(str, "Handshake finished in %u.%03u secs (",
1447 		    handshake_msecs/1000, handshake_msecs%1000);
1448 	director_connection_append_stats(conn, str);
1449 	str_append_c(str, ')');
1450 	if (handshake_msecs >= DIRECTOR_HANDSHAKE_WARN_SECS*1000)
1451 		e_warning(conn->event, "%s", str_c(str));
1452 	else
1453 		e_info(conn->event, "%s", str_c(str));
1454 
1455 	/* the host is up now, make sure we can connect to it immediately
1456 	   if needed */
1457 	conn->host->last_network_failure = 0;
1458 
1459 	conn->handshake_received = TRUE;
1460 	if (conn->in) {
1461 		/* handshaked to left side. tell it we've received the
1462 		   whole handshake. */
1463 		director_connection_send(conn, "DONE\n");
1464 
1465 		/* tell the "right" director about the "left" one */
1466 		director_update_send(dir, director_connection_get_host(conn),
1467 			t_strdup_printf("DIRECTOR\t%s\t%u\n",
1468 					conn->host->ip_str,
1469 					conn->host->port));
1470 		/* this is our "left" side. */
1471 		return director_connection_assign_left(conn);
1472 	} else {
1473 		/* handshaked to "right" side. */
1474 		return director_connection_assign_right(conn);
1475 	}
1476 }
1477 
1478 static int
director_handshake_cmd_options(struct director_connection * conn,const char * const * args)1479 director_handshake_cmd_options(struct director_connection *conn,
1480 			       const char *const *args)
1481 {
1482 	bool consistent_hashing = FALSE;
1483 	unsigned int i;
1484 
1485 	for (i = 0; args[i] != NULL; i++) {
1486 		if (strcmp(args[i], DIRECTOR_OPT_CONSISTENT_HASHING) == 0)
1487 			consistent_hashing = TRUE;
1488 	}
1489 	if (!consistent_hashing) {
1490 		e_error(conn->event, "director_consistent_hashing settings "
1491 			"differ between directors. Set "
1492 			"director_consistent_hashing=yes on old directors");
1493 		return -1;
1494 	}
1495 	return 1;
1496 }
1497 
/* Dispatch one command received while the handshake is in progress.

   Ordering rules enforced here:
   - VERSION must come first (checked for protocol name, major version and
     tag compatibility).
   - ME must come next.
   - Between HOST-HAND-START and HOST-HAND-END (only allowed on incoming
     connections) nothing but HOST commands is accepted.
   - OPTIONS, handshake USERs (incoming only) and DONE are handled after
     that.

   Returns 1 if the command was handled, -1 on error (the connection is
   presumably closed by the caller), and 0 if this isn't a handshake command
   (presumably left for the caller's normal command handling - verify
   against the caller). */
static int
director_connection_handle_handshake(struct director_connection *conn,
				     const char *cmd, const char *const *args)
{
	unsigned int major_version;

	/* both incoming and outgoing connections get VERSION and ME */
	if (strcmp(cmd, "VERSION") == 0 && str_array_length(args) >= 3) {
		/* VERSION <protocol name> <major> <minor> */
		if (strcmp(args[0], DIRECTOR_VERSION_NAME) != 0) {
			e_error(conn->event, "Wrong protocol in socket "
				"(%s vs %s)",
				args[0], DIRECTOR_VERSION_NAME);
			return -1;
		} else if (str_to_uint(args[1], &major_version) < 0 ||
			str_to_uint(args[2], &conn->minor_version) < 0) {
			e_error(conn->event, "Invalid protocol version: "
				"%s.%s", args[1], args[2]);
			return -1;
		} else if (major_version != DIRECTOR_VERSION_MAJOR) {
			e_error(conn->event, "Incompatible protocol version: "
				"%u vs %u", major_version,
				DIRECTOR_VERSION_MAJOR);
			return -1;
		}
		/* a remote that can't understand our host tags is refused */
		if (conn->minor_version < DIRECTOR_VERSION_TAGS_V2 &&
		    mail_hosts_have_tags(conn->dir->mail_hosts)) {
			e_error(conn->event, "Director version supports incompatible tags");
			return -1;
		}
		conn->version_received = TRUE;
		director_finish_sending_handshake(conn);
		return 1;
	}
	if (!conn->version_received) {
		director_cmd_error(conn, "Incompatible protocol");
		return -1;
	}

	if (strcmp(cmd, "ME") == 0)
		return director_cmd_me(conn, args) ? 1 : -1;
	if (!conn->me_received) {
		director_cmd_error(conn, "Expecting ME command first");
		return -1;
	}

	/* incoming connections get a HOST list */
	if (conn->handshake_sending_hosts) {
		if (strcmp(cmd, "HOST") == 0)
			return director_cmd_host_handshake(conn, args) ? 1 : -1;
		if (strcmp(cmd, "HOST-HAND-END") == 0) {
			/* host list finished - stop ignoring HOSTs, if we
			   were doing that */
			conn->ignore_host_events = FALSE;
			conn->handshake_sending_hosts = FALSE;
			return 1;
		}
		director_cmd_error(conn, "Unexpected command during host list");
		return -1;
	}
	if (strcmp(cmd, "OPTIONS") == 0)
		return director_handshake_cmd_options(conn, args);
	if (strcmp(cmd, "HOST-HAND-START") == 0) {
		if (!conn->in) {
			director_cmd_error(conn,
				"Host list is only for incoming connections");
			return -1;
		}
		return director_cmd_host_hand_start(conn, args) ? 1 : -1;
	}

	/* handshake USERs are only accepted on incoming connections. "U" is
	   always a handshake USER; a plain "USER" only when
	   CMD_IS_USER_HANDSHAKE() says so for this protocol version. */
	if (conn->in &&
	    (strcmp(cmd, "U") == 0 ||
	     (strcmp(cmd, "USER") == 0 &&
	      CMD_IS_USER_HANDSHAKE(conn->minor_version, args))))
		return director_handshake_cmd_user(conn, args) ? 1 : -1;

	/* both get DONE */
	if (strcmp(cmd, "DONE") == 0)
		return director_handshake_cmd_done(conn) ? 1 : -1;
	return 0;
}
1577 
1578 static bool
director_connection_sync_host(struct director_connection * conn,struct director_host * host,uint32_t seq,unsigned int minor_version,unsigned int timestamp,unsigned int hosts_hash)1579 director_connection_sync_host(struct director_connection *conn,
1580 			      struct director_host *host,
1581 			      uint32_t seq, unsigned int minor_version,
1582 			      unsigned int timestamp, unsigned int hosts_hash)
1583 {
1584 	struct director *dir = conn->dir;
1585 
1586 	if (minor_version > DIRECTOR_VERSION_MINOR) {
1587 		/* we're not up to date */
1588 		minor_version = DIRECTOR_VERSION_MINOR;
1589 	}
1590 
1591 	if (host->self) {
1592 		if (dir->sync_seq != seq) {
1593 			/* stale SYNC event */
1594 			return FALSE;
1595 		}
1596 		/* sync_seq increases when we get disconnected, so we must be
1597 		   successfully connected to both directions */
1598 		i_assert(dir->left != NULL && dir->right != NULL);
1599 
1600 		if (hosts_hash != 0 &&
1601 		    hosts_hash != mail_hosts_hash(conn->dir->mail_hosts)) {
1602 			e_error(conn->event, "Hosts unexpectedly changed during SYNC reply - resending"
1603 				"(seq=%u, old hosts_hash=%u, new hosts_hash=%u)",
1604 				seq, hosts_hash,
1605 				mail_hosts_hash(dir->mail_hosts));
1606 			(void)director_resend_sync(dir);
1607 			return FALSE;
1608 		}
1609 
1610 		dir->ring_min_version = minor_version;
1611 		if (!dir->ring_handshaked) {
1612 			/* the ring is handshaked */
1613 			director_set_ring_handshaked(dir);
1614 		} else if (dir->ring_synced) {
1615 			/* duplicate SYNC (which was sent just in case the
1616 			   previous one got lost) */
1617 		} else {
1618 			e_debug(conn->event, "Ring is synced (%s sent seq=%u, hosts_hash=%u)",
1619 				conn->name, seq,
1620 				mail_hosts_hash(dir->mail_hosts));
1621 			int sync_msecs =
1622 				timeval_diff_msecs(&ioloop_timeval, &dir->last_sync_start_time);
1623 			if (sync_msecs >= 0)
1624 				dir->last_sync_msecs = sync_msecs;
1625 			director_set_ring_synced(dir);
1626 		}
1627 	} else {
1628 		if (seq < host->last_sync_seq &&
1629 		    timestamp < host->last_sync_timestamp +
1630 		    DIRECTOR_SYNC_STALE_TIMESTAMP_RESET_SECS) {
1631 			/* stale SYNC event */
1632 			e_debug(conn->event, "Ignore stale SYNC event for %s "
1633 				"(seq %u < %u, timestamp=%u)",
1634 				host->name, seq, host->last_sync_seq,
1635 				timestamp);
1636 			return FALSE;
1637 		} else if (seq < host->last_sync_seq) {
1638 			e_warning(conn->event,
1639 				  "Last SYNC seq for %s appears to be stale, resetting "
1640 				  "(seq=%u, timestamp=%u -> seq=%u, timestamp=%u)",
1641 				  host->name, host->last_sync_seq,
1642 				  host->last_sync_timestamp, seq, timestamp);
1643 			host->last_sync_seq = seq;
1644 			host->last_sync_timestamp = timestamp;
1645 			host->last_sync_seq_counter = 1;
1646 		} else if (seq > host->last_sync_seq ||
1647 			   timestamp > host->last_sync_timestamp) {
1648 			host->last_sync_seq = seq;
1649 			host->last_sync_timestamp = timestamp;
1650 			host->last_sync_seq_counter = 1;
1651 			e_debug(conn->event, "Update SYNC for %s "
1652 				"(seq=%u, timestamp=%u -> seq=%u, timestamp=%u)",
1653 				host->name, host->last_sync_seq,
1654 				host->last_sync_timestamp, seq, timestamp);
1655 		} else if (++host->last_sync_seq_counter >
1656 			   DIRECTOR_MAX_SYNC_SEQ_DUPLICATES) {
1657 			/* we've received this too many times already */
1658 			e_debug(conn->event, "Ignore duplicate #%u SYNC event for %s "
1659 				"(seq=%u, timestamp %u <= %u)",
1660 				host->last_sync_seq_counter, host->name, seq,
1661 				timestamp, host->last_sync_timestamp);
1662 			return FALSE;
1663 		}
1664 
1665 		if (hosts_hash != 0 &&
1666 		    hosts_hash != mail_hosts_hash(conn->dir->mail_hosts)) {
1667 			if (host->desynced_hosts_hash != hosts_hash) {
1668 				e_debug(conn->event, "Ignore director %s stale SYNC request whose hosts don't match us "
1669 					"(seq=%u, remote hosts_hash=%u, my hosts_hash=%u)",
1670 					host->ip_str, seq, hosts_hash,
1671 					mail_hosts_hash(dir->mail_hosts));
1672 				host->desynced_hosts_hash = hosts_hash;
1673 				return FALSE;
1674 			}
1675 			/* we'll get here only if we received a SYNC twice
1676 			   with the same wrong hosts_hash. FIXME: this gets
1677 			   triggered unnecessarily sometimes if hosts are
1678 			   changing rapidly. */
1679 			e_error(conn->event, "Director %s SYNC request hosts don't match us - resending hosts "
1680 				"(seq=%u, remote hosts_hash=%u, my hosts_hash=%u)",
1681 				host->ip_str, seq,
1682 				hosts_hash, mail_hosts_hash(dir->mail_hosts));
1683 			director_resend_hosts(dir);
1684 			return FALSE;
1685 		}
1686 		host->desynced_hosts_hash = 0;
1687 		if (dir->right != NULL) {
1688 			/* forward it to the connection on right */
1689 			director_sync_send(dir, host, seq, minor_version,
1690 					   timestamp, hosts_hash);
1691 		} else {
1692 			e_debug(conn->event, "We have no right connection - "
1693 				"delay replying to SYNC until finished");
1694 			host->delayed_sync_seq = seq;
1695 			host->delayed_sync_minor_version = minor_version;
1696 			host->delayed_sync_timestamp = timestamp;
1697 			host->delayed_sync_hosts_hash = hosts_hash;
1698 		}
1699 	}
1700 	return TRUE;
1701 }
1702 
director_connection_sync(struct director_connection * conn,const char * const * args)1703 static bool director_connection_sync(struct director_connection *conn,
1704 				     const char *const *args)
1705 {
1706 	struct director *dir = conn->dir;
1707 	struct director_host *host;
1708 	struct ip_addr ip;
1709 	in_port_t port;
1710 	unsigned int arg_count, seq, minor_version = 0, timestamp = ioloop_time;
1711 	unsigned int hosts_hash = 0;
1712 
1713 	arg_count = str_array_length(args);
1714 	if (arg_count < 3 ||
1715 	    !director_args_parse_ip_port(conn, args, &ip, &port) ||
1716 	    str_to_uint(args[2], &seq) < 0) {
1717 		director_cmd_error(conn, "Invalid parameters");
1718 		return FALSE;
1719 	}
1720 	if (arg_count >= 4 && str_to_uint(args[3], &minor_version) < 0) {
1721 		director_cmd_error(conn, "Invalid parameters");
1722 		return FALSE;
1723 	}
1724 	if (arg_count >= 5 && str_to_uint(args[4], &timestamp) < 0) {
1725 		director_cmd_error(conn, "Invalid parameters");
1726 		return FALSE;
1727 	}
1728 	if (arg_count >= 6 && str_to_uint(args[5], &hosts_hash) < 0) {
1729 		director_cmd_error(conn, "Invalid parameters");
1730 		return FALSE;
1731 	}
1732 
1733 	/* find the originating director. if we don't see it, it was already
1734 	   removed and we can ignore this sync. */
1735 	host = director_host_lookup(dir, &ip, port);
1736 	if (host != NULL) {
1737 		if (!director_connection_sync_host(conn, host, seq,
1738 						   minor_version, timestamp,
1739 						   hosts_hash))
1740 			return TRUE;
1741 	}
1742 
1743 	/* If directors got disconnected while we were waiting a SYNC reply,
1744 	   it might have gotten lost. If we've received a DIRECTOR update since
1745 	   the last time we sent a SYNC, retry sending it here to make sure
1746 	   it doesn't get stuck. We don't want to do this too eagerly because
1747 	   it may trigger desynced_hosts_hash != hosts_hash mismatch, which
1748 	   causes unnecessary error logging and hosts-resending. */
1749 	if ((host == NULL || !host->self) &&
1750 	    dir->last_sync_sent_ring_change_counter != dir->ring_change_counter &&
1751 	    (time_t)dir->self_host->last_sync_timestamp != ioloop_time)
1752 		(void)director_resend_sync(dir);
1753 	return TRUE;
1754 }
1755 
director_disconnect_timeout(struct director_connection * conn)1756 static void director_disconnect_timeout(struct director_connection *conn)
1757 {
1758 	director_connection_deinit(&conn, "CONNECT requested");
1759 }
1760 
1761 static void
director_reconnect_after_wrong_connect_timeout(struct director_connection * conn)1762 director_reconnect_after_wrong_connect_timeout(struct director_connection *conn)
1763 {
1764 	struct director *dir = conn->dir;
1765 
1766 	director_connection_deinit(&conn, "Wrong CONNECT requested");
1767 	if (dir->right == NULL)
1768 		director_connect(dir, "Reconnecting after wrong CONNECT request");
1769 }
1770 
1771 static void
director_reconnect_after_wrong_connect(struct director_connection * conn)1772 director_reconnect_after_wrong_connect(struct director_connection *conn)
1773 {
1774 	if (conn->to_disconnect != NULL)
1775 		return;
1776 	conn->to_disconnect =
1777 		timeout_add_short(DIRECTOR_RECONNECT_AFTER_WRONG_CONNECT_MSECS,
1778 				  director_reconnect_after_wrong_connect_timeout, conn);
1779 }
1780 
director_cmd_connect(struct director_connection * conn,const char * const * args)1781 static bool director_cmd_connect(struct director_connection *conn,
1782 				 const char *const *args)
1783 {
1784 	struct director *dir = conn->dir;
1785 	struct director_host *host;
1786 	struct ip_addr ip;
1787 	in_port_t port;
1788 	const char *right_state;
1789 
1790 	if (str_array_length(args) != 2 ||
1791 	    !director_args_parse_ip_port(conn, args, &ip, &port)) {
1792 		director_cmd_error(conn, "Invalid parameters");
1793 		return FALSE;
1794 	}
1795 
1796 	host = director_host_get(conn->dir, &ip, port);
1797 
1798 	/* remote suggests us to connect elsewhere */
1799 	if (dir->right != NULL &&
1800 	    director_host_cmp_to_self(host, dir->right->host,
1801 				      dir->self_host) <= 0) {
1802 		/* the old connection is the correct one */
1803 		e_debug(conn->event, "Ignoring CONNECT request to %s (current right is %s)",
1804 			host->name, dir->right->name);
1805 		director_reconnect_after_wrong_connect(conn);
1806 		return TRUE;
1807 	}
1808 	if (host->removed) {
1809 		e_debug(conn->event, "Ignoring CONNECT request to %s (director is removed)",
1810 			host->name);
1811 		director_reconnect_after_wrong_connect(conn);
1812 		return TRUE;
1813 	}
1814 
1815 	/* reset failure timestamp so we'll actually try to connect there. */
1816 	host->last_network_failure = 0;
1817 	/* reset removed-flag, so we don't crash */
1818 	host->removed = FALSE;
1819 
1820 	if (dir->right == NULL) {
1821 		right_state = "initializing right";
1822 	} else {
1823 		right_state = t_strdup_printf("replacing current right %s",
1824 					      dir->right->name);
1825 		/* disconnect from right side immediately - it's not accepting
1826 		   any further commands from us. */
1827 		if (conn->dir->right != conn)
1828 			director_connection_deinit(&conn->dir->right, "CONNECT requested");
1829 		else if (conn->to_disconnect == NULL) {
1830 			conn->to_disconnect =
1831 				timeout_add_short(0, director_disconnect_timeout, conn);
1832 		}
1833 	}
1834 
1835 	/* connect here */
1836 	(void)director_connect_host(dir, host, t_strdup_printf(
1837 		"Received CONNECT request from %s - %s",
1838 		conn->name, right_state));
1839 	return TRUE;
1840 }
1841 
director_disconnect_wrong_lefts(struct director * dir)1842 static void director_disconnect_wrong_lefts(struct director *dir)
1843 {
1844 	struct director_connection *conn;
1845 
1846 	array_foreach_elem(&dir->connections, conn) {
1847 		if (conn->in && conn != dir->left && conn->me_received &&
1848 		    conn->to_disconnect == NULL &&
1849 		    director_host_cmp_to_self(dir->left->host, conn->host,
1850 					      dir->self_host) < 0)
1851 			director_connection_send_connect(conn, dir->left->host);
1852 	}
1853 }
1854 
director_cmd_ping(struct director_connection * conn,const char * const * args)1855 static bool director_cmd_ping(struct director_connection *conn,
1856 			      const char *const *args)
1857 {
1858 	time_t sent_time;
1859 	uintmax_t send_buffer_size;
1860 
1861 	if (str_array_length(args) >= 2 &&
1862 	    str_to_time(args[0], &sent_time) == 0 &&
1863 	    str_to_uintmax(args[1], &send_buffer_size) == 0) {
1864 		int diff_secs = ioloop_time - sent_time;
1865 		if (diff_secs*1000+500 > DIRECTOR_CONNECTION_PINGPONG_WARN_MSECS) {
1866 			e_warning(conn->event,
1867 				  "PING response took %d secs to receive "
1868 				  "(send buffer was %ju bytes)",
1869 				  diff_secs, send_buffer_size);
1870 		}
1871 	}
1872 	director_connection_send(conn,
1873 		t_strdup_printf("PONG\t%"PRIdTIME_T"\t%zu\n",
1874 		ioloop_time, o_stream_get_buffer_used_size(conn->output)));
1875 	return TRUE;
1876 }
1877 
1878 static void
director_ping_append_extra(struct director_connection * conn,string_t * str,time_t pong_sent_time,uintmax_t pong_send_buffer_size)1879 director_ping_append_extra(struct director_connection *conn, string_t *str,
1880 			   time_t pong_sent_time,
1881 			   uintmax_t pong_send_buffer_size)
1882 {
1883 	struct rusage usage;
1884 
1885 	str_printfa(str, "buffer size at PING was %zu bytes", conn->ping_sent_buffer_size);
1886 	if (pong_sent_time != 0) {
1887 		str_printfa(str, ", remote sent it %"PRIdTIME_T" secs ago",
1888 			    ioloop_time - pong_sent_time);
1889 	}
1890 	if (pong_send_buffer_size != (uintmax_t)-1) {
1891 		str_printfa(str, ", remote buffer size at PONG was %ju bytes",
1892 			    pong_send_buffer_size);
1893 	}
1894 	if (conn->ping_sent_user_cpu.tv_sec != (time_t)-1 &&
1895 	    getrusage(RUSAGE_SELF, &usage) == 0) {
1896 		int diff = timeval_diff_msecs(&usage.ru_utime,
1897 					      &conn->ping_sent_user_cpu);
1898 		str_printfa(str, ", %u.%03u CPU secs since PING was sent",
1899 			    diff/1000, diff%1000);
1900 	}
1901 	str_printfa(str, ", %"PRIuUOFF_T" bytes input",
1902 		    conn->input->v_offset - conn->ping_sent_input_offset);
1903 	str_printfa(str, ", %"PRIuUOFF_T" bytes output",
1904 		    conn->output->offset - conn->ping_sent_output_offset);
1905 }
1906 
director_cmd_pong(struct director_connection * conn,const char * const * args)1907 static bool director_cmd_pong(struct director_connection *conn,
1908 			      const char *const *args)
1909 {
1910 	time_t sent_time;
1911 	uintmax_t send_buffer_size;
1912 
1913 	if (!conn->ping_waiting)
1914 		return TRUE;
1915 	conn->ping_waiting = FALSE;
1916 	timeout_remove(&conn->to_pong);
1917 
1918 	if (str_array_length(args) < 2 ||
1919 	    str_to_time(args[0], &sent_time) < 0 ||
1920 	    str_to_uintmax(args[1], &send_buffer_size) < 0) {
1921 		sent_time = 0;
1922 		send_buffer_size = (uintmax_t)-1;
1923 	}
1924 
1925 	int ping_msecs = timeval_diff_msecs(&ioloop_timeval, &conn->ping_sent_time);
1926 	if (ping_msecs >= 0) {
1927 		if (ping_msecs > DIRECTOR_CONNECTION_PINGPONG_WARN_MSECS) {
1928 			string_t *extra = t_str_new(128);
1929 			director_ping_append_extra(conn, extra, sent_time, send_buffer_size);
1930 			e_warning(conn->event,
1931 				  "PONG response took %u.%03u secs (%s)",
1932 				  ping_msecs/1000, ping_msecs%1000,
1933 				  str_c(extra));
1934 		}
1935 		conn->last_ping_msecs = ping_msecs;
1936 	}
1937 
1938 	if (conn->verifying_left) {
1939 		conn->verifying_left = FALSE;
1940 		if (conn == conn->dir->left) {
1941 			/* our left side is functional. tell all the wrong
1942 			   incoming connections to connect to it instead. */
1943 			director_disconnect_wrong_lefts(conn->dir);
1944 		}
1945 	}
1946 
1947 	director_connection_set_ping_timeout(conn);
1948 	return TRUE;
1949 }
1950 
1951 static bool
director_connection_handle_cmd(struct director_connection * conn,const char * cmd,const char * const * args)1952 director_connection_handle_cmd(struct director_connection *conn,
1953 			       const char *cmd, const char *const *args)
1954 {
1955 	int ret;
1956 
1957 	if (!conn->handshake_received) {
1958 		ret = director_connection_handle_handshake(conn, cmd, args);
1959 		if (ret > 0)
1960 			return TRUE;
1961 		if (ret < 0) {
1962 			/* invalid commands during handshake,
1963 			   we probably don't want to reconnect here */
1964 			return FALSE;
1965 		}
1966 		/* allow also other commands during handshake */
1967 	}
1968 
1969 	if (strcmp(cmd, "PING") == 0)
1970 		return director_cmd_ping(conn, args);
1971 	if (strcmp(cmd, "PONG") == 0)
1972 		return director_cmd_pong(conn, args);
1973 	if (strcmp(cmd, "USER") == 0)
1974 		return director_cmd_user(conn, args);
1975 	if (strcmp(cmd, "USER-WEAK") == 0)
1976 		return director_cmd_user_weak(conn, args);
1977 	if (strcmp(cmd, "HOST") == 0)
1978 		return director_cmd_host(conn, args);
1979 	if (strcmp(cmd, "HOST-REMOVE") == 0)
1980 		return director_cmd_host_remove(conn, args);
1981 	if (strcmp(cmd, "HOST-FLUSH") == 0)
1982 		return director_cmd_host_flush(conn, args);
1983 	if (strcmp(cmd, "USER-MOVE") == 0)
1984 		return director_cmd_user_move(conn, args);
1985 	if (strcmp(cmd, "USER-KICK") == 0)
1986 		return director_cmd_user_kick(conn, args);
1987 	if (strcmp(cmd, "USER-KICK-ALT") == 0)
1988 		return director_cmd_user_kick_alt(conn, args);
1989 	if (strcmp(cmd, "USER-KICK-HASH") == 0)
1990 		return director_cmd_user_kick_hash(conn, args);
1991 	if (strcmp(cmd, "USER-KILLED") == 0)
1992 		return director_cmd_user_killed(conn, args);
1993 	if (strcmp(cmd, "USER-KILLED-EVERYWHERE") == 0)
1994 		return director_cmd_user_killed_everywhere(conn, args);
1995 	if (strcmp(cmd, "DIRECTOR") == 0)
1996 		return director_cmd_director(conn, args);
1997 	if (strcmp(cmd, "DIRECTOR-REMOVE") == 0)
1998 		return director_cmd_director_remove(conn, args);
1999 	if (strcmp(cmd, "SYNC") == 0)
2000 		return director_connection_sync(conn, args);
2001 	if (strcmp(cmd, "CONNECT") == 0)
2002 		return director_cmd_connect(conn, args);
2003 	if (strcmp(cmd, "QUIT") == 0) {
2004 		e_warning(conn->event,
2005 			  "Director %s disconnected us with reason: %s",
2006 			  conn->name, t_strarray_join(args, " "));
2007 		return FALSE;
2008 	}
2009 
2010 	director_cmd_error(conn, "Unknown command %s", cmd);
2011 	return FALSE;
2012 }
2013 
2014 static bool
director_connection_handle_line(struct director_connection * conn,char * line)2015 director_connection_handle_line(struct director_connection *conn,
2016 				char *line)
2017 {
2018 	const char *cmd, *const *args;
2019 	bool ret;
2020 
2021 	e_debug(conn->event, "input: %s", line);
2022 
2023 	args = t_strsplit_tabescaped_inplace(line);
2024 	cmd = args[0];
2025 	if (cmd == NULL) {
2026 		director_cmd_error(conn, "Received empty line");
2027 		return FALSE;
2028 	}
2029 
2030 	conn->cur_cmd = cmd;
2031 	conn->cur_args = args;
2032 	ret = director_connection_handle_cmd(conn, cmd, args+1);
2033 	conn->cur_cmd = NULL;
2034 	conn->cur_args = NULL;
2035 	return ret;
2036 }
2037 
2038 static void
director_connection_log_disconnect(struct director_connection * conn,int err,const char * errstr)2039 director_connection_log_disconnect(struct director_connection *conn, int err,
2040 				   const char *errstr)
2041 {
2042 	string_t *str = t_str_new(128);
2043 
2044 	i_assert(conn->connected);
2045 
2046 	if (conn->connect_request_to != NULL) {
2047 		e_warning(conn->event,
2048 			  "Director %s tried to connect to us, "
2049 			  "should use %s instead",
2050 			  conn->name, conn->connect_request_to->name);
2051 		return;
2052 	}
2053 
2054 	str_printfa(str, "Director %s disconnected: ", conn->name);
2055 	str_append(str, "Connection closed");
2056 	if (err != 0 && err != EPIPE) {
2057 		errno = err;
2058 		if (errstr[0] == '\0')
2059 			str_printfa(str, ": %m");
2060 		else
2061 			str_printfa(str, ": %s", errstr);
2062 	}
2063 
2064 	str_append(str, " (");
2065 	director_connection_append_stats(conn, str);
2066 
2067 	if (!conn->me_received)
2068 		str_append(str, ", handshake ME not received");
2069 	else if (!conn->handshake_received)
2070 		str_append(str, ", handshake DONE not received");
2071 	if (conn->synced)
2072 		str_append(str, ", synced");
2073 	str_append_c(str, ')');
2074 	e_error(conn->event, "%s", str_c(str));
2075 }
2076 
director_connection_input(struct director_connection * conn)2077 static void director_connection_input(struct director_connection *conn)
2078 {
2079 	struct director *dir = conn->dir;
2080 	char *line;
2081 	uoff_t prev_offset;
2082 	bool ret;
2083 
2084 	switch (i_stream_read(conn->input)) {
2085 	case 0:
2086 		return;
2087 	case -1:
2088 		/* disconnected */
2089 		director_connection_log_disconnect(conn, conn->input->stream_errno,
2090 						   i_stream_get_error(conn->input));
2091 		director_connection_disconnected(&conn, i_stream_get_error(conn->input));
2092 		return;
2093 	case -2:
2094 		/* buffer full */
2095 		director_cmd_error(conn, "Director sent us more than %d bytes",
2096 				   MAX_INBUF_SIZE);
2097 		director_connection_reconnect(&conn, "Too long input line");
2098 		return;
2099 	}
2100 
2101 	if (conn->to_disconnect != NULL) {
2102 		/* just read everything the remote sends, and wait for it
2103 		   to disconnect. we mainly just want the remote to read the
2104 		   CONNECT we sent it. */
2105 		i_stream_skip(conn->input, i_stream_get_data_size(conn->input));
2106 		return;
2107 	}
2108 	conn->last_input = ioloop_timeval;
2109 	conn->refcount++;
2110 
2111 	director_sync_freeze(dir);
2112 	prev_offset = conn->input->v_offset;
2113 	while ((line = i_stream_next_line(conn->input)) != NULL) {
2114 		dir->ring_traffic_input += conn->input->v_offset - prev_offset;
2115 		prev_offset = conn->input->v_offset;
2116 
2117 		T_BEGIN {
2118 			ret = director_connection_handle_line(conn, line);
2119 		} T_END;
2120 
2121 		if (!ret) {
2122 			if (!director_connection_unref(conn))
2123 				break;
2124 			director_connection_reconnect(&conn, t_strdup_printf(
2125 				"Invalid input: %s", line));
2126 			break;
2127 		}
2128 	}
2129 	director_sync_thaw(dir);
2130 	if (conn != NULL) {
2131 		if (director_connection_unref(conn))
2132 			timeout_reset(conn->to_ping);
2133 	}
2134 }
2135 
director_connection_send_directors(struct director_connection * conn)2136 static void director_connection_send_directors(struct director_connection *conn)
2137 {
2138 	struct director_host *host;
2139 	string_t *str = t_str_new(64);
2140 
2141 	array_foreach_elem(&conn->dir->dir_hosts, host) {
2142 		if (host->removed)
2143 			continue;
2144 
2145 		str_truncate(str, 0);
2146 		str_printfa(str, "DIRECTOR\t%s\t%u\n",
2147 			    host->ip_str, host->port);
2148 		director_connection_send(conn, str_c(str));
2149 	}
2150 }
2151 
2152 static void
director_connection_send_hosts(struct director_connection * conn)2153 director_connection_send_hosts(struct director_connection *conn)
2154 {
2155 	struct mail_host *host;
2156 	bool send_updowns;
2157 	string_t *str = t_str_new(128);
2158 
2159 	i_assert(conn->version_received);
2160 
2161 	send_updowns = conn->minor_version >= DIRECTOR_VERSION_UPDOWN;
2162 
2163 	str_printfa(str, "HOST-HAND-START\t%u\n",
2164 		    conn->dir->ring_handshaked ? 1 : 0);
2165 	array_foreach_elem(mail_hosts_get(conn->dir->mail_hosts), host) {
2166 		const char *host_tag = mail_host_get_tag(host);
2167 
2168 		str_printfa(str, "HOST\t%s\t%u",
2169 			    host->ip_str, host->vhost_count);
2170 		if (host_tag[0] != '\0' || send_updowns) {
2171 			str_append_c(str, '\t');
2172 			str_append_tabescaped(str, host_tag);
2173 		}
2174 		if (send_updowns) {
2175 			str_printfa(str, "\t%c%ld\t", host->down ? 'D' : 'U',
2176 				    (long)host->last_updown_change);
2177 			if (host->hostname != NULL)
2178 				str_append_tabescaped(str, host->hostname);
2179 		}
2180 		str_append_c(str, '\n');
2181 		director_connection_send(conn, str_c(str));
2182 		str_truncate(str, 0);
2183 	}
2184 	str_printfa(str, "HOST-HAND-END\t%u\n",
2185 		    conn->dir->ring_handshaked ? 1 : 0);
2186 	director_connection_send(conn, str_c(str));
2187 }
2188 
director_connection_send_done(struct director_connection * conn)2189 static int director_connection_send_done(struct director_connection *conn)
2190 {
2191 	i_assert(conn->version_received);
2192 
2193 	if (conn->minor_version >= DIRECTOR_VERSION_OPTIONS) {
2194 		director_connection_send(conn,
2195 			"OPTIONS\t"DIRECTOR_OPT_CONSISTENT_HASHING"\n");
2196 	} else {
2197 		e_error(conn->event, "Director version is too old for supporting director_consistent_hashing=yes");
2198 		return -1;
2199 	}
2200 	director_connection_send(conn, "DONE\n");
2201 	return 0;
2202 }
2203 
director_connection_send_users(struct director_connection * conn)2204 static int director_connection_send_users(struct director_connection *conn)
2205 {
2206 	struct user *user;
2207 	string_t *str = t_str_new(128);
2208 	char dec_buf[MAX_INT_STRLEN];
2209 	unsigned int sent_count = 0;
2210 	int ret;
2211 
2212 	i_assert(conn->version_received);
2213 
2214 	/* with new versions use "U" for sending the handshake users, because
2215 	   otherwise their parameters may look identical and can't be
2216 	   distinguished. */
2217 	if (director_connection_get_minor_version(conn) >= DIRECTOR_VERSION_HANDSHAKE_U_CMD)
2218 		str_append(str, "U\t");
2219 	else
2220 		str_append(str, "USER\t");
2221 	size_t cmd_prefix_len = str_len(str);
2222 	while ((user = director_iterate_users_next(conn->user_iter)) != NULL) {
2223 		str_truncate(str, cmd_prefix_len);
2224 		str_append(str, dec2str_buf(dec_buf, user->username_hash));
2225 		str_append_c(str, '\t');
2226 		str_append(str, user->host->ip_str);
2227 		str_append_c(str, '\t');
2228 		str_append(str, dec2str_buf(dec_buf, user->timestamp));
2229 		if (user->weak)
2230 			str_append(str, "\tw");
2231 		str_append_c(str, '\n');
2232 
2233 		conn->handshake_users_sent++;
2234 		director_connection_send(conn, str_c(str));
2235 		if (++sent_count >= DIRECTOR_HANDSHAKE_MAX_USERS_SENT_PER_FLUSH) {
2236 			/* Don't send too much at once to avoid hangs */
2237 			timeout_reset(conn->to_ping);
2238 			return 0;
2239 		}
2240 
2241 		if (o_stream_get_buffer_used_size(conn->output) >= OUTBUF_FLUSH_THRESHOLD) {
2242 			if ((ret = o_stream_flush(conn->output)) <= 0) {
2243 				/* continue later */
2244 				timeout_reset(conn->to_ping);
2245 				return ret;
2246 			}
2247 		}
2248 	}
2249 	director_iterate_users_deinit(&conn->user_iter);
2250 	if (director_connection_send_done(conn) < 0)
2251 		return -1;
2252 
2253 	if (conn->users_unsorted && conn->handshake_received) {
2254 		/* we received remote's list of users before sending ours */
2255 		conn->users_unsorted = FALSE;
2256 		mail_hosts_sort_users(conn->dir->mail_hosts);
2257 	}
2258 
2259 	ret = o_stream_flush(conn->output);
2260 	timeout_reset(conn->to_ping);
2261 	return ret;
2262 }
2263 
director_connection_output(struct director_connection * conn)2264 static int director_connection_output(struct director_connection *conn)
2265 {
2266 	int ret;
2267 
2268 	conn->last_output = ioloop_timeval;
2269 	if (conn->user_iter != NULL) {
2270 		/* still handshaking USER list */
2271 		ret = director_connection_send_users(conn);
2272 		if (ret < 0) {
2273 			director_connection_log_disconnect(conn, conn->output->stream_errno,
2274 				o_stream_get_error(conn->output));
2275 			director_connection_disconnected(&conn,
2276 				o_stream_get_error(conn->output));
2277 		} else {
2278 			o_stream_set_flush_pending(conn->output, TRUE);
2279 		}
2280 		return ret;
2281 	}
2282 	return o_stream_flush(conn->output);
2283 }
2284 
2285 static struct director_connection *
director_connection_init_common(struct director * dir,int fd)2286 director_connection_init_common(struct director *dir, int fd)
2287 {
2288 	struct director_connection *conn;
2289 
2290 	conn = i_new(struct director_connection, 1);
2291 	conn->refcount = 1;
2292 	conn->created = ioloop_timeval;
2293 	conn->fd = fd;
2294 	conn->dir = dir;
2295 	conn->event = event_create(dir->event);
2296 	conn->input = i_stream_create_fd(conn->fd, MAX_INBUF_SIZE);
2297 	conn->output = o_stream_create_fd(conn->fd, dir->set->director_output_buffer_size);
2298 	o_stream_set_no_error_handling(conn->output, TRUE);
2299 	array_push_back(&dir->connections, &conn);
2300 	return conn;
2301 }
2302 
director_connection_send_handshake(struct director_connection * conn)2303 static void director_connection_send_handshake(struct director_connection *conn)
2304 {
2305 	director_connection_send(conn, t_strdup_printf(
2306 		"VERSION\t"DIRECTOR_VERSION_NAME"\t%u\t%u\n"
2307 		"ME\t%s\t%u\t%lld\n",
2308 		DIRECTOR_VERSION_MAJOR, DIRECTOR_VERSION_MINOR,
2309 		net_ip2addr(&conn->dir->self_ip), conn->dir->self_port,
2310 		(long long)time(NULL)));
2311 }
2312 
director_connection_set_connected(struct director_connection * conn)2313 static void director_connection_set_connected(struct director_connection *conn)
2314 {
2315 	struct rusage usage;
2316 
2317 	conn->connected = TRUE;
2318 	conn->connected_time = ioloop_timeval;
2319 
2320 	if (getrusage(RUSAGE_SELF, &usage) == 0) {
2321 		conn->connected_user_cpu_set = TRUE;
2322 		conn->connected_user_cpu = usage.ru_utime;
2323 	}
2324 }
2325 
2326 struct director_connection *
director_connection_init_in(struct director * dir,int fd,const struct ip_addr * ip)2327 director_connection_init_in(struct director *dir, int fd,
2328 			    const struct ip_addr *ip)
2329 {
2330 	struct director_connection *conn;
2331 
2332 	conn = director_connection_init_common(dir, fd);
2333 	conn->in = TRUE;
2334 	director_connection_set_connected(conn);
2335 	director_connection_set_name(conn,
2336 		t_strdup_printf("%s/in", net_ip2addr(ip)));
2337 	conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);
2338 	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS,
2339 				    director_connection_init_timeout, conn);
2340 
2341 	e_info(conn->event, "Incoming connection from director %s", conn->name);
2342 	director_connection_send_handshake(conn);
2343 	return conn;
2344 }
2345 
director_connection_connected(struct director_connection * conn)2346 static void director_connection_connected(struct director_connection *conn)
2347 {
2348 	int err;
2349 
2350 	if ((err = net_geterror(conn->fd)) != 0) {
2351 		e_error(conn->event, "connect() failed: %s", strerror(err));
2352 		director_connection_disconnected(&conn, strerror(err));
2353 		return;
2354 	}
2355 	director_connection_set_connected(conn);
2356 	o_stream_set_flush_callback(conn->output,
2357 				    director_connection_output, conn);
2358 
2359 	io_remove(&conn->io);
2360 	conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);
2361 
2362 	timeout_remove(&conn->to_ping);
2363 	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS,
2364 				    director_connection_init_timeout, conn);
2365 
2366 	o_stream_cork(conn->output);
2367 	director_connection_send_handshake(conn);
2368 	director_connection_send_directors(conn);
2369 	o_stream_uncork(conn->output);
2370 	/* send the rest of the handshake after we've received the remote's
2371 	   version number */
2372 }
2373 
director_finish_sending_handshake(struct director_connection * conn)2374 static void director_finish_sending_handshake(struct director_connection *conn)
2375 {
2376 	if (
2377 	    conn->in) {
2378 		/* only outgoing connections send hosts & users */
2379 		return;
2380 	}
2381 	o_stream_cork(conn->output);
2382 	director_connection_send_hosts(conn);
2383 
2384 	i_assert(conn->user_iter == NULL);
2385 	/* Iterate only through users that aren't refreshed since the
2386 	   iteration started. The refreshed users will already be sent as
2387 	   regular USER updates, so they don't need to be sent again.
2388 
2389 	   We especially don't want to send these users again, because
2390 	   otherwise in a rapidly changing director we might never end up
2391 	   sending all the users when they constantly keep being added to the
2392 	   end of the list. (The iteration lists users in order from older to
2393 	   newer.) */
2394 	conn->user_iter = director_iterate_users_init(conn->dir, TRUE);
2395 
2396 	if (director_connection_send_users(conn) == 0)
2397 		o_stream_set_flush_pending(conn->output, TRUE);
2398 
2399 	o_stream_uncork(conn->output);
2400 }
2401 
2402 struct director_connection *
director_connection_init_out(struct director * dir,int fd,struct director_host * host)2403 director_connection_init_out(struct director *dir, int fd,
2404 			     struct director_host *host)
2405 {
2406 	struct director_connection *conn;
2407 
2408 	i_assert(!host->removed);
2409 
2410 	/* make sure we don't keep old sequence values across restarts */
2411 	director_host_restarted(host);
2412 
2413 	conn = director_connection_init_common(dir, fd);
2414 	director_connection_set_name(conn,
2415 		t_strdup_printf("%s/out", host->name));
2416 	conn->host = host;
2417 	director_host_ref(host);
2418 	conn->io = io_add(conn->fd, IO_WRITE,
2419 			  director_connection_connected, conn);
2420 	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_CONNECT_TIMEOUT_MSECS,
2421 				    director_connection_init_timeout, conn);
2422 	return conn;
2423 }
2424 
director_connection_deinit(struct director_connection ** _conn,const char * remote_reason)2425 void director_connection_deinit(struct director_connection **_conn,
2426 				const char *remote_reason)
2427 {
2428 	struct director_connection *const *conns, *conn = *_conn;
2429 	struct director *dir = conn->dir;
2430 	unsigned int i, count;
2431 
2432 	*_conn = NULL;
2433 
2434 	i_assert(conn->fd != -1);
2435 
2436 	if (conn->host != NULL) {
2437 		e_debug(conn->event, "Disconnecting from %s: %s",
2438 			conn->host->name, remote_reason);
2439 	}
2440 	if (*remote_reason != '\0' &&
2441 	    conn->minor_version >= DIRECTOR_VERSION_QUIT) {
2442 		o_stream_nsend_str(conn->output, t_strdup_printf(
2443 			"QUIT\t%s\n", remote_reason));
2444 	}
2445 
2446 	conns = array_get(&dir->connections, &count);
2447 	for (i = 0; i < count; i++) {
2448 		if (conns[i] == conn) {
2449 			array_delete(&dir->connections, i, 1);
2450 			break;
2451 		}
2452 	}
2453 	i_assert(i < count);
2454 	if (dir->left == conn) {
2455 		dir->left = NULL;
2456 		/* if there is already another handshaked incoming connection,
2457 		   use it as the new "left" */
2458 		director_assign_left(dir);
2459 	}
2460 	if (dir->right == conn)
2461 		dir->right = NULL;
2462 
2463 	if (conn->users_unsorted) {
2464 		/* Users were received, but handshake didn't finish.
2465 		   Finish sorting so the users won't stay in wrong order. */
2466 		mail_hosts_sort_users(conn->dir->mail_hosts);
2467 	}
2468 
2469 	if (conn->connect_request_to != NULL) {
2470 		director_host_unref(conn->connect_request_to);
2471 		conn->connect_request_to = NULL;
2472 	}
2473 	if (conn->user_iter != NULL)
2474 		director_iterate_users_deinit(&conn->user_iter);
2475 	timeout_remove(&conn->to_disconnect);
2476 	timeout_remove(&conn->to_pong);
2477 	timeout_remove(&conn->to_ping);
2478 	io_remove(&conn->io);
2479 	i_stream_close(conn->input);
2480 	o_stream_close(conn->output);
2481 	i_close_fd(&conn->fd);
2482 
2483 	if (conn->in)
2484 		master_service_client_connection_destroyed(master_service);
2485 	director_connection_unref(conn);
2486 
2487 	if (dir->left == NULL || dir->right == NULL) {
2488 		/* we aren't synced until we're again connected to a ring */
2489 		dir->sync_seq++;
2490 		director_set_ring_unsynced(dir);
2491 	}
2492 }
2493 
director_connection_unref(struct director_connection * conn)2494 static bool director_connection_unref(struct director_connection *conn)
2495 {
2496 	i_assert(conn->refcount > 0);
2497 	if (--conn->refcount > 0)
2498 		return TRUE;
2499 
2500 	if (conn->host != NULL)
2501 		director_host_unref(conn->host);
2502 	i_stream_unref(&conn->input);
2503 	o_stream_unref(&conn->output);
2504 	event_unref(&conn->event);
2505 	i_free(conn->name);
2506 	i_free(conn);
2507 	return FALSE;
2508 }
2509 
director_connection_disconnected(struct director_connection ** _conn,const char * reason)2510 static void director_connection_disconnected(struct director_connection **_conn,
2511 					     const char *reason)
2512 {
2513 	struct director_connection *conn = *_conn;
2514 	struct director *dir = conn->dir;
2515 
2516 	if ((conn->connected_time.tv_sec == 0 ||
2517 	     conn->connected_time.tv_sec + DIRECTOR_SUCCESS_MIN_CONNECT_SECS > ioloop_time) &&
2518 	    conn->host != NULL) {
2519 		/* connection didn't exist for very long, assume it has a
2520 		   network problem */
2521 		conn->host->last_network_failure = ioloop_time;
2522 	}
2523 
2524 	director_connection_deinit(_conn, reason);
2525 	if (dir->right == NULL)
2526 		director_connect(dir, "Reconnecting after disconnection");
2527 }
2528 
director_connection_reconnect(struct director_connection ** _conn,const char * reason)2529 static void director_connection_reconnect(struct director_connection **_conn,
2530 					  const char *reason)
2531 {
2532 	struct director_connection *conn = *_conn;
2533 	struct director *dir = conn->dir;
2534 
2535 	director_connection_deinit(_conn, reason);
2536 	if (dir->right == NULL)
2537 		director_connect(dir, "Reconnecting after error");
2538 }
2539 
director_disconnect_write_error(struct director_connection * conn)2540 static void director_disconnect_write_error(struct director_connection *conn)
2541 {
2542 	struct director *dir = conn->dir;
2543 
2544 	director_connection_deinit(&conn, "write failure");
2545 	if (dir->right == NULL)
2546 		director_connect(dir, "Reconnecting after write failure");
2547 }
2548 
director_connection_send(struct director_connection * conn,const char * data)2549 void director_connection_send(struct director_connection *conn,
2550 			      const char *data)
2551 {
2552 	size_t len = strlen(data);
2553 	off_t ret;
2554 
2555 	if (conn->output->closed || !conn->connected)
2556 		return;
2557 
2558 	if (event_want_debug(conn->event)) T_BEGIN {
2559 		const char *const *lines = t_strsplit(data, "\n");
2560 		for (; lines[1] != NULL; lines++)
2561 			e_debug(conn->event, "output: %s", *lines);
2562 	} T_END;
2563 	ret = o_stream_send(conn->output, data, len);
2564 	if (ret != (off_t)len) {
2565 		if (ret < 0) {
2566 			director_connection_log_disconnect(conn,
2567 				conn->output->stream_errno,
2568 				t_strdup_printf("write() failed: %s",
2569 					o_stream_get_error(conn->output)));
2570 		} else {
2571 			director_connection_log_disconnect(conn, EINVAL,
2572 				t_strdup_printf("Output buffer full at %zu",
2573 					o_stream_get_buffer_used_size(conn->output)));
2574 		}
2575 		o_stream_close(conn->output);
2576 		/* closing the stream when output buffer is full doesn't cause
2577 		   disconnection itself. */
2578 		timeout_remove(&conn->to_disconnect);
2579 		conn->to_disconnect =
2580 			timeout_add_short(0, director_disconnect_write_error, conn);
2581 	} else {
2582 		conn->dir->ring_traffic_output += len;
2583 		conn->last_output = ioloop_timeval;
2584 		conn->peak_bytes_buffered =
2585 			I_MAX(conn->peak_bytes_buffered,
2586 			      o_stream_get_buffer_used_size(conn->output));
2587 	}
2588 }
2589 
2590 static void
director_connection_ping_idle_timeout(struct director_connection * conn)2591 director_connection_ping_idle_timeout(struct director_connection *conn)
2592 {
2593 	string_t *str = t_str_new(128);
2594 	int diff = timeval_diff_msecs(&ioloop_timeval, &conn->ping_sent_time);
2595 
2596 	str_printfa(str, "Ping timed out in %u.%03u secs: ",
2597 		    diff/1000, diff%1000);
2598 	director_ping_append_extra(conn, str, 0, (uintmax_t)-1);
2599 	director_connection_log_disconnect(conn, EINVAL, str_c(str));
2600 	director_connection_disconnected(&conn, "Ping timeout");
2601 }
2602 
director_connection_pong_timeout(struct director_connection * conn)2603 static void director_connection_pong_timeout(struct director_connection *conn)
2604 {
2605 	int diff = timeval_diff_msecs(&ioloop_timeval, &conn->ping_sent_time);
2606 	const char *errstr;
2607 
2608 	errstr = t_strdup_printf(
2609 		"PONG reply not received in %u.%03u secs, "
2610 		"although other input keeps coming",
2611 		diff/1000, diff%1000);
2612 	director_connection_log_disconnect(conn, EINVAL, errstr);
2613 	director_connection_disconnected(&conn, "Pong timeout");
2614 }
2615 
director_connection_ping(struct director_connection * conn)2616 void director_connection_ping(struct director_connection *conn)
2617 {
2618 	if (conn->ping_waiting)
2619 		return;
2620 
2621 	timeout_remove(&conn->to_ping);
2622 	conn->to_ping = timeout_add(conn->dir->set->director_ping_idle_timeout*1000,
2623 				    director_connection_ping_idle_timeout, conn);
2624 	conn->to_pong = timeout_add(conn->dir->set->director_ping_max_timeout*1000,
2625 				    director_connection_pong_timeout, conn);
2626 	conn->ping_waiting = TRUE;
2627 	conn->ping_sent_time = ioloop_timeval;
2628 	conn->ping_sent_buffer_size = o_stream_get_buffer_used_size(conn->output);
2629 	conn->ping_sent_input_offset = conn->input->v_offset;
2630 	conn->ping_sent_output_offset = conn->output->offset;
2631 
2632 	struct rusage usage;
2633 	if (getrusage(RUSAGE_SELF, &usage) == 0)
2634 		conn->ping_sent_user_cpu = usage.ru_utime;
2635 	else
2636 		conn->ping_sent_user_cpu.tv_sec = (time_t)-1;
2637 	/* send it after getting the buffer size */
2638 	director_connection_send(conn, t_strdup_printf(
2639 		"PING\t%"PRIdTIME_T"\t%zu\n", ioloop_time,
2640 		conn->ping_sent_buffer_size));
2641 }
2642 
director_connection_get_name(struct director_connection * conn)2643 const char *director_connection_get_name(struct director_connection *conn)
2644 {
2645 	return conn->name;
2646 }
2647 
2648 struct director_host *
director_connection_get_host(struct director_connection * conn)2649 director_connection_get_host(struct director_connection *conn)
2650 {
2651 	return conn->host;
2652 }
2653 
director_connection_is_handshaked(struct director_connection * conn)2654 bool director_connection_is_handshaked(struct director_connection *conn)
2655 {
2656 	return conn->handshake_received;
2657 }
2658 
director_connection_is_synced(struct director_connection * conn)2659 bool director_connection_is_synced(struct director_connection *conn)
2660 {
2661 	return conn->synced;
2662 }
2663 
director_connection_is_incoming(struct director_connection * conn)2664 bool director_connection_is_incoming(struct director_connection *conn)
2665 {
2666 	return conn->in;
2667 }
2668 
2669 unsigned int
director_connection_get_minor_version(struct director_connection * conn)2670 director_connection_get_minor_version(struct director_connection *conn)
2671 {
2672 	return conn->minor_version;
2673 }
2674 
director_connection_cork(struct director_connection * conn)2675 void director_connection_cork(struct director_connection *conn)
2676 {
2677 	o_stream_cork(conn->output);
2678 }
2679 
director_connection_uncork(struct director_connection * conn)2680 void director_connection_uncork(struct director_connection *conn)
2681 {
2682 	o_stream_uncork(conn->output);
2683 }
2684 
director_connection_set_synced(struct director_connection * conn,bool synced)2685 void director_connection_set_synced(struct director_connection *conn,
2686 				    bool synced)
2687 {
2688 	if (conn->synced == synced)
2689 		return;
2690 	conn->synced = synced;
2691 
2692 	/* switch ping timeout, unless we're already waiting for PONG */
2693 	if (conn->ping_waiting)
2694 		return;
2695 
2696 	director_connection_set_ping_timeout(conn);
2697 }
2698 
director_connection_get_status(struct director_connection * conn,struct director_connection_status * status_r)2699 void director_connection_get_status(struct director_connection *conn,
2700 				    struct director_connection_status *status_r)
2701 {
2702 	i_zero(status_r);
2703 	status_r->bytes_read = conn->input->v_offset;
2704 	status_r->bytes_sent = conn->output->offset;
2705 	status_r->bytes_buffered = o_stream_get_buffer_used_size(conn->output);
2706 	status_r->peak_bytes_buffered = conn->peak_bytes_buffered;
2707 	status_r->last_input = conn->last_input;
2708 	status_r->last_output = conn->last_output;
2709 	status_r->last_ping_msecs = conn->last_ping_msecs;
2710 	status_r->handshake_users_sent = conn->handshake_users_sent;
2711 	status_r->handshake_users_received = conn->handshake_users_received;
2712 }
2713