1 //-< TESTCONC.CPP >--------------------------------------------------*--------*
2 // FastDB Version 1.0 (c) 1999 GARRET * ? *
3 // (Main Memory Database Management System) * /\| *
4 // * / \ *
5 // Created: 31-Nov-2002 K.A. Knizhnik * / [] \ *
6 // Last update: 31-Nov-2002 K.A. Knizhnik * GARRET *
7 //-------------------------------------------------------------------*--------*
8 // Test for concurrent update model
9 //-------------------------------------------------------------------*--------*
10
11 #include <stdio.h>
12 #include "fastdb.h"
13
14 USE_FASTDB_NAMESPACE
15
16 class Bank {
17 public:
18 db_int8 total;
19 bool online;
20
21 TYPE_DESCRIPTOR((FIELD(total), FIELD(online)));
22 };
23
24 REGISTER(Bank);
25
26 class Account {
27 public:
28 int4 id;
29 db_int8 amount;
30
31 TYPE_DESCRIPTOR((KEY(id, HASHED), FIELD(amount)));
32 };
33
34 REGISTER(Account);
35
36 dbLocalEvent timer;
37
38 const int nUpdateIterations = 10000;
39 const int nMaxAccounts = 1000;
40
inspector(void * arg)41 void thread_proc inspector(void* arg)
42 {
43 dbDatabase* db = (dbDatabase*)arg;
44 db->attach();
45 dbCursor<Account> accounts;
46 dbCursor<Bank> bank;
47 dbMutex mutex;
48 int n = 0;
49 bool online = true;
50 mutex.lock();
51 do {
52 if (accounts.select() > 0) {
53 db_int8 total = 0;
54 do {
55 total += accounts->amount;
56 } while (accounts.next());
57 bank.select();
58 assert(total == bank->total);
59 online = bank->online;
60 }
61 db->commit();
62 n += 1;
63 timer.wait(mutex, 1);
64 } while (online);
65 mutex.unlock();
66 printf("Thread performs %d inspection iterations\n", n);
67 }
68
main(int argc,char * argv[])69 int main(int argc, char* argv[])
70 {
71 int i;
72 int nThreads = 4;
73 #ifdef REPLICATION_SUPPORT
74 char* servers[3] = {"localhost:6101", "localhost:6102", "localhost:6103"};
75 char dbName[16];
76 if (argc < 3) {
77 fprintf(stderr, "Usage: testconc (update|inspect|coinspect) <REPLICATION-NODE-ID> [number-of-inspection-threads]\n");
78 return 1;
79 }
80 int replNodeId = atoi(argv[2]);
81 sprintf(dbName, "testconc%d", replNodeId);
82 if (argc == 4) {
83 nThreads = atoi(argv[3]);
84 }
85 if (strcmp(argv[1], "coinspect") == 0) {
86 char fileName[32];
87 sprintf(fileName, "%s.fdb", dbName);
88
89 dbReplicatedDatabase db(dbDatabase::dbConcurrentRead);
90 if (db.open(dbName, fileName, replNodeId, servers, itemsof(servers))) {
91 printf("Start %d inspection threads\n", nThreads);
92 dbThread* thread = new dbThread[nThreads];
93 timer.open(false);
94 while (true) {
95 for (i = 0; i < nThreads; i++) {
96 thread[i].create(inspector, &db);
97 }
98 for (i = 0; i < nThreads; i++) {
99 thread[i].join();
100 }
101 dbThread::sleep(10);
102 }
103 timer.close();
104 db.close();
105 return EXIT_SUCCESS;
106 }
107 } else
108 #else
109 char* dbName = "testconc";
110 if (argc < 2) {
111 fprintf(stderr, "Usage: testconc (update|inspect) [number-of-inspection-threads]\n");
112 return 1;
113 }
114 if (argc == 3) {
115 nThreads = atoi(argv[2]);
116 }
117 #endif
118 if (strcmp(argv[1], "inspect") == 0) {
119 dbDatabase db(dbDatabase::dbConcurrentRead);
120 if (db.open(WC_STRING(dbName))) {
121 printf("Start %d inspection threads\n", nThreads);
122 dbThread* thread = new dbThread[nThreads];
123 timer.open(false);
124 for (i = 0; i < nThreads; i++) {
125 thread[i].create(inspector, &db);
126 }
127 for (i = 0; i < nThreads; i++) {
128 thread[i].join();
129 }
130 timer.close();
131 db.close();
132 return EXIT_SUCCESS;
133 }
134 } else {
135 #ifdef REPLICATION_SUPPORT
136 char fileName[32];
137 sprintf(fileName, "%s.fdb", dbName);
138
139 dbReplicatedDatabase db(dbDatabase::dbConcurrentUpdate);
140 if (db.open(WC_STRING(dbName), WC_STRING(fileName), replNodeId, servers, itemsof(servers))) {
141 printf("Update process started, now you should start inspectors: \"testconc inspector 1 N\", where N is number of inspectpors\n");
142 #else
143 dbDatabase db(dbDatabase::dbConcurrentUpdate);
144 if (db.open(WC_STRING(dbName))) {
145 printf("Update process started, now you should start inspectors: \"testconc inspector N\", where N is number of inspectpors\n");
146 #endif
147 dbThread::sleep(2);
148 dbCursor<Bank> bank(dbCursorForUpdate);
149 dbCursor<Account> accounts(dbCursorForUpdate);
150 dbQuery q;
151 int id;
152 q = "id=",id;
153
154 // Initialization
155 if (bank.select() == 0) {
156 Bank theBank;
157 theBank.total = 0;
158 theBank.online = true;
159 insert(theBank);
160 } else {
161 bank->online = true;
162 bank.update();
163 }
164 db.commit();
165
166 for (i = 0; i < nUpdateIterations; i++) {
167 int nAccounts = accounts.select();
168 id = 0;
169 db_int8 delta = i;
170 if (accounts.last()) {
171 id = accounts->id + 1;
172 accounts->amount += i;
173 accounts.update();
174 delta += i;
175 }
176 Account acc;
177 acc.amount = i;
178 acc.id = id;
179 insert(acc);
180 if (nAccounts > nMaxAccounts) {
181 accounts.first();
182 id = accounts->id;
183 int n = accounts.select(q);
184 assert(n == 1);
185 delta -= accounts->amount;
186 accounts.remove();
187 }
188 bank.select();
189 bank->total += delta;
190 bank.update();
191 db.commit();
192 }
193 printf("End of %d update iterations\n", nUpdateIterations);
194 bank.select();
195 bank->online = false;
196 bank.update();
197 db.close();
198 return EXIT_SUCCESS;
199 }
200 }
201 printf("Failed to open database\n");
202 return EXIT_FAILURE;
203 }
204
205
206
207
208