1 /* Copyright (C) 2014 InfiniDB, Inc. 2 Copyright (C) 2019 MariaDB Corporation. 3 4 This program is free software; you can redistribute it and/or 5 modify it under the terms of the GNU General Public License 6 as published by the Free Software Foundation; version 2 of 7 the License. 8 9 This program is distributed in the hope that it will be useful, 10 but WITHOUT ANY WARRANTY; without even the implied warranty of 11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 GNU General Public License for more details. 13 14 You should have received a copy of the GNU General Public License 15 along with this program; if not, write to the Free Software 16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 17 MA 02110-1301, USA. */ 18 19 // $Id: tupleaggregatestep.h 9732 2013-08-02 15:56:15Z pleblanc $ 20 21 22 #ifndef JOBLIST_TUPLEAGGREGATESTEP_H 23 #define JOBLIST_TUPLEAGGREGATESTEP_H 24 25 #include "jobstep.h" 26 #include "rowaggregation.h" 27 #include "threadnaming.h" 28 29 #include <boost/thread.hpp> 30 31 32 namespace joblist 33 { 34 35 // forward reference 36 struct JobInfo; 37 38 /** @brief class TupleAggregateStep 39 * 40 */ 41 class TupleAggregateStep : public JobStep, public TupleDeliveryStep 42 { 43 public: 44 /** @brief TupleAggregateStep constructor 45 */ 46 TupleAggregateStep( 47 const rowgroup::SP_ROWAGG_UM_t&, 48 const rowgroup::RowGroup&, 49 const rowgroup::RowGroup&, 50 const JobInfo&); 51 52 /** @brief TupleAggregateStep destructor 53 */ 54 ~TupleAggregateStep(); 55 56 /** @brief virtual void Run method 57 */ 58 void run(); 59 void join(); 60 61 const std::string toString() const; 62 63 void setOutputRowGroup(const rowgroup::RowGroup&); 64 const rowgroup::RowGroup& getOutputRowGroup() const; 65 const rowgroup::RowGroup& getDeliveredRowGroup() const; 66 void deliverStringTableRowGroup(bool b); 67 bool deliverStringTableRowGroup() const; 68 uint32_t nextBand(messageqcpp::ByteStream& bs); 69 uint32_t nextBand_singleThread(messageqcpp::ByteStream& bs); 70 bool setPmHJAggregation(JobStep* step); 71 void savePmHJData(rowgroup::SP_ROWAGG_t&, rowgroup::SP_ROWAGG_t&, rowgroup::RowGroup&); 72 umOnly()73 bool umOnly() const 74 { 75 return fUmOnly; 76 } umOnly(bool b)77 void umOnly(bool b) 78 { 79 fUmOnly = b; 80 } 81 82 void configDeliveredRowGroup(const JobInfo&); 83 //void setEidMap(std::map<int, int>& m) { fIndexEidMap = m; } 84 85 static SJSTEP prepAggregate(SJSTEP&, JobInfo&); 86 87 // for multi-thread variables 88 void initializeMultiThread(); 89 90 private: 91 static void prep1PhaseDistinctAggregate( 92 JobInfo&, std::vector<rowgroup::RowGroup>&, std::vector<rowgroup::SP_ROWAGG_t>&); 93 static void prep1PhaseAggregate( 94 JobInfo&, std::vector<rowgroup::RowGroup>&, std::vector<rowgroup::SP_ROWAGG_t>&); 95 static void prep2PhasesAggregate( 96 JobInfo&, std::vector<rowgroup::RowGroup>&, std::vector<rowgroup::SP_ROWAGG_t>&); 97 static void prep2PhasesDistinctAggregate( 98 JobInfo&, std::vector<rowgroup::RowGroup>&, std::vector<rowgroup::SP_ROWAGG_t>&); 99 100 void prepExpressionOnAggregate(rowgroup::SP_ROWAGG_UM_t&, JobInfo&); 101 void addConstangAggregate(std::vector<rowgroup::ConstantAggData>&); 102 103 void doAggregate(); 104 void doAggregate_singleThread(); 105 uint64_t doThreadedAggregate(messageqcpp::ByteStream& bs, RowGroupDL* dlp); 106 void aggregateRowGroups(); 107 void threadedAggregateRowGroups(uint32_t threadID); 108 void threadedAggregateFinalize(uint32_t threadID); 109 void doThreadedSecondPhaseAggregate(uint32_t threadID); 110 bool nextDeliveredRowGroup(); 111 void pruneAuxColumns(); 112 void formatMiniStats(); 113 void printCalTrace(); 114 115 boost::shared_ptr<execplan::CalpontSystemCatalog>fCatalog; 116 uint64_t fRowsReturned; 117 bool fDoneAggregate; 118 bool fEndOfResult; 119 120 rowgroup::SP_ROWAGG_UM_t fAggregator; 121 rowgroup::RowGroup fRowGroupOut; 122 rowgroup::RowGroup fRowGroupDelivered; 123 rowgroup::RGData fRowGroupData; 124 125 // for setting aggregate column eid in delivered rowgroup 126 //std::map<int, int> fIndexEidMap; 127 128 // data from RowGroupDL 129 rowgroup::RowGroup fRowGroupIn; 130 131 // for PM HashJoin 132 // PM hashjoin is selected at runtime, prepare for it anyway. 133 rowgroup::SP_ROWAGG_UM_t fAggregatorUM; 134 rowgroup::SP_ROWAGG_PM_t fAggregatorPM; 135 rowgroup::RowGroup fRowGroupPMHJ; 136 137 // for run thread (first added for union) 138 class Aggregator 139 { 140 public: Aggregator(TupleAggregateStep * step)141 Aggregator(TupleAggregateStep* step) : fStep(step) { } operator()142 void operator()() 143 { 144 utils::setThreadName("TASAggr"); 145 fStep->doAggregate(); 146 } 147 148 TupleAggregateStep* fStep; 149 }; 150 151 class ThreadedAggregator 152 { 153 public: ThreadedAggregator(TupleAggregateStep * step,uint32_t threadID)154 ThreadedAggregator(TupleAggregateStep* step, uint32_t threadID) : 155 fStep(step), 156 fThreadID(threadID) 157 {} operator()158 void operator()() 159 { 160 std::string t{"TASThrAggr"}; 161 t.append(std::to_string(fThreadID)); 162 utils::setThreadName(t.c_str()); 163 fStep->threadedAggregateRowGroups(fThreadID); 164 } 165 166 TupleAggregateStep* fStep; 167 uint32_t fThreadID; 168 }; 169 170 class ThreadedAggregateFinalizer 171 { 172 public: ThreadedAggregateFinalizer(TupleAggregateStep * step,uint32_t threadID)173 ThreadedAggregateFinalizer(TupleAggregateStep* step, uint32_t threadID) : 174 fStep(step), 175 fThreadID(threadID) 176 {} 177 operator()178 void operator()() 179 { 180 std::string t{"TASThrFin"}; 181 t.append(std::to_string(fThreadID)); 182 utils::setThreadName(t.c_str()); 183 fStep->threadedAggregateFinalize(fThreadID); 184 } 185 186 TupleAggregateStep* fStep; 187 uint32_t fThreadID; 188 }; 189 190 class ThreadedSecondPhaseAggregator 191 { 192 public: ThreadedSecondPhaseAggregator(TupleAggregateStep * step,uint32_t threadID,uint32_t bucketsPerThread)193 ThreadedSecondPhaseAggregator(TupleAggregateStep* step, uint32_t threadID, uint32_t bucketsPerThread) : 194 fStep(step), 195 fThreadID(threadID), 196 bucketCount(bucketsPerThread) 197 { 198 } operator()199 void operator()() 200 { 201 utils::setThreadName("TASThr2ndPAggr"); 202 for (uint32_t i = 0; i < bucketCount; i++) 203 fStep->doThreadedSecondPhaseAggregate(fThreadID + i); 204 } 205 TupleAggregateStep* fStep; 206 uint32_t fThreadID; 207 uint32_t bucketCount; 208 }; 209 210 uint64_t fRunner; // thread pool handle 211 bool fUmOnly; 212 ResourceManager* fRm; 213 214 // multi-threaded 215 uint32_t fNumOfThreads; 216 uint32_t fNumOfBuckets; 217 uint32_t fNumOfRowGroups; 218 uint32_t fBucketNum; 219 220 boost::mutex fMutex; 221 std::vector<boost::mutex*> fAgg_mutex; 222 std::vector<rowgroup::RGData > fRowGroupDatas; 223 std::vector<rowgroup::SP_ROWAGG_UM_t> fAggregators; 224 std::vector<rowgroup::RowGroup> fRowGroupIns; 225 std::vector<rowgroup::RowGroup> fRowGroupOuts; 226 std::vector<std::vector<rowgroup::RGData> > fRowGroupsDeliveredData; 227 bool fIsMultiThread; 228 int fInputIter; // iterator 229 boost::scoped_array<uint64_t> fMemUsage; 230 231 boost::shared_ptr<int64_t> fSessionMemLimit; 232 }; 233 234 235 } // namespace 236 237 #endif // JOBLIST_TUPLEAGGREGATESTEP_H 238 239 // vim:ts=4 sw=4: 240