1 #ifndef _DMUCS_DB_H_
2 #define _DMUCS_DB_H_ 1
3 
4 /*
5  * dmucs_db.h: the DMUCS database object definition
6  *
7  * Copyright (C) 2005, 2006  Victor T. Norman
8  *
9  * This program is free software; you can redistribute it and/or modify it
10  * under the terms of the GNU General Public License as published by the
11  * Free Software Foundation; either version 2 of the License, or (at your
12  * option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful, but
15  * WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
17  * Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License along
20  * with this program; if not, write to the Free Software Foundation, Inc.,
21  * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22  */
23 
24 #include <set>
25 #include <map>
26 #include <list>
27 #include "dmucs_host.h"
28 #include <pthread.h>
29 #include <stdio.h>
30 
31 
32 class DmucsDpropDb
33 {
34 private:
35     struct DmucsHostCompare {
36 	bool operator () (DmucsHost *lhs, DmucsHost *rhs) const;
37     };
38     typedef std::set<DmucsHost *, DmucsHostCompare> dmucs_host_set_t;
39     typedef dmucs_host_set_t::iterator dmucs_host_set_iter_t;
40 
41     /* Store a list of available cpu ipaddresses -- there will usually be
42        more than one instance of a cpu (ipaddress) in the list, as even on
43        a single cpu machine we can do more than one compile.
44 
45        Each list of IP addresses is a "tier" -- a set of cpus with
46        approximately equivalent computational power.  We then have a map
47        of these tiers (lists), indexed by an integer, where the lower the
48        integer, the less powerful the cpus in that tier. */
49     typedef std::list<unsigned int> dmucs_cpus_t;
50     typedef dmucs_cpus_t::iterator dmucs_cpus_iter_t;
51     typedef std::map<int, dmucs_cpus_t> dmucs_avail_cpus_t;
52     typedef dmucs_avail_cpus_t::iterator dmucs_avail_cpus_iter_t;
53     typedef dmucs_avail_cpus_t::reverse_iterator dmucs_avail_cpus_riter_t;
54 
55 
56     /* This is a mapping from sock address to host ip address -- the socket
57        of the connection from the "gethost" application to the dmucs server,
58        and the hostip of the cpu assigned to the "gethost" application. */
59     typedef std::map<const void *, const unsigned int>
60     		dmucs_assigned_cpus_t;
61     typedef dmucs_assigned_cpus_t::iterator dmucs_assigned_cpus_iter_t;
62 
63     /*
64      * Databases of hosts.
65      * o a collection of available hosts, sorted by tier.
66      * o a collection of assigned hosts.
67      * o a collection of silent hosts.
68      * o a collection of overloaded hosts.
69      *
70      * o a collectoin of available (unassigned) cpus.
71      * o a collection of assigned cpus.
72      */
73 
74     DmucsDprop		dprop_;		// the common dprop for all hosts here.
75     dmucs_host_set_t 	allHosts_;	// all known hosts are here.
76 
77     dmucs_host_set_t	availHosts_;	// avail hosts are also here.
78     dmucs_host_set_t	unavailHosts_;	// unavail hosts are also here.
79     dmucs_host_set_t 	silentHosts_;	// silent hosts are also here
80     dmucs_host_set_t	overloadedHosts_;// overloaded hosts are here.
81 
82     dmucs_avail_cpus_t	availCpus_;	// unassigned cpus are here.
83     dmucs_assigned_cpus_t assignedCpus_; // assigned cpus are here.
84 
85     /* Statistics */
86     int numAssignedCpus_;	/* the # of assigned CPUs during a collection
87 				   period */
88     int numConcurrentAssigned_; /* the max number of assigned CPUs at one
89 				   time. */
90 
91 
92 public:
93 
DmucsDpropDb(DmucsDprop dprop)94     DmucsDpropDb(DmucsDprop dprop) :
95         dprop_(dprop), numAssignedCpus_(0), numConcurrentAssigned_(0) {}
96 
97     DmucsHost * getHost(const struct in_addr &ipAddr);
98     bool 	haveHost(const struct in_addr &ipAddr);
99     unsigned int getBestAvailCpu();
100     void	assignCpuToClient(const unsigned int clientIp,
101 				  const void *cpuIp);
102     void 	moveCpus(DmucsHost *host, int oldTier, int newTier);
103     int 	delCpusFromTier(int tier, unsigned int ipAddr);
104 
105     void 	addNewHost(DmucsHost *host);
106     void	releaseCpu(const void *sock);
107 
108     void 	addToHostSet(dmucs_host_set_t *theSet, DmucsHost *host);
109     void 	delFromHostSet(dmucs_host_set_t *theSet, DmucsHost *host);
110     void 	addCpusToTier(int tierNum,
111                               const unsigned int ipAddr, const int numCpus);
112 
113     void 	addToAvailDb(DmucsHost *host);
114     void 	delFromAvailDb(DmucsHost *host);
115     void 	addToOverloadedDb(DmucsHost *host);
116     void 	delFromOverloadedDb(DmucsHost *host);
117     void 	addToSilentDb(DmucsHost *host);
118     void 	delFromSilentDb(DmucsHost *host);
119     void 	addToUnavailDb(DmucsHost *host);
120     void 	delFromUnavailDb(DmucsHost *host);
121 
122     void	handleSilentHosts();
123     std::string	serialize();
124     void	getStatsFromDb(int *served, int *max, int *totalCpus);
125     void	dump();
126 };
127 
128 class MutexMonitor
129 {
130  public:
MutexMonitor(pthread_mutex_t * m)131     MutexMonitor(pthread_mutex_t *m) : m_(m)
132     {
133 	pthread_mutex_lock(m_);
134     }
~MutexMonitor()135     ~MutexMonitor()
136     {
137 	pthread_mutex_unlock(m_);
138     }
139  private:
140     pthread_mutex_t *m_;
141 };
142 
143 
144 class DmucsDb
145 {
146 private:
147     typedef std::map<DmucsDprop, DmucsDpropDb> dmucs_dprop_db_t;
148     typedef dmucs_dprop_db_t::iterator dmucs_dprop_db_iter_t;
149 
150     /* A map of dmucs databases, indexed by the distinguishing property of
151        hosts in the system. */
152     dmucs_dprop_db_t  dbDb_;
153 
154     /* A mapping of socket to distinguishing property -- so that when a
155        host is released and all we have is the socket information, we can
156        figure out which DpropDb to put the host back into. */
157     typedef std::map<const void *, DmucsDprop> dmucs_sock_dprop_db_t;
158     typedef dmucs_sock_dprop_db_t::iterator dmucs_sock_dprop_db_iter_t;
159 
160     dmucs_sock_dprop_db_t sock2DpropDb_;
161 
162     static DmucsDb *instance_;
163     static pthread_mutexattr_t attr_;
164     static pthread_mutex_t mutex_;
165 
166     DmucsDb();
~DmucsDb()167     virtual ~DmucsDb() {}
168 
169 public:
170     static DmucsDb *getInstance();
171 
getHost(const struct in_addr & ipAddr,DmucsDprop dprop)172     DmucsHost *getHost(const struct in_addr &ipAddr, DmucsDprop dprop) {
173 	MutexMonitor m(&mutex_);
174 	dmucs_dprop_db_iter_t itr = dbDb_.find(dprop);
175 	if (itr == dbDb_.end()) {
176 	    throw DmucsHostNotFound();
177 	}
178 	return itr->second.getHost(ipAddr);
179     }
haveHost(const struct in_addr & ipAddr,DmucsDprop dprop)180     bool haveHost(const struct in_addr &ipAddr, DmucsDprop dprop)  {
181 	MutexMonitor m(&mutex_);
182 	dmucs_dprop_db_iter_t itr = dbDb_.find(dprop);
183 	if (itr == dbDb_.end()) {
184 	    return false;
185 	}
186 	return itr->second.haveHost(ipAddr);
187     }
getBestAvailCpu(DmucsDprop dprop)188     unsigned int getBestAvailCpu(DmucsDprop dprop) {
189 	MutexMonitor m(&mutex_);
190 	dmucs_dprop_db_iter_t itr = dbDb_.find(dprop);
191 	if (itr == dbDb_.end()) {
192             fprintf(stderr, "nothing in this db!: dprop %s\n",
193                          dprop2cstr(dprop));
194 	    return 0L;		// 32-bits of zeros = 0.0.0.0
195 	}
196 	return itr->second.getBestAvailCpu();
197     }
198     void assignCpuToClient(const unsigned int clientIp,
199                            const DmucsDprop dprop,
200                            const void *sock);
moveCpus(DmucsHost * host,int oldTier,int newTier)201     void moveCpus(DmucsHost *host, int oldTier, int newTier) {
202 	MutexMonitor m(&mutex_);
203 	// Assume the DmucsDpropDb is definitely there.
204 	return dbDb_.find(host->getDprop())->second.moveCpus(host, oldTier,
205 							     newTier);
206     }
delCpusFromTier(DmucsHost * host,int tier,unsigned int ipAddr)207     int delCpusFromTier(DmucsHost *host,
208                         int tier, unsigned int ipAddr) {
209 	MutexMonitor m(&mutex_);
210 	// Assume the DmucsDpropDb is definitely there.
211 	return dbDb_.find(host->getDprop())->second.delCpusFromTier(tier,
212 								    ipAddr);
213     }
214 
addNewHost(DmucsHost * host)215     void addNewHost(DmucsHost *host) {
216 	MutexMonitor m(&mutex_);
217 	DmucsDprop dprop = host->getDprop();
218 	dmucs_dprop_db_iter_t itr = dbDb_.find(dprop);
219 	if (itr == dbDb_.end()) {
220 	    std::pair<dmucs_dprop_db_iter_t, bool> status =
221 		dbDb_.insert(std::make_pair(dprop, DmucsDpropDb(dprop)));
222 	    if (!status.second) {
223 		fprintf(stderr, "%s: could not make a new Dprop-specific db\n",
224 			__func__);
225 		return;
226 	    }
227 	    itr = status.first;
228 	}
229 	return itr->second.addNewHost(host);
230     }
addToAvailDb(DmucsHost * host)231     void addToAvailDb(DmucsHost *host) {
232 	MutexMonitor m(&mutex_);
233 	return dbDb_.find(host->getDprop())->second.addToAvailDb(host);
234     }
delFromAvailDb(DmucsHost * host)235     void delFromAvailDb(DmucsHost *host) {
236 	MutexMonitor m(&mutex_);
237 	return dbDb_.find(host->getDprop())->second.delFromAvailDb(host);
238     };
addToOverloadedDb(DmucsHost * host)239     void addToOverloadedDb(DmucsHost *host) {
240 	MutexMonitor m(&mutex_);
241 	return dbDb_.find(host->getDprop())->second.addToOverloadedDb(host);
242     }
delFromOverloadedDb(DmucsHost * host)243     void delFromOverloadedDb(DmucsHost *host) {
244 	MutexMonitor m(&mutex_);
245 	return dbDb_.find(host->getDprop())->second.delFromOverloadedDb(host);
246     }
addToSilentDb(DmucsHost * host)247     void addToSilentDb(DmucsHost *host) {
248 	MutexMonitor m(&mutex_);
249 	return dbDb_.find(host->getDprop())->second.addToSilentDb(host);
250     }
delFromSilentDb(DmucsHost * host)251     void delFromSilentDb(DmucsHost *host) {
252 	MutexMonitor m(&mutex_);
253 	return dbDb_.find(host->getDprop())->second.delFromSilentDb(host);
254     }
addToUnavailDb(DmucsHost * host)255     void addToUnavailDb(DmucsHost *host) {
256 	MutexMonitor m(&mutex_);
257 	return dbDb_.find(host->getDprop())->second.addToUnavailDb(host);
258     }
delFromUnavailDb(DmucsHost * host)259     void delFromUnavailDb(DmucsHost *host) {
260 	MutexMonitor m(&mutex_);
261 	return dbDb_.find(host->getDprop())->second.delFromUnavailDb(host);
262     }
263 
264     void releaseCpu(const void *sock);
265 
handleSilentHosts()266     void handleSilentHosts() {
267 	MutexMonitor m(&mutex_);
268 	for (dmucs_dprop_db_iter_t itr = dbDb_.begin();
269              itr != dbDb_.end(); ++itr) {
270 	    itr->second.handleSilentHosts();
271 	}
272     }
serialize()273     std::string	serialize() {
274 	MutexMonitor m(&mutex_);
275 	std::string res;
276 	for (dmucs_dprop_db_iter_t itr = dbDb_.begin();
277 	     itr != dbDb_.end(); ++itr) {
278 	    res += itr->second.serialize();
279 	}
280 	return res;
281     }
getStatsFromDb(int * served,int * max,int * totalCpus)282     void getStatsFromDb(int *served, int *max, int *totalCpus) {
283 	MutexMonitor m(&mutex_);
284         int t_serv, t_max, t_total;
285         *served = 0; *max = 0; *totalCpus = 0;
286 	for (dmucs_dprop_db_iter_t itr = dbDb_.begin();
287 	     itr != dbDb_.end(); ++itr) {
288 	    itr->second.getStatsFromDb(&t_serv, &t_max, &t_total);
289             *served += t_serv;
290             *max += t_max;
291             *totalCpus += t_total;
292 	}
293     }
294     void	dump();
295 };
296 
297 
298 inline bool
operator()299 DmucsDpropDb::DmucsHostCompare::operator() (DmucsHost *lhs,
300 					    DmucsHost *rhs) const
301 {
302     // Just do pointer comparison.
303     return (lhs->getIpAddrInt() < rhs->getIpAddrInt());
304 }
305 
306 #endif
307 
308 
309