1 /* Copyright (C) 2014 InfiniDB, Inc.
2    Copyright (C) 2019 MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License
6    as published by the Free Software Foundation; version 2 of
7    the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17    MA 02110-1301, USA. */
18 
19 /**********************************************************************
20 *   $Id: main.cpp 1000 2013-07-24 21:05:51Z pleblanc $
21 *
22 *
23 ***********************************************************************/
24 /**
25  * @brief execution plan manager main program
26  *
 * This is the "main" program for dealing with execution plans and
28  * result sets. It sits in a loop waiting for CalpontExecutionPlan
29  * from the FEP. It then passes the CalpontExecutionPlan to the
30  * JobListFactory from which a JobList is obtained. This is passed
 * to the Query Manager running in the real-time portion of the
32  * EC. The ExecutionPlanManager waits until the Query Manager
33  * returns a result set for the job list. These results are passed
34  * into the CalpontResultFactory, which outputs a CalpontResultSet.
35  * The ExecutionPlanManager passes the CalpontResultSet into the
36  * VendorResultFactory which produces a result set tailored to the
37  * specific DBMS front end in use. The ExecutionPlanManager then
38  * sends the VendorResultSet back to the Calpont Database Connector
39  * on the Front-End Processor where it is returned to the DBMS
40  * front-end.
41  */
42 #include <iostream>
43 #include <cstdint>
44 #include <csignal>
45 #include <sys/resource.h>
46 
47 #undef root_name
48 #include <boost/filesystem.hpp>
49 
50 #include "calpontselectexecutionplan.h"
51 #include "activestatementcounter.h"
52 #include "distributedenginecomm.h"
53 #include "resourcemanager.h"
54 #include "configcpp.h"
55 #include "queryteleserverparms.h"
56 #include "iosocket.h"
57 #include "joblist.h"
58 #include "joblistfactory.h"
59 #include "oamcache.h"
60 #include "simplecolumn.h"
61 #include "bytestream.h"
62 #include "telestats.h"
63 #include "messageobj.h"
64 #include "messagelog.h"
65 #include "sqllogger.h"
66 #include "femsghandler.h"
67 #include "idberrorinfo.h"
68 #include "MonitorProcMem.h"
69 #include "liboamcpp.h"
70 #include "crashtrace.h"
71 #include "utils_utf8.h"
72 #include "service.h"
73 
74 #include <mutex>
75 #include <thread>
76 #include <condition_variable>
77 
78 #include "dbrm.h"
79 
80 #include "mariadb_my_sys.h"
81 
82 
/**
 * Command-line options for ExeMgr, parsed with getopt(3) from the option
 * string "edf". Unrecognised options (getopt's '?') are silently ignored.
 */
