/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Copyright 2011-2014 Couchbase, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "internal.h"
#include "logging.h"
#include "vbucket/aliases.h"
#include "settings.h"
#include "negotiate.h"
#include "bucketconfig/clconfig.h"
#include "mc/mcreq-flush-inl.h"
#include <lcbio/ssl.h>
#include "ctx-log-inl.h"

/* Logging helpers: LOGARGS/LOGARGS_T build the (settings, subsys, severity,
 * file, line) prefix lcb_log() expects; LOGFMT/LOGID* prepend the connection
 * context id, the Server pointer and its pipeline index to each message. */
#define LOGARGS(c, lvl) (c)->settings, "server", LCB_LOG_##lvl, __FILE__, __LINE__
#define LOGARGS_T(lvl) LOGARGS(this, lvl)

#define LOGFMT CTX_LOGFMT_PRE ",SRV=%p,IX=%d) "
/* Compact description of a memcached packet: opcode, status code, opaque */
#define PKTFMT "OP=0x%x, RC=0x%x, SEQ=%u"
#define PKTARGS(pkt) (pkt).opcode(), (pkt).status(), (pkt).opaque()

#define LOGID(server) CTX_LOGID(server->connctx), (void *)server, server->index
#define LOGID_T() LOGID(this)

/* Maximum number of IOV entries filled per flush iteration */
#define MCREQ_MAXIOV 32
#define LCBCONN_UNWANT(conn, flags) (conn)->want &= ~(flags)

using namespace lcb;

static void on_error(lcbio_CTX *ctx, lcb_error_t err);

/**
 * I/O callback: the context is ready to accept more output. Repeatedly fill
 * an IOV array from the pipeline's pending send queue and hand it to the
 * context until either there is nothing left to flush or the context stops
 * accepting data (ready == 0), in which case we re-request write readiness.
 */
static void
on_flush_ready(lcbio_CTX *ctx)
{
    Server *server = Server::get(ctx);
    nb_IOV iov[MCREQ_MAXIOV] = {};
    int ready;

    do {
        int niov = 0;
        unsigned nb;
        nb = mcreq_flush_iov_fill(server, iov, MCREQ_MAXIOV, &niov);
        if (!nb) {
            /* Send queue drained */
            return;
        }
#ifdef LCB_DUMP_PACKETS
        {
            /* Debug build only: log the outgoing bytes base64-encoded */
            char *b64 = NULL;
            int nb64 = 0;
            lcb_base64_encode_iov((lcb_IOV *)iov, niov, nb, &b64, &nb64);
            lcb_log(LOGARGS(server, TRACE), LOGFMT "pkt,snd,fill: size=%d, %.*s", LOGID(server), nb64, nb64, b64);
            free(b64);
        }
#endif
        ready = lcbio_ctx_put_ex(ctx, (lcb_IOV *)iov, niov, nb);
    } while (ready);
    lcbio_ctx_wwant(ctx);
}

/**
 * I/O callback: a previously scheduled flush has completed.
 * @param expected number of bytes we asked to be written
 * @param actual number of bytes actually written
 * The timestamp is only sampled when readj_ts_wait (stalled-event-loop
 * timestamp readjustment) is enabled in the settings.
 */
static void
on_flush_done(lcbio_CTX *ctx, unsigned expected, unsigned actual)
{
    Server *server = Server::get(ctx);
    lcb_U64 now = 0;
    if (server->settings->readj_ts_wait) {
        now = gethrtime();
    }

#ifdef LCB_DUMP_PACKETS
    lcb_log(LOGARGS(server, TRACE), LOGFMT "pkt,snd,flush: expected=%u, actual=%u", LOGID(server), expected, actual);
#endif
    mcreq_flush_done_ex(server, actual, expected, now);
    /* The flush may race with a close; check_closed() may free the server */
    server->check_closed();
}

/**
 * Schedule output (and a minimal read watch) on the connected context, and
 * arm the I/O timer if it is not already pending.
 */
void
Server::flush()
{
    /** Call into the wwant stuff.. */
    if (!connctx->rdwant) {
        /* 24 appears to be the memcached binary header size — enough to
         * detect an incoming response header. */
        lcbio_ctx_rwant(connctx, 24);
    }

    lcbio_ctx_wwant(connctx);
    lcbio_ctx_schedule(connctx);

    if (!lcbio_timer_armed(io_timer)) {
        /**
         * XXX: Maybe use get_next_timeout(), although here we can assume
         * that a command was just scheduled
         */
        lcbio_timer_rearm(io_timer, default_timeout());
    }
}

/**
 * Public API: kick off a flush on every server pipeline that has commands
 * pending. Servers with nothing queued are skipped.
 */
LIBCOUCHBASE_API
void
lcb_sched_flush(lcb_t instance)
{
    for (size_t ii = 0; ii < LCBT_NSERVERS(instance); ii++) {
        Server *server = instance->get_server(ii);

        if (!server->has_pending()) {
            continue;
        }
        server->flush_start(server);
    }
}

/**
 * Invoked when get a NOT_MY_VBUCKET response. If the response contains a JSON
 * payload then we refresh the configuration with it.
 *
 * This function returns 1 if the operation was successfully rescheduled;
 * otherwise it returns 0. If it returns 0 then we give the error back to the
 * user.
 */
