1 /*
2 Copyright 2015 Skytechnology sp. z o.o.
3
4 This file is part of LizardFS.
5
6 LizardFS is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, version 3.
9
10 LizardFS is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "common/platform.h"
20
21 #include "common/slice_read_planner.h"
22
23 #include <cmath>
24
25 /*!
26 * Prepares read planner for serving selected parts of a slice type.
27 * Firstly, function checks if:
28 * - all requested parts are available,
29 * - or requested parts can be recovered,
30 * - or reading is not possible at all.
31 * Then, parts are sorted by their score.
32 * \param slice_type slice type for read operation
33 * \param slice_parts requested parts of selected slice type
34 * \param available_parts parts available in the system
35 */
prepare(Goal::Slice::Type slice_type,const PartIndexContainer & slice_parts,const PartsContainer & available_parts)36 void SliceReadPlanner::prepare(Goal::Slice::Type slice_type, const PartIndexContainer &slice_parts,
37 const PartsContainer &available_parts) {
38 reset(slice_type, slice_parts);
39 std::bitset<Goal::Slice::kMaxPartsCount> part_bitset;
40 for (const auto &part : available_parts) {
41 if (part.getSliceType() == slice_type_) {
42 part_bitset.set(part.getSlicePart());
43 }
44 }
45
46 required_parts_available_ =
47 std::all_of(slice_parts_.begin(), slice_parts_.end(),
48 [&part_bitset](int slice_part) {return part_bitset.test(slice_part);});
49
50 can_recover_parts_ = (int)part_bitset.count() >= slice_traits::requiredPartsToRecover(slice_type_);
51 can_read_ = required_parts_available_ || can_recover_parts_;
52
53 if (!can_read_) {
54 return;
55 }
56
57 for (const auto &part : available_parts) {
58 if (part.getSliceType() == slice_type_ && part_bitset[part.getSlicePart()]) {
59 auto it = scores_.find(part);
60 float score = it != scores_.end() ? it->second : 1;
61 weighted_parts_to_use_.push_back({score, part});
62 part_bitset.reset(part.getSlicePart());
63 }
64 }
65
66 std::stable_sort(
67 weighted_parts_to_use_.begin(), weighted_parts_to_use_.end(),
68 [](const WeightedPart &a, const WeightedPart &b) { return a.score > b.score; });
69 }
70
71 /*!
72 * Adds read operations to plan.
73 * If part was one of the requested ones, its buffer offset is determined by its order.
74 * Additional parts have consecutive offsets starting after last requested part.
75 * \param plan - plan to be amended
76 * \param first_block first block to be read
77 * \param block_count number of blocks to be read
78 * \param parts_count number of parts to be added
79 * \param wave wave number for read operation
80 * \param buffer_offset current offset for additional parts
81 * \return offset for additional parts
82 */
addParts(SliceReadPlan * plan,int first_block,int block_count,int parts_count,int wave,int buffer_offset)83 int SliceReadPlanner::addParts(SliceReadPlan *plan, int first_block, int block_count,
84 int parts_count, int wave, int buffer_offset) {
85 int ops = plan->read_operations.size();
86 int end = std::min<int>(ops + parts_count, weighted_parts_to_use_.size());
87
88 for (; ops < end; ++ops) {
89 ReadPlan::ReadOperation op{first_block * MFSBLOCKSIZE, 0, 0, wave};
90 op.request_size = MFSBLOCKSIZE * std::min(
91 (int)slice_traits::getNumberOfBlocks(weighted_parts_to_use_[ops].type) - first_block,
92 block_count);
93 int index = part_indices_[weighted_parts_to_use_[ops].type.getSlicePart()];
94 if (index < 0) {
95 op.buffer_offset = buffer_offset;
96 buffer_offset += block_count * MFSBLOCKSIZE;
97 } else {
98 op.buffer_offset = index * MFSBLOCKSIZE * block_count;
99 }
100 plan->read_operations.push_back({weighted_parts_to_use_[ops].type, op});
101 }
102
103 plan->read_buffer_size = buffer_offset;
104 return buffer_offset;
105 }
106
107 /*!
108 * Adds read operations for parts to plan with highest priority (wave 0).
109 * \param plan - plan to be amended
110 * \param first_block first block to be read
111 * \param block_count number of blocks to be read
112 * \param parts_count number of parts to be added
113 * \return buffer offset for additional parts
114 */
addBasicParts(SliceReadPlan * plan,int first_block,int block_count,int parts_count)115 int SliceReadPlanner::addBasicParts(SliceReadPlan *plan, int first_block, int block_count,
116 int parts_count) {
117 int buffer_offset = plan->requested_parts.size() * plan->buffer_part_size;
118 return addParts(plan, first_block, block_count, parts_count, 0, buffer_offset);
119 }
120
121 /*!
122 * Adds additional read operations to plan.
123 * \param plan - plan to be amended
124 * \param first_block first block to be read
125 * \param block_count number of blocks to be read
126 * \return buffer offset for additional parts
127 */
addExtraParts(SliceReadPlan * plan,int first_block,int block_count,int buffer_offset)128 int SliceReadPlanner::addExtraParts(SliceReadPlan *plan, int first_block, int block_count,
129 int buffer_offset) {
130 int ops = plan->read_operations.size();
131 size_t all_parts_count = weighted_parts_to_use_.size();
132 int to_recover = std::min<int>(
133 std::floor(bandwidth_overuse_ * slice_traits::requiredPartsToRecover(slice_type_)),
134 all_parts_count);
135 int wave = 1;
136
137 // Add parts needed to recover to wave 1
138 if (ops < to_recover) {
139 buffer_offset = addParts(plan, first_block, block_count, to_recover - ops, wave, buffer_offset);
140 wave += 1;
141 }
142
143 // Add the rest with rising waves
144 while (plan->read_operations.size() < all_parts_count) {
145 int parts_count = std::min<int>(2, all_parts_count - plan->read_operations.size());
146 buffer_offset = addParts(plan, first_block, block_count, parts_count, wave, buffer_offset);
147 wave += 1;
148 }
149
150 return buffer_offset;
151 }
152
shouldReadPartsRequiredForRecovery() const153 bool SliceReadPlanner::shouldReadPartsRequiredForRecovery() const {
154 return !required_parts_available_;
155 }
156
157 /*!
158 * Builds a read plan.
159 * First step is preparing an occurrence bitmap of requested parts.
160 * Then, if parts needed for recovery are to be read (recovery is needed or nearly all parts
161 * were requested anyway), they are queued for reading. Otherwise, requested parts
162 * are queued for first wave and additional parts are added to consecutive waves.
163 * \param first_block first block to be read
164 * \param block_count number of blocks to be read
165 * \return read plan ready to be executed
166 */
buildPlanFor(uint32_t first_block,uint32_t block_count)167 std::unique_ptr<ReadPlan> SliceReadPlanner::buildPlanFor(uint32_t first_block,
168 uint32_t block_count) {
169 std::unique_ptr<SliceReadPlan> plan = getPlan();
170 plan->buffer_part_size = block_count * MFSBLOCKSIZE;
171
172 // Count occurrences
173 std::bitset<Goal::Slice::kMaxPartsCount> part_bitset;
174 for (const auto &part : slice_parts_) {
175 assert(!part_bitset[part]);
176 part_bitset.set(part);
177 }
178
179 // Prepare indices for each requested part
180 int next_index = 0;
181 part_indices_.fill(-1);
182 for (const auto &part : slice_parts_) {
183 int size = MFSBLOCKSIZE * std::min<int>(
184 slice_traits::getNumberOfBlocks(ChunkPartType(slice_type_, part)) - first_block,
185 block_count);
186 plan->requested_parts.push_back({part, size});
187 part_indices_[part] = next_index++;
188 }
189
190 if (shouldReadPartsRequiredForRecovery()) {
191 int recovery_parts_count = std::max<int>(
192 std::floor(bandwidth_overuse_ *
193 (required_parts_available_ ?
194 slice_parts_.size() : slice_traits::requiredPartsToRecover(slice_type_))),
195 slice_traits::requiredPartsToRecover(slice_type_));
196
197 int offset = addBasicParts(plan.get(), first_block, block_count, recovery_parts_count);
198 addExtraParts(plan.get(), first_block, block_count, offset);
199 } else {
200 auto breakpoint =
201 std::stable_partition(weighted_parts_to_use_.begin(),
202 weighted_parts_to_use_.end(), [&part_bitset](const WeightedPart &part) {
203 return part_bitset[part.type.getSlicePart()];
204 });
205
206 int requested_parts_count = std::distance(weighted_parts_to_use_.begin(), breakpoint);
207 int offset = addBasicParts(plan.get(), first_block, block_count, requested_parts_count);
208 addExtraParts(plan.get(), first_block, block_count, offset);
209 }
210
211 return std::move(plan);
212 }
213