1 /*-
2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 *
26 * $FreeBSD: src/usr.sbin/nscd/mp_rs_query.c,v 1.4 2008/10/12 00:44:27 delphij Exp $
27 */
28
29 #include <sys/socket.h>
30 #include <sys/time.h>
31 #include <sys/types.h>
32 #include <sys/event.h>
33 #include <assert.h>
34 #include <errno.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <stdio.h>
38
39 #include "cachelib.h"
40 #include "config.h"
41 #include "debug.h"
42 #include "log.h"
43 #include "query.h"
44 #include "mp_rs_query.h"
45 #include "mp_ws_query.h"
46 #include "singletons.h"
47
/*
 * Forward declarations of the file-local handlers.  They are grouped in
 * the order in which the handlers hand control to each other through
 * query_state's process_func.
 *
 * on_mp_read_session_request_read1 is intentionally not declared here: it
 * is defined below without the static qualifier (its prototype is
 * presumably provided by mp_rs_query.h -- confirm).
 */

/* Session initiation. */
static int on_mp_read_session_request_read2(struct query_state *);
static int on_mp_read_session_request_process(struct query_state *);
static int on_mp_read_session_response_write1(struct query_state *);

/* Per-connection request dispatcher. */
static int on_mp_read_session_mapper(struct query_state *);

/* Read requests issued inside an open session. */
static int on_mp_read_session_read_request_process(struct query_state *);
static int on_mp_read_session_read_response_write1(struct query_state *);
static int on_mp_read_session_read_response_write2(struct query_state *);

