1 /* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "lib.h"
4 #include "array.h"
5 #include "ioloop.h"
6 #include "dsync-client.h"
7 #include "replicator-settings.h"
8 #include "replicator-queue.h"
9 #include "replicator-brain.h"
10 
11 struct replicator_sync_context {
12 	struct replicator_brain *brain;
13 	struct replicator_user *user;
14 };
15 
16 struct replicator_brain {
17 	pool_t pool;
18 	struct replicator_queue *queue;
19 	const struct replicator_settings *set;
20 	struct timeout *to;
21 
22 	ARRAY_TYPE(dsync_client) dsync_clients;
23 
24 	bool deinitializing:1;
25 };
26 
27 static void replicator_brain_fill(struct replicator_brain *brain);
28 
replicator_brain_queue_changed(void * context)29 static void replicator_brain_queue_changed(void *context)
30 {
31 	struct replicator_brain *brain = context;
32 
33 	replicator_brain_fill(brain);
34 }
35 
36 struct replicator_brain *
replicator_brain_init(struct replicator_queue * queue,const struct replicator_settings * set)37 replicator_brain_init(struct replicator_queue *queue,
38 		      const struct replicator_settings *set)
39 {
40 	struct replicator_brain *brain;
41 	pool_t pool;
42 
43 	pool = pool_alloconly_create("replication brain", 1024);
44 	brain = p_new(pool, struct replicator_brain, 1);
45 	brain->pool = pool;
46 	brain->queue = queue;
47 	brain->set = set;
48 	p_array_init(&brain->dsync_clients, pool, 16);
49 	replicator_queue_set_change_callback(queue,
50 		replicator_brain_queue_changed, brain);
51 	replicator_brain_fill(brain);
52 	return brain;
53 }
54 
replicator_brain_deinit(struct replicator_brain ** _brain)55 void replicator_brain_deinit(struct replicator_brain **_brain)
56 {
57 	struct replicator_brain *brain = *_brain;
58 	struct dsync_client *conn;
59 
60 	*_brain = NULL;
61 
62 	brain->deinitializing = TRUE;
63 	array_foreach_elem(&brain->dsync_clients, conn)
64 		dsync_client_deinit(&conn);
65 	timeout_remove(&brain->to);
66 	pool_unref(&brain->pool);
67 }
68 
69 struct replicator_queue *
replicator_brain_get_queue(struct replicator_brain * brain)70 replicator_brain_get_queue(struct replicator_brain *brain)
71 {
72 	return brain->queue;
73 }
74 
75 const struct replicator_settings *
replicator_brain_get_settings(struct replicator_brain * brain)76 replicator_brain_get_settings(struct replicator_brain *brain)
77 {
78 	return brain->set;
79 }
80 
ARRAY_TYPE(dsync_client)81 const ARRAY_TYPE(dsync_client) *
82 replicator_brain_get_dsync_clients(struct replicator_brain *brain)
83 {
84 	return &brain->dsync_clients;
85 }
86 
87 static struct dsync_client *
get_dsync_client(struct replicator_brain * brain)88 get_dsync_client(struct replicator_brain *brain)
89 {
90 	struct dsync_client *conn;
91 
92 	array_foreach_elem(&brain->dsync_clients, conn) {
93 		if (!dsync_client_is_busy(conn))
94 			return conn;
95 	}
96 	if (array_count(&brain->dsync_clients) ==
97 	    brain->set->replication_max_conns)
98 		return NULL;
99 
100 	conn = dsync_client_init(brain->set->doveadm_socket_path,
101 				 brain->set->replication_dsync_parameters);
102 	array_push_back(&brain->dsync_clients, &conn);
103 	return conn;
104 }
105 
dsync_callback(enum dsync_reply reply,const char * state,void * context)106 static void dsync_callback(enum dsync_reply reply, const char *state,
107 			   void *context)
108 {
109 	struct replicator_sync_context *ctx = context;
110 	struct replicator_user *user = ctx->user;
111 
112 	if (!replicator_user_unref(&user)) {
113 		/* user was already removed */
114 	} else if (reply == DSYNC_REPLY_NOUSER ||
115 		   reply == DSYNC_REPLY_NOREPLICATE) {
116 		/* user no longer exists, or is not wanted for replication,
117 		   remove from replication */
118 		replicator_queue_remove(ctx->brain->queue, &ctx->user);
119 	} else {
120 		i_free(ctx->user->state);
121 		ctx->user->state = i_strdup_empty(state);
122 		ctx->user->last_sync_failed = reply != DSYNC_REPLY_OK;
123 		if (reply == DSYNC_REPLY_OK)
124 			ctx->user->last_successful_sync = ioloop_time;
125 		replicator_queue_push(ctx->brain->queue, ctx->user);
126 	}
127 	if (!ctx->brain->deinitializing)
128 		replicator_brain_fill(ctx->brain);
129 	i_free(ctx);
130 }
131 
132 static bool
dsync_replicate(struct replicator_brain * brain,struct replicator_user * user)133 dsync_replicate(struct replicator_brain *brain, struct replicator_user *user)
134 {
135 	struct replicator_sync_context *ctx;
136 	struct dsync_client *conn;
137 	time_t next_full_sync;
138 	bool full;
139 
140 	conn = get_dsync_client(brain);
141 	if (conn == NULL)
142 		return FALSE;
143 
144 	next_full_sync = user->last_full_sync +
145 		brain->set->replication_full_sync_interval;
146 	full = next_full_sync <= ioloop_time;
147 	/* update the sync times immediately. if the replication fails we still
148 	   wouldn't want it to be retried immediately. */
149 	user->last_fast_sync = ioloop_time;
150 	if (full || user->force_full_sync) {
151 		user->last_full_sync = ioloop_time;
152 		user->force_full_sync = FALSE;
153 	}
154 	/* reset priority also. if more updates arrive during replication
155 	   we'll do another replication to make sure nothing gets lost */
156 	user->priority = REPLICATION_PRIORITY_NONE;
157 
158 	ctx = i_new(struct replicator_sync_context, 1);
159 	ctx->brain = brain;
160 	ctx->user = user;
161 	replicator_user_ref(user);
162 	dsync_client_sync(conn, user->username, user->state, full,
163 			  dsync_callback, ctx);
164 	return TRUE;
165 }
166 
replicator_brain_timeout(struct replicator_brain * brain)167 static void replicator_brain_timeout(struct replicator_brain *brain)
168 {
169 	timeout_remove(&brain->to);
170 	replicator_brain_fill(brain);
171 }
172 
replicator_brain_fill_next(struct replicator_brain * brain)173 static bool replicator_brain_fill_next(struct replicator_brain *brain)
174 {
175 	struct replicator_user *user;
176 	unsigned int next_secs;
177 
178 	user = replicator_queue_pop(brain->queue, &next_secs);
179 	if (user == NULL) {
180 		/* nothing more to do */
181 		timeout_remove(&brain->to);
182 		brain->to = timeout_add(next_secs * 1000,
183 					replicator_brain_timeout, brain);
184 		return FALSE;
185 	}
186 
187 	if (!dsync_replicate(brain, user)) {
188 		/* all connections were full, put the user back to queue */
189 		replicator_queue_push(brain->queue, user);
190 		return FALSE;
191 	}
192 	/* replication started for the user */
193 	return TRUE;
194 }
195 
replicator_brain_fill(struct replicator_brain * brain)196 static void replicator_brain_fill(struct replicator_brain *brain)
197 {
198 	while (replicator_brain_fill_next(brain)) ;
199 }
200