1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6
7 #include <postgres.h>
8 #include <catalog/pg_type.h>
9 #include <executor/spi.h>
10 #include <fmgr.h>
11 #include <commands/dbcommands.h>
12 #include <commands/trigger.h>
13 #include <lib/stringinfo.h>
14 #include <miscadmin.h>
15 #include <utils/hsearch.h>
16 #include <access/tupconvert.h>
17 #include <storage/lmgr.h>
18 #include <utils/builtins.h>
19 #include <utils/rel.h>
20 #include <utils/relcache.h>
21 #include <access/xact.h>
22
23 #include <scanner.h>
24 #include <continuous_agg.h>
25
26 #include "compat/compat.h"
27
28 #include "catalog.h"
29 #include "chunk.h"
30 #include "dimension.h"
31 #include "hypertable.h"
32 #include "hypertable_cache.h"
33 #include "invalidation.h"
34 #include "export.h"
35 #include "partitioning.h"
36 #include "utils.h"
37 #include "time_bucket.h"
38
39 #include "continuous_aggs/insert.h"
40
/*
 * When tuples in a hypertable that has a continuous aggregate are modified, the
 * lowest modified value and the greatest modified value must be tracked over
 * the course of a transaction. At the end of the transaction (pre-commit, or
 * pre-prepare for two-phase commit) these values are inserted into the
 * invalidation log table of their associated hypertable if they are below the
 * invalidation threshold (watermark). In REPEATABLE READ isolation or higher
 * they are inserted no matter what, because in that case we cannot see whether
 * a materialization transaction has started and moved the threshold during our
 * transaction.
 *
 * We accomplish this at the transaction level by keeping a hash table with one
 * entry per hypertable modified in the transaction. Each entry holds the
 * lowest and greatest modified values. The hash table is updated by a trigger
 * that is called for every row that is inserted, updated or deleted. We use a
 * hash table because this must be tracked per hypertable, and multiple
 * hypertables can have tuples modified during a single transaction. (It would
 * also make a future move to per-chunk cache invalidation easier.)
 */
/*
 * Per-hypertable invalidation state kept in continuous_aggs_cache_inval_htab
 * for the duration of a transaction.
 */
typedef struct ContinuousAggsCacheInvalEntry
{
	/*
	 * Hash key. dynahash expects the key at the start of the entry, and the
	 * table is created with keysize = sizeof(int32), so this must stay first.
	 */
	int32 hypertable_id;
	/* Relid of the hypertable; used to look up its invalidation threshold. */
	Oid hypertable_relid;
	/*
	 * This is what actually gets written to the hypertable log. It can be the same
	 * as the hypertable_id for normal hypertables. In the distributed case it is
	 * the ID of the parent hypertable in the Access Node.
	 */
	int32 entry_id;
	/*
	 * By-value copy of the hypertable's first open dimension. Its partitioning
	 * info, if any, is copied into the trigger memory context.
	 */
	Dimension hypertable_open_dimension;
	/* Chunk whose attno is cached below; InvalidOid until the first row. */
	Oid previous_chunk_relid;
	/* Attribute number of the open-dimension column inside previous_chunk_relid. */
	AttrNumber previous_chunk_open_dimension;
	/* True once at least one time value has been recorded. */
	bool value_is_set;
	/* Modified range in internal int64 time; starts at +infinity / -infinity. */
	int64 lowest_modified_value;
	int64 greatest_modified_value;
} ContinuousAggsCacheInvalEntry;
76
/* Returns the invalidation threshold of a hypertable; defined at the end of this file. */
static int64 get_lowest_invalidated_time_for_hypertable(Oid hypertable_relid);

/* Initial size hint for the per-transaction invalidation hash table. */
#define CA_CACHE_INVAL_INIT_HTAB_SIZE 64

/*
 * Per-transaction trigger state: the hash table of ContinuousAggsCacheInvalEntry
 * (keyed by hypertable id) and the memory context used for the trigger's
 * allocations. Both are created lazily by cache_inval_init() on the first
 * trigger call and reset to NULL by cache_inval_cleanup().
 */
static HTAB *continuous_aggs_cache_inval_htab = NULL;
static MemoryContext continuous_aggs_trigger_mctx = NULL;

/*
 * Register/unregister the transaction callback. These are non-static;
 * presumably invoked from the extension's module init/fini code (verify).
 */
