1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /***********************************************************************
19  *   $Id: sm.h 9254 2013-02-04 19:40:31Z rdempsey $
20  *
21  ***********************************************************************/
22 /** @file */
23 
24 #ifndef SM_H__
25 #define SM_H__
26 
27 #include <stdint.h>
28 #include <set>
29 #include <map>
30 #include <string>
31 #include <sys/time.h>
32 #include <iostream>
33 
34 #include "calpontsystemcatalog.h"
35 #include "clientrotator.h"
36 #include "rowgroup.h"
37 #include "calpontselectexecutionplan.h"
38 #include "querystats.h"
39 
// Compile-time switch for debug logging.  Within this header it is only
// referenced by the commented-out smlog wiring next to SMDEBUGLOG.
#define IDB_SM_DEBUG 0
// Compile-time switch for profiling; when 0, GET_PF_TIME() expands to an
// empty block and no timestamps are taken.
#define IDB_SM_PROFILE 0

// EXPORT decorates the out-of-line members declared in this header.  It only
// expands to __declspec(dllexport) for MSVC builds that also define
// xxxDLLEXPORT; in every other configuration it expands to nothing.
// NOTE(review): xxxDLLEXPORT looks like a placeholder name -- confirm whether
// any build actually defines it.
#if defined(_MSC_VER) && defined(xxxDLLEXPORT)
#define EXPORT __declspec(dllexport)
#else
#define EXPORT
#endif

/** @brief Store the current time of day in the struct timeval lvalue @p n.
 *
 * Expands to an empty block unless IDB_SM_PROFILE is non-zero.  The argument
 * is parenthesized so that any lvalue expression can be passed safely.
 *
 * The expansion is a plain brace block (not do/while(0)) so that call sites
 * written without a trailing semicolon keep compiling.  As a consequence, do
 * not use it as the unbraced body of an `if` that has an `else`.
 */
