1 /* Copyright (c) 2010-2018 Dovecot authors, see the included COPYING file */
2
3 /*
4 Handshaking:
5
6 Incoming director connections send:
7
8 VERSION
9 ME
10 <wait for DONE from remote handshake>
11 DONE
12 <make this connection our "left" connection, potentially disconnecting
13 another one>
14
15 Outgoing director connections send:
16
17 VERSION
18 ME
19 [0..n] DIRECTOR
20 HOST-HAND-START
21 [0..n] HOST
22 HOST-HAND-END
23 [0..n] USER
24 <possibly other non-handshake commands between USERs>
25 DONE
26 <wait for DONE from remote>
27 <make this connection our "right" connection, potentially disconnecting
28 another one>
29 */
30
#include "lib.h"
#include "ioloop.h"
#include "array.h"
#include "net.h"
#include "istream.h"
#include "ostream.h"
#include "str.h"
#include "strescape.h"
#include "time-util.h"
#include "master-service.h"
#include "mail-host.h"
#include "director.h"
#include "director-host.h"
#include "director-request.h"
#include "director-connection.h"

#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <sys/resource.h>
50
/* NOTE(review): MAX_INBUF_SIZE and OUTBUF_FLUSH_THRESHOLD aren't used in
   this part of the file. Presumably they are the maximum input buffer size
   (longest accepted protocol line) and the buffered-output size at which
   the output is flushed - TODO confirm at their use sites. */
#define MAX_INBUF_SIZE 1024
#define OUTBUF_FLUSH_THRESHOLD (1024*128)
/* Max time to wait for connect() to finish before aborting */
#define DIRECTOR_CONNECTION_CONNECT_TIMEOUT_MSECS (10*1000)
/* Max idling time before "ME" command must have been received,
   or we'll disconnect. */
#define DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS (10*1000)
/* Max time to wait for USERs in handshake to be sent. With a lot of users the
   kernel may quickly eat up everything we send, while the receiver is busy
   parsing the data. */
#define DIRECTOR_CONNECTION_SEND_USERS_TIMEOUT_MSECS (30*1000)
/* Max idling time before "DONE" command must have been received,
   or we'll disconnect. Use a slightly larger value than for _SEND_USERS_ so
   that we'll get a better error if the sender decides to disconnect. */
#define DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS (40*1000)
/* How long to wait to send PING when connection is idle */
#define DIRECTOR_CONNECTION_PING_INTERVAL_MSECS (15*1000)
/* How long to wait before sending PING while waiting for SYNC reply */
#define DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS 1000
/* Log a warning if PING reply or PONG response takes longer than this */
#define DIRECTOR_CONNECTION_PINGPONG_WARN_MSECS (5*1000)
/* If outgoing director connection exists for less than this many seconds,
   mark the host as failed so we won't try to reconnect to it immediately */
#define DIRECTOR_SUCCESS_MIN_CONNECT_SECS 40
/* If USER request doesn't have a timestamp, user isn't refreshed if it was
   already refreshed director_user_expire/4 seconds ago. This value is the
   hardcoded maximum for that value. */
#define DIRECTOR_SKIP_RECENT_REFRESH_MAX_SECS 15
/* NOTE(review): not used in this part of the file; presumably the delay
   before reconnecting after connecting to a wrong host - verify. */
#define DIRECTOR_RECONNECT_AFTER_WRONG_CONNECT_MSECS 1000
/* After sending CONNECT to a remote, how long to wait for it to disconnect
   before director_connection_wait_timeout() drops the connection. */
#define DIRECTOR_WAIT_DISCONNECT_SECS 10
/* NOTE(review): the next three aren't used in this part of the file.
   Judging by their names they control handshake warnings/logging and the
   number of tolerated duplicate SYNC sequences - verify at the use sites. */
#define DIRECTOR_HANDSHAKE_WARN_SECS 29
#define DIRECTOR_HANDSHAKE_BYTES_LOG_MIN_SECS (60*30)
#define DIRECTOR_MAX_SYNC_SEQ_DUPLICATES 4
/* If we receive SYNCs with a timestamp this many seconds higher than the last
   valid received SYNC timestamp, assume that we lost the director's restart
   notification and reset the last_sync_seq */
#define DIRECTOR_SYNC_STALE_TIMESTAMP_RESET_SECS (60*2)
/* Log a warning if the timestamp in the remote's ME command differs from
   our clock by more than this many seconds. */
#define DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS 1
/* How many USER entries to send during handshake before going back to ioloop
   to see if there's other work to be done as well. */
#define DIRECTOR_HANDSHAKE_MAX_USERS_SENT_PER_FLUSH 10000

/* TRUE when minor_version is older than DIRECTOR_VERSION_HANDSHAKE_U_CMD
   and args has more than two elements. Presumably this identifies an
   old-style handshake USER command by its argument count (the handshake
   USER format takes hash, IP and timestamp) - confirm at the call site. */
#define CMD_IS_USER_HANDSHAKE(minor_version, args) \
	((minor_version) < DIRECTOR_VERSION_HANDSHAKE_U_CMD && \
	 str_array_length(args) > 2)

/* Option name string; its use isn't visible in this part of the file. */
#define DIRECTOR_OPT_CONSISTENT_HASHING "consistent-hashing"
98
/* State of a single director-to-director connection. Incoming connections
   become our "left" connection and outgoing ones our "right" connection
   (see the handshake description at the top of this file). */
struct director_connection {
	/* reference count, see director_connection_unref() */
	int refcount;
	struct director *dir;
	/* logging event; director_connection_set_name() appends the
	   "director(<name>): " log prefix to it */
	struct event *event;
	/* name used in log messages, e.g. "<host>/left" or "<host>/right" */
	char *name;
	/* creation, connect and ME-received times; used by
	   director_connection_init_timeout() to report how long the
	   timed-out handshake phase took */
	struct timeval created, connected_time, me_received_time;
	/* process user CPU time when connected; only valid when
	   connected_user_cpu_set is TRUE */
	struct timeval connected_user_cpu;
	/* remote's protocol minor version (presumably taken from its
	   VERSION line - not set in the visible code) */
	unsigned int minor_version;

	/* times of the last input/output, reported in the stats */
	struct timeval last_input, last_output;
	/* peak output buffer size, reported in the stats */
	size_t peak_bytes_buffered;

	/* state captured when the last PING was sent. Presumably used to
	   report slow PING/PONG round trips (see
	   DIRECTOR_CONNECTION_PINGPONG_WARN_MSECS) - TODO confirm */
	struct timeval ping_sent_time;
	size_t ping_sent_buffer_size;
	struct timeval ping_sent_user_cpu;
	uoff_t ping_sent_input_offset, ping_sent_output_offset;
	unsigned int last_ping_msecs;

	/* for incoming connections the director host isn't known until
	   ME-line is received */
	struct director_host *host;
	/* this is set only for wrong connections: */
	struct director_host *connect_request_to;

	int fd;
	struct io *io;
	struct istream *input;
	struct ostream *output;
	/* to_disconnect: waiting for the remote to disconnect after we sent
	   it CONNECT. to_ping: handshake timeout while handshaking, PING
	   interval timer afterwards. */
	struct timeout *to_disconnect, *to_ping, *to_pong;

	/* presumably iterates the users sent during our handshake -
	   TODO confirm */
	struct director_user_iter *user_iter;
	/* USERs handled by director_cmd_user() and
	   director_handshake_cmd_user() respectively */
	unsigned int users_received, handshake_users_received;
	/* USERs we sent in the handshake, reported in the stats */
	unsigned int handshake_users_sent;

	/* set during command execution */
	const char *cur_cmd, *const *cur_args;

	/* TRUE for incoming connections, FALSE for outgoing ones */
	bool in:1;
	bool connected:1;
	bool version_received:1;
	bool me_received:1;
	/* the remote's handshake has finished */
	bool handshake_received:1;
	bool ignore_host_events:1;
	bool handshake_sending_hosts:1;
	bool ping_waiting:1;
	/* FALSE after the handshake means PINGs are sent more often while
	   waiting for the SYNC reply */
	bool synced:1;
	/* outgoing connection that lost against the existing right one */
	bool wrong_host:1;
	/* this left connection is being PINGed to verify that it still
	   works before another incoming connection is referred elsewhere */
	bool verifying_left:1;
	/* handshake USERs were received, so the users must be sorted */
	bool users_unsorted:1;
	bool connected_user_cpu_set:1;
};
150
/* Forward declarations of static functions that are defined further down
   in this file but are already needed by the functions below. */
static bool director_connection_unref(struct director_connection *conn);
static void director_finish_sending_handshake(struct director_connection *conn);
static void director_connection_disconnected(struct director_connection **conn,
					     const char *reason);
static void director_connection_reconnect(struct director_connection **conn,
					  const char *reason);
static void
director_connection_log_disconnect(struct director_connection *conn, int err,
				   const char *errstr);
static int director_connection_send_done(struct director_connection *conn);
161
162 static void
director_connection_set_name(struct director_connection * conn,const char * name)163 director_connection_set_name(struct director_connection *conn, const char *name)
164 {
165 char *old_name = conn->name;
166 conn->name = i_strdup(name);
167 i_free(old_name);
168
169 event_set_append_log_prefix(conn->event,
170 t_strdup_printf("director(%s): ", conn->name));
171 }
172
173 static void ATTR_FORMAT(2, 3)
director_cmd_error(struct director_connection * conn,const char * fmt,...)174 director_cmd_error(struct director_connection *conn, const char *fmt, ...)
175 {
176 va_list args;
177
178 va_start(args, fmt);
179 e_error(conn->event, "Command %s: %s (input: %s)",
180 conn->cur_cmd, t_strdup_vprintf(fmt, args),
181 t_strarray_join(conn->cur_args, "\t"));
182 va_end(args);
183
184 if (conn->host != NULL)
185 conn->host->last_protocol_failure = ioloop_time;
186 }
187
188 static void
director_connection_append_stats(struct director_connection * conn,string_t * str)189 director_connection_append_stats(struct director_connection *conn, string_t *str)
190 {
191 struct rusage usage;
192
193 str_printfa(str, "bytes in=%"PRIuUOFF_T", bytes out=%"PRIuUOFF_T,
194 conn->input->v_offset, conn->output->offset);
195 str_printfa(str, ", %u+%u USERs received",
196 conn->handshake_users_received, conn->users_received);
197 if (conn->handshake_users_sent > 0) {
198 str_printfa(str, ", %u USERs sent in handshake",
199 conn->handshake_users_sent);
200 }
201 if (conn->last_input.tv_sec > 0) {
202 int input_msecs = timeval_diff_msecs(&ioloop_timeval,
203 &conn->last_input);
204 str_printfa(str, ", last input %u.%03u s ago",
205 input_msecs/1000, input_msecs%1000);
206 }
207 if (conn->last_output.tv_sec > 0) {
208 int output_msecs = timeval_diff_msecs(&ioloop_timeval,
209 &conn->last_output);
210 str_printfa(str, ", last output %u.%03u s ago",
211 output_msecs/1000, output_msecs%1000);
212 }
213 if (conn->connected) {
214 int connected_msecs = timeval_diff_msecs(&ioloop_timeval,
215 &conn->connected_time);
216 str_printfa(str, ", connected %u.%03u s ago",
217 connected_msecs/1000, connected_msecs%1000);
218 }
219 if (o_stream_get_buffer_used_size(conn->output) > 0) {
220 str_printfa(str, ", %zu bytes in output buffer",
221 o_stream_get_buffer_used_size(conn->output));
222 }
223 str_printfa(str, ", %zu peak output buffer size",
224 conn->peak_bytes_buffered);
225 if (conn->connected_user_cpu_set &&
226 getrusage(RUSAGE_SELF, &usage) == 0) {
227 /* this isn't measuring the CPU usage used by the connection
228 itself, but it can still be a useful measurement */
229 int diff = timeval_diff_msecs(&usage.ru_utime,
230 &conn->connected_user_cpu);
231 str_printfa(str, ", %d.%03d CPU secs since connected",
232 diff / 1000, diff % 1000);
233 }
234 }
235
236 static void
director_connection_init_timeout(struct director_connection * conn)237 director_connection_init_timeout(struct director_connection *conn)
238 {
239 struct timeval start_time;
240 string_t *reason = t_str_new(128);
241
242 if (!conn->connected) {
243 start_time = conn->created;
244 str_append(reason, "Connect timed out");
245 } else if (!conn->me_received) {
246 start_time = conn->connected_time;
247 str_append(reason, "Handshaking ME timed out");
248 } else if (!conn->in) {
249 start_time = conn->me_received_time;
250 str_append(reason, "Sending handshake timed out");
251 } else {
252 start_time = conn->me_received_time;
253 str_append(reason, "Handshaking DONE timed out");
254 }
255 int msecs = timeval_diff_msecs(&ioloop_timeval, &start_time);
256 str_printfa(reason, " (%u.%03u secs, ", msecs/1000, msecs%1000);
257 director_connection_append_stats(conn, reason);
258 str_append_c(reason, ')');
259
260 e_error(conn->event, "%s", str_c(reason));
261 director_connection_disconnected(&conn, "Handshake timeout");
262 }
263
264 static void
director_connection_set_ping_timeout(struct director_connection * conn)265 director_connection_set_ping_timeout(struct director_connection *conn)
266 {
267 unsigned int msecs;
268
269 msecs = conn->synced || !conn->handshake_received ?
270 DIRECTOR_CONNECTION_PING_INTERVAL_MSECS :
271 DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS;
272
273 timeout_remove(&conn->to_ping);
274 conn->to_ping = timeout_add(msecs, director_connection_ping, conn);
275 }
276
/* to_disconnect callback: the remote didn't disconnect within
   DIRECTOR_WAIT_DISCONNECT_SECS after we sent it CONNECT. Log it as a
   timeout and drop the connection ourselves. */
