1 /* Copyright (C) 2014 InfiniDB, Inc. 2 3 This program is free software; you can redistribute it and/or 4 modify it under the terms of the GNU General Public License 5 as published by the Free Software Foundation; version 2 of 6 the License. 7 8 This program is distributed in the hope that it will be useful, 9 but WITHOUT ANY WARRANTY; without even the implied warranty of 10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License 14 along with this program; if not, write to the Free Software 15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 16 MA 02110-1301, USA. */ 17 18 // $Id: subquerystep.h 6370 2010-03-18 02:58:09Z xlou $ 19 20 21 /** @file */ 22 23 #ifndef SUBQUERY_STEP_H 24 #define SUBQUERY_STEP_H 25 26 #include <boost/scoped_ptr.hpp> 27 #include <boost/shared_ptr.hpp> 28 #include <boost/shared_array.hpp> 29 #include <boost/thread.hpp> 30 31 #include "jobstep.h" 32 #include "joblist.h" 33 #include "funcexpwrapper.h" 34 #include "threadnaming.h" 35 36 namespace joblist 37 { 38 struct JobInfo; 39 40 class SubQueryStep : public JobStep 41 { 42 43 public: 44 /** @brief SubQueryStep constructor 45 */ 46 SubQueryStep(const JobInfo&); 47 48 /** @brief SubQueryStep destructor 49 */ 50 ~SubQueryStep(); 51 52 /** @brief virtual void run method 53 */ 54 void run(); 55 56 /** @brief virtual void join method 57 */ 58 void join(); 59 60 /** @brief virtual void abort method 61 */ 62 void abort(); 63 64 /** @brief virtual get table OID 65 * @returns OID 66 */ tableOid()67 execplan::CalpontSystemCatalog::OID tableOid() const 68 { 69 return fTableOid; 70 } 71 72 /** @brief virtual set table OID 73 */ tableOid(const execplan::CalpontSystemCatalog::OID & id)74 void tableOid(const execplan::CalpontSystemCatalog::OID& id) 75 { 76 fTableOid = id; 77 } 78 79 /** @brief virtual output info to a string 80 * @returns string 81 */ 82 const std::string toString() const; 83 84 /** @brief virtual set the output rowgroup 85 */ setOutputRowGroup(const rowgroup::RowGroup & rg)86 virtual void setOutputRowGroup(const rowgroup::RowGroup& rg) 87 { 88 fOutputRowGroup = rg; 89 } 90 91 /** @brief virtual get the output rowgroup 92 * @returns RowGroup 93 */ getOutputRowGroup()94 virtual const rowgroup::RowGroup& getOutputRowGroup() const 95 { 96 return fOutputRowGroup; 97 } 98 99 /** @brief virtual set the sub-query's joblist 100 */ subJoblist(const STJLP & sjl)101 virtual void subJoblist(const STJLP& sjl) 102 { 103 fSubJobList = sjl; 104 } 105 106 /** @brief virtual get the sub-query's joblist 107 * @returns boost::shared_ptr<TupleJobList> 108 */ subJoblist()109 virtual const STJLP& subJoblist() const 110 { 111 return fSubJobList; 112 } 113 114 115 protected: 116 uint64_t fRowsReturned; 117 118 execplan::CalpontSystemCatalog::OID fTableOid; 119 std::vector<execplan::CalpontSystemCatalog::OID> fColumnOids; 120 rowgroup::RowGroup fOutputRowGroup; 121 STJLP fSubJobList; 122 123 boost::scoped_ptr<boost::thread> fRunner; 124 }; 125 126 127 class SubAdapterStep : public JobStep, public TupleDeliveryStep 128 { 129 130 public: 131 /** @brief SubAdapterStep constructor 132 */ 133 SubAdapterStep(SJSTEP& s, const JobInfo&); 134 135 /** @brief SubAdapterStep destructor 136 */ 137 ~SubAdapterStep(); 138 139 /** @brief virtual void run method 140 */ 141 void run(); 142 143 /** @brief virtual void join method 144 */ 145 void join(); 146 147 /** @brief virtual void abort method 148 */ 149 void abort(); 150 151 /** @brief virtual get table OID 152 * @returns OID 153 */ tableOid()154 execplan::CalpontSystemCatalog::OID tableOid() const 155 { 156 return fTableOid; 157 } 158 159 /** @brief virtual set table OID 160 */ tableOid(const execplan::CalpontSystemCatalog::OID & id)161 void tableOid(const execplan::CalpontSystemCatalog::OID& id) 162 { 163 fTableOid = id; 164 } 165 166 /** @brief virtual output info to a string 167 * @returns string 168 */ 169 const std::string toString() const; 170 171 /** @brief virtual set the output rowgroup 172 */ 173 void setOutputRowGroup(const rowgroup::RowGroup& rg); 174 175 /** @brief virtual get the output rowgroup 176 * @returns RowGroup 177 */ getOutputRowGroup()178 const rowgroup::RowGroup& getOutputRowGroup() const 179 { 180 return fRowGroupOut; 181 } 182 183 /** @brief TupleDeliveryStep's pure virtual methods nextBand 184 * @returns row count 185 */ 186 uint32_t nextBand(messageqcpp::ByteStream& bs); 187 188 /** @brief Delivered Row Group 189 * @returns RowGroup 190 */ getDeliveredRowGroup()191 const rowgroup::RowGroup& getDeliveredRowGroup() const 192 { 193 return fRowGroupDeliver; 194 } 195 196 /** @brief Turn on/off string table delivery 197 */ 198 void deliverStringTableRowGroup(bool b); 199 200 /** @brief Check useStringTable flag on delivered RowGroup 201 * @returns boolean 202 */ 203 bool deliverStringTableRowGroup() const; 204 205 /** @brief set the rowgroup for FE to work on 206 */ 207 void setFeRowGroup(const rowgroup::RowGroup& rg); 208 209 /** @brief get the rowgroup for FE 210 * @returns RowGroup 211 */ getFeRowGroup()212 const rowgroup::RowGroup& getFeRowGroup() const 213 { 214 return fRowGroupFe; 215 } 216 217 /** @brief get subquery step 218 */ subStep()219 const SJSTEP& subStep() const 220 { 221 return fSubStep; 222 } 223 224 /** @brief add filters (expression steps) 225 */ 226 void addExpression(const JobStepVector&, JobInfo&); 227 228 /** @brief add function columns (returned columns) 229 */ 230 void addExpression(const std::vector<execplan::SRCP>&); 231 232 /** @brief add function join expresssion 233 */ 234 void addFcnJoinExp(const std::vector<execplan::SRCP>&); 235 236 237 protected: 238 void execute(); 239 void outputRow(rowgroup::Row&, rowgroup::Row&); 240 void checkDupOutputColumns(); 241 void dupOutputColumns(rowgroup::Row&); 242 void printCalTrace(); 243 void formatMiniStats(); 244 245 execplan::CalpontSystemCatalog::OID fTableOid; 246 rowgroup::RowGroup fRowGroupIn; 247 rowgroup::RowGroup fRowGroupOut; 248 rowgroup::RowGroup fRowGroupFe; 249 rowgroup::RowGroup fRowGroupDeliver; 250 SJSTEP fSubStep; 251 uint64_t fRowsInput; 252 uint64_t fRowsReturned; 253 bool fEndOfResult; 254 boost::shared_array<int> fIndexMap; 255 std::vector<std::pair<uint32_t, uint32_t> > fDupColumns; 256 257 RowGroupDL* fInputDL; 258 RowGroupDL* fOutputDL; 259 uint64_t fInputIterator; 260 uint64_t fOutputIterator; 261 262 class Runner 263 { 264 public: Runner(SubAdapterStep * step)265 Runner(SubAdapterStep* step) : fStep(step) { } operator()266 void operator()() 267 { 268 utils::setThreadName("SQSRunner"); 269 fStep->execute(); 270 } 271 272 SubAdapterStep* fStep; 273 }; 274 uint64_t fRunner; // thread pool handle 275 276 boost::scoped_ptr<funcexp::FuncExpWrapper> fExpression; 277 }; 278 279 280 } 281 282 #endif // SUBQUERY_STEP_H 283 // vim:ts=4 sw=4: 284 285