1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2014 Couchbase, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include <stdlib.h>
19 #include <string.h>
20 #include <stddef.h>
21 #include <libcouchbase/couchbase.h>
22 #include <libcouchbase/vbucket.h>
23 #include "config.h"
24 #include "contrib/cJSON/cJSON.h"
25 #include "json-inl.h"
26 #include "hash.h"
27 #include "crc32.h"
28
/* Two-level stringification so that an argument such as __LINE__ is expanded
 * before it is turned into a string literal. */
#define STRINGIFY_(X) #X
#define STRINGIFY(X) STRINGIFY_(X)

/* Base size of the scratch buffer used when hashing server authorities. */
#define MAX_AUTHORITY_SIZE 100

/**
 * Record the diagnostic `s` (a string literal), prefixed with the current
 * file and line, in (cfg)->errstr unless a diagnostic is already present.
 * The first error recorded therefore wins.
 *
 * Wrapped in do { } while (0) so that the macro behaves as a single statement,
 * including when used as the body of an unbraced if/else.
 */
#define SET_ERRSTR(cfg, s) \
    do { \
        if (!(cfg)->errstr) { \
            (cfg)->errstr = __FILE__ ":" STRINGIFY(__LINE__) " " s; \
        } \
    } while (0)
35
36 /******************************************************************************
37 ******************************************************************************
38 ** Core Parsing Routines **
39 ******************************************************************************
40 ******************************************************************************/
41 static lcbvb_VBUCKET *
42 build_vbmap(lcbvb_CONFIG *cfg, cJSON *cj, unsigned *nitems)
43 {
44 lcbvb_VBUCKET *vblist = NULL;
sepgsql_attribute_post_create(Oid relOid,AttrNumber attnum)45 cJSON *jvb;
46 unsigned ii, nalloc;
47
48 /** FIXME: Realloc dynamically when too small */
49 if (!(nalloc = cJSON_GetArraySize(cj))) {
50 goto GT_ERR;
51 }
52
53 if (!(vblist = calloc(nalloc, sizeof(*vblist)))) {
54 goto GT_ERR;
55 }
56
57 /* Iterate over all the vbuckets */
58 jvb = cj->child;
59 for (ii = 0; ii < nalloc && jvb; ++ii, jvb = jvb->next) {
60 cJSON *jsix;
61 lcbvb_VBUCKET *cvb;
62 unsigned jj, nservers;
63
64 if (jvb->type != cJSON_Array) {
65 goto GT_ERR;
66 }
67
68 nservers = cJSON_GetArraySize(jvb);
69 jsix = jvb->child;
70 cvb = vblist + ii;
71
72 /* Iterate over each index in the vbucket */
73 for (jj = 0; jj < nservers && jsix; ++jj, jsix = jsix->next) {
74 if (jsix->type != cJSON_Number) {
75 goto GT_ERR;
76 }
77 cvb->servers[jj] = jsix->valueint;
78 if (cvb->servers[jj] > (int)cfg->nsrv-1) {
79 SET_ERRSTR(cfg, "Invalid vBucket map received from server. Above-bounds vBucket target found");
80 goto GT_ERR;
81 }
82 }
83 }
84
85 *nitems = nalloc;
86 return vblist;
87
88 GT_ERR:
89 free(vblist);
90 return NULL;
91 }
92
93 static void copy_address(char *buf, size_t nbuf, const char *host, lcb_U16 port)
94 {
95 if (strchr(host, ':')) {
96 // IPv6 and should be bracketed
97 snprintf(buf, nbuf, "[%s]:%d", host, port);
98 } else {
99 snprintf(buf, nbuf, "%s:%d", host, port);
100 }
101 }
102
103 static lcbvb_SERVER *
104 find_server_memd(lcbvb_SERVER *servers, unsigned n, const char *s)
105 {
106 unsigned ii;
107 for (ii = 0; ii < n; ii++) {
108 char buf[4096] = { 0 };
109 lcbvb_SERVER *cur = servers + ii;
110 copy_address(buf, sizeof(buf), cur->hostname, cur->svc.data);
111 if (!strncmp(s, buf, sizeof(buf))) {
112 return cur;
113 }
114 }
115 return NULL;
116 }
117
118 static int
119 assign_dumy_server(lcbvb_CONFIG *cfg, lcbvb_SERVER *dst, const char *s)
120 {
121 int itmp;
122 char *colon;
123 if (!(dst->authority = strdup(s))) {
124 SET_ERRSTR(cfg, "Couldn't allocate authority string");
125 goto GT_ERR;
126 }
127
128 if (!(colon = strstr(s, ":"))) {
129 SET_ERRSTR(cfg, "Badly formatted name string");
130 goto GT_ERR;
131 }
132
133 if (sscanf(colon+1, "%d", &itmp) != 1) {
134 SET_ERRSTR(cfg, "Badly formatted port");
sepgsql_attribute_drop(Oid relOid,AttrNumber attnum)135 goto GT_ERR;
136 }
137
138 dst->svc.data = itmp;
139 return 1;
140
141 GT_ERR:
142 free(dst->authority);
143 return 0;
144 }
145
146 static void
147 set_vb_count(lcbvb_CONFIG *cfg, lcbvb_VBUCKET *vbs)
148 {
149 unsigned ii, jj;
150 if (!vbs) {
151 return;
152 }
153
154 for (ii = 0; ii < cfg->nvb; ++ii) {
155 for (jj = 0; jj < cfg->nrepl+1; ++jj) {
156 int ix = vbs[ii].servers[jj];
157 if (ix < 0 || (unsigned)ix > cfg->nsrv) {
158 continue;
159 }
160 cfg->servers[ix].nvbs++;
161 }
162 }
163 }
164
165 static int
166 pair_server_list(lcbvb_CONFIG *cfg, cJSON *vbconfig)
sepgsql_attribute_relabel(Oid relOid,AttrNumber attnum,const char * seclabel)167 {
168 cJSON *servers;
169 lcbvb_SERVER *newlist = NULL;
170 unsigned ii, nsrv;
171
172 if (!get_jarray(vbconfig, "serverList", &servers)) {
173 SET_ERRSTR(cfg, "Couldn't find serverList");
174 goto GT_ERROR;
175 }
176
177 nsrv = cJSON_GetArraySize(servers);
178
179 if (nsrv > cfg->nsrv) {
180 /* nodes in serverList which are not in nodes/nodesExt */
181 void *tmp = realloc(cfg->servers, sizeof(*cfg->servers) * nsrv);
182 if (!tmp) {
183 SET_ERRSTR(cfg, "Couldn't allocate memory for server list");
184 goto GT_ERROR;
185 }
186 cfg->servers = tmp;
187 cfg->nsrv = nsrv;
188 }
189
190 /* allocate an array for the reordered server list */
191 newlist = calloc(nsrv, sizeof(*cfg->servers));
192
193 for (ii = 0; ii < nsrv; ii++) {
194 char *tmp;
195 cJSON *jst;
196 lcbvb_SERVER *cur;
197 jst = cJSON_GetArrayItem(servers, ii);
198 tmp = jst->valuestring;
199 cur = find_server_memd(cfg->servers, cfg->nsrv, tmp);
200
201 if (cur) {
202 newlist[ii] = *cur;
203 } else {
204 /* found server inside serverList but not in nodes? */
205 if (!assign_dumy_server(cfg, &newlist[ii], tmp)) {
206 goto GT_ERROR;
207 }
208 }
209 }
210
211 free(cfg->servers);
212 cfg->servers = newlist;
213 return 1;
214
215 GT_ERROR:
216 free(newlist);
217 return 0;
218
219 }
220
221 static int
222 parse_vbucket(lcbvb_CONFIG *cfg, cJSON *cj)
223 {
224 cJSON *vbconfig, *vbmap, *ffmap = NULL;
225
226 if (!get_jobj(cj, "vBucketServerMap", &vbconfig)) {
227 SET_ERRSTR(cfg, "Expected top-level 'vBucketServerMap'");
228 goto GT_ERROR;
229 }
230
231 if (!get_juint(vbconfig, "numReplicas", &cfg->nrepl)) {
232 SET_ERRSTR(cfg, "'numReplicas' missing");
233 goto GT_ERROR;
234 }
235
236 if (!get_jarray(vbconfig, "vBucketMap", &vbmap)) {
237 SET_ERRSTR(cfg, "Missing 'vBucketMap'");
238 goto GT_ERROR;
239 }
240
241 get_jarray(vbconfig, "vBucketMapForward", &ffmap);
sepgsql_relation_post_create(Oid relOid)242
243 if ((cfg->vbuckets = build_vbmap(cfg, vbmap, &cfg->nvb)) == NULL) {
244 goto GT_ERROR;
245 }
246
247 if (ffmap && (cfg->ffvbuckets = build_vbmap(cfg, ffmap, &cfg->nvb)) == NULL) {
248 goto GT_ERROR;
249 }
250
251 if (!cfg->is3x) {
252 if (!pair_server_list(cfg, vbconfig)) {
253 goto GT_ERROR;
254 }
255 }
256
257 /** Now figure out which server goes where */
258 set_vb_count(cfg, cfg->vbuckets);
259 set_vb_count(cfg, cfg->ffvbuckets);
260 return 1;
261
262 GT_ERROR:
263 return 0;
264 }
265
266 static int server_cmp(const void *s1, const void *s2)
267 {
268 return strcmp(((const lcbvb_SERVER *)s1)->authority,
269 ((const lcbvb_SERVER *)s2)->authority);
270 }
271
272 static int continuum_item_cmp(const void *t1, const void *t2)
273 {
274 const lcbvb_CONTINUUM *ct1 = t1, *ct2 = t2;
275
276 if (ct1->point == ct2->point) {
277 return 0;
278 } else if (ct1->point > ct2->point) {
279 return 1;
280 } else {
281 return -1;
282 }
283 }
284
285 static int
286 update_ketama(lcbvb_CONFIG *cfg)
287 {
288 char host[MAX_AUTHORITY_SIZE+10] = "";
289 int nhost;
290 unsigned pp, hh, ss, nn;
291 unsigned char digest[16];
292 lcbvb_CONTINUUM *new_continuum, *old_continuum;
293
294 qsort(cfg->servers, cfg->ndatasrv, sizeof(*cfg->servers), server_cmp);
295
296 new_continuum = calloc(160 * cfg->ndatasrv, sizeof(*new_continuum));
297 /* 40 hashes, 4 numbers per hash = 160 points per server */
298 for (ss = 0, pp = 0; ss < cfg->ndatasrv; ++ss) {
299 /* we can add more points to server which have more memory */
300 for (hh = 0; hh < 40; ++hh) {
301 lcbvb_SERVER *srv = cfg->servers + ss;
302 nhost = snprintf(host, MAX_AUTHORITY_SIZE+10, "%s-%u", srv->authority, hh);
303 vb__hash_md5(host, nhost, digest);
304 for (nn = 0; nn < 4; ++nn, ++pp) {
305 new_continuum[pp].index = ss;
306 new_continuum[pp].point = ((uint32_t) (digest[3 + nn * 4] & 0xFF) << 24)
307 | ((uint32_t) (digest[2 + nn * 4] & 0xFF) << 16)
308 | ((uint32_t) (digest[1 + nn * 4] & 0xFF) << 8)
309 | (digest[0 + nn * 4] & 0xFF);
310 }
311 }
312 }
313
314 qsort(new_continuum, pp, sizeof *new_continuum, continuum_item_cmp);
315 old_continuum = cfg->continuum;
316 cfg->continuum = new_continuum;
317 cfg->ncontinuum = pp;
318 free(old_continuum);
319 return 1;
320 }
321
322 static int
323 extract_services(lcbvb_CONFIG *cfg, cJSON *jsvc, lcbvb_SERVICES *svc, int is_ssl)
324 {
325 int itmp;
326 int rv;
327 const char *key;
328
329 #define EXTRACT_SERVICE(k, fld) \
330 key = is_ssl ? k"SSL" : k; \
331 rv = get_jint(jsvc, key, &itmp); \
332 if (rv) { svc->fld = itmp; } else { svc->fld = 0; }
333
334 EXTRACT_SERVICE("kv", data);
335 EXTRACT_SERVICE("mgmt", mgmt);
336 EXTRACT_SERVICE("capi", views);
337 EXTRACT_SERVICE("n1ql", n1ql);
338 EXTRACT_SERVICE("fts", fts);
339 EXTRACT_SERVICE("indexAdmin", ixadmin);
340 EXTRACT_SERVICE("indexScan", ixquery);
341 EXTRACT_SERVICE("cbas", cbas);
342
343 #undef EXTRACT_SERVICE
344
345 (void)cfg;
346 return 1;
347 }
348
349 static int
350 build_server_strings(lcbvb_CONFIG *cfg, lcbvb_SERVER *server)
351 {
352 /* get the authority */
353 char tmpbuf[4096];
354
355 copy_address(tmpbuf, sizeof(tmpbuf), server->hostname, server->svc.data);
356 server->authority = strdup(tmpbuf);
357 if (!server->authority) {
358 SET_ERRSTR(cfg, "Couldn't allocate authority");
359 return 0;
360 }
361
362 server->svc.hoststrs[LCBVB_SVCTYPE_DATA] = strdup(server->authority);
363 if (server->viewpath == NULL && server->svc.views) {
364 server->viewpath = malloc(strlen(cfg->bname) + 2);
365 sprintf(server->viewpath, "/%s", cfg->bname);
366 }
367 if (server->querypath == NULL && server->svc.n1ql) {
368 server->querypath = strdup("/query/service");
369 }
370 if (server->ftspath == NULL && server->svc.fts) {
371 server->ftspath = strdup("/");
372 }
373 if (server->cbaspath == NULL && server->svc.cbas) {
374 server->cbaspath = strdup("/query/service");
375 }
376 return 1;
377 }
378
379 /**
380 * Parse a node from the 'nodesExt' array
381 * @param cfg
382 * @param server
383 * @param js
384 * @return
385 */
386 static int
387 build_server_3x(lcbvb_CONFIG *cfg, lcbvb_SERVER *server, cJSON *js, char **network)
388 {
389 cJSON *jsvcs;
390 char *htmp;
391
392 if (!get_jstr(js, "hostname", &htmp)) {
393 htmp = "$HOST";
394 }
395 if (!(server->hostname = strdup(htmp))) {
396 SET_ERRSTR(cfg, "Couldn't allocate memory");
397 goto GT_ERR;
398 }
399
400 if (!get_jobj(js, "services", &jsvcs)) {
401 SET_ERRSTR(cfg, "Couldn't find 'services'");
402 goto GT_ERR;
403 }
404
405 if (!extract_services(cfg, jsvcs, &server->svc, 0)) {
406 goto GT_ERR;
407 }
408 if (!extract_services(cfg, jsvcs, &server->svc_ssl, 1)) {
409 goto GT_ERR;
410 }
411
412 if (!build_server_strings(cfg, server)) {
413 goto GT_ERR;
414 }
415
416 if (network && *network && strcmp(*network, "default") != 0) {
417 cJSON *jaltaddr = cJSON_GetObjectItem(js, "alternateAddresses");
sepgsql_relation_drop(Oid relOid)418 if (jaltaddr && jaltaddr->type == cJSON_Object) {
419 cJSON *jnetwork = cJSON_GetObjectItem(jaltaddr, *network);
420 if (jnetwork && get_jstr(jnetwork, "hostname", &htmp)) {
421 cJSON *jports;
422 server->alt_hostname = strdup(htmp);
423 jports = cJSON_GetObjectItem(jnetwork, "ports");
424 if (jports && jports->type == cJSON_Object) {
425 extract_services(cfg, jports, &server->alt_svc, 0);
426 extract_services(cfg, jports, &server->alt_svc_ssl, 1);
427 }
428
429 #define COPY_SERVICE(src, dst) \
430 if ((dst)->data == 0) (dst)->data = (src)->data; \
431 if ((dst)->mgmt == 0) (dst)->mgmt = (src)->mgmt; \
432 if ((dst)->views == 0) (dst)->views = (src)->views; \
433 if ((dst)->n1ql == 0) (dst)->n1ql = (src)->n1ql; \
434 if ((dst)->fts == 0) (dst)->fts = (src)->fts; \
435 if ((dst)->ixadmin == 0) (dst)->ixadmin = (src)->ixadmin; \
436 if ((dst)->ixquery == 0) (dst)->ixquery = (src)->ixquery; \
437 if ((dst)->cbas == 0) (dst)->cbas = (src)->cbas;
438
439 COPY_SERVICE(&server->svc, &server->alt_svc);
440 COPY_SERVICE(&server->svc_ssl, &server->alt_svc_ssl);
441
442 #undef COPY_SERVICE
443 }
444 }
445 }
446
447 return 1;
448
449 GT_ERR:
450 return 0;
451 }
452
453 /**
454 * Initialize a server from a JSON Object
455 * @param server The server to initialize
456 * @param js The object which contains the server information
457 * @return nonzero on success, 0 on failure.
458 */
459 static int
460 build_server_2x(lcbvb_CONFIG *cfg, lcbvb_SERVER *server, cJSON *js, char **network)
461 {
462 char *tmp = NULL, *colon;
463 int itmp;
464 cJSON *jsports;
465
466 if (!get_jstr(js, "hostname", &tmp)) {
467 SET_ERRSTR(cfg, "Couldn't find hostname");
468 goto GT_ERR;
469 }
470
471 /** Hostname is the _rest_ API host, e.g. '8091' */
472 if ((server->hostname = strdup(tmp)) == NULL) {
473 SET_ERRSTR(cfg, "Couldn't allocate hostname");
474 goto GT_ERR;
475 }
476
477 colon = strchr(server->hostname, ':');
478 if (!colon) {
479 SET_ERRSTR(cfg, "Expected ':' in 'hostname'");
480 goto GT_ERR;
481 }
482 if (sscanf(colon+1, "%d", &itmp) != 1) {
483 SET_ERRSTR(cfg, "Expected port after ':'");
484 goto GT_ERR;
485 }
486
487 /* plain mgmt port is extracted from hostname */
488 server->svc.mgmt = itmp;
489 *colon = '\0';
490
491 /** Handle the views name */
492 if (get_jstr(js, "couchApiBase", &tmp)) {
493 /** Have views */
494 char *path_begin;
495 colon = strrchr(tmp, ':');
496
497 if (!colon) {
498 /* no port */
499 goto GT_ERR;
500 }
501 if (sscanf(colon+1, "%d", &itmp) != 1) {
502 goto GT_ERR;
503 }
504
505 /* Assign the port */
506 server->svc.views = itmp;
507 path_begin = strstr(colon, "/");
508 if (!path_begin) {
509 SET_ERRSTR(cfg, "Expected path in couchApiBase");
510 goto GT_ERR;
511 }
512 server->viewpath = strdup(path_begin);
513 } else {
514 server->svc.views = 0;
515 }
516
517 /* get the 'ports' dictionary */
518 if (!get_jobj(js, "ports", &jsports)) {
519 SET_ERRSTR(cfg, "Expected 'ports' dictionary");
520 goto GT_ERR;
521 }
522
523 /* memcached port */
524 if (get_jint(jsports, "direct", &itmp)) {
525 server->svc.data = itmp;
sepgsql_relation_relabel(Oid relOid,const char * seclabel)526 } else {
527 SET_ERRSTR(cfg, "Expected 'direct' field in 'ports'");
528 goto GT_ERR;
529 }
530
531 /* set the authority */
532 if (!build_server_strings(cfg, server)) {
533 goto GT_ERR;
534 }
535 return 1;
536
537 GT_ERR:
538 return 0;
539 }
540
541 static void
542 guess_network(cJSON *jnodes, int nsrv, const char *source, char **network)
543 {
544 int ii;
545 for (ii = 0; ii < nsrv; ii++) {
546 cJSON *jsrv = cJSON_GetArrayItem(jnodes, ii);
547 {
548 cJSON *jhostname = cJSON_GetObjectItem(jsrv, "hostname");
549 if (jhostname && jhostname->type == cJSON_String) {
550 if (strcmp(jhostname->valuestring, source) == 0) {
551 *network = strdup("default");
552 return;
553 }
554 }
555 }
556 {
557 cJSON *jaltaddr = cJSON_GetObjectItem(jsrv, "alternateAddresses");
558 if (jaltaddr && jaltaddr->type == cJSON_Object) {
559 cJSON *cur;
560 for (cur = jaltaddr->child; cur != NULL; cur = cur->next) {
561 if (cur->type == cJSON_Object) {
562 cJSON *jhostname = cJSON_GetObjectItem(cur, "hostname");
563 if (jhostname && jhostname->type == cJSON_String) {
564 if (strcmp(jhostname->valuestring, source) == 0) {
565 *network = strdup(cur->string);
566 return;
567 }
568 }
569 }
570 }
571 }
572 }
573 }
574 *network = strdup("default");
575 }
576
sepgsql_relation_setattr(Oid relOid)577 int
578 lcbvb_load_json_ex(lcbvb_CONFIG *cfg, const char *data, const char *source, char **network)
579 {
580 cJSON *cj = NULL, *jnodes_ext = NULL, *jnodes = NULL;
581 char *tmp = NULL;
582 unsigned ii, jnodes_size = 0;
583 int jnodes_defined = 0;
584
585 if ((cj = cJSON_Parse(data)) == NULL) {
586 SET_ERRSTR(cfg, "Couldn't parse JSON");
587 goto GT_ERROR;
588 }
589
590 if (!get_jstr(cj, "name", &tmp)) {
591 SET_ERRSTR(cfg, "Expected 'name' key");
592 goto GT_ERROR;
593 }
594 cfg->bname = strdup(tmp);
595
596 if (!get_jstr(cj, "nodeLocator", &tmp)) {
597 SET_ERRSTR(cfg, "Expected 'nodeLocator' key");
598 goto GT_ERROR;
599 }
600
601 get_jarray(cj, "nodes", &jnodes);
602 if (jnodes) {
603 jnodes_defined = 1;
604 jnodes_size = cJSON_GetArraySize(jnodes);
605 }
606 if (get_jarray(cj, "nodesExt", &jnodes_ext)) {
607 cfg->is3x = 1;
608 cfg->nsrv = cJSON_GetArraySize(jnodes_ext);
609 jnodes = jnodes_ext;
610 } else if (jnodes == NULL) {
611 SET_ERRSTR(cfg, "expected 'nodesExt' or 'nodes' array");
612 goto GT_ERROR;
613 }
614
615 if (!strcmp(tmp, "ketama")) {
616 cfg->dtype = LCBVB_DIST_KETAMA;
617 } else {
618 cfg->dtype = LCBVB_DIST_VBUCKET;
619 }
620
621 if (!get_jint(cj, "rev", &cfg->revid)) {
622 cfg->revid = -1;
623 }
624
625 cfg->caps = 0;
626 {
627 cJSON *jcaps = NULL;
628 if (get_jarray(cj, "bucketCapabilities", &jcaps)) {
629 unsigned ncaps = cJSON_GetArraySize(jcaps);
630 for (ii = 0; ii < ncaps; ii++) {
631 cJSON *jcap = cJSON_GetArrayItem(jcaps, ii);
632 if (jcap || jcap->type == cJSON_String) {
633 if (strcmp(jcap->valuestring, "xattr") == 0) {
634 cfg->caps |= LCBVB_CAP_XATTR;
635 } else if (strcmp(jcap->valuestring, "dcp") == 0) {
636 cfg->caps |= LCBVB_CAP_DCP;
637 } else if (strcmp(jcap->valuestring, "cbhello") == 0) {
638 cfg->caps |= LCBVB_CAP_CBHELLO;
639 } else if (strcmp(jcap->valuestring, "touch") == 0) {
640 cfg->caps |= LCBVB_CAP_TOUCH;
641 } else if (strcmp(jcap->valuestring, "couchapi") == 0) {
642 cfg->caps |= LCBVB_CAP_COUCHAPI;
643 } else if (strcmp(jcap->valuestring, "cccp") == 0) {
644 cfg->caps |= LCBVB_CAP_CCCP;
645 } else if (strcmp(jcap->valuestring, "xdcrCheckpointing") == 0) {
646 cfg->caps |= LCBVB_CAP_XDCR_CHECKPOINTING;
647 } else if (strcmp(jcap->valuestring, "nodesExt") == 0) {
648 cfg->caps |= LCBVB_CAP_NODES_EXT;
649 }
650 }
651 }
652 }
653 }
654
655 /** Get the number of nodes. This traverses the list. Yuck */
656 cfg->nsrv = cJSON_GetArraySize(jnodes);
657
658 if (network && *network == NULL) {
659 guess_network(jnodes, cfg->nsrv, source, network);
660 }
661
662 /** Allocate a temporary one on the heap */
663 cfg->servers = calloc(cfg->nsrv, sizeof(*cfg->servers));
664 for (ii = 0; ii < cfg->nsrv; ii++) {
665 int rv;
666 cJSON *jsrv = cJSON_GetArrayItem(jnodes, ii);
667
668 if (cfg->is3x) {
669 rv = build_server_3x(cfg, cfg->servers + ii, jsrv, network);
670 if (jnodes_defined && rv && ii >= jnodes_size) {
671 cfg->servers[ii].svc.data = 0;
672 cfg->servers[ii].svc_ssl.data = 0;
673 cfg->servers[ii].alt_svc.data = 0;
674 cfg->servers[ii].alt_svc_ssl.data = 0;
675 }
676 } else {
677 rv = build_server_2x(cfg, cfg->servers + ii, jsrv, network);
678 }
679
680 if (!rv) {
681 SET_ERRSTR(cfg, "Failed to build server");
682 goto GT_ERROR;
683 }
684 }
685
686 /* Count the number of _data_ servers in the cluster. Per the spec,
687 * these will always appear in order (so that we won't ever have "holes") */
688 for (ii = 0; ii < cfg->nsrv; ii++) {
689 if (!cfg->servers[ii].svc.data) {
690 break;
691 }
692 }
693 cfg->ndatasrv = ii;
694
695 if (cfg->dtype == LCBVB_DIST_VBUCKET) {
696 if (!parse_vbucket(cfg, cj)) {
697 SET_ERRSTR(cfg, "Failed to parse vBucket map");
698 goto GT_ERROR;
699 }
700 } else {
701 /* If there is no $HOST then we can update the ketama config, otherwise
702 * we must wait for the hostname to be replaced! */
703 if (strstr(data, "$HOST") == NULL) {
704 if (!update_ketama(cfg)) {
705 SET_ERRSTR(cfg, "Failed to establish ketama continuums");
706 }
707 }
708 }
709 cfg->servers = realloc(cfg->servers, sizeof(*cfg->servers) * cfg->nsrv);
710 cfg->randbuf = malloc(cfg->nsrv * sizeof(*cfg->randbuf));
711 cJSON_Delete(cj);
712 return 0;
713
714 GT_ERROR:
715 if (cj) {
716 cJSON_Delete(cj);
717 }
718 return -1;
719 }
720
721 int
722 lcbvb_load_json(lcbvb_CONFIG *cfg, const char *data)
723 {
sepgsql_index_modify(Oid indexOid)724 return lcbvb_load_json_ex(cfg, data, NULL, NULL);
725 }
726
/**
 * Replace the first "$HOST" placeholder in a heap-allocated string.
 *
 * @param orig        in/out pointer to a malloc()ed string; if it contains
 *                    the placeholder it is freed and replaced by a newly
 *                    allocated string. A NULL *orig is ignored.
 * @param replacement text substituted for the placeholder
 *
 * If no placeholder is present, or memory cannot be allocated, *orig is left
 * exactly as it was.
 */
