1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2014 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include <stdlib.h>
19 #include <string.h>
20 #include <stddef.h>
21 #include <libcouchbase/couchbase.h>
22 #include <libcouchbase/vbucket.h>
23 #include "config.h"
24 #include "contrib/cJSON/cJSON.h"
25 #include "json-inl.h"
26 #include "hash.h"
27 #include "crc32.h"
28 
#define STRINGIFY_(X) #X
#define STRINGIFY(X) STRINGIFY_(X)
#define MAX_AUTHORITY_SIZE 100
/* Store a static "file:line message" diagnostic in (cfg)->errstr unless one
 * is already recorded, so the first error reported wins. The do/while(0)
 * wrapper makes the macro a single statement, which keeps it safe inside an
 * unbraced if/else. */
#define SET_ERRSTR(cfg, s) do { \
    if (!(cfg)->errstr) { \
        (cfg)->errstr = __FILE__ ":" STRINGIFY(__LINE__) " " s ; \
    } \
} while (0)
35 
36 /******************************************************************************
37  ******************************************************************************
38  ** Core Parsing Routines                                                    **
39  ******************************************************************************
40  ******************************************************************************/
41 static lcbvb_VBUCKET *
42 build_vbmap(lcbvb_CONFIG *cfg, cJSON *cj, unsigned *nitems)
43 {
44     lcbvb_VBUCKET *vblist = NULL;
sepgsql_attribute_post_create(Oid relOid,AttrNumber attnum)45     cJSON *jvb;
46     unsigned ii, nalloc;
47 
48     /** FIXME: Realloc dynamically when too small */
49     if (!(nalloc = cJSON_GetArraySize(cj))) {
50         goto GT_ERR;
51     }
52 
53     if (!(vblist = calloc(nalloc, sizeof(*vblist)))) {
54         goto GT_ERR;
55     }
56 
57     /* Iterate over all the vbuckets */
58     jvb = cj->child;
59     for (ii = 0; ii < nalloc && jvb; ++ii, jvb = jvb->next) {
60         cJSON *jsix;
61         lcbvb_VBUCKET *cvb;
62         unsigned jj, nservers;
63 
64         if (jvb->type != cJSON_Array) {
65             goto GT_ERR;
66         }
67 
68         nservers = cJSON_GetArraySize(jvb);
69         jsix = jvb->child;
70         cvb = vblist + ii;
71 
72         /* Iterate over each index in the vbucket */
73         for (jj = 0; jj < nservers && jsix; ++jj, jsix = jsix->next) {
74             if (jsix->type != cJSON_Number) {
75                 goto GT_ERR;
76             }
77             cvb->servers[jj] = jsix->valueint;
78             if (cvb->servers[jj] > (int)cfg->nsrv-1) {
79                 SET_ERRSTR(cfg, "Invalid vBucket map received from server. Above-bounds vBucket target found");
80                 goto GT_ERR;
81             }
82         }
83     }
84 
85     *nitems = nalloc;
86     return vblist;
87 
88     GT_ERR:
89     free(vblist);
90     return NULL;
91 }
92 
93 static void copy_address(char *buf, size_t nbuf, const char *host, lcb_U16 port)
94 {
95     if (strchr(host, ':')) {
96         // IPv6 and should be bracketed
97         snprintf(buf, nbuf, "[%s]:%d", host, port);
98     } else {
99         snprintf(buf, nbuf, "%s:%d", host, port);
100     }
101 }
102 
103 static lcbvb_SERVER *
104 find_server_memd(lcbvb_SERVER *servers, unsigned n, const char *s)
105 {
106     unsigned ii;
107     for (ii = 0; ii < n; ii++) {
108         char buf[4096] = { 0 };
109         lcbvb_SERVER *cur = servers + ii;
110         copy_address(buf, sizeof(buf), cur->hostname, cur->svc.data);
111         if (!strncmp(s, buf, sizeof(buf))) {
112             return cur;
113         }
114     }
115     return NULL;
116 }
117 
118 static int
119 assign_dumy_server(lcbvb_CONFIG *cfg, lcbvb_SERVER *dst, const char *s)
120 {
121     int itmp;
122     char *colon;
123     if (!(dst->authority = strdup(s))) {
124         SET_ERRSTR(cfg, "Couldn't allocate authority string");
125         goto GT_ERR;
126     }
127 
128     if (!(colon = strstr(s, ":"))) {
129         SET_ERRSTR(cfg, "Badly formatted name string");
130         goto GT_ERR;
131     }
132 
133     if (sscanf(colon+1, "%d", &itmp) != 1) {
134         SET_ERRSTR(cfg, "Badly formatted port");
sepgsql_attribute_drop(Oid relOid,AttrNumber attnum)135         goto GT_ERR;
136     }
137 
138     dst->svc.data = itmp;
139     return 1;
140 
141     GT_ERR:
142     free(dst->authority);
143     return 0;
144 }
145 
146 static void
147 set_vb_count(lcbvb_CONFIG *cfg, lcbvb_VBUCKET *vbs)
148 {
149     unsigned ii, jj;
150     if (!vbs) {
151         return;
152     }
153 
154     for (ii = 0; ii < cfg->nvb; ++ii) {
155         for (jj = 0; jj < cfg->nrepl+1; ++jj) {
156             int ix = vbs[ii].servers[jj];
157             if (ix < 0 || (unsigned)ix > cfg->nsrv) {
158                 continue;
159             }
160             cfg->servers[ix].nvbs++;
161         }
162     }
163 }
164 
165 static int
166 pair_server_list(lcbvb_CONFIG *cfg, cJSON *vbconfig)
sepgsql_attribute_relabel(Oid relOid,AttrNumber attnum,const char * seclabel)167 {
168     cJSON *servers;
169     lcbvb_SERVER *newlist = NULL;
170     unsigned ii, nsrv;
171 
172     if (!get_jarray(vbconfig, "serverList", &servers)) {
173         SET_ERRSTR(cfg, "Couldn't find serverList");
174         goto GT_ERROR;
175     }
176 
177     nsrv = cJSON_GetArraySize(servers);
178 
179     if (nsrv > cfg->nsrv) {
180         /* nodes in serverList which are not in nodes/nodesExt */
181         void *tmp = realloc(cfg->servers, sizeof(*cfg->servers) * nsrv);
182         if (!tmp) {
183             SET_ERRSTR(cfg, "Couldn't allocate memory for server list");
184             goto GT_ERROR;
185         }
186         cfg->servers = tmp;
187         cfg->nsrv = nsrv;
188     }
189 
190     /* allocate an array for the reordered server list */
191     newlist = calloc(nsrv, sizeof(*cfg->servers));
192 
193     for (ii = 0; ii < nsrv; ii++) {
194         char *tmp;
195         cJSON *jst;
196         lcbvb_SERVER *cur;
197         jst = cJSON_GetArrayItem(servers, ii);
198         tmp = jst->valuestring;
199         cur = find_server_memd(cfg->servers, cfg->nsrv, tmp);
200 
201         if (cur) {
202             newlist[ii] = *cur;
203         } else {
204             /* found server inside serverList but not in nodes? */
205             if (!assign_dumy_server(cfg, &newlist[ii], tmp)) {
206                 goto GT_ERROR;
207             }
208         }
209     }
210 
211     free(cfg->servers);
212     cfg->servers = newlist;
213     return 1;
214 
215     GT_ERROR:
216     free(newlist);
217     return 0;
218 
219 }
220 
221 static int
222 parse_vbucket(lcbvb_CONFIG *cfg, cJSON *cj)
223 {
224     cJSON *vbconfig, *vbmap, *ffmap = NULL;
225 
226     if (!get_jobj(cj, "vBucketServerMap", &vbconfig)) {
227         SET_ERRSTR(cfg, "Expected top-level 'vBucketServerMap'");
228         goto GT_ERROR;
229     }
230 
231     if (!get_juint(vbconfig, "numReplicas", &cfg->nrepl)) {
232         SET_ERRSTR(cfg, "'numReplicas' missing");
233         goto GT_ERROR;
234     }
235 
236     if (!get_jarray(vbconfig, "vBucketMap", &vbmap)) {
237         SET_ERRSTR(cfg, "Missing 'vBucketMap'");
238         goto GT_ERROR;
239     }
240 
241     get_jarray(vbconfig, "vBucketMapForward", &ffmap);
sepgsql_relation_post_create(Oid relOid)242 
243     if ((cfg->vbuckets = build_vbmap(cfg, vbmap, &cfg->nvb)) == NULL) {
244         goto GT_ERROR;
245     }
246 
247     if (ffmap && (cfg->ffvbuckets = build_vbmap(cfg, ffmap, &cfg->nvb)) == NULL) {
248         goto GT_ERROR;
249     }
250 
251     if (!cfg->is3x) {
252         if (!pair_server_list(cfg, vbconfig)) {
253             goto GT_ERROR;
254         }
255     }
256 
257     /** Now figure out which server goes where */
258     set_vb_count(cfg, cfg->vbuckets);
259     set_vb_count(cfg, cfg->ffvbuckets);
260     return 1;
261 
262     GT_ERROR:
263     return 0;
264 }
265 
266 static int server_cmp(const void *s1, const void *s2)
267 {
268     return strcmp(((const lcbvb_SERVER *)s1)->authority,
269         ((const lcbvb_SERVER *)s2)->authority);
270 }
271 
272 static int continuum_item_cmp(const void *t1, const void *t2)
273 {
274     const lcbvb_CONTINUUM *ct1 = t1, *ct2 = t2;
275 
276     if (ct1->point == ct2->point) {
277         return 0;
278     } else if (ct1->point > ct2->point) {
279         return 1;
280     } else {
281         return -1;
282     }
283 }
284 
285 static int
286 update_ketama(lcbvb_CONFIG *cfg)
287 {
288     char host[MAX_AUTHORITY_SIZE+10] = "";
289     int nhost;
290     unsigned pp, hh, ss, nn;
291     unsigned char digest[16];
292     lcbvb_CONTINUUM *new_continuum, *old_continuum;
293 
294     qsort(cfg->servers, cfg->ndatasrv, sizeof(*cfg->servers), server_cmp);
295 
296     new_continuum = calloc(160 * cfg->ndatasrv, sizeof(*new_continuum));
297     /* 40 hashes, 4 numbers per hash = 160 points per server */
298     for (ss = 0, pp = 0; ss < cfg->ndatasrv; ++ss) {
299         /* we can add more points to server which have more memory */
300         for (hh = 0; hh < 40; ++hh) {
301             lcbvb_SERVER *srv = cfg->servers + ss;
302             nhost = snprintf(host, MAX_AUTHORITY_SIZE+10, "%s-%u", srv->authority, hh);
303             vb__hash_md5(host, nhost, digest);
304             for (nn = 0; nn < 4; ++nn, ++pp) {
305                 new_continuum[pp].index = ss;
306                 new_continuum[pp].point = ((uint32_t) (digest[3 + nn * 4] & 0xFF) << 24)
307                                         | ((uint32_t) (digest[2 + nn * 4] & 0xFF) << 16)
308                                         | ((uint32_t) (digest[1 + nn * 4] & 0xFF) << 8)
309                                         | (digest[0 + nn * 4] & 0xFF);
310             }
311         }
312     }
313 
314     qsort(new_continuum, pp, sizeof *new_continuum, continuum_item_cmp);
315     old_continuum = cfg->continuum;
316     cfg->continuum = new_continuum;
317     cfg->ncontinuum = pp;
318     free(old_continuum);
319     return 1;
320 }
321 
322 static int
323 extract_services(lcbvb_CONFIG *cfg, cJSON *jsvc, lcbvb_SERVICES *svc, int is_ssl)
324 {
325     int itmp;
326     int rv;
327     const char *key;
328 
329     #define EXTRACT_SERVICE(k, fld) \
330         key = is_ssl ? k"SSL" : k; \
331         rv = get_jint(jsvc, key, &itmp); \
332         if (rv) { svc->fld = itmp; } else { svc->fld = 0; }
333 
334     EXTRACT_SERVICE("kv", data);
335     EXTRACT_SERVICE("mgmt", mgmt);
336     EXTRACT_SERVICE("capi", views);
337     EXTRACT_SERVICE("n1ql", n1ql);
338     EXTRACT_SERVICE("fts", fts);
339     EXTRACT_SERVICE("indexAdmin", ixadmin);
340     EXTRACT_SERVICE("indexScan", ixquery);
341     EXTRACT_SERVICE("cbas", cbas);
342 
343     #undef EXTRACT_SERVICE
344 
345     (void)cfg;
346     return 1;
347 }
348 
349 static int
350 build_server_strings(lcbvb_CONFIG *cfg, lcbvb_SERVER *server)
351 {
352     /* get the authority */
353     char tmpbuf[4096];
354 
355     copy_address(tmpbuf, sizeof(tmpbuf), server->hostname, server->svc.data);
356     server->authority = strdup(tmpbuf);
357     if (!server->authority) {
358         SET_ERRSTR(cfg, "Couldn't allocate authority");
359         return 0;
360     }
361 
362     server->svc.hoststrs[LCBVB_SVCTYPE_DATA] = strdup(server->authority);
363     if (server->viewpath == NULL && server->svc.views) {
364         server->viewpath = malloc(strlen(cfg->bname) + 2);
365         sprintf(server->viewpath, "/%s", cfg->bname);
366     }
367     if (server->querypath == NULL && server->svc.n1ql) {
368         server->querypath = strdup("/query/service");
369     }
370     if (server->ftspath == NULL && server->svc.fts) {
371         server->ftspath = strdup("/");
372     }
373     if (server->cbaspath == NULL && server->svc.cbas) {
374         server->cbaspath = strdup("/query/service");
375     }
376     return 1;
377 }
378 
379 /**
380  * Parse a node from the 'nodesExt' array
381  * @param cfg
382  * @param server
383  * @param js
384  * @return
385  */
386 static int
387 build_server_3x(lcbvb_CONFIG *cfg, lcbvb_SERVER *server, cJSON *js, char **network)
388 {
389     cJSON *jsvcs;
390     char *htmp;
391 
392     if (!get_jstr(js, "hostname", &htmp)) {
393         htmp = "$HOST";
394     }
395     if (!(server->hostname = strdup(htmp))) {
396         SET_ERRSTR(cfg, "Couldn't allocate memory");
397         goto GT_ERR;
398     }
399 
400     if (!get_jobj(js, "services", &jsvcs)) {
401         SET_ERRSTR(cfg, "Couldn't find 'services'");
402         goto GT_ERR;
403     }
404 
405     if (!extract_services(cfg, jsvcs, &server->svc, 0)) {
406         goto GT_ERR;
407     }
408     if (!extract_services(cfg, jsvcs, &server->svc_ssl, 1)) {
409         goto GT_ERR;
410     }
411 
412     if (!build_server_strings(cfg, server)) {
413         goto GT_ERR;
414     }
415 
416     if (network && *network && strcmp(*network, "default") != 0) {
417         cJSON *jaltaddr = cJSON_GetObjectItem(js, "alternateAddresses");
sepgsql_relation_drop(Oid relOid)418         if (jaltaddr && jaltaddr->type == cJSON_Object) {
419             cJSON *jnetwork = cJSON_GetObjectItem(jaltaddr, *network);
420             if (jnetwork && get_jstr(jnetwork, "hostname", &htmp)) {
421                 cJSON *jports;
422                 server->alt_hostname = strdup(htmp);
423                 jports = cJSON_GetObjectItem(jnetwork, "ports");
424                 if (jports && jports->type == cJSON_Object) {
425                     extract_services(cfg, jports, &server->alt_svc, 0);
426                     extract_services(cfg, jports, &server->alt_svc_ssl, 1);
427                 }
428 
429 #define COPY_SERVICE(src, dst)                                          \
430                 if ((dst)->data == 0) (dst)->data = (src)->data;        \
431                 if ((dst)->mgmt == 0) (dst)->mgmt = (src)->mgmt;        \
432                 if ((dst)->views == 0) (dst)->views = (src)->views;     \
433                 if ((dst)->n1ql == 0) (dst)->n1ql = (src)->n1ql;        \
434                 if ((dst)->fts == 0) (dst)->fts = (src)->fts;           \
435                 if ((dst)->ixadmin == 0) (dst)->ixadmin = (src)->ixadmin; \
436                 if ((dst)->ixquery == 0) (dst)->ixquery = (src)->ixquery; \
437                 if ((dst)->cbas == 0) (dst)->cbas = (src)->cbas;
438 
439                 COPY_SERVICE(&server->svc, &server->alt_svc);
440                 COPY_SERVICE(&server->svc_ssl, &server->alt_svc_ssl);
441 
442 #undef COPY_SERVICE
443             }
444         }
445     }
446 
447     return 1;
448 
449     GT_ERR:
450     return 0;
451 }
452 
453 /**
454  * Initialize a server from a JSON Object
455  * @param server The server to initialize
456  * @param js The object which contains the server information
457  * @return nonzero on success, 0 on failure.
458  */
459 static int
460 build_server_2x(lcbvb_CONFIG *cfg, lcbvb_SERVER *server, cJSON *js, char **network)
461 {
462     char *tmp = NULL, *colon;
463     int itmp;
464     cJSON *jsports;
465 
466     if (!get_jstr(js, "hostname", &tmp)) {
467         SET_ERRSTR(cfg, "Couldn't find hostname");
468         goto GT_ERR;
469     }
470 
471     /** Hostname is the _rest_ API host, e.g. '8091' */
472     if ((server->hostname = strdup(tmp)) == NULL) {
473         SET_ERRSTR(cfg, "Couldn't allocate hostname");
474         goto GT_ERR;
475     }
476 
477     colon = strchr(server->hostname, ':');
478     if (!colon) {
479         SET_ERRSTR(cfg, "Expected ':' in 'hostname'");
480         goto GT_ERR;
481     }
482     if (sscanf(colon+1, "%d", &itmp) != 1) {
483         SET_ERRSTR(cfg, "Expected port after ':'");
484         goto GT_ERR;
485     }
486 
487     /* plain mgmt port is extracted from hostname */
488     server->svc.mgmt = itmp;
489     *colon = '\0';
490 
491     /** Handle the views name */
492     if (get_jstr(js, "couchApiBase", &tmp)) {
493         /** Have views */
494         char *path_begin;
495         colon = strrchr(tmp, ':');
496 
497         if (!colon) {
498             /* no port */
499             goto GT_ERR;
500         }
501         if (sscanf(colon+1, "%d", &itmp) != 1) {
502             goto GT_ERR;
503         }
504 
505         /* Assign the port */
506         server->svc.views = itmp;
507         path_begin = strstr(colon, "/");
508         if (!path_begin) {
509             SET_ERRSTR(cfg, "Expected path in couchApiBase");
510             goto GT_ERR;
511         }
512         server->viewpath = strdup(path_begin);
513     } else {
514         server->svc.views = 0;
515     }
516 
517     /* get the 'ports' dictionary */
518     if (!get_jobj(js, "ports", &jsports)) {
519         SET_ERRSTR(cfg, "Expected 'ports' dictionary");
520         goto GT_ERR;
521     }
522 
523     /* memcached port */
524     if (get_jint(jsports, "direct", &itmp)) {
525         server->svc.data = itmp;
sepgsql_relation_relabel(Oid relOid,const char * seclabel)526     } else {
527         SET_ERRSTR(cfg, "Expected 'direct' field in 'ports'");
528         goto GT_ERR;
529     }
530 
531     /* set the authority */
532     if (!build_server_strings(cfg, server)) {
533         goto GT_ERR;
534     }
535     return 1;
536 
537     GT_ERR:
538     return 0;
539 }
540 
541 static void
542 guess_network(cJSON *jnodes, int nsrv, const char *source, char **network)
543 {
544     int ii;
545     for (ii = 0; ii < nsrv; ii++) {
546         cJSON *jsrv = cJSON_GetArrayItem(jnodes, ii);
547         {
548             cJSON *jhostname = cJSON_GetObjectItem(jsrv, "hostname");
549             if (jhostname && jhostname->type == cJSON_String) {
550                 if (strcmp(jhostname->valuestring, source) == 0) {
551                     *network = strdup("default");
552                     return;
553                 }
554             }
555         }
556         {
557             cJSON *jaltaddr = cJSON_GetObjectItem(jsrv, "alternateAddresses");
558             if (jaltaddr && jaltaddr->type == cJSON_Object) {
559                 cJSON *cur;
560                 for (cur = jaltaddr->child; cur != NULL; cur = cur->next) {
561                     if (cur->type == cJSON_Object) {
562                         cJSON *jhostname = cJSON_GetObjectItem(cur, "hostname");
563                         if (jhostname && jhostname->type == cJSON_String) {
564                             if (strcmp(jhostname->valuestring, source) == 0) {
565                                 *network = strdup(cur->string);
566                                 return;
567                             }
568                         }
569                     }
570                 }
571             }
572         }
573     }
574     *network = strdup("default");
575 }
576 
sepgsql_relation_setattr(Oid relOid)577 int
578 lcbvb_load_json_ex(lcbvb_CONFIG *cfg, const char *data, const char *source, char **network)
579 {
580     cJSON *cj = NULL, *jnodes_ext = NULL, *jnodes = NULL;
581     char *tmp = NULL;
582     unsigned ii, jnodes_size = 0;
583     int jnodes_defined = 0;
584 
585     if ((cj = cJSON_Parse(data)) == NULL) {
586         SET_ERRSTR(cfg, "Couldn't parse JSON");
587         goto GT_ERROR;
588     }
589 
590     if (!get_jstr(cj, "name", &tmp)) {
591         SET_ERRSTR(cfg, "Expected 'name' key");
592         goto GT_ERROR;
593     }
594     cfg->bname = strdup(tmp);
595 
596     if (!get_jstr(cj, "nodeLocator", &tmp)) {
597         SET_ERRSTR(cfg, "Expected 'nodeLocator' key");
598         goto GT_ERROR;
599     }
600 
601     get_jarray(cj, "nodes", &jnodes);
602     if (jnodes) {
603         jnodes_defined = 1;
604         jnodes_size = cJSON_GetArraySize(jnodes);
605     }
606     if (get_jarray(cj, "nodesExt", &jnodes_ext)) {
607         cfg->is3x = 1;
608         cfg->nsrv = cJSON_GetArraySize(jnodes_ext);
609         jnodes = jnodes_ext;
610     } else if (jnodes == NULL) {
611         SET_ERRSTR(cfg, "expected 'nodesExt' or 'nodes' array");
612         goto GT_ERROR;
613     }
614 
615     if (!strcmp(tmp, "ketama")) {
616         cfg->dtype = LCBVB_DIST_KETAMA;
617     } else {
618         cfg->dtype = LCBVB_DIST_VBUCKET;
619     }
620 
621     if (!get_jint(cj, "rev", &cfg->revid)) {
622         cfg->revid = -1;
623     }
624 
625     cfg->caps = 0;
626     {
627         cJSON *jcaps = NULL;
628         if (get_jarray(cj, "bucketCapabilities", &jcaps)) {
629             unsigned ncaps = cJSON_GetArraySize(jcaps);
630             for (ii = 0; ii < ncaps; ii++) {
631                 cJSON *jcap = cJSON_GetArrayItem(jcaps, ii);
632                 if (jcap || jcap->type == cJSON_String) {
633                     if (strcmp(jcap->valuestring, "xattr") == 0) {
634                         cfg->caps |= LCBVB_CAP_XATTR;
635                     } else if (strcmp(jcap->valuestring, "dcp") == 0) {
636                         cfg->caps |= LCBVB_CAP_DCP;
637                     } else if (strcmp(jcap->valuestring, "cbhello") == 0) {
638                         cfg->caps |= LCBVB_CAP_CBHELLO;
639                     } else if (strcmp(jcap->valuestring, "touch") == 0) {
640                         cfg->caps |= LCBVB_CAP_TOUCH;
641                     } else if (strcmp(jcap->valuestring, "couchapi") == 0) {
642                         cfg->caps |= LCBVB_CAP_COUCHAPI;
643                     } else if (strcmp(jcap->valuestring, "cccp") == 0) {
644                         cfg->caps |= LCBVB_CAP_CCCP;
645                     } else if (strcmp(jcap->valuestring, "xdcrCheckpointing") == 0) {
646                         cfg->caps |= LCBVB_CAP_XDCR_CHECKPOINTING;
647                     } else if (strcmp(jcap->valuestring, "nodesExt") == 0) {
648                         cfg->caps |= LCBVB_CAP_NODES_EXT;
649                     }
650                 }
651             }
652         }
653     }
654 
655     /** Get the number of nodes. This traverses the list. Yuck */
656     cfg->nsrv = cJSON_GetArraySize(jnodes);
657 
658     if (network && *network == NULL) {
659         guess_network(jnodes, cfg->nsrv, source, network);
660     }
661 
662     /** Allocate a temporary one on the heap */
663     cfg->servers = calloc(cfg->nsrv, sizeof(*cfg->servers));
664     for (ii = 0; ii < cfg->nsrv; ii++) {
665         int rv;
666         cJSON *jsrv = cJSON_GetArrayItem(jnodes, ii);
667 
668         if (cfg->is3x) {
669             rv = build_server_3x(cfg, cfg->servers + ii, jsrv, network);
670             if (jnodes_defined && rv && ii >= jnodes_size) {
671                 cfg->servers[ii].svc.data = 0;
672                 cfg->servers[ii].svc_ssl.data = 0;
673                 cfg->servers[ii].alt_svc.data = 0;
674                 cfg->servers[ii].alt_svc_ssl.data = 0;
675             }
676         } else {
677             rv = build_server_2x(cfg, cfg->servers + ii, jsrv, network);
678         }
679 
680         if (!rv) {
681             SET_ERRSTR(cfg, "Failed to build server");
682             goto GT_ERROR;
683         }
684     }
685 
686     /* Count the number of _data_ servers in the cluster. Per the spec,
687      * these will always appear in order (so that we won't ever have "holes") */
688     for (ii = 0; ii < cfg->nsrv; ii++) {
689         if (!cfg->servers[ii].svc.data) {
690             break;
691         }
692     }
693     cfg->ndatasrv = ii;
694 
695     if (cfg->dtype == LCBVB_DIST_VBUCKET) {
696         if (!parse_vbucket(cfg, cj)) {
697             SET_ERRSTR(cfg, "Failed to parse vBucket map");
698             goto GT_ERROR;
699         }
700     } else {
701         /* If there is no $HOST then we can update the ketama config, otherwise
702          * we must wait for the hostname to be replaced! */
703         if (strstr(data, "$HOST") == NULL) {
704             if (!update_ketama(cfg)) {
705                 SET_ERRSTR(cfg, "Failed to establish ketama continuums");
706             }
707         }
708     }
709     cfg->servers = realloc(cfg->servers, sizeof(*cfg->servers) * cfg->nsrv);
710     cfg->randbuf = malloc(cfg->nsrv * sizeof(*cfg->randbuf));
711     cJSON_Delete(cj);
712     return 0;
713 
714     GT_ERROR:
715     if (cj) {
716         cJSON_Delete(cj);
717     }
718     return -1;
719 }
720 
721 int
722 lcbvb_load_json(lcbvb_CONFIG *cfg, const char *data)
723 {
sepgsql_index_modify(Oid indexOid)724     return lcbvb_load_json_ex(cfg, data, NULL, NULL);
725 }
726 
/**
 * Replace the first occurrence of the "$HOST" placeholder in a heap string.
 *
 * @param orig address of a malloc'd string (or of NULL, which is a no-op).
 *        When the placeholder is found, the old string is freed and *orig
 *        is pointed at a newly allocated string.
 * @param replacement text to put in place of the placeholder
 *
 * If the placeholder is absent, or the new string cannot be allocated,
 * *orig is left exactly as it was.
 */
