1 /*
2 * lws-minimal-secure-streams-client
3 *
4 * Written in 2010-2020 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 *
10 * This client does not perform any INET networking... instead it opens a unix
11 * domain socket on a proxy that is listening for it, and that creates the
12 * actual secure stream connection.
13 *
14 * We are able to use the usual secure streams api in the client process, with
15 * payloads and connection state information proxied over the unix domain
16 * socket and fulfilled in the proxy process.
17 *
18 * The public client helper pieces are built as part of lws
19 */
20 #include <private-lib-core.h>
21
22 lws_ss_state_return_t
lws_sspc_event_helper(lws_sspc_handle_t * h,lws_ss_constate_t cs,lws_ss_tx_ordinal_t flags)23 lws_sspc_event_helper(lws_sspc_handle_t *h, lws_ss_constate_t cs,
24 lws_ss_tx_ordinal_t flags)
25 {
26 if (!h)
27 return LWSSSSRET_OK;
28
29 if (lws_ss_check_next_state(&h->lc, &h->prev_ss_state, cs))
30 return LWSSSSRET_DESTROY_ME;
31
32 if (!h->ssi.state)
33 return LWSSSSRET_OK;
34
35 return h->ssi.state((void *)((uint8_t *)&h[1]), NULL, cs, flags);
36 }
37
38 static void
lws_sspc_sul_retry_cb(lws_sorted_usec_list_t * sul)39 lws_sspc_sul_retry_cb(lws_sorted_usec_list_t *sul)
40 {
41 lws_sspc_handle_t *h = lws_container_of(sul, lws_sspc_handle_t, sul_retry);
42 static struct lws_client_connect_info i;
43
44 /*
45 * We may have started up before the system proxy, so be prepared with
46 * a sul to retry at 1Hz
47 */
48
49 memset(&i, 0, sizeof i);
50 i.context = h->context;
51 if (h->context->ss_proxy_port) { /* tcp */
52 i.address = h->context->ss_proxy_address;
53 i.port = h->context->ss_proxy_port;
54 i.iface = h->context->ss_proxy_bind;
55 } else {
56 if (h->context->ss_proxy_bind)
57 i.address = h->context->ss_proxy_bind;
58 else
59 #if defined(__linux__)
60 i.address = "+@proxy.ss.lws";
61 #else
62 i.address = "+/tmp/proxy.ss.lws";
63 #endif
64 }
65 i.host = i.address;
66 i.origin = i.address;
67 i.method = "RAW";
68 i.protocol = lws_sspc_protocols[0].name;
69 i.local_protocol_name = lws_sspc_protocols[0].name;
70 i.path = "";
71 i.pwsi = &h->cwsi;
72 i.opaque_user_data = (void *)h;
73 i.ssl_connection = LCCSCF_SECSTREAM_PROXY_LINK;
74
75 lws_metrics_caliper_bind(h->cal_txn, h->context->mt_ss_cliprox_conn);
76 #if defined(LWS_WITH_SYS_METRICS)
77 lws_metrics_tag_add(&h->cal_txn.mtags_owner, "ss", h->ssi.streamtype);
78 #endif
79
80 /* this wsi is the link to the proxy */
81
82 if (!lws_client_connect_via_info(&i)) {
83
84 #if defined(LWS_WITH_SYS_METRICS)
85 /*
86 * If any hanging caliper measurement, dump it, and free any tags
87 */
88 lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
89 #endif
90
91 lws_sul_schedule(h->context, 0, &h->sul_retry,
92 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
93
94 return;
95 }
96
97 lwsl_notice("%s: %s\n", __func__, h->cwsi->lc.gutag);
98 }
99
100 static int
lws_sspc_serialize_metadata(lws_sspc_metadata_t * md,uint8_t * p,uint8_t * end)101 lws_sspc_serialize_metadata(lws_sspc_metadata_t *md, uint8_t *p, uint8_t *end)
102 {
103 int n, txc;
104
105 if (md->name[0] == '\0') {
106
107 lwsl_info("%s: sending tx credit update %d\n", __func__,
108 md->tx_cr_adjust);
109
110 p[0] = LWSSS_SER_TXPRE_TXCR_UPDATE;
111 lws_ser_wu16be(&p[1], 4);
112 lws_ser_wu32be(&p[3], (uint32_t)md->tx_cr_adjust);
113
114 n = 7;
115
116 } else {
117
118 lwsl_info("%s: sending metadata\n", __func__);
119
120 p[0] = LWSSS_SER_TXPRE_METADATA;
121 txc = (int)strlen(md->name);
122 n = txc + 1 + (int)md->len;
123 if (n > 0xffff)
124 /* we can't serialize this metadata in 16b length */
125 return -1;
126 if (n > lws_ptr_diff(end, &p[4]))
127 /* we don't have space for this metadata */
128 return -1;
129 lws_ser_wu16be(&p[1], (uint16_t)n);
130 p[3] = (uint8_t)txc;
131 memcpy(&p[4], md->name, (unsigned int)txc);
132 memcpy(&p[4 + txc], &md[1], md->len);
133 n = 4 + txc + (int)md->len;
134 }
135
136 lws_dll2_remove(&md->list);
137 lws_free(md);
138
139 return n;
140 }
141
142 static int
callback_sspc_client(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)143 callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason,
144 void *user, void *in, size_t len)
145 {
146 lws_sspc_handle_t *h = (lws_sspc_handle_t *)lws_get_opaque_user_data(wsi);
147 size_t pktsize = wsi->a.context->max_http_header_data;
148 void *m = (void *)((uint8_t *)&h[1]);
149 uint8_t *pkt = NULL, *p = NULL, *end = NULL;
150 const uint8_t *cp;
151 uint8_t s[64];
152 lws_usec_t us;
153 int flags, n;
154
155 switch (reason) {
156 case LWS_CALLBACK_PROTOCOL_INIT:
157 break;
158
159 case LWS_CALLBACK_PROTOCOL_DESTROY:
160 break;
161
162 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
163 lwsl_warn("%s: CONNECTION_ERROR\n", __func__);
164 #if defined(LWS_WITH_SYS_METRICS)
165 /*
166 * If any hanging caliper measurement, dump it, and free any tags
167 */
168 lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
169 #endif
170 lws_set_opaque_user_data(wsi, NULL);
171 h->cwsi = NULL;
172 lws_sul_schedule(h->context, 0, &h->sul_retry,
173 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
174 break;
175
176 case LWS_CALLBACK_RAW_CONNECTED:
177 if (!h || lws_fi(&h->fic, "sspc_fail_on_linkup"))
178 return -1;
179 lwsl_info("%s: CONNECTED (%s)\n", __func__, h->ssi.streamtype);
180
181 h->state = LPCSCLI_SENDING_INITIAL_TX;
182 /*
183 * We create the dsh at the response to the initial tx, which
184 * will let us know the policy's max size for it... let's
185 * protect the connection with a promise to complete the
186 * SS serialization streamtype negotation within a short period,
187 * we will cancel this timeout when we have the proxy's ack
188 * of the streamtype serialization, eg, it exists in the proxy
189 * policy etc
190 */
191 lws_set_timeout(wsi, PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3);
192 lws_callback_on_writable(wsi);
193 break;
194
195 case LWS_CALLBACK_RAW_CLOSE:
196 /*
197 * our ss proxy Unix Domain socket has closed...
198 */
199 lwsl_info("%s: LWS_CALLBACK_RAW_CLOSE: %s proxy conn down, sspc h %s\n",
200 __func__, lws_wsi_tag(wsi), lws_sspc_tag(h));
201 if (!h) {
202 lwsl_info("%s: no sspc on client proxy link close\n", __func__);
203 break;
204 }
205
206 lws_dsh_destroy(&h->dsh);
207 if (h->ss_dangling_connected && h->ssi.state) {
208 lws_ss_state_return_t ret_state;
209
210 lwsl_notice("%s: setting _DISCONNECTED\n", __func__);
211 h->ss_dangling_connected = 0;
212 h->prev_ss_state = LWSSSCS_DISCONNECTED;
213 ret_state = h->ssi.state(ss_to_userobj(h), NULL,
214 LWSSSCS_DISCONNECTED, 0);
215 if (ret_state == LWSSSSRET_DESTROY_ME) {
216 h->cwsi = NULL;
217 lws_set_opaque_user_data(wsi, NULL);
218 lws_sspc_destroy(&h);
219 break;
220 }
221 }
222
223 h->cwsi = NULL;
224 /*
225 * schedule a reconnect in 1s
226 */
227 lws_sul_schedule(h->context, 0, &h->sul_retry,
228 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
229
230 break;
231
232 case LWS_CALLBACK_RAW_RX:
233 /*
234 * ie, the proxy has sent us something
235 */
236 lwsl_info("%s: RAW_RX: rx %d\n", __func__, (int)len);
237
238 if (!h || !h->cwsi) {
239 lwsl_info("%s: rx when client ss destroyed\n", __func__);
240
241 return -1;
242 }
243
244 if (!len) {
245 lwsl_notice("%s: RAW_RX: zero len\n", __func__);
246
247 return -1;
248 }
249
250 if (lws_fi(&h->fic, "sspc_fake_rxparse_disconnect_me"))
251 n = LWSSSSRET_DISCONNECT_ME;
252 else
253 if (lws_fi(&h->fic, "sspc_fake_rxparse_destroy_me"))
254 n = LWSSSSRET_DESTROY_ME;
255 else
256 n = lws_ss_deserialize_parse(&h->parser,
257 lws_get_context(wsi),
258 h->dsh, in, len,
259 &h->state, h,
260 (lws_ss_handle_t **)m,
261 &h->ssi, 1);
262 switch (n) {
263 case LWSSSSRET_OK:
264 break;
265 case LWSSSSRET_DISCONNECT_ME:
266 lwsl_notice("%s: proxlicent RX ended with DISCONNECT_ME\n",
267 __func__);
268 return -1;
269 case LWSSSSRET_DESTROY_ME:
270 lwsl_notice("%s: proxlicent RX ended with DESTROY_ME\n",
271 __func__);
272 lws_set_opaque_user_data(wsi, NULL);
273 lws_sspc_destroy(&h);
274 return -1;
275 }
276
277 if (h->state == LPCSCLI_LOCAL_CONNECTED ||
278 h->state == LPCSCLI_ONWARD_CONNECT)
279 lws_set_timeout(wsi, 0, 0);
280
281 break;
282
283 case LWS_CALLBACK_RAW_WRITEABLE:
284
285 /*
286 * We can transmit something to the proxy...
287 */
288
289 if (!h)
290 break;
291
292 lwsl_debug("%s: WRITEABLE %s, state %d\n", __func__,
293 wsi->lc.gutag, h->state);
294
295 /*
296 * Management of ss timeout can happen any time and doesn't
297 * depend on wsi existence or state
298 */
299
300 n = 0;
301 cp = s;
302
303 if (h->pending_timeout_update) {
304 s[0] = LWSSS_SER_TXPRE_TIMEOUT_UPDATE;
305 s[1] = 0;
306 s[2] = 4;
307 /*
308 * 0: use policy timeout value
309 * 0xffffffff: cancel the timeout
310 */
311 lws_ser_wu32be(&s[3], h->timeout_ms);
312 /* in case anything else to write */
313 lws_callback_on_writable(h->cwsi);
314 h->pending_timeout_update = 0;
315 n = 7;
316 goto do_write;
317 }
318
319 s[1] = 0;
320 /*
321 * This is the state of the link that connects us to the onward
322 * proxy
323 */
324 switch (h->state) {
325 case LPCSCLI_SENDING_INITIAL_TX:
326 /*
327 * We are negotating the opening of a particular
328 * streamtype
329 */
330 n = (int)strlen(h->ssi.streamtype) + 1 + 4 + 4;
331
332 s[0] = LWSSS_SER_TXPRE_STREAMTYPE;
333 lws_ser_wu16be(&s[1], (uint16_t)n);
334 /* SSSv1: add protocol version byte (initially 1) */
335 s[3] = (uint8_t)LWS_SSS_CLIENT_PROTOCOL_VERSION;
336 lws_ser_wu32be(&s[4], (uint32_t)getpid());
337 lws_ser_wu32be(&s[8], (uint32_t)h->txc.peer_tx_cr_est);
338 //h->txcr_out = txc;
339 lws_strncpy((char *)&s[12], h->ssi.streamtype, sizeof(s) - 12);
340 n += 3;
341 h->state = LPCSCLI_WAITING_CREATE_RESULT;
342
343 break;
344
345 case LPCSCLI_LOCAL_CONNECTED:
346
347 // lwsl_notice("%s: LPCSCLI_LOCAL_CONNECTED\n", __func__);
348
349 /*
350 * Do we need to prioritize sending any metadata
351 * changes?
352 */
353
354 if (h->metadata_owner.count) {
355 lws_sspc_metadata_t *md = lws_container_of(
356 lws_dll2_get_tail(&h->metadata_owner),
357 lws_sspc_metadata_t, list);
358
359 pkt = lws_malloc(pktsize + LWS_PRE, __func__);
360 if (!pkt)
361 goto hangup;
362 cp = p = pkt + LWS_PRE;
363 end = p + pktsize;
364
365 n = lws_sspc_serialize_metadata(md, p, end);
366 if (n < 0)
367 goto metadata_hangup;
368
369 lwsl_debug("%s: (local_conn) metadata\n", __func__);
370
371 goto req_write_and_issue;
372 }
373
374 if (h->pending_writeable_len) {
375 lwsl_debug("%s: (local_conn) PAYLOAD_LENGTH_HINT %u\n",
376 __func__, (unsigned int)h->writeable_len);
377 s[0] = LWSSS_SER_TXPRE_PAYLOAD_LENGTH_HINT;
378 lws_ser_wu16be(&s[1], 4);
379 lws_ser_wu32be(&s[3], (uint32_t)h->writeable_len);
380 h->pending_writeable_len = 0;
381 n = 7;
382 goto req_write_and_issue;
383 }
384
385 if (h->conn_req_state >= LWSSSPC_ONW_ONGOING) {
386 lwsl_info("%s: conn_req_state %d\n", __func__,
387 h->conn_req_state);
388 break;
389 }
390
391 lwsl_info("%s: (local_conn) onward connect\n", __func__);
392
393 h->conn_req_state = LWSSSPC_ONW_ONGOING;
394
395 s[0] = LWSSS_SER_TXPRE_ONWARD_CONNECT;
396 s[1] = 0;
397 s[2] = 0;
398 n = 3;
399 break;
400
401 case LPCSCLI_OPERATIONAL:
402
403 /*
404 *
405 * - Do we need to prioritize sending any metadata
406 * changes? (includes txcr updates)
407 *
408 * - Do we need to forward a hint about the payload
409 * length?
410 */
411
412 pkt = lws_malloc(pktsize + LWS_PRE, __func__);
413 if (!pkt)
414 goto hangup;
415 cp = p = pkt + LWS_PRE;
416 end = p + pktsize;
417
418 if (h->metadata_owner.count) {
419 lws_sspc_metadata_t *md = lws_container_of(
420 lws_dll2_get_tail(&h->metadata_owner),
421 lws_sspc_metadata_t, list);
422
423 n = lws_sspc_serialize_metadata(md, p, end);
424 if (n < 0)
425 goto metadata_hangup;
426
427 goto req_write_and_issue;
428 }
429
430 if (h->pending_writeable_len) {
431 lwsl_info("%s: PAYLOAD_LENGTH_HINT %u\n",
432 __func__, (unsigned int)h->writeable_len);
433 s[0] = LWSSS_SER_TXPRE_PAYLOAD_LENGTH_HINT;
434 lws_ser_wu16be(&s[1], 4);
435 lws_ser_wu32be(&s[3], (uint32_t)h->writeable_len);
436 h->pending_writeable_len = 0;
437 n = 7;
438 goto req_write_and_issue;
439 }
440
441 /* we can't write anything if we don't have credit */
442 if (!h->ignore_txc && h->txc.tx_cr <= 0) {
443 lwsl_info("%s: WRITEABLE / OPERATIONAL:"
444 " lack credit (%d)\n", __func__,
445 h->txc.tx_cr);
446 // break;
447 }
448
449 len = pktsize - LWS_PRE - 19;
450 flags = 0;
451 if (!h->ssi.tx) {
452 n = 0;
453 goto do_write_nz;
454 }
455
456 n = h->ssi.tx(m, h->ord++, pkt + LWS_PRE + 19, &len,
457 &flags);
458 switch (n) {
459 case LWSSSSRET_TX_DONT_SEND:
460 n = 0;
461 goto do_write_nz;
462
463 case LWSSSSRET_DISCONNECT_ME:
464 case LWSSSSRET_DESTROY_ME:
465 lwsl_notice("%s: sspc tx DISCONNECT/DESTROY unimplemented\n", __func__);
466 break;
467 default:
468 break;
469 }
470
471 h->txc.tx_cr = h->txc.tx_cr - (int)len;
472
473 cp = p;
474 n = (int)(len + 19);
475 us = lws_now_usecs();
476 p[0] = LWSSS_SER_TXPRE_TX_PAYLOAD;
477 lws_ser_wu16be(&p[1], (uint16_t)(len + 19 - 3));
478 lws_ser_wu32be(&p[3], (uint32_t)flags);
479 /* time spent here waiting to send this */
480 lws_ser_wu32be(&p[7], (uint32_t)(us - h->us_earliest_write_req));
481 /* ust that the client write happened */
482 lws_ser_wu64be(&p[11], (uint64_t)us);
483 h->us_earliest_write_req = 0;
484
485 if (flags & LWSSS_FLAG_EOM)
486 if (h->rsidx + 1 < (int)LWS_ARRAY_SIZE(h->rideshare_ofs) &&
487 h->rideshare_ofs[h->rsidx + 1])
488 h->rsidx++;
489
490 break;
491 default:
492 break;
493 }
494
495 do_write_nz:
496
497 if (!n)
498 break;
499
500 do_write:
501 if (lws_fi(&h->fic, "sspc_link_write_fail"))
502 n = -1;
503 else
504 n = lws_write(wsi, (uint8_t *)cp, (unsigned int)n, LWS_WRITE_RAW);
505 if (n < 0) {
506 lwsl_notice("%s: WRITEABLE: %d\n", __func__, n);
507
508 goto hangup;
509 }
510 break;
511
512 default:
513 break;
514 }
515
516 lws_free(pkt);
517
518 return lws_callback_http_dummy(wsi, reason, user, in, len);
519
520 metadata_hangup:
521 lwsl_err("%s: metadata too large\n", __func__);
522
523 hangup:
524 lws_free(pkt);
525 lwsl_warn("hangup\n");
526 /* hang up on him */
527 return -1;
528
529 req_write_and_issue:
530 /* in case anything else to write */
531 lws_callback_on_writable(h->cwsi);
532 goto do_write_nz;
533 }
534
535 const struct lws_protocols lws_sspc_protocols[] = {
536 {
537 "ssproxy-protocol",
538 callback_sspc_client,
539 0,
540 2048, 2048, NULL, 0
541 },
542 { NULL, NULL, 0, 0, 0, NULL, 0 }
543 };
544
545 int
lws_sspc_create(struct lws_context * context,int tsi,const lws_ss_info_t * ssi,void * opaque_user_data,lws_sspc_handle_t ** ppss,struct lws_sequencer * seq_owner,const char ** ppayload_fmt)546 lws_sspc_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
547 void *opaque_user_data, lws_sspc_handle_t **ppss,
548 struct lws_sequencer *seq_owner, const char **ppayload_fmt)
549 {
550 lws_sspc_handle_t *h;
551 uint8_t *ua;
552 char *p;
553
554 /* allocate the handle (including ssi), the user alloc,
555 * and the streamname */
556
557 h = malloc(sizeof(lws_sspc_handle_t) + ssi->user_alloc +
558 strlen(ssi->streamtype) + 1);
559 if (!h)
560 return 1;
561 memset(h, 0, sizeof(*h));
562
563 #if defined(LWS_WITH_SYS_FAULT_INJECTION)
564 h->fic.name = "sspc";
565 lws_xos_init(&h->fic.xos, lws_xos(&context->fic.xos));
566 if (ssi->fic.fi_owner.count)
567 lws_fi_import(&h->fic, &ssi->fic);
568
569 lws_fi_inherit_copy(&h->fic, &context->fic, "ss", ssi->streamtype);
570 #endif
571
572 if (lws_fi(&h->fic, "sspc_create_oom")) {
573 /*
574 * We have to do this a litte later, so we can cleanly inherit
575 * the OOM pieces and drain the info fic
576 */
577 lws_fi_destroy(&h->fic);
578 free(h);
579 return 1;
580 }
581
582 __lws_lc_tag(&context->lcg[LWSLCG_SSP_CLIENT], &h->lc, ssi->streamtype);
583
584 memcpy(&h->ssi, ssi, sizeof(*ssi));
585 ua = (uint8_t *)&h[1];
586 memset(ua, 0, ssi->user_alloc);
587 p = (char *)ua + ssi->user_alloc;
588 memcpy(p, ssi->streamtype, strlen(ssi->streamtype) + 1);
589 h->ssi.streamtype = (const char *)p;
590 h->context = context;
591
592 if (!ssi->manual_initial_tx_credit)
593 h->txc.peer_tx_cr_est = 500000000;
594 else
595 h->txc.peer_tx_cr_est = ssi->manual_initial_tx_credit;
596
597 if (!strcmp(ssi->streamtype, LWS_SMD_STREAMTYPENAME))
598 h->ignore_txc = 1;
599
600 lws_dll2_add_head(&h->client_list, &context->pt[tsi].ss_client_owner);
601
602 /* fill in the things the real api does for the caller */
603
604 *((void **)(ua + ssi->opaque_user_data_offset)) = opaque_user_data;
605 *((void **)(ua + ssi->handle_offset)) = h;
606
607 if (ppss)
608 *ppss = h;
609
610 /* try the actual connect */
611
612 lws_sspc_sul_retry_cb(&h->sul_retry);
613
614 return 0;
615 }
616
617 /* used on context destroy when iterating listed lws_ss on a pt */
618
619 int
lws_sspc_destroy_dll(struct lws_dll2 * d,void * user)620 lws_sspc_destroy_dll(struct lws_dll2 *d, void *user)
621 {
622 lws_sspc_handle_t *h = lws_container_of(d, lws_sspc_handle_t, client_list);
623
624 lws_sspc_destroy(&h);
625
626 return 0;
627 }
628
629 void
lws_sspc_rxmetadata_destroy(lws_sspc_handle_t * h)630 lws_sspc_rxmetadata_destroy(lws_sspc_handle_t *h)
631 {
632 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
633 lws_dll2_get_head(&h->metadata_owner_rx)) {
634 lws_sspc_metadata_t *md =
635 lws_container_of(d, lws_sspc_metadata_t, list);
636
637 lws_dll2_remove(&md->list);
638 lws_free(md);
639
640 } lws_end_foreach_dll_safe(d, d1);
641 }
642
643 void
lws_sspc_destroy(lws_sspc_handle_t ** ph)644 lws_sspc_destroy(lws_sspc_handle_t **ph)
645 {
646 lws_sspc_handle_t *h;
647
648 lwsl_debug("%s\n", __func__);
649
650 if (!*ph)
651 return;
652
653 h = *ph;
654
655 if (h->destroying)
656 return;
657
658 h->destroying = 1;
659
660 /* if this caliper is still dangling at destroy, we failed */
661 #if defined(LWS_WITH_SYS_METRICS)
662 /*
663 * If any hanging caliper measurement, dump it, and free any tags
664 */
665 lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
666 #endif
667 if (h->ss_dangling_connected && h->ssi.state) {
668 lws_sspc_event_helper(h, LWSSSCS_DISCONNECTED, 0);
669 h->ss_dangling_connected = 0;
670 }
671
672 #if defined(LWS_WITH_SYS_FAULT_INJECTION)
673 lws_fi_destroy(&h->fic);
674 #endif
675
676 lws_sul_cancel(&h->sul_retry);
677 lws_dll2_remove(&h->client_list);
678
679 if (h->dsh)
680 lws_dsh_destroy(&h->dsh);
681 if (h->cwsi) {
682 lws_set_opaque_user_data(h->cwsi, NULL);
683 lws_wsi_close(h->cwsi, LWS_TO_KILL_ASYNC);
684 h->cwsi = NULL;
685 }
686
687 /* clean out any pending metadata changes that didn't make it */
688
689 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
690 lws_dll2_get_head(&(*ph)->metadata_owner)) {
691 lws_sspc_metadata_t *md =
692 lws_container_of(d, lws_sspc_metadata_t, list);
693
694 lws_dll2_remove(&md->list);
695 lws_free(md);
696
697 } lws_end_foreach_dll_safe(d, d1);
698
699 lws_sspc_rxmetadata_destroy(h);
700
701 lws_sspc_event_helper(h, LWSSSCS_DESTROYING, 0);
702 *ph = NULL;
703
704 lws_sul_cancel(&h->sul_retry);
705
706 __lws_lc_untag(&h->lc);
707 free(h);
708 }
709
710 lws_ss_state_return_t
lws_sspc_request_tx(lws_sspc_handle_t * h)711 lws_sspc_request_tx(lws_sspc_handle_t *h)
712 {
713 if (!h || !h->cwsi)
714 return LWSSSSRET_OK;
715
716 if (!h->us_earliest_write_req)
717 h->us_earliest_write_req = lws_now_usecs();
718
719 if (h->state == LPCSCLI_LOCAL_CONNECTED &&
720 h->conn_req_state == LWSSSPC_ONW_NONE)
721 h->conn_req_state = LWSSSPC_ONW_REQ;
722
723 lws_callback_on_writable(h->cwsi);
724
725 return LWSSSSRET_OK;
726 }
727
728 /*
729 * Currently we fulfil the writeable part locally by just enabling POLLOUT on
730 * the UDS link, without serialization footprint, which is reasonable as far as
731 * it goes.
732 *
733 * But for the ..._len() variant, the expected payload length hint we are being
734 * told is something that must be serialized to the onward peer, since either
735 * that guy or someone upstream of him is the guy who will compose the framing
736 * with it that actually goes out.
737 *
738 * This information is needed at the upstream guy before we have sent any
739 * payload, eg, for http POST, he has to prepare the content-length in the
740 * headers, before any payload. So we have to issue a serialization of the
741 * length at this point.
742 */
743
744 lws_ss_state_return_t
lws_sspc_request_tx_len(lws_sspc_handle_t * h,unsigned long len)745 lws_sspc_request_tx_len(lws_sspc_handle_t *h, unsigned long len)
746 {
747 /*
748 * for client conns, they cannot even complete creation of the handle
749 * without the onwared connection to the proxy, it's not legal to start
750 * using it until it's operation and has the onward connection (and the
751 * link has called CREATED state)
752 */
753
754 if (!h)
755 return LWSSSSRET_OK;
756
757 lwsl_notice("%s: setting %s writeable_len %u\n", __func__, h->lc.gutag,
758 (unsigned int)len);
759 h->writeable_len = len;
760 h->pending_writeable_len = 1;
761
762 if (!h->us_earliest_write_req)
763 h->us_earliest_write_req = lws_now_usecs();
764
765 if (h->state == LPCSCLI_LOCAL_CONNECTED &&
766 h->conn_req_state == LWSSSPC_ONW_NONE)
767 h->conn_req_state = LWSSSPC_ONW_REQ;
768
769 /*
770 * We're going to use this up with serializing h->writeable_len... that
771 * will request again.
772 */
773
774 if (h->cwsi)
775 lws_callback_on_writable(h->cwsi);
776
777 return LWSSSSRET_OK;
778 }
779
780 int
lws_sspc_client_connect(lws_sspc_handle_t * h)781 lws_sspc_client_connect(lws_sspc_handle_t *h)
782 {
783 if (!h || h->state == LPCSCLI_OPERATIONAL)
784 return 0;
785
786 assert(h->state == LPCSCLI_LOCAL_CONNECTED);
787 if (h->state == LPCSCLI_LOCAL_CONNECTED &&
788 h->conn_req_state == LWSSSPC_ONW_NONE)
789 h->conn_req_state = LWSSSPC_ONW_REQ;
790 if (h->cwsi)
791 lws_callback_on_writable(h->cwsi);
792
793 return 0;
794 }
795
796 struct lws_context *
lws_sspc_get_context(struct lws_sspc_handle * h)797 lws_sspc_get_context(struct lws_sspc_handle *h)
798 {
799 return h->context;
800 }
801
802 const char *
lws_sspc_rideshare(struct lws_sspc_handle * h)803 lws_sspc_rideshare(struct lws_sspc_handle *h)
804 {
805 /*
806 * ...the serialized RX rideshare name if any...
807 */
808
809 if (h->parser.rideshare[0]) {
810 lwsl_info("%s: parser %s\n", __func__, h->parser.rideshare);
811 return h->parser.rideshare;
812 }
813
814 /*
815 * The tx rideshare index
816 */
817
818 if (h->rideshare_list[0]) {
819 lwsl_info("%s: tx list %s\n", __func__,
820 &h->rideshare_list[h->rideshare_ofs[h->rsidx]]);
821 return &h->rideshare_list[h->rideshare_ofs[h->rsidx]];
822 }
823
824 /*
825 * ... otherwise default to our stream type name
826 */
827
828 lwsl_info("%s: def %s\n", __func__, h->ssi.streamtype);
829
830 return h->ssi.streamtype;
831 }
832
833 static int
_lws_sspc_set_metadata(struct lws_sspc_handle * h,const char * name,const void * value,size_t len,int tx_cr_adjust)834 _lws_sspc_set_metadata(struct lws_sspc_handle *h, const char *name,
835 const void *value, size_t len, int tx_cr_adjust)
836 {
837 lws_sspc_metadata_t *md;
838
839 /*
840 * Are we replacing a pending metadata of the same name? It's not
841 * efficient to do this but user code can do what it likes... let's
842 * optimize away the old one.
843 *
844 * Tx credit adjust always has name ""
845 */
846
847 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
848 lws_dll2_get_head(&h->metadata_owner)) {
849 md = lws_container_of(d, lws_sspc_metadata_t, list);
850
851 if (!strcmp(name, md->name)) {
852 lws_dll2_remove(&md->list);
853 lws_free(md);
854 break;
855 }
856
857 } lws_end_foreach_dll_safe(d, d1);
858
859 /*
860 * We have to stash the metadata and pass it to the proxy
861 */
862
863 if (lws_fi(&h->fic, "sspc_fail_metadata_set"))
864 md = NULL;
865 else
866 md = lws_malloc(sizeof(*md) + len, "set metadata");
867 if (!md) {
868 lwsl_err("%s: OOM\n", __func__);
869
870 return 1;
871 }
872
873 memset(md, 0, sizeof(*md));
874
875 md->tx_cr_adjust = tx_cr_adjust;
876 h->txc.peer_tx_cr_est += tx_cr_adjust;
877
878 lws_strncpy(md->name, name, sizeof(md->name));
879 md->len = len;
880 if (len)
881 memcpy(&md[1], value, len);
882
883 lws_dll2_add_tail(&md->list, &h->metadata_owner);
884
885 if (len) {
886 lwsl_info("%s: set metadata %s\n", __func__, name);
887 lwsl_hexdump_info(value, len);
888 } else
889 lwsl_info("%s: serializing tx cr adj %d\n", __func__,
890 (int)tx_cr_adjust);
891
892 if (h->cwsi)
893 lws_callback_on_writable(h->cwsi);
894
895 return 0;
896 }
897
/*
 * Queue a named metadata value for the proxy, with no tx credit change.
 * Returns what _lws_sspc_set_metadata() returns.
 */
