xref: /minix/external/bsd/libevent/dist/evrpc.c (revision 9f988b79)
1 /*	$NetBSD: evrpc.c,v 1.2 2013/04/11 16:56:41 christos Exp $	*/
2 /*
3  * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu>
4  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. The name of the author may not be used to endorse or promote products
15  *    derived from this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27  */
28 #include "event2/event-config.h"
29 #include <sys/cdefs.h>
30 __RCSID("$NetBSD: evrpc.c,v 1.2 2013/04/11 16:56:41 christos Exp $");
31 
32 #ifdef WIN32
33 #define WIN32_LEAN_AND_MEAN
34 #include <winsock2.h>
35 #include <windows.h>
36 #undef WIN32_LEAN_AND_MEAN
37 #endif
38 
39 #include <sys/types.h>
40 #ifndef WIN32
41 #include <sys/socket.h>
42 #endif
43 #ifdef _EVENT_HAVE_SYS_TIME_H
44 #include <sys/time.h>
45 #endif
46 #include <sys/queue.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #ifndef WIN32
50 #include <unistd.h>
51 #endif
52 #include <errno.h>
53 #include <signal.h>
54 #include <string.h>
55 
56 #include <sys/queue.h>
57 
58 #include "event2/event.h"
59 #include "event2/event_struct.h"
60 #include "event2/rpc.h"
61 #include "event2/rpc_struct.h"
62 #include "evrpc-internal.h"
63 #include "event2/http.h"
64 #include "event2/buffer.h"
65 #include "event2/tag.h"
66 #include "event2/http_struct.h"
67 #include "event2/http_compat.h"
68 #include "event2/util.h"
69 #include "util-internal.h"
70 #include "log-internal.h"
71 #include "mm-internal.h"
72 
73 struct evrpc_base *
74 evrpc_init(struct evhttp *http_server)
75 {
76 	struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base));
77 	if (base == NULL)
78 		return (NULL);
79 
80 	/* we rely on the tagging sub system */
81 	evtag_init();
82 
83 	TAILQ_INIT(&base->registered_rpcs);
84 	TAILQ_INIT(&base->input_hooks);
85 	TAILQ_INIT(&base->output_hooks);
86 
87 	TAILQ_INIT(&base->paused_requests);
88 
89 	base->http_server = http_server;
90 
91 	return (base);
92 }
93 
94 void
95 evrpc_free(struct evrpc_base *base)
96 {
97 	struct evrpc *rpc;
98 	struct evrpc_hook *hook;
99 	struct evrpc_hook_ctx *paused;
100 	int r;
101 
102 	while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
103 		r = evrpc_unregister_rpc(base, rpc->uri);
104 		EVUTIL_ASSERT(r == 0);
105 	}
106 	while ((paused = TAILQ_FIRST(&base->paused_requests)) != NULL) {
107 		TAILQ_REMOVE(&base->paused_requests, paused, next);
108 		mm_free(paused);
109 	}
110 	while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
111 		r = evrpc_remove_hook(base, EVRPC_INPUT, hook);
112 		EVUTIL_ASSERT(r);
113 	}
114 	while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
115 		r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook);
116 		EVUTIL_ASSERT(r);
117 	}
118 	mm_free(base);
119 }
120 
121 void *
122 evrpc_add_hook(void *vbase,
123     enum EVRPC_HOOK_TYPE hook_type,
124     int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *),
125     void *cb_arg)
126 {
127 	struct _evrpc_hooks *base = vbase;
128 	struct evrpc_hook_list *head = NULL;
129 	struct evrpc_hook *hook = NULL;
130 	switch (hook_type) {
131 	case EVRPC_INPUT:
132 		head = &base->in_hooks;
133 		break;
134 	case EVRPC_OUTPUT:
135 		head = &base->out_hooks;
136 		break;
137 	default:
138 		EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
139 	}
140 
141 	hook = mm_calloc(1, sizeof(struct evrpc_hook));
142 	EVUTIL_ASSERT(hook != NULL);
143 
144 	hook->process = cb;
145 	hook->process_arg = cb_arg;
146 	TAILQ_INSERT_TAIL(head, hook, next);
147 
148 	return (hook);
149 }
150 
151 static int
152 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
153 {
154 	struct evrpc_hook *hook = NULL;
155 	TAILQ_FOREACH(hook, head, next) {
156 		if (hook == handle) {
157 			TAILQ_REMOVE(head, hook, next);
158 			mm_free(hook);
159 			return (1);
160 		}
161 	}
162 
163 	return (0);
164 }
165 
166 /*
167  * remove the hook specified by the handle
168  */
169 
170 int
171 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
172 {
173 	struct _evrpc_hooks *base = vbase;
174 	struct evrpc_hook_list *head = NULL;
175 	switch (hook_type) {
176 	case EVRPC_INPUT:
177 		head = &base->in_hooks;
178 		break;
179 	case EVRPC_OUTPUT:
180 		head = &base->out_hooks;
181 		break;
182 	default:
183 		EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
184 	}
185 
186 	return (evrpc_remove_hook_internal(head, handle));
187 }
188 
189 static int
190 evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx,
191     struct evhttp_request *req, struct evbuffer *evbuf)
192 {
193 	struct evrpc_hook *hook;
194 	TAILQ_FOREACH(hook, head, next) {
195 		int res = hook->process(ctx, req, evbuf, hook->process_arg);
196 		if (res != EVRPC_CONTINUE)
197 			return (res);
198 	}
199 
200 	return (EVRPC_CONTINUE);
201 }
202 
203 static void evrpc_pool_schedule(struct evrpc_pool *pool);
204 static void evrpc_request_cb(struct evhttp_request *, void *);
205 
/*
 * evrpc_register_rpc() below registers a new RPC with the HTTP server.
 * The evrpc object is expected to have been filled in via the
 * EVRPC_REGISTER_OBJECT macro which in turn calls that function.
 * evrpc_construct_uri() builds the URI under which the RPC is served.
 */
