1 //-< TESTCONC.CPP >--------------------------------------------------*--------*
2 // FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *
3 // (Main Memory Database Management System)                          *   /\|  *
4 //                                                                   *  /  \  *
5 //                          Created:     31-Nov-2002  K.A. Knizhnik  * / [] \ *
6 //                          Last update: 31-Nov-2002  K.A. Knizhnik  * GARRET *
7 //-------------------------------------------------------------------*--------*
8 // Test for concurrent update model
9 //-------------------------------------------------------------------*--------*
10 
11 #include <stdio.h>
12 #include "fastdb.h"
13 
14 USE_FASTDB_NAMESPACE
15 
16 class Bank {
17   public:
18     db_int8 total;
19     bool    online;
20 
21     TYPE_DESCRIPTOR((FIELD(total), FIELD(online)));
22 };
23 
24 REGISTER(Bank);
25 
26 class Account {
27   public:
28     int4    id;
29     db_int8 amount;
30 
31     TYPE_DESCRIPTOR((KEY(id, HASHED), FIELD(amount)));
32 };
33 
34 REGISTER(Account);
35 
36 dbLocalEvent timer;
37 
38 const int nUpdateIterations = 10000;
39 const int nMaxAccounts = 1000;
40 
inspector(void * arg)41 void thread_proc inspector(void* arg)
42 {
43     dbDatabase* db = (dbDatabase*)arg;
44     db->attach();
45     dbCursor<Account> accounts;
46     dbCursor<Bank> bank;
47     dbMutex mutex;
48     int n = 0;
49     bool online = true;
50     mutex.lock();
51     do {
52         if (accounts.select() > 0) {
53             db_int8 total = 0;
54             do {
55                 total += accounts->amount;
56             } while (accounts.next());
57             bank.select();
58             assert(total == bank->total);
59             online = bank->online;
60         }
61         db->commit();
62         n += 1;
63         timer.wait(mutex, 1);
64     } while (online);
65     mutex.unlock();
66     printf("Thread performs %d inspection iterations\n", n);
67 }
68 
main(int argc,char * argv[])69 int main(int argc, char* argv[])
70 {
71     int i;
72     int nThreads = 4;
73 #ifdef REPLICATION_SUPPORT
74     char* servers[3] = {"localhost:6101", "localhost:6102", "localhost:6103"};
75     char dbName[16];
76     if (argc < 3) {
77         fprintf(stderr, "Usage: testconc (update|inspect|coinspect) <REPLICATION-NODE-ID> [number-of-inspection-threads]\n");
78         return 1;
79     }
80     int replNodeId = atoi(argv[2]);
81     sprintf(dbName, "testconc%d", replNodeId);
82     if (argc == 4) {
83         nThreads = atoi(argv[3]);
84     }
85     if (strcmp(argv[1], "coinspect") == 0) {
86         char fileName[32];
87         sprintf(fileName, "%s.fdb", dbName);
88 
89         dbReplicatedDatabase db(dbDatabase::dbConcurrentRead);
90         if (db.open(dbName, fileName, replNodeId, servers, itemsof(servers))) {
91             printf("Start %d inspection threads\n", nThreads);
92             dbThread* thread = new dbThread[nThreads];
93             timer.open(false);
94             while (true) {
95                 for (i = 0; i < nThreads; i++) {
96                     thread[i].create(inspector, &db);
97                 }
98                 for (i = 0; i < nThreads; i++) {
99                     thread[i].join();
100                 }
101                 dbThread::sleep(10);
102             }
103             timer.close();
104             db.close();
105             return EXIT_SUCCESS;
106         }
107     } else
108 #else
109     char* dbName = "testconc";
110     if (argc < 2) {
111         fprintf(stderr, "Usage: testconc (update|inspect) [number-of-inspection-threads]\n");
112         return 1;
113     }
114     if (argc == 3) {
115         nThreads = atoi(argv[2]);
116     }
117 #endif
118     if (strcmp(argv[1], "inspect") == 0) {
119         dbDatabase db(dbDatabase::dbConcurrentRead);
120         if (db.open(WC_STRING(dbName))) {
121             printf("Start %d inspection threads\n", nThreads);
122             dbThread* thread = new dbThread[nThreads];
123             timer.open(false);
124             for (i = 0; i < nThreads; i++) {
125                 thread[i].create(inspector, &db);
126             }
127             for (i = 0; i < nThreads; i++) {
128                 thread[i].join();
129             }
130             timer.close();
131             db.close();
132             return EXIT_SUCCESS;
133         }
134     } else {
135 #ifdef REPLICATION_SUPPORT
136         char fileName[32];
137         sprintf(fileName, "%s.fdb", dbName);
138 
139         dbReplicatedDatabase db(dbDatabase::dbConcurrentUpdate);
140         if (db.open(WC_STRING(dbName), WC_STRING(fileName), replNodeId, servers, itemsof(servers))) {
141             printf("Update process started, now you should start inspectors: \"testconc inspector 1 N\", where N is number of inspectpors\n");
142 #else
143         dbDatabase db(dbDatabase::dbConcurrentUpdate);
144         if (db.open(WC_STRING(dbName))) {
145             printf("Update process started, now you should start inspectors: \"testconc inspector N\", where N is number of inspectpors\n");
146 #endif
147             dbThread::sleep(2);
148             dbCursor<Bank> bank(dbCursorForUpdate);
149             dbCursor<Account> accounts(dbCursorForUpdate);
150             dbQuery q;
151             int id;
152             q = "id=",id;
153 
154             // Initialization
155             if (bank.select() == 0) {
156                 Bank theBank;
157                 theBank.total = 0;
158                 theBank.online = true;
159                 insert(theBank);
160             } else {
161                 bank->online = true;
162                 bank.update();
163             }
164             db.commit();
165 
166             for (i = 0; i < nUpdateIterations; i++) {
167                 int nAccounts = accounts.select();
168                 id = 0;
169                 db_int8 delta = i;
170                 if (accounts.last()) {
171                     id = accounts->id + 1;
172                     accounts->amount += i;
173                     accounts.update();
174                     delta += i;
175                 }
176                 Account acc;
177                 acc.amount = i;
178                 acc.id = id;
179                 insert(acc);
180                 if (nAccounts > nMaxAccounts) {
181                     accounts.first();
182                     id = accounts->id;
183                     int n = accounts.select(q);
184                     assert(n == 1);
185                     delta -= accounts->amount;
186                     accounts.remove();
187                 }
188                 bank.select();
189                 bank->total += delta;
190                 bank.update();
191                 db.commit();
192             }
193             printf("End of %d update iterations\n", nUpdateIterations);
194             bank.select();
195             bank->online = false;
196             bank.update();
197             db.close();
198             return EXIT_SUCCESS;
199         }
200     }
201     printf("Failed to open database\n");
202     return EXIT_FAILURE;
203 }
204 
205 
206 
207 
208