1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2019 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 /**********************************************************************
20 * $Id: main.cpp 1000 2013-07-24 21:05:51Z pleblanc $
21 *
22 *
23 ***********************************************************************/
24 /**
25 * @brief execution plan manager main program
26 *
 * This is the "main" program for dealing with execution plans and
28 * result sets. It sits in a loop waiting for CalpontExecutionPlan
29 * from the FEP. It then passes the CalpontExecutionPlan to the
30 * JobListFactory from which a JobList is obtained. This is passed
 * to the Query Manager running in the real-time portion of the
32 * EC. The ExecutionPlanManager waits until the Query Manager
33 * returns a result set for the job list. These results are passed
34 * into the CalpontResultFactory, which outputs a CalpontResultSet.
35 * The ExecutionPlanManager passes the CalpontResultSet into the
36 * VendorResultFactory which produces a result set tailored to the
37 * specific DBMS front end in use. The ExecutionPlanManager then
38 * sends the VendorResultSet back to the Calpont Database Connector
39 * on the Front-End Processor where it is returned to the DBMS
40 * front-end.
41 */
42 #include <iostream>
43 #include <cstdint>
44 #include <csignal>
45 #include <sys/resource.h>
46
47 #undef root_name
48 #include <boost/filesystem.hpp>
49
50 #include "calpontselectexecutionplan.h"
51 #include "activestatementcounter.h"
52 #include "distributedenginecomm.h"
53 #include "resourcemanager.h"
54 #include "configcpp.h"
55 #include "queryteleserverparms.h"
56 #include "iosocket.h"
57 #include "joblist.h"
58 #include "joblistfactory.h"
59 #include "oamcache.h"
60 #include "simplecolumn.h"
61 #include "bytestream.h"
62 #include "telestats.h"
63 #include "messageobj.h"
64 #include "messagelog.h"
65 #include "sqllogger.h"
66 #include "femsghandler.h"
67 #include "idberrorinfo.h"
68 #include "MonitorProcMem.h"
69 #include "liboamcpp.h"
70 #include "crashtrace.h"
71 #include "utils_utf8.h"
72 #include "service.h"
73
74 #include <mutex>
75 #include <thread>
76 #include <condition_variable>
77
78 #include "dbrm.h"
79
80 #include "mariadb_my_sys.h"
81
82
/**
 * Command-line options for the ExeMgr service, parsed with getopt(3).
 *
 *   -d  increments m_debug (may be repeated or grouped, e.g. -dd)
 *   -e  sets m_e
 *   -f  sets m_fg (Run() then calls Child() directly instead of forking)
 *
 * Any other option (getopt's '?') is silently ignored.
 */
class Opt
{
 public:
  int m_debug;  // how many times -d was given
  bool m_e;     // true when -e was given
  bool m_fg;    // true when -f was given