int
lws_sspc_set_metadata(struct lws_sspc_handle *h, const char *name,
		      const void *value, size_t len)
{
	int ret = _lws_sspc_set_metadata(h, name, value, len, 0);

	return ret;
}
904
905 int
lws_sspc_get_metadata(struct lws_sspc_handle * h,const char * name,const void ** value,size_t * len)906 lws_sspc_get_metadata(struct lws_sspc_handle *h, const char *name,
907 const void **value, size_t *len)
908 {
909 lws_sspc_metadata_t *md;
910
911 /*
912 * client side does not have access to policy
913 * and any metadata are new to it each time,
914 * we allocate them, removing any existing with
915 * the same name first
916 */
917
918 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
919 lws_dll2_get_head(&h->metadata_owner_rx)) {
920 md = lws_container_of(d,
921 lws_sspc_metadata_t, list);
922
923 if (!strcmp(md->name, name)) {
924 *len = md->len;
925 *value = &md[1];
926
927 return 0;
928 }
929
930 } lws_end_foreach_dll_safe(d, d1);
931
932 return 1;
933 }
934
/*
 * Queue a tx credit adjustment of bump for the proxy.  It travels as a
 * metadata item with an empty name and no payload.  Returns what
 * _lws_sspc_set_metadata() returns.
 */