static void director_connection_wait_timeout(struct director_connection *conn)
{
	const char *reason = "Timeout waiting for disconnect after CONNECT";

	director_connection_log_disconnect(conn, ETIMEDOUT, "");
	director_connection_deinit(&conn, reason);
}
283
director_connection_send_connect(struct director_connection * conn,struct director_host * host)284 static void director_connection_send_connect(struct director_connection *conn,
285 struct director_host *host)
286 {
287 const char *connect_str;
288
289 if (conn->to_disconnect != NULL)
290 return;
291
292 connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
293 host->ip_str, host->port);
294 director_connection_send(conn, connect_str);
295 o_stream_uncork(conn->output);
296
297 /* wait for a while for the remote to disconnect, so it will hopefully
298 see our CONNECT command. we'll also log the warning later to avoid
299 multiple log lines about it. */
300 conn->connect_request_to = host;
301 director_host_ref(conn->connect_request_to);
302
303 conn->to_disconnect =
304 timeout_add(DIRECTOR_WAIT_DISCONNECT_SECS*1000,
305 director_connection_wait_timeout, conn);
306 }
307
director_connection_assigned(struct director_connection * conn)308 static void director_connection_assigned(struct director_connection *conn)
309 {
310 struct director *dir = conn->dir;
311
312 if (dir->left != NULL && dir->right != NULL) {
313 /* we're connected to both directors. see if the ring is
314 finished by sending a SYNC. if we get it back, it's done. */
315 dir->sync_seq++;
316 director_set_ring_unsynced(dir);
317 director_sync_send(dir, dir->self_host, dir->sync_seq,
318 DIRECTOR_VERSION_MINOR, ioloop_time,
319 mail_hosts_hash(dir->mail_hosts));
320 }
321 director_connection_set_ping_timeout(conn);
322 }
323
/* Try to make the incoming connection conn our left connection.

   Returns FALSE only when the connection comes from ourself; the caller
   then drops it. Otherwise returns TRUE, but conn only becomes dir->left
   in these cases:
   - there is no left connection yet;
   - the current left comes from the same host (it is then replaced).
   In all other cases conn is left waiting:
   - the current left is being verified;
   - the current left is the preferred one (conn is sent a CONNECT
     pointing to it);
   - conn is preferred, but the old left hasn't disconnected yet. */
static bool director_connection_assign_left(struct director_connection *conn)
{
	struct director *dir = conn->dir;

	i_assert(conn->in);
	i_assert(dir->left != conn);

	/* make sure this is the correct incoming connection */
	if (conn->host->self) {
		e_error(conn->event, "Connection from self, dropping");
		return FALSE;
	} else if (dir->left == NULL) {
		/* no conflicts yet */
	} else if (dir->left->host == conn->host) {
		/* same remote reconnected - the old connection is most
		   likely dead, so replace it */
		e_warning(conn->event,
			  "Replacing left director connection %s with %s",
			  dir->left->host->name, conn->host->name);
		director_connection_deinit(&dir->left, t_strdup_printf(
			"Replacing with %s", conn->host->name));
	} else if (dir->left->verifying_left) {
		/* we're waiting to verify if our current left is still
		   working. if we don't receive a PONG, the current left
		   gets disconnected and a new left gets assigned. if we do
		   receive a PONG, we'll wait until the current left
		   disconnects us and then reassign the new left. */
		return TRUE;
	} else if (director_host_cmp_to_self(dir->left->host, conn->host,
					     dir->self_host) < 0) {
		/* the old connection is the correct one.
		   refer the client there (FIXME: do we ever get here?) */
		director_connection_send_connect(conn, dir->left->host);
		return TRUE;
	} else {
		/* this new connection is the correct one, but wait until the
		   old connection gets disconnected before using this one.
		   that guarantees that the director inserting itself into
		   the ring has finished handshaking its left side, so the
		   switch will be fast. */
		return TRUE;
	}
	/* reached only when there is no (longer a) left connection */
	dir->left = conn;
	director_connection_set_name(conn,
		t_strdup_printf("%s/left", conn->host->name));
	director_connection_assigned(conn);
	return TRUE;
}
370
director_assign_left(struct director * dir)371 static void director_assign_left(struct director *dir)
372 {
373 struct director_connection *conn;
374
375 array_foreach_elem(&dir->connections, conn) {
376 if (conn->in && conn->handshake_received &&
377 conn->to_disconnect == NULL && conn != dir->left) {
378 /* either use this or disconnect it */
379 if (!director_connection_assign_left(conn)) {
380 /* we don't want this */
381 director_connection_deinit(&conn,
382 "Unwanted incoming connection");
383 director_assign_left(dir);
384 break;
385 }
386 }
387 }
388 }
389
director_has_outgoing_connections(struct director * dir)390 static bool director_has_outgoing_connections(struct director *dir)
391 {
392 struct director_connection *conn;
393
394 array_foreach_elem(&dir->connections, conn) {
395 if (!conn->in && conn->to_disconnect == NULL)
396 return TRUE;
397 }
398 return FALSE;
399 }
400
director_send_delayed_syncs(struct director * dir)401 static void director_send_delayed_syncs(struct director *dir)
402 {
403 struct director_host *host;
404
405 i_assert(dir->right != NULL);
406
407 e_debug(dir->right->event, "Sending delayed SYNCs");
408 array_foreach_elem(&dir->dir_hosts, host) {
409 if (host->delayed_sync_seq == 0)
410 continue;
411
412 director_sync_send(dir, host, host->delayed_sync_seq,
413 host->delayed_sync_minor_version,
414 host->delayed_sync_timestamp,
415 host->delayed_sync_hosts_hash);
416 host->delayed_sync_seq = 0;
417 }
418 }
419
director_connection_assign_right(struct director_connection * conn)420 static bool director_connection_assign_right(struct director_connection *conn)
421 {
422 struct director *dir = conn->dir;
423
424 i_assert(!conn->in);
425
426 if (dir->right != NULL) {
427 /* see if we should disconnect or keep the existing
428 connection. */
429 if (director_host_cmp_to_self(conn->host, dir->right->host,
430 dir->self_host) <= 0) {
431 /* the old connection is the correct one */
432 e_warning(conn->event,
433 "Aborting incorrect outgoing connection to %s "
434 "(already connected to correct one: %s)",
435 conn->host->name, dir->right->host->name);
436 conn->wrong_host = TRUE;
437 return FALSE;
438 }
439 e_warning(conn->event,
440 "Replacing right director connection %s with %s",
441 dir->right->host->name, conn->host->name);
442 director_connection_deinit(&dir->right, t_strdup_printf(
443 "Replacing with %s", conn->host->name));
444 }
445 dir->right = conn;
446 director_connection_set_name(conn,
447 t_strdup_printf("%s/right", conn->host->name));
448 director_connection_assigned(conn);
449 director_send_delayed_syncs(dir);
450 return TRUE;
451 }
452
453 static bool
director_args_parse_ip_port(struct director_connection * conn,const char * const * args,struct ip_addr * ip_r,in_port_t * port_r)454 director_args_parse_ip_port(struct director_connection *conn,
455 const char *const *args,
456 struct ip_addr *ip_r, in_port_t *port_r)
457 {
458 if (args[0] == NULL || args[1] == NULL) {
459 director_cmd_error(conn, "Missing IP+port parameters");
460 return FALSE;
461 }
462 if (net_addr2ip(args[0], ip_r) < 0) {
463 director_cmd_error(conn, "Invalid IP address: %s", args[0]);
464 return FALSE;
465 }
466 if (net_str2port(args[1], port_r) < 0) {
467 director_cmd_error(conn, "Invalid port: %s", args[1]);
468 return FALSE;
469 }
470 return TRUE;
471 }
472
director_cmd_me(struct director_connection * conn,const char * const * args)473 static bool director_cmd_me(struct director_connection *conn,
474 const char *const *args)
475 {
476 struct director *dir = conn->dir;
477 const char *connect_str;
478 struct ip_addr ip;
479 in_port_t port;
480 time_t next_comm_attempt;
481
482 if (!director_args_parse_ip_port(conn, args, &ip, &port))
483 return FALSE;
484 if (conn->me_received) {
485 director_cmd_error(conn, "Duplicate ME");
486 return FALSE;
487 }
488
489 if (!conn->in && (!net_ip_compare(&conn->host->ip, &ip) ||
490 conn->host->port != port)) {
491 e_error(conn->event,
492 "Remote director thinks it's someone else "
493 "(connected to %s:%u, remote says it's %s:%u)",
494 conn->host->ip_str, conn->host->port,
495 net_ip2addr(&ip), port);
496 return FALSE;
497 }
498 conn->me_received = TRUE;
499 conn->me_received_time = ioloop_timeval;
500
501 if (args[2] != NULL) {
502 time_t remote_time;
503 int diff;
504
505 if (str_to_time(args[2], &remote_time) < 0) {
506 director_cmd_error(conn, "Invalid ME timestamp");
507 return FALSE;
508 }
509 diff = ioloop_time - remote_time;
510 if (diff > DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS ||
511 (diff < 0 && -diff > DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS)) {
512 e_warning(conn->event,
513 "Director %s clock differs from ours by %d secs",
514 conn->name, diff);
515 }
516 }
517
518 timeout_remove(&conn->to_ping);
519 if (conn->in) {
520 conn->to_ping = timeout_add(DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS,
521 director_connection_init_timeout, conn);
522 } else {
523 conn->to_ping = timeout_add(DIRECTOR_CONNECTION_SEND_USERS_TIMEOUT_MSECS,
524 director_connection_init_timeout, conn);
525 }
526
527 if (!conn->in)
528 return TRUE;
529
530 /* Incoming connection:
531
532 a) we don't have an established ring yet. make sure we're connecting
533 to our right side (which might become our left side).
534
535 b) it's our current "left" connection. the previous connection
536 is most likely dead.
537
538 c) we have an existing ring. tell our current "left" to connect to
539 it with CONNECT command.
540
541 d) the incoming connection doesn't belong to us at all, refer it
542 elsewhere with CONNECT. however, before disconnecting it verify
543 first that our left side is actually still functional.
544 */
545 i_assert(conn->host == NULL);
546 conn->host = director_host_get(dir, &ip, port);
547 /* the host shouldn't be removed at this point, but if for some
548 reason it is we don't want to crash */
549 conn->host->removed = FALSE;
550 director_host_ref(conn->host);
551 /* make sure we don't keep old sequence values across restarts */
552 director_host_restarted(conn->host);
553
554 next_comm_attempt = conn->host->last_protocol_failure +
555 DIRECTOR_PROTOCOL_FAILURE_RETRY_SECS;
556 if (next_comm_attempt > ioloop_time) {
557 /* the director recently sent invalid protocol data,
558 don't try retrying yet */
559 e_error(conn->event,
560 "Remote sent invalid protocol data recently, "
561 "waiting %u secs before allowing further communication",
562 (unsigned int)(next_comm_attempt-ioloop_time));
563 return FALSE;
564 } else if (dir->left == NULL) {
565 /* a) - just in case the left is also our right side reset
566 its failed state, so we can connect to it */
567 conn->host->last_network_failure = 0;
568 if (!director_has_outgoing_connections(dir))
569 director_connect(dir, "Connecting to left");
570 } else if (dir->left->host == conn->host) {
571 /* b) */
572 i_assert(dir->left != conn);
573 director_connection_deinit(&dir->left,
574 "Replacing with new incoming connection");
575 } else if (director_host_cmp_to_self(conn->host, dir->left->host,
576 dir->self_host) < 0) {
577 /* c) */
578 connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
579 conn->host->ip_str,
580 conn->host->port);
581 director_connection_send(dir->left, connect_str);
582 } else {
583 /* d) */
584 dir->left->verifying_left = TRUE;
585 director_connection_ping(dir->left);
586 }
587 return TRUE;
588 }
589
590 static inline bool
user_need_refresh(struct director * dir,struct user * user,time_t timestamp,bool unknown_timestamp)591 user_need_refresh(struct director *dir, struct user *user,
592 time_t timestamp, bool unknown_timestamp)
593 {
594 if (timestamp <= (time_t)user->timestamp) {
595 /* we already have this timestamp */
596 return FALSE;
597 }
598 if (unknown_timestamp) {
599 /* Old director sent USER command without timestamp. We don't
600 know what it is exactly, but we can assume that it's very
601 close to the current time (which timestamp parameter is
602 already set to). However, try to break USER loops here when
603 director ring latency is >1sec, but below skip_recent_secs
604 by just not refreshing the user. */
605 time_t skip_recent_secs =
606 I_MIN(dir->set->director_user_expire/4,
607 DIRECTOR_SKIP_RECENT_REFRESH_MAX_SECS);
608 if ((time_t)user->timestamp + skip_recent_secs >= timestamp)
609 return FALSE;
610 }
611 return TRUE;
612 }
613
/* Apply a USER update received from conn: the user with username_hash
   should be on host.

   timestamp is the sender's timestamp for the user, or (time_t)-1 if the
   sender didn't include one; it is then treated as the current time.
   weak tells whether the update marks the user as weak.

   Returns:
   -1  the timestamp has already expired; the update is ignored and
       *user_r is NOT set.
    1  the user was added, or one of the branches below marked it
       changed.
    0  otherwise.
   On 0 and 1, *user_r points to the user.

   *forced_r is set to TRUE when a host conflict was resolved by switching
   the user to the sender's host. The updated USER then needs to be sent
   back to the sender as well. */
static int
director_user_refresh(struct director_connection *conn,
		      unsigned int username_hash, struct mail_host *host,
		      time_t timestamp, bool weak, bool *forced_r,
		      struct user **user_r)
{
	struct director *dir = conn->dir;
	struct user *user;
	bool ret = FALSE, unset_weak_user = FALSE;
	struct user_directory *users = host->tag->users;
	bool unknown_timestamp = (timestamp == (time_t)-1);

	*forced_r = FALSE;

	if (unknown_timestamp) {
		/* Old director version sent USER without timestamp. */
		timestamp = ioloop_time;
	}

	if (timestamp + (time_t)dir->set->director_user_expire <= ioloop_time) {
		/* Ignore this refresh entirely, regardless of whether the
		   user already exists or not. */
		e_debug(conn->event,
			"user refresh: %u has expired timestamp %"PRIdTIME_T,
			username_hash, timestamp);
		return -1;
	}

