1 /*-------------------------------------------------------------------------
2 * cleanup_thread.c
3 *
4 * Periodic cleanup of confirm-, event- and log-data.
5 *
6 * Copyright (c) 2003-2009, PostgreSQL Global Development Group
7 * Author: Jan Wieck, Afilias USA INC.
8 *
9 *
10 *-------------------------------------------------------------------------
11 */
12
13
14 #include <pthread.h>
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <string.h>
19 #include <errno.h>
20 #include <signal.h>
21 #include <sys/types.h>
22 #ifndef WIN32
23 #include <sys/time.h>
24 #include <unistd.h>
25 #endif
26
27 #include "slon.h"
28
29
30 /* ----------
31 * Global data
32 * ----------
33 */
34 int vac_frequency = SLON_VACUUM_FREQUENCY;
35 char *cleanup_interval;
36
37 static unsigned long earliest_xid = 0;
38 static unsigned long get_earliest_xid(PGconn *dbconn);
39
40 /* ----------
41 * cleanupThread_main
42 *
43 * Periodically calls the stored procedure to remove old events and log data and
44 * vacuums those tables.
45 * ----------
46 */
47 void *
cleanupThread_main(void * dummy)48 cleanupThread_main( /* @unused@ */ void *dummy)
49 {
50 SlonConn *conn;
51 SlonDString query_baseclean;
52 SlonDString query_cleanup_interval_second;
53 SlonDString query2;
54 SlonDString query_pertbl;
55
56 PGconn *dbconn;
57 PGresult *res;
58 PGresult *res2;
59 PGresult *res3;
60 struct timeval tv_start;
61 struct timeval tv_end;
62 int t;
63 int vac_count = 0;
64 int vac_enable = SLON_VACUUM_FREQUENCY;
65 char *vacuum_action;
66 int ntuples;
67 int cleanup_interval_second;
68 int vac_bias = 0;
69
70 slon_log(SLON_CONFIG, "cleanupThread: thread starts\n");
71 /*
72 * Connect to the local database
73 */
74 if ((conn = slon_connectdb(rtcfg_conninfo, "local_cleanup")) == NULL)
75 {
76 #ifndef WIN32
77 (void) kill(getpid(), SIGTERM);
78 pthread_exit(NULL);
79 #else
80 exit(0);
81 #endif
82 /* slon_retry(); */
83 }
84
85 dbconn = conn->dbconn;
86 monitor_state("local_cleanup", 0, conn->conn_pid, "thread main loop", 0, "n/a");
87 /*
88 *rnancy : Want the vacuum time bias to be 10% of the cleanup interval
89 */
90 if (vac_bias == 0)
91 {
92 /* convert cleanup_interval in second*/
93
94 dstring_init(&query_cleanup_interval_second);
95 slon_mkquery(&query_cleanup_interval_second,
96 "select date_part('epoch','%s'::interval);",
97 cleanup_interval
98 );
99
100 res3 = PQexec(dbconn, dstring_data(&query_cleanup_interval_second));
101 if (PQresultStatus(res3) != PGRES_TUPLES_OK) /* query error */
102 {
103 slon_log(SLON_ERROR,
104 "cleanupThread: \"%s\" - %s",
105 dstring_data(&query_cleanup_interval_second), PQresultErrorMessage(res3));
106 slon_retry();
107 }
108 cleanup_interval_second = atoi(PQgetvalue(res3, 0, 0));
109
110 slon_log(SLON_DEBUG1, "cleanupThread: Cleanup interval is : %ds\n", cleanup_interval_second);
111
112 vac_bias = (int)((cleanup_interval_second * 10) /100);
113
114 PQclear(res3);
115 dstring_free(&query_cleanup_interval_second);
116
117 }
118 slon_log(SLON_CONFIG, "cleanupThread: bias = %d\n", vac_bias);
119
120 /*
121 * Build the query string for calling the cleanupEvent() stored procedure
122 */
123 dstring_init(&query_baseclean);
124 slon_mkquery(&query_baseclean,
125 "begin;"
126 "lock table %s.sl_config_lock;"
127 "select %s.cleanupEvent('%s'::interval);"
128 "commit;",
129 rtcfg_namespace,
130 rtcfg_namespace,
131 cleanup_interval
132 );
133 dstring_init(&query2);
134
135 /*
136 * Loop until shutdown time arrived
137 *
138 * Note the introduction of vac_bias and an up-to-100s random "fuzz"; this
139 * reduces the likelihood that having multiple slons hitting the same
140 * cluster will run into conflicts due to trying to vacuum common tables *
141 * such as pg_listener concurrently
142 */
143 while (sched_wait_time(conn,
144 SCHED_WAIT_SOCK_READ,
145 cleanup_interval_second * 1000 + vac_bias + (rand() % cleanup_interval_second)) == SCHED_STATUS_OK)
146
147 {
148 /*
149 * Call the stored procedure cleanupEvent()
150 */
151 monitor_state("local_cleanup", 0, conn->conn_pid, "cleanupEvent", 0, "n/a");
152 gettimeofday(&tv_start, NULL);
153 res = PQexec(dbconn, dstring_data(&query_baseclean));
154 if (PQresultStatus(res) != PGRES_COMMAND_OK)
155 {
156 slon_log(SLON_FATAL,
157 "cleanupThread: \"%s\" - %s",
158 dstring_data(&query_baseclean), PQresultErrorMessage(res));
159 PQclear(res);
160 slon_retry();
161 break;
162 }
163 PQclear(res);
164 gettimeofday(&tv_end, NULL);
165 slon_log(SLON_INFO,
166 "cleanupThread: %8.3f seconds for cleanupEvent()\n",
167 TIMEVAL_DIFF(&tv_start, &tv_end));
168
169 /*
170 * Detain the usual suspects (vacuum event and log data)
171 */
172 if (vac_frequency != 0)
173 {
174 vac_enable = vac_frequency;
175 }
176 if (++vac_count >= vac_enable)
177 {
178 unsigned long latest_xid;
179
180 vac_count = 0;
181
182 latest_xid = get_earliest_xid(dbconn);
183 vacuum_action = "";
184 if (earliest_xid == latest_xid)
185 {
186
187 slon_log(SLON_INFO,
188 "cleanupThread: xid %d still active - analyze instead\n",
189 earliest_xid);
190 }
191 else
192 {
193 if (vac_enable == vac_frequency)
194 {
195 vacuum_action = "vacuum ";
196 }
197 }
198 earliest_xid = latest_xid;
199
200 /*
201 * Build the query string for vacuuming replication runtime data
202 * and event tables
203 */
204 gettimeofday(&tv_start, NULL);
205
206 slon_mkquery(&query2, "select nspname, relname from %s.TablesToVacuum();", rtcfg_namespace);
207 res = PQexec(dbconn, dstring_data(&query2));
208
209 /*
210 * for each table... and we should set up the query to return not
211 * only the table name, but also a boolean to support what's in
212 * the SELECT below; that'll nicely simplify this process...
213 */
214
215 if (PQresultStatus(res) != PGRES_TUPLES_OK) /* query error */
216 {
217 slon_log(SLON_ERROR,
218 "cleanupThread: \"%s\" - %s",
219 dstring_data(&query2), PQresultErrorMessage(res));
220 }
221 ntuples = PQntuples(res);
222 slon_log(SLON_DEBUG1, "cleanupThread: number of tables to clean: %d\n", ntuples);
223
224 monitor_state("local_cleanup", 0, conn->conn_pid, "vacuumTables", 0, "n/a");
225 for (t = 0; t < ntuples; t++)
226 {
227 char *tab_nspname = PQgetvalue(res, t, 0);
228 char *tab_relname = PQgetvalue(res, t, 1);
229 ExecStatusType vrc;
230
231 slon_log(SLON_DEBUG1, "cleanupThread: %s analyze \"%s\".%s;\n",
232 vacuum_action, tab_nspname, tab_relname);
233 dstring_init(&query_pertbl);
234 slon_mkquery(&query_pertbl, "%s analyze \"%s\".%s;",
235 vacuum_action, tab_nspname, tab_relname);
236 res2 = PQexec(dbconn, dstring_data(&query_pertbl));
237 vrc = PQresultStatus(res2);
238 if (vrc == PGRES_FATAL_ERROR)
239 {
240 slon_log(SLON_ERROR,
241 "cleanupThread: \"%s\" - %s\n",
242 dstring_data(&query_pertbl), PQresultErrorMessage(res2));
243
244 /*
245 * slon_retry(); break;
246 */
247 }
248 else
249 {
250 if (vrc == PGRES_NONFATAL_ERROR)
251 {
252 slon_log(SLON_WARN,
253 "cleanupThread: \"%s\" - %s\n",
254 dstring_data(&query_pertbl), PQresultErrorMessage(res2));
255
256 }
257 }
258 PQclear(res2);
259 dstring_reset(&query_pertbl);
260 }
261 gettimeofday(&tv_end, NULL);
262 slon_log(SLON_INFO,
263 "cleanupThread: %8.3f seconds for vacuuming\n",
264 TIMEVAL_DIFF(&tv_start, &tv_end));
265
266 /*
267 * Free Resources
268 */
269 dstring_free(&query_pertbl);
270 PQclear(res);
271 monitor_state("local_cleanup", 0, conn->conn_pid, "thread main loop", 0, "n/a");
272 }
273 }
274
275 /*
276 * Free Resources
277 */
278 dstring_free(&query_baseclean);
279 dstring_free(&query2);
280
281 /*
282 * Disconnect from the database
283 */
284 slon_disconnectdb(conn);
285
286 /*
287 * Terminate this thread
288 */
289 slon_log(SLON_DEBUG1, "cleanupThread: thread done\n");
290 pthread_exit(NULL);
291 }
292
293
294 /* ----------
295 * get_earliest_xid()
296 *
297 * reads the earliest XID that is still active.
298 *
299 * The idea is that if, between cleanupThread iterations, this XID has
300 * not changed, then an old transaction is still in progress,
301 * PostgreSQL is holding onto the tuples, and there is no value in
302 * doing VACUUMs of the various Slony-I tables.
303 * ----------
304 */
305 static unsigned long
get_earliest_xid(PGconn * dbconn)306 get_earliest_xid(PGconn *dbconn)
307 {
308 int64 xid;
309 PGresult *res;
310 SlonDString query;
311
312 dstring_init(&query);
313 (void) slon_mkquery(&query, "select pg_catalog.txid_snapshot_xmin(pg_catalog.txid_current_snapshot());");
314 res = PQexec(dbconn, dstring_data(&query));
315 if (PQresultStatus(res) != PGRES_TUPLES_OK)
316 {
317 slon_log(SLON_FATAL, "cleanupThread: could not txid_snapshot_xmin()!\n");
318 PQclear(res);
319 slon_retry();
320 return (unsigned long) -1;
321 }
322 xid = strtoll(PQgetvalue(res, 0, 0), NULL, 10);
323 slon_log(SLON_DEBUG1, "cleanupThread: minxid: %d\n", xid);
324 PQclear(res);
325 dstring_free(&query);
326 return (unsigned long) xid;
327 }
328