/* Session shutdown and cleanup. */
static int on_mp_read_session_close_notification(struct query_state *);
static void on_mp_read_session_destroy(struct query_state *);
58
59 /*
60 * This function is used as the query_state's destroy_func to make the
61 * proper cleanup in case of errors.
62 */
63 static void
on_mp_read_session_destroy(struct query_state * qstate)64 on_mp_read_session_destroy(struct query_state *qstate)
65 {
66 TRACE_IN(on_mp_read_session_destroy);
67 finalize_comm_element(&qstate->request);
68 finalize_comm_element(&qstate->response);
69
70 if (qstate->mdata != NULL) {
71 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
72 close_cache_mp_read_session(
73 (cache_mp_read_session)qstate->mdata);
74 configuration_unlock_entry(qstate->config_entry,
75 CELT_MULTIPART);
76 }
77 TRACE_OUT(on_mp_read_session_destroy);
78 }
79
80 /*
81 * The functions below are used to process multipart read session initiation
82 * requests.
83 * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
84 * the request itself
85 * - on_mp_read_session_request_process processes it
86 * - on_mp_read_session_response_write1 sends the response
87 */
88 int
on_mp_read_session_request_read1(struct query_state * qstate)89 on_mp_read_session_request_read1(struct query_state *qstate)
90 {
91 struct cache_mp_read_session_request *c_mp_rs_request;
92 ssize_t result;
93
94 TRACE_IN(on_mp_read_session_request_read1);
95 if (qstate->kevent_watermark == 0)
96 qstate->kevent_watermark = sizeof(size_t);
97 else {
98 init_comm_element(&qstate->request,
99 CET_MP_READ_SESSION_REQUEST);
100 c_mp_rs_request = get_cache_mp_read_session_request(
101 &qstate->request);
102
103 result = qstate->read_func(qstate,
104 &c_mp_rs_request->entry_length, sizeof(size_t));
105
106 if (result != sizeof(size_t)) {
107 TRACE_OUT(on_mp_read_session_request_read1);
108 return (-1);
109 }
110
111 if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
112 TRACE_OUT(on_mp_read_session_request_read1);
113 return (-1);
114 }
115
116 c_mp_rs_request->entry = (char *)calloc(1,
117 c_mp_rs_request->entry_length + 1);
118 assert(c_mp_rs_request->entry != NULL);
119
120 qstate->kevent_watermark = c_mp_rs_request->entry_length;
121 qstate->process_func = on_mp_read_session_request_read2;
122 }
123 TRACE_OUT(on_mp_read_session_request_read1);
124 return (0);
125 }
126
127 static int
on_mp_read_session_request_read2(struct query_state * qstate)128 on_mp_read_session_request_read2(struct query_state *qstate)
129 {
130 struct cache_mp_read_session_request *c_mp_rs_request;
131 ssize_t result;
132
133 TRACE_IN(on_mp_read_session_request_read2);
134 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
135
136 result = qstate->read_func(qstate, c_mp_rs_request->entry,
137 c_mp_rs_request->entry_length);
138
139 if (result != qstate->kevent_watermark) {
140 LOG_ERR_3("on_mp_read_session_request_read2",
141 "read failed");
142 TRACE_OUT(on_mp_read_session_request_read2);
143 return (-1);
144 }
145
146 qstate->kevent_watermark = 0;
147 qstate->process_func = on_mp_read_session_request_process;
148 TRACE_OUT(on_mp_read_session_request_read2);
149 return (0);
150 }
151
152 static int
on_mp_read_session_request_process(struct query_state * qstate)153 on_mp_read_session_request_process(struct query_state *qstate)
154 {
155 struct cache_mp_read_session_request *c_mp_rs_request;
156 struct cache_mp_read_session_response *c_mp_rs_response;
157 cache_mp_read_session rs;
158 cache_entry c_entry;
159 char *dec_cache_entry_name;
160
161 char *buffer;
162 size_t buffer_size;
163 cache_mp_write_session ws;
164 struct agent *lookup_agent;
165 struct multipart_agent *mp_agent;
166 void *mdata;
167 int res;
168
169 TRACE_IN(on_mp_read_session_request_process);
170 init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
171 c_mp_rs_response = get_cache_mp_read_session_response(
172 &qstate->response);
173 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
174
175 qstate->config_entry = configuration_find_entry(
176 s_configuration, c_mp_rs_request->entry);
177 if (qstate->config_entry == NULL) {
178 c_mp_rs_response->error_code = ENOENT;
179
180 LOG_ERR_2("read_session_request",
181 "can't find configuration entry '%s'."
182 " aborting request", c_mp_rs_request->entry);
183 goto fin;
184 }
185
186 if (qstate->config_entry->enabled == 0) {
187 c_mp_rs_response->error_code = EACCES;
188
189 LOG_ERR_2("read_session_request",
190 "configuration entry '%s' is disabled",
191 c_mp_rs_request->entry);
192 goto fin;
193 }
194
195 if (qstate->config_entry->perform_actual_lookups != 0)
196 dec_cache_entry_name = strdup(
197 qstate->config_entry->mp_cache_params.entry_name);
198 else {
199 #ifdef NS_NSCD_EID_CHECKING
200 if (check_query_eids(qstate) != 0) {
201 c_mp_rs_response->error_code = EPERM;
202 goto fin;
203 }
204 #endif
205
206 asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
207 qstate->config_entry->mp_cache_params.entry_name);
208 }
209
210 assert(dec_cache_entry_name != NULL);
211
212 configuration_lock_rdlock(s_configuration);
213 c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
214 configuration_unlock(s_configuration);
215
216 if ((c_entry == INVALID_CACHE) &&
217 (qstate->config_entry->perform_actual_lookups != 0))
218 c_entry = register_new_mp_cache_entry(qstate,
219 dec_cache_entry_name);
220
221 free(dec_cache_entry_name);
222
223 if (c_entry != INVALID_CACHE_ENTRY) {
224 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
225 rs = open_cache_mp_read_session(c_entry);
226 configuration_unlock_entry(qstate->config_entry,
227 CELT_MULTIPART);
228
229 if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
230 (qstate->config_entry->perform_actual_lookups != 0)) {
231 lookup_agent = find_agent(s_agent_table,
232 c_mp_rs_request->entry, MULTIPART_AGENT);
233
234 if ((lookup_agent != NULL) &&
235 (lookup_agent->type == MULTIPART_AGENT)) {
236 mp_agent = (struct multipart_agent *)
237 lookup_agent;
238 mdata = mp_agent->mp_init_func();
239
240 /*
241 * Multipart agents read the whole snapshot
242 * of the data at one time.
243 */
244 configuration_lock_entry(qstate->config_entry,
245 CELT_MULTIPART);
246 ws = open_cache_mp_write_session(c_entry);
247 configuration_unlock_entry(qstate->config_entry,
248 CELT_MULTIPART);
249 if (ws != NULL) {
250 do {
251 buffer = NULL;
252 res = mp_agent->mp_lookup_func(&buffer,
253 &buffer_size,
254 mdata);
255
256 if ((res & NS_TERMINATE) &&
257 (buffer != NULL)) {
258 configuration_lock_entry(
259 qstate->config_entry,
260 CELT_MULTIPART);
261 if (cache_mp_write(ws, buffer,
262 buffer_size) != 0) {
263 abandon_cache_mp_write_session(ws);
264 ws = NULL;
265 }
266 configuration_unlock_entry(
267 qstate->config_entry,
268 CELT_MULTIPART);
269
270 free(buffer);
271 buffer = NULL;
272 } else {
273 configuration_lock_entry(
274 qstate->config_entry,
275 CELT_MULTIPART);
276 close_cache_mp_write_session(ws);
277 configuration_unlock_entry(
278 qstate->config_entry,
279 CELT_MULTIPART);
280
281 free(buffer);
282 buffer = NULL;
283 }
284 } while ((res & NS_TERMINATE) &&
285 (ws != NULL));
286 }
287
288 configuration_lock_entry(qstate->config_entry,
289 CELT_MULTIPART);
290 rs = open_cache_mp_read_session(c_entry);
291 configuration_unlock_entry(qstate->config_entry,
292 CELT_MULTIPART);
293 }
294 }
295
296 if (rs == INVALID_CACHE_MP_READ_SESSION)
297 c_mp_rs_response->error_code = -1;
298 else {
299 qstate->mdata = rs;
300 qstate->destroy_func = on_mp_read_session_destroy;
301
302 configuration_lock_entry(qstate->config_entry,
303 CELT_MULTIPART);
304 if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
305 (qstate->config_entry->mp_query_timeout.tv_usec != 0))
306 memcpy(&qstate->timeout,
307 &qstate->config_entry->mp_query_timeout,
308 sizeof(struct timeval));
309 configuration_unlock_entry(qstate->config_entry,
310 CELT_MULTIPART);
311 }
312 } else
313 c_mp_rs_response->error_code = -1;
314
315 fin:
316 qstate->process_func = on_mp_read_session_response_write1;
317 qstate->kevent_watermark = sizeof(int);
318 qstate->kevent_filter = EVFILT_WRITE;
319
320 TRACE_OUT(on_mp_read_session_request_process);
321 return (0);
322 }
323
324 static int
on_mp_read_session_response_write1(struct query_state * qstate)325 on_mp_read_session_response_write1(struct query_state *qstate)
326 {
327 struct cache_mp_read_session_response *c_mp_rs_response;
328 ssize_t result;
329
330 TRACE_IN(on_mp_read_session_response_write1);
331 c_mp_rs_response = get_cache_mp_read_session_response(
332 &qstate->response);
333 result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
334 sizeof(int));
335
336 if (result != sizeof(int)) {
337 LOG_ERR_3("on_mp_read_session_response_write1",
338 "write failed");
339 TRACE_OUT(on_mp_read_session_response_write1);
340 return (-1);
341 }
342
343 if (c_mp_rs_response->error_code == 0) {
344 qstate->kevent_watermark = sizeof(int);
345 qstate->process_func = on_mp_read_session_mapper;
346 qstate->kevent_filter = EVFILT_READ;
347 } else {
348 qstate->kevent_watermark = 0;
349 qstate->process_func = NULL;
350 }
351 TRACE_OUT(on_mp_read_session_response_write1);
352 return (0);
353 }
354
355 /*
356 * Mapper function is used to avoid multiple connections for each session
357 * write or read requests. After processing the request, it does not close
358 * the connection, but waits for the next request.
359 */
360 static int
on_mp_read_session_mapper(struct query_state * qstate)361 on_mp_read_session_mapper(struct query_state *qstate)
362 {
363 ssize_t result;
364 int elem_type;
365
366 TRACE_IN(on_mp_read_session_mapper);
367 if (qstate->kevent_watermark == 0) {
368 qstate->kevent_watermark = sizeof(int);
369 } else {
370 result = qstate->read_func(qstate, &elem_type, sizeof(int));
371 if (result != sizeof(int)) {
372 LOG_ERR_3("on_mp_read_session_mapper",
373 "read failed");
374 TRACE_OUT(on_mp_read_session_mapper);
375 return (-1);
376 }
377
378 switch (elem_type) {
379 case CET_MP_READ_SESSION_READ_REQUEST:
380 qstate->kevent_watermark = 0;
381 qstate->process_func =
382 on_mp_read_session_read_request_process;
383 break;
384 case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
385 qstate->kevent_watermark = 0;
386 qstate->process_func =
387 on_mp_read_session_close_notification;
388 break;
389 default:
390 qstate->kevent_watermark = 0;
391 qstate->process_func = NULL;
392 LOG_ERR_3("on_mp_read_session_mapper",
393 "unknown element type");
394 TRACE_OUT(on_mp_read_session_mapper);
395 return (-1);
396 }
397 }
398 TRACE_OUT(on_mp_read_session_mapper);
399 return (0);
400 }
401
/*
 * The functions below are used to process multipart read session read
 * requests. The user doesn't have to pass any data besides the request
 * identifier itself, so no XXX_read functions are needed and processing
 * starts with the XXX_process function.
 * - on_mp_read_session_read_request_process processes the request
 * - on_mp_read_session_read_response_write1 and
 *   on_mp_read_session_read_response_write2 send the response
 */