static void
replace_hoststr(char **orig, const char *replacement)
{
    static const char placeholder[] = "$HOST";
    const size_t nplaceholder = sizeof(placeholder) - 1;
    const char *match, *suffix;
    char *newbuf;
    size_t nprefix, nrepl, nsuffix;

    if (orig == NULL || *orig == NULL || replacement == NULL) {
        return;
    }

    match = strstr(*orig, placeholder);
    if (match == NULL) {
        return;
    }

    suffix = match + nplaceholder;
    nprefix = (size_t)(match - *orig);
    nrepl = strlen(replacement);
    nsuffix = strlen(suffix);

    /* allocate before touching the original, so a failure changes nothing */
    newbuf = malloc(nprefix + nrepl + nsuffix + 1);
    if (newbuf == NULL) {
        return;
    }

    /* copy until the placeholder */
    memcpy(newbuf, *orig, nprefix);
    /* copy the host */
    memcpy(newbuf + nprefix, replacement, nrepl);
    /* copy after the placeholder, including the terminating NUL */
    memcpy(newbuf + nprefix + nrepl, suffix, nsuffix + 1);

    free(*orig);
    *orig = newbuf;
}
755 
756 LIBCOUCHBASE_API
757 void
758 lcbvb_replace_host(lcbvb_CONFIG *cfg, const char *hoststr)
759 {
760     unsigned ii, copy = 0;
761     char *replacement = (char *)hoststr;
762     if (strchr(replacement, ':')) {
763         size_t len = strlen(hoststr);
764         replacement = calloc(len + 2, sizeof(char));
765         replacement[0] = '[';
766         memcpy(replacement + 1, hoststr, len);
767         replacement[len + 1] = ']';
768         copy = 1;
769     }
770     for (ii = 0; ii < cfg->nsrv; ++ii) {
771         unsigned jj;
772         lcbvb_SERVER *srv = cfg->servers + ii;
773         lcbvb_SERVICES *svcs[] = { &srv->svc, &srv->svc_ssl };
774 
775         replace_hoststr(&srv->hostname, hoststr);
776         for (jj = 0; jj < 2; ++jj) {
777             unsigned kk;
778             lcbvb_SERVICES *cursvc = svcs[jj];
779             replace_hoststr(&cursvc->views_base_, replacement);
780             for (kk = 0; kk < LCBVB_SVCTYPE__MAX; ++kk) {
781                 replace_hoststr(&cursvc->hoststrs[kk], replacement);
782             }
783         }
784         /* reassign authority */
785         free(srv->authority);
786         srv->authority = strdup(srv->svc.hoststrs[LCBVB_SVCTYPE_DATA]);
787     }
788     if (copy) {
789         free(replacement);
790     }
791     if (cfg->dtype == LCBVB_DIST_KETAMA) {
792         update_ketama(cfg);
793     }
794 }
795 
796 lcbvb_CONFIG *
797 lcbvb_parse_json(const char *js)
798 {
799     int rv;
800     lcbvb_CONFIG *cfg = calloc(1, sizeof(*cfg));
801     rv = lcbvb_load_json(cfg, js);
802     if (rv) {
803         lcbvb_destroy(cfg);
804         return NULL;
805     }
806     return cfg;
807 }
808 
809 LIBCOUCHBASE_API
810 lcbvb_CONFIG *
811 lcbvb_create(void)
812 {
813     return calloc(1, sizeof(lcbvb_CONFIG));
814 }
815 
816 static void
817 free_service_strs(lcbvb_SERVICES *svc)
818 {
819     unsigned ii;
820     for (ii = 0; ii < LCBVB_SVCTYPE__MAX; ii++) {
821         free(svc->hoststrs[ii]);
822     }
823     free(svc->views_base_);
824     free(svc->query_base_);
825     free(svc->fts_base_);
826     free(svc->cbas_base_);
827 }
828 
829 void
830 lcbvb_destroy(lcbvb_CONFIG *conf)
831 {
832     unsigned ii;
833     for (ii = 0; ii < conf->nsrv; ii++) {
834         lcbvb_SERVER *srv = conf->servers + ii;
835         free(srv->hostname);
836         free(srv->viewpath);
837         free(srv->querypath);
838         free(srv->ftspath);
839         free(srv->cbaspath);
840         free_service_strs(&srv->svc);
841         free_service_strs(&srv->svc_ssl);
842         free(srv->authority);
843         free(srv->alt_hostname);
844         free_service_strs(&srv->alt_svc);
845         free_service_strs(&srv->alt_svc_ssl);
846     }
847     free(conf->servers);
848     free(conf->continuum);
849     free(conf->bname);
850     free(conf->vbuckets);
851     free(conf->ffvbuckets);
852     free(conf->randbuf);
853     free(conf);
854 }
855 
856 static void
857 svcs_to_json(lcbvb_SERVICES *svc, cJSON *jsvc, int is_ssl)
858 {
859     cJSON *tmp;
860     const char *key;
861     #define EXTRACT_SERVICE(name, fld) \
862     if (svc->fld) { \
863         key = is_ssl ? name"SSL" : name; \
864         tmp = cJSON_CreateNumber(svc->fld); \
865         cJSON_AddItemToObject(jsvc, key, tmp); \
866     }
867 
868     EXTRACT_SERVICE("mgmt", mgmt);
869     EXTRACT_SERVICE("capi", views);
870     EXTRACT_SERVICE("kv", data);
871     EXTRACT_SERVICE("n1ql", n1ql);
872     EXTRACT_SERVICE("indexScan", ixquery);
873     EXTRACT_SERVICE("indexAdmin", ixadmin);
874     EXTRACT_SERVICE("fts", fts);
875     EXTRACT_SERVICE("cbas", cbas);
876 
877     #undef EXTRACT_SERVICE
878 }
879 
880 LIBCOUCHBASE_API
881 char *
882 lcbvb_save_json(lcbvb_CONFIG *cfg)
883 {
884     unsigned ii;
885     char *ret;
886     cJSON *tmp = NULL, *nodes = NULL;
887     cJSON *root = cJSON_CreateObject();
888 
889     if (cfg->dtype == LCBVB_DIST_VBUCKET) {
890         tmp = cJSON_CreateString("vbucket");
891     } else {
892         tmp = cJSON_CreateString("ketama");
893     }
894     cJSON_AddItemToObject(root, "nodeLocator", tmp);
895 
896     if (cfg->revid > -1) {
897         tmp = cJSON_CreateNumber(cfg->revid);
898         cJSON_AddItemToObject(root, "rev", tmp);
899     }
900     tmp = cJSON_CreateString(cfg->bname);
901     cJSON_AddItemToObject(root, "name", tmp);
902 
903     nodes = cJSON_CreateArray();
904     cJSON_AddItemToObject(root, "nodesExt", nodes);
905 
906     for (ii = 0; ii < cfg->nsrv; ii++) {
907         cJSON *sj = cJSON_CreateObject(), *jsvc = cJSON_CreateObject();
908         lcbvb_SERVER *srv = cfg->servers + ii;
909 
910         tmp = cJSON_CreateString(srv->hostname);
911         cJSON_AddItemToObject(sj, "hostname", tmp);
912         svcs_to_json(&srv->svc, jsvc, 0);
913         svcs_to_json(&srv->svc_ssl, jsvc, 1);
914 
915         /* add the services to the server */
916         cJSON_AddItemToObject(sj, "services", jsvc);
917         cJSON_AddItemToArray(nodes, sj);
918     }
919 
920     /* Now either add the vbucket or ketama stuff */
921     if (cfg->dtype == LCBVB_DIST_VBUCKET) {
922         cJSON *vbroot = cJSON_CreateObject();
923         cJSON *vbmap = cJSON_CreateArray();
924 
925         tmp = cJSON_CreateNumber(cfg->nrepl);
926         cJSON_AddItemToObject(vbroot, "numReplicas", tmp);
927 
928         for (ii = 0; ii < cfg->nvb; ii++) {
929             cJSON *curvb = cJSON_CreateIntArray(
930                 cfg->vbuckets[ii].servers, cfg->nrepl+1);
931             cJSON_AddItemToArray(vbmap, curvb);
932         }
933 
934         cJSON_AddItemToObject(vbroot, "vBucketMap", vbmap);
935         cJSON_AddItemToObject(root, "vBucketServerMap", vbroot);
936     }
937     if (cfg->caps != 0) {
938         cJSON *jcaps = cJSON_CreateArray();
939         if (cfg->caps & LCBVB_CAP_XATTR) {
940             cJSON_AddItemToArray(jcaps, cJSON_CreateString("xattr"));
941         }
942         if (cfg->caps & LCBVB_CAP_DCP) {
943             cJSON_AddItemToArray(jcaps, cJSON_CreateString("dcp"));
944         }
945         if (cfg->caps & LCBVB_CAP_CBHELLO) {
946             cJSON_AddItemToArray(jcaps, cJSON_CreateString("cbhello"));
947         }
948         if (cfg->caps & LCBVB_CAP_TOUCH) {
949             cJSON_AddItemToArray(jcaps, cJSON_CreateString("touch"));
950         }
951         if (cfg->caps & LCBVB_CAP_COUCHAPI) {
952             cJSON_AddItemToArray(jcaps, cJSON_CreateString("couchapi"));
953         }
954         if (cfg->caps & LCBVB_CAP_CCCP) {
955             cJSON_AddItemToArray(jcaps, cJSON_CreateString("cccp"));
956         }
957         if (cfg->caps & LCBVB_CAP_XDCR_CHECKPOINTING) {
958             cJSON_AddItemToArray(jcaps, cJSON_CreateString("xdcrCheckpointing"));
959         }
960         if (cfg->caps & LCBVB_CAP_NODES_EXT) {
961             cJSON_AddItemToArray(jcaps, cJSON_CreateString("nodesExt"));
962         }
963         cJSON_AddItemToObject(root, "bucketCapabilities", jcaps);
964     }
965 
966     ret = cJSON_PrintUnformatted(root);
967     cJSON_Delete(root);
968     return ret;
969 }
970 
971 /******************************************************************************
972  ******************************************************************************
973  ** Mapping Routines                                                         **
974  ******************************************************************************
975  ******************************************************************************/
976 
977 static int
978 map_ketama(lcbvb_CONFIG *cfg, const void *key, size_t nkey)
979 {
980     uint32_t digest, mid, prev;
981     lcbvb_CONTINUUM *beginp, *endp, *midp, *highp, *lowp;
982     lcb_assert(cfg->continuum);
983     digest = vb__hash_ketama(key, nkey);
984     beginp = lowp = cfg->continuum;
985     endp = highp = cfg->continuum + cfg->ncontinuum;
986 
987     /* divide and conquer array search to find server with next biggest
988      * point after what this key hashes to */
989     while (1)
990     {
991         /* pick the middle point */
992         midp = lowp + (highp - lowp) / 2;
993 
994         if (midp == endp) {
995             /* if at the end, roll back to zeroth */
996             return beginp->index;
997             break;
998         }
999 
1000         mid = midp->point;
1001         prev = (midp == beginp) ? 0 : (midp-1)->point;
1002 
1003         if (digest <= mid && digest > prev) {
1004             /* we found nearest server */
1005             return midp->index;
1006             break;
1007         }
1008 
1009         /* adjust the limits */
1010         if (mid < digest) {
1011             lowp = midp + 1;
1012         } else {
1013             highp = midp - 1;
1014         }
1015 
1016         if (lowp > highp) {
1017             return beginp->index;
1018             break;
1019         }
1020     }
1021     return -1;
1022 }
1023 
1024 int
1025 lcbvb_k2vb(lcbvb_CONFIG *cfg, const void *k, lcb_SIZE n)
1026 {
1027     uint32_t digest = hash_crc32(k, n);
1028     return digest % cfg->nvb;
1029 }
1030 
1031 int
1032 lcbvb_vbmaster(lcbvb_CONFIG *cfg, int vbid)
1033 {
1034     return cfg->vbuckets[vbid].servers[0];
1035 }
1036 
1037 int
1038 lcbvb_vbreplica(lcbvb_CONFIG *cfg, int vbid, unsigned ix)
1039 {
1040     if (ix < cfg->nrepl) {
1041         return cfg->vbuckets[vbid].servers[ix+1];
1042     } else {
1043         return -1;
1044     }
1045 }
1046 
1047 /*
1048  * (https://www.couchbase.com/issues/browse/MB-12268?focusedCommentId=101894&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-101894)
1049  *
1050  * So (from my possibly partially ignorant view of that matter) I'd do the following:
1051  *
 * 1) Send first request according to latest vbucket map you have.
1053  *    If it works, we're good. Exit.
1054  *
1055  * 2) if that fails, look if you've newer vbucket map. If there's newer vbucket map
1056  *    and it points to _different_ node, send request to that node and proceed
1057  *    to step 3. Otherwise go to step 4
1058  *
1059  * 3) if newer node still gives you not-my-vbucket, go to step 4
1060  *
1061  * 4) if there's fast forward map in latest bucket info and fast forward map
1062  *    points to different node, send request to that node. And go to step 5.
1063  *    Otherwise (not ff map or it points to one of nodes you've tried already),
1064  *    go to step 6
1065  *
1066  * 5) if ff map node request succeeds. Exit. Otherwise go to step 6.
1067  *
1068  * 6) Try first replica unless it's one of nodes you've already tried.
1069  *    If it succeeds. Exit. Otherwise go to step 7.
1070  *
1071  * 7) Try all nodes in turn, prioritizing other replicas to beginning of list
1072  *    and nodes you have already tried to end. If one of nodes agrees to perform
1073  *    your request. Exit. Otherwise propagate error to back to app
1074  */
1075 int
1076 lcbvb_nmv_remap_ex(lcbvb_CONFIG *cfg, int vbid, int bad, int heuristic)
1077 {
1078     int cur = cfg->vbuckets[vbid].servers[0];
1079     int rv = cur;
1080     unsigned ii;
1081 
1082     if (bad != cur) {
1083         return cur;
1084     }
1085 
1086     /* if a forward table exists, then return the vbucket id from the forward table
1087      * and update that information in the current table. We also need to Update the
1088      * replica information for that vbucket */
1089 
1090     if (cfg->ffvbuckets &&
1091             (rv = cfg->ffvbuckets[vbid].servers[0]) != bad && rv > -1) {
1092         memcpy(&cfg->vbuckets[vbid], &cfg->ffvbuckets[vbid], sizeof (lcbvb_VBUCKET));
1093     }
1094 
1095     /* this path is usually only followed if fvbuckets is not present */
1096     if (heuristic && cur == bad) {
1097         int validrv = -1;
1098         for (ii = 0; ii < cfg->ndatasrv; ii++) {
1099             rv = (rv + 1) % cfg->ndatasrv;
1100             /* check that the new index has assigned vbuckets (master or replica) */
1101             if (cfg->servers[rv].nvbs) {
1102                 validrv = rv;
1103                 cfg->vbuckets[vbid].servers[0] = rv;
1104                 break;
1105             }
1106         }
1107 
1108         if (validrv == -1) {
1109             /* this should happen when there is only one valid node remaining
1110              * in the cluster, and we've removed serveral other nodes that are
1111              * still present in the map due to the grace period window.*/
1112             return -1;
1113         }
1114     }
1115 
1116     if (rv == bad) {
1117         return -1;
1118     }
1119 
1120     return rv;
1121 }
1122 
1123 
1124 int
1125 lcbvb_map_key(lcbvb_CONFIG *cfg, const void *key, lcb_SIZE nkey,
1126     int *vbid, int *srvix)
1127 {
1128     if (cfg->dtype == LCBVB_DIST_KETAMA) {
1129         *srvix = map_ketama(cfg, key, nkey);
1130         *vbid = 0;
1131         return 0;
1132     } else {
1133         *vbid = lcbvb_k2vb(cfg, key, nkey);
1134         *srvix = lcbvb_vbmaster(cfg, *vbid);
1135     }
1136     return 0;
1137 }
1138 
1139 int
1140 lcbvb_has_vbucket(lcbvb_CONFIG *vbc, int vbid, int ix)
1141 {
1142     unsigned ii;
1143     lcbvb_VBUCKET *vb = & vbc->vbuckets[vbid];
1144     for (ii = 0; ii < vbc->nrepl+1; ii++) {
1145         if (vb->servers[ii] == ix) {
1146             return 1;
1147         }
1148     }
1149     return 0;
1150 }
1151 
1152 /******************************************************************************
1153  ******************************************************************************
1154  ** Configuration Comparisons/Diffs                                          **
1155  ******************************************************************************
1156  ******************************************************************************/
1157 static void
1158 compute_vb_list_diff(lcbvb_CONFIG *from, lcbvb_CONFIG *to, char **out)
1159 {
1160     int offset = 0;
1161     unsigned ii, jj;
1162 
1163     for (ii = 0; ii < to->nsrv; ii++) {
1164         int found = 0;
1165         lcbvb_SERVER *newsrv = to->servers + ii;
1166         for (jj = 0; !found && jj < from->nsrv; jj++) {
1167             lcbvb_SERVER *oldsrv = from->servers + jj;
1168             found |= (strcmp(newsrv->authority, oldsrv->authority) == 0);
1169         }
1170         if (!found) {
1171             char *infostr = malloc(strlen(newsrv->authority) + 128);
1172             lcb_assert(infostr);
1173             sprintf(infostr, "%s(Data=%d, Index=%d, Query=%d)", newsrv->authority, newsrv->svc.data, newsrv->svc.ixquery, newsrv->svc.n1ql);
1174             out[offset] = infostr;
1175             ++offset;
1176         }
1177     }
1178 }
1179 
1180 lcbvb_CONFIGDIFF *
1181 lcbvb_compare(lcbvb_CONFIG *from, lcbvb_CONFIG *to)
1182 {
1183     int nservers;
1184     lcbvb_CONFIGDIFF *ret;
1185     unsigned ii;
1186 
1187     ret = calloc(1, sizeof(*ret));
1188     nservers = (from->nsrv > to->nsrv ? from->nsrv : to->nsrv) + 1;
1189     ret->servers_added = calloc(nservers, sizeof(*ret->servers_added));
1190     ret->servers_removed = calloc(nservers, sizeof(*ret->servers_removed));
1191     compute_vb_list_diff(from, to, ret->servers_added);
1192     compute_vb_list_diff(to, from, ret->servers_removed);
1193 
1194     if (to->nsrv == from->nsrv) {
1195         for (ii = 0; ii < from->nsrv; ii++) {
1196             const char *sa, *sb;
1197             sa = from->servers[ii].authority;
1198             sb = to->servers[ii].authority;
1199             ret->sequence_changed |= (0 != strcmp(sa, sb));
1200         }
1201     } else {
1202         ret->sequence_changed = 1;
1203     }
1204 
1205     if (from->nvb == to->nvb) {
1206         for (ii = 0; ii < from->nvb; ii++) {
1207             lcbvb_VBUCKET *vba = from->vbuckets + ii, *vbb = to->vbuckets + ii;
1208             if (vba->servers[0] != vbb->servers[0]) {
1209                 ret->n_vb_changes++;
1210             }
1211         }
1212     } else {
1213         ret->n_vb_changes = -1;
1214     }
1215     return ret;
1216 }
1217 
/**
 * Free a NULL-terminated array of heap strings: every element up to (not
 * including) the first NULL entry, followed by the array itself.
 *
 * @param l the array; must be non-NULL and NULL-terminated
 */