	user = user_directory_lookup(users, username_hash);
	if (user == NULL) {
		*user_r = user_directory_add(users, username_hash,
					     host, timestamp);
		(*user_r)->weak = weak;
		e_debug(conn->event, "user refresh: %u added", username_hash);
		return 1;
	}

	/* Existing user: decide which host to use. Branches that want to
	   keep the current host set host = user->host; the actual host
	   switch happens after this chain. */
	if (user->weak) {
		if (!weak) {
			/* removing user's weakness */
			e_debug(conn->event, "user refresh: %u weakness removed",
				username_hash);
			unset_weak_user = TRUE;
			user->weak = FALSE;
			ret = TRUE;
		} else {
			/* weak user marked again as weak */
		}
	} else if (weak &&
		   !user_directory_user_is_recently_updated(users, user)) {
		/* mark the user as weak */
		e_debug(conn->event, "user refresh: %u set weak", username_hash);
		user->weak = TRUE;
		ret = TRUE;
	} else if (weak) {
		e_debug(conn->event,
			"user refresh: %u weak update to %s ignored, "
			"we recently changed it to %s",
			username_hash, host->ip_str,
			user->host->ip_str);
		host = user->host;
		ret = TRUE;
	} else if (user->host == host) {
		/* update to the same host */
	} else if (user_directory_user_is_near_expiring(users, user)) {
		/* host conflict for a user that is already near expiring. we can
		   assume that the other director had already dropped this user
		   and we should have as well. use the new host. */
		e_debug(conn->event, "user refresh: %u is nearly expired, "
			"replacing host %s with %s", username_hash,
			user->host->ip_str, host->ip_str);
		ret = TRUE;
	} else if (USER_IS_BEING_KILLED(user)) {
		/* user is still being moved - ignore conflicting host updates
		   from other directors who don't yet know about the move. */
		e_debug(conn->event, "user refresh: %u is being moved, "
			"preserve its host %s instead of replacing with %s",
			username_hash, user->host->ip_str, host->ip_str);
		host = user->host;
	} else {
		/* non-weak user received a non-weak update with
		   conflicting host. this shouldn't happen. */
		string_t *str = t_str_new(128);

		str_printfa(str, "User hash %u "
			    "is being redirected to two hosts: %s and %s",
			    username_hash, user->host->ip_str, host->ip_str);
		str_printfa(str, " (old_ts=%ld", (long)user->timestamp);

		if (!conn->handshake_received) {
			str_printfa(str, ",handshaking,recv_ts=%ld",
				    (long)timestamp);
		}
		if (USER_IS_BEING_KILLED(user)) {
			if (user->kill_ctx->to_move != NULL)
				str_append(str, ",moving");
			str_printfa(str, ",kill_state=%s",
				    user_kill_state_names[user->kill_ctx->kill_state]);
		}
		str_append_c(str, ')');
		e_error(conn->event, "%s", str_c(str));

		/* we want all the directors to redirect the user to same
		   server, but we don't want two directors fighting over which
		   server it belongs to, so always use the lower IP address */
		if (net_ip_cmp(&user->host->ip, &host->ip) > 0) {
			/* change the host. we'll also need to remove the user
			   from the old host's user_count, because we can't
			   keep track of the user for more than one host.

			   send the updated USER back to the sender as well. */
			*forced_r = TRUE;
		} else {
			/* keep the host */
			host = user->host;
		}
		/* especially IMAP connections can take a long time to die.
		   make sure we kill off the connections in the wrong
		   backends. */
		director_kick_user_hash(dir, dir->self_host, NULL,
					username_hash, &host->ip);
		ret = TRUE;
	}
	/* move the user to the chosen host, keeping per-host user counts
	   in sync */
	if (user->host != host) {
		user->host->user_count--;
		user->host = host;
		user->host->user_count++;
		ret = TRUE;
	}
	/* Update user's timestamp if it's higher than the current one. Note
	   that we'll preserve the original timestamp. This is important when
	   the director ring is slow and a single USER can traverse through
	   the ring more than a second. We don't want to get into a loop where
	   the same USER goes through the ring forever. */
	if (user_need_refresh(dir, user, timestamp, unknown_timestamp)) {
		/* NOTE: This makes the users list somewhat out-of-order.
		   It's not a big problem - most likely it's only a few seconds
		   difference. The worst that can happen is that some users
		   take up memory that should have been freed already. */
		e_debug(conn->event, "user refresh: %u refreshed timestamp from %u to %"PRIdTIME_T,
			username_hash, user->timestamp, timestamp);
		user_directory_refresh(users, user);
		user->timestamp = timestamp;
		ret = TRUE;
	} else {
		e_debug(conn->event, "user refresh: %u ignored timestamp %"PRIdTIME_T" (we have %u)",
			username_hash, timestamp, user->timestamp);
	}

	if (unset_weak_user) {
		/* user is no longer weak. handle pending requests for
		   this user if there are any */
		director_set_state_changed(conn->dir);
	}

	*user_r = user;
	return ret ? 1 : 0;
}
772
773 static bool
director_handshake_cmd_user(struct director_connection * conn,const char * const * args)774 director_handshake_cmd_user(struct director_connection *conn,
775 const char *const *args)
776 {
777 unsigned int username_hash, timestamp;
778 struct ip_addr ip;
779 struct mail_host *host;
780 struct user *user;
781 bool weak, forced;
782
783 if (str_array_length(args) < 3 ||
784 str_to_uint(args[0], &username_hash) < 0 ||
785 net_addr2ip(args[1], &ip) < 0 ||
786 str_to_uint(args[2], ×tamp) < 0) {
787 director_cmd_error(conn, "Invalid parameters");
788 return FALSE;
789 }
790 weak = args[3] != NULL && args[3][0] == 'w';
791 conn->handshake_users_received++;
792
793 host = mail_host_lookup(conn->dir->mail_hosts, &ip);
794 if (host == NULL) {
795 e_error(conn->event, "USER used unknown host %s in handshake",
796 args[1]);
797 return FALSE;
798 }
799
800 if ((time_t)timestamp > ioloop_time) {
801 /* The other director's clock seems to be into the future
802 compared to us. Don't set any of our users' timestamps into
803 future though. It's most likely only 1 second difference. */
804 timestamp = ioloop_time;
805 }
806 conn->dir->num_incoming_requests++;
807 if (director_user_refresh(conn, username_hash, host,
808 timestamp, weak, &forced, &user) < 0) {
809 /* user expired - ignore */
810 return TRUE;
811 }
812 /* Possibilities:
813
814 a) The user didn't exist yet, and it was added with the given
815 timestamp.
816
817 b) The user existed, but with an older timestamp. The timestamp
818 wasn't yet updated, so do it here below.
819
820 c) The user existed with a newer timestamp. This is either because
821 we already received a non-handshake USER update for this user, or
822 our director saw a login for this user. Ignore this update.
823
824 (We never want to change the user's timestamp to be older, because
825 that could result in directors going to a loop fighting each others
826 over a flipping timestamp.) */
827 if (user->timestamp < timestamp)
828 user->timestamp = timestamp;
829 /* always sort users after handshaking to make sure the order
830 is correct */
831 conn->users_unsorted = TRUE;
832 return TRUE;
833 }
834
835 static bool
director_cmd_user(struct director_connection * conn,const char * const * args)836 director_cmd_user(struct director_connection *conn,
837 const char *const *args)
838 {
839 unsigned int username_hash;
840 struct ip_addr ip;
841 struct mail_host *host;
842 struct user *user;
843 bool forced;
844 time_t timestamp = (time_t)-1;
845
846 if (str_array_length(args) < 2 ||
847 str_to_uint(args[0], &username_hash) < 0 ||
848 net_addr2ip(args[1], &ip) < 0 ||
849 (args[2] != NULL && str_to_time(args[2], ×tamp) < 0)) {
850 director_cmd_error(conn, "Invalid parameters");
851 return FALSE;
852 }
853
854 /* could this before it's potentially ignored */
855 conn->dir->num_incoming_requests++;
856
857 conn->users_received++;
858 host = mail_host_lookup(conn->dir->mail_hosts, &ip);
859 if (host == NULL) {
860 /* we probably just removed this host. */
861 return TRUE;
862 }
863
864 if (director_user_refresh(conn, username_hash,
865 host, timestamp, FALSE, &forced, &user) > 0) {
866 /* user changed - forward the USER in the ring */
867 struct director_host *src_host =
868 forced ? conn->dir->self_host : conn->host;
869 i_assert(!user->weak);
870 director_update_user(conn->dir, src_host, user);
871 }
872 return TRUE;
873 }
874
director_cmd_director(struct director_connection * conn,const char * const * args)875 static bool director_cmd_director(struct director_connection *conn,
876 const char *const *args)
877 {
878 struct director_host *host;
879 struct ip_addr ip;
880 in_port_t port;
881 bool log_add = FALSE;
882
883 if (!director_args_parse_ip_port(conn, args, &ip, &port))
884 return FALSE;
885
886 host = director_host_lookup(conn->dir, &ip, port);
887 if (host != NULL) {
888 if (host == conn->dir->self_host) {
889 /* ignore updates to ourself */
890 return TRUE;
891 }
892 if (host->removed) {
893 /* ignore re-adds of removed directors */
894 return TRUE;
895 }
896
897 /* already have this. just reset its last_network_failure
898 timestamp, since it might be up now, but only if this
899 isn't part of the handshake. (if it was, reseting the
900 timestamp could cause us to rapidly keep trying to connect
901 to it) */
902 if (conn->handshake_received)
903 host->last_network_failure = 0;
904 /* it also may have been restarted, reset its state */
905 director_host_restarted(host);
906 } else {
907 /* save the director and forward it */
908 host = director_host_add(conn->dir, &ip, port);
909 log_add = TRUE;
910 }
911 /* just forward this to the entire ring until it reaches back to
912 itself. some hosts may see this twice, but that's the only way to
913 guarantee that it gets seen by everyone. resetting the host multiple
914 times may cause us to handle its commands multiple times, but the
915 commands can handle that. however, we need to also handle a
916 situation where the added director never comes back - we don't want
917 to send the director information in a loop forever. */
918 if (conn->dir->right != NULL &&
919 director_host_cmp_to_self(host, conn->dir->right->host,
920 conn->dir->self_host) > 0) {
921 e_debug(conn->event,
922 "Received DIRECTOR update for a host where we should be connected to. "
923 "Not forwarding it since it's probably crashed.");
924 } else {
925 director_notify_ring_added(host,
926 director_connection_get_host(conn), log_add);
927 }
928 return TRUE;
929 }
930
director_cmd_director_remove(struct director_connection * conn,const char * const * args)931 static bool director_cmd_director_remove(struct director_connection *conn,
932 const char *const *args)
933 {
934 struct director_host *host;
935 struct ip_addr ip;
936 in_port_t port;
937
938 if (!director_args_parse_ip_port(conn, args, &ip, &port))
939 return FALSE;
940
941 host = director_host_lookup(conn->dir, &ip, port);
942 if (host != NULL && !host->removed)
943 director_ring_remove(host, director_connection_get_host(conn));
944 return TRUE;
945 }
946
947 static bool
director_cmd_host_hand_start(struct director_connection * conn,const char * const * args)948 director_cmd_host_hand_start(struct director_connection *conn,
949 const char *const *args)
950 {
951 const ARRAY_TYPE(mail_host) *hosts;
952 struct mail_host *const *hostp;
953 unsigned int remote_ring_completed;
954
955 if (args[0] == NULL ||
956 str_to_uint(args[0], &remote_ring_completed) < 0) {
957 director_cmd_error(conn, "Invalid parameters");
958 return FALSE;
959 }
960
961 if (remote_ring_completed != 0 && !conn->dir->ring_handshaked) {
962 /* clear everything we have and use only what remote sends us */
963 e_debug(conn->event, "We're joining a ring - replace all hosts");
964 hosts = mail_hosts_get(conn->dir->mail_hosts);
965 while (array_count(hosts) > 0) {
966 hostp = array_front(hosts);
967 director_remove_host(conn->dir, NULL, NULL, *hostp);
968 }
969 } else if (remote_ring_completed == 0 && conn->dir->ring_handshaked) {
970 /* ignore whatever remote sends */
971 e_debug(conn->event, "Remote is joining our ring - "
972 "ignore all remote HOSTs");
973 conn->ignore_host_events = TRUE;
974 } else {
975 e_debug(conn->event, "Merge rings' hosts");
976 }
977 conn->handshake_sending_hosts = TRUE;
978 return TRUE;
979 }
980
981 static int
director_cmd_is_seen_full(struct director_connection * conn,const char * const ** _args,unsigned int * seq_r,struct director_host ** host_r)982 director_cmd_is_seen_full(struct director_connection *conn,
983 const char *const **_args, unsigned int *seq_r,
984 struct director_host **host_r)
985 {
986 const char *const *args = *_args;
987 struct ip_addr ip;
988 in_port_t port;
989 unsigned int seq;
990 struct director_host *host;
991
992 if (str_array_length(args) < 3 ||
993 net_addr2ip(args[0], &ip) < 0 ||
994 net_str2port(args[1], &port) < 0 ||
995 str_to_uint(args[2], &seq) < 0) {
996 director_cmd_error(conn, "Invalid parameters");
997 return -1;
998 }
999 *_args = args + 3;
1000 *seq_r = seq;
1001
1002 host = director_host_lookup(conn->dir, &ip, port);
1003 if (host == NULL || host->removed) {
1004 /* director is already gone, but we can't be sure if this
1005 command was sent everywhere. re-send it as if it was from
1006 ourself. */
1007 *host_r = NULL;
1008 } else {
1009 *host_r = host;
1010 if (seq <= host->last_seq) {
1011 /* already seen this */
1012 return 1;
1013 }
1014 host->last_seq = seq;
1015 }
1016 return 0;
1017 }
1018
/* Same as director_cmd_is_seen_full(), for callers that don't need the
   parsed sequence number. */
static int
director_cmd_is_seen(struct director_connection *conn,
		     const char *const **_args,
		     struct director_host **host_r)
{
	unsigned int unused_seq;
	int result;

