1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2018 Couchbase, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include <libcouchbase/couchbase.h>
19 #include <libcouchbase/analytics.h>
20 #include <jsparse/parser.h>
21 #include "internal.h"
22 #include "auth-priv.h"
23 #include "http/http.h"
24 #include "logging.h"
25 #include "contrib/lcb-jsoncpp/lcb-jsoncpp.h"
26 #include <map>
27 #include <string>
28 #include <list>
29 #include "docreq/docreq.h"
30 #include "rnd.h"
31
/* Log-line prefix identifying the analytics request a message belongs to. */
#define LOGFMT "(NR=%p) "
/* Argument matching LOGFMT: the request pointer, typed for "%p". */
#define LOGID(req) static_cast< const void * >(req)
/*
 * Leading arguments of lcb_log() for a request: the settings of the request's
 * instance, the "analytics" subsystem tag, the LCB_LOG_<lvl> severity and the
 * call site. `req` is parenthesized so that any pointer-valued expression
 * (not only a plain identifier) can be passed.
 */
#define LOGARGS(req, lvl) (req)->instance->settings, "analytics", LCB_LOG_##lvl, __FILE__, __LINE__
35
36 using namespace lcb;
37
default_id_generator(lcb_t,const void *,lcb_ANALYTICSINGESTIDGENERATORPARAM * param)38 static lcb_ANALYTICSINGESTSTATUS default_id_generator(lcb_t, const void *, lcb_ANALYTICSINGESTIDGENERATORPARAM *param)
39 {
40 param->id = static_cast<char *>(calloc(34, sizeof(char)));
41 param->id_free = free;
42 param->nid = snprintf(param->id, 34, "%016" PRIx64 "-%016" PRIx64, lcb_next_rand64(), lcb_next_rand64());
43 return LCB_ANALYTICSINGEST_OK;
44 }
45
default_data_converter(lcb_t,const void *,lcb_ANALYTICSINGESTDATACONVERTERPARAM *)46 static lcb_ANALYTICSINGESTSTATUS default_data_converter(lcb_t, const void *, lcb_ANALYTICSINGESTDATACONVERTERPARAM *)
47 {
48 return LCB_ANALYTICSINGEST_OK;
49 }
50
51 struct lcb_ANALYTICSINGEST_st {
52 lcb_ANALYTICSINGESTMETHOD method;
53 uint32_t exptime;
54 bool ignore_errors;
55 lcb_ANALYTICSINGESTIDGENERATOR id_generator;
56 lcb_ANALYTICSINGESTDATACONVERTER data_converter;
57
lcb_ANALYTICSINGEST_stlcb_ANALYTICSINGEST_st58 lcb_ANALYTICSINGEST_st()
59 : method(LCB_ANALYTICSINGEST_NONE), exptime(0), ignore_errors(false), id_generator(default_id_generator),
60 data_converter(default_data_converter)
61 {
62 }
63 };
64
65 struct lcb_ANALYTICSREQ;
66 struct IngestRequest : docreq::DocRequest {
67 lcb_ANALYTICSREQ *parent;
68 std::string row;
69 };
70
71 struct lcb_CMDANALYTICS_st {
72 Json::Value root;
73 std::string encoded;
74 lcb_ANALYTICSCALLBACK callback;
75 lcb_ANALYTICSHANDLE handle;
76 lcb_ANALYTICSINGEST_st ingest;
77
lcb_CMDANALYTICS_stlcb_CMDANALYTICS_st78 lcb_CMDANALYTICS_st() : root(Json::objectValue), callback(NULL), handle(NULL) {}
79
encodelcb_CMDANALYTICS_st80 bool encode()
81 {
82 encoded = Json::FastWriter().write(root);
83 return true;
84 }
85 };
86
87 LIBCOUCHBASE_API
lcb_analytics_new(void)88 lcb_CMDANALYTICS *lcb_analytics_new(void)
89 {
90 return new lcb_CMDANALYTICS;
91 }
92
93 LIBCOUCHBASE_API
lcb_analytics_reset(lcb_CMDANALYTICS * cmd)94 void lcb_analytics_reset(lcb_CMDANALYTICS *cmd)
95 {
96 cmd->encoded.clear();
97 cmd->root.clear();
98 }
99
100 LIBCOUCHBASE_API
lcb_analytics_free(lcb_CMDANALYTICS * cmd)101 void lcb_analytics_free(lcb_CMDANALYTICS *cmd)
102 {
103 delete cmd;
104 }
105
106 LIBCOUCHBASE_API
lcb_analytics_gethandle(lcb_CMDANALYTICS * cmd)107 lcb_ANALYTICSHANDLE lcb_analytics_gethandle(lcb_CMDANALYTICS *cmd)
108 {
109 return cmd->handle;
110 }
111
112 LIBCOUCHBASE_API
lcb_analytics_setcallback(lcb_CMDANALYTICS * cmd,lcb_ANALYTICSCALLBACK callback)113 lcb_error_t lcb_analytics_setcallback(lcb_CMDANALYTICS *cmd, lcb_ANALYTICSCALLBACK callback)
114 {
115 if (cmd) {
116 cmd->callback = callback;
117 return LCB_SUCCESS;
118 }
119 return LCB_EINVAL;
120 }
121
/*
 * Replaces the length `n` with strlen(s) when the caller passed the
 * "NUL-terminated" marker (size_t)-1.
 *
 * Wrapped in do { } while (0) so that `fix_strlen(s, n);` is a single
 * statement and stays correct inside an unbraced if/else; the arguments are
 * parenthesized so expressions can be passed safely.
 */