static void
replace_hoststr(char **orig, const char *replacement)
{
    static const char placeholder[] = "$HOST";
    const size_t nplaceholder = sizeof(placeholder) - 1;
    const char *match, *suffix;
    char *newbuf;
    size_t nprefix, nrepl, nsuffix;

    if (!*orig || !replacement) {
        return;
    }

    match = strstr(*orig, placeholder);
    if (match == NULL) {
        return;
    }

    suffix = match + nplaceholder;
    nprefix = (size_t)(match - *orig);
    nrepl = strlen(replacement);
    nsuffix = strlen(suffix);

    /* Allocate before touching the input so a failure leaves it intact */
    newbuf = malloc(nprefix + nrepl + nsuffix + 1);
    if (!newbuf) {
        return;
    }

    /* copy until the placeholder */
    memcpy(newbuf, *orig, nprefix);
    /* copy the host */
    memcpy(newbuf + nprefix, replacement, nrepl);
    /* copy after the placeholder, including the terminator */
    memcpy(newbuf + nprefix + nrepl, suffix, nsuffix + 1);

    free(*orig);
    *orig = newbuf;
}
755
756 LIBCOUCHBASE_API
757 void
758 lcbvb_replace_host(lcbvb_CONFIG *cfg, const char *hoststr)
759 {
760 unsigned ii, copy = 0;
761 char *replacement = (char *)hoststr;
762 if (strchr(replacement, ':')) {
763 size_t len = strlen(hoststr);
764 replacement = calloc(len + 2, sizeof(char));
765 replacement[0] = '[';
766 memcpy(replacement + 1, hoststr, len);
767 replacement[len + 1] = ']';
768 copy = 1;
769 }
770 for (ii = 0; ii < cfg->nsrv; ++ii) {
771 unsigned jj;
772 lcbvb_SERVER *srv = cfg->servers + ii;
773 lcbvb_SERVICES *svcs[] = { &srv->svc, &srv->svc_ssl };
774
775 replace_hoststr(&srv->hostname, hoststr);
776 for (jj = 0; jj < 2; ++jj) {
777 unsigned kk;
778 lcbvb_SERVICES *cursvc = svcs[jj];
779 replace_hoststr(&cursvc->views_base_, replacement);
780 for (kk = 0; kk < LCBVB_SVCTYPE__MAX; ++kk) {
781 replace_hoststr(&cursvc->hoststrs[kk], replacement);
782 }
783 }
784 /* reassign authority */
785 free(srv->authority);
786 srv->authority = strdup(srv->svc.hoststrs[LCBVB_SVCTYPE_DATA]);
787 }
788 if (copy) {
789 free(replacement);
790 }
791 if (cfg->dtype == LCBVB_DIST_KETAMA) {
792 update_ketama(cfg);
793 }
794 }
795
796 lcbvb_CONFIG *
797 lcbvb_parse_json(const char *js)
798 {
799 int rv;
800 lcbvb_CONFIG *cfg = calloc(1, sizeof(*cfg));
801 rv = lcbvb_load_json(cfg, js);
802 if (rv) {
803 lcbvb_destroy(cfg);
804 return NULL;
805 }
806 return cfg;
807 }
808
809 LIBCOUCHBASE_API
810 lcbvb_CONFIG *
811 lcbvb_create(void)
812 {
813 return calloc(1, sizeof(lcbvb_CONFIG));
814 }
815
816 static void
817 free_service_strs(lcbvb_SERVICES *svc)
818 {
819 unsigned ii;
820 for (ii = 0; ii < LCBVB_SVCTYPE__MAX; ii++) {
821 free(svc->hoststrs[ii]);
822 }
823 free(svc->views_base_);
824 free(svc->query_base_);
825 free(svc->fts_base_);
826 free(svc->cbas_base_);
827 }
828
829 void
830 lcbvb_destroy(lcbvb_CONFIG *conf)
831 {
832 unsigned ii;
833 for (ii = 0; ii < conf->nsrv; ii++) {
834 lcbvb_SERVER *srv = conf->servers + ii;
835 free(srv->hostname);
836 free(srv->viewpath);
837 free(srv->querypath);
838 free(srv->ftspath);
839 free(srv->cbaspath);
840 free_service_strs(&srv->svc);
841 free_service_strs(&srv->svc_ssl);
842 free(srv->authority);
843 free(srv->alt_hostname);
844 free_service_strs(&srv->alt_svc);
845 free_service_strs(&srv->alt_svc_ssl);
846 }
847 free(conf->servers);
848 free(conf->continuum);
849 free(conf->bname);
850 free(conf->vbuckets);
851 free(conf->ffvbuckets);
852 free(conf->randbuf);
853 free(conf);
854 }
855
856 static void
857 svcs_to_json(lcbvb_SERVICES *svc, cJSON *jsvc, int is_ssl)
858 {
859 cJSON *tmp;
860 const char *key;
861 #define EXTRACT_SERVICE(name, fld) \
862 if (svc->fld) { \
863 key = is_ssl ? name"SSL" : name; \
864 tmp = cJSON_CreateNumber(svc->fld); \
865 cJSON_AddItemToObject(jsvc, key, tmp); \
866 }
867
868 EXTRACT_SERVICE("mgmt", mgmt);
869 EXTRACT_SERVICE("capi", views);
870 EXTRACT_SERVICE("kv", data);
871 EXTRACT_SERVICE("n1ql", n1ql);
872 EXTRACT_SERVICE("indexScan", ixquery);
873 EXTRACT_SERVICE("indexAdmin", ixadmin);
874 EXTRACT_SERVICE("fts", fts);
875 EXTRACT_SERVICE("cbas", cbas);
876
877 #undef EXTRACT_SERVICE
878 }
879
880 LIBCOUCHBASE_API
881 char *
882 lcbvb_save_json(lcbvb_CONFIG *cfg)
883 {
884 unsigned ii;
885 char *ret;
886 cJSON *tmp = NULL, *nodes = NULL;
887 cJSON *root = cJSON_CreateObject();
888
889 if (cfg->dtype == LCBVB_DIST_VBUCKET) {
890 tmp = cJSON_CreateString("vbucket");
891 } else {
892 tmp = cJSON_CreateString("ketama");
893 }
894 cJSON_AddItemToObject(root, "nodeLocator", tmp);
895
896 if (cfg->revid > -1) {
897 tmp = cJSON_CreateNumber(cfg->revid);
898 cJSON_AddItemToObject(root, "rev", tmp);
899 }
900 tmp = cJSON_CreateString(cfg->bname);
901 cJSON_AddItemToObject(root, "name", tmp);
902
903 nodes = cJSON_CreateArray();
904 cJSON_AddItemToObject(root, "nodesExt", nodes);
905
906 for (ii = 0; ii < cfg->nsrv; ii++) {
907 cJSON *sj = cJSON_CreateObject(), *jsvc = cJSON_CreateObject();
908 lcbvb_SERVER *srv = cfg->servers + ii;
909
910 tmp = cJSON_CreateString(srv->hostname);
911 cJSON_AddItemToObject(sj, "hostname", tmp);
912 svcs_to_json(&srv->svc, jsvc, 0);
913 svcs_to_json(&srv->svc_ssl, jsvc, 1);
914
915 /* add the services to the server */
916 cJSON_AddItemToObject(sj, "services", jsvc);
917 cJSON_AddItemToArray(nodes, sj);
918 }
919
920 /* Now either add the vbucket or ketama stuff */
921 if (cfg->dtype == LCBVB_DIST_VBUCKET) {
922 cJSON *vbroot = cJSON_CreateObject();
923 cJSON *vbmap = cJSON_CreateArray();
924
925 tmp = cJSON_CreateNumber(cfg->nrepl);
926 cJSON_AddItemToObject(vbroot, "numReplicas", tmp);
927
928 for (ii = 0; ii < cfg->nvb; ii++) {
929 cJSON *curvb = cJSON_CreateIntArray(
930 cfg->vbuckets[ii].servers, cfg->nrepl+1);
931 cJSON_AddItemToArray(vbmap, curvb);
932 }
933
934 cJSON_AddItemToObject(vbroot, "vBucketMap", vbmap);
935 cJSON_AddItemToObject(root, "vBucketServerMap", vbroot);
936 }
937 if (cfg->caps != 0) {
938 cJSON *jcaps = cJSON_CreateArray();
939 if (cfg->caps & LCBVB_CAP_XATTR) {
940 cJSON_AddItemToArray(jcaps, cJSON_CreateString("xattr"));
941 }
942 if (cfg->caps & LCBVB_CAP_DCP) {
943 cJSON_AddItemToArray(jcaps, cJSON_CreateString("dcp"));
944 }
945 if (cfg->caps & LCBVB_CAP_CBHELLO) {
946 cJSON_AddItemToArray(jcaps, cJSON_CreateString("cbhello"));
947 }
948 if (cfg->caps & LCBVB_CAP_TOUCH) {
949 cJSON_AddItemToArray(jcaps, cJSON_CreateString("touch"));
950 }
951 if (cfg->caps & LCBVB_CAP_COUCHAPI) {
952 cJSON_AddItemToArray(jcaps, cJSON_CreateString("couchapi"));
953 }
954 if (cfg->caps & LCBVB_CAP_CCCP) {
955 cJSON_AddItemToArray(jcaps, cJSON_CreateString("cccp"));
956 }
957 if (cfg->caps & LCBVB_CAP_XDCR_CHECKPOINTING) {
958 cJSON_AddItemToArray(jcaps, cJSON_CreateString("xdcrCheckpointing"));
959 }
960 if (cfg->caps & LCBVB_CAP_NODES_EXT) {
961 cJSON_AddItemToArray(jcaps, cJSON_CreateString("nodesExt"));
962 }
963 cJSON_AddItemToObject(root, "bucketCapabilities", jcaps);
964 }
965
966 ret = cJSON_PrintUnformatted(root);
967 cJSON_Delete(root);
968 return ret;
969 }
970
971 /******************************************************************************
972 ******************************************************************************
973 ** Mapping Routines **
974 ******************************************************************************
975 ******************************************************************************/
976
977 static int
978 map_ketama(lcbvb_CONFIG *cfg, const void *key, size_t nkey)
979 {
980 uint32_t digest, mid, prev;
981 lcbvb_CONTINUUM *beginp, *endp, *midp, *highp, *lowp;
982 lcb_assert(cfg->continuum);
983 digest = vb__hash_ketama(key, nkey);
984 beginp = lowp = cfg->continuum;
985 endp = highp = cfg->continuum + cfg->ncontinuum;
986
987 /* divide and conquer array search to find server with next biggest
988 * point after what this key hashes to */
989 while (1)
990 {
991 /* pick the middle point */
992 midp = lowp + (highp - lowp) / 2;
993
994 if (midp == endp) {
995 /* if at the end, roll back to zeroth */
996 return beginp->index;
997 break;
998 }
999
1000 mid = midp->point;
1001 prev = (midp == beginp) ? 0 : (midp-1)->point;
1002
1003 if (digest <= mid && digest > prev) {
1004 /* we found nearest server */
1005 return midp->index;
1006 break;
1007 }
1008
1009 /* adjust the limits */
1010 if (mid < digest) {
1011 lowp = midp + 1;
1012 } else {
1013 highp = midp - 1;
1014 }
1015
1016 if (lowp > highp) {
1017 return beginp->index;
1018 break;
1019 }
1020 }
1021 return -1;
1022 }
1023
1024 int
1025 lcbvb_k2vb(lcbvb_CONFIG *cfg, const void *k, lcb_SIZE n)
1026 {
1027 uint32_t digest = hash_crc32(k, n);
1028 return digest % cfg->nvb;
1029 }
1030
1031 int
1032 lcbvb_vbmaster(lcbvb_CONFIG *cfg, int vbid)
1033 {
1034 return cfg->vbuckets[vbid].servers[0];
1035 }
1036
1037 int
1038 lcbvb_vbreplica(lcbvb_CONFIG *cfg, int vbid, unsigned ix)
1039 {
1040 if (ix < cfg->nrepl) {
1041 return cfg->vbuckets[vbid].servers[ix+1];
1042 } else {
1043 return -1;
1044 }
1045 }
1046
1047 /*
1048 * (https://www.couchbase.com/issues/browse/MB-12268?focusedCommentId=101894&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101894)
1049 *
1050 * So (from my possibly partially ignorant view of that matter) I'd do the following:
1051 *
 * 1) Send first request according to latest vbucket map you have.
1053 * If it works, we're good. Exit.
1054 *
1055 * 2) if that fails, look if you've newer vbucket map. If there's newer vbucket map
1056 * and it points to _different_ node, send request to that node and proceed
1057 * to step 3. Otherwise go to step 4
1058 *
1059 * 3) if newer node still gives you not-my-vbucket, go to step 4
1060 *
1061 * 4) if there's fast forward map in latest bucket info and fast forward map
1062 * points to different node, send request to that node. And go to step 5.
 * Otherwise (no ff map or it points to one of nodes you've tried already),
1064 * go to step 6
1065 *
1066 * 5) if ff map node request succeeds. Exit. Otherwise go to step 6.
1067 *
1068 * 6) Try first replica unless it's one of nodes you've already tried.
1069 * If it succeeds. Exit. Otherwise go to step 7.
1070 *
1071 * 7) Try all nodes in turn, prioritizing other replicas to beginning of list
1072 * and nodes you have already tried to end. If one of nodes agrees to perform
1073 * your request. Exit. Otherwise propagate error to back to app
1074 */
1075 int
1076 lcbvb_nmv_remap_ex(lcbvb_CONFIG *cfg, int vbid, int bad, int heuristic)
1077 {
1078 int cur = cfg->vbuckets[vbid].servers[0];
1079 int rv = cur;
1080 unsigned ii;
1081
1082 if (bad != cur) {
1083 return cur;
1084 }
1085
1086 /* if a forward table exists, then return the vbucket id from the forward table
1087 * and update that information in the current table. We also need to Update the
1088 * replica information for that vbucket */
1089
1090 if (cfg->ffvbuckets &&
1091 (rv = cfg->ffvbuckets[vbid].servers[0]) != bad && rv > -1) {
1092 memcpy(&cfg->vbuckets[vbid], &cfg->ffvbuckets[vbid], sizeof (lcbvb_VBUCKET));
1093 }
1094
1095 /* this path is usually only followed if fvbuckets is not present */
1096 if (heuristic && cur == bad) {
1097 int validrv = -1;
1098 for (ii = 0; ii < cfg->ndatasrv; ii++) {
1099 rv = (rv + 1) % cfg->ndatasrv;
1100 /* check that the new index has assigned vbuckets (master or replica) */
1101 if (cfg->servers[rv].nvbs) {
1102 validrv = rv;
1103 cfg->vbuckets[vbid].servers[0] = rv;
1104 break;
1105 }
1106 }
1107
1108 if (validrv == -1) {
1109 /* this should happen when there is only one valid node remaining
1110 * in the cluster, and we've removed serveral other nodes that are
1111 * still present in the map due to the grace period window.*/
1112 return -1;
1113 }
1114 }
1115
1116 if (rv == bad) {
1117 return -1;
1118 }
1119
1120 return rv;
1121 }
1122
1123
1124 int
1125 lcbvb_map_key(lcbvb_CONFIG *cfg, const void *key, lcb_SIZE nkey,
1126 int *vbid, int *srvix)
1127 {
1128 if (cfg->dtype == LCBVB_DIST_KETAMA) {
1129 *srvix = map_ketama(cfg, key, nkey);
1130 *vbid = 0;
1131 return 0;
1132 } else {
1133 *vbid = lcbvb_k2vb(cfg, key, nkey);
1134 *srvix = lcbvb_vbmaster(cfg, *vbid);
1135 }
1136 return 0;
1137 }
1138
1139 int
1140 lcbvb_has_vbucket(lcbvb_CONFIG *vbc, int vbid, int ix)
1141 {
1142 unsigned ii;
1143 lcbvb_VBUCKET *vb = & vbc->vbuckets[vbid];
1144 for (ii = 0; ii < vbc->nrepl+1; ii++) {
1145 if (vb->servers[ii] == ix) {
1146 return 1;
1147 }
1148 }
1149 return 0;
1150 }
1151
1152 /******************************************************************************
1153 ******************************************************************************
1154 ** Configuration Comparisons/Diffs **
1155 ******************************************************************************
1156 ******************************************************************************/
1157 static void
1158 compute_vb_list_diff(lcbvb_CONFIG *from, lcbvb_CONFIG *to, char **out)
1159 {
1160 int offset = 0;
1161 unsigned ii, jj;
1162
1163 for (ii = 0; ii < to->nsrv; ii++) {
1164 int found = 0;
1165 lcbvb_SERVER *newsrv = to->servers + ii;
1166 for (jj = 0; !found && jj < from->nsrv; jj++) {
1167 lcbvb_SERVER *oldsrv = from->servers + jj;
1168 found |= (strcmp(newsrv->authority, oldsrv->authority) == 0);
1169 }
1170 if (!found) {
1171 char *infostr = malloc(strlen(newsrv->authority) + 128);
1172 lcb_assert(infostr);
1173 sprintf(infostr, "%s(Data=%d, Index=%d, Query=%d)", newsrv->authority, newsrv->svc.data, newsrv->svc.ixquery, newsrv->svc.n1ql);
1174 out[offset] = infostr;
1175 ++offset;
1176 }
1177 }
1178 }
1179
1180 lcbvb_CONFIGDIFF *
1181 lcbvb_compare(lcbvb_CONFIG *from, lcbvb_CONFIG *to)
1182 {
1183 int nservers;
1184 lcbvb_CONFIGDIFF *ret;
1185 unsigned ii;
1186
1187 ret = calloc(1, sizeof(*ret));
1188 nservers = (from->nsrv > to->nsrv ? from->nsrv : to->nsrv) + 1;
1189 ret->servers_added = calloc(nservers, sizeof(*ret->servers_added));
1190 ret->servers_removed = calloc(nservers, sizeof(*ret->servers_removed));
1191 compute_vb_list_diff(from, to, ret->servers_added);
1192 compute_vb_list_diff(to, from, ret->servers_removed);
1193
1194 if (to->nsrv == from->nsrv) {
1195 for (ii = 0; ii < from->nsrv; ii++) {
1196 const char *sa, *sb;
1197 sa = from->servers[ii].authority;
1198 sb = to->servers[ii].authority;
1199 ret->sequence_changed |= (0 != strcmp(sa, sb));
1200 }
1201 } else {
1202 ret->sequence_changed = 1;
1203 }
1204
1205 if (from->nvb == to->nvb) {
1206 for (ii = 0; ii < from->nvb; ii++) {
1207 lcbvb_VBUCKET *vba = from->vbuckets + ii, *vbb = to->vbuckets + ii;
1208 if (vba->servers[0] != vbb->servers[0]) {
1209 ret->n_vb_changes++;
1210 }
1211 }
1212 } else {
1213 ret->n_vb_changes = -1;
1214 }
1215 return ret;
1216 }
1217
/*
 * Free a NULL-terminated array of heap-allocated strings, then the array
 * itself. A NULL array is accepted and ignored, mirroring free(NULL), so a
 * structure whose list was never allocated can still be torn down.
 */
