1 /*-------------------------------------------------------------------------
2  *
3  * rowtypes.c
4  *	  I/O and comparison functions for generic composite types.
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/utils/adt/rowtypes.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <ctype.h>
18 
19 #include "access/htup_details.h"
20 #include "access/tuptoaster.h"
21 #include "catalog/pg_type.h"
22 #include "funcapi.h"
23 #include "libpq/pqformat.h"
24 #include "miscadmin.h"
25 #include "utils/builtins.h"
26 #include "utils/lsyscache.h"
27 #include "utils/typcache.h"
28 
29 
30 /*
31  * structure to cache metadata needed for record I/O
32  */
33 typedef struct ColumnIOData
34 {
35 	Oid			column_type;
36 	Oid			typiofunc;
37 	Oid			typioparam;
38 	bool		typisvarlena;
39 	FmgrInfo	proc;
40 } ColumnIOData;
41 
42 typedef struct RecordIOData
43 {
44 	Oid			record_type;
45 	int32		record_typmod;
46 	int			ncolumns;
47 	ColumnIOData columns[FLEXIBLE_ARRAY_MEMBER];
48 } RecordIOData;
49 
50 /*
51  * structure to cache metadata needed for record comparison
52  */
53 typedef struct ColumnCompareData
54 {
55 	TypeCacheEntry *typentry;	/* has everything we need, actually */
56 } ColumnCompareData;
57 
58 typedef struct RecordCompareData
59 {
60 	int			ncolumns;		/* allocated length of columns[] */
61 	Oid			record1_type;
62 	int32		record1_typmod;
63 	Oid			record2_type;
64 	int32		record2_typmod;
65 	ColumnCompareData columns[FLEXIBLE_ARRAY_MEMBER];
66 } RecordCompareData;
67 
68 
69 /*
70  * record_in		- input routine for any composite type.
71  */
72 Datum
record_in(PG_FUNCTION_ARGS)73 record_in(PG_FUNCTION_ARGS)
74 {
75 	char	   *string = PG_GETARG_CSTRING(0);
76 	Oid			tupType = PG_GETARG_OID(1);
77 	int32		tupTypmod = PG_GETARG_INT32(2);
78 	HeapTupleHeader result;
79 	TupleDesc	tupdesc;
80 	HeapTuple	tuple;
81 	RecordIOData *my_extra;
82 	bool		needComma = false;
83 	int			ncolumns;
84 	int			i;
85 	char	   *ptr;
86 	Datum	   *values;
87 	bool	   *nulls;
88 	StringInfoData buf;
89 
90 	check_stack_depth();		/* recurses for record-type columns */
91 
92 	/*
93 	 * Give a friendly error message if we did not get enough info to identify
94 	 * the target record type.  (lookup_rowtype_tupdesc would fail anyway, but
95 	 * with a non-user-friendly message.)  In ordinary SQL usage, we'll get -1
96 	 * for typmod, since composite types and RECORD have no type modifiers at
97 	 * the SQL level, and thus must fail for RECORD.  However some callers can
98 	 * supply a valid typmod, and then we can do something useful for RECORD.
99 	 */
100 	if (tupType == RECORDOID && tupTypmod < 0)
101 		ereport(ERROR,
102 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
103 				 errmsg("input of anonymous composite types is not implemented")));
104 
105 	/*
106 	 * This comes from the composite type's pg_type.oid and stores system oids
107 	 * in user tables, specifically DatumTupleFields. This oid must be
108 	 * preserved by binary upgrades.
109 	 */
110 	tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
111 	ncolumns = tupdesc->natts;
112 
113 	/*
114 	 * We arrange to look up the needed I/O info just once per series of
115 	 * calls, assuming the record type doesn't change underneath us.
116 	 */
117 	my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
118 	if (my_extra == NULL ||
119 		my_extra->ncolumns != ncolumns)
120 	{
121 		fcinfo->flinfo->fn_extra =
122 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
123 							   offsetof(RecordIOData, columns) +
124 							   ncolumns * sizeof(ColumnIOData));
125 		my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
126 		my_extra->record_type = InvalidOid;
127 		my_extra->record_typmod = 0;
128 	}
129 
130 	if (my_extra->record_type != tupType ||
131 		my_extra->record_typmod != tupTypmod)
132 	{
133 		MemSet(my_extra, 0,
134 			   offsetof(RecordIOData, columns) +
135 			   ncolumns * sizeof(ColumnIOData));
136 		my_extra->record_type = tupType;
137 		my_extra->record_typmod = tupTypmod;
138 		my_extra->ncolumns = ncolumns;
139 	}
140 
141 	values = (Datum *) palloc(ncolumns * sizeof(Datum));
142 	nulls = (bool *) palloc(ncolumns * sizeof(bool));
143 
144 	/*
145 	 * Scan the string.  We use "buf" to accumulate the de-quoted data for
146 	 * each column, which is then fed to the appropriate input converter.
147 	 */
148 	ptr = string;
149 	/* Allow leading whitespace */
150 	while (*ptr && isspace((unsigned char) *ptr))
151 		ptr++;
152 	if (*ptr++ != '(')
153 		ereport(ERROR,
154 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
155 				 errmsg("malformed record literal: \"%s\"", string),
156 				 errdetail("Missing left parenthesis.")));
157 
158 	initStringInfo(&buf);
159 
160 	for (i = 0; i < ncolumns; i++)
161 	{
162 		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
163 		ColumnIOData *column_info = &my_extra->columns[i];
164 		Oid			column_type = att->atttypid;
165 		char	   *column_data;
166 
167 		/* Ignore dropped columns in datatype, but fill with nulls */
168 		if (att->attisdropped)
169 		{
170 			values[i] = (Datum) 0;
171 			nulls[i] = true;
172 			continue;
173 		}
174 
175 		if (needComma)
176 		{
177 			/* Skip comma that separates prior field from this one */
178 			if (*ptr == ',')
179 				ptr++;
180 			else
181 				/* *ptr must be ')' */
182 				ereport(ERROR,
183 						(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
184 						 errmsg("malformed record literal: \"%s\"", string),
185 						 errdetail("Too few columns.")));
186 		}
187 
188 		/* Check for null: completely empty input means null */
189 		if (*ptr == ',' || *ptr == ')')
190 		{
191 			column_data = NULL;
192 			nulls[i] = true;
193 		}
194 		else
195 		{
196 			/* Extract string for this column */
197 			bool		inquote = false;
198 
199 			resetStringInfo(&buf);
200 			while (inquote || !(*ptr == ',' || *ptr == ')'))
201 			{
202 				char		ch = *ptr++;
203 
204 				if (ch == '\0')
205 					ereport(ERROR,
206 							(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
207 							 errmsg("malformed record literal: \"%s\"",
208 									string),
209 							 errdetail("Unexpected end of input.")));
210 				if (ch == '\\')
211 				{
212 					if (*ptr == '\0')
213 						ereport(ERROR,
214 								(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
215 								 errmsg("malformed record literal: \"%s\"",
216 										string),
217 								 errdetail("Unexpected end of input.")));
218 					appendStringInfoChar(&buf, *ptr++);
219 				}
220 				else if (ch == '"')
221 				{
222 					if (!inquote)
223 						inquote = true;
224 					else if (*ptr == '"')
225 					{
226 						/* doubled quote within quote sequence */
227 						appendStringInfoChar(&buf, *ptr++);
228 					}
229 					else
230 						inquote = false;
231 				}
232 				else
233 					appendStringInfoChar(&buf, ch);
234 			}
235 
236 			column_data = buf.data;
237 			nulls[i] = false;
238 		}
239 
240 		/*
241 		 * Convert the column value
242 		 */
243 		if (column_info->column_type != column_type)
244 		{
245 			getTypeInputInfo(column_type,
246 							 &column_info->typiofunc,
247 							 &column_info->typioparam);
248 			fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
249 						  fcinfo->flinfo->fn_mcxt);
250 			column_info->column_type = column_type;
251 		}
252 
253 		values[i] = InputFunctionCall(&column_info->proc,
254 									  column_data,
255 									  column_info->typioparam,
256 									  att->atttypmod);
257 
258 		/*
259 		 * Prep for next column
260 		 */
261 		needComma = true;
262 	}
263 
264 	if (*ptr++ != ')')
265 		ereport(ERROR,
266 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
267 				 errmsg("malformed record literal: \"%s\"", string),
268 				 errdetail("Too many columns.")));
269 	/* Allow trailing whitespace */
270 	while (*ptr && isspace((unsigned char) *ptr))
271 		ptr++;
272 	if (*ptr)
273 		ereport(ERROR,
274 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
275 				 errmsg("malformed record literal: \"%s\"", string),
276 				 errdetail("Junk after right parenthesis.")));
277 
278 	tuple = heap_form_tuple(tupdesc, values, nulls);
279 
280 	/*
281 	 * We cannot return tuple->t_data because heap_form_tuple allocates it as
282 	 * part of a larger chunk, and our caller may expect to be able to pfree
283 	 * our result.  So must copy the info into a new palloc chunk.
284 	 */
285 	result = (HeapTupleHeader) palloc(tuple->t_len);
286 	memcpy(result, tuple->t_data, tuple->t_len);
287 
288 	heap_freetuple(tuple);
289 	pfree(buf.data);
290 	pfree(values);
291 	pfree(nulls);
292 	ReleaseTupleDesc(tupdesc);
293 
294 	PG_RETURN_HEAPTUPLEHEADER(result);
295 }
296 
297 /*
298  * record_out		- output routine for any composite type.
299  */
300 Datum
record_out(PG_FUNCTION_ARGS)301 record_out(PG_FUNCTION_ARGS)
302 {
303 	HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
304 	Oid			tupType;
305 	int32		tupTypmod;
306 	TupleDesc	tupdesc;
307 	HeapTupleData tuple;
308 	RecordIOData *my_extra;
309 	bool		needComma = false;
310 	int			ncolumns;
311 	int			i;
312 	Datum	   *values;
313 	bool	   *nulls;
314 	StringInfoData buf;
315 
316 	check_stack_depth();		/* recurses for record-type columns */
317 
318 	/* Extract type info from the tuple itself */
319 	tupType = HeapTupleHeaderGetTypeId(rec);
320 	tupTypmod = HeapTupleHeaderGetTypMod(rec);
321 	tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
322 	ncolumns = tupdesc->natts;
323 
324 	/* Build a temporary HeapTuple control structure */
325 	tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
326 	ItemPointerSetInvalid(&(tuple.t_self));
327 	tuple.t_tableOid = InvalidOid;
328 	tuple.t_data = rec;
329 
330 	/*
331 	 * We arrange to look up the needed I/O info just once per series of
332 	 * calls, assuming the record type doesn't change underneath us.
333 	 */
334 	my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
335 	if (my_extra == NULL ||
336 		my_extra->ncolumns != ncolumns)
337 	{
338 		fcinfo->flinfo->fn_extra =
339 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
340 							   offsetof(RecordIOData, columns) +
341 							   ncolumns * sizeof(ColumnIOData));
342 		my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
343 		my_extra->record_type = InvalidOid;
344 		my_extra->record_typmod = 0;
345 	}
346 
347 	if (my_extra->record_type != tupType ||
348 		my_extra->record_typmod != tupTypmod)
349 	{
350 		MemSet(my_extra, 0,
351 			   offsetof(RecordIOData, columns) +
352 			   ncolumns * sizeof(ColumnIOData));
353 		my_extra->record_type = tupType;
354 		my_extra->record_typmod = tupTypmod;
355 		my_extra->ncolumns = ncolumns;
356 	}
357 
358 	values = (Datum *) palloc(ncolumns * sizeof(Datum));
359 	nulls = (bool *) palloc(ncolumns * sizeof(bool));
360 
361 	/* Break down the tuple into fields */
362 	heap_deform_tuple(&tuple, tupdesc, values, nulls);
363 
364 	/* And build the result string */
365 	initStringInfo(&buf);
366 
367 	appendStringInfoChar(&buf, '(');
368 
369 	for (i = 0; i < ncolumns; i++)
370 	{
371 		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
372 		ColumnIOData *column_info = &my_extra->columns[i];
373 		Oid			column_type = att->atttypid;
374 		Datum		attr;
375 		char	   *value;
376 		char	   *tmp;
377 		bool		nq;
378 
379 		/* Ignore dropped columns in datatype */
380 		if (att->attisdropped)
381 			continue;
382 
383 		if (needComma)
384 			appendStringInfoChar(&buf, ',');
385 		needComma = true;
386 
387 		if (nulls[i])
388 		{
389 			/* emit nothing... */
390 			continue;
391 		}
392 
393 		/*
394 		 * Convert the column value to text
395 		 */
396 		if (column_info->column_type != column_type)
397 		{
398 			getTypeOutputInfo(column_type,
399 							  &column_info->typiofunc,
400 							  &column_info->typisvarlena);
401 			fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
402 						  fcinfo->flinfo->fn_mcxt);
403 			column_info->column_type = column_type;
404 		}
405 
406 		attr = values[i];
407 		value = OutputFunctionCall(&column_info->proc, attr);
408 
409 		/* Detect whether we need double quotes for this value */
410 		nq = (value[0] == '\0');	/* force quotes for empty string */
411 		for (tmp = value; *tmp; tmp++)
412 		{
413 			char		ch = *tmp;
414 
415 			if (ch == '"' || ch == '\\' ||
416 				ch == '(' || ch == ')' || ch == ',' ||
417 				isspace((unsigned char) ch))
418 			{
419 				nq = true;
420 				break;
421 			}
422 		}
423 
424 		/* And emit the string */
425 		if (nq)
426 			appendStringInfoCharMacro(&buf, '"');
427 		for (tmp = value; *tmp; tmp++)
428 		{
429 			char		ch = *tmp;
430 
431 			if (ch == '"' || ch == '\\')
432 				appendStringInfoCharMacro(&buf, ch);
433 			appendStringInfoCharMacro(&buf, ch);
434 		}
435 		if (nq)
436 			appendStringInfoCharMacro(&buf, '"');
437 	}
438 
439 	appendStringInfoChar(&buf, ')');
440 
441 	pfree(values);
442 	pfree(nulls);
443 	ReleaseTupleDesc(tupdesc);
444 
445 	PG_RETURN_CSTRING(buf.data);
446 }
447 
448 /*
449  * record_recv		- binary input routine for any composite type.
450  */
451 Datum
record_recv(PG_FUNCTION_ARGS)452 record_recv(PG_FUNCTION_ARGS)
453 {
454 	StringInfo	buf = (StringInfo) PG_GETARG_POINTER(0);
455 	Oid			tupType = PG_GETARG_OID(1);
456 	int32		tupTypmod = PG_GETARG_INT32(2);
457 	HeapTupleHeader result;
458 	TupleDesc	tupdesc;
459 	HeapTuple	tuple;
460 	RecordIOData *my_extra;
461 	int			ncolumns;
462 	int			usercols;
463 	int			validcols;
464 	int			i;
465 	Datum	   *values;
466 	bool	   *nulls;
467 
468 	check_stack_depth();		/* recurses for record-type columns */
469 
470 	/*
471 	 * Give a friendly error message if we did not get enough info to identify
472 	 * the target record type.  (lookup_rowtype_tupdesc would fail anyway, but
473 	 * with a non-user-friendly message.)  In ordinary SQL usage, we'll get -1
474 	 * for typmod, since composite types and RECORD have no type modifiers at
475 	 * the SQL level, and thus must fail for RECORD.  However some callers can
476 	 * supply a valid typmod, and then we can do something useful for RECORD.
477 	 */
478 	if (tupType == RECORDOID && tupTypmod < 0)
479 		ereport(ERROR,
480 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
481 				 errmsg("input of anonymous composite types is not implemented")));
482 
483 	tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
484 	ncolumns = tupdesc->natts;
485 
486 	/*
487 	 * We arrange to look up the needed I/O info just once per series of
488 	 * calls, assuming the record type doesn't change underneath us.
489 	 */
490 	my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
491 	if (my_extra == NULL ||
492 		my_extra->ncolumns != ncolumns)
493 	{
494 		fcinfo->flinfo->fn_extra =
495 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
496 							   offsetof(RecordIOData, columns) +
497 							   ncolumns * sizeof(ColumnIOData));
498 		my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
499 		my_extra->record_type = InvalidOid;
500 		my_extra->record_typmod = 0;
501 	}
502 
503 	if (my_extra->record_type != tupType ||
504 		my_extra->record_typmod != tupTypmod)
505 	{
506 		MemSet(my_extra, 0,
507 			   offsetof(RecordIOData, columns) +
508 			   ncolumns * sizeof(ColumnIOData));
509 		my_extra->record_type = tupType;
510 		my_extra->record_typmod = tupTypmod;
511 		my_extra->ncolumns = ncolumns;
512 	}
513 
514 	values = (Datum *) palloc(ncolumns * sizeof(Datum));
515 	nulls = (bool *) palloc(ncolumns * sizeof(bool));
516 
517 	/* Fetch number of columns user thinks it has */
518 	usercols = pq_getmsgint(buf, 4);
519 
520 	/* Need to scan to count nondeleted columns */
521 	validcols = 0;
522 	for (i = 0; i < ncolumns; i++)
523 	{
524 		if (!TupleDescAttr(tupdesc, i)->attisdropped)
525 			validcols++;
526 	}
527 	if (usercols != validcols)
528 		ereport(ERROR,
529 				(errcode(ERRCODE_DATATYPE_MISMATCH),
530 				 errmsg("wrong number of columns: %d, expected %d",
531 						usercols, validcols)));
532 
533 	/* Process each column */
534 	for (i = 0; i < ncolumns; i++)
535 	{
536 		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
537 		ColumnIOData *column_info = &my_extra->columns[i];
538 		Oid			column_type = att->atttypid;
539 		Oid			coltypoid;
540 		int			itemlen;
541 		StringInfoData item_buf;
542 		StringInfo	bufptr;
543 		char		csave;
544 
545 		/* Ignore dropped columns in datatype, but fill with nulls */
546 		if (att->attisdropped)
547 		{
548 			values[i] = (Datum) 0;
549 			nulls[i] = true;
550 			continue;
551 		}
552 
553 		/* Verify column datatype */
554 		coltypoid = pq_getmsgint(buf, sizeof(Oid));
555 		if (coltypoid != column_type)
556 			ereport(ERROR,
557 					(errcode(ERRCODE_DATATYPE_MISMATCH),
558 					 errmsg("wrong data type: %u, expected %u",
559 							coltypoid, column_type)));
560 
561 		/* Get and check the item length */
562 		itemlen = pq_getmsgint(buf, 4);
563 		if (itemlen < -1 || itemlen > (buf->len - buf->cursor))
564 			ereport(ERROR,
565 					(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
566 					 errmsg("insufficient data left in message")));
567 
568 		if (itemlen == -1)
569 		{
570 			/* -1 length means NULL */
571 			bufptr = NULL;
572 			nulls[i] = true;
573 			csave = 0;			/* keep compiler quiet */
574 		}
575 		else
576 		{
577 			/*
578 			 * Rather than copying data around, we just set up a phony
579 			 * StringInfo pointing to the correct portion of the input buffer.
580 			 * We assume we can scribble on the input buffer so as to maintain
581 			 * the convention that StringInfos have a trailing null.
582 			 */
583 			item_buf.data = &buf->data[buf->cursor];
584 			item_buf.maxlen = itemlen + 1;
585 			item_buf.len = itemlen;
586 			item_buf.cursor = 0;
587 
588 			buf->cursor += itemlen;
589 
590 			csave = buf->data[buf->cursor];
591 			buf->data[buf->cursor] = '\0';
592 
593 			bufptr = &item_buf;
594 			nulls[i] = false;
595 		}
596 
597 		/* Now call the column's receiveproc */
598 		if (column_info->column_type != column_type)
599 		{
600 			getTypeBinaryInputInfo(column_type,
601 								   &column_info->typiofunc,
602 								   &column_info->typioparam);
603 			fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
604 						  fcinfo->flinfo->fn_mcxt);
605 			column_info->column_type = column_type;
606 		}
607 
608 		values[i] = ReceiveFunctionCall(&column_info->proc,
609 										bufptr,
610 										column_info->typioparam,
611 										att->atttypmod);
612 
613 		if (bufptr)
614 		{
615 			/* Trouble if it didn't eat the whole buffer */
616 			if (item_buf.cursor != itemlen)
617 				ereport(ERROR,
618 						(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
619 						 errmsg("improper binary format in record column %d",
620 								i + 1)));
621 
622 			buf->data[buf->cursor] = csave;
623 		}
624 	}
625 
626 	tuple = heap_form_tuple(tupdesc, values, nulls);
627 
628 	/*
629 	 * We cannot return tuple->t_data because heap_form_tuple allocates it as
630 	 * part of a larger chunk, and our caller may expect to be able to pfree
631 	 * our result.  So must copy the info into a new palloc chunk.
632 	 */
633 	result = (HeapTupleHeader) palloc(tuple->t_len);
634 	memcpy(result, tuple->t_data, tuple->t_len);
635 
636 	heap_freetuple(tuple);
637 	pfree(values);
638 	pfree(nulls);
639 	ReleaseTupleDesc(tupdesc);
640 
641 	PG_RETURN_HEAPTUPLEHEADER(result);
642 }
643 
644 /*
645  * record_send		- binary output routine for any composite type.
646  */
647 Datum
record_send(PG_FUNCTION_ARGS)648 record_send(PG_FUNCTION_ARGS)
649 {
650 	HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
651 	Oid			tupType;
652 	int32		tupTypmod;
653 	TupleDesc	tupdesc;
654 	HeapTupleData tuple;
655 	RecordIOData *my_extra;
656 	int			ncolumns;
657 	int			validcols;
658 	int			i;
659 	Datum	   *values;
660 	bool	   *nulls;
661 	StringInfoData buf;
662 
663 	check_stack_depth();		/* recurses for record-type columns */
664 
665 	/* Extract type info from the tuple itself */
666 	tupType = HeapTupleHeaderGetTypeId(rec);
667 	tupTypmod = HeapTupleHeaderGetTypMod(rec);
668 	tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
669 	ncolumns = tupdesc->natts;
670 
671 	/* Build a temporary HeapTuple control structure */
672 	tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
673 	ItemPointerSetInvalid(&(tuple.t_self));
674 	tuple.t_tableOid = InvalidOid;
675 	tuple.t_data = rec;
676 
677 	/*
678 	 * We arrange to look up the needed I/O info just once per series of
679 	 * calls, assuming the record type doesn't change underneath us.
680 	 */
681 	my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
682 	if (my_extra == NULL ||
683 		my_extra->ncolumns != ncolumns)
684 	{
685 		fcinfo->flinfo->fn_extra =
686 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
687 							   offsetof(RecordIOData, columns) +
688 							   ncolumns * sizeof(ColumnIOData));
689 		my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
690 		my_extra->record_type = InvalidOid;
691 		my_extra->record_typmod = 0;
692 	}
693 
694 	if (my_extra->record_type != tupType ||
695 		my_extra->record_typmod != tupTypmod)
696 	{
697 		MemSet(my_extra, 0,
698 			   offsetof(RecordIOData, columns) +
699 			   ncolumns * sizeof(ColumnIOData));
700 		my_extra->record_type = tupType;
701 		my_extra->record_typmod = tupTypmod;
702 		my_extra->ncolumns = ncolumns;
703 	}
704 
705 	values = (Datum *) palloc(ncolumns * sizeof(Datum));
706 	nulls = (bool *) palloc(ncolumns * sizeof(bool));
707 
708 	/* Break down the tuple into fields */
709 	heap_deform_tuple(&tuple, tupdesc, values, nulls);
710 
711 	/* And build the result string */
712 	pq_begintypsend(&buf);
713 
714 	/* Need to scan to count nondeleted columns */
715 	validcols = 0;
716 	for (i = 0; i < ncolumns; i++)
717 	{
718 		if (!TupleDescAttr(tupdesc, i)->attisdropped)
719 			validcols++;
720 	}
721 	pq_sendint32(&buf, validcols);
722 
723 	for (i = 0; i < ncolumns; i++)
724 	{
725 		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
726 		ColumnIOData *column_info = &my_extra->columns[i];
727 		Oid			column_type = att->atttypid;
728 		Datum		attr;
729 		bytea	   *outputbytes;
730 
731 		/* Ignore dropped columns in datatype */
732 		if (att->attisdropped)
733 			continue;
734 
735 		pq_sendint32(&buf, column_type);
736 
737 		if (nulls[i])
738 		{
739 			/* emit -1 data length to signify a NULL */
740 			pq_sendint32(&buf, -1);
741 			continue;
742 		}
743 
744 		/*
745 		 * Convert the column value to binary
746 		 */
747 		if (column_info->column_type != column_type)
748 		{
749 			getTypeBinaryOutputInfo(column_type,
750 									&column_info->typiofunc,
751 									&column_info->typisvarlena);
752 			fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
753 						  fcinfo->flinfo->fn_mcxt);
754 			column_info->column_type = column_type;
755 		}
756 
757 		attr = values[i];
758 		outputbytes = SendFunctionCall(&column_info->proc, attr);
759 		pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ);
760 		pq_sendbytes(&buf, VARDATA(outputbytes),
761 					 VARSIZE(outputbytes) - VARHDRSZ);
762 	}
763 
764 	pfree(values);
765 	pfree(nulls);
766 	ReleaseTupleDesc(tupdesc);
767 
768 	PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
769 }
770 
771 
772 /*
773  * record_cmp()
774  * Internal comparison function for records.
775  *
776  * Returns -1, 0 or 1
777  *
778  * Do not assume that the two inputs are exactly the same record type;
779  * for instance we might be comparing an anonymous ROW() construct against a
780  * named composite type.  We will compare as long as they have the same number
781  * of non-dropped columns of the same types.
782  */
783 static int
record_cmp(FunctionCallInfo fcinfo)784 record_cmp(FunctionCallInfo fcinfo)
785 {
786 	HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
787 	HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
788 	int			result = 0;
789 	Oid			tupType1;
790 	Oid			tupType2;
791 	int32		tupTypmod1;
792 	int32		tupTypmod2;
793 	TupleDesc	tupdesc1;
794 	TupleDesc	tupdesc2;
795 	HeapTupleData tuple1;
796 	HeapTupleData tuple2;
797 	int			ncolumns1;
798 	int			ncolumns2;
799 	RecordCompareData *my_extra;
800 	int			ncols;
801 	Datum	   *values1;
802 	Datum	   *values2;
803 	bool	   *nulls1;
804 	bool	   *nulls2;
805 	int			i1;
806 	int			i2;
807 	int			j;
808 
809 	check_stack_depth();		/* recurses for record-type columns */
810 
811 	/* Extract type info from the tuples */
812 	tupType1 = HeapTupleHeaderGetTypeId(record1);
813 	tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
814 	tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
815 	ncolumns1 = tupdesc1->natts;
816 	tupType2 = HeapTupleHeaderGetTypeId(record2);
817 	tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
818 	tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
819 	ncolumns2 = tupdesc2->natts;
820 
821 	/* Build temporary HeapTuple control structures */
822 	tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
823 	ItemPointerSetInvalid(&(tuple1.t_self));
824 	tuple1.t_tableOid = InvalidOid;
825 	tuple1.t_data = record1;
826 	tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
827 	ItemPointerSetInvalid(&(tuple2.t_self));
828 	tuple2.t_tableOid = InvalidOid;
829 	tuple2.t_data = record2;
830 
831 	/*
832 	 * We arrange to look up the needed comparison info just once per series
833 	 * of calls, assuming the record types don't change underneath us.
834 	 */
835 	ncols = Max(ncolumns1, ncolumns2);
836 	my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
837 	if (my_extra == NULL ||
838 		my_extra->ncolumns < ncols)
839 	{
840 		fcinfo->flinfo->fn_extra =
841 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
842 							   offsetof(RecordCompareData, columns) +
843 							   ncols * sizeof(ColumnCompareData));
844 		my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
845 		my_extra->ncolumns = ncols;
846 		my_extra->record1_type = InvalidOid;
847 		my_extra->record1_typmod = 0;
848 		my_extra->record2_type = InvalidOid;
849 		my_extra->record2_typmod = 0;
850 	}
851 
852 	if (my_extra->record1_type != tupType1 ||
853 		my_extra->record1_typmod != tupTypmod1 ||
854 		my_extra->record2_type != tupType2 ||
855 		my_extra->record2_typmod != tupTypmod2)
856 	{
857 		MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
858 		my_extra->record1_type = tupType1;
859 		my_extra->record1_typmod = tupTypmod1;
860 		my_extra->record2_type = tupType2;
861 		my_extra->record2_typmod = tupTypmod2;
862 	}
863 
864 	/* Break down the tuples into fields */
865 	values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
866 	nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
867 	heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
868 	values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
869 	nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
870 	heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
871 
872 	/*
873 	 * Scan corresponding columns, allowing for dropped columns in different
874 	 * places in the two rows.  i1 and i2 are physical column indexes, j is
875 	 * the logical column index.
876 	 */
877 	i1 = i2 = j = 0;
878 	while (i1 < ncolumns1 || i2 < ncolumns2)
879 	{
880 		Form_pg_attribute att1;
881 		Form_pg_attribute att2;
882 		TypeCacheEntry *typentry;
883 		Oid			collation;
884 
885 		/*
886 		 * Skip dropped columns
887 		 */
888 		if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
889 		{
890 			i1++;
891 			continue;
892 		}
893 		if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
894 		{
895 			i2++;
896 			continue;
897 		}
898 		if (i1 >= ncolumns1 || i2 >= ncolumns2)
899 			break;				/* we'll deal with mismatch below loop */
900 
901 		att1 = TupleDescAttr(tupdesc1, i1);
902 		att2 = TupleDescAttr(tupdesc2, i2);
903 
904 		/*
905 		 * Have two matching columns, they must be same type
906 		 */
907 		if (att1->atttypid != att2->atttypid)
908 			ereport(ERROR,
909 					(errcode(ERRCODE_DATATYPE_MISMATCH),
910 					 errmsg("cannot compare dissimilar column types %s and %s at record column %d",
911 							format_type_be(att1->atttypid),
912 							format_type_be(att2->atttypid),
913 							j + 1)));
914 
915 		/*
916 		 * If they're not same collation, we don't complain here, but the
917 		 * comparison function might.
918 		 */
919 		collation = att1->attcollation;
920 		if (collation != att2->attcollation)
921 			collation = InvalidOid;
922 
923 		/*
924 		 * Lookup the comparison function if not done already
925 		 */
926 		typentry = my_extra->columns[j].typentry;
927 		if (typentry == NULL ||
928 			typentry->type_id != att1->atttypid)
929 		{
930 			typentry = lookup_type_cache(att1->atttypid,
931 										 TYPECACHE_CMP_PROC_FINFO);
932 			if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid))
933 				ereport(ERROR,
934 						(errcode(ERRCODE_UNDEFINED_FUNCTION),
935 						 errmsg("could not identify a comparison function for type %s",
936 								format_type_be(typentry->type_id))));
937 			my_extra->columns[j].typentry = typentry;
938 		}
939 
940 		/*
941 		 * We consider two NULLs equal; NULL > not-NULL.
942 		 */
943 		if (!nulls1[i1] || !nulls2[i2])
944 		{
945 			FunctionCallInfoData locfcinfo;
946 			int32		cmpresult;
947 
948 			if (nulls1[i1])
949 			{
950 				/* arg1 is greater than arg2 */
951 				result = 1;
952 				break;
953 			}
954 			if (nulls2[i2])
955 			{
956 				/* arg1 is less than arg2 */
957 				result = -1;
958 				break;
959 			}
960 
961 			/* Compare the pair of elements */
962 			InitFunctionCallInfoData(locfcinfo, &typentry->cmp_proc_finfo, 2,
963 									 collation, NULL, NULL);
964 			locfcinfo.arg[0] = values1[i1];
965 			locfcinfo.arg[1] = values2[i2];
966 			locfcinfo.argnull[0] = false;
967 			locfcinfo.argnull[1] = false;
968 			locfcinfo.isnull = false;
969 			cmpresult = DatumGetInt32(FunctionCallInvoke(&locfcinfo));
970 
971 			if (cmpresult < 0)
972 			{
973 				/* arg1 is less than arg2 */
974 				result = -1;
975 				break;
976 			}
977 			else if (cmpresult > 0)
978 			{
979 				/* arg1 is greater than arg2 */
980 				result = 1;
981 				break;
982 			}
983 		}
984 
985 		/* equal, so continue to next column */
986 		i1++, i2++, j++;
987 	}
988 
989 	/*
990 	 * If we didn't break out of the loop early, check for column count
991 	 * mismatch.  (We do not report such mismatch if we found unequal column
992 	 * values; is that a feature or a bug?)
993 	 */
994 	if (result == 0)
995 	{
996 		if (i1 != ncolumns1 || i2 != ncolumns2)
997 			ereport(ERROR,
998 					(errcode(ERRCODE_DATATYPE_MISMATCH),
999 					 errmsg("cannot compare record types with different numbers of columns")));
1000 	}
1001 
1002 	pfree(values1);
1003 	pfree(nulls1);
1004 	pfree(values2);
1005 	pfree(nulls2);
1006 	ReleaseTupleDesc(tupdesc1);
1007 	ReleaseTupleDesc(tupdesc2);
1008 
1009 	/* Avoid leaking memory when handed toasted input. */
1010 	PG_FREE_IF_COPY(record1, 0);
1011 	PG_FREE_IF_COPY(record2, 1);
1012 
1013 	return result;
1014 }
1015 
1016 /*
1017  * record_eq :
1018  *		  compares two records for equality
1019  * result :
1020  *		  returns true if the records are equal, false otherwise.
1021  *
1022  * Note: we do not use record_cmp here, since equality may be meaningful in
1023  * datatypes that don't have a total ordering (and hence no btree support).
1024  */
1025 Datum
record_eq(PG_FUNCTION_ARGS)1026 record_eq(PG_FUNCTION_ARGS)
1027 {
1028 	HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1029 	HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1030 	bool		result = true;
1031 	Oid			tupType1;
1032 	Oid			tupType2;
1033 	int32		tupTypmod1;
1034 	int32		tupTypmod2;
1035 	TupleDesc	tupdesc1;
1036 	TupleDesc	tupdesc2;
1037 	HeapTupleData tuple1;
1038 	HeapTupleData tuple2;
1039 	int			ncolumns1;
1040 	int			ncolumns2;
1041 	RecordCompareData *my_extra;
1042 	int			ncols;
1043 	Datum	   *values1;
1044 	Datum	   *values2;
1045 	bool	   *nulls1;
1046 	bool	   *nulls2;
1047 	int			i1;
1048 	int			i2;
1049 	int			j;
1050 
1051 	check_stack_depth();		/* recurses for record-type columns */
1052 
1053 	/* Extract type info from the tuples */
1054 	tupType1 = HeapTupleHeaderGetTypeId(record1);
1055 	tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1056 	tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1057 	ncolumns1 = tupdesc1->natts;
1058 	tupType2 = HeapTupleHeaderGetTypeId(record2);
1059 	tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1060 	tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1061 	ncolumns2 = tupdesc2->natts;
1062 
1063 	/* Build temporary HeapTuple control structures */
1064 	tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1065 	ItemPointerSetInvalid(&(tuple1.t_self));
1066 	tuple1.t_tableOid = InvalidOid;
1067 	tuple1.t_data = record1;
1068 	tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1069 	ItemPointerSetInvalid(&(tuple2.t_self));
1070 	tuple2.t_tableOid = InvalidOid;
1071 	tuple2.t_data = record2;
1072 
1073 	/*
1074 	 * We arrange to look up the needed comparison info just once per series
1075 	 * of calls, assuming the record types don't change underneath us.
1076 	 */
1077 	ncols = Max(ncolumns1, ncolumns2);
1078 	my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1079 	if (my_extra == NULL ||
1080 		my_extra->ncolumns < ncols)
1081 	{
1082 		fcinfo->flinfo->fn_extra =
1083 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1084 							   offsetof(RecordCompareData, columns) +
1085 							   ncols * sizeof(ColumnCompareData));
1086 		my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1087 		my_extra->ncolumns = ncols;
1088 		my_extra->record1_type = InvalidOid;
1089 		my_extra->record1_typmod = 0;
1090 		my_extra->record2_type = InvalidOid;
1091 		my_extra->record2_typmod = 0;
1092 	}
1093 
1094 	if (my_extra->record1_type != tupType1 ||
1095 		my_extra->record1_typmod != tupTypmod1 ||
1096 		my_extra->record2_type != tupType2 ||
1097 		my_extra->record2_typmod != tupTypmod2)
1098 	{
1099 		MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1100 		my_extra->record1_type = tupType1;
1101 		my_extra->record1_typmod = tupTypmod1;
1102 		my_extra->record2_type = tupType2;
1103 		my_extra->record2_typmod = tupTypmod2;
1104 	}
1105 
1106 	/* Break down the tuples into fields */
1107 	values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1108 	nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1109 	heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1110 	values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1111 	nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1112 	heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1113 
1114 	/*
1115 	 * Scan corresponding columns, allowing for dropped columns in different
1116 	 * places in the two rows.  i1 and i2 are physical column indexes, j is
1117 	 * the logical column index.
1118 	 */
1119 	i1 = i2 = j = 0;
1120 	while (i1 < ncolumns1 || i2 < ncolumns2)
1121 	{
1122 		Form_pg_attribute att1;
1123 		Form_pg_attribute att2;
1124 		TypeCacheEntry *typentry;
1125 		Oid			collation;
1126 		FunctionCallInfoData locfcinfo;
1127 		bool		oprresult;
1128 
1129 		/*
1130 		 * Skip dropped columns
1131 		 */
1132 		if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1133 		{
1134 			i1++;
1135 			continue;
1136 		}
1137 		if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1138 		{
1139 			i2++;
1140 			continue;
1141 		}
1142 		if (i1 >= ncolumns1 || i2 >= ncolumns2)
1143 			break;				/* we'll deal with mismatch below loop */
1144 
1145 		att1 = TupleDescAttr(tupdesc1, i1);
1146 		att2 = TupleDescAttr(tupdesc2, i2);
1147 
1148 		/*
1149 		 * Have two matching columns, they must be same type
1150 		 */
1151 		if (att1->atttypid != att2->atttypid)
1152 			ereport(ERROR,
1153 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1154 					 errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1155 							format_type_be(att1->atttypid),
1156 							format_type_be(att2->atttypid),
1157 							j + 1)));
1158 
1159 		/*
1160 		 * If they're not same collation, we don't complain here, but the
1161 		 * equality function might.
1162 		 */
1163 		collation = att1->attcollation;
1164 		if (collation != att2->attcollation)
1165 			collation = InvalidOid;
1166 
1167 		/*
1168 		 * Lookup the equality function if not done already
1169 		 */
1170 		typentry = my_extra->columns[j].typentry;
1171 		if (typentry == NULL ||
1172 			typentry->type_id != att1->atttypid)
1173 		{
1174 			typentry = lookup_type_cache(att1->atttypid,
1175 										 TYPECACHE_EQ_OPR_FINFO);
1176 			if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
1177 				ereport(ERROR,
1178 						(errcode(ERRCODE_UNDEFINED_FUNCTION),
1179 						 errmsg("could not identify an equality operator for type %s",
1180 								format_type_be(typentry->type_id))));
1181 			my_extra->columns[j].typentry = typentry;
1182 		}
1183 
1184 		/*
1185 		 * We consider two NULLs equal; NULL > not-NULL.
1186 		 */
1187 		if (!nulls1[i1] || !nulls2[i2])
1188 		{
1189 			if (nulls1[i1] || nulls2[i2])
1190 			{
1191 				result = false;
1192 				break;
1193 			}
1194 
1195 			/* Compare the pair of elements */
1196 			InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
1197 									 collation, NULL, NULL);
1198 			locfcinfo.arg[0] = values1[i1];
1199 			locfcinfo.arg[1] = values2[i2];
1200 			locfcinfo.argnull[0] = false;
1201 			locfcinfo.argnull[1] = false;
1202 			locfcinfo.isnull = false;
1203 			oprresult = DatumGetBool(FunctionCallInvoke(&locfcinfo));
1204 			if (!oprresult)
1205 			{
1206 				result = false;
1207 				break;
1208 			}
1209 		}
1210 
1211 		/* equal, so continue to next column */
1212 		i1++, i2++, j++;
1213 	}
1214 
1215 	/*
1216 	 * If we didn't break out of the loop early, check for column count
1217 	 * mismatch.  (We do not report such mismatch if we found unequal column
1218 	 * values; is that a feature or a bug?)
1219 	 */
1220 	if (result)
1221 	{
1222 		if (i1 != ncolumns1 || i2 != ncolumns2)
1223 			ereport(ERROR,
1224 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1225 					 errmsg("cannot compare record types with different numbers of columns")));
1226 	}
1227 
1228 	pfree(values1);
1229 	pfree(nulls1);
1230 	pfree(values2);
1231 	pfree(nulls2);
1232 	ReleaseTupleDesc(tupdesc1);
1233 	ReleaseTupleDesc(tupdesc2);
1234 
1235 	/* Avoid leaking memory when handed toasted input. */
1236 	PG_FREE_IF_COPY(record1, 0);
1237 	PG_FREE_IF_COPY(record2, 1);
1238 
1239 	PG_RETURN_BOOL(result);
1240 }
1241 
1242 Datum
record_ne(PG_FUNCTION_ARGS)1243 record_ne(PG_FUNCTION_ARGS)
1244 {
1245 	PG_RETURN_BOOL(!DatumGetBool(record_eq(fcinfo)));
1246 }
1247 
1248 Datum
record_lt(PG_FUNCTION_ARGS)1249 record_lt(PG_FUNCTION_ARGS)
1250 {
1251 	PG_RETURN_BOOL(record_cmp(fcinfo) < 0);
1252 }
1253 
1254 Datum
record_gt(PG_FUNCTION_ARGS)1255 record_gt(PG_FUNCTION_ARGS)
1256 {
1257 	PG_RETURN_BOOL(record_cmp(fcinfo) > 0);
1258 }
1259 
1260 Datum
record_le(PG_FUNCTION_ARGS)1261 record_le(PG_FUNCTION_ARGS)
1262 {
1263 	PG_RETURN_BOOL(record_cmp(fcinfo) <= 0);
1264 }
1265 
1266 Datum
record_ge(PG_FUNCTION_ARGS)1267 record_ge(PG_FUNCTION_ARGS)
1268 {
1269 	PG_RETURN_BOOL(record_cmp(fcinfo) >= 0);
1270 }
1271 
1272 Datum
btrecordcmp(PG_FUNCTION_ARGS)1273 btrecordcmp(PG_FUNCTION_ARGS)
1274 {
1275 	PG_RETURN_INT32(record_cmp(fcinfo));
1276 }
1277 
1278 
1279 /*
1280  * record_image_cmp :
1281  * Internal byte-oriented comparison function for records.
1282  *
1283  * Returns -1, 0 or 1
1284  *
1285  * Note: The normal concepts of "equality" do not apply here; different
1286  * representation of values considered to be equal are not considered to be
1287  * identical.  As an example, for the citext type 'A' and 'a' are equal, but
1288  * they are not identical.
1289  */
1290 static int
record_image_cmp(FunctionCallInfo fcinfo)1291 record_image_cmp(FunctionCallInfo fcinfo)
1292 {
1293 	HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1294 	HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1295 	int			result = 0;
1296 	Oid			tupType1;
1297 	Oid			tupType2;
1298 	int32		tupTypmod1;
1299 	int32		tupTypmod2;
1300 	TupleDesc	tupdesc1;
1301 	TupleDesc	tupdesc2;
1302 	HeapTupleData tuple1;
1303 	HeapTupleData tuple2;
1304 	int			ncolumns1;
1305 	int			ncolumns2;
1306 	RecordCompareData *my_extra;
1307 	int			ncols;
1308 	Datum	   *values1;
1309 	Datum	   *values2;
1310 	bool	   *nulls1;
1311 	bool	   *nulls2;
1312 	int			i1;
1313 	int			i2;
1314 	int			j;
1315 
1316 	/* Extract type info from the tuples */
1317 	tupType1 = HeapTupleHeaderGetTypeId(record1);
1318 	tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1319 	tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1320 	ncolumns1 = tupdesc1->natts;
1321 	tupType2 = HeapTupleHeaderGetTypeId(record2);
1322 	tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1323 	tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1324 	ncolumns2 = tupdesc2->natts;
1325 
1326 	/* Build temporary HeapTuple control structures */
1327 	tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1328 	ItemPointerSetInvalid(&(tuple1.t_self));
1329 	tuple1.t_tableOid = InvalidOid;
1330 	tuple1.t_data = record1;
1331 	tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1332 	ItemPointerSetInvalid(&(tuple2.t_self));
1333 	tuple2.t_tableOid = InvalidOid;
1334 	tuple2.t_data = record2;
1335 
1336 	/*
1337 	 * We arrange to look up the needed comparison info just once per series
1338 	 * of calls, assuming the record types don't change underneath us.
1339 	 */
1340 	ncols = Max(ncolumns1, ncolumns2);
1341 	my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1342 	if (my_extra == NULL ||
1343 		my_extra->ncolumns < ncols)
1344 	{
1345 		fcinfo->flinfo->fn_extra =
1346 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1347 							   offsetof(RecordCompareData, columns) +
1348 							   ncols * sizeof(ColumnCompareData));
1349 		my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1350 		my_extra->ncolumns = ncols;
1351 		my_extra->record1_type = InvalidOid;
1352 		my_extra->record1_typmod = 0;
1353 		my_extra->record2_type = InvalidOid;
1354 		my_extra->record2_typmod = 0;
1355 	}
1356 
1357 	if (my_extra->record1_type != tupType1 ||
1358 		my_extra->record1_typmod != tupTypmod1 ||
1359 		my_extra->record2_type != tupType2 ||
1360 		my_extra->record2_typmod != tupTypmod2)
1361 	{
1362 		MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1363 		my_extra->record1_type = tupType1;
1364 		my_extra->record1_typmod = tupTypmod1;
1365 		my_extra->record2_type = tupType2;
1366 		my_extra->record2_typmod = tupTypmod2;
1367 	}
1368 
1369 	/* Break down the tuples into fields */
1370 	values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1371 	nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1372 	heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1373 	values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1374 	nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1375 	heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1376 
1377 	/*
1378 	 * Scan corresponding columns, allowing for dropped columns in different
1379 	 * places in the two rows.  i1 and i2 are physical column indexes, j is
1380 	 * the logical column index.
1381 	 */
1382 	i1 = i2 = j = 0;
1383 	while (i1 < ncolumns1 || i2 < ncolumns2)
1384 	{
1385 		Form_pg_attribute att1;
1386 		Form_pg_attribute att2;
1387 
1388 		/*
1389 		 * Skip dropped columns
1390 		 */
1391 		if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1392 		{
1393 			i1++;
1394 			continue;
1395 		}
1396 		if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1397 		{
1398 			i2++;
1399 			continue;
1400 		}
1401 		if (i1 >= ncolumns1 || i2 >= ncolumns2)
1402 			break;				/* we'll deal with mismatch below loop */
1403 
1404 		att1 = TupleDescAttr(tupdesc1, i1);
1405 		att2 = TupleDescAttr(tupdesc2, i2);
1406 
1407 		/*
1408 		 * Have two matching columns, they must be same type
1409 		 */
1410 		if (att1->atttypid != att2->atttypid)
1411 			ereport(ERROR,
1412 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1413 					 errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1414 							format_type_be(att1->atttypid),
1415 							format_type_be(att2->atttypid),
1416 							j + 1)));
1417 
1418 		/*
1419 		 * The same type should have the same length (or both should be
1420 		 * variable).
1421 		 */
1422 		Assert(att1->attlen == att2->attlen);
1423 
1424 		/*
1425 		 * We consider two NULLs equal; NULL > not-NULL.
1426 		 */
1427 		if (!nulls1[i1] || !nulls2[i2])
1428 		{
1429 			int			cmpresult = 0;
1430 
1431 			if (nulls1[i1])
1432 			{
1433 				/* arg1 is greater than arg2 */
1434 				result = 1;
1435 				break;
1436 			}
1437 			if (nulls2[i2])
1438 			{
1439 				/* arg1 is less than arg2 */
1440 				result = -1;
1441 				break;
1442 			}
1443 
1444 			/* Compare the pair of elements */
1445 			if (att1->attbyval)
1446 			{
1447 				if (values1[i1] != values2[i2])
1448 					cmpresult = (values1[i1] < values2[i2]) ? -1 : 1;
1449 			}
1450 			else if (att1->attlen > 0)
1451 			{
1452 				cmpresult = memcmp(DatumGetPointer(values1[i1]),
1453 								   DatumGetPointer(values2[i2]),
1454 								   att1->attlen);
1455 			}
1456 			else if (att1->attlen == -1)
1457 			{
1458 				Size		len1,
1459 							len2;
1460 				struct varlena *arg1val;
1461 				struct varlena *arg2val;
1462 
1463 				len1 = toast_raw_datum_size(values1[i1]);
1464 				len2 = toast_raw_datum_size(values2[i2]);
1465 				arg1val = PG_DETOAST_DATUM_PACKED(values1[i1]);
1466 				arg2val = PG_DETOAST_DATUM_PACKED(values2[i2]);
1467 
1468 				cmpresult = memcmp(VARDATA_ANY(arg1val),
1469 								   VARDATA_ANY(arg2val),
1470 								   Min(len1, len2) - VARHDRSZ);
1471 				if ((cmpresult == 0) && (len1 != len2))
1472 					cmpresult = (len1 < len2) ? -1 : 1;
1473 
1474 				if ((Pointer) arg1val != (Pointer) values1[i1])
1475 					pfree(arg1val);
1476 				if ((Pointer) arg2val != (Pointer) values2[i2])
1477 					pfree(arg2val);
1478 			}
1479 			else
1480 				elog(ERROR, "unexpected attlen: %d", att1->attlen);
1481 
1482 			if (cmpresult < 0)
1483 			{
1484 				/* arg1 is less than arg2 */
1485 				result = -1;
1486 				break;
1487 			}
1488 			else if (cmpresult > 0)
1489 			{
1490 				/* arg1 is greater than arg2 */
1491 				result = 1;
1492 				break;
1493 			}
1494 		}
1495 
1496 		/* equal, so continue to next column */
1497 		i1++, i2++, j++;
1498 	}
1499 
1500 	/*
1501 	 * If we didn't break out of the loop early, check for column count
1502 	 * mismatch.  (We do not report such mismatch if we found unequal column
1503 	 * values; is that a feature or a bug?)
1504 	 */
1505 	if (result == 0)
1506 	{
1507 		if (i1 != ncolumns1 || i2 != ncolumns2)
1508 			ereport(ERROR,
1509 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1510 					 errmsg("cannot compare record types with different numbers of columns")));
1511 	}
1512 
1513 	pfree(values1);
1514 	pfree(nulls1);
1515 	pfree(values2);
1516 	pfree(nulls2);
1517 	ReleaseTupleDesc(tupdesc1);
1518 	ReleaseTupleDesc(tupdesc2);
1519 
1520 	/* Avoid leaking memory when handed toasted input. */
1521 	PG_FREE_IF_COPY(record1, 0);
1522 	PG_FREE_IF_COPY(record2, 1);
1523 
1524 	return result;
1525 }
1526 
1527 /*
1528  * record_image_eq :
1529  *		  compares two records for identical contents, based on byte images
1530  * result :
1531  *		  returns true if the records are identical, false otherwise.
1532  *
1533  * Note: we do not use record_image_cmp here, since we can avoid
1534  * de-toasting for unequal lengths this way.
1535  */
1536 Datum
record_image_eq(PG_FUNCTION_ARGS)1537 record_image_eq(PG_FUNCTION_ARGS)
1538 {
1539 	HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1540 	HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1541 	bool		result = true;
1542 	Oid			tupType1;
1543 	Oid			tupType2;
1544 	int32		tupTypmod1;
1545 	int32		tupTypmod2;
1546 	TupleDesc	tupdesc1;
1547 	TupleDesc	tupdesc2;
1548 	HeapTupleData tuple1;
1549 	HeapTupleData tuple2;
1550 	int			ncolumns1;
1551 	int			ncolumns2;
1552 	RecordCompareData *my_extra;
1553 	int			ncols;
1554 	Datum	   *values1;
1555 	Datum	   *values2;
1556 	bool	   *nulls1;
1557 	bool	   *nulls2;
1558 	int			i1;
1559 	int			i2;
1560 	int			j;
1561 
1562 	/* Extract type info from the tuples */
1563 	tupType1 = HeapTupleHeaderGetTypeId(record1);
1564 	tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1565 	tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1566 	ncolumns1 = tupdesc1->natts;
1567 	tupType2 = HeapTupleHeaderGetTypeId(record2);
1568 	tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1569 	tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1570 	ncolumns2 = tupdesc2->natts;
1571 
1572 	/* Build temporary HeapTuple control structures */
1573 	tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1574 	ItemPointerSetInvalid(&(tuple1.t_self));
1575 	tuple1.t_tableOid = InvalidOid;
1576 	tuple1.t_data = record1;
1577 	tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1578 	ItemPointerSetInvalid(&(tuple2.t_self));
1579 	tuple2.t_tableOid = InvalidOid;
1580 	tuple2.t_data = record2;
1581 
1582 	/*
1583 	 * We arrange to look up the needed comparison info just once per series
1584 	 * of calls, assuming the record types don't change underneath us.
1585 	 */
1586 	ncols = Max(ncolumns1, ncolumns2);
1587 	my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1588 	if (my_extra == NULL ||
1589 		my_extra->ncolumns < ncols)
1590 	{
1591 		fcinfo->flinfo->fn_extra =
1592 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1593 							   offsetof(RecordCompareData, columns) +
1594 							   ncols * sizeof(ColumnCompareData));
1595 		my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1596 		my_extra->ncolumns = ncols;
1597 		my_extra->record1_type = InvalidOid;
1598 		my_extra->record1_typmod = 0;
1599 		my_extra->record2_type = InvalidOid;
1600 		my_extra->record2_typmod = 0;
1601 	}
1602 
1603 	if (my_extra->record1_type != tupType1 ||
1604 		my_extra->record1_typmod != tupTypmod1 ||
1605 		my_extra->record2_type != tupType2 ||
1606 		my_extra->record2_typmod != tupTypmod2)
1607 	{
1608 		MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1609 		my_extra->record1_type = tupType1;
1610 		my_extra->record1_typmod = tupTypmod1;
1611 		my_extra->record2_type = tupType2;
1612 		my_extra->record2_typmod = tupTypmod2;
1613 	}
1614 
1615 	/* Break down the tuples into fields */
1616 	values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1617 	nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1618 	heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1619 	values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1620 	nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1621 	heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1622 
1623 	/*
1624 	 * Scan corresponding columns, allowing for dropped columns in different
1625 	 * places in the two rows.  i1 and i2 are physical column indexes, j is
1626 	 * the logical column index.
1627 	 */
1628 	i1 = i2 = j = 0;
1629 	while (i1 < ncolumns1 || i2 < ncolumns2)
1630 	{
1631 		Form_pg_attribute att1;
1632 		Form_pg_attribute att2;
1633 
1634 		/*
1635 		 * Skip dropped columns
1636 		 */
1637 		if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1638 		{
1639 			i1++;
1640 			continue;
1641 		}
1642 		if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1643 		{
1644 			i2++;
1645 			continue;
1646 		}
1647 		if (i1 >= ncolumns1 || i2 >= ncolumns2)
1648 			break;				/* we'll deal with mismatch below loop */
1649 
1650 		att1 = TupleDescAttr(tupdesc1, i1);
1651 		att2 = TupleDescAttr(tupdesc2, i2);
1652 
1653 		/*
1654 		 * Have two matching columns, they must be same type
1655 		 */
1656 		if (att1->atttypid != att2->atttypid)
1657 			ereport(ERROR,
1658 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1659 					 errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1660 							format_type_be(att1->atttypid),
1661 							format_type_be(att2->atttypid),
1662 							j + 1)));
1663 
1664 		/*
1665 		 * We consider two NULLs equal; NULL > not-NULL.
1666 		 */
1667 		if (!nulls1[i1] || !nulls2[i2])
1668 		{
1669 			if (nulls1[i1] || nulls2[i2])
1670 			{
1671 				result = false;
1672 				break;
1673 			}
1674 
1675 			/* Compare the pair of elements */
1676 			if (att1->attbyval)
1677 			{
1678 				result = (values1[i1] == values2[i2]);
1679 			}
1680 			else if (att1->attlen > 0)
1681 			{
1682 				result = (memcmp(DatumGetPointer(values1[i1]),
1683 								 DatumGetPointer(values2[i2]),
1684 								 att1->attlen) == 0);
1685 			}
1686 			else if (att1->attlen == -1)
1687 			{
1688 				Size		len1,
1689 							len2;
1690 
1691 				len1 = toast_raw_datum_size(values1[i1]);
1692 				len2 = toast_raw_datum_size(values2[i2]);
1693 				/* No need to de-toast if lengths don't match. */
1694 				if (len1 != len2)
1695 					result = false;
1696 				else
1697 				{
1698 					struct varlena *arg1val;
1699 					struct varlena *arg2val;
1700 
1701 					arg1val = PG_DETOAST_DATUM_PACKED(values1[i1]);
1702 					arg2val = PG_DETOAST_DATUM_PACKED(values2[i2]);
1703 
1704 					result = (memcmp(VARDATA_ANY(arg1val),
1705 									 VARDATA_ANY(arg2val),
1706 									 len1 - VARHDRSZ) == 0);
1707 
1708 					/* Only free memory if it's a copy made here. */
1709 					if ((Pointer) arg1val != (Pointer) values1[i1])
1710 						pfree(arg1val);
1711 					if ((Pointer) arg2val != (Pointer) values2[i2])
1712 						pfree(arg2val);
1713 				}
1714 			}
1715 			else
1716 				elog(ERROR, "unexpected attlen: %d", att1->attlen);
1717 
1718 			if (!result)
1719 				break;
1720 		}
1721 
1722 		/* equal, so continue to next column */
1723 		i1++, i2++, j++;
1724 	}
1725 
1726 	/*
1727 	 * If we didn't break out of the loop early, check for column count
1728 	 * mismatch.  (We do not report such mismatch if we found unequal column
1729 	 * values; is that a feature or a bug?)
1730 	 */
1731 	if (result)
1732 	{
1733 		if (i1 != ncolumns1 || i2 != ncolumns2)
1734 			ereport(ERROR,
1735 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1736 					 errmsg("cannot compare record types with different numbers of columns")));
1737 	}
1738 
1739 	pfree(values1);
1740 	pfree(nulls1);
1741 	pfree(values2);
1742 	pfree(nulls2);
1743 	ReleaseTupleDesc(tupdesc1);
1744 	ReleaseTupleDesc(tupdesc2);
1745 
1746 	/* Avoid leaking memory when handed toasted input. */
1747 	PG_FREE_IF_COPY(record1, 0);
1748 	PG_FREE_IF_COPY(record2, 1);
1749 
1750 	PG_RETURN_BOOL(result);
1751 }
1752 
1753 Datum
record_image_ne(PG_FUNCTION_ARGS)1754 record_image_ne(PG_FUNCTION_ARGS)
1755 {
1756 	PG_RETURN_BOOL(!DatumGetBool(record_image_eq(fcinfo)));
1757 }
1758 
1759 Datum
record_image_lt(PG_FUNCTION_ARGS)1760 record_image_lt(PG_FUNCTION_ARGS)
1761 {
1762 	PG_RETURN_BOOL(record_image_cmp(fcinfo) < 0);
1763 }
1764 
1765 Datum
record_image_gt(PG_FUNCTION_ARGS)1766 record_image_gt(PG_FUNCTION_ARGS)
1767 {
1768 	PG_RETURN_BOOL(record_image_cmp(fcinfo) > 0);
1769 }
1770 
1771 Datum
record_image_le(PG_FUNCTION_ARGS)1772 record_image_le(PG_FUNCTION_ARGS)
1773 {
1774 	PG_RETURN_BOOL(record_image_cmp(fcinfo) <= 0);
1775 }
1776 
1777 Datum
record_image_ge(PG_FUNCTION_ARGS)1778 record_image_ge(PG_FUNCTION_ARGS)
1779 {
1780 	PG_RETURN_BOOL(record_image_cmp(fcinfo) >= 0);
1781 }
1782 
1783 Datum
btrecordimagecmp(PG_FUNCTION_ARGS)1784 btrecordimagecmp(PG_FUNCTION_ARGS)
1785 {
1786 	PG_RETURN_INT32(record_image_cmp(fcinfo));
1787 }
1788