1 /******************************************************************************* 2 * thrill/common/reservoir_sampling.hpp 3 * 4 * Part of Project Thrill - http://project-thrill.org 5 * 6 * Copyright (C) 2017 Timo Bingmann <tb@panthema.net> 7 * Copyright (C) 2017 Lorenz Hübschle-Schneider <lorenz@4z2.de> 8 * 9 * All rights reserved. Published under the BSD-2 license in the LICENSE file. 10 ******************************************************************************/ 11 12 #pragma once 13 #ifndef THRILL_COMMON_RESERVOIR_SAMPLING_HEADER 14 #define THRILL_COMMON_RESERVOIR_SAMPLING_HEADER 15 16 #include <thrill/common/logger.hpp> 17 18 #include <cassert> 19 #include <cmath> 20 #include <random> 21 #include <vector> 22 23 namespace thrill { 24 namespace common { 25 26 /*! 27 * Implementation of reservoir sampling using Vitter's Algorithm R. The 28 * reservoir size is fixed, new items replace old ones such that all items in 29 * the stream are sampled with the same uniform probability. 30 */ 31 template <typename Type, typename RNG = std::default_random_engine> 32 class ReservoirSampling 33 { 34 public: 35 //! initialize reservoir sampler ReservoirSampling(size_t size,std::vector<Type> & samples,RNG & rng)36 ReservoirSampling(size_t size, std::vector<Type>& samples, 37 RNG& rng) 38 : size_(size), samples_(samples), rng_(rng) { 39 samples_.reserve(size_); 40 } 41 42 //! visit item, maybe add it to the sample. add(const Type & item)43 void add(const Type& item) { 44 ++count_; 45 if (count_ <= size_) { 46 // if reservoir is too small then store item 47 samples_.emplace_back(item); 48 } 49 else { 50 // maybe replace an item 51 size_t x = rng_() % count_; 52 if (x < size_) 53 samples_[x] = item; 54 } 55 } 56 57 //! size of reservoir size() const58 size_t size() const { return size_; } 59 60 //! number of items seen count() const61 size_t count() const { return count_; } 62 63 //! access to samples samples() const64 const std::vector<Type>& samples() const { return samples_; } 65 66 private: 67 //! size of reservoir 68 size_t size_; 69 //! number of items seen 70 size_t count_ = 0; 71 //! reservoir 72 std::vector<Type>& samples_; 73 //! source of randomness 74 RNG& rng_; 75 }; 76 77 /*! 78 * Fast exact implementation of reservoir sampling using skip values. Algorithm 79 * L from Kim-Hung Li: Reservoir Sampling Algorithms of Time Complexity 80 * O(n(1+log(N/n))), ACM TOMS 1994. The reservoir size is fixed, new items 81 * replace old ones such that all items in the stream are sampled with the same 82 * uniform probability. 83 */ 84 template <typename Type, typename RNG = std::default_random_engine> 85 class ReservoirSamplingFast 86 { 87 public: 88 //! initialize reservoir sampler ReservoirSamplingFast(size_t size,std::vector<Type> & samples,RNG & rng)89 ReservoirSamplingFast(size_t size, std::vector<Type>& samples, 90 RNG& rng) 91 : size_(size), samples_(samples), rng_(rng) { 92 samples_.reserve(size_); 93 W_ = std::exp(std::log(uniform(rng_)) / size); 94 gap_ = std::floor(std::log(uniform(rng_)) / std::log(1 - W_)); 95 } 96 97 //! visit item, maybe add it to the sample. add(const Type & item)98 void add(const Type& item) { 99 ++count_; 100 if (count_ <= 4 * size_) { 101 if (count_ <= size_) { 102 // if reservoir is too small then store item 103 samples_.emplace_back(item); 104 } 105 else { 106 // use Vitter's algorithm for small count_ 107 size_t x = uniform(rng_) * count_; 108 if (x < size_) 109 samples_[x] = item; 110 111 // when count_ reaches 4 * size_ switch to gap algorithm 112 if (count_ == 4 * size_) 113 calc_next_gap(); 114 } 115 } 116 else if (gap_ == 0) { 117 // gap elapsed, this item is a sample 118 size_t x = uniform(rng_) * size_; 119 samples_[x] = item; 120 121 // pick gap size: the next gap_ items are not samples 122 calc_next_gap(); 123 } 124 else { 125 --gap_; 126 } 127 } 128 129 //! size of reservoir size() const130 size_t size() const { return size_; } 131 132 //! number of items seen count() const133 size_t count() const { return count_; } 134 135 //! access to samples samples() const136 const std::vector<Type>& samples() const { return samples_; } 137 138 private: 139 //! size of reservoir 140 size_t size_; 141 //! number of items seen 142 size_t count_ = 0; 143 //! number of items to skip until next sample 144 size_t gap_; 145 //! random value for gap calculation, distribution: largest value in a 146 //! sample of Uniform(0, old_W) of size size_, where old_W is 1 initially 147 double W_; 148 //! reservoir 149 std::vector<Type>& samples_; 150 //! source of randomness 151 RNG& rng_; 152 //! uniform [0.0, 1.0) distribution 153 std::uniform_real_distribution<double> uniform; 154 155 //! draw gap size from geometric distribution with p = size_ / count_ calc_next_gap()156 void calc_next_gap() { 157 W_ *= std::exp(std::log(uniform(rng_)) / size_); 158 gap_ = std::log(uniform(rng_)) / std::log(1 - W_); 159 } 160 }; 161 162 /*! 163 * Implementation of a fast approximation of adaptive reservoir sampling using 164 * http://erikerlandson.github.io/blog/2015/11/20/very-fast-reservoir-sampling/ 165 * The reservoir size grows logarithmically with the number given to the 166 * sampler, new items replace old ones such that all items in the stream are 167 * sampled with the same approximately uniform probability. 168 * 169 * Growing of the reservoir is implemented trivially by selecting any current 170 * item to expand the array. This works well enough if growing steps become 171 * rarer for larger streams. 172 */ 173 template <typename Type, typename RNG = std::default_random_engine> 174 class ReservoirSamplingGrow 175 { 176 static constexpr bool debug = false; 177 178 public: 179 //! initialize reservoir sampler ReservoirSamplingGrow(std::vector<Type> & samples,RNG & rng,double desired_imbalance=0.05)180 ReservoirSamplingGrow(std::vector<Type>& samples, 181 RNG& rng, 182 double desired_imbalance = 0.05) 183 : samples_(samples), rng_(rng), desired_imbalance_(desired_imbalance) 184 { } 185 186 //! visit item, maybe add it to the sample. add(const Type & item)187 void add(const Type& item) { 188 sLOG0 << "ReservoirSamplingGrow::add" 189 << "count_" << count_ 190 << "size_" << size_ 191 << "gap_" << gap_; 192 193 ++count_; 194 195 // check if reservoir should be resize, this is equivalent to the check 196 // if (size_ != calc_sample_size()) 197 if (steps_to_resize == 0) 198 { 199 // calculate new reservoir size 200 size_t target_size = calc_sample_size(); 201 steps_to_resize = calc_steps_to_next_resize(); 202 203 sLOG << "steps_to_resize" << steps_to_resize 204 << "target_size" << target_size 205 << "size_" << size_ << "count_" << count_ 206 << "expanded_by" << target_size - size_; 207 assert(target_size >= size_); 208 209 // expand reservoir, sample new items from existing and new one 210 while (size_ < target_size) { 211 size_t x = rng_() % (size_ + 1); 212 if (x != size_) 213 samples_.emplace_back(samples_[x]); 214 else 215 samples_.emplace_back(item); 216 ++size_; 217 } 218 } 219 else { 220 --steps_to_resize; 221 } 222 223 assert(samples_.size() == size_); 224 if (debug && size_ != calc_sample_size()) 225 LOG0 << "delta: " << (int)size_ - (int)calc_sample_size() 226 << " count_ " << count_; 227 228 if (count_ <= 4 * size_) { 229 if (count_ <= size_) { 230 // fill slots initially in order 231 samples_[count_ - 1] = item; 232 } 233 else { 234 // replace items using Vitter's Algorithm R 235 size_t x = rng_() % count_; 236 if (x < size_) 237 samples_[x] = item; 238 239 // when count_ reaches 4 * size_ switch to gap algorithm 240 if (count_ == 4 * size_) 241 gap_ = calc_next_gap(); 242 } 243 } 244 else if (gap_ == 0) { 245 // gap elapsed, this item is a sample 246 size_t x = rng_() % size_; 247 samples_[x] = item; 248 249 // pick gap size: the next gap_ items are not samples 250 gap_ = calc_next_gap(); 251 } 252 else { 253 --gap_; 254 } 255 } 256 257 //! size of reservoir size() const258 size_t size() const { return size_; } 259 260 //! number of items seen count() const261 size_t count() const { return count_; } 262 263 //! access to samples samples() const264 const std::vector<Type>& samples() const { return samples_; } 265 266 //! desired imbalance desired_imbalance() const267 double desired_imbalance() const { return desired_imbalance_; } 268 269 //! calculate desired sample size calc_sample_size(size_t count) const270 size_t calc_sample_size(size_t count) const { 271 size_t s = static_cast<size_t>( 272 std::log2(count) 273 * (1.0 / (desired_imbalance_ * desired_imbalance_))); 274 return std::max(s, size_t(1)); 275 } 276 277 //! calculate desired sample size calc_sample_size() const278 size_t calc_sample_size() const { 279 return calc_sample_size(count_); 280 } 281 282 private: 283 //! size of reservoir 284 size_t size_ = 0; 285 //! number of items seen 286 size_t count_ = 0; 287 //! items to skip until next sample (used in gap algorithm) 288 size_t gap_ = 0; 289 //! items to process prior to checking for reservoir resize 290 size_t steps_to_resize = 0; 291 //! reservoir 292 std::vector<Type>& samples_; 293 //! source of randomness 294 RNG& rng_; 295 296 //! epsilon imbalance: this reservoir sampling works well for the range 0.5 297 //! to 0.01. Imbalance 0.5 results in 79 samples for 1 million items, 0.1 in 298 //! 1992, 0.05 in 6643, 0.02 in 49828, and 0.01 in 199315. 299 const double desired_imbalance_; 300 301 //! draw gap size from geometric distribution with p = size_ / count_ calc_next_gap() const302 size_t calc_next_gap() const { 303 if (0) { 304 // this is slower than the simpler approximation below 305 return std::geometric_distribution<size_t>( 306 static_cast<double>(size_) / static_cast<double>(count_))(rng_); 307 } 308 else { 309 // generate a geometric distributed variant with p = size_ / count_ 310 double p = static_cast<double>(size_) / static_cast<double>(count_); 311 double u = std::uniform_real_distribution<double>()(rng_); 312 return std::floor(std::log(u) / std::log(1 - p)); 313 } 314 } 315 316 //! calculate number of items/steps to process without checking for sample 317 //! resize calc_steps_to_next_resize() const318 size_t calc_steps_to_next_resize() const { 319 return std::floor( 320 count_ * ( 321 std::pow(2.0, desired_imbalance_ * desired_imbalance_) - 1.0)); 322 } 323 }; 324 325 } // namespace common 326 } // namespace thrill 327 328 #endif // !THRILL_COMMON_RESERVOIR_SAMPLING_HEADER 329 330 /******************************************************************************/ 331