static void
free_array_helper(char **l)
{
    char **cur;

    if (l == NULL) {
        return;
    }
    for (cur = l; *cur != NULL; cur++) {
        free(*cur);
    }
    free(l);
}
1227
/*
 * Release a configuration diff: both server lists are freed through
 * free_array_helper() and then the diff structure itself is freed.
 * `diff` must not be NULL (checked with lcb_assert).
 */
void
lcbvb_free_diff(lcbvb_CONFIGDIFF *diff) {
    lcb_assert(diff);
    free_array_helper(diff->servers_added);
    free_array_helper(diff->servers_removed);
    free(diff);
}
1235
1236
1237 lcbvb_CHANGETYPE
1238 lcbvb_get_changetype(lcbvb_CONFIGDIFF *diff)
1239 {
1240 lcbvb_CHANGETYPE ret = 0;
1241 if (diff->n_vb_changes) {
1242 ret |= LCBVB_MAP_MODIFIED;
1243 }
1244 if (*diff->servers_added || *diff->servers_removed || diff->sequence_changed) {
1245 ret |= LCBVB_SERVERS_MODIFIED;
1246 }
1247 return ret;
1248 }
1249
1250 /******************************************************************************
1251 ******************************************************************************
1252 ** String/Port Getters **
1253 ******************************************************************************
1254 ******************************************************************************/
1255
1256 static const lcbvb_SERVICES *
1257 get_svc(const lcbvb_SERVER *srv, lcbvb_SVCMODE mode)
1258 {
1259 if (srv->alt_hostname) {
1260 if (mode == LCBVB_SVCMODE_PLAIN) {
1261 return &srv->alt_svc;
1262 } else {
1263 return &srv->alt_svc_ssl;
1264 }
1265 } else {
1266 if (mode == LCBVB_SVCMODE_PLAIN) {
1267 return &srv->svc;
1268 } else {
1269 return &srv->svc_ssl;
1270 }
1271 }
1272 }
1273
1274 static const char *
1275 get_hostname(const lcbvb_SERVER *srv)
1276 {
1277 if (srv->alt_hostname) {
1278 return srv->alt_hostname;
1279 } else {
1280 return srv->hostname;
1281 }
1282 }
1283
1284 LIBCOUCHBASE_API
1285 unsigned
1286 lcbvb_get_port(lcbvb_CONFIG *cfg,
1287 unsigned ix, lcbvb_SVCTYPE type, lcbvb_SVCMODE mode)
1288 {
1289 const lcbvb_SERVICES *svc;
1290 lcbvb_SERVER *srv;
1291 if (type >= LCBVB_SVCTYPE__MAX || mode >= LCBVB_SVCMODE__MAX) {
1292 return 0;
1293 }
1294 if (ix >= cfg->nsrv) {
1295 return 0;
1296 }
1297
1298 srv = cfg->servers + ix;
1299 svc = get_svc(srv, mode);
1300
1301 if (type == LCBVB_SVCTYPE_DATA) {
1302 return svc->data;
1303 } else if (type == LCBVB_SVCTYPE_MGMT) {
1304 return svc->mgmt;
1305 } else if (type == LCBVB_SVCTYPE_VIEWS) {
1306 return svc->views;
1307 } else if (type == LCBVB_SVCTYPE_IXADMIN) {
1308 return svc->ixadmin;
1309 } else if (type == LCBVB_SVCTYPE_IXQUERY) {
1310 return svc->ixquery;
1311 } else if (type == LCBVB_SVCTYPE_N1QL) {
1312 return svc->n1ql;
1313 } else if (type == LCBVB_SVCTYPE_FTS) {
1314 return svc->fts;
1315 } else if (type == LCBVB_SVCTYPE_CBAS) {
1316 return svc->cbas;
1317 } else {
1318 return 0;
1319 }
1320 }
1321
1322 LIBCOUCHBASE_API
1323 const char *
1324 lcbvb_get_hostport(lcbvb_CONFIG *cfg,
1325 unsigned ix, lcbvb_SVCTYPE type, lcbvb_SVCMODE mode)
1326 {
1327 char **strp;
1328 lcbvb_SERVER *srv;
1329 lcbvb_SERVICES *svc;
1330 unsigned port = lcbvb_get_port(cfg, ix, type, mode);
1331
1332 if (!port) {
1333 return NULL;
1334 }
1335
1336 srv = cfg->servers + ix;
1337 svc = (lcbvb_SERVICES *)get_svc(srv, mode);
1338
1339 strp = &svc->hoststrs[type];
1340 if (*strp == NULL) {
1341 const char *hostname = get_hostname(srv);
1342 size_t strn = strlen(hostname) + 20;
1343 *strp = calloc(strn, sizeof(char));
1344 copy_address(*strp, strn, hostname, port);
1345 }
1346 return *strp;
1347 }
1348
1349 LIBCOUCHBASE_API
1350 const char *
1351 lcbvb_get_hostname(const lcbvb_CONFIG *cfg, unsigned ix)
1352 {
1353 if (cfg->nsrv > ix) {
1354 return get_hostname(cfg->servers + ix);
1355 } else {
1356 return NULL;
1357 }
1358 }
1359
1360 LIBCOUCHBASE_API
1361 int
1362 lcbvb_get_randhost_ex(const lcbvb_CONFIG *cfg,
1363 lcbvb_SVCTYPE type, lcbvb_SVCMODE mode, int *used)
1364 {
1365 size_t nn, oix = 0;
1366
1367 /*
1368 * Since not all nodes support all service types, we need to make it a
1369 * fair selection by pre-filtering the nodes which actually support the
1370 * service, and then proceed to actually select a suitable node.
1371 */
1372 for (nn = 0; nn < cfg->nsrv; nn++) {
1373 const lcbvb_SERVER *server = cfg->servers + nn;
1374 const lcbvb_SERVICES *svcs = get_svc(server, mode);
1375 int has_svc = 0;
1376
1377 // Check if this node is in the exclude list
1378 if (used && used[nn]) {
1379 continue;
1380 }
1381
1382 has_svc =
1383 (type == LCBVB_SVCTYPE_DATA && svcs->data) ||
1384 (type == LCBVB_SVCTYPE_IXADMIN && svcs->ixadmin) ||
1385 (type == LCBVB_SVCTYPE_IXQUERY && svcs->ixquery) ||
1386 (type == LCBVB_SVCTYPE_MGMT && svcs->mgmt) ||
1387 (type == LCBVB_SVCTYPE_N1QL && svcs->n1ql) ||
1388 (type == LCBVB_SVCTYPE_FTS && svcs->fts) ||
1389 (type == LCBVB_SVCTYPE_VIEWS && svcs->views) ||
1390 (type == LCBVB_SVCTYPE_CBAS && svcs->cbas);
1391
1392 if (has_svc) {
1393 cfg->randbuf[oix++] = (int)nn;
1394 }
1395 }
1396
1397 if (!oix) {
1398 /* nothing supports it! */
1399 return -1;
1400 }
1401
1402 nn = rand();
1403 nn %= oix;
1404 return cfg->randbuf[nn];
1405 }
1406
/*
 * Pick a random server offering service `type` in `mode`, with no servers
 * excluded. Equivalent to lcbvb_get_randhost_ex() with a NULL `used` list.
 */