bool
Server::handle_nmv(MemcachedResponse& resinfo, mc_PACKET *oldpkt)
{
    protocol_binary_request_header hdr;
    lcb_error_t err = LCB_ERROR;
    lcb_U16 vbid;
    lcb::clconfig::Provider *cccp =
        instance->confmon->get_provider(lcb::clconfig::CLCONFIG_CCCP);

    MC_INCR_METRIC(this, packets_nmv, 1);

    mcreq_read_hdr(oldpkt, &hdr);
    vbid = ntohs(hdr.request.vbucket);
    lcb_log(LOGARGS_T(WARN), LOGFMT "NOT_MY_VBUCKET. Packet=%p (S=%u). VBID=%u", LOGID_T(), (void*)oldpkt, oldpkt->opaque, vbid);

    /* Notify of new map */
    lcb_vbguess_remap(instance, vbid, index);

    /* If the NMV body carries an inline cluster config, feed it to CCCP */
    if (resinfo.vallen() && cccp->enabled) {
        std::string s(resinfo.value(), resinfo.vallen());
        err = lcb::clconfig::cccp_update(cccp, curhost->host, s.c_str());
    }

    if (err != LCB_SUCCESS) {
        /* No usable inline config — request a refresh from the monitor */
        int bs_options;
        if (instance->cur_configinfo->get_origin() == lcb::clconfig::CLCONFIG_CCCP) {
            /**
             * XXX: Not enough to see if cccp was enabled, since cccp might
             * be requested by a user, but would still not actually be active
             * for clusters < 2.5 If our current config is from CCCP
             * then we can be fairly certain that CCCP is indeed working.
             *
             * For this reason, we don't use if (cccp->enabled) {...}
             */
            bs_options = BS_REFRESH_THROTTLE;
        } else {
            bs_options = BS_REFRESH_ALWAYS;
        }
        instance->bootstrap(bs_options);
    }

    if (!lcb_should_retry(settings, oldpkt, LCB_NOT_MY_VBUCKET)) {
        return false;
    }

    /** Reschedule the packet again ..
     */
    mc_PACKET *newpkt = mcreq_renew_packet(oldpkt);
    newpkt->flags &= ~MCREQ_STATE_FLAGS;
    instance->retryq->nmvadd((mc_EXPACKET*)newpkt);
    return true;
}

/**
 * Determine if this is an error code that we can pass to the user, or can
 * otherwise handle "innately"
 */
static bool is_fastpath_error(uint16_t rc) {
    switch (rc) {
    case PROTOCOL_BINARY_RESPONSE_SUCCESS:
    case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
    case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
    case PROTOCOL_BINARY_RESPONSE_E2BIG:
    case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
    case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
    case PROTOCOL_BINARY_RESPONSE_ERANGE:
    case PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED:
    case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
    case PROTOCOL_BINARY_RESPONSE_ETMPFAIL:
    case PROTOCOL_BINARY_RESPONSE_ENOMEM:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_ENOENT:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_EEXISTS:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_MISMATCH:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_EINVAL:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_E2BIG:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_VALUE_CANTINSERT:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_VALUE_ETOODEEP:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_DOC_NOTJSON:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_NUM_ERANGE:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_DELTA_ERANGE:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_INVALID_COMBO:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_MULTI_PATH_FAILURE:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_SUCCESS_DELETED:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_XATTR_INVALID_FLAG_COMBO:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_XATTR_INVALID_KEY_COMBO:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_XATTR_UNKNOWN_MACRO:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_XATTR_UNKNOWN_VATTR:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_XATTR_CANT_MODIFY_VATTR:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_MULTI_PATH_FAILURE_DELETED:
    case PROTOCOL_BINARY_RESPONSE_SUBDOC_INVALID_XATTR_ORDER:
    case PROTOCOL_BINARY_RESPONSE_EACCESS:
        return true;
    default:
        /* 0xc0..0xcc appears to cover the remaining subdoc status range —
         * treat it as fastpath too */
        if (rc >= 0xc0 && rc <= 0xcc) {
            // other subdoc?
            return true;
        } else {
            return false;
        }
        break;
    }
}

/* Return values for handle_unknown_error(); RETRY and DISCONN may be OR'd */
#define ERRMAP_HANDLE_CONTINUE 0
#define ERRMAP_HANDLE_DISCONN 1
#define ERRMAP_HANDLE_RETRY 2

/**
 * Handle an unknown memcached error
 *
 * @param mcresp Response which contains the unknown error
 * @param[out] newerr more user-friendly based on error map attributes
 *
 * @return true if this function handled the error specially (by disconnecting)
 * or false if normal handling should continue.
 */
