1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "internal.h"
19 #include <libcouchbase/n1ql.h>
20 #include "http/http.h"
21 #include "auth-priv.h"
22 
23 static void refcnt_dtor_ping(mc_PACKET *);
24 static void handle_ping(mc_PIPELINE *, mc_PACKET *, lcb_error_t, const void *);
25 
26 static mc_REQDATAPROCS ping_procs = {
27     handle_ping,
28     refcnt_dtor_ping
29 };
30 
31 struct PingCookie : mc_REQDATAEX {
32     int remaining;
33     int options;
34     std::list<lcb_PINGSVC> responses;
35     std::string id;
36 
PingCookiePingCookie37     PingCookie(const void *cookie_, int _options)
38         : mc_REQDATAEX(cookie_, ping_procs, gethrtime()),
39         remaining(0), options(_options) {
40     }
41 
~PingCookiePingCookie42     ~PingCookie() {
43         for (std::list<lcb_PINGSVC>::iterator it = responses.begin(); it != responses.end(); it++) {
44             if (it->server) {
45                 free((void *)it->server);
46                 it->server = NULL;
47                 free((void *)it->local);
48                 it->local = NULL;
49                 free((void *)it->id);
50                 it->id = NULL;
51             }
52         }
53     }
54 
needMetricsPingCookie55     bool needMetrics() {
56         return (options & LCB_PINGOPT_F_NOMETRICS) == 0;
57     }
58 
needJSONPingCookie59     bool needJSON() {
60         return options & LCB_PINGOPT_F_JSON;
61     }
62 
needDetailsPingCookie63     bool needDetails() {
64         return options & LCB_PINGOPT_F_JSONDETAILS;
65     }
66 
needPrettyPingCookie67     bool needPretty() {
68         return options & LCB_PINGOPT_F_JSONPRETTY;
69     }
70 };
71 
72 static void
refcnt_dtor_ping(mc_PACKET * pkt)73 refcnt_dtor_ping(mc_PACKET *pkt)
74 {
75     PingCookie *ck = static_cast<PingCookie *>(pkt->u_rdata.exdata);
76     if (!--ck->remaining) {
77         delete ck;
78     }
79 }
80 
svc_to_string(lcb_PINGSVCTYPE type)81 static const char* svc_to_string(lcb_PINGSVCTYPE type)
82 {
83     switch (type) {
84     case LCB_PINGSVC_KV:
85         return "kv";
86     case LCB_PINGSVC_VIEWS:
87         return "views";
88     case LCB_PINGSVC_N1QL:
89         return "n1ql";
90     case LCB_PINGSVC_ANALYTICS:
91         return "analytics";
92     case LCB_PINGSVC_FTS:
93         return "fts";
94     default:
95         return "unknown";
96     }
97 }
98 
99 static void
build_ping_json(lcb_t instance,lcb_RESPPING & ping,Json::Value & root,PingCookie * ck)100 build_ping_json(lcb_t instance, lcb_RESPPING &ping, Json::Value &root, PingCookie *ck)
101 {
102     Json::Value services;
103     for (size_t ii = 0; ii < ping.nservices; ii++) {
104         lcb_PINGSVC &svc = ping.services[ii];
105         Json::Value service;
106         service["remote"] = svc.server;
107         if (svc.local) {
108             service["local"] = svc.local;
109         }
110         if (svc.id) {
111             service["id"] = svc.id;
112         }
113         if (svc.scope) {
114             service["scope"] = svc.scope;
115         }
116 
117         service["latency_us"] = (Json::Value::UInt64)LCB_NS2US(svc.latency);
118         switch (svc.status) {
119         case LCB_PINGSTATUS_OK:
120             service["status"] = "ok";
121             break;
122         case LCB_PINGSTATUS_TIMEOUT:
123             service["status"] = "timeout";
124             break;
125         default:
126             service["status"] = "error";
127             if (ck->needDetails()) {
128                 service["details"] = lcb_strerror_long(svc.rc);
129             }
130         }
131         services[svc_to_string(svc.type)].append(service);
132     }
133     root["services"] = services;
134     root["version"] = 1;
135 
136     std::string sdk("libcouchbase/" LCB_VERSION_STRING);
137     if (LCBT_SETTING(instance, client_string)) {
138         sdk.append(" ").append(LCBT_SETTING(instance, client_string));
139     }
140     root["sdk"] = sdk.c_str();
141     root["id"] = ck->id;
142 
143     int config_rev = -1;
144     if (instance->cur_configinfo) {
145         lcb::clconfig::ConfigInfo *cfg = instance->cur_configinfo;
146         config_rev = cfg->vbc->revid;
147     }
148     root["config_rev"] = config_rev;
149 }
150 
151 static void
invoke_ping_callback(lcb_t instance,PingCookie * ck)152 invoke_ping_callback(lcb_t instance, PingCookie *ck)
153 {
154     lcb_RESPPING ping;
155     std::string json;
156     size_t idx = 0;
157     memset(&ping, 0, sizeof(ping));
158     if (ck->needMetrics()) {
159         ping.nservices = ck->responses.size();
160         ping.services = new lcb_PINGSVC[ping.nservices];
161         for(std::list<lcb_PINGSVC>::const_iterator it = ck->responses.begin(); it != ck->responses.end(); ++it){
162             ping.services[idx++] = *it;
163         }
164         if (ck->needJSON()) {
165             Json::Value root;
166             build_ping_json(instance, ping, root, ck);
167             Json::Writer *w;
168             if (ck->needPretty()) {
169                 w = new Json::StyledWriter();
170             } else {
171                 w = new Json::FastWriter();
172             }
173             json = w->write(root);
174             delete w;
175             ping.njson = json.size();
176             ping.json = json.c_str();
177         }
178     }
179     lcb_RESPCALLBACK callback;
180     callback = lcb_find_callback(instance, LCB_CALLBACK_PING);
181     ping.cookie = const_cast<void*>(ck->cookie);
182     callback(instance, LCB_CALLBACK_PING, (lcb_RESPBASE *)&ping);
183     if (ping.services != NULL) {
184         delete []ping.services;
185     }
186     delete ck;
187 }
188 
189 static void
handle_ping(mc_PIPELINE * pipeline,mc_PACKET * req,lcb_error_t err,const void *)190 handle_ping(mc_PIPELINE *pipeline, mc_PACKET *req, lcb_error_t err, const void *)
191 {
192     lcb::Server *server = static_cast<lcb::Server*>(pipeline);
193     PingCookie *ck = (PingCookie *)req->u_rdata.exdata;
194 
195     if (ck->needMetrics()) {
196         const lcb_host_t &remote = server->get_host();
197         std::string hh;
198         if (remote.ipv6) {
199             hh.append("[").append(remote.host).append("]:").append(remote.port);
200         } else {
201             hh.append(remote.host).append(":").append(remote.port);
202         }
203         lcb_PINGSVC svc = {};
204         svc.type = LCB_PINGSVC_KV;
205         svc.server = strdup(hh.c_str());
206         svc.latency = gethrtime() - MCREQ_PKT_RDATA(req)->start;
207         svc.rc = err;
208         switch (err) {
209         case LCB_ETIMEDOUT:
210             svc.status = LCB_PINGSTATUS_TIMEOUT;
211             break;
212         case LCB_SUCCESS:
213             svc.status = LCB_PINGSTATUS_OK;
214             break;
215         default:
216             svc.status = LCB_PINGSTATUS_ERROR;
217             break;
218         }
219         lcbio_CTX *ctx = server->connctx;
220         if (ctx) {
221             char id[20] = {0};
222             svc.local = strdup(ctx->sock->info->ep_local);
223             snprintf(id, sizeof(id), "%p", (void *)ctx->sock);
224             svc.id = strdup(id);
225         }
226         svc.scope = server->get_instance()->get_bucketname();
227 
228         ck->responses.push_back(svc);
229     }
230 
231     if (--ck->remaining) {
232         return;
233     }
234     invoke_ping_callback(server->get_instance(), ck);
235 }
236 
handle_http(lcb_t instance,lcb_PINGSVCTYPE type,const lcb_RESPHTTP * resp)237 static void handle_http(lcb_t instance, lcb_PINGSVCTYPE type, const lcb_RESPHTTP *resp)
238 {
239     if ((resp->rflags & LCB_RESP_F_FINAL) == 0) {
240         return;
241     }
242     PingCookie *ck = (PingCookie *)resp->cookie;
243     lcb::http::Request *htreq = reinterpret_cast<lcb::http::Request*>(resp->_htreq);
244 
245     if (ck->needMetrics()) {
246         lcb_PINGSVC svc = {};
247         svc.type = type;
248         std::string hh;
249         if (htreq->ipv6) {
250             hh = "[" + std::string(htreq->host) + "]:" + std::string(htreq->port);
251         } else {
252             hh = std::string(htreq->host) + ":" + std::string(htreq->port);
253         }
254         svc.server = strdup(hh.c_str());
255         svc.latency = gethrtime() - htreq->start;
256         svc.rc = resp->rc;
257         switch (resp->rc) {
258         case LCB_ETIMEDOUT:
259             svc.status = LCB_PINGSTATUS_TIMEOUT;
260             break;
261         case LCB_SUCCESS:
262             svc.status = LCB_PINGSTATUS_OK;
263             break;
264         default:
265             svc.status = LCB_PINGSTATUS_ERROR;
266             break;
267         }
268         lcbio_CTX *ctx = htreq->ioctx;
269         if (ctx) {
270             char id[20] = {0};
271             snprintf(id, sizeof(id), "%p", (void *)ctx->sock);
272             svc.id = strdup(id);
273             svc.local = strdup(ctx->sock->info->ep_local);
274         }
275         ck->responses.push_back(svc);
276     }
277     if (--ck->remaining) {
278         return;
279     }
280     invoke_ping_callback(instance, ck);
281 }
282 
handle_n1ql(lcb_t instance,int,const lcb_RESPBASE * resp)283 static void handle_n1ql(lcb_t instance, int, const lcb_RESPBASE *resp)
284 {
285     handle_http(instance, LCB_PINGSVC_N1QL, (const lcb_RESPHTTP *)resp);
286 }
287 
handle_analytics(lcb_t instance,int,const lcb_RESPBASE * resp)288 static void handle_analytics(lcb_t instance, int, const lcb_RESPBASE *resp)
289 {
290   handle_http(instance, LCB_PINGSVC_ANALYTICS, (const lcb_RESPHTTP *)resp);
291 }
292 
handle_views(lcb_t instance,int,const lcb_RESPBASE * resp)293 static void handle_views(lcb_t instance, int, const lcb_RESPBASE *resp)
294 {
295     handle_http(instance, LCB_PINGSVC_VIEWS, (const lcb_RESPHTTP *)resp);
296 }
297 
handle_fts(lcb_t instance,int,const lcb_RESPBASE * resp)298 static void handle_fts(lcb_t instance, int, const lcb_RESPBASE *resp)
299 {
300     handle_http(instance, LCB_PINGSVC_FTS, (const lcb_RESPHTTP *)resp);
301 }
302 
303 LIBCOUCHBASE_API
304 lcb_error_t
lcb_ping3(lcb_t instance,const void * cookie,const lcb_CMDPING * cmd)305 lcb_ping3(lcb_t instance, const void *cookie, const lcb_CMDPING *cmd)
306 {
307     mc_CMDQUEUE *cq = &instance->cmdq;
308     unsigned ii;
309 
310     if (!cq->config) {
311         return LCB_CLIENT_ETMPFAIL;
312     }
313 
314     PingCookie *ckwrap = new PingCookie(cookie, cmd->options);
315     {
316         char id[20] = {0};
317         snprintf(id, sizeof(id), "%p", (void *)instance);
318         ckwrap->id = id;
319         if (cmd->id) {
320             ckwrap->id.append("/").append(cmd->id);
321         }
322     }
323 
324     lcbvb_CONFIG *cfg = LCBT_VBCONFIG(instance);
325     const lcbvb_SVCMODE mode = LCBT_SETTING_SVCMODE(instance);
326     if (cmd->services & LCB_PINGSVC_F_KV) {
327         for (ii = 0; ii < cq->npipelines; ii++) {
328             unsigned port = lcbvb_get_port(cfg, ii, LCBVB_SVCTYPE_DATA, mode);
329             if (!port) {
330                 continue;
331             }
332 
333             mc_PIPELINE *pl = cq->pipelines[ii];
334             mc_PACKET *pkt = mcreq_allocate_packet(pl);
335             protocol_binary_request_header hdr;
336             memset(&hdr, 0, sizeof(hdr));
337 
338             if (!pkt) {
339                 return LCB_CLIENT_ENOMEM;
340             }
341 
342             pkt->u_rdata.exdata = ckwrap;
343             pkt->flags |= MCREQ_F_REQEXT;
344 
345             hdr.request.magic = PROTOCOL_BINARY_REQ;
346             hdr.request.opaque = pkt->opaque;
347             hdr.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
348 
349             mcreq_reserve_header(pl, pkt, MCREQ_PKT_BASESIZE);
350             memcpy(SPAN_BUFFER(&pkt->kh_span), hdr.bytes, sizeof(hdr.bytes));
351             mcreq_sched_add(pl, pkt);
352             ckwrap->remaining++;
353         }
354     }
355 
356     for (int idx = 0; idx < (int)LCBVB_NSERVERS(cfg); idx++) {
357 #define PING_HTTP(SVC, PATH, TMO, CB) \
358             lcb_error_t rc; \
359             struct lcb_http_request_st *htreq; \
360             lcb_CMDHTTP htcmd = {0}; \
361             char buf[1024] = {0}; \
362             unsigned port; \
363             port = lcbvb_get_port(cfg, idx, SVC, mode); \
364             if (port) { \
365                 lcb::Authenticator& auth = *instance->settings->auth; \
366                 std::string username = auth.username_for(NULL, NULL, LCBT_SETTING(instance, bucket)); \
367                 std::string password = auth.password_for(NULL, NULL, LCBT_SETTING(instance, bucket)); \
368                 lcbvb_SERVER *srv = LCBVB_GET_SERVER(cfg, idx); \
369                 bool ipv6 = strchr(srv->hostname, ':'); \
370                 snprintf(buf, sizeof(buf), "%s://%s%s%s:%d%s", (mode == LCBVB_SVCMODE_PLAIN) ? "http" : "https", \
371                          ipv6 ? "[" : "", srv->hostname, ipv6 ? "]" : "", port, PATH); \
372                 htcmd.host = buf; \
373                 htcmd.method = LCB_HTTP_METHOD_GET; \
374                 htcmd.type = LCB_HTTP_TYPE_PING; \
375                 htcmd.reqhandle = &htreq; \
376                 htcmd.username = username.c_str(); \
377                 htcmd.password = password.c_str(); \
378                 htcmd.cmdflags = LCB_CMDHTTP_F_CASTMO; \
379                 htcmd.cas = LCBT_SETTING(instance, TMO); \
380                 rc = lcb_http3(instance, ckwrap, &htcmd); \
381                 if (rc == LCB_SUCCESS) { \
382                     htreq->set_callback(CB); \
383                     ckwrap->remaining++; \
384                 } \
385             }
386 
387         if (cmd->services & LCB_PINGSVC_F_N1QL) {
388             PING_HTTP(LCBVB_SVCTYPE_N1QL, "/admin/ping", n1ql_timeout, handle_n1ql);
389         }
390         if (cmd->services & LCB_PINGSVC_F_VIEWS) {
391             PING_HTTP(LCBVB_SVCTYPE_VIEWS, "/", views_timeout, handle_views);
392         }
393         if (cmd->services & LCB_PINGSVC_F_FTS) {
394             PING_HTTP(LCBVB_SVCTYPE_FTS, "/api/ping", http_timeout, handle_fts);
395         }
396         if (cmd->services & LCB_PINGSVC_F_ANALYTICS) {
397             PING_HTTP(LCBVB_SVCTYPE_ANALYTICS, "/admin/ping", n1ql_timeout, handle_analytics);
398         }
399 #undef PING_HTTP
400     }
401 
402     if (ckwrap->remaining == 0) {
403         delete ckwrap;
404         return LCB_NO_MATCHING_SERVER;
405     }
406     MAYBE_SCHEDLEAVE(instance);
407     return LCB_SUCCESS;
408 }
409 
410 
411 LIBCOUCHBASE_API
412 lcb_error_t
lcb_diag(lcb_t instance,const void * cookie,const lcb_CMDDIAG * cmd)413 lcb_diag(lcb_t instance, const void *cookie, const lcb_CMDDIAG *cmd)
414 {
415     Json::Value root;
416     hrtime_t now = LCB_NS2US(gethrtime());
417 
418     root["version"] = 1;
419 
420     std::string sdk("libcouchbase/" LCB_VERSION_STRING);
421     if (LCBT_SETTING(instance, client_string)) {
422         sdk.append(" ").append(LCBT_SETTING(instance, client_string));
423     }
424     root["sdk"] = sdk.c_str();
425     {
426         char id[20] = {0};
427         snprintf(id, sizeof(id), "%p", (void *)instance);
428         std::string idstr(id);
429         if (cmd->id) {
430             idstr.append("/").append(cmd->id);
431         }
432         root["id"] = idstr;
433     }
434 
435     size_t ii;
436     Json::Value kv;
437     for (ii = 0; ii < instance->cmdq.npipelines; ii++) {
438         lcb::Server *server = static_cast<lcb::Server*>(instance->cmdq.pipelines[ii]);
439         lcbio_CTX *ctx = server->connctx;
440         if (ctx) {
441             Json::Value endpoint;
442             char id[20] = {0};
443             snprintf(id, sizeof(id), "%016" PRIx64, ctx->sock ? ctx->sock->id : (lcb_U64)0);
444             endpoint["id"] = id;
445             if (server->curhost->ipv6) {
446                 endpoint["remote"] = "[" + std::string(server->curhost->host) + "]:" + std::string(server->curhost->port);
447             } else {
448                 endpoint["remote"] = std::string(server->curhost->host) + ":" + std::string(server->curhost->port);
449             }
450             endpoint["local"] = ctx->sock->info->ep_local;
451             endpoint["last_activity_us"] = (Json::Value::UInt64)(now > ctx->sock->atime ? now - ctx->sock->atime : 0);
452             endpoint["status"] = "connected";
453             root[lcbio_svcstr(ctx->sock->service)].append(endpoint);
454         }
455     }
456     instance->memd_sockpool->toJSON(now, root);
457     instance->http_sockpool->toJSON(now, root);
458     {
459         Json::Value cur;
460         lcb_ASPEND_SETTYPE::iterator it;
461         lcb_ASPEND_SETTYPE *pendq;
462         if ((pendq = instance->pendops.items[LCB_PENDTYPE_HTTP])) {
463             for (it = pendq->begin(); it != pendq->end(); ++it) {
464                 lcb::http::Request *htreq = reinterpret_cast<lcb::http::Request*>(*it);
465                 lcbio_CTX *ctx = htreq->ioctx;
466                 if (ctx) {
467                     Json::Value endpoint;
468                     char id[20] = {0};
469                     snprintf(id, sizeof(id), "%016" PRIx64, ctx->sock ? ctx->sock->id : (lcb_U64)0);
470                     endpoint["id"] = id;
471                     if (htreq->ipv6) {
472                         endpoint["remote"] = "[" + std::string(htreq->host) + "]:" + std::string(htreq->port);
473                     } else {
474                         endpoint["remote"] = std::string(htreq->host) + ":" + std::string(htreq->port);
475                     }
476                     endpoint["local"] = ctx->sock->info->ep_local;
477                     endpoint["last_activity_us"] = (Json::Value::UInt64)(now > ctx->sock->atime ? now - ctx->sock->atime : 0);
478                     endpoint["status"] = "connected";
479                     root[lcbio_svcstr(ctx->sock->service)].append(endpoint);
480                 }
481             }
482         }
483     }
484 
485     Json::Writer *w;
486     if (cmd->options & LCB_PINGOPT_F_JSONPRETTY) {
487         w = new Json::StyledWriter();
488     } else {
489         w = new Json::FastWriter();
490     }
491     std::string json = w->write(root);
492     delete w;
493 
494     lcb_RESPDIAG resp = {0};
495     lcb_RESPCALLBACK callback;
496 
497     resp.njson = json.size();
498     resp.json = json.c_str();
499 
500     callback = lcb_find_callback(instance, LCB_CALLBACK_DIAG);
501     resp.cookie = const_cast<void*>(cookie);
502     callback(instance, LCB_CALLBACK_DIAG, (lcb_RESPBASE *)&resp);
503 
504     return LCB_SUCCESS;
505 }
506