1 /*
2  * Copyright (c) 2015-2021 Joris Vink <joris@coders.se>
3  *
4  * Permission to use, copy, modify, and distribute this software for any
5  * purpose with or without fee is hereby granted, provided that the above
6  * copyright notice and this permission notice appear in all copies.
7  *
8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15  */
16 
17 #include <sys/types.h>
18 #include <sys/socket.h>
19 
20 #include <signal.h>
21 
22 #include "kore.h"
23 #include "http.h"
24 
/*
 * A registered message handler: maps a message id onto the callback
 * that is invoked when a message carrying that id is received.
 */
struct msg_type {
	/* Message id this handler is registered for. */
	u_int8_t		id;

	/* Called with the message header and its payload (or NULL). */
	void			(*cb)(struct kore_msg *, const void *);

	/* Linkage on the msg_types list. */
	TAILQ_ENTRY(msg_type)	list;
};
30 
31 static struct msg_type	*msg_type_lookup(u_int8_t);
32 static int		msg_recv_packet(struct netbuf *);
33 static int		msg_recv_data(struct netbuf *);
34 static void		msg_disconnected_parent(struct connection *);
35 static void		msg_disconnected_worker(struct connection *);
36 static void		msg_type_shutdown(struct kore_msg *, const void *);
37 
38 #if !defined(KORE_NO_HTTP)
39 static void		msg_type_websocket(struct kore_msg *, const void *);
40 #endif
41 
42 static TAILQ_HEAD(, msg_type)	msg_types;
43 static size_t			cacheidx = 0;
44 static struct connection	**conncache = NULL;
45 
46 void
kore_msg_init(void)47 kore_msg_init(void)
48 {
49 	TAILQ_INIT(&msg_types);
50 }
51 
52 void
kore_msg_parent_init(void)53 kore_msg_parent_init(void)
54 {
55 	u_int8_t		idx;
56 	struct kore_worker	*kw;
57 
58 	for (idx = 0; idx < worker_count; idx++) {
59 		if (keymgr_active == 0) {
60 			if (idx == KORE_WORKER_KEYMGR_IDX ||
61 			    idx == KORE_WORKER_ACME_IDX)
62 				continue;
63 		}
64 
65 #if !defined(KORE_USE_ACME)
66 		if (idx == KORE_WORKER_ACME_IDX)
67 			continue;
68 #endif
69 
70 		kw = kore_worker_data(idx);
71 		kore_msg_parent_add(kw);
72 	}
73 
74 	kore_msg_register(KORE_MSG_SHUTDOWN, msg_type_shutdown);
75 }
76 
77 void
kore_msg_parent_add(struct kore_worker * kw)78 kore_msg_parent_add(struct kore_worker *kw)
79 {
80 	kw->msg[0] = kore_connection_new(NULL);
81 	kw->msg[0]->fd = kw->pipe[0];
82 	kw->msg[0]->read = net_read;
83 	kw->msg[0]->write = net_write;
84 	kw->msg[0]->proto = CONN_PROTO_MSG;
85 	kw->msg[0]->state = CONN_STATE_ESTABLISHED;
86 	kw->msg[0]->hdlr_extra = &kw->id;
87 	kw->msg[0]->disconnect = msg_disconnected_worker;
88 	kw->msg[0]->handle = kore_connection_handle;
89 
90 	conncache = kore_realloc(conncache,
91 	    (cacheidx + 1) * sizeof(struct connection *));
92 
93 	conncache[cacheidx++] = kw->msg[0];
94 
95 	TAILQ_INSERT_TAIL(&connections, kw->msg[0], list);
96 	kore_platform_event_all(kw->msg[0]->fd, kw->msg[0]);
97 
98 	net_recv_queue(kw->msg[0], sizeof(struct kore_msg), 0, msg_recv_packet);
99 }
100 
101 void
kore_msg_parent_remove(struct kore_worker * kw)102 kore_msg_parent_remove(struct kore_worker *kw)
103 {
104 	kore_connection_disconnect(kw->msg[0]);
105 	kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT);
106 	(void)close(kw->pipe[1]);
107 }
108 
109 void
kore_msg_worker_init(void)110 kore_msg_worker_init(void)
111 {
112 #if !defined(KORE_NO_HTTP)
113 	kore_msg_register(KORE_MSG_WEBSOCKET, msg_type_websocket);
114 #endif
115 
116 	worker->msg[1] = kore_connection_new(NULL);
117 	worker->msg[1]->fd = worker->pipe[1];
118 	worker->msg[1]->read = net_read;
119 	worker->msg[1]->write = net_write;
120 	worker->msg[1]->proto = CONN_PROTO_MSG;
121 	worker->msg[1]->state = CONN_STATE_ESTABLISHED;
122 	worker->msg[1]->disconnect = msg_disconnected_parent;
123 	worker->msg[1]->handle = kore_connection_handle;
124 	worker->msg[1]->evt.flags = KORE_EVENT_WRITE;
125 
126 	TAILQ_INSERT_TAIL(&connections, worker->msg[1], list);
127 	kore_platform_event_all(worker->msg[1]->fd, worker->msg[1]);
128 
129 	net_recv_queue(worker->msg[1],
130 	    sizeof(struct kore_msg), 0, msg_recv_packet);
131 }
132 
133 void
kore_msg_unregister(u_int8_t id)134 kore_msg_unregister(u_int8_t id)
135 {
136 	struct msg_type		*type;
137 
138 	if ((type = msg_type_lookup(id)) == NULL)
139 		return;
140 
141 	TAILQ_REMOVE(&msg_types, type, list);
142 	kore_free(type);
143 }
144 
145 int
kore_msg_register(u_int8_t id,void (* cb)(struct kore_msg *,const void *))146 kore_msg_register(u_int8_t id, void (*cb)(struct kore_msg *, const void *))
147 {
148 	struct msg_type		*type;
149 
150 	if ((type = msg_type_lookup(id)) != NULL)
151 		return (KORE_RESULT_ERROR);
152 
153 	type = kore_malloc(sizeof(*type));
154 	type->id = id;
155 	type->cb = cb;
156 	TAILQ_INSERT_TAIL(&msg_types, type, list);
157 
158 	return (KORE_RESULT_OK);
159 }
160 
161 void
kore_msg_send(u_int16_t dst,u_int8_t id,const void * data,size_t len)162 kore_msg_send(u_int16_t dst, u_int8_t id, const void *data, size_t len)
163 {
164 	struct kore_msg		m;
165 
166 	m.id = id;
167 	m.dst = dst;
168 	m.length = len;
169 	m.src = worker->id;
170 
171 	net_send_queue(worker->msg[1], &m, sizeof(m));
172 	if (data != NULL && len > 0)
173 		net_send_queue(worker->msg[1], data, len);
174 
175 	net_send_flush(worker->msg[1]);
176 }
177 
178 static int
msg_recv_packet(struct netbuf * nb)179 msg_recv_packet(struct netbuf *nb)
180 {
181 	struct kore_msg		*msg = (struct kore_msg *)nb->buf;
182 
183 	if (msg->length > 0) {
184 		net_recv_expand(nb->owner, msg->length, msg_recv_data);
185 		return (KORE_RESULT_OK);
186 	}
187 
188 	return (msg_recv_data(nb));
189 }
190 
191 static int
msg_recv_data(struct netbuf * nb)192 msg_recv_data(struct netbuf *nb)
193 {
194 	size_t			i;
195 	struct connection	*c;
196 	struct msg_type		*type;
197 	int			deliver;
198 	u_int16_t		dst, destination;
199 	struct kore_msg		*msg = (struct kore_msg *)nb->buf;
200 
201 	if ((type = msg_type_lookup(msg->id)) != NULL) {
202 		if (worker == NULL && msg->dst != KORE_MSG_PARENT)
203 			fatal("received parent msg for non parent dst");
204 		if (worker != NULL && msg->dst != worker->id)
205 			fatal("received message for incorrect worker");
206 
207 		if (msg->length > 0)
208 			type->cb(msg, nb->buf + sizeof(*msg));
209 		else
210 			type->cb(msg, NULL);
211 	}
212 
213 	if (worker == NULL && type == NULL) {
214 		destination = msg->dst;
215 
216 		for (i = 0; i < cacheidx; i++) {
217 			c = conncache[i];
218 			if (c->proto != CONN_PROTO_MSG)
219 				fatal("connection not a msg connection");
220 
221 			/*
222 			 * If hdlr_extra is NULL it just means the worker
223 			 * never started, ignore it.
224 			 */
225 			if (c->hdlr_extra == NULL)
226 				continue;
227 
228 			deliver = 1;
229 			dst = *(u_int16_t *)c->hdlr_extra;
230 
231 			if (destination == KORE_MSG_WORKER_ALL) {
232 				if (keymgr_active && dst == 0)
233 					deliver = 0;
234 			} else {
235 				if (dst != destination)
236 					deliver = 0;
237 			}
238 
239 			if (deliver == 0)
240 				continue;
241 
242 			/* This allows the worker to receive the correct id. */
243 			msg->dst = *(u_int16_t *)c->hdlr_extra;
244 
245 			net_send_queue(c, nb->buf, nb->s_off);
246 			net_send_flush(c);
247 		}
248 	}
249 
250 	net_recv_reset(nb->owner, sizeof(struct kore_msg), msg_recv_packet);
251 	return (KORE_RESULT_OK);
252 }
253 
254 static void
msg_disconnected_parent(struct connection * c)255 msg_disconnected_parent(struct connection *c)
256 {
257 	if (!kore_quiet)
258 		kore_log(LOG_ERR, "parent gone, shutting down");
259 
260 	if (kill(worker->pid, SIGQUIT) == -1)
261 		kore_log(LOG_ERR, "failed to send SIGQUIT: %s", errno_s);
262 }
263 
264 static void
msg_disconnected_worker(struct connection * c)265 msg_disconnected_worker(struct connection *c)
266 {
267 	c->hdlr_extra = NULL;
268 }
269 
270 static void
msg_type_shutdown(struct kore_msg * msg,const void * data)271 msg_type_shutdown(struct kore_msg *msg, const void *data)
272 {
273 	if (!kore_quiet) {
274 		kore_log(LOG_NOTICE,
275 		    "shutdown requested by worker %u, going down", msg->src);
276 	}
277 
278 	(void)raise(SIGQUIT);
279 }
280 
281 #if !defined(KORE_NO_HTTP)
282 static void
msg_type_websocket(struct kore_msg * msg,const void * data)283 msg_type_websocket(struct kore_msg *msg, const void *data)
284 {
285 	struct connection	*c;
286 
287 	TAILQ_FOREACH(c, &connections, list) {
288 		if (c->proto == CONN_PROTO_WEBSOCKET) {
289 			net_send_queue(c, data, msg->length);
290 			net_send_flush(c);
291 		}
292 	}
293 }
294 #endif
295 
296 static struct msg_type *
msg_type_lookup(u_int8_t id)297 msg_type_lookup(u_int8_t id)
298 {
299 	struct msg_type		*type;
300 
301 	TAILQ_FOREACH(type, &msg_types, list) {
302 		if (type->id == id)
303 			return (type);
304 	}
305 
306 	return (NULL);
307 }
308