1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 #include <postgres.h>
7 #include <fmgr.h>
8 #include <storage/ipc.h>
9 
10 #include "bgw_policy/compression_api.h"
11 #include "bgw_policy/continuous_aggregate_api.h"
12 #include "bgw_policy/retention_api.h"
13 #include "bgw_policy/job.h"
14 #include "bgw_policy/job_api.h"
15 #include "bgw_policy/reorder_api.h"
16 #include "chunk.h"
17 #include "chunk_api.h"
18 #include "compression/array.h"
19 #include "compression/compression.h"
20 #include "compression/compress_utils.h"
21 #include "compression/create.h"
22 #include "compression/deltadelta.h"
23 #include "compression/dictionary.h"
24 #include "compression/gorilla.h"
25 #include "compression/segment_meta.h"
26 #include "continuous_aggs/create.h"
27 #include "continuous_aggs/insert.h"
28 #include "continuous_aggs/options.h"
29 #include "continuous_aggs/refresh.h"
30 #include "continuous_aggs/invalidation.h"
31 #include "cross_module_fn.h"
32 #include "nodes/data_node_dispatch.h"
33 #include "data_node.h"
34 #include "dist_util.h"
35 #include "export.h"
36 #include "fdw/fdw.h"
37 #include "hypertable.h"
38 #include "license_guc.h"
39 #include "nodes/decompress_chunk/planner.h"
40 #include "nodes/skip_scan/skip_scan.h"
41 #include "nodes/gapfill/gapfill.h"
42 #include "partialize_finalize.h"
43 #include "planner.h"
44 #include "process_utility.h"
45 #include "process_utility.h"
46 #include "remote/connection_cache.h"
47 #include "remote/connection.h"
48 #include "remote/dist_commands.h"
49 #include "remote/dist_copy.h"
50 #include "remote/dist_txn.h"
51 #include "remote/txn_id.h"
52 #include "remote/txn_resolve.h"
53 #include "reorder.h"
54 #include "telemetry.h"
55 #include "dist_backup.h"
56 
57 #ifdef PG_MODULE_MAGIC
58 PG_MODULE_MAGIC;
59 #endif
60 
61 #ifdef APACHE_ONLY
62 #error "cannot compile the TSL for ApacheOnly mode"
63 #endif
64 
65 extern void PGDLLEXPORT _PG_init(void);
66 extern void PGDLLEXPORT _PG_fini(void);
67 
68 static void
cache_syscache_invalidate(Datum arg,int cacheid,uint32 hashvalue)69 cache_syscache_invalidate(Datum arg, int cacheid, uint32 hashvalue)
70 {
71 	remote_connection_cache_invalidate_callback(arg, cacheid, hashvalue);
72 }
73 
74 /*
75  * Cross module function initialization.
76  *
77  * During module start we set ts_cm_functions to point at the tsl version of the
78  * function registry.
79  *
80  * NOTE: To ensure that your cross-module function has a correct default, you
81  * must also add it to ts_cm_functions_default in cross_module_fn.c in the
82  * Apache codebase.
83  */
84 CrossModuleFunctions tsl_cm_functions = {
85 	.add_tsl_telemetry_info = tsl_telemetry_add_info,
86 
87 	.create_upper_paths_hook = tsl_create_upper_paths_hook,
88 	.set_rel_pathlist_dml = tsl_set_rel_pathlist_dml,
89 	.set_rel_pathlist_query = tsl_set_rel_pathlist_query,
90 
91 	/* bgw policies */
92 	.policy_compression_add = policy_compression_add,
93 	.policy_compression_remove = policy_compression_remove,
94 	.policy_recompression_proc = policy_recompression_proc,
95 	.policy_refresh_cagg_add = policy_refresh_cagg_add,
96 	.policy_refresh_cagg_proc = policy_refresh_cagg_proc,
97 	.policy_refresh_cagg_remove = policy_refresh_cagg_remove,
98 	.policy_reorder_add = policy_reorder_add,
99 	.policy_reorder_proc = policy_reorder_proc,
100 	.policy_reorder_remove = policy_reorder_remove,
101 	.policy_retention_add = policy_retention_add,
102 	.policy_retention_proc = policy_retention_proc,
103 	.policy_retention_remove = policy_retention_remove,
104 
105 	.job_add = job_add,
106 	.job_alter = job_alter,
107 	.job_delete = job_delete,
108 	.job_run = job_run,
109 	.job_execute = job_execute,
110 
111 	/* gapfill */
112 	.gapfill_marker = gapfill_marker,
113 	.gapfill_int16_time_bucket = gapfill_int16_time_bucket,
114 	.gapfill_int32_time_bucket = gapfill_int32_time_bucket,
115 	.gapfill_int64_time_bucket = gapfill_int64_time_bucket,
116 	.gapfill_date_time_bucket = gapfill_date_time_bucket,
117 	.gapfill_timestamp_time_bucket = gapfill_timestamp_time_bucket,
118 	.gapfill_timestamptz_time_bucket = gapfill_timestamptz_time_bucket,
119 
120 	.reorder_chunk = tsl_reorder_chunk,
121 	.move_chunk = tsl_move_chunk,
122 	.move_chunk_proc = tsl_move_chunk_proc,
123 	.copy_chunk_proc = tsl_copy_chunk_proc,
124 	.copy_chunk_cleanup_proc = tsl_copy_chunk_cleanup_proc,
125 
126 	/* Continuous Aggregates */
127 	.partialize_agg = tsl_partialize_agg,
128 	.finalize_agg_sfunc = tsl_finalize_agg_sfunc,
129 	.finalize_agg_ffunc = tsl_finalize_agg_ffunc,
130 	.process_cagg_viewstmt = tsl_process_continuous_agg_viewstmt,
131 	.continuous_agg_invalidation_trigger = continuous_agg_trigfn,
132 	.continuous_agg_call_invalidation_trigger = execute_cagg_trigger,
133 	.continuous_agg_refresh = continuous_agg_refresh,
134 	.continuous_agg_refresh_chunk = continuous_agg_refresh_chunk,
135 	.continuous_agg_invalidate = invalidation_add_entry,
136 	.continuous_agg_update_options = continuous_agg_update_options,
137 	.invalidation_cagg_log_add_entry = tsl_invalidation_cagg_log_add_entry,
138 	.invalidation_hyper_log_add_entry = tsl_invalidation_hyper_log_add_entry,
139 	.remote_invalidation_log_delete = remote_invalidation_log_delete,
140 	.drop_dist_ht_invalidation_trigger = tsl_drop_dist_ht_invalidation_trigger,
141 	.remote_drop_dist_ht_invalidation_trigger = remote_drop_dist_ht_invalidation_trigger,
142 	.invalidation_process_hypertable_log = tsl_invalidation_process_hypertable_log,
143 	.invalidation_process_cagg_log = tsl_invalidation_process_cagg_log,
144 
145 	.compressed_data_decompress_forward = tsl_compressed_data_decompress_forward,
146 	.compressed_data_decompress_reverse = tsl_compressed_data_decompress_reverse,
147 	.compressed_data_send = tsl_compressed_data_send,
148 	.compressed_data_recv = tsl_compressed_data_recv,
149 	.compressed_data_in = tsl_compressed_data_in,
150 	.compressed_data_out = tsl_compressed_data_out,
151 	.deltadelta_compressor_append = tsl_deltadelta_compressor_append,
152 	.deltadelta_compressor_finish = tsl_deltadelta_compressor_finish,
153 	.gorilla_compressor_append = tsl_gorilla_compressor_append,
154 	.gorilla_compressor_finish = tsl_gorilla_compressor_finish,
155 	.dictionary_compressor_append = tsl_dictionary_compressor_append,
156 	.dictionary_compressor_finish = tsl_dictionary_compressor_finish,
157 	.array_compressor_append = tsl_array_compressor_append,
158 	.array_compressor_finish = tsl_array_compressor_finish,
159 	.process_compress_table = tsl_process_compress_table,
160 	.process_altertable_cmd = tsl_process_altertable_cmd,
161 	.process_rename_cmd = tsl_process_rename_cmd,
162 	.compress_chunk = tsl_compress_chunk,
163 	.decompress_chunk = tsl_decompress_chunk,
164 	.recompress_chunk = tsl_recompress_chunk,
165 	.compress_row_init = compress_row_init,
166 	.compress_row_exec = compress_row_exec,
167 	.compress_row_end = compress_row_end,
168 	.compress_row_destroy = compress_row_destroy,
169 	.data_node_add = data_node_add,
170 	.data_node_delete = data_node_delete,
171 	.data_node_attach = data_node_attach,
172 	.data_node_ping = data_node_ping,
173 	.data_node_detach = data_node_detach,
174 	.data_node_allow_new_chunks = data_node_allow_new_chunks,
175 	.data_node_block_new_chunks = data_node_block_new_chunks,
176 	.chunk_set_default_data_node = chunk_set_default_data_node,
177 	.show_chunk = chunk_show,
178 	.create_chunk = chunk_create,
179 	.create_chunk_on_data_nodes = chunk_api_create_on_data_nodes,
180 	.chunk_drop_replica = chunk_drop_replica,
181 	.hypertable_make_distributed = hypertable_make_distributed,
182 	.get_and_validate_data_node_list = hypertable_get_and_validate_data_nodes,
183 	.timescaledb_fdw_handler = timescaledb_fdw_handler,
184 	.timescaledb_fdw_validator = timescaledb_fdw_validator,
185 	.remote_txn_id_in = remote_txn_id_in_pg,
186 	.remote_txn_id_out = remote_txn_id_out_pg,
187 	.remote_txn_heal_data_node = remote_txn_heal_data_node,
188 	.remote_connection_cache_show = remote_connection_cache_show,
189 	.set_rel_pathlist = tsl_set_rel_pathlist,
190 	.distributed_insert_path_create = tsl_create_distributed_insert_path,
191 	.distributed_copy = remote_distributed_copy,
192 	.ddl_command_start = tsl_ddl_command_start,
193 	.ddl_command_end = tsl_ddl_command_end,
194 	.sql_drop = tsl_sql_drop,
195 	.set_distributed_id = dist_util_set_id,
196 	.set_distributed_peer_id = dist_util_set_peer_id,
197 	.is_access_node_session = dist_util_is_access_node_session_on_data_node,
198 	.remove_from_distributed_db = dist_util_remove_from_db,
199 	.dist_remote_hypertable_info = dist_util_remote_hypertable_info,
200 	.dist_remote_chunk_info = dist_util_remote_chunk_info,
201 	.dist_remote_compressed_chunk_info = dist_util_remote_compressed_chunk_info,
202 	.dist_remote_hypertable_index_info = dist_util_remote_hypertable_index_info,
203 	.validate_as_data_node = validate_data_node_settings,
204 	.distributed_exec = ts_dist_cmd_exec,
205 	.create_distributed_restore_point = create_distributed_restore_point,
206 	.func_call_on_data_nodes = ts_dist_cmd_func_call_on_data_nodes,
207 	.chunk_get_relstats = chunk_api_get_chunk_relstats,
208 	.chunk_get_colstats = chunk_api_get_chunk_colstats,
209 	.chunk_create_empty_table = chunk_create_empty_table,
210 	.chunk_create_replica_table = chunk_create_replica_table,
211 	.hypertable_distributed_set_replication_factor = hypertable_set_replication_factor,
212 	.cache_syscache_invalidate = cache_syscache_invalidate,
213 	.update_compressed_chunk_relstats = update_compressed_chunk_relstats,
214 };
215 
216 static void
ts_module_cleanup_on_pg_exit(int code,Datum arg)217 ts_module_cleanup_on_pg_exit(int code, Datum arg)
218 {
219 	_tsl_process_utility_fini();
220 	_remote_dist_txn_fini();
221 	_remote_connection_cache_fini();
222 	_continuous_aggs_cache_inval_fini();
223 }
224 
225 TS_FUNCTION_INFO_V1(ts_module_init);
226 /*
227  * Module init function, sets ts_cm_functions to point at tsl_cm_functions
228  */
229 PGDLLEXPORT Datum
ts_module_init(PG_FUNCTION_ARGS)230 ts_module_init(PG_FUNCTION_ARGS)
231 {
232 	ts_cm_functions = &tsl_cm_functions;
233 
234 	_continuous_aggs_cache_inval_init();
235 	_decompress_chunk_init();
236 	_skip_scan_init();
237 	_remote_connection_cache_init();
238 	_remote_dist_txn_init();
239 	_tsl_process_utility_init();
240 	/* Register a cleanup function to be called when the backend exits */
241 	on_proc_exit(ts_module_cleanup_on_pg_exit, 0);
242 	PG_RETURN_BOOL(true);
243 }
244 
/* Shared library load/unload callbacks */
246 
247 PGDLLEXPORT void
_PG_init(void)248 _PG_init(void)
249 {
250 	/*
251 	 * In a normal backend, we disable loading the tsl until after the main
252 	 * timescale library is loaded, after which we enable it from the loader.
253 	 * In parallel workers the restore shared libraries function will load the
254 	 * libraries itself, and we bypass the loader, so we need to ensure that
255 	 * timescale is aware it can use the tsl if needed. It is always safe to
256 	 * do this here, because if we reach this point, we must have already
257 	 * loaded the tsl, so we no longer need to worry about its load order
258 	 * relative to the other libraries.
259 	 */
260 	ts_license_enable_module_loading();
261 
262 	_remote_connection_init();
263 }
264 
265 PGDLLEXPORT void
_PG_fini(void)266 _PG_fini(void)
267 {
268 	ts_module_cleanup_on_pg_exit(0, 0);
269 }
270