1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2011-2014 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "internal.h"
19 #include "logging.h"
20 #include "vbucket/aliases.h"
21 #include "settings.h"
22 #include "negotiate.h"
23 #include "bucketconfig/clconfig.h"
24 #include "mc/mcreq-flush-inl.h"
25 #include <lcbio/ssl.h>
26 #include "ctx-log-inl.h"
27 
/* Leading arguments for lcb_log(): the `settings` of object `c`, the "server"
 * subsystem name, log level LCB_LOG_<lvl>, and the source location. */
#define LOGARGS(c, lvl) (c)->settings, "server", LCB_LOG_##lvl, __FILE__, __LINE__
/* LOGARGS() for use inside Server member functions. */
#define LOGARGS_T(lvl) LOGARGS(this, lvl)

/* Log prefix: the connection-context prefix (CTX_LOGFMT_PRE) followed by the
 * server pointer and pipeline index. Supply the matching values with
 * LOGID()/LOGID_T(). */
#define LOGFMT CTX_LOGFMT_PRE ",SRV=%p,IX=%d) "
/* Format string and matching arguments describing a memcached response. */
#define PKTFMT "OP=0x%x, RC=0x%x, SEQ=%u"
#define PKTARGS(pkt) (pkt).opcode(), (pkt).status(), (pkt).opaque()

/* Arguments matching LOGFMT for a given server. */
#define LOGID(server) CTX_LOGID(server->connctx), (void *)server, server->index
/* LOGID() for use inside Server member functions. */
#define LOGID_T() LOGID(this)

/* Maximum number of IOV entries filled per write attempt in on_flush_ready(). */
#define MCREQ_MAXIOV 32
/* Clear `flags` from a connection's `want` field.
 * NOTE(review): not referenced in the visible part of this file. */
