xref: /freebsd/usr.sbin/nscd/mp_rs_query.c (revision 06c3fb27)
1 /*-
2  * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  */
27 
28 #include <sys/types.h>
29 #include <sys/event.h>
30 #include <sys/socket.h>
31 #include <sys/time.h>
32 
33 #include <assert.h>
34 #include <errno.h>
35 #include <nsswitch.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 
40 #include "cachelib.h"
41 #include "config.h"
42 #include "debug.h"
43 #include "log.h"
44 #include "query.h"
45 #include "mp_rs_query.h"
46 #include "mp_ws_query.h"
47 #include "singletons.h"
48 
/*
 * File-local handlers of the multipart read session state machine, listed
 * roughly in the order a session walks through them.
 * on_mp_read_session_request_read1 is defined without "static" further down
 * and is therefore not declared here (presumably mp_rs_query.h carries its
 * prototype -- confirm).
 */
static int on_mp_read_session_request_read2(struct query_state *);
static int on_mp_read_session_request_process(struct query_state *);
static int on_mp_read_session_response_write1(struct query_state *);
static int on_mp_read_session_mapper(struct query_state *);
static int on_mp_read_session_read_request_process(struct query_state *);
static int on_mp_read_session_read_response_write1(struct query_state *);
static int on_mp_read_session_read_response_write2(struct query_state *);
static int on_mp_read_session_close_notification(struct query_state *);
static void on_mp_read_session_destroy(struct query_state *);
59 
60 /*
61  * This function is used as the query_state's destroy_func to make the
62  * proper cleanup in case of errors.
63  */
64 static void
65 on_mp_read_session_destroy(struct query_state *qstate)
66 {
67 	TRACE_IN(on_mp_read_session_destroy);
68 	finalize_comm_element(&qstate->request);
69 	finalize_comm_element(&qstate->response);
70 
71 	if (qstate->mdata != NULL) {
72 		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
73 		close_cache_mp_read_session(
74 	    		(cache_mp_read_session)qstate->mdata);
75 		configuration_unlock_entry(qstate->config_entry,
76 			CELT_MULTIPART);
77 	}
78 	TRACE_OUT(on_mp_read_session_destroy);
79 }
80 
81 /*
82  * The functions below are used to process multipart read session initiation
83  * requests.
84  * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
85  *   the request itself
86  * - on_mp_read_session_request_process processes it
87  * - on_mp_read_session_response_write1 sends the response
88  */
89 int
90 on_mp_read_session_request_read1(struct query_state *qstate)
91 {
92 	struct cache_mp_read_session_request	*c_mp_rs_request;
93 	ssize_t	result;
94 
95 	TRACE_IN(on_mp_read_session_request_read1);
96 	if (qstate->kevent_watermark == 0)
97 		qstate->kevent_watermark = sizeof(size_t);
98 	else {
99 		init_comm_element(&qstate->request,
100 	    		CET_MP_READ_SESSION_REQUEST);
101 		c_mp_rs_request = get_cache_mp_read_session_request(
102 	    		&qstate->request);
103 
104 		result = qstate->read_func(qstate,
105 	    		&c_mp_rs_request->entry_length, sizeof(size_t));
106 
107 		if (result != sizeof(size_t)) {
108 			TRACE_OUT(on_mp_read_session_request_read1);
109 			return (-1);
110 		}
111 
112 		if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
113 			TRACE_OUT(on_mp_read_session_request_read1);
114 			return (-1);
115 		}
116 
117 		c_mp_rs_request->entry = calloc(1,
118 			c_mp_rs_request->entry_length + 1);
119 		assert(c_mp_rs_request->entry != NULL);
120 
121 		qstate->kevent_watermark = c_mp_rs_request->entry_length;
122 		qstate->process_func = on_mp_read_session_request_read2;
123 	}
124 	TRACE_OUT(on_mp_read_session_request_read1);
125 	return (0);
126 }
127 
128 static int
129 on_mp_read_session_request_read2(struct query_state *qstate)
130 {
131 	struct cache_mp_read_session_request	*c_mp_rs_request;
132 	ssize_t	result;
133 
134 	TRACE_IN(on_mp_read_session_request_read2);
135 	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
136 
137 	result = qstate->read_func(qstate, c_mp_rs_request->entry,
138 		c_mp_rs_request->entry_length);
139 
140 	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
141 		LOG_ERR_3("on_mp_read_session_request_read2",
142 			"read failed");
143 		TRACE_OUT(on_mp_read_session_request_read2);
144 		return (-1);
145 	}
146 
147 	qstate->kevent_watermark = 0;
148 	qstate->process_func = on_mp_read_session_request_process;
149 	TRACE_OUT(on_mp_read_session_request_read2);
150 	return (0);
151 }
152 
153 static int
154 on_mp_read_session_request_process(struct query_state *qstate)
155 {
156 	struct cache_mp_read_session_request	*c_mp_rs_request;
157 	struct cache_mp_read_session_response	*c_mp_rs_response;
158 	cache_mp_read_session	rs;
159 	cache_entry	c_entry;
160 	char	*dec_cache_entry_name;
161 
162 	char *buffer;
163 	size_t buffer_size;
164 	cache_mp_write_session ws;
165 	struct agent	*lookup_agent;
166 	struct multipart_agent *mp_agent;
167 	void *mdata;
168 	int res;
169 
170 	TRACE_IN(on_mp_read_session_request_process);
171 	init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
172 	c_mp_rs_response = get_cache_mp_read_session_response(
173 		&qstate->response);
174 	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
175 
176 	qstate->config_entry = configuration_find_entry(
177 		s_configuration, c_mp_rs_request->entry);
178 	if (qstate->config_entry == NULL) {
179 		c_mp_rs_response->error_code = ENOENT;
180 
181 		LOG_ERR_2("read_session_request",
182 			"can't find configuration entry '%s'."
183 			" aborting request", c_mp_rs_request->entry);
184 		goto fin;
185 	}
186 
187 	if (qstate->config_entry->enabled == 0) {
188 		c_mp_rs_response->error_code = EACCES;
189 
190 		LOG_ERR_2("read_session_request",
191 			"configuration entry '%s' is disabled",
192 			c_mp_rs_request->entry);
193 		goto fin;
194 	}
195 
196 	if (qstate->config_entry->perform_actual_lookups != 0)
197 		dec_cache_entry_name = strdup(
198 			qstate->config_entry->mp_cache_params.cep.entry_name);
199 	else {
200 #ifdef NS_NSCD_EID_CHECKING
201 		if (check_query_eids(qstate) != 0) {
202 			c_mp_rs_response->error_code = EPERM;
203 			goto fin;
204 		}
205 #endif
206 
207 		asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
208 			qstate->config_entry->mp_cache_params.cep.entry_name);
209 	}
210 
211 	assert(dec_cache_entry_name != NULL);
212 
213 	configuration_lock_rdlock(s_configuration);
214 	c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
215 	configuration_unlock(s_configuration);
216 
217 	if ((c_entry == INVALID_CACHE) &&
218 	   (qstate->config_entry->perform_actual_lookups != 0))
219 		c_entry = register_new_mp_cache_entry(qstate,
220 			dec_cache_entry_name);
221 
222 	free(dec_cache_entry_name);
223 
224 	if (c_entry != INVALID_CACHE_ENTRY) {
225 		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
226 		rs = open_cache_mp_read_session(c_entry);
227 		configuration_unlock_entry(qstate->config_entry,
228 			CELT_MULTIPART);
229 
230 		if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
231 		   (qstate->config_entry->perform_actual_lookups != 0)) {
232 			lookup_agent = find_agent(s_agent_table,
233 				c_mp_rs_request->entry, MULTIPART_AGENT);
234 
235 			if ((lookup_agent != NULL) &&
236 			(lookup_agent->type == MULTIPART_AGENT)) {
237 				mp_agent = (struct multipart_agent *)
238 					lookup_agent;
239 				mdata = mp_agent->mp_init_func();
240 
241 				/*
242 				 * Multipart agents read the whole snapshot
243 				 * of the data at one time.
244 				 */
245 				configuration_lock_entry(qstate->config_entry,
246 					CELT_MULTIPART);
247 				ws = open_cache_mp_write_session(c_entry);
248 				configuration_unlock_entry(qstate->config_entry,
249 					CELT_MULTIPART);
250 				if (ws != NULL) {
251 				    do {
252 					buffer = NULL;
253 					res = mp_agent->mp_lookup_func(&buffer,
254 						&buffer_size,
255 						mdata);
256 
257 					if ((res & NS_TERMINATE) &&
258 					   (buffer != NULL)) {
259 						configuration_lock_entry(
260 							qstate->config_entry,
261 						   	CELT_MULTIPART);
262 						if (cache_mp_write(ws, buffer,
263 						    buffer_size) != 0) {
264 							abandon_cache_mp_write_session(ws);
265 							ws = NULL;
266 						}
267 						configuration_unlock_entry(
268 							qstate->config_entry,
269 							CELT_MULTIPART);
270 
271 						free(buffer);
272 						buffer = NULL;
273 					} else {
274 						configuration_lock_entry(
275 							qstate->config_entry,
276 							CELT_MULTIPART);
277 						close_cache_mp_write_session(ws);
278 						configuration_unlock_entry(
279 							qstate->config_entry,
280 							CELT_MULTIPART);
281 
282 						free(buffer);
283 						buffer = NULL;
284 					}
285 				    } while ((res & NS_TERMINATE) &&
286 				    	    (ws != NULL));
287 				}
288 
289 				configuration_lock_entry(qstate->config_entry,
290 					CELT_MULTIPART);
291 				rs = open_cache_mp_read_session(c_entry);
292 				configuration_unlock_entry(qstate->config_entry,
293 					CELT_MULTIPART);
294 			}
295 		}
296 
297 		if (rs == INVALID_CACHE_MP_READ_SESSION)
298 			c_mp_rs_response->error_code = -1;
299 		else {
300 		    qstate->mdata = rs;
301 		    qstate->destroy_func = on_mp_read_session_destroy;
302 
303 		    configuration_lock_entry(qstate->config_entry,
304 			CELT_MULTIPART);
305 		    if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
306 		    (qstate->config_entry->mp_query_timeout.tv_usec != 0))
307 			memcpy(&qstate->timeout,
308 			    &qstate->config_entry->mp_query_timeout,
309 			    sizeof(struct timeval));
310 		    configuration_unlock_entry(qstate->config_entry,
311 			CELT_MULTIPART);
312 		}
313 	} else
314 		c_mp_rs_response->error_code = -1;
315 
316 fin:
317 	qstate->process_func = on_mp_read_session_response_write1;
318 	qstate->kevent_watermark = sizeof(int);
319 	qstate->kevent_filter = EVFILT_WRITE;
320 
321 	TRACE_OUT(on_mp_read_session_request_process);
322 	return (0);
323 }
324 
325 static int
326 on_mp_read_session_response_write1(struct query_state *qstate)
327 {
328 	struct cache_mp_read_session_response	*c_mp_rs_response;
329 	ssize_t	result;
330 
331 	TRACE_IN(on_mp_read_session_response_write1);
332 	c_mp_rs_response = get_cache_mp_read_session_response(
333 		&qstate->response);
334 	result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
335 		sizeof(int));
336 
337 	if (result != sizeof(int)) {
338 		LOG_ERR_3("on_mp_read_session_response_write1",
339 			"write failed");
340 		TRACE_OUT(on_mp_read_session_response_write1);
341 		return (-1);
342 	}
343 
344 	if (c_mp_rs_response->error_code == 0) {
345 		qstate->kevent_watermark = sizeof(int);
346 		qstate->process_func = on_mp_read_session_mapper;
347 		qstate->kevent_filter = EVFILT_READ;
348 	} else {
349 		qstate->kevent_watermark = 0;
350 		qstate->process_func = NULL;
351 	}
352 	TRACE_OUT(on_mp_read_session_response_write1);
353 	return (0);
354 }
355 
356 /*
357  * Mapper function is used to avoid multiple connections for each session
358  * write or read requests. After processing the request, it does not close
359  * the connection, but waits for the next request.
360  */
361 static int
362 on_mp_read_session_mapper(struct query_state *qstate)
363 {
364 	ssize_t	result;
365 	int elem_type;
366 
367 	TRACE_IN(on_mp_read_session_mapper);
368 	if (qstate->kevent_watermark == 0) {
369 		qstate->kevent_watermark = sizeof(int);
370 	} else {
371 		result = qstate->read_func(qstate, &elem_type, sizeof(int));
372 		if (result != sizeof(int)) {
373 			LOG_ERR_3("on_mp_read_session_mapper",
374 				"read failed");
375 			TRACE_OUT(on_mp_read_session_mapper);
376 			return (-1);
377 		}
378 
379 		switch (elem_type) {
380 		case CET_MP_READ_SESSION_READ_REQUEST:
381 			qstate->kevent_watermark = 0;
382 			qstate->process_func =
383 				on_mp_read_session_read_request_process;
384 			break;
385 		case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
386 			qstate->kevent_watermark = 0;
387 			qstate->process_func =
388 				on_mp_read_session_close_notification;
389 			break;
390 		default:
391 			qstate->kevent_watermark = 0;
392 			qstate->process_func = NULL;
393 			LOG_ERR_3("on_mp_read_session_mapper",
394 				"unknown element type");
395 			TRACE_OUT(on_mp_read_session_mapper);
396 			return (-1);
397 		}
398 	}
399 	TRACE_OUT(on_mp_read_session_mapper);
400 	return (0);
401 }
402 
403 /*
404  * The functions below are used to process multipart read sessions read
405  * requests. User doesn't have to pass any kind of data, besides the
406  * request identificator itself. So we don't need any XXX_read functions and
407  * start with the XXX_process function.
408  * - on_mp_read_session_read_request_process processes it
409  * - on_mp_read_session_read_response_write1 and
410  *   on_mp_read_session_read_response_write2 sends the response
411  */
412 static int
413 on_mp_read_session_read_request_process(struct query_state *qstate)
414 {
415 	struct cache_mp_read_session_read_response	*read_response;
416 
417 	TRACE_IN(on_mp_read_session_response_process);
418 	init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
419 	read_response = get_cache_mp_read_session_read_response(
420 		&qstate->response);
421 
422 	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
423 	read_response->error_code = cache_mp_read(
424 		(cache_mp_read_session)qstate->mdata, NULL,
425 		&read_response->data_size);
426 
427 	if (read_response->error_code == 0) {
428 		read_response->data = malloc(read_response->data_size);
429 		assert(read_response != NULL);
430 		read_response->error_code = cache_mp_read(
431 			(cache_mp_read_session)qstate->mdata,
432 	    		read_response->data,
433 			&read_response->data_size);
434 	}
435 	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
436 
437 	if (read_response->error_code == 0)
438 		qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
439 	else
440 		qstate->kevent_watermark = sizeof(int);
441 	qstate->process_func = on_mp_read_session_read_response_write1;
442 	qstate->kevent_filter = EVFILT_WRITE;
443 
444 	TRACE_OUT(on_mp_read_session_response_process);
445 	return (0);
446 }
447 
448 static int
449 on_mp_read_session_read_response_write1(struct query_state *qstate)
450 {
451 	struct cache_mp_read_session_read_response	*read_response;
452 	ssize_t	result;
453 
454 	TRACE_IN(on_mp_read_session_read_response_write1);
455 	read_response = get_cache_mp_read_session_read_response(
456 		&qstate->response);
457 
458 	result = qstate->write_func(qstate, &read_response->error_code,
459 		sizeof(int));
460 	if (read_response->error_code == 0) {
461 		result += qstate->write_func(qstate, &read_response->data_size,
462 			sizeof(size_t));
463 		if (result < 0 || (size_t)result != qstate->kevent_watermark) {
464 			TRACE_OUT(on_mp_read_session_read_response_write1);
465 			LOG_ERR_3("on_mp_read_session_read_response_write1",
466 				"write failed");
467 			return (-1);
468 		}
469 
470 		qstate->kevent_watermark = read_response->data_size;
471 		qstate->process_func = on_mp_read_session_read_response_write2;
472 	} else {
473 		if (result < 0 || (size_t)result != qstate->kevent_watermark) {
474 			LOG_ERR_3("on_mp_read_session_read_response_write1",
475 				"write failed");
476 			TRACE_OUT(on_mp_read_session_read_response_write1);
477 			return (-1);
478 		}
479 
480 		qstate->kevent_watermark = 0;
481 		qstate->process_func = NULL;
482 	}
483 
484 	TRACE_OUT(on_mp_read_session_read_response_write1);
485 	return (0);
486 }
487 
488 static int
489 on_mp_read_session_read_response_write2(struct query_state *qstate)
490 {
491 	struct cache_mp_read_session_read_response *read_response;
492 	ssize_t	result;
493 
494 	TRACE_IN(on_mp_read_session_read_response_write2);
495 	read_response = get_cache_mp_read_session_read_response(
496 		&qstate->response);
497 	result = qstate->write_func(qstate, read_response->data,
498 		read_response->data_size);
499 	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
500 		LOG_ERR_3("on_mp_read_session_read_response_write2",
501 			"write failed");
502 		TRACE_OUT(on_mp_read_session_read_response_write2);
503 		return (-1);
504 	}
505 
506 	finalize_comm_element(&qstate->request);
507 	finalize_comm_element(&qstate->response);
508 
509 	qstate->kevent_watermark = sizeof(int);
510 	qstate->process_func = on_mp_read_session_mapper;
511 	qstate->kevent_filter = EVFILT_READ;
512 
513 	TRACE_OUT(on_mp_read_session_read_response_write2);
514 	return (0);
515 }
516 
517 /*
518  * Handles session close notification by calling close_cache_mp_read_session
519  * function.
520  */
521 static int
522 on_mp_read_session_close_notification(struct query_state *qstate)
523 {
524 
525 	TRACE_IN(on_mp_read_session_close_notification);
526 	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
527 	close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
528 	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
529 	qstate->mdata = NULL;
530 	qstate->kevent_watermark = 0;
531 	qstate->process_func = NULL;
532 	TRACE_OUT(on_mp_read_session_close_notification);
533 	return (0);
534 }
535