211 
212 static char *
213 evrpc_construct_uri(const char *uri)
214 {
215 	char *constructed_uri;
216 	size_t constructed_uri_len;
217 
218 	constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
219 	if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL)
220 		event_err(1, "%s: failed to register rpc at %s",
221 		    __func__, uri);
222 	memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
223 	memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
224 	constructed_uri[constructed_uri_len - 1] = '\0';
225 
226 	return (constructed_uri);
227 }
228 
229 int
230 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
231     void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
232 {
233 	char *constructed_uri = evrpc_construct_uri(rpc->uri);
234 
235 	rpc->base = base;
236 	rpc->cb = cb;
237 	rpc->cb_arg = cb_arg;
238 
239 	TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
240 
241 	evhttp_set_cb(base->http_server,
242 	    constructed_uri,
243 	    evrpc_request_cb,
244 	    rpc);
245 
246 	mm_free(constructed_uri);
247 
248 	return (0);
249 }
250 
251 int
252 evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
253 {
254 	char *registered_uri = NULL;
255 	struct evrpc *rpc;
256 	int r;
257 
258 	/* find the right rpc; linear search might be slow */
259 	TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
260 		if (strcmp(rpc->uri, name) == 0)
261 			break;
262 	}
263 	if (rpc == NULL) {
264 		/* We did not find an RPC with this name */
265 		return (-1);
266 	}
267 	TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
268 
269 	registered_uri = evrpc_construct_uri(name);
270 
271 	/* remove the http server callback */
272 	r = evhttp_del_cb(base->http_server, registered_uri);
273 	EVUTIL_ASSERT(r == 0);
274 
275 	mm_free(registered_uri);
276 
277 	mm_free(__UNCONST(rpc->uri));
278 	mm_free(rpc);
279 	return (0);
280 }
281 
282 static int evrpc_pause_request(void *vbase, void *ctx,
283     void (*cb)(void *, enum EVRPC_HOOK_RESULT));
284 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT);
285 
286 static void
287 evrpc_request_cb(struct evhttp_request *req, void *arg)
288 {
289 	struct evrpc *rpc = arg;
290 	struct evrpc_req_generic *rpc_state = NULL;
291 
292 	/* let's verify the outside parameters */
293 	if (req->type != EVHTTP_REQ_POST ||
294 	    evbuffer_get_length(req->input_buffer) <= 0)
295 		goto error;
296 
297 	rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic));
298 	if (rpc_state == NULL)
299 		goto error;
300 	rpc_state->rpc = rpc;
301 	rpc_state->http_req = req;
302 	rpc_state->rpc_data = NULL;
303 
304 	if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) {
305 		int hook_res;
306 
307 		evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
308 
309 		/*
310 		 * allow hooks to modify the outgoing request
311 		 */
312 		hook_res = evrpc_process_hooks(&rpc->base->input_hooks,
313 		    rpc_state, req, req->input_buffer);
314 		switch (hook_res) {
315 		case EVRPC_TERMINATE:
316 			goto error;
317 		case EVRPC_PAUSE:
318 			evrpc_pause_request(rpc->base, rpc_state,
319 			    evrpc_request_cb_closure);
320 			return;
321 		case EVRPC_CONTINUE:
322 			break;
323 		default:
324 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
325 			    hook_res == EVRPC_CONTINUE ||
326 			    hook_res == EVRPC_PAUSE);
327 		}
328 	}
329 
330 	evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE);
331 	return;
332 
333 error:
334 	if (rpc_state != NULL)
335 		evrpc_reqstate_free(rpc_state);
336 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
337 	return;
338 }
339 
340 static void
341 evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
342 {
343 	struct evrpc_req_generic *rpc_state = arg;
344 	struct evrpc *rpc;
345 	struct evhttp_request *req;
346 
347 	EVUTIL_ASSERT(rpc_state);
348 	rpc = rpc_state->rpc;
349 	req = rpc_state->http_req;
350 
351 	if (hook_res == EVRPC_TERMINATE)
352 		goto error;
353 
354 	/* let's check that we can parse the request */
355 	rpc_state->request = rpc->request_new(rpc->request_new_arg);
356 	if (rpc_state->request == NULL)
357 		goto error;
358 
359 	if (rpc->request_unmarshal(
360 		    rpc_state->request, req->input_buffer) == -1) {
361 		/* we failed to parse the request; that's a bummer */
362 		goto error;
363 	}
364 
365 	/* at this point, we have a well formed request, prepare the reply */
366 
367 	rpc_state->reply = rpc->reply_new(rpc->reply_new_arg);
368 	if (rpc_state->reply == NULL)
369 		goto error;
370 
371 	/* give the rpc to the user; they can deal with it */
372 	rpc->cb(rpc_state, rpc->cb_arg);
373 
374 	return;
375 
376 error:
377 	if (rpc_state != NULL)
378 		evrpc_reqstate_free(rpc_state);
379 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
380 	return;
381 }
382 
383 
384 void
385 evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
386 {
387 	struct evrpc *rpc;
388 	EVUTIL_ASSERT(rpc_state != NULL);
389 	rpc = rpc_state->rpc;
390 
391 	/* clean up all memory */
392 	if (rpc_state->hook_meta != NULL)
393 		evrpc_hook_context_free(rpc_state->hook_meta);
394 	if (rpc_state->request != NULL)
395 		rpc->request_free(rpc_state->request);
396 	if (rpc_state->reply != NULL)
397 		rpc->reply_free(rpc_state->reply);
398 	if (rpc_state->rpc_data != NULL)
399 		evbuffer_free(rpc_state->rpc_data);
400 	mm_free(rpc_state);
401 }
402 
403 static void
404 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT);
405 
406 void
407 evrpc_request_done(struct evrpc_req_generic *rpc_state)
408 {
409 	struct evhttp_request *req;
410 	struct evrpc *rpc;
411 
412 	EVUTIL_ASSERT(rpc_state);
413 
414 	req = rpc_state->http_req;
415 	rpc = rpc_state->rpc;
416 
417 	if (rpc->reply_complete(rpc_state->reply) == -1) {
418 		/* the reply was not completely filled in.  error out */
419 		goto error;
420 	}
421 
422 	if ((rpc_state->rpc_data = evbuffer_new()) == NULL) {
423 		/* out of memory */
424 		goto error;
425 	}
426 
427 	/* serialize the reply */
428 	rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply);
429 
430 	if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) {
431 		int hook_res;
432 
433 		evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
434 
435 		/* do hook based tweaks to the request */
436 		hook_res = evrpc_process_hooks(&rpc->base->output_hooks,
437 		    rpc_state, req, rpc_state->rpc_data);
438 		switch (hook_res) {
439 		case EVRPC_TERMINATE:
440 			goto error;
441 		case EVRPC_PAUSE:
442 			if (evrpc_pause_request(rpc->base, rpc_state,
443 				evrpc_request_done_closure) == -1)
444 				goto error;
445 			return;
446 		case EVRPC_CONTINUE:
447 			break;
448 		default:
449 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
450 			    hook_res == EVRPC_CONTINUE ||
451 			    hook_res == EVRPC_PAUSE);
452 		}
453 	}
454 
455 	evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE);
456 	return;
457 
458 error:
459 	if (rpc_state != NULL)
460 		evrpc_reqstate_free(rpc_state);
461 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
462 	return;
463 }
464 
465 void *
466 evrpc_get_request(struct evrpc_req_generic *req)
467 {
468 	return req->request;
469 }
470 
471 void *
472 evrpc_get_reply(struct evrpc_req_generic *req)
473 {
474 	return req->reply;
475 }
476 
477 static void
478 evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
479 {
480 	struct evrpc_req_generic *rpc_state = arg;
481 	struct evhttp_request *req;
482 	EVUTIL_ASSERT(rpc_state);
483 	req = rpc_state->http_req;
484 
485 	if (hook_res == EVRPC_TERMINATE)
486 		goto error;
487 
488 	/* on success, we are going to transmit marshaled binary data */
489 	if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
490 		evhttp_add_header(req->output_headers,
491 		    "Content-Type", "application/octet-stream");
492 	}
493 	evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data);
494 
495 	evrpc_reqstate_free(rpc_state);
496 
497 	return;
498 
499 error:
500 	if (rpc_state != NULL)
501 		evrpc_reqstate_free(rpc_state);
502 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
503 	return;
504 }
505 
506 
507 /* Client implementation of RPC site */
508 
509 static int evrpc_schedule_request(struct evhttp_connection *connection,
510     struct evrpc_request_wrapper *ctx);
511 
512 struct evrpc_pool *
513 evrpc_pool_new(struct event_base *base)
514 {
515 	struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool));
516 	if (pool == NULL)
517 		return (NULL);
518 
519 	TAILQ_INIT(&pool->connections);
520 	TAILQ_INIT(&pool->requests);
521 
522 	TAILQ_INIT(&pool->paused_requests);
523 
524 	TAILQ_INIT(&pool->input_hooks);
525 	TAILQ_INIT(&pool->output_hooks);
526 
527 	pool->base = base;
528 	pool->timeout = -1;
529 
530 	return (pool);
531 }
532 
533 static void
534 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
535 {
536 	if (request->hook_meta != NULL)
537 		evrpc_hook_context_free(request->hook_meta);
538 	mm_free(request->name);
539 	mm_free(request);
540 }
541 
542 void
543 evrpc_pool_free(struct evrpc_pool *pool)
544 {
545 	struct evhttp_connection *connection;
546 	struct evrpc_request_wrapper *request;
547 	struct evrpc_hook_ctx *paused;
548 	struct evrpc_hook *hook;
549 	int r;
550 
551 	while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
552 		TAILQ_REMOVE(&pool->requests, request, next);
553 		evrpc_request_wrapper_free(request);
554 	}
555 
556 	while ((paused = TAILQ_FIRST(&pool->paused_requests)) != NULL) {
557 		TAILQ_REMOVE(&pool->paused_requests, paused, next);
558 		mm_free(paused);
559 	}
560 
561 	while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
562 		TAILQ_REMOVE(&pool->connections, connection, next);
563 		evhttp_connection_free(connection);
564 	}
565 
566 	while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
567 		r = evrpc_remove_hook(pool, EVRPC_INPUT, hook);
568 		EVUTIL_ASSERT(r);
569 	}
570 
571 	while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
572 		r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook);
573 		EVUTIL_ASSERT(r);
574 	}
575 
576 	mm_free(pool);
577 }
578 
579 /*
580  * Add a connection to the RPC pool.   A request scheduled on the pool
581  * may use any available connection.
582  */
583 
584 void
585 evrpc_pool_add_connection(struct evrpc_pool *pool,
586     struct evhttp_connection *connection)
587 {
588 	EVUTIL_ASSERT(connection->http_server == NULL);
589 	TAILQ_INSERT_TAIL(&pool->connections, connection, next);
590 
591 	/*
592 	 * associate an event base with this connection
593 	 */
594 	if (pool->base != NULL)
595 		evhttp_connection_set_base(connection, pool->base);
596 
597 	/*
598 	 * unless a timeout was specifically set for a connection,
599 	 * the connection inherits the timeout from the pool.
600 	 */
601 	if (connection->timeout == -1)
602 		connection->timeout = pool->timeout;
603 
604 	/*
605 	 * if we have any requests pending, schedule them with the new
606 	 * connections.
607 	 */
608 
609 	if (TAILQ_FIRST(&pool->requests) != NULL) {
610 		struct evrpc_request_wrapper *request =
611 		    TAILQ_FIRST(&pool->requests);
612 		TAILQ_REMOVE(&pool->requests, request, next);
613 		evrpc_schedule_request(connection, request);
614 	}
615 }
616 
617 void
618 evrpc_pool_remove_connection(struct evrpc_pool *pool,
619     struct evhttp_connection *connection)
620 {
621 	TAILQ_REMOVE(&pool->connections, connection, next);
622 }
623 
624 void
625 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
626 {
627 	struct evhttp_connection *evcon;
628 	TAILQ_FOREACH(evcon, &pool->connections, next) {
629 		evcon->timeout = timeout_in_secs;
630 	}
631 	pool->timeout = timeout_in_secs;
632 }
633 
634 
635 static void evrpc_reply_done(struct evhttp_request *, void *);
636 static void evrpc_request_timeout(evutil_socket_t, short, void *);
637 
638 /*
639  * Finds a connection object associated with the pool that is currently
640  * idle and can be used to make a request.
641  */
642 static struct evhttp_connection *
643 evrpc_pool_find_connection(struct evrpc_pool *pool)
644 {
645 	struct evhttp_connection *connection;
646 	TAILQ_FOREACH(connection, &pool->connections, next) {
647 		if (TAILQ_FIRST(&connection->requests) == NULL)
648 			return (connection);
649 	}
650 
651 	return (NULL);
652 }
653 
654 /*
655  * Prototypes responsible for evrpc scheduling and hooking
656  */
657 
658 static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT);
659 
660 /*
661  * We assume that the ctx is no longer queued on the pool.
662  */
663 static int
664 evrpc_schedule_request(struct evhttp_connection *connection,
665     struct evrpc_request_wrapper *ctx)
666 {
667 	struct evhttp_request *req = NULL;
668 	struct evrpc_pool *pool = ctx->pool;
669 	struct evrpc_status status;
670 
671 	if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
672 		goto error;
673 
674 	/* serialize the request data into the output buffer */
675 	ctx->request_marshal(req->output_buffer, ctx->request);
676 
677 	/* we need to know the connection that we might have to abort */
678 	ctx->evcon = connection;
679 
680 	/* if we get paused we also need to know the request */
681 	ctx->req = req;
682 
683 	if (TAILQ_FIRST(&pool->output_hooks) != NULL) {
684 		int hook_res;
685 
686 		evrpc_hook_associate_meta(&ctx->hook_meta, connection);
687 
688 		/* apply hooks to the outgoing request */
689 		hook_res = evrpc_process_hooks(&pool->output_hooks,
690 		    ctx, req, req->output_buffer);
691 
692 		switch (hook_res) {
693 		case EVRPC_TERMINATE:
694 			goto error;
695 		case EVRPC_PAUSE:
696 			/* we need to be explicitly resumed */
697 			if (evrpc_pause_request(pool, ctx,
698 				evrpc_schedule_request_closure) == -1)
699 				goto error;
700 			return (0);
701 		case EVRPC_CONTINUE:
702 			/* we can just continue */
703 			break;
704 		default:
705 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
706 			    hook_res == EVRPC_CONTINUE ||
707 			    hook_res == EVRPC_PAUSE);
708 		}
709 	}
710 
711 	evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE);
712 	return (0);
713 
714 error:
715 	memset(&status, 0, sizeof(status));
716 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
717 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
718 	evrpc_request_wrapper_free(ctx);
719 	return (-1);
720 }
721 
722 static void
723 evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
724 {
725 	struct evrpc_request_wrapper *ctx = arg;
726 	struct evhttp_connection *connection = ctx->evcon;
727 	struct evhttp_request *req = ctx->req;
728 	struct evrpc_pool *pool = ctx->pool;
729 	struct evrpc_status status;
730 	char *uri = NULL;
731 	int res = 0;
732 
733 	if (hook_res == EVRPC_TERMINATE)
734 		goto error;
735 
736 	uri = evrpc_construct_uri(ctx->name);
737 	if (uri == NULL)
738 		goto error;
739 
740 	if (pool->timeout > 0) {
741 		/*
742 		 * a timeout after which the whole rpc is going to be aborted.
743 		 */
744 		struct timeval tv;
745 		evutil_timerclear(&tv);
746 		tv.tv_sec = pool->timeout;
747 		evtimer_add(&ctx->ev_timeout, &tv);
748 	}
749 
750 	/* start the request over the connection */
751 	res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
752 	mm_free(uri);
753 
754 	if (res == -1)
755 		goto error;
756 
757 	return;
758 
759 error:
760 	memset(&status, 0, sizeof(status));
761 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
762 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
763 	evrpc_request_wrapper_free(ctx);
764 }
765 
766 /* we just queue the paused request on the pool under the req object */
767 static int
768 evrpc_pause_request(void *vbase, void *ctx,
769     void (*cb)(void *, enum EVRPC_HOOK_RESULT))
770 {
771 	struct _evrpc_hooks *base = vbase;
772 	struct evrpc_hook_ctx *paused = mm_malloc(sizeof(*paused));
773 	if (paused == NULL)
774 		return (-1);
775 
776 	paused->ctx = ctx;
777 	paused->cb = cb;
778 
779 	TAILQ_INSERT_TAIL(&base->pause_requests, paused, next);
780 	return (0);
781 }
782 
783 int
784 evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res)
785 {
786 	struct _evrpc_hooks *base = vbase;
787 	struct evrpc_pause_list *head = &base->pause_requests;
788 	struct evrpc_hook_ctx *paused;
789 
790 	TAILQ_FOREACH(paused, head, next) {
791 		if (paused->ctx == ctx)
792 			break;
793 	}
794 
795 	if (paused == NULL)
796 		return (-1);
797 
798 	(*paused->cb)(paused->ctx, res);
799 	TAILQ_REMOVE(head, paused, next);
800 	mm_free(paused);
801 	return (0);
802 }
803 
804 int
805 evrpc_make_request(struct evrpc_request_wrapper *ctx)
806 {
807 	struct evrpc_pool *pool = ctx->pool;
808 
809 	/* initialize the event structure for this rpc */
810 	evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx);
811 
812 	/* we better have some available connections on the pool */
813 	EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL);
814 
815 	/*
816 	 * if no connection is available, we queue the request on the pool,
817 	 * the next time a connection is empty, the rpc will be send on that.
818 	 */
819 	TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
820 
821 	evrpc_pool_schedule(pool);
822 
823 	return (0);
824 }
825 
826 
827 struct evrpc_request_wrapper *
828 evrpc_make_request_ctx(
829 	struct evrpc_pool *pool, void *request, void *reply,
830 	const char *rpcname,
831 	void (*req_marshal)(struct evbuffer*, void *),
832 	void (*rpl_clear)(void *),
833 	int (*rpl_unmarshal)(void *, struct evbuffer *),
834 	void (*cb)(struct evrpc_status *, void *, void *, void *),
835 	void *cbarg)
836 {
837 	struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *)
838 	    mm_malloc(sizeof(struct evrpc_request_wrapper));
839 	if (ctx == NULL)
840 		return (NULL);
841 
842 	ctx->pool = pool;
843 	ctx->hook_meta = NULL;
844 	ctx->evcon = NULL;
845 	ctx->name = mm_strdup(rpcname);
846 	if (ctx->name == NULL) {
847 		mm_free(ctx);
848 		return (NULL);
849 	}
850 	ctx->cb = cb;
851 	ctx->cb_arg = cbarg;
852 	ctx->request = request;
853 	ctx->reply = reply;
854 	ctx->request_marshal = req_marshal;
855 	ctx->reply_clear = rpl_clear;
856 	ctx->reply_unmarshal = rpl_unmarshal;
857 
858 	return (ctx);
859 }
860 
861 static void
862 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT);
863 
864 static void
865 evrpc_reply_done(struct evhttp_request *req, void *arg)
866 {
867 	struct evrpc_request_wrapper *ctx = arg;
868 	struct evrpc_pool *pool = ctx->pool;
869 	int hook_res = EVRPC_CONTINUE;
870 
871 	/* cancel any timeout we might have scheduled */
872 	event_del(&ctx->ev_timeout);
873 
874 	ctx->req = req;
875 
876 	/* we need to get the reply now */
877 	if (req == NULL) {
878 		evrpc_reply_done_closure(ctx, EVRPC_CONTINUE);
879 		return;
880 	}
881 
882 	if (TAILQ_FIRST(&pool->input_hooks) != NULL) {
883 		evrpc_hook_associate_meta(&ctx->hook_meta, ctx->evcon);
884 
885 		/* apply hooks to the incoming request */
886 		hook_res = evrpc_process_hooks(&pool->input_hooks,
887 		    ctx, req, req->input_buffer);
888 
889 		switch (hook_res) {
890 		case EVRPC_TERMINATE:
891 		case EVRPC_CONTINUE:
892 			break;
893 		case EVRPC_PAUSE:
894 			/*
895 			 * if we get paused we also need to know the
896 			 * request.  unfortunately, the underlying
897 			 * layer is going to free it.  we need to
898 			 * request ownership explicitly
899 			 */
900 			if (req != NULL)
901 				evhttp_request_own(req);
902 
903 			evrpc_pause_request(pool, ctx,
904 			    evrpc_reply_done_closure);
905 			return;
906 		default:
907 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
908 			    hook_res == EVRPC_CONTINUE ||
909 			    hook_res == EVRPC_PAUSE);
910 		}
911 	}
912 
913 	evrpc_reply_done_closure(ctx, hook_res);
914 
915 	/* http request is being freed by underlying layer */
916 }
917 
918 static void
919 evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
920 {
921 	struct evrpc_request_wrapper *ctx = arg;
922 	struct evhttp_request *req = ctx->req;
923 	struct evrpc_pool *pool = ctx->pool;
924 	struct evrpc_status status;
925 	int res = -1;
926 
927 	memset(&status, 0, sizeof(status));
928 	status.http_req = req;
929 
930 	/* we need to get the reply now */
931 	if (req == NULL) {
932 		status.error = EVRPC_STATUS_ERR_TIMEOUT;
933 	} else if (hook_res == EVRPC_TERMINATE) {
934 		status.error = EVRPC_STATUS_ERR_HOOKABORTED;
935 	} else {
936 		res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
937 		if (res == -1)
938 			status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
939 	}
940 
941 	if (res == -1) {
942 		/* clear everything that we might have written previously */
943 		ctx->reply_clear(ctx->reply);
944 	}
945 
946 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
947 
948 	evrpc_request_wrapper_free(ctx);
949 
950 	/* the http layer owned the original request structure, but if we
951 	 * got paused, we asked for ownership and need to free it here. */
952 	if (req != NULL && evhttp_request_is_owned(req))
953 		evhttp_request_free(req);
954 
955 	/* see if we can schedule another request */
956 	evrpc_pool_schedule(pool);
957 }
958 
959 static void
960 evrpc_pool_schedule(struct evrpc_pool *pool)
961 {
962 	struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
963 	struct evhttp_connection *evcon;
964 
965 	/* if no requests are pending, we have no work */
966 	if (ctx == NULL)
967 		return;
968 
969 	if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
970 		TAILQ_REMOVE(&pool->requests, ctx, next);
971 		evrpc_schedule_request(evcon, ctx);
972 	}
973 }
974 
975 static void
976 evrpc_request_timeout(evutil_socket_t fd, short what, void *arg)
977 {
978 	struct evrpc_request_wrapper *ctx = arg;
979 	struct evhttp_connection *evcon = ctx->evcon;
980 	EVUTIL_ASSERT(evcon != NULL);
981 
982 	evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
983 }
984 
985 /*
986  * frees potential meta data associated with a request.
987  */
988 
989 static void
990 evrpc_meta_data_free(struct evrpc_meta_list *meta_data)
991 {
992 	struct evrpc_meta *entry;
993 	EVUTIL_ASSERT(meta_data != NULL);
994 
995 	while ((entry = TAILQ_FIRST(meta_data)) != NULL) {
996 		TAILQ_REMOVE(meta_data, entry, next);
997 		mm_free(entry->key);
998 		mm_free(entry->data);
999 		mm_free(entry);
1000 	}
1001 }
1002 
1003 static struct evrpc_hook_meta *
1004 evrpc_hook_meta_new(void)
1005 {
1006 	struct evrpc_hook_meta *ctx;
1007 	ctx = mm_malloc(sizeof(struct evrpc_hook_meta));
1008 	EVUTIL_ASSERT(ctx != NULL);
1009 
1010 	TAILQ_INIT(&ctx->meta_data);
1011 	ctx->evcon = NULL;
1012 
1013 	return (ctx);
1014 }
1015 
1016 static void
1017 evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx,
1018     struct evhttp_connection *evcon)
1019 {
1020 	struct evrpc_hook_meta *ctx = *pctx;
1021 	if (ctx == NULL)
1022 		*pctx = ctx = evrpc_hook_meta_new();
1023 	ctx->evcon = evcon;
1024 }
1025 
1026 static void
1027 evrpc_hook_context_free(struct evrpc_hook_meta *ctx)
1028 {
1029 	evrpc_meta_data_free(&ctx->meta_data);
1030 	mm_free(ctx);
1031 }
1032 
1033 /* Adds meta data */
1034 void
1035 evrpc_hook_add_meta(void *ctx, const char *key,
1036     const void *data, size_t data_size)
1037 {
1038 	struct evrpc_request_wrapper *req = ctx;
1039 	struct evrpc_hook_meta *store = NULL;
1040 	struct evrpc_meta *meta = NULL;
1041 
1042 	if ((store = req->hook_meta) == NULL)
1043 		store = req->hook_meta = evrpc_hook_meta_new();
1044 
1045 	meta = mm_malloc(sizeof(struct evrpc_meta));
1046 	EVUTIL_ASSERT(meta != NULL);
1047 	meta->key = mm_strdup(key);
1048 	EVUTIL_ASSERT(meta->key != NULL);
1049 	meta->data_size = data_size;
1050 	meta->data = mm_malloc(data_size);
1051 	EVUTIL_ASSERT(meta->data != NULL);
1052 	memcpy(meta->data, data, data_size);
1053 
1054 	TAILQ_INSERT_TAIL(&store->meta_data, meta, next);
1055 }
1056 
1057 int
1058 evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
1059 {
1060 	struct evrpc_request_wrapper *req = ctx;
1061 	struct evrpc_meta *meta = NULL;
1062 
1063 	if (req->hook_meta == NULL)
1064 		return (-1);
1065 
1066 	TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) {
1067 		if (strcmp(meta->key, key) == 0) {
1068 			*data = meta->data;
1069 			*data_size = meta->data_size;
1070 			return (0);
1071 		}
1072 	}
1073 
1074 	return (-1);
1075 }
1076 
1077 struct evhttp_connection *
1078 evrpc_hook_get_connection(void *ctx)
1079 {
1080 	struct evrpc_request_wrapper *req = ctx;
1081 	return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL);
1082 }
1083 
1084 int
1085 evrpc_send_request_generic(struct evrpc_pool *pool,
1086     void *request, void *reply,
1087     void (*cb)(struct evrpc_status *, void *, void *, void *),
1088     void *cb_arg,
1089     const char *rpcname,
1090     void (*req_marshal)(struct evbuffer *, void *),
1091     void (*rpl_clear)(void *),
1092     int (*rpl_unmarshal)(void *, struct evbuffer *))
1093 {
1094 	struct evrpc_status status;
1095 	struct evrpc_request_wrapper *ctx;
1096 	ctx = evrpc_make_request_ctx(pool, request, reply,
1097 	    rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg);
1098 	if (ctx == NULL)
1099 		goto error;
1100 	return (evrpc_make_request(ctx));
1101 error:
1102 	memset(&status, 0, sizeof(status));
1103 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
1104 	(*(cb))(&status, request, reply, cb_arg);
1105 	return (-1);
1106 }
1107 
1108 /** Takes a request object and fills it in with the right magic */
1109 static struct evrpc *
1110 evrpc_register_object(const char *name,
1111     void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *),
1112     int (*req_unmarshal)(void *, struct evbuffer *),
1113     void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *),
1114     int (*rpl_complete)(void *),
1115     void (*rpl_marshal)(struct evbuffer *, void *))
1116 {
1117 	struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc));
1118 	if (rpc == NULL)
1119 		return (NULL);
1120 	rpc->uri = mm_strdup(name);
1121 	if (rpc->uri == NULL) {
1122 		mm_free(rpc);
1123 		return (NULL);
1124 	}
1125 	rpc->request_new = req_new;
1126 	rpc->request_new_arg = req_new_arg;
1127 	rpc->request_free = req_free;
1128 	rpc->request_unmarshal = req_unmarshal;
1129 	rpc->reply_new = rpl_new;
1130 	rpc->reply_new_arg = rpl_new_arg;
1131 	rpc->reply_free = rpl_free;
1132 	rpc->reply_complete = rpl_complete;
1133 	rpc->reply_marshal = rpl_marshal;
1134 	return (rpc);
1135 }
1136 
/*
 * Creates an RPC description from the supplied functions and registers
 * it on the base under name with the given callback.
 * Returns 0 on success, -1 if the description cannot be allocated.
 */