#define LCBCONN_UNWANT(conn, flags) (conn)->want &= ~(flags)
40 
41 using namespace lcb;
42 
43 static void on_error(lcbio_CTX *ctx, lcb_error_t err);
44 
45 static void
46 on_flush_ready(lcbio_CTX *ctx)
47 {
48     Server *server = Server::get(ctx);
49     nb_IOV iov[MCREQ_MAXIOV] = {};
50     int ready;
51 
52     do {
53         int niov = 0;
54         unsigned nb;
55         nb = mcreq_flush_iov_fill(server, iov, MCREQ_MAXIOV, &niov);
56         if (!nb) {
57             return;
58         }
59 #ifdef LCB_DUMP_PACKETS
60         {
61             char *b64 = NULL;
62             int nb64 = 0;
63             lcb_base64_encode_iov((lcb_IOV *)iov, niov, nb, &b64, &nb64);
64             lcb_log(LOGARGS(server, TRACE), LOGFMT "pkt,snd,fill: size=%d, %.*s", LOGID(server), nb64, nb64, b64);
65             free(b64);
66         }
67 #endif
68         ready = lcbio_ctx_put_ex(ctx, (lcb_IOV *)iov, niov, nb);
69     } while (ready);
70     lcbio_ctx_wwant(ctx);
71 }
72 
73 static void
74 on_flush_done(lcbio_CTX *ctx, unsigned expected, unsigned actual)
75 {
76     Server *server = Server::get(ctx);
77     lcb_U64 now = 0;
78     if (server->settings->readj_ts_wait) {
79         now = gethrtime();
80     }
81 
82 #ifdef LCB_DUMP_PACKETS
83     lcb_log(LOGARGS(server, TRACE), LOGFMT "pkt,snd,flush: expected=%u, actual=%u", LOGID(server), expected, actual);
84 #endif
85     mcreq_flush_done_ex(server, actual, expected, now);
86     server->check_closed();
87 }
88 
89 void
90 Server::flush()
91 {
92     /** Call into the wwant stuff.. */
93     if (!connctx->rdwant) {
94         lcbio_ctx_rwant(connctx, 24);
95     }
96 
97     lcbio_ctx_wwant(connctx);
98     lcbio_ctx_schedule(connctx);
99 
100     if (!lcbio_timer_armed(io_timer)) {
101         /**
102          * XXX: Maybe use get_next_timeout(), although here we can assume
103          * that a command was just scheduled
104          */
105         lcbio_timer_rearm(io_timer, default_timeout());
106     }
107 }
108 
109 LIBCOUCHBASE_API
110 void
111 lcb_sched_flush(lcb_t instance)
112 {
113     for (size_t ii = 0; ii < LCBT_NSERVERS(instance); ii++) {
114         Server *server = instance->get_server(ii);
115 
116         if (!server->has_pending()) {
117             continue;
118         }
119         server->flush_start(server);
120     }
121 }
122 
123 /**
124  * Invoked when get a NOT_MY_VBUCKET response. If the response contains a JSON
125  * payload then we refresh the configuration with it.
126  *
127  * This function returns 1 if the operation was successfully rescheduled;
128  * otherwise it returns 0. If it returns 0 then we give the error back to the
129  * user.
130  */
131 bool
132 Server::handle_nmv(MemcachedResponse& resinfo, mc_PACKET *oldpkt)
133 {
134     protocol_binary_request_header hdr;
135     lcb_error_t err = LCB_ERROR;
136     lcb_U16 vbid;
137     lcb::clconfig::Provider *cccp =
138             instance->confmon->get_provider(lcb::clconfig::CLCONFIG_CCCP);
139 
140     MC_INCR_METRIC(this, packets_nmv, 1);
141 
142     mcreq_read_hdr(oldpkt, &hdr);
143     vbid = ntohs(hdr.request.vbucket);
144     lcb_log(LOGARGS_T(WARN), LOGFMT "NOT_MY_VBUCKET. Packet=%p (S=%u). VBID=%u", LOGID_T(), (void*)oldpkt, oldpkt->opaque, vbid);
145 
146     /* Notify of new map */
147     lcb_vbguess_remap(instance, vbid, index);
148 
149     if (resinfo.vallen() && cccp->enabled) {
150         std::string s(resinfo.value(), resinfo.vallen());
151         err = lcb::clconfig::cccp_update(cccp, curhost->host, s.c_str());
152     }
153 
154     if (err != LCB_SUCCESS) {
155         int bs_options;
156         if (instance->cur_configinfo->get_origin() == lcb::clconfig::CLCONFIG_CCCP) {
157             /**
158              * XXX: Not enough to see if cccp was enabled, since cccp might
159              * be requested by a user, but would still not actually be active
160              * for clusters < 2.5 If our current config is from CCCP
161              * then we can be fairly certain that CCCP is indeed working.
162              *
163              * For this reason, we don't use if (cccp->enabled) {...}
164              */
165             bs_options = BS_REFRESH_THROTTLE;
166         } else {
167             bs_options = BS_REFRESH_ALWAYS;
168         }
169         instance->bootstrap(bs_options);
170     }
171 
172     if (!lcb_should_retry(settings, oldpkt, LCB_NOT_MY_VBUCKET)) {
173         return false;
174     }
175 
176     /** Reschedule the packet again .. */
177     mc_PACKET *newpkt = mcreq_renew_packet(oldpkt);
178     newpkt->flags &= ~MCREQ_STATE_FLAGS;
179     instance->retryq->nmvadd((mc_EXPACKET*)newpkt);
180     return true;
181 }
182 
183 /**
184  * Determine if this is an error code that we can pass to the user, or can
185  * otherwise handle "innately"
186  */
187 static bool is_fastpath_error(uint16_t rc) {
188     switch (rc) {
189     case PROTOCOL_BINARY_RESPONSE_SUCCESS:
190     case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
191     case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
192     case PROTOCOL_BINARY_RESPONSE_E2BIG:
193     case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
194     case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
195     case PROTOCOL_BINARY_RESPONSE_ERANGE:
196     case PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED:
197     case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
198     case PROTOCOL_BINARY_RESPONSE_ETMPFAIL:
199     case PROTOCOL_BINARY_RESPONSE_ENOMEM:
200     case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_ENOENT:
201     case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_EEXISTS:
202     case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_MISMATCH:
203     case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_EINVAL:
204     case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_E2BIG:
205     case PROTOCOL_BINARY_RESPONSE_SUBDOC_VALUE_CANTINSERT:
206     case PROTOCOL_BINARY_RESPONSE_SUBDOC_VALUE_ETOODEEP:
207     case PROTOCOL_BINARY_RESPONSE_SUBDOC_DOC_NOTJSON:
208     case PROTOCOL_BINARY_RESPONSE_SUBDOC_NUM_ERANGE:
209     case PROTOCOL_BINARY_RESPONSE_SUBDOC_DELTA_ERANGE:
210     case PROTOCOL_BINARY_RESPONSE_SUBDOC_INVALID_COMBO:
211     case PROTOCOL_BINARY_RESPONSE_SUBDOC_MULTI_PATH_FAILURE:
212     case PROTOCOL_BINARY_RESPONSE_SUBDOC_SUCCESS_DELETED:
213     case PROTOCOL_BINARY_RESPONSE_SUBDOC_XATTR_INVALID_FLAG_COMBO:
214     case PROTOCOL_BINARY_RESPONSE_SUBDOC_XATTR_INVALID_KEY_COMBO:
215     case PROTOCOL_BINARY_RESPONSE_SUBDOC_XATTR_UNKNOWN_MACRO:
216     case PROTOCOL_BINARY_RESPONSE_SUBDOC_XATTR_UNKNOWN_VATTR:
217     case PROTOCOL_BINARY_RESPONSE_SUBDOC_XATTR_CANT_MODIFY_VATTR:
218     case PROTOCOL_BINARY_RESPONSE_SUBDOC_MULTI_PATH_FAILURE_DELETED:
219     case PROTOCOL_BINARY_RESPONSE_SUBDOC_INVALID_XATTR_ORDER:
220     case PROTOCOL_BINARY_RESPONSE_EACCESS:
221         return true;
222     default:
223         if (rc >= 0xc0 && rc <= 0xcc) {
224             // other subdoc?
225             return true;
226         } else {
227             return false;
228         }
229         break;
230     }
231 }
232 
/* Bit flags returned by Server::handle_unknown_error(). CONTINUE (0) means no
 * special handling took place; DISCONN and RETRY may be combined. */