int
lws_sspc_add_peer_tx_credit(struct lws_sspc_handle *h, int32_t bump)
{
	int ret;

	lwsl_notice("%s: %d\n", __func__, bump);

	ret = _lws_sspc_set_metadata(h, "", NULL, 0, (int)bump);

	return ret;
}
941
942 int
lws_sspc_get_est_peer_tx_credit(struct lws_sspc_handle * h)943 lws_sspc_get_est_peer_tx_credit(struct lws_sspc_handle *h)
944 {
945 return h->txc.peer_tx_cr_est;
946 }
947
948 void
lws_sspc_start_timeout(struct lws_sspc_handle * h,unsigned int timeout_ms)949 lws_sspc_start_timeout(struct lws_sspc_handle *h, unsigned int timeout_ms)
950 {
951 if (!h->cwsi)
952 /* we can't fulfil it */
953 return;
954 h->timeout_ms = (uint32_t)timeout_ms;
955 h->pending_timeout_update = 1;
956 lws_callback_on_writable(h->cwsi);
957 }
958
/*
 * Cancel the stream timeout: this is requested by sending the all-ones
 * timeout value through lws_sspc_start_timeout().
 */
void
lws_sspc_cancel_timeout(struct lws_sspc_handle *h)
{
	const unsigned int cancel_value = ~0u;

	lws_sspc_start_timeout(h, cancel_value);
}
964
965 void *
lws_sspc_to_user_object(struct lws_sspc_handle * h)966 lws_sspc_to_user_object(struct lws_sspc_handle *h)
967 {
968 return (void *)&h[1];
969 }
970
971 void
lws_sspc_change_handlers(struct lws_sspc_handle * h,lws_ss_state_return_t (* rx)(void * userobj,const uint8_t * buf,size_t len,int flags),lws_ss_state_return_t (* tx)(void * userobj,lws_ss_tx_ordinal_t ord,uint8_t * buf,size_t * len,int * flags),lws_ss_state_return_t (* state)(void * userobj,void * h_src,lws_ss_constate_t state,lws_ss_tx_ordinal_t ack))972 lws_sspc_change_handlers(struct lws_sspc_handle *h,
973 lws_ss_state_return_t (*rx)(void *userobj, const uint8_t *buf, size_t len, int flags),
974 lws_ss_state_return_t (*tx)(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
975 size_t *len, int *flags),
976 lws_ss_state_return_t (*state)(void *userobj, void *h_src /* ss handle type */,
977 lws_ss_constate_t state, lws_ss_tx_ordinal_t ack))
978 {
979 if (rx)
980 h->ssi.rx = rx;
981 if (tx)
982 h->ssi.tx = tx;
983 if (state)
984 h->ssi.state = state;
985 }
986
987 const char *
lws_sspc_tag(struct lws_sspc_handle * h)988 lws_sspc_tag(struct lws_sspc_handle *h)
989 {
990 if (!h)
991 return "[null sspc]";
992 return lws_lc_tag(&h->lc);
993 }
994