1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2 /* 3 * Copyright 2015 Couchbase, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 #include "internal.h" 18 #include <vector> 19 #include <string> 20 #include <include/libcouchbase/subdoc.h> 21 22 static lcb_size_t 23 get_value_size(mc_PACKET *packet) 24 { 25 if (packet->flags & MCREQ_F_HASVALUE) { 26 if (packet->flags & MCREQ_F_VALUE_IOV) { 27 return packet->u_value.multi.total_length; 28 } else { 29 return packet->u_value.single.size; 30 } 31 } else { 32 return 0; 33 } 34 } 35 36 namespace SubdocCmdTraits { 37 enum Options { 38 EMPTY_PATH = 1<<0, 39 ALLOW_EXPIRY = 1<<1, 40 HAS_VALUE = 1<<2, 41 ALLOW_MKDIRP = 1<<3, 42 IS_LOOKUP = 1<<4, 43 // Must encapsulate in 'multi' spec 44 NO_STANDALONE = 1<<5 45 }; 46 47 struct Traits { 48 const unsigned allow_empty_path; 49 const unsigned allow_expiry; 50 const unsigned has_value; 51 const unsigned allow_mkdir_p; 52 const unsigned is_lookup; 53 const uint8_t opcode; 54 55 inline bool valid() const { 56 return opcode != PROTOCOL_BINARY_CMD_INVALID; 57 } 58 59 inline unsigned mode() const { 60 return is_lookup ? LCB_SDMULTI_MODE_LOOKUP : LCB_SDMULTI_MODE_MUTATE; 61 } 62 63 inline bool chk_allow_empty_path(uint32_t options) const { 64 if (allow_empty_path) { 65 return true; 66 } 67 if (!is_lookup) { 68 return false; 69 } 70 71 return (options & LCB_SDSPEC_F_XATTRPATH) != 0; 72 } 73 74 inline Traits(uint8_t op, unsigned options) : 75 allow_empty_path(options & EMPTY_PATH), 76 allow_expiry(options & ALLOW_EXPIRY), 77 has_value(options & HAS_VALUE), 78 allow_mkdir_p(options & ALLOW_MKDIRP), 79 is_lookup(options & IS_LOOKUP), 80 opcode(op) {} 81 }; 82 83 static const Traits 84 Get(PROTOCOL_BINARY_CMD_SUBDOC_GET, IS_LOOKUP); 85 86 static const Traits 87 Exists(PROTOCOL_BINARY_CMD_SUBDOC_EXISTS, IS_LOOKUP); 88 89 static const Traits 90 GetCount(PROTOCOL_BINARY_CMD_SUBDOC_GET_COUNT, IS_LOOKUP|EMPTY_PATH); 91 92 static const Traits 93 DictAdd(PROTOCOL_BINARY_CMD_SUBDOC_DICT_ADD, ALLOW_EXPIRY|HAS_VALUE); 94 95 static const Traits 96 DictUpsert(PROTOCOL_BINARY_CMD_SUBDOC_DICT_UPSERT, 97 ALLOW_EXPIRY|HAS_VALUE|ALLOW_MKDIRP); 98 99 static const Traits 100 Remove(PROTOCOL_BINARY_CMD_SUBDOC_DELETE, ALLOW_EXPIRY); 101 102 static const Traits 103 ArrayInsert(PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_INSERT, 104 ALLOW_EXPIRY|HAS_VALUE); 105 106 static const Traits 107 Replace(PROTOCOL_BINARY_CMD_SUBDOC_REPLACE, ALLOW_EXPIRY|HAS_VALUE); 108 109 static const Traits 110 ArrayAddFirst(PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_PUSH_FIRST, 111 ALLOW_EXPIRY|HAS_VALUE|EMPTY_PATH|ALLOW_MKDIRP); 112 113 static const Traits 114 ArrayAddLast(PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_PUSH_LAST, 115 ALLOW_EXPIRY|HAS_VALUE|EMPTY_PATH|ALLOW_MKDIRP); 116 117 static const Traits 118 ArrayAddUnique(PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_ADD_UNIQUE, 119 ALLOW_EXPIRY|HAS_VALUE|EMPTY_PATH|ALLOW_MKDIRP); 120 121 static const Traits 122 Counter(PROTOCOL_BINARY_CMD_SUBDOC_COUNTER, ALLOW_EXPIRY|HAS_VALUE|ALLOW_MKDIRP); 123 124 static const Traits 125 GetDoc(PROTOCOL_BINARY_CMD_GET, IS_LOOKUP|EMPTY_PATH|NO_STANDALONE); 126 127 static const Traits 128 SetDoc(PROTOCOL_BINARY_CMD_SET, EMPTY_PATH|NO_STANDALONE); 129 130 static const Traits 131 DeleteDoc(PROTOCOL_BINARY_CMD_DELETE, EMPTY_PATH|NO_STANDALONE); 132 133 static const Traits 134 Invalid(PROTOCOL_BINARY_CMD_INVALID, 0); 135 136 const Traits& 137 find(unsigned mode) 138 { 139 switch (mode) { 140 case LCB_SDCMD_REPLACE: 141 return Replace; 142 case LCB_SDCMD_DICT_ADD: 143 return DictAdd; 144 case LCB_SDCMD_DICT_UPSERT: 145 return DictUpsert; 146 case LCB_SDCMD_ARRAY_ADD_FIRST: 147 return ArrayAddFirst; 148 case LCB_SDCMD_ARRAY_ADD_LAST: 149 return ArrayAddLast; 150 case LCB_SDCMD_ARRAY_ADD_UNIQUE: 151 return ArrayAddUnique; 152 case LCB_SDCMD_ARRAY_INSERT: 153 return ArrayInsert; 154 case LCB_SDCMD_GET: 155 return Get; 156 case LCB_SDCMD_EXISTS: 157 return Exists; 158 case LCB_SDCMD_GET_COUNT: 159 return GetCount; 160 case LCB_SDCMD_REMOVE: 161 return Remove; 162 case LCB_SDCMD_COUNTER: 163 return Counter; 164 case LCB_SDCMD_GET_FULLDOC: 165 return GetDoc; 166 case LCB_SDCMD_SET_FULLDOC: 167 return SetDoc; 168 case LCB_SDCMD_REMOVE_FULLDOC: 169 return DeleteDoc; 170 default: 171 return Invalid; 172 } 173 } 174 } 175 176 namespace SubdocPathFlags { 177 static const uint8_t MKDIR_P = 0x01; 178 static const uint8_t XATTR = 0x04; 179 static const uint8_t EXPAND_MACROS = 0x010; 180 } 181 182 namespace SubdocDocFlags { 183 static const uint8_t MKDOC = 0x01; 184 static const uint8_t ADDDOC = 0x02; 185 static const uint8_t ACCESS_DELETED = 0x04; 186 } 187 188 static size_t 189 get_valbuf_size(const lcb_VALBUF& vb) 190 { 191 if (vb.vtype == LCB_KV_COPY || vb.vtype == LCB_KV_CONTIG) { 192 return vb.u_buf.contig.nbytes; 193 } else { 194 if (vb.u_buf.multi.total_length) { 195 return vb.u_buf.multi.total_length; 196 } else { 197 size_t tmp = 0; 198 for (size_t ii = 0; ii < vb.u_buf.multi.niov; ++ii) { 199 tmp += vb.u_buf.multi.iov[ii].iov_len; 200 } 201 return tmp; 202 } 203 } 204 } 205 206 static uint8_t 207 make_path_flags(const uint32_t user) { 208 uint8_t flags = 0; 209 if (user & LCB_SDSPEC_F_MKINTERMEDIATES) { 210 flags |= SubdocPathFlags::MKDIR_P; 211 } 212 if (user & LCB_SDSPEC_F_XATTRPATH) { 213 flags |= SubdocPathFlags::XATTR; 214 } 215 if (user & LCB_SDSPEC_F_XATTR_MACROVALUES) { 216 flags |= SubdocPathFlags::XATTR | SubdocPathFlags::EXPAND_MACROS; 217 } 218 return flags; 219 } 220 221 static uint8_t 222 make_doc_flags(const uint32_t user) { 223 uint8_t flags = 0; 224 if (user & LCB_CMDSUBDOC_F_INSERT_DOC) { 225 flags |= SubdocDocFlags::ADDDOC; 226 } 227 if (user & LCB_CMDSUBDOC_F_UPSERT_DOC) { 228 flags |= SubdocDocFlags::MKDOC; 229 } 230 if (user & LCB_CMDSUBDOC_F_ACCESS_DELETED) { 231 flags |= SubdocDocFlags::ACCESS_DELETED; 232 } 233 return flags; 234 } 235 236 struct MultiBuilder { 237 static unsigned infer_mode(const lcb_CMDSUBDOC *cmd) { 238 if (cmd->nspecs == 0) { 239 return 0; 240 } 241 const SubdocCmdTraits::Traits& trait = SubdocCmdTraits::find(cmd->specs[0].sdcmd); 242 if (!trait.valid()) { 243 return 0; 244 } 245 return trait.mode(); 246 } 247 248 MultiBuilder(const lcb_CMDSUBDOC *cmd_) 249 : cmd(cmd_), payload_size(0) { 250 mode = infer_mode(cmd_); 251 size_t ebufsz = is_lookup() ? cmd->nspecs * 4 : cmd->nspecs * 8; 252 extra_body = new char[ebufsz]; 253 bodysz = 0; 254 } 255 256 ~MultiBuilder() { 257 if (extra_body != NULL) { 258 delete[] extra_body; 259 } 260 } 261 262 inline MultiBuilder(const MultiBuilder&); 263 264 // IOVs which are fed into lcb_VALBUF for subsequent use 265 const lcb_CMDSUBDOC *cmd; 266 std::vector<lcb_IOV> iovs; 267 char *extra_body; 268 size_t bodysz; 269 270 // Total size of the payload itself 271 size_t payload_size; 272 273 unsigned mode; 274 275 bool is_lookup() const { 276 return mode == LCB_SDMULTI_MODE_LOOKUP; 277 } 278 279 bool is_mutate() const { 280 return mode == LCB_SDMULTI_MODE_MUTATE; 281 } 282 283 void maybe_setmode(const SubdocCmdTraits::Traits& t) { 284 if (mode == 0) { 285 mode = t.mode(); 286 } 287 } 288 289 template <typename T> void add_field(T itm, size_t len) { 290 const char *b = reinterpret_cast<const char *>(&itm); 291 memcpy(extra_body + bodysz, b, len); 292 bodysz += len; 293 } 294 295 const char *extra_mark() const { 296 return extra_body + bodysz; 297 } 298 299 void add_extras_iov(const char *last_begin) { 300 const char *p_end = extra_mark(); 301 add_iov(last_begin, p_end - last_begin); 302 } 303 304 void add_iov(const void *b, size_t n) { 305 if (!n) { 306 return; 307 } 308 309 lcb_IOV iov; 310 iov.iov_base = const_cast<void*>(b); 311 iov.iov_len = n; 312 iovs.push_back(iov); 313 payload_size += n; 314 } 315 316 void add_iov(const lcb_VALBUF& vb) { 317 if (vb.vtype == LCB_KV_CONTIG || vb.vtype == LCB_KV_COPY) { 318 add_iov(vb.u_buf.contig.bytes, vb.u_buf.contig.nbytes); 319 } else { 320 for (size_t ii = 0; ii < vb.u_buf.contig.nbytes; ++ii) { 321 const lcb_IOV& iov = vb.u_buf.multi.iov[ii]; 322 if (!iov.iov_len) { 323 continue; // Skip it 324 } 325 payload_size += iov.iov_len; 326 iovs.push_back(iov); 327 } 328 } 329 } 330 331 inline lcb_error_t add_spec(const lcb_SDSPEC *); 332 }; 333 334 lcb_error_t 335 MultiBuilder::add_spec(const lcb_SDSPEC *spec) 336 { 337 const SubdocCmdTraits::Traits& trait = SubdocCmdTraits::find(spec->sdcmd); 338 if (!trait.valid()) { 339 return LCB_UNKNOWN_SDCMD; 340 } 341 maybe_setmode(trait); 342 343 if (trait.mode() != mode) { 344 return LCB_OPTIONS_CONFLICT; 345 } 346 347 const char *p_begin = extra_mark(); 348 // opcode 349 add_field(trait.opcode, 1); 350 // flags 351 add_field(make_path_flags(spec->options), 1); 352 353 uint16_t npath = static_cast<uint16_t>(spec->path.contig.nbytes); 354 if (!npath && !trait.chk_allow_empty_path(spec->options)) { 355 return LCB_EMPTY_PATH; 356 } 357 358 // Path length 359 add_field(static_cast<uint16_t>(htons(npath)), 2); 360 361 uint32_t vsize = 0; 362 if (is_mutate()) { 363 // Mutation needs an additional 'value' spec. 364 vsize = get_valbuf_size(spec->value); 365 add_field(static_cast<uint32_t>(htonl(vsize)), 4); 366 } 367 368 // Finalize the header.. 369 add_extras_iov(p_begin); 370 371 // Add the actual path 372 add_iov(spec->path.contig.bytes, spec->path.contig.nbytes); 373 if (vsize) { 374 add_iov(spec->value); 375 } 376 return LCB_SUCCESS; 377 } 378 379 380 static lcb_error_t 381 sd3_single(lcb_t instance, const void *cookie, const lcb_CMDSUBDOC *cmd) 382 { 383 // Find the trait 384 const lcb_SDSPEC *spec = cmd->specs; 385 const SubdocCmdTraits::Traits& traits = SubdocCmdTraits::find(spec->sdcmd); 386 lcb_error_t rc; 387 388 // Any error here is implicitly related to the only spec 389 if (cmd->error_index) { 390 *cmd->error_index = 0; 391 } 392 393 if (!traits.valid()) { 394 return LCB_UNKNOWN_SDCMD; 395 } 396 397 // Determine if the trait matches the mode. Technically we don't care 398 // about this (since it's always a single command) but we do want the 399 // API to remain consistent. 400 if (cmd->multimode != 0 && cmd->multimode != traits.mode()) { 401 return LCB_OPTIONS_CONFLICT; 402 } 403 404 if (LCB_KEYBUF_IS_EMPTY(&cmd->key)) { 405 return LCB_EMPTY_KEY; 406 } 407 if (LCB_KEYBUF_IS_EMPTY(&spec->path) && 408 !traits.chk_allow_empty_path(spec->options)) { 409 return LCB_EMPTY_PATH; 410 } 411 412 lcb_VALBUF valbuf; 413 const lcb_VALBUF *valbuf_p = &valbuf; 414 lcb_IOV tmpiov[2]; 415 lcb_FRAGBUF *fbuf = &valbuf.u_buf.multi; 416 417 valbuf.vtype = LCB_KV_IOVCOPY; 418 fbuf->iov = tmpiov; 419 fbuf->niov = 1; 420 fbuf->total_length = 0; 421 tmpiov[0].iov_base = const_cast<void*>(spec->path.contig.bytes); 422 tmpiov[0].iov_len = spec->path.contig.nbytes; 423 424 if (traits.has_value) { 425 if (spec->value.vtype == LCB_KV_COPY) { 426 fbuf->niov = 2; 427 /* Subdoc value is the second IOV */ 428 tmpiov[1].iov_base = (void *)spec->value.u_buf.contig.bytes; 429 tmpiov[1].iov_len = spec->value.u_buf.contig.nbytes; 430 } else { 431 /* Assume properly formatted packet */ 432 valbuf_p = &spec->value; 433 } 434 } 435 436 uint8_t extlen = 3; 437 uint32_t exptime = 0; 438 if (cmd->exptime) { 439 if (!traits.allow_expiry) { 440 return LCB_OPTIONS_CONFLICT; 441 } 442 exptime = cmd->exptime; 443 extlen = 7; 444 } 445 446 uint8_t docflags = make_doc_flags(cmd->cmdflags); 447 if (docflags) { 448 extlen++; 449 } 450 451 protocol_binary_request_header hdr = {{0}}; 452 mc_PACKET *packet; 453 mc_PIPELINE *pipeline; 454 455 rc = mcreq_basic_packet(&instance->cmdq, 456 (const lcb_CMDBASE*)cmd, 457 &hdr, extlen, &packet, &pipeline, MCREQ_BASICPACKET_F_FALLBACKOK); 458 459 if (rc != LCB_SUCCESS) { 460 return rc; 461 } 462 463 rc = mcreq_reserve_value(pipeline, packet, valbuf_p); 464 if (rc != LCB_SUCCESS) { 465 mcreq_wipe_packet(pipeline, packet); 466 mcreq_release_packet(pipeline, packet); 467 return rc; 468 } 469 470 MCREQ_PKT_RDATA(packet)->cookie = cookie; 471 MCREQ_PKT_RDATA(packet)->start = gethrtime(); 472 473 hdr.request.magic = PROTOCOL_BINARY_REQ; 474 hdr.request.datatype = PROTOCOL_BINARY_RAW_BYTES; 475 hdr.request.extlen = packet->extlen; 476 hdr.request.opaque = packet->opaque; 477 hdr.request.opcode = traits.opcode; 478 hdr.request.cas = lcb_htonll(cmd->cas); 479 hdr.request.bodylen = htonl(hdr.request.extlen + 480 ntohs(hdr.request.keylen) + get_value_size(packet)); 481 482 memcpy(SPAN_BUFFER(&packet->kh_span), hdr.bytes, sizeof hdr.bytes); 483 484 char *extras = SPAN_BUFFER(&packet->kh_span) + MCREQ_PKT_BASESIZE; 485 // Path length: 486 uint16_t enc_pathlen = htons(spec->path.contig.nbytes); 487 memcpy(extras, &enc_pathlen, 2); 488 extras += 2; 489 490 uint8_t path_flags = make_path_flags(spec->options); 491 memcpy(extras, &path_flags, 1); 492 extras += 1; 493 494 if (exptime) { 495 uint32_t enc_exptime = htonl(exptime); 496 memcpy(extras, &enc_exptime, 4); 497 extras += 4; 498 } 499 500 if (docflags) { 501 memcpy(extras, &docflags, 1); 502 extras += 1; 503 } 504 505 LCB_SCHED_ADD(instance, pipeline, packet); 506 return LCB_SUCCESS; 507 } 508 509 LIBCOUCHBASE_API 510 lcb_error_t 511 lcb_subdoc3(lcb_t instance, const void *cookie, const lcb_CMDSUBDOC *cmd) 512 { 513 // First validate the command 514 if (cmd->nspecs == 0) { 515 return LCB_ENO_COMMANDS; 516 } 517 518 if (cmd->nspecs == 1) { 519 switch (cmd->specs[0].sdcmd) { 520 case LCB_SDCMD_GET_FULLDOC: 521 case LCB_SDCMD_SET_FULLDOC: 522 case LCB_SDCMD_REMOVE_FULLDOC: 523 break; 524 default: 525 return sd3_single(instance, cookie, cmd); 526 } 527 } 528 529 uint32_t expiry = cmd->exptime; 530 uint8_t docflags = make_doc_flags(cmd->cmdflags); 531 lcb_error_t rc = LCB_SUCCESS; 532 533 MultiBuilder ctx(cmd); 534 if (cmd->error_index) { 535 *cmd->error_index = -1; 536 } 537 538 if (expiry && !ctx.is_mutate()) { 539 return LCB_OPTIONS_CONFLICT; 540 } 541 542 for (size_t ii = 0; ii < cmd->nspecs; ++ii) { 543 if (cmd->error_index) { 544 *cmd->error_index = ii; 545 } 546 rc = ctx.add_spec(cmd->specs + ii); 547 if (rc != LCB_SUCCESS) { 548 return rc; 549 } 550 } 551 552 mc_PIPELINE *pl; 553 mc_PACKET *pkt; 554 uint8_t extlen = 0; 555 if (expiry) { 556 extlen += 4; 557 } 558 if (docflags) { 559 extlen ++; 560 } 561 562 protocol_binary_request_header hdr; 563 564 if (cmd->error_index) { 565 *cmd->error_index = -1; 566 } 567 568 rc = mcreq_basic_packet( 569 &instance->cmdq, reinterpret_cast<const lcb_CMDBASE*>(cmd), 570 &hdr, extlen, &pkt, &pl, MCREQ_BASICPACKET_F_FALLBACKOK); 571 572 if (rc != LCB_SUCCESS) { 573 return rc; 574 } 575 576 lcb_VALBUF vb = { LCB_KV_IOVCOPY }; 577 vb.u_buf.multi.iov = &ctx.iovs[0]; 578 vb.u_buf.multi.niov = ctx.iovs.size(); 579 vb.u_buf.multi.total_length = ctx.payload_size; 580 rc = mcreq_reserve_value(pl, pkt, &vb); 581 582 if (rc != LCB_SUCCESS) { 583 mcreq_wipe_packet(pl, pkt); 584 mcreq_release_packet(pl, pkt); 585 return rc; 586 } 587 588 // Set the header fields. 589 hdr.request.magic = PROTOCOL_BINARY_REQ; 590 if (ctx.is_lookup()) { 591 hdr.request.opcode = PROTOCOL_BINARY_CMD_SUBDOC_MULTI_LOOKUP; 592 } else { 593 hdr.request.opcode = PROTOCOL_BINARY_CMD_SUBDOC_MULTI_MUTATION; 594 } 595 hdr.request.datatype = PROTOCOL_BINARY_RAW_BYTES; 596 hdr.request.extlen = pkt->extlen; 597 hdr.request.opaque = pkt->opaque; 598 hdr.request.cas = lcb_htonll(cmd->cas); 599 hdr.request.bodylen = htonl(hdr.request.extlen + 600 ntohs(hdr.request.keylen) + ctx.payload_size); 601 memcpy(SPAN_BUFFER(&pkt->kh_span), hdr.bytes, sizeof hdr.bytes); 602 if (expiry) { 603 expiry = htonl(expiry); 604 memcpy(SPAN_BUFFER(&pkt->kh_span) + 24, &expiry, 4); 605 } 606 if (docflags) { 607 memcpy(SPAN_BUFFER(&pkt->kh_span) + 24 + (extlen -1), &docflags, 1); 608 } 609 610 MCREQ_PKT_RDATA(pkt)->cookie = cookie; 611 MCREQ_PKT_RDATA(pkt)->start = gethrtime(); 612 LCB_SCHED_ADD(instance, pl, pkt); 613 return LCB_SUCCESS; 614 } 615