1 /*
2 Copyright 2013-2017 Skytechnology sp. z o.o.
3
4 This file is part of LizardFS.
5
6 LizardFS is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, version 3.
9
10 LizardFS is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "common/platform.h"
20 #include "mount/chunk_reader.h"
21
22 #include <algorithm>
23 #include <cstring>
24
25 #include "common/exceptions.h"
26 #include "common/read_plan_executor.h"
27 #include "common/time_utils.h"
28 #include "mount/global_chunkserver_stats.h"
29
ChunkReader(ChunkConnector & connector,double bandwidth_overuse)30 ChunkReader::ChunkReader(ChunkConnector& connector, double bandwidth_overuse)
31 : connector_(connector),
32 inode_(0),
33 index_(0),
34 planner_(bandwidth_overuse),
35 chunkAlreadyRead(false) {
36 }
37
prepareReadingChunk(uint32_t inode,uint32_t index,bool force_prepare)38 void ChunkReader::prepareReadingChunk(uint32_t inode, uint32_t index, bool force_prepare) {
39 if (inode != inode_ || index != index_) {
40 // we moved to a new chunk
41 crcErrors_.clear();
42 } else if (!force_prepare) {
43 // we didn't change chunk and aren't forced to prepare again
44 return;
45 }
46 ++preparations;
47 inode_ = inode;
48 index_ = index;
49 locator_.invalidateCache(inode, index);
50 location_ = locator_.locateChunk(inode, index);
51 chunkAlreadyRead = false;
52 if (location_->isEmptyChunk()) {
53 return;
54 }
55 chunk_type_locations_.clear();
56
57 ChunkReadPlanner::ScoreContainer best_scores;
58
59 available_parts_.clear();
60 for (const ChunkTypeWithAddress& chunk_type_with_address : location_->locations) {
61 const ChunkPartType& type = chunk_type_with_address.chunk_type;
62
63 if (std::count(crcErrors_.begin(), crcErrors_.end(), chunk_type_with_address) > 0) {
64 continue;
65 }
66
67 float score = globalChunkserverStats.getStatisticsFor(chunk_type_with_address.address).score();
68 if (chunk_type_locations_.count(type) == 0) {
69 // first location of this type, choose it (for now)
70 chunk_type_locations_[type] = chunk_type_with_address;
71 best_scores[type] = score;
72 available_parts_.push_back(type);
73 } else {
74 // we already know other locations
75 if (score > best_scores[type]) {
76 // this location is better, switch to it
77 chunk_type_locations_[type] = chunk_type_with_address;
78 best_scores[type] = score;
79 }
80 }
81 }
82 planner_.setScores(std::move(best_scores));
83 }
84
// Reads `size` bytes starting at `offset` of the chunk selected by the last
// prepareReadingChunk() call and appends them to `buffer`.
//
// buffer               - destination; read data is appended after its current content.
// offset, size         - requested byte range within the chunk; offset + size
//                        must not exceed MFSCHUNKSIZE (asserted below).
// connectTimeout_ms    - connection timeout passed to the plan executor.
// wave_timeout_ms      - timeout of a single read wave.
// communicationTimeout - overall communication deadline.
// prefetchXorStripes   - when true, the plan may prefetch additional stripes.
//
// Returns the number of bytes appended -- possibly fewer than `size` when the
// range extends past the end of the file, and 0 when it lies entirely beyond
// EOF. Throws NoValidCopiesReadException when no combination of available
// parts can serve the request.
uint32_t ChunkReader::readData(std::vector<uint8_t>& buffer, uint32_t offset, uint32_t size,
		uint32_t connectTimeout_ms, uint32_t wave_timeout_ms, const Timeout& communicationTimeout,
		bool prefetchXorStripes) {
	if (size == 0) {
		return 0;
	}
	sassert(offset + size <= MFSCHUNKSIZE);
	// Absolute position of the first requested byte within the whole file
	// (64-bit: index_ * MFSCHUNKSIZE would overflow uint32_t).
	uint64_t offsetInFile = static_cast<uint64_t>(index_) * MFSCHUNKSIZE + offset;
	uint32_t availableSize = size; // requested data may lie beyond end of file
	if (offsetInFile >= location_->fileLength) {
		// read request entirely beyond EOF, can't read anything
		availableSize = 0;
	} else if (offsetInFile + availableSize > location_->fileLength) {
		// read request partially beyond EOF, truncate request to EOF
		availableSize = location_->fileLength - offsetInFile;
	}
	if (availableSize == 0) {
		return 0;
	}

	if (location_->isEmptyChunk()) {
		// We just have to append some zeros to the buffer
		buffer.resize(buffer.size() + availableSize, 0);
	} else {
		// We have to request for availableSize rounded up to MFSBLOCKSIZE
		uint32_t firstBlockToRead = offset / MFSBLOCKSIZE;
		uint32_t blockToReadCount = (availableSize + MFSBLOCKSIZE - 1) / MFSBLOCKSIZE;

		planner_.prepare(firstBlockToRead, blockToReadCount, available_parts_);
		if (!planner_.isReadingPossible()) {
			throw NoValidCopiesReadException("no valid copies");
		}

		auto plan = planner_.buildPlan();
		if (!prefetchXorStripes || chunkAlreadyRead || size != availableSize) {
			// Disable prefetching if:
			// - it was disabled with a config option
			// - all chunk parts were read before (in this case we rely on pagecache)
			// - we're reading the end of a chunk (there is no point in prefetching anything)
			plan->disable_prefetch = true;
		}
		ReadPlanExecutor executor(globalChunkserverStats, location_->chunkId, location_->version,
				std::move(plan));
		uint32_t initialBufferSize = buffer.size();
		try {
			// Set before executing, so that even a partially failed read
			// disables prefetching on the next attempt.
			chunkAlreadyRead = true;
			executor.executePlan(buffer, chunk_type_locations_, connector_,
					connectTimeout_ms, wave_timeout_ms,
					communicationTimeout);
			//TODO(haze): Improve scoring system so it can deal with disconnected chunkservers.
		} catch (ChunkCrcException &err) {
			// Remember the offending part so a future prepareReadingChunk()
			// of the same chunk skips it, then let the caller handle the error.
			crcErrors_.push_back(ChunkTypeWithAddress(err.server(), err.chunkType(), 0));
			throw;
		}
		// Shrink the buffer. If availableSize is not divisible by MFSBLOCKSIZE
		// we have to chop off the trailing zeros, that we've just read from a chunkserver.
		buffer.resize(initialBufferSize + availableSize);
	}
	return availableSize;
}
145
// Out-of-class definition of the static preparation counter (declared in the
// class header); incremented by every full prepareReadingChunk() pass.
std::atomic<uint64_t> ChunkReader::preparations;
147