class Opt
{
public:
    int m_debug = 0;    // number of -d occurrences (each one raises verbosity)
    bool m_e = false;   // set by -e
    bool m_fg = false;  // set by -f; ServiceExeMgr::Run() then calls Child() directly
    Opt(int argc, char *argv[])
    {
        for (int opt = getopt(argc, argv, "edf"); opt != EOF; opt = getopt(argc, argv, "edf"))
        {
            if (opt == 'd')
                ++m_debug;
            else if (opt == 'e')
                m_e = true;
            else if (opt == 'f')
                m_fg = true;
            // any other result, including '?', is ignored
        }
    }
};
118 
119 
120 class ServiceExeMgr: public Service, public Opt
121 {
122 protected:
123 
log(logging::LOG_TYPE type,const std::string & str)124     void log(logging::LOG_TYPE type, const std::string &str)
125     {
126         logging::LoggingID logid(16);
127         logging::Message::Args args;
128         logging::Message message(8);
129         args.add(strerror(errno));
130         message.format(args);
131         logging::Logger logger(logid.fSubsysID);
132         logger.logMessage(type, message, logid);
133     }
134 
135 public:
ServiceExeMgr(const Opt & opt)136     ServiceExeMgr(const Opt &opt)
137      :Service("ExeMgr"), Opt(opt)
138     { }
LogErrno()139     void LogErrno() override
140     {
141         log(logging::LOG_TYPE_CRITICAL, std::string(strerror(errno)));
142     }
ParentLogChildMessage(const std::string & str)143     void ParentLogChildMessage(const std::string &str) override
144     {
145         log(logging::LOG_TYPE_INFO, str);
146     }
147     int Child() override;
Run()148     int Run()
149     {
150         return m_fg ? Child() : RunForking();
151     }
152 };
153 
154 
155 namespace
156 {
157 
// If any flags other than the table-mode flags (TRACE_TUPLE_AUTOSWITCH and
// TRACE_TUPLE_OFF) are set, produce output to the screen. The mask is every
// bit except those two.
const uint32_t flagsWantOutput = (0xffffffff &
                                  ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_AUTOSWITCH &
                                  ~execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF);
162 
// Debug verbosity. SessionThread writes its "### ..." trace lines to
// std::cout when this is non-zero (some of them only when it is > 1).
// NOTE(review): presumably set from Opt::m_debug at startup — not visible here.
int gDebug;

// Message ids passed to msgLog.logMessage(); e.g. logDbProfStartStatement is
// logged when a query has tracing enabled. Most uses are outside this view.
const unsigned logDefaultMsg           = logging::M0000;
const unsigned logDbProfStartStatement = logging::M0028;
const unsigned logDbProfEndStatement   = logging::M0029;
const unsigned logStartSql             = logging::M0041;
const unsigned logEndSql               = logging::M0042;
const unsigned logRssTooBig            = logging::M0044;
const unsigned logDbProfQueryStats     = logging::M0047;
const unsigned logExeMgrExcpt          = logging::M0055;

// Logger for subsystem id 16, the same id used by the LoggingIDs in this file.
logging::Logger msgLog(16);

// Peak memory usage (percent) per session id, reset at the start of each
// query. initMaxMemPct() only creates entries for ids below 0x80000000.
// Every access must hold sessionMemMapMutex.
typedef std::map<uint32_t, size_t> SessionMemMap_t;
SessionMemMap_t sessionMemMap; // track memory% usage during a query
std::mutex sessionMemMapMutex;

//...The FrontEnd may establish more than 1 connection (which results in
//   more than 1 ExeMgr thread) per session.  These threads will share
//   the same CalpontSystemCatalog object for that session.  Here, we
//   define a std::map to track how many threads are sharing each session, so
//   that we know when we can safely delete a CalpontSystemCatalog object
//   shared by multiple threads per session.
typedef std::map<uint32_t, uint32_t> ThreadCntPerSessionMap_t;
ThreadCntPerSessionMap_t        threadCntPerSessionMap;
// Guards threadCntPerSessionMap.
std::mutex threadCntPerSessionMapMutex;

//This var is only accessed using thread-safe inc/dec calls
// NOTE(review): raw pointer; it is assigned (and owned) outside this view.
ActiveStatementCounter* statementsRunningCount;

// Process-wide engine-communication object.
// NOTE(review): assigned outside this view; presumably the instance handed to
// each SessionThread — confirm.
joblist::DistributedEngineComm* ec;

// ResourceManager singleton, fetched during static initialisation of this TU.
// NOTE(review): the meaning of the `true` argument is not visible here.
auto rm = joblist::ResourceManager::instance(true);
196 
toInt(const std::string & val)197 int toInt(const std::string& val)
198 {
199     if (val.length() == 0) return -1;
200 
201     return static_cast<int>(config::Config::fromText(val));
202 }
203 
// Name string "ExeMgr1"; its uses are outside this view.
// NOTE(review): presumably the module/config identifier of this ExeMgr — confirm.
const std::string ExeMgr("ExeMgr1");
205 
prettyPrintMiniInfo(const std::string & in)206 const std::string prettyPrintMiniInfo(const std::string& in)
207 {
208     //1. take the std::string and tok it by '\n'
209     //2. for each part in each line calc the longest part
210     //3. padding to each longest value, output a header and the lines
211     typedef boost::tokenizer<boost::char_separator<char> > my_tokenizer;
212     boost::char_separator<char> sep1("\n");
213     my_tokenizer tok1(in, sep1);
214     std::vector<std::string> lines;
215     std::string header = "Desc Mode Table TableOID ReferencedColumns PIO LIO PBE Elapsed Rows";
216     const int header_parts = 10;
217     lines.push_back(header);
218 
219     for (my_tokenizer::const_iterator iter1 = tok1.begin(); iter1 != tok1.end(); ++iter1)
220     {
221         if (!iter1->empty())
222             lines.push_back(*iter1);
223     }
224 
225     std::vector<unsigned> lens;
226 
227     for (int i = 0; i < header_parts; i++)
228         lens.push_back(0);
229 
230     std::vector<std::vector<std::string> > lineparts;
231     std::vector<std::string>::iterator iter2;
232     int j;
233 
234     for (iter2 = lines.begin(), j = 0; iter2 != lines.end(); ++iter2, j++)
235     {
236         boost::char_separator<char> sep2(" ");
237         my_tokenizer tok2(*iter2, sep2);
238         int i;
239         std::vector<std::string> parts;
240         my_tokenizer::iterator iter3;
241 
242         for (iter3 = tok2.begin(), i = 0; iter3 != tok2.end(); ++iter3, i++)
243         {
244             if (i >= header_parts) break;
245 
246             std::string part(*iter3);
247 
248             if (j != 0 && i == 8)
249                 part.resize(part.size() - 3);
250 
251             assert(i < header_parts);
252 
253             if (part.size() > lens[i])
254                 lens[i] = part.size();
255 
256             parts.push_back(part);
257         }
258 
259         assert(i == header_parts);
260         lineparts.push_back(parts);
261     }
262 
263     std::ostringstream oss;
264 
265     std::vector<std::vector<std::string> >::iterator iter1 = lineparts.begin();
266     std::vector<std::vector<std::string> >::iterator end1 = lineparts.end();
267 
268     oss << "\n";
269 
270     while (iter1 != end1)
271     {
272         std::vector<std::string>::iterator iter2 = iter1->begin();
273         std::vector<std::string>::iterator end2 = iter1->end();
274         assert(distance(iter2, end2) == header_parts);
275         int i = 0;
276 
277         while (iter2 != end2)
278         {
279             assert(i < header_parts);
280             oss << std::setw(lens[i]) << std::left << *iter2 << " ";
281             ++iter2;
282             i++;
283         }
284 
285         oss << "\n";
286         ++iter1;
287     }
288 
289     return oss.str();
290 }
291 
timeNow()292 const std::string timeNow()
293 {
294     time_t outputTime = time(0);
295     struct tm ltm;
296     char buf[32];	//ctime(3) says at least 26
297     size_t len = 0;
298 #ifdef _MSC_VER
299     asctime_s(buf, 32, localtime_r(&outputTime, &ltm));
300 #else
301     asctime_r(localtime_r(&outputTime, &ltm), buf);
302 #endif
303     len = strlen(buf);
304 
305     if (len > 0) --len;
306 
307     if (buf[len] == '\n') buf[len] = 0;
308 
309     return buf;
310 }
311 
// Query-telemetry server parameters shared by all sessions; every
// SessionThread builds its QueryTeleClient from this object.
// NOTE(review): where it is populated is outside this view.
querytele::QueryTeleServerParms gTeleServerParms;
313 
314 class SessionThread
315 {
316 public:
317 
SessionThread(const messageqcpp::IOSocket & ios,joblist::DistributedEngineComm * ec,joblist::ResourceManager * rm)318     SessionThread(const messageqcpp::IOSocket& ios, joblist::DistributedEngineComm* ec, joblist::ResourceManager* rm) :
319         fIos(ios), fEc(ec),
320         fRm(rm),
321         fStatsRetrieved(false),
322         fTeleClient(gTeleServerParms),
323         fOamCachePtr(oam::OamCache::makeOamCache())
324     {
325     }
326 
327 private:
328 
    // Connection to the front end: read for requests, written for replies.
    messageqcpp::IOSocket fIos;
    // Engine communication handed to each joblist via putEngineComm().
    joblist::DistributedEngineComm* fEc;
    // Resource manager passed to JobListFactory and updated by setRMParms().
    joblist::ResourceManager*  fRm;
    // Stats of the current query: reset by initStats(), filled by formatQueryStats().
    querystats::QueryStats fStats;

    // Variables used to store return stats
    // true once formatQueryStats() has pulled stats from the joblist for this query.
    bool       fStatsRetrieved;

    // Posts query telemetry (e.g. the QT_START record for SELECT queries).
    querytele::QueryTeleClient fTeleClient;

    // Supplies the system and module names recorded in telemetry.
    oam::OamCache* fOamCachePtr;	//this ptr is copyable...
340 
341     //...Reinitialize stats for start of a new query
initStats(uint32_t sessionId,std::string & sqlText)342     void initStats ( uint32_t sessionId, std::string& sqlText )
343     {
344         initMaxMemPct ( sessionId );
345 
346         fStats.reset();
347         fStats.setStartTime();
348         fStats.fSessionID = sessionId;
349         fStats.fQuery = sqlText;
350         fStatsRetrieved  = false;
351     }
352 
353     //...Get % memory usage during latest query for sesssionId.
354     //...SessionId >= 0x80000000 is system catalog query we can ignore.
getMaxMemPct(uint32_t sessionId)355     static uint64_t getMaxMemPct ( uint32_t sessionId )
356     {
357         uint64_t maxMemoryPct = 0;
358 
359         if ( sessionId < 0x80000000 )
360         {
361             std::lock_guard<std::mutex> lk(sessionMemMapMutex);
362             SessionMemMap_t::iterator mapIter =
363                 sessionMemMap.find( sessionId );
364 
365             if ( mapIter != sessionMemMap.end() )
366             {
367                 maxMemoryPct = (uint64_t)mapIter->second;
368             }
369         }
370 
371         return maxMemoryPct;
372     }
373 
374     //...Delete sessionMemMap entry for the specified session's memory % use.
375     //...SessionId >= 0x80000000 is system catalog query we can ignore.
deleteMaxMemPct(uint32_t sessionId)376     static void deleteMaxMemPct ( uint32_t sessionId )
377     {
378         if ( sessionId < 0x80000000 )
379         {
380             std::lock_guard<std::mutex> lk(sessionMemMapMutex);
381             SessionMemMap_t::iterator mapIter =
382                 sessionMemMap.find( sessionId );
383 
384             if ( mapIter != sessionMemMap.end() )
385             {
386                 sessionMemMap.erase( sessionId );
387             }
388         }
389     }
390 
391     //...Get and log query stats to specified output stream
formatQueryStats(joblist::SJLP & jl,const std::string & label,bool includeNewLine,bool vtableModeOn,bool wantExtendedStats,uint64_t rowsReturned)392     const std::string formatQueryStats (
393         joblist::SJLP&    jl,			// joblist associated with query
394         const    std::string& label, // header label to print in front of log output
395         bool     includeNewLine,//include line breaks in query stats std::string
396         bool     vtableModeOn,
397         bool     wantExtendedStats,
398         uint64_t rowsReturned)
399     {
400         std::ostringstream os;
401 
402         // Get stats if not already acquired for current query
403         if ( !fStatsRetrieved )
404         {
405             if (wantExtendedStats)
406             {
407                 //wait for the ei data to be written by another thread (brain-dead)
408                 struct timespec req = { 0, 250000 }; //250 usec
409 #ifdef _MSC_VER
410                 Sleep(20); //20ms on Windows
411 #else
412                 nanosleep(&req, 0);
413 #endif
414             }
415 
416             // Get % memory usage during current query for sessionId
417             jl->querySummary( wantExtendedStats );
418             fStats = jl->queryStats();
419             fStats.fMaxMemPct = getMaxMemPct( fStats.fSessionID );
420             fStats.fRows = rowsReturned;
421             fStatsRetrieved = true;
422         }
423 
424         std::string queryMode;
425         queryMode = (vtableModeOn ? "Distributed" : "Standard");
426 
427         // Log stats to specified output stream
428         os << label <<
429            ": MaxMemPct-"     << fStats.fMaxMemPct <<
430            "; NumTempFiles-"  << fStats.fNumFiles  <<
431            "; TempFileSpace-" << roundBytes(fStats.fFileBytes) <<
432            "; ApproxPhyI/O-"  << fStats.fPhyIO     <<
433            "; CacheI/O-"      << fStats.fCacheIO   <<
434            "; BlocksTouched-" << fStats.fMsgRcvCnt;
435 
436         if ( includeNewLine )
437             os << std::endl << "      ";	// insert line break
438         else
439             os << "; ";				// continue without line break
440 
441         os << "PartitionBlocksEliminated-" << fStats.fCPBlocksSkipped <<
442            "; MsgBytesIn-"    << roundBytes(fStats.fMsgBytesIn) <<
443            "; MsgBytesOut-"   << roundBytes(fStats.fMsgBytesOut) <<
444            "; Mode-"          << queryMode;
445 
446         return os.str();
447     }
448 
449     //...Increment the number of threads using the specified sessionId
incThreadCntPerSession(uint32_t sessionId)450     static void incThreadCntPerSession(uint32_t sessionId)
451     {
452         std::lock_guard<std::mutex> lk(threadCntPerSessionMapMutex);
453         ThreadCntPerSessionMap_t::iterator mapIter =
454             threadCntPerSessionMap.find(sessionId);
455 
456         if (mapIter == threadCntPerSessionMap.end())
457             threadCntPerSessionMap.insert(ThreadCntPerSessionMap_t::value_type(sessionId, 1));
458         else
459             mapIter->second++;
460     }
461 
462     //...Decrement the number of threads using the specified sessionId.
463     //...When the thread count for a sessionId reaches 0, the corresponding
464     //...CalpontSystemCatalog objects are deleted.
465     //...The user query and its associated catalog query have a different
466     //...session Id where the highest bit is flipped.
467     //...The object with id(sessionId | 0x80000000) cannot be removed before
468     //...user query session completes because the sysdata may be used for
469     //...debugging/stats purpose, such as result graph, etc.
decThreadCntPerSession(uint32_t sessionId)470     static void decThreadCntPerSession(uint32_t sessionId)
471     {
472         std::lock_guard<std::mutex> lk(threadCntPerSessionMapMutex);
473         ThreadCntPerSessionMap_t::iterator mapIter =
474             threadCntPerSessionMap.find(sessionId);
475 
476         if (mapIter != threadCntPerSessionMap.end())
477         {
478             if (--mapIter->second == 0)
479             {
480                 threadCntPerSessionMap.erase(mapIter);
481                 execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
482                 execplan::CalpontSystemCatalog::removeCalpontSystemCatalog((sessionId ^ 0x80000000));
483             }
484         }
485     }
486 
487     //...Init sessionMemMap entry for specified session to 0 memory %.
488     //...SessionId >= 0x80000000 is system catalog query we can ignore.
initMaxMemPct(uint32_t sessionId)489     static void initMaxMemPct ( uint32_t sessionId )
490     {
491         if ( sessionId < 0x80000000 )
492         {
493             // std::cout << "Setting pct to 0 for session " << sessionId << std::endl;
494             std::lock_guard<std::mutex> lk(sessionMemMapMutex);
495             SessionMemMap_t::iterator mapIter = sessionMemMap.find( sessionId );
496 
497             if ( mapIter == sessionMemMap.end() )
498             {
499                 sessionMemMap[sessionId] = 0;
500             }
501             else
502             {
503                 mapIter->second = 0;
504             }
505         }
506     }
507 
508     //... Round off to human readable format (KB, MB, or GB).
roundBytes(uint64_t value) const509     const std::string roundBytes(uint64_t value) const
510     {
511         const char* units[] = {"B", "KB", "MB", "GB", "TB"};
512         uint64_t i = 0, up = 0;
513         uint64_t roundedValue = value;
514 
515         while (roundedValue > 1024 && i < 4)
516         {
517             up = (roundedValue & 512);
518             roundedValue /= 1024;
519             i++;
520         }
521 
522         if (up)
523             roundedValue++;
524 
525         std::ostringstream oss;
526         oss << roundedValue << units[i];
527         return oss.str();
528     }
529 
530     //...Round off to nearest (1024*1024) MB
roundMB(uint64_t value) const531     uint64_t roundMB ( uint64_t value ) const
532     {
533         uint64_t roundedValue = value >> 20;
534 
535         if (value & 524288)
536             roundedValue++;
537 
538         return roundedValue;
539     }
540 
setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec & parms)541     void setRMParms ( const execplan::CalpontSelectExecutionPlan::RMParmVec& parms )
542     {
543         for (execplan::CalpontSelectExecutionPlan::RMParmVec::const_iterator it = parms.begin();
544                 it != parms.end(); ++it)
545         {
546             switch (it->id)
547             {
548                 case execplan::PMSMALLSIDEMEMORY:
549                 {
550                     fRm->addHJPmMaxSmallSideMap(it->sessionId, it->value);
551                     break;
552                 }
553 
554                 case execplan::UMSMALLSIDEMEMORY:
555                 {
556                     fRm->addHJUmMaxSmallSideMap(it->sessionId, it->value);
557                     break;
558                 }
559 
560                 default:
561                     ;
562             }
563         }
564     }
565 
buildSysCache(const execplan::CalpontSelectExecutionPlan & csep,boost::shared_ptr<execplan::CalpontSystemCatalog> csc)566     void buildSysCache(const execplan::CalpontSelectExecutionPlan& csep, boost::shared_ptr<execplan::CalpontSystemCatalog> csc)
567     {
568         const execplan::CalpontSelectExecutionPlan::ColumnMap& colMap = csep.columnMap();
569         execplan::CalpontSelectExecutionPlan::ColumnMap::const_iterator it;
570         std::string schemaName;
571 
572         for (it = colMap.begin(); it != colMap.end(); ++it)
573         {
574             const auto sc = dynamic_cast<execplan::SimpleColumn*>((it->second).get());
575 
576             if (sc)
577             {
578                 schemaName = sc->schemaName();
579 
580                 // only the first time a schema is got will actually query
581                 // system catalog. System catalog keeps a schema name std::map.
582                 // if a schema exists, the call getSchemaInfo returns without
583                 // doing anything.
584                 if (!schemaName.empty())
585                     csc->getSchemaInfo(schemaName);
586             }
587         }
588 
589         execplan::CalpontSelectExecutionPlan::SelectList::const_iterator subIt;
590 
591         for (subIt = csep.derivedTableList().begin(); subIt != csep.derivedTableList().end(); ++ subIt)
592         {
593             buildSysCache(*(dynamic_cast<execplan::CalpontSelectExecutionPlan*>(subIt->get())), csc);
594         }
595     }
596 
597 public:
598 
operator ()()599     void operator()()
600     {
601         messageqcpp::ByteStream bs, inbs;
602         execplan::CalpontSelectExecutionPlan csep;
603         csep.sessionID(0);
604         joblist::SJLP jl;
605         bool incSessionThreadCnt = true;
606         std::mutex jlMutex;
607         std::condition_variable jlCleanupDone;
608         int destructing = 0;
609 
610         bool selfJoin = false;
611         bool tryTuples = false;
612         bool usingTuples = false;
613         bool stmtCounted = false;
614 
615         try
616         {
617             for (;;)
618             {
619                 selfJoin = false;
620                 tryTuples = false;
621                 usingTuples = false;
622 
623                 if (jl)
624                 {
625                     // puts the real destruction in another thread to avoid
626                     // making the whole session wait.  It can take several seconds.
627                     std::unique_lock<std::mutex> scoped(jlMutex);
628                     destructing++;
629                     std::thread bgdtor([jl, &jlMutex, &jlCleanupDone, &destructing] {
630                         std::unique_lock<std::mutex> scoped(jlMutex);
631                         const_cast<joblist::SJLP &>(jl).reset();    // this happens second; does real destruction
632                         if (--destructing == 0)
633                             jlCleanupDone.notify_one();
634                     });
635                     jl.reset();     // this runs first
636                     bgdtor.detach();
637                 }
638 
639                 bs = fIos.read();
640 
641                 if (bs.length() == 0)
642                 {
643                     if (gDebug > 1 || (gDebug && !csep.isInternal()))
644                         std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl;
645 
646                     // connection closed by client
647                     fIos.close();
648                     break;
649                 }
650                 else if (bs.length() < 4) //Not a CalpontSelectExecutionPlan
651                 {
652                     if (gDebug)
653                         std::cout << "### Got a not-a-plan for session id " << csep.sessionID()
654                              << " with length " << bs.length() <<  std::endl;
655 
656                     fIos.close();
657                     break;
658                 }
659                 else if (bs.length() == 4) //possible tuple flag
660                 {
661                     messageqcpp::ByteStream::quadbyte qb;
662                     bs >> qb;
663 
664                     if (qb == 4) //UM wants new tuple i/f
665                     {
666                         if (gDebug)
667                             std::cout << "### UM wants tuples" << std::endl;
668 
669                         tryTuples = true;
670                         // now wait for the CSEP...
671                         bs = fIos.read();
672                     }
673                     else if (qb == 5) //somebody wants stats
674                     {
675                         bs.restart();
676                         qb = statementsRunningCount->cur();
677                         bs << qb;
678                         qb = statementsRunningCount->waiting();
679                         bs << qb;
680                         fIos.write(bs);
681                         fIos.close();
682                         break;
683                     }
684                     else
685                     {
686                         if (gDebug)
687                             std::cout << "### Got a not-a-plan value " << qb << std::endl;
688 
689                         fIos.close();
690                         break;
691                     }
692                 }
693 
694 new_plan:
695                 csep.unserialize(bs);
696 
697                 querytele::QueryTeleStats qts;
698 
699                 if ( !csep.isInternal() &&
700                         (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT") )
701                 {
702                     qts.query_uuid = csep.uuid();
703                     qts.msg_type = querytele::QueryTeleStats::QT_START;
704                     qts.start_time = querytele::QueryTeleClient::timeNowms();
705                     qts.query = csep.data();
706                     qts.session_id = csep.sessionID();
707                     qts.query_type = csep.queryType();
708                     qts.system_name = fOamCachePtr->getSystemName();
709                     qts.module_name = fOamCachePtr->getModuleName();
710                     qts.local_query = csep.localQuery();
711                     qts.schema_name = csep.schemaName();
712                     fTeleClient.postQueryTele(qts);
713                 }
714 
715                 if (gDebug > 1 || (gDebug && !csep.isInternal()))
716                     std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl;
717 
718                 setRMParms(csep.rmParms());
719                 // Re-establish lost PP connections.
720                 if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers()))
721                 {
722                     fEc->Setup();
723                 }
724                 // @bug 1021. try to get schema cache for a come in query.
725                 // skip system catalog queries.
726                 if (!csep.isInternal())
727                 {
728                     boost::shared_ptr<execplan::CalpontSystemCatalog> csc =
729                       execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID());
730                     buildSysCache(csep, csc);
731                 }
732 
733                 // As soon as we have a session id for this thread, update the
734                 // thread count per session; only do this once per thread.
735                 // Mask 0x80000000 is for associate user query and csc query
736                 if (incSessionThreadCnt)
737                 {
738                     incThreadCntPerSession( csep.sessionID() | 0x80000000 );
739                     incSessionThreadCnt = false;
740                 }
741 
742                 bool needDbProfEndStatementMsg = false;
743                 logging::Message::Args args;
744                 std::string sqlText = csep.data();
745                 logging::LoggingID li(16, csep.sessionID(), csep.txnID());
746 
747                 // Initialize stats for this query, including
748                 // init sessionMemMap entry for this session to 0 memory %.
749                 // We will need this later for traceOn() or if we receive a
750                 // table request with qb=3 (see below). This is also recorded
751                 // as query start time.
752                 initStats( csep.sessionID(), sqlText );
753                 fStats.fQueryType = csep.queryType();
754 
755                 // Log start and end statement if tracing is enabled.  Keep in
756                 // mind the trace flag won't be set for system catalog queries.
757                 if (csep.traceOn())
758                 {
759                     args.reset();
760                     args.add((int)csep.statementID());
761                     args.add((int)csep.verID().currentScn);
762                     args.add(sqlText);
763                     msgLog.logMessage(logging::LOG_TYPE_DEBUG,
764                                       logDbProfStartStatement,
765                                       args,
766                                       li);
767                     needDbProfEndStatementMsg = true;
768                 }
769 
770                 //Don't log subsequent self joins after first.
771                 if (selfJoin)
772                     sqlText = "";
773 
774                 std::ostringstream oss;
775                 oss << sqlText << "; |" << csep.schemaName() << "|";
776                 logging::SQLLogger sqlLog(oss.str(), li);
777 
778                 statementsRunningCount->incr(stmtCounted);
779 
780                 if (tryTuples)
781                 {
782                     try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList
783                     {
784                         std::string emsg("NOERROR");
785                         messageqcpp::ByteStream emsgBs;
786                         messageqcpp::ByteStream::quadbyte tflg = 0;
787                         jl = joblist::JobListFactory::makeJobList(&csep, fRm, true, true);
788                         // assign query stats
789                         jl->queryStats(fStats);
790 
791                         messageqcpp::ByteStream tbs;
792 
793                         if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0))
794                         {
795                             usingTuples = true;
796 
797                             //Tell the FE that we're sending tuples back, not TableBands
798                             tbs << tflg;
799                             fIos.write(tbs);
800                             emsgBs.reset();
801                             emsgBs << emsg;
802                             fIos.write(emsgBs);
803                             auto tjlp = dynamic_cast<joblist::TupleJobList*>(jl.get());
804                             assert(tjlp);
805                             tbs.restart();
806                             tbs << tjlp->getOutputRowGroup();
807                             fIos.write(tbs);
808                         }
809                         else
810                         {
811                             statementsRunningCount->decr(stmtCounted);
812                             tflg = jl->status();
813                             emsg = jl->errMsg();
814                             tbs << tflg;
815                             fIos.write(tbs);
816                             emsgBs.reset();
817                             emsgBs << emsg;
818                             fIos.write(emsgBs);
819                             std::cerr << "ExeMgr: could not build a tuple joblist: " << emsg << std::endl;
820                             continue;
821                         }
822                     }
823                     catch (std::exception& ex)
824                     {
825                         std::ostringstream errMsg;
826                         errMsg << "ExeMgr: error writing makeJoblist "
827                                "response; " <<  ex.what();
828                         throw std::runtime_error( errMsg.str() );
829                     }
830                     catch (...)
831                     {
832                         std::ostringstream errMsg;
833                         errMsg << "ExeMgr: unknown error writing makeJoblist "
834                                "response; ";
835                         throw std::runtime_error( errMsg.str() );
836                     }
837 
838                     if (!usingTuples)
839                     {
840                         if (gDebug)
841                             std::cout << "### UM wanted tuples but it didn't work out :-(" << std::endl;
842                     }
843                     else
844                     {
845                         if (gDebug)
846                             std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl;
847                     }
848                 }
849                 else
850                 {
851                     usingTuples = false;
852                     jl = joblist::JobListFactory::makeJobList(&csep, fRm, false, true);
853 
854                     if (jl->status() == 0)
855                     {
856                         std::string emsg;
857 
858                         if (jl->putEngineComm(fEc) != 0)
859                             throw std::runtime_error(jl->errMsg());
860                     }
861                     else
862                     {
863                         throw std::runtime_error("ExeMgr: could not build a JobList!");
864                     }
865                 }
866 
867 
868                 jl->doQuery();
869 
870                 execplan::CalpontSystemCatalog::OID tableOID;
871                 bool swallowRows = false;
872                 joblist::DeliveredTableMap tm;
873                 uint64_t totalBytesSent = 0;
874                 uint64_t totalRowCount = 0;
875 
876                 // Project each table as the FE asks for it
877                 for (;;)
878                 {
879                     bs = fIos.read();
880 
881                     if (bs.length() == 0)
882                     {
883                         if (gDebug > 1 || (gDebug && !csep.isInternal()))
884                             std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl;
885 
886                         break;
887                     }
888 
889                     if (gDebug && bs.length() > 4)
890                         std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " <<
891                              bs.length() << std::endl;
892 
893                     //TODO: Holy crud! Can this be right?
894                     //@bug 1444 Yes, if there is a self-join
895                     if (bs.length() > 4)
896                     {
897                         selfJoin = true;
898                         statementsRunningCount->decr(stmtCounted);
899                         goto new_plan;
900                     }
901 
902                     assert(bs.length() == 4);
903 
904                     messageqcpp::ByteStream::quadbyte qb;
905 
906                     try // @bug2244: try/catch around fIos.write() calls responding to qb command
907                     {
908                         bs >> qb;
909 
910                         if (gDebug > 1 || (gDebug && !csep.isInternal()))
911                             std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb << std::endl;
912 
913                         if (qb == 0)
914                         {
915                             // No more tables, query is done
916                             break;
917                         }
918                         else if (qb == 1)
919                         {
920                             // super-secret flag indicating that the UM is going to scarf down all the rows in the
921                             //   query.
922                             swallowRows = true;
923                             tm = jl->deliveredTables();
924                             continue;
925                         }
926                         else if (qb == 2)
927                         {
928                             // UM just wants any table
929                             assert(swallowRows);
930                             auto iter = tm.begin();
931 
932                             if (iter == tm.end())
933                             {
934                                 if (gDebug > 1 || (gDebug && !csep.isInternal()))
935                                     std::cout << "### For session id " << csep.sessionID() << ", returning end flag" << std::endl;
936 
937                                 bs.restart();
938                                 bs << (messageqcpp::ByteStream::byte)1;
939                                 fIos.write(bs);
940                                 continue;
941                             }
942 
943                             tableOID = iter->first;
944                         }
945                         else if (qb == 3) //special option-UM wants job stats std::string
946                         {
947                             std::string statsString;
948 
949                             //Log stats std::string to be sent back to front end
950                             statsString = formatQueryStats(
951                                               jl,
952                                               "Query Stats",
953                                               false,
954                                               !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF),
955                                               (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG),
956                                               totalRowCount
957                                           );
958 
959                             bs.restart();
960                             bs << statsString;
961 
962                             if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0)
963                             {
964                                 bs << jl->extendedInfo();
965                                 bs << prettyPrintMiniInfo(jl->miniInfo());
966                             }
967                             else
968                             {
969                                 std::string empty;
970                                 bs << empty;
971                                 bs << empty;
972                             }
973 
974                             // send stats to connector for inserting to the querystats table
975                             fStats.serialize(bs);
976                             fIos.write(bs);
977                             continue;
978                         }
979                         // for table mode handling
980                         else if (qb == 4)
981                         {
982                             statementsRunningCount->decr(stmtCounted);
983                             bs = fIos.read();
984                             goto new_plan;
985                         }
986                         else // (qb > 3)
987                         {
988                             //Return table bands for the requested tableOID
989                             tableOID = static_cast<execplan::CalpontSystemCatalog::OID>(qb);
990                         }
991                     }
992                     catch (std::exception& ex)
993                     {
994                         std::ostringstream errMsg;
995                         errMsg << "ExeMgr: error writing qb response "
996                                "for qb cmd " << qb <<
997                                "; " <<  ex.what();
998                         throw std::runtime_error( errMsg.str() );
999                     }
1000                     catch (...)
1001                     {
1002                         std::ostringstream errMsg;
1003                         errMsg << "ExeMgr: unknown error writing qb response "
1004                                "for qb cmd " << qb;
1005                         throw std::runtime_error( errMsg.str() );
1006                     }
1007 
1008                     if (swallowRows) tm.erase(tableOID);
1009 
1010                     FEMsgHandler msgHandler(jl, &fIos);
1011 
1012                     if (tableOID == 100)
1013                         msgHandler.start();
1014 
1015                     //...Loop serializing table bands projected for the tableOID
1016                     for (;;)
1017                     {
1018                         uint32_t rowCount;
1019 
1020                         rowCount = jl->projectTable(tableOID, bs);
1021 
1022                         msgHandler.stop();
1023 
1024                         if (jl->status())
1025                         {
1026                             const auto errInfo = logging::IDBErrorInfo::instance();
1027 
1028                             if (jl->errMsg().length() != 0)
1029                                 bs << jl->errMsg();
1030                             else
1031                                 bs << errInfo->errorMsg(jl->status());
1032                         }
1033 
1034                         try // @bug2244: try/catch around fIos.write() calls projecting rows
1035                         {
1036                             if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3)
1037                             {
1038                                 // Skip the write to the front end until the last empty band.  Used to time queries
1039                                 // through without any front end waiting.
1040                                 if (tableOID < 3000 || rowCount == 0)
1041                                     fIos.write(bs);
1042                             }
1043                             else
1044                             {
1045                                 fIos.write(bs);
1046                             }
1047                         }
1048                         catch (std::exception& ex)
1049                         {
1050                             msgHandler.stop();
1051                             std::ostringstream errMsg;
1052                             errMsg << "ExeMgr: error projecting rows "
1053                                    "for tableOID: " << tableOID <<
1054                                    "; rowCnt: " << rowCount <<
1055                                    "; prevTotRowCnt: " << totalRowCount <<
1056                                    "; " << ex.what();
1057                             jl->abort();
1058 
1059                             while (rowCount)
1060                                 rowCount = jl->projectTable(tableOID, bs);
1061 
1062                             if (tableOID == 100 && msgHandler.aborted())
1063                             {
1064                                 /* TODO: modularize the cleanup code, as well as
1065                                  * the rest of this fcn */
1066 
1067                                 decThreadCntPerSession( csep.sessionID() | 0x80000000 );
1068                                 statementsRunningCount->decr(stmtCounted);
1069                                 fIos.close();
1070                                 return;
1071                             }
1072 
1073                             //std::cout << "connection drop\n";
1074                             throw std::runtime_error( errMsg.str() );
1075                         }
1076                         catch (...)
1077                         {
1078                             std::ostringstream errMsg;
1079                             msgHandler.stop();
1080                             errMsg << "ExeMgr: unknown error projecting rows "
1081                                    "for tableOID: " <<
1082                                    tableOID << "; rowCnt: " << rowCount <<
1083                                    "; prevTotRowCnt: " << totalRowCount;
1084                             jl->abort();
1085 
1086                             while (rowCount)
1087                                 rowCount = jl->projectTable(tableOID, bs);
1088 
1089                             throw std::runtime_error( errMsg.str() );
1090                         }
1091 
1092                         totalRowCount += rowCount;
1093                         totalBytesSent += bs.length();
1094 
1095                         if (rowCount == 0)
1096                         {
1097                             msgHandler.stop();
1098                             // No more bands, table is done
1099                             bs.reset();
1100 
1101                             // @bug 2083 decr active statement count here for table mode.
1102                             if (!usingTuples)
1103                                 statementsRunningCount->decr(stmtCounted);
1104 
1105                             break;
1106                         }
1107                         else
1108                         {
1109                             bs.restart();
1110                         }
1111                     } // End of loop to project and serialize table bands for a table
1112                 } // End of loop to process tables
1113 
1114                 // @bug 828
1115                 if (csep.traceOn())
1116                     jl->graph(csep.sessionID());
1117 
1118                 if (needDbProfEndStatementMsg)
1119                 {
1120                     std::string ss;
1121                     std::ostringstream prefix;
1122                     prefix << "ses:" << csep.sessionID() << " Query Totals";
1123 
1124                     //Log stats std::string to standard out
1125                     ss = formatQueryStats(
1126                              jl,
1127                              prefix.str(),
1128                              true,
1129                              !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF),
1130                              (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG),
1131                              totalRowCount);
1132                     //@Bug 1306. Added timing info for real time tracking.
1133                     std::cout << ss << " at " << timeNow() << std::endl;
1134 
1135                     // log query stats to debug log file
1136                     args.reset();
1137                     args.add((int)csep.statementID());
1138                     args.add(fStats.fMaxMemPct);
1139                     args.add(fStats.fNumFiles);
1140                     args.add(fStats.fFileBytes); // log raw byte count instead of MB
1141                     args.add(fStats.fPhyIO);
1142                     args.add(fStats.fCacheIO);
1143                     args.add(fStats.fMsgRcvCnt);
1144                     args.add(fStats.fMsgBytesIn);
1145                     args.add(fStats.fMsgBytesOut);
1146                     args.add(fStats.fCPBlocksSkipped);
1147                     msgLog.logMessage(logging::LOG_TYPE_DEBUG,
1148                                       logDbProfQueryStats,
1149                                       args,
1150                                       li);
1151                     //@bug 1327
1152                     deleteMaxMemPct( csep.sessionID() );
1153                     // Calling reset here, will cause joblist destructor to be
1154                     // called, which "joins" the threads.  We need to do that
1155                     // here to make sure all syslogging from all the threads
1156                     // are complete; and that our logDbProfEndStatement will
1157                     // appear "last" in the syslog for this SQL statement.
1158                     // puts the real destruction in another thread to avoid
1159                     // making the whole session wait.  It can take several seconds.
1160                     int stmtID = csep.statementID();
1161                     std::unique_lock<std::mutex> scoped(jlMutex);
1162                     // C7's compiler complains about the msgLog capture here
1163                     // msgLog is global scope, and passed by copy, so, unclear
1164                     // what the warning is about.
1165                     destructing++;
1166                     std::thread bgdtor([jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing] {
1167                         std::unique_lock<std::mutex> scoped(jlMutex);
1168                         const_cast<joblist::SJLP &>(jl).reset();    // this happens second; does real destruction
1169                         logging::Message::Args args;
1170                         args.add(stmtID);
1171                         msgLog.logMessage(logging::LOG_TYPE_DEBUG,
1172                                           logDbProfEndStatement,
1173                                           args,
1174                                           li);
1175                         if (--destructing == 0)
1176                             jlCleanupDone.notify_one();
1177                     });
1178                     jl.reset();   // this happens first
1179                     bgdtor.detach();
1180                 }
1181                 else
1182                     // delete sessionMemMap entry for this session's memory % use
1183                     deleteMaxMemPct( csep.sessionID() );
1184 
1185                 std::string endtime(timeNow());
1186 
1187                 if ((csep.traceFlags() & flagsWantOutput) && (csep.sessionID() < 0x80000000))
1188                 {
1189                     std::cout << "For session " << csep.sessionID() << ": " <<
1190                          totalBytesSent <<
1191                          " bytes sent back at " << endtime << std::endl;
1192 
1193                     // @bug 663 - Implemented caltraceon(16) to replace the
1194                     // $FIFO_SINK compiler definition in pColStep.
1195                     // This option consumes rows in the project steps.
1196                     if (csep.traceFlags() &
1197                       execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4)
1198                     {
1199                         std::cout << std::endl;
1200                         std::cout << "**** No data returned to DM.  Rows consumed "
1201                              "in ProjectSteps - caltrace(16) is on (FIFO_SINK)."
1202                              " ****" << std::endl;
1203                         std::cout << std::endl;
1204                     }
1205                     else if (csep.traceFlags() &
1206                       execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3)
1207                     {
1208                         std::cout << std::endl;
1209                         std::cout << "**** No data returned to DM - caltrace(8) is "
1210                              "on (SWALLOW_ROWS_EXEMGR). ****" << std::endl;
1211                         std::cout << std::endl;
1212                     }
1213                 }
1214 
1215                 statementsRunningCount->decr(stmtCounted);
1216 
1217                 if ( !csep.isInternal() &&
1218                         (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT") )
1219                 {
1220                     qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY;
1221                     qts.max_mem_pct = fStats.fMaxMemPct;
1222                     qts.num_files = fStats.fNumFiles;
1223                     qts.phy_io = fStats.fPhyIO;
1224                     qts.cache_io = fStats.fCacheIO;
1225                     qts.msg_rcv_cnt = fStats.fMsgRcvCnt;
1226                     qts.cp_blocks_skipped = fStats.fCPBlocksSkipped;
1227                     qts.msg_bytes_in = fStats.fMsgBytesIn;
1228                     qts.msg_bytes_out = fStats.fMsgBytesOut;
1229                     qts.rows = totalRowCount;
1230                     qts.end_time = querytele::QueryTeleClient::timeNowms();
1231                     qts.session_id = csep.sessionID();
1232                     qts.query_type = csep.queryType();
1233                     qts.query = csep.data();
1234                     qts.system_name = fOamCachePtr->getSystemName();
1235                     qts.module_name = fOamCachePtr->getModuleName();
1236                     qts.local_query = csep.localQuery();
1237                     fTeleClient.postQueryTele(qts);
1238                 }
1239             }
1240 
1241             // Release CSC object (for sessionID) that was added by makeJobList()
1242             // Mask 0x80000000 is for associate user query and csc query.
1243             // (actual joblist destruction happens at the top of this loop)
1244             decThreadCntPerSession( csep.sessionID() | 0x80000000 );
1245         }
1246         catch (std::exception& ex)
1247         {
1248             decThreadCntPerSession( csep.sessionID() | 0x80000000 );
1249             statementsRunningCount->decr(stmtCounted);
1250             std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl;
1251             logging::Message::Args args;
1252             logging::LoggingID li(16, csep.sessionID(), csep.txnID());
1253             args.add(ex.what());
1254             msgLog.logMessage(logging::LOG_TYPE_CRITICAL, logExeMgrExcpt, args, li);
1255             fIos.close();
1256         }
1257         catch (...)
1258         {
1259             decThreadCntPerSession( csep.sessionID() | 0x80000000 );
1260             statementsRunningCount->decr(stmtCounted);
1261             std::cerr << "### Exception caught!" << std::endl;
1262             logging::Message::Args args;
1263             logging::LoggingID li(16, csep.sessionID(), csep.txnID());
1264             args.add("ExeMgr caught unknown exception");
1265             msgLog.logMessage(logging::LOG_TYPE_CRITICAL, logExeMgrExcpt, args, li);
1266             fIos.close();
1267         }
1268 
1269         // make sure we don't leave scope while joblists are being destroyed
1270         std::unique_lock<std::mutex> scoped(jlMutex);
1271         while (destructing > 0)
1272             jlCleanupDone.wait(scoped);
1273     }
1274 };
1275 
1276 
1277 class RssMonFcn : public utils::MonitorProcMem
1278 {
1279 public:
RssMonFcn(size_t maxPct,int pauseSeconds)1280     RssMonFcn(size_t maxPct, int pauseSeconds) :
1281         MonitorProcMem(maxPct, 0, 21, pauseSeconds) {}
1282 
1283     /*virtual*/
operator ()() const1284     void operator()() const
1285     {
1286         for (;;)
1287         {
1288             size_t rssMb = rss();
1289             size_t pct = rssMb * 100 / fMemTotal;
1290 
1291             if (pct > fMaxPct)
1292             {
1293                 if (fMaxPct >= 95)
1294                 {
1295                     std::cerr << "Too much memory allocated!" << std::endl;
1296                     logging::Message::Args args;
1297                     args.add((int)pct);
1298                     args.add((int)fMaxPct);
1299                     msgLog.logMessage(logging::LOG_TYPE_CRITICAL, logRssTooBig, args, logging::LoggingID(16));
1300                     exit(1);
1301                 }
1302 
1303                 if (statementsRunningCount->cur() == 0)
1304                 {
1305                     std::cerr << "Too much memory allocated!" << std::endl;
1306                     logging::Message::Args args;
1307                     args.add((int)pct);
1308                     args.add((int)fMaxPct);
1309                     msgLog.logMessage(logging::LOG_TYPE_WARNING, logRssTooBig, args, logging::LoggingID(16));
1310                     exit(1);
1311                 }
1312 
1313                 std::cerr << "Too much memory allocated, but stmts running" << std::endl;
1314             }
1315 
1316             // Update sessionMemMap entries lower than current mem % use
1317             {
1318                 std::lock_guard<std::mutex> lk(sessionMemMapMutex);
1319 
1320                 for (SessionMemMap_t::iterator mapIter = sessionMemMap.begin();
1321                     mapIter != sessionMemMap.end();
1322                     ++mapIter)
1323                 {
1324                     if (pct > mapIter->second)
1325                     {
1326                         mapIter->second = pct;
1327                     }
1328                 }
1329 
1330             }
1331 
1332             pause_();
1333         }
1334     }
1335 };
1336 
1337 #ifdef _MSC_VER
exit_(int)1338 void exit_(int)
1339 {
1340     exit(0);
1341 }
1342 #endif
1343 
added_a_pm(int)1344 void added_a_pm(int)
1345 {
1346     logging::LoggingID logid(21, 0, 0);
1347     logging::Message::Args args1;
1348     logging::Message msg(1);
1349     args1.add("exeMgr caught SIGHUP. Resetting connections");
1350     msg.format( args1 );
1351     std::cout << msg.msg().c_str() << std::endl;
1352     logging::Logger logger(logid.fSubsysID);
1353     logger.logMessage(logging::LOG_TYPE_DEBUG, msg, logid);
1354 
1355     if (ec)
1356     {
1357         oam::OamCache* oamCache = oam::OamCache::makeOamCache();
1358         oamCache->forceReload();
1359         ec->Setup();
1360     }
1361 }
1362 
printTotalUmMemory(int sig)1363 void printTotalUmMemory(int sig)
1364 {
1365     int64_t num = rm->availableMemory();
1366     std::cout << "Total UM memory available: " << num << std::endl;
1367 }
1368 
setupSignalHandlers()1369 void setupSignalHandlers()
1370 {
1371 #ifdef _MSC_VER
1372     signal(SIGINT, exit_);
1373     signal(SIGTERM, exit_);
1374 #else
1375     struct sigaction ign;
1376 
1377     memset(&ign, 0, sizeof(ign));
1378     ign.sa_handler = SIG_IGN;
1379 
1380     sigaction(SIGPIPE, &ign, 0);
1381 
1382     memset(&ign, 0, sizeof(ign));
1383     ign.sa_handler = added_a_pm;
1384     sigaction(SIGHUP, &ign, 0);
1385     ign.sa_handler = printTotalUmMemory;
1386     sigaction(SIGUSR1, &ign, 0);
1387     memset(&ign, 0, sizeof(ign));
1388     ign.sa_handler = fatalHandler;
1389     sigaction(SIGSEGV, &ign, 0);
1390     sigaction(SIGABRT, &ign, 0);
1391     sigaction(SIGFPE, &ign, 0);
1392 #endif
1393 }
1394 
setupCwd(joblist::ResourceManager * rm)1395 int8_t setupCwd(joblist::ResourceManager* rm)
1396 {
1397     std::string workdir = rm->getScWorkingDir();
1398     int8_t rc = chdir(workdir.c_str());
1399 
1400     if (rc < 0 || access(".", W_OK) != 0)
1401         rc = chdir("/tmp");
1402 
1403     return (rc < 0) ? -5 : rc;
1404 }
1405 
startRssMon(size_t maxPct,int pauseSeconds)1406 void startRssMon(size_t maxPct, int pauseSeconds)
1407 {
1408     new boost::thread(RssMonFcn(maxPct, pauseSeconds));
1409 }
1410 
setupResources()1411 int setupResources()
1412 {
1413 #ifdef _MSC_VER
1414     //FIXME:
1415 #else
1416     struct rlimit rlim;
1417 
1418     if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
1419     {
1420         return -1;
1421     }
1422 
1423     rlim.rlim_cur = rlim.rlim_max = 65536;
1424 
1425     if (setrlimit(RLIMIT_NOFILE, &rlim) != 0)
1426     {
1427         return -2;
1428     }
1429 
1430     if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
1431     {
1432         return -3;
1433     }
1434 
1435     if (rlim.rlim_cur != 65536)
1436     {
1437         return -4;
1438     }
1439 
1440 #endif
1441     return 0;
1442 }
1443 
1444 }
1445 
cleanTempDir()1446 void cleanTempDir()
1447 {
1448   using TempDirPurpose = config::Config::TempDirPurpose;
1449   struct Dirs
1450   {
1451     std::string section;
1452     std::string allowed;
1453     TempDirPurpose purpose;
1454   };
1455   std::vector<Dirs> dirs{
1456       {
1457           "HashJoin",
1458           "AllowDiskBasedJoin",
1459           TempDirPurpose::Joins
1460       },
1461       {
1462           "RowAggregation",
1463           "AllowDiskBasedAggregation",
1464           TempDirPurpose::Aggregates
1465       }
1466   };
1467   const auto config = config::Config::makeConfig();
1468 
1469   for (const auto& dir : dirs)
1470   {
1471     std::string allowStr = config->getConfig(dir.section, dir.allowed);
1472     bool allow = (allowStr == "Y" || allowStr == "y");
1473 
1474     std::string tmpPrefix = config->getTempFileDir(dir.purpose);
1475 
1476     if (allow && tmpPrefix.empty())
1477     {
1478       std::cerr << "Empty tmp directory name for " << dir.section << std::endl;
1479       logging::LoggingID logid(16, 0, 0);
1480       logging::Message::Args args;
1481       logging::Message message(8);
1482       args.add("Empty tmp directory name for:");
1483       args.add(dir.section);
1484       message.format(args);
1485       logging::Logger logger(logid.fSubsysID);
1486       logger.logMessage(logging::LOG_TYPE_CRITICAL, message, logid);
1487     }
1488 
1489     tmpPrefix += "/";
1490 
1491     idbassert(tmpPrefix != "/");
1492 
1493     /* This is quite scary as ExeMgr usually runs as root */
1494     try
1495     {
1496       if (allow)
1497       {
1498         boost::filesystem::remove_all(tmpPrefix);
1499       }
1500       boost::filesystem::create_directories(tmpPrefix);
1501     }
1502     catch (const std::exception &ex)
1503     {
1504       std::cerr << ex.what() << std::endl;
1505       logging::LoggingID logid(16, 0, 0);
1506       logging::Message::Args args;
1507       logging::Message message(8);
1508       args.add("Exception whilst cleaning tmpdir: ");
1509       args.add(ex.what());
1510       message.format(args);
1511       logging::Logger logger(logid.fSubsysID);
1512       logger.logMessage(logging::LOG_TYPE_WARNING, message, logid);
1513     }
1514     catch (...)
1515     {
1516       std::cerr << "Caught unknown exception during tmpdir cleanup"
1517                 << std::endl;
1518       logging::LoggingID logid(16, 0, 0);
1519       logging::Message::Args args;
1520       logging::Message message(8);
1521       args.add("Unknown exception whilst cleaning tmpdir");
1522       message.format(args);
1523       logging::Logger logger(logid.fSubsysID);
1524       logger.logMessage(logging::LOG_TYPE_WARNING, message, logid);
1525     }
1526   }
1527 }
1528 
1529 
Child()1530 int ServiceExeMgr::Child()
1531 {
1532 
1533     gDebug= m_debug;
1534 
1535 #ifdef _MSC_VER
1536     //FIXME:
1537 #else
1538 
1539     // Make sure CSC thinks it's on a UM or else bucket reuse stuff below will stall
1540     if (!m_e)
1541         setenv("CALPONT_CSC_IDENT", "um", 1);
1542 
1543 #endif
1544     setupSignalHandlers();
1545     int err = 0;
1546     if (!m_debug)
1547         err = setupResources();
1548     std::string errMsg;
1549 
1550     switch (err)
1551     {
1552         case -1:
1553         case -3:
1554             errMsg = "Error getting file limits, please see non-root install documentation";
1555             break;
1556 
1557         case -2:
1558             errMsg = "Error setting file limits, please see non-root install documentation";
1559             break;
1560 
1561         case -4:
1562             errMsg = "Could not install file limits to required value, please see non-root install documentation";
1563             break;
1564 
1565         default:
1566             errMsg = "Couldn't change working directory or unknown error";
1567             break;
1568     }
1569 
1570     err = setupCwd(rm);
1571 
1572     if (err < 0)
1573     {
1574         oam::Oam oam;
1575         logging::Message::Args args;
1576         logging::Message message;
1577         args.add( errMsg );
1578         message.format(args);
1579         logging::LoggingID lid(16);
1580         logging::MessageLog ml(lid);
1581         ml.logCriticalMessage( message );
1582         std::cerr << errMsg << std::endl;
1583 
1584         NotifyServiceInitializationFailed();
1585         return 2;
1586     }
1587 
1588     cleanTempDir();
1589 
1590     logging::MsgMap msgMap;
1591     msgMap[logDefaultMsg]           = logging::Message(logDefaultMsg);
1592     msgMap[logDbProfStartStatement] = logging::Message(logDbProfStartStatement);
1593     msgMap[logDbProfEndStatement]   = logging::Message(logDbProfEndStatement);
1594     msgMap[logStartSql]             = logging::Message(logStartSql);
1595     msgMap[logEndSql]               = logging::Message(logEndSql);
1596     msgMap[logRssTooBig]            = logging::Message(logRssTooBig);
1597     msgMap[logDbProfQueryStats]     = logging::Message(logDbProfQueryStats);
1598     msgMap[logExeMgrExcpt]          = logging::Message(logExeMgrExcpt);
1599     msgLog.msgMap(msgMap);
1600 
1601     ec = joblist::DistributedEngineComm::instance(rm, true);
1602     ec->Open();
1603 
1604     bool tellUser = true;
1605 
1606     messageqcpp::MessageQueueServer* mqs;
1607 
1608     statementsRunningCount = new ActiveStatementCounter(rm->getEmExecQueueSize());
1609 
1610     for (;;)
1611     {
1612         try
1613         {
1614             mqs = new messageqcpp::MessageQueueServer(ExeMgr, rm->getConfig(), messageqcpp::ByteStream::BlockSize, 64);
1615             break;
1616         }
1617         catch (std::runtime_error& re)
1618         {
1619             std::string what = re.what();
1620 
1621             if (what.find("Address already in use") != std::string::npos)
1622             {
1623                 if (tellUser)
1624                 {
1625                     std::cerr << "Address already in use, retrying..." << std::endl;
1626                     tellUser = false;
1627                 }
1628 
1629                 sleep(5);
1630             }
1631             else
1632             {
1633                 throw;
1634             }
1635         }
1636     }
1637 
1638     // class jobstepThreadPool is used by other processes. We can't call
1639     // resourcemanaager (rm) functions during the static creation of threadpool
1640     // because rm has a "isExeMgr" flag that is set upon creation (rm is a singleton).
1641     // From  the pools perspective, it has no idea if it is ExeMgr doing the
1642     // creation, so it has no idea which way to set the flag. So we set the max here.
1643     joblist::JobStep::jobstepThreadPool.setMaxThreads(rm->getJLThreadPoolSize());
1644     joblist::JobStep::jobstepThreadPool.setName("ExeMgrJobList");
1645 
1646     if (rm->getJlThreadPoolDebug() == "Y" || rm->getJlThreadPoolDebug() == "y")
1647     {
1648       joblist::JobStep::jobstepThreadPool.setDebug(true);
1649       joblist::JobStep::jobstepThreadPool.invoke(threadpool::ThreadPoolMonitor(&joblist::JobStep::jobstepThreadPool));
1650     }
1651 
1652     int serverThreads = rm->getEmServerThreads();
1653     int maxPct = rm->getEmMaxPct();
1654     int pauseSeconds = rm->getEmSecondsBetweenMemChecks();
1655     int priority = rm->getEmPriority();
1656 
1657     FEMsgHandler::threadPool.setMaxThreads(serverThreads);
1658     FEMsgHandler::threadPool.setName("FEMsgHandler");
1659 
1660     if (maxPct > 0)
1661         startRssMon(maxPct, pauseSeconds);
1662 
1663 #ifndef _MSC_VER
1664     setpriority(PRIO_PROCESS, 0, priority);
1665 #endif
1666 
1667     std::string teleServerHost(rm->getConfig()->getConfig("QueryTele", "Host"));
1668 
1669     if (!teleServerHost.empty())
1670     {
1671         int teleServerPort = toInt(rm->getConfig()->getConfig("QueryTele", "Port"));
1672 
1673         if (teleServerPort > 0)
1674         {
1675             gTeleServerParms.host = teleServerHost;
1676             gTeleServerParms.port = teleServerPort;
1677         }
1678     }
1679 
1680     NotifyServiceStarted();
1681 
1682     std::cout << "Starting ExeMgr: st = " << serverThreads <<
1683          ", qs = " << rm->getEmExecQueueSize() << ", mx = " << maxPct << ", cf = " <<
1684          rm->getConfig()->configFile() << std::endl;
1685 
1686     {
1687         BRM::DBRM *dbrm = new BRM::DBRM();
1688         dbrm->setSystemQueryReady(true);
1689         delete dbrm;
1690     }
1691 
1692     threadpool::ThreadPool exeMgrThreadPool(serverThreads, 0);
1693     exeMgrThreadPool.setName("ExeMgrServer");
1694 
1695     if (rm->getExeMgrThreadPoolDebug() == "Y" || rm->getExeMgrThreadPoolDebug() == "y")
1696     {
1697         exeMgrThreadPool.setDebug(true);
1698         exeMgrThreadPool.invoke(threadpool::ThreadPoolMonitor(&exeMgrThreadPool));
1699     }
1700 
1701     for (;;)
1702     {
1703         messageqcpp::IOSocket ios;
1704         ios = mqs->accept();
1705         exeMgrThreadPool.invoke(SessionThread(ios, ec, rm));
1706     }
1707 
1708     exeMgrThreadPool.wait();
1709 
1710     return 0;
1711 }
1712 
1713 
main(int argc,char * argv[])1714 int main(int argc, char* argv[])
1715 {
1716     opterr = 0;
1717     Opt opt(argc, argv);
1718 
1719     // Set locale language
1720     setlocale(LC_ALL, "");
1721     setlocale(LC_NUMERIC, "C");
1722 
1723     // This is unset due to the way we start it
1724     program_invocation_short_name = const_cast<char*>("ExeMgr");
1725 
1726     // Initialize the charset library
1727     my_init();
1728 
1729     return ServiceExeMgr(opt).Run();
1730 }
1731 
1732 // vim:ts=4 sw=4:
1733