  Opt(int argc, char* argv[]) : m_debug(0), m_e(false), m_fg(false)
  {
    for (int flag = getopt(argc, argv, "edf"); flag != EOF; flag = getopt(argc, argv, "edf"))
    {
      if (flag == 'd')
        ++m_debug;
      else if (flag == 'e')
        m_e = true;
      else if (flag == 'f')
        m_fg = true;
      // '?' and anything unexpected: ignore
    }
  }
};
118
119
120 class ServiceExeMgr: public Service, public Opt
121 {
122 protected:
123
log(logging::LOG_TYPE type,const std::string & str)124 void log(logging::LOG_TYPE type, const std::string &str)
125 {
126 logging::LoggingID logid(16);
127 logging::Message::Args args;
128 logging::Message message(8);
129 args.add(strerror(errno));
130 message.format(args);
131 logging::Logger logger(logid.fSubsysID);
132 logger.logMessage(type, message, logid);
133 }
134
135 public:
ServiceExeMgr(const Opt & opt)136 ServiceExeMgr(const Opt &opt)
137 :Service("ExeMgr"), Opt(opt)
138 { }
LogErrno()139 void LogErrno() override
140 {
141 log(logging::LOG_TYPE_CRITICAL, std::string(strerror(errno)));
142 }
ParentLogChildMessage(const std::string & str)143 void ParentLogChildMessage(const std::string &str) override
144 {
145 log(logging::LOG_TYPE_INFO, str);
146 }
147 int Child() override;
Run()148 int Run()
149 {
150 return m_fg ? Child() : RunForking();
151 }
152 };
153
154
155 namespace
156 {
157
// If any flags other than the table mode flags are set, produce output to screen.
// (Every bit except TRACE_TUPLE_AUTOSWITCH and TRACE_TUPLE_OFF.)
const uint32_t flagsWantOutput = (0xffffffff &
                                  ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH &
                                  ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF);

// Debug verbosity for session tracing: non-zero enables trace output to
// std::cout, and values > 1 also trace internal (system catalog) plans.
int gDebug;

// Message ids passed to msgLog.logMessage() by the session code.
const unsigned logDefaultMsg = logging::M0000;
const unsigned logDbProfStartStatement = logging::M0028;
const unsigned logDbProfEndStatement = logging::M0029;
const unsigned logStartSql = logging::M0041;
const unsigned logEndSql = logging::M0042;
const unsigned logRssTooBig = logging::M0044;
const unsigned logDbProfQueryStats = logging::M0047;
const unsigned logExeMgrExcpt = logging::M0055;

// Shared logger for subsystem id 16 (the same id ServiceExeMgr::log uses).
logging::Logger msgLog(16);

// Per-session peak memory usage (%) observed during the current query, keyed
// by session id. Always accessed while holding sessionMemMapMutex.
typedef std::map<uint32_t, size_t> SessionMemMap_t;
SessionMemMap_t sessionMemMap; // track memory% usage during a query
std::mutex sessionMemMapMutex;

//...The FrontEnd may establish more than 1 connection (which results in
// more than 1 ExeMgr thread) per session. These threads will share
// the same CalpontSystemCatalog object for that session. Here, we
// define a std::map to track how many threads are sharing each session, so
// that we know when we can safely delete a CalpontSystemCatalog object
// shared by multiple threads per session.
// Always accessed while holding threadCntPerSessionMapMutex.
typedef std::map<uint32_t, uint32_t> ThreadCntPerSessionMap_t;
ThreadCntPerSessionMap_t threadCntPerSessionMap;
std::mutex threadCntPerSessionMapMutex;

// This var is only accessed using thread-safe inc/dec calls.
// NOTE(review): assigned outside the visible part of this file — confirm it is
// set before any SessionThread runs.
ActiveStatementCounter* statementsRunningCount;

// Engine communication object. NOTE(review): initialized outside the visible
// part of this file.
joblist::DistributedEngineComm* ec;

// Process-wide ResourceManager, obtained during dynamic initialization of
// this translation unit.
auto rm = joblist::ResourceManager::instance(true);
196
toInt(const std::string & val)197 int toInt(const std::string& val)
198 {
199 if (val.length() == 0) return -1;
200
201 return static_cast<int>(config::Config::fromText(val));
202 }
203
// The literal name "ExeMgr1". NOTE(review): not referenced in the visible part
// of this file; presumably a process/config name used elsewhere — confirm.
const std::string ExeMgr("ExeMgr1");
205
/**
 * Render the job list's "mini info" text as an aligned table.
 *
 * @param in newline-separated rows; each row holds space-separated fields in
 *           header order (Desc Mode Table TableOID ReferencedColumns PIO LIO
 *           PBE Elapsed Rows). Empty rows and repeated separators are skipped.
 * @return "\n", then the header row and every data row. Each field is
 *         left-aligned, padded to the widest value in its column and followed
 *         by one space. Each row ends in "\n".
 *
 * Fields beyond the tenth are ignored (a row is expected to have exactly ten;
 * this is asserted). The last three characters of each data row's Elapsed
 * field are dropped. A shorter Elapsed value is left unchanged, because
 * size() - 3 would wrap around and make std::string::resize() throw.
 */
const std::string prettyPrintMiniInfo(const std::string& in)
{
  const std::string header = "Desc Mode Table TableOID ReferencedColumns PIO LIO PBE Elapsed Rows";
  const std::size_t header_parts = 10;
  const std::size_t elapsedColumn = 8;  // index of "Elapsed" in the header
  const std::size_t elapsedTrim = 3;    // trailing chars dropped from Elapsed

  // Split on `delim`, discarding empty pieces (same tokens as
  // boost::char_separator with its default drop_empty_tokens policy).
  auto split = [](const std::string& s, char delim)
  {
    std::vector<std::string> pieces;
    std::string::size_type start = 0;

    while (start <= s.size())
    {
      std::string::size_type stop = s.find(delim, start);

      if (stop == std::string::npos)
        stop = s.size();

      if (stop > start)
        pieces.emplace_back(s, start, stop - start);

      start = stop + 1;
    }

    return pieces;
  };

  std::vector<std::string> lines;
  lines.push_back(header);

  for (std::string& row : split(in, '\n'))
    lines.push_back(std::move(row));

  // Widest value seen in each column, header included.
  std::vector<std::size_t> lens(header_parts, 0);
  std::vector<std::vector<std::string> > lineparts;
  lineparts.reserve(lines.size());

  for (std::size_t j = 0; j < lines.size(); ++j)
  {
    std::vector<std::string> parts = split(lines[j], ' ');

    if (parts.size() > header_parts)
      parts.resize(header_parts);

    assert(parts.size() == header_parts);

    for (std::size_t i = 0; i < parts.size(); ++i)
    {
      std::string& part = parts[i];

      // Data rows only (j == 0 is the header): shorten Elapsed, guarding
      // against values too short to trim.
      if (j != 0 && i == elapsedColumn && part.size() >= elapsedTrim)
        part.resize(part.size() - elapsedTrim);

      if (part.size() > lens[i])
        lens[i] = part.size();
    }

    lineparts.push_back(std::move(parts));
  }

  std::ostringstream oss;
  oss << "\n";

  for (const std::vector<std::string>& parts : lineparts)
  {
    for (std::size_t i = 0; i < parts.size(); ++i)
      oss << std::setw(static_cast<int>(lens[i])) << std::left << parts[i] << " ";

    oss << "\n";
  }

  return oss.str();
}
291
/**
 * Current local time in asctime(3) layout ("Www Mmm dd hh:mm:ss yyyy"),
 * without asctime's trailing newline. Used to timestamp query-stats output.
 *
 * Returns an empty string if converting or formatting the time fails, rather
 * than handing a null struct tm pointer to asctime.
 */
const std::string timeNow()
{
  const time_t outputTime = time(0);
  struct tm ltm;
  char buf[32];  // ctime(3) says at least 26

#ifdef _MSC_VER
  // MSVC has no localtime_r/asctime_r; its _s variants return 0 on success.
  if (localtime_s(&ltm, &outputTime) != 0 || asctime_s(buf, sizeof(buf), &ltm) != 0)
    return std::string();
#else
  if (localtime_r(&outputTime, &ltm) == 0 || asctime_r(&ltm, buf) == 0)
    return std::string();
#endif

  std::string result(buf);

  // asctime output ends in '\n'; strip it so callers can append text.
  if (!result.empty() && result[result.size() - 1] == '\n')
    result.erase(result.size() - 1);

  return result;
}
311
// Query-telemetry server parameters; every SessionThread constructs its
// fTeleClient (querytele::QueryTeleClient) from this object.
querytele::QueryTeleServerParms gTeleServerParms;
313
314 class SessionThread
315 {
316 public:
317
/**
 * @param ios socket for this front-end connection (copied into fIos)
 * @param ec  engine communication object, later handed to each job list via
 *            putEngineComm() and re-Setup() when PM connections are lost
 * @param rm  resource manager passed to JobListFactory and updated by setRMParms()
 */
SessionThread(const messageqcpp::IOSocket& ios, joblist::DistributedEngineComm* ec,
              joblist::ResourceManager* rm)
 : fIos(ios)
 , fEc(ec)
 , fRm(rm)
 , fStatsRetrieved(false)
 , fTeleClient(gTeleServerParms)
 , fOamCachePtr(oam::OamCache::makeOamCache())
{
}
326
private:

// Socket to the front end for this session.
messageqcpp::IOSocket fIos;
// NOTE(review): raw pointers copied from the constructor; ownership is not
// visible here.
joblist::DistributedEngineComm* fEc;
joblist::ResourceManager* fRm;
// Statistics for the query currently being processed.
querystats::QueryStats fStats;

// Variables used to store return stats
// True once formatQueryStats() has pulled fStats from the job list for the
// current query; cleared by initStats().
bool fStatsRetrieved;

// Posts query telemetry (e.g. QT_START for user SELECT/INSERT_SELECT plans).
querytele::QueryTeleClient fTeleClient;

oam::OamCache* fOamCachePtr; //this ptr is copyable...
340
341 //...Reinitialize stats for start of a new query
initStats(uint32_t sessionId,std::string & sqlText)342 void initStats ( uint32_t sessionId, std::string& sqlText )
343 {
344 initMaxMemPct ( sessionId );
345
346 fStats.reset();
347 fStats.setStartTime();
348 fStats.fSessionID = sessionId;
349 fStats.fQuery = sqlText;
350 fStatsRetrieved = false;
351 }
352
//...Get % memory usage during latest query for sessionId.
354 //...SessionId >= 0x80000000 is system catalog query we can ignore.
getMaxMemPct(uint32_t sessionId)355 static uint64_t getMaxMemPct ( uint32_t sessionId )
356 {
357 uint64_t maxMemoryPct = 0;
358
359 if ( sessionId < 0x80000000 )
360 {
361 std::lock_guard<std::mutex> lk(sessionMemMapMutex);
362 SessionMemMap_t::iterator mapIter =
363 sessionMemMap.find( sessionId );
364
365 if ( mapIter != sessionMemMap.end() )
366 {
367 maxMemoryPct = (uint64_t)mapIter->second;
368 }
369 }
370
371 return maxMemoryPct;
372 }
373
374 //...Delete sessionMemMap entry for the specified session's memory % use.
375 //...SessionId >= 0x80000000 is system catalog query we can ignore.
deleteMaxMemPct(uint32_t sessionId)376 static void deleteMaxMemPct ( uint32_t sessionId )
377 {
378 if ( sessionId < 0x80000000 )
379 {
380 std::lock_guard<std::mutex> lk(sessionMemMapMutex);
381 SessionMemMap_t::iterator mapIter =
382 sessionMemMap.find( sessionId );
383
384 if ( mapIter != sessionMemMap.end() )
385 {
386 sessionMemMap.erase( sessionId );
387 }
388 }
389 }
390
391 //...Get and log query stats to specified output stream
formatQueryStats(joblist::SJLP & jl,const std::string & label,bool includeNewLine,bool vtableModeOn,bool wantExtendedStats,uint64_t rowsReturned)392 const std::string formatQueryStats (
393 joblist::SJLP& jl, // joblist associated with query
394 const std::string& label, // header label to print in front of log output
395 bool includeNewLine,//include line breaks in query stats std::string
396 bool vtableModeOn,
397 bool wantExtendedStats,
398 uint64_t rowsReturned)
399 {
400 std::ostringstream os;
401
402 // Get stats if not already acquired for current query
403 if ( !fStatsRetrieved )
404 {
405 if (wantExtendedStats)
406 {
407 //wait for the ei data to be written by another thread (brain-dead)
408 struct timespec req = { 0, 250000 }; //250 usec
409 #ifdef _MSC_VER
410 Sleep(20); //20ms on Windows
411 #else
412 nanosleep(&req, 0);
413 #endif
414 }
415
416 // Get % memory usage during current query for sessionId
417 jl->querySummary( wantExtendedStats );
418 fStats = jl->queryStats();
419 fStats.fMaxMemPct = getMaxMemPct( fStats.fSessionID );
420 fStats.fRows = rowsReturned;
421 fStatsRetrieved = true;
422 }
423
424 std::string queryMode;
425 queryMode = (vtableModeOn ? "Distributed" : "Standard");
426
427 // Log stats to specified output stream
428 os << label <<
429 ": MaxMemPct-" << fStats.fMaxMemPct <<
430 "; NumTempFiles-" << fStats.fNumFiles <<
431 "; TempFileSpace-" << roundBytes(fStats.fFileBytes) <<
432 "; ApproxPhyI/O-" << fStats.fPhyIO <<
433 "; CacheI/O-" << fStats.fCacheIO <<
434 "; BlocksTouched-" << fStats.fMsgRcvCnt;
435
436 if ( includeNewLine )
437 os << std::endl << " "; // insert line break
438 else
439 os << "; "; // continue without line break
440
441 os << "PartitionBlocksEliminated-" << fStats.fCPBlocksSkipped <<
442 "; MsgBytesIn-" << roundBytes(fStats.fMsgBytesIn) <<
443 "; MsgBytesOut-" << roundBytes(fStats.fMsgBytesOut) <<
444 "; Mode-" << queryMode;
445
446 return os.str();
447 }
448
449 //...Increment the number of threads using the specified sessionId
incThreadCntPerSession(uint32_t sessionId)450 static void incThreadCntPerSession(uint32_t sessionId)
451 {
452 std::lock_guard<std::mutex> lk(threadCntPerSessionMapMutex);
453 ThreadCntPerSessionMap_t::iterator mapIter =
454 threadCntPerSessionMap.find(sessionId);
455
456 if (mapIter == threadCntPerSessionMap.end())
457 threadCntPerSessionMap.insert(ThreadCntPerSessionMap_t::value_type(sessionId, 1));
458 else
459 mapIter->second++;
460 }
461
462 //...Decrement the number of threads using the specified sessionId.
463 //...When the thread count for a sessionId reaches 0, the corresponding
464 //...CalpontSystemCatalog objects are deleted.
465 //...The user query and its associated catalog query have a different
466 //...session Id where the highest bit is flipped.
467 //...The object with id(sessionId | 0x80000000) cannot be removed before
468 //...user query session completes because the sysdata may be used for
469 //...debugging/stats purpose, such as result graph, etc.
decThreadCntPerSession(uint32_t sessionId)470 static void decThreadCntPerSession(uint32_t sessionId)
471 {
472 std::lock_guard<std::mutex> lk(threadCntPerSessionMapMutex);
473 ThreadCntPerSessionMap_t::iterator mapIter =
474 threadCntPerSessionMap.find(sessionId);
475
476 if (mapIter != threadCntPerSessionMap.end())
477 {
478 if (--mapIter->second == 0)
479 {
480 threadCntPerSessionMap.erase(mapIter);
481 execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
482 execplan::CalpontSystemCatalog::removeCalpontSystemCatalog((sessionId ^ 0x80000000));
483 }
484 }
485 }
486
487 //...Init sessionMemMap entry for specified session to 0 memory %.
488 //...SessionId >= 0x80000000 is system catalog query we can ignore.
initMaxMemPct(uint32_t sessionId)489 static void initMaxMemPct ( uint32_t sessionId )
490 {
491 if ( sessionId < 0x80000000 )
492 {
493 // std::cout << "Setting pct to 0 for session " << sessionId << std::endl;
494 std::lock_guard<std::mutex> lk(sessionMemMapMutex);
495 SessionMemMap_t::iterator mapIter = sessionMemMap.find( sessionId );
496
497 if ( mapIter == sessionMemMap.end() )
498 {
499 sessionMemMap[sessionId] = 0;
500 }
501 else
502 {
503 mapIter->second = 0;
504 }
505 }
506 }
507
508 //... Round off to human readable format (KB, MB, or GB).
roundBytes(uint64_t value) const509 const std::string roundBytes(uint64_t value) const
510 {
511 const char* units[] = {"B", "KB", "MB", "GB", "TB"};
512 uint64_t i = 0, up = 0;
513 uint64_t roundedValue = value;
514
515 while (roundedValue > 1024 && i < 4)
516 {
517 up = (roundedValue & 512);
518 roundedValue /= 1024;
519 i++;
520 }
521
522 if (up)
523 roundedValue++;
524
525 std::ostringstream oss;
526 oss << roundedValue << units[i];
527 return oss.str();
528 }
529
530 //...Round off to nearest (1024*1024) MB
roundMB(uint64_t value) const531 uint64_t roundMB ( uint64_t value ) const
532 {
533 uint64_t roundedValue = value >> 20;
534
535 if (value & 524288)
536 roundedValue++;
537
538 return roundedValue;
539 }
540
setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec & parms)541 void setRMParms ( const execplan::CalpontSelectExecutionPlan::RMParmVec& parms )
542 {
543 for (execplan::CalpontSelectExecutionPlan::RMParmVec::const_iterator it = parms.begin();
544 it != parms.end(); ++it)
545 {
546 switch (it->id)
547 {
548 case execplan::PMSMALLSIDEMEMORY:
549 {
550 fRm->addHJPmMaxSmallSideMap(it->sessionId, it->value);
551 break;
552 }
553
554 case execplan::UMSMALLSIDEMEMORY:
555 {
556 fRm->addHJUmMaxSmallSideMap(it->sessionId, it->value);
557 break;
558 }
559
560 default:
561 ;
562 }
563 }
564 }
565
buildSysCache(const execplan::CalpontSelectExecutionPlan & csep,boost::shared_ptr<execplan::CalpontSystemCatalog> csc)566 void buildSysCache(const execplan::CalpontSelectExecutionPlan& csep, boost::shared_ptr<execplan::CalpontSystemCatalog> csc)
567 {
568 const execplan::CalpontSelectExecutionPlan::ColumnMap& colMap = csep.columnMap();
569 execplan::CalpontSelectExecutionPlan::ColumnMap::const_iterator it;
570 std::string schemaName;
571
572 for (it = colMap.begin(); it != colMap.end(); ++it)
573 {
574 const auto sc = dynamic_cast<execplan::SimpleColumn*>((it->second).get());
575
576 if (sc)
577 {
578 schemaName = sc->schemaName();
579
580 // only the first time a schema is got will actually query
581 // system catalog. System catalog keeps a schema name std::map.
582 // if a schema exists, the call getSchemaInfo returns without
583 // doing anything.
584 if (!schemaName.empty())
585 csc->getSchemaInfo(schemaName);
586 }
587 }
588
589 execplan::CalpontSelectExecutionPlan::SelectList::const_iterator subIt;
590
591 for (subIt = csep.derivedTableList().begin(); subIt != csep.derivedTableList().end(); ++ subIt)
592 {
593 buildSysCache(*(dynamic_cast<execplan::CalpontSelectExecutionPlan*>(subIt->get())), csc);
594 }
595 }
596
597 public:
598
operator ()()599 void operator()()
600 {
601 messageqcpp::ByteStream bs, inbs;
602 execplan::CalpontSelectExecutionPlan csep;
603 csep.sessionID(0);
604 joblist::SJLP jl;
605 bool incSessionThreadCnt = true;
606 std::mutex jlMutex;
607 std::condition_variable jlCleanupDone;
608 int destructing = 0;
609
610 bool selfJoin = false;
611 bool tryTuples = false;
612 bool usingTuples = false;
613 bool stmtCounted = false;
614
615 try
616 {
617 for (;;)
618 {
619 selfJoin = false;
620 tryTuples = false;
621 usingTuples = false;
622
623 if (jl)
624 {
625 // puts the real destruction in another thread to avoid
626 // making the whole session wait. It can take several seconds.
627 std::unique_lock<std::mutex> scoped(jlMutex);
628 destructing++;
629 std::thread bgdtor([jl, &jlMutex, &jlCleanupDone, &destructing] {
630 std::unique_lock<std::mutex> scoped(jlMutex);
631 const_cast<joblist::SJLP &>(jl).reset(); // this happens second; does real destruction
632 if (--destructing == 0)
633 jlCleanupDone.notify_one();
634 });
635 jl.reset(); // this runs first
636 bgdtor.detach();
637 }
638
639 bs = fIos.read();
640
641 if (bs.length() == 0)
642 {
643 if (gDebug > 1 || (gDebug && !csep.isInternal()))
644 std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl;
645
646 // connection closed by client
647 fIos.close();
648 break;
649 }
650 else if (bs.length() < 4) //Not a CalpontSelectExecutionPlan
651 {
652 if (gDebug)
653 std::cout << "### Got a not-a-plan for session id " << csep.sessionID()
654 << " with length " << bs.length() << std::endl;
655
656 fIos.close();
657 break;
658 }
659 else if (bs.length() == 4) //possible tuple flag
660 {
661 messageqcpp::ByteStream::quadbyte qb;
662 bs >> qb;
663
664 if (qb == 4) //UM wants new tuple i/f
665 {
666 if (gDebug)
667 std::cout << "### UM wants tuples" << std::endl;
668
669 tryTuples = true;
670 // now wait for the CSEP...
671 bs = fIos.read();
672 }
673 else if (qb == 5) //somebody wants stats
674 {
675 bs.restart();
676 qb = statementsRunningCount->cur();
677 bs << qb;
678 qb = statementsRunningCount->waiting();
679 bs << qb;
680 fIos.write(bs);
681 fIos.close();
682 break;
683 }
684 else
685 {
686 if (gDebug)
687 std::cout << "### Got a not-a-plan value " << qb << std::endl;
688
689 fIos.close();
690 break;
691 }
692 }
693
694 new_plan:
695 csep.unserialize(bs);
696
697 querytele::QueryTeleStats qts;
698
699 if ( !csep.isInternal() &&
700 (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT") )
701 {
702 qts.query_uuid = csep.uuid();
703 qts.msg_type = querytele::QueryTeleStats::QT_START;
704 qts.start_time = querytele::QueryTeleClient::timeNowms();
705 qts.query = csep.data();
706 qts.session_id = csep.sessionID();
707 qts.query_type = csep.queryType();
708 qts.system_name = fOamCachePtr->getSystemName();
709 qts.module_name = fOamCachePtr->getModuleName();
710 qts.local_query = csep.localQuery();
711 qts.schema_name = csep.schemaName();
712 fTeleClient.postQueryTele(qts);
713 }
714
715 if (gDebug > 1 || (gDebug && !csep.isInternal()))
716 std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl;
717
718 setRMParms(csep.rmParms());
719 // Re-establish lost PP connections.
720 if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers()))
721 {
722 fEc->Setup();
723 }
724 // @bug 1021. try to get schema cache for a come in query.
725 // skip system catalog queries.
726 if (!csep.isInternal())
727 {
728 boost::shared_ptr<execplan::CalpontSystemCatalog> csc =
729 execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID());
730 buildSysCache(csep, csc);
731 }
732
733 // As soon as we have a session id for this thread, update the
734 // thread count per session; only do this once per thread.
735 // Mask 0x80000000 is for associate user query and csc query
736 if (incSessionThreadCnt)
737 {
738 incThreadCntPerSession( csep.sessionID() | 0x80000000 );
739 incSessionThreadCnt = false;
740 }
741
742 bool needDbProfEndStatementMsg = false;
743 logging::Message::Args args;
744 std::string sqlText = csep.data();
745 logging::LoggingID li(16, csep.sessionID(), csep.txnID());
746
747 // Initialize stats for this query, including
748 // init sessionMemMap entry for this session to 0 memory %.
749 // We will need this later for traceOn() or if we receive a
750 // table request with qb=3 (see below). This is also recorded
751 // as query start time.
752 initStats( csep.sessionID(), sqlText );
753 fStats.fQueryType = csep.queryType();
754
755 // Log start and end statement if tracing is enabled. Keep in
756 // mind the trace flag won't be set for system catalog queries.
757 if (csep.traceOn())
758 {
759 args.reset();
760 args.add((int)csep.statementID());
761 args.add((int)csep.verID().currentScn);
762 args.add(sqlText);
763 msgLog.logMessage(logging::LOG_TYPE_DEBUG,
764 logDbProfStartStatement,
765 args,
766 li);
767 needDbProfEndStatementMsg = true;
768 }
769
770 //Don't log subsequent self joins after first.
771 if (selfJoin)
772 sqlText = "";
773
774 std::ostringstream oss;
775 oss << sqlText << "; |" << csep.schemaName() << "|";
776 logging::SQLLogger sqlLog(oss.str(), li);
777
778 statementsRunningCount->incr(stmtCounted);
779
780 if (tryTuples)
781 {
782 try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList
783 {
784 std::string emsg("NOERROR");
785 messageqcpp::ByteStream emsgBs;
786 messageqcpp::ByteStream::quadbyte tflg = 0;
787 jl = joblist::JobListFactory::makeJobList(&csep, fRm, true, true);
788 // assign query stats
789 jl->queryStats(fStats);
790
791 messageqcpp::ByteStream tbs;
792
793 if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0))
794 {
795 usingTuples = true;
796
797 //Tell the FE that we're sending tuples back, not TableBands
798 tbs << tflg;
799 fIos.write(tbs);
800 emsgBs.reset();
801 emsgBs << emsg;
802 fIos.write(emsgBs);
803 auto tjlp = dynamic_cast<joblist::TupleJobList*>(jl.get());
804 assert(tjlp);
805 tbs.restart();
806 tbs << tjlp->getOutputRowGroup();
807 fIos.write(tbs);
808 }
809 else
810 {
811 statementsRunningCount->decr(stmtCounted);
812 tflg = jl->status();
813 emsg = jl->errMsg();
814 tbs << tflg;
815 fIos.write(tbs);
816 emsgBs.reset();
817 emsgBs << emsg;
818 fIos.write(emsgBs);
819 std::cerr << "ExeMgr: could not build a tuple joblist: " << emsg << std::endl;
820 continue;
821 }
822 }
823 catch (std::exception& ex)
824 {
825 std::ostringstream errMsg;
826 errMsg << "ExeMgr: error writing makeJoblist "
827 "response; " << ex.what();
828 throw std::runtime_error( errMsg.str() );
829 }
830 catch (...)
831 {
832 std::ostringstream errMsg;
833 errMsg << "ExeMgr: unknown error writing makeJoblist "
834 "response; ";
835 throw std::runtime_error( errMsg.str() );
836 }
837
838 if (!usingTuples)
839 {
840 if (gDebug)
841 std::cout << "### UM wanted tuples but it didn't work out :-(" << std::endl;
842 }
843 else
844 {
845 if (gDebug)
846 std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl;
847 }
848 }
849 else
850 {
851 usingTuples = false;
852 jl = joblist::JobListFactory::makeJobList(&csep, fRm, false, true);
853
854 if (jl->status() == 0)
855 {
856 std::string emsg;
857
858 if (jl->putEngineComm(fEc) != 0)
859 throw std::runtime_error(jl->errMsg());
860 }
861 else
862 {
863 throw std::runtime_error("ExeMgr: could not build a JobList!");
864 }
865 }
866
867
868 jl->doQuery();
869
870 execplan::CalpontSystemCatalog::OID tableOID;
871 bool swallowRows = false;
872 joblist::DeliveredTableMap tm;
873 uint64_t totalBytesSent = 0;
874 uint64_t totalRowCount = 0;
875
876 // Project each table as the FE asks for it
877 for (;;)
878 {
879 bs = fIos.read();
880
881 if (bs.length() == 0)
882 {
883 if (gDebug > 1 || (gDebug && !csep.isInternal()))
884 std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl;
885
886 break;
887 }
888
889 if (gDebug && bs.length() > 4)
890 std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " <<
891 bs.length() << std::endl;
892
893 //TODO: Holy crud! Can this be right?
894 //@bug 1444 Yes, if there is a self-join
895 if (bs.length() > 4)
896 {
897 selfJoin = true;
898 statementsRunningCount->decr(stmtCounted);
899 goto new_plan;
900 }
901
902 assert(bs.length() == 4);
903
904 messageqcpp::ByteStream::quadbyte qb;
905
906 try // @bug2244: try/catch around fIos.write() calls responding to qb command
907 {
908 bs >> qb;
909
910 if (gDebug > 1 || (gDebug && !csep.isInternal()))
911 std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb << std::endl;
912
913 if (qb == 0)
914 {
915 // No more tables, query is done
916 break;
917 }
918 else if (qb == 1)
919 {
920 // super-secret flag indicating that the UM is going to scarf down all the rows in the
921 // query.
922 swallowRows = true;
923 tm = jl->deliveredTables();
924 continue;
925 }
926 else if (qb == 2)
927 {
928 // UM just wants any table
929 assert(swallowRows);
930 auto iter = tm.begin();
931
932 if (iter == tm.end())
933 {
934 if (gDebug > 1 || (gDebug && !csep.isInternal()))
935 std::cout << "### For session id " << csep.sessionID() << ", returning end flag" << std::endl;
936
937 bs.restart();
938 bs << (messageqcpp::ByteStream::byte)1;
939 fIos.write(bs);
940 continue;
941 }
942
943 tableOID = iter->first;
944 }
945 else if (qb == 3) //special option-UM wants job stats std::string
946 {
947 std::string statsString;
948
949 //Log stats std::string to be sent back to front end
950 statsString = formatQueryStats(
951 jl,
952 "Query Stats",
953 false,
954 !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF),
955 (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG),
956 totalRowCount
957 );
958
959 bs.restart();
960 bs << statsString;
961
962 if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0)
963 {
964 bs << jl->extendedInfo();
965 bs << prettyPrintMiniInfo(jl->miniInfo());
966 }
967 else
968 {
969 std::string empty;
970 bs << empty;
971 bs << empty;
972 }
973
974 // send stats to connector for inserting to the querystats table
975 fStats.serialize(bs);
976 fIos.write(bs);
977 continue;
978 }
979 // for table mode handling
980 else if (qb == 4)
981 {
982 statementsRunningCount->decr(stmtCounted);
983 bs = fIos.read();
984 goto new_plan;
985 }
986 else // (qb > 3)
987 {
988 //Return table bands for the requested tableOID
989 tableOID = static_cast<execplan::CalpontSystemCatalog::OID>(qb);
990 }
991 }
992 catch (std::exception& ex)
993 {
994 std::ostringstream errMsg;
995 errMsg << "ExeMgr: error writing qb response "
996 "for qb cmd " << qb <<
997 "; " << ex.what();
998 throw std::runtime_error( errMsg.str() );
999 }
1000 catch (...)
1001 {
1002 std::ostringstream errMsg;
1003 errMsg << "ExeMgr: unknown error writing qb response "
1004 "for qb cmd " << qb;
1005 throw std::runtime_error( errMsg.str() );
1006 }
1007
1008 if (swallowRows) tm.erase(tableOID);
1009
1010 FEMsgHandler msgHandler(jl, &fIos);
1011
1012 if (tableOID == 100)
1013 msgHandler.start();
1014
1015 //...Loop serializing table bands projected for the tableOID
1016 for (;;)
1017 {
1018 uint32_t rowCount;
1019
1020 rowCount = jl->projectTable(tableOID, bs);
1021
1022 msgHandler.stop();
1023
1024 if (jl->status())
1025 {
1026 const auto errInfo = logging::IDBErrorInfo::instance();
1027
1028 if (jl->errMsg().length() != 0)
1029 bs << jl->errMsg();
1030 else
1031 bs << errInfo->errorMsg(jl->status());
1032 }
1033
1034 try // @bug2244: try/catch around fIos.write() calls projecting rows
1035 {
1036 if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3)
1037 {
1038 // Skip the write to the front end until the last empty band. Used to time queries
1039 // through without any front end waiting.
1040 if (tableOID < 3000 || rowCount == 0)
1041 fIos.write(bs);
1042 }
1043 else
1044 {
1045 fIos.write(bs);
1046 }
1047 }
1048 catch (std::exception& ex)
1049 {
1050 msgHandler.stop();
1051 std::ostringstream errMsg;
1052 errMsg << "ExeMgr: error projecting rows "
1053 "for tableOID: " << tableOID <<
1054 "; rowCnt: " << rowCount <<
1055 "; prevTotRowCnt: " << totalRowCount <<
1056 "; " << ex.what();
1057 jl->abort();
1058
1059 while (rowCount)
1060 rowCount = jl->projectTable(tableOID, bs);
1061
1062 if (tableOID == 100 && msgHandler.aborted())
1063 {
1064 /* TODO: modularize the cleanup code, as well as
1065 * the rest of this fcn */
1066
1067 decThreadCntPerSession( csep.sessionID() | 0x80000000 );
1068 statementsRunningCount->decr(stmtCounted);
1069 fIos.close();
1070 return;
1071 }
1072
1073 //std::cout << "connection drop\n";
1074 throw std::runtime_error( errMsg.str() );
1075 }
1076 catch (...)
1077 {
1078 std::ostringstream errMsg;
1079 msgHandler.stop();
1080 errMsg << "ExeMgr: unknown error projecting rows "
1081 "for tableOID: " <<
1082 tableOID << "; rowCnt: " << rowCount <<
1083 "; prevTotRowCnt: " << totalRowCount;
1084 jl->abort();
1085
1086 while (rowCount)
1087 rowCount = jl->projectTable(tableOID, bs);
1088
1089 throw std::runtime_error( errMsg.str() );
1090 }
1091
1092 totalRowCount += rowCount;
1093 totalBytesSent += bs.length();
1094
1095 if (rowCount == 0)
1096 {
1097 msgHandler.stop();
1098 // No more bands, table is done
1099 bs.reset();
1100
1101 // @bug 2083 decr active statement count here for table mode.
1102 if (!usingTuples)
1103 statementsRunningCount->decr(stmtCounted);
1104
1105 break;
1106 }
1107 else
1108 {
1109 bs.restart();
1110 }
1111 } // End of loop to project and serialize table bands for a table
1112 } // End of loop to process tables
1113
1114 // @bug 828
1115 if (csep.traceOn())
1116 jl->graph(csep.sessionID());
1117
1118 if (needDbProfEndStatementMsg)
1119 {
1120 std::string ss;
1121 std::ostringstream prefix;
1122 prefix << "ses:" << csep.sessionID() << " Query Totals";
1123
1124 //Log stats std::string to standard out
1125 ss = formatQueryStats(
1126 jl,
1127 prefix.str(),
1128 true,
1129 !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF),
1130 (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG),
1131 totalRowCount);
1132 //@Bug 1306. Added timing info for real time tracking.
1133 std::cout << ss << " at " << timeNow() << std::endl;
1134
1135 // log query stats to debug log file
1136 args.reset();
1137 args.add((int)csep.statementID());
1138 args.add(fStats.fMaxMemPct);
1139 args.add(fStats.fNumFiles);
1140 args.add(fStats.fFileBytes); // log raw byte count instead of MB
1141 args.add(fStats.fPhyIO);
1142 args.add(fStats.fCacheIO);
1143 args.add(fStats.fMsgRcvCnt);
1144 args.add(fStats.fMsgBytesIn);
1145 args.add(fStats.fMsgBytesOut);
1146 args.add(fStats.fCPBlocksSkipped);
1147 msgLog.logMessage(logging::LOG_TYPE_DEBUG,
1148 logDbProfQueryStats,
1149 args,
1150 li);
1151 //@bug 1327
1152 deleteMaxMemPct( csep.sessionID() );
1153 // Calling reset here, will cause joblist destructor to be
1154 // called, which "joins" the threads. We need to do that
1155 // here to make sure all syslogging from all the threads
1156 // are complete; and that our logDbProfEndStatement will
1157 // appear "last" in the syslog for this SQL statement.
1158 // puts the real destruction in another thread to avoid
1159 // making the whole session wait. It can take several seconds.
1160 int stmtID = csep.statementID();
1161 std::unique_lock<std::mutex> scoped(jlMutex);
1162 // C7's compiler complains about the msgLog capture here
1163 // msgLog is global scope, and passed by copy, so, unclear
1164 // what the warning is about.
1165 destructing++;
1166 std::thread bgdtor([jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing] {
1167 std::unique_lock<std::mutex> scoped(jlMutex);
1168 const_cast<joblist::SJLP &>(jl).reset(); // this happens second; does real destruction
1169 logging::Message::Args args;
1170 args.add(stmtID);
1171 msgLog.logMessage(logging::LOG_TYPE_DEBUG,
1172 logDbProfEndStatement,
1173 args,
1174 li);
1175 if (--destructing == 0)
1176 jlCleanupDone.notify_one();
1177 });
1178 jl.reset(); // this happens first
1179 bgdtor.detach();
1180 }
1181 else
1182 // delete sessionMemMap entry for this session's memory % use
1183 deleteMaxMemPct( csep.sessionID() );
1184
1185 std::string endtime(timeNow());
1186
1187 if ((csep.traceFlags() & flagsWantOutput) && (csep.sessionID() < 0x80000000))
1188 {
1189 std::cout << "For session " << csep.sessionID() << ": " <<
1190 totalBytesSent <<
1191 " bytes sent back at " << endtime << std::endl;
1192
1193 // @bug 663 - Implemented caltraceon(16) to replace the
1194 // $FIFO_SINK compiler definition in pColStep.
1195 // This option consumes rows in the project steps.
1196 if (csep.traceFlags() &
1197 execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4)
1198 {
1199 std::cout << std::endl;
1200 std::cout << "**** No data returned to DM. Rows consumed "
1201 "in ProjectSteps - caltrace(16) is on (FIFO_SINK)."
1202 " ****" << std::endl;
1203 std::cout << std::endl;
1204 }
1205 else if (csep.traceFlags() &
1206 execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3)
1207 {
1208 std::cout << std::endl;
1209 std::cout << "**** No data returned to DM - caltrace(8) is "
1210 "on (SWALLOW_ROWS_EXEMGR). ****" << std::endl;
1211 std::cout << std::endl;
1212 }
1213 }
1214
1215 statementsRunningCount->decr(stmtCounted);
1216
1217 if ( !csep.isInternal() &&
1218 (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT") )
1219 {
1220 qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY;
1221 qts.max_mem_pct = fStats.fMaxMemPct;
1222 qts.num_files = fStats.fNumFiles;
1223 qts.phy_io = fStats.fPhyIO;
1224 qts.cache_io = fStats.fCacheIO;
1225 qts.msg_rcv_cnt = fStats.fMsgRcvCnt;
1226 qts.cp_blocks_skipped = fStats.fCPBlocksSkipped;
1227 qts.msg_bytes_in = fStats.fMsgBytesIn;
1228 qts.msg_bytes_out = fStats.fMsgBytesOut;
1229 qts.rows = totalRowCount;
1230 qts.end_time = querytele::QueryTeleClient::timeNowms();
1231 qts.session_id = csep.sessionID();
1232 qts.query_type = csep.queryType();
1233 qts.query = csep.data();
1234 qts.system_name = fOamCachePtr->getSystemName();
1235 qts.module_name = fOamCachePtr->getModuleName();
1236 qts.local_query = csep.localQuery();
1237 fTeleClient.postQueryTele(qts);
1238 }
1239 }
1240
1241 // Release CSC object (for sessionID) that was added by makeJobList()
1242 // Mask 0x80000000 is for associate user query and csc query.
1243 // (actual joblist destruction happens at the top of this loop)
1244 decThreadCntPerSession( csep.sessionID() | 0x80000000 );
1245 }
1246 catch (std::exception& ex)
1247 {
1248 decThreadCntPerSession( csep.sessionID() | 0x80000000 );
1249 statementsRunningCount->decr(stmtCounted);
1250 std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl;
1251 logging::Message::Args args;
1252 logging::LoggingID li(16, csep.sessionID(), csep.txnID());
1253 args.add(ex.what());
1254 msgLog.logMessage(logging::LOG_TYPE_CRITICAL, logExeMgrExcpt, args, li);
1255 fIos.close();
1256 }
1257 catch (...)
1258 {
1259 decThreadCntPerSession( csep.sessionID() | 0x80000000 );
1260 statementsRunningCount->decr(stmtCounted);
1261 std::cerr << "### Exception caught!" << std::endl;
1262 logging::Message::Args args;
1263 logging::LoggingID li(16, csep.sessionID(), csep.txnID());
1264 args.add("ExeMgr caught unknown exception");
1265 msgLog.logMessage(logging::LOG_TYPE_CRITICAL, logExeMgrExcpt, args, li);
1266 fIos.close();
1267 }
1268
1269 // make sure we don't leave scope while joblists are being destroyed
1270 std::unique_lock<std::mutex> scoped(jlMutex);
1271 while (destructing > 0)
1272 jlCleanupDone.wait(scoped);
1273 }
1274 };
1275
1276
1277 class RssMonFcn : public utils::MonitorProcMem
1278 {
1279 public:
RssMonFcn(size_t maxPct,int pauseSeconds)1280 RssMonFcn(size_t maxPct, int pauseSeconds) :
1281 MonitorProcMem(maxPct, 0, 21, pauseSeconds) {}
1282
1283 /*virtual*/
operator ()() const1284 void operator()() const
1285 {
1286 for (;;)
1287 {
1288 size_t rssMb = rss();
1289 size_t pct = rssMb * 100 / fMemTotal;
1290
1291 if (pct > fMaxPct)
1292 {
1293 if (fMaxPct >= 95)
1294 {
1295 std::cerr << "Too much memory allocated!" << std::endl;
1296 logging::Message::Args args;
1297 args.add((int)pct);
1298 args.add((int)fMaxPct);
1299 msgLog.logMessage(logging::LOG_TYPE_CRITICAL, logRssTooBig, args, logging::LoggingID(16));
1300 exit(1);
1301 }
1302
1303 if (statementsRunningCount->cur() == 0)
1304 {
1305 std::cerr << "Too much memory allocated!" << std::endl;
1306 logging::Message::Args args;
1307 args.add((int)pct);
1308 args.add((int)fMaxPct);
1309 msgLog.logMessage(logging::LOG_TYPE_WARNING, logRssTooBig, args, logging::LoggingID(16));
1310 exit(1);
1311 }
1312
1313 std::cerr << "Too much memory allocated, but stmts running" << std::endl;
1314 }
1315
1316 // Update sessionMemMap entries lower than current mem % use
1317 {
1318 std::lock_guard<std::mutex> lk(sessionMemMapMutex);
1319
1320 for (SessionMemMap_t::iterator mapIter = sessionMemMap.begin();
1321 mapIter != sessionMemMap.end();
1322 ++mapIter)
1323 {
1324 if (pct > mapIter->second)
1325 {
1326 mapIter->second = pct;
1327 }
1328 }
1329
1330 }
1331
1332 pause_();
1333 }
1334 }
1335 };
1336
#ifdef _MSC_VER
// Windows-only handler installed for SIGINT and SIGTERM: the signal number is
// ignored and the process exits with status 0.
void exit_(int /*signum*/)
{
    exit(0);
}
#endif
1343
added_a_pm(int)1344 void added_a_pm(int)
1345 {
1346 logging::LoggingID logid(21, 0, 0);
1347 logging::Message::Args args1;
1348 logging::Message msg(1);
1349 args1.add("exeMgr caught SIGHUP. Resetting connections");
1350 msg.format( args1 );
1351 std::cout << msg.msg().c_str() << std::endl;
1352 logging::Logger logger(logid.fSubsysID);
1353 logger.logMessage(logging::LOG_TYPE_DEBUG, msg, logid);
1354
1355 if (ec)
1356 {
1357 oam::OamCache* oamCache = oam::OamCache::makeOamCache();
1358 oamCache->forceReload();
1359 ec->Setup();
1360 }
1361 }
1362
printTotalUmMemory(int sig)1363 void printTotalUmMemory(int sig)
1364 {
1365 int64_t num = rm->availableMemory();
1366 std::cout << "Total UM memory available: " << num << std::endl;
1367 }
1368
setupSignalHandlers()1369 void setupSignalHandlers()
1370 {
1371 #ifdef _MSC_VER
1372 signal(SIGINT, exit_);
1373 signal(SIGTERM, exit_);
1374 #else
1375 struct sigaction ign;
1376
1377 memset(&ign, 0, sizeof(ign));
1378 ign.sa_handler = SIG_IGN;
1379
1380 sigaction(SIGPIPE, &ign, 0);
1381
1382 memset(&ign, 0, sizeof(ign));
1383 ign.sa_handler = added_a_pm;
1384 sigaction(SIGHUP, &ign, 0);
1385 ign.sa_handler = printTotalUmMemory;
1386 sigaction(SIGUSR1, &ign, 0);
1387 memset(&ign, 0, sizeof(ign));
1388 ign.sa_handler = fatalHandler;
1389 sigaction(SIGSEGV, &ign, 0);
1390 sigaction(SIGABRT, &ign, 0);
1391 sigaction(SIGFPE, &ign, 0);
1392 #endif
1393 }
1394
setupCwd(joblist::ResourceManager * rm)1395 int8_t setupCwd(joblist::ResourceManager* rm)
1396 {
1397 std::string workdir = rm->getScWorkingDir();
1398 int8_t rc = chdir(workdir.c_str());
1399
1400 if (rc < 0 || access(".", W_OK) != 0)
1401 rc = chdir("/tmp");
1402
1403 return (rc < 0) ? -5 : rc;
1404 }
1405
startRssMon(size_t maxPct,int pauseSeconds)1406 void startRssMon(size_t maxPct, int pauseSeconds)
1407 {
1408 new boost::thread(RssMonFcn(maxPct, pauseSeconds));
1409 }
1410
setupResources()1411 int setupResources()
1412 {
1413 #ifdef _MSC_VER
1414 //FIXME:
1415 #else
1416 struct rlimit rlim;
1417
1418 if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
1419 {
1420 return -1;
1421 }
1422
1423 rlim.rlim_cur = rlim.rlim_max = 65536;
1424
1425 if (setrlimit(RLIMIT_NOFILE, &rlim) != 0)
1426 {
1427 return -2;
1428 }
1429
1430 if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
1431 {
1432 return -3;
1433 }
1434
1435 if (rlim.rlim_cur != 65536)
1436 {
1437 return -4;
1438 }
1439
1440 #endif
1441 return 0;
1442 }
1443
1444 }
1445
cleanTempDir()1446 void cleanTempDir()
1447 {
1448 using TempDirPurpose = config::Config::TempDirPurpose;
1449 struct Dirs
1450 {
1451 std::string section;
1452 std::string allowed;
1453 TempDirPurpose purpose;
1454 };
1455 std::vector<Dirs> dirs{
1456 {
1457 "HashJoin",
1458 "AllowDiskBasedJoin",
1459 TempDirPurpose::Joins
1460 },
1461 {
1462 "RowAggregation",
1463 "AllowDiskBasedAggregation",
1464 TempDirPurpose::Aggregates
1465 }
1466 };
1467 const auto config = config::Config::makeConfig();
1468
1469 for (const auto& dir : dirs)
1470 {
1471 std::string allowStr = config->getConfig(dir.section, dir.allowed);
1472 bool allow = (allowStr == "Y" || allowStr == "y");
1473
1474 std::string tmpPrefix = config->getTempFileDir(dir.purpose);
1475
1476 if (allow && tmpPrefix.empty())
1477 {
1478 std::cerr << "Empty tmp directory name for " << dir.section << std::endl;
1479 logging::LoggingID logid(16, 0, 0);
1480 logging::Message::Args args;
1481 logging::Message message(8);
1482 args.add("Empty tmp directory name for:");
1483 args.add(dir.section);
1484 message.format(args);
1485 logging::Logger logger(logid.fSubsysID);
1486 logger.logMessage(logging::LOG_TYPE_CRITICAL, message, logid);
1487 }
1488
1489 tmpPrefix += "/";
1490
1491 idbassert(tmpPrefix != "/");
1492
1493 /* This is quite scary as ExeMgr usually runs as root */
1494 try
1495 {
1496 if (allow)
1497 {
1498 boost::filesystem::remove_all(tmpPrefix);
1499 }
1500 boost::filesystem::create_directories(tmpPrefix);
1501 }
1502 catch (const std::exception &ex)
1503 {
1504 std::cerr << ex.what() << std::endl;
1505 logging::LoggingID logid(16, 0, 0);
1506 logging::Message::Args args;
1507 logging::Message message(8);
1508 args.add("Exception whilst cleaning tmpdir: ");
1509 args.add(ex.what());
1510 message.format(args);
1511 logging::Logger logger(logid.fSubsysID);
1512 logger.logMessage(logging::LOG_TYPE_WARNING, message, logid);
1513 }
1514 catch (...)
1515 {
1516 std::cerr << "Caught unknown exception during tmpdir cleanup"
1517 << std::endl;
1518 logging::LoggingID logid(16, 0, 0);
1519 logging::Message::Args args;
1520 logging::Message message(8);
1521 args.add("Unknown exception whilst cleaning tmpdir");
1522 message.format(args);
1523 logging::Logger logger(logid.fSubsysID);
1524 logger.logMessage(logging::LOG_TYPE_WARNING, message, logid);
1525 }
1526 }
1527 }
1528
1529
Child()1530 int ServiceExeMgr::Child()
1531 {
1532
1533 gDebug= m_debug;
1534
1535 #ifdef _MSC_VER
1536 //FIXME:
1537 #else
1538
1539 // Make sure CSC thinks it's on a UM or else bucket reuse stuff below will stall
1540 if (!m_e)
1541 setenv("CALPONT_CSC_IDENT", "um", 1);
1542
1543 #endif
1544 setupSignalHandlers();
1545 int err = 0;
1546 if (!m_debug)
1547 err = setupResources();
1548 std::string errMsg;
1549
1550 switch (err)
1551 {
1552 case -1:
1553 case -3:
1554 errMsg = "Error getting file limits, please see non-root install documentation";
1555 break;
1556
1557 case -2:
1558 errMsg = "Error setting file limits, please see non-root install documentation";
1559 break;
1560
1561 case -4:
1562 errMsg = "Could not install file limits to required value, please see non-root install documentation";
1563 break;
1564
1565 default:
1566 errMsg = "Couldn't change working directory or unknown error";
1567 break;
1568 }
1569
1570 err = setupCwd(rm);
1571
1572 if (err < 0)
1573 {
1574 oam::Oam oam;
1575 logging::Message::Args args;
1576 logging::Message message;
1577 args.add( errMsg );
1578 message.format(args);
1579 logging::LoggingID lid(16);
1580 logging::MessageLog ml(lid);
1581 ml.logCriticalMessage( message );
1582 std::cerr << errMsg << std::endl;
1583
1584 NotifyServiceInitializationFailed();
1585 return 2;
1586 }
1587
1588 cleanTempDir();
1589
1590 logging::MsgMap msgMap;
1591 msgMap[logDefaultMsg] = logging::Message(logDefaultMsg);
1592 msgMap[logDbProfStartStatement] = logging::Message(logDbProfStartStatement);
1593 msgMap[logDbProfEndStatement] = logging::Message(logDbProfEndStatement);
1594 msgMap[logStartSql] = logging::Message(logStartSql);
1595 msgMap[logEndSql] = logging::Message(logEndSql);
1596 msgMap[logRssTooBig] = logging::Message(logRssTooBig);
1597 msgMap[logDbProfQueryStats] = logging::Message(logDbProfQueryStats);
1598 msgMap[logExeMgrExcpt] = logging::Message(logExeMgrExcpt);
1599 msgLog.msgMap(msgMap);
1600
1601 ec = joblist::DistributedEngineComm::instance(rm, true);
1602 ec->Open();
1603
1604 bool tellUser = true;
1605
1606 messageqcpp::MessageQueueServer* mqs;
1607
1608 statementsRunningCount = new ActiveStatementCounter(rm->getEmExecQueueSize());
1609
1610 for (;;)
1611 {
1612 try
1613 {
1614 mqs = new messageqcpp::MessageQueueServer(ExeMgr, rm->getConfig(), messageqcpp::ByteStream::BlockSize, 64);
1615 break;
1616 }
1617 catch (std::runtime_error& re)
1618 {
1619 std::string what = re.what();
1620
1621 if (what.find("Address already in use") != std::string::npos)
1622 {
1623 if (tellUser)
1624 {
1625 std::cerr << "Address already in use, retrying..." << std::endl;
1626 tellUser = false;
1627 }
1628
1629 sleep(5);
1630 }
1631 else
1632 {
1633 throw;
1634 }
1635 }
1636 }
1637
1638 // class jobstepThreadPool is used by other processes. We can't call
1639 // resourcemanaager (rm) functions during the static creation of threadpool
1640 // because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
1641 // From the pools perspective, it has no idea if it is ExeMgr doing the
1642 // creation, so it has no idea which way to set the flag. So we set the max here.
1643 joblist::JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
1644 joblist::JobStep::jobstepThreadPool.setName("ExeMgrJobList");
1645
1646 if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
1647 {
1648 joblist::JobStep::jobstepThreadPool.setDebug(true);
1649 joblist::JobStep::jobstepThreadPool.invoke(threadpool::ThreadPoolMonitor(&joblist::JobStep::jobstepThreadPool));
1650 }
1651
1652 int serverThreads = rm->getEmServerThreads();
1653 int maxPct = rm->getEmMaxPct();
1654 int pauseSeconds = rm->getEmSecondsBetweenMemChecks();
1655 int priority = rm->getEmPriority();
1656
1657 FEMsgHandler::threadPool.setMaxThreads(serverThreads);
1658 FEMsgHandler::threadPool.setName("FEMsgHandler");
1659
1660 if (maxPct > 0)
1661 startRssMon(maxPct, pauseSeconds);
1662
1663 #ifndef _MSC_VER
1664 setpriority(PRIO_PROCESS, 0, priority);
1665 #endif
1666
1667 std::string teleServerHost(rm->getConfig()->getConfig("QueryTele", "Host"));
1668
1669 if (!teleServerHost.empty())
1670 {
1671 int teleServerPort = toInt(rm->getConfig()->getConfig("QueryTele", "Port"));
1672
1673 if (teleServerPort > 0)
1674 {
1675 gTeleServerParms.host = teleServerHost;
1676 gTeleServerParms.port = teleServerPort;
1677 }
1678 }
1679
1680 NotifyServiceStarted();
1681
1682 std::cout << "Starting ExeMgr: st = " << serverThreads <<
1683 ", qs = " << rm->getEmExecQueueSize() << ", mx = " << maxPct << ", cf = " <<
1684 rm->getConfig()->configFile() << std::endl;
1685
1686 {
1687 BRM::DBRM *dbrm = new BRM::DBRM();
1688 dbrm->setSystemQueryReady(true);
1689 delete dbrm;
1690 }
1691
1692 threadpool::ThreadPool exeMgrThreadPool(serverThreads, 0);
1693 exeMgrThreadPool.setName("ExeMgrServer");
1694
1695 if (rm->getExeMgrThreadPoolDebug() == "Y" || rm->getExeMgrThreadPoolDebug() == "y")
1696 {
1697 exeMgrThreadPool.setDebug(true);
1698 exeMgrThreadPool.invoke(threadpool::ThreadPoolMonitor(&exeMgrThreadPool));
1699 }
1700
1701 for (;;)
1702 {
1703 messageqcpp::IOSocket ios;
1704 ios = mqs->accept();
1705 exeMgrThreadPool.invoke(SessionThread(ios, ec, rm));
1706 }
1707
1708 exeMgrThreadPool.wait();
1709
1710 return 0;
1711 }
1712
1713
main(int argc,char * argv[])1714 int main(int argc, char* argv[])
1715 {
1716 opterr = 0;
1717 Opt opt(argc, argv);
1718
1719 // Set locale language
1720 setlocale(LC_ALL, "");
1721 setlocale(LC_NUMERIC, "C");
1722
1723 // This is unset due to the way we start it
1724 program_invocation_short_name = const_cast<char*>("ExeMgr");
1725
1726 // Initialize the charset library
1727 my_init();
1728
1729 return ServiceExeMgr(opt).Run();
1730 }
1731
1732 // vim:ts=4 sw=4:
1733