1 /*
2 * This file and its contents are licensed under the Apache License 2.0.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-APACHE for a copy of the license.
5 */
6 #include <postgres.h>
7 #include <fmgr.h>
8 #include <access/htup_details.h>
9 #include <catalog/namespace.h>
10 #include <catalog/pg_type.h>
11 #include <libpq/pqformat.h>
12 #include <lib/stringinfo.h>
13 #include <nodes/value.h>
14 #include <utils/datum.h>
15 #include <utils/lsyscache.h>
16 #include <utils/syscache.h>
17
18 #include "export.h"
19
20 /* bookend aggregates first and last:
21 * first(value, cmp) returns the value for the row with the smallest cmp element.
22 * last(value, cmp) returns the value for the row with the biggest cmp element.
23 *
24 * Usage:
25 * SELECT first(metric, time), last(metric, time) FROM metric GROUP BY hostname.
26 */
27
/*
 * Declare the SQL-callable entry points defined further down in this file.
 * NOTE(review): TS_FUNCTION_INFO_V1 is not defined here; presumably it comes
 * from export.h and wraps PG_FUNCTION_INFO_V1 -- confirm there.
 */

/* transition functions */
TS_FUNCTION_INFO_V1(ts_first_sfunc);
TS_FUNCTION_INFO_V1(ts_last_sfunc);

/* combine functions */
TS_FUNCTION_INFO_V1(ts_first_combinefunc);
TS_FUNCTION_INFO_V1(ts_last_combinefunc);

