1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2016 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 /***************************************************************************
20 *
21 * $Id: blockrequestprocessor.cpp 2055 2013-02-08 19:09:09Z pleblanc $
22 *
23 * jrodriguez@calpont.com *
24 * *
25 ***************************************************************************/
26
27 #include <sys/time.h>
28 #include <sstream>
29 #include <string>
30 #include <list>
31 using namespace std;
32
33 #include "blockrequestprocessor.h"
34 #include "rwlock_local.h"
35 #include "dbrm.h"
36 #include "pp_logger.h"
37 #include "mcsconfig.h"
38
39 namespace dbbc
40 {
41
BlockRequestProcessor(uint32_t numBlcks,int thrCount,int blocksPerRead,uint32_t deleteBlocks,uint32_t blckSz)42 BlockRequestProcessor::BlockRequestProcessor(uint32_t numBlcks,
43 int thrCount,
44 int blocksPerRead,
45 uint32_t deleteBlocks,
46 uint32_t blckSz) :
47 fbMgr(numBlcks, blckSz, deleteBlocks),
48 fIOMgr(fbMgr, fBRPRequestQueue, thrCount, blocksPerRead)
49 {
50 //pthread_mutex_init(&check_mutex, NULL);
51 config::Config* fConfig = config::Config::makeConfig();
52 string val = fConfig->getConfig("DBBC", "BRPTracing");
53 int temp = 0;
54 #ifdef _MSC_VER
55 int tid = GetCurrentThreadId();
56 #else
57 pthread_t tid = pthread_self();
58 #endif
59
60 if (val.length() > 0) temp = static_cast<int>(config::Config::fromText(val));
61
62 if (temp > 0)
63 fTrace = true;
64 else
65 fTrace = false;
66
67 if (fTrace)
68 {
69 ostringstream brpLogFileName;
70 #ifdef _MSC_VER
71 brpLogFileName << "C:/Calpont/log/trace/brp." << tid;
72 #else
73 brpLogFileName << MCSLOGDIR << "/trace/brp." << tid;
74 #endif
75 fLogFile.open(brpLogFileName.str().c_str(), ios_base::app | ios_base::ate);
76 }
77 }
78
79
~BlockRequestProcessor()80 BlockRequestProcessor::~BlockRequestProcessor()
81 {
82 //pthread_mutex_destroy(&check_mutex);
83 if (fTrace)
84 fLogFile.close();
85 }
86
stop()87 void BlockRequestProcessor::stop()
88 {
89 fBRPRequestQueue.stop();
90 fIOMgr.stop();
91 }
92
check(const BRM::InlineLBIDRange & range,const BRM::QueryContext & ver,const BRM::VER_t txn,const int compType,uint32_t & lbidCount)93 int BlockRequestProcessor::check(const BRM::InlineLBIDRange& range, const BRM::QueryContext& ver, const BRM::VER_t txn, const int compType, uint32_t& lbidCount)
94 {
95 uint64_t maxLbid = range.start; // highest existent lbid
96 uint64_t rangeLen = range.size;
97 uint64_t idx;
98 uint64_t adjSz;
99 struct timespec start_tm;
100 lbidCount = 0;
101
102 if (fTrace)
103 clock_gettime(CLOCK_MONOTONIC, &start_tm);
104
105 for (idx = 0; fbMgr.exists(maxLbid, ver.currentScn) == true && idx < rangeLen; maxLbid++, idx++)
106 (void)0;
107
108 if (idx == rangeLen) // range is already loaded
109 {
110 if (fTrace)
111 {
112 uint16_t dbroot;
113 uint32_t partNum;
114 uint16_t segNum;
115 uint32_t fbo;
116 BRM::OID_t oid;
117 fdbrm.lookupLocal(maxLbid, ver.currentScn, false, oid, dbroot, partNum, segNum, fbo);
118 fLogFile
119 << oid << " "
120 << maxLbid << " "
121 << fbo << " "
122 << rangeLen << " "
123 << 0 << " "
124 << 0 << " "
125 << 0 << " "
126 << right << fixed << ((double)(start_tm.tv_sec + (1.e-9 * start_tm.tv_nsec)))
127 << endl;
128 }
129
130 return 0;
131 }
132
133 adjSz = rangeLen - idx;
134 BRM::InlineLBIDRange adjRange;
135 adjRange.start = maxLbid;
136 adjRange.size = adjSz;
137 fileRequest rqstBlk(adjRange, ver, txn, compType);
138 check(rqstBlk);
139
140 if (rqstBlk.RequestStatus() == fileRequest::BRM_LOOKUP_ERROR)
141 throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_BRM_LOOKUP), logging::ERR_BRM_LOOKUP);
142 else if (rqstBlk.RequestStatus() == fileRequest::FS_EINVAL)
143 throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_O_DIRECT),
144 logging::ERR_O_DIRECT);
145 else if (rqstBlk.RequestStatus() == fileRequest::FS_ENOENT)
146 throw logging::IDBExcept(
147 logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_ENOENT),
148 logging::ERR_ENOENT);
149 else if (rqstBlk.RequestStatus() != fileRequest::SUCCESSFUL)
150 throw runtime_error(rqstBlk.RequestStatusStr());
151
152 lbidCount = rqstBlk.BlocksRead();
153
154 if (fTrace)
155 {
156 uint16_t dbroot;
157 uint32_t partNum;
158 uint16_t segNum;
159 uint32_t fbo;
160 BRM::OID_t oid;
161 fdbrm.lookupLocal(maxLbid, ver.currentScn, false, oid, dbroot, partNum, segNum, fbo);
162 fLogFile
163 << oid << " "
164 << maxLbid << " "
165 << fbo << " "
166 << rangeLen << " "
167 << adjSz << " "
168 << rqstBlk.BlocksRead() << " "
169 << rqstBlk.BlocksLoaded() << " "
170 << right << fixed << ((double)(start_tm.tv_sec + (1.e-9 * start_tm.tv_nsec)))
171 << endl;
172 }
173
174 return rqstBlk.BlocksLoaded();
175 } // check
176
check(fileRequest & rqstBlk)177 int BlockRequestProcessor::check(fileRequest& rqstBlk)
178 {
179 rqstBlk.frMutex().lock();
180 rqstBlk.SetPredicate(fileRequest::SENDING);
181 sendRequest(rqstBlk); // start file read request
182
183 while (rqstBlk.frPredicate() < fileRequest::COMPLETE)
184 rqstBlk.frCond().wait(rqstBlk.frMutex());
185
186 rqstBlk.frMutex().unlock();
187
188 return 0;
189 }
190
191 // For future use. Not currently used.
check(BRM::LBID_t lbid,const BRM::QueryContext & ver,BRM::VER_t txn,bool flg,int compType,bool & wasBlockInCache)192 int BlockRequestProcessor::check(BRM::LBID_t lbid, const BRM::QueryContext& ver, BRM::VER_t txn, bool flg, int compType, bool& wasBlockInCache)
193 {
194 if (fbMgr.exists(lbid, ver.currentScn) == true)
195 {
196 wasBlockInCache = true;
197 return 0;
198 }
199 else
200 {
201 wasBlockInCache = false;
202 fileRequest rqstBlk(lbid, ver, flg, txn, compType);
203 int ret = check(rqstBlk);
204
205 if (rqstBlk.RequestStatus() != fileRequest::SUCCESSFUL)
206 {
207 throw runtime_error(rqstBlk.RequestStatusStr());
208 }
209
210 return ret;
211 }
212 }
213
getBlock(const BRM::LBID_t & lbid,const BRM::QueryContext & ver,BRM::VER_t txn,int compType,void * bufferPtr,bool vbFlg,bool & wasCached,bool * versioned,bool insertIntoCache,bool readFromCache)214 int BlockRequestProcessor::getBlock(const BRM::LBID_t& lbid, const BRM::QueryContext& ver, BRM::VER_t txn,
215 int compType, void* bufferPtr, bool vbFlg, bool& wasCached, bool* versioned, bool insertIntoCache,
216 bool readFromCache)
217 {
218 if (readFromCache)
219 {
220 HashObject_t hashObj(lbid, ver.currentScn, 0);
221 wasCached = fbMgr.find(hashObj, bufferPtr);
222
223 if (wasCached)
224 return 1;
225 }
226
227 wasCached = false;
228 fileRequest rqstBlk(lbid, ver, vbFlg, txn, compType, (uint8_t*) bufferPtr, insertIntoCache);
229 check(rqstBlk);
230
231 if (rqstBlk.RequestStatus() == fileRequest::BRM_LOOKUP_ERROR)
232 {
233 ostringstream os;
234 os << "BRP::getBlock(): got a BRM lookup error. LBID=" << lbid << " ver=" << ver << " txn="
235 << txn << " vbFlg=" << (int) vbFlg;
236 primitiveprocessor::Logger logger;
237 logger.logMessage(os.str(), false);
238 throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_BRM_LOOKUP), logging::ERR_BRM_LOOKUP);
239 }
240 else if (rqstBlk.RequestStatus() == fileRequest::FS_EINVAL)
241 {
242 throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_O_DIRECT),
243 logging::ERR_O_DIRECT);
244 }
245 else if (rqstBlk.RequestStatus() == fileRequest::FS_ENOENT)
246 {
247 throw logging::IDBExcept(
248 logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_ENOENT),
249 logging::ERR_ENOENT);
250 }
251 else if (rqstBlk.RequestStatus() != fileRequest::SUCCESSFUL)
252 {
253 throw runtime_error(rqstBlk.RequestStatusStr());
254 }
255
256 if (versioned)
257 *versioned = rqstBlk.versioned();
258
259 return 1;
260 }
261
getCachedBlocks(const BRM::LBID_t * lbids,const BRM::VER_t * vers,uint8_t ** ptrs,bool * wasCached,uint32_t count)262 int BlockRequestProcessor::getCachedBlocks(const BRM::LBID_t* lbids, const BRM::VER_t* vers,
263 uint8_t** ptrs, bool* wasCached, uint32_t count)
264 {
265 return fbMgr.bulkFind(lbids, vers, ptrs, wasCached, count);
266 }
267
268
269 } // namespace dbbc
270