1 /*
2 * Copyright (c) 2015-2021 Joris Vink <joris@coders.se>
3 *
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 */
16
17 #include <sys/types.h>
18 #include <sys/socket.h>
19
20 #include <signal.h>
21
22 #include "kore.h"
23 #include "http.h"
24
/*
 * A registered message type: maps a message id onto the callback that
 * is invoked when a message carrying that id is received.
 */
struct msg_type {
	/* Message id this entry handles. */
	u_int8_t		id;

	/* Called with the message header and its payload (or NULL). */
	void			(*cb)(struct kore_msg *, const void *);

	/* Linkage on the msg_types list. */
	TAILQ_ENTRY(msg_type)	list;
};
30
31 static struct msg_type *msg_type_lookup(u_int8_t);
32 static int msg_recv_packet(struct netbuf *);
33 static int msg_recv_data(struct netbuf *);
34 static void msg_disconnected_parent(struct connection *);
35 static void msg_disconnected_worker(struct connection *);
36 static void msg_type_shutdown(struct kore_msg *, const void *);
37
38 #if !defined(KORE_NO_HTTP)
39 static void msg_type_websocket(struct kore_msg *, const void *);
40 #endif
41
42 static TAILQ_HEAD(, msg_type) msg_types;
43 static size_t cacheidx = 0;
44 static struct connection **conncache = NULL;
45
46 void
kore_msg_init(void)47 kore_msg_init(void)
48 {
49 TAILQ_INIT(&msg_types);
50 }
51
52 void
kore_msg_parent_init(void)53 kore_msg_parent_init(void)
54 {
55 u_int8_t idx;
56 struct kore_worker *kw;
57
58 for (idx = 0; idx < worker_count; idx++) {
59 if (keymgr_active == 0) {
60 if (idx == KORE_WORKER_KEYMGR_IDX ||
61 idx == KORE_WORKER_ACME_IDX)
62 continue;
63 }
64
65 #if !defined(KORE_USE_ACME)
66 if (idx == KORE_WORKER_ACME_IDX)
67 continue;
68 #endif
69
70 kw = kore_worker_data(idx);
71 kore_msg_parent_add(kw);
72 }
73
74 kore_msg_register(KORE_MSG_SHUTDOWN, msg_type_shutdown);
75 }
76
77 void
kore_msg_parent_add(struct kore_worker * kw)78 kore_msg_parent_add(struct kore_worker *kw)
79 {
80 kw->msg[0] = kore_connection_new(NULL);
81 kw->msg[0]->fd = kw->pipe[0];
82 kw->msg[0]->read = net_read;
83 kw->msg[0]->write = net_write;
84 kw->msg[0]->proto = CONN_PROTO_MSG;
85 kw->msg[0]->state = CONN_STATE_ESTABLISHED;
86 kw->msg[0]->hdlr_extra = &kw->id;
87 kw->msg[0]->disconnect = msg_disconnected_worker;
88 kw->msg[0]->handle = kore_connection_handle;
89
90 conncache = kore_realloc(conncache,
91 (cacheidx + 1) * sizeof(struct connection *));
92
93 conncache[cacheidx++] = kw->msg[0];
94
95 TAILQ_INSERT_TAIL(&connections, kw->msg[0], list);
96 kore_platform_event_all(kw->msg[0]->fd, kw->msg[0]);
97
98 net_recv_queue(kw->msg[0], sizeof(struct kore_msg), 0, msg_recv_packet);
99 }
100
101 void
kore_msg_parent_remove(struct kore_worker * kw)102 kore_msg_parent_remove(struct kore_worker *kw)
103 {
104 kore_connection_disconnect(kw->msg[0]);
105 kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT);
106 (void)close(kw->pipe[1]);
107 }
108
109 void
kore_msg_worker_init(void)110 kore_msg_worker_init(void)
111 {
112 #if !defined(KORE_NO_HTTP)
113 kore_msg_register(KORE_MSG_WEBSOCKET, msg_type_websocket);
114 #endif
115
116 worker->msg[1] = kore_connection_new(NULL);
117 worker->msg[1]->fd = worker->pipe[1];
118 worker->msg[1]->read = net_read;
119 worker->msg[1]->write = net_write;
120 worker->msg[1]->proto = CONN_PROTO_MSG;
121 worker->msg[1]->state = CONN_STATE_ESTABLISHED;
122 worker->msg[1]->disconnect = msg_disconnected_parent;
123 worker->msg[1]->handle = kore_connection_handle;
124 worker->msg[1]->evt.flags = KORE_EVENT_WRITE;
125
126 TAILQ_INSERT_TAIL(&connections, worker->msg[1], list);
127 kore_platform_event_all(worker->msg[1]->fd, worker->msg[1]);
128
129 net_recv_queue(worker->msg[1],
130 sizeof(struct kore_msg), 0, msg_recv_packet);
131 }
132
133 void
kore_msg_unregister(u_int8_t id)134 kore_msg_unregister(u_int8_t id)
135 {
136 struct msg_type *type;
137
138 if ((type = msg_type_lookup(id)) == NULL)
139 return;
140
141 TAILQ_REMOVE(&msg_types, type, list);
142 kore_free(type);
143 }
144
145 int
kore_msg_register(u_int8_t id,void (* cb)(struct kore_msg *,const void *))146 kore_msg_register(u_int8_t id, void (*cb)(struct kore_msg *, const void *))
147 {
148 struct msg_type *type;
149
150 if ((type = msg_type_lookup(id)) != NULL)
151 return (KORE_RESULT_ERROR);
152
153 type = kore_malloc(sizeof(*type));
154 type->id = id;
155 type->cb = cb;
156 TAILQ_INSERT_TAIL(&msg_types, type, list);
157
158 return (KORE_RESULT_OK);
159 }
160
161 void
kore_msg_send(u_int16_t dst,u_int8_t id,const void * data,size_t len)162 kore_msg_send(u_int16_t dst, u_int8_t id, const void *data, size_t len)
163 {
164 struct kore_msg m;
165
166 m.id = id;
167 m.dst = dst;
168 m.length = len;
169 m.src = worker->id;
170
171 net_send_queue(worker->msg[1], &m, sizeof(m));
172 if (data != NULL && len > 0)
173 net_send_queue(worker->msg[1], data, len);
174
175 net_send_flush(worker->msg[1]);
176 }
177
178 static int
msg_recv_packet(struct netbuf * nb)179 msg_recv_packet(struct netbuf *nb)
180 {
181 struct kore_msg *msg = (struct kore_msg *)nb->buf;
182
183 if (msg->length > 0) {
184 net_recv_expand(nb->owner, msg->length, msg_recv_data);
185 return (KORE_RESULT_OK);
186 }
187
188 return (msg_recv_data(nb));
189 }
190
191 static int
msg_recv_data(struct netbuf * nb)192 msg_recv_data(struct netbuf *nb)
193 {
194 size_t i;
195 struct connection *c;
196 struct msg_type *type;
197 int deliver;
198 u_int16_t dst, destination;
199 struct kore_msg *msg = (struct kore_msg *)nb->buf;
200
201 if ((type = msg_type_lookup(msg->id)) != NULL) {
202 if (worker == NULL && msg->dst != KORE_MSG_PARENT)
203 fatal("received parent msg for non parent dst");
204 if (worker != NULL && msg->dst != worker->id)
205 fatal("received message for incorrect worker");
206
207 if (msg->length > 0)
208 type->cb(msg, nb->buf + sizeof(*msg));
209 else
210 type->cb(msg, NULL);
211 }
212
213 if (worker == NULL && type == NULL) {
214 destination = msg->dst;
215
216 for (i = 0; i < cacheidx; i++) {
217 c = conncache[i];
218 if (c->proto != CONN_PROTO_MSG)
219 fatal("connection not a msg connection");
220
221 /*
222 * If hdlr_extra is NULL it just means the worker
223 * never started, ignore it.
224 */
225 if (c->hdlr_extra == NULL)
226 continue;
227
228 deliver = 1;
229 dst = *(u_int16_t *)c->hdlr_extra;
230
231 if (destination == KORE_MSG_WORKER_ALL) {
232 if (keymgr_active && dst == 0)
233 deliver = 0;
234 } else {
235 if (dst != destination)
236 deliver = 0;
237 }
238
239 if (deliver == 0)
240 continue;
241
242 /* This allows the worker to receive the correct id. */
243 msg->dst = *(u_int16_t *)c->hdlr_extra;
244
245 net_send_queue(c, nb->buf, nb->s_off);
246 net_send_flush(c);
247 }
248 }
249
250 net_recv_reset(nb->owner, sizeof(struct kore_msg), msg_recv_packet);
251 return (KORE_RESULT_OK);
252 }
253
254 static void
msg_disconnected_parent(struct connection * c)255 msg_disconnected_parent(struct connection *c)
256 {
257 if (!kore_quiet)
258 kore_log(LOG_ERR, "parent gone, shutting down");
259
260 if (kill(worker->pid, SIGQUIT) == -1)
261 kore_log(LOG_ERR, "failed to send SIGQUIT: %s", errno_s);
262 }
263
264 static void
msg_disconnected_worker(struct connection * c)265 msg_disconnected_worker(struct connection *c)
266 {
267 c->hdlr_extra = NULL;
268 }
269
270 static void
msg_type_shutdown(struct kore_msg * msg,const void * data)271 msg_type_shutdown(struct kore_msg *msg, const void *data)
272 {
273 if (!kore_quiet) {
274 kore_log(LOG_NOTICE,
275 "shutdown requested by worker %u, going down", msg->src);
276 }
277
278 (void)raise(SIGQUIT);
279 }
280
281 #if !defined(KORE_NO_HTTP)
282 static void
msg_type_websocket(struct kore_msg * msg,const void * data)283 msg_type_websocket(struct kore_msg *msg, const void *data)
284 {
285 struct connection *c;
286
287 TAILQ_FOREACH(c, &connections, list) {
288 if (c->proto == CONN_PROTO_WEBSOCKET) {
289 net_send_queue(c, data, msg->length);
290 net_send_flush(c);
291 }
292 }
293 }
294 #endif
295
296 static struct msg_type *
msg_type_lookup(u_int8_t id)297 msg_type_lookup(u_int8_t id)
298 {
299 struct msg_type *type;
300
301 TAILQ_FOREACH(type, &msg_types, list) {
302 if (type->id == id)
303 return (type);
304 }
305
306 return (NULL);
307 }
308