1 /*
2  * lws-minimal-secure-streams-client
3  *
4  * Written in 2010-2020 by Andy Green <andy@warmcat.com>
5  *
6  * This file is made available under the Creative Commons CC0 1.0
7  * Universal Public Domain Dedication.
8  *
9  *
10  * This client does not perform any INET networking... instead it opens a unix
11  * domain socket on a proxy that is listening for it, and that creates the
12  * actual secure stream connection.
13  *
14  * We are able to use the usual secure streams api in the client process, with
15  * payloads and connection state information proxied over the unix domain
16  * socket and fulfilled in the proxy process.
17  *
18  * The public client helper pieces are built as part of lws
19  */
20 #include <private-lib-core.h>
21 
22 lws_ss_state_return_t
lws_sspc_event_helper(lws_sspc_handle_t * h,lws_ss_constate_t cs,lws_ss_tx_ordinal_t flags)23 lws_sspc_event_helper(lws_sspc_handle_t *h, lws_ss_constate_t cs,
24 		      lws_ss_tx_ordinal_t flags)
25 {
26 	if (!h)
27 		return LWSSSSRET_OK;
28 
29 	if (lws_ss_check_next_state(&h->lc, &h->prev_ss_state, cs))
30 		return LWSSSSRET_DESTROY_ME;
31 
32 	if (!h->ssi.state)
33 		return LWSSSSRET_OK;
34 
35 	return h->ssi.state((void *)((uint8_t *)&h[1]), NULL, cs, flags);
36 }
37 
38 static void
lws_sspc_sul_retry_cb(lws_sorted_usec_list_t * sul)39 lws_sspc_sul_retry_cb(lws_sorted_usec_list_t *sul)
40 {
41 	lws_sspc_handle_t *h = lws_container_of(sul, lws_sspc_handle_t, sul_retry);
42 	static struct lws_client_connect_info i;
43 
44 	/*
45 	 * We may have started up before the system proxy, so be prepared with
46 	 * a sul to retry at 1Hz
47 	 */
48 
49 	memset(&i, 0, sizeof i);
50 	i.context = h->context;
51 	if (h->context->ss_proxy_port) { /* tcp */
52 		i.address = h->context->ss_proxy_address;
53 		i.port = h->context->ss_proxy_port;
54 		i.iface = h->context->ss_proxy_bind;
55 	} else {
56 		if (h->context->ss_proxy_bind)
57 			i.address = h->context->ss_proxy_bind;
58 		else
59 #if defined(__linux__)
60 			i.address = "+@proxy.ss.lws";
61 #else
62 			i.address = "+/tmp/proxy.ss.lws";
63 #endif
64 	}
65 	i.host = i.address;
66 	i.origin = i.address;
67 	i.method = "RAW";
68 	i.protocol = lws_sspc_protocols[0].name;
69 	i.local_protocol_name = lws_sspc_protocols[0].name;
70 	i.path = "";
71 	i.pwsi = &h->cwsi;
72 	i.opaque_user_data = (void *)h;
73 	i.ssl_connection = LCCSCF_SECSTREAM_PROXY_LINK;
74 
75 	lws_metrics_caliper_bind(h->cal_txn, h->context->mt_ss_cliprox_conn);
76 #if defined(LWS_WITH_SYS_METRICS)
77 	lws_metrics_tag_add(&h->cal_txn.mtags_owner, "ss", h->ssi.streamtype);
78 #endif
79 
80 	/* this wsi is the link to the proxy */
81 
82 	if (!lws_client_connect_via_info(&i)) {
83 
84 #if defined(LWS_WITH_SYS_METRICS)
85 		/*
86 		 * If any hanging caliper measurement, dump it, and free any tags
87 		 */
88 		lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
89 #endif
90 
91 		lws_sul_schedule(h->context, 0, &h->sul_retry,
92 				 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
93 
94 		return;
95 	}
96 
97 	lwsl_notice("%s: %s\n", __func__, h->cwsi->lc.gutag);
98 }
99 
100 static int
lws_sspc_serialize_metadata(lws_sspc_metadata_t * md,uint8_t * p,uint8_t * end)101 lws_sspc_serialize_metadata(lws_sspc_metadata_t *md, uint8_t *p, uint8_t *end)
102 {
103 	int n, txc;
104 
105 	if (md->name[0] == '\0') {
106 
107 		lwsl_info("%s: sending tx credit update %d\n", __func__,
108 				md->tx_cr_adjust);
109 
110 		p[0] = LWSSS_SER_TXPRE_TXCR_UPDATE;
111 		lws_ser_wu16be(&p[1], 4);
112 		lws_ser_wu32be(&p[3], (uint32_t)md->tx_cr_adjust);
113 
114 		n = 7;
115 
116 	} else {
117 
118 		lwsl_info("%s: sending metadata\n", __func__);
119 
120 		p[0] = LWSSS_SER_TXPRE_METADATA;
121 		txc = (int)strlen(md->name);
122 		n = txc + 1 + (int)md->len;
123 		if (n > 0xffff)
124 			/* we can't serialize this metadata in 16b length */
125 			return -1;
126 		if (n > lws_ptr_diff(end, &p[4]))
127 			/* we don't have space for this metadata */
128 			return -1;
129 		lws_ser_wu16be(&p[1], (uint16_t)n);
130 		p[3] = (uint8_t)txc;
131 		memcpy(&p[4], md->name, (unsigned int)txc);
132 		memcpy(&p[4 + txc], &md[1], md->len);
133 		n = 4 + txc + (int)md->len;
134 	}
135 
136 	lws_dll2_remove(&md->list);
137 	lws_free(md);
138 
139 	return n;
140 }
141 
142 static int
callback_sspc_client(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)143 callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason,
144 		     void *user, void *in, size_t len)
145 {
146 	lws_sspc_handle_t *h = (lws_sspc_handle_t *)lws_get_opaque_user_data(wsi);
147 	size_t pktsize = wsi->a.context->max_http_header_data;
148 	void *m = (void *)((uint8_t *)&h[1]);
149 	uint8_t *pkt = NULL, *p = NULL, *end = NULL;
150 	const uint8_t *cp;
151 	uint8_t s[64];
152 	lws_usec_t us;
153 	int flags, n;
154 
155 	switch (reason) {
156 	case LWS_CALLBACK_PROTOCOL_INIT:
157 		break;
158 
159 	case LWS_CALLBACK_PROTOCOL_DESTROY:
160 		break;
161 
162 	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
163 		lwsl_warn("%s: CONNECTION_ERROR\n", __func__);
164 #if defined(LWS_WITH_SYS_METRICS)
165 		/*
166 		 * If any hanging caliper measurement, dump it, and free any tags
167 		 */
168 		lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
169 #endif
170 		lws_set_opaque_user_data(wsi, NULL);
171 		h->cwsi = NULL;
172 		lws_sul_schedule(h->context, 0, &h->sul_retry,
173 				 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
174 		break;
175 
176         case LWS_CALLBACK_RAW_CONNECTED:
177 		if (!h || lws_fi(&h->fic, "sspc_fail_on_linkup"))
178 			return -1;
179 		lwsl_info("%s: CONNECTED (%s)\n", __func__, h->ssi.streamtype);
180 
181 		h->state = LPCSCLI_SENDING_INITIAL_TX;
182 		/*
183 		 * We create the dsh at the response to the initial tx, which
184 		 * will let us know the policy's max size for it... let's
185 		 * protect the connection with a promise to complete the
186 		 * SS serialization streamtype negotation within a short period,
187 		 * we will cancel this timeout when we have the proxy's ack
188 		 * of the streamtype serialization, eg, it exists in the proxy
189 		 * policy etc
190 		 */
191 		lws_set_timeout(wsi, PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3);
192 		lws_callback_on_writable(wsi);
193                 break;
194 
195 	case LWS_CALLBACK_RAW_CLOSE:
196 		/*
197 		 * our ss proxy Unix Domain socket has closed...
198 		 */
199 		lwsl_info("%s: LWS_CALLBACK_RAW_CLOSE: %s proxy conn down, sspc h %s\n",
200 			    __func__, lws_wsi_tag(wsi), lws_sspc_tag(h));
201 		if (!h) {
202 			lwsl_info("%s: no sspc on client proxy link close\n", __func__);
203 			break;
204 		}
205 
206 		lws_dsh_destroy(&h->dsh);
207 		if (h->ss_dangling_connected && h->ssi.state) {
208 			lws_ss_state_return_t ret_state;
209 
210 			lwsl_notice("%s: setting _DISCONNECTED\n", __func__);
211 			h->ss_dangling_connected = 0;
212 			h->prev_ss_state = LWSSSCS_DISCONNECTED;
213 			ret_state = h->ssi.state(ss_to_userobj(h), NULL,
214 						 LWSSSCS_DISCONNECTED, 0);
215 			if (ret_state == LWSSSSRET_DESTROY_ME) {
216 				h->cwsi = NULL;
217 				lws_set_opaque_user_data(wsi, NULL);
218 				lws_sspc_destroy(&h);
219 				break;
220 			}
221 		}
222 
223 		h->cwsi = NULL;
224 		/*
225 		 * schedule a reconnect in 1s
226 		 */
227 		lws_sul_schedule(h->context, 0, &h->sul_retry,
228 				 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
229 
230 		break;
231 
232 	case LWS_CALLBACK_RAW_RX:
233 		/*
234 		 * ie, the proxy has sent us something
235 		 */
236 		lwsl_info("%s: RAW_RX: rx %d\n", __func__, (int)len);
237 
238 		if (!h || !h->cwsi) {
239 			lwsl_info("%s: rx when client ss destroyed\n", __func__);
240 
241 			return -1;
242 		}
243 
244 		if (!len) {
245 			lwsl_notice("%s: RAW_RX: zero len\n", __func__);
246 
247 			return -1;
248 		}
249 
250 		if (lws_fi(&h->fic, "sspc_fake_rxparse_disconnect_me"))
251 			n = LWSSSSRET_DISCONNECT_ME;
252 		else
253 			if (lws_fi(&h->fic, "sspc_fake_rxparse_destroy_me"))
254 				n = LWSSSSRET_DESTROY_ME;
255 			else
256 				n = lws_ss_deserialize_parse(&h->parser,
257 							     lws_get_context(wsi),
258 							     h->dsh, in, len,
259 							     &h->state, h,
260 							     (lws_ss_handle_t **)m,
261 							     &h->ssi, 1);
262 		switch (n) {
263 		case LWSSSSRET_OK:
264 			break;
265 		case LWSSSSRET_DISCONNECT_ME:
266 			lwsl_notice("%s: proxlicent RX ended with DISCONNECT_ME\n",
267 					__func__);
268 			return -1;
269 		case LWSSSSRET_DESTROY_ME:
270 			lwsl_notice("%s: proxlicent RX ended with DESTROY_ME\n",
271 					__func__);
272 			lws_set_opaque_user_data(wsi, NULL);
273 			lws_sspc_destroy(&h);
274 			return -1;
275 		}
276 
277 		if (h->state == LPCSCLI_LOCAL_CONNECTED ||
278 		    h->state == LPCSCLI_ONWARD_CONNECT)
279 			lws_set_timeout(wsi, 0, 0);
280 
281 		break;
282 
283 	case LWS_CALLBACK_RAW_WRITEABLE:
284 
285 		/*
286 		 * We can transmit something to the proxy...
287 		 */
288 
289 		if (!h)
290 			break;
291 
292 		lwsl_debug("%s: WRITEABLE %s, state %d\n", __func__,
293 			   wsi->lc.gutag, h->state);
294 
295 		/*
296 		 * Management of ss timeout can happen any time and doesn't
297 		 * depend on wsi existence or state
298 		 */
299 
300 		n = 0;
301 		cp = s;
302 
303 		if (h->pending_timeout_update) {
304 			s[0] = LWSSS_SER_TXPRE_TIMEOUT_UPDATE;
305 			s[1] = 0;
306 			s[2] = 4;
307 			/*
308 			 *          0: use policy timeout value
309 			 * 0xffffffff: cancel the timeout
310 			 */
311 			lws_ser_wu32be(&s[3], h->timeout_ms);
312 			/* in case anything else to write */
313 			lws_callback_on_writable(h->cwsi);
314 			h->pending_timeout_update = 0;
315 			n = 7;
316 			goto do_write;
317 		}
318 
319 		s[1] = 0;
320 		/*
321 		 * This is the state of the link that connects us to the onward
322 		 * proxy
323 		 */
324 		switch (h->state) {
325 		case LPCSCLI_SENDING_INITIAL_TX:
326 			/*
327 			 * We are negotating the opening of a particular
328 			 * streamtype
329 			 */
330 			n = (int)strlen(h->ssi.streamtype) + 1 + 4 + 4;
331 
332 			s[0] = LWSSS_SER_TXPRE_STREAMTYPE;
333 			lws_ser_wu16be(&s[1], (uint16_t)n);
334 			/* SSSv1: add protocol version byte (initially 1) */
335 			s[3] = (uint8_t)LWS_SSS_CLIENT_PROTOCOL_VERSION;
336 			lws_ser_wu32be(&s[4], (uint32_t)getpid());
337 			lws_ser_wu32be(&s[8], (uint32_t)h->txc.peer_tx_cr_est);
338 			//h->txcr_out = txc;
339 			lws_strncpy((char *)&s[12], h->ssi.streamtype, sizeof(s) - 12);
340 			n += 3;
341 			h->state = LPCSCLI_WAITING_CREATE_RESULT;
342 
343 			break;
344 
345 		case LPCSCLI_LOCAL_CONNECTED:
346 
347 			// lwsl_notice("%s: LPCSCLI_LOCAL_CONNECTED\n", __func__);
348 
349 			/*
350 			 * Do we need to prioritize sending any metadata
351 			 * changes?
352 			 */
353 
354 			if (h->metadata_owner.count) {
355 				lws_sspc_metadata_t *md = lws_container_of(
356 					lws_dll2_get_tail(&h->metadata_owner),
357 					lws_sspc_metadata_t, list);
358 
359 				pkt = lws_malloc(pktsize + LWS_PRE, __func__);
360 				if (!pkt)
361 					goto hangup;
362 				cp = p = pkt + LWS_PRE;
363 				end = p + pktsize;
364 
365 				n = lws_sspc_serialize_metadata(md, p, end);
366 				if (n < 0)
367 					goto metadata_hangup;
368 
369 				lwsl_debug("%s: (local_conn) metadata\n", __func__);
370 
371 				goto req_write_and_issue;
372 			}
373 
374 			if (h->pending_writeable_len) {
375 				lwsl_debug("%s: (local_conn) PAYLOAD_LENGTH_HINT %u\n",
376 					   __func__, (unsigned int)h->writeable_len);
377 				s[0] = LWSSS_SER_TXPRE_PAYLOAD_LENGTH_HINT;
378 				lws_ser_wu16be(&s[1], 4);
379 				lws_ser_wu32be(&s[3], (uint32_t)h->writeable_len);
380 				h->pending_writeable_len = 0;
381 				n = 7;
382 				goto req_write_and_issue;
383 			}
384 
385 			if (h->conn_req_state >= LWSSSPC_ONW_ONGOING) {
386 				lwsl_info("%s: conn_req_state %d\n", __func__,
387 						h->conn_req_state);
388 				break;
389 			}
390 
391 			lwsl_info("%s: (local_conn) onward connect\n", __func__);
392 
393 			h->conn_req_state = LWSSSPC_ONW_ONGOING;
394 
395 			s[0] = LWSSS_SER_TXPRE_ONWARD_CONNECT;
396 			s[1] = 0;
397 			s[2] = 0;
398 			n = 3;
399 			break;
400 
401 		case LPCSCLI_OPERATIONAL:
402 
403 			/*
404 			 *
405 			 * - Do we need to prioritize sending any metadata
406 			 *   changes?  (includes txcr updates)
407 			 *
408 			 * - Do we need to forward a hint about the payload
409 			 *   length?
410 			 */
411 
412 			pkt = lws_malloc(pktsize + LWS_PRE, __func__);
413 			if (!pkt)
414 				goto hangup;
415 			cp = p = pkt + LWS_PRE;
416 			end = p + pktsize;
417 
418 			if (h->metadata_owner.count) {
419 				lws_sspc_metadata_t *md = lws_container_of(
420 					lws_dll2_get_tail(&h->metadata_owner),
421 					lws_sspc_metadata_t, list);
422 
423 				n = lws_sspc_serialize_metadata(md, p, end);
424 				if (n < 0)
425 					goto metadata_hangup;
426 
427 				goto req_write_and_issue;
428 			}
429 
430 			if (h->pending_writeable_len) {
431 				lwsl_info("%s: PAYLOAD_LENGTH_HINT %u\n",
432 					   __func__, (unsigned int)h->writeable_len);
433 				s[0] = LWSSS_SER_TXPRE_PAYLOAD_LENGTH_HINT;
434 				lws_ser_wu16be(&s[1], 4);
435 				lws_ser_wu32be(&s[3], (uint32_t)h->writeable_len);
436 				h->pending_writeable_len = 0;
437 				n = 7;
438 				goto req_write_and_issue;
439 			}
440 
441 			/* we can't write anything if we don't have credit */
442 			if (!h->ignore_txc && h->txc.tx_cr <= 0) {
443 				lwsl_info("%s: WRITEABLE / OPERATIONAL:"
444 					    " lack credit (%d)\n", __func__,
445 					    h->txc.tx_cr);
446 				// break;
447 			}
448 
449 			len = pktsize - LWS_PRE - 19;
450 			flags = 0;
451 			if (!h->ssi.tx) {
452 				n = 0;
453 				goto do_write_nz;
454 			}
455 
456 			n = h->ssi.tx(m, h->ord++, pkt + LWS_PRE + 19, &len,
457 				      &flags);
458 			switch (n) {
459 			case LWSSSSRET_TX_DONT_SEND:
460 				n = 0;
461 				goto do_write_nz;
462 
463 			case LWSSSSRET_DISCONNECT_ME:
464 			case LWSSSSRET_DESTROY_ME:
465 				lwsl_notice("%s: sspc tx DISCONNECT/DESTROY unimplemented\n", __func__);
466 				break;
467 			default:
468 				break;
469 			}
470 
471 			h->txc.tx_cr = h->txc.tx_cr - (int)len;
472 
473 			cp = p;
474 			n = (int)(len + 19);
475 			us = lws_now_usecs();
476 			p[0] = LWSSS_SER_TXPRE_TX_PAYLOAD;
477 			lws_ser_wu16be(&p[1], (uint16_t)(len + 19 - 3));
478 			lws_ser_wu32be(&p[3], (uint32_t)flags);
479 			/* time spent here waiting to send this */
480 			lws_ser_wu32be(&p[7], (uint32_t)(us - h->us_earliest_write_req));
481 			/* ust that the client write happened */
482 			lws_ser_wu64be(&p[11], (uint64_t)us);
483 			h->us_earliest_write_req = 0;
484 
485 			if (flags & LWSSS_FLAG_EOM)
486 				if (h->rsidx + 1 < (int)LWS_ARRAY_SIZE(h->rideshare_ofs) &&
487 				    h->rideshare_ofs[h->rsidx + 1])
488 					h->rsidx++;
489 
490 			break;
491 		default:
492 			break;
493 		}
494 
495 do_write_nz:
496 
497 		if (!n)
498 			break;
499 
500 do_write:
501 		if (lws_fi(&h->fic, "sspc_link_write_fail"))
502 			n = -1;
503 		else
504 			n = lws_write(wsi, (uint8_t *)cp, (unsigned int)n, LWS_WRITE_RAW);
505 		if (n < 0) {
506 			lwsl_notice("%s: WRITEABLE: %d\n", __func__, n);
507 
508 			goto hangup;
509 		}
510 		break;
511 
512 	default:
513 		break;
514 	}
515 
516 	lws_free(pkt);
517 
518 	return lws_callback_http_dummy(wsi, reason, user, in, len);
519 
520 metadata_hangup:
521 	lwsl_err("%s: metadata too large\n", __func__);
522 
523 hangup:
524 	lws_free(pkt);
525 	lwsl_warn("hangup\n");
526 	/* hang up on him */
527 	return -1;
528 
529 req_write_and_issue:
530 	/* in case anything else to write */
531 	lws_callback_on_writable(h->cwsi);
532 	goto do_write_nz;
533 }
534 
535 const struct lws_protocols lws_sspc_protocols[] = {
536 	{
537 		"ssproxy-protocol",
538 		callback_sspc_client,
539 		0,
540 		2048, 2048, NULL, 0
541 	},
542 	{ NULL, NULL, 0, 0, 0, NULL, 0 }
543 };
544 
545 int
lws_sspc_create(struct lws_context * context,int tsi,const lws_ss_info_t * ssi,void * opaque_user_data,lws_sspc_handle_t ** ppss,struct lws_sequencer * seq_owner,const char ** ppayload_fmt)546 lws_sspc_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
547 	        void *opaque_user_data, lws_sspc_handle_t **ppss,
548 	        struct lws_sequencer *seq_owner, const char **ppayload_fmt)
549 {
550 	lws_sspc_handle_t *h;
551 	uint8_t *ua;
552 	char *p;
553 
554 	/* allocate the handle (including ssi), the user alloc,
555 	 * and the streamname */
556 
557 	h = malloc(sizeof(lws_sspc_handle_t) + ssi->user_alloc +
558 				strlen(ssi->streamtype) + 1);
559 	if (!h)
560 		return 1;
561 	memset(h, 0, sizeof(*h));
562 
563 #if defined(LWS_WITH_SYS_FAULT_INJECTION)
564 	h->fic.name = "sspc";
565 	lws_xos_init(&h->fic.xos, lws_xos(&context->fic.xos));
566 	if (ssi->fic.fi_owner.count)
567 		lws_fi_import(&h->fic, &ssi->fic);
568 
569 	lws_fi_inherit_copy(&h->fic, &context->fic, "ss", ssi->streamtype);
570 #endif
571 
572 	if (lws_fi(&h->fic, "sspc_create_oom")) {
573 		/*
574 		 * We have to do this a litte later, so we can cleanly inherit
575 		 * the OOM pieces and drain the info fic
576 		 */
577 		lws_fi_destroy(&h->fic);
578 		free(h);
579 		return 1;
580 	}
581 
582 	__lws_lc_tag(&context->lcg[LWSLCG_SSP_CLIENT], &h->lc, ssi->streamtype);
583 
584 	memcpy(&h->ssi, ssi, sizeof(*ssi));
585 	ua = (uint8_t *)&h[1];
586 	memset(ua, 0, ssi->user_alloc);
587 	p = (char *)ua + ssi->user_alloc;
588 	memcpy(p, ssi->streamtype, strlen(ssi->streamtype) + 1);
589 	h->ssi.streamtype = (const char *)p;
590 	h->context = context;
591 
592 	if (!ssi->manual_initial_tx_credit)
593 		h->txc.peer_tx_cr_est = 500000000;
594 	else
595 		h->txc.peer_tx_cr_est = ssi->manual_initial_tx_credit;
596 
597 	if (!strcmp(ssi->streamtype, LWS_SMD_STREAMTYPENAME))
598 		h->ignore_txc = 1;
599 
600 	lws_dll2_add_head(&h->client_list, &context->pt[tsi].ss_client_owner);
601 
602 	/* fill in the things the real api does for the caller */
603 
604 	*((void **)(ua + ssi->opaque_user_data_offset)) = opaque_user_data;
605 	*((void **)(ua + ssi->handle_offset)) = h;
606 
607 	if (ppss)
608 		*ppss = h;
609 
610 	/* try the actual connect */
611 
612 	lws_sspc_sul_retry_cb(&h->sul_retry);
613 
614 	return 0;
615 }
616 
617 /* used on context destroy when iterating listed lws_ss on a pt */
618 
619 int
lws_sspc_destroy_dll(struct lws_dll2 * d,void * user)620 lws_sspc_destroy_dll(struct lws_dll2 *d, void *user)
621 {
622 	lws_sspc_handle_t *h = lws_container_of(d, lws_sspc_handle_t, client_list);
623 
624 	lws_sspc_destroy(&h);
625 
626 	return 0;
627 }
628 
629 void
lws_sspc_rxmetadata_destroy(lws_sspc_handle_t * h)630 lws_sspc_rxmetadata_destroy(lws_sspc_handle_t *h)
631 {
632 	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
633 			lws_dll2_get_head(&h->metadata_owner_rx)) {
634 		lws_sspc_metadata_t *md =
635 				lws_container_of(d, lws_sspc_metadata_t, list);
636 
637 		lws_dll2_remove(&md->list);
638 		lws_free(md);
639 
640 	} lws_end_foreach_dll_safe(d, d1);
641 }
642 
643 void
lws_sspc_destroy(lws_sspc_handle_t ** ph)644 lws_sspc_destroy(lws_sspc_handle_t **ph)
645 {
646 	lws_sspc_handle_t *h;
647 
648 	lwsl_debug("%s\n", __func__);
649 
650 	if (!*ph)
651 		return;
652 
653 	h = *ph;
654 
655 	if (h->destroying)
656 		return;
657 
658 	h->destroying = 1;
659 
660 	/* if this caliper is still dangling at destroy, we failed */
661 #if defined(LWS_WITH_SYS_METRICS)
662 	/*
663 	 * If any hanging caliper measurement, dump it, and free any tags
664 	 */
665 	lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
666 #endif
667 	if (h->ss_dangling_connected && h->ssi.state) {
668 		lws_sspc_event_helper(h, LWSSSCS_DISCONNECTED, 0);
669 		h->ss_dangling_connected = 0;
670 	}
671 
672 #if defined(LWS_WITH_SYS_FAULT_INJECTION)
673 	lws_fi_destroy(&h->fic);
674 #endif
675 
676 	lws_sul_cancel(&h->sul_retry);
677 	lws_dll2_remove(&h->client_list);
678 
679 	if (h->dsh)
680 		lws_dsh_destroy(&h->dsh);
681 	if (h->cwsi) {
682 		lws_set_opaque_user_data(h->cwsi, NULL);
683 		lws_wsi_close(h->cwsi, LWS_TO_KILL_ASYNC);
684 		h->cwsi = NULL;
685 	}
686 
687 	/* clean out any pending metadata changes that didn't make it */
688 
689 	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
690 			lws_dll2_get_head(&(*ph)->metadata_owner)) {
691 		lws_sspc_metadata_t *md =
692 				lws_container_of(d, lws_sspc_metadata_t, list);
693 
694 		lws_dll2_remove(&md->list);
695 		lws_free(md);
696 
697 	} lws_end_foreach_dll_safe(d, d1);
698 
699 	lws_sspc_rxmetadata_destroy(h);
700 
701 	lws_sspc_event_helper(h, LWSSSCS_DESTROYING, 0);
702 	*ph = NULL;
703 
704 	lws_sul_cancel(&h->sul_retry);
705 
706 	__lws_lc_untag(&h->lc);
707 	free(h);
708 }
709 
710 lws_ss_state_return_t
lws_sspc_request_tx(lws_sspc_handle_t * h)711 lws_sspc_request_tx(lws_sspc_handle_t *h)
712 {
713 	if (!h || !h->cwsi)
714 		return LWSSSSRET_OK;
715 
716 	if (!h->us_earliest_write_req)
717 		h->us_earliest_write_req = lws_now_usecs();
718 
719 	if (h->state == LPCSCLI_LOCAL_CONNECTED &&
720 	    h->conn_req_state == LWSSSPC_ONW_NONE)
721 		h->conn_req_state = LWSSSPC_ONW_REQ;
722 
723 	lws_callback_on_writable(h->cwsi);
724 
725 	return LWSSSSRET_OK;
726 }
727 
728 /*
729  * Currently we fulfil the writeable part locally by just enabling POLLOUT on
730  * the UDS link, without serialization footprint, which is reasonable as far as
731  * it goes.
732  *
733  * But for the ..._len() variant, the expected payload length hint we are being
734  * told is something that must be serialized to the onward peer, since either
735  * that guy or someone upstream of him is the guy who will compose the framing
736  * with it that actually goes out.
737  *
738  * This information is needed at the upstream guy before we have sent any
739  * payload, eg, for http POST, he has to prepare the content-length in the
740  * headers, before any payload.  So we have to issue a serialization of the
741  * length at this point.
742  */
743 
744 lws_ss_state_return_t
lws_sspc_request_tx_len(lws_sspc_handle_t * h,unsigned long len)745 lws_sspc_request_tx_len(lws_sspc_handle_t *h, unsigned long len)
746 {
747 	/*
748 	 * for client conns, they cannot even complete creation of the handle
749 	 * without the onwared connection to the proxy, it's not legal to start
750 	 * using it until it's operation and has the onward connection (and the
751 	 * link has called CREATED state)
752 	 */
753 
754 	if (!h)
755 		return LWSSSSRET_OK;
756 
757 	lwsl_notice("%s: setting %s writeable_len %u\n", __func__, h->lc.gutag,
758 			(unsigned int)len);
759 	h->writeable_len = len;
760 	h->pending_writeable_len = 1;
761 
762 	if (!h->us_earliest_write_req)
763 		h->us_earliest_write_req = lws_now_usecs();
764 
765 	if (h->state == LPCSCLI_LOCAL_CONNECTED &&
766 	    h->conn_req_state == LWSSSPC_ONW_NONE)
767 		h->conn_req_state = LWSSSPC_ONW_REQ;
768 
769 	/*
770 	 * We're going to use this up with serializing h->writeable_len... that
771 	 * will request again.
772 	 */
773 
774 	if (h->cwsi)
775 		lws_callback_on_writable(h->cwsi);
776 
777 	return LWSSSSRET_OK;
778 }
779 
780 int
lws_sspc_client_connect(lws_sspc_handle_t * h)781 lws_sspc_client_connect(lws_sspc_handle_t *h)
782 {
783 	if (!h || h->state == LPCSCLI_OPERATIONAL)
784 		return 0;
785 
786 	assert(h->state == LPCSCLI_LOCAL_CONNECTED);
787 	if (h->state == LPCSCLI_LOCAL_CONNECTED &&
788 	    h->conn_req_state == LWSSSPC_ONW_NONE)
789 		h->conn_req_state = LWSSSPC_ONW_REQ;
790 	if (h->cwsi)
791 		lws_callback_on_writable(h->cwsi);
792 
793 	return 0;
794 }
795 
796 struct lws_context *
lws_sspc_get_context(struct lws_sspc_handle * h)797 lws_sspc_get_context(struct lws_sspc_handle *h)
798 {
799 	return h->context;
800 }
801 
802 const char *
lws_sspc_rideshare(struct lws_sspc_handle * h)803 lws_sspc_rideshare(struct lws_sspc_handle *h)
804 {
805 	/*
806 	 * ...the serialized RX rideshare name if any...
807 	 */
808 
809 	if (h->parser.rideshare[0]) {
810 		lwsl_info("%s: parser %s\n", __func__, h->parser.rideshare);
811 		return h->parser.rideshare;
812 	}
813 
814 	/*
815 	 * The tx rideshare index
816 	 */
817 
818 	if (h->rideshare_list[0]) {
819 		lwsl_info("%s: tx list %s\n", __func__,
820 			  &h->rideshare_list[h->rideshare_ofs[h->rsidx]]);
821 		return &h->rideshare_list[h->rideshare_ofs[h->rsidx]];
822 	}
823 
824 	/*
825 	 * ... otherwise default to our stream type name
826 	 */
827 
828 	lwsl_info("%s: def %s\n", __func__, h->ssi.streamtype);
829 
830 	return h->ssi.streamtype;
831 }
832 
833 static int
_lws_sspc_set_metadata(struct lws_sspc_handle * h,const char * name,const void * value,size_t len,int tx_cr_adjust)834 _lws_sspc_set_metadata(struct lws_sspc_handle *h, const char *name,
835 		       const void *value, size_t len, int tx_cr_adjust)
836 {
837 	lws_sspc_metadata_t *md;
838 
839 	/*
840 	 * Are we replacing a pending metadata of the same name?  It's not
841 	 * efficient to do this but user code can do what it likes... let's
842 	 * optimize away the old one.
843 	 *
844 	 * Tx credit adjust always has name ""
845 	 */
846 
847 	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
848 				   lws_dll2_get_head(&h->metadata_owner)) {
849 		md = lws_container_of(d, lws_sspc_metadata_t, list);
850 
851 		if (!strcmp(name, md->name)) {
852 			lws_dll2_remove(&md->list);
853 			lws_free(md);
854 			break;
855 		}
856 
857 	} lws_end_foreach_dll_safe(d, d1);
858 
859 	/*
860 	 * We have to stash the metadata and pass it to the proxy
861 	 */
862 
863 	if (lws_fi(&h->fic, "sspc_fail_metadata_set"))
864 		md = NULL;
865 	else
866 		md = lws_malloc(sizeof(*md) + len, "set metadata");
867 	if (!md) {
868 		lwsl_err("%s: OOM\n", __func__);
869 
870 		return 1;
871 	}
872 
873 	memset(md, 0, sizeof(*md));
874 
875 	md->tx_cr_adjust = tx_cr_adjust;
876 	h->txc.peer_tx_cr_est += tx_cr_adjust;
877 
878 	lws_strncpy(md->name, name, sizeof(md->name));
879 	md->len = len;
880 	if (len)
881 		memcpy(&md[1], value, len);
882 
883 	lws_dll2_add_tail(&md->list, &h->metadata_owner);
884 
885 	if (len) {
886 		lwsl_info("%s: set metadata %s\n", __func__, name);
887 		lwsl_hexdump_info(value, len);
888 	} else
889 		lwsl_info("%s: serializing tx cr adj %d\n", __func__,
890 			    (int)tx_cr_adjust);
891 
892 	if (h->cwsi)
893 		lws_callback_on_writable(h->cwsi);
894 
895 	return 0;
896 }
897 
898 int
lws_sspc_set_metadata(struct lws_sspc_handle * h,const char * name,const void * value,size_t len)899 lws_sspc_set_metadata(struct lws_sspc_handle *h, const char *name,
900 		      const void *value, size_t len)
901 {
902 	return _lws_sspc_set_metadata(h, name, value, len, 0);
903 }
904 
905 int
lws_sspc_get_metadata(struct lws_sspc_handle * h,const char * name,const void ** value,size_t * len)906 lws_sspc_get_metadata(struct lws_sspc_handle *h, const char *name,
907 		      const void **value, size_t *len)
908 {
909 	lws_sspc_metadata_t *md;
910 
911 	/*
912 	 * client side does not have access to policy
913 	 * and any metadata are new to it each time,
914 	 * we allocate them, removing any existing with
915 	 * the same name first
916 	 */
917 
918 	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
919 			lws_dll2_get_head(&h->metadata_owner_rx)) {
920 		md = lws_container_of(d,
921 			   lws_sspc_metadata_t, list);
922 
923 		if (!strcmp(md->name, name)) {
924 			*len = md->len;
925 			*value = &md[1];
926 
927 			return 0;
928 		}
929 
930 	} lws_end_foreach_dll_safe(d, d1);
931 
932 	return 1;
933 }
934 
935 int
lws_sspc_add_peer_tx_credit(struct lws_sspc_handle * h,int32_t bump)936 lws_sspc_add_peer_tx_credit(struct lws_sspc_handle *h, int32_t bump)
937 {
938 	lwsl_notice("%s: %d\n", __func__, bump);
939 	return _lws_sspc_set_metadata(h, "", NULL, 0, (int)bump);
940 }
941 
942 int
lws_sspc_get_est_peer_tx_credit(struct lws_sspc_handle * h)943 lws_sspc_get_est_peer_tx_credit(struct lws_sspc_handle *h)
944 {
945 	return h->txc.peer_tx_cr_est;
946 }
947 
948 void
lws_sspc_start_timeout(struct lws_sspc_handle * h,unsigned int timeout_ms)949 lws_sspc_start_timeout(struct lws_sspc_handle *h, unsigned int timeout_ms)
950 {
951 	if (!h->cwsi)
952 		/* we can't fulfil it */
953 		return;
954 	h->timeout_ms = (uint32_t)timeout_ms;
955 	h->pending_timeout_update = 1;
956 	lws_callback_on_writable(h->cwsi);
957 }
958 
959 void
lws_sspc_cancel_timeout(struct lws_sspc_handle * h)960 lws_sspc_cancel_timeout(struct lws_sspc_handle *h)
961 {
962 	lws_sspc_start_timeout(h, (unsigned int)-1);
963 }
964 
965 void *
lws_sspc_to_user_object(struct lws_sspc_handle * h)966 lws_sspc_to_user_object(struct lws_sspc_handle *h)
967 {
968 	return (void *)&h[1];
969 }
970 
971 void
lws_sspc_change_handlers(struct lws_sspc_handle * h,lws_ss_state_return_t (* rx)(void * userobj,const uint8_t * buf,size_t len,int flags),lws_ss_state_return_t (* tx)(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags),lws_ss_state_return_t (* state)(void * userobj,void * h_src,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack))972 lws_sspc_change_handlers(struct lws_sspc_handle *h,
973 	lws_ss_state_return_t (*rx)(void *userobj, const uint8_t *buf, size_t len, int flags),
974 	lws_ss_state_return_t (*tx)(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
975 		  size_t *len, int *flags),
976 	lws_ss_state_return_t (*state)(void *userobj, void *h_src /* ss handle type */,
977 		     lws_ss_constate_t state, lws_ss_tx_ordinal_t ack))
978 {
979 	if (rx)
980 		h->ssi.rx = rx;
981 	if (tx)
982 		h->ssi.tx = tx;
983 	if (state)
984 		h->ssi.state = state;
985 }
986 
987 const char *
lws_sspc_tag(struct lws_sspc_handle * h)988 lws_sspc_tag(struct lws_sspc_handle *h)
989 {
990 	if (!h)
991 		return "[null sspc]";
992 	return lws_lc_tag(&h->lc);
993 }
994