/* final, serialize and deserialize functions */
TS_FUNCTION_INFO_V1(ts_bookend_finalfunc);
TS_FUNCTION_INFO_V1(ts_bookend_serializefunc);
TS_FUNCTION_INFO_V1(ts_bookend_deserializefunc);
35
36 /* A PolyDatum represents a polymorphic datum */
37 typedef struct PolyDatum
38 {
39 Oid type_oid;
40 bool is_null;
41 Datum datum;
42 } PolyDatum;
43
44 /* PolyDatumIOState is internal state used by polydatum_serialize and polydatum_deserialize */
45 typedef struct PolyDatumIOState
46 {
47 Oid type_oid;
48 FmgrInfo proc;
49 Oid typeioparam;
50 } PolyDatumIOState;
51
52 static PolyDatum
polydatum_from_arg(int argno,FunctionCallInfo fcinfo)53 polydatum_from_arg(int argno, FunctionCallInfo fcinfo)
54 {
55 PolyDatum value;
56
57 value.type_oid = get_fn_expr_argtype(fcinfo->flinfo, argno);
58 value.is_null = PG_ARGISNULL(argno);
59 if (!value.is_null)
60 value.datum = PG_GETARG_DATUM(argno);
61 else
62 value.datum = PointerGetDatum(NULL);
63 return value;
64 }
65
66 /* Serialize type as namespace name string + type name string.
67 * Don't simple send Oid since this state may be needed across pg_dumps.
68 */
69 static void
polydatum_serialize_type(StringInfo buf,Oid type_oid)70 polydatum_serialize_type(StringInfo buf, Oid type_oid)
71 {
72 HeapTuple tup;
73 Form_pg_type type_tuple;
74 char *namespace_name;
75
76 tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_oid));
77 if (!HeapTupleIsValid(tup))
78 elog(ERROR, "cache lookup failed for type %u", type_oid);
79 type_tuple = (Form_pg_type) GETSTRUCT(tup);
80 namespace_name = get_namespace_name(type_tuple->typnamespace);
81
82 /* send qualified type name */
83 pq_sendstring(buf, namespace_name);
84 pq_sendstring(buf, NameStr(type_tuple->typname));
85
86 ReleaseSysCache(tup);
87 }
88
89 /* serializes the polydatum pd unto buf */
90 static void
polydatum_serialize(PolyDatum * pd,StringInfo buf,PolyDatumIOState * state,FunctionCallInfo fcinfo)91 polydatum_serialize(PolyDatum *pd, StringInfo buf, PolyDatumIOState *state, FunctionCallInfo fcinfo)
92 {
93 bytea *outputbytes;
94
95 polydatum_serialize_type(buf, pd->type_oid);
96
97 if (pd->is_null)
98 {
99 /* emit -1 data length to signify a NULL */
100 pq_sendint32(buf, -1);
101 return;
102 }
103
104 if (state->type_oid != pd->type_oid)
105 {
106 Oid func;
107 bool is_varlena;
108
109 getTypeBinaryOutputInfo(pd->type_oid, &func, &is_varlena);
110 fmgr_info_cxt(func, &state->proc, fcinfo->flinfo->fn_mcxt);
111 state->type_oid = pd->type_oid;
112 }
113 outputbytes = SendFunctionCall(&state->proc, pd->datum);
114 pq_sendint32(buf, VARSIZE(outputbytes) - VARHDRSZ);
115 pq_sendbytes(buf, VARDATA(outputbytes), VARSIZE(outputbytes) - VARHDRSZ);
116 }
117
118 static Oid
polydatum_deserialize_type(StringInfo buf)119 polydatum_deserialize_type(StringInfo buf)
120 {
121 const char *schema_name = pq_getmsgstring(buf);
122 const char *type_name = pq_getmsgstring(buf);
123 Oid schema_oid = LookupExplicitNamespace(schema_name, false);
124 Oid type_oid = GetSysCacheOid2(TYPENAMENSP,
125 Anum_pg_type_oid,
126 PointerGetDatum(type_name),
127 ObjectIdGetDatum(schema_oid));
128 if (!OidIsValid(type_oid))
129 elog(ERROR, "cache lookup failed for type %s.%s", schema_name, type_name);
130
131 return type_oid;
132 }
133
134 /*
135 * Deserialize the PolyDatum where the binary representation is in buf.
136 * If a not-null PolyDatum is passed in, fill in it's fields, otherwise palloc.
137 *
138 */
139 static PolyDatum *
polydatum_deserialize(PolyDatum * result,StringInfo buf,PolyDatumIOState * state,FunctionCallInfo fcinfo)140 polydatum_deserialize(PolyDatum *result, StringInfo buf, PolyDatumIOState *state,
141 FunctionCallInfo fcinfo)
142 {
143 int itemlen;
144 StringInfoData item_buf;
145 StringInfo bufptr;
146 char csave;
147
148 if (NULL == result)
149 {
150 result = palloc(sizeof(PolyDatum));
151 }
152
153 result->type_oid = polydatum_deserialize_type(buf);
154
155 /* Following is copied/adapted from record_recv in core postgres */
156
157 /* Get and check the item length */
158 itemlen = pq_getmsgint(buf, 4);
159 if (itemlen < -1 || itemlen > (buf->len - buf->cursor))
160 ereport(ERROR,
161 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
162 errmsg("insufficient data left in message %d %d", itemlen, buf->len)));
163
164 if (itemlen == -1)
165 {
166 /* -1 length means NULL */
167 result->is_null = true;
168 bufptr = NULL;
169 csave = 0;
170 }
171 else
172 {
173 /*
174 * Rather than copying data around, we just set up a phony StringInfo
175 * pointing to the correct portion of the input buffer. We assume we
176 * can scribble on the input buffer so as to maintain the convention
177 * that StringInfos have a trailing null.
178 */
179 item_buf.data = &buf->data[buf->cursor];
180 item_buf.maxlen = itemlen + 1;
181 item_buf.len = itemlen;
182 item_buf.cursor = 0;
183
184 buf->cursor += itemlen;
185
186 csave = buf->data[buf->cursor];
187 buf->data[buf->cursor] = '\0';
188
189 bufptr = &item_buf;
190 result->is_null = false;
191 }
192
193 /* Now call the column's receiveproc */
194 if (state->type_oid != result->type_oid)
195 {
196 Oid func;
197
198 getTypeBinaryInputInfo(result->type_oid, &func, &state->typeioparam);
199 fmgr_info_cxt(func, &state->proc, fcinfo->flinfo->fn_mcxt);
200 state->type_oid = result->type_oid;
201 }
202
203 result->datum = ReceiveFunctionCall(&state->proc, bufptr, state->typeioparam, -1);
204
205 if (bufptr)
206 {
207 /* Trouble if it didn't eat the whole buffer */
208 if (item_buf.cursor != itemlen)
209 ereport(ERROR,
210 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
211 errmsg("improper binary format in polydata")));
212
213 buf->data[buf->cursor] = csave;
214 }
215 return result;
216 }
217
218 /* Internal state for bookend aggregates */
219 typedef struct InternalCmpAggStore
220 {
221 PolyDatum value;
222 PolyDatum cmp; /* the comparison element. e.g. time */
223 } InternalCmpAggStore;
224
225 /* State used to cache data for serialize/deserialize operations */
226 typedef struct InternalCmpAggStoreIOState
227 {
228 PolyDatumIOState value;
229 PolyDatumIOState cmp; /* the comparison element. e.g. time */
230 } InternalCmpAggStoreIOState;
231
232 typedef struct TypeInfoCache
233 {
234 Oid type_oid;
235 int16 typelen;
236 bool typebyval;
237 } TypeInfoCache;
238
239 inline static void
typeinfocache_init(TypeInfoCache * tic)240 typeinfocache_init(TypeInfoCache *tic)
241 {
242 tic->type_oid = InvalidOid;
243 }
244
245 inline static void
typeinfocache_polydatumcopy(TypeInfoCache * tic,PolyDatum input,PolyDatum * output)246 typeinfocache_polydatumcopy(TypeInfoCache *tic, PolyDatum input, PolyDatum *output)
247 {
248 if (tic->type_oid != input.type_oid)
249 {
250 tic->type_oid = input.type_oid;
251 get_typlenbyval(tic->type_oid, &tic->typelen, &tic->typebyval);
252 }
253 *output = input;
254 if (!input.is_null)
255 {
256 output->datum = datumCopy(input.datum, tic->typebyval, tic->typelen);
257 output->is_null = false;
258 }
259 else
260 {
261 output->datum = PointerGetDatum(NULL);
262 output->is_null = true;
263 }
264 }
265
266 typedef struct CmpFuncCache
267 {
268 Oid cmp_type;
269 char op;
270 FmgrInfo proc;
271 } CmpFuncCache;
272
273 inline static void
cmpfunccache_init(CmpFuncCache * cache)274 cmpfunccache_init(CmpFuncCache *cache)
275 {
276 cache->cmp_type = InvalidOid;
277 }
278
279 inline static bool
cmpfunccache_cmp(CmpFuncCache * cache,FunctionCallInfo fcinfo,char * opname,PolyDatum left,PolyDatum right)280 cmpfunccache_cmp(CmpFuncCache *cache, FunctionCallInfo fcinfo, char *opname, PolyDatum left,
281 PolyDatum right)
282 {
283 Assert(left.type_oid == right.type_oid);
284 Assert(opname[1] == '\0');
285
286 if (cache->cmp_type != left.type_oid || cache->op != opname[0])
287 {
288 Oid cmp_op, cmp_regproc;
289
290 if (!OidIsValid(left.type_oid))
291 elog(ERROR, "could not determine the type of the comparison_element");
292 cmp_op = OpernameGetOprid(list_make1(makeString(opname)), left.type_oid, left.type_oid);
293 if (!OidIsValid(cmp_op))
294 elog(ERROR, "could not find a %s operator for type %d", opname, left.type_oid);
295 cmp_regproc = get_opcode(cmp_op);
296 if (!OidIsValid(cmp_regproc))
297 elog(ERROR,
298 "could not find the procedure for the %s operator for type %d",
299 opname,
300 left.type_oid);
301 fmgr_info_cxt(cmp_regproc, &cache->proc, fcinfo->flinfo->fn_mcxt);
302 }
303 return DatumGetBool(
304 FunctionCall2Coll(&cache->proc, fcinfo->fncollation, left.datum, right.datum));
305 }
306
307 typedef struct TransCache
308 {
309 TypeInfoCache value_type_cache;
310 TypeInfoCache cmp_type_cache;
311 CmpFuncCache cmp_func_cache;
312 } TransCache;
313
314 static TransCache *
transcache_get(FunctionCallInfo fcinfo)315 transcache_get(FunctionCallInfo fcinfo)
316 {
317 TransCache *my_extra = (TransCache *) fcinfo->flinfo->fn_extra;
318
319 if (my_extra == NULL)
320 {
321 fcinfo->flinfo->fn_extra = MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, sizeof(TransCache));
322 my_extra = (TransCache *) fcinfo->flinfo->fn_extra;
323 typeinfocache_init(&my_extra->value_type_cache);
324 typeinfocache_init(&my_extra->cmp_type_cache);
325 cmpfunccache_init(&my_extra->cmp_func_cache);
326 }
327 return my_extra;
328 }
329
330 /*
331 * bookend_sfunc - internal function called by ts_last_sfunc and ts_first_sfunc;
332 */
333 static inline Datum
bookend_sfunc(MemoryContext aggcontext,InternalCmpAggStore * state,PolyDatum value,PolyDatum cmp,char * opname,FunctionCallInfo fcinfo)334 bookend_sfunc(MemoryContext aggcontext, InternalCmpAggStore *state, PolyDatum value, PolyDatum cmp,
335 char *opname, FunctionCallInfo fcinfo)
336 {
337 MemoryContext old_context;
338 TransCache *cache = transcache_get(fcinfo);
339
340 old_context = MemoryContextSwitchTo(aggcontext);
341
342 if (state == NULL)
343 {
344 state = (InternalCmpAggStore *) MemoryContextAlloc(aggcontext, sizeof(InternalCmpAggStore));
345 typeinfocache_polydatumcopy(&cache->value_type_cache, value, &state->value);
346 typeinfocache_polydatumcopy(&cache->cmp_type_cache, cmp, &state->cmp);
347 }
348 else
349 {
350 /* only do comparison if cmp is not NULL */
351 if (!cmp.is_null &&
352 cmpfunccache_cmp(&cache->cmp_func_cache, fcinfo, opname, cmp, state->cmp))
353 {
354 typeinfocache_polydatumcopy(&cache->value_type_cache, value, &state->value);
355 typeinfocache_polydatumcopy(&cache->cmp_type_cache, cmp, &state->cmp);
356 }
357 }
358 MemoryContextSwitchTo(old_context);
359
360 PG_RETURN_POINTER(state);
361 }
362
363 /* bookend_combinefunc - internal function called by ts_last_combinefunc and ts_first_combinefunc;
364 * fmgr args are: (internal internal_state, internal2 internal_state)
365 */
366 static inline Datum
bookend_combinefunc(MemoryContext aggcontext,InternalCmpAggStore * state1,InternalCmpAggStore * state2,char * opname,FunctionCallInfo fcinfo)367 bookend_combinefunc(MemoryContext aggcontext, InternalCmpAggStore *state1,
368 InternalCmpAggStore *state2, char *opname, FunctionCallInfo fcinfo)
369 {
370 MemoryContext old_context;
371 TransCache *cache;
372
373 if (state2 == NULL)
374 PG_RETURN_POINTER(state1);
375
376 cache = transcache_get(fcinfo);
377
378 /*
379 * manually copy all fields from state2 to state1, as per other combine
380 * func like int8_avg_combine
381 */
382 if (state1 == NULL)
383 {
384 old_context = MemoryContextSwitchTo(aggcontext);
385
386 state1 =
387 (InternalCmpAggStore *) MemoryContextAlloc(aggcontext, sizeof(InternalCmpAggStore));
388 typeinfocache_polydatumcopy(&cache->value_type_cache, state2->value, &state1->value);
389 typeinfocache_polydatumcopy(&cache->cmp_type_cache, state2->cmp, &state1->cmp);
390
391 MemoryContextSwitchTo(old_context);
392 PG_RETURN_POINTER(state1);
393 }
394
395 if (state1->cmp.is_null && state2->cmp.is_null)
396 {
397 PG_RETURN_POINTER(state1);
398 }
399 else if (state1->cmp.is_null != state2->cmp.is_null)
400 {
401 if (state1->cmp.is_null)
402 PG_RETURN_POINTER(state2);
403 else
404 PG_RETURN_POINTER(state1);
405 }
406 else if (cmpfunccache_cmp(&cache->cmp_func_cache, fcinfo, opname, state2->cmp, state1->cmp))
407 {
408 old_context = MemoryContextSwitchTo(aggcontext);
409 typeinfocache_polydatumcopy(&cache->value_type_cache, state2->value, &state1->value);
410 typeinfocache_polydatumcopy(&cache->cmp_type_cache, state2->cmp, &state1->cmp);
411 MemoryContextSwitchTo(old_context);
412 }
413
414 PG_RETURN_POINTER(state1);
415 }
416
417 /* first(internal internal_state, anyelement value, "any" comparison_element) */
418 Datum
ts_first_sfunc(PG_FUNCTION_ARGS)419 ts_first_sfunc(PG_FUNCTION_ARGS)
420 {
421 InternalCmpAggStore *store =
422 PG_ARGISNULL(0) ? NULL : (InternalCmpAggStore *) PG_GETARG_POINTER(0);
423 PolyDatum value = polydatum_from_arg(1, fcinfo);
424 PolyDatum cmp = polydatum_from_arg(2, fcinfo);
425 MemoryContext aggcontext;
426
427 if (!AggCheckCallContext(fcinfo, &aggcontext))
428 {
429 /* cannot be called directly because of internal-type argument */
430 elog(ERROR, "first_sfun called in non-aggregate context");
431 }
432
433 return bookend_sfunc(aggcontext, store, value, cmp, "<", fcinfo);
434 }
435
436 /* last(internal internal_state, anyelement value, "any" comparison_element) */
437 Datum
ts_last_sfunc(PG_FUNCTION_ARGS)438 ts_last_sfunc(PG_FUNCTION_ARGS)
439 {
440 InternalCmpAggStore *store =
441 PG_ARGISNULL(0) ? NULL : (InternalCmpAggStore *) PG_GETARG_POINTER(0);
442 PolyDatum value = polydatum_from_arg(1, fcinfo);
443 PolyDatum cmp = polydatum_from_arg(2, fcinfo);
444 MemoryContext aggcontext;
445
446 if (!AggCheckCallContext(fcinfo, &aggcontext))
447 {
448 /* cannot be called directly because of internal-type argument */
449 elog(ERROR, "last_sfun called in non-aggregate context");
450 }
451
452 return bookend_sfunc(aggcontext, store, value, cmp, ">", fcinfo);
453 }
454
455 /* first_combinerfunc(internal, internal) => internal */
456 Datum
ts_first_combinefunc(PG_FUNCTION_ARGS)457 ts_first_combinefunc(PG_FUNCTION_ARGS)
458 {
459 MemoryContext aggcontext;
460 InternalCmpAggStore *state1 =
461 PG_ARGISNULL(0) ? NULL : (InternalCmpAggStore *) PG_GETARG_POINTER(0);
462 InternalCmpAggStore *state2 =
463 PG_ARGISNULL(1) ? NULL : (InternalCmpAggStore *) PG_GETARG_POINTER(1);
464
465 if (!AggCheckCallContext(fcinfo, &aggcontext))
466 {
467 /* cannot be called directly because of internal-type argument */
468 elog(ERROR, "ts_first_combinefunc called in non-aggregate context");
469 }
470 return bookend_combinefunc(aggcontext, state1, state2, "<", fcinfo);
471 }
472
473 /* last_combinerfunc(internal, internal) => internal */
474 Datum
ts_last_combinefunc(PG_FUNCTION_ARGS)475 ts_last_combinefunc(PG_FUNCTION_ARGS)
476 {
477 MemoryContext aggcontext;
478 InternalCmpAggStore *state1 =
479 PG_ARGISNULL(0) ? NULL : (InternalCmpAggStore *) PG_GETARG_POINTER(0);
480 InternalCmpAggStore *state2 =
481 PG_ARGISNULL(1) ? NULL : (InternalCmpAggStore *) PG_GETARG_POINTER(1);
482
483 if (!AggCheckCallContext(fcinfo, &aggcontext))
484 {
485 /* cannot be called directly because of internal-type argument */
486 elog(ERROR, "ts_last_combinefunc called in non-aggregate context");
487 }
488 return bookend_combinefunc(aggcontext, state1, state2, ">", fcinfo);
489 }
490
491 /* ts_bookend_serializefunc(internal) => bytea */
492 Datum
ts_bookend_serializefunc(PG_FUNCTION_ARGS)493 ts_bookend_serializefunc(PG_FUNCTION_ARGS)
494 {
495 StringInfoData buf;
496 InternalCmpAggStoreIOState *my_extra;
497 InternalCmpAggStore *state;
498
499 Assert(!PG_ARGISNULL(0));
500 state = (InternalCmpAggStore *) PG_GETARG_POINTER(0);
501
502 my_extra = (InternalCmpAggStoreIOState *) fcinfo->flinfo->fn_extra;
503 if (my_extra == NULL)
504 {
505 fcinfo->flinfo->fn_extra =
506 MemoryContextAllocZero(fcinfo->flinfo->fn_mcxt, sizeof(InternalCmpAggStoreIOState));
507 my_extra = (InternalCmpAggStoreIOState *) fcinfo->flinfo->fn_extra;
508 }
509 pq_begintypsend(&buf);
510 polydatum_serialize(&state->value, &buf, &my_extra->value, fcinfo);
511 polydatum_serialize(&state->cmp, &buf, &my_extra->cmp, fcinfo);
512 PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
513 }
514
515 /* ts_bookend_deserializefunc(bytea, internal) => internal */
516 Datum
ts_bookend_deserializefunc(PG_FUNCTION_ARGS)517 ts_bookend_deserializefunc(PG_FUNCTION_ARGS)
518 {
519 bytea *sstate;
520 StringInfoData buf;
521 InternalCmpAggStore *result;
522 InternalCmpAggStoreIOState *my_extra;
523
524 if (!AggCheckCallContext(fcinfo, NULL))
525 elog(ERROR, "aggregate function called in non-aggregate context");
526
527 sstate = PG_GETARG_BYTEA_P(0);
528
529 /*
530 * Copy the bytea into a StringInfo so that we can "receive" it using the
531 * standard recv-function infrastructure.
532 */
533 initStringInfo(&buf);
534 appendBinaryStringInfo(&buf, VARDATA(sstate), VARSIZE(sstate) - VARHDRSZ);
535
536 my_extra = (InternalCmpAggStoreIOState *) fcinfo->flinfo->fn_extra;
537 if (my_extra == NULL)
538 {
539 fcinfo->flinfo->fn_extra =
540 MemoryContextAllocZero(fcinfo->flinfo->fn_mcxt, sizeof(InternalCmpAggStoreIOState));
541 my_extra = (InternalCmpAggStoreIOState *) fcinfo->flinfo->fn_extra;
542 }
543
544 result = palloc(sizeof(InternalCmpAggStore));
545 polydatum_deserialize(&result->value, &buf, &my_extra->value, fcinfo);
546 polydatum_deserialize(&result->cmp, &buf, &my_extra->cmp, fcinfo);
547 PG_RETURN_POINTER(result);
548 }
549
550 /* ts_bookend_finalfunc(internal, anyelement, "any") => anyelement */
551 Datum
ts_bookend_finalfunc(PG_FUNCTION_ARGS)552 ts_bookend_finalfunc(PG_FUNCTION_ARGS)
553 {
554 InternalCmpAggStore *state;
555
556 if (!AggCheckCallContext(fcinfo, NULL))
557 {
558 /* cannot be called directly because of internal-type argument */
559 elog(ERROR, "ts_bookend_finalfunc called in non-aggregate context");
560 }
561
562 state = PG_ARGISNULL(0) ? NULL : (InternalCmpAggStore *) PG_GETARG_POINTER(0);
563
564 if (state == NULL || state->value.is_null || state->cmp.is_null)
565 PG_RETURN_NULL();
566
567 PG_RETURN_DATUM(state->value.datum);
568 }
569