1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 
7 #include <postgres.h>
8 #include <catalog/pg_type.h>
9 #include <executor/spi.h>
10 #include <fmgr.h>
11 #include <commands/dbcommands.h>
12 #include <commands/trigger.h>
13 #include <lib/stringinfo.h>
14 #include <miscadmin.h>
15 #include <utils/hsearch.h>
16 #include <access/tupconvert.h>
17 #include <storage/lmgr.h>
18 #include <utils/builtins.h>
19 #include <utils/rel.h>
20 #include <utils/relcache.h>
21 #include <access/xact.h>
22 
23 #include <scanner.h>
24 #include <continuous_agg.h>
25 
26 #include "compat/compat.h"
27 
28 #include "catalog.h"
29 #include "chunk.h"
30 #include "dimension.h"
31 #include "hypertable.h"
32 #include "hypertable_cache.h"
33 #include "invalidation.h"
34 #include "export.h"
35 #include "partitioning.h"
36 #include "utils.h"
37 #include "time_bucket.h"
38 
39 #include "continuous_aggs/insert.h"
40 
41 /*
 * When tuples in a hypertable that has a continuous aggregate are modified, the
 * lowest and the greatest modified time values must be tracked over the course
 * of the transaction. When the transaction commits (or is prepared), these
 * values are inserted into the cache invalidation log table of their associated
 * hypertable if they are below the invalidation threshold (watermark). In
 * REPEATABLE READ isolation or higher, and for distributed member hypertables,
 * they are inserted unconditionally, because a materialization transaction may
 * have moved the watermark during our transaction without us being able to see it.
50  *
51  * We accomplish this at the transaction level by keeping a hash table of each
52  * hypertable that has been modified in the transaction and the lowest and
53  * greatest modified values. The hashtable will be updated via a trigger that
54  * will be called for every row that is inserted, updated or deleted. We use a
 * hashtable because we need to keep track of this on a per-hypertable basis and
 * multiple hypertables can have tuples modified during a single transaction (a
 * hashtable would also make a future move to per-chunk cache invalidation easier).
58  *
59  */
/*
 * One entry of continuous_aggs_cache_inval_htab: the range of open-dimension
 * (time) values modified in one hypertable during the current transaction.
 */
