1 /* Copyright (c) 2009-2018 Dovecot authors, see the included COPYING file */
2
3 #include "lib.h"
4 #include "ioloop.h"
5 #include "net.h"
6 #include "istream.h"
7 #include "ostream.h"
8 #include "array.h"
9 #include "aqueue.h"
10 #include "anvil-client.h"
11
12 struct anvil_query {
13 anvil_callback_t *callback;
14 void *context;
15 };
16
17 struct anvil_client {
18 char *path;
19 int fd;
20 struct istream *input;
21 struct ostream *output;
22 struct io *io;
23 struct timeout *to_query;
24
25 struct timeout *to_reconnect;
26 time_t last_reconnect;
27
28 ARRAY(struct anvil_query *) queries_arr;
29 struct aqueue *queries;
30
31 bool (*reconnect_callback)(void);
32 enum anvil_client_flags flags;
33 };
34
/* Line written to the socket right after connecting. */
#define ANVIL_HANDSHAKE "VERSION\tanvil\t1\t0\n"
/* Size given to i_stream_create_fd() for the connection's input stream. */
#define ANVIL_INBUF_SIZE 1024
/* Reconnects are attempted at most once per this many seconds. */
#define ANVIL_RECONNECT_MIN_SECS 5
/* Pending queries are aborted when no input arrives within this time. */
#define ANVIL_QUERY_TIMEOUT_MSECS (5*1000)

