1/****************************************************************************** 2 * $Id: bandeddl.h.vector 3842 2008-03-26 15:02:51Z rdempsey $ 3 * 4 * Copyright (c) 2006 Calpont Corporation 5 * All rights reserved. 6 *****************************************************************************/ 7 8/** @file 9 * class XXX interface 10 */ 11 12#include <set> 13#include "largedatalist.h" 14#include "bucketdl.h" 15 16#include <time.h> 17 18#ifndef _BANDEDDL_HPP_ 19#define _BANDEDDL_HPP_ 20 21namespace joblist { 22 23/** @brief class BandedDL 24 * 25 */ 26template<typename element_t> 27class BandedDL : public LargeDataList<std::vector<element_t>, element_t> 28{ 29 typedef LargeDataList<std::vector<element_t>, element_t> base; 30 31 public: 32 BandedDL(uint numConsumers); 33 BandedDL(BucketDL<element_t> &, uint numConsumers); 34 virtual ~BandedDL(); 35 36 int saveBand(); 37 void loadBand(uint); 38 int bandCount(); 39 40 /// loads the first band, next() will return the first element 41 void restart(); 42 43 virtual void insert(const element_t &); 44 virtual uint getIterator(); 45 virtual bool next(uint it, element_t *e); 46 virtual void endOfInput(); 47 using DataListImpl<std::vector<element_t>, element_t>::shrink; 48 uint64_t totalSize(); 49 bool next(uint it, element_t *e, bool *endOfBand); 50// bool get(const element_t &key, element_t *out); 51 52 protected: 53 54 private: 55 explicit BandedDL() { }; 56 explicit BandedDL(const BandedDL &) { }; 57 BandedDL & operator=(const BandedDL &) { }; 58 59 // vars to support the WSDL-like next() fcn 60 pthread_cond_t nextSetLoaded; 61 uint waitingConsumers; 62}; 63 64template<typename element_t> 65BandedDL<element_t>::BandedDL(uint nc) : base(nc) 66{ 67 pthread_cond_init(&nextSetLoaded, NULL); 68 waitingConsumers = 0; 69} 70 71 72template<typename element_t> 73BandedDL<element_t>::BandedDL(BucketDL<element_t> &b, uint nc) : base(nc) 74{ 75 uint i, it; 76 element_t e; 77 bool more; 78 79 pthread_cond_init(&nextSetLoaded, NULL); 80 waitingConsumers = 0; 81 82#ifdef PROFILE 83 struct timespec ts1, ts2; 84 clock_gettime(CLOCK_REALTIME, &ts1); 85#endif 86 87 for (i = 0; i < b.bucketCount(); i++) { 88 it = b.getIterator(i); 89 more = b.next(i, it, &e); 90 while (more) { 91 insert(e); 92 more = b.next(i, it, &e); 93 } 94 saveBand(); 95 } 96 endOfInput(); 97 98#ifdef PROFILE 99 clock_gettime(CLOCK_REALTIME, &ts2); 100 /* What should we do with this profile info? */ 101#endif 102 103} 104 105template<typename element_t> 106BandedDL<element_t>::~BandedDL() 107{ 108 pthread_cond_destroy(&nextSetLoaded); 109} 110 111template<typename element_t> 112int BandedDL<element_t>::saveBand() 113{ 114 int ret; 115 116 if (base::multipleProducers) 117 base::lock(); 118 sort(base::c->begin(), base::c->end()); 119 if (typeid(element_t) == typeid(ElementType) || 120 typeid(element_t) == typeid(DoubleElementType)) 121 ret = base::save_contiguous(); 122 else 123 ret = base::save(); 124 base::registerNewSet(); 125 if (base::multipleProducers) 126 base::unlock(); 127 128 return ret; 129} 130 131template<typename element_t> 132void BandedDL<element_t>::loadBand(uint band) 133{ 134 135 base::lock(); 136 if (typeid(element_t) == typeid(ElementType) || 137 typeid(element_t) == typeid(DoubleElementType)) 138 base::load_contiguous(band); 139 else 140 base::load(band); 141 if (waitingConsumers > 0) 142 pthread_cond_broadcast(&nextSetLoaded); 143 base::unlock(); 144} 145 146template<typename element_t> 147int BandedDL<element_t>::bandCount() 148{ 149 int ret; 150 151 base::lock(); 152 ret = base::setCount(); 153 base::unlock(); 154 return ret; 155} 156 157template<typename element_t> 158uint BandedDL<element_t>::getIterator() 159{ 160 uint ret; 161 162 base::lock(); 163 ret = base::getIterator(); 164 base::unlock(); 165 return ret; 166} 167 168template<typename element_t> 169void BandedDL<element_t>::endOfInput() 170{ 171 base::lock(); 172 sort(base::c->begin(), base::c->end()); 173 base::endOfInput(); 174 if (typeid(element_t) == typeid(ElementType) || 175 typeid(element_t) == typeid(DoubleElementType)) { 176 base::save_contiguous(); 177 base::load_contiguous(0); 178 } 179 else { 180 base::save(); 181 base::load(0); 182 } 183 base::unlock(); 184} 185 186template<typename element_t> 187bool BandedDL<element_t>::next(uint it, element_t *e) 188{ 189 190/* Note: this is the code for WSDL::next(). The more I think about it, 191the more I think they're the same thing. Not entirely sure yet though. */ 192 193 bool ret, locked = false; 194 uint nextSet; 195 196 if (base::numConsumers > 1 || base::phase == 0) { 197 locked = true; 198 base::lock(); 199 } 200 201 ret = base::next(it, e); 202 203 /* XXXPAT: insignificant race condition here. Technically, there's no 204 guarantee the caller will be wakened when the next set is loaded. It could 205 get skipped. It won't happen realistically, but it exists... */ 206 207 // signifies the caller is at the end of the loaded set, 208 // but there are more sets 209 if (ret == false && (base::loadedSet < base::setCount - 1)) { 210 211 nextSet = base::loadedSet + 1; 212 waitingConsumers++; 213 if (waitingConsumers < base::numConsumers) 214 while (nextSet != base::loadedSet) { 215// std::cout << "waiting on nextSetLoaded" << std::endl; 216 pthread_cond_wait(&nextSetLoaded, &(this->mutex)); 217 } 218 else { 219// std::cout << "loading set " << nextSet << std::endl; 220 if (typeid(element_t) == typeid(ElementType) || 221 typeid(element_t) == typeid(DoubleElementType)) 222 base::load_contiguous(nextSet); 223 else 224 base::load(nextSet); 225 pthread_cond_broadcast(&nextSetLoaded); 226 } 227 waitingConsumers--; 228 ret = base::next(it, e); 229 } 230 231 if (ret == false && ++base::consumersFinished == base::numConsumers) 232 base::shrink(); 233 if (locked) 234 base::unlock(); 235 236 return ret; 237 238} 239 240template<typename element_t> 241void BandedDL<element_t>::insert(const element_t &e) 242{ 243 if (base::multipleProducers) 244 base::lock(); 245 base::insert(e); 246 if (base::multipleProducers) 247 base::unlock(); 248} 249 250/* 251template<typename element_t> 252bool BandedDL<element_t>::get(const element_t &key, element_t *out) 253{ 254 typename std::set<element_t>::iterator it; 255 bool ret, locked = false; 256 257 if (base::numConsumers > 1 || base::phase == 0) { 258 locked = true; 259 base::lock(); 260 } 261 262 it = base::c->find(key); 263 if (it != base::c->end()) { 264 *out = *it; 265 ret = true; 266 } 267 else 268 ret = false; 269 270 if (locked) 271 base::unlock(); 272 273 return ret; 274} 275*/ 276 277template<typename element_t> 278void BandedDL<element_t>::restart() 279{ 280 base::lock(); 281// base::waitForConsumePhase(); 282 if (typeid(element_t) == typeid(ElementType) || 283 typeid(element_t) == typeid(DoubleElementType)) 284 base::load_contiguous(0); 285 else 286 base::load(0); 287 base::unlock(); 288} 289 290template<typename element_t> 291bool BandedDL<element_t>::next(uint it, element_t *e, bool *endOfBand) 292{ 293 bool ret, locked = false; 294 295 if (base::numConsumers > 1 || base::phase == 0) { 296 locked = true; 297 base::lock(); 298 } 299 300 base::waitForConsumePhase(); 301 ret = base::next(it, e); 302 if (ret) { 303 if (locked) 304 base::unlock(); 305 *endOfBand = false; 306 return ret; 307 } 308 else { 309 *endOfBand = true; 310 ret = base::loadedSet < (base::setCount() - 1); 311 if (locked) 312 base::unlock(); 313 return ret; 314 } 315} 316 317template<typename element_t> 318uint64_t BandedDL<element_t>::totalSize() 319{ 320//std::cout << "BandedDL: c.size() = " << base::c.size() << std::endl; return base::c.size(); 321 uint64_t ret; 322 323 base::lock(); 324 ret = base::totalSize(); 325 base::unlock(); 326 327 return ret; 328} 329 330} // namespace 331 332#endif 333 334