1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /******************************************************************************************
19 * $Id: $
20 *
21 ******************************************************************************************/
22 #include <string>
23 #include <stdexcept>
24 #include <iostream>
25 #include <sstream>
26 #include <sys/time.h>
27 using namespace std;
28
29 #include "jl_logger.h"
30 #include "resourcedistributor.h"
31
32 namespace joblist
33 {
34
35 const unsigned maxSessionsDefault = 100;
36
requestResource(uint32_t sessionID)37 uint64_t ResourceDistributor::requestResource(uint32_t sessionID)
38 {
39 uint64_t resource = getSessionResource(sessionID);
40
41 return requestResource(sessionID, resource);
42 }
43
requestResource(uint32_t sessionID,uint64_t resource)44 uint64_t ResourceDistributor::requestResource(uint32_t sessionID, uint64_t resource)
45 {
46 if (fTraceOn)
47 logMessage(logging::LOG_TYPE_DEBUG, LogRDRequest, resource, sessionID);
48
49 boost::mutex::scoped_lock lk (fResourceLock );
50
51 while (fTotalResource < resource)
52 {
53 if (fTraceOn)
54 logMessage(logging::LOG_TYPE_DEBUG, LogRDRequestWait, resource, sessionID);
55
56 fResourceAvailable.wait(lk);
57
58 if (fTraceOn)
59 logMessage(logging::LOG_TYPE_DEBUG, LogRDRequest, resource, sessionID);
60
61 }
62
63 fTotalResource -= resource;
64
65 return resource;
66 }
67
68
69
returnResource(uint64_t resource)70 void ResourceDistributor::returnResource(uint64_t resource)
71 {
72 if (fTraceOn)
73 logMessage(logging::LOG_TYPE_DEBUG, LogRDReturn, resource);
74
75 boost::mutex::scoped_lock lk (fResourceLock );
76 fTotalResource += resource;
77
78 fResourceAvailable.notify_all();
79 }
80
logMessage(logging::LOG_TYPE logLevel,logging::Message::MessageID mid,uint64_t value,uint32_t sessionID)81 void ResourceDistributor::logMessage(logging::LOG_TYPE logLevel, logging::Message::MessageID mid, uint64_t value, uint32_t sessionID)
82 {
83 logging::Message::Args args;
84 args.add(fJob);
85 args.add(fIdentity);
86 args.add(fTotalResource);
87
88 if (value) args.add(value);
89
90 Logger log;
91 log.logMessage(logLevel, mid, args, logging::LoggingID(5, sessionID));
92 }
93
updateAging(uint32_t sessionID)94 void LockedSessionMap::updateAging(uint32_t sessionID)
95 {
96 boost::mutex::scoped_lock lock(fSessionLock);
97 SessionList::iterator pos = find(fSessionAgingList.begin(), fSessionAgingList.end(), sessionID);
98
99 if (fSessionAgingList.end() != pos)
100 fSessionAgingList.splice(fSessionAgingList.end(), fSessionAgingList, find(fSessionAgingList.begin(), fSessionAgingList.end(), sessionID));
101 else
102 fSessionAgingList.push_back(sessionID);
103 }
104
getSessionResource(uint32_t sessionID)105 uint64_t LockedSessionMap::getSessionResource(uint32_t sessionID)
106 {
107 SessionMap::const_iterator it = fSessionMap.find(sessionID);
108
109 if (fSessionMap.end() != it)
110 {
111 updateAging(sessionID);
112 return it->second;
113 }
114
115 return fResourceBlock;
116 }
117
addSession(uint32_t sessionID,uint64_t resource,uint64_t limit)118 bool LockedSessionMap::addSession(uint32_t sessionID, uint64_t resource, uint64_t limit)
119 {
120 bool ret = true;
121
122 if (resource > limit)
123 {
124 resource = limit;
125 ret = false;
126 }
127
128 boost::mutex::scoped_lock maplock(fMapLock);
129 fSessionMap[sessionID] = resource;
130 updateAging(sessionID);
131
132 if (fMaxSessions < fSessionMap.size())
133 {
134 boost::mutex::scoped_lock lock(fSessionLock);
135 uint32_t oldsession = fSessionAgingList.front();
136 fSessionMap.erase(oldsession);
137 fSessionAgingList.erase(fSessionAgingList.begin());
138 }
139
140 return ret;
141
142 }
143
removeSession(uint32_t sessionID)144 void LockedSessionMap::removeSession(uint32_t sessionID)
145 {
146 boost::mutex::scoped_lock maplock(fMapLock);
147 fSessionMap.erase(sessionID);
148 boost::mutex::scoped_lock listlock(fSessionLock);
149 fSessionAgingList.erase(find(fSessionAgingList.begin(), fSessionAgingList.end(), sessionID));
150 }
151
operator <<(ostream & os,const LockedSessionMap & lsm)152 ostream& operator<<(ostream& os, const LockedSessionMap& lsm)
153 {
154 os << "Default Resource Block: " << lsm.fResourceBlock << "\tMax Number of saved sessions: " << lsm.fMaxSessions << endl;
155 os << "Session Map:\tsessionID\tvalue\n";
156 LockedSessionMap::SessionMap::const_iterator smIt = lsm.fSessionMap.begin(), smEnd = lsm.fSessionMap.end();
157
158 for (; smIt != smEnd; ++smIt)
159 os << "\t\t" << smIt->first << "\t\t" << smIt->second << endl;
160
161 os << "\nAging List:\tsessionID\n\t\t";
162 copy(lsm.fSessionAgingList.begin(), lsm.fSessionAgingList.end(), ostream_iterator<uint32_t>(os, "\n\t\t"));
163 os << endl;
164 return os;
165 }
166 } //namespace
167
168