1 /*
2    Copyright 2005-2010 Jakub Kruszona-Zawadzki, Gemius SA, 2013-2014 EditShare, 2013-2017 Skytechnology sp. z o.o..
3 
4    This file was part of MooseFS and is part of LizardFS.
5 
6    LizardFS is free software: you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation, version 3.
9 
10    LizardFS is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14 
15    You should have received a copy of the GNU General Public License
16    along with LizardFS  If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "common/platform.h"
20 #include "mount/readdata.h"
21 
22 #include <errno.h>
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <pthread.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <sys/time.h>
30 #include <time.h>
31 #include <unistd.h>
32 #include <condition_variable>
33 #include <map>
34 #include <mutex>
35 
36 #include "common/connection_pool.h"
37 #include "common/datapack.h"
38 #include "common/exceptions.h"
39 #include "common/mfserr.h"
40 #include "common/read_plan_executor.h"
41 #include "common/slogger.h"
42 #include "common/sockets.h"
43 #include "common/time_utils.h"
44 #include "mount/chunk_locator.h"
45 #include "mount/chunk_reader.h"
46 #include "mount/mastercomm.h"
47 #include "mount/readahead_adviser.h"
48 #include "mount/readdata_cache.h"
49 #include "mount/tweaks.h"
50 #include "protocol/MFSCommunication.h"
51 
// Sleep between iterations of the delayed-ops maintenance thread (~1/3 s).
#define USECTICK 333333
// Number of maintenance ticks after which a readrec is considered stale
// enough to force a chunkserver reconnect on its next access (~5 s).
#define REFRESHTICKS 15

// Inode -> readrec hash map: 2^MAPBITS buckets, indexed by the low bits
// of the inode number.
#define MAPBITS 10
#define MAPSIZE (1<<(MAPBITS))
#define MAPMASK (MAPSIZE-1)
#define MAPINDX(inode) (inode&MAPMASK)

// Upper bound (bytes) for the readahead window; tweakable at runtime
// via the "ReadaheadMaxWindowSize" tweak registered in read_data_init().
static std::atomic<uint32_t> gReadaheadMaxWindowSize;
// Read-cache entry lifetime (ms); tweakable via "CacheExpirationTime".
static std::atomic<uint32_t> gCacheExpirationTime_ms;
62 
// Per-open-file read state. Created by read_data_new(), marked for removal
// by read_data_end(), and physically deleted by the delayed-ops thread.
// Fields annotated "gMutex" must only be accessed while holding gMutex.
struct readrec {
	ChunkReader reader;             // performs the actual chunkserver reads
	ReadCache cache;                // caches recently read data for this inode
	ReadaheadAdviser readahead_adviser; // adapts the readahead window to the access pattern
	std::vector<uint8_t> read_buffer;
	uint32_t inode;
	uint8_t refreshCounter;         // gMutex; saturates at REFRESHTICKS to force chunk re-preparation
	bool expired;                   // gMutex; set by read_data_end(), record reaped by delayed-ops thread
	struct readrec *next;           // gMutex; singly linked list of all records (rdhead)
	struct readrec *mapnext;        // gMutex; collision chain within rdinodemap bucket

	// Snapshots the current global cache/readahead settings at construction.
	readrec(uint32_t inode, ChunkConnector& connector, double bandwidth_overuse)
			: reader(connector, bandwidth_overuse),
			  cache(gCacheExpirationTime_ms),
			  readahead_adviser(gCacheExpirationTime_ms, gReadaheadMaxWindowSize),
			  inode(inode),
			  refreshCounter(0),
			  expired(false),
			  next(nullptr),
			  mapnext(nullptr) {
	}
};
85 
// Shared pool of chunkserver connections, periodically cleaned by the
// delayed-ops thread; the connector hands pooled sockets to ChunkReaders.
static ConnectionPool gReadConnectionPool;
static ChunkConnectorUsingPool gChunkConnector(gReadConnectionPool);
// Guards rdinodemap, rdhead, readDataTerminate and the gMutex-annotated
// readrec fields.
static std::mutex gMutex;
static readrec *rdinodemap[MAPSIZE];    // inode-hashed buckets of readrecs (chained via mapnext)
static readrec *rdhead=NULL;            // list of all live readrecs (chained via next)
static pthread_t delayedOpsThread;      // runs read_data_delayed_ops()
// Timeouts/flags below are atomics because they are registered as runtime
// tweaks in read_data_init() and read concurrently by reader threads.
static std::atomic<uint32_t> gChunkserverConnectTimeout_ms;
static std::atomic<uint32_t> gChunkserverWaveReadTimeout_ms;
static std::atomic<uint32_t> gChunkserverTotalReadTimeout_ms;
static std::atomic<bool> gPrefetchXorStripes;
static bool readDataTerminate;          // gMutex; tells the delayed-ops thread to exit
static std::atomic<uint32_t> maxRetries;
static double gBandwidthOveruse;

// Out-of-class definitions of ReadaheadAdviser's in-class-initialized static
// constants — presumably required because they are ODR-used somewhere
// (pre-C++17 rules); values live in the class definition.
const unsigned ReadaheadAdviser::kInitWindowSize;
const unsigned ReadaheadAdviser::kDefaultWindowSizeLimit;
const int ReadaheadAdviser::kRandomThreshold;
const int ReadaheadAdviser::kHistoryEntryLifespan_ns;
const int ReadaheadAdviser::kHistoryCapacity;
const unsigned ReadaheadAdviser::kHistoryValidityThreshold;
106 
read_data_get_wave_read_timeout_ms()107 uint32_t read_data_get_wave_read_timeout_ms() {
108 	return gChunkserverWaveReadTimeout_ms;
109 }
110 
read_data_get_connect_timeout_ms()111 uint32_t read_data_get_connect_timeout_ms() {
112 	return gChunkserverConnectTimeout_ms;
113 }
114 
read_data_get_total_read_timeout_ms()115 uint32_t read_data_get_total_read_timeout_ms() {
116 	return gChunkserverTotalReadTimeout_ms;
117 }
118 
read_data_get_prefetchxorstripes()119 bool read_data_get_prefetchxorstripes() {
120 	return gPrefetchXorStripes;
121 }
122 
// Body of the background maintenance thread started by read_data_init().
// Once per USECTICK it:
//  - cleans up idle sockets in the shared connection pool,
//  - increments every record's refreshCounter (saturating at REFRESHTICKS),
//  - unlinks and deletes records marked expired by read_data_end().
// Exits (returning NULL) when readDataTerminate is set under gMutex.
void* read_data_delayed_ops(void *arg) {
	readrec *rrec,**rrecp;
	readrec **rrecmap;
	(void)arg;
	for (;;) {
		gReadConnectionPool.cleanup();
		std::unique_lock<std::mutex> lock(gMutex);
		if (readDataTerminate) {
			return NULL;
		}
		// Walk the global list via a pointer-to-link so that removal of the
		// current node is a single pointer assignment.
		rrecp = &rdhead;
		while ((rrec = *rrecp) != NULL) {
			if (rrec->refreshCounter < REFRESHTICKS) {
				rrec->refreshCounter++;
			}
			if (rrec->expired) {
				// Unlink from the global list...
				*rrecp = rrec->next;
				// ...then from its hash-bucket collision chain.
				rrecmap = &(rdinodemap[MAPINDX(rrec->inode)]);
				while (*rrecmap) {
					if ((*rrecmap)==rrec) {
						*rrecmap = rrec->mapnext;
					} else {
						rrecmap = &((*rrecmap)->mapnext);
					}
				}
				delete rrec;
			} else {
				rrecp = &(rrec->next);
			}
		}
		lock.unlock();
		usleep(USECTICK);
	}
}
157 
read_data_new(uint32_t inode)158 void* read_data_new(uint32_t inode) {
159 	readrec *rrec = new readrec(inode, gChunkConnector, gBandwidthOveruse);
160 	std::unique_lock<std::mutex> lock(gMutex);
161 	rrec->next = rdhead;
162 	rdhead = rrec;
163 	rrec->mapnext = rdinodemap[MAPINDX(inode)];
164 	rdinodemap[MAPINDX(inode)] = rrec;
165 	return rrec;
166 }
167 
read_data_end(void * rr)168 void read_data_end(void* rr) {
169 	readrec *rrec = (readrec*)rr;
170 
171 	std::unique_lock<std::mutex> lock(gMutex);
172 	rrec->expired = true;
173 }
174 
// One-time initialization of the read subsystem: stores configuration in
// the global atomics, starts the delayed-ops maintenance thread and
// registers the runtime-tweakable variables.
// Must be called before any read_data_new()/read_data() and paired with
// read_data_term() on shutdown.
void read_data_init(uint32_t retries,
		uint32_t chunkserverRoundTripTime_ms,
		uint32_t chunkserverConnectTimeout_ms,
		uint32_t chunkServerWaveReadTimeout_ms,
		uint32_t chunkserverTotalReadTimeout_ms,
		uint32_t cache_expiration_time_ms,
		uint32_t readahead_max_window_size_kB,
		bool prefetchXorStripes,
		double bandwidth_overuse) {
	uint32_t i;
	pthread_attr_t thattr;

	readDataTerminate = false;
	for (i=0 ; i<MAPSIZE ; i++) {
		rdinodemap[i]=NULL;
	}
	// Publish configuration before the maintenance thread is created, so it
	// never observes uninitialized values.
	maxRetries=retries;
	gChunkserverConnectTimeout_ms = chunkserverConnectTimeout_ms;
	gChunkserverWaveReadTimeout_ms = chunkServerWaveReadTimeout_ms;
	gChunkserverTotalReadTimeout_ms = chunkserverTotalReadTimeout_ms;
	gCacheExpirationTime_ms = cache_expiration_time_ms;
	gReadaheadMaxWindowSize = readahead_max_window_size_kB * 1024; // kB -> bytes
	gPrefetchXorStripes = prefetchXorStripes;
	gBandwidthOveruse = bandwidth_overuse;
	gTweaks.registerVariable("PrefetchXorStripes", gPrefetchXorStripes);
	gChunkConnector.setRoundTripTime(chunkserverRoundTripTime_ms);
	gChunkConnector.setSourceIp(fs_getsrcip());
	// 1 MiB stack is plenty for the maintenance loop.
	// NOTE(review): pthread_create's return value is ignored — on failure the
	// subsystem would run without its cleanup thread; consider checking it.
	pthread_attr_init(&thattr);
	pthread_attr_setstacksize(&thattr,0x100000);
	pthread_create(&delayedOpsThread,&thattr,read_data_delayed_ops,NULL);
	pthread_attr_destroy(&thattr);

	// Expose knobs and counters for live inspection/tuning.
	gTweaks.registerVariable("ReadMaxRetries", maxRetries);
	gTweaks.registerVariable("ReadConnectTimeout", gChunkserverConnectTimeout_ms);
	gTweaks.registerVariable("ReadWaveTimeout", gChunkserverWaveReadTimeout_ms);
	gTweaks.registerVariable("ReadTotalTimeout", gChunkserverTotalReadTimeout_ms);
	gTweaks.registerVariable("CacheExpirationTime", gCacheExpirationTime_ms);
	gTweaks.registerVariable("ReadaheadMaxWindowSize", gReadaheadMaxWindowSize);
	gTweaks.registerVariable("ReadChunkPrepare", ChunkReader::preparations);
	gTweaks.registerVariable("ReqExecutedTotal", ReadPlanExecutor::executions_total_);
	gTweaks.registerVariable("ReqExecutedUsingAll", ReadPlanExecutor::executions_with_additional_operations_);
	gTweaks.registerVariable("ReqFinishedUsingAll", ReadPlanExecutor::executions_finished_by_additional_operations_);
}
218 
read_data_term(void)219 void read_data_term(void) {
220 	readrec *rr,*rrn;
221 
222 	{
223 		std::unique_lock<std::mutex> lock(gMutex);
224 		readDataTerminate = true;
225 	}
226 
227 	pthread_join(delayedOpsThread,NULL);
228 	for (rr = rdhead ; rr ; rr = rrn) {
229 		rrn = rr->next;
230 		delete rr;
231 	}
232 	for (auto& rr : rdinodemap) {
233 		rr = NULL;
234 	}
235 	rdhead = NULL;
236 }
237 
read_inode_ops(uint32_t inode)238 void read_inode_ops(uint32_t inode) { // attributes of inode have been changed - force reconnect and clear cache
239 	readrec *rrec;
240 	std::unique_lock<std::mutex> lock(gMutex);
241 	for (rrec = rdinodemap[MAPINDX(inode)] ; rrec ; rrec=rrec->mapnext) {
242 		if (rrec->inode == inode) {
243 			rrec->refreshCounter = REFRESHTICKS; // force reconnect on forthcoming access
244 		}
245 	}
246 }
247 
// Backoff delay (in milliseconds) before the given read retry: grows
// exponentially as 2^tryCounter ms and is capped at 10 seconds.
// Non-positive counters are clamped to the shortest delay instead of
// performing a shift by a negative amount (undefined behaviour).
int read_data_sleep_time_ms(int tryCounter) {
	if (tryCounter <= 0) {
		return 1;                  // 2^0 = 1 millisecond
	} else if (tryCounter <= 13) {     // 2^13 = 8192
		return (1 << tryCounter);  // 2^tryCounter milliseconds
	} else {
		return 1000 * 10;          // 10 seconds
	}
}
255 
print_error_msg(const readrec * rrec,uint32_t try_counter,const Exception & ex)256 static void print_error_msg(const readrec *rrec, uint32_t try_counter, const Exception &ex) {
257 	if (rrec->reader.isChunkLocated()) {
258 		lzfs_pretty_syslog(LOG_WARNING,
259 		                   "read file error, inode: %u, index: %u, chunk: %lu, version: %u - %s "
260 		                   "(try counter: %u)", rrec->reader.inode(), rrec->reader.index(),
261 		                   rrec->reader.chunkId(), rrec->reader.version(), ex.what(), try_counter);
262 	} else {
263 		lzfs_pretty_syslog(LOG_WARNING,
264 		                   "read file error, inode: %u, index: %u, chunk: failed to locate - %s "
265 		                   "(try counter: %u)", rrec->reader.inode(), rrec->reader.index(),
266 		                   ex.what(), try_counter);
267 	}
268 }
269 
// Read up to 'bytes_to_read' bytes starting at 'current_offset' into
// 'read_buffer', crossing chunk boundaries as needed. '*bytes_read' (must
// enter as 0) accumulates the bytes actually appended; a short read inside a
// chunk is treated as end-of-file. Recoverable errors are retried with
// exponential backoff up to maxRetries; returns LIZARDFS_STATUS_OK,
// LIZARDFS_ERROR_EBADF (stale handle) or LIZARDFS_ERROR_IO.
static int read_to_buffer(readrec *rrec, uint64_t current_offset, uint64_t bytes_to_read,
		std::vector<uint8_t> &read_buffer, uint64_t *bytes_read) {
	uint32_t try_counter = 0;
	uint32_t prepared_inode = 0; // this is always different than any real inode
	uint32_t prepared_chunk_id = 0;
	assert(*bytes_read == 0);

	// forced sleep between retries caused by recoverable failures
	uint32_t sleep_time_ms = 0;

	// A saturated refreshCounter (set by read_inode_ops or aging in the
	// delayed-ops thread) forces re-preparation of the chunk connection.
	std::unique_lock<std::mutex> lock(gMutex);
	bool force_prepare = (rrec->refreshCounter == REFRESHTICKS);
	lock.unlock();

	while (bytes_to_read > 0) {
		Timeout sleep_timeout = Timeout(std::chrono::milliseconds(sleep_time_ms));
		// Increase communicationTimeout to sleepTime; longer poll() can't be worse
		// than short poll() followed by nonproductive usleep().
		uint32_t timeout_ms = std::max(gChunkserverTotalReadTimeout_ms.load(), sleep_time_ms);
		Timeout communication_timeout = Timeout(std::chrono::milliseconds(timeout_ms));
		sleep_time_ms = 0;
		try {
			uint32_t chunk_id = current_offset / MFSCHUNKSIZE;
			// (Re)locate the chunk when forced or when we moved to a
			// different inode/chunk than the one currently prepared.
			if (force_prepare || prepared_inode != rrec->inode || prepared_chunk_id != chunk_id) {
				rrec->reader.prepareReadingChunk(rrec->inode, chunk_id, force_prepare);
				prepared_chunk_id = chunk_id;
				prepared_inode = rrec->inode;
				force_prepare = false;
				lock.lock();
				rrec->refreshCounter = 0;
				lock.unlock();
			}

			// Clip this request to the remainder of the current chunk.
			uint64_t offset_of_chunk = static_cast<uint64_t>(chunk_id) * MFSCHUNKSIZE;
			uint32_t offset_in_chunk = current_offset - offset_of_chunk;
			uint32_t size_in_chunk = MFSCHUNKSIZE - offset_in_chunk;
			if (size_in_chunk > bytes_to_read) {
				size_in_chunk = bytes_to_read;
			}
			uint32_t bytes_read_from_chunk = rrec->reader.readData(
					read_buffer, offset_in_chunk, size_in_chunk,
					gChunkserverConnectTimeout_ms, gChunkserverWaveReadTimeout_ms,
					communication_timeout, gPrefetchXorStripes);
			// No exceptions thrown. We can increase the counters and go to the next chunk
			*bytes_read += bytes_read_from_chunk;
			current_offset += bytes_read_from_chunk;
			bytes_to_read -= bytes_read_from_chunk;
			if (bytes_read_from_chunk < size_in_chunk) {
				// end of file
				break;
			}
			try_counter = 0;
		} catch (UnrecoverableReadException &ex) {
			print_error_msg(rrec, try_counter, ex);
			if (ex.status() == LIZARDFS_ERROR_ENOENT) {
				return LIZARDFS_ERROR_EBADF; // stale handle
			} else {
				return LIZARDFS_ERROR_IO;
			}
		} catch (Exception &ex) {
			// Recoverable failure: the very first retry is silent by design.
			if (try_counter > 0) {
				print_error_msg(rrec, try_counter, ex);
			}
			force_prepare = true;
			if (try_counter > maxRetries) {
				return LIZARDFS_ERROR_IO;
			} else {
				// Sleep out whatever remains of the backoff scheduled by the
				// previous failure, then schedule the next one.
				usleep(sleep_timeout.remaining_us());
				sleep_time_ms = read_data_sleep_time_ms(try_counter);
			}
			try_counter++;
		}
	}
	return LIZARDFS_STATUS_OK;
}
345 
// Public read entry point: satisfy [offset, offset+size) for the record 'rr'
// (opaque pointer from read_data_new). Both offset and size must be
// MFSBLOCKSIZE-aligned. Serves from the per-inode cache when possible,
// otherwise reads the missing suffix (extended by the readahead window,
// rounded up to whole blocks) into the cache. On success 'ret' holds the
// cache entry covering the request; returns a LIZARDFS_* status code.
int read_data(void *rr, uint64_t offset, uint32_t size, ReadCache::Result &ret) {
	readrec *rrec = (readrec*)rr;
	assert(size % MFSBLOCKSIZE == 0);
	assert(offset % MFSBLOCKSIZE == 0);

	if (size == 0) {
		return LIZARDFS_STATUS_OK;
	}

	// Let the adviser observe the access pattern to size future readahead.
	rrec->readahead_adviser.feed(offset, size);

	ReadCache::Result result = rrec->cache.query(offset, size);

	// Full cache hit: the cached range covers the whole request.
	if (result.frontOffset() <= offset && offset + size <= result.endOffset()) {
		ret = std::move(result);
		return LIZARDFS_STATUS_OK;
	}
	// Partial/no hit: read from the first uncached offset; request at least
	// the readahead window, minus what the cache already provided, rounded
	// up to a whole number of blocks.
	uint64_t request_offset = result.remainingOffset();
	uint64_t bytes_to_read_left = std::max<uint64_t>(size, rrec->readahead_adviser.window()) - (request_offset - offset);
	bytes_to_read_left = (bytes_to_read_left + MFSBLOCKSIZE - 1) / MFSBLOCKSIZE * MFSBLOCKSIZE;

	uint64_t bytes_read = 0;
	int err = read_to_buffer(rrec, request_offset, bytes_to_read_left, result.inputBuffer(), &bytes_read);
	if (err) {
		// paranoia check - discard any leftover bytes from incorrect read
		result.inputBuffer().clear();
		return err;
	}

	ret = std::move(result);
	return LIZARDFS_STATUS_OK;
}
378