1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /******************************************************************************************
19 * $Id: resourcemanager.h 9655 2013-06-25 23:08:13Z xlou $
20 *
21 ******************************************************************************************/
22 /**
23 * @file
24 */
25 #ifndef JOBLIST_RESOURCEMANAGER_H
26 #define JOBLIST_RESOURCEMANAGER_H
27
28 #include <vector>
29 #include <iostream>
30 #include <boost/thread.hpp>
31 #include <boost/algorithm/string.hpp>
32 #include <unistd.h>
33
34 #include "configcpp.h"
35 #include "calpontselectexecutionplan.h"
36 #include "resourcedistributor.h"
37 #include "installdir.h"
38 #include "branchpred.h"
39
40 #include "atomicops.h"
41
42 #if defined(_MSC_VER) && defined(JOBLIST_DLLEXPORT)
43 #define EXPORT __declspec(dllexport)
44 #else
45 #define EXPORT
46 #endif
47
48 namespace joblist
49 {
// Compile-time fallbacks used when the corresponding entry is missing from
// the configuration. The section comments name the component each group
// was originally introduced for.

// aggfilterstep
const uint32_t defaultNumThreads = 8;

// joblistfactory
const uint32_t defaultFlushInterval = 8 * 1024;
const uint32_t defaultFifoSize = 10;
const uint32_t defaultHJFifoSizeLargeSide = 128;
const uint64_t defaultHJMaxElems = 512 * 1024;   // hashjoin uses 8192
const int defaultHJMaxBuckets = 32;              // hashjoin uses 4
const uint64_t defaultHJPmMaxMemorySmallSide = 1ULL * 1024 * 1024 * 1024;
const uint64_t defaultHJUmMaxMemorySmallSide = 4ULL * 1024 * 1024 * 1024;
const uint32_t defaultTempSaveSize = defaultHJMaxElems;
const uint64_t defaultTotalUmMemory = 8ULL * 1024 * 1024 * 1024;
const uint64_t defaultHUATotalMem = 8ULL * 1024 * 1024 * 1024;

const uint32_t defaultTupleDLMaxSize = 64 * 1024;

const uint32_t defaultJLThreadPoolSize = 100;

// pcolscan.cpp
const uint32_t defaultScanLbidReqLimit = 10000;
const uint32_t defaultScanLbidReqThreshold = 5000;
const uint32_t defaultLogicalBlocksPerScan = 1024;   // added for bug 1264.
const uint32_t defaultScanBlockThreshhold = 10000;   // in jobstep.h

const uint32_t defaultScanReceiveThreads = 8;

// pcolstep.cpp
const uint32_t defaultProjectBlockReqLimit = 32 * 1024;
const uint32_t defaultProjectBlockReqThreshold = 16 * 1024;   // 256 in jobstep.h

// BatchPrimitiveStep
const uint32_t defaultRequestSize = 1;
const uint32_t defaultMaxOutstandingRequests = 20;
const uint32_t defaultProcessorThreadsPerScan = 16;
const uint32_t defaultJoinerChunkSize = 16 * 1024 * 1024;

// bucketreuse
const std::string defaultTempDiskPath = "/tmp";
const std::string defaultWorkingDir = ".";   // previously "/tmp"

// largedatalist
const uint32_t defaultLDLMaxElements = 32 * 1024 * 1024;

// zdl
const uint64_t defaultMaxElementsInMem = 32 * 1024 * 1024;
const uint64_t defaultNumBuckets = 128;
const uint64_t defaultMaxElementsPerBuckert = 16 * 1024 * 1024;

// ExeMgr
const int defaultEMServerThreads = 50;
const int defaultEMSecondsBetweenMemChecks = 1;
const int defaultEMMaxPct = 95;
const int defaultEMPriority = 21;   // @Bug 3385
const int defaultEMExecQueueSize = 20;

const uint64_t defaultInitialCapacity = 1024 * 1024;
const int defaultTWMaxBuckets = 256;
const int defaultPSCount = 0;
const int defaultConnectionsPerPrimProc = 1;
const uint32_t defaultLBID_Shift = 13;
const uint64_t defaultExtentRows = 8 * 1024 * 1024;

// DMLProc
// @bug 1886. The default was reduced from 4M to 256K because delete was
// consuming too much memory.
const uint64_t defaultDMLMaxDeleteRows = 256 * 1024;

// Connector
// @bug 2048. To control batch insert memory usage.
const uint64_t defaultRowsPerBatch = 10000;

/* HJ CP feedback, see bug #1465 */
const uint32_t defaultHjCPUniqueLimit = 100;

// Order By and Limit
const uint64_t defaultOrderByLimitMaxMemory = 1ULL * 1024 * 1024 * 1024;

const uint64_t defaultDECThrottleThreshold = 200000000;   // ~200 MB

const uint8_t defaultUseCpimport = 1;

const bool defaultAllowDiskAggregation = false;
131 /** @brief ResourceManager
132 * Returns requested values from Config
133 *
134 */
135 class ResourceManager
136 {
137 public:
138
139 /** @brief ctor
140 *
141 */
142 EXPORT ResourceManager(bool runningInExeMgr = false);
143 static ResourceManager* instance(bool runningInExeMgr = false);
144 // ResourceManager(const config::Config *cf);
145 // ResourceManager(const std::string& config);
146 //passed by ExeMgr and DistributedEngineComm to MessageQueueServer or -Client
getConfig()147 config::Config* getConfig()
148 {
149 return fConfig;
150 }
151
152 /** @brief dtor
153 */
~ResourceManager()154 virtual ~ResourceManager() {}
155
156 typedef std::map <uint32_t, uint64_t> MemMap;
157
158 // @MCOL-513 - Added threadpool to ExeMgr
getEmServerThreads()159 int getEmServerThreads() const
160 {
161 return getIntVal(fExeMgrStr, "ThreadPoolSize", defaultEMServerThreads);
162 }
getExeMgrThreadPoolDebug()163 std::string getExeMgrThreadPoolDebug() const
164 {
165 return getStringVal(fExeMgrStr, "ThreadPoolDebug", "N");
166 }
167
getEmSecondsBetweenMemChecks()168 int getEmSecondsBetweenMemChecks() const
169 {
170 return getUintVal(fExeMgrStr, "SecondsBetweenMemChecks", defaultEMSecondsBetweenMemChecks);
171 }
getEmMaxPct()172 int getEmMaxPct() const
173 {
174 return getUintVal(fExeMgrStr, "MaxPct", defaultEMMaxPct);
175 }
176 EXPORT int getEmPriority() const;
getEmExecQueueSize()177 int getEmExecQueueSize() const
178 {
179 return getIntVal(fExeMgrStr, "ExecQueueSize", defaultEMExecQueueSize);
180 }
181
getAllowDiskAggregation()182 bool getAllowDiskAggregation() const
183 {
184 return fAllowedDiskAggregation;
185 }
186
getHjMaxBuckets()187 int getHjMaxBuckets() const
188 {
189 return getUintVal(fHashJoinStr, "MaxBuckets", defaultHJMaxBuckets);
190 }
getHjNumThreads()191 unsigned getHjNumThreads() const
192 {
193 return fHjNumThreads;
194 } //getUintVal(fHashJoinStr, "NumThreads", defaultNumThreads); }
getHjMaxElems()195 uint64_t getHjMaxElems() const
196 {
197 return getUintVal(fHashJoinStr, "MaxElems", defaultHJMaxElems);
198 }
getHjFifoSizeLargeSide()199 uint32_t getHjFifoSizeLargeSide() const
200 {
201 return getUintVal(fHashJoinStr, "FifoSizeLargeSide", defaultHJFifoSizeLargeSide);
202 }
getHjCPUniqueLimit()203 uint32_t getHjCPUniqueLimit() const
204 {
205 return getUintVal(fHashJoinStr, "CPUniqueLimit", defaultHjCPUniqueLimit);
206 }
getPMJoinMemLimit()207 uint64_t getPMJoinMemLimit() const
208 {
209 return pmJoinMemLimit;
210 }
211
getJLFlushInterval()212 uint32_t getJLFlushInterval() const
213 {
214 return getUintVal(fJobListStr, "FlushInterval", defaultFlushInterval);
215 }
getJlFifoSize()216 uint32_t getJlFifoSize() const
217 {
218 return getUintVal(fJobListStr, "FifoSize", defaultFifoSize);
219 }
getJlScanLbidReqLimit()220 uint32_t getJlScanLbidReqLimit() const
221 {
222 return getUintVal(fJobListStr, "ScanLbidReqLimit", defaultScanLbidReqLimit);
223 }
getJlScanLbidReqThreshold()224 uint32_t getJlScanLbidReqThreshold() const
225 {
226 return getUintVal(fJobListStr, "ScanLbidReqThreshold", defaultScanLbidReqThreshold);
227 }
228
229 // @MCOL-513 - Added threadpool to JobSteps
getJLThreadPoolSize()230 int getJLThreadPoolSize() const
231 {
232 return getIntVal(fJobListStr, "ThreadPoolSize", defaultJLThreadPoolSize);
233 }
getJlThreadPoolDebug()234 std::string getJlThreadPoolDebug() const
235 {
236 return getStringVal(fJobListStr, "ThreadPoolDebug", "N");
237 }
getDMLJlThreadPoolDebug()238 std::string getDMLJlThreadPoolDebug() const
239 {
240 return getStringVal(fJobListStr, "DMLThreadPoolDebug", "N");
241 }
242
243 // @bug 1264 - Added LogicalBlocksPerScan configurable which determines the number of blocks contained in each BPS scan request.
getJlLogicalBlocksPerScan()244 uint32_t getJlLogicalBlocksPerScan() const
245 {
246 return getUintVal(fJobListStr, "LogicalBlocksPerScan", defaultLogicalBlocksPerScan);
247 }
getJlProjectBlockReqLimit()248 uint32_t getJlProjectBlockReqLimit() const
249 {
250 return getUintVal(fJobListStr, "ProjectBlockReqLimit", defaultProjectBlockReqLimit );
251 }
getJlProjectBlockReqThreshold()252 uint32_t getJlProjectBlockReqThreshold() const
253 {
254 return getUintVal(fJobListStr, "ProjectBlockReqThreshold", defaultProjectBlockReqThreshold);
255 }
getJlNumScanReceiveThreads()256 uint32_t getJlNumScanReceiveThreads() const
257 {
258 return fJlNumScanReceiveThreads;
259 } //getUintVal(fJobListStr, "NumScanReceiveThreads", defaultScanReceiveThreads); }
260
261 // @bug 1424,1298
getJlProcessorThreadsPerScan()262 uint32_t getJlProcessorThreadsPerScan() const
263 {
264 return fJlProcessorThreadsPerScan;
265 } //getUintVal(fJobListStr,"ProcessorThreadsPerScan", defaultProcessorThreadsPerScan); }
getJlRequestSize()266 uint32_t getJlRequestSize() const
267 {
268 return getUintVal(fJobListStr, "RequestSize", defaultRequestSize );
269 }
getJlMaxOutstandingRequests()270 uint32_t getJlMaxOutstandingRequests() const
271 {
272 return fJlMaxOutstandingRequests;
273 //getUintVal(fJobListStr, "MaxOutstandingRequests", defaultMaxOutstandingRequests);
274 }
getJlJoinerChunkSize()275 uint32_t getJlJoinerChunkSize() const
276 {
277 return getUintVal(fJobListStr, "JoinerChunkSize", defaultJoinerChunkSize);
278 }
279
getPsCount()280 int getPsCount() const
281 {
282 return getUintVal(fPrimitiveServersStr, "Count", defaultPSCount );
283 }
getPsConnectionsPerPrimProc()284 int getPsConnectionsPerPrimProc() const
285 {
286 return getUintVal(fPrimitiveServersStr, "ConnectionsPerPrimProc", defaultConnectionsPerPrimProc);
287 }
getPsLBID_Shift()288 uint32_t getPsLBID_Shift() const
289 {
290 return getUintVal(fPrimitiveServersStr, "LBID_Shift", defaultLBID_Shift );
291 }
292
getScTempDiskPath()293 std::string getScTempDiskPath() const
294 {
295 return startup::StartUp::tmpDir();
296 }
getScTempSaveSize()297 uint64_t getScTempSaveSize() const
298 {
299 return getUintVal(fSystemConfigStr, "TempSaveSize", defaultTempSaveSize);
300 }
getScWorkingDir()301 std::string getScWorkingDir() const
302 {
303 return startup::StartUp::tmpDir();
304 }
305
getTwMaxSize()306 uint32_t getTwMaxSize() const
307 {
308 return getUintVal(fTupleWSDLStr, "MaxSize", defaultTupleDLMaxSize );
309 }
getTwInitialCapacity()310 uint64_t getTwInitialCapacity() const
311 {
312 return getUintVal(fTupleWSDLStr, "InitialCapacity", defaultInitialCapacity );
313 }
getTwMaxBuckets()314 int getTwMaxBuckets () const
315 {
316 return getUintVal(fTupleWSDLStr, "MaxBuckets", defaultTWMaxBuckets );
317 }
getTwNumThreads()318 uint8_t getTwNumThreads() const
319 {
320 return fTwNumThreads;
321 } //getUintVal(fTupleWSDLStr, "NumThreads", defaultNumThreads ); }
getZdl_MaxElementsInMem()322 uint64_t getZdl_MaxElementsInMem() const
323 {
324 return getUintVal(fZDLStr, "ZDL_MaxElementsInMem", defaultMaxElementsInMem );
325 }
getZdl_MaxElementsPerBucket()326 uint64_t getZdl_MaxElementsPerBucket () const
327 {
328 return getUintVal(fZDLStr, "ZDL_MaxElementsPerBucket", defaultMaxElementsPerBuckert );
329 }
330
getExtentRows()331 uint64_t getExtentRows() const
332 {
333 return getUintVal(fExtentMapStr, "ExtentRows", defaultExtentRows );
334 }
335
getDBRootCount()336 uint32_t getDBRootCount() const
337 {
338 return getUintVal(fSystemConfigStr, "DBRootCount", 1);
339 }
getPMCount()340 uint32_t getPMCount() const
341 {
342 return getUintVal(fPrimitiveServersStr, "Count", 1);
343 }
344
getHbrPredicate()345 std::vector<std::string> getHbrPredicate() const
346 {
347 std::vector<std::string> columns;
348 fConfig->getConfig(fHashBucketReuseStr, "Predicate", columns);
349 return columns;
350 }
351
getDMLMaxDeleteRows()352 uint64_t getDMLMaxDeleteRows () const
353 {
354 return getUintVal(fDMLProcStr, "MaxDeleteRows", defaultDMLMaxDeleteRows);
355 }
356
getRowsPerBatch()357 uint64_t getRowsPerBatch() const
358 {
359 return getUintVal(fBatchInsertStr, "RowsPerBatch", defaultRowsPerBatch);
360 }
361
getUseCpimport()362 uint8_t getUseCpimport() const
363 {
364 int val = getIntVal(fBatchInsertStr, "UseCpimport", defaultUseCpimport);
365 return val;
366 }
367
getOrderByLimitMaxMemory()368 uint64_t getOrderByLimitMaxMemory() const
369 {
370 return getUintVal(fOrderByLimitStr, "MaxMemory", defaultOrderByLimitMaxMemory);
371 }
372
getDECThrottleThreshold()373 uint64_t getDECThrottleThreshold() const
374 {
375 return getUintVal(fJobListStr, "DECThrottleThreshold", defaultDECThrottleThreshold);
376 }
377
378 EXPORT void emServerThreads();
379 EXPORT void emServerQueueSize();
380 EXPORT void emSecondsBetweenMemChecks();
381 EXPORT void emMaxPct();
382 EXPORT void emPriority();
383 EXPORT void emExecQueueSize();
384
385 EXPORT void hjNumThreads();
386 EXPORT void hjMaxBuckets();
387 EXPORT void hjMaxElems();
388 EXPORT void hjFifoSizeLargeSide();
389 EXPORT void hjPmMaxMemorySmallSide();
390
391 /* new HJ/Union/Aggregation mem interface, used by TupleBPS */
392 /* sessionLimit is a pointer to the var holding the session-scope limit, should be JobInfo.umMemLimit
393 for the query. */
394 /* Temporary parameter 'patience', will wait for up to 10s to get the memory. */
395 EXPORT bool getMemory(int64_t amount, boost::shared_ptr<int64_t> sessionLimit, bool patience = true);
returnMemory(int64_t amount,boost::shared_ptr<int64_t> sessionLimit)396 inline void returnMemory(int64_t amount, boost::shared_ptr<int64_t> sessionLimit)
397 {
398 atomicops::atomicAdd(&totalUmMemLimit, amount);
399 atomicops::atomicAdd(sessionLimit.get(), amount);
400 }
availableMemory()401 inline int64_t availableMemory() const
402 {
403 return totalUmMemLimit;
404 }
405
406 /* old HJ mem interface, used by HashJoin */
getHjPmMaxMemorySmallSide(uint32_t sessionID)407 uint64_t getHjPmMaxMemorySmallSide(uint32_t sessionID)
408 {
409 return fHJPmMaxMemorySmallSideSessionMap.getSessionResource(sessionID);
410 }
getHjUmMaxMemorySmallSide(uint32_t sessionID)411 uint64_t getHjUmMaxMemorySmallSide(uint32_t sessionID)
412 {
413 return fHJUmMaxMemorySmallSideDistributor.getSessionResource(sessionID);
414 }
getHjTotalUmMaxMemorySmallSide()415 uint64_t getHjTotalUmMaxMemorySmallSide() const
416 {
417 return fHJUmMaxMemorySmallSideDistributor.getTotalResource();
418 }
419
420 EXPORT void addHJUmMaxSmallSideMap(uint32_t sessionID, uint64_t mem);
421
removeHJUmMaxSmallSideMap(uint32_t sessionID)422 void removeHJUmMaxSmallSideMap(uint32_t sessionID)
423 {
424 fHJUmMaxMemorySmallSideDistributor.removeSession(sessionID);
425 }
426
427 EXPORT void addHJPmMaxSmallSideMap(uint32_t sessionID, uint64_t mem);
removeHJPmMaxSmallSideMap(uint32_t sessionID)428 void removeHJPmMaxSmallSideMap(uint32_t sessionID)
429 {
430 fHJPmMaxMemorySmallSideSessionMap.removeSession(sessionID);
431 }
432
removeSessionMaps(uint32_t sessionID)433 void removeSessionMaps(uint32_t sessionID)
434 {
435 fHJPmMaxMemorySmallSideSessionMap.removeSession(sessionID);
436 fHJUmMaxMemorySmallSideDistributor.removeSession(sessionID);
437 }
438
requestHJMaxMemorySmallSide(uint32_t sessionID,uint64_t amount)439 uint64_t requestHJMaxMemorySmallSide(uint32_t sessionID, uint64_t amount)
440 {
441 return fHJUmMaxMemorySmallSideDistributor.requestResource(sessionID, amount);
442 }
443
requestHJUmMaxMemorySmallSide(uint32_t sessionID)444 uint64_t requestHJUmMaxMemorySmallSide(uint32_t sessionID)
445 {
446 return fHJUmMaxMemorySmallSideDistributor.requestResource(sessionID);
447 }
returnHJUmMaxMemorySmallSide(uint64_t mem)448 void returnHJUmMaxMemorySmallSide(uint64_t mem)
449 {
450 fHJUmMaxMemorySmallSideDistributor.returnResource(mem);
451 }
452
453
454 EXPORT void jlFlushInterval();
455 EXPORT void jlFifoSize();
456 EXPORT void jlScanLbidReqLimit();
457 EXPORT void jlScanLbidReqThreshold();
458 EXPORT void jlProjectBlockReqLimit();
459 EXPORT void jlProjectBlockReqThreshold();
460 EXPORT void jlNumScanReceiveThreads();
461
462 EXPORT void psCount();
463 EXPORT void psConnectionsPerPrimProc() ;
464 EXPORT void psLBID_Shift();
465
466 EXPORT void scTempDiskPath();
467 EXPORT void scTempSaveSize() ;
468 EXPORT void scWorkingDir();
469
470 EXPORT void twMaxSize();
471 EXPORT void twInitialCapacity() ;
472 EXPORT void twMaxBuckets () ;
473 EXPORT void twNumThreads();
474
475 EXPORT void zdl_MaxElementsInMem();
476 EXPORT void zdl_MaxElementsPerBucket() ;
477
478 EXPORT void hbrPredicate();
479
setTraceFlags(uint32_t flags)480 void setTraceFlags(uint32_t flags)
481 {
482 fTraceFlags = flags;
483 fHJUmMaxMemorySmallSideDistributor.setTrace(((fTraceFlags & execplan::CalpontSelectExecutionPlan::TRACE_RESRCMGR) != 0));
484 }
rmtraceOn()485 bool rmtraceOn() const
486 {
487 return ((fTraceFlags & execplan::CalpontSelectExecutionPlan::TRACE_RESRCMGR) != 0);
488 }
489
numCores(unsigned numCores)490 void numCores(unsigned numCores)
491 {
492 fNumCores = numCores;
493 }
numCores()494 unsigned numCores() const
495 {
496 return fNumCores;
497 }
498
aggNumThreads(uint32_t numThreads)499 void aggNumThreads(uint32_t numThreads)
500 {
501 fAggNumThreads = numThreads;
502 }
aggNumThreads()503 uint32_t aggNumThreads() const
504 {
505 return fAggNumThreads;
506 }
507
aggNumBuckets(uint32_t numBuckets)508 void aggNumBuckets(uint32_t numBuckets)
509 {
510 fAggNumBuckets = numBuckets;
511 }
aggNumBuckets()512 uint32_t aggNumBuckets() const
513 {
514 return fAggNumBuckets;
515 }
516
aggNumRowGroups(uint32_t numRowGroups)517 void aggNumRowGroups(uint32_t numRowGroups)
518 {
519 fAggNumRowGroups = numRowGroups;
520 }
aggNumRowGroups()521 uint32_t aggNumRowGroups() const
522 {
523 return fAggNumRowGroups;
524 }
525
windowFunctionThreads(uint32_t n)526 void windowFunctionThreads(uint32_t n)
527 {
528 fWindowFunctionThreads = n;
529 }
windowFunctionThreads()530 uint32_t windowFunctionThreads() const
531 {
532 return fWindowFunctionThreads;
533 }
534
useHdfs()535 bool useHdfs() const
536 {
537 return fUseHdfs;
538 }
539
540 EXPORT bool getMysqldInfo(std::string& h, std::string& u, std::string& w, unsigned int& p) const;
541 EXPORT bool queryStatsEnabled() const;
542 EXPORT bool userPriorityEnabled() const;
543
getConfiguredUMMemLimit()544 uint64_t getConfiguredUMMemLimit() const
545 {
546 return configuredUmMemLimit;
547 }
548 private:
549
550 void logResourceChangeMessage(logging::LOG_TYPE logType, uint32_t sessionID, uint64_t newvalue, uint64_t value, const std::string& source, logging::Message::MessageID mid);
551 /** @brief get name's value from section
552 *
553 * get name's value from section in the current config file or default value .
554 * @param section the name of the config file section to search
555 * @param name the param name whose value is to be returned
556 * @param defVal the default value returned if the value is missing
557 */
558 std::string getStringVal(const std::string& section,
559 const std::string& name,
560 const std::string& defVal,
561 const bool reReadConfigIfNeeded = false) const;
562
563 template<typename IntType>
564 IntType getUintVal(const std::string& section, const std::string& name, IntType defval) const;
565
566 template<typename IntType>
567 IntType getIntVal(const std::string& section, const std::string& name, IntType defval) const;
568
569 bool getBoolVal(const std::string& section, const std::string& name, bool defval) const;
570
571 void logMessage(logging::LOG_TYPE logLevel, logging::Message::MessageID mid, uint64_t value = 0, uint32_t sessionId = 0);
572
573 /*static const*/ std::string fExeMgrStr;
574 static const std::string fHashJoinStr;
575 static const std::string fHashBucketReuseStr;
576 static const std::string fJobListStr;
577 static const std::string fPrimitiveServersStr;
578 /*static const*/ std::string fSystemConfigStr;
579 static const std::string fTupleWSDLStr;
580 static const std::string fZDLStr;
581 static const std::string fExtentMapStr;
582 /*static const*/ std::string fDMLProcStr;
583 /*static const*/ std::string fBatchInsertStr;
584 static const std::string fOrderByLimitStr;
585 static const std::string fRowAggregationStr;
586 config::Config* fConfig;
587 static ResourceManager* fInstance;
588 uint32_t fTraceFlags;
589
590 unsigned fNumCores;
591 unsigned fHjNumThreads;
592 uint32_t fJlProcessorThreadsPerScan;
593 uint32_t fJlNumScanReceiveThreads;
594 uint8_t fTwNumThreads;
595 uint32_t fJlMaxOutstandingRequests;
596
597 /* old HJ support */
598 ResourceDistributor fHJUmMaxMemorySmallSideDistributor;
599 LockedSessionMap fHJPmMaxMemorySmallSideSessionMap;
600
601 /* new HJ/Union/Aggregation support */
602 volatile int64_t totalUmMemLimit; // mem limit for join, union, and aggregation on the UM
603 uint64_t configuredUmMemLimit;
604 uint64_t pmJoinMemLimit; // mem limit on individual PM joins
605
606 /* multi-thread aggregate */
607 uint32_t fAggNumThreads;
608 uint32_t fAggNumBuckets;
609 uint32_t fAggNumRowGroups;
610
611 // window function
612 uint32_t fWindowFunctionThreads;
613
614
615 bool isExeMgr;
616 bool fUseHdfs;
617 bool fAllowedDiskAggregation{false};
618 };
619
620
getStringVal(const std::string & section,const std::string & name,const std::string & defval,const bool reReadConfigIfNeeded)621 inline std::string ResourceManager::getStringVal(const std::string& section,
622 const std::string& name,
623 const std::string& defval,
624 const bool reReadConfigIfNeeded) const
625 {
626 std::string val = UNLIKELY(reReadConfigIfNeeded)
627 ? fConfig->getFromActualConfig(section, name)
628 : fConfig->getConfig(section, name);
629 #ifdef DEBUGRM
630 std::cout << "RM getStringVal for " << section << " : " << name << " val: " << val << " default: " << defval << std::endl;
631 #endif
632 if (val.empty())
633 val = defval;
634 return val;
635 }
636
637 template<typename IntType>
getUintVal(const std::string & section,const std::string & name,IntType defval)638 inline IntType ResourceManager::getUintVal(const std::string& section, const std::string& name, IntType defval) const
639 {
640 IntType val = fConfig->uFromText(fConfig->getConfig(section, name));
641 #ifdef DEBUGRM
642 std::cout << "RM getUintVal val: " << section << " : " << name << " val: " << val << " default: " << defval << std::endl;
643 #endif
644 return ( 0 == val ? defval : val );
645
646 }
647
648 template<typename IntType>
getIntVal(const std::string & section,const std::string & name,IntType defval)649 inline IntType ResourceManager::getIntVal(const std::string& section, const std::string& name, IntType defval) const
650 {
651 std::string retStr = fConfig->getConfig(section, name);
652 #ifdef DEBUGRM
653 std::cout << "RM getIntVal val: " << section << " : " << name << " val: " << retStr << " default: " << defval << std::endl;
654 #endif
655 return ( 0 == retStr.length() ? defval : fConfig->fromText(retStr) );
656 }
657
getBoolVal(const std::string & section,const std::string & name,bool defval)658 inline bool ResourceManager::getBoolVal(const std::string& section, const std::string& name, bool defval) const
659 {
660 auto retStr = fConfig->getConfig(section, name);
661 return ( 0 == retStr.length() ? defval : (retStr == "y" || retStr == "Y") );
662 }
663
664 }
665
666 #undef EXPORT
667
668 #endif
669