int Server::handle_unknown_error(const mc_PACKET *request,
                                 const MemcachedResponse& mcresp,
                                 lcb_error_t& newerr) {

    if (!settings->errmap->isLoaded() || !settings->use_errmap) {
        // If there's no error map, just return false
        return ERRMAP_HANDLE_CONTINUE;
    }

    // Look up the error map definition for this error
    const errmap::Error& err = settings->errmap->getError(mcresp.status());

    if (!err.isValid() || err.hasAttribute(errmap::SPECIAL_HANDLING)) {
        lcb_log(LOGARGS_T(ERR), LOGFMT "Received error not in error map or requires special handling! " PKTFMT, LOGID_T(), PKTARGS(mcresp));
        lcbio_ctx_senderr(connctx, LCB_PROTOCOL_ERROR);
        return ERRMAP_HANDLE_DISCONN;
    } else {
        lcb_log(LOGARGS_T(WARN), LOGFMT "Received server error %s (0x%x) on packet: " PKTFMT, LOGID_T(), err.shortname.c_str(), err.code, PKTARGS(mcresp));
    }

    /* Map the error-map attributes onto the most specific client error code.
     * Note the checks below are cumulative; a later match overwrites newerr. */
    if (err.hasAttribute(errmap::FETCH_CONFIG)) {
        instance->bootstrap(BS_REFRESH_THROTTLE);
    }

    if (err.hasAttribute(errmap::TEMPORARY)) {
        newerr = LCB_GENERIC_TMPERR;
    }

    if (err.hasAttribute(errmap::CONSTRAINT_FAILURE)) {
        newerr = LCB_GENERIC_CONSTRAINT_ERR;
    }

    if (err.hasAttribute(errmap::AUTH)) {
        newerr = LCB_AUTH_ERROR;
    }

    if (err.hasAttribute(errmap::SUBDOC) && newerr == LCB_SUCCESS) {
        newerr = LCB_GENERIC_SUBDOCERR;
    }

    /* TODO: remove masking LOCKED in 3.0 release */
    if (err.hasAttribute(errmap::ITEM_LOCKED)) {
        switch (mcresp.opcode()) {
        case PROTOCOL_BINARY_CMD_SET:
        case PROTOCOL_BINARY_CMD_REPLACE:
        case PROTOCOL_BINARY_CMD_DELETE:
            newerr = LCB_KEY_EEXISTS;
            break;
        default:
            newerr = LCB_ETMPFAIL;
        }
    }

    int rv = 0;

    if (err.hasAttribute(errmap::AUTO_RETRY)) {
        errmap::RetrySpec *spec = err.getRetrySpec();

        /* Clone the packet (stripping state flags) and put it on the retry
         * queue with the error-map supplied retry specification. */
        mc_PACKET *newpkt = mcreq_renew_packet(request);
        newpkt->flags &= ~MCREQ_STATE_FLAGS;
        instance->retryq->add((mc_EXPACKET *)newpkt, newerr ? newerr : LCB_ERROR, spec);
        rv |= ERRMAP_HANDLE_RETRY;
    }

    if (err.hasAttribute(errmap::CONN_STATE_INVALIDATED)) {
        /* NOTE(review): this overwrites an already-mapped error with the
         * generic LCB_ERROR when newerr is set; the condition looks like it
         * may have been intended as `newerr == LCB_SUCCESS` — confirm against
         * upstream before changing. */
        if (newerr != LCB_SUCCESS) {
            newerr = LCB_ERROR;
        }
        lcbio_ctx_senderr(connctx, newerr);
        rv |= ERRMAP_HANDLE_DISCONN;
    }

    return rv;

}

/* This function is called within a loop to process a single packet.
 *
 * If a full packet is available, it will process the packet and return
 * PKT_READ_COMPLETE, resulting in the `on_read()` function calling this
 * function in a loop.
 *
 * When a complete packet is not available, PKT_READ_PARTIAL will be returned
 * and the `on_read()` loop will exit, scheduling any required pending I/O.
 */
