1 /*-------------------------------------------------------------------------
2  *
3  * spgtextproc.c
4  *	  implementation of radix tree (compressed trie) over text
5  *
6  * In a text_ops SPGiST index, inner tuples can have a prefix which is the
7  * common prefix of all strings indexed under that tuple.  The node labels
8  * represent the next byte of the string(s) after the prefix.  Assuming we
9  * always use the longest possible prefix, we will get more than one node
10  * label unless the prefix length is restricted by SPGIST_MAX_PREFIX_LENGTH.
11  *
12  * To reconstruct the indexed string for any index entry, concatenate the
13  * inner-tuple prefixes and node labels starting at the root and working
14  * down to the leaf entry, then append the datum in the leaf entry.
15  * (While descending the tree, "level" is the number of bytes reconstructed
16  * so far.)
17  *
18  * However, there are two special cases for node labels: -1 indicates that
19  * there are no more bytes after the prefix-so-far, and -2 indicates that we
20  * had to split an existing allTheSame tuple (in such a case we have to create
21  * a node label that doesn't correspond to any string byte).  In either case,
22  * the node label does not contribute anything to the reconstructed string.
23  *
24  * Previously, we used a node label of zero for both special cases, but
25  * this was problematic because one can't tell whether a string ending at
26  * the current level can be pushed down into such a child node.  For
27  * backwards compatibility, we still support such node labels for reading;
28  * but no new entries will ever be pushed down into a zero-labeled child.
29  * No new entries ever get pushed into a -2-labeled child, either.
30  *
31  *
32  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
33  * Portions Copyright (c) 1994, Regents of the University of California
34  *
35  * IDENTIFICATION
36  *			src/backend/access/spgist/spgtextproc.c
37  *
38  *-------------------------------------------------------------------------
39  */
40 #include "postgres.h"
41 
42 #include "access/spgist.h"
43 #include "catalog/pg_type.h"
44 #include "mb/pg_wchar.h"
45 #include "utils/builtins.h"
46 #include "utils/datum.h"
47 #include "utils/pg_locale.h"
48 #include "utils/varlena.h"
49 
50 
51 /*
52  * In the worst case, an inner tuple in a text radix tree could have as many
53  * as 258 nodes (one for each possible byte value, plus the two special
54  * cases).  Each node can take 16 bytes on MAXALIGN=8 machines.  The inner
55  * tuple must fit on an index page of size BLCKSZ.  Rather than assuming we
56  * know the exact amount of overhead imposed by page headers, tuple headers,
57  * etc, we leave 100 bytes for that (the actual overhead should be no more
58  * than 56 bytes at this writing, so there is slop in this number).
59  * So we can safely create prefixes up to BLCKSZ - 258 * 16 - 100 bytes long.
60  * Unfortunately, because 258 * 16 is over 4K, there is no safe prefix length
61  * when BLCKSZ is less than 8K; it is always possible to get "SPGiST inner
62  * tuple size exceeds maximum" if there are too many distinct next-byte values
63  * at a given place in the tree.  Since use of nonstandard block sizes appears
64  * to be negligible in the field, we just live with that fact for now,
65  * choosing a max prefix size of 32 bytes when BLCKSZ is configured smaller
66  * than default.
67  */
68 #define SPGIST_MAX_PREFIX_LENGTH	Max((int) (BLCKSZ - 258 * 16 - 100), 32)
69 
70 /* Struct for sorting values in picksplit */
71 typedef struct spgNodePtr
72 {
73 	Datum		d;
74 	int			i;
75 	int16		c;
76 } spgNodePtr;
77 
78 
79 Datum
spg_text_config(PG_FUNCTION_ARGS)80 spg_text_config(PG_FUNCTION_ARGS)
81 {
82 	/* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */
83 	spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1);
84 
85 	cfg->prefixType = TEXTOID;
86 	cfg->labelType = INT2OID;
87 	cfg->canReturnData = true;
88 	cfg->longValuesOK = true;	/* suffixing will shorten long values */
89 	PG_RETURN_VOID();
90 }
91 
92 /*
93  * Form a text datum from the given not-necessarily-null-terminated string,
94  * using short varlena header format if possible
95  */
96 static Datum
formTextDatum(const char * data,int datalen)97 formTextDatum(const char *data, int datalen)
98 {
99 	char	   *p;
100 
101 	p = (char *) palloc(datalen + VARHDRSZ);
102 
103 	if (datalen + VARHDRSZ_SHORT <= VARATT_SHORT_MAX)
104 	{
105 		SET_VARSIZE_SHORT(p, datalen + VARHDRSZ_SHORT);
106 		if (datalen)
107 			memcpy(p + VARHDRSZ_SHORT, data, datalen);
108 	}
109 	else
110 	{
111 		SET_VARSIZE(p, datalen + VARHDRSZ);
112 		memcpy(p + VARHDRSZ, data, datalen);
113 	}
114 
115 	return PointerGetDatum(p);
116 }
117 
118 /*
119  * Find the length of the common prefix of a and b
120  */
121 static int
commonPrefix(const char * a,const char * b,int lena,int lenb)122 commonPrefix(const char *a, const char *b, int lena, int lenb)
123 {
124 	int			i = 0;
125 
126 	while (i < lena && i < lenb && *a == *b)
127 	{
128 		a++;
129 		b++;
130 		i++;
131 	}
132 
133 	return i;
134 }
135 
136 /*
137  * Binary search an array of int16 datums for a match to c
138  *
139  * On success, *i gets the match location; on failure, it gets where to insert
140  */
141 static bool
searchChar(Datum * nodeLabels,int nNodes,int16 c,int * i)142 searchChar(Datum *nodeLabels, int nNodes, int16 c, int *i)
143 {
144 	int			StopLow = 0,
145 				StopHigh = nNodes;
146 
147 	while (StopLow < StopHigh)
148 	{
149 		int			StopMiddle = (StopLow + StopHigh) >> 1;
150 		int16		middle = DatumGetInt16(nodeLabels[StopMiddle]);
151 
152 		if (c < middle)
153 			StopHigh = StopMiddle;
154 		else if (c > middle)
155 			StopLow = StopMiddle + 1;
156 		else
157 		{
158 			*i = StopMiddle;
159 			return true;
160 		}
161 	}
162 
163 	*i = StopHigh;
164 	return false;
165 }
166 
167 Datum
spg_text_choose(PG_FUNCTION_ARGS)168 spg_text_choose(PG_FUNCTION_ARGS)
169 {
170 	spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0);
171 	spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1);
172 	text	   *inText = DatumGetTextPP(in->datum);
173 	char	   *inStr = VARDATA_ANY(inText);
174 	int			inSize = VARSIZE_ANY_EXHDR(inText);
175 	char	   *prefixStr = NULL;
176 	int			prefixSize = 0;
177 	int			commonLen = 0;
178 	int16		nodeChar = 0;
179 	int			i = 0;
180 
181 	/* Check for prefix match, set nodeChar to first byte after prefix */
182 	if (in->hasPrefix)
183 	{
184 		text	   *prefixText = DatumGetTextPP(in->prefixDatum);
185 
186 		prefixStr = VARDATA_ANY(prefixText);
187 		prefixSize = VARSIZE_ANY_EXHDR(prefixText);
188 
189 		commonLen = commonPrefix(inStr + in->level,
190 								 prefixStr,
191 								 inSize - in->level,
192 								 prefixSize);
193 
194 		if (commonLen == prefixSize)
195 		{
196 			if (inSize - in->level > commonLen)
197 				nodeChar = *(unsigned char *) (inStr + in->level + commonLen);
198 			else
199 				nodeChar = -1;
200 		}
201 		else
202 		{
203 			/* Must split tuple because incoming value doesn't match prefix */
204 			out->resultType = spgSplitTuple;
205 
206 			if (commonLen == 0)
207 			{
208 				out->result.splitTuple.prefixHasPrefix = false;
209 			}
210 			else
211 			{
212 				out->result.splitTuple.prefixHasPrefix = true;
213 				out->result.splitTuple.prefixPrefixDatum =
214 					formTextDatum(prefixStr, commonLen);
215 			}
216 			out->result.splitTuple.prefixNNodes = 1;
217 			out->result.splitTuple.prefixNodeLabels =
218 				(Datum *) palloc(sizeof(Datum));
219 			out->result.splitTuple.prefixNodeLabels[0] =
220 				Int16GetDatum(*(unsigned char *) (prefixStr + commonLen));
221 
222 			out->result.splitTuple.childNodeN = 0;
223 
224 			if (prefixSize - commonLen == 1)
225 			{
226 				out->result.splitTuple.postfixHasPrefix = false;
227 			}
228 			else
229 			{
230 				out->result.splitTuple.postfixHasPrefix = true;
231 				out->result.splitTuple.postfixPrefixDatum =
232 					formTextDatum(prefixStr + commonLen + 1,
233 								  prefixSize - commonLen - 1);
234 			}
235 
236 			PG_RETURN_VOID();
237 		}
238 	}
239 	else if (inSize > in->level)
240 	{
241 		nodeChar = *(unsigned char *) (inStr + in->level);
242 	}
243 	else
244 	{
245 		nodeChar = -1;
246 	}
247 
248 	/* Look up nodeChar in the node label array */
249 	if (searchChar(in->nodeLabels, in->nNodes, nodeChar, &i))
250 	{
251 		/*
252 		 * Descend to existing node.  (If in->allTheSame, the core code will
253 		 * ignore our nodeN specification here, but that's OK.  We still have
254 		 * to provide the correct levelAdd and restDatum values, and those are
255 		 * the same regardless of which node gets chosen by core.)
256 		 */
257 		int			levelAdd;
258 
259 		out->resultType = spgMatchNode;
260 		out->result.matchNode.nodeN = i;
261 		levelAdd = commonLen;
262 		if (nodeChar >= 0)
263 			levelAdd++;
264 		out->result.matchNode.levelAdd = levelAdd;
265 		if (inSize - in->level - levelAdd > 0)
266 			out->result.matchNode.restDatum =
267 				formTextDatum(inStr + in->level + levelAdd,
268 							  inSize - in->level - levelAdd);
269 		else
270 			out->result.matchNode.restDatum =
271 				formTextDatum(NULL, 0);
272 	}
273 	else if (in->allTheSame)
274 	{
275 		/*
276 		 * Can't use AddNode action, so split the tuple.  The upper tuple has
277 		 * the same prefix as before and uses a dummy node label -2 for the
278 		 * lower tuple.  The lower tuple has no prefix and the same node
279 		 * labels as the original tuple.
280 		 *
281 		 * Note: it might seem tempting to shorten the upper tuple's prefix,
282 		 * if it has one, then use its last byte as label for the lower tuple.
283 		 * But that doesn't win since we know the incoming value matches the
284 		 * whole prefix: we'd just end up splitting the lower tuple again.
285 		 */
286 		out->resultType = spgSplitTuple;
287 		out->result.splitTuple.prefixHasPrefix = in->hasPrefix;
288 		out->result.splitTuple.prefixPrefixDatum = in->prefixDatum;
289 		out->result.splitTuple.prefixNNodes = 1;
290 		out->result.splitTuple.prefixNodeLabels = (Datum *) palloc(sizeof(Datum));
291 		out->result.splitTuple.prefixNodeLabels[0] = Int16GetDatum(-2);
292 		out->result.splitTuple.childNodeN = 0;
293 		out->result.splitTuple.postfixHasPrefix = false;
294 	}
295 	else
296 	{
297 		/* Add a node for the not-previously-seen nodeChar value */
298 		out->resultType = spgAddNode;
299 		out->result.addNode.nodeLabel = Int16GetDatum(nodeChar);
300 		out->result.addNode.nodeN = i;
301 	}
302 
303 	PG_RETURN_VOID();
304 }
305 
306 /* qsort comparator to sort spgNodePtr structs by "c" */
307 static int
cmpNodePtr(const void * a,const void * b)308 cmpNodePtr(const void *a, const void *b)
309 {
310 	const spgNodePtr *aa = (const spgNodePtr *) a;
311 	const spgNodePtr *bb = (const spgNodePtr *) b;
312 
313 	return aa->c - bb->c;
314 }
315 
316 Datum
spg_text_picksplit(PG_FUNCTION_ARGS)317 spg_text_picksplit(PG_FUNCTION_ARGS)
318 {
319 	spgPickSplitIn *in = (spgPickSplitIn *) PG_GETARG_POINTER(0);
320 	spgPickSplitOut *out = (spgPickSplitOut *) PG_GETARG_POINTER(1);
321 	text	   *text0 = DatumGetTextPP(in->datums[0]);
322 	int			i,
323 				commonLen;
324 	spgNodePtr *nodes;
325 
326 	/* Identify longest common prefix, if any */
327 	commonLen = VARSIZE_ANY_EXHDR(text0);
328 	for (i = 1; i < in->nTuples && commonLen > 0; i++)
329 	{
330 		text	   *texti = DatumGetTextPP(in->datums[i]);
331 		int			tmp = commonPrefix(VARDATA_ANY(text0),
332 									   VARDATA_ANY(texti),
333 									   VARSIZE_ANY_EXHDR(text0),
334 									   VARSIZE_ANY_EXHDR(texti));
335 
336 		if (tmp < commonLen)
337 			commonLen = tmp;
338 	}
339 
340 	/*
341 	 * Limit the prefix length, if necessary, to ensure that the resulting
342 	 * inner tuple will fit on a page.
343 	 */
344 	commonLen = Min(commonLen, SPGIST_MAX_PREFIX_LENGTH);
345 
346 	/* Set node prefix to be that string, if it's not empty */
347 	if (commonLen == 0)
348 	{
349 		out->hasPrefix = false;
350 	}
351 	else
352 	{
353 		out->hasPrefix = true;
354 		out->prefixDatum = formTextDatum(VARDATA_ANY(text0), commonLen);
355 	}
356 
357 	/* Extract the node label (first non-common byte) from each value */
358 	nodes = (spgNodePtr *) palloc(sizeof(spgNodePtr) * in->nTuples);
359 
360 	for (i = 0; i < in->nTuples; i++)
361 	{
362 		text	   *texti = DatumGetTextPP(in->datums[i]);
363 
364 		if (commonLen < VARSIZE_ANY_EXHDR(texti))
365 			nodes[i].c = *(unsigned char *) (VARDATA_ANY(texti) + commonLen);
366 		else
367 			nodes[i].c = -1;	/* use -1 if string is all common */
368 		nodes[i].i = i;
369 		nodes[i].d = in->datums[i];
370 	}
371 
372 	/*
373 	 * Sort by label values so that we can group the values into nodes.  This
374 	 * also ensures that the nodes are ordered by label value, allowing the
375 	 * use of binary search in searchChar.
376 	 */
377 	qsort(nodes, in->nTuples, sizeof(*nodes), cmpNodePtr);
378 
379 	/* And emit results */
380 	out->nNodes = 0;
381 	out->nodeLabels = (Datum *) palloc(sizeof(Datum) * in->nTuples);
382 	out->mapTuplesToNodes = (int *) palloc(sizeof(int) * in->nTuples);
383 	out->leafTupleDatums = (Datum *) palloc(sizeof(Datum) * in->nTuples);
384 
385 	for (i = 0; i < in->nTuples; i++)
386 	{
387 		text	   *texti = DatumGetTextPP(nodes[i].d);
388 		Datum		leafD;
389 
390 		if (i == 0 || nodes[i].c != nodes[i - 1].c)
391 		{
392 			out->nodeLabels[out->nNodes] = Int16GetDatum(nodes[i].c);
393 			out->nNodes++;
394 		}
395 
396 		if (commonLen < VARSIZE_ANY_EXHDR(texti))
397 			leafD = formTextDatum(VARDATA_ANY(texti) + commonLen + 1,
398 								  VARSIZE_ANY_EXHDR(texti) - commonLen - 1);
399 		else
400 			leafD = formTextDatum(NULL, 0);
401 
402 		out->leafTupleDatums[nodes[i].i] = leafD;
403 		out->mapTuplesToNodes[nodes[i].i] = out->nNodes - 1;
404 	}
405 
406 	PG_RETURN_VOID();
407 }
408 
409 Datum
spg_text_inner_consistent(PG_FUNCTION_ARGS)410 spg_text_inner_consistent(PG_FUNCTION_ARGS)
411 {
412 	spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
413 	spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
414 	bool		collate_is_c = lc_collate_is_c(PG_GET_COLLATION());
415 	text	   *reconstructedValue;
416 	text	   *reconstrText;
417 	int			maxReconstrLen;
418 	text	   *prefixText = NULL;
419 	int			prefixSize = 0;
420 	int			i;
421 
422 	/*
423 	 * Reconstruct values represented at this tuple, including parent data,
424 	 * prefix of this tuple if any, and the node label if it's non-dummy.
425 	 * in->level should be the length of the previously reconstructed value,
426 	 * and the number of bytes added here is prefixSize or prefixSize + 1.
427 	 *
428 	 * Note: we assume that in->reconstructedValue isn't toasted and doesn't
429 	 * have a short varlena header.  This is okay because it must have been
430 	 * created by a previous invocation of this routine, and we always emit
431 	 * long-format reconstructed values.
432 	 */
433 	reconstructedValue = (text *) DatumGetPointer(in->reconstructedValue);
434 	Assert(reconstructedValue == NULL ? in->level == 0 :
435 		   VARSIZE_ANY_EXHDR(reconstructedValue) == in->level);
436 
437 	maxReconstrLen = in->level + 1;
438 	if (in->hasPrefix)
439 	{
440 		prefixText = DatumGetTextPP(in->prefixDatum);
441 		prefixSize = VARSIZE_ANY_EXHDR(prefixText);
442 		maxReconstrLen += prefixSize;
443 	}
444 
445 	reconstrText = palloc(VARHDRSZ + maxReconstrLen);
446 	SET_VARSIZE(reconstrText, VARHDRSZ + maxReconstrLen);
447 
448 	if (in->level)
449 		memcpy(VARDATA(reconstrText),
450 			   VARDATA(reconstructedValue),
451 			   in->level);
452 	if (prefixSize)
453 		memcpy(((char *) VARDATA(reconstrText)) + in->level,
454 			   VARDATA_ANY(prefixText),
455 			   prefixSize);
456 	/* last byte of reconstrText will be filled in below */
457 
458 	/*
459 	 * Scan the child nodes.  For each one, complete the reconstructed value
460 	 * and see if it's consistent with the query.  If so, emit an entry into
461 	 * the output arrays.
462 	 */
463 	out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
464 	out->levelAdds = (int *) palloc(sizeof(int) * in->nNodes);
465 	out->reconstructedValues = (Datum *) palloc(sizeof(Datum) * in->nNodes);
466 	out->nNodes = 0;
467 
468 	for (i = 0; i < in->nNodes; i++)
469 	{
470 		int16		nodeChar = DatumGetInt16(in->nodeLabels[i]);
471 		int			thisLen;
472 		bool		res = true;
473 		int			j;
474 
475 		/* If nodeChar is a dummy value, don't include it in data */
476 		if (nodeChar <= 0)
477 			thisLen = maxReconstrLen - 1;
478 		else
479 		{
480 			((unsigned char *) VARDATA(reconstrText))[maxReconstrLen - 1] = nodeChar;
481 			thisLen = maxReconstrLen;
482 		}
483 
484 		for (j = 0; j < in->nkeys; j++)
485 		{
486 			StrategyNumber strategy = in->scankeys[j].sk_strategy;
487 			text	   *inText;
488 			int			inSize;
489 			int			r;
490 
491 			/*
492 			 * If it's a collation-aware operator, but the collation is C, we
493 			 * can treat it as non-collation-aware.  With non-C collation we
494 			 * need to traverse whole tree :-( so there's no point in making
495 			 * any check here.  (Note also that our reconstructed value may
496 			 * well end with a partial multibyte character, so that applying
497 			 * any encoding-sensitive test to it would be risky anyhow.)
498 			 */
499 			if (strategy > 10)
500 			{
501 				if (collate_is_c)
502 					strategy -= 10;
503 				else
504 					continue;
505 			}
506 
507 			inText = DatumGetTextPP(in->scankeys[j].sk_argument);
508 			inSize = VARSIZE_ANY_EXHDR(inText);
509 
510 			r = memcmp(VARDATA(reconstrText), VARDATA_ANY(inText),
511 					   Min(inSize, thisLen));
512 
513 			switch (strategy)
514 			{
515 				case BTLessStrategyNumber:
516 				case BTLessEqualStrategyNumber:
517 					if (r > 0)
518 						res = false;
519 					break;
520 				case BTEqualStrategyNumber:
521 					if (r != 0 || inSize < thisLen)
522 						res = false;
523 					break;
524 				case BTGreaterEqualStrategyNumber:
525 				case BTGreaterStrategyNumber:
526 					if (r < 0)
527 						res = false;
528 					break;
529 				default:
530 					elog(ERROR, "unrecognized strategy number: %d",
531 						 in->scankeys[j].sk_strategy);
532 					break;
533 			}
534 
535 			if (!res)
536 				break;			/* no need to consider remaining conditions */
537 		}
538 
539 		if (res)
540 		{
541 			out->nodeNumbers[out->nNodes] = i;
542 			out->levelAdds[out->nNodes] = thisLen - in->level;
543 			SET_VARSIZE(reconstrText, VARHDRSZ + thisLen);
544 			out->reconstructedValues[out->nNodes] =
545 				datumCopy(PointerGetDatum(reconstrText), false, -1);
546 			out->nNodes++;
547 		}
548 	}
549 
550 	PG_RETURN_VOID();
551 }
552 
553 Datum
spg_text_leaf_consistent(PG_FUNCTION_ARGS)554 spg_text_leaf_consistent(PG_FUNCTION_ARGS)
555 {
556 	spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0);
557 	spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1);
558 	int			level = in->level;
559 	text	   *leafValue,
560 			   *reconstrValue = NULL;
561 	char	   *fullValue;
562 	int			fullLen;
563 	bool		res;
564 	int			j;
565 
566 	/* all tests are exact */
567 	out->recheck = false;
568 
569 	leafValue = DatumGetTextPP(in->leafDatum);
570 
571 	/* As above, in->reconstructedValue isn't toasted or short. */
572 	if (DatumGetPointer(in->reconstructedValue))
573 		reconstrValue = (text *) DatumGetPointer(in->reconstructedValue);
574 
575 	Assert(reconstrValue == NULL ? level == 0 :
576 		   VARSIZE_ANY_EXHDR(reconstrValue) == level);
577 
578 	/* Reconstruct the full string represented by this leaf tuple */
579 	fullLen = level + VARSIZE_ANY_EXHDR(leafValue);
580 	if (VARSIZE_ANY_EXHDR(leafValue) == 0 && level > 0)
581 	{
582 		fullValue = VARDATA(reconstrValue);
583 		out->leafValue = PointerGetDatum(reconstrValue);
584 	}
585 	else
586 	{
587 		text	   *fullText = palloc(VARHDRSZ + fullLen);
588 
589 		SET_VARSIZE(fullText, VARHDRSZ + fullLen);
590 		fullValue = VARDATA(fullText);
591 		if (level)
592 			memcpy(fullValue, VARDATA(reconstrValue), level);
593 		if (VARSIZE_ANY_EXHDR(leafValue) > 0)
594 			memcpy(fullValue + level, VARDATA_ANY(leafValue),
595 				   VARSIZE_ANY_EXHDR(leafValue));
596 		out->leafValue = PointerGetDatum(fullText);
597 	}
598 
599 	/* Perform the required comparison(s) */
600 	res = true;
601 	for (j = 0; j < in->nkeys; j++)
602 	{
603 		StrategyNumber strategy = in->scankeys[j].sk_strategy;
604 		text	   *query = DatumGetTextPP(in->scankeys[j].sk_argument);
605 		int			queryLen = VARSIZE_ANY_EXHDR(query);
606 		int			r;
607 
608 		if (strategy > 10)
609 		{
610 			/* Collation-aware comparison */
611 			strategy -= 10;
612 
613 			/* If asserts enabled, verify encoding of reconstructed string */
614 			Assert(pg_verifymbstr(fullValue, fullLen, false));
615 
616 			r = varstr_cmp(fullValue, fullLen,
617 						   VARDATA_ANY(query), queryLen,
618 						   PG_GET_COLLATION());
619 		}
620 		else
621 		{
622 			/* Non-collation-aware comparison */
623 			r = memcmp(fullValue, VARDATA_ANY(query), Min(queryLen, fullLen));
624 
625 			if (r == 0)
626 			{
627 				if (queryLen > fullLen)
628 					r = -1;
629 				else if (queryLen < fullLen)
630 					r = 1;
631 			}
632 		}
633 
634 		switch (strategy)
635 		{
636 			case BTLessStrategyNumber:
637 				res = (r < 0);
638 				break;
639 			case BTLessEqualStrategyNumber:
640 				res = (r <= 0);
641 				break;
642 			case BTEqualStrategyNumber:
643 				res = (r == 0);
644 				break;
645 			case BTGreaterEqualStrategyNumber:
646 				res = (r >= 0);
647 				break;
648 			case BTGreaterStrategyNumber:
649 				res = (r > 0);
650 				break;
651 			default:
652 				elog(ERROR, "unrecognized strategy number: %d",
653 					 in->scankeys[j].sk_strategy);
654 				res = false;
655 				break;
656 		}
657 
658 		if (!res)
659 			break;				/* no need to consider remaining conditions */
660 	}
661 
662 	PG_RETURN_BOOL(res);
663 }
664