1 /*-
2 * Copyright (c) 2006, 2020 Oracle and/or its affiliates. All rights reserved.
3 *
4 * See the file EXAMPLES-LICENSE for license information.
5 *
6 * $Id$
7 */
8
9 // NOTE: This example is a simplified version of the RepQuoteExample.cxx
10 // example that can be found in the db/examples/cxx/excxx_repquote directory.
11 //
12 // This example is intended only as an aid in learning Replication Manager
13 // concepts. It is not complete in that many features are not exercised
14 // in it, nor are many error conditions properly handled.
15
16 #include <iostream>
17 #include <errno.h>
18
19 #include <db_cxx.h>
20 #include "RepConfigInfo.h"
21
22 using std::cout;
23 using std::cin;
24 using std::cerr;
25 using std::endl;
26 using std::flush;
27
// Cache size in bytes (10 MB) handed to DbEnv::set_cachesize() in init().
#define CACHESIZE (10 * 1024 * 1024)
// Name of the database file that doloop() opens inside the environment.
#define DATABASE "quote.db"
// Argument passed to sleep() between attempts to open the database while
// it is not yet available (see doloop()).
#define SLEEPTIME 3

// Program name shown in the usage message and used as the DbEnv error prefix.
const char *progname = "excxx_repquote_gsg_repmgr";
33
#ifdef _WIN32
// Windows build: pull in the Win32 headers and map the sleep(s) call used
// by this example onto Sleep(), scaling the argument by 1000.
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <direct.h>
#define sleep(s) Sleep(1000 * (s))