411 static int
on_mp_read_session_read_request_process(struct query_state * qstate)412 on_mp_read_session_read_request_process(struct query_state *qstate)
413 {
414 struct cache_mp_read_session_read_response *read_response;
415
416 TRACE_IN(on_mp_read_session_response_process);
417 init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
418 read_response = get_cache_mp_read_session_read_response(
419 &qstate->response);
420
421 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
422 read_response->error_code = cache_mp_read(
423 (cache_mp_read_session)qstate->mdata, NULL,
424 &read_response->data_size);
425
426 if (read_response->error_code == 0) {
427 read_response->data = (char *)malloc(read_response->data_size);
428 assert(read_response != NULL);
429 read_response->error_code = cache_mp_read(
430 (cache_mp_read_session)qstate->mdata,
431 read_response->data,
432 &read_response->data_size);
433 }
434 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
435
436 if (read_response->error_code == 0)
437 qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
438 else
439 qstate->kevent_watermark = sizeof(int);
440 qstate->process_func = on_mp_read_session_read_response_write1;
441 qstate->kevent_filter = EVFILT_WRITE;
442
443 TRACE_OUT(on_mp_read_session_response_process);
444 return (0);
445 }
446
447 static int
on_mp_read_session_read_response_write1(struct query_state * qstate)448 on_mp_read_session_read_response_write1(struct query_state *qstate)
449 {
450 struct cache_mp_read_session_read_response *read_response;
451 ssize_t result;
452
453 TRACE_IN(on_mp_read_session_read_response_write1);
454 read_response = get_cache_mp_read_session_read_response(
455 &qstate->response);
456
457 result = qstate->write_func(qstate, &read_response->error_code,
458 sizeof(int));
459 if (read_response->error_code == 0) {
460 result += qstate->write_func(qstate, &read_response->data_size,
461 sizeof(size_t));
462 if (result != qstate->kevent_watermark) {
463 TRACE_OUT(on_mp_read_session_read_response_write1);
464 LOG_ERR_3("on_mp_read_session_read_response_write1",
465 "write failed");
466 return (-1);
467 }
468
469 qstate->kevent_watermark = read_response->data_size;
470 qstate->process_func = on_mp_read_session_read_response_write2;
471 } else {
472 if (result != qstate->kevent_watermark) {
473 LOG_ERR_3("on_mp_read_session_read_response_write1",
474 "write failed");
475 TRACE_OUT(on_mp_read_session_read_response_write1);
476 return (-1);
477 }
478
479 qstate->kevent_watermark = 0;
480 qstate->process_func = NULL;
481 }
482
483 TRACE_OUT(on_mp_read_session_read_response_write1);
484 return (0);
485 }
486
487 static int
on_mp_read_session_read_response_write2(struct query_state * qstate)488 on_mp_read_session_read_response_write2(struct query_state *qstate)
489 {
490 struct cache_mp_read_session_read_response *read_response;
491 ssize_t result;
492
493 TRACE_IN(on_mp_read_session_read_response_write2);
494 read_response = get_cache_mp_read_session_read_response(
495 &qstate->response);
496 result = qstate->write_func(qstate, read_response->data,
497 read_response->data_size);
498 if (result != qstate->kevent_watermark) {
499 LOG_ERR_3("on_mp_read_session_read_response_write2",
500 "write failed");
501 TRACE_OUT(on_mp_read_session_read_response_write2);
502 return (-1);
503 }
504
505 finalize_comm_element(&qstate->request);
506 finalize_comm_element(&qstate->response);
507
508 qstate->kevent_watermark = sizeof(int);
509 qstate->process_func = on_mp_read_session_mapper;
510 qstate->kevent_filter = EVFILT_READ;
511
512 TRACE_OUT(on_mp_read_session_read_response_write2);
513 return (0);
514 }
515
516 /*
517 * Handles session close notification by calling close_cache_mp_read_session
518 * function.
519 */
520 static int
on_mp_read_session_close_notification(struct query_state * qstate)521 on_mp_read_session_close_notification(struct query_state *qstate)
522 {
523
524 TRACE_IN(on_mp_read_session_close_notification);
525 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
526 close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
527 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
528 qstate->mdata = NULL;
529 qstate->kevent_watermark = 0;
530 qstate->process_func = NULL;
531 TRACE_OUT(on_mp_read_session_close_notification);
532 return (0);
533 }
534