1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2018 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include <libcouchbase/couchbase.h>
19 #include <libcouchbase/analytics.h>
20 #include <jsparse/parser.h>
21 #include "internal.h"
22 #include "auth-priv.h"
23 #include "http/http.h"
24 #include "logging.h"
25 #include "contrib/lcb-jsoncpp/lcb-jsoncpp.h"
26 #include <map>
27 #include <string>
28 #include <list>
29 #include "docreq/docreq.h"
30 #include "rnd.h"
31 
32 #define LOGFMT "(NR=%p) "
33 #define LOGID(req) static_cast< const void * >(req)
34 #define LOGARGS(req, lvl) req->instance->settings, "analytics", LCB_LOG_##lvl, __FILE__, __LINE__
35 
36 using namespace lcb;
37 
default_id_generator(lcb_t,const void *,lcb_ANALYTICSINGESTIDGENERATORPARAM * param)38 static lcb_ANALYTICSINGESTSTATUS default_id_generator(lcb_t, const void *, lcb_ANALYTICSINGESTIDGENERATORPARAM *param)
39 {
40     param->id = static_cast<char *>(calloc(34, sizeof(char)));
41     param->id_free = free;
42     param->nid = snprintf(param->id, 34, "%016" PRIx64 "-%016" PRIx64, lcb_next_rand64(), lcb_next_rand64());
43     return LCB_ANALYTICSINGEST_OK;
44 }
45 
default_data_converter(lcb_t,const void *,lcb_ANALYTICSINGESTDATACONVERTERPARAM *)46 static lcb_ANALYTICSINGESTSTATUS default_data_converter(lcb_t, const void *, lcb_ANALYTICSINGESTDATACONVERTERPARAM *)
47 {
48     return LCB_ANALYTICSINGEST_OK;
49 }
50 
51 struct lcb_ANALYTICSINGEST_st {
52     lcb_ANALYTICSINGESTMETHOD method;
53     uint32_t exptime;
54     bool ignore_errors;
55     lcb_ANALYTICSINGESTIDGENERATOR id_generator;
56     lcb_ANALYTICSINGESTDATACONVERTER data_converter;
57 
lcb_ANALYTICSINGEST_stlcb_ANALYTICSINGEST_st58     lcb_ANALYTICSINGEST_st()
59         : method(LCB_ANALYTICSINGEST_NONE), exptime(0), ignore_errors(false), id_generator(default_id_generator),
60           data_converter(default_data_converter)
61     {
62     }
63 };
64 
65 struct lcb_ANALYTICSREQ;
66 struct IngestRequest : docreq::DocRequest {
67     lcb_ANALYTICSREQ *parent;
68     std::string row;
69 };
70 
71 struct lcb_CMDANALYTICS_st {
72     Json::Value root;
73     std::string encoded;
74     lcb_ANALYTICSCALLBACK callback;
75     lcb_ANALYTICSHANDLE handle;
76     lcb_ANALYTICSINGEST_st ingest;
77 
lcb_CMDANALYTICS_stlcb_CMDANALYTICS_st78     lcb_CMDANALYTICS_st() : root(Json::objectValue), callback(NULL), handle(NULL) {}
79 
encodelcb_CMDANALYTICS_st80     bool encode()
81     {
82         encoded = Json::FastWriter().write(root);
83         return true;
84     }
85 };
86 
87 LIBCOUCHBASE_API
lcb_analytics_new(void)88 lcb_CMDANALYTICS *lcb_analytics_new(void)
89 {
90     return new lcb_CMDANALYTICS;
91 }
92 
93 LIBCOUCHBASE_API
lcb_analytics_reset(lcb_CMDANALYTICS * cmd)94 void lcb_analytics_reset(lcb_CMDANALYTICS *cmd)
95 {
96     cmd->encoded.clear();
97     cmd->root.clear();
98 }
99 
100 LIBCOUCHBASE_API
lcb_analytics_free(lcb_CMDANALYTICS * cmd)101 void lcb_analytics_free(lcb_CMDANALYTICS *cmd)
102 {
103     delete cmd;
104 }
105 
106 LIBCOUCHBASE_API
lcb_analytics_gethandle(lcb_CMDANALYTICS * cmd)107 lcb_ANALYTICSHANDLE lcb_analytics_gethandle(lcb_CMDANALYTICS *cmd)
108 {
109     return cmd->handle;
110 }
111 
112 LIBCOUCHBASE_API
lcb_analytics_setcallback(lcb_CMDANALYTICS * cmd,lcb_ANALYTICSCALLBACK callback)113 lcb_error_t lcb_analytics_setcallback(lcb_CMDANALYTICS *cmd, lcb_ANALYTICSCALLBACK callback)
114 {
115     if (cmd) {
116         cmd->callback = callback;
117         return LCB_SUCCESS;
118     }
119     return LCB_EINVAL;
120 }
121 
122 #define fix_strlen(s, n)                                                                                               \
123     if (n == (size_t)-1) {                                                                                             \
124         n = strlen(s);                                                                                                 \
125     }
126 
127 LIBCOUCHBASE_API
lcb_analytics_setquery(lcb_CMDANALYTICS * cmd,const char * qstr,size_t nqstr)128 lcb_error_t lcb_analytics_setquery(lcb_CMDANALYTICS *cmd, const char *qstr, size_t nqstr)
129 {
130     fix_strlen(qstr, nqstr);
131     Json::Value value;
132     if (!Json::Reader().parse(qstr, qstr + nqstr, value)) {
133         return LCB_EINVAL;
134     }
135     cmd->root = value;
136     return LCB_SUCCESS;
137 }
138 
139 LIBCOUCHBASE_API
lcb_analytics_setopt(lcb_CMDANALYTICS * cmd,const char * k,size_t nk,const char * v,size_t nv)140 lcb_error_t lcb_analytics_setopt(lcb_CMDANALYTICS *cmd, const char *k, size_t nk, const char *v, size_t nv)
141 {
142     fix_strlen(v, nv);
143     fix_strlen(k, nk);
144     Json::Value value;
145     if (!Json::Reader().parse(v, v + nv, value)) {
146         return LCB_EINVAL;
147     }
148     cmd->root[std::string(k, nk)] = value;
149     return LCB_SUCCESS;
150 }
151 
152 LIBCOUCHBASE_API
lcb_analytics_setstatement(lcb_CMDANALYTICS * cmd,const char * sstr,size_t nsstr)153 lcb_error_t lcb_analytics_setstatement(lcb_CMDANALYTICS *cmd, const char *sstr, size_t nsstr)
154 {
155     fix_strlen(sstr, nsstr);
156     cmd->root["statement"] = std::string(sstr, nsstr);
157     return LCB_SUCCESS;
158 }
159 
160 LIBCOUCHBASE_API
lcb_analytics_namedparam(lcb_CMDANALYTICS * cmd,const char * name,size_t nname,const char * value,size_t nvalue)161 lcb_error_t lcb_analytics_namedparam(lcb_CMDANALYTICS *cmd, const char *name, size_t nname, const char *value,
162                                      size_t nvalue)
163 {
164     return lcb_analytics_setopt(cmd, name, nname, value, nvalue);
165 }
166 
167 LIBCOUCHBASE_API
lcb_analytics_posparam(lcb_CMDANALYTICS * cmd,const char * value,size_t nvalue)168 lcb_error_t lcb_analytics_posparam(lcb_CMDANALYTICS *cmd, const char *value, size_t nvalue)
169 {
170     fix_strlen(value, nvalue);
171     Json::Value jval;
172     if (!Json::Reader().parse(value, value + nvalue, jval)) {
173         return LCB_EINVAL;
174     }
175     cmd->root["args"].append(jval);
176     return LCB_SUCCESS;
177 }
178 
179 LIBCOUCHBASE_API
lcb_analytics_ingest_setmethod(lcb_CMDANALYTICS * cmd,lcb_ANALYTICSINGESTMETHOD method)180 lcb_error_t lcb_analytics_ingest_setmethod(lcb_CMDANALYTICS *cmd, lcb_ANALYTICSINGESTMETHOD method)
181 {
182     cmd->ingest.method = method;
183     return LCB_SUCCESS;
184 }
185 
186 LIBCOUCHBASE_API
lcb_analytics_ingest_setexptime(lcb_CMDANALYTICS * cmd,lcb_U32 exptime)187 lcb_error_t lcb_analytics_ingest_setexptime(lcb_CMDANALYTICS *cmd, lcb_U32 exptime)
188 {
189     cmd->ingest.exptime = exptime;
190     return LCB_SUCCESS;
191 }
192 
193 LIBCOUCHBASE_API
lcb_analytics_ingest_ignoreingesterror(lcb_CMDANALYTICS * cmd,int ignore)194 lcb_error_t lcb_analytics_ingest_ignoreingesterror(lcb_CMDANALYTICS *cmd, int ignore)
195 {
196     cmd->ingest.ignore_errors = ignore ? true : false;
197     return LCB_SUCCESS;
198 }
199 
200 LIBCOUCHBASE_API
lcb_analytics_ingest_setidgenerator(lcb_CMDANALYTICS * cmd,lcb_ANALYTICSINGESTIDGENERATOR generator)201 lcb_error_t lcb_analytics_ingest_setidgenerator(lcb_CMDANALYTICS *cmd, lcb_ANALYTICSINGESTIDGENERATOR generator)
202 {
203     cmd->ingest.id_generator = generator;
204     return LCB_SUCCESS;
205 }
206 
207 LIBCOUCHBASE_API
lcb_analytics_ingest_setdataconverter(lcb_CMDANALYTICS * cmd,lcb_ANALYTICSINGESTDATACONVERTER converter)208 lcb_error_t lcb_analytics_ingest_setdataconverter(lcb_CMDANALYTICS *cmd, lcb_ANALYTICSINGESTDATACONVERTER converter)
209 {
210     cmd->ingest.data_converter = converter;
211     return LCB_SUCCESS;
212 }
213 
214 LIBCOUCHBASE_API
lcb_analytics_setdeferred(lcb_CMDANALYTICS * cmd,int deferred)215 lcb_error_t lcb_analytics_setdeferred(lcb_CMDANALYTICS *cmd, int deferred)
216 {
217     if (deferred) {
218         cmd->root["mode"] = std::string("async");
219     } else {
220         cmd->root.removeMember("mode");
221     }
222     return LCB_SUCCESS;
223 }
224 
225 struct lcb_ANALYTICSDEFERREDHANDLE_st {
226     std::string status;
227     std::string handle;
228     lcb_ANALYTICSCALLBACK callback;
229 
lcb_ANALYTICSDEFERREDHANDLE_stlcb_ANALYTICSDEFERREDHANDLE_st230     lcb_ANALYTICSDEFERREDHANDLE_st(std::string status_, std::string handle_) : status(status_), handle(handle_) {}
231 };
232 
lcb_analytics_defhnd_extract(const lcb_RESPANALYTICS * resp)233 lcb_ANALYTICSDEFERREDHANDLE *lcb_analytics_defhnd_extract(const lcb_RESPANALYTICS *resp)
234 {
235     if (resp == NULL || resp->rc != LCB_SUCCESS || ((resp->rflags & (LCB_RESP_F_FINAL | LCB_RESP_F_EXTDATA)) == 0) ||
236         resp->nrow == 0 || resp->row == NULL) {
237         return NULL;
238     }
239     Json::Value payload;
240     if (!Json::Reader().parse(resp->row, resp->row + resp->nrow, payload)) {
241         return NULL;
242     }
243     if (!payload.isObject()) {
244         return NULL;
245     }
246     Json::Value status = payload["status"];
247     Json::Value handle = payload["handle"];
248     if (status.isString() && handle.isString()) {
249         return new lcb_ANALYTICSDEFERREDHANDLE_st(status.asString(), handle.asString());
250     }
251     return NULL;
252 }
253 
lcb_analytics_defhnd_free(lcb_ANALYTICSDEFERREDHANDLE * handle)254 void lcb_analytics_defhnd_free(lcb_ANALYTICSDEFERREDHANDLE *handle)
255 {
256     if (handle == NULL) {
257         return;
258     }
259     delete handle;
260 }
261 
lcb_analytics_defhnd_status(lcb_ANALYTICSDEFERREDHANDLE * handle)262 const char *lcb_analytics_defhnd_status(lcb_ANALYTICSDEFERREDHANDLE *handle)
263 {
264     if (handle == NULL) {
265         return NULL;
266     }
267     return handle->status.c_str();
268 }
269 
lcb_analytics_defhnd_setcallback(lcb_ANALYTICSDEFERREDHANDLE * handle,lcb_ANALYTICSCALLBACK callback)270 lcb_error_t lcb_analytics_defhnd_setcallback(lcb_ANALYTICSDEFERREDHANDLE *handle, lcb_ANALYTICSCALLBACK callback)
271 {
272     if (handle) {
273         handle->callback = callback;
274         return LCB_SUCCESS;
275     }
276     return LCB_EINVAL;
277 }
278 
279 typedef struct lcb_ANALYTICSREQ : lcb::jsparse::Parser::Actions {
280     const lcb_RESPHTTP *cur_htresp;
281     struct lcb_http_request_st *htreq;
282     lcb::jsparse::Parser *parser;
283     const void *cookie;
284     lcb_ANALYTICSCALLBACK callback;
285     lcb_t instance;
286     lcb_error_t lasterr;
287     lcb_U32 timeout;
288     // How many rows were received. Used to avoid parsing the meta
289     size_t nrows;
290 
291     /** Request body as received from the application */
292     Json::Value json;
json_constlcb_ANALYTICSREQ293     const Json::Value &json_const() const
294     {
295         return json;
296     }
297 
298     /** String of the original statement. Cached here to avoid jsoncpp lookups */
299     std::string statement;
300 
301     /** Whether we're retrying this */
302     bool was_retried;
303 
304     /** Non-empty if this is deferred query check/fetch */
305     std::string deferred_handle;
306 
307     lcb_ANALYTICSINGEST_st ingest;
308     docreq::Queue *docq;
309     unsigned refcount;
310 
311 #ifdef LCB_TRACING
312     lcbtrace_SPAN *span;
313 #endif
314 
unreflcb_ANALYTICSREQ315     void unref()
316     {
317         if (!--refcount) {
318             delete this;
319         }
320     }
321 
reflcb_ANALYTICSREQ322     void ref()
323     {
324         refcount++;
325     }
326 
327     /**
328      * Issues the HTTP request for the query
329      * @param payload The body to send
330      * @return Error code from lcb's http subsystem
331      */
332     inline lcb_error_t issue_htreq(const std::string &payload);
333 
issue_htreqlcb_ANALYTICSREQ334     lcb_error_t issue_htreq()
335     {
336         std::string s = Json::FastWriter().write(json);
337         return issue_htreq(s);
338     }
339 
340     /**
341      * Attempt to retry the query. This will inspect the meta (if present)
342      * for any errors indicating that a failure might be a result of a stale
343      * plan, and if this query was retried already.
344      * @return true if the retry was successful.
345      */
346     inline bool maybe_retry();
347 
348     /**
349      * Returns true if payload matches retry conditions.
350      */
351     inline bool has_retriable_error(const Json::Value &root);
352 
353     /**
354      * Pass a row back to the application
355      * @param resp The response. This is populated with state information
356      *  from the current query
357      * @param is_last Whether this is the last row. If this is the last, then
358      *  the RESP_F_FINAL flag is set, and no further callbacks will be invoked
359      */
360     inline void invoke_row(lcb_RESPANALYTICS *resp, bool is_last);
361 
362     inline lcb_ANALYTICSREQ(lcb_t obj, const void *user_cookie, lcb_CMDANALYTICS *cmd);
363     inline lcb_ANALYTICSREQ(lcb_t obj, const void *user_cookie, lcb_ANALYTICSDEFERREDHANDLE *handle);
364     inline ~lcb_ANALYTICSREQ();
365 
366     // Parser overrides:
JSPARSE_on_rowlcb_ANALYTICSREQ367     void JSPARSE_on_row(const lcb::jsparse::Row &row)
368     {
369         lcb_RESPANALYTICS resp = {0};
370         resp.row = static_cast< const char * >(row.row.iov_base);
371         resp.nrow = row.row.iov_len;
372         nrows++;
373         if (ingest.method != LCB_ANALYTICSINGEST_NONE) {
374             IngestRequest *req = new IngestRequest();
375             req->parent = this;
376             req->row.assign(static_cast< const char * >(row.row.iov_base), row.row.iov_len);
377             docq->add(req);
378             ref();
379         }
380         invoke_row(&resp, false);
381     }
JSPARSE_on_errorlcb_ANALYTICSREQ382     void JSPARSE_on_error(const std::string &)
383     {
384         lasterr = LCB_PROTOCOL_ERROR;
385     }
JSPARSE_on_completelcb_ANALYTICSREQ386     void JSPARSE_on_complete(const std::string &)
387     {
388         // Nothing
389     }
390 
391 } ANALYTICSREQ;
392 
parse_json(const char * s,size_t n,Json::Value & res)393 static bool parse_json(const char *s, size_t n, Json::Value &res)
394 {
395     return Json::Reader().parse(s, s + n, res);
396 }
397 
has_retriable_error(const Json::Value & root)398 bool ANALYTICSREQ::has_retriable_error(const Json::Value &root)
399 {
400     if (!root.isObject()) {
401         return false;
402     }
403     const Json::Value &errors = root["errors"];
404     if (!errors.isArray()) {
405         return false;
406     }
407     Json::Value::const_iterator ii;
408     for (ii = errors.begin(); ii != errors.end(); ++ii) {
409         const Json::Value &cur = *ii;
410         if (!cur.isObject()) {
411             continue; // eh?
412         }
413         const Json::Value &jcode = cur["code"];
414         unsigned code = 0;
415         if (jcode.isNumeric()) {
416             code = jcode.asUInt();
417             switch (code) {
418                 case 23000:
419                 case 23003:
420                 case 23007:
421                     lcb_log(LOGARGS(this, TRACE), LOGFMT "Will retry request. code: %d", LOGID(this), code);
422                     return true;
423                 default:
424                     break;
425             }
426         }
427     }
428     return false;
429 }
430 
maybe_retry()431 bool ANALYTICSREQ::maybe_retry()
432 {
433     // Examines the buffer to determine the type of error
434     Json::Value root;
435     lcb_IOV meta;
436 
437     if (callback == NULL) {
438         // Cancelled
439         return false;
440     }
441 
442     if (nrows) {
443         // Has results:
444         return false;
445     }
446 
447     if (was_retried) {
448         return false;
449     }
450 
451     was_retried = true;
452     parser->get_postmortem(meta);
453     if (!parse_json(static_cast< const char * >(meta.iov_base), meta.iov_len, root)) {
454         return false; // Not JSON
455     }
456     if (has_retriable_error(root)) {
457         return true;
458     }
459 
460     return false;
461 }
462 
invoke_row(lcb_RESPANALYTICS * resp,bool is_last)463 void ANALYTICSREQ::invoke_row(lcb_RESPANALYTICS *resp, bool is_last)
464 {
465     resp->cookie = const_cast< void * >(cookie);
466     resp->htresp = cur_htresp;
467 
468     if (is_last) {
469         lcb_IOV meta;
470         resp->rflags |= LCB_RESP_F_FINAL;
471         resp->rc = lasterr;
472         parser->get_postmortem(meta);
473         resp->row = static_cast< const char * >(meta.iov_base);
474         resp->nrow = meta.iov_len;
475         if (!deferred_handle.empty()) {
476             /* signal that response might have deferred handle */
477             resp->rflags |= LCB_RESP_F_EXTDATA;
478         }
479     }
480 
481     if (callback) {
482         callback(instance, LCB_CALLBACK_ANALYTICS, resp);
483     }
484     if (is_last) {
485         callback = NULL;
486     }
487 }
488 
~lcb_ANALYTICSREQ()489 lcb_ANALYTICSREQ::~lcb_ANALYTICSREQ()
490 {
491     if (htreq) {
492         lcb_cancel_http_request(instance, htreq);
493         htreq = NULL;
494     }
495 
496     if (callback) {
497         lcb_RESPANALYTICS resp = {0};
498         invoke_row(&resp, 1);
499     }
500 
501 #ifdef LCB_TRACING
502     if (span) {
503         if (htreq) {
504             lcbio_CTX *ctx = htreq->ioctx;
505             if (ctx) {
506                 lcbtrace_span_add_tag_str_nocopy(span, LCBTRACE_TAG_PEER_ADDRESS, htreq->peer.c_str());
507                 lcbtrace_span_add_tag_str_nocopy(span, LCBTRACE_TAG_LOCAL_ADDRESS, ctx->sock->info->ep_local);
508             }
509         }
510         lcbtrace_span_finish(span, LCBTRACE_NOW);
511         span = NULL;
512     }
513 #endif
514 
515     if (parser) {
516         delete parser;
517     }
518 
519     if (docq != NULL) {
520         docq->parent = NULL;
521         docq->unref();
522     }
523     lcb_aspend_del(&instance->pendops, LCB_PENDTYPE_COUNTER, NULL);
524 }
525 
chunk_callback(lcb_t instance,int ign,const lcb_RESPBASE * rb)526 static void chunk_callback(lcb_t instance, int ign, const lcb_RESPBASE *rb)
527 {
528     const lcb_RESPHTTP *rh = (const lcb_RESPHTTP *)rb;
529     ANALYTICSREQ *req = static_cast< ANALYTICSREQ * >(rh->cookie);
530 
531     (void)ign;
532     (void)instance;
533 
534     req->cur_htresp = rh;
535     if (rh->rc != LCB_SUCCESS || rh->htstatus != 200) {
536         if (req->lasterr == LCB_SUCCESS || rh->htstatus != 200) {
537             req->lasterr = rh->rc ? rh->rc : LCB_HTTP_ERROR;
538         }
539     }
540 
541     if (rh->rflags & LCB_RESP_F_FINAL) {
542         req->htreq = NULL;
543         if (!req->maybe_retry()) {
544             req->unref();
545         }
546         return;
547     } else if (req->callback == NULL) {
548         /* Cancelled. Similar to the block above, except the http request
549          * should remain alive (so we can cancel it later on) */
550         req->unref();
551         return;
552     }
553     req->parser->feed(static_cast< const char * >(rh->body), rh->nbody);
554 }
555 
issue_htreq(const std::string & body)556 lcb_error_t ANALYTICSREQ::issue_htreq(const std::string &body)
557 {
558     lcb_CMDHTTP htcmd = {0};
559     htcmd.body = body.c_str();
560     htcmd.nbody = body.size();
561 
562     htcmd.content_type = "application/json";
563     if (deferred_handle.empty()) {
564         htcmd.method = LCB_HTTP_METHOD_POST;
565     } else {
566         htcmd.method = LCB_HTTP_METHOD_GET;
567         htcmd.host = deferred_handle.c_str();
568     }
569 
570     htcmd.type = LCB_HTTP_TYPE_CBAS;
571 
572     htcmd.cmdflags = LCB_CMDHTTP_F_STREAM | LCB_CMDHTTP_F_CASTMO;
573     htcmd.reqhandle = &htreq;
574     htcmd.cas = timeout;
575 
576     lcb_error_t rc = lcb_http3(instance, this, &htcmd);
577     if (rc == LCB_SUCCESS) {
578         htreq->set_callback(chunk_callback);
579     }
580     return rc;
581 }
582 
lcb_analyticsreq_parsetmo(const std::string & s)583 lcb_U32 lcb_analyticsreq_parsetmo(const std::string &s)
584 {
585     double num;
586     int nchars, rv;
587 
588     rv = sscanf(s.c_str(), "%lf%n", &num, &nchars);
589     if (rv != 1) {
590         return 0;
591     }
592     std::string mults = s.substr(nchars);
593 
594     // Get the actual timeout value in microseconds. Note we can't use the macros
595     // since they will truncate the double value.
596     if (mults == "s") {
597         return num * static_cast< double >(LCB_S2US(1));
598     } else if (mults == "ms") {
599         return num * static_cast< double >(LCB_MS2US(1));
600     } else if (mults == "h") {
601         return num * static_cast< double >(LCB_S2US(3600));
602     } else if (mults == "us") {
603         return num;
604     } else if (mults == "m") {
605         return num * static_cast< double >(LCB_S2US(60));
606     } else if (mults == "ns") {
607         return LCB_NS2US(num);
608     } else {
609         return 0;
610     }
611 }
612 
doc_callback(lcb_t,int,const lcb_RESPBASE * rb)613 static void doc_callback(lcb_t, int, const lcb_RESPBASE *rb)
614 {
615     lcb::docreq::DocRequest *dreq = reinterpret_cast< lcb::docreq::DocRequest * >(rb->cookie);
616     lcb::docreq::Queue *q = dreq->parent;
617 
618     q->ref();
619 
620     q->n_awaiting_response--;
621     dreq->ready = 1;
622 
623     q->check();
624 
625     q->unref();
626 }
627 
cb_op_schedule(lcb::docreq::Queue * q,lcb::docreq::DocRequest * dreq)628 static lcb_error_t cb_op_schedule(lcb::docreq::Queue *q, lcb::docreq::DocRequest *dreq)
629 {
630     IngestRequest *req = reinterpret_cast< IngestRequest * >(dreq);
631     lcb_ANALYTICSREQ *areq = req->parent;
632     lcb_ANALYTICSINGESTSTATUS rc;
633 
634     lcb_ANALYTICSINGESTIDGENERATORPARAM id_param;
635     id_param.method = areq->ingest.method;
636     id_param.row = req->row.c_str();
637     id_param.nrow = req->row.size();
638     id_param.id_free = NULL;
639     id_param.id = NULL;
640     id_param.nid = 0;
641 
642     rc = areq->ingest.id_generator(q->instance, areq->cookie, &id_param);
643     if (rc != LCB_ANALYTICSINGEST_OK || id_param.id == NULL) {
644         return LCB_EINTERNAL;
645     }
646 
647     lcb_ANALYTICSINGESTDATACONVERTERPARAM body_param;
648     body_param.method = areq->ingest.method;
649     body_param.row = req->row.c_str();
650     body_param.nrow = req->row.size();
651     body_param.out_free = NULL;
652     body_param.out = NULL;
653     body_param.nout = 0;
654     rc = areq->ingest.data_converter(q->instance, areq->cookie, &body_param);
655     if (rc != LCB_ANALYTICSINGEST_OK) {
656         if (id_param.id_free) {
657             id_param.id_free(id_param.id);
658         }
659         return LCB_EINTERNAL;
660     }
661 
662     lcb_CMDSTORE cmd = {0};
663     cmd.exptime = areq->ingest.exptime;
664     switch (areq->ingest.method) {
665         case LCB_ANALYTICSINGEST_INSERT:
666             cmd.operation = LCB_ADD;
667             break;
668         case LCB_ANALYTICSINGEST_REPLACE:
669             cmd.operation = LCB_REPLACE;
670             break;
671         case LCB_ANALYTICSINGEST_UPSERT:
672         default:
673             cmd.operation = LCB_UPSERT;
674             break;
675     }
676     LCB_CMD_SET_KEY(&cmd, id_param.id, id_param.nid);
677     cmd.value.vtype = LCB_KV_COPY;
678     if (body_param.out) {
679         cmd.value.u_buf.contig.bytes = body_param.out;
680         cmd.value.u_buf.contig.nbytes = body_param.nout;
681     } else {
682         cmd.value.u_buf.contig.bytes = req->row.c_str();
683         cmd.value.u_buf.contig.nbytes = req->row.size();
684     }
685 #ifdef LCB_TRACING
686     if (areq->span) {
687         LCB_CMD_SET_TRACESPAN(&cmd, areq->span);
688     }
689 #endif
690     dreq->callback = doc_callback;
691     cmd.cmdflags |= LCB_CMD_F_INTERNAL_CALLBACK;
692     lcb_error_t err = lcb_store3(q->instance, &dreq->callback, &cmd);
693     if (id_param.id_free) {
694         id_param.id_free(id_param.id);
695     }
696     if (body_param.out_free && body_param.out) {
697         body_param.out_free(body_param.out);
698     }
699     return err;
700 }
701 
cb_doc_ready(lcb::docreq::Queue * q,lcb::docreq::DocRequest * req_base)702 static void cb_doc_ready(lcb::docreq::Queue *q, lcb::docreq::DocRequest *req_base)
703 {
704     IngestRequest *req = (IngestRequest *)req_base;
705     /* TODO: check if we should ignore errors */
706     delete req;
707 
708     if (q->parent) {
709         reinterpret_cast< lcb_ANALYTICSREQ * >(q->parent)->unref();
710     }
711 }
712 
cb_docq_throttle(lcb::docreq::Queue * q,int enabled)713 static void cb_docq_throttle(lcb::docreq::Queue *q, int enabled)
714 {
715     lcb_ANALYTICSREQ *req = reinterpret_cast< lcb_ANALYTICSREQ * >(q->parent);
716     if (req == NULL || req->htreq == NULL) {
717         return;
718     }
719     if (enabled) {
720         req->htreq->pause();
721     } else {
722         req->htreq->resume();
723     }
724 }
725 
lcb_ANALYTICSREQ(lcb_t obj,const void * user_cookie,lcb_CMDANALYTICS * cmd)726 lcb_ANALYTICSREQ::lcb_ANALYTICSREQ(lcb_t obj, const void *user_cookie, lcb_CMDANALYTICS *cmd)
727     : cur_htresp(NULL), htreq(NULL), parser(new lcb::jsparse::Parser(lcb::jsparse::Parser::MODE_ANALYTICS, this)),
728       cookie(user_cookie), callback(cmd->callback), instance(obj), lasterr(LCB_SUCCESS), timeout(0), nrows(0),
729       was_retried(false), deferred_handle(""), ingest(cmd->ingest), docq(NULL), refcount(1)
730 #ifdef LCB_TRACING
731       ,
732       span(NULL)
733 #endif
734 {
735     if (cmd->handle) {
736         cmd->handle = this;
737     }
738 
739     if (!parse_json(cmd->encoded.c_str(), cmd->encoded.size(), json)) {
740         lasterr = LCB_EINVAL;
741         return;
742     }
743 
744     const Json::Value &j_statement = json_const()["statement"];
745     if (j_statement.isString()) {
746         statement = j_statement.asString();
747     } else if (!j_statement.isNull()) {
748         lasterr = LCB_EINVAL;
749         return;
750     }
751 
752     Json::Value &tmoval = json["timeout"];
753     if (tmoval.isNull()) {
754         // Set the default timeout as the server-side query timeout if no
755         // other timeout is used.
756         char buf[64] = {0};
757         sprintf(buf, "%uus", LCBT_SETTING(obj, n1ql_timeout));
758         tmoval = buf;
759         /* FIXME: use separate timeout for analytics */
760         timeout = LCBT_SETTING(obj, n1ql_timeout);
761     } else if (tmoval.isString()) {
762         timeout = lcb_analyticsreq_parsetmo(tmoval.asString());
763     } else {
764         // Timeout is not a string!
765         lasterr = LCB_EINVAL;
766         return;
767     }
768 
769 #ifdef LCB_TRACING
770     if (instance->settings->tracer) {
771         char id[20] = {0};
772         snprintf(id, sizeof(id), "%p", (void *)this);
773         span = lcbtrace_span_start(instance->settings->tracer, LCBTRACE_OP_DISPATCH_TO_SERVER, LCBTRACE_NOW, NULL);
774         lcbtrace_span_add_tag_str(span, LCBTRACE_TAG_OPERATION_ID, id);
775         lcbtrace_span_add_system_tags(span, instance->settings, LCBTRACE_TAG_SERVICE_ANALYTICS);
776     }
777 #endif
778 
779     if (ingest.method != LCB_ANALYTICSINGEST_NONE) {
780         docq = new lcb::docreq::Queue(instance);
781         docq->parent = this;
782         docq->cb_schedule = cb_op_schedule;
783         docq->cb_ready = cb_doc_ready;
784         docq->cb_throttle = cb_docq_throttle;
785         // TODO: docq->max_pending_response;
786         lcb_aspend_add(&instance->pendops, LCB_PENDTYPE_COUNTER, NULL);
787     }
788 }
789 
lcb_ANALYTICSREQ(lcb_t obj,const void * user_cookie,lcb_ANALYTICSDEFERREDHANDLE * handle)790 lcb_ANALYTICSREQ::lcb_ANALYTICSREQ(lcb_t obj, const void *user_cookie, lcb_ANALYTICSDEFERREDHANDLE *handle)
791     : cur_htresp(NULL), htreq(NULL),
792       parser(new lcb::jsparse::Parser(lcb::jsparse::Parser::MODE_ANALYTICS_DEFERRED, this)), cookie(user_cookie),
793       callback(handle->callback), instance(obj), lasterr(LCB_SUCCESS), timeout(0), nrows(0), was_retried(false),
794       deferred_handle(handle->handle), docq(NULL), refcount(1)
795 #ifdef LCB_TRACING
796       ,
797       span(NULL)
798 #endif
799 {
800     /* FIXME: use separate timeout for analytics */
801     timeout = LCBT_SETTING(obj, n1ql_timeout);
802 
803 #ifdef LCB_TRACING
804     if (instance->settings->tracer) {
805         char id[20] = {0};
806         snprintf(id, sizeof(id), "%p", (void *)this);
807         span = lcbtrace_span_start(instance->settings->tracer, LCBTRACE_OP_DISPATCH_TO_SERVER, LCBTRACE_NOW, NULL);
808         lcbtrace_span_add_tag_str(span, LCBTRACE_TAG_OPERATION_ID, id);
809         lcbtrace_span_add_system_tags(span, instance->settings, LCBTRACE_TAG_SERVICE_ANALYTICS);
810     }
811 #endif
812 }
813 
814 LIBCOUCHBASE_API
lcb_analytics_query(lcb_t instance,const void * cookie,lcb_CMDANALYTICS * cmd)815 lcb_error_t lcb_analytics_query(lcb_t instance, const void *cookie, lcb_CMDANALYTICS *cmd)
816 {
817     lcb_error_t err;
818     ANALYTICSREQ *req = NULL;
819 
820     if (cmd->callback == NULL) {
821         return LCB_EINVAL;
822     }
823     if (!cmd->encode()) {
824         return LCB_EINVAL;
825     }
826 
827     req = new lcb_ANALYTICSREQ(instance, cookie, cmd);
828     if (!req) {
829         err = LCB_CLIENT_ENOMEM;
830         goto GT_DESTROY;
831     }
832     if ((err = req->lasterr) != LCB_SUCCESS) {
833         goto GT_DESTROY;
834     }
835 
836     if ((err = req->issue_htreq()) != LCB_SUCCESS) {
837         goto GT_DESTROY;
838     }
839 
840     return LCB_SUCCESS;
841 
842 GT_DESTROY:
843     if (cmd->handle) {
844         cmd->handle = NULL;
845     }
846 
847     if (req) {
848         req->callback = NULL;
849         req->unref();
850     }
851     return err;
852 }
853 
lcb_analytics_defhnd_poll(lcb_t instance,const void * cookie,lcb_ANALYTICSDEFERREDHANDLE * handle)854 lcb_error_t lcb_analytics_defhnd_poll(lcb_t instance, const void *cookie, lcb_ANALYTICSDEFERREDHANDLE *handle)
855 {
856     lcb_error_t err;
857     ANALYTICSREQ *req = NULL;
858 
859     if (handle->callback == NULL || handle->handle.empty()) {
860         return LCB_EINVAL;
861     }
862 
863     req = new lcb_ANALYTICSREQ(instance, cookie, handle);
864     if (!req) {
865         err = LCB_CLIENT_ENOMEM;
866         goto GT_DESTROY;
867     }
868     if ((err = req->lasterr) != LCB_SUCCESS) {
869         goto GT_DESTROY;
870     }
871 
872     if ((err = req->issue_htreq()) != LCB_SUCCESS) {
873         goto GT_DESTROY;
874     }
875 
876     return LCB_SUCCESS;
877 
878 GT_DESTROY:
879     if (req) {
880         req->callback = NULL;
881         req->unref();
882     }
883     return err;
884 }
885 
886 LIBCOUCHBASE_API
lcb_analytics_cancel(lcb_t,lcb_ANALYTICSHANDLE handle)887 void lcb_analytics_cancel(lcb_t, lcb_ANALYTICSHANDLE handle)
888 {
889     if (handle->callback) {
890         handle->callback = NULL;
891         if (handle->docq) {
892             handle->docq->cancel();
893         }
894     }
895 }
896 
897 #ifdef LCB_TRACING
898 
899 LIBCOUCHBASE_API
lcb_analytics_set_parent_span(lcb_t,lcb_ANALYTICSHANDLE handle,lcbtrace_SPAN * span)900 void lcb_analytics_set_parent_span(lcb_t, lcb_ANALYTICSHANDLE handle, lcbtrace_SPAN *span)
901 {
902     if (handle) {
903         lcbtrace_span_set_parent(handle->span, span);
904     }
905 }
906 
907 #endif
908