Server::ReadState
Server::try_read(lcbio_CTX *ctx, rdb_IOROPE *ior)
{
    MemcachedResponse mcresp;
    mc_PACKET *request;
    /* 24 == fixed memcached binary response header size */
    unsigned pktsize = 24, is_last = 1;

/* Ask for at least `n` more bytes (only if something is still pending) and
 * bail out of try_read() */
#define RETURN_NEED_MORE(n) \
    if (has_pending()) { \
        lcbio_ctx_rwant(ctx, n); \
    } \
    return PKT_READ_PARTIAL; \

/* Consume the header and make the body contiguous; opens a scope that
 * DO_SWALLOW_PAYLOAD() closes, so the two must be used in pairs. */
#define DO_ASSIGN_PAYLOAD() \
    rdb_consumed(ior, mcresp.hdrsize()); \
    if (mcresp.bodylen()) { \
        mcresp.payload = rdb_get_consolidated(ior, mcresp.bodylen()); \
    } {

#define DO_SWALLOW_PAYLOAD() \
    } if (mcresp.bodylen()) { \
        rdb_consumed(ior, mcresp.bodylen()); \
    }

    if (rdb_get_nused(ior) < pktsize) {
        RETURN_NEED_MORE(pktsize)
    }

    MC_INCR_METRIC(this, packets_read, 1);

    /* copy bytes into the info structure */
    rdb_copyread(ior, mcresp.hdrbytes(), mcresp.hdrsize());

    pktsize += mcresp.bodylen();
    if (rdb_get_nused(ior) < pktsize) {
        RETURN_NEED_MORE(pktsize);
    }

    /* Find the packet */
    if (mcresp.opcode() == PROTOCOL_BINARY_CMD_STAT && mcresp.keylen() != 0) {
        /* STAT responses stream: each keyed reply is one chunk, the final
         * (empty-key) reply terminates the sequence — keep the request
         * in the pipeline until then. */
        is_last = 0;
        request = mcreq_pipeline_find(this, mcresp.opaque());
    } else {
        is_last = 1;
        request = mcreq_pipeline_remove(this, mcresp.opaque());
    }

    if (!request) {
        /* Request already timed out / purged — discard the reply */
        MC_INCR_METRIC(this, packets_ownerless, 1);
        lcb_log(LOGARGS_T(DEBUG), LOGFMT "Server sent us reply for a timed-out command. (OP=0x%x, RC=0x%x, SEQ=%u)", LOGID_T(), mcresp.opcode(), mcresp.status(), mcresp.opaque());
        rdb_consumed(ior, pktsize);
        return PKT_READ_COMPLETE;
    }

    lcb_error_t err_override = LCB_SUCCESS;
    ReadState rdstate = PKT_READ_COMPLETE;
    int unknown_err_rv;

    /* Check if the status code is one which must be handled carefully by the
     * client */
    if (is_fastpath_error(mcresp.status())) {
        // Nothing here!
    } else if (mcresp.status() == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET) {
        /* consume the header */
        DO_ASSIGN_PAYLOAD()
        if (!handle_nmv(mcresp, request)) {
            mcreq_dispatch_response(this, request, &mcresp, LCB_NOT_MY_VBUCKET);
        }
        DO_SWALLOW_PAYLOAD()
        goto GT_DONE;
    } else if ((unknown_err_rv =
            handle_unknown_error(request, mcresp, err_override)) !=
            ERRMAP_HANDLE_CONTINUE) {
        DO_ASSIGN_PAYLOAD()
        if (!(unknown_err_rv & ERRMAP_HANDLE_RETRY)) {
            /* Not rescheduled — surface the (possibly remapped) error */
            mcreq_dispatch_response(this, request, &mcresp, err_override);
        }
        DO_SWALLOW_PAYLOAD()
        if (unknown_err_rv & ERRMAP_HANDLE_DISCONN) {
            rdstate = PKT_READ_ABORT;
        }
        goto GT_DONE;
    }

    /* Figure out if the request is 'ufwd' or not */
    if (!(request->flags & MCREQ_F_UFWD)) {
        DO_ASSIGN_PAYLOAD();
        mcresp.bufh = rdb_get_first_segment(ior);
        mcreq_dispatch_response(this, request, &mcresp, err_override);
        DO_SWALLOW_PAYLOAD()

    } else {
        /* figure out how many buffers we want to use as an upper limit for the
         * IOV arrays. Currently we'll keep it simple and ensure the entire
         * response is contiguous. */
        lcb_PKTFWDRESP resp = { 0 }; /* TODO: next ABI version should include is_last flag */
        rdb_ROPESEG *segs;
        nb_IOV iov;

        rdb_consolidate(ior, pktsize);
        rdb_refread_ex(ior, &iov, &segs, 1, pktsize);

        resp.bufs = &segs;
        resp.iovs = (lcb_IOV*)&iov;
        resp.nitems = 1;
        resp.header = mcresp.hdrbytes();
        instance->callbacks.pktfwd(
            instance, MCREQ_PKT_COOKIE(request), LCB_SUCCESS, &resp);
        rdb_consumed(ior, pktsize);
    }

    GT_DONE:
    if (is_last) {
        mcreq_packet_handled(this, request);
    }
    return rdstate;
}

/**
 * I/O callback: data is available on the socket. Drain complete packets one
 * at a time; try_read() may abort the loop (PKT_READ_ABORT) or ask for more
 * bytes (PKT_READ_PARTIAL).
 */
static void
on_read(lcbio_CTX *ctx, unsigned)
{
    Server *server = Server::get(ctx);
    rdb_IOROPE *ior = &ctx->ior;

    if (server->check_closed()) {
        return;
    }

    Server::ReadState rv;
    while ((rv = server->try_read(ctx, ior)) == Server::PKT_READ_COMPLETE);
    lcbio_ctx_schedule(ctx);
    lcb_maybe_breakout(server->instance);
}

/* flush_start placeholder used while a connection attempt is in flight */
static void flush_noop(mc_PIPELINE *pipeline) {
    (void)pipeline;
}

/* flush_start trampoline used when the server has no connection yet */
static void server_connect(Server *server) {
    server->connect();
}

/**
 * Try to reschedule a failed packet on the retry queue.
 * Returns false for memcached (non-vbucket) buckets or when the retry policy
 * rejects the packet; in that case the caller fails the packet to the user.
 */
bool
Server::maybe_retry_packet(mc_PACKET *pkt, lcb_error_t err)
{
    lcbvb_DISTMODE dist_t = lcbvb_get_distmode(parent->config);

    if (dist_t != LCBVB_DIST_VBUCKET) {
        /** memcached bucket */
        return false;
    }
    if (!lcb_should_retry(settings, pkt, err)) {
        return false;
    }

    mc_PACKET *newpkt = mcreq_renew_packet(pkt);
    newpkt->flags &= ~MCREQ_STATE_FLAGS;
    // TODO: Load the 4th argument from the error map
    instance->retryq->add((mc_EXPACKET *)newpkt, err, NULL);
    return true;
}

/* Per-packet failure callback used by purge() */
static void
fail_callback(mc_PIPELINE *pipeline, mc_PACKET *pkt, lcb_error_t err, void *) {
    static_cast<Server*>(pipeline)->purge_single(pkt, err);
}

