1 /*-------------------------------------------------------------------------
2  * sync_thread.c
3  *
4  *	Implementation of the thread that generates SYNC events.
5  *
6  *	Copyright (c) 2003-2009, PostgreSQL Global Development Group
7  *	Author: Jan Wieck, Afilias USA INC.
8  *
9  *
10  *-------------------------------------------------------------------------
11  */
12 
13 
14 #include <pthread.h>
15 
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <string.h>
19 #include <errno.h>
20 #include <signal.h>
21 #include <sys/types.h>
22 
23 #ifndef WIN32
24 #include <unistd.h>
25 #include <sys/time.h>
26 #endif
27 
28 #include "slon.h"
29 
30 
31 /* ----------
32  * Global variables
33  * ----------
34  */
35 int			sync_interval;
36 int			sync_interval_timeout;
37 
38 
39 /* ----------
40  * slon_localSyncThread
41  *
42  * Generate SYNC event if local database activity created new log info.
43  * ----------
44  */
45 void *
syncThread_main(void * dummy)46 syncThread_main(void *dummy)
47 {
48 	SlonConn   *conn;
49 	char		last_actseq_buf[64];
50 	SlonDString query1;
51 	SlonDString query2;
52 	PGconn	   *dbconn;
53 	PGresult   *res;
54 	int			timeout_count;
55 
56 	slon_log(SLON_INFO,
57 			 "syncThread: thread starts\n");
58 
59 	/*
60 	 * Connect to the local database
61 	 */
62 	if ((conn = slon_connectdb(rtcfg_conninfo, "local_sync")) == NULL)
63 		slon_retry();
64 	dbconn = conn->dbconn;
65 	monitor_state("local_sync", 0, conn->conn_pid, "thread main loop", 0, "n/a");
66 
67 	/*
68 	 * We don't initialize the last known action sequence to the actual value.
69 	 * This causes that we create a SYNC event allways on startup, just in
70 	 * case.
71 	 */
72 	last_actseq_buf[0] = '\0';
73 
74 	/*
75 	 * Build the query that starts a transaction and retrieves the last value
76 	 * from the action sequence.
77 	 */
78 	dstring_init(&query1);
79 	slon_mkquery(&query1,
80 				 "start transaction;"
81 				 "set transaction isolation level serializable;"
82 				 "lock table %s.sl_event_lock;"
83 				 "select last_value from %s.sl_action_seq;",
84 				 rtcfg_namespace, rtcfg_namespace);
85 
86 	/*
87 	 * Build the query that calls createEvent() for the SYNC
88 	 */
89 	dstring_init(&query2);
90 	slon_mkquery(&query2,
91 				 "select %s.createEvent('_%s', 'SYNC', NULL);",
92 				 rtcfg_namespace, rtcfg_cluster_name);
93 
94 	timeout_count = (sync_interval_timeout == 0) ? 0 :
95 		sync_interval_timeout - sync_interval;
96 	while (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, sync_interval) == SCHED_STATUS_OK)
97 	{
98 		/*
99 		 * Start a serializable transaction and get the last value from the
100 		 * action sequence number.
101 		 */
102 		res = PQexec(dbconn, dstring_data(&query1));
103 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
104 		{
105 			slon_log(SLON_FATAL,
106 					 "syncThread: \"%s\" - %s",
107 					 dstring_data(&query1), PQresultErrorMessage(res));
108 			PQclear(res);
109 			slon_retry();
110 			break;
111 		}
112 
113 		/*
114 		 * Check if it's identical to the last known seq or if the sync
115 		 * interval timeout has arrived.
116 		 */
117 		if (sync_interval_timeout != 0)
118 			timeout_count -= sync_interval;
119 
120 		if (strcmp(last_actseq_buf, PQgetvalue(res, 0, 0)) != 0 ||
121 			timeout_count < 0)
122 		{
123 			/*
124 			 * Action sequence has changed, generate a SYNC event and read the
125 			 * resulting currval of the event sequence.
126 			 */
127 			monitor_state("local_sync", 0, conn->conn_pid, "GenSync", 0, "n/a");
128 			strcpy(last_actseq_buf, PQgetvalue(res, 0, 0));
129 
130 			PQclear(res);
131 			res = PQexec(dbconn, dstring_data(&query2));
132 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
133 			{
134 				slon_log(SLON_FATAL,
135 						 "syncThread: \"%s\" - %s",
136 						 dstring_data(&query2), PQresultErrorMessage(res));
137 				PQclear(res);
138 				slon_retry();
139 				break;
140 			}
141 			slon_log(SLON_DEBUG2,
142 					 "syncThread: new sl_action_seq %s - SYNC %s\n",
143 					 last_actseq_buf, PQgetvalue(res, 0, 0));
144 			PQclear(res);
145 
146 			/*
147 			 * Commit the transaction
148 			 */
149 			res = PQexec(dbconn, "commit transaction;");
150 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
151 			{
152 				slon_log(SLON_FATAL,
153 						 "syncThread: \"commit transaction;\" - %s",
154 						 PQresultErrorMessage(res));
155 				PQclear(res);
156 				slon_retry();
157 			}
158 			PQclear(res);
159 
160 			/*
161 			 * Restart the timeout on a sync.
162 			 */
163 			timeout_count = (sync_interval_timeout == 0) ? 0 :
164 				sync_interval_timeout - sync_interval;
165 		}
166 		else
167 		{
168 			/*
169 			 * No database activity detected - rollback.
170 			 */
171 			PQclear(res);
172 			res = PQexec(dbconn, "rollback transaction;");
173 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
174 			{
175 				slon_log(SLON_FATAL,
176 						 "syncThread: \"rollback transaction;\" - %s",
177 						 PQresultErrorMessage(res));
178 				PQclear(res);
179 				slon_retry();
180 			}
181 			PQclear(res);
182 		}
183 		monitor_state("local_sync", 0, conn->conn_pid, "thread main loop", 0, "n/a");
184 	}
185 
186 	dstring_free(&query1);
187 	dstring_free(&query2);
188 	slon_disconnectdb(conn);
189 
190 	slon_log(SLON_INFO, "syncThread: thread done\n");
191 	pthread_exit(NULL);
192 }
193 
194 /*
195  * Local Variables:
196  *	tab-width: 4
197  *	c-indent-level: 4
198  *	c-basic-offset: 4
199  * End:
200  */
201