	result = director_cmd_is_seen_full(conn, _args, &unused_seq, host_r);
	return result;
}
1028
/* Handle a USER-WEAK command:

     <origin ip> <origin port> <seq> <username_hash> <mail host ip>

   The user is refreshed with the current ioloop_time. Depending on whether
   this (origin, seq) was already seen and who originated it, the user is
   either kept weak and the notification forwarded, or (when our own
   USER-WEAK came back around the ring) made non-weak. Returns FALSE only
   for malformed parameters. */
static bool
director_cmd_user_weak(struct director_connection *conn,
		       const char *const *args)
{
	struct director_host *dir_host;
	struct ip_addr ip;
	unsigned int username_hash;
	struct mail_host *host;
	struct user *user;
	struct director_host *src_host = conn->host;
	bool weak = TRUE, weak_forward = FALSE, forced;
	int ret;

	/* note that unlike other commands we don't want to just ignore
	   duplicate commands */
	if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) < 0)
		return FALSE;

	/* count this before it's potentially ignored */
	conn->dir->num_incoming_requests++;

	if (str_array_length(args) != 2 ||
	    str_to_uint(args[0], &username_hash) < 0 ||
	    net_addr2ip(args[1], &ip) < 0) {
		director_cmd_error(conn, "Invalid parameters");
		return FALSE;
	}

	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
	if (host == NULL) {
		/* we probably just removed this host. */
		return TRUE;
	}

	if (ret == 0) {
		/* First time we're seeing this - forward it to others also.
		   We'll want to do it even if the user was already marked as
		   weak, because otherwise if two directors mark the user weak
		   at the same time both the USER-WEAK notifications reach
		   only half the directors until they collide and neither one
		   finishes going through the whole ring marking the user
		   non-weak. */
		weak_forward = TRUE;
	} else if (dir_host == conn->dir->self_host) {
		/* We originated this USER-WEAK request. The entire ring has seen
		   it and there weren't any conflicts. Make the user non-weak. */
		e_debug(conn->event,
			"user refresh: %u Our USER-WEAK seen by the entire ring",
			username_hash);
		src_host = conn->dir->self_host;
		weak = FALSE;
	} else {
		/* The original USER-WEAK sender will send a new non-weak USER
		   update saying what really happened. We'll still need to forward
		   this around the ring to the origin so it also knows it has
		   travelled through the ring. (ret > 0 implies dir_host was
		   found, so it's non-NULL here.) */
		e_debug(conn->event,
			"user refresh: %u Remote USER-WEAK from %s seen by the entire ring, ignoring",
			username_hash, dir_host->ip_str);
		weak_forward = TRUE;
	}

	ret = director_user_refresh(conn, username_hash,
				    host, ioloop_time, weak, &forced, &user);
	/* user is refreshed with ioloop_time, it can't be expired already */
	i_assert(ret >= 0);
	if (ret > 0 || weak_forward) {
		/* user changed, or we've decided that we need to forward
		   the weakness notification to the rest of the ring even
		   though we already knew it. */
		if (forced)
			src_host = conn->dir->self_host;
		/* the refreshed user's weakness decides which update is sent */
		if (!user->weak)
			director_update_user(conn->dir, src_host, user);
		else {
			director_update_user_weak(conn->dir, src_host, conn,
						  dir_host, user);
		}
	}
	return TRUE;
}
1110
/* Common handler for HOST commands (both handshake and ring-forwarded).
   args, after any origin prefix has been stripped by the caller:

     <ip> <vhost_count> [<tag> [<D|U><last_updown_change> [<hostname>]]]

   dir_host is the originating director, or NULL (handshake HOSTs, or an
   unknown origin). Adds the mail host if unknown; otherwise merges the
   incoming state with ours, and forwards via director_update_host() if
   anything changed. Returns FALSE (after reporting the error) only for
   malformed or incompatible parameters. */
static bool ATTR_NULL(3)
director_cmd_host_int(struct director_connection *conn, const char *const *args,
		      struct director_host *dir_host)
{
	struct director_host *src_host = conn->host;
	struct mail_host *host;
	struct ip_addr ip;
	const char *tag = "", *host_tag, *hostname = NULL;
	unsigned int arg_count, vhost_count;
	bool update, down = FALSE;
	time_t last_updown_change = 0;

	arg_count = str_array_length(args);
	if (arg_count < 2 ||
	    net_addr2ip(args[0], &ip) < 0 ||
	    str_to_uint(args[1], &vhost_count) < 0) {
		director_cmd_error(conn, "Invalid parameters");
		return FALSE;
	}
	if (arg_count >= 3)
		tag = args[2];
	if (arg_count >= 4) {
		/* first char is the D(own)/U(p) state, the rest is the
		   timestamp of the last up/down change */
		if ((args[3][0] != 'D' && args[3][0] != 'U') ||
		    str_to_time(args[3]+1, &last_updown_change) < 0) {
			director_cmd_error(conn, "Invalid updown parameters");
			return FALSE;
		}
		down = args[3][0] == 'D';
	}
	if (arg_count >= 5)
		hostname = args[4];
	if (conn->ignore_host_events) {
		/* remote is sending hosts in a handshake, but it doesn't have
		   a completed ring and we do. */
		i_assert(conn->handshake_sending_hosts);
		return TRUE;
	}
	if (tag[0] != '\0' && conn->minor_version < DIRECTOR_VERSION_TAGS_V2) {
		director_cmd_error(conn, "Received a host tag from older director version with incompatible tagging support");
		return FALSE;
	}

	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
	if (host == NULL) {
		host = mail_host_add_hostname(conn->dir->mail_hosts,
					      hostname, &ip, tag);
		update = TRUE;
	} else {
		update = host->vhost_count != vhost_count ||
			host->down != down;

		host_tag = mail_host_get_tag(host);
		if (strcmp(tag, host_tag) != 0) {
			e_error(conn->event,
				"Host %s changed tag from '%s' to '%s'",
				host->ip_str,
				host_tag, tag);
			mail_host_set_tag(host, tag);
			update = TRUE;
		}
		if (update && host->desynced) {
			/* A conflicting update arrived while our previous one
			   hadn't finished. Resolve it locally (newer up/down
			   change wins, smaller vhost_count wins), log it, and
			   re-broadcast the result as our own change. */
			string_t *str = t_str_new(128);

			str_printfa(str, "Host %s is being updated before previous update had finished (",
				    host->ip_str);
			if (host->down != down &&
			    host->last_updown_change > last_updown_change) {
				/* our host has a newer change. preserve it. */
				down = host->down;
			}
			if (host->down != down) {
				if (host->down)
					str_append(str, "down -> up");
				else
					str_append(str, "up -> down");
			}
			if (host->vhost_count != vhost_count) {
				if (host->down != down)
					str_append(str, ", ");
				str_printfa(str, "vhosts %u -> %u",
					    host->vhost_count, vhost_count);
			}
			str_append(str, ") - ");

			vhost_count = I_MIN(vhost_count, host->vhost_count);
			str_printfa(str, "setting to state=%s vhosts=%u",
				    down ? "down" : "up", vhost_count);
			e_warning(conn->event, "%s", str_c(str));
			/* make the change appear to come from us, so it
			   reaches the full ring */
			dir_host = NULL;
			src_host = conn->dir->self_host;
		}
		if (update) {
			/* Make sure the host's timestamp never shrinks.
			   Otherwise we might get into a loop where the up/down
			   state keeps switching. */
			last_updown_change = I_MAX(last_updown_change,
						   host->last_updown_change);
		}
	}

	if (update) {
		const char *log_prefix = t_strdup_printf("director(%s): ",
							 conn->name);
		mail_host_set_down(host, down, last_updown_change, log_prefix);
		mail_host_set_vhost_count(host, vhost_count, log_prefix);
		director_update_host(conn->dir, src_host, dir_host, host);
	} else {
		e_debug(conn->event,
			"Ignoring host %s update vhost_count=%u "
			"down=%d last_updown_change=%ld (hosts_hash=%u)",
			net_ip2addr(&ip), vhost_count, down ? 1 : 0,
			(long)last_updown_change,
			mail_hosts_hash(conn->dir->mail_hosts));
	}
	return TRUE;
}
1229
1230 static bool
director_cmd_host_handshake(struct director_connection * conn,const char * const * args)1231 director_cmd_host_handshake(struct director_connection *conn,
1232 const char *const *args)
1233 {
1234 return director_cmd_host_int(conn, args, NULL);
1235 }
1236
1237 static bool
director_cmd_host(struct director_connection * conn,const char * const * args)1238 director_cmd_host(struct director_connection *conn, const char *const *args)
1239 {
1240 struct director_host *dir_host;
1241 int ret;
1242
1243 if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
1244 return ret > 0;
1245 return director_cmd_host_int(conn, args, dir_host);
1246 }
1247
1248 static bool
director_cmd_host_remove(struct director_connection * conn,const char * const * args)1249 director_cmd_host_remove(struct director_connection *conn,
1250 const char *const *args)
1251 {
1252 struct director_host *dir_host;
1253 struct mail_host *host;
1254 struct ip_addr ip;
1255 int ret;
1256
1257 if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
1258 return ret > 0;
1259
1260 if (str_array_length(args) != 1 ||
1261 net_addr2ip(args[0], &ip) < 0) {
1262 director_cmd_error(conn, "Invalid parameters");
1263 return FALSE;
1264 }
1265
1266 host = mail_host_lookup(conn->dir->mail_hosts, &ip);
1267 if (host != NULL)
1268 director_remove_host(conn->dir, conn->host, dir_host, host);
1269 return TRUE;
1270 }
1271
1272 static bool
director_cmd_host_flush(struct director_connection * conn,const char * const * args)1273 director_cmd_host_flush(struct director_connection *conn,
1274 const char *const *args)
1275 {
1276 struct director_host *dir_host;
1277 struct mail_host *host;
1278 struct ip_addr ip;
1279 int ret;
1280
1281 if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
1282 return ret > 0;
1283
1284 if (str_array_length(args) != 1 ||
1285 net_addr2ip(args[0], &ip) < 0) {
1286 director_cmd_error(conn, "Invalid parameters");
1287 return FALSE;
1288 }
1289
1290 host = mail_host_lookup(conn->dir->mail_hosts, &ip);
1291 if (host != NULL)
1292 director_flush_host(conn->dir, conn->host, dir_host, host);
1293 return TRUE;
1294 }
1295
1296 static bool
director_cmd_user_move(struct director_connection * conn,const char * const * args)1297 director_cmd_user_move(struct director_connection *conn,
1298 const char *const *args)
1299 {
1300 struct director_host *dir_host;
1301 struct mail_host *host;
1302 struct ip_addr ip;
1303 unsigned int username_hash;
1304 int ret;
1305
1306 if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
1307 return ret > 0;
1308
1309 if (str_array_length(args) != 2 ||
1310 str_to_uint(args[0], &username_hash) < 0 ||
1311 net_addr2ip(args[1], &ip) < 0) {
1312 director_cmd_error(conn, "Invalid parameters");
1313 return FALSE;
1314 }
1315
1316 host = mail_host_lookup(conn->dir->mail_hosts, &ip);
1317 if (host != NULL) {
1318 director_move_user(conn->dir, conn->host, dir_host,
1319 username_hash, host);
1320 }
1321 return TRUE;
1322 }
1323
1324 static bool
director_cmd_user_kick(struct director_connection * conn,const char * const * args)1325 director_cmd_user_kick(struct director_connection *conn,
1326 const char *const *args)
1327 {
1328 struct director_host *dir_host;
1329 int ret;
1330
1331 if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
1332 return ret > 0;
1333
1334 if (str_array_length(args) != 1) {
1335 director_cmd_error(conn, "Invalid parameters");
1336 return FALSE;
1337 }
1338
1339 director_kick_user(conn->dir, conn->host, dir_host, args[0]);
1340 return TRUE;
1341 }
1342
1343 static bool
director_cmd_user_kick_alt(struct director_connection * conn,const char * const * args)1344 director_cmd_user_kick_alt(struct director_connection *conn,
1345 const char *const *args)
1346 {
1347 struct director_host *dir_host;
1348 int ret;
1349
1350 if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
1351 return ret > 0;
1352
1353 if (str_array_length(args) != 2) {
1354 director_cmd_error(conn, "Invalid parameters");
1355 return FALSE;
1356 }
1357
1358 director_kick_user_alt(conn->dir, conn->host, dir_host, args[0], args[1]);
1359 return TRUE;
1360 }
1361
1362 static bool
director_cmd_user_kick_hash(struct director_connection * conn,const char * const * args)1363 director_cmd_user_kick_hash(struct director_connection *conn,
1364 const char *const *args)
1365 {
1366 struct director_host *dir_host;
1367 unsigned int username_hash;
1368 struct ip_addr except_ip;
1369 int ret;
1370
1371 if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
1372 return ret > 0;
1373
1374 if (str_array_length(args) != 2 ||
1375 str_to_uint(args[0], &username_hash) < 0 ||
1376 net_addr2ip(args[1], &except_ip) < 0) {
1377 director_cmd_error(conn, "Invalid parameters");
1378 return FALSE;
1379 }
1380
1381 director_kick_user_hash(conn->dir, conn->host, dir_host,
1382 username_hash, &except_ip);
1383 return TRUE;
1384 }
1385
1386 static bool
director_cmd_user_killed(struct director_connection * conn,const char * const * args)1387 director_cmd_user_killed(struct director_connection *conn,
1388 const char *const *args)
1389 {
1390 unsigned int username_hash;
1391
1392 if (str_array_length(args) != 1 ||
1393 str_to_uint(args[0], &username_hash) < 0) {
1394 director_cmd_error(conn, "Invalid parameters");
1395 return FALSE;
1396 }
1397
1398 director_user_killed(conn->dir, username_hash);
1399 return TRUE;
1400 }
1401
1402 static bool
director_cmd_user_killed_everywhere(struct director_connection * conn,const char * const * args)1403 director_cmd_user_killed_everywhere(struct director_connection *conn,
1404 const char *const *args)
1405 {
1406 struct director_host *dir_host;
1407 unsigned int seq, username_hash;
1408 int ret;
1409
1410 if ((ret = director_cmd_is_seen_full(conn, &args, &seq, &dir_host)) < 0)
1411 return FALSE;
1412
1413 if (str_array_length(args) != 1 ||
1414 str_to_uint(args[0], &username_hash) < 0) {
1415 director_cmd_error(conn, "Invalid parameters");
1416 return FALSE;
1417 }
1418
1419 if (ret > 0) {
1420 i_assert(dir_host != NULL);
1421 e_debug(conn->event,
1422 "User %u - ignoring already seen USER-KILLED-EVERYWHERE "
1423 "with seq=%u <= %s.last_seq=%u", username_hash,
1424 seq, dir_host->name, dir_host->last_seq);
1425 return TRUE;
1426 }
1427
1428 director_user_killed_everywhere(conn->dir, conn->host,
1429 dir_host, username_hash);
1430 return TRUE;
1431 }
1432
director_handshake_cmd_done(struct director_connection * conn)1433 static bool director_handshake_cmd_done(struct director_connection *conn)
1434 {
1435 struct director *dir = conn->dir;
1436 int handshake_msecs = timeval_diff_msecs(&ioloop_timeval, &conn->connected_time);
1437 string_t *str;
1438
1439 if (conn->users_unsorted && conn->user_iter == NULL) {
1440 /* we sent our user list before receiving remote's */
1441 conn->users_unsorted = FALSE;
1442 mail_hosts_sort_users(conn->dir->mail_hosts);
1443 }
1444
1445 str = t_str_new(128);
1446 str_printfa(str, "Handshake finished in %u.%03u secs (",
1447 handshake_msecs/1000, handshake_msecs%1000);
1448 director_connection_append_stats(conn, str);
1449 str_append_c(str, ')');
1450 if (handshake_msecs >= DIRECTOR_HANDSHAKE_WARN_SECS*1000)
1451 e_warning(conn->event, "%s", str_c(str));
1452 else
1453 e_info(conn->event, "%s", str_c(str));
1454
1455 /* the host is up now, make sure we can connect to it immediately
1456 if needed */
1457 conn->host->last_network_failure = 0;
1458
1459 conn->handshake_received = TRUE;
1460 if (conn->in) {
1461 /* handshaked to left side. tell it we've received the
1462 whole handshake. */
1463 director_connection_send(conn, "DONE\n");
1464
1465 /* tell the "right" director about the "left" one */
1466 director_update_send(dir, director_connection_get_host(conn),
1467 t_strdup_printf("DIRECTOR\t%s\t%u\n",
1468 conn->host->ip_str,
1469 conn->host->port));
1470 /* this is our "left" side. */
1471 return director_connection_assign_left(conn);
1472 } else {
1473 /* handshaked to "right" side. */
1474 return director_connection_assign_right(conn);
1475 }
1476 }
1477
1478 static int
director_handshake_cmd_options(struct director_connection * conn,const char * const * args)1479 director_handshake_cmd_options(struct director_connection *conn,
1480 const char *const *args)
1481 {
1482 bool consistent_hashing = FALSE;
1483 unsigned int i;
1484
1485 for (i = 0; args[i] != NULL; i++) {
1486 if (strcmp(args[i], DIRECTOR_OPT_CONSISTENT_HASHING) == 0)
1487 consistent_hashing = TRUE;
1488 }
1489 if (!consistent_hashing) {
1490 e_error(conn->event, "director_consistent_hashing settings "
1491 "differ between directors. Set "
1492 "director_consistent_hashing=yes on old directors");
1493 return -1;
1494 }
1495 return 1;
1496 }
1497
/* Dispatch a command received while the handshake is still in progress.

   Returns 1 if cmd was handled as a handshake command, -1 on a fatal error
   (already logged or reported via director_cmd_error()), and 0 if cmd is
   not one of the handshake commands recognized here (the caller presumably
   handles it as a regular command - confirm at the call site).

   Enforced ordering: VERSION must come first, then ME. Between
   HOST-HAND-START and HOST-HAND-END only HOST is accepted. */
