1 /* 2 Copyright 2016 Skytechnology sp. z o.o. 3 4 This file is part of LizardFS. 5 6 LizardFS is free software: you can redistribute it and/or modify 7 it under the terms of the GNU General Public License as published by 8 the Free Software Foundation, version 3. 9 10 LizardFS is distributed in the hope that it will be useful, 11 but WITHOUT ANY WARRANTY; without even the implied warranty of 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 GNU General Public License for more details. 14 15 You should have received a copy of the GNU General Public License 16 along with LizardFS. If not, see <http://www.gnu.org/licenses/>. 17 */ 18 19 #pragma once 20 21 #include "common/platform.h" 22 23 #include "common/small_vector.h" 24 #include "common/time_utils.h" 25 26 #include <atomic> 27 #include <cassert> 28 #include <cstring> 29 #include <deque> 30 #include <map> 31 #include <memory> 32 #include <numeric> 33 34 #include <boost/intrusive/list.hpp> 35 #include <boost/intrusive/set.hpp> 36 37 class ReadCache { 38 public: 39 typedef uint64_t Offset; 40 typedef uint32_t Size; 41 42 struct Entry { 43 Offset offset; 44 std::vector<uint8_t> buffer; 45 Timer timer; 46 std::atomic<int> refcount; 47 boost::intrusive::set_member_hook<> set_member_hook; 48 boost::intrusive::list_member_hook<> lru_member_hook; 49 boost::intrusive::list_member_hook<> reserved_member_hook; 50 51 struct OffsetComp { operatorEntry::OffsetComp52 bool operator()(Offset offset, const Entry &entry) const { 53 return offset < entry.offset; 54 } 55 }; 56 EntryEntry57 Entry(Offset offset) : offset(offset), buffer(), timer(), refcount(0), 58 set_member_hook(), lru_member_hook() {} 59 60 bool operator<(const Entry &other) const { 61 return offset < other.offset; 62 } 63 expiredEntry64 bool expired(uint32_t expiration_time) const { 65 return timer.elapsed_ms() >= expiration_time; 66 } 67 endOffsetEntry68 Offset endOffset() const { 69 return offset + buffer.size(); 70 } 71 acquireEntry72 void acquire() { 73 timer.reset(); 74 refcount++; 75 } 76 releaseEntry77 void release() { 78 assert(refcount > 0); 79 refcount--; 80 } 81 }; 82 83 typedef boost::intrusive::set<Entry, 84 boost::intrusive::member_hook<Entry, boost::intrusive::set_member_hook<>, 85 &Entry::set_member_hook>> EntrySet; 86 typedef boost::intrusive::list<Entry, 87 boost::intrusive::member_hook<Entry, boost::intrusive::list_member_hook<>, 88 &Entry::lru_member_hook>> EntryList; 89 typedef boost::intrusive::list<Entry, 90 boost::intrusive::member_hook<Entry, boost::intrusive::list_member_hook<>, 91 &Entry::reserved_member_hook>> ReservedEntryList; 92 93 struct Result { 94 small_vector<Entry *, 8> entries; 95 bool is_fake; 96 ResultResult97 Result() : entries(), is_fake(false) {} ResultResult98 Result(Result &&other) noexcept 99 : entries(std::move(other.entries)), is_fake(other.is_fake) {} 100 Result &operator=(Result &&other) noexcept { 101 entries = std::move(other.entries); 102 is_fake = other.is_fake; 103 return *this; 104 } 105 106 // Wrapper for returning data not really residing in cache ResultResult107 Result(std::vector<uint8_t> &&data) : entries(), is_fake(true) { 108 Entry *entry = new Entry(0); 109 entry->buffer = std::move(data); 110 entries.push_back(entry); 111 } 112 ~ResultResult113 ~Result() { 114 if (is_fake) { 115 assert(entries.size() == 1); 116 delete entries.front(); 117 } else { 118 release(); 119 } 120 } 121 frontOffsetResult122 Offset frontOffset() const { 123 assert(!entries.empty()); 124 return entries.front()->offset; 125 } 126 remainingOffsetResult127 Offset remainingOffset() const { 128 assert(!entries.empty()); 129 return entries.back()->offset; 130 } 131 endOffsetResult132 Offset endOffset() const { 133 assert(!entries.empty()); 134 return entries.back()->offset + entries.back()->buffer.size(); 135 } 136 137 /*! 138 * \brief Give access to a buffer which should be filled with data. 139 * 140 * If cache result is incomplete (i.e. some data should be read to fulfill the request), 141 * it should be read into this buffer in order to write it straight to cache. 142 */ inputBufferResult143 std::vector<uint8_t> &inputBuffer() { 144 assert(!entries.empty()); 145 assert(entries.back()->buffer.empty()); 146 assert(entries.back()->refcount > 0); 147 return entries.back()->buffer; 148 } 149 150 /*! 151 * \brief Serialize cache query result to an iovector. 152 * 153 * An iovector can be any structure that accepts pushing back 154 * a pair of {address, length}, which represents a consecutive array 155 * of bytes extracted from cache. 156 * 157 * \return number of bytes added to iovector 158 */ 159 template<typename IoVec> toIoVecResult160 Size toIoVec(IoVec &output, Offset real_offset, Size real_size) const { 161 assert(real_offset >= frontOffset()); 162 uint64_t offset = real_offset; 163 Size bytes_left = real_size; 164 for (const auto &entry_ptr : entries) { 165 const ReadCache::Entry &entry = *entry_ptr; 166 if (bytes_left <= 0) { 167 break; 168 } 169 // Special case: Read request was past the end of the file 170 if (entry.buffer.empty() || offset >= entry.endOffset()) { 171 break; 172 } 173 assert(offset >= entry.offset && offset < entry.endOffset()); 174 auto start = entry.buffer.data() + (offset - entry.offset); 175 auto end = std::min(start + bytes_left, entry.buffer.data() + entry.buffer.size()); 176 assert(start < end); 177 size_t length = std::distance(start, end); 178 179 output.push_back({(void *)start, length}); 180 offset += length; 181 bytes_left -= length; 182 } 183 return offset - real_offset; 184 } 185 copyToBufferResult186 Size copyToBuffer(uint8_t *output, Offset real_offset, Size real_size) const { 187 assert(real_offset >= frontOffset()); 188 uint64_t offset = real_offset; 189 Size bytes_left = real_size; 190 for (const auto &entry_ptr : entries) { 191 const ReadCache::Entry &entry = *entry_ptr; 192 if (bytes_left <= 0) { 193 break; 194 } 195 // Special case: Read request was past the end of the file 196 if (entry.buffer.empty() || offset >= entry.endOffset()) { 197 break; 198 } 199 assert(offset >= entry.offset && offset < entry.endOffset()); 200 auto start = entry.buffer.data() + (offset - entry.offset); 201 auto end = std::min(start + bytes_left, entry.buffer.data() + entry.buffer.size()); 202 assert(start < end); 203 size_t length = std::distance(start, end); 204 std::memcpy(output, (void *)start, length); 205 output += length; 206 offset += length; 207 bytes_left -= length; 208 } 209 return offset - real_offset; 210 } 211 emptyResult212 bool empty() const { 213 return entries.empty(); 214 } 215 releaseResult216 void release() { 217 for (auto &entry : entries) { 218 entry->release(); 219 } 220 entries.clear(); 221 } 222 addResult223 void add(Entry &entry) { 224 entry.acquire(); 225 assert(entries.empty() || endOffset() >= entry.offset); 226 entries.push_back(std::addressof(entry)); 227 } 228 requestSizeResult229 Size requestSize(Offset real_offset, Size real_size) const { 230 if (entries.empty()) { 231 return 0; 232 } 233 assert(real_offset >= frontOffset()); 234 assert(real_offset <= endOffset()); 235 return std::min<Size>(endOffset() - real_offset, real_size); 236 } 237 toStringResult238 std::string toString() const { 239 std::string text; 240 for(const auto &entry : entries) { 241 text += "(" + std::to_string(entry->refcount) + "|" 242 + std::to_string(entry->offset) + ":" 243 + std::to_string(entry->buffer.size()) + "),"; 244 } 245 return text; 246 } 247 }; 248 ReadCache(uint32_t expiration_time)249 explicit ReadCache(uint32_t expiration_time) 250 : entries_(), lru_(), reserved_entries_(), expiration_time_(expiration_time) {} 251 ~ReadCache()252 ~ReadCache() { 253 clear(); 254 clearReserved(std::numeric_limits<unsigned>::max()); 255 assert(entries_.empty()); 256 assert(lru_.empty()); 257 assert(reserved_entries_.empty()); 258 } 259 260 /*! 261 * \brief Try to get data from cache. 262 * 263 * If all data is available in cache, it can be obtained from result 264 * as an iovector via result.toIoVec() call. 265 * If some or no data is available, the rest should be read into the result buffer 266 * via result.inputBuffer(). Then, it can be obtain as an iovector via result.toIoVec(). 267 * 268 * \return cache query result 269 */ query(Offset offset,Size size)270 Result query(Offset offset, Size size) { 271 collectGarbage(); 272 273 Result result; 274 auto it = entries_.upper_bound(offset, Entry::OffsetComp()); 275 if (it != entries_.begin()) { 276 --it; 277 } 278 279 assert(size > 0); 280 281 Size bytes_left = size; 282 while (it != entries_.end() && bytes_left > 0) { 283 if (offset < it->offset) { 284 break; 285 } 286 287 if (it->expired(expiration_time_) || it->buffer.empty()) { 288 it = erase(it); 289 continue; 290 } 291 292 if (offset < it->endOffset()) { 293 Size bytes_from_buffer = std::min<Size>(it->buffer.size() - (offset - it->offset), bytes_left); 294 295 bytes_left -= bytes_from_buffer; 296 offset += bytes_from_buffer; 297 result.add(*it); 298 } 299 ++it; 300 } 301 302 if (bytes_left > 0) { 303 auto inserted = insert(it, offset, bytes_left); 304 result.add(*inserted); 305 } 306 307 return result; 308 } 309 clear()310 void clear() { 311 auto it = entries_.begin(); 312 while (it != entries_.end()) { 313 it = erase(it); 314 } 315 } 316 317 protected: insert(EntrySet::iterator it,Offset offset,Size size)318 EntrySet::iterator insert(EntrySet::iterator it, Offset offset, Size size) { 319 it = clearCollisions(it, offset + size); 320 Entry *e = new Entry(offset); 321 lru_.push_back(*e); 322 assert(entries_.find(*e) == entries_.end()); 323 return entries_.insert(it, *e); 324 } 325 326 void collectGarbage(unsigned count = 4) { 327 unsigned reserved_count = count; 328 while (!lru_.empty() && count-- > 0) { 329 Entry *e = std::addressof(lru_.front()); 330 if (e->expired(expiration_time_)) { 331 erase(entries_.iterator_to(*e)); 332 } else { 333 break; 334 } 335 } 336 clearReserved(reserved_count); 337 } 338 erase(EntrySet::iterator it)339 EntrySet::iterator erase(EntrySet::iterator it) { 340 assert(it != entries_.end()); 341 Entry *e = std::addressof(*it); 342 auto ret = entries_.erase(it); 343 lru_.erase(lru_.iterator_to(*e)); 344 if (e->refcount > 0) { 345 reserved_entries_.push_back(*e); 346 } else { 347 assert(e->refcount == 0); 348 delete e; 349 } 350 return ret; 351 } 352 clearReserved(unsigned count)353 void clearReserved(unsigned count) { 354 while (!reserved_entries_.empty() && count-- > 0) { 355 Entry *e = std::addressof(reserved_entries_.front()); 356 if (e->refcount == 0) { 357 reserved_entries_.pop_front(); 358 delete e; 359 } else { 360 assert(e->refcount >= 0); 361 reserved_entries_.splice(reserved_entries_.end(), reserved_entries_, 362 reserved_entries_.begin()); 363 } 364 } 365 } 366 clearCollisions(EntrySet::iterator it,Offset start_offset)367 EntrySet::iterator clearCollisions(EntrySet::iterator it, Offset start_offset) { 368 while (it != entries_.end() && it->offset < start_offset) { 369 it = erase(it); 370 } 371 return it; 372 } 373 toString()374 std::string toString() const { 375 std::string text; 376 for(const auto &entry : entries_) { 377 text += "(" + std::to_string(entry.refcount) + "|" 378 + std::to_string(entry.offset) + ":" 379 + std::to_string(entry.buffer.size()) + "),"; 380 } 381 return text; 382 } 383 384 EntrySet entries_; 385 EntryList lru_; 386 ReservedEntryList reserved_entries_; 387 uint32_t expiration_time_; 388 }; 389