xref: /dragonfly/usr.sbin/nscd/mp_rs_query.c (revision 86d7f5d3)
1 /*-
2  * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  * $FreeBSD: src/usr.sbin/nscd/mp_rs_query.c,v 1.4 2008/10/12 00:44:27 delphij Exp $
27  */
28 
29 #include <sys/socket.h>
30 #include <sys/time.h>
31 #include <sys/types.h>
32 #include <sys/event.h>
33 #include <assert.h>
34 #include <errno.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <stdio.h>
38 
39 #include "cachelib.h"
40 #include "config.h"
41 #include "debug.h"
42 #include "log.h"
43 #include "query.h"
44 #include "mp_rs_query.h"
45 #include "mp_ws_query.h"
46 #include "singletons.h"
47 
/*
 * Forward declarations of the handlers that are private to this file,
 * listed roughly in the order the state machine visits them.
 * on_mp_read_session_request_read1 is not listed here: it is defined
 * below without the static qualifier.
 */
static void on_mp_read_session_destroy(struct query_state *qstate);
static int on_mp_read_session_request_read2(struct query_state *qstate);
static int on_mp_read_session_request_process(struct query_state *qstate);
static int on_mp_read_session_response_write1(struct query_state *qstate);
static int on_mp_read_session_mapper(struct query_state *qstate);
static int on_mp_read_session_read_request_process(
	struct query_state *qstate);
static int on_mp_read_session_read_response_write1(
	struct query_state *qstate);
static int on_mp_read_session_read_response_write2(
	struct query_state *qstate);
static int on_mp_read_session_close_notification(
	struct query_state *qstate);
58 
59 /*
60  * This function is used as the query_state's destroy_func to make the
61  * proper cleanup in case of errors.
62  */
63 static void
on_mp_read_session_destroy(struct query_state * qstate)64 on_mp_read_session_destroy(struct query_state *qstate)
65 {
66 	TRACE_IN(on_mp_read_session_destroy);
67 	finalize_comm_element(&qstate->request);
68 	finalize_comm_element(&qstate->response);
69 
70 	if (qstate->mdata != NULL) {
71 		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
72 		close_cache_mp_read_session(
73 			(cache_mp_read_session)qstate->mdata);
74 		configuration_unlock_entry(qstate->config_entry,
75 			CELT_MULTIPART);
76 	}
77 	TRACE_OUT(on_mp_read_session_destroy);
78 }
79 
/*
 * The functions below are used to process multipart read session initiation
 * requests.
 * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2
 *   read the request itself
 * - on_mp_read_session_request_process processes it
 * - on_mp_read_session_response_write1 sends the response
 */
