1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/experimental/QuotientMultiSet.h>
18 
19 #include <folly/Format.h>
20 #include <folly/Portability.h>
21 #include <folly/experimental/Bits.h>
22 #include <folly/experimental/Select64.h>
23 #include <folly/lang/Bits.h>
24 #include <folly/lang/SafeAssert.h>
25 
26 #include <glog/logging.h>
27 
28 #if FOLLY_QUOTIENT_MULTI_SET_SUPPORTED
29 
30 namespace folly {
31 
32 namespace qms_detail {
33 
34 /**
35  * Reference: Faster Remainder by Direct Computation: Applications to Compilers
36  * and Software Libraries, Software: Practice and Experience 49 (6), 2019.
37  */
getInverse(uint64_t divisor)38 FOLLY_ALWAYS_INLINE UInt64InverseType getInverse(uint64_t divisor) {
39   UInt64InverseType fraction = UInt64InverseType(-1);
40   fraction /= divisor;
41   fraction += 1;
42   return fraction;
43 }
44 
45 FOLLY_ALWAYS_INLINE uint64_t
mul128(UInt64InverseType lowbits,uint64_t divisor)46 mul128(UInt64InverseType lowbits, uint64_t divisor) {
47   UInt64InverseType bottomHalf =
48       (lowbits & UINT64_C(0xFFFFFFFFFFFFFFFF)) * divisor;
49   bottomHalf >>= 64;
50   UInt64InverseType topHalf = (lowbits >> 64) * divisor;
51   UInt64InverseType bothHalves = bottomHalf + topHalf;
52   bothHalves >>= 64;
53   return static_cast<uint64_t>(bothHalves);
54 }
55 
getQuotientAndRemainder(uint64_t dividend,uint64_t divisor,UInt64InverseType inverse)56 FOLLY_ALWAYS_INLINE std::pair<uint64_t, uint64_t> getQuotientAndRemainder(
57     uint64_t dividend, uint64_t divisor, UInt64InverseType inverse) {
58   if (FOLLY_UNLIKELY(divisor == 1)) {
59     return {dividend, 0};
60   }
61   auto quotient = mul128(inverse, dividend);
62   auto remainder = dividend - quotient * divisor;
63   DCHECK_LT(remainder, divisor);
64   return {quotient, remainder};
65 }
66 
67 // Max value for given bits.
maxValue(uint32_t nbits)68 FOLLY_ALWAYS_INLINE uint64_t maxValue(uint32_t nbits) {
69   return nbits == 64 ? std::numeric_limits<uint64_t>::max()
70                      : (uint64_t(1) << nbits) - 1;
71 }
72 
73 } // namespace qms_detail
74 
75 template <class Instructions>
76 struct QuotientMultiSet<Instructions>::Metadata {
77   // Total number of blocks.
78   uint64_t numBlocks;
79   uint64_t numKeys;
80   uint64_t divisor;
81   uint8_t keyBits;
82   uint8_t remainderBits;
83 
84   std::string debugString() const {
85     return sformat(
86         "Number of blocks: {}\n"
87         "Number of elements: {}\n"
88         "Divisor: {}\n"
89         "Key bits: {}\n"
90         "Remainder bits: {}",
91         numBlocks,
92         numKeys,
93         divisor,
94         keyBits,
95         remainderBits);
96   }
97 } FOLLY_PACK_ATTR;
98 
99 template <class Instructions>
100 struct QuotientMultiSet<Instructions>::Block {
101   static const Block* get(const char* data) {
102     return reinterpret_cast<const Block*>(data);
103   }
104 
105   static BlockPtr make(size_t remainderBits) {
106     auto ptr = reinterpret_cast<Block*>(calloc(blockSize(remainderBits), 1));
107     return {ptr, free};
108   }
109 
110   uint64_t payload;
111   uint64_t occupieds;
112   uint64_t offset;
113   uint64_t runends;
114   char remainders[0];
115 
116   static uint64_t blockSize(size_t remainderBits) {
117     return sizeof(Block) + remainderBits * 8;
118   }
119 
120   FOLLY_ALWAYS_INLINE bool isOccupied(size_t offsetInBlock) const {
121     return ((occupieds >> offsetInBlock) & uint64_t(1)) != 0;
122   }
123 
124   FOLLY_ALWAYS_INLINE bool isRunend(size_t offsetInBlock) const {
125     return ((runends >> offsetInBlock) & uint64_t(1)) != 0;
126   }
127 
128   FOLLY_ALWAYS_INLINE uint64_t getRemainder(
129       size_t offsetInBlock, size_t remainderBits, size_t remainderMask) const {
130     DCHECK_LE(remainderBits, 56);
131     const size_t bitPos = offsetInBlock * remainderBits;
132     const uint64_t remainderWord =
133         loadUnaligned<uint64_t>(remainders + (bitPos / 8));
134     return (remainderWord >> (bitPos % 8)) & remainderMask;
135   }
136 
137   void setOccupied(size_t offsetInBlock) {
138     occupieds |= uint64_t(1) << offsetInBlock;
139   }
140 
141   void setRunend(size_t offsetInBlock) {
142     runends |= uint64_t(1) << offsetInBlock;
143   }
144 
145   void setRemainder(
146       size_t offsetInBlock, size_t remainderBits, uint64_t remainder) {
147     DCHECK_LT(offsetInBlock, kBlockSize);
148     if (FOLLY_UNLIKELY(remainderBits == 0)) {
149       return;
150     }
151     Bits<uint64_t>::set(
152         reinterpret_cast<uint64_t*>(remainders),
153         offsetInBlock * remainderBits,
154         remainderBits,
155         remainder);
156   }
157 } FOLLY_PACK_ATTR;
158 
159 template <class Instructions>
160 QuotientMultiSet<Instructions>::QuotientMultiSet(StringPiece data) {
161   static_assert(
162       kIsLittleEndian, "QuotientMultiSet requires little endianness.");
163   StringPiece sp = data;
164   CHECK_GE(sp.size(), sizeof(Metadata));
165   sp.advance(sp.size() - sizeof(Metadata));
166   metadata_ = reinterpret_cast<const Metadata*>(sp.data());
167   VLOG(2) << "Metadata: " << metadata_->debugString();
168 
169   numBlocks_ = metadata_->numBlocks;
170   numSlots_ = numBlocks_ * kBlockSize;
171   divisor_ = metadata_->divisor;
172   fraction_ = qms_detail::getInverse(divisor_);
173   keyBits_ = metadata_->keyBits;
174   maxKey_ = qms_detail::maxValue(keyBits_);
175   remainderBits_ = metadata_->remainderBits;
176   remainderMask_ = qms_detail::maxValue(remainderBits_);
177   blockSize_ = Block::blockSize(remainderBits_);
178 
179   CHECK_EQ(data.size(), numBlocks_ * blockSize_ + sizeof(Metadata));
180   data_ = data.data();
181 }
182 
183 template <class Instructions>
184 auto QuotientMultiSet<Instructions>::equalRange(uint64_t key) const
185     -> SlotRange {
186   if (key > maxKey_) {
187     return {0, 0};
188   }
189   const auto qr = qms_detail::getQuotientAndRemainder(key, divisor_, fraction_);
190   const auto& quotient = qr.first;
191   const auto& remainder = qr.second;
192   const size_t blockIndex = quotient / kBlockSize;
193   const size_t offsetInBlock = quotient % kBlockSize;
194 
195   if (FOLLY_UNLIKELY(blockIndex >= numBlocks_)) {
196     return {0, 0};
197   }
198 
199   const auto* occBlock = getBlock(blockIndex);
200   __builtin_prefetch(reinterpret_cast<const char*>(&occBlock->occupieds) + 64);
201   const auto firstRunend = occBlock->offset;
202   if (!occBlock->isOccupied(offsetInBlock)) {
203     // Return a position that depends on the contents of the block so
204     // we can create a dependency in benchmarks.
205     return {firstRunend, firstRunend};
206   }
207 
208   // Look for the right runend for the given key.
209   const uint64_t occupiedRank = Instructions::popcount(
210       Instructions::bzhi(occBlock->occupieds, offsetInBlock));
211   auto runend = findRunend(occupiedRank, firstRunend);
212   auto& slot = runend.first;
213   auto& block = runend.second;
214 
215   // Iterates over the run backwards to find the slots whose remainder
216   // matches the key.
217   SlotRange range = {slot + 1, slot + 1};
218   while (true) {
219     uint64_t slotRemainder =
220         block->getRemainder(slot % kBlockSize, remainderBits_, remainderMask_);
221 
222     if (slotRemainder > remainder) {
223       range.begin = slot;
224       range.end = slot;
225     } else if (slotRemainder == remainder) {
226       range.begin = slot;
227     } else {
228       break;
229     }
230 
231     if (FOLLY_UNLIKELY(slot % kBlockSize == 0)) {
232       // Reached block start and the run starts from a prev block.
233       size_t slotBlockIndex = slot / kBlockSize;
234       if (slotBlockIndex > blockIndex) {
235         block = getBlock(slotBlockIndex - 1);
236       } else {
237         break;
238       }
239     }
240 
241     --slot;
242     // Encounters the previous run.
243     if (block->isRunend(slot % kBlockSize)) {
244       break;
245     }
246   }
247 
248   return range;
249 }
250 
251 template <class Instructions>
252 auto QuotientMultiSet<Instructions>::findRunend(
253     uint64_t occupiedRank, uint64_t firstRunend) const
254     -> std::pair<uint64_t, const Block*> {
255   // Look for the right runend.
256   size_t slotBlockIndex = firstRunend / kBlockSize;
257   auto block = getBlock(slotBlockIndex);
258   uint64_t runendWord =
259       block->runends & (uint64_t(-1) << (firstRunend % kBlockSize));
260 
261   while (true) {
262     DCHECK_LE(slotBlockIndex, numBlocks_);
263 
264     const size_t numRuns = Instructions::popcount(runendWord);
265     if (FOLLY_LIKELY(numRuns > occupiedRank)) {
266       break;
267     }
268     occupiedRank -= numRuns;
269     ++slotBlockIndex;
270     block = getBlock(slotBlockIndex);
271     runendWord = block->runends;
272   }
273 
274   return {
275       slotBlockIndex * kBlockSize +
276           select64<Instructions>(runendWord, occupiedRank),
277       block};
278 }
279 
280 template <class Instructions>
281 uint64_t QuotientMultiSet<Instructions>::getBlockPayload(
282     uint64_t blockIndex) const {
283   DCHECK_LT(blockIndex, numBlocks_);
284   return getBlock(blockIndex)->payload;
285 }
286 
287 template <class Instructions>
288 QuotientMultiSet<Instructions>::Iterator::Iterator(
289     const QuotientMultiSet<Instructions>* qms)
290     : qms_(qms) {}
291 
292 template <class Instructions>
293 bool QuotientMultiSet<Instructions>::Iterator::next() {
294   if (pos_ == size_t(-1) ||
295       qms_->getBlock(pos_ / kBlockSize)->isRunend(pos_ % kBlockSize)) {
296     // Move to start of next run.
297     if (!nextOccupied()) {
298       return setEnd();
299     }
300 
301     // Next run either starts at pos + 1 or the start of block
302     // specified by occupied slot.
303     pos_ = std::max<uint64_t>(pos_ + 1, occBlockIndex_ * kBlockSize);
304   } else {
305     // Move to next slot since a run must be contiguous.
306     pos_++;
307   }
308 
309   const Block* block = qms_->getBlock(pos_ / kBlockSize);
310   uint64_t quotient =
311       (occBlockIndex_ * kBlockSize + occOffsetInBlock_) * qms_->divisor_;
312   key_ = quotient +
313       block->getRemainder(
314           pos_ % kBlockSize, qms_->remainderBits_, qms_->remainderMask_);
315 
316   DCHECK_LT(pos_, qms_->numBlocks_ * kBlockSize);
317   return true;
318 }
319 
320 template <class Instructions>
321 bool QuotientMultiSet<Instructions>::Iterator::skipTo(uint64_t key) {
322   if (key > qms_->maxKey_) {
323     return setEnd();
324   }
325   const auto qr =
326       qms_detail::getQuotientAndRemainder(key, qms_->divisor_, qms_->fraction_);
327   const auto& quotient = qr.first;
328   occBlockIndex_ = quotient / kBlockSize;
329   occOffsetInBlock_ = quotient % kBlockSize;
330 
331   if (FOLLY_UNLIKELY(occBlockIndex_ >= qms_->numBlocks_)) {
332     return setEnd();
333   }
334 
335   occBlock_ = qms_->getBlock(occBlockIndex_);
336   occWord_ = occBlock_->occupieds & (uint64_t(-1) << occOffsetInBlock_);
337   if (!nextOccupied()) {
338     return setEnd();
339   }
340 
341   // Search for the next runend.
342   uint64_t occupiedRank = Instructions::popcount(
343       Instructions::bzhi(occBlock_->occupieds, occOffsetInBlock_));
344   auto runend = qms_->findRunend(occupiedRank, occBlock_->offset);
345   auto& slot = runend.first;
346   auto& block = runend.second;
347   uint64_t slotBlockIndex = slot / kBlockSize;
348   uint64_t slotOffsetInBlock = slot % kBlockSize;
349   pos_ = slot;
350 
351   uint64_t nextQuotient =
352       (occBlockIndex_ * kBlockSize + occOffsetInBlock_) * qms_->divisor_;
353   uint64_t nextRemainder = block->getRemainder(
354       slotOffsetInBlock, qms_->remainderBits_, qms_->remainderMask_);
355 
356   if (nextQuotient + nextRemainder < key) {
357     // Lower bound element is at the start of next run.
358     return next();
359   }
360 
361   // Iterate over the run backwards to find the first key that is larger than
362   // or equal to the given key.
363   while (true) {
364     uint64_t slotRemainder = block->getRemainder(
365         slotOffsetInBlock, qms_->remainderBits_, qms_->remainderMask_);
366     if (nextQuotient + slotRemainder < key) {
367       break;
368     }
369     pos_ = slot;
370     nextRemainder = slotRemainder;
371     if (FOLLY_UNLIKELY(slotOffsetInBlock == 0)) {
372       // Reached block start and the run starts from a prev block.
373       if (slotBlockIndex > occBlockIndex_) {
374         --slotBlockIndex;
375         block = qms_->getBlock(slotBlockIndex);
376         slotOffsetInBlock = kBlockSize;
377       } else {
378         break;
379       }
380     }
381     --slot;
382     --slotOffsetInBlock;
383 
384     // Encounters the previous run.
385     if (block->isRunend(slotOffsetInBlock)) {
386       break;
387     }
388   }
389 
390   key_ = nextQuotient + nextRemainder;
391   return true;
392 }
393 
394 template <class Instructions>
395 bool QuotientMultiSet<Instructions>::Iterator::nextOccupied() {
396   while (FOLLY_UNLIKELY(occWord_ == 0)) {
397     if (FOLLY_UNLIKELY(++occBlockIndex_ >= qms_->numBlocks_)) {
398       return false;
399     }
400     occBlock_ = qms_->getBlock(occBlockIndex_);
401     occWord_ = occBlock_->occupieds;
402   }
403 
404   occOffsetInBlock_ = Instructions::ctz(occWord_);
405   occWord_ = Instructions::blsr(occWord_);
406   return true;
407 }
408 
409 } // namespace folly
410 
411 #endif // FOLLY_QUOTIENT_MULTI_SET_SUPPORTED
412