1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #include "internal.h"
18 #include <vector>
19 #include <string>
20 #include <include/libcouchbase/subdoc.h>
21 
22 static lcb_size_t
23 get_value_size(mc_PACKET *packet)
24 {
25     if (packet->flags & MCREQ_F_HASVALUE) {
26         if (packet->flags & MCREQ_F_VALUE_IOV) {
27             return packet->u_value.multi.total_length;
28         } else {
29             return packet->u_value.single.size;
30         }
31     } else {
32         return 0;
33     }
34 }
35 
36 namespace SubdocCmdTraits {
// Bit flags describing what a sub-document command permits. They are passed
// to the Traits constructor, which splits them into individual fields.
enum Options {
    // The command accepts an empty path
    EMPTY_PATH = 1 << 0,
    // The command may be combined with an expiry time
    ALLOW_EXPIRY = 1 << 1,
    // The command carries a value
    HAS_VALUE = 1 << 2,
    // The command may create intermediate path components
    ALLOW_MKDIRP = 1 << 3,
    // The command is a lookup (otherwise it is a mutation)
    IS_LOOKUP = 1 << 4,
    // Must encapsulate in 'multi' spec
    NO_STANDALONE = 1 << 5
};
46 
47 struct Traits {
48     const unsigned allow_empty_path;
49     const unsigned allow_expiry;
50     const unsigned has_value;
51     const unsigned allow_mkdir_p;
52     const unsigned is_lookup;
53     const uint8_t opcode;
54 
55     inline bool valid() const {
56         return opcode != PROTOCOL_BINARY_CMD_INVALID;
57     }
58 
59     inline unsigned mode() const {
60         return is_lookup ? LCB_SDMULTI_MODE_LOOKUP : LCB_SDMULTI_MODE_MUTATE;
61     }
62 
63     inline bool chk_allow_empty_path(uint32_t options) const {
64         if (allow_empty_path) {
65             return true;
66         }
67         if (!is_lookup) {
68             return false;
69         }
70 
71         return (options & LCB_SDSPEC_F_XATTRPATH) != 0;
72     }
73 
74     inline Traits(uint8_t op, unsigned options) :
75         allow_empty_path(options & EMPTY_PATH),
76         allow_expiry(options & ALLOW_EXPIRY),
77         has_value(options & HAS_VALUE),
78         allow_mkdir_p(options & ALLOW_MKDIRP),
79         is_lookup(options & IS_LOOKUP),
80         opcode(op) {}
81 };
82 
83 static const Traits
84 Get(PROTOCOL_BINARY_CMD_SUBDOC_GET, IS_LOOKUP);
85 
86 static const Traits
87 Exists(PROTOCOL_BINARY_CMD_SUBDOC_EXISTS, IS_LOOKUP);
88 
89 static const Traits
90 GetCount(PROTOCOL_BINARY_CMD_SUBDOC_GET_COUNT, IS_LOOKUP|EMPTY_PATH);
91 
92 static const Traits
93 DictAdd(PROTOCOL_BINARY_CMD_SUBDOC_DICT_ADD, ALLOW_EXPIRY|HAS_VALUE);
94 
95 static const Traits
96 DictUpsert(PROTOCOL_BINARY_CMD_SUBDOC_DICT_UPSERT,
97     ALLOW_EXPIRY|HAS_VALUE|ALLOW_MKDIRP);
98 
99 static const Traits
100 Remove(PROTOCOL_BINARY_CMD_SUBDOC_DELETE, ALLOW_EXPIRY);
101 
102 static const Traits
103 ArrayInsert(PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_INSERT,
104     ALLOW_EXPIRY|HAS_VALUE);
105 
106 static const Traits
107 Replace(PROTOCOL_BINARY_CMD_SUBDOC_REPLACE, ALLOW_EXPIRY|HAS_VALUE);
108 
109 static const Traits
110 ArrayAddFirst(PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_PUSH_FIRST,
111     ALLOW_EXPIRY|HAS_VALUE|EMPTY_PATH|ALLOW_MKDIRP);
112 
113 static const Traits
114 ArrayAddLast(PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_PUSH_LAST,
115     ALLOW_EXPIRY|HAS_VALUE|EMPTY_PATH|ALLOW_MKDIRP);
116 
117 static const Traits
118 ArrayAddUnique(PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_ADD_UNIQUE,
119     ALLOW_EXPIRY|HAS_VALUE|EMPTY_PATH|ALLOW_MKDIRP);
120 
121 static const Traits
122 Counter(PROTOCOL_BINARY_CMD_SUBDOC_COUNTER, ALLOW_EXPIRY|HAS_VALUE|ALLOW_MKDIRP);
123 
124 static const Traits
125 GetDoc(PROTOCOL_BINARY_CMD_GET, IS_LOOKUP|EMPTY_PATH|NO_STANDALONE);
126 
127 static const Traits
128 SetDoc(PROTOCOL_BINARY_CMD_SET, EMPTY_PATH|NO_STANDALONE);
129 
130 static const Traits
131 DeleteDoc(PROTOCOL_BINARY_CMD_DELETE, EMPTY_PATH|NO_STANDALONE);
132 
133 static const Traits
134 Invalid(PROTOCOL_BINARY_CMD_INVALID, 0);
135 
136 const Traits&
137 find(unsigned mode)
138 {
139     switch (mode) {
140     case LCB_SDCMD_REPLACE:
141         return Replace;
142     case LCB_SDCMD_DICT_ADD:
143         return DictAdd;
144     case LCB_SDCMD_DICT_UPSERT:
145         return DictUpsert;
146     case LCB_SDCMD_ARRAY_ADD_FIRST:
147         return ArrayAddFirst;
148     case LCB_SDCMD_ARRAY_ADD_LAST:
149         return ArrayAddLast;
150     case LCB_SDCMD_ARRAY_ADD_UNIQUE:
151         return ArrayAddUnique;
152     case LCB_SDCMD_ARRAY_INSERT:
153         return ArrayInsert;
154     case LCB_SDCMD_GET:
155         return Get;
156     case LCB_SDCMD_EXISTS:
157         return Exists;
158     case LCB_SDCMD_GET_COUNT:
159         return GetCount;
160     case LCB_SDCMD_REMOVE:
161         return Remove;
162     case LCB_SDCMD_COUNTER:
163         return Counter;
164     case LCB_SDCMD_GET_FULLDOC:
165         return GetDoc;
166     case LCB_SDCMD_SET_FULLDOC:
167         return SetDoc;
168     case LCB_SDCMD_REMOVE_FULLDOC:
169         return DeleteDoc;
170     default:
171         return Invalid;
172     }
173 }
174 }
175 
// Bits of the per-path flags byte produced by make_path_flags().
namespace SubdocPathFlags {
// Create missing intermediate path components
static const uint8_t MKDIR_P = 0x01;
// The path refers to an extended attribute
static const uint8_t XATTR = 0x04;
// Expand macros in the value (set together with XATTR)
static const uint8_t EXPAND_MACROS = 0x10;
}
181 
// Bits of the document-level flags byte produced by make_doc_flags().
namespace SubdocDocFlags {
// Set for LCB_CMDSUBDOC_F_UPSERT_DOC
static const uint8_t MKDOC = 0x1;
// Set for LCB_CMDSUBDOC_F_INSERT_DOC
static const uint8_t ADDDOC = 0x2;
// Set for LCB_CMDSUBDOC_F_ACCESS_DELETED
static const uint8_t ACCESS_DELETED = 0x4;
}
187 
188 static size_t
189 get_valbuf_size(const lcb_VALBUF& vb)
190 {
191     if (vb.vtype == LCB_KV_COPY || vb.vtype == LCB_KV_CONTIG) {
192         return vb.u_buf.contig.nbytes;
193     } else {
194         if (vb.u_buf.multi.total_length) {
195             return vb.u_buf.multi.total_length;
196         } else {
197             size_t tmp = 0;
198             for (size_t ii = 0; ii < vb.u_buf.multi.niov; ++ii) {
199                 tmp += vb.u_buf.multi.iov[ii].iov_len;
200             }
201             return tmp;
202         }
203     }
204 }
205 
206 static uint8_t
207 make_path_flags(const uint32_t user) {
208     uint8_t flags = 0;
209     if (user & LCB_SDSPEC_F_MKINTERMEDIATES) {
210         flags |= SubdocPathFlags::MKDIR_P;
211     }
212     if (user & LCB_SDSPEC_F_XATTRPATH) {
213         flags |= SubdocPathFlags::XATTR;
214     }
215     if (user & LCB_SDSPEC_F_XATTR_MACROVALUES) {
216         flags |= SubdocPathFlags::XATTR | SubdocPathFlags::EXPAND_MACROS;
217     }
218     return flags;
219 }
220 
221 static uint8_t
222 make_doc_flags(const uint32_t user) {
223     uint8_t flags = 0;
224     if (user & LCB_CMDSUBDOC_F_INSERT_DOC) {
225         flags |= SubdocDocFlags::ADDDOC;
226     }
227     if (user & LCB_CMDSUBDOC_F_UPSERT_DOC) {
228         flags |= SubdocDocFlags::MKDOC;
229     }
230     if (user & LCB_CMDSUBDOC_F_ACCESS_DELETED) {
231         flags |= SubdocDocFlags::ACCESS_DELETED;
232     }
233     return flags;
234 }
235 
236 struct MultiBuilder {
237     static unsigned infer_mode(const lcb_CMDSUBDOC *cmd) {
238         if (cmd->nspecs == 0) {
239             return 0;
240         }
241         const SubdocCmdTraits::Traits& trait = SubdocCmdTraits::find(cmd->specs[0].sdcmd);
242         if (!trait.valid()) {
243             return 0;
244         }
245         return trait.mode();
246     }
247 
248     MultiBuilder(const lcb_CMDSUBDOC *cmd_)
249     : cmd(cmd_), payload_size(0) {
250         mode = infer_mode(cmd_);
251         size_t ebufsz = is_lookup() ? cmd->nspecs * 4 : cmd->nspecs * 8;
252         extra_body = new char[ebufsz];
253         bodysz = 0;
254     }
255 
256     ~MultiBuilder() {
257         if (extra_body != NULL) {
258             delete[] extra_body;
259         }
260     }
261 
262     inline MultiBuilder(const MultiBuilder&);
263 
264     // IOVs which are fed into lcb_VALBUF for subsequent use
265     const lcb_CMDSUBDOC *cmd;
266     std::vector<lcb_IOV> iovs;
267     char *extra_body;
268     size_t bodysz;
269 
270     // Total size of the payload itself
271     size_t payload_size;
272 
273     unsigned mode;
274 
275     bool is_lookup() const {
276         return mode == LCB_SDMULTI_MODE_LOOKUP;
277     }
278 
279     bool is_mutate() const {
280         return mode == LCB_SDMULTI_MODE_MUTATE;
281     }
282 
283     void maybe_setmode(const SubdocCmdTraits::Traits& t) {
284         if (mode == 0) {
285             mode = t.mode();
286         }
287     }
288 
289     template <typename T> void add_field(T itm, size_t len) {
290         const char *b = reinterpret_cast<const char *>(&itm);
291         memcpy(extra_body + bodysz, b, len);
292         bodysz += len;
293     }
294 
295     const char *extra_mark() const {
296         return extra_body + bodysz;
297     }
298 
299     void add_extras_iov(const char *last_begin) {
300         const char *p_end = extra_mark();
301         add_iov(last_begin, p_end - last_begin);
302     }
303 
304     void add_iov(const void *b, size_t n) {
305         if (!n) {
306             return;
307         }
308 
309         lcb_IOV iov;
310         iov.iov_base = const_cast<void*>(b);
311         iov.iov_len = n;
312         iovs.push_back(iov);
313         payload_size += n;
314     }
315 
316     void add_iov(const lcb_VALBUF& vb) {
317         if (vb.vtype == LCB_KV_CONTIG || vb.vtype == LCB_KV_COPY) {
318             add_iov(vb.u_buf.contig.bytes, vb.u_buf.contig.nbytes);
319         } else {
320             for (size_t ii = 0; ii < vb.u_buf.contig.nbytes; ++ii) {
321                 const lcb_IOV& iov = vb.u_buf.multi.iov[ii];
322                 if (!iov.iov_len) {
323                     continue; // Skip it
324                 }
325                 payload_size += iov.iov_len;
326                 iovs.push_back(iov);
327             }
328         }
329     }
330 
331     inline lcb_error_t add_spec(const lcb_SDSPEC *);
332 };
333 
334 lcb_error_t
335 MultiBuilder::add_spec(const lcb_SDSPEC *spec)
336 {
337     const SubdocCmdTraits::Traits& trait = SubdocCmdTraits::find(spec->sdcmd);
338     if (!trait.valid()) {
339         return LCB_UNKNOWN_SDCMD;
340     }
341     maybe_setmode(trait);
342 
343     if (trait.mode() != mode) {
344         return LCB_OPTIONS_CONFLICT;
345     }
346 
347     const char *p_begin = extra_mark();
348     // opcode
349     add_field(trait.opcode, 1);
350     // flags
351     add_field(make_path_flags(spec->options), 1);
352 
353     uint16_t npath = static_cast<uint16_t>(spec->path.contig.nbytes);
354     if (!npath && !trait.chk_allow_empty_path(spec->options)) {
355         return LCB_EMPTY_PATH;
356     }
357 
358     // Path length
359     add_field(static_cast<uint16_t>(htons(npath)), 2);
360 
361     uint32_t vsize = 0;
362     if (is_mutate()) {
363         // Mutation needs an additional 'value' spec.
364         vsize = get_valbuf_size(spec->value);
365         add_field(static_cast<uint32_t>(htonl(vsize)), 4);
366     }
367 
368     // Finalize the header..
369     add_extras_iov(p_begin);
370 
371     // Add the actual path
372     add_iov(spec->path.contig.bytes, spec->path.contig.nbytes);
373     if (vsize) {
374         add_iov(spec->value);
375     }
376     return LCB_SUCCESS;
377 }
378 
379 
380 static lcb_error_t
381 sd3_single(lcb_t instance, const void *cookie, const lcb_CMDSUBDOC *cmd)
382 {
383     // Find the trait
384     const lcb_SDSPEC *spec = cmd->specs;
385     const SubdocCmdTraits::Traits& traits = SubdocCmdTraits::find(spec->sdcmd);
386     lcb_error_t rc;
387 
388     // Any error here is implicitly related to the only spec
389     if (cmd->error_index) {
390         *cmd->error_index = 0;
391     }
392 
393     if (!traits.valid()) {
394         return LCB_UNKNOWN_SDCMD;
395     }
396 
397     // Determine if the trait matches the mode. Technically we don't care
398     // about this (since it's always a single command) but we do want the
399     // API to remain consistent.
400     if (cmd->multimode != 0 && cmd->multimode != traits.mode()) {
401         return LCB_OPTIONS_CONFLICT;
402     }
403 
404     if (LCB_KEYBUF_IS_EMPTY(&cmd->key)) {
405         return LCB_EMPTY_KEY;
406     }
407     if (LCB_KEYBUF_IS_EMPTY(&spec->path) &&
408             !traits.chk_allow_empty_path(spec->options)) {
409         return LCB_EMPTY_PATH;
410     }
411 
412     lcb_VALBUF valbuf;
413     const lcb_VALBUF *valbuf_p = &valbuf;
414     lcb_IOV tmpiov[2];
415     lcb_FRAGBUF *fbuf = &valbuf.u_buf.multi;
416 
417     valbuf.vtype = LCB_KV_IOVCOPY;
418     fbuf->iov = tmpiov;
419     fbuf->niov = 1;
420     fbuf->total_length = 0;
421     tmpiov[0].iov_base = const_cast<void*>(spec->path.contig.bytes);
422     tmpiov[0].iov_len = spec->path.contig.nbytes;
423 
424     if (traits.has_value) {
425         if (spec->value.vtype == LCB_KV_COPY) {
426             fbuf->niov = 2;
427             /* Subdoc value is the second IOV */
428             tmpiov[1].iov_base = (void *)spec->value.u_buf.contig.bytes;
429             tmpiov[1].iov_len = spec->value.u_buf.contig.nbytes;
430         } else {
431             /* Assume properly formatted packet */
432             valbuf_p = &spec->value;
433         }
434     }
435 
436     uint8_t extlen = 3;
437     uint32_t exptime = 0;
438     if (cmd->exptime) {
439         if (!traits.allow_expiry) {
440             return LCB_OPTIONS_CONFLICT;
441         }
442         exptime = cmd->exptime;
443         extlen = 7;
444     }
445 
446     uint8_t docflags = make_doc_flags(cmd->cmdflags);
447     if (docflags) {
448         extlen++;
449     }
450 
451     protocol_binary_request_header hdr = {{0}};
452     mc_PACKET *packet;
453     mc_PIPELINE *pipeline;
454 
455     rc = mcreq_basic_packet(&instance->cmdq,
456         (const lcb_CMDBASE*)cmd,
457         &hdr, extlen, &packet, &pipeline, MCREQ_BASICPACKET_F_FALLBACKOK);
458 
459     if (rc != LCB_SUCCESS) {
460         return rc;
461     }
462 
463     rc = mcreq_reserve_value(pipeline, packet, valbuf_p);
464     if (rc != LCB_SUCCESS) {
465         mcreq_wipe_packet(pipeline, packet);
466         mcreq_release_packet(pipeline, packet);
467         return rc;
468     }
469 
470     MCREQ_PKT_RDATA(packet)->cookie = cookie;
471     MCREQ_PKT_RDATA(packet)->start = gethrtime();
472 
473     hdr.request.magic = PROTOCOL_BINARY_REQ;
474     hdr.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
475     hdr.request.extlen = packet->extlen;
476     hdr.request.opaque = packet->opaque;
477     hdr.request.opcode = traits.opcode;
478     hdr.request.cas = lcb_htonll(cmd->cas);
479     hdr.request.bodylen = htonl(hdr.request.extlen +
480         ntohs(hdr.request.keylen) + get_value_size(packet));
481 
482     memcpy(SPAN_BUFFER(&packet->kh_span), hdr.bytes, sizeof hdr.bytes);
483 
484     char *extras = SPAN_BUFFER(&packet->kh_span) + MCREQ_PKT_BASESIZE;
485     // Path length:
486     uint16_t enc_pathlen = htons(spec->path.contig.nbytes);
487     memcpy(extras, &enc_pathlen, 2);
488     extras += 2;
489 
490     uint8_t path_flags = make_path_flags(spec->options);
491     memcpy(extras, &path_flags, 1);
492     extras += 1;
493 
494     if (exptime) {
495         uint32_t enc_exptime = htonl(exptime);
496         memcpy(extras, &enc_exptime, 4);
497         extras += 4;
498     }
499 
500     if (docflags) {
501         memcpy(extras, &docflags, 1);
502         extras += 1;
503     }
504 
505     LCB_SCHED_ADD(instance, pipeline, packet);
506     return LCB_SUCCESS;
507 }
508 
509 LIBCOUCHBASE_API
510 lcb_error_t
511 lcb_subdoc3(lcb_t instance, const void *cookie, const lcb_CMDSUBDOC *cmd)
512 {
513     // First validate the command
514     if (cmd->nspecs == 0) {
515         return LCB_ENO_COMMANDS;
516     }
517 
518     if (cmd->nspecs == 1) {
519         switch (cmd->specs[0].sdcmd) {
520         case LCB_SDCMD_GET_FULLDOC:
521         case LCB_SDCMD_SET_FULLDOC:
522         case LCB_SDCMD_REMOVE_FULLDOC:
523             break;
524         default:
525             return sd3_single(instance, cookie, cmd);
526         }
527     }
528 
529     uint32_t expiry = cmd->exptime;
530     uint8_t docflags = make_doc_flags(cmd->cmdflags);
531     lcb_error_t rc = LCB_SUCCESS;
532 
533     MultiBuilder ctx(cmd);
534     if (cmd->error_index) {
535         *cmd->error_index = -1;
536     }
537 
538     if (expiry && !ctx.is_mutate()) {
539         return LCB_OPTIONS_CONFLICT;
540     }
541 
542     for (size_t ii = 0; ii < cmd->nspecs; ++ii) {
543         if (cmd->error_index) {
544             *cmd->error_index = ii;
545         }
546         rc = ctx.add_spec(cmd->specs + ii);
547         if (rc != LCB_SUCCESS) {
548             return rc;
549         }
550     }
551 
552     mc_PIPELINE *pl;
553     mc_PACKET *pkt;
554     uint8_t extlen = 0;
555     if (expiry) {
556         extlen += 4;
557     }
558     if (docflags) {
559         extlen ++;
560     }
561 
562     protocol_binary_request_header hdr;
563 
564     if (cmd->error_index) {
565         *cmd->error_index = -1;
566     }
567 
568     rc = mcreq_basic_packet(
569         &instance->cmdq, reinterpret_cast<const lcb_CMDBASE*>(cmd),
570         &hdr, extlen, &pkt, &pl, MCREQ_BASICPACKET_F_FALLBACKOK);
571 
572     if (rc != LCB_SUCCESS) {
573         return rc;
574     }
575 
576     lcb_VALBUF vb = { LCB_KV_IOVCOPY };
577     vb.u_buf.multi.iov = &ctx.iovs[0];
578     vb.u_buf.multi.niov = ctx.iovs.size();
579     vb.u_buf.multi.total_length = ctx.payload_size;
580     rc = mcreq_reserve_value(pl, pkt, &vb);
581 
582     if (rc != LCB_SUCCESS) {
583         mcreq_wipe_packet(pl, pkt);
584         mcreq_release_packet(pl, pkt);
585         return rc;
586     }
587 
588     // Set the header fields.
589     hdr.request.magic = PROTOCOL_BINARY_REQ;
590     if (ctx.is_lookup()) {
591         hdr.request.opcode = PROTOCOL_BINARY_CMD_SUBDOC_MULTI_LOOKUP;
592     } else {
593         hdr.request.opcode = PROTOCOL_BINARY_CMD_SUBDOC_MULTI_MUTATION;
594     }
595     hdr.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
596     hdr.request.extlen = pkt->extlen;
597     hdr.request.opaque = pkt->opaque;
598     hdr.request.cas = lcb_htonll(cmd->cas);
599     hdr.request.bodylen = htonl(hdr.request.extlen +
600         ntohs(hdr.request.keylen) + ctx.payload_size);
601     memcpy(SPAN_BUFFER(&pkt->kh_span), hdr.bytes, sizeof hdr.bytes);
602     if (expiry) {
603         expiry = htonl(expiry);
604         memcpy(SPAN_BUFFER(&pkt->kh_span) + 24, &expiry, 4);
605     }
606     if (docflags) {
607         memcpy(SPAN_BUFFER(&pkt->kh_span) + 24 + (extlen -1), &docflags, 1);
608     }
609 
610     MCREQ_PKT_RDATA(pkt)->cookie = cookie;
611     MCREQ_PKT_RDATA(pkt)->start = gethrtime();
612     LCB_SCHED_ADD(instance, pl, pkt);
613     return LCB_SUCCESS;
614 }
615