1 /*-------------------------------------------------------------------------
2  *
3  * rowtypes.c
4  *	  I/O and comparison functions for generic composite types.
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/utils/adt/rowtypes.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <ctype.h>
18 
19 #include "access/htup_details.h"
20 #include "access/tuptoaster.h"
21 #include "catalog/pg_type.h"
22 #include "funcapi.h"
23 #include "libpq/pqformat.h"
24 #include "miscadmin.h"
25 #include "utils/builtins.h"
26 #include "utils/datum.h"
27 #include "utils/lsyscache.h"
28 #include "utils/typcache.h"
29 
30 
31 /*
32  * structure to cache metadata needed for record I/O
33  */
34 typedef struct ColumnIOData
35 {
36 	Oid			column_type;
37 	Oid			typiofunc;
38 	Oid			typioparam;
39 	bool		typisvarlena;
40 	FmgrInfo	proc;
41 } ColumnIOData;
42 
43 typedef struct RecordIOData
44 {
45 	Oid			record_type;
46 	int32		record_typmod;
47 	int			ncolumns;
48 	ColumnIOData columns[FLEXIBLE_ARRAY_MEMBER];
49 } RecordIOData;
50 
51 /*
52  * structure to cache metadata needed for record comparison
53  */
54 typedef struct ColumnCompareData
55 {
56 	TypeCacheEntry *typentry;	/* has everything we need, actually */
57 } ColumnCompareData;
58 
59 typedef struct RecordCompareData
60 {
61 	int			ncolumns;		/* allocated length of columns[] */
62 	Oid			record1_type;
63 	int32		record1_typmod;
64 	Oid			record2_type;
65 	int32		record2_typmod;
66 	ColumnCompareData columns[FLEXIBLE_ARRAY_MEMBER];
67 } RecordCompareData;
68 
69 
70 /*
71  * record_in		- input routine for any composite type.
72  */
73 Datum
record_in(PG_FUNCTION_ARGS)74 record_in(PG_FUNCTION_ARGS)
75 {
76 	char	   *string = PG_GETARG_CSTRING(0);
77 	Oid			tupType = PG_GETARG_OID(1);
78 	int32		tupTypmod = PG_GETARG_INT32(2);
79 	HeapTupleHeader result;
80 	TupleDesc	tupdesc;
81 	HeapTuple	tuple;
82 	RecordIOData *my_extra;
83 	bool		needComma = false;
84 	int			ncolumns;
85 	int			i;
86 	char	   *ptr;
87 	Datum	   *values;
88 	bool	   *nulls;
89 	StringInfoData buf;
90 
91 	check_stack_depth();		/* recurses for record-type columns */
92 
93 	/*
94 	 * Give a friendly error message if we did not get enough info to identify
95 	 * the target record type.  (lookup_rowtype_tupdesc would fail anyway, but
96 	 * with a non-user-friendly message.)  In ordinary SQL usage, we'll get -1
97 	 * for typmod, since composite types and RECORD have no type modifiers at
98 	 * the SQL level, and thus must fail for RECORD.  However some callers can
99 	 * supply a valid typmod, and then we can do something useful for RECORD.
100 	 */
101 	if (tupType == RECORDOID && tupTypmod < 0)
102 		ereport(ERROR,
103 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
104 				 errmsg("input of anonymous composite types is not implemented")));
105 
106 	/*
107 	 * This comes from the composite type's pg_type.oid and stores system oids
108 	 * in user tables, specifically DatumTupleFields. This oid must be
109 	 * preserved by binary upgrades.
110 	 */
111 	tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
112 	ncolumns = tupdesc->natts;
113 
114 	/*
115 	 * We arrange to look up the needed I/O info just once per series of
116 	 * calls, assuming the record type doesn't change underneath us.
117 	 */
118 	my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
119 	if (my_extra == NULL ||
120 		my_extra->ncolumns != ncolumns)
121 	{
122 		fcinfo->flinfo->fn_extra =
123 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
124 							   offsetof(RecordIOData, columns) +
125 							   ncolumns * sizeof(ColumnIOData));
126 		my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
127 		my_extra->record_type = InvalidOid;
128 		my_extra->record_typmod = 0;
129 	}
130 
131 	if (my_extra->record_type != tupType ||
132 		my_extra->record_typmod != tupTypmod)
133 	{
134 		MemSet(my_extra, 0,
135 			   offsetof(RecordIOData, columns) +
136 			   ncolumns * sizeof(ColumnIOData));
137 		my_extra->record_type = tupType;
138 		my_extra->record_typmod = tupTypmod;
139 		my_extra->ncolumns = ncolumns;
140 	}
141 
142 	values = (Datum *) palloc(ncolumns * sizeof(Datum));
143 	nulls = (bool *) palloc(ncolumns * sizeof(bool));
144 
145 	/*
146 	 * Scan the string.  We use "buf" to accumulate the de-quoted data for
147 	 * each column, which is then fed to the appropriate input converter.
148 	 */
149 	ptr = string;
150 	/* Allow leading whitespace */
151 	while (*ptr && isspace((unsigned char) *ptr))
152 		ptr++;
153 	if (*ptr++ != '(')
154 		ereport(ERROR,
155 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
156 				 errmsg("malformed record literal: \"%s\"", string),
157 				 errdetail("Missing left parenthesis.")));
158 
159 	initStringInfo(&buf);
160 
161 	for (i = 0; i < ncolumns; i++)
162 	{
163 		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
164 		ColumnIOData *column_info = &my_extra->columns[i];
165 		Oid			column_type = att->atttypid;
166 		char	   *column_data;
167 
168 		/* Ignore dropped columns in datatype, but fill with nulls */
169 		if (att->attisdropped)
170 		{
171 			values[i] = (Datum) 0;
172 			nulls[i] = true;
173 			continue;
174 		}
175 
176 		if (needComma)
177 		{
178 			/* Skip comma that separates prior field from this one */
179 			if (*ptr == ',')
180 				ptr++;
181 			else
182 				/* *ptr must be ')' */
183 				ereport(ERROR,
184 						(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
185 						 errmsg("malformed record literal: \"%s\"", string),
186 						 errdetail("Too few columns.")));
187 		}
188 
189 		/* Check for null: completely empty input means null */
190 		if (*ptr == ',' || *ptr == ')')
191 		{
192 			column_data = NULL;
193 			nulls[i] = true;
194 		}
195 		else
196 		{
197 			/* Extract string for this column */
198 			bool		inquote = false;
199 
200 			resetStringInfo(&buf);
201 			while (inquote || !(*ptr == ',' || *ptr == ')'))
202 			{
203 				char		ch = *ptr++;
204 
205 				if (ch == '\0')
206 					ereport(ERROR,
207 							(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
208 							 errmsg("malformed record literal: \"%s\"",
209 									string),
210 							 errdetail("Unexpected end of input.")));
211 				if (ch == '\\')
212 				{
213 					if (*ptr == '\0')
214 						ereport(ERROR,
215 								(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
216 								 errmsg("malformed record literal: \"%s\"",
217 										string),
218 								 errdetail("Unexpected end of input.")));
219 					appendStringInfoChar(&buf, *ptr++);
220 				}
221 				else if (ch == '"')
222 				{
223 					if (!inquote)
224 						inquote = true;
225 					else if (*ptr == '"')
226 					{
227 						/* doubled quote within quote sequence */
228 						appendStringInfoChar(&buf, *ptr++);
229 					}
230 					else
231 						inquote = false;
232 				}
233 				else
234 					appendStringInfoChar(&buf, ch);
235 			}
236 
237 			column_data = buf.data;
238 			nulls[i] = false;
239 		}
240 
241 		/*
242 		 * Convert the column value
243 		 */
244 		if (column_info->column_type != column_type)
245 		{
246 			getTypeInputInfo(column_type,
247 							 &column_info->typiofunc,
248 							 &column_info->typioparam);
249 			fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
250 						  fcinfo->flinfo->fn_mcxt);
251 			column_info->column_type = column_type;
252 		}
253 
254 		values[i] = InputFunctionCall(&column_info->proc,
255 									  column_data,
256 									  column_info->typioparam,
257 									  att->atttypmod);
258 
259 		/*
260 		 * Prep for next column
261 		 */
262 		needComma = true;
263 	}
264 
265 	if (*ptr++ != ')')
266 		ereport(ERROR,
267 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
268 				 errmsg("malformed record literal: \"%s\"", string),
269 				 errdetail("Too many columns.")));
270 	/* Allow trailing whitespace */
271 	while (*ptr && isspace((unsigned char) *ptr))
272 		ptr++;
273 	if (*ptr)
274 		ereport(ERROR,
275 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
276 				 errmsg("malformed record literal: \"%s\"", string),
277 				 errdetail("Junk after right parenthesis.")));
278 
279 	tuple = heap_form_tuple(tupdesc, values, nulls);
280 
281 	/*
282 	 * We cannot return tuple->t_data because heap_form_tuple allocates it as
283 	 * part of a larger chunk, and our caller may expect to be able to pfree
284 	 * our result.  So must copy the info into a new palloc chunk.
285 	 */
286 	result = (HeapTupleHeader) palloc(tuple->t_len);
287 	memcpy(result, tuple->t_data, tuple->t_len);
288 
289 	heap_freetuple(tuple);
290 	pfree(buf.data);
291 	pfree(values);
292 	pfree(nulls);
293 	ReleaseTupleDesc(tupdesc);
294 
295 	PG_RETURN_HEAPTUPLEHEADER(result);
296 }
297 
298 /*
299  * record_out		- output routine for any composite type.
300  */
301 Datum
record_out(PG_FUNCTION_ARGS)302 record_out(PG_FUNCTION_ARGS)
303 {
304 	HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
305 	Oid			tupType;
306 	int32		tupTypmod;
307 	TupleDesc	tupdesc;
308 	HeapTupleData tuple;
309 	RecordIOData *my_extra;
310 	bool		needComma = false;
311 	int			ncolumns;
312 	int			i;
313 	Datum	   *values;
314 	bool	   *nulls;
315 	StringInfoData buf;
316 
317 	check_stack_depth();		/* recurses for record-type columns */
318 
319 	/* Extract type info from the tuple itself */
320 	tupType = HeapTupleHeaderGetTypeId(rec);
321 	tupTypmod = HeapTupleHeaderGetTypMod(rec);
322 	tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
323 	ncolumns = tupdesc->natts;
324 
325 	/* Build a temporary HeapTuple control structure */
326 	tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
327 	ItemPointerSetInvalid(&(tuple.t_self));
328 	tuple.t_tableOid = InvalidOid;
329 	tuple.t_data = rec;
330 
331 	/*
332 	 * We arrange to look up the needed I/O info just once per series of
333 	 * calls, assuming the record type doesn't change underneath us.
334 	 */
335 	my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
336 	if (my_extra == NULL ||
337 		my_extra->ncolumns != ncolumns)
338 	{
339 		fcinfo->flinfo->fn_extra =
340 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
341 							   offsetof(RecordIOData, columns) +
342 							   ncolumns * sizeof(ColumnIOData));
343 		my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
344 		my_extra->record_type = InvalidOid;
345 		my_extra->record_typmod = 0;
346 	}
347 
348 	if (my_extra->record_type != tupType ||
349 		my_extra->record_typmod != tupTypmod)
350 	{
351 		MemSet(my_extra, 0,
352 			   offsetof(RecordIOData, columns) +
353 			   ncolumns * sizeof(ColumnIOData));
354 		my_extra->record_type = tupType;
355 		my_extra->record_typmod = tupTypmod;
356 		my_extra->ncolumns = ncolumns;
357 	}
358 
359 	values = (Datum *) palloc(ncolumns * sizeof(Datum));
360 	nulls = (bool *) palloc(ncolumns * sizeof(bool));
361 
362 	/* Break down the tuple into fields */
363 	heap_deform_tuple(&tuple, tupdesc, values, nulls);
364 
365 	/* And build the result string */
366 	initStringInfo(&buf);
367 
368 	appendStringInfoChar(&buf, '(');
369 
370 	for (i = 0; i < ncolumns; i++)
371 	{
372 		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
373 		ColumnIOData *column_info = &my_extra->columns[i];
374 		Oid			column_type = att->atttypid;
375 		Datum		attr;
376 		char	   *value;
377 		char	   *tmp;
378 		bool		nq;
379 
380 		/* Ignore dropped columns in datatype */
381 		if (att->attisdropped)
382 			continue;
383 
384 		if (needComma)
385 			appendStringInfoChar(&buf, ',');
386 		needComma = true;
387 
388 		if (nulls[i])
389 		{
390 			/* emit nothing... */
391 			continue;
392 		}
393 
394 		/*
395 		 * Convert the column value to text
396 		 */
397 		if (column_info->column_type != column_type)
398 		{
399 			getTypeOutputInfo(column_type,
400 							  &column_info->typiofunc,
401 							  &column_info->typisvarlena);
402 			fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
403 						  fcinfo->flinfo->fn_mcxt);
404 			column_info->column_type = column_type;
405 		}
406 
407 		attr = values[i];
408 		value = OutputFunctionCall(&column_info->proc, attr);
409 
410 		/* Detect whether we need double quotes for this value */
411 		nq = (value[0] == '\0');	/* force quotes for empty string */
412 		for (tmp = value; *tmp; tmp++)
413 		{
414 			char		ch = *tmp;
415 
416 			if (ch == '"' || ch == '\\' ||
417 				ch == '(' || ch == ')' || ch == ',' ||
418 				isspace((unsigned char) ch))
419 			{
420 				nq = true;
421 				break;
422 			}
423 		}
424 
425 		/* And emit the string */
426 		if (nq)
427 			appendStringInfoCharMacro(&buf, '"');
428 		for (tmp = value; *tmp; tmp++)
429 		{
430 			char		ch = *tmp;
431 
432 			if (ch == '"' || ch == '\\')
433 				appendStringInfoCharMacro(&buf, ch);
434 			appendStringInfoCharMacro(&buf, ch);
435 		}
436 		if (nq)
437 			appendStringInfoCharMacro(&buf, '"');
438 	}
439 
440 	appendStringInfoChar(&buf, ')');
441 
442 	pfree(values);
443 	pfree(nulls);
444 	ReleaseTupleDesc(tupdesc);
445 
446 	PG_RETURN_CSTRING(buf.data);
447 }
448 
449 /*
450  * record_recv		- binary input routine for any composite type.
451  */
452 Datum
record_recv(PG_FUNCTION_ARGS)453 record_recv(PG_FUNCTION_ARGS)
454 {
455 	StringInfo	buf = (StringInfo) PG_GETARG_POINTER(0);
456 	Oid			tupType = PG_GETARG_OID(1);
457 	int32		tupTypmod = PG_GETARG_INT32(2);
458 	HeapTupleHeader result;
459 	TupleDesc	tupdesc;
460 	HeapTuple	tuple;
461 	RecordIOData *my_extra;
462 	int			ncolumns;
463 	int			usercols;
464 	int			validcols;
465 	int			i;
466 	Datum	   *values;
467 	bool	   *nulls;
468 
469 	check_stack_depth();		/* recurses for record-type columns */
470 
471 	/*
472 	 * Give a friendly error message if we did not get enough info to identify
473 	 * the target record type.  (lookup_rowtype_tupdesc would fail anyway, but
474 	 * with a non-user-friendly message.)  In ordinary SQL usage, we'll get -1
475 	 * for typmod, since composite types and RECORD have no type modifiers at
476 	 * the SQL level, and thus must fail for RECORD.  However some callers can
477 	 * supply a valid typmod, and then we can do something useful for RECORD.
478 	 */
479 	if (tupType == RECORDOID && tupTypmod < 0)
480 		ereport(ERROR,
481 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
482 				 errmsg("input of anonymous composite types is not implemented")));
483 
484 	tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
485 	ncolumns = tupdesc->natts;
486 
487 	/*
488 	 * We arrange to look up the needed I/O info just once per series of
489 	 * calls, assuming the record type doesn't change underneath us.
490 	 */
491 	my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
492 	if (my_extra == NULL ||
493 		my_extra->ncolumns != ncolumns)
494 	{
495 		fcinfo->flinfo->fn_extra =
496 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
497 							   offsetof(RecordIOData, columns) +
498 							   ncolumns * sizeof(ColumnIOData));
499 		my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
500 		my_extra->record_type = InvalidOid;
501 		my_extra->record_typmod = 0;
502 	}
503 
504 	if (my_extra->record_type != tupType ||
505 		my_extra->record_typmod != tupTypmod)
506 	{
507 		MemSet(my_extra, 0,
508 			   offsetof(RecordIOData, columns) +
509 			   ncolumns * sizeof(ColumnIOData));
510 		my_extra->record_type = tupType;
511 		my_extra->record_typmod = tupTypmod;
512 		my_extra->ncolumns = ncolumns;
513 	}
514 
515 	values = (Datum *) palloc(ncolumns * sizeof(Datum));
516 	nulls = (bool *) palloc(ncolumns * sizeof(bool));
517 
518 	/* Fetch number of columns user thinks it has */
519 	usercols = pq_getmsgint(buf, 4);
520 
521 	/* Need to scan to count nondeleted columns */
522 	validcols = 0;
523 	for (i = 0; i < ncolumns; i++)
524 	{
525 		if (!TupleDescAttr(tupdesc, i)->attisdropped)
526 			validcols++;
527 	}
528 	if (usercols != validcols)
529 		ereport(ERROR,
530 				(errcode(ERRCODE_DATATYPE_MISMATCH),
531 				 errmsg("wrong number of columns: %d, expected %d",
532 						usercols, validcols)));
533 
534 	/* Process each column */
535 	for (i = 0; i < ncolumns; i++)
536 	{
537 		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
538 		ColumnIOData *column_info = &my_extra->columns[i];
539 		Oid			column_type = att->atttypid;
540 		Oid			coltypoid;
541 		int			itemlen;
542 		StringInfoData item_buf;
543 		StringInfo	bufptr;
544 		char		csave;
545 
546 		/* Ignore dropped columns in datatype, but fill with nulls */
547 		if (att->attisdropped)
548 		{
549 			values[i] = (Datum) 0;
550 			nulls[i] = true;
551 			continue;
552 		}
553 
554 		/* Verify column datatype */
555 		coltypoid = pq_getmsgint(buf, sizeof(Oid));
556 		if (coltypoid != column_type)
557 			ereport(ERROR,
558 					(errcode(ERRCODE_DATATYPE_MISMATCH),
559 					 errmsg("wrong data type: %u, expected %u",
560 							coltypoid, column_type)));
561 
562 		/* Get and check the item length */
563 		itemlen = pq_getmsgint(buf, 4);
564 		if (itemlen < -1 || itemlen > (buf->len - buf->cursor))
565 			ereport(ERROR,
566 					(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
567 					 errmsg("insufficient data left in message")));
568 
569 		if (itemlen == -1)
570 		{
571 			/* -1 length means NULL */
572 			bufptr = NULL;
573 			nulls[i] = true;
574 			csave = 0;			/* keep compiler quiet */
575 		}
576 		else
577 		{
578 			/*
579 			 * Rather than copying data around, we just set up a phony
580 			 * StringInfo pointing to the correct portion of the input buffer.
581 			 * We assume we can scribble on the input buffer so as to maintain
582 			 * the convention that StringInfos have a trailing null.
583 			 */
584 			item_buf.data = &buf->data[buf->cursor];
585 			item_buf.maxlen = itemlen + 1;
586 			item_buf.len = itemlen;
587 			item_buf.cursor = 0;
588 
589 			buf->cursor += itemlen;
590 
591 			csave = buf->data[buf->cursor];
592 			buf->data[buf->cursor] = '\0';
593 
594 			bufptr = &item_buf;
595 			nulls[i] = false;
596 		}
597 
598 		/* Now call the column's receiveproc */
599 		if (column_info->column_type != column_type)
600 		{
601 			getTypeBinaryInputInfo(column_type,
602 								   &column_info->typiofunc,
603 								   &column_info->typioparam);
604 			fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
605 						  fcinfo->flinfo->fn_mcxt);
606 			column_info->column_type = column_type;
607 		}
608 
609 		values[i] = ReceiveFunctionCall(&column_info->proc,
610 										bufptr,
611 										column_info->typioparam,
612 										att->atttypmod);
613 
614 		if (bufptr)
615 		{
616 			/* Trouble if it didn't eat the whole buffer */
617 			if (item_buf.cursor != itemlen)
618 				ereport(ERROR,
619 						(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
620 						 errmsg("improper binary format in record column %d",
621 								i + 1)));
622 
623 			buf->data[buf->cursor] = csave;
624 		}
625 	}
626 
627 	tuple = heap_form_tuple(tupdesc, values, nulls);
628 
629 	/*
630 	 * We cannot return tuple->t_data because heap_form_tuple allocates it as
631 	 * part of a larger chunk, and our caller may expect to be able to pfree
632 	 * our result.  So must copy the info into a new palloc chunk.
633 	 */
634 	result = (HeapTupleHeader) palloc(tuple->t_len);
635 	memcpy(result, tuple->t_data, tuple->t_len);
636 
637 	heap_freetuple(tuple);
638 	pfree(values);
639 	pfree(nulls);
640 	ReleaseTupleDesc(tupdesc);
641 
642 	PG_RETURN_HEAPTUPLEHEADER(result);
643 }
644 
645 /*
646  * record_send		- binary output routine for any composite type.
647  */
648 Datum
record_send(PG_FUNCTION_ARGS)649 record_send(PG_FUNCTION_ARGS)
650 {
651 	HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
652 	Oid			tupType;
653 	int32		tupTypmod;
654 	TupleDesc	tupdesc;
655 	HeapTupleData tuple;
656 	RecordIOData *my_extra;
657 	int			ncolumns;
658 	int			validcols;
659 	int			i;
660 	Datum	   *values;
661 	bool	   *nulls;
662 	StringInfoData buf;
663 
664 	check_stack_depth();		/* recurses for record-type columns */
665 
666 	/* Extract type info from the tuple itself */
667 	tupType = HeapTupleHeaderGetTypeId(rec);
668 	tupTypmod = HeapTupleHeaderGetTypMod(rec);
669 	tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
670 	ncolumns = tupdesc->natts;
671 
672 	/* Build a temporary HeapTuple control structure */
673 	tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
674 	ItemPointerSetInvalid(&(tuple.t_self));
675 	tuple.t_tableOid = InvalidOid;
676 	tuple.t_data = rec;
677 
678 	/*
679 	 * We arrange to look up the needed I/O info just once per series of
680 	 * calls, assuming the record type doesn't change underneath us.
681 	 */
682 	my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
683 	if (my_extra == NULL ||
684 		my_extra->ncolumns != ncolumns)
685 	{
686 		fcinfo->flinfo->fn_extra =
687 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
688 							   offsetof(RecordIOData, columns) +
689 							   ncolumns * sizeof(ColumnIOData));
690 		my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
691 		my_extra->record_type = InvalidOid;
692 		my_extra->record_typmod = 0;
693 	}
694 
695 	if (my_extra->record_type != tupType ||
696 		my_extra->record_typmod != tupTypmod)
697 	{
698 		MemSet(my_extra, 0,
699 			   offsetof(RecordIOData, columns) +
700 			   ncolumns * sizeof(ColumnIOData));
701 		my_extra->record_type = tupType;
702 		my_extra->record_typmod = tupTypmod;
703 		my_extra->ncolumns = ncolumns;
704 	}
705 
706 	values = (Datum *) palloc(ncolumns * sizeof(Datum));
707 	nulls = (bool *) palloc(ncolumns * sizeof(bool));
708 
709 	/* Break down the tuple into fields */
710 	heap_deform_tuple(&tuple, tupdesc, values, nulls);
711 
712 	/* And build the result string */
713 	pq_begintypsend(&buf);
714 
715 	/* Need to scan to count nondeleted columns */
716 	validcols = 0;
717 	for (i = 0; i < ncolumns; i++)
718 	{
719 		if (!TupleDescAttr(tupdesc, i)->attisdropped)
720 			validcols++;
721 	}
722 	pq_sendint32(&buf, validcols);
723 
724 	for (i = 0; i < ncolumns; i++)
725 	{
726 		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
727 		ColumnIOData *column_info = &my_extra->columns[i];
728 		Oid			column_type = att->atttypid;
729 		Datum		attr;
730 		bytea	   *outputbytes;
731 
732 		/* Ignore dropped columns in datatype */
733 		if (att->attisdropped)
734 			continue;
735 
736 		pq_sendint32(&buf, column_type);
737 
738 		if (nulls[i])
739 		{
740 			/* emit -1 data length to signify a NULL */
741 			pq_sendint32(&buf, -1);
742 			continue;
743 		}
744 
745 		/*
746 		 * Convert the column value to binary
747 		 */
748 		if (column_info->column_type != column_type)
749 		{
750 			getTypeBinaryOutputInfo(column_type,
751 									&column_info->typiofunc,
752 									&column_info->typisvarlena);
753 			fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
754 						  fcinfo->flinfo->fn_mcxt);
755 			column_info->column_type = column_type;
756 		}
757 
758 		attr = values[i];
759 		outputbytes = SendFunctionCall(&column_info->proc, attr);
760 		pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ);
761 		pq_sendbytes(&buf, VARDATA(outputbytes),
762 					 VARSIZE(outputbytes) - VARHDRSZ);
763 	}
764 
765 	pfree(values);
766 	pfree(nulls);
767 	ReleaseTupleDesc(tupdesc);
768 
769 	PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
770 }
771 
772 
773 /*
774  * record_cmp()
775  * Internal comparison function for records.
776  *
777  * Returns -1, 0 or 1
778  *
779  * Do not assume that the two inputs are exactly the same record type;
780  * for instance we might be comparing an anonymous ROW() construct against a
781  * named composite type.  We will compare as long as they have the same number
782  * of non-dropped columns of the same types.
783  */
784 static int
record_cmp(FunctionCallInfo fcinfo)785 record_cmp(FunctionCallInfo fcinfo)
786 {
787 	HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
788 	HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
789 	int			result = 0;
790 	Oid			tupType1;
791 	Oid			tupType2;
792 	int32		tupTypmod1;
793 	int32		tupTypmod2;
794 	TupleDesc	tupdesc1;
795 	TupleDesc	tupdesc2;
796 	HeapTupleData tuple1;
797 	HeapTupleData tuple2;
798 	int			ncolumns1;
799 	int			ncolumns2;
800 	RecordCompareData *my_extra;
801 	int			ncols;
802 	Datum	   *values1;
803 	Datum	   *values2;
804 	bool	   *nulls1;
805 	bool	   *nulls2;
806 	int			i1;
807 	int			i2;
808 	int			j;
809 
810 	check_stack_depth();		/* recurses for record-type columns */
811 
812 	/* Extract type info from the tuples */
813 	tupType1 = HeapTupleHeaderGetTypeId(record1);
814 	tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
815 	tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
816 	ncolumns1 = tupdesc1->natts;
817 	tupType2 = HeapTupleHeaderGetTypeId(record2);
818 	tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
819 	tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
820 	ncolumns2 = tupdesc2->natts;
821 
822 	/* Build temporary HeapTuple control structures */
823 	tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
824 	ItemPointerSetInvalid(&(tuple1.t_self));
825 	tuple1.t_tableOid = InvalidOid;
826 	tuple1.t_data = record1;
827 	tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
828 	ItemPointerSetInvalid(&(tuple2.t_self));
829 	tuple2.t_tableOid = InvalidOid;
830 	tuple2.t_data = record2;
831 
832 	/*
833 	 * We arrange to look up the needed comparison info just once per series
834 	 * of calls, assuming the record types don't change underneath us.
835 	 */
836 	ncols = Max(ncolumns1, ncolumns2);
837 	my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
838 	if (my_extra == NULL ||
839 		my_extra->ncolumns < ncols)
840 	{
841 		fcinfo->flinfo->fn_extra =
842 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
843 							   offsetof(RecordCompareData, columns) +
844 							   ncols * sizeof(ColumnCompareData));
845 		my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
846 		my_extra->ncolumns = ncols;
847 		my_extra->record1_type = InvalidOid;
848 		my_extra->record1_typmod = 0;
849 		my_extra->record2_type = InvalidOid;
850 		my_extra->record2_typmod = 0;
851 	}
852 
853 	if (my_extra->record1_type != tupType1 ||
854 		my_extra->record1_typmod != tupTypmod1 ||
855 		my_extra->record2_type != tupType2 ||
856 		my_extra->record2_typmod != tupTypmod2)
857 	{
858 		MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
859 		my_extra->record1_type = tupType1;
860 		my_extra->record1_typmod = tupTypmod1;
861 		my_extra->record2_type = tupType2;
862 		my_extra->record2_typmod = tupTypmod2;
863 	}
864 
865 	/* Break down the tuples into fields */
866 	values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
867 	nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
868 	heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
869 	values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
870 	nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
871 	heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
872 
873 	/*
874 	 * Scan corresponding columns, allowing for dropped columns in different
875 	 * places in the two rows.  i1 and i2 are physical column indexes, j is
876 	 * the logical column index.
877 	 */
878 	i1 = i2 = j = 0;
879 	while (i1 < ncolumns1 || i2 < ncolumns2)
880 	{
881 		Form_pg_attribute att1;
882 		Form_pg_attribute att2;
883 		TypeCacheEntry *typentry;
884 		Oid			collation;
885 
886 		/*
887 		 * Skip dropped columns
888 		 */
889 		if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
890 		{
891 			i1++;
892 			continue;
893 		}
894 		if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
895 		{
896 			i2++;
897 			continue;
898 		}
899 		if (i1 >= ncolumns1 || i2 >= ncolumns2)
900 			break;				/* we'll deal with mismatch below loop */
901 
902 		att1 = TupleDescAttr(tupdesc1, i1);
903 		att2 = TupleDescAttr(tupdesc2, i2);
904 
905 		/*
906 		 * Have two matching columns, they must be same type
907 		 */
908 		if (att1->atttypid != att2->atttypid)
909 			ereport(ERROR,
910 					(errcode(ERRCODE_DATATYPE_MISMATCH),
911 					 errmsg("cannot compare dissimilar column types %s and %s at record column %d",
912 							format_type_be(att1->atttypid),
913 							format_type_be(att2->atttypid),
914 							j + 1)));
915 
916 		/*
917 		 * If they're not same collation, we don't complain here, but the
918 		 * comparison function might.
919 		 */
920 		collation = att1->attcollation;
921 		if (collation != att2->attcollation)
922 			collation = InvalidOid;
923 
924 		/*
925 		 * Lookup the comparison function if not done already
926 		 */
927 		typentry = my_extra->columns[j].typentry;
928 		if (typentry == NULL ||
929 			typentry->type_id != att1->atttypid)
930 		{
931 			typentry = lookup_type_cache(att1->atttypid,
932 										 TYPECACHE_CMP_PROC_FINFO);
933 			if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid))
934 				ereport(ERROR,
935 						(errcode(ERRCODE_UNDEFINED_FUNCTION),
936 						 errmsg("could not identify a comparison function for type %s",
937 								format_type_be(typentry->type_id))));
938 			my_extra->columns[j].typentry = typentry;
939 		}
940 
941 		/*
942 		 * We consider two NULLs equal; NULL > not-NULL.
943 		 */
944 		if (!nulls1[i1] || !nulls2[i2])
945 		{
946 			LOCAL_FCINFO(locfcinfo, 2);
947 			int32		cmpresult;
948 
949 			if (nulls1[i1])
950 			{
951 				/* arg1 is greater than arg2 */
952 				result = 1;
953 				break;
954 			}
955 			if (nulls2[i2])
956 			{
957 				/* arg1 is less than arg2 */
958 				result = -1;
959 				break;
960 			}
961 
962 			/* Compare the pair of elements */
963 			InitFunctionCallInfoData(*locfcinfo, &typentry->cmp_proc_finfo, 2,
964 									 collation, NULL, NULL);
965 			locfcinfo->args[0].value = values1[i1];
966 			locfcinfo->args[0].isnull = false;
967 			locfcinfo->args[1].value = values2[i2];
968 			locfcinfo->args[1].isnull = false;
969 			locfcinfo->isnull = false;
970 			cmpresult = DatumGetInt32(FunctionCallInvoke(locfcinfo));
971 
972 			if (cmpresult < 0)
973 			{
974 				/* arg1 is less than arg2 */
975 				result = -1;
976 				break;
977 			}
978 			else if (cmpresult > 0)
979 			{
980 				/* arg1 is greater than arg2 */
981 				result = 1;
982 				break;
983 			}
984 		}
985 
986 		/* equal, so continue to next column */
987 		i1++, i2++, j++;
988 	}
989 
990 	/*
991 	 * If we didn't break out of the loop early, check for column count
992 	 * mismatch.  (We do not report such mismatch if we found unequal column
993 	 * values; is that a feature or a bug?)
994 	 */
995 	if (result == 0)
996 	{
997 		if (i1 != ncolumns1 || i2 != ncolumns2)
998 			ereport(ERROR,
999 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1000 					 errmsg("cannot compare record types with different numbers of columns")));
1001 	}
1002 
1003 	pfree(values1);
1004 	pfree(nulls1);
1005 	pfree(values2);
1006 	pfree(nulls2);
1007 	ReleaseTupleDesc(tupdesc1);
1008 	ReleaseTupleDesc(tupdesc2);
1009 
1010 	/* Avoid leaking memory when handed toasted input. */
1011 	PG_FREE_IF_COPY(record1, 0);
1012 	PG_FREE_IF_COPY(record2, 1);
1013 
1014 	return result;
1015 }
1016 
1017 /*
1018  * record_eq :
1019  *		  compares two records for equality
1020  * result :
1021  *		  returns true if the records are equal, false otherwise.
1022  *
1023  * Note: we do not use record_cmp here, since equality may be meaningful in
1024  * datatypes that don't have a total ordering (and hence no btree support).
1025  */
1026 Datum
record_eq(PG_FUNCTION_ARGS)1027 record_eq(PG_FUNCTION_ARGS)
1028 {
1029 	HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1030 	HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1031 	bool		result = true;
1032 	Oid			tupType1;
1033 	Oid			tupType2;
1034 	int32		tupTypmod1;
1035 	int32		tupTypmod2;
1036 	TupleDesc	tupdesc1;
1037 	TupleDesc	tupdesc2;
1038 	HeapTupleData tuple1;
1039 	HeapTupleData tuple2;
1040 	int			ncolumns1;
1041 	int			ncolumns2;
1042 	RecordCompareData *my_extra;
1043 	int			ncols;
1044 	Datum	   *values1;
1045 	Datum	   *values2;
1046 	bool	   *nulls1;
1047 	bool	   *nulls2;
1048 	int			i1;
1049 	int			i2;
1050 	int			j;
1051 
1052 	check_stack_depth();		/* recurses for record-type columns */
1053 
1054 	/* Extract type info from the tuples */
1055 	tupType1 = HeapTupleHeaderGetTypeId(record1);
1056 	tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1057 	tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1058 	ncolumns1 = tupdesc1->natts;
1059 	tupType2 = HeapTupleHeaderGetTypeId(record2);
1060 	tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1061 	tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1062 	ncolumns2 = tupdesc2->natts;
1063 
1064 	/* Build temporary HeapTuple control structures */
1065 	tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1066 	ItemPointerSetInvalid(&(tuple1.t_self));
1067 	tuple1.t_tableOid = InvalidOid;
1068 	tuple1.t_data = record1;
1069 	tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1070 	ItemPointerSetInvalid(&(tuple2.t_self));
1071 	tuple2.t_tableOid = InvalidOid;
1072 	tuple2.t_data = record2;
1073 
1074 	/*
1075 	 * We arrange to look up the needed comparison info just once per series
1076 	 * of calls, assuming the record types don't change underneath us.
1077 	 */
1078 	ncols = Max(ncolumns1, ncolumns2);
1079 	my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1080 	if (my_extra == NULL ||
1081 		my_extra->ncolumns < ncols)
1082 	{
1083 		fcinfo->flinfo->fn_extra =
1084 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1085 							   offsetof(RecordCompareData, columns) +
1086 							   ncols * sizeof(ColumnCompareData));
1087 		my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1088 		my_extra->ncolumns = ncols;
1089 		my_extra->record1_type = InvalidOid;
1090 		my_extra->record1_typmod = 0;
1091 		my_extra->record2_type = InvalidOid;
1092 		my_extra->record2_typmod = 0;
1093 	}
1094 
1095 	if (my_extra->record1_type != tupType1 ||
1096 		my_extra->record1_typmod != tupTypmod1 ||
1097 		my_extra->record2_type != tupType2 ||
1098 		my_extra->record2_typmod != tupTypmod2)
1099 	{
1100 		MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1101 		my_extra->record1_type = tupType1;
1102 		my_extra->record1_typmod = tupTypmod1;
1103 		my_extra->record2_type = tupType2;
1104 		my_extra->record2_typmod = tupTypmod2;
1105 	}
1106 
1107 	/* Break down the tuples into fields */
1108 	values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1109 	nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1110 	heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1111 	values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1112 	nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1113 	heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1114 
1115 	/*
1116 	 * Scan corresponding columns, allowing for dropped columns in different
1117 	 * places in the two rows.  i1 and i2 are physical column indexes, j is
1118 	 * the logical column index.
1119 	 */
1120 	i1 = i2 = j = 0;
1121 	while (i1 < ncolumns1 || i2 < ncolumns2)
1122 	{
1123 		LOCAL_FCINFO(locfcinfo, 2);
1124 		Form_pg_attribute att1;
1125 		Form_pg_attribute att2;
1126 		TypeCacheEntry *typentry;
1127 		Oid			collation;
1128 		bool		oprresult;
1129 
1130 		/*
1131 		 * Skip dropped columns
1132 		 */
1133 		if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1134 		{
1135 			i1++;
1136 			continue;
1137 		}
1138 		if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1139 		{
1140 			i2++;
1141 			continue;
1142 		}
1143 		if (i1 >= ncolumns1 || i2 >= ncolumns2)
1144 			break;				/* we'll deal with mismatch below loop */
1145 
1146 		att1 = TupleDescAttr(tupdesc1, i1);
1147 		att2 = TupleDescAttr(tupdesc2, i2);
1148 
1149 		/*
1150 		 * Have two matching columns, they must be same type
1151 		 */
1152 		if (att1->atttypid != att2->atttypid)
1153 			ereport(ERROR,
1154 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1155 					 errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1156 							format_type_be(att1->atttypid),
1157 							format_type_be(att2->atttypid),
1158 							j + 1)));
1159 
1160 		/*
1161 		 * If they're not same collation, we don't complain here, but the
1162 		 * equality function might.
1163 		 */
1164 		collation = att1->attcollation;
1165 		if (collation != att2->attcollation)
1166 			collation = InvalidOid;
1167 
1168 		/*
1169 		 * Lookup the equality function if not done already
1170 		 */
1171 		typentry = my_extra->columns[j].typentry;
1172 		if (typentry == NULL ||
1173 			typentry->type_id != att1->atttypid)
1174 		{
1175 			typentry = lookup_type_cache(att1->atttypid,
1176 										 TYPECACHE_EQ_OPR_FINFO);
1177 			if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
1178 				ereport(ERROR,
1179 						(errcode(ERRCODE_UNDEFINED_FUNCTION),
1180 						 errmsg("could not identify an equality operator for type %s",
1181 								format_type_be(typentry->type_id))));
1182 			my_extra->columns[j].typentry = typentry;
1183 		}
1184 
1185 		/*
1186 		 * We consider two NULLs equal; NULL > not-NULL.
1187 		 */
1188 		if (!nulls1[i1] || !nulls2[i2])
1189 		{
1190 			if (nulls1[i1] || nulls2[i2])
1191 			{
1192 				result = false;
1193 				break;
1194 			}
1195 
1196 			/* Compare the pair of elements */
1197 			InitFunctionCallInfoData(*locfcinfo, &typentry->eq_opr_finfo, 2,
1198 									 collation, NULL, NULL);
1199 			locfcinfo->args[0].value = values1[i1];
1200 			locfcinfo->args[0].isnull = false;
1201 			locfcinfo->args[1].value = values2[i2];
1202 			locfcinfo->args[1].isnull = false;
1203 			locfcinfo->isnull = false;
1204 			oprresult = DatumGetBool(FunctionCallInvoke(locfcinfo));
1205 			if (!oprresult)
1206 			{
1207 				result = false;
1208 				break;
1209 			}
1210 		}
1211 
1212 		/* equal, so continue to next column */
1213 		i1++, i2++, j++;
1214 	}
1215 
1216 	/*
1217 	 * If we didn't break out of the loop early, check for column count
1218 	 * mismatch.  (We do not report such mismatch if we found unequal column
1219 	 * values; is that a feature or a bug?)
1220 	 */
1221 	if (result)
1222 	{
1223 		if (i1 != ncolumns1 || i2 != ncolumns2)
1224 			ereport(ERROR,
1225 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1226 					 errmsg("cannot compare record types with different numbers of columns")));
1227 	}
1228 
1229 	pfree(values1);
1230 	pfree(nulls1);
1231 	pfree(values2);
1232 	pfree(nulls2);
1233 	ReleaseTupleDesc(tupdesc1);
1234 	ReleaseTupleDesc(tupdesc2);
1235 
1236 	/* Avoid leaking memory when handed toasted input. */
1237 	PG_FREE_IF_COPY(record1, 0);
1238 	PG_FREE_IF_COPY(record2, 1);
1239 
1240 	PG_RETURN_BOOL(result);
1241 }
1242 
1243 Datum
record_ne(PG_FUNCTION_ARGS)1244 record_ne(PG_FUNCTION_ARGS)
1245 {
1246 	PG_RETURN_BOOL(!DatumGetBool(record_eq(fcinfo)));
1247 }
1248 
1249 Datum
record_lt(PG_FUNCTION_ARGS)1250 record_lt(PG_FUNCTION_ARGS)
1251 {
1252 	PG_RETURN_BOOL(record_cmp(fcinfo) < 0);
1253 }
1254 
1255 Datum
record_gt(PG_FUNCTION_ARGS)1256 record_gt(PG_FUNCTION_ARGS)
1257 {
1258 	PG_RETURN_BOOL(record_cmp(fcinfo) > 0);
1259 }
1260 
1261 Datum
record_le(PG_FUNCTION_ARGS)1262 record_le(PG_FUNCTION_ARGS)
1263 {
1264 	PG_RETURN_BOOL(record_cmp(fcinfo) <= 0);
1265 }
1266 
1267 Datum
record_ge(PG_FUNCTION_ARGS)1268 record_ge(PG_FUNCTION_ARGS)
1269 {
1270 	PG_RETURN_BOOL(record_cmp(fcinfo) >= 0);
1271 }
1272 
1273 Datum
btrecordcmp(PG_FUNCTION_ARGS)1274 btrecordcmp(PG_FUNCTION_ARGS)
1275 {
1276 	PG_RETURN_INT32(record_cmp(fcinfo));
1277 }
1278 
1279 
1280 /*
1281  * record_image_cmp :
1282  * Internal byte-oriented comparison function for records.
1283  *
1284  * Returns -1, 0 or 1
1285  *
1286  * Note: The normal concepts of "equality" do not apply here; different
1287  * representation of values considered to be equal are not considered to be
1288  * identical.  As an example, for the citext type 'A' and 'a' are equal, but
1289  * they are not identical.
1290  */
1291 static int
record_image_cmp(FunctionCallInfo fcinfo)1292 record_image_cmp(FunctionCallInfo fcinfo)
1293 {
1294 	HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1295 	HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1296 	int			result = 0;
1297 	Oid			tupType1;
1298 	Oid			tupType2;
1299 	int32		tupTypmod1;
1300 	int32		tupTypmod2;
1301 	TupleDesc	tupdesc1;
1302 	TupleDesc	tupdesc2;
1303 	HeapTupleData tuple1;
1304 	HeapTupleData tuple2;
1305 	int			ncolumns1;
1306 	int			ncolumns2;
1307 	RecordCompareData *my_extra;
1308 	int			ncols;
1309 	Datum	   *values1;
1310 	Datum	   *values2;
1311 	bool	   *nulls1;
1312 	bool	   *nulls2;
1313 	int			i1;
1314 	int			i2;
1315 	int			j;
1316 
1317 	/* Extract type info from the tuples */
1318 	tupType1 = HeapTupleHeaderGetTypeId(record1);
1319 	tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1320 	tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1321 	ncolumns1 = tupdesc1->natts;
1322 	tupType2 = HeapTupleHeaderGetTypeId(record2);
1323 	tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1324 	tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1325 	ncolumns2 = tupdesc2->natts;
1326 
1327 	/* Build temporary HeapTuple control structures */
1328 	tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1329 	ItemPointerSetInvalid(&(tuple1.t_self));
1330 	tuple1.t_tableOid = InvalidOid;
1331 	tuple1.t_data = record1;
1332 	tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1333 	ItemPointerSetInvalid(&(tuple2.t_self));
1334 	tuple2.t_tableOid = InvalidOid;
1335 	tuple2.t_data = record2;
1336 
1337 	/*
1338 	 * We arrange to look up the needed comparison info just once per series
1339 	 * of calls, assuming the record types don't change underneath us.
1340 	 */
1341 	ncols = Max(ncolumns1, ncolumns2);
1342 	my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1343 	if (my_extra == NULL ||
1344 		my_extra->ncolumns < ncols)
1345 	{
1346 		fcinfo->flinfo->fn_extra =
1347 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1348 							   offsetof(RecordCompareData, columns) +
1349 							   ncols * sizeof(ColumnCompareData));
1350 		my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1351 		my_extra->ncolumns = ncols;
1352 		my_extra->record1_type = InvalidOid;
1353 		my_extra->record1_typmod = 0;
1354 		my_extra->record2_type = InvalidOid;
1355 		my_extra->record2_typmod = 0;
1356 	}
1357 
1358 	if (my_extra->record1_type != tupType1 ||
1359 		my_extra->record1_typmod != tupTypmod1 ||
1360 		my_extra->record2_type != tupType2 ||
1361 		my_extra->record2_typmod != tupTypmod2)
1362 	{
1363 		MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1364 		my_extra->record1_type = tupType1;
1365 		my_extra->record1_typmod = tupTypmod1;
1366 		my_extra->record2_type = tupType2;
1367 		my_extra->record2_typmod = tupTypmod2;
1368 	}
1369 
1370 	/* Break down the tuples into fields */
1371 	values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1372 	nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1373 	heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1374 	values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1375 	nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1376 	heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1377 
1378 	/*
1379 	 * Scan corresponding columns, allowing for dropped columns in different
1380 	 * places in the two rows.  i1 and i2 are physical column indexes, j is
1381 	 * the logical column index.
1382 	 */
1383 	i1 = i2 = j = 0;
1384 	while (i1 < ncolumns1 || i2 < ncolumns2)
1385 	{
1386 		Form_pg_attribute att1;
1387 		Form_pg_attribute att2;
1388 
1389 		/*
1390 		 * Skip dropped columns
1391 		 */
1392 		if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1393 		{
1394 			i1++;
1395 			continue;
1396 		}
1397 		if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1398 		{
1399 			i2++;
1400 			continue;
1401 		}
1402 		if (i1 >= ncolumns1 || i2 >= ncolumns2)
1403 			break;				/* we'll deal with mismatch below loop */
1404 
1405 		att1 = TupleDescAttr(tupdesc1, i1);
1406 		att2 = TupleDescAttr(tupdesc2, i2);
1407 
1408 		/*
1409 		 * Have two matching columns, they must be same type
1410 		 */
1411 		if (att1->atttypid != att2->atttypid)
1412 			ereport(ERROR,
1413 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1414 					 errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1415 							format_type_be(att1->atttypid),
1416 							format_type_be(att2->atttypid),
1417 							j + 1)));
1418 
1419 		/*
1420 		 * The same type should have the same length (or both should be
1421 		 * variable).
1422 		 */
1423 		Assert(att1->attlen == att2->attlen);
1424 
1425 		/*
1426 		 * We consider two NULLs equal; NULL > not-NULL.
1427 		 */
1428 		if (!nulls1[i1] || !nulls2[i2])
1429 		{
1430 			int			cmpresult = 0;
1431 
1432 			if (nulls1[i1])
1433 			{
1434 				/* arg1 is greater than arg2 */
1435 				result = 1;
1436 				break;
1437 			}
1438 			if (nulls2[i2])
1439 			{
1440 				/* arg1 is less than arg2 */
1441 				result = -1;
1442 				break;
1443 			}
1444 
1445 			/* Compare the pair of elements */
1446 			if (att1->attbyval)
1447 			{
1448 				if (values1[i1] != values2[i2])
1449 					cmpresult = (values1[i1] < values2[i2]) ? -1 : 1;
1450 			}
1451 			else if (att1->attlen > 0)
1452 			{
1453 				cmpresult = memcmp(DatumGetPointer(values1[i1]),
1454 								   DatumGetPointer(values2[i2]),
1455 								   att1->attlen);
1456 			}
1457 			else if (att1->attlen == -1)
1458 			{
1459 				Size		len1,
1460 							len2;
1461 				struct varlena *arg1val;
1462 				struct varlena *arg2val;
1463 
1464 				len1 = toast_raw_datum_size(values1[i1]);
1465 				len2 = toast_raw_datum_size(values2[i2]);
1466 				arg1val = PG_DETOAST_DATUM_PACKED(values1[i1]);
1467 				arg2val = PG_DETOAST_DATUM_PACKED(values2[i2]);
1468 
1469 				cmpresult = memcmp(VARDATA_ANY(arg1val),
1470 								   VARDATA_ANY(arg2val),
1471 								   Min(len1, len2) - VARHDRSZ);
1472 				if ((cmpresult == 0) && (len1 != len2))
1473 					cmpresult = (len1 < len2) ? -1 : 1;
1474 
1475 				if ((Pointer) arg1val != (Pointer) values1[i1])
1476 					pfree(arg1val);
1477 				if ((Pointer) arg2val != (Pointer) values2[i2])
1478 					pfree(arg2val);
1479 			}
1480 			else
1481 				elog(ERROR, "unexpected attlen: %d", att1->attlen);
1482 
1483 			if (cmpresult < 0)
1484 			{
1485 				/* arg1 is less than arg2 */
1486 				result = -1;
1487 				break;
1488 			}
1489 			else if (cmpresult > 0)
1490 			{
1491 				/* arg1 is greater than arg2 */
1492 				result = 1;
1493 				break;
1494 			}
1495 		}
1496 
1497 		/* equal, so continue to next column */
1498 		i1++, i2++, j++;
1499 	}
1500 
1501 	/*
1502 	 * If we didn't break out of the loop early, check for column count
1503 	 * mismatch.  (We do not report such mismatch if we found unequal column
1504 	 * values; is that a feature or a bug?)
1505 	 */
1506 	if (result == 0)
1507 	{
1508 		if (i1 != ncolumns1 || i2 != ncolumns2)
1509 			ereport(ERROR,
1510 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1511 					 errmsg("cannot compare record types with different numbers of columns")));
1512 	}
1513 
1514 	pfree(values1);
1515 	pfree(nulls1);
1516 	pfree(values2);
1517 	pfree(nulls2);
1518 	ReleaseTupleDesc(tupdesc1);
1519 	ReleaseTupleDesc(tupdesc2);
1520 
1521 	/* Avoid leaking memory when handed toasted input. */
1522 	PG_FREE_IF_COPY(record1, 0);
1523 	PG_FREE_IF_COPY(record2, 1);
1524 
1525 	return result;
1526 }
1527 
1528 /*
1529  * record_image_eq :
1530  *		  compares two records for identical contents, based on byte images
1531  * result :
1532  *		  returns true if the records are identical, false otherwise.
1533  *
1534  * Note: we do not use record_image_cmp here, since we can avoid
1535  * de-toasting for unequal lengths this way.
1536  */
1537 Datum
record_image_eq(PG_FUNCTION_ARGS)1538 record_image_eq(PG_FUNCTION_ARGS)
1539 {
1540 	HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1541 	HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1542 	bool		result = true;
1543 	Oid			tupType1;
1544 	Oid			tupType2;
1545 	int32		tupTypmod1;
1546 	int32		tupTypmod2;
1547 	TupleDesc	tupdesc1;
1548 	TupleDesc	tupdesc2;
1549 	HeapTupleData tuple1;
1550 	HeapTupleData tuple2;
1551 	int			ncolumns1;
1552 	int			ncolumns2;
1553 	RecordCompareData *my_extra;
1554 	int			ncols;
1555 	Datum	   *values1;
1556 	Datum	   *values2;
1557 	bool	   *nulls1;
1558 	bool	   *nulls2;
1559 	int			i1;
1560 	int			i2;
1561 	int			j;
1562 
1563 	/* Extract type info from the tuples */
1564 	tupType1 = HeapTupleHeaderGetTypeId(record1);
1565 	tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1566 	tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1567 	ncolumns1 = tupdesc1->natts;
1568 	tupType2 = HeapTupleHeaderGetTypeId(record2);
1569 	tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1570 	tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1571 	ncolumns2 = tupdesc2->natts;
1572 
1573 	/* Build temporary HeapTuple control structures */
1574 	tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1575 	ItemPointerSetInvalid(&(tuple1.t_self));
1576 	tuple1.t_tableOid = InvalidOid;
1577 	tuple1.t_data = record1;
1578 	tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1579 	ItemPointerSetInvalid(&(tuple2.t_self));
1580 	tuple2.t_tableOid = InvalidOid;
1581 	tuple2.t_data = record2;
1582 
1583 	/*
1584 	 * We arrange to look up the needed comparison info just once per series
1585 	 * of calls, assuming the record types don't change underneath us.
1586 	 */
1587 	ncols = Max(ncolumns1, ncolumns2);
1588 	my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1589 	if (my_extra == NULL ||
1590 		my_extra->ncolumns < ncols)
1591 	{
1592 		fcinfo->flinfo->fn_extra =
1593 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1594 							   offsetof(RecordCompareData, columns) +
1595 							   ncols * sizeof(ColumnCompareData));
1596 		my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1597 		my_extra->ncolumns = ncols;
1598 		my_extra->record1_type = InvalidOid;
1599 		my_extra->record1_typmod = 0;
1600 		my_extra->record2_type = InvalidOid;
1601 		my_extra->record2_typmod = 0;
1602 	}
1603 
1604 	if (my_extra->record1_type != tupType1 ||
1605 		my_extra->record1_typmod != tupTypmod1 ||
1606 		my_extra->record2_type != tupType2 ||
1607 		my_extra->record2_typmod != tupTypmod2)
1608 	{
1609 		MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1610 		my_extra->record1_type = tupType1;
1611 		my_extra->record1_typmod = tupTypmod1;
1612 		my_extra->record2_type = tupType2;
1613 		my_extra->record2_typmod = tupTypmod2;
1614 	}
1615 
1616 	/* Break down the tuples into fields */
1617 	values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1618 	nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1619 	heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1620 	values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1621 	nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1622 	heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1623 
1624 	/*
1625 	 * Scan corresponding columns, allowing for dropped columns in different
1626 	 * places in the two rows.  i1 and i2 are physical column indexes, j is
1627 	 * the logical column index.
1628 	 */
1629 	i1 = i2 = j = 0;
1630 	while (i1 < ncolumns1 || i2 < ncolumns2)
1631 	{
1632 		Form_pg_attribute att1;
1633 		Form_pg_attribute att2;
1634 
1635 		/*
1636 		 * Skip dropped columns
1637 		 */
1638 		if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1639 		{
1640 			i1++;
1641 			continue;
1642 		}
1643 		if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1644 		{
1645 			i2++;
1646 			continue;
1647 		}
1648 		if (i1 >= ncolumns1 || i2 >= ncolumns2)
1649 			break;				/* we'll deal with mismatch below loop */
1650 
1651 		att1 = TupleDescAttr(tupdesc1, i1);
1652 		att2 = TupleDescAttr(tupdesc2, i2);
1653 
1654 		/*
1655 		 * Have two matching columns, they must be same type
1656 		 */
1657 		if (att1->atttypid != att2->atttypid)
1658 			ereport(ERROR,
1659 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1660 					 errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1661 							format_type_be(att1->atttypid),
1662 							format_type_be(att2->atttypid),
1663 							j + 1)));
1664 
1665 		/*
1666 		 * We consider two NULLs equal; NULL > not-NULL.
1667 		 */
1668 		if (!nulls1[i1] || !nulls2[i2])
1669 		{
1670 			if (nulls1[i1] || nulls2[i2])
1671 			{
1672 				result = false;
1673 				break;
1674 			}
1675 
1676 			/* Compare the pair of elements */
1677 			result = datum_image_eq(values1[i1], values2[i2], att1->attbyval, att2->attlen);
1678 			if (!result)
1679 				break;
1680 		}
1681 
1682 		/* equal, so continue to next column */
1683 		i1++, i2++, j++;
1684 	}
1685 
1686 	/*
1687 	 * If we didn't break out of the loop early, check for column count
1688 	 * mismatch.  (We do not report such mismatch if we found unequal column
1689 	 * values; is that a feature or a bug?)
1690 	 */
1691 	if (result)
1692 	{
1693 		if (i1 != ncolumns1 || i2 != ncolumns2)
1694 			ereport(ERROR,
1695 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1696 					 errmsg("cannot compare record types with different numbers of columns")));
1697 	}
1698 
1699 	pfree(values1);
1700 	pfree(nulls1);
1701 	pfree(values2);
1702 	pfree(nulls2);
1703 	ReleaseTupleDesc(tupdesc1);
1704 	ReleaseTupleDesc(tupdesc2);
1705 
1706 	/* Avoid leaking memory when handed toasted input. */
1707 	PG_FREE_IF_COPY(record1, 0);
1708 	PG_FREE_IF_COPY(record2, 1);
1709 
1710 	PG_RETURN_BOOL(result);
1711 }
1712 
1713 Datum
record_image_ne(PG_FUNCTION_ARGS)1714 record_image_ne(PG_FUNCTION_ARGS)
1715 {
1716 	PG_RETURN_BOOL(!DatumGetBool(record_image_eq(fcinfo)));
1717 }
1718 
1719 Datum
record_image_lt(PG_FUNCTION_ARGS)1720 record_image_lt(PG_FUNCTION_ARGS)
1721 {
1722 	PG_RETURN_BOOL(record_image_cmp(fcinfo) < 0);
1723 }
1724 
1725 Datum
record_image_gt(PG_FUNCTION_ARGS)1726 record_image_gt(PG_FUNCTION_ARGS)
1727 {
1728 	PG_RETURN_BOOL(record_image_cmp(fcinfo) > 0);
1729 }
1730 
1731 Datum
record_image_le(PG_FUNCTION_ARGS)1732 record_image_le(PG_FUNCTION_ARGS)
1733 {
1734 	PG_RETURN_BOOL(record_image_cmp(fcinfo) <= 0);
1735 }
1736 
1737 Datum
record_image_ge(PG_FUNCTION_ARGS)1738 record_image_ge(PG_FUNCTION_ARGS)
1739 {
1740 	PG_RETURN_BOOL(record_image_cmp(fcinfo) >= 0);
1741 }
1742 
1743 Datum
btrecordimagecmp(PG_FUNCTION_ARGS)1744 btrecordimagecmp(PG_FUNCTION_ARGS)
1745 {
1746 	PG_RETURN_INT32(record_image_cmp(fcinfo));
1747 }
1748