1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2019 MariaDB Corporation.
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 //  $Id: tupleaggregatestep.h 9732 2013-08-02 15:56:15Z pleblanc $
20 
21 
22 #ifndef JOBLIST_TUPLEAGGREGATESTEP_H
23 #define JOBLIST_TUPLEAGGREGATESTEP_H
24 
25 #include "jobstep.h"
26 #include "rowaggregation.h"
27 #include "threadnaming.h"
28 
29 #include <boost/thread.hpp>
30 
31 
32 namespace joblist
33 {
34 
35 // forward reference
36 struct JobInfo;
37 
38 /** @brief class TupleAggregateStep
39  *
40  */
41 class TupleAggregateStep : public JobStep, public TupleDeliveryStep
42 {
43 public:
44     /** @brief TupleAggregateStep constructor
45      */
46     TupleAggregateStep(
47         const rowgroup::SP_ROWAGG_UM_t&,
48         const rowgroup::RowGroup&,
49         const rowgroup::RowGroup&,
50         const JobInfo&);
51 
52     /** @brief TupleAggregateStep destructor
53      */
54     ~TupleAggregateStep();
55 
56     /** @brief virtual void Run method
57      */
58     void run();
59     void join();
60 
61     const std::string toString() const;
62 
63     void setOutputRowGroup(const rowgroup::RowGroup&);
64     const rowgroup::RowGroup& getOutputRowGroup() const;
65     const rowgroup::RowGroup& getDeliveredRowGroup() const;
66     void  deliverStringTableRowGroup(bool b);
67     bool  deliverStringTableRowGroup() const;
68     uint32_t nextBand(messageqcpp::ByteStream& bs);
69     uint32_t nextBand_singleThread(messageqcpp::ByteStream& bs);
70     bool setPmHJAggregation(JobStep* step);
71     void savePmHJData(rowgroup::SP_ROWAGG_t&, rowgroup::SP_ROWAGG_t&, rowgroup::RowGroup&);
72 
umOnly()73     bool umOnly() const
74     {
75         return fUmOnly;
76     }
umOnly(bool b)77     void umOnly(bool b)
78     {
79         fUmOnly = b;
80     }
81 
82     void configDeliveredRowGroup(const JobInfo&);
83     //void setEidMap(std::map<int, int>& m) { fIndexEidMap = m; }
84 
85     static SJSTEP prepAggregate(SJSTEP&, JobInfo&);
86 
87     // for multi-thread variables
88     void initializeMultiThread();
89 
90 private:
91     static void prep1PhaseDistinctAggregate(
92         JobInfo&, std::vector<rowgroup::RowGroup>&, std::vector<rowgroup::SP_ROWAGG_t>&);
93     static void prep1PhaseAggregate(
94         JobInfo&, std::vector<rowgroup::RowGroup>&, std::vector<rowgroup::SP_ROWAGG_t>&);
95     static void prep2PhasesAggregate(
96         JobInfo&, std::vector<rowgroup::RowGroup>&, std::vector<rowgroup::SP_ROWAGG_t>&);
97     static void prep2PhasesDistinctAggregate(
98         JobInfo&, std::vector<rowgroup::RowGroup>&, std::vector<rowgroup::SP_ROWAGG_t>&);
99 
100     void prepExpressionOnAggregate(rowgroup::SP_ROWAGG_UM_t&, JobInfo&);
101     void addConstangAggregate(std::vector<rowgroup::ConstantAggData>&);
102 
103     void doAggregate();
104     void doAggregate_singleThread();
105     uint64_t doThreadedAggregate(messageqcpp::ByteStream& bs, RowGroupDL* dlp);
106     void aggregateRowGroups();
107     void threadedAggregateRowGroups(uint32_t threadID);
108     void threadedAggregateFinalize(uint32_t threadID);
109     void doThreadedSecondPhaseAggregate(uint32_t threadID);
110     bool nextDeliveredRowGroup();
111     void pruneAuxColumns();
112     void formatMiniStats();
113     void printCalTrace();
114 
115     boost::shared_ptr<execplan::CalpontSystemCatalog>fCatalog;
116     uint64_t fRowsReturned;
117     bool     fDoneAggregate;
118     bool     fEndOfResult;
119 
120     rowgroup::SP_ROWAGG_UM_t fAggregator;
121     rowgroup::RowGroup fRowGroupOut;
122     rowgroup::RowGroup fRowGroupDelivered;
123     rowgroup::RGData fRowGroupData;
124 
125     // for setting aggregate column eid in delivered rowgroup
126     //std::map<int, int> fIndexEidMap;
127 
128     // data from RowGroupDL
129     rowgroup::RowGroup fRowGroupIn;
130 
131     // for PM HashJoin
132     // PM hashjoin is selected at runtime, prepare for it anyway.
133     rowgroup::SP_ROWAGG_UM_t fAggregatorUM;
134     rowgroup::SP_ROWAGG_PM_t fAggregatorPM;
135     rowgroup::RowGroup fRowGroupPMHJ;
136 
137     // for run thread (first added for union)
138     class Aggregator
139     {
140     public:
Aggregator(TupleAggregateStep * step)141         Aggregator(TupleAggregateStep* step) : fStep(step) { }
operator()142         void operator()()
143         {
144             utils::setThreadName("TASAggr");
145             fStep->doAggregate();
146         }
147 
148         TupleAggregateStep* fStep;
149     };
150 
151     class ThreadedAggregator
152     {
153     public:
ThreadedAggregator(TupleAggregateStep * step,uint32_t threadID)154         ThreadedAggregator(TupleAggregateStep* step, uint32_t threadID) :
155             fStep(step),
156             fThreadID(threadID)
157         {}
operator()158         void operator()()
159         {
160             std::string t{"TASThrAggr"};
161             t.append(std::to_string(fThreadID));
162             utils::setThreadName(t.c_str());
163             fStep->threadedAggregateRowGroups(fThreadID);
164         }
165 
166         TupleAggregateStep* fStep;
167         uint32_t fThreadID;
168     };
169 
170     class ThreadedAggregateFinalizer
171     {
172     public:
ThreadedAggregateFinalizer(TupleAggregateStep * step,uint32_t threadID)173         ThreadedAggregateFinalizer(TupleAggregateStep* step, uint32_t threadID) :
174             fStep(step),
175             fThreadID(threadID)
176         {}
177 
operator()178         void operator()()
179         {
180             std::string t{"TASThrFin"};
181             t.append(std::to_string(fThreadID));
182             utils::setThreadName(t.c_str());
183             fStep->threadedAggregateFinalize(fThreadID);
184         }
185 
186         TupleAggregateStep* fStep;
187         uint32_t fThreadID;
188     };
189 
190     class ThreadedSecondPhaseAggregator
191     {
192     public:
ThreadedSecondPhaseAggregator(TupleAggregateStep * step,uint32_t threadID,uint32_t bucketsPerThread)193         ThreadedSecondPhaseAggregator(TupleAggregateStep* step, uint32_t threadID, uint32_t bucketsPerThread) :
194             fStep(step),
195             fThreadID(threadID),
196             bucketCount(bucketsPerThread)
197         {
198         }
operator()199         void operator()()
200         {
201             utils::setThreadName("TASThr2ndPAggr");
202             for (uint32_t i = 0; i < bucketCount; i++)
203                 fStep->doThreadedSecondPhaseAggregate(fThreadID + i);
204         }
205         TupleAggregateStep* fStep;
206         uint32_t fThreadID;
207         uint32_t bucketCount;
208     };
209 
210     uint64_t fRunner; // thread pool handle
211     bool fUmOnly;
212     ResourceManager* fRm;
213 
214     // multi-threaded
215     uint32_t fNumOfThreads;
216     uint32_t fNumOfBuckets;
217     uint32_t fNumOfRowGroups;
218     uint32_t fBucketNum;
219 
220     boost::mutex fMutex;
221     std::vector<boost::mutex*> fAgg_mutex;
222     std::vector<rowgroup::RGData > fRowGroupDatas;
223     std::vector<rowgroup::SP_ROWAGG_UM_t> fAggregators;
224     std::vector<rowgroup::RowGroup> fRowGroupIns;
225     std::vector<rowgroup::RowGroup> fRowGroupOuts;
226     std::vector<std::vector<rowgroup::RGData> > fRowGroupsDeliveredData;
227     bool fIsMultiThread;
228     int fInputIter; // iterator
229     boost::scoped_array<uint64_t> fMemUsage;
230 
231     boost::shared_ptr<int64_t> fSessionMemLimit;
232 };
233 
234 
235 } // namespace
236 
237 #endif  // JOBLIST_TUPLEAGGREGATESTEP_H
238 
239 // vim:ts=4 sw=4:
240