1 #ifndef _DMUCS_DB_H_
2 #define _DMUCS_DB_H_ 1
3
4 /*
5 * dmucs_db.h: the DMUCS database object definition
6 *
7 * Copyright (C) 2005, 2006 Victor T. Norman
8 *
9 * This program is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU General Public License as published by the
11 * Free Software Foundation; either version 2 of the License, or (at your
12 * option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful, but
15 * WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
17 * Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License along
20 * with this program; if not, write to the Free Software Foundation, Inc.,
21 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 */
23
24 #include <set>
25 #include <map>
26 #include <list>
27 #include "dmucs_host.h"
28 #include <pthread.h>
29 #include <stdio.h>
30
31
32 class DmucsDpropDb
33 {
34 private:
35 struct DmucsHostCompare {
36 bool operator () (DmucsHost *lhs, DmucsHost *rhs) const;
37 };
38 typedef std::set<DmucsHost *, DmucsHostCompare> dmucs_host_set_t;
39 typedef dmucs_host_set_t::iterator dmucs_host_set_iter_t;
40
41 /* Store a list of available cpu ipaddresses -- there will usually be
42 more than one instance of a cpu (ipaddress) in the list, as even on
43 a single cpu machine we can do more than one compile.
44
45 Each list of IP addresses is a "tier" -- a set of cpus with
46 approximately equivalent computational power. We then have a map
47 of these tiers (lists), indexed by an integer, where the lower the
48 integer, the less powerful the cpus in that tier. */
49 typedef std::list<unsigned int> dmucs_cpus_t;
50 typedef dmucs_cpus_t::iterator dmucs_cpus_iter_t;
51 typedef std::map<int, dmucs_cpus_t> dmucs_avail_cpus_t;
52 typedef dmucs_avail_cpus_t::iterator dmucs_avail_cpus_iter_t;
53 typedef dmucs_avail_cpus_t::reverse_iterator dmucs_avail_cpus_riter_t;
54
55
56 /* This is a mapping from sock address to host ip address -- the socket
57 of the connection from the "gethost" application to the dmucs server,
58 and the hostip of the cpu assigned to the "gethost" application. */
59 typedef std::map<const void *, const unsigned int>
60 dmucs_assigned_cpus_t;
61 typedef dmucs_assigned_cpus_t::iterator dmucs_assigned_cpus_iter_t;
62
63 /*
64 * Databases of hosts.
65 * o a collection of available hosts, sorted by tier.
66 * o a collection of assigned hosts.
67 * o a collection of silent hosts.
68 * o a collection of overloaded hosts.
69 *
70 * o a collectoin of available (unassigned) cpus.
71 * o a collection of assigned cpus.
72 */
73
74 DmucsDprop dprop_; // the common dprop for all hosts here.
75 dmucs_host_set_t allHosts_; // all known hosts are here.
76
77 dmucs_host_set_t availHosts_; // avail hosts are also here.
78 dmucs_host_set_t unavailHosts_; // unavail hosts are also here.
79 dmucs_host_set_t silentHosts_; // silent hosts are also here
80 dmucs_host_set_t overloadedHosts_;// overloaded hosts are here.
81
82 dmucs_avail_cpus_t availCpus_; // unassigned cpus are here.
83 dmucs_assigned_cpus_t assignedCpus_; // assigned cpus are here.
84
85 /* Statistics */
86 int numAssignedCpus_; /* the # of assigned CPUs during a collection
87 period */
88 int numConcurrentAssigned_; /* the max number of assigned CPUs at one
89 time. */
90
91
92 public:
93
DmucsDpropDb(DmucsDprop dprop)94 DmucsDpropDb(DmucsDprop dprop) :
95 dprop_(dprop), numAssignedCpus_(0), numConcurrentAssigned_(0) {}
96
97 DmucsHost * getHost(const struct in_addr &ipAddr);
98 bool haveHost(const struct in_addr &ipAddr);
99 unsigned int getBestAvailCpu();
100 void assignCpuToClient(const unsigned int clientIp,
101 const void *cpuIp);
102 void moveCpus(DmucsHost *host, int oldTier, int newTier);
103 int delCpusFromTier(int tier, unsigned int ipAddr);
104
105 void addNewHost(DmucsHost *host);
106 void releaseCpu(const void *sock);
107
108 void addToHostSet(dmucs_host_set_t *theSet, DmucsHost *host);
109 void delFromHostSet(dmucs_host_set_t *theSet, DmucsHost *host);
110 void addCpusToTier(int tierNum,
111 const unsigned int ipAddr, const int numCpus);
112
113 void addToAvailDb(DmucsHost *host);
114 void delFromAvailDb(DmucsHost *host);
115 void addToOverloadedDb(DmucsHost *host);
116 void delFromOverloadedDb(DmucsHost *host);
117 void addToSilentDb(DmucsHost *host);
118 void delFromSilentDb(DmucsHost *host);
119 void addToUnavailDb(DmucsHost *host);
120 void delFromUnavailDb(DmucsHost *host);
121
122 void handleSilentHosts();
123 std::string serialize();
124 void getStatsFromDb(int *served, int *max, int *totalCpus);
125 void dump();
126 };
127
128 class MutexMonitor
129 {
130 public:
MutexMonitor(pthread_mutex_t * m)131 MutexMonitor(pthread_mutex_t *m) : m_(m)
132 {
133 pthread_mutex_lock(m_);
134 }
~MutexMonitor()135 ~MutexMonitor()
136 {
137 pthread_mutex_unlock(m_);
138 }
139 private:
140 pthread_mutex_t *m_;
141 };
142
143
144 class DmucsDb
145 {
146 private:
147 typedef std::map<DmucsDprop, DmucsDpropDb> dmucs_dprop_db_t;
148 typedef dmucs_dprop_db_t::iterator dmucs_dprop_db_iter_t;
149
150 /* A map of dmucs databases, indexed by the distinguishing property of
151 hosts in the system. */
152 dmucs_dprop_db_t dbDb_;
153
154 /* A mapping of socket to distinguishing property -- so that when a
155 host is released and all we have is the socket information, we can
156 figure out which DpropDb to put the host back into. */
157 typedef std::map<const void *, DmucsDprop> dmucs_sock_dprop_db_t;
158 typedef dmucs_sock_dprop_db_t::iterator dmucs_sock_dprop_db_iter_t;
159
160 dmucs_sock_dprop_db_t sock2DpropDb_;
161
162 static DmucsDb *instance_;
163 static pthread_mutexattr_t attr_;
164 static pthread_mutex_t mutex_;
165
166 DmucsDb();
~DmucsDb()167 virtual ~DmucsDb() {}
168
169 public:
170 static DmucsDb *getInstance();
171
getHost(const struct in_addr & ipAddr,DmucsDprop dprop)172 DmucsHost *getHost(const struct in_addr &ipAddr, DmucsDprop dprop) {
173 MutexMonitor m(&mutex_);
174 dmucs_dprop_db_iter_t itr = dbDb_.find(dprop);
175 if (itr == dbDb_.end()) {
176 throw DmucsHostNotFound();
177 }
178 return itr->second.getHost(ipAddr);
179 }
haveHost(const struct in_addr & ipAddr,DmucsDprop dprop)180 bool haveHost(const struct in_addr &ipAddr, DmucsDprop dprop) {
181 MutexMonitor m(&mutex_);
182 dmucs_dprop_db_iter_t itr = dbDb_.find(dprop);
183 if (itr == dbDb_.end()) {
184 return false;
185 }
186 return itr->second.haveHost(ipAddr);
187 }
getBestAvailCpu(DmucsDprop dprop)188 unsigned int getBestAvailCpu(DmucsDprop dprop) {
189 MutexMonitor m(&mutex_);
190 dmucs_dprop_db_iter_t itr = dbDb_.find(dprop);
191 if (itr == dbDb_.end()) {
192 fprintf(stderr, "nothing in this db!: dprop %s\n",
193 dprop2cstr(dprop));
194 return 0L; // 32-bits of zeros = 0.0.0.0
195 }
196 return itr->second.getBestAvailCpu();
197 }
198 void assignCpuToClient(const unsigned int clientIp,
199 const DmucsDprop dprop,
200 const void *sock);
moveCpus(DmucsHost * host,int oldTier,int newTier)201 void moveCpus(DmucsHost *host, int oldTier, int newTier) {
202 MutexMonitor m(&mutex_);
203 // Assume the DmucsDpropDb is definitely there.
204 return dbDb_.find(host->getDprop())->second.moveCpus(host, oldTier,
205 newTier);
206 }
delCpusFromTier(DmucsHost * host,int tier,unsigned int ipAddr)207 int delCpusFromTier(DmucsHost *host,
208 int tier, unsigned int ipAddr) {
209 MutexMonitor m(&mutex_);
210 // Assume the DmucsDpropDb is definitely there.
211 return dbDb_.find(host->getDprop())->second.delCpusFromTier(tier,
212 ipAddr);
213 }
214
addNewHost(DmucsHost * host)215 void addNewHost(DmucsHost *host) {
216 MutexMonitor m(&mutex_);
217 DmucsDprop dprop = host->getDprop();
218 dmucs_dprop_db_iter_t itr = dbDb_.find(dprop);
219 if (itr == dbDb_.end()) {
220 std::pair<dmucs_dprop_db_iter_t, bool> status =
221 dbDb_.insert(std::make_pair(dprop, DmucsDpropDb(dprop)));
222 if (!status.second) {
223 fprintf(stderr, "%s: could not make a new Dprop-specific db\n",
224 __func__);
225 return;
226 }
227 itr = status.first;
228 }
229 return itr->second.addNewHost(host);
230 }
addToAvailDb(DmucsHost * host)231 void addToAvailDb(DmucsHost *host) {
232 MutexMonitor m(&mutex_);
233 return dbDb_.find(host->getDprop())->second.addToAvailDb(host);
234 }
delFromAvailDb(DmucsHost * host)235 void delFromAvailDb(DmucsHost *host) {
236 MutexMonitor m(&mutex_);
237 return dbDb_.find(host->getDprop())->second.delFromAvailDb(host);
238 };
addToOverloadedDb(DmucsHost * host)239 void addToOverloadedDb(DmucsHost *host) {
240 MutexMonitor m(&mutex_);
241 return dbDb_.find(host->getDprop())->second.addToOverloadedDb(host);
242 }
delFromOverloadedDb(DmucsHost * host)243 void delFromOverloadedDb(DmucsHost *host) {
244 MutexMonitor m(&mutex_);
245 return dbDb_.find(host->getDprop())->second.delFromOverloadedDb(host);
246 }
addToSilentDb(DmucsHost * host)247 void addToSilentDb(DmucsHost *host) {
248 MutexMonitor m(&mutex_);
249 return dbDb_.find(host->getDprop())->second.addToSilentDb(host);
250 }
delFromSilentDb(DmucsHost * host)251 void delFromSilentDb(DmucsHost *host) {
252 MutexMonitor m(&mutex_);
253 return dbDb_.find(host->getDprop())->second.delFromSilentDb(host);
254 }
addToUnavailDb(DmucsHost * host)255 void addToUnavailDb(DmucsHost *host) {
256 MutexMonitor m(&mutex_);
257 return dbDb_.find(host->getDprop())->second.addToUnavailDb(host);
258 }
delFromUnavailDb(DmucsHost * host)259 void delFromUnavailDb(DmucsHost *host) {
260 MutexMonitor m(&mutex_);
261 return dbDb_.find(host->getDprop())->second.delFromUnavailDb(host);
262 }
263
264 void releaseCpu(const void *sock);
265
handleSilentHosts()266 void handleSilentHosts() {
267 MutexMonitor m(&mutex_);
268 for (dmucs_dprop_db_iter_t itr = dbDb_.begin();
269 itr != dbDb_.end(); ++itr) {
270 itr->second.handleSilentHosts();
271 }
272 }
serialize()273 std::string serialize() {
274 MutexMonitor m(&mutex_);
275 std::string res;
276 for (dmucs_dprop_db_iter_t itr = dbDb_.begin();
277 itr != dbDb_.end(); ++itr) {
278 res += itr->second.serialize();
279 }
280 return res;
281 }
getStatsFromDb(int * served,int * max,int * totalCpus)282 void getStatsFromDb(int *served, int *max, int *totalCpus) {
283 MutexMonitor m(&mutex_);
284 int t_serv, t_max, t_total;
285 *served = 0; *max = 0; *totalCpus = 0;
286 for (dmucs_dprop_db_iter_t itr = dbDb_.begin();
287 itr != dbDb_.end(); ++itr) {
288 itr->second.getStatsFromDb(&t_serv, &t_max, &t_total);
289 *served += t_serv;
290 *max += t_max;
291 *totalCpus += t_total;
292 }
293 }
294 void dump();
295 };
296
297
298 inline bool
operator()299 DmucsDpropDb::DmucsHostCompare::operator() (DmucsHost *lhs,
300 DmucsHost *rhs) const
301 {
302 // Just do pointer comparison.
303 return (lhs->getIpAddrInt() < rhs->getIpAddrInt());
304 }
305
306 #endif
307
308
309