LIBCOUCHBASE_API
int
lcbvb_get_randhost(const lcbvb_CONFIG *cfg,
                   lcbvb_SVCTYPE type, lcbvb_SVCMODE mode)
{
    return lcbvb_get_randhost_ex(cfg, type, mode, NULL);
}
1414
1415 LIBCOUCHBASE_API
1416 const char *
1417 lcbvb_get_resturl(lcbvb_CONFIG *cfg, unsigned ix,
1418 lcbvb_SVCTYPE svc, lcbvb_SVCMODE mode)
1419 {
1420 char **strp;
1421 const char *prefix;
1422 const char *path;
1423
1424 lcbvb_SERVER *srv;
1425 lcbvb_SERVICES *svcs;
1426 unsigned port;
1427 port = lcbvb_get_port(cfg, ix, svc, mode);
1428 if (!port) {
1429 return NULL;
1430 }
1431
1432 srv = cfg->servers + ix;
1433 if (mode == LCBVB_SVCMODE_PLAIN) {
1434 prefix = "http";
1435 } else {
1436 prefix = "https";
1437 }
1438 svcs = (lcbvb_SERVICES *)get_svc(srv, mode);
1439
1440 if (svc == LCBVB_SVCTYPE_VIEWS) {
1441 path = srv->viewpath;
1442 strp = &svcs->views_base_;
1443 } else if (svc == LCBVB_SVCTYPE_N1QL) {
1444 path = srv->querypath;
1445 strp = &svcs->query_base_;
1446 } else if (svc == LCBVB_SVCTYPE_FTS) {
1447 path = srv->ftspath;
1448 strp = &svcs->fts_base_;
1449 } else if (svc == LCBVB_SVCTYPE_CBAS) {
1450 path = srv->cbaspath;
1451 strp = &svcs->cbas_base_;
1452 } else {
1453 /* Unknown service! */
1454 return NULL;
1455 }
1456
1457 if (path == NULL) {
1458 return NULL;
1459 } else if (!*strp) {
1460 char buf[4096];
1461 const char *hostname = get_hostname(srv);
1462 if (strchr(hostname, ':')) {
1463 // IPv6 and should be bracketed
1464 snprintf(buf, sizeof(buf), "%s://[%s]:%d%s", prefix, hostname, port, path);
1465 } else {
1466 snprintf(buf, sizeof(buf), "%s://%s:%d%s", prefix, hostname, port, path);
1467 }
1468 *strp = strdup(buf);
1469 }
1470
1471 return *strp;
1472 }
1473
/*
 * Return the views base URL for server `ix`; shorthand for
 * lcbvb_get_resturl() with LCBVB_SVCTYPE_VIEWS.
 */