static int
director_connection_handle_handshake(struct director_connection *conn,
				     const char *cmd, const char *const *args)
{
	unsigned int major_version;

	/* both incoming and outgoing connections get VERSION and ME */
	if (strcmp(cmd, "VERSION") == 0 && str_array_length(args) >= 3) {
		/* VERSION <protocol name> <major> <minor> */
		if (strcmp(args[0], DIRECTOR_VERSION_NAME) != 0) {
			e_error(conn->event, "Wrong protocol in socket "
				"(%s vs %s)",
				args[0], DIRECTOR_VERSION_NAME);
			return -1;
		} else if (str_to_uint(args[1], &major_version) < 0 ||
			   str_to_uint(args[2], &conn->minor_version) < 0) {
			e_error(conn->event, "Invalid protocol version: "
				"%s.%s", args[1], args[2]);
			return -1;
		} else if (major_version != DIRECTOR_VERSION_MAJOR) {
			e_error(conn->event, "Incompatible protocol version: "
				"%u vs %u", major_version,
				DIRECTOR_VERSION_MAJOR);
			return -1;
		}
		/* refuse peers too old for tags if we actually use tags */
		if (conn->minor_version < DIRECTOR_VERSION_TAGS_V2 &&
		    mail_hosts_have_tags(conn->dir->mail_hosts)) {
			e_error(conn->event, "Director version supports incompatible tags");
			return -1;
		}
		conn->version_received = TRUE;
		director_finish_sending_handshake(conn);
		return 1;
	}
	if (!conn->version_received) {
		director_cmd_error(conn, "Incompatible protocol");
		return -1;
	}

	if (strcmp(cmd, "ME") == 0)
		return director_cmd_me(conn, args) ? 1 : -1;
	if (!conn->me_received) {
		director_cmd_error(conn, "Expecting ME command first");
		return -1;
	}

	/* incoming connections get a HOST list */
	if (conn->handshake_sending_hosts) {
		/* inside HOST-HAND-START .. HOST-HAND-END */
		if (strcmp(cmd, "HOST") == 0)
			return director_cmd_host_handshake(conn, args) ? 1 : -1;
		if (strcmp(cmd, "HOST-HAND-END") == 0) {
			conn->ignore_host_events = FALSE;
			conn->handshake_sending_hosts = FALSE;
			return 1;
		}
		director_cmd_error(conn, "Unexpected command during host list");
		return -1;
	}
	if (strcmp(cmd, "OPTIONS") == 0)
		return director_handshake_cmd_options(conn, args);
	if (strcmp(cmd, "HOST-HAND-START") == 0) {
		if (!conn->in) {
			director_cmd_error(conn,
				"Host list is only for incoming connections");
			return -1;
		}
		return director_cmd_host_hand_start(conn, args) ? 1 : -1;
	}

	/* handshake user list: "U", or "USER" when CMD_IS_USER_HANDSHAKE()
	   says this USER belongs to the handshake for the peer's version */
	if (conn->in &&
	    (strcmp(cmd, "U") == 0 ||
	     (strcmp(cmd, "USER") == 0 &&
	      CMD_IS_USER_HANDSHAKE(conn->minor_version, args))))
		return director_handshake_cmd_user(conn, args) ? 1 : -1;

	/* both get DONE */
	if (strcmp(cmd, "DONE") == 0)
		return director_handshake_cmd_done(conn) ? 1 : -1;
	return 0;
}
1577
1578 static bool
director_connection_sync_host(struct director_connection * conn,struct director_host * host,uint32_t seq,unsigned int minor_version,unsigned int timestamp,unsigned int hosts_hash)1579 director_connection_sync_host(struct director_connection *conn,
1580 struct director_host *host,
1581 uint32_t seq, unsigned int minor_version,
1582 unsigned int timestamp, unsigned int hosts_hash)
1583 {
1584 struct director *dir = conn->dir;
1585
1586 if (minor_version > DIRECTOR_VERSION_MINOR) {
1587 /* we're not up to date */
1588 minor_version = DIRECTOR_VERSION_MINOR;
1589 }
1590
1591 if (host->self) {
1592 if (dir->sync_seq != seq) {
1593 /* stale SYNC event */
1594 return FALSE;
1595 }
1596 /* sync_seq increases when we get disconnected, so we must be
1597 successfully connected to both directions */
1598 i_assert(dir->left != NULL && dir->right != NULL);
1599
1600 if (hosts_hash != 0 &&
1601 hosts_hash != mail_hosts_hash(conn->dir->mail_hosts)) {
1602 e_error(conn->event, "Hosts unexpectedly changed during SYNC reply - resending"
1603 "(seq=%u, old hosts_hash=%u, new hosts_hash=%u)",
1604 seq, hosts_hash,
1605 mail_hosts_hash(dir->mail_hosts));
1606 (void)director_resend_sync(dir);
1607 return FALSE;
1608 }
1609
1610 dir->ring_min_version = minor_version;
1611 if (!dir->ring_handshaked) {
1612 /* the ring is handshaked */
1613 director_set_ring_handshaked(dir);
1614 } else if (dir->ring_synced) {
1615 /* duplicate SYNC (which was sent just in case the
1616 previous one got lost) */
1617 } else {
1618 e_debug(conn->event, "Ring is synced (%s sent seq=%u, hosts_hash=%u)",
1619 conn->name, seq,
1620 mail_hosts_hash(dir->mail_hosts));
1621 int sync_msecs =
1622 timeval_diff_msecs(&ioloop_timeval, &dir->last_sync_start_time);
1623 if (sync_msecs >= 0)
1624 dir->last_sync_msecs = sync_msecs;
1625 director_set_ring_synced(dir);
1626 }
1627 } else {
1628 if (seq < host->last_sync_seq &&
1629 timestamp < host->last_sync_timestamp +
1630 DIRECTOR_SYNC_STALE_TIMESTAMP_RESET_SECS) {
1631 /* stale SYNC event */
1632 e_debug(conn->event, "Ignore stale SYNC event for %s "
1633 "(seq %u < %u, timestamp=%u)",
1634 host->name, seq, host->last_sync_seq,
1635 timestamp);
1636 return FALSE;
1637 } else if (seq < host->last_sync_seq) {
1638 e_warning(conn->event,
1639 "Last SYNC seq for %s appears to be stale, resetting "
1640 "(seq=%u, timestamp=%u -> seq=%u, timestamp=%u)",
1641 host->name, host->last_sync_seq,
1642 host->last_sync_timestamp, seq, timestamp);
1643 host->last_sync_seq = seq;
1644 host->last_sync_timestamp = timestamp;
1645 host->last_sync_seq_counter = 1;
1646 } else if (seq > host->last_sync_seq ||
1647 timestamp > host->last_sync_timestamp) {
1648 host->last_sync_seq = seq;
1649 host->last_sync_timestamp = timestamp;
1650 host->last_sync_seq_counter = 1;
1651 e_debug(conn->event, "Update SYNC for %s "
1652 "(seq=%u, timestamp=%u -> seq=%u, timestamp=%u)",
1653 host->name, host->last_sync_seq,
1654 host->last_sync_timestamp, seq, timestamp);
1655 } else if (++host->last_sync_seq_counter >
1656 DIRECTOR_MAX_SYNC_SEQ_DUPLICATES) {
1657 /* we've received this too many times already */
1658 e_debug(conn->event, "Ignore duplicate #%u SYNC event for %s "
1659 "(seq=%u, timestamp %u <= %u)",
1660 host->last_sync_seq_counter, host->name, seq,
1661 timestamp, host->last_sync_timestamp);
1662 return FALSE;
1663 }
1664
1665 if (hosts_hash != 0 &&
1666 hosts_hash != mail_hosts_hash(conn->dir->mail_hosts)) {
1667 if (host->desynced_hosts_hash != hosts_hash) {
1668 e_debug(conn->event, "Ignore director %s stale SYNC request whose hosts don't match us "
1669 "(seq=%u, remote hosts_hash=%u, my hosts_hash=%u)",
1670 host->ip_str, seq, hosts_hash,
1671 mail_hosts_hash(dir->mail_hosts));
1672 host->desynced_hosts_hash = hosts_hash;
1673 return FALSE;
1674 }
1675 /* we'll get here only if we received a SYNC twice
1676 with the same wrong hosts_hash. FIXME: this gets
1677 triggered unnecessarily sometimes if hosts are
1678 changing rapidly. */
1679 e_error(conn->event, "Director %s SYNC request hosts don't match us - resending hosts "
1680 "(seq=%u, remote hosts_hash=%u, my hosts_hash=%u)",
1681 host->ip_str, seq,
1682 hosts_hash, mail_hosts_hash(dir->mail_hosts));
1683 director_resend_hosts(dir);
1684 return FALSE;
1685 }
1686 host->desynced_hosts_hash = 0;
1687 if (dir->right != NULL) {
1688 /* forward it to the connection on right */
1689 director_sync_send(dir, host, seq, minor_version,
1690 timestamp, hosts_hash);
1691 } else {
1692 e_debug(conn->event, "We have no right connection - "
1693 "delay replying to SYNC until finished");
1694 host->delayed_sync_seq = seq;
1695 host->delayed_sync_minor_version = minor_version;
1696 host->delayed_sync_timestamp = timestamp;
1697 host->delayed_sync_hosts_hash = hosts_hash;
1698 }
1699 }
1700 return TRUE;
1701 }
1702
director_connection_sync(struct director_connection * conn,const char * const * args)1703 static bool director_connection_sync(struct director_connection *conn,
1704 const char *const *args)
1705 {
1706 struct director *dir = conn->dir;
1707 struct director_host *host;
1708 struct ip_addr ip;
1709 in_port_t port;
1710 unsigned int arg_count, seq, minor_version = 0, timestamp = ioloop_time;
1711 unsigned int hosts_hash = 0;
1712
1713 arg_count = str_array_length(args);
1714 if (arg_count < 3 ||
1715 !director_args_parse_ip_port(conn, args, &ip, &port) ||
1716 str_to_uint(args[2], &seq) < 0) {
1717 director_cmd_error(conn, "Invalid parameters");
1718 return FALSE;
1719 }
1720 if (arg_count >= 4 && str_to_uint(args[3], &minor_version) < 0) {
1721 director_cmd_error(conn, "Invalid parameters");
1722 return FALSE;
1723 }
1724 if (arg_count >= 5 && str_to_uint(args[4], ×tamp) < 0) {
1725 director_cmd_error(conn, "Invalid parameters");
1726 return FALSE;
1727 }
1728 if (arg_count >= 6 && str_to_uint(args[5], &hosts_hash) < 0) {
1729 director_cmd_error(conn, "Invalid parameters");
1730 return FALSE;
1731 }
1732
1733 /* find the originating director. if we don't see it, it was already
1734 removed and we can ignore this sync. */
1735 host = director_host_lookup(dir, &ip, port);
1736 if (host != NULL) {
1737 if (!director_connection_sync_host(conn, host, seq,
1738 minor_version, timestamp,
1739 hosts_hash))
1740 return TRUE;
1741 }
1742
1743 /* If directors got disconnected while we were waiting a SYNC reply,
1744 it might have gotten lost. If we've received a DIRECTOR update since
1745 the last time we sent a SYNC, retry sending it here to make sure
1746 it doesn't get stuck. We don't want to do this too eagerly because
1747 it may trigger desynced_hosts_hash != hosts_hash mismatch, which
1748 causes unnecessary error logging and hosts-resending. */
1749 if ((host == NULL || !host->self) &&
1750 dir->last_sync_sent_ring_change_counter != dir->ring_change_counter &&
1751 (time_t)dir->self_host->last_sync_timestamp != ioloop_time)
1752 (void)director_resend_sync(dir);
1753 return TRUE;
1754 }
1755
/* Timeout callback that tears down a connection replaced because of a
   CONNECT request. It's scheduled when the connection that received the
   CONNECT is our current right side. */
