1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /******************************************************************************************
19  * $Id: resourcemanager.h 9655 2013-06-25 23:08:13Z xlou $
20  *
21  ******************************************************************************************/
22 /**
23  * @file
24  */
25 #ifndef JOBLIST_RESOURCEMANAGER_H
26 #define JOBLIST_RESOURCEMANAGER_H
27 
28 #include <vector>
29 #include <iostream>
30 #include <boost/thread.hpp>
31 #include <boost/algorithm/string.hpp>
32 #include <unistd.h>
33 
34 #include "configcpp.h"
35 #include "calpontselectexecutionplan.h"
36 #include "resourcedistributor.h"
37 #include "installdir.h"
38 #include "branchpred.h"
39 
40 #include "atomicops.h"
41 
42 #if defined(_MSC_VER) && defined(JOBLIST_DLLEXPORT)
43 #define EXPORT __declspec(dllexport)
44 #else
45 #define EXPORT
46 #endif
47 
48 namespace joblist
49 {
50 //aggfilterstep
51 const uint32_t  defaultNumThreads = 8;
52 //joblistfactory
53 const uint32_t defaultFlushInterval = 8 * 1024;
54 const uint32_t defaultFifoSize = 10;
55 const uint32_t defaultHJFifoSizeLargeSide = 128;
56 const uint64_t defaultHJMaxElems = 512 * 1024;   //hashjoin uses 8192
57 const int      defaultHJMaxBuckets = 32;	   //hashjoin uses 4
58 const uint64_t defaultHJPmMaxMemorySmallSide = 1 * 1024 * 1024 * 1024ULL;
59 const uint64_t defaultHJUmMaxMemorySmallSide = 4 * 1024 * 1024 * 1024ULL;
60 const uint32_t defaultTempSaveSize = defaultHJMaxElems;
61 const uint64_t defaultTotalUmMemory = 8 * 1024 * 1024 * 1024ULL;
62 const uint64_t defaultHUATotalMem = 8 * 1024 * 1024 * 1024ULL;
63 
64 const uint32_t defaultTupleDLMaxSize = 64 * 1024;
65 
66 const uint32_t defaultJLThreadPoolSize = 100;
67 
68 //pcolscan.cpp
69 const uint32_t defaultScanLbidReqLimit  = 10000;
70 const uint32_t defaultScanLbidReqThreshold = 5000;
71 const uint32_t defaultLogicalBlocksPerScan = 1024;   // added for bug 1264.
72 const uint32_t defaultScanBlockThreshhold = 10000;   //in jobstep.h
73 
74 const uint32_t defaultScanReceiveThreads = 8;
75 
76 //pcolstep.cpp
77 const uint32_t defaultProjectBlockReqLimit  = 32 * 1024;
78 const uint32_t defaultProjectBlockReqThreshold = 16 * 1024;  //256 in jobstep.h
79 
80 //BatchPrimitiveStep
81 const uint32_t defaultRequestSize  = 1;
82 const uint32_t defaultMaxOutstandingRequests = 20;
83 const uint32_t defaultProcessorThreadsPerScan = 16;
84 const uint32_t defaultJoinerChunkSize = 16 * 1024 * 1024;
85 
86 //bucketreuse
87 const std::string defaultTempDiskPath = "/tmp";
88 const std::string defaultWorkingDir = ".";   //"/tmp";
89 
90 //largedatalist
91 const uint32_t defaultLDLMaxElements = 32 * 1024 * 1024;
92 
93 //zdl
94 const uint64_t defaultMaxElementsInMem = 32 * 1024 * 1024;
95 const uint64_t defaultNumBuckets = 128;
96 const uint64_t defaultMaxElementsPerBuckert = 16 * 1024 * 1024;
97 
98 const int defaultEMServerThreads = 50;
99 const int defaultEMSecondsBetweenMemChecks = 1;
100 const int defaultEMMaxPct = 95;
101 const int defaultEMPriority = 21; // @Bug 3385
102 const int defaultEMExecQueueSize = 20;
103 
104 
105 const uint64_t defaultInitialCapacity = 1024 * 1024;
106 const int  defaultTWMaxBuckets = 256;
107 const int  defaultPSCount = 0;
108 const int  defaultConnectionsPerPrimProc = 1;
109 const uint32_t defaultLBID_Shift = 13;
110 const uint64_t defaultExtentRows = 8 * 1024 * 1024;
111 
112 // DMLProc
113 // @bug 1886.  Knocked a 0 off the default below dropping it from 4M down to 256K.  Delete was consuming too much memory.
114 const uint64_t defaultDMLMaxDeleteRows  = 256 * 1024;
115 
116 // Connector
117 // @bug 2048.  To control batch insert memory usage.
118 const uint64_t defaultRowsPerBatch  = 10000;
119 
120 /* HJ CP feedback, see bug #1465 */
121 const uint32_t defaultHjCPUniqueLimit = 100;
122 
123 // Order By and Limit
124 const uint64_t defaultOrderByLimitMaxMemory = 1 * 1024 * 1024 * 1024ULL;
125 
126 const uint64_t defaultDECThrottleThreshold = 200000000;  // ~200 MB
127 
128 const uint8_t defaultUseCpimport = 1;
129 
130 const bool defaultAllowDiskAggregation = false;
131 /** @brief ResourceManager
132  *	Returns requested values from Config
133  *
134  */
135 class ResourceManager
136 {
137 public:
138 
139     /** @brief ctor
140      *
141      */
142     EXPORT ResourceManager(bool runningInExeMgr = false);
143     static ResourceManager* instance(bool runningInExeMgr = false);
144 //     ResourceManager(const config::Config *cf);
145 //     ResourceManager(const std::string& config);
146 //passed by ExeMgr and DistributedEngineComm to MessageQueueServer or -Client
getConfig()147     config::Config* getConfig()
148     {
149         return fConfig;
150     }
151 
152     /** @brief dtor
153      */
~ResourceManager()154     virtual ~ResourceManager() {}
155 
156     typedef std::map <uint32_t, uint64_t> MemMap;
157 
158     // @MCOL-513 - Added threadpool to ExeMgr
getEmServerThreads()159     int  	    getEmServerThreads() const
160     {
161         return  getIntVal(fExeMgrStr, "ThreadPoolSize", defaultEMServerThreads);
162     }
getExeMgrThreadPoolDebug()163     std::string	getExeMgrThreadPoolDebug() const
164     {
165         return  getStringVal(fExeMgrStr, "ThreadPoolDebug", "N");
166     }
167 
getEmSecondsBetweenMemChecks()168     int  	    getEmSecondsBetweenMemChecks() const
169     {
170         return  getUintVal(fExeMgrStr, "SecondsBetweenMemChecks", defaultEMSecondsBetweenMemChecks);
171     }
getEmMaxPct()172     int  	    getEmMaxPct() const
173     {
174         return  getUintVal(fExeMgrStr, "MaxPct", defaultEMMaxPct);
175     }
176     EXPORT int  getEmPriority() const;
getEmExecQueueSize()177     int  	    getEmExecQueueSize() const
178     {
179         return  getIntVal(fExeMgrStr, "ExecQueueSize", defaultEMExecQueueSize);
180     }
181 
getAllowDiskAggregation()182     bool         getAllowDiskAggregation() const
183     {
184         return fAllowedDiskAggregation;
185     }
186 
getHjMaxBuckets()187     int	      	getHjMaxBuckets() const
188     {
189         return  getUintVal(fHashJoinStr, "MaxBuckets", defaultHJMaxBuckets);
190     }
getHjNumThreads()191     unsigned  	getHjNumThreads() const
192     {
193         return  fHjNumThreads;
194     } //getUintVal(fHashJoinStr, "NumThreads", defaultNumThreads); }
getHjMaxElems()195     uint64_t  	getHjMaxElems()  const
196     {
197         return  getUintVal(fHashJoinStr, "MaxElems", defaultHJMaxElems);
198     }
getHjFifoSizeLargeSide()199     uint32_t  	getHjFifoSizeLargeSide() const
200     {
201         return  getUintVal(fHashJoinStr, "FifoSizeLargeSide", defaultHJFifoSizeLargeSide);
202     }
getHjCPUniqueLimit()203     uint32_t 	getHjCPUniqueLimit() const
204     {
205         return getUintVal(fHashJoinStr, "CPUniqueLimit", defaultHjCPUniqueLimit);
206     }
getPMJoinMemLimit()207     uint64_t	getPMJoinMemLimit() const
208     {
209         return pmJoinMemLimit;
210     }
211 
getJLFlushInterval()212     uint32_t  	getJLFlushInterval() const
213     {
214         return  getUintVal(fJobListStr, "FlushInterval", defaultFlushInterval);
215     }
getJlFifoSize()216     uint32_t  	getJlFifoSize() const
217     {
218         return  getUintVal(fJobListStr, "FifoSize", defaultFifoSize);
219     }
getJlScanLbidReqLimit()220     uint32_t  	getJlScanLbidReqLimit() const
221     {
222         return  getUintVal(fJobListStr, "ScanLbidReqLimit", defaultScanLbidReqLimit);
223     }
getJlScanLbidReqThreshold()224     uint32_t  	getJlScanLbidReqThreshold() const
225     {
226         return  getUintVal(fJobListStr, "ScanLbidReqThreshold", defaultScanLbidReqThreshold);
227     }
228 
229     // @MCOL-513 - Added threadpool to JobSteps
getJLThreadPoolSize()230     int         getJLThreadPoolSize() const
231     {
232         return  getIntVal(fJobListStr, "ThreadPoolSize", defaultJLThreadPoolSize);
233     }
getJlThreadPoolDebug()234     std::string	getJlThreadPoolDebug() const
235     {
236         return  getStringVal(fJobListStr, "ThreadPoolDebug", "N");
237     }
getDMLJlThreadPoolDebug()238     std::string	getDMLJlThreadPoolDebug() const
239     {
240         return  getStringVal(fJobListStr, "DMLThreadPoolDebug", "N");
241     }
242 
243     // @bug 1264 - Added LogicalBlocksPerScan configurable which determines the number of blocks contained in each BPS scan request.
getJlLogicalBlocksPerScan()244     uint32_t    getJlLogicalBlocksPerScan() const
245     {
246         return  getUintVal(fJobListStr, "LogicalBlocksPerScan", defaultLogicalBlocksPerScan);
247     }
getJlProjectBlockReqLimit()248     uint32_t  	getJlProjectBlockReqLimit() const
249     {
250         return  getUintVal(fJobListStr, "ProjectBlockReqLimit", defaultProjectBlockReqLimit  );
251     }
getJlProjectBlockReqThreshold()252     uint32_t  	getJlProjectBlockReqThreshold() const
253     {
254         return  getUintVal(fJobListStr, "ProjectBlockReqThreshold", defaultProjectBlockReqThreshold);
255     }
getJlNumScanReceiveThreads()256     uint32_t  	getJlNumScanReceiveThreads() const
257     {
258         return  fJlNumScanReceiveThreads;
259     } //getUintVal(fJobListStr, "NumScanReceiveThreads", defaultScanReceiveThreads); }
260 
261     // @bug 1424,1298
getJlProcessorThreadsPerScan()262     uint32_t    getJlProcessorThreadsPerScan() const
263     {
264         return  fJlProcessorThreadsPerScan;
265     } //getUintVal(fJobListStr,"ProcessorThreadsPerScan", defaultProcessorThreadsPerScan); }
getJlRequestSize()266     uint32_t  	getJlRequestSize() const
267     {
268         return  getUintVal(fJobListStr, "RequestSize", defaultRequestSize  );
269     }
getJlMaxOutstandingRequests()270     uint32_t  	getJlMaxOutstandingRequests() const
271     {
272         return  fJlMaxOutstandingRequests;
273         //getUintVal(fJobListStr, "MaxOutstandingRequests", defaultMaxOutstandingRequests);
274     }
getJlJoinerChunkSize()275     uint32_t  	getJlJoinerChunkSize() const
276     {
277         return  getUintVal(fJobListStr, "JoinerChunkSize", defaultJoinerChunkSize);
278     }
279 
getPsCount()280     int	      	getPsCount() const
281     {
282         return  getUintVal(fPrimitiveServersStr, "Count", defaultPSCount );
283     }
getPsConnectionsPerPrimProc()284     int	      	getPsConnectionsPerPrimProc() const
285     {
286         return getUintVal(fPrimitiveServersStr, "ConnectionsPerPrimProc", defaultConnectionsPerPrimProc);
287     }
getPsLBID_Shift()288     uint32_t   	getPsLBID_Shift() const
289     {
290         return  getUintVal(fPrimitiveServersStr, "LBID_Shift", defaultLBID_Shift );
291     }
292 
getScTempDiskPath()293     std::string getScTempDiskPath() const
294     {
295         return  startup::StartUp::tmpDir();
296     }
getScTempSaveSize()297     uint64_t  	getScTempSaveSize() const
298     {
299         return  getUintVal(fSystemConfigStr, "TempSaveSize", defaultTempSaveSize);
300     }
getScWorkingDir()301     std::string getScWorkingDir() const
302     {
303         return  startup::StartUp::tmpDir();
304     }
305 
getTwMaxSize()306     uint32_t  	getTwMaxSize() const
307     {
308         return  getUintVal(fTupleWSDLStr, "MaxSize", defaultTupleDLMaxSize  );
309     }
getTwInitialCapacity()310     uint64_t  	getTwInitialCapacity() const
311     {
312         return  getUintVal(fTupleWSDLStr, "InitialCapacity", defaultInitialCapacity  );
313     }
getTwMaxBuckets()314     int       	getTwMaxBuckets	() const
315     {
316         return  getUintVal(fTupleWSDLStr, "MaxBuckets", defaultTWMaxBuckets  );
317     }
getTwNumThreads()318     uint8_t   	getTwNumThreads() const
319     {
320         return  fTwNumThreads;
321     } //getUintVal(fTupleWSDLStr, "NumThreads", defaultNumThreads  ); }
getZdl_MaxElementsInMem()322     uint64_t  	getZdl_MaxElementsInMem() const
323     {
324         return  getUintVal(fZDLStr, "ZDL_MaxElementsInMem", defaultMaxElementsInMem  );
325     }
getZdl_MaxElementsPerBucket()326     uint64_t  	getZdl_MaxElementsPerBucket () const
327     {
328         return  getUintVal(fZDLStr, "ZDL_MaxElementsPerBucket", defaultMaxElementsPerBuckert );
329     }
330 
getExtentRows()331     uint64_t  	getExtentRows() const
332     {
333         return  getUintVal(fExtentMapStr, "ExtentRows", defaultExtentRows  );
334     }
335 
getDBRootCount()336     uint32_t 	getDBRootCount() const
337     {
338         return getUintVal(fSystemConfigStr, "DBRootCount", 1);
339     }
getPMCount()340     uint32_t 	getPMCount() const
341     {
342         return getUintVal(fPrimitiveServersStr, "Count", 1);
343     }
344 
getHbrPredicate()345     std::vector<std::string>	getHbrPredicate() const
346     {
347         std::vector<std::string> columns;
348         fConfig->getConfig(fHashBucketReuseStr, "Predicate", columns);
349         return columns;
350     }
351 
getDMLMaxDeleteRows()352     uint64_t  	getDMLMaxDeleteRows () const
353     {
354         return  getUintVal(fDMLProcStr, "MaxDeleteRows", defaultDMLMaxDeleteRows);
355     }
356 
getRowsPerBatch()357     uint64_t  	getRowsPerBatch() const
358     {
359         return  getUintVal(fBatchInsertStr, "RowsPerBatch", defaultRowsPerBatch);
360     }
361 
getUseCpimport()362     uint8_t  	getUseCpimport() const
363     {
364         int val = getIntVal(fBatchInsertStr, "UseCpimport", defaultUseCpimport);
365         return val;
366     }
367 
getOrderByLimitMaxMemory()368     uint64_t  	getOrderByLimitMaxMemory() const
369     {
370         return  getUintVal(fOrderByLimitStr, "MaxMemory", defaultOrderByLimitMaxMemory);
371     }
372 
getDECThrottleThreshold()373     uint64_t	getDECThrottleThreshold() const
374     {
375         return getUintVal(fJobListStr, "DECThrottleThreshold", defaultDECThrottleThreshold);
376     }
377 
378     EXPORT void  emServerThreads();
379     EXPORT void  emServerQueueSize();
380     EXPORT void  emSecondsBetweenMemChecks();
381     EXPORT void  emMaxPct();
382     EXPORT void  emPriority();
383     EXPORT void  emExecQueueSize();
384 
385     EXPORT void  hjNumThreads();
386     EXPORT void  hjMaxBuckets();
387     EXPORT void  hjMaxElems();
388     EXPORT void  hjFifoSizeLargeSide();
389     EXPORT void  hjPmMaxMemorySmallSide();
390 
391     /* new HJ/Union/Aggregation mem interface, used by TupleBPS */
392     /* sessionLimit is a pointer to the var holding the session-scope limit, should be JobInfo.umMemLimit
393        for the query. */
394     /* Temporary parameter 'patience', will wait for up to 10s to get the memory. */
395     EXPORT bool getMemory(int64_t amount, boost::shared_ptr<int64_t> sessionLimit, bool patience = true);
returnMemory(int64_t amount,boost::shared_ptr<int64_t> sessionLimit)396     inline void returnMemory(int64_t amount, boost::shared_ptr<int64_t> sessionLimit)
397     {
398         atomicops::atomicAdd(&totalUmMemLimit, amount);
399         atomicops::atomicAdd(sessionLimit.get(), amount);
400     }
availableMemory()401     inline int64_t availableMemory() const
402     {
403         return totalUmMemLimit;
404     }
405 
406     /* old HJ mem interface, used by HashJoin */
getHjPmMaxMemorySmallSide(uint32_t sessionID)407     uint64_t   getHjPmMaxMemorySmallSide(uint32_t sessionID)
408     {
409         return fHJPmMaxMemorySmallSideSessionMap.getSessionResource(sessionID);
410     }
getHjUmMaxMemorySmallSide(uint32_t sessionID)411     uint64_t   getHjUmMaxMemorySmallSide(uint32_t sessionID)
412     {
413         return fHJUmMaxMemorySmallSideDistributor.getSessionResource(sessionID);
414     }
getHjTotalUmMaxMemorySmallSide()415     uint64_t   getHjTotalUmMaxMemorySmallSide() const
416     {
417         return fHJUmMaxMemorySmallSideDistributor.getTotalResource();
418     }
419 
420     EXPORT void addHJUmMaxSmallSideMap(uint32_t sessionID, uint64_t mem);
421 
removeHJUmMaxSmallSideMap(uint32_t sessionID)422     void removeHJUmMaxSmallSideMap(uint32_t sessionID)
423     {
424         fHJUmMaxMemorySmallSideDistributor.removeSession(sessionID);
425     }
426 
427     EXPORT void addHJPmMaxSmallSideMap(uint32_t sessionID, uint64_t mem);
removeHJPmMaxSmallSideMap(uint32_t sessionID)428     void removeHJPmMaxSmallSideMap(uint32_t sessionID)
429     {
430         fHJPmMaxMemorySmallSideSessionMap.removeSession(sessionID);
431     }
432 
removeSessionMaps(uint32_t sessionID)433     void removeSessionMaps(uint32_t sessionID)
434     {
435         fHJPmMaxMemorySmallSideSessionMap.removeSession(sessionID);
436         fHJUmMaxMemorySmallSideDistributor.removeSession(sessionID);
437     }
438 
requestHJMaxMemorySmallSide(uint32_t sessionID,uint64_t amount)439     uint64_t requestHJMaxMemorySmallSide(uint32_t sessionID, uint64_t amount)
440     {
441         return fHJUmMaxMemorySmallSideDistributor.requestResource(sessionID, amount);
442     }
443 
requestHJUmMaxMemorySmallSide(uint32_t sessionID)444     uint64_t requestHJUmMaxMemorySmallSide(uint32_t sessionID)
445     {
446         return fHJUmMaxMemorySmallSideDistributor.requestResource(sessionID);
447     }
returnHJUmMaxMemorySmallSide(uint64_t mem)448     void returnHJUmMaxMemorySmallSide(uint64_t mem)
449     {
450         fHJUmMaxMemorySmallSideDistributor.returnResource(mem);
451     }
452 
453 
454     EXPORT void  jlFlushInterval();
455     EXPORT void  jlFifoSize();
456     EXPORT void  jlScanLbidReqLimit();
457     EXPORT void  jlScanLbidReqThreshold();
458     EXPORT void  jlProjectBlockReqLimit();
459     EXPORT void  jlProjectBlockReqThreshold();
460     EXPORT void  jlNumScanReceiveThreads();
461 
462     EXPORT void  psCount();
463     EXPORT void  psConnectionsPerPrimProc() ;
464     EXPORT void  psLBID_Shift();
465 
466     EXPORT void  scTempDiskPath();
467     EXPORT void  scTempSaveSize() ;
468     EXPORT void  scWorkingDir();
469 
470     EXPORT void  twMaxSize();
471     EXPORT void  twInitialCapacity() ;
472     EXPORT void  twMaxBuckets	() ;
473     EXPORT void  twNumThreads();
474 
475     EXPORT void  zdl_MaxElementsInMem();
476     EXPORT void  zdl_MaxElementsPerBucket() ;
477 
478     EXPORT void  hbrPredicate();
479 
setTraceFlags(uint32_t flags)480     void setTraceFlags(uint32_t flags)
481     {
482         fTraceFlags = flags;
483         fHJUmMaxMemorySmallSideDistributor.setTrace(((fTraceFlags & execplan::CalpontSelectExecutionPlan::TRACE_RESRCMGR) != 0));
484     }
rmtraceOn()485     bool rmtraceOn() const
486     {
487         return ((fTraceFlags & execplan::CalpontSelectExecutionPlan::TRACE_RESRCMGR) != 0);
488     }
489 
numCores(unsigned numCores)490     void numCores(unsigned numCores)
491     {
492         fNumCores = numCores;
493     }
numCores()494     unsigned numCores() const
495     {
496         return fNumCores;
497     }
498 
aggNumThreads(uint32_t numThreads)499     void aggNumThreads(uint32_t numThreads)
500     {
501         fAggNumThreads = numThreads;
502     }
aggNumThreads()503     uint32_t aggNumThreads() const
504     {
505         return fAggNumThreads;
506     }
507 
aggNumBuckets(uint32_t numBuckets)508     void aggNumBuckets(uint32_t numBuckets)
509     {
510         fAggNumBuckets = numBuckets;
511     }
aggNumBuckets()512     uint32_t aggNumBuckets() const
513     {
514         return fAggNumBuckets;
515     }
516 
aggNumRowGroups(uint32_t numRowGroups)517     void aggNumRowGroups(uint32_t numRowGroups)
518     {
519         fAggNumRowGroups = numRowGroups;
520     }
aggNumRowGroups()521     uint32_t aggNumRowGroups() const
522     {
523         return fAggNumRowGroups;
524     }
525 
windowFunctionThreads(uint32_t n)526     void windowFunctionThreads(uint32_t n)
527     {
528         fWindowFunctionThreads = n;
529     }
windowFunctionThreads()530     uint32_t windowFunctionThreads() const
531     {
532         return fWindowFunctionThreads;
533     }
534 
useHdfs()535     bool useHdfs() const
536     {
537         return fUseHdfs;
538     }
539 
540     EXPORT bool getMysqldInfo(std::string& h, std::string& u, std::string& w, unsigned int& p) const;
541     EXPORT bool queryStatsEnabled() const;
542     EXPORT bool userPriorityEnabled() const;
543 
getConfiguredUMMemLimit()544     uint64_t getConfiguredUMMemLimit() const
545     {
546         return configuredUmMemLimit;
547     }
548 private:
549 
550     void logResourceChangeMessage(logging::LOG_TYPE logType, uint32_t sessionID, uint64_t newvalue, uint64_t value, const std::string& source, logging::Message::MessageID mid);
551     /** @brief get name's value from section
552      *
553      * get name's value from section in the current config file or default value .
554      * @param section the name of the config file section to search
555      * @param name the param name whose value is to be returned
556      * @param defVal the default value returned if the value is missing
557      */
558     std::string getStringVal(const std::string& section,
559                              const std::string& name,
560                              const std::string& defVal,
561                              const bool reReadConfigIfNeeded = false) const;
562 
563     template<typename IntType>
564     IntType getUintVal(const std::string& section, const std::string& name, IntType defval) const;
565 
566     template<typename IntType>
567     IntType getIntVal(const std::string& section, const std::string& name, IntType defval) const;
568 
569     bool getBoolVal(const std::string& section, const std::string& name, bool defval) const;
570 
571     void logMessage(logging::LOG_TYPE logLevel, logging::Message::MessageID mid, uint64_t value = 0, uint32_t sessionId = 0);
572 
573     /*static	const*/ std::string fExeMgrStr;
574     static	const std::string fHashJoinStr;
575     static	const std::string fHashBucketReuseStr;
576     static	const std::string fJobListStr;
577     static	const std::string fPrimitiveServersStr;
578     /*static	const*/ std::string fSystemConfigStr;
579     static	const std::string fTupleWSDLStr;
580     static	const std::string fZDLStr;
581     static	const std::string fExtentMapStr;
582     /*static	const*/ std::string fDMLProcStr;
583     /*static	const*/ std::string fBatchInsertStr;
584     static	const std::string fOrderByLimitStr;
585     static      const std::string fRowAggregationStr;
586     config::Config* fConfig;
587     static ResourceManager* fInstance;
588     uint32_t fTraceFlags;
589 
590     unsigned fNumCores;
591     unsigned fHjNumThreads;
592     uint32_t fJlProcessorThreadsPerScan;
593     uint32_t fJlNumScanReceiveThreads;
594     uint8_t fTwNumThreads;
595     uint32_t fJlMaxOutstandingRequests;
596 
597     /* old HJ support */
598     ResourceDistributor	fHJUmMaxMemorySmallSideDistributor;
599     LockedSessionMap   fHJPmMaxMemorySmallSideSessionMap;
600 
601     /* new HJ/Union/Aggregation support */
602     volatile int64_t totalUmMemLimit;	// mem limit for join, union, and aggregation on the UM
603     uint64_t configuredUmMemLimit;
604     uint64_t pmJoinMemLimit;	// mem limit on individual PM joins
605 
606     /* multi-thread aggregate */
607     uint32_t fAggNumThreads;
608     uint32_t fAggNumBuckets;
609     uint32_t fAggNumRowGroups;
610 
611     // window function
612     uint32_t fWindowFunctionThreads;
613 
614 
615     bool isExeMgr;
616     bool fUseHdfs;
617     bool fAllowedDiskAggregation{false};
618 };
619 
620 
getStringVal(const std::string & section,const std::string & name,const std::string & defval,const bool reReadConfigIfNeeded)621 inline std::string ResourceManager::getStringVal(const std::string& section,
622                                                  const std::string& name,
623                                                  const std::string& defval,
624                                                  const bool reReadConfigIfNeeded) const
625 {
626     std::string val = UNLIKELY(reReadConfigIfNeeded)
627                         ? fConfig->getFromActualConfig(section, name)
628                         : fConfig->getConfig(section, name);
629 #ifdef DEBUGRM
630     std::cout << "RM getStringVal for " << section << " : " << name << " val: " << val << " default: " << defval << std::endl;
631 #endif
632     if (val.empty())
633         val = defval;
634     return val;
635 }
636 
637 template<typename IntType>
getUintVal(const std::string & section,const std::string & name,IntType defval)638 inline IntType ResourceManager::getUintVal(const std::string& section, const std::string& name, IntType defval) const
639 {
640     IntType val = fConfig->uFromText(fConfig->getConfig(section, name));
641 #ifdef DEBUGRM
642     std::cout << "RM getUintVal val: " << section << " : " << name << " val: " << val << " default: " << defval << std::endl;
643 #endif
644     return ( 0 == val ? defval : val );
645 
646 }
647 
648 template<typename IntType>
getIntVal(const std::string & section,const std::string & name,IntType defval)649 inline IntType ResourceManager::getIntVal(const std::string& section, const std::string& name, IntType defval) const
650 {
651     std::string retStr = fConfig->getConfig(section, name);
652 #ifdef DEBUGRM
653     std::cout << "RM getIntVal val: " << section << " : " << name << " val: " << retStr << " default: " << defval << std::endl;
654 #endif
655     return ( 0 == retStr.length() ? defval : fConfig->fromText(retStr) );
656 }
657 
getBoolVal(const std::string & section,const std::string & name,bool defval)658 inline bool ResourceManager::getBoolVal(const std::string& section, const std::string& name, bool defval) const
659 {
660   auto retStr = fConfig->getConfig(section, name);
661   return ( 0 == retStr.length() ? defval : (retStr == "y" || retStr == "Y") );
662 }
663 
664 }
665 
666 #undef EXPORT
667 
668 #endif
669