1 // Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
2 // Copyright 2018 Capitar IT Group BV <info@capitoar.com>
3 //
4 // This software is supplied under the terms of the MIT License, a
5 // copy of which should be located in the distribution where this
6 // file was obtained (LICENSE.txt).  A copy of the license may also be
7 // found online at https://opensource.org/licenses/MIT.
8 //
9 
// This program serves as an example for how to write an async RPC service,
// using the RAW request/reply pattern and nng's asynchronous I/O (nng_aio)
// callbacks.  The server keeps a fixed pool of work items; each one receives
// a request, waits, and then sends the reply.
13 
14 // Our demonstration application layer protocol is simple.  The client sends
15 // a number of milliseconds to wait before responding.  The server just gives
16 // back an empty reply after waiting that long.
17 
// To run this program, start the server as async <url> -s
// Then connect to it with the client as async <url> <msec>.
20 //
21 //  For example:
22 //
23 //  % ./async tcp://127.0.0.1:5555 -s &
24 //  % ./async tcp://127.0.0.1:5555 323
25 //  Request took 324 milliseconds.
26 
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <time.h>
31 
32 #include <nng/nng.h>
33 #include <nng/protocol/reqrep0/rep.h>
34 #include <nng/protocol/reqrep0/req.h>
35 #include <nng/supplemental/util/platform.h>
36 
37 // Parallel is the maximum number of outstanding requests we can handle.
38 // This is *NOT* the number of threads in use, but instead represents
39 // outstanding work items.  Select a small number to reduce memory size.
40 // (Each one of these can be thought of as a request-reply loop.)
41 #ifndef PARALLEL
42 #define PARALLEL 32
43 #endif
44 
// Each work item is one independent request/reply loop.  server_cb drives
// it as a small state machine, advancing one step every time it is called
// for the item.
struct work {
	// Step the item is currently at; server_cb switches on this.
	enum { INIT, RECV, WAIT, SEND } state;
	nng_aio *  aio;  // aio used for the receive, sleep and send operations
	nng_socket sock; // socket the item receives from and sends to
	nng_msg *  msg;  // request held during the sleep; sent back as the reply
};
54 
// fatal reports a failed operation on stderr and terminates the process
// with exit status 1.  func names the operation that failed; rv is the nng
// error code, rendered as text with nng_strerror.  It does not return.
void
fatal(const char *func, int rv)
{
	const char *reason = nng_strerror(rv);

	fprintf(stderr, "%s: %s\n", func, reason);
	exit(1);
}
61 
62 void
server_cb(void * arg)63 server_cb(void *arg)
64 {
65 	struct work *work = arg;
66 	nng_msg *    msg;
67 	int          rv;
68 	uint32_t     when;
69 
70 	switch (work->state) {
71 	case INIT:
72 		work->state = RECV;
73 		nng_recv_aio(work->sock, work->aio);
74 		break;
75 	case RECV:
76 		if ((rv = nng_aio_result(work->aio)) != 0) {
77 			fatal("nng_recv_aio", rv);
78 		}
79 		msg = nng_aio_get_msg(work->aio);
80 		if ((rv = nng_msg_trim_u32(msg, &when)) != 0) {
81 			// bad message, just ignore it.
82 			nng_msg_free(msg);
83 			nng_recv_aio(work->sock, work->aio);
84 			return;
85 		}
86 		work->msg   = msg;
87 		work->state = WAIT;
88 		nng_sleep_aio(when, work->aio);
89 		break;
90 	case WAIT:
91 		// We could add more data to the message here.
92 		nng_aio_set_msg(work->aio, work->msg);
93 		work->msg   = NULL;
94 		work->state = SEND;
95 		nng_send_aio(work->sock, work->aio);
96 		break;
97 	case SEND:
98 		if ((rv = nng_aio_result(work->aio)) != 0) {
99 			nng_msg_free(work->msg);
100 			fatal("nng_send_aio", rv);
101 		}
102 		work->state = RECV;
103 		nng_recv_aio(work->sock, work->aio);
104 		break;
105 	default:
106 		fatal("bad state!", NNG_ESTATE);
107 		break;
108 	}
109 }
110 
111 struct work *
alloc_work(nng_socket sock)112 alloc_work(nng_socket sock)
113 {
114 	struct work *w;
115 	int          rv;
116 
117 	if ((w = nng_alloc(sizeof(*w))) == NULL) {
118 		fatal("nng_alloc", NNG_ENOMEM);
119 	}
120 	if ((rv = nng_aio_alloc(&w->aio, server_cb, w)) != 0) {
121 		fatal("nng_aio_alloc", rv);
122 	}
123 	w->state = INIT;
124 	w->sock  = sock;
125 	return (w);
126 }
127 
128 // The server runs forever.
129 int
server(const char * url)130 server(const char *url)
131 {
132 	nng_socket   sock;
133 	struct work *works[PARALLEL];
134 	int          rv;
135 	int          i;
136 
137 	/*  Create the socket. */
138 	rv = nng_rep0_open_raw(&sock);
139 	if (rv != 0) {
140 		fatal("nng_rep0_open", rv);
141 	}
142 
143 	for (i = 0; i < PARALLEL; i++) {
144 		works[i] = alloc_work(sock);
145 	}
146 
147 	if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
148 		fatal("nng_listen", rv);
149 	}
150 
151 	for (i = 0; i < PARALLEL; i++) {
152 		server_cb(works[i]); // this starts them going (INIT state)
153 	}
154 
155 	for (;;) {
156 		nng_msleep(3600000); // neither pause() nor sleep() portable
157 	}
158 }
159 
160 /*  The client runs just once, and then returns. */
161 int
client(const char * url,const char * msecstr)162 client(const char *url, const char *msecstr)
163 {
164 	nng_socket sock;
165 	int        rv;
166 	nng_msg *  msg;
167 	nng_time   start;
168 	nng_time   end;
169 	unsigned   msec;
170 
171 	msec = atoi(msecstr) * 1000;
172 
173 	if ((rv = nng_req0_open(&sock)) != 0) {
174 		fatal("nng_req0_open", rv);
175 	}
176 
177 	if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
178 		fatal("nng_dial", rv);
179 	}
180 
181 	start = nng_clock();
182 
183 	if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
184 		fatal("nng_msg_alloc", rv);
185 	}
186 	if ((rv = nng_msg_append_u32(msg, msec)) != 0) {
187 		fatal("nng_msg_append_u32", rv);
188 	}
189 
190 	if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
191 		fatal("nng_send", rv);
192 	}
193 
194 	if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
195 		fatal("nng_recvmsg", rv);
196 	}
197 	end = nng_clock();
198 	nng_msg_free(msg);
199 	nng_close(sock);
200 
201 	printf("Request took %u milliseconds.\n", (uint32_t)(end - start));
202 	return (0);
203 }
204 
205 int
main(int argc,char ** argv)206 main(int argc, char **argv)
207 {
208 	int rc;
209 
210 	if (argc < 3) {
211 		fprintf(stderr, "Usage: %s <url> [-s|<secs>]\n", argv[0]);
212 		exit(EXIT_FAILURE);
213 	}
214 	if (strcmp(argv[2], "-s") == 0) {
215 		rc = server(argv[1]);
216 	} else {
217 		rc = client(argv[1], argv[2]);
218 	}
219 	exit(rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
220 }
221