1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 //  $Id: limitedorderby.cpp 9581 2013-05-31 13:46:14Z pleblanc $
19 
20 
21 #include <iostream>
22 //#define NDEBUG
23 #include <cassert>
24 #include <string>
25 using namespace std;
26 
27 #include <boost/shared_array.hpp>
28 using namespace boost;
29 
30 #include "errorids.h"
31 #include "exceptclasses.h"
32 using namespace logging;
33 
34 #include "rowgroup.h"
35 using namespace rowgroup;
36 
37 #include "jlf_common.h"
38 #include "limitedorderby.h"
39 
40 using namespace ordering;
41 
42 namespace joblist
43 {
44 
45 const uint64_t LimitedOrderBy::fMaxUncommited = 102400; // 100KiB - make it configurable?
46 
47 // LimitedOrderBy class implementation
LimitedOrderBy()48 LimitedOrderBy::LimitedOrderBy() : fStart(0), fCount(-1), fUncommitedMemory(0)
49 {
50     fRule.fIdbCompare = this;
51 }
52 
53 
~LimitedOrderBy()54 LimitedOrderBy::~LimitedOrderBy()
55 {
56 }
57 
58 
initialize(const RowGroup & rg,const JobInfo & jobInfo,bool invertRules,bool isMultiThreaded)59 void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo, bool invertRules, bool isMultiThreaded)
60 {
61     fRm = jobInfo.rm;
62     fSessionMemLimit = jobInfo.umMemLimit;
63     fErrorCode = ERR_LIMIT_TOO_BIG;
64 
65     // locate column position in the rowgroup
66     map<uint32_t, uint32_t> keyToIndexMap;
67 
68     for (uint64_t i = 0; i < rg.getKeys().size(); ++i)
69     {
70         if (keyToIndexMap.find(rg.getKeys()[i]) == keyToIndexMap.end())
71             keyToIndexMap.insert(make_pair(rg.getKeys()[i], i));
72     }
73 
74     vector<pair<uint32_t, bool> >::const_iterator i = jobInfo.orderByColVec.begin();
75 
76     for ( ; i != jobInfo.orderByColVec.end(); i++)
77     {
78         map<uint32_t, uint32_t>::iterator j = keyToIndexMap.find(i->first);
79         idbassert(j != keyToIndexMap.end());
80 
81         fOrderByCond.push_back(IdbSortSpec(j->second, i->second ^ invertRules));
82     }
83 
84     // limit row count info
85     if (isMultiThreaded)
86     {
87         // CS can't apply offset at the first stage
88         // otherwise it looses records.
89         fStart = 0;
90         fCount = jobInfo.limitStart+jobInfo.limitCount;
91     }
92     else
93     {
94         fStart = jobInfo.limitStart;
95         fCount = jobInfo.limitCount;
96     }
97 
98     IdbOrderBy::initialize(rg);
99 }
100 
101 // This must return a proper number of key columns and
102 // not just a column count.
getKeyLength() const103 uint64_t LimitedOrderBy::getKeyLength() const
104 {
105     return fRow0.getColumnCount();
106 }
107 
108 
processRow(const rowgroup::Row & row)109 void LimitedOrderBy::processRow(const rowgroup::Row& row)
110 {
111     // check if this is a distinct row
112     if (fDistinct && fDistinctMap->find(row.getPointer()) != fDistinctMap->end())
113         return;
114 
115     // @bug5312, limit count is 0, do nothing.
116     if (fCount == 0)
117         return;
118 
119     // if the row count is less than the limit
120     if (fOrderByQueue.size() < fStart + fCount)
121     {
122         copyRow(row, &fRow0);
123         OrderByRow newRow(fRow0, fRule);
124         fOrderByQueue.push(newRow);
125 
126         uint64_t memSizeInc = sizeof(newRow);
127         fUncommitedMemory += memSizeInc;
128         if (fUncommitedMemory >= fMaxUncommited)
129         {
130             fMemSize += fUncommitedMemory;
131             if (!fRm->getMemory(fUncommitedMemory, fSessionMemLimit))
132             {
133                 cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @"
134                      << __FILE__ << ":" << __LINE__;
135                 throw IDBExcept(fErrorCode);
136             }
137             fUncommitedMemory = 0;
138         }
139 
140         // add to the distinct map
141         if (fDistinct)
142             fDistinctMap->insert(fRow0.getPointer());
143 
144         fRowGroup.incRowCount();
145         fRow0.nextRow();
146 
147         if (fRowGroup.getRowCount() >= fRowsPerRG)
148         {
149             fDataQueue.push(fData);
150             uint64_t newSize = fRowGroup.getSizeWithStrings() - fRowGroup.getHeaderSize();
151             fMemSize += newSize;
152 
153             if (!fRm->getMemory(newSize, fSessionMemLimit))
154             {
155                 cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
156                      << " @" << __FILE__ << ":" << __LINE__;
157                 throw IDBExcept(fErrorCode);
158             }
159 
160             fData.reinit(fRowGroup, fRowsPerRG);
161             fRowGroup.setData(&fData);
162             fRowGroup.resetRowGroup(0);
163             fRowGroup.getRow(0, &fRow0);
164         }
165     }
166 
167     else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), fOrderByQueue.top().fData))
168     {
169         OrderByRow swapRow = fOrderByQueue.top();
170         row1.setData(swapRow.fData);
171         copyRow(row, &row1);
172 
173         if (fDistinct)
174         {
175             fDistinctMap->erase(fOrderByQueue.top().fData);
176             fDistinctMap->insert(row1.getPointer());
177         }
178 
179         fOrderByQueue.pop();
180         fOrderByQueue.push(swapRow);
181     }
182 }
183 
/*
 * The f() copies top element from an ordered queue into a row group. It
 * does this backwards to synchronise sorting orientation with the server.
 * The top row from the queue goes last into the returned set.
 *
 * Steps:
 *  1. Report any still-uncommitted bookkeeping memory to the resource
 *     manager.
 *  2. Push the partially filled current row group data onto fDataQueue.
 *  3. If the ordering queue is not empty, pop rows while more than fStart
 *     rows remain queued and fewer than fCount rows were emitted, writing
 *     them into fresh row groups from the last slot towards slot 0, and
 *     replace fDataQueue with the resulting groups.
 *
 * Throws IDBExcept(fErrorCode) when the resource manager refuses a memory
 * request.
 */
