1 /*
2    Copyright 2016 Skytechnology sp. z o.o.
3 
4    This file is part of LizardFS.
5 
6    LizardFS is free software: you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation, version 3.
9 
10    LizardFS is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13    GNU General Public License for more details.
14 
15    You should have received a copy of the GNU General Public License
16    along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #pragma once
20 
21 #include "common/platform.h"
22 
23 #include "common/small_vector.h"
24 #include "common/time_utils.h"
25 
26 #include <atomic>
27 #include <cassert>
28 #include <cstring>
29 #include <deque>
30 #include <map>
31 #include <memory>
32 #include <numeric>
33 
34 #include <boost/intrusive/list.hpp>
35 #include <boost/intrusive/set.hpp>
36 
37 class ReadCache {
38 public:
39 	typedef uint64_t Offset;
40 	typedef uint32_t Size;
41 
42 	struct Entry {
43 		Offset offset;
44 		std::vector<uint8_t> buffer;
45 		Timer timer;
46 		std::atomic<int> refcount;
47 		boost::intrusive::set_member_hook<> set_member_hook;
48 		boost::intrusive::list_member_hook<> lru_member_hook;
49 		boost::intrusive::list_member_hook<> reserved_member_hook;
50 
51 		struct OffsetComp {
operatorEntry::OffsetComp52 			bool operator()(Offset offset, const Entry &entry) const {
53 				return offset < entry.offset;
54 			}
55 		};
56 
EntryEntry57 		Entry(Offset offset) : offset(offset), buffer(), timer(), refcount(0),
58 		      set_member_hook(), lru_member_hook() {}
59 
60 		bool operator<(const Entry &other) const {
61 			return offset < other.offset;
62 		}
63 
expiredEntry64 		bool expired(uint32_t expiration_time) const {
65 			return timer.elapsed_ms() >= expiration_time;
66 		}
67 
endOffsetEntry68 		Offset endOffset() const {
69 			return offset + buffer.size();
70 		}
71 
acquireEntry72 		void acquire() {
73 			timer.reset();
74 			refcount++;
75 		}
76 
releaseEntry77 		void release() {
78 			assert(refcount > 0);
79 			refcount--;
80 		}
81 	};
82 
83 	typedef boost::intrusive::set<Entry,
84 	        boost::intrusive::member_hook<Entry, boost::intrusive::set_member_hook<>,
85 	        &Entry::set_member_hook>> EntrySet;
86 	typedef boost::intrusive::list<Entry,
87 	        boost::intrusive::member_hook<Entry, boost::intrusive::list_member_hook<>,
88 	        &Entry::lru_member_hook>> EntryList;
89 	typedef boost::intrusive::list<Entry,
90 	        boost::intrusive::member_hook<Entry, boost::intrusive::list_member_hook<>,
91 	        &Entry::reserved_member_hook>> ReservedEntryList;
92 
93 	struct Result {
94 		small_vector<Entry *, 8> entries;
95 		bool is_fake;
96 
ResultResult97 		Result() : entries(), is_fake(false) {}
ResultResult98 		Result(Result &&other) noexcept
99 		       : entries(std::move(other.entries)), is_fake(other.is_fake) {}
100 		Result &operator=(Result &&other) noexcept {
101 			entries = std::move(other.entries);
102 			is_fake = other.is_fake;
103 			return *this;
104 		}
105 
106 		// Wrapper for returning data not really residing in cache
ResultResult107 		Result(std::vector<uint8_t> &&data) : entries(), is_fake(true) {
108 			Entry *entry = new Entry(0);
109 			entry->buffer = std::move(data);
110 			entries.push_back(entry);
111 		}
112 
~ResultResult113 		~Result() {
114 			if (is_fake) {
115 				assert(entries.size() == 1);
116 				delete entries.front();
117 			} else {
118 				release();
119 			}
120 		}
121 
frontOffsetResult122 		Offset frontOffset() const {
123 			assert(!entries.empty());
124 			return entries.front()->offset;
125 		}
126 
remainingOffsetResult127 		Offset remainingOffset() const {
128 			assert(!entries.empty());
129 			return entries.back()->offset;
130 		}
131 
endOffsetResult132 		Offset endOffset() const {
133 			assert(!entries.empty());
134 			return entries.back()->offset + entries.back()->buffer.size();
135 		}
136 
137 		/*!
138 		 * \brief Give access to a buffer which should be filled with data.
139 		 *
140 		 * If cache result is incomplete (i.e. some data should be read to fulfill the request),
141 		 * it should be read into this buffer in order to write it straight to cache.
142 		 */
inputBufferResult143 		std::vector<uint8_t> &inputBuffer() {
144 			assert(!entries.empty());
145 			assert(entries.back()->buffer.empty());
146 			assert(entries.back()->refcount > 0);
147 			return entries.back()->buffer;
148 		}
149 
150 		/*!
151 		 * \brief Serialize cache query result to an iovector.
152 		 *
153 		 * An iovector can be any structure that accepts pushing back
154 		 * a pair of {address, length}, which represents a consecutive array
155 		 * of bytes extracted from cache.
156 		 *
157 		 * \return number of bytes added to iovector
158 		 */
159 		template<typename IoVec>
toIoVecResult160 		Size toIoVec(IoVec &output, Offset real_offset, Size real_size) const {
161 			assert(real_offset >= frontOffset());
162 			uint64_t offset = real_offset;
163 			Size bytes_left = real_size;
164 			for (const auto &entry_ptr : entries) {
165 				const ReadCache::Entry &entry = *entry_ptr;
166 				if (bytes_left <= 0) {
167 					break;
168 				}
169 				// Special case: Read request was past the end of the file
170 				if (entry.buffer.empty() || offset >= entry.endOffset()) {
171 					break;
172 				}
173 				assert(offset >= entry.offset && offset < entry.endOffset());
174 				auto start = entry.buffer.data() + (offset - entry.offset);
175 				auto end = std::min(start + bytes_left, entry.buffer.data() + entry.buffer.size());
176 				assert(start < end);
177 				size_t length = std::distance(start, end);
178 
179 				output.push_back({(void *)start, length});
180 				offset += length;
181 				bytes_left -= length;
182 			}
183 			return offset - real_offset;
184 		}
185 
copyToBufferResult186 		Size copyToBuffer(uint8_t *output, Offset real_offset, Size real_size) const {
187 			assert(real_offset >= frontOffset());
188 			uint64_t offset = real_offset;
189 			Size bytes_left = real_size;
190 			for (const auto &entry_ptr : entries) {
191 				const ReadCache::Entry &entry = *entry_ptr;
192 				if (bytes_left <= 0) {
193 					break;
194 				}
195 				// Special case: Read request was past the end of the file
196 				if (entry.buffer.empty() || offset >= entry.endOffset()) {
197 					break;
198 				}
199 				assert(offset >= entry.offset && offset < entry.endOffset());
200 				auto start = entry.buffer.data() + (offset - entry.offset);
201 				auto end = std::min(start + bytes_left, entry.buffer.data() + entry.buffer.size());
202 				assert(start < end);
203 				size_t length = std::distance(start, end);
204 				std::memcpy(output, (void *)start, length);
205 				output += length;
206 				offset += length;
207 				bytes_left -= length;
208 			}
209 			return offset - real_offset;
210 		}
211 
emptyResult212 		bool empty() const {
213 			return entries.empty();
214 		}
215 
releaseResult216 		void release() {
217 			for (auto &entry : entries) {
218 				entry->release();
219 			}
220 			entries.clear();
221 		}
222 
addResult223 		void add(Entry &entry) {
224 			entry.acquire();
225 			assert(entries.empty() || endOffset() >= entry.offset);
226 			entries.push_back(std::addressof(entry));
227 		}
228 
requestSizeResult229 		Size requestSize(Offset real_offset, Size real_size) const {
230 			if (entries.empty()) {
231 				return 0;
232 			}
233 			assert(real_offset >= frontOffset());
234 			assert(real_offset <= endOffset());
235 			return std::min<Size>(endOffset() - real_offset, real_size);
236 		}
237 
toStringResult238 		std::string toString() const {
239 			std::string text;
240 			for(const auto &entry : entries) {
241 				text += "(" + std::to_string(entry->refcount) + "|"
242 				+ std::to_string(entry->offset) + ":"
243 				+ std::to_string(entry->buffer.size()) + "),";
244 			}
245 			return text;
246 		}
247 	};
248 
ReadCache(uint32_t expiration_time)249 	explicit ReadCache(uint32_t expiration_time)
250 	: entries_(), lru_(), reserved_entries_(), expiration_time_(expiration_time) {}
251 
~ReadCache()252 	~ReadCache() {
253 		clear();
254 		clearReserved(std::numeric_limits<unsigned>::max());
255 		assert(entries_.empty());
256 		assert(lru_.empty());
257 		assert(reserved_entries_.empty());
258 	}
259 
260 	/*!
261 	 * \brief Try to get data from cache.
262 	 *
263 	 * If all data is available in cache, it can be obtained from result
264 	 * as an iovector via result.toIoVec() call.
265 	 * If some or no data is available, the rest should be read into the result buffer
266 	 * via result.inputBuffer(). Then, it can be obtain as an iovector via result.toIoVec().
267 	 *
268 	 * \return cache query result
269 	 */
query(Offset offset,Size size)270 	Result query(Offset offset, Size size) {
271 		collectGarbage();
272 
273 		Result result;
274 		auto it = entries_.upper_bound(offset, Entry::OffsetComp());
275 		if (it != entries_.begin()) {
276 			--it;
277 		}
278 
279 		assert(size > 0);
280 
281 		Size bytes_left = size;
282 		while (it != entries_.end() && bytes_left > 0) {
283 			if (offset < it->offset) {
284 				break;
285 			}
286 
287 			if (it->expired(expiration_time_) || it->buffer.empty()) {
288 				it = erase(it);
289 				continue;
290 			}
291 
292 			if (offset < it->endOffset()) {
293 				Size bytes_from_buffer = std::min<Size>(it->buffer.size() - (offset - it->offset), bytes_left);
294 
295 				bytes_left -= bytes_from_buffer;
296 				offset += bytes_from_buffer;
297 				result.add(*it);
298 			}
299 			++it;
300 		}
301 
302 		if (bytes_left > 0) {
303 			auto inserted = insert(it, offset, bytes_left);
304 			result.add(*inserted);
305 		}
306 
307 		return result;
308 	}
309 
clear()310 	void clear() {
311 		auto it = entries_.begin();
312 		while (it != entries_.end()) {
313 			it = erase(it);
314 		}
315 	}
316 
317 protected:
insert(EntrySet::iterator it,Offset offset,Size size)318 	EntrySet::iterator insert(EntrySet::iterator it, Offset offset, Size size) {
319 		it = clearCollisions(it, offset + size);
320 		Entry *e = new Entry(offset);
321 		lru_.push_back(*e);
322 		assert(entries_.find(*e) == entries_.end());
323 		return entries_.insert(it, *e);
324 	}
325 
326 	void collectGarbage(unsigned count = 4) {
327 		unsigned reserved_count = count;
328 		while (!lru_.empty() && count-- > 0) {
329 			Entry *e = std::addressof(lru_.front());
330 			if (e->expired(expiration_time_)) {
331 				erase(entries_.iterator_to(*e));
332 			} else {
333 				break;
334 			}
335 		}
336 		clearReserved(reserved_count);
337 	}
338 
erase(EntrySet::iterator it)339 	EntrySet::iterator erase(EntrySet::iterator it) {
340 		assert(it != entries_.end());
341 		Entry *e = std::addressof(*it);
342 		auto ret = entries_.erase(it);
343 		lru_.erase(lru_.iterator_to(*e));
344 		if (e->refcount > 0) {
345 			reserved_entries_.push_back(*e);
346 		} else {
347 			assert(e->refcount == 0);
348 			delete e;
349 		}
350 		return ret;
351 	}
352 
clearReserved(unsigned count)353 	void clearReserved(unsigned count) {
354 		while (!reserved_entries_.empty() && count-- > 0) {
355 			Entry *e = std::addressof(reserved_entries_.front());
356 			if (e->refcount == 0) {
357 				reserved_entries_.pop_front();
358 				delete e;
359 			} else {
360 				assert(e->refcount >= 0);
361 				reserved_entries_.splice(reserved_entries_.end(), reserved_entries_,
362 							 reserved_entries_.begin());
363 			}
364 		}
365 	}
366 
clearCollisions(EntrySet::iterator it,Offset start_offset)367 	EntrySet::iterator clearCollisions(EntrySet::iterator it, Offset start_offset) {
368 		while (it != entries_.end() && it->offset < start_offset) {
369 			it = erase(it);
370 		}
371 		return it;
372 	}
373 
toString()374 	std::string toString() const {
375 		std::string text;
376 		for(const auto &entry : entries_) {
377 			text += "(" + std::to_string(entry.refcount) + "|"
378 			+ std::to_string(entry.offset) + ":"
379 			+ std::to_string(entry.buffer.size()) + "),";
380 		}
381 		return text;
382 	}
383 
384 	EntrySet entries_;
385 	EntryList lru_;
386 	ReservedEntryList reserved_entries_;
387 	uint32_t expiration_time_;
388 };
389