1 /*
2  * ws protocol handler plugin for "lws-minimal"
3  *
4  * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5  *
6  * This file is made available under the Creative Commons CC0 1.0
7  * Universal Public Domain Dedication.
8  *
9  * This version uses an lws_ring ringbuffer to cache up to 8 messages at a time,
10  * so it's not so easy to lose messages.
11  */
12 
13 #if !defined (LWS_PLUGIN_STATIC)
14 #define LWS_DLL
15 #define LWS_INTERNAL
16 #include <libwebsockets.h>
17 #endif
18 
19 #include <string.h>
20 #include <stdlib.h>
21 
/*
 * A single relayed message.  One of these is stored by value in the
 * ringbuffer for each message that is queued for the connected clients.
 */

struct msg {
	/* heap buffer (malloc'd) holding the message; released with free() */
	void *payload;
	/* count of message bytes; the writer passes this as the send length */
	size_t len;
};
28 
/*
 * Per-connection state: one instance exists for every client that is
 * connected to us.
 */

struct per_session_data__minimal {
	/* next entry in the vhost's singly-linked list of live sessions */
	struct per_session_data__minimal *pss_list;
	/* the connection this session belongs to */
	struct lws *wsi;
	/* this session's own read position (tail) in the shared ringbuffer */
	uint32_t tail;
};
36 
37 /* one of these is created for each vhost our protocol is used with */
38 
39 struct per_vhost_data__minimal {
40 	struct lws_context *context;
41 	struct lws_vhost *vhost;
42 	const struct lws_protocols *protocol;
43 
44 	lws_sorted_usec_list_t sul;
45 
46 	struct per_session_data__minimal *pss_list; /* linked-list of live pss*/
47 
48 	struct lws_ring *ring; /* ringbuffer holding unsent messages */
49 	struct lws_client_connect_info i;
50 	struct lws *client_wsi;
51 };
52 
53 /* destroys the message when everyone has had a copy of it */
54 
55 static void
__minimal_destroy_message(void * _msg)56 __minimal_destroy_message(void *_msg)
57 {
58 	struct msg *msg = _msg;
59 
60 	free(msg->payload);
61 	msg->payload = NULL;
62 	msg->len = 0;
63 }
64 
65 static void
sul_connect_attempt(struct lws_sorted_usec_list * sul)66 sul_connect_attempt(struct lws_sorted_usec_list *sul)
67 {
68 	struct per_vhost_data__minimal *vhd =
69 		lws_container_of(sul, struct per_vhost_data__minimal, sul);
70 
71 	vhd->i.context = vhd->context;
72 	vhd->i.port = 443;
73 	vhd->i.address = "libwebsockets.org";
74 	vhd->i.path = "/";
75 	vhd->i.host = vhd->i.address;
76 	vhd->i.origin = vhd->i.address;
77 	vhd->i.ssl_connection = 1;
78 
79 	vhd->i.protocol = "dumb-increment-protocol";
80 	vhd->i.local_protocol_name = "lws-minimal-proxy";
81 	vhd->i.pwsi = &vhd->client_wsi;
82 
83 	if (!lws_client_connect_via_info(&vhd->i))
84 		lws_sul_schedule(vhd->context, 0, &vhd->sul,
85 				 sul_connect_attempt, 10 * LWS_US_PER_SEC);
86 }
87 
88 static int
callback_minimal(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)89 callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
90 			void *user, void *in, size_t len)
91 {
92 	struct per_session_data__minimal *pss =
93 			(struct per_session_data__minimal *)user;
94 	struct per_vhost_data__minimal *vhd =
95 			(struct per_vhost_data__minimal *)
96 			lws_protocol_vh_priv_get(lws_get_vhost(wsi),
97 					lws_get_protocol(wsi));
98 	const struct msg *pmsg;
99 	struct msg amsg;
100 	int m;
101 
102 	switch (reason) {
103 
104 	/* --- protocol lifecycle callbacks --- */
105 
106 	case LWS_CALLBACK_PROTOCOL_INIT:
107 		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
108 				lws_get_protocol(wsi),
109 				sizeof(struct per_vhost_data__minimal));
110 		vhd->context = lws_get_context(wsi);
111 		vhd->protocol = lws_get_protocol(wsi);
112 		vhd->vhost = lws_get_vhost(wsi);
113 
114 		vhd->ring = lws_ring_create(sizeof(struct msg), 8,
115 					    __minimal_destroy_message);
116 		if (!vhd->ring)
117 			return 1;
118 
119 		sul_connect_attempt(&vhd->sul);
120 		break;
121 
122 	case LWS_CALLBACK_PROTOCOL_DESTROY:
123 		lws_ring_destroy(vhd->ring);
124 		lws_sul_cancel(&vhd->sul);
125 		break;
126 
127 	/* --- serving callbacks --- */
128 
129 	case LWS_CALLBACK_ESTABLISHED:
130 		/* add ourselves to the list of live pss held in the vhd */
131 		lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
132 		pss->tail = lws_ring_get_oldest_tail(vhd->ring);
133 		pss->wsi = wsi;
134 		break;
135 
136 	case LWS_CALLBACK_CLOSED:
137 		/* remove our closing pss from the list of live pss */
138 		lws_ll_fwd_remove(struct per_session_data__minimal, pss_list,
139 				  pss, vhd->pss_list);
140 		break;
141 
142 	case LWS_CALLBACK_SERVER_WRITEABLE:
143 		pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
144 		if (!pmsg)
145 			break;
146 
147 		/* notice we allowed for LWS_PRE in the payload already */
148 		m = lws_write(wsi, ((unsigned char *)pmsg->payload) + LWS_PRE,
149 			      pmsg->len, LWS_WRITE_TEXT);
150 		if (m < (int)pmsg->len) {
151 			lwsl_err("ERROR %d writing to ws socket\n", m);
152 			return -1;
153 		}
154 
155 		lws_ring_consume_and_update_oldest_tail(
156 			vhd->ring,	/* lws_ring object */
157 			struct per_session_data__minimal, /* type of objects with tails */
158 			&pss->tail,	/* tail of guy doing the consuming */
159 			1,		/* number of payload objects being consumed */
160 			vhd->pss_list,	/* head of list of objects with tails */
161 			tail,		/* member name of tail in objects with tails */
162 			pss_list	/* member name of next object in objects with tails */
163 		);
164 
165 		/* more to do? */
166 		if (lws_ring_get_element(vhd->ring, &pss->tail))
167 			/* come back as soon as we can write more */
168 			lws_callback_on_writable(pss->wsi);
169 		break;
170 
171 	/* --- client callbacks --- */
172 
173 	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
174 		lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
175 			 in ? (char *)in : "(null)");
176 		vhd->client_wsi = NULL;
177 		lws_sul_schedule(vhd->context, 0, &vhd->sul,
178 				 sul_connect_attempt, LWS_US_PER_SEC);
179 		break;
180 
181 	case LWS_CALLBACK_CLIENT_ESTABLISHED:
182 		lwsl_user("%s: established\n", __func__);
183 		break;
184 
185 	case LWS_CALLBACK_CLIENT_RECEIVE:
186 		/* if no clients, just drop incoming */
187 		if (!vhd->pss_list)
188 			break;
189 
190 		if (!lws_ring_get_count_free_elements(vhd->ring)) {
191 			lwsl_user("dropping!\n");
192 			break;
193 		}
194 
195 		amsg.len = len;
196 		/* notice we over-allocate by LWS_PRE */
197 		amsg.payload = malloc(LWS_PRE + len);
198 		if (!amsg.payload) {
199 			lwsl_user("OOM: dropping\n");
200 			break;
201 		}
202 
203 		memcpy((char *)amsg.payload + LWS_PRE, in, len);
204 		if (!lws_ring_insert(vhd->ring, &amsg, 1)) {
205 			__minimal_destroy_message(&amsg);
206 			lwsl_user("dropping!\n");
207 			break;
208 		}
209 
210 		/*
211 		 * let everybody know we want to write something on them
212 		 * as soon as they are ready
213 		 */
214 		lws_start_foreach_llp(struct per_session_data__minimal **,
215 				      ppss, vhd->pss_list) {
216 			lws_callback_on_writable((*ppss)->wsi);
217 		} lws_end_foreach_llp(ppss, pss_list);
218 		break;
219 
220 	case LWS_CALLBACK_CLIENT_CLOSED:
221 		vhd->client_wsi = NULL;
222 		lws_sul_schedule(vhd->context, 0, &vhd->sul,
223 				 sul_connect_attempt, LWS_US_PER_SEC);
224 		break;
225 
226 	default:
227 		break;
228 	}
229 
230 	return 0;
231 }
232 
/*
 * Brace initializer describing this protocol, for use as one entry of a
 * protocols table.  The per-member notes below follow the member order of
 * libwebsockets' struct lws_protocols -- confirm against the lws headers
 * in use.
 */
#define LWS_PLUGIN_PROTOCOL_MINIMAL \
	{ \
		"lws-minimal-proxy",				/* protocol name */ \
		callback_minimal,				/* protocol callback */ \
		sizeof(struct per_session_data__minimal),	/* per-session data size */ \
		128,						/* rx buffer size */ \
		0,						/* id */ \
		NULL,						/* user pointer */ \
		0						/* tx packet size */ \
	}
241