88 int
on_mp_read_session_request_read1(struct query_state * qstate)89 on_mp_read_session_request_read1(struct query_state *qstate)
90 {
91 	struct cache_mp_read_session_request	*c_mp_rs_request;
92 	ssize_t	result;
93 
94 	TRACE_IN(on_mp_read_session_request_read1);
95 	if (qstate->kevent_watermark == 0)
96 		qstate->kevent_watermark = sizeof(size_t);
97 	else {
98 		init_comm_element(&qstate->request,
99 			CET_MP_READ_SESSION_REQUEST);
100 		c_mp_rs_request = get_cache_mp_read_session_request(
101 			&qstate->request);
102 
103 		result = qstate->read_func(qstate,
104 			&c_mp_rs_request->entry_length, sizeof(size_t));
105 
106 		if (result != sizeof(size_t)) {
107 			TRACE_OUT(on_mp_read_session_request_read1);
108 			return (-1);
109 		}
110 
111 		if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
112 			TRACE_OUT(on_mp_read_session_request_read1);
113 			return (-1);
114 		}
115 
116 		c_mp_rs_request->entry = (char *)calloc(1,
117 			c_mp_rs_request->entry_length + 1);
118 		assert(c_mp_rs_request->entry != NULL);
119 
120 		qstate->kevent_watermark = c_mp_rs_request->entry_length;
121 		qstate->process_func = on_mp_read_session_request_read2;
122 	}
123 	TRACE_OUT(on_mp_read_session_request_read1);
124 	return (0);
125 }
126 
127 static int
on_mp_read_session_request_read2(struct query_state * qstate)128 on_mp_read_session_request_read2(struct query_state *qstate)
129 {
130 	struct cache_mp_read_session_request	*c_mp_rs_request;
131 	ssize_t	result;
132 
133 	TRACE_IN(on_mp_read_session_request_read2);
134 	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
135 
136 	result = qstate->read_func(qstate, c_mp_rs_request->entry,
137 		c_mp_rs_request->entry_length);
138 
139 	if (result != qstate->kevent_watermark) {
140 		LOG_ERR_3("on_mp_read_session_request_read2",
141 			"read failed");
142 		TRACE_OUT(on_mp_read_session_request_read2);
143 		return (-1);
144 	}
145 
146 	qstate->kevent_watermark = 0;
147 	qstate->process_func = on_mp_read_session_request_process;
148 	TRACE_OUT(on_mp_read_session_request_read2);
149 	return (0);
150 }
151 
152 static int
on_mp_read_session_request_process(struct query_state * qstate)153 on_mp_read_session_request_process(struct query_state *qstate)
154 {
155 	struct cache_mp_read_session_request	*c_mp_rs_request;
156 	struct cache_mp_read_session_response	*c_mp_rs_response;
157 	cache_mp_read_session	rs;
158 	cache_entry	c_entry;
159 	char	*dec_cache_entry_name;
160 
161 	char *buffer;
162 	size_t buffer_size;
163 	cache_mp_write_session ws;
164 	struct agent	*lookup_agent;
165 	struct multipart_agent *mp_agent;
166 	void *mdata;
167 	int res;
168 
169 	TRACE_IN(on_mp_read_session_request_process);
170 	init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
171 	c_mp_rs_response = get_cache_mp_read_session_response(
172 		&qstate->response);
173 	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
174 
175 	qstate->config_entry = configuration_find_entry(
176 		s_configuration, c_mp_rs_request->entry);
177 	if (qstate->config_entry == NULL) {
178 		c_mp_rs_response->error_code = ENOENT;
179 
180 		LOG_ERR_2("read_session_request",
181 			"can't find configuration entry '%s'."
182 			" aborting request", c_mp_rs_request->entry);
183 		goto fin;
184 	}
185 
186 	if (qstate->config_entry->enabled == 0) {
187 		c_mp_rs_response->error_code = EACCES;
188 
189 		LOG_ERR_2("read_session_request",
190 			"configuration entry '%s' is disabled",
191 			c_mp_rs_request->entry);
192 		goto fin;
193 	}
194 
195 	if (qstate->config_entry->perform_actual_lookups != 0)
196 		dec_cache_entry_name = strdup(
197 			qstate->config_entry->mp_cache_params.entry_name);
198 	else {
199 #ifdef NS_NSCD_EID_CHECKING
200 		if (check_query_eids(qstate) != 0) {
201 			c_mp_rs_response->error_code = EPERM;
202 			goto fin;
203 		}
204 #endif
205 
206 		asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
207 			qstate->config_entry->mp_cache_params.entry_name);
208 	}
209 
210 	assert(dec_cache_entry_name != NULL);
211 
212 	configuration_lock_rdlock(s_configuration);
213 	c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
214 	configuration_unlock(s_configuration);
215 
216 	if ((c_entry == INVALID_CACHE) &&
217 	   (qstate->config_entry->perform_actual_lookups != 0))
218 		c_entry = register_new_mp_cache_entry(qstate,
219 			dec_cache_entry_name);
220 
221 	free(dec_cache_entry_name);
222 
223 	if (c_entry != INVALID_CACHE_ENTRY) {
224 		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
225 		rs = open_cache_mp_read_session(c_entry);
226 		configuration_unlock_entry(qstate->config_entry,
227 			CELT_MULTIPART);
228 
229 		if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
230 		   (qstate->config_entry->perform_actual_lookups != 0)) {
231 			lookup_agent = find_agent(s_agent_table,
232 				c_mp_rs_request->entry, MULTIPART_AGENT);
233 
234 			if ((lookup_agent != NULL) &&
235 			(lookup_agent->type == MULTIPART_AGENT)) {
236 				mp_agent = (struct multipart_agent *)
237 					lookup_agent;
238 				mdata = mp_agent->mp_init_func();
239 
240 				/*
241 				 * Multipart agents read the whole snapshot
242 				 * of the data at one time.
243 				 */
244 				configuration_lock_entry(qstate->config_entry,
245 					CELT_MULTIPART);
246 				ws = open_cache_mp_write_session(c_entry);
247 				configuration_unlock_entry(qstate->config_entry,
248 					CELT_MULTIPART);
249 				if (ws != NULL) {
250 				    do {
251 					buffer = NULL;
252 					res = mp_agent->mp_lookup_func(&buffer,
253 						&buffer_size,
254 						mdata);
255 
256 					if ((res & NS_TERMINATE) &&
257 					   (buffer != NULL)) {
258 						configuration_lock_entry(
259 							qstate->config_entry,
260 							CELT_MULTIPART);
261 						if (cache_mp_write(ws, buffer,
262 						    buffer_size) != 0) {
263 							abandon_cache_mp_write_session(ws);
264 							ws = NULL;
265 						}
266 						configuration_unlock_entry(
267 							qstate->config_entry,
268 							CELT_MULTIPART);
269 
270 						free(buffer);
271 						buffer = NULL;
272 					} else {
273 						configuration_lock_entry(
274 							qstate->config_entry,
275 							CELT_MULTIPART);
276 						close_cache_mp_write_session(ws);
277 						configuration_unlock_entry(
278 							qstate->config_entry,
279 							CELT_MULTIPART);
280 
281 						free(buffer);
282 						buffer = NULL;
283 					}
284 				    } while ((res & NS_TERMINATE) &&
285 					    (ws != NULL));
286 				}
287 
288 				configuration_lock_entry(qstate->config_entry,
289 					CELT_MULTIPART);
290 				rs = open_cache_mp_read_session(c_entry);
291 				configuration_unlock_entry(qstate->config_entry,
292 					CELT_MULTIPART);
293 			}
294 		}
295 
296 		if (rs == INVALID_CACHE_MP_READ_SESSION)
297 			c_mp_rs_response->error_code = -1;
298 		else {
299 		    qstate->mdata = rs;
300 		    qstate->destroy_func = on_mp_read_session_destroy;
301 
302 		    configuration_lock_entry(qstate->config_entry,
303 			CELT_MULTIPART);
304 		    if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
305 		    (qstate->config_entry->mp_query_timeout.tv_usec != 0))
306 			memcpy(&qstate->timeout,
307 			    &qstate->config_entry->mp_query_timeout,
308 			    sizeof(struct timeval));
309 		    configuration_unlock_entry(qstate->config_entry,
310 			CELT_MULTIPART);
311 		}
312 	} else
313 		c_mp_rs_response->error_code = -1;
314 
315 fin:
316 	qstate->process_func = on_mp_read_session_response_write1;
317 	qstate->kevent_watermark = sizeof(int);
318 	qstate->kevent_filter = EVFILT_WRITE;
319 
320 	TRACE_OUT(on_mp_read_session_request_process);
321 	return (0);
322 }
323 
324 static int
on_mp_read_session_response_write1(struct query_state * qstate)325 on_mp_read_session_response_write1(struct query_state *qstate)
326 {
327 	struct cache_mp_read_session_response	*c_mp_rs_response;
328 	ssize_t	result;
329 
330 	TRACE_IN(on_mp_read_session_response_write1);
331 	c_mp_rs_response = get_cache_mp_read_session_response(
332 		&qstate->response);
333 	result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
334 		sizeof(int));
335 
336 	if (result != sizeof(int)) {
337 		LOG_ERR_3("on_mp_read_session_response_write1",
338 			"write failed");
339 		TRACE_OUT(on_mp_read_session_response_write1);
340 		return (-1);
341 	}
342 
343 	if (c_mp_rs_response->error_code == 0) {
344 		qstate->kevent_watermark = sizeof(int);
345 		qstate->process_func = on_mp_read_session_mapper;
346 		qstate->kevent_filter = EVFILT_READ;
347 	} else {
348 		qstate->kevent_watermark = 0;
349 		qstate->process_func = NULL;
350 	}
351 	TRACE_OUT(on_mp_read_session_response_write1);
352 	return (0);
353 }
354 
355 /*
356  * Mapper function is used to avoid multiple connections for each session
357  * write or read requests. After processing the request, it does not close
358  * the connection, but waits for the next request.
359  */
360 static int
on_mp_read_session_mapper(struct query_state * qstate)361 on_mp_read_session_mapper(struct query_state *qstate)
362 {
363 	ssize_t	result;
364 	int elem_type;
365 
366 	TRACE_IN(on_mp_read_session_mapper);
367 	if (qstate->kevent_watermark == 0) {
368 		qstate->kevent_watermark = sizeof(int);
369 	} else {
370 		result = qstate->read_func(qstate, &elem_type, sizeof(int));
371 		if (result != sizeof(int)) {
372 			LOG_ERR_3("on_mp_read_session_mapper",
373 				"read failed");
374 			TRACE_OUT(on_mp_read_session_mapper);
375 			return (-1);
376 		}
377 
378 		switch (elem_type) {
379 		case CET_MP_READ_SESSION_READ_REQUEST:
380 			qstate->kevent_watermark = 0;
381 			qstate->process_func =
382 				on_mp_read_session_read_request_process;
383 			break;
384 		case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
385 			qstate->kevent_watermark = 0;
386 			qstate->process_func =
387 				on_mp_read_session_close_notification;
388 			break;
389 		default:
390 			qstate->kevent_watermark = 0;
391 			qstate->process_func = NULL;
392 			LOG_ERR_3("on_mp_read_session_mapper",
393 				"unknown element type");
394 			TRACE_OUT(on_mp_read_session_mapper);
395 			return (-1);
396 		}
397 	}
398 	TRACE_OUT(on_mp_read_session_mapper);
399 	return (0);
400 }
401 
/*
 * The functions below are used to process multipart read session read
 * requests. The user doesn't have to pass any kind of data besides the
 * request identifier itself, so we don't need any XXX_read functions and
 * start with the XXX_process function.
 * - on_mp_read_session_read_request_process processes the request
 * - on_mp_read_session_read_response_write1 and
 *   on_mp_read_session_read_response_write2 send the response
 */
