1 /*-
2  * Copyright (c) 2006, 2020 Oracle and/or its affiliates.  All rights reserved.
3  *
4  * See the file EXAMPLES-LICENSE for license information.
5  *
6  * $Id$
7  */
8 
9 // NOTE: This example is a simplified version of the RepQuoteExample.cxx
10 // example that can be found in the db/examples/cxx/excxx_repquote directory.
11 //
12 // This example is intended only as an aid in learning Replication Manager
13 // concepts. It is not complete in that many features are not exercised
14 // in it, nor are many error conditions properly handled.
15 
16 #include <iostream>
17 #include <errno.h>
18 
19 #include <db_cxx.h>
20 #include "RepConfigInfo.h"
21 
22 using std::cout;
23 using std::cin;
24 using std::cerr;
25 using std::endl;
26 using std::flush;
27 
28 #define CACHESIZE (10 * 1024 * 1024)
29 #define DATABASE "quote.db"
30 #define SLEEPTIME 3
31 
32 const char *progname = "excxx_repquote_gsg_repmgr";
33 
34 #ifdef _WIN32
35 #define WIN32_LEAN_AND_MEAN
36 #include <windows.h>
37 #include <direct.h>
38 #define    sleep(s)        Sleep(1000 * (s))
39 
40 extern "C" {
41   extern int getopt(int, char * const *, const char *);
42   extern char *optarg;
43 }
44 #endif
45 
46 // Struct used to store information in Db app_private field.
47 typedef struct {
48     int is_master;
49 } APP_DATA;
50 
51 class RepMgrGSG
52 {
53 public:
54     RepMgrGSG();
55     int init(RepConfigInfo* config);
56     int doloop();
57     int terminate();
58 
59     static void event_callback(DbEnv * dbenv, u_int32_t which, void *info);
60 
61 private:
62     // Disable copy constructor.
63     RepMgrGSG(const RepMgrGSG &);
64     void operator = (const RepMgrGSG &);
65 
66     // Internal data members.
67     APP_DATA        app_data;
68     RepConfigInfo   *app_config;
69     DbEnv           dbenv;
70 
71     // Private methods.
72     static int print_stocks(Db *dbp);
73 };
74 
usage()75 static void usage()
76 {
77     cerr << "usage: " << progname << endl
78         << "-h home -l|-L host:port [-r host:port] [-p priority]" << endl;
79 
80     cerr
81         << "\t -h home directory (required)" << endl
82         << "\t -l host:port (required unless -L is specified;"
83         << "\t    l stands for local)" << endl
84         << "\t -L host:port (optional, L means group creator)" << endl
85         << "\t -r host:port (optional; r stands for remote; any "
86         << "number of these" << endl
87         << "\t    may be specified)" << endl
88         << "\t -p priority (optional; defaults to 100)" << endl;
89 
90     exit(EXIT_FAILURE);
91 }
92 
main(int argc,char ** argv)93 int main(int argc, char **argv)
94 {
95     RepConfigInfo config;
96     char ch, *last_colon, *portstr, *tmphost;
97     int tmpport;
98     int ret;
99 
100     // Extract the command line parameters.
101     while ((ch = getopt(argc, argv, "h:l:L:p:r:")) != EOF) {
102         switch (ch) {
103         case 'h':
104             config.home = optarg;
105             break;
106         case 'L':
107             config.this_host.creator = true; // FALLTHROUGH
108         case 'l':
109             config.this_host.host = optarg;
110             /*
111              * The final colon in host:port string is the
112              * boundary between the host and the port portions
113              * of the string.
114              */
115             if ((last_colon = strrchr(optarg, ':')) == NULL) {
116                 cerr << "Bad local host specification." << endl;
117                 usage();
118             }
119             /*
120              * Separate the host and port portions of the
121              * string for further processing.
122              */
123             portstr = last_colon + 1;
124             *last_colon = '\0';
125             config.this_host.port = (unsigned short)atoi(portstr);
126             config.got_listen_address = true;
127             break;
128         case 'p':
129             config.priority = atoi(optarg);
130             break;
131         case 'r':
132             tmphost = optarg;
133             /*
134              * The final colon in host:port string is the
135              * boundary between the host and the port portions
136              * of the string.
137              */
138             if ((last_colon = strrchr(tmphost, ':')) == NULL) {
139                 cerr << "Bad remote host specification." << endl;
140                 usage();
141             }
142             /*
143              * Separate the host and port portions of the
144              * string for further processing.
145              */
146             portstr = last_colon + 1;
147             *last_colon = '\0';
148             tmpport = (unsigned short)atoi(portstr);
149             config.addOtherHost(tmphost, tmpport);
150             break;
151         case '?':
152         default:
153             usage();
154         }
155     }
156 
157     // Error check command line.
158     if ((!config.got_listen_address) || config.home == NULL)
159         usage();
160 
161     RepMgrGSG runner;
162     try {
163         if((ret = runner.init(&config)) != 0)
164             goto err;
165         if((ret = runner.doloop()) != 0)
166             goto err;
167     } catch (DbException dbe) {
168         cerr << "Caught an exception during initialization or"
169             << " processing: " << dbe.what() << endl;
170     }
171 err:
172     runner.terminate();
173     return 0;
174 }
175 
RepMgrGSG()176 RepMgrGSG::RepMgrGSG() : app_config(0), dbenv((u_int32_t)0)
177 {
178     app_data.is_master = 0; // By default, assume this site is not a master.
179 }
180 
init(RepConfigInfo * config)181 int RepMgrGSG::init(RepConfigInfo *config)
182 {
183     int ret = 0;
184 
185     app_config = config;
186 
187     dbenv.set_errfile(stderr);
188     dbenv.set_errpfx(progname);
189     dbenv.set_app_private(&app_data);
190     dbenv.set_event_notify(event_callback);
191     dbenv.repmgr_set_ack_policy(DB_REPMGR_ACKS_ALL);
192 
193     DbSite *dbsite;
194     dbenv.repmgr_site(app_config->this_host.host,
195         app_config->this_host.port, &dbsite, 0);
196     dbsite->set_config(DB_LOCAL_SITE, 1);
197     if (app_config->this_host.creator)
198         dbsite->set_config(DB_GROUP_CREATOR, 1);
199 
200     dbsite->close();
201 
202     int i = 1;
203     for ( REP_HOST_INFO *cur = app_config->other_hosts;
204         cur != NULL && i <= app_config->nrsites;
205         cur = cur->next, i++) {
206 
207         dbenv.repmgr_site(cur->host, cur->port, &dbsite, 0);
208         dbsite->set_config(DB_BOOTSTRAP_HELPER, 1);
209 
210         dbsite->close();
211     }
212 
213     dbenv.rep_set_priority(app_config->priority);
214 
215     // Permanent messages require at least one ack.
216     dbenv.repmgr_set_ack_policy(DB_REPMGR_ACKS_ONE);
217     // Give 500 microseconds to receive the ack.
218     dbenv.rep_set_timeout(DB_REP_ACK_TIMEOUT, 5);
219 
220     // We can now open our environment, although we're not ready to
221     // begin replicating.  However, we want to have a dbenv around
222     // so that we can send it into any of our message handlers.
223     dbenv.set_cachesize(0, CACHESIZE, 0);
224     dbenv.set_flags(DB_TXN_NOSYNC, 1);
225 
226     try {
227         dbenv.open(app_config->home, DB_CREATE | DB_RECOVER |
228             DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG |
229             DB_INIT_MPOOL | DB_INIT_TXN, 0);
230     } catch(DbException dbe) {
231         cerr << "Caught an exception during DB environment open." << endl
232             << "Ensure that the home directory is created prior to starting"
233             << " the application." << endl;
234         ret = ENOENT;
235         goto err;
236     }
237 
238     if ((ret = dbenv.repmgr_start(3, app_config->start_policy)) != 0)
239         goto err;
240 
241 err:
242     return ret;
243 }
244 
terminate()245 int RepMgrGSG::terminate()
246 {
247     try {
248         dbenv.close(0);
249     } catch (DbException dbe) {
250         cerr << "error closing environment: " << dbe.what() << endl;
251     }
252     return 0;
253 }
254 
255 // Provides the main data processing function for our application.
256 // This function provides a command line prompt to which the user
257 // can provide a ticker string and a stock price.  Once a value is
258 // entered to the application, the application writes the value to
259 // the database and then displays the entire database.
260 #define BUFSIZE 1024
doloop()261 int RepMgrGSG::doloop()
262 {
263     Dbt key, data;
264     Db *dbp;
265     char buf[BUFSIZE], *rbuf;
266     int ret;
267 
268     dbp = 0;
269     memset(&key, 0, sizeof(key));
270     memset(&data, 0, sizeof(data));
271     ret = 0;
272 
273     for (;;) {
274         if (dbp == 0) {
275             dbp = new Db(&dbenv, 0);
276 
277             try {
278                 dbp->open(NULL, DATABASE, NULL, DB_BTREE,
279                     app_data.is_master ? DB_CREATE | DB_AUTO_COMMIT :
280                     DB_AUTO_COMMIT, 0);
281             } catch(DbException dbe) {
282                 // It is expected that this condition will be triggered
283                 // when client sites start up.  It can take a while for
284                 // the master site to be found and synced, and no DB will
285                 // be available until then.
286                 if (dbe.get_errno() == ENOENT) {
287                     cout << "No stock db available yet - retrying." << endl;
288                     try {
289                         dbp->close(0);
290                     } catch (DbException dbe2) {
291                         cout << "Unexpected error closing after failed" <<
292                             " open, message: " << dbe2.what() << endl;
293                         dbp = NULL;
294                         goto err;
295                     }
296                     dbp = NULL;
297                     sleep(SLEEPTIME);
298                     continue;
299                 } else {
300                     dbenv.err(ret, "DB->open");
301                     throw dbe;
302                 }
303             }
304         }
305 
306         cout << "QUOTESERVER" ;
307         if (!app_data.is_master)
308             cout << "(read-only)";
309         cout << "> " << flush;
310 
311         if (fgets(buf, sizeof(buf), stdin) == NULL)
312             break;
313         if (strtok(&buf[0], " \t\n") == NULL) {
314             switch ((ret = print_stocks(dbp))) {
315             case 0:
316                 continue;
317             case DB_REP_HANDLE_DEAD:
318                 (void)dbp->close(DB_NOSYNC);
319                 cout << "closing db handle due to rep handle dead" << endl;
320                 dbp = NULL;
321                 continue;
322             default:
323                 dbp->err(ret, "Error traversing data");
324                 goto err;
325             }
326         }
327         rbuf = strtok(NULL, " \t\n");
328         if (rbuf == NULL || rbuf[0] == '\0') {
329             if (strncmp(buf, "exit", 4) == 0 ||
330                 strncmp(buf, "quit", 4) == 0)
331                 break;
332             dbenv.errx("Format: TICKER VALUE");
333             continue;
334         }
335 
336         if (!app_data.is_master) {
337             dbenv.errx("Can't update at client");
338             continue;
339         }
340 
341         key.set_data(buf);
342         key.set_size((u_int32_t)strlen(buf));
343 
344         data.set_data(rbuf);
345         data.set_size((u_int32_t)strlen(rbuf));
346 
347         if ((ret = dbp->put(NULL, &key, &data, 0)) != 0)
348         {
349             dbp->err(ret, "DB->put");
350             if (ret != DB_KEYEXIST)
351                 goto err;
352         }
353     }
354 
355 err:    if (dbp != 0) {
356         (void)dbp->close(DB_NOSYNC);
357         }
358 
359     return (ret);
360 }
361 
362 // Handle replication events of interest to this application.
event_callback(DbEnv * dbenv,u_int32_t which,void * info)363 void RepMgrGSG::event_callback(DbEnv* dbenv, u_int32_t which, void *info)
364 {
365     APP_DATA *app = (APP_DATA*)dbenv->get_app_private();
366 
367     info = 0;                // Currently unused.
368 
369     switch (which) {
370     case DB_EVENT_REP_MASTER:
371         app->is_master = 1;
372         break;
373 
374     case DB_EVENT_REP_CLIENT:
375         app->is_master = 0;
376         break;
377 
378     case DB_EVENT_REP_STARTUPDONE: // FALLTHROUGH
379     case DB_EVENT_REP_NEWMASTER:
380         // Ignore.
381         break;
382 
383     default:
384         dbenv->errx("ignoring event %d", which);
385     }
386 }
387 
388 // Display all the stock quote information in the database.
print_stocks(Db * dbp)389 int RepMgrGSG::print_stocks(Db *dbp)
390 {
391     Dbc *dbc;
392     Dbt key, data;
393 #define    MAXKEYSIZE    10
394 #define    MAXDATASIZE    20
395     char keybuf[MAXKEYSIZE + 1], databuf[MAXDATASIZE + 1];
396     int ret, t_ret;
397     u_int32_t keysize, datasize;
398 
399      if ((ret = dbp->cursor(NULL, &dbc, 0)) != 0) {
400         dbp->err(ret, "can't open cursor");
401         return (ret);
402     }
403 
404     memset(&key, 0, sizeof(key));
405     memset(&data, 0, sizeof(data));
406 
407     cout << "\tSymbol\tPrice" << endl
408         << "\t======\t=====" << endl;
409 
410     for (ret = dbc->get(&key, &data, DB_FIRST);
411         ret == 0;
412         ret = dbc->get(&key, &data, DB_NEXT)) {
413         keysize = key.get_size() > MAXKEYSIZE ? MAXKEYSIZE : key.get_size();
414         memcpy(keybuf, key.get_data(), keysize);
415         keybuf[keysize] = '\0';
416 
417         datasize = data.get_size() >=
418             MAXDATASIZE ? MAXDATASIZE : data.get_size();
419         memcpy(databuf, data.get_data(), datasize);
420         databuf[datasize] = '\0';
421 
422         cout << "\t" << keybuf << "\t" << databuf << endl;
423     }
424     cout << endl << flush;
425 
426     if ((t_ret = dbc->close()) != 0 && ret == 0) {
427         cout << "closed cursor" << endl;
428         ret = t_ret;
429     }
430 
431     switch (ret) {
432     case 0:
433     case DB_NOTFOUND:
434     case DB_LOCK_DEADLOCK:
435         return (0);
436     default:
437         return (ret);
438     }
439 }
440 
441