void _continuous_aggs_cache_inval_init(void);
void _continuous_aggs_cache_inval_fini(void);
86
87 static void
cache_inval_init()88 cache_inval_init()
89 {
90 HASHCTL ctl;
91
92 Assert(continuous_aggs_trigger_mctx == NULL);
93
94 continuous_aggs_trigger_mctx = AllocSetContextCreate(TopTransactionContext,
95 "ContinuousAggsTriggerCtx",
96 ALLOCSET_DEFAULT_SIZES);
97
98 memset(&ctl, 0, sizeof(ctl));
99 ctl.keysize = sizeof(int32);
100 ctl.entrysize = sizeof(ContinuousAggsCacheInvalEntry);
101 ctl.hcxt = continuous_aggs_trigger_mctx;
102
103 continuous_aggs_cache_inval_htab = hash_create("TS Continuous Aggs Cache Inval",
104 CA_CACHE_INVAL_INIT_HTAB_SIZE,
105 &ctl,
106 HASH_ELEM | HASH_BLOBS);
107 };
108
109 static int64
tuple_get_time(Dimension * d,HeapTuple tuple,AttrNumber col,TupleDesc tupdesc)110 tuple_get_time(Dimension *d, HeapTuple tuple, AttrNumber col, TupleDesc tupdesc)
111 {
112 Datum datum;
113 bool isnull;
114 Oid dimtype;
115
116 datum = heap_getattr(tuple, col, tupdesc, &isnull);
117
118 if (NULL != d->partitioning)
119 {
120 Oid collation = TupleDescAttr(tupdesc, col)->attcollation;
121 datum = ts_partitioning_func_apply(d->partitioning, collation, datum);
122 }
123
124 Assert(d->type == DIMENSION_TYPE_OPEN);
125
126 dimtype = ts_dimension_get_partition_type(d);
127
128 if (isnull)
129 ereport(ERROR,
130 (errcode(ERRCODE_NOT_NULL_VIOLATION),
131 errmsg("NULL value in column \"%s\" violates not-null constraint",
132 NameStr(d->fd.column_name)),
133 errhint("Columns used for time partitioning cannot be NULL")));
134
135 return ts_time_value_to_internal(datum, dimtype);
136 }
137
138 static inline void
cache_inval_entry_init(ContinuousAggsCacheInvalEntry * cache_entry,int32 hypertable_id,int32 entry_id)139 cache_inval_entry_init(ContinuousAggsCacheInvalEntry *cache_entry, int32 hypertable_id,
140 int32 entry_id)
141 {
142 Cache *ht_cache = ts_hypertable_cache_pin();
143 /* NOTE: we can remove the id=>relid scan, if it becomes an issue, by getting the
144 * hypertable_relid directly from the Chunk*/
145 Hypertable *ht = ts_hypertable_cache_get_entry_by_id(ht_cache, hypertable_id);
146 cache_entry->hypertable_id = hypertable_id;
147 cache_entry->entry_id = entry_id;
148 cache_entry->hypertable_relid = ht->main_table_relid;
149 cache_entry->hypertable_open_dimension = *hyperspace_get_open_dimension(ht->space, 0);
150 if (cache_entry->hypertable_open_dimension.partitioning != NULL)
151 {
152 PartitioningInfo *open_dim_part_info =
153 MemoryContextAllocZero(continuous_aggs_trigger_mctx, sizeof(*open_dim_part_info));
154 *open_dim_part_info = *cache_entry->hypertable_open_dimension.partitioning;
155 cache_entry->hypertable_open_dimension.partitioning = open_dim_part_info;
156 }
157 cache_entry->previous_chunk_relid = InvalidOid;
158 cache_entry->value_is_set = false;
159 cache_entry->lowest_modified_value = INVAL_POS_INFINITY;
160 cache_entry->greatest_modified_value = INVAL_NEG_INFINITY;
161 ts_cache_release(ht_cache);
162 }
163
164 static inline void
cache_entry_switch_to_chunk(ContinuousAggsCacheInvalEntry * cache_entry,Oid chunk_id,Relation chunk_relation)165 cache_entry_switch_to_chunk(ContinuousAggsCacheInvalEntry *cache_entry, Oid chunk_id,
166 Relation chunk_relation)
167 {
168 Chunk *modified_tuple_chunk = ts_chunk_get_by_relid(chunk_id, false);
169 if (modified_tuple_chunk == NULL)
170 elog(ERROR, "continuous agg trigger function must be called on hypertable chunks only");
171
172 cache_entry->previous_chunk_relid = modified_tuple_chunk->table_id;
173 cache_entry->previous_chunk_open_dimension =
174 get_attnum(chunk_relation->rd_id,
175 NameStr(cache_entry->hypertable_open_dimension.fd.column_name));
176
177 if (cache_entry->previous_chunk_open_dimension == InvalidAttrNumber)
178 elog(ERROR, "continuous agg trigger function must be called on hypertable chunks only");
179 }
180
181 static inline void
update_cache_entry(ContinuousAggsCacheInvalEntry * cache_entry,int64 timeval)182 update_cache_entry(ContinuousAggsCacheInvalEntry *cache_entry, int64 timeval)
183 {
184 cache_entry->value_is_set = true;
185 if (timeval < cache_entry->lowest_modified_value)
186 cache_entry->lowest_modified_value = timeval;
187 if (timeval > cache_entry->greatest_modified_value)
188 cache_entry->greatest_modified_value = timeval;
189 }
190
191 /*
192 * Trigger to store what the max/min updated values are for a function.
193 * This is used by continuous aggregates to ensure that the aggregated values
194 * are updated correctly. Upon creating a continuous aggregate for a hypertable,
195 * this trigger should be registered, if it does not already exist.
196 */
197 Datum
continuous_agg_trigfn(PG_FUNCTION_ARGS)198 continuous_agg_trigfn(PG_FUNCTION_ARGS)
199 {
200 /*
201 * Use TriggerData to determine which row to return/work with, in the case
202 * of updates, we'll need to call the functions twice, once with the old
203 * rows (which act like deletes) and once with the new rows.
204 */
205 TriggerData *trigdata = (TriggerData *) fcinfo->context;
206 char *hypertable_id_str, *parent_hypertable_id_str;
207 int32 hypertable_id, parent_hypertable_id = 0;
208 bool is_distributed_hypertable_trigger = false;
209 if (trigdata->tg_trigger->tgnargs < 0)
210 elog(ERROR, "must supply hypertable id");
211
212 hypertable_id_str = trigdata->tg_trigger->tgargs[0];
213 hypertable_id = atol(hypertable_id_str);
214
215 if (trigdata->tg_trigger->tgnargs > 1)
216 {
217 parent_hypertable_id_str = trigdata->tg_trigger->tgargs[1];
218 parent_hypertable_id = atol(parent_hypertable_id_str);
219 is_distributed_hypertable_trigger = true;
220 }
221
222 if (!CALLED_AS_TRIGGER(fcinfo))
223 elog(ERROR, "continuous agg trigger function must be called by trigger manager");
224 if (!TRIGGER_FIRED_AFTER(trigdata->tg_event) || !TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
225 elog(ERROR, "continuous agg trigger function must be called in per row after trigger");
226 execute_cagg_trigger(hypertable_id,
227 trigdata->tg_relation,
228 trigdata->tg_trigtuple,
229 trigdata->tg_newtuple,
230 TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event),
231 is_distributed_hypertable_trigger,
232 parent_hypertable_id);
233 if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
234 return PointerGetDatum(trigdata->tg_trigtuple);
235 else
236 return PointerGetDatum(trigdata->tg_newtuple);
237 }
238
239 /*
240 * chunk_tuple is the tuple from trigdata->tg_trigtuple
241 * i.e. the one being/inserts/deleted/updated.
242 * (for updates: this is the row before modification)
243 * chunk_newtuple is the tuple from trigdata->tg_newtuple.
244 */
245 void
execute_cagg_trigger(int32 hypertable_id,Relation chunk_rel,HeapTuple chunk_tuple,HeapTuple chunk_newtuple,bool update,bool is_distributed_hypertable_trigger,int32 parent_hypertable_id)246 execute_cagg_trigger(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tuple,
247 HeapTuple chunk_newtuple, bool update, bool is_distributed_hypertable_trigger,
248 int32 parent_hypertable_id)
249 {
250 ContinuousAggsCacheInvalEntry *cache_entry;
251 bool found;
252 int64 timeval;
253 Oid chunk_relid = chunk_rel->rd_id;
254 /* On first call, init the mctx and hash table */
255 if (!continuous_aggs_cache_inval_htab)
256 cache_inval_init();
257
258 cache_entry = (ContinuousAggsCacheInvalEntry *)
259 hash_search(continuous_aggs_cache_inval_htab, &hypertable_id, HASH_ENTER, &found);
260
261 if (!found)
262 cache_inval_entry_init(cache_entry,
263 hypertable_id,
264 is_distributed_hypertable_trigger ? parent_hypertable_id :
265 hypertable_id);
266
267 /* handle the case where we need to repopulate the cached chunk data */
268 if (cache_entry->previous_chunk_relid != chunk_relid)
269 cache_entry_switch_to_chunk(cache_entry, chunk_relid, chunk_rel);
270
271 timeval = tuple_get_time(&cache_entry->hypertable_open_dimension,
272 chunk_tuple,
273 cache_entry->previous_chunk_open_dimension,
274 RelationGetDescr(chunk_rel));
275
276 update_cache_entry(cache_entry, timeval);
277
278 if (!update)
279 return;
280
281 /* on update we need to invalidate the new time value as well as the old one */
282 timeval = tuple_get_time(&cache_entry->hypertable_open_dimension,
283 chunk_newtuple,
284 cache_entry->previous_chunk_open_dimension,
285 RelationGetDescr(chunk_rel));
286
287 update_cache_entry(cache_entry, timeval);
288 }
289
290 static void
cache_inval_entry_write(ContinuousAggsCacheInvalEntry * entry)291 cache_inval_entry_write(ContinuousAggsCacheInvalEntry *entry)
292 {
293 int64 liv;
294
295 if (!entry->value_is_set)
296 return;
297
298 Cache *ht_cache = ts_hypertable_cache_pin();
299 Hypertable *ht = ts_hypertable_cache_get_entry_by_id(ht_cache, entry->hypertable_id);
300 bool is_distributed_member = hypertable_is_distributed_member(ht);
301 ts_cache_release(ht_cache);
302
303 /* The materialization worker uses a READ COMMITTED isolation level by default. Therefore, if we
304 * use a stronger isolation level, the isolation thereshold could update without us seeing the
305 * new value. In order to prevent serialization errors, we always append invalidation entries in
306 * the case when we're using a strong enough isolation level that we won't see the new
307 * threshold. The same applies for distributed member invalidation triggers of hypertables.
308 * The materializer can handle invalidations that are beyond the threshold gracefully.
309 */
310 if (IsolationUsesXactSnapshot() || is_distributed_member)
311 {
312 invalidation_hyper_log_add_entry(entry->entry_id,
313 entry->lowest_modified_value,
314 entry->greatest_modified_value);
315 return;
316 }
317
318 liv = get_lowest_invalidated_time_for_hypertable(entry->hypertable_relid);
319
320 if (entry->lowest_modified_value < liv)
321 invalidation_hyper_log_add_entry(entry->entry_id,
322 entry->lowest_modified_value,
323 entry->greatest_modified_value);
324 };
325
326 static void
cache_inval_cleanup(void)327 cache_inval_cleanup(void)
328 {
329 Assert(continuous_aggs_cache_inval_htab != NULL);
330 hash_destroy(continuous_aggs_cache_inval_htab);
331 MemoryContextDelete(continuous_aggs_trigger_mctx);
332
333 continuous_aggs_cache_inval_htab = NULL;
334 continuous_aggs_trigger_mctx = NULL;
335 };
336
337 static void
cache_inval_htab_write(void)338 cache_inval_htab_write(void)
339 {
340 HASH_SEQ_STATUS hash_seq;
341 ContinuousAggsCacheInvalEntry *current_entry;
342 Catalog *catalog;
343
344 if (hash_get_num_entries(continuous_aggs_cache_inval_htab) == 0)
345 return;
346
347 catalog = ts_catalog_get();
348
349 /* The invalidation threshold must remain locked until the end of
350 * the transaction to ensure the materializer will see our updates,
351 * so we explicitly lock it here
352 */
353 LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
354 AccessShareLock);
355
356 hash_seq_init(&hash_seq, continuous_aggs_cache_inval_htab);
357 while ((current_entry = hash_seq_search(&hash_seq)) != NULL)
358 cache_inval_entry_write(current_entry);
359 };
360
361 /*
362 * We use TopTransactionContext for our cached invalidations.
363 * We need to make sure cache_inval_cleanup() is always called after cache_inval_htab_write().
364 * We need this memory context to survive the transaction lifetime so that cache_inval_cleanup()
365 * does not attempt to tear down memory that has already been freed due to a transaction ending.
366 *
367 * The order of operations in postgres can be this:
368 * CallXactCallbacks(XACT_EVENT_PRE_PREPARE);
369 * ...
370 * CallXactCallbacks(XACT_EVENT_PREPARE);
371 * ...
372 * MemoryContextDelete(TopTransactionContext);
373 *
374 * or that:
375 * CallXactCallbacks(XACT_EVENT_PRE_COMMIT);
376 * ...
377 * CallXactCallbacks(XACT_EVENT_COMMIT);
378 * ...
379 * MemoryContextDelete(TopTransactionContext);
380 *
381 * In the case of a 2PC transaction, we need to make sure to apply the invalidations at
382 * XACT_EVENT_PRE_PREPARE time, before TopTransactionContext is torn down by PREPARE TRANSACTION.
383 * Otherwise, we are unable to call cache_inval_cleanup() without corrupting the memory. For
384 * this reason, we also deallocate at XACT_EVENT_PREPARE time.
385 *
386 * For local transactions we apply the invalidations at XACT_EVENT_PRE_COMMIT time.
387 * Similar care is taken of parallel workers and aborting transactions.
388 */
389 static void
continuous_agg_xact_invalidation_callback(XactEvent event,void * arg)390 continuous_agg_xact_invalidation_callback(XactEvent event, void *arg)
391 {
392 /* Return quickly if we never initialize the hashtable */
393 if (!continuous_aggs_cache_inval_htab)
394 return;
395
396 switch (event)
397 {
398 case XACT_EVENT_PRE_PREPARE:
399 case XACT_EVENT_PRE_COMMIT:
400 case XACT_EVENT_PARALLEL_PRE_COMMIT:
401 cache_inval_htab_write();
402 break;
403 case XACT_EVENT_PREPARE:
404 case XACT_EVENT_COMMIT:
405 case XACT_EVENT_PARALLEL_COMMIT:
406 case XACT_EVENT_ABORT:
407 case XACT_EVENT_PARALLEL_ABORT:
408 cache_inval_cleanup();
409 break;
410 default:
411 break;
412 }
413 }
414
415 void
_continuous_aggs_cache_inval_init(void)416 _continuous_aggs_cache_inval_init(void)
417 {
418 RegisterXactCallback(continuous_agg_xact_invalidation_callback, NULL);
419 }
420
421 void
_continuous_aggs_cache_inval_fini(void)422 _continuous_aggs_cache_inval_fini(void)
423 {
424 UnregisterXactCallback(continuous_agg_xact_invalidation_callback, NULL);
425 }
426
427 static ScanTupleResult
invalidation_tuple_found(TupleInfo * ti,void * min)428 invalidation_tuple_found(TupleInfo *ti, void *min)
429 {
430 bool isnull;
431 Datum watermark =
432 slot_getattr(ti->slot, Anum_continuous_aggs_invalidation_threshold_watermark, &isnull);
433
434 Assert(!isnull);
435
436 if (DatumGetInt64(watermark) < *((int64 *) min))
437 *((int64 *) min) = DatumGetInt64(watermark);
438
439 /*
440 * Return SCAN_CONTINUE because we check for multiple tuples as an error
441 * condition.
442 */
443 return SCAN_CONTINUE;
444 }
445
446 int64
get_lowest_invalidated_time_for_hypertable(Oid hypertable_relid)447 get_lowest_invalidated_time_for_hypertable(Oid hypertable_relid)
448 {
449 int64 min_val = INVAL_POS_INFINITY;
450 Catalog *catalog = ts_catalog_get();
451 ScanKeyData scankey[1];
452 ScannerCtx scanctx;
453
454 ScanKeyInit(&scankey[0],
455 Anum_continuous_aggs_invalidation_threshold_pkey_hypertable_id,
456 BTEqualStrategyNumber,
457 F_INT4EQ,
458 Int32GetDatum(ts_hypertable_relid_to_id(hypertable_relid)));
459 scanctx = (ScannerCtx){
460 .table = catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
461 .index = catalog_get_index(catalog,
462 CONTINUOUS_AGGS_INVALIDATION_THRESHOLD,
463 CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_PKEY),
464 .nkeys = 1,
465 .scankey = scankey,
466 .tuple_found = &invalidation_tuple_found,
467 .filter = NULL,
468 .data = &min_val,
469 .lockmode = AccessShareLock,
470 .scandirection = ForwardScanDirection,
471 .result_mctx = NULL,
472 };
473
474 /* if we don't find any watermark, then we've never done any materialization
475 * we'll treat this as if the invalidation timestamp is at min value, since
476 * the first materialization needs to scan the entire table anyway; the
477 * invalidations are redundant.
478 */
479 if (!ts_scanner_scan_one(&scanctx, false, "invalidation watermark"))
480 return INVAL_NEG_INFINITY;
481
482 return min_val;
483 }
484