typedef struct ContinuousAggsCacheInvalEntry
{
	/*
	 * Hash key. Must remain the FIRST field: the hash table is created with
	 * keysize = sizeof(int32) and dynahash takes the key from the leading
	 * bytes of each entry.
	 */
	int32 hypertable_id;
	/* Relid of the hypertable; used to look up its invalidation threshold */
	Oid hypertable_relid;
	int32 entry_id; /*
					 * This is what actually gets written to the hypertable log. It can be the same
					 * as the hypertable_id for normal hypertables. In the distributed case it is
					 * the ID of the parent hypertable in the Access Node.
					 */
	/* By-value copy of the hypertable's first open dimension */
	Dimension hypertable_open_dimension;
	/* Chunk for which previous_chunk_open_dimension is valid; InvalidOid initially */
	Oid previous_chunk_relid;
	/* Attribute number of the open-dimension column within previous_chunk_relid */
	AttrNumber previous_chunk_open_dimension;
	/* True once at least one modified value has been recorded */
	bool value_is_set;
	/* Inclusive bounds of modified values (internal time representation) */
	int64 lowest_modified_value;
	int64 greatest_modified_value;
} ContinuousAggsCacheInvalEntry;
76 
77 static int64 get_lowest_invalidated_time_for_hypertable(Oid hypertable_relid);
78 
79 #define CA_CACHE_INVAL_INIT_HTAB_SIZE 64
80 
81 static HTAB *continuous_aggs_cache_inval_htab = NULL;
82 static MemoryContext continuous_aggs_trigger_mctx = NULL;
83 
84 void _continuous_aggs_cache_inval_init(void);
85 void _continuous_aggs_cache_inval_fini(void);
86 
87 static void
cache_inval_init()88 cache_inval_init()
89 {
90 	HASHCTL ctl;
91 
92 	Assert(continuous_aggs_trigger_mctx == NULL);
93 
94 	continuous_aggs_trigger_mctx = AllocSetContextCreate(TopTransactionContext,
95 														 "ContinuousAggsTriggerCtx",
96 														 ALLOCSET_DEFAULT_SIZES);
97 
98 	memset(&ctl, 0, sizeof(ctl));
99 	ctl.keysize = sizeof(int32);
100 	ctl.entrysize = sizeof(ContinuousAggsCacheInvalEntry);
101 	ctl.hcxt = continuous_aggs_trigger_mctx;
102 
103 	continuous_aggs_cache_inval_htab = hash_create("TS Continuous Aggs Cache Inval",
104 												   CA_CACHE_INVAL_INIT_HTAB_SIZE,
105 												   &ctl,
106 												   HASH_ELEM | HASH_BLOBS);
107 };
108 
109 static int64
tuple_get_time(Dimension * d,HeapTuple tuple,AttrNumber col,TupleDesc tupdesc)110 tuple_get_time(Dimension *d, HeapTuple tuple, AttrNumber col, TupleDesc tupdesc)
111 {
112 	Datum datum;
113 	bool isnull;
114 	Oid dimtype;
115 
116 	datum = heap_getattr(tuple, col, tupdesc, &isnull);
117 
118 	if (NULL != d->partitioning)
119 	{
120 		Oid collation = TupleDescAttr(tupdesc, col)->attcollation;
121 		datum = ts_partitioning_func_apply(d->partitioning, collation, datum);
122 	}
123 
124 	Assert(d->type == DIMENSION_TYPE_OPEN);
125 
126 	dimtype = ts_dimension_get_partition_type(d);
127 
128 	if (isnull)
129 		ereport(ERROR,
130 				(errcode(ERRCODE_NOT_NULL_VIOLATION),
131 				 errmsg("NULL value in column \"%s\" violates not-null constraint",
132 						NameStr(d->fd.column_name)),
133 				 errhint("Columns used for time partitioning cannot be NULL")));
134 
135 	return ts_time_value_to_internal(datum, dimtype);
136 }
137 
138 static inline void
cache_inval_entry_init(ContinuousAggsCacheInvalEntry * cache_entry,int32 hypertable_id,int32 entry_id)139 cache_inval_entry_init(ContinuousAggsCacheInvalEntry *cache_entry, int32 hypertable_id,
140 					   int32 entry_id)
141 {
142 	Cache *ht_cache = ts_hypertable_cache_pin();
143 	/* NOTE: we can remove the id=>relid scan, if it becomes an issue, by getting the
144 	 * hypertable_relid directly from the Chunk*/
145 	Hypertable *ht = ts_hypertable_cache_get_entry_by_id(ht_cache, hypertable_id);
146 	cache_entry->hypertable_id = hypertable_id;
147 	cache_entry->entry_id = entry_id;
148 	cache_entry->hypertable_relid = ht->main_table_relid;
149 	cache_entry->hypertable_open_dimension = *hyperspace_get_open_dimension(ht->space, 0);
150 	if (cache_entry->hypertable_open_dimension.partitioning != NULL)
151 	{
152 		PartitioningInfo *open_dim_part_info =
153 			MemoryContextAllocZero(continuous_aggs_trigger_mctx, sizeof(*open_dim_part_info));
154 		*open_dim_part_info = *cache_entry->hypertable_open_dimension.partitioning;
155 		cache_entry->hypertable_open_dimension.partitioning = open_dim_part_info;
156 	}
157 	cache_entry->previous_chunk_relid = InvalidOid;
158 	cache_entry->value_is_set = false;
159 	cache_entry->lowest_modified_value = INVAL_POS_INFINITY;
160 	cache_entry->greatest_modified_value = INVAL_NEG_INFINITY;
161 	ts_cache_release(ht_cache);
162 }
163 
164 static inline void
cache_entry_switch_to_chunk(ContinuousAggsCacheInvalEntry * cache_entry,Oid chunk_id,Relation chunk_relation)165 cache_entry_switch_to_chunk(ContinuousAggsCacheInvalEntry *cache_entry, Oid chunk_id,
166 							Relation chunk_relation)
167 {
168 	Chunk *modified_tuple_chunk = ts_chunk_get_by_relid(chunk_id, false);
169 	if (modified_tuple_chunk == NULL)
170 		elog(ERROR, "continuous agg trigger function must be called on hypertable chunks only");
171 
172 	cache_entry->previous_chunk_relid = modified_tuple_chunk->table_id;
173 	cache_entry->previous_chunk_open_dimension =
174 		get_attnum(chunk_relation->rd_id,
175 				   NameStr(cache_entry->hypertable_open_dimension.fd.column_name));
176 
177 	if (cache_entry->previous_chunk_open_dimension == InvalidAttrNumber)
178 		elog(ERROR, "continuous agg trigger function must be called on hypertable chunks only");
179 }
180 
181 static inline void
update_cache_entry(ContinuousAggsCacheInvalEntry * cache_entry,int64 timeval)182 update_cache_entry(ContinuousAggsCacheInvalEntry *cache_entry, int64 timeval)
183 {
184 	cache_entry->value_is_set = true;
185 	if (timeval < cache_entry->lowest_modified_value)
186 		cache_entry->lowest_modified_value = timeval;
187 	if (timeval > cache_entry->greatest_modified_value)
188 		cache_entry->greatest_modified_value = timeval;
189 }
190 
191 /*
192  * Trigger to store what the max/min updated values are for a function.
193  * This is used by continuous aggregates to ensure that the aggregated values
194  * are updated correctly. Upon creating a continuous aggregate for a hypertable,
195  * this trigger should be registered, if it does not already exist.
196  */
197 Datum
continuous_agg_trigfn(PG_FUNCTION_ARGS)198 continuous_agg_trigfn(PG_FUNCTION_ARGS)
199 {
200 	/*
201 	 * Use TriggerData to determine which row to return/work with, in the case
202 	 * of updates, we'll need to call the functions twice, once with the old
203 	 * rows (which act like deletes) and once with the new rows.
204 	 */
205 	TriggerData *trigdata = (TriggerData *) fcinfo->context;
206 	char *hypertable_id_str, *parent_hypertable_id_str;
207 	int32 hypertable_id, parent_hypertable_id = 0;
208 	bool is_distributed_hypertable_trigger = false;
209 	if (trigdata->tg_trigger->tgnargs < 0)
210 		elog(ERROR, "must supply hypertable id");
211 
212 	hypertable_id_str = trigdata->tg_trigger->tgargs[0];
213 	hypertable_id = atol(hypertable_id_str);
214 
215 	if (trigdata->tg_trigger->tgnargs > 1)
216 	{
217 		parent_hypertable_id_str = trigdata->tg_trigger->tgargs[1];
218 		parent_hypertable_id = atol(parent_hypertable_id_str);
219 		is_distributed_hypertable_trigger = true;
220 	}
221 
222 	if (!CALLED_AS_TRIGGER(fcinfo))
223 		elog(ERROR, "continuous agg trigger function must be called by trigger manager");
224 	if (!TRIGGER_FIRED_AFTER(trigdata->tg_event) || !TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
225 		elog(ERROR, "continuous agg trigger function must be called in per row after trigger");
226 	execute_cagg_trigger(hypertable_id,
227 						 trigdata->tg_relation,
228 						 trigdata->tg_trigtuple,
229 						 trigdata->tg_newtuple,
230 						 TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event),
231 						 is_distributed_hypertable_trigger,
232 						 parent_hypertable_id);
233 	if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
234 		return PointerGetDatum(trigdata->tg_trigtuple);
235 	else
236 		return PointerGetDatum(trigdata->tg_newtuple);
237 }
238 
239 /*
240  * chunk_tuple is the tuple from trigdata->tg_trigtuple
241  * i.e. the one being/inserts/deleted/updated.
242  * (for updates: this is the row before modification)
243  * chunk_newtuple is the tuple from trigdata->tg_newtuple.
244  */
245 void
execute_cagg_trigger(int32 hypertable_id,Relation chunk_rel,HeapTuple chunk_tuple,HeapTuple chunk_newtuple,bool update,bool is_distributed_hypertable_trigger,int32 parent_hypertable_id)246 execute_cagg_trigger(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tuple,
247 					 HeapTuple chunk_newtuple, bool update, bool is_distributed_hypertable_trigger,
248 					 int32 parent_hypertable_id)
249 {
250 	ContinuousAggsCacheInvalEntry *cache_entry;
251 	bool found;
252 	int64 timeval;
253 	Oid chunk_relid = chunk_rel->rd_id;
254 	/* On first call, init the mctx and hash table */
255 	if (!continuous_aggs_cache_inval_htab)
256 		cache_inval_init();
257 
258 	cache_entry = (ContinuousAggsCacheInvalEntry *)
259 		hash_search(continuous_aggs_cache_inval_htab, &hypertable_id, HASH_ENTER, &found);
260 
261 	if (!found)
262 		cache_inval_entry_init(cache_entry,
263 							   hypertable_id,
264 							   is_distributed_hypertable_trigger ? parent_hypertable_id :
265 																   hypertable_id);
266 
267 	/* handle the case where we need to repopulate the cached chunk data */
268 	if (cache_entry->previous_chunk_relid != chunk_relid)
269 		cache_entry_switch_to_chunk(cache_entry, chunk_relid, chunk_rel);
270 
271 	timeval = tuple_get_time(&cache_entry->hypertable_open_dimension,
272 							 chunk_tuple,
273 							 cache_entry->previous_chunk_open_dimension,
274 							 RelationGetDescr(chunk_rel));
275 
276 	update_cache_entry(cache_entry, timeval);
277 
278 	if (!update)
279 		return;
280 
281 	/* on update we need to invalidate the new time value as well as the old one */
282 	timeval = tuple_get_time(&cache_entry->hypertable_open_dimension,
283 							 chunk_newtuple,
284 							 cache_entry->previous_chunk_open_dimension,
285 							 RelationGetDescr(chunk_rel));
286 
287 	update_cache_entry(cache_entry, timeval);
288 }
289 
290 static void
cache_inval_entry_write(ContinuousAggsCacheInvalEntry * entry)291 cache_inval_entry_write(ContinuousAggsCacheInvalEntry *entry)
292 {
293 	int64 liv;
294 
295 	if (!entry->value_is_set)
296 		return;
297 
298 	Cache *ht_cache = ts_hypertable_cache_pin();
299 	Hypertable *ht = ts_hypertable_cache_get_entry_by_id(ht_cache, entry->hypertable_id);
300 	bool is_distributed_member = hypertable_is_distributed_member(ht);
301 	ts_cache_release(ht_cache);
302 
303 	/* The materialization worker uses a READ COMMITTED isolation level by default. Therefore, if we
304 	 * use a stronger isolation level, the isolation thereshold could update without us seeing the
305 	 * new value. In order to prevent serialization errors, we always append invalidation entries in
306 	 * the case when we're using a strong enough isolation level that we won't see the new
307 	 * threshold. The same applies for distributed member invalidation triggers of hypertables.
308 	 * The materializer can handle invalidations that are beyond the threshold gracefully.
309 	 */
310 	if (IsolationUsesXactSnapshot() || is_distributed_member)
311 	{
312 		invalidation_hyper_log_add_entry(entry->entry_id,
313 										 entry->lowest_modified_value,
314 										 entry->greatest_modified_value);
315 		return;
316 	}
317 
318 	liv = get_lowest_invalidated_time_for_hypertable(entry->hypertable_relid);
319 
320 	if (entry->lowest_modified_value < liv)
321 		invalidation_hyper_log_add_entry(entry->entry_id,
322 										 entry->lowest_modified_value,
323 										 entry->greatest_modified_value);
324 };
325 
326 static void
cache_inval_cleanup(void)327 cache_inval_cleanup(void)
328 {
329 	Assert(continuous_aggs_cache_inval_htab != NULL);
330 	hash_destroy(continuous_aggs_cache_inval_htab);
331 	MemoryContextDelete(continuous_aggs_trigger_mctx);
332 
333 	continuous_aggs_cache_inval_htab = NULL;
334 	continuous_aggs_trigger_mctx = NULL;
335 };
336 
337 static void
cache_inval_htab_write(void)338 cache_inval_htab_write(void)
339 {
340 	HASH_SEQ_STATUS hash_seq;
341 	ContinuousAggsCacheInvalEntry *current_entry;
342 	Catalog *catalog;
343 
344 	if (hash_get_num_entries(continuous_aggs_cache_inval_htab) == 0)
345 		return;
346 
347 	catalog = ts_catalog_get();
348 
349 	/* The invalidation threshold must remain locked until the end of
350 	 * the transaction to ensure the materializer will see our updates,
351 	 * so we explicitly lock it here
352 	 */
353 	LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
354 					AccessShareLock);
355 
356 	hash_seq_init(&hash_seq, continuous_aggs_cache_inval_htab);
357 	while ((current_entry = hash_seq_search(&hash_seq)) != NULL)
358 		cache_inval_entry_write(current_entry);
359 };
360 
361 /*
 * Our cached invalidations live in a memory context that is a child of
 * TopTransactionContext. cache_inval_cleanup() must always run after
 * cache_inval_htab_write(), and it must run before TopTransactionContext is
 * deleted at transaction end; otherwise it would try to tear down memory that
 * has already been freed.
366  *
367  * The order of operations in postgres can be this:
368  * CallXactCallbacks(XACT_EVENT_PRE_PREPARE);
369  * ...
370  * CallXactCallbacks(XACT_EVENT_PREPARE);
371  * ...
372  * MemoryContextDelete(TopTransactionContext);
373  *
374  * or that:
375  * CallXactCallbacks(XACT_EVENT_PRE_COMMIT);
376  * ...
377  * CallXactCallbacks(XACT_EVENT_COMMIT);
378  * ...
379  * MemoryContextDelete(TopTransactionContext);
380  *
381  * In the case of a 2PC transaction, we need to make sure to apply the invalidations at
382  * XACT_EVENT_PRE_PREPARE time, before TopTransactionContext is torn down by PREPARE TRANSACTION.
383  * Otherwise, we are unable to call cache_inval_cleanup() without corrupting the memory. For
384  * this reason, we also deallocate at XACT_EVENT_PREPARE time.
385  *
386  * For local transactions we apply the invalidations at XACT_EVENT_PRE_COMMIT time.
 * Parallel workers and aborting transactions are handled the same way.
388  */
389 static void
continuous_agg_xact_invalidation_callback(XactEvent event,void * arg)390 continuous_agg_xact_invalidation_callback(XactEvent event, void *arg)
391 {
392 	/* Return quickly if we never initialize the hashtable */
393 	if (!continuous_aggs_cache_inval_htab)
394 		return;
395 
396 	switch (event)
397 	{
398 		case XACT_EVENT_PRE_PREPARE:
399 		case XACT_EVENT_PRE_COMMIT:
400 		case XACT_EVENT_PARALLEL_PRE_COMMIT:
401 			cache_inval_htab_write();
402 			break;
403 		case XACT_EVENT_PREPARE:
404 		case XACT_EVENT_COMMIT:
405 		case XACT_EVENT_PARALLEL_COMMIT:
406 		case XACT_EVENT_ABORT:
407 		case XACT_EVENT_PARALLEL_ABORT:
408 			cache_inval_cleanup();
409 			break;
410 		default:
411 			break;
412 	}
413 }
414 
415 void
_continuous_aggs_cache_inval_init(void)416 _continuous_aggs_cache_inval_init(void)
417 {
418 	RegisterXactCallback(continuous_agg_xact_invalidation_callback, NULL);
419 }
420 
421 void
_continuous_aggs_cache_inval_fini(void)422 _continuous_aggs_cache_inval_fini(void)
423 {
424 	UnregisterXactCallback(continuous_agg_xact_invalidation_callback, NULL);
425 }
426 
427 static ScanTupleResult
invalidation_tuple_found(TupleInfo * ti,void * min)428 invalidation_tuple_found(TupleInfo *ti, void *min)
429 {
430 	bool isnull;
431 	Datum watermark =
432 		slot_getattr(ti->slot, Anum_continuous_aggs_invalidation_threshold_watermark, &isnull);
433 
434 	Assert(!isnull);
435 
436 	if (DatumGetInt64(watermark) < *((int64 *) min))
437 		*((int64 *) min) = DatumGetInt64(watermark);
438 
439 	/*
440 	 * Return SCAN_CONTINUE because we check for multiple tuples as an error
441 	 * condition.
442 	 */
443 	return SCAN_CONTINUE;
444 }
445 
446 int64
get_lowest_invalidated_time_for_hypertable(Oid hypertable_relid)447 get_lowest_invalidated_time_for_hypertable(Oid hypertable_relid)
448 {
449 	int64 min_val = INVAL_POS_INFINITY;
450 	Catalog *catalog = ts_catalog_get();
451 	ScanKeyData scankey[1];
452 	ScannerCtx scanctx;
453 
454 	ScanKeyInit(&scankey[0],
455 				Anum_continuous_aggs_invalidation_threshold_pkey_hypertable_id,
456 				BTEqualStrategyNumber,
457 				F_INT4EQ,
458 				Int32GetDatum(ts_hypertable_relid_to_id(hypertable_relid)));
459 	scanctx = (ScannerCtx){
460 		.table = catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
461 		.index = catalog_get_index(catalog,
462 								   CONTINUOUS_AGGS_INVALIDATION_THRESHOLD,
463 								   CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_PKEY),
464 		.nkeys = 1,
465 		.scankey = scankey,
466 		.tuple_found = &invalidation_tuple_found,
467 		.filter = NULL,
468 		.data = &min_val,
469 		.lockmode = AccessShareLock,
470 		.scandirection = ForwardScanDirection,
471 		.result_mctx = NULL,
472 	};
473 
474 	/* if we don't find any watermark, then we've never done any materialization
475 	 * we'll treat this as if the invalidation timestamp is at min value, since
476 	 * the first materialization needs to scan the entire table anyway; the
477 	 * invalidations are redundant.
478 	 */
479 	if (!ts_scanner_scan_one(&scanctx, false, "invalidation watermark"))
480 		return INVAL_NEG_INFINITY;
481 
482 	return min_val;
483 }
484