static void director_disconnect_timeout(struct director_connection *conn)
{
	const char *reason = "CONNECT requested";

	director_connection_deinit(&conn, reason);
}
1760
1761 static void
director_reconnect_after_wrong_connect_timeout(struct director_connection * conn)1762 director_reconnect_after_wrong_connect_timeout(struct director_connection *conn)
1763 {
1764 struct director *dir = conn->dir;
1765
1766 director_connection_deinit(&conn, "Wrong CONNECT requested");
1767 if (dir->right == NULL)
1768 director_connect(dir, "Reconnecting after wrong CONNECT request");
1769 }
1770
1771 static void
director_reconnect_after_wrong_connect(struct director_connection * conn)1772 director_reconnect_after_wrong_connect(struct director_connection *conn)
1773 {
1774 if (conn->to_disconnect != NULL)
1775 return;
1776 conn->to_disconnect =
1777 timeout_add_short(DIRECTOR_RECONNECT_AFTER_WRONG_CONNECT_MSECS,
1778 director_reconnect_after_wrong_connect_timeout, conn);
1779 }
1780
/* Handle "CONNECT <ip> <port>": the remote suggests that we use the
   director at <ip>:<port> as our right side.

   The request is ignored, and conn is scheduled to be dropped through
   director_reconnect_after_wrong_connect(), when our current right already
   compares as the correct one (director_host_cmp_to_self() <= 0) or when
   the suggested director has been removed. Otherwise the current right
   connection is dropped (deferred through a zero-length timeout if it is
   conn itself) and an outgoing connection to the suggested host is started.

   Returns FALSE only for malformed parameters, TRUE otherwise. */
static bool director_cmd_connect(struct director_connection *conn,
				 const char *const *args)
{
	struct director *dir = conn->dir;
	struct director_host *host;
	struct ip_addr ip;
	in_port_t port;
	const char *right_state;

	if (str_array_length(args) != 2 ||
	    !director_args_parse_ip_port(conn, args, &ip, &port)) {
		director_cmd_error(conn, "Invalid parameters");
		return FALSE;
	}

	host = director_host_get(conn->dir, &ip, port);

	/* remote suggests us to connect elsewhere */
	if (dir->right != NULL &&
	    director_host_cmp_to_self(host, dir->right->host,
				      dir->self_host) <= 0) {
		/* the old connection is the correct one */
		e_debug(conn->event, "Ignoring CONNECT request to %s (current right is %s)",
			host->name, dir->right->name);
		director_reconnect_after_wrong_connect(conn);
		return TRUE;
	}
	if (host->removed) {
		e_debug(conn->event, "Ignoring CONNECT request to %s (director is removed)",
			host->name);
		director_reconnect_after_wrong_connect(conn);
		return TRUE;
	}

	/* reset failure timestamp so we'll actually try to connect there. */
	host->last_network_failure = 0;
	/* reset removed-flag, so we don't crash */
	host->removed = FALSE;

	if (dir->right == NULL) {
		right_state = "initializing right";
	} else {
		/* build the log text before the right connection goes away */
		right_state = t_strdup_printf("replacing current right %s",
					      dir->right->name);
		/* disconnect from right side immediately - it's not accepting
		   any further commands from us. */
		if (conn->dir->right != conn)
			/* deinit also sets dir->right = NULL through the
			   pointer it is given */
			director_connection_deinit(&conn->dir->right, "CONNECT requested");
		else if (conn->to_disconnect == NULL) {
			/* conn is our right itself and is still being used by
			   its input handler (which holds a reference), so
			   destroy it from a timeout instead */
			conn->to_disconnect =
				timeout_add_short(0, director_disconnect_timeout, conn);
		}
	}

	/* connect here */
	(void)director_connect_host(dir, host, t_strdup_printf(
		"Received CONNECT request from %s - %s",
		conn->name, right_state));
	return TRUE;
}
1841
director_disconnect_wrong_lefts(struct director * dir)1842 static void director_disconnect_wrong_lefts(struct director *dir)
1843 {
1844 struct director_connection *conn;
1845
1846 array_foreach_elem(&dir->connections, conn) {
1847 if (conn->in && conn != dir->left && conn->me_received &&
1848 conn->to_disconnect == NULL &&
1849 director_host_cmp_to_self(dir->left->host, conn->host,
1850 dir->self_host) < 0)
1851 director_connection_send_connect(conn, dir->left->host);
1852 }
1853 }
1854
director_cmd_ping(struct director_connection * conn,const char * const * args)1855 static bool director_cmd_ping(struct director_connection *conn,
1856 const char *const *args)
1857 {
1858 time_t sent_time;
1859 uintmax_t send_buffer_size;
1860
1861 if (str_array_length(args) >= 2 &&
1862 str_to_time(args[0], &sent_time) == 0 &&
1863 str_to_uintmax(args[1], &send_buffer_size) == 0) {
1864 int diff_secs = ioloop_time - sent_time;
1865 if (diff_secs*1000+500 > DIRECTOR_CONNECTION_PINGPONG_WARN_MSECS) {
1866 e_warning(conn->event,
1867 "PING response took %d secs to receive "
1868 "(send buffer was %ju bytes)",
1869 diff_secs, send_buffer_size);
1870 }
1871 }
1872 director_connection_send(conn,
1873 t_strdup_printf("PONG\t%"PRIdTIME_T"\t%zu\n",
1874 ioloop_time, o_stream_get_buffer_used_size(conn->output)));
1875 return TRUE;
1876 }
1877
1878 static void
director_ping_append_extra(struct director_connection * conn,string_t * str,time_t pong_sent_time,uintmax_t pong_send_buffer_size)1879 director_ping_append_extra(struct director_connection *conn, string_t *str,
1880 time_t pong_sent_time,
1881 uintmax_t pong_send_buffer_size)
1882 {
1883 struct rusage usage;
1884
1885 str_printfa(str, "buffer size at PING was %zu bytes", conn->ping_sent_buffer_size);
1886 if (pong_sent_time != 0) {
1887 str_printfa(str, ", remote sent it %"PRIdTIME_T" secs ago",
1888 ioloop_time - pong_sent_time);
1889 }
1890 if (pong_send_buffer_size != (uintmax_t)-1) {
1891 str_printfa(str, ", remote buffer size at PONG was %ju bytes",
1892 pong_send_buffer_size);
1893 }
1894 if (conn->ping_sent_user_cpu.tv_sec != (time_t)-1 &&
1895 getrusage(RUSAGE_SELF, &usage) == 0) {
1896 int diff = timeval_diff_msecs(&usage.ru_utime,
1897 &conn->ping_sent_user_cpu);
1898 str_printfa(str, ", %u.%03u CPU secs since PING was sent",
1899 diff/1000, diff%1000);
1900 }
1901 str_printfa(str, ", %"PRIuUOFF_T" bytes input",
1902 conn->input->v_offset - conn->ping_sent_input_offset);
1903 str_printfa(str, ", %"PRIuUOFF_T" bytes output",
1904 conn->output->offset - conn->ping_sent_output_offset);
1905 }
1906
director_cmd_pong(struct director_connection * conn,const char * const * args)1907 static bool director_cmd_pong(struct director_connection *conn,
1908 const char *const *args)
1909 {
1910 time_t sent_time;
1911 uintmax_t send_buffer_size;
1912
1913 if (!conn->ping_waiting)
1914 return TRUE;
1915 conn->ping_waiting = FALSE;
1916 timeout_remove(&conn->to_pong);
1917
1918 if (str_array_length(args) < 2 ||
1919 str_to_time(args[0], &sent_time) < 0 ||
1920 str_to_uintmax(args[1], &send_buffer_size) < 0) {
1921 sent_time = 0;
1922 send_buffer_size = (uintmax_t)-1;
1923 }
1924
1925 int ping_msecs = timeval_diff_msecs(&ioloop_timeval, &conn->ping_sent_time);
1926 if (ping_msecs >= 0) {
1927 if (ping_msecs > DIRECTOR_CONNECTION_PINGPONG_WARN_MSECS) {
1928 string_t *extra = t_str_new(128);
1929 director_ping_append_extra(conn, extra, sent_time, send_buffer_size);
1930 e_warning(conn->event,
1931 "PONG response took %u.%03u secs (%s)",
1932 ping_msecs/1000, ping_msecs%1000,
1933 str_c(extra));
1934 }
1935 conn->last_ping_msecs = ping_msecs;
1936 }
1937
1938 if (conn->verifying_left) {
1939 conn->verifying_left = FALSE;
1940 if (conn == conn->dir->left) {
1941 /* our left side is functional. tell all the wrong
1942 incoming connections to connect to it instead. */
1943 director_disconnect_wrong_lefts(conn->dir);
1944 }
1945 }
1946
1947 director_connection_set_ping_timeout(conn);
1948 return TRUE;
1949 }
1950
1951 static bool
director_connection_handle_cmd(struct director_connection * conn,const char * cmd,const char * const * args)1952 director_connection_handle_cmd(struct director_connection *conn,
1953 const char *cmd, const char *const *args)
1954 {
1955 int ret;
1956
1957 if (!conn->handshake_received) {
1958 ret = director_connection_handle_handshake(conn, cmd, args);
1959 if (ret > 0)
1960 return TRUE;
1961 if (ret < 0) {
1962 /* invalid commands during handshake,
1963 we probably don't want to reconnect here */
1964 return FALSE;
1965 }
1966 /* allow also other commands during handshake */
1967 }
1968
1969 if (strcmp(cmd, "PING") == 0)
1970 return director_cmd_ping(conn, args);
1971 if (strcmp(cmd, "PONG") == 0)
1972 return director_cmd_pong(conn, args);
1973 if (strcmp(cmd, "USER") == 0)
1974 return director_cmd_user(conn, args);
1975 if (strcmp(cmd, "USER-WEAK") == 0)
1976 return director_cmd_user_weak(conn, args);
1977 if (strcmp(cmd, "HOST") == 0)
1978 return director_cmd_host(conn, args);
1979 if (strcmp(cmd, "HOST-REMOVE") == 0)
1980 return director_cmd_host_remove(conn, args);
1981 if (strcmp(cmd, "HOST-FLUSH") == 0)
1982 return director_cmd_host_flush(conn, args);
1983 if (strcmp(cmd, "USER-MOVE") == 0)
1984 return director_cmd_user_move(conn, args);
1985 if (strcmp(cmd, "USER-KICK") == 0)
1986 return director_cmd_user_kick(conn, args);
1987 if (strcmp(cmd, "USER-KICK-ALT") == 0)
1988 return director_cmd_user_kick_alt(conn, args);
1989 if (strcmp(cmd, "USER-KICK-HASH") == 0)
1990 return director_cmd_user_kick_hash(conn, args);
1991 if (strcmp(cmd, "USER-KILLED") == 0)
1992 return director_cmd_user_killed(conn, args);
1993 if (strcmp(cmd, "USER-KILLED-EVERYWHERE") == 0)
1994 return director_cmd_user_killed_everywhere(conn, args);
1995 if (strcmp(cmd, "DIRECTOR") == 0)
1996 return director_cmd_director(conn, args);
1997 if (strcmp(cmd, "DIRECTOR-REMOVE") == 0)
1998 return director_cmd_director_remove(conn, args);
1999 if (strcmp(cmd, "SYNC") == 0)
2000 return director_connection_sync(conn, args);
2001 if (strcmp(cmd, "CONNECT") == 0)
2002 return director_cmd_connect(conn, args);
2003 if (strcmp(cmd, "QUIT") == 0) {
2004 e_warning(conn->event,
2005 "Director %s disconnected us with reason: %s",
2006 conn->name, t_strarray_join(args, " "));
2007 return FALSE;
2008 }
2009
2010 director_cmd_error(conn, "Unknown command %s", cmd);
2011 return FALSE;
2012 }
2013
2014 static bool
director_connection_handle_line(struct director_connection * conn,char * line)2015 director_connection_handle_line(struct director_connection *conn,
2016 char *line)
2017 {
2018 const char *cmd, *const *args;
2019 bool ret;
2020
2021 e_debug(conn->event, "input: %s", line);
2022
2023 args = t_strsplit_tabescaped_inplace(line);
2024 cmd = args[0];
2025 if (cmd == NULL) {
2026 director_cmd_error(conn, "Received empty line");
2027 return FALSE;
2028 }
2029
2030 conn->cur_cmd = cmd;
2031 conn->cur_args = args;
2032 ret = director_connection_handle_cmd(conn, cmd, args+1);
2033 conn->cur_cmd = NULL;
2034 conn->cur_args = NULL;
2035 return ret;
2036 }
2037
2038 static void
director_connection_log_disconnect(struct director_connection * conn,int err,const char * errstr)2039 director_connection_log_disconnect(struct director_connection *conn, int err,
2040 const char *errstr)
2041 {
2042 string_t *str = t_str_new(128);
2043
2044 i_assert(conn->connected);
2045
2046 if (conn->connect_request_to != NULL) {
2047 e_warning(conn->event,
2048 "Director %s tried to connect to us, "
2049 "should use %s instead",
2050 conn->name, conn->connect_request_to->name);
2051 return;
2052 }
2053
2054 str_printfa(str, "Director %s disconnected: ", conn->name);
2055 str_append(str, "Connection closed");
2056 if (err != 0 && err != EPIPE) {
2057 errno = err;
2058 if (errstr[0] == '\0')
2059 str_printfa(str, ": %m");
2060 else
2061 str_printfa(str, ": %s", errstr);
2062 }
2063
2064 str_append(str, " (");
2065 director_connection_append_stats(conn, str);
2066
2067 if (!conn->me_received)
2068 str_append(str, ", handshake ME not received");
2069 else if (!conn->handshake_received)
2070 str_append(str, ", handshake DONE not received");
2071 if (conn->synced)
2072 str_append(str, ", synced");
2073 str_append_c(str, ')');
2074 e_error(conn->event, "%s", str_c(str));
2075 }
2076
director_connection_input(struct director_connection * conn)2077 static void director_connection_input(struct director_connection *conn)
2078 {
2079 struct director *dir = conn->dir;
2080 char *line;
2081 uoff_t prev_offset;
2082 bool ret;
2083
2084 switch (i_stream_read(conn->input)) {
2085 case 0:
2086 return;
2087 case -1:
2088 /* disconnected */
2089 director_connection_log_disconnect(conn, conn->input->stream_errno,
2090 i_stream_get_error(conn->input));
2091 director_connection_disconnected(&conn, i_stream_get_error(conn->input));
2092 return;
2093 case -2:
2094 /* buffer full */
2095 director_cmd_error(conn, "Director sent us more than %d bytes",
2096 MAX_INBUF_SIZE);
2097 director_connection_reconnect(&conn, "Too long input line");
2098 return;
2099 }
2100
2101 if (conn->to_disconnect != NULL) {
2102 /* just read everything the remote sends, and wait for it
2103 to disconnect. we mainly just want the remote to read the
2104 CONNECT we sent it. */
2105 i_stream_skip(conn->input, i_stream_get_data_size(conn->input));
2106 return;
2107 }
2108 conn->last_input = ioloop_timeval;
2109 conn->refcount++;
2110
2111 director_sync_freeze(dir);
2112 prev_offset = conn->input->v_offset;
2113 while ((line = i_stream_next_line(conn->input)) != NULL) {
2114 dir->ring_traffic_input += conn->input->v_offset - prev_offset;
2115 prev_offset = conn->input->v_offset;
2116
2117 T_BEGIN {
2118 ret = director_connection_handle_line(conn, line);
2119 } T_END;
2120
2121 if (!ret) {
2122 if (!director_connection_unref(conn))
2123 break;
2124 director_connection_reconnect(&conn, t_strdup_printf(
2125 "Invalid input: %s", line));
2126 break;
2127 }
2128 }
2129 director_sync_thaw(dir);
2130 if (conn != NULL) {
2131 if (director_connection_unref(conn))
2132 timeout_reset(conn->to_ping);
2133 }
2134 }
2135
director_connection_send_directors(struct director_connection * conn)2136 static void director_connection_send_directors(struct director_connection *conn)
2137 {
2138 struct director_host *host;
2139 string_t *str = t_str_new(64);
2140
2141 array_foreach_elem(&conn->dir->dir_hosts, host) {
2142 if (host->removed)
2143 continue;
2144
2145 str_truncate(str, 0);
2146 str_printfa(str, "DIRECTOR\t%s\t%u\n",
2147 host->ip_str, host->port);
2148 director_connection_send(conn, str_c(str));
2149 }
2150 }
2151
2152 static void
director_connection_send_hosts(struct director_connection * conn)2153 director_connection_send_hosts(struct director_connection *conn)
2154 {
2155 struct mail_host *host;
2156 bool send_updowns;
2157 string_t *str = t_str_new(128);
2158
2159 i_assert(conn->version_received);
2160
2161 send_updowns = conn->minor_version >= DIRECTOR_VERSION_UPDOWN;
2162
2163 str_printfa(str, "HOST-HAND-START\t%u\n",
2164 conn->dir->ring_handshaked ? 1 : 0);
2165 array_foreach_elem(mail_hosts_get(conn->dir->mail_hosts), host) {
2166 const char *host_tag = mail_host_get_tag(host);
2167
2168 str_printfa(str, "HOST\t%s\t%u",
2169 host->ip_str, host->vhost_count);
2170 if (host_tag[0] != '\0' || send_updowns) {
2171 str_append_c(str, '\t');
2172 str_append_tabescaped(str, host_tag);
2173 }
2174 if (send_updowns) {
2175 str_printfa(str, "\t%c%ld\t", host->down ? 'D' : 'U',
2176 (long)host->last_updown_change);
2177 if (host->hostname != NULL)
2178 str_append_tabescaped(str, host->hostname);
2179 }
2180 str_append_c(str, '\n');
2181 director_connection_send(conn, str_c(str));
2182 str_truncate(str, 0);
2183 }
2184 str_printfa(str, "HOST-HAND-END\t%u\n",
2185 conn->dir->ring_handshaked ? 1 : 0);
2186 director_connection_send(conn, str_c(str));
2187 }
2188
director_connection_send_done(struct director_connection * conn)2189 static int director_connection_send_done(struct director_connection *conn)
2190 {
2191 i_assert(conn->version_received);
2192
2193 if (conn->minor_version >= DIRECTOR_VERSION_OPTIONS) {
2194 director_connection_send(conn,
2195 "OPTIONS\t"DIRECTOR_OPT_CONSISTENT_HASHING"\n");
2196 } else {
2197 e_error(conn->event, "Director version is too old for supporting director_consistent_hashing=yes");
2198 return -1;
2199 }
2200 director_connection_send(conn, "DONE\n");
2201 return 0;
2202 }
2203
/* Continue sending the handshake user list from conn->user_iter.

   Each user is sent as "U" (or "USER" for remotes older than
   DIRECTOR_VERSION_HANDSHAKE_U_CMD) with username hash, host IP, timestamp
   and an optional "w" for weak users. Sending pauses after
   DIRECTOR_HANDSHAKE_MAX_USERS_SENT_PER_FLUSH users (returning 0). It also
   pauses when the output buffer reaches OUTBUF_FLUSH_THRESHOLD and can't be
   flushed completely, returning o_stream_flush()'s result.

   After the last user, the iterator is freed and OPTIONS+DONE are sent.
   If the remote's user list was already received, the hosts' user lists
   are sorted. Then the output is flushed.

   Returns -1 on error (flush failure or remote too old for OPTIONS),
   0 if output is still pending, >0 once everything has been flushed. */
