1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /******************************************************************************************
19  * $Id: $
20  *
21  ******************************************************************************************/
22 #include <string>
23 #include <stdexcept>
24 #include <iostream>
25 #include <sstream>
26 #include <sys/time.h>
27 using namespace std;
28 
29 #include "jl_logger.h"
30 #include "resourcedistributor.h"
31 
32 namespace joblist
33 {
34 
// Not referenced anywhere in this part of the file. Presumably the default cap
// on the number of saved sessions (cf. LockedSessionMap::fMaxSessions) --
// TODO confirm against resourcedistributor.h.
const unsigned maxSessionsDefault = 100;
36 
requestResource(uint32_t sessionID)37 uint64_t ResourceDistributor::requestResource(uint32_t sessionID)
38 {
39     uint64_t resource = getSessionResource(sessionID);
40 
41     return requestResource(sessionID, resource);
42 }
43 
requestResource(uint32_t sessionID,uint64_t resource)44 uint64_t ResourceDistributor::requestResource(uint32_t sessionID, uint64_t resource)
45 {
46     if (fTraceOn)
47         logMessage(logging::LOG_TYPE_DEBUG, LogRDRequest, resource, sessionID);
48 
49     boost::mutex::scoped_lock lk (fResourceLock );
50 
51     while (fTotalResource < resource)
52     {
53         if (fTraceOn)
54             logMessage(logging::LOG_TYPE_DEBUG, LogRDRequestWait, resource, sessionID);
55 
56         fResourceAvailable.wait(lk);
57 
58         if (fTraceOn)
59             logMessage(logging::LOG_TYPE_DEBUG, LogRDRequest, resource, sessionID);
60 
61     }
62 
63     fTotalResource -= resource;
64 
65     return resource;
66 }
67 
68 
69 
returnResource(uint64_t resource)70 void  ResourceDistributor::returnResource(uint64_t resource)
71 {
72     if (fTraceOn)
73         logMessage(logging::LOG_TYPE_DEBUG, LogRDReturn, resource);
74 
75     boost::mutex::scoped_lock lk (fResourceLock );
76     fTotalResource += resource;
77 
78     fResourceAvailable.notify_all();
79 }
80 
logMessage(logging::LOG_TYPE logLevel,logging::Message::MessageID mid,uint64_t value,uint32_t sessionID)81 void ResourceDistributor::logMessage(logging::LOG_TYPE logLevel, logging::Message::MessageID mid, uint64_t value, uint32_t sessionID)
82 {
83     logging::Message::Args args;
84     args.add(fJob);
85     args.add(fIdentity);
86     args.add(fTotalResource);
87 
88     if (value) args.add(value);
89 
90     Logger log;
91     log.logMessage(logLevel, mid, args, logging::LoggingID(5, sessionID));
92 }
93 
updateAging(uint32_t sessionID)94 void  LockedSessionMap::updateAging(uint32_t sessionID)
95 {
96     boost::mutex::scoped_lock lock(fSessionLock);
97     SessionList::iterator pos = find(fSessionAgingList.begin(), fSessionAgingList.end(), sessionID);
98 
99     if (fSessionAgingList.end() != pos)
100         fSessionAgingList.splice(fSessionAgingList.end(), fSessionAgingList, find(fSessionAgingList.begin(), fSessionAgingList.end(), sessionID));
101     else
102         fSessionAgingList.push_back(sessionID);
103 }
104 
getSessionResource(uint32_t sessionID)105 uint64_t  LockedSessionMap::getSessionResource(uint32_t sessionID)
106 {
107     SessionMap::const_iterator it = fSessionMap.find(sessionID);
108 
109     if (fSessionMap.end() != it)
110     {
111         updateAging(sessionID);
112         return it->second;
113     }
114 
115     return fResourceBlock;
116 }
117 
addSession(uint32_t sessionID,uint64_t resource,uint64_t limit)118 bool LockedSessionMap::addSession(uint32_t sessionID, uint64_t resource, uint64_t limit)
119 {
120     bool ret = true;
121 
122     if (resource > limit)
123     {
124         resource = limit;
125         ret = false;
126     }
127 
128     boost::mutex::scoped_lock maplock(fMapLock);
129     fSessionMap[sessionID] = resource;
130     updateAging(sessionID);
131 
132     if (fMaxSessions < fSessionMap.size())
133     {
134         boost::mutex::scoped_lock lock(fSessionLock);
135         uint32_t oldsession = fSessionAgingList.front();
136         fSessionMap.erase(oldsession);
137         fSessionAgingList.erase(fSessionAgingList.begin());
138     }
139 
140     return ret;
141 
142 }
143 
removeSession(uint32_t sessionID)144 void LockedSessionMap::removeSession(uint32_t sessionID)
145 {
146     boost::mutex::scoped_lock maplock(fMapLock);
147     fSessionMap.erase(sessionID);
148     boost::mutex::scoped_lock listlock(fSessionLock);
149     fSessionAgingList.erase(find(fSessionAgingList.begin(), fSessionAgingList.end(), sessionID));
150 }
151 
operator <<(ostream & os,const LockedSessionMap & lsm)152 ostream& operator<<(ostream& os, const LockedSessionMap&  lsm)
153 {
154     os << "Default Resource Block: " << lsm.fResourceBlock << "\tMax Number of saved sessions: " << lsm.fMaxSessions << endl;
155     os << "Session Map:\tsessionID\tvalue\n";
156     LockedSessionMap::SessionMap::const_iterator smIt = lsm.fSessionMap.begin(), smEnd = lsm.fSessionMap.end();
157 
158     for (; smIt != smEnd; ++smIt)
159         os << "\t\t" << smIt->first << "\t\t" << smIt->second << endl;
160 
161     os << "\nAging List:\tsessionID\n\t\t";
162     copy(lsm.fSessionAgingList.begin(), lsm.fSessionAgingList.end(), ostream_iterator<uint32_t>(os, "\n\t\t"));
163     os << endl;
164     return os;
165 }
166 } //namespace
167 
168