1 /* Copyright (c) 2009-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "lib.h"
4 #include "ioloop.h"
5 #include "net.h"
6 #include "istream.h"
7 #include "ostream.h"
8 #include "array.h"
9 #include "aqueue.h"
10 #include "anvil-client.h"
11 
/* One query that has been queued by anvil_client_query() and is waiting for
   its reply line (or for cancellation). */
struct anvil_query {
	/* Called with the reply line, or with NULL when the query is
	   cancelled. anvil_client_query_abort() sets this to NULL so that
	   no callback is made for the query anymore. */
	anvil_callback_t *callback;
	/* Passed unchanged as the second argument to callback. */
	void *context;
};
16 
/* State of one client connection to an anvil UNIX socket. */
struct anvil_client {
	/* Socket path; a private copy made in anvil_client_init(). */
	char *path;
	/* Connected socket, or -1 while there is no connection. */
	int fd;
	/* Streams and the read handler that anvil_client_connect() creates
	   for fd. They are destroyed again in anvil_client_disconnect(). */
	struct istream *input;
	struct ostream *output;
	struct io *io;
	/* Either the ANVIL_QUERY_TIMEOUT_MSECS timeout for pending queries,
	   or a zero-delay timeout that fails the queued queries after a
	   send failure. NULL when neither is active. */
	struct timeout *to_query;

	/* Delayed reconnect attempt scheduled by anvil_reconnect(). */
	struct timeout *to_reconnect;
	/* ioloop_time of the latest reconnect that was attempted right
	   away by anvil_reconnect(). */
	time_t last_reconnect;

	/* Queries waiting for a reply. queries is the queue that was
	   created on top of queries_arr in anvil_client_init(). */
	ARRAY(struct anvil_query *) queries_arr;
	struct aqueue *queries;

	/* Optional. When set and it returns FALSE, anvil_reconnect() does
	   not try to reconnect. */
	bool (*reconnect_callback)(void);
	/* Flags given to anvil_client_init(); ANVIL_CLIENT_FLAG_HIDE_ENOENT
	   is checked when connecting fails. */
	enum anvil_client_flags flags;
};
34 
/* Line written to the socket immediately after a successful connect. */
#define ANVIL_HANDSHAKE "VERSION\tanvil\t1\t0\n"
/* Size argument given to i_stream_create_fd() for the socket's input. */
#define ANVIL_INBUF_SIZE 1024
/* anvil_reconnect() reconnects immediately only if at least this many
   seconds have passed since last_reconnect; otherwise it schedules a
   delayed attempt. */
#define ANVIL_RECONNECT_MIN_SECS 5
/* Delay of the to_query timeout that runs anvil_client_timeout(). */
#define ANVIL_QUERY_TIMEOUT_MSECS (1000*5)

/* Forward declaration: used by anvil_client_deinit() and anvil_reconnect()
   before the definition appears. */