static void anvil_client_disconnect(struct anvil_client *client);
41
42 struct anvil_client *
anvil_client_init(const char * path,bool (* reconnect_callback)(void),enum anvil_client_flags flags)43 anvil_client_init(const char *path, bool (*reconnect_callback)(void),
44 enum anvil_client_flags flags)
45 {
46 struct anvil_client *client;
47
48 client = i_new(struct anvil_client, 1);
49 client->path = i_strdup(path);
50 client->reconnect_callback = reconnect_callback;
51 client->flags = flags;
52 client->fd = -1;
53 i_array_init(&client->queries_arr, 32);
54 client->queries = aqueue_init(&client->queries_arr.arr);
55 return client;
56 }
57
anvil_client_deinit(struct anvil_client ** _client)58 void anvil_client_deinit(struct anvil_client **_client)
59 {
60 struct anvil_client *client = *_client;
61
62 *_client = NULL;
63
64 anvil_client_disconnect(client);
65 array_free(&client->queries_arr);
66 aqueue_deinit(&client->queries);
67 i_free(client->path);
68 i_assert(client->to_reconnect == NULL);
69 i_free(client);
70 }
71
anvil_reconnect(struct anvil_client * client)72 static void anvil_reconnect(struct anvil_client *client)
73 {
74 anvil_client_disconnect(client);
75 if (client->reconnect_callback != NULL) {
76 if (!client->reconnect_callback()) {
77 /* no reconnection */
78 return;
79 }
80 }
81
82 if (ioloop_time - client->last_reconnect < ANVIL_RECONNECT_MIN_SECS) {
83 if (client->to_reconnect == NULL) {
84 client->to_reconnect =
85 timeout_add(ANVIL_RECONNECT_MIN_SECS*1000,
86 anvil_reconnect, client);
87 }
88 } else {
89 client->last_reconnect = ioloop_time;
90 (void)anvil_client_connect(client, FALSE);
91 }
92 }
93
anvil_input(struct anvil_client * client)94 static void anvil_input(struct anvil_client *client)
95 {
96 struct anvil_query *const *queries;
97 struct anvil_query *query;
98 const char *line;
99 unsigned int count;
100
101 queries = array_get(&client->queries_arr, &count);
102 while ((line = i_stream_read_next_line(client->input)) != NULL) {
103 if (aqueue_count(client->queries) == 0) {
104 i_error("anvil: Unexpected input: %s", line);
105 continue;
106 }
107
108 query = queries[aqueue_idx(client->queries, 0)];
109 if (query->callback != NULL) T_BEGIN {
110 query->callback(line, query->context);
111 } T_END;
112 i_free(query);
113 aqueue_delete_tail(client->queries);
114 }
115 if (client->input->stream_errno != 0) {
116 i_error("read(%s) failed: %s", client->path,
117 i_stream_get_error(client->input));
118 anvil_reconnect(client);
119 } else if (client->input->eof) {
120 i_error("read(%s) failed: EOF", client->path);
121 anvil_reconnect(client);
122 } else if (client->to_query != NULL) {
123 if (aqueue_count(client->queries) == 0)
124 timeout_remove(&client->to_query);
125 else
126 timeout_reset(client->to_query);
127 }
128 }
129
anvil_client_connect(struct anvil_client * client,bool retry)130 int anvil_client_connect(struct anvil_client *client, bool retry)
131 {
132 int fd;
133
134 i_assert(client->fd == -1);
135
136 fd = retry ? net_connect_unix_with_retries(client->path, 5000) :
137 net_connect_unix(client->path);
138 if (fd == -1) {
139 if (errno != ENOENT ||
140 (client->flags & ANVIL_CLIENT_FLAG_HIDE_ENOENT) == 0) {
141 i_error("net_connect_unix(%s) failed: %m",
142 client->path);
143 }
144 return -1;
145 }
146
147 timeout_remove(&client->to_reconnect);
148
149 client->fd = fd;
150 client->input = i_stream_create_fd(fd, ANVIL_INBUF_SIZE);
151 client->output = o_stream_create_fd(fd, SIZE_MAX);
152 client->io = io_add(fd, IO_READ, anvil_input, client);
153 if (o_stream_send_str(client->output, ANVIL_HANDSHAKE) < 0) {
154 i_error("write(%s) failed: %s", client->path,
155 o_stream_get_error(client->output));
156 anvil_reconnect(client);
157 return -1;
158 }
159 return 0;
160 }
161
anvil_client_cancel_queries(struct anvil_client * client)162 static void anvil_client_cancel_queries(struct anvil_client *client)
163 {
164 struct anvil_query *const *queries, *query;
165 unsigned int count;
166
167 queries = array_get(&client->queries_arr, &count);
168 while (aqueue_count(client->queries) > 0) {
169 query = queries[aqueue_idx(client->queries, 0)];
170 if (query->callback != NULL)
171 query->callback(NULL, query->context);
172 i_free(query);
173 aqueue_delete_tail(client->queries);
174 }
175 timeout_remove(&client->to_query);
176 }
177
anvil_client_disconnect(struct anvil_client * client)178 static void anvil_client_disconnect(struct anvil_client *client)
179 {
180 anvil_client_cancel_queries(client);
181 if (client->fd != -1) {
182 io_remove(&client->io);
183 i_stream_destroy(&client->input);
184 o_stream_destroy(&client->output);
185 net_disconnect(client->fd);
186 client->fd = -1;
187 }
188 timeout_remove(&client->to_reconnect);
189 }
190
anvil_client_timeout(struct anvil_client * client)191 static void anvil_client_timeout(struct anvil_client *client)
192 {
193 i_assert(aqueue_count(client->queries) > 0);
194
195 i_error("%s: Anvil queries timed out after %u secs - aborting queries",
196 client->path, ANVIL_QUERY_TIMEOUT_MSECS/1000);
197 /* perhaps reconnect helps */
198 anvil_reconnect(client);
199 }
200
anvil_client_send(struct anvil_client * client,const char * cmd)201 static int anvil_client_send(struct anvil_client *client, const char *cmd)
202 {
203 struct const_iovec iov[2];
204
205 if (client->fd == -1) {
206 if (anvil_client_connect(client, FALSE) < 0)
207 return -1;
208 }
209
210 iov[0].iov_base = cmd;
211 iov[0].iov_len = strlen(cmd);
212 iov[1].iov_base = "\n";
213 iov[1].iov_len = 1;
214 if (o_stream_sendv(client->output, iov, 2) < 0) {
215 i_error("write(%s) failed: %s", client->path,
216 o_stream_get_error(client->output));
217 anvil_reconnect(client);
218 return -1;
219 }
220 return 0;
221 }
222
223 struct anvil_query *
anvil_client_query(struct anvil_client * client,const char * query,anvil_callback_t * callback,void * context)224 anvil_client_query(struct anvil_client *client, const char *query,
225 anvil_callback_t *callback, void *context)
226 {
227 struct anvil_query *anvil_query;
228
229 anvil_query = i_new(struct anvil_query, 1);
230 anvil_query->callback = callback;
231 anvil_query->context = context;
232 aqueue_append(client->queries, &anvil_query);
233 if (anvil_client_send(client, query) < 0) {
234 /* connection failure. add a delayed failure callback.
235 the caller may not expect the callback to be called
236 immediately. */
237 timeout_remove(&client->to_query);
238 client->to_query =
239 timeout_add_short(0, anvil_client_cancel_queries, client);
240 } else if (client->to_query == NULL) {
241 client->to_query = timeout_add(ANVIL_QUERY_TIMEOUT_MSECS,
242 anvil_client_timeout, client);
243 }
244 return anvil_query;
245 }
246
anvil_client_query_abort(struct anvil_client * client,struct anvil_query ** _query)247 void anvil_client_query_abort(struct anvil_client *client,
248 struct anvil_query **_query)
249 {
250 struct anvil_query *query = *_query;
251 struct anvil_query *const *queries;
252 unsigned int i, count;
253
254 *_query = NULL;
255
256 count = aqueue_count(client->queries);
257 queries = array_front(&client->queries_arr);
258 for (i = 0; i < count; i++) {
259 if (queries[aqueue_idx(client->queries, i)] == query) {
260 query->callback = NULL;
261 return;
262 }
263 }
264 i_panic("anvil query to be aborted doesn't exist");
265 }
266
/* Send cmd to anvil without expecting a reply. A failure to connect or
   write is deliberately not reported to the caller. */
void anvil_client_cmd(struct anvil_client *client, const char *cmd)
{
	/* best effort: the result is intentionally discarded */
	(void)anvil_client_send(client, cmd);
}
271
anvil_client_is_connected(struct anvil_client * client)272 bool anvil_client_is_connected(struct anvil_client *client)
273 {
274 return client->fd != -1;
275 }
276