#define fix_strlen(s, n)                                                                                               \
    do {                                                                                                               \
        if ((n) == (size_t)-1) {                                                                                       \
            (n) = strlen(s);                                                                                           \
        }                                                                                                              \
    } while (0)
126
127 LIBCOUCHBASE_API
lcb_analytics_setquery(lcb_CMDANALYTICS * cmd,const char * qstr,size_t nqstr)128 lcb_error_t lcb_analytics_setquery(lcb_CMDANALYTICS *cmd, const char *qstr, size_t nqstr)
129 {
130 fix_strlen(qstr, nqstr);
131 Json::Value value;
132 if (!Json::Reader().parse(qstr, qstr + nqstr, value)) {
133 return LCB_EINVAL;
134 }
135 cmd->root = value;
136 return LCB_SUCCESS;
137 }
138
139 LIBCOUCHBASE_API
lcb_analytics_setopt(lcb_CMDANALYTICS * cmd,const char * k,size_t nk,const char * v,size_t nv)140 lcb_error_t lcb_analytics_setopt(lcb_CMDANALYTICS *cmd, const char *k, size_t nk, const char *v, size_t nv)
141 {
142 fix_strlen(v, nv);
143 fix_strlen(k, nk);
144 Json::Value value;
145 if (!Json::Reader().parse(v, v + nv, value)) {
146 return LCB_EINVAL;
147 }
148 cmd->root[std::string(k, nk)] = value;
149 return LCB_SUCCESS;
150 }
151
152 LIBCOUCHBASE_API
lcb_analytics_setstatement(lcb_CMDANALYTICS * cmd,const char * sstr,size_t nsstr)153 lcb_error_t lcb_analytics_setstatement(lcb_CMDANALYTICS *cmd, const char *sstr, size_t nsstr)
154 {
155 fix_strlen(sstr, nsstr);
156 cmd->root["statement"] = std::string(sstr, nsstr);
157 return LCB_SUCCESS;
158 }
159
160 LIBCOUCHBASE_API
lcb_analytics_namedparam(lcb_CMDANALYTICS * cmd,const char * name,size_t nname,const char * value,size_t nvalue)161 lcb_error_t lcb_analytics_namedparam(lcb_CMDANALYTICS *cmd, const char *name, size_t nname, const char *value,
162 size_t nvalue)
163 {
164 return lcb_analytics_setopt(cmd, name, nname, value, nvalue);
165 }
166
167 LIBCOUCHBASE_API
lcb_analytics_posparam(lcb_CMDANALYTICS * cmd,const char * value,size_t nvalue)168 lcb_error_t lcb_analytics_posparam(lcb_CMDANALYTICS *cmd, const char *value, size_t nvalue)
169 {
170 fix_strlen(value, nvalue);
171 Json::Value jval;
172 if (!Json::Reader().parse(value, value + nvalue, jval)) {
173 return LCB_EINVAL;
174 }
175 cmd->root["args"].append(jval);
176 return LCB_SUCCESS;
177 }
178
179 LIBCOUCHBASE_API
lcb_analytics_ingest_setmethod(lcb_CMDANALYTICS * cmd,lcb_ANALYTICSINGESTMETHOD method)180 lcb_error_t lcb_analytics_ingest_setmethod(lcb_CMDANALYTICS *cmd, lcb_ANALYTICSINGESTMETHOD method)
181 {
182 cmd->ingest.method = method;
183 return LCB_SUCCESS;
184 }
185
186 LIBCOUCHBASE_API
lcb_analytics_ingest_setexptime(lcb_CMDANALYTICS * cmd,lcb_U32 exptime)187 lcb_error_t lcb_analytics_ingest_setexptime(lcb_CMDANALYTICS *cmd, lcb_U32 exptime)
188 {
189 cmd->ingest.exptime = exptime;
190 return LCB_SUCCESS;
191 }
192
193 LIBCOUCHBASE_API
lcb_analytics_ingest_ignoreingesterror(lcb_CMDANALYTICS * cmd,int ignore)194 lcb_error_t lcb_analytics_ingest_ignoreingesterror(lcb_CMDANALYTICS *cmd, int ignore)
195 {
196 cmd->ingest.ignore_errors = ignore ? true : false;
197 return LCB_SUCCESS;
198 }
199
200 LIBCOUCHBASE_API
lcb_analytics_ingest_setidgenerator(lcb_CMDANALYTICS * cmd,lcb_ANALYTICSINGESTIDGENERATOR generator)201 lcb_error_t lcb_analytics_ingest_setidgenerator(lcb_CMDANALYTICS *cmd, lcb_ANALYTICSINGESTIDGENERATOR generator)
202 {
203 cmd->ingest.id_generator = generator;
204 return LCB_SUCCESS;
205 }
206
207 LIBCOUCHBASE_API
lcb_analytics_ingest_setdataconverter(lcb_CMDANALYTICS * cmd,lcb_ANALYTICSINGESTDATACONVERTER converter)208 lcb_error_t lcb_analytics_ingest_setdataconverter(lcb_CMDANALYTICS *cmd, lcb_ANALYTICSINGESTDATACONVERTER converter)
209 {
210 cmd->ingest.data_converter = converter;
211 return LCB_SUCCESS;
212 }
213
214 LIBCOUCHBASE_API
lcb_analytics_setdeferred(lcb_CMDANALYTICS * cmd,int deferred)215 lcb_error_t lcb_analytics_setdeferred(lcb_CMDANALYTICS *cmd, int deferred)
216 {
217 if (deferred) {
218 cmd->root["mode"] = std::string("async");
219 } else {
220 cmd->root.removeMember("mode");
221 }
222 return LCB_SUCCESS;
223 }
224
225 struct lcb_ANALYTICSDEFERREDHANDLE_st {
226 std::string status;
227 std::string handle;
228 lcb_ANALYTICSCALLBACK callback;
229
lcb_ANALYTICSDEFERREDHANDLE_stlcb_ANALYTICSDEFERREDHANDLE_st230 lcb_ANALYTICSDEFERREDHANDLE_st(std::string status_, std::string handle_) : status(status_), handle(handle_) {}
231 };
232
lcb_analytics_defhnd_extract(const lcb_RESPANALYTICS * resp)233 lcb_ANALYTICSDEFERREDHANDLE *lcb_analytics_defhnd_extract(const lcb_RESPANALYTICS *resp)
234 {
235 if (resp == NULL || resp->rc != LCB_SUCCESS || ((resp->rflags & (LCB_RESP_F_FINAL | LCB_RESP_F_EXTDATA)) == 0) ||
236 resp->nrow == 0 || resp->row == NULL) {
237 return NULL;
238 }
239 Json::Value payload;
240 if (!Json::Reader().parse(resp->row, resp->row + resp->nrow, payload)) {
241 return NULL;
242 }
243 if (!payload.isObject()) {
244 return NULL;
245 }
246 Json::Value status = payload["status"];
247 Json::Value handle = payload["handle"];
248 if (status.isString() && handle.isString()) {
249 return new lcb_ANALYTICSDEFERREDHANDLE_st(status.asString(), handle.asString());
250 }
251 return NULL;
252 }
253
lcb_analytics_defhnd_free(lcb_ANALYTICSDEFERREDHANDLE * handle)254 void lcb_analytics_defhnd_free(lcb_ANALYTICSDEFERREDHANDLE *handle)
255 {
256 if (handle == NULL) {
257 return;
258 }
259 delete handle;
260 }
261
lcb_analytics_defhnd_status(lcb_ANALYTICSDEFERREDHANDLE * handle)262 const char *lcb_analytics_defhnd_status(lcb_ANALYTICSDEFERREDHANDLE *handle)
263 {
264 if (handle == NULL) {
265 return NULL;
266 }
267 return handle->status.c_str();
268 }
269
lcb_analytics_defhnd_setcallback(lcb_ANALYTICSDEFERREDHANDLE * handle,lcb_ANALYTICSCALLBACK callback)270 lcb_error_t lcb_analytics_defhnd_setcallback(lcb_ANALYTICSDEFERREDHANDLE *handle, lcb_ANALYTICSCALLBACK callback)
271 {
272 if (handle) {
273 handle->callback = callback;
274 return LCB_SUCCESS;
275 }
276 return LCB_EINVAL;
277 }
278
279 typedef struct lcb_ANALYTICSREQ : lcb::jsparse::Parser::Actions {
280 const lcb_RESPHTTP *cur_htresp;
281 struct lcb_http_request_st *htreq;
282 lcb::jsparse::Parser *parser;
283 const void *cookie;
284 lcb_ANALYTICSCALLBACK callback;
285 lcb_t instance;
286 lcb_error_t lasterr;
287 lcb_U32 timeout;
288 // How many rows were received. Used to avoid parsing the meta
289 size_t nrows;
290
291 /** Request body as received from the application */
292 Json::Value json;
json_constlcb_ANALYTICSREQ293 const Json::Value &json_const() const
294 {
295 return json;
296 }
297
298 /** String of the original statement. Cached here to avoid jsoncpp lookups */
299 std::string statement;
300
301 /** Whether we're retrying this */
302 bool was_retried;
303
304 /** Non-empty if this is deferred query check/fetch */
305 std::string deferred_handle;
306
307 lcb_ANALYTICSINGEST_st ingest;
308 docreq::Queue *docq;
309 unsigned refcount;
310
311 #ifdef LCB_TRACING
312 lcbtrace_SPAN *span;
313 #endif
314
unreflcb_ANALYTICSREQ315 void unref()
316 {
317 if (!--refcount) {
318 delete this;
319 }
320 }
321
reflcb_ANALYTICSREQ322 void ref()
323 {
324 refcount++;
325 }
326
327 /**
328 * Issues the HTTP request for the query
329 * @param payload The body to send
330 * @return Error code from lcb's http subsystem
331 */
332 inline lcb_error_t issue_htreq(const std::string &payload);
333
issue_htreqlcb_ANALYTICSREQ334 lcb_error_t issue_htreq()
335 {
336 std::string s = Json::FastWriter().write(json);
337 return issue_htreq(s);
338 }
339
340 /**
341 * Attempt to retry the query. This will inspect the meta (if present)
342 * for any errors indicating that a failure might be a result of a stale
343 * plan, and if this query was retried already.
344 * @return true if the retry was successful.
345 */
346 inline bool maybe_retry();
347
348 /**
349 * Returns true if payload matches retry conditions.
350 */
351 inline bool has_retriable_error(const Json::Value &root);
352
353 /**
354 * Pass a row back to the application
355 * @param resp The response. This is populated with state information
356 * from the current query
357 * @param is_last Whether this is the last row. If this is the last, then
358 * the RESP_F_FINAL flag is set, and no further callbacks will be invoked
359 */
360 inline void invoke_row(lcb_RESPANALYTICS *resp, bool is_last);
361
362 inline lcb_ANALYTICSREQ(lcb_t obj, const void *user_cookie, lcb_CMDANALYTICS *cmd);
363 inline lcb_ANALYTICSREQ(lcb_t obj, const void *user_cookie, lcb_ANALYTICSDEFERREDHANDLE *handle);
364 inline ~lcb_ANALYTICSREQ();
365
366 // Parser overrides:
JSPARSE_on_rowlcb_ANALYTICSREQ367 void JSPARSE_on_row(const lcb::jsparse::Row &row)
368 {
369 lcb_RESPANALYTICS resp = {0};
370 resp.row = static_cast< const char * >(row.row.iov_base);
371 resp.nrow = row.row.iov_len;
372 nrows++;
373 if (ingest.method != LCB_ANALYTICSINGEST_NONE) {
374 IngestRequest *req = new IngestRequest();
375 req->parent = this;
376 req->row.assign(static_cast< const char * >(row.row.iov_base), row.row.iov_len);
377 docq->add(req);
378 ref();
379 }
380 invoke_row(&resp, false);
381 }
JSPARSE_on_errorlcb_ANALYTICSREQ382 void JSPARSE_on_error(const std::string &)
383 {
384 lasterr = LCB_PROTOCOL_ERROR;
385 }
JSPARSE_on_completelcb_ANALYTICSREQ386 void JSPARSE_on_complete(const std::string &)
387 {
388 // Nothing
389 }
390
391 } ANALYTICSREQ;
392
parse_json(const char * s,size_t n,Json::Value & res)393 static bool parse_json(const char *s, size_t n, Json::Value &res)
394 {
395 return Json::Reader().parse(s, s + n, res);
396 }
397
has_retriable_error(const Json::Value & root)398 bool ANALYTICSREQ::has_retriable_error(const Json::Value &root)
399 {
400 if (!root.isObject()) {
401 return false;
402 }
403 const Json::Value &errors = root["errors"];
404 if (!errors.isArray()) {
405 return false;
406 }
407 Json::Value::const_iterator ii;
408 for (ii = errors.begin(); ii != errors.end(); ++ii) {
409 const Json::Value &cur = *ii;
410 if (!cur.isObject()) {
411 continue; // eh?
412 }
413 const Json::Value &jcode = cur["code"];
414 unsigned code = 0;
415 if (jcode.isNumeric()) {
416 code = jcode.asUInt();
417 switch (code) {
418 case 23000:
419 case 23003:
420 case 23007:
421 lcb_log(LOGARGS(this, TRACE), LOGFMT "Will retry request. code: %d", LOGID(this), code);
422 return true;
423 default:
424 break;
425 }
426 }
427 }
428 return false;
429 }
430
maybe_retry()431 bool ANALYTICSREQ::maybe_retry()
432 {
433 // Examines the buffer to determine the type of error
434 Json::Value root;
435 lcb_IOV meta;
436
437 if (callback == NULL) {
438 // Cancelled
439 return false;
440 }
441
442 if (nrows) {
443 // Has results:
444 return false;
445 }
446
447 if (was_retried) {
448 return false;
449 }
450
451 was_retried = true;
452 parser->get_postmortem(meta);
453 if (!parse_json(static_cast< const char * >(meta.iov_base), meta.iov_len, root)) {
454 return false; // Not JSON
455 }
456 if (has_retriable_error(root)) {
457 return true;
458 }
459
460 return false;
461 }
462
invoke_row(lcb_RESPANALYTICS * resp,bool is_last)463 void ANALYTICSREQ::invoke_row(lcb_RESPANALYTICS *resp, bool is_last)
464 {
465 resp->cookie = const_cast< void * >(cookie);
466 resp->htresp = cur_htresp;
467
468 if (is_last) {
469 lcb_IOV meta;
470 resp->rflags |= LCB_RESP_F_FINAL;
471 resp->rc = lasterr;
472 parser->get_postmortem(meta);
473 resp->row = static_cast< const char * >(meta.iov_base);
474 resp->nrow = meta.iov_len;
475 if (!deferred_handle.empty()) {
476 /* signal that response might have deferred handle */
477 resp->rflags |= LCB_RESP_F_EXTDATA;
478 }
479 }
480
481 if (callback) {
482 callback(instance, LCB_CALLBACK_ANALYTICS, resp);
483 }
484 if (is_last) {
485 callback = NULL;
486 }
487 }
488
~lcb_ANALYTICSREQ()489 lcb_ANALYTICSREQ::~lcb_ANALYTICSREQ()
490 {
491 if (htreq) {
492 lcb_cancel_http_request(instance, htreq);
493 htreq = NULL;
494 }
495
496 if (callback) {
497 lcb_RESPANALYTICS resp = {0};
498 invoke_row(&resp, 1);
499 }
500
501 #ifdef LCB_TRACING
502 if (span) {
503 if (htreq) {
504 lcbio_CTX *ctx = htreq->ioctx;
505 if (ctx) {
506 lcbtrace_span_add_tag_str_nocopy(span, LCBTRACE_TAG_PEER_ADDRESS, htreq->peer.c_str());
507 lcbtrace_span_add_tag_str_nocopy(span, LCBTRACE_TAG_LOCAL_ADDRESS, ctx->sock->info->ep_local);
508 }
509 }
510 lcbtrace_span_finish(span, LCBTRACE_NOW);
511 span = NULL;
512 }
513 #endif
514
515 if (parser) {
516 delete parser;
517 }
518
519 if (docq != NULL) {
520 docq->parent = NULL;
521 docq->unref();
522 }
523 lcb_aspend_del(&instance->pendops, LCB_PENDTYPE_COUNTER, NULL);
524 }
525
chunk_callback(lcb_t instance,int ign,const lcb_RESPBASE * rb)526 static void chunk_callback(lcb_t instance, int ign, const lcb_RESPBASE *rb)
527 {
528 const lcb_RESPHTTP *rh = (const lcb_RESPHTTP *)rb;
529 ANALYTICSREQ *req = static_cast< ANALYTICSREQ * >(rh->cookie);
530
531 (void)ign;
532 (void)instance;
533
534 req->cur_htresp = rh;
535 if (rh->rc != LCB_SUCCESS || rh->htstatus != 200) {
536 if (req->lasterr == LCB_SUCCESS || rh->htstatus != 200) {
537 req->lasterr = rh->rc ? rh->rc : LCB_HTTP_ERROR;
538 }
539 }
540
541 if (rh->rflags & LCB_RESP_F_FINAL) {
542 req->htreq = NULL;
543 if (!req->maybe_retry()) {
544 req->unref();
545 }
546 return;
547 } else if (req->callback == NULL) {
548 /* Cancelled. Similar to the block above, except the http request
549 * should remain alive (so we can cancel it later on) */
550 req->unref();
551 return;
552 }
553 req->parser->feed(static_cast< const char * >(rh->body), rh->nbody);
554 }
555
issue_htreq(const std::string & body)556 lcb_error_t ANALYTICSREQ::issue_htreq(const std::string &body)
557 {
558 lcb_CMDHTTP htcmd = {0};
559 htcmd.body = body.c_str();
560 htcmd.nbody = body.size();
561
562 htcmd.content_type = "application/json";
563 if (deferred_handle.empty()) {
564 htcmd.method = LCB_HTTP_METHOD_POST;
565 } else {
566 htcmd.method = LCB_HTTP_METHOD_GET;
567 htcmd.host = deferred_handle.c_str();
568 }
569
570 htcmd.type = LCB_HTTP_TYPE_CBAS;
571
572 htcmd.cmdflags = LCB_CMDHTTP_F_STREAM | LCB_CMDHTTP_F_CASTMO;
573 htcmd.reqhandle = &htreq;
574 htcmd.cas = timeout;
575
576 lcb_error_t rc = lcb_http3(instance, this, &htcmd);
577 if (rc == LCB_SUCCESS) {
578 htreq->set_callback(chunk_callback);
579 }
580 return rc;
581 }
582
lcb_analyticsreq_parsetmo(const std::string & s)583 lcb_U32 lcb_analyticsreq_parsetmo(const std::string &s)
584 {
585 double num;
586 int nchars, rv;
587
588 rv = sscanf(s.c_str(), "%lf%n", &num, &nchars);
589 if (rv != 1) {
590 return 0;
591 }
592 std::string mults = s.substr(nchars);
593
594 // Get the actual timeout value in microseconds. Note we can't use the macros
595 // since they will truncate the double value.
596 if (mults == "s") {
597 return num * static_cast< double >(LCB_S2US(1));
598 } else if (mults == "ms") {
599 return num * static_cast< double >(LCB_MS2US(1));
600 } else if (mults == "h") {
601 return num * static_cast< double >(LCB_S2US(3600));
602 } else if (mults == "us") {
603 return num;
604 } else if (mults == "m") {
605 return num * static_cast< double >(LCB_S2US(60));
606 } else if (mults == "ns") {
607 return LCB_NS2US(num);
608 } else {
609 return 0;
610 }
611 }
612
doc_callback(lcb_t,int,const lcb_RESPBASE * rb)613 static void doc_callback(lcb_t, int, const lcb_RESPBASE *rb)
614 {
615 lcb::docreq::DocRequest *dreq = reinterpret_cast< lcb::docreq::DocRequest * >(rb->cookie);
616 lcb::docreq::Queue *q = dreq->parent;
617
618 q->ref();
619
620 q->n_awaiting_response--;
621 dreq->ready = 1;
622
623 q->check();
624
625 q->unref();
626 }
627
cb_op_schedule(lcb::docreq::Queue * q,lcb::docreq::DocRequest * dreq)628 static lcb_error_t cb_op_schedule(lcb::docreq::Queue *q, lcb::docreq::DocRequest *dreq)
629 {
630 IngestRequest *req = reinterpret_cast< IngestRequest * >(dreq);
631 lcb_ANALYTICSREQ *areq = req->parent;
632 lcb_ANALYTICSINGESTSTATUS rc;
633
634 lcb_ANALYTICSINGESTIDGENERATORPARAM id_param;
635 id_param.method = areq->ingest.method;
636 id_param.row = req->row.c_str();
637 id_param.nrow = req->row.size();
638 id_param.id_free = NULL;
639 id_param.id = NULL;
640 id_param.nid = 0;
641
642 rc = areq->ingest.id_generator(q->instance, areq->cookie, &id_param);
643 if (rc != LCB_ANALYTICSINGEST_OK || id_param.id == NULL) {
644 return LCB_EINTERNAL;
645 }
646
647 lcb_ANALYTICSINGESTDATACONVERTERPARAM body_param;
648 body_param.method = areq->ingest.method;
649 body_param.row = req->row.c_str();
650 body_param.nrow = req->row.size();
651 body_param.out_free = NULL;
652 body_param.out = NULL;
653 body_param.nout = 0;
654 rc = areq->ingest.data_converter(q->instance, areq->cookie, &body_param);
655 if (rc != LCB_ANALYTICSINGEST_OK) {
656 if (id_param.id_free) {
657 id_param.id_free(id_param.id);
658 }
659 return LCB_EINTERNAL;
660 }
661
662 lcb_CMDSTORE cmd = {0};
663 cmd.exptime = areq->ingest.exptime;
664 switch (areq->ingest.method) {
665 case LCB_ANALYTICSINGEST_INSERT:
666 cmd.operation = LCB_ADD;
667 break;
668 case LCB_ANALYTICSINGEST_REPLACE:
669 cmd.operation = LCB_REPLACE;
670 break;
671 case LCB_ANALYTICSINGEST_UPSERT:
672 default:
673 cmd.operation = LCB_UPSERT;
674 break;
675 }
676 LCB_CMD_SET_KEY(&cmd, id_param.id, id_param.nid);
677 cmd.value.vtype = LCB_KV_COPY;
678 if (body_param.out) {
679 cmd.value.u_buf.contig.bytes = body_param.out;
680 cmd.value.u_buf.contig.nbytes = body_param.nout;
681 } else {
682 cmd.value.u_buf.contig.bytes = req->row.c_str();
683 cmd.value.u_buf.contig.nbytes = req->row.size();
684 }
685 #ifdef LCB_TRACING
686 if (areq->span) {
687 LCB_CMD_SET_TRACESPAN(&cmd, areq->span);
688 }
689 #endif
690 dreq->callback = doc_callback;
691 cmd.cmdflags |= LCB_CMD_F_INTERNAL_CALLBACK;
692 lcb_error_t err = lcb_store3(q->instance, &dreq->callback, &cmd);
693 if (id_param.id_free) {
694 id_param.id_free(id_param.id);
695 }
696 if (body_param.out_free && body_param.out) {
697 body_param.out_free(body_param.out);
698 }
699 return err;
700 }
701
cb_doc_ready(lcb::docreq::Queue * q,lcb::docreq::DocRequest * req_base)702 static void cb_doc_ready(lcb::docreq::Queue *q, lcb::docreq::DocRequest *req_base)
703 {
704 IngestRequest *req = (IngestRequest *)req_base;
705 /* TODO: check if we should ignore errors */
706 delete req;
707
708 if (q->parent) {
709 reinterpret_cast< lcb_ANALYTICSREQ * >(q->parent)->unref();
710 }
711 }
712
cb_docq_throttle(lcb::docreq::Queue * q,int enabled)713 static void cb_docq_throttle(lcb::docreq::Queue *q, int enabled)
714 {
715 lcb_ANALYTICSREQ *req = reinterpret_cast< lcb_ANALYTICSREQ * >(q->parent);
716 if (req == NULL || req->htreq == NULL) {
717 return;
718 }
719 if (enabled) {
720 req->htreq->pause();
721 } else {
722 req->htreq->resume();
723 }
724 }
725
lcb_ANALYTICSREQ(lcb_t obj,const void * user_cookie,lcb_CMDANALYTICS * cmd)726 lcb_ANALYTICSREQ::lcb_ANALYTICSREQ(lcb_t obj, const void *user_cookie, lcb_CMDANALYTICS *cmd)
727 : cur_htresp(NULL), htreq(NULL), parser(new lcb::jsparse::Parser(lcb::jsparse::Parser::MODE_ANALYTICS, this)),
728 cookie(user_cookie), callback(cmd->callback), instance(obj), lasterr(LCB_SUCCESS), timeout(0), nrows(0),
729 was_retried(false), deferred_handle(""), ingest(cmd->ingest), docq(NULL), refcount(1)
730 #ifdef LCB_TRACING
731 ,
732 span(NULL)
733 #endif
734 {
735 if (cmd->handle) {
736 cmd->handle = this;
737 }
738
739 if (!parse_json(cmd->encoded.c_str(), cmd->encoded.size(), json)) {
740 lasterr = LCB_EINVAL;
741 return;
742 }
743
744 const Json::Value &j_statement = json_const()["statement"];
745 if (j_statement.isString()) {
746 statement = j_statement.asString();
747 } else if (!j_statement.isNull()) {
748 lasterr = LCB_EINVAL;
749 return;
750 }
751
752 Json::Value &tmoval = json["timeout"];
753 if (tmoval.isNull()) {
754 // Set the default timeout as the server-side query timeout if no
755 // other timeout is used.
756 char buf[64] = {0};
757 sprintf(buf, "%uus", LCBT_SETTING(obj, n1ql_timeout));
758 tmoval = buf;
759 /* FIXME: use separate timeout for analytics */
760 timeout = LCBT_SETTING(obj, n1ql_timeout);
761 } else if (tmoval.isString()) {
762 timeout = lcb_analyticsreq_parsetmo(tmoval.asString());
763 } else {
764 // Timeout is not a string!
765 lasterr = LCB_EINVAL;
766 return;
767 }
768
769 #ifdef LCB_TRACING
770 if (instance->settings->tracer) {
771 char id[20] = {0};
772 snprintf(id, sizeof(id), "%p", (void *)this);
773 span = lcbtrace_span_start(instance->settings->tracer, LCBTRACE_OP_DISPATCH_TO_SERVER, LCBTRACE_NOW, NULL);
774 lcbtrace_span_add_tag_str(span, LCBTRACE_TAG_OPERATION_ID, id);
775 lcbtrace_span_add_system_tags(span, instance->settings, LCBTRACE_TAG_SERVICE_ANALYTICS);
776 }
777 #endif
778
779 if (ingest.method != LCB_ANALYTICSINGEST_NONE) {
780 docq = new lcb::docreq::Queue(instance);
781 docq->parent = this;
782 docq->cb_schedule = cb_op_schedule;
783 docq->cb_ready = cb_doc_ready;
784 docq->cb_throttle = cb_docq_throttle;
785 // TODO: docq->max_pending_response;
786 lcb_aspend_add(&instance->pendops, LCB_PENDTYPE_COUNTER, NULL);
787 }
788 }
789
lcb_ANALYTICSREQ(lcb_t obj,const void * user_cookie,lcb_ANALYTICSDEFERREDHANDLE * handle)790 lcb_ANALYTICSREQ::lcb_ANALYTICSREQ(lcb_t obj, const void *user_cookie, lcb_ANALYTICSDEFERREDHANDLE *handle)
791 : cur_htresp(NULL), htreq(NULL),
792 parser(new lcb::jsparse::Parser(lcb::jsparse::Parser::MODE_ANALYTICS_DEFERRED, this)), cookie(user_cookie),
793 callback(handle->callback), instance(obj), lasterr(LCB_SUCCESS), timeout(0), nrows(0), was_retried(false),
794 deferred_handle(handle->handle), docq(NULL), refcount(1)
795 #ifdef LCB_TRACING
796 ,
797 span(NULL)
798 #endif
799 {
800 /* FIXME: use separate timeout for analytics */
801 timeout = LCBT_SETTING(obj, n1ql_timeout);
802
803 #ifdef LCB_TRACING
804 if (instance->settings->tracer) {
805 char id[20] = {0};
806 snprintf(id, sizeof(id), "%p", (void *)this);
807 span = lcbtrace_span_start(instance->settings->tracer, LCBTRACE_OP_DISPATCH_TO_SERVER, LCBTRACE_NOW, NULL);
808 lcbtrace_span_add_tag_str(span, LCBTRACE_TAG_OPERATION_ID, id);
809 lcbtrace_span_add_system_tags(span, instance->settings, LCBTRACE_TAG_SERVICE_ANALYTICS);
810 }
811 #endif
812 }
813
814 LIBCOUCHBASE_API
lcb_analytics_query(lcb_t instance,const void * cookie,lcb_CMDANALYTICS * cmd)815 lcb_error_t lcb_analytics_query(lcb_t instance, const void *cookie, lcb_CMDANALYTICS *cmd)
816 {
817 lcb_error_t err;
818 ANALYTICSREQ *req = NULL;
819
820 if (cmd->callback == NULL) {
821 return LCB_EINVAL;
822 }
823 if (!cmd->encode()) {
824 return LCB_EINVAL;
825 }
826
827 req = new lcb_ANALYTICSREQ(instance, cookie, cmd);
828 if (!req) {
829 err = LCB_CLIENT_ENOMEM;
830 goto GT_DESTROY;
831 }
832 if ((err = req->lasterr) != LCB_SUCCESS) {
833 goto GT_DESTROY;
834 }
835
836 if ((err = req->issue_htreq()) != LCB_SUCCESS) {
837 goto GT_DESTROY;
838 }
839
840 return LCB_SUCCESS;
841
842 GT_DESTROY:
843 if (cmd->handle) {
844 cmd->handle = NULL;
845 }
846
847 if (req) {
848 req->callback = NULL;
849 req->unref();
850 }
851 return err;
852 }
853
lcb_analytics_defhnd_poll(lcb_t instance,const void * cookie,lcb_ANALYTICSDEFERREDHANDLE * handle)854 lcb_error_t lcb_analytics_defhnd_poll(lcb_t instance, const void *cookie, lcb_ANALYTICSDEFERREDHANDLE *handle)
855 {
856 lcb_error_t err;
857 ANALYTICSREQ *req = NULL;
858
859 if (handle->callback == NULL || handle->handle.empty()) {
860 return LCB_EINVAL;
861 }
862
863 req = new lcb_ANALYTICSREQ(instance, cookie, handle);
864 if (!req) {
865 err = LCB_CLIENT_ENOMEM;
866 goto GT_DESTROY;
867 }
868 if ((err = req->lasterr) != LCB_SUCCESS) {
869 goto GT_DESTROY;
870 }
871
872 if ((err = req->issue_htreq()) != LCB_SUCCESS) {
873 goto GT_DESTROY;
874 }
875
876 return LCB_SUCCESS;
877
878 GT_DESTROY:
879 if (req) {
880 req->callback = NULL;
881 req->unref();
882 }
883 return err;
884 }
885
886 LIBCOUCHBASE_API
lcb_analytics_cancel(lcb_t,lcb_ANALYTICSHANDLE handle)887 void lcb_analytics_cancel(lcb_t, lcb_ANALYTICSHANDLE handle)
888 {
889 if (handle->callback) {
890 handle->callback = NULL;
891 if (handle->docq) {
892 handle->docq->cancel();
893 }
894 }
895 }
896
897 #ifdef LCB_TRACING
898
899 LIBCOUCHBASE_API
lcb_analytics_set_parent_span(lcb_t,lcb_ANALYTICSHANDLE handle,lcbtrace_SPAN * span)900 void lcb_analytics_set_parent_span(lcb_t, lcb_ANALYTICSHANDLE handle, lcbtrace_SPAN *span)
901 {
902 if (handle) {
903 lcbtrace_span_set_parent(handle->span, span);
904 }
905 }
906
907 #endif
908