static void anvil_client_disconnect(struct anvil_client *client);
41 
42 struct anvil_client *
anvil_client_init(const char * path,bool (* reconnect_callback)(void),enum anvil_client_flags flags)43 anvil_client_init(const char *path, bool (*reconnect_callback)(void),
44 		  enum anvil_client_flags flags)
45 {
46 	struct anvil_client *client;
47 
48 	client = i_new(struct anvil_client, 1);
49 	client->path = i_strdup(path);
50 	client->reconnect_callback = reconnect_callback;
51 	client->flags = flags;
52 	client->fd = -1;
53 	i_array_init(&client->queries_arr, 32);
54 	client->queries = aqueue_init(&client->queries_arr.arr);
55 	return client;
56 }
57 
anvil_client_deinit(struct anvil_client ** _client)58 void anvil_client_deinit(struct anvil_client **_client)
59 {
60 	struct anvil_client *client = *_client;
61 
62 	*_client = NULL;
63 
64 	anvil_client_disconnect(client);
65 	array_free(&client->queries_arr);
66 	aqueue_deinit(&client->queries);
67 	i_free(client->path);
68 	i_assert(client->to_reconnect == NULL);
69 	i_free(client);
70 }
71 
anvil_reconnect(struct anvil_client * client)72 static void anvil_reconnect(struct anvil_client *client)
73 {
74 	anvil_client_disconnect(client);
75 	if (client->reconnect_callback != NULL) {
76 		if (!client->reconnect_callback()) {
77 			/* no reconnection */
78 			return;
79 		}
80 	}
81 
82 	if (ioloop_time - client->last_reconnect < ANVIL_RECONNECT_MIN_SECS) {
83 		if (client->to_reconnect == NULL) {
84 			client->to_reconnect =
85 				timeout_add(ANVIL_RECONNECT_MIN_SECS*1000,
86 					    anvil_reconnect, client);
87 		}
88 	} else {
89 		client->last_reconnect = ioloop_time;
90 		(void)anvil_client_connect(client, FALSE);
91 	}
92 }
93 
anvil_input(struct anvil_client * client)94 static void anvil_input(struct anvil_client *client)
95 {
96 	struct anvil_query *const *queries;
97 	struct anvil_query *query;
98 	const char *line;
99 	unsigned int count;
100 
101 	queries = array_get(&client->queries_arr, &count);
102 	while ((line = i_stream_read_next_line(client->input)) != NULL) {
103 		if (aqueue_count(client->queries) == 0) {
104 			i_error("anvil: Unexpected input: %s", line);
105 			continue;
106 		}
107 
108 		query = queries[aqueue_idx(client->queries, 0)];
109 		if (query->callback != NULL) T_BEGIN {
110 			query->callback(line, query->context);
111 		} T_END;
112 		i_free(query);
113 		aqueue_delete_tail(client->queries);
114 	}
115 	if (client->input->stream_errno != 0) {
116 		i_error("read(%s) failed: %s", client->path,
117 			i_stream_get_error(client->input));
118 		anvil_reconnect(client);
119 	} else if (client->input->eof) {
120 		i_error("read(%s) failed: EOF", client->path);
121 		anvil_reconnect(client);
122 	} else if (client->to_query != NULL) {
123 		if (aqueue_count(client->queries) == 0)
124 			timeout_remove(&client->to_query);
125 		else
126 			timeout_reset(client->to_query);
127 	}
128 }
129 
/* Connect to the anvil socket at client->path and send the handshake line.
   The client must not be connected already (asserted).

   retry selects net_connect_unix_with_retries() (called with 5000) instead
   of a plain net_connect_unix().

   Returns 0 on success and -1 on failure. A connect failure is logged,
   except for ENOENT when ANVIL_CLIENT_FLAG_HIDE_ENOENT is set. A handshake
   write failure is logged and followed by anvil_reconnect(). */
int anvil_client_connect(struct anvil_client *client, bool retry)
{
	int fd;

	i_assert(client->fd == -1);

	if (retry)
		fd = net_connect_unix_with_retries(client->path, 5000);
	else
		fd = net_connect_unix(client->path);

	if (fd == -1) {
		bool hide_enoent =
			(client->flags & ANVIL_CLIENT_FLAG_HIDE_ENOENT) != 0;

		if (!(errno == ENOENT && hide_enoent)) {
			i_error("net_connect_unix(%s) failed: %m",
				client->path);
		}
		return -1;
	}

	/* connected - a pending delayed reconnect is no longer needed */
	timeout_remove(&client->to_reconnect);

	client->fd = fd;
	client->input = i_stream_create_fd(fd, ANVIL_INBUF_SIZE);
	client->output = o_stream_create_fd(fd, SIZE_MAX);
	client->io = io_add(fd, IO_READ, anvil_input, client);

	if (o_stream_send_str(client->output, ANVIL_HANDSHAKE) >= 0)
		return 0;

	i_error("write(%s) failed: %s", client->path,
		o_stream_get_error(client->output));
	anvil_reconnect(client);
	return -1;
}
161 
anvil_client_cancel_queries(struct anvil_client * client)162 static void anvil_client_cancel_queries(struct anvil_client *client)
163 {
164 	struct anvil_query *const *queries, *query;
165 	unsigned int count;
166 
167 	queries = array_get(&client->queries_arr, &count);
168 	while (aqueue_count(client->queries) > 0) {
169 		query = queries[aqueue_idx(client->queries, 0)];
170 		if (query->callback != NULL)
171 			query->callback(NULL, query->context);
172 		i_free(query);
173 		aqueue_delete_tail(client->queries);
174 	}
175 	timeout_remove(&client->to_query);
176 }
177 
/* Cancel all pending queries and close the connection if there is one.
   Also removes a scheduled reconnect timeout. Can be called when the client
   is not connected. */
