1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 //  $Id: subquerystep.h 6370 2010-03-18 02:58:09Z xlou $
19 
20 
21 /** @file */
22 
23 #ifndef SUBQUERY_STEP_H
24 #define SUBQUERY_STEP_H
25 
26 #include <boost/scoped_ptr.hpp>
27 #include <boost/shared_ptr.hpp>
28 #include <boost/shared_array.hpp>
29 #include <boost/thread.hpp>
30 
31 #include "jobstep.h"
32 #include "joblist.h"
33 #include "funcexpwrapper.h"
34 #include "threadnaming.h"
35 
36 namespace joblist
37 {
38 struct JobInfo;
39 
40 class SubQueryStep : public JobStep
41 {
42 
43 public:
44     /** @brief SubQueryStep constructor
45      */
46     SubQueryStep(const JobInfo&);
47 
48     /** @brief SubQueryStep destructor
49      */
50     ~SubQueryStep();
51 
52     /** @brief virtual void run method
53      */
54     void run();
55 
56     /** @brief virtual void join method
57      */
58     void join();
59 
60     /** @brief virtual void abort method
61      */
62     void abort();
63 
64     /** @brief virtual get table OID
65      *  @returns OID
66      */
tableOid()67     execplan::CalpontSystemCatalog::OID tableOid() const
68     {
69         return fTableOid;
70     }
71 
72     /** @brief virtual set table OID
73      */
tableOid(const execplan::CalpontSystemCatalog::OID & id)74     void tableOid(const execplan::CalpontSystemCatalog::OID& id)
75     {
76         fTableOid = id;
77     }
78 
79     /** @brief virtual output info to a string
80      *  @returns string
81      */
82     const std::string toString() const;
83 
84     /** @brief virtual set the output rowgroup
85      */
setOutputRowGroup(const rowgroup::RowGroup & rg)86     virtual void setOutputRowGroup(const rowgroup::RowGroup& rg)
87     {
88         fOutputRowGroup = rg;
89     }
90 
91     /** @brief virtual get the output rowgroup
92      *  @returns RowGroup
93      */
getOutputRowGroup()94     virtual const rowgroup::RowGroup& getOutputRowGroup() const
95     {
96         return fOutputRowGroup;
97     }
98 
99     /** @brief virtual set the sub-query's joblist
100      */
subJoblist(const STJLP & sjl)101     virtual void subJoblist(const STJLP& sjl)
102     {
103         fSubJobList = sjl;
104     }
105 
106     /** @brief virtual get the sub-query's joblist
107      *  @returns boost::shared_ptr<TupleJobList>
108      */
subJoblist()109     virtual const STJLP& subJoblist() const
110     {
111         return fSubJobList;
112     }
113 
114 
115 protected:
116     uint64_t                                         fRowsReturned;
117 
118     execplan::CalpontSystemCatalog::OID              fTableOid;
119     std::vector<execplan::CalpontSystemCatalog::OID> fColumnOids;
120     rowgroup::RowGroup                               fOutputRowGroup;
121     STJLP                                            fSubJobList;
122 
123     boost::scoped_ptr<boost::thread>                 fRunner;
124 };
125 
126 
127 class SubAdapterStep : public JobStep, public TupleDeliveryStep
128 {
129 
130 public:
131     /** @brief SubAdapterStep constructor
132      */
133     SubAdapterStep(SJSTEP& s, const JobInfo&);
134 
135     /** @brief SubAdapterStep destructor
136      */
137     ~SubAdapterStep();
138 
139     /** @brief virtual void run method
140      */
141     void run();
142 
143     /** @brief virtual void join method
144      */
145     void join();
146 
147     /** @brief virtual void abort method
148      */
149     void abort();
150 
151     /** @brief virtual get table OID
152      *  @returns OID
153      */
tableOid()154     execplan::CalpontSystemCatalog::OID tableOid() const
155     {
156         return fTableOid;
157     }
158 
159     /** @brief virtual set table OID
160      */
tableOid(const execplan::CalpontSystemCatalog::OID & id)161     void tableOid(const execplan::CalpontSystemCatalog::OID& id)
162     {
163         fTableOid = id;
164     }
165 
166     /** @brief virtual output info to a string
167      *  @returns string
168      */
169     const std::string toString() const;
170 
171     /** @brief virtual set the output rowgroup
172      */
173     void setOutputRowGroup(const rowgroup::RowGroup& rg);
174 
175     /** @brief virtual get the output rowgroup
176      *  @returns RowGroup
177      */
getOutputRowGroup()178     const rowgroup::RowGroup& getOutputRowGroup() const
179     {
180         return fRowGroupOut;
181     }
182 
183     /** @brief TupleDeliveryStep's pure virtual methods nextBand
184      *  @returns row count
185      */
186     uint32_t nextBand(messageqcpp::ByteStream& bs);
187 
188     /** @brief Delivered Row Group
189      *  @returns RowGroup
190      */
getDeliveredRowGroup()191     const rowgroup::RowGroup& getDeliveredRowGroup() const
192     {
193         return fRowGroupDeliver;
194     }
195 
196     /** @brief Turn on/off string table delivery
197      */
198     void  deliverStringTableRowGroup(bool b);
199 
200     /** @brief Check useStringTable flag on delivered RowGroup
201      *  @returns boolean
202      */
203     bool  deliverStringTableRowGroup() const;
204 
205     /** @brief set the rowgroup for FE to work on
206      */
207     void setFeRowGroup(const rowgroup::RowGroup& rg);
208 
209     /** @brief get the rowgroup for FE
210      *  @returns RowGroup
211      */
getFeRowGroup()212     const rowgroup::RowGroup& getFeRowGroup() const
213     {
214         return fRowGroupFe;
215     }
216 
217     /** @brief get subquery step
218      */
subStep()219     const SJSTEP& subStep() const
220     {
221         return fSubStep;
222     }
223 
224     /** @brief add filters (expression steps)
225      */
226     void addExpression(const JobStepVector&, JobInfo&);
227 
228     /** @brief add function columns (returned columns)
229      */
230     void addExpression(const std::vector<execplan::SRCP>&);
231 
232     /** @brief add function join expresssion
233      */
234     void addFcnJoinExp(const std::vector<execplan::SRCP>&);
235 
236 
237 protected:
238     void execute();
239     void outputRow(rowgroup::Row&, rowgroup::Row&);
240     void checkDupOutputColumns();
241     void dupOutputColumns(rowgroup::Row&);
242     void printCalTrace();
243     void formatMiniStats();
244 
245     execplan::CalpontSystemCatalog::OID              fTableOid;
246     rowgroup::RowGroup                               fRowGroupIn;
247     rowgroup::RowGroup                               fRowGroupOut;
248     rowgroup::RowGroup                               fRowGroupFe;
249     rowgroup::RowGroup                               fRowGroupDeliver;
250     SJSTEP                                           fSubStep;
251     uint64_t                                         fRowsInput;
252     uint64_t                                         fRowsReturned;
253     bool                                             fEndOfResult;
254     boost::shared_array<int>                         fIndexMap;
255     std::vector<std::pair<uint32_t, uint32_t> >      fDupColumns;
256 
257     RowGroupDL*                                      fInputDL;
258     RowGroupDL*                                      fOutputDL;
259     uint64_t                                         fInputIterator;
260     uint64_t                                         fOutputIterator;
261 
262     class Runner
263     {
264     public:
Runner(SubAdapterStep * step)265         Runner(SubAdapterStep* step) : fStep(step) { }
operator()266         void operator()()
267         {
268             utils::setThreadName("SQSRunner");
269             fStep->execute();
270         }
271 
272         SubAdapterStep* fStep;
273     };
274     uint64_t				                         fRunner; // thread pool handle
275 
276     boost::scoped_ptr<funcexp::FuncExpWrapper>       fExpression;
277 };
278 
279 
280 }
281 
282 #endif  // SUBQUERY_STEP_H
283 // vim:ts=4 sw=4:
284 
285