1 /* Copyright (C) 2014 InfiniDB, Inc. 2 3 This program is free software; you can redistribute it and/or 4 modify it under the terms of the GNU General Public License 5 as published by the Free Software Foundation; version 2 of 6 the License. 7 8 This program is distributed in the hope that it will be useful, 9 but WITHOUT ANY WARRANTY; without even the implied warranty of 10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License 14 along with this program; if not, write to the Free Software 15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 16 MA 02110-1301, USA. */ 17 18 /*********************************************************************** 19 * $Id: sm.h 9254 2013-02-04 19:40:31Z rdempsey $ 20 * 21 ***********************************************************************/ 22 /** @file */ 23 24 #ifndef SM_H__ 25 #define SM_H__ 26 27 #include <stdint.h> 28 #include <set> 29 #include <map> 30 #include <string> 31 #include <sys/time.h> 32 #include <iostream> 33 34 #include "calpontsystemcatalog.h" 35 #include "clientrotator.h" 36 #include "rowgroup.h" 37 #include "calpontselectexecutionplan.h" 38 #include "querystats.h" 39 40 #define IDB_SM_DEBUG 0 41 #define IDB_SM_PROFILE 0 42 43 #if defined(_MSC_VER) && defined(xxxDLLEXPORT) 44 #define EXPORT __declspec(dllexport) 45 #else 46 #define EXPORT 47 #endif 48 49 #if IDB_SM_PROFILE 50 #include <sys/time.h> 51 #define GET_PF_TIME(n) {gettimeofday(&n, NULL);} 52 #else 53 #define GET_PF_TIME(n) {} 54 #endif 55 56 namespace sm 57 { 58 const int STATUS_OK = 0; 59 const int SQL_NOT_FOUND = -1000; 60 const int SQL_KILLED = -1001; 61 const int CALPONT_INTERNAL_ERROR = -1007; 62 63 //#if IDB_SM_DEBUG 64 //extern std::ofstream smlog; 65 //#define SMDEBUGLOG smlog 66 //#else 67 #define SMDEBUGLOG if (false) std::cout 68 //#endif 69 extern const std::string DEFAULT_SAVE_PATH; 70 71 typedef uint64_t tableid_t; 72 typedef int32_t status_t; 73 74 enum QueryState 75 { 76 NO_QUERY = 0, 77 QUERY_IN_PROCESS 78 }; 79 80 typedef struct Column 81 { ColumnColumn82 Column(): tableID(-1) {} ~ColumnColumn83 ~Column() {} 84 int tableID; 85 int colPos; 86 int dataType; 87 std::vector <std::string> data; 88 } Column; 89 90 typedef std::map <int, Column*> ResultMap; 91 92 struct Profiler 93 { 94 struct timeval login; 95 struct timeval beforePlan; 96 struct timeval afterPlan; 97 struct timeval resultArrival; 98 struct timeval resultReady; 99 struct timeval endProcess; prePlanProfiler100 long prePlan() 101 { 102 return (beforePlan.tv_sec - login.tv_sec) * 1000 + 103 (beforePlan.tv_usec - login.tv_usec) / 1000; 104 } buildPlanProfiler105 long buildPlan() 106 { 107 return (afterPlan.tv_sec - beforePlan.tv_sec) * 1000 + 108 (afterPlan.tv_usec - beforePlan.tv_usec) / 1000; 109 } jobProcessProfiler110 long jobProcess() 111 { 112 return (resultArrival.tv_sec - afterPlan.tv_sec) * 1000 + 113 (resultArrival.tv_usec - afterPlan.tv_usec) / 1000; 114 } buildResultProfiler115 long buildResult() 116 { 117 return (resultReady.tv_sec - resultArrival.tv_sec) * 1000 + 118 (resultReady.tv_usec - resultArrival.tv_usec) / 1000; 119 } tableFetchProfiler120 long tableFetch () 121 { 122 return (endProcess.tv_sec - resultReady.tv_sec) * 1000 + 123 (endProcess.tv_usec - resultReady.tv_usec) / 1000; 124 } 125 }; 126 127 /** @brief Calpont table scan handle */ 128 struct cpsm_tplsch_t 129 { cpsm_tplsch_tcpsm_tplsch_t130 cpsm_tplsch_t() : tableid(0), rowsreturned(0), rowGroup(0), traceFlags(0), bandID(0), saveFlag(0), bandsReturned(0), 131 ctp(0) {} ~cpsm_tplsch_tcpsm_tplsch_t132 ~cpsm_tplsch_t() 133 { 134 delete rowGroup; 135 } 136 137 tableid_t tableid; 138 uint64_t rowsreturned; 139 rowgroup::RowGroup* rowGroup; 140 messageqcpp::ByteStream bs; // rowgroup bytestream. need to stay with the life span of rowgroup 141 uint32_t traceFlags; 142 // @bug 649 143 int bandID; // the band that being read from the disk 144 int key; // unique key for the table's scan context 145 // @bug 626 146 uint16_t saveFlag; 147 uint32_t bandsReturned; 148 std::vector<execplan::CalpontSystemCatalog::ColType> ctp; 149 std::string errMsg; 150 rowgroup::RGData rgData; deserializeTablecpsm_tplsch_t151 void deserializeTable(messageqcpp::ByteStream& bs) 152 { 153 if (!rowGroup) 154 { 155 rowGroup = new rowgroup::RowGroup(); 156 rowGroup->deserialize(bs); 157 } 158 else 159 { 160 // XXXST: the 'true' is to ease the transition to RGDatas. Take it out when the 161 // transition is done. 162 rgData.deserialize(bs, true); 163 rowGroup->setData(&rgData); 164 //rowGroup->setData(const_cast<uint8_t*>(bs.buf())); 165 } 166 } 167 getStatuscpsm_tplsch_t168 uint16_t getStatus() 169 { 170 idbassert(rowGroup != 0); 171 return rowGroup->getStatus(); 172 } 173 getRowCountcpsm_tplsch_t174 uint64_t getRowCount() 175 { 176 if (rowGroup) 177 return rowGroup->getRowCount(); 178 else 179 return 0; 180 } 181 setErrMsgcpsm_tplsch_t182 void setErrMsg() 183 { 184 if (rowGroup && getStatus()) 185 { 186 //bs.advance(rowGroup->getDataSize()); 187 bs >> errMsg; 188 } 189 else 190 { 191 errMsg = "NOERROR"; 192 } 193 } 194 }; 195 typedef boost::shared_ptr<cpsm_tplsch_t> sp_cpsm_tplsch_t; 196 197 /** @brief Calpont connection handle structure */ 198 class cpsm_conhdl_t 199 { 200 public: cpsm_conhdl_t(time_t v,const uint32_t sid,bool columnstore_local_query)201 cpsm_conhdl_t(time_t v, const uint32_t sid, bool columnstore_local_query) : 202 value(v), sessionID(sid), queryState (NO_QUERY), 203 exeMgr( new execplan::ClientRotator(sid, "ExeMgr", columnstore_local_query)), 204 tblinfo_idx(0), idxinfo_idx(0), curFetchTb (0) 205 { } 206 207 208 /** @brief connnect ExeMgr 209 * 210 * Try connecting to ExeMgr. If no connection, try ExeMgr1, 211 * ExeMgr2... until timeout lapses. Then throw exception. 212 */ 213 void connect(double timeout = 0.005) 214 { 215 exeMgr->connect(timeout); 216 } 217 EXPORT void write(messageqcpp::ByteStream bs); 218 ~cpsm_conhdl_t()219 ~cpsm_conhdl_t() 220 { 221 delete exeMgr; 222 } 223 EXPORT const std::string toString() const; 224 time_t value; 225 uint32_t sessionID; 226 short queryState; // 0 -- NO_QUERY; 1 -- QUERY_IN_PROCESS 227 execplan::ClientRotator* exeMgr; 228 ResultMap resultSet; 229 Profiler pf; 230 int tblinfo_idx; 231 int idxinfo_idx; 232 std::string schemaname; 233 std::string tablename; 234 int tboid; 235 short requestType; // 0 -- ID2NAME; 1 -- NAME2ID 236 boost::shared_ptr<execplan::CalpontSystemCatalog> csc; 237 // @bug 649; @bug 626 238 std::map <int, int> tidMap; // tableid-tableStartCount map 239 std::map <int, sp_cpsm_tplsch_t> tidScanMap; 240 std::map <int, int> keyBandMap; // key-savedBandCount map 241 int curFetchTb; // current fetching table key 242 std::string queryStats; 243 std::string extendedStats; 244 std::string miniStats; 245 private: 246 }; 247 std::ostream& operator<<(std::ostream& output, const cpsm_conhdl_t& rhs); 248 249 // @bug 626 save table bands to avoid sending plan too many times 250 enum SavingFlag 251 { 252 NO_SAVE = 0, 253 SAVING, 254 SAVED 255 }; 256 257 /** @brief Calpont table handle */ 258 struct cpsm_tplh_t 259 { cpsm_tplh_tcpsm_tplh_t260 cpsm_tplh_t() : tableid(0), rowsintable(0), bandID(0), saveFlag(NO_SAVE), bandsInTable(0) {} 261 tableid_t tableid; 262 int rowsintable; 263 // @bug 649 264 int bandID; // the band that being read from the disk 265 int key; // unique key for the table's scan context 266 // @bug 626 267 uint16_t saveFlag; 268 int bandsInTable; 269 }; 270 271 struct cpsm_tid_t 272 { cpsm_tid_tcpsm_tid_t273 cpsm_tid_t() : valid(false), value(0) {} 274 bool valid; 275 int value; 276 }; 277 278 extern status_t sm_init(uint32_t, cpsm_conhdl_t**, uint32_t columnstore_local_query = false); 279 extern status_t sm_cleanup(cpsm_conhdl_t*); 280 281 extern status_t tpl_open(tableid_t, cpsm_tplh_t*, cpsm_conhdl_t*); 282 extern status_t tpl_scan_open(tableid_t, sp_cpsm_tplsch_t&, cpsm_conhdl_t*); 283 extern status_t tpl_scan_fetch(sp_cpsm_tplsch_t&, cpsm_conhdl_t*, int* k = 0); 284 extern status_t tpl_scan_close(sp_cpsm_tplsch_t&); 285 extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats, bool ask_4_stats, bool clear_scan_ctx = false); 286 287 } 288 289 #undef EXPORT 290 291 #endif 292 293