1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /***************************************************************************
19 *
20 * $Id: blockrequestprocessor.cpp 725 2008-09-26 16:26:47Z jrodriguez $
21 *
22 * jrodriguez@calpont.com *
23 * *
24 ***************************************************************************/
25
26 #include "blockrequestprocessor.h"
27 #include "rwlock_local.h"
28 #include "dbrm.h"
29 #include <sys/time.h>
30 #include <pthread.h>
31 #include <sstream>
32 #include <string>
33 #include <list>
34 #include <boost/date_time/posix_time/posix_time.hpp>
35 using namespace std;
36
37 namespace dbbc
38 {
39
BlockRequestProcessor(uint32_t numBlcks,int thrCount,int blocksPerRead,uint32_t deleteBlocks,uint32_t blckSz)40 BlockRequestProcessor::BlockRequestProcessor(uint32_t numBlcks,
41 int thrCount,
42 int blocksPerRead,
43 uint32_t deleteBlocks,
44 uint32_t blckSz) :
45 fbMgr(numBlcks, blckSz, deleteBlocks),
46 fIOMgr(fbMgr, fBRPRequestQueue, thrCount, blocksPerRead)
47 {
48 }
49
50
~BlockRequestProcessor()51 BlockRequestProcessor::~BlockRequestProcessor()
52 {
53 }
54
stop()55 void BlockRequestProcessor::stop()
56 {
57 fBRPRequestQueue.stop();
58 fIOMgr.stop();
59 }
60
61
check(const BRM::InlineLBIDRange & range,const BRM::VER_t ver,uint32_t & lbidCount)62 int BlockRequestProcessor::check(const BRM::InlineLBIDRange& range, const BRM::VER_t ver, uint32_t& lbidCount)
63 {
64 uint64_t maxLbid = range.start; // highest existent lbid
65 uint64_t rangeLen = range.size;
66 uint64_t idx;
67 uint64_t adjSz;
68 struct timespec start_tm;
69
70 if (fTrace) clock_gettime(CLOCK_MONOTONIC, &start_tm);
71
72 for (idx = 0; fbMgr.exists(maxLbid, ver) == true && idx < rangeLen; maxLbid++, idx++);
73
74 if (idx == rangeLen) // range is already loaded
75 {
76 uint32_t fbo;
77 BRM::OID_t oid;
78 fdbrm.lookup(maxLbid, ver, false, oid, fbo);
79 fLogFile
80 << oid << " "
81 << maxLbid << " "
82 << fbo << " "
83 << rangeLen << " "
84 << 0 << " "
85 << 0 << " "
86 << 0 << " "
87 << right << fixed << ((double)(start_tm.tv_sec + (1.e-9 * start_tm.tv_nsec)))
88 << endl;
89 return 0;
90 }
91
92 adjSz = rangeLen - idx;
93 BRM::InlineLBIDRange adjRange;
94 adjRange.start = maxLbid;
95 adjRange.size = adjSz;
96 fileRequest rqstBlk(adjRange, ver);
97 check(rqstBlk);
98 lbidCount = rqstBlk.BlocksRead();
99
100 if (fTrace)
101 {
102 uint32_t fbo;
103 BRM::OID_t oid;
104 fdbrm.lookup(maxLbid, ver, false, oid, fbo);
105 fLogFile
106 << oid << " "
107 << maxLbid << " "
108 << fbo << " "
109 << rangeLen << " "
110 << adjSz << " "
111 << rqstBlk.BlocksRead() << " "
112 << rqstBlk.BlocksLoaded() << " "
113 << right << fixed << ((double)(start_tm.tv_sec + (1.e-9 * start_tm.tv_nsec)))
114 << endl;
115 }
116
117 return rqstBlk.BlocksLoaded();
118 } // check
119
120
check(fileRequest & rqstBlk)121 int BlockRequestProcessor::check(fileRequest& rqstBlk)
122 {
123 pthread_mutex_lock(&rqstBlk.frMutex());
124 rqstBlk.SetPredicate(fileRequest::SENDING);
125 sendRequest(rqstBlk); // start file read request
126
127 while (rqstBlk.frPredicate() < fileRequest::COMPLETE)
128 pthread_cond_wait(&rqstBlk.frCond(), &rqstBlk.frMutex());
129
130 pthread_mutex_unlock(&rqstBlk.frMutex());
131
132 return 0;
133 }
134
135
check(BRM::LBID_t lbid,BRM::VER_t ver,bool flg,bool & wasBlockInCache)136 int BlockRequestProcessor::check(BRM::LBID_t lbid, BRM::VER_t ver, bool flg, bool& wasBlockInCache)
137 {
138 if (fbMgr.exists(lbid, ver) == true)
139 {
140 wasBlockInCache = true;
141 return 0;
142 }
143 else
144 {
145 wasBlockInCache = false;
146 fileRequest rqstBlk(lbid, ver, flg);
147 int ret = check(rqstBlk);
148 return ret;
149 }
150 }
151
152
sendRequest(fileRequest & blk)153 int BlockRequestProcessor::sendRequest(fileRequest& blk)
154 {
155 int ret = fBRPRequestQueue.push(blk);
156 return ret;
157 }
158
159
read(const BRM::InlineLBIDRange & range,FileBufferList_t & readList,const BRM::VER_t ver)160 const int BlockRequestProcessor::read(const BRM::InlineLBIDRange& range, FileBufferList_t& readList, const BRM::VER_t ver)
161 {
162 int blksLoaded = 0;
163 HashObject_t fb = {0, 0, 0};
164
165 for (int idx = 0; (uint64_t)idx < range.size; idx++)
166 {
167 fb.lbid = range.start + idx;
168 fb.ver = ver;
169 fb.poolIdx = 0;
170 FileBuffer fbRet(-1, -1);
171 bool ret = false; //fbMgr.find(fb, fbRet);
172
173 if (ret)
174 {
175 blksLoaded++;
176 readList.push_back(fbRet);
177 }
178 }
179
180 return blksLoaded;
181 }
182
183
getBlockPtr(const BRM::LBID_t lbid,const BRM::VER_t ver)184 FileBuffer* BlockRequestProcessor::getBlockPtr(const BRM::LBID_t lbid, const BRM::VER_t ver )
185 {
186
187 HashObject_t hashObj = {lbid, ver, 0};
188 FileBuffer* fb = fbMgr.findPtr(hashObj);
189 return fb;
190 }
191
192
read(const BRM::LBID_t & lbid,const BRM::VER_t & ver,FileBuffer & fb)193 const int BlockRequestProcessor::read(const BRM::LBID_t& lbid, const BRM::VER_t& ver, FileBuffer& fb)
194 {
195
196 HashObject_t hashObj = {lbid, ver, 0};
197 bool ret = fbMgr.find(hashObj, fb);
198
199 if (ret == true)
200 return 1;
201 else
202 return 0;
203 }
204
read(const BRM::LBID_t & lbid,const BRM::VER_t & ver,void * bufferPtr)205 const int BlockRequestProcessor::read(const BRM::LBID_t& lbid, const BRM::VER_t& ver, void* bufferPtr)
206 {
207 HashObject_t hashObj = {lbid, ver, 0};
208 bool ret = fbMgr.find(hashObj, bufferPtr);
209
210 if (ret == true)
211 return 1;
212 else
213 return 0;
214 }
215
getBlock(const BRM::LBID_t & lbid,const BRM::VER_t & ver,void * bufferPtr,bool flg,bool & wasCached)216 const int BlockRequestProcessor::getBlock(const BRM::LBID_t& lbid, const BRM::VER_t& ver, void* bufferPtr, bool flg, bool& wasCached)
217 {
218 HashObject_t hashObj = {lbid, ver, 0};
219 wasCached = fbMgr.find(hashObj, bufferPtr);
220
221 if (wasCached)
222 return 1;
223
224 wasCached = false;
225 fileRequest rqstBlk(lbid, ver, flg, (uint8_t*) bufferPtr);
226 check(rqstBlk);
227 return 1;
228 }
229
exists(BRM::LBID_t lbid,BRM::VER_t ver)230 bool BlockRequestProcessor::exists(BRM::LBID_t lbid, BRM::VER_t ver)
231 {
232 HashObject_t ho = {lbid, ver, 0};
233
234 return fbMgr.exists(ho);
235 }
236
flushCache()237 void BlockRequestProcessor::flushCache()
238 {
239 fbMgr.flushCache();
240 }
241 /**
242 const uint32_t BlockRequestProcessor::resize(const uint32_t s)
243 {
244 int rc = fbMgr.resize(s);
245 return rc;
246 }
247 **/
formatLRUList(ostream & os) const248 ostream& BlockRequestProcessor::formatLRUList(ostream& os) const
249 {
250 return fbMgr.formatLRUList(os);
251 }
252
253 } // namespace dbbc
254