1 /*-------------------------------------------------------------------------
2  * cleanup_thread.c
3  *
4  *	Periodic cleanup of confirm-, event- and log-data.
5  *
6  *	Copyright (c) 2003-2009, PostgreSQL Global Development Group
7  *	Author: Jan Wieck, Afilias USA INC.
8  *
9  *
10  *-------------------------------------------------------------------------
11  */
12 
13 
14 #include <pthread.h>
15 
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <string.h>
19 #include <errno.h>
20 #include <signal.h>
21 #include <sys/types.h>
22 #ifndef WIN32
23 #include <sys/time.h>
24 #include <unistd.h>
25 #endif
26 
27 #include "slon.h"
28 
29 
30 /* ----------
31  * Global data
32  * ----------
33  */
34 int			vac_frequency = SLON_VACUUM_FREQUENCY;
35 char	   *cleanup_interval;
36 
37 static unsigned long earliest_xid = 0;
38 static unsigned long get_earliest_xid(PGconn *dbconn);
39 
40 /* ----------
41  * cleanupThread_main
42  *
43  * Periodically calls the stored procedure to remove old events and log data and
44  * vacuums those tables.
45  * ----------
46  */
47 void *
cleanupThread_main(void * dummy)48 cleanupThread_main( /* @unused@ */ void *dummy)
49 {
50 	SlonConn   *conn;
51 	SlonDString query_baseclean;
52 	SlonDString query_cleanup_interval_second;
53 	SlonDString query2;
54 	SlonDString query_pertbl;
55 
56 	PGconn	   *dbconn;
57 	PGresult   *res;
58 	PGresult   *res2;
59 	PGresult   *res3;
60 	struct timeval tv_start;
61 	struct timeval tv_end;
62 	int			t;
63 	int			vac_count = 0;
64 	int			vac_enable = SLON_VACUUM_FREQUENCY;
65 	char	   *vacuum_action;
66 	int			ntuples;
67 	int              cleanup_interval_second;
68 	int vac_bias = 0;
69 
70 	slon_log(SLON_CONFIG, "cleanupThread: thread starts\n");
71 	/*
72 	 * Connect to the local database
73 	 */
74 	if ((conn = slon_connectdb(rtcfg_conninfo, "local_cleanup")) == NULL)
75 	{
76 #ifndef WIN32
77 		(void) kill(getpid(), SIGTERM);
78 		pthread_exit(NULL);
79 #else
80 		exit(0);
81 #endif
82 		/* slon_retry(); */
83 	}
84 
85 	dbconn = conn->dbconn;
86         monitor_state("local_cleanup", 0, conn->conn_pid, "thread main loop", 0, "n/a");
87         /*
88          *rnancy : Want the vacuum time bias to be 10% of the cleanup interval
89          */
90         if (vac_bias == 0)
91         {
92                 /* convert cleanup_interval in second*/
93 
94                 dstring_init(&query_cleanup_interval_second);
95                 slon_mkquery(&query_cleanup_interval_second,
96                                  "select date_part('epoch','%s'::interval);",
97                                  cleanup_interval
98                 );
99 
100                 res3 = PQexec(dbconn, dstring_data(&query_cleanup_interval_second));
101                 if (PQresultStatus(res3) != PGRES_TUPLES_OK) /* query error */
102                         {
103                                 slon_log(SLON_ERROR,
104                                                  "cleanupThread: \"%s\" - %s",
105                                                  dstring_data(&query_cleanup_interval_second), PQresultErrorMessage(res3));
106                                 slon_retry();
107                         }
108                 cleanup_interval_second = atoi(PQgetvalue(res3, 0, 0));
109 
110                 slon_log(SLON_DEBUG1, "cleanupThread: Cleanup interval is : %ds\n", cleanup_interval_second);
111 
112                 vac_bias = (int)((cleanup_interval_second * 10) /100);
113 
114                 PQclear(res3);
115                 dstring_free(&query_cleanup_interval_second);
116 
117         }
118         slon_log(SLON_CONFIG, "cleanupThread: bias = %d\n", vac_bias);
119 
120 	/*
121 	 * Build the query string for calling the cleanupEvent() stored procedure
122 	 */
123 	dstring_init(&query_baseclean);
124 	slon_mkquery(&query_baseclean,
125 				 "begin;"
126 				 "lock table %s.sl_config_lock;"
127 				 "select %s.cleanupEvent('%s'::interval);"
128 				 "commit;",
129 				 rtcfg_namespace,
130 				 rtcfg_namespace,
131 				 cleanup_interval
132 		);
133 	dstring_init(&query2);
134 
135 	/*
136 	 * Loop until shutdown time arrived
137 	 *
138 	 * Note the introduction of vac_bias and an up-to-100s random "fuzz"; this
139 	 * reduces the likelihood that having multiple slons hitting the same
140 	 * cluster will run into conflicts due to trying to vacuum common tables *
141 	 * such as pg_listener concurrently
142 	 */
143 	 while (sched_wait_time(conn,
144                                  SCHED_WAIT_SOCK_READ,
145                                  cleanup_interval_second * 1000 + vac_bias + (rand() % cleanup_interval_second)) == SCHED_STATUS_OK)
146 
147 	{
148 		/*
149 		 * Call the stored procedure cleanupEvent()
150 		 */
151 		monitor_state("local_cleanup", 0, conn->conn_pid, "cleanupEvent", 0, "n/a");
152 		gettimeofday(&tv_start, NULL);
153 		res = PQexec(dbconn, dstring_data(&query_baseclean));
154 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
155 		{
156 			slon_log(SLON_FATAL,
157 					 "cleanupThread: \"%s\" - %s",
158 				  dstring_data(&query_baseclean), PQresultErrorMessage(res));
159 			PQclear(res);
160 			slon_retry();
161 			break;
162 		}
163 		PQclear(res);
164 		gettimeofday(&tv_end, NULL);
165 		slon_log(SLON_INFO,
166 				 "cleanupThread: %8.3f seconds for cleanupEvent()\n",
167 				 TIMEVAL_DIFF(&tv_start, &tv_end));
168 
169 		/*
170 		 * Detain the usual suspects (vacuum event and log data)
171 		 */
172 		if (vac_frequency != 0)
173 		{
174 			vac_enable = vac_frequency;
175 		}
176 		if (++vac_count >= vac_enable)
177 		{
178 			unsigned long latest_xid;
179 
180 			vac_count = 0;
181 
182 			latest_xid = get_earliest_xid(dbconn);
183 			vacuum_action = "";
184 			if (earliest_xid == latest_xid)
185 			{
186 
187 				slon_log(SLON_INFO,
188 					"cleanupThread: xid %d still active - analyze instead\n",
189 						 earliest_xid);
190 			}
191 			else
192 			{
193 				if (vac_enable == vac_frequency)
194 				{
195 					vacuum_action = "vacuum ";
196 				}
197 			}
198 			earliest_xid = latest_xid;
199 
200 			/*
201 			 * Build the query string for vacuuming replication runtime data
202 			 * and event tables
203 			 */
204 			gettimeofday(&tv_start, NULL);
205 
206 			slon_mkquery(&query2, "select nspname, relname from %s.TablesToVacuum();", rtcfg_namespace);
207 			res = PQexec(dbconn, dstring_data(&query2));
208 
209 			/*
210 			 * for each table...  and we should set up the query to return not
211 			 * only the table name, but also a boolean to support what's in
212 			 * the SELECT below; that'll nicely simplify this process...
213 			 */
214 
215 			if (PQresultStatus(res) != PGRES_TUPLES_OK) /* query error */
216 			{
217 				slon_log(SLON_ERROR,
218 						 "cleanupThread: \"%s\" - %s",
219 						 dstring_data(&query2), PQresultErrorMessage(res));
220 			}
221 			ntuples = PQntuples(res);
222 			slon_log(SLON_DEBUG1, "cleanupThread: number of tables to clean: %d\n", ntuples);
223 
224 			monitor_state("local_cleanup", 0, conn->conn_pid, "vacuumTables", 0, "n/a");
225 			for (t = 0; t < ntuples; t++)
226 			{
227 				char	   *tab_nspname = PQgetvalue(res, t, 0);
228 				char	   *tab_relname = PQgetvalue(res, t, 1);
229 				ExecStatusType vrc;
230 
231 				slon_log(SLON_DEBUG1, "cleanupThread: %s analyze \"%s\".%s;\n",
232 						 vacuum_action, tab_nspname, tab_relname);
233 				dstring_init(&query_pertbl);
234 				slon_mkquery(&query_pertbl, "%s analyze \"%s\".%s;",
235 							 vacuum_action, tab_nspname, tab_relname);
236 				res2 = PQexec(dbconn, dstring_data(&query_pertbl));
237 				vrc = PQresultStatus(res2);
238 				if (vrc == PGRES_FATAL_ERROR)
239 				{
240 					slon_log(SLON_ERROR,
241 							 "cleanupThread: \"%s\" - %s\n",
242 					dstring_data(&query_pertbl), PQresultErrorMessage(res2));
243 
244 					/*
245 					 * slon_retry(); break;
246 					 */
247 				}
248 				else
249 				{
250 					if (vrc == PGRES_NONFATAL_ERROR)
251 					{
252 						slon_log(SLON_WARN,
253 								 "cleanupThread: \"%s\" - %s\n",
254 								 dstring_data(&query_pertbl), PQresultErrorMessage(res2));
255 
256 					}
257 				}
258 				PQclear(res2);
259 				dstring_reset(&query_pertbl);
260 			}
261 			gettimeofday(&tv_end, NULL);
262 			slon_log(SLON_INFO,
263 					 "cleanupThread: %8.3f seconds for vacuuming\n",
264 					 TIMEVAL_DIFF(&tv_start, &tv_end));
265 
266 			/*
267 			 * Free Resources
268 			 */
269 			dstring_free(&query_pertbl);
270 			PQclear(res);
271 			monitor_state("local_cleanup", 0, conn->conn_pid, "thread main loop", 0, "n/a");
272 		}
273 	}
274 
275 	/*
276 	 * Free Resources
277 	 */
278 	dstring_free(&query_baseclean);
279 	dstring_free(&query2);
280 
281 	/*
282 	 * Disconnect from the database
283 	 */
284 	slon_disconnectdb(conn);
285 
286 	/*
287 	 * Terminate this thread
288 	 */
289 	slon_log(SLON_DEBUG1, "cleanupThread: thread done\n");
290 	pthread_exit(NULL);
291 }
292 
293 
294 /* ----------
295  * get_earliest_xid()
296  *
297  * reads the earliest XID that is still active.
298  *
299  * The idea is that if, between cleanupThread iterations, this XID has
300  * not changed, then an old transaction is still in progress,
301  * PostgreSQL is holding onto the tuples, and there is no value in
302  * doing VACUUMs of the various Slony-I tables.
303  * ----------
304  */
305 static unsigned long
get_earliest_xid(PGconn * dbconn)306 get_earliest_xid(PGconn *dbconn)
307 {
308 	int64		xid;
309 	PGresult   *res;
310 	SlonDString query;
311 
312 	dstring_init(&query);
313 	(void) slon_mkquery(&query, "select pg_catalog.txid_snapshot_xmin(pg_catalog.txid_current_snapshot());");
314 	res = PQexec(dbconn, dstring_data(&query));
315 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
316 	{
317 		slon_log(SLON_FATAL, "cleanupThread: could not txid_snapshot_xmin()!\n");
318 		PQclear(res);
319 		slon_retry();
320 		return (unsigned long) -1;
321 	}
322 	xid = strtoll(PQgetvalue(res, 0, 0), NULL, 10);
323 	slon_log(SLON_DEBUG1, "cleanupThread: minxid: %d\n", xid);
324 	PQclear(res);
325 	dstring_free(&query);
326 	return (unsigned long) xid;
327 }
328