1 /*
2  * Copyright (c) 2015 Joris Vink <joris@coders.se>
3  *
4  * Permission to use, copy, modify, and distribute this software for any
5  * purpose with or without fee is hereby granted, provided that the above
6  * copyright notice and this permission notice appear in all copies.
7  *
8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15  */
16 
/*
 * Simple example of how SSE (Server-Sent Events) could be used in Kore.
 *
 * Upon new arrivals, a join event is broadcast to all clients.
 * If a client goes away, a leave event is broadcast.
 * Each connection gets its own 10 second ping timer which will emit
 * a ping event to the connection endpoint.
 */
25 
26 #include <kore/kore.h>
27 #include <kore/http.h>
28 
29 #include "assets.h"
30 
/* HTTP request handlers. */
int	page(struct http_request *req);
int	subscribe(struct http_request *req);

/* Checks that request header `name` is present and equal to `value`. */
int	check_header(struct http_request *req, const char *name,
	    const char *value);

/* SSE plumbing: single delivery, fan-out, periodic ping and teardown. */
void	sse_send(struct connection *c, void *data, size_t len);
void	sse_broadcast(struct connection *src, void *data, size_t len);
void	sse_ping(void *arg, u_int64_t now);
void	sse_disconnect(struct connection *c);
38 
/*
 * Per-subscriber SSE state. subscribe() allocates one of these and
 * stores it in the owning connection's hdlr_extra member;
 * sse_disconnect() releases it again.
 */
struct sse_state {
	/* Handle returned by kore_timer_add() for this client's ping timer. */
	struct kore_timer	*timer;
};
46 
47 int
page(struct http_request * req)48 page(struct http_request *req)
49 {
50 	if (req->method != HTTP_METHOD_GET) {
51 		http_response_header(req, "allow", "get");
52 		http_response(req, 405, NULL, 0);
53 		return (KORE_RESULT_OK);
54 	}
55 
56 	http_response_header(req, "content-type", "text/html");
57 	http_response(req, 200, asset_index_html, asset_len_index_html);
58 	return (KORE_RESULT_OK);
59 }
60 
61 int
subscribe(struct http_request * req)62 subscribe(struct http_request *req)
63 {
64 	struct sse_state	*state;
65 	char			*hello = "event:join\ndata: client\n\n";
66 
67 	/* Preventive paranoia. */
68 	if (req->hdlr_extra != NULL) {
69 		kore_log(LOG_ERR, "%p: already subscribed", req->owner);
70 		http_response(req, 500, NULL, 0);
71 		return (KORE_RESULT_OK);
72 	}
73 
74 	/* Only allow GET methods. */
75 	if (req->method != HTTP_METHOD_GET) {
76 		http_response_header(req, "allow", "get");
77 		http_response(req, 405, NULL, 0);
78 		return (KORE_RESULT_OK);
79 	}
80 
81 	/* Only do SSE if the client told us it wanted too. */
82 	if (!check_header(req, "accept", "text/event-stream"))
83 		return (KORE_RESULT_OK);
84 
85 	/* Do not include content-length in our response. */
86 	req->flags |= HTTP_REQUEST_NO_CONTENT_LENGTH;
87 
88 	/* Notify existing clients of our new client now. */
89 	sse_broadcast(req->owner, hello, strlen(hello));
90 
91 	/* Set a disconnection method so we know when this client goes away. */
92 	req->owner->disconnect = sse_disconnect;
93 
94 	/* Allocate a state to be carried by our connection. */
95 	state = kore_malloc(sizeof(*state));
96 	req->owner->hdlr_extra = state;
97 
98 	/* Now start a timer to send a ping back every 10 second. */
99 	state->timer = kore_timer_add(sse_ping, 10000, req->owner, 0);
100 
101 	/* Respond that the SSE channel is now open. */
102 	kore_log(LOG_NOTICE, "%p: connected for SSE", req->owner);
103 	http_response_header(req, "content-type", "text/event-stream");
104 	http_response(req, 200, NULL, 0);
105 
106 	return (KORE_RESULT_OK);
107 }
108 
109 void
sse_broadcast(struct connection * src,void * data,size_t len)110 sse_broadcast(struct connection *src, void *data, size_t len)
111 {
112 	struct connection	*c;
113 
114 	/* Broadcast the message to all other clients. */
115 	TAILQ_FOREACH(c, &connections, list) {
116 		if (c == src)
117 			continue;
118 		sse_send(c, data, len);
119 	}
120 }
121 
122 void
sse_send(struct connection * c,void * data,size_t len)123 sse_send(struct connection *c, void *data, size_t len)
124 {
125 	struct sse_state	*state = c->hdlr_extra;
126 
127 	/* Do not send to clients that do not have a state. */
128 	if (state == NULL)
129 		return;
130 
131 	/* Queue outgoing data now. */
132 	net_send_queue(c, data, len);
133 	net_send_flush(c);
134 }
135 
/*
 * Timer callback registered by subscribe(): emits a ping event to the
 * connection passed in as arg. The now argument is not used.
 */
void
sse_ping(void *arg, u_int64_t now)
{
	char	*msg = "event:ping\ndata:\n\n";

	(void)now;

	sse_send((struct connection *)arg, msg, strlen(msg));
}
145 
146 void
sse_disconnect(struct connection * c)147 sse_disconnect(struct connection *c)
148 {
149 	struct sse_state	*state = c->hdlr_extra;
150 	char			*leaving = "event: leave\ndata: client\n\n";
151 
152 	kore_log(LOG_NOTICE, "%p: disconnecting for SSE", c);
153 
154 	/* Tell others we are leaving. */
155 	sse_broadcast(c, leaving, strlen(leaving));
156 
157 	/* Kill our timer and free/remove the state. */
158 	kore_timer_remove(state->timer);
159 	kore_free(state);
160 
161 	/* Prevent us to be called again. */
162 	c->hdlr_extra = NULL;
163 	c->disconnect = NULL;
164 }
165 
166 int
check_header(struct http_request * req,const char * name,const char * value)167 check_header(struct http_request *req, const char *name, const char *value)
168 {
169 	const char		*hdr;
170 
171 	if (!http_request_header(req, name, &hdr)) {
172 		http_response(req, 400, NULL, 0);
173 		return (KORE_RESULT_ERROR);
174 	}
175 
176 	if (strcmp(hdr, value)) {
177 		http_response(req, 400, NULL, 0);
178 		return (KORE_RESULT_ERROR);
179 	}
180 
181 	return (KORE_RESULT_OK);
182 }
183