1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2016 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 /***************************************************************************
20  *
21  *   $Id: blockrequestprocessor.cpp 2055 2013-02-08 19:09:09Z pleblanc $
22  *
23  *   jrodriguez@calpont.com   *
24  *                                                                         *
25  ***************************************************************************/
26 
27 #include <sys/time.h>
28 #include <sstream>
29 #include <string>
30 #include <list>
31 using namespace std;
32 
33 #include "blockrequestprocessor.h"
34 #include "rwlock_local.h"
35 #include "dbrm.h"
36 #include "pp_logger.h"
37 #include "mcsconfig.h"
38 
39 namespace dbbc
40 {
41 
BlockRequestProcessor(uint32_t numBlcks,int thrCount,int blocksPerRead,uint32_t deleteBlocks,uint32_t blckSz)42 BlockRequestProcessor::BlockRequestProcessor(uint32_t numBlcks,
43         int thrCount,
44         int blocksPerRead,
45         uint32_t deleteBlocks,
46         uint32_t blckSz) :
47     fbMgr(numBlcks, blckSz, deleteBlocks),
48     fIOMgr(fbMgr, fBRPRequestQueue, thrCount, blocksPerRead)
49 {
50     //pthread_mutex_init(&check_mutex, NULL);
51     config::Config* fConfig = config::Config::makeConfig();
52     string val = fConfig->getConfig("DBBC", "BRPTracing");
53     int temp = 0;
54 #ifdef _MSC_VER
55     int tid = GetCurrentThreadId();
56 #else
57     pthread_t tid = pthread_self();
58 #endif
59 
60     if (val.length() > 0) temp = static_cast<int>(config::Config::fromText(val));
61 
62     if (temp > 0)
63         fTrace = true;
64     else
65         fTrace = false;
66 
67     if (fTrace)
68     {
69         ostringstream brpLogFileName;
70 #ifdef _MSC_VER
71         brpLogFileName << "C:/Calpont/log/trace/brp." << tid;
72 #else
73         brpLogFileName << MCSLOGDIR << "/trace/brp." << tid;
74 #endif
75         fLogFile.open(brpLogFileName.str().c_str(), ios_base::app | ios_base::ate);
76     }
77 }
78 
79 
~BlockRequestProcessor()80 BlockRequestProcessor::~BlockRequestProcessor()
81 {
82     //pthread_mutex_destroy(&check_mutex);
83     if (fTrace)
84         fLogFile.close();
85 }
86 
stop()87 void BlockRequestProcessor::stop()
88 {
89     fBRPRequestQueue.stop();
90     fIOMgr.stop();
91 }
92 
check(const BRM::InlineLBIDRange & range,const BRM::QueryContext & ver,const BRM::VER_t txn,const int compType,uint32_t & lbidCount)93 int BlockRequestProcessor::check(const BRM::InlineLBIDRange& range, const BRM::QueryContext& ver, const BRM::VER_t txn, const int compType, uint32_t& lbidCount)
94 {
95     uint64_t maxLbid = range.start; // highest existent lbid
96     uint64_t rangeLen = range.size;
97     uint64_t idx;
98     uint64_t adjSz;
99     struct timespec start_tm;
100     lbidCount = 0;
101 
102     if (fTrace)
103         clock_gettime(CLOCK_MONOTONIC, &start_tm);
104 
105     for (idx = 0; fbMgr.exists(maxLbid, ver.currentScn) == true && idx < rangeLen; maxLbid++, idx++)
106         (void)0;
107 
108     if (idx == rangeLen)   // range is already loaded
109     {
110         if (fTrace)
111         {
112             uint16_t dbroot;
113             uint32_t partNum;
114             uint16_t segNum;
115             uint32_t fbo;
116             BRM::OID_t oid;
117             fdbrm.lookupLocal(maxLbid, ver.currentScn, false, oid, dbroot, partNum, segNum, fbo);
118             fLogFile
119                     << oid << " "
120                     << maxLbid << " "
121                     << fbo << " "
122                     << rangeLen << " "
123                     << 0 << " "
124                     << 0 << " "
125                     << 0 << " "
126                     << right << fixed << ((double)(start_tm.tv_sec + (1.e-9 * start_tm.tv_nsec)))
127                     << endl;
128         }
129 
130         return 0;
131     }
132 
133     adjSz = rangeLen - idx;
134     BRM::InlineLBIDRange adjRange;
135     adjRange.start = maxLbid;
136     adjRange.size = adjSz;
137     fileRequest rqstBlk(adjRange, ver, txn, compType);
138     check(rqstBlk);
139 
140     if (rqstBlk.RequestStatus() == fileRequest::BRM_LOOKUP_ERROR)
141         throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_BRM_LOOKUP), logging::ERR_BRM_LOOKUP);
142     else if (rqstBlk.RequestStatus() == fileRequest::FS_EINVAL)
143         throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_O_DIRECT),
144                                  logging::ERR_O_DIRECT);
145     else if (rqstBlk.RequestStatus() == fileRequest::FS_ENOENT)
146         throw logging::IDBExcept(
147             logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_ENOENT),
148             logging::ERR_ENOENT);
149     else if (rqstBlk.RequestStatus() != fileRequest::SUCCESSFUL)
150         throw runtime_error(rqstBlk.RequestStatusStr());
151 
152     lbidCount = rqstBlk.BlocksRead();
153 
154     if (fTrace)
155     {
156         uint16_t dbroot;
157         uint32_t partNum;
158         uint16_t segNum;
159         uint32_t fbo;
160         BRM::OID_t oid;
161         fdbrm.lookupLocal(maxLbid, ver.currentScn, false, oid, dbroot, partNum, segNum, fbo);
162         fLogFile
163                 << oid << " "
164                 << maxLbid << " "
165                 << fbo << " "
166                 << rangeLen << " "
167                 << adjSz << " "
168                 << rqstBlk.BlocksRead() << " "
169                 << rqstBlk.BlocksLoaded() << " "
170                 << right << fixed << ((double)(start_tm.tv_sec + (1.e-9 * start_tm.tv_nsec)))
171                 << endl;
172     }
173 
174     return rqstBlk.BlocksLoaded();
175 } // check
176 
check(fileRequest & rqstBlk)177 int BlockRequestProcessor::check(fileRequest& rqstBlk)
178 {
179     rqstBlk.frMutex().lock();
180     rqstBlk.SetPredicate(fileRequest::SENDING);
181     sendRequest(rqstBlk); 	// start file read request
182 
183     while (rqstBlk.frPredicate() < fileRequest::COMPLETE)
184         rqstBlk.frCond().wait(rqstBlk.frMutex());
185 
186     rqstBlk.frMutex().unlock();
187 
188     return 0;
189 }
190 
191 // For future use.  Not currently used.
check(BRM::LBID_t lbid,const BRM::QueryContext & ver,BRM::VER_t txn,bool flg,int compType,bool & wasBlockInCache)192 int BlockRequestProcessor::check(BRM::LBID_t lbid, const BRM::QueryContext& ver, BRM::VER_t txn, bool flg, int compType, bool& wasBlockInCache)
193 {
194     if (fbMgr.exists(lbid, ver.currentScn) == true)
195     {
196         wasBlockInCache = true;
197         return 0;
198     }
199     else
200     {
201         wasBlockInCache = false;
202         fileRequest rqstBlk(lbid, ver, flg, txn, compType);
203         int ret = check(rqstBlk);
204 
205         if (rqstBlk.RequestStatus() != fileRequest::SUCCESSFUL)
206         {
207             throw runtime_error(rqstBlk.RequestStatusStr());
208         }
209 
210         return ret;
211     }
212 }
213 
getBlock(const BRM::LBID_t & lbid,const BRM::QueryContext & ver,BRM::VER_t txn,int compType,void * bufferPtr,bool vbFlg,bool & wasCached,bool * versioned,bool insertIntoCache,bool readFromCache)214 int BlockRequestProcessor::getBlock(const BRM::LBID_t& lbid, const BRM::QueryContext& ver, BRM::VER_t txn,
215         int compType, void* bufferPtr, bool vbFlg, bool& wasCached, bool* versioned, bool insertIntoCache,
216         bool readFromCache)
217 {
218     if (readFromCache)
219     {
220         HashObject_t hashObj(lbid, ver.currentScn, 0);
221         wasCached = fbMgr.find(hashObj, bufferPtr);
222 
223         if (wasCached)
224             return 1;
225     }
226 
227     wasCached = false;
228     fileRequest rqstBlk(lbid, ver, vbFlg, txn, compType, (uint8_t*) bufferPtr, insertIntoCache);
229     check(rqstBlk);
230 
231     if (rqstBlk.RequestStatus() == fileRequest::BRM_LOOKUP_ERROR)
232     {
233         ostringstream os;
234         os << "BRP::getBlock(): got a BRM lookup error.  LBID=" << lbid << " ver=" << ver << " txn="
235            << txn << " vbFlg=" << (int) vbFlg;
236         primitiveprocessor::Logger logger;
237         logger.logMessage(os.str(), false);
238         throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_BRM_LOOKUP), logging::ERR_BRM_LOOKUP);
239     }
240     else if (rqstBlk.RequestStatus() == fileRequest::FS_EINVAL)
241     {
242         throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_O_DIRECT),
243                                  logging::ERR_O_DIRECT);
244     }
245     else if (rqstBlk.RequestStatus() == fileRequest::FS_ENOENT)
246     {
247         throw logging::IDBExcept(
248             logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_ENOENT),
249             logging::ERR_ENOENT);
250     }
251     else if (rqstBlk.RequestStatus() != fileRequest::SUCCESSFUL)
252     {
253         throw runtime_error(rqstBlk.RequestStatusStr());
254     }
255 
256     if (versioned)
257         *versioned = rqstBlk.versioned();
258 
259     return 1;
260 }
261 
getCachedBlocks(const BRM::LBID_t * lbids,const BRM::VER_t * vers,uint8_t ** ptrs,bool * wasCached,uint32_t count)262 int BlockRequestProcessor::getCachedBlocks(const BRM::LBID_t* lbids, const BRM::VER_t* vers,
263         uint8_t** ptrs, bool* wasCached, uint32_t count)
264 {
265     return fbMgr.bulkFind(lbids, vers, ptrs, wasCached, count);
266 }
267 
268 
269 } // namespace dbbc
270