1 /*
2 Copyright (c) 2007, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 /*
26 * Update on master wait for update on slave
27 *
28 */
29
30 #include <ndb_global.h>
31 #include <NdbApi.hpp>
32 #include <NdbSleep.h>
33 #include <sys/time.h>
34 #include <NdbOut.hpp>
35 #include <NDBT.hpp>
36
37 struct Xxx
38 {
39 Ndb *ndb;
40 const NdbDictionary::Table *table;
41 Uint32 pk_col;
42 Uint32 col;
43 };
44
45 struct XxxR
46 {
47 Uint32 pk_val;
48 Uint32 val;
49 struct timeval start_time;
50 Uint32 latency;
51 };
52
53 static int
54 prepare_master_or_slave(Ndb &myNdb,
55 const char* table,
56 const char* pk,
57 Uint32 pk_val,
58 const char* col,
59 struct Xxx &xxx,
60 struct XxxR &xxxr);
61 static void
62 run_master_update(struct Xxx &xxx, struct XxxR &xxxr);
63 static void
64 run_slave_wait(struct Xxx &xxx, struct XxxR &xxxr);
65
66 #define PRINT_ERROR(code,msg) \
67 g_err << "Error in " << __FILE__ << ", line: " << __LINE__ \
68 << ", code: " << code \
69 << ", msg: " << msg << ".\n"
70 #define APIERROR(error) { \
71 PRINT_ERROR((error).code, (error).message); \
72 exit(-1); }
73
main(int argc,char ** argv)74 int main(int argc, char** argv)
75 {
76 // ndb_init must be called first
77 ndb_init();
78
79 if (argc != 8)
80 {
81 ndbout << "Arguments are <connect_string cluster 1> <connect_string cluster 2> <database> <table name> <primary key> <value of primary key> <attribute to update>.\n";
82 exit(-1);
83 }
84
85 {
86 const char *opt_connectstring1 = argv[1];
87 const char *opt_connectstring2 = argv[2];
88 const char *opt_db = argv[3];
89 const char *opt_table = argv[4];
90 const char *opt_pk = argv[5];
91 const Uint32 opt_pk_val = atoi(argv[6]);
92 const char *opt_col = argv[7];
93
94 // Object representing the cluster 1
95 Ndb_cluster_connection cluster1_connection(opt_connectstring1);
96 // Object representing the cluster 2
97 Ndb_cluster_connection cluster2_connection(opt_connectstring2);
98
99 // connect cluster 1 and run application
100 // Connect to cluster 1 management server (ndb_mgmd)
101 if (cluster1_connection.connect(4 /* retries */,
102 5 /* delay between retries */,
103 1 /* verbose */))
104 {
105 g_err << "Cluster 1 management server was not ready within 30 secs.\n";
106 exit(-1);
107 }
108 // Optionally connect and wait for the storage nodes (ndbd's)
109 if (cluster1_connection.wait_until_ready(30,0) < 0)
110 {
111 g_err << "Cluster 1 was not ready within 30 secs.\n";
112 exit(-1);
113 }
114 // connect cluster 2 and run application
115 // Connect to cluster management server (ndb_mgmd)
116 if (cluster2_connection.connect(4 /* retries */,
117 5 /* delay between retries */,
118 1 /* verbose */))
119 {
120 g_err << "Cluster 2 management server was not ready within 30 secs.\n";
121 exit(-1);
122 }
123 // Optionally connect and wait for the storage nodes (ndbd's)
124 if (cluster2_connection.wait_until_ready(30,0) < 0)
125 {
126 g_err << "Cluster 2 was not ready within 30 secs.\n";
127 exit(-1);
128 }
129 // Object representing the database
130 Ndb myNdb1(&cluster1_connection, opt_db);
131 Ndb myNdb2(&cluster2_connection, opt_db);
132 //
133 struct Xxx xxx1;
134 struct Xxx xxx2;
135 struct XxxR xxxr;
136 prepare_master_or_slave(myNdb1, opt_table, opt_pk, opt_pk_val, opt_col,
137 xxx1, xxxr);
138 prepare_master_or_slave(myNdb2, opt_table, opt_pk, opt_pk_val, opt_col,
139 xxx2, xxxr);
140 while (1)
141 {
142 // run the application code
143 run_master_update(xxx1, xxxr);
144 run_slave_wait(xxx2, xxxr);
145 ndbout << "latency: " << xxxr.latency << endl;
146 }
147 }
148 // Note: all connections must have been destroyed before calling ndb_end()
149 ndb_end(0);
150
151 return 0;
152 }
153
154 static int
prepare_master_or_slave(Ndb & myNdb,const char * table,const char * pk,Uint32 pk_val,const char * col,struct Xxx & xxx,struct XxxR & xxxr)155 prepare_master_or_slave(Ndb &myNdb,
156 const char* table,
157 const char* pk,
158 Uint32 pk_val,
159 const char* col,
160 struct Xxx &xxx,
161 struct XxxR &xxxr)
162 {
163 if (myNdb.init())
164 APIERROR(myNdb.getNdbError());
165 const NdbDictionary::Dictionary* myDict = myNdb.getDictionary();
166 const NdbDictionary::Table *myTable = myDict->getTable(table);
167 if (myTable == NULL)
168 APIERROR(myDict->getNdbError());
169 const NdbDictionary::Column *myPkCol = myTable->getColumn(pk);
170 if (myPkCol == NULL)
171 APIERROR(myDict->getNdbError());
172 if (myPkCol->getType() != NdbDictionary::Column::Unsigned)
173 {
174 PRINT_ERROR(0, "Primary key column not of type unsigned");
175 exit(-1);
176 }
177 const NdbDictionary::Column *myCol = myTable->getColumn(col);
178 if (myCol == NULL)
179 APIERROR(myDict->getNdbError());
180 if (myCol->getType() != NdbDictionary::Column::Unsigned)
181 {
182 PRINT_ERROR(0, "Update column not of type unsigned");
183 exit(-1);
184 }
185
186 xxx.ndb = &myNdb;
187 xxx.table = myTable;
188 xxx.pk_col = myPkCol->getColumnNo();
189 xxx.col = myCol->getColumnNo();
190
191 xxxr.pk_val = pk_val;
192
193 return 0;
194 }
195
run_master_update(struct Xxx & xxx,struct XxxR & xxxr)196 static void run_master_update(struct Xxx &xxx, struct XxxR &xxxr)
197 {
198 Ndb *ndb = xxx.ndb;
199 const NdbDictionary::Table *myTable = xxx.table;
200 int retry_sleep= 10; /* 10 milliseconds */
201 int retries= 100;
202 while (1)
203 {
204 Uint32 val;
205 NdbTransaction *trans = ndb->startTransaction();
206 if (trans == NULL)
207 goto err;
208 {
209 NdbOperation *op = trans->getNdbOperation(myTable);
210 if (op == NULL)
211 APIERROR(trans->getNdbError());
212 op->readTupleExclusive();
213 op->equal(xxx.pk_col, xxxr.pk_val);
214 op->getValue(xxx.col, (char *)&val);
215 }
216 if (trans->execute(NdbTransaction::NoCommit))
217 goto err;
218 //fprintf(stderr, "read %u\n", val);
219 xxxr.val = val + 1;
220 {
221 NdbOperation *op = trans->getNdbOperation(myTable);
222 if (op == NULL)
223 APIERROR(trans->getNdbError());
224 op->updateTuple();
225 op->equal(xxx.pk_col, xxxr.pk_val);
226 op->setValue(xxx.col, xxxr.val);
227 }
228 if (trans->execute(NdbTransaction::Commit))
229 goto err;
230 ndb->closeTransaction(trans);
231 //fprintf(stderr, "updated to %u\n", xxxr.val);
232 break;
233 err:
234 const NdbError this_error= trans ?
235 trans->getNdbError() : ndb->getNdbError();
236 if (this_error.status == NdbError::TemporaryError)
237 {
238 if (retries--)
239 {
240 if (trans)
241 ndb->closeTransaction(trans);
242 NdbSleep_MilliSleep(retry_sleep);
243 continue; // retry
244 }
245 }
246 if (trans)
247 ndb->closeTransaction(trans);
248 APIERROR(this_error);
249 }
250 /* update done start timer */
251 gettimeofday(&xxxr.start_time, 0);
252 }
253
run_slave_wait(struct Xxx & xxx,struct XxxR & xxxr)254 static void run_slave_wait(struct Xxx &xxx, struct XxxR &xxxr)
255 {
256 struct timeval old_end_time = xxxr.start_time, end_time;
257 Ndb *ndb = xxx.ndb;
258 const NdbDictionary::Table *myTable = xxx.table;
259 int retry_sleep= 10; /* 10 milliseconds */
260 int retries= 100;
261 while (1)
262 {
263 Uint32 val;
264 NdbTransaction *trans = ndb->startTransaction();
265 if (trans == NULL)
266 goto err;
267 {
268 NdbOperation *op = trans->getNdbOperation(myTable);
269 if (op == NULL)
270 APIERROR(trans->getNdbError());
271 op->readTuple();
272 op->equal(xxx.pk_col, xxxr.pk_val);
273 op->getValue(xxx.col, (char *)&val);
274 if (trans->execute(NdbTransaction::Commit))
275 goto err;
276 }
277 /* read done, check time of read */
278 gettimeofday(&end_time, 0);
279 ndb->closeTransaction(trans);
280 //fprintf(stderr, "read %u waiting for %u\n", val, xxxr.val);
281 if (xxxr.val != val)
282 {
283 /* expected value not received yet */
284 retries = 100;
285 NdbSleep_MilliSleep(retry_sleep);
286 old_end_time = end_time;
287 continue;
288 }
289 break;
290 err:
291 const NdbError this_error= trans ?
292 trans->getNdbError() : ndb->getNdbError();
293 if (this_error.status == NdbError::TemporaryError)
294 {
295 if (retries--)
296 {
297 if (trans)
298 ndb->closeTransaction(trans);
299 NdbSleep_MilliSleep(retry_sleep);
300 continue; // retry
301 }
302 }
303 if (trans)
304 ndb->closeTransaction(trans);
305 APIERROR(this_error);
306 }
307
308 Int64 elapsed_usec1 =
309 ((Int64)end_time.tv_sec - (Int64)xxxr.start_time.tv_sec)*1000*1000 +
310 ((Int64)end_time.tv_usec - (Int64)xxxr.start_time.tv_usec);
311 Int64 elapsed_usec2 =
312 ((Int64)end_time.tv_sec - (Int64)old_end_time.tv_sec)*1000*1000 +
313 ((Int64)end_time.tv_usec - (Int64)old_end_time.tv_usec);
314 xxxr.latency =
315 ((elapsed_usec1 - elapsed_usec2/2)+999)/1000;
316 }
317