LIBCOUCHBASE_API
const char *
lcbvb_get_capibase(lcbvb_CONFIG *cfg, unsigned ix, lcbvb_SVCMODE mode)
{
    return lcbvb_get_resturl(cfg, ix, LCBVB_SVCTYPE_VIEWS, mode);
}
1480
/* Return the configuration's revision id (the `revid` field). */
LIBCOUCHBASE_API int lcbvb_get_revision(const lcbvb_CONFIG *cfg) {
    return cfg->revid;
}
/* Return the number of servers in the configuration (the `nsrv` field). */
LIBCOUCHBASE_API unsigned lcbvb_get_nservers(const lcbvb_CONFIG *cfg) {
    return cfg->nsrv;
}
/* Return the number of replicas per vbucket (the `nrepl` field). */
LIBCOUCHBASE_API unsigned lcbvb_get_nreplicas(const lcbvb_CONFIG *cfg) {
    return cfg->nrepl;
}
/* Return the number of vbuckets in the configuration (the `nvb` field). */
LIBCOUCHBASE_API unsigned lcbvb_get_nvbuckets(const lcbvb_CONFIG *cfg) {
    return cfg->nvb;
}
/* Return the key distribution mode of the configuration (the `dtype` field). */
LIBCOUCHBASE_API lcbvb_DISTMODE lcbvb_get_distmode(const lcbvb_CONFIG *cfg) {
    return cfg->dtype;
}
/* Return the stored error string (the `errstr` field); may be NULL if unset. */
LIBCOUCHBASE_API const char *lcbvb_get_error(const lcbvb_CONFIG *cfg) {
    return cfg->errstr;
}
1499 /******************************************************************************
1500 ******************************************************************************
1501 ** Generation Functions **
1502 ******************************************************************************
1503 ******************************************************************************/
1504
1505 static void copy_service(const char *hostname, const lcbvb_SERVICES *src, lcbvb_SERVICES *dst)
1506 {
1507 *dst = *src;
1508 memset(&dst->hoststrs, 0, sizeof dst->hoststrs);
1509 if (src->views_base_) {
1510 dst->views_base_ = strdup(src->views_base_);
1511 }
1512 if (src->query_base_) {
1513 dst->query_base_ = strdup(src->query_base_);
1514 }
1515 if (src->fts_base_) {
1516 dst->fts_base_ = strdup(src->fts_base_);
1517 }
1518 if (src->cbas_base_) {
1519 dst->cbas_base_ = strdup(src->cbas_base_);
1520 }
1521 if (dst->data) {
1522 char buf[4096];
1523 copy_address(buf, sizeof(buf), hostname, dst->data);
1524 dst->hoststrs[LCBVB_SVCTYPE_DATA] = strdup(buf);
1525 }
1526 }
1527
1528 LIBCOUCHBASE_API
1529 int
1530 lcbvb_genconfig_ex(lcbvb_CONFIG *vb,
1531 const char *name, const char *uuid,
1532 const lcbvb_SERVER *servers,
1533 unsigned nservers, unsigned nreplica, unsigned nvbuckets)
1534 {
1535 unsigned ii, jj;
1536 int srvix = 0, in_nondata = 0;
1537
1538 lcb_assert(nservers);
1539
1540 if (!name) {
1541 name = "default";
1542 }
1543
1544 memset(vb, 0, sizeof(*vb));
1545 vb->dtype = LCBVB_DIST_VBUCKET;
1546 vb->nvb = nvbuckets;
1547 vb->nrepl = nreplica;
1548 vb->nsrv = nservers;
1549 vb->bname = strdup(name);
1550 (void)uuid;
1551
1552 if (nreplica >= nservers) {
1553 vb->errstr = "nservers must be > nreplicas";
1554 return -1;
1555 }
1556
1557 if (nreplica > 4) {
1558 vb->errstr = "Replicas must be <= 4";
1559 return -1;
1560 }
1561
1562 /* Count the number of data servers.. */
1563 for (ii = 0; ii < nservers; ii++) {
1564 const lcbvb_SERVER *server = servers + ii;
1565 if (server->svc.data) {
1566 if (in_nondata) {
1567 vb->errstr = "All data servers must be specified before non-data servers";
1568 return -1;
1569 }
1570 vb->ndatasrv++;
1571 } else {
1572 in_nondata = 1;
1573 }
1574 }
1575
1576 if (vb->nvb) {
1577 vb->vbuckets = malloc(vb->nvb * sizeof(*vb->vbuckets));
1578 if (!vb->vbuckets) {
1579 vb->errstr = "Couldn't allocate vbucket array";
1580 return -1;
1581 }
1582 }
1583
1584 for (ii = 0; ii < vb->nvb; ii++) {
1585 lcbvb_VBUCKET *cur = vb->vbuckets + ii;
1586 cur->servers[0] = srvix;
1587 for (jj = 1; jj < vb->nrepl+1; jj++) {
1588 cur->servers[jj] = (srvix + jj) % vb->ndatasrv;
1589 }
1590 srvix = (srvix + 1) % vb->ndatasrv;
1591 }
1592
1593 vb->servers = calloc(vb->nsrv, sizeof(*vb->servers));
1594 vb->randbuf = calloc(vb->nsrv, sizeof(*vb->randbuf));
1595
1596 for (ii = 0; ii < vb->nsrv; ii++) {
1597 lcbvb_SERVER *dst = vb->servers + ii;
1598 const lcbvb_SERVER *src = servers + ii;
1599
1600 *dst = *src;
1601 dst->hostname = strdup(src->hostname);
1602 if (src->viewpath) {
1603 dst->viewpath = strdup(src->viewpath);
1604 }
1605 if (src->querypath) {
1606 dst->querypath = strdup(src->querypath);
1607 }
1608 if (src->ftspath) {
1609 dst->ftspath = strdup(src->ftspath);
1610 }
1611 if (src->cbaspath) {
1612 dst->cbaspath = strdup(src->cbaspath);
1613 }
1614
1615 copy_service(src->hostname, &src->svc, &dst->svc);
1616 copy_service(src->hostname, &src->svc_ssl, &dst->svc_ssl);
1617 {
1618 char tmpbuf[MAX_AUTHORITY_SIZE] = {0};
1619 copy_address(tmpbuf, sizeof(tmpbuf), dst->hostname, dst->svc.data);
1620 dst->authority = strdup(tmpbuf);
1621 }
1622 }
1623
1624 for (ii = 0; ii < vb->nvb; ii++) {
1625 for (jj = 0; jj < vb->nrepl+1; jj++) {
1626 int ix = vb->vbuckets[ii].servers[jj];
1627 if (ix >= 0) {
1628 vb->servers[ix].nvbs++;
1629 }
1630 }
1631 }
1632 return 0;
1633 }
1634
1635 int
1636 lcbvb_genconfig(lcbvb_CONFIG *vb,
1637 unsigned nservers, unsigned nreplica, unsigned nvbuckets)
1638 {
1639 unsigned ii;
1640 int rv;
1641 lcbvb_SERVER *srvarry;
1642
1643 srvarry = calloc(nservers, sizeof(*srvarry));
1644 for (ii = 0; ii < nservers; ii++) {
1645 srvarry[ii].svc.data = 1000 + ii;
1646 srvarry[ii].svc.views = 2000 + ii;
1647 srvarry[ii].svc.mgmt = 3000 + ii;
1648 srvarry[ii].hostname = "localhost";
1649 srvarry[ii].svc.views_base_ = "/default";
1650 }
1651 rv = lcbvb_genconfig_ex(vb,
1652 "default", NULL, srvarry, nservers, nreplica, nvbuckets);
1653 free(srvarry);
1654 return rv;
1655 }
1656
1657 void
1658 lcbvb_genffmap(lcbvb_CONFIG *cfg)
1659 {
1660 size_t ii;
1661 lcb_assert(cfg->nrepl);
1662 if (cfg->ffvbuckets) {
1663 free(cfg->ffvbuckets);
1664 }
1665 cfg->ffvbuckets = calloc(cfg->nvb, sizeof *cfg->ffvbuckets);
1666 for (ii = 0; ii < cfg->nvb; ++ii) {
1667 size_t jj;
1668 lcbvb_VBUCKET *vb = cfg->ffvbuckets + ii;
1669 memcpy(vb, cfg->vbuckets + ii, sizeof *vb);
1670 for (jj = 0; jj < cfg->ndatasrv; ++jj) {
1671 vb->servers[jj] = (vb->servers[jj] + 1) % cfg->ndatasrv;
1672 }
1673 }
1674 }
1675
1676 void
1677 lcbvb_make_ketama(lcbvb_CONFIG *vb)
1678 {
1679 if (vb->dtype == LCBVB_DIST_KETAMA) {
1680 return;
1681 }
1682 vb->dtype = LCBVB_DIST_KETAMA;
1683 vb->nrepl = 0;
1684 vb->nvb = 0;
1685 update_ketama(vb);
1686 }
1687
1688
1689 /******************************************************************************
1690 ******************************************************************************
1691 ** Compatibility APIs **
1692 ******************************************************************************
1693 ******************************************************************************/
/* Compatibility wrapper: returns the result of lcbvb_create(). */
LIBCOUCHBASE_API lcbvb_CONFIG* vbucket_config_create(void) {
    return lcbvb_create();
}
/* Compatibility wrapper: passes `h` to lcbvb_destroy(). */
LIBCOUCHBASE_API void vbucket_config_destroy(lcbvb_CONFIG*h) {
    lcbvb_destroy(h);
}
/*
 * Compatibility wrapper: loads the JSON text `s` into `h` through
 * lcbvb_load_json() and returns its result. `src` is ignored.
 */