static void
free_array_helper(char **l)
{
    char **cursor = l;

    while (*cursor != NULL) {
        free(*cursor);
        cursor++;
    }
    free(l);
}
1227 
1228 void
1229 lcbvb_free_diff(lcbvb_CONFIGDIFF *diff) {
1230     lcb_assert(diff);
1231     free_array_helper(diff->servers_added);
1232     free_array_helper(diff->servers_removed);
1233     free(diff);
1234 }
1235 
1236 
1237 lcbvb_CHANGETYPE
1238 lcbvb_get_changetype(lcbvb_CONFIGDIFF *diff)
1239 {
1240     lcbvb_CHANGETYPE ret = 0;
1241     if (diff->n_vb_changes) {
1242         ret |= LCBVB_MAP_MODIFIED;
1243     }
1244     if (*diff->servers_added || *diff->servers_removed || diff->sequence_changed) {
1245         ret |= LCBVB_SERVERS_MODIFIED;
1246     }
1247     return ret;
1248 }
1249 
1250 /******************************************************************************
1251  ******************************************************************************
1252  ** String/Port Getters                                                      **
1253  ******************************************************************************
1254  ******************************************************************************/
1255 
1256 static const lcbvb_SERVICES *
1257 get_svc(const lcbvb_SERVER *srv, lcbvb_SVCMODE mode)
1258 {
1259     if (srv->alt_hostname) {
1260         if (mode == LCBVB_SVCMODE_PLAIN) {
1261             return &srv->alt_svc;
1262         } else {
1263             return &srv->alt_svc_ssl;
1264         }
1265     } else {
1266         if (mode == LCBVB_SVCMODE_PLAIN) {
1267             return &srv->svc;
1268         } else {
1269             return &srv->svc_ssl;
1270         }
1271     }
1272 }
1273 
1274 static const char *
1275 get_hostname(const lcbvb_SERVER *srv)
1276 {
1277     if (srv->alt_hostname) {
1278         return srv->alt_hostname;
1279     } else {
1280         return srv->hostname;
1281     }
1282 }
1283 
1284 LIBCOUCHBASE_API
1285 unsigned
1286 lcbvb_get_port(lcbvb_CONFIG *cfg,
1287     unsigned ix, lcbvb_SVCTYPE type, lcbvb_SVCMODE mode)
1288 {
1289     const lcbvb_SERVICES *svc;
1290     lcbvb_SERVER *srv;
1291     if (type >= LCBVB_SVCTYPE__MAX || mode >= LCBVB_SVCMODE__MAX) {
1292         return 0;
1293     }
1294     if (ix >= cfg->nsrv) {
1295         return 0;
1296     }
1297 
1298     srv = cfg->servers + ix;
1299     svc = get_svc(srv, mode);
1300 
1301     if (type == LCBVB_SVCTYPE_DATA) {
1302         return svc->data;
1303     } else if (type == LCBVB_SVCTYPE_MGMT) {
1304         return svc->mgmt;
1305     } else if (type == LCBVB_SVCTYPE_VIEWS) {
1306         return svc->views;
1307     } else if (type == LCBVB_SVCTYPE_IXADMIN) {
1308         return svc->ixadmin;
1309     } else if (type == LCBVB_SVCTYPE_IXQUERY) {
1310         return svc->ixquery;
1311     } else if (type == LCBVB_SVCTYPE_N1QL) {
1312         return svc->n1ql;
1313     } else if (type == LCBVB_SVCTYPE_FTS) {
1314         return svc->fts;
1315     } else if (type == LCBVB_SVCTYPE_CBAS) {
1316         return svc->cbas;
1317     } else {
1318         return 0;
1319     }
1320 }
1321 
1322 LIBCOUCHBASE_API
1323 const char *
1324 lcbvb_get_hostport(lcbvb_CONFIG *cfg,
1325     unsigned ix, lcbvb_SVCTYPE type, lcbvb_SVCMODE mode)
1326 {
1327     char **strp;
1328     lcbvb_SERVER *srv;
1329     lcbvb_SERVICES *svc;
1330     unsigned port = lcbvb_get_port(cfg, ix, type, mode);
1331 
1332     if (!port) {
1333         return NULL;
1334     }
1335 
1336     srv = cfg->servers + ix;
1337     svc = (lcbvb_SERVICES *)get_svc(srv, mode);
1338 
1339     strp = &svc->hoststrs[type];
1340     if (*strp == NULL) {
1341         const char *hostname = get_hostname(srv);
1342         size_t strn = strlen(hostname) + 20;
1343         *strp = calloc(strn, sizeof(char));
1344         copy_address(*strp, strn, hostname, port);
1345     }
1346     return *strp;
1347 }
1348 
1349 LIBCOUCHBASE_API
1350 const char *
1351 lcbvb_get_hostname(const lcbvb_CONFIG *cfg, unsigned ix)
1352 {
1353     if (cfg->nsrv > ix) {
1354         return get_hostname(cfg->servers + ix);
1355     } else {
1356         return NULL;
1357     }
1358 }
1359 
1360 LIBCOUCHBASE_API
1361 int
1362 lcbvb_get_randhost_ex(const lcbvb_CONFIG *cfg,
1363     lcbvb_SVCTYPE type, lcbvb_SVCMODE mode, int *used)
1364 {
1365     size_t nn, oix = 0;
1366 
1367     /*
1368      * Since not all nodes support all service types, we need to make it a
1369      * fair selection by pre-filtering the nodes which actually support the
1370      * service, and then proceed to actually select a suitable node.
1371      */
1372     for (nn = 0; nn < cfg->nsrv; nn++) {
1373         const lcbvb_SERVER *server = cfg->servers + nn;
1374         const lcbvb_SERVICES *svcs = get_svc(server, mode);
1375         int has_svc = 0;
1376 
1377         // Check if this node is in the exclude list
1378         if (used && used[nn]) {
1379             continue;
1380         }
1381 
1382         has_svc =
1383                 (type == LCBVB_SVCTYPE_DATA && svcs->data) ||
1384                 (type == LCBVB_SVCTYPE_IXADMIN && svcs->ixadmin) ||
1385                 (type == LCBVB_SVCTYPE_IXQUERY && svcs->ixquery) ||
1386                 (type == LCBVB_SVCTYPE_MGMT && svcs->mgmt) ||
1387                 (type == LCBVB_SVCTYPE_N1QL && svcs->n1ql) ||
1388                 (type == LCBVB_SVCTYPE_FTS && svcs->fts) ||
1389                 (type == LCBVB_SVCTYPE_VIEWS && svcs->views) ||
1390                 (type == LCBVB_SVCTYPE_CBAS && svcs->cbas);
1391 
1392         if (has_svc) {
1393             cfg->randbuf[oix++] = (int)nn;
1394         }
1395     }
1396 
1397     if (!oix) {
1398         /* nothing supports it! */
1399         return -1;
1400     }
1401 
1402     nn = rand();
1403     nn %= oix;
1404     return cfg->randbuf[nn];
1405 }
1406 
1407 LIBCOUCHBASE_API
1408 int
1409 lcbvb_get_randhost(const lcbvb_CONFIG *cfg,
1410     lcbvb_SVCTYPE type, lcbvb_SVCMODE mode)
1411 {
1412     return lcbvb_get_randhost_ex(cfg, type, mode, NULL);
1413 }
1414 
1415 LIBCOUCHBASE_API
1416 const char *
1417 lcbvb_get_resturl(lcbvb_CONFIG *cfg, unsigned ix,
1418     lcbvb_SVCTYPE svc, lcbvb_SVCMODE mode)
1419 {
1420     char **strp;
1421     const char *prefix;
1422     const char *path;
1423 
1424     lcbvb_SERVER *srv;
1425     lcbvb_SERVICES *svcs;
1426     unsigned port;
1427     port = lcbvb_get_port(cfg, ix, svc, mode);
1428     if (!port) {
1429         return NULL;
1430     }
1431 
1432     srv = cfg->servers + ix;
1433     if (mode == LCBVB_SVCMODE_PLAIN) {
1434         prefix = "http";
1435     } else {
1436         prefix = "https";
1437     }
1438     svcs = (lcbvb_SERVICES *)get_svc(srv, mode);
1439 
1440     if (svc == LCBVB_SVCTYPE_VIEWS) {
1441         path = srv->viewpath;
1442         strp = &svcs->views_base_;
1443     } else if (svc == LCBVB_SVCTYPE_N1QL) {
1444         path = srv->querypath;
1445         strp = &svcs->query_base_;
1446     } else if (svc == LCBVB_SVCTYPE_FTS) {
1447         path = srv->ftspath;
1448         strp = &svcs->fts_base_;
1449     } else if (svc == LCBVB_SVCTYPE_CBAS) {
1450         path = srv->cbaspath;
1451         strp = &svcs->cbas_base_;
1452     } else {
1453         /* Unknown service! */
1454         return NULL;
1455     }
1456 
1457     if (path == NULL) {
1458         return NULL;
1459     } else if (!*strp) {
1460         char buf[4096];
1461         const char *hostname = get_hostname(srv);
1462         if (strchr(hostname, ':')) {
1463             // IPv6 and should be bracketed
1464             snprintf(buf, sizeof(buf), "%s://[%s]:%d%s", prefix, hostname, port, path);
1465         } else {
1466             snprintf(buf, sizeof(buf), "%s://%s:%d%s", prefix, hostname, port, path);
1467         }
1468         *strp = strdup(buf);
1469     }
1470 
1471     return *strp;
1472 }
1473 
1474 LIBCOUCHBASE_API
1475 const char *
1476 lcbvb_get_capibase(lcbvb_CONFIG *cfg, unsigned ix, lcbvb_SVCMODE mode)
1477 {
1478     return lcbvb_get_resturl(cfg, ix, LCBVB_SVCTYPE_VIEWS, mode);
1479 }
1480 
1481 LIBCOUCHBASE_API int lcbvb_get_revision(const lcbvb_CONFIG *cfg) {
1482     return cfg->revid;
1483 }
1484 LIBCOUCHBASE_API unsigned lcbvb_get_nservers(const lcbvb_CONFIG *cfg) {
1485     return cfg->nsrv;
1486 }
1487 LIBCOUCHBASE_API unsigned lcbvb_get_nreplicas(const lcbvb_CONFIG *cfg) {
1488     return cfg->nrepl;
1489 }
1490 LIBCOUCHBASE_API unsigned lcbvb_get_nvbuckets(const lcbvb_CONFIG *cfg) {
1491     return cfg->nvb;
1492 }
1493 LIBCOUCHBASE_API lcbvb_DISTMODE lcbvb_get_distmode(const lcbvb_CONFIG *cfg) {
1494     return cfg->dtype;
1495 }
1496 LIBCOUCHBASE_API const char *lcbvb_get_error(const lcbvb_CONFIG *cfg) {
1497     return cfg->errstr;
1498 }
1499 /******************************************************************************
1500  ******************************************************************************
1501  ** Generation Functions                                                     **
1502  ******************************************************************************
1503  ******************************************************************************/
1504 
1505 static void copy_service(const char *hostname, const lcbvb_SERVICES *src, lcbvb_SERVICES *dst)
1506 {
1507     *dst = *src;
1508     memset(&dst->hoststrs, 0, sizeof dst->hoststrs);
1509     if (src->views_base_) {
1510         dst->views_base_ = strdup(src->views_base_);
1511     }
1512     if (src->query_base_) {
1513         dst->query_base_ = strdup(src->query_base_);
1514     }
1515     if (src->fts_base_) {
1516         dst->fts_base_ = strdup(src->fts_base_);
1517     }
1518     if (src->cbas_base_) {
1519         dst->cbas_base_ = strdup(src->cbas_base_);
1520     }
1521     if (dst->data) {
1522         char buf[4096];
1523         copy_address(buf, sizeof(buf), hostname, dst->data);
1524         dst->hoststrs[LCBVB_SVCTYPE_DATA] = strdup(buf);
1525     }
1526 }
1527 
1528 LIBCOUCHBASE_API
1529 int
1530 lcbvb_genconfig_ex(lcbvb_CONFIG *vb,
1531     const char *name, const char *uuid,
1532     const lcbvb_SERVER *servers,
1533     unsigned nservers, unsigned nreplica, unsigned nvbuckets)
1534 {
1535     unsigned ii, jj;
1536     int srvix = 0, in_nondata = 0;
1537 
1538     lcb_assert(nservers);
1539 
1540     if (!name) {
1541         name = "default";
1542     }
1543 
1544     memset(vb, 0, sizeof(*vb));
1545     vb->dtype = LCBVB_DIST_VBUCKET;
1546     vb->nvb = nvbuckets;
1547     vb->nrepl = nreplica;
1548     vb->nsrv = nservers;
1549     vb->bname = strdup(name);
1550     (void)uuid;
1551 
1552     if (nreplica >= nservers) {
1553         vb->errstr = "nservers must be > nreplicas";
1554         return -1;
1555     }
1556 
1557     if (nreplica > 4) {
1558         vb->errstr = "Replicas must be <= 4";
1559         return -1;
1560     }
1561 
1562     /* Count the number of data servers.. */
1563     for (ii = 0; ii < nservers; ii++) {
1564         const lcbvb_SERVER *server = servers + ii;
1565         if (server->svc.data) {
1566             if (in_nondata) {
1567                 vb->errstr = "All data servers must be specified before non-data servers";
1568                 return -1;
1569             }
1570             vb->ndatasrv++;
1571         } else {
1572             in_nondata = 1;
1573         }
1574     }
1575 
1576     if (vb->nvb) {
1577         vb->vbuckets = malloc(vb->nvb * sizeof(*vb->vbuckets));
1578         if (!vb->vbuckets) {
1579             vb->errstr = "Couldn't allocate vbucket array";
1580             return -1;
1581         }
1582     }
1583 
1584     for (ii = 0; ii < vb->nvb; ii++) {
1585         lcbvb_VBUCKET *cur = vb->vbuckets + ii;
1586         cur->servers[0] = srvix;
1587         for (jj = 1; jj < vb->nrepl+1; jj++) {
1588             cur->servers[jj] = (srvix + jj) % vb->ndatasrv;
1589         }
1590         srvix = (srvix + 1) % vb->ndatasrv;
1591     }
1592 
1593     vb->servers = calloc(vb->nsrv, sizeof(*vb->servers));
1594     vb->randbuf = calloc(vb->nsrv, sizeof(*vb->randbuf));
1595 
1596     for (ii = 0; ii < vb->nsrv; ii++) {
1597         lcbvb_SERVER *dst = vb->servers + ii;
1598         const lcbvb_SERVER *src = servers + ii;
1599 
1600         *dst = *src;
1601         dst->hostname = strdup(src->hostname);
1602         if (src->viewpath) {
1603             dst->viewpath = strdup(src->viewpath);
1604         }
1605         if (src->querypath) {
1606             dst->querypath = strdup(src->querypath);
1607         }
1608         if (src->ftspath) {
1609             dst->ftspath = strdup(src->ftspath);
1610         }
1611         if (src->cbaspath) {
1612             dst->cbaspath = strdup(src->cbaspath);
1613         }
1614 
1615         copy_service(src->hostname, &src->svc, &dst->svc);
1616         copy_service(src->hostname, &src->svc_ssl, &dst->svc_ssl);
1617         {
1618             char tmpbuf[MAX_AUTHORITY_SIZE] = {0};
1619             copy_address(tmpbuf, sizeof(tmpbuf), dst->hostname, dst->svc.data);
1620             dst->authority = strdup(tmpbuf);
1621         }
1622     }
1623 
1624     for (ii = 0; ii < vb->nvb; ii++) {
1625         for (jj = 0; jj < vb->nrepl+1; jj++) {
1626             int ix = vb->vbuckets[ii].servers[jj];
1627             if (ix >= 0) {
1628                 vb->servers[ix].nvbs++;
1629             }
1630         }
1631     }
1632     return 0;
1633 }
1634 
1635 int
1636 lcbvb_genconfig(lcbvb_CONFIG *vb,
1637     unsigned nservers, unsigned nreplica, unsigned nvbuckets)
1638 {
1639     unsigned ii;
1640     int rv;
1641     lcbvb_SERVER *srvarry;
1642 
1643     srvarry = calloc(nservers, sizeof(*srvarry));
1644     for (ii = 0; ii < nservers; ii++) {
1645         srvarry[ii].svc.data = 1000 + ii;
1646         srvarry[ii].svc.views = 2000 + ii;
1647         srvarry[ii].svc.mgmt = 3000 + ii;
1648         srvarry[ii].hostname = "localhost";
1649         srvarry[ii].svc.views_base_ = "/default";
1650     }
1651     rv = lcbvb_genconfig_ex(vb,
1652         "default", NULL, srvarry, nservers, nreplica, nvbuckets);
1653     free(srvarry);
1654     return rv;
1655 }
1656 
1657 void
1658 lcbvb_genffmap(lcbvb_CONFIG *cfg)
1659 {
1660     size_t ii;
1661     lcb_assert(cfg->nrepl);
1662     if (cfg->ffvbuckets) {
1663         free(cfg->ffvbuckets);
1664     }
1665     cfg->ffvbuckets = calloc(cfg->nvb, sizeof *cfg->ffvbuckets);
1666     for (ii = 0; ii < cfg->nvb; ++ii) {
1667         size_t jj;
1668         lcbvb_VBUCKET *vb = cfg->ffvbuckets + ii;
1669         memcpy(vb, cfg->vbuckets + ii, sizeof *vb);
1670         for (jj = 0; jj < cfg->ndatasrv; ++jj) {
1671             vb->servers[jj] = (vb->servers[jj] + 1) % cfg->ndatasrv;
1672         }
1673     }
1674 }
1675 
1676 void
1677 lcbvb_make_ketama(lcbvb_CONFIG *vb)
1678 {
1679     if (vb->dtype == LCBVB_DIST_KETAMA) {
1680         return;
1681     }
1682     vb->dtype = LCBVB_DIST_KETAMA;
1683     vb->nrepl = 0;
1684     vb->nvb = 0;
1685     update_ketama(vb);
1686 }
1687 
1688 
1689 /******************************************************************************
1690  ******************************************************************************
1691  ** Compatibility APIs                                                       **
1692  ******************************************************************************
1693  ******************************************************************************/
1694 LIBCOUCHBASE_API lcbvb_CONFIG* vbucket_config_create(void) {
1695     return lcbvb_create();
1696 }
1697 LIBCOUCHBASE_API void vbucket_config_destroy(lcbvb_CONFIG*h) {
1698     lcbvb_destroy(h);
1699 }
1700 LIBCOUCHBASE_API int vbucket_config_parse(lcbvb_CONFIG*h, vbucket_source_t src, const char *s) {
1701     (void)src; return lcbvb_load_json(h, s);
1702 }
1703 LIBCOUCHBASE_API const char * vbucket_get_error_message(lcbvb_CONFIG*h) {
1704     return h->errstr;
1705 }
1706 LIBCOUCHBASE_API int vbucket_config_get_num_servers(lcbvb_CONFIG *cfg) {
1707     return cfg->nsrv;
1708 }
1709 LIBCOUCHBASE_API int vbucket_config_get_num_replicas(lcbvb_CONFIG *cfg) {
1710     return cfg->nrepl;
1711 }
1712 LIBCOUCHBASE_API int vbucket_config_get_num_vbuckets(lcbvb_CONFIG *cfg) {
1713     return cfg->nvb;
1714 }
1715 LIBCOUCHBASE_API const char *vbucket_config_get_server(lcbvb_CONFIG *cfg, int ix) {
1716     return lcbvb_get_hostport(cfg, ix, LCBVB_SVCTYPE_DATA, LCBVB_SVCMODE_PLAIN);
1717 }
1718 LIBCOUCHBASE_API const char *vbucket_config_get_rest_api_server(lcbvb_CONFIG *cfg, int ix) {
1719     return lcbvb_get_hostport(cfg, ix, LCBVB_SVCTYPE_MGMT, LCBVB_SVCMODE_PLAIN);
1720 }
1721 LIBCOUCHBASE_API const char *vbucket_config_get_couch_api_base(lcbvb_CONFIG *cfg, int ix) {
1722     return lcbvb_get_capibase(cfg, ix, LCBVB_SVCMODE_PLAIN);
1723 }
1724 LIBCOUCHBASE_API lcbvb_DISTMODE vbucket_config_get_distribution_type(lcbvb_CONFIG *cfg) {
1725     return cfg->dtype;
1726 }
1727 LIBCOUCHBASE_API int vbucket_map(lcbvb_CONFIG *cfg, const void *k, lcb_SIZE nk, int *pvb, int *pix) {
1728     return lcbvb_map_key(cfg, k, nk, pvb, pix);
1729 }
1730 LIBCOUCHBASE_API int vbucket_get_vbucket_by_key(lcbvb_CONFIG *cfg, const void *k, lcb_SIZE nk) {
1731     return lcbvb_k2vb(cfg, k, nk);
1732 }
1733 LIBCOUCHBASE_API int vbucket_get_master(lcbvb_CONFIG *cfg, int vb) {
1734     return lcbvb_vbmaster(cfg, vb);
1735 }
1736 LIBCOUCHBASE_API int vbucket_get_replica(lcbvb_CONFIG *cfg, int vb, int repl) {
1737     return lcbvb_vbreplica(cfg, vb, repl);
1738 }
1739 LIBCOUCHBASE_API lcbvb_CONFIGDIFF *vbucket_compare(lcbvb_CONFIG*a,lcbvb_CONFIG*b) {
1740     return lcbvb_compare(a,b);
1741 }
1742 LIBCOUCHBASE_API void vbucket_free_diff(lcbvb_CONFIGDIFF *p) {
1743     lcbvb_free_diff(p);
1744 }
1745 LIBCOUCHBASE_API int vbucket_config_get_revision(lcbvb_CONFIG *p) {
1746     return lcbvb_get_revision(p);
1747 }
1748 LIBCOUCHBASE_API lcbvb_CHANGETYPE vbucket_what_changed(lcbvb_CONFIGDIFF *diff) {
1749     return lcbvb_get_changetype(diff);
1750 }
1751 LIBCOUCHBASE_API int vbucket_config_generate(lcbvb_CONFIG*cfg, unsigned nsrv, unsigned nrepl, unsigned nvb) {
1752     return lcbvb_genconfig(cfg,nsrv,nrepl,nvb);
1753 }
1754