static int director_connection_send_users(struct director_connection *conn)
{
	struct user *user;
	string_t *str = t_str_new(128);
	char dec_buf[MAX_INT_STRLEN];
	unsigned int sent_count = 0;
	int ret;

	i_assert(conn->version_received);

	/* with new versions use "U" for sending the handshake users, because
	   otherwise their parameters may look identical and can't be
	   distinguished. */
	if (director_connection_get_minor_version(conn) >= DIRECTOR_VERSION_HANDSHAKE_U_CMD)
		str_append(str, "U\t");
	else
		str_append(str, "USER\t");
	/* the command prefix is kept in str; only the rest is rebuilt */
	size_t cmd_prefix_len = str_len(str);
	while ((user = director_iterate_users_next(conn->user_iter)) != NULL) {
		str_truncate(str, cmd_prefix_len);
		str_append(str, dec2str_buf(dec_buf, user->username_hash));
		str_append_c(str, '\t');
		str_append(str, user->host->ip_str);
		str_append_c(str, '\t');
		str_append(str, dec2str_buf(dec_buf, user->timestamp));
		if (user->weak)
			str_append(str, "\tw");
		str_append_c(str, '\n');

		conn->handshake_users_sent++;
		director_connection_send(conn, str_c(str));
		if (++sent_count >= DIRECTOR_HANDSHAKE_MAX_USERS_SENT_PER_FLUSH) {
			/* Don't send too much at once to avoid hangs */
			timeout_reset(conn->to_ping);
			return 0;
		}

		if (o_stream_get_buffer_used_size(conn->output) >= OUTBUF_FLUSH_THRESHOLD) {
			if ((ret = o_stream_flush(conn->output)) <= 0) {
				/* continue later */
				timeout_reset(conn->to_ping);
				return ret;
			}
		}
	}
	/* all users sent */
	director_iterate_users_deinit(&conn->user_iter);
	if (director_connection_send_done(conn) < 0)
		return -1;

	if (conn->users_unsorted && conn->handshake_received) {
		/* we received remote's list of users before sending ours */
		conn->users_unsorted = FALSE;
		mail_hosts_sort_users(conn->dir->mail_hosts);
	}

	ret = o_stream_flush(conn->output);
	timeout_reset(conn->to_ping);
	return ret;
}
2263
director_connection_output(struct director_connection * conn)2264 static int director_connection_output(struct director_connection *conn)
2265 {
2266 int ret;
2267
2268 conn->last_output = ioloop_timeval;
2269 if (conn->user_iter != NULL) {
2270 /* still handshaking USER list */
2271 ret = director_connection_send_users(conn);
2272 if (ret < 0) {
2273 director_connection_log_disconnect(conn, conn->output->stream_errno,
2274 o_stream_get_error(conn->output));
2275 director_connection_disconnected(&conn,
2276 o_stream_get_error(conn->output));
2277 } else {
2278 o_stream_set_flush_pending(conn->output, TRUE);
2279 }
2280 return ret;
2281 }
2282 return o_stream_flush(conn->output);
2283 }
2284
2285 static struct director_connection *
director_connection_init_common(struct director * dir,int fd)2286 director_connection_init_common(struct director *dir, int fd)
2287 {
2288 struct director_connection *conn;
2289
2290 conn = i_new(struct director_connection, 1);
2291 conn->refcount = 1;
2292 conn->created = ioloop_timeval;
2293 conn->fd = fd;
2294 conn->dir = dir;
2295 conn->event = event_create(dir->event);
2296 conn->input = i_stream_create_fd(conn->fd, MAX_INBUF_SIZE);
2297 conn->output = o_stream_create_fd(conn->fd, dir->set->director_output_buffer_size);
2298 o_stream_set_no_error_handling(conn->output, TRUE);
2299 array_push_back(&dir->connections, &conn);
2300 return conn;
2301 }
2302
director_connection_send_handshake(struct director_connection * conn)2303 static void director_connection_send_handshake(struct director_connection *conn)
2304 {
2305 director_connection_send(conn, t_strdup_printf(
2306 "VERSION\t"DIRECTOR_VERSION_NAME"\t%u\t%u\n"
2307 "ME\t%s\t%u\t%lld\n",
2308 DIRECTOR_VERSION_MAJOR, DIRECTOR_VERSION_MINOR,
2309 net_ip2addr(&conn->dir->self_ip), conn->dir->self_port,
2310 (long long)time(NULL)));
2311 }
2312
director_connection_set_connected(struct director_connection * conn)2313 static void director_connection_set_connected(struct director_connection *conn)
2314 {
2315 struct rusage usage;
2316
2317 conn->connected = TRUE;
2318 conn->connected_time = ioloop_timeval;
2319
2320 if (getrusage(RUSAGE_SELF, &usage) == 0) {
2321 conn->connected_user_cpu_set = TRUE;
2322 conn->connected_user_cpu = usage.ru_utime;
2323 }
2324 }
2325
2326 struct director_connection *
director_connection_init_in(struct director * dir,int fd,const struct ip_addr * ip)2327 director_connection_init_in(struct director *dir, int fd,
2328 const struct ip_addr *ip)
2329 {
2330 struct director_connection *conn;
2331
2332 conn = director_connection_init_common(dir, fd);
2333 conn->in = TRUE;
2334 director_connection_set_connected(conn);
2335 director_connection_set_name(conn,
2336 t_strdup_printf("%s/in", net_ip2addr(ip)));
2337 conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);
2338 conn->to_ping = timeout_add(DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS,
2339 director_connection_init_timeout, conn);
2340
2341 e_info(conn->event, "Incoming connection from director %s", conn->name);
2342 director_connection_send_handshake(conn);
2343 return conn;
2344 }
2345
/* IO_WRITE callback for an outgoing connection (installed by
   director_connection_init_out()), called when the connect() attempt has
   finished. On a socket error the connection is dropped through
   director_connection_disconnected(). On success the connection is marked
   connected and the output flush callback is installed. The IO is switched
   to reading input, and the timeout is re-armed to wait
   DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS for the remote's ME. Then VERSION/ME
   and our DIRECTOR list are sent. */