static void anvil_client_disconnect(struct anvil_client *client)
{
	/* runs the query callbacks with a NULL reply before the connection
	   is torn down */
	anvil_client_cancel_queries(client);
	if (client->fd != -1) {
		io_remove(&client->io);
		i_stream_destroy(&client->input);
		o_stream_destroy(&client->output);
		net_disconnect(client->fd);
		/* mark as disconnected */
		client->fd = -1;
	}
	timeout_remove(&client->to_reconnect);
}
190 
anvil_client_timeout(struct anvil_client * client)191 static void anvil_client_timeout(struct anvil_client *client)
192 {
193 	i_assert(aqueue_count(client->queries) > 0);
194 
195 	i_error("%s: Anvil queries timed out after %u secs - aborting queries",
196 		client->path, ANVIL_QUERY_TIMEOUT_MSECS/1000);
197 	/* perhaps reconnect helps */
198 	anvil_reconnect(client);
199 }
200 
anvil_client_send(struct anvil_client * client,const char * cmd)201 static int anvil_client_send(struct anvil_client *client, const char *cmd)
202 {
203 	struct const_iovec iov[2];
204 
205 	if (client->fd == -1) {
206 		if (anvil_client_connect(client, FALSE) < 0)
207 			return -1;
208 	}
209 
210 	iov[0].iov_base = cmd;
211 	iov[0].iov_len = strlen(cmd);
212 	iov[1].iov_base = "\n";
213 	iov[1].iov_len = 1;
214 	if (o_stream_sendv(client->output, iov, 2) < 0) {
215 		i_error("write(%s) failed: %s", client->path,
216 			o_stream_get_error(client->output));
217 		anvil_reconnect(client);
218 		return -1;
219 	}
220 	return 0;
221 }
222 
223 struct anvil_query *
anvil_client_query(struct anvil_client * client,const char * query,anvil_callback_t * callback,void * context)224 anvil_client_query(struct anvil_client *client, const char *query,
225 		   anvil_callback_t *callback, void *context)
226 {
227 	struct anvil_query *anvil_query;
228 
229 	anvil_query = i_new(struct anvil_query, 1);
230 	anvil_query->callback = callback;
231 	anvil_query->context = context;
232 	aqueue_append(client->queries, &anvil_query);
233 	if (anvil_client_send(client, query) < 0) {
234 		/* connection failure. add a delayed failure callback.
235 		   the caller may not expect the callback to be called
236 		   immediately. */
237 		timeout_remove(&client->to_query);
238 		client->to_query =
239 			timeout_add_short(0, anvil_client_cancel_queries, client);
240 	} else if (client->to_query == NULL) {
241 		client->to_query = timeout_add(ANVIL_QUERY_TIMEOUT_MSECS,
242 					       anvil_client_timeout, client);
243 	}
244 	return anvil_query;
245 }
246 
anvil_client_query_abort(struct anvil_client * client,struct anvil_query ** _query)247 void anvil_client_query_abort(struct anvil_client *client,
248 			      struct anvil_query **_query)
249 {
250 	struct anvil_query *query = *_query;
251 	struct anvil_query *const *queries;
252 	unsigned int i, count;
253 
254 	*_query = NULL;
255 
256 	count = aqueue_count(client->queries);
257 	queries = array_front(&client->queries_arr);
258 	for (i = 0; i < count; i++) {
259 		if (queries[aqueue_idx(client->queries, i)] == query) {
260 			query->callback = NULL;
261 			return;
262 		}
263 	}
264 	i_panic("anvil query to be aborted doesn't exist");
265 }
266 
/* Send cmd to anvil without queueing anything for a reply. Connects first
   if necessary. A failure to send is not reported to the caller. */
void anvil_client_cmd(struct anvil_client *client, const char *cmd)
{
	if (anvil_client_send(client, cmd) < 0) {
		/* fire-and-forget: the result is intentionally ignored */
	}
}
271 
anvil_client_is_connected(struct anvil_client * client)272 bool anvil_client_is_connected(struct anvil_client *client)
273 {
274 	return client->fd != -1;
275 }
276