static const int ERRMAP_HANDLE_CONTINUE = 0;
static const int ERRMAP_HANDLE_DISCONN = 1;
static const int ERRMAP_HANDLE_RETRY = 2;
236 
237 /**
238  * Handle an unknown memcached error
239  *
240  * @param mcresp Response which contains the unknown error
241  * @param[out] newerr more user-friendly based on error map attributes
242  *
243  * @return true if this function handled the error specially (by disconnecting)
244  * or false if normal handling should continue.
245  */
246 int Server::handle_unknown_error(const mc_PACKET *request,
247                                  const MemcachedResponse& mcresp,
248                                  lcb_error_t& newerr) {
249 
250     if (!settings->errmap->isLoaded() || !settings->use_errmap) {
251         // If there's no error map, just return false
252         return ERRMAP_HANDLE_CONTINUE;
253     }
254 
255     // Look up the error map definition for this error
256     const errmap::Error& err = settings->errmap->getError(mcresp.status());
257 
258     if (!err.isValid() || err.hasAttribute(errmap::SPECIAL_HANDLING)) {
259         lcb_log(LOGARGS_T(ERR), LOGFMT "Received error not in error map or requires special handling! " PKTFMT, LOGID_T(), PKTARGS(mcresp));
260         lcbio_ctx_senderr(connctx, LCB_PROTOCOL_ERROR);
261         return ERRMAP_HANDLE_DISCONN;
262     } else {
263         lcb_log(LOGARGS_T(WARN), LOGFMT "Received server error %s (0x%x) on packet: " PKTFMT, LOGID_T(), err.shortname.c_str(), err.code, PKTARGS(mcresp));
264     }
265 
266     if (err.hasAttribute(errmap::FETCH_CONFIG)) {
267         instance->bootstrap(BS_REFRESH_THROTTLE);
268     }
269 
270     if (err.hasAttribute(errmap::TEMPORARY)) {
271         newerr = LCB_GENERIC_TMPERR;
272     }
273 
274     if (err.hasAttribute(errmap::CONSTRAINT_FAILURE)) {
275         newerr = LCB_GENERIC_CONSTRAINT_ERR;
276     }
277 
278     if (err.hasAttribute(errmap::AUTH)) {
279         newerr = LCB_AUTH_ERROR;
280     }
281 
282     if (err.hasAttribute(errmap::SUBDOC) && newerr == LCB_SUCCESS) {
283         newerr = LCB_GENERIC_SUBDOCERR;
284     }
285 
286     /* TODO: remove masking LOCKED in 3.0 release */
287     if (err.hasAttribute(errmap::ITEM_LOCKED)) {
288         switch (mcresp.opcode()) {
289         case PROTOCOL_BINARY_CMD_SET:
290         case PROTOCOL_BINARY_CMD_REPLACE:
291         case PROTOCOL_BINARY_CMD_DELETE:
292             newerr = LCB_KEY_EEXISTS;
293             break;
294         default:
295             newerr = LCB_ETMPFAIL;
296         }
297     }
298 
299     int rv = 0;
300 
301     if (err.hasAttribute(errmap::AUTO_RETRY)) {
302         errmap::RetrySpec *spec = err.getRetrySpec();
303 
304         mc_PACKET *newpkt = mcreq_renew_packet(request);
305         newpkt->flags &= ~MCREQ_STATE_FLAGS;
306         instance->retryq->add((mc_EXPACKET *)newpkt, newerr ? newerr : LCB_ERROR, spec);
307         rv |= ERRMAP_HANDLE_RETRY;
308     }
309 
310     if (err.hasAttribute(errmap::CONN_STATE_INVALIDATED)) {
311         if (newerr != LCB_SUCCESS) {
312             newerr = LCB_ERROR;
313         }
314         lcbio_ctx_senderr(connctx, newerr);
315         rv |= ERRMAP_HANDLE_DISCONN;
316     }
317 
318     return rv;
319 
320 }
321 
/**
 * Attempt to read and process a single packet from the connection's read
 * buffer. This is called in a loop by on_read().
 *
 * If a full packet (24-byte header plus body) is buffered, it is matched to
 * its pending request and handled, and PKT_READ_COMPLETE is returned. on_read()
 * then calls this function again.
 *
 * If a full packet is not yet available, PKT_READ_PARTIAL is returned. Before
 * that, more data is requested from the context if requests are still
 * pending. The on_read() loop then exits and schedules any required I/O.
 *
 * PKT_READ_ABORT is returned when error-map handling raised a socket error on
 * the connection. on_read() also stops looping in that case.
 *
 * @param ctx the connection's I/O context
 * @param ior the context's read buffer
 */
