1 /*******************************************************************************
2  * thrill/common/reservoir_sampling.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2017 Timo Bingmann <tb@panthema.net>
7  * Copyright (C) 2017 Lorenz Hübschle-Schneider <lorenz@4z2.de>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_COMMON_RESERVOIR_SAMPLING_HEADER
14 #define THRILL_COMMON_RESERVOIR_SAMPLING_HEADER
15 
16 #include <thrill/common/logger.hpp>
17 
18 #include <cassert>
19 #include <cmath>
20 #include <random>
21 #include <vector>
22 
23 namespace thrill {
24 namespace common {
25 
26 /*!
27  * Implementation of reservoir sampling using Vitter's Algorithm R. The
28  * reservoir size is fixed, new items replace old ones such that all items in
29  * the stream are sampled with the same uniform probability.
30  */
31 template <typename Type, typename RNG = std::default_random_engine>
32 class ReservoirSampling
33 {
34 public:
35     //! initialize reservoir sampler
ReservoirSampling(size_t size,std::vector<Type> & samples,RNG & rng)36     ReservoirSampling(size_t size, std::vector<Type>& samples,
37                       RNG& rng)
38         : size_(size), samples_(samples), rng_(rng) {
39         samples_.reserve(size_);
40     }
41 
42     //! visit item, maybe add it to the sample.
add(const Type & item)43     void add(const Type& item) {
44         ++count_;
45         if (count_ <= size_) {
46             // if reservoir is too small then store item
47             samples_.emplace_back(item);
48         }
49         else {
50             // maybe replace an item
51             size_t x = rng_() % count_;
52             if (x < size_)
53                 samples_[x] = item;
54         }
55     }
56 
57     //! size of reservoir
size() const58     size_t size() const { return size_; }
59 
60     //! number of items seen
count() const61     size_t count() const { return count_; }
62 
63     //! access to samples
samples() const64     const std::vector<Type>& samples() const { return samples_; }
65 
66 private:
67     //! size of reservoir
68     size_t size_;
69     //! number of items seen
70     size_t count_ = 0;
71     //! reservoir
72     std::vector<Type>& samples_;
73     //! source of randomness
74     RNG& rng_;
75 };
76 
77 /*!
78  * Fast exact implementation of reservoir sampling using skip values. Algorithm
79  * L from Kim-Hung Li: Reservoir Sampling Algorithms of Time Complexity
80  * O(n(1+log(N/n))), ACM TOMS 1994. The reservoir size is fixed, new items
81  * replace old ones such that all items in the stream are sampled with the same
82  * uniform probability.
83  */
84 template <typename Type, typename RNG = std::default_random_engine>
85 class ReservoirSamplingFast
86 {
87 public:
88     //! initialize reservoir sampler
ReservoirSamplingFast(size_t size,std::vector<Type> & samples,RNG & rng)89     ReservoirSamplingFast(size_t size, std::vector<Type>& samples,
90                           RNG& rng)
91         : size_(size), samples_(samples), rng_(rng) {
92         samples_.reserve(size_);
93         W_ = std::exp(std::log(uniform(rng_)) / size);
94         gap_ = std::floor(std::log(uniform(rng_)) / std::log(1 - W_));
95     }
96 
97     //! visit item, maybe add it to the sample.
add(const Type & item)98     void add(const Type& item) {
99         ++count_;
100         if (count_ <= 4 * size_) {
101             if (count_ <= size_) {
102                 // if reservoir is too small then store item
103                 samples_.emplace_back(item);
104             }
105             else {
106                 // use Vitter's algorithm for small count_
107                 size_t x = uniform(rng_) * count_;
108                 if (x < size_)
109                     samples_[x] = item;
110 
111                 // when count_ reaches 4 * size_ switch to gap algorithm
112                 if (count_ == 4 * size_)
113                     calc_next_gap();
114             }
115         }
116         else if (gap_ == 0) {
117             // gap elapsed, this item is a sample
118             size_t x = uniform(rng_) * size_;
119             samples_[x] = item;
120 
121             // pick gap size: the next gap_ items are not samples
122             calc_next_gap();
123         }
124         else {
125             --gap_;
126         }
127     }
128 
129     //! size of reservoir
size() const130     size_t size() const { return size_; }
131 
132     //! number of items seen
count() const133     size_t count() const { return count_; }
134 
135     //! access to samples
samples() const136     const std::vector<Type>& samples() const { return samples_; }
137 
138 private:
139     //! size of reservoir
140     size_t size_;
141     //! number of items seen
142     size_t count_ = 0;
143     //! number of items to skip until next sample
144     size_t gap_;
145     //! random value for gap calculation, distribution: largest value in a
146     //! sample of Uniform(0, old_W) of size size_, where old_W is 1 initially
147     double W_;
148     //! reservoir
149     std::vector<Type>& samples_;
150     //! source of randomness
151     RNG& rng_;
152     //! uniform [0.0, 1.0) distribution
153     std::uniform_real_distribution<double> uniform;
154 
155     //! draw gap size from geometric distribution with p = size_ / count_
calc_next_gap()156     void calc_next_gap() {
157         W_ *= std::exp(std::log(uniform(rng_)) / size_);
158         gap_ = std::log(uniform(rng_)) / std::log(1 - W_);
159     }
160 };
161 
162 /*!
163  * Implementation of a fast approximation of adaptive reservoir sampling using
164  * http://erikerlandson.github.io/blog/2015/11/20/very-fast-reservoir-sampling/
165  * The reservoir size grows logarithmically with the number given to the
166  * sampler, new items replace old ones such that all items in the stream are
167  * sampled with the same approximately uniform probability.
168  *
169  * Growing of the reservoir is implemented trivially by selecting any current
170  * item to expand the array. This works well enough if growing steps become
171  * rarer for larger streams.
172  */
173 template <typename Type, typename RNG = std::default_random_engine>
174 class ReservoirSamplingGrow
175 {
176     static constexpr bool debug = false;
177 
178 public:
179     //! initialize reservoir sampler
ReservoirSamplingGrow(std::vector<Type> & samples,RNG & rng,double desired_imbalance=0.05)180     ReservoirSamplingGrow(std::vector<Type>& samples,
181                           RNG& rng,
182                           double desired_imbalance = 0.05)
183         : samples_(samples), rng_(rng), desired_imbalance_(desired_imbalance)
184     { }
185 
186     //! visit item, maybe add it to the sample.
add(const Type & item)187     void add(const Type& item) {
188         sLOG0 << "ReservoirSamplingGrow::add"
189               << "count_" << count_
190               << "size_" << size_
191               << "gap_" << gap_;
192 
193         ++count_;
194 
195         // check if reservoir should be resize, this is equivalent to the check
196         // if (size_ != calc_sample_size())
197         if (steps_to_resize == 0)
198         {
199             // calculate new reservoir size
200             size_t target_size = calc_sample_size();
201             steps_to_resize = calc_steps_to_next_resize();
202 
203             sLOG << "steps_to_resize" << steps_to_resize
204                  << "target_size" << target_size
205                  << "size_" << size_ << "count_" << count_
206                  << "expanded_by" << target_size - size_;
207             assert(target_size >= size_);
208 
209             // expand reservoir, sample new items from existing and new one
210             while (size_ < target_size) {
211                 size_t x = rng_() % (size_ + 1);
212                 if (x != size_)
213                     samples_.emplace_back(samples_[x]);
214                 else
215                     samples_.emplace_back(item);
216                 ++size_;
217             }
218         }
219         else {
220             --steps_to_resize;
221         }
222 
223         assert(samples_.size() == size_);
224         if (debug && size_ != calc_sample_size())
225             LOG0 << "delta: " << (int)size_ - (int)calc_sample_size()
226                  << " count_ " << count_;
227 
228         if (count_ <= 4 * size_) {
229             if (count_ <= size_) {
230                 // fill slots initially in order
231                 samples_[count_ - 1] = item;
232             }
233             else {
234                 // replace items using Vitter's Algorithm R
235                 size_t x = rng_() % count_;
236                 if (x < size_)
237                     samples_[x] = item;
238 
239                 // when count_ reaches 4 * size_ switch to gap algorithm
240                 if (count_ == 4 * size_)
241                     gap_ = calc_next_gap();
242             }
243         }
244         else if (gap_ == 0) {
245             // gap elapsed, this item is a sample
246             size_t x = rng_() % size_;
247             samples_[x] = item;
248 
249             // pick gap size: the next gap_ items are not samples
250             gap_ = calc_next_gap();
251         }
252         else {
253             --gap_;
254         }
255     }
256 
257     //! size of reservoir
size() const258     size_t size() const { return size_; }
259 
260     //! number of items seen
count() const261     size_t count() const { return count_; }
262 
263     //! access to samples
samples() const264     const std::vector<Type>& samples() const { return samples_; }
265 
266     //! desired imbalance
desired_imbalance() const267     double desired_imbalance() const { return desired_imbalance_; }
268 
269     //! calculate desired sample size
calc_sample_size(size_t count) const270     size_t calc_sample_size(size_t count) const {
271         size_t s = static_cast<size_t>(
272             std::log2(count)
273             * (1.0 / (desired_imbalance_ * desired_imbalance_)));
274         return std::max(s, size_t(1));
275     }
276 
277     //! calculate desired sample size
calc_sample_size() const278     size_t calc_sample_size() const {
279         return calc_sample_size(count_);
280     }
281 
282 private:
283     //! size of reservoir
284     size_t size_ = 0;
285     //! number of items seen
286     size_t count_ = 0;
287     //! items to skip until next sample (used in gap algorithm)
288     size_t gap_ = 0;
289     //! items to process prior to checking for reservoir resize
290     size_t steps_to_resize = 0;
291     //! reservoir
292     std::vector<Type>& samples_;
293     //! source of randomness
294     RNG& rng_;
295 
296     //! epsilon imbalance: this reservoir sampling works well for the range 0.5
297     //! to 0.01. Imbalance 0.5 results in 79 samples for 1 million items, 0.1 in
298     //! 1992, 0.05 in 6643, 0.02 in 49828, and 0.01 in 199315.
299     const double desired_imbalance_;
300 
301     //! draw gap size from geometric distribution with p = size_ / count_
calc_next_gap() const302     size_t calc_next_gap() const {
303         if (0) {
304             // this is slower than the simpler approximation below
305             return std::geometric_distribution<size_t>(
306                 static_cast<double>(size_) / static_cast<double>(count_))(rng_);
307         }
308         else {
309             // generate a geometric distributed variant with p = size_ / count_
310             double p = static_cast<double>(size_) / static_cast<double>(count_);
311             double u = std::uniform_real_distribution<double>()(rng_);
312             return std::floor(std::log(u) / std::log(1 - p));
313         }
314     }
315 
316     //! calculate number of items/steps to process without checking for sample
317     //! resize
calc_steps_to_next_resize() const318     size_t calc_steps_to_next_resize() const {
319         return std::floor(
320             count_ * (
321                 std::pow(2.0, desired_imbalance_ * desired_imbalance_) - 1.0));
322     }
323 };
324 
325 } // namespace common
326 } // namespace thrill
327 
328 #endif // !THRILL_COMMON_RESERVOIR_SAMPLING_HEADER
329 
330 /******************************************************************************/
331