1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2017 Couchbase, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "internal.h"
19 #include <libcouchbase/n1ql.h>
20 #include "http/http.h"
21 #include "auth-priv.h"
22
23 static void refcnt_dtor_ping(mc_PACKET *);
24 static void handle_ping(mc_PIPELINE *, mc_PACKET *, lcb_error_t, const void *);
25
26 static mc_REQDATAPROCS ping_procs = {
27 handle_ping,
28 refcnt_dtor_ping
29 };
30
31 struct PingCookie : mc_REQDATAEX {
32 int remaining;
33 int options;
34 std::list<lcb_PINGSVC> responses;
35 std::string id;
36
PingCookiePingCookie37 PingCookie(const void *cookie_, int _options)
38 : mc_REQDATAEX(cookie_, ping_procs, gethrtime()),
39 remaining(0), options(_options) {
40 }
41
~PingCookiePingCookie42 ~PingCookie() {
43 for (std::list<lcb_PINGSVC>::iterator it = responses.begin(); it != responses.end(); it++) {
44 if (it->server) {
45 free((void *)it->server);
46 it->server = NULL;
47 free((void *)it->local);
48 it->local = NULL;
49 free((void *)it->id);
50 it->id = NULL;
51 }
52 }
53 }
54
needMetricsPingCookie55 bool needMetrics() {
56 return (options & LCB_PINGOPT_F_NOMETRICS) == 0;
57 }
58
needJSONPingCookie59 bool needJSON() {
60 return options & LCB_PINGOPT_F_JSON;
61 }
62
needDetailsPingCookie63 bool needDetails() {
64 return options & LCB_PINGOPT_F_JSONDETAILS;
65 }
66
needPrettyPingCookie67 bool needPretty() {
68 return options & LCB_PINGOPT_F_JSONPRETTY;
69 }
70 };
71
72 static void
refcnt_dtor_ping(mc_PACKET * pkt)73 refcnt_dtor_ping(mc_PACKET *pkt)
74 {
75 PingCookie *ck = static_cast<PingCookie *>(pkt->u_rdata.exdata);
76 if (!--ck->remaining) {
77 delete ck;
78 }
79 }
80
svc_to_string(lcb_PINGSVCTYPE type)81 static const char* svc_to_string(lcb_PINGSVCTYPE type)
82 {
83 switch (type) {
84 case LCB_PINGSVC_KV:
85 return "kv";
86 case LCB_PINGSVC_VIEWS:
87 return "views";
88 case LCB_PINGSVC_N1QL:
89 return "n1ql";
90 case LCB_PINGSVC_ANALYTICS:
91 return "analytics";
92 case LCB_PINGSVC_FTS:
93 return "fts";
94 default:
95 return "unknown";
96 }
97 }
98
99 static void
build_ping_json(lcb_t instance,lcb_RESPPING & ping,Json::Value & root,PingCookie * ck)100 build_ping_json(lcb_t instance, lcb_RESPPING &ping, Json::Value &root, PingCookie *ck)
101 {
102 Json::Value services;
103 for (size_t ii = 0; ii < ping.nservices; ii++) {
104 lcb_PINGSVC &svc = ping.services[ii];
105 Json::Value service;
106 service["remote"] = svc.server;
107 if (svc.local) {
108 service["local"] = svc.local;
109 }
110 if (svc.id) {
111 service["id"] = svc.id;
112 }
113 if (svc.scope) {
114 service["scope"] = svc.scope;
115 }
116
117 service["latency_us"] = (Json::Value::UInt64)LCB_NS2US(svc.latency);
118 switch (svc.status) {
119 case LCB_PINGSTATUS_OK:
120 service["status"] = "ok";
121 break;
122 case LCB_PINGSTATUS_TIMEOUT:
123 service["status"] = "timeout";
124 break;
125 default:
126 service["status"] = "error";
127 if (ck->needDetails()) {
128 service["details"] = lcb_strerror_long(svc.rc);
129 }
130 }
131 services[svc_to_string(svc.type)].append(service);
132 }
133 root["services"] = services;
134 root["version"] = 1;
135
136 std::string sdk("libcouchbase/" LCB_VERSION_STRING);
137 if (LCBT_SETTING(instance, client_string)) {
138 sdk.append(" ").append(LCBT_SETTING(instance, client_string));
139 }
140 root["sdk"] = sdk.c_str();
141 root["id"] = ck->id;
142
143 int config_rev = -1;
144 if (instance->cur_configinfo) {
145 lcb::clconfig::ConfigInfo *cfg = instance->cur_configinfo;
146 config_rev = cfg->vbc->revid;
147 }
148 root["config_rev"] = config_rev;
149 }
150
151 static void
invoke_ping_callback(lcb_t instance,PingCookie * ck)152 invoke_ping_callback(lcb_t instance, PingCookie *ck)
153 {
154 lcb_RESPPING ping;
155 std::string json;
156 size_t idx = 0;
157 memset(&ping, 0, sizeof(ping));
158 if (ck->needMetrics()) {
159 ping.nservices = ck->responses.size();
160 ping.services = new lcb_PINGSVC[ping.nservices];
161 for(std::list<lcb_PINGSVC>::const_iterator it = ck->responses.begin(); it != ck->responses.end(); ++it){
162 ping.services[idx++] = *it;
163 }
164 if (ck->needJSON()) {
165 Json::Value root;
166 build_ping_json(instance, ping, root, ck);
167 Json::Writer *w;
168 if (ck->needPretty()) {
169 w = new Json::StyledWriter();
170 } else {
171 w = new Json::FastWriter();
172 }
173 json = w->write(root);
174 delete w;
175 ping.njson = json.size();
176 ping.json = json.c_str();
177 }
178 }
179 lcb_RESPCALLBACK callback;
180 callback = lcb_find_callback(instance, LCB_CALLBACK_PING);
181 ping.cookie = const_cast<void*>(ck->cookie);
182 callback(instance, LCB_CALLBACK_PING, (lcb_RESPBASE *)&ping);
183 if (ping.services != NULL) {
184 delete []ping.services;
185 }
186 delete ck;
187 }
188
189 static void
handle_ping(mc_PIPELINE * pipeline,mc_PACKET * req,lcb_error_t err,const void *)190 handle_ping(mc_PIPELINE *pipeline, mc_PACKET *req, lcb_error_t err, const void *)
191 {
192 lcb::Server *server = static_cast<lcb::Server*>(pipeline);
193 PingCookie *ck = (PingCookie *)req->u_rdata.exdata;
194
195 if (ck->needMetrics()) {
196 const lcb_host_t &remote = server->get_host();
197 std::string hh;
198 if (remote.ipv6) {
199 hh.append("[").append(remote.host).append("]:").append(remote.port);
200 } else {
201 hh.append(remote.host).append(":").append(remote.port);
202 }
203 lcb_PINGSVC svc = {};
204 svc.type = LCB_PINGSVC_KV;
205 svc.server = strdup(hh.c_str());
206 svc.latency = gethrtime() - MCREQ_PKT_RDATA(req)->start;
207 svc.rc = err;
208 switch (err) {
209 case LCB_ETIMEDOUT:
210 svc.status = LCB_PINGSTATUS_TIMEOUT;
211 break;
212 case LCB_SUCCESS:
213 svc.status = LCB_PINGSTATUS_OK;
214 break;
215 default:
216 svc.status = LCB_PINGSTATUS_ERROR;
217 break;
218 }
219 lcbio_CTX *ctx = server->connctx;
220 if (ctx) {
221 char id[20] = {0};
222 svc.local = strdup(ctx->sock->info->ep_local);
223 snprintf(id, sizeof(id), "%p", (void *)ctx->sock);
224 svc.id = strdup(id);
225 }
226 svc.scope = server->get_instance()->get_bucketname();
227
228 ck->responses.push_back(svc);
229 }
230
231 if (--ck->remaining) {
232 return;
233 }
234 invoke_ping_callback(server->get_instance(), ck);
235 }
236
handle_http(lcb_t instance,lcb_PINGSVCTYPE type,const lcb_RESPHTTP * resp)237 static void handle_http(lcb_t instance, lcb_PINGSVCTYPE type, const lcb_RESPHTTP *resp)
238 {
239 if ((resp->rflags & LCB_RESP_F_FINAL) == 0) {
240 return;
241 }
242 PingCookie *ck = (PingCookie *)resp->cookie;
243 lcb::http::Request *htreq = reinterpret_cast<lcb::http::Request*>(resp->_htreq);
244
245 if (ck->needMetrics()) {
246 lcb_PINGSVC svc = {};
247 svc.type = type;
248 std::string hh;
249 if (htreq->ipv6) {
250 hh = "[" + std::string(htreq->host) + "]:" + std::string(htreq->port);
251 } else {
252 hh = std::string(htreq->host) + ":" + std::string(htreq->port);
253 }
254 svc.server = strdup(hh.c_str());
255 svc.latency = gethrtime() - htreq->start;
256 svc.rc = resp->rc;
257 switch (resp->rc) {
258 case LCB_ETIMEDOUT:
259 svc.status = LCB_PINGSTATUS_TIMEOUT;
260 break;
261 case LCB_SUCCESS:
262 svc.status = LCB_PINGSTATUS_OK;
263 break;
264 default:
265 svc.status = LCB_PINGSTATUS_ERROR;
266 break;
267 }
268 lcbio_CTX *ctx = htreq->ioctx;
269 if (ctx) {
270 char id[20] = {0};
271 snprintf(id, sizeof(id), "%p", (void *)ctx->sock);
272 svc.id = strdup(id);
273 svc.local = strdup(ctx->sock->info->ep_local);
274 }
275 ck->responses.push_back(svc);
276 }
277 if (--ck->remaining) {
278 return;
279 }
280 invoke_ping_callback(instance, ck);
281 }
282
handle_n1ql(lcb_t instance,int,const lcb_RESPBASE * resp)283 static void handle_n1ql(lcb_t instance, int, const lcb_RESPBASE *resp)
284 {
285 handle_http(instance, LCB_PINGSVC_N1QL, (const lcb_RESPHTTP *)resp);
286 }
287
handle_analytics(lcb_t instance,int,const lcb_RESPBASE * resp)288 static void handle_analytics(lcb_t instance, int, const lcb_RESPBASE *resp)
289 {
290 handle_http(instance, LCB_PINGSVC_ANALYTICS, (const lcb_RESPHTTP *)resp);
291 }
292
handle_views(lcb_t instance,int,const lcb_RESPBASE * resp)293 static void handle_views(lcb_t instance, int, const lcb_RESPBASE *resp)
294 {
295 handle_http(instance, LCB_PINGSVC_VIEWS, (const lcb_RESPHTTP *)resp);
296 }
297
handle_fts(lcb_t instance,int,const lcb_RESPBASE * resp)298 static void handle_fts(lcb_t instance, int, const lcb_RESPBASE *resp)
299 {
300 handle_http(instance, LCB_PINGSVC_FTS, (const lcb_RESPHTTP *)resp);
301 }
302
303 LIBCOUCHBASE_API
304 lcb_error_t
lcb_ping3(lcb_t instance,const void * cookie,const lcb_CMDPING * cmd)305 lcb_ping3(lcb_t instance, const void *cookie, const lcb_CMDPING *cmd)
306 {
307 mc_CMDQUEUE *cq = &instance->cmdq;
308 unsigned ii;
309
310 if (!cq->config) {
311 return LCB_CLIENT_ETMPFAIL;
312 }
313
314 PingCookie *ckwrap = new PingCookie(cookie, cmd->options);
315 {
316 char id[20] = {0};
317 snprintf(id, sizeof(id), "%p", (void *)instance);
318 ckwrap->id = id;
319 if (cmd->id) {
320 ckwrap->id.append("/").append(cmd->id);
321 }
322 }
323
324 lcbvb_CONFIG *cfg = LCBT_VBCONFIG(instance);
325 const lcbvb_SVCMODE mode = LCBT_SETTING_SVCMODE(instance);
326 if (cmd->services & LCB_PINGSVC_F_KV) {
327 for (ii = 0; ii < cq->npipelines; ii++) {
328 unsigned port = lcbvb_get_port(cfg, ii, LCBVB_SVCTYPE_DATA, mode);
329 if (!port) {
330 continue;
331 }
332
333 mc_PIPELINE *pl = cq->pipelines[ii];
334 mc_PACKET *pkt = mcreq_allocate_packet(pl);
335 protocol_binary_request_header hdr;
336 memset(&hdr, 0, sizeof(hdr));
337
338 if (!pkt) {
339 return LCB_CLIENT_ENOMEM;
340 }
341
342 pkt->u_rdata.exdata = ckwrap;
343 pkt->flags |= MCREQ_F_REQEXT;
344
345 hdr.request.magic = PROTOCOL_BINARY_REQ;
346 hdr.request.opaque = pkt->opaque;
347 hdr.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
348
349 mcreq_reserve_header(pl, pkt, MCREQ_PKT_BASESIZE);
350 memcpy(SPAN_BUFFER(&pkt->kh_span), hdr.bytes, sizeof(hdr.bytes));
351 mcreq_sched_add(pl, pkt);
352 ckwrap->remaining++;
353 }
354 }
355
356 for (int idx = 0; idx < (int)LCBVB_NSERVERS(cfg); idx++) {
357 #define PING_HTTP(SVC, PATH, TMO, CB) \
358 lcb_error_t rc; \
359 struct lcb_http_request_st *htreq; \
360 lcb_CMDHTTP htcmd = {0}; \
361 char buf[1024] = {0}; \
362 unsigned port; \
363 port = lcbvb_get_port(cfg, idx, SVC, mode); \
364 if (port) { \
365 lcb::Authenticator& auth = *instance->settings->auth; \
366 std::string username = auth.username_for(NULL, NULL, LCBT_SETTING(instance, bucket)); \
367 std::string password = auth.password_for(NULL, NULL, LCBT_SETTING(instance, bucket)); \
368 lcbvb_SERVER *srv = LCBVB_GET_SERVER(cfg, idx); \
369 bool ipv6 = strchr(srv->hostname, ':'); \
370 snprintf(buf, sizeof(buf), "%s://%s%s%s:%d%s", (mode == LCBVB_SVCMODE_PLAIN) ? "http" : "https", \
371 ipv6 ? "[" : "", srv->hostname, ipv6 ? "]" : "", port, PATH); \
372 htcmd.host = buf; \
373 htcmd.method = LCB_HTTP_METHOD_GET; \
374 htcmd.type = LCB_HTTP_TYPE_PING; \
375 htcmd.reqhandle = &htreq; \
376 htcmd.username = username.c_str(); \
377 htcmd.password = password.c_str(); \
378 htcmd.cmdflags = LCB_CMDHTTP_F_CASTMO; \
379 htcmd.cas = LCBT_SETTING(instance, TMO); \
380 rc = lcb_http3(instance, ckwrap, &htcmd); \
381 if (rc == LCB_SUCCESS) { \
382 htreq->set_callback(CB); \
383 ckwrap->remaining++; \
384 } \
385 }
386
387 if (cmd->services & LCB_PINGSVC_F_N1QL) {
388 PING_HTTP(LCBVB_SVCTYPE_N1QL, "/admin/ping", n1ql_timeout, handle_n1ql);
389 }
390 if (cmd->services & LCB_PINGSVC_F_VIEWS) {
391 PING_HTTP(LCBVB_SVCTYPE_VIEWS, "/", views_timeout, handle_views);
392 }
393 if (cmd->services & LCB_PINGSVC_F_FTS) {
394 PING_HTTP(LCBVB_SVCTYPE_FTS, "/api/ping", http_timeout, handle_fts);
395 }
396 if (cmd->services & LCB_PINGSVC_F_ANALYTICS) {
397 PING_HTTP(LCBVB_SVCTYPE_ANALYTICS, "/admin/ping", n1ql_timeout, handle_analytics);
398 }
399 #undef PING_HTTP
400 }
401
402 if (ckwrap->remaining == 0) {
403 delete ckwrap;
404 return LCB_NO_MATCHING_SERVER;
405 }
406 MAYBE_SCHEDLEAVE(instance);
407 return LCB_SUCCESS;
408 }
409
410
411 LIBCOUCHBASE_API
412 lcb_error_t
lcb_diag(lcb_t instance,const void * cookie,const lcb_CMDDIAG * cmd)413 lcb_diag(lcb_t instance, const void *cookie, const lcb_CMDDIAG *cmd)
414 {
415 Json::Value root;
416 hrtime_t now = LCB_NS2US(gethrtime());
417
418 root["version"] = 1;
419
420 std::string sdk("libcouchbase/" LCB_VERSION_STRING);
421 if (LCBT_SETTING(instance, client_string)) {
422 sdk.append(" ").append(LCBT_SETTING(instance, client_string));
423 }
424 root["sdk"] = sdk.c_str();
425 {
426 char id[20] = {0};
427 snprintf(id, sizeof(id), "%p", (void *)instance);
428 std::string idstr(id);
429 if (cmd->id) {
430 idstr.append("/").append(cmd->id);
431 }
432 root["id"] = idstr;
433 }
434
435 size_t ii;
436 Json::Value kv;
437 for (ii = 0; ii < instance->cmdq.npipelines; ii++) {
438 lcb::Server *server = static_cast<lcb::Server*>(instance->cmdq.pipelines[ii]);
439 lcbio_CTX *ctx = server->connctx;
440 if (ctx) {
441 Json::Value endpoint;
442 char id[20] = {0};
443 snprintf(id, sizeof(id), "%016" PRIx64, ctx->sock ? ctx->sock->id : (lcb_U64)0);
444 endpoint["id"] = id;
445 if (server->curhost->ipv6) {
446 endpoint["remote"] = "[" + std::string(server->curhost->host) + "]:" + std::string(server->curhost->port);
447 } else {
448 endpoint["remote"] = std::string(server->curhost->host) + ":" + std::string(server->curhost->port);
449 }
450 endpoint["local"] = ctx->sock->info->ep_local;
451 endpoint["last_activity_us"] = (Json::Value::UInt64)(now > ctx->sock->atime ? now - ctx->sock->atime : 0);
452 endpoint["status"] = "connected";
453 root[lcbio_svcstr(ctx->sock->service)].append(endpoint);
454 }
455 }
456 instance->memd_sockpool->toJSON(now, root);
457 instance->http_sockpool->toJSON(now, root);
458 {
459 Json::Value cur;
460 lcb_ASPEND_SETTYPE::iterator it;
461 lcb_ASPEND_SETTYPE *pendq;
462 if ((pendq = instance->pendops.items[LCB_PENDTYPE_HTTP])) {
463 for (it = pendq->begin(); it != pendq->end(); ++it) {
464 lcb::http::Request *htreq = reinterpret_cast<lcb::http::Request*>(*it);
465 lcbio_CTX *ctx = htreq->ioctx;
466 if (ctx) {
467 Json::Value endpoint;
468 char id[20] = {0};
469 snprintf(id, sizeof(id), "%016" PRIx64, ctx->sock ? ctx->sock->id : (lcb_U64)0);
470 endpoint["id"] = id;
471 if (htreq->ipv6) {
472 endpoint["remote"] = "[" + std::string(htreq->host) + "]:" + std::string(htreq->port);
473 } else {
474 endpoint["remote"] = std::string(htreq->host) + ":" + std::string(htreq->port);
475 }
476 endpoint["local"] = ctx->sock->info->ep_local;
477 endpoint["last_activity_us"] = (Json::Value::UInt64)(now > ctx->sock->atime ? now - ctx->sock->atime : 0);
478 endpoint["status"] = "connected";
479 root[lcbio_svcstr(ctx->sock->service)].append(endpoint);
480 }
481 }
482 }
483 }
484
485 Json::Writer *w;
486 if (cmd->options & LCB_PINGOPT_F_JSONPRETTY) {
487 w = new Json::StyledWriter();
488 } else {
489 w = new Json::FastWriter();
490 }
491 std::string json = w->write(root);
492 delete w;
493
494 lcb_RESPDIAG resp = {0};
495 lcb_RESPCALLBACK callback;
496
497 resp.njson = json.size();
498 resp.json = json.c_str();
499
500 callback = lcb_find_callback(instance, LCB_CALLBACK_DIAG);
501 resp.cookie = const_cast<void*>(cookie);
502 callback(instance, LCB_CALLBACK_DIAG, (lcb_RESPBASE *)&resp);
503
504 return LCB_SUCCESS;
505 }
506