1 /*-------------------------------------------------------------------------
2 *
3 * spgtextproc.c
4 * implementation of radix tree (compressed trie) over text
5 *
6 * In a text_ops SPGiST index, inner tuples can have a prefix which is the
7 * common prefix of all strings indexed under that tuple. The node labels
8 * represent the next byte of the string(s) after the prefix. Assuming we
9 * always use the longest possible prefix, we will get more than one node
10 * label unless the prefix length is restricted by SPGIST_MAX_PREFIX_LENGTH.
11 *
12 * To reconstruct the indexed string for any index entry, concatenate the
13 * inner-tuple prefixes and node labels starting at the root and working
14 * down to the leaf entry, then append the datum in the leaf entry.
15 * (While descending the tree, "level" is the number of bytes reconstructed
16 * so far.)
17 *
18 * However, there are two special cases for node labels: -1 indicates that
19 * there are no more bytes after the prefix-so-far, and -2 indicates that we
20 * had to split an existing allTheSame tuple (in such a case we have to create
21 * a node label that doesn't correspond to any string byte). In either case,
22 * the node label does not contribute anything to the reconstructed string.
23 *
24 * Previously, we used a node label of zero for both special cases, but
25 * this was problematic because one can't tell whether a string ending at
26 * the current level can be pushed down into such a child node. For
27 * backwards compatibility, we still support such node labels for reading;
28 * but no new entries will ever be pushed down into a zero-labeled child.
29 * No new entries ever get pushed into a -2-labeled child, either.
30 *
31 *
32 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
33 * Portions Copyright (c) 1994, Regents of the University of California
34 *
35 * IDENTIFICATION
36 * src/backend/access/spgist/spgtextproc.c
37 *
38 *-------------------------------------------------------------------------
39 */
40 #include "postgres.h"
41
42 #include "access/spgist.h"
43 #include "catalog/pg_type.h"
44 #include "mb/pg_wchar.h"
45 #include "utils/builtins.h"
46 #include "utils/datum.h"
47 #include "utils/pg_locale.h"
48
49
50 /*
51 * In the worst case, an inner tuple in a text radix tree could have as many
52 * as 258 nodes (one for each possible byte value, plus the two special
53 * cases). Each node can take 16 bytes on MAXALIGN=8 machines. The inner
54 * tuple must fit on an index page of size BLCKSZ. Rather than assuming we
55 * know the exact amount of overhead imposed by page headers, tuple headers,
56 * etc, we leave 100 bytes for that (the actual overhead should be no more
57 * than 56 bytes at this writing, so there is slop in this number).
58 * So we can safely create prefixes up to BLCKSZ - 258 * 16 - 100 bytes long.
59 * Unfortunately, because 258 * 16 is over 4K, there is no safe prefix length
60 * when BLCKSZ is less than 8K; it is always possible to get "SPGiST inner
61 * tuple size exceeds maximum" if there are too many distinct next-byte values
62 * at a given place in the tree. Since use of nonstandard block sizes appears
63 * to be negligible in the field, we just live with that fact for now,
64 * choosing a max prefix size of 32 bytes when BLCKSZ is configured smaller
65 * than default.
66 */
67 #define SPGIST_MAX_PREFIX_LENGTH Max((int) (BLCKSZ - 258 * 16 - 100), 32)
68
69 /* Struct for sorting values in picksplit */
70 typedef struct spgNodePtr
71 {
72 Datum d;
73 int i;
74 int16 c;
75 } spgNodePtr;
76
77
78 Datum
spg_text_config(PG_FUNCTION_ARGS)79 spg_text_config(PG_FUNCTION_ARGS)
80 {
81 /* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */
82 spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1);
83
84 cfg->prefixType = TEXTOID;
85 cfg->labelType = INT2OID;
86 cfg->canReturnData = true;
87 cfg->longValuesOK = true; /* suffixing will shorten long values */
88 PG_RETURN_VOID();
89 }
90
91 /*
92 * Form a text datum from the given not-necessarily-null-terminated string,
93 * using short varlena header format if possible
94 */
95 static Datum
formTextDatum(const char * data,int datalen)96 formTextDatum(const char *data, int datalen)
97 {
98 char *p;
99
100 p = (char *) palloc(datalen + VARHDRSZ);
101
102 if (datalen + VARHDRSZ_SHORT <= VARATT_SHORT_MAX)
103 {
104 SET_VARSIZE_SHORT(p, datalen + VARHDRSZ_SHORT);
105 if (datalen)
106 memcpy(p + VARHDRSZ_SHORT, data, datalen);
107 }
108 else
109 {
110 SET_VARSIZE(p, datalen + VARHDRSZ);
111 memcpy(p + VARHDRSZ, data, datalen);
112 }
113
114 return PointerGetDatum(p);
115 }
116
117 /*
118 * Find the length of the common prefix of a and b
119 */
120 static int
commonPrefix(const char * a,const char * b,int lena,int lenb)121 commonPrefix(const char *a, const char *b, int lena, int lenb)
122 {
123 int i = 0;
124
125 while (i < lena && i < lenb && *a == *b)
126 {
127 a++;
128 b++;
129 i++;
130 }
131
132 return i;
133 }
134
135 /*
136 * Binary search an array of int16 datums for a match to c
137 *
138 * On success, *i gets the match location; on failure, it gets where to insert
139 */
140 static bool
searchChar(Datum * nodeLabels,int nNodes,int16 c,int * i)141 searchChar(Datum *nodeLabels, int nNodes, int16 c, int *i)
142 {
143 int StopLow = 0,
144 StopHigh = nNodes;
145
146 while (StopLow < StopHigh)
147 {
148 int StopMiddle = (StopLow + StopHigh) >> 1;
149 int16 middle = DatumGetInt16(nodeLabels[StopMiddle]);
150
151 if (c < middle)
152 StopHigh = StopMiddle;
153 else if (c > middle)
154 StopLow = StopMiddle + 1;
155 else
156 {
157 *i = StopMiddle;
158 return true;
159 }
160 }
161
162 *i = StopHigh;
163 return false;
164 }
165
166 Datum
spg_text_choose(PG_FUNCTION_ARGS)167 spg_text_choose(PG_FUNCTION_ARGS)
168 {
169 spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0);
170 spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1);
171 text *inText = DatumGetTextPP(in->datum);
172 char *inStr = VARDATA_ANY(inText);
173 int inSize = VARSIZE_ANY_EXHDR(inText);
174 char *prefixStr = NULL;
175 int prefixSize = 0;
176 int commonLen = 0;
177 int16 nodeChar = 0;
178 int i = 0;
179
180 /* Check for prefix match, set nodeChar to first byte after prefix */
181 if (in->hasPrefix)
182 {
183 text *prefixText = DatumGetTextPP(in->prefixDatum);
184
185 prefixStr = VARDATA_ANY(prefixText);
186 prefixSize = VARSIZE_ANY_EXHDR(prefixText);
187
188 commonLen = commonPrefix(inStr + in->level,
189 prefixStr,
190 inSize - in->level,
191 prefixSize);
192
193 if (commonLen == prefixSize)
194 {
195 if (inSize - in->level > commonLen)
196 nodeChar = *(unsigned char *) (inStr + in->level + commonLen);
197 else
198 nodeChar = -1;
199 }
200 else
201 {
202 /* Must split tuple because incoming value doesn't match prefix */
203 out->resultType = spgSplitTuple;
204
205 if (commonLen == 0)
206 {
207 out->result.splitTuple.prefixHasPrefix = false;
208 }
209 else
210 {
211 out->result.splitTuple.prefixHasPrefix = true;
212 out->result.splitTuple.prefixPrefixDatum =
213 formTextDatum(prefixStr, commonLen);
214 }
215 out->result.splitTuple.nodeLabel =
216 Int16GetDatum(*(unsigned char *) (prefixStr + commonLen));
217
218 if (prefixSize - commonLen == 1)
219 {
220 out->result.splitTuple.postfixHasPrefix = false;
221 }
222 else
223 {
224 out->result.splitTuple.postfixHasPrefix = true;
225 out->result.splitTuple.postfixPrefixDatum =
226 formTextDatum(prefixStr + commonLen + 1,
227 prefixSize - commonLen - 1);
228 }
229
230 PG_RETURN_VOID();
231 }
232 }
233 else if (inSize > in->level)
234 {
235 nodeChar = *(unsigned char *) (inStr + in->level);
236 }
237 else
238 {
239 nodeChar = -1;
240 }
241
242 /* Look up nodeChar in the node label array */
243 if (searchChar(in->nodeLabels, in->nNodes, nodeChar, &i))
244 {
245 /*
246 * Descend to existing node. (If in->allTheSame, the core code will
247 * ignore our nodeN specification here, but that's OK. We still have
248 * to provide the correct levelAdd and restDatum values, and those are
249 * the same regardless of which node gets chosen by core.)
250 */
251 int levelAdd;
252
253 out->resultType = spgMatchNode;
254 out->result.matchNode.nodeN = i;
255 levelAdd = commonLen;
256 if (nodeChar >= 0)
257 levelAdd++;
258 out->result.matchNode.levelAdd = levelAdd;
259 if (inSize - in->level - levelAdd > 0)
260 out->result.matchNode.restDatum =
261 formTextDatum(inStr + in->level + levelAdd,
262 inSize - in->level - levelAdd);
263 else
264 out->result.matchNode.restDatum =
265 formTextDatum(NULL, 0);
266 }
267 else if (in->allTheSame)
268 {
269 /*
270 * Can't use AddNode action, so split the tuple. The upper tuple has
271 * the same prefix as before and uses a dummy node label -2 for the
272 * lower tuple. The lower tuple has no prefix and the same node
273 * labels as the original tuple.
274 *
275 * Note: it might seem tempting to shorten the upper tuple's prefix,
276 * if it has one, then use its last byte as label for the lower tuple.
277 * But that doesn't win since we know the incoming value matches the
278 * whole prefix: we'd just end up splitting the lower tuple again.
279 */
280 out->resultType = spgSplitTuple;
281 out->result.splitTuple.prefixHasPrefix = in->hasPrefix;
282 out->result.splitTuple.prefixPrefixDatum = in->prefixDatum;
283 out->result.splitTuple.nodeLabel = Int16GetDatum(-2);
284 out->result.splitTuple.postfixHasPrefix = false;
285 }
286 else
287 {
288 /* Add a node for the not-previously-seen nodeChar value */
289 out->resultType = spgAddNode;
290 out->result.addNode.nodeLabel = Int16GetDatum(nodeChar);
291 out->result.addNode.nodeN = i;
292 }
293
294 PG_RETURN_VOID();
295 }
296
297 /* qsort comparator to sort spgNodePtr structs by "c" */
298 static int
cmpNodePtr(const void * a,const void * b)299 cmpNodePtr(const void *a, const void *b)
300 {
301 const spgNodePtr *aa = (const spgNodePtr *) a;
302 const spgNodePtr *bb = (const spgNodePtr *) b;
303
304 return aa->c - bb->c;
305 }
306
307 Datum
spg_text_picksplit(PG_FUNCTION_ARGS)308 spg_text_picksplit(PG_FUNCTION_ARGS)
309 {
310 spgPickSplitIn *in = (spgPickSplitIn *) PG_GETARG_POINTER(0);
311 spgPickSplitOut *out = (spgPickSplitOut *) PG_GETARG_POINTER(1);
312 text *text0 = DatumGetTextPP(in->datums[0]);
313 int i,
314 commonLen;
315 spgNodePtr *nodes;
316
317 /* Identify longest common prefix, if any */
318 commonLen = VARSIZE_ANY_EXHDR(text0);
319 for (i = 1; i < in->nTuples && commonLen > 0; i++)
320 {
321 text *texti = DatumGetTextPP(in->datums[i]);
322 int tmp = commonPrefix(VARDATA_ANY(text0),
323 VARDATA_ANY(texti),
324 VARSIZE_ANY_EXHDR(text0),
325 VARSIZE_ANY_EXHDR(texti));
326
327 if (tmp < commonLen)
328 commonLen = tmp;
329 }
330
331 /*
332 * Limit the prefix length, if necessary, to ensure that the resulting
333 * inner tuple will fit on a page.
334 */
335 commonLen = Min(commonLen, SPGIST_MAX_PREFIX_LENGTH);
336
337 /* Set node prefix to be that string, if it's not empty */
338 if (commonLen == 0)
339 {
340 out->hasPrefix = false;
341 }
342 else
343 {
344 out->hasPrefix = true;
345 out->prefixDatum = formTextDatum(VARDATA_ANY(text0), commonLen);
346 }
347
348 /* Extract the node label (first non-common byte) from each value */
349 nodes = (spgNodePtr *) palloc(sizeof(spgNodePtr) * in->nTuples);
350
351 for (i = 0; i < in->nTuples; i++)
352 {
353 text *texti = DatumGetTextPP(in->datums[i]);
354
355 if (commonLen < VARSIZE_ANY_EXHDR(texti))
356 nodes[i].c = *(unsigned char *) (VARDATA_ANY(texti) + commonLen);
357 else
358 nodes[i].c = -1; /* use -1 if string is all common */
359 nodes[i].i = i;
360 nodes[i].d = in->datums[i];
361 }
362
363 /*
364 * Sort by label values so that we can group the values into nodes. This
365 * also ensures that the nodes are ordered by label value, allowing the
366 * use of binary search in searchChar.
367 */
368 qsort(nodes, in->nTuples, sizeof(*nodes), cmpNodePtr);
369
370 /* And emit results */
371 out->nNodes = 0;
372 out->nodeLabels = (Datum *) palloc(sizeof(Datum) * in->nTuples);
373 out->mapTuplesToNodes = (int *) palloc(sizeof(int) * in->nTuples);
374 out->leafTupleDatums = (Datum *) palloc(sizeof(Datum) * in->nTuples);
375
376 for (i = 0; i < in->nTuples; i++)
377 {
378 text *texti = DatumGetTextPP(nodes[i].d);
379 Datum leafD;
380
381 if (i == 0 || nodes[i].c != nodes[i - 1].c)
382 {
383 out->nodeLabels[out->nNodes] = Int16GetDatum(nodes[i].c);
384 out->nNodes++;
385 }
386
387 if (commonLen < VARSIZE_ANY_EXHDR(texti))
388 leafD = formTextDatum(VARDATA_ANY(texti) + commonLen + 1,
389 VARSIZE_ANY_EXHDR(texti) - commonLen - 1);
390 else
391 leafD = formTextDatum(NULL, 0);
392
393 out->leafTupleDatums[nodes[i].i] = leafD;
394 out->mapTuplesToNodes[nodes[i].i] = out->nNodes - 1;
395 }
396
397 PG_RETURN_VOID();
398 }
399
400 Datum
spg_text_inner_consistent(PG_FUNCTION_ARGS)401 spg_text_inner_consistent(PG_FUNCTION_ARGS)
402 {
403 spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
404 spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
405 bool collate_is_c = lc_collate_is_c(PG_GET_COLLATION());
406 text *reconstructedValue;
407 text *reconstrText;
408 int maxReconstrLen;
409 text *prefixText = NULL;
410 int prefixSize = 0;
411 int i;
412
413 /*
414 * Reconstruct values represented at this tuple, including parent data,
415 * prefix of this tuple if any, and the node label if it's non-dummy.
416 * in->level should be the length of the previously reconstructed value,
417 * and the number of bytes added here is prefixSize or prefixSize + 1.
418 *
419 * Note: we assume that in->reconstructedValue isn't toasted and doesn't
420 * have a short varlena header. This is okay because it must have been
421 * created by a previous invocation of this routine, and we always emit
422 * long-format reconstructed values.
423 */
424 reconstructedValue = (text *) DatumGetPointer(in->reconstructedValue);
425 Assert(reconstructedValue == NULL ? in->level == 0 :
426 VARSIZE_ANY_EXHDR(reconstructedValue) == in->level);
427
428 maxReconstrLen = in->level + 1;
429 if (in->hasPrefix)
430 {
431 prefixText = DatumGetTextPP(in->prefixDatum);
432 prefixSize = VARSIZE_ANY_EXHDR(prefixText);
433 maxReconstrLen += prefixSize;
434 }
435
436 reconstrText = palloc(VARHDRSZ + maxReconstrLen);
437 SET_VARSIZE(reconstrText, VARHDRSZ + maxReconstrLen);
438
439 if (in->level)
440 memcpy(VARDATA(reconstrText),
441 VARDATA(reconstructedValue),
442 in->level);
443 if (prefixSize)
444 memcpy(((char *) VARDATA(reconstrText)) + in->level,
445 VARDATA_ANY(prefixText),
446 prefixSize);
447 /* last byte of reconstrText will be filled in below */
448
449 /*
450 * Scan the child nodes. For each one, complete the reconstructed value
451 * and see if it's consistent with the query. If so, emit an entry into
452 * the output arrays.
453 */
454 out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
455 out->levelAdds = (int *) palloc(sizeof(int) * in->nNodes);
456 out->reconstructedValues = (Datum *) palloc(sizeof(Datum) * in->nNodes);
457 out->nNodes = 0;
458
459 for (i = 0; i < in->nNodes; i++)
460 {
461 int16 nodeChar = DatumGetInt16(in->nodeLabels[i]);
462 int thisLen;
463 bool res = true;
464 int j;
465
466 /* If nodeChar is a dummy value, don't include it in data */
467 if (nodeChar <= 0)
468 thisLen = maxReconstrLen - 1;
469 else
470 {
471 ((unsigned char *) VARDATA(reconstrText))[maxReconstrLen - 1] = nodeChar;
472 thisLen = maxReconstrLen;
473 }
474
475 for (j = 0; j < in->nkeys; j++)
476 {
477 StrategyNumber strategy = in->scankeys[j].sk_strategy;
478 text *inText;
479 int inSize;
480 int r;
481
482 /*
483 * If it's a collation-aware operator, but the collation is C, we
484 * can treat it as non-collation-aware. With non-C collation we
485 * need to traverse whole tree :-( so there's no point in making
486 * any check here. (Note also that our reconstructed value may
487 * well end with a partial multibyte character, so that applying
488 * any encoding-sensitive test to it would be risky anyhow.)
489 */
490 if (strategy > 10)
491 {
492 if (collate_is_c)
493 strategy -= 10;
494 else
495 continue;
496 }
497
498 inText = DatumGetTextPP(in->scankeys[j].sk_argument);
499 inSize = VARSIZE_ANY_EXHDR(inText);
500
501 r = memcmp(VARDATA(reconstrText), VARDATA_ANY(inText),
502 Min(inSize, thisLen));
503
504 switch (strategy)
505 {
506 case BTLessStrategyNumber:
507 case BTLessEqualStrategyNumber:
508 if (r > 0)
509 res = false;
510 break;
511 case BTEqualStrategyNumber:
512 if (r != 0 || inSize < thisLen)
513 res = false;
514 break;
515 case BTGreaterEqualStrategyNumber:
516 case BTGreaterStrategyNumber:
517 if (r < 0)
518 res = false;
519 break;
520 default:
521 elog(ERROR, "unrecognized strategy number: %d",
522 in->scankeys[j].sk_strategy);
523 break;
524 }
525
526 if (!res)
527 break; /* no need to consider remaining conditions */
528 }
529
530 if (res)
531 {
532 out->nodeNumbers[out->nNodes] = i;
533 out->levelAdds[out->nNodes] = thisLen - in->level;
534 SET_VARSIZE(reconstrText, VARHDRSZ + thisLen);
535 out->reconstructedValues[out->nNodes] =
536 datumCopy(PointerGetDatum(reconstrText), false, -1);
537 out->nNodes++;
538 }
539 }
540
541 PG_RETURN_VOID();
542 }
543
544 Datum
spg_text_leaf_consistent(PG_FUNCTION_ARGS)545 spg_text_leaf_consistent(PG_FUNCTION_ARGS)
546 {
547 spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0);
548 spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1);
549 int level = in->level;
550 text *leafValue,
551 *reconstrValue = NULL;
552 char *fullValue;
553 int fullLen;
554 bool res;
555 int j;
556
557 /* all tests are exact */
558 out->recheck = false;
559
560 leafValue = DatumGetTextPP(in->leafDatum);
561
562 if (DatumGetPointer(in->reconstructedValue))
563 reconstrValue = DatumGetTextP(in->reconstructedValue);
564
565 Assert(reconstrValue == NULL ? level == 0 :
566 VARSIZE_ANY_EXHDR(reconstrValue) == level);
567
568 /* Reconstruct the full string represented by this leaf tuple */
569 fullLen = level + VARSIZE_ANY_EXHDR(leafValue);
570 if (VARSIZE_ANY_EXHDR(leafValue) == 0 && level > 0)
571 {
572 fullValue = VARDATA(reconstrValue);
573 out->leafValue = PointerGetDatum(reconstrValue);
574 }
575 else
576 {
577 text *fullText = palloc(VARHDRSZ + fullLen);
578
579 SET_VARSIZE(fullText, VARHDRSZ + fullLen);
580 fullValue = VARDATA(fullText);
581 if (level)
582 memcpy(fullValue, VARDATA(reconstrValue), level);
583 if (VARSIZE_ANY_EXHDR(leafValue) > 0)
584 memcpy(fullValue + level, VARDATA_ANY(leafValue),
585 VARSIZE_ANY_EXHDR(leafValue));
586 out->leafValue = PointerGetDatum(fullText);
587 }
588
589 /* Perform the required comparison(s) */
590 res = true;
591 for (j = 0; j < in->nkeys; j++)
592 {
593 StrategyNumber strategy = in->scankeys[j].sk_strategy;
594 text *query = DatumGetTextPP(in->scankeys[j].sk_argument);
595 int queryLen = VARSIZE_ANY_EXHDR(query);
596 int r;
597
598 if (strategy > 10)
599 {
600 /* Collation-aware comparison */
601 strategy -= 10;
602
603 /* If asserts enabled, verify encoding of reconstructed string */
604 Assert(pg_verifymbstr(fullValue, fullLen, false));
605
606 r = varstr_cmp(fullValue, fullLen,
607 VARDATA_ANY(query), queryLen,
608 PG_GET_COLLATION());
609 }
610 else
611 {
612 /* Non-collation-aware comparison */
613 r = memcmp(fullValue, VARDATA_ANY(query), Min(queryLen, fullLen));
614
615 if (r == 0)
616 {
617 if (queryLen > fullLen)
618 r = -1;
619 else if (queryLen < fullLen)
620 r = 1;
621 }
622 }
623
624 switch (strategy)
625 {
626 case BTLessStrategyNumber:
627 res = (r < 0);
628 break;
629 case BTLessEqualStrategyNumber:
630 res = (r <= 0);
631 break;
632 case BTEqualStrategyNumber:
633 res = (r == 0);
634 break;
635 case BTGreaterEqualStrategyNumber:
636 res = (r >= 0);
637 break;
638 case BTGreaterStrategyNumber:
639 res = (r > 0);
640 break;
641 default:
642 elog(ERROR, "unrecognized strategy number: %d",
643 in->scankeys[j].sk_strategy);
644 res = false;
645 break;
646 }
647
648 if (!res)
649 break; /* no need to consider remaining conditions */
650 }
651
652 PG_RETURN_BOOL(res);
653 }
654