1 //
2 // Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
3 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
4 //
5 // This software is supplied under the terms of the MIT License, a
6 // copy of which should be located in the distribution where this
7 // file was obtained (LICENSE.txt).  A copy of the license may also be
8 // found online at https://opensource.org/licenses/MIT.
9 //
10 
11 #define INPROC_URL "inproc://rot13"
12 #define REST_URL "http://127.0.0.1:%u/api/rest/rot13"
13 
14 // REST API -> NNG REP server demonstration.
15 
16 // This is a silly demo -- it listens on port 8888 (or $PORT if present),
17 // and accepts HTTP POST requests at /api/rest/rot13
18 //
19 // These requests are converted into an NNG REQ message, and sent to an
20 // NNG REP server (builtin inproc_server, for demonstration purposes only).
21 // The reply is obtained from the server, and sent back to the client via
22 // the HTTP server framework.
23 
24 // Example usage:
25 //
26 // % export CPPFLAGS="-I /usr/local/include"
27 // % export LDFLAGS="-L /usr/local/lib -lnng"
28 // % export CC="cc"
29 // % ${CC} ${CPPFLAGS} server.c -o server ${LDFLAGS}
30 // % ./server &
31 // % curl -d TEST http://127.0.0.1:8888/api/rest/rot13
32 // GRFG
33 //
34 
35 #include <nng/nng.h>
36 #include <nng/protocol/reqrep0/rep.h>
37 #include <nng/protocol/reqrep0/req.h>
38 #include <nng/supplemental/http/http.h>
39 #include <nng/supplemental/util/platform.h>
40 
41 #include <ctype.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <string.h>
45 
46 // utility function
47 void
fatal(const char * what,int rv)48 fatal(const char *what, int rv)
49 {
50 	fprintf(stderr, "%s: %s\n", what, nng_strerror(rv));
51 	exit(1);
52 }
53 
54 // This server acts as a proxy.  We take HTTP POST requests, convert them to
55 // REQ messages, and when the reply is received, send the reply back to
56 // the original HTTP client.
57 //
58 // The state flow looks like:
59 //
60 // 1. Receive HTTP request & headers
61 // 2. Receive HTTP request (POST) data
62 // 3. Send POST payload as REQ body
63 // 4. Receive REP reply (including payload)
64 // 5. Return REP message body to the HTTP server (which forwards to client)
65 // 6. Restart at step 1.
66 //
67 // The above flow is pretty linear, and so we use contexts (nng_ctx) to
68 // obtain parallelism.
69 
70 typedef enum {
71 	SEND_REQ, // Sending REQ request
72 	RECV_REP, // Receiving REQ reply
73 } job_state;
74 
75 typedef struct rest_job {
76 	nng_aio *        http_aio; // aio from HTTP we must reply to
77 	nng_http_res *   http_res; // HTTP response object
78 	job_state        state;    // 0 = sending, 1 = receiving
79 	nng_msg *        msg;      // request message
80 	nng_aio *        aio;      // request flow
81 	nng_ctx          ctx;      // context on the request socket
82 	struct rest_job *next;     // next on the freelist
83 } rest_job;
84 
85 nng_socket req_sock;
86 
87 // We maintain a queue of free jobs.  This way we don't have to
88 // deallocate them from the callback; we just reuse them.
89 nng_mtx * job_lock;
90 rest_job *job_freelist;
91 
92 static void rest_job_cb(void *arg);
93 
94 static void
rest_recycle_job(rest_job * job)95 rest_recycle_job(rest_job *job)
96 {
97 	if (job->http_res != NULL) {
98 		nng_http_res_free(job->http_res);
99 		job->http_res = NULL;
100 	}
101 	if (job->msg != NULL) {
102 		nng_msg_free(job->msg);
103 		job->msg = NULL;
104 	}
105 	if (nng_ctx_id(job->ctx) != 0) {
106 		nng_ctx_close(job->ctx);
107 	}
108 
109 	nng_mtx_lock(job_lock);
110 	job->next    = job_freelist;
111 	job_freelist = job;
112 	nng_mtx_unlock(job_lock);
113 }
114 
115 static rest_job *
rest_get_job(void)116 rest_get_job(void)
117 {
118 	rest_job *job;
119 
120 	nng_mtx_lock(job_lock);
121 	if ((job = job_freelist) != NULL) {
122 		job_freelist = job->next;
123 		nng_mtx_unlock(job_lock);
124 		job->next = NULL;
125 		return (job);
126 	}
127 	nng_mtx_unlock(job_lock);
128 	if ((job = calloc(1, sizeof(*job))) == NULL) {
129 		return (NULL);
130 	}
131 	if (nng_aio_alloc(&job->aio, rest_job_cb, job) != 0) {
132 		free(job);
133 		return (NULL);
134 	}
135 	return (job);
136 }
137 
138 static void
rest_http_fatal(rest_job * job,const char * fmt,int rv)139 rest_http_fatal(rest_job *job, const char *fmt, int rv)
140 {
141 	char          buf[128];
142 	nng_aio *     aio = job->http_aio;
143 	nng_http_res *res = job->http_res;
144 
145 	job->http_res = NULL;
146 	job->http_aio = NULL;
147 	snprintf(buf, sizeof(buf), fmt, nng_strerror(rv));
148 	nng_http_res_set_status(res, NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR);
149 	nng_http_res_set_reason(res, buf);
150 	nng_aio_set_output(aio, 0, res);
151 	nng_aio_finish(aio, 0);
152 	rest_recycle_job(job);
153 }
154 
155 static void
rest_job_cb(void * arg)156 rest_job_cb(void *arg)
157 {
158 	rest_job *job = arg;
159 	nng_aio * aio = job->aio;
160 	int       rv;
161 
162 	switch (job->state) {
163 	case SEND_REQ:
164 		if ((rv = nng_aio_result(aio)) != 0) {
165 			rest_http_fatal(job, "send REQ failed: %s", rv);
166 			return;
167 		}
168 		job->msg = NULL;
169 		// Message was sent, so now wait for the reply.
170 		nng_aio_set_msg(aio, NULL);
171 		job->state = RECV_REP;
172 		nng_ctx_recv(job->ctx, aio);
173 		break;
174 	case RECV_REP:
175 		if ((rv = nng_aio_result(aio)) != 0) {
176 			rest_http_fatal(job, "recv reply failed: %s", rv);
177 			return;
178 		}
179 		job->msg = nng_aio_get_msg(aio);
180 		// We got a reply, so give it back to the server.
181 		rv = nng_http_res_copy_data(job->http_res,
182 		    nng_msg_body(job->msg), nng_msg_len(job->msg));
183 		if (rv != 0) {
184 			rest_http_fatal(job, "nng_http_res_copy_data: %s", rv);
185 			return;
186 		}
187 		// Set the output - the HTTP server will send it back to the
188 		// user agent with a 200 response.
189 		nng_aio_set_output(job->http_aio, 0, job->http_res);
190 		nng_aio_finish(job->http_aio, 0);
191 		job->http_aio = NULL;
192 		job->http_res = NULL;
193 		// We are done with the job.
194 		rest_recycle_job(job);
195 		return;
196 	default:
197 		fatal("bad case", NNG_ESTATE);
198 		break;
199 	}
200 }
201 
202 // Our rest server just takes the message body, creates a request ID
203 // for it, and sends it on.  This runs in raw mode, so
204 void
rest_handle(nng_aio * aio)205 rest_handle(nng_aio *aio)
206 {
207 	struct rest_job *job;
208 	nng_http_req *   req  = nng_aio_get_input(aio, 0);
209 	nng_http_conn *  conn = nng_aio_get_input(aio, 2);
210 	const char *     clen;
211 	size_t           sz;
212 	nng_iov          iov;
213 	int              rv;
214 	void *           data;
215 
216 	if ((job = rest_get_job()) == NULL) {
217 		nng_aio_finish(aio, NNG_ENOMEM);
218 		return;
219 	}
220 	if (((rv = nng_http_res_alloc(&job->http_res)) != 0) ||
221 	    ((rv = nng_ctx_open(&job->ctx, req_sock)) != 0)) {
222 		rest_recycle_job(job);
223 		nng_aio_finish(aio, rv);
224 		return;
225 	}
226 
227 	nng_http_req_get_data(req, &data, &sz);
228 	job->http_aio = aio;
229 
230 	if ((rv = nng_msg_alloc(&job->msg, sz)) != 0) {
231 		rest_http_fatal(job, "nng_msg_alloc: %s", rv);
232 		return;
233 	}
234 
235 	memcpy(nng_msg_body(job->msg), data, sz);
236 	nng_aio_set_msg(job->aio, job->msg);
237 	job->state = SEND_REQ;
238 	nng_ctx_send(job->ctx, job->aio);
239 }
240 
241 void
rest_start(uint16_t port)242 rest_start(uint16_t port)
243 {
244 	nng_http_server * server;
245 	nng_http_handler *handler;
246 	char              rest_addr[128];
247 	nng_url *         url;
248 	int               rv;
249 
250 	if ((rv = nng_mtx_alloc(&job_lock)) != 0) {
251 		fatal("nng_mtx_alloc", rv);
252 	}
253 	job_freelist = NULL;
254 
255 	// Set up some strings, etc.  We use the port number
256 	// from the argument list.
257 	snprintf(rest_addr, sizeof(rest_addr), REST_URL, port);
258 	if ((rv = nng_url_parse(&url, rest_addr)) != 0) {
259 		fatal("nng_url_parse", rv);
260 	}
261 
262 	// Create the REQ socket, and put it in raw mode, connected to
263 	// the remote REP server (our inproc server in this case).
264 	if ((rv = nng_req0_open(&req_sock)) != 0) {
265 		fatal("nng_req0_open", rv);
266 	}
267 	if ((rv = nng_dial(req_sock, INPROC_URL, NULL, NNG_FLAG_NONBLOCK)) !=
268 	    0) {
269 		fatal("nng_dial(" INPROC_URL ")", rv);
270 	}
271 
272 	// Get a suitable HTTP server instance.  This creates one
273 	// if it doesn't already exist.
274 	if ((rv = nng_http_server_hold(&server, url)) != 0) {
275 		fatal("nng_http_server_hold", rv);
276 	}
277 
278 	// Allocate the handler - we use a dynamic handler for REST
279 	// using the function "rest_handle" declared above.
280 	rv = nng_http_handler_alloc(&handler, url->u_path, rest_handle);
281 	if (rv != 0) {
282 		fatal("nng_http_handler_alloc", rv);
283 	}
284 
285 	if ((rv = nng_http_handler_set_method(handler, "POST")) != 0) {
286 		fatal("nng_http_handler_set_method", rv);
287 	}
288 	// We want to collect the body, and we (arbitrarily) limit this to
289 	// 128KB.  The default limit is 1MB.  You can explicitly collect
290 	// the data yourself with another HTTP read transaction by disabling
291 	// this, but that's a lot of work, especially if you want to handle
292 	// chunked transfers.
293 	if ((rv = nng_http_handler_collect_body(handler, true, 1024 * 128)) !=
294 	    0) {
295 		fatal("nng_http_handler_collect_body", rv);
296 	}
297 	if ((rv = nng_http_server_add_handler(server, handler)) != 0) {
298 		fatal("nng_http_handler_add_handler", rv);
299 	}
300 	if ((rv = nng_http_server_start(server)) != 0) {
301 		fatal("nng_http_server_start", rv);
302 	}
303 
304 	nng_url_free(url);
305 }
306 
307 //
308 // inproc_server - this just is a simple REP server that listens for
309 // messages, and performs ROT13 on them before sending them.  This
310 // doesn't have to be in the same process -- it is hear for demonstration
311 // simplicity only.  (Most likely this would be somewhere else.)  Note
312 // especially that this uses inproc, so nothing can get to it directly
313 // from outside the process.
314 //
315 void
inproc_server(void * arg)316 inproc_server(void *arg)
317 {
318 	nng_socket s;
319 	int        rv;
320 	nng_msg *  msg;
321 
322 	if (((rv = nng_rep0_open(&s)) != 0) ||
323 	    ((rv = nng_listen(s, INPROC_URL, NULL, 0)) != 0)) {
324 		fatal("unable to set up inproc", rv);
325 	}
326 	// This is simple enough that we don't need concurrency.  Plus it
327 	// makes for an easier demo.
328 	for (;;) {
329 		char *body;
330 		if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
331 			fatal("inproc recvmsg", rv);
332 		}
333 		body = nng_msg_body(msg);
334 		for (int i = 0; i < nng_msg_len(msg); i++) {
335 			// Table lookup would be faster, but this works.
336 			if (isupper(body[i])) {
337 				char base = body[i] - 'A';
338 				base      = (base + 13) % 26;
339 				body[i]   = base + 'A';
340 			} else if (islower(body[i])) {
341 				char base = body[i] - 'a';
342 				base      = (base + 13) % 26;
343 				body[i]   = base + 'a';
344 			}
345 		}
346 		if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
347 			fatal("inproc sendmsg", rv);
348 		}
349 	}
350 }
351 
352 int
main(int argc,char ** argv)353 main(int argc, char **argv)
354 {
355 	int         rv;
356 	nng_thread *inproc_thr;
357 	uint16_t    port = 0;
358 
359 	rv = nng_thread_create(&inproc_thr, inproc_server, NULL);
360 	if (rv != 0) {
361 		fatal("cannot start inproc server", rv);
362 	}
363 	if (getenv("PORT") != NULL) {
364 		port = (uint16_t) atoi(getenv("PORT"));
365 	}
366 	port = port ? port : 8888;
367 	rest_start(port);
368 
369 	// This runs forever.  The inproc_thr never exits, so we
370 	// just block behind its condition variable.
371 	nng_thread_destroy(inproc_thr);
372 }
373