Server::ReadState
Server::try_read(lcbio_CTX *ctx, rdb_IOROPE *ior)
{
    MemcachedResponse mcresp;
    mc_PACKET *request;
    /* Start by requiring a full 24-byte header; the body length is added once
     * the header has been read. */
    unsigned pktsize = 24, is_last = 1;

    /* Request n bytes (only when requests are still pending) and report a
     * partial read. Expands to several statements, so it is only used inside
     * braces. */
    #define RETURN_NEED_MORE(n) \
        if (has_pending()) { \
            lcbio_ctx_rwant(ctx, n); \
        } \
        return PKT_READ_PARTIAL; \

    /* DO_ASSIGN_PAYLOAD() consumes the header and, if there is a body, makes it
     * contiguous in mcresp.payload. It then OPENS a new scope with '{'.
     * DO_SWALLOW_PAYLOAD() CLOSES that scope and consumes the body bytes. The
     * two must always be used as a pair around the code that uses the payload.
     * NOTE(review): these macros are never #undef'd and stay defined for the
     * rest of the file. */
    #define DO_ASSIGN_PAYLOAD() \
        rdb_consumed(ior, mcresp.hdrsize()); \
        if (mcresp.bodylen()) { \
            mcresp.payload = rdb_get_consolidated(ior, mcresp.bodylen()); \
        } {

    #define DO_SWALLOW_PAYLOAD() \
        } if (mcresp.bodylen()) { \
            rdb_consumed(ior, mcresp.bodylen()); \
        }

    if (rdb_get_nused(ior) < pktsize) {
        RETURN_NEED_MORE(pktsize)
    }

    MC_INCR_METRIC(this, packets_read, 1);

    /* copy bytes into the info structure */
    /* (the header is only copied here; it is consumed later, either by
     * DO_ASSIGN_PAYLOAD() or by rdb_consumed(ior, pktsize)) */
    rdb_copyread(ior, mcresp.hdrbytes(), mcresp.hdrsize());

    /* Wait until the whole body is buffered as well */
    pktsize += mcresp.bodylen();
    if (rdb_get_nused(ior) < pktsize) {
        RETURN_NEED_MORE(pktsize);
    }

    /* Find the packet */
    /* STAT responses with a non-empty key are one of several replies to the
     * same request (presumably terminated by a reply with an empty key). The
     * request is therefore left in the pipeline and is not marked handled.
     * Every other response completes its request and removes it. */
    if (mcresp.opcode() == PROTOCOL_BINARY_CMD_STAT && mcresp.keylen() != 0) {
        is_last = 0;
        request = mcreq_pipeline_find(this, mcresp.opaque());
    } else {
        is_last = 1;
        request = mcreq_pipeline_remove(this, mcresp.opaque());
    }

    /* No pending request matches this opaque (the log message assumes it
     * already timed out). Discard the whole packet. */
    if (!request) {
        MC_INCR_METRIC(this, packets_ownerless, 1);
        lcb_log(LOGARGS_T(DEBUG), LOGFMT "Server sent us reply for a timed-out command. (OP=0x%x, RC=0x%x, SEQ=%u)", LOGID_T(), mcresp.opcode(), mcresp.status(), mcresp.opaque());
        rdb_consumed(ior, pktsize);
        return PKT_READ_COMPLETE;
    }

    lcb_error_t err_override = LCB_SUCCESS;
    ReadState rdstate = PKT_READ_COMPLETE;
    int unknown_err_rv;

    /* Check if the status code is one which must be handled carefully by the
     * client */
    if (is_fastpath_error(mcresp.status())) {
        // Nothing here!
        // (handled by the regular dispatch path below)
    } else if (mcresp.status() == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET) {
        /* consume the header */
        DO_ASSIGN_PAYLOAD()
        /* handle_nmv() returns false when the packet was not rescheduled; the
         * NMV error is then delivered to the user. */
        if (!handle_nmv(mcresp, request)) {
            mcreq_dispatch_response(this, request, &mcresp, LCB_NOT_MY_VBUCKET);
        }
        DO_SWALLOW_PAYLOAD()
        goto GT_DONE;
    } else if ((unknown_err_rv =
                handle_unknown_error(request, mcresp, err_override)) !=
                        ERRMAP_HANDLE_CONTINUE) {
        DO_ASSIGN_PAYLOAD()
        /* A retried request must not also be completed for the user */
        if (!(unknown_err_rv & ERRMAP_HANDLE_RETRY)) {
            mcreq_dispatch_response(this, request, &mcresp, err_override);
        }
        DO_SWALLOW_PAYLOAD()
        /* The connection is being dropped; stop the on_read() loop */
        if (unknown_err_rv & ERRMAP_HANDLE_DISCONN) {
            rdstate = PKT_READ_ABORT;
        }
        goto GT_DONE;
    }

    /* Figure out if the request is 'ufwd' or not */
    if (!(request->flags & MCREQ_F_UFWD)) {
        DO_ASSIGN_PAYLOAD();
        mcresp.bufh = rdb_get_first_segment(ior);
        mcreq_dispatch_response(this, request, &mcresp, err_override);
        DO_SWALLOW_PAYLOAD()

    } else {
        /* figure out how many buffers we want to use as an upper limit for the
         * IOV arrays. Currently we'll keep it simple and ensure the entire
         * response is contiguous. */
        lcb_PKTFWDRESP resp = { 0 }; /* TODO: next ABI version should include is_last flag */
        rdb_ROPESEG *segs;
        nb_IOV iov;

        rdb_consolidate(ior, pktsize);
        rdb_refread_ex(ior, &iov, &segs, 1, pktsize);

        resp.bufs = &segs;
        resp.iovs = (lcb_IOV*)&iov;
        resp.nitems = 1;
        resp.header = mcresp.hdrbytes();
        instance->callbacks.pktfwd(
            instance, MCREQ_PKT_COOKIE(request), LCB_SUCCESS, &resp);
        rdb_consumed(ior, pktsize);
    }

    /* Release the request only after its final response has been processed */
    GT_DONE:
    if (is_last) {
        mcreq_packet_handled(this, request);
    }
    return rdstate;
}
448 
449 static void
450 on_read(lcbio_CTX *ctx, unsigned)
451 {
452     Server *server = Server::get(ctx);
453     rdb_IOROPE *ior = &ctx->ior;
454 
455     if (server->check_closed()) {
456         return;
457     }
458 
459     Server::ReadState rv;
460     while ((rv = server->try_read(ctx, ior)) == Server::PKT_READ_COMPLETE);
461     lcbio_ctx_schedule(ctx);
462     lcb_maybe_breakout(server->instance);
463 }
464 
465 static void flush_noop(mc_PIPELINE *pipeline) {
466     (void)pipeline;
467 }
468 
469 static void server_connect(Server *server) {
470     server->connect();
471 }
472 
473 bool
474 Server::maybe_retry_packet(mc_PACKET *pkt, lcb_error_t err)
475 {
476     lcbvb_DISTMODE dist_t = lcbvb_get_distmode(parent->config);
477 
478     if (dist_t != LCBVB_DIST_VBUCKET) {
479         /** memcached bucket */
480         return false;
481     }
482     if (!lcb_should_retry(settings, pkt, err)) {
483         return false;
484     }
485 
486     mc_PACKET *newpkt = mcreq_renew_packet(pkt);
487     newpkt->flags &= ~MCREQ_STATE_FLAGS;
488     // TODO: Load the 4th argument from the error map
489     instance->retryq->add((mc_EXPACKET *)newpkt, err, NULL);
490     return true;
491 }
492 
493 static void
494 fail_callback(mc_PIPELINE *pipeline, mc_PACKET *pkt, lcb_error_t err, void *) {
495     static_cast<Server*>(pipeline)->purge_single(pkt, err);
496 }
497 
498 static const char *opcode_name(uint8_t code)
499 {
500     switch (code) {
501         case PROTOCOL_BINARY_CMD_GET:
502             return "get";
503         case PROTOCOL_BINARY_CMD_SET:
504             return "set";
505         case PROTOCOL_BINARY_CMD_ADD:
506             return "add";
507         case PROTOCOL_BINARY_CMD_REPLACE:
508             return "replace";
509         case PROTOCOL_BINARY_CMD_DELETE:
510             return "delete";
511         case PROTOCOL_BINARY_CMD_INCREMENT:
512             return "incr";
513         case PROTOCOL_BINARY_CMD_DECREMENT:
514             return "decr";
515         case PROTOCOL_BINARY_CMD_FLUSH:
516             return "flush";
517         case PROTOCOL_BINARY_CMD_GETQ:
518             return "getq";
519         case PROTOCOL_BINARY_CMD_NOOP:
520             return "noop";
521         case PROTOCOL_BINARY_CMD_VERSION:
522             return "version";
523         case PROTOCOL_BINARY_CMD_APPEND:
524             return "append";
525         case PROTOCOL_BINARY_CMD_PREPEND:
526             return "prepend";
527         case PROTOCOL_BINARY_CMD_STAT:
528             return "stat";
529         case PROTOCOL_BINARY_CMD_VERBOSITY:
530             return "verbosity";
531         case PROTOCOL_BINARY_CMD_TOUCH:
532             return "touch";
533         case PROTOCOL_BINARY_CMD_GAT:
534             return "gat";
535         case PROTOCOL_BINARY_CMD_HELLO:
536             return "hello";
537         case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
538             return "sasl_list_mechs";
539         case PROTOCOL_BINARY_CMD_SASL_AUTH:
540             return "sasl_auth";
541         case PROTOCOL_BINARY_CMD_SASL_STEP:
542             return "sasl_step";
543         case PROTOCOL_BINARY_CMD_GET_REPLICA:
544             return "get_replica";
545         case PROTOCOL_BINARY_CMD_SELECT_BUCKET:
546             return "select_bucket";
547         case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
548             return "observe_seqno";
549         case PROTOCOL_BINARY_CMD_OBSERVE:
550             return "observe";
551         case PROTOCOL_BINARY_CMD_GET_LOCKED:
552             return "get_locked";
553         case PROTOCOL_BINARY_CMD_UNLOCK_KEY:
554             return "unlock_key";
555         case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
556             return "get_cluster_config";
557         case PROTOCOL_BINARY_CMD_SUBDOC_GET:
558             return "subdoc_get";
559         case PROTOCOL_BINARY_CMD_SUBDOC_EXISTS:
560             return "subdoc_exists";
561         case PROTOCOL_BINARY_CMD_SUBDOC_DICT_ADD:
562             return "subdoc_dict_add";
563         case PROTOCOL_BINARY_CMD_SUBDOC_DICT_UPSERT:
564             return "subdoc_dict_upsert";
565         case PROTOCOL_BINARY_CMD_SUBDOC_DELETE:
566             return "subdoc_delete";
567         case PROTOCOL_BINARY_CMD_SUBDOC_REPLACE:
568             return "subdoc_replace";
569         case PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_PUSH_LAST:
570             return "subdoc_array_push_last";
571         case PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_PUSH_FIRST:
572             return "subdoc_array_push_first";
573         case PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_INSERT:
574             return "subdoc_array_insert";
575         case PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_ADD_UNIQUE:
576             return "subdoc_array_add_unique";
577         case PROTOCOL_BINARY_CMD_SUBDOC_COUNTER:
578             return "subdoc_counter";
579         case PROTOCOL_BINARY_CMD_SUBDOC_MULTI_LOOKUP:
580             return "subdoc_multi_lookup";
581         case PROTOCOL_BINARY_CMD_SUBDOC_MULTI_MUTATION:
582             return "subdoc_multi_mutation";
583         case PROTOCOL_BINARY_CMD_SUBDOC_GET_COUNT:
584             return "subdoc_get_count";
585         case PROTOCOL_BINARY_CMD_GET_ERROR_MAP:
586             return "get_error_map";
587         default:
588             return "unknown";
589     }
590 }
591 
592 void Server::purge_single(mc_PACKET *pkt, lcb_error_t err) {
593     if (maybe_retry_packet(pkt, err)) {
594         return;
595     }
596 
597     if (err == LCB_AUTH_ERROR) {
598         /* In-situ auth errors are actually dead servers. Let's provide this
599          * as the actual error code. */
600         err = LCB_MAP_CHANGED;
601     }
602 
603     if (err == LCB_ETIMEDOUT) {
604         lcb_error_t tmperr = lcb::RetryQueue::error_for(pkt);
605         if (tmperr != LCB_SUCCESS) {
606             err = tmperr;
607         }
608     }
609 
610     protocol_binary_request_header hdr;
611     memcpy(hdr.bytes, SPAN_BUFFER(&pkt->kh_span), sizeof(hdr.bytes));
612     MemcachedResponse resp(protocol_binary_command(hdr.request.opcode),
613                            hdr.request.opaque,
614                            PROTOCOL_BINARY_RESPONSE_EINVAL);
615 
616 #ifdef LCB_TRACING
617     lcbtrace_span_set_orphaned(MCREQ_PKT_RDATA(pkt)->span, true);
618 #endif
619     if (err == LCB_ETIMEDOUT && settings->use_tracing) {
620         Json::Value info;
621 
622         char opid[30] = {};
623         snprintf(opid, sizeof(opid), "kv:%s", opcode_name(hdr.request.opcode));
624         info["s"] = opid;
625         info["b"] = settings->bucket;
626         info["t"] = settings->operation_timeout;
627 
628         const lcb_host_t &remote = get_host();
629         std::string rhost;
630         if (remote.ipv6) {
631             rhost.append("[").append(remote.host).append("]:").append(remote.port);
632         } else {
633             rhost.append(remote.host).append(":").append(remote.port);
634         }
635         info["r"] = rhost.c_str();
636 
637         if (connctx) {
638             char local_id[54] = {};
639             snprintf(local_id, sizeof(local_id), "%016" PRIx64 "/%016" PRIx64 "/%x",
640                      (lcb_U64)settings->iid, connctx->sock->id, (int)pkt->opaque);
641             info["i"] = local_id;
642             info["l"] = connctx->sock->info->ep_local;
643         }
644         std::string msg(Json::FastWriter().write(info));
645         if (msg.size() > 1) {
646             lcb_log(LOGARGS(instance, WARN), "Failing command with error %s: %.*s",
647                     lcb_strerror_short(err), (int)(msg.size() - 1), msg.c_str());
648         }
649     } else {
650         lcb_log(LOGARGS_T(WARN), LOGFMT "Failing command (pkt=%p, opaque=%lu, opcode=0x%x) with error %s", LOGID_T(), (void*)pkt, (unsigned long)pkt->opaque, hdr.request.opcode, lcb_strerror_short(err));
651     }
652     int rv = mcreq_dispatch_response(this, pkt, &resp, err);
653     lcb_assert(rv == 0);
654 }
655 
656 int
657 Server::purge(lcb_error_t error, hrtime_t thresh, hrtime_t *next,
658               RefreshPolicy policy)
659 {
660     unsigned affected;
661 
662     if (thresh) {
663         affected = mcreq_pipeline_timeout(
664                 this, error, fail_callback, NULL, thresh, next);
665 
666     } else {
667         mcreq_pipeline_fail(this, error, fail_callback, NULL);
668         affected = -1;
669     }
670 
671     MC_INCR_METRIC(this, packets_errored, affected);
672     if (policy == REFRESH_NEVER) {
673         return affected;
674     }
675 
676     if (affected || policy == REFRESH_ALWAYS) {
677         instance->bootstrap(BS_REFRESH_THROTTLE|BS_REFRESH_INCRERR);
678     }
679     return affected;
680 }
681 
682 static void flush_errdrain(mc_PIPELINE *pipeline)
683 {
684     /* Called when we are draining errors. */
685     Server *server = (Server *)pipeline;
686     if (!lcbio_timer_armed(server->io_timer)) {
687         lcbio_timer_rearm(server->io_timer, server->default_timeout());
688     }
689 }
690 
691 uint32_t
692 Server::next_timeout() const
693 {
694     hrtime_t now, expiry, diff;
695     mc_PACKET *pkt = mcreq_first_packet(this);
696 
697     if (!pkt) {
698         return default_timeout();
699     }
700 
701     now = gethrtime();
702     expiry = MCREQ_PKT_RDATA(pkt)->start + LCB_US2NS(default_timeout());
703     if (expiry <= now) {
704         diff = 0;
705     } else {
706         diff = expiry - now;
707     }
708 
709     return LCB_NS2US(diff);
710 }
711 
712 static void
713 timeout_server(void *arg)
714 {
715     reinterpret_cast<Server*>(arg)->io_timeout();
716 }
717 
718 void Server::io_timeout()
719 {
720     hrtime_t now = gethrtime();
721     hrtime_t min_valid = now - LCB_US2NS(default_timeout());
722 
723     hrtime_t next_ns;
724     int npurged = purge(LCB_ETIMEDOUT, min_valid, &next_ns,
725                         Server::REFRESH_ONFAILED);
726     if (npurged) {
727         MC_INCR_METRIC(this, packets_timeout, npurged);
728         lcb_log(LOGARGS_T(DEBUG), LOGFMT "Server timed out. Some commands have failed", LOGID_T());
729     }
730 
731     uint32_t next_us = next_timeout();
732     lcb_log(LOGARGS_T(TRACE), LOGFMT "Scheduling next timeout for %u ms. This is not an error", LOGID_T(), next_us / 1000);
733     lcbio_timer_rearm(io_timer, next_us);
734     lcb_maybe_breakout(instance);
735 }
736 
737 bool
738 Server::maybe_reconnect_on_fake_timeout(lcb_error_t err)
739 {
740     if (err != LCB_ETIMEDOUT) {
741         return false; /* not a timeout */
742     }
743     if (!settings->readj_ts_wait) {
744         return false; /* normal timeout behavior */
745     }
746     if (!has_pending()) {
747         return false; /* nothing pending */
748     }
749 
750     uint32_t next_tmo = next_timeout();
751     if (next_tmo < default_timeout() / 2) {
752         /* Ideally we'd have a fuzz interval to shave off the actual timeout,
753          * since there will inevitably be some time taken off the next timeout */
754         return false;
755     }
756 
757     lcb_log(LOGARGS_T(INFO), LOGFMT "Retrying connection. Assuming timeout because of stalled event loop", LOGID_T());
758     connect();
759     return true;
760 }
761 
762 static void
763 on_connected(lcbio_SOCKET *sock, void *data, lcb_error_t err, lcbio_OSERR syserr)
764 {
765     Server *server = reinterpret_cast<Server*>(data);
766     server->handle_connected(sock, err, syserr);
767 }
768 
769 static void mcserver_flush(Server *s) { s->flush(); }
770 
771 void
772 Server::handle_connected(lcbio_SOCKET *sock, lcb_error_t err, lcbio_OSERR syserr)
773 {
774     connreq = NULL;
775 
776     if (err != LCB_SUCCESS) {
777         lcb_log(LOGARGS_T(ERR), LOGFMT "Connection attempt failed. Received %s from libcouchbase, received %d from operating system", LOGID_T(), lcb_strerror_short(err), syserr);
778         MC_INCR_METRIC(this, iometrics.io_error, 1);
779         if (!maybe_reconnect_on_fake_timeout(err)) {
780             socket_failed(err);
781         }
782         return;
783     }
784 
785     lcb_assert(sock);
786     if (metrics) {
787         lcbio_set_metrics(sock, &metrics->iometrics);
788     }
789 
790     /** Do we need sasl? */
791     SessionInfo* sessinfo = SessionInfo::get(sock);
792     if (sessinfo == NULL) {
793         lcb_log(LOGARGS_T(TRACE), "<%s:%s> (SRV=%p) Session not yet negotiated. Negotiating", curhost->host, curhost->port, (void*)this);
794         connreq = SessionRequest::start(
795             sock, settings, default_timeout(), on_connected, this);
796         return;
797     } else {
798         jsonsupport = sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_JSON);
799         compsupport = sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_SNAPPY);
800         mutation_tokens = sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_MUTATION_SEQNO);
801     }
802 
803     lcbio_CTXPROCS procs;
804     procs.cb_err = on_error;
805     procs.cb_read = on_read;
806     procs.cb_flush_done = on_flush_done;
807     procs.cb_flush_ready = on_flush_ready;
808     connctx = lcbio_ctx_new(sock, this, &procs);
809     connctx->subsys = "memcached";
810     sock->service = LCBIO_SERVICE_KV;
811     flush_start = (mcreq_flushstart_fn)mcserver_flush;
812 
813     uint32_t tmo = next_timeout();
814     lcbio_timer_rearm(io_timer, tmo);
815     flush();
816 }
817 
818 void
819 Server::connect()
820 {
821     connreq = instance->memd_sockpool->get(*curhost,
822         default_timeout(), on_connected, this);
823     flush_start = flush_noop;
824     state = Server::S_CLEAN;
825 }
826 
827 static void
828 buf_done_cb(mc_PIPELINE *pl, const void *cookie, void *, void *)
829 {
830     Server *server = static_cast<Server*>(pl);
831     server->instance->callbacks.pktflushed(server->instance, cookie);
832 }
833 
834 Server::Server(lcb_t instance_, int ix)
835     : mc_PIPELINE(), state(S_CLEAN),
836       io_timer(lcbio_timer_new(instance_->iotable, this, timeout_server)),
837       instance(instance_),
838       settings(lcb_settings_ref2(instance_->settings)),
839       compsupport(0),
840       jsonsupport(0),
841       mutation_tokens(0),
842       connctx(NULL),
843       curhost(new lcb_host_t())
844 {
845     mcreq_pipeline_init(this);
846     flush_start = (mcreq_flushstart_fn)server_connect;
847     buf_done_callback = buf_done_cb;
848     index = ix;
849 
850     std::memset(&connreq, 0, sizeof connreq);
851     std::memset(curhost, 0, sizeof *curhost);
852 
853     const char *datahost = lcbvb_get_hostport(
854         LCBT_VBCONFIG(instance), ix,
855         LCBVB_SVCTYPE_DATA, LCBT_SETTING_SVCMODE(instance));
856     if (datahost) {
857         lcb_host_parsez(curhost, datahost, LCB_CONFIG_MCD_PORT);
858     }
859 
860     if (settings->metrics) {
861         /** Allocate / reinitialize the metrics here */
862         metrics = lcb_metrics_getserver(settings->metrics, curhost->host, curhost->port, 1);
863         lcb_metrics_reset_pipeline_gauges(metrics);
864     }
865 }
866 
867 Server::Server()
868     : state(S_TEMPORARY),
869       io_timer(NULL), instance(NULL), settings(NULL), compsupport(0), jsonsupport(0),
870       mutation_tokens(0), connctx(NULL), connreq(NULL), curhost(NULL)
871 {
872 }
873 
874 Server::~Server() {
875     if (state == S_TEMPORARY) {
876         return;
877     }
878 
879     if (this->instance) {
880         unsigned ii;
881         mc_CMDQUEUE *cmdq = &this->instance->cmdq;
882         for (ii = 0; ii < cmdq->npipelines; ii++) {
883             lcb::Server *server = static_cast<lcb::Server*>(cmdq->pipelines[ii]);
884             if (server == this) {
885                 cmdq->pipelines[ii] = NULL;
886                 break;
887             }
888         }
889     }
890     this->instance = NULL;
891     mcreq_pipeline_cleanup(this);
892 
893     if (io_timer) {
894         lcbio_timer_destroy(io_timer);
895     }
896 
897     delete curhost;
898     lcb_settings_unref(settings);
899 }
900 
901 static void
902 close_cb(lcbio_SOCKET *sock, int, void *)
903 {
904     lcbio_ref(sock);
905     lcb::io::Pool::discard(sock);
906 }
907 
908 static void
909 on_error(lcbio_CTX *ctx, lcb_error_t err)
910 {
911     Server *server = Server::get(ctx);
912     lcb_log(LOGARGS(server, WARN), LOGFMT "Got socket error %s", LOGID(server), lcb_strerror_short(err));
913     if (server->check_closed()) {
914         return;
915     }
916     server->socket_failed(err);
917 }
918 
919 /**Handle a socket error. This function will close the current connection
920  * and trigger a failout of any pending commands.
921  * This function triggers a configuration refresh */
922 void
923 Server::socket_failed(lcb_error_t err)
924 {
925     if (check_closed()) {
926         return;
927     }
928 
929     purge(err, 0, NULL, REFRESH_ALWAYS);
930     lcb_maybe_breakout(instance);
931     start_errored_ctx(S_ERRDRAIN);
932 }
933 
934 void
935 Server::close()
936 {
937     /* Should never be called twice */
938     lcb_assert(state != Server::S_CLOSED);
939     start_errored_ctx(S_CLOSED);
940 }
941 
942 /**
943  * Call to signal an error or similar on the current socket.
944  * @param server The server
945  * @param next_state The next state (S_CLOSED or S_ERRDRAIN)
946  */
947 void
948 Server::start_errored_ctx(State next_state)
949 {
950     lcbio_CTX *ctx = connctx;
951 
952     state = next_state;
953     /* Cancel any pending connection attempt? */
954     lcb::io::ConnectionRequest::cancel(&connreq);
955 
956     /* If the server is being destroyed, silence the timer */
957     if (next_state == Server::S_CLOSED && io_timer != NULL) {
958         lcbio_timer_destroy(io_timer);
959         io_timer = NULL;
960     }
961 
962     if (ctx == NULL) {
963         if (next_state == Server::S_CLOSED) {
964             delete this;
965             return;
966         } else {
967             /* Not closed but don't have a current context */
968             if (has_pending()) {
969                 if (!lcbio_timer_armed(io_timer)) {
970                     /* TODO: Maybe throttle reconnection attempts? */
971                     lcbio_timer_rearm(io_timer, default_timeout());
972                 }
973                 connect();
974             } else {
975                 // Connect once someone actually wants a connection.
976                 flush_start = (mcreq_flushstart_fn)server_connect;
977             }
978         }
979 
980     } else {
981         if (ctx->npending) {
982             /* Have pending items? */
983 
984             /* Flush any remaining events */
985             lcbio_ctx_schedule(ctx);
986 
987             /* Close the socket not to leak resources */
988             lcbio_shutdown(lcbio_ctx_sock(ctx));
989             if (next_state == Server::S_ERRDRAIN) {
990                 flush_start = (mcreq_flushstart_fn)flush_errdrain;
991             }
992         } else {
993             finalize_errored_ctx();
994         }
995     }
996 }
997 
998 /**
999  * This function actually finalizes a ctx which has an error on it. If the
1000  * ctx has pending operations remaining then this function returns immediately.
1001  * Otherwise this will either reinitialize the connection or free the server
1002  * object depending on the actual object state (i.e. if it was closed or
1003  * simply errored).
1004  */
1005 void
1006 Server::finalize_errored_ctx()
1007 {
1008     if (connctx->npending) {
1009         return;
1010     }
1011 
1012     lcb_log(LOGARGS_T(DEBUG), LOGFMT "Finalizing context", LOGID_T());
1013 
1014     /* Always close the existing context. */
1015     lcbio_ctx_close(connctx, close_cb, NULL);
1016     connctx = NULL;
1017 
1018     /**Marks any unflushed data inside this server as being already flushed. This
1019      * should be done within error handling. If subsequent data is flushed on this
1020      * pipeline to the same connection, the results are undefined. */
1021 
1022     unsigned toflush;
1023     nb_IOV iov;
1024     while ((toflush = mcreq_flush_iov_fill(this, &iov, 1, NULL))) {
1025         mcreq_flush_done(this, toflush, toflush);
1026     }
1027 
1028     if (state == Server::S_CLOSED) {
1029         /* If the server is closed, time to free it */
1030         delete this;
1031     } else {
1032         /* Otherwise, cycle the state back to CLEAN and reinit
1033          * the connection */
1034         state = Server::S_CLEAN;
1035         connect();
1036     }
1037 }
1038 
1039 /**
1040  * This little function checks to see if the server struct is still valid, or
1041  * whether it should just be cleaned once no pending I/O remainds.
1042  *
1043  * If this function returns false then the server is still valid; otherwise it
1044  * is invalid and must not be used further.
1045  */
1046 bool
1047 Server::check_closed()
1048 {
1049     if (state == Server::S_CLEAN) {
1050         return false;
1051     }
1052     lcb_log(LOGARGS_T(INFO), LOGFMT "Got handler after close. Checking pending calls (pending=%u)", LOGID_T(), connctx->npending);
1053     finalize_errored_ctx();
1054     return 1;
1055 }
1056