1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 // $Id: limitedorderby.cpp 9581 2013-05-31 13:46:14Z pleblanc $
19
20
21 #include <iostream>
22 //#define NDEBUG
23 #include <cassert>
24 #include <string>
25 using namespace std;
26
27 #include <boost/shared_array.hpp>
28 using namespace boost;
29
30 #include "errorids.h"
31 #include "exceptclasses.h"
32 using namespace logging;
33
34 #include "rowgroup.h"
35 using namespace rowgroup;
36
37 #include "jlf_common.h"
38 #include "limitedorderby.h"
39
40 using namespace ordering;
41
42 namespace joblist
43 {
44
45 const uint64_t LimitedOrderBy::fMaxUncommited = 102400; // 100KiB - make it configurable?
46
47 // LimitedOrderBy class implementation
LimitedOrderBy()48 LimitedOrderBy::LimitedOrderBy() : fStart(0), fCount(-1), fUncommitedMemory(0)
49 {
50 fRule.fIdbCompare = this;
51 }
52
53
~LimitedOrderBy()54 LimitedOrderBy::~LimitedOrderBy()
55 {
56 }
57
58
initialize(const RowGroup & rg,const JobInfo & jobInfo,bool invertRules,bool isMultiThreaded)59 void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo, bool invertRules, bool isMultiThreaded)
60 {
61 fRm = jobInfo.rm;
62 fSessionMemLimit = jobInfo.umMemLimit;
63 fErrorCode = ERR_LIMIT_TOO_BIG;
64
65 // locate column position in the rowgroup
66 map<uint32_t, uint32_t> keyToIndexMap;
67
68 for (uint64_t i = 0; i < rg.getKeys().size(); ++i)
69 {
70 if (keyToIndexMap.find(rg.getKeys()[i]) == keyToIndexMap.end())
71 keyToIndexMap.insert(make_pair(rg.getKeys()[i], i));
72 }
73
74 vector<pair<uint32_t, bool> >::const_iterator i = jobInfo.orderByColVec.begin();
75
76 for ( ; i != jobInfo.orderByColVec.end(); i++)
77 {
78 map<uint32_t, uint32_t>::iterator j = keyToIndexMap.find(i->first);
79 idbassert(j != keyToIndexMap.end());
80
81 fOrderByCond.push_back(IdbSortSpec(j->second, i->second ^ invertRules));
82 }
83
84 // limit row count info
85 if (isMultiThreaded)
86 {
87 // CS can't apply offset at the first stage
88 // otherwise it looses records.
89 fStart = 0;
90 fCount = jobInfo.limitStart+jobInfo.limitCount;
91 }
92 else
93 {
94 fStart = jobInfo.limitStart;
95 fCount = jobInfo.limitCount;
96 }
97
98 IdbOrderBy::initialize(rg);
99 }
100
101 // This must return a proper number of key columns and
102 // not just a column count.
getKeyLength() const103 uint64_t LimitedOrderBy::getKeyLength() const
104 {
105 return fRow0.getColumnCount();
106 }
107
108
processRow(const rowgroup::Row & row)109 void LimitedOrderBy::processRow(const rowgroup::Row& row)
110 {
111 // check if this is a distinct row
112 if (fDistinct && fDistinctMap->find(row.getPointer()) != fDistinctMap->end())
113 return;
114
115 // @bug5312, limit count is 0, do nothing.
116 if (fCount == 0)
117 return;
118
119 // if the row count is less than the limit
120 if (fOrderByQueue.size() < fStart + fCount)
121 {
122 copyRow(row, &fRow0);
123 OrderByRow newRow(fRow0, fRule);
124 fOrderByQueue.push(newRow);
125
126 uint64_t memSizeInc = sizeof(newRow);
127 fUncommitedMemory += memSizeInc;
128 if (fUncommitedMemory >= fMaxUncommited)
129 {
130 fMemSize += fUncommitedMemory;
131 if (!fRm->getMemory(fUncommitedMemory, fSessionMemLimit))
132 {
133 cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @"
134 << __FILE__ << ":" << __LINE__;
135 throw IDBExcept(fErrorCode);
136 }
137 fUncommitedMemory = 0;
138 }
139
140 // add to the distinct map
141 if (fDistinct)
142 fDistinctMap->insert(fRow0.getPointer());
143
144 fRowGroup.incRowCount();
145 fRow0.nextRow();
146
147 if (fRowGroup.getRowCount() >= fRowsPerRG)
148 {
149 fDataQueue.push(fData);
150 uint64_t newSize = fRowGroup.getSizeWithStrings() - fRowGroup.getHeaderSize();
151 fMemSize += newSize;
152
153 if (!fRm->getMemory(newSize, fSessionMemLimit))
154 {
155 cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
156 << " @" << __FILE__ << ":" << __LINE__;
157 throw IDBExcept(fErrorCode);
158 }
159
160 fData.reinit(fRowGroup, fRowsPerRG);
161 fRowGroup.setData(&fData);
162 fRowGroup.resetRowGroup(0);
163 fRowGroup.getRow(0, &fRow0);
164 }
165 }
166
167 else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), fOrderByQueue.top().fData))
168 {
169 OrderByRow swapRow = fOrderByQueue.top();
170 row1.setData(swapRow.fData);
171 copyRow(row, &row1);
172
173 if (fDistinct)
174 {
175 fDistinctMap->erase(fOrderByQueue.top().fData);
176 fDistinctMap->insert(row1.getPointer());
177 }
178
179 fOrderByQueue.pop();
180 fOrderByQueue.push(swapRow);
181 }
182 }
183
184 /*
185 * The f() copies top element from an ordered queue into a row group. It
186 * does this backwards to syncronise sorting orientation with the server.
187 * The top row from the queue goes last into the returned set.
188 */
finalize()189 void LimitedOrderBy::finalize()
190 {
191 if (fUncommitedMemory > 0)
192 {
193 fMemSize += fUncommitedMemory;
194 if (!fRm->getMemory(fUncommitedMemory, fSessionMemLimit))
195 {
196 cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @"
197 << __FILE__ << ":" << __LINE__;
198 throw IDBExcept(fErrorCode);
199 }
200 fUncommitedMemory = 0;
201 }
202
203 queue<RGData> tempQueue;
204 if (fRowGroup.getRowCount() > 0)
205 fDataQueue.push(fData);
206
207 if (fOrderByQueue.size() > 0)
208 {
209 // *DRRTUY Very memory intensive. CS needs to account active
210 // memory only and release memory if needed.
211 uint64_t memSizeInc = fRowGroup.getSizeWithStrings() - fRowGroup.getHeaderSize();
212 fMemSize += memSizeInc;
213
214 if (!fRm->getMemory(memSizeInc, fSessionMemLimit))
215 {
216 cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
217 << " @" << __FILE__ << ":" << __LINE__;
218 throw IDBExcept(fErrorCode);
219 }
220
221 uint64_t offset = 0;
222 uint64_t i = 0;
223 // Reduce queue size by an offset value if it applicable.
224 uint64_t queueSizeWoOffset = fOrderByQueue.size() > fStart ?
225 fOrderByQueue.size() - fStart : 0;
226 list<RGData> tempRGDataList;
227
228 if ( fCount <= queueSizeWoOffset )
229 {
230 offset = fCount % fRowsPerRG;
231 if(!offset && fCount > 0)
232 offset = fRowsPerRG;
233 }
234 else
235 {
236 offset = queueSizeWoOffset % fRowsPerRG;
237 if(!offset && queueSizeWoOffset > 0)
238 offset = fRowsPerRG;
239 }
240
241 list<RGData>::iterator tempListIter = tempRGDataList.begin();
242
243 i = 0;
244 uint32_t rSize = fRow0.getSize();
245 uint64_t preLastRowNumb = fRowsPerRG - 1;
246 fData.reinit(fRowGroup, fRowsPerRG);
247 fRowGroup.setData(&fData);
248 fRowGroup.resetRowGroup(0);
249 // *DRRTUY This approach won't work with
250 // OFSET > fRowsPerRG
251 offset = offset != 0 ? offset - 1 : offset;
252 fRowGroup.getRow(offset, &fRow0);
253
254 while ((fOrderByQueue.size() > fStart) && (i++ < fCount))
255 {
256 const OrderByRow& topRow = fOrderByQueue.top();
257 row1.setData(topRow.fData);
258 copyRow(row1, &fRow0);
259 fRowGroup.incRowCount();
260 offset--;
261 fRow0.prevRow(rSize);
262 fOrderByQueue.pop();
263
264 // if RG has fRowsPerRG rows
265 if(offset == (uint64_t)-1)
266 {
267 tempRGDataList.push_front(fData);
268 fMemSize += memSizeInc;
269
270 if (!fRm->getMemory(memSizeInc, fSessionMemLimit))
271 {
272 cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
273 << " @" << __FILE__ << ":" << __LINE__;
274 throw IDBExcept(fErrorCode);
275 }
276
277 fData.reinit(fRowGroup, fRowsPerRG);
278 fRowGroup.setData(&fData);
279 fRowGroup.resetRowGroup(0); // ?
280 fRowGroup.getRow(preLastRowNumb, &fRow0);
281 offset = preLastRowNumb;
282 }
283 }
284 // Push the last/only group into the queue.
285 if (fRowGroup.getRowCount() > 0)
286 tempRGDataList.push_front(fData);
287
288 for(tempListIter = tempRGDataList.begin(); tempListIter != tempRGDataList.end(); tempListIter++)
289 tempQueue.push(*tempListIter);
290
291 fDataQueue = tempQueue;
292 }
293 }
294
295
toString() const296 const string LimitedOrderBy::toString() const
297 {
298 ostringstream oss;
299 oss << "OrderBy cols: ";
300 vector<IdbSortSpec>::const_iterator i = fOrderByCond.begin();
301
302 for (; i != fOrderByCond.end(); i++)
303 oss << "(" << i->fIndex << ","
304 << ((i->fAsc) ? "Asc" : "Desc") << ","
305 << ((i->fNf) ? "null first" : "null last") << ") ";
306
307 oss << " start-" << fStart << " count-" << fCount;
308
309 if (fDistinct)
310 oss << " distinct";
311
312 oss << endl;
313
314 return oss.str();
315 }
316
317
318 }
319 // vim:ts=4 sw=4:
320
321