1 /*- 2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 */ 27 28 #include <sys/types.h> 29 #include <sys/event.h> 30 #include <sys/socket.h> 31 #include <sys/time.h> 32 33 #include <assert.h> 34 #include <errno.h> 35 #include <nsswitch.h> 36 #include <stdio.h> 37 #include <stdlib.h> 38 #include <string.h> 39 40 #include "cachelib.h" 41 #include "config.h" 42 #include "debug.h" 43 #include "log.h" 44 #include "query.h" 45 #include "mp_rs_query.h" 46 #include "mp_ws_query.h" 47 #include "singletons.h" 48 49 static int on_mp_read_session_close_notification(struct query_state *); 50 static void on_mp_read_session_destroy(struct query_state *); 51 static int on_mp_read_session_mapper(struct query_state *); 52 /* int on_mp_read_session_request_read1(struct query_state *); */ 53 static int on_mp_read_session_request_read2(struct query_state *); 54 static int on_mp_read_session_request_process(struct query_state *); 55 static int on_mp_read_session_response_write1(struct query_state *); 56 static int on_mp_read_session_read_request_process(struct query_state *); 57 static int on_mp_read_session_read_response_write1(struct query_state *); 58 static int on_mp_read_session_read_response_write2(struct query_state *); 59 60 /* 61 * This function is used as the query_state's destroy_func to make the 62 * proper cleanup in case of errors. 63 */ 64 static void 65 on_mp_read_session_destroy(struct query_state *qstate) 66 { 67 TRACE_IN(on_mp_read_session_destroy); 68 finalize_comm_element(&qstate->request); 69 finalize_comm_element(&qstate->response); 70 71 if (qstate->mdata != NULL) { 72 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 73 close_cache_mp_read_session( 74 (cache_mp_read_session)qstate->mdata); 75 configuration_unlock_entry(qstate->config_entry, 76 CELT_MULTIPART); 77 } 78 TRACE_OUT(on_mp_read_session_destroy); 79 } 80 81 /* 82 * The functions below are used to process multipart read session initiation 83 * requests. 84 * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read 85 * the request itself 86 * - on_mp_read_session_request_process processes it 87 * - on_mp_read_session_response_write1 sends the response 88 */ 89 int 90 on_mp_read_session_request_read1(struct query_state *qstate) 91 { 92 struct cache_mp_read_session_request *c_mp_rs_request; 93 ssize_t result; 94 95 TRACE_IN(on_mp_read_session_request_read1); 96 if (qstate->kevent_watermark == 0) 97 qstate->kevent_watermark = sizeof(size_t); 98 else { 99 init_comm_element(&qstate->request, 100 CET_MP_READ_SESSION_REQUEST); 101 c_mp_rs_request = get_cache_mp_read_session_request( 102 &qstate->request); 103 104 result = qstate->read_func(qstate, 105 &c_mp_rs_request->entry_length, sizeof(size_t)); 106 107 if (result != sizeof(size_t)) { 108 TRACE_OUT(on_mp_read_session_request_read1); 109 return (-1); 110 } 111 112 if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) { 113 TRACE_OUT(on_mp_read_session_request_read1); 114 return (-1); 115 } 116 117 c_mp_rs_request->entry = calloc(1, 118 c_mp_rs_request->entry_length + 1); 119 assert(c_mp_rs_request->entry != NULL); 120 121 qstate->kevent_watermark = c_mp_rs_request->entry_length; 122 qstate->process_func = on_mp_read_session_request_read2; 123 } 124 TRACE_OUT(on_mp_read_session_request_read1); 125 return (0); 126 } 127 128 static int 129 on_mp_read_session_request_read2(struct query_state *qstate) 130 { 131 struct cache_mp_read_session_request *c_mp_rs_request; 132 ssize_t result; 133 134 TRACE_IN(on_mp_read_session_request_read2); 135 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request); 136 137 result = qstate->read_func(qstate, c_mp_rs_request->entry, 138 c_mp_rs_request->entry_length); 139 140 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 141 LOG_ERR_3("on_mp_read_session_request_read2", 142 "read failed"); 143 TRACE_OUT(on_mp_read_session_request_read2); 144 return (-1); 145 } 146 147 qstate->kevent_watermark = 0; 148 qstate->process_func = on_mp_read_session_request_process; 149 TRACE_OUT(on_mp_read_session_request_read2); 150 return (0); 151 } 152 153 static int 154 on_mp_read_session_request_process(struct query_state *qstate) 155 { 156 struct cache_mp_read_session_request *c_mp_rs_request; 157 struct cache_mp_read_session_response *c_mp_rs_response; 158 cache_mp_read_session rs; 159 cache_entry c_entry; 160 char *dec_cache_entry_name; 161 162 char *buffer; 163 size_t buffer_size; 164 cache_mp_write_session ws; 165 struct agent *lookup_agent; 166 struct multipart_agent *mp_agent; 167 void *mdata; 168 int res; 169 170 TRACE_IN(on_mp_read_session_request_process); 171 init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE); 172 c_mp_rs_response = get_cache_mp_read_session_response( 173 &qstate->response); 174 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request); 175 176 qstate->config_entry = configuration_find_entry( 177 s_configuration, c_mp_rs_request->entry); 178 if (qstate->config_entry == NULL) { 179 c_mp_rs_response->error_code = ENOENT; 180 181 LOG_ERR_2("read_session_request", 182 "can't find configuration entry '%s'." 183 " aborting request", c_mp_rs_request->entry); 184 goto fin; 185 } 186 187 if (qstate->config_entry->enabled == 0) { 188 c_mp_rs_response->error_code = EACCES; 189 190 LOG_ERR_2("read_session_request", 191 "configuration entry '%s' is disabled", 192 c_mp_rs_request->entry); 193 goto fin; 194 } 195 196 if (qstate->config_entry->perform_actual_lookups != 0) 197 dec_cache_entry_name = strdup( 198 qstate->config_entry->mp_cache_params.cep.entry_name); 199 else { 200 #ifdef NS_NSCD_EID_CHECKING 201 if (check_query_eids(qstate) != 0) { 202 c_mp_rs_response->error_code = EPERM; 203 goto fin; 204 } 205 #endif 206 207 asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str, 208 qstate->config_entry->mp_cache_params.cep.entry_name); 209 } 210 211 assert(dec_cache_entry_name != NULL); 212 213 configuration_lock_rdlock(s_configuration); 214 c_entry = find_cache_entry(s_cache, dec_cache_entry_name); 215 configuration_unlock(s_configuration); 216 217 if ((c_entry == INVALID_CACHE) && 218 (qstate->config_entry->perform_actual_lookups != 0)) 219 c_entry = register_new_mp_cache_entry(qstate, 220 dec_cache_entry_name); 221 222 free(dec_cache_entry_name); 223 224 if (c_entry != INVALID_CACHE_ENTRY) { 225 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 226 rs = open_cache_mp_read_session(c_entry); 227 configuration_unlock_entry(qstate->config_entry, 228 CELT_MULTIPART); 229 230 if ((rs == INVALID_CACHE_MP_READ_SESSION) && 231 (qstate->config_entry->perform_actual_lookups != 0)) { 232 lookup_agent = find_agent(s_agent_table, 233 c_mp_rs_request->entry, MULTIPART_AGENT); 234 235 if ((lookup_agent != NULL) && 236 (lookup_agent->type == MULTIPART_AGENT)) { 237 mp_agent = (struct multipart_agent *) 238 lookup_agent; 239 mdata = mp_agent->mp_init_func(); 240 241 /* 242 * Multipart agents read the whole snapshot 243 * of the data at one time. 244 */ 245 configuration_lock_entry(qstate->config_entry, 246 CELT_MULTIPART); 247 ws = open_cache_mp_write_session(c_entry); 248 configuration_unlock_entry(qstate->config_entry, 249 CELT_MULTIPART); 250 if (ws != NULL) { 251 do { 252 buffer = NULL; 253 res = mp_agent->mp_lookup_func(&buffer, 254 &buffer_size, 255 mdata); 256 257 if ((res & NS_TERMINATE) && 258 (buffer != NULL)) { 259 configuration_lock_entry( 260 qstate->config_entry, 261 CELT_MULTIPART); 262 if (cache_mp_write(ws, buffer, 263 buffer_size) != 0) { 264 abandon_cache_mp_write_session(ws); 265 ws = NULL; 266 } 267 configuration_unlock_entry( 268 qstate->config_entry, 269 CELT_MULTIPART); 270 271 free(buffer); 272 buffer = NULL; 273 } else { 274 configuration_lock_entry( 275 qstate->config_entry, 276 CELT_MULTIPART); 277 close_cache_mp_write_session(ws); 278 configuration_unlock_entry( 279 qstate->config_entry, 280 CELT_MULTIPART); 281 282 free(buffer); 283 buffer = NULL; 284 } 285 } while ((res & NS_TERMINATE) && 286 (ws != NULL)); 287 } 288 289 configuration_lock_entry(qstate->config_entry, 290 CELT_MULTIPART); 291 rs = open_cache_mp_read_session(c_entry); 292 configuration_unlock_entry(qstate->config_entry, 293 CELT_MULTIPART); 294 } 295 } 296 297 if (rs == INVALID_CACHE_MP_READ_SESSION) 298 c_mp_rs_response->error_code = -1; 299 else { 300 qstate->mdata = rs; 301 qstate->destroy_func = on_mp_read_session_destroy; 302 303 configuration_lock_entry(qstate->config_entry, 304 CELT_MULTIPART); 305 if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) || 306 (qstate->config_entry->mp_query_timeout.tv_usec != 0)) 307 memcpy(&qstate->timeout, 308 &qstate->config_entry->mp_query_timeout, 309 sizeof(struct timeval)); 310 configuration_unlock_entry(qstate->config_entry, 311 CELT_MULTIPART); 312 } 313 } else 314 c_mp_rs_response->error_code = -1; 315 316 fin: 317 qstate->process_func = on_mp_read_session_response_write1; 318 qstate->kevent_watermark = sizeof(int); 319 qstate->kevent_filter = EVFILT_WRITE; 320 321 TRACE_OUT(on_mp_read_session_request_process); 322 return (0); 323 } 324 325 static int 326 on_mp_read_session_response_write1(struct query_state *qstate) 327 { 328 struct cache_mp_read_session_response *c_mp_rs_response; 329 ssize_t result; 330 331 TRACE_IN(on_mp_read_session_response_write1); 332 c_mp_rs_response = get_cache_mp_read_session_response( 333 &qstate->response); 334 result = qstate->write_func(qstate, &c_mp_rs_response->error_code, 335 sizeof(int)); 336 337 if (result != sizeof(int)) { 338 LOG_ERR_3("on_mp_read_session_response_write1", 339 "write failed"); 340 TRACE_OUT(on_mp_read_session_response_write1); 341 return (-1); 342 } 343 344 if (c_mp_rs_response->error_code == 0) { 345 qstate->kevent_watermark = sizeof(int); 346 qstate->process_func = on_mp_read_session_mapper; 347 qstate->kevent_filter = EVFILT_READ; 348 } else { 349 qstate->kevent_watermark = 0; 350 qstate->process_func = NULL; 351 } 352 TRACE_OUT(on_mp_read_session_response_write1); 353 return (0); 354 } 355 356 /* 357 * Mapper function is used to avoid multiple connections for each session 358 * write or read requests. After processing the request, it does not close 359 * the connection, but waits for the next request. 360 */ 361 static int 362 on_mp_read_session_mapper(struct query_state *qstate) 363 { 364 ssize_t result; 365 int elem_type; 366 367 TRACE_IN(on_mp_read_session_mapper); 368 if (qstate->kevent_watermark == 0) { 369 qstate->kevent_watermark = sizeof(int); 370 } else { 371 result = qstate->read_func(qstate, &elem_type, sizeof(int)); 372 if (result != sizeof(int)) { 373 LOG_ERR_3("on_mp_read_session_mapper", 374 "read failed"); 375 TRACE_OUT(on_mp_read_session_mapper); 376 return (-1); 377 } 378 379 switch (elem_type) { 380 case CET_MP_READ_SESSION_READ_REQUEST: 381 qstate->kevent_watermark = 0; 382 qstate->process_func = 383 on_mp_read_session_read_request_process; 384 break; 385 case CET_MP_READ_SESSION_CLOSE_NOTIFICATION: 386 qstate->kevent_watermark = 0; 387 qstate->process_func = 388 on_mp_read_session_close_notification; 389 break; 390 default: 391 qstate->kevent_watermark = 0; 392 qstate->process_func = NULL; 393 LOG_ERR_3("on_mp_read_session_mapper", 394 "unknown element type"); 395 TRACE_OUT(on_mp_read_session_mapper); 396 return (-1); 397 } 398 } 399 TRACE_OUT(on_mp_read_session_mapper); 400 return (0); 401 } 402 403 /* 404 * The functions below are used to process multipart read sessions read 405 * requests. User doesn't have to pass any kind of data, besides the 406 * request identificator itself. So we don't need any XXX_read functions and 407 * start with the XXX_process function. 408 * - on_mp_read_session_read_request_process processes it 409 * - on_mp_read_session_read_response_write1 and 410 * on_mp_read_session_read_response_write2 sends the response 411 */ 412 static int 413 on_mp_read_session_read_request_process(struct query_state *qstate) 414 { 415 struct cache_mp_read_session_read_response *read_response; 416 417 TRACE_IN(on_mp_read_session_response_process); 418 init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE); 419 read_response = get_cache_mp_read_session_read_response( 420 &qstate->response); 421 422 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 423 read_response->error_code = cache_mp_read( 424 (cache_mp_read_session)qstate->mdata, NULL, 425 &read_response->data_size); 426 427 if (read_response->error_code == 0) { 428 read_response->data = malloc(read_response->data_size); 429 assert(read_response != NULL); 430 read_response->error_code = cache_mp_read( 431 (cache_mp_read_session)qstate->mdata, 432 read_response->data, 433 &read_response->data_size); 434 } 435 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 436 437 if (read_response->error_code == 0) 438 qstate->kevent_watermark = sizeof(size_t) + sizeof(int); 439 else 440 qstate->kevent_watermark = sizeof(int); 441 qstate->process_func = on_mp_read_session_read_response_write1; 442 qstate->kevent_filter = EVFILT_WRITE; 443 444 TRACE_OUT(on_mp_read_session_response_process); 445 return (0); 446 } 447 448 static int 449 on_mp_read_session_read_response_write1(struct query_state *qstate) 450 { 451 struct cache_mp_read_session_read_response *read_response; 452 ssize_t result; 453 454 TRACE_IN(on_mp_read_session_read_response_write1); 455 read_response = get_cache_mp_read_session_read_response( 456 &qstate->response); 457 458 result = qstate->write_func(qstate, &read_response->error_code, 459 sizeof(int)); 460 if (read_response->error_code == 0) { 461 result += qstate->write_func(qstate, &read_response->data_size, 462 sizeof(size_t)); 463 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 464 TRACE_OUT(on_mp_read_session_read_response_write1); 465 LOG_ERR_3("on_mp_read_session_read_response_write1", 466 "write failed"); 467 return (-1); 468 } 469 470 qstate->kevent_watermark = read_response->data_size; 471 qstate->process_func = on_mp_read_session_read_response_write2; 472 } else { 473 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 474 LOG_ERR_3("on_mp_read_session_read_response_write1", 475 "write failed"); 476 TRACE_OUT(on_mp_read_session_read_response_write1); 477 return (-1); 478 } 479 480 qstate->kevent_watermark = 0; 481 qstate->process_func = NULL; 482 } 483 484 TRACE_OUT(on_mp_read_session_read_response_write1); 485 return (0); 486 } 487 488 static int 489 on_mp_read_session_read_response_write2(struct query_state *qstate) 490 { 491 struct cache_mp_read_session_read_response *read_response; 492 ssize_t result; 493 494 TRACE_IN(on_mp_read_session_read_response_write2); 495 read_response = get_cache_mp_read_session_read_response( 496 &qstate->response); 497 result = qstate->write_func(qstate, read_response->data, 498 read_response->data_size); 499 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 500 LOG_ERR_3("on_mp_read_session_read_response_write2", 501 "write failed"); 502 TRACE_OUT(on_mp_read_session_read_response_write2); 503 return (-1); 504 } 505 506 finalize_comm_element(&qstate->request); 507 finalize_comm_element(&qstate->response); 508 509 qstate->kevent_watermark = sizeof(int); 510 qstate->process_func = on_mp_read_session_mapper; 511 qstate->kevent_filter = EVFILT_READ; 512 513 TRACE_OUT(on_mp_read_session_read_response_write2); 514 return (0); 515 } 516 517 /* 518 * Handles session close notification by calling close_cache_mp_read_session 519 * function. 520 */ 521 static int 522 on_mp_read_session_close_notification(struct query_state *qstate) 523 { 524 525 TRACE_IN(on_mp_read_session_close_notification); 526 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 527 close_cache_mp_read_session((cache_mp_read_session)qstate->mdata); 528 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 529 qstate->mdata = NULL; 530 qstate->kevent_watermark = 0; 531 qstate->process_func = NULL; 532 TRACE_OUT(on_mp_read_session_close_notification); 533 return (0); 534 } 535