1 /*-------------------------------------------------------------------------
2 * sync_thread.c
3 *
4 * Implementation of the thread that generates SYNC events.
5 *
6 * Copyright (c) 2003-2009, PostgreSQL Global Development Group
7 * Author: Jan Wieck, Afilias USA INC.
8 *
9 *
10 *-------------------------------------------------------------------------
11 */
12
13
14 #include <pthread.h>
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <string.h>
19 #include <errno.h>
20 #include <signal.h>
21 #include <sys/types.h>
22
23 #ifndef WIN32
24 #include <unistd.h>
25 #include <sys/time.h>
26 #endif
27
28 #include "slon.h"
29
30
31 /* ----------
32 * Global variables
33 * ----------
34 */
35 int sync_interval;
36 int sync_interval_timeout;
37
38
39 /* ----------
40 * slon_localSyncThread
41 *
42 * Generate SYNC event if local database activity created new log info.
43 * ----------
44 */
45 void *
syncThread_main(void * dummy)46 syncThread_main(void *dummy)
47 {
48 SlonConn *conn;
49 char last_actseq_buf[64];
50 SlonDString query1;
51 SlonDString query2;
52 PGconn *dbconn;
53 PGresult *res;
54 int timeout_count;
55
56 slon_log(SLON_INFO,
57 "syncThread: thread starts\n");
58
59 /*
60 * Connect to the local database
61 */
62 if ((conn = slon_connectdb(rtcfg_conninfo, "local_sync")) == NULL)
63 slon_retry();
64 dbconn = conn->dbconn;
65 monitor_state("local_sync", 0, conn->conn_pid, "thread main loop", 0, "n/a");
66
67 /*
68 * We don't initialize the last known action sequence to the actual value.
69 * This causes that we create a SYNC event allways on startup, just in
70 * case.
71 */
72 last_actseq_buf[0] = '\0';
73
74 /*
75 * Build the query that starts a transaction and retrieves the last value
76 * from the action sequence.
77 */
78 dstring_init(&query1);
79 slon_mkquery(&query1,
80 "start transaction;"
81 "set transaction isolation level serializable;"
82 "lock table %s.sl_event_lock;"
83 "select last_value from %s.sl_action_seq;",
84 rtcfg_namespace, rtcfg_namespace);
85
86 /*
87 * Build the query that calls createEvent() for the SYNC
88 */
89 dstring_init(&query2);
90 slon_mkquery(&query2,
91 "select %s.createEvent('_%s', 'SYNC', NULL);",
92 rtcfg_namespace, rtcfg_cluster_name);
93
94 timeout_count = (sync_interval_timeout == 0) ? 0 :
95 sync_interval_timeout - sync_interval;
96 while (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, sync_interval) == SCHED_STATUS_OK)
97 {
98 /*
99 * Start a serializable transaction and get the last value from the
100 * action sequence number.
101 */
102 res = PQexec(dbconn, dstring_data(&query1));
103 if (PQresultStatus(res) != PGRES_TUPLES_OK)
104 {
105 slon_log(SLON_FATAL,
106 "syncThread: \"%s\" - %s",
107 dstring_data(&query1), PQresultErrorMessage(res));
108 PQclear(res);
109 slon_retry();
110 break;
111 }
112
113 /*
114 * Check if it's identical to the last known seq or if the sync
115 * interval timeout has arrived.
116 */
117 if (sync_interval_timeout != 0)
118 timeout_count -= sync_interval;
119
120 if (strcmp(last_actseq_buf, PQgetvalue(res, 0, 0)) != 0 ||
121 timeout_count < 0)
122 {
123 /*
124 * Action sequence has changed, generate a SYNC event and read the
125 * resulting currval of the event sequence.
126 */
127 monitor_state("local_sync", 0, conn->conn_pid, "GenSync", 0, "n/a");
128 strcpy(last_actseq_buf, PQgetvalue(res, 0, 0));
129
130 PQclear(res);
131 res = PQexec(dbconn, dstring_data(&query2));
132 if (PQresultStatus(res) != PGRES_TUPLES_OK)
133 {
134 slon_log(SLON_FATAL,
135 "syncThread: \"%s\" - %s",
136 dstring_data(&query2), PQresultErrorMessage(res));
137 PQclear(res);
138 slon_retry();
139 break;
140 }
141 slon_log(SLON_DEBUG2,
142 "syncThread: new sl_action_seq %s - SYNC %s\n",
143 last_actseq_buf, PQgetvalue(res, 0, 0));
144 PQclear(res);
145
146 /*
147 * Commit the transaction
148 */
149 res = PQexec(dbconn, "commit transaction;");
150 if (PQresultStatus(res) != PGRES_COMMAND_OK)
151 {
152 slon_log(SLON_FATAL,
153 "syncThread: \"commit transaction;\" - %s",
154 PQresultErrorMessage(res));
155 PQclear(res);
156 slon_retry();
157 }
158 PQclear(res);
159
160 /*
161 * Restart the timeout on a sync.
162 */
163 timeout_count = (sync_interval_timeout == 0) ? 0 :
164 sync_interval_timeout - sync_interval;
165 }
166 else
167 {
168 /*
169 * No database activity detected - rollback.
170 */
171 PQclear(res);
172 res = PQexec(dbconn, "rollback transaction;");
173 if (PQresultStatus(res) != PGRES_COMMAND_OK)
174 {
175 slon_log(SLON_FATAL,
176 "syncThread: \"rollback transaction;\" - %s",
177 PQresultErrorMessage(res));
178 PQclear(res);
179 slon_retry();
180 }
181 PQclear(res);
182 }
183 monitor_state("local_sync", 0, conn->conn_pid, "thread main loop", 0, "n/a");
184 }
185
186 dstring_free(&query1);
187 dstring_free(&query2);
188 slon_disconnectdb(conn);
189
190 slon_log(SLON_INFO, "syncThread: thread done\n");
191 pthread_exit(NULL);
192 }
193
194 /*
195 * Local Variables:
196 * tab-width: 4
197 * c-indent-level: 4
198 * c-basic-offset: 4
199 * End:
200 */
201