/* Map a memcached opcode to a short human-readable name (for tracing logs) */
static const char *opcode_name(uint8_t code)
{
    switch (code) {
    case PROTOCOL_BINARY_CMD_GET:
        return "get";
    case PROTOCOL_BINARY_CMD_SET:
        return "set";
    case PROTOCOL_BINARY_CMD_ADD:
        return "add";
    case PROTOCOL_BINARY_CMD_REPLACE:
        return "replace";
    case PROTOCOL_BINARY_CMD_DELETE:
        return "delete";
    case PROTOCOL_BINARY_CMD_INCREMENT:
        return "incr";
    case PROTOCOL_BINARY_CMD_DECREMENT:
        return "decr";
    case PROTOCOL_BINARY_CMD_FLUSH:
        return "flush";
    case PROTOCOL_BINARY_CMD_GETQ:
        return "getq";
    case PROTOCOL_BINARY_CMD_NOOP:
        return "noop";
    case PROTOCOL_BINARY_CMD_VERSION:
        return "version";
    case PROTOCOL_BINARY_CMD_APPEND:
        return "append";
    case PROTOCOL_BINARY_CMD_PREPEND:
        return "prepend";
    case PROTOCOL_BINARY_CMD_STAT:
        return "stat";
    case PROTOCOL_BINARY_CMD_VERBOSITY:
        return "verbosity";
    case PROTOCOL_BINARY_CMD_TOUCH:
        return "touch";
    case PROTOCOL_BINARY_CMD_GAT:
        return "gat";
    case PROTOCOL_BINARY_CMD_HELLO:
        return "hello";
    case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
        return "sasl_list_mechs";
    case PROTOCOL_BINARY_CMD_SASL_AUTH:
        return "sasl_auth";
    case PROTOCOL_BINARY_CMD_SASL_STEP:
        return "sasl_step";
    case PROTOCOL_BINARY_CMD_GET_REPLICA:
        return "get_replica";
    case PROTOCOL_BINARY_CMD_SELECT_BUCKET:
        return "select_bucket";
    case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
        return "observe_seqno";
    case PROTOCOL_BINARY_CMD_OBSERVE:
        return "observe";
    case PROTOCOL_BINARY_CMD_GET_LOCKED:
        return "get_locked";
    case PROTOCOL_BINARY_CMD_UNLOCK_KEY:
        return "unlock_key";
    case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
        return "get_cluster_config";
    case PROTOCOL_BINARY_CMD_SUBDOC_GET:
        return "subdoc_get";
    case PROTOCOL_BINARY_CMD_SUBDOC_EXISTS:
        return "subdoc_exists";
    case PROTOCOL_BINARY_CMD_SUBDOC_DICT_ADD:
        return "subdoc_dict_add";
    case PROTOCOL_BINARY_CMD_SUBDOC_DICT_UPSERT:
        return "subdoc_dict_upsert";
    case PROTOCOL_BINARY_CMD_SUBDOC_DELETE:
        return "subdoc_delete";
    case PROTOCOL_BINARY_CMD_SUBDOC_REPLACE:
        return "subdoc_replace";
    case PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_PUSH_LAST:
        return "subdoc_array_push_last";
    case PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_PUSH_FIRST:
        return "subdoc_array_push_first";
    case PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_INSERT:
        return "subdoc_array_insert";
    case PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_ADD_UNIQUE:
        return "subdoc_array_add_unique";
    case PROTOCOL_BINARY_CMD_SUBDOC_COUNTER:
        return "subdoc_counter";
    case PROTOCOL_BINARY_CMD_SUBDOC_MULTI_LOOKUP:
        return "subdoc_multi_lookup";
    case PROTOCOL_BINARY_CMD_SUBDOC_MULTI_MUTATION:
        return "subdoc_multi_mutation";
    case PROTOCOL_BINARY_CMD_SUBDOC_GET_COUNT:
        return "subdoc_get_count";
    case PROTOCOL_BINARY_CMD_GET_ERROR_MAP:
        return "get_error_map";
    default:
        return "unknown";
    }
}

/**
 * Fail a single packet with `err`, unless it can be rescheduled on the retry
 * queue. Dispatches a synthesized EINVAL response carrying the error code.
 */
void Server::purge_single(mc_PACKET *pkt, lcb_error_t err) {
    if (maybe_retry_packet(pkt, err)) {
        return;
    }

    if (err == LCB_AUTH_ERROR) {
        /* In-situ auth errors are actually dead servers. Let's provide this
         * as the actual error code.
         */
        err = LCB_MAP_CHANGED;
    }

    if (err == LCB_ETIMEDOUT) {
        /* The retry queue may know a more specific original error */
        lcb_error_t tmperr = lcb::RetryQueue::error_for(pkt);
        if (tmperr != LCB_SUCCESS) {
            err = tmperr;
        }
    }

    /* Synthesize a response from the request header so the normal dispatch
     * path can deliver the failure to the user callback */
    protocol_binary_request_header hdr;
    memcpy(hdr.bytes, SPAN_BUFFER(&pkt->kh_span), sizeof(hdr.bytes));
    MemcachedResponse resp(protocol_binary_command(hdr.request.opcode),
                           hdr.request.opaque,
                           PROTOCOL_BINARY_RESPONSE_EINVAL);

#ifdef LCB_TRACING
    lcbtrace_span_set_orphaned(MCREQ_PKT_RDATA(pkt)->span, true);
#endif
    if (err == LCB_ETIMEDOUT && settings->use_tracing) {
        /* Emit a structured JSON log entry describing the timed-out operation */
        Json::Value info;

        char opid[30] = {};
        snprintf(opid, sizeof(opid), "kv:%s", opcode_name(hdr.request.opcode));
        info["s"] = opid;
        info["b"] = settings->bucket;
        info["t"] = settings->operation_timeout;

        const lcb_host_t &remote = get_host();
        std::string rhost;
        if (remote.ipv6) {
            rhost.append("[").append(remote.host).append("]:").append(remote.port);
        } else {
            rhost.append(remote.host).append(":").append(remote.port);
        }
        info["r"] = rhost.c_str();

        if (connctx) {
            char local_id[54] = {};
            snprintf(local_id, sizeof(local_id), "%016" PRIx64 "/%016" PRIx64 "/%x",
                     (lcb_U64)settings->iid, connctx->sock->id, (int)pkt->opaque);
            info["i"] = local_id;
            info["l"] = connctx->sock->info->ep_local;
        }
        std::string msg(Json::FastWriter().write(info));
        /* FastWriter appends a trailing newline; msg.size() - 1 strips it */
        if (msg.size() > 1) {
            lcb_log(LOGARGS(instance, WARN), "Failing command with error %s: %.*s",
                    lcb_strerror_short(err), (int)(msg.size() - 1), msg.c_str());
        }
    } else {
        lcb_log(LOGARGS_T(WARN), LOGFMT "Failing command (pkt=%p, opaque=%lu, opcode=0x%x) with error %s", LOGID_T(), (void*)pkt, (unsigned long)pkt->opaque, hdr.request.opcode, lcb_strerror_short(err));
    }
    int rv = mcreq_dispatch_response(this, pkt, &resp, err);
    lcb_assert(rv == 0);
}

