1 /* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "lib.h"
4 #include "llist.h"
5 #include "istream.h"
6 #include "ostream.h"
7 #include "strescape.h"
8 #include "master-service.h"
9 #include "replicator-queue.h"
10 #include "notify-connection.h"
11 
12 #include <unistd.h>
13 
14 #define MAX_INBUF_SIZE (1024*64)
15 #define NOTIFY_CLIENT_PROTOCOL_MAJOR_VERSION 1
16 #define NOTIFY_CLIENT_PROTOCOL_MINOR_VERSION 0
17 
18 struct notify_connection {
19 	struct notify_connection *prev, *next;
20 	int refcount;
21 
22 	int fd;
23 	struct io *io;
24 	struct istream *input;
25 	struct ostream *output;
26 
27 	struct replicator_queue *queue;
28 
29 	bool version_received:1;
30 	bool destroyed:1;
31 };
32 
33 struct notify_sync_request {
34 	struct notify_connection *conn;
35 	unsigned int id;
36 };
37 
38 static struct notify_connection *connections;
39 
40 static void notify_connection_destroy(struct notify_connection *conn);
41 
notify_sync_callback(bool success,void * context)42 static void notify_sync_callback(bool success, void *context)
43 {
44 	struct notify_sync_request *request = context;
45 
46 	o_stream_nsend_str(request->conn->output, t_strdup_printf(
47 		"%c\t%u\n", success ? '+' : '-', request->id));
48 
49 	notify_connection_unref(&request->conn);
50 	i_free(request);
51 }
52 
53 static int
notify_connection_input_line(struct notify_connection * conn,const char * line)54 notify_connection_input_line(struct notify_connection *conn, const char *line)
55 {
56 	struct notify_sync_request *request;
57 	const char *const *args;
58 	enum replication_priority priority;
59 	unsigned int id;
60 
61 	/* U \t <username> \t <priority> [\t <sync id>] */
62 	args = t_strsplit_tabescaped(line);
63 	if (str_array_length(args) < 2) {
64 		i_error("notify client sent invalid input: %s", line);
65 		return -1;
66 	}
67 	if (strcmp(args[0], "U") != 0) {
68 		i_error("notify client sent unknown command: %s", args[0]);
69 		return -1;
70 	}
71 	if (replication_priority_parse(args[2], &priority) < 0) {
72 		i_error("notify client sent invalid priority: %s", args[2]);
73 		return -1;
74 	}
75 	if (priority != REPLICATION_PRIORITY_SYNC)
76 		(void)replicator_queue_add(conn->queue, args[1], priority);
77 	else if (args[3] == NULL || str_to_uint(args[3], &id) < 0) {
78 		i_error("notify client sent invalid sync id: %s", line);
79 		return -1;
80 	} else {
81 		request = i_new(struct notify_sync_request, 1);
82 		request->conn = conn;
83 		request->id = id;
84 		notify_connection_ref(conn);
85 		replicator_queue_add_sync(conn->queue, args[1],
86 					  notify_sync_callback, request);
87 	}
88 	return 0;
89 }
90 
notify_connection_input(struct notify_connection * conn)91 static void notify_connection_input(struct notify_connection *conn)
92 {
93 	const char *line;
94 	int ret;
95 
96 	switch (i_stream_read(conn->input)) {
97 	case -2:
98 		i_error("BUG: Client connection sent too much data");
99 		notify_connection_destroy(conn);
100 		return;
101 	case -1:
102 		notify_connection_destroy(conn);
103 		return;
104 	}
105 
106 	if (!conn->version_received) {
107 		if ((line = i_stream_next_line(conn->input)) == NULL)
108 			return;
109 
110 		if (!version_string_verify(line, "replicator-notify",
111 				NOTIFY_CLIENT_PROTOCOL_MAJOR_VERSION)) {
112 			i_error("Notify client not compatible with this server "
113 				"(mixed old and new binaries?)");
114 			notify_connection_destroy(conn);
115 			return;
116 		}
117 		conn->version_received = TRUE;
118 	}
119 
120 	while ((line = i_stream_next_line(conn->input)) != NULL) {
121 		T_BEGIN {
122 			ret = notify_connection_input_line(conn, line);
123 		} T_END;
124 		if (ret < 0) {
125 			notify_connection_destroy(conn);
126 			break;
127 		}
128 	}
129 }
130 
131 struct notify_connection *
notify_connection_create(int fd,struct replicator_queue * queue)132 notify_connection_create(int fd, struct replicator_queue *queue)
133 {
134 	struct notify_connection *conn;
135 
136 	i_assert(fd >= 0);
137 
138 	conn = i_new(struct notify_connection, 1);
139 	conn->refcount = 1;
140 	conn->queue = queue;
141 	conn->fd = fd;
142 	conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE);
143 	conn->output = o_stream_create_fd(fd, SIZE_MAX);
144 	o_stream_set_no_error_handling(conn->output, TRUE);
145 	conn->io = io_add(fd, IO_READ, notify_connection_input, conn);
146 	conn->queue = queue;
147 
148 	DLLIST_PREPEND(&connections, conn);
149 	return conn;
150 }
151 
notify_connection_destroy(struct notify_connection * conn)152 static void notify_connection_destroy(struct notify_connection *conn)
153 {
154 	if (conn->destroyed)
155 		return;
156 	conn->destroyed = TRUE;
157 
158 	DLLIST_REMOVE(&connections, conn);
159 
160 	io_remove(&conn->io);
161 	i_stream_close(conn->input);
162 	o_stream_close(conn->output);
163 	if (close(conn->fd) < 0)
164 		i_error("close(notify connection) failed: %m");
165 	conn->fd = -1;
166 
167 	notify_connection_unref(&conn);
168 	master_service_client_connection_destroyed(master_service);
169 }
170 
notify_connection_ref(struct notify_connection * conn)171 void notify_connection_ref(struct notify_connection *conn)
172 {
173 	i_assert(conn->refcount > 0);
174 
175 	conn->refcount++;
176 }
177 
notify_connection_unref(struct notify_connection ** _conn)178 void notify_connection_unref(struct notify_connection **_conn)
179 {
180 	struct notify_connection *conn = *_conn;
181 
182 	i_assert(conn->refcount > 0);
183 
184 	*_conn = NULL;
185 	if (--conn->refcount > 0)
186 		return;
187 
188 	notify_connection_destroy(conn);
189 	i_stream_unref(&conn->input);
190 	o_stream_unref(&conn->output);
191 	i_free(conn);
192 }
193 
notify_connections_destroy_all(void)194 void notify_connections_destroy_all(void)
195 {
196 	while (connections != NULL)
197 		notify_connection_destroy(connections);
198 }
199