1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 
7 #include <postgres.h>
8 #include <funcapi.h>
9 #include <miscadmin.h>
10 #include <utils/acl.h>
11 #include <utils/builtins.h>
12 
13 #include <bgw/job.h>
14 #include <bgw/job_stat.h>
15 
16 #include "job.h"
17 #include "job_api.h"
18 
/* Default max runtime for a custom job is unlimited for now */
#define DEFAULT_MAX_RUNTIME 0

/* Right now, there is an infinite number of retries for custom jobs */
#define DEFAULT_MAX_RETRIES (-1)

/*
 * Default retry period for custom jobs is currently 5 minutes.
 *
 * The replacement list is parenthesized so that the macro behaves as a
 * single value when it is used inside a larger expression (for example as
 * a divisor), instead of being re-associated with neighbouring operators.
 */
#define DEFAULT_RETRY_PERIOD (5 * USECS_PER_MINUTE)

/* Number of columns in the tuple returned by alter_job */
#define ALTER_JOB_NUM_COLS 8
28 
29 /*
30  * Check configuration for a job type.
31  */
32 static void
job_config_check(Name proc_schema,Name proc_name,Jsonb * config)33 job_config_check(Name proc_schema, Name proc_name, Jsonb *config)
34 {
35 	if (namestrcmp(proc_schema, INTERNAL_SCHEMA_NAME) == 0)
36 	{
37 		if (namestrcmp(proc_name, "policy_retention") == 0)
38 			policy_retention_read_and_validate_config(config, NULL);
39 		else if (namestrcmp(proc_name, "policy_reorder") == 0)
40 			policy_reorder_read_and_validate_config(config, NULL);
41 		else if (namestrcmp(proc_name, "policy_compression") == 0)
42 		{
43 			PolicyCompressionData policy_data;
44 			policy_compression_read_and_validate_config(config, &policy_data);
45 			ts_cache_release(policy_data.hcache);
46 		}
47 		else if (namestrcmp(proc_name, "policy_refresh_continuous_aggregate") == 0)
48 			policy_refresh_cagg_read_and_validate_config(config, NULL);
49 	}
50 }
51 
52 /*
53  * CREATE FUNCTION add_job(
54  * 0 proc REGPROC,
55  * 1 schedule_interval INTERVAL,
56  * 2 config JSONB DEFAULT NULL,
57  * 3 initial_start TIMESTAMPTZ DEFAULT NULL,
58  * 4 scheduled BOOL DEFAULT true
59  * ) RETURNS INTEGER
60  */
61 Datum
job_add(PG_FUNCTION_ARGS)62 job_add(PG_FUNCTION_ARGS)
63 {
64 	NameData application_name;
65 	NameData proc_name;
66 	NameData proc_schema;
67 	NameData owner_name;
68 	Interval max_runtime = { .time = DEFAULT_MAX_RUNTIME };
69 	Interval retry_period = { .time = DEFAULT_RETRY_PERIOD };
70 	int32 job_id;
71 	char *func_name = NULL;
72 
73 	Oid owner = GetUserId();
74 	Oid proc = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
75 	Interval *schedule_interval = PG_ARGISNULL(1) ? NULL : PG_GETARG_INTERVAL_P(1);
76 	Jsonb *config = PG_ARGISNULL(2) ? NULL : PG_GETARG_JSONB_P(2);
77 	bool scheduled = PG_ARGISNULL(4) ? true : PG_GETARG_BOOL(4);
78 
79 	TS_PREVENT_FUNC_IF_READ_ONLY();
80 
81 	if (PG_ARGISNULL(0))
82 		ereport(ERROR,
83 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
84 				 errmsg("function or procedure cannot be NULL")));
85 
86 	if (NULL == schedule_interval)
87 		ereport(ERROR,
88 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
89 				 errmsg("schedule interval cannot be NULL")));
90 
91 	func_name = get_func_name(proc);
92 	if (func_name == NULL)
93 		ereport(ERROR,
94 				(errcode(ERRCODE_UNDEFINED_OBJECT),
95 				 errmsg("function or procedure with OID %u does not exist", proc)));
96 
97 	if (pg_proc_aclcheck(proc, owner, ACL_EXECUTE) != ACLCHECK_OK)
98 		ereport(ERROR,
99 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
100 				 errmsg("permission denied for function \"%s\"", func_name),
101 				 errhint("Job owner must have EXECUTE privilege on the function.")));
102 
103 	/* Verify that the owner can create a background worker */
104 	ts_bgw_job_validate_job_owner(owner);
105 
106 	/* Next, insert a new job into jobs table */
107 	namestrcpy(&application_name, "User-Defined Action");
108 	namestrcpy(&proc_schema, get_namespace_name(get_func_namespace(proc)));
109 	namestrcpy(&proc_name, func_name);
110 	namestrcpy(&owner_name, GetUserNameFromId(owner, false));
111 
112 	if (config)
113 		job_config_check(&proc_schema, &proc_name, config);
114 
115 	job_id = ts_bgw_job_insert_relation(&application_name,
116 										schedule_interval,
117 										&max_runtime,
118 										DEFAULT_MAX_RETRIES,
119 										&retry_period,
120 										&proc_schema,
121 										&proc_name,
122 										&owner_name,
123 										scheduled,
124 										0,
125 										config);
126 
127 	if (!PG_ARGISNULL(3))
128 	{
129 		TimestampTz initial_start = PG_GETARG_TIMESTAMPTZ(3);
130 		ts_bgw_job_stat_upsert_next_start(job_id, initial_start);
131 	}
132 
133 	PG_RETURN_INT32(job_id);
134 }
135 
136 static BgwJob *
find_job(int32 job_id,bool null_job_id,bool missing_ok)137 find_job(int32 job_id, bool null_job_id, bool missing_ok)
138 {
139 	BgwJob *job;
140 
141 	if (null_job_id && !missing_ok)
142 		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("job ID cannot be NULL")));
143 
144 	job = ts_bgw_job_find(job_id, CurrentMemoryContext, !missing_ok);
145 
146 	if (NULL == job)
147 	{
148 		Assert(missing_ok);
149 		ereport(NOTICE,
150 				(errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("job %d not found, skipping", job_id)));
151 	}
152 
153 	return job;
154 }
155 
156 /*
157  * CREATE OR REPLACE FUNCTION delete_job(job_id INTEGER) RETURNS VOID
158  */
159 Datum
job_delete(PG_FUNCTION_ARGS)160 job_delete(PG_FUNCTION_ARGS)
161 {
162 	int32 job_id = PG_GETARG_INT32(0);
163 	BgwJob *job;
164 	Oid owner;
165 
166 	TS_PREVENT_FUNC_IF_READ_ONLY();
167 
168 	job = find_job(job_id, PG_ARGISNULL(0), false);
169 	owner = get_role_oid(NameStr(job->fd.owner), false);
170 
171 	if (!has_privs_of_role(GetUserId(), owner))
172 		ereport(ERROR,
173 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
174 				 errmsg("insufficient permissions to delete job for user \"%s\"",
175 						NameStr(job->fd.owner))));
176 
177 	ts_bgw_job_delete_by_id(job_id);
178 
179 	PG_RETURN_VOID();
180 }
181 
182 /* This function only updates the fields modifiable with alter_job. */
183 static ScanTupleResult
bgw_job_tuple_update_by_id(TupleInfo * ti,void * const data)184 bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data)
185 {
186 	BgwJob *updated_job = (BgwJob *) data;
187 	bool should_free;
188 	HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
189 	HeapTuple new_tuple;
190 
191 	Datum values[Natts_bgw_job] = { 0 };
192 	bool isnull[Natts_bgw_job] = { 0 };
193 	bool repl[Natts_bgw_job] = { 0 };
194 
195 	Datum old_schedule_interval =
196 		slot_getattr(ti->slot, Anum_bgw_job_schedule_interval, &isnull[0]);
197 	Assert(!isnull[0]);
198 
199 	/* when we update the schedule interval, modify the next start time as well*/
200 	if (!DatumGetBool(DirectFunctionCall2(interval_eq,
201 										  old_schedule_interval,
202 										  IntervalPGetDatum(&updated_job->fd.schedule_interval))))
203 	{
204 		BgwJobStat *stat = ts_bgw_job_stat_find(updated_job->fd.id);
205 
206 		if (stat != NULL)
207 		{
208 			TimestampTz next_start = DatumGetTimestampTz(
209 				DirectFunctionCall2(timestamptz_pl_interval,
210 									TimestampTzGetDatum(stat->fd.last_finish),
211 									IntervalPGetDatum(&updated_job->fd.schedule_interval)));
212 			/* allow DT_NOBEGIN for next_start here through allow_unset=true in the case that
213 			 * last_finish is DT_NOBEGIN,
214 			 * This means the value is counted as unset which is what we want */
215 			ts_bgw_job_stat_update_next_start(updated_job->fd.id, next_start, true);
216 		}
217 		values[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] =
218 			IntervalPGetDatum(&updated_job->fd.schedule_interval);
219 		repl[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] = true;
220 	}
221 
222 	values[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] =
223 		IntervalPGetDatum(&updated_job->fd.max_runtime);
224 	repl[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] = true;
225 
226 	values[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] =
227 		Int32GetDatum(updated_job->fd.max_retries);
228 	repl[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] = true;
229 
230 	values[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] =
231 		IntervalPGetDatum(&updated_job->fd.retry_period);
232 	repl[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] = true;
233 
234 	values[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] =
235 		BoolGetDatum(updated_job->fd.scheduled);
236 	repl[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] = true;
237 
238 	repl[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true;
239 	if (updated_job->fd.config)
240 	{
241 		job_config_check(&updated_job->fd.proc_schema,
242 						 &updated_job->fd.proc_name,
243 						 updated_job->fd.config);
244 		values[AttrNumberGetAttrOffset(Anum_bgw_job_config)] =
245 			JsonbPGetDatum(updated_job->fd.config);
246 	}
247 	else
248 		isnull[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true;
249 
250 	new_tuple = heap_modify_tuple(tuple, ts_scanner_get_tupledesc(ti), values, isnull, repl);
251 
252 	ts_catalog_update(ti->scanrel, new_tuple);
253 
254 	heap_freetuple(new_tuple);
255 	if (should_free)
256 		heap_freetuple(tuple);
257 
258 	return SCAN_DONE;
259 }
260 
261 /*
262  * Overwrite job with specified job_id with the given fields
263  *
264  * This function only updates the fields modifiable with alter_job.
265  */
266 static void
ts_bgw_job_update_by_id(int32 job_id,BgwJob * job)267 ts_bgw_job_update_by_id(int32 job_id, BgwJob *job)
268 {
269 	ScanKeyData scankey[1];
270 	Catalog *catalog = ts_catalog_get();
271 	ScanTupLock scantuplock = {
272 		.waitpolicy = LockWaitBlock,
273 		.lockmode = LockTupleExclusive,
274 	};
275 	ScannerCtx scanctx = { .table = catalog_get_table_id(catalog, BGW_JOB),
276 						   .index = catalog_get_index(catalog, BGW_JOB, BGW_JOB_PKEY_IDX),
277 						   .nkeys = 1,
278 						   .scankey = scankey,
279 						   .data = job,
280 						   .limit = 1,
281 						   .tuple_found = bgw_job_tuple_update_by_id,
282 						   .lockmode = RowExclusiveLock,
283 						   .scandirection = ForwardScanDirection,
284 						   .result_mctx = CurrentMemoryContext,
285 						   .tuplock = &scantuplock };
286 
287 	ScanKeyInit(&scankey[0],
288 				Anum_bgw_job_pkey_idx_id,
289 				BTEqualStrategyNumber,
290 				F_INT4EQ,
291 				Int32GetDatum(job_id));
292 
293 	ts_scanner_scan(&scanctx);
294 }
295 
296 /*
297  * CREATE OR REPLACE PROCEDURE run_job(job_id INTEGER)
298  */
299 Datum
job_run(PG_FUNCTION_ARGS)300 job_run(PG_FUNCTION_ARGS)
301 {
302 	int32 job_id = PG_GETARG_INT32(0);
303 	BgwJob *job = find_job(job_id, PG_ARGISNULL(0), false);
304 
305 	job_execute(job);
306 
307 	PG_RETURN_VOID();
308 }
309 
310 /*
311  * CREATE OR REPLACE FUNCTION alter_job(
312  * 0    job_id INTEGER,
313  * 1    schedule_interval INTERVAL = NULL,
314  * 2    max_runtime INTERVAL = NULL,
315  * 3    max_retries INTEGER = NULL,
316  * 4    retry_period INTERVAL = NULL,
317  * 5    scheduled BOOL = NULL,
318  * 6    config JSONB = NULL,
319  * 7    next_start TIMESTAMPTZ = NULL
320  * 8    if_exists BOOL = FALSE,
321  * ) RETURNS TABLE (
322  *      job_id INTEGER,
323  *      schedule_interval INTERVAL,
324  *      max_runtime INTERVAL,
325  *      max_retries INTEGER,
326  *      retry_period INTERVAL,
327  *      scheduled BOOL,
328  *      config JSONB,
329  *      next_start TIMESTAMPTZ
330  * )
331  */
332 Datum
job_alter(PG_FUNCTION_ARGS)333 job_alter(PG_FUNCTION_ARGS)
334 {
335 	BgwJobStat *stat;
336 	TupleDesc tupdesc;
337 	Datum values[ALTER_JOB_NUM_COLS] = { 0 };
338 	bool nulls[ALTER_JOB_NUM_COLS] = { false };
339 	HeapTuple tuple;
340 	TimestampTz next_start;
341 	int job_id = PG_GETARG_INT32(0);
342 	bool if_exists = PG_GETARG_BOOL(8);
343 	BgwJob *job;
344 
345 	TS_PREVENT_FUNC_IF_READ_ONLY();
346 
347 	/* check that caller accepts tuple and abort early if that is not the
348 	 * case */
349 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
350 		ereport(ERROR,
351 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
352 				 errmsg("function returning record called in context "
353 						"that cannot accept type record")));
354 
355 	job = find_job(job_id, PG_ARGISNULL(0), if_exists);
356 	if (job == NULL)
357 		PG_RETURN_NULL();
358 
359 	ts_bgw_job_permission_check(job);
360 
361 	if (!PG_ARGISNULL(1))
362 		job->fd.schedule_interval = *PG_GETARG_INTERVAL_P(1);
363 	if (!PG_ARGISNULL(2))
364 		job->fd.max_runtime = *PG_GETARG_INTERVAL_P(2);
365 	if (!PG_ARGISNULL(3))
366 		job->fd.max_retries = PG_GETARG_INT32(3);
367 	if (!PG_ARGISNULL(4))
368 		job->fd.retry_period = *PG_GETARG_INTERVAL_P(4);
369 	if (!PG_ARGISNULL(5))
370 		job->fd.scheduled = PG_GETARG_BOOL(5);
371 	if (!PG_ARGISNULL(6))
372 		job->fd.config = PG_GETARG_JSONB_P(6);
373 
374 	ts_bgw_job_update_by_id(job_id, job);
375 
376 	if (!PG_ARGISNULL(7))
377 		ts_bgw_job_stat_upsert_next_start(job_id, PG_GETARG_TIMESTAMPTZ(7));
378 
379 	stat = ts_bgw_job_stat_find(job_id);
380 	if (stat != NULL)
381 		next_start = stat->fd.next_start;
382 	else
383 		next_start = DT_NOBEGIN;
384 
385 	tupdesc = BlessTupleDesc(tupdesc);
386 	values[0] = Int32GetDatum(job->fd.id);
387 	values[1] = IntervalPGetDatum(&job->fd.schedule_interval);
388 	values[2] = IntervalPGetDatum(&job->fd.max_runtime);
389 	values[3] = Int32GetDatum(job->fd.max_retries);
390 	values[4] = IntervalPGetDatum(&job->fd.retry_period);
391 	values[5] = BoolGetDatum(job->fd.scheduled);
392 
393 	if (job->fd.config == NULL)
394 		nulls[6] = true;
395 	else
396 		values[6] = JsonbPGetDatum(job->fd.config);
397 
398 	values[7] = TimestampTzGetDatum(next_start);
399 
400 	tuple = heap_form_tuple(tupdesc, values, nulls);
401 	return HeapTupleGetDatum(tuple);
402 }
403