1 //
2 // Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
3 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
4 //
5 // This software is supplied under the terms of the MIT License, a
6 // copy of which should be located in the distribution where this
7 // file was obtained (LICENSE.txt). A copy of the license may also be
8 // found online at https://opensource.org/licenses/MIT.
9 //
10
11 #include <stdlib.h>
12 #include <string.h>
13
14 #include "core/nng_impl.h"
15 #include "nng/protocol/survey0/respond.h"
16
17 // Respondent protocol. The RESPONDENT protocol is the "replier" side of
18 // the surveyor pattern. This is useful for building service discovery, or
19 // voting algorithms, for example.
20
21 #ifndef NNI_PROTO_SURVEYOR_V0
22 #define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
23 #endif
24
25 #ifndef NNI_PROTO_RESPONDENT_V0
26 #define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3)
27 #endif
28
29 typedef struct resp0_pipe resp0_pipe;
30 typedef struct resp0_sock resp0_sock;
31 typedef struct resp0_ctx resp0_ctx;
32
33 static void resp0_pipe_send_cb(void *);
34 static void resp0_pipe_recv_cb(void *);
35 static void resp0_pipe_fini(void *);
36
37 struct resp0_ctx {
38 resp0_sock * sock;
39 uint32_t pipe_id;
40 resp0_pipe * spipe; // send pipe
41 nni_aio * saio; // send aio
42 nni_aio * raio; // recv aio
43 nni_list_node sqnode;
44 nni_list_node rqnode;
45 size_t btrace_len;
46 uint32_t btrace[NNI_MAX_MAX_TTL + 1];
47 };
48
49 // resp0_sock is our per-socket protocol private structure.
50 struct resp0_sock {
51 nni_mtx mtx;
52 nni_atomic_int ttl;
53 nni_id_map pipes;
54 resp0_ctx ctx;
55 nni_list recvpipes;
56 nni_list recvq;
57 nni_pollable readable;
58 nni_pollable writable;
59 };
60
61 // resp0_pipe is our per-pipe protocol private structure.
62 struct resp0_pipe {
63 nni_pipe * npipe;
64 resp0_sock * psock;
65 bool busy;
66 bool closed;
67 uint32_t id;
68 nni_list sendq; // contexts waiting to send
69 nni_aio aio_send;
70 nni_aio aio_recv;
71 nni_list_node rnode; // receivable linkage
72 };
73
74 static void
resp0_ctx_close(void * arg)75 resp0_ctx_close(void *arg)
76 {
77 resp0_ctx * ctx = arg;
78 resp0_sock *s = ctx->sock;
79 nni_aio * aio;
80
81 // complete any outstanding operations here, cancellation, etc.
82
83 nni_mtx_lock(&s->mtx);
84 if ((aio = ctx->saio) != NULL) {
85 resp0_pipe *p = ctx->spipe;
86 ctx->saio = NULL;
87 ctx->spipe = NULL;
88 nni_list_remove(&p->sendq, ctx);
89 nni_aio_finish_error(aio, NNG_ECLOSED);
90 }
91 if ((aio = ctx->raio) != NULL) {
92 ctx->raio = NULL;
93 nni_list_remove(&s->recvq, ctx);
94 nni_aio_finish_error(aio, NNG_ECLOSED);
95 }
96 nni_mtx_unlock(&s->mtx);
97 }
98
99 static void
resp0_ctx_fini(void * arg)100 resp0_ctx_fini(void *arg)
101 {
102 resp0_ctx *ctx = arg;
103
104 resp0_ctx_close(ctx);
105 }
106
107 static int
resp0_ctx_init(void * carg,void * sarg)108 resp0_ctx_init(void *carg, void *sarg)
109 {
110 resp0_sock *s = sarg;
111 resp0_ctx * ctx = carg;
112
113 NNI_LIST_NODE_INIT(&ctx->sqnode);
114 NNI_LIST_NODE_INIT(&ctx->rqnode);
115 ctx->btrace_len = 0;
116 ctx->sock = s;
117 ctx->pipe_id = 0;
118
119 return (0);
120 }
121
122 static void
resp0_ctx_cancel_send(nni_aio * aio,void * arg,int rv)123 resp0_ctx_cancel_send(nni_aio *aio, void *arg, int rv)
124 {
125 resp0_ctx * ctx = arg;
126 resp0_sock *s = ctx->sock;
127
128 nni_mtx_lock(&s->mtx);
129 if (ctx->saio != aio) {
130 nni_mtx_unlock(&s->mtx);
131 return;
132 }
133 nni_list_node_remove(&ctx->sqnode);
134 ctx->saio = NULL;
135 nni_mtx_unlock(&s->mtx);
136 nni_msg_header_clear(nni_aio_get_msg(aio)); // reset the headers
137 nni_aio_finish_error(aio, rv);
138 }
139
140 static void
resp0_ctx_send(void * arg,nni_aio * aio)141 resp0_ctx_send(void *arg, nni_aio *aio)
142 {
143 resp0_ctx * ctx = arg;
144 resp0_sock *s = ctx->sock;
145 resp0_pipe *p;
146 nni_msg * msg;
147 size_t len;
148 uint32_t pid;
149 int rv;
150
151 if (nni_aio_begin(aio) != 0) {
152 return;
153 }
154 msg = nni_aio_get_msg(aio);
155 nni_msg_header_clear(msg);
156
157 if (ctx == &s->ctx) {
158 // We can't send anymore, because only one send per request.
159 nni_pollable_clear(&s->writable);
160 }
161
162 nni_mtx_lock(&s->mtx);
163 if ((rv = nni_aio_schedule(aio, resp0_ctx_cancel_send, ctx)) != 0) {
164 nni_mtx_unlock(&s->mtx);
165 nni_aio_finish_error(aio, rv);
166 return;
167 }
168
169 if ((len = ctx->btrace_len) == 0) {
170 nni_mtx_unlock(&s->mtx);
171 nni_aio_finish_error(aio, NNG_ESTATE);
172 return;
173 }
174 pid = ctx->pipe_id;
175 ctx->pipe_id = 0;
176 ctx->btrace_len = 0;
177
178 if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
179 nni_mtx_unlock(&s->mtx);
180 nni_aio_finish_error(aio, rv);
181 return;
182 }
183
184 if ((p = nni_id_get(&s->pipes, pid)) == NULL) {
185 // Surveyor has left the building. Just discard the reply.
186 nni_mtx_unlock(&s->mtx);
187 nni_aio_set_msg(aio, NULL);
188 nni_aio_finish(aio, 0, nni_msg_len(msg));
189 nni_msg_free(msg);
190 return;
191 }
192
193 if (!p->busy) {
194 p->busy = true;
195 len = nni_msg_len(msg);
196 nni_aio_set_msg(&p->aio_send, msg);
197 nni_pipe_send(p->npipe, &p->aio_send);
198 nni_mtx_unlock(&s->mtx);
199
200 nni_aio_set_msg(aio, NULL);
201 nni_aio_finish(aio, 0, len);
202 return;
203 }
204
205 ctx->saio = aio;
206 ctx->spipe = p;
207 nni_list_append(&p->sendq, ctx);
208 nni_mtx_unlock(&s->mtx);
209 }
210
211 static void
resp0_sock_fini(void * arg)212 resp0_sock_fini(void *arg)
213 {
214 resp0_sock *s = arg;
215
216 nni_id_map_fini(&s->pipes);
217 resp0_ctx_fini(&s->ctx);
218 nni_pollable_fini(&s->writable);
219 nni_pollable_fini(&s->readable);
220 nni_mtx_fini(&s->mtx);
221 }
222
223 static int
resp0_sock_init(void * arg,nni_sock * nsock)224 resp0_sock_init(void *arg, nni_sock *nsock)
225 {
226 resp0_sock *s = arg;
227
228 NNI_ARG_UNUSED(nsock);
229
230 nni_mtx_init(&s->mtx);
231 nni_id_map_init(&s->pipes, 0, 0, false);
232
233 NNI_LIST_INIT(&s->recvq, resp0_ctx, rqnode);
234 NNI_LIST_INIT(&s->recvpipes, resp0_pipe, rnode);
235
236 nni_atomic_init(&s->ttl);
237 nni_atomic_set(&s->ttl, 8); // Per RFC
238
239 (void) resp0_ctx_init(&s->ctx, s);
240
241 // We start off without being either readable or writable.
242 // Readability comes when there is something on the socket.
243 nni_pollable_init(&s->writable);
244 nni_pollable_init(&s->readable);
245 return (0);
246 }
247
248 static void
resp0_sock_open(void * arg)249 resp0_sock_open(void *arg)
250 {
251 NNI_ARG_UNUSED(arg);
252 }
253
254 static void
resp0_sock_close(void * arg)255 resp0_sock_close(void *arg)
256 {
257 resp0_sock *s = arg;
258
259 resp0_ctx_close(&s->ctx);
260 }
261
262 static void
resp0_pipe_stop(void * arg)263 resp0_pipe_stop(void *arg)
264 {
265 resp0_pipe *p = arg;
266
267 nni_aio_stop(&p->aio_send);
268 nni_aio_stop(&p->aio_recv);
269 }
270
271 static void
resp0_pipe_fini(void * arg)272 resp0_pipe_fini(void *arg)
273 {
274 resp0_pipe *p = arg;
275 nng_msg * msg;
276
277 if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) {
278 nni_aio_set_msg(&p->aio_recv, NULL);
279 nni_msg_free(msg);
280 }
281 nni_aio_fini(&p->aio_send);
282 nni_aio_fini(&p->aio_recv);
283 }
284
285 static int
resp0_pipe_init(void * arg,nni_pipe * npipe,void * s)286 resp0_pipe_init(void *arg, nni_pipe *npipe, void *s)
287 {
288 resp0_pipe *p = arg;
289
290 nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p);
291 nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p);
292
293 NNI_LIST_INIT(&p->sendq, resp0_ctx, sqnode);
294
295 p->npipe = npipe;
296 p->psock = s;
297 p->busy = false;
298 p->id = nni_pipe_id(npipe);
299
300 return (0);
301 }
302
303 static int
resp0_pipe_start(void * arg)304 resp0_pipe_start(void *arg)
305 {
306 resp0_pipe *p = arg;
307 resp0_sock *s = p->psock;
308 int rv;
309
310 if (nni_pipe_peer(p->npipe) != NNI_PROTO_SURVEYOR_V0) {
311 return (NNG_EPROTO);
312 }
313
314 nni_mtx_lock(&s->mtx);
315 rv = nni_id_set(&s->pipes, p->id, p);
316 nni_mtx_unlock(&s->mtx);
317 if (rv != 0) {
318 return (rv);
319 }
320
321 nni_pipe_recv(p->npipe, &p->aio_recv);
322 return (rv);
323 }
324
325 static void
resp0_pipe_close(void * arg)326 resp0_pipe_close(void *arg)
327 {
328 resp0_pipe *p = arg;
329 resp0_sock *s = p->psock;
330 resp0_ctx * ctx;
331
332 nni_aio_close(&p->aio_send);
333 nni_aio_close(&p->aio_recv);
334
335 nni_mtx_lock(&s->mtx);
336 p->closed = true;
337 while ((ctx = nni_list_first(&p->sendq)) != NULL) {
338 nni_aio *aio;
339 nni_msg *msg;
340 nni_list_remove(&p->sendq, ctx);
341 aio = ctx->saio;
342 ctx->saio = NULL;
343 msg = nni_aio_get_msg(aio);
344 nni_aio_set_msg(aio, NULL);
345 nni_aio_finish(aio, 0, nni_msg_len(msg));
346 nni_msg_free(msg);
347 }
348 if (p->id == s->ctx.pipe_id) {
349 // Make sure user space knows they can send a message to us,
350 // which we will happily discard.
351 nni_pollable_raise(&s->writable);
352 }
353 nni_id_remove(&s->pipes, p->id);
354 nni_mtx_unlock(&s->mtx);
355 }
356
357 static void
resp0_pipe_send_cb(void * arg)358 resp0_pipe_send_cb(void *arg)
359 {
360 resp0_pipe *p = arg;
361 resp0_sock *s = p->psock;
362 resp0_ctx * ctx;
363 nni_aio * aio;
364 nni_msg * msg;
365 size_t len;
366
367 if (nni_aio_result(&p->aio_send) != 0) {
368 nni_msg_free(nni_aio_get_msg(&p->aio_send));
369 nni_aio_set_msg(&p->aio_send, NULL);
370 nni_pipe_close(p->npipe);
371 return;
372 }
373 nni_mtx_lock(&s->mtx);
374 p->busy = false;
375 if ((ctx = nni_list_first(&p->sendq)) == NULL) {
376 // Nothing else to send.
377 if (p->id == s->ctx.pipe_id) {
378 // Mark us ready for the other side to send!
379 nni_pollable_raise(&s->writable);
380 }
381 nni_mtx_unlock(&s->mtx);
382 return;
383 }
384
385 nni_list_remove(&p->sendq, ctx);
386 aio = ctx->saio;
387 ctx->saio = NULL;
388 ctx->spipe = NULL;
389 p->busy = true;
390 msg = nni_aio_get_msg(aio);
391 len = nni_msg_len(msg);
392 nni_aio_set_msg(aio, NULL);
393 nni_aio_set_msg(&p->aio_send, msg);
394 nni_pipe_send(p->npipe, &p->aio_send);
395
396 nni_mtx_unlock(&s->mtx);
397
398 nni_aio_finish_sync(aio, 0, len);
399 }
400
401 static void
resp0_cancel_recv(nni_aio * aio,void * arg,int rv)402 resp0_cancel_recv(nni_aio *aio, void *arg, int rv)
403 {
404 resp0_ctx * ctx = arg;
405 resp0_sock *s = ctx->sock;
406
407 nni_mtx_lock(&s->mtx);
408 if (ctx->raio == aio) {
409 nni_list_remove(&s->recvq, ctx);
410 ctx->raio = NULL;
411 nni_aio_finish_error(aio, rv);
412 }
413 nni_mtx_unlock(&s->mtx);
414 }
415
416 static void
resp0_ctx_recv(void * arg,nni_aio * aio)417 resp0_ctx_recv(void *arg, nni_aio *aio)
418 {
419 resp0_ctx * ctx = arg;
420 resp0_sock *s = ctx->sock;
421 resp0_pipe *p;
422 size_t len;
423 nni_msg * msg;
424
425 if (nni_aio_begin(aio) != 0) {
426 return;
427 }
428 nni_mtx_lock(&s->mtx);
429 if ((p = nni_list_first(&s->recvpipes)) == NULL) {
430 int rv;
431 rv = nni_aio_schedule(aio, resp0_cancel_recv, ctx);
432 if (rv != 0) {
433 nni_mtx_unlock(&s->mtx);
434 nni_aio_finish_error(aio, rv);
435 return;
436 }
437 // We cannot have two concurrent receive requests on the same
438 // context...
439 if (ctx->raio != NULL) {
440 nni_mtx_unlock(&s->mtx);
441 nni_aio_finish_error(aio, NNG_ESTATE);
442 return;
443 }
444 ctx->raio = aio;
445 nni_list_append(&s->recvq, ctx);
446 nni_mtx_unlock(&s->mtx);
447 return;
448 }
449 msg = nni_aio_get_msg(&p->aio_recv);
450 nni_aio_set_msg(&p->aio_recv, NULL);
451 nni_list_remove(&s->recvpipes, p);
452 if (nni_list_empty(&s->recvpipes)) {
453 nni_pollable_clear(&s->readable);
454 }
455 nni_pipe_recv(p->npipe, &p->aio_recv);
456
457 len = nni_msg_header_len(msg);
458 memcpy(ctx->btrace, nni_msg_header(msg), len);
459 ctx->btrace_len = len;
460 ctx->pipe_id = p->id;
461 if (ctx == &s->ctx) {
462 nni_pollable_raise(&s->writable);
463 }
464 nni_mtx_unlock(&s->mtx);
465
466 nni_msg_header_clear(msg);
467 nni_aio_set_msg(aio, msg);
468 nni_aio_finish(aio, 0, nni_msg_len(msg));
469 }
470
471 static void
resp0_pipe_recv_cb(void * arg)472 resp0_pipe_recv_cb(void *arg)
473 {
474 resp0_pipe *p = arg;
475 resp0_sock *s = p->psock;
476 resp0_ctx * ctx;
477 nni_msg * msg;
478 nni_aio * aio;
479 int hops;
480 size_t len;
481 int ttl;
482
483 if (nni_aio_result(&p->aio_recv) != 0) {
484 nni_pipe_close(p->npipe);
485 return;
486 }
487
488 ttl = nni_atomic_get(&s->ttl);
489 msg = nni_aio_get_msg(&p->aio_recv);
490 nni_msg_set_pipe(msg, p->id);
491
492 // Move backtrace from body to header
493 hops = 1;
494 for (;;) {
495 bool end = 0;
496 uint8_t *body;
497
498 if (hops > ttl) {
499 goto drop;
500 }
501 hops++;
502 if (nni_msg_len(msg) < 4) {
503 // Peer is speaking garbage, kick it.
504 nni_msg_free(msg);
505 nni_aio_set_msg(&p->aio_recv, NULL);
506 nni_pipe_close(p->npipe);
507 return;
508 }
509 body = nni_msg_body(msg);
510 end = ((body[0] & 0x80u) != 0);
511 if (nni_msg_header_append(msg, body, 4) != 0) {
512 goto drop;
513 }
514 nni_msg_trim(msg, 4);
515 if (end) {
516 break;
517 }
518 }
519
520 len = nni_msg_header_len(msg);
521
522 nni_mtx_lock(&s->mtx);
523
524 if (p->closed) {
525 // If pipe was closed, we just abandon the data from it.
526 nni_aio_set_msg(&p->aio_recv, NULL);
527 nni_mtx_unlock(&s->mtx);
528 nni_msg_free(msg);
529 return;
530 }
531 if ((ctx = nni_list_first(&s->recvq)) == NULL) {
532 // No one blocked in recv, stall.
533 nni_list_append(&s->recvpipes, p);
534 nni_pollable_raise(&s->readable);
535 nni_mtx_unlock(&s->mtx);
536 return;
537 }
538
539 nni_list_remove(&s->recvq, ctx);
540 aio = ctx->raio;
541 ctx->raio = NULL;
542 nni_aio_set_msg(&p->aio_recv, NULL);
543
544 // Start the next receive.
545 nni_pipe_recv(p->npipe, &p->aio_recv);
546
547 ctx->btrace_len = len;
548 memcpy(ctx->btrace, nni_msg_header(msg), len);
549 nni_msg_header_clear(msg);
550 ctx->pipe_id = p->id;
551
552 if ((ctx == &s->ctx) && (!p->busy)) {
553 nni_pollable_raise(&s->writable);
554 }
555 nni_mtx_unlock(&s->mtx);
556
557 nni_aio_set_msg(aio, msg);
558 nni_aio_finish_sync(aio, 0, nni_msg_len(msg));
559 return;
560
561 drop:
562 nni_msg_free(msg);
563 nni_aio_set_msg(&p->aio_recv, NULL);
564 nni_pipe_recv(p->npipe, &p->aio_recv);
565 }
566
567 static int
resp0_sock_set_max_ttl(void * arg,const void * buf,size_t sz,nni_opt_type t)568 resp0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
569 {
570 resp0_sock *s = arg;
571 int ttl;
572 int rv;
573
574 if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) {
575 nni_atomic_set(&s->ttl, ttl);
576 }
577 return (rv);
578 }
579
580 static int
resp0_sock_get_max_ttl(void * arg,void * buf,size_t * szp,nni_opt_type t)581 resp0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
582 {
583 resp0_sock *s = arg;
584 return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t));
585 }
586
587 static int
resp0_sock_get_sendfd(void * arg,void * buf,size_t * szp,nni_opt_type t)588 resp0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t)
589 {
590 resp0_sock *s = arg;
591 int rv;
592 int fd;
593
594 if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) {
595 return (rv);
596 }
597 return (nni_copyout_int(fd, buf, szp, t));
598 }
599
600 static int
resp0_sock_get_recvfd(void * arg,void * buf,size_t * szp,nni_opt_type t)601 resp0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t)
602 {
603 resp0_sock *s = arg;
604 int rv;
605 int fd;
606
607 if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) {
608 return (rv);
609 }
610 return (nni_copyout_int(fd, buf, szp, t));
611 }
612
613 static void
resp0_sock_send(void * arg,nni_aio * aio)614 resp0_sock_send(void *arg, nni_aio *aio)
615 {
616 resp0_sock *s = arg;
617
618 resp0_ctx_send(&s->ctx, aio);
619 }
620
621 static void
resp0_sock_recv(void * arg,nni_aio * aio)622 resp0_sock_recv(void *arg, nni_aio *aio)
623 {
624 resp0_sock *s = arg;
625
626 resp0_ctx_recv(&s->ctx, aio);
627 }
628
629 static nni_proto_pipe_ops resp0_pipe_ops = {
630 .pipe_size = sizeof(resp0_pipe),
631 .pipe_init = resp0_pipe_init,
632 .pipe_fini = resp0_pipe_fini,
633 .pipe_start = resp0_pipe_start,
634 .pipe_close = resp0_pipe_close,
635 .pipe_stop = resp0_pipe_stop,
636 };
637
638 static nni_proto_ctx_ops resp0_ctx_ops = {
639 .ctx_size = sizeof(resp0_ctx),
640 .ctx_init = resp0_ctx_init,
641 .ctx_fini = resp0_ctx_fini,
642 .ctx_send = resp0_ctx_send,
643 .ctx_recv = resp0_ctx_recv,
644 };
645
646 static nni_option resp0_sock_options[] = {
647 {
648 .o_name = NNG_OPT_MAXTTL,
649 .o_get = resp0_sock_get_max_ttl,
650 .o_set = resp0_sock_set_max_ttl,
651 },
652 {
653 .o_name = NNG_OPT_RECVFD,
654 .o_get = resp0_sock_get_recvfd,
655 .o_set = NULL,
656 },
657 {
658 .o_name = NNG_OPT_SENDFD,
659 .o_get = resp0_sock_get_sendfd,
660 .o_set = NULL,
661 },
662 // terminate list
663 {
664 .o_name = NULL,
665 },
666 };
667
668 static nni_proto_sock_ops resp0_sock_ops = {
669 .sock_size = sizeof(resp0_sock),
670 .sock_init = resp0_sock_init,
671 .sock_fini = resp0_sock_fini,
672 .sock_open = resp0_sock_open,
673 .sock_close = resp0_sock_close,
674 .sock_send = resp0_sock_send,
675 .sock_recv = resp0_sock_recv,
676 .sock_options = resp0_sock_options,
677 };
678
679 static nni_proto resp0_proto = {
680 .proto_version = NNI_PROTOCOL_VERSION,
681 .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" },
682 .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
683 .proto_flags = NNI_PROTO_FLAG_SNDRCV,
684 .proto_sock_ops = &resp0_sock_ops,
685 .proto_pipe_ops = &resp0_pipe_ops,
686 .proto_ctx_ops = &resp0_ctx_ops,
687 };
688
689 int
nng_respondent0_open(nng_socket * sidp)690 nng_respondent0_open(nng_socket *sidp)
691 {
692 return (nni_proto_open(sidp, &resp0_proto));
693 }
694