#if IDB_SM_PROFILE
#include <sys/time.h>
#define GET_PF_TIME(n) {gettimeofday(&(n), NULL);}
#else
#define GET_PF_TIME(n) {}
#endif
55 
56 namespace sm
57 {
// Numeric result codes.  NOTE(review): presumably the values carried by
// status_t results of the sm_*/tpl_* functions -- confirm against sm.cpp.
const int STATUS_OK = 0;
const int SQL_NOT_FOUND = -1000;
const int SQL_KILLED = -1001;
const int CALPONT_INTERNAL_ERROR = -1007;

//#if IDB_SM_DEBUG
//extern std::ofstream smlog;
//#define SMDEBUGLOG smlog
//#else
/** @brief Disabled debug stream: `SMDEBUGLOG << a << b;` compiles but never runs.
 *
 * The stream expression sits in the never-taken `else` branch, so its operands
 * are type-checked but not evaluated.  The `if (true) {} else` form (instead of
 * a bare `if (false)`) already owns an `else`, so an `else` written after
 * `if (cond) SMDEBUGLOG << x;` binds to the caller's `if` and not to the one
 * hidden inside the macro.
 */
#define SMDEBUGLOG if (true) {} else std::cout
//#endif

// Declared here, defined in another translation unit.
extern const std::string DEFAULT_SAVE_PATH;

typedef uint64_t tableid_t;	// table identifier
typedef int32_t status_t;	// result code type of the functions declared below
73 
/** @brief Query progress marker; cpsm_conhdl_t::queryState starts out as NO_QUERY. */
enum QueryState
{
    NO_QUERY         = 0,
    QUERY_IN_PROCESS = 1
};
79 
/** @brief Per-column record: three integer descriptors plus a list of strings.
 *
 * NOTE(review): the field meanings below are taken from the names only --
 * confirm against the code that fills ResultMap.
 */
struct Column
{
    // Every scalar starts at -1 so a freshly constructed Column never exposes
    // indeterminate values (previously only tableID was initialized).
    Column() : tableID(-1), colPos(-1), dataType(-1) {}

    int tableID;
    int colPos;
    int dataType;
    std::vector<std::string> data;
};

// Maps an int key to a raw Column pointer.  Who owns (and frees) the pointed-to
// Columns is not established in this header.
typedef std::map<int, Column*> ResultMap;
91 
/** @brief Six time-of-day checkpoints and the elapsed milliseconds between
 *         consecutive ones.
 *
 * The struct deliberately has no constructor (it stays a plain aggregate), so
 * the checkpoints are only meaningful once they have been assigned.  Each
 * accessor returns whole milliseconds, truncated toward zero, from the earlier
 * checkpoint to the later one.
 */
struct Profiler
{
    struct timeval login;
    struct timeval beforePlan;
    struct timeval afterPlan;
    struct timeval resultArrival;
    struct timeval resultReady;
    struct timeval endProcess;

    /** @return milliseconds from login to beforePlan */
    long prePlan() const
    {
        return elapsedMs(login, beforePlan);
    }

    /** @return milliseconds from beforePlan to afterPlan */
    long buildPlan() const
    {
        return elapsedMs(beforePlan, afterPlan);
    }

    /** @return milliseconds from afterPlan to resultArrival */
    long jobProcess() const
    {
        return elapsedMs(afterPlan, resultArrival);
    }

    /** @return milliseconds from resultArrival to resultReady */
    long buildResult() const
    {
        return elapsedMs(resultArrival, resultReady);
    }

    /** @return milliseconds from resultReady to endProcess */
    long tableFetch() const
    {
        return elapsedMs(resultReady, endProcess);
    }

private:
    /** @brief Whole milliseconds from @p from to @p to.
     *
     * The difference is formed in 64-bit microseconds first and divided once.
     * Dividing the seconds and microseconds parts separately over-reports by
     * up to one millisecond whenever the microsecond field wraps.
     */
    static long elapsedMs(const struct timeval& from, const struct timeval& to)
    {
        const int64_t secs = static_cast<int64_t>(to.tv_sec) - static_cast<int64_t>(from.tv_sec);
        const int64_t usecs = static_cast<int64_t>(to.tv_usec) - static_cast<int64_t>(from.tv_usec);
        return static_cast<long>((secs * 1000000 + usecs) / 1000);
    }
};
126 
127 /** @brief Calpont table scan handle */
128 struct cpsm_tplsch_t
129 {
cpsm_tplsch_tcpsm_tplsch_t130     cpsm_tplsch_t() : tableid(0), rowsreturned(0), rowGroup(0), traceFlags(0), bandID(0), saveFlag(0), bandsReturned(0),
131         ctp(0) {}
~cpsm_tplsch_tcpsm_tplsch_t132     ~cpsm_tplsch_t()
133     {
134         delete rowGroup;
135     }
136 
137     tableid_t tableid;
138     uint64_t rowsreturned;
139     rowgroup::RowGroup* rowGroup;
140     messageqcpp::ByteStream bs;	// rowgroup bytestream. need to stay with the life span of rowgroup
141     uint32_t traceFlags;
142     // @bug 649
143     int bandID;		 // the band that being read from the disk
144     int key;			// unique key for the table's scan context
145     // @bug 626
146     uint16_t saveFlag;
147     uint32_t bandsReturned;
148     std::vector<execplan::CalpontSystemCatalog::ColType> ctp;
149     std::string errMsg;
150     rowgroup::RGData rgData;
deserializeTablecpsm_tplsch_t151     void deserializeTable(messageqcpp::ByteStream& bs)
152     {
153         if (!rowGroup)
154         {
155             rowGroup = new rowgroup::RowGroup();
156             rowGroup->deserialize(bs);
157         }
158         else
159         {
160             // XXXST: the 'true' is to ease the transition to RGDatas.  Take it out when the
161             // transition is done.
162             rgData.deserialize(bs, true);
163             rowGroup->setData(&rgData);
164             //rowGroup->setData(const_cast<uint8_t*>(bs.buf()));
165         }
166     }
167 
getStatuscpsm_tplsch_t168     uint16_t getStatus()
169     {
170         idbassert(rowGroup != 0);
171         return rowGroup->getStatus();
172     }
173 
getRowCountcpsm_tplsch_t174     uint64_t getRowCount()
175     {
176         if (rowGroup)
177             return rowGroup->getRowCount();
178         else
179             return 0;
180     }
181 
setErrMsgcpsm_tplsch_t182     void setErrMsg()
183     {
184         if (rowGroup && getStatus())
185         {
186             //bs.advance(rowGroup->getDataSize());
187             bs >> errMsg;
188         }
189         else
190         {
191             errMsg = "NOERROR";
192         }
193     }
194 };
195 typedef boost::shared_ptr<cpsm_tplsch_t> sp_cpsm_tplsch_t;
196 
197 /** @brief Calpont connection handle structure */
198 class cpsm_conhdl_t
199 {
200 public:
cpsm_conhdl_t(time_t v,const uint32_t sid,bool columnstore_local_query)201     cpsm_conhdl_t(time_t v, const uint32_t sid, bool columnstore_local_query) :
202         value(v), sessionID(sid), queryState (NO_QUERY),
203         exeMgr( new execplan::ClientRotator(sid, "ExeMgr", columnstore_local_query)),
204         tblinfo_idx(0), idxinfo_idx(0), curFetchTb (0)
205     { }
206 
207 
208     /** @brief connnect ExeMgr
209      *
210      * Try connecting to ExeMgr.  If no connection, try ExeMgr1,
211      * ExeMgr2... until timeout lapses. Then throw exception.
212      */
213     void connect(double timeout = 0.005)
214     {
215         exeMgr->connect(timeout);
216     }
217     EXPORT void write(messageqcpp::ByteStream bs);
218 
~cpsm_conhdl_t()219     ~cpsm_conhdl_t()
220     {
221         delete exeMgr;
222     }
223     EXPORT const std::string toString() const;
224     time_t value;
225     uint32_t sessionID;
226     short queryState;   // 0 -- NO_QUERY; 1 -- QUERY_IN_PROCESS
227     execplan::ClientRotator* exeMgr;
228     ResultMap resultSet;
229     Profiler pf;
230     int tblinfo_idx;
231     int idxinfo_idx;
232     std::string schemaname;
233     std::string tablename;
234     int tboid;
235     short requestType; // 0 -- ID2NAME; 1 -- NAME2ID
236     boost::shared_ptr<execplan::CalpontSystemCatalog> csc;
237     // @bug 649; @bug 626
238     std::map <int, int> tidMap;	 // tableid-tableStartCount map
239     std::map <int, sp_cpsm_tplsch_t> tidScanMap;
240     std::map <int, int> keyBandMap; // key-savedBandCount map
241     int curFetchTb;				 // current fetching table key
242     std::string queryStats;
243     std::string extendedStats;
244     std::string miniStats;
245 private:
246 };
247 std::ostream& operator<<(std::ostream& output, const cpsm_conhdl_t& rhs);
248 
// @bug 626 save table bands to avoid sending plan too many times
/** @brief Band-saving state; cpsm_tplh_t::saveFlag starts out as NO_SAVE. */
enum SavingFlag
{
    NO_SAVE = 0,
    SAVING  = 1,
    SAVED   = 2
};
256 
257 /** @brief Calpont table handle */
258 struct cpsm_tplh_t
259 {
cpsm_tplh_tcpsm_tplh_t260     cpsm_tplh_t() : tableid(0), rowsintable(0), bandID(0), saveFlag(NO_SAVE), bandsInTable(0) {}
261     tableid_t tableid;
262     int rowsintable;
263     // @bug 649
264     int bandID;		 // the band that being read from the disk
265     int key;			// unique key for the table's scan context
266     // @bug 626
267     uint16_t saveFlag;
268     int bandsInTable;
269 };
270 
/** @brief An int value paired with a flag telling whether it is valid.
 *
 * A new instance is invalid and holds 0.
 * NOTE(review): presumably a table id, judging by the name -- confirm.
 */
struct cpsm_tid_t
{
    bool valid;
    int value;

    cpsm_tid_t() :
        valid(false),
        value(0)
    {
    }
};
277 
278 extern status_t sm_init(uint32_t, cpsm_conhdl_t**, uint32_t columnstore_local_query = false);
279 extern status_t sm_cleanup(cpsm_conhdl_t*);
280 
281 extern status_t tpl_open(tableid_t, cpsm_tplh_t*, cpsm_conhdl_t*);
282 extern status_t tpl_scan_open(tableid_t, sp_cpsm_tplsch_t&, cpsm_conhdl_t*);
283 extern status_t tpl_scan_fetch(sp_cpsm_tplsch_t&, cpsm_conhdl_t*, int* k = 0);
284 extern status_t tpl_scan_close(sp_cpsm_tplsch_t&);
285 extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats, bool ask_4_stats, bool clear_scan_ctx = false);
286 
287 }
288 
289 #undef EXPORT
290 
291 #endif
292 
293