1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /******************************************************************************************
19 * $Id: resourcemanager.cpp 9655 2013-06-25 23:08:13Z xlou $
20 *
21 ******************************************************************************************/
22
23 #include <unistd.h>
24 #include <string>
25 #include <stdexcept>
26 #include <iostream>
27 #include <sstream>
28 #include <fstream>
29 #include <sys/time.h>
30 using namespace std;
31
32 #include <boost/regex.hpp>
33 using namespace boost;
34
35 #include "resourcemanager.h"
36
37 #include "jl_logger.h"
38 #include "cgroupconfigurator.h"
39 #include "liboamcpp.h"
40
41 using namespace config;
42
43 namespace joblist
44 {
45
46 //const string ResourceManager::fExeMgrStr("ExeMgr1");
47 const string ResourceManager::fHashJoinStr("HashJoin");
48 const string ResourceManager::fHashBucketReuseStr("HashBucketReuse");
49 const string ResourceManager::fJobListStr("JobList");
50 const string ResourceManager::fPrimitiveServersStr("PrimitiveServers");
51 //const string ResourceManager::fSystemConfigStr("SystemConfig");
52 const string ResourceManager::fTupleWSDLStr("TupleWSDL");
53 const string ResourceManager::fZDLStr("ZDL");
54 const string ResourceManager::fExtentMapStr("ExtentMap");
55 //const string ResourceManager::fDMLProcStr("DMLProc");
56 //const string ResourceManager::fBatchInsertStr("BatchInsert");
57 const string ResourceManager::fOrderByLimitStr("OrderByLimit");
58 const string ResourceManager::fRowAggregationStr("RowAggregation");
59
60 ResourceManager* ResourceManager::fInstance = NULL;
61 boost::mutex mx;
62
instance(bool runningInExeMgr)63 ResourceManager* ResourceManager::instance(bool runningInExeMgr)
64 {
65 boost::mutex::scoped_lock lk(mx);
66
67 if (!fInstance)
68 fInstance = new ResourceManager(runningInExeMgr);
69
70 return fInstance;
71 }
72
ResourceManager(bool runningInExeMgr)73 ResourceManager::ResourceManager(bool runningInExeMgr) :
74 fExeMgrStr("ExeMgr1"),
75 fSystemConfigStr("SystemConfig"),
76 fDMLProcStr("DMLProc"),
77 fBatchInsertStr("BatchInsert"),
78 fConfig(Config::makeConfig()),
79 fNumCores(8),
80 fHjNumThreads(defaultNumThreads),
81 fJlProcessorThreadsPerScan(defaultProcessorThreadsPerScan),
82 fJlNumScanReceiveThreads(defaultScanReceiveThreads),
83 fTwNumThreads(defaultNumThreads),
84 fJlMaxOutstandingRequests(defaultMaxOutstandingRequests),
85 fHJUmMaxMemorySmallSideDistributor(fHashJoinStr,
86 "UmMaxMemorySmallSide",
87 getUintVal(fHashJoinStr, "TotalUmMaxMemorySmallSide", defaultTotalUmMemory),
88 getUintVal(fHashJoinStr, "UmMaxMemorySmallSide", defaultHJUmMaxMemorySmallSide),
89 0),
90 fHJPmMaxMemorySmallSideSessionMap(
91 getUintVal(fHashJoinStr, "PmMaxMemorySmallSide", defaultHJPmMaxMemorySmallSide)),
92 isExeMgr(runningInExeMgr)
93 {
94 int temp;
95 int configNumCores = -1;
96
97 fTraceFlags = 0;
98 //See if we want to override the calculated #cores
99 temp = getIntVal(fJobListStr, "NumCores", -1);
100
101 if (temp > 0)
102 configNumCores = temp;
103
104 if (configNumCores <= 0)
105 {
106 //count the actual #cores
107 utils::CGroupConfigurator cg;
108 fNumCores = cg.getNumCores();
109
110 if (fNumCores <= 0)
111 fNumCores = 8;
112 }
113 else
114 fNumCores = configNumCores;
115
116 //based on the #cores, calculate some thread parms
117 if (fNumCores > 0)
118 {
119 fHjNumThreads = fNumCores;
120 fJlNumScanReceiveThreads = fNumCores;
121 fTwNumThreads = fNumCores;
122 }
123
124 //possibly override any calculated values
125 temp = getIntVal(fHashJoinStr, "NumThreads", -1);
126
127 if (temp > 0)
128 fHjNumThreads = temp;
129
130 temp = getIntVal(fJobListStr, "ProcessorThreadsPerScan", -1);
131
132
133 if (temp > 0)
134 fJlProcessorThreadsPerScan = temp;
135
136 temp = getIntVal(fJobListStr, "MaxOutstandingRequests", -1);
137
138 if (temp > 0)
139 fJlMaxOutstandingRequests = temp;
140 else
141 {
142 oam::Oam oam;
143 oam::ModuleTypeConfig moduletypeconfig;
144 oam.getSystemConfig("pm", moduletypeconfig);
145 const uint temp = moduletypeconfig.ModuleCount * fNumCores * 4 / fJlProcessorThreadsPerScan;
146 const uint minMaxOutstandingRequests = std::max(static_cast<uint>(moduletypeconfig.ModuleCount * 2), defaultMaxOutstandingRequests);
147 fJlMaxOutstandingRequests = temp > minMaxOutstandingRequests ? temp : minMaxOutstandingRequests;
148 }
149
150 temp = getIntVal(fJobListStr, "NumScanReceiveThreads", -1);
151
152 if (temp > 0)
153 fJlNumScanReceiveThreads = temp;
154
155 temp = getIntVal(fTupleWSDLStr, "NumThreads", -1);
156
157 if (temp > 0)
158 fTwNumThreads = temp;
159
160 pmJoinMemLimit = getUintVal(fHashJoinStr, "PmMaxMemorySmallSide",
161 defaultHJPmMaxMemorySmallSide);
162
163 // Need to use different limits if this instance isn't running on the UM,
164 // or if it's an ExeMgr running on a PM node
165 if (!isExeMgr)
166 totalUmMemLimit = pmJoinMemLimit;
167 else
168 {
169 string whichLimit = "TotalUmMemory";
170 string pmWithUM = fConfig->getConfig("Installation", "PMwithUM");
171
172 if (pmWithUM == "y" || pmWithUM == "Y")
173 {
174 oam::Oam OAM;
175 oam::oamModuleInfo_t moduleInfo = OAM.getModuleInfo();
176 string& moduleType = boost::get<1>(moduleInfo);
177
178 if (moduleType == "pm" || moduleType == "PM")
179 {
180 string doesItExist = fConfig->getConfig(fHashJoinStr, "TotalPmUmMemory");
181
182 if (!doesItExist.empty())
183 whichLimit = "TotalPmUmMemory";
184 }
185 }
186
187 string umtxt = fConfig->getConfig(fHashJoinStr, whichLimit);
188
189 if (umtxt.empty())
190 totalUmMemLimit = defaultTotalUmMemory;
191 else
192 {
193 // is it an absolute or a percentage?
194 if (umtxt.find('%') != string::npos)
195 {
196 utils::CGroupConfigurator cg;
197 uint64_t totalMem = cg.getTotalMemory();
198 totalUmMemLimit = atoll(umtxt.c_str()) / 100.0 * (double) totalMem;
199
200 if (totalUmMemLimit == 0 || totalUmMemLimit == LLONG_MIN ||
201 totalUmMemLimit == LLONG_MAX) // some garbage in the xml entry
202 totalUmMemLimit = defaultTotalUmMemory;
203 }
204 else // an absolute; use the existing converter
205 {
206 totalUmMemLimit = getIntVal(fHashJoinStr, whichLimit,
207 defaultTotalUmMemory);
208 }
209 }
210 }
211
212 configuredUmMemLimit = totalUmMemLimit;
213 //cout << "RM: total UM memory = " << totalUmMemLimit << endl;
214
215 // multi-thread aggregate
216 string nt, nb, nr;
217 nt = fConfig->getConfig("RowAggregation", "RowAggrThreads");
218
219 if (nt.empty())
220 {
221 if ( numCores() > 0 )
222 fAggNumThreads = numCores();
223 else
224 fAggNumThreads = 1;
225 }
226 else
227 fAggNumThreads = fConfig->uFromText(nt);
228
229 nb = fConfig->getConfig("RowAggregation", "RowAggrBuckets");
230
231 if (nb.empty())
232 fAggNumBuckets = fAggNumThreads * 4;
233 else
234 fAggNumBuckets = fConfig->uFromText(nb);
235
236 nr = fConfig->getConfig("RowAggregation", "RowAggrRowGroupsPerThread");
237
238 if (nr.empty())
239 fAggNumRowGroups = 20;
240 else
241 fAggNumRowGroups = fConfig->uFromText(nr);
242
243 // window function
244 string wt = fConfig->getConfig("WindowFunction", "WorkThreads");
245
246 if (wt.empty())
247 fWindowFunctionThreads = numCores();
248 else
249 fWindowFunctionThreads = fConfig->uFromText(wt);
250
251 // hdfs info
252 string hdfs = fConfig->getConfig("SystemConfig", "DataFilePlugin");
253
254 if ( hdfs.find("hdfs") != string::npos)
255 fUseHdfs = true;
256 else
257 fUseHdfs = false;
258
259 fAllowedDiskAggregation = getBoolVal(fRowAggregationStr,
260 "AllowDiskBasedAggregation",
261 defaultAllowDiskAggregation);
262 }
263
getEmPriority() const264 int ResourceManager::getEmPriority() const
265 {
266 int temp = getIntVal(fExeMgrStr, "Priority", defaultEMPriority);
267 // config file priority is 40..1 (highest..lowest)
268 // convert to -20..19 (highest..lowest, defaults to -1)
269 int val;
270
271 // @Bug3385 - the ExeMgr priority was being set backwards with 1 being the highest instead of the lowest.
272 if (temp < 1)
273 val = 19;
274 else if (temp > 40)
275 val = -20;
276 else
277 val = 20 - temp;
278
279 return val;
280 }
281
addHJPmMaxSmallSideMap(uint32_t sessionID,uint64_t mem)282 void ResourceManager::addHJPmMaxSmallSideMap(uint32_t sessionID, uint64_t mem)
283 {
284 if (fHJPmMaxMemorySmallSideSessionMap.addSession(sessionID, mem,
285 fHJUmMaxMemorySmallSideDistributor.getTotalResource()))
286 logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem, defaultHJPmMaxMemorySmallSide,
287 "PmMaxMemorySmallSide", LogRMResourceChange );
288 else
289 {
290 logResourceChangeMessage(logging::LOG_TYPE_WARNING, sessionID, mem,
291 fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "PmMaxMemorySmallSide",
292 LogRMResourceChangeError);
293
294 logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem,
295 fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "PmMaxMemorySmallSide",
296 LogRMResourceChangeError);
297 }
298 }
299
addHJUmMaxSmallSideMap(uint32_t sessionID,uint64_t mem)300 void ResourceManager::addHJUmMaxSmallSideMap(uint32_t sessionID, uint64_t mem)
301 {
302 if (fHJUmMaxMemorySmallSideDistributor.addSession(sessionID, mem))
303 logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem, defaultHJUmMaxMemorySmallSide,
304 "UmMaxMemorySmallSide", LogRMResourceChange);
305 else
306 {
307 logResourceChangeMessage(logging::LOG_TYPE_WARNING, sessionID, mem,
308 fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "UmMaxMemorySmallSide",
309 LogRMResourceChangeError);
310
311 logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem,
312 fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "UmMaxMemorySmallSide",
313 LogRMResourceChangeError);
314
315 }
316
317 }
318
logResourceChangeMessage(logging::LOG_TYPE logType,uint32_t sessionID,uint64_t newvalue,uint64_t value,const string & source,logging::Message::MessageID mid)319 void ResourceManager::logResourceChangeMessage(logging::LOG_TYPE logType, uint32_t sessionID, uint64_t newvalue,
320 uint64_t value, const string& source, logging::Message::MessageID mid)
321 {
322 logging::Message::Args args;
323 args.add(source);
324 args.add(newvalue);
325 args.add(value);
326 Logger log;
327 log.logMessage(logType, mid, args, logging::LoggingID(5, sessionID));
328 }
329
emServerThreads()330 void ResourceManager::emServerThreads() { }
emServerQueueSize()331 void ResourceManager::emServerQueueSize() { }
emSecondsBetweenMemChecks()332 void ResourceManager::emSecondsBetweenMemChecks() { }
emMaxPct()333 void ResourceManager::emMaxPct() { }
emPriority()334 void ResourceManager::emPriority() { }
emExecQueueSize()335 void ResourceManager::emExecQueueSize() { }
336
hjNumThreads()337 void ResourceManager::hjNumThreads() { }
hjMaxBuckets()338 void ResourceManager::hjMaxBuckets() { }
hjMaxElems()339 void ResourceManager::hjMaxElems() { }
hjFifoSizeLargeSide()340 void ResourceManager::hjFifoSizeLargeSide() { }
hjPmMaxMemorySmallSide()341 void ResourceManager::hjPmMaxMemorySmallSide() { }
342
jlFlushInterval()343 void ResourceManager::jlFlushInterval() { }
jlFifoSize()344 void ResourceManager::jlFifoSize() { }
jlScanLbidReqLimit()345 void ResourceManager::jlScanLbidReqLimit() { }
jlScanLbidReqThreshold()346 void ResourceManager::jlScanLbidReqThreshold() { }
jlProjectBlockReqLimit()347 void ResourceManager::jlProjectBlockReqLimit() { }
jlProjectBlockReqThreshold()348 void ResourceManager::jlProjectBlockReqThreshold() { }
jlNumScanReceiveThreads()349 void ResourceManager::jlNumScanReceiveThreads() { }
350
351
psCount()352 void ResourceManager::psCount() { }
psConnectionsPerPrimProc()353 void ResourceManager::psConnectionsPerPrimProc() { }
psLBID_Shift()354 void ResourceManager::psLBID_Shift() { }
355
scTempDiskPath()356 void ResourceManager::scTempDiskPath() { }
scTempSaveSize()357 void ResourceManager::scTempSaveSize() { }
scWorkingDir()358 void ResourceManager::scWorkingDir() { }
359
360
twMaxSize()361 void ResourceManager::twMaxSize() { }
twInitialCapacity()362 void ResourceManager::twInitialCapacity() { }
twMaxBuckets()363 void ResourceManager::twMaxBuckets () { }
twNumThreads()364 void ResourceManager::twNumThreads() { }
zdl_MaxElementsInMem()365 void ResourceManager::zdl_MaxElementsInMem() { }
zdl_MaxElementsPerBucket()366 void ResourceManager::zdl_MaxElementsPerBucket () { }
367
hbrPredicate()368 void ResourceManager::hbrPredicate() { }
369
getMysqldInfo(std::string & h,std::string & u,std::string & w,unsigned int & p) const370 bool ResourceManager::getMysqldInfo(
371 std::string& h, std::string& u, std::string& w, unsigned int& p) const
372 {
373 static const std::string hostUserUnassignedValue("unassigned");
374 // MCS will read username and pass from disk if the config changed.
375 bool reReadConfig = true;
376 u = getStringVal("CrossEngineSupport", "User", hostUserUnassignedValue, reReadConfig);
377 w = getStringVal("CrossEngineSupport", "Password", "", reReadConfig);
378 // MCS will not read username and pass from disk if the config changed.
379 h = getStringVal("CrossEngineSupport", "Host", hostUserUnassignedValue);
380 p = getUintVal("CrossEngineSupport", "Port", 0);
381
382 return h != hostUserUnassignedValue && u != hostUserUnassignedValue && p;
383 }
384
queryStatsEnabled() const385 bool ResourceManager::queryStatsEnabled() const
386 {
387 std::string val(getStringVal("QueryStats", "Enabled", "N" ));
388 boost::to_upper(val);
389 return "Y" == val;
390 }
391
userPriorityEnabled() const392 bool ResourceManager::userPriorityEnabled() const
393 {
394 std::string val(getStringVal("UserPriority", "Enabled", "N" ));
395 boost::to_upper(val);
396 return "Y" == val;
397 }
398
getMemory(int64_t amount,boost::shared_ptr<int64_t> sessionLimit,bool patience)399 bool ResourceManager::getMemory(int64_t amount, boost::shared_ptr<int64_t> sessionLimit, bool patience)
400 {
401 bool ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0);
402 bool ret2 = (atomicops::atomicSub(sessionLimit.get(), amount) >= 0);
403
404 uint32_t retryCounter = 0, maxRetries = 20; // 10s delay
405
406 while (patience && !(ret1 && ret2) && retryCounter++ < maxRetries)
407 {
408 atomicops::atomicAdd(&totalUmMemLimit, amount);
409 atomicops::atomicAdd(sessionLimit.get(), amount);
410 usleep(500000);
411 ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0);
412 ret2 = (atomicops::atomicSub(sessionLimit.get(), amount) >= 0);
413 }
414
415 return (ret1 && ret2);
416 }
417
418
419 } //namespace
420