void LimitedOrderBy::finalize()
{
    // Flush the batched per-row accounting accumulated by processRow().
    if (fUncommitedMemory > 0)
    {
        fMemSize += fUncommitedMemory;
        if (!fRm->getMemory(fUncommitedMemory, fSessionMemLimit))
        {
            cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @"
                 << __FILE__ << ":" << __LINE__;
            throw IDBExcept(fErrorCode);
        }
        fUncommitedMemory = 0;
    }

    // Filled from tempRGDataList at the end; assigned to fDataQueue only
    // inside the branch below.
    queue<RGData> tempQueue;
    if (fRowGroup.getRowCount() > 0)
        fDataQueue.push(fData);

    if (fOrderByQueue.size() > 0)
    {
        // *DRRTUY Very memory intensive. CS needs to account active
        // memory only and release memory if needed.
        uint64_t memSizeInc = fRowGroup.getSizeWithStrings() - fRowGroup.getHeaderSize();
        fMemSize += memSizeInc;

        if (!fRm->getMemory(memSizeInc, fSessionMemLimit))
        {
            cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
                 << " @" << __FILE__ << ":" << __LINE__;
            throw IDBExcept(fErrorCode);
        }

        uint64_t offset = 0;
        uint64_t i = 0;
        // Reduce queue size by an offset value if applicable.
        uint64_t queueSizeWoOffset = fOrderByQueue.size() > fStart ?
            fOrderByQueue.size() - fStart : 0;
        list<RGData> tempRGDataList;

        // offset = number of rows that go into the first group to be filled
        // (which ends up last in the output): the remainder of the output
        // row count modulo fRowsPerRG, or a full group when the remainder is
        // zero and there is at least one row to output.
        if ( fCount <= queueSizeWoOffset )
        {
            offset = fCount % fRowsPerRG;
            if(!offset && fCount > 0)
                offset = fRowsPerRG;
        }
        else
        {
            offset = queueSizeWoOffset % fRowsPerRG;
            if(!offset && queueSizeWoOffset > 0)
                offset = fRowsPerRG;
        }

        list<RGData>::iterator tempListIter = tempRGDataList.begin();

        i = 0;
        uint32_t rSize = fRow0.getSize();
        // Index of the last row slot in a full row group.
        uint64_t preLastRowNumb = fRowsPerRG - 1;
        fData.reinit(fRowGroup, fRowsPerRG);
        fRowGroup.setData(&fData);
        fRowGroup.resetRowGroup(0);
        // *DRRTUY This approach won't work with
        // OFFSET > fRowsPerRG
        // Turn the row count into the index of the slot to write first.
        offset = offset != 0 ? offset - 1 : offset;
        fRowGroup.getRow(offset, &fRow0);

        // Pop from the top of the queue while more than fStart rows remain
        // queued and fewer than fCount rows have been emitted.
        while ((fOrderByQueue.size() > fStart) && (i++ < fCount))
        {
            const OrderByRow& topRow = fOrderByQueue.top();
            row1.setData(topRow.fData);
            copyRow(row1, &fRow0);
            fRowGroup.incRowCount();
            // Unsigned decrement: wraps to (uint64_t)-1 right after slot 0
            // has been written, which is what the check below detects.
            offset--;
            fRow0.prevRow(rSize);
            fOrderByQueue.pop();

            // if RG has fRowsPerRG rows
            if(offset == (uint64_t)-1)
            {
                // push_front: groups are produced last-to-first, so
                // prepending puts them into output order.
                tempRGDataList.push_front(fData);
                fMemSize += memSizeInc;

                if (!fRm->getMemory(memSizeInc, fSessionMemLimit))
                {
                    cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode)
                         << " @" << __FILE__ << ":" << __LINE__;
                    throw IDBExcept(fErrorCode);
                }

                // Start a new group and continue writing from its last slot.
                fData.reinit(fRowGroup, fRowsPerRG);
                fRowGroup.setData(&fData);
                fRowGroup.resetRowGroup(0); // ?
                fRowGroup.getRow(preLastRowNumb, &fRow0);
                offset = preLastRowNumb;
            }
        }
        // Push the last/only group into the queue.
        if (fRowGroup.getRowCount() > 0)
            tempRGDataList.push_front(fData);

        for(tempListIter = tempRGDataList.begin(); tempListIter != tempRGDataList.end(); tempListIter++)
            tempQueue.push(*tempListIter);

        // Replaces whatever fDataQueue held before, including the push made
        // at the top of this function.
        fDataQueue = tempQueue;
    }
}
294 
295 
toString() const296 const string LimitedOrderBy::toString() const
297 {
298     ostringstream oss;
299     oss << "OrderBy   cols: ";
300     vector<IdbSortSpec>::const_iterator i = fOrderByCond.begin();
301 
302     for (; i != fOrderByCond.end(); i++)
303         oss << "(" << i->fIndex << ","
304             << ((i->fAsc) ? "Asc" : "Desc") << ","
305             << ((i->fNf) ? "null first" : "null last") << ") ";
306 
307     oss << " start-" << fStart << " count-" << fCount;
308 
309     if (fDistinct)
310         oss << " distinct";
311 
312     oss << endl;
313 
314     return oss.str();
315 }
316 
317 
318 }
319 // vim:ts=4 sw=4:
320 
321