411 static int
on_mp_read_session_read_request_process(struct query_state * qstate)412 on_mp_read_session_read_request_process(struct query_state *qstate)
413 {
414 	struct cache_mp_read_session_read_response	*read_response;
415 
416 	TRACE_IN(on_mp_read_session_response_process);
417 	init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
418 	read_response = get_cache_mp_read_session_read_response(
419 		&qstate->response);
420 
421 	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
422 	read_response->error_code = cache_mp_read(
423 		(cache_mp_read_session)qstate->mdata, NULL,
424 		&read_response->data_size);
425 
426 	if (read_response->error_code == 0) {
427 		read_response->data = (char *)malloc(read_response->data_size);
428 		assert(read_response != NULL);
429 		read_response->error_code = cache_mp_read(
430 			(cache_mp_read_session)qstate->mdata,
431 			read_response->data,
432 			&read_response->data_size);
433 	}
434 	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
435 
436 	if (read_response->error_code == 0)
437 		qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
438 	else
439 		qstate->kevent_watermark = sizeof(int);
440 	qstate->process_func = on_mp_read_session_read_response_write1;
441 	qstate->kevent_filter = EVFILT_WRITE;
442 
443 	TRACE_OUT(on_mp_read_session_response_process);
444 	return (0);
445 }
446 
447 static int
on_mp_read_session_read_response_write1(struct query_state * qstate)448 on_mp_read_session_read_response_write1(struct query_state *qstate)
449 {
450 	struct cache_mp_read_session_read_response	*read_response;
451 	ssize_t	result;
452 
453 	TRACE_IN(on_mp_read_session_read_response_write1);
454 	read_response = get_cache_mp_read_session_read_response(
455 		&qstate->response);
456 
457 	result = qstate->write_func(qstate, &read_response->error_code,
458 		sizeof(int));
459 	if (read_response->error_code == 0) {
460 		result += qstate->write_func(qstate, &read_response->data_size,
461 			sizeof(size_t));
462 		if (result != qstate->kevent_watermark) {
463 			TRACE_OUT(on_mp_read_session_read_response_write1);
464 			LOG_ERR_3("on_mp_read_session_read_response_write1",
465 				"write failed");
466 			return (-1);
467 		}
468 
469 		qstate->kevent_watermark = read_response->data_size;
470 		qstate->process_func = on_mp_read_session_read_response_write2;
471 	} else {
472 		if (result != qstate->kevent_watermark) {
473 			LOG_ERR_3("on_mp_read_session_read_response_write1",
474 				"write failed");
475 			TRACE_OUT(on_mp_read_session_read_response_write1);
476 			return (-1);
477 		}
478 
479 		qstate->kevent_watermark = 0;
480 		qstate->process_func = NULL;
481 	}
482 
483 	TRACE_OUT(on_mp_read_session_read_response_write1);
484 	return (0);
485 }
486 
487 static int
on_mp_read_session_read_response_write2(struct query_state * qstate)488 on_mp_read_session_read_response_write2(struct query_state *qstate)
489 {
490 	struct cache_mp_read_session_read_response *read_response;
491 	ssize_t	result;
492 
493 	TRACE_IN(on_mp_read_session_read_response_write2);
494 	read_response = get_cache_mp_read_session_read_response(
495 		&qstate->response);
496 	result = qstate->write_func(qstate, read_response->data,
497 		read_response->data_size);
498 	if (result != qstate->kevent_watermark) {
499 		LOG_ERR_3("on_mp_read_session_read_response_write2",
500 			"write failed");
501 		TRACE_OUT(on_mp_read_session_read_response_write2);
502 		return (-1);
503 	}
504 
505 	finalize_comm_element(&qstate->request);
506 	finalize_comm_element(&qstate->response);
507 
508 	qstate->kevent_watermark = sizeof(int);
509 	qstate->process_func = on_mp_read_session_mapper;
510 	qstate->kevent_filter = EVFILT_READ;
511 
512 	TRACE_OUT(on_mp_read_session_read_response_write2);
513 	return (0);
514 }
515 
516 /*
517  * Handles session close notification by calling close_cache_mp_read_session
518  * function.
519  */
520 static int
on_mp_read_session_close_notification(struct query_state * qstate)521 on_mp_read_session_close_notification(struct query_state *qstate)
522 {
523 
524 	TRACE_IN(on_mp_read_session_close_notification);
525 	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
526 	close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
527 	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
528 	qstate->mdata = NULL;
529 	qstate->kevent_watermark = 0;
530 	qstate->process_func = NULL;
531 	TRACE_OUT(on_mp_read_session_close_notification);
532 	return (0);
533 }
534