LIBCOUCHBASE_API int vbucket_config_parse(lcbvb_CONFIG*h, vbucket_source_t src, const char *s) {
    (void)src; return lcbvb_load_json(h, s);
}
/* Compatibility accessor: returns the `errstr` field of `h`. */
LIBCOUCHBASE_API const char * vbucket_get_error_message(lcbvb_CONFIG*h) {
    return h->errstr;
}
/* Compatibility accessor: returns the server count (`nsrv`) as an int. */
LIBCOUCHBASE_API int vbucket_config_get_num_servers(lcbvb_CONFIG *cfg) {
    return cfg->nsrv;
}
/* Compatibility accessor: returns the replica count (`nrepl`) as an int. */
LIBCOUCHBASE_API int vbucket_config_get_num_replicas(lcbvb_CONFIG *cfg) {
    return cfg->nrepl;
}
/* Compatibility accessor: returns the vbucket count (`nvb`) as an int. */
LIBCOUCHBASE_API int vbucket_config_get_num_vbuckets(lcbvb_CONFIG *cfg) {
    return cfg->nvb;
}
/*
 * Compatibility wrapper: returns lcbvb_get_hostport() for the plain-mode
 * data service of server `ix`.
 */
LIBCOUCHBASE_API const char *vbucket_config_get_server(lcbvb_CONFIG *cfg, int ix) {
    return lcbvb_get_hostport(cfg, ix, LCBVB_SVCTYPE_DATA, LCBVB_SVCMODE_PLAIN);
}
/*
 * Compatibility wrapper: returns lcbvb_get_hostport() for the plain-mode
 * management service of server `ix`.
 */