/**
 * Fail pending commands on this pipeline with `error`.
 * @param thresh when non-zero, only commands started before this timestamp
 *        are failed and `next` receives the start time of the next pending
 *        command; when zero, everything is failed unconditionally.
 * @param policy controls whether a config refresh is requested afterwards.
 * @return the number of affected packets (-1 when everything was failed).
 */
int
Server::purge(lcb_error_t error, hrtime_t thresh, hrtime_t *next,
              RefreshPolicy policy)
{
    unsigned affected;

    if (thresh) {
        affected = mcreq_pipeline_timeout(
            this, error, fail_callback, NULL, thresh, next);

    } else {
        mcreq_pipeline_fail(this, error, fail_callback, NULL);
        affected = -1;
    }

    MC_INCR_METRIC(this, packets_errored, affected);
    if (policy == REFRESH_NEVER) {
        return affected;
    }

    if (affected || policy == REFRESH_ALWAYS) {
        instance->bootstrap(BS_REFRESH_THROTTLE|BS_REFRESH_INCRERR);
    }
    return affected;
}

/* flush_start used while errors are being drained: just keep the I/O timer
 * alive so remaining packets eventually time out */
static void flush_errdrain(mc_PIPELINE *pipeline)
{
    /* Called when we are draining errors. */
    Server *server = (Server *)pipeline;
    if (!lcbio_timer_armed(server->io_timer)) {
        lcbio_timer_rearm(server->io_timer, server->default_timeout());
    }
}

/**
 * Microseconds until the oldest pending packet expires (0 if it already has;
 * the full default timeout when nothing is pending).
 */
uint32_t
Server::next_timeout() const
{
    hrtime_t now, expiry, diff;
    mc_PACKET *pkt = mcreq_first_packet(this);

    if (!pkt) {
        return default_timeout();
    }

    now = gethrtime();
    expiry = MCREQ_PKT_RDATA(pkt)->start + LCB_US2NS(default_timeout());
    if (expiry <= now) {
        diff = 0;
    } else {
        diff = expiry - now;
    }

    return LCB_NS2US(diff);
}

/* Timer trampoline for Server::io_timeout() */
static void
timeout_server(void *arg)
{
    reinterpret_cast<Server*>(arg)->io_timeout();
}

/**
 * I/O timer handler: fail every packet older than the timeout window and
 * re-arm the timer for the next pending packet.
 */
void Server::io_timeout()
{
    hrtime_t now = gethrtime();
    hrtime_t min_valid = now - LCB_US2NS(default_timeout());

    hrtime_t next_ns;
    int npurged = purge(LCB_ETIMEDOUT, min_valid, &next_ns,
                        Server::REFRESH_ONFAILED);
    if (npurged) {
        MC_INCR_METRIC(this, packets_timeout, npurged);
        lcb_log(LOGARGS_T(DEBUG), LOGFMT "Server timed out. Some commands have failed", LOGID_T());
    }

    uint32_t next_us = next_timeout();
    lcb_log(LOGARGS_T(TRACE), LOGFMT "Scheduling next timeout for %u ms. This is not an error", LOGID_T(), next_us / 1000);
    lcbio_timer_rearm(io_timer, next_us);
    lcb_maybe_breakout(instance);
}

/**
 * Heuristic for the readj_ts_wait setting: if a connect "timed out" but the
 * pipeline still has pending commands and plenty of timeout budget remains,
 * assume the event loop was stalled and simply reconnect instead of failing.
 */
bool
Server::maybe_reconnect_on_fake_timeout(lcb_error_t err)
{
    if (err != LCB_ETIMEDOUT) {
        return false; /* not a timeout */
    }
    if (!settings->readj_ts_wait) {
        return false; /* normal timeout behavior */
    }
    if (!has_pending()) {
        return false; /* nothing pending */
    }

    uint32_t next_tmo = next_timeout();
    if (next_tmo < default_timeout() / 2) {
        /* Ideally we'd have a fuzz interval to shave off the actual timeout,
         * since there will inevitably be some time taken off the next timeout */
        return false;
    }

    lcb_log(LOGARGS_T(INFO), LOGFMT "Retrying connection. Assuming timeout because of stalled event loop", LOGID_T());
    connect();
    return true;
}

