1 /*  $Id: test_bdb_bvstore.cpp 540797 2017-07-11 15:32:49Z mozese2 $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Anatoliy Kuznetsov
27  *
28  * File Description: Test application for NCBI Berkeley DB library (BDB)
29  *
30  */
31 
/// @file test_bdb_bvstore.cpp
/// Illustrates concurrent transactional writes to a BDB bit-vector store from several threads
34 
35 #include <ncbi_pch.hpp>
36 #include <corelib/ncbiapp.hpp>
37 #include <corelib/ncbiargs.hpp>
38 #include <corelib/ncbifile.hpp>
39 #include <corelib/ncbitime.hpp>
40 #include <corelib/ncbithr.hpp>
41 #include <corelib/ncbimtx.hpp>
42 #include <corelib/ncbicntr.hpp>
43 #include <stdio.h>
44 
45 #include <db/bdb/bdb_expt.hpp>
46 #include <db/bdb/bdb_types.hpp>
47 #include <db/bdb/bdb_file.hpp>
48 #include <db/bdb/bdb_env.hpp>
49 #include <db/bdb/bdb_cursor.hpp>
50 #include <db/bdb/bdb_blob.hpp>
51 #include <db/bdb/bdb_trans.hpp>
52 #include <db/bdb/bdb_util.hpp>
53 #include <db/bdb/bdb_bv_store.hpp>
54 
55 
56 #include <common/test_assert.h>  /* This header must go last */
57 
58 USING_NCBI_SCOPE;
59 
60 /// @internal
61 struct SBVStoreDB : public CBDB_BvStore< bm::bvector<> >
62 {
63     CBDB_FieldUint4        thr_id;
64     CBDB_FieldUint4        rec_id;
65 
66     typedef CBDB_BvStore< bm::bvector<> >  TParent;
67 
SBVStoreDBSBVStoreDB68     SBVStoreDB()
69     : TParent(256)
70     {
71         DisableNull();
72         BindKey("thr_id",   &thr_id);
73         BindKey("rec_id",   &rec_id);
74     }
75 };
76 
77 /// Fill bitvector
78 ///
79 /// @internal
FillBV(bm::bvector<> * bv,unsigned length)80 void FillBV(bm::bvector<>* bv, unsigned length)
81 {
82     for (unsigned i = 1; i < length; i+=10) {
83         bv->set(i);
84     }
85 }
86 
87 
88 /// @internal
89 class  CBDB_BVTestThread : public CThread
90 {
91 public:
92     CBDB_BVTestThread(CBDB_Env&  env,
93                       unsigned   thread_id,
94                       unsigned   recs);
95 protected:
96     /// Overload from CThread (main thread function)
97     virtual void* Main(void);
98 private:
99     ~CBDB_BVTestThread();
100     CBDB_BVTestThread(const CBDB_BVTestThread&);
101     CBDB_BVTestThread& operator=(const CBDB_BVTestThread&);
102 private:
103     CBDB_Env&   m_Env;
104     unsigned    m_ThreadId;
105     unsigned    m_Recs;
106 };
107 
CBDB_BVTestThread(CBDB_Env & env,unsigned thread_id,unsigned recs)108 CBDB_BVTestThread::CBDB_BVTestThread(CBDB_Env& env,
109                                      unsigned  thread_id,
110                                      unsigned  recs)
111  : m_Env(env),
112    m_ThreadId(thread_id),
113    m_Recs(recs)
114 {
115 }
116 
~CBDB_BVTestThread()117 CBDB_BVTestThread::~CBDB_BVTestThread()
118 {
119 }
120 
121 
Main(void)122 void* CBDB_BVTestThread::Main(void)
123 {
124     SBVStoreDB db;
125     db.SetEnv(m_Env);
126     db.Open("bvdata.db", CBDB_RawFile::eReadWrite);
127 
128     for (unsigned i = 0; i < m_Recs; ++i) {
129         try {
130             CBDB_Transaction trans(*db.GetEnv(),
131                                 CBDB_Transaction::eTransASync,
132                                 CBDB_Transaction::eNoAssociation);
133             db.SetTransaction(&trans);
134 
135             db.thr_id = m_ThreadId;
136             db.rec_id = i;
137 
138             bm::bvector<> bv;
139             FillBV(&bv, 1000000);
140 
141             db.WriteVector(bv, SBVStoreDB::eCompact);
142 
143             trans.Commit();
144         }
145         catch (CBDB_ErrnoException& ex)
146         {
147             if (ex.IsDeadLock()) {
148 
149                 // dead lock transaction is a Berkeley DB reality which can
150                 // happen when two or more threads are writing in the same
151                 // file concurrently.
152                 //
153                 // typically we want to abort current transaction
154                 // and repeat it with the same data.
155                 //
156                 // In this case we simply ignore the error.
157 
158                 NcbiCerr <<
159                  "Dead lock situation detected. Transaction aborted!"
160                  << NcbiEndl;
161             } else {
162                 throw;
163             }
164         }
165     } // for
166 
167     return (void*)0;
168 }
169 
170 //////////////////////////////////////////////////////////////////
171 //
172 // Structure implements simple database table with integer id primary key
173 //
174 //
175 
176 
177 ////////////////////////////////
178 /// Test application
179 ///
180 /// @internal
181 ///
182 class CBDB_TestBVStore : public CNcbiApplication
183 {
184 public:
185     void Init(void);
186     int Run(void);
187 };
188 
189 
Init(void)190 void CBDB_TestBVStore::Init(void)
191 {
192     SetDiagTrace(eDT_Enable);
193 
194     SetDiagPostLevel(eDiag_Warning);
195     SetDiagPostFlag(eDPF_File);
196     SetDiagPostFlag(eDPF_Line);
197     SetDiagPostFlag(eDPF_Trace);
198 
199     unique_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
200     arg_desc->SetUsageContext("test_bdb_threads",
201                               "test BDB library with threads");
202 
203     arg_desc->AddOptionalKey("recs",
204                              "recs",
205                              "Number of records to load per thread",
206                              CArgDescriptions::eInteger);
207 
208     arg_desc->AddOptionalKey("threads",
209                              "threads",
210                              "Number of concurrent threads",
211                              CArgDescriptions::eInteger);
212 
213     SetupArgDescriptions(arg_desc.release());
214 }
215 
216 
Run(void)217 int CBDB_TestBVStore::Run(void)
218 {
219     const CArgs& args = GetArgs();
220 
221     NcbiCout << "Initialize transactional environment..." << flush;
222 
223     string path = "./bvstore_test_thr";
224     path = CDirEntry::AddTrailingPathSeparator(path);
225 
226     {{
227         CDir dir(path);
228         if ( !dir.Exists() ) {
229             dir.Create();
230         }
231     }}
232 
233 
234     CBDB_Env env;
235 
236     // Error file for Berkeley DB
237     string err_file = path + "err_bvstore_test.log";
238     env.OpenErrFile(err_file.c_str());
239 
240     env.SetLogFileMax(50 * 1024 * 1024);
241 
242     // use in-memory logging to improve performance (warning: not durable)
243     env.SetLogInMemory(true);
244     env.SetLogBSize(100 * 1024 * 1024);
245 
246     env.SetLogAutoRemove(true);
247 
248     env.SetCacheSize(5 * 1024 * 1024);
249 
250     env.OpenWithTrans(path.c_str(), CBDB_Env::eThreaded /*| CBDB_Env::eRunRecovery*/);
251     env.SetDirectDB(true);
252     env.SetDirectLog(true);
253     env.SetLockTimeout(10 * 1000000); // 10 sec
254     env.SetTasSpins(5);
255 
256     {{
257         SBVStoreDB db;
258         db.SetEnv(env);
259         db.Open("bvdata.db", CBDB_RawFile::eCreate);
260 
261         CBDB_Transaction trans(*db.GetEnv(),
262                             CBDB_Transaction::eTransASync,
263                             CBDB_Transaction::eNoAssociation);
264         db.SetTransaction(&trans);
265 
266         db.thr_id = 1;
267         db.rec_id = 1;
268 
269         bm::bvector<> bv;
270         FillBV(&bv, 1000000);
271 
272         db.WriteVector(bv, SBVStoreDB::eCompact);
273 
274         trans.Commit();
275     }}
276     {{
277         SBVStoreDB db;
278         db.SetEnv(env);
279         db.Open("bvdata.db", CBDB_RawFile::eReadWrite);
280 
281         db.thr_id = 1;
282         db.rec_id = 1;
283         bm::bvector<> bv;
284 
285         EBDB_ErrCode err = db.ReadVector(&bv);
286         assert(err == eBDB_Ok);
287         NcbiCout << bv.count() << NcbiEndl;
288 
289     }}
290 
291     NcbiCout << "Ok." << NcbiEndl;
292 
293     try
294     {
295         unsigned kProcessingRecs = 100000;
296         if (args["recs"]) {
297             kProcessingRecs = args["recs"].AsInteger();
298         }
299 
300         unsigned kThreadCount = 2;
301         if (args["threads"]) {
302             kThreadCount = args["threads"].AsInteger();
303         }
304 
305 
306         vector<CRef<CThread> > thread_list;
307         thread_list.reserve(kThreadCount);
308 
309         CStopWatch sw(CStopWatch::eStart);
310 
311         NcbiCout << "Starting " << kThreadCount << " threads..." << flush;
312         for (unsigned i = 0; i < kThreadCount; ++i) {
313             CRef<CThread> bdb_thread(
314                 new CBDB_BVTestThread(env, i, kProcessingRecs));
315             thread_list.push_back(bdb_thread);
316             bdb_thread->Run();
317         } // for
318         NcbiCout << "Ok." << NcbiEndl;
319 
320         NcbiCout << "Waiting for threads to finish..." << flush;
321 
322         NON_CONST_ITERATE(vector<CRef<CThread> >, it, thread_list) {
323             CRef<CThread> bdb_thread(*it);
324             bdb_thread->Join();
325         }
326 
327         NcbiCout << "Ok." << NcbiEndl;
328 
329         double elapsed = sw.Elapsed();
330         unsigned total_recs = kProcessingRecs * kThreadCount;
331         double rate = total_recs / elapsed;
332         double rate_per_thread = kProcessingRecs / elapsed;
333 
334         NcbiCout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
335         NcbiCout << "Elapsed: " << elapsed << " secs." << NcbiEndl;
336         NcbiCout << "Transaction rate: " << rate << " recs/secs." << NcbiEndl;
337         NcbiCout << "Rate per thread: " << rate_per_thread << " recs/secs."
338                  << NcbiEndl;
339     }
340     catch (CBDB_ErrnoException& ex)
341     {
342         cout << "Error! DBD errno exception:" << ex.what();
343         return 1;
344     }
345     catch (CBDB_LibException& ex)
346     {
347         cout << "Error! DBD library exception:" << ex.what();
348         return 1;
349     }
350 
351     cout << endl;
352     cout << "TEST execution completed successfully!" << endl << endl;
353     return 0;
354 }
355 
356 
357 ///////////////////////////////////
358 // APPLICATION OBJECT  and  MAIN
359 //
360 
main(int argc,const char * argv[])361 int main(int argc, const char* argv[])
362 {
363     // Execute main application function
364     return CBDB_TestBVStore().AppMain(argc, argv);
365 }
366