1 /*
2  Copyright 2015 Skytechnology sp. z o.o.
3 
4  This file is part of LizardFS.
5 
6  LizardFS is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, version 3.
9 
10  LizardFS is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU General Public License for more details.
14 
15  You should have received a copy of the GNU General Public License
16  along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "common/platform.h"
20 
21 #include "common/slice_read_planner.h"
22 
23 #include <cmath>
24 
25 /*!
26  * Prepares read planner for serving selected parts of a slice type.
27  * Firstly, function checks if:
28  *  - all requested parts are available,
29  *  - or requested parts can be recovered,
30  *  - or reading is not possible at all.
31  * Then, parts are sorted by their score.
32  * \param slice_type slice type for read operation
33  * \param slice_parts requested parts of selected slice type
34  * \param available_parts parts available in the system
35  */
prepare(Goal::Slice::Type slice_type,const PartIndexContainer & slice_parts,const PartsContainer & available_parts)36 void SliceReadPlanner::prepare(Goal::Slice::Type slice_type, const PartIndexContainer &slice_parts,
37 		const PartsContainer &available_parts) {
38 	reset(slice_type, slice_parts);
39 	std::bitset<Goal::Slice::kMaxPartsCount> part_bitset;
40 	for (const auto &part : available_parts) {
41 		if (part.getSliceType() == slice_type_) {
42 			part_bitset.set(part.getSlicePart());
43 		}
44 	}
45 
46 	required_parts_available_ =
47 	    std::all_of(slice_parts_.begin(), slice_parts_.end(),
48 	        [&part_bitset](int slice_part) {return part_bitset.test(slice_part);});
49 
50 	can_recover_parts_ = (int)part_bitset.count() >= slice_traits::requiredPartsToRecover(slice_type_);
51 	can_read_ = required_parts_available_ || can_recover_parts_;
52 
53 	if (!can_read_) {
54 		return;
55 	}
56 
57 	for (const auto &part : available_parts) {
58 		if (part.getSliceType() == slice_type_ && part_bitset[part.getSlicePart()]) {
59 			auto it = scores_.find(part);
60 			float score = it != scores_.end() ? it->second : 1;
61 			weighted_parts_to_use_.push_back({score, part});
62 			part_bitset.reset(part.getSlicePart());
63 		}
64 	}
65 
66 	std::stable_sort(
67 		weighted_parts_to_use_.begin(), weighted_parts_to_use_.end(),
68 		[](const WeightedPart &a, const WeightedPart &b) { return a.score > b.score; });
69 }
70 
71 /*!
72  * Adds read operations to plan.
73  * If part was one of the requested ones, its buffer offset is determined by its order.
74  * Additional parts have consecutive offsets starting after last requested part.
75  * \param plan - plan to be amended
76  * \param first_block first block to be read
77  * \param block_count number of blocks to be read
78  * \param parts_count number of parts to be added
79  * \param wave wave number for read operation
80  * \param buffer_offset current offset for additional parts
81  * \return offset for additional parts
82  */
addParts(SliceReadPlan * plan,int first_block,int block_count,int parts_count,int wave,int buffer_offset)83 int SliceReadPlanner::addParts(SliceReadPlan *plan, int first_block, int block_count,
84 		int parts_count, int wave, int buffer_offset) {
85 	int ops = plan->read_operations.size();
86 	int end = std::min<int>(ops + parts_count, weighted_parts_to_use_.size());
87 
88 	for (; ops < end; ++ops) {
89 		ReadPlan::ReadOperation op{first_block * MFSBLOCKSIZE, 0, 0, wave};
90 		op.request_size = MFSBLOCKSIZE * std::min(
91 				(int)slice_traits::getNumberOfBlocks(weighted_parts_to_use_[ops].type) - first_block,
92 				block_count);
93 		int index = part_indices_[weighted_parts_to_use_[ops].type.getSlicePart()];
94 		if (index < 0) {
95 			op.buffer_offset = buffer_offset;
96 			buffer_offset += block_count * MFSBLOCKSIZE;
97 		} else {
98 			op.buffer_offset = index * MFSBLOCKSIZE * block_count;
99 		}
100 		plan->read_operations.push_back({weighted_parts_to_use_[ops].type, op});
101 	}
102 
103 	plan->read_buffer_size = buffer_offset;
104 	return buffer_offset;
105 }
106 
107 /*!
108  * Adds read operations for parts to plan with highest priority (wave 0).
109  * \param plan - plan to be amended
110  * \param first_block first block to be read
111  * \param block_count number of blocks to be read
112  * \param parts_count number of parts to be added
113  * \return buffer offset for additional parts
114  */
addBasicParts(SliceReadPlan * plan,int first_block,int block_count,int parts_count)115 int SliceReadPlanner::addBasicParts(SliceReadPlan *plan, int first_block, int block_count,
116 		int parts_count) {
117 	int buffer_offset = plan->requested_parts.size() * plan->buffer_part_size;
118 	return addParts(plan, first_block, block_count, parts_count, 0, buffer_offset);
119 }
120 
121 /*!
122  * Adds additional read operations to plan.
123  * \param plan - plan to be amended
124  * \param first_block first block to be read
125  * \param block_count number of blocks to be read
126  * \return buffer offset for additional parts
127  */
addExtraParts(SliceReadPlan * plan,int first_block,int block_count,int buffer_offset)128 int SliceReadPlanner::addExtraParts(SliceReadPlan *plan, int first_block, int block_count,
129 		int buffer_offset) {
130 	int ops = plan->read_operations.size();
131 	size_t all_parts_count = weighted_parts_to_use_.size();
132 	int to_recover = std::min<int>(
133 			std::floor(bandwidth_overuse_ * slice_traits::requiredPartsToRecover(slice_type_)),
134 			all_parts_count);
135 	int wave = 1;
136 
137 	// Add parts needed to recover to wave 1
138 	if (ops < to_recover) {
139 		buffer_offset = addParts(plan, first_block, block_count, to_recover - ops, wave, buffer_offset);
140 		wave += 1;
141 	}
142 
143 	// Add the rest with rising waves
144 	while (plan->read_operations.size() < all_parts_count) {
145 		int parts_count = std::min<int>(2, all_parts_count - plan->read_operations.size());
146 		buffer_offset = addParts(plan, first_block, block_count, parts_count, wave, buffer_offset);
147 		wave += 1;
148 	}
149 
150 	return buffer_offset;
151 }
152 
shouldReadPartsRequiredForRecovery() const153 bool SliceReadPlanner::shouldReadPartsRequiredForRecovery() const {
154 	return !required_parts_available_;
155 }
156 
157 /*!
158  * Builds a read plan.
159  * First step is preparing an occurrence bitmap of requested parts.
160  * Then, if parts needed for recovery are to be read (recovery is needed or nearly all parts
161  * were requested anyway), they are queued for reading. Otherwise, requested parts
162  * are queued for first wave and additional parts are added to consecutive waves.
163  * \param first_block first block to be read
164  * \param block_count number of blocks to be read
165  * \return read plan ready to be executed
166  */
buildPlanFor(uint32_t first_block,uint32_t block_count)167 std::unique_ptr<ReadPlan> SliceReadPlanner::buildPlanFor(uint32_t first_block,
168 		uint32_t block_count) {
169 	std::unique_ptr<SliceReadPlan> plan = getPlan();
170 	plan->buffer_part_size = block_count * MFSBLOCKSIZE;
171 
172 	// Count occurrences
173 	std::bitset<Goal::Slice::kMaxPartsCount> part_bitset;
174 	for (const auto &part : slice_parts_) {
175 		assert(!part_bitset[part]);
176 		part_bitset.set(part);
177 	}
178 
179 	// Prepare indices for each requested part
180 	int next_index = 0;
181 	part_indices_.fill(-1);
182 	for (const auto &part : slice_parts_) {
183 		int size = MFSBLOCKSIZE * std::min<int>(
184 			slice_traits::getNumberOfBlocks(ChunkPartType(slice_type_, part)) - first_block,
185 			block_count);
186 		plan->requested_parts.push_back({part, size});
187 		part_indices_[part] = next_index++;
188 	}
189 
190 	if (shouldReadPartsRequiredForRecovery()) {
191 		int recovery_parts_count = std::max<int>(
192 			std::floor(bandwidth_overuse_ *
193 				(required_parts_available_ ?
194 					slice_parts_.size() : slice_traits::requiredPartsToRecover(slice_type_))),
195 			slice_traits::requiredPartsToRecover(slice_type_));
196 
197 		int offset = addBasicParts(plan.get(), first_block, block_count, recovery_parts_count);
198 		addExtraParts(plan.get(), first_block, block_count, offset);
199 	} else {
200 		auto breakpoint =
201 		    std::stable_partition(weighted_parts_to_use_.begin(),
202 		        weighted_parts_to_use_.end(), [&part_bitset](const WeightedPart &part) {
203 			        return part_bitset[part.type.getSlicePart()];
204 		        });
205 
206 		int requested_parts_count = std::distance(weighted_parts_to_use_.begin(), breakpoint);
207 		int offset = addBasicParts(plan.get(), first_block, block_count, requested_parts_count);
208 		addExtraParts(plan.get(), first_block, block_count, offset);
209 	}
210 
211 	return std::move(plan);
212 }
213