/* Socket-pool / session-negotiation completion trampoline */
static void
on_connected(lcbio_SOCKET *sock, void *data, lcb_error_t err, lcbio_OSERR syserr)
{
    Server *server = reinterpret_cast<Server*>(data);
    server->handle_connected(sock, err, syserr);
}

static void mcserver_flush(Server *s) { s->flush(); }

/**
 * Completion handler for a connection attempt. On success, ensures session
 * negotiation (SASL/HELLO) has run — restarting the handshake if not — then
 * installs the I/O callbacks and flushes any queued commands.
 */
void
Server::handle_connected(lcbio_SOCKET *sock, lcb_error_t err, lcbio_OSERR syserr)
{
    connreq = NULL;

    if (err != LCB_SUCCESS) {
        lcb_log(LOGARGS_T(ERR), LOGFMT "Connection attempt failed. Received %s from libcouchbase, received %d from operating system", LOGID_T(), lcb_strerror_short(err), syserr);
        MC_INCR_METRIC(this, iometrics.io_error, 1);
        if (!maybe_reconnect_on_fake_timeout(err)) {
            socket_failed(err);
        }
        return;
    }

    lcb_assert(sock);
    if (metrics) {
        lcbio_set_metrics(sock, &metrics->iometrics);
    }

    /** Do we need sasl? */
    SessionInfo* sessinfo = SessionInfo::get(sock);
    if (sessinfo == NULL) {
        lcb_log(LOGARGS_T(TRACE), "<%s:%s> (SRV=%p) Session not yet negotiated. Negotiating", curhost->host, curhost->port, (void*)this);
        connreq = SessionRequest::start(
            sock, settings, default_timeout(), on_connected, this);
        return;
    } else {
        /* Cache the negotiated HELLO features */
        jsonsupport = sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_JSON);
        compsupport = sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_SNAPPY);
        mutation_tokens = sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_MUTATION_SEQNO);
    }

    lcbio_CTXPROCS procs;
    procs.cb_err = on_error;
    procs.cb_read = on_read;
    procs.cb_flush_done = on_flush_done;
    procs.cb_flush_ready = on_flush_ready;
    connctx = lcbio_ctx_new(sock, this, &procs);
    connctx->subsys = "memcached";
    sock->service = LCBIO_SERVICE_KV;
    flush_start = (mcreq_flushstart_fn)mcserver_flush;

    uint32_t tmo = next_timeout();
    lcbio_timer_rearm(io_timer, tmo);
    flush();
}

/**
 * Begin a (re)connection attempt via the shared memcached socket pool.
 * flush_start is parked on a no-op until the connection completes.
 */
void
Server::connect()
{
    connreq = instance->memd_sockpool->get(*curhost,
        default_timeout(), on_connected, this);
    flush_start = flush_noop;
    state = Server::S_CLEAN;
}

/* Notify the user's pktflushed callback when a forwarded buffer is released */
static void
buf_done_cb(mc_PIPELINE *pl, const void *cookie, void *, void *)
{
    Server *server = static_cast<Server*>(pl);
    server->instance->callbacks.pktflushed(server->instance, cookie);
}

/**
 * Construct a live server pipeline for vbucket index `ix`, resolving its
 * data-service host:port from the current cluster config.
 */
Server::Server(lcb_t instance_, int ix)
    : mc_PIPELINE(), state(S_CLEAN),
      io_timer(lcbio_timer_new(instance_->iotable, this, timeout_server)),
      instance(instance_),
      settings(lcb_settings_ref2(instance_->settings)),
      compsupport(0),
      jsonsupport(0),
      mutation_tokens(0),
      connctx(NULL),
      curhost(new lcb_host_t())
{
    mcreq_pipeline_init(this);
    flush_start = (mcreq_flushstart_fn)server_connect;
    buf_done_callback = buf_done_cb;
    index = ix;

    std::memset(&connreq, 0, sizeof connreq);
    std::memset(curhost, 0, sizeof *curhost);

    const char *datahost = lcbvb_get_hostport(
        LCBT_VBCONFIG(instance), ix,
        LCBVB_SVCTYPE_DATA, LCBT_SETTING_SVCMODE(instance));
    if (datahost) {
        lcb_host_parsez(curhost, datahost, LCB_CONFIG_MCD_PORT);
    }

    if (settings->metrics) {
        /** Allocate / reinitialize the metrics here */
        metrics = lcb_metrics_getserver(settings->metrics, curhost->host, curhost->port, 1);
        lcb_metrics_reset_pipeline_gauges(metrics);
    }
}

/* Placeholder server with no connection, timer or settings (S_TEMPORARY) */
Server::Server()
    : state(S_TEMPORARY),
      io_timer(NULL), instance(NULL), settings(NULL), compsupport(0), jsonsupport(0),
      mutation_tokens(0), connctx(NULL), connreq(NULL), curhost(NULL)
{
}

