1 /*- 2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 * $FreeBSD: src/usr.sbin/nscd/mp_rs_query.c,v 1.4 2008/10/12 00:44:27 delphij Exp $ 27 */ 28 29 #include <sys/socket.h> 30 #include <sys/time.h> 31 #include <sys/types.h> 32 #include <sys/event.h> 33 #include <assert.h> 34 #include <errno.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <stdio.h> 38 39 #include "cachelib.h" 40 #include "config.h" 41 #include "debug.h" 42 #include "log.h" 43 #include "query.h" 44 #include "mp_rs_query.h" 45 #include "mp_ws_query.h" 46 #include "singletons.h" 47 48 static int on_mp_read_session_close_notification(struct query_state *); 49 static void on_mp_read_session_destroy(struct query_state *); 50 static int on_mp_read_session_mapper(struct query_state *); 51 /* int on_mp_read_session_request_read1(struct query_state *); */ 52 static int on_mp_read_session_request_read2(struct query_state *); 53 static int on_mp_read_session_request_process(struct query_state *); 54 static int on_mp_read_session_response_write1(struct query_state *); 55 static int on_mp_read_session_read_request_process(struct query_state *); 56 static int on_mp_read_session_read_response_write1(struct query_state *); 57 static int on_mp_read_session_read_response_write2(struct query_state *); 58 59 /* 60 * This function is used as the query_state's destroy_func to make the 61 * proper cleanup in case of errors. 62 */ 63 static void 64 on_mp_read_session_destroy(struct query_state *qstate) 65 { 66 TRACE_IN(on_mp_read_session_destroy); 67 finalize_comm_element(&qstate->request); 68 finalize_comm_element(&qstate->response); 69 70 if (qstate->mdata != NULL) { 71 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 72 close_cache_mp_read_session( 73 (cache_mp_read_session)qstate->mdata); 74 configuration_unlock_entry(qstate->config_entry, 75 CELT_MULTIPART); 76 } 77 TRACE_OUT(on_mp_read_session_destroy); 78 } 79 80 /* 81 * The functions below are used to process multipart read session initiation 82 * requests. 83 * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read 84 * the request itself 85 * - on_mp_read_session_request_process processes it 86 * - on_mp_read_session_response_write1 sends the response 87 */ 88 int 89 on_mp_read_session_request_read1(struct query_state *qstate) 90 { 91 struct cache_mp_read_session_request *c_mp_rs_request; 92 ssize_t result; 93 94 TRACE_IN(on_mp_read_session_request_read1); 95 if (qstate->kevent_watermark == 0) 96 qstate->kevent_watermark = sizeof(size_t); 97 else { 98 init_comm_element(&qstate->request, 99 CET_MP_READ_SESSION_REQUEST); 100 c_mp_rs_request = get_cache_mp_read_session_request( 101 &qstate->request); 102 103 result = qstate->read_func(qstate, 104 &c_mp_rs_request->entry_length, sizeof(size_t)); 105 106 if (result != sizeof(size_t)) { 107 TRACE_OUT(on_mp_read_session_request_read1); 108 return (-1); 109 } 110 111 if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) { 112 TRACE_OUT(on_mp_read_session_request_read1); 113 return (-1); 114 } 115 116 c_mp_rs_request->entry = (char *)calloc(1, 117 c_mp_rs_request->entry_length + 1); 118 assert(c_mp_rs_request->entry != NULL); 119 120 qstate->kevent_watermark = c_mp_rs_request->entry_length; 121 qstate->process_func = on_mp_read_session_request_read2; 122 } 123 TRACE_OUT(on_mp_read_session_request_read1); 124 return (0); 125 } 126 127 static int 128 on_mp_read_session_request_read2(struct query_state *qstate) 129 { 130 struct cache_mp_read_session_request *c_mp_rs_request; 131 ssize_t result; 132 133 TRACE_IN(on_mp_read_session_request_read2); 134 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request); 135 136 result = qstate->read_func(qstate, c_mp_rs_request->entry, 137 c_mp_rs_request->entry_length); 138 139 if (result != qstate->kevent_watermark) { 140 LOG_ERR_3("on_mp_read_session_request_read2", 141 "read failed"); 142 TRACE_OUT(on_mp_read_session_request_read2); 143 return (-1); 144 } 145 146 qstate->kevent_watermark = 0; 147 qstate->process_func = on_mp_read_session_request_process; 148 TRACE_OUT(on_mp_read_session_request_read2); 149 return (0); 150 } 151 152 static int 153 on_mp_read_session_request_process(struct query_state *qstate) 154 { 155 struct cache_mp_read_session_request *c_mp_rs_request; 156 struct cache_mp_read_session_response *c_mp_rs_response; 157 cache_mp_read_session rs; 158 cache_entry c_entry; 159 char *dec_cache_entry_name; 160 161 char *buffer; 162 size_t buffer_size; 163 cache_mp_write_session ws; 164 struct agent *lookup_agent; 165 struct multipart_agent *mp_agent; 166 void *mdata; 167 int res; 168 169 TRACE_IN(on_mp_read_session_request_process); 170 init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE); 171 c_mp_rs_response = get_cache_mp_read_session_response( 172 &qstate->response); 173 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request); 174 175 qstate->config_entry = configuration_find_entry( 176 s_configuration, c_mp_rs_request->entry); 177 if (qstate->config_entry == NULL) { 178 c_mp_rs_response->error_code = ENOENT; 179 180 LOG_ERR_2("read_session_request", 181 "can't find configuration entry '%s'." 182 " aborting request", c_mp_rs_request->entry); 183 goto fin; 184 } 185 186 if (qstate->config_entry->enabled == 0) { 187 c_mp_rs_response->error_code = EACCES; 188 189 LOG_ERR_2("read_session_request", 190 "configuration entry '%s' is disabled", 191 c_mp_rs_request->entry); 192 goto fin; 193 } 194 195 if (qstate->config_entry->perform_actual_lookups != 0) 196 dec_cache_entry_name = strdup( 197 qstate->config_entry->mp_cache_params.entry_name); 198 else { 199 #ifdef NS_NSCD_EID_CHECKING 200 if (check_query_eids(qstate) != 0) { 201 c_mp_rs_response->error_code = EPERM; 202 goto fin; 203 } 204 #endif 205 206 asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str, 207 qstate->config_entry->mp_cache_params.entry_name); 208 } 209 210 assert(dec_cache_entry_name != NULL); 211 212 configuration_lock_rdlock(s_configuration); 213 c_entry = find_cache_entry(s_cache, dec_cache_entry_name); 214 configuration_unlock(s_configuration); 215 216 if ((c_entry == INVALID_CACHE) && 217 (qstate->config_entry->perform_actual_lookups != 0)) 218 c_entry = register_new_mp_cache_entry(qstate, 219 dec_cache_entry_name); 220 221 free(dec_cache_entry_name); 222 223 if (c_entry != INVALID_CACHE_ENTRY) { 224 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 225 rs = open_cache_mp_read_session(c_entry); 226 configuration_unlock_entry(qstate->config_entry, 227 CELT_MULTIPART); 228 229 if ((rs == INVALID_CACHE_MP_READ_SESSION) && 230 (qstate->config_entry->perform_actual_lookups != 0)) { 231 lookup_agent = find_agent(s_agent_table, 232 c_mp_rs_request->entry, MULTIPART_AGENT); 233 234 if ((lookup_agent != NULL) && 235 (lookup_agent->type == MULTIPART_AGENT)) { 236 mp_agent = (struct multipart_agent *) 237 lookup_agent; 238 mdata = mp_agent->mp_init_func(); 239 240 /* 241 * Multipart agents read the whole snapshot 242 * of the data at one time. 243 */ 244 configuration_lock_entry(qstate->config_entry, 245 CELT_MULTIPART); 246 ws = open_cache_mp_write_session(c_entry); 247 configuration_unlock_entry(qstate->config_entry, 248 CELT_MULTIPART); 249 if (ws != NULL) { 250 do { 251 buffer = NULL; 252 res = mp_agent->mp_lookup_func(&buffer, 253 &buffer_size, 254 mdata); 255 256 if ((res & NS_TERMINATE) && 257 (buffer != NULL)) { 258 configuration_lock_entry( 259 qstate->config_entry, 260 CELT_MULTIPART); 261 if (cache_mp_write(ws, buffer, 262 buffer_size) != 0) { 263 abandon_cache_mp_write_session(ws); 264 ws = NULL; 265 } 266 configuration_unlock_entry( 267 qstate->config_entry, 268 CELT_MULTIPART); 269 270 free(buffer); 271 buffer = NULL; 272 } else { 273 configuration_lock_entry( 274 qstate->config_entry, 275 CELT_MULTIPART); 276 close_cache_mp_write_session(ws); 277 configuration_unlock_entry( 278 qstate->config_entry, 279 CELT_MULTIPART); 280 281 free(buffer); 282 buffer = NULL; 283 } 284 } while ((res & NS_TERMINATE) && 285 (ws != NULL)); 286 } 287 288 configuration_lock_entry(qstate->config_entry, 289 CELT_MULTIPART); 290 rs = open_cache_mp_read_session(c_entry); 291 configuration_unlock_entry(qstate->config_entry, 292 CELT_MULTIPART); 293 } 294 } 295 296 if (rs == INVALID_CACHE_MP_READ_SESSION) 297 c_mp_rs_response->error_code = -1; 298 else { 299 qstate->mdata = rs; 300 qstate->destroy_func = on_mp_read_session_destroy; 301 302 configuration_lock_entry(qstate->config_entry, 303 CELT_MULTIPART); 304 if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) || 305 (qstate->config_entry->mp_query_timeout.tv_usec != 0)) 306 memcpy(&qstate->timeout, 307 &qstate->config_entry->mp_query_timeout, 308 sizeof(struct timeval)); 309 configuration_unlock_entry(qstate->config_entry, 310 CELT_MULTIPART); 311 } 312 } else 313 c_mp_rs_response->error_code = -1; 314 315 fin: 316 qstate->process_func = on_mp_read_session_response_write1; 317 qstate->kevent_watermark = sizeof(int); 318 qstate->kevent_filter = EVFILT_WRITE; 319 320 TRACE_OUT(on_mp_read_session_request_process); 321 return (0); 322 } 323 324 static int 325 on_mp_read_session_response_write1(struct query_state *qstate) 326 { 327 struct cache_mp_read_session_response *c_mp_rs_response; 328 ssize_t result; 329 330 TRACE_IN(on_mp_read_session_response_write1); 331 c_mp_rs_response = get_cache_mp_read_session_response( 332 &qstate->response); 333 result = qstate->write_func(qstate, &c_mp_rs_response->error_code, 334 sizeof(int)); 335 336 if (result != sizeof(int)) { 337 LOG_ERR_3("on_mp_read_session_response_write1", 338 "write failed"); 339 TRACE_OUT(on_mp_read_session_response_write1); 340 return (-1); 341 } 342 343 if (c_mp_rs_response->error_code == 0) { 344 qstate->kevent_watermark = sizeof(int); 345 qstate->process_func = on_mp_read_session_mapper; 346 qstate->kevent_filter = EVFILT_READ; 347 } else { 348 qstate->kevent_watermark = 0; 349 qstate->process_func = NULL; 350 } 351 TRACE_OUT(on_mp_read_session_response_write1); 352 return (0); 353 } 354 355 /* 356 * Mapper function is used to avoid multiple connections for each session 357 * write or read requests. After processing the request, it does not close 358 * the connection, but waits for the next request. 359 */ 360 static int 361 on_mp_read_session_mapper(struct query_state *qstate) 362 { 363 ssize_t result; 364 int elem_type; 365 366 TRACE_IN(on_mp_read_session_mapper); 367 if (qstate->kevent_watermark == 0) { 368 qstate->kevent_watermark = sizeof(int); 369 } else { 370 result = qstate->read_func(qstate, &elem_type, sizeof(int)); 371 if (result != sizeof(int)) { 372 LOG_ERR_3("on_mp_read_session_mapper", 373 "read failed"); 374 TRACE_OUT(on_mp_read_session_mapper); 375 return (-1); 376 } 377 378 switch (elem_type) { 379 case CET_MP_READ_SESSION_READ_REQUEST: 380 qstate->kevent_watermark = 0; 381 qstate->process_func = 382 on_mp_read_session_read_request_process; 383 break; 384 case CET_MP_READ_SESSION_CLOSE_NOTIFICATION: 385 qstate->kevent_watermark = 0; 386 qstate->process_func = 387 on_mp_read_session_close_notification; 388 break; 389 default: 390 qstate->kevent_watermark = 0; 391 qstate->process_func = NULL; 392 LOG_ERR_3("on_mp_read_session_mapper", 393 "unknown element type"); 394 TRACE_OUT(on_mp_read_session_mapper); 395 return (-1); 396 } 397 } 398 TRACE_OUT(on_mp_read_session_mapper); 399 return (0); 400 } 401 402 /* 403 * The functions below are used to process multipart read sessions read 404 * requests. User doesn't have to pass any kind of data, besides the 405 * request identificator itself. So we don't need any XXX_read functions and 406 * start with the XXX_process function. 407 * - on_mp_read_session_read_request_process processes it 408 * - on_mp_read_session_read_response_write1 and 409 * on_mp_read_session_read_response_write2 sends the response 410 */ 411 static int 412 on_mp_read_session_read_request_process(struct query_state *qstate) 413 { 414 struct cache_mp_read_session_read_response *read_response; 415 416 TRACE_IN(on_mp_read_session_response_process); 417 init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE); 418 read_response = get_cache_mp_read_session_read_response( 419 &qstate->response); 420 421 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 422 read_response->error_code = cache_mp_read( 423 (cache_mp_read_session)qstate->mdata, NULL, 424 &read_response->data_size); 425 426 if (read_response->error_code == 0) { 427 read_response->data = (char *)malloc(read_response->data_size); 428 assert(read_response != NULL); 429 read_response->error_code = cache_mp_read( 430 (cache_mp_read_session)qstate->mdata, 431 read_response->data, 432 &read_response->data_size); 433 } 434 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 435 436 if (read_response->error_code == 0) 437 qstate->kevent_watermark = sizeof(size_t) + sizeof(int); 438 else 439 qstate->kevent_watermark = sizeof(int); 440 qstate->process_func = on_mp_read_session_read_response_write1; 441 qstate->kevent_filter = EVFILT_WRITE; 442 443 TRACE_OUT(on_mp_read_session_response_process); 444 return (0); 445 } 446 447 static int 448 on_mp_read_session_read_response_write1(struct query_state *qstate) 449 { 450 struct cache_mp_read_session_read_response *read_response; 451 ssize_t result; 452 453 TRACE_IN(on_mp_read_session_read_response_write1); 454 read_response = get_cache_mp_read_session_read_response( 455 &qstate->response); 456 457 result = qstate->write_func(qstate, &read_response->error_code, 458 sizeof(int)); 459 if (read_response->error_code == 0) { 460 result += qstate->write_func(qstate, &read_response->data_size, 461 sizeof(size_t)); 462 if (result != qstate->kevent_watermark) { 463 TRACE_OUT(on_mp_read_session_read_response_write1); 464 LOG_ERR_3("on_mp_read_session_read_response_write1", 465 "write failed"); 466 return (-1); 467 } 468 469 qstate->kevent_watermark = read_response->data_size; 470 qstate->process_func = on_mp_read_session_read_response_write2; 471 } else { 472 if (result != qstate->kevent_watermark) { 473 LOG_ERR_3("on_mp_read_session_read_response_write1", 474 "write failed"); 475 TRACE_OUT(on_mp_read_session_read_response_write1); 476 return (-1); 477 } 478 479 qstate->kevent_watermark = 0; 480 qstate->process_func = NULL; 481 } 482 483 TRACE_OUT(on_mp_read_session_read_response_write1); 484 return (0); 485 } 486 487 static int 488 on_mp_read_session_read_response_write2(struct query_state *qstate) 489 { 490 struct cache_mp_read_session_read_response *read_response; 491 ssize_t result; 492 493 TRACE_IN(on_mp_read_session_read_response_write2); 494 read_response = get_cache_mp_read_session_read_response( 495 &qstate->response); 496 result = qstate->write_func(qstate, read_response->data, 497 read_response->data_size); 498 if (result != qstate->kevent_watermark) { 499 LOG_ERR_3("on_mp_read_session_read_response_write2", 500 "write failed"); 501 TRACE_OUT(on_mp_read_session_read_response_write2); 502 return (-1); 503 } 504 505 finalize_comm_element(&qstate->request); 506 finalize_comm_element(&qstate->response); 507 508 qstate->kevent_watermark = sizeof(int); 509 qstate->process_func = on_mp_read_session_mapper; 510 qstate->kevent_filter = EVFILT_READ; 511 512 TRACE_OUT(on_mp_read_session_read_response_write2); 513 return (0); 514 } 515 516 /* 517 * Handles session close notification by calling close_cache_mp_read_session 518 * function. 519 */ 520 static int 521 on_mp_read_session_close_notification(struct query_state *qstate) 522 { 523 524 TRACE_IN(on_mp_read_session_close_notification); 525 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 526 close_cache_mp_read_session((cache_mp_read_session)qstate->mdata); 527 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 528 qstate->mdata = NULL; 529 qstate->kevent_watermark = 0; 530 qstate->process_func = NULL; 531 TRACE_OUT(on_mp_read_session_close_notification); 532 return (0); 533 } 534