1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/experimental/QuotientMultiSet.h>
18
19 #include <folly/Format.h>
20 #include <folly/Portability.h>
21 #include <folly/experimental/Bits.h>
22 #include <folly/experimental/Select64.h>
23 #include <folly/lang/Bits.h>
24 #include <folly/lang/SafeAssert.h>
25
26 #include <glog/logging.h>
27
28 #if FOLLY_QUOTIENT_MULTI_SET_SUPPORTED
29
30 namespace folly {
31
32 namespace qms_detail {
33
34 /**
35 * Reference: Faster Remainder by Direct Computation: Applications to Compilers
36 * and Software Libraries, Software: Practice and Experience 49 (6), 2019.
37 */
getInverse(uint64_t divisor)38 FOLLY_ALWAYS_INLINE UInt64InverseType getInverse(uint64_t divisor) {
39 UInt64InverseType fraction = UInt64InverseType(-1);
40 fraction /= divisor;
41 fraction += 1;
42 return fraction;
43 }
44
45 FOLLY_ALWAYS_INLINE uint64_t
mul128(UInt64InverseType lowbits,uint64_t divisor)46 mul128(UInt64InverseType lowbits, uint64_t divisor) {
47 UInt64InverseType bottomHalf =
48 (lowbits & UINT64_C(0xFFFFFFFFFFFFFFFF)) * divisor;
49 bottomHalf >>= 64;
50 UInt64InverseType topHalf = (lowbits >> 64) * divisor;
51 UInt64InverseType bothHalves = bottomHalf + topHalf;
52 bothHalves >>= 64;
53 return static_cast<uint64_t>(bothHalves);
54 }
55
getQuotientAndRemainder(uint64_t dividend,uint64_t divisor,UInt64InverseType inverse)56 FOLLY_ALWAYS_INLINE std::pair<uint64_t, uint64_t> getQuotientAndRemainder(
57 uint64_t dividend, uint64_t divisor, UInt64InverseType inverse) {
58 if (FOLLY_UNLIKELY(divisor == 1)) {
59 return {dividend, 0};
60 }
61 auto quotient = mul128(inverse, dividend);
62 auto remainder = dividend - quotient * divisor;
63 DCHECK_LT(remainder, divisor);
64 return {quotient, remainder};
65 }
66
67 // Max value for given bits.
maxValue(uint32_t nbits)68 FOLLY_ALWAYS_INLINE uint64_t maxValue(uint32_t nbits) {
69 return nbits == 64 ? std::numeric_limits<uint64_t>::max()
70 : (uint64_t(1) << nbits) - 1;
71 }
72
73 } // namespace qms_detail
74
75 template <class Instructions>
76 struct QuotientMultiSet<Instructions>::Metadata {
77 // Total number of blocks.
78 uint64_t numBlocks;
79 uint64_t numKeys;
80 uint64_t divisor;
81 uint8_t keyBits;
82 uint8_t remainderBits;
83
84 std::string debugString() const {
85 return sformat(
86 "Number of blocks: {}\n"
87 "Number of elements: {}\n"
88 "Divisor: {}\n"
89 "Key bits: {}\n"
90 "Remainder bits: {}",
91 numBlocks,
92 numKeys,
93 divisor,
94 keyBits,
95 remainderBits);
96 }
97 } FOLLY_PACK_ATTR;
98
99 template <class Instructions>
100 struct QuotientMultiSet<Instructions>::Block {
101 static const Block* get(const char* data) {
102 return reinterpret_cast<const Block*>(data);
103 }
104
105 static BlockPtr make(size_t remainderBits) {
106 auto ptr = reinterpret_cast<Block*>(calloc(blockSize(remainderBits), 1));
107 return {ptr, free};
108 }
109
110 uint64_t payload;
111 uint64_t occupieds;
112 uint64_t offset;
113 uint64_t runends;
114 char remainders[0];
115
116 static uint64_t blockSize(size_t remainderBits) {
117 return sizeof(Block) + remainderBits * 8;
118 }
119
120 FOLLY_ALWAYS_INLINE bool isOccupied(size_t offsetInBlock) const {
121 return ((occupieds >> offsetInBlock) & uint64_t(1)) != 0;
122 }
123
124 FOLLY_ALWAYS_INLINE bool isRunend(size_t offsetInBlock) const {
125 return ((runends >> offsetInBlock) & uint64_t(1)) != 0;
126 }
127
128 FOLLY_ALWAYS_INLINE uint64_t getRemainder(
129 size_t offsetInBlock, size_t remainderBits, size_t remainderMask) const {
130 DCHECK_LE(remainderBits, 56);
131 const size_t bitPos = offsetInBlock * remainderBits;
132 const uint64_t remainderWord =
133 loadUnaligned<uint64_t>(remainders + (bitPos / 8));
134 return (remainderWord >> (bitPos % 8)) & remainderMask;
135 }
136
137 void setOccupied(size_t offsetInBlock) {
138 occupieds |= uint64_t(1) << offsetInBlock;
139 }
140
141 void setRunend(size_t offsetInBlock) {
142 runends |= uint64_t(1) << offsetInBlock;
143 }
144
145 void setRemainder(
146 size_t offsetInBlock, size_t remainderBits, uint64_t remainder) {
147 DCHECK_LT(offsetInBlock, kBlockSize);
148 if (FOLLY_UNLIKELY(remainderBits == 0)) {
149 return;
150 }
151 Bits<uint64_t>::set(
152 reinterpret_cast<uint64_t*>(remainders),
153 offsetInBlock * remainderBits,
154 remainderBits,
155 remainder);
156 }
157 } FOLLY_PACK_ATTR;
158
159 template <class Instructions>
160 QuotientMultiSet<Instructions>::QuotientMultiSet(StringPiece data) {
161 static_assert(
162 kIsLittleEndian, "QuotientMultiSet requires little endianness.");
163 StringPiece sp = data;
164 CHECK_GE(sp.size(), sizeof(Metadata));
165 sp.advance(sp.size() - sizeof(Metadata));
166 metadata_ = reinterpret_cast<const Metadata*>(sp.data());
167 VLOG(2) << "Metadata: " << metadata_->debugString();
168
169 numBlocks_ = metadata_->numBlocks;
170 numSlots_ = numBlocks_ * kBlockSize;
171 divisor_ = metadata_->divisor;
172 fraction_ = qms_detail::getInverse(divisor_);
173 keyBits_ = metadata_->keyBits;
174 maxKey_ = qms_detail::maxValue(keyBits_);
175 remainderBits_ = metadata_->remainderBits;
176 remainderMask_ = qms_detail::maxValue(remainderBits_);
177 blockSize_ = Block::blockSize(remainderBits_);
178
179 CHECK_EQ(data.size(), numBlocks_ * blockSize_ + sizeof(Metadata));
180 data_ = data.data();
181 }
182
183 template <class Instructions>
184 auto QuotientMultiSet<Instructions>::equalRange(uint64_t key) const
185 -> SlotRange {
186 if (key > maxKey_) {
187 return {0, 0};
188 }
189 const auto qr = qms_detail::getQuotientAndRemainder(key, divisor_, fraction_);
190 const auto& quotient = qr.first;
191 const auto& remainder = qr.second;
192 const size_t blockIndex = quotient / kBlockSize;
193 const size_t offsetInBlock = quotient % kBlockSize;
194
195 if (FOLLY_UNLIKELY(blockIndex >= numBlocks_)) {
196 return {0, 0};
197 }
198
199 const auto* occBlock = getBlock(blockIndex);
200 __builtin_prefetch(reinterpret_cast<const char*>(&occBlock->occupieds) + 64);
201 const auto firstRunend = occBlock->offset;
202 if (!occBlock->isOccupied(offsetInBlock)) {
203 // Return a position that depends on the contents of the block so
204 // we can create a dependency in benchmarks.
205 return {firstRunend, firstRunend};
206 }
207
208 // Look for the right runend for the given key.
209 const uint64_t occupiedRank = Instructions::popcount(
210 Instructions::bzhi(occBlock->occupieds, offsetInBlock));
211 auto runend = findRunend(occupiedRank, firstRunend);
212 auto& slot = runend.first;
213 auto& block = runend.second;
214
215 // Iterates over the run backwards to find the slots whose remainder
216 // matches the key.
217 SlotRange range = {slot + 1, slot + 1};
218 while (true) {
219 uint64_t slotRemainder =
220 block->getRemainder(slot % kBlockSize, remainderBits_, remainderMask_);
221
222 if (slotRemainder > remainder) {
223 range.begin = slot;
224 range.end = slot;
225 } else if (slotRemainder == remainder) {
226 range.begin = slot;
227 } else {
228 break;
229 }
230
231 if (FOLLY_UNLIKELY(slot % kBlockSize == 0)) {
232 // Reached block start and the run starts from a prev block.
233 size_t slotBlockIndex = slot / kBlockSize;
234 if (slotBlockIndex > blockIndex) {
235 block = getBlock(slotBlockIndex - 1);
236 } else {
237 break;
238 }
239 }
240
241 --slot;
242 // Encounters the previous run.
243 if (block->isRunend(slot % kBlockSize)) {
244 break;
245 }
246 }
247
248 return range;
249 }
250
251 template <class Instructions>
252 auto QuotientMultiSet<Instructions>::findRunend(
253 uint64_t occupiedRank, uint64_t firstRunend) const
254 -> std::pair<uint64_t, const Block*> {
255 // Look for the right runend.
256 size_t slotBlockIndex = firstRunend / kBlockSize;
257 auto block = getBlock(slotBlockIndex);
258 uint64_t runendWord =
259 block->runends & (uint64_t(-1) << (firstRunend % kBlockSize));
260
261 while (true) {
262 DCHECK_LE(slotBlockIndex, numBlocks_);
263
264 const size_t numRuns = Instructions::popcount(runendWord);
265 if (FOLLY_LIKELY(numRuns > occupiedRank)) {
266 break;
267 }
268 occupiedRank -= numRuns;
269 ++slotBlockIndex;
270 block = getBlock(slotBlockIndex);
271 runendWord = block->runends;
272 }
273
274 return {
275 slotBlockIndex * kBlockSize +
276 select64<Instructions>(runendWord, occupiedRank),
277 block};
278 }
279
280 template <class Instructions>
281 uint64_t QuotientMultiSet<Instructions>::getBlockPayload(
282 uint64_t blockIndex) const {
283 DCHECK_LT(blockIndex, numBlocks_);
284 return getBlock(blockIndex)->payload;
285 }
286
287 template <class Instructions>
288 QuotientMultiSet<Instructions>::Iterator::Iterator(
289 const QuotientMultiSet<Instructions>* qms)
290 : qms_(qms) {}
291
292 template <class Instructions>
293 bool QuotientMultiSet<Instructions>::Iterator::next() {
294 if (pos_ == size_t(-1) ||
295 qms_->getBlock(pos_ / kBlockSize)->isRunend(pos_ % kBlockSize)) {
296 // Move to start of next run.
297 if (!nextOccupied()) {
298 return setEnd();
299 }
300
301 // Next run either starts at pos + 1 or the start of block
302 // specified by occupied slot.
303 pos_ = std::max<uint64_t>(pos_ + 1, occBlockIndex_ * kBlockSize);
304 } else {
305 // Move to next slot since a run must be contiguous.
306 pos_++;
307 }
308
309 const Block* block = qms_->getBlock(pos_ / kBlockSize);
310 uint64_t quotient =
311 (occBlockIndex_ * kBlockSize + occOffsetInBlock_) * qms_->divisor_;
312 key_ = quotient +
313 block->getRemainder(
314 pos_ % kBlockSize, qms_->remainderBits_, qms_->remainderMask_);
315
316 DCHECK_LT(pos_, qms_->numBlocks_ * kBlockSize);
317 return true;
318 }
319
320 template <class Instructions>
321 bool QuotientMultiSet<Instructions>::Iterator::skipTo(uint64_t key) {
322 if (key > qms_->maxKey_) {
323 return setEnd();
324 }
325 const auto qr =
326 qms_detail::getQuotientAndRemainder(key, qms_->divisor_, qms_->fraction_);
327 const auto& quotient = qr.first;
328 occBlockIndex_ = quotient / kBlockSize;
329 occOffsetInBlock_ = quotient % kBlockSize;
330
331 if (FOLLY_UNLIKELY(occBlockIndex_ >= qms_->numBlocks_)) {
332 return setEnd();
333 }
334
335 occBlock_ = qms_->getBlock(occBlockIndex_);
336 occWord_ = occBlock_->occupieds & (uint64_t(-1) << occOffsetInBlock_);
337 if (!nextOccupied()) {
338 return setEnd();
339 }
340
341 // Search for the next runend.
342 uint64_t occupiedRank = Instructions::popcount(
343 Instructions::bzhi(occBlock_->occupieds, occOffsetInBlock_));
344 auto runend = qms_->findRunend(occupiedRank, occBlock_->offset);
345 auto& slot = runend.first;
346 auto& block = runend.second;
347 uint64_t slotBlockIndex = slot / kBlockSize;
348 uint64_t slotOffsetInBlock = slot % kBlockSize;
349 pos_ = slot;
350
351 uint64_t nextQuotient =
352 (occBlockIndex_ * kBlockSize + occOffsetInBlock_) * qms_->divisor_;
353 uint64_t nextRemainder = block->getRemainder(
354 slotOffsetInBlock, qms_->remainderBits_, qms_->remainderMask_);
355
356 if (nextQuotient + nextRemainder < key) {
357 // Lower bound element is at the start of next run.
358 return next();
359 }
360
361 // Iterate over the run backwards to find the first key that is larger than
362 // or equal to the given key.
363 while (true) {
364 uint64_t slotRemainder = block->getRemainder(
365 slotOffsetInBlock, qms_->remainderBits_, qms_->remainderMask_);
366 if (nextQuotient + slotRemainder < key) {
367 break;
368 }
369 pos_ = slot;
370 nextRemainder = slotRemainder;
371 if (FOLLY_UNLIKELY(slotOffsetInBlock == 0)) {
372 // Reached block start and the run starts from a prev block.
373 if (slotBlockIndex > occBlockIndex_) {
374 --slotBlockIndex;
375 block = qms_->getBlock(slotBlockIndex);
376 slotOffsetInBlock = kBlockSize;
377 } else {
378 break;
379 }
380 }
381 --slot;
382 --slotOffsetInBlock;
383
384 // Encounters the previous run.
385 if (block->isRunend(slotOffsetInBlock)) {
386 break;
387 }
388 }
389
390 key_ = nextQuotient + nextRemainder;
391 return true;
392 }
393
394 template <class Instructions>
395 bool QuotientMultiSet<Instructions>::Iterator::nextOccupied() {
396 while (FOLLY_UNLIKELY(occWord_ == 0)) {
397 if (FOLLY_UNLIKELY(++occBlockIndex_ >= qms_->numBlocks_)) {
398 return false;
399 }
400 occBlock_ = qms_->getBlock(occBlockIndex_);
401 occWord_ = occBlock_->occupieds;
402 }
403
404 occOffsetInBlock_ = Instructions::ctz(occWord_);
405 occWord_ = Instructions::blsr(occWord_);
406 return true;
407 }
408
409 } // namespace folly
410
411 #endif // FOLLY_QUOTIENT_MULTI_SET_SUPPORTED
412