1 /*
2    Copyright 2013-2017 Skytechnology sp. z o.o.
3 
4    This file is part of LizardFS.
5 
6    LizardFS is free software: you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation, version 3.
9 
10    LizardFS is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13    GNU General Public License for more details.
14 
15    You should have received a copy of the GNU General Public License
16    along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "common/platform.h"
20 #include "mount/chunk_reader.h"
21 
22 #include <algorithm>
23 #include <cstring>
24 
25 #include "common/exceptions.h"
26 #include "common/read_plan_executor.h"
27 #include "common/time_utils.h"
28 #include "mount/global_chunkserver_stats.h"
29 
// Constructs a reader that will use the given connector for all chunkserver
// communication. The connector is held by reference, so it must outlive this
// reader. bandwidth_overuse is forwarded to the read planner — presumably it
// controls how much redundant bandwidth the planner may spend; confirm
// against ChunkReadPlanner. inode_/index_ start at 0, i.e. no chunk has been
// prepared yet; callers must invoke prepareReadingChunk() before readData().
ChunkReader::ChunkReader(ChunkConnector& connector, double bandwidth_overuse)
		: connector_(connector),
		  inode_(0),
		  index_(0),
		  planner_(bandwidth_overuse),
		  chunkAlreadyRead(false) {
}
37 
prepareReadingChunk(uint32_t inode,uint32_t index,bool force_prepare)38 void ChunkReader::prepareReadingChunk(uint32_t inode, uint32_t index, bool force_prepare) {
39 	if (inode != inode_ || index != index_) {
40 		// we moved to a new chunk
41 		crcErrors_.clear();
42 	} else if (!force_prepare) {
43 		// we didn't change chunk and aren't forced to prepare again
44 		return;
45 	}
46 	++preparations;
47 	inode_ = inode;
48 	index_ = index;
49 	locator_.invalidateCache(inode, index);
50 	location_ = locator_.locateChunk(inode, index);
51 	chunkAlreadyRead = false;
52 	if (location_->isEmptyChunk()) {
53 		return;
54 	}
55 	chunk_type_locations_.clear();
56 
57 	ChunkReadPlanner::ScoreContainer best_scores;
58 
59 	available_parts_.clear();
60 	for (const ChunkTypeWithAddress& chunk_type_with_address : location_->locations) {
61 		const ChunkPartType& type = chunk_type_with_address.chunk_type;
62 
63 		if (std::count(crcErrors_.begin(), crcErrors_.end(), chunk_type_with_address) > 0) {
64 			continue;
65 		}
66 
67 		float score = globalChunkserverStats.getStatisticsFor(chunk_type_with_address.address).score();
68 		if (chunk_type_locations_.count(type) == 0) {
69 			// first location of this type, choose it (for now)
70 			chunk_type_locations_[type] = chunk_type_with_address;
71 			best_scores[type] = score;
72 			available_parts_.push_back(type);
73 		} else {
74 			// we already know other locations
75 			if (score > best_scores[type]) {
76 				// this location is better, switch to it
77 				chunk_type_locations_[type] = chunk_type_with_address;
78 				best_scores[type] = score;
79 			}
80 		}
81 	}
82 	planner_.setScores(std::move(best_scores));
83 }
84 
/// Reads up to @p size bytes of the chunk prepared by prepareReadingChunk(),
/// starting at @p offset (an offset within the chunk, not within the file),
/// and appends the bytes to @p buffer.
///
/// @param buffer              output buffer; read data is appended, existing
///                            contents are preserved
/// @param offset              byte offset inside the current chunk
/// @param size                number of bytes requested
/// @param connectTimeout_ms   per-connection timeout passed to the executor
/// @param wave_timeout_ms     timeout of one "wave" of the read plan
/// @param communicationTimeout overall communication deadline
/// @param prefetchXorStripes  when false, prefetching is disabled in the plan
/// @return number of bytes actually appended; may be less than @p size when
///         the request extends past end of file (0 if entirely beyond EOF)
/// @throws NoValidCopiesReadException when no combination of the available
///         chunk parts can serve the requested block range
uint32_t ChunkReader::readData(std::vector<uint8_t>& buffer, uint32_t offset, uint32_t size,
		uint32_t connectTimeout_ms, uint32_t wave_timeout_ms, const Timeout& communicationTimeout,
		bool prefetchXorStripes) {
	if (size == 0) {
		return 0;
	}
	// The request must not cross the chunk boundary.
	sassert(offset + size <= MFSCHUNKSIZE);
	uint64_t offsetInFile = static_cast<uint64_t>(index_) * MFSCHUNKSIZE + offset;
	uint32_t availableSize = size;  // requested data may lie beyond end of file
	if (offsetInFile >= location_->fileLength) {
		// read request entirely beyond EOF, can't read anything
		availableSize = 0;
	} else if (offsetInFile + availableSize > location_->fileLength) {
		// read request partially beyond EOF, truncate request to EOF
		availableSize = location_->fileLength - offsetInFile;
	}
	if (availableSize == 0) {
		return 0;
	}

	if (location_->isEmptyChunk()) {
		// We just have to append some zeros to the buffer
		buffer.resize(buffer.size() + availableSize, 0);
	} else {
		// We have to request for availableSize rounded up to MFSBLOCKSIZE
		uint32_t firstBlockToRead = offset / MFSBLOCKSIZE;
		uint32_t blockToReadCount = (availableSize + MFSBLOCKSIZE - 1) / MFSBLOCKSIZE;

		planner_.prepare(firstBlockToRead, blockToReadCount, available_parts_);
		if (!planner_.isReadingPossible()) {
			throw NoValidCopiesReadException("no valid copies");
		}

		auto plan = planner_.buildPlan();
		if (!prefetchXorStripes || chunkAlreadyRead || size != availableSize) {
			// Disable prefetching if:
			// - it was disabled with a config option
			// - all chunk parts were read before (in this case we rely on pagecache)
			// - we're reading the end of a chunk (there is no point in prefetching anything)
			plan->disable_prefetch = true;
		}
		ReadPlanExecutor executor(globalChunkserverStats, location_->chunkId, location_->version,
				std::move(plan));
		uint32_t initialBufferSize = buffer.size();
		try {
			// Set before executing so that even a failed attempt counts as a
			// read of this chunk — presumably to keep prefetch disabled on
			// retries (see the condition above); confirm intent with authors.
			chunkAlreadyRead = true;
			executor.executePlan(buffer, chunk_type_locations_, connector_,
					connectTimeout_ms, wave_timeout_ms,
					communicationTimeout);
			//TODO(haze): Improve scoring system so it can deal with disconnected chunkservers.
		} catch (ChunkCrcException &err) {
			// Remember the faulty server so the next prepareReadingChunk()
			// for this chunk skips it, then propagate the error to the caller.
			crcErrors_.push_back(ChunkTypeWithAddress(err.server(), err.chunkType(), 0));
			throw;
		}
		// Shrink the buffer. If availableSize is not divisible by MFSBLOCKSIZE
		// we have to chop off the trailing zeros, that we've just read from a chunkserver.
		buffer.resize(initialBufferSize + availableSize);
	}
	return availableSize;
}
145 
// Definition of the process-wide counter incremented by prepareReadingChunk().
// Static storage duration guarantees zero-initialization before first use.
std::atomic<uint64_t> ChunkReader::preparations;
147