1/******************************************************************************
2 * $Id: bandeddl.h.vector 3842 2008-03-26 15:02:51Z rdempsey $
3 *
4 * Copyright (c) 2006 Calpont Corporation
5 * All rights reserved.
6 *****************************************************************************/
7
8/** @file
 * class BandedDL interface
10 */
11
12#include <set>
13#include "largedatalist.h"
14#include "bucketdl.h"
15
16#include <time.h>
17
18#ifndef _BANDEDDL_HPP_
19#define _BANDEDDL_HPP_
20
21namespace joblist {
22
23/** @brief class BandedDL
24 *
25 */
26template<typename element_t>
27class BandedDL : public LargeDataList<std::vector<element_t>, element_t>
28{
29	typedef LargeDataList<std::vector<element_t>, element_t> base;
30
31	public:
32		BandedDL(uint numConsumers);
33		BandedDL(BucketDL<element_t> &, uint numConsumers);
34		virtual ~BandedDL();
35
36		int saveBand();
37		void loadBand(uint);
38		int bandCount();
39
40		/// loads the first band, next() will return the first element
41		void restart();
42
43		virtual void insert(const element_t &);
44		virtual uint getIterator();
45		virtual bool next(uint it, element_t *e);
46		virtual void endOfInput();
47		using DataListImpl<std::vector<element_t>, element_t>::shrink;
48		uint64_t totalSize();
49		bool next(uint it, element_t *e, bool *endOfBand);
50// 		bool get(const element_t &key, element_t *out);
51
52	protected:
53
54	private:
55		explicit BandedDL() { };
56		explicit BandedDL(const BandedDL &) { };
57		BandedDL & operator=(const BandedDL &) { };
58
59		// vars to support the WSDL-like next() fcn
60		pthread_cond_t nextSetLoaded;
61		uint waitingConsumers;
62};
63
64template<typename element_t>
65BandedDL<element_t>::BandedDL(uint nc) : base(nc)
66{
67	pthread_cond_init(&nextSetLoaded, NULL);
68	waitingConsumers = 0;
69}
70
71
72template<typename element_t>
73BandedDL<element_t>::BandedDL(BucketDL<element_t> &b, uint nc) : base(nc)
74{
75	uint i, it;
76	element_t e;
77	bool more;
78
79	pthread_cond_init(&nextSetLoaded, NULL);
80	waitingConsumers = 0;
81
82#ifdef PROFILE
83	struct timespec ts1, ts2;
84	clock_gettime(CLOCK_REALTIME, &ts1);
85#endif
86
87	for (i = 0; i < b.bucketCount(); i++) {
88		it = b.getIterator(i);
89		more = b.next(i, it, &e);
90		while (more) {
91 			insert(e);
92			more = b.next(i, it, &e);
93		}
94		saveBand();
95	}
96	endOfInput();
97
98#ifdef PROFILE
99	clock_gettime(CLOCK_REALTIME, &ts2);
100	/* What should we do with this profile info? */
101#endif
102
103}
104
105template<typename element_t>
106BandedDL<element_t>::~BandedDL()
107{
108	pthread_cond_destroy(&nextSetLoaded);
109}
110
111template<typename element_t>
112int BandedDL<element_t>::saveBand()
113{
114	int ret;
115
116	if (base::multipleProducers)
117	 	base::lock();
118	sort(base::c->begin(), base::c->end());
119	if (typeid(element_t) == typeid(ElementType) ||
120		typeid(element_t) == typeid(DoubleElementType))
121		ret = base::save_contiguous();
122	else
123		ret = base::save();
124	base::registerNewSet();
125	if (base::multipleProducers)
126		base::unlock();
127
128	return ret;
129}
130
131template<typename element_t>
132void BandedDL<element_t>::loadBand(uint band)
133{
134
135	base::lock();
136	if (typeid(element_t) == typeid(ElementType) ||
137		typeid(element_t) == typeid(DoubleElementType))
138		base::load_contiguous(band);
139	else
140		base::load(band);
141	if (waitingConsumers > 0)
142		pthread_cond_broadcast(&nextSetLoaded);
143	base::unlock();
144}
145
146template<typename element_t>
147int BandedDL<element_t>::bandCount()
148{
149	int ret;
150
151	base::lock();
152	ret = base::setCount();
153	base::unlock();
154	return ret;
155}
156
157template<typename element_t>
158uint BandedDL<element_t>::getIterator()
159{
160	uint ret;
161
162	base::lock();
163	ret = base::getIterator();
164	base::unlock();
165	return ret;
166}
167
168template<typename element_t>
169void BandedDL<element_t>::endOfInput()
170{
171	base::lock();
172	sort(base::c->begin(), base::c->end());
173	base::endOfInput();
174	if (typeid(element_t) == typeid(ElementType) ||
175		typeid(element_t) == typeid(DoubleElementType)) {
176		base::save_contiguous();
177		base::load_contiguous(0);
178	}
179	else {
180		base::save();
181		base::load(0);
182	}
183	base::unlock();
184}
185
186template<typename element_t>
187bool BandedDL<element_t>::next(uint it, element_t *e)
188{
189
190/* Note: this is the code for WSDL::next().  The more I think about it,
191the more I think they're the same thing.  Not entirely sure yet though. */
192
193	bool ret, locked = false;
194	uint nextSet;
195
196 	if (base::numConsumers > 1 || base::phase == 0) {
197 		locked = true;
198		base::lock();
199 	}
200
201	ret = base::next(it, e);
202
203	/* XXXPAT: insignificant race condition here.  Technically, there's no
204	guarantee the caller will be wakened when the next set is loaded.  It could
205	get skipped.  It won't happen realistically, but it exists... */
206
207	// signifies the caller is at the end of the loaded set,
208	// but there are more sets
209	if (ret == false && (base::loadedSet < base::setCount - 1)) {
210
211 		nextSet = base::loadedSet + 1;
212		waitingConsumers++;
213		if (waitingConsumers < base::numConsumers)
214			while (nextSet != base::loadedSet) {
215// 				std::cout << "waiting on nextSetLoaded" << std::endl;
216				pthread_cond_wait(&nextSetLoaded, &(this->mutex));
217			}
218		else {
219// 			std::cout << "loading set " << nextSet << std::endl;
220			if (typeid(element_t) == typeid(ElementType) ||
221				typeid(element_t) == typeid(DoubleElementType))
222				base::load_contiguous(nextSet);
223			else
224				base::load(nextSet);
225			pthread_cond_broadcast(&nextSetLoaded);
226		}
227		waitingConsumers--;
228		ret = base::next(it, e);
229	}
230
231	if (ret == false && ++base::consumersFinished == base::numConsumers)
232		base::shrink();
233	if (locked)
234		base::unlock();
235
236	return ret;
237
238}
239
240template<typename element_t>
241void BandedDL<element_t>::insert(const element_t &e)
242{
243	if (base::multipleProducers)
244		base::lock();
245	base::insert(e);
246	if (base::multipleProducers)
247		base::unlock();
248}
249
250/*
251template<typename element_t>
252bool BandedDL<element_t>::get(const element_t &key, element_t *out)
253{
254	typename std::set<element_t>::iterator it;
255	bool ret, locked = false;
256
257 	if (base::numConsumers > 1 || base::phase == 0) {
258 		locked = true;
259		base::lock();
260 	}
261
262	it = base::c->find(key);
263	if (it != base::c->end()) {
264		*out = *it;
265		ret = true;
266	}
267	else
268		ret = false;
269
270	if (locked)
271		base::unlock();
272
273	return ret;
274}
275*/
276
277template<typename element_t>
278void BandedDL<element_t>::restart()
279{
280	base::lock();
281// 	base::waitForConsumePhase();
282	if (typeid(element_t) == typeid(ElementType) ||
283		typeid(element_t) == typeid(DoubleElementType))
284		base::load_contiguous(0);
285	else
286		base::load(0);
287	base::unlock();
288}
289
290template<typename element_t>
291bool BandedDL<element_t>::next(uint it, element_t *e, bool *endOfBand)
292{
293	bool ret, locked = false;
294
295 	if (base::numConsumers > 1 || base::phase == 0) {
296 		locked = true;
297		base::lock();
298 	}
299
300	base::waitForConsumePhase();
301	ret = base::next(it, e);
302	if (ret) {
303		if (locked)
304			base::unlock();
305		*endOfBand = false;
306		return ret;
307	}
308	else {
309		*endOfBand = true;
310		ret = base::loadedSet < (base::setCount() - 1);
311		if (locked)
312			base::unlock();
313		return ret;
314	}
315}
316
317template<typename element_t>
318uint64_t BandedDL<element_t>::totalSize()
319{
320//std::cout << "BandedDL: c.size() = " << base::c.size() << std::endl; return base::c.size();
321	uint64_t ret;
322
323	base::lock();
324	ret = base::totalSize();
325	base::unlock();
326
327	return ret;
328}
329
330}  // namespace
331
332#endif
333
334