1 //
2 // Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
3 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
4 //
5 // This software is supplied under the terms of the MIT License, a
6 // copy of which should be located in the distribution where this
7 // file was obtained (LICENSE.txt).  A copy of the license may also be
8 // found online at https://opensource.org/licenses/MIT.
9 //
10 
11 #include <stdlib.h>
12 #include <string.h>
13 
14 #include "core/nng_impl.h"
15 #include "nng/protocol/survey0/respond.h"
16 
17 // Respondent protocol.  The RESPONDENT protocol is the "replier" side of
18 // the surveyor pattern.  This is useful for building service discovery, or
19 // voting algorithms, for example.
20 
21 #ifndef NNI_PROTO_SURVEYOR_V0
22 #define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
23 #endif
24 
25 #ifndef NNI_PROTO_RESPONDENT_V0
26 #define NNI_PROTO_RESPONDENT_V0 NNI_PROTO(6, 3)
27 #endif
28 
29 typedef struct resp0_pipe resp0_pipe;
30 typedef struct resp0_sock resp0_sock;
31 typedef struct resp0_ctx  resp0_ctx;
32 
33 static void resp0_pipe_send_cb(void *);
34 static void resp0_pipe_recv_cb(void *);
35 static void resp0_pipe_fini(void *);
36 
37 struct resp0_ctx {
38 	resp0_sock *  sock;
39 	uint32_t      pipe_id;
40 	resp0_pipe *  spipe; // send pipe
41 	nni_aio *     saio;  // send aio
42 	nni_aio *     raio;  // recv aio
43 	nni_list_node sqnode;
44 	nni_list_node rqnode;
45 	size_t        btrace_len;
46 	uint32_t      btrace[NNI_MAX_MAX_TTL + 1];
47 };
48 
49 // resp0_sock is our per-socket protocol private structure.
50 struct resp0_sock {
51 	nni_mtx        mtx;
52 	nni_atomic_int ttl;
53 	nni_id_map     pipes;
54 	resp0_ctx      ctx;
55 	nni_list       recvpipes;
56 	nni_list       recvq;
57 	nni_pollable   readable;
58 	nni_pollable   writable;
59 };
60 
61 // resp0_pipe is our per-pipe protocol private structure.
62 struct resp0_pipe {
63 	nni_pipe *    npipe;
64 	resp0_sock *  psock;
65 	bool          busy;
66 	bool          closed;
67 	uint32_t      id;
68 	nni_list      sendq; // contexts waiting to send
69 	nni_aio       aio_send;
70 	nni_aio       aio_recv;
71 	nni_list_node rnode; // receivable linkage
72 };
73 
74 static void
resp0_ctx_close(void * arg)75 resp0_ctx_close(void *arg)
76 {
77 	resp0_ctx * ctx = arg;
78 	resp0_sock *s   = ctx->sock;
79 	nni_aio *   aio;
80 
81 	// complete any outstanding operations here, cancellation, etc.
82 
83 	nni_mtx_lock(&s->mtx);
84 	if ((aio = ctx->saio) != NULL) {
85 		resp0_pipe *p = ctx->spipe;
86 		ctx->saio     = NULL;
87 		ctx->spipe    = NULL;
88 		nni_list_remove(&p->sendq, ctx);
89 		nni_aio_finish_error(aio, NNG_ECLOSED);
90 	}
91 	if ((aio = ctx->raio) != NULL) {
92 		ctx->raio = NULL;
93 		nni_list_remove(&s->recvq, ctx);
94 		nni_aio_finish_error(aio, NNG_ECLOSED);
95 	}
96 	nni_mtx_unlock(&s->mtx);
97 }
98 
99 static void
resp0_ctx_fini(void * arg)100 resp0_ctx_fini(void *arg)
101 {
102 	resp0_ctx *ctx = arg;
103 
104 	resp0_ctx_close(ctx);
105 }
106 
107 static int
resp0_ctx_init(void * carg,void * sarg)108 resp0_ctx_init(void *carg, void *sarg)
109 {
110 	resp0_sock *s   = sarg;
111 	resp0_ctx * ctx = carg;
112 
113 	NNI_LIST_NODE_INIT(&ctx->sqnode);
114 	NNI_LIST_NODE_INIT(&ctx->rqnode);
115 	ctx->btrace_len = 0;
116 	ctx->sock       = s;
117 	ctx->pipe_id    = 0;
118 
119 	return (0);
120 }
121 
122 static void
resp0_ctx_cancel_send(nni_aio * aio,void * arg,int rv)123 resp0_ctx_cancel_send(nni_aio *aio, void *arg, int rv)
124 {
125 	resp0_ctx * ctx = arg;
126 	resp0_sock *s   = ctx->sock;
127 
128 	nni_mtx_lock(&s->mtx);
129 	if (ctx->saio != aio) {
130 		nni_mtx_unlock(&s->mtx);
131 		return;
132 	}
133 	nni_list_node_remove(&ctx->sqnode);
134 	ctx->saio = NULL;
135 	nni_mtx_unlock(&s->mtx);
136 	nni_msg_header_clear(nni_aio_get_msg(aio)); // reset the headers
137 	nni_aio_finish_error(aio, rv);
138 }
139 
140 static void
resp0_ctx_send(void * arg,nni_aio * aio)141 resp0_ctx_send(void *arg, nni_aio *aio)
142 {
143 	resp0_ctx * ctx = arg;
144 	resp0_sock *s   = ctx->sock;
145 	resp0_pipe *p;
146 	nni_msg *   msg;
147 	size_t      len;
148 	uint32_t    pid;
149 	int         rv;
150 
151 	if (nni_aio_begin(aio) != 0) {
152 		return;
153 	}
154 	msg = nni_aio_get_msg(aio);
155 	nni_msg_header_clear(msg);
156 
157 	if (ctx == &s->ctx) {
158 		// We can't send anymore, because only one send per request.
159 		nni_pollable_clear(&s->writable);
160 	}
161 
162 	nni_mtx_lock(&s->mtx);
163 	if ((rv = nni_aio_schedule(aio, resp0_ctx_cancel_send, ctx)) != 0) {
164 		nni_mtx_unlock(&s->mtx);
165 		nni_aio_finish_error(aio, rv);
166 		return;
167 	}
168 
169 	if ((len = ctx->btrace_len) == 0) {
170 		nni_mtx_unlock(&s->mtx);
171 		nni_aio_finish_error(aio, NNG_ESTATE);
172 		return;
173 	}
174 	pid             = ctx->pipe_id;
175 	ctx->pipe_id    = 0;
176 	ctx->btrace_len = 0;
177 
178 	if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) {
179 		nni_mtx_unlock(&s->mtx);
180 		nni_aio_finish_error(aio, rv);
181 		return;
182 	}
183 
184 	if ((p = nni_id_get(&s->pipes, pid)) == NULL) {
185 		// Surveyor has left the building.  Just discard the reply.
186 		nni_mtx_unlock(&s->mtx);
187 		nni_aio_set_msg(aio, NULL);
188 		nni_aio_finish(aio, 0, nni_msg_len(msg));
189 		nni_msg_free(msg);
190 		return;
191 	}
192 
193 	if (!p->busy) {
194 		p->busy = true;
195 		len     = nni_msg_len(msg);
196 		nni_aio_set_msg(&p->aio_send, msg);
197 		nni_pipe_send(p->npipe, &p->aio_send);
198 		nni_mtx_unlock(&s->mtx);
199 
200 		nni_aio_set_msg(aio, NULL);
201 		nni_aio_finish(aio, 0, len);
202 		return;
203 	}
204 
205 	ctx->saio  = aio;
206 	ctx->spipe = p;
207 	nni_list_append(&p->sendq, ctx);
208 	nni_mtx_unlock(&s->mtx);
209 }
210 
211 static void
resp0_sock_fini(void * arg)212 resp0_sock_fini(void *arg)
213 {
214 	resp0_sock *s = arg;
215 
216 	nni_id_map_fini(&s->pipes);
217 	resp0_ctx_fini(&s->ctx);
218 	nni_pollable_fini(&s->writable);
219 	nni_pollable_fini(&s->readable);
220 	nni_mtx_fini(&s->mtx);
221 }
222 
223 static int
resp0_sock_init(void * arg,nni_sock * nsock)224 resp0_sock_init(void *arg, nni_sock *nsock)
225 {
226 	resp0_sock *s = arg;
227 
228 	NNI_ARG_UNUSED(nsock);
229 
230 	nni_mtx_init(&s->mtx);
231 	nni_id_map_init(&s->pipes, 0, 0, false);
232 
233 	NNI_LIST_INIT(&s->recvq, resp0_ctx, rqnode);
234 	NNI_LIST_INIT(&s->recvpipes, resp0_pipe, rnode);
235 
236 	nni_atomic_init(&s->ttl);
237 	nni_atomic_set(&s->ttl, 8); // Per RFC
238 
239 	(void) resp0_ctx_init(&s->ctx, s);
240 
241 	// We start off without being either readable or writable.
242 	// Readability comes when there is something on the socket.
243 	nni_pollable_init(&s->writable);
244 	nni_pollable_init(&s->readable);
245 	return (0);
246 }
247 
248 static void
resp0_sock_open(void * arg)249 resp0_sock_open(void *arg)
250 {
251 	NNI_ARG_UNUSED(arg);
252 }
253 
254 static void
resp0_sock_close(void * arg)255 resp0_sock_close(void *arg)
256 {
257 	resp0_sock *s = arg;
258 
259 	resp0_ctx_close(&s->ctx);
260 }
261 
262 static void
resp0_pipe_stop(void * arg)263 resp0_pipe_stop(void *arg)
264 {
265 	resp0_pipe *p = arg;
266 
267 	nni_aio_stop(&p->aio_send);
268 	nni_aio_stop(&p->aio_recv);
269 }
270 
271 static void
resp0_pipe_fini(void * arg)272 resp0_pipe_fini(void *arg)
273 {
274 	resp0_pipe *p = arg;
275 	nng_msg *   msg;
276 
277 	if ((msg = nni_aio_get_msg(&p->aio_recv)) != NULL) {
278 		nni_aio_set_msg(&p->aio_recv, NULL);
279 		nni_msg_free(msg);
280 	}
281 	nni_aio_fini(&p->aio_send);
282 	nni_aio_fini(&p->aio_recv);
283 }
284 
285 static int
resp0_pipe_init(void * arg,nni_pipe * npipe,void * s)286 resp0_pipe_init(void *arg, nni_pipe *npipe, void *s)
287 {
288 	resp0_pipe *p = arg;
289 
290 	nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p);
291 	nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p);
292 
293 	NNI_LIST_INIT(&p->sendq, resp0_ctx, sqnode);
294 
295 	p->npipe = npipe;
296 	p->psock = s;
297 	p->busy  = false;
298 	p->id    = nni_pipe_id(npipe);
299 
300 	return (0);
301 }
302 
303 static int
resp0_pipe_start(void * arg)304 resp0_pipe_start(void *arg)
305 {
306 	resp0_pipe *p = arg;
307 	resp0_sock *s = p->psock;
308 	int         rv;
309 
310 	if (nni_pipe_peer(p->npipe) != NNI_PROTO_SURVEYOR_V0) {
311 		return (NNG_EPROTO);
312 	}
313 
314 	nni_mtx_lock(&s->mtx);
315 	rv = nni_id_set(&s->pipes, p->id, p);
316 	nni_mtx_unlock(&s->mtx);
317 	if (rv != 0) {
318 		return (rv);
319 	}
320 
321 	nni_pipe_recv(p->npipe, &p->aio_recv);
322 	return (rv);
323 }
324 
325 static void
resp0_pipe_close(void * arg)326 resp0_pipe_close(void *arg)
327 {
328 	resp0_pipe *p = arg;
329 	resp0_sock *s = p->psock;
330 	resp0_ctx * ctx;
331 
332 	nni_aio_close(&p->aio_send);
333 	nni_aio_close(&p->aio_recv);
334 
335 	nni_mtx_lock(&s->mtx);
336 	p->closed = true;
337 	while ((ctx = nni_list_first(&p->sendq)) != NULL) {
338 		nni_aio *aio;
339 		nni_msg *msg;
340 		nni_list_remove(&p->sendq, ctx);
341 		aio       = ctx->saio;
342 		ctx->saio = NULL;
343 		msg       = nni_aio_get_msg(aio);
344 		nni_aio_set_msg(aio, NULL);
345 		nni_aio_finish(aio, 0, nni_msg_len(msg));
346 		nni_msg_free(msg);
347 	}
348 	if (p->id == s->ctx.pipe_id) {
349 		// Make sure user space knows they can send a message to us,
350 		// which we will happily discard.
351 		nni_pollable_raise(&s->writable);
352 	}
353 	nni_id_remove(&s->pipes, p->id);
354 	nni_mtx_unlock(&s->mtx);
355 }
356 
357 static void
resp0_pipe_send_cb(void * arg)358 resp0_pipe_send_cb(void *arg)
359 {
360 	resp0_pipe *p = arg;
361 	resp0_sock *s = p->psock;
362 	resp0_ctx * ctx;
363 	nni_aio *   aio;
364 	nni_msg *   msg;
365 	size_t      len;
366 
367 	if (nni_aio_result(&p->aio_send) != 0) {
368 		nni_msg_free(nni_aio_get_msg(&p->aio_send));
369 		nni_aio_set_msg(&p->aio_send, NULL);
370 		nni_pipe_close(p->npipe);
371 		return;
372 	}
373 	nni_mtx_lock(&s->mtx);
374 	p->busy = false;
375 	if ((ctx = nni_list_first(&p->sendq)) == NULL) {
376 		// Nothing else to send.
377 		if (p->id == s->ctx.pipe_id) {
378 			// Mark us ready for the other side to send!
379 			nni_pollable_raise(&s->writable);
380 		}
381 		nni_mtx_unlock(&s->mtx);
382 		return;
383 	}
384 
385 	nni_list_remove(&p->sendq, ctx);
386 	aio        = ctx->saio;
387 	ctx->saio  = NULL;
388 	ctx->spipe = NULL;
389 	p->busy    = true;
390 	msg        = nni_aio_get_msg(aio);
391 	len        = nni_msg_len(msg);
392 	nni_aio_set_msg(aio, NULL);
393 	nni_aio_set_msg(&p->aio_send, msg);
394 	nni_pipe_send(p->npipe, &p->aio_send);
395 
396 	nni_mtx_unlock(&s->mtx);
397 
398 	nni_aio_finish_sync(aio, 0, len);
399 }
400 
401 static void
resp0_cancel_recv(nni_aio * aio,void * arg,int rv)402 resp0_cancel_recv(nni_aio *aio, void *arg, int rv)
403 {
404 	resp0_ctx * ctx = arg;
405 	resp0_sock *s   = ctx->sock;
406 
407 	nni_mtx_lock(&s->mtx);
408 	if (ctx->raio == aio) {
409 		nni_list_remove(&s->recvq, ctx);
410 		ctx->raio = NULL;
411 		nni_aio_finish_error(aio, rv);
412 	}
413 	nni_mtx_unlock(&s->mtx);
414 }
415 
416 static void
resp0_ctx_recv(void * arg,nni_aio * aio)417 resp0_ctx_recv(void *arg, nni_aio *aio)
418 {
419 	resp0_ctx * ctx = arg;
420 	resp0_sock *s   = ctx->sock;
421 	resp0_pipe *p;
422 	size_t      len;
423 	nni_msg *   msg;
424 
425 	if (nni_aio_begin(aio) != 0) {
426 		return;
427 	}
428 	nni_mtx_lock(&s->mtx);
429 	if ((p = nni_list_first(&s->recvpipes)) == NULL) {
430 		int rv;
431 		rv = nni_aio_schedule(aio, resp0_cancel_recv, ctx);
432 		if (rv != 0) {
433 			nni_mtx_unlock(&s->mtx);
434 			nni_aio_finish_error(aio, rv);
435 			return;
436 		}
437 		// We cannot have two concurrent receive requests on the same
438 		// context...
439 		if (ctx->raio != NULL) {
440 			nni_mtx_unlock(&s->mtx);
441 			nni_aio_finish_error(aio, NNG_ESTATE);
442 			return;
443 		}
444 		ctx->raio = aio;
445 		nni_list_append(&s->recvq, ctx);
446 		nni_mtx_unlock(&s->mtx);
447 		return;
448 	}
449 	msg = nni_aio_get_msg(&p->aio_recv);
450 	nni_aio_set_msg(&p->aio_recv, NULL);
451 	nni_list_remove(&s->recvpipes, p);
452 	if (nni_list_empty(&s->recvpipes)) {
453 		nni_pollable_clear(&s->readable);
454 	}
455 	nni_pipe_recv(p->npipe, &p->aio_recv);
456 
457 	len = nni_msg_header_len(msg);
458 	memcpy(ctx->btrace, nni_msg_header(msg), len);
459 	ctx->btrace_len = len;
460 	ctx->pipe_id    = p->id;
461 	if (ctx == &s->ctx) {
462 		nni_pollable_raise(&s->writable);
463 	}
464 	nni_mtx_unlock(&s->mtx);
465 
466 	nni_msg_header_clear(msg);
467 	nni_aio_set_msg(aio, msg);
468 	nni_aio_finish(aio, 0, nni_msg_len(msg));
469 }
470 
471 static void
resp0_pipe_recv_cb(void * arg)472 resp0_pipe_recv_cb(void *arg)
473 {
474 	resp0_pipe *p = arg;
475 	resp0_sock *s = p->psock;
476 	resp0_ctx * ctx;
477 	nni_msg *   msg;
478 	nni_aio *   aio;
479 	int         hops;
480 	size_t      len;
481 	int         ttl;
482 
483 	if (nni_aio_result(&p->aio_recv) != 0) {
484 		nni_pipe_close(p->npipe);
485 		return;
486 	}
487 
488 	ttl = nni_atomic_get(&s->ttl);
489 	msg = nni_aio_get_msg(&p->aio_recv);
490 	nni_msg_set_pipe(msg, p->id);
491 
492 	// Move backtrace from body to header
493 	hops = 1;
494 	for (;;) {
495 		bool     end = 0;
496 		uint8_t *body;
497 
498 		if (hops > ttl) {
499 			goto drop;
500 		}
501 		hops++;
502 		if (nni_msg_len(msg) < 4) {
503 			// Peer is speaking garbage, kick it.
504 			nni_msg_free(msg);
505 			nni_aio_set_msg(&p->aio_recv, NULL);
506 			nni_pipe_close(p->npipe);
507 			return;
508 		}
509 		body = nni_msg_body(msg);
510 		end  = ((body[0] & 0x80u) != 0);
511 		if (nni_msg_header_append(msg, body, 4) != 0) {
512 			goto drop;
513 		}
514 		nni_msg_trim(msg, 4);
515 		if (end) {
516 			break;
517 		}
518 	}
519 
520 	len = nni_msg_header_len(msg);
521 
522 	nni_mtx_lock(&s->mtx);
523 
524 	if (p->closed) {
525 		// If pipe was closed, we just abandon the data from it.
526 		nni_aio_set_msg(&p->aio_recv, NULL);
527 		nni_mtx_unlock(&s->mtx);
528 		nni_msg_free(msg);
529 		return;
530 	}
531 	if ((ctx = nni_list_first(&s->recvq)) == NULL) {
532 		// No one blocked in recv, stall.
533 		nni_list_append(&s->recvpipes, p);
534 		nni_pollable_raise(&s->readable);
535 		nni_mtx_unlock(&s->mtx);
536 		return;
537 	}
538 
539 	nni_list_remove(&s->recvq, ctx);
540 	aio       = ctx->raio;
541 	ctx->raio = NULL;
542 	nni_aio_set_msg(&p->aio_recv, NULL);
543 
544 	// Start the next receive.
545 	nni_pipe_recv(p->npipe, &p->aio_recv);
546 
547 	ctx->btrace_len = len;
548 	memcpy(ctx->btrace, nni_msg_header(msg), len);
549 	nni_msg_header_clear(msg);
550 	ctx->pipe_id = p->id;
551 
552 	if ((ctx == &s->ctx) && (!p->busy)) {
553 		nni_pollable_raise(&s->writable);
554 	}
555 	nni_mtx_unlock(&s->mtx);
556 
557 	nni_aio_set_msg(aio, msg);
558 	nni_aio_finish_sync(aio, 0, nni_msg_len(msg));
559 	return;
560 
561 drop:
562 	nni_msg_free(msg);
563 	nni_aio_set_msg(&p->aio_recv, NULL);
564 	nni_pipe_recv(p->npipe, &p->aio_recv);
565 }
566 
567 static int
resp0_sock_set_max_ttl(void * arg,const void * buf,size_t sz,nni_opt_type t)568 resp0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t)
569 {
570 	resp0_sock *s = arg;
571 	int         ttl;
572 	int         rv;
573 
574 	if ((rv = nni_copyin_int(&ttl, buf, sz, 1, NNI_MAX_MAX_TTL, t)) == 0) {
575 		nni_atomic_set(&s->ttl, ttl);
576 	}
577 	return (rv);
578 }
579 
580 static int
resp0_sock_get_max_ttl(void * arg,void * buf,size_t * szp,nni_opt_type t)581 resp0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t)
582 {
583 	resp0_sock *s = arg;
584 	return (nni_copyout_int(nni_atomic_get(&s->ttl), buf, szp, t));
585 }
586 
587 static int
resp0_sock_get_sendfd(void * arg,void * buf,size_t * szp,nni_opt_type t)588 resp0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t)
589 {
590 	resp0_sock *s = arg;
591 	int         rv;
592 	int         fd;
593 
594 	if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) {
595 		return (rv);
596 	}
597 	return (nni_copyout_int(fd, buf, szp, t));
598 }
599 
600 static int
resp0_sock_get_recvfd(void * arg,void * buf,size_t * szp,nni_opt_type t)601 resp0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t)
602 {
603 	resp0_sock *s = arg;
604 	int         rv;
605 	int         fd;
606 
607 	if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) {
608 		return (rv);
609 	}
610 	return (nni_copyout_int(fd, buf, szp, t));
611 }
612 
613 static void
resp0_sock_send(void * arg,nni_aio * aio)614 resp0_sock_send(void *arg, nni_aio *aio)
615 {
616 	resp0_sock *s = arg;
617 
618 	resp0_ctx_send(&s->ctx, aio);
619 }
620 
621 static void
resp0_sock_recv(void * arg,nni_aio * aio)622 resp0_sock_recv(void *arg, nni_aio *aio)
623 {
624 	resp0_sock *s = arg;
625 
626 	resp0_ctx_recv(&s->ctx, aio);
627 }
628 
629 static nni_proto_pipe_ops resp0_pipe_ops = {
630 	.pipe_size  = sizeof(resp0_pipe),
631 	.pipe_init  = resp0_pipe_init,
632 	.pipe_fini  = resp0_pipe_fini,
633 	.pipe_start = resp0_pipe_start,
634 	.pipe_close = resp0_pipe_close,
635 	.pipe_stop  = resp0_pipe_stop,
636 };
637 
638 static nni_proto_ctx_ops resp0_ctx_ops = {
639 	.ctx_size = sizeof(resp0_ctx),
640 	.ctx_init = resp0_ctx_init,
641 	.ctx_fini = resp0_ctx_fini,
642 	.ctx_send = resp0_ctx_send,
643 	.ctx_recv = resp0_ctx_recv,
644 };
645 
646 static nni_option resp0_sock_options[] = {
647 	{
648 	    .o_name = NNG_OPT_MAXTTL,
649 	    .o_get  = resp0_sock_get_max_ttl,
650 	    .o_set  = resp0_sock_set_max_ttl,
651 	},
652 	{
653 	    .o_name = NNG_OPT_RECVFD,
654 	    .o_get  = resp0_sock_get_recvfd,
655 	    .o_set  = NULL,
656 	},
657 	{
658 	    .o_name = NNG_OPT_SENDFD,
659 	    .o_get  = resp0_sock_get_sendfd,
660 	    .o_set  = NULL,
661 	},
662 	// terminate list
663 	{
664 	    .o_name = NULL,
665 	},
666 };
667 
668 static nni_proto_sock_ops resp0_sock_ops = {
669 	.sock_size    = sizeof(resp0_sock),
670 	.sock_init    = resp0_sock_init,
671 	.sock_fini    = resp0_sock_fini,
672 	.sock_open    = resp0_sock_open,
673 	.sock_close   = resp0_sock_close,
674 	.sock_send    = resp0_sock_send,
675 	.sock_recv    = resp0_sock_recv,
676 	.sock_options = resp0_sock_options,
677 };
678 
679 static nni_proto resp0_proto = {
680 	.proto_version  = NNI_PROTOCOL_VERSION,
681 	.proto_self     = { NNI_PROTO_RESPONDENT_V0, "respondent" },
682 	.proto_peer     = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
683 	.proto_flags    = NNI_PROTO_FLAG_SNDRCV,
684 	.proto_sock_ops = &resp0_sock_ops,
685 	.proto_pipe_ops = &resp0_pipe_ops,
686 	.proto_ctx_ops  = &resp0_ctx_ops,
687 };
688 
689 int
nng_respondent0_open(nng_socket * sidp)690 nng_respondent0_open(nng_socket *sidp)
691 {
692 	return (nni_proto_open(sidp, &resp0_proto));
693 }
694