1 /*-------------------------------------------------------------------------
2  *
3  * rowtypes.c
4  *	  I/O and comparison functions for generic composite types.
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/utils/adt/rowtypes.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <ctype.h>
18 
19 #include "access/detoast.h"
20 #include "access/htup_details.h"
21 #include "catalog/pg_type.h"
22 #include "funcapi.h"
23 #include "libpq/pqformat.h"
24 #include "miscadmin.h"
25 #include "utils/builtins.h"
26 #include "utils/datum.h"
27 #include "utils/lsyscache.h"
28 #include "utils/typcache.h"
29 
30 
31 /*
32  * structure to cache metadata needed for record I/O
33  */
34 typedef struct ColumnIOData
35 {
36 	Oid			column_type;
37 	Oid			typiofunc;
38 	Oid			typioparam;
39 	bool		typisvarlena;
40 	FmgrInfo	proc;
41 } ColumnIOData;
42 
43 typedef struct RecordIOData
44 {
45 	Oid			record_type;
46 	int32		record_typmod;
47 	int			ncolumns;
48 	ColumnIOData columns[FLEXIBLE_ARRAY_MEMBER];
49 } RecordIOData;
50 
51 /*
52  * structure to cache metadata needed for record comparison
53  */
54 typedef struct ColumnCompareData
55 {
56 	TypeCacheEntry *typentry;	/* has everything we need, actually */
57 } ColumnCompareData;
58 
59 typedef struct RecordCompareData
60 {
61 	int			ncolumns;		/* allocated length of columns[] */
62 	Oid			record1_type;
63 	int32		record1_typmod;
64 	Oid			record2_type;
65 	int32		record2_typmod;
66 	ColumnCompareData columns[FLEXIBLE_ARRAY_MEMBER];
67 } RecordCompareData;
68 
69 
70 /*
71  * record_in		- input routine for any composite type.
72  */
73 Datum
record_in(PG_FUNCTION_ARGS)74 record_in(PG_FUNCTION_ARGS)
75 {
76 	char	   *string = PG_GETARG_CSTRING(0);
77 	Oid			tupType = PG_GETARG_OID(1);
78 	int32		tupTypmod = PG_GETARG_INT32(2);
79 	HeapTupleHeader result;
80 	TupleDesc	tupdesc;
81 	HeapTuple	tuple;
82 	RecordIOData *my_extra;
83 	bool		needComma = false;
84 	int			ncolumns;
85 	int			i;
86 	char	   *ptr;
87 	Datum	   *values;
88 	bool	   *nulls;
89 	StringInfoData buf;
90 
91 	check_stack_depth();		/* recurses for record-type columns */
92 
93 	/*
94 	 * Give a friendly error message if we did not get enough info to identify
95 	 * the target record type.  (lookup_rowtype_tupdesc would fail anyway, but
96 	 * with a non-user-friendly message.)  In ordinary SQL usage, we'll get -1
97 	 * for typmod, since composite types and RECORD have no type modifiers at
98 	 * the SQL level, and thus must fail for RECORD.  However some callers can
99 	 * supply a valid typmod, and then we can do something useful for RECORD.
100 	 */
101 	if (tupType == RECORDOID && tupTypmod < 0)
102 		ereport(ERROR,
103 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
104 				 errmsg("input of anonymous composite types is not implemented")));
105 
106 	/*
107 	 * This comes from the composite type's pg_type.oid and stores system oids
108 	 * in user tables, specifically DatumTupleFields. This oid must be
109 	 * preserved by binary upgrades.
110 	 */
111 	tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
112 	ncolumns = tupdesc->natts;
113 
114 	/*
115 	 * We arrange to look up the needed I/O info just once per series of
116 	 * calls, assuming the record type doesn't change underneath us.
117 	 */
118 	my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
119 	if (my_extra == NULL ||
120 		my_extra->ncolumns != ncolumns)
121 	{
122 		fcinfo->flinfo->fn_extra =
123 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
124 							   offsetof(RecordIOData, columns) +
125 							   ncolumns * sizeof(ColumnIOData));
126 		my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
127 		my_extra->record_type = InvalidOid;
128 		my_extra->record_typmod = 0;
129 	}
130 
131 	if (my_extra->record_type != tupType ||
132 		my_extra->record_typmod != tupTypmod)
133 	{
134 		MemSet(my_extra, 0,
135 			   offsetof(RecordIOData, columns) +
136 			   ncolumns * sizeof(ColumnIOData));
137 		my_extra->record_type = tupType;
138 		my_extra->record_typmod = tupTypmod;
139 		my_extra->ncolumns = ncolumns;
140 	}
141 
142 	values = (Datum *) palloc(ncolumns * sizeof(Datum));
143 	nulls = (bool *) palloc(ncolumns * sizeof(bool));
144 
145 	/*
146 	 * Scan the string.  We use "buf" to accumulate the de-quoted data for
147 	 * each column, which is then fed to the appropriate input converter.
148 	 */
149 	ptr = string;
150 	/* Allow leading whitespace */
151 	while (*ptr && isspace((unsigned char) *ptr))
152 		ptr++;
153 	if (*ptr++ != '(')
154 		ereport(ERROR,
155 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
156 				 errmsg("malformed record literal: \"%s\"", string),
157 				 errdetail("Missing left parenthesis.")));
158 
159 	initStringInfo(&buf);
160 
161 	for (i = 0; i < ncolumns; i++)
162 	{
163 		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
164 		ColumnIOData *column_info = &my_extra->columns[i];
165 		Oid			column_type = att->atttypid;
166 		char	   *column_data;
167 
168 		/* Ignore dropped columns in datatype, but fill with nulls */
169 		if (att->attisdropped)
170 		{
171 			values[i] = (Datum) 0;
172 			nulls[i] = true;
173 			continue;
174 		}
175 
176 		if (needComma)
177 		{
178 			/* Skip comma that separates prior field from this one */
179 			if (*ptr == ',')
180 				ptr++;
181 			else
182 				/* *ptr must be ')' */
183 				ereport(ERROR,
184 						(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
185 						 errmsg("malformed record literal: \"%s\"", string),
186 						 errdetail("Too few columns.")));
187 		}
188 
189 		/* Check for null: completely empty input means null */
190 		if (*ptr == ',' || *ptr == ')')
191 		{
192 			column_data = NULL;
193 			nulls[i] = true;
194 		}
195 		else
196 		{
197 			/* Extract string for this column */
198 			bool		inquote = false;
199 
200 			resetStringInfo(&buf);
201 			while (inquote || !(*ptr == ',' || *ptr == ')'))
202 			{
203 				char		ch = *ptr++;
204 
205 				if (ch == '\0')
206 					ereport(ERROR,
207 							(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
208 							 errmsg("malformed record literal: \"%s\"",
209 									string),
210 							 errdetail("Unexpected end of input.")));
211 				if (ch == '\\')
212 				{
213 					if (*ptr == '\0')
214 						ereport(ERROR,
215 								(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
216 								 errmsg("malformed record literal: \"%s\"",
217 										string),
218 								 errdetail("Unexpected end of input.")));
219 					appendStringInfoChar(&buf, *ptr++);
220 				}
221 				else if (ch == '"')
222 				{
223 					if (!inquote)
224 						inquote = true;
225 					else if (*ptr == '"')
226 					{
227 						/* doubled quote within quote sequence */
228 						appendStringInfoChar(&buf, *ptr++);
229 					}
230 					else
231 						inquote = false;
232 				}
233 				else
234 					appendStringInfoChar(&buf, ch);
235 			}
236 
237 			column_data = buf.data;
238 			nulls[i] = false;
239 		}
240 
241 		/*
242 		 * Convert the column value
243 		 */
244 		if (column_info->column_type != column_type)
245 		{
246 			getTypeInputInfo(column_type,
247 							 &column_info->typiofunc,
248 							 &column_info->typioparam);
249 			fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
250 						  fcinfo->flinfo->fn_mcxt);
251 			column_info->column_type = column_type;
252 		}
253 
254 		values[i] = InputFunctionCall(&column_info->proc,
255 									  column_data,
256 									  column_info->typioparam,
257 									  att->atttypmod);
258 
259 		/*
260 		 * Prep for next column
261 		 */
262 		needComma = true;
263 	}
264 
265 	if (*ptr++ != ')')
266 		ereport(ERROR,
267 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
268 				 errmsg("malformed record literal: \"%s\"", string),
269 				 errdetail("Too many columns.")));
270 	/* Allow trailing whitespace */
271 	while (*ptr && isspace((unsigned char) *ptr))
272 		ptr++;
273 	if (*ptr)
274 		ereport(ERROR,
275 				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
276 				 errmsg("malformed record literal: \"%s\"", string),
277 				 errdetail("Junk after right parenthesis.")));
278 
279 	tuple = heap_form_tuple(tupdesc, values, nulls);
280 
281 	/*
282 	 * We cannot return tuple->t_data because heap_form_tuple allocates it as
283 	 * part of a larger chunk, and our caller may expect to be able to pfree
284 	 * our result.  So must copy the info into a new palloc chunk.
285 	 */
286 	result = (HeapTupleHeader) palloc(tuple->t_len);
287 	memcpy(result, tuple->t_data, tuple->t_len);
288 
289 	heap_freetuple(tuple);
290 	pfree(buf.data);
291 	pfree(values);
292 	pfree(nulls);
293 	ReleaseTupleDesc(tupdesc);
294 
295 	PG_RETURN_HEAPTUPLEHEADER(result);
296 }
297 
298 /*
299  * record_out		- output routine for any composite type.
300  */
301 Datum
record_out(PG_FUNCTION_ARGS)302 record_out(PG_FUNCTION_ARGS)
303 {
304 	HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
305 	Oid			tupType;
306 	int32		tupTypmod;
307 	TupleDesc	tupdesc;
308 	HeapTupleData tuple;
309 	RecordIOData *my_extra;
310 	bool		needComma = false;
311 	int			ncolumns;
312 	int			i;
313 	Datum	   *values;
314 	bool	   *nulls;
315 	StringInfoData buf;
316 
317 	check_stack_depth();		/* recurses for record-type columns */
318 
319 	/* Extract type info from the tuple itself */
320 	tupType = HeapTupleHeaderGetTypeId(rec);
321 	tupTypmod = HeapTupleHeaderGetTypMod(rec);
322 	tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
323 	ncolumns = tupdesc->natts;
324 
325 	/* Build a temporary HeapTuple control structure */
326 	tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
327 	ItemPointerSetInvalid(&(tuple.t_self));
328 	tuple.t_tableOid = InvalidOid;
329 	tuple.t_data = rec;
330 
331 	/*
332 	 * We arrange to look up the needed I/O info just once per series of
333 	 * calls, assuming the record type doesn't change underneath us.
334 	 */
335 	my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
336 	if (my_extra == NULL ||
337 		my_extra->ncolumns != ncolumns)
338 	{
339 		fcinfo->flinfo->fn_extra =
340 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
341 							   offsetof(RecordIOData, columns) +
342 							   ncolumns * sizeof(ColumnIOData));
343 		my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
344 		my_extra->record_type = InvalidOid;
345 		my_extra->record_typmod = 0;
346 	}
347 
348 	if (my_extra->record_type != tupType ||
349 		my_extra->record_typmod != tupTypmod)
350 	{
351 		MemSet(my_extra, 0,
352 			   offsetof(RecordIOData, columns) +
353 			   ncolumns * sizeof(ColumnIOData));
354 		my_extra->record_type = tupType;
355 		my_extra->record_typmod = tupTypmod;
356 		my_extra->ncolumns = ncolumns;
357 	}
358 
359 	values = (Datum *) palloc(ncolumns * sizeof(Datum));
360 	nulls = (bool *) palloc(ncolumns * sizeof(bool));
361 
362 	/* Break down the tuple into fields */
363 	heap_deform_tuple(&tuple, tupdesc, values, nulls);
364 
365 	/* And build the result string */
366 	initStringInfo(&buf);
367 
368 	appendStringInfoChar(&buf, '(');
369 
370 	for (i = 0; i < ncolumns; i++)
371 	{
372 		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
373 		ColumnIOData *column_info = &my_extra->columns[i];
374 		Oid			column_type = att->atttypid;
375 		Datum		attr;
376 		char	   *value;
377 		char	   *tmp;
378 		bool		nq;
379 
380 		/* Ignore dropped columns in datatype */
381 		if (att->attisdropped)
382 			continue;
383 
384 		if (needComma)
385 			appendStringInfoChar(&buf, ',');
386 		needComma = true;
387 
388 		if (nulls[i])
389 		{
390 			/* emit nothing... */
391 			continue;
392 		}
393 
394 		/*
395 		 * Convert the column value to text
396 		 */
397 		if (column_info->column_type != column_type)
398 		{
399 			getTypeOutputInfo(column_type,
400 							  &column_info->typiofunc,
401 							  &column_info->typisvarlena);
402 			fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
403 						  fcinfo->flinfo->fn_mcxt);
404 			column_info->column_type = column_type;
405 		}
406 
407 		attr = values[i];
408 		value = OutputFunctionCall(&column_info->proc, attr);
409 
410 		/* Detect whether we need double quotes for this value */
411 		nq = (value[0] == '\0');	/* force quotes for empty string */
412 		for (tmp = value; *tmp; tmp++)
413 		{
414 			char		ch = *tmp;
415 
416 			if (ch == '"' || ch == '\\' ||
417 				ch == '(' || ch == ')' || ch == ',' ||
418 				isspace((unsigned char) ch))
419 			{
420 				nq = true;
421 				break;
422 			}
423 		}
424 
425 		/* And emit the string */
426 		if (nq)
427 			appendStringInfoCharMacro(&buf, '"');
428 		for (tmp = value; *tmp; tmp++)
429 		{
430 			char		ch = *tmp;
431 
432 			if (ch == '"' || ch == '\\')
433 				appendStringInfoCharMacro(&buf, ch);
434 			appendStringInfoCharMacro(&buf, ch);
435 		}
436 		if (nq)
437 			appendStringInfoCharMacro(&buf, '"');
438 	}
439 
440 	appendStringInfoChar(&buf, ')');
441 
442 	pfree(values);
443 	pfree(nulls);
444 	ReleaseTupleDesc(tupdesc);
445 
446 	PG_RETURN_CSTRING(buf.data);
447 }
448 
449 /*
450  * record_recv		- binary input routine for any composite type.
451  */
452 Datum
record_recv(PG_FUNCTION_ARGS)453 record_recv(PG_FUNCTION_ARGS)
454 {
455 	StringInfo	buf = (StringInfo) PG_GETARG_POINTER(0);
456 	Oid			tupType = PG_GETARG_OID(1);
457 	int32		tupTypmod = PG_GETARG_INT32(2);
458 	HeapTupleHeader result;
459 	TupleDesc	tupdesc;
460 	HeapTuple	tuple;
461 	RecordIOData *my_extra;
462 	int			ncolumns;
463 	int			usercols;
464 	int			validcols;
465 	int			i;
466 	Datum	   *values;
467 	bool	   *nulls;
468 
469 	check_stack_depth();		/* recurses for record-type columns */
470 
471 	/*
472 	 * Give a friendly error message if we did not get enough info to identify
473 	 * the target record type.  (lookup_rowtype_tupdesc would fail anyway, but
474 	 * with a non-user-friendly message.)  In ordinary SQL usage, we'll get -1
475 	 * for typmod, since composite types and RECORD have no type modifiers at
476 	 * the SQL level, and thus must fail for RECORD.  However some callers can
477 	 * supply a valid typmod, and then we can do something useful for RECORD.
478 	 */
479 	if (tupType == RECORDOID && tupTypmod < 0)
480 		ereport(ERROR,
481 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
482 				 errmsg("input of anonymous composite types is not implemented")));
483 
484 	tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
485 	ncolumns = tupdesc->natts;
486 
487 	/*
488 	 * We arrange to look up the needed I/O info just once per series of
489 	 * calls, assuming the record type doesn't change underneath us.
490 	 */
491 	my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
492 	if (my_extra == NULL ||
493 		my_extra->ncolumns != ncolumns)
494 	{
495 		fcinfo->flinfo->fn_extra =
496 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
497 							   offsetof(RecordIOData, columns) +
498 							   ncolumns * sizeof(ColumnIOData));
499 		my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
500 		my_extra->record_type = InvalidOid;
501 		my_extra->record_typmod = 0;
502 	}
503 
504 	if (my_extra->record_type != tupType ||
505 		my_extra->record_typmod != tupTypmod)
506 	{
507 		MemSet(my_extra, 0,
508 			   offsetof(RecordIOData, columns) +
509 			   ncolumns * sizeof(ColumnIOData));
510 		my_extra->record_type = tupType;
511 		my_extra->record_typmod = tupTypmod;
512 		my_extra->ncolumns = ncolumns;
513 	}
514 
515 	values = (Datum *) palloc(ncolumns * sizeof(Datum));
516 	nulls = (bool *) palloc(ncolumns * sizeof(bool));
517 
518 	/* Fetch number of columns user thinks it has */
519 	usercols = pq_getmsgint(buf, 4);
520 
521 	/* Need to scan to count nondeleted columns */
522 	validcols = 0;
523 	for (i = 0; i < ncolumns; i++)
524 	{
525 		if (!TupleDescAttr(tupdesc, i)->attisdropped)
526 			validcols++;
527 	}
528 	if (usercols != validcols)
529 		ereport(ERROR,
530 				(errcode(ERRCODE_DATATYPE_MISMATCH),
531 				 errmsg("wrong number of columns: %d, expected %d",
532 						usercols, validcols)));
533 
534 	/* Process each column */
535 	for (i = 0; i < ncolumns; i++)
536 	{
537 		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
538 		ColumnIOData *column_info = &my_extra->columns[i];
539 		Oid			column_type = att->atttypid;
540 		Oid			coltypoid;
541 		int			itemlen;
542 		StringInfoData item_buf;
543 		StringInfo	bufptr;
544 		char		csave;
545 
546 		/* Ignore dropped columns in datatype, but fill with nulls */
547 		if (att->attisdropped)
548 		{
549 			values[i] = (Datum) 0;
550 			nulls[i] = true;
551 			continue;
552 		}
553 
554 		/* Verify column datatype */
555 		coltypoid = pq_getmsgint(buf, sizeof(Oid));
556 		if (coltypoid != column_type)
557 			ereport(ERROR,
558 					(errcode(ERRCODE_DATATYPE_MISMATCH),
559 					 errmsg("wrong data type: %u, expected %u",
560 							coltypoid, column_type)));
561 
562 		/* Get and check the item length */
563 		itemlen = pq_getmsgint(buf, 4);
564 		if (itemlen < -1 || itemlen > (buf->len - buf->cursor))
565 			ereport(ERROR,
566 					(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
567 					 errmsg("insufficient data left in message")));
568 
569 		if (itemlen == -1)
570 		{
571 			/* -1 length means NULL */
572 			bufptr = NULL;
573 			nulls[i] = true;
574 			csave = 0;			/* keep compiler quiet */
575 		}
576 		else
577 		{
578 			/*
579 			 * Rather than copying data around, we just set up a phony
580 			 * StringInfo pointing to the correct portion of the input buffer.
581 			 * We assume we can scribble on the input buffer so as to maintain
582 			 * the convention that StringInfos have a trailing null.
583 			 */
584 			item_buf.data = &buf->data[buf->cursor];
585 			item_buf.maxlen = itemlen + 1;
586 			item_buf.len = itemlen;
587 			item_buf.cursor = 0;
588 
589 			buf->cursor += itemlen;
590 
591 			csave = buf->data[buf->cursor];
592 			buf->data[buf->cursor] = '\0';
593 
594 			bufptr = &item_buf;
595 			nulls[i] = false;
596 		}
597 
598 		/* Now call the column's receiveproc */
599 		if (column_info->column_type != column_type)
600 		{
601 			getTypeBinaryInputInfo(column_type,
602 								   &column_info->typiofunc,
603 								   &column_info->typioparam);
604 			fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
605 						  fcinfo->flinfo->fn_mcxt);
606 			column_info->column_type = column_type;
607 		}
608 
609 		values[i] = ReceiveFunctionCall(&column_info->proc,
610 										bufptr,
611 										column_info->typioparam,
612 										att->atttypmod);
613 
614 		if (bufptr)
615 		{
616 			/* Trouble if it didn't eat the whole buffer */
617 			if (item_buf.cursor != itemlen)
618 				ereport(ERROR,
619 						(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
620 						 errmsg("improper binary format in record column %d",
621 								i + 1)));
622 
623 			buf->data[buf->cursor] = csave;
624 		}
625 	}
626 
627 	tuple = heap_form_tuple(tupdesc, values, nulls);
628 
629 	/*
630 	 * We cannot return tuple->t_data because heap_form_tuple allocates it as
631 	 * part of a larger chunk, and our caller may expect to be able to pfree
632 	 * our result.  So must copy the info into a new palloc chunk.
633 	 */
634 	result = (HeapTupleHeader) palloc(tuple->t_len);
635 	memcpy(result, tuple->t_data, tuple->t_len);
636 
637 	heap_freetuple(tuple);
638 	pfree(values);
639 	pfree(nulls);
640 	ReleaseTupleDesc(tupdesc);
641 
642 	PG_RETURN_HEAPTUPLEHEADER(result);
643 }
644 
645 /*
646  * record_send		- binary output routine for any composite type.
647  */
648 Datum
record_send(PG_FUNCTION_ARGS)649 record_send(PG_FUNCTION_ARGS)
650 {
651 	HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
652 	Oid			tupType;
653 	int32		tupTypmod;
654 	TupleDesc	tupdesc;
655 	HeapTupleData tuple;
656 	RecordIOData *my_extra;
657 	int			ncolumns;
658 	int			validcols;
659 	int			i;
660 	Datum	   *values;
661 	bool	   *nulls;
662 	StringInfoData buf;
663 
664 	check_stack_depth();		/* recurses for record-type columns */
665 
666 	/* Extract type info from the tuple itself */
667 	tupType = HeapTupleHeaderGetTypeId(rec);
668 	tupTypmod = HeapTupleHeaderGetTypMod(rec);
669 	tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
670 	ncolumns = tupdesc->natts;
671 
672 	/* Build a temporary HeapTuple control structure */
673 	tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
674 	ItemPointerSetInvalid(&(tuple.t_self));
675 	tuple.t_tableOid = InvalidOid;
676 	tuple.t_data = rec;
677 
678 	/*
679 	 * We arrange to look up the needed I/O info just once per series of
680 	 * calls, assuming the record type doesn't change underneath us.
681 	 */
682 	my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
683 	if (my_extra == NULL ||
684 		my_extra->ncolumns != ncolumns)
685 	{
686 		fcinfo->flinfo->fn_extra =
687 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
688 							   offsetof(RecordIOData, columns) +
689 							   ncolumns * sizeof(ColumnIOData));
690 		my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
691 		my_extra->record_type = InvalidOid;
692 		my_extra->record_typmod = 0;
693 	}
694 
695 	if (my_extra->record_type != tupType ||
696 		my_extra->record_typmod != tupTypmod)
697 	{
698 		MemSet(my_extra, 0,
699 			   offsetof(RecordIOData, columns) +
700 			   ncolumns * sizeof(ColumnIOData));
701 		my_extra->record_type = tupType;
702 		my_extra->record_typmod = tupTypmod;
703 		my_extra->ncolumns = ncolumns;
704 	}
705 
706 	values = (Datum *) palloc(ncolumns * sizeof(Datum));
707 	nulls = (bool *) palloc(ncolumns * sizeof(bool));
708 
709 	/* Break down the tuple into fields */
710 	heap_deform_tuple(&tuple, tupdesc, values, nulls);
711 
712 	/* And build the result string */
713 	pq_begintypsend(&buf);
714 
715 	/* Need to scan to count nondeleted columns */
716 	validcols = 0;
717 	for (i = 0; i < ncolumns; i++)
718 	{
719 		if (!TupleDescAttr(tupdesc, i)->attisdropped)
720 			validcols++;
721 	}
722 	pq_sendint32(&buf, validcols);
723 
724 	for (i = 0; i < ncolumns; i++)
725 	{
726 		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
727 		ColumnIOData *column_info = &my_extra->columns[i];
728 		Oid			column_type = att->atttypid;
729 		Datum		attr;
730 		bytea	   *outputbytes;
731 
732 		/* Ignore dropped columns in datatype */
733 		if (att->attisdropped)
734 			continue;
735 
736 		pq_sendint32(&buf, column_type);
737 
738 		if (nulls[i])
739 		{
740 			/* emit -1 data length to signify a NULL */
741 			pq_sendint32(&buf, -1);
742 			continue;
743 		}
744 
745 		/*
746 		 * Convert the column value to binary
747 		 */
748 		if (column_info->column_type != column_type)
749 		{
750 			getTypeBinaryOutputInfo(column_type,
751 									&column_info->typiofunc,
752 									&column_info->typisvarlena);
753 			fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
754 						  fcinfo->flinfo->fn_mcxt);
755 			column_info->column_type = column_type;
756 		}
757 
758 		attr = values[i];
759 		outputbytes = SendFunctionCall(&column_info->proc, attr);
760 		pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ);
761 		pq_sendbytes(&buf, VARDATA(outputbytes),
762 					 VARSIZE(outputbytes) - VARHDRSZ);
763 	}
764 
765 	pfree(values);
766 	pfree(nulls);
767 	ReleaseTupleDesc(tupdesc);
768 
769 	PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
770 }
771 
772 
773 /*
774  * record_cmp()
775  * Internal comparison function for records.
776  *
777  * Returns -1, 0 or 1
778  *
779  * Do not assume that the two inputs are exactly the same record type;
780  * for instance we might be comparing an anonymous ROW() construct against a
781  * named composite type.  We will compare as long as they have the same number
782  * of non-dropped columns of the same types.
783  */
784 static int
record_cmp(FunctionCallInfo fcinfo)785 record_cmp(FunctionCallInfo fcinfo)
786 {
787 	HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
788 	HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
789 	int			result = 0;
790 	Oid			tupType1;
791 	Oid			tupType2;
792 	int32		tupTypmod1;
793 	int32		tupTypmod2;
794 	TupleDesc	tupdesc1;
795 	TupleDesc	tupdesc2;
796 	HeapTupleData tuple1;
797 	HeapTupleData tuple2;
798 	int			ncolumns1;
799 	int			ncolumns2;
800 	RecordCompareData *my_extra;
801 	int			ncols;
802 	Datum	   *values1;
803 	Datum	   *values2;
804 	bool	   *nulls1;
805 	bool	   *nulls2;
806 	int			i1;
807 	int			i2;
808 	int			j;
809 
810 	check_stack_depth();		/* recurses for record-type columns */
811 
812 	/* Extract type info from the tuples */
813 	tupType1 = HeapTupleHeaderGetTypeId(record1);
814 	tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
815 	tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
816 	ncolumns1 = tupdesc1->natts;
817 	tupType2 = HeapTupleHeaderGetTypeId(record2);
818 	tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
819 	tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
820 	ncolumns2 = tupdesc2->natts;
821 
822 	/* Build temporary HeapTuple control structures */
823 	tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
824 	ItemPointerSetInvalid(&(tuple1.t_self));
825 	tuple1.t_tableOid = InvalidOid;
826 	tuple1.t_data = record1;
827 	tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
828 	ItemPointerSetInvalid(&(tuple2.t_self));
829 	tuple2.t_tableOid = InvalidOid;
830 	tuple2.t_data = record2;
831 
832 	/*
833 	 * We arrange to look up the needed comparison info just once per series
834 	 * of calls, assuming the record types don't change underneath us.
835 	 */
836 	ncols = Max(ncolumns1, ncolumns2);
837 	my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
838 	if (my_extra == NULL ||
839 		my_extra->ncolumns < ncols)
840 	{
841 		fcinfo->flinfo->fn_extra =
842 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
843 							   offsetof(RecordCompareData, columns) +
844 							   ncols * sizeof(ColumnCompareData));
845 		my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
846 		my_extra->ncolumns = ncols;
847 		my_extra->record1_type = InvalidOid;
848 		my_extra->record1_typmod = 0;
849 		my_extra->record2_type = InvalidOid;
850 		my_extra->record2_typmod = 0;
851 	}
852 
853 	if (my_extra->record1_type != tupType1 ||
854 		my_extra->record1_typmod != tupTypmod1 ||
855 		my_extra->record2_type != tupType2 ||
856 		my_extra->record2_typmod != tupTypmod2)
857 	{
858 		MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
859 		my_extra->record1_type = tupType1;
860 		my_extra->record1_typmod = tupTypmod1;
861 		my_extra->record2_type = tupType2;
862 		my_extra->record2_typmod = tupTypmod2;
863 	}
864 
865 	/* Break down the tuples into fields */
866 	values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
867 	nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
868 	heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
869 	values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
870 	nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
871 	heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
872 
873 	/*
874 	 * Scan corresponding columns, allowing for dropped columns in different
875 	 * places in the two rows.  i1 and i2 are physical column indexes, j is
876 	 * the logical column index.
877 	 */
878 	i1 = i2 = j = 0;
879 	while (i1 < ncolumns1 || i2 < ncolumns2)
880 	{
881 		Form_pg_attribute att1;
882 		Form_pg_attribute att2;
883 		TypeCacheEntry *typentry;
884 		Oid			collation;
885 
886 		/*
887 		 * Skip dropped columns
888 		 */
889 		if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
890 		{
891 			i1++;
892 			continue;
893 		}
894 		if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
895 		{
896 			i2++;
897 			continue;
898 		}
899 		if (i1 >= ncolumns1 || i2 >= ncolumns2)
900 			break;				/* we'll deal with mismatch below loop */
901 
902 		att1 = TupleDescAttr(tupdesc1, i1);
903 		att2 = TupleDescAttr(tupdesc2, i2);
904 
905 		/*
906 		 * Have two matching columns, they must be same type
907 		 */
908 		if (att1->atttypid != att2->atttypid)
909 			ereport(ERROR,
910 					(errcode(ERRCODE_DATATYPE_MISMATCH),
911 					 errmsg("cannot compare dissimilar column types %s and %s at record column %d",
912 							format_type_be(att1->atttypid),
913 							format_type_be(att2->atttypid),
914 							j + 1)));
915 
916 		/*
917 		 * If they're not same collation, we don't complain here, but the
918 		 * comparison function might.
919 		 */
920 		collation = att1->attcollation;
921 		if (collation != att2->attcollation)
922 			collation = InvalidOid;
923 
924 		/*
925 		 * Lookup the comparison function if not done already
926 		 */
927 		typentry = my_extra->columns[j].typentry;
928 		if (typentry == NULL ||
929 			typentry->type_id != att1->atttypid)
930 		{
931 			typentry = lookup_type_cache(att1->atttypid,
932 										 TYPECACHE_CMP_PROC_FINFO);
933 			if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid))
934 				ereport(ERROR,
935 						(errcode(ERRCODE_UNDEFINED_FUNCTION),
936 						 errmsg("could not identify a comparison function for type %s",
937 								format_type_be(typentry->type_id))));
938 			my_extra->columns[j].typentry = typentry;
939 		}
940 
941 		/*
942 		 * We consider two NULLs equal; NULL > not-NULL.
943 		 */
944 		if (!nulls1[i1] || !nulls2[i2])
945 		{
946 			LOCAL_FCINFO(locfcinfo, 2);
947 			int32		cmpresult;
948 
949 			if (nulls1[i1])
950 			{
951 				/* arg1 is greater than arg2 */
952 				result = 1;
953 				break;
954 			}
955 			if (nulls2[i2])
956 			{
957 				/* arg1 is less than arg2 */
958 				result = -1;
959 				break;
960 			}
961 
962 			/* Compare the pair of elements */
963 			InitFunctionCallInfoData(*locfcinfo, &typentry->cmp_proc_finfo, 2,
964 									 collation, NULL, NULL);
965 			locfcinfo->args[0].value = values1[i1];
966 			locfcinfo->args[0].isnull = false;
967 			locfcinfo->args[1].value = values2[i2];
968 			locfcinfo->args[1].isnull = false;
969 			cmpresult = DatumGetInt32(FunctionCallInvoke(locfcinfo));
970 
971 			/* We don't expect comparison support functions to return null */
972 			Assert(!locfcinfo->isnull);
973 
974 			if (cmpresult < 0)
975 			{
976 				/* arg1 is less than arg2 */
977 				result = -1;
978 				break;
979 			}
980 			else if (cmpresult > 0)
981 			{
982 				/* arg1 is greater than arg2 */
983 				result = 1;
984 				break;
985 			}
986 		}
987 
988 		/* equal, so continue to next column */
989 		i1++, i2++, j++;
990 	}
991 
992 	/*
993 	 * If we didn't break out of the loop early, check for column count
994 	 * mismatch.  (We do not report such mismatch if we found unequal column
995 	 * values; is that a feature or a bug?)
996 	 */
997 	if (result == 0)
998 	{
999 		if (i1 != ncolumns1 || i2 != ncolumns2)
1000 			ereport(ERROR,
1001 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1002 					 errmsg("cannot compare record types with different numbers of columns")));
1003 	}
1004 
1005 	pfree(values1);
1006 	pfree(nulls1);
1007 	pfree(values2);
1008 	pfree(nulls2);
1009 	ReleaseTupleDesc(tupdesc1);
1010 	ReleaseTupleDesc(tupdesc2);
1011 
1012 	/* Avoid leaking memory when handed toasted input. */
1013 	PG_FREE_IF_COPY(record1, 0);
1014 	PG_FREE_IF_COPY(record2, 1);
1015 
1016 	return result;
1017 }
1018 
1019 /*
1020  * record_eq :
1021  *		  compares two records for equality
1022  * result :
1023  *		  returns true if the records are equal, false otherwise.
1024  *
1025  * Note: we do not use record_cmp here, since equality may be meaningful in
1026  * datatypes that don't have a total ordering (and hence no btree support).
1027  */
1028 Datum
record_eq(PG_FUNCTION_ARGS)1029 record_eq(PG_FUNCTION_ARGS)
1030 {
1031 	HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1032 	HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1033 	bool		result = true;
1034 	Oid			tupType1;
1035 	Oid			tupType2;
1036 	int32		tupTypmod1;
1037 	int32		tupTypmod2;
1038 	TupleDesc	tupdesc1;
1039 	TupleDesc	tupdesc2;
1040 	HeapTupleData tuple1;
1041 	HeapTupleData tuple2;
1042 	int			ncolumns1;
1043 	int			ncolumns2;
1044 	RecordCompareData *my_extra;
1045 	int			ncols;
1046 	Datum	   *values1;
1047 	Datum	   *values2;
1048 	bool	   *nulls1;
1049 	bool	   *nulls2;
1050 	int			i1;
1051 	int			i2;
1052 	int			j;
1053 
1054 	check_stack_depth();		/* recurses for record-type columns */
1055 
1056 	/* Extract type info from the tuples */
1057 	tupType1 = HeapTupleHeaderGetTypeId(record1);
1058 	tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1059 	tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1060 	ncolumns1 = tupdesc1->natts;
1061 	tupType2 = HeapTupleHeaderGetTypeId(record2);
1062 	tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1063 	tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1064 	ncolumns2 = tupdesc2->natts;
1065 
1066 	/* Build temporary HeapTuple control structures */
1067 	tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1068 	ItemPointerSetInvalid(&(tuple1.t_self));
1069 	tuple1.t_tableOid = InvalidOid;
1070 	tuple1.t_data = record1;
1071 	tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1072 	ItemPointerSetInvalid(&(tuple2.t_self));
1073 	tuple2.t_tableOid = InvalidOid;
1074 	tuple2.t_data = record2;
1075 
1076 	/*
1077 	 * We arrange to look up the needed comparison info just once per series
1078 	 * of calls, assuming the record types don't change underneath us.
1079 	 */
1080 	ncols = Max(ncolumns1, ncolumns2);
1081 	my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1082 	if (my_extra == NULL ||
1083 		my_extra->ncolumns < ncols)
1084 	{
1085 		fcinfo->flinfo->fn_extra =
1086 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1087 							   offsetof(RecordCompareData, columns) +
1088 							   ncols * sizeof(ColumnCompareData));
1089 		my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1090 		my_extra->ncolumns = ncols;
1091 		my_extra->record1_type = InvalidOid;
1092 		my_extra->record1_typmod = 0;
1093 		my_extra->record2_type = InvalidOid;
1094 		my_extra->record2_typmod = 0;
1095 	}
1096 
1097 	if (my_extra->record1_type != tupType1 ||
1098 		my_extra->record1_typmod != tupTypmod1 ||
1099 		my_extra->record2_type != tupType2 ||
1100 		my_extra->record2_typmod != tupTypmod2)
1101 	{
1102 		MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1103 		my_extra->record1_type = tupType1;
1104 		my_extra->record1_typmod = tupTypmod1;
1105 		my_extra->record2_type = tupType2;
1106 		my_extra->record2_typmod = tupTypmod2;
1107 	}
1108 
1109 	/* Break down the tuples into fields */
1110 	values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1111 	nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1112 	heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1113 	values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1114 	nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1115 	heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1116 
1117 	/*
1118 	 * Scan corresponding columns, allowing for dropped columns in different
1119 	 * places in the two rows.  i1 and i2 are physical column indexes, j is
1120 	 * the logical column index.
1121 	 */
1122 	i1 = i2 = j = 0;
1123 	while (i1 < ncolumns1 || i2 < ncolumns2)
1124 	{
1125 		LOCAL_FCINFO(locfcinfo, 2);
1126 		Form_pg_attribute att1;
1127 		Form_pg_attribute att2;
1128 		TypeCacheEntry *typentry;
1129 		Oid			collation;
1130 		bool		oprresult;
1131 
1132 		/*
1133 		 * Skip dropped columns
1134 		 */
1135 		if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1136 		{
1137 			i1++;
1138 			continue;
1139 		}
1140 		if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1141 		{
1142 			i2++;
1143 			continue;
1144 		}
1145 		if (i1 >= ncolumns1 || i2 >= ncolumns2)
1146 			break;				/* we'll deal with mismatch below loop */
1147 
1148 		att1 = TupleDescAttr(tupdesc1, i1);
1149 		att2 = TupleDescAttr(tupdesc2, i2);
1150 
1151 		/*
1152 		 * Have two matching columns, they must be same type
1153 		 */
1154 		if (att1->atttypid != att2->atttypid)
1155 			ereport(ERROR,
1156 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1157 					 errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1158 							format_type_be(att1->atttypid),
1159 							format_type_be(att2->atttypid),
1160 							j + 1)));
1161 
1162 		/*
1163 		 * If they're not same collation, we don't complain here, but the
1164 		 * equality function might.
1165 		 */
1166 		collation = att1->attcollation;
1167 		if (collation != att2->attcollation)
1168 			collation = InvalidOid;
1169 
1170 		/*
1171 		 * Lookup the equality function if not done already
1172 		 */
1173 		typentry = my_extra->columns[j].typentry;
1174 		if (typentry == NULL ||
1175 			typentry->type_id != att1->atttypid)
1176 		{
1177 			typentry = lookup_type_cache(att1->atttypid,
1178 										 TYPECACHE_EQ_OPR_FINFO);
1179 			if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
1180 				ereport(ERROR,
1181 						(errcode(ERRCODE_UNDEFINED_FUNCTION),
1182 						 errmsg("could not identify an equality operator for type %s",
1183 								format_type_be(typentry->type_id))));
1184 			my_extra->columns[j].typentry = typentry;
1185 		}
1186 
1187 		/*
1188 		 * We consider two NULLs equal; NULL > not-NULL.
1189 		 */
1190 		if (!nulls1[i1] || !nulls2[i2])
1191 		{
1192 			if (nulls1[i1] || nulls2[i2])
1193 			{
1194 				result = false;
1195 				break;
1196 			}
1197 
1198 			/* Compare the pair of elements */
1199 			InitFunctionCallInfoData(*locfcinfo, &typentry->eq_opr_finfo, 2,
1200 									 collation, NULL, NULL);
1201 			locfcinfo->args[0].value = values1[i1];
1202 			locfcinfo->args[0].isnull = false;
1203 			locfcinfo->args[1].value = values2[i2];
1204 			locfcinfo->args[1].isnull = false;
1205 			oprresult = DatumGetBool(FunctionCallInvoke(locfcinfo));
1206 			if (locfcinfo->isnull || !oprresult)
1207 			{
1208 				result = false;
1209 				break;
1210 			}
1211 		}
1212 
1213 		/* equal, so continue to next column */
1214 		i1++, i2++, j++;
1215 	}
1216 
1217 	/*
1218 	 * If we didn't break out of the loop early, check for column count
1219 	 * mismatch.  (We do not report such mismatch if we found unequal column
1220 	 * values; is that a feature or a bug?)
1221 	 */
1222 	if (result)
1223 	{
1224 		if (i1 != ncolumns1 || i2 != ncolumns2)
1225 			ereport(ERROR,
1226 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1227 					 errmsg("cannot compare record types with different numbers of columns")));
1228 	}
1229 
1230 	pfree(values1);
1231 	pfree(nulls1);
1232 	pfree(values2);
1233 	pfree(nulls2);
1234 	ReleaseTupleDesc(tupdesc1);
1235 	ReleaseTupleDesc(tupdesc2);
1236 
1237 	/* Avoid leaking memory when handed toasted input. */
1238 	PG_FREE_IF_COPY(record1, 0);
1239 	PG_FREE_IF_COPY(record2, 1);
1240 
1241 	PG_RETURN_BOOL(result);
1242 }
1243 
1244 Datum
record_ne(PG_FUNCTION_ARGS)1245 record_ne(PG_FUNCTION_ARGS)
1246 {
1247 	PG_RETURN_BOOL(!DatumGetBool(record_eq(fcinfo)));
1248 }
1249 
1250 Datum
record_lt(PG_FUNCTION_ARGS)1251 record_lt(PG_FUNCTION_ARGS)
1252 {
1253 	PG_RETURN_BOOL(record_cmp(fcinfo) < 0);
1254 }
1255 
1256 Datum
record_gt(PG_FUNCTION_ARGS)1257 record_gt(PG_FUNCTION_ARGS)
1258 {
1259 	PG_RETURN_BOOL(record_cmp(fcinfo) > 0);
1260 }
1261 
1262 Datum
record_le(PG_FUNCTION_ARGS)1263 record_le(PG_FUNCTION_ARGS)
1264 {
1265 	PG_RETURN_BOOL(record_cmp(fcinfo) <= 0);
1266 }
1267 
1268 Datum
record_ge(PG_FUNCTION_ARGS)1269 record_ge(PG_FUNCTION_ARGS)
1270 {
1271 	PG_RETURN_BOOL(record_cmp(fcinfo) >= 0);
1272 }
1273 
1274 Datum
btrecordcmp(PG_FUNCTION_ARGS)1275 btrecordcmp(PG_FUNCTION_ARGS)
1276 {
1277 	PG_RETURN_INT32(record_cmp(fcinfo));
1278 }
1279 
1280 
1281 /*
1282  * record_image_cmp :
1283  * Internal byte-oriented comparison function for records.
1284  *
1285  * Returns -1, 0 or 1
1286  *
1287  * Note: The normal concepts of "equality" do not apply here; different
1288  * representation of values considered to be equal are not considered to be
1289  * identical.  As an example, for the citext type 'A' and 'a' are equal, but
1290  * they are not identical.
1291  */
1292 static int
record_image_cmp(FunctionCallInfo fcinfo)1293 record_image_cmp(FunctionCallInfo fcinfo)
1294 {
1295 	HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1296 	HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1297 	int			result = 0;
1298 	Oid			tupType1;
1299 	Oid			tupType2;
1300 	int32		tupTypmod1;
1301 	int32		tupTypmod2;
1302 	TupleDesc	tupdesc1;
1303 	TupleDesc	tupdesc2;
1304 	HeapTupleData tuple1;
1305 	HeapTupleData tuple2;
1306 	int			ncolumns1;
1307 	int			ncolumns2;
1308 	RecordCompareData *my_extra;
1309 	int			ncols;
1310 	Datum	   *values1;
1311 	Datum	   *values2;
1312 	bool	   *nulls1;
1313 	bool	   *nulls2;
1314 	int			i1;
1315 	int			i2;
1316 	int			j;
1317 
1318 	/* Extract type info from the tuples */
1319 	tupType1 = HeapTupleHeaderGetTypeId(record1);
1320 	tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1321 	tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1322 	ncolumns1 = tupdesc1->natts;
1323 	tupType2 = HeapTupleHeaderGetTypeId(record2);
1324 	tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1325 	tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1326 	ncolumns2 = tupdesc2->natts;
1327 
1328 	/* Build temporary HeapTuple control structures */
1329 	tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1330 	ItemPointerSetInvalid(&(tuple1.t_self));
1331 	tuple1.t_tableOid = InvalidOid;
1332 	tuple1.t_data = record1;
1333 	tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1334 	ItemPointerSetInvalid(&(tuple2.t_self));
1335 	tuple2.t_tableOid = InvalidOid;
1336 	tuple2.t_data = record2;
1337 
1338 	/*
1339 	 * We arrange to look up the needed comparison info just once per series
1340 	 * of calls, assuming the record types don't change underneath us.
1341 	 */
1342 	ncols = Max(ncolumns1, ncolumns2);
1343 	my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1344 	if (my_extra == NULL ||
1345 		my_extra->ncolumns < ncols)
1346 	{
1347 		fcinfo->flinfo->fn_extra =
1348 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1349 							   offsetof(RecordCompareData, columns) +
1350 							   ncols * sizeof(ColumnCompareData));
1351 		my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1352 		my_extra->ncolumns = ncols;
1353 		my_extra->record1_type = InvalidOid;
1354 		my_extra->record1_typmod = 0;
1355 		my_extra->record2_type = InvalidOid;
1356 		my_extra->record2_typmod = 0;
1357 	}
1358 
1359 	if (my_extra->record1_type != tupType1 ||
1360 		my_extra->record1_typmod != tupTypmod1 ||
1361 		my_extra->record2_type != tupType2 ||
1362 		my_extra->record2_typmod != tupTypmod2)
1363 	{
1364 		MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1365 		my_extra->record1_type = tupType1;
1366 		my_extra->record1_typmod = tupTypmod1;
1367 		my_extra->record2_type = tupType2;
1368 		my_extra->record2_typmod = tupTypmod2;
1369 	}
1370 
1371 	/* Break down the tuples into fields */
1372 	values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1373 	nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1374 	heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1375 	values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1376 	nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1377 	heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1378 
1379 	/*
1380 	 * Scan corresponding columns, allowing for dropped columns in different
1381 	 * places in the two rows.  i1 and i2 are physical column indexes, j is
1382 	 * the logical column index.
1383 	 */
1384 	i1 = i2 = j = 0;
1385 	while (i1 < ncolumns1 || i2 < ncolumns2)
1386 	{
1387 		Form_pg_attribute att1;
1388 		Form_pg_attribute att2;
1389 
1390 		/*
1391 		 * Skip dropped columns
1392 		 */
1393 		if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1394 		{
1395 			i1++;
1396 			continue;
1397 		}
1398 		if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1399 		{
1400 			i2++;
1401 			continue;
1402 		}
1403 		if (i1 >= ncolumns1 || i2 >= ncolumns2)
1404 			break;				/* we'll deal with mismatch below loop */
1405 
1406 		att1 = TupleDescAttr(tupdesc1, i1);
1407 		att2 = TupleDescAttr(tupdesc2, i2);
1408 
1409 		/*
1410 		 * Have two matching columns, they must be same type
1411 		 */
1412 		if (att1->atttypid != att2->atttypid)
1413 			ereport(ERROR,
1414 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1415 					 errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1416 							format_type_be(att1->atttypid),
1417 							format_type_be(att2->atttypid),
1418 							j + 1)));
1419 
1420 		/*
1421 		 * The same type should have the same length (or both should be
1422 		 * variable).
1423 		 */
1424 		Assert(att1->attlen == att2->attlen);
1425 
1426 		/*
1427 		 * We consider two NULLs equal; NULL > not-NULL.
1428 		 */
1429 		if (!nulls1[i1] || !nulls2[i2])
1430 		{
1431 			int			cmpresult = 0;
1432 
1433 			if (nulls1[i1])
1434 			{
1435 				/* arg1 is greater than arg2 */
1436 				result = 1;
1437 				break;
1438 			}
1439 			if (nulls2[i2])
1440 			{
1441 				/* arg1 is less than arg2 */
1442 				result = -1;
1443 				break;
1444 			}
1445 
1446 			/* Compare the pair of elements */
1447 			if (att1->attbyval)
1448 			{
1449 				if (values1[i1] != values2[i2])
1450 					cmpresult = (values1[i1] < values2[i2]) ? -1 : 1;
1451 			}
1452 			else if (att1->attlen > 0)
1453 			{
1454 				cmpresult = memcmp(DatumGetPointer(values1[i1]),
1455 								   DatumGetPointer(values2[i2]),
1456 								   att1->attlen);
1457 			}
1458 			else if (att1->attlen == -1)
1459 			{
1460 				Size		len1,
1461 							len2;
1462 				struct varlena *arg1val;
1463 				struct varlena *arg2val;
1464 
1465 				len1 = toast_raw_datum_size(values1[i1]);
1466 				len2 = toast_raw_datum_size(values2[i2]);
1467 				arg1val = PG_DETOAST_DATUM_PACKED(values1[i1]);
1468 				arg2val = PG_DETOAST_DATUM_PACKED(values2[i2]);
1469 
1470 				cmpresult = memcmp(VARDATA_ANY(arg1val),
1471 								   VARDATA_ANY(arg2val),
1472 								   Min(len1, len2) - VARHDRSZ);
1473 				if ((cmpresult == 0) && (len1 != len2))
1474 					cmpresult = (len1 < len2) ? -1 : 1;
1475 
1476 				if ((Pointer) arg1val != (Pointer) values1[i1])
1477 					pfree(arg1val);
1478 				if ((Pointer) arg2val != (Pointer) values2[i2])
1479 					pfree(arg2val);
1480 			}
1481 			else
1482 				elog(ERROR, "unexpected attlen: %d", att1->attlen);
1483 
1484 			if (cmpresult < 0)
1485 			{
1486 				/* arg1 is less than arg2 */
1487 				result = -1;
1488 				break;
1489 			}
1490 			else if (cmpresult > 0)
1491 			{
1492 				/* arg1 is greater than arg2 */
1493 				result = 1;
1494 				break;
1495 			}
1496 		}
1497 
1498 		/* equal, so continue to next column */
1499 		i1++, i2++, j++;
1500 	}
1501 
1502 	/*
1503 	 * If we didn't break out of the loop early, check for column count
1504 	 * mismatch.  (We do not report such mismatch if we found unequal column
1505 	 * values; is that a feature or a bug?)
1506 	 */
1507 	if (result == 0)
1508 	{
1509 		if (i1 != ncolumns1 || i2 != ncolumns2)
1510 			ereport(ERROR,
1511 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1512 					 errmsg("cannot compare record types with different numbers of columns")));
1513 	}
1514 
1515 	pfree(values1);
1516 	pfree(nulls1);
1517 	pfree(values2);
1518 	pfree(nulls2);
1519 	ReleaseTupleDesc(tupdesc1);
1520 	ReleaseTupleDesc(tupdesc2);
1521 
1522 	/* Avoid leaking memory when handed toasted input. */
1523 	PG_FREE_IF_COPY(record1, 0);
1524 	PG_FREE_IF_COPY(record2, 1);
1525 
1526 	return result;
1527 }
1528 
1529 /*
1530  * record_image_eq :
1531  *		  compares two records for identical contents, based on byte images
1532  * result :
1533  *		  returns true if the records are identical, false otherwise.
1534  *
1535  * Note: we do not use record_image_cmp here, since we can avoid
1536  * de-toasting for unequal lengths this way.
1537  */
1538 Datum
record_image_eq(PG_FUNCTION_ARGS)1539 record_image_eq(PG_FUNCTION_ARGS)
1540 {
1541 	HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1542 	HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1543 	bool		result = true;
1544 	Oid			tupType1;
1545 	Oid			tupType2;
1546 	int32		tupTypmod1;
1547 	int32		tupTypmod2;
1548 	TupleDesc	tupdesc1;
1549 	TupleDesc	tupdesc2;
1550 	HeapTupleData tuple1;
1551 	HeapTupleData tuple2;
1552 	int			ncolumns1;
1553 	int			ncolumns2;
1554 	RecordCompareData *my_extra;
1555 	int			ncols;
1556 	Datum	   *values1;
1557 	Datum	   *values2;
1558 	bool	   *nulls1;
1559 	bool	   *nulls2;
1560 	int			i1;
1561 	int			i2;
1562 	int			j;
1563 
1564 	/* Extract type info from the tuples */
1565 	tupType1 = HeapTupleHeaderGetTypeId(record1);
1566 	tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1567 	tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1568 	ncolumns1 = tupdesc1->natts;
1569 	tupType2 = HeapTupleHeaderGetTypeId(record2);
1570 	tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1571 	tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1572 	ncolumns2 = tupdesc2->natts;
1573 
1574 	/* Build temporary HeapTuple control structures */
1575 	tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1576 	ItemPointerSetInvalid(&(tuple1.t_self));
1577 	tuple1.t_tableOid = InvalidOid;
1578 	tuple1.t_data = record1;
1579 	tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1580 	ItemPointerSetInvalid(&(tuple2.t_self));
1581 	tuple2.t_tableOid = InvalidOid;
1582 	tuple2.t_data = record2;
1583 
1584 	/*
1585 	 * We arrange to look up the needed comparison info just once per series
1586 	 * of calls, assuming the record types don't change underneath us.
1587 	 */
1588 	ncols = Max(ncolumns1, ncolumns2);
1589 	my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1590 	if (my_extra == NULL ||
1591 		my_extra->ncolumns < ncols)
1592 	{
1593 		fcinfo->flinfo->fn_extra =
1594 			MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1595 							   offsetof(RecordCompareData, columns) +
1596 							   ncols * sizeof(ColumnCompareData));
1597 		my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1598 		my_extra->ncolumns = ncols;
1599 		my_extra->record1_type = InvalidOid;
1600 		my_extra->record1_typmod = 0;
1601 		my_extra->record2_type = InvalidOid;
1602 		my_extra->record2_typmod = 0;
1603 	}
1604 
1605 	if (my_extra->record1_type != tupType1 ||
1606 		my_extra->record1_typmod != tupTypmod1 ||
1607 		my_extra->record2_type != tupType2 ||
1608 		my_extra->record2_typmod != tupTypmod2)
1609 	{
1610 		MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1611 		my_extra->record1_type = tupType1;
1612 		my_extra->record1_typmod = tupTypmod1;
1613 		my_extra->record2_type = tupType2;
1614 		my_extra->record2_typmod = tupTypmod2;
1615 	}
1616 
1617 	/* Break down the tuples into fields */
1618 	values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1619 	nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1620 	heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1621 	values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1622 	nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1623 	heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1624 
1625 	/*
1626 	 * Scan corresponding columns, allowing for dropped columns in different
1627 	 * places in the two rows.  i1 and i2 are physical column indexes, j is
1628 	 * the logical column index.
1629 	 */
1630 	i1 = i2 = j = 0;
1631 	while (i1 < ncolumns1 || i2 < ncolumns2)
1632 	{
1633 		Form_pg_attribute att1;
1634 		Form_pg_attribute att2;
1635 
1636 		/*
1637 		 * Skip dropped columns
1638 		 */
1639 		if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1640 		{
1641 			i1++;
1642 			continue;
1643 		}
1644 		if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1645 		{
1646 			i2++;
1647 			continue;
1648 		}
1649 		if (i1 >= ncolumns1 || i2 >= ncolumns2)
1650 			break;				/* we'll deal with mismatch below loop */
1651 
1652 		att1 = TupleDescAttr(tupdesc1, i1);
1653 		att2 = TupleDescAttr(tupdesc2, i2);
1654 
1655 		/*
1656 		 * Have two matching columns, they must be same type
1657 		 */
1658 		if (att1->atttypid != att2->atttypid)
1659 			ereport(ERROR,
1660 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1661 					 errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1662 							format_type_be(att1->atttypid),
1663 							format_type_be(att2->atttypid),
1664 							j + 1)));
1665 
1666 		/*
1667 		 * We consider two NULLs equal; NULL > not-NULL.
1668 		 */
1669 		if (!nulls1[i1] || !nulls2[i2])
1670 		{
1671 			if (nulls1[i1] || nulls2[i2])
1672 			{
1673 				result = false;
1674 				break;
1675 			}
1676 
1677 			/* Compare the pair of elements */
1678 			result = datum_image_eq(values1[i1], values2[i2], att1->attbyval, att2->attlen);
1679 			if (!result)
1680 				break;
1681 		}
1682 
1683 		/* equal, so continue to next column */
1684 		i1++, i2++, j++;
1685 	}
1686 
1687 	/*
1688 	 * If we didn't break out of the loop early, check for column count
1689 	 * mismatch.  (We do not report such mismatch if we found unequal column
1690 	 * values; is that a feature or a bug?)
1691 	 */
1692 	if (result)
1693 	{
1694 		if (i1 != ncolumns1 || i2 != ncolumns2)
1695 			ereport(ERROR,
1696 					(errcode(ERRCODE_DATATYPE_MISMATCH),
1697 					 errmsg("cannot compare record types with different numbers of columns")));
1698 	}
1699 
1700 	pfree(values1);
1701 	pfree(nulls1);
1702 	pfree(values2);
1703 	pfree(nulls2);
1704 	ReleaseTupleDesc(tupdesc1);
1705 	ReleaseTupleDesc(tupdesc2);
1706 
1707 	/* Avoid leaking memory when handed toasted input. */
1708 	PG_FREE_IF_COPY(record1, 0);
1709 	PG_FREE_IF_COPY(record2, 1);
1710 
1711 	PG_RETURN_BOOL(result);
1712 }
1713 
1714 Datum
record_image_ne(PG_FUNCTION_ARGS)1715 record_image_ne(PG_FUNCTION_ARGS)
1716 {
1717 	PG_RETURN_BOOL(!DatumGetBool(record_image_eq(fcinfo)));
1718 }
1719 
1720 Datum
record_image_lt(PG_FUNCTION_ARGS)1721 record_image_lt(PG_FUNCTION_ARGS)
1722 {
1723 	PG_RETURN_BOOL(record_image_cmp(fcinfo) < 0);
1724 }
1725 
1726 Datum
record_image_gt(PG_FUNCTION_ARGS)1727 record_image_gt(PG_FUNCTION_ARGS)
1728 {
1729 	PG_RETURN_BOOL(record_image_cmp(fcinfo) > 0);
1730 }
1731 
1732 Datum
record_image_le(PG_FUNCTION_ARGS)1733 record_image_le(PG_FUNCTION_ARGS)
1734 {
1735 	PG_RETURN_BOOL(record_image_cmp(fcinfo) <= 0);
1736 }
1737 
1738 Datum
record_image_ge(PG_FUNCTION_ARGS)1739 record_image_ge(PG_FUNCTION_ARGS)
1740 {
1741 	PG_RETURN_BOOL(record_image_cmp(fcinfo) >= 0);
1742 }
1743 
1744 Datum
btrecordimagecmp(PG_FUNCTION_ARGS)1745 btrecordimagecmp(PG_FUNCTION_ARGS)
1746 {
1747 	PG_RETURN_INT32(record_image_cmp(fcinfo));
1748 }
1749