static void director_connection_connected(struct director_connection *conn)
{
	int err;

	/* pending socket error from the connect() attempt, if any */
	if ((err = net_geterror(conn->fd)) != 0) {
		e_error(conn->event, "connect() failed: %s", strerror(err));
		director_connection_disconnected(&conn, strerror(err));
		return;
	}
	director_connection_set_connected(conn);
	o_stream_set_flush_callback(conn->output,
				    director_connection_output, conn);

	/* replace the IO_WRITE watcher with a reader */
	io_remove(&conn->io);
	conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);

	/* the connect timeout is over; now wait for the remote's ME */
	timeout_remove(&conn->to_ping);
	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS,
				    director_connection_init_timeout, conn);

	o_stream_cork(conn->output);
	director_connection_send_handshake(conn);
	director_connection_send_directors(conn);
	o_stream_uncork(conn->output);
	/* send the rest of the handshake after we've received the remote's
	   version number */
}
2373
director_finish_sending_handshake(struct director_connection * conn)2374 static void director_finish_sending_handshake(struct director_connection *conn)
2375 {
2376 if (
2377 conn->in) {
2378 /* only outgoing connections send hosts & users */
2379 return;
2380 }
2381 o_stream_cork(conn->output);
2382 director_connection_send_hosts(conn);
2383
2384 i_assert(conn->user_iter == NULL);
2385 /* Iterate only through users that aren't refreshed since the
2386 iteration started. The refreshed users will already be sent as
2387 regular USER updates, so they don't need to be sent again.
2388
2389 We especially don't want to send these users again, because
2390 otherwise in a rapidly changing director we might never end up
2391 sending all the users when they constantly keep being added to the
2392 end of the list. (The iteration lists users in order from older to
2393 newer.) */
2394 conn->user_iter = director_iterate_users_init(conn->dir, TRUE);
2395
2396 if (director_connection_send_users(conn) == 0)
2397 o_stream_set_flush_pending(conn->output, TRUE);
2398
2399 o_stream_uncork(conn->output);
2400 }
2401
2402 struct director_connection *
director_connection_init_out(struct director * dir,int fd,struct director_host * host)2403 director_connection_init_out(struct director *dir, int fd,
2404 struct director_host *host)
2405 {
2406 struct director_connection *conn;
2407
2408 i_assert(!host->removed);
2409
2410 /* make sure we don't keep old sequence values across restarts */
2411 director_host_restarted(host);
2412
2413 conn = director_connection_init_common(dir, fd);
2414 director_connection_set_name(conn,
2415 t_strdup_printf("%s/out", host->name));
2416 conn->host = host;
2417 director_host_ref(host);
2418 conn->io = io_add(conn->fd, IO_WRITE,
2419 director_connection_connected, conn);
2420 conn->to_ping = timeout_add(DIRECTOR_CONNECTION_CONNECT_TIMEOUT_MSECS,
2421 director_connection_init_timeout, conn);
2422 return conn;
2423 }
2424
director_connection_deinit(struct director_connection ** _conn,const char * remote_reason)2425 void director_connection_deinit(struct director_connection **_conn,
2426 const char *remote_reason)
2427 {
2428 struct director_connection *const *conns, *conn = *_conn;
2429 struct director *dir = conn->dir;
2430 unsigned int i, count;
2431
2432 *_conn = NULL;
2433
2434 i_assert(conn->fd != -1);
2435
2436 if (conn->host != NULL) {
2437 e_debug(conn->event, "Disconnecting from %s: %s",
2438 conn->host->name, remote_reason);
2439 }
2440 if (*remote_reason != '\0' &&
2441 conn->minor_version >= DIRECTOR_VERSION_QUIT) {
2442 o_stream_nsend_str(conn->output, t_strdup_printf(
2443 "QUIT\t%s\n", remote_reason));
2444 }
2445
2446 conns = array_get(&dir->connections, &count);
2447 for (i = 0; i < count; i++) {
2448 if (conns[i] == conn) {
2449 array_delete(&dir->connections, i, 1);
2450 break;
2451 }
2452 }
2453 i_assert(i < count);
2454 if (dir->left == conn) {
2455 dir->left = NULL;
2456 /* if there is already another handshaked incoming connection,
2457 use it as the new "left" */
2458 director_assign_left(dir);
2459 }
2460 if (dir->right == conn)
2461 dir->right = NULL;
2462
2463 if (conn->users_unsorted) {
2464 /* Users were received, but handshake didn't finish.
2465 Finish sorting so the users won't stay in wrong order. */
2466 mail_hosts_sort_users(conn->dir->mail_hosts);
2467 }
2468
2469 if (conn->connect_request_to != NULL) {
2470 director_host_unref(conn->connect_request_to);
2471 conn->connect_request_to = NULL;
2472 }
2473 if (conn->user_iter != NULL)
2474 director_iterate_users_deinit(&conn->user_iter);
2475 timeout_remove(&conn->to_disconnect);
2476 timeout_remove(&conn->to_pong);
2477 timeout_remove(&conn->to_ping);
2478 io_remove(&conn->io);
2479 i_stream_close(conn->input);
2480 o_stream_close(conn->output);
2481 i_close_fd(&conn->fd);
2482
2483 if (conn->in)
2484 master_service_client_connection_destroyed(master_service);
2485 director_connection_unref(conn);
2486
2487 if (dir->left == NULL || dir->right == NULL) {
2488 /* we aren't synced until we're again connected to a ring */
2489 dir->sync_seq++;
2490 director_set_ring_unsynced(dir);
2491 }
2492 }
2493
director_connection_unref(struct director_connection * conn)2494 static bool director_connection_unref(struct director_connection *conn)
2495 {
2496 i_assert(conn->refcount > 0);
2497 if (--conn->refcount > 0)
2498 return TRUE;
2499
2500 if (conn->host != NULL)
2501 director_host_unref(conn->host);
2502 i_stream_unref(&conn->input);
2503 o_stream_unref(&conn->output);
2504 event_unref(&conn->event);
2505 i_free(conn->name);
2506 i_free(conn);
2507 return FALSE;
2508 }
2509
director_connection_disconnected(struct director_connection ** _conn,const char * reason)2510 static void director_connection_disconnected(struct director_connection **_conn,
2511 const char *reason)
2512 {
2513 struct director_connection *conn = *_conn;
2514 struct director *dir = conn->dir;
2515
2516 if ((conn->connected_time.tv_sec == 0 ||
2517 conn->connected_time.tv_sec + DIRECTOR_SUCCESS_MIN_CONNECT_SECS > ioloop_time) &&
2518 conn->host != NULL) {
2519 /* connection didn't exist for very long, assume it has a
2520 network problem */
2521 conn->host->last_network_failure = ioloop_time;
2522 }
2523
2524 director_connection_deinit(_conn, reason);
2525 if (dir->right == NULL)
2526 director_connect(dir, "Reconnecting after disconnection");
2527 }
2528
director_connection_reconnect(struct director_connection ** _conn,const char * reason)2529 static void director_connection_reconnect(struct director_connection **_conn,
2530 const char *reason)
2531 {
2532 struct director_connection *conn = *_conn;
2533 struct director *dir = conn->dir;
2534
2535 director_connection_deinit(_conn, reason);
2536 if (dir->right == NULL)
2537 director_connect(dir, "Reconnecting after error");
2538 }
2539
director_disconnect_write_error(struct director_connection * conn)2540 static void director_disconnect_write_error(struct director_connection *conn)
2541 {
2542 struct director *dir = conn->dir;
2543
2544 director_connection_deinit(&conn, "write failure");
2545 if (dir->right == NULL)
2546 director_connect(dir, "Reconnecting after write failure");
2547 }
2548
/* Queue data for sending on conn. Callers pass "\n"-terminated protocol
   lines; the debug logging below logs each line and skips whatever follows
   the last "\n". Does nothing if the output stream is already closed or
   the connection isn't connected yet.

   If data can't be buffered completely (write error, or the output buffer
   is full), the failure is logged and the output stream is closed. The
   connection is then torn down from a zero-length timeout
   (director_disconnect_write_error()) instead of immediately. Otherwise
   the traffic counter, last output time and peak buffered size are
   updated. */
void director_connection_send(struct director_connection *conn,
			      const char *data)
{
	size_t len = strlen(data);
	off_t ret;

	if (conn->output->closed || !conn->connected)
		return;

	if (event_want_debug(conn->event)) T_BEGIN {
		const char *const *lines = t_strsplit(data, "\n");
		/* lines[1] == NULL for the final element (text after the
		   last "\n"), which isn't logged */
		for (; lines[1] != NULL; lines++)
			e_debug(conn->event, "output: %s", *lines);
	} T_END;
	ret = o_stream_send(conn->output, data, len);
	if (ret != (off_t)len) {
		if (ret < 0) {
			director_connection_log_disconnect(conn,
				conn->output->stream_errno,
				t_strdup_printf("write() failed: %s",
						o_stream_get_error(conn->output)));
		} else {
			/* partial send: the output buffer is full */
			director_connection_log_disconnect(conn, EINVAL,
				t_strdup_printf("Output buffer full at %zu",
						o_stream_get_buffer_used_size(conn->output)));
		}
		o_stream_close(conn->output);
		/* closing the stream when output buffer is full doesn't cause
		   disconnection itself. */
		timeout_remove(&conn->to_disconnect);
		conn->to_disconnect =
			timeout_add_short(0, director_disconnect_write_error, conn);
	} else {
		conn->dir->ring_traffic_output += len;
		conn->last_output = ioloop_timeval;
		conn->peak_bytes_buffered =
			I_MAX(conn->peak_bytes_buffered,
			      o_stream_get_buffer_used_size(conn->output));
	}
}
2589
2590 static void
director_connection_ping_idle_timeout(struct director_connection * conn)2591 director_connection_ping_idle_timeout(struct director_connection *conn)
2592 {
2593 string_t *str = t_str_new(128);
2594 int diff = timeval_diff_msecs(&ioloop_timeval, &conn->ping_sent_time);
2595
2596 str_printfa(str, "Ping timed out in %u.%03u secs: ",
2597 diff/1000, diff%1000);
2598 director_ping_append_extra(conn, str, 0, (uintmax_t)-1);
2599 director_connection_log_disconnect(conn, EINVAL, str_c(str));
2600 director_connection_disconnected(&conn, "Ping timeout");
2601 }
2602
director_connection_pong_timeout(struct director_connection * conn)2603 static void director_connection_pong_timeout(struct director_connection *conn)
2604 {
2605 int diff = timeval_diff_msecs(&ioloop_timeval, &conn->ping_sent_time);
2606 const char *errstr;
2607
2608 errstr = t_strdup_printf(
2609 "PONG reply not received in %u.%03u secs, "
2610 "although other input keeps coming",
2611 diff/1000, diff%1000);
2612 director_connection_log_disconnect(conn, EINVAL, errstr);
2613 director_connection_disconnected(&conn, "Pong timeout");
2614 }
2615
director_connection_ping(struct director_connection * conn)2616 void director_connection_ping(struct director_connection *conn)
2617 {
2618 if (conn->ping_waiting)
2619 return;
2620
2621 timeout_remove(&conn->to_ping);
2622 conn->to_ping = timeout_add(conn->dir->set->director_ping_idle_timeout*1000,
2623 director_connection_ping_idle_timeout, conn);
2624 conn->to_pong = timeout_add(conn->dir->set->director_ping_max_timeout*1000,
2625 director_connection_pong_timeout, conn);
2626 conn->ping_waiting = TRUE;
2627 conn->ping_sent_time = ioloop_timeval;
2628 conn->ping_sent_buffer_size = o_stream_get_buffer_used_size(conn->output);
2629 conn->ping_sent_input_offset = conn->input->v_offset;
2630 conn->ping_sent_output_offset = conn->output->offset;
2631
2632 struct rusage usage;
2633 if (getrusage(RUSAGE_SELF, &usage) == 0)
2634 conn->ping_sent_user_cpu = usage.ru_utime;
2635 else
2636 conn->ping_sent_user_cpu.tv_sec = (time_t)-1;
2637 /* send it after getting the buffer size */
2638 director_connection_send(conn, t_strdup_printf(
2639 "PING\t%"PRIdTIME_T"\t%zu\n", ioloop_time,
2640 conn->ping_sent_buffer_size));
2641 }
2642
director_connection_get_name(struct director_connection * conn)2643 const char *director_connection_get_name(struct director_connection *conn)
2644 {
2645 return conn->name;
2646 }
2647
2648 struct director_host *
director_connection_get_host(struct director_connection * conn)2649 director_connection_get_host(struct director_connection *conn)
2650 {
2651 return conn->host;
2652 }
2653
director_connection_is_handshaked(struct director_connection * conn)2654 bool director_connection_is_handshaked(struct director_connection *conn)
2655 {
2656 return conn->handshake_received;
2657 }
2658
director_connection_is_synced(struct director_connection * conn)2659 bool director_connection_is_synced(struct director_connection *conn)
2660 {
2661 return conn->synced;
2662 }
2663
director_connection_is_incoming(struct director_connection * conn)2664 bool director_connection_is_incoming(struct director_connection *conn)
2665 {
2666 return conn->in;
2667 }
2668
2669 unsigned int
director_connection_get_minor_version(struct director_connection * conn)2670 director_connection_get_minor_version(struct director_connection *conn)
2671 {
2672 return conn->minor_version;
2673 }
2674
director_connection_cork(struct director_connection * conn)2675 void director_connection_cork(struct director_connection *conn)
2676 {
2677 o_stream_cork(conn->output);
2678 }
2679
director_connection_uncork(struct director_connection * conn)2680 void director_connection_uncork(struct director_connection *conn)
2681 {
2682 o_stream_uncork(conn->output);
2683 }
2684
director_connection_set_synced(struct director_connection * conn,bool synced)2685 void director_connection_set_synced(struct director_connection *conn,
2686 bool synced)
2687 {
2688 if (conn->synced == synced)
2689 return;
2690 conn->synced = synced;
2691
2692 /* switch ping timeout, unless we're already waiting for PONG */
2693 if (conn->ping_waiting)
2694 return;
2695
2696 director_connection_set_ping_timeout(conn);
2697 }
2698
director_connection_get_status(struct director_connection * conn,struct director_connection_status * status_r)2699 void director_connection_get_status(struct director_connection *conn,
2700 struct director_connection_status *status_r)
2701 {
2702 i_zero(status_r);
2703 status_r->bytes_read = conn->input->v_offset;
2704 status_r->bytes_sent = conn->output->offset;
2705 status_r->bytes_buffered = o_stream_get_buffer_used_size(conn->output);
2706 status_r->peak_bytes_buffered = conn->peak_bytes_buffered;
2707 status_r->last_input = conn->last_input;
2708 status_r->last_output = conn->last_output;
2709 status_r->last_ping_msecs = conn->last_ping_msecs;
2710 status_r->handshake_users_sent = conn->handshake_users_sent;
2711 status_r->handshake_users_received = conn->handshake_users_received;
2712 }
2713