1 /* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */
2
3 #include "lib.h"
4 #include "llist.h"
5 #include "istream.h"
6 #include "ostream.h"
7 #include "strescape.h"
8 #include "master-service.h"
9 #include "replicator-queue.h"
10 #include "notify-connection.h"
11
12 #include <unistd.h>
13
/* Maximum input buffer size (64 kB) given to the connection's istream */
#define MAX_INBUF_SIZE (64 * 1024)
/* Protocol version of the "replicator-notify" handshake; the major number
   is the one passed to version_string_verify() when a client connects. */
#define NOTIFY_CLIENT_PROTOCOL_MAJOR_VERSION 1
#define NOTIFY_CLIENT_PROTOCOL_MINOR_VERSION 0
17
/* State kept for one connected notify client. */
struct notify_connection {
	/* Links for the file-level list of live connections */
	struct notify_connection *prev;
	struct notify_connection *next;
	/* Starts at 1 on creation; each pending sync request holds one more */
	int refcount;

	/* Client descriptor; set to -1 once it has been closed */
	int fd;
	/* Read watcher that triggers input handling */
	struct io *io;
	/* Streams wrapping fd */
	struct istream *input;
	struct ostream *output;

	/* Queue that receives the users named by the client */
	struct replicator_queue *queue;

	/* Set after the client's version line has been verified */
	bool version_received:1;
	/* Set on the first destroy call so later calls become no-ops */
	bool destroyed:1;
};
32
/* Context handed to the queue for one pending sync request. */
struct notify_sync_request {
	/* Connection to reply to; the request holds a reference on it */
	struct notify_connection *conn;
	/* Sync id sent by the client, echoed back in the reply */
	unsigned int id;
};
37
/* Head of the doubly-linked list of connections that are not yet destroyed */
static struct notify_connection *connections = NULL;

/* Defined further down; needed earlier by the input handler. */
static void notify_connection_destroy(struct notify_connection *conn);
41
notify_sync_callback(bool success,void * context)42 static void notify_sync_callback(bool success, void *context)
43 {
44 struct notify_sync_request *request = context;
45
46 o_stream_nsend_str(request->conn->output, t_strdup_printf(
47 "%c\t%u\n", success ? '+' : '-', request->id));
48
49 notify_connection_unref(&request->conn);
50 i_free(request);
51 }
52
53 static int
notify_connection_input_line(struct notify_connection * conn,const char * line)54 notify_connection_input_line(struct notify_connection *conn, const char *line)
55 {
56 struct notify_sync_request *request;
57 const char *const *args;
58 enum replication_priority priority;
59 unsigned int id;
60
61 /* U \t <username> \t <priority> [\t <sync id>] */
62 args = t_strsplit_tabescaped(line);
63 if (str_array_length(args) < 2) {
64 i_error("notify client sent invalid input: %s", line);
65 return -1;
66 }
67 if (strcmp(args[0], "U") != 0) {
68 i_error("notify client sent unknown command: %s", args[0]);
69 return -1;
70 }
71 if (replication_priority_parse(args[2], &priority) < 0) {
72 i_error("notify client sent invalid priority: %s", args[2]);
73 return -1;
74 }
75 if (priority != REPLICATION_PRIORITY_SYNC)
76 (void)replicator_queue_add(conn->queue, args[1], priority);
77 else if (args[3] == NULL || str_to_uint(args[3], &id) < 0) {
78 i_error("notify client sent invalid sync id: %s", line);
79 return -1;
80 } else {
81 request = i_new(struct notify_sync_request, 1);
82 request->conn = conn;
83 request->id = id;
84 notify_connection_ref(conn);
85 replicator_queue_add_sync(conn->queue, args[1],
86 notify_sync_callback, request);
87 }
88 return 0;
89 }
90
notify_connection_input(struct notify_connection * conn)91 static void notify_connection_input(struct notify_connection *conn)
92 {
93 const char *line;
94 int ret;
95
96 switch (i_stream_read(conn->input)) {
97 case -2:
98 i_error("BUG: Client connection sent too much data");
99 notify_connection_destroy(conn);
100 return;
101 case -1:
102 notify_connection_destroy(conn);
103 return;
104 }
105
106 if (!conn->version_received) {
107 if ((line = i_stream_next_line(conn->input)) == NULL)
108 return;
109
110 if (!version_string_verify(line, "replicator-notify",
111 NOTIFY_CLIENT_PROTOCOL_MAJOR_VERSION)) {
112 i_error("Notify client not compatible with this server "
113 "(mixed old and new binaries?)");
114 notify_connection_destroy(conn);
115 return;
116 }
117 conn->version_received = TRUE;
118 }
119
120 while ((line = i_stream_next_line(conn->input)) != NULL) {
121 T_BEGIN {
122 ret = notify_connection_input_line(conn, line);
123 } T_END;
124 if (ret < 0) {
125 notify_connection_destroy(conn);
126 break;
127 }
128 }
129 }
130
131 struct notify_connection *
notify_connection_create(int fd,struct replicator_queue * queue)132 notify_connection_create(int fd, struct replicator_queue *queue)
133 {
134 struct notify_connection *conn;
135
136 i_assert(fd >= 0);
137
138 conn = i_new(struct notify_connection, 1);
139 conn->refcount = 1;
140 conn->queue = queue;
141 conn->fd = fd;
142 conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE);
143 conn->output = o_stream_create_fd(fd, SIZE_MAX);
144 o_stream_set_no_error_handling(conn->output, TRUE);
145 conn->io = io_add(fd, IO_READ, notify_connection_input, conn);
146 conn->queue = queue;
147
148 DLLIST_PREPEND(&connections, conn);
149 return conn;
150 }
151
notify_connection_destroy(struct notify_connection * conn)152 static void notify_connection_destroy(struct notify_connection *conn)
153 {
154 if (conn->destroyed)
155 return;
156 conn->destroyed = TRUE;
157
158 DLLIST_REMOVE(&connections, conn);
159
160 io_remove(&conn->io);
161 i_stream_close(conn->input);
162 o_stream_close(conn->output);
163 if (close(conn->fd) < 0)
164 i_error("close(notify connection) failed: %m");
165 conn->fd = -1;
166
167 notify_connection_unref(&conn);
168 master_service_client_connection_destroyed(master_service);
169 }
170
notify_connection_ref(struct notify_connection * conn)171 void notify_connection_ref(struct notify_connection *conn)
172 {
173 i_assert(conn->refcount > 0);
174
175 conn->refcount++;
176 }
177
notify_connection_unref(struct notify_connection ** _conn)178 void notify_connection_unref(struct notify_connection **_conn)
179 {
180 struct notify_connection *conn = *_conn;
181
182 i_assert(conn->refcount > 0);
183
184 *_conn = NULL;
185 if (--conn->refcount > 0)
186 return;
187
188 notify_connection_destroy(conn);
189 i_stream_unref(&conn->input);
190 o_stream_unref(&conn->output);
191 i_free(conn);
192 }
193
notify_connections_destroy_all(void)194 void notify_connections_destroy_all(void)
195 {
196 while (connections != NULL)
197 notify_connection_destroy(connections);
198 }
199