1 /* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */
2
3 #include "lib.h"
4 #include "array.h"
5 #include "ioloop.h"
6 #include "dsync-client.h"
7 #include "replicator-settings.h"
8 #include "replicator-queue.h"
9 #include "replicator-brain.h"
10
/* Context passed to dsync_callback() for a single started sync. It is
   allocated in dsync_replicate() and freed at the end of dsync_callback(). */
struct replicator_sync_context {
	/* brain that started the sync */
	struct replicator_brain *brain;
	/* user being synced. dsync_replicate() takes a reference to it,
	   which dsync_callback() drops again. */
	struct replicator_user *user;
};
15
16 struct replicator_brain {
17 pool_t pool;
18 struct replicator_queue *queue;
19 const struct replicator_settings *set;
20 struct timeout *to;
21
22 ARRAY_TYPE(dsync_client) dsync_clients;
23
24 bool deinitializing:1;
25 };
26
/* Forward declaration: the queue-change callback, the dsync callback and the
   timeout handler all call this before its definition at the end of the
   file. */
static void replicator_brain_fill(struct replicator_brain *brain);
28
/* Queue change callback (registered in replicator_brain_init()). The context
   is the struct replicator_brain that was given at registration time. */
static void replicator_brain_queue_changed(void *context)
{
	/* void * converts implicitly to struct replicator_brain * */
	replicator_brain_fill(context);
}
35
36 struct replicator_brain *
replicator_brain_init(struct replicator_queue * queue,const struct replicator_settings * set)37 replicator_brain_init(struct replicator_queue *queue,
38 const struct replicator_settings *set)
39 {
40 struct replicator_brain *brain;
41 pool_t pool;
42
43 pool = pool_alloconly_create("replication brain", 1024);
44 brain = p_new(pool, struct replicator_brain, 1);
45 brain->pool = pool;
46 brain->queue = queue;
47 brain->set = set;
48 p_array_init(&brain->dsync_clients, pool, 16);
49 replicator_queue_set_change_callback(queue,
50 replicator_brain_queue_changed, brain);
51 replicator_brain_fill(brain);
52 return brain;
53 }
54
replicator_brain_deinit(struct replicator_brain ** _brain)55 void replicator_brain_deinit(struct replicator_brain **_brain)
56 {
57 struct replicator_brain *brain = *_brain;
58 struct dsync_client *conn;
59
60 *_brain = NULL;
61
62 brain->deinitializing = TRUE;
63 array_foreach_elem(&brain->dsync_clients, conn)
64 dsync_client_deinit(&conn);
65 timeout_remove(&brain->to);
66 pool_unref(&brain->pool);
67 }
68
69 struct replicator_queue *
replicator_brain_get_queue(struct replicator_brain * brain)70 replicator_brain_get_queue(struct replicator_brain *brain)
71 {
72 return brain->queue;
73 }
74
75 const struct replicator_settings *
replicator_brain_get_settings(struct replicator_brain * brain)76 replicator_brain_get_settings(struct replicator_brain *brain)
77 {
78 return brain->set;
79 }
80
ARRAY_TYPE(dsync_client)81 const ARRAY_TYPE(dsync_client) *
82 replicator_brain_get_dsync_clients(struct replicator_brain *brain)
83 {
84 return &brain->dsync_clients;
85 }
86
87 static struct dsync_client *
get_dsync_client(struct replicator_brain * brain)88 get_dsync_client(struct replicator_brain *brain)
89 {
90 struct dsync_client *conn;
91
92 array_foreach_elem(&brain->dsync_clients, conn) {
93 if (!dsync_client_is_busy(conn))
94 return conn;
95 }
96 if (array_count(&brain->dsync_clients) ==
97 brain->set->replication_max_conns)
98 return NULL;
99
100 conn = dsync_client_init(brain->set->doveadm_socket_path,
101 brain->set->replication_dsync_parameters);
102 array_push_back(&brain->dsync_clients, &conn);
103 return conn;
104 }
105
dsync_callback(enum dsync_reply reply,const char * state,void * context)106 static void dsync_callback(enum dsync_reply reply, const char *state,
107 void *context)
108 {
109 struct replicator_sync_context *ctx = context;
110 struct replicator_user *user = ctx->user;
111
112 if (!replicator_user_unref(&user)) {
113 /* user was already removed */
114 } else if (reply == DSYNC_REPLY_NOUSER ||
115 reply == DSYNC_REPLY_NOREPLICATE) {
116 /* user no longer exists, or is not wanted for replication,
117 remove from replication */
118 replicator_queue_remove(ctx->brain->queue, &ctx->user);
119 } else {
120 i_free(ctx->user->state);
121 ctx->user->state = i_strdup_empty(state);
122 ctx->user->last_sync_failed = reply != DSYNC_REPLY_OK;
123 if (reply == DSYNC_REPLY_OK)
124 ctx->user->last_successful_sync = ioloop_time;
125 replicator_queue_push(ctx->brain->queue, ctx->user);
126 }
127 if (!ctx->brain->deinitializing)
128 replicator_brain_fill(ctx->brain);
129 i_free(ctx);
130 }
131
132 static bool
dsync_replicate(struct replicator_brain * brain,struct replicator_user * user)133 dsync_replicate(struct replicator_brain *brain, struct replicator_user *user)
134 {
135 struct replicator_sync_context *ctx;
136 struct dsync_client *conn;
137 time_t next_full_sync;
138 bool full;
139
140 conn = get_dsync_client(brain);
141 if (conn == NULL)
142 return FALSE;
143
144 next_full_sync = user->last_full_sync +
145 brain->set->replication_full_sync_interval;
146 full = next_full_sync <= ioloop_time;
147 /* update the sync times immediately. if the replication fails we still
148 wouldn't want it to be retried immediately. */
149 user->last_fast_sync = ioloop_time;
150 if (full || user->force_full_sync) {
151 user->last_full_sync = ioloop_time;
152 user->force_full_sync = FALSE;
153 }
154 /* reset priority also. if more updates arrive during replication
155 we'll do another replication to make sure nothing gets lost */
156 user->priority = REPLICATION_PRIORITY_NONE;
157
158 ctx = i_new(struct replicator_sync_context, 1);
159 ctx->brain = brain;
160 ctx->user = user;
161 replicator_user_ref(user);
162 dsync_client_sync(conn, user->username, user->state, full,
163 dsync_callback, ctx);
164 return TRUE;
165 }
166
replicator_brain_timeout(struct replicator_brain * brain)167 static void replicator_brain_timeout(struct replicator_brain *brain)
168 {
169 timeout_remove(&brain->to);
170 replicator_brain_fill(brain);
171 }
172
replicator_brain_fill_next(struct replicator_brain * brain)173 static bool replicator_brain_fill_next(struct replicator_brain *brain)
174 {
175 struct replicator_user *user;
176 unsigned int next_secs;
177
178 user = replicator_queue_pop(brain->queue, &next_secs);
179 if (user == NULL) {
180 /* nothing more to do */
181 timeout_remove(&brain->to);
182 brain->to = timeout_add(next_secs * 1000,
183 replicator_brain_timeout, brain);
184 return FALSE;
185 }
186
187 if (!dsync_replicate(brain, user)) {
188 /* all connections were full, put the user back to queue */
189 replicator_queue_push(brain->queue, user);
190 return FALSE;
191 }
192 /* replication started for the user */
193 return TRUE;
194 }
195
/* Start replications for queued users until replicator_brain_fill_next()
   reports that nothing more can be started. */
static void replicator_brain_fill(struct replicator_brain *brain)
{
	bool started;

	do {
		started = replicator_brain_fill_next(brain);
	} while (started);
}
200