Server::~Server() {
    if (state == S_TEMPORARY) {
        return;
    }

    if (this->instance) {
        /* Detach ourselves from the instance's pipeline array */
        unsigned ii;
        mc_CMDQUEUE *cmdq = &this->instance->cmdq;
        for (ii = 0; ii < cmdq->npipelines; ii++) {
            lcb::Server *server = static_cast<lcb::Server*>(cmdq->pipelines[ii]);
            if (server == this) {
                cmdq->pipelines[ii] = NULL;
                break;
            }
        }
    }
    this->instance = NULL;
    mcreq_pipeline_cleanup(this);

    if (io_timer) {
        lcbio_timer_destroy(io_timer);
    }

    delete curhost;
    lcb_settings_unref(settings);
}

/* Return the socket to the pool rather than destroying it outright */
static void
close_cb(lcbio_SOCKET *sock, int, void *)
{
    lcbio_ref(sock);
    lcb::io::Pool::discard(sock);
}

/* I/O error callback installed on the memcached context */
static void
on_error(lcbio_CTX *ctx, lcb_error_t err)
{
    Server *server = Server::get(ctx);
    lcb_log(LOGARGS(server, WARN), LOGFMT "Got socket error %s", LOGID(server), lcb_strerror_short(err));
    if (server->check_closed()) {
        return;
    }
    server->socket_failed(err);
}

/**Handle a socket error. This function will close the current connection
 * and trigger a failout of any pending commands.
 * This function triggers a configuration refresh */
void
Server::socket_failed(lcb_error_t err)
{
    if (check_closed()) {
        return;
    }

    purge(err, 0, NULL, REFRESH_ALWAYS);
    lcb_maybe_breakout(instance);
    start_errored_ctx(S_ERRDRAIN);
}

/**
 * Initiate teardown of this server object. May delete `this` synchronously
 * (via start_errored_ctx) when there is no pending I/O.
 */
void
Server::close()
{
    /* Should never be called twice */
    lcb_assert(state != Server::S_CLOSED);
    start_errored_ctx(S_CLOSED);
}

/**
 * Call to signal an error or similar on the current socket.
 * @param server The server
 * @param next_state The next state (S_CLOSED or S_ERRDRAIN)
 */
void
Server::start_errored_ctx(State next_state)
{
    lcbio_CTX *ctx = connctx;

    state = next_state;
    /* Cancel any pending connection attempt? */
    lcb::io::ConnectionRequest::cancel(&connreq);

    /* If the server is being destroyed, silence the timer */
    if (next_state == Server::S_CLOSED && io_timer != NULL) {
        lcbio_timer_destroy(io_timer);
        io_timer = NULL;
    }

    if (ctx == NULL) {
        if (next_state == Server::S_CLOSED) {
            /* No context and being closed: nothing left to drain */
            delete this;
            return;
        } else {
            /* Not closed but don't have a current context */
            if (has_pending()) {
                if (!lcbio_timer_armed(io_timer)) {
                    /* TODO: Maybe throttle reconnection attempts? */
                    lcbio_timer_rearm(io_timer, default_timeout());
                }
                connect();
            } else {
                // Connect once someone actually wants a connection.
                flush_start = (mcreq_flushstart_fn)server_connect;
            }
        }

    } else {
        if (ctx->npending) {
            /* Have pending items? */

            /* Flush any remaining events */
            lcbio_ctx_schedule(ctx);

            /* Close the socket not to leak resources */
            lcbio_shutdown(lcbio_ctx_sock(ctx));
            if (next_state == Server::S_ERRDRAIN) {
                flush_start = (mcreq_flushstart_fn)flush_errdrain;
            }
        } else {
            finalize_errored_ctx();
        }
    }
}

/**
 * This function actually finalizes a ctx which has an error on it. If the
 * ctx has pending operations remaining then this function returns immediately.
 * Otherwise this will either reinitialize the connection or free the server
 * object depending on the actual object state (i.e. if it was closed or
 * simply errored).
 */
void
Server::finalize_errored_ctx()
{
    if (connctx->npending) {
        return;
    }

    lcb_log(LOGARGS_T(DEBUG), LOGFMT "Finalizing context", LOGID_T());

    /* Always close the existing context. */
    lcbio_ctx_close(connctx, close_cb, NULL);
    connctx = NULL;

    /**Marks any unflushed data inside this server as being already flushed. This
     * should be done within error handling. If subsequent data is flushed on this
     * pipeline to the same connection, the results are undefined. */

    unsigned toflush;
    nb_IOV iov;
    while ((toflush = mcreq_flush_iov_fill(this, &iov, 1, NULL))) {
        mcreq_flush_done(this, toflush, toflush);
    }

    if (state == Server::S_CLOSED) {
        /* If the server is closed, time to free it */
        delete this;
    } else {
        /* Otherwise, cycle the state back to CLEAN and reinit
         * the connection */
        state = Server::S_CLEAN;
        connect();
    }
}

/**
 * This little function checks to see if the server struct is still valid, or
 * whether it should just be cleaned once no pending I/O remainds.
 *
 * If this function returns false then the server is still valid; otherwise it
 * is invalid and must not be used further.
1045 */ 1046 bool 1047 Server::check_closed() 1048 { 1049 if (state == Server::S_CLEAN) { 1050 return false; 1051 } 1052 lcb_log(LOGARGS_T(INFO), LOGFMT "Got handler after close. Checking pending calls (pending=%u)", LOGID_T(), connctx->npending); 1053 finalize_errored_ctx(); 1054 return 1; 1055 } 1056