LIBCOUCHBASE_API const char *vbucket_config_get_rest_api_server(lcbvb_CONFIG *cfg, int ix) {
    return lcbvb_get_hostport(cfg, ix, LCBVB_SVCTYPE_MGMT, LCBVB_SVCMODE_PLAIN);
}
/* Compatibility wrapper: returns lcbvb_get_capibase() in plain mode for server `ix`. */
LIBCOUCHBASE_API const char *vbucket_config_get_couch_api_base(lcbvb_CONFIG *cfg, int ix) {
    return lcbvb_get_capibase(cfg, ix, LCBVB_SVCMODE_PLAIN);
}
/* Compatibility accessor: returns the distribution mode (`dtype`). */
LIBCOUCHBASE_API lcbvb_DISTMODE vbucket_config_get_distribution_type(lcbvb_CONFIG *cfg) {
    return cfg->dtype;
}
/*
 * Compatibility wrapper around lcbvb_map_key(): maps key `k` of length `nk`,
 * storing the vbucket in *pvb and the server index in *pix.
 */
LIBCOUCHBASE_API int vbucket_map(lcbvb_CONFIG *cfg, const void *k, lcb_SIZE nk, int *pvb, int *pix) {
    return lcbvb_map_key(cfg, k, nk, pvb, pix);
}
/* Compatibility wrapper: returns lcbvb_k2vb() for key `k` of length `nk`. */
LIBCOUCHBASE_API int vbucket_get_vbucket_by_key(lcbvb_CONFIG *cfg, const void *k, lcb_SIZE nk) {
    return lcbvb_k2vb(cfg, k, nk);
}
/* Compatibility wrapper: returns lcbvb_vbmaster() for vbucket `vb`. */
LIBCOUCHBASE_API int vbucket_get_master(lcbvb_CONFIG *cfg, int vb) {
    return lcbvb_vbmaster(cfg, vb);
}
/* Compatibility wrapper: returns lcbvb_vbreplica() for vbucket `vb`, replica `repl`. */
LIBCOUCHBASE_API int vbucket_get_replica(lcbvb_CONFIG *cfg, int vb, int repl) {
    return lcbvb_vbreplica(cfg, vb, repl);
}
/* Compatibility wrapper: returns lcbvb_compare(a, b), with `a` as the "from" config. */
LIBCOUCHBASE_API lcbvb_CONFIGDIFF *vbucket_compare(lcbvb_CONFIG*a,lcbvb_CONFIG*b) {
    return lcbvb_compare(a,b);
}
/* Compatibility wrapper: passes `p` to lcbvb_free_diff(). */
LIBCOUCHBASE_API void vbucket_free_diff(lcbvb_CONFIGDIFF *p) {
    lcbvb_free_diff(p);
}
/* Compatibility wrapper: returns lcbvb_get_revision(p). */
LIBCOUCHBASE_API int vbucket_config_get_revision(lcbvb_CONFIG *p) {
    return lcbvb_get_revision(p);
}
/* Compatibility wrapper: returns lcbvb_get_changetype(diff). */
LIBCOUCHBASE_API lcbvb_CHANGETYPE vbucket_what_changed(lcbvb_CONFIGDIFF *diff) {
    return lcbvb_get_changetype(diff);
}
/*
 * Compatibility wrapper: generates a configuration into `cfg` through
 * lcbvb_genconfig() with `nsrv` servers, `nrepl` replicas and `nvb` vbuckets.
 */
LIBCOUCHBASE_API int vbucket_config_generate(lcbvb_CONFIG*cfg, unsigned nsrv, unsigned nrepl, unsigned nvb) {
    return lcbvb_genconfig(cfg,nsrv,nrepl,nvb);
}
1754