// getopt() and optarg are declared by hand for this build.
// NOTE(review): presumably an implementation is linked in from elsewhere
// in the project -- confirm.
extern "C" {
    extern int getopt(int, char * const *, const char *);
    extern char *optarg;
}
#endif
45
// Application state hung off the DbEnv app_private field so that the
// static event callback can reach it.
struct APP_DATA {
    int is_master;  // Non-zero while this site is acting as the master.
};
50
51 class RepMgrGSG
52 {
53 public:
54 RepMgrGSG();
55 int init(RepConfigInfo* config);
56 int doloop();
57 int terminate();
58
59 static void event_callback(DbEnv * dbenv, u_int32_t which, void *info);
60
61 private:
62 // Disable copy constructor.
63 RepMgrGSG(const RepMgrGSG &);
64 void operator = (const RepMgrGSG &);
65
66 // Internal data members.
67 APP_DATA app_data;
68 RepConfigInfo *app_config;
69 DbEnv dbenv;
70
71 // Private methods.
72 static int print_stocks(Db *dbp);
73 };
74
usage()75 static void usage()
76 {
77 cerr << "usage: " << progname << endl
78 << "-h home -l|-L host:port [-r host:port] [-p priority]" << endl;
79
80 cerr
81 << "\t -h home directory (required)" << endl
82 << "\t -l host:port (required unless -L is specified;"
83 << "\t l stands for local)" << endl
84 << "\t -L host:port (optional, L means group creator)" << endl
85 << "\t -r host:port (optional; r stands for remote; any "
86 << "number of these" << endl
87 << "\t may be specified)" << endl
88 << "\t -p priority (optional; defaults to 100)" << endl;
89
90 exit(EXIT_FAILURE);
91 }
92
main(int argc,char ** argv)93 int main(int argc, char **argv)
94 {
95 RepConfigInfo config;
96 char ch, *last_colon, *portstr, *tmphost;
97 int tmpport;
98 int ret;
99
100 // Extract the command line parameters.
101 while ((ch = getopt(argc, argv, "h:l:L:p:r:")) != EOF) {
102 switch (ch) {
103 case 'h':
104 config.home = optarg;
105 break;
106 case 'L':
107 config.this_host.creator = true; // FALLTHROUGH
108 case 'l':
109 config.this_host.host = optarg;
110 /*
111 * The final colon in host:port string is the
112 * boundary between the host and the port portions
113 * of the string.
114 */
115 if ((last_colon = strrchr(optarg, ':')) == NULL) {
116 cerr << "Bad local host specification." << endl;
117 usage();
118 }
119 /*
120 * Separate the host and port portions of the
121 * string for further processing.
122 */
123 portstr = last_colon + 1;
124 *last_colon = '\0';
125 config.this_host.port = (unsigned short)atoi(portstr);
126 config.got_listen_address = true;
127 break;
128 case 'p':
129 config.priority = atoi(optarg);
130 break;
131 case 'r':
132 tmphost = optarg;
133 /*
134 * The final colon in host:port string is the
135 * boundary between the host and the port portions
136 * of the string.
137 */
138 if ((last_colon = strrchr(tmphost, ':')) == NULL) {
139 cerr << "Bad remote host specification." << endl;
140 usage();
141 }
142 /*
143 * Separate the host and port portions of the
144 * string for further processing.
145 */
146 portstr = last_colon + 1;
147 *last_colon = '\0';
148 tmpport = (unsigned short)atoi(portstr);
149 config.addOtherHost(tmphost, tmpport);
150 break;
151 case '?':
152 default:
153 usage();
154 }
155 }
156
157 // Error check command line.
158 if ((!config.got_listen_address) || config.home == NULL)
159 usage();
160
161 RepMgrGSG runner;
162 try {
163 if((ret = runner.init(&config)) != 0)
164 goto err;
165 if((ret = runner.doloop()) != 0)
166 goto err;
167 } catch (DbException dbe) {
168 cerr << "Caught an exception during initialization or"
169 << " processing: " << dbe.what() << endl;
170 }
171 err:
172 runner.terminate();
173 return 0;
174 }
175
RepMgrGSG()176 RepMgrGSG::RepMgrGSG() : app_config(0), dbenv((u_int32_t)0)
177 {
178 app_data.is_master = 0; // By default, assume this site is not a master.
179 }
180
init(RepConfigInfo * config)181 int RepMgrGSG::init(RepConfigInfo *config)
182 {
183 int ret = 0;
184
185 app_config = config;
186
187 dbenv.set_errfile(stderr);
188 dbenv.set_errpfx(progname);
189 dbenv.set_app_private(&app_data);
190 dbenv.set_event_notify(event_callback);
191 dbenv.repmgr_set_ack_policy(DB_REPMGR_ACKS_ALL);
192
193 DbSite *dbsite;
194 dbenv.repmgr_site(app_config->this_host.host,
195 app_config->this_host.port, &dbsite, 0);
196 dbsite->set_config(DB_LOCAL_SITE, 1);
197 if (app_config->this_host.creator)
198 dbsite->set_config(DB_GROUP_CREATOR, 1);
199
200 dbsite->close();
201
202 int i = 1;
203 for ( REP_HOST_INFO *cur = app_config->other_hosts;
204 cur != NULL && i <= app_config->nrsites;
205 cur = cur->next, i++) {
206
207 dbenv.repmgr_site(cur->host, cur->port, &dbsite, 0);
208 dbsite->set_config(DB_BOOTSTRAP_HELPER, 1);
209
210 dbsite->close();
211 }
212
213 dbenv.rep_set_priority(app_config->priority);
214
215 // Permanent messages require at least one ack.
216 dbenv.repmgr_set_ack_policy(DB_REPMGR_ACKS_ONE);
217 // Give 500 microseconds to receive the ack.
218 dbenv.rep_set_timeout(DB_REP_ACK_TIMEOUT, 5);
219
220 // We can now open our environment, although we're not ready to
221 // begin replicating. However, we want to have a dbenv around
222 // so that we can send it into any of our message handlers.
223 dbenv.set_cachesize(0, CACHESIZE, 0);
224 dbenv.set_flags(DB_TXN_NOSYNC, 1);
225
226 try {
227 dbenv.open(app_config->home, DB_CREATE | DB_RECOVER |
228 DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG |
229 DB_INIT_MPOOL | DB_INIT_TXN, 0);
230 } catch(DbException dbe) {
231 cerr << "Caught an exception during DB environment open." << endl
232 << "Ensure that the home directory is created prior to starting"
233 << " the application." << endl;
234 ret = ENOENT;
235 goto err;
236 }
237
238 if ((ret = dbenv.repmgr_start(3, app_config->start_policy)) != 0)
239 goto err;
240
241 err:
242 return ret;
243 }
244
terminate()245 int RepMgrGSG::terminate()
246 {
247 try {
248 dbenv.close(0);
249 } catch (DbException dbe) {
250 cerr << "error closing environment: " << dbe.what() << endl;
251 }
252 return 0;
253 }
254
255 // Provides the main data processing function for our application.
256 // This function provides a command line prompt to which the user
257 // can provide a ticker string and a stock price. Once a value is
258 // entered to the application, the application writes the value to
259 // the database and then displays the entire database.
260 #define BUFSIZE 1024
doloop()261 int RepMgrGSG::doloop()
262 {
263 Dbt key, data;
264 Db *dbp;
265 char buf[BUFSIZE], *rbuf;
266 int ret;
267
268 dbp = 0;
269 memset(&key, 0, sizeof(key));
270 memset(&data, 0, sizeof(data));
271 ret = 0;
272
273 for (;;) {
274 if (dbp == 0) {
275 dbp = new Db(&dbenv, 0);
276
277 try {
278 dbp->open(NULL, DATABASE, NULL, DB_BTREE,
279 app_data.is_master ? DB_CREATE | DB_AUTO_COMMIT :
280 DB_AUTO_COMMIT, 0);
281 } catch(DbException dbe) {
282 // It is expected that this condition will be triggered
283 // when client sites start up. It can take a while for
284 // the master site to be found and synced, and no DB will
285 // be available until then.
286 if (dbe.get_errno() == ENOENT) {
287 cout << "No stock db available yet - retrying." << endl;
288 try {
289 dbp->close(0);
290 } catch (DbException dbe2) {
291 cout << "Unexpected error closing after failed" <<
292 " open, message: " << dbe2.what() << endl;
293 dbp = NULL;
294 goto err;
295 }
296 dbp = NULL;
297 sleep(SLEEPTIME);
298 continue;
299 } else {
300 dbenv.err(ret, "DB->open");
301 throw dbe;
302 }
303 }
304 }
305
306 cout << "QUOTESERVER" ;
307 if (!app_data.is_master)
308 cout << "(read-only)";
309 cout << "> " << flush;
310
311 if (fgets(buf, sizeof(buf), stdin) == NULL)
312 break;
313 if (strtok(&buf[0], " \t\n") == NULL) {
314 switch ((ret = print_stocks(dbp))) {
315 case 0:
316 continue;
317 case DB_REP_HANDLE_DEAD:
318 (void)dbp->close(DB_NOSYNC);
319 cout << "closing db handle due to rep handle dead" << endl;
320 dbp = NULL;
321 continue;
322 default:
323 dbp->err(ret, "Error traversing data");
324 goto err;
325 }
326 }
327 rbuf = strtok(NULL, " \t\n");
328 if (rbuf == NULL || rbuf[0] == '\0') {
329 if (strncmp(buf, "exit", 4) == 0 ||
330 strncmp(buf, "quit", 4) == 0)
331 break;
332 dbenv.errx("Format: TICKER VALUE");
333 continue;
334 }
335
336 if (!app_data.is_master) {
337 dbenv.errx("Can't update at client");
338 continue;
339 }
340
341 key.set_data(buf);
342 key.set_size((u_int32_t)strlen(buf));
343
344 data.set_data(rbuf);
345 data.set_size((u_int32_t)strlen(rbuf));
346
347 if ((ret = dbp->put(NULL, &key, &data, 0)) != 0)
348 {
349 dbp->err(ret, "DB->put");
350 if (ret != DB_KEYEXIST)
351 goto err;
352 }
353 }
354
355 err: if (dbp != 0) {
356 (void)dbp->close(DB_NOSYNC);
357 }
358
359 return (ret);
360 }
361
362 // Handle replication events of interest to this application.
event_callback(DbEnv * dbenv,u_int32_t which,void * info)363 void RepMgrGSG::event_callback(DbEnv* dbenv, u_int32_t which, void *info)
364 {
365 APP_DATA *app = (APP_DATA*)dbenv->get_app_private();
366
367 info = 0; // Currently unused.
368
369 switch (which) {
370 case DB_EVENT_REP_MASTER:
371 app->is_master = 1;
372 break;
373
374 case DB_EVENT_REP_CLIENT:
375 app->is_master = 0;
376 break;
377
378 case DB_EVENT_REP_STARTUPDONE: // FALLTHROUGH
379 case DB_EVENT_REP_NEWMASTER:
380 // Ignore.
381 break;
382
383 default:
384 dbenv->errx("ignoring event %d", which);
385 }
386 }
387
388 // Display all the stock quote information in the database.
print_stocks(Db * dbp)389 int RepMgrGSG::print_stocks(Db *dbp)
390 {
391 Dbc *dbc;
392 Dbt key, data;
393 #define MAXKEYSIZE 10
394 #define MAXDATASIZE 20
395 char keybuf[MAXKEYSIZE + 1], databuf[MAXDATASIZE + 1];
396 int ret, t_ret;
397 u_int32_t keysize, datasize;
398
399 if ((ret = dbp->cursor(NULL, &dbc, 0)) != 0) {
400 dbp->err(ret, "can't open cursor");
401 return (ret);
402 }
403
404 memset(&key, 0, sizeof(key));
405 memset(&data, 0, sizeof(data));
406
407 cout << "\tSymbol\tPrice" << endl
408 << "\t======\t=====" << endl;
409
410 for (ret = dbc->get(&key, &data, DB_FIRST);
411 ret == 0;
412 ret = dbc->get(&key, &data, DB_NEXT)) {
413 keysize = key.get_size() > MAXKEYSIZE ? MAXKEYSIZE : key.get_size();
414 memcpy(keybuf, key.get_data(), keysize);
415 keybuf[keysize] = '\0';
416
417 datasize = data.get_size() >=
418 MAXDATASIZE ? MAXDATASIZE : data.get_size();
419 memcpy(databuf, data.get_data(), datasize);
420 databuf[datasize] = '\0';
421
422 cout << "\t" << keybuf << "\t" << databuf << endl;
423 }
424 cout << endl << flush;
425
426 if ((t_ret = dbc->close()) != 0 && ret == 0) {
427 cout << "closed cursor" << endl;
428 ret = t_ret;
429 }
430
431 switch (ret) {
432 case 0:
433 case DB_NOTFOUND:
434 case DB_LOCK_DEADLOCK:
435 return (0);
436 default:
437 return (ret);
438 }
439 }
440
441