1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 //
19 // C++ Interface: TupleUnion
20 //
21 // Description:
22 //
23 //
24 // Author: Patrick <pleblanc@localhost.localdomain>, (C) 2009
25 //
26 // Copyright: See COPYING file that comes with this distribution
27 //
28 //
29 
30 #include "jobstep.h"
31 #ifndef _MSC_VER
32 #include <tr1/unordered_set>
33 #else
34 #include <unordered_set>
35 #endif
36 
37 #include "stlpoolallocator.h"
38 #include "threadnaming.h"
39 
40 #ifndef TUPLEUNION2_H_
41 #define TUPLEUNION2_H_
42 
43 namespace joblist
44 {
45 
46 
47 
48 class TupleUnion : public JobStep, public TupleDeliveryStep
49 {
50 public:
51     TupleUnion(execplan::CalpontSystemCatalog::OID tableOID, const JobInfo& jobInfo);
52     ~TupleUnion();
53 
54     void run();
55     void join();
56 
57     const std::string toString() const;
58     execplan::CalpontSystemCatalog::OID tableOid() const;
59 
60     void setInputRowGroups(const std::vector<rowgroup::RowGroup>&);
61     void setOutputRowGroup(const rowgroup::RowGroup&);
62     void setDistinctFlags(const std::vector<bool>&);
63 
getOutputRowGroup()64     const rowgroup::RowGroup& getOutputRowGroup() const
65     {
66         return outputRG;
67     }
getDeliveredRowGroup()68     const rowgroup::RowGroup& getDeliveredRowGroup() const
69     {
70         return outputRG;
71     }
deliverStringTableRowGroup(bool b)72     void  deliverStringTableRowGroup(bool b)
73     {
74         outputRG.setUseStringTable(b);
75     }
deliverStringTableRowGroup()76     bool  deliverStringTableRowGroup() const
77     {
78         return outputRG.usesStringTable();
79     }
80 
81     // @bug 598 for self-join
alias1()82     std::string alias1() const
83     {
84         return fAlias1;
85     }
alias1(const std::string & alias)86     void alias1(const std::string& alias)
87     {
88         fAlias = fAlias1 = alias;
89     }
alias2()90     std::string alias2() const
91     {
92         return fAlias2;
93     }
alias2(const std::string & alias)94     void alias2(const std::string& alias)
95     {
96         fAlias2 = alias;
97     }
98 
view1()99     std::string view1() const
100     {
101         return fView1;
102     }
view1(const std::string & vw)103     void view1(const std::string& vw)
104     {
105         fView = fView1 = vw;
106     }
view2()107     std::string view2() const
108     {
109         return fView2;
110     }
view2(const std::string & vw)111     void view2(const std::string& vw)
112     {
113         fView2 = vw;
114     }
115 
116     uint32_t nextBand(messageqcpp::ByteStream& bs);
117 
118 
119 private:
120 
121     struct RowPosition
122     {
123         uint64_t group : 48;
124         uint64_t row   : 16;
125 
groupRowPosition126         inline RowPosition(uint64_t i = 0, uint64_t j = 0) : group(i), row(j) {};
127         static const uint64_t normalizedFlag = 0x800000000000ULL;   // 48th bit is set
128     };
129 
130     void getOutput(rowgroup::RowGroup* rg, rowgroup::Row* row, rowgroup::RGData* data);
131     void addToOutput(rowgroup::Row* r, rowgroup::RowGroup* rg, bool keepit,
132                      rowgroup::RGData& data);
133     void normalize(const rowgroup::Row& in, rowgroup::Row* out);
134     void writeNull(rowgroup::Row* out, uint32_t col);
135     void readInput(uint32_t);
136     void formatMiniStats();
137 
138     execplan::CalpontSystemCatalog::OID fTableOID;
139     // @bug 598 for self-join
140     std::string fAlias1;
141     std::string fAlias2;
142 
143     std::string fView1;
144     std::string fView2;
145 
146     rowgroup::RowGroup outputRG;
147     std::vector<rowgroup::RowGroup> inputRGs;
148     std::vector<RowGroupDL*> inputs;
149     RowGroupDL* output;
150     uint32_t outputIt;
151 
152     struct Runner
153     {
154         TupleUnion* tu;
155         uint32_t index;
RunnerRunner156         Runner(TupleUnion* t, uint32_t in) : tu(t), index(in) { }
operatorRunner157         void operator()()
158         {
159             utils::setThreadName("TUSRunner");
160             tu->readInput(index);
161         }
162     };
163     std::vector<uint64_t> runners; //thread pool handles
164 
165     struct Hasher
166     {
167         TupleUnion* ts;
168         utils::Hasher_r h;
HasherHasher169         Hasher(TupleUnion* t) : ts(t) { }
170         uint64_t operator()(const RowPosition&) const;
171     };
172     struct Eq
173     {
174         TupleUnion* ts;
EqEq175         Eq(TupleUnion* t) : ts(t) { }
176         bool operator()(const RowPosition&, const RowPosition&) const;
177     };
178 
179     typedef std::tr1::unordered_set<RowPosition, Hasher, Eq,
180             utils::STLPoolAllocator<RowPosition> > Uniquer_t;
181 
182     boost::scoped_ptr<Uniquer_t> uniquer;
183     std::vector<rowgroup::RGData> rowMemory;
184     boost::mutex sMutex, uniquerMutex;
185     uint64_t memUsage;
186     uint32_t rowLength;
187     rowgroup::Row row, row2;
188     std::vector<bool> distinctFlags;
189     ResourceManager* rm;
190     utils::STLPoolAllocator<RowPosition> allocator;
191     boost::scoped_array<rowgroup::RGData> normalizedData;
192 
193     uint32_t runnersDone;
194     uint32_t distinctCount;
195     uint32_t distinctDone;
196 
197     uint64_t fRowsReturned;
198 
199     // temporary hack to make sure JobList only calls run, join once
200     boost::mutex jlLock;
201     bool runRan, joinRan;
202 
203     boost::shared_ptr<int64_t> sessionMemLimit;
204     std::string fTimeZone;
205 };
206 
207 }
208 
209 #endif
210