int
evrpc_register_generic(struct evrpc_base *base, const char *name,
    void (*callback)(struct evrpc_req_generic *, void *), void *cbarg,
    void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *),
    int (*req_unmarshal)(void *, struct evbuffer *),
    void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *),
    int (*rpl_complete)(void *),
    void (*rpl_marshal)(struct evbuffer *, void *))
{
	struct evrpc *desc;

	desc = evrpc_register_object(name,
	    req_new, req_new_arg, req_free, req_unmarshal,
	    rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal);
	if (desc != NULL) {
		evrpc_register_rpc(base, desc, callback, cbarg);
		return (0);
	}

	return (-1);
}
1155 
1156 /** accessors for obscure and undocumented functionality */
1157 struct evrpc_pool *
1158 evrpc_request_get_pool(struct evrpc_request_wrapper *ctx)
1159 {
1160 	return (ctx->pool);
1161 }
1162 
1163 void
1164 evrpc_request_set_pool(struct evrpc_request_wrapper *ctx,
1165     struct evrpc_pool *pool)
1166 {
1167 	ctx->pool = pool;
1168 }
1169 
1170 void
1171 evrpc_request_set_cb(struct evrpc_request_wrapper *ctx,
1172     void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg),
1173     void *cb_arg)
1174 {
1175 	ctx->cb = cb;
1176 	ctx->cb_arg = cb_arg;
1177 }
1178