1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /******************************************************************************************
19  * $Id: resourcemanager.cpp 9655 2013-06-25 23:08:13Z xlou $
20  *
21  ******************************************************************************************/
22 
23 #include <unistd.h>
24 #include <string>
25 #include <stdexcept>
26 #include <iostream>
27 #include <sstream>
28 #include <fstream>
29 #include <sys/time.h>
30 using namespace std;
31 
32 #include <boost/regex.hpp>
33 using namespace boost;
34 
35 #include "resourcemanager.h"
36 
37 #include "jl_logger.h"
38 #include "cgroupconfigurator.h"
39 #include "liboamcpp.h"
40 
41 using namespace config;
42 
43 namespace joblist
44 {
45 
46 //const string ResourceManager::fExeMgrStr("ExeMgr1");
47 const string ResourceManager::fHashJoinStr("HashJoin");
48 const string ResourceManager::fHashBucketReuseStr("HashBucketReuse");
49 const string ResourceManager::fJobListStr("JobList");
50 const string ResourceManager::fPrimitiveServersStr("PrimitiveServers");
51 //const string ResourceManager::fSystemConfigStr("SystemConfig");
52 const string ResourceManager::fTupleWSDLStr("TupleWSDL");
53 const string ResourceManager::fZDLStr("ZDL");
54 const string ResourceManager::fExtentMapStr("ExtentMap");
55 //const string ResourceManager::fDMLProcStr("DMLProc");
56 //const string ResourceManager::fBatchInsertStr("BatchInsert");
57 const string ResourceManager::fOrderByLimitStr("OrderByLimit");
58 const string ResourceManager::fRowAggregationStr("RowAggregation");
59 
60 ResourceManager* ResourceManager::fInstance = NULL;
61 boost::mutex mx;
62 
instance(bool runningInExeMgr)63 ResourceManager* ResourceManager::instance(bool runningInExeMgr)
64 {
65     boost::mutex::scoped_lock lk(mx);
66 
67     if (!fInstance)
68         fInstance = new ResourceManager(runningInExeMgr);
69 
70     return fInstance;
71 }
72 
ResourceManager(bool runningInExeMgr)73 ResourceManager::ResourceManager(bool runningInExeMgr) :
74     fExeMgrStr("ExeMgr1"),
75     fSystemConfigStr("SystemConfig"),
76     fDMLProcStr("DMLProc"),
77     fBatchInsertStr("BatchInsert"),
78     fConfig(Config::makeConfig()),
79     fNumCores(8),
80     fHjNumThreads(defaultNumThreads),
81     fJlProcessorThreadsPerScan(defaultProcessorThreadsPerScan),
82     fJlNumScanReceiveThreads(defaultScanReceiveThreads),
83     fTwNumThreads(defaultNumThreads),
84     fJlMaxOutstandingRequests(defaultMaxOutstandingRequests),
85     fHJUmMaxMemorySmallSideDistributor(fHashJoinStr,
86                                        "UmMaxMemorySmallSide",
87                                        getUintVal(fHashJoinStr, "TotalUmMaxMemorySmallSide", defaultTotalUmMemory),
88                                        getUintVal(fHashJoinStr, "UmMaxMemorySmallSide", defaultHJUmMaxMemorySmallSide),
89                                        0),
90     fHJPmMaxMemorySmallSideSessionMap(
91         getUintVal(fHashJoinStr, "PmMaxMemorySmallSide", defaultHJPmMaxMemorySmallSide)),
92     isExeMgr(runningInExeMgr)
93 {
94     int temp;
95     int configNumCores = -1;
96 
97     fTraceFlags = 0;
98     //See if we want to override the calculated #cores
99     temp = getIntVal(fJobListStr, "NumCores", -1);
100 
101     if (temp > 0)
102         configNumCores = temp;
103 
104     if (configNumCores <= 0)
105     {
106         //count the actual #cores
107         utils::CGroupConfigurator cg;
108         fNumCores = cg.getNumCores();
109 
110         if (fNumCores <= 0)
111             fNumCores = 8;
112     }
113     else
114         fNumCores = configNumCores;
115 
116     //based on the #cores, calculate some thread parms
117     if (fNumCores > 0)
118     {
119         fHjNumThreads = fNumCores;
120         fJlNumScanReceiveThreads = fNumCores;
121         fTwNumThreads = fNumCores;
122     }
123 
124     //possibly override any calculated values
125     temp = getIntVal(fHashJoinStr, "NumThreads", -1);
126 
127     if (temp > 0)
128         fHjNumThreads = temp;
129 
130     temp = getIntVal(fJobListStr, "ProcessorThreadsPerScan", -1);
131 
132 
133     if (temp > 0)
134         fJlProcessorThreadsPerScan = temp;
135 
136     temp = getIntVal(fJobListStr, "MaxOutstandingRequests", -1);
137 
138     if (temp > 0)
139         fJlMaxOutstandingRequests = temp;
140     else
141     {
142         oam::Oam oam;
143         oam::ModuleTypeConfig moduletypeconfig;
144         oam.getSystemConfig("pm", moduletypeconfig);
145         const uint temp = moduletypeconfig.ModuleCount * fNumCores * 4 / fJlProcessorThreadsPerScan;
146         const uint minMaxOutstandingRequests = std::max(static_cast<uint>(moduletypeconfig.ModuleCount * 2), defaultMaxOutstandingRequests);
147         fJlMaxOutstandingRequests = temp > minMaxOutstandingRequests ? temp : minMaxOutstandingRequests;
148     }
149 
150     temp = getIntVal(fJobListStr, "NumScanReceiveThreads", -1);
151 
152     if (temp > 0)
153         fJlNumScanReceiveThreads = temp;
154 
155     temp = getIntVal(fTupleWSDLStr, "NumThreads", -1);
156 
157     if (temp > 0)
158         fTwNumThreads = temp;
159 
160     pmJoinMemLimit = getUintVal(fHashJoinStr, "PmMaxMemorySmallSide",
161                                defaultHJPmMaxMemorySmallSide);
162 
163     // Need to use different limits if this instance isn't running on the UM,
164     // or if it's an ExeMgr running on a PM node
165     if (!isExeMgr)
166         totalUmMemLimit = pmJoinMemLimit;
167     else
168     {
169         string whichLimit = "TotalUmMemory";
170         string pmWithUM = fConfig->getConfig("Installation", "PMwithUM");
171 
172         if (pmWithUM == "y" || pmWithUM == "Y")
173         {
174             oam::Oam OAM;
175             oam::oamModuleInfo_t moduleInfo = OAM.getModuleInfo();
176             string& moduleType = boost::get<1>(moduleInfo);
177 
178             if (moduleType == "pm" || moduleType == "PM")
179             {
180                 string doesItExist = fConfig->getConfig(fHashJoinStr, "TotalPmUmMemory");
181 
182                 if (!doesItExist.empty())
183                     whichLimit = "TotalPmUmMemory";
184             }
185         }
186 
187         string umtxt = fConfig->getConfig(fHashJoinStr, whichLimit);
188 
189         if (umtxt.empty())
190             totalUmMemLimit = defaultTotalUmMemory;
191         else
192         {
193             // is it an absolute or a percentage?
194             if (umtxt.find('%') != string::npos)
195             {
196                 utils::CGroupConfigurator cg;
197                 uint64_t totalMem = cg.getTotalMemory();
198                 totalUmMemLimit = atoll(umtxt.c_str()) / 100.0 * (double) totalMem;
199 
200                 if (totalUmMemLimit == 0 || totalUmMemLimit == LLONG_MIN ||
201                         totalUmMemLimit == LLONG_MAX)  // some garbage in the xml entry
202                     totalUmMemLimit = defaultTotalUmMemory;
203             }
204             else    // an absolute; use the existing converter
205             {
206                 totalUmMemLimit = getIntVal(fHashJoinStr, whichLimit,
207                                             defaultTotalUmMemory);
208             }
209         }
210     }
211 
212     configuredUmMemLimit = totalUmMemLimit;
213     //cout << "RM: total UM memory = " << totalUmMemLimit << endl;
214 
215     // multi-thread aggregate
216     string nt, nb, nr;
217     nt = fConfig->getConfig("RowAggregation", "RowAggrThreads");
218 
219     if (nt.empty())
220     {
221         if ( numCores() > 0 )
222             fAggNumThreads = numCores();
223         else
224             fAggNumThreads = 1;
225     }
226     else
227         fAggNumThreads = fConfig->uFromText(nt);
228 
229     nb = fConfig->getConfig("RowAggregation", "RowAggrBuckets");
230 
231     if (nb.empty())
232         fAggNumBuckets = fAggNumThreads * 4;
233     else
234         fAggNumBuckets = fConfig->uFromText(nb);
235 
236     nr = fConfig->getConfig("RowAggregation", "RowAggrRowGroupsPerThread");
237 
238     if (nr.empty())
239         fAggNumRowGroups = 20;
240     else
241         fAggNumRowGroups = fConfig->uFromText(nr);
242 
243     // window function
244     string wt = fConfig->getConfig("WindowFunction", "WorkThreads");
245 
246     if (wt.empty())
247         fWindowFunctionThreads = numCores();
248     else
249         fWindowFunctionThreads = fConfig->uFromText(wt);
250 
251     // hdfs info
252     string hdfs = fConfig->getConfig("SystemConfig", "DataFilePlugin");
253 
254     if ( hdfs.find("hdfs") != string::npos)
255         fUseHdfs = true;
256     else
257         fUseHdfs = false;
258 
259     fAllowedDiskAggregation = getBoolVal(fRowAggregationStr,
260                                          "AllowDiskBasedAggregation",
261                                          defaultAllowDiskAggregation);
262 }
263 
getEmPriority() const264 int ResourceManager::getEmPriority() const
265 {
266     int temp = getIntVal(fExeMgrStr, "Priority", defaultEMPriority);
267     // config file priority is 40..1 (highest..lowest)
268     // convert to  -20..19 (highest..lowest, defaults to -1)
269     int val;
270 
271     // @Bug3385 - the ExeMgr priority was being set backwards with 1 being the highest instead of the lowest.
272     if (temp < 1)
273         val = 19;
274     else if (temp > 40)
275         val = -20;
276     else
277         val = 20 - temp;
278 
279     return val;
280 }
281 
addHJPmMaxSmallSideMap(uint32_t sessionID,uint64_t mem)282 void ResourceManager::addHJPmMaxSmallSideMap(uint32_t sessionID, uint64_t mem)
283 {
284     if (fHJPmMaxMemorySmallSideSessionMap.addSession(sessionID, mem,
285             fHJUmMaxMemorySmallSideDistributor.getTotalResource()))
286         logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem, defaultHJPmMaxMemorySmallSide,
287                                  "PmMaxMemorySmallSide", LogRMResourceChange );
288     else
289     {
290         logResourceChangeMessage(logging::LOG_TYPE_WARNING, sessionID, mem,
291                                  fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "PmMaxMemorySmallSide",
292                                  LogRMResourceChangeError);
293 
294         logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem,
295                                  fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "PmMaxMemorySmallSide",
296                                  LogRMResourceChangeError);
297     }
298 }
299 
addHJUmMaxSmallSideMap(uint32_t sessionID,uint64_t mem)300 void ResourceManager::addHJUmMaxSmallSideMap(uint32_t sessionID, uint64_t mem)
301 {
302     if (fHJUmMaxMemorySmallSideDistributor.addSession(sessionID, mem))
303         logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem, defaultHJUmMaxMemorySmallSide,
304                                  "UmMaxMemorySmallSide", LogRMResourceChange);
305     else
306     {
307         logResourceChangeMessage(logging::LOG_TYPE_WARNING, sessionID, mem,
308                                  fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "UmMaxMemorySmallSide",
309                                  LogRMResourceChangeError);
310 
311         logResourceChangeMessage(logging::LOG_TYPE_INFO, sessionID, mem,
312                                  fHJUmMaxMemorySmallSideDistributor.getTotalResource(), "UmMaxMemorySmallSide",
313                                  LogRMResourceChangeError);
314 
315     }
316 
317 }
318 
logResourceChangeMessage(logging::LOG_TYPE logType,uint32_t sessionID,uint64_t newvalue,uint64_t value,const string & source,logging::Message::MessageID mid)319 void ResourceManager::logResourceChangeMessage(logging::LOG_TYPE logType, uint32_t sessionID, uint64_t newvalue,
320         uint64_t value, const string& source, logging::Message::MessageID mid)
321 {
322     logging::Message::Args args;
323     args.add(source);
324     args.add(newvalue);
325     args.add(value);
326     Logger log;
327     log.logMessage(logType,  mid, args, logging::LoggingID(5, sessionID));
328 }
329 
emServerThreads()330 void	ResourceManager::emServerThreads() {  }
emServerQueueSize()331 void	ResourceManager::emServerQueueSize() {  }
emSecondsBetweenMemChecks()332 void	ResourceManager::emSecondsBetweenMemChecks() {  }
emMaxPct()333 void	ResourceManager::emMaxPct()  	{  }
emPriority()334 void	ResourceManager::emPriority() 	{  }
emExecQueueSize()335 void	ResourceManager::emExecQueueSize()	{ }
336 
hjNumThreads()337 void  ResourceManager::hjNumThreads() { }
hjMaxBuckets()338 void	ResourceManager::hjMaxBuckets() { }
hjMaxElems()339 void	ResourceManager::hjMaxElems()  { }
hjFifoSizeLargeSide()340 void	ResourceManager::hjFifoSizeLargeSide() { }
hjPmMaxMemorySmallSide()341 void	ResourceManager::hjPmMaxMemorySmallSide() { }
342 
jlFlushInterval()343 void	ResourceManager::jlFlushInterval() { }
jlFifoSize()344 void	ResourceManager::jlFifoSize() { }
jlScanLbidReqLimit()345 void	ResourceManager::jlScanLbidReqLimit() { }
jlScanLbidReqThreshold()346 void	ResourceManager::jlScanLbidReqThreshold() { }
jlProjectBlockReqLimit()347 void	ResourceManager::jlProjectBlockReqLimit() { }
jlProjectBlockReqThreshold()348 void	ResourceManager::jlProjectBlockReqThreshold() { }
jlNumScanReceiveThreads()349 void	ResourceManager::jlNumScanReceiveThreads() { }
350 
351 
psCount()352 void	ResourceManager::psCount() { }
psConnectionsPerPrimProc()353 void	ResourceManager::psConnectionsPerPrimProc() { }
psLBID_Shift()354 void	ResourceManager::psLBID_Shift() { }
355 
scTempDiskPath()356 void	ResourceManager::scTempDiskPath() { }
scTempSaveSize()357 void	ResourceManager::scTempSaveSize() { }
scWorkingDir()358 void	ResourceManager::scWorkingDir() { }
359 
360 
twMaxSize()361 void	ResourceManager::twMaxSize() { }
twInitialCapacity()362 void	ResourceManager::twInitialCapacity() { }
twMaxBuckets()363 void	ResourceManager::twMaxBuckets	() { }
twNumThreads()364 void	ResourceManager::twNumThreads() { }
zdl_MaxElementsInMem()365 void	ResourceManager::zdl_MaxElementsInMem() { }
zdl_MaxElementsPerBucket()366 void	ResourceManager::zdl_MaxElementsPerBucket () { }
367 
hbrPredicate()368 void  ResourceManager::hbrPredicate() { }
369 
getMysqldInfo(std::string & h,std::string & u,std::string & w,unsigned int & p) const370 bool  ResourceManager::getMysqldInfo(
371     std::string& h, std::string& u, std::string& w, unsigned int& p) const
372 {
373     static const std::string hostUserUnassignedValue("unassigned");
374     // MCS will read username and pass from disk if the config changed.
375     bool reReadConfig = true;
376     u = getStringVal("CrossEngineSupport", "User", hostUserUnassignedValue, reReadConfig);
377     w = getStringVal("CrossEngineSupport", "Password", "", reReadConfig);
378     // MCS will not read username and pass from disk if the config changed.
379     h = getStringVal("CrossEngineSupport", "Host", hostUserUnassignedValue);
380     p = getUintVal("CrossEngineSupport", "Port", 0);
381 
382     return h != hostUserUnassignedValue && u != hostUserUnassignedValue && p;
383 }
384 
queryStatsEnabled() const385 bool ResourceManager::queryStatsEnabled() const
386 {
387     std::string val(getStringVal("QueryStats", "Enabled", "N" ));
388     boost::to_upper(val);
389     return "Y" == val;
390 }
391 
userPriorityEnabled() const392 bool ResourceManager::userPriorityEnabled() const
393 {
394     std::string val(getStringVal("UserPriority", "Enabled", "N" ));
395     boost::to_upper(val);
396     return "Y" == val;
397 }
398 
getMemory(int64_t amount,boost::shared_ptr<int64_t> sessionLimit,bool patience)399 bool ResourceManager::getMemory(int64_t amount, boost::shared_ptr<int64_t> sessionLimit, bool patience)
400 {
401     bool ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0);
402     bool ret2 = (atomicops::atomicSub(sessionLimit.get(), amount) >= 0);
403 
404     uint32_t retryCounter = 0, maxRetries = 20;   // 10s delay
405 
406     while (patience && !(ret1 && ret2) && retryCounter++ < maxRetries)
407     {
408         atomicops::atomicAdd(&totalUmMemLimit, amount);
409         atomicops::atomicAdd(sessionLimit.get(), amount);
410         usleep(500000);
411         ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0);
412         ret2 = (atomicops::atomicSub(sessionLimit.get(), amount) >= 0);
413     }
414 
415     return (ret1 && ret2);
416 }
417 
418 
419 } //namespace
420