1 /*
2    Copyright 2016 Skytechnology sp. z o.o.
3 
4    This file is part of LizardFS.
5 
6    LizardFS is free software: you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation, version 3.
9 
10    LizardFS is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13    GNU General Public License for more details.
14 
15    You should have received a copy of the GNU General Public License
16    along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #pragma once
20 
21 #include "common/platform.h"
22 
23 #include "common/ring_buffer.h"
24 #include "common/time_utils.h"
25 #include <array>
26 #include <cassert>
27 
28 /*! Adviser class for readahead mechanism.
29  *
30  * This class is used to predict size of readahead requests on the basis of
31  * sequentiality of read operations and estimated latency.
32  */
33 class ReadaheadAdviser {
34 public:
35 	struct HistoryEntry {
36 		int64_t timestamp;
37 		uint32_t request_size;
38 	};
39 
40 	ReadaheadAdviser(uint32_t timeout_ms, uint32_t window_size_limit = kDefaultWindowSizeLimit) :
current_offset_()41 		current_offset_(),
42 		window_(kInitWindowSize),
43 		random_candidates_(),
44 		max_window_size_(window_size_limit),
45 		window_size_limit_(window_size_limit),
46 		random_threshold_(kRandomThreshold),
47 		requested_bytes_(),
48 		timer_(),
49 		timeout_ms_(timeout_ms) {}
50 
51 	/*!
52 	 * \brief Acknowledge read request and judge whether it is sequential or random.
53 	 * \param offset offset of read operation
54 	 * \param size size of read operation
55 	 */
feed(uint64_t offset,uint32_t size)56 	void feed(uint64_t offset, uint32_t size) {
57 		if (timeout_ms_ == 0) {
58 			window_ = 0;
59 			return;
60 		}
61 		addToHistory(size);
62 		if (offset == current_offset_) {
63 			random_candidates_ = 0;
64 			expand();
65 			current_offset_ = offset + size;
66 		} else {
67 			random_candidates_++;
68 			if (looksRandom()) {
69 				reduce();
70 				current_offset_ = offset + size;
71 			}
72 		}
73 	}
74 
75 	/*!
76 	 * \brief Count suggested readahead window size.
77 	 * \return suggested readahead window size
78 	 */
window()79 	int window() const {
80 		return std::min(window_, max_window_size_);
81 	}
82 
83 private:
84 
85 	/*!
86 	 * \brief Check if history entry is not overdue.
87 	 * \return true if history entry is overdue
88 	 */
expired(HistoryEntry entry,int64_t timestamp)89 	bool expired(HistoryEntry entry, int64_t timestamp) {
90 		return entry.timestamp + kHistoryEntryLifespan_ns < timestamp;
91 	}
92 
93 	/*!
94 	 * \brief Add size of read request to history.
95 	 * \param size size of read request
96 	 */
addToHistory(uint32_t size)97 	void addToHistory(uint32_t size) {
98 		int64_t timestamp = timer_.elapsed_us();
99 		// Remove stale history entries
100 		while (history_.full() || (!history_.empty() && expired(history_.front(), timestamp))) {
101 			requested_bytes_ -= history_.front().request_size;
102 			history_.pop_front();
103 		}
104 
105 		history_.push_back(HistoryEntry{timestamp, size});
106 		requested_bytes_ += size;
107 
108 		// If there is enough data in history to predict max window size, do it
109 		if (history_.size() >= kHistoryValidityThreshold  && timestamp != history_.front().timestamp) {
110 			adjustMaxWindowSize(timestamp);
111 		}
112 	}
113 
114 	/*!
115 	 * \brief Adjust max window size on the basis of estimated latency.
116 	 * \param timestamp time point used for latency estimation
117 	 */
adjustMaxWindowSize(int64_t timestamp)118 	void adjustMaxWindowSize(int64_t timestamp) {
119 		double throughput_MBps = (double)requested_bytes_ / (timestamp - history_.front().timestamp);
120 		// Max window size is set on the basis of estimated throughput
121 		max_window_size_ = std::min<uint64_t>(window_size_limit_, 2 * throughput_MBps * timeout_ms_ * 1024);
122 		max_window_size_ = std::max(max_window_size_, kInitWindowSize);
123 	}
124 
125 	/*!
126 	 * \brief Increase window size.
127 	 */
expand()128 	void expand() {
129 		if (window_ >= max_window_size_) {
130 			return;
131 		}
132 
133 		if (window_ < max_window_size_ / 16) {
134 			window_ *= 4;
135 		} else {
136 			window_ *= 2;
137 		}
138 	}
139 
140 	/*!
141 	 * \brief Decrease window size.
142 	 */
reduce()143 	void reduce() {
144 		if (window_ >= 2 * kInitWindowSize) {
145 			window_ /= 2;
146 		}
147 	}
148 
149 	/*!
150 	 * \brief Check if read operations seem to be random.
151 	 */
looksRandom()152 	bool looksRandom() {
153 		return random_candidates_ > random_threshold_;
154 	}
155 
156 	static const unsigned kInitWindowSize = 1 << 16;
157 	static const unsigned kDefaultWindowSizeLimit = 1 << 22;
158 	static const int kRandomThreshold = 3;
159 	static const int kHistoryEntryLifespan_ns = 1 << 20;
160 	static const int kHistoryCapacity = 64;
161 	static const unsigned kHistoryValidityThreshold = 3;
162 	static_assert(kHistoryCapacity >= (int)kHistoryValidityThreshold,
163 			"History validity threshold must not be greater than history capacity");
164 
165 	uint64_t current_offset_;
166 	unsigned window_;
167 	int random_candidates_;
168 
169 	unsigned max_window_size_;
170 	unsigned window_size_limit_;
171 	int random_threshold_;
172 
173 	RingBuffer<HistoryEntry, kHistoryCapacity> history_;
174 	uint64_t requested_bytes_;
175 	Timer timer_;
176 	uint32_t timeout_ms_;
177 };
178