1 /* Copyright (C) 2014 InfiniDB, Inc. 2 3 This program is free software; you can redistribute it and/or 4 modify it under the terms of the GNU General Public License 5 as published by the Free Software Foundation; version 2 of 6 the License. 7 8 This program is distributed in the hope that it will be useful, 9 but WITHOUT ANY WARRANTY; without even the implied warranty of 10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License 14 along with this program; if not, write to the Free Software 15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 16 MA 02110-1301, USA. */ 17 18 // 19 // C++ Interface: TupleUnion 20 // 21 // Description: 22 // 23 // 24 // Author: Patrick <pleblanc@localhost.localdomain>, (C) 2009 25 // 26 // Copyright: See COPYING file that comes with this distribution 27 // 28 // 29 30 #include "jobstep.h" 31 #ifndef _MSC_VER 32 #include <tr1/unordered_set> 33 #else 34 #include <unordered_set> 35 #endif 36 37 #include "stlpoolallocator.h" 38 #include "threadnaming.h" 39 40 #ifndef TUPLEUNION2_H_ 41 #define TUPLEUNION2_H_ 42 43 namespace joblist 44 { 45 46 47 48 class TupleUnion : public JobStep, public TupleDeliveryStep 49 { 50 public: 51 TupleUnion(execplan::CalpontSystemCatalog::OID tableOID, const JobInfo& jobInfo); 52 ~TupleUnion(); 53 54 void run(); 55 void join(); 56 57 const std::string toString() const; 58 execplan::CalpontSystemCatalog::OID tableOid() const; 59 60 void setInputRowGroups(const std::vector<rowgroup::RowGroup>&); 61 void setOutputRowGroup(const rowgroup::RowGroup&); 62 void setDistinctFlags(const std::vector<bool>&); 63 getOutputRowGroup()64 const rowgroup::RowGroup& getOutputRowGroup() const 65 { 66 return outputRG; 67 } getDeliveredRowGroup()68 const rowgroup::RowGroup& getDeliveredRowGroup() const 69 { 70 return outputRG; 71 } deliverStringTableRowGroup(bool b)72 void deliverStringTableRowGroup(bool b) 73 { 74 outputRG.setUseStringTable(b); 75 } deliverStringTableRowGroup()76 bool deliverStringTableRowGroup() const 77 { 78 return outputRG.usesStringTable(); 79 } 80 81 // @bug 598 for self-join alias1()82 std::string alias1() const 83 { 84 return fAlias1; 85 } alias1(const std::string & alias)86 void alias1(const std::string& alias) 87 { 88 fAlias = fAlias1 = alias; 89 } alias2()90 std::string alias2() const 91 { 92 return fAlias2; 93 } alias2(const std::string & alias)94 void alias2(const std::string& alias) 95 { 96 fAlias2 = alias; 97 } 98 view1()99 std::string view1() const 100 { 101 return fView1; 102 } view1(const std::string & vw)103 void view1(const std::string& vw) 104 { 105 fView = fView1 = vw; 106 } view2()107 std::string view2() const 108 { 109 return fView2; 110 } view2(const std::string & vw)111 void view2(const std::string& vw) 112 { 113 fView2 = vw; 114 } 115 116 uint32_t nextBand(messageqcpp::ByteStream& bs); 117 118 119 private: 120 121 struct RowPosition 122 { 123 uint64_t group : 48; 124 uint64_t row : 16; 125 groupRowPosition126 inline RowPosition(uint64_t i = 0, uint64_t j = 0) : group(i), row(j) {}; 127 static const uint64_t normalizedFlag = 0x800000000000ULL; // 48th bit is set 128 }; 129 130 void getOutput(rowgroup::RowGroup* rg, rowgroup::Row* row, rowgroup::RGData* data); 131 void addToOutput(rowgroup::Row* r, rowgroup::RowGroup* rg, bool keepit, 132 rowgroup::RGData& data); 133 void normalize(const rowgroup::Row& in, rowgroup::Row* out); 134 void writeNull(rowgroup::Row* out, uint32_t col); 135 void readInput(uint32_t); 136 void formatMiniStats(); 137 138 execplan::CalpontSystemCatalog::OID fTableOID; 139 // @bug 598 for self-join 140 std::string fAlias1; 141 std::string fAlias2; 142 143 std::string fView1; 144 std::string fView2; 145 146 rowgroup::RowGroup outputRG; 147 std::vector<rowgroup::RowGroup> inputRGs; 148 std::vector<RowGroupDL*> inputs; 149 RowGroupDL* output; 150 uint32_t outputIt; 151 152 struct Runner 153 { 154 TupleUnion* tu; 155 uint32_t index; RunnerRunner156 Runner(TupleUnion* t, uint32_t in) : tu(t), index(in) { } operatorRunner157 void operator()() 158 { 159 utils::setThreadName("TUSRunner"); 160 tu->readInput(index); 161 } 162 }; 163 std::vector<uint64_t> runners; //thread pool handles 164 165 struct Hasher 166 { 167 TupleUnion* ts; 168 utils::Hasher_r h; HasherHasher169 Hasher(TupleUnion* t) : ts(t) { } 170 uint64_t operator()(const RowPosition&) const; 171 }; 172 struct Eq 173 { 174 TupleUnion* ts; EqEq175 Eq(TupleUnion* t) : ts(t) { } 176 bool operator()(const RowPosition&, const RowPosition&) const; 177 }; 178 179 typedef std::tr1::unordered_set<RowPosition, Hasher, Eq, 180 utils::STLPoolAllocator<RowPosition> > Uniquer_t; 181 182 boost::scoped_ptr<Uniquer_t> uniquer; 183 std::vector<rowgroup::RGData> rowMemory; 184 boost::mutex sMutex, uniquerMutex; 185 uint64_t memUsage; 186 uint32_t rowLength; 187 rowgroup::Row row, row2; 188 std::vector<bool> distinctFlags; 189 ResourceManager* rm; 190 utils::STLPoolAllocator<RowPosition> allocator; 191 boost::scoped_array<rowgroup::RGData> normalizedData; 192 193 uint32_t runnersDone; 194 uint32_t distinctCount; 195 uint32_t distinctDone; 196 197 uint64_t fRowsReturned; 198 199 // temporary hack to make sure JobList only calls run, join once 200 boost::mutex jlLock; 201 bool runRan, joinRan; 202 203 boost::shared_ptr<int64_t> sessionMemLimit; 204 